kafka源码分析(一)- KafkaProducer
Kafka自定义了一套网络协议,只要遵循这个标准,就能向kafka发送消息,也可以从kafka中拉取消息。kafka有多个版本的生产者实现。我们这篇文章讲述的是Java版本的消费者实现-KafkaProducer,能轻松实现同步、异步发送消息、批量发送、超时重发等复杂的功能。
1.KafkaProducer基本流程
基本流程如下:
- producerInterceptors对消息进行拦截
- Serializer 对消息的key和value进行序列化
- partitioner 为消息选择合适的partition
- RecordAccumulator收集消息,实现批量发送
- Sender从RecordAccumulator 获取消息
- 构造ClientRequest
- 将ClientRequest交给NetworkClient,准备发送
- NetworkClient将请求放入KafkaChannel的缓存
- 执行网络IO,发送请求
- 收到响应,调用ClientRequest的回调函数
- 调用RecordBatch的回调函数,最终调用每个消息上注册的回调函数
消息发送的过程,涉及两个线程的协同工作,主线程将业务数据封装成ProducerRecord对象,之后调用send()将消息放入RecordAccumulator(消息收集器)中暂存,Sender线程负责将消息信息构成请求,并最终执行网络I/O的线程,它从RecordAccumulator中取出消息并批量发送,KafkaProducer是线程安全的。多个线程间可以共享使用一个KafkaProducer。
KafkaProducer实现了Producer接口,在Producer接口中定义KafkaProducer对外提供的API、分为四类方法。
- send()方法:发送消息,实际是将消息放入RecordAccumulator中暂存,等待发送。
- flush()方法,刷新操作,等待RecordAccumulator中所有消息发送完成,在刷新完成之前会阻塞调用的线程。
- partitionsFor方法:在KafkaProducer中维护了一个Metadata对象存储kafka集群的元数据,metadata中的数据会定期更新,partitionsFor()方法负责从metadata中获取指定topic中的分区信息
- close()方法,关闭此Producer对象,主要操作是设置close标志,等待RecordAccumulator中的消息清空,关闭sender线程
KafkaProducer中关键字段分析:
- PRODUCER_CLIENT_ID_SEQUENCE:clientId的生成器,如果没有明确指定client的id,则使用字段生成一个ID
- clientId:此生产者的唯一的标识
- partitioner:分区选择器,根据一定的策略,将消息路由到合适的分区。
- maxRequestSize:消息的最大长度,这个长度包含了消息头,序列化后的key和序列化后的value的长度
- totalMemorySize:发送单个消息的缓冲区大小
- accumulator:用于手机并缓存消息,等待Sender线程发送
- sender: 发送消息的Sender任务,实现了Runnable接口,在ioThread线程中执行。
- isThread: 执行sender任务发送消息的线程,称为“Sender线程”
- compressionType:压缩算法,可选项有none、gzip、snappy,lz4.
- keySerializer:key的序列化器
- valueSerializer : value的序列化器
- Metadata : 整个kafka集群的元数据
- maxBlockTimeMs:等待更新kafka集群元数据的最大时长
- requestTimeoutMs:消息的超时时间,也就是消息发送到收到ACK响应的最长时长
- interceptors:对消息进行拦截或修改,也可以先于用户的callback,对ack响应进行预处理
- producerConfig :配置对象,使用反射初始化KafkaProducer配置的相对对象
KafkaProducer构造函数中,一些关键函数的初始化如下:
通过反射机制实例化配置的partitioner类,keySerializer类,valueSerializer类
this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, Serializer.class);
this.keySerializer.configure(config.originals(), true);
// 创建RecordAccumulator
this.accumulator = new RecordAccumulator(logContext,
config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
this.compressionType,
config.getInt(ProducerConfig.LINGER_MS_CONFIG),
retryBackoffMs,
deliveryTimeoutMs,
metrics,
PRODUCER_METRIC_GROUP_NAME,
time,
apiVersions,
transactionManager,
new BufferPool(this.totalMemorySize, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), metrics, time, PRODUCER_METRIC_GROUP_NAME));
// 创建NetworkClient,这个是KafkaProducer网络I/O的核心
KafkaClient client = kafkaClient != null ? kafkaClient : new NetworkClient(
new Selector(producerConfig.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
this.metrics, time, "producer", channelBuilder, logContext),
metadata,
clientId,
maxInflightRequests,
producerConfig.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
producerConfig.getLong(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),
producerConfig.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
producerConfig.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
requestTimeoutMs,
ClientDnsLookup.forConfig(producerConfig.getString(ProducerConfig.CLIENT_DNS_LOOKUP_CONFIG)),
time,
true,
apiVersions,
throttleTimeSensor,
logContext);
// 启动Sender对应的线程
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();
2. KafkaProducer send()
KafkaProducer构造完成之后,详细看下send的调用流程。
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
// intercept the record, which can be potentially modified; this method does not throw exceptions
ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
return doSend(interceptedRecord, callback);
}
- 调用ProducerInterceptors.onSend()方法,通过ProducerInterceptors对消息进行拦截或修改
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
ProducerRecord<K, V> interceptRecord = record;
for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
try {
interceptRecord = interceptor.onSend(interceptRecord);
} catch (Exception e) {
// do not propagate interceptor exception, log and continue calling other interceptors
// be careful not to throw exception from here
if (record != null)
log.warn("Error executing interceptor onSend callback for topic: {}, partition: {}", record.topic(), record.partition(), e);
else
log.warn("Error executing interceptor onSend callback", e);
}
}
return interceptRecord;
}
- 调用waitOnMetadata()方法获取kafka集群的信息,底层会唤醒Send线程更新Metadata中保存的kafka集群元数据
clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
- 调用Serializer()方法序列化消息的key和value
serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
- 调用partition为消息选择合适的分区
int partition = partition(record, serializedKey, serializedValue, cluster);
- 调用RecordAccumulator.append()方法,将消息追加到RecordAccumulator中
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,serializedValue, headers, interceptCallback, remainingWaitMs);
- 唤醒Sender线程,由Sender线程RecordAccumulator中缓存的消息发送出去
if (result.batchIsFull || result.newBatchCreated) {
log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
this.sender.wakeup();
}
3. ProducerInterceptors
ProducerInterceptors 是ProducerInterceptor的集合。可以在消息发送之前对其进行修改或者拦截,也可以先于用户的callback,对ack响应进行预处理
4. Kafka集群元数据
5. Serializer&Deserializer
6. Partitioner
标题:kafka源码分析(一)- KafkaProducer
作者:guobing
地址:http://guobingwei.tech/articles/2019/04/29/1556493797701.html