[
https://issues.apache.org/jira/browse/KAFKA-13310?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Guozhang Wang resolved KAFKA-13310.
-----------------------------------
Fix Version/s: 3.2.0
Resolution: Fixed
> KafkaConsumer cannot jump out of the poll method, and the consumer is blocked
> in the ConsumerCoordinator method maybeAutoCommitOffsetsSync(Timer timer).
> Cpu and traffic of Broker‘s side increase sharply
> -----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: KAFKA-13310
> URL: https://issues.apache.org/jira/browse/KAFKA-13310
> Project: Kafka
> Issue Type: Bug
> Components: consumer
> Affects Versions: 2.8.1
> Environment: prod
> Reporter: RivenSun
> Assignee: RivenSun
> Priority: Major
> Fix For: 3.2.0
>
> Attachments: SecondDeleteConsumerLog.png, SecondDeleteDebugLog.png,
> ThirdDebugLog1.png, ThirdDebugLog2.png, brokerCpu.png, brokerNetBytes.png,
> kafkaConsumerLog.png
>
>
> h2. Foreword
> Because our consumers' processing logic is sometimes heavy, we followed the
> Kafka Streams guidance at
> [https://kafka.apache.org/documentation/#upgrade_10201_notable]
> and set max.poll.interval.ms to Integer.MAX_VALUE.
> Our consumers subscribe with a pattern:
> consumer.subscribe(Pattern.compile(".*riven.*"));
>
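A minimal, self-contained sketch of this setup (plain string config keys are used so the sketch does not need the kafka-clients jar on the classpath; the class and method names are illustrative only):

```java
import java.util.List;
import java.util.Properties;
import java.util.regex.Pattern;

public class PatternSubscriptionSketch {
    // the same pattern that is passed to consumer.subscribe(...)
    static final Pattern SUBSCRIPTION = Pattern.compile(".*riven.*");

    static boolean matches(String topic) {
        return SUBSCRIPTION.matcher(topic).matches();
    }

    public static void main(String[] args) {
        // consumer configuration as described in the foreword
        Properties props = new Properties();
        props.setProperty("group.id", "rivenReassign");
        props.setProperty("max.poll.interval.ms", String.valueOf(Integer.MAX_VALUE));

        // all three test topics match, including the one deleted later
        for (String topic : List.of("rivenTest1", "rivenTest2", "rivenTest88")) {
            System.out.println(topic + " matches: " + matches(topic));
        }
    }
}
```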
> h2. Recurrence of the problem scene
> The steps to reproduce are:
> (1) Test environment Kafka cluster: three brokers
> (2) Topics matching the regular expression: rivenTest1, rivenTest2, and
> rivenTest88
> (3) A single consumer with group.id "rivenReassign", subscribed via
> consumer.subscribe(Pattern.compile(".*riven.*"));
> (4) Initially the group status is stable and everything is normal for the
> consumer; then I delete the topic rivenTest88
>
> h2. Phenomenon
> (1) The consumer is blocked in the poll method, no longer consumes any
> messages, and the consumer log keeps printing:
> [main] WARN
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer
> clientId=consumer-rivenReassign-1, groupId=rivenReassign] Offset commit
> failed on partition rivenTest88-1 at offset 0: This server does not host this
> topic-partition.
> (2) AdminClient's describe-consumer-group call keeps timing out, and the
> group status is no longer stable
> (3) The CPU and traffic of the brokers *increase significantly*
>
>
> h2. Problem tracking
> Analyzing the KafkaConsumer code (version 2.8.1), I found that the
> waitForJoinGroup variable was introduced in the
> updateAssignmentMetadataIfNeeded method. The reason is given in the comment
> at the call site: "try to update assignment metadata BUT do not need to
> block on the timer for join group". See below:
>
> {code:java}
> if (includeMetadataInTimeout) {
>     // try to update assignment metadata BUT do not need to block on the timer for join group
>     updateAssignmentMetadataIfNeeded(timer, false);
> } else {
>     while (!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE), true)) {
>         log.warn("Still waiting for metadata");
>     }
> }{code}
>
>
> Tracing the code back layer by layer, the function of this variable is to
> construct a time.timer(0L) and pass it down to the method
> joinGroupIfNeeded(final Timer timer) in AbstractCoordinator. See below:
> {code:java}
> // if not wait for join group, we would just use a timer of 0
> if (!ensureActiveGroup(waitForJoinGroup ? timer : time.timer(0L))) {
>     // since we may use a different timer in the callee, we'd still need
>     // to update the original timer's current time after the call
>     timer.update(time.milliseconds());
>     return false;
> }
> {code}
> However, the call stack of joinGroupIfNeeded contains the submethod
> onJoinPrepare, and onJoinPrepare contains the call
> maybeAutoCommitOffsetsSync(time.timer(rebalanceConfig.rebalanceTimeoutMs));
> the value of rebalanceConfig.rebalanceTimeoutMs is actually
> max.poll.interval.ms.
> Finally, I tracked down ConsumerCoordinator's method
> commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets, Timer timer).
> Its input parameter offsets is subscriptions.allConsumed(); after I delete
> the topic rivenTest88, this method *falls into an infinite loop!!*
> {code:java}
> public boolean commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets, Timer timer) {
>     invokeCompletedOffsetCommitCallbacks();
>
>     if (offsets.isEmpty())
>         return true;
>
>     do {
>         if (coordinatorUnknown() && !ensureCoordinatorReady(timer)) {
>             return false;
>         }
>
>         RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
>         client.poll(future, timer);
>
>         // We may have had in-flight offset commits when the synchronous commit began. If so, ensure that
>         // the corresponding callbacks are invoked prior to returning in order to preserve the order that
>         // the offset commits were applied.
>         invokeCompletedOffsetCommitCallbacks();
>
>         if (future.succeeded()) {
>             if (interceptors != null)
>                 interceptors.onCommit(offsets);
>             return true;
>         }
>
>         if (future.failed() && !future.isRetriable())
>             throw future.exception();
>
>         timer.sleep(rebalanceConfig.retryBackoffMs);
>     } while (timer.notExpired());
>
>     return false;
> }{code}
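To illustrate how long the loop above can spin, here is a minimal, self-contained simulation. The SimTimer class and the always-retriable failure are stand-ins for Kafka's Timer and the UnknownTopicOrPartitionException response; they are not Kafka APIs:

```java
public class CommitLoopSim {
    // simplified stand-in for org.apache.kafka.common.utils.Timer
    static final class SimTimer {
        long remainingMs;
        SimTimer(long timeoutMs) { this.remainingMs = timeoutMs; }
        boolean notExpired() { return remainingMs > 0; }
        void sleep(long ms) { remainingMs -= ms; }
    }

    // counts how many commit attempts fit into the timer, assuming every
    // attempt fails with a retriable error (as with a deleted topic)
    static long simulate(long timeoutMs, long retryBackoffMs) {
        SimTimer timer = new SimTimer(timeoutMs);
        long attempts = 0;
        do {
            attempts++;                    // sendOffsetCommitRequest(...) fails retriably
            timer.sleep(retryBackoffMs);   // timer.sleep(rebalanceConfig.retryBackoffMs)
        } while (timer.notExpired());
        return attempts;
    }

    public static void main(String[] args) {
        // with max.poll.interval.ms = Integer.MAX_VALUE and the default
        // retry.backoff.ms = 100, the loop can retry tens of millions of times
        System.out.println(simulate(Integer.MAX_VALUE, 100));
    }
}
```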
>
>
> *The reasons for the endless loop are:*
> (1) The expiration time of the timer is far too long: it is
> max.poll.interval.ms
> (2) The offsets to be committed contain dirty data: a TopicPartition that
> no longer exists
> (3) The response future of sendOffsetCommitRequest(final Map<TopicPartition,
> OffsetAndMetadata> offsets) always fails, and the exception in the future is
> UnknownTopicOrPartitionException, which is retriable.
> Since the loop's retry interval, timer.sleep(rebalanceConfig.retryBackoffMs),
> defaults to 100ms, if a large number of consumers hit this problem at the
> same time, a flood of network requests is generated to the Kafka brokers,
> *resulting in a sharp increase in the brokers' CPU and traffic!*
>
>
> h2. Suggestions
> 1. maybeAutoCommitOffsetsSync(time.timer(rebalanceConfig.rebalanceTimeoutMs)):
> I recommend that this method not build its timer from max.poll.interval.ms.
> That parameter is exposed for users to configure, and from its description
> on the official website I would never expect it to be used here. Worse, it
> blocks KafkaConsumer's poll(final Duration timeout) even when I call
> consumer.poll(Duration.ofMillis(1000)).
> 2. In fact, in the poll(Timer timer, boolean waitForJoinGroup) method of
> ConsumerCoordinator, before calling the ensureActiveGroup method, the
> consumer already ensures that the local metadata is up to date; see the code:
>
> {code:java}
> if (rejoinNeededOrPending()) {
>     // due to a race condition between the initial metadata fetch and the initial rebalance,
>     // we need to ensure that the metadata is fresh before joining initially. This ensures
>     // that we have matched the pattern against the cluster's topics at least once before joining.
>     if (subscriptions.hasPatternSubscription()) {
>         // For consumer group that uses pattern-based subscription, after a topic is created,
>         // any consumer that discovers the topic after metadata refresh can trigger rebalance
>         // across the entire consumer group. Multiple rebalances can be triggered after one topic
>         // creation if consumers refresh metadata at vastly different times. We can significantly
>         // reduce the number of rebalances caused by single topic creation by asking consumer to
>         // refresh metadata before re-joining the group as long as the refresh backoff time has
>         // passed.
>         if (this.metadata.timeToAllowUpdate(time.milliseconds()) == 0) {
>             this.metadata.requestUpdate();
>         }
>         if (!client.ensureFreshMetadata(timer)) {
>             return false;
>         }
>     }
>     if (!ensureActiveGroup(timer)) {
>         return false;
>     }
> }
> {code}
>
> That is to say, the consumer already knows which topics/topicPartitions are
> legal before onJoinPrepare. Given that, why not catch the
> UnknownTopicOrPartitionException in the commitOffsetsSync method mentioned
> above, compare the offsets to be committed against the latest local metadata,
> remove the topicPartitions that no longer exist, and then retry the commit?
> I think this would break out of the infinite loop.
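The filtering step of suggestion 2 can be sketched as follows. This is a hypothetical illustration using plain strings in place of Kafka's TopicPartition and OffsetAndMetadata types, not the actual KafkaConsumer internals:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class OffsetFilterSketch {
    // keep only offsets whose topic is still present in the local metadata;
    // keys are "topic-partition" strings standing in for TopicPartition
    static Map<String, Long> retainKnown(Map<String, Long> offsets, Set<String> knownTopics) {
        Map<String, Long> filtered = new HashMap<>();
        for (Map.Entry<String, Long> e : offsets.entrySet()) {
            String topic = e.getKey().substring(0, e.getKey().lastIndexOf('-'));
            if (knownTopics.contains(topic)) {
                filtered.put(e.getKey(), e.getValue());
            }
        }
        return filtered;
    }

    public static void main(String[] args) {
        Map<String, Long> offsets = new HashMap<>();
        offsets.put("rivenTest1-0", 42L);
        offsets.put("rivenTest88-1", 0L);   // topic was deleted

        // rivenTest88 is gone from the metadata, so its offset is dropped and
        // the retried commit no longer hits UnknownTopicOrPartitionException
        System.out.println(retainKnown(offsets, Set.of("rivenTest1", "rivenTest2")));
    }
}
```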
> 3. Why must the offsets be committed synchronously in the onJoinPrepare
> method? Could they be committed asynchronously instead? Alternatively,
> provide a parameter letting the user choose between synchronous and
> asynchronous commit, or a new parameter that caps the number of retries for
> the synchronous commit here, instead of using a Timer built from
> max.poll.interval.ms.
> Even if the offsets are not actually committed here, the impact is limited:
> some messages may be consumed repeatedly. I still suggest providing a new
> parameter to control whether the offsets need to be committed.
--
This message was sent by Atlassian Jira
(v8.20.1#820001)