[ 
https://issues.apache.org/jira/browse/KAFKA-12478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17310106#comment-17310106
 ] 

Guozhang Wang commented on KAFKA-12478:
---------------------------------------

Thanks [~hudeqi]. I think it is still better to implement it on the client side 
to be more flexible across different clients.

Regarding how to implement it, I think you can read the discussion on a 
relevant ticket 
(https://issues.apache.org/jira/browse/KAFKA-3370?focusedCommentId=15299252&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-15299252),
 especially the comments from [~gwenshap] and [~vahid]. There's also a PR for 
part of that approach: https://github.com/apache/kafka/pull/9006

If you are interested, I'd suggest you follow the KIP process to add options 
to the reset policy config (`auto.offset.reset`). People across the community 
can discuss this further, but off the top of my head here are a few more 
options we can consider:

* besides `latest` and `earliest`, we also add `nearest`: reset to either 
latest or earliest depending on the current offset (i.e. this policy won't 
trigger when we see a partition for the first time without committed offsets; 
it will only trigger for out-of-range).
* `latest-on-start`, `earliest-on-start`: reset to either latest or earliest 
only when we see the partition for the first time without a committed offset; 
on out-of-range, default to `none`, i.e. throw an exception.
* an additional `timestamp` limit used for 
`latest/earliest/latest-on-start/earliest-on-start`: it means we only reset to 
latest / earliest if the corresponding record's timestamp is smaller / larger 
than the given `timestamp` parameter; otherwise, we reset to earliest / latest. 
This can be used for your feature, i.e. you can set the config to `earliest` 
with `timestamp` set to when the consumer group started. At startup it would 
then reset to latest, since the earliest record's timestamp is smaller than 
the given parameter, and later, when new partitions are added, it would reset 
to `earliest`.
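
To make the options above a bit more concrete, here is a rough sketch in Python of how the reset decision could be resolved. None of this is existing Kafka API: `resolve_reset`, `Trigger`, `PartitionLog` and `NoResetPolicyError` are hypothetical names used only for illustration. How `nearest` should behave when there is no committed offset is not specified above; the sketch assumes it falls back to `none` in that case.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Optional


class Trigger(Enum):
    NO_COMMITTED_OFFSET = 1  # partition seen for the first time
    OUT_OF_RANGE = 2         # current position is outside the log


@dataclass
class PartitionLog:
    earliest_offset: int
    latest_offset: int
    earliest_timestamp: int  # timestamp (ms) of the oldest record
    latest_timestamp: int    # timestamp (ms) of the newest record


class NoResetPolicyError(Exception):
    """Stands in for the exception thrown under `none`."""


def _bounded(base: str, log: PartitionLog, timestamp: Optional[int]) -> int:
    """Apply `latest` / `earliest`, honoring the optional timestamp limit."""
    use_earliest = base == "earliest"
    is_empty = log.earliest_offset == log.latest_offset
    if timestamp is not None and not is_empty:
        if base == "earliest":
            # Reset to earliest only if the oldest record is newer than the limit.
            use_earliest = log.earliest_timestamp > timestamp
        else:
            # Reset to latest only if the newest record is older than the limit.
            use_earliest = not (log.latest_timestamp < timestamp)
    return log.earliest_offset if use_earliest else log.latest_offset


def resolve_reset(
    policy: str,
    trigger: Trigger,
    log: PartitionLog,
    current_offset: Optional[int] = None,
    timestamp: Optional[int] = None,
) -> int:
    """Return the offset to reset to, or raise if the policy does not apply."""
    if policy in ("latest", "earliest"):
        return _bounded(policy, log, timestamp)
    if policy in ("latest-on-start", "earliest-on-start"):
        if trigger is Trigger.NO_COMMITTED_OFFSET:
            return _bounded(policy[: -len("-on-start")], log, timestamp)
        raise NoResetPolicyError("out-of-range defaults to `none`")
    if policy == "nearest":
        if trigger is Trigger.OUT_OF_RANGE and current_offset is not None:
            if current_offset < log.earliest_offset:
                return log.earliest_offset
            return log.latest_offset
        # Assumption: `nearest` behaves like `none` without a committed offset.
        raise NoResetPolicyError("`nearest` only applies to out-of-range")
    raise NoResetPolicyError(f"no reset for policy {policy!r}")
```

With `earliest` and `timestamp` set to the group's start time, a partition that already held older records resolves to its latest offset, while a partition added afterwards (whose oldest record is newer than the limit) resolves to its earliest offset, which is the scenario described in the last bullet.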

> Consumer group may lose data for newly expanded partitions when add 
> partitions for topic if the group is set to consume from the latest
> ---------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-12478
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12478
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients
>    Affects Versions: 2.7.0
>            Reporter: hudeqi
>            Priority: Blocker
>              Labels: patch
>   Original Estimate: 1,158h
>  Remaining Estimate: 1,158h
>
>   This problem was exposed in our production environment: a topic is used to 
> produce monitoring data. *After expanding partitions, the consumer side of 
> the business reported that data was lost.*
>   After preliminary investigation, the lost data is all concentrated in the 
> newly expanded partitions. The reason is: when the server expands, the 
> producer perceives the expansion first, and some data is written to the 
> newly expanded partitions. The consumer group perceives the expansion 
> later; after the rebalance completes, the newly expanded partitions are 
> consumed from the latest if the group is set to consume from the latest. The 
> data written to the new partitions in the meantime is skipped and lost by the 
> consumer.
>   Setting the group to consume from the earliest is not a good option for a 
> topic with a huge data flow: at startup the group would consume historical 
> data from the broker aggressively, which would affect broker performance to a 
> certain extent. Therefore, *it is necessary to consume these partitions from 
> the earliest separately.*



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
