[ 
https://issues.apache.org/jira/browse/KAFKA-7515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16667531#comment-16667531
 ] 

ASF GitHub Bot commented on KAFKA-7515:
---------------------------------------

cmccabe closed pull request #5810: KAFKA-7515: Trogdor - Add Consumer Group Benchmark Specification
URL: https://github.com/apache/kafka/pull/5810

This PR was merged from a forked repository. Because GitHub hides the
original diff once such a PR is merged, the diff is reproduced below for
the sake of provenance:

diff --git a/TROGDOR.md b/TROGDOR.md
index 3783d7e8193..d71455a6e49 100644
--- a/TROGDOR.md
+++ b/TROGDOR.md
@@ -141,7 +141,8 @@ ProduceBench starts a Kafka producer on a single agent node, producing to severa
 RoundTripWorkload tests both production and consumption.  The workload starts a Kafka producer and consumer on a single node.  The consumer will read back the messages that were produced by the producer.
 
 ### ConsumeBench
-ConsumeBench starts a Kafka consumer on a single agent node.  The workload measures the average produce latency, as well as the median, 95th percentile, and 99th percentile latency.
+ConsumeBench starts a Kafka consumer on a single agent node. Depending on the passed in configuration (see ConsumeBenchSpec), the consumer either subscribes to a set of topics (leveraging consumer group functionality) or manually assigns partitions to itself.
+The workload measures the average produce latency, as well as the median, 95th percentile, and 99th percentile latency.
 
 Faults
 ========================================
diff --git a/tests/bin/trogdor-run-consume-bench.sh 
b/tests/bin/trogdor-run-consume-bench.sh
index 2e0239e4b02..be9a2f1a941 100755
--- a/tests/bin/trogdor-run-consume-bench.sh
+++ b/tests/bin/trogdor-run-consume-bench.sh
@@ -26,12 +26,7 @@ cat <<EOF
         "consumerNode": "node0",
         "bootstrapServers": "localhost:9092",
         "maxMessages": 100,
-        "activeTopics": {
-            "foo[1-3]": {
-                "numPartitions": 3,
-                "replicationFactor": 1
-            }
-        }
+        "activeTopics": ["foo[1-3]"]
     }
 }
 EOF
diff --git a/tests/kafkatest/services/trogdor/consume_bench_workload.py 
b/tests/kafkatest/services/trogdor/consume_bench_workload.py
new file mode 100644
index 00000000000..9e61b11928d
--- /dev/null
+++ b/tests/kafkatest/services/trogdor/consume_bench_workload.py
@@ -0,0 +1,55 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from ducktape.services.service import Service
+from kafkatest.services.trogdor.task_spec import TaskSpec
+
+
+class ConsumeBenchWorkloadSpec(TaskSpec):
+    def __init__(self, start_ms, duration_ms, consumer_node, bootstrap_servers,
+                 target_messages_per_sec, max_messages, active_topics,
+                 consumer_conf, common_client_conf, admin_client_conf, 
consumer_group=None):
+        super(ConsumeBenchWorkloadSpec, self).__init__(start_ms, duration_ms)
+        self.message["class"] = 
"org.apache.kafka.trogdor.workload.ConsumeBenchSpec"
+        self.message["consumerNode"] = consumer_node
+        self.message["bootstrapServers"] = bootstrap_servers
+        self.message["targetMessagesPerSec"] = target_messages_per_sec
+        self.message["maxMessages"] = max_messages
+        self.message["consumerConf"] = consumer_conf
+        self.message["adminClientConf"] = admin_client_conf
+        self.message["commonClientConf"] = common_client_conf
+        self.message["activeTopics"] = active_topics
+        if consumer_group is not None:
+            self.message["consumerGroup"] = consumer_group
+
+
+class ConsumeBenchWorkloadService(Service):
+    def __init__(self, context, kafka):
+        Service.__init__(self, context, num_nodes=1)
+        self.bootstrap_servers = kafka.bootstrap_servers(validate=False)
+        self.consumer_node = self.nodes[0].account.hostname
+
+    def free(self):
+        Service.free(self)
+
+    def wait_node(self, node, timeout_sec=None):
+        pass
+
+    def stop_node(self, node):
+        pass
+
+    def clean_node(self, node):
+        pass
\ No newline at end of file
diff --git a/tests/kafkatest/services/trogdor/produce_bench_workload.py 
b/tests/kafkatest/services/trogdor/produce_bench_workload.py
index 7eac4ee841f..cf6a962b055 100644
--- a/tests/kafkatest/services/trogdor/produce_bench_workload.py
+++ b/tests/kafkatest/services/trogdor/produce_bench_workload.py
@@ -20,8 +20,8 @@
 
 class ProduceBenchWorkloadSpec(TaskSpec):
     def __init__(self, start_ms, duration_ms, producer_node, bootstrap_servers,
-                 target_messages_per_sec, max_messages, producer_conf,
-                 inactive_topics, active_topics):
+                 target_messages_per_sec, max_messages, producer_conf, 
admin_client_conf,
+                 common_client_conf, inactive_topics, active_topics):
         super(ProduceBenchWorkloadSpec, self).__init__(start_ms, duration_ms)
         self.message["class"] = 
"org.apache.kafka.trogdor.workload.ProduceBenchSpec"
         self.message["producerNode"] = producer_node
@@ -29,6 +29,8 @@ def __init__(self, start_ms, duration_ms, producer_node, 
bootstrap_servers,
         self.message["targetMessagesPerSec"] = target_messages_per_sec
         self.message["maxMessages"] = max_messages
         self.message["producerConf"] = producer_conf
+        self.message["adminClientConf"] = admin_client_conf
+        self.message["commonClientConf"] = common_client_conf
         self.message["inactiveTopics"] = inactive_topics
         self.message["activeTopics"] = active_topics
 
diff --git a/tests/kafkatest/tests/core/consume_bench_test.py 
b/tests/kafkatest/tests/core/consume_bench_test.py
new file mode 100644
index 00000000000..bec7416a40d
--- /dev/null
+++ b/tests/kafkatest/tests/core/consume_bench_test.py
@@ -0,0 +1,132 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+from ducktape.mark import parametrize
+from ducktape.tests.test import Test
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.trogdor.produce_bench_workload import 
ProduceBenchWorkloadService, ProduceBenchWorkloadSpec
+from kafkatest.services.trogdor.consume_bench_workload import 
ConsumeBenchWorkloadService, ConsumeBenchWorkloadSpec
+from kafkatest.services.trogdor.task_spec import TaskSpec
+from kafkatest.services.trogdor.trogdor import TrogdorService
+from kafkatest.services.zookeeper import ZookeeperService
+
+
+class ConsumeBenchTest(Test):
+    def __init__(self, test_context):
+        """:type test_context: ducktape.tests.test.TestContext"""
+        super(ConsumeBenchTest, self).__init__(test_context)
+        self.zk = ZookeeperService(test_context, num_nodes=3)
+        self.kafka = KafkaService(test_context, num_nodes=3, zk=self.zk)
+        self.producer_workload_service = 
ProduceBenchWorkloadService(test_context, self.kafka)
+        self.consumer_workload_service = 
ConsumeBenchWorkloadService(test_context, self.kafka)
+        self.consumer_workload_service_2 = 
ConsumeBenchWorkloadService(test_context, self.kafka)
+        self.active_topics = {"consume_bench_topic[0-5]": {"numPartitions": 5, 
"replicationFactor": 3}}
+        self.trogdor = TrogdorService(context=self.test_context,
+                                      client_services=[self.kafka, 
self.producer_workload_service,
+                                                       
self.consumer_workload_service,
+                                                       
self.consumer_workload_service_2])
+
+    def setUp(self):
+        self.trogdor.start()
+        self.zk.start()
+        self.kafka.start()
+
+    def teardown(self):
+        self.trogdor.stop()
+        self.kafka.stop()
+        self.zk.stop()
+
+    def produce_messages(self, topics, max_messages=10000):
+        produce_spec = ProduceBenchWorkloadSpec(0, TaskSpec.MAX_DURATION_MS,
+                                                
self.producer_workload_service.producer_node,
+                                                
self.producer_workload_service.bootstrap_servers,
+                                                target_messages_per_sec=1000,
+                                                max_messages=max_messages,
+                                                producer_conf={},
+                                                admin_client_conf={},
+                                                common_client_conf={},
+                                                inactive_topics={},
+                                                active_topics=topics)
+        produce_workload = self.trogdor.create_task("produce_workload", 
produce_spec)
+        produce_workload.wait_for_done(timeout_sec=180)
+        self.logger.debug("Produce workload finished")
+
+    @parametrize(topics=["consume_bench_topic[0-5]"]) # topic subscription
+    @parametrize(topics=["consume_bench_topic[0-5]:[0-4]"])  # manual topic assignment
+    def test_consume_bench(self, topics):
+        """
+        Runs a ConsumeBench workload to consume messages
+        """
+        self.produce_messages(self.active_topics)
+        consume_spec = ConsumeBenchWorkloadSpec(0, TaskSpec.MAX_DURATION_MS,
+                                                
self.consumer_workload_service.consumer_node,
+                                                
self.consumer_workload_service.bootstrap_servers,
+                                                target_messages_per_sec=1000,
+                                                max_messages=10000,
+                                                consumer_conf={},
+                                                admin_client_conf={},
+                                                common_client_conf={},
+                                                active_topics=topics)
+        consume_workload = self.trogdor.create_task("consume_workload", 
consume_spec)
+        consume_workload.wait_for_done(timeout_sec=360)
+        self.logger.debug("Consume workload finished")
+        tasks = self.trogdor.tasks()
+        self.logger.info("TASKS: %s\n" % json.dumps(tasks, sort_keys=True, 
indent=2))
+
+    def test_consume_bench_single_partition(self):
+        """
+        Run a ConsumeBench against a single partition
+        """
+        active_topics = {"consume_bench_topic": {"numPartitions": 2, 
"replicationFactor": 3}}
+        self.produce_messages(active_topics, 5000)
+        consume_spec = ConsumeBenchWorkloadSpec(0, TaskSpec.MAX_DURATION_MS,
+                                                
self.consumer_workload_service.consumer_node,
+                                                
self.consumer_workload_service.bootstrap_servers,
+                                                target_messages_per_sec=1000,
+                                                max_messages=2500,
+                                                consumer_conf={},
+                                                admin_client_conf={},
+                                                common_client_conf={},
+                                                
active_topics=["consume_bench_topic:1"])
+        consume_workload = self.trogdor.create_task("consume_workload", 
consume_spec)
+        consume_workload.wait_for_done(timeout_sec=180)
+        self.logger.debug("Consume workload finished")
+        tasks = self.trogdor.tasks()
+        self.logger.info("TASKS: %s\n" % json.dumps(tasks, sort_keys=True, 
indent=2))
+
+    def test_consume_group_bench(self):
+        """
+        Runs two ConsumeBench workloads in the same consumer group to read 
messages from topics
+        """
+        self.produce_messages(self.active_topics)
+        consume_spec = ConsumeBenchWorkloadSpec(0, TaskSpec.MAX_DURATION_MS,
+                                                
self.consumer_workload_service.consumer_node,
+                                                
self.consumer_workload_service.bootstrap_servers,
+                                                target_messages_per_sec=1000,
+                                                max_messages=2000, # both should read at least 2k messages
+                                                consumer_conf={},
+                                                admin_client_conf={},
+                                                common_client_conf={},
+                                                consumer_group="testGroup",
+                                                
active_topics=["consume_bench_topic[0-5]"])
+        consume_workload_1 = self.trogdor.create_task("consume_workload_1", 
consume_spec)
+        consume_workload_2 = self.trogdor.create_task("consume_workload_2", 
consume_spec)
+        consume_workload_1.wait_for_done(timeout_sec=360)
+        self.logger.debug("Consume workload 1 finished")
+        consume_workload_2.wait_for_done(timeout_sec=360)
+        self.logger.debug("Consume workload 2 finished")
+        tasks = self.trogdor.tasks()
+        self.logger.info("TASKS: %s\n" % json.dumps(tasks, sort_keys=True, 
indent=2))
diff --git a/tests/kafkatest/tests/core/produce_bench_test.py 
b/tests/kafkatest/tests/core/produce_bench_test.py
index 6a1724dc3d9..125ee941eb5 100644
--- a/tests/kafkatest/tests/core/produce_bench_test.py
+++ b/tests/kafkatest/tests/core/produce_bench_test.py
@@ -51,6 +51,8 @@ def test_produce_bench(self):
                                         target_messages_per_sec=1000,
                                         max_messages=100000,
                                         producer_conf={},
+                                        admin_client_conf={},
+                                        common_client_conf={},
                                         inactive_topics=inactive_topics,
                                         active_topics=active_topics)
         workload1 = self.trogdor.create_task("workload1", spec)
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpec.java 
b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpec.java
index 1b429ead3c8..6d4c67cde40 100644
--- 
a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpec.java
+++ 
b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpec.java
@@ -20,20 +20,65 @@
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
-import org.apache.kafka.trogdor.common.Topology;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.trogdor.common.StringExpander;
 import org.apache.kafka.trogdor.task.TaskController;
 import org.apache.kafka.trogdor.task.TaskSpec;
 import org.apache.kafka.trogdor.task.TaskWorker;
 
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
+import java.util.HashMap;
 import java.util.Set;
+import java.util.HashSet;
 
 /**
- * The specification for a benchmark that produces messages to a set of topics.
+ * The specification for a benchmark that consumes messages from a set of topic/partitions.
+ *
+ * If a consumer group is not given to the specification, a random one will be 
generated and
+ *  used to track offsets/subscribe to topics.
+ *
+ * This specification uses a specific way to represent a topic partition via 
its "activeTopics" field.
+ * The notation for that is topic_name:partition_number (e.g "foo:1" 
represents partition-1 of topic "foo")
+ * Note that a topic name cannot have more than one colon.
+ *
+ * The "activeTopics" field also supports ranges that get expanded. See 
#{@link StringExpander}.
+ *
+ * There now exists a clever and succinct way to represent multiple partitions 
of multiple topics.
+ * Example:
+ * Given "activeTopics": ["foo[1-3]:[1-3]"], "foo[1-3]:[1-3]" will get
+ * expanded to [foo1:1, foo1:2, foo1:3, foo2:1, ..., foo3:3].
+ * This represents all partitions 1-3 for the three topics foo1, foo2 and foo3.
+ *
+ * If there is at least one topic:partition pair, the consumer will be 
manually assigned partitions via
+ * #{@link org.apache.kafka.clients.consumer.KafkaConsumer#assign(Collection)}.
+ * Note that in this case the consumer will fetch and assign all partitions 
for a topic if no partition is given for it (e.g ["foo:1", "bar"])
+ *
+ * If there are no topic:partition pairs given, the consumer will subscribe to 
the topics via
+ * #{@link 
org.apache.kafka.clients.consumer.KafkaConsumer#subscribe(Collection)}.
+ * It will be assigned partitions dynamically from the consumer group.
+ *
+ * An example JSON representation which will result in a consumer that is part 
of the consumer group "cg" and
+ * subscribed to topics foo1, foo2, foo3 and bar.
+ * #{@code
+ *    {
+ *        "class": "org.apache.kafka.trogdor.workload.ConsumeBenchSpec",
+ *        "durationMs": 10000000,
+ *        "consumerNode": "node0",
+ *        "bootstrapServers": "localhost:9092",
+ *        "maxMessages": 100,
+ *        "consumerGroup": "cg",
+ *        "activeTopics": ["foo[1-3]", "bar"]
+ *    }
+ * }
  */
 public class ConsumeBenchSpec extends TaskSpec {
 
+    static final String EMPTY_CONSUMER_GROUP = "";
+    private static final String VALID_EXPANDED_TOPIC_NAME_PATTERN = 
"^[^:]+(:[\\d]+|[^:]*)$";
     private final String consumerNode;
     private final String bootstrapServers;
     private final int targetMessagesPerSec;
@@ -41,7 +86,8 @@
     private final Map<String, String> consumerConf;
     private final Map<String, String> adminClientConf;
     private final Map<String, String> commonClientConf;
-    private final TopicsSpec activeTopics;
+    private final List<String> activeTopics;
+    private final String consumerGroup;
 
     @JsonCreator
     public ConsumeBenchSpec(@JsonProperty("startMs") long startMs,
@@ -50,10 +96,11 @@ public ConsumeBenchSpec(@JsonProperty("startMs") long 
startMs,
                             @JsonProperty("bootstrapServers") String 
bootstrapServers,
                             @JsonProperty("targetMessagesPerSec") int 
targetMessagesPerSec,
                             @JsonProperty("maxMessages") int maxMessages,
+                            @JsonProperty("consumerGroup") String 
consumerGroup,
                             @JsonProperty("consumerConf") Map<String, String> 
consumerConf,
                             @JsonProperty("commonClientConf") Map<String, 
String> commonClientConf,
                             @JsonProperty("adminClientConf") Map<String, 
String> adminClientConf,
-                            @JsonProperty("activeTopics") TopicsSpec 
activeTopics) {
+                            @JsonProperty("activeTopics") List<String> 
activeTopics) {
         super(startMs, durationMs);
         this.consumerNode = (consumerNode == null) ? "" : consumerNode;
         this.bootstrapServers = (bootstrapServers == null) ? "" : 
bootstrapServers;
@@ -62,7 +109,8 @@ public ConsumeBenchSpec(@JsonProperty("startMs") long 
startMs,
         this.consumerConf = configOrEmptyMap(consumerConf);
         this.commonClientConf = configOrEmptyMap(commonClientConf);
         this.adminClientConf = configOrEmptyMap(adminClientConf);
-        this.activeTopics = activeTopics == null ? TopicsSpec.EMPTY : 
activeTopics.immutableCopy();
+        this.activeTopics = activeTopics == null ? new ArrayList<>() : 
activeTopics;
+        this.consumerGroup = consumerGroup == null ? EMPTY_CONSUMER_GROUP : 
consumerGroup;
     }
 
     @JsonProperty
@@ -70,6 +118,11 @@ public String consumerNode() {
         return consumerNode;
     }
 
+    @JsonProperty
+    public String consumerGroup() {
+        return consumerGroup;
+    }
+
     @JsonProperty
     public String bootstrapServers() {
         return bootstrapServers;
@@ -101,22 +154,67 @@ public int maxMessages() {
     }
 
     @JsonProperty
-    public TopicsSpec activeTopics() {
+    public List<String> activeTopics() {
         return activeTopics;
     }
 
     @Override
     public TaskController newController(String id) {
-        return new TaskController() {
-            @Override
-            public Set<String> targetNodes(Topology topology) {
-                return Collections.singleton(consumerNode);
-            }
-        };
+        return topology -> Collections.singleton(consumerNode);
     }
 
     @Override
     public TaskWorker newTaskWorker(String id) {
         return new ConsumeBenchWorker(id, this);
     }
+
+    /**
+     * Materializes a list of topic names (optionally with ranges) into a map 
of the topics and their partitions
+     *
+     * Example:
+     * ['foo[1-3]', 'foobar:2', 'bar[1-2]:[1-2]'] => {'foo1': [], 'foo2': [], 
'foo3': [], 'foobar': [2],
+     *                                                'bar1': [1, 2], 'bar2': 
[1, 2] }
+     */
+    Map<String, List<TopicPartition>> materializeTopics() {
+        Map<String, List<TopicPartition>> partitionsByTopics = new HashMap<>();
+
+        for (String rawTopicName : this.activeTopics) {
+            Set<String> expandedNames = expandTopicName(rawTopicName);
+            if 
(!expandedNames.iterator().next().matches(VALID_EXPANDED_TOPIC_NAME_PATTERN))
+                throw new IllegalArgumentException(String.format("Expanded 
topic name %s is invalid", rawTopicName));
+
+            for (String topicName : expandedNames) {
+                TopicPartition partition = null;
+                if (topicName.contains(":")) {
+                    String[] topicAndPartition = topicName.split(":");
+                    topicName = topicAndPartition[0];
+                    partition = new TopicPartition(topicName, 
Integer.parseInt(topicAndPartition[1]));
+                }
+                if (!partitionsByTopics.containsKey(topicName)) {
+                    partitionsByTopics.put(topicName, new ArrayList<>());
+                }
+                if (partition != null) {
+                    partitionsByTopics.get(topicName).add(partition);
+                }
+            }
+        }
+
+        return partitionsByTopics;
+    }
+
+    /**
+     * Expands a topic name until there are no more ranges in it
+     */
+    private Set<String> expandTopicName(String topicName) {
+        Set<String> expandedNames = StringExpander.expand(topicName);
+        if (expandedNames.size() == 1) {
+            return expandedNames;
+        }
+
+        Set<String> newNames = new HashSet<>();
+        for (String name : expandedNames) {
+            newNames.addAll(expandTopicName(name));
+        }
+        return newNames;
+    }
 }
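
The `activeTopics` notation described in the ConsumeBenchSpec Javadoc above can be illustrated with a small standalone sketch. This is not the PR's code: `TopicNotationSketch` and its `expand` helper are hypothetical stand-ins (the real implementation delegates to Trogdor's `StringExpander`), and partitions are plain integers rather than `TopicPartition` objects so that the example has no Kafka dependency. It assumes ranges are written as `[start-end]` and that a topic name contains at most one colon.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class TopicNotationSketch {
    // Matches the first "[start-end]" range in a string, e.g. "foo[1-3]".
    private static final Pattern RANGE =
        Pattern.compile("^(.*?)\\[(\\d+)-(\\d+)\\](.*)$");

    // Hypothetical stand-in for StringExpander: expands every range,
    // left to right, until no range is left in the string.
    static List<String> expand(String value) {
        List<String> out = new ArrayList<>();
        Matcher m = RANGE.matcher(value);
        if (!m.matches()) {
            out.add(value);
            return out;
        }
        int start = Integer.parseInt(m.group(2));
        int end = Integer.parseInt(m.group(3));
        for (int i = start; i <= end; i++) {
            out.addAll(expand(m.group(1) + i + m.group(4)));
        }
        return out;
    }

    // Maps each topic to the partition numbers listed for it explicitly.
    // An empty list means no partition was given for that topic.
    static Map<String, List<Integer>> materialize(List<String> activeTopics) {
        Map<String, List<Integer>> byTopic = new LinkedHashMap<>();
        for (String raw : activeTopics) {
            for (String name : expand(raw)) {
                String[] parts = name.split(":");
                List<Integer> partitions =
                    byTopic.computeIfAbsent(parts[0], k -> new ArrayList<>());
                if (parts.length == 2) {
                    partitions.add(Integer.parseInt(parts[1]));
                }
            }
        }
        return byTopic;
    }

    public static void main(String[] args) {
        System.out.println(
            materialize(Arrays.asList("foo[1-2]", "foobar:2", "bar[1-2]:[1-2]")));
    }
}
```

Topics listed without a partition end up with an empty list, which mirrors the shape shown in the `materializeTopics()` Javadoc example.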
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java 
b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java
index c3a90e4da6a..b0998f0bdcf 100644
--- 
a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java
+++ 
b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java
@@ -39,9 +39,10 @@
 import org.apache.kafka.trogdor.task.TaskWorker;
 
 import java.time.Duration;
-import java.util.Collection;
-import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
 import java.util.Properties;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Executors;
@@ -49,6 +50,7 @@
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
 
 public class ConsumeBenchWorker implements TaskWorker {
     private static final Logger log = 
LoggerFactory.getLogger(ConsumeBenchWorker.class);
@@ -86,18 +88,62 @@ public void start(Platform platform, WorkerStatusTracker 
status,
         @Override
         public void run() {
             try {
-                HashSet<TopicPartition> partitions = new HashSet<>();
-                for (Map.Entry<String, PartitionsSpec> entry : 
spec.activeTopics().materialize().entrySet()) {
-                    for (Integer partitionNumber : 
entry.getValue().partitionNumbers()) {
-                        partitions.add(new TopicPartition(entry.getKey(), 
partitionNumber));
-                    }
-                }
-                log.info("Will consume from {}", partitions);
-                executor.submit(new ConsumeMessages(partitions));
+                executor.submit(consumeTask());
             } catch (Throwable e) {
                 WorkerUtils.abort(log, "Prepare", e, doneFuture);
             }
         }
+
+        private ConsumeMessages consumeTask() {
+            String consumerGroup = spec.consumerGroup();
+            Map<String, List<TopicPartition>> partitionsByTopic = 
spec.materializeTopics();
+            boolean toUseGroupPartitionAssignment = 
partitionsByTopic.values().isEmpty();
+
+            if (consumerGroup.equals(ConsumeBenchSpec.EMPTY_CONSUMER_GROUP)) // consumer group is undefined, the consumer should use a random group
+                consumerGroup = generateConsumerGroup();
+
+            consumer = consumer(consumerGroup);
+            if (!toUseGroupPartitionAssignment)
+                partitionsByTopic = populatePartitionsByTopic(consumer, 
partitionsByTopic);
+
+            return new ConsumeMessages(consumer, partitionsByTopic, 
toUseGroupPartitionAssignment);
+        }
+
+        private KafkaConsumer<byte[], byte[]> consumer(String consumerGroup) {
+            Properties props = new Properties();
+            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
spec.bootstrapServers());
+            props.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer." + id);
+            props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
+            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+            props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 100000);
+            // these defaults may be overwritten by the user-specified commonClientConf or consumerConf
+            WorkerUtils.addConfigsToProperties(props, spec.commonClientConf(), 
spec.consumerConf());
+            return new KafkaConsumer<>(props, new ByteArrayDeserializer(), new 
ByteArrayDeserializer());
+        }
+
+        private String generateConsumerGroup() {
+            return "consume-bench-" + UUID.randomUUID().toString();
+        }
+
+        private Map<String, List<TopicPartition>> 
populatePartitionsByTopic(KafkaConsumer<byte[], byte[]> consumer,
+                                                                         
Map<String, List<TopicPartition>> materializedTopics) {
+            // fetch partitions for topics that do not have any partitions listed
+            for (Map.Entry<String, List<TopicPartition>> entry : 
materializedTopics.entrySet()) {
+                String topicName = entry.getKey();
+                List<TopicPartition> partitions = entry.getValue();
+
+                if (partitions.isEmpty()) {
+                    List<TopicPartition> fetchedPartitions = 
consumer.partitionsFor(topicName).stream()
+                        .map(partitionInfo -> new 
TopicPartition(partitionInfo.topic(), partitionInfo.partition()))
+                        .collect(Collectors.toList());
+                    partitions.addAll(fetchedPartitions);
+                }
+
+                materializedTopics.put(topicName, partitions);
+            }
+
+            return materializedTopics;
+        }
     }
 
     public class ConsumeMessages implements Callable<Void> {
@@ -105,24 +151,26 @@ public void run() {
         private final Histogram messageSizeHistogram;
         private final Future<?> statusUpdaterFuture;
         private final Throttle throttle;
+        private final KafkaConsumer<byte[], byte[]> consumer;
 
-        ConsumeMessages(Collection<TopicPartition> topicPartitions) {
+        ConsumeMessages(KafkaConsumer<byte[], byte[]> consumer, Map<String, 
List<TopicPartition>> topicPartitionsByTopic,
+                        boolean toUseGroupAssignment) {
             this.latencyHistogram = new Histogram(5000);
             this.messageSizeHistogram = new Histogram(2 * 1024 * 1024);
             this.statusUpdaterFuture = executor.scheduleAtFixedRate(
                 new StatusUpdater(latencyHistogram, messageSizeHistogram), 1, 
1, TimeUnit.MINUTES);
-            Properties props = new Properties();
-            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
spec.bootstrapServers());
-            props.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer." + id);
-            props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-group-1");
-            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-            props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 100000);
-            // these defaults maybe over-written by the user-specified 
commonClientConf or
-            // consumerConf
-            WorkerUtils.addConfigsToProperties(props, spec.commonClientConf(), 
spec.consumerConf());
-            consumer = new KafkaConsumer<>(props, new ByteArrayDeserializer(),
-                                           new ByteArrayDeserializer());
-            consumer.assign(topicPartitions);
+            this.consumer = consumer;
+            if (toUseGroupAssignment) {
+                Set<String> topics = topicPartitionsByTopic.keySet();
+                log.info("Will consume from topics {} via dynamic group 
assignment.", topics);
+                this.consumer.subscribe(topics);
+            } else {
+                List<TopicPartition> partitions = 
topicPartitionsByTopic.values().stream()
+                    .flatMap(List::stream).collect(Collectors.toList());
+                log.info("Will consume from topic partitions {} via manual 
assignment.", partitions);
+                this.consumer.assign(partitions);
+            }
+
             int perPeriod = WorkerUtils.perSecToPerPeriod(
                 spec.targetMessagesPerSec(), THROTTLE_PERIOD_MS);
             this.throttle = new Throttle(perPeriod, THROTTLE_PERIOD_MS);
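
One detail of the subscribe-versus-assign decision in `consumeTask()` above is worth a standalone illustration. In Java, `Map.values().isEmpty()` is true only when the map has no entries at all, whereas `materializeTopics()` returns topic keys mapped to empty lists when no partition is given. The sketch below contrasts that check with one that inspects the lists themselves. `AssignmentModeSketch` and `noPartitionsListed` are hypothetical names, partitions are plain integers to avoid a Kafka dependency, and whether the per-list check matches the author's intent is an assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class AssignmentModeSketch {
    // True when no topic lists an explicit partition.
    // Note: allMatch on an empty stream is true, so an empty map also qualifies.
    static boolean noPartitionsListed(Map<String, List<Integer>> partitionsByTopic) {
        return partitionsByTopic.values().stream().allMatch(List::isEmpty);
    }

    public static void main(String[] args) {
        // Shape produced for "activeTopics": ["foo[1-2]"] (topics only).
        Map<String, List<Integer>> topicsOnly = new LinkedHashMap<>();
        topicsOnly.put("foo1", new ArrayList<>());
        topicsOnly.put("foo2", new ArrayList<>());

        // values() holds two (empty) lists, so the collection is not empty.
        System.out.println("values().isEmpty(): " + topicsOnly.values().isEmpty());
        System.out.println("noPartitionsListed: " + noPartitionsListed(topicsOnly));

        // Shape produced for ["foo:1", "bar"]: at least one explicit partition.
        Map<String, List<Integer>> mixed = new LinkedHashMap<>();
        mixed.put("foo", new ArrayList<>(Arrays.asList(1)));
        mixed.put("bar", new ArrayList<>());
        System.out.println("noPartitionsListed: " + noPartitionsListed(mixed));
    }
}
```

With the per-list check, a topics-only specification would select the `subscribe` path and a specification with at least one topic:partition pair would select `assign`, which is the behavior the ConsumeBenchSpec Javadoc describes.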
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/workload/TopicsSpec.java 
b/tools/src/main/java/org/apache/kafka/trogdor/workload/TopicsSpec.java
index a9b550d648c..dcb8d8ad50b 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/TopicsSpec.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/TopicsSpec.java
@@ -80,9 +80,10 @@ public TopicsSpec immutableCopy() {
     public Map<String, PartitionsSpec> materialize() {
         HashMap<String, PartitionsSpec> all = new HashMap<>();
         for (Map.Entry<String, PartitionsSpec> entry : map.entrySet()) {
-            for (String topicName : StringExpander.expand(entry.getKey())) {
-                all.put(topicName, entry.getValue());
-            }
+            String topicName = entry.getKey();
+            PartitionsSpec partitions = entry.getValue();
+            for (String expandedTopicName : StringExpander.expand(topicName))
+                all.put(expandedTopicName, partitions);
         }
         return all;
     }
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpecTest.java b/tools/src/test/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpecTest.java
new file mode 100644
index 00000000000..117954b7caa
--- /dev/null
+++ b/tools/src/test/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpecTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.workload;
+
+import org.apache.kafka.common.TopicPartition;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class ConsumeBenchSpecTest {
+
+    @Test
+    public void testMaterializeTopicsWithNoPartitions() {
+        Map<String, List<TopicPartition>> materializedTopics = consumeBenchSpec(Arrays.asList("topic[1-3]", "secondTopic")).materializeTopics();
+        Map<String, List<TopicPartition>> expected = new HashMap<>();
+        expected.put("topic1", new ArrayList<>());
+        expected.put("topic2", new ArrayList<>());
+        expected.put("topic3", new ArrayList<>());
+        expected.put("secondTopic", new ArrayList<>());
+
+        assertEquals(expected, materializedTopics);
+    }
+
+    @Test
+    public void testMaterializeTopicsWithSomePartitions() {
+        Map<String, List<TopicPartition>> materializedTopics = consumeBenchSpec(Arrays.asList("topic[1-3]:[1-5]", "secondTopic", "thirdTopic:1")).materializeTopics();
+        Map<String, List<TopicPartition>> expected = new HashMap<>();
+        expected.put("topic1", IntStream.range(1, 6).asLongStream().mapToObj(i -> new TopicPartition("topic1", (int) i)).collect(Collectors.toList()));
+        expected.put("topic2", IntStream.range(1, 6).asLongStream().mapToObj(i -> new TopicPartition("topic2", (int) i)).collect(Collectors.toList()));
+        expected.put("topic3", IntStream.range(1, 6).asLongStream().mapToObj(i -> new TopicPartition("topic3", (int) i)).collect(Collectors.toList()));
+        expected.put("secondTopic", new ArrayList<>());
+        expected.put("thirdTopic", Collections.singletonList(new TopicPartition("thirdTopic", 1)));
+
+        assertEquals(expected, materializedTopics);
+    }
+
+    @Test
+    public void testInvalidTopicNameRaisesExceptionInMaterialize() {
+        for (String invalidName : Arrays.asList("In:valid", "invalid:", ":invalid", "in:valid:1", "invalid:2:2", "invalid::1", "invalid[1-3]:")) {
+            try {
+                consumeBenchSpec(Collections.singletonList(invalidName)).materializeTopics();
+                fail(String.format("Invalid topic name (%s) should have raised an exception.", invalidName));
+            } catch (IllegalArgumentException ignored) { }
+        }
+
+    }
+
+    private ConsumeBenchSpec consumeBenchSpec(List<String> activeTopics) {
+        return new ConsumeBenchSpec(0, 0, "node", "localhost",
+            123, 1234, "cg-1",
+            Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), activeTopics);
+    }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Trogdor - Add Consumer Group Benchmark Specification
> ----------------------------------------------------
>
>                 Key: KAFKA-7515
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7515
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Stanislav Kozlovski
>            Assignee: Stanislav Kozlovski
>            Priority: Minor
>
> Trogdor's `ConsumeBenchWorker` and `ConsumeBenchSpec` currently take 
> specific topic partitions and assign a consumer to them 
> (https://github.com/apache/kafka/blob/509dd95ebbf03681ea680a84b8436814ba3e7541/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java#L125).
> It is useful to have functionality that supports consumer group usage, since 
> most Kafka consumers in practice subscribe to topics, not specific 
> partitions. Using the `subscribe()` API will also make use of the consumer 
> group, allowing for more flexible benchmark tests (e.g., consuming a topic 
> while creating a new partition).
> This will also allow for benchmarking more real-life scenarios, such as spinning 
> up multiple consumers in a consumer group by spawning multiple Trogdor 
> agents (or multiple consumers in one agent if 
> https://issues.apache.org/jira/browse/KAFKA-7514 gets accepted).
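
For readers skimming the diff, the topic-spec handling exercised by ConsumeBenchSpecTest can be illustrated with a small standalone sketch. It is not the Trogdor implementation: the class and method names (TopicSpecSketch, expand, materialize, useGroupAssignment) are hypothetical, partitions are plain integers rather than TopicPartition objects, and the validation is a simplification. The rule that group assignment is used only when no topic lists explicit partitions is an assumption inferred from the diff, not something the diff states outright.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch; none of these names exist in Trogdor.
class TopicSpecSketch {
    // Matches one bracketed numeric range such as "[1-3]".
    private static final Pattern RANGE = Pattern.compile("\\[(\\d+)-(\\d+)\\]");

    // Expands the first "[a-b]" range: "topic[1-3]" -> topic1, topic2, topic3.
    // A string without a range is returned unchanged as a single element.
    static List<String> expand(String s) {
        List<String> out = new ArrayList<>();
        Matcher m = RANGE.matcher(s);
        if (!m.find()) {
            out.add(s);
            return out;
        }
        int lo = Integer.parseInt(m.group(1));
        int hi = Integer.parseInt(m.group(2));
        for (int i = lo; i <= hi; i++) {
            out.add(s.substring(0, m.start()) + i + s.substring(m.end()));
        }
        return out;
    }

    // Parses specs of the form "name" or "name:partitions" into topic -> partition list.
    // An empty list means "no explicit partitions were requested for this topic".
    static Map<String, List<Integer>> materialize(List<String> specs) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        for (String spec : specs) {
            String[] parts = spec.split(":", -1);
            boolean malformed = parts.length > 2 || parts[0].isEmpty()
                || (parts.length == 2 && parts[1].isEmpty());
            if (malformed) {
                throw new IllegalArgumentException("Invalid topic spec: " + spec);
            }
            List<Integer> partitions = new ArrayList<>();
            if (parts.length == 2) {
                for (String p : expand(parts[1])) {
                    // NumberFormatException is a subclass of IllegalArgumentException.
                    partitions.add(Integer.parseInt(p));
                }
            }
            for (String topic : expand(parts[0])) {
                result.put(topic, new ArrayList<>(partitions));
            }
        }
        return result;
    }

    // Assumed rule: subscribe() (group assignment) only if no topic names partitions.
    static boolean useGroupAssignment(Map<String, List<Integer>> topics) {
        return topics.values().stream().allMatch(List::isEmpty);
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> topics =
            materialize(Arrays.asList("topic[1-3]:[1-5]", "secondTopic", "thirdTopic:1"));
        System.out.println(topics);
        System.out.println("group assignment: " + useGroupAssignment(topics));
    }
}
```

In the worker itself, the branch in the diff above then calls either `subscribe(topics)` or `assign(partitions)` on the consumer; the sketch only covers the spec parsing and the choice between the two modes.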



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
