dajac commented on code in PR #14640:
URL: https://github.com/apache/kafka/pull/14640#discussion_r1414403961


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -979,12 +1026,124 @@ private CompletableFuture<Void> invokeOnPartitionsLostCallback(Set<TopicPartitio
         // behaviour.
         Optional<ConsumerRebalanceListener> listener = subscriptions.rebalanceListener();
         if (!partitionsLost.isEmpty() && listener.isPresent()) {
-            throw new UnsupportedOperationException("User-defined callbacks not supported yet");
+            return enqueueConsumerRebalanceListenerCallback(onPartitionsLost, partitionsLost);
         } else {
             return CompletableFuture.completedFuture(null);
         }
     }
 
+    /**
+     * Enqueue a {@link ConsumerRebalanceListenerCallbackNeededEvent} to trigger the execution of the
+     * appropriate {@link ConsumerRebalanceListener} {@link ConsumerRebalanceListenerMethodName method} on the
+     * application thread.
+     *
+     * <p/>
+     *
+     * This method is essentially "giving" the baton from the background thread to the application thread for
+     * processing of the reconciliation logic. It will "receive" the "baton" back via the
+     * {@link #consumerRebalanceListenerCallbackCompleted(ConsumerRebalanceListenerMethodName, Optional)} method.
+     *
+     * <p/>
+     *
+     * Because the reconciliation process (run in the background thread) will be blocked by the application thread
+     * until it completes this, we need to leave a {@link ConsumerRebalanceListenerCallbackBreadcrumb breadcrumb}
+     * by which to remember where we left off.
+     *
+     * @param methodName Callback method that needs to be executed on the application thread
+     * @param partitions Partitions to supply to the callback method
+     * @return Future that will be chained within the rest of the reconciliation logic
+     */
+    private CompletableFuture<Void> enqueueConsumerRebalanceListenerCallback(ConsumerRebalanceListenerMethodName methodName,
+                                                                             Set<TopicPartition> partitions) {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        ConsumerRebalanceListenerCallbackBreadcrumb newBreadcrumb = new ConsumerRebalanceListenerCallbackBreadcrumb(
+            methodName,
+            future
+        );
+
+        if (breadcrumbRef.compareAndSet(null, newBreadcrumb)) {

Review Comment:
   I have doubts about this. My understanding is that you are trying to prevent the state machine from scheduling multiple callbacks. Is that right?
   
   For context, I think that the current implementation of the state machine can actually schedule multiple callbacks. It does not do so in the reconciliation process, but it could happen when the member is fenced or when the user leaves or unsubscribes. So, I think that this won't work. @lianetm It would be great if you could look into this as well.
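
   To illustrate the concern, here is a minimal sketch of a single-slot guard built on `AtomicReference.compareAndSet`. It is not the PR's code: `SingleSlotCallbackSketch`, `Breadcrumb`, `Method`, `tryEnqueue`, and `complete` are hypothetical names, and the scenario in `main` (a revocation callback still pending when the member is fenced) is an assumption about how two callbacks could overlap.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the breadcrumb handling in the PR; names are illustrative only.
final class SingleSlotCallbackSketch {

    enum Method { ON_PARTITIONS_REVOKED, ON_PARTITIONS_ASSIGNED, ON_PARTITIONS_LOST }

    static final class Breadcrumb {
        final Method method;
        final CompletableFuture<Void> future = new CompletableFuture<>();

        Breadcrumb(Method method) {
            this.method = method;
        }
    }

    // Holds at most one in-flight callback, mirroring compareAndSet(null, newBreadcrumb).
    private final AtomicReference<Breadcrumb> slot = new AtomicReference<>();

    /** Returns the new breadcrumb, or null if another callback is still in flight. */
    Breadcrumb tryEnqueue(Method method) {
        Breadcrumb crumb = new Breadcrumb(method);
        return slot.compareAndSet(null, crumb) ? crumb : null;
    }

    /** Clears the slot and completes the future once the matching callback has run. */
    void complete(Method method) {
        Breadcrumb crumb = slot.get();
        if (crumb != null && crumb.method == method && slot.compareAndSet(crumb, null)) {
            crumb.future.complete(null);
        }
    }

    public static void main(String[] args) {
        SingleSlotCallbackSketch sketch = new SingleSlotCallbackSketch();
        // Assumed scenario: a revocation callback is pending when the member gets fenced.
        Breadcrumb revoked = sketch.tryEnqueue(Method.ON_PARTITIONS_REVOKED);
        Breadcrumb lost = sketch.tryEnqueue(Method.ON_PARTITIONS_LOST);
        System.out.println("revoked accepted: " + (revoked != null));
        System.out.println("lost accepted: " + (lost != null));
    }
}
```

   With a single slot, the second request is rejected until the first completes, so any path of the state machine that legitimately needs a second callback has to be handled some other way.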



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ConsumerRebalanceListenerCallbackNeededEvent.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerMethodName;
+import org.apache.kafka.common.TopicPartition;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Objects;
+import java.util.SortedSet;
+
+/**
+ * Event that signifies that the network I/O thread wants to invoke one of the callback methods on the
+ * {@link ConsumerRebalanceListener}. This event will be processed by the application thread when the next
+ * {@link Consumer#poll(Duration)} call is performed by the user. When processed, the application thread should
+ * invoke the appropriate callback method (based on {@link #methodName()}) with the given partitions.
+ */
+public class ConsumerRebalanceListenerCallbackNeededEvent extends BackgroundEvent {
+
+    private final ConsumerRebalanceListenerMethodName methodName;
+    private final SortedSet<TopicPartition> partitions;
+
+    public ConsumerRebalanceListenerCallbackNeededEvent(ConsumerRebalanceListenerMethodName methodName,

Review Comment:
   There is actually a desire to be able to execute multiple callbacks (e.g. onAssigned, onRevoked) with a single event. Would it be possible to handle this here? @lianetm could provide more details.
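
   One possible shape for such an event, as a rough sketch only: the event holds an ordered list of invocations instead of a single method name and partition set. Everything here is hypothetical (`MultiCallbackEventSketch`, `Invocation`, `Method`, `then`), and partitions are plain strings so the example does not depend on Kafka classes.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical shape for an event that carries several listener invocations.
final class MultiCallbackEventSketch {

    enum Method { ON_PARTITIONS_REVOKED, ON_PARTITIONS_ASSIGNED, ON_PARTITIONS_LOST }

    static final class Invocation {
        final Method method;
        final SortedSet<String> partitions;

        Invocation(Method method, SortedSet<String> partitions) {
            this.method = method;
            // Defensive copy so later changes by the caller do not leak into the event.
            this.partitions = Collections.unmodifiableSortedSet(new TreeSet<>(partitions));
        }
    }

    private final List<Invocation> invocations = new ArrayList<>();

    /** Appends an invocation; the consumer of the event would run them in insertion order. */
    MultiCallbackEventSketch then(Method method, SortedSet<String> partitions) {
        invocations.add(new Invocation(method, partitions));
        return this;
    }

    /** Read-only view of the invocations in the order they were added. */
    List<Invocation> invocations() {
        return Collections.unmodifiableList(invocations);
    }

    public static void main(String[] args) {
        SortedSet<String> revoked = new TreeSet<>(Collections.singleton("topic-0"));
        SortedSet<String> assigned = new TreeSet<>(Collections.singleton("topic-1"));
        MultiCallbackEventSketch event = new MultiCallbackEventSketch()
            .then(Method.ON_PARTITIONS_REVOKED, revoked)
            .then(Method.ON_PARTITIONS_ASSIGNED, assigned);
        for (Invocation invocation : event.invocations()) {
            System.out.println(invocation.method + " " + invocation.partitions);
        }
    }
}
```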



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerRebalanceListenerInvoker.java:
##########
@@ -66,15 +66,15 @@ Exception invokePartitionsAssigned(final SortedSet<TopicPartition> assignedParti
                 throw e;
             } catch (Exception e) {
                 log.error("User provided listener {} failed on invocation of onPartitionsAssigned for partitions {}",
-                        listener.getClass().getName(), assignedPartitions, e);
+                        listener.get().getClass().getName(), assignedPartitions, e);

Review Comment:
   Should we make this change in a separate PR, since it touches the legacy consumer as well?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##########
@@ -183,6 +183,10 @@ public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
         return new NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs, Collections.singletonList(request));
     }
 
+    public MembershipManager membershipManager() {
+        return membershipManager;
+    }

Review Comment:
   Why do we need this?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerRebalanceListenerMethodName.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+
+/**
+ * This class just provides a static name for the methods in the {@link ConsumerRebalanceListener} interface
+ * for a bit more compile time assurance.
+ */
+public enum ConsumerRebalanceListenerMethodName {
+
+    onPartitionsRevoked, onPartitionsAssigned, onPartitionsLost;

Review Comment:
   Enum constants should be all caps, like other constants.
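
   As a sketch of what that could look like (an assumption, not the final code): the constants follow the usual upper-case convention, and the actual listener method name is kept as a field in case it is needed for logging. `RebalanceListenerMethodSketch` and `methodName()` are hypothetical names.

```java
// Hypothetical variant of the enum with upper-case constants.
enum RebalanceListenerMethodSketch {

    ON_PARTITIONS_REVOKED("onPartitionsRevoked"),
    ON_PARTITIONS_ASSIGNED("onPartitionsAssigned"),
    ON_PARTITIONS_LOST("onPartitionsLost");

    // Name of the corresponding ConsumerRebalanceListener method, e.g. for log messages.
    private final String methodName;

    RebalanceListenerMethodSketch(String methodName) {
        this.methodName = methodName;
    }

    String methodName() {
        return methodName;
    }

    @Override
    public String toString() {
        return methodName;
    }
}
```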



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -650,7 +693,7 @@ boolean reconcile() {
                         "\tCurrent owned partitions:                  {}\n" +
                         "\tAdded partitions (assigned - owned):       {}\n" +
                         "\tRevoked partitions (owned - assigned):     {}\n",
-                assignedTopicIdPartitions,
+                assignedTopicPartitions,

Review Comment:
   Hmm... I would keep it as it was, since the topic IDs may be useful. @lianetm What do you think?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1376,6 +1425,66 @@ private void subscribeInternal(Collection<String> topics, Optional<ConsumerRebal
         }
     }
 
+    /**
+     * This method can be used by cases where the caller has an event that needs to both block for completion but
+     * also process background events. For some events, in order to fully process the associated logic, the
+     * {@link ConsumerNetworkThread background thread} needs assistance from the application thread to complete.
+     * If the application thread simply blocked on the event after submitting it, the processing would deadlock.
+     * The logic herein is basically a loop that performs two tasks in each iteration:
+     *
+     * <ol>
+     *     <li>Process background events, if any</li>
+     *     <li><em>Briefly</em> wait for {@link CompletableApplicationEvent an event} to complete</li>
+     * </ol>
+     *
+     * <p/>
+     *
+     * Each iteration gives the application thread an opportunity to process background events, which may be
+     * necessary to complete the overall processing.
+     *
+     * <p/>
+     *
+     * As an example, take {@link #unsubscribe()}. To start unsubscribing, the application thread enqueues an
+     * {@link UnsubscribeApplicationEvent} on the application event queue. That event will eventually trigger the
+     * rebalancing logic in the background thread. Critically, as part of this rebalancing work, the
+     * {@link ConsumerRebalanceListener#onPartitionsRevoked(Collection)} callback needs to be invoked. However,
+     * this callback must be executed on the application thread. To achieve this, the background thread enqueues a
+     * {@link ConsumerRebalanceListenerCallbackNeededEvent} on its background event queue. That event queue is
+     * periodically queried by the application thread to see if there's work to be done. When the application thread
+     * sees {@link ConsumerRebalanceListenerCallbackNeededEvent}, it is processed, and then a
+     * {@link ConsumerRebalanceListenerCallbackCompletedEvent} is then enqueued by the application thread on the
+     * background event queue. Moments later, the background thread will see that event, process it, and continue
+     * execution of the rebalancing logic. The rebalancing logic cannot complete until the
+     * {@link ConsumerRebalanceListener} callback is performed.
+     *
+     * @param event Event that contains a {@link CompletableFuture}; it is on this future that the application thread
+     *              will wait for completion
+     * @param timer Overall timer that bounds how long the application thread will wait for the event to complete
+     * @return {@code true} if the event completed within the timeout, {@code false} otherwise
+     */
+    private boolean processBackgroundEvents(CompletableApplicationEvent<?> event, Timer timer) {
+        log.trace("Enqueuing event {} for processing; will wait up to {} ms to complete", event, timer.remainingMs());
+
+        do {
+            backgroundEventProcessor.process();

Review Comment:
   Does this call process all the pending events or only one event?
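
   To make the question concrete, the two possible behaviours can be sketched as below. The class and method names (`BackgroundEventDrainSketch`, `processOne`, `processAll`) are hypothetical, events are plain strings, and the sketch makes no claim about what `backgroundEventProcessor.process()` actually does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical processor contrasting "one event per call" with "drain everything per call".
final class BackgroundEventDrainSketch {

    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final List<String> handled = new ArrayList<>();

    void add(String event) {
        queue.add(event);
    }

    /** Handles at most one pending event; returns the number of events handled (0 or 1). */
    int processOne() {
        String event = queue.poll();
        if (event == null) {
            return 0;
        }
        handled.add(event);
        return 1;
    }

    /** Handles every event that is pending at the time of the call; returns how many. */
    int processAll() {
        List<String> drained = new ArrayList<>();
        queue.drainTo(drained);
        handled.addAll(drained);
        return drained.size();
    }

    /** Events handled so far, in the order they were processed. */
    List<String> handled() {
        return handled;
    }

    public static void main(String[] args) {
        BackgroundEventDrainSketch processor = new BackgroundEventDrainSketch();
        processor.add("revoked");
        processor.add("assigned");
        System.out.println("processOne handled " + processor.processOne());
        System.out.println("processAll handled " + processor.processAll());
    }
}
```

   The difference matters for the loop above: if only one event is handled per iteration, a callback queued behind other events is delayed by one brief wait per event ahead of it.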



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
