kirktrue commented on code in PR #18202: URL: https://github.com/apache/kafka/pull/18202#discussion_r2176203498
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerRebalanceListenerInvoker.java: ########## @@ -54,77 +64,102 @@ public class ConsumerRebalanceListenerInvoker { this.metricsManager = metricsManager; } + /** + * Invokes the onPartitionsAssigned callback method from the rebalance listener and logs the result. + * + * @param assignedPartitions the partitions assigned to the consumer + * @return an exception if an error occurred, or null if no error + */ public Exception invokePartitionsAssigned(final SortedSet<TopicPartition> assignedPartitions) { - log.info("Adding newly assigned partitions: {}", assignedPartitions.stream().map(TopicPartition::toString).collect(Collectors.joining(", "))); - - Optional<ConsumerRebalanceListener> listener = subscriptions.rebalanceListener(); + return invokeRebalanceCallback("Adding newly assigned partitions", assignedPartitions, + listener -> listener.onPartitionsAssigned(assignedPartitions), + metricsManager::recordPartitionsAssignedLatency); + } - if (listener.isPresent()) { - try { - final long startMs = time.milliseconds(); - listener.get().onPartitionsAssigned(assignedPartitions); - metricsManager.recordPartitionsAssignedLatency(time.milliseconds() - startMs); - } catch (WakeupException | InterruptException e) { - throw e; - } catch (Exception e) { - log.error("User provided listener {} failed on invocation of onPartitionsAssigned for partitions {}", - listener.get().getClass().getName(), assignedPartitions, e); - return e; - } - } + /** + * Invokes the onPartitionsRevoked callback method from the rebalance listener and logs the result. 
+ * + * @param revokedPartitions the partitions revoked from the consumer + * @return an exception if an error occurred, or null if no error + */ + public Exception invokePartitionsRevoked(final SortedSet<TopicPartition> revokedPartitions) { + return invokeRebalanceCallback("Revoke previously assigned partitions", revokedPartitions, + listener -> { + Set<TopicPartition> revokePausedPartitions = subscriptions.pausedPartitions(); + revokePausedPartitions.retainAll(revokedPartitions); + if (!revokePausedPartitions.isEmpty()) { + log.info("The pause flag in partitions [{}] will be removed due to revocation.", + revokePausedPartitions.stream().map(TopicPartition::toString).collect(Collectors.joining(", "))); + } Review Comment: What do you think about extracting this logic from `invokePartitionsRevoked()` and `invokePartitionsLost()` and making it a separate utility method? Something like: ```java private void maybeLogPausedPartitions(SortedSet<TopicPartition> partitions, String reason) { if (!log.isInfoEnabled()) return; Set<TopicPartition> pausedPartitions = subscriptions.pausedPartitions(); pausedPartitions.retainAll(partitions); if (!pausedPartitions.isEmpty()) { log.info( "The pause flag in partitions [{}] will be removed due to {}.", pausedPartitions.stream().map(TopicPartition::toString).collect(Collectors.joining(", ")), reason ); } } ``` ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerRebalanceListenerInvoker.java: ########## @@ -54,77 +64,102 @@ public class ConsumerRebalanceListenerInvoker { this.metricsManager = metricsManager; } + /** + * Invokes the onPartitionsAssigned callback method from the rebalance listener and logs the result. 
+ * + * @param assignedPartitions the partitions assigned to the consumer + * @return an exception if an error occurred, or null if no error + */ public Exception invokePartitionsAssigned(final SortedSet<TopicPartition> assignedPartitions) { - log.info("Adding newly assigned partitions: {}", assignedPartitions.stream().map(TopicPartition::toString).collect(Collectors.joining(", "))); - - Optional<ConsumerRebalanceListener> listener = subscriptions.rebalanceListener(); + return invokeRebalanceCallback("Adding newly assigned partitions", assignedPartitions, + listener -> listener.onPartitionsAssigned(assignedPartitions), + metricsManager::recordPartitionsAssignedLatency); + } - if (listener.isPresent()) { - try { - final long startMs = time.milliseconds(); - listener.get().onPartitionsAssigned(assignedPartitions); - metricsManager.recordPartitionsAssignedLatency(time.milliseconds() - startMs); - } catch (WakeupException | InterruptException e) { - throw e; - } catch (Exception e) { - log.error("User provided listener {} failed on invocation of onPartitionsAssigned for partitions {}", - listener.get().getClass().getName(), assignedPartitions, e); - return e; - } - } + /** + * Invokes the onPartitionsRevoked callback method from the rebalance listener and logs the result. 
+ * + * @param revokedPartitions the partitions revoked from the consumer + * @return an exception if an error occurred, or null if no error + */ + public Exception invokePartitionsRevoked(final SortedSet<TopicPartition> revokedPartitions) { + return invokeRebalanceCallback("Revoke previously assigned partitions", revokedPartitions, + listener -> { + Set<TopicPartition> revokePausedPartitions = subscriptions.pausedPartitions(); + revokePausedPartitions.retainAll(revokedPartitions); + if (!revokePausedPartitions.isEmpty()) { + log.info("The pause flag in partitions [{}] will be removed due to revocation.", + revokePausedPartitions.stream().map(TopicPartition::toString).collect(Collectors.joining(", "))); + } + listener.onPartitionsRevoked(revokedPartitions); + }, + metricsManager::recordPartitionsRevokedLatency); + } - return null; + /** + * Invokes the onPartitionsLost callback method from the rebalance listener and logs the result. + * + * @param lostPartitions the partitions lost from the consumer + * @return an exception if an error occurred, or null if no error + */ + public Exception invokePartitionsLost(final SortedSet<TopicPartition> lostPartitions) { + return invokeRebalanceCallback("Lost previously assigned partitions", lostPartitions, + listener -> { + Set<TopicPartition> lostPausedPartitions = subscriptions.pausedPartitions(); + lostPausedPartitions.retainAll(lostPartitions); + if (!lostPausedPartitions.isEmpty()) { + log.info("The pause flag in partitions [{}] will be removed due to partition lost.", + lostPartitions.stream().map(TopicPartition::toString).collect(Collectors.joining(", "))); Review Comment: There's a bug in the existing code: `invokePartitionsLost()` should pass in `lostPausedPartitions` to the log message, not `lostPartitions`. That supports the argument to add the utility method to log the message. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org