This is an automated email from the ASF dual-hosted git repository.

lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9f657abf3a2 MINOR: Improve consumer rebalance callbacks docs (#20528)
9f657abf3a2 is described below

commit 9f657abf3a2b9a97f93548c41f1ee290110af2cb
Author: Lianet Magrans <[email protected]>
AuthorDate: Tue Sep 16 11:12:19 2025 -0400

    MINOR: Improve consumer rebalance callbacks docs (#20528)
    
    Clarify rebalance callbacks behaviour (got some questions for
    onPartitionsAssigned, docs were indeed confusing about the partitions
    received in params). Reviewed all rebalance callbacks with it.
    
    Reviewers: Bill Bejeck<[email protected]>
---
 .../consumer/PlaintextConsumerCallbackTest.java    | 84 ++++++++++++++++++++++
 .../consumer/ConsumerRebalanceListener.java        | 57 ++++++++++-----
 .../kafka/clients/consumer/KafkaConsumer.java      |  9 ++-
 3 files changed, 129 insertions(+), 21 deletions(-)

diff --git 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerCallbackTest.java
 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerCallbackTest.java
index c81a3cd1667..0e09d62033b 100644
--- 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerCallbackTest.java
+++ 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerCallbackTest.java
@@ -27,7 +27,9 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiConsumer;
 
 import static 
org.apache.kafka.clients.ClientsTestUtils.consumeAndVerifyRecords;
@@ -166,6 +168,80 @@ public class PlaintextConsumerCallbackTest {
         
testGetPositionOfNewlyAssignedPartitionOnPartitionsAssignedCallback(CLASSIC);
     }
 
+    @ClusterTest
+    public void 
testOnPartitionsAssignedCalledWithNewPartitionsOnlyForClassicCooperative() 
throws InterruptedException {
+        try (var consumer = createClassicConsumerCooperativeProtocol()) {
+            testOnPartitionsAssignedCalledWithExpectedPartitions(consumer, 
true);
+        }
+    }
+
+    @ClusterTest
+    public void 
testOnPartitionsAssignedCalledWithNewPartitionsOnlyForAsyncConsumer() throws 
InterruptedException {
+        try (var consumer = createConsumer(CONSUMER)) {
+            testOnPartitionsAssignedCalledWithExpectedPartitions(consumer, 
true);
+        }
+    }
+
+    @ClusterTest
+    public void 
testOnPartitionsAssignedCalledWithNewPartitionsOnlyForClassicEager() throws 
InterruptedException {
+        try (var consumer = createConsumer(CLASSIC)) {
+            testOnPartitionsAssignedCalledWithExpectedPartitions(consumer, 
false);
+        }
+    }
+
+    private void testOnPartitionsAssignedCalledWithExpectedPartitions(
+            Consumer<byte[], byte[]> consumer,
+            boolean expectNewPartitionsOnlyInCallback) throws 
InterruptedException {
+        subscribeAndExpectOnPartitionsAssigned(consumer, List.of(topic), 
List.of(tp));
+        assertEquals(Set.of(tp), consumer.assignment());
+
+        // Add a new partition assignment while keeping the previous one
+        String newTopic = "newTopic";
+        TopicPartition addedPartition = new TopicPartition(newTopic, 0);
+        List<TopicPartition> expectedPartitionsInCallback;
+        if (expectNewPartitionsOnlyInCallback) {
+            expectedPartitionsInCallback = List.of(addedPartition);
+        } else {
+            expectedPartitionsInCallback = List.of(tp, addedPartition);
+        }
+
+        // Change subscription to keep the previous one and add a new topic. 
Assignment should be updated
+        // to contain partitions from both topics, but the 
onPartitionsAssigned parameters may contain
+        // either the full new assignment or just the newly added partitions, 
depending on the rebalance protocol.
+        subscribeAndExpectOnPartitionsAssigned(
+                consumer,
+                List.of(topic, newTopic),
+                expectedPartitionsInCallback);
+        assertEquals(Set.of(tp, addedPartition), consumer.assignment());
+    }
+
+    private void subscribeAndExpectOnPartitionsAssigned(Consumer<byte[], 
byte[]> consumer, List<String> topics, Collection<TopicPartition> 
expectedPartitionsInCallback) throws InterruptedException {
+        var partitionsAssigned = new AtomicBoolean(false);
+        AtomicReference<Collection<TopicPartition>> partitionsFromCallback = 
new AtomicReference<>();
+        consumer.subscribe(topics, new ConsumerRebalanceListener() {
+            @Override
+            public void onPartitionsAssigned(Collection<TopicPartition> 
partitions) {
+                if (partitions.containsAll(expectedPartitionsInCallback)) {
+                    partitionsFromCallback.set(partitions);
+                    partitionsAssigned.set(true);
+                }
+            }
+
+            @Override
+            public void onPartitionsRevoked(Collection<TopicPartition> 
partitions) {
+                // noop
+            }
+        });
+        ClientsTestUtils.pollUntilTrue(
+                consumer,
+                partitionsAssigned::get,
+                "Timed out before expected rebalance completed"
+        );
+        // These are different types, so comparing values instead
+        
assertTrue(expectedPartitionsInCallback.containsAll(partitionsFromCallback.get())
 && partitionsFromCallback.get().containsAll(expectedPartitionsInCallback),
+                "Expected partitions " + expectedPartitionsInCallback + " as 
parameter for onPartitionsAssigned, but got " + partitionsFromCallback.get());
+    }
+
     @ClusterTest
     public void 
testAsyncConsumerGetPositionOfNewlyAssignedPartitionOnPartitionsAssignedCallback()
 throws InterruptedException {
         
testGetPositionOfNewlyAssignedPartitionOnPartitionsAssignedCallback(CONSUMER);
@@ -284,4 +360,12 @@ public class PlaintextConsumerCallbackTest {
             ENABLE_AUTO_COMMIT_CONFIG, "false"
         ));
     }
+
+    private Consumer<byte[], byte[]> 
createClassicConsumerCooperativeProtocol() {
+        return cluster.consumer(Map.of(
+                GROUP_PROTOCOL_CONFIG, CLASSIC.name.toLowerCase(Locale.ROOT),
+                ENABLE_AUTO_COMMIT_CONFIG, "false",
+                ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, 
"org.apache.kafka.clients.consumer.CooperativeStickyAssignor"
+        ));
+    }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java
index c49b2c8045a..23e045b7600 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java
@@ -50,7 +50,7 @@ import java.util.Collection;
  * Under normal conditions, if a partition is reassigned from one consumer to 
another, then the old consumer will
  * always invoke {@link #onPartitionsRevoked(Collection) onPartitionsRevoked} 
for that partition prior to the new consumer
  * invoking {@link #onPartitionsAssigned(Collection) onPartitionsAssigned} for 
the same partition. So if offsets or other state is saved in the
- * {@link #onPartitionsRevoked(Collection) onPartitionsRevoked} call by one 
consumer member, it will be always accessible by the time the
+ * {@link #onPartitionsRevoked(Collection) onPartitionsRevoked} call by one 
consumer member, it will always be accessible by the time the
  * other consumer member taking over that partition and triggering its {@link 
#onPartitionsAssigned(Collection) onPartitionsAssigned} callback to load the 
state.
  * <p>
  * You can think of revocation as a graceful way to give up ownership of a 
partition. In some cases, the consumer may not have an opportunity to do so.
@@ -120,13 +120,31 @@ public interface ConsumerRebalanceListener {
     /**
      * A callback method the user can implement to provide handling of offset 
commits to a customized store.
      * This method will be called during a rebalance operation when the 
consumer has to give up some partitions.
-     * It can also be called when consumer is being closed ({@link 
KafkaConsumer#close(CloseOptions option)})
-     * or is unsubscribing ({@link KafkaConsumer#unsubscribe()}).
+     * The consumer may need to give up some partitions (thus triggering this callback) 
under the following scenarios:
+     * <ul>
+     *     <li>If the consumer assignment changes</li>
+     *     <li>If the consumer is being closed ({@link 
KafkaConsumer#close(CloseOptions option)})</li>
+     *     <li>If the consumer is unsubscribing ({@link 
KafkaConsumer#unsubscribe()})</li>
+     * </ul>
      * It is recommended that offsets should be committed in this callback to 
either Kafka or a
      * custom offset store to prevent duplicate data.
      * <p>
-     * In eager rebalancing, it will always be called at the start of a 
rebalance and after the consumer stops fetching data.
-     * In cooperative rebalancing, it will be called at the end of a rebalance 
on the set of partitions being revoked iff the set is non-empty.
+     * This callback is always called before re-assigning the partitions.
+     * If the consumer is using the {@link GroupProtocol#CLASSIC} rebalance 
protocol:
+     * <ul>
+     *     <li>
+     *         In eager rebalancing, onPartitionsRevoked will be called with 
the full set of assigned partitions as a parameter (all partitions are revoked).
+     *         It will be called even if there are no partitions to revoke.
+     *     </li>
+     *     <li>
+     *         In cooperative rebalancing, onPartitionsRevoked will be called 
with the set of partitions to revoke,
+     *         iff the set is non-empty.
+     *     </li>
+     * </ul>
+     * If the consumer is using the {@link GroupProtocol#CONSUMER} rebalance 
protocol, this callback will be called
+     * with the set of partitions to revoke iff the set is non-empty
+     * (same behavior as the {@link GroupProtocol#CLASSIC} rebalance protocol 
with Cooperative mode).
+     * <p>
      * For examples on usage of this API, see Usage Examples section of {@link 
KafkaConsumer KafkaConsumer}.
      * <p>
      * It is common for the revocation callback to use the consumer instance 
in order to commit offsets. It is possible
@@ -135,8 +153,9 @@ public interface ConsumerRebalanceListener {
      * invocation of {@link KafkaConsumer#poll(java.time.Duration)} in which 
this callback is being executed. This means it is not
      * necessary to catch these exceptions and re-attempt to wakeup or 
interrupt the consumer thread.
      *
-     * @param partitions The list of partitions that were assigned to the 
consumer and now need to be revoked (may not
-     *                   include all currently assigned partitions, i.e. there 
may still be some partitions left)
+     * @param partitions The list of partitions that were assigned to the 
consumer and now need to be revoked. This will
+     *                   include the full assignment under the Classic/Eager 
protocol, given that it revokes all partitions.
+     *                   It will only include the subset to revoke under the 
Classic/Cooperative and Consumer protocols.
      * @throws org.apache.kafka.common.errors.WakeupException If raised from a 
nested call to {@link KafkaConsumer}
      * @throws org.apache.kafka.common.errors.InterruptException If raised 
from a nested call to {@link KafkaConsumer}
      */
@@ -144,12 +163,13 @@ public interface ConsumerRebalanceListener {
 
     /**
      * A callback method the user can implement to provide handling of 
customized offsets on completion of a successful
-     * partition re-assignment. This method will be called after the partition 
re-assignment completes and before the
-     * consumer starts fetching data, and only as the result of a {@link 
Consumer#poll(java.time.Duration) poll(long)} call.
+     * partition re-assignment. This method will be called after the partition 
re-assignment completes (even if no new
+     * partitions were assigned to the consumer), and before the consumer 
starts fetching data,
+     * and only as the result of a {@link Consumer#poll(java.time.Duration) 
poll(Duration)} call.
      * <p>
      * It is guaranteed that under normal conditions all the processes in a 
consumer group will execute their
-     * {@link #onPartitionsRevoked(Collection)} callback before any instance 
executes its
-     * {@link #onPartitionsAssigned(Collection)} callback. During exceptional 
scenarios, partitions may be migrated
+     * {@link #onPartitionsRevoked(Collection)} callback before any instance 
executes this onPartitionsAssigned callback.
+     * During exceptional scenarios, partitions may be migrated
      * without the old owner being notified (i.e. their {@link 
#onPartitionsRevoked(Collection)} callback not triggered),
      * and later when the old owner consumer realized this event, the {@link 
#onPartitionsLost(Collection)} callback
      * will be triggered by the consumer then.
@@ -160,9 +180,11 @@ public interface ConsumerRebalanceListener {
      * invocation of {@link KafkaConsumer#poll(java.time.Duration)} in which 
this callback is being executed. This means it is not
      * necessary to catch these exceptions and re-attempt to wakeup or 
interrupt the consumer thread.
      *
-     * @param partitions The list of partitions that are now assigned to the 
consumer (previously owned partitions will
-     *                   NOT be included, i.e. this list will only include 
newly added partitions)
-     * @throws org.apache.kafka.common.errors.WakeupException If raised from a 
nested call to {@link KafkaConsumer}
+     * @param partitions Partitions assigned to this consumer as a result of 
the rebalance.
+     *                   Note that partitions that were already owned by this 
consumer and remain assigned are not
+     *                   included in this list under the Classic/Cooperative 
or Consumer protocols. The full assignment
+     *                   will be received under the Classic/Eager protocol.
+     * @throws org.apache.kafka.common.errors.WakeupException    If raised 
from a nested call to {@link KafkaConsumer}
      * @throws org.apache.kafka.common.errors.InterruptException If raised 
from a nested call to {@link KafkaConsumer}
      */
     void onPartitionsAssigned(Collection<TopicPartition> partitions);
@@ -187,10 +209,9 @@ public interface ConsumerRebalanceListener {
      * necessary to catch these exceptions and re-attempt to wakeup or 
interrupt the consumer thread.
      *
      * @param partitions The list of partitions that were assigned to the 
consumer and now have been reassigned
-     *                   to other consumers. With the current protocol this 
will always include all of the consumer's
-     *                   previously assigned partitions, but this may change 
in future protocols (ie there would still
-     *                   be some partitions left)
-     * @throws org.apache.kafka.common.errors.WakeupException If raised from a 
nested call to {@link KafkaConsumer}
+     *                   to other consumers. With both the Classic and 
Consumer protocols, this will always include
+     *                   all partitions that were previously assigned to the 
consumer.
+     * @throws org.apache.kafka.common.errors.WakeupException    If raised 
from a nested call to {@link KafkaConsumer}
      * @throws org.apache.kafka.common.errors.InterruptException If raised 
from a nested call to {@link KafkaConsumer}
      */
     default void onPartitionsLost(Collection<TopicPartition> partitions) {
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index e74cf0414a8..9f1992d6568 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -661,7 +661,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * If the given list of topics is empty, it is treated the same as {@link 
#unsubscribe()}.
      *
      * <p>
-     * As part of group management, the consumer will keep track of the list 
of consumers that belong to a particular
+     * As part of group management, the group coordinator will keep track of 
the list of consumers that belong to a particular
      * group and will trigger a rebalance operation if any one of the 
following events are triggered:
      * <ul>
      * <li>Number of partitions change for any of the subscribed topics
@@ -670,8 +670,11 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> 
{
      * <li>A new member is added to the consumer group
      * </ul>
      * <p>
-     * When any of these events are triggered, the provided listener will be 
invoked first to indicate that
-     * the consumer's assignment has been revoked, and then again when the new 
assignment has been received.
+     * When any of these events are triggered, the provided listener will be 
invoked in this way:
+     * <ul>
+     *     <li>{@link 
ConsumerRebalanceListener#onPartitionsRevoked(Collection)} will be invoked with 
the partitions to revoke, before re-assigning those partitions to another 
consumer.</li>
+     *     <li>{@link 
ConsumerRebalanceListener#onPartitionsAssigned(Collection)} will be invoked 
when the rebalance completes (even if no new partitions are assigned to the 
consumer).</li>
+     * </ul>
      * Note that rebalances will only occur during an active call to {@link 
#poll(Duration)}, so callbacks will
      * also only be invoked during that time.
      *

Reply via email to