This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new baef516  Add ConfigurableProducerSpec to Trogdor for improved E2E 
latency tracking. (#9736)
baef516 is described below

commit baef516789f0d80ce1faba5d257a905ecb27e6fe
Author: Scott Hendricks <[email protected]>
AuthorDate: Fri Dec 18 16:03:59 2020 -0500

    Add ConfigurableProducerSpec to Trogdor for improved E2E latency tracking. 
(#9736)
    
    Reviewer: Colin P. McCabe <[email protected]>
---
 checkstyle/suppressions.xml                        |   2 +
 .../trogdor/workload/ConfigurableProducerSpec.java | 214 ++++++++++++++
 .../workload/ConfigurableProducerWorker.java       | 322 +++++++++++++++++++++
 .../trogdor/workload/ConstantFlushGenerator.java   |  73 +++++
 .../workload/ConstantThroughputGenerator.java      | 112 +++++++
 .../kafka/trogdor/workload/ConsumeBenchSpec.java   |  12 +
 .../kafka/trogdor/workload/ConsumeBenchWorker.java |  62 +++-
 .../{PayloadGenerator.java => FlushGenerator.java} |  32 +-
 .../trogdor/workload/GaussianFlushGenerator.java   | 100 +++++++
 .../workload/GaussianThroughputGenerator.java      | 152 ++++++++++
 .../GaussianTimestampRandomPayloadGenerator.java   | 122 ++++++++
 .../kafka/trogdor/workload/PayloadGenerator.java   |   4 +-
 ...{PayloadGenerator.java => RecordProcessor.java} |  33 +--
 ...loadGenerator.java => ThroughputGenerator.java} |  32 +-
 .../workload/TimestampRandomPayloadGenerator.java  | 102 +++++++
 .../trogdor/workload/TimestampRecordProcessor.java | 162 +++++++++++
 .../trogdor/workload/ConsumeBenchSpecTest.java     |   4 +-
 17 files changed, 1465 insertions(+), 75 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 69df37d..b44e713 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -241,6 +241,8 @@
     <suppress checks="ParameterNumber"
               files="ProduceBenchSpec.java"/>
     <suppress checks="ParameterNumber"
+              files="ConsumeBenchSpec.java"/>
+    <suppress checks="ParameterNumber"
               files="SustainedConnectionSpec.java"/>
     <suppress id="dontUseSystemExit"
               files="VerifiableConsumer.java"/>
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConfigurableProducerSpec.java
 
b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConfigurableProducerSpec.java
new file mode 100644
index 0000000..5235fc3
--- /dev/null
+++ 
b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConfigurableProducerSpec.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.workload;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.trogdor.task.TaskController;
+import org.apache.kafka.trogdor.task.TaskSpec;
+import org.apache.kafka.trogdor.task.TaskWorker;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * This is the spec to pass in to be able to run the 
`ConfigurableProducerWorker` workload.  This allows for customized
+ * and even variable configurations in terms of messages per second, message 
size, batch size, key size, and even the
+ * ability to target a specific partition out of a topic.
+ *
+ * This has several notable differences from the ProduceBench classes, namely 
the ability to dynamically control
+ * flushing and throughput through configurable classes, but also the ability 
to run against specific partitions within
+ * a topic directly.  This workload can only run against one topic at a time, 
unlike the ProduceBench workload.
+ *
+ * The parameters that differ from ProduceBenchSpec:
+ *
+ *     `flushGenerator` -      Used to instruct the KafkaProducer when to 
issue flushes.  This allows us to simulate
+ *                             variable batching since batch flushing is not 
currently exposed within the KafkaProducer
+ *                             class.  See the `FlushGenerator` interface for 
more information.
+ *
+ *     `throughputGenerator` - Used to throttle the ConfigurableProducerWorker 
based on a calculated number of messages
+ *                             within a window.  See the `ThroughputGenerator` 
interface for more information.
+ *
+ *     `activeTopic`         - This class only supports execution against a 
single topic at a time.  If more than one
+ *                             topic is specified, the 
ConfigurableProducerWorker will throw an error.
+ *
+ *     `activePartition`     - Specify a specific partition number within the 
activeTopic to run load against, or
+ *                             specify `-1` to allow use of all partitions.
+ *
+ * Here is an example spec:
+ *
+ * {
+ *     "startMs": 1606949497662,
+ *     "durationMs": 3600000,
+ *     "producerNode": "trogdor-agent-0",
+ *     "bootstrapServers": "some.example.kafka.server:9091",
+ *     "flushGenerator": {
+ *         "type": "gaussian",
+ *         "messagesPerFlushAverage": 16,
+ *         "messagesPerFlushDeviation": 4
+ *     },
+ *     "throughputGenerator": {
+ *         "type": "gaussian",
+ *         "messagesPerSecondAverage": 500,
+ *         "messagesPerSecondDeviation": 50,
+ *         "windowsUntilRateChange": 100,
+ *         "windowSizeMs": 100
+ *     },
+ *     "keyGenerator": {
+ *         "type": "constant",
+ *         "size": 8
+ *     },
+ *     "valueGenerator": {
+ *         "type": "gaussianTimestampRandom",
+ *         "messageSizeAverage": 512,
+ *         "messageSizeDeviation": 100,
+ *         "messagesUntilSizeChange": 100
+ *     },
+ *     "producerConf": {
+ *         "acks": "all"
+ *     },
+ *     "commonClientConf": {},
+ *     "adminClientConf": {},
+ *     "activeTopic": {
+ *         "topic0": {
+ *             "numPartitions": 100,
+ *             "replicationFactor": 3,
+ *             "configs": {
+ *                 "retention.ms": "1800000"
+ *             }
+ *         }
+ *     },
+ *     "activePartition": 5
+ * }
+ *
+ * This example spec performed the following:
+ *
+ *   * Ran on `trogdor-agent-0` for 1 hour starting at 2020-12-02 22:51:37.662 
GMT
+ *   * Produced with acks=all to Partition 5 of `topic0` on kafka server 
`some.example.kafka.server:9091`.
+ *   * The average batch had 16 messages, with a standard deviation of 4 
messages.
+ *   * The messages had 8-byte constant keys, and values with an average size of 512 bytes and a standard deviation of 100 bytes.
+ *   * The messages had millisecond timestamps embedded in the first several 
bytes of the value.
+ *   * The average throughput was 500 messages/second, with a window of 100ms 
and a deviation of 50 messages/second.
+ */
+
+public class ConfigurableProducerSpec extends TaskSpec {
+    private final String producerNode;
+    private final String bootstrapServers;
+    private final Optional<FlushGenerator> flushGenerator;
+    private final ThroughputGenerator throughputGenerator;
+    private final PayloadGenerator keyGenerator;
+    private final PayloadGenerator valueGenerator;
+    private final Map<String, String> producerConf;
+    private final Map<String, String> adminClientConf;
+    private final Map<String, String> commonClientConf;
+    private final TopicsSpec activeTopic;
+    private final int activePartition;
+
+    /**
+     * Creates the spec.  This is the {@code @JsonCreator}, so each argument is bound to the JSON property of the
+     * same name.
+     *
+     * @param startMs             passed through to {@link TaskSpec}.
+     * @param durationMs          passed through to {@link TaskSpec}.
+     * @param producerNode        name of the node the task runs on; null is stored as the empty string.
+     * @param bootstrapServers    Kafka bootstrap servers; null is stored as the empty string.
+     * @param flushGenerator      optional flush policy; null is treated the same as an empty Optional.
+     * @param throughputGenerator throttle invoked by the worker after every send.
+     * @param keyGenerator        generator for record keys.
+     * @param valueGenerator      generator for record values.
+     * @param producerConf        producer-specific client configuration; may be null.
+     * @param commonClientConf    configuration shared by the producer and admin clients; may be null.
+     * @param adminClientConf     admin-client-specific configuration; may be null.
+     * @param activeTopic         the topic to produce to; must not be null.
+     * @param activePartition     the partition to produce to, or -1 to let the producer pick the partition.
+     * @throws NullPointerException if {@code activeTopic} is null.
+     */
+    @JsonCreator
+    public ConfigurableProducerSpec(@JsonProperty("startMs") long startMs,
+                                    @JsonProperty("durationMs") long durationMs,
+                                    @JsonProperty("producerNode") String producerNode,
+                                    @JsonProperty("bootstrapServers") String bootstrapServers,
+                                    @JsonProperty("flushGenerator") Optional<FlushGenerator> flushGenerator,
+                                    @JsonProperty("throughputGenerator") ThroughputGenerator throughputGenerator,
+                                    @JsonProperty("keyGenerator") PayloadGenerator keyGenerator,
+                                    @JsonProperty("valueGenerator") PayloadGenerator valueGenerator,
+                                    @JsonProperty("producerConf") Map<String, String> producerConf,
+                                    @JsonProperty("commonClientConf") Map<String, String> commonClientConf,
+                                    @JsonProperty("adminClientConf") Map<String, String> adminClientConf,
+                                    @JsonProperty("activeTopic") TopicsSpec activeTopic,
+                                    @JsonProperty("activePartition") int activePartition) {
+        super(startMs, durationMs);
+        this.producerNode = (producerNode == null) ? "" : producerNode;
+        this.bootstrapServers = (bootstrapServers == null) ? "" : bootstrapServers;
+        // Never store a null Optional: the worker calls flushGenerator().ifPresent(...) on every send.
+        this.flushGenerator = (flushGenerator == null) ? Optional.empty() : flushGenerator;
+        this.keyGenerator = keyGenerator;
+        this.valueGenerator = valueGenerator;
+        this.throughputGenerator = throughputGenerator;
+        this.producerConf = configOrEmptyMap(producerConf);
+        this.commonClientConf = configOrEmptyMap(commonClientConf);
+        this.adminClientConf = configOrEmptyMap(adminClientConf);
+        // Still a NullPointerException for a missing topic, but one that names the offending field.
+        this.activeTopic = java.util.Objects.requireNonNull(activeTopic,
+            "activeTopic must be specified").immutableCopy();
+        this.activePartition = activePartition;
+    }
+
+    /** @return the name of the node this task runs on (never null). */
+    @JsonProperty
+    public String producerNode() {
+        return producerNode;
+    }
+
+    /** @return the Kafka bootstrap servers (never null). */
+    @JsonProperty
+    public String bootstrapServers() {
+        return bootstrapServers;
+    }
+
+    /** @return the flush policy, or an empty Optional when none was configured (never null). */
+    @JsonProperty
+    public Optional<FlushGenerator> flushGenerator() {
+        return flushGenerator;
+    }
+
+    /** @return the generator for record keys. */
+    @JsonProperty
+    public PayloadGenerator keyGenerator() {
+        return keyGenerator;
+    }
+
+    /** @return the generator for record values. */
+    @JsonProperty
+    public PayloadGenerator valueGenerator() {
+        return valueGenerator;
+    }
+
+    /** @return the throttle applied by the worker after every send. */
+    @JsonProperty
+    public ThroughputGenerator throughputGenerator() {
+        return throughputGenerator;
+    }
+
+    /** @return the producer-specific client configuration. */
+    @JsonProperty
+    public Map<String, String> producerConf() {
+        return producerConf;
+    }
+
+    /** @return the configuration shared by the producer and admin clients. */
+    @JsonProperty
+    public Map<String, String> commonClientConf() {
+        return commonClientConf;
+    }
+
+    /** @return the admin-client-specific configuration. */
+    @JsonProperty
+    public Map<String, String> adminClientConf() {
+        return adminClientConf;
+    }
+
+    /** @return the immutable copy of the topic specification taken at construction time. */
+    @JsonProperty
+    public TopicsSpec activeTopic() {
+        return activeTopic;
+    }
+
+    /** @return the partition to produce to, or -1 when no specific partition is targeted. */
+    @JsonProperty
+    public int activePartition() {
+        return activePartition;
+    }
+
+    /** Returns a controller that places the task on the single configured {@code producerNode}. */
+    @Override
+    public TaskController newController(String id) {
+        return topology -> Collections.singleton(producerNode);
+    }
+
+    /** Returns a new {@link ConfigurableProducerWorker} driven by this spec. */
+    @Override
+    public TaskWorker newTaskWorker(String id) {
+        return new ConfigurableProducerWorker(id, this);
+    }
+}
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConfigurableProducerWorker.java
 
b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConfigurableProducerWorker.java
new file mode 100644
index 0000000..b08ef44
--- /dev/null
+++ 
b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConfigurableProducerWorker.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.workload;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.node.TextNode;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.utils.ThreadUtils;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.trogdor.common.JsonUtil;
+import org.apache.kafka.trogdor.common.Platform;
+import org.apache.kafka.trogdor.common.WorkerUtils;
+import org.apache.kafka.trogdor.task.TaskWorker;
+import org.apache.kafka.trogdor.task.WorkerStatusTracker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * This workload allows for customized and even variable configurations in 
terms of messages per second, message size,
+ * batch size, key size, and even the ability to target a specific partition 
out of a topic.
+ *
+ * See `ConfigurableProducerSpec` for a more detailed description.
+ */
+
+public class ConfigurableProducerWorker implements TaskWorker {
+    private static final Logger log = LoggerFactory.getLogger(ConfigurableProducerWorker.class);
+
+    private final String id;
+
+    private final ConfigurableProducerSpec spec;
+
+    private final AtomicBoolean running = new AtomicBoolean(false);
+
+    private ScheduledExecutorService executor;
+
+    private WorkerStatusTracker status;
+
+    private KafkaFutureImpl<String> doneFuture;
+
+    public ConfigurableProducerWorker(String id, ConfigurableProducerSpec spec) {
+        this.id = id;
+        this.spec = spec;
+    }
+
+    /**
+     * Starts the worker: creates the executor and submits the {@link Prepare} step.
+     *
+     * @throws IllegalStateException if the worker is already running.
+     */
+    @Override
+    public void start(Platform platform, WorkerStatusTracker status,
+                      KafkaFutureImpl<String> doneFuture) {
+        if (!running.compareAndSet(false, true)) {
+            throw new IllegalStateException("ConfigurableProducerWorker is already running.");
+        }
+        log.info("{}: Activating ConfigurableProducerWorker with {}", id, spec);
+        // Create an executor with 2 threads.  We need the second thread so
+        // that the StatusUpdater can run in parallel with SendRecords.
+        this.executor = Executors.newScheduledThreadPool(2,
+            ThreadUtils.createThreadFactory("ConfigurableProducerWorkerThread%d", false));
+        this.status = status;
+        this.doneFuture = doneFuture;
+        executor.submit(new Prepare());
+    }
+
+    /**
+     * Validates that exactly one topic is configured, creates it, and then submits {@link SendRecords}.
+     * Any failure is reported through {@code WorkerUtils.abort}.
+     */
+    public class Prepare implements Runnable {
+        @Override
+        public void run() {
+            try {
+                // Materialize once and reuse the result for both the size check and the iteration.
+                Map<String, PartitionsSpec> topics = spec.activeTopic().materialize();
+                if (topics.size() != 1) {
+                    throw new RuntimeException("Can only run against 1 topic.");
+                }
+                Map<String, NewTopic> newTopics = new HashMap<>();
+                List<TopicPartition> active = new ArrayList<>();
+                for (Map.Entry<String, PartitionsSpec> entry : topics.entrySet()) {
+                    String topicName = entry.getKey();
+                    PartitionsSpec partSpec = entry.getValue();
+                    newTopics.put(topicName, partSpec.newTopic(topicName));
+                    for (Integer partitionNumber : partSpec.partitionNumbers()) {
+                        active.add(new TopicPartition(topicName, partitionNumber));
+                    }
+                }
+                status.update(new TextNode("Creating " + newTopics.keySet().size() + " topic(s)"));
+                WorkerUtils.createTopics(log, spec.bootstrapServers(), spec.commonClientConf(),
+                                         spec.adminClientConf(), newTopics, false);
+                status.update(new TextNode("Created " + newTopics.keySet().size() + " topic(s)"));
+                executor.submit(new SendRecords(active.get(0).topic(), spec.activePartition()));
+            } catch (Throwable e) {
+                WorkerUtils.abort(log, "Prepare", e, doneFuture);
+            }
+        }
+    }
+
+    /**
+     * Producer callback that records the time between the send and its completion in the histogram
+     * (for failed sends as well) and logs any send error.
+     */
+    private static class SendRecordsCallback implements Callback {
+        private final SendRecords sendRecords;
+        private final long startMs;
+
+        SendRecordsCallback(SendRecords sendRecords, long startMs) {
+            this.sendRecords = sendRecords;
+            this.startMs = startMs;
+        }
+
+        @Override
+        public void onCompletion(RecordMetadata metadata, Exception exception) {
+            long now = Time.SYSTEM.milliseconds();
+            long durationMs = now - startMs;
+            sendRecords.recordDuration(durationMs);
+            if (exception != null) {
+                log.error("SendRecordsCallback: error", exception);
+            }
+        }
+    }
+
+    /**
+     * Sends records in a loop until an exception ends it, applying the configured flush and
+     * throughput generators after every send.
+     */
+    public class SendRecords implements Callable<Void> {
+        private final String activeTopic;
+        private final int activePartition;
+
+        private final Histogram histogram;
+
+        private final Future<?> statusUpdaterFuture;
+
+        private final KafkaProducer<byte[], byte[]> producer;
+
+        private final PayloadIterator keys;
+
+        private final PayloadIterator values;
+
+        private Future<RecordMetadata> sendFuture;
+
+        SendRecords(String topic, int partition) {
+            this.activeTopic = topic;
+            this.activePartition = partition;
+            this.histogram = new Histogram(10000);
+
+            this.statusUpdaterFuture = executor.scheduleWithFixedDelay(
+                new StatusUpdater(histogram), 30, 30, TimeUnit.SECONDS);
+
+            PayloadIterator newKeys;
+            PayloadIterator newValues;
+            KafkaProducer<byte[], byte[]> newProducer;
+            try {
+                Properties props = new Properties();
+                props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, spec.bootstrapServers());
+                WorkerUtils.addConfigsToProperties(props, spec.commonClientConf(), spec.producerConf());
+                // Build the iterators before the producer so a failure here cannot leave an open producer behind.
+                newKeys = new PayloadIterator(spec.keyGenerator());
+                newValues = new PayloadIterator(spec.valueGenerator());
+                newProducer = new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer());
+            } catch (RuntimeException | Error e) {
+                // call() will never run to cancel the periodic status task, so cancel it here.
+                statusUpdaterFuture.cancel(false);
+                throw e;
+            }
+            this.keys = newKeys;
+            this.values = newValues;
+            this.producer = newProducer;
+        }
+
+        @Override
+        public Void call() throws Exception {
+            long startTimeMs = Time.SYSTEM.milliseconds();
+            try {
+                try {
+                    // The loop has no exit condition; it ends only when sendMessage() throws.
+                    while (true) {
+                        sendMessage();
+                    }
+                } finally {
+                    // Wait for the last in-flight send before closing the producer.
+                    if (sendFuture != null) {
+                        try {
+                            sendFuture.get();
+                        } catch (Exception e) {
+                            log.error("Exception on final future", e);
+                        }
+                    }
+                    producer.close();
+                }
+            } catch (Exception e) {
+                WorkerUtils.abort(log, "SendRecords", e, doneFuture);
+            } finally {
+                statusUpdaterFuture.cancel(false);
+                // Publish one final status snapshot.
+                StatusData statusData = new StatusUpdater(histogram).update();
+                long curTimeMs = Time.SYSTEM.milliseconds();
+                log.info("Sent {} total record(s) in {} ms.  status: {}",
+                    histogram.summarize().numSamples(), curTimeMs - startTimeMs, statusData);
+            }
+            doneFuture.complete("");
+            return null;
+        }
+
+        /**
+         * Sends one record, to the configured partition unless it is -1, then lets the flush
+         * generator (if any) and the throughput generator act.
+         */
+        private void sendMessage() throws InterruptedException {
+            ProducerRecord<byte[], byte[]> record;
+            if (activePartition != -1) {
+                record = new ProducerRecord<>(activeTopic, activePartition, keys.next(), values.next());
+            } else {
+                record = new ProducerRecord<>(activeTopic, keys.next(), values.next());
+            }
+            sendFuture = producer.send(record, new SendRecordsCallback(this, Time.SYSTEM.milliseconds()));
+            spec.flushGenerator().ifPresent(flushGenerator -> flushGenerator.increment(producer));
+            spec.throughputGenerator().throttle();
+        }
+
+        void recordDuration(long durationMs) {
+            histogram.add(durationMs);
+        }
+    }
+
+    /** Summarizes the latency histogram and publishes it as the worker status. */
+    public class StatusUpdater implements Runnable {
+        private final Histogram histogram;
+
+        StatusUpdater(Histogram histogram) {
+            this.histogram = histogram;
+        }
+
+        @Override
+        public void run() {
+            try {
+                update();
+            } catch (Exception e) {
+                WorkerUtils.abort(log, "StatusUpdater", e, doneFuture);
+            }
+        }
+
+        /**
+         * Builds a {@link StatusData} from the current histogram summary, pushes it to the status
+         * tracker as JSON, and returns it.
+         */
+        StatusData update() {
+            Histogram.Summary summary = histogram.summarize(StatusData.PERCENTILES);
+            // Indices 0..2 follow the order of StatusData.PERCENTILES (p50, p95, p99).
+            StatusData statusData = new StatusData(summary.numSamples(), summary.average(),
+                summary.percentiles().get(0).value(),
+                summary.percentiles().get(1).value(),
+                summary.percentiles().get(2).value());
+            status.update(JsonUtil.JSON_SERDE.valueToTree(statusData));
+            return statusData;
+        }
+    }
+
+    /** JSON-serializable snapshot of the send count and latency statistics. */
+    public static class StatusData {
+        private final long totalSent;
+        private final float averageLatencyMs;
+        private final int p50LatencyMs;
+        private final int p95LatencyMs;
+        private final int p99LatencyMs;
+
+        /**
+         * The percentiles to use when calculating the histogram data.
+         * These should match up with the p50LatencyMs, p95LatencyMs, etc. fields.
+         */
+        static final float[] PERCENTILES = {0.5f, 0.95f, 0.99f};
+
+        @JsonCreator
+        StatusData(@JsonProperty("totalSent") long totalSent,
+                   @JsonProperty("averageLatencyMs") float averageLatencyMs,
+                   @JsonProperty("p50LatencyMs") int p50latencyMs,
+                   @JsonProperty("p95LatencyMs") int p95latencyMs,
+                   @JsonProperty("p99LatencyMs") int p99latencyMs) {
+            this.totalSent = totalSent;
+            this.averageLatencyMs = averageLatencyMs;
+            this.p50LatencyMs = p50latencyMs;
+            this.p95LatencyMs = p95latencyMs;
+            this.p99LatencyMs = p99latencyMs;
+        }
+
+        @JsonProperty
+        public long totalSent() {
+            return totalSent;
+        }
+
+        @JsonProperty
+        public float averageLatencyMs() {
+            return averageLatencyMs;
+        }
+
+        @JsonProperty
+        public int p50LatencyMs() {
+            return p50LatencyMs;
+        }
+
+        @JsonProperty
+        public int p95LatencyMs() {
+            return p95LatencyMs;
+        }
+
+        @JsonProperty
+        public int p99LatencyMs() {
+            return p99LatencyMs;
+        }
+    }
+
+    /**
+     * Stops the worker: completes the done future, shuts the executor down immediately, and waits
+     * for it to terminate.
+     *
+     * @throws IllegalStateException if the worker is not running.
+     */
+    @Override
+    public void stop(Platform platform) throws Exception {
+        if (!running.compareAndSet(true, false)) {
+            throw new IllegalStateException("ConfigurableProducerWorker is not running.");
+        }
+        log.info("{}: Deactivating ConfigurableProducerWorker.", id);
+        doneFuture.complete("");
+        executor.shutdownNow();
+        executor.awaitTermination(1, TimeUnit.DAYS);
+        this.executor = null;
+        this.status = null;
+        this.doneFuture = null;
+    }
+}
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConstantFlushGenerator.java
 
