This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 442d362 MINOR: add useConfiguredPartitioner and skipFlush options for ProduceBench 442d362 is described below commit 442d36241bea6e83c75ed7513e48099b8d73ef39 Author: jolshan <jols...@confluent.io> AuthorDate: Wed Jul 3 17:23:37 2019 -0700 MINOR: add useConfiguredPartitioner and skipFlush options for ProduceBench Add a "useConfiguredPartitioner" boolean to specify testing with the configured partitioner, rather than overriding the partitioner in the test. Add a "skipFlush" boolean to specify skipping the flush operation when producing. This is helpful when testing some scenarios where linger.ms is greater than 0. Reviewers: Colin P. McCabe <cmcc...@apache.org> --- .../kafka/trogdor/workload/ProduceBenchSpec.java | 18 +++++++++++++++++- .../kafka/trogdor/workload/ProduceBenchWorker.java | 16 +++++++++++++--- .../kafka/trogdor/common/JsonSerializationTest.java | 2 +- 3 files changed, 31 insertions(+), 5 deletions(-) diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java index 34b5393..9f6a907 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java @@ -71,6 +71,8 @@ public class ProduceBenchSpec extends TaskSpec { private final Map<String, String> commonClientConf; private final TopicsSpec activeTopics; private final TopicsSpec inactiveTopics; + private final boolean useConfiguredPartitioner; + private final boolean skipFlush; @JsonCreator public ProduceBenchSpec(@JsonProperty("startMs") long startMs, @@ -86,7 +88,9 @@ public class ProduceBenchSpec extends TaskSpec { @JsonProperty("commonClientConf") Map<String, String> commonClientConf, @JsonProperty("adminClientConf") Map<String, String> adminClientConf, @JsonProperty("activeTopics") TopicsSpec activeTopics, - @JsonProperty("inactiveTopics") TopicsSpec inactiveTopics) { + @JsonProperty("inactiveTopics") TopicsSpec inactiveTopics, + @JsonProperty("useConfiguredPartitioner") boolean useConfiguredPartitioner, + @JsonProperty("skipFlush") boolean skipFlush) { super(startMs, durationMs); this.producerNode = (producerNode == null) ? "" : producerNode; this.bootstrapServers = (bootstrapServers == null) ? "" : bootstrapServers; @@ -104,6 +108,8 @@ public class ProduceBenchSpec extends TaskSpec { TopicsSpec.EMPTY : activeTopics.immutableCopy(); this.inactiveTopics = (inactiveTopics == null) ? TopicsSpec.EMPTY : inactiveTopics.immutableCopy(); + this.useConfiguredPartitioner = useConfiguredPartitioner; + this.skipFlush = skipFlush; } @JsonProperty @@ -166,6 +172,16 @@ public class ProduceBenchSpec extends TaskSpec { return inactiveTopics; } + @JsonProperty + public boolean useConfiguredPartitioner() { + return useConfiguredPartitioner; + } + + @JsonProperty + public boolean skipFlush() { + return skipFlush; + } + @Override public TaskController newController(String id) { return topology -> Collections.singleton(producerNode); diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java index 84b94d5..9b5159a 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java @@ -214,7 +214,11 @@ public class ProduceBenchWorker implements TaskWorker { this.producer = new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer()); this.keys = new PayloadIterator(spec.keyGenerator()); this.values = new PayloadIterator(spec.valueGenerator()); - this.throttle = new SendRecordsThrottle(perPeriod, producer); + if (spec.skipFlush()) { + this.throttle = new Throttle(perPeriod, THROTTLE_PERIOD_MS); + } else { + this.throttle = new SendRecordsThrottle(perPeriod, producer); + } } @Override @@ -289,8 +293,14 @@ public class ProduceBenchWorker implements TaskWorker { partitionsIterator = activePartitions.iterator(); TopicPartition partition = partitionsIterator.next(); - ProducerRecord<byte[], byte[]> record = new ProducerRecord<>( - partition.topic(), partition.partition(), keys.next(), values.next()); + ProducerRecord<byte[], byte[]> record; + if (spec.useConfiguredPartitioner()) { + record = new ProducerRecord<>( + partition.topic(), keys.next(), values.next()); + } else { + record = new ProducerRecord<>( + partition.topic(), partition.partition(), keys.next(), values.next()); + } sendFuture = producer.send(record, new SendRecordsCallback(this, Time.SYSTEM.milliseconds())); throttle.increment(); diff --git a/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java b/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java index c324ec4..90002ca 100644 --- a/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java +++ b/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java @@ -55,7 +55,7 @@ public class JsonSerializationTest { verify(new WorkerRunning(null, null, 0, null)); verify(new WorkerStopping(null, null, 0, null)); verify(new ProduceBenchSpec(0, 0, null, null, - 0, 0, null, null, Optional.empty(), null, null, null, null, null)); + 0, 0, null, null, Optional.empty(), null, null, null, null, null, false, false)); verify(new RoundTripWorkloadSpec(0, 0, null, null, null, null, null, null, 0, null, null, 0)); verify(new TopicsSpec());