[ https://issues.apache.org/jira/browse/KAFKA-13463?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Guozhang Wang updated KAFKA-13463:
----------------------------------
    Labels: new-consumer-threading-should-fix  (was: new-rebalance-should-fix)

> Improvement: KafkaConsumer pause(Collection<TopicPartition> partitions)
> -----------------------------------------------------------------------
>
>                 Key: KAFKA-13463
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13463
>             Project: Kafka
>          Issue Type: Improvement
>          Components: consumer
>    Affects Versions: 3.0.0
>            Reporter: RivenSun
>            Priority: Major
>              Labels: new-consumer-threading-should-fix
>
> h1. 1. Background
> When using KafkaConsumer#pause(...), users may overlook one thing: the pause 
> can silently stop taking effect, and records may be lost.
> For example, consider the following simple code:
> {code:java}
> while (true) {
>     try {
>         // Pause every currently assigned partition before each poll.
>         kafkaConsumer.pause(kafkaConsumer.assignment());
>         ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(2));
>         if (!records.isEmpty()) {
>             // Unexpected: records were returned although all partitions were paused.
>             log.error("kafka poll for rebalance discard some record!");
>         }
>     } catch (Exception e) {
>         log.error("maintain poll for rebalance with error:{}", e.getMessage(), e);
>     }
> }{code}
> Even if you call pause(assignment) before the poll method every time, the 
> poll method may still return messages.
>  
> h1. 2. Root Cause
> In short, during a group rebalance, 
> ConsumerCoordinator#invokePartitionsRevoked(...) clears the paused mark on 
> the partitions previously owned by the KafkaConsumer. However, when the 
> paused mark is cleared, the records already buffered in memory for those 
> paused partitions (Fetcher.completedFetches) are not cleared. As a result, 
> Fetcher#fetchedRecords() still picks up those records and returns them to 
> the user.
> For a more detailed analysis, see 
> https://issues.apache.org/jira/browse/KAFKA-13425 
> Looking forward to your reply.
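To make the mechanism concrete, here is a heavily simplified, stdlib-only model of it. This is not kafka-clients code. `ToyConsumerState`, `buffer`, `revokeAll` and `fetchedRecords` are hypothetical names that only mimic the roles of SubscriptionState, ConsumerCoordinator#invokePartitionsRevoked and Fetcher#fetchedRecords as described above. The model also ignores fetch offsets, positions and the other checks the real Fetcher performs. Partitions are plain strings such as "t-0".

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class PauseLossModel {

    // Toy stand-in for SubscriptionState + Fetcher; NOT the kafka-clients implementation.
    static class ToyConsumerState {
        final Set<String> assigned = new HashSet<>();
        final Set<String> paused = new HashSet<>();
        // Plays the role of Fetcher.completedFetches: records fetched but not yet returned.
        final Map<String, List<String>> completedFetches = new HashMap<>();

        void assign(Collection<String> partitions) {
            assigned.addAll(partitions);
        }

        void pause(Collection<String> partitions) {
            paused.addAll(partitions);
        }

        void buffer(String partition, String record) {
            completedFetches.computeIfAbsent(partition, p -> new ArrayList<>()).add(record);
        }

        // Eager revocation as described in the issue: the assignment and paused marks
        // are dropped, but the buffered records are left untouched.
        void revokeAll() {
            assigned.clear();
            paused.clear();
        }

        // Hands out buffered records of partitions that are assigned and not paused.
        List<String> fetchedRecords() {
            List<String> out = new ArrayList<>();
            Iterator<Map.Entry<String, List<String>>> it = completedFetches.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<String, List<String>> e = it.next();
                if (assigned.contains(e.getKey()) && !paused.contains(e.getKey())) {
                    out.addAll(e.getValue());
                    it.remove();
                }
            }
            return out;
        }
    }

    public static void main(String[] args) {
        ToyConsumerState c = new ToyConsumerState();
        c.assign(List.of("t-0"));
        c.buffer("t-0", "r1");                  // a fetch completed earlier
        c.pause(List.of("t-0"));                // user pauses before poll
        System.out.println(c.fetchedRecords()); // prints []

        c.revokeAll();                          // rebalance: paused mark cleared
        c.assign(List.of("t-0"));               // same partition assigned back
        System.out.println(c.fetchedRecords()); // prints [r1]
    }
}
```

The second call returns the record that was buffered while the partition was paused. This matches the symptom shown in the Background example.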
>  
> h1. 3. Discussion: Can KafkaConsumer support a pause method that is not affected by group rebalances?
> When KafkaConsumer#pause was first designed, one point was stated explicitly:
>  * Rebalance does not preserve pause/resume state.
> link:https://issues.apache.org/jira/browse/KAFKA-2350
> Unfortunately, I did not find this mentioned in the Javadoc of 
> KafkaConsumer#pause(...). In addition, 
> ConsumerCoordinator#invokePartitionsRevoked does not log anything when it 
> clears the paused marks. I believe this leads many users to use 
> KafkaConsumer#pause(...) incorrectly.
> Still, I think KafkaConsumer should provide a pause method that is not 
> affected by group rebalances.
>  
> h1. 4. Suggestions
> Below, I propose several ways to improve the existing pause method or to add 
> new {{pause}} methods, each from a different perspective. Each point is an 
> independent solution.
> h2. 1) ConsumerCoordinator#invokePartitionsRevoked should also make the Fetcher discard the in-memory records of revokedAndPausedPartitions when clearing the paused mark
> This prevents Fetcher#fetchedRecords() from mistakenly treating 
> revokedAndPausedPartitions as valid and returning their records. 
> (fetchedRecords performs various checks on each partition before returning 
> its records.)
> The cost is that if the user does not call pause(...) again before the next 
> poll, a new Fetch request may be sent for those partitions, which causes 
> additional network traffic.
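A toy sketch of suggestion 1 follows, using the same kind of hypothetical model. All names are illustrative and are not the real Fetcher API. When revocation removes a paused mark, it also discards the records buffered for that partition, so those records cannot leak after the partition is assigned back.

```java
import java.util.*;

public class RevokeDiscardsBufferModel {

    // Toy model of suggestion 1; class and method names are hypothetical.
    static class ToyConsumerState {
        final Set<String> assigned = new HashSet<>();
        final Set<String> paused = new HashSet<>();
        final Map<String, List<String>> completedFetches = new HashMap<>();

        void assign(Collection<String> partitions) { assigned.addAll(partitions); }

        void pause(Collection<String> partitions) { paused.addAll(partitions); }

        void buffer(String tp, String record) {
            completedFetches.computeIfAbsent(tp, k -> new ArrayList<>()).add(record);
        }

        // Revocation that clears the paused mark AND drops the buffered records of
        // partitions that were both revoked and paused (revokedAndPausedPartitions).
        void revoke(Collection<String> revoked) {
            for (String tp : new ArrayList<>(revoked)) { // copy: 'revoked' may be 'assigned'
                if (paused.remove(tp)) {
                    completedFetches.remove(tp);
                }
                assigned.remove(tp);
            }
        }

        // Hands out buffered records of partitions that are assigned and not paused.
        List<String> fetchedRecords() {
            List<String> out = new ArrayList<>();
            Iterator<Map.Entry<String, List<String>>> it = completedFetches.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<String, List<String>> e = it.next();
                if (assigned.contains(e.getKey()) && !paused.contains(e.getKey())) {
                    out.addAll(e.getValue());
                    it.remove();
                }
            }
            return out;
        }
    }

    public static void main(String[] args) {
        ToyConsumerState c = new ToyConsumerState();
        c.assign(List.of("t-0", "t-1"));
        c.buffer("t-0", "r1");
        c.buffer("t-1", "r2");
        c.pause(List.of("t-0"));                // only t-0 is paused
        c.revoke(c.assigned);                   // eager rebalance revokes both
        c.assign(List.of("t-0", "t-1"));        // both come back
        System.out.println(c.fetchedRecords()); // prints [r2]
    }
}
```

Only t-1's record is returned. t-0 would need a fresh Fetch request on a later poll, which is the extra network cost mentioned above.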
>  
> h2. 2) Preserve the old paused marks on the KafkaConsumer side
> <1> In ConsumerCoordinator#onJoinPrepare(...), record all 
> pausedTopicPartitions in the KafkaConsumer's current assignment;
> <2> In ConsumerCoordinator#onJoinComplete(...), check the new assignment 
> against pausedTopicPartitions and restore the paused mark of every partition 
> that is still in the new assignment.
> {*}Note{*}: If the new assignment no longer contains topicPartitions that 
> were paused before the rebalance, their paused marks are lost for good on the 
> KafkaConsumer side, even if a later rebalance assigns those partitions back 
> to this consumer.
> At the end of KAFKA-13425 (mentioned above), I gave a draft code suggestion 
> for this point.
> <3> In fact, code analysis shows that consumers using the 
> RebalanceProtocol.COOPERATIVE protocol (for example, with the currently 
> supported PartitionAssignor CooperativeStickyAssignor) keep the old paused 
> marks of the partitions they retain by default. In contrast, consumers using 
> the RebalanceProtocol.EAGER protocol clear all paused marks by default.
> I suggest making KafkaConsumer behave consistently under both 
> RebalanceProtocols. Otherwise the semantics of the existing 
> KafkaConsumer#pause(...) stay ambiguous and cause great confusion for users.
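The save-and-restore idea of steps <1> and <2> can be sketched as follows. `ToyCoordinator` and its methods are hypothetical. They are only named after ConsumerCoordinator#onJoinPrepare and #onJoinComplete, and they model an EAGER rebalance in which everything is revoked first. This is the case where, according to point <3>, the marks are lost today.

```java
import java.util.*;

public class PreservePausedModel {

    // Hypothetical hooks named after ConsumerCoordinator#onJoinPrepare / #onJoinComplete;
    // a toy model, not the real coordinator code.
    static class ToyCoordinator {
        final Set<String> assigned = new HashSet<>();
        final Set<String> paused = new HashSet<>();
        private Set<String> pausedBeforeRebalance = Set.of();

        void pause(Collection<String> partitions) { paused.addAll(partitions); }

        // <1> remember which partitions were paused before the (eager) rebalance
        void onJoinPrepare() {
            pausedBeforeRebalance = new HashSet<>(paused);
            assigned.clear();   // eager protocol: everything is revoked
            paused.clear();     // ... and the paused marks go with it
        }

        // <2> re-apply the old paused marks to partitions that are still owned
        void onJoinComplete(Collection<String> newAssignment) {
            assigned.addAll(newAssignment);
            for (String tp : newAssignment) {
                if (pausedBeforeRebalance.contains(tp)) {
                    paused.add(tp);
                }
            }
            // Paused partitions that were not re-assigned are forgotten (see the Note).
            pausedBeforeRebalance = Set.of();
        }
    }

    public static void main(String[] args) {
        ToyCoordinator c = new ToyCoordinator();
        c.onJoinComplete(List.of("t-0", "t-1"));
        c.pause(List.of("t-0", "t-1"));
        c.onJoinPrepare();
        c.onJoinComplete(List.of("t-0", "t-2"));
        System.out.println(new TreeSet<>(c.paused)); // prints [t-0]
    }
}
```

After the rebalance, t-0 is paused again. t-1 is no longer owned, so its mark is dropped (the case covered by the Note). t-2 is newly assigned and starts unpaused.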
>  
> h2. 3) Propagate the paused flag of topicPartitions during the group rebalance
> In the JoinGroup request, each consumer member should report its 
> pausedTopicPartitions in addition to the topics it wants to subscribe to. The 
> JoinGroup response received by the leader consumer should then contain all 
> paused partitions of the entire group.
> The new assignment computed by the leader should keep the paused marks and 
> be sent in the leader's SyncGroup request.
> This way, after the rebalance completes, a paused topicPartition keeps its 
> paused mark even if it is assigned to a different consumer.
> As a consequence, KafkaConsumer#paused() may return partitions for which this 
> KafkaConsumer never called pause(Collection<TopicPartition> partitions) 
> itself.
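Below is a rough sketch of the leader-side merge in suggestion 3. It assumes each member's JoinGroup metadata carries a set of paused partitions, as this suggestion proposes. `JoinRequest` and `assignWithPausedMarks` are hypothetical. They are not the real JoinGroup/SyncGroup request classes and only show how the leader could attach a paused flag to every partition it hands out.

```java
import java.util.*;

public class PropagatePausedModel {

    // Hypothetical: what each member would report in its JoinGroup request under suggestion 3.
    static final class JoinRequest {
        final String memberId;
        final Set<String> pausedPartitions;

        JoinRequest(String memberId, Set<String> pausedPartitions) {
            this.memberId = memberId;
            this.pausedPartitions = pausedPartitions;
        }
    }

    // Hypothetical leader step: merge every member's paused partitions, then attach the
    // paused mark to each partition of the new assignment (to be sent via SyncGroup).
    static Map<String, Map<String, Boolean>> assignWithPausedMarks(
            List<JoinRequest> joins, Map<String, List<String>> assignment) {
        Set<String> groupPaused = new HashSet<>();
        for (JoinRequest j : joins) {
            groupPaused.addAll(j.pausedPartitions);
        }
        Map<String, Map<String, Boolean>> result = new TreeMap<>();
        for (Map.Entry<String, List<String>> e : assignment.entrySet()) {
            Map<String, Boolean> owned = new TreeMap<>();
            for (String tp : e.getValue()) {
                owned.put(tp, groupPaused.contains(tp));
            }
            result.put(e.getKey(), owned);
        }
        return result;
    }

    public static void main(String[] args) {
        List<JoinRequest> joins = List.of(
                new JoinRequest("c1", Set.of("t-0")),
                new JoinRequest("c2", Set.of()));
        // The assignor moves the paused partition t-0 from c1 to c2.
        Map<String, List<String>> assignment = Map.of(
                "c1", List.of("t-1"),
                "c2", List.of("t-0"));
        System.out.println(assignWithPausedMarks(joins, assignment));
        // prints {c1={t-1=false}, c2={t-0=true}}
    }
}
```

Partition t-0 moves from c1 to c2 and keeps its paused mark. c2's paused() would therefore report it, even though c2 never called pause() on it.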
>  
> h2. 4) KafkaConsumer provides topic-level pause methods, including one that accepts a regular expression
> {{KafkaConsumer#pause(Collection<String> topics)}}
> {{KafkaConsumer#pause(Pattern pattern)}}
> Similar to the paused mark in SubscriptionState.assignment, SubscriptionState 
> needs a new instance variable ‘TopicState’ to store the topic-level paused 
> marks. Its data structure can be modeled on the existing TopicPartitionState.
> <1> ‘TopicState’ should not be affected by group rebalances: its paused marks 
> are not changed during the rebalance process. TopicState is in-memory state 
> of a single KafkaConsumer and does not need to be passed to other consumers 
> after the rebalance completes.
>  
> <2> {{pause(Collection<String> topics)}} throws IllegalStateException if any 
> of the provided topics is not currently subscribed to by this consumer.
>  
> <3> Fetcher#fetchedRecords() and Fetcher#sendFetches() should also take 
> TopicState into account when deciding whether to return records to the user 
> or to send a Fetch request.
>  
> <4> Provide KafkaConsumer#resume(Collection<String> topics) and 
> KafkaConsumer#resume(Pattern pattern) methods to clean up topic-level paused 
> marks.
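A minimal, stdlib-only sketch of the per-consumer 'TopicState' from suggestion 4 follows. `TopicPauseState` and `isFetchable` are hypothetical names, not existing kafka-clients classes. The sketch makes two assumptions. First, pause(Pattern) applies only to topics the consumer is subscribed to when it is called. Second, point <2> rejects the whole call if any provided topic is not subscribed.

```java
import java.util.*;
import java.util.regex.Pattern;

public class TopicPauseModel {

    // Hypothetical per-consumer 'TopicState' from suggestion 4. It is never
    // modified by a rebalance and never sent to other group members.
    static class TopicPauseState {
        private final Set<String> subscribedTopics;
        private final Set<String> pausedTopics = new HashSet<>();

        TopicPauseState(Collection<String> subscribedTopics) {
            this.subscribedTopics = new HashSet<>(subscribedTopics);
        }

        // <2>: reject the whole call if any topic is not subscribed (assumed semantics).
        void pause(Collection<String> topics) {
            for (String t : topics) {
                if (!subscribedTopics.contains(t)) {
                    throw new IllegalStateException("Not subscribed to topic: " + t);
                }
            }
            pausedTopics.addAll(topics);
        }

        // Pauses every currently subscribed topic whose full name matches the pattern.
        void pause(Pattern pattern) {
            for (String t : subscribedTopics) {
                if (pattern.matcher(t).matches()) {
                    pausedTopics.add(t);
                }
            }
        }

        // <4>: clear topic-level paused marks.
        void resume(Collection<String> topics) {
            pausedTopics.removeAll(topics);
        }

        void resume(Pattern pattern) {
            pausedTopics.removeIf(t -> pattern.matcher(t).matches());
        }

        // <3>: the extra check fetchedRecords()/sendFetches() would consult on top of
        // the existing partition-level paused flag.
        boolean isFetchable(String topic, boolean partitionPaused) {
            return !partitionPaused && !pausedTopics.contains(topic);
        }
    }

    public static void main(String[] args) {
        TopicPauseState s = new TopicPauseState(List.of("orders", "orders-retry", "payments"));
        s.pause(Pattern.compile("orders.*"));
        System.out.println(s.isFetchable("orders-retry", false)); // prints false
        System.out.println(s.isFetchable("payments", false));     // prints true
        s.resume(Pattern.compile("orders.*"));
        System.out.println(s.isFetchable("orders", false));       // prints true
        try {
            s.pause(List.of("unknown-topic"));
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // prints Not subscribed to topic: unknown-topic
        }
    }
}
```

A design alternative would be to store the Pattern itself, so that topics subscribed later are paused as well. The sketch keeps the simpler snapshot semantics.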
>  
> h2. 5) KafkaConsumer provides a consumer-level pause method
> {{KafkaConsumer#pause()}}
> The existing pause method works on topicPartitions, which is sometimes too 
> fine-grained. And because the paused mark is bound to the assignment, it is 
> inevitably affected by group rebalances.
> <1> This may be the method users need most urgently. After pause() is 
> called, the KafkaConsumer marks itself as paused, and poll() checks the value 
> of isKafkaConsumerPaused to decide whether to return records to the user or 
> to send a Fetch request. The isKafkaConsumerPaused mark should also be held 
> by the individual KafkaConsumer itself.
>  
> <2> Users no longer need to worry about poll() returning data after calling 
> KafkaConsumer#pause().
> Users can keep calling poll() while paused. This avoids the following two 
> problems that occur when a KafkaConsumer does not call poll() for a long 
> time:
>              (1) The heartbeat thread's detection mechanism makes the 
> KafkaConsumer proactively leave the group (leaveGroup);
>              (2) When a group rebalance is triggered, the groupCoordinator 
> waits for this consumer to send a JoinGroup request. The rebalance then 
> cannot complete for a long time (bounded by max.poll.interval.ms), and all 
> consumers in the group stop consuming in the meantime.
>  
> <3> Provide a consumer-level KafkaConsumer#resume() that clears the 
> KafkaConsumer's paused mark.
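The consumer-level flag in suggestion 5 can be modeled as below. `ToyConsumer`, `bufferFetched` and `pollCount` are hypothetical. The counter only stands in for the fact that poll() keeps being called, which is what keeps the consumer within max.poll.interval.ms. Because the flag is not stored in the assignment, a rebalance would have nothing to reset.

```java
import java.util.*;

public class ConsumerPauseModel {

    // Hypothetical consumer holding the single isKafkaConsumerPaused flag from suggestion 5.
    static class ToyConsumer {
        private boolean isKafkaConsumerPaused = false;
        private final Deque<String> buffered = new ArrayDeque<>();
        private int pollCount = 0; // stands in for the liveness bookkeeping poll() does

        void pause() { isKafkaConsumerPaused = true; }

        void resume() { isKafkaConsumerPaused = false; }

        void bufferFetched(String record) { buffered.add(record); }

        int pollCount() { return pollCount; }

        // poll() always runs (keeping the consumer in the group). While the
        // consumer-level flag is set, it returns no buffered records (and, in the
        // proposal, would send no new Fetch requests).
        List<String> poll() {
            pollCount++;
            if (isKafkaConsumerPaused) {
                return List.of();
            }
            List<String> out = new ArrayList<>(buffered);
            buffered.clear();
            return out;
        }
    }

    public static void main(String[] args) {
        ToyConsumer c = new ToyConsumer();
        c.bufferFetched("r1");
        c.pause();
        System.out.println(c.poll());      // prints []
        System.out.println(c.poll());      // prints []
        c.resume();
        System.out.println(c.poll());      // prints [r1]
        System.out.println(c.pollCount()); // prints 3
    }
}
```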



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