b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConstantFlushGenerator.java
new file mode 100644
index 0000000..9d656b2
--- /dev/null
+++ 
b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConstantFlushGenerator.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.workload;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.common.errors.InterruptException;
+
+/**
+ * This generator will flush the producer after a specific number of messages. 
 This is useful to simulate a specific
+ * number of messages in a batch regardless of the message size, since batch 
flushing is not exposed in the
+ * KafkaProducer client code.
+ *
+ * WARNING: This does not directly control when KafkaProducer will batch, this 
only makes best effort.  This also
+ * cannot tell when a KafkaProducer batch is closed.  If the KafkaProducer 
sends a batch before this executes, this
+ * will continue to execute on its own cadence.  To alleviate this, make sure 
to set `linger.ms` to allow for at least
+ * `messagesPerFlush` messages to be generated, and make sure to set 
`batch.size` to allow for all these messages.
+ *
+ * Here is an example spec:
+ *
+ * {
+ *    "type": "constant",
+ *    "messagesPerFlush": 16
+ * }
+ *
+ * This example will flush the producer every 16 messages.
+ */
+
+public class ConstantFlushGenerator implements FlushGenerator {
+    private final int messagesPerFlush;
+    private int messageTracker = 0;
+
+    @JsonCreator
+    public ConstantFlushGenerator(@JsonProperty("messagesPerFlush") int messagesPerFlush) {
+        this.messagesPerFlush = messagesPerFlush;
+    }
+
+    /** @return the number of messages counted between flushes. */
+    @JsonProperty
+    public int messagesPerFlush() {
+        return messagesPerFlush;
+    }
+
+    /**
+     * Counts one message and flushes the producer once {@code messagesPerFlush} messages have been
+     * counted since the previous flush.
+     */
+    @Override
+    public synchronized <K, V> void increment(KafkaProducer<K, V> producer) {
+        messageTracker++;
+
+        // Nothing to do until the batch target has been reached.
+        if (messageTracker < messagesPerFlush) {
+            return;
+        }
+
+        // Start counting the next batch before flushing this one.
+        messageTracker = 0;
+        try {
+            producer.flush();
+        } catch (InterruptException ignored) {
+            // Ignore flush interruption exceptions.
+        }
+    }
+}
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConstantThroughputGenerator.java
 
