dajac commented on code in PR #14640: URL: https://github.com/apache/kafka/pull/14640#discussion_r1414403961
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -979,12 +1026,124 @@ private CompletableFuture<Void> invokeOnPartitionsLostCallback(Set<TopicPartitio
         // behaviour.
         Optional<ConsumerRebalanceListener> listener = subscriptions.rebalanceListener();
         if (!partitionsLost.isEmpty() && listener.isPresent()) {
-            throw new UnsupportedOperationException("User-defined callbacks not supported yet");
+            return enqueueConsumerRebalanceListenerCallback(onPartitionsLost, partitionsLost);
         } else {
             return CompletableFuture.completedFuture(null);
         }
     }
+    /**
+     * Enqueue a {@link ConsumerRebalanceListenerCallbackNeededEvent} to trigger the execution of the
+     * appropriate {@link ConsumerRebalanceListener} {@link ConsumerRebalanceListenerMethodName method} on the
+     * application thread.
+     *
+     * <p/>
+     *
+     * This method is essentially "giving" the baton from the background thread to the application thread for
+     * processing of the reconciliation logic. It will "receive" the "baton" back via the
+     * {@link #consumerRebalanceListenerCallbackCompleted(ConsumerRebalanceListenerMethodName, Optional)} method.
+     *
+     * <p/>
+     *
+     * Because the reconciliation process (run in the background thread) will be blocked by the application thread
+     * until it completes this, we need to leave a {@link ConsumerRebalanceListenerCallbackBreadcrumb breadcrumb}
+     * by which to remember where we left off.
+     *
+     * @param methodName Callback method that needs to be executed on the application thread
+     * @param partitions Partitions to supply to the callback method
+     * @return Future that will be chained within the rest of the reconciliation logic
+     */
+    private CompletableFuture<Void> enqueueConsumerRebalanceListenerCallback(ConsumerRebalanceListenerMethodName methodName,
+                                                                             Set<TopicPartition> partitions) {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        ConsumerRebalanceListenerCallbackBreadcrumb newBreadcrumb = new ConsumerRebalanceListenerCallbackBreadcrumb(
+                methodName,
+                future
+        );
+
+        if (breadcrumbRef.compareAndSet(null, newBreadcrumb)) {

Review Comment:
   I have doubts about this. My understanding is that you are trying to prevent the state machine from scheduling multiple callbacks. Is my understanding right?

   For context, I think that the current implementation of the state machine can actually schedule multiple callbacks. It does not do so in the reconciliation process, but it could happen when the member is fenced or when the user leaves or unsubscribes. So, I think that this won't work. @lianetm It would be great if you could look into this as well.

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ConsumerRebalanceListenerCallbackNeededEvent.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerMethodName;
+import org.apache.kafka.common.TopicPartition;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Objects;
+import java.util.SortedSet;
+
+/**
+ * Event that signifies that the network I/O thread wants to invoke one of the callback methods on the
+ * {@link ConsumerRebalanceListener}. This event will be processed by the application thread when the next
+ * {@link Consumer#poll(Duration)} call is performed by the user. When processed, the application thread should
+ * invoke the appropriate callback method (based on {@link #methodName()}) with the given partitions.
+ */
+public class ConsumerRebalanceListenerCallbackNeededEvent extends BackgroundEvent {
+
+    private final ConsumerRebalanceListenerMethodName methodName;
+    private final SortedSet<TopicPartition> partitions;
+
+    public ConsumerRebalanceListenerCallbackNeededEvent(ConsumerRebalanceListenerMethodName methodName,

Review Comment:
   There is actually a desire to have the ability to execute multiple callbacks (e.g. onAssigned, onRevoked) with a single event. Would it be possible to handle this here? @lianetm could provide more details.
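
   To make the request concrete, here is a minimal sketch of one way a single event could carry several callbacks in order. Everything in it is an assumption for illustration: `MethodName`, `CallbackInvocation`, and `MultiCallbackEvent` are hypothetical names that do not exist in the PR, and plain strings stand in for `TopicPartition`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical stand-in for ConsumerRebalanceListenerMethodName.
enum MethodName { ON_PARTITIONS_REVOKED, ON_PARTITIONS_ASSIGNED, ON_PARTITIONS_LOST }

// Hypothetical: one callback to run, plus the partitions to hand to it.
final class CallbackInvocation {
    final MethodName methodName;
    final SortedSet<String> partitions; // strings stand in for TopicPartition

    CallbackInvocation(MethodName methodName, SortedSet<String> partitions) {
        this.methodName = methodName;
        // Defensive copy so later changes by the caller cannot affect the event.
        this.partitions = Collections.unmodifiableSortedSet(new TreeSet<>(partitions));
    }
}

// Hypothetical event carrying an ordered list of callbacks to execute.
final class MultiCallbackEvent {
    private final List<CallbackInvocation> invocations;

    MultiCallbackEvent(List<CallbackInvocation> invocations) {
        this.invocations = Collections.unmodifiableList(new ArrayList<>(invocations));
    }

    // The application thread would walk this list in order, e.g. revoke before assign.
    List<CallbackInvocation> invocations() {
        return invocations;
    }
}
```

   With a shape like this, the application thread could run the revoke callback and then the assign callback while handling a single event, rather than needing one round trip per callback. Whether that fits the state machine is for the PR author and @lianetm to judge.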
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerRebalanceListenerInvoker.java:
##########
@@ -66,15 +66,15 @@ Exception invokePartitionsAssigned(final SortedSet<TopicPartition> assignedParti
             throw e;
         } catch (Exception e) {
             log.error("User provided listener {} failed on invocation of onPartitionsAssigned for partitions {}",
-                    listener.getClass().getName(), assignedPartitions, e);
+                    listener.get().getClass().getName(), assignedPartitions, e);

Review Comment:
   Should we get this change in a separate PR as this touches the legacy consumer as well?

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##########
@@ -183,6 +183,10 @@ public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
         return new NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs, Collections.singletonList(request));
     }
+    public MembershipManager membershipManager() {
+        return membershipManager;
+    }

Review Comment:
   Why do we need this?

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerRebalanceListenerMethodName.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+
+/**
+ * This class just provides a static name for the methods in the {@link ConsumerRebalanceListener} interface
+ * for a bit more compile time assurance.
+ */
+public enum ConsumerRebalanceListenerMethodName {
+
+    onPartitionsRevoked, onPartitionsAssigned, onPartitionsLost;

Review Comment:
   Enum constants should be all caps, like other constants.

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -650,7 +693,7 @@ boolean reconcile() {
                 "\tCurrent owned partitions: {}\n" +
                 "\tAdded partitions (assigned - owned): {}\n" +
                 "\tRevoked partitions (owned - assigned): {}\n",
-                assignedTopicIdPartitions,
+                assignedTopicPartitions,

Review Comment:
   Hmm... I would keep it as it was, as the topic IDs may be useful. @lianetm What do you think?

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1376,6 +1425,66 @@ private void subscribeInternal(Collection<String> topics, Optional<ConsumerRebal
         }
     }
+    /**
+     * This method can be used by cases where the caller has an event that needs to both block for completion but
+     * also process background events. For some events, in order to fully process the associated logic, the
+     * {@link ConsumerNetworkThread background thread} needs assistance from the application thread to complete.
+     * If the application thread simply blocked on the event after submitting it, the processing would deadlock.
+     * The logic herein is basically a loop that performs two tasks in each iteration:
+     *
+     * <ol>
+     *     <li>Process background events, if any</li>
+     *     <li><em>Briefly</em> wait for {@link CompletableApplicationEvent an event} to complete</li>
+     * </ol>
+     *
+     * <p/>
+     *
+     * Each iteration gives the application thread an opportunity to process background events, which may be
+     * necessary to complete the overall processing.
+     *
+     * <p/>
+     *
+     * As an example, take {@link #unsubscribe()}. To start unsubscribing, the application thread enqueues an
+     * {@link UnsubscribeApplicationEvent} on the application event queue. That event will eventually trigger the
+     * rebalancing logic in the background thread. Critically, as part of this rebalancing work, the
+     * {@link ConsumerRebalanceListener#onPartitionsRevoked(Collection)} callback needs to be invoked. However,
+     * this callback must be executed on the application thread. To achieve this, the background thread enqueues a
+     * {@link ConsumerRebalanceListenerCallbackNeededEvent} on its background event queue. That event queue is
+     * periodically queried by the application thread to see if there's work to be done. When the application thread
+     * sees {@link ConsumerRebalanceListenerCallbackNeededEvent}, it is processed, and then a
+     * {@link ConsumerRebalanceListenerCallbackCompletedEvent} is then enqueued by the application thread on the
+     * background event queue. Moments later, the background thread will see that event, process it, and continue
+     * execution of the rebalancing logic. The rebalancing logic cannot complete until the
+     * {@link ConsumerRebalanceListener} callback is performed.
+     *
+     * @param event Event that contains a {@link CompletableFuture}; it is on this future that the application thread
+     *              will wait for completion
+     * @param timer Overall timer that bounds how long the application thread will wait for the event to complete
+     * @return {@code true} if the event completed within the timeout, {@code false} otherwise
+     */
+    private boolean processBackgroundEvents(CompletableApplicationEvent<?> event, Timer timer) {
+        log.trace("Enqueuing event {} for processing; will wait up to {} ms to complete", event, timer.remainingMs());
+
+        do {
+            backgroundEventProcessor.process();

Review Comment:
   Does this call process all the pending events or only one event?
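
   For reference, the loop described in the Javadoc can be sketched in isolation as below. This is a sketch under stated assumptions, not the PR's code: `BackgroundEventLoopSketch`, `processAll`, and the 100 ms wait slice are hypothetical, `Runnable`s stand in for background events, and a plain `CompletableFuture` stands in for the `CompletableApplicationEvent`. In particular, it assumes each iteration drains every pending background event, which is exactly the behaviour the question asks the author to confirm.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class BackgroundEventLoopSketch {
    // Hypothetical stand-in for the background event queue.
    final BlockingQueue<Runnable> backgroundEvents = new LinkedBlockingQueue<>();

    // Assumption: every call drains *all* pending events, not just one.
    int processAll() {
        int processed = 0;
        Runnable event;
        while ((event = backgroundEvents.poll()) != null) {
            event.run();
            processed++;
        }
        return processed;
    }

    // Alternate between processing background events and briefly waiting on the future.
    boolean processBackgroundEvents(CompletableFuture<?> future, long timeoutMs) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        do {
            processAll();
            long remainingMs = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
            long waitMs = Math.max(0, Math.min(100, remainingMs)); // short slice per iteration
            try {
                future.get(waitMs, TimeUnit.MILLISECONDS);
                return true;
            } catch (TimeoutException e) {
                // Not complete yet; loop again so new background events get processed.
            } catch (ExecutionException e) {
                return true; // the future completed, albeit exceptionally
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
                return false;
            }
        } while (deadline - System.nanoTime() > 0);
        return false;
    }
}
```

   If `process()` handled only one event per call, a burst of queued callbacks would need one loop iteration (and one wait slice) each, so the answer affects how quickly `unsubscribe()` can finish within its timer.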