kirktrue commented on code in PR #15438:
URL: https://github.com/apache/kafka/pull/15438#discussion_r1505131769

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java:
##########
@@ -16,10 +16,14 @@
  */
 package org.apache.kafka.clients.consumer.internals.events;
+import org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer;
+import org.apache.kafka.common.Uuid;
+
 import java.util.Objects;
 /**
- * This is the abstract definition of the events created by the KafkaConsumer API
+ * This is the abstract definition of the events created by the {@link AsyncKafkaConsumer} on the user's
+ * application thread.

Review Comment:
   Minor comments tweak.

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java:
##########
@@ -99,18 +100,17 @@ public long maximumTimeToWait() {
      *
      * <p/>
      *
-     * See {@link CompletableApplicationEvent#get(Timer)} and {@link Future#get(long, TimeUnit)} for more details.
+     * See {@link ConsumerUtils#getResult(Future, Timer)} and {@link Future#get(long, TimeUnit)} for more details.
      *
      * @param event A {@link CompletableApplicationEvent} created by the polling thread
-     * @param timer Timer for which to wait for the event to complete
      * @return Value that is the result of the event
      * @param <T> Type of return value of the event
      */
     public <T> T addAndGet(final CompletableApplicationEvent<T> event, final Timer timer) {
         Objects.requireNonNull(event, "CompletableApplicationEvent provided to addAndGet must be non-null");
         Objects.requireNonNull(timer, "Timer provided to addAndGet must be non-null");
         add(event);
-        return event.get(timer);
+        return ConsumerUtils.getResult(event.future(), timer);
     }

Review Comment:
   The `CompletableApplicationEvent.get()` method was removed. Since it was just a shim over `ConsumerUtils.getResult()` anyway, I removed it to avoid confusion/misuse.
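
For readers unfamiliar with the pattern, here is a rough sketch of what a "wait on a future with a deadline" helper can look like. It is not Kafka's `ConsumerUtils.getResult()`: the class name `FutureResultSketch`, the plain `long timeoutMs` parameter (the real method takes a `Timer`), and the choice of `IllegalStateException` for timeouts and interrupts are all assumptions made for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for illustration only; not Kafka's ConsumerUtils.
final class FutureResultSketch {

    // Waits at most timeoutMs for the future and rethrows failures as unchecked exceptions.
    // Assumption: a plain millisecond timeout replaces Kafka's Timer type.
    static <T> T getResult(Future<T> future, long timeoutMs) {
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            // Surface the underlying failure rather than the ExecutionException wrapper.
            Throwable cause = e.getCause();
            if (cause instanceof RuntimeException) {
                throw (RuntimeException) cause;
            }
            throw new IllegalStateException(cause);
        } catch (TimeoutException e) {
            throw new IllegalStateException("Timed out after " + timeoutMs + " ms", e);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe it.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> done = CompletableFuture.completedFuture("ok");
        System.out.println(getResult(done, 100L)); // prints "ok"
    }
}
```

With a helper of this shape, a caller such as `addAndGet()` only needs the event's future, which is why a per-event `get()` shim adds little.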

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -988,12 +988,11 @@ public List<PartitionInfo> partitionsFor(String topic, Duration timeout) {
                 throw new TimeoutException();
             }
-            final TopicMetadataApplicationEvent topicMetadataApplicationEvent =
-                    new TopicMetadataApplicationEvent(topic, timeout.toMillis());
-            wakeupTrigger.setActiveTask(topicMetadataApplicationEvent.future());
+            final TopicMetadataEvent topicMetadataEvent = new TopicMetadataEvent(Optional.of(topic), timeout.toMillis());
+            wakeupTrigger.setActiveTask(topicMetadataEvent.future());

Review Comment:
   A minor internal change was to use an `Optional` for the `TopicMetadataEvent` constructor to indicate "all topics" (`Optional.empty()`) vs a specific topic (`Optional.of(topic)`).

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitEvent.java:
##########
@@ -22,14 +22,14 @@
 import java.util.Collections;
 import java.util.Map;
-public abstract class CommitApplicationEvent extends CompletableApplicationEvent<Void> {
+public abstract class CommitEvent extends CompletableApplicationEvent<Void> {
     /**
      * Offsets to commit per partition.
      */
     private final Map<TopicPartition, OffsetAndMetadata> offsets;
-    public CommitApplicationEvent(final Map<TopicPartition, OffsetAndMetadata> offsets, Type type) {
+    protected CommitEvent(final Type type, final Map<TopicPartition, OffsetAndMetadata> offsets) {

Review Comment:
   I reordered the parameters and made it `protected` since it's an abstract class. Just being a little anal again.

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -142,40 +143,40 @@ private void process(final PollApplicationEvent event) {
         requestManagers.heartbeatRequestManager.ifPresent(hrm -> hrm.resetPollTimer(event.pollTimeMs()));
     }
-    private void process(final AsyncCommitApplicationEvent event) {
+    private void process(final AsyncCommitEvent event) {
         CommitRequestManager manager = requestManagers.commitRequestManager.get();
-        CompletableFuture<Void> commitResult = manager.commitAsync(event.offsets());
-        event.chain(commitResult);
+        CompletableFuture<Void> future = manager.commitAsync(event.offsets());
+        chain(future, event.future());
     }
-    private void process(final SyncCommitApplicationEvent event) {
+    private void process(final SyncCommitEvent event) {
         CommitRequestManager manager = requestManagers.commitRequestManager.get();
         long expirationTimeoutMs = getExpirationTimeForTimeout(event.retryTimeoutMs());
-        CompletableFuture<Void> commitResult = manager.commitSync(event.offsets(), expirationTimeoutMs);
-        event.chain(commitResult);
+        CompletableFuture<Void> future = manager.commitSync(event.offsets(), expirationTimeoutMs);
+        chain(future, event.future());
     }
-    private void process(final FetchCommittedOffsetsApplicationEvent event) {
+    private void process(final FetchCommittedOffsetsEvent event) {
         if (!requestManagers.commitRequestManager.isPresent()) {
             event.future().completeExceptionally(new KafkaException("Unable to fetch committed " +
                     "offset because the CommittedRequestManager is not available. Check if group.id was set correctly"));
             return;
         }
         CommitRequestManager manager = requestManagers.commitRequestManager.get();
         long expirationTimeMs = getExpirationTimeForTimeout(event.timeout());
-        event.chain(manager.fetchOffsets(event.partitions(), expirationTimeMs));
+        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = manager.fetchOffsets(event.partitions(), expirationTimeMs);
+        chain(future, event.future());
     }
     private void process(final NewTopicsMetadataUpdateRequestEvent ignored) {
         metadata.requestUpdateForNewTopics();
     }
-

Review Comment:
   Sorry for the whitespace change, but this one has been bugging me for months.

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -293,6 +294,16 @@ long getExpirationTimeForTimeout(final long timeoutMs) {
         return expiration;
     }
+    private <T> void chain(final CompletableFuture<T> a, final CompletableFuture<T> b) {
+        a.whenComplete((value, exception) -> {
+            if (exception != null) {
+                b.completeExceptionally(exception);
+            } else {
+                b.complete(value);
+            }
+        });
+    }
+

Review Comment:
   `chain()` has been in `CompletableApplicationEvent` since its inception, but since that method is only used in this one class, I thought I'd migrate it here.

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1017,11 +1016,10 @@ public Map<String, List<PartitionInfo>> listTopics(Duration timeout) {
                 throw new TimeoutException();
             }
-            final TopicMetadataApplicationEvent topicMetadataApplicationEvent =
-                    new TopicMetadataApplicationEvent(timeout.toMillis());
-            wakeupTrigger.setActiveTask(topicMetadataApplicationEvent.future());
+            final TopicMetadataEvent topicMetadataEvent = new TopicMetadataEvent(Optional.empty(), timeout.toMillis());
+            wakeupTrigger.setActiveTask(topicMetadataEvent.future());

Review Comment:
   Same thing here with the use of `Optional`.
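
To make the two changes discussed above easier to picture together, here is a small standalone sketch of the `chain()` helper combined with an `Optional` topic argument. The `chain()` body mirrors the hunk quoted earlier; everything else (the `ChainSketch` class, the `lookup()` method, and the string results) is a hypothetical stand-in and not part of the PR or the Kafka code base.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Hypothetical illustration; only chain() follows the shape shown in the PR hunk.
final class ChainSketch {

    // Propagates the outcome of a (value or exception) to b once a completes.
    static <T> void chain(final CompletableFuture<T> a, final CompletableFuture<T> b) {
        a.whenComplete((value, exception) -> {
            if (exception != null) {
                b.completeExceptionally(exception);
            } else {
                b.complete(value);
            }
        });
    }

    // Assumed stand-in for a metadata request: an empty Optional means "all topics".
    static CompletableFuture<String> lookup(final Optional<String> topic) {
        String result = topic.map(t -> "metadata for " + t).orElse("metadata for all topics");
        return CompletableFuture.completedFuture(result);
    }

    public static void main(String[] args) {
        // The "event future" is what the application thread would be waiting on.
        CompletableFuture<String> singleTopic = new CompletableFuture<>();
        chain(lookup(Optional.of("orders")), singleTopic);
        System.out.println(singleTopic.join()); // prints "metadata for orders"

        CompletableFuture<String> allTopics = new CompletableFuture<>();
        chain(lookup(Optional.empty()), allTopics);
        System.out.println(allTopics.join()); // prints "metadata for all topics"
    }
}
```

Keeping `chain()` as a free-standing helper means the event only has to expose its future, and a single `Optional` parameter lets one constructor cover both the single-topic and the all-topics case.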

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -213,38 +214,38 @@ private void process(final SubscriptionChangeApplicationEvent ignored) {
      * execution for releasing the assignment completes, and the request to leave
      * the group is sent out.
      */
-    private void process(final UnsubscribeApplicationEvent event) {
+    private void process(final UnsubscribeEvent event) {
         if (!requestManagers.heartbeatRequestManager.isPresent()) {
             KafkaException error = new KafkaException("Group membership manager not present when processing an unsubscribe event");
             event.future().completeExceptionally(error);
             return;
         }
         MembershipManager membershipManager = requestManagers.heartbeatRequestManager.get().membershipManager();
-        CompletableFuture<Void> result = membershipManager.leaveGroup();
-        event.chain(result);
+        CompletableFuture<Void> future = membershipManager.leaveGroup();
+        chain(future, event.future());
     }
-    private void process(final ResetPositionsApplicationEvent event) {
-        CompletableFuture<Void> result = requestManagers.offsetsRequestManager.resetPositionsIfNeeded();
-        event.chain(result);
+    private void process(final ResetPositionsEvent event) {
+        CompletableFuture<Void> future = requestManagers.offsetsRequestManager.resetPositionsIfNeeded();
+        chain(future, event.future());
     }
-    private void process(final ValidatePositionsApplicationEvent event) {
-        CompletableFuture<Void> result = requestManagers.offsetsRequestManager.validatePositionsIfNeeded();
-        event.chain(result);
+    private void process(final ValidatePositionsEvent event) {
+        CompletableFuture<Void> future = requestManagers.offsetsRequestManager.validatePositionsIfNeeded();
+        chain(future, event.future());
     }
-    private void process(final TopicMetadataApplicationEvent event) {
+    private void process(final TopicMetadataEvent event) {
         final CompletableFuture<Map<String, List<PartitionInfo>>> future;
-        long expirationTimeMs = getExpirationTimeForTimeout(event.getTimeoutMs());
-        if (event.isAllTopics()) {
+        long expirationTimeMs = getExpirationTimeForTimeout(event.timeoutMs());
+        if (!event.topic().isPresent()) {
             future = requestManagers.topicMetadataRequestManager.requestAllTopicsMetadata(expirationTimeMs);
         } else {
-            future = requestManagers.topicMetadataRequestManager.requestTopicMetadata(event.topic(), expirationTimeMs);
+            future = requestManagers.topicMetadataRequestManager.requestTopicMetadata(event.topic().get(), expirationTimeMs);
         }
-        event.chain(future);
+        chain(future, event.future());

Review Comment:
   This is related to the use of `Optional` in the `TopicMetadataEvent` class.

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/GroupMetadataUpdateEvent.java:
##########
@@ -29,11 +27,10 @@
  */
 public class GroupMetadataUpdateEvent extends BackgroundEvent {
-    final private int memberEpoch;
-    final private String memberId;
+    private final int memberEpoch;
+    private final String memberId;

Review Comment:
   Being anal again.

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventHandler.java:
##########
@@ -26,7 +26,7 @@
 /**
  * An event handler that receives {@link BackgroundEvent background events} from the
  * {@link ConsumerNetworkThread network thread} which are then made available to the application thread
- * via the {@link BackgroundEventProcessor}.
+ * via an {@link EventProcessor}.

Review Comment:
   Fixing an old comment. `BackgroundEventProcessor` doesn't exist anymore.

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -142,40 +143,40 @@ private void process(final PollApplicationEvent event) {
         requestManagers.heartbeatRequestManager.ifPresent(hrm -> hrm.resetPollTimer(event.pollTimeMs()));
     }
-    private void process(final AsyncCommitApplicationEvent event) {
+    private void process(final AsyncCommitEvent event) {
         CommitRequestManager manager = requestManagers.commitRequestManager.get();
-        CompletableFuture<Void> commitResult = manager.commitAsync(event.offsets());
-        event.chain(commitResult);
+        CompletableFuture<Void> future = manager.commitAsync(event.offsets());
+        chain(future, event.future());

Review Comment:
   I wanted to make the different method implementations more consistent, so I renamed all the `CompletableFuture` variables to `future` to be consistent/anal.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org