This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 844b0e651b3 KAFKA-19369: Add group.share.assignors config and integration test (#19900)
844b0e651b3 is described below

commit 844b0e651b3fef2c29cf6264d1487a23fdbc3f19
Author: PoAn Yang <pay...@apache.org>
AuthorDate: Fri Jun 6 21:20:56 2025 +0800

    KAFKA-19369: Add group.share.assignors config and integration test (#19900)
    
    * Add `group.share.assignors` config to `GroupCoordinatorConfig`.
    * Send `rackId` in share group heartbeat request if it's not null.
    * Add integration test `testShareConsumerWithRackAwareAssignor`.
    
    Reviewers: Lan Ding <53332773+dl1...@users.noreply.github.com>, Andrew Schofield <aschofi...@confluent.io>
    
    ---------
    
    Signed-off-by: PoAn Yang <pay...@apache.org>
---
 .../kafka/clients/consumer/RackAwareAssignor.java  |   3 +-
 .../consumer/ShareConsumerRackAwareTest.java       | 188 +++++++++++++++++++++
 .../consumer/internals/RequestManagers.java        |   2 +-
 .../coordinator/group/GroupCoordinatorConfig.java  |  57 +++++++
 .../coordinator/group/GroupCoordinatorShard.java   |   1 +
 .../group/GroupCoordinatorConfigTest.java          |  47 +++++-
 6 files changed, 295 insertions(+), 3 deletions(-)

diff --git 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/RackAwareAssignor.java
 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/RackAwareAssignor.java
index e71e1f8a1a3..4b9ee6fd274 100644
--- 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/RackAwareAssignor.java
+++ 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/RackAwareAssignor.java
@@ -22,6 +22,7 @@ import 
org.apache.kafka.coordinator.group.api.assignor.GroupAssignment;
 import org.apache.kafka.coordinator.group.api.assignor.GroupSpec;
 import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment;
 import 
org.apache.kafka.coordinator.group.api.assignor.PartitionAssignorException;
+import 
org.apache.kafka.coordinator.group.api.assignor.ShareGroupPartitionAssignor;
 import 
org.apache.kafka.coordinator.group.api.assignor.SubscribedTopicDescriber;
 import org.apache.kafka.coordinator.group.modern.MemberAssignmentImpl;
 
@@ -36,7 +37,7 @@ import java.util.Set;
  * information of the members when assigning partitions to them.
  * It needs all brokers and members to have rack information available.
  */
-public class RackAwareAssignor implements ConsumerGroupPartitionAssignor {
+public class RackAwareAssignor implements ConsumerGroupPartitionAssignor, 
ShareGroupPartitionAssignor {
     @Override
     public String name() {
         return "rack-aware-assignor";
diff --git 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerRackAwareTest.java
 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerRackAwareTest.java
new file mode 100644
index 00000000000..7628220bff2
--- /dev/null
+++ 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerRackAwareTest.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewPartitionReassignment;
+import org.apache.kafka.clients.admin.NewPartitions;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.ShareGroupDescription;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.TestUtils;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.Type;
+import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+
+public class ShareConsumerRackAwareTest {
+    @ClusterTest(
+        types = {Type.KRAFT},
+        brokers = 3,
+        serverProperties = {
+            @ClusterConfigProperty(id = 0, key = "broker.rack", value = 
"rack0"),
+            @ClusterConfigProperty(id = 1, key = "broker.rack", value = 
"rack1"),
+            @ClusterConfigProperty(id = 2, key = "broker.rack", value = 
"rack2"),
+            @ClusterConfigProperty(key = 
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = 
"classic, share"),
+            @ClusterConfigProperty(key = 
GroupCoordinatorConfig.SHARE_GROUP_ASSIGNORS_CONFIG, value = 
"org.apache.kafka.clients.consumer.RackAwareAssignor")
+        }
+    )
+    void testShareConsumerWithRackAwareAssignor(ClusterInstance 
clusterInstance) throws ExecutionException, InterruptedException {
+        String groupId = "group0";
+        String topic = "test-topic";
+        try (Admin admin = clusterInstance.admin();
+             Producer<byte[], byte[]> producer = clusterInstance.producer();
+             ShareConsumer<byte[], byte[]> consumer0 = 
clusterInstance.shareConsumer(Map.of(
+                 CommonClientConfigs.GROUP_ID_CONFIG, groupId,
+                 CommonClientConfigs.CLIENT_ID_CONFIG, "client0",
+                 CommonClientConfigs.CLIENT_RACK_CONFIG, "rack0"
+             ));
+             ShareConsumer<byte[], byte[]> consumer1 = 
clusterInstance.shareConsumer(Map.of(
+                 CommonClientConfigs.GROUP_ID_CONFIG, groupId,
+                 CommonClientConfigs.CLIENT_ID_CONFIG, "client1",
+                 CommonClientConfigs.CLIENT_RACK_CONFIG, "rack1"
+             ));
+             ShareConsumer<byte[], byte[]> consumer2 = 
clusterInstance.shareConsumer(Map.of(
+                 CommonClientConfigs.GROUP_ID_CONFIG, groupId,
+                 CommonClientConfigs.CLIENT_ID_CONFIG, "client2",
+                 CommonClientConfigs.CLIENT_RACK_CONFIG, "rack2"
+             ))
+        ) {
+            // Create a new topic with 1 partition on broker 0.
+            admin.createTopics(List.of(new NewTopic(topic, Map.of(0, 
List.of(0)))));
+            clusterInstance.waitForTopic(topic, 1);
+
+            producer.send(new ProducerRecord<>(topic, "key".getBytes(), 
"value".getBytes()));
+            producer.flush();
+
+            consumer0.subscribe(List.of(topic));
+            consumer1.subscribe(List.of(topic));
+            consumer2.subscribe(List.of(topic));
+
+            TestUtils.waitForCondition(() -> {
+                consumer0.poll(Duration.ofMillis(1000));
+                consumer1.poll(Duration.ofMillis(1000));
+                consumer2.poll(Duration.ofMillis(1000));
+                Map<String, ShareGroupDescription> groups = 
assertDoesNotThrow(() -> 
admin.describeShareGroups(Set.of("group0")).all().get());
+                ShareGroupDescription groupDescription = groups.get(groupId);
+                return isExpectedAssignment(groupDescription, 3, Map.of(
+                    "client0", Set.of(new TopicPartition(topic, 0)),
+                    "client1", Set.of(),
+                    "client2", Set.of()
+                ));
+            }, "Consumer 0 should be assigned to topic partition 0");
+
+            // Add a new partition 1 and 2 to broker 1.
+            admin.createPartitions(
+                Map.of(
+                    topic,
+                    NewPartitions.increaseTo(3, List.of(List.of(1), 
List.of(1)))
+                )
+            );
+            clusterInstance.waitForTopic(topic, 3);
+
+            TestUtils.waitForCondition(() -> {
+                consumer0.poll(Duration.ofMillis(1000));
+                consumer1.poll(Duration.ofMillis(1000));
+                consumer2.poll(Duration.ofMillis(1000));
+                Map<String, ShareGroupDescription> groups = 
assertDoesNotThrow(() -> 
admin.describeShareGroups(Set.of("group0")).all().get());
+                ShareGroupDescription groupDescription = groups.get(groupId);
+                return isExpectedAssignment(groupDescription, 3, Map.of(
+                    "client0", Set.of(new TopicPartition(topic, 0)),
+                    "client1", Set.of(new TopicPartition(topic, 1), new 
TopicPartition(topic, 2)),
+                    "client2", Set.of()
+                ));
+            }, "Consumer 1 should be assigned to topic partition 1 and 2");
+
+            // Add a new partition 3, 4, and 5 to broker 2.
+            admin.createPartitions(
+                Map.of(
+                    topic,
+                    NewPartitions.increaseTo(6, List.of(List.of(2), 
List.of(2), List.of(2)))
+                )
+            );
+            TestUtils.waitForCondition(() -> {
+                consumer0.poll(Duration.ofMillis(1000));
+                consumer1.poll(Duration.ofMillis(1000));
+                consumer2.poll(Duration.ofMillis(1000));
+                Map<String, ShareGroupDescription> groups = 
assertDoesNotThrow(() -> 
admin.describeShareGroups(Set.of("group0")).all().get());
+                ShareGroupDescription groupDescription = groups.get(groupId);
+                return isExpectedAssignment(groupDescription, 3, Map.of(
+                    "client0", Set.of(new TopicPartition(topic, 0)),
+                    "client1", Set.of(new TopicPartition(topic, 1), new 
TopicPartition(topic, 2)),
+                    "client2", Set.of(new TopicPartition(topic, 3), new 
TopicPartition(topic, 4), new TopicPartition(topic, 5))
+                ));
+            }, "Consumer 2 should be assigned to topic partition 3, 4, and 5");
+
+            // Change partitions to different brokers.
+            // partition 0 -> broker 2
+            // partition 1 -> broker 2
+            // partition 2 -> broker 2
+            // partition 3 -> broker 1
+            // partition 4 -> broker 1
+            // partition 5 -> broker 0
+            admin.alterPartitionReassignments(Map.of(
+                new TopicPartition(topic, 0), Optional.of(new 
NewPartitionReassignment(List.of(2))),
+                new TopicPartition(topic, 1), Optional.of(new 
NewPartitionReassignment(List.of(2))),
+                new TopicPartition(topic, 2), Optional.of(new 
NewPartitionReassignment(List.of(2))),
+                new TopicPartition(topic, 3), Optional.of(new 
NewPartitionReassignment(List.of(1))),
+                new TopicPartition(topic, 4), Optional.of(new 
NewPartitionReassignment(List.of(1))),
+                new TopicPartition(topic, 5), Optional.of(new 
NewPartitionReassignment(List.of(0)))
+            )).all().get();
+            TestUtils.waitForCondition(() -> {
+                consumer0.poll(Duration.ofMillis(1000));
+                consumer1.poll(Duration.ofMillis(1000));
+                consumer2.poll(Duration.ofMillis(1000));
+                Map<String, ShareGroupDescription> groups = 
assertDoesNotThrow(() -> 
admin.describeShareGroups(Set.of("group0")).all().get());
+                ShareGroupDescription groupDescription = groups.get(groupId);
+                return isExpectedAssignment(groupDescription, 3, Map.of(
+                    "client0", Set.of(new TopicPartition(topic, 5)),
+                    "client1", Set.of(new TopicPartition(topic, 3), new 
TopicPartition(topic, 4)),
+                    "client2", Set.of(new TopicPartition(topic, 0), new 
TopicPartition(topic, 1), new TopicPartition(topic, 2))
+                ));
+            }, "Consumer with topic partition mapping should be 0 -> 5 | 1 -> 
3, 4 | 2 -> 0, 1, 2");
+        }
+    }
+
+    boolean isExpectedAssignment(
+        ShareGroupDescription groupDescription,
+        int memberCount,
+        Map<String, Set<TopicPartition>> expectedAssignments
+    ) {
+        return groupDescription != null &&
+            groupDescription.members().size() == memberCount &&
+            groupDescription.members().stream().allMatch(
+                member -> {
+                    String clientId = member.clientId();
+                    Set<TopicPartition> expectedPartitions = 
expectedAssignments.get(clientId);
+                    return 
member.assignment().topicPartitions().equals(expectedPartitions);
+                }
+            );
+    }
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
index 32c76f8c732..ae39753f3d8 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
@@ -342,7 +342,7 @@ public class RequestManagers implements Closeable {
                 ShareMembershipManager shareMembershipManager = new 
ShareMembershipManager(
                         logContext,
                         groupRebalanceConfig.groupId,
-                        null,
+                        groupRebalanceConfig.rackId.orElse(null),
                         subscriptions,
                         metadata,
                         time,
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
index 629bc895d3c..5c72eba5071 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
@@ -23,7 +23,9 @@ import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.utils.Utils;
 import 
org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAssignor;
+import 
org.apache.kafka.coordinator.group.api.assignor.ShareGroupPartitionAssignor;
 import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
+import org.apache.kafka.coordinator.group.assignor.SimpleAssignor;
 import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
 import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig;
 
@@ -32,6 +34,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -235,6 +238,13 @@ public class GroupCoordinatorConfig {
     public static final int SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT = 
15000;
     public static final String SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DOC = 
"The maximum heartbeat interval for share group members.";
 
+    private static final ShareGroupPartitionAssignor 
SHARE_GROUP_BUILTIN_ASSIGNOR = new SimpleAssignor();
+    public static final String SHARE_GROUP_ASSIGNORS_CONFIG = 
"group.share.assignors";
+    public static final String SHARE_GROUP_ASSIGNORS_DOC = "The server-side 
assignors as a list of either names for built-in assignors or full class names 
for custom assignors. " +
+        "The list must contain only a single entry which is used by all 
groups. The supported built-in assignors are: " +
+        SHARE_GROUP_BUILTIN_ASSIGNOR.name() + ".";
+    public static final String SHARE_GROUP_ASSIGNORS_DEFAULT = 
SHARE_GROUP_BUILTIN_ASSIGNOR.name();
+
     ///
     /// Streams group configs
     ///
@@ -317,6 +327,7 @@ public class GroupCoordinatorConfig {
         .define(SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, INT, 
SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, 
SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DOC)
         .define(SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, INT, 
SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, 
SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DOC)
         .define(SHARE_GROUP_MAX_SIZE_CONFIG, INT, 
SHARE_GROUP_MAX_SIZE_DEFAULT, between(1, 1000), MEDIUM, 
SHARE_GROUP_MAX_SIZE_DOC)
+        .define(SHARE_GROUP_ASSIGNORS_CONFIG, LIST, 
SHARE_GROUP_ASSIGNORS_DEFAULT, null, MEDIUM, SHARE_GROUP_ASSIGNORS_DOC)
 
         // Streams group configs
         .define(STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG, INT, 
STREAMS_GROUP_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, 
STREAMS_GROUP_SESSION_TIMEOUT_MS_DOC)
@@ -367,6 +378,7 @@ public class GroupCoordinatorConfig {
     private final int shareGroupHeartbeatIntervalMs;
     private final int shareGroupMinHeartbeatIntervalMs;
     private final int shareGroupMaxHeartbeatIntervalMs;
+    private final List<ShareGroupPartitionAssignor> shareGroupAssignors;
     // Streams group configurations
     private final int streamsGroupSessionTimeoutMs;
     private final int streamsGroupMinSessionTimeoutMs;
@@ -415,6 +427,7 @@ public class GroupCoordinatorConfig {
         this.shareGroupMinHeartbeatIntervalMs = 
config.getInt(GroupCoordinatorConfig.SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG);
         this.shareGroupMaxHeartbeatIntervalMs = 
config.getInt(GroupCoordinatorConfig.SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG);
         this.shareGroupMaxSize = 
config.getInt(GroupCoordinatorConfig.SHARE_GROUP_MAX_SIZE_CONFIG);
+        this.shareGroupAssignors = shareGroupAssignors(config);
         // Streams group configurations
         this.streamsGroupSessionTimeoutMs = 
config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG);
         this.streamsGroupMinSessionTimeoutMs = 
config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG);
@@ -466,6 +479,8 @@ public class GroupCoordinatorConfig {
         require(shareGroupHeartbeatIntervalMs < shareGroupSessionTimeoutMs,
             String.format("%s must be less than %s",
                 SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, 
SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG));
+        require(shareGroupAssignors.size() == 1,
+            String.format("%s must contain exactly one assignor, but found 
%d", SHARE_GROUP_ASSIGNORS_CONFIG, shareGroupAssignors.size()));
         // Streams group configs validation.
         require(streamsGroupMaxHeartbeatIntervalMs >= 
streamsGroupMinHeartbeatIntervalMs,
             String.format("%s must be greater than or equal to %s",
@@ -550,6 +565,41 @@ public class GroupCoordinatorConfig {
         return assignors;
     }
 
+    protected List<ShareGroupPartitionAssignor> shareGroupAssignors(
+        AbstractConfig config
+    ) {
+        List<ShareGroupPartitionAssignor> assignors = new ArrayList<>();
+
+        try {
+            for (String kclass : 
config.getList(GroupCoordinatorConfig.SHARE_GROUP_ASSIGNORS_CONFIG)) {
+                ShareGroupPartitionAssignor assignor = 
SHARE_GROUP_BUILTIN_ASSIGNOR;
+
+                if (!Objects.equals(kclass, SHARE_GROUP_ASSIGNORS_DEFAULT)) {
+                    try {
+                        assignor = Utils.newInstance(kclass, 
ShareGroupPartitionAssignor.class);
+                    } catch (ClassNotFoundException e) {
+                        throw new KafkaException("Class " + kclass + " cannot 
be found", e);
+                    } catch (ClassCastException e) {
+                        throw new KafkaException(kclass + " is not an instance 
of " + ShareGroupPartitionAssignor.class.getName());
+                    }
+                }
+
+                assignors.add(assignor);
+
+                if (assignor instanceof Configurable configurable) {
+                    configurable.configure(config.originals());
+                }
+            }
+        } catch (Exception e) {
+            for (ShareGroupPartitionAssignor assignor : assignors) {
+                maybeCloseQuietly(assignor, "AutoCloseable object constructed 
and configured during failed call to shareGroupAssignors");
+            }
+            throw e;
+        }
+
+        return assignors;
+    }
+
     /**
      * Copy the subset of properties that are relevant to consumer group and share group.
      */
@@ -809,6 +859,13 @@ public class GroupCoordinatorConfig {
         return shareGroupMaxHeartbeatIntervalMs;
     }
 
+    /**
+     * The share group assignors.
+     */
+    public List<ShareGroupPartitionAssignor> shareGroupAssignors() {
+        return shareGroupAssignors;
+    }
+
     /**
      * The streams group session timeout in milliseconds.
      */
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
index d262aed8a80..938809c5e7b 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
@@ -268,6 +268,7 @@ public class GroupCoordinatorShard implements 
CoordinatorShard<CoordinatorRecord
                 .withConfig(config)
                 .withGroupConfigManager(groupConfigManager)
                 .withGroupCoordinatorMetricsShard(metricsShard)
+                .withShareGroupAssignor(config.shareGroupAssignors().get(0))
                 .withAuthorizerPlugin(authorizerPlugin)
                 .build();
 
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
index 267f7ded413..7c5c7a5b8da 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
@@ -25,8 +25,10 @@ import 
org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAss
 import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment;
 import org.apache.kafka.coordinator.group.api.assignor.GroupSpec;
 import 
org.apache.kafka.coordinator.group.api.assignor.PartitionAssignorException;
+import 
org.apache.kafka.coordinator.group.api.assignor.ShareGroupPartitionAssignor;
 import 
org.apache.kafka.coordinator.group.api.assignor.SubscribedTopicDescriber;
 import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
+import org.apache.kafka.coordinator.group.assignor.SimpleAssignor;
 import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
 
 import org.junit.jupiter.api.Test;
@@ -37,13 +39,14 @@ import java.util.List;
 import java.util.Map;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class GroupCoordinatorConfigTest {
 
-    public static class CustomAssignor implements 
ConsumerGroupPartitionAssignor, Configurable {
+    public static class CustomAssignor implements 
ConsumerGroupPartitionAssignor, Configurable, ShareGroupPartitionAssignor {
         public Map<String, ?> configs;
 
         @Override
@@ -126,6 +129,48 @@ public class GroupCoordinatorConfigTest {
         assertTrue(assignors.get(1) instanceof CustomAssignor);
     }
 
+    @Test
+    public void testShareGroupAssignorFullClassNames() {
+        // The full class name of the assignors is part of our public api. Hence,
+        // we should ensure that they are not changed by mistake.
+        assertEquals(
+            "org.apache.kafka.coordinator.group.assignor.SimpleAssignor",
+            SimpleAssignor.class.getName()
+        );
+    }
+
+    @Test
+    public void testShareGroupAssignors() {
+        Map<String, Object> configs = new HashMap<>();
+        GroupCoordinatorConfig config;
+        List<ShareGroupPartitionAssignor> assignors;
+
+        // Test default config.
+        config = createConfig(configs);
+        assignors = config.shareGroupAssignors();
+        assertEquals(1, assignors.size());
+
+        // Test short names.
+        configs.put(GroupCoordinatorConfig.SHARE_GROUP_ASSIGNORS_CONFIG, 
"simple");
+        config = createConfig(configs);
+        assignors = config.shareGroupAssignors();
+        assertEquals(1, assignors.size());
+        assertInstanceOf(SimpleAssignor.class, assignors.get(0));
+
+        // Test custom assignor.
+        configs.put(GroupCoordinatorConfig.SHARE_GROUP_ASSIGNORS_CONFIG, 
CustomAssignor.class.getName());
+        config = createConfig(configs);
+        assignors = config.shareGroupAssignors();
+        assertEquals(1, assignors.size());
+        assertInstanceOf(CustomAssignor.class, assignors.get(0));
+        assertNotNull(((CustomAssignor) assignors.get(0)).configs);
+
+        // Test must contain only one assignor.
+        configs.put(GroupCoordinatorConfig.SHARE_GROUP_ASSIGNORS_CONFIG, 
"simple, " + CustomAssignor.class.getName());
+        assertEquals("group.share.assignors must contain exactly one assignor, 
but found 2",
+            assertThrows(IllegalArgumentException.class, () -> 
createConfig(configs)).getMessage());
+    }
+
     @Test
     public void testConfigs() {
         Map<String, Object> configs = new HashMap<>();

Reply via email to