kirktrue commented on code in PR #15438:
URL: https://github.com/apache/kafka/pull/15438#discussion_r1505131769


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java:
##########
@@ -16,10 +16,14 @@
  */
 package org.apache.kafka.clients.consumer.internals.events;
 
+import org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer;
+import org.apache.kafka.common.Uuid;
+
 import java.util.Objects;
 
 /**
- * This is the abstract definition of the events created by the KafkaConsumer API
+ * This is the abstract definition of the events created by the {@link AsyncKafkaConsumer} on the user's
+ * application thread.

Review Comment:
   Minor comments tweak.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java:
##########
@@ -99,18 +100,17 @@ public long maximumTimeToWait() {
      *
      * <p/>
      *
-     * See {@link CompletableApplicationEvent#get(Timer)} and {@link Future#get(long, TimeUnit)} for more details.
+     * See {@link ConsumerUtils#getResult(Future, Timer)} and {@link Future#get(long, TimeUnit)} for more details.
      *
     * @param event A {@link CompletableApplicationEvent} created by the polling thread
-     * @param timer Timer for which to wait for the event to complete
      * @return      Value that is the result of the event
      * @param <T>   Type of return value of the event
      */
     public <T> T addAndGet(final CompletableApplicationEvent<T> event, final Timer timer) {
         Objects.requireNonNull(event, "CompletableApplicationEvent provided to addAndGet must be non-null");
         Objects.requireNonNull(timer, "Timer provided to addAndGet must be non-null");
         add(event);
-        return event.get(timer);
+        return ConsumerUtils.getResult(event.future(), timer);
     }

Review Comment:
   The `CompletableApplicationEvent.get()` method was removed. It was just a shim over `ConsumerUtils.getResult()`, so removing it avoids confusion/misuse.
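   For anyone skimming: the removed shim was roughly the shape below. This is a self-contained sketch with made-up names, not the actual Kafka internals, just to show why the delegation was redundant.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Illustrative stand-ins only; the real classes live in
// org.apache.kafka.clients.consumer.internals and differ in detail.
class SketchCompletableEvent<T> {

    private final CompletableFuture<T> future = new CompletableFuture<>();

    CompletableFuture<T> future() {
        return future;
    }

    // The removed get() was about this thin: it simply forwarded to the shared
    // "wait for the result with a deadline" helper, so callers can invoke that
    // helper directly on event.future() instead.
    T get(long timeoutMs) {
        return getResult(future, timeoutMs);
    }

    // Simplified stand-in for ConsumerUtils.getResult(Future, Timer); the real
    // helper takes a Timer and translates exceptions into Kafka-specific types.
    static <R> R getResult(CompletableFuture<R> future, long timeoutMs) {
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException | TimeoutException e) {
            throw new RuntimeException(e);
        }
    }
}
```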



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -988,12 +988,11 @@ public List<PartitionInfo> partitionsFor(String topic, Duration timeout) {
                 throw new TimeoutException();
             }
 
-            final TopicMetadataApplicationEvent topicMetadataApplicationEvent =
-                    new TopicMetadataApplicationEvent(topic, timeout.toMillis());
-            wakeupTrigger.setActiveTask(topicMetadataApplicationEvent.future());
+            final TopicMetadataEvent topicMetadataEvent = new TopicMetadataEvent(Optional.of(topic), timeout.toMillis());
+            wakeupTrigger.setActiveTask(topicMetadataEvent.future());

Review Comment:
   A minor internal change: the `TopicMetadataEvent` constructor now takes an `Optional` to distinguish "all topics" (`Optional.empty()`) from a specific topic (`Optional.of(topic)`).
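   To make that concrete, the shape implied by the diff is roughly the sketch below (simplified, hypothetical names; the real event also carries the usual completable-event machinery).

```java
import java.util.Optional;

// Sketch only; not the actual TopicMetadataEvent class.
class TopicMetadataEventSketch {

    // Optional.empty() means "all topics"; Optional.of(topic) means one specific topic.
    private final Optional<String> topic;
    private final long timeoutMs;

    TopicMetadataEventSketch(final Optional<String> topic, final long timeoutMs) {
        this.topic = topic;
        this.timeoutMs = timeoutMs;
    }

    Optional<String> topic() {
        return topic;
    }

    long timeoutMs() {
        return timeoutMs;
    }
}
```

   The two call sites then read as `new TopicMetadataEvent(Optional.of(topic), timeout.toMillis())` in `partitionsFor()` and `new TopicMetadataEvent(Optional.empty(), timeout.toMillis())` in `listTopics()`.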



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitEvent.java:
##########
@@ -22,14 +22,14 @@
 import java.util.Collections;
 import java.util.Map;
 
-public abstract class CommitApplicationEvent extends CompletableApplicationEvent<Void> {
+public abstract class CommitEvent extends CompletableApplicationEvent<Void> {
 
     /**
      * Offsets to commit per partition.
      */
     private final Map<TopicPartition, OffsetAndMetadata> offsets;
 
-    public CommitApplicationEvent(final Map<TopicPartition, OffsetAndMetadata> offsets, Type type) {
+    protected CommitEvent(final Type type, final Map<TopicPartition, OffsetAndMetadata> offsets) {

Review Comment:
   I reordered the parameters and made it `protected` since it's an abstract class. Just being a little anal again.
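   For context, the general pattern looks like the sketch below (illustrative names only): an abstract class can't be instantiated directly, so a `protected` constructor makes it explicit that only subclasses should call it.

```java
import java.util.Collections;
import java.util.Map;

// Illustrative only; not the real CommitEvent hierarchy.
abstract class AbstractCommitSketch {

    private final Map<String, Long> offsets;

    // protected: only subclasses (e.g. sync/async variants) may invoke this.
    protected AbstractCommitSketch(final Map<String, Long> offsets) {
        this.offsets = Collections.unmodifiableMap(offsets);
    }

    Map<String, Long> offsets() {
        return offsets;
    }
}

// A concrete subclass hands its offsets up via super(...).
class SyncCommitSketch extends AbstractCommitSketch {
    SyncCommitSketch(final Map<String, Long> offsets) {
        super(offsets);
    }
}
```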



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -142,40 +143,40 @@ private void process(final PollApplicationEvent event) {
         requestManagers.heartbeatRequestManager.ifPresent(hrm -> hrm.resetPollTimer(event.pollTimeMs()));
     }
 
-    private void process(final AsyncCommitApplicationEvent event) {
+    private void process(final AsyncCommitEvent event) {
         CommitRequestManager manager = requestManagers.commitRequestManager.get();
-        CompletableFuture<Void> commitResult = manager.commitAsync(event.offsets());
-        event.chain(commitResult);
+        CompletableFuture<Void> future = manager.commitAsync(event.offsets());
+        chain(future, event.future());
     }
 
-    private void process(final SyncCommitApplicationEvent event) {
+    private void process(final SyncCommitEvent event) {
         CommitRequestManager manager = requestManagers.commitRequestManager.get();
         long expirationTimeoutMs = getExpirationTimeForTimeout(event.retryTimeoutMs());
-        CompletableFuture<Void> commitResult = manager.commitSync(event.offsets(), expirationTimeoutMs);
-        event.chain(commitResult);
+        CompletableFuture<Void> future = manager.commitSync(event.offsets(), expirationTimeoutMs);
+        chain(future, event.future());
     }
 
-    private void process(final FetchCommittedOffsetsApplicationEvent event) {
+    private void process(final FetchCommittedOffsetsEvent event) {
         if (!requestManagers.commitRequestManager.isPresent()) {
             event.future().completeExceptionally(new KafkaException("Unable to fetch committed " +
                     "offset because the CommittedRequestManager is not available. Check if group.id was set correctly"));
             return;
         }
         CommitRequestManager manager = requestManagers.commitRequestManager.get();
         long expirationTimeMs = getExpirationTimeForTimeout(event.timeout());
-        event.chain(manager.fetchOffsets(event.partitions(), expirationTimeMs));
+        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = manager.fetchOffsets(event.partitions(), expirationTimeMs);
+        chain(future, event.future());
     }
 
     private void process(final NewTopicsMetadataUpdateRequestEvent ignored) {
         metadata.requestUpdateForNewTopics();
     }
 
-

Review Comment:
   Sorry for the whitespace change, but this one has been bugging me for months.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -293,6 +294,16 @@ long getExpirationTimeForTimeout(final long timeoutMs) {
         return expiration;
     }
 
+    private <T> void chain(final CompletableFuture<T> a, final CompletableFuture<T> b) {
+        a.whenComplete((value, exception) -> {
+            if (exception != null) {
+                b.completeExceptionally(exception);
+            } else {
+                b.complete(value);
+            }
+        });
+    }
+

Review Comment:
   `chain()` has been in `CompletableApplicationEvent` since its inception, but since that method is only used in this one class, I thought I'd migrate it here.
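   As a standalone illustration of the pattern (not the production code), chaining just propagates one future's outcome into another:

```java
import java.util.concurrent.CompletableFuture;

// Minimal, runnable demo of the chaining pattern; names are illustrative.
public class ChainDemo {

    // Same idea as the private chain() helper above: when "a" finishes,
    // complete "b" with the same value or the same exception.
    static <T> void chain(final CompletableFuture<T> a, final CompletableFuture<T> b) {
        a.whenComplete((value, exception) -> {
            if (exception != null) {
                b.completeExceptionally(exception);
            } else {
                b.complete(value);
            }
        });
    }

    public static void main(String[] args) {
        // "managerFuture" stands in for the request manager's future,
        // "eventFuture" for the application event's future.
        CompletableFuture<String> managerFuture = new CompletableFuture<>();
        CompletableFuture<String> eventFuture = new CompletableFuture<>();

        chain(managerFuture, eventFuture);

        managerFuture.complete("done");
        System.out.println(eventFuture.join()); // prints "done"
    }
}
```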



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1017,11 +1016,10 @@ public Map<String, List<PartitionInfo>> listTopics(Duration timeout) {
                 throw new TimeoutException();
             }
 
-            final TopicMetadataApplicationEvent topicMetadataApplicationEvent =
-                    new TopicMetadataApplicationEvent(timeout.toMillis());
-            wakeupTrigger.setActiveTask(topicMetadataApplicationEvent.future());
+            final TopicMetadataEvent topicMetadataEvent = new TopicMetadataEvent(Optional.empty(), timeout.toMillis());
+            wakeupTrigger.setActiveTask(topicMetadataEvent.future());

Review Comment:
   Same thing here with the use of `Optional`.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -213,38 +214,38 @@ private void process(final SubscriptionChangeApplicationEvent ignored) {
      *              execution for releasing the assignment completes, and the request to leave
      *              the group is sent out.
      */
-    private void process(final UnsubscribeApplicationEvent event) {
+    private void process(final UnsubscribeEvent event) {
         if (!requestManagers.heartbeatRequestManager.isPresent()) {
             KafkaException error = new KafkaException("Group membership manager not present when processing an unsubscribe event");
             event.future().completeExceptionally(error);
             return;
         }
         MembershipManager membershipManager = requestManagers.heartbeatRequestManager.get().membershipManager();
-        CompletableFuture<Void> result = membershipManager.leaveGroup();
-        event.chain(result);
+        CompletableFuture<Void> future = membershipManager.leaveGroup();
+        chain(future, event.future());
     }
 
-    private void process(final ResetPositionsApplicationEvent event) {
-        CompletableFuture<Void> result = requestManagers.offsetsRequestManager.resetPositionsIfNeeded();
-        event.chain(result);
+    private void process(final ResetPositionsEvent event) {
+        CompletableFuture<Void> future = requestManagers.offsetsRequestManager.resetPositionsIfNeeded();
+        chain(future, event.future());
     }
 
-    private void process(final ValidatePositionsApplicationEvent event) {
-        CompletableFuture<Void> result = requestManagers.offsetsRequestManager.validatePositionsIfNeeded();
-        event.chain(result);
+    private void process(final ValidatePositionsEvent event) {
+        CompletableFuture<Void> future = requestManagers.offsetsRequestManager.validatePositionsIfNeeded();
+        chain(future, event.future());
     }
 
-    private void process(final TopicMetadataApplicationEvent event) {
+    private void process(final TopicMetadataEvent event) {
         final CompletableFuture<Map<String, List<PartitionInfo>>> future;
 
-        long expirationTimeMs = getExpirationTimeForTimeout(event.getTimeoutMs());
-        if (event.isAllTopics()) {
+        long expirationTimeMs = getExpirationTimeForTimeout(event.timeoutMs());
+        if (!event.topic().isPresent()) {
             future = requestManagers.topicMetadataRequestManager.requestAllTopicsMetadata(expirationTimeMs);
         } else {
-            future = requestManagers.topicMetadataRequestManager.requestTopicMetadata(event.topic(), expirationTimeMs);
+            future = requestManagers.topicMetadataRequestManager.requestTopicMetadata(event.topic().get(), expirationTimeMs);
         }
 
-        event.chain(future);
+        chain(future, event.future());

Review Comment:
   This is related to the use of `Optional` in the `TopicMetadataEvent` class.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/GroupMetadataUpdateEvent.java:
##########
@@ -29,11 +27,10 @@
  */
 public class GroupMetadataUpdateEvent extends BackgroundEvent {
 
-    final private int memberEpoch;
-    final private String memberId;
+    private final int memberEpoch;
+    private final String memberId;

Review Comment:
   Being anal again.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventHandler.java:
##########
@@ -26,7 +26,7 @@
 /**
 * An event handler that receives {@link BackgroundEvent background events} from the
 * {@link ConsumerNetworkThread network thread} which are then made available to the application thread
- * via the {@link BackgroundEventProcessor}.
+ * via an {@link EventProcessor}.

Review Comment:
   Fixing an old comment. `BackgroundEventProcessor` doesn't exist anymore.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -142,40 +143,40 @@ private void process(final PollApplicationEvent event) {
         requestManagers.heartbeatRequestManager.ifPresent(hrm -> hrm.resetPollTimer(event.pollTimeMs()));
     }
 
-    private void process(final AsyncCommitApplicationEvent event) {
+    private void process(final AsyncCommitEvent event) {
         CommitRequestManager manager = requestManagers.commitRequestManager.get();
-        CompletableFuture<Void> commitResult = manager.commitAsync(event.offsets());
-        event.chain(commitResult);
+        CompletableFuture<Void> future = manager.commitAsync(event.offsets());
+        chain(future, event.future());

Review Comment:
   I wanted the different method implementations to read consistently, so I renamed all the `CompletableFuture` variables to `future`. Being anal again.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
