lianetm commented on code in PR #14640:
URL: https://github.com/apache/kafka/pull/14640#discussion_r1419257433


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -979,12 +1026,124 @@ private CompletableFuture<Void> invokeOnPartitionsLostCallback(Set<TopicPartitio
         // behaviour.
         Optional<ConsumerRebalanceListener> listener = subscriptions.rebalanceListener();
         if (!partitionsLost.isEmpty() && listener.isPresent()) {
-            throw new UnsupportedOperationException("User-defined callbacks not supported yet");
+            return enqueueConsumerRebalanceListenerCallback(onPartitionsLost, partitionsLost);
         } else {
             return CompletableFuture.completedFuture(null);
         }
     }
 
+    /**
+     * Enqueue a {@link ConsumerRebalanceListenerCallbackNeededEvent} to trigger the execution of the
+     * appropriate {@link ConsumerRebalanceListener} {@link ConsumerRebalanceListenerMethodName method} on the
+     * application thread.
+     *
+     * <p/>
+     *
+     * This method is essentially "giving" the baton from the background thread to the application thread for
+     * processing of the reconciliation logic. It will "receive" the "baton" back via the
+     * {@link #consumerRebalanceListenerCallbackCompleted(ConsumerRebalanceListenerMethodName, Optional)} method.
+     *
+     * <p/>
+     *
+     * Because the reconciliation process (run in the background thread) will be blocked by the application thread
+     * until it completes this, we need to leave a {@link ConsumerRebalanceListenerCallbackBreadcrumb breadcrumb}
+     * by which to remember where we left off.
+     *
+     * @param methodName Callback method that needs to be executed on the application thread
+     * @param partitions Partitions to supply to the callback method
+     * @return Future that will be chained within the rest of the reconciliation logic
+     */
+    private CompletableFuture<Void> enqueueConsumerRebalanceListenerCallback(ConsumerRebalanceListenerMethodName methodName,
+                                                                             Set<TopicPartition> partitions) {
+        if (breadcrumb != null) {
+            // In this case, there was already an existing breadcrumb, so we need to report the matter back to the user.
+            String s = "An internal error occurred; an attempt to schedule the " +
+                    methodName + " method for execution during rebalancing failed because " +
+                    breadcrumb.methodName + " was already scheduled";
+            CompletableFuture<Void> future = new CompletableFuture<>();
+            future.completeExceptionally(new KafkaException(s));
+            return future;
+        }
+
+        // This is the happy path—there isn't an existing breadcrumb, so we 
can schedule our new event
+        // without hesitation.
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        breadcrumb = new ConsumerRebalanceListenerCallbackBreadcrumb(methodName, future);
+        SortedSet<TopicPartition> sortedPartitions = new TreeSet<>(TOPIC_PARTITION_COMPARATOR);
+        sortedPartitions.addAll(partitions);
+        BackgroundEvent event = new ConsumerRebalanceListenerCallbackNeededEvent(methodName, sortedPartitions);
+        backgroundEventHandler.add(event);
+        log.debug("The event to trigger the {} method execution was enqueued successfully", methodName);
+
+        return future;
+    }
+
+    /**
+     * Signals that a {@link ConsumerRebalanceListener} callback has completed. This is invoked when the
+     * application thread has completed the callback and has submitted a
+     * {@link ConsumerRebalanceListenerCallbackCompletedEvent} to the network I/O thread. At this point, we
+     * notify the state machine that it's complete so that it can move to the next appropriate step of the
+     * rebalance process.
+     *
+     * <p/>
+     *
+     * This method is "receiving" the baton back from the application thread after having "given" it to the
+     * application thread via the
+     * {@link #enqueueConsumerRebalanceListenerCallback(ConsumerRebalanceListenerMethodName, Set)} method.
+     *
+     * @param methodName Method name of the callback that was executed
+     * @param error Optional error that was thrown by the callback, captured, and forwarded here
+     */
+    @Override
+    public void consumerRebalanceListenerCallbackCompleted(ConsumerRebalanceListenerMethodName methodName,
+                                                           Optional<KafkaException> error) {
+        if (breadcrumb == null) {
+            // In this case, we're somehow completing a callback for which we don't have a recorded breadcrumb.
+            // Because of that, we don't have a Future that can be completed, so we're left having to report it
+            // back to the user asynchronously.
+            String s = "An internal error occurred; the " + methodName + " method was executed " +
+                    "during rebalancing, but there was no record of it being scheduled";
+            backgroundEventHandler.add(new ErrorBackgroundEvent(new KafkaException(s)));
+            return;
+        }

Review Comment:
   I may be oversimplifying or missing something, but I see this as similar to my
   [comment above](https://github.com/apache/kafka/pull/14640#discussion_r1412390835).
   I would keep it simple and just throw a `RuntimeException`: this would be a bug
   in our own code misusing the internal `breadcrumb` object, so it is OK to stop
   the app and have us fix the code. We do want to stop the world in this
   situation; wouldn't the throw achieve that here?
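
   A minimal sketch of the fail-fast alternative, not the PR's code. `BreadcrumbSketch`, its nested `Breadcrumb` class, and the choice of `IllegalStateException` are all hypothetical stand-ins, and the duplicate-breadcrumb check on the enqueue side is left out for brevity. Whether the throw really stops the world depends on how the calling thread handles uncaught exceptions, which this sketch does not model.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Hypothetical, heavily simplified stand-in for the membership manager.
class BreadcrumbSketch {

    // Remembers which callback is in flight and the future to complete when it returns.
    static final class Breadcrumb {
        final String methodName;
        final CompletableFuture<Void> future;

        Breadcrumb(String methodName, CompletableFuture<Void> future) {
            this.methodName = methodName;
            this.future = future;
        }
    }

    private Breadcrumb breadcrumb;

    // Records a breadcrumb and hands back the future the reconciliation would chain on.
    // (The real method also enqueues an event; the duplicate check is omitted here.)
    CompletableFuture<Void> enqueueCallback(String methodName) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        breadcrumb = new Breadcrumb(methodName, future);
        return future;
    }

    // Called once the application thread has finished running the callback.
    void callbackCompleted(String methodName, Optional<RuntimeException> error) {
        if (breadcrumb == null) {
            // Assumption: a missing breadcrumb can only be an internal bug, so fail
            // fast instead of reporting the problem asynchronously.
            throw new IllegalStateException("The " + methodName
                    + " method was executed during rebalancing, but there was no record of it being scheduled");
        }

        Breadcrumb completed = breadcrumb;
        breadcrumb = null;

        if (error.isPresent()) {
            completed.future.completeExceptionally(error.get());
        } else {
            completed.future.complete(null);
        }
    }
}
```

   With this shape, a completion that arrives without a matching breadcrumb surfaces immediately at the call site rather than as a later error event.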



