philipnee commented on code in PR #13380:
URL: https://github.com/apache/kafka/pull/13380#discussion_r1139653568


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -207,6 +223,209 @@ public NetworkClientDelegate.UnsentRequest 
toUnsentRequest() {
         }
     }
 
+    private class UnsentOffsetFetchRequest extends RequestState {
+        public final Set<TopicPartition> requestedPartitions;
+        public final GroupState.Generation requestedGeneration;
+        public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> 
future;
+
+        public UnsentOffsetFetchRequest(final Set<TopicPartition> partitions,
+                                        final GroupState.Generation generation,
+                                        final long retryBackoffMs) {
+            super(retryBackoffMs);
+            this.requestedPartitions = partitions;
+            this.requestedGeneration = generation;
+            this.future = new CompletableFuture<>();
+        }
+
+        public boolean sameRequest(final UnsentOffsetFetchRequest request) {
+            return Objects.equals(requestedGeneration, 
request.requestedGeneration) && 
requestedPartitions.equals(request.requestedPartitions);
+        }
+
+        public NetworkClientDelegate.UnsentRequest toUnsentRequest(final long 
currentTimeMs) {
+            OffsetFetchRequest.Builder builder = new 
OffsetFetchRequest.Builder(
+                    groupState.groupId,
+                    true,
+                    new ArrayList<>(this.requestedPartitions),
+                    throwOnFetchStableOffsetUnsupported);
+            NetworkClientDelegate.UnsentRequest unsentRequest = new 
NetworkClientDelegate.UnsentRequest(
+                    builder,
+                    coordinatorRequestManager.coordinator());
+            unsentRequest.future().whenComplete((r, t) -> {
+                onResponse(currentTimeMs, (OffsetFetchResponse) 
r.responseBody());
+            });
+            return unsentRequest;
+        }
+
+        public void onResponse(

Review Comment:
   I combined the handler into `UnsentOffsetFetchRequest` as we don't really 
need to split them



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to