kafka源码分析(一)- KafkaProducer

  |   0 评论   |   0 浏览

Kafka自定义了一套网络协议,只要遵循这个标准,就能向kafka发送消息,也可以从kafka中拉取消息。kafka有多个版本的生产者实现。我们这篇文章讲述的是Java版本的消费者实现-KafkaProducer,能轻松实现同步、异步发送消息、批量发送、超时重发等复杂的功能。

1.KafkaProducer基本流程

基本流程如下:

  • producerInterceptors对消息进行拦截
  • Serializer 对消息的key和value进行序列化
  • partitioner 为消息选择合适的partition
  • RecordAccumulator收集消息,实现批量发送
  • Sender从RecordAccumulator 获取消息
  • 构造ClientRequest
  • 将ClientRequest交给NetworkClient,准备发送
  • NetworkClient将请求放入KafkaChannel的缓存
  • 执行网络IO,发送请求
  • 收到响应,调用ClientRequest的回调函数
  • 调用RecordBatch的回调函数,最终调用每个消息上注册的回调函数

image.png

消息发送的过程,涉及两个线程的协同工作,主线程将业务数据封装成ProducerRecord对象,之后调用send()将消息放入RecordAccumulator(消息收集器)中暂存,Sender线程负责将消息信息构成请求,并最终执行网络I/O的线程,它从RecordAccumulator中取出消息并批量发送,KafkaProducer是线程安全的。多个线程间可以共享使用一个KafkaProducer。

KafkaProducer实现了Producer接口,在Producer接口中定义KafkaProducer对外提供的API、分为四类方法。

  • send()方法:发送消息,实际是将消息放入RecordAccumulator中暂存,等待发送。
  • flush()方法,刷新操作,等待RecordAccumulator中所有消息发送完成,在刷新完成之前会阻塞调用的线程。
  • partitionsFor方法:在KafkaProducer中维护了一个Metadata对象存储kafka集群的元数据,metadata中的数据会定期更新,partitionsFor()方法负责从metadata中获取指定topic中的分区信息
  • close()方法,关闭此Producer对象,主要操作是设置close标志,等待RecordAccumulator中的消息清空,关闭sender线程

KafkaProducer中关键字段分析:
image.png

  • PRODUCER_CLIENT_ID_SEQUENCE:clientId的生成器,如果没有明确指定client的id,则使用字段生成一个ID
  • clientId:此生产者的唯一的标识
  • partitioner:分区选择器,根据一定的策略,将消息路由到合适的分区。
  • maxRequestSize:消息的最大长度,这个长度包含了消息头,序列化后的key和序列化后的value的长度
  • totalMemorySize:发送单个消息的缓冲区大小
  • accumulator:用于手机并缓存消息,等待Sender线程发送
  • sender: 发送消息的Sender任务,实现了Runnable接口,在ioThread线程中执行。
  • isThread: 执行sender任务发送消息的线程,称为“Sender线程”
  • compressionType:压缩算法,可选项有none、gzip、snappy,lz4.
  • keySerializer:key的序列化器
  • valueSerializer : value的序列化器
  • Metadata : 整个kafka集群的元数据
  • maxBlockTimeMs:等待更新kafka集群元数据的最大时长
  • requestTimeoutMs:消息的超时时间,也就是消息发送到收到ACK响应的最长时长
  • interceptors:对消息进行拦截或修改,也可以先于用户的callback,对ack响应进行预处理
  • producerConfig :配置对象,使用反射初始化KafkaProducer配置的相对对象

KafkaProducer构造函数中,一些关键函数的初始化如下:

通过反射机制实例化配置的partitioner类,keySerializer类,valueSerializer类

this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, Serializer.class);
this.keySerializer.configure(config.originals(), true);

// 创建RecordAccumulator

this.accumulator = new RecordAccumulator(logContext,
                    config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
                    this.compressionType,
                    config.getInt(ProducerConfig.LINGER_MS_CONFIG),
                    retryBackoffMs,
                    deliveryTimeoutMs,
                    metrics,
                    PRODUCER_METRIC_GROUP_NAME,
                    time,
                    apiVersions,
                    transactionManager,
                    new BufferPool(this.totalMemorySize, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), metrics, time, PRODUCER_METRIC_GROUP_NAME));

// 创建NetworkClient,这个是KafkaProducer网络I/O的核心

KafkaClient client = kafkaClient != null ? kafkaClient : new NetworkClient(
                new Selector(producerConfig.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
                        this.metrics, time, "producer", channelBuilder, logContext),
                metadata,
                clientId,
                maxInflightRequests,
                producerConfig.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
                producerConfig.getLong(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),
                producerConfig.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
                producerConfig.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
                requestTimeoutMs,
                ClientDnsLookup.forConfig(producerConfig.getString(ProducerConfig.CLIENT_DNS_LOOKUP_CONFIG)),
                time,
                true,
                apiVersions,
                throttleTimeSensor,
                logContext);

// 启动Sender对应的线程

this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();

2. KafkaProducer send()

KafkaProducer构造完成之后,详细看下send的调用流程。

@Override
    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        // intercept the record, which can be potentially modified; this method does not throw exceptions
        ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
        return doSend(interceptedRecord, callback);
    }
  • 调用ProducerInterceptors.onSend()方法,通过ProducerInterceptors对消息进行拦截或修改
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
        ProducerRecord<K, V> interceptRecord = record;
        for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
            try {
                interceptRecord = interceptor.onSend(interceptRecord);
            } catch (Exception e) {
                // do not propagate interceptor exception, log and continue calling other interceptors
                // be careful not to throw exception from here
                if (record != null)
                    log.warn("Error executing interceptor onSend callback for topic: {}, partition: {}", record.topic(), record.partition(), e);
                else
                    log.warn("Error executing interceptor onSend callback", e);
            }
        }
        return interceptRecord;
    }
  • 调用waitOnMetadata()方法获取kafka集群的信息,底层会唤醒Send线程更新Metadata中保存的kafka集群元数据

clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);

  • 调用Serializer()方法序列化消息的key和value
serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
  • 调用partition为消息选择合适的分区

int partition = partition(record, serializedKey, serializedValue, cluster);

  • 调用RecordAccumulator.append()方法,将消息追加到RecordAccumulator中
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,serializedValue, headers, interceptCallback, remainingWaitMs);
  • 唤醒Sender线程,由Sender线程RecordAccumulator中缓存的消息发送出去
if (result.batchIsFull || result.newBatchCreated) {
                log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
                this.sender.wakeup();
}

3. ProducerInterceptors

ProducerInterceptors 是ProducerInterceptor的集合。可以在消息发送之前对其进行修改或者拦截,也可以先于用户的callback,对ack响应进行预处理

image.png

4. Kafka集群元数据

5. Serializer&Deserializer

6. Partitioner


标题:kafka源码分析(一)- KafkaProducer
作者:guobing
地址:http://guobingwei.tech/articles/2019/04/29/1556493797701.html