[ https://issues.apache.org/jira/browse/KAFKA-12478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17310106#comment-17310106 ]
Guozhang Wang commented on KAFKA-12478:
---------------------------------------

Thanks [~hudeqi]. I think it is still better to implement this on the client side, so that it is more flexible across different clients.

As for how to implement it, you can read the discussion on a related ticket (https://issues.apache.org/jira/browse/KAFKA-3370?focusedCommentId=15299252&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-15299252), especially the comments from [~gwenshap] and [~vahid]. There is also a PR for part of that approach: https://github.com/apache/kafka/pull/9006

If you are interested, I'd suggest following the KIP process to add options to the reset policy config (`auto.offset.reset`). People across the community can discuss this further, but off the top of my head here are a few more options we can consider:

* besides `latest` and `earliest`, we also add `nearest`: reset to either latest or earliest depending on the current offset (i.e. this policy won't trigger when we see a partition for the first time, without committed offsets; it will only trigger for out-of-range offsets).
* `latest-on-start`, `earliest-on-start`: reset to either latest or earliest only when we see the partition for the first time, without a committed offset; when out-of-range, default to `none`, i.e. throw an exception.
* an additional `timestamp` limit used for `latest/earliest/latest-on-start/earliest-on-start`: it means we only reset to latest / earliest if its corresponding record timestamp is smaller / larger than the given `timestamp` parameter; otherwise we reset to earliest / latest. This can be used for your feature: set the config to `earliest` with `timestamp` set to when the consumer group started. At startup the consumer would then reset to latest, since the earliest record's timestamp is smaller than the given parameter, and later, when new partitions are added, it would reset to `earliest`.
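To make the options above easier to discuss, here is a minimal sketch of the decision logic in Python. It is not Kafka client code and none of these policies exist in the consumer today; the names `resolve_reset`, `PartitionState`, `NoOffsetResetError` and `time_limit` are hypothetical. Two points the comment leaves open are filled in as assumptions: `nearest` raises when there is no committed offset, and a record timestamp equal to the limit is treated as not passing the check.

```python
from dataclasses import dataclass
from typing import Optional


class NoOffsetResetError(Exception):
    """Raised where a policy falls back to `none` (hypothetical name)."""


@dataclass
class PartitionState:
    log_start: int              # earliest available offset
    log_end: int                # latest offset (next offset to be written)
    earliest_ts: Optional[int]  # timestamp of the earliest record, None if empty
    latest_ts: Optional[int]    # timestamp of the latest record, None if empty


def resolve_reset(policy, committed, part, time_limit=None):
    """Return the offset to start from, or raise NoOffsetResetError.

    `committed` is None when the group has no committed offset for the partition.
    """
    first_time = committed is None
    if not first_time and part.log_start <= committed <= part.log_end:
        return committed  # committed offset is in range, nothing to reset

    if policy == "none":
        raise NoOffsetResetError("no reset policy configured")

    if policy == "nearest":
        if first_time:
            # Assumption: the comment only says `nearest` does not trigger here.
            raise NoOffsetResetError("nearest only applies to out-of-range offsets")
        # Out of range: snap to whichever end of the log is closer.
        return part.log_start if committed < part.log_start else part.log_end

    if policy.endswith("-on-start"):
        if not first_time:
            # Out-of-range with an *-on-start policy behaves like `none`.
            raise NoOffsetResetError("out-of-range offset under " + policy)
        policy = policy[: -len("-on-start")]

    if policy == "earliest":
        # With a limit: only go to earliest if its record is newer than the limit.
        if (time_limit is not None and part.earliest_ts is not None
                and part.earliest_ts <= time_limit):
            return part.log_end
        return part.log_start

    if policy == "latest":
        # With a limit: only go to latest if its record is older than the limit.
        if (time_limit is not None and part.latest_ts is not None
                and part.latest_ts >= time_limit):
            return part.log_start
        return part.log_end

    raise ValueError("unknown policy: " + policy)


# The scenario from the last bullet: policy `earliest`, limit = group start time.
group_started_at = 1000
existing = PartitionState(log_start=0, log_end=500, earliest_ts=100, latest_ts=990)
added_later = PartitionState(log_start=0, log_end=20, earliest_ts=1500, latest_ts=1600)

print(resolve_reset("earliest", None, existing, group_started_at))     # 500 (latest)
print(resolve_reset("earliest", None, added_later, group_started_at))  # 0 (earliest)
```

With these assumptions, the group skips the historical data on the pre-existing partition at startup, but reads the newly added partition from its beginning, which is the behaviour asked for in this ticket.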
> Consumer group may lose data for newly expanded partitions when add
> partitions for topic if the group is set to consume from the latest
> ---------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-12478
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12478
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients
>    Affects Versions: 2.7.0
>            Reporter: hudeqi
>            Priority: Blocker
>              Labels: patch
>   Original Estimate: 1,158h
>  Remaining Estimate: 1,158h
>
> This problem was exposed in our production environment: a topic is used to
> produce monitoring data. *After expanding partitions, the consumer side of
> the business reported that data was lost.*
> After preliminary investigation, the lost data is all concentrated in the
> newly expanded partitions. The reason is: when the server expands, the
> producer perceives the expansion first, and some data is written to the
> newly expanded partitions. The consumer group perceives the expansion
> later; after the rebalance is completed, the newly expanded partitions are
> consumed from the latest offset if the group is set to consume from the
> latest. The data written to the newly expanded partitions during that
> window is skipped and lost by the consumer.
> Setting the group to consume from the earliest is not a good option for a
> high-volume topic: at startup it would make the group consume a large
> amount of historical data from the brokers, which affects broker
> performance to a certain extent. Therefore, *it is necessary to consume
> these partitions from the earliest separately.*

--
This message was sent by Atlassian Jira
(v8.3.4#803005)