Spring-Kafka


Spring-Kafka Example

Maven Dependency

<!-- kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
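
The <version> tag is omitted here because, when the project inherits from spring-boot-starter-parent (or imports the spring-boot-dependencies BOM), the spring-kafka version is managed by Spring Boot; otherwise a version must be declared explicitly.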

Configuration File

spring:
  kafka:
    bootstrap-servers: # Kafka broker addresses
      - localhost:9090 # Kafka broker 1
      - localhost:9091 # Kafka broker 2
      - localhost:9092 # Kafka broker 3
    # Kafka consumer configuration
    consumer:
      bootstrap-servers: localhost:9092 # Kafka server address (overrides the list above for the consumer)
      group-id: apple # consumer group ID
      auto-offset-reset: earliest # offset reset policy ("earliest", "latest", "none")
      enable-auto-commit: true # whether offsets are committed automatically
      auto-commit-interval: 1000 # auto-commit interval (milliseconds)
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer # key deserializer class
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer # value deserializer class
      max-poll-records: 500 # maximum number of records returned by a single poll()
      properties:
        "[session.timeout.ms]": 15000 # session timeout (milliseconds); native Kafka keys use dots, so bracket notation preserves them
    admin:
      client-id: 1 # client ID used to identify the client application
    # Kafka producer configuration
    producer:
      bootstrap-servers: localhost:9092 # Kafka server address (overrides the list above for the producer)
      key-serializer: org.apache.kafka.common.serialization.StringSerializer # key serializer class
      value-serializer: org.apache.kafka.common.serialization.StringSerializer # value serializer class
      acks: all # acknowledgement mode ("all", "1", "0", etc.)
      retries: 3 # number of retries on failure
      batch-size: 16384 # batch size in bytes
      buffer-memory: 33554432 # producer buffer memory in bytes
      properties:
        "[linger.ms]": 1 # how long to wait before sending a batch (milliseconds)
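
If the topic is not created in advance, Spring's KafkaAdmin (configured above under admin, and auto-configured by Spring Boot) can create it at startup from NewTopic beans. A minimal sketch; the class name KafkaTopicConfig is hypothetical, the topic name "topic" is the one used later in this example, and the partition/replica counts are assumptions suitable for a single-broker setup:

package com.example.config;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class KafkaTopicConfig {

    // Declares the "topic" used in this example; KafkaAdmin creates it on startup if it does not exist.
    @Bean
    public NewTopic exampleTopic() {
        return TopicBuilder.name("topic")
                .partitions(1)
                .replicas(1)
                .build();
    }
}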

Configuration Class (Optional)

package com.example.config;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaConfig {

    // Producer factory: creates Kafka producers with the given serializers.
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    // KafkaTemplate: the high-level API used to send messages.
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    // Consumer factory: creates Kafka consumers with the given deserializers.
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    // Listener container factory: backs the @KafkaListener annotated methods.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
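
This class is optional because Spring Boot already builds equivalent producer/consumer factories, a KafkaTemplate, and a listener container factory from the application.yml above; explicit beans like these override the auto-configured ones. One common reason to define the factory yourself is to run several consumer threads per listener. A minimal sketch of that variant (the concurrency value 3 is an arbitrary example and should not exceed the subscribed topic's partition count):

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    // Example: three concurrent consumer threads per @KafkaListener using this factory.
    factory.setConcurrency(3);
    return factory;
}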

Producer

package com.example.utils;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(String topic, String message) {
        kafkaTemplate.send(topic, message);
    }
}
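
kafkaTemplate.send() is asynchronous and returns immediately. A hedged sketch of a method that could be added to the KafkaProducer above, sending with a message key and reacting to the outcome; it assumes Spring Kafka 3.x, where send() returns a CompletableFuture<SendResult<K, V>> (2.x returns a ListenableFuture instead):

public void sendMessageWithKey(String topic, String key, String message) {
    kafkaTemplate.send(topic, key, message)
            .whenComplete((result, ex) -> {
                if (ex != null) {
                    // Send failed; log or handle the error here.
                    System.err.println("Send failed: " + ex.getMessage());
                } else {
                    // Send succeeded; the result carries the partition and offset.
                    System.out.println("Sent to partition " + result.getRecordMetadata().partition()
                            + " at offset " + result.getRecordMetadata().offset());
                }
            });
}

Sending with a key also makes partition assignment deterministic: records with the same key always go to the same partition, which preserves their ordering.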

Consumer

package com.example.utils;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

@Component
public class KafkaConsumer {

    @KafkaListener(topics = "topic", groupId = "apple")
    public void listen(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        System.out.println("Received Topic: " + topic);
        System.out.println("Received Message: " + message);
    }
}
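
A listener can also receive the whole ConsumerRecord instead of just the value, exposing the key, partition, and offset. A minimal sketch (requires import org.apache.kafka.clients.consumer.ConsumerRecord); the group id "banana" is an arbitrary example chosen to differ from "apple" so that both listeners receive every message independently:

@KafkaListener(topics = "topic", groupId = "banana")
public void listenRecord(ConsumerRecord<String, String> record) {
    // The record carries the key, value, partition, and offset of the message.
    System.out.println("Received: key=" + record.key()
            + ", partition=" + record.partition()
            + ", offset=" + record.offset()
            + ", value=" + record.value());
}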

Controller

package com.example.controller;

import com.example.utils.KafkaProducer;
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/kafka")
@RequiredArgsConstructor
public class KafkaController {

    private final KafkaProducer kafkaProducer;

    @PostMapping(value = "/producer", produces = "application/json")
    public String producer(@RequestBody String message) {
        kafkaProducer.sendMessage("topic", message);
        return message;
    }
}

Run

Start Zookeeper first, then start Kafka, and finally start the project.

Postman Test

Request method: POST

Request URL: localhost/kafka/producer

Request body:

{
    "message": "Hello World"
}

Console output:

Received Topic: topic
Received Message: {
"message": "Hello World"
}
