philipnee commented on code in PR #14639:
URL: https://github.com/apache/kafka/pull/14639#discussion_r1373567351


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -215,16 +244,115 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
                     .setMemberId(generation.memberId)
                     .setGroupInstanceId(groupInstanceId)
                     .setTopics(new ArrayList<>(requestTopicDataMap.values())));
-            return new NetworkClientDelegate.UnsentRequest(
+            NetworkClientDelegate.UnsentRequest resp = new NetworkClientDelegate.UnsentRequest(
                 builder,
-                coordinatorRequestManager.coordinator(),
+                coordinatorRequestManager.coordinator());
+            resp.future().whenComplete(
                 (response, throwable) -> {
-                    if (throwable == null) {
-                        future.complete(null);
-                    } else {
-                        future.completeExceptionally(throwable);
+                    try {
+                        if (throwable == null) {
+                            onResponse(response);
+                        } else {
+                            onError(throwable, resp.handler().completionTimeMs());
+                        }
+                    } catch (Throwable t) {
+                        log.error("Unexpected error when completing offset commit: {}", this, t);
+                        future.completeExceptionally(t);
                     }
                 });
+            return resp;
+        }
+
+        public CompletableFuture<Void> future() {
+            return future;
+        }
+
+        public void onError(final Throwable exception, final long currentTimeMs) {
+            if (exception instanceof RetriableException) {
+                handleCoordinatorDisconnect(exception, currentTimeMs);
+                retry(currentTimeMs);
+            }
+        }
+
+        public void onResponse(final ClientResponse response) {
+            long responseTime = response.receivedTimeMs();
+            OffsetCommitResponse commitResponse = (OffsetCommitResponse) response.responseBody();
+            Set<String> unauthorizedTopics = new HashSet<>();
+            for (OffsetCommitResponseData.OffsetCommitResponseTopic topic : commitResponse.data().topics()) {
+                for (OffsetCommitResponseData.OffsetCommitResponsePartition partition : topic.partitions()) {
+                    TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex());
+                    OffsetAndMetadata offsetAndMetadata = offsets.get(tp);
+                    long offset = offsetAndMetadata.offset();
+                    Errors error = Errors.forCode(partition.errorCode());
+                    if (error == Errors.NONE) {
+                        log.debug("OffsetCommit {} for partition {}", offset, tp);
+                        continue;
+                    }
+
+                    if (error.exception() instanceof RetriableException) {
+                        log.warn("OffsetCommit failed on partition {} at offset {}: {}", tp, offset,
+                            error.message());
+                    } else {
+                        log.error("OffsetCommit failed on partition {} at offset {}: {}", tp, offset, error.message());
+                    }
+
+                    if (!continueHandlePartitionErrors(error, tp, offset, unauthorizedTopics, responseTime)) {
+                        return;
+                    }
+                }
+            }
+
+            if (!unauthorizedTopics.isEmpty()) {
+                log.error("OffsetCommit failed due to not authorized to commit to topics {}", unauthorizedTopics);
+                future.completeExceptionally(new TopicAuthorizationException(unauthorizedTopics));
+            } else {
+                future.complete(null);
+            }
+        }
+
+        private void retry(final long currentTimeMs) {
+            System.out.println("timeout" + currentTimeMs);

Review Comment:
   Accidentally exposed my debug skills.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to