Kafka生产者
Published in:2024-03-01 | category: 学习

生产者发送原理

img

消息的发送可能会经过拦截器、序列化、分区器等过程。消息发送的主要涉及两个线程,分别为main线程和sender线程,其中main线程是消息的生产线程,而sender线程是jvm单例的线程,专门用于消息的发送。

在main线程中创建了一个双端队列RecordAccumulator。main线程将消息发送给RecordAccumulator,sender线程不断从RecordAccumulator中拉取消息发送到kafka Broker。

Kafka发送消息的三种形式

发后即忘:只管往Kafka中发送消息而不关心消息是否正确到达。在大多数情况下,这种发送方式没有什么问题,不过在有些时候(比如发生不可重试异常时)会造成消息的丢失。这种发送方式性能最高,可靠性也最差。

img

同步:通过get方法等待Kafka的响应,判断消息是否发送成功。以同步的方式发送消息时,一条一条地发送,对每条消息返回的结果判断,可以明确地知道每条消息的发送情况,但是由于同步的方式会阻塞,只有当消息通过get返回future对象时,才会继续下一条消息的发送。

img

异步:消息以异步的方式发送,通过回调函数返回消息成功/失败。在调用send方法发送消息的同时,指定一个回调函数,服务器在返回响应时会调用该回调函数,通过回调函数能够对异常情况进行处理,当调用了回调函数时,只有回调函数执行完毕生产者才会结束,否则一直会阻塞。

img

生产者拦截器

生产者拦截器和消费者拦截器时在Kafka0.10版本被引入的,主要用于实现Client端的定制化控制逻辑。

对于生产者而言,拦截器是的用户在消息发送前以及生产者回调逻辑千有机会对消息做一些定制化需求,比如修改消息等,可以指定多个拦截器按序作用于同一条消息从而形成一个拦截链。

自定义拦截器:可以通过实现org.apache.kafka.clients.producer.ProducerInterceptor接口编写拦截器。ProducerInterceptor包含configure,onSend,onAcknowledgement,close。

onSend:该方法在消息发送前调用,可以编写一些对消息做过滤,修改的程序等,这里可以对消息做任何操作。但是最好不要改变消息的主题和分区信息,否则会影响目标分区的计算。

onAcknowledgement:该方法会在消息从RecordAccumulator成功发送到Kafka之后或者发送过程中失败调用,可以用于地消息失败成功发送数量做统计等。但是该方法允许在生产者的IO线程中,尽量不要写一些重要的逻辑,否则会拖慢生产者消息发送效率。

close:关闭拦截器,主要用于执行一些资源清理工作。

序列器

在Kafka中,创建一个生产者对象必须指定序列化器,我们有两种选择:

1、使用自定义序列化器。

2、使用已有的序列化器和反序列化器。

通常不建议使用自定义序列化器,因为使用不同版本的序列化器和反序列化器会出现新旧消息的兼容性问题,特变时当消息记录中有修改字段的情况。

Apache Avro是一种与编程语言无关的序列化格式,推荐在Kafka上使用Avro序列化器。

Avro数据通过与语言无关的schema来定义,schema通过JSON来描述,数据被序列化成二进制文件或JSON文件,不过一般会使用二进制文件。Avro在读写文件时需要用到schema,schema一般会被内嵌在数据文件里。

Avro有一个很好的特性,就是当负责写消息的应用程序使用了新的schema,负责读消息的应用程序可以继续处理消息无需做任何改动。

分区器

分区好处:

1、便于合理使用存储资源,每个Partition在一个Broker上存储,可以把海量的数据按照分区切割成一块一块数据存储在多台Broker上,合理控制分区的任务,可以实现负载均衡的效果。

2、提高并行度,生产者可以以分区为单位发送数据,消费者可以以分区为单位进行消费数据。

分区策略:

1、在指明partition的情况下,直接将指明的值作为partition值。

2、没有指明partition值但是有key的情况下,将key的hash值与topic的partition数进行取余得到partition值。

3、既没有partition值又没有key值得情况下,Kafka采用Sricky Partition(黏性分区器),会随机选择一个分区,并尽可能用一直使用该分区,待该分区得batch已满或者已完成,Kafka再随机一个分区进行使用(和上一次的分区不同)。

自定义分区:

1、定义类实现Partition方法。

2、重写partition()方法。

3、使用分区器方法,再生产者的配置中添加分区器参数。

生产者提高吞吐量

buffer.memory:该参数用来设置生产者内存缓冲区的大小,生产者用它缓冲要发送到服务器的消息。如果应用程序发送消息的速度超过发送到服务器的速度,会导致生产者空间不足。这个时候,send()方法调用要么被阻塞,要么抛出异常,取决于如何设max.block.ms。当生产者调用时send(),消息并不会立即发送,而是会添加到内部缓冲区中。默认buffer.memory值为32MB。如果生产者发送消息的速度超过了将消息发送到broker的速度,或者存在网络问题,send()方法调用会被阻塞max.block.ms参数配置的时常,默认1分钟。

