lianetm commented on code in PR #14640:
URL: https://github.com/apache/kafka/pull/14640#discussion_r1419257433
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -979,12 +1026,124 @@ private CompletableFuture<Void> invokeOnPartitionsLostCallback(Set<TopicPartitio
         // behaviour.
         Optional<ConsumerRebalanceListener> listener = subscriptions.rebalanceListener();
         if (!partitionsLost.isEmpty() && listener.isPresent()) {
-            throw new UnsupportedOperationException("User-defined callbacks not supported yet");
+            return enqueueConsumerRebalanceListenerCallback(onPartitionsLost, partitionsLost);
         } else {
             return CompletableFuture.completedFuture(null);
         }
     }
+    /**
+     * Enqueue a {@link ConsumerRebalanceListenerCallbackNeededEvent} to trigger the execution of the
+     * appropriate {@link ConsumerRebalanceListener} {@link ConsumerRebalanceListenerMethodName method} on the
+     * application thread.
+     *
+     * <p/>
+     *
+     * This method is essentially "giving" the baton from the background thread to the application thread for
+     * processing of the reconciliation logic. It will "receive" the "baton" back via the
+     * {@link #consumerRebalanceListenerCallbackCompleted(ConsumerRebalanceListenerMethodName, Optional)} method.
+     *
+     * <p/>
+     *
+     * Because the reconciliation process (run in the background thread) will be blocked by the application thread
+     * until it completes this, we need to leave a {@link ConsumerRebalanceListenerCallbackBreadcrumb breadcrumb}
+     * by which to remember where we left off.
+     *
+     * @param methodName Callback method that needs to be executed on the application thread
+     * @param partitions Partitions to supply to the callback method
+     * @return Future that will be chained within the rest of the reconciliation logic
+     */
+    private CompletableFuture<Void> enqueueConsumerRebalanceListenerCallback(ConsumerRebalanceListenerMethodName methodName,
+            Set<TopicPartition> partitions) {
+        if (breadcrumb != null) {
+            // In this case, there was already an existing breadcrumb, so we need to report the matter back to the user.
+            String s = "An internal error occurred; an attempt to schedule the " +
+                methodName + " method for execution during rebalancing failed because " +
+                breadcrumb.methodName + " was already scheduled";
+            CompletableFuture<Void> future = new CompletableFuture<>();
+            future.completeExceptionally(new KafkaException(s));
+            return future;
+        }
+
+        // This is the happy path—there isn't an existing breadcrumb, so we can schedule our new event
+        // without hesitation.
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        breadcrumb = new ConsumerRebalanceListenerCallbackBreadcrumb(methodName, future);
+        SortedSet<TopicPartition> sortedPartitions = new TreeSet<>(TOPIC_PARTITION_COMPARATOR);
+        sortedPartitions.addAll(partitions);
+        BackgroundEvent event = new ConsumerRebalanceListenerCallbackNeededEvent(methodName, sortedPartitions);
+        backgroundEventHandler.add(event);
+        log.debug("The event to trigger the {} method execution was enqueued successfully", methodName);
+
+        return future;
+    }
+
+    /**
+     * Signals that a {@link ConsumerRebalanceListener} callback has completed. This is invoked when the
+     * application thread has completed the callback and has submitted a
+     * {@link ConsumerRebalanceListenerCallbackCompletedEvent} to the network I/O thread. At this point, we
+     * notify the state machine that it's complete so that it can move to the next appropriate step of the
+     * rebalance process.
+     *
+     * <p/>
+     *
+     * This method is "receiving" the baton back from the application thread after having "given" it to the
+     * application thread via the
+     * {@link #enqueueConsumerRebalanceListenerCallback(ConsumerRebalanceListenerMethodName, Set)} method.
+     *
+     * @param methodName Method name of the callback that was executed
+     * @param error Optional error that was thrown by the callback, captured, and forwarded here
+     */
+    @Override
+    public void consumerRebalanceListenerCallbackCompleted(ConsumerRebalanceListenerMethodName methodName,
+            Optional<KafkaException> error) {
+        if (breadcrumb == null) {
+            // In this case, we're somehow completing a callback for which we don't have a recorded breadcrumb.
+            // Because of that, we don't have a Future that can be completed, so we're left having to report it
+            // back to the user asynchronously.
+            String s = "An internal error occurred; the " + methodName + " method was executed " +
+                "during rebalancing, but there was no record of it being scheduled";
+            backgroundEventHandler.add(new ErrorBackgroundEvent(new KafkaException(s)));
+            return;
+        }

Review Comment:
I may be oversimplifying here or missing something, but I see this as similar to my [comment above](https://github.com/apache/kafka/pull/14640#discussion_r1412390835). I would keep it simple and just throw a `RuntimeException`, given that this would be a bug in our code misusing the internal `breadcrumb` object, so it's OK to stop the app and have us fix the code. We do want to stop the world in this situation; wouldn't the throw achieve that here?
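A minimal sketch of the handoff being discussed may help weigh the two options. It models a single pending callback guarded by a breadcrumb, keeps the PR's behaviour of failing the returned future when a second callback is scheduled, and applies the suggestion above on the completion path by throwing when no breadcrumb exists. `CallbackHandoff`, `enqueue`, `complete`, and the nested `Breadcrumb` are hypothetical names; plain `String` method names stand in for `ConsumerRebalanceListenerMethodName`, the background event queue is omitted, and throwing `IllegalStateException` (a `RuntimeException` subclass) is an assumption rather than something the PR specifies.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Hypothetical, stripped-down model of the breadcrumb handoff; not Kafka's API.
class CallbackHandoff {

    // Remembers which callback was handed to the application thread and the
    // future the reconciliation logic is waiting on. At most one is pending.
    private static final class Breadcrumb {
        final String methodName;
        final CompletableFuture<Void> future;

        Breadcrumb(String methodName, CompletableFuture<Void> future) {
            this.methodName = methodName;
            this.future = future;
        }
    }

    private Breadcrumb breadcrumb;

    // "Give" the baton. Mirrors the PR: a second schedule fails the returned
    // future instead of throwing.
    CompletableFuture<Void> enqueue(String methodName) {
        if (breadcrumb != null) {
            CompletableFuture<Void> failed = new CompletableFuture<>();
            failed.completeExceptionally(new IllegalStateException(
                    methodName + " could not be scheduled because "
                            + breadcrumb.methodName + " was already scheduled"));
            return failed;
        }
        CompletableFuture<Void> future = new CompletableFuture<>();
        breadcrumb = new Breadcrumb(methodName, future);
        // A real implementation would enqueue an event for the application thread here.
        return future;
    }

    // "Receive" the baton back. Following the review suggestion, a missing
    // breadcrumb is treated as a bug in our code and thrown immediately.
    void complete(String methodName, Optional<RuntimeException> error) {
        if (breadcrumb == null) {
            throw new IllegalStateException("The " + methodName
                    + " callback completed, but there was no record of it being scheduled");
        }
        CompletableFuture<Void> future = breadcrumb.future;
        // Clear first: stages chained on the future may run during completion
        // and schedule the next callback.
        breadcrumb = null;
        if (error.isPresent()) {
            future.completeExceptionally(error.get());
        } else {
            future.complete(null);
        }
    }

    public static void main(String[] args) {
        CallbackHandoff handoff = new CallbackHandoff();

        // Normal round trip: schedule, then complete without error.
        CompletableFuture<Void> revoked = handoff.enqueue("onPartitionsRevoked");
        handoff.complete("onPartitionsRevoked", Optional.empty());
        System.out.println(revoked.isDone() && !revoked.isCompletedExceptionally()); // prints true

        // Completing with no recorded breadcrumb fails fast.
        try {
            handoff.complete("onPartitionsLost", Optional.empty());
        } catch (IllegalStateException e) {
            System.out.println("Caught: " + e.getMessage());
        }
    }
}
```

The sketch clears the breadcrumb before completing the future because dependent stages of a `CompletableFuture` may run on the completing thread, and one of them might schedule the next callback. Whether the throw actually stops the world depends on the background thread's loop letting it propagate (or reporting it) rather than swallowing it, which this sketch does not model.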