RivenSun created KAFKA-13463:
--------------------------------
Summary: Improvement: KafkaConsumer
pause(Collection<TopicPartition> partitions)
Key: KAFKA-13463
URL: https://issues.apache.org/jira/browse/KAFKA-13463
Project: Kafka
Issue Type: Improvement
Reporter: RivenSun
h1. 1. Background
Users of the KafkaConsumer#pause(...) method can easily overlook one thing: the
pause may silently stop taking effect, and data can then be lost.
For example, consider the following simple code:
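{code:java}
// kafkaConsumer is a subscribed KafkaConsumer<String, String>; log is the application's logger.
while (true) {
    try {
        // Pause every currently assigned partition before each poll.
        kafkaConsumer.pause(kafkaConsumer.assignment());
        ConsumerRecords<String, String> records =
                kafkaConsumer.poll(Duration.ofSeconds(2));
        // With all partitions paused, poll() is expected to return nothing.
        if (!records.isEmpty()) {
            log.error("kafka poll for rebalance discard some record!");
        }
    } catch (Exception e) {
        log.error("maintain poll for rebalance with error:{}", e.getMessage(), e);
    }
}
{code}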
Even if you call pause(assignment) before the poll method every time, the poll
method may still return messages.
h1. 2. Root Cause
In short, during a rebalance of the group,
ConsumerCoordinator#invokePartitionsRevoked(...) clears the paused mark on
the partitions previously held by the kafkaConsumer. However, when the paused
mark is cleared, the messages already buffered in memory
(Fetcher.completedFetches) for those paused partitions are not cleared, so
Fetcher#fetchedRecords() still picks them up and returns them to the
user.
For a more detailed analysis, see KAFKA-13425 ("KafkaConsumer#pause() will lose
its effect after groupRebalance occurs, which maybe cause data loss on the
consumer side"). I look forward to your reply.
h1. 3. Discussion: Can KafkaConsumer support a pause method that is not affected by groupRebalance?
The original design of the KafkaConsumer#pause method, in KAFKA-2350 ("Add
KafkaConsumer pause capability"), already stated one point:
* Rebalance does not preserve pause/resume state.
Unfortunately, I did not find this stated in the comments of the
KafkaConsumer#pause(...) method. In addition,
ConsumerCoordinator#invokePartitionsRevoked does not log anything when it
clears the paused mark. I believe this leads many users to use
the KafkaConsumer#pause(...) method incorrectly.
I therefore think KafkaConsumer needs to provide a pause method that is
not affected by groupRebalance.
h1. 4. Suggestions
Below I suggest several ways to improve the existing pause method or to
provide new {{pause}} methods. Each point is an independent solution.
h2. 1) ConsumerCoordinator#invokePartitionsRevoked should also trigger Fetcher to clean up the in-memory messages of revokedAndPausedPartitions when clearing the paused mark
This prevents the Fetcher#fetchedRecords() method from mistakenly treating
revokedAndPausedPartitions as legal and returning their messages. The
fetchedRecords method already performs various checks on the partition.
The price is that if the user does not call the pause(...) method
before the next call to poll, a new Fetch request may be
sent, which causes additional network traffic.
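To make suggestion 1) concrete, here is a minimal sketch using a toy model rather than actual Kafka code. The class RevokeCleanupSketch and all of its members are hypothetical stand-ins: {{buffered}} plays the role of Fetcher.completedFetches and {{paused}} the role of the paused marks. It only illustrates dropping the buffered messages in the same step that clears the paused mark.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model only: none of these names exist in Kafka.
class RevokeCleanupSketch {
    // Stand-in for Fetcher.completedFetches: messages buffered per partition.
    final Map<String, List<String>> buffered = new HashMap<>();
    // Stand-in for the paused marks kept with the assignment.
    final Set<String> paused = new HashSet<>();

    void buffer(String partition, String message) {
        buffered.computeIfAbsent(partition, p -> new ArrayList<>()).add(message);
    }

    // Suggested behavior: when a revoked partition loses its paused mark,
    // its buffered messages are dropped at the same time.
    void onPartitionsRevoked(Set<String> revoked) {
        for (String partition : revoked) {
            if (paused.remove(partition)) {
                buffered.remove(partition);
            }
        }
    }

    // Stand-in for Fetcher#fetchedRecords(): drains the buffered messages
    // of every partition that is not paused.
    List<String> fetchedRecords() {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, List<String>> e : buffered.entrySet()) {
            if (!paused.contains(e.getKey())) {
                out.addAll(e.getValue());
            }
        }
        buffered.keySet().removeIf(p -> !paused.contains(p));
        return out;
    }

    public static void main(String[] args) {
        RevokeCleanupSketch s = new RevokeCleanupSketch();
        s.paused.add("topic-0");
        s.buffer("topic-0", "message-1");
        s.onPartitionsRevoked(Collections.singleton("topic-0"));
        System.out.println(s.fetchedRecords());
    }
}
```

Without the {{buffered.remove(partition)}} call, the message buffered while the partition was paused would be returned by the next fetchedRecords() call, which is the behavior described in the root cause.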
h2. 2) Make an effort to preserve the old paused marks on the KafkaConsumer side
<1> In the ConsumerCoordinator#onJoinPrepare(...) method, record all
pausedTopicPartitions from the current assignment of the KafkaConsumer.
<2> In the ConsumerCoordinator#onJoinComplete(...) method, apply the recorded
pausedTopicPartitions to the latest assignment and restore the paused
marks of the partitions that are still in the latest assignment.
{*}Note{*}: If the new assignment of the kafkaConsumer no longer contains
topicPartitions that were paused before the rebalance, the paused mark of
these topicPartitions is lost permanently on the kafkaConsumer side, even if
the kafkaConsumer is assigned these partitions again in a future rebalance.
At the end of KAFKA-13425, mentioned above, I gave a draft code
suggestion for this point.
<3> Consider consumers that use the RebalanceProtocol.COOPERATIVE protocol,
for example through the currently supported PartitionAssignor
CooperativeStickyAssignor. Code analysis shows that the default
behavior of these consumers is to keep the old paused flag, whereas consumers
that use the RebalanceProtocol.EAGER protocol clear all paused marks by default.
I suggest that KafkaConsumer behave consistently under both RebalanceProtocols;
otherwise the existing KafkaConsumer#pause(...) is ambiguous and
very confusing to users.
h2. 3) Pass the paused flag of topicPartitions through the groupRebalance process
In the JoinGroup request, in addition to reporting the topics it wants to
subscribe to, each consumerMember should also report its pausedTopicPartitions.
The JoinGroup response received by the LeaderConsumer should contain all paused
partitions of the entire group.
The new assignment computed by the LeaderConsumer should preserve the paused
marks and be packaged in the LeaderConsumer's SyncGroup request.
In this way, after the groupRebalance completes, even if a paused topicPartition
is assigned to a new consumer, the new consumer continues to honor the
paused mark.
As a consequence, the KafkaConsumer#paused() method may return partitions for
which this KafkaConsumer never called the
pause(Collection<TopicPartition> partitions) method itself.
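The leader-side part of suggestion 3) might be sketched as follows. This is only an illustration under assumptions: the JoinGroup and SyncGroup protocols carry no paused information today, and the class GroupPausedStateSketch and both of its methods are hypothetical. Members and partitions are plain strings.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Toy model only: illustrates what the leader could compute, not a protocol change.
class GroupPausedStateSketch {
    // Union of the paused partitions each member reported (hypothetically
    // carried in its JoinGroup request).
    static Set<String> mergeReportedPaused(Map<String, Set<String>> pausedByMember) {
        Set<String> groupPaused = new TreeSet<>();
        for (Set<String> reported : pausedByMember.values()) {
            groupPaused.addAll(reported);
        }
        return groupPaused;
    }

    // For each member, the part of its new assignment that is paused group-wide
    // (hypothetically carried in the leader's SyncGroup request).
    static Map<String, Set<String>> pausedPerMember(Map<String, Set<String>> newAssignment,
                                                    Set<String> groupPaused) {
        Map<String, Set<String>> result = new TreeMap<>();
        for (Map.Entry<String, Set<String>> e : newAssignment.entrySet()) {
            Set<String> pausedForMember = new TreeSet<>(e.getValue());
            pausedForMember.retainAll(groupPaused);
            result.put(e.getKey(), pausedForMember);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Set<String>> reported = new HashMap<>();
        reported.put("member-A", Collections.singleton("topic-0"));
        reported.put("member-B", Collections.<String>emptySet());

        Map<String, Set<String>> assignment = new HashMap<>();
        assignment.put("member-A", Collections.singleton("topic-1"));
        assignment.put("member-B", Collections.singleton("topic-0"));

        System.out.println(pausedPerMember(assignment, mergeReportedPaused(reported)));
    }
}
```

In this example member-B receives topic-0 as paused although it never called pause(...) on it, which is the situation in which paused() returns partitions the consumer did not pause itself.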
h2. 4) KafkaConsumer provides a topic-level pause method that supports regular expressions
{{KafkaConsumer#pause(Collection<String> topics)}}
{{KafkaConsumer#pause(Pattern pattern)}}
Similar to the paused mark in SubscriptionState.assignment, we need to provide
a new instance variable ‘TopicState’ in SubscriptionState to store the
topic-level paused mark. The ‘TopicState’ data structure can be modeled on the
existing TopicPartitionState.
<1> ‘TopicState’ should not be affected by groupRebalance, and the paused mark
in TopicState is not changed during the groupRebalance process. TopicState
should be in-memory state of a single KafkaConsumer, and it does not have to be
passed to other consumers after the rebalance completes.
<2> {{pause(Collection<String> topics)}} throws IllegalStateException if
this consumer is not currently subscribed to any topic provided.
<3> Fetcher's fetchedRecords() and sendFetches() can take TopicState into
account when deciding whether to return a message to the user or
send a Fetch request.
<4> Provide KafkaConsumer#resume(Collection<String> topics) and
KafkaConsumer#resume(Pattern pattern) methods to clear topic-level paused
marks.
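A rough sketch of the topic-level state in suggestion 4) is shown below. It is a toy model: the class TopicPauseSketch and the methods isFetchable and onRebalance are hypothetical, and the pause/resume overloads only mirror the signatures proposed above. The sketch assumes one reading of point <2>, namely that pause(Collection<String>) throws as soon as one of the given topics is not subscribed.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.regex.Pattern;

// Toy model only: a stand-in for the proposed 'TopicState', not Kafka code.
class TopicPauseSketch {
    private final Set<String> subscription = new HashSet<>();
    private final Set<String> pausedTopics = new HashSet<>();

    TopicPauseSketch(Collection<String> subscribedTopics) {
        subscription.addAll(subscribedTopics);
    }

    // Assumed reading of <2>: reject the call if any given topic is not subscribed.
    void pause(Collection<String> topics) {
        for (String topic : topics) {
            if (!subscription.contains(topic)) {
                throw new IllegalStateException("Not subscribed to topic: " + topic);
            }
        }
        pausedTopics.addAll(topics);
    }

    // Pauses every subscribed topic whose full name matches the pattern.
    void pause(Pattern pattern) {
        for (String topic : subscription) {
            if (pattern.matcher(topic).matches()) {
                pausedTopics.add(topic);
            }
        }
    }

    void resume(Collection<String> topics) {
        pausedTopics.removeAll(topics);
    }

    void resume(Pattern pattern) {
        pausedTopics.removeIf(topic -> pattern.matcher(topic).matches());
    }

    // The check a fetcher could make before returning messages or sending a Fetch request.
    boolean isFetchable(String topic) {
        return subscription.contains(topic) && !pausedTopics.contains(topic);
    }

    // Point <1>: a rebalance leaves the topic-level marks untouched.
    void onRebalance() {
        // intentionally empty
    }

    public static void main(String[] args) {
        TopicPauseSketch state = new TopicPauseSketch(Arrays.asList("orders", "audit"));
        state.pause(Pattern.compile("ord.*"));
        state.onRebalance();
        System.out.println(state.isFetchable("orders") + " " + state.isFetchable("audit"));
    }
}
```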
h2. 5) KafkaConsumer provides a consumer-level pause method
{{KafkaConsumer#pause()}}
The existing pause method works per topicPartition, which is sometimes too
fine-grained. Because the paused mark is bound to the assignment, it is
inevitably affected by groupRebalance.
<1> This method may also be what users need most urgently. After calling this
pause() method, the kafkaConsumer marks itself as paused, and the poll
method checks the value of isKafkaConsumerPaused to decide whether to
return a message to the user or send a Fetch request. This
isKafkaConsumerPaused mark should also be held only by the single KafkaConsumer
itself.
<2> Users do not need to worry about the poll method returning data after
calling the KafkaConsumer#pause() method.
Users can keep calling the poll method, which avoids the following two results
of a kafkaConsumer not calling poll for a long time:
(1) The heartbeat thread's detection mechanism causes the kafkaConsumer
to actively leave the group (leaveGroup).
(2) A groupRebalance is triggered at that point. The
groupCoordinator waits for the consumer to send a JoinGroup request, and
the groupRebalance cannot complete for a long time (limited by
max.poll.interval.ms), so all consumers in the entire group suspend
consumption.
<3> Provide KafkaConsumer#resume() at the kafkaConsumer level to clear the
paused mark of the KafkaConsumer.
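The consumer-level flag in suggestion 5) can be illustrated with the following toy sketch. The class ConsumerLevelPauseSketch and its members are hypothetical; only the flag idea (isKafkaConsumerPaused) comes from the proposal. The real poll method would still do its group-membership work while paused, which the sketch indicates only by a comment.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: shows the consumer-level flag, not real fetching.
class ConsumerLevelPauseSketch {
    // Plays the role of isKafkaConsumerPaused from the proposal.
    private boolean consumerPaused;
    // Messages that are ready to be handed to the user.
    private final List<String> available = new ArrayList<>();

    void pause() {
        consumerPaused = true;
    }

    void resume() {
        consumerPaused = false;
    }

    void deliver(String message) {
        available.add(message);
    }

    List<String> poll() {
        // Group-membership work (joining, rebalancing) would still happen here,
        // so the consumer stays in the group while paused.
        if (consumerPaused) {
            return new ArrayList<>();
        }
        List<String> out = new ArrayList<>(available);
        available.clear();
        return out;
    }

    public static void main(String[] args) {
        ConsumerLevelPauseSketch consumer = new ConsumerLevelPauseSketch();
        consumer.deliver("message-1");
        consumer.pause();
        System.out.println(consumer.poll());
        consumer.resume();
        System.out.println(consumer.poll());
    }
}
```

Because the flag belongs to the consumer and not to the assignment, a rebalance has nothing to clear, and messages held back while paused are returned only after resume().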
--
This message was sent by Atlassian Jira
(v8.20.1#820001)