max.block.ms:该参数指定了在调用send()方法或使用partitionsFor()方法获取元数据时生产者的阻塞时间。当生产者的发送缓冲区已满,或者没有可用的元数据时,这些方法就会被阻塞。在阻塞时间达到max.block.ms时,生产者会抛出超时异常。

linger.ms:该参数指定了生产者在发送批次之前等待更多消息加入批次的时间。kafka生产者会在批次填满或linger.ms达到上限时把批次发送出去。默认情况下,只要有可用的线程,生产者就会把消息发送出去,就算批次里只有一个消息。把linger.ms设置成比0大的数,让生产者在发送批次之前等待一会儿,使更多的消息加入到这个批次。虽然这样会增加延迟,但也会提升吞吐量(因为一次性发送更多的消息,每个消息的开销就变小了)。

batch.size:当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算(而不是消息个数)。当批次被填满,批次里的所有消息会被发送出去。不过生产者井不一定都会等到批次被填满才发送,这取决于linger.ms的配置,比如如果linger.ms时间到了,即便批次只包含一个消息,也会被立即发送。所以就算把批次大小设置得很大,也不会造成延迟,只是会占用更多的内存而已。但如果设置得太小,因为生产者需要更频繁地发送消息,会增加一些额外的开销。

可以使用配置使用linger.ms和batch.size。linger.ms是准备好发送批次之前的延迟时间,默认值为0。这意味着即使批次中只有1条消息,批次也会立即发送。有时,会增加linger.ms以减少请求数量并提高吞吐量。但这将导致更多消息保留在内存中。batch.size是单个批次的最大大小,当满足这两个要求中的任何一个时,将发送批次。

compression.type:默认情况下,消息发送时不会被压缩。该参数可以设置为snappy 、gzip 或lz4,它指定了消息被发送给broker 之前使用哪一种压缩算也进行压缩。使用压缩可以降低网络传输开销和存储开销,而这往往是向Kafka 发送消息的瓶颈所在。

故要提高生产者的吞吐量,就必须合理的配置这几个参数的值,要多方面综合考虑。如为了增大batch.size就无限设置linger.ms,让其等待时间过长,这样就会导致消息发送延迟过高,这是不可取的。

消息累加器

kafka为了提高Producer客户端的发送吞吐量和提高性能,选择了将消息暂时缓存起来,等到满足一定的条件,再进行批量发送,,这样可以减少网络请求,提高吞吐量。

消息累加器默认32m,如果生产者的发送速率大于sender发送的速率,消息就会堆满累加器。生产者就会阻塞,或者报错,报错取决于阻塞时间的配置。

累加器的存储形式为ConcurrentMap<TopicPartition, Deque>,可以看出来就是一个分区对应一个双端队列,队列中存储的是ProducerBatch一般大小是16k根据batch.size配置,新的消息会append到ProducerBatch中,满16k就会创建新的ProducerBatch,并且触发sender线程进行发送。

如果消息量非常大,生成了大量的ProducerBatch,在发送后,又需要JVM通过GC回收这ProducerBatch就变得非常影响性能,所以kafka通过 BufferPool作为内存池来管理ProducerBatch的创建和回收,需要申请一个新的ProducerBatch空间时,调用 free.allocate(size, maxTimeToBlock)找内存池申请空间。

如果单条消息大于16k,那么就不会复用内存池了,会生成一个更大的ProducerBatch专门存放大消息,发送完后GC回收该内存空间。

消息确认机制-ACK

Kafka中消息确认(Acknowledgement)是指生产者发送消息后,等待Kafka返回的确认信息。信息确认用于保证消息发送的可靠性,主要有一下几种模式:

1、最速消息确认:生产者发送消息后立即返回确认,不等待Broker的确认。此模式吞吐量最高,但可靠性最低。

2、知识消息确认:生产者发送消息后等待Broker落盘确认,再返回确认给生产者。此模式吞吐量较高,且消息不会丢失。

3、完成消息确认:生产者发送消息后等待消息被提交后再确认,再返回确认给消费者。此模式吞吐量较低,但可靠性最高。

4、等待ISR确认:生产者发送消息后等待消息被复制到ISR集合内的副本数后再确认,再返回确认给消费者。此模式吞吐量中等,且保证高可靠性。

acks确认机制

acks参数指定了必须要有多少个分区副本收到消息,生产者才认为该消息是写入成功的。

acks=0,表示生产者在成功写入消息之前不会等待任何来自服务器的响应。。换句话说,一旦出现了问题导致服务器没有收到消息,那么生产者就无从得知,消息也就丢失了。

