kafka
一、概述
1、定义
Kafka 是一个分布式的基于发布/订阅模式的消息队列(Message Queue),主要应用于大数据实时处理领域。
2、消息队列
1.点对点模式
一对一,消费者主动拉取数据,消息收到后消息清除.
消息生产者生产消息发送到Queue中,然后消息消费者从Queue中取出并且消费消息。 消息被消费以后,queue 中不再有存储,所以消息消费者不可能消费到已经被消费的消息。 Queue 支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费。
2.发布、订阅模式
一对多,消费者消费数据之后不会清除消息。
消息生产者(发布)将消息发布到 topic 中,同时有多个消息消费者(订阅)消费该消息。和点对点方式不同,发布到 topic 的消息会被所有订阅者消费。
3、kafka架构
1.Producer
- 消息生产者,就是向 kafka broker 发消息的客户端
- broker接收到生产者发送的消息后,将该消息追加到当前用于追加数据的segment文件中
- 生产者发送的消息,存到一个partition中,生产者也可指定数据存储的partition
2.Consumer
- 消息消费者,向 kafka broker 取消息的客户端
3.Consumer Group (CG)
- 消费者组,由多个 consumer 组成。
- 消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内的消费者消费;消费者组之间互不影响。
- 所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
4.Broker
- 一台 kafka 服务器就是一个 broker
- 一个集群由多个 broker 组成
- 一个 broker 可以容纳多个 topic
5.Topic
- 每条发布到kafka的消息都有一个类别(主题),这个类别被称为Topic
- 类似数据库的表名
- 物理上不同的topic分开存储
- 逻辑上一个topic的消息虽然保存与一个或多个broker中,但用户只需执行topic即可生产或消费数据而不需要知道存储在何处
6.Partition
- topic中的数据分为一个或多个partition
- 每个topic至少有一个partition,当生产者产生数据的时候,根据分配策略,将数据追加到指定的分区末尾(队列)
- 每条消息都会有一个自增的编号
- 每个partition中的数据用多个segment文件存储
- 每个partition的数据是有序的,不同partition间的数据丢失了数据的有序性
- 如果topic有多个partition,消费数据时就不能保证数据的顺序,在需要保证数据顺序的场景下,需要将partition设为1
7.Replication副本
- 为保证集群中的某个节点发生故障时,该节点上的 partition 数据不丢失,且 kafka 仍然能够继续工作,kafka 提供了副本机制,一个 topic 的每个分区都有若干个副本, 一个 leader 和若干个 follower
8.leader
- 每个partition有多个副本,其中有且仅有一个作为leader,作为主,负责数据的读写
9.follower
- 每个partition有多个副本,其中有多个follower,作为leader的从,负责数据的备份,实时从 leader 中同步数据,保持和 leader 数据的同步
- leader 发生故障时,会选举出一个 follower 成为新的 leader
- 当follower挂掉,卡主或者同步太慢,leader会把这个follower从ISR列表中删除,重新创建一个follower
10.offset偏移量
-
可以唯一的标识一条信息。
-
偏移量决定数据读取的位置,不会有线程安全问题,消费者通过偏移量来决定下次读取的信息
-
消息被消费后,并不会马上被删除,这样多个业务就可以重复使用kafka的信息
-
可以通过修改偏移量来重新读取消息,偏移量由用户控制
-
消息最终还是会被删除的,默认生命周期为一周
11.zookeeper
- kafka通过zookeeper存储集群的meta信息
二、搭建
1、环境
linux 7.6、jdk1.8
2、安装
1.安装宝塔
yum install -y wget && wget -O install.sh http://download.bt.cn/install/install_6.0.sh && sh install.sh
2.安装jdk
yum install -y java-1.8.0-openjdk-devel.x86_64
3.放行端口
放行1-65535端口
firewall-cmd --permanent --add-port 1-65535/tcp
4.定义目录结构
opt/kafka
opt/zookeeper/server1
opt/zookeeper/server2
opt/zookeeper/server3
5.安装zookeeper、kafka
#将下载好的zookeeper、kafka分别解压至如下目录
opt/kafka
opt/zookeeper/server1
opt/zookeeper/server2
opt/zookeeper/server3
6.创建zookerper各个节点的data和logs目录以及zookeeper节点标识文件的myid
#server1
opt/zookeeper/server1/apache-zookeeper-3.7.0-bin/data
opt/zookeeper/server1/apache-zookeeper-3.7.0-bin/data/myid #内容:1
opt/zookeeper/server1/apache-zookeeper-3.7.0-bin/logs
#server2
opt/zookeeper/server2/apache-zookeeper-3.7.0-bin/data
opt/zookeeper/server2/apache-zookeeper-3.7.0-bin/data/myid #内容:2
opt/zookeeper/server2/apache-zookeeper-3.7.0-bin/logs
#server3
opt/zookeeper/server3/apache-zookeeper-3.7.0-bin/data
opt/zookeeper/server3/apache-zookeeper-3.7.0-bin/data/myid #内容:3
opt/zookeeper/server3/apache-zookeeper-3.7.0-bin/logs
7.修改zookeeper配置文件
#1.将zoo_sample.cfg重命名为zoo.cfg
#server1
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/opt/zookeeper/server1/apache-zookeeper-3.7.0-bin/data
dataLogDir=/opt/zookeeper/server1/apache-zookeeper-3.7.0-bin/logs
# the port at which the clients will connect
clientPort=2181
server.1=127.0.0.1:2888:3888
server.2=127.0.0.1:2889:3889
server.3=127.0.0.1:2890:3890
#server2
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/opt/zookeeper/server2/apache-zookeeper-3.7.0-bin/data
dataLogDir=/opt/zookeeper/server2/apache-zookeeper-3.7.0-bin/logs
# the port at which the clients will connect
clientPort=2182
server.1=127.0.0.1:2888:3888
server.2=127.0.0.1:2889:3889
server.1=127.0.0.1:2890:3890
#server3
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/opt/zookeeper/server3/apache-zookeeper-3.7.0-bin/data
dataLogDir=/opt/zookeeper/server3/apache-zookeeper-3.7.0-bin/logs
# the port at which the clients will connect
clientPort=2183
server.1=127.0.0.1:2888:3888
server.2=127.0.0.1:2889:3889
server.3=127.0.0.1:2890:3890
8.启动zookeeper节点
/opt/zookeeper/server1/apache-zookeeper-3.7.0-bin/bin/zkServer.sh start
/opt/zookeeper/server2/apache-zookeeper-3.7.0-bin/bin/zkServer.sh start
/opt/zookeeper/server3/apache-zookeeper-3.7.0-bin/bin/zkServer.sh start
9.配置kafka
#1.进入/opt/kafka/kafka_2.12-1.1.0/config
#2.复制server.preperties文件并重命名为server2.preperties、server3.preperties
#server.preperties
broker.id=0
listeners=PLAINTEXT://:9092
log.dirs=/tmp/kafka-logs1
zookeeper.connect=localhost:2181
#server2.preperties
broker.id=1
listeners=PLAINTEXT://:9093
log.dirs=/tmp/kafka-logs2
zookeeper.connect=localhost:2182
#server3.preperties
broker.id=2
listeners=PLAINTEXT://:9094
log.dirs=/tmp/kafka-logs3
zookeeper.connect=localhost:2183
10.启动kafka
#进入到/opt/kafka/kafka_2.12-1.1.0/bin中
./kafka-server-start.sh -daemon /opt/kafka/kafka_2.12-1.1.0/config/server.properties
./kafka-server-start.sh -daemon /opt/kafka/kafka_2.12-1.1.0/config/server2.properties
./kafka-server-start.sh -daemon /opt/kafka/kafka_2.12-1.1.0/config/server3.properties
11.测试
#创建主题
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testing
#查看主题
./kafka-topics.sh --list --zookeeper localhost:2181
#运行生产者,发送消息
./kafka-console-producer.sh --broker-list localhost:9092 --topic testing
>hello,world
>thanks,bey
#获取消息
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testing --from-beginning
三、数据检索机制
-
topic在物理层面以partition为分组,一个topic可以分成若干个partition
-
partition还可以细分为segment,一个partition物理上由多个segment组成
-
segment的参数有2个
- log.segment.bytes:单个segment最大可容纳的数据量,默认1g
- log.segment.ms:kafka在commit一个未写满的segment前,所等待的时间,默认7天,即当7天后该segment文件还是没有写满,则也会重新创建一个新的segment
-
LogSegment文件由两部分组成,分别为“.index”文件和“.log”文件,分别表示为segment的索引文件和数据文件
- partition全局的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset值
- 数值大小为64位,20位数据字符长度,没有数字用0填充
- 消息都具有固定的物理结构,包括: offset(8 Bytes)、消息体的大小(4 Bytes)、crc32(4 Bytes)、magi(1 Byte)、attributes(1 Byte)、key length(4 Bytes)、key(K Bytes)、payload(N Bytes)等等字段,可以确定一条消息的大小, 即读取到哪里截止。
四、数据的安全性
At least one:消息绝不会丢,但可能会重复传输
At most once:消息可能会丢,但绝不会重复传输
Exactly once:每条消息肯定会被传输一次且仅传输一次
1、producer delivery guarantee
为保证 producer 发送的数据,能可靠的发送到指定的 topic,topic 的每个 partition 收到 producer 发送的数据后,都需要向producer 发送 ack(acknowledgement 确认收到),如果 producer 收到 ack,就会进行下一轮的发送,否则重新发送数据。
- acks=0:**At Most Once **
- 生产者每条消息只会被发送一次
- producer 不等待 broker 的 ack,这一操作提供了一个最低的延迟,broker 一接收到还没有写入磁盘就已经返回,当 broker 故障时有可能丢失数据
- acks=1:
- producer 等待 broker 的 ack,partition 的 leader 落盘成功后返回 ack,如果在 follower 同步成功之前 leader 故障,那么将会丢失数据
- acks=all:At Least Once
- 保证 Producer 到 Server 之间不会丢失数据
- producer 等待 broker 的 ack,partition 的 leader 和 follower 全部落盘成功后才 返回 ack。但是如果在 follower 同步完成后,broker 发送 ack 之前,leader 发生故障,那么会造成数据重复
2、ISR
kafka中一个topic可以分为多个Partition,每个partition又有多个副本,对于每个topic的partition而言,有一个leader副本,其余的都是follower,leader负责读与写,follower同步leader的数据。
当leader挂掉的时候,由controller主持在剩余follower中选举出一个leader但是这个剩余follower有一个条件,就是follower必须在ISR列表中,ISR (IN-SYNC Replication) 维护了与leader信息一致的follower的信息当leader挂掉的时候 就从这个ISR中选举。
在zookeeper中存储了一份ISR,所以当leader挂掉了也没关系。
关键词:
- AR: Assigned Replicas用来标识副本的全集
- OSR: out -sync Replicas离开同步队列的副本
- ISR: in -sync Replicas加入同步队列的副本
- ISR= Leader +没有落后太多的follower
- AR= OSR+ ISR
我们备份数据就是防止数据丢失,当主节点挂掉时,可以启用备份节点
- producer–push–>leader
- leader<–pull–follower
- Follower每间隔一定时间去Leader拉取数据,来保证数据的同步
ISR(in-syncReplica)
- 当主节点挂掉,并不是去Follower选择主,而是从ISR中选择主
- 判断标准
- 超过10秒钟没有同步数据
- replica.lag.time.max.ms=10000
- 主副节点差4000条数据
- rerplica.lag.max.messages=4000
- 超过10秒钟没有同步数据
- 脏节点选举
- kafka采用一种降级措施来处理
- 选举第一个恢复的node作为leader提供服务,以它的数据为基准,这个措施被称为脏leader选举
3、broker数据存储机制
无论消息是否被消费,kafka都会保留所有消息。有两种策略可以删除旧数据
1.基于时间: 1og.retention.hours=168
2.基于大小: 1og.retention.bytes=1073741824
4、consumer delivery guarantee
-
如果将consumer设置为autocommit, consumer 一旦读到数据立即自动commit
- 如果只讨论这一读取消息的过程, 那Kafka确保了Exactly once
-
读完消息先commit再处理消息
- 如果consumer在commit后还没来得及处理消息就crash了,下次重新开始工作后就无法读到刚刚已提交而未处理的消息,这就对应于At most once
-
读完消息先处理再commit
- 如果在处理完消息之后commit之前consumer crash 了,下次重新开始工作时还会处理刚刚未commit的消息,实际上该消息已经被处理过了。这就对应于At least once
-
如果一定要做到Exactly once,就需要协调offset和实际操作的输出
- 经典的做法是引入两阶段提交
-
Kafka 默认保证At least once,并且允许通过设置producer异步提交来实现At most once
五、kafka API
1、生产者
1.消息发送流程
Kafka 的 Producer 发送消息采用的是异步发送的方式。在消息发送的过程中,涉及到了 两个线程——main 线程和 Sender 线程,以及一个线程共享变量——RecordAccumulator。 main 线程将消息发送给 RecordAccumulator,Sender 线程不断从RecordAccumulator 中拉取 消息发送到 Kafka broker。
2.异步发送
1.导入依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.6.0</version>
</dependency>
2.编写代码
public class CustomProducer {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties props = new Properties();
props.put("bootstrap.servers", "hadoop102:9092");//kafka集群,broker-list
props.put("acks", "all");
props.put("retries", 1);//重试次数
props.put("batch.size", 16384);//批次大小
props.put("linger.ms", 1);//等待时间
props.put("buffer.memory", 33554432);//RecordAccumulator缓冲区大小
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 100; i++) {
//不带回调
//producer.send(new ProducerRecord<String, String>("first", Integer.toString(i), Integer.toString(i)));
producer.send(new ProducerRecord<String, String>("first", Integer.toString(i), Integer.toString(i)), new Callback() {
//回调函数,该方法会在Producer收到ack时调用,为异步调用
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
System.out.println("success->" + metadata.offset());
} else {
exception.printStackTrace();
}
}
});
}
producer.close();
}
}
回调函数会在producer收到ack时调用,为异步调用,该方法有两个参数,分别是RecordMetadata和Exception,如果Exception为null,说明消息发送成功,如果Exception不为null,说明消息发送失败。
注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试
3.同步发送
同步发送的意思就是,一条消息发送之后,会阻塞当前线程,直至返回 ack。 由于 send 方法返回的是一个 Future 对象,根据 Futrue 对象的特点,我们也可以实现同步发送的效果,只需在调用 Future 对象的 get 方发即可。
public class CustomProducer {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties props = new Properties();
props.put("bootstrap.servers", "hadoop102:9092");//kafka集群,broker-list
props.put("acks", "all");
props.put("retries", 1);//重试次数
props.put("batch.size", 16384);//批次大小
props.put("linger.ms", 1);//等待时间
props.put("buffer.memory", 33554432);//RecordAccumulator缓冲区大小
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 100; i++) {
producer.send(new ProducerRecord<String, String>("first", Integer.toString(i), Integer.toString(i))).get();
}
producer.close();
}
}
2、消费者
Consumer消费数据时的可靠性是很容易保证的,因为数据在Kafka中是持久化的,故不用担心数据丢失问题。
由于consumer在消费过程中可能会出现断电宕机等故障,consumer恢复后,需要从故障前的位置的继续消费,所以consumer需要实时记录自己消费到了哪个offset,以便故障恢复后继续消费。
所以offset的维护是Consumer消费数据是必须考虑的问题。
1.手动提交
public class CustomConsumer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "hadoop102:9092");
props.put("group.id", "test");//消费者组,只要group.id相同,就属于同一个消费者组
props.put("enable.auto.commit", "false");//自动提交offset
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("first"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
consumer.commitSync();
}
}
}
手动提交offset的方法有两种:分别是commitSync(同步提交)和commitAsync(异步提交)。两者的相同点是,都会将本次poll的一批数据最高的偏移量提交;不同点是,commitSync会失败重试,一直到提交成功(如果由于不可恢复原因导致,也会提交失败);而commitAsync则没有失败重试机制,故有可能提交失败。
2.自动提交
为了使我们能够专注于自己的业务逻辑,Kafka提供了自动提交offset的功能。
自动提交offset的相关参数:
- enable.auto.commit:是否开启自动提交offset功能
- auto.commit.interval.ms:自动提交offset的时间间隔
public class CustomConsumer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "hadoop102:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("first"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}
六、springboot整合kafka
1、前期准备
1.kafka配置外网
在kafka配置文件:server.properties中
#填入公网ip
advertised.listeners=PLAINTEXT://your ip:9092
2.新建topic
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic1
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic2
也可以不手动创建topic,在执行代码kafkaTemplate.send(“topic1”, normalMessage)发送消息时,kafka会帮我们自动完成topic的创建工作,但这种情况下创建的topic默认只有一个分区,分区也没有副本。所以,我们可以在项目中新建一个配置类专门用来初始化topic,如下:
@Configuration
public class KafkaInitialConfiguration {
// 创建一个名为testtopic的Topic并设置分区数为8,分区副本数为2
@Bean
public NewTopic initialTopic() {
return new NewTopic("testtopic",8, (short) 2 );
}
// 如果要修改分区数,只需修改配置值重启项目即可
// 修改分区数并不会导致数据的丢失,但是分区数只能增大不能减小
@Bean
public NewTopic updateTopic() {
return new NewTopic("testtopic",10, (short) 2 );
}
}
3.引入依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
4.application.propertise配置
###########【Kafka集群】###########
spring.kafka.bootstrap-servers=ip:9092,ip:9093
###########【初始化生产者配置】###########
# 重试次数
spring.kafka.producer.retries=1
# 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
spring.kafka.producer.acks=all
# 批量大小
spring.kafka.producer.batch-size=16384
# 提交延时
spring.kafka.producer.properties.linger.ms=0
# 当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka
# linger.ms为0表示每接收到一条消息就提交给kafka,这时候batch-size其实就没用了
# 生产端缓冲区大小
spring.kafka.producer.buffer-memory = 33554432
# Kafka提供的序列化和反序列化类
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# 自定义分区器
# spring.kafka.producer.properties.partitioner.class=com.felix.kafka.producer.CustomizePartitioner
###########【初始化消费者配置】###########
# 默认的消费组ID
spring.kafka.consumer.properties.group.id=defaultConsumerGroup
# 是否自动提交offset
spring.kafka.consumer.enable-auto-commit=true
# 提交offset延时(接收到消息后多久提交offset)
spring.kafka.consumer.auto.commit.interval.ms=1000
# 当kafka中没有初始offset或offset超出范围时将自动重置offset
# earliest:重置为分区中最小的offset;
# latest:重置为分区中最新的offset(消费分区中新产生的数据);
# none:只要有一个分区不存在已提交的offset,就抛出异常;
spring.kafka.consumer.auto-offset-reset=latest
# 消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作)
spring.kafka.consumer.properties.session.timeout.ms=120000
# 消费请求超时时间
spring.kafka.consumer.properties.request.timeout.ms=180000
# Kafka提供的序列化和反序列化类
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# 消费端监听的topic不存在时,项目启动会报错(关掉)
spring.kafka.listener.missing-topics-fatal=false
# 设置批量消费
# spring.kafka.listener.type=batch
# 批量消费每次最多消费多少条消息
# spring.kafka.consumer.max-poll-records=50
2、使用
1.简单生产者
@RestController
public class KafkaProducer {
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
// 发送消息
@GetMapping("/kafka/normal/{message}")
public void sendMessage1(@PathVariable("message") String normalMessage) {
kafkaTemplate.send("topic1", normalMessage);
}
}
2.简单消费者
@Component
public class KafkaConsumer {
// 消费监听
@KafkaListener(topics = {"topic1"})
public void onMessage1(ConsumerRecord<?, ?> record){
// 消费的哪个topic、partition的消息,打印出消息内容
System.out.println("简单消费:"+record.topic()+"-"+record.partition()+"-"+record.value());
}
}
3、生产者
1.带回调的生产者
kafkaTemplate提供了一个回调方法addCallback,我们可以在回调方法中监控消息是否发送成功 或 失败时做补偿处理,有两种写法:
@GetMapping("/kafka/callbackOne/{message}")
public void sendMessage2(@PathVariable("message") String callbackMessage) {
kafkaTemplate.send("topic1", callbackMessage).addCallback(success -> {
// 消息发送到的topic
String topic = success.getRecordMetadata().topic();
// 消息发送到的分区
int partition = success.getRecordMetadata().partition();
// 消息在分区内的offset
long offset = success.getRecordMetadata().offset();
System.out.println("发送消息成功:" + topic + "-" + partition + "-" + offset);
}, failure -> {
System.out.println("发送消息失败:" + failure.getMessage());
});
}
@GetMapping("/kafka/callbackTwo/{message}")
public void sendMessage3(@PathVariable("message") String callbackMessage) {
kafkaTemplate.send("topic1", callbackMessage).addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
@Override
public void onFailure(Throwable ex) {
System.out.println("发送消息失败:"+ex.getMessage());
}
@Override
public void onSuccess(SendResult<String, Object> result) {
System.out.println("发送消息成功:" + result.getRecordMetadata().topic() + "-"
+ result.getRecordMetadata().partition() + "-" + result.getRecordMetadata().offset());
}
});
}
2.自定义分区器
我们知道,kafka中每个topic被划分为多个分区,那么生产者将消息发送到topic时,具体追加到哪个分区呢?这就是所谓的分区策略,Kafka 为我们提供了默认的分区策略,同时它也支持自定义分区策略。其路由机制为:
-
若发送消息时指定了分区(即自定义分区策略),则直接将消息append到指定分区;
-
若发送消息时未指定 patition,但指定了 key(kafka允许为每条消息设置一个key),则对key值进行hash计算,根据计算结果路由到指定分区,这种情况下可以保证同一个 Key 的所有消息都进入到相同的分区;
-
patition 和 key 都未指定,则使用kafka默认的分区策略,轮询选出一个 patition;
可以自定义一个分区策略,将消息发送到我们指定的partition,首先新建一个分区器类实现Partitioner接口,重写方法,其中partition方法的返回值就表示将消息发送到几号分区
public class CustomizePartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// 自定义分区规则(这里假设全部发到0号分区)
// ......
return 0;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
在application.propertise中配置自定义分区器,配置的值就是分区器类的全路径名
# 自定义分区器
spring.kafka.producer.properties.partitioner.class=com.felix.kafka.producer.CustomizePartitioner
3.kafka事务提交
如果在发送消息时需要创建事务,可以使用 KafkaTemplate 的 executeInTransaction 方法来声明事务
@GetMapping("/kafka/transaction")
public void sendMessage7(){
// 声明事务:后面报错消息不会发出去
kafkaTemplate.executeInTransaction(operations -> {
operations.send("topic1","test executeInTransaction");
throw new RuntimeException("fail");
});
// 不声明事务:后面报错但前面消息已经发送成功了
kafkaTemplate.send("topic1","test executeInTransaction");
throw new RuntimeException("fail");
}
4、消费者
1.指定topic、partition、offset消费
前面我们在监听消费topic1的时候,监听的是topic1上所有的消息,如果我们想指定topic、指定partition、指定offset来消费呢?也很简单,@KafkaListener注解已全部为我们提供
/**
* @Title 指定topic、partition、offset消费
* @Description 同时监听topic1和topic2,监听topic1的0号分区、topic2的 "0号和1号" 分区,指向1号分区的offset初始值为8
* @Author long.yuan
* @Date 2020/3/22 13:38
* @Param [record]
* @return void
**/
@KafkaListener(id = "consumer1",groupId = "felix-group",topicPartitions = {
@TopicPartition(topic = "topic1", partitions = { "0" }),
@TopicPartition(topic = "topic2", partitions = "0", partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "8"))
})
public void onMessage2(ConsumerRecord<?, ?> record) {
System.out.println("topic:"+record.topic()+"|partition:"+record.partition()+"|offset:"+record.offset()+"|value:"+record.value());
}
属性解释:
① id:消费者ID;
② groupId:消费组ID;
③ topics:监听的topic,可监听多个;
④ topicPartitions:可配置更加详细的监听信息,可指定topic、parition、offset监听。
上面onMessage2监听的含义:监听topic1的0号分区,同时监听topic2的0号分区和topic2的1号分区里面offset从8开始的消息。
注意:topics和topicPartitions不能同时使用
2.批量消费
注意:批量消费与单个消费仅可存在一个,若是选择批量消费则KafkaConsumer
中的onMessage1
需要注释掉
设置application.prpertise开启批量消费即可
# 设置批量消费
spring.kafka.listener.type=batch
# 批量消费每次最多消费多少条消息
spring.kafka.consumer.max-poll-records=50
接收消息时用List来接收,监听代码如下
@KafkaListener(id = "consumer2",groupId = "felix-group", topics = "topic1")
public void onMessage3(List<ConsumerRecord<?, ?>> records) {
System.out.println(">>>批量消费一次,records.size()="+records.size());
for (ConsumerRecord<?, ?> record : records) {
System.out.println(record.value());
}
}
3.ConsumerAwareListenerErrorHandler 异常处理器
通过异常处理器,我们可以处理consumer在消费时发生的异常。
新建一个 ConsumerAwareListenerErrorHandler 类型的异常处理方法,用@Bean注入,BeanName默认就是方法名,然后我们将这个异常处理器的BeanName放到@KafkaListener注解的errorHandler属性里面,当监听抛出异常的时候,则会自动调用异常处理器
// 新建一个异常处理器,用@Bean注入
@Bean
public ConsumerAwareListenerErrorHandler consumerAwareErrorHandler() {
return (message, exception, consumer) -> {
System.out.println("消费异常:"+message.getPayload());
return null;
};
}
// 将这个异常处理器的BeanName放到@KafkaListener注解的errorHandler属性里面
@KafkaListener(topics = {"topic1"},errorHandler = "consumerAwareErrorHandler")
public void onMessage4(ConsumerRecord<?, ?> record) throws Exception {
throw new Exception("简单消费-模拟异常");
}
// 批量消费也一样,异常处理器的message.getPayload()也可以拿到各条消息的信息
@KafkaListener(topics = "topic1",errorHandler="consumerAwareErrorHandler")
public void onMessage5(List<ConsumerRecord<?, ?>> records) throws Exception {
System.out.println("批量消费一次...");
throw new Exception("批量消费-模拟异常");
}
4.消息过滤器
消息过滤器可以在消息抵达consumer之前被拦截,在实际应用中,我们可以根据自己的业务逻辑,筛选出需要的信息再交由KafkaListener处理,不需要的消息则过滤掉。
配置消息过滤只需要为 监听器工厂 配置一个RecordFilterStrategy(消息过滤策略),返回true的时候消息将会被抛弃,返回false时,消息能正常抵达监听容器。
@Component
public class KafkaConsumer {
@Autowired
ConsumerFactory consumerFactory;
// 消息过滤器
@Bean
public ConcurrentKafkaListenerContainerFactory filterContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory);
// 被过滤的消息将被丢弃
factory.setAckDiscarded(true);
// 消息过滤策略
factory.setRecordFilterStrategy(consumerRecord -> {
if (Integer.parseInt(consumerRecord.value().toString()) % 2 == 0) {
return false;
}
//返回true消息则被过滤
return true;
});
return factory;
}
// 消息过滤监听
@KafkaListener(topics = {"topic1"},containerFactory = "filterContainerFactory")
public void onMessage6(ConsumerRecord<?, ?> record) {
System.out.println(record.value());
}
}
上面实现了一个"过滤奇数、接收偶数"的过滤策略,我们向topic1发送奇数会发现不会被消费,只有偶数才会被消费。
5.消息转发
在实际开发中,我们可能有这样的需求,应用A从TopicA获取到消息,经过处理后转发到TopicB,再由应用B监听处理消息,即一个应用处理完成后将该消息转发至其他应用,完成消息的转发。
在SpringBoot集成Kafka实现消息的转发也很简单,只需要通过一个@SendTo注解,被注解方法的return值即转发的消息内容,如下:
@KafkaListener(topics = {"topic1"})
@SendTo("topic2")
public String onMessage7(ConsumerRecord<?, ?> record) {
return record.value()+"-forward message";
}