lianetm commented on code in PR #14640: URL: https://github.com/apache/kafka/pull/14640#discussion_r1419219318
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ########## @@ -971,12 +989,59 @@ private CompletableFuture<Void> invokeOnPartitionsLostCallback(Set<TopicPartitio // behaviour. Optional<ConsumerRebalanceListener> listener = subscriptions.rebalanceListener(); if (!partitionsLost.isEmpty() && listener.isPresent()) { - throw new UnsupportedOperationException("User-defined callbacks not supported yet"); + return enqueueConsumerRebalanceListenerCallback(ON_PARTITIONS_LOST, partitionsLost); } else { return CompletableFuture.completedFuture(null); } } + /** + * Enqueue a {@link ConsumerRebalanceListenerCallbackNeededEvent} to trigger the execution of the + * appropriate {@link ConsumerRebalanceListener} {@link ConsumerRebalanceListenerMethodName method} on the + * application thread. + * + * <p/> + * + * Because the reconciliation process (run in the background thread) will be blocked by the application thread + * until it completes this, we need to provide a {@link CompletableFuture} by which to remember where we left off. + * + * @param methodName Callback method that needs to be executed on the application thread + * @param partitions Partitions to supply to the callback method + * @return Future that will be chained within the rest of the reconciliation logic + */ + private CompletableFuture<Void> enqueueConsumerRebalanceListenerCallback(ConsumerRebalanceListenerMethodName methodName, + Set<TopicPartition> partitions) { + SortedSet<TopicPartition> sortedPartitions = new TreeSet<>(TOPIC_PARTITION_COMPARATOR); + sortedPartitions.addAll(partitions); + CompletableBackgroundEvent<Void> event = new ConsumerRebalanceListenerCallbackNeededEvent(methodName, sortedPartitions); + backgroundEventHandler.add(event); + log.debug("The event to trigger the {} method execution was enqueued successfully", methodName.fullyQualifiedMethodName()); + return event.future(); + } + + @Override + public void consumerRebalanceListenerCallbackCompleted(ConsumerRebalanceListenerCallbackCompletedEvent event) { + ConsumerRebalanceListenerMethodName methodName = event.methodName(); + Optional<KafkaException> error = event.error(); + CompletableFuture<Void> future = event.future(); + + if (error.isPresent()) { + String message = error.get().getMessage(); + log.warn( + "The {} method completed with an error ({}); signaling to continue to the next phase of rebalance", + methodName.fullyQualifiedMethodName(), + message + ); + } else { + log.debug( + "The {} method completed successfully; signaling to continue to the next phase of rebalance", + methodName.fullyQualifiedMethodName() + ); + } + + future.complete(null); Review Comment: Regarding the retry path that @dajac mentioned, I expect the same. When callbacks fail (and we propagate the error), the state machine will stay `RECONCILING` with `assignmentsReadyToReconcile`, that will be retried on the next reconciliation loop, which is the behaviour in the legacy coordinator too described [onJoinComplete](https://github.com/apache/kafka/blob/964e73178b5b8363cd2685ce6872905ef0c04dee/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L236). That being said, this needs more though regarding the callback failures. When callbacks fail, the subscription state is updated anyways (old and new code), so in the old one, will the partitions be identified as revoked and the callback retried? (If so we do need to make sure the reconciliation does the same. In its current shape I expect it will view it as [sameAssignment](https://github.com/apache/kafka/blob/9d2297ad2d4e6478c472b3811000db36ca9b49a5/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java#L619), so it won't retry the reconciliation) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org