b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConstantThroughputGenerator.java
new file mode 100644
index 0000000..19e5af0
--- /dev/null
+++ 
b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConstantThroughputGenerator.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.workload;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.common.utils.Time;
+
+/**
+ * This throughput generator configures constant throughput.
+ *
+ * The lower the window size, the smoother the traffic will be. Using a 100ms 
window offers no noticeable spikes in
+ * traffic while still being long enough to avoid too much overhead.
+ *
+ * WARNING: Due to binary nature of throughput in terms of messages sent in a 
window, each window will send at least 1
+ * message, and each window sends the same number of messages, rounded down. 
For example, 99 messages per second with a
+ * 100ms window will only send 90 messages per second, or 9 messages per 
window. Another example, in order to send only
+ * 5 messages per second, a window size of 200ms is required. In cases like 
these, both the `messagesPerSecond` and
+ * `windowSizeMs` parameters should be adjusted together to achieve more 
accurate throughput.
+ *
+ * Here is an example spec:
+ *
+ * {
+ *    "type": "constant",
+ *    "messagesPerSecond": 500,
+ *    "windowSizeMs": 100
+ * }
+ *
+ * This will produce a workload that runs 500 messages per second, with a maximum resolution of 50 messages per 100
+ * milliseconds.
+ */
+
+public class ConstantThroughputGenerator implements ThroughputGenerator {
+    private final int messagesPerSecond;
+    private final int messagesPerWindow;
+    private final long windowSizeMs;
+
+    private long nextWindowStarts = 0;
+    private int messageTracker = 0;
+
+    /**
+     * @param messagesPerSecond target rate; a negative value disables throttling.
+     * @param windowSizeMs      length of a throttling window in milliseconds; values {@code <= 0}
+     *                          select the default of 100.
+     */
+    @JsonCreator
+    public ConstantThroughputGenerator(@JsonProperty("messagesPerSecond") int messagesPerSecond,
+                                       @JsonProperty("windowSizeMs") long windowSizeMs) {
+        // Calculate the default values.
+        if (windowSizeMs <= 0) {
+            windowSizeMs = 100;
+        }
+        this.windowSizeMs = windowSizeMs;
+        this.messagesPerSecond = messagesPerSecond;
+
+        // messages per window = messages per second * (window length in seconds), rounded down.
+        // E.g. 500 msg/s with a 100 ms window is 50 per window; 99 msg/s is 9 per window.
+        long perWindow = (long) messagesPerSecond * windowSizeMs / 1000;
+        this.messagesPerWindow = (int) Math.min(Integer.MAX_VALUE, perWindow);
+        calculateNextWindow();
+    }
+
+    /** @return the configured target rate in messages per second. */
+    @JsonProperty
+    public int messagesPerSecond() {
+        return messagesPerSecond;
+    }
+
+    /**
+     * @return the effective window length in milliseconds (after defaulting).  Exposed as a JSON
+     *         property so that it is written out alongside {@code messagesPerSecond}.
+     */
+    @JsonProperty
+    public long windowSizeMs() {
+        return windowSizeMs;
+    }
+
+    /** Resets the per-window message count and advances {@code nextWindowStarts} past the current time. */
+    private void calculateNextWindow() {
+        // Reset the message count.
+        messageTracker = 0;
+
+        // Calculate the next window start time.
+        long now = Time.SYSTEM.milliseconds();
+        if (nextWindowStarts > 0) {
+            while (nextWindowStarts < now) {
+                nextWindowStarts += windowSizeMs;
+            }
+        } else {
+            nextWindowStarts = now + windowSizeMs;
+        }
+    }
+
+    /**
+     * Counts one message and, once the per-window quota is reached, blocks until the next window
+     * starts.
+     *
+     * @throws InterruptedException if the thread is interrupted while waiting.
+     */
+    @Override
+    public synchronized void throttle() throws InterruptedException {
+        // Run unthrottled if messagesPerSecond is negative.
+        if (messagesPerSecond < 0) {
+            return;
+        }
+
+        // Calculate the next window if we've moved beyond the current one.
+        if (Time.SYSTEM.milliseconds() >= nextWindowStarts) {
+            calculateNextWindow();
+        }
+
+        // Increment the message tracker.
+        messageTracker += 1;
+
+        // Compare the tracked message count with the throttle limits.
+        if (messageTracker >= messagesPerWindow) {
+
+            // Wait out the rest of the window.  Only wait for a strictly positive time: wait(0)
+            // blocks until notified and a negative timeout throws IllegalArgumentException.  Loop
+            // so that an early (spurious) wakeup does not cut the window short.
+            long remainingMs = nextWindowStarts - Time.SYSTEM.milliseconds();
+            while (remainingMs > 0) {
+                wait(remainingMs);
+                remainingMs = nextWindowStarts - Time.SYSTEM.milliseconds();
+            }
+        }
+    }
+}
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpec.java 
b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpec.java
index b7e0172..6909d2d 100644
--- 
a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpec.java
+++ 
b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpec.java
@@ -35,6 +35,7 @@ import java.util.Map;
 import java.util.HashMap;
 import java.util.Set;
 import java.util.HashSet;
