[jira] [Commented] (KAFKA-12478) Consumer group may lose data for newly expanded partitions when add partitions for topic if the group is set to consume from the latest

2022-06-23 Thread hudeqi (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17557884#comment-17557884
 ] 

hudeqi commented on KAFKA-12478:


Finally you are here :D Thanks, Guozhang.

> Consumer group may lose data for newly expanded partitions when add 
> partitions for topic if the group is set to consume from the latest
> ---
>
> Key: KAFKA-12478
> URL: https://issues.apache.org/jira/browse/KAFKA-12478
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 3.1.1
>Reporter: hudeqi
>Priority: Blocker
>  Labels: kip-842
> Attachments: safe-console-consumer.png, safe-consume.png, 
> safe-produce.png, trunk-console-consumer.png, trunk-consume.png, 
> trunk-produce.png
>
>   Original Estimate: 1,158h
>  Remaining Estimate: 1,158h
>
>   This problem was exposed in our production environment: a topic is used to 
> produce monitoring data. *After expanding partitions, the consumer side of 
> the business reported that data was lost.*
>   After a preliminary investigation, the lost data was all concentrated in 
> the newly expanded partitions. The reason: when the topic is expanded, the 
> producer perceives the expansion first, and some data is written into the 
> newly expanded partitions. The consumer group perceives the expansion later; 
> after the rebalance completes, the newly expanded partitions are consumed 
> from the latest offset if the group is set to consume from the latest. For a 
> period of time, the data in the newly expanded partitions is therefore 
> skipped and lost by the consumer.
>   Simply setting a high-traffic topic's group to consume from the earliest 
> at startup is not acceptable either, as it would make the group consume 
> historical data from the brokers aggressively, which affects broker 
> performance to a certain extent. Therefore, *it is necessary to consume these 
> newly expanded partitions from the earliest separately.*
>  
> I did a test and the results are in the attached screenshots. First, 
> "metadata.max.age.ms" is set to 500ms for the producer and 3ms for the 
> consumer.
> _trunk-console-consumer.png_ shows the consumer started with the community 
> (trunk) version and the reset policy set to "latest".
> _trunk-produce.png_ shows the data produced: "partition_count" is the 
> number of partitions of the topic at that moment, "message" is the numeric 
> content of the corresponding message, and "send_to_partition_index" is the 
> index of the partition to which the corresponding message is sent. It can be 
> seen that at 11:32:10 the producer perceives the expansion of the total 
> partitions from 2 to 3, and writes the numbers 38, 41, and 44 into the newly 
> expanded partition 2.
> _trunk-consume.png_ shows all the numeric content consumed with the 
> community version. You can see that 38 and 41, sent to partition 2, were not 
> consumed at first. Even after partition 2 was perceived, 38 and 41 were 
> still not consumed; instead consumption started from the latest message, 44, 
> so the two records 38 and 41 were discarded.
>  
> _safe-console-consumer.png_ shows the consumer started with the fixed 
> version and the reset policy set to "safe_latest".
> _safe-produce.png_ shows the data produced. It can be seen that at 12:12:09 
> the producer perceives the expansion of the total partitions from 4 to 5, and 
> writes the numbers 109 and 114 into the newly expanded partition 4.
> _safe-consume.png_ shows all the numeric content consumed with the fixed 
> version. You can see that 109, sent to partition 4, was not consumed at 
> first. After partition 4 was perceived, 109 was consumed as the first 
> record of partition 4. So the fixed version does not lose data under this 
> condition.
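
For reference, a minimal consumer sketch matching the trunk test setup above 
(the topic name and broker address are hypothetical; the screenshots were 
taken with the console consumer, not with this code):

{code:java}
// Minimal consumer matching the trunk test: community client,
// auto.offset.reset=latest, fast metadata refresh.
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class TrunkLatestRepro {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "repro-group");
        // The setting under test: partitions seen for the first time with no
        // committed offset are reset to the log end, so records produced
        // before the group perceives a new partition are skipped.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        // Consumer-side metadata refresh interval used in the test (ms).
        props.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "3");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("monitor-topic"));
            while (true) {
                for (ConsumerRecord<String, String> r : consumer.poll(Duration.ofMillis(500))) {
                    System.out.printf("partition=%d message=%s%n", r.partition(), r.value());
                }
            }
        }
    }
}
{code}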



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (KAFKA-12478) Consumer group may lose data for newly expanded partitions when add partitions for topic if the group is set to consume from the latest

2022-06-22 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17557576#comment-17557576
 ] 

Guozhang Wang commented on KAFKA-12478:
---

Thanks [~hudeqi], I will take a look at the KIP.




--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (KAFKA-12478) Consumer group may lose data for newly expanded partitions when add partitions for topic if the group is set to consume from the latest

2022-06-15 Thread hudeqi (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17554884#comment-17554884
 ] 

hudeqi commented on KAFKA-12478:


Hello, Guozhang. I have started a vote on KIP-842 for this issue. Does the 
status of this issue also need to be updated accordingly? In addition, please 
review and vote on it, thank you. cc [~showuon]




--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (KAFKA-12478) Consumer group may lose data for newly expanded partitions when add partitions for topic if the group is set to consume from the latest

2021-03-27 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17310106#comment-17310106
 ] 

Guozhang Wang commented on KAFKA-12478:
---

Thanks [~hudeqi]. I think it is still better to implement it on the client side 
to be more flexible across different clients.

Regarding how to implement it, I think you can read a relevant 
ticket's discussion 
(https://issues.apache.org/jira/browse/KAFKA-3370?focusedCommentId=15299252&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-15299252),
especially the comments from [~gwenshap] and [~vahid]. And there's a PR for 
part of that approach: https://github.com/apache/kafka/pull/9006

If you are interested, I'd suggest you follow the KIP process to add options 
to the reset.policy config. People across the community can discuss this 
further, but off the top of my head here are a few more options we can 
consider (a client-side sketch of the third option follows the list):

* Besides `latest` and `earliest`, we also add `nearest`: reset to either 
latest or earliest depending on the current offset (i.e. this policy won't 
trigger in the scenario where we see a partition for the first time, without 
committed offsets; it will only trigger for out-of-range).
* `latest-on-start`, `earliest-on-start`: reset to either latest or earliest 
only when we see the partition for the first time without a committed offset; 
when out-of-range, default to `none`, i.e. throw an exception.
* An additional `timestamp` limit used with 
`latest/earliest/latest-on-start/earliest-on-start`: it means we only reset to 
latest / earliest if the corresponding record timestamp is smaller / larger 
than the given `time` parameter; otherwise, reset to earliest / latest. This 
can be used for your feature, i.e. you can set the config to `earliest` with 
`timestamp` set to when the consumer group started; then at startup it would 
reset to latest, since the earliest record's timestamp is smaller than the 
given parameter, and later, when new partitions are added, it would reset to 
`earliest`.
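
None of these options exist in the released consumer. As a rough client-side 
illustration only (not the KIP design), the third option can be approximated 
today with a rebalance listener; the class name and `startTs` parameter below 
are hypothetical:

{code:java}
// Sketch: emulate "earliest, bounded by a start timestamp". Partitions whose
// earliest record is older than startTs are treated as pre-existing (reset to
// latest); partitions whose data is entirely newer are treated as newly added
// (reset to earliest).
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

class TimestampBoundedResetListener implements ConsumerRebalanceListener {
    private final Consumer<?, ?> consumer;
    private final long startTs;  // e.g. wall-clock time when the group first started

    TimestampBoundedResetListener(Consumer<?, ?> consumer, long startTs) {
        this.consumer = consumer;
        this.startTs = startTs;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {}

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Only partitions without committed offsets need a reset decision.
        Map<TopicPartition, OffsetAndMetadata> committed =
            consumer.committed(new HashSet<>(partitions));
        List<TopicPartition> fresh = new ArrayList<>();
        for (TopicPartition tp : partitions) {
            if (committed.get(tp) == null) {
                fresh.add(tp);
            }
        }
        if (fresh.isEmpty()) {
            return;
        }

        Map<TopicPartition, Long> query = new HashMap<>();
        for (TopicPartition tp : fresh) {
            query.put(tp, startTs);
        }
        Map<TopicPartition, OffsetAndTimestamp> byTime = consumer.offsetsForTimes(query);
        Map<TopicPartition, Long> beginnings = consumer.beginningOffsets(fresh);

        for (TopicPartition tp : fresh) {
            OffsetAndTimestamp oat = byTime.get(tp);
            // If the first record at/after startTs is the partition's very
            // first record, everything in it is newer than the group: consume
            // from earliest (the newly-added-partition case). Otherwise the
            // partition predates the group: consume from latest.
            if (oat != null && oat.offset() == beginnings.get(tp)) {
                consumer.seekToBeginning(Collections.singletonList(tp));
            } else {
                consumer.seekToEnd(Collections.singletonList(tp));
            }
        }
    }
}
{code}

If this is attached via consumer.subscribe(topics, listener) and 
auto.offset.reset is set to "none", any later out-of-range position surfaces 
as an exception instead of a silent reset, matching the `-on-start` variants 
above.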




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12478) Consumer group may lose data for newly expanded partitions when add partitions for topic if the group is set to consume from the latest

2021-03-26 Thread hudeqi (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17309276#comment-17309276
 ] 

hudeqi commented on KAFKA-12478:


OK. If I submit a patch, would you prefer to modify the logic on the client 
side or on the server side?




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12478) Consumer group may lose data for newly expanded partitions when add partitions for topic if the group is set to consume from the latest

2021-03-25 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17308875#comment-17308875
 ] 

Guozhang Wang commented on KAFKA-12478:
---

I think your way works too -- what I meant before was handling the race 
condition when a new consumer starts and new partitions are added at the same 
time.




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12478) Consumer group may lose data for newly expanded partitions when add partitions for topic if the group is set to consume from the latest

2021-03-25 Thread hudeqi (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17308575#comment-17308575
 ] 

hudeqi commented on KAFKA-12478:


Thank you very much for your reply! I think you already understand the 
scenario I described.

My idea for handling this scenario is very similar to your suggestion. The 
biggest difference is that I implement it completely on the server side: the 
company's business uses too many types and versions of Kafka clients, and 
fixing each type of client would make rolling out the upgrade a big effort. 
Secondly, I don't quite understand what you said above: "if the new partitions 
are added around the same time when consumers are started". 
My idea is to find all the groups subscribed to this topic before the admin 
starts to add partitions, and then have these groups commit an initial offset 
of 0 for the expanded partitions (also using the admin client). Only then is 
the real process of adding partitions carried out (see the sketch below). In 
this way, the above scenario can be completely avoided, and it is transparent 
to the client. Can I open a KIP and a patch for this problem?

Looking forward to your reply!
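
For illustration only, the pre-expansion step described above could look 
roughly like the following when driven from an external admin tool rather than 
from inside the broker. The topic name, partition counts, and broker address 
are hypothetical. Note that alterConsumerGroupOffsets only succeeds for empty 
groups, and brokers may reject offsets for partitions that do not exist yet, 
which is presumably why the actual proposal modifies the server side:

{code:java}
// Hypothetical admin-side sketch of: find all groups with offsets on the
// topic, pre-commit offset 0 for the soon-to-exist partitions, then expand.
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class SafeAddPartitions {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        String topic = "monitor-topic";  // hypothetical
        int oldCount = 2;
        int newCount = 3;                // expanding from 2 to 3 partitions

        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // Offsets to pre-commit for the partitions about to be created.
            Map<TopicPartition, OffsetAndMetadata> zeros = new HashMap<>();
            for (int p = oldCount; p < newCount; p++) {
                zeros.put(new TopicPartition(topic, p), new OffsetAndMetadata(0L));
            }

            // Find every group that currently has committed offsets on the topic.
            for (ConsumerGroupListing g : admin.listConsumerGroups().all().get()) {
                Map<TopicPartition, OffsetAndMetadata> offsets =
                    admin.listConsumerGroupOffsets(g.groupId())
                         .partitionsToOffsetAndMetadata().get();
                boolean usesTopic = offsets.keySet().stream()
                        .anyMatch(tp -> tp.topic().equals(topic));
                if (usesTopic) {
                    // Pre-commit 0 so the group starts the new partitions from
                    // the beginning instead of resetting to latest.
                    admin.alterConsumerGroupOffsets(g.groupId(), zeros).all().get();
                }
            }

            // Only now actually add the partitions.
            admin.createPartitions(
                Map.of(topic, NewPartitions.increaseTo(newCount))).all().get();
        }
    }
}
{code}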




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12478) Consumer group may lose data for newly expanded partitions when add partitions for topic if the group is set to consume from the latest

2021-03-24 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17308383#comment-17308383
 ] 

Guozhang Wang commented on KAFKA-12478:
---

Hello [~hudeqi], thanks for the updates. I think I understand your scenario 
better now: what you need is that, while a consumer group is newly started on 
an existing topic that may already have a lot of data, it is okay to skip all 
the old data produced before this consumer starts up; but once the consumer 
group has started fetching, it should not miss any data from then on, even 
across add-partition events.

At the moment, my suggestion would be to not rely on the reset.policy config 
for such cases: your scenario is intricate enough to need some customized 
logic while keeping the reset.policy at earliest. For example, I'd suggest a 
wrapper around your consumer such that, before a new group is started, you 
first commit an offset based on the current timestamp (this is doable via an 
admin client: retrieve offsets by time, and write them as the committed 
offsets of the given group name), and then start the consumers; see the sketch 
below. At that point the consumers would just start from the committed 
offsets, which are relatively close to the latest log end offsets anyway. When 
new partitions are created, since there are no committed offsets yet, the 
consumers would fetch from the earliest; and if the new partitions are added 
around the same time the consumers are started, the committed offsets would 
just be the starting offsets, since the given timestamp would be smaller than 
that of any newly produced message in those partitions, so you would still not 
miss any data.
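
As an illustration of that wrapper idea, here is a sketch using only existing 
Admin APIs; the group id, topic, and broker address are hypothetical, and 
error handling is omitted:

{code:java}
// Hypothetical bootstrap step: before starting a new group, resolve the
// offsets for "now" on every partition and write them as the group's
// committed offsets, so the group starts near the log end.
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;

public class BootstrapGroupOffsets {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        String topic = "monitor-topic";  // hypothetical
        String groupId = "my-group";     // hypothetical
        long now = System.currentTimeMillis();

        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            TopicDescription desc =
                admin.describeTopics(List.of(topic)).all().get().get(topic);

            Map<TopicPartition, OffsetSpec> byTime = new HashMap<>();
            Map<TopicPartition, OffsetSpec> byEnd = new HashMap<>();
            for (TopicPartitionInfo p : desc.partitions()) {
                TopicPartition tp = new TopicPartition(topic, p.partition());
                byTime.put(tp, OffsetSpec.forTimestamp(now));
                byEnd.put(tp, OffsetSpec.latest());
            }

            // Offset of the first record at/after `now`; -1 if there is none,
            // in which case we fall back to the log end offset.
            Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> atTime =
                admin.listOffsets(byTime).all().get();
            Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> atEnd =
                admin.listOffsets(byEnd).all().get();

            Map<TopicPartition, OffsetAndMetadata> commit = new HashMap<>();
            for (TopicPartition tp : byTime.keySet()) {
                long off = atTime.get(tp).offset();
                commit.put(tp, new OffsetAndMetadata(
                    off >= 0 ? off : atEnd.get(tp).offset()));
            }

            // Write them as the (not yet started) group's committed offsets.
            admin.alterConsumerGroupOffsets(groupId, commit).all().get();
        }
    }
}
{code}

The consumers are then started normally with this group id; any partitions 
created afterwards have no committed offsets and fall back to the earliest 
reset.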




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12478) Consumer group may lose data for newly expanded partitions when add partitions for topic if the group is set to consume from the latest

2021-03-18 Thread hudeqi (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17304592#comment-17304592
 ] 

hudeqi commented on KAFKA-12478:


Thank you very much for your reply! I belong to the Kuaishou message-oriented 
middleware group, which is mainly responsible for the secondary development 
and customization of Kafka.

  Kafka is widely used throughout the company, and the case raised in this 
issue was recently discovered by a business partner who is very sensitive to 
data loss. This alarmed us: the company has a large number of topics, 
add-partition is a relatively high-frequency operation, and a considerable 
part of the business uses the latest reset policy. If the consumer client 
perceives the expansion later than the producer client, data will definitely 
be lost. For a storage middleware, losing data is a serious problem. *Although 
this problem can be avoided by configuring earliest, that is not elegant, and 
the company uses clients in many other languages, such as rdkafka, Go, Python, 
etc. We want the fix to be transparent to the client while losing no data; 
moreover, if the amount of topic data is large, "earliest" may also put some 
pressure on the Kafka servers. So we want to optimize the server-side logic to 
solve this case almost completely.*

  Looking forward to your reply!




--
This message was sent by Atlassian Jira
(v8.3.4#803005)




[jira] [Commented] (KAFKA-12478) Consumer group may lose data for newly expanded partitions when add partitions for topic if the group is set to consume from the latest

2021-03-17 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17303739#comment-17303739
 ] 

Guozhang Wang commented on KAFKA-12478:
---

Hello [~hudeqi], this is a valid concern. One workaround for now is to 1) set 
the config to earliest, noting that this only takes effect if there are no 
committed offsets, and 2) when starting a consumer for the first time on a new 
topic, manually reset to latest via consumer.seekToEnd() -> 
consumer.commitSync() (you can even skip the second step if you are not 
depending on the subscription group protocol to distribute partitions for 
you). A sketch of this follows below.
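
A minimal sketch of that workaround, assuming manual assignment so the group 
protocol is skipped (per the parenthetical above); the topic, group id, and 
broker address are hypothetical:

{code:java}
// Sketch of the workaround: auto.offset.reset=earliest covers partitions
// added later, while an initial seekToEnd + commit skips pre-existing data
// exactly once.
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

public class EarliestWithInitialSkip {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        // 1) earliest: partitions without committed offsets (e.g. newly added
        //    ones) are consumed from the beginning, so nothing is skipped.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            List<TopicPartition> parts = new ArrayList<>();
            for (PartitionInfo p : consumer.partitionsFor("monitor-topic")) {
                parts.add(new TopicPartition(p.topic(), p.partition()));
            }
            consumer.assign(parts);

            // 2) On the very first start only: partitions with no committed
            //    offset get their position moved to the log end and committed.
            Map<TopicPartition, OffsetAndMetadata> committed =
                consumer.committed(new HashSet<>(parts));
            List<TopicPartition> fresh = new ArrayList<>();
            for (TopicPartition tp : parts) {
                if (committed.get(tp) == null) {
                    fresh.add(tp);
                }
            }
            if (!fresh.isEmpty()) {
                consumer.seekToEnd(fresh);
                Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
                for (TopicPartition tp : fresh) {
                    // position() forces the lazy seekToEnd to resolve.
                    toCommit.put(tp, new OffsetAndMetadata(consumer.position(tp)));
                }
                consumer.commitSync(toCommit);
            }

            // With manual assignment, partitions added later must be picked up
            // by periodically re-checking partitionsFor() and re-assigning
            // (omitted here for brevity).
            while (true) {
                for (ConsumerRecord<String, String> r : consumer.poll(Duration.ofSeconds(1))) {
                    System.out.printf("p=%d o=%d v=%s%n", r.partition(), r.offset(), r.value());
                }
            }
        }
    }
}
{code}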




--
This message was sent by Atlassian Jira
(v8.3.4#803005)