[ https://issues.apache.org/jira/browse/KAFKA-13463?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Guozhang Wang updated KAFKA-13463:
----------------------------------
    Labels: new-consumer-threading-should-fix  (was: new-rebalance-should-fix)

> Improvement: KafkaConsumer pause(Collection<TopicPartition> partitions)
> -----------------------------------------------------------------------
>
>                 Key: KAFKA-13463
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13463
>             Project: Kafka
>          Issue Type: Improvement
>          Components: consumer
>    Affects Versions: 3.0.0
>            Reporter: RivenSun
>            Priority: Major
>              Labels: new-consumer-threading-should-fix
>
>
> h1. 1. Background
> When users call KafkaConsumer#pause(...), they may overlook two risks: the pause can silently stop taking effect, and data can be lost as a result.
> For example, consider the following simple code:
> {code:java}
> while (true) {
>     try {
>         // Pause every assigned partition before each poll, so poll() is expected to return nothing.
>         kafkaConsumer.pause(kafkaConsumer.assignment());
>         ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(2));
>         if (!records.isEmpty()) {
>             // Reached when a rebalance inside poll() cleared the paused marks; the application discards these records.
>             log.error("kafka poll for rebalance discard some record!");
>         }
>     } catch (Exception e) {
>         log.error("maintain poll for rebalance with error:{}", e.getMessage(), e);
>     }
> }
> {code}
> Even though pause(assignment) is called before every poll, poll may still return records.
>
> h1. 2. Root cause
> In short, during a group rebalance, ConsumerCoordinator#invokePartitionsRevoked(...) clears the paused mark on the partitions the KafkaConsumer previously held. However, clearing the paused marks does not clear the records already fetched for those paused partitions and buffered in memory (Fetcher.completedFetches). As a result, Fetcher#fetchedRecords() still picks up these records and returns them to the user.
> For a more detailed analysis, see https://issues.apache.org/jira/browse/KAFKA-13425
> Looking forward to your reply.
>
> h1. 3. Discussion: Can KafkaConsumer support a pause method that is not affected by group rebalance?
> When KafkaConsumer#pause was designed, one point was stated from the very beginning:
> * Rebalance does not preserve pause/resume state.
> link: https://issues.apache.org/jira/browse/KAFKA-2350
> Unfortunately, this is not mentioned in the Javadoc of KafkaConsumer#pause(...). In addition, ConsumerCoordinator#invokePartitionsRevoked does not log anything when it clears the paused marks. I believe this will lead many users to use KafkaConsumer#pause(...) incorrectly.
> Still, I think KafkaConsumer needs to provide a pause method that is not affected by group rebalance.
>
> h1. 4. Suggestions
> Below are several ways to improve the existing pause method or to add new {{pause}} methods. Each point is an independent solution.
> h2. 1) ConsumerCoordinator#invokePartitionsRevoked should also make the Fetcher discard the buffered records of revokedAndPausedPartitions when it clears the paused marks
> This prevents Fetcher#fetchedRecords() from wrongly treating revokedAndPausedPartitions as fetchable and returning their records. fetchedRecords already performs various checks on each partition.
> The cost is that if the user does not call pause(...) again before the next poll, a new fetch request may be sent, causing additional network traffic.
>
> h2. 2) Preserve the old paused marks on the KafkaConsumer side
> <1> In ConsumerCoordinator#onJoinPrepare(...), record all pausedTopicPartitions in the KafkaConsumer's current assignment;
> <2> In ConsumerCoordinator#onJoinComplete(...), apply pausedTopicPartitions to the new assignment and restore the paused marks of the partitions that are still assigned.
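The following toy model in Java illustrates the root cause and suggestions 1) and 2). It is a sketch under stated assumptions, not Kafka code. The class PauseRebalanceModel and its methods are hypothetical, and partitions are plain strings. The completedFetches map only stands in for Fetcher.completedFetches. rebalance() models an EAGER rebalance that revokes every partition and clears all paused marks. The two constructor flags switch on suggestion 1), which drops the buffered records of revoked-and-paused partitions, and suggestion 2), which remembers the paused partitions before the rebalance and restores those still assigned afterwards.

```java
import java.util.*;

// Hypothetical toy model of the consumer-side state discussed in this issue; it is NOT Kafka's
// Fetcher or SubscriptionState code. Partitions are plain strings such as "t-0".
class PauseRebalanceModel {
    private final Set<String> assignment = new TreeSet<>();
    private final Set<String> paused = new TreeSet<>();
    // Stand-in for Fetcher.completedFetches: records fetched but not yet returned by poll().
    private final Map<String, List<String>> completedFetches = new TreeMap<>();
    private final boolean dropBufferedOnRevoke; // suggestion 1)
    private final boolean restorePausedMarks;   // suggestion 2)

    PauseRebalanceModel(boolean dropBufferedOnRevoke, boolean restorePausedMarks) {
        this.dropBufferedOnRevoke = dropBufferedOnRevoke;
        this.restorePausedMarks = restorePausedMarks;
    }

    void assign(Collection<String> partitions) {
        assignment.clear();
        assignment.addAll(partitions);
        paused.retainAll(assignment); // marks of unassigned partitions are dropped
    }

    Set<String> assignment() { return new TreeSet<>(assignment); }

    Set<String> paused() { return new TreeSet<>(paused); }

    void pause(Collection<String> partitions) {
        for (String p : partitions) {
            if (!assignment.contains(p)) throw new IllegalStateException("not assigned: " + p);
        }
        paused.addAll(partitions);
    }

    // Simulates a fetch response whose records are buffered in memory.
    void bufferFetched(String partition, String record) {
        completedFetches.computeIfAbsent(partition, k -> new ArrayList<>()).add(record);
    }

    // EAGER-style rebalance: every partition is revoked, then newAssignment is installed.
    void rebalance(Collection<String> newAssignment) {
        // Suggestion 2) <1>: remember the paused partitions (onJoinPrepare).
        Set<String> pausedBefore = new TreeSet<>(paused);
        // Suggestion 1): discard the buffered records of the revoked-and-paused partitions.
        if (dropBufferedOnRevoke) {
            completedFetches.keySet().removeAll(pausedBefore);
        }
        // Root cause: revocation clears every paused mark but, by default, keeps the buffer.
        paused.clear();
        assign(newAssignment);
        // Suggestion 2) <2>: restore marks of partitions that are still assigned (onJoinComplete).
        if (restorePausedMarks) {
            for (String p : pausedBefore) {
                if (assignment.contains(p)) paused.add(p);
            }
        }
    }

    // Simplified stand-in for the per-partition checks in fetchedRecords(): return, keep, or discard.
    List<String> poll() {
        List<String> out = new ArrayList<>();
        Iterator<Map.Entry<String, List<String>>> it = completedFetches.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, List<String>> e = it.next();
            if (!assignment.contains(e.getKey())) {
                it.remove();                  // no longer assigned: discard
            } else if (!paused.contains(e.getKey())) {
                out.addAll(e.getValue());     // fetchable: hand the records to the user
                it.remove();
            }                                 // paused: keep buffered for a later poll
        }
        return out;
    }
}
```

With both flags off, the following sequence returns the buffered record: pause t-0, buffer one record, rebalance back to t-0, then poll. This is the symptom from the Background section. With either flag on, the same poll returns nothing. Suggestion 1) discards the record, which may later be fetched again as noted above. Suggestion 2) keeps the record buffered and the partition paused. With suggestion 2), rebalancing t-0 away to t-1 and back again leaves t-0 unpaused, which is the limitation described in the Note.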
> {*}Note{*}: If the new assignment no longer contains topicPartitions that were paused before the rebalance, their paused marks are lost for good on the KafkaConsumer side. This holds even if a later rebalance assigns these partitions to this KafkaConsumer again.
> At the end of KAFKA-13425, mentioned above, I gave a draft code suggestion for this point.
> <3> In fact, code analysis shows two different defaults. Consumers using the RebalanceProtocol.COOPERATIVE protocol, for example consumers using the currently supported CooperativeStickyAssignor, keep the old paused marks. Consumers using the RebalanceProtocol.EAGER protocol clear all paused marks.
> I suggest making KafkaConsumer behave consistently under both RebalanceProtocols. Otherwise, the semantics of the existing KafkaConsumer#pause(...) are ambiguous and will greatly confuse users.
>
> h2. 3) Pass the paused marks of topicPartitions through the group rebalance
> In the JoinGroup request, each consumer member should report its pausedTopicPartitions in addition to the topics it wants to subscribe to. The JoinGroup response received by the leader consumer should then contain all paused partitions in the whole group.
> The new assignment computed by the leader consumer should keep the paused marks and be included in the leader's SyncGroup request.
> This way, after the group rebalance completes, a paused topicPartition keeps its paused mark even if it is assigned to a different consumer.
> As a consequence, KafkaConsumer#paused() may return partitions for which this KafkaConsumer never called pause(Collection<TopicPartition> partitions).
>
> h2. 4) KafkaConsumer provides topic-level pause methods that support regular expressions
> {{KafkaConsumer#pause(Collection<String> topics)}}
> {{KafkaConsumer#pause(Pattern pattern)}}
> SubscriptionState needs a new instance variable, 'TopicState', to store the topic-level paused marks, similar to the paused mark in SubscriptionState.assignment. The data structure of 'TopicState' can follow the existing TopicPartitionState.
> <1> 'TopicState' should not be affected by group rebalance: its paused marks are not changed during the rebalance process. TopicState is in-memory state of a single KafkaConsumer and does not need to be passed to other consumers after the rebalance completes.
> <2> {{pause(Collection<String> topics)}} throws IllegalStateException if this consumer is not currently subscribed to any of the provided topics.
> <3> Fetcher#fetchedRecords() and sendFetches() can take TopicState into account when deciding whether to return records to the user or to send a fetch request.
> <4> Provide KafkaConsumer#resume(Collection<String> topics) and KafkaConsumer#resume(Pattern pattern) to clear the topic-level paused marks.
>
> h2. 5) KafkaConsumer provides a consumer-level pause method
> {{KafkaConsumer#pause()}}
> The existing pause method works on topicPartitions, which is sometimes too fine-grained. And because the paused mark is bound to the assignment, it is inevitably affected by group rebalance.
> <1> This method may also be what users need most urgently. After pause() is called, the KafkaConsumer marks itself as paused. The poll method then checks the isKafkaConsumerPaused value to decide whether to return records to the user or to send a fetch request. The isKafkaConsumerPaused mark should likewise be held by the single KafkaConsumer itself.
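The topic-level marks of suggestion 4) and the consumer-level mark of suggestion 5) can be modeled the same way. Again, this is a hypothetical sketch, not a proposed patch:
* LayeredPauseState and its methods are illustrative names.
* The pausedTopics set stands in for the proposed 'TopicState', and the consumerPaused flag stands in for isKafkaConsumerPaused.
* onRebalance() models only the existing reset of partition-level marks.

The code also labels two further assumptions. First, pause(Pattern) pauses the currently subscribed topics that match. Second, pause(Collection<String>) throws if any given topic is not subscribed, mirroring how the existing partition-level pause(...) rejects unassigned partitions.

```java
import java.util.*;
import java.util.regex.Pattern;

// Hypothetical sketch of suggestions 4) and 5); not Kafka code. Topic-level and consumer-level
// marks live outside the assignment, so the simulated rebalance below does not reset them.
class LayeredPauseState {
    private final Set<String> subscription = new TreeSet<>();
    private final Set<String> pausedPartitions = new TreeSet<>(); // existing marks, keyed "topic-partition"
    private final Set<String> pausedTopics = new TreeSet<>();     // stand-in for the proposed 'TopicState'
    private boolean consumerPaused = false;                        // stand-in for isKafkaConsumerPaused

    void subscribe(Collection<String> topics) {
        subscription.clear();
        subscription.addAll(topics);
    }

    // Existing partition-level pause, simplified to a single partition.
    void pausePartition(String topic, int partition) {
        pausedPartitions.add(topic + "-" + partition);
    }

    // Suggestion 4) <2>. Assumption: reject the call if any given topic is not subscribed.
    void pause(Collection<String> topics) {
        for (String t : topics) {
            if (!subscription.contains(t)) throw new IllegalStateException("not subscribed: " + t);
        }
        pausedTopics.addAll(topics);
    }

    // Suggestion 4). Assumption: pauses the currently subscribed topics that fully match the regex.
    void pause(Pattern pattern) {
        for (String t : subscription) {
            if (pattern.matcher(t).matches()) pausedTopics.add(t);
        }
    }

    // Suggestion 4) <4>: clear topic-level marks.
    void resume(Collection<String> topics) { pausedTopics.removeAll(topics); }

    void resume(Pattern pattern) { pausedTopics.removeIf(t -> pattern.matcher(t).matches()); }

    // Suggestion 5) <1> and <3>: consumer-level pause and resume.
    void pause() { consumerPaused = true; }

    void resume() { consumerPaused = false; }

    // Models only the existing behavior: a rebalance clears the partition-level paused marks.
    void onRebalance() { pausedPartitions.clear(); }

    // Suggestions 4) <3> and 5) <1>: records are returned or fetched only if no layer is paused.
    boolean isFetchable(String topic, int partition) {
        return !consumerPaused
                && !pausedTopics.contains(topic)
                && !pausedPartitions.contains(topic + "-" + partition);
    }
}
```

isFetchable() checks all three layers. A rebalance can therefore clear the partition-level marks without resuming a paused topic or a paused consumer, which is the guarantee suggestions 4) <1> and 5) <1> ask for.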
>
> <2> After calling KafkaConsumer#pause(), users no longer need to worry about poll returning data.
> Users can keep calling poll. This avoids the following two outcomes of not calling poll for a long time:
> (1) The heartbeat thread's detection mechanism makes the KafkaConsumer proactively leave the group (leaveGroup);
> (2) A group rebalance is then triggered, and the group coordinator waits for this consumer to send a JoinGroup request. The rebalance cannot complete for a long time (bounded by max.poll.interval.ms), so all consumers in the group stop consuming.
>
> <3> Provide a consumer-level KafkaConsumer#resume() to clear the KafkaConsumer's paused mark.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)