This is an automated email from the ASF dual-hosted git repository. schofielaj pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 844b0e651b3 KAFKA-19369: Add group.share.assignors config and integration test (#19900) 844b0e651b3 is described below commit 844b0e651b3fef2c29cf6264d1487a23fdbc3f19 Author: PoAn Yang <pay...@apache.org> AuthorDate: Fri Jun 6 21:20:56 2025 +0800 KAFKA-19369: Add group.share.assignors config and integration test (#19900) * Add `group.share.assignors` config to `GroupCoordinatorConfig`. * Send `rackId` in share group heartbeat request if it's not null. * Add integration test `testShareConsumerWithRackAwareAssignor`. Reviewers: Lan Ding <53332773+dl1...@users.noreply.github.com>, Andrew Schofield <aschofi...@confluent.io> --------- Signed-off-by: PoAn Yang <pay...@apache.org> --- .../kafka/clients/consumer/RackAwareAssignor.java | 3 +- .../consumer/ShareConsumerRackAwareTest.java | 188 +++++++++++++++++++++ .../consumer/internals/RequestManagers.java | 2 +- .../coordinator/group/GroupCoordinatorConfig.java | 57 +++++++ .../coordinator/group/GroupCoordinatorShard.java | 1 + .../group/GroupCoordinatorConfigTest.java | 47 +++++- 6 files changed, 295 insertions(+), 3 deletions(-) diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/RackAwareAssignor.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/RackAwareAssignor.java index e71e1f8a1a3..4b9ee6fd274 100644 --- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/RackAwareAssignor.java +++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/RackAwareAssignor.java @@ -22,6 +22,7 @@ import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment; import org.apache.kafka.coordinator.group.api.assignor.GroupSpec; import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment; import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignorException; +import org.apache.kafka.coordinator.group.api.assignor.ShareGroupPartitionAssignor; import org.apache.kafka.coordinator.group.api.assignor.SubscribedTopicDescriber; import org.apache.kafka.coordinator.group.modern.MemberAssignmentImpl; @@ -36,7 +37,7 @@ import java.util.Set; * information of the members when assigning partitions to them. * It needs all brokers and members to have rack information available. */ -public class RackAwareAssignor implements ConsumerGroupPartitionAssignor { +public class RackAwareAssignor implements ConsumerGroupPartitionAssignor, ShareGroupPartitionAssignor { @Override public String name() { return "rack-aware-assignor"; diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerRackAwareTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerRackAwareTest.java new file mode 100644 index 00000000000..7628220bff2 --- /dev/null +++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerRackAwareTest.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.NewPartitionReassignment; +import org.apache.kafka.clients.admin.NewPartitions; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.admin.ShareGroupDescription; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.test.ClusterInstance; +import org.apache.kafka.common.test.TestUtils; +import org.apache.kafka.common.test.api.ClusterConfigProperty; +import org.apache.kafka.common.test.api.ClusterTest; +import org.apache.kafka.common.test.api.Type; +import org.apache.kafka.coordinator.group.GroupCoordinatorConfig; + +import java.time.Duration; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ExecutionException; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; + +public class ShareConsumerRackAwareTest { + @ClusterTest( + types = {Type.KRAFT}, + brokers = 3, + serverProperties = { + @ClusterConfigProperty(id = 0, key = "broker.rack", value = "rack0"), + @ClusterConfigProperty(id = 1, key = "broker.rack", value = "rack1"), + @ClusterConfigProperty(id = 2, key = "broker.rack", value = "rack2"), + @ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic, share"), + @ClusterConfigProperty(key = GroupCoordinatorConfig.SHARE_GROUP_ASSIGNORS_CONFIG, value = "org.apache.kafka.clients.consumer.RackAwareAssignor") + } + ) + void testShareConsumerWithRackAwareAssignor(ClusterInstance clusterInstance) throws ExecutionException, InterruptedException { + String groupId = "group0"; + String topic = "test-topic"; + try (Admin admin = clusterInstance.admin(); + Producer<byte[], byte[]> producer = clusterInstance.producer(); + ShareConsumer<byte[], byte[]> consumer0 = clusterInstance.shareConsumer(Map.of( + CommonClientConfigs.GROUP_ID_CONFIG, groupId, + CommonClientConfigs.CLIENT_ID_CONFIG, "client0", + CommonClientConfigs.CLIENT_RACK_CONFIG, "rack0" + )); + ShareConsumer<byte[], byte[]> consumer1 = clusterInstance.shareConsumer(Map.of( + CommonClientConfigs.GROUP_ID_CONFIG, groupId, + CommonClientConfigs.CLIENT_ID_CONFIG, "client1", + CommonClientConfigs.CLIENT_RACK_CONFIG, "rack1" + )); + ShareConsumer<byte[], byte[]> consumer2 = clusterInstance.shareConsumer(Map.of( + CommonClientConfigs.GROUP_ID_CONFIG, groupId, + CommonClientConfigs.CLIENT_ID_CONFIG, "client2", + CommonClientConfigs.CLIENT_RACK_CONFIG, "rack2" + )) + ) { + // Create a new topic with 1 partition on broker 0. + admin.createTopics(List.of(new NewTopic(topic, Map.of(0, List.of(0))))); + clusterInstance.waitForTopic(topic, 1); + + producer.send(new ProducerRecord<>(topic, "key".getBytes(), "value".getBytes())); + producer.flush(); + + consumer0.subscribe(List.of(topic)); + consumer1.subscribe(List.of(topic)); + consumer2.subscribe(List.of(topic)); + + TestUtils.waitForCondition(() -> { + consumer0.poll(Duration.ofMillis(1000)); + consumer1.poll(Duration.ofMillis(1000)); + consumer2.poll(Duration.ofMillis(1000)); + Map<String, ShareGroupDescription> groups = assertDoesNotThrow(() -> admin.describeShareGroups(Set.of("group0")).all().get()); + ShareGroupDescription groupDescription = groups.get(groupId); + return isExpectedAssignment(groupDescription, 3, Map.of( + "client0", Set.of(new TopicPartition(topic, 0)), + "client1", Set.of(), + "client2", Set.of() + )); + }, "Consumer 0 should be assigned to topic partition 0"); + + // Add a new partition 1 and 2 to broker 1. + admin.createPartitions( + Map.of( + topic, + NewPartitions.increaseTo(3, List.of(List.of(1), List.of(1))) + ) + ); + clusterInstance.waitForTopic(topic, 3); + + TestUtils.waitForCondition(() -> { + consumer0.poll(Duration.ofMillis(1000)); + consumer1.poll(Duration.ofMillis(1000)); + consumer2.poll(Duration.ofMillis(1000)); + Map<String, ShareGroupDescription> groups = assertDoesNotThrow(() -> admin.describeShareGroups(Set.of("group0")).all().get()); + ShareGroupDescription groupDescription = groups.get(groupId); + return isExpectedAssignment(groupDescription, 3, Map.of( + "client0", Set.of(new TopicPartition(topic, 0)), + "client1", Set.of(new TopicPartition(topic, 1), new TopicPartition(topic, 2)), + "client2", Set.of() + )); + }, "Consumer 1 should be assigned to topic partition 1 and 2"); + + // Add a new partition 3, 4, and 5 to broker 2. + admin.createPartitions( + Map.of( + topic, + NewPartitions.increaseTo(6, List.of(List.of(2), List.of(2), List.of(2))) + ) + ); + TestUtils.waitForCondition(() -> { + consumer0.poll(Duration.ofMillis(1000)); + consumer1.poll(Duration.ofMillis(1000)); + consumer2.poll(Duration.ofMillis(1000)); + Map<String, ShareGroupDescription> groups = assertDoesNotThrow(() -> admin.describeShareGroups(Set.of("group0")).all().get()); + ShareGroupDescription groupDescription = groups.get(groupId); + return isExpectedAssignment(groupDescription, 3, Map.of( + "client0", Set.of(new TopicPartition(topic, 0)), + "client1", Set.of(new TopicPartition(topic, 1), new TopicPartition(topic, 2)), + "client2", Set.of(new TopicPartition(topic, 3), new TopicPartition(topic, 4), new TopicPartition(topic, 5)) + )); + }, "Consumer 2 should be assigned to topic partition 3, 4, and 5"); + + // Change partitions to different brokers. + // partition 0 -> broker 2 + // partition 1 -> broker 2 + // partition 2 -> broker 2 + // partition 3 -> broker 1 + // partition 4 -> broker 1 + // partition 5 -> broker 0 + admin.alterPartitionReassignments(Map.of( + new TopicPartition(topic, 0), Optional.of(new NewPartitionReassignment(List.of(2))), + new TopicPartition(topic, 1), Optional.of(new NewPartitionReassignment(List.of(2))), + new TopicPartition(topic, 2), Optional.of(new NewPartitionReassignment(List.of(2))), + new TopicPartition(topic, 3), Optional.of(new NewPartitionReassignment(List.of(1))), + new TopicPartition(topic, 4), Optional.of(new NewPartitionReassignment(List.of(1))), + new TopicPartition(topic, 5), Optional.of(new NewPartitionReassignment(List.of(0))) + )).all().get(); + TestUtils.waitForCondition(() -> { + consumer0.poll(Duration.ofMillis(1000)); + consumer1.poll(Duration.ofMillis(1000)); + consumer2.poll(Duration.ofMillis(1000)); + Map<String, ShareGroupDescription> groups = assertDoesNotThrow(() -> admin.describeShareGroups(Set.of("group0")).all().get()); + ShareGroupDescription groupDescription = groups.get(groupId); + return isExpectedAssignment(groupDescription, 3, Map.of( + "client0", Set.of(new TopicPartition(topic, 5)), + "client1", Set.of(new TopicPartition(topic, 3), new TopicPartition(topic, 4)), + "client2", Set.of(new TopicPartition(topic, 0), new TopicPartition(topic, 1), new TopicPartition(topic, 2)) + )); + }, "Consumer with topic partition mapping should be 0 -> 5 | 1 -> 3, 4 | 2 -> 0, 1, 2"); + } + } + + boolean isExpectedAssignment( + ShareGroupDescription groupDescription, + int memberCount, + Map<String, Set<TopicPartition>> expectedAssignments + ) { + return groupDescription != null && + groupDescription.members().size() == memberCount && + groupDescription.members().stream().allMatch( + member -> { + String clientId = member.clientId(); + Set<TopicPartition> expectedPartitions = expectedAssignments.get(clientId); + return member.assignment().topicPartitions().equals(expectedPartitions); + } + ); + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java index 32c76f8c732..ae39753f3d8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java @@ -342,7 +342,7 @@ public class RequestManagers implements Closeable { ShareMembershipManager shareMembershipManager = new ShareMembershipManager( logContext, groupRebalanceConfig.groupId, - null, + groupRebalanceConfig.rackId.orElse(null), subscriptions, metadata, time, diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java index 629bc895d3c..5c72eba5071 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java @@ -23,7 +23,9 @@ import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAssignor; +import org.apache.kafka.coordinator.group.api.assignor.ShareGroupPartitionAssignor; import org.apache.kafka.coordinator.group.assignor.RangeAssignor; +import org.apache.kafka.coordinator.group.assignor.SimpleAssignor; import org.apache.kafka.coordinator.group.assignor.UniformAssignor; import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig; @@ -32,6 +34,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.function.Function; import java.util.stream.Collectors; @@ -235,6 +238,13 @@ public class GroupCoordinatorConfig { public static final int SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT = 15000; public static final String SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DOC = "The maximum heartbeat interval for share group members."; + private static final ShareGroupPartitionAssignor SHARE_GROUP_BUILTIN_ASSIGNOR = new SimpleAssignor(); + public static final String SHARE_GROUP_ASSIGNORS_CONFIG = "group.share.assignors"; + public static final String SHARE_GROUP_ASSIGNORS_DOC = "The server-side assignors as a list of either names for built-in assignors or full class names for custom assignors. " + + "The list must contain only a single entry which is used by all groups. The supported built-in assignors are: " + + SHARE_GROUP_BUILTIN_ASSIGNOR.name() + "."; + public static final String SHARE_GROUP_ASSIGNORS_DEFAULT = SHARE_GROUP_BUILTIN_ASSIGNOR.name(); + /// /// Streams group configs /// @@ -317,6 +327,7 @@ public class GroupCoordinatorConfig { .define(SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, INT, SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DOC) .define(SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, INT, SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DOC) .define(SHARE_GROUP_MAX_SIZE_CONFIG, INT, SHARE_GROUP_MAX_SIZE_DEFAULT, between(1, 1000), MEDIUM, SHARE_GROUP_MAX_SIZE_DOC) + .define(SHARE_GROUP_ASSIGNORS_CONFIG, LIST, SHARE_GROUP_ASSIGNORS_DEFAULT, null, MEDIUM, SHARE_GROUP_ASSIGNORS_DOC) // Streams group configs .define(STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG, INT, STREAMS_GROUP_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, STREAMS_GROUP_SESSION_TIMEOUT_MS_DOC) @@ -367,6 +378,7 @@ public class GroupCoordinatorConfig { private final int shareGroupHeartbeatIntervalMs; private final int shareGroupMinHeartbeatIntervalMs; private final int shareGroupMaxHeartbeatIntervalMs; + private final List<ShareGroupPartitionAssignor> shareGroupAssignors; // Streams group configurations private final int streamsGroupSessionTimeoutMs; private final int streamsGroupMinSessionTimeoutMs; @@ -415,6 +427,7 @@ public class GroupCoordinatorConfig { this.shareGroupMinHeartbeatIntervalMs = config.getInt(GroupCoordinatorConfig.SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG); this.shareGroupMaxHeartbeatIntervalMs = config.getInt(GroupCoordinatorConfig.SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG); this.shareGroupMaxSize = config.getInt(GroupCoordinatorConfig.SHARE_GROUP_MAX_SIZE_CONFIG); + this.shareGroupAssignors = shareGroupAssignors(config); // Streams group configurations this.streamsGroupSessionTimeoutMs = config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG); this.streamsGroupMinSessionTimeoutMs = config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG); @@ -466,6 +479,8 @@ public class GroupCoordinatorConfig { require(shareGroupHeartbeatIntervalMs < shareGroupSessionTimeoutMs, String.format("%s must be less than %s", SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG)); + require(shareGroupAssignors.size() == 1, + String.format("%s must contain exactly one assignor, but found %d", SHARE_GROUP_ASSIGNORS_CONFIG, shareGroupAssignors.size())); // Streams group configs validation. require(streamsGroupMaxHeartbeatIntervalMs >= streamsGroupMinHeartbeatIntervalMs, String.format("%s must be greater than or equal to %s", @@ -550,6 +565,41 @@ public class GroupCoordinatorConfig { return assignors; } + protected List<ShareGroupPartitionAssignor> shareGroupAssignors( + AbstractConfig config + ) { + List<ShareGroupPartitionAssignor> assignors = new ArrayList<>(); + + try { + for (String kclass : config.getList(GroupCoordinatorConfig.SHARE_GROUP_ASSIGNORS_CONFIG)) { + ShareGroupPartitionAssignor assignor = SHARE_GROUP_BUILTIN_ASSIGNOR; + + if (!Objects.equals(kclass, SHARE_GROUP_ASSIGNORS_DEFAULT)) { + try { + assignor = Utils.newInstance(kclass, ShareGroupPartitionAssignor.class); + } catch (ClassNotFoundException e) { + throw new KafkaException("Class " + kclass + " cannot be found", e); + } catch (ClassCastException e) { + throw new KafkaException(kclass + " is not an instance of " + ShareGroupPartitionAssignor.class.getName()); + } + } + + assignors.add(assignor); + + if (assignor instanceof Configurable configurable) { + configurable.configure(config.originals()); + } + } + } catch (Exception e) { + for (ShareGroupPartitionAssignor assignor : assignors) { + maybeCloseQuietly(assignor, "AutoCloseable object constructed and configured during failed call to shareGroupAssignors"); + } + throw e; + } + + return assignors; + } + /** * Copy the subset of properties that are relevant to consumer group and share group. */ @@ -809,6 +859,13 @@ public class GroupCoordinatorConfig { return shareGroupMaxHeartbeatIntervalMs; } + /** + * The share group assignors. + */ + public List<ShareGroupPartitionAssignor> shareGroupAssignors() { + return shareGroupAssignors; + } + /** * The streams group session timeout in milliseconds. */ diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java index d262aed8a80..938809c5e7b 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java @@ -268,6 +268,7 @@ public class GroupCoordinatorShard implements CoordinatorShard<CoordinatorRecord .withConfig(config) .withGroupConfigManager(groupConfigManager) .withGroupCoordinatorMetricsShard(metricsShard) + .withShareGroupAssignor(config.shareGroupAssignors().get(0)) .withAuthorizerPlugin(authorizerPlugin) .build(); diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java index 267f7ded413..7c5c7a5b8da 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java @@ -25,8 +25,10 @@ import org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAss import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment; import org.apache.kafka.coordinator.group.api.assignor.GroupSpec; import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignorException; +import org.apache.kafka.coordinator.group.api.assignor.ShareGroupPartitionAssignor; import org.apache.kafka.coordinator.group.api.assignor.SubscribedTopicDescriber; import org.apache.kafka.coordinator.group.assignor.RangeAssignor; +import org.apache.kafka.coordinator.group.assignor.SimpleAssignor; import org.apache.kafka.coordinator.group.assignor.UniformAssignor; import org.junit.jupiter.api.Test; @@ -37,13 +39,14 @@ import java.util.List; import java.util.Map; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; public class GroupCoordinatorConfigTest { - public static class CustomAssignor implements ConsumerGroupPartitionAssignor, Configurable { + public static class CustomAssignor implements ConsumerGroupPartitionAssignor, Configurable, ShareGroupPartitionAssignor { public Map<String, ?> configs; @Override @@ -126,6 +129,48 @@ public class GroupCoordinatorConfigTest { assertTrue(assignors.get(1) instanceof CustomAssignor); } + @Test + public void testShareGroupAssignorFullClassNames() { + // The full class name of the assignors is part of our public api. Hence, + // we should ensure that they are not changed by mistake. + assertEquals( + "org.apache.kafka.coordinator.group.assignor.SimpleAssignor", + SimpleAssignor.class.getName() + ); + } + + @Test + public void testShareGroupAssignors() { + Map<String, Object> configs = new HashMap<>(); + GroupCoordinatorConfig config; + List<ShareGroupPartitionAssignor> assignors; + + // Test default config. + config = createConfig(configs); + assignors = config.shareGroupAssignors(); + assertEquals(1, assignors.size()); + + // Test short names. + configs.put(GroupCoordinatorConfig.SHARE_GROUP_ASSIGNORS_CONFIG, "simple"); + config = createConfig(configs); + assignors = config.shareGroupAssignors(); + assertEquals(1, assignors.size()); + assertInstanceOf(SimpleAssignor.class, assignors.get(0)); + + // Test custom assignor. + configs.put(GroupCoordinatorConfig.SHARE_GROUP_ASSIGNORS_CONFIG, CustomAssignor.class.getName()); + config = createConfig(configs); + assignors = config.shareGroupAssignors(); + assertEquals(1, assignors.size()); + assertInstanceOf(CustomAssignor.class, assignors.get(0)); + assertNotNull(((CustomAssignor) assignors.get(0)).configs); + + // Test must contain only one assignor. + configs.put(GroupCoordinatorConfig.SHARE_GROUP_ASSIGNORS_CONFIG, "simple, " + CustomAssignor.class.getName()); + assertEquals("group.share.assignors must contain exactly one assignor, but found 2", + assertThrows(IllegalArgumentException.class, () -> createConfig(configs)).getMessage()); + } + @Test public void testConfigs() { Map<String, Object> configs = new HashMap<>();