This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 442d362  MINOR: add useConfiguredPartitioner and skipFlush options for 
ProduceBench
442d362 is described below

commit 442d36241bea6e83c75ed7513e48099b8d73ef39
Author: jolshan <jols...@confluent.io>
AuthorDate: Wed Jul 3 17:23:37 2019 -0700

    MINOR: add useConfiguredPartitioner and skipFlush options for ProduceBench
    
    Add a "useConfiguredPartitioner" boolean to specify testing with the 
configured partitioner, rather than overriding the partitioner in the test.
    
    Add a "skipFlush" boolean to specify skipping the flush operation when 
producing.  This is helpful when testing some scenarios where linger.ms is 
greater than 0.
    
    Reviewers: Colin P. McCabe <cmcc...@apache.org>
---
 .../kafka/trogdor/workload/ProduceBenchSpec.java       | 18 +++++++++++++++++-
 .../kafka/trogdor/workload/ProduceBenchWorker.java     | 16 +++++++++++++---
 .../kafka/trogdor/common/JsonSerializationTest.java    |  2 +-
 3 files changed, 31 insertions(+), 5 deletions(-)

diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java 
b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java
index 34b5393..9f6a907 100644
--- 
a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java
+++ 
b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java
@@ -71,6 +71,8 @@ public class ProduceBenchSpec extends TaskSpec {
     private final Map<String, String> commonClientConf;
     private final TopicsSpec activeTopics;
     private final TopicsSpec inactiveTopics;
+    private final boolean useConfiguredPartitioner;
+    private final boolean skipFlush;
 
     @JsonCreator
     public ProduceBenchSpec(@JsonProperty("startMs") long startMs,
@@ -86,7 +88,9 @@ public class ProduceBenchSpec extends TaskSpec {
                          @JsonProperty("commonClientConf") Map<String, String> 
commonClientConf,
                          @JsonProperty("adminClientConf") Map<String, String> 
adminClientConf,
                          @JsonProperty("activeTopics") TopicsSpec activeTopics,
-                         @JsonProperty("inactiveTopics") TopicsSpec 
inactiveTopics) {
+                         @JsonProperty("inactiveTopics") TopicsSpec 
inactiveTopics,
+                         @JsonProperty("useConfiguredPartitioner") boolean 
useConfiguredPartitioner, 
+                         @JsonProperty("skipFlush") boolean skipFlush) {
         super(startMs, durationMs);
         this.producerNode = (producerNode == null) ? "" : producerNode;
         this.bootstrapServers = (bootstrapServers == null) ? "" : 
bootstrapServers;
@@ -104,6 +108,8 @@ public class ProduceBenchSpec extends TaskSpec {
             TopicsSpec.EMPTY : activeTopics.immutableCopy();
         this.inactiveTopics = (inactiveTopics == null) ?
             TopicsSpec.EMPTY : inactiveTopics.immutableCopy();
+        this.useConfiguredPartitioner = useConfiguredPartitioner;
+        this.skipFlush = skipFlush;
     }
 
     @JsonProperty
@@ -166,6 +172,16 @@ public class ProduceBenchSpec extends TaskSpec {
         return inactiveTopics;
     }
 
+    @JsonProperty
+    public boolean useConfiguredPartitioner() {
+        return useConfiguredPartitioner;
+    }
+
+    @JsonProperty
+    public boolean skipFlush() {
+        return skipFlush;
+    }
+
     @Override
     public TaskController newController(String id) {
         return topology -> Collections.singleton(producerNode);
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java 
b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java
index 84b94d5..9b5159a 100644
--- 
a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java
+++ 
b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java
@@ -214,7 +214,11 @@ public class ProduceBenchWorker implements TaskWorker {
             this.producer = new KafkaProducer<>(props, new 
ByteArraySerializer(), new ByteArraySerializer());
             this.keys = new PayloadIterator(spec.keyGenerator());
             this.values = new PayloadIterator(spec.valueGenerator());
-            this.throttle = new SendRecordsThrottle(perPeriod, producer);
+            if (spec.skipFlush()) {
+                this.throttle = new Throttle(perPeriod, THROTTLE_PERIOD_MS);
+            } else {
+                this.throttle = new SendRecordsThrottle(perPeriod, producer);
+            }
         }
 
         @Override
@@ -289,8 +293,14 @@ public class ProduceBenchWorker implements TaskWorker {
                 partitionsIterator = activePartitions.iterator();
 
             TopicPartition partition = partitionsIterator.next();
-            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
-                partition.topic(), partition.partition(), keys.next(), 
values.next());
+            ProducerRecord<byte[], byte[]> record;
+            if (spec.useConfiguredPartitioner()) {
+                record = new ProducerRecord<>(
+                    partition.topic(), keys.next(), values.next());
+            } else {
+                record = new ProducerRecord<>(
+                    partition.topic(), partition.partition(), keys.next(), 
values.next());
+            }
             sendFuture = producer.send(record,
                 new SendRecordsCallback(this, Time.SYSTEM.milliseconds()));
             throttle.increment();
diff --git 
a/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java
 
b/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java
index c324ec4..90002ca 100644
--- 
a/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java
+++ 
b/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java
@@ -55,7 +55,7 @@ public class JsonSerializationTest {
         verify(new WorkerRunning(null, null, 0, null));
         verify(new WorkerStopping(null, null, 0, null));
         verify(new ProduceBenchSpec(0, 0, null, null,
-            0, 0, null, null, Optional.empty(), null, null, null, null, null));
+            0, 0, null, null, Optional.empty(), null, null, null, null, null, 
false, false));
         verify(new RoundTripWorkloadSpec(0, 0, null, null, null, null, null, 
null,
             0, null, null, 0));
         verify(new TopicsSpec());

Reply via email to