Natan Silnitsky created KAFKA-8358: -------------------------------------- Summary: KafkaConsumer.endOffsets should be able to also return end offsets while not ignoring control records Key: KAFKA-8358 URL: https://issues.apache.org/jira/browse/KAFKA-8358 Project: Kafka Issue Type: Improvement Reporter: Natan Silnitsky
We have a use case where we have a wrapper on top of {{kafkaConsumer}} for compact logs. In order to know that a user can get "new" values for a key in the compact log, on init, or on rebalance, we need to block until all "old" values were read. We wanted to use {{KafkaConsumer.endOffsets}} to help us find out where the "old" values end. once all "old" values arrive from {{KafkaConsumer.poll}}, we can release the blocking on getting new values. But it seems that [control records|https://github.com/apache/kafka/blob/c09e25fac2aaea61af892ae3e5273679a4bdbc7d/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java#L128] are not received in {{KafkaConsumer.poll }} but are taking into account for {{KafkaConsumer.endOffsets }} So the Feature request is for {{KafkaConsumer.endOffsets}} to have a flag to ignore control records, the same way that {{KafkaConsumer.poll }} ignores them. (From a quick review of the code, it seems that {{LeaderEpochFile}}.[assign|https://github.com/apache/kafka/blob/c09e25fac2aaea61af892ae3e5273679a4bdbc7d/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala#L51] can be given the flag isControl from [batch.isControlBatch|https://github.com/apache/kafka/blob/c09e25fac2aaea61af892ae3e5273679a4bdbc7d/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java#L239] But I'm maybe wrong with my understanding there...) CC: [~berman7] [~berman] -- This message was sent by Atlassian JIRA (v7.6.3#76005)