[ https://issues.apache.org/jira/browse/KAFKA-13463?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
RivenSun updated KAFKA-13463:
-----------------------------
Component/s: consumer
Affects Version/s: 3.0.0
> Improvement: KafkaConsumer pause(Collection<TopicPartition> partitions)
> -----------------------------------------------------------------------
>
> Key: KAFKA-13463
> URL: https://issues.apache.org/jira/browse/KAFKA-13463
> Project: Kafka
> Issue Type: Improvement
> Components: consumer
> Affects Versions: 3.0.0
> Reporter: RivenSun
> Priority: Major
>
> h1. 1. Background
> When using KafkaConsumer#pause(...), users may overlook that the pause can
> silently stop taking effect. poll() then returns records, and data may be
> lost if the application discards them.
> For example, consider the following simple code:
> {code:java}
> while (true) {
>     try {
>         // Pause every currently assigned partition before each poll.
>         kafkaConsumer.pause(kafkaConsumer.assignment());
>         ConsumerRecords<String, String> records =
>                 kafkaConsumer.poll(Duration.ofSeconds(2));
>         // Expected to be empty. A non-empty result means records arrived
>         // despite the pause, and this loop discards them.
>         if (!records.isEmpty()) {
>             log.error("kafka poll for rebalance discard some record!");
>         }
>     } catch (Exception e) {
>         log.error("maintain poll for rebalance with error:{}",
>                 e.getMessage(), e);
>     }
> }
> {code}
> Even if you call pause(assignment) before every call to poll(), poll() may
> still return records.
>
> h1. 2. Root Cause
> In short, during a group rebalance,
> ConsumerCoordinator#invokePartitionsRevoked(...) clears the paused mark of
> the partitions previously held by the KafkaConsumer. However, it does not
> clear the records already buffered in memory (Fetcher.completedFetches) for
> those paused partitions. As a result, Fetcher#fetchedRecords() still picks up
> these records and returns them to the user.
> For a more detailed analysis, see KAFKA-13425:
> https://issues.apache.org/jira/browse/KAFKA-13425
> Looking forward to your reply.
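A heavily simplified, hypothetical model of the sequence above may help. None of these classes exist in Kafka:

* BufferedFetch stands in for one entry of Fetcher.completedFetches.
* pausedPartitions stands in for the paused flags in SubscriptionState.
* Partitions are plain strings such as "orders-0".

The model also assumes the partition is assigned to the same consumer again after the rebalance.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for one entry of Fetcher.completedFetches.
class BufferedFetch {
    final String partition;
    final List<String> records;

    BufferedFetch(String partition, List<String> records) {
        this.partition = partition;
        this.records = records;
    }
}

// Hypothetical model of the bug; not Kafka code.
class PauseBugModel {
    final Set<String> assigned = new HashSet<>();
    final Set<String> pausedPartitions = new HashSet<>();
    final Deque<BufferedFetch> completedFetches = new ArrayDeque<>();

    void pause(Set<String> partitions) {
        pausedPartitions.addAll(partitions);
    }

    // Models poll(): an optional rebalance runs first, then buffered data is
    // collected, i.e. after the user's pause() call has already happened.
    List<String> poll(boolean rebalanceDuringPoll) {
        if (rebalanceDuringPoll) {
            // The paused flags are cleared, but completedFetches is untouched.
            pausedPartitions.clear();
        }
        return fetchedRecords();
    }

    // Models fetchedRecords(): data of assigned, non-paused partitions is
    // returned; data of paused partitions stays buffered for a later poll.
    private List<String> fetchedRecords() {
        List<String> out = new ArrayList<>();
        int pending = completedFetches.size();
        for (int i = 0; i < pending; i++) {
            BufferedFetch fetch = completedFetches.poll();
            if (!assigned.contains(fetch.partition)) {
                continue; // no longer assigned: drop the data
            }
            if (pausedPartitions.contains(fetch.partition)) {
                completedFetches.addLast(fetch); // paused: keep it buffered
            } else {
                out.addAll(fetch.records);
            }
        }
        return out;
    }
}
```

In this model, calling pause(assignment) before every poll does not help. The flags are cleared inside poll(), after pause() has already run, so the buffered records leak out.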
>
> h1. 3. Discussion: Can KafkaConsumer support a pause method that is not
> affected by group rebalances?
> The design of KafkaConsumer#pause stated this point from the very beginning:
> * Rebalance does not preserve pause/resume state.
> link: https://issues.apache.org/jira/browse/KAFKA-2350
> Unfortunately, this is not mentioned in the Javadoc of
> KafkaConsumer#pause(...). In addition,
> ConsumerCoordinator#invokePartitionsRevoked does not log anything when it
> clears the paused marks. I believe this leads many users to use
> KafkaConsumer#pause(...) incorrectly.
> Still, I think it is necessary for KafkaConsumer to provide a pause method
> that is not affected by group rebalances.
>
> h1. 4. Suggestions
> Below I propose several ways to improve the existing pause method or to add
> new {{pause}} methods. Each point is an independent solution.
> h2. 1) ConsumerCoordinator#invokePartitionsRevoked should also make the Fetcher
> discard the buffered records of revokedAndPausedPartitions when it clears the
> paused marks
> This prevents Fetcher#fetchedRecords() from treating the buffered records of
> revokedAndPausedPartitions as valid and returning them. fetchedRecords
> performs various checks on each partition, but once the paused mark is gone,
> none of them rejects these records.
> The cost is this: if the user does not call pause(...) again before the next
> poll(), a new Fetch request may be sent for these partitions, causing
> additional network traffic.
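A minimal sketch of suggestion 1) follows, using the same kind of simplified model. RevokeCleanupModel and its members are hypothetical; this is not Fetcher or ConsumerCoordinator code. The point is that the paused flags and the buffered data of revoked-and-paused partitions are cleaned up in one step.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

// Hypothetical model: "buffered" stands in for Fetcher.completedFetches and
// "paused" for the paused flags in SubscriptionState.
class RevokeCleanupModel {
    static final class Buffered {
        final String partition;
        final List<String> records;

        Buffered(String partition, List<String> records) {
            this.partition = partition;
            this.records = records;
        }
    }

    final Set<String> paused = new HashSet<>();
    final Deque<Buffered> buffered = new ArrayDeque<>();

    void pause(Set<String> partitions) {
        paused.addAll(partitions);
    }

    // Runs where the paused flags of revoked partitions are cleared. Buffered
    // data of partitions that were revoked AND paused is dropped first, so it
    // can no longer leak out of fetchedRecords().
    void onPartitionsRevoked(Set<String> revoked) {
        Set<String> revokedAndPaused = new HashSet<>(revoked);
        revokedAndPaused.retainAll(paused);
        buffered.removeIf(b -> revokedAndPaused.contains(b.partition));
        paused.removeAll(revoked);
    }

    // Returns (and removes) the buffered data of non-paused partitions.
    List<String> fetchedRecords() {
        List<String> out = new ArrayList<>();
        Iterator<Buffered> it = buffered.iterator();
        while (it.hasNext()) {
            Buffered b = it.next();
            if (!paused.contains(b.partition)) {
                out.addAll(b.records);
                it.remove();
            }
        }
        return out;
    }
}
```

The extra Fetch request mentioned above is not modeled. In a real consumer, the dropped data would simply be fetched again once the partition is consumed.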
>
> h2. 2) Preserve the old paused marks on the KafkaConsumer side
> <1> In ConsumerCoordinator#onJoinPrepare(...), record all
> pausedTopicPartitions of the KafkaConsumer's current assignment.
> <2> In ConsumerCoordinator#onJoinComplete(...), apply the recorded
> pausedTopicPartitions to the new assignment and restore the paused marks of
> the partitions that are still assigned.
> {*}Note{*}: If the new assignment no longer contains some topicPartitions
> that were paused before the rebalance, their paused marks are lost
> permanently on the KafkaConsumer side. This holds even if the KafkaConsumer is
> assigned these partitions again in a later rebalance.
> At the end of KAFKA-13425, mentioned above, I gave draft code for this
> suggestion.
> <3> Consider consumers using the RebalanceProtocol.COOPERATIVE protocol, for
> example those using the currently supported PartitionAssignor
> CooperativeStickyAssignor. Code analysis shows that by default these consumers
> keep the old paused marks. Consumers using the RebalanceProtocol.EAGER protocol
> clear all paused marks by default.
> I suggest that KafkaConsumer behave consistently under both
> RebalanceProtocols. Otherwise the semantics of the existing
> KafkaConsumer#pause(...) are ambiguous and will greatly confuse users.
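Steps <1> and <2> can be sketched as a small helper. This is not the draft code from KAFKA-13425. The class PausedStateKeeper and its method signatures are hypothetical (they only borrow the onJoinPrepare/onJoinComplete names), and partitions are plain strings.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper for suggestion 2); a standalone model, not Kafka code.
class PausedStateKeeper {
    private Set<String> pausedBeforeRebalance = Collections.emptySet();

    // <1> Snapshot the paused partitions of the current assignment.
    void onJoinPrepare(Set<String> currentAssignment, Set<String> paused) {
        Set<String> snapshot = new HashSet<>(paused);
        snapshot.retainAll(currentAssignment);
        pausedBeforeRebalance = snapshot;
    }

    // <2> Return the partitions whose paused mark should be restored: those
    // paused before the rebalance that are still assigned afterwards. The
    // snapshot is then discarded, so partitions that left the assignment lose
    // their mark for good (see the Note).
    Set<String> onJoinComplete(Set<String> newAssignment) {
        Set<String> toRestore = new HashSet<>(pausedBeforeRebalance);
        toRestore.retainAll(newAssignment);
        pausedBeforeRebalance = Collections.emptySet();
        return toRestore;
    }
}
```

Suppose a-0 and b-0 were paused and the new assignment is {a-0, a-1}. Then only a-0 is re-paused. As the Note describes, b-0's mark is gone even if b-0 comes back in a later rebalance.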
>
> h2. 3) Propagate the paused marks of topicPartitions during the group rebalance
> In the JoinGroup request, each consumer member should report its
> pausedTopicPartitions in addition to the topics it subscribes to. The JoinGroup
> response received by the leader consumer would then contain all paused
> partitions of the entire group.
> The new assignment computed by the leader consumer should preserve the paused
> marks, and the leader should include them in its SyncGroup request.
> Then, after the group rebalance completes, a paused topicPartition that is
> assigned to a different consumer stays paused on that consumer.
> As a consequence, KafkaConsumer#paused() may return partitions on which this
> KafkaConsumer never called pause(Collection<TopicPartition> partitions).
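The leader-side computation implied by suggestion 3) is sketched below under loud assumptions. The existing consumer protocol metadata does not carry paused partitions, and GroupPausedStateLeader and pausedPerMember are hypothetical names. Members, topics and partitions are plain strings.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model of suggestion 3); not part of any Kafka protocol today.
class GroupPausedStateLeader {
    // reportedPaused: member -> paused partitions sent in its JoinGroup request.
    // newAssignment:  member -> partitions the leader assigns to it.
    // Returns:        member -> assigned partitions that member must keep paused.
    static Map<String, Set<String>> pausedPerMember(
            Map<String, Set<String>> reportedPaused,
            Map<String, Set<String>> newAssignment) {
        // Union of every member's paused partitions across the whole group.
        Set<String> groupPaused = new HashSet<>();
        for (Set<String> paused : reportedPaused.values()) {
            groupPaused.addAll(paused);
        }
        Map<String, Set<String>> result = new HashMap<>();
        for (Map.Entry<String, Set<String>> e : newAssignment.entrySet()) {
            Set<String> keepPaused = new HashSet<>(e.getValue());
            keepPaused.retainAll(groupPaused);
            result.put(e.getKey(), keepPaused);
        }
        return result;
    }
}
```

Suppose member c1 reports t-0 as paused and the leader moves t-0 to c2. The result tells c2 to keep t-0 paused. This is why c2's paused() could list a partition c2 never paused itself.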
>
> h2. 4) KafkaConsumer provides topic-level pause methods, including regular
> expression support
> {{KafkaConsumer#pause(Collection<String> topics)}}
> {{KafkaConsumer#pause(Pattern pattern)}}
> SubscriptionState.assignment already holds partition-level paused marks.
> Similarly, SubscriptionState needs a new instance variable ‘TopicState’ to
> store topic-level paused marks. The ‘TopicState’ data structure can be
> modeled on the existing TopicPartitionState.
> <1> ‘TopicState’ should not be affected by group rebalances; the rebalance
> process does not change its paused marks. TopicState is local in-memory state
> of a single KafkaConsumer and does not need to be passed to other consumers
> after the rebalance completes.
>
> <2> {{pause(Collection<String> topics)}} throws IllegalStateException if any
> of the provided topics is not currently subscribed by this consumer.
>
> <3> Fetcher's fetchedRecords() and sendFetches() should also take TopicState
> into account. They use it to decide whether to return records to the user and
> whether to send a Fetch request.
>
> <4> Provide KafkaConsumer#resume(Collection<String> topics) and
> KafkaConsumer#resume(Pattern pattern) to clear topic-level paused marks.
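A possible shape for the proposed ‘TopicState’ is sketched below; all names are hypothetical. One caveat for the proposed API: in Java, pause(Collection<String>) and the existing pause(Collection<TopicPartition>) have the same erasure, so KafkaConsumer cannot declare both. The sketch therefore uses pauseTopics/resumeTopics for the collection variants; the Pattern variants do not clash. The IllegalStateException check assumes every provided topic must be subscribed, mirroring the existing pause(Collection<TopicPartition>).

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;

// Hypothetical topic-level paused state, kept outside the assignment so a
// rebalance has nothing to clear (<1>).
class TopicPauseState {
    private final Set<String> subscribed;
    private final Set<String> pausedTopics = new HashSet<>();
    // Keyed by Pattern.pattern() because Pattern does not override equals().
    private final Map<String, Pattern> pausedPatterns = new HashMap<>();

    TopicPauseState(Set<String> subscribed) {
        this.subscribed = subscribed;
    }

    // <2> Reject topics this consumer is not subscribed to.
    void pauseTopics(Collection<String> topics) {
        for (String topic : topics) {
            if (!subscribed.contains(topic)) {
                throw new IllegalStateException("Not subscribed to topic: " + topic);
            }
        }
        pausedTopics.addAll(topics);
    }

    void pause(Pattern pattern) {
        pausedPatterns.put(pattern.pattern(), pattern);
    }

    // <4> Clear topic-level paused marks.
    void resumeTopics(Collection<String> topics) {
        pausedTopics.removeAll(topics);
    }

    void resume(Pattern pattern) {
        pausedPatterns.remove(pattern.pattern());
    }

    // <3> Consulted by the fetch path: partitions of a paused topic would be
    // neither fetched nor returned to the user.
    boolean isTopicPaused(String topic) {
        if (pausedTopics.contains(topic)) {
            return true;
        }
        for (Pattern p : pausedPatterns.values()) {
            if (p.matcher(topic).matches()) {
                return true;
            }
        }
        return false;
    }
}
```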
>
> h2. 5) KafkaConsumer provides a consumer-level pause method
> {{KafkaConsumer#pause()}}
> The existing pause method works on topicPartitions, which is sometimes too
> fine-grained. Moreover, because the paused mark is bound to the assignment, it
> is inevitably affected by group rebalances.
> <1> This may be the method users need most. After pause() is called, the
> KafkaConsumer marks itself as paused. poll() then checks the
> isKafkaConsumerPaused flag to decide whether to return records to the user
> and whether to send a Fetch request. The isKafkaConsumerPaused flag is held by
> the individual KafkaConsumer itself.
>
> <2> Users no longer need to worry about poll() returning data after calling
> KafkaConsumer#pause().
> Users can keep calling poll() while paused. This avoids the following two
> consequences of not calling poll() for a long time:
> (1) The heartbeat thread detects that poll() has not been called within
> max.poll.interval.ms and makes the KafkaConsumer proactively leave the group.
> (2) If a group rebalance is triggered in the meantime, the group coordinator
> waits for this consumer to send a JoinGroup request. The rebalance then cannot
> complete for a long time (up to max.poll.interval.ms), and all consumers in
> the entire group stop consuming meanwhile.
>
> <3> Provide a consumer-level KafkaConsumer#resume() to clear the
> KafkaConsumer's paused mark.
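A minimal sketch of the consumer-level flag follows. ConsumerLevelPause and its fetchSource supplier are hypothetical stand-ins for KafkaConsumer and its fetch path; KafkaConsumer has no argument-less pause()/resume() today.

```java
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical consumer-level pause (suggestion 5); not Kafka code.
class ConsumerLevelPause<R> {
    private final Supplier<List<R>> fetchSource; // stand-in for the fetch path
    private boolean isKafkaConsumerPaused = false;
    private long pollCount = 0;

    ConsumerLevelPause(Supplier<List<R>> fetchSource) {
        this.fetchSource = fetchSource;
    }

    // <1> Mark the whole consumer as paused. The flag is not part of the
    // assignment, so nothing in a rebalance resets it.
    void pause() {
        isKafkaConsumerPaused = true;
    }

    // <3> Clear the consumer-level paused mark.
    void resume() {
        isKafkaConsumerPaused = false;
    }

    // <2> poll() may be called freely while paused. It still counts as a poll
    // (in a real consumer, that is what keeps max.poll.interval.ms satisfied),
    // but it neither fetches nor returns records.
    List<R> poll() {
        pollCount++;
        if (isKafkaConsumerPaused) {
            return Collections.emptyList();
        }
        return fetchSource.get();
    }

    long pollCount() {
        return pollCount;
    }
}
```

Because the check happens before any fetch, a paused consumer can sit in its usual poll loop without receiving data or sending Fetch requests.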
--
This message was sent by Atlassian Jira
(v8.20.1#820001)