Spring-Kafka
Example
Maven Dependency
```xml
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
```
Configuration File
```yaml
spring:
  kafka:
    bootstrap-servers:
      - localhost:9090
      - localhost:9091
      - localhost:9092
    consumer:
      bootstrap-servers: localhost:9092
      group-id: apple
      auto-offset-reset: earliest
      enable-auto-commit: true
      auto-commit-interval: 1000
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      max-poll-records: 500
      properties:
        session-timeout-ms: 15000
    admin:
      client-id: 1
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      acks: all
      retries: 3
      batch-size: 16384
      buffer-memory: 33554432
      properties:
        linger-ms: 1
```
Configuration Class (Optional)
```java
package com.example.config;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaConfig {

    // Producer factory: broker address and key/value serializers.
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    // KafkaTemplate for sending messages, backed by the producer factory above.
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    // Consumer factory: broker address, consumer group and key/value deserializers.
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    // Listener container factory used by @KafkaListener methods.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
```
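The producer and listener below both use a topic named `topic`. If it does not already exist on the broker, one option is to declare a `NewTopic` bean so that Spring Boot's auto-configured `KafkaAdmin` creates it at startup. This is a minimal sketch; the class name `KafkaTopicConfig` and the partition/replica counts are illustrative assumptions, not part of the original example.

```java
package com.example.config;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class KafkaTopicConfig {

    // Declares the "topic" used by the producer and listener below so that
    // KafkaAdmin creates it on startup if it is missing.
    // 3 partitions / 1 replica are illustrative values for a local single-broker setup.
    @Bean
    public NewTopic demoTopic() {
        return TopicBuilder.name("topic")
                .partitions(3)
                .replicas(1)
                .build();
    }
}
```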
Producer
```java
package com.example.utils;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(String topic, String message) {
        kafkaTemplate.send(topic, message);
    }
}
```
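`sendMessage` above is fire-and-forget. If you want to confirm that the broker acknowledged the record, you can attach a callback to the future returned by `send()`. The sketch below assumes spring-kafka 3.x, where `send()` returns a `CompletableFuture<SendResult>`; on 2.x it returns a `ListenableFuture` and you would use `addCallback()` instead. The class name `CallbackKafkaProducer` is illustrative.

```java
package com.example.utils;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class CallbackKafkaProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public CallbackKafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(String topic, String message) {
        // In spring-kafka 3.x, send() returns CompletableFuture<SendResult<K, V>>.
        kafkaTemplate.send(topic, message).whenComplete((result, ex) -> {
            if (ex == null) {
                // RecordMetadata carries the partition and offset assigned by the broker.
                System.out.println("Sent to partition " + result.getRecordMetadata().partition()
                        + " at offset " + result.getRecordMetadata().offset());
            } else {
                System.err.println("Send failed: " + ex.getMessage());
            }
        });
    }
}
```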
Consumer
```java
package com.example.utils;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

@Component
public class KafkaConsumer {

    @KafkaListener(topics = "topic", groupId = "apple")
    public void listen(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        System.out.println("Received Topic: " + topic);
        System.out.println("Received Message: " + message);
    }
}
```
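The listener above receives only the payload and the topic header. If you also need the record's key, partition, or offset, the listener method can take the full `ConsumerRecord` instead. A minimal alternative sketch (the class name `KafkaRecordConsumer` is illustrative; topic and group follow the example above):

```java
package com.example.utils;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaRecordConsumer {

    // Receiving the full ConsumerRecord exposes key, partition and offset as well as the value.
    @KafkaListener(topics = "topic", groupId = "apple")
    public void listen(ConsumerRecord<String, String> record) {
        System.out.println("Received from " + record.topic()
                + "-" + record.partition() + "@" + record.offset()
                + ", key=" + record.key() + ", value=" + record.value());
    }
}
```

Note this is meant as a replacement for the listener above, not an addition: two listeners in the same consumer group on the same topic would split records between them.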
Controller
```java
package com.example.controller;

import com.example.utils.KafkaProducer;
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/kafka")
@RequiredArgsConstructor
public class KafkaController {

    private final KafkaProducer kafkaProducer;

    @PostMapping(value = "/producer", produces = "application/json")
    public String producer(@RequestBody String message) {
        kafkaProducer.sendMessage("topic", message);
        return message;
    }
}
```
Run
Start Zookeeper first, then start Kafka, and finally start the project.
Postman Test
Request method: POST
Request URL: localhost/kafka/producer
Request body:
```json
{
    "message": "Hello World"
}
```
Console output:
```
Received Topic: topic
Received Message: {
    "message": "Hello World"
}
```