Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]

2024-04-25 Thread via GitHub


dajac merged PR #15717:
URL: https://github.com/apache/kafka/pull/15717


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]

2024-04-24 Thread via GitHub


rreddy-22 commented on code in PR #15717:
URL: https://github.com/apache/kafka/pull/15717#discussion_r1578083329


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java:
##
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
+import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber;
+import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
+import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class ServerSideAssignorBenchmark {
+
+public enum AssignorType {
+RANGE(new RangeAssignor()),
+UNIFORM(new UniformAssignor());
+
+private final PartitionAssignor assignor;
+
+AssignorType(PartitionAssignor assignor) {
+this.assignor = assignor;
+}
+
+public PartitionAssignor assignor() {
+return assignor;
+}
+}
+
+/**
+ * The subscription pattern followed by the members of the group.
+ *
+ * A subscription model is considered homogeneous if all the members of the group
+ * are subscribed to the same set of topics; it is heterogeneous otherwise.
+ */
+public enum SubscriptionModel {
+HOMOGENEOUS, HETEROGENEOUS
+}
+
+/**
+ * The assignment type is decided based on whether all the members are assigned partitions
+ * for the first time (full), or incrementally when a rebalance is triggered.
+ */
+public enum AssignmentType {
+FULL, INCREMENTAL
+}
+
+@Param({"100", "500", "1000", "5000", "1"})
+private int memberCount;
+
+@Param({"5", "10", "50"})
+private int partitionsToMemberRatio;
+
+@Param({"10", "100", "1000"})
+private int topicCount;
+
+@Param({"true", "false"})
+private boolean isRackAware;
+
+@Param({"HOMOGENEOUS", "HETEROGENEOUS"})
+private SubscriptionModel subscriptionModel;
+
+@Param({"RANGE", "UNIFORM"})
+private AssignorType assignorType;
+
+@Param({"FULL", "INCREMENTAL"})
+private AssignmentType assignmentType;
+
+private PartitionAssignor partitionAssignor;
+
+private static final int NUMBER_OF_RACKS = 3;
+
+private static final int MAX_BUCKET_COUNT = 5;
+
+private AssignmentSpec assignmentSpec;
+
+private SubscribedTopicDescriber subscribedTopicDescriber;
+
+private final List allTopicIds = new ArrayList<>(topicCount);
+
+@Setup(Level.Trial)
+public void setup() {
+Map topicMetadata = createTopicMetadata();
+subscribedTopicDescriber = new SubscribedTo

Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]

2024-04-24 Thread via GitHub


rreddy-22 commented on code in PR #15717:
URL: https://github.com/apache/kafka/pull/15717#discussion_r1578083329


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java:
##
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
+import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber;
+import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
+import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class ServerSideAssignorBenchmark {
+
+public enum AssignorType {
+RANGE(new RangeAssignor()),
+UNIFORM(new UniformAssignor());
+
+private final PartitionAssignor assignor;
+
+AssignorType(PartitionAssignor assignor) {
+this.assignor = assignor;
+}
+
+public PartitionAssignor assignor() {
+return assignor;
+}
+}
+
+/**
+ * The subscription pattern followed by the members of the group.
+ *
+ * A subscription model is considered homogeneous if all the members of the group
+ * are subscribed to the same set of topics; it is heterogeneous otherwise.
+ */
+public enum SubscriptionModel {
+HOMOGENEOUS, HETEROGENEOUS
+}
+
+/**
+ * The assignment type is decided based on whether all the members are assigned partitions
+ * for the first time (full), or incrementally when a rebalance is triggered.
+ */
+public enum AssignmentType {
+FULL, INCREMENTAL
+}
+
+@Param({"100", "500", "1000", "5000", "1"})
+private int memberCount;
+
+@Param({"5", "10", "50"})
+private int partitionsToMemberRatio;
+
+@Param({"10", "100", "1000"})
+private int topicCount;
+
+@Param({"true", "false"})
+private boolean isRackAware;
+
+@Param({"HOMOGENEOUS", "HETEROGENEOUS"})
+private SubscriptionModel subscriptionModel;
+
+@Param({"RANGE", "UNIFORM"})
+private AssignorType assignorType;
+
+@Param({"FULL", "INCREMENTAL"})
+private AssignmentType assignmentType;
+
+private PartitionAssignor partitionAssignor;
+
+private static final int NUMBER_OF_RACKS = 3;
+
+private static final int MAX_BUCKET_COUNT = 5;
+
+private AssignmentSpec assignmentSpec;
+
+private SubscribedTopicDescriber subscribedTopicDescriber;
+
+private final List allTopicIds = new ArrayList<>(topicCount);
+
+@Setup(Level.Trial)
+public void setup() {
+Map topicMetadata = createTopicMetadata();
+subscribedTopicDescriber = new SubscribedTo

Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]

2024-04-24 Thread via GitHub


rreddy-22 commented on code in PR #15717:
URL: https://github.com/apache/kafka/pull/15717#discussion_r1578074445


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java:
##
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
+import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber;
+import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
+import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class ServerSideAssignorBenchmark {
+
+public enum AssignorType {
+RANGE(new RangeAssignor()),
+UNIFORM(new UniformAssignor());
+
+private final PartitionAssignor assignor;
+
+AssignorType(PartitionAssignor assignor) {
+this.assignor = assignor;
+}
+
+public PartitionAssignor assignor() {
+return assignor;
+}
+}
+
+/**
+ * The subscription pattern followed by the members of the group.
+ *
+ * A subscription model is considered homogeneous if all the members of the group
+ * are subscribed to the same set of topics; it is heterogeneous otherwise.
+ */
+public enum SubscriptionModel {
+HOMOGENEOUS, HETEROGENEOUS
+}
+
+/**
+ * The assignment type is decided based on whether all the members are assigned partitions
+ * for the first time (full), or incrementally when a rebalance is triggered.
+ */
+public enum AssignmentType {
+FULL, INCREMENTAL
+}
+
+@Param({"100", "500", "1000", "5000", "1"})
+private int memberCount;
+
+@Param({"5", "10", "50"})
+private int partitionsToMemberRatio;
+
+@Param({"10", "100", "1000"})
+private int topicCount;
+
+@Param({"true", "false"})
+private boolean isRackAware;
+
+@Param({"HOMOGENEOUS", "HETEROGENEOUS"})
+private SubscriptionModel subscriptionModel;
+
+@Param({"RANGE", "UNIFORM"})
+private AssignorType assignorType;
+
+@Param({"FULL", "INCREMENTAL"})
+private AssignmentType assignmentType;
+
+private PartitionAssignor partitionAssignor;
+
+private static final int NUMBER_OF_RACKS = 3;
+
+private static final int MAX_BUCKET_COUNT = 5;
+
+private AssignmentSpec assignmentSpec;
+
+private SubscribedTopicDescriber subscribedTopicDescriber;
+
+private final List allTopicIds = new ArrayList<>(topicCount);
+
+@Setup(Level.Trial)
+public void setup() {
+Map topicMetadata = createTopicMetadata();
+subscribedTopicDescriber = new SubscribedTo

Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]

2024-04-24 Thread via GitHub


rreddy-22 commented on code in PR #15717:
URL: https://github.com/apache/kafka/pull/15717#discussion_r1578074445


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java:
##
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
+import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber;
+import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
+import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class ServerSideAssignorBenchmark {
+
+public enum AssignorType {
+RANGE(new RangeAssignor()),
+UNIFORM(new UniformAssignor());
+
+private final PartitionAssignor assignor;
+
+AssignorType(PartitionAssignor assignor) {
+this.assignor = assignor;
+}
+
+public PartitionAssignor assignor() {
+return assignor;
+}
+}
+
+/**
+ * The subscription pattern followed by the members of the group.
+ *
+ * A subscription model is considered homogeneous if all the members of the group
+ * are subscribed to the same set of topics; it is heterogeneous otherwise.
+ */
+public enum SubscriptionModel {
+HOMOGENEOUS, HETEROGENEOUS
+}
+
+/**
+ * The assignment type is decided based on whether all the members are assigned partitions
+ * for the first time (full), or incrementally when a rebalance is triggered.
+ */
+public enum AssignmentType {
+FULL, INCREMENTAL
+}
+
+@Param({"100", "500", "1000", "5000", "1"})
+private int memberCount;
+
+@Param({"5", "10", "50"})
+private int partitionsToMemberRatio;
+
+@Param({"10", "100", "1000"})
+private int topicCount;
+
+@Param({"true", "false"})
+private boolean isRackAware;
+
+@Param({"HOMOGENEOUS", "HETEROGENEOUS"})
+private SubscriptionModel subscriptionModel;
+
+@Param({"RANGE", "UNIFORM"})
+private AssignorType assignorType;
+
+@Param({"FULL", "INCREMENTAL"})
+private AssignmentType assignmentType;
+
+private PartitionAssignor partitionAssignor;
+
+private static final int NUMBER_OF_RACKS = 3;
+
+private static final int MAX_BUCKET_COUNT = 5;
+
+private AssignmentSpec assignmentSpec;
+
+private SubscribedTopicDescriber subscribedTopicDescriber;
+
+private final List allTopicIds = new ArrayList<>(topicCount);
+
+@Setup(Level.Trial)
+public void setup() {
+Map topicMetadata = createTopicMetadata();
+subscribedTopicDescriber = new SubscribedTo

Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]

2024-04-24 Thread via GitHub


dajac commented on code in PR #15717:
URL: https://github.com/apache/kafka/pull/15717#discussion_r1577437212


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java:
##
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
+import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber;
+import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
+import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class ServerSideAssignorBenchmark {
+
+public enum AssignorType {
+RANGE(new RangeAssignor()),
+UNIFORM(new UniformAssignor());
+
+private final PartitionAssignor assignor;
+
+AssignorType(PartitionAssignor assignor) {
+this.assignor = assignor;
+}
+
+public PartitionAssignor assignor() {
+return assignor;
+}
+}
+
+/**
+ * The subscription pattern followed by the members of the group.
+ *
+ * A subscription model is considered homogeneous if all the members of the group
+ * are subscribed to the same set of topics; it is heterogeneous otherwise.
+ */
+public enum SubscriptionModel {
+HOMOGENEOUS, HETEROGENEOUS
+}
+
+/**
+ * The assignment type is decided based on whether all the members are assigned partitions
+ * for the first time (full), or incrementally when a rebalance is triggered.
+ */
+public enum AssignmentType {
+FULL, INCREMENTAL
+}
+
+@Param({"100", "500", "1000", "5000", "1"})
+private int memberCount;
+
+@Param({"5", "10", "50"})
+private int partitionsToMemberRatio;
+
+@Param({"10", "100", "1000"})
+private int topicCount;
+
+@Param({"true", "false"})
+private boolean isRackAware;
+
+@Param({"HOMOGENEOUS", "HETEROGENEOUS"})
+private SubscriptionModel subscriptionModel;
+
+@Param({"RANGE", "UNIFORM"})
+private AssignorType assignorType;
+
+@Param({"FULL", "INCREMENTAL"})
+private AssignmentType assignmentType;
+
+private PartitionAssignor partitionAssignor;
+
+private static final int NUMBER_OF_RACKS = 3;
+
+private static final int MAX_BUCKET_COUNT = 5;
+
+private AssignmentSpec assignmentSpec;
+
+private SubscribedTopicDescriber subscribedTopicDescriber;
+
+private final List allTopicIds = new ArrayList<>();
+
+@Setup(Level.Trial)
+public void setup() {
+Map topicMetadata = createTopicMetadata();
+subscribedTopicDescriber = new SubscribedTopicMetadata(to

Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]

2024-04-23 Thread via GitHub


rreddy-22 commented on code in PR #15717:
URL: https://github.com/apache/kafka/pull/15717#discussion_r1576516058


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java:
##
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
+import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber;
+import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
+import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class ServerSideAssignorBenchmark {
+
+public enum AssignorType {
+RANGE(new RangeAssignor()),
+UNIFORM(new UniformAssignor());
+
+private final PartitionAssignor assignor;
+
+AssignorType(PartitionAssignor assignor) {
+this.assignor = assignor;
+}
+
+public PartitionAssignor assignor() {
+return assignor;
+}
+}
+
+/**
+ * The subscription pattern followed by the members of the group.
+ *
+ * A subscription model is considered homogeneous if all the members of the group
+ * are subscribed to the same set of topics; it is heterogeneous otherwise.
+ */
+public enum SubscriptionModel {
+HOMOGENEOUS, HETEROGENEOUS
+}
+
+/**
+ * The assignment type is decided based on whether all the members are assigned partitions
+ * for the first time (full), or incrementally when a rebalance is triggered.
+ */
+public enum AssignmentType {
+FULL, INCREMENTAL
+}
+
+@Param({"100", "500", "1000", "5000", "1"})
+private int memberCount;
+
+@Param({"5", "10", "50"})
+private int partitionsToMemberRatio;
+
+@Param({"10", "100", "1000"})
+private int topicCount;
+
+@Param({"true", "false"})
+private boolean isRackAware;
+
+@Param({"HOMOGENEOUS", "HETEROGENEOUS"})
+private SubscriptionModel subscriptionModel;
+
+@Param({"RANGE", "UNIFORM"})
+private AssignorType assignorType;
+
+@Param({"FULL", "INCREMENTAL"})
+private AssignmentType assignmentType;
+
+private PartitionAssignor partitionAssignor;
+
+private static final int NUMBER_OF_RACKS = 3;
+
+private static final int MAX_BUCKET_COUNT = 5;
+
+private AssignmentSpec assignmentSpec;
+
+private SubscribedTopicDescriber subscribedTopicDescriber;
+
+private final List allTopicIds = new ArrayList<>(topicCount);
+
+@Setup(Level.Trial)
+public void setup() {
+Map topicMetadata = createTopicMetadata();
+subscribedTopicDescriber = new SubscribedTo

Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]

2024-04-23 Thread via GitHub


rreddy-22 commented on code in PR #15717:
URL: https://github.com/apache/kafka/pull/15717#discussion_r1576507146


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java:
##
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
+import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber;
+import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
+import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class ServerSideAssignorBenchmark {
+
+public enum AssignorType {
+RANGE(new RangeAssignor()),
+UNIFORM(new UniformAssignor());
+
+private final PartitionAssignor assignor;
+
+AssignorType(PartitionAssignor assignor) {
+this.assignor = assignor;
+}
+
+public PartitionAssignor assignor() {
+return assignor;
+}
+}
+
+/**
+ * The subscription pattern followed by the members of the group.
+ *
+ * A subscription model is considered homogeneous if all the members of the group
+ * are subscribed to the same set of topics, it is heterogeneous otherwise.
+ */
+public enum SubscriptionModel {
+HOMOGENEOUS, HETEROGENEOUS
+}
+
+/**
+ * The assignment type is decided based on whether all the members are assigned partitions
+ * for the first time (full), or incrementally when a rebalance is triggered.
+ */
+public enum AssignmentType {
+FULL, INCREMENTAL
+}
+
+@Param({"100", "500", "1000", "5000", "1"})
+private int memberCount;
+
+@Param({"5", "10", "50"})
+private int partitionsToMemberRatio;
+
+@Param({"10", "100", "1000"})
+private int topicCount;
+
+@Param({"true", "false"})
+private boolean isRackAware;
+
+@Param({"HOMOGENEOUS", "HETEROGENEOUS"})
+private SubscriptionModel subscriptionModel;
+
+@Param({"RANGE", "UNIFORM"})
+private AssignorType assignorType;
+
+@Param({"FULL", "INCREMENTAL"})
+private AssignmentType assignmentType;
+
+private PartitionAssignor partitionAssignor;
+
+private static final int NUMBER_OF_RACKS = 3;
+
+private static final int MAX_BUCKET_COUNT = 5;
+
+private AssignmentSpec assignmentSpec;
+
+private SubscribedTopicDescriber subscribedTopicDescriber;
+
+private final List allTopicIds = new ArrayList<>(topicCount);
+
+@Setup(Level.Trial)
+public void setup() {
+Map topicMetadata = createTopicMetadata();
+subscribedTopicDescriber = new SubscribedTo

Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]

2024-04-23 Thread via GitHub


dajac commented on code in PR #15717:
URL: https://github.com/apache/kafka/pull/15717#discussion_r1576085044


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java:
##
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
+import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber;
+import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
+import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class ServerSideAssignorBenchmark {
+
+public enum AssignorType {
+RANGE(new RangeAssignor()),
+UNIFORM(new UniformAssignor());
+
+private final PartitionAssignor assignor;
+
+AssignorType(PartitionAssignor assignor) {
+this.assignor = assignor;
+}
+
+public PartitionAssignor assignor() {
+return assignor;
+}
+}
+
+/**
+ * The subscription pattern followed by the members of the group.
+ *
+ * A subscription model is considered homogeneous if all the members of the group
+ * are subscribed to the same set of topics, it is heterogeneous otherwise.
+ */
+public enum SubscriptionModel {
+HOMOGENEOUS, HETEROGENEOUS
+}
+
+/**
+ * The assignment type is decided based on whether all the members are assigned partitions
+ * for the first time (full), or incrementally when a rebalance is triggered.
+ */
+public enum AssignmentType {
+FULL, INCREMENTAL
+}
+
+@Param({"100", "500", "1000", "5000", "1"})
+private int memberCount;
+
+@Param({"5", "10", "50"})
+private int partitionsToMemberRatio;
+
+@Param({"10", "100", "1000"})
+private int topicCount;
+
+@Param({"true", "false"})
+private boolean isRackAware;
+
+@Param({"HOMOGENEOUS", "HETEROGENEOUS"})
+private SubscriptionModel subscriptionModel;
+
+@Param({"RANGE", "UNIFORM"})
+private AssignorType assignorType;
+
+@Param({"FULL", "INCREMENTAL"})
+private AssignmentType assignmentType;
+
+private PartitionAssignor partitionAssignor;
+
+private static final int NUMBER_OF_RACKS = 3;
+
+private static final int MAX_BUCKET_COUNT = 5;
+
+private AssignmentSpec assignmentSpec;
+
+private SubscribedTopicDescriber subscribedTopicDescriber;
+
+private final List allTopicIds = new ArrayList<>(topicCount);
+
+@Setup(Level.Trial)
+public void setup() {
+Map topicMetadata = createTopicMetadata();
+subscribedTopicDescriber = new SubscribedTopicM

Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]

2024-04-22 Thread via GitHub


dajac commented on code in PR #15717:
URL: https://github.com/apache/kafka/pull/15717#discussion_r1574417900


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java:
##
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
+import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber;
+import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
+import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class ServerSideAssignorBenchmark {
+
+public enum AssignorType {
+RANGE(new RangeAssignor()),
+UNIFORM(new UniformAssignor());
+
+private final PartitionAssignor assignor;
+
+AssignorType(PartitionAssignor assignor) {
+this.assignor = assignor;
+}
+
+public PartitionAssignor assignor() {
+return assignor;
+}
+}
+
+/**
+ * The subscription pattern followed by the members of the group.
+ *
+ * A subscription model is considered homogeneous if all the members of the group
+ * are subscribed to the same set of topics, it is heterogeneous otherwise.
+ */
+public enum SubscriptionModel {
+HOMOGENEOUS, HETEROGENEOUS
+}
+
+/**
+ * The assignment type is decided based on whether all the members are assigned partitions
+ * for the first time (full), or incrementally when a rebalance is triggered.
+ */
+public enum AssignmentType {
+FULL, INCREMENTAL
+}
+
+@Param({"100", "500", "1000", "5000", "1"})
+private int memberCount;
+
+@Param({"5", "10", "50"})
+private int partitionsToMemberRatio;
+
+@Param({"10", "100", "1000"})
+private int topicCount;
+
+@Param({"true", "false"})
+private boolean isRackAware;
+
+@Param({"HOMOGENEOUS", "HETEROGENEOUS"})
+private SubscriptionModel subscriptionModel;
+
+@Param({"RANGE", "UNIFORM"})
+private AssignorType assignorType;
+
+@Param({"FULL", "INCREMENTAL"})
+private AssignmentType assignmentType;
+
+private PartitionAssignor partitionAssignor;
+
+private static final int NUMBER_OF_RACKS = 3;
+
+private AssignmentSpec assignmentSpec;
+
+private SubscribedTopicDescriber subscribedTopicDescriber;
+
+private final List allTopicIds = new ArrayList<>(topicCount);
+
+@Setup(Level.Trial)
+public void setup() {
+Map topicMetadata = createTopicMetadata();
+subscribedTopicDescriber = new SubscribedTopicMetadata(topicMetadata);
+
+createAssignmentSpec();
+
+partitionAssig

Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]

2024-04-19 Thread via GitHub


rreddy-22 commented on code in PR #15717:
URL: https://github.com/apache/kafka/pull/15717#discussion_r1573016246


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java:
##
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
+import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber;
+import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
+import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static java.lang.Integer.max;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 1)
+@Measurement(iterations = 0)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class ServerSideAssignorBenchmark {
+
+public enum AssignorType {
+RANGE(new RangeAssignor()),
+UNIFORM(new UniformAssignor());
+
+private final PartitionAssignor assignor;
+
+AssignorType(PartitionAssignor assignor) {
+this.assignor = assignor;
+}
+
+public PartitionAssignor assignor() {
+return assignor;
+}
+}
+
+/**
+ * The subscription pattern followed by the members of the group.
+ *
+ * A subscription model is considered homogeneous if all the members of the group
+ * are subscribed to the same set of topics, it is heterogeneous otherwise.
+ */
+public enum SubscriptionModel {
+HOMOGENEOUS, HETEROGENEOUS
+}
+
+@Param({"100", "500", "1000", "5000", "1"})
+private int memberCount;
+
+@Param({"5", "10", "50"})
+private int partitionsToMemberRatio;
+
+@Param({"10", "100", "1000"})
+private int topicCount;
+
+@Param({"true", "false"})
+private boolean isRackAware;
+
+@Param({"HOMOGENEOUS", "HETEROGENEOUS"})
+private SubscriptionModel subscriptionModel;
+
+@Param({"RANGE", "UNIFORM"})
+private AssignorType assignorType;
+
+@Param({"true", "false"})
+private boolean simulateRebalanceTrigger;
+
+private PartitionAssignor partitionAssignor;
+
+private static final int NUMBER_OF_RACKS = 3;
+
+private AssignmentSpec assignmentSpec;
+
+private SubscribedTopicDescriber subscribedTopicDescriber;
+
+private final List allTopicIds = new ArrayList<>(topicCount);
+
+@Setup(Level.Trial)
+public void setup() {
+Map topicMetadata = createTopicMetadata();
+subscribedTopicDescriber = new SubscribedTopicMetadata(topicMetadata);
+
+createAssignmentSpec();
+
+partitionAssignor = assignorType.assignor();
+
+if (simulateRebalanceTrigger) {
+simulateIncrementalRebalance(topicMetadata);
+}
+}
+
+private Map createTopicMetadata() {
+Map topicMetadata = new Ha

Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]

2024-04-19 Thread via GitHub


rreddy-22 commented on code in PR #15717:
URL: https://github.com/apache/kafka/pull/15717#discussion_r1573016609


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java:
##
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
+import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber;
+import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
+import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static java.lang.Integer.max;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 1)
+@Measurement(iterations = 0)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class ServerSideAssignorBenchmark {
+
+public enum AssignorType {
+RANGE(new RangeAssignor()),
+UNIFORM(new UniformAssignor());
+
+private final PartitionAssignor assignor;
+
+AssignorType(PartitionAssignor assignor) {
+this.assignor = assignor;
+}
+
+public PartitionAssignor assignor() {
+return assignor;
+}
+}
+
+/**
+ * The subscription pattern followed by the members of the group.
+ *
+ * A subscription model is considered homogeneous if all the members of the group
+ * are subscribed to the same set of topics, it is heterogeneous otherwise.
+ */
+public enum SubscriptionModel {
+HOMOGENEOUS, HETEROGENEOUS
+}
+
+@Param({"100", "500", "1000", "5000", "1"})
+private int memberCount;
+
+@Param({"5", "10", "50"})
+private int partitionsToMemberRatio;
+
+@Param({"10", "100", "1000"})
+private int topicCount;
+
+@Param({"true", "false"})
+private boolean isRackAware;
+
+@Param({"HOMOGENEOUS", "HETEROGENEOUS"})
+private SubscriptionModel subscriptionModel;
+
+@Param({"RANGE", "UNIFORM"})
+private AssignorType assignorType;
+
+@Param({"true", "false"})
+private boolean simulateRebalanceTrigger;
+
+private PartitionAssignor partitionAssignor;
+
+private static final int NUMBER_OF_RACKS = 3;
+
+private AssignmentSpec assignmentSpec;
+
+private SubscribedTopicDescriber subscribedTopicDescriber;
+
+private final List allTopicIds = new ArrayList<>(topicCount);
+
+@Setup(Level.Trial)
+public void setup() {
+Map topicMetadata = createTopicMetadata();
+subscribedTopicDescriber = new SubscribedTopicMetadata(topicMetadata);
+
+createAssignmentSpec();
+
+partitionAssignor = assignorType.assignor();
+
+if (simulateRebalanceTrigger) {
+simulateIncrementalRebalance(topicMetadata);
+}
+}
+
+private Map createTopicMetadata() {
+Map topicMetadata = new Ha

Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]

2024-04-19 Thread via GitHub


rreddy-22 commented on code in PR #15717:
URL: https://github.com/apache/kafka/pull/15717#discussion_r1573016246


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java:
##
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
+import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber;
+import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
+import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static java.lang.Integer.max;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 1)
+@Measurement(iterations = 0)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class ServerSideAssignorBenchmark {
+
+public enum AssignorType {
+RANGE(new RangeAssignor()),
+UNIFORM(new UniformAssignor());
+
+private final PartitionAssignor assignor;
+
+AssignorType(PartitionAssignor assignor) {
+this.assignor = assignor;
+}
+
+public PartitionAssignor assignor() {
+return assignor;
+}
+}
+
+/**
+ * The subscription pattern followed by the members of the group.
+ *
+ * A subscription model is considered homogeneous if all the members of the group
+ * are subscribed to the same set of topics, it is heterogeneous otherwise.
+ */
+public enum SubscriptionModel {
+HOMOGENEOUS, HETEROGENEOUS
+}
+
+@Param({"100", "500", "1000", "5000", "1"})
+private int memberCount;
+
+@Param({"5", "10", "50"})
+private int partitionsToMemberRatio;
+
+@Param({"10", "100", "1000"})
+private int topicCount;
+
+@Param({"true", "false"})
+private boolean isRackAware;
+
+@Param({"HOMOGENEOUS", "HETEROGENEOUS"})
+private SubscriptionModel subscriptionModel;
+
+@Param({"RANGE", "UNIFORM"})
+private AssignorType assignorType;
+
+@Param({"true", "false"})
+private boolean simulateRebalanceTrigger;
+
+private PartitionAssignor partitionAssignor;
+
+private static final int NUMBER_OF_RACKS = 3;
+
+private AssignmentSpec assignmentSpec;
+
+private SubscribedTopicDescriber subscribedTopicDescriber;
+
+private final List allTopicIds = new ArrayList<>(topicCount);
+
+@Setup(Level.Trial)
+public void setup() {
+Map topicMetadata = createTopicMetadata();
+subscribedTopicDescriber = new SubscribedTopicMetadata(topicMetadata);
+
+createAssignmentSpec();
+
+partitionAssignor = assignorType.assignor();
+
+if (simulateRebalanceTrigger) {
+simulateIncrementalRebalance(topicMetadata);
+}
+}
+
+private Map createTopicMetadata() {
+Map topicMetadata = new Ha

Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]

2024-04-19 Thread via GitHub


rreddy-22 commented on code in PR #15717:
URL: https://github.com/apache/kafka/pull/15717#discussion_r1572735673


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java:
##
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
+import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber;
+import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
+import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static java.lang.Integer.max;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 1)
+@Measurement(iterations = 0)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class ServerSideAssignorBenchmark {
+
+public enum AssignorType {
+RANGE(new RangeAssignor()),
+UNIFORM(new UniformAssignor());
+
+private final PartitionAssignor assignor;
+
+AssignorType(PartitionAssignor assignor) {
+this.assignor = assignor;
+}
+
+public PartitionAssignor assignor() {
+return assignor;
+}
+}
+
+/**
+ * The subscription pattern followed by the members of the group.
+ *
+ * A subscription model is considered homogeneous if all the members of the group
+ * are subscribed to the same set of topics, it is heterogeneous otherwise.
+ */
+public enum SubscriptionModel {
+HOMOGENEOUS, HETEROGENEOUS
+}
+
+@Param({"100", "500", "1000", "5000", "1"})
+private int memberCount;
+
+@Param({"5", "10", "50"})
+private int partitionsToMemberRatio;
+
+@Param({"10", "100", "1000"})
+private int topicCount;
+
+@Param({"true", "false"})
+private boolean isRackAware;
+
+@Param({"HOMOGENEOUS", "HETEROGENEOUS"})
+private SubscriptionModel subscriptionModel;
+
+@Param({"RANGE", "UNIFORM"})
+private AssignorType assignorType;
+
+@Param({"true", "false"})
+private boolean simulateRebalanceTrigger;
+
+private PartitionAssignor partitionAssignor;
+
+private static final int NUMBER_OF_RACKS = 3;
+
+private AssignmentSpec assignmentSpec;
+
+private SubscribedTopicDescriber subscribedTopicDescriber;
+
+private final List allTopicIds = new ArrayList<>(topicCount);
+
+@Setup(Level.Trial)
+public void setup() {
+Map topicMetadata = createTopicMetadata();
+subscribedTopicDescriber = new SubscribedTopicMetadata(topicMetadata);
+
+createAssignmentSpec();
+
+partitionAssignor = assignorType.assignor();
+
+if (simulateRebalanceTrigger) {
+simulateIncrementalRebalance(topicMetadata);
+}
+}
+
+private Map createTopicMetadata() {
+Map topicMetadata = new Ha

Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]

2024-04-19 Thread via GitHub


rreddy-22 commented on code in PR #15717:
URL: https://github.com/apache/kafka/pull/15717#discussion_r1572729047


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java:
##
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
+import org.apache.kafka.coordinator.group.consumer.Assignment;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.VersionedMetadata;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class TargetAssignmentBuilderBenchmark {
+
+@Param({"100", "500", "1000", "5000", "1"})
+private int memberCount;
+
+@Param({"5", "10", "50"})
+private int partitionsToMemberRatio;
+
+@Param({"10", "100", "1000"})
+private int topicCount;
+
+private static final String GROUP_ID = "benchmark-group";
+
+private static final int GROUP_EPOCH = 0;
+
+private PartitionAssignor partitionAssignor;
+
+private Map subscriptionMetadata = 
Collections.emptyMap();
+
+private TargetAssignmentBuilder targetAssignmentBuilder;
+
+private AssignmentSpec assignmentSpec;
+
+private final List allTopicNames = new ArrayList<>(topicCount);
+
+private final List allTopicIds = new ArrayList<>(topicCount);
+
+@Setup(Level.Trial)
+public void setup() {
+// For this benchmark we will use the Uniform Assignor
+// and a group that has a homogeneous subscription model.
+partitionAssignor = new UniformAssignor();
+
+subscriptionMetadata = generateMockSubscriptionMetadata();
+Map members = generateMockMembers();
+Map existingTargetAssignment = 
generateMockInitialTargetAssignment();
+
+// Add a new member to trigger a rebalance.
+Set subscribedTopics = new 
HashSet<>(subscriptionMetadata.keySet());
+
+ConsumerGroupMember newMember = new 
ConsumerGroupMember.Builder("new-member")
+.setSubscribedTopicNames(new ArrayList<>(subscribedTopics))
+.build();
+
+targetAssignmentBuilder = new TargetAssignmentBuilder(GROUP_ID, 
GROUP_EPOCH, partitionAssignor)
+.withMembers(members)
+.withSubscriptionMetadata(subscriptionMetadata)
+.withTargetAssignment(existingTargetAssignment)
+.addOrUpdateMember(newMember.memberId(), newMember);
+}
+
+private Map generateMockMembers() {
+Map members = new HashMap<>();
+
+for (int i = 0; i < memberCount - 1; i++) 

Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]

2024-04-19 Thread via GitHub


rreddy-22 commented on code in PR #15717:
URL: https://github.com/apache/kafka/pull/15717#discussion_r1572726906


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java:
##
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
+import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber;
+import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
+import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class ServerSideAssignorBenchmark {
+
+public enum AssignorType {
+RANGE(new RangeAssignor()),
+UNIFORM(new UniformAssignor());
+
+private final PartitionAssignor assignor;
+
+AssignorType(PartitionAssignor assignor) {
+this.assignor = assignor;
+}
+
+public PartitionAssignor assignor() {
+return assignor;
+}
+}
+
+/**
+ * The subscription pattern followed by the members of the group.
+ *
+ * A subscription model is considered homogeneous if all the members of the group
+ * are subscribed to the same set of topics, it is heterogeneous otherwise.
+ */
+public enum SubscriptionModel {
+HOMOGENEOUS, HETEROGENEOUS
+}
+
+/**
+ * The assignment type is decided based on whether all the members are assigned partitions
+ * for the first time (full), or incrementally when a rebalance is triggered.
+ */
+public enum AssignmentType {
+FULL, INCREMENTAL
+}
+
+@Param({"100", "500", "1000", "5000", "1"})
+private int memberCount;
+
+@Param({"5", "10", "50"})
+private int partitionsToMemberRatio;
+
+@Param({"10", "100", "1000"})
+private int topicCount;
+
+@Param({"true", "false"})
+private boolean isRackAware;
+
+@Param({"HOMOGENEOUS", "HETEROGENEOUS"})
+private SubscriptionModel subscriptionModel;
+
+@Param({"RANGE", "UNIFORM"})
+private AssignorType assignorType;
+
+@Param({"FULL", "INCREMENTAL"})
+private AssignmentType assignmentType;
+
+private PartitionAssignor partitionAssignor;
+
+private static final int NUMBER_OF_RACKS = 3;
+
+private AssignmentSpec assignmentSpec;
+
+private SubscribedTopicDescriber subscribedTopicDescriber;
+
+private final List allTopicIds = new ArrayList<>(topicCount);
+
+@Setup(Level.Trial)
+public void setup() {
+Map topicMetadata = createTopicMetadata();
+subscribedTopicDescriber = new SubscribedTopicMetadata(topicMetadata);
+
+createAssignmentSpec();
+
+partitionA

Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]

2024-04-19 Thread via GitHub


dajac commented on code in PR #15717:
URL: https://github.com/apache/kafka/pull/15717#discussion_r1572314707


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java:
##
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
+import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber;
+import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
+import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class ServerSideAssignorBenchmark {
+
+public enum AssignorType {
+RANGE(new RangeAssignor()),
+UNIFORM(new UniformAssignor());
+
+private final PartitionAssignor assignor;
+
+AssignorType(PartitionAssignor assignor) {
+this.assignor = assignor;
+}
+
+public PartitionAssignor assignor() {
+return assignor;
+}
+}
+
+/**
+ * The subscription pattern followed by the members of the group.
+ *
+ * A subscription model is considered homogeneous if all the members of the group
+ * are subscribed to the same set of topics, it is heterogeneous otherwise.
+ */
+public enum SubscriptionModel {
+HOMOGENEOUS, HETEROGENEOUS
+}
+
+/**
+ * The assignment type is decided based on whether all the members are assigned partitions
+ * for the first time (full), or incrementally when a rebalance is triggered.
+ */
+public enum AssignmentType {
+FULL, INCREMENTAL
+}
+
+@Param({"100", "500", "1000", "5000", "1"})
+private int memberCount;
+
+@Param({"5", "10", "50"})
+private int partitionsToMemberRatio;
+
+@Param({"10", "100", "1000"})
+private int topicCount;
+
+@Param({"true", "false"})
+private boolean isRackAware;
+
+@Param({"HOMOGENEOUS", "HETEROGENEOUS"})
+private SubscriptionModel subscriptionModel;
+
+@Param({"RANGE", "UNIFORM"})
+private AssignorType assignorType;
+
+@Param({"FULL", "INCREMENTAL"})
+private AssignmentType assignmentType;
+
+private PartitionAssignor partitionAssignor;
+
+private static final int NUMBER_OF_RACKS = 3;
+
+private AssignmentSpec assignmentSpec;
+
+private SubscribedTopicDescriber subscribedTopicDescriber;
+
+private final List allTopicIds = new ArrayList<>(topicCount);
+
+@Setup(Level.Trial)
+public void setup() {
+Map topicMetadata = createTopicMetadata();
+subscribedTopicDescriber = new SubscribedTopicMetadata(topicMetadata);
+
+createAssignmentSpec();
+
+partitionAssig

Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]

2024-04-19 Thread via GitHub


dajac commented on code in PR #15717:
URL: https://github.com/apache/kafka/pull/15717#discussion_r1572307205


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java:
##
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
+import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber;
+import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
+import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static java.lang.Integer.max;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 1)
+@Measurement(iterations = 0)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class ServerSideAssignorBenchmark {
+
+public enum AssignorType {
+RANGE(new RangeAssignor()),
+UNIFORM(new UniformAssignor());
+
+private final PartitionAssignor assignor;
+
+AssignorType(PartitionAssignor assignor) {
+this.assignor = assignor;
+}
+
+public PartitionAssignor assignor() {
+return assignor;
+}
+}
+
+/**
+ * The subscription pattern followed by the members of the group.
+ *
+ * A subscription model is considered homogeneous if all the members of the group
+ * are subscribed to the same set of topics, it is heterogeneous otherwise.
+ */
+public enum SubscriptionModel {
+HOMOGENEOUS, HETEROGENEOUS
+}
+
+@Param({"100", "500", "1000", "5000", "1"})
+private int memberCount;
+
+@Param({"5", "10", "50"})
+private int partitionsToMemberRatio;
+
+@Param({"10", "100", "1000"})
+private int topicCount;
+
+@Param({"true", "false"})
+private boolean isRackAware;
+
+@Param({"HOMOGENEOUS", "HETEROGENEOUS"})
+private SubscriptionModel subscriptionModel;
+
+@Param({"RANGE", "UNIFORM"})
+private AssignorType assignorType;
+
+@Param({"true", "false"})
+private boolean simulateRebalanceTrigger;
+
+private PartitionAssignor partitionAssignor;
+
+private static final int NUMBER_OF_RACKS = 3;
+
+private AssignmentSpec assignmentSpec;
+
+private SubscribedTopicDescriber subscribedTopicDescriber;
+
+private final List allTopicIds = new ArrayList<>(topicCount);
+
+@Setup(Level.Trial)
+public void setup() {
+Map topicMetadata = createTopicMetadata();
+subscribedTopicDescriber = new SubscribedTopicMetadata(topicMetadata);
+
+createAssignmentSpec();
+
+partitionAssignor = assignorType.assignor();
+
+if (simulateRebalanceTrigger) {
+simulateIncrementalRebalance(topicMetadata);
+}
+}
+
+private Map createTopicMetadata() {
+Map topicMetadata = new HashMa

Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]

2024-04-18 Thread via GitHub


rreddy-22 commented on code in PR #15717:
URL: https://github.com/apache/kafka/pull/15717#discussion_r1571435096


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/group_coordinator/ServerSideAssignorBenchmark.java:
##
@@ -0,0 +1,185 @@
+package org.apache.kafka.jmh.group_coordinator;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
+import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber;
+import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
+import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+
+import static java.lang.Integer.max;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class ServerSideAssignorBenchmark {
+
+@Param({"10", "50", "100"})
+private int partitionsPerTopicCount;
+
+@Param({"100"})
+private int topicCount;
+
+@Param({"500", "1000"})
+private int memberCount;
+
+@Param({"true", "false"})
+private boolean isRackAware;
+
+@Param({"true", "false"})
+private boolean isSubscriptionUniform;
+
+@Param({"true", "false"})
+private boolean isRangeAssignor;
+
+@Param({"true", "false"})
+private boolean isReassignment;
+
+private PartitionAssignor partitionAssignor;
+
+private final int numberOfRacks = 3;
+
+private AssignmentSpec assignmentSpec;
+
+private SubscribedTopicDescriber subscribedTopicDescriber;
+
+@Setup(Level.Trial)
+public void setup() {
+Map topicMetadata = new HashMap<>();
+Map> partitionRacks = isRackAware ?
+mkMapOfPartitionRacks(partitionsPerTopicCount) :
+Collections.emptyMap();
+
+for (int i = 1; i <= topicCount; i++) {
+Uuid topicUuid = Uuid.randomUuid();
+String topicName = "topic" + i;
+topicMetadata.put(topicUuid, new TopicMetadata(
+topicUuid, topicName, partitionsPerTopicCount, 
partitionRacks));
+}
+
+addTopicSubscriptions(topicMetadata);
+this.subscribedTopicDescriber = new 
SubscribedTopicMetadata(topicMetadata);
+
+if (isRangeAssignor) {
+this.partitionAssignor = new RangeAssignor();
+} else {
+this.partitionAssignor = new UniformAssignor();
+}
+
+if (isReassignment) {
+GroupAssignment initialAssignment = 
partitionAssignor.assign(assignmentSpec, subscribedTopicDescriber);
+Map members;
+
+members = initialAssignment.members();
+
+// Update the AssignmentSpec with the results from the initial 
assignment.
+Map updatedMembers = new HashMap<>();
+
+members.forEach((memberId, memberAssignment) -> {
+AssignmentMemberSpec memberSpec = 
assignmentSpec.members().get(memberId);
+updatedMembers.put(memberId, new AssignmentMemberSpec(
+memberSpec.instanceId(),
+memberSpec.rackId(),
+memberSpec.subscribedTopicIds(),
+memberAssignment.targetPartitions()
+));
+});
+
+// Add new member to trigger a reassignment.
+Optional rackId = isRackAware ? Optional.of("rack" + 
(memberCount + 1) % numberOfRacks) : Optional.empty();
+
+updatedMembers.put("newMember", new AssignmentMemberSpec(
+Optional.empty(),
+rackId,
+topicMetadata.keySet(),
+Collections.emptyMap()
+ 

Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]

2024-04-18 Thread via GitHub


rreddy-22 commented on code in PR #15717:
URL: https://github.com/apache/kafka/pull/15717#discussion_r1571348485


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ClientSideAssignorBenchmark.java:
##
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.assignor;
+
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
+import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
+import org.apache.kafka.clients.consumer.RangeAssignor;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+import static java.lang.Integer.max;
+import static 
org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.DEFAULT_GENERATION;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class ClientSideAssignorBenchmark {
+
+public enum AssignorType {
+RANGE(new RangeAssignor()),
+COOPERATIVE_STICKY(new CooperativeStickyAssignor());
+
+private final ConsumerPartitionAssignor assignor;
+
+AssignorType(ConsumerPartitionAssignor assignor) {
+this.assignor = assignor;
+}
+
+public ConsumerPartitionAssignor assignor() {
+return assignor;
+}
+}
+
+/**
+ * The subscription pattern followed by the members of the group.
+ *
+ * A subscription model is considered homogeneous if all the members of the group
+ * are subscribed to the same set of topics, it is heterogeneous otherwise.
+ */
+public enum SubscriptionModel {
+HOMOGENEOUS, HETEROGENEOUS
+}
+
+@Param({"100", "500", "1000", "5000", "1"})
+private int memberCount;
+
+@Param({"5", "10", "50"})
+private int partitionsToMemberRatio;
+
+@Param({"10", "100", "1000"})
+private int topicCount;
+
+@Param({"true", "false"})
+private boolean isRackAware;
+
+@Param({"HOMOGENEOUS", "HETEROGENEOUS"})
+private SubscriptionModel subscriptionModel;
+
+@Param({"RANGE", "COOPERATIVE_STICKY"})
+private AssignorType assignorType;
+
+@Param({"true", "false"})
+private boolean simulateRebalanceTrigger;
+
+private Map subscriptions 
= new HashMap<>();
+
+private ConsumerPartitionAssignor.GroupSubscription groupSubscription;
+
+private static final int NUMBER_OF_RACKS = 3;
+
+private ConsumerPartitionAssignor assignor;
+
+private Cluster metadata;
+
+private final List allTopicNames = new ArrayList<>(topicCount);
+
+@Setup(Level.Trial)
+public void setup() {
+// Ensure there are enough racks and brokers for the replication 
factor.
+if (NUMBER_OF_RACKS < 2) {
+throw new IllegalArgumentException("Number of broker racks must be 
at least equal to 2.");
+}
+
+populateTopicMetadata();
+
+addMemberSubscriptions();
+
+assignor = assignorType.assignor();
+
+if (simulateRebalanceTrigger) simulateRebalance();
+}
+
+private void populateTopicMetadata() {
+List partitions = new ArrayList<>();
+int partitionsPerTopicCount = (memberCount * partitionsToMemberRatio) 
/ topicCount;
+
+// Create nodes 

Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]

2024-04-18 Thread via GitHub


rreddy-22 commented on code in PR #15717:
URL: https://github.com/apache/kafka/pull/15717#discussion_r1571339288


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java:
##
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
+import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber;
+import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
+import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static java.lang.Integer.max;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 1)
+@Measurement(iterations = 0)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class ServerSideAssignorBenchmark {
+
+public enum AssignorType {
+RANGE(new RangeAssignor()),
+UNIFORM(new UniformAssignor());
+
+private final PartitionAssignor assignor;
+
+AssignorType(PartitionAssignor assignor) {
+this.assignor = assignor;
+}
+
+public PartitionAssignor assignor() {
+return assignor;
+}
+}
+
+/**
+ * The subscription pattern followed by the members of the group.
+ *
+ * A subscription model is considered homogeneous if all the members of the group
+ * are subscribed to the same set of topics, it is heterogeneous otherwise.
+ */
+public enum SubscriptionModel {
+HOMOGENEOUS, HETEROGENEOUS
+}
+
+@Param({"100", "500", "1000", "5000", "1"})
+private int memberCount;
+
+@Param({"5", "10", "50"})
+private int partitionsToMemberRatio;
+
+@Param({"10", "100", "1000"})
+private int topicCount;
+
+@Param({"true", "false"})
+private boolean isRackAware;
+
+@Param({"HOMOGENEOUS", "HETEROGENEOUS"})
+private SubscriptionModel subscriptionModel;
+
+@Param({"RANGE", "UNIFORM"})
+private AssignorType assignorType;
+
+@Param({"true", "false"})
+private boolean simulateRebalanceTrigger;
+
+private PartitionAssignor partitionAssignor;
+
+private static final int NUMBER_OF_RACKS = 3;
+
+private AssignmentSpec assignmentSpec;
+
+private SubscribedTopicDescriber subscribedTopicDescriber;
+
+private final List allTopicIds = new ArrayList<>(topicCount);
+
+@Setup(Level.Trial)
+public void setup() {
+Map topicMetadata = createTopicMetadata();
+subscribedTopicDescriber = new SubscribedTopicMetadata(topicMetadata);
+
+createAssignmentSpec();
+
+partitionAssignor = assignorType.assignor();
+
+if (simulateRebalanceTrigger) {
+simulateIncrementalRebalance(topicMetadata);
+}
+}
+
+private Map createTopicMetadata() {
+Map topicMetadata = new Ha

Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]

2024-04-18 Thread via GitHub


rreddy-22 commented on code in PR #15717:
URL: https://github.com/apache/kafka/pull/15717#discussion_r1571327160


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java:
##
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
+import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber;
+import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
+import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static java.lang.Integer.max;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 1)
+@Measurement(iterations = 0)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class ServerSideAssignorBenchmark {
+
+public enum AssignorType {
+RANGE(new RangeAssignor()),
+UNIFORM(new UniformAssignor());
+
+private final PartitionAssignor assignor;
+
+AssignorType(PartitionAssignor assignor) {
+this.assignor = assignor;
+}
+
+public PartitionAssignor assignor() {
+return assignor;
+}
+}
+
+/**
+ * The subscription pattern followed by the members of the group.
+ *
+ * A subscription model is considered homogeneous if all the members of the group
+ * are subscribed to the same set of topics; it is heterogeneous otherwise.
+ */
+public enum SubscriptionModel {
+HOMOGENEOUS, HETEROGENEOUS
+}
+
+@Param({"100", "500", "1000", "5000", "1"})
+private int memberCount;
+
+@Param({"5", "10", "50"})
+private int partitionsToMemberRatio;
+
+@Param({"10", "100", "1000"})
+private int topicCount;
+
+@Param({"true", "false"})
+private boolean isRackAware;
+
+@Param({"HOMOGENEOUS", "HETEROGENEOUS"})
+private SubscriptionModel subscriptionModel;
+
+@Param({"RANGE", "UNIFORM"})
+private AssignorType assignorType;
+
+@Param({"true", "false"})
+private boolean simulateRebalanceTrigger;

Review Comment:
   Okay, I'll add that



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]

2024-04-18 Thread via GitHub


rreddy-22 commented on code in PR #15717:
URL: https://github.com/apache/kafka/pull/15717#discussion_r1571012712


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java:
##
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
+import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber;
+import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
+import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static java.lang.Integer.max;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 1)
+@Measurement(iterations = 0)

Review Comment:
   Sorry, I missed that; I changed it in the next commit.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]

2024-04-18 Thread via GitHub


dajac commented on code in PR #15717:
URL: https://github.com/apache/kafka/pull/15717#discussion_r1570226001


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java:
##
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
+import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber;
+import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
+import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static java.lang.Integer.max;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 1)
+@Measurement(iterations = 0)

Review Comment:
   We should use better defaults than this. I think that we usually use 5 warmup
   iterations and 10 or 15 measurement iterations.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]

2024-04-18 Thread via GitHub


rreddy-22 commented on code in PR #15717:
URL: https://github.com/apache/kafka/pull/15717#discussion_r1570135556


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java:
##
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
+import org.apache.kafka.coordinator.group.consumer.Assignment;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.VersionedMetadata;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class TargetAssignmentBuilderBenchmark {
+
+@Param({"100", "500", "1000", "5000", "1"})
+private int memberCount;
+
+@Param({"5", "10", "50"})
+private int partitionsToMemberRatio;
+
+@Param({"10", "100", "1000"})
+private int topicCount;
+
+@Param({"true", "false"})
+private boolean isRackAware;
+
+String groupId = "benchmark-group";
+
+private static final int groupEpoch = 0;
+
+private PartitionAssignor partitionAssignor;
+
+private Map subscriptionMetadata = Collections.emptyMap();
+
+private TargetAssignmentBuilder targetAssignmentBuilder;
+
+private AssignmentSpec assignmentSpec;
+
+private static final int numberOfRacks = 3;
+
+private final List allTopicNames = new ArrayList<>(topicCount);
+
+private final List allTopicIds = new ArrayList<>(topicCount);
+
+@Setup(Level.Trial)
+public void setup() {
+// For this benchmark we will use the Uniform Assignor
+// and a group that has a homogeneous subscription model.
+partitionAssignor = new UniformAssignor();
+subscriptionMetadata = generateMockSubscriptionMetadata();
+Map members = generateMockMembers();
+Map existingTargetAssignment = generateMockInitialTargetAssignment();
+
+// Add a new member to trigger a rebalance.
+Set subscribedTopics = new HashSet<>(subscriptionMetadata.keySet());
+String rackId = isRackAware ? "rack" + (memberCount - 1) % numberOfRacks : "";

Review Comment:
   removed rack aware now



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]

2024-04-18 Thread via GitHub


rreddy-22 commented on code in PR #15717:
URL: https://github.com/apache/kafka/pull/15717#discussion_r1570135213


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java:
##
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
+import org.apache.kafka.coordinator.group.consumer.Assignment;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.VersionedMetadata;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class TargetAssignmentBuilderBenchmark {
+
+@Param({"100", "500", "1000", "5000", "1"})
+private int memberCount;
+
+@Param({"5", "10", "50"})
+private int partitionsToMemberRatio;
+
+@Param({"10", "100", "1000"})
+private int topicCount;
+
+@Param({"true", "false"})
+private boolean isRackAware;
+
+String groupId = "benchmark-group";
+
+private static final int groupEpoch = 0;
+
+private PartitionAssignor partitionAssignor;
+
+private Map subscriptionMetadata = Collections.emptyMap();
+
+private TargetAssignmentBuilder targetAssignmentBuilder;
+
+private AssignmentSpec assignmentSpec;
+
+private static final int numberOfRacks = 3;
+
+private final List allTopicNames = new ArrayList<>(topicCount);
+
+private final List allTopicIds = new ArrayList<>(topicCount);
+
+@Setup(Level.Trial)
+public void setup() {
+// For this benchmark we will use the Uniform Assignor
+// and a group that has a homogeneous subscription model.
+partitionAssignor = new UniformAssignor();
+subscriptionMetadata = generateMockSubscriptionMetadata();
+Map members = generateMockMembers();
+Map existingTargetAssignment = generateMockInitialTargetAssignment();
+
+// Add a new member to trigger a rebalance.
+Set subscribedTopics = new HashSet<>(subscriptionMetadata.keySet());
+String rackId = isRackAware ? "rack" + (memberCount - 1) % numberOfRacks : "";
+ConsumerGroupMember newMember = new ConsumerGroupMember.Builder("new-member")
+.setSubscribedTopicNames(new ArrayList<>(subscribedTopics))
+.setRackId(rackId)
+.build();
+
+targetAssignmentBuilder = new TargetAssignmentBuilder(groupId, groupEpoch, partitionAssignor)
+.withMembers(members)
+.withSubscriptionMetadata(subscriptionMetadata)
+.withTargetAssignment

Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]

2024-04-18 Thread via GitHub


rreddy-22 commented on code in PR #15717:
URL: https://github.com/apache/kafka/pull/15717#discussion_r1570128341


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java:
##
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
+import org.apache.kafka.coordinator.group.consumer.Assignment;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.VersionedMetadata;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class TargetAssignmentBuilderBenchmark {
+
+@Param({"100", "500", "1000", "5000", "1"})
+private int memberCount;
+
+@Param({"5", "10", "50"})
+private int partitionsToMemberRatio;
+
+@Param({"10", "100", "1000"})
+private int topicCount;
+
+@Param({"true", "false"})
+private boolean isRackAware;

Review Comment:
   okay



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]

2024-04-18 Thread via GitHub


rreddy-22 commented on code in PR #15717:
URL: https://github.com/apache/kafka/pull/15717#discussion_r1570122759


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java:
##
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
+import org.apache.kafka.coordinator.group.consumer.Assignment;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.VersionedMetadata;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class TargetAssignmentBuilderBenchmark {
+
+@Param({"100", "500", "1000", "5000", "1"})
+private int memberCount;
+
+@Param({"5", "10", "50"})
+private int partitionsToMemberRatio;
+
+@Param({"10", "100", "1000"})
+private int topicCount;
+
+@Param({"true", "false"})
+private boolean isRackAware;
+
+String groupId = "benchmark-group";
+
+private static final int groupEpoch = 0;
+
+private PartitionAssignor partitionAssignor;
+
+private Map subscriptionMetadata = Collections.emptyMap();
+
+private TargetAssignmentBuilder targetAssignmentBuilder;
+
+private AssignmentSpec assignmentSpec;
+
+private static final int numberOfRacks = 3;
+
+private final List allTopicNames = new ArrayList<>(topicCount);
+
+private final List allTopicIds = new ArrayList<>(topicCount);
+
+@Setup(Level.Trial)
+public void setup() {
+// For this benchmark we will use the Uniform Assignor
+// and a group that has a homogeneous subscription model.
+partitionAssignor = new UniformAssignor();
+subscriptionMetadata = generateMockSubscriptionMetadata();
+Map members = generateMockMembers();
+Map existingTargetAssignment = generateMockInitialTargetAssignment();
+
+// Add a new member to trigger a rebalance.
+Set subscribedTopics = new HashSet<>(subscriptionMetadata.keySet());
+String rackId = isRackAware ? "rack" + (memberCount - 1) % numberOfRacks : "";

Review Comment:
   It is defined with the Optional return value
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]

2024-04-17 Thread via GitHub


dajac commented on code in PR #15717:
URL: https://github.com/apache/kafka/pull/15717#discussion_r1568482951


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java:
##
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
+import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber;
+import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
+import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static java.lang.Integer.max;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 1)
+@Measurement(iterations = 0)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class ServerSideAssignorBenchmark {
+
+public enum AssignorType {
+RANGE(new RangeAssignor()),
+UNIFORM(new UniformAssignor());
+
+private final PartitionAssignor assignor;
+
+AssignorType(PartitionAssignor assignor) {
+this.assignor = assignor;
+}
+
+public PartitionAssignor assignor() {
+return assignor;
+}
+}
+
+/**
+ * The subscription pattern followed by the members of the group.
+ *
+ * A subscription model is considered homogeneous if all the members of the 
group
+ * are subscribed to the same set of topics; it is heterogeneous otherwise.
+ */
+public enum SubscriptionModel {
+HOMOGENEOUS, HETEROGENEOUS
+}
+
+@Param({"100", "500", "1000", "5000", "1"})
+private int memberCount;
+
+@Param({"5", "10", "50"})
+private int partitionsToMemberRatio;
+
+@Param({"10", "100", "1000"})
+private int topicCount;
+
+@Param({"true", "false"})
+private boolean isRackAware;
+
+@Param({"HOMOGENEOUS", "HETEROGENEOUS"})
+private SubscriptionModel subscriptionModel;
+
+@Param({"RANGE", "UNIFORM"})
+private AssignorType assignorType;
+
+@Param({"true", "false"})
+private boolean simulateRebalanceTrigger;
+
+private PartitionAssignor partitionAssignor;
+
+private static final int NUMBER_OF_RACKS = 3;
+
+private AssignmentSpec assignmentSpec;
+
+private SubscribedTopicDescriber subscribedTopicDescriber;
+
+private final List allTopicIds = new ArrayList<>(topicCount);
+
+@Setup(Level.Trial)
+public void setup() {
+Map topicMetadata = createTopicMetadata();
+subscribedTopicDescriber = new SubscribedTopicMetadata(topicMetadata);
+
+createAssignmentSpec();
+
+partitionAssignor = assignorType.assignor();
+
+if (simulateRebalanceTrigger) {
+simulateIncrementalRebalance(topicMetadata);
+}
+}
+
+private Map createTopicMetadata() {
+Map topicMetadata = new HashMa

Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]

2024-04-17 Thread via GitHub


dajac commented on code in PR #15717:
URL: https://github.com/apache/kafka/pull/15717#discussion_r1568383960


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/group_coordinator/ServerSideAssignorBenchmark.java:
##
@@ -0,0 +1,185 @@
+package org.apache.kafka.jmh.group_coordinator;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
+import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber;
+import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
+import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+
+import static java.lang.Integer.max;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class ServerSideAssignorBenchmark {
+
+@Param({"10", "50", "100"})
+private int partitionsPerTopicCount;
+
+@Param({"100"})
+private int topicCount;
+
+@Param({"500", "1000"})
+private int memberCount;
+
+@Param({"true", "false"})
+private boolean isRackAware;
+
+@Param({"true", "false"})
+private boolean isSubscriptionUniform;
+
+@Param({"true", "false"})
+private boolean isRangeAssignor;
+
+@Param({"true", "false"})
+private boolean isReassignment;
+
+private PartitionAssignor partitionAssignor;
+
+private final int numberOfRacks = 3;
+
+private AssignmentSpec assignmentSpec;
+
+private SubscribedTopicDescriber subscribedTopicDescriber;
+
+@Setup(Level.Trial)
+public void setup() {
+Map topicMetadata = new HashMap<>();
+Map> partitionRacks = isRackAware ?
+mkMapOfPartitionRacks(partitionsPerTopicCount) :
+Collections.emptyMap();
+
+for (int i = 1; i <= topicCount; i++) {
+Uuid topicUuid = Uuid.randomUuid();
+String topicName = "topic" + i;
+topicMetadata.put(topicUuid, new TopicMetadata(
+topicUuid, topicName, partitionsPerTopicCount, 
partitionRacks));
+}
+
+addTopicSubscriptions(topicMetadata);
+this.subscribedTopicDescriber = new 
SubscribedTopicMetadata(topicMetadata);
+
+if (isRangeAssignor) {
+this.partitionAssignor = new RangeAssignor();
+} else {
+this.partitionAssignor = new UniformAssignor();
+}
+
+if (isReassignment) {
+GroupAssignment initialAssignment = 
partitionAssignor.assign(assignmentSpec, subscribedTopicDescriber);
+Map members;
+
+members = initialAssignment.members();
+
+// Update the AssignmentSpec with the results from the initial 
assignment.
+Map updatedMembers = new HashMap<>();
+
+members.forEach((memberId, memberAssignment) -> {
+AssignmentMemberSpec memberSpec = 
assignmentSpec.members().get(memberId);
+updatedMembers.put(memberId, new AssignmentMemberSpec(
+memberSpec.instanceId(),
+memberSpec.rackId(),
+memberSpec.subscribedTopicIds(),
+memberAssignment.targetPartitions()
+));
+});
+
+// Add new member to trigger a reassignment.
+Optional rackId = isRackAware ? Optional.of("rack" + 
(memberCount + 1) % numberOfRacks) : Optional.empty();
+
+updatedMembers.put("newMember", new AssignmentMemberSpec(
+Optional.empty(),
+rackId,
+topicMetadata.keySet(),
+Collections.emptyMap()
+)

Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]

2024-04-16 Thread via GitHub


rreddy-22 commented on code in PR #15717:
URL: https://github.com/apache/kafka/pull/15717#discussion_r1568011957


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/group_coordinator/ClientSideAssignorBenchmark.java:
##
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.group_coordinator;
+
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
+import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
+import org.apache.kafka.clients.consumer.RangeAssignor;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+import static java.lang.Integer.max;
+import static 
org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.DEFAULT_GENERATION;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class ClientSideAssignorBenchmark {
+
+public enum AssignorType {
+RANGE(new RangeAssignor()),
+COOPERATIVE_STICKY(new CooperativeStickyAssignor());
+
+private final ConsumerPartitionAssignor assignor;
+
+AssignorType(ConsumerPartitionAssignor assignor) {
+this.assignor = assignor;
+}
+
+public ConsumerPartitionAssignor assignor() {
+return assignor;
+}
+}
+
+/**
+ * The subscription pattern followed by the members of the group.
+ *
+ * A subscription model is considered homogeneous if all the members of the 
group
+ * are subscribed to the same set of topics; it is heterogeneous otherwise.
+ */
+public enum SubscriptionModel {
+HOMOGENEOUS, HETEROGENEOUS
+}
+
+@Param({"1000", "1"})
+private int memberCount;
+
+@Param({"10", "50"})
+private int partitionsPerTopicCount;
+
+@Param({"100", "1000"})
+private int topicCount;
+
+@Param({"true", "false"})
+private boolean isRackAware;
+
+@Param({"HOMOGENEOUS", "HETEROGENEOUS"})
+private SubscriptionModel subscriptionModel;
+
+@Param({"RANGE", "COOPERATIVE_STICKY"})
+private AssignorType assignorType;
+
+@Param({"true", "false"})
+private boolean simulateRebalanceTrigger;
+
+private Map subscriptions 
= new HashMap<>();
+
+private ConsumerPartitionAssignor.GroupSubscription groupSubscription;
+
+private static final int numberOfRacks = 3;
+
+private static final int replicationFactor = 2;

Review Comment:
   For the server-side tests I always used a replication factor of 2, so to get 
the same distribution of racks for consistency I set the replication factor to 2 
here as well. That being said, I think I'll just hardcode the replication factor 
here too.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]

2024-04-16 Thread via GitHub


rreddy-22 commented on code in PR #15717:
URL: https://github.com/apache/kafka/pull/15717#discussion_r1568009821


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/group_coordinator/ServerSideAssignorBenchmark.java:
##
@@ -0,0 +1,185 @@
+package org.apache.kafka.jmh.group_coordinator;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
+import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber;
+import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
+import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+
+import static java.lang.Integer.max;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class ServerSideAssignorBenchmark {
+
+@Param({"10", "50", "100"})
+private int partitionsPerTopicCount;
+
+@Param({"100"})
+private int topicCount;
+
+@Param({"500", "1000"})
+private int memberCount;
+
+@Param({"true", "false"})
+private boolean isRackAware;
+
+@Param({"true", "false"})
+private boolean isSubscriptionUniform;
+
+@Param({"true", "false"})
+private boolean isRangeAssignor;
+
+@Param({"true", "false"})
+private boolean isReassignment;

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]

2024-04-16 Thread via GitHub


rreddy-22 commented on code in PR #15717:
URL: https://github.com/apache/kafka/pull/15717#discussion_r1568009691


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/group_coordinator/ServerSideAssignorBenchmark.java:
##
@@ -0,0 +1,185 @@
+package org.apache.kafka.jmh.group_coordinator;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
+import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber;
+import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
+import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+
+import static java.lang.Integer.max;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class ServerSideAssignorBenchmark {
+
+@Param({"10", "50", "100"})
+private int partitionsPerTopicCount;
+
+@Param({"100"})
+private int topicCount;
+
+@Param({"500", "1000"})
+private int memberCount;

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org