+import java.util.Optional;
 
 /**
  * The specification for a benchmark that consumer messages from a set of 
topic/partitions.
@@ -71,6 +72,9 @@ import java.util.HashSet;
  * explicitly specifying partitions in "activeTopics" when there are multiple 
"threadsPerWorker"
  * and a particular "consumerGroup" will result in an #{@link 
ConfigException}, aborting the task.
  *
+ * The "recordProcessor" field allows the specification of tasks to run on 
records that are consumed.  This is run
+ * immediately after the messages are polled.  See the `RecordProcessor` 
interface for more information.
+ *
  * An example JSON representation which will result in a consumer that is part 
of the consumer group "cg" and
  * subscribed to topics foo1, foo2, foo3 and bar.
  * #{@code
@@ -98,6 +102,7 @@ public class ConsumeBenchSpec extends TaskSpec {
     private final List<String> activeTopics;
     private final String consumerGroup;
     private final int threadsPerWorker;
+    private final Optional<RecordProcessor> recordProcessor;
 
     @JsonCreator
     public ConsumeBenchSpec(@JsonProperty("startMs") long startMs,
@@ -111,6 +116,7 @@ public class ConsumeBenchSpec extends TaskSpec {
                             @JsonProperty("commonClientConf") Map<String, 
String> commonClientConf,
                             @JsonProperty("adminClientConf") Map<String, 
String> adminClientConf,
                             @JsonProperty("threadsPerWorker") Integer 
threadsPerWorker,
+                            @JsonProperty("recordProcessor") 
Optional<RecordProcessor> recordProcessor,
                             @JsonProperty("activeTopics") List<String> 
activeTopics) {
         super(startMs, durationMs);
         this.consumerNode = (consumerNode == null) ? "" : consumerNode;
@@ -123,6 +129,7 @@ public class ConsumeBenchSpec extends TaskSpec {
         this.activeTopics = activeTopics == null ? new ArrayList<>() : 
activeTopics;
         this.consumerGroup = consumerGroup == null ? "" : consumerGroup;
         this.threadsPerWorker = threadsPerWorker == null ? 1 : 
threadsPerWorker;
+        this.recordProcessor = recordProcessor;
     }
 
     @JsonProperty
@@ -156,6 +163,11 @@ public class ConsumeBenchSpec extends TaskSpec {
     }
 
     @JsonProperty
+    public Optional<RecordProcessor> recordProcessor() {
+        return this.recordProcessor;
+    }
+
+    @JsonProperty
     public Map<String, String> consumerConf() {
         return consumerConf;
     }
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java 
b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java
index 4e77dff..f6067b5 100644
--- 
a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java
+++ 
b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java
@@ -47,6 +47,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.HashMap;
 import java.util.concurrent.Callable;
@@ -123,18 +124,18 @@ public class ConsumeBenchWorker implements TaskWorker {
             consumer = consumer(consumerGroup, clientId(0));
             if (toUseGroupPartitionAssignment) {
                 Set<String> topics = partitionsByTopic.keySet();
-                tasks.add(new ConsumeMessages(consumer, topics));
+                tasks.add(new ConsumeMessages(consumer,  
spec.recordProcessor(), topics));
 
                 for (int i = 0; i < consumerCount - 1; i++) {
-                    tasks.add(new ConsumeMessages(consumer(consumerGroup(), 
clientId(i + 1)), topics));
+                    tasks.add(new ConsumeMessages(consumer(consumerGroup(), 
clientId(i + 1)), spec.recordProcessor(), topics));
                 }
             } else {
                 List<TopicPartition> partitions = 
populatePartitionsByTopic(consumer.consumer(), partitionsByTopic)
                     
.values().stream().flatMap(List::stream).collect(Collectors.toList());
-                tasks.add(new ConsumeMessages(consumer, partitions));
+                tasks.add(new ConsumeMessages(consumer, 
spec.recordProcessor(), partitions));
 
                 for (int i = 0; i < consumerCount - 1; i++) {
-                    tasks.add(new ConsumeMessages(consumer(consumerGroup(), 
clientId(i + 1)), partitions));
+                    tasks.add(new ConsumeMessages(consumer(consumerGroup(), 
clientId(i + 1)), spec.recordProcessor(), partitions));
                 }
             }
 
@@ -198,13 +199,15 @@ public class ConsumeBenchWorker implements TaskWorker {
         private final Throttle throttle;
         private final String clientId;
         private final ThreadSafeConsumer consumer;
+        private final Optional<RecordProcessor> recordProcessor;
 
-        private ConsumeMessages(ThreadSafeConsumer consumer) {
+        private ConsumeMessages(ThreadSafeConsumer consumer,
+                                Optional<RecordProcessor> recordProcessor) {
             this.latencyHistogram = new Histogram(10000);
             this.messageSizeHistogram = new Histogram(2 * 1024 * 1024);
             this.clientId = consumer.clientId();
             this.statusUpdaterFuture = executor.scheduleAtFixedRate(
-                new ConsumeStatusUpdater(latencyHistogram, 
messageSizeHistogram, consumer), 1, 1, TimeUnit.MINUTES);
+                new ConsumeStatusUpdater(latencyHistogram, 
messageSizeHistogram, consumer, recordProcessor), 1, 1, TimeUnit.MINUTES);
             int perPeriod;
             if (spec.targetMessagesPerSec() <= 0)
                 perPeriod = Integer.MAX_VALUE;
@@ -213,16 +216,20 @@ public class ConsumeBenchWorker implements TaskWorker {
 
             this.throttle = new Throttle(perPeriod, THROTTLE_PERIOD_MS);
             this.consumer = consumer;
+            this.recordProcessor = recordProcessor;
         }
 
-        ConsumeMessages(ThreadSafeConsumer consumer, Set<String> topics) {
-            this(consumer);
+        ConsumeMessages(ThreadSafeConsumer consumer,
+                        Optional<RecordProcessor> recordProcessor,
+                        Set<String> topics) {
+            this(consumer, recordProcessor);
             log.info("Will consume from topics {} via dynamic group 
assignment.", topics);
             this.consumer.subscribe(topics);
         }
-
-        ConsumeMessages(ThreadSafeConsumer consumer, List<TopicPartition> 
partitions) {
-            this(consumer);
+        ConsumeMessages(ThreadSafeConsumer consumer,
+                        Optional<RecordProcessor> recordProcessor,
+                        List<TopicPartition> partitions) {
+            this(consumer, recordProcessor);
             log.info("Will consume from topic partitions {} via manual 
assignment.", partitions);
             this.consumer.assign(partitions);
         }
@@ -242,6 +249,10 @@ public class ConsumeBenchWorker implements TaskWorker {
                     }
                     long endBatchMs = Time.SYSTEM.milliseconds();
                     long elapsedBatchMs = endBatchMs - startBatchMs;
+
+                    // Do the record batch processing immediately to avoid 
latency skew.
+                    recordProcessor.ifPresent(processor -> 
processor.processRecords(records));
+
                     for (ConsumerRecord<byte[], byte[]> record : records) {
                         messagesConsumed++;
                         long messageBytes = 0;
@@ -266,7 +277,7 @@ public class ConsumeBenchWorker implements TaskWorker {
             } finally {
                 statusUpdaterFuture.cancel(false);
                 StatusData statusData =
-                    new ConsumeStatusUpdater(latencyHistogram, 
messageSizeHistogram, consumer).update();
+                    new ConsumeStatusUpdater(latencyHistogram, 
messageSizeHistogram, consumer, spec.recordProcessor()).update();
                 long curTimeMs = Time.SYSTEM.milliseconds();
                 log.info("{} Consumed total number of messages={}, bytes={} in 
{} ms.  status: {}",
                          clientId, messagesConsumed, bytesConsumed, curTimeMs 
- startTimeMs, statusData);
@@ -331,11 +342,16 @@ public class ConsumeBenchWorker implements TaskWorker {
         private final Histogram latencyHistogram;
         private final Histogram messageSizeHistogram;
         private final ThreadSafeConsumer consumer;
+        private final Optional<RecordProcessor> recordProcessor;
 
-        ConsumeStatusUpdater(Histogram latencyHistogram, Histogram 
messageSizeHistogram, ThreadSafeConsumer consumer) {
+        ConsumeStatusUpdater(Histogram latencyHistogram,
+                             Histogram messageSizeHistogram,
+                             ThreadSafeConsumer consumer,
+                             Optional<RecordProcessor> recordProcessor) {
             this.latencyHistogram = latencyHistogram;
             this.messageSizeHistogram = messageSizeHistogram;
             this.consumer = consumer;
+            this.recordProcessor = recordProcessor;
         }
 
         @Override
@@ -350,6 +366,13 @@ public class ConsumeBenchWorker implements TaskWorker {
         StatusData update() {
             Histogram.Summary latSummary = 
latencyHistogram.summarize(StatusData.PERCENTILES);
             Histogram.Summary msgSummary = 
messageSizeHistogram.summarize(StatusData.PERCENTILES);
+
+            // Parse out the RecordProcessor's status, if specified.
+            Optional<JsonNode> recordProcessorStatus = Optional.empty();
+            if (recordProcessor.isPresent()) {
+                recordProcessorStatus = 
Optional.of(recordProcessor.get().processorStatus());
+            }
+
             StatusData statusData = new StatusData(
                 consumer.assignedPartitions(),
                 latSummary.numSamples(),
@@ -358,7 +381,8 @@ public class ConsumeBenchWorker implements TaskWorker {
                 latSummary.average(),
                 latSummary.percentiles().get(0).value(),
                 latSummary.percentiles().get(1).value(),
-                latSummary.percentiles().get(2).value());
+                latSummary.percentiles().get(2).value(),
+                recordProcessorStatus);
             statusUpdater.updateConsumeStatus(consumer.clientId(), statusData);
             log.info("Status={}", JsonUtil.toJsonString(statusData));
             return statusData;
@@ -374,6 +398,7 @@ public class ConsumeBenchWorker implements TaskWorker {
         private final int p50LatencyMs;
         private final int p95LatencyMs;
         private final int p99LatencyMs;
+        private final Optional<JsonNode> recordProcessorStatus;
 
         /**
          * The percentiles to use when calculating the histogram data.
@@ -388,7 +413,8 @@ public class ConsumeBenchWorker implements TaskWorker {
                    @JsonProperty("averageLatencyMs") float averageLatencyMs,
                    @JsonProperty("p50LatencyMs") int p50latencyMs,
                    @JsonProperty("p95LatencyMs") int p95latencyMs,
-                   @JsonProperty("p99LatencyMs") int p99latencyMs) {
+                   @JsonProperty("p99LatencyMs") int p99latencyMs,
+                   @JsonProperty("recordProcessorStatus") Optional<JsonNode> 
recordProcessorStatus) {
             this.assignedPartitions = assignedPartitions;
             this.totalMessagesReceived = totalMessagesReceived;
             this.totalBytesReceived = totalBytesReceived;
@@ -397,6 +423,7 @@ public class ConsumeBenchWorker implements TaskWorker {
             this.p50LatencyMs = p50latencyMs;
             this.p95LatencyMs = p95latencyMs;
             this.p99LatencyMs = p99latencyMs;
+            this.recordProcessorStatus = recordProcessorStatus;
         }
 
         @JsonProperty
@@ -438,6 +465,11 @@ public class ConsumeBenchWorker implements TaskWorker {
         public int p99LatencyMs() {
             return p99LatencyMs;
         }
+
+        /**
+         * Returns the record processor status carried by this status object, exposed as a JSON
+         * property.
+         *
+         * @return the status node, or {@code null} when the wrapped Optional is empty.
+         */
+        @JsonProperty
+        public JsonNode recordProcessorStatus() {
+            return recordProcessorStatus.orElse(null);
+        }
     }
 
     @Override
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/workload/PayloadGenerator.java 
b/tools/src/main/java/org/apache/kafka/trogdor/workload/FlushGenerator.java
similarity index 54%
copy from 
tools/src/main/java/org/apache/kafka/trogdor/workload/PayloadGenerator.java
copy to 
tools/src/main/java/org/apache/kafka/trogdor/workload/FlushGenerator.java
index b06ba01..8c0ae62 100644
--- 
a/tools/src/main/java/org/apache/kafka/trogdor/workload/PayloadGenerator.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/FlushGenerator.java
@@ -19,31 +19,27 @@ package org.apache.kafka.trogdor.workload;
 
 import com.fasterxml.jackson.annotation.JsonSubTypes;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.kafka.clients.producer.KafkaProducer;
 
 /**
- * Generates byte arrays based on a position argument.
+ * This interface is used to facilitate flushing the KafkaProducers on a 
cadence specified by the user.
  *
- * The array generated at a given position should be the same no matter how 
many
- * times generate() is invoked.  PayloadGenerator instances should be immutable
- * and thread-safe.
+ * Currently there are 3 flushing methods:
+ *
+ *   * Disabled, by not specifying this parameter.
+ *   * `constant` will use `ConstantFlushGenerator` to keep the number of 
messages per batch constant.
+ *   * `gaussian` will use `GaussianFlushGenerator` to vary the number of 
messages per batch on a normal distribution.
+ *
+ * Please see the implementation classes for more details.
  */
