philipnee commented on code in PR #13380:
URL: https://github.com/apache/kafka/pull/13380#discussion_r1144002857


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -207,6 +226,232 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
         }
     }
 
+    private class UnsentOffsetFetchRequest extends RequestState {
+        public final Set<TopicPartition> requestedPartitions;
+        public final GroupState.Generation requestedGeneration;
+        public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future;
+
+        public UnsentOffsetFetchRequest(final Set<TopicPartition> partitions,
+                                        final GroupState.Generation generation,
+                                        final long retryBackoffMs) {
+            super(retryBackoffMs);
+            this.requestedPartitions = partitions;
+            this.requestedGeneration = generation;
+            this.future = new CompletableFuture<>();
+        }
+
+        public boolean sameRequest(final UnsentOffsetFetchRequest request) {
+            return Objects.equals(requestedGeneration, request.requestedGeneration) && requestedPartitions.equals(request.requestedPartitions);
+        }
+
+        public NetworkClientDelegate.UnsentRequest toUnsentRequest(final long currentTimeMs) {
+            OffsetFetchRequest.Builder builder = new OffsetFetchRequest.Builder(
+                    groupState.groupId,
+                    true,
+                    new ArrayList<>(this.requestedPartitions),
+                    throwOnFetchStableOffsetUnsupported);
+            NetworkClientDelegate.UnsentRequest unsentRequest = new NetworkClientDelegate.UnsentRequest(
+                    builder,
+                    coordinatorRequestManager.coordinator());
+            unsentRequest.future().whenComplete((r, t) -> {
+                onResponse(currentTimeMs, (OffsetFetchResponse) r.responseBody());
+            });
+            return unsentRequest;
+        }
+
+        public void onResponse(
+                final long currentTimeMs,
+                final OffsetFetchResponse response) {
+            Errors responseError = response.groupLevelError(groupState.groupId);
+            if (responseError != Errors.NONE) {
+                onFailure(currentTimeMs, responseError);
+                return;
+            }
+            onSuccess(currentTimeMs, response);
+        }
+
+        private void onFailure(final long currentTimeMs,
+                               final Errors responseError) {
+            log.debug("Offset fetch failed: {}", responseError.message());
+
+            // TODO: should we retry on COORDINATOR_NOT_AVAILABLE as well ?
+            if (responseError == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
+                retry(currentTimeMs);
+            } else if (responseError == Errors.NOT_COORDINATOR) {
+                // re-discover the coordinator and retry
+                coordinatorRequestManager.markCoordinatorUnknown(responseError.message(), Time.SYSTEM.milliseconds());
+                retry(currentTimeMs);
+            } else if (responseError == Errors.GROUP_AUTHORIZATION_FAILED) {
+                future.completeExceptionally(GroupAuthorizationException.forGroupId(groupState.groupId));
+            } else {
+                future.completeExceptionally(new KafkaException("Unexpected 
error in fetch offset response: " + responseError.message()));
+            }
+        }
+
+        private void retry(final long currentTimeMs) {
+            onFailedAttempt(currentTimeMs);
+            onSendAttempt(currentTimeMs);
+            pendingRequests.addOffsetFetchRequest(this);
+        }
+
+        private void onSuccess(final long currentTimeMs,
+                               final OffsetFetchResponse response) {
+            Set<String> unauthorizedTopics = null;
+            Map<TopicPartition, OffsetFetchResponse.PartitionData> responseData =
+                    response.partitionDataMap(groupState.groupId);
+            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(responseData.size());
+            Set<TopicPartition> unstableTxnOffsetTopicPartitions = new HashSet<>();
+            for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> entry : responseData.entrySet()) {
+                TopicPartition tp = entry.getKey();
+                OffsetFetchResponse.PartitionData partitionData = entry.getValue();
+                if (partitionData.hasError()) {
+                    Errors error = partitionData.error;
+                    log.debug("Failed to fetch offset for partition {}: {}", 
tp, error.message());
+
+                    if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
+                        future.completeExceptionally(new KafkaException("Topic 
or Partition " + tp + " does " +
+                                "not " +
+                                "exist"));
+                        return;
+                    } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
+                        if (unauthorizedTopics == null) {
+                            unauthorizedTopics = new HashSet<>();
+                        }
+                        unauthorizedTopics.add(tp.topic());
+                    } else if (error == Errors.UNSTABLE_OFFSET_COMMIT) {
+                        unstableTxnOffsetTopicPartitions.add(tp);
+                    } else {
+                        future.completeExceptionally(new KafkaException("Unexpected error in fetch offset " +
+                                "response for partition " + tp + ": " + error.message()));
+                        return;
+                    }
+                } else if (partitionData.offset >= 0) {
+                    // record the position with the offset (-1 indicates no committed offset to fetch);
+                    // if there's no committed offset, record as null
+                    offsets.put(tp, new OffsetAndMetadata(partitionData.offset, partitionData.leaderEpoch, partitionData.metadata));
+                } else {
+                    log.info("Found no committed offset for partition {}", tp);
+                    offsets.put(tp, null);
+                }
+            }
+
+            if (unauthorizedTopics != null) {
+                future.completeExceptionally(new TopicAuthorizationException(unauthorizedTopics));
+            } else if (!unstableTxnOffsetTopicPartitions.isEmpty()) {
+                // TODO: Optimization question: Do we need to retry all partitions upon a single partition error?
+                log.info("The following partitions still have unstable offsets 
" +
+                        "which are not cleared on the broker side: {}" +
+                        ", this could be either " +
+                        "transactional offsets waiting for completion, or " +
+                        "normal offsets waiting for replication after 
appending to local log", unstableTxnOffsetTopicPartitions);
+                retry(currentTimeMs);
+            } else {
+                future.complete(offsets);
+            }
+        }
+
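+        // Completes the given future when this request's own future completes,
+        // propagating either the result or the exception.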
+        private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> chainFuture(
+                final CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future) {
+            return this.future.whenComplete((r, t) -> {
+                if (t != null) {
+                    future.completeExceptionally(t);
+                } else {
+                    future.complete(r);
+                }
+            });
+        }
+    }
+
+    /**
+     * <p>This is used to stage the unsent requests, i.e., {@link UnsentOffsetCommitRequest} and {@link UnsentOffsetFetchRequest}.
+     *
+     * <p>If the request is new, it will be enqueued and sent upon the next poll.
+     *
+     * <p>If the same request has already been sent, the request's {@code CompletableFuture} will be completed upon
+     * completion of the existing one.
+     *
+     * <p>There is special handling for the {@link UnsentOffsetFetchRequest}: if a duplicate request was sent
+     * previously, we chain its future to the current future.
+     */
+    class PendingRequests {
+        // a queue is used to preserve the order of the commit requests
+        Queue<UnsentOffsetCommitRequest> unsentOffsetCommits = new LinkedList<>();
+        List<UnsentOffsetFetchRequest> unsentOffsetFetches = new ArrayList<>();
+        List<UnsentOffsetFetchRequest> inflightOffsetFetches = new ArrayList<>();
+
+        public boolean hasUnsentRequests() {
+            return !unsentOffsetCommits.isEmpty() || !unsentOffsetFetches.isEmpty();
+        }
+
+        public CompletableFuture<ClientResponse> addOffsetCommitRequest(
+                final Map<TopicPartition, OffsetAndMetadata> offsets) {
+            UnsentOffsetCommitRequest request = new UnsentOffsetCommitRequest(
+                    offsets,
+                    groupState.groupId,
+                    groupState.groupInstanceId.orElse(null),
+                    groupState.generation);
+            unsentOffsetCommits.add(request);
+            return request.future();
+        }
+
+        /**
+         *  We want to avoid duplication when sending an {@link OffsetFetchRequest}. The following checks are done:
+         *  <li>1. dedupe against unsent requests: if a duplicate request was previously enqueued, we chain the future</li>
+         *  <li>2. dedupe against in-flight requests: if a duplicate request was sent but hasn't received a result, we
+         *  chain the future</li>
+         *
+         *  <p>If the request is new, we chain a callback that removes it from {@code inflightOffsetFetches}
+         *  upon completion.
+         */
+        private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> addOffsetFetchRequest(
+                final UnsentOffsetFetchRequest request) {
+            Optional<UnsentOffsetFetchRequest> dupe =
+                    unsentOffsetFetches.stream().filter(r -> r.sameRequest(request)).findAny();
+            Optional<UnsentOffsetFetchRequest> inflight =
+                    inflightOffsetFetches.stream().filter(r -> r.sameRequest(request)).findAny();
+
+            if (dupe.isPresent() || inflight.isPresent()) {
+                log.info("Duplicated OffsetFetchRequest: " + 
request.requestedPartitions);
+                dupe.orElseGet(() -> inflight.get()).chainFuture(request.future);
+            } else {
+                // remove the request from the outbound buffer: inflightOffsetFetches
+                request.future.whenComplete((r, t) -> {
+                    if (!inflightOffsetFetches.remove(request)) {
+                        log.info("unable to remove request from the outbound 
buffer:" + request);
+                    }
+                });
+                this.unsentOffsetFetches.add(request);
+            }
+            return request.future;
+        }
+
+        private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> addOffsetFetchRequest(
+                final Set<TopicPartition> partitions) {
+            UnsentOffsetFetchRequest request = new UnsentOffsetFetchRequest(
+                    partitions,
+                    groupState.generation,
+                    retryBackoffMs);
+            return addOffsetFetchRequest(request);
+        }
+
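+        // Drains all staged commit requests plus the fetch requests that are
+        // currently sendable (not backing off); drained fetches are moved to
+        // the in-flight list.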
+        public List<NetworkClientDelegate.UnsentRequest> drain(final long currentTimeMs) {
+            List<NetworkClientDelegate.UnsentRequest> unsent = new ArrayList<>();
+            unsent.addAll(unsentOffsetCommits.stream()
+                    .map(UnsentOffsetCommitRequest::toUnsentRequest)
+                    .collect(Collectors.toList()));
+            List<UnsentOffsetFetchRequest> sendables = unsentOffsetFetches.stream()
+                    .filter(r -> r.canSendRequest(currentTimeMs))
+                    .collect(Collectors.toList());
+            inflightOffsetFetches.addAll(sendables);
+            unsent.addAll(sendables.stream()
+                    .peek(r -> r.onSendAttempt(currentTimeMs))
+                    .map(r -> r.toUnsentRequest(currentTimeMs))
+                    .collect(Collectors.toList()));
+            // empty the staged requests
+            unsentOffsetCommits.clear();

Review Comment:
   seems like a bug here: I cleared the non-sendables as well. Clearing all of the staged offset fetches also drops the ones that are still backing off, so they would never be sent.
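
   A minimal sketch of one possible fix (assuming the truncated remainder of this hunk clears `unsentOffsetFetches`): remove only the fetches that were actually drained, so the ones still backing off stay staged for the next poll:

       // drop only the fetches that were moved to in-flight; keep the ones
       // still backing off so they can be retried on a later poll
       unsentOffsetFetches.removeAll(sendables);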