acks=1,表示只要集群的leader分区副本接收到了消息,就会向生产者发送一个成功响应的ack,此时生产者接收到ack之后就可以认为该消息是写入成功的。 一旦消息无法写入leader分区副本(比如网络原因、leader节点崩溃),生产者会收到一个错误响应。

acks =-1/all,表示只有所有参与复制的节点(ISR列表的副本)全部收到消息时,生产者才会接收到来自服务器的响应。这种模式是最高级别的,也是最安全的,可以确保不止一个Broker接收到了消息。 该模式的延迟会很高。

数据完全可靠条件 = ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2

幂等性

在一般的MQ模型中,常有以下的消息通信概念

至少一次: ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量>=2。可以保证数据不丢失,但是不能保证数据不重复。

最多一次:ACK级别设置为0 。可以保证数据不重复,但是不能保证数据不丢失。

精确一次:至少一次 + 幂等性 。 Kafka 0.11版本引入一项重大特性:幂等性和事务。

幂等性,简单地说就是对接口的多次调用所产生的结果和调用一次是一致的。生产者在进行重试的时候有可能会重复写入消息,而使用Kafka 的幂等性功能之后就可以避免这种情况。(不产生重复数据)

实现幂等的关键点就是服务端可以区分请求是否重复,过滤掉重复的请求。要区分请求是否重复的有两点:

唯一标识:要想区分请求是否重复,请求中就得有唯一标识。例如支付请求中,订单号就是唯一标识

记录下已处理过的请求标识:光有唯一标识还不够,还需要记录下那些请求是已经处理过的,这样当收到新的请求时,用新请求中的标识和处理记录进行比较,如果处理记录中有相同的标识,说明是重复交易,拒绝掉。

为了实现Producer的幂等性,Kafka引入了Producer ID(即PID)和Sequence Number。

PID:每个新的Producer在初始化的时候会被分配一个唯一的PID,这个PID对用户是不可见的。

Sequence Numbler:对于每个PID,该Producer发送数据的每个<Topic, Partition>都对应一个从0开始单调递增的Sequence Number。

Kafka可能存在多个生产者,会同时产生消息,但对Kafka来说,只需要保证每个生产者内部的消息幂等就可以了,所有引入了PID来标识不同的生产者。

对于Kafka来说,要解决的是生产者发送消息的幂等问题。也即需要区分每条消息是否重复。

Kafka通过为每条消息增加一个Sequence Numbler,通过Sequence Numbler来区分每条消息。每条消息对应一个分区,不同的分区产生的消息不可能重复。所有Sequence Numbler对应每个分区

Broker端在缓存中保存了这seq number,对于接收的每条消息,如果其序号比Broker缓存中序号大于1则接受它,否则将其丢弃。这样就可以实现了消息重复提交了。但是,只能保证单个Producer对于同一个<Topic, Partition>的Exactly Once语义。不能保证同一个Producer一个topic不同的partion幂等。

开启幂等性:生产者客户端参数enable.idempotence设置为true。

事务消息

由于幂等性不能跨分区运作,为了保证同时发的多条消息,要么全成功,要么全失败。kafka引入了事务的概念。

原子性:事务性消息要么完全成功,要么完全失败。这确保了消息不会被部分处理。
可靠性:一旦消息被写入Kafka,它们将被视为已经处理,即使发生了应用程序或系统故障。
顺序性:事务性消息在单个分区内保持顺序。这对于需要按顺序处理的应用程序至关重要。

Kafka事务的回滚,并不是删除已写入的数据,而是将写入数据的事务标记为 Rollback/Abort 从而在读数据时过滤该数据。

保证消息顺序

针对消息有序的业务需求,还分为全局有序和局部有序。

  • 全局有序:一个Topic下的所有消息都需要按照生产顺序消费。
  • 局部有序:一个Topic下的消息,只需要满足同一业务字段的要按照生产顺序消费。例如:Topic消息是订单的流水表,包含订单orderId,业务要求同一个orderId的消息需要按照生产顺序进行消费。

全局有序

由于Kafka的一个Topic可以分为了多个Partition,Producer发送消息的时候,是分散在不同 Partition的。当Producer按顺序发消息给Broker,但进入Kafka之后,这些消息就不一定进到哪个Partition,会导致顺序是乱的。

因此要满足全局有序,需要1个Topic只能对应1个Partition。

而且对应的consumer也要使用单线程或者保证消费顺序的线程模型。

局部有序

要满足局部有序,只需要在发消息的时候指定Partition Key,Kafka对其进行Hash计算,根据计算结果决定放入哪个Partition。这样Partition Key相同的消息会放在同一个Partition。此时,Partition的数量仍然可以设置多个,提升Topic的整体吞吐量。

img

Next:
Kafka简介