showuon commented on a change in pull request #11460:
URL: https://github.com/apache/kafka/pull/11460#discussion_r742575556



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
##########
@@ -2007,7 +2007,7 @@ public OffsetAndMetadata committed(TopicPartition partition, final Duration time
      * Suspend fetching from the requested partitions. Future calls to {@link #poll(Duration)} will not return
      * any records from these partitions until they have been resumed using {@link #resume(Collection)}.
      * Note that this method does not affect partition subscription. In particular, it does not cause a group
-     * rebalance when automatic assignment is used.
+     * rebalance when automatic assignment is used. And groupRebalance does not preserve pause state.

Review comment:
       How about this:
   ```
   * ...
   * rebalance when automatic assignment is used.
   * 
   * Note: A rebalance will not preserve the pause/resume state.
   ```
   
   

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##########
@@ -307,6 +307,10 @@ private Exception invokePartitionsAssigned(final Set<TopicPartition> assignedPar
 
     private Exception invokePartitionsRevoked(final Set<TopicPartition> revokedPartitions) {
         log.info("Revoke previously assigned partitions {}", Utils.join(revokedPartitions, ", "));
+        Set<TopicPartition> revokePausedPartitions = subscriptions.pausedPartitions();
+        revokePausedPartitions.retainAll(revokedPartitions);
+        if (!revokePausedPartitions.isEmpty())
+            log.info("Revoke previously paused partitions {}", Utils.join(revokePausedPartitions, ", "));

Review comment:
       I'm afraid users might get confused, because it looks like we revoke twice:
   ```
   Revoke previously assigned partitions tp1, tp2
   Revoke previously paused partitions tp1
   ```
   
   How about this instead:
   `log.info("The pause flag in partitions [{}] will be removed due to revocation.", Utils.join(revokePausedPartitions, ", "));`
   WDYT?
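
To make the suggestion concrete, here is a minimal, self-contained sketch of the intended behaviour: intersect the paused partitions with the revoked ones and emit a single message only when the intersection is non-empty. It is an illustration, not the actual `ConsumerCoordinator` code. The class name `PausedRevocationLogSketch` and both helper methods are hypothetical, plain `String` names stand in for `TopicPartition`, and `String.join` stands in for `Utils.join`. The defensive copy before `retainAll` is my own assumption, so the sketch never mutates the caller's set.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;

public class PausedRevocationLogSketch {

    // Hypothetical helper: returns the revoked partitions that were paused.
    // Copies the paused set first so neither input is modified by retainAll.
    static Set<String> pausedAmongRevoked(Set<String> paused, Set<String> revoked) {
        Set<String> result = new LinkedHashSet<>(paused);
        result.retainAll(revoked);
        return result;
    }

    // Hypothetical helper: builds the message in the wording suggested above,
    // or returns null when no paused partition is being revoked.
    static String pauseFlagMessage(Set<String> paused, Set<String> revoked) {
        Set<String> affected = pausedAmongRevoked(paused, revoked);
        if (affected.isEmpty())
            return null;
        return "The pause flag in partitions [" + String.join(", ", affected)
                + "] will be removed due to revocation.";
    }

    public static void main(String[] args) {
        Set<String> paused = new LinkedHashSet<>(Arrays.asList("tp1", "tp3"));
        Set<String> revoked = new LinkedHashSet<>(Arrays.asList("tp1", "tp2"));
        // Only tp1 is both paused and revoked, so only tp1 is reported.
        System.out.println(pauseFlagMessage(paused, revoked));
    }
}
```

With `tp1` and `tp3` paused and `tp1` and `tp2` revoked, the sketch prints `The pause flag in partitions [tp1] will be removed due to revocation.`, which reads as a consequence of the revocation rather than as a second revocation.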
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.


