[ https://issues.apache.org/jira/browse/KAFKA-12478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17308383#comment-17308383 ]

Guozhang Wang commented on KAFKA-12478:
---------------------------------------

Hello [~hudeqi] Thanks for the updates. I think I understand your scenario 
better now: what you need is that when a consumer group is newly started on 
an existing topic that may already contain a lot of data, it is okay to skip 
all the old data produced before the consumers start up; but once the consumer 
group has started fetching, it must not miss any data from then on, even under 
add-partition events.

At the moment, my suggestion would be not to rely on the reset.policy config 
for such cases: your scenario is intricate enough to warrant some customized 
logic while setting the reset.policy to earliest. For example, I'd suggest you 
have a wrapper around your consumer such that, before a new group is started, 
you first commit offsets based on the current timestamp (this is doable via an 
admin client: retrieve the offsets by time, and write them as the committed 
offsets of the given group name), and then start the consumers. The consumers 
would then just start from the committed offsets, which are relatively close 
to the latest log end offsets anyway. When new partitions are created later, 
since there are no committed offsets for them yet, the consumers would fetch 
them from earliest; if new partitions are added around the same time the 
consumers are started, the committed offsets should just be the starting 
offsets, since the given timestamp should be smaller than that of any message 
newly produced to those partitions, so you would still not miss any data.
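
A minimal sketch of that wrapper step, assuming the Java Admin client (the 
Admin calls used here, describeTopics, listOffsets with 
OffsetSpec.forTimestamp, and alterConsumerGroupOffsets, all exist since AK 
2.5; the class and method names are just illustrative). Run it once before 
the first consumer of the group starts, then start the consumers with 
auto.offset.reset=earliest:

{code:java}
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;

public class GroupOffsetSeeder {

    // Seed committed offsets for a not-yet-started group at the given
    // timestamp, so that its consumers begin close to the log end instead
    // of from earliest.
    public static void seedGroupOffsets(Properties adminProps, String topic,
                                        String groupId, long timestampMs)
            throws Exception {
        try (Admin admin = Admin.create(adminProps)) {
            // 1. Discover the topic's current partitions.
            TopicDescription desc = admin
                    .describeTopics(Collections.singletonList(topic))
                    .all().get().get(topic);

            Map<TopicPartition, OffsetSpec> byTime = new HashMap<>();
            for (TopicPartitionInfo p : desc.partitions()) {
                byTime.put(new TopicPartition(topic, p.partition()),
                           OffsetSpec.forTimestamp(timestampMs));
            }

            // 2. Look up, per partition, the earliest offset whose record
            //    timestamp is >= timestampMs.
            Map<TopicPartition, ListOffsetsResultInfo> byTimeResult =
                    admin.listOffsets(byTime).all().get();

            // 3. Partitions with no record at or after the timestamp come
            //    back with offset -1; fall back to the log-end offset there.
            Map<TopicPartition, OffsetSpec> latestSpecs = new HashMap<>();
            byTime.keySet().forEach(tp -> latestSpecs.put(tp, OffsetSpec.latest()));
            Map<TopicPartition, ListOffsetsResultInfo> latestResult =
                    admin.listOffsets(latestSpecs).all().get();

            Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
            byTimeResult.forEach((tp, info) -> {
                long offset = info.offset() >= 0 ? info.offset()
                                                 : latestResult.get(tp).offset();
                toCommit.put(tp, new OffsetAndMetadata(offset));
            });

            // 4. Write these as the group's committed offsets. This only
            //    succeeds while the group is empty, i.e. before any
            //    consumer has joined.
            admin.alterConsumerGroupOffsets(groupId, toCommit).all().get();
        }
    }
}
{code}

Note that alterConsumerGroupOffsets only succeeds while the group has no live 
members, so this step really has to happen before the consumers are started.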

> Consumer group may lose data for newly expanded partitions when partitions 
> are added to a topic if the group is set to consume from the latest
> ---------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-12478
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12478
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients
>    Affects Versions: 2.7.0
>            Reporter: hudeqi
>            Priority: Blocker
>              Labels: patch
>   Original Estimate: 1,158h
>  Remaining Estimate: 1,158h
>
>   This problem was exposed in our production environment: a topic is used to 
> produce monitoring data. *After the partitions were expanded, the consuming 
> side of the business reported that data was lost.*
>   After a preliminary investigation, all of the lost data was concentrated 
> in the newly expanded partitions. The reason is: when the topic is expanded 
> on the server side, the producer perceives the expansion first and writes 
> some data to the newly expanded partitions. The consumer group perceives the 
> expansion later; after the rebalance completes, the newly expanded 
> partitions are consumed from the latest offset if the group is set to 
> consume from the latest. For a period of time, the data in the newly 
> expanded partitions is therefore skipped and lost by the consumer.
>   Simply setting the group to consume from the earliest at startup is not an 
> option for a topic with a huge data flow, because that would make the group 
> consume historical data from the brokers heavily, which would affect broker 
> performance to a certain extent. Therefore, *it is necessary to consume 
> these newly expanded partitions from the earliest separately.*
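
As a rough sketch of the "consume these newly expanded partitions from the 
earliest separately" idea above, a ConsumerRebalanceListener can seek any 
assigned partition that has no committed offset to the beginning, while the 
consumer itself keeps auto.offset.reset=latest (the class name and wiring 
here are illustrative, not an existing API). The caveat is that on the very 
first start of a brand-new group no partition has a committed offset yet, so 
this needs to be combined with pre-committing offsets as suggested in the 
comment above:

{code:java}
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class EarliestForNewPartitions implements ConsumerRebalanceListener {

    private final Consumer<?, ?> consumer;

    public EarliestForNewPartitions(Consumer<?, ?> consumer) {
        this.consumer = consumer;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Nothing special to do on revocation for this sketch.
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Partitions with a committed offset resume from it as usual.
        // Partitions without one (e.g. newly added since the last rebalance)
        // are explicitly sought to the beginning, even though
        // auto.offset.reset is "latest".
        Map<TopicPartition, OffsetAndMetadata> committed =
                consumer.committed(new HashSet<>(partitions));
        for (TopicPartition tp : partitions) {
            if (committed.get(tp) == null) {
                consumer.seekToBeginning(Collections.singletonList(tp));
            }
        }
    }
}
{code}

It would be registered when subscribing, e.g. 
consumer.subscribe(Collections.singletonList(topic), 
new EarliestForNewPartitions(consumer)).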



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
