[ https://issues.apache.org/jira/browse/KAFKA-7514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Stanislav Kozlovski resolved KAFKA-7514.
----------------------------------------
    Resolution: Fixed

> Trogdor - Support Multiple Threads in ConsumeBenchWorker
> --------------------------------------------------------
>
>                 Key: KAFKA-7514
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7514
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Stanislav Kozlovski
>            Assignee: Stanislav Kozlovski
>            Priority: Minor
>
> Trogdor's ConsumeBenchWorker currently uses only two threads - one for the
> StatusUpdater:
> {code:java}
> this.statusUpdaterFuture = executor.scheduleAtFixedRate(
>     new StatusUpdater(latencyHistogram, messageSizeHistogram), 1, 1,
>     TimeUnit.MINUTES);
> {code}
> and one for the consumer task itself:
> {code:java}
> executor.submit(new ConsumeMessages(partitions));
> {code}
> A sample ConsumeBenchSpec specification in JSON looks like this:
> {code:java}
> {
>     "class": "org.apache.kafka.trogdor.workload.ConsumeBenchSpec",
>     "durationMs": 10000000,
>     "consumerNode": "node0",
>     "bootstrapServers": "localhost:9092",
>     "maxMessages": 100,
>     "activeTopics": {
>         "foo[1-3]": {
>             "numPartitions": 3,
>             "replicationFactor": 1
>         }
>     }
> }
> {code}
>
> h2. Motivation
> This does not make the best use of machines with multiple cores. It would be
> useful to have a way to configure the ConsumeBenchSpec to use multiple
> threads and spawn multiple consumers. The added parallelism would also let
> the ConsumeBenchWorker sustain higher throughput.
>
> h2. Proposal
> Add a new `consumerCount` property to the ConsumeBenchSpec, allowing you to
> run multiple consumers in parallel.
> h2. Changes
> By default, `consumerCount` will have a value of 1.
> `activeTopics` will still be defined in the same way. The topics will be
> evenly assigned to the consumers in a round-robin fashion.
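The round-robin split described in the quoted text can be sketched roughly as follows. This is only an illustration, not the code from the actual patch: the class `RoundRobinSketch` and the method `assign` are hypothetical names, and the sketch assumes the `activeTopics` pattern has already been expanded into a plain list of topic names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; these names do not exist in Trogdor.
class RoundRobinSketch {
    // Deals the topics out to consumerCount consumers, one topic at a time,
    // wrapping back to the first consumer after the last one.
    public static List<List<String>> assign(List<String> topics, int consumerCount) {
        if (consumerCount < 1) {
            throw new IllegalArgumentException("consumerCount must be at least 1");
        }
        List<List<String>> assignments = new ArrayList<>();
        for (int i = 0; i < consumerCount; i++) {
            assignments.add(new ArrayList<>());
        }
        for (int i = 0; i < topics.size(); i++) {
            // Topic i goes to consumer (i mod consumerCount).
            assignments.get(i % consumerCount).add(topics.get(i));
        }
        return assignments;
    }

    public static void main(String[] args) {
        // Assumed to be the already-expanded form of a pattern such as "foo[1-4]".
        List<String> topics = Arrays.asList("foo1", "foo2", "foo3", "foo4");
        List<List<String>> result = assign(topics, 2);
        for (int i = 0; i < result.size(); i++) {
            System.out.println("consumer " + (i + 1) + " -> " + result.get(i));
        }
    }
}
```

With four topics and two consumers, this prints `consumer 1 -> [foo1, foo3]` and `consumer 2 -> [foo2, foo4]`. With the default count of 1, every topic lands on the single consumer.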
> For example, if we have this configuration:
> {code:java}
> {
>     "class": "org.apache.kafka.trogdor.workload.ConsumeBenchSpec",
>     "durationMs": 10000000,
>     "consumerNode": "node0",
>     "bootstrapServers": "localhost:9092",
>     "maxMessages": 100,
>     "consumerCount": 2,
>     "activeTopics": {
>         "foo[1-4]": {
>             "numPartitions": 4,
>             "replicationFactor": 1
>         }
>     }
> }{code}
> consumer 1 will be assigned partitions [foo1, foo3]
> consumer 2 will be assigned partitions [foo2, foo4]
> and the ConsumeBenchWorker will spawn 4 threads in total in the executor (2
> for every consumer).
>
> The `maxMessages` and `targetMessagesPerSec` limits will be counted
> independently for every consumer.
> h3. Status
> The way the worker's status is reported will change as well.
> A ConsumeBenchWorker shows the following status when queried with
> `./bin/trogdor.sh client --show-tasks localhost:8889`:
>
> {code:java}
> "tasks" : { "consume_bench_19938" : { "state" : "DONE", "spec" : { "class" :
> "org.apache.kafka.trogdor.workload.ConsumeBenchSpec",
> ...
> "status" : { "totalMessagesReceived" : 190, "totalBytesReceived" : 98040,
> "averageMessageSizeBytes" : 516, "averageLatencyMs" : 449.0, "p50LatencyMs" :
> 449, "p95LatencyMs" : 449, "p99LatencyMs" : 449 } },{code}
> We will change it to show the status of every separate consumer and the topic
> partitions it was assigned to:
> {code:java}
> "tasks" : {
>   "consume_bench_19938" :
>   {
>     "state" : "DONE",
>     "spec" : { "class" : "org.apache.kafka.trogdor.workload.ConsumeBenchSpec",
>     ... }
>     ...
>     "status":{
>       "consumer-1":{
>         "assignedPartitions":[
>           "foo1",
>           "foo3"
>         ],
>         "totalMessagesReceived":190,
>         "totalBytesReceived":98040,
>         "averageMessageSizeBytes":516,
>         "averageLatencyMs":449.0,
>         "p50LatencyMs":449,
>         "p95LatencyMs":449,
>         "p99LatencyMs":449
>       },
>       "consumer-2":{
>         "assignedPartitions":[
>           "foo2",
>           "foo4"
>         ],
>         "totalMessagesReceived":190,
>         "totalBytesReceived":98040,
>         "averageMessageSizeBytes":516,
>         "averageLatencyMs":449.0,
>         "p50LatencyMs":449,
>         "p95LatencyMs":449,
>         "p99LatencyMs":449
>       }
>     }
>   },{code}
>
> h2. Backwards Compatibility
> This change should be mostly backwards-compatible. If `consumerCount`
> is not passed, only one consumer will be created and the round-robin
> assignor will assign every partition to it.
> The only change will be in the format of the reported status. Even with one
> consumer, we will still show a status similar to
> {code:java}
> "status":{
>   "consumer-1":{
>     "assignedPartitions":[
>       "foo1",
>       "foo3"
>     ],
>     "totalMessagesReceived":190,
>     "totalBytesReceived":98040,
>     "averageMessageSizeBytes":516,
>     "averageLatencyMs":449.0,
>     "p50LatencyMs":449,
>     "p95LatencyMs":449,
>     "p99LatencyMs":449
>   }
> }
> {code}
>

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)