This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.4 by this push:
new f5f1060a81d KAFKA-14352: Rack-aware consumer partition assignment protocol changes (KIP-881) (#12954)
f5f1060a81d is described below
commit f5f1060a81dd200e573ef69574ea75b64b73474c
Author: Rajini Sivaram <[email protected]>
AuthorDate: Wed Dec 7 11:41:21 2022 +0000
KAFKA-14352: Rack-aware consumer partition assignment protocol changes (KIP-881) (#12954)
Reviewers: David Jacot <[email protected]>
---
checkstyle/suppressions.xml | 2 +-
.../consumer/ConsumerPartitionAssignor.java | 15 +++--
.../kafka/clients/consumer/KafkaConsumer.java | 3 +-
.../consumer/internals/ConsumerCoordinator.java | 8 ++-
.../consumer/internals/ConsumerProtocol.java | 5 +-
.../common/message/ConsumerProtocolAssignment.json | 3 +-
.../message/ConsumerProtocolSubscription.json | 6 +-
.../consumer/CooperativeStickyAssignorTest.java | 8 +--
.../kafka/clients/consumer/KafkaConsumerTest.java | 3 +-
.../kafka/clients/consumer/StickyAssignorTest.java | 8 +--
.../internals/ConsumerCoordinatorTest.java | 67 +++++++++++++++++++---
.../consumer/internals/ConsumerProtocolTest.java | 23 ++++++--
.../kafka/api/PlaintextConsumerTest.scala | 17 ++++++
13 files changed, 136 insertions(+), 32 deletions(-)
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index ac0accc17fa..cea6a193790 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -66,7 +66,7 @@
<suppress checks="ParameterNumber"
files="(NetworkClient|FieldSpec|KafkaRaftClient).java"/>
<suppress checks="ParameterNumber"
- files="KafkaConsumer.java"/>
+ files="(KafkaConsumer|ConsumerCoordinator).java"/>
<suppress checks="ParameterNumber"
files="Fetcher.java"/>
<suppress checks="ParameterNumber"
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java
index f544ce77e6b..0488c2b8c8a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java
@@ -103,27 +103,29 @@ public interface ConsumerPartitionAssignor {
private final List<String> topics;
private final ByteBuffer userData;
private final List<TopicPartition> ownedPartitions;
+ private final Optional<String> rackId;
private Optional<String> groupInstanceId;
private final Optional<Integer> generationId;
- public Subscription(List<String> topics, ByteBuffer userData, List<TopicPartition> ownedPartitions, int generationId) {
+ public Subscription(List<String> topics, ByteBuffer userData, List<TopicPartition> ownedPartitions, int generationId, Optional<String> rackId) {
this.topics = topics;
this.userData = userData;
this.ownedPartitions = ownedPartitions;
this.groupInstanceId = Optional.empty();
this.generationId = generationId < 0 ? Optional.empty() :
Optional.of(generationId);
+ this.rackId = rackId;
}
public Subscription(List<String> topics, ByteBuffer userData, List<TopicPartition> ownedPartitions) {
- this(topics, userData, ownedPartitions, DEFAULT_GENERATION);
+ this(topics, userData, ownedPartitions, DEFAULT_GENERATION, Optional.empty());
}
public Subscription(List<String> topics, ByteBuffer userData) {
- this(topics, userData, Collections.emptyList(), DEFAULT_GENERATION);
+ this(topics, userData, Collections.emptyList(), DEFAULT_GENERATION, Optional.empty());
}
public Subscription(List<String> topics) {
- this(topics, null, Collections.emptyList(), DEFAULT_GENERATION);
+ this(topics, null, Collections.emptyList(), DEFAULT_GENERATION, Optional.empty());
}
public List<String> topics() {
@@ -138,6 +140,10 @@ public interface ConsumerPartitionAssignor {
return ownedPartitions;
}
+ public Optional<String> rackId() {
+ return rackId;
+ }
+
public void setGroupInstanceId(Optional<String> groupInstanceId) {
this.groupInstanceId = groupInstanceId;
}
@@ -158,6 +164,7 @@ public interface ConsumerPartitionAssignor {
", ownedPartitions=" + ownedPartitions +
", groupInstanceId=" +
groupInstanceId.map(String::toString).orElse("null") +
", generationId=" + generationId.orElse(-1) +
+ ", rackId=" + (rackId.orElse("null")) +
")";
}
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index f07846945a7..cf85798f82b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -790,7 +790,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
enableAutoCommit,
config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
this.interceptors,
- config.getBoolean(ConsumerConfig.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED));
+ config.getBoolean(ConsumerConfig.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED),
+ config.getString(ConsumerConfig.CLIENT_RACK_CONFIG));
}
this.fetcher = new Fetcher<>(
logContext,
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 051962d4435..fec31fe80f8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -117,6 +117,7 @@ public final class ConsumerCoordinator extends
AbstractCoordinator {
private AtomicBoolean asyncCommitFenced;
private ConsumerGroupMetadata groupMetadata;
private final boolean throwOnFetchStableOffsetsUnsupported;
+ private final Optional<String> rackId;
// hold onto request&future for committed offset requests to enable async
calls.
private PendingCommittedOffsetRequest pendingCommittedOffsetRequest = null;
@@ -162,7 +163,8 @@ public final class ConsumerCoordinator extends
AbstractCoordinator {
boolean autoCommitEnabled,
int autoCommitIntervalMs,
ConsumerInterceptors<?, ?> interceptors,
- boolean throwOnFetchStableOffsetsUnsupported) {
+ boolean throwOnFetchStableOffsetsUnsupported,
+ String rackId) {
super(rebalanceConfig,
logContext,
client,
@@ -186,6 +188,7 @@ public final class ConsumerCoordinator extends
AbstractCoordinator {
this.groupMetadata = new ConsumerGroupMetadata(rebalanceConfig.groupId,
JoinGroupRequest.UNKNOWN_GENERATION_ID,
JoinGroupRequest.UNKNOWN_MEMBER_ID, rebalanceConfig.groupInstanceId);
this.throwOnFetchStableOffsetsUnsupported =
throwOnFetchStableOffsetsUnsupported;
+ this.rackId = rackId == null || rackId.isEmpty() ? Optional.empty() : Optional.of(rackId);
if (autoCommitEnabled)
this.nextAutoCommitTimer = time.timer(autoCommitIntervalMs);
@@ -245,7 +248,8 @@ public final class ConsumerCoordinator extends
AbstractCoordinator {
Subscription subscription = new Subscription(topics,
assignor.subscriptionUserData(joinedSubscription),
subscriptions.assignedPartitionsList(),
- generation().generationId);
+ generation().generationId,
+ rackId);
ByteBuffer metadata =
ConsumerProtocol.serializeSubscription(subscription);
protocolSet.add(new JoinGroupRequestData.JoinGroupRequestProtocol()
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
index b237a3b0c21..79feb4eb57b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
@@ -31,6 +31,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
+import java.util.Optional;
/**
* ConsumerProtocol contains the schemas for consumer subscriptions and
assignments for use with
@@ -89,6 +90,7 @@ public class ConsumerProtocol {
}
partition.partitions().add(tp.partition());
}
+ subscription.rackId().ifPresent(data::setRackId);
data.setGenerationId(subscription.generationId().orElse(-1));
return MessageUtil.toVersionPrefixedByteBuffer(version, data);
@@ -112,7 +114,8 @@ public class ConsumerProtocol {
data.topics(),
data.userData() != null ? data.userData().duplicate() : null,
ownedPartitions,
- data.generationId());
+ data.generationId(),
+ data.rackId() == null || data.rackId().isEmpty() ? Optional.empty() : Optional.of(data.rackId()));
} catch (BufferUnderflowException e) {
throw new SchemaException("Buffer underflow while parsing consumer
protocol's subscription", e);
}
diff --git a/clients/src/main/resources/common/message/ConsumerProtocolAssignment.json b/clients/src/main/resources/common/message/ConsumerProtocolAssignment.json
index 8b0c138d698..fe07aaeadff 100644
--- a/clients/src/main/resources/common/message/ConsumerProtocolAssignment.json
+++ b/clients/src/main/resources/common/message/ConsumerProtocolAssignment.json
@@ -23,7 +23,8 @@
// that new versions cannot remove or reorder any of the existing fields.
//
// Version 2 is to support a new field "GenerationId" in
ConsumerProtocolSubscription.
- "validVersions": "0-2",
+ // Version 3 adds rack id to ConsumerProtocolSubscription.
+ "validVersions": "0-3",
"flexibleVersions": "none",
"fields": [
{ "name": "AssignedPartitions", "type": "[]TopicPartition", "versions":
"0+",
diff --git a/clients/src/main/resources/common/message/ConsumerProtocolSubscription.json b/clients/src/main/resources/common/message/ConsumerProtocolSubscription.json
index 6997d56a04b..49801c65f77 100644
--- a/clients/src/main/resources/common/message/ConsumerProtocolSubscription.json
+++ b/clients/src/main/resources/common/message/ConsumerProtocolSubscription.json
@@ -24,7 +24,8 @@
// Version 1 added the "OwnedPartitions" field to allow assigner know what
partitions each member owned
// Version 2 added a new field "GenerationId" to indicate if the member has
out-of-date ownedPartitions.
- "validVersions": "0-2",
+ // Version 3 adds rack id to enable rack-aware assignment.
+ "validVersions": "0-3",
"flexibleVersions": "none",
"fields": [
{ "name": "Topics", "type": "[]string", "versions": "0+" },
@@ -36,6 +37,7 @@
{ "name": "Partitions", "type": "[]int32", "versions": "1+"}
]
},
- { "name": "GenerationId", "type": "int32", "versions": "2+", "default": "-1", "ignorable": true }
+ { "name": "GenerationId", "type": "int32", "versions": "2+", "default": "-1", "ignorable": true },
+ { "name": "RackId", "type": "string", "versions": "3+", "nullableVersions": "3+", "default": "null", "ignorable": true }
]
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignorTest.java
index 9284de50ab7..28a988b8d82 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignorTest.java
@@ -54,12 +54,12 @@ public class CooperativeStickyAssignorTest extends
AbstractStickyAssignorTest {
@Override
public Subscription buildSubscriptionV1(List<String> topics,
List<TopicPartition> partitions, int generationId) {
assignor.onAssignment(new
ConsumerPartitionAssignor.Assignment(partitions), new
ConsumerGroupMetadata(groupId, generationId, consumer1, Optional.empty()));
- return new Subscription(topics, assignor.subscriptionUserData(new
HashSet<>(topics)), partitions, DEFAULT_GENERATION);
+ return new Subscription(topics, assignor.subscriptionUserData(new
HashSet<>(topics)), partitions, DEFAULT_GENERATION, Optional.empty());
}
@Override
public Subscription buildSubscriptionV2Above(List<String> topics,
List<TopicPartition> partitions, int generationId) {
- return new Subscription(topics, assignor.subscriptionUserData(new
HashSet<>(topics)), partitions, generationId);
+ return new Subscription(topics, assignor.subscriptionUserData(new
HashSet<>(topics)), partitions, generationId, Optional.empty());
}
@Override
@@ -156,7 +156,7 @@ public class CooperativeStickyAssignorTest extends
AbstractStickyAssignorTest {
// subscription containing empty owned partitions and the same
generation id, and non-empty owned partition in user data,
// member data should honor the one in subscription since
cooperativeStickyAssignor only supports ConsumerProtocolSubscription v1 and
above
- Subscription subscription = new Subscription(topics,
generateUserData(topics, ownedPartitions, generationId),
Collections.emptyList(), generationId);
+ Subscription subscription = new Subscription(topics,
generateUserData(topics, ownedPartitions, generationId),
Collections.emptyList(), generationId, Optional.empty());
AbstractStickyAssignor.MemberData memberData =
memberData(subscription);
assertEquals(Collections.emptyList(), memberData.partitions,
"subscription: " + subscription + " doesn't have expected owned partition");
@@ -170,7 +170,7 @@ public class CooperativeStickyAssignorTest extends
AbstractStickyAssignorTest {
// subscription containing empty owned partitions and a higher
generation id, and non-empty owned partition in user data,
// member data should honor the one in subscription since generation
id is higher
- Subscription subscription = new Subscription(topics,
generateUserData(topics, ownedPartitions, generationId - 1),
Collections.emptyList(), generationId);
+ Subscription subscription = new Subscription(topics,
generateUserData(topics, ownedPartitions, generationId - 1),
Collections.emptyList(), generationId, Optional.empty());
AbstractStickyAssignor.MemberData memberData =
memberData(subscription);
assertEquals(Collections.emptyList(), memberData.partitions,
"subscription: " + subscription + " doesn't have expected owned partition");
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 2c667dcb5d6..f08ac45ddaf 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -2614,7 +2614,8 @@ public class KafkaConsumerTest {
autoCommitEnabled,
autoCommitIntervalMs,
interceptors,
- throwOnStableOffsetNotSupported);
+ throwOnStableOffsetNotSupported,
+ null);
}
Fetcher<String, String> fetcher = new Fetcher<>(
loggerFactory,
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java
index b27aa0e4a74..9dbc085f76c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java
@@ -53,19 +53,19 @@ public class StickyAssignorTest extends
AbstractStickyAssignorTest {
@Override
public Subscription buildSubscriptionV0(List<String> topics,
List<TopicPartition> partitions, int generationId) {
return new Subscription(topics, serializeTopicPartitionAssignment(new
MemberData(partitions, Optional.of(generationId))),
- Collections.emptyList(), DEFAULT_GENERATION);
+ Collections.emptyList(), DEFAULT_GENERATION, Optional.empty());
}
@Override
public Subscription buildSubscriptionV1(List<String> topics,
List<TopicPartition> partitions, int generationId) {
return new Subscription(topics, serializeTopicPartitionAssignment(new
MemberData(partitions, Optional.of(generationId))),
- partitions, DEFAULT_GENERATION);
+ partitions, DEFAULT_GENERATION, Optional.empty());
}
@Override
public Subscription buildSubscriptionV2Above(List<String> topics,
List<TopicPartition> partitions, int generationId) {
return new Subscription(topics, serializeTopicPartitionAssignment(new
MemberData(partitions, Optional.of(generationId))),
- partitions, generationId);
+ partitions, generationId, Optional.empty());
}
@Override
@@ -308,7 +308,7 @@ public class StickyAssignorTest extends
AbstractStickyAssignorTest {
List<TopicPartition> ownedPartitions = partitions(tp(topic1, 0),
tp(topic2, 1));
int generationIdInUserData = generationId - 1;
- Subscription subscription = new Subscription(topics,
generateUserData(topics, ownedPartitions, generationIdInUserData),
Collections.emptyList(), generationId);
+ Subscription subscription = new Subscription(topics,
generateUserData(topics, ownedPartitions, generationIdInUserData),
Collections.emptyList(), generationId, Optional.empty());
AbstractStickyAssignor.MemberData memberData =
memberData(subscription);
// in StickyAssignor with eager rebalance protocol, we'll always honor
data in user data
assertEquals(ownedPartitions, memberData.partitions, "subscription: "
+ subscription + " doesn't have expected owned partition");
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 9dd44740a55..36bf0ea6825 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -117,6 +117,7 @@ import static java.util.Collections.singletonMap;
import static
org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.RebalanceProtocol.COOPERATIVE;
import static
org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.RebalanceProtocol.EAGER;
import static
org.apache.kafka.clients.consumer.CooperativeStickyAssignor.COOPERATIVE_STICKY_ASSIGNOR_NAME;
+import static
org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.DEFAULT_GENERATION;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkSet;
@@ -1847,7 +1848,7 @@ public abstract class ConsumerCoordinatorTest {
// note that `MockPartitionAssignor.prepare` is not called therefore
calling `MockPartitionAssignor.assign`
// will throw a IllegalStateException. this indirectly verifies that
`assign` is correctly skipped.
Map<String, List<String>> memberSubscriptions =
singletonMap(consumerId, singletonList(topic1));
- client.prepareResponse(joinGroupLeaderResponse(1, consumerId,
memberSubscriptions, true, Errors.NONE));
+ client.prepareResponse(joinGroupLeaderResponse(1, consumerId,
memberSubscriptions, true, Errors.NONE, Optional.empty()));
client.prepareResponse(syncGroupResponse(singletonList(t1p),
Errors.NONE));
coordinator.poll(time.timer(Long.MAX_VALUE));
@@ -1879,7 +1880,7 @@ public abstract class ConsumerCoordinatorTest {
mkEntry(consumerId, singletonList(topic1)),
mkEntry(consumerId2, singletonList(topic2))
);
- client.prepareResponse(joinGroupLeaderResponse(1, consumerId,
memberSubscriptions, true, Errors.NONE));
+ client.prepareResponse(joinGroupLeaderResponse(1, consumerId,
memberSubscriptions, true, Errors.NONE, Optional.empty()));
client.prepareResponse(syncGroupResponse(singletonList(t1p),
Errors.NONE));
coordinator.poll(time.timer(Long.MAX_VALUE));
@@ -3490,6 +3491,36 @@ public abstract class ConsumerCoordinatorTest {
assertEquals(lost.isEmpty() ? null : lost, rebalanceListener.lost);
}
+ @Test
+ public void testSubscriptionRackId() {
+ metrics.close();
+ coordinator.close(time.timer(0));
+
+ String rackId = "rack-a";
+ metrics = new Metrics(time);
+ RackAwareAssignor assignor = new RackAwareAssignor();
+
+ coordinator = new ConsumerCoordinator(rebalanceConfig, new
LogContext(), consumerClient,
+ Collections.singletonList(assignor), metadata, subscriptions,
+ metrics, consumerId + groupId, time, false,
autoCommitIntervalMs, null, false, rackId);
+
+ subscriptions.subscribe(singleton(topic1), rebalanceListener);
+ client.updateMetadata(metadataResponse);
+
+ client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+ coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+ Map<String, List<String>> memberSubscriptions =
singletonMap(consumerId, singletonList(topic1));
+ assignor.prepare(singletonMap(consumerId, singletonList(t1p)));
+
+ client.prepareResponse(joinGroupLeaderResponse(1, consumerId,
memberSubscriptions, false, Errors.NONE, Optional.of(rackId)));
+ client.prepareResponse(syncGroupResponse(singletonList(t1p),
Errors.NONE));
+
+ coordinator.poll(time.timer(Long.MAX_VALUE));
+ assertEquals(singleton(t1p),
coordinator.subscriptionState().assignedPartitions());
+ assertEquals(singleton(rackId), assignor.rackIds);
+ }
+
@Test
public void testThrowOnUnsupportedStableFlag() {
supportStableFlag((short) 6, true);
@@ -3514,7 +3545,8 @@ public abstract class ConsumerCoordinatorTest {
false,
autoCommitIntervalMs,
null,
- true);
+ true,
+ null);
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
client.setNodeApiVersions(NodeApiVersions.create(ApiKeys.OFFSET_FETCH.id,
(short) 0, upperVersion));
@@ -3675,7 +3707,8 @@ public abstract class ConsumerCoordinatorTest {
autoCommitEnabled,
autoCommitIntervalMs,
null,
- false);
+ false,
+ null);
}
private Collection<TopicPartition> getRevoked(final List<TopicPartition>
owned,
@@ -3731,7 +3764,7 @@ public abstract class ConsumerCoordinatorTest {
Map<String, List<String>> subscriptions,
Errors error
) {
- return joinGroupLeaderResponse(generationId, memberId, subscriptions,
false, error);
+ return joinGroupLeaderResponse(generationId, memberId, subscriptions,
false, error, Optional.empty());
}
private JoinGroupResponse joinGroupLeaderResponse(
@@ -3739,11 +3772,13 @@ public abstract class ConsumerCoordinatorTest {
String memberId,
Map<String, List<String>> subscriptions,
boolean skipAssignment,
- Errors error
+ Errors error,
+ Optional<String> rackId
) {
List<JoinGroupResponseData.JoinGroupResponseMember> metadata = new
ArrayList<>();
for (Map.Entry<String, List<String>> subscriptionEntry :
subscriptions.entrySet()) {
- ConsumerPartitionAssignor.Subscription subscription = new
ConsumerPartitionAssignor.Subscription(subscriptionEntry.getValue());
+ ConsumerPartitionAssignor.Subscription subscription = new
ConsumerPartitionAssignor.Subscription(subscriptionEntry.getValue(),
+ null, Collections.emptyList(), DEFAULT_GENERATION, rackId);
ByteBuffer buf =
ConsumerProtocol.serializeSubscription(subscription);
metadata.add(new JoinGroupResponseData.JoinGroupResponseMember()
.setMemberId(subscriptionEntry.getKey())
@@ -3901,4 +3936,22 @@ public abstract class ConsumerCoordinatorTest {
this.exception = exception;
}
}
+
+ private static class RackAwareAssignor extends MockPartitionAssignor {
+ private final Set<String> rackIds = new HashSet<>();
+
+ RackAwareAssignor() {
+ super(Arrays.asList(RebalanceProtocol.EAGER));
+ }
+
+ @Override
+ public Map<String, List<TopicPartition>> assign(Map<String, Integer>
partitionsPerTopic, Map<String, Subscription> subscriptions) {
+ subscriptions.forEach((consumer, subscription) -> {
+ if (!subscription.rackId().isPresent())
+ throw new IllegalStateException("Rack id not provided in
subscription for " + consumer);
+ rackIds.add(subscription.rackId().get());
+ });
+ return super.assign(partitionsPerTopic, subscriptions);
+ }
+ }
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java
index 5106432ee53..0e97bfbb318 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java
@@ -49,6 +49,7 @@ public class ConsumerProtocolTest {
private final TopicPartition tp2 = new TopicPartition("bar", 2);
private final Optional<String> groupInstanceId =
Optional.of("instance.id");
private final int generationId = 1;
+ private final Optional<String> rackId = Optional.of("rack-a");
@Test
public void serializeDeserializeSubscriptionAllVersions() {
@@ -56,7 +57,7 @@ public class ConsumerProtocolTest {
new TopicPartition("foo", 0),
new TopicPartition("bar", 0));
Subscription subscription = new Subscription(Arrays.asList("foo",
"bar"),
- ByteBuffer.wrap("hello".getBytes()), ownedPartitions,
generationId);
+ ByteBuffer.wrap("hello".getBytes()), ownedPartitions,
generationId, rackId);
for (short version =
ConsumerProtocolSubscription.LOWEST_SUPPORTED_VERSION; version <=
ConsumerProtocolSubscription.HIGHEST_SUPPORTED_VERSION; version++) {
ByteBuffer buffer =
ConsumerProtocol.serializeSubscription(subscription, version);
@@ -77,6 +78,12 @@ public class ConsumerProtocolTest {
} else {
assertFalse(parsedSubscription.generationId().isPresent());
}
+
+ if (version >= 3) {
+ assertEquals(rackId, parsedSubscription.rackId());
+ } else {
+ assertEquals(Optional.empty(), parsedSubscription.rackId());
+ }
}
}
@@ -89,6 +96,7 @@ public class ConsumerProtocolTest {
assertEquals(0, parsedSubscription.userData().limit());
assertFalse(parsedSubscription.groupInstanceId().isPresent());
assertFalse(parsedSubscription.generationId().isPresent());
+ assertFalse(parsedSubscription.rackId().isPresent());
}
@Test
@@ -102,6 +110,7 @@ public class ConsumerProtocolTest {
assertEquals(0, parsedSubscription.userData().limit());
assertEquals(groupInstanceId, parsedSubscription.groupInstanceId());
assertFalse(parsedSubscription.generationId().isPresent());
+ assertFalse(parsedSubscription.rackId().isPresent());
}
@Test
@@ -111,6 +120,7 @@ public class ConsumerProtocolTest {
Subscription parsedSubscription =
ConsumerProtocol.deserializeSubscription(buffer);
assertEquals(toSet(subscription.topics()),
toSet(parsedSubscription.topics()));
assertNull(parsedSubscription.userData());
+ assertFalse(parsedSubscription.rackId().isPresent());
}
@Test
@@ -146,14 +156,15 @@ public class ConsumerProtocolTest {
assertNull(parsedSubscription.userData());
assertTrue(parsedSubscription.ownedPartitions().isEmpty());
assertFalse(parsedSubscription.generationId().isPresent());
+ assertFalse(parsedSubscription.rackId().isPresent());
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
- public void deserializeNewSubscriptionWithOldVersion(boolean
hasGenerationId) {
+ public void deserializeNewSubscriptionWithOldVersion(boolean
hasGenerationIdAndRack) {
Subscription subscription;
- if (hasGenerationId) {
- subscription = new Subscription(Arrays.asList("foo", "bar"), null,
Collections.singletonList(tp2), generationId);
+ if (hasGenerationIdAndRack) {
+ subscription = new Subscription(Arrays.asList("foo", "bar"), null,
Collections.singletonList(tp2), generationId, rackId);
} else {
subscription = new Subscription(Arrays.asList("foo", "bar"), null,
Collections.singletonList(tp2));
}
@@ -166,6 +177,7 @@ public class ConsumerProtocolTest {
assertTrue(parsedSubscription.ownedPartitions().isEmpty());
assertFalse(parsedSubscription.groupInstanceId().isPresent());
assertFalse(parsedSubscription.generationId().isPresent());
+ assertFalse(parsedSubscription.rackId().isPresent());
}
@Test
@@ -178,6 +190,7 @@ public class ConsumerProtocolTest {
assertEquals(Collections.singleton(tp2),
toSet(subscription.ownedPartitions()));
assertEquals(groupInstanceId, subscription.groupInstanceId());
assertEquals(generationId,
subscription.generationId().orElse(DEFAULT_GENERATION));
+ assertEquals(rackId, subscription.rackId());
}
@Test
@@ -253,6 +266,7 @@ public class ConsumerProtocolTest {
new Field("owned_partitions", new ArrayOf(
ConsumerProtocolSubscription.TopicPartition.SCHEMA_1)),
new Field("generation_id", Type.INT32),
+ new Field("rack_id", Type.STRING),
new Field("bar", Type.STRING));
Struct subscriptionV100 = new Struct(subscriptionSchemaV100);
@@ -263,6 +277,7 @@ public class ConsumerProtocolTest {
.set("topic", tp2.topic())
.set("partitions", new Object[]{tp2.partition()})});
subscriptionV100.set("generation_id", generationId);
+ subscriptionV100.set("rack_id", rackId.orElse(null));
subscriptionV100.set("bar", "bar");
Struct headerV100 = new Struct(new Schema(new Field("version",
Type.INT16)));
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index 92670dba3b3..bfc280a1678 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -1902,5 +1902,22 @@ class PlaintextConsumerTest extends BaseConsumerTest {
consumer2.close()
}
+
+ @Test
+ def testConsumerRackIdPropagatedToPartitionAssignor(): Unit = {
+ consumerConfig.setProperty(ConsumerConfig.CLIENT_RACK_CONFIG, "rack-a")
+ consumerConfig.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, classOf[RackAwareAssignor].getName)
+ val consumer = createConsumer()
+ consumer.subscribe(Set(topic).asJava)
+ awaitAssignment(consumer, Set(tp, tp2))
+ }
+}
+
+class RackAwareAssignor extends RoundRobinAssignor {
+ override def assign(partitionsPerTopic: util.Map[String, Integer],
subscriptions: util.Map[String, ConsumerPartitionAssignor.Subscription]):
util.Map[String, util.List[TopicPartition]] = {
+ assertEquals(1, subscriptions.size())
+ assertEquals(Optional.of("rack-a"),
subscriptions.values.asScala.head.rackId)
+ super.assign(partitionsPerTopic, subscriptions)
+ }
}