guozhangwang commented on code in PR #13380:
URL: https://github.com/apache/kafka/pull/13380#discussion_r1134383599


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -207,6 +266,85 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
         }
     }
 
+    static class UnsentOffsetFetchRequestState extends RequestState {
+        public final Set<TopicPartition> requestedPartitions;
+        public final GroupState.Generation requestedGeneration;
+        public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future;
+
+        public UnsentOffsetFetchRequestState(final Set<TopicPartition> partitions,
+                                             final GroupState.Generation generation,
+                                             final CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future,
+                                             final long retryBackoffMs) {
+            super(retryBackoffMs);
+            this.requestedPartitions = partitions;
+            this.requestedGeneration = generation;
+            this.future = future;
+        }
+
+        public boolean sameRequest(final UnsentOffsetFetchRequestState request) {
+            return Objects.equals(requestedGeneration, request.requestedGeneration) && requestedPartitions.equals(request.requestedPartitions);
+        }
+    }
+
+    /**
+     * <p>This is used to support the committed() API. Here we use a Java collection, {@code unsentRequests}, to
+     * track the OffsetFetchRequests that haven't been sent, to prevent sending the same requests in the same batch.
+     *
+     * <p>If the request is new, it will be enqueued to {@code unsentRequests} and sent upon the next poll.
+     *
+     * <p>If the same request has already been sent, the request's {@code CompletableFuture} will be completed upon
+     * the completion of the existing one.
+     *
+     * TODO: There is an optimization to prevent duplication of the sent but incomplete requests. I'm not sure if we
+     * need that.
+     */
+    class UnsentOffsetFetchRequests {
+        private List<UnsentOffsetFetchRequestState> unsentRequests = new ArrayList<>();
+
+        public boolean canSendOffsetFetch() {

Review Comment:
   I think the func name could be misleading in the long run. Let's just use a more specific name like `hasUnsentRequests`.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -140,6 +158,24 @@ CompletableFuture<ClientResponse> sendAutoCommit(final Map<TopicPartition, Offse
         return future;
     }
 
+    /**
+     * Handles {@link org.apache.kafka.clients.consumer.internals.events.OffsetFetchApplicationEvent}. It creates an
+     * {@link UnsentOffsetFetchRequestState} and enqueues it to be sent later.
+     */
+    public void addCommittedRequest(final Set<TopicPartition> partitions,

Review Comment:
   nit: let's rename the other `add` func to `addCommitOffsetRequest` to be more distinguishable from this one?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -207,6 +266,85 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
         }
     }
 
+    static class UnsentOffsetFetchRequestState extends RequestState {
+        public final Set<TopicPartition> requestedPartitions;
+        public final GroupState.Generation requestedGeneration;
+        public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future;
+
+        public UnsentOffsetFetchRequestState(final Set<TopicPartition> partitions,
+                                             final GroupState.Generation generation,
+                                             final CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future,
+                                             final long retryBackoffMs) {
+            super(retryBackoffMs);
+            this.requestedPartitions = partitions;
+            this.requestedGeneration = generation;
+            this.future = future;
+        }
+
+        public boolean sameRequest(final UnsentOffsetFetchRequestState request) {
+            return Objects.equals(requestedGeneration, request.requestedGeneration) && requestedPartitions.equals(request.requestedPartitions);
+        }
+    }
+
+    /**
+     * <p>This is used to support the committed() API. Here we use a Java collection, {@code unsentRequests}, to
+     * track the OffsetFetchRequests that haven't been sent, to prevent sending the same requests in the same batch.
+     *
+     * <p>If the request is new, it will be enqueued to {@code unsentRequests} and sent upon the next poll.
+     *
+     * <p>If the same request has already been sent, the request's {@code CompletableFuture} will be completed upon
+     * the completion of the existing one.
+     *
+     * TODO: There is an optimization to prevent duplication of the sent but incomplete requests. I'm not sure if we
+     * need that.
+     */
+    class UnsentOffsetFetchRequests {

Review Comment:
   The original flag is used not only to cover the period when there's already a request not yet sent, but also the time when the request has been sent but the response has not yet been received. It seems that we would not cover the second period, i.e. if a request is sent and the response has not yet come back, we may send the same request again.
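   
   Just to illustrate the shape I have in mind, here is a rough sketch. The names (`OffsetFetchRequestTracker`, `PendingFetch`, etc.) are made up and not the actual classes in this PR; the point is just that we keep a second list for requests that have been sent but not yet answered, so a later `committed()` call can piggyback on the in-flight future instead of sending a duplicate:
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Map;
   import java.util.Set;
   import java.util.concurrent.CompletableFuture;
   
   import org.apache.kafka.clients.consumer.OffsetAndMetadata;
   import org.apache.kafka.common.TopicPartition;
   
   // Hypothetical sketch: track a request until its response arrives, not just until it is sent.
   class OffsetFetchRequestTracker {
       private final List<PendingFetch> unsent = new ArrayList<>();    // waiting for the next poll
       private final List<PendingFetch> inflight = new ArrayList<>();  // sent, response not yet received
   
       static class PendingFetch {
           final Set<TopicPartition> partitions;
           final CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = new CompletableFuture<>();
   
           PendingFetch(Set<TopicPartition> partitions) {
               this.partitions = partitions;
           }
       }
   
       // Chain onto an existing duplicate, whether unsent OR in-flight; otherwise enqueue a new request.
       synchronized CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> add(Set<TopicPartition> partitions) {
           for (PendingFetch existing : unsent)
               if (existing.partitions.equals(partitions))
                   return existing.future;
           for (PendingFetch existing : inflight)
               if (existing.partitions.equals(partitions))
                   return existing.future;
           PendingFetch fresh = new PendingFetch(partitions);
           unsent.add(fresh);
           return fresh.future;
       }
   
       // Called on poll: move everything to in-flight; the caller actually sends them.
       synchronized List<PendingFetch> drainForSend() {
           List<PendingFetch> toSend = new ArrayList<>(unsent);
           unsent.clear();
           inflight.addAll(toSend);
           return toSend;
       }
   
       // Called from the response handler: complete the future and stop tracking the request.
       synchronized void complete(PendingFetch request, Map<TopicPartition, OffsetAndMetadata> offsets) {
           inflight.remove(request);
           request.future.complete(offsets);
       }
   }
   ```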



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -149,6 +185,29 @@ Queue<StagedCommit> stagedCommits() {
         return this.stagedCommits;
     }
 
+    /**
+     * Get all the sendable requests, and create a list of UnsentRequest.
+     */
+    private List<NetworkClientDelegate.UnsentRequest> sendOffsetFetchRequests(final long currentTimeMs) {

Review Comment:
   Again the func name is a bit misleading. What about `constructOffsetFetchUnsentRequests`?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -140,6 +158,24 @@ CompletableFuture<ClientResponse> sendAutoCommit(final Map<TopicPartition, Offse
         return future;
     }
 
+    /**
+     * Handles {@link org.apache.kafka.clients.consumer.internals.events.OffsetFetchApplicationEvent}. It creates an
+     * {@link UnsentOffsetFetchRequestState} and enqueues it to be sent later.
+     */
+    public void addCommittedRequest(final Set<TopicPartition> partitions,
+                                    final CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future) {
+        if (partitions.isEmpty()) {
+            future.complete(new HashMap<>());
+            return;
+        }
+
+        unsentOffsetFetchRequests.enqueue(new UnsentOffsetFetchRequestState(

Review Comment:
   This one reminds me: if we can directly construct the `RequestState` here, why can't we do the same for the commit request, instead of having to add it to a `stagedCommit` first?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -149,6 +185,29 @@ Queue<StagedCommit> stagedCommits() {
         return this.stagedCommits;
     }
 
+    /**
+     * Get all the sendable requests, and create a list of UnsentRequest.
+     */
+    private List<NetworkClientDelegate.UnsentRequest> sendOffsetFetchRequests(final long currentTimeMs) {
+        List<UnsentOffsetFetchRequestState> requests = unsentOffsetFetchRequests.sendableRequests(currentTimeMs);
+        List<NetworkClientDelegate.UnsentRequest> pollResults = new ArrayList<>();
+        requests.forEach(req -> {
+            OffsetFetchRequest.Builder builder = new OffsetFetchRequest.Builder(
+                    groupState.groupId, true,
+                    new ArrayList<>(req.requestedPartitions),
+                    throwOnFetchStableOffsetUnsupported);
+            NetworkClientDelegate.UnsentRequest unsentRequest = new NetworkClientDelegate.UnsentRequest(
+                    builder,
+                    coordinatorRequestManager.coordinator());
+            FetchCommittedOffsetResponseHandler cb = new FetchCommittedOffsetResponseHandler(req);
+            unsentRequest.future().whenComplete((r, t) -> {

Review Comment:
   Why we use a `FetchCommittedOffsetResponseHandler` for committed API, while 
using a general `FutureCompletionHandler` for commit API? Is this inconsistency 
intentional?
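   
   Not a blocker, but one way to make the two paths symmetric would be a single generic handler parameterized by the response-parsing step. This is only an illustrative sketch with made-up names, not the existing `FutureCompletionHandler` API:
   
   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.function.BiConsumer;
   import java.util.function.Function;
   
   // Illustrative sketch: one completion handler shared by the commit and committed paths,
   // differing only in how the raw response is parsed into the caller-facing result.
   final class GenericResponseHandler<R, T> implements BiConsumer<R, Throwable> {
       private final Function<R, T> parser;        // e.g. extract offsets from an OffsetFetch response
       private final CompletableFuture<T> result;  // the future handed back to the application thread
   
       GenericResponseHandler(Function<R, T> parser, CompletableFuture<T> result) {
           this.parser = parser;
           this.result = result;
       }
   
       @Override
       public void accept(R response, Throwable error) {
           if (error != null) {
               result.completeExceptionally(error);
               return;
           }
           try {
               result.complete(parser.apply(response));
           } catch (RuntimeException parseError) {
               result.completeExceptionally(parseError);
           }
       }
   }
   ```
   
   Then both paths could hook it the same way, e.g. `unsentRequest.future().whenComplete(new GenericResponseHandler<>(parser, callerFuture))`.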



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -306,13 +310,28 @@ public OffsetAndMetadata committed(TopicPartition partition, Duration timeout) {
     }
 
     @Override
-    public Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> partitions) {
-        throw new KafkaException("method not implemented");
+    public Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition> partitions) {
+        return committed(partitions, Duration.ofMillis(defaultApiTimeoutMs));
     }
 
     @Override
-    public Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> partitions, Duration timeout) {
-        throw new KafkaException("method not implemented");
+    public Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition> partitions,
+                                                            final Duration timeout) {
+        maybeThrowInvalidGroupIdException();
+        final OffsetFetchApplicationEvent event = new OffsetFetchApplicationEvent(partitions);
+        eventHandler.add(event);
+        try {
+            return event.complete(Duration.ofMillis(100));
+        } catch (ExecutionException | InterruptedException | TimeoutException e) {
+            throw new KafkaException(e);

Review Comment:
   I think it's not... why not read through the current code and make the 
throwing behavior more consistent?
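   
   For example, just a sketch of the behavior I mean (the helper name `awaitCommitted` is made up, not actual code in this PR): unwrap the future the way the existing consumer does, so interrupts become `InterruptException`, timeouts become the Kafka `TimeoutException`, and the real cause of an `ExecutionException` is surfaced instead of wrapping the `ExecutionException` itself:
   
   ```java
   import java.time.Duration;
   import java.util.Map;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ExecutionException;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.TimeoutException;
   
   import org.apache.kafka.clients.consumer.OffsetAndMetadata;
   import org.apache.kafka.common.KafkaException;
   import org.apache.kafka.common.TopicPartition;
   import org.apache.kafka.common.errors.InterruptException;
   
   final class CommittedCallHelper {
   
       // Sketch of a consistent unwrapping policy for committed(partitions, timeout).
       static Map<TopicPartition, OffsetAndMetadata> awaitCommitted(
               CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future,
               Duration timeout) {
           try {
               return future.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
           } catch (InterruptedException e) {
               throw new InterruptException(e);
           } catch (TimeoutException e) {
               throw new org.apache.kafka.common.errors.TimeoutException(
                       "Timed out waiting for committed offsets");
           } catch (ExecutionException e) {
               // Surface the real cause instead of wrapping the ExecutionException itself.
               if (e.getCause() instanceof KafkaException)
                   throw (KafkaException) e.getCause();
               throw new KafkaException(e.getCause());
           }
       }
   
       private CommittedCallHelper() { }
   }
   ```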



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -80,16 +97,18 @@ public CommitRequestManager(
      */
     @Override
     public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
-        maybeAutoCommit(currentTimeMs);
+        maybeAutoCommit();
+        List<NetworkClientDelegate.UnsentRequest> unsentRequests = new ArrayList<>();
 
-        if (stagedCommits.isEmpty()) {
-            return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, new ArrayList<>());
+        if (!stagedCommits.isEmpty()) {

Review Comment:
   I think with the new `unsentOffsetFetchRequests` the `stagedCommits` can be 
merged into it, since it seems unnecessary to keep two sets of unsent requests 
just to pour one of its content into another periodically.
   
   Also I feel that it's not necessarily to keep a second placeholder for 
`UnsentRequest` in `UnsentOffsetFetchRequestState`.
   
   More specifically, upon `manager.add` we can check whether there's a dup and 
if not construct an `UnsentRequest` with the handler hooked right away and add 
to the `unsentOffsetFetchRequests`.
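   
   To sketch what that could look like (names like `SingleQueueCommitManager` and `SimpleUnsentRequest` are placeholders, not the real `NetworkClientDelegate.UnsentRequest`, and the request builders are just strings here): a single list holds every unsent request, and each request carries its handler from the moment it is added, so poll only has to drain the list:
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Map;
   import java.util.Set;
   import java.util.concurrent.CompletableFuture;
   
   import org.apache.kafka.clients.consumer.OffsetAndMetadata;
   import org.apache.kafka.common.TopicPartition;
   
   // Placeholder sketch of the "one unsent-request list, handler hooked at add() time" idea.
   class SingleQueueCommitManager {
   
       // Stand-in for NetworkClientDelegate.UnsentRequest; the real class is not reproduced here.
       static class SimpleUnsentRequest {
           final Object requestBuilder;
           final CompletableFuture<Object> responseFuture = new CompletableFuture<>();
   
           SimpleUnsentRequest(Object requestBuilder) {
               this.requestBuilder = requestBuilder;
           }
       }
   
       // One list replaces stagedCommits plus unsentOffsetFetchRequests.
       private final List<SimpleUnsentRequest> unsentRequests = new ArrayList<>();
   
       // committed(): construct the request and hook its handler immediately, at add() time.
       CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> addOffsetFetchRequest(Set<TopicPartition> partitions) {
           CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result = new CompletableFuture<>();
           SimpleUnsentRequest request = new SimpleUnsentRequest("OffsetFetch(" + partitions + ")");
           request.responseFuture.whenComplete((response, error) -> {
               if (error != null)
                   result.completeExceptionally(error);
               else
                   result.complete(parseOffsets(response));  // response handling wired up front
           });
           unsentRequests.add(request);
           return result;
       }
   
       // commitSync()/auto-commit: same shape, no separate staging step needed.
       CompletableFuture<Void> addOffsetCommitRequest(Map<TopicPartition, OffsetAndMetadata> offsets) {
           CompletableFuture<Void> result = new CompletableFuture<>();
           SimpleUnsentRequest request = new SimpleUnsentRequest("OffsetCommit(" + offsets + ")");
           request.responseFuture.whenComplete((response, error) -> {
               if (error != null)
                   result.completeExceptionally(error);
               else
                   result.complete(null);
           });
           unsentRequests.add(request);
           return result;
       }
   
       // poll(): every request already carries its handler, so this just drains the list.
       List<SimpleUnsentRequest> drain() {
           List<SimpleUnsentRequest> ready = new ArrayList<>(unsentRequests);
           unsentRequests.clear();
           return ready;
       }
   
       @SuppressWarnings("unchecked")
       private Map<TopicPartition, OffsetAndMetadata> parseOffsets(Object response) {
           return (Map<TopicPartition, OffsetAndMetadata>) response;  // placeholder for real response parsing
       }
   }
   ```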



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -140,6 +158,24 @@ CompletableFuture<ClientResponse> sendAutoCommit(final Map<TopicPartition, Offse
         return future;
     }
 
+    /**
+     * Handles {@link org.apache.kafka.clients.consumer.internals.events.OffsetFetchApplicationEvent}. It creates an
+     * {@link UnsentOffsetFetchRequestState} and enqueues it to be sent later.
+     */
+    public void addCommittedRequest(final Set<TopicPartition> partitions,
+                                    final CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future) {
+        if (partitions.isEmpty()) {
+            future.complete(new HashMap<>());
+            return;
+        }
+
+        unsentOffsetFetchRequests.enqueue(new UnsentOffsetFetchRequestState(

Review Comment:
   Okay this train of thought leads me to suggest merging the two, as well as merging the `UnsentOffsetFetchRequestState` with `UnsentRequest` (see other detailed comments). LMK what you think.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -80,16 +97,18 @@ public CommitRequestManager(
      */
     @Override
     public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
-        maybeAutoCommit(currentTimeMs);
+        maybeAutoCommit();
+        List<NetworkClientDelegate.UnsentRequest> unsentRequests = new ArrayList<>();
 
-        if (stagedCommits.isEmpty()) {
-            return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, new ArrayList<>());
+        if (!stagedCommits.isEmpty()) {

Review Comment:
   Of course that means we need the `UnsentRequest` to also include a `RequestState` field to be able to check `canSendRequest`, mark the attempt timestamp, etc.
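   
   Roughly something like this (illustrative names only, not the actual `RequestState` API):
   
   ```java
   // Sketch of embedding retry/backoff state in the unsent request itself, so poll()
   // can ask canSendRequest() and record the attempt timestamp when it actually sends.
   class BackoffTrackedRequest {
       private final long retryBackoffMs;
       private long lastSentMs = -1L;  // -1 means never attempted
   
       BackoffTrackedRequest(long retryBackoffMs) {
           this.retryBackoffMs = retryBackoffMs;
       }
   
       boolean canSendRequest(long currentTimeMs) {
           // First attempt is always allowed; retries must wait out the backoff.
           return lastSentMs < 0 || currentTimeMs - lastSentMs >= retryBackoffMs;
       }
   
       void onSendAttempt(long currentTimeMs) {
           lastSentMs = currentTimeMs;
       }
   }
   ```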



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -207,6 +266,85 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
         }
     }
 
+    static class UnsentOffsetFetchRequestState extends RequestState {
+        public final Set<TopicPartition> requestedPartitions;
+        public final GroupState.Generation requestedGeneration;
+        public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future;
+
+        public UnsentOffsetFetchRequestState(final Set<TopicPartition> partitions,
+                                             final GroupState.Generation generation,
+                                             final CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future,
+                                             final long retryBackoffMs) {
+            super(retryBackoffMs);
+            this.requestedPartitions = partitions;
+            this.requestedGeneration = generation;
+            this.future = future;
+        }
+
+        public boolean sameRequest(final UnsentOffsetFetchRequestState request) {
+            return Objects.equals(requestedGeneration, request.requestedGeneration) && requestedPartitions.equals(request.requestedPartitions);
+        }
+    }
+
+    /**
+     * <p>This is used to support the committed() API. Here we use a Java collection, {@code unsentRequests}, to
+     * track the OffsetFetchRequests that haven't been sent, to prevent sending the same requests in the same batch.
+     *
+     * <p>If the request is new, it will be enqueued to {@code unsentRequests} and sent upon the next poll.
+     *
+     * <p>If the same request has already been sent, the request's {@code CompletableFuture} will be completed upon
+     * the completion of the existing one.
+     *
+     * TODO: There is an optimization to prevent duplication of the sent but incomplete requests. I'm not sure if we

Review Comment:
   Yeah I think we can defer that for future work.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -235,4 +373,105 @@ public void ack(final long currentTimeMs) {
             this.timer.update(currentTimeMs);
         }
     }
+
+    private class FetchCommittedOffsetResponseHandler {

Review Comment:
   Assuming this is just copy-pasted here, without any handler logic changes, I skipped this class. @philipnee let me know if otherwise.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -149,6 +185,29 @@ Queue<StagedCommit> stagedCommits() {
         return this.stagedCommits;
     }
 
+    /**
+     * Get all the sendable requests, and create a list of UnsentRequest.
+     */
+    private List<NetworkClientDelegate.UnsentRequest> sendOffsetFetchRequests(final long currentTimeMs) {
+        List<UnsentOffsetFetchRequestState> requests = unsentOffsetFetchRequests.sendableRequests(currentTimeMs);
+        List<NetworkClientDelegate.UnsentRequest> pollResults = new ArrayList<>();
+        requests.forEach(req -> {
+            OffsetFetchRequest.Builder builder = new OffsetFetchRequest.Builder(
+                    groupState.groupId, true,
+                    new ArrayList<>(req.requestedPartitions),
+                    throwOnFetchStableOffsetUnsupported);
+            NetworkClientDelegate.UnsentRequest unsentRequest = new NetworkClientDelegate.UnsentRequest(
+                    builder,
+                    coordinatorRequestManager.coordinator());
+            FetchCommittedOffsetResponseHandler cb = new FetchCommittedOffsetResponseHandler(req);

Review Comment:
   I think it is actually fine :) But just to echo my other comments, I think we can merge `stagedCommits` with this class, as well as eliminate `UnsentOffsetFetchRequestState`, in which case we will always construct the `UnsentRequest` right away and add it to the list, so that we do not need to do it here.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -437,29 +456,24 @@ public void wakeup() {
      */
     @Override
     public void commitSync(final Duration timeout) {
-        final CommitApplicationEvent commitEvent = new CommitApplicationEvent(subscriptions.allConsumed());
-        eventHandler.add(commitEvent);
-
-        final CompletableFuture<Void> commitFuture = commitEvent.future();
-        try {
-            commitFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
-        } catch (final TimeoutException e) {
-            throw new org.apache.kafka.common.errors.TimeoutException(
-                     "timeout");
-        } catch (final Exception e) {
-            // handle exception here
-            throw new RuntimeException(e);
-        }
+        commitSync(subscriptions.allConsumed(), timeout);
     }
 
     @Override
     public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) {
-        throw new KafkaException("method not implemented");
+        commitSync(offsets, Duration.ofMillis(defaultApiTimeoutMs));
     }
 
     @Override
     public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets, Duration timeout) {
-        throw new KafkaException("method not implemented");
+        CompletableFuture<Void> commitFuture = commit(offsets);
+        try {
+            commitFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
+        } catch (final TimeoutException e) {

Review Comment:
   Ditto here.


