philipnee commented on code in PR #13380: URL: https://github.com/apache/kafka/pull/13380#discussion_r1139653568
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ########## @@ -207,6 +223,209 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() { } } + private class UnsentOffsetFetchRequest extends RequestState { + public final Set<TopicPartition> requestedPartitions; + public final GroupState.Generation requestedGeneration; + public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future; + + public UnsentOffsetFetchRequest(final Set<TopicPartition> partitions, + final GroupState.Generation generation, + final long retryBackoffMs) { + super(retryBackoffMs); + this.requestedPartitions = partitions; + this.requestedGeneration = generation; + this.future = new CompletableFuture<>(); + } + + public boolean sameRequest(final UnsentOffsetFetchRequest request) { + return Objects.equals(requestedGeneration, request.requestedGeneration) && requestedPartitions.equals(request.requestedPartitions); + } + + public NetworkClientDelegate.UnsentRequest toUnsentRequest(final long currentTimeMs) { + OffsetFetchRequest.Builder builder = new OffsetFetchRequest.Builder( + groupState.groupId, + true, + new ArrayList<>(this.requestedPartitions), + throwOnFetchStableOffsetUnsupported); + NetworkClientDelegate.UnsentRequest unsentRequest = new NetworkClientDelegate.UnsentRequest( + builder, + coordinatorRequestManager.coordinator()); + unsentRequest.future().whenComplete((r, t) -> { + onResponse(currentTimeMs, (OffsetFetchResponse) r.responseBody()); + }); + return unsentRequest; + } + + public void onResponse( Review Comment: I combined the response handler into `UnsentOffsetFetchRequest`, as we don't really need to keep them separate. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org