This is an automated email from the ASF dual-hosted git repository.
lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 9f657abf3a2 MINOR: Improve consumer rebalance callbacks docs (#20528)
9f657abf3a2 is described below
commit 9f657abf3a2b9a97f93548c41f1ee290110af2cb
Author: Lianet Magrans <[email protected]>
AuthorDate: Tue Sep 16 11:12:19 2025 -0400
MINOR: Improve consumer rebalance callbacks docs (#20528)
Clarify rebalance callbacks behaviour (got some questions for
onPartitionsAssigned, docs were indeed confusing about the partitions
received in params). Reviewed all rebalance callbacks with it.
Reviewers: Bill Bejeck<[email protected]>
---
.../consumer/PlaintextConsumerCallbackTest.java | 84 ++++++++++++++++++++++
.../consumer/ConsumerRebalanceListener.java | 57 ++++++++++-----
.../kafka/clients/consumer/KafkaConsumer.java | 9 ++-
3 files changed, 129 insertions(+), 21 deletions(-)
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerCallbackTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerCallbackTest.java
index c81a3cd1667..0e09d62033b 100644
---
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerCallbackTest.java
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerCallbackTest.java
@@ -27,7 +27,9 @@ import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import static
org.apache.kafka.clients.ClientsTestUtils.consumeAndVerifyRecords;
@@ -166,6 +168,80 @@ public class PlaintextConsumerCallbackTest {
testGetPositionOfNewlyAssignedPartitionOnPartitionsAssignedCallback(CLASSIC);
}
+ @ClusterTest
+ public void
testOnPartitionsAssignedCalledWithNewPartitionsOnlyForClassicCooperative()
throws InterruptedException {
+ try (var consumer = createClassicConsumerCooperativeProtocol()) {
+ testOnPartitionsAssignedCalledWithExpectedPartitions(consumer,
true);
+ }
+ }
+
+ @ClusterTest
+ public void
testOnPartitionsAssignedCalledWithNewPartitionsOnlyForAsyncConsumer() throws
InterruptedException {
+ try (var consumer = createConsumer(CONSUMER)) {
+ testOnPartitionsAssignedCalledWithExpectedPartitions(consumer,
true);
+ }
+ }
+
+ @ClusterTest
+ public void
testOnPartitionsAssignedCalledWithNewPartitionsOnlyForClassicEager() throws
InterruptedException {
+ try (var consumer = createConsumer(CLASSIC)) {
+ testOnPartitionsAssignedCalledWithExpectedPartitions(consumer,
false);
+ }
+ }
+
+ private void testOnPartitionsAssignedCalledWithExpectedPartitions(
+ Consumer<byte[], byte[]> consumer,
+ boolean expectNewPartitionsOnlyInCallback) throws
InterruptedException {
+ subscribeAndExpectOnPartitionsAssigned(consumer, List.of(topic),
List.of(tp));
+ assertEquals(Set.of(tp), consumer.assignment());
+
+ // Add a new partition assignment while keeping the previous one
+ String newTopic = "newTopic";
+ TopicPartition addedPartition = new TopicPartition(newTopic, 0);
+ List<TopicPartition> expectedPartitionsInCallback;
+ if (expectNewPartitionsOnlyInCallback) {
+ expectedPartitionsInCallback = List.of(addedPartition);
+ } else {
+ expectedPartitionsInCallback = List.of(tp, addedPartition);
+ }
+
+ // Change subscription to keep the previous one and add a new topic.
Assignment should be updated
+ // to contain partitions from both topics, but the
onPartitionsAssigned parameters may contain
+ // the full new assignment or just the newly added partitions
depending on the case.
+ subscribeAndExpectOnPartitionsAssigned(
+ consumer,
+ List.of(topic, newTopic),
+ expectedPartitionsInCallback);
+ assertEquals(Set.of(tp, addedPartition), consumer.assignment());
+ }
+
+ private void subscribeAndExpectOnPartitionsAssigned(Consumer<byte[],
byte[]> consumer, List<String> topics, Collection<TopicPartition>
expectedPartitionsInCallback) throws InterruptedException {
+ var partitionsAssigned = new AtomicBoolean(false);
+ AtomicReference<Collection<TopicPartition>> partitionsFromCallback =
new AtomicReference<>();
+ consumer.subscribe(topics, new ConsumerRebalanceListener() {
+ @Override
+ public void onPartitionsAssigned(Collection<TopicPartition>
partitions) {
+ if (partitions.containsAll(expectedPartitionsInCallback)) {
+ partitionsFromCallback.set(partitions);
+ partitionsAssigned.set(true);
+ }
+ }
+
+ @Override
+ public void onPartitionsRevoked(Collection<TopicPartition>
partitions) {
+ // noop
+ }
+ });
+ ClientsTestUtils.pollUntilTrue(
+ consumer,
+ partitionsAssigned::get,
+ "Timed out before expected rebalance completed"
+ );
+ // These are different types, so comparing values instead
+
assertTrue(expectedPartitionsInCallback.containsAll(partitionsFromCallback.get())
&& partitionsFromCallback.get().containsAll(expectedPartitionsInCallback),
+ "Expected partitions " + expectedPartitionsInCallback + " as
parameter for onPartitionsAssigned, but got " + partitionsFromCallback.get());
+ }
+
@ClusterTest
public void
testAsyncConsumerGetPositionOfNewlyAssignedPartitionOnPartitionsAssignedCallback()
throws InterruptedException {
testGetPositionOfNewlyAssignedPartitionOnPartitionsAssignedCallback(CONSUMER);
@@ -284,4 +360,12 @@ public class PlaintextConsumerCallbackTest {
ENABLE_AUTO_COMMIT_CONFIG, "false"
));
}
+
+ private Consumer<byte[], byte[]>
createClassicConsumerCooperativeProtocol() {
+ return cluster.consumer(Map.of(
+ GROUP_PROTOCOL_CONFIG, CLASSIC.name.toLowerCase(Locale.ROOT),
+ ENABLE_AUTO_COMMIT_CONFIG, "false",
+ ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
"org.apache.kafka.clients.consumer.CooperativeStickyAssignor"
+ ));
+ }
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java
index c49b2c8045a..23e045b7600 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java
@@ -50,7 +50,7 @@ import java.util.Collection;
* Under normal conditions, if a partition is reassigned from one consumer to
another, then the old consumer will
* always invoke {@link #onPartitionsRevoked(Collection) onPartitionsRevoked}
for that partition prior to the new consumer
* invoking {@link #onPartitionsAssigned(Collection) onPartitionsAssigned} for
the same partition. So if offsets or other state is saved in the
- * {@link #onPartitionsRevoked(Collection) onPartitionsRevoked} call by one
consumer member, it will be always accessible by the time the
+ * {@link #onPartitionsRevoked(Collection) onPartitionsRevoked} call by one
consumer member, it will always be accessible by the time the
* other consumer member taking over that partition and triggering its {@link
#onPartitionsAssigned(Collection) onPartitionsAssigned} callback to load the
state.
* <p>
* You can think of revocation as a graceful way to give up ownership of a
partition. In some cases, the consumer may not have an opportunity to do so.
@@ -120,13 +120,31 @@ public interface ConsumerRebalanceListener {
/**
* A callback method the user can implement to provide handling of offset
commits to a customized store.
* This method will be called during a rebalance operation when the
consumer has to give up some partitions.
- * It can also be called when consumer is being closed ({@link
KafkaConsumer#close(CloseOptions option)})
- * or is unsubscribing ({@link KafkaConsumer#unsubscribe()}).
+ * The consumer may need to give up some partitions (thus this callback
executed) under the following scenarios:
+ * <ul>
+ * <li>If the consumer assignment changes</li>
+ * <li>If the consumer is being closed ({@link
KafkaConsumer#close(CloseOptions option)})</li>
+ * <li>If the consumer is unsubscribing ({@link
KafkaConsumer#unsubscribe()})</li>
+ * </ul>
* It is recommended that offsets should be committed in this callback to
either Kafka or a
* custom offset store to prevent duplicate data.
* <p>
- * In eager rebalancing, it will always be called at the start of a
rebalance and after the consumer stops fetching data.
- * In cooperative rebalancing, it will be called at the end of a rebalance
on the set of partitions being revoked iff the set is non-empty.
+ * This callback is always called before re-assigning the partitions.
+ * If the consumer is using the {@link GroupProtocol#CLASSIC} rebalance
protocol:
+ * <ul>
+ * <li>
+ * In eager rebalancing, onPartitionsRevoked will be called with
the full set of assigned partitions as a parameter (all partitions are revoked).
+ * It will be called even if there are no partitions to revoke.
+ * </li>
+ * <li>
+ * In cooperative rebalancing, onPartitionsRevoked will be called
with the set of partitions to revoke,
+ * iff the set is non-empty.
+ * </li>
+ * </ul>
+ * If the consumer is using the {@link GroupProtocol#CONSUMER} rebalance
protocol, this callback will be called
+ * with the set of partitions to revoke iff the set is non-empty
+ * (same behavior as the {@link GroupProtocol#CLASSIC} rebalance protocol
with Cooperative mode).
+ * <p>
* For examples on usage of this API, see Usage Examples section of {@link
KafkaConsumer KafkaConsumer}.
* <p>
* It is common for the revocation callback to use the consumer instance
in order to commit offsets. It is possible
@@ -135,8 +153,9 @@ public interface ConsumerRebalanceListener {
* invocation of {@link KafkaConsumer#poll(java.time.Duration)} in which
this callback is being executed. This means it is not
* necessary to catch these exceptions and re-attempt to wakeup or
interrupt the consumer thread.
*
- * @param partitions The list of partitions that were assigned to the
consumer and now need to be revoked (may not
- * include all currently assigned partitions, i.e. there
may still be some partitions left)
+ * @param partitions The list of partitions that were assigned to the
consumer and now need to be revoked. This will
+ * include the full assignment under the Classic/Eager
protocol, given that it revokes all partitions.
+ * It will only include the subset to revoke under the
Classic/Cooperative and Consumer protocols.
* @throws org.apache.kafka.common.errors.WakeupException If raised from a
nested call to {@link KafkaConsumer}
* @throws org.apache.kafka.common.errors.InterruptException If raised
from a nested call to {@link KafkaConsumer}
*/
@@ -144,12 +163,13 @@ public interface ConsumerRebalanceListener {
/**
* A callback method the user can implement to provide handling of
customized offsets on completion of a successful
- * partition re-assignment. This method will be called after the partition
re-assignment completes and before the
- * consumer starts fetching data, and only as the result of a {@link
Consumer#poll(java.time.Duration) poll(long)} call.
+ * partition re-assignment. This method will be called after the partition
re-assignment completes (even if no new
+ * partitions were assigned to the consumer), and before the consumer
starts fetching data,
+ * and only as the result of a {@link Consumer#poll(java.time.Duration)
poll(long)} call.
* <p>
* It is guaranteed that under normal conditions all the processes in a
consumer group will execute their
- * {@link #onPartitionsRevoked(Collection)} callback before any instance
executes its
- * {@link #onPartitionsAssigned(Collection)} callback. During exceptional
scenarios, partitions may be migrated
+ * {@link #onPartitionsRevoked(Collection)} callback before any instance
executes this onPartitionsAssigned callback.
+ * During exceptional scenarios, partitions may be migrated
* without the old owner being notified (i.e. their {@link
#onPartitionsRevoked(Collection)} callback not triggered),
* and later when the old owner consumer realized this event, the {@link
#onPartitionsLost(Collection)} callback
* will be triggered by the consumer then.
@@ -160,9 +180,11 @@ public interface ConsumerRebalanceListener {
* invocation of {@link KafkaConsumer#poll(java.time.Duration)} in which
this callback is being executed. This means it is not
* necessary to catch these exceptions and re-attempt to wakeup or
interrupt the consumer thread.
*
- * @param partitions The list of partitions that are now assigned to the
consumer (previously owned partitions will
- * NOT be included, i.e. this list will only include
newly added partitions)
- * @throws org.apache.kafka.common.errors.WakeupException If raised from a
nested call to {@link KafkaConsumer}
+ * @param partitions Partitions that have been added to the assignment as
a result of the rebalance.
+ * Note that partitions that were already owned by this
consumer and remain assigned are not
+ * included in this list under the Classic/Cooperative
or Consumer protocols. The full assignment
+ * will be received under the Classic/Eager protocol.
+ * @throws org.apache.kafka.common.errors.WakeupException If raised
from a nested call to {@link KafkaConsumer}
* @throws org.apache.kafka.common.errors.InterruptException If raised
from a nested call to {@link KafkaConsumer}
*/
void onPartitionsAssigned(Collection<TopicPartition> partitions);
@@ -187,10 +209,9 @@ public interface ConsumerRebalanceListener {
* necessary to catch these exceptions and re-attempt to wakeup or
interrupt the consumer thread.
*
* @param partitions The list of partitions that were assigned to the
consumer and now have been reassigned
- * to other consumers. With the current protocol this
will always include all of the consumer's
- * previously assigned partitions, but this may change
in future protocols (ie there would still
- * be some partitions left)
- * @throws org.apache.kafka.common.errors.WakeupException If raised from a
nested call to {@link KafkaConsumer}
+ * to other consumers. With both the Classic and
Consumer protocols, this will always include
+ * all partitions that were previously assigned to the
consumer.
+ * @throws org.apache.kafka.common.errors.WakeupException If raised
from a nested call to {@link KafkaConsumer}
* @throws org.apache.kafka.common.errors.InterruptException If raised
from a nested call to {@link KafkaConsumer}
*/
default void onPartitionsLost(Collection<TopicPartition> partitions) {
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index e74cf0414a8..9f1992d6568 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -661,7 +661,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* If the given list of topics is empty, it is treated the same as {@link
#unsubscribe()}.
*
* <p>
- * As part of group management, the consumer will keep track of the list
of consumers that belong to a particular
+ * As part of group management, the group coordinator will keep track of
the list of consumers that belong to a particular
* group and will trigger a rebalance operation if any one of the
following events are triggered:
* <ul>
* <li>Number of partitions change for any of the subscribed topics
@@ -670,8 +670,11 @@ public class KafkaConsumer<K, V> implements Consumer<K, V>
{
* <li>A new member is added to the consumer group
* </ul>
* <p>
- * When any of these events are triggered, the provided listener will be
invoked first to indicate that
- * the consumer's assignment has been revoked, and then again when the new
assignment has been received.
+ * When any of these events are triggered, the provided listener will be
invoked in this way:
+ * <ul>
+ * <li>{@link
ConsumerRebalanceListener#onPartitionsRevoked(Collection)} will be invoked with
the partitions to revoke, before re-assigning those partitions to another
consumer.</li>
+ * <li>{@link
ConsumerRebalanceListener#onPartitionsAssigned(Collection)} will be invoked
when the rebalance completes (even if no new partitions are assigned to the
consumer)</li>
+ * </ul>
* Note that rebalances will only occur during an active call to {@link
#poll(Duration)}, so callbacks will
* also only be invoked during that time.
*