+
 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME,
     include = JsonTypeInfo.As.PROPERTY,
     property = "type")
 @JsonSubTypes(value = {
-    @JsonSubTypes.Type(value = ConstantPayloadGenerator.class, name = 
"constant"),
-    @JsonSubTypes.Type(value = SequentialPayloadGenerator.class, name = 
"sequential"),
-    @JsonSubTypes.Type(value = UniformRandomPayloadGenerator.class, name = 
"uniformRandom"),
-    @JsonSubTypes.Type(value = NullPayloadGenerator.class, name = "null"),
-    @JsonSubTypes.Type(value = RandomComponentPayloadGenerator.class, name = 
"randomComponent")
+    @JsonSubTypes.Type(value = ConstantFlushGenerator.class, name = 
"constant"),
+    @JsonSubTypes.Type(value = GaussianFlushGenerator.class, name = "gaussian")
     })
-public interface PayloadGenerator {
-    /**
-     * Generate a payload.
-     *
-     * @param position  The position to use to generate the payload
-     *
-     * @return          A new array object containing the payload.
-     */
-    byte[] generate(long position);
+public interface FlushGenerator {
+    /**
+     * Notifies the generator of one more message so that it can decide whether to flush
+     * {@code producer}.  GaussianFlushGenerator, for example, counts these calls and flushes
+     * the producer once its current threshold is reached.
+     *
+     * NOTE(review): presumably invoked once per message sent by the producer worker -- confirm
+     * against the caller.
+     *
+     * @param producer  The producer that may be flushed.
+     */
+    <K, V> void increment(KafkaProducer<K, V> producer);
 }
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/workload/GaussianFlushGenerator.java
 
b/tools/src/main/java/org/apache/kafka/trogdor/workload/GaussianFlushGenerator.java
new file mode 100644
index 0000000..a3157db
--- /dev/null
+++ 
b/tools/src/main/java/org/apache/kafka/trogdor/workload/GaussianFlushGenerator.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.workload;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.common.errors.InterruptException;
+import java.util.Random;
+
+/**
+ * This generator will flush the producer after a specific number of messages, 
determined by a gaussian distribution.
+ * This is useful to simulate a specific number of messages in a batch 
regardless of the message size, since batch
+ * flushing is not exposed in the KafkaProducer.
+ *
+ * WARNING: This does not directly control when KafkaProducer will batch, this 
only makes best effort.  This also
+ * cannot tell when a KafkaProducer batch is closed.  If the KafkaProducer 
sends a batch before this executes, this
+ * will continue to execute on its own cadence.  To alleviate this, make sure 
to set `linger.ms` to allow for messages
+ * to be generated up to your upper limit threshold, and make sure to set 
`batch.size` to allow for all these messages.
+ *
+ * Here is an example spec:
+ *
+ * {
+ *    "type": "gaussian",
+ *    "messagesPerFlushAverage": 16,
+ *    "messagesPerFlushDeviation": 4
+ * }
+ *
+ * This example will flush the producer on average every 16 messages, assuming 
`linger.ms` and `batch.size` allow for
+ * it.  That average changes based on a normal distribution after each flush:
+ *
+ *    An average of the flushes will be at 16 messages.
+ *    ~68% of the flushes are at between 12 and 20 messages.
+ *    ~95% of the flushes are at between 8 and 24 messages.
+ *    ~99% of the flushes are at between 4 and 28 messages.
+ */
+
+public class GaussianFlushGenerator implements FlushGenerator {
+    private final int messagesPerFlushAverage;
+    private final int messagesPerFlushDeviation;
+
+    private final Random random = new Random();
+
+    // Calls counted since the last flush, and the count at which the next flush fires.
+    private int messageTracker;
+    private int flushSize;
+
+    /**
+     * @param messagesPerFlushAverage    Mean number of messages between flushes.
+     * @param messagesPerFlushDeviation  Standard deviation applied to that mean.
+     */
+    @JsonCreator
+    public GaussianFlushGenerator(@JsonProperty("messagesPerFlushAverage") int messagesPerFlushAverage,
+                                  @JsonProperty("messagesPerFlushDeviation") int messagesPerFlushDeviation) {
+        this.messagesPerFlushAverage = messagesPerFlushAverage;
+        this.messagesPerFlushDeviation = messagesPerFlushDeviation;
+        calculateFlushSize();
+    }
+
+    /** Returns the configured mean number of messages between flushes. */
+    @JsonProperty
+    public int messagesPerFlushAverage() {
+        return this.messagesPerFlushAverage;
+    }
+
+    /** Returns the configured standard deviation of the number of messages between flushes. */
+    @JsonProperty
+    public long messagesPerFlushDeviation() {
+        return this.messagesPerFlushDeviation;
+    }
+
+    /**
+     * Draws the next flush threshold from the gaussian distribution (never below 1) and
+     * restarts the message count.
+     */
+    private synchronized void calculateFlushSize() {
+        int gaussianOffset = (int) (random.nextGaussian() * messagesPerFlushDeviation);
+        messageTracker = 0;
+        flushSize = Math.max(gaussianOffset + messagesPerFlushAverage, 1);
+    }
+
+    /**
+     * Counts one message and, when the current threshold is reached, flushes the producer and
+     * picks a new threshold.
+     *
+     * @param producer  The producer to flush.
+     */
+    @Override
+    public synchronized <K, V> void increment(KafkaProducer<K, V> producer) {
+        messageTracker++;
+
+        // Nothing to do until the tracked count reaches the current threshold.
+        if (messageTracker < flushSize) {
+            return;
+        }
+
+        try {
+            producer.flush();
+        } catch (InterruptException ignored) {
+            // Ignore flush interruption exceptions.
+        }
+        calculateFlushSize();
+    }
+}
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/workload/GaussianThroughputGenerator.java
 
b/tools/src/main/java/org/apache/kafka/trogdor/workload/GaussianThroughputGenerator.java
new file mode 100644
index 0000000..6d71ff5
--- /dev/null
+++ 
b/tools/src/main/java/org/apache/kafka/trogdor/workload/GaussianThroughputGenerator.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.workload;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.common.utils.Time;
+import java.util.Random;
+
+/*
+ * This throughput generator configures throughput with a gaussian normal 
distribution on a per-window basis. You can
+ * specify how many windows to keep the throughput at the rate before 
changing. All traffic will follow a gaussian
+ * distribution centered around `messagesPerSecondAverage` with a deviation of 
`messagesPerSecondDeviation`.
+ *
+ * The lower the window size, the smoother the traffic will be. Using a 100ms 
window offers no noticeable spikes in
+ * traffic while still being long enough to avoid too much overhead.
+ *
+ * WARNING: Due to binary nature of throughput in terms of messages sent in a 
window, this does not work well for an
+ * average throughput of less than 5 messages per window.  In cases where you 
want lower throughput, please adjust the
+ * `windowSizeMs` accordingly.
+ *
+ * Here is an example spec:
+ *
+ * {
+ *    "type": "gaussian",
+ *    "messagesPerSecondAverage": 500,
+ *    "messagesPerSecondDeviation": 50,
+ *    "windowsUntilRateChange": 100,
+ *    "windowSizeMs": 100
+ * }
+ *
+ * This will produce a workload that runs on average 500 messages per second, 
however that speed will change every 10
+ * seconds due to the `windowSizeMs * windowsUntilRateChange` parameters. The 
throughput will have the following
+ * normal distribution:
+ *
+ *    An average of the throughput windows of 500 messages per second.
+ *    ~68% of the throughput windows are between 450 and 550 messages per 
second.
+ *    ~95% of the throughput windows are between 400 and 600 messages per 
second.
+ *    ~99% of the throughput windows are between 350 and 650 messages per 
second.
+ *
+ */
+
+public class GaussianThroughputGenerator implements ThroughputGenerator {
+    private final int messagesPerSecondAverage;
+    private final int messagesPerSecondDeviation;
+    private final int messagesPerWindowAverage;
+    private final int messagesPerWindowDeviation;
+    private final int windowsUntilRateChange;
+    private final long windowSizeMs;
+
+    private final Random random = new Random();
+
+    // Timestamp (ms) at which the current window ends; 0 until the first window is calculated.
+    private long nextWindowStarts = 0;
+    // Messages counted in the current window.
+    private int messageTracker = 0;
+    // Windows that have used the current rate.
+    private int windowTracker = 0;
+    // Message budget per window for the current rate.
+    private int throttleMessages = 0;
+
+    /**
+     * @param messagesPerSecondAverage    Mean of the per-second message rate.
+     * @param messagesPerSecondDeviation  Standard deviation of the per-second message rate.
+     * @param windowsUntilRateChange      Number of windows to keep a rate before drawing a new one.
+     * @param windowSizeMs                Window length in milliseconds; values <= 0 fall back to 100.
+     */
+    @JsonCreator
+    public GaussianThroughputGenerator(@JsonProperty("messagesPerSecondAverage") int messagesPerSecondAverage,
+                                       @JsonProperty("messagesPerSecondDeviation") int messagesPerSecondDeviation,
+                                       @JsonProperty("windowsUntilRateChange") int windowsUntilRateChange,
+                                       @JsonProperty("windowSizeMs") long windowSizeMs) {
+        // Calculate the default values.
+        if (windowSizeMs <= 0) {
+            windowSizeMs = 100;
+        }
+        this.windowSizeMs = windowSizeMs;
+        this.messagesPerSecondAverage = messagesPerSecondAverage;
+        this.messagesPerSecondDeviation = messagesPerSecondDeviation;
+        this.windowsUntilRateChange = windowsUntilRateChange;
+
+        // Take per-second calculations and convert them to per-window calculations.
+        messagesPerWindowAverage = (int) (messagesPerSecondAverage * windowSizeMs / 1000);
+        messagesPerWindowDeviation = (int) (messagesPerSecondDeviation * windowSizeMs / 1000);
+
+        // Calculate the first window.
+        calculateNextWindow(true);
+    }
+
+    /** Returns the configured mean message rate per second. */
+    @JsonProperty
+    public int messagesPerSecondAverage() {
+        return messagesPerSecondAverage;
+    }
+
+    /** Returns the configured standard deviation of the message rate per second. */
+    @JsonProperty
+    public long messagesPerSecondDeviation() {
+        return messagesPerSecondDeviation;
+    }
+
+    /** Returns the configured number of windows a rate is kept before a new one is drawn. */
+    @JsonProperty
+    public long windowsUntilRateChange() {
+        return windowsUntilRateChange;
+    }
+
+    /**
+     * Returns the effective window size in milliseconds.  Exposed as a JSON property so that the
+     * value accepted by the constructor is also written out when this object is serialized.
+     */
+    @JsonProperty
+    public long windowSizeMs() {
+        return windowSizeMs;
+    }
+
+    /**
+     * Starts a new throttling window: clears the per-window message count, advances
+     * {@code nextWindowStarts} past the current time and, when due or forced, draws a new
+     * per-window message budget from the gaussian distribution.
+     *
+     * @param force  Draw a new budget regardless of how many windows have used the current one.
+     */
+    private synchronized void calculateNextWindow(boolean force) {
+        // Reset the message count.
+        messageTracker = 0;
+
+        // Calculate the next window start time.
+        long now = Time.SYSTEM.milliseconds();
+        if (nextWindowStarts > 0) {
+            // Use <= so a call landing exactly on the boundary still advances the window;
+            // with < the boundary would stay at "now" and the new window would have zero length.
+            while (nextWindowStarts <= now) {
+                nextWindowStarts += windowSizeMs;
+            }
+        } else {
+            nextWindowStarts = now + windowSizeMs;
+        }
+
+        // Check the windows between rate changes.  windowTracker counts the windows that have
+        // already used the current rate, so >= changes the rate after exactly
+        // windowsUntilRateChange windows (> would hold each rate for one window too many).
+        if ((windowTracker >= windowsUntilRateChange) || force) {
+            windowTracker = 0;
+
+            // Calculate the number of messages allowed in this window using a normal distribution.
+            // The formula is: Messages = Gaussian * Deviation + Average
+            throttleMessages = Math.max((int) (random.nextGaussian() * (double) messagesPerWindowDeviation) + messagesPerWindowAverage, 1);
+        }
+        windowTracker += 1;
+    }
+
+    /**
+     * Counts one message against the current window and, once the window's budget is used up,
+     * blocks the caller until the next window starts.
+     *
+     * @throws InterruptedException if the thread is interrupted while waiting for the next window.
+     */
+    @Override
+    public synchronized void throttle() throws InterruptedException {
+        // Calculate the next window if we've moved beyond the current one.
+        if (Time.SYSTEM.milliseconds() >= nextWindowStarts) {
+            calculateNextWindow(false);
+        }
+
+        // Increment the message tracker.
+        messageTracker += 1;
+
+        // Compare the tracked message count with the throttle limits.
+        if (messageTracker >= throttleMessages) {
+
+            // Wait until the next window starts.  Only wait on a strictly positive timeout:
+            // Object.wait(0) blocks until notified (i.e. forever here) and a negative timeout
+            // throws IllegalArgumentException.  Looping also absorbs spurious wakeups.
+            long remainingMs = nextWindowStarts - Time.SYSTEM.milliseconds();
+            while (remainingMs > 0) {
+                wait(remainingMs);
+                remainingMs = nextWindowStarts - Time.SYSTEM.milliseconds();
+            }
+        }
+    }
+}
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/workload/GaussianTimestampRandomPayloadGenerator.java
 
b/tools/src/main/java/org/apache/kafka/trogdor/workload/GaussianTimestampRandomPayloadGenerator.java
new file mode 100644
index 0000000..90fe279
--- /dev/null
+++ 
b/tools/src/main/java/org/apache/kafka/trogdor/workload/GaussianTimestampRandomPayloadGenerator.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.workload;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.common.utils.Time;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Random;
+
+/**
+ * This class behaves identically to TimestampRandomPayloadGenerator, except 
the message size follows a gaussian
+ * distribution.
+ *
+ * This should be used in conjunction with TimestampRecordProcessor in the 
Consumer to measure true end-to-end latency
+ * of a system.
+ *
+ * `messageSizeAverage` - The average size in bytes of each message.
+ * `messageSizeDeviation` - The standard deviation to use when calculating 
message size.
+ * `messagesUntilSizeChange` - The number of messages to keep at the same size.
+ * `seed` - Used to initialize Random() to remove some non-determinism.
+ *
+ * Here is an example spec:
+ *
+ * {
+ *    "type": "gaussianTimestampRandom",
+ *    "messageSizeAverage": 512,
+ *    "messageSizeDeviation": 100,
+ *    "messagesUntilSizeChange": 100
+ * }
+ *
+ * This will generate messages on a gaussian distribution with an average size 
each 512-bytes. The message sizes will
+ * have a standard deviation of 100 bytes, and the size will only change every 
100 messages.  The distribution of
+ * messages will be as follows:
+ *
+ *    The average size of the messages are 512 bytes.
+ *    ~68% of the messages are between 412 and 612 bytes
+ *    ~95% of the messages are between 312 and 712 bytes
+ *    ~99% of the messages are between 212 and 812 bytes
+ */
+
+public class GaussianTimestampRandomPayloadGenerator implements PayloadGenerator {
+    private final int messageSizeAverage;
+    private final int messageSizeDeviation;
+    private final int messagesUntilSizeChange;
+    private final long seed;
+
+    private final Random random = new Random();
+    // Scratch buffer used to encode the timestamp as 8 little-endian bytes.
+    private final ByteBuffer buffer;
+
+    // Messages generated at the current size, and that size in bytes (0 until first use).
+    private int messageTracker = 0;
+    private int messageSize = 0;
+
+    /**
+     * @param messageSizeAverage       Mean message size in bytes.
+     * @param messageSizeDeviation     Standard deviation of the message size.
+     * @param messagesUntilSizeChange  Number of messages generated before a new size is drawn.
+     * @param seed                     Base seed; combined with the position on every call.
+     */
+    @JsonCreator
+    public GaussianTimestampRandomPayloadGenerator(@JsonProperty("messageSizeAverage") int messageSizeAverage,
+                                                   @JsonProperty("messageSizeDeviation") int messageSizeDeviation,
+                                                   @JsonProperty("messagesUntilSizeChange") int messagesUntilSizeChange,
+                                                   @JsonProperty("seed") long seed) {
+        this.messageSizeAverage = messageSizeAverage;
+        this.messageSizeDeviation = messageSizeDeviation;
+        this.seed = seed;
+        this.messagesUntilSizeChange = messagesUntilSizeChange;
+        buffer = ByteBuffer.allocate(Long.BYTES);
+        buffer.order(ByteOrder.LITTLE_ENDIAN);
+    }
+
+    /** Returns the configured mean message size in bytes. */
+    @JsonProperty
+    public int messageSizeAverage() {
+        return messageSizeAverage;
+    }
+
+    /** Returns the configured standard deviation of the message size. */
+    @JsonProperty
+    public long messageSizeDeviation() {
+        return messageSizeDeviation;
+    }
+
+    /**
+     * Returns the configured number of messages generated before a new size is drawn.  Exposed
+     * as a JSON property so that every constructor argument is also written out when this
+     * object is serialized.
+     */
+    @JsonProperty
+    public int messagesUntilSizeChange() {
+        return messagesUntilSizeChange;
+    }
+
+    /** Returns the configured base seed. */
+    @JsonProperty
+    public long seed() {
+        return seed;
+    }
+
+    /**
+     * Generates a random payload whose first 8 bytes are the current time in milliseconds,
+     * encoded little-endian.  The payload is never shorter than those 8 bytes.
+     *
+     * @param position  The position to use to generate the payload; added to the seed.
+     *
+     * @return          A new array object containing the payload.
+     */
+    @Override
+    public synchronized byte[] generate(long position) {
+        // Make the random number generator deterministic for unit tests.
+        random.setSeed(seed + position);
+
+        // Calculate the next message size based on a gaussian distribution.
+        if ((messageSize == 0) || (messageTracker >= messagesUntilSizeChange)) {
+            messageTracker = 0;
+            messageSize = Math.max((int) (random.nextGaussian() * messageSizeDeviation) + messageSizeAverage, Long.BYTES);
+        }
+        messageTracker += 1;
+
+        // Generate out of order to prevent inclusion of random number generation in latency numbers.
+        byte[] result = new byte[messageSize];
+        random.nextBytes(result);
+
+        // Do the timestamp generation as the very last task.
+        buffer.clear();
+        buffer.putLong(Time.SYSTEM.milliseconds());
+        buffer.rewind();
+        System.arraycopy(buffer.array(), 0, result, 0, Long.BYTES);
+        return result;
+    }
+}
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/workload/PayloadGenerator.java 
b/tools/src/main/java/org/apache/kafka/trogdor/workload/PayloadGenerator.java
index b06ba01..225a663 100644
--- 
a/tools/src/main/java/org/apache/kafka/trogdor/workload/PayloadGenerator.java
+++ 
b/tools/src/main/java/org/apache/kafka/trogdor/workload/PayloadGenerator.java
@@ -35,7 +35,9 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
     @JsonSubTypes.Type(value = SequentialPayloadGenerator.class, name = 
"sequential"),
     @JsonSubTypes.Type(value = UniformRandomPayloadGenerator.class, name = 
"uniformRandom"),
     @JsonSubTypes.Type(value = NullPayloadGenerator.class, name = "null"),
-    @JsonSubTypes.Type(value = RandomComponentPayloadGenerator.class, name = 
"randomComponent")
+    @JsonSubTypes.Type(value = RandomComponentPayloadGenerator.class, name = 
"randomComponent"),
+    @JsonSubTypes.Type(value = TimestampRandomPayloadGenerator.class, name = 
"timestampRandom"),
+    @JsonSubTypes.Type(value = GaussianTimestampRandomPayloadGenerator.class, 
name = "gaussianTimestampRandom")
     })
 public interface PayloadGenerator {
     /**
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/workload/PayloadGenerator.java 
b/tools/src/main/java/org/apache/kafka/trogdor/workload/RecordProcessor.java
similarity index 50%
copy from 
tools/src/main/java/org/apache/kafka/trogdor/workload/PayloadGenerator.java
copy to 
tools/src/main/java/org/apache/kafka/trogdor/workload/RecordProcessor.java
index b06ba01..500736d 100644
--- 
a/tools/src/main/java/org/apache/kafka/trogdor/workload/PayloadGenerator.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/RecordProcessor.java
@@ -19,31 +19,20 @@ package org.apache.kafka.trogdor.workload;
 
 import com.fasterxml.jackson.annotation.JsonSubTypes;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
 
 /**
- * Generates byte arrays based on a position argument.
- *
- * The array generated at a given position should be the same no matter how 
many
- * times generate() is invoked.  PayloadGenerator instances should be immutable
- * and thread-safe.
+ * RecordProcessor allows for acting on data polled from ConsumeBench 
workloads.
  */
 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME,
-    include = JsonTypeInfo.As.PROPERTY,
-    property = "type")
+        include = JsonTypeInfo.As.PROPERTY,
+        property = "type")
 @JsonSubTypes(value = {
-    @JsonSubTypes.Type(value = ConstantPayloadGenerator.class, name = 
"constant"),
-    @JsonSubTypes.Type(value = SequentialPayloadGenerator.class, name = 
"sequential"),
-    @JsonSubTypes.Type(value = UniformRandomPayloadGenerator.class, name = 
"uniformRandom"),
-    @JsonSubTypes.Type(value = NullPayloadGenerator.class, name = "null"),
-    @JsonSubTypes.Type(value = RandomComponentPayloadGenerator.class, name = 
"randomComponent")
-    })
-public interface PayloadGenerator {
-    /**
-     * Generate a payload.
-     *
-     * @param position  The position to use to generate the payload
-     *
-     * @return          A new array object containing the payload.
-     */
-    byte[] generate(long position);
+        @JsonSubTypes.Type(value = TimestampRecordProcessor.class, name = 
"timestamp"),
+})
+public interface RecordProcessor {
+    /**
+     * Process one batch of consumer records.
+     *
+     * @param consumerRecords  The records to process.
+     */
+    void processRecords(ConsumerRecords<byte[], byte[]> consumerRecords);
+    /**
+     * @return  The processor's current status as a JSON tree.
+     *          NOTE(review): presumably included in the ConsumeBench worker status; confirm against the caller.
+     */
+    JsonNode processorStatus();
 }
+
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/workload/PayloadGenerator.java 
b/tools/src/main/java/org/apache/kafka/trogdor/workload/ThroughputGenerator.java
similarity index 54%
copy from 
tools/src/main/java/org/apache/kafka/trogdor/workload/PayloadGenerator.java
copy to 
tools/src/main/java/org/apache/kafka/trogdor/workload/ThroughputGenerator.java
index b06ba01..b6be8a9 100644
--- 
a/tools/src/main/java/org/apache/kafka/trogdor/workload/PayloadGenerator.java
+++ 
b/tools/src/main/java/org/apache/kafka/trogdor/workload/ThroughputGenerator.java
@@ -21,29 +21,25 @@ import com.fasterxml.jackson.annotation.JsonSubTypes;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
 
 /**
- * Generates byte arrays based on a position argument.
+ * This interface is used to facilitate running a configurable number of 
messages per second by throttling if the
+ * throughput goes above a certain amount.
  *
- * The array generated at a given position should be the same no matter how 
many
- * times generate() is invoked.  PayloadGenerator instances should be immutable
- * and thread-safe.
+ * Currently there are 2 throughput methods:
+ *
+ *   * `constant` will use `ConstantThroughputGenerator` to keep the number of 
messages per second constant.
+ *   * `gaussian` will use `GaussianThroughputGenerator` to vary the number of 
messages per second on a normal
+ *     distribution.
+ *
+ * Please see the implementation classes for more details.
  */
+
 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME,
     include = JsonTypeInfo.As.PROPERTY,
     property = "type")
 @JsonSubTypes(value = {
-    @JsonSubTypes.Type(value = ConstantPayloadGenerator.class, name = 
"constant"),
-    @JsonSubTypes.Type(value = SequentialPayloadGenerator.class, name = 
"sequential"),
-    @JsonSubTypes.Type(value = UniformRandomPayloadGenerator.class, name = 
"uniformRandom"),
-    @JsonSubTypes.Type(value = NullPayloadGenerator.class, name = "null"),
-    @JsonSubTypes.Type(value = RandomComponentPayloadGenerator.class, name = 
"randomComponent")
+    @JsonSubTypes.Type(value = ConstantThroughputGenerator.class, name = 
"constant"),
+    @JsonSubTypes.Type(value = GaussianThroughputGenerator.class, name = 
"gaussian")
     })
-public interface PayloadGenerator {
-    /**
-     * Generate a payload.
-     *
-     * @param position  The position to use to generate the payload
-     *
-     * @return          A new array object containing the payload.
-     */
-    byte[] generate(long position);
+public interface ThroughputGenerator {
+    /**
+     * Called by the workload to apply throttling.
+     * NOTE(review): presumably blocks the calling thread when the configured rate is exceeded; confirm
+     * against the implementation classes.
+     *
+     * @throws InterruptedException  Declared for implementations that wait; the exact conditions are
+     *                               implementation-specific.
+     */
+    void throttle() throws InterruptedException;
 }
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/workload/TimestampRandomPayloadGenerator.java
 
b/tools/src/main/java/org/apache/kafka/trogdor/workload/TimestampRandomPayloadGenerator.java
new file mode 100644
index 0000000..d2cfdc7
--- /dev/null
+++ 
b/tools/src/main/java/org/apache/kafka/trogdor/workload/TimestampRandomPayloadGenerator.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.workload;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.common.utils.Time;
+
+import java.nio.ByteOrder;
+import java.util.Random;
+import java.nio.ByteBuffer;
+
+/**
+ * A PayloadGenerator which generates a timestamped uniform random payload.
+ *
+ * This generator generates pseudo-random payloads that can be reproduced from 
run to run.
+ * The guarantees are the same as those of java.util.Random.
+ *
+ * The timestamp used for this class is in milliseconds since epoch, encoded 
directly to the first several bytes of the
+ * payload.
+ *
+ * This should be used in conjunction with TimestampRecordProcessor in the 
Consumer to measure true end-to-end latency
+ * of a system.
+ *
+ * `size` - The size in bytes of each message.
+ * `seed` - Used to initialize Random() to remove some non-determinism.
+ *
+ * Here is an example spec:
+ *
+ * {
+ *    "type": "timestampRandom",
+ *    "size": 512
+ * }
+ *
+ * This will generate a 512-byte random message with the first several bytes 
encoded with the timestamp.
+ */
+public class TimestampRandomPayloadGenerator implements PayloadGenerator {
+    private final int size;
+    private final long seed;
+
+    // Scratch space for the random part of the payload; reused across calls under generate()'s lock.
+    private final byte[] randomBytes;
+    // Little-endian scratch buffer used to encode the timestamp.
+    private final ByteBuffer buffer;
+
+    private final Random random = new Random();
+
+    /**
+     * @param size  The size in bytes of each generated payload.  Must be at least Long.BYTES so that
+     *              the timestamp fits.
+     * @param seed  The base seed; generate() re-seeds the random number generator with seed + position.
+     *
+     * @throws IllegalArgumentException  If size is smaller than Long.BYTES.
+     */
+    @JsonCreator
+    public TimestampRandomPayloadGenerator(@JsonProperty("size") int size,
+                                           @JsonProperty("seed") long seed) {
+        // Validate before touching any state.
+        if (size < Long.BYTES) {
+            throw new IllegalArgumentException("The size of the payload must be greater than or equal to " + Long.BYTES + ".");
+        }
+        this.size = size;
+        this.seed = seed;
+        // No need to seed the generator here: generate() always re-seeds it before drawing bytes.
+        this.randomBytes = new byte[size - Long.BYTES];
+        this.buffer = ByteBuffer.allocate(Long.BYTES);
+        this.buffer.order(ByteOrder.LITTLE_ENDIAN);
+    }
+
+    /**
+     * @return  The size in bytes of each generated payload.
+     */
+    @JsonProperty
+    public int size() {
+        return size;
+    }
+
+    /**
+     * @return  The base seed used when generating random bytes.
+     */
+    @JsonProperty
+    public long seed() {
+        return seed;
+    }
+
+    /**
+     * Generate a timestamped random payload.
+     *
+     * The first Long.BYTES bytes hold the value of Time.SYSTEM.milliseconds(), encoded little-endian.
+     * The remaining bytes are random, drawn after re-seeding the generator with seed + position.
+     *
+     * @param position  Added to the seed before the random bytes are drawn.
+     *
+     * @return          A new array of size bytes.
+     */
+    @Override
+    public synchronized byte[] generate(long position) {
+        // Generate out of order to prevent inclusion of random number generation in latency numbers.
+        byte[] result = new byte[size];
+        if (randomBytes.length > 0) {
+            random.setSeed(seed + position);
+            random.nextBytes(randomBytes);
+            System.arraycopy(randomBytes, 0, result, Long.BYTES, randomBytes.length);
+        }
+
+        // Do the timestamp generation as the very last task.
+        // The absolute put writes at index 0 regardless of the buffer's position.
+        buffer.putLong(0, Time.SYSTEM.milliseconds());
+        System.arraycopy(buffer.array(), 0, result, 0, Long.BYTES);
+        return result;
+    }
+}
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/workload/TimestampRecordProcessor.java
 
b/tools/src/main/java/org/apache/kafka/trogdor/workload/TimestampRecordProcessor.java
new file mode 100644
index 0000000..658016a
--- /dev/null
+++ 
b/tools/src/main/java/org/apache/kafka/trogdor/workload/TimestampRecordProcessor.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.workload;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.trogdor.common.JsonUtil;
+import org.apache.kafka.common.utils.Time;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+/**
+ * This class will process records containing timestamps and generate a 
histogram based on the data.  It will then be
+ * present in the status from the `ConsumeBenchWorker` class.  This must be 
used with a timestamped PayloadGenerator
+ * implementation.
+ *
+ * Example spec:
+ * {
+ *    "type": "timestamp",
+ *    "histogramMaxMs": 10000,
+ *    "histogramMinMs": 0,
+ *    "histogramStepMs": 1
+ * }
+ *
+ * This will track total E2E latency up to 10 seconds, using 1ms resolution 
and a timestamp size of 8 bytes.
+ */
+
+public class TimestampRecordProcessor implements RecordProcessor {
+    private static final Logger log = LoggerFactory.getLogger(TimestampRecordProcessor.class);
+
+    /**
+     * The percentiles to use when calculating the histogram data.
+     * These should match up with the p50LatencyMs, p95LatencyMs, etc. fields of StatusData.
+     */
+    final static float[] PERCENTILES = {0.5f, 0.95f, 0.99f};
+
+    private final int histogramMaxMs;
+    private final int histogramMinMs;
+    private final int histogramStepMs;
+    // Little-endian scratch buffer used to decode the timestamp; guarded by processRecords()'s lock.
+    private final ByteBuffer buffer;
+    private final Histogram histogram;
+
+    /**
+     * @param histogramMaxMs   The upper end of the latency range, in milliseconds.
+     * @param histogramMinMs   The lower end of the latency range, in milliseconds.
+     * @param histogramStepMs  The width of one histogram step, in milliseconds.  Must be positive.
+     *
+     * @throws IllegalArgumentException  If histogramStepMs is not greater than 0.
+     */
+    @JsonCreator
+    public TimestampRecordProcessor(@JsonProperty("histogramMaxMs") int histogramMaxMs,
+                                    @JsonProperty("histogramMinMs") int histogramMinMs,
+                                    @JsonProperty("histogramStepMs") int histogramStepMs) {
+        // The step is used as a divisor below and in putHistogram(), so reject it up front with a
+        // clear message instead of failing with an ArithmeticException.
+        if (histogramStepMs <= 0) {
+            throw new IllegalArgumentException("histogramStepMs must be greater than 0, but was " + histogramStepMs + ".");
+        }
+        this.histogramMaxMs = histogramMaxMs;
+        this.histogramMinMs = histogramMinMs;
+        this.histogramStepMs = histogramStepMs;
+        this.histogram = new Histogram((histogramMaxMs - histogramMinMs) / histogramStepMs);
+        this.buffer = ByteBuffer.allocate(Long.BYTES);
+        this.buffer.order(ByteOrder.LITTLE_ENDIAN);
+    }
+
+    /**
+     * @return  The upper end of the latency range, in milliseconds.
+     */
+    @JsonProperty
+    public int histogramMaxMs() {
+        return histogramMaxMs;
+    }
+
+    /**
+     * @return  The lower end of the latency range, in milliseconds.
+     */
+    @JsonProperty
+    public int histogramMinMs() {
+        return histogramMinMs;
+    }
+
+    /**
+     * @return  The width of one histogram step, in milliseconds.
+     */
+    @JsonProperty
+    public int histogramStepMs() {
+        return histogramStepMs;
+    }
+
+    /**
+     * Record one latency sample.  The latency is converted to a step index relative to
+     * histogramMinMs; indexes below zero are clamped to zero.
+     */
+    private void putHistogram(long latency) {
+        histogram.add(Long.max(0L, (latency - histogramMinMs) / histogramStepMs));
+    }
+
+    /**
+     * Decode the little-endian timestamp from the first Long.BYTES bytes of each record value and
+     * add the difference to the current time to the histogram.  Records that cannot be decoded
+     * are logged and skipped.
+     *
+     * @param consumerRecords  The records to process.
+     */
+    @Override
+    public synchronized void processRecords(ConsumerRecords<byte[], byte[]> consumerRecords) {
+        // Save the current time to prevent skew by processing time.
+        long curTime = Time.SYSTEM.milliseconds();
+        for (ConsumerRecord<byte[], byte[]> record : consumerRecords) {
+            byte[] value = record.value();
+            // Check explicitly instead of relying on a NullPointerException or
+            // IndexOutOfBoundsException from the buffer.
+            if (value == null || value.length < Long.BYTES) {
+                log.error("Error in processRecords: record value is missing or shorter than {} bytes.", Long.BYTES);
+                continue;
+            }
+            try {
+                buffer.clear();
+                buffer.put(value, 0, Long.BYTES);
+                buffer.rewind();
+                putHistogram(curTime - buffer.getLong());
+            } catch (RuntimeException e) {
+                // Best effort: one bad record must not stop the rest of the batch.
+                log.error("Error in processRecords:", e);
+            }
+        }
+    }
+
+    /**
+     * Summarize the histogram at PERCENTILES and convert the average and percentile values back
+     * to milliseconds.
+     *
+     * @return  A JSON tree with the averageLatencyMs, p50LatencyMs, p95LatencyMs and p99LatencyMs fields.
+     */
+    @Override
+    public JsonNode processorStatus() {
+        Histogram.Summary summary = histogram.summarize(PERCENTILES);
+        StatusData statusData = new StatusData(
+                summary.average() * histogramStepMs + histogramMinMs,
+                summary.percentiles().get(0).value() * histogramStepMs + histogramMinMs,
+                summary.percentiles().get(1).value() * histogramStepMs + histogramMinMs,
+                summary.percentiles().get(2).value() * histogramStepMs + histogramMinMs);
+        return JsonUtil.JSON_SERDE.valueToTree(statusData);
+    }
+
+    /**
+     * The latency summary returned by processorStatus().
+     */
+    private static class StatusData {
+        private final float averageLatencyMs;
+        private final int p50LatencyMs;
+        private final int p95LatencyMs;
+        private final int p99LatencyMs;
+
+        @JsonCreator
+        StatusData(@JsonProperty("averageLatencyMs") float averageLatencyMs,
+                   @JsonProperty("p50LatencyMs") int p50latencyMs,
+                   @JsonProperty("p95LatencyMs") int p95latencyMs,
+                   @JsonProperty("p99LatencyMs") int p99latencyMs) {
+            this.averageLatencyMs = averageLatencyMs;
+            this.p50LatencyMs = p50latencyMs;
+            this.p95LatencyMs = p95latencyMs;
+            this.p99LatencyMs = p99latencyMs;
+        }
+
+        @JsonProperty
+        public float averageLatencyMs() {
+            return averageLatencyMs;
+        }
+
+        @JsonProperty
+        public int p50LatencyMs() {
+            return p50LatencyMs;
+        }
+
+        @JsonProperty
+        public int p95LatencyMs() {
+            return p95LatencyMs;
+        }
+
+        @JsonProperty
+        public int p99LatencyMs() {
+            return p99LatencyMs;
+        }
+    }
+}
diff --git 
a/tools/src/test/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpecTest.java
 
b/tools/src/test/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpecTest.java
index 59f76d2..a0e3eb0 100644
--- 
a/tools/src/test/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpecTest.java
+++ 
b/tools/src/test/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpecTest.java
@@ -29,6 +29,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.HashMap;
+import java.util.Optional;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
@@ -69,6 +70,7 @@ public class ConsumeBenchSpecTest {
     private ConsumeBenchSpec consumeBenchSpec(List<String> activeTopics) {
         return new ConsumeBenchSpec(0, 0, "node", "localhost",
             123, 1234, "cg-1",
-            Collections.emptyMap(), Collections.emptyMap(), 
Collections.emptyMap(), 1, activeTopics);
+            Collections.emptyMap(), Collections.emptyMap(), 
Collections.emptyMap(), 1,
+            Optional.empty(), activeTopics);
     }
 }

Reply via email to