kirktrue commented on code in PR #14640:
URL: https://github.com/apache/kafka/pull/14640#discussion_r1419795598


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -971,12 +989,59 @@ private CompletableFuture<Void> 
invokeOnPartitionsLostCallback(Set<TopicPartitio
         // behaviour.
         Optional<ConsumerRebalanceListener> listener = 
subscriptions.rebalanceListener();
         if (!partitionsLost.isEmpty() && listener.isPresent()) {
-            throw new UnsupportedOperationException("User-defined callbacks 
not supported yet");
+            return 
enqueueConsumerRebalanceListenerCallback(ON_PARTITIONS_LOST, partitionsLost);
         } else {
             return CompletableFuture.completedFuture(null);
         }
     }
 
+    /**
+     * Enqueue a {@link ConsumerRebalanceListenerCallbackNeededEvent} to 
trigger the execution of the
+     * appropriate {@link ConsumerRebalanceListener} {@link 
ConsumerRebalanceListenerMethodName method} on the
+     * application thread.
+     *
+     * <p/>
+     *
+     * Because the reconciliation process (run in the background thread) will 
be blocked by the application thread
+     * until it completes this, we need to provide a {@link CompletableFuture} 
by which to remember where we left off.
+     *
+     * @param methodName Callback method that needs to be executed on the 
application thread
+     * @param partitions Partitions to supply to the callback method
+     * @return Future that will be chained within the rest of the 
reconciliation logic
+     */
+    private CompletableFuture<Void> 
enqueueConsumerRebalanceListenerCallback(ConsumerRebalanceListenerMethodName 
methodName,
+                                                                             
Set<TopicPartition> partitions) {
+        SortedSet<TopicPartition> sortedPartitions = new 
TreeSet<>(TOPIC_PARTITION_COMPARATOR);
+        sortedPartitions.addAll(partitions);
+        CompletableBackgroundEvent<Void> event = new 
ConsumerRebalanceListenerCallbackNeededEvent(methodName, sortedPartitions);
+        backgroundEventHandler.add(event);
+        log.debug("The event to trigger the {} method execution was enqueued 
successfully", methodName.fullyQualifiedMethodName());
+        return event.future();
+    }
+
+    @Override
+    public void 
consumerRebalanceListenerCallbackCompleted(ConsumerRebalanceListenerCallbackCompletedEvent
 event) {
+        ConsumerRebalanceListenerMethodName methodName = event.methodName();
+        Optional<KafkaException> error = event.error();
+        CompletableFuture<Void> future = event.future();
+
+        if (error.isPresent()) {
+            String message = error.get().getMessage();
+            log.warn(
+                "The {} method completed with an error ({}); signaling to 
continue to the next phase of rebalance",
+                methodName.fullyQualifiedMethodName(),
+                message
+            );
+        } else {
+            log.debug(
+                "The {} method completed successfully; signaling to continue 
to the next phase of rebalance",
+                methodName.fullyQualifiedMethodName()
+            );
+        }
+
+        future.complete(null);

Review Comment:
   @dajac & @lianetm—
   
   My intention was that the callback invocation logic _only_ perform these 
steps:
   
   1. Invoke the correct callback method
   2. Capture any error from the invocation
   3. Package up and forward the result of the invocation via another event 
   
   Of course, the `MembershipManagerImpl` is the only entity interested in 
those results.
   
   I intentionally made no attempt to control the flow of the reconciliation 
logic in the callback invocation layer. I assume it is preferable for the 
`MembershipManagerImpl` to be the sole arbiter for reconciliation flow.
   
   It follows that the `MembershipManagerImpl` would be expected to take the 
result of the callback invocation and decide what to do next: fail, retry, or 
continue:
   
   * If the decision is to _fail_, the `MembershipManagerImpl` should enqueue a 
new 'error event' onto the background event queue where it would bubble up to 
the user
   * If the decision is to _retry_, the `MembershipManagerImpl` can choose to 
enqueue another `ConsumerRebalanceListenerCallbackNeededEvent` onto the 
background event queue
   * If the decision is to _continue_, the `MembershipManagerImpl` might simply 
log an error and keep going.
   
   The above description is simplified, but it hopefully captures the intention 
of the callback invocation.
   
   Let me know if we agree on those points, or if there's more discussion 
needed.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -971,12 +989,59 @@ private CompletableFuture<Void> 
invokeOnPartitionsLostCallback(Set<TopicPartitio
         // behaviour.
         Optional<ConsumerRebalanceListener> listener = 
subscriptions.rebalanceListener();
         if (!partitionsLost.isEmpty() && listener.isPresent()) {
-            throw new UnsupportedOperationException("User-defined callbacks 
not supported yet");
+            return 
enqueueConsumerRebalanceListenerCallback(ON_PARTITIONS_LOST, partitionsLost);
         } else {
             return CompletableFuture.completedFuture(null);
         }
     }
 
+    /**
+     * Enqueue a {@link ConsumerRebalanceListenerCallbackNeededEvent} to 
trigger the execution of the
+     * appropriate {@link ConsumerRebalanceListener} {@link 
ConsumerRebalanceListenerMethodName method} on the
+     * application thread.
+     *
+     * <p/>
+     *
+     * Because the reconciliation process (run in the background thread) will 
be blocked by the application thread
+     * until it completes this, we need to provide a {@link CompletableFuture} 
by which to remember where we left off.
+     *
+     * @param methodName Callback method that needs to be executed on the 
application thread
+     * @param partitions Partitions to supply to the callback method
+     * @return Future that will be chained within the rest of the 
reconciliation logic
+     */
+    private CompletableFuture<Void> 
enqueueConsumerRebalanceListenerCallback(ConsumerRebalanceListenerMethodName 
methodName,
+                                                                             
Set<TopicPartition> partitions) {
+        SortedSet<TopicPartition> sortedPartitions = new 
TreeSet<>(TOPIC_PARTITION_COMPARATOR);
+        sortedPartitions.addAll(partitions);
+        CompletableBackgroundEvent<Void> event = new 
ConsumerRebalanceListenerCallbackNeededEvent(methodName, sortedPartitions);
+        backgroundEventHandler.add(event);
+        log.debug("The event to trigger the {} method execution was enqueued 
successfully", methodName.fullyQualifiedMethodName());
+        return event.future();
+    }
+
+    @Override
+    public void 
consumerRebalanceListenerCallbackCompleted(ConsumerRebalanceListenerCallbackCompletedEvent
 event) {
+        ConsumerRebalanceListenerMethodName methodName = event.methodName();
+        Optional<KafkaException> error = event.error();
+        CompletableFuture<Void> future = event.future();
+
+        if (error.isPresent()) {
+            String message = error.get().getMessage();
+            log.warn(
+                "The {} method completed with an error ({}); signaling to 
continue to the next phase of rebalance",
+                methodName.fullyQualifiedMethodName(),
+                message
+            );
+        } else {
+            log.debug(
+                "The {} method completed successfully; signaling to continue 
to the next phase of rebalance",
+                methodName.fullyQualifiedMethodName()
+            );
+        }
+
+        future.complete(null);

Review Comment:
   @dajac & @lianetm—
   
   My intention was that the callback invocation logic _only_ perform these 
steps:
   
   1. Invoke the correct callback method
   2. Capture any error from the invocation
   3. Package up and forward the result of the invocation via another event 
   
   Of course, the `MembershipManagerImpl` is the only entity interested in 
those results.
   
   I intentionally made no attempt to control the flow of the reconciliation 
logic in the callback invocation layer. I assume it is preferable for the 
`MembershipManagerImpl` to be the sole arbiter for reconciliation flow.
   
   It follows that the `MembershipManagerImpl` would be expected to take the 
result of the callback invocation and decide what to do next: fail, retry, or 
continue:
   
   * If the decision is to _fail_, the `MembershipManagerImpl` should enqueue a 
new 'error event' onto the background event queue where it would bubble up to 
the user
   * If the decision is to _retry_, the `MembershipManagerImpl` can choose to 
enqueue another `ConsumerRebalanceListenerCallbackNeededEvent` onto the 
background event queue
   * If the decision is to _continue_, the `MembershipManagerImpl` might simply 
log an error and keep going.
   
   The above description is simplified, but it hopefully captures the intention 
of the callback invocation.
   
   Let me know if we agree on those points, or if there's more discussion 
needed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to