dajac commented on code in PR #15717:
URL: https://github.com/apache/kafka/pull/15717#discussion_r1567241790


##########
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/group_coordinator/ClientSideAssignorBenchmark.java:
##########
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.group_coordinator;
+
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
+import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
+import org.apache.kafka.clients.consumer.RangeAssignor;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+import static java.lang.Integer.max;
+import static 
org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.DEFAULT_GENERATION;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class ClientSideAssignorBenchmark {
+
+    public enum AssignorType {
+        RANGE(new RangeAssignor()),
+        COOPERATIVE_STICKY(new CooperativeStickyAssignor());
+
+        private final ConsumerPartitionAssignor assignor;
+
+        AssignorType(ConsumerPartitionAssignor assignor) {
+            this.assignor = assignor;
+        }
+
+        public ConsumerPartitionAssignor assignor() {
+            return assignor;
+        }
+    }
+
+    /**
+     * The subscription pattern followed by the members of the group.
+     *
+     * A subscription model is considered homogenous if all the members of the 
group
+     * are subscribed to the same set of topics, it is heterogeneous otherwise.
+     */
+    public enum SubscriptionModel {
+        HOMOGENEOUS, HETEROGENEOUS
+    }
+
+    @Param({"1000", "10000"})
+    private int memberCount;
+
+    @Param({"10", "50"})
+    private int partitionsPerTopicCount;
+
+    @Param({"100", "1000"})
+    private int topicCount;
+
+    @Param({"true", "false"})
+    private boolean isRackAware;
+
+    @Param({"HOMOGENEOUS", "HETEROGENEOUS"})
+    private SubscriptionModel subscriptionModel;
+
+    @Param({"RANGE", "COOPERATIVE_STICKY"})
+    private AssignorType assignorType;
+
+    @Param({"true", "false"})
+    private boolean simulateRebalanceTrigger;
+
+    private Map<String, ConsumerPartitionAssignor.Subscription> subscriptions 
= new HashMap<>();
+
+    private ConsumerPartitionAssignor.GroupSubscription groupSubscription;
+
+    private static final int numberOfRacks = 3;
+
+    private static final int replicationFactor = 2;

Review Comment:
   As they are constants now, let's rename them to `NUMBER_OF_RACKS` and 
`REPLICATION_FACTOR`.



##########
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/group_coordinator/ServerSideAssignorBenchmark.java:
##########
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.group_coordinator;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
+import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber;
+import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
+import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+
+import static java.lang.Integer.max;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class ServerSideAssignorBenchmark {
+
+    public enum AssignorType {
+        RANGE(new RangeAssignor()),
+        UNIFORM(new UniformAssignor());
+
+        private final PartitionAssignor assignor;
+
+        AssignorType(PartitionAssignor assignor) {
+            this.assignor = assignor;
+        }
+
+        public PartitionAssignor assignor() {
+            return assignor;
+        }
+    }
+
+    /**
+     * The subscription pattern followed by the members of the group.
+     *
+     * A subscription model is considered homogenous if all the members of the 
group
+     * are subscribed to the same set of topics, it is heterogeneous otherwise.
+     */
+    public enum SubscriptionModel {
+        HOMOGENEOUS, HETEROGENEOUS
+    }
+
+    @Param({"1000", "10000"})
+    private int memberCount;
+
+    @Param({"10", "50"})
+    private int partitionsPerTopicCount;
+
+    @Param({"100", "1000"})
+    private int topicCount;
+
+    @Param({"true", "false"})
+    private boolean isRackAware;
+
+    @Param({"HOMOGENEOUS", "HETEROGENEOUS"})
+    private SubscriptionModel subscriptionModel;
+
+    @Param({"RANGE", "UNIFORM"})
+    private AssignorType assignorType;
+
+    @Param({"true", "false"})
+    private boolean simulateRebalanceTrigger;
+
+    private PartitionAssignor partitionAssignor;
+
+    private static final int numberOfRacks = 3;
+
+    private AssignmentSpec assignmentSpec;
+
+    private SubscribedTopicDescriber subscribedTopicDescriber;
+
+    @Setup(Level.Trial)
+    public void setup() {
+        Map<Uuid, TopicMetadata> topicMetadata = createTopicMetadata();
+        subscribedTopicDescriber = new SubscribedTopicMetadata(topicMetadata);
+
+        createAssignmentSpec(topicMetadata);
+
+        partitionAssignor = assignorType.assignor();
+
+        if (simulateRebalanceTrigger) {
+            simulateIncrementalRebalance(topicMetadata);
+        }
+    }
+
+    private Map<Uuid, TopicMetadata> createTopicMetadata() {
+        Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>();
+        Map<Integer, Set<String>> partitionRacks = isRackAware ?
+            mkMapOfPartitionRacks(partitionsPerTopicCount) :
+            Collections.emptyMap();
+
+        for (int i = 1; i <= topicCount; i++) {
+            Uuid topicUuid = Uuid.randomUuid();
+            String topicName = "topic" + i;
+            topicMetadata.put(topicUuid, new TopicMetadata(
+                topicUuid,
+                topicName,
+                partitionsPerTopicCount,
+                partitionRacks
+            ));
+        }
+
+        return topicMetadata;
+    }
+
+    private void createAssignmentSpec(Map<Uuid, TopicMetadata> topicMetadata) {
+        Map<String, AssignmentMemberSpec> members = new TreeMap<>();
+        List<Uuid> allTopicIds = new ArrayList<>(topicMetadata.keySet());
+        int topicCounter = 0;
+
+        for (int i = 0; i < memberCount; i++) {

Review Comment:
   In the reassignment case, would it be better to create `memberCount - 1` 
members here, so that the group has `memberCount` members in total once the 
new member is added?



##########
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/group_coordinator/ServerSideAssignorBenchmark.java:
##########
@@ -0,0 +1,185 @@
+package org.apache.kafka.jmh.group_coordinator;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
+import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber;
+import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
+import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+
+import static java.lang.Integer.max;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class ServerSideAssignorBenchmark {
+
+    @Param({"10", "50", "100"})
+    private int partitionsPerTopicCount;
+
+    @Param({"100"})
+    private int topicCount;
+
+    @Param({"500", "1000"})
+    private int memberCount;
+
+    @Param({"true", "false"})
+    private boolean isRackAware;
+
+    @Param({"true", "false"})
+    private boolean isSubscriptionUniform;
+
+    @Param({"true", "false"})
+    private boolean isRangeAssignor;
+
+    @Param({"true", "false"})
+    private boolean isReassignment;
+
+    private PartitionAssignor partitionAssignor;
+
+    private final int numberOfRacks = 3;
+
+    private AssignmentSpec assignmentSpec;
+
+    private SubscribedTopicDescriber subscribedTopicDescriber;
+
+    @Setup(Level.Trial)
+    public void setup() {
+        Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>();
+        Map<Integer, Set<String>> partitionRacks = isRackAware ?
+            mkMapOfPartitionRacks(partitionsPerTopicCount) :
+            Collections.emptyMap();
+
+        for (int i = 1; i <= topicCount; i++) {
+            Uuid topicUuid = Uuid.randomUuid();
+            String topicName = "topic" + i;
+            topicMetadata.put(topicUuid, new TopicMetadata(
+                topicUuid, topicName, partitionsPerTopicCount, 
partitionRacks));
+        }
+
+        addTopicSubscriptions(topicMetadata);
+        this.subscribedTopicDescriber = new 
SubscribedTopicMetadata(topicMetadata);
+
+        if (isRangeAssignor) {
+            this.partitionAssignor = new RangeAssignor();
+        } else {
+            this.partitionAssignor = new UniformAssignor();
+        }
+
+        if (isReassignment) {
+            GroupAssignment initialAssignment = 
partitionAssignor.assign(assignmentSpec, subscribedTopicDescriber);
+            Map<String, MemberAssignment> members;
+
+            members = initialAssignment.members();
+
+            // Update the AssignmentSpec with the results from the initial 
assignment.
+            Map<String, AssignmentMemberSpec> updatedMembers = new HashMap<>();
+
+            members.forEach((memberId, memberAssignment) -> {
+                AssignmentMemberSpec memberSpec = 
assignmentSpec.members().get(memberId);
+                updatedMembers.put(memberId, new AssignmentMemberSpec(
+                    memberSpec.instanceId(),
+                    memberSpec.rackId(),
+                    memberSpec.subscribedTopicIds(),
+                    memberAssignment.targetPartitions()
+                ));
+            });
+
+            // Add new member to trigger a reassignment.
+            Optional<String> rackId = isRackAware ? Optional.of("rack" + 
(memberCount + 1) % numberOfRacks) : Optional.empty();
+
+            updatedMembers.put("newMember", new AssignmentMemberSpec(
+                Optional.empty(),
+                rackId,
+                topicMetadata.keySet(),
+                Collections.emptyMap()
+            ));
+
+            this.assignmentSpec = new AssignmentSpec(updatedMembers);
+        }
+    }
+
+    private Map<Integer, Set<String>> mkMapOfPartitionRacks(int numPartitions) 
{
+        Map<Integer, Set<String>> partitionRacks = new 
HashMap<>(numPartitions);
+        for (int i = 0; i < numPartitions; i++) {
+            partitionRacks.put(i, new HashSet<>(Arrays.asList("rack" + i % 
numberOfRacks, "rack" + (i + 1) % numberOfRacks)));
+        }
+        return partitionRacks;
+    }
+
+    private void addTopicSubscriptions(Map<Uuid, TopicMetadata> topicMetadata) 
{
+        Map<String, AssignmentMemberSpec> members = new TreeMap<>();
+        List<Uuid> allTopicIds = new ArrayList<>(topicMetadata.keySet());
+        int topicCounter = 0;
+
+        for (int i = 0; i < memberCount; i++) {
+            String memberName = "member" + i;
+            Optional<String> rackId = isRackAware ? Optional.of("rack" + i % 
numberOfRacks) : Optional.empty();
+            List<Uuid> subscribedTopicIds;
+
+            // When subscriptions are uniform, all members are assigned all 
topics.
+            if (isSubscriptionUniform) {
+                subscribedTopicIds = allTopicIds;
+            } else {
+                subscribedTopicIds = Arrays.asList(
+                    allTopicIds.get(i % topicCount),
+                    allTopicIds.get((i+1) % topicCount)
+                );
+                topicCounter = max (topicCounter, ((i+1) % topicCount));
+
+                if (i == memberCount - 1 && topicCounter < topicCount - 1) {
+                    subscribedTopicIds.addAll(allTopicIds.subList(topicCounter 
+ 1, topicCount - 1));
+                }

Review Comment:
   I see. So if you run with 1000 topics and 10 members, the first 9 members 
will have 2 topics each and the 10th one will have 998 topics. Is that correct? 
I understand that it is hard to find a generic way to do this.
   
   An alternative may be to divide the members and the topics into 5 buckets 
and assign the topics in bucket N to the members in bucket N. In this case, we 
would need to require at least 5 topics. This is probably a more realistic 
distribution in practice.



##########
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/group_coordinator/ServerSideAssignorBenchmark.java:
##########
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.group_coordinator;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
+import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber;
+import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
+import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+
+import static java.lang.Integer.max;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class ServerSideAssignorBenchmark {
+
+    public enum AssignorType {
+        RANGE(new RangeAssignor()),
+        UNIFORM(new UniformAssignor());
+
+        private final PartitionAssignor assignor;
+
+        AssignorType(PartitionAssignor assignor) {
+            this.assignor = assignor;
+        }
+
+        public PartitionAssignor assignor() {
+            return assignor;
+        }
+    }
+
+    /**
+     * The subscription pattern followed by the members of the group.
+     *
+     * A subscription model is considered homogenous if all the members of the 
group
+     * are subscribed to the same set of topics, it is heterogeneous otherwise.
+     */
+    public enum SubscriptionModel {
+        HOMOGENEOUS, HETEROGENEOUS
+    }
+
+    @Param({"1000", "10000"})
+    private int memberCount;
+
+    @Param({"10", "50"})
+    private int partitionsPerTopicCount;
+
+    @Param({"100", "1000"})
+    private int topicCount;
+
+    @Param({"true", "false"})
+    private boolean isRackAware;
+
+    @Param({"HOMOGENEOUS", "HETEROGENEOUS"})
+    private SubscriptionModel subscriptionModel;
+
+    @Param({"RANGE", "UNIFORM"})
+    private AssignorType assignorType;
+
+    @Param({"true", "false"})
+    private boolean simulateRebalanceTrigger;
+
+    private PartitionAssignor partitionAssignor;
+
+    private static final int numberOfRacks = 3;

Review Comment:
   nit: Since this field is `static final`, use the constant naming style 
(i.e. `NUMBER_OF_RACKS`).



##########
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/group_coordinator/TargetAssignmentBuilderBenchmark.java:
##########
@@ -0,0 +1,259 @@
+package org.apache.kafka.jmh.group_coordinator;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
+import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
+import org.apache.kafka.coordinator.group.consumer.Assignment;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.VersionedMetadata;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static java.lang.Integer.max;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class TargetAssignmentBuilderBenchmark {
+
+    @Param({"1000", "10000"})
+    private int memberCount;
+
+    @Param({"10", "50"})
+    private int partitionsPerTopicCount;
+
+    @Param({"1000"})
+    private int topicCount;
+
+    @Param({"true", "false"})
+    private boolean isSubscriptionUniform;
+
+    @Param({"true", "false"})
+    private boolean isRangeAssignor;
+
+    @Param({"true", "false"})
+    private boolean isRackAware;

Review Comment:
   Right, but we only care about the overhead here. I think that we should 
either use only one assignor, say the uniform one, or use a mocked 
implementation.



##########
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/group_coordinator/ClientSideAssignorBenchmark.java:
##########
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.group_coordinator;
+
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
+import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
+import org.apache.kafka.clients.consumer.RangeAssignor;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+import static java.lang.Integer.max;
+import static 
org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.DEFAULT_GENERATION;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class ClientSideAssignorBenchmark {
+
+    public enum AssignorType {
+        RANGE(new RangeAssignor()),
+        COOPERATIVE_STICKY(new CooperativeStickyAssignor());
+
+        private final ConsumerPartitionAssignor assignor;
+
+        AssignorType(ConsumerPartitionAssignor assignor) {
+            this.assignor = assignor;
+        }
+
+        public ConsumerPartitionAssignor assignor() {
+            return assignor;
+        }
+    }
+
+    /**
+     * The subscription pattern followed by the members of the group.
+     *
+     * A subscription model is considered homogenous if all the members of the 
group
+     * are subscribed to the same set of topics, it is heterogeneous otherwise.
+     */
+    public enum SubscriptionModel {
+        HOMOGENEOUS, HETEROGENEOUS
+    }
+
+    @Param({"1000", "10000"})
+    private int memberCount;
+
+    @Param({"10", "50"})
+    private int partitionsPerTopicCount;
+
+    @Param({"100", "1000"})
+    private int topicCount;
+
+    @Param({"true", "false"})
+    private boolean isRackAware;
+
+    @Param({"HOMOGENEOUS", "HETEROGENEOUS"})
+    private SubscriptionModel subscriptionModel;
+
+    @Param({"RANGE", "COOPERATIVE_STICKY"})
+    private AssignorType assignorType;
+
+    @Param({"true", "false"})
+    private boolean simulateRebalanceTrigger;
+
+    private Map<String, ConsumerPartitionAssignor.Subscription> subscriptions 
= new HashMap<>();
+
+    private ConsumerPartitionAssignor.GroupSubscription groupSubscription;
+
+    private static final int numberOfRacks = 3;
+
+    private static final int replicationFactor = 2;
+
+    private ConsumerPartitionAssignor assignor;
+
+    private Cluster metadata;
+
+    private final List<String> allTopicNames = new ArrayList<>(topicCount);
+
+    @Setup(Level.Trial)
+    public void setup() {
+        // Ensure there are enough racks and brokers for the replication 
factor.
+        if (numberOfRacks < replicationFactor) {
+            throw new IllegalArgumentException("Number of broker racks must be 
at least equal to the replication factor.");
+        }
+
+        populateTopicMetadata();
+
+        addMemberSubscriptions();
+
+        assignor = assignorType.assignor();
+
+        if (simulateRebalanceTrigger) simulateRebalance();
+    }
+
+    private void populateTopicMetadata() {
+        List<PartitionInfo> partitions = new ArrayList<>();
+
+        // Create nodes (brokers), one for each rack.
+        List<Node> nodes = new ArrayList<>(numberOfRacks);
+        for (int i = 0; i < numberOfRacks; i++) {
+            nodes.add(new Node(i, "", i, "rack" + i));
+        }
+
+        for (int i = 0; i < topicCount; i++) {
+            String topicName = "topic" + i;
+            allTopicNames.add(topicName);
+            partitions.addAll(partitionInfos(topicName, 
partitionsPerTopicCount, nodes));
+        }
+
+        metadata = new Cluster("test-cluster", nodes, partitions, 
Collections.emptySet(), Collections.emptySet());
+    }
+
+    private void addMemberSubscriptions() {
+        subscriptions.clear();
+        int topicCounter = 0;
+
+        for (int i = 0; i < memberCount; i++) {
+            String memberName = "member" + i;
+
+            // When subscriptions are homogeneous, all members are assigned 
all topics.
+            List<String> subscribedTopics;
+
+            if (subscriptionModel == SubscriptionModel.HOMOGENEOUS) {
+                subscribedTopics = allTopicNames;
+            } else {
+                subscribedTopics = Arrays.asList(
+                    allTopicNames.get(i % topicCount),
+                    allTopicNames.get((i+1) % topicCount)
+                );
+                topicCounter = max (topicCounter, ((i+1) % topicCount));
+
+                if (i == memberCount - 1 && topicCounter < topicCount - 1) {
+                    subscribedTopics.addAll(allTopicNames.subList(topicCounter 
+ 1, topicCount - 1));
+                }
+            }
+
+            subscriptions.put(memberName, subscription(subscribedTopics, i));
+        }
+
+        groupSubscription = new 
ConsumerPartitionAssignor.GroupSubscription(subscriptions);
+    }
+
+    private List<PartitionInfo> partitionInfos(String topic, int 
numberOfPartitions, List<Node> nodes) {
+        // Create PartitionInfo for each partition.
+        List<PartitionInfo> partitionInfos = new 
ArrayList<>(numberOfPartitions);
+        for (int i = 0; i < numberOfPartitions; i++) {
+            Node[] replicas = new Node[replicationFactor];
+            for (int j = 0; j < replicationFactor; j++) {
+                // Assign nodes based on partition number to mimic 
mkMapOfPartitionRacks logic.
+                int nodeIndex = (i + j) % numberOfRacks;
+                replicas[j] = nodes.get(nodeIndex);
+            }
+            partitionInfos.add(new PartitionInfo(topic, i, replicas[0], 
replicas, replicas));
+        }
+
+        return partitionInfos;
+    }
+
+    protected ConsumerPartitionAssignor.Subscription subscription(List<String> 
topics, int consumerIndex) {
+        Optional<String> rackId = rackId(consumerIndex);
+        return new ConsumerPartitionAssignor.Subscription(
+            topics,
+            null,
+            Collections.emptyList(),
+            DEFAULT_GENERATION,
+            rackId
+        );
+    }
+
+    private Optional<String> rackId(int index) {
+        return isRackAware ? Optional.of("rack" + index % numberOfRacks) : 
Optional.empty();
+    }
+
+    protected ConsumerPartitionAssignor.Subscription 
subscriptionWithOwnedPartitions(

Review Comment:
   nit: Could this method be `private` instead of `protected`?



##########
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/group_coordinator/ServerSideAssignorBenchmark.java:
##########
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.group_coordinator;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
+import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber;
+import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
+import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+
+import static java.lang.Integer.max;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class ServerSideAssignorBenchmark {
+
+    /**
+     * The server-side assignors that can be benchmarked. Each constant holds a single
+     * assignor instance created when the enum is initialized.
+     */
+    public enum AssignorType {
+        RANGE(new RangeAssignor()),
+        UNIFORM(new UniformAssignor());
+
+        private final PartitionAssignor assignor;
+
+        AssignorType(PartitionAssignor partitionAssignor) {
+            assignor = partitionAssignor;
+        }
+
+        /** Returns the assignor instance held by this constant. */
+        public PartitionAssignor assignor() {
+            return this.assignor;
+        }
+    }
+
+    /**
+     * The subscription pattern followed by the members of the group.
+     *
+     * The model is homogeneous when every member of the group is subscribed to the
+     * same set of topics; otherwise it is heterogeneous.
+     */
+    public enum SubscriptionModel {
+        HOMOGENEOUS,
+        HETEROGENEOUS
+    }
+
+    @Param({"1000", "10000"})
+    private int memberCount;
+
+    @Param({"10", "50"})
+    private int partitionsPerTopicCount;
+
+    @Param({"100", "1000"})
+    private int topicCount;
+
+    @Param({"true", "false"})
+    private boolean isRackAware;
+
+    @Param({"HOMOGENEOUS", "HETEROGENEOUS"})
+    private SubscriptionModel subscriptionModel;
+
+    @Param({"RANGE", "UNIFORM"})
+    private AssignorType assignorType;
+
+    @Param({"true", "false"})
+    private boolean simulateRebalanceTrigger;
+
+    private PartitionAssignor partitionAssignor;
+
+    private static final int numberOfRacks = 3;
+
+    private AssignmentSpec assignmentSpec;
+
+    private SubscribedTopicDescriber subscribedTopicDescriber;
+
+    /**
+     * Prepares the benchmark state: selects the assignor, builds the topic metadata and
+     * the assignment spec and, when {@code simulateRebalanceTrigger} is set, runs
+     * {@code simulateIncrementalRebalance} on top of them.
+     */
+    @Setup(Level.Trial)
+    public void setup() {
+        partitionAssignor = assignorType.assignor();
+
+        Map<Uuid, TopicMetadata> metadataByTopicId = createTopicMetadata();
+        subscribedTopicDescriber = new SubscribedTopicMetadata(metadataByTopicId);
+        createAssignmentSpec(metadataByTopicId);
+
+        if (!simulateRebalanceTrigger) {
+            return;
+        }
+        simulateIncrementalRebalance(metadataByTopicId);
+    }
+
+    /**
+     * Creates {@code topicCount} topics named {@code topic1 .. topicN}, each with a random
+     * id and {@code partitionsPerTopicCount} partitions. All topics share the same
+     * partition-to-racks map, which is empty when the benchmark is not rack aware.
+     *
+     * @return the metadata of every topic, keyed by topic id
+     */
+    private Map<Uuid, TopicMetadata> createTopicMetadata() {
+        Map<Integer, Set<String>> racksByPartition;
+        if (isRackAware) {
+            racksByPartition = mkMapOfPartitionRacks(partitionsPerTopicCount);
+        } else {
+            racksByPartition = Collections.emptyMap();
+        }
+
+        Map<Uuid, TopicMetadata> metadataByTopicId = new HashMap<>();
+        for (int topicNumber = 1; topicNumber <= topicCount; topicNumber++) {
+            Uuid topicId = Uuid.randomUuid();
+            TopicMetadata metadata = new TopicMetadata(
+                topicId,
+                "topic" + topicNumber,
+                partitionsPerTopicCount,
+                racksByPartition
+            );
+            metadataByTopicId.put(topicId, metadata);
+        }
+
+        return metadataByTopicId;
+    }
+
+    /**
+     * Builds the {@link AssignmentSpec} of the group and stores it in {@code assignmentSpec}.
+     *
+     * Members are named {@code member0 .. memberN-1}. Each member gets the rack id returned
+     * by {@code rackId(i)}, no instance id and no current assignment.
+     *
+     * With a homogeneous subscription model every member subscribes to all topics. With a
+     * heterogeneous model member {@code i} subscribes to the topics at indices
+     * {@code i % topicCount} and {@code (i + 1) % topicCount}; the last member additionally
+     * subscribes to every topic that no member has picked up, so that all topics have at
+     * least one subscriber.
+     *
+     * @param topicMetadata metadata of all topics, keyed by topic id
+     */
+    private void createAssignmentSpec(Map<Uuid, TopicMetadata> topicMetadata) {
+        Map<String, AssignmentMemberSpec> members = new TreeMap<>();
+        List<Uuid> allTopicIds = new ArrayList<>(topicMetadata.keySet());
+        // Highest topic index subscribed to so far (heterogeneous model only).
+        int topicCounter = 0;
+
+        for (int i = 0; i < memberCount; i++) {
+            String memberName = "member" + i;
+            Optional<String> rackId = rackId(i);
+
+            // When subscriptions are homogeneous, all members are assigned all topics.
+            List<Uuid> subscribedTopicIds;
+
+            if (subscriptionModel == SubscriptionModel.HOMOGENEOUS) {
+                subscribedTopicIds = allTopicIds;
+            } else {
+                // Arrays.asList returns a fixed-size list, so copy it into an ArrayList:
+                // the last member may need to append the remaining topics below.
+                subscribedTopicIds = new ArrayList<>(Arrays.asList(
+                    allTopicIds.get(i % topicCount),
+                    allTopicIds.get((i + 1) % topicCount)
+                ));
+                topicCounter = max(topicCounter, (i + 1) % topicCount);
+
+                if (i == memberCount - 1 && topicCounter < topicCount - 1) {
+                    // The upper bound of subList is exclusive, hence topicCount rather than
+                    // topicCount - 1; otherwise the last topic would never be subscribed to.
+                    subscribedTopicIds.addAll(allTopicIds.subList(topicCounter + 1, topicCount));
+                }
+            }
+
+            members.put(memberName, new AssignmentMemberSpec(
+                Optional.empty(),
+                rackId,
+                subscribedTopicIds,
+                Collections.emptyMap()
+            ));
+        }
+
+        this.assignmentSpec = new AssignmentSpec(members);
+    }
+
+    /**
+     * Returns the rack id for the given index: {@code "rack" + (index % numberOfRacks)}
+     * when the benchmark is rack aware, an empty {@link Optional} otherwise.
+     */
+    private Optional<String> rackId(int index) {
+        if (isRackAware) {
+            return Optional.of("rack" + index % numberOfRacks);
+        }
+        return Optional.empty();
+    }
+
+    /**
+     * Maps every partition in {@code [0, numPartitions)} to the two racks
+     * {@code "rack" + (p % numberOfRacks)} and {@code "rack" + ((p + 1) % numberOfRacks)}.
+     *
+     * @param numPartitions number of partitions to create entries for
+     * @return the set of racks of each partition, keyed by partition number
+     */
+    private static Map<Integer, Set<String>> mkMapOfPartitionRacks(int numPartitions) {
+        Map<Integer, Set<String>> racksByPartition = new HashMap<>(numPartitions);
+        for (int partition = 0; partition < numPartitions; partition++) {
+            Set<String> racks = new HashSet<>();
+            racks.add("rack" + partition % numberOfRacks);
+            racks.add("rack" + (partition + 1) % numberOfRacks);
+            racksByPartition.put(partition, racks);
+        }
+        return racksByPartition;
+    }
+
+    private void simulateIncrementalRebalance(Map<Uuid, TopicMetadata> 
topicMetadata) {
+        GroupAssignment initialAssignment = 
partitionAssignor.assign(assignmentSpec, subscribedTopicDescriber);
+        Map<String, MemberAssignment> members = initialAssignment.members();
+
+        Map<String, AssignmentMemberSpec> updatedMembers = new HashMap<>();
+        members.forEach((memberId, memberAssignment) -> {
+            AssignmentMemberSpec memberSpec = 
assignmentSpec.members().get(memberId);
+            updatedMembers.put(memberId, new AssignmentMemberSpec(
+                memberSpec.instanceId(),
+                memberSpec.rackId(),
+                memberSpec.subscribedTopicIds(),
+                memberAssignment.targetPartitions()
+            ));
+        });
+
+        Optional<String> rackId = isRackAware ? Optional.of("rack" + 
(memberCount + 1) % numberOfRacks) : Optional.empty();

Review Comment:
   Could we reuse `rackId()` here instead of duplicating the rack id computation?



##########
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/group_coordinator/ClientSideAssignorBenchmark.java:
##########
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.group_coordinator;
+
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
+import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
+import org.apache.kafka.clients.consumer.RangeAssignor;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+import static java.lang.Integer.max;
+import static 
org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.DEFAULT_GENERATION;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class ClientSideAssignorBenchmark {
+
+    /**
+     * The client-side assignors that can be benchmarked. Each constant holds a single
+     * assignor instance created when the enum is initialized.
+     */
+    public enum AssignorType {
+        RANGE(new RangeAssignor()),
+        COOPERATIVE_STICKY(new CooperativeStickyAssignor());
+
+        private final ConsumerPartitionAssignor assignor;
+
+        AssignorType(ConsumerPartitionAssignor consumerAssignor) {
+            assignor = consumerAssignor;
+        }
+
+        /** Returns the assignor instance held by this constant. */
+        public ConsumerPartitionAssignor assignor() {
+            return this.assignor;
+        }
+    }
+
+    /**
+     * The subscription pattern followed by the members of the group.
+     *
+     * The model is homogeneous when every member of the group is subscribed to the
+     * same set of topics; otherwise it is heterogeneous.
+     */
+    public enum SubscriptionModel {
+        HOMOGENEOUS,
+        HETEROGENEOUS
+    }
+
+    @Param({"1000", "10000"})
+    private int memberCount;
+
+    @Param({"10", "50"})
+    private int partitionsPerTopicCount;
+
+    @Param({"100", "1000"})
+    private int topicCount;
+
+    @Param({"true", "false"})
+    private boolean isRackAware;
+
+    @Param({"HOMOGENEOUS", "HETEROGENEOUS"})
+    private SubscriptionModel subscriptionModel;
+
+    @Param({"RANGE", "COOPERATIVE_STICKY"})
+    private AssignorType assignorType;
+
+    @Param({"true", "false"})
+    private boolean simulateRebalanceTrigger;
+
+    private Map<String, ConsumerPartitionAssignor.Subscription> subscriptions 
= new HashMap<>();
+
+    private ConsumerPartitionAssignor.GroupSubscription groupSubscription;
+
+    private static final int numberOfRacks = 3;
+
+    private static final int replicationFactor = 2;

Review Comment:
   I also wonder whether we should keep only `NUMBER_OF_RACKS` and assume that the
replication factor is equal to the number of racks, in order to simplify the code.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to