guozhangwang commented on a change in pull request #11340:
URL: https://github.com/apache/kafka/pull/11340#discussion_r727557427



##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##########
@@ -994,11 +996,16 @@ public boolean commitOffsetsSync(Map<TopicPartition, 
OffsetAndMetadata> offsets,
         if (offsets.isEmpty())
             return true;
 
+        boolean shouldCleanUpConsumedOffsets = 
!checkConsumedOffsetsAreValid(offsets);

Review comment:
       Instead of letting the callee `maybeAutoCommitOffsetsSync` silently drop 
the given offsets that no longer exist, I think we should let the callee simply 
report unknown topics to its callers after the timer elapses and let the 
callers handle them. The main reason is that an unknownTopicOrPartition error 
may be only temporary, so silently dropping the offsets at the callee can lead 
to confusing behavior. 
   
   Note there are several callers:
   
   * On re-joining the group via `maybeAutoCommitOffsetsSync`: if 
`commitOffsetsSync` throws the unknown topic or partition exception after 
exhausting retries within the timer, the caller `maybeAutoCommitOffsetsSync` 
would log it upon catching the exception and still wrap it as an 
`InterruptException`. Note that in the next poll call, since 
`needsJoinPrepare` is already false, we would not try to call `onJoinPrepare` 
again, and if the topic is indeed deleted, the rebalance would remove the 
partitions from the subscription later. 
   * When the user calls `Consumer#commitSync` directly: we can still throw the 
timeout exception directly to the caller. Note that since we would 
continuously log the `UnknownTopicOrPartition` error, a user debugging it would 
still learn the root cause of why the commit call timed out.
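
To make the proposed split of responsibilities concrete, here is a minimal, self-contained sketch. It does not use the real Kafka classes: `CommitSketch`, `UnknownTopicException`, `CommitTimeoutException`, `maybeAutoCommitOnRejoin` and `commitSyncFromUser` are all hypothetical stand-ins, and the `attempt` supplier (returning the topics the broker rejected) is an assumption made only for illustration. The rejoin caller here only logs and returns; any exception wrapping is left out.

```java
import java.util.Collections;
import java.util.Set;
import java.util.function.Supplier;

final class CommitSketch {
    /** Hypothetical stand-in for the unknown topic or partition error. */
    static final class UnknownTopicException extends RuntimeException {
        UnknownTopicException(Set<String> topics) {
            super("unknown topics or partitions: " + topics);
        }
    }

    /** Hypothetical stand-in for the timeout surfaced to the user. */
    static final class CommitTimeoutException extends RuntimeException {
        CommitTimeoutException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /**
     * Callee: retries while the (simulated) broker reports unknown topics and,
     * once the deadline has passed, reports them to the caller instead of
     * silently dropping the offsets. An empty set from `attempt` means success.
     */
    static boolean commitOffsetsSync(Supplier<Set<String>> attempt, long timeoutMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (true) {
            Set<String> unknown = attempt.get();
            if (unknown.isEmpty()) return true;
            if (System.currentTimeMillis() >= deadline) throw new UnknownTopicException(unknown);
            pause(1); // small backoff between retries
        }
    }

    /** Rejoin path: log the failure and carry on; a later rebalance cleans up. */
    static boolean maybeAutoCommitOnRejoin(Supplier<Set<String>> attempt, long timeoutMs) {
        try {
            return commitOffsetsSync(attempt, timeoutMs);
        } catch (UnknownTopicException e) {
            System.err.println("Auto-commit before rejoin failed: " + e.getMessage());
            return false;
        }
    }

    /** User path: surface a timeout, keeping the root cause attached. */
    static void commitSyncFromUser(Supplier<Set<String>> attempt, long timeoutMs) {
        try {
            commitOffsetsSync(attempt, timeoutMs);
        } catch (UnknownTopicException e) {
            throw new CommitTimeoutException("commit timed out after " + timeoutMs + " ms", e);
        }
    }

    private static void pause(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        Supplier<Set<String>> deleted = () -> Collections.singleton("deleted-topic");
        System.out.println(maybeAutoCommitOnRejoin(deleted, 10));
    }
}
```

The point of the sketch is only that a transient error gets retried until the timer runs out, and that each caller then decides what a persistent error means for it.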

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##########
@@ -505,7 +497,7 @@ public boolean poll(Timer timer, boolean waitForJoinGroup) {
                 }
 
                 // if not wait for join group, we would just use a timer of 0
-                if (!ensureActiveGroup(waitForJoinGroup ? timer : 
time.timer(0L))) {
+                if (!ensureActiveGroup(waitForJoinGroup ? timer : 
time.timer(0L), timer)) {

Review comment:
       The main idea of passing a `timer(0)` here is that if the `poll` call ever 
triggers the rejoin group protocol, we do not block on any of that procedure. 
On second thought, though, I agree that it overlooks the need to auto-commit 
offsets during re-join, which would definitely need more than `timer(0)` but 
should also be less than `max.poll.interval`.
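
One way to picture the two budgets is the sketch below. It is only an illustration under stated assumptions: `RejoinTimers` and both method names are hypothetical, timeouts are modeled as plain millisecond values rather than Kafka `Timer` objects, and capping the commit budget at one millisecond below `max.poll.interval.ms` is an arbitrary choice made to satisfy "less than".

```java
final class RejoinTimers {
    /** Join budget: non-blocking (0 ms) unless the caller asked to wait for the join. */
    static long joinTimeoutMs(boolean waitForJoinGroup, long pollRemainingMs) {
        return waitForJoinGroup ? pollRemainingMs : 0L;
    }

    /**
     * Auto-commit budget during rejoin: whatever the poll call has left,
     * kept strictly below max.poll.interval.ms and never negative.
     */
    static long commitTimeoutMs(long pollRemainingMs, long maxPollIntervalMs) {
        return Math.max(0L, Math.min(pollRemainingMs, maxPollIntervalMs - 1));
    }

    public static void main(String[] args) {
        System.out.println(joinTimeoutMs(false, 500L));
        System.out.println(commitTimeoutMs(500L, 300_000L));
    }
}
```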
   
   

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##########
@@ -994,11 +996,16 @@ public boolean commitOffsetsSync(Map<TopicPartition, 
OffsetAndMetadata> offsets,
         if (offsets.isEmpty())
             return true;
 
+        boolean shouldCleanUpConsumedOffsets = 
!checkConsumedOffsetsAreValid(offsets);

Review comment:
       1) I think `max.poll.interval.ms` is no longer set to 
`Integer.MAX_VALUE` as of 2.3.0 
(https://cwiki.apache.org/confluence/display/KAFKA/KIP-442%3A+Return+to+default+max+poll+interval+in+Streams).
   
   2) For `Consumer#commitSync`, `default.api.timeout.ms` is used if the user 
does not specify a timeout, not `max.poll.interval.ms`.
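
For reference, the two settings are independent consumer configs. The fragment below is a small sketch that spells them out with the stock consumer defaults (60000 ms and 300000 ms); the class name `ConsumerTimeoutConfig` is hypothetical, and the fragment only builds a `Properties` object without creating a consumer.

```java
import java.util.Properties;

final class ConsumerTimeoutConfig {
    static Properties build() {
        Properties props = new Properties();
        // Bounds blocking client calls such as commitSync() when no timeout is passed.
        props.setProperty("default.api.timeout.ms", "60000");
        // Bounds the gap between poll() calls before the member is considered failed.
        props.setProperty("max.poll.interval.ms", "300000");
        return props;
    }

    public static void main(String[] args) {
        build().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

A caller that wants a different bound for a single commit can pass it explicitly through the `commitSync(Duration)` overload instead of changing `default.api.timeout.ms`.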





