guozhangwang commented on a change in pull request #11340:
URL: https://github.com/apache/kafka/pull/11340#discussion_r727557427

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##########
@@ -994,11 +996,16 @@ public boolean commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets,
         if (offsets.isEmpty())
             return true;
 
+        boolean shouldCleanUpConsumedOffsets = !checkConsumedOffsetsAreValid(offsets);

Review comment:
Instead of letting the callee `commitOffsetsSync` silently drop the given offsets whose topics or partitions no longer exist, I think we should let the callee simply report the unknown topics to its callers after the timer has elapsed, and let the callers handle them. The main reason is that an `UnknownTopicOrPartition` error may be only temporary, so silently dropping the offsets at the callee can lead to confusing behavior.

Note there are several callers:

* When re-joining the group via `maybeAutoCommitOffsetsSync`: if `commitOffsetsSync` throws the unknown topic or partition exception after exhausting its retries within the timer, the caller `maybeAutoCommitOffsetsSync` would log it upon catching the exception and still wrap it as an `InterruptException`. Note that in the next poll call, since `needsJoinPrepare` is already false, we would not try to call `onJoinPrepare` again, and if the topic is indeed deleted, the rebalance would remove the partitions from the subscription later.

* When the user calls `Consumer#commitSync` directly: we can still throw the timeout exception directly to the caller. Since we would continuously log the `UnknownTopicOrPartition` error, a user debugging it would still learn the root cause of why this commit call timed out.


##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##########
@@ -505,7 +497,7 @@ public boolean poll(Timer timer, boolean waitForJoinGroup) {
                 }
 
                 // if not wait for join group, we would just use a timer of 0
-                if (!ensureActiveGroup(waitForJoinGroup ? timer : time.timer(0L))) {
+                if (!ensureActiveGroup(waitForJoinGroup ? timer : time.timer(0L), timer)) {

Review comment:
The main idea of passing a `timer(0)` here is that if the `poll` call ever triggers the rejoin-group protocol, we do not block on any part of that procedure. On second thought, though, I agree that it overlooks the need to auto-commit offsets during the re-join, which would definitely need more than `timer(0)` but should also take less than `max.poll.interval.ms`.


##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##########
@@ -994,11 +996,16 @@ public boolean commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets,
         if (offsets.isEmpty())
             return true;
 
+        boolean shouldCleanUpConsumedOffsets = !checkConsumedOffsetsAreValid(offsets);

Review comment:
1) I think `max.poll.interval.ms` is no longer set to `Integer.MAX_VALUE` as of 2.3.0 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-442%3A+Return+to+default+max+poll+interval+in+Streams).

2) For `Consumer#commitSync`, `default.api.timeout.ms` is used if the user does not specify a timeout, not `max.poll.interval.ms`.
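
To make the first comment's proposal concrete, here is a rough, stand-alone sketch of "retry until the timer expires, then report to the caller instead of dropping offsets". It is not the `ConsumerCoordinator` code and uses no Kafka classes: `CommitRetrySketch`, `Attempt`, `CommitTimeoutException`, the string-keyed offsets map, and the injected clock are all hypothetical stand-ins chosen for illustration.

```java
import java.util.Map;
import java.util.function.Function;
import java.util.function.LongSupplier;

// Illustrative only: every type here is a hypothetical stand-in, not a Kafka class.
public class CommitRetrySketch {

    /** Hypothetical outcome of one commit attempt. */
    enum Attempt { SUCCESS, UNKNOWN_TOPIC_OR_PARTITION }

    /** Hypothetical stand-in for the timeout error surfaced to the caller. */
    static class CommitTimeoutException extends RuntimeException {
        CommitTimeoutException(String message) { super(message); }
    }

    /**
     * Retries the commit until it succeeds or the time budget is used up.
     * The offsets map is never modified: an unknown topic may be temporary,
     * so the failure is logged and finally reported, not silently dropped.
     * Returns the number of attempts that were needed.
     */
    static int commitOffsetsSync(Map<String, Long> offsets,
                                 Function<Map<String, Long>, Attempt> sendCommit,
                                 LongSupplier clockMs,
                                 long timeoutMs) {
        if (offsets.isEmpty())
            return 0;

        long deadlineMs = clockMs.getAsLong() + timeoutMs;
        int attempts = 0;
        do {
            attempts++;
            Attempt result = sendCommit.apply(offsets);
            if (result == Attempt.SUCCESS)
                return attempts;
            // Log every failure so the root cause is visible while debugging.
            System.err.println("Commit attempt " + attempts + " failed: " + result);
        } while (clockMs.getAsLong() < deadlineMs);

        // Budget exhausted: let the caller decide what to do.
        throw new CommitTimeoutException(
            "Commit timed out after " + attempts + " attempts; last error was "
                + Attempt.UNKNOWN_TOPIC_OR_PARTITION);
    }

    public static void main(String[] args) {
        Map<String, Long> offsets = Map.of("topic-0", 42L);
        long[] fakeNowMs = {0L};   // fake clock so the demo is deterministic
        int[] calls = {0};

        // Transient error: the first two attempts fail, the third succeeds.
        int attempts = commitOffsetsSync(offsets, o -> {
            fakeNowMs[0] += 10;    // pretend each attempt takes 10 ms
            return ++calls[0] < 3 ? Attempt.UNKNOWN_TOPIC_OR_PARTITION : Attempt.SUCCESS;
        }, () -> fakeNowMs[0], 100);
        System.out.println("Committed after " + attempts + " attempts");

        // Persistent error: the caller gets an exception once the budget is spent.
        try {
            commitOffsetsSync(offsets, o -> {
                fakeNowMs[0] += 10;
                return Attempt.UNKNOWN_TOPIC_OR_PARTITION;
            }, () -> fakeNowMs[0], 50);
        } catch (CommitTimeoutException e) {
            System.out.println("Caller handles: " + e.getMessage());
        }
    }
}
```

In this sketch the two callers discussed above would differ only in how they treat the exception: an auto-commit path could log it and move on, while a direct `commitSync`-style call could propagate it to the user.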