guozhangwang commented on code in PR #13380:
URL: https://github.com/apache/kafka/pull/13380#discussion_r1134383599
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -207,6 +266,85 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
         }
     }
 
+    static class UnsentOffsetFetchRequestState extends RequestState {
+        public final Set<TopicPartition> requestedPartitions;
+        public final GroupState.Generation requestedGeneration;
+        public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future;
+
+        public UnsentOffsetFetchRequestState(final Set<TopicPartition> partitions,
+                                             final GroupState.Generation generation,
+                                             final CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future,
+                                             final long retryBackoffMs) {
+            super(retryBackoffMs);
+            this.requestedPartitions = partitions;
+            this.requestedGeneration = generation;
+            this.future = future;
+        }
+
+        public boolean sameRequest(final UnsentOffsetFetchRequestState request) {
+            return Objects.equals(requestedGeneration, request.requestedGeneration) && requestedPartitions.equals(request.requestedPartitions);
+        }
+    }
+
+    /**
+     * <p>This is used to support the committed() API. Here we use a Java Collections, {@code unsentRequests}, to
+     * track
+     * the OffsetFetchRequests that haven't been sent, to prevent sending the same requests in the same batch.
+     *
+     * <p>If the request is new. It will be enqueued to the {@code unsentRequest}, and will be sent upon the next
+     * poll.
+     *
+     * <p>If the same request has been sent, the request's {@code CompletableFuture} will be completed upon the
+     * completion of the existing one.
+     *
+     * TODO: There is an optimization to present duplication to the sent but incompleted requests. I'm not sure if we
+     * need that.
+     */
+    class UnsentOffsetFetchRequests {
+        private List<UnsentOffsetFetchRequestState> unsentRequests = new ArrayList<>();
+
+        public boolean canSendOffsetFetch() {

Review Comment:
   I think the func name could be misleading in the long run. Let's just use a more specific name like `hasUnsentRequests`.

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -140,6 +158,24 @@ CompletableFuture<ClientResponse> sendAutoCommit(final Map<TopicPartition, Offse
         return future;
     }
 
+    /**
+     * Handles {@link org.apache.kafka.clients.consumer.internals.events.OffsetFetchApplicationEvent}. It creates an
+     * {@link UnsentOffsetFetchRequestState} and enqueue it to send later.
+     */
+    public void addCommittedRequest(final Set<TopicPartition> partitions,

Review Comment:
   nit: let's rename the other `add` func to `addCommitOffsetRequest` to make it more distinguishable from this one?
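To make the dedup behavior described in the javadoc above concrete, here is a minimal, self-contained sketch of the "complete the duplicate's future when the existing request completes" idea. All types are simplified stand-ins (partitions as strings, offsets as longs), not the PR's actual classes:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Simplified model of the UnsentOffsetFetchRequests behavior: a duplicate is not
// enqueued again; its future is chained to the already-tracked identical request.
class UnsentOffsetFetchSketch {
    static class PendingFetch {
        final Set<String> partitions;                      // stand-in for Set<TopicPartition>
        final CompletableFuture<Map<String, Long>> future; // stand-in for the offsets future

        PendingFetch(Set<String> partitions, CompletableFuture<Map<String, Long>> future) {
            this.partitions = partitions;
            this.future = future;
        }

        boolean sameRequest(PendingFetch other) {
            return partitions.equals(other.partitions);
        }
    }

    private final List<PendingFetch> unsentRequests = new ArrayList<>();

    void enqueue(PendingFetch incoming) {
        for (PendingFetch existing : unsentRequests) {
            if (existing.sameRequest(incoming)) {
                // Duplicate within the same batch: piggy-back on the existing request.
                existing.future.whenComplete((result, error) -> {
                    if (error != null) {
                        incoming.future.completeExceptionally(error);
                    } else {
                        incoming.future.complete(result);
                    }
                });
                return;
            }
        }
        unsentRequests.add(incoming);
    }
}
```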
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -207,6 +266,85 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
         }
     }
 
+    static class UnsentOffsetFetchRequestState extends RequestState {
+        public final Set<TopicPartition> requestedPartitions;
+        public final GroupState.Generation requestedGeneration;
+        public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future;
+
+        public UnsentOffsetFetchRequestState(final Set<TopicPartition> partitions,
+                                             final GroupState.Generation generation,
+                                             final CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future,
+                                             final long retryBackoffMs) {
+            super(retryBackoffMs);
+            this.requestedPartitions = partitions;
+            this.requestedGeneration = generation;
+            this.future = future;
+        }
+
+        public boolean sameRequest(final UnsentOffsetFetchRequestState request) {
+            return Objects.equals(requestedGeneration, request.requestedGeneration) && requestedPartitions.equals(request.requestedPartitions);
+        }
+    }
+
+    /**
+     * <p>This is used to support the committed() API. Here we use a Java Collections, {@code unsentRequests}, to
+     * track
+     * the OffsetFetchRequests that haven't been sent, to prevent sending the same requests in the same batch.
+     *
+     * <p>If the request is new. It will be enqueued to the {@code unsentRequest}, and will be sent upon the next
+     * poll.
+     *
+     * <p>If the same request has been sent, the request's {@code CompletableFuture} will be completed upon the
+     * completion of the existing one.
+     *
+     * TODO: There is an optimization to present duplication to the sent but incompleted requests. I'm not sure if we
+     * need that.
+     */
+    class UnsentOffsetFetchRequests {

Review Comment:
   The original flag is used not only to cover the period when there's already a request not yet sent, but also the period when the request has been sent and the response has not yet been received. It seems we would not cover the second period, i.e. if a request is sent and the response is not yet received, we may send the same request again.

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -149,6 +185,29 @@ Queue<StagedCommit> stagedCommits() {
         return this.stagedCommits;
     }
 
+    /**
+     * Get all the sendable requests, and create a list of UnsentRequest.
+     */
+    private List<NetworkClientDelegate.UnsentRequest> sendOffsetFetchRequests(final long currentTimeMs) {

Review Comment:
   Again, the func name is a bit misleading. What about `constructOffsetFetchUnsentRequests`?

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -140,6 +158,24 @@ CompletableFuture<ClientResponse> sendAutoCommit(final Map<TopicPartition, Offse
         return future;
     }
 
+    /**
+     * Handles {@link org.apache.kafka.clients.consumer.internals.events.OffsetFetchApplicationEvent}. It creates an
+     * {@link UnsentOffsetFetchRequestState} and enqueue it to send later.
+     */
+    public void addCommittedRequest(final Set<TopicPartition> partitions,
+                                    final CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future) {
+        if (partitions.isEmpty()) {
+            future.complete(new HashMap<>());
+            return;
+        }
+
+        unsentOffsetFetchRequests.enqueue(new UnsentOffsetFetchRequestState(

Review Comment:
   This one reminds me: if we can directly construct the `RequestState` here, why can't we do the same for the commit request, instead of having to add it to a `stagedCommit` first?
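For the "sent but response not yet received" window raised in the first comment above, one possible shape is a second, in-flight list that the dedup check also consults. A hedged, self-contained sketch with stand-in types (not the PR's actual classes):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;

// Sketch: track unsent AND in-flight requests, so a duplicate is suppressed both
// before it is handed to the network layer and while its response is outstanding.
class OffsetFetchDedupSketch {
    static final class Pending {
        final Set<String> partitions;                                   // stand-in for Set<TopicPartition>
        final CompletableFuture<Map<String, Long>> future = new CompletableFuture<>();

        Pending(Set<String> partitions) {
            this.partitions = partitions;
        }
    }

    private final List<Pending> unsent = new ArrayList<>();
    private final List<Pending> inflight = new ArrayList<>();

    // Look for an identical request in either list.
    private Optional<Pending> findDuplicate(Set<String> partitions) {
        return Stream.concat(unsent.stream(), inflight.stream())
                .filter(p -> p.partitions.equals(partitions))
                .findFirst();
    }

    CompletableFuture<Map<String, Long>> add(Set<String> partitions) {
        Optional<Pending> dup = findDuplicate(partitions);
        if (dup.isPresent()) {
            return dup.get().future;          // piggy-back on the existing request
        }
        Pending fresh = new Pending(partitions);
        unsent.add(fresh);
        return fresh.future;
    }

    // Called when the poll loop actually hands the request to the network layer.
    void markSent(Pending p) {
        unsent.remove(p);
        inflight.add(p);
        p.future.whenComplete((r, t) -> inflight.remove(p));  // drop once the response arrives
    }
}
```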
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -149,6 +185,29 @@ Queue<StagedCommit> stagedCommits() {
         return this.stagedCommits;
     }
 
+    /**
+     * Get all the sendable requests, and create a list of UnsentRequest.
+     */
+    private List<NetworkClientDelegate.UnsentRequest> sendOffsetFetchRequests(final long currentTimeMs) {
+        List<UnsentOffsetFetchRequestState> requests = unsentOffsetFetchRequests.sendableRequests(currentTimeMs);
+        List<NetworkClientDelegate.UnsentRequest> pollResults = new ArrayList<>();
+        requests.forEach(req -> {
+            OffsetFetchRequest.Builder builder = new OffsetFetchRequest.Builder(
+                groupState.groupId, true,
+                new ArrayList<>(req.requestedPartitions),
+                throwOnFetchStableOffsetUnsupported);
+            NetworkClientDelegate.UnsentRequest unsentRequest = new NetworkClientDelegate.UnsentRequest(
+                builder,
+                coordinatorRequestManager.coordinator());
+            FetchCommittedOffsetResponseHandler cb = new FetchCommittedOffsetResponseHandler(req);
+            unsentRequest.future().whenComplete((r, t) -> {

Review Comment:
   Why do we use a `FetchCommittedOffsetResponseHandler` for the committed API, while using a general `FutureCompletionHandler` for the commit API? Is this inconsistency intentional?

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -306,13 +310,28 @@ public OffsetAndMetadata committed(TopicPartition partition, Duration timeout) {
     }
 
     @Override
-    public Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> partitions) {
-        throw new KafkaException("method not implemented");
+    public Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition> partitions) {
+        return committed(partitions, Duration.ofMillis(defaultApiTimeoutMs));
     }
 
     @Override
-    public Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> partitions, Duration timeout) {
-        throw new KafkaException("method not implemented");
+    public Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition> partitions,
+                                                            final Duration timeout) {
+        maybeThrowInvalidGroupIdException();
+        final OffsetFetchApplicationEvent event = new OffsetFetchApplicationEvent(partitions);
+        eventHandler.add(event);
+        try {
+            return event.complete(Duration.ofMillis(100));
+        } catch (ExecutionException | InterruptedException | TimeoutException e) {
+            throw new KafkaException(e);

Review Comment:
   I think it's not... why not read through the current code and make the throwing behavior consistent with it?

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -80,16 +97,18 @@ public CommitRequestManager(
      */
     @Override
     public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
-        maybeAutoCommit(currentTimeMs);
+        maybeAutoCommit();
+        List<NetworkClientDelegate.UnsentRequest> unsentRequests = new ArrayList<>();
-        if (stagedCommits.isEmpty()) {
-            return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, new ArrayList<>());
+        if (!stagedCommits.isEmpty()) {

Review Comment:
   I think with the new `unsentOffsetFetchRequests`, the `stagedCommits` can be merged into it, since it seems unnecessary to keep two sets of unsent requests just to pour the contents of one into the other periodically. Also, I feel it's not necessary to keep a second placeholder for `UnsentRequest` in `UnsentOffsetFetchRequestState`.

   More specifically, upon `manager.add` we can check whether there's a dup, and if not, construct an `UnsentRequest` with the handler hooked up right away and add it to `unsentOffsetFetchRequests`.
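A rough, self-contained sketch of the single-queue idea in the comment above: `add` dedups and builds the outgoing request with its handler attached immediately, so `poll` only drains the queue. The names `OutgoingRequest` and `addOffsetFetch` are illustrative stand-ins, not the real `NetworkClientDelegate` types:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Sketch: one queue of fully-built requests; request construction and handler
// hookup happen at add() time instead of inside poll().
class SingleQueueCommitManagerSketch {
    static final class OutgoingRequest {
        final String api;                       // e.g. "OffsetFetch" or "OffsetCommit"
        final Set<String> partitions;           // stand-in for Set<TopicPartition>
        final CompletableFuture<Map<String, Long>> result = new CompletableFuture<>();

        OutgoingRequest(String api, Set<String> partitions) {
            this.api = api;
            this.partitions = partitions;
        }
    }

    private final List<OutgoingRequest> unsent = new ArrayList<>();

    CompletableFuture<Map<String, Long>> addOffsetFetch(Set<String> partitions) {
        for (OutgoingRequest r : unsent) {
            if (r.api.equals("OffsetFetch") && r.partitions.equals(partitions)) {
                return r.result;                // duplicate: reuse the queued request
            }
        }
        OutgoingRequest request = new OutgoingRequest("OffsetFetch", partitions);
        // The response handler is hooked here, at construction time.
        request.result.whenComplete((offsets, error) -> { /* response handling */ });
        unsent.add(request);
        return request.result;
    }

    // poll() only drains whatever is ready; it no longer builds requests.
    List<OutgoingRequest> poll() {
        List<OutgoingRequest> ready = new ArrayList<>(unsent);
        unsent.clear();
        return ready;
    }
}
```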
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -140,6 +158,24 @@ CompletableFuture<ClientResponse> sendAutoCommit(final Map<TopicPartition, Offse
        return future;
     }
 
+    /**
+     * Handles {@link org.apache.kafka.clients.consumer.internals.events.OffsetFetchApplicationEvent}. It creates an
+     * {@link UnsentOffsetFetchRequestState} and enqueue it to send later.
+     */
+    public void addCommittedRequest(final Set<TopicPartition> partitions,
+                                    final CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future) {
+        if (partitions.isEmpty()) {
+            future.complete(new HashMap<>());
+            return;
+        }
+
+        unsentOffsetFetchRequests.enqueue(new UnsentOffsetFetchRequestState(

Review Comment:
   Okay, this train of thought leads me to suggest merging the two, as well as merging `UnsentOffsetFetchRequestState` with `UnsentRequest` (see the other detailed comments). LMK what you think.

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -80,16 +97,18 @@ public CommitRequestManager(
      */
     @Override
     public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
-        maybeAutoCommit(currentTimeMs);
+        maybeAutoCommit();
+        List<NetworkClientDelegate.UnsentRequest> unsentRequests = new ArrayList<>();
-        if (stagedCommits.isEmpty()) {
-            return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, new ArrayList<>());
+        if (!stagedCommits.isEmpty()) {

Review Comment:
   Of course, that means we need `UnsentRequest` to also include a `RequestState` field so we can check `canSendRequest`, mark the attempt timestamp, etc.
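To illustrate what embedding a `RequestState`-like object in the queued request could look like, here is a simplified sketch. `RetryState`, `UnsentRequestWithState`, and `trySend` are stand-in names for illustration, not Kafka's actual `RequestState` or `UnsentRequest`:

```java
// Sketch: the queued request carries its own retry/backoff state, so the poll loop
// can ask "can this be sent now?" and record each send attempt on the same object.
class BackoffAwareRequestSketch {
    static class RetryState {
        private final long retryBackoffMs;
        private long lastSentMs = -1L;

        RetryState(long retryBackoffMs) {
            this.retryBackoffMs = retryBackoffMs;
        }

        boolean canSendRequest(long currentTimeMs) {
            // Sendable if never attempted, or if the backoff interval has elapsed.
            return lastSentMs == -1L || currentTimeMs - lastSentMs >= retryBackoffMs;
        }

        void onSendAttempt(long currentTimeMs) {
            this.lastSentMs = currentTimeMs;
        }
    }

    static class UnsentRequestWithState {
        final RetryState state;                 // embedded instead of kept in a separate holder class

        UnsentRequestWithState(long retryBackoffMs) {
            this.state = new RetryState(retryBackoffMs);
        }
    }

    // Poll-loop usage: skip requests that are still backing off.
    static boolean trySend(UnsentRequestWithState req, long nowMs) {
        if (!req.state.canSendRequest(nowMs)) {
            return false;
        }
        req.state.onSendAttempt(nowMs);
        // ... hand the request to the network client here ...
        return true;
    }
}
```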
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -207,6 +266,85 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
         }
     }
 
+    static class UnsentOffsetFetchRequestState extends RequestState {
+        public final Set<TopicPartition> requestedPartitions;
+        public final GroupState.Generation requestedGeneration;
+        public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future;
+
+        public UnsentOffsetFetchRequestState(final Set<TopicPartition> partitions,
+                                             final GroupState.Generation generation,
+                                             final CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future,
+                                             final long retryBackoffMs) {
+            super(retryBackoffMs);
+            this.requestedPartitions = partitions;
+            this.requestedGeneration = generation;
+            this.future = future;
+        }
+
+        public boolean sameRequest(final UnsentOffsetFetchRequestState request) {
+            return Objects.equals(requestedGeneration, request.requestedGeneration) && requestedPartitions.equals(request.requestedPartitions);
+        }
+    }
+
+    /**
+     * <p>This is used to support the committed() API. Here we use a Java Collections, {@code unsentRequests}, to
+     * track
+     * the OffsetFetchRequests that haven't been sent, to prevent sending the same requests in the same batch.
+     *
+     * <p>If the request is new. It will be enqueued to the {@code unsentRequest}, and will be sent upon the next
+     * poll.
+     *
+     * <p>If the same request has been sent, the request's {@code CompletableFuture} will be completed upon the
+     * completion of the existing one.
+     *
+     * TODO: There is an optimization to present duplication to the sent but incompleted requests. I'm not sure if we

Review Comment:
   Yeah, I think we can defer that to future work.

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -235,4 +373,105 @@ public void ack(final long currentTimeMs) {
             this.timer.update(currentTimeMs);
         }
     }
+
+    private class FetchCommittedOffsetResponseHandler {

Review Comment:
   Assuming this is just copied here without any handler logic changes, I skipped this class. @philipnee let me know if otherwise.

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -149,6 +185,29 @@ Queue<StagedCommit> stagedCommits() {
         return this.stagedCommits;
     }
 
+    /**
+     * Get all the sendable requests, and create a list of UnsentRequest.
+     */
+    private List<NetworkClientDelegate.UnsentRequest> sendOffsetFetchRequests(final long currentTimeMs) {
+        List<UnsentOffsetFetchRequestState> requests = unsentOffsetFetchRequests.sendableRequests(currentTimeMs);
+        List<NetworkClientDelegate.UnsentRequest> pollResults = new ArrayList<>();
+        requests.forEach(req -> {
+            OffsetFetchRequest.Builder builder = new OffsetFetchRequest.Builder(
+                groupState.groupId, true,
+                new ArrayList<>(req.requestedPartitions),
+                throwOnFetchStableOffsetUnsupported);
+            NetworkClientDelegate.UnsentRequest unsentRequest = new NetworkClientDelegate.UnsentRequest(
+                builder,
+                coordinatorRequestManager.coordinator());
+            FetchCommittedOffsetResponseHandler cb = new FetchCommittedOffsetResponseHandler(req);

Review Comment:
   I think it is actually fine :) But just to echo my other comments, I think we can merge `stagedCommits` with this class as well as eliminate `UnsentOffsetFetchRequestState`, in which case we will always construct the `UnsentRequest` right away and add it to the list, so that we do not need to do it here.

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -437,29 +456,24 @@ public void wakeup() {
      */
     @Override
     public void commitSync(final Duration timeout) {
-        final CommitApplicationEvent commitEvent = new CommitApplicationEvent(subscriptions.allConsumed());
-        eventHandler.add(commitEvent);
-
-        final CompletableFuture<Void> commitFuture = commitEvent.future();
-        try {
-            commitFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
-        } catch (final TimeoutException e) {
-            throw new org.apache.kafka.common.errors.TimeoutException(
-                "timeout");
-        } catch (final Exception e) {
-            // handle exception here
-            throw new RuntimeException(e);
-        }
+        commitSync(subscriptions.allConsumed(), timeout);
     }
 
     @Override
     public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) {
-        throw new KafkaException("method not implemented");
+        commitSync(offsets, Duration.ofMillis(defaultApiTimeoutMs));
     }
 
     @Override
     public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets, Duration timeout) {
-        throw new KafkaException("method not implemented");
+        CompletableFuture<Void> commitFuture = commit(offsets);
+        try {
+            commitFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
+        } catch (final TimeoutException e) {

Review Comment:
   Ditto here.
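For the two comments about throwing behavior ("make the throwing behavior consistent" and "Ditto here"), one possible shape for the sync paths is to map each failure mode the way the classic consumer does instead of wrapping everything in `KafkaException`. This is only a sketch; `waitOnFuture` is a hypothetical helper name, though the Kafka exception classes used are real:

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.InterruptException;

// Sketch of consistent exception mapping for blocking consumer APIs:
//  - timeout        -> org.apache.kafka.common.errors.TimeoutException
//  - interruption   -> InterruptException
//  - execution error-> rethrow the original KafkaException cause, or wrap it
final class SyncApiExceptionSketch {
    static <T> T waitOnFuture(CompletableFuture<T> future, Duration timeout) {
        try {
            return future.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // Surface the API timeout as Kafka's own unchecked TimeoutException.
            throw new org.apache.kafka.common.errors.TimeoutException(
                "Timed out waiting for the response after " + timeout.toMillis() + "ms");
        } catch (InterruptedException e) {
            // Rethrow as Kafka's unchecked InterruptException.
            throw new InterruptException(e);
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof KafkaException) {
                throw (KafkaException) cause;   // rethrow the original Kafka error
            }
            throw new KafkaException(cause);
        }
    }
}
```

Both `committed(Set, Duration)` and `commitSync(Map, Duration)` could then share one such helper so the two paths cannot drift apart.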