kirktrue commented on code in PR #18202:
URL: https://github.com/apache/kafka/pull/18202#discussion_r2176203498


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerRebalanceListenerInvoker.java:
##########
@@ -54,77 +64,102 @@ public class ConsumerRebalanceListenerInvoker {
         this.metricsManager = metricsManager;
     }
 
+    /**
+     * Invokes the onPartitionsAssigned callback method from the rebalance 
listener and logs the result.
+     *
+     * @param assignedPartitions the partitions assigned to the consumer
+     * @return an exception if an error occurred, or null if no error
+     */
     public Exception invokePartitionsAssigned(final SortedSet<TopicPartition> 
assignedPartitions) {
-        log.info("Adding newly assigned partitions: {}", 
assignedPartitions.stream().map(TopicPartition::toString).collect(Collectors.joining(",
 ")));
-
-        Optional<ConsumerRebalanceListener> listener = 
subscriptions.rebalanceListener();
+        return invokeRebalanceCallback("Adding newly assigned partitions", 
assignedPartitions,
+                listener -> listener.onPartitionsAssigned(assignedPartitions),
+                metricsManager::recordPartitionsAssignedLatency);
+    }
 
-        if (listener.isPresent()) {
-            try {
-                final long startMs = time.milliseconds();
-                listener.get().onPartitionsAssigned(assignedPartitions);
-                
metricsManager.recordPartitionsAssignedLatency(time.milliseconds() - startMs);
-            } catch (WakeupException | InterruptException e) {
-                throw e;
-            } catch (Exception e) {
-                log.error("User provided listener {} failed on invocation of 
onPartitionsAssigned for partitions {}",
-                        listener.get().getClass().getName(), 
assignedPartitions, e);
-                return e;
-            }
-        }
+    /**
+     * Invokes the onPartitionsRevoked callback method from the rebalance 
listener and logs the result.
+     *
+     * @param revokedPartitions the partitions revoked from the consumer
+     * @return an exception if an error occurred, or null if no error
+     */
+    public Exception invokePartitionsRevoked(final SortedSet<TopicPartition> 
revokedPartitions) {
+        return invokeRebalanceCallback("Revoke previously assigned 
partitions", revokedPartitions,
+                listener -> {
+                    Set<TopicPartition> revokePausedPartitions = 
subscriptions.pausedPartitions();
+                    revokePausedPartitions.retainAll(revokedPartitions);
+                    if (!revokePausedPartitions.isEmpty()) {
+                        log.info("The pause flag in partitions [{}] will be 
removed due to revocation.",
+                                
revokePausedPartitions.stream().map(TopicPartition::toString).collect(Collectors.joining(",
 ")));
+                    }

Review Comment:
   What do you think about extracting this logic from 
`invokePartitionsRevoked()` and `invokePartitionsLost()` and making it a 
separate utility method? Something like:
   
   ```java
   private void maybeLogPausedPartitions(SortedSet<TopicPartition> partitions, 
String reason) {
       if (!log.isInfoEnabled())
           return;
   
       Set<TopicPartition> pausedPartitions = subscriptions.pausedPartitions();
       pausedPartitions.retainAll(partitions);
   
       if (!pausedPartitions.isEmpty()) {
           log.info(
               "The pause flag in partitions [{}] will be removed due to {}.",
               reason
               
pausedPartitions.stream().map(TopicPartition::toString).collect(Collectors.joining(",
 "))
           );
       }
   }
   ```



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerRebalanceListenerInvoker.java:
##########
@@ -54,77 +64,102 @@ public class ConsumerRebalanceListenerInvoker {
         this.metricsManager = metricsManager;
     }
 
+    /**
+     * Invokes the onPartitionsAssigned callback method from the rebalance 
listener and logs the result.
+     *
+     * @param assignedPartitions the partitions assigned to the consumer
+     * @return an exception if an error occurred, or null if no error
+     */
     public Exception invokePartitionsAssigned(final SortedSet<TopicPartition> 
assignedPartitions) {
-        log.info("Adding newly assigned partitions: {}", 
assignedPartitions.stream().map(TopicPartition::toString).collect(Collectors.joining(",
 ")));
-
-        Optional<ConsumerRebalanceListener> listener = 
subscriptions.rebalanceListener();
+        return invokeRebalanceCallback("Adding newly assigned partitions", 
assignedPartitions,
+                listener -> listener.onPartitionsAssigned(assignedPartitions),
+                metricsManager::recordPartitionsAssignedLatency);
+    }
 
-        if (listener.isPresent()) {
-            try {
-                final long startMs = time.milliseconds();
-                listener.get().onPartitionsAssigned(assignedPartitions);
-                
metricsManager.recordPartitionsAssignedLatency(time.milliseconds() - startMs);
-            } catch (WakeupException | InterruptException e) {
-                throw e;
-            } catch (Exception e) {
-                log.error("User provided listener {} failed on invocation of 
onPartitionsAssigned for partitions {}",
-                        listener.get().getClass().getName(), 
assignedPartitions, e);
-                return e;
-            }
-        }
+    /**
+     * Invokes the onPartitionsRevoked callback method from the rebalance 
listener and logs the result.
+     *
+     * @param revokedPartitions the partitions revoked from the consumer
+     * @return an exception if an error occurred, or null if no error
+     */
+    public Exception invokePartitionsRevoked(final SortedSet<TopicPartition> 
revokedPartitions) {
+        return invokeRebalanceCallback("Revoke previously assigned 
partitions", revokedPartitions,
+                listener -> {
+                    Set<TopicPartition> revokePausedPartitions = 
subscriptions.pausedPartitions();
+                    revokePausedPartitions.retainAll(revokedPartitions);
+                    if (!revokePausedPartitions.isEmpty()) {
+                        log.info("The pause flag in partitions [{}] will be 
removed due to revocation.",
+                                
revokePausedPartitions.stream().map(TopicPartition::toString).collect(Collectors.joining(",
 ")));
+                    }
+                    listener.onPartitionsRevoked(revokedPartitions);
+                },
+                metricsManager::recordPartitionsRevokedLatency);
+    }
 
-        return null;
+    /**
+     * Invokes the onPartitionsLost callback method from the rebalance 
listener and logs the result.
+     *
+     * @param lostPartitions the partitions lost from the consumer
+     * @return an exception if an error occurred, or null if no error
+     */
+    public Exception invokePartitionsLost(final SortedSet<TopicPartition> 
lostPartitions) {
+        return invokeRebalanceCallback("Lost previously assigned partitions", 
lostPartitions,
+                listener -> {
+                    Set<TopicPartition> lostPausedPartitions = 
subscriptions.pausedPartitions();
+                    lostPausedPartitions.retainAll(lostPartitions);
+                    if (!lostPausedPartitions.isEmpty()) {
+                        log.info("The pause flag in partitions [{}] will be 
removed due to partition lost.",
+                                
lostPartitions.stream().map(TopicPartition::toString).collect(Collectors.joining(",
 ")));

Review Comment:
   There's a bug in the existing code: `invokePartitionsLost()` should pass in 
`lostPausedPartitions` to the log message, not `lostPartitions`. That supports 
the argument to add the utility method to log the message.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to