This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new baef516 Add ConfigurableProducerSpec to Trogdor for improved E2E
latency tracking. (#9736)
baef516 is described below
commit baef516789f0d80ce1faba5d257a905ecb27e6fe
Author: Scott Hendricks <[email protected]>
AuthorDate: Fri Dec 18 16:03:59 2020 -0500
Add ConfigurableProducerSpec to Trogdor for improved E2E latency tracking.
(#9736)
Reviewer: Colin P. McCabe <[email protected]>
---
checkstyle/suppressions.xml | 2 +
.../trogdor/workload/ConfigurableProducerSpec.java | 214 ++++++++++++++
.../workload/ConfigurableProducerWorker.java | 322 +++++++++++++++++++++
.../trogdor/workload/ConstantFlushGenerator.java | 73 +++++
.../workload/ConstantThroughputGenerator.java | 112 +++++++
.../kafka/trogdor/workload/ConsumeBenchSpec.java | 12 +
.../kafka/trogdor/workload/ConsumeBenchWorker.java | 62 +++-
.../{PayloadGenerator.java => FlushGenerator.java} | 32 +-
.../trogdor/workload/GaussianFlushGenerator.java | 100 +++++++
.../workload/GaussianThroughputGenerator.java | 152 ++++++++++
.../GaussianTimestampRandomPayloadGenerator.java | 122 ++++++++
.../kafka/trogdor/workload/PayloadGenerator.java | 4 +-
...{PayloadGenerator.java => RecordProcessor.java} | 33 +--
...loadGenerator.java => ThroughputGenerator.java} | 32 +-
.../workload/TimestampRandomPayloadGenerator.java | 102 +++++++
.../trogdor/workload/TimestampRecordProcessor.java | 162 +++++++++++
.../trogdor/workload/ConsumeBenchSpecTest.java | 4 +-
17 files changed, 1465 insertions(+), 75 deletions(-)
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 69df37d..b44e713 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -241,6 +241,8 @@
<suppress checks="ParameterNumber"
files="ProduceBenchSpec.java"/>
<suppress checks="ParameterNumber"
+ files="ConsumeBenchSpec.java"/>
+ <suppress checks="ParameterNumber"
files="SustainedConnectionSpec.java"/>
<suppress id="dontUseSystemExit"
files="VerifiableConsumer.java"/>
diff --git
a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConfigurableProducerSpec.java
b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConfigurableProducerSpec.java
new file mode 100644
index 0000000..5235fc3
--- /dev/null
+++
b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConfigurableProducerSpec.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.workload;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.trogdor.task.TaskController;
+import org.apache.kafka.trogdor.task.TaskSpec;
+import org.apache.kafka.trogdor.task.TaskWorker;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * This is the spec to pass in to be able to run the
`ConfigurableProducerWorker` workload. This allows for customized
+ * and even variable configurations in terms of messages per second, message
size, batch size, key size, and even the
+ * ability to target a specific partition out of a topic.
+ *
+ * This has several notable differences from the ProduceBench classes, namely
the ability to dynamically control
+ * flushing and throughput through configurable classes, but also the ability
to run against specific partitions within
+ * a topic directly. This workload can only run against one topic at a time,
unlike the ProduceBench workload.
+ *
+ * The parameters that differ from ProduceBenchSpec:
+ *
+ * `flushGenerator` - Used to instruct the KafkaProducer when to
issue flushes. This allows us to simulate
+ * variable batching since batch flushing is not
currently exposed within the KafkaProducer
+ * class. See the `FlushGenerator` interface for
more information.
+ *
+ * `throughputGenerator` - Used to throttle the ConfigurableProducerWorker
based on a calculated number of messages
+ * within a window. See the `ThroughputGenerator`
interface for more information.
+ *
+ * `activeTopic` - This class only supports execution against a
single topic at a time. If more than one
+ * topic is specified, the
ConfigurableProducerWorker will throw an error.
+ *
+ * `activePartition` - Specify a specific partition number within the
activeTopic to run load against, or
+ * specify `-1` to allow use of all partitions.
+ *
+ * Here is an example spec:
+ *
+ * {
+ * "startMs": 1606949497662,
+ * "durationMs": 3600000,
+ * "producerNode": "trogdor-agent-0",
+ * "bootstrapServers": "some.example.kafka.server:9091",
+ * "flushGenerator": {
+ * "type": "gaussian",
+ * "messagesPerFlushAverage": 16,
+ * "messagesPerFlushDeviation": 4
+ * },
+ * "throughputGenerator": {
+ * "type": "gaussian",
+ * "messagesPerSecondAverage": 500,
+ * "messagesPerSecondDeviation": 50,
+ * "windowsUntilRateChange": 100,
+ * "windowSizeMs": 100
+ * },
+ * "keyGenerator": {
+ * "type": "constant",
+ * "size": 8
+ * },
+ * "valueGenerator": {
+ * "type": "gaussianTimestampRandom",
+ * "messageSizeAverage": 512,
+ * "messageSizeDeviation": 100,
+ * "messagesUntilSizeChange": 100
+ * },
+ * "producerConf": {
+ * "acks": "all"
+ * },
+ * "commonClientConf": {},
+ * "adminClientConf": {},
+ * "activeTopic": {
+ * "topic0": {
+ * "numPartitions": 100,
+ * "replicationFactor": 3,
+ * "configs": {
+ * "retention.ms": "1800000"
+ * }
+ * }
+ * },
+ * "activePartition": 5
+ * }
+ *
+ * This example spec performed the following:
+ *
+ * * Ran on `trogdor-agent-0` for 1 hour starting at 2020-12-02 22:51:37.662
GMT
+ * * Produced with acks=all to Partition 5 of `topic0` on kafka server
`some.example.kafka.server:9091`.
+ * * The average batch had 16 messages, with a standard deviation of 4
messages.
+ * * The messages had 8-byte constant keys with an average size of 512 bytes
and a standard deviation of 100 bytes.
+ * * The messages had millisecond timestamps embedded in the first several
bytes of the value.
+ * * The average throughput was 500 messages/second, with a window of 100ms
and a deviation of 50 messages/second.
+ */
+
+public class ConfigurableProducerSpec extends TaskSpec {
+ private final String producerNode;
+ private final String bootstrapServers;
+ private final Optional<FlushGenerator> flushGenerator;
+ private final ThroughputGenerator throughputGenerator;
+ private final PayloadGenerator keyGenerator;
+ private final PayloadGenerator valueGenerator;
+ private final Map<String, String> producerConf;
+ private final Map<String, String> adminClientConf;
+ private final Map<String, String> commonClientConf;
+ private final TopicsSpec activeTopic;
+ private final int activePartition;
+
+ @JsonCreator
+ public ConfigurableProducerSpec(@JsonProperty("startMs") long startMs,
+ @JsonProperty("durationMs") long
durationMs,
+ @JsonProperty("producerNode") String
producerNode,
+ @JsonProperty("bootstrapServers") String
bootstrapServers,
+ @JsonProperty("flushGenerator")
Optional<FlushGenerator> flushGenerator,
+ @JsonProperty("throughputGenerator")
ThroughputGenerator throughputGenerator,
+ @JsonProperty("keyGenerator")
PayloadGenerator keyGenerator,
+ @JsonProperty("valueGenerator")
PayloadGenerator valueGenerator,
+ @JsonProperty("producerConf") Map<String,
String> producerConf,
+ @JsonProperty("commonClientConf")
Map<String, String> commonClientConf,
+ @JsonProperty("adminClientConf")
Map<String, String> adminClientConf,
+ @JsonProperty("activeTopic") TopicsSpec
activeTopic,
+ @JsonProperty("activePartition") int
activePartition) {
+ super(startMs, durationMs);
+ this.producerNode = (producerNode == null) ? "" : producerNode;
+ this.bootstrapServers = (bootstrapServers == null) ? "" :
bootstrapServers;
+ this.flushGenerator = flushGenerator;
+ this.keyGenerator = keyGenerator;
+ this.valueGenerator = valueGenerator;
+ this.throughputGenerator = throughputGenerator;
+ this.producerConf = configOrEmptyMap(producerConf);
+ this.commonClientConf = configOrEmptyMap(commonClientConf);
+ this.adminClientConf = configOrEmptyMap(adminClientConf);
+ this.activeTopic = activeTopic.immutableCopy();
+ this.activePartition = activePartition;
+ }
+
+ @JsonProperty
+ public String producerNode() {
+ return producerNode;
+ }
+
+ @JsonProperty
+ public String bootstrapServers() {
+ return bootstrapServers;
+ }
+
+ @JsonProperty
+ public Optional<FlushGenerator> flushGenerator() {
+ return flushGenerator;
+ }
+
+ @JsonProperty
+ public PayloadGenerator keyGenerator() {
+ return keyGenerator;
+ }
+
+ @JsonProperty
+ public PayloadGenerator valueGenerator() {
+ return valueGenerator;
+ }
+
+ @JsonProperty
+ public ThroughputGenerator throughputGenerator() {
+ return throughputGenerator;
+ }
+
+ @JsonProperty
+ public Map<String, String> producerConf() {
+ return producerConf;
+ }
+
+ @JsonProperty
+ public Map<String, String> commonClientConf() {
+ return commonClientConf;
+ }
+
+ @JsonProperty
+ public Map<String, String> adminClientConf() {
+ return adminClientConf;
+ }
+
+ @JsonProperty
+ public TopicsSpec activeTopic() {
+ return activeTopic;
+ }
+
+ @JsonProperty
+ public int activePartition() {
+ return activePartition;
+ }
+
+ @Override
+ public TaskController newController(String id) {
+ return topology -> Collections.singleton(producerNode);
+ }
+
+ @Override
+ public TaskWorker newTaskWorker(String id) {
+ return new ConfigurableProducerWorker(id, this);
+ }
+}
diff --git
a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConfigurableProducerWorker.java
b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConfigurableProducerWorker.java
new file mode 100644
index 0000000..b08ef44
--- /dev/null
+++
b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConfigurableProducerWorker.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.workload;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.node.TextNode;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.utils.ThreadUtils;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.trogdor.common.JsonUtil;
+import org.apache.kafka.trogdor.common.Platform;
+import org.apache.kafka.trogdor.common.WorkerUtils;
+import org.apache.kafka.trogdor.task.TaskWorker;
+import org.apache.kafka.trogdor.task.WorkerStatusTracker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * This workload allows for customized and even variable configurations in
terms of messages per second, message size,
+ * batch size, key size, and even the ability to target a specific partition
out of a topic.
+ *
+ * See `ConfigurableProducerSpec` for a more detailed description.
+ */
+
/**
 * Trogdor worker that produces records to a single topic (optionally a single
 * partition) while delegating flush and throttle decisions to the generators
 * configured in {@link ConfigurableProducerSpec}.
 */
public class ConfigurableProducerWorker implements TaskWorker {
    private static final Logger log = LoggerFactory.getLogger(ConfigurableProducerWorker.class);

    private final String id;

    private final ConfigurableProducerSpec spec;

    // Guards against double start/stop; flipped atomically in start() and stop().
    private final AtomicBoolean running = new AtomicBoolean(false);

    // Created in start(), nulled out in stop().
    private ScheduledExecutorService executor;

    private WorkerStatusTracker status;

    // Completed (with "") on normal stop, or completed exceptionally via
    // WorkerUtils.abort() on failure.
    private KafkaFutureImpl<String> doneFuture;

    public ConfigurableProducerWorker(String id, ConfigurableProducerSpec spec) {
        this.id = id;
        this.spec = spec;
    }

    /**
     * Starts the workload: spins up a two-thread executor and schedules the
     * Prepare step. Throws IllegalStateException if already running.
     */
    @Override
    public void start(Platform platform, WorkerStatusTracker status,
                      KafkaFutureImpl<String> doneFuture) {
        if (!running.compareAndSet(false, true)) {
            throw new IllegalStateException("ConfigurableProducerWorker is already running.");
        }
        log.info("{}: Activating ConfigurableProducerWorker with {}", id, spec);
        // Create an executor with 2 threads. We need the second thread so
        // that the StatusUpdater can run in parallel with SendRecords.
        this.executor = Executors.newScheduledThreadPool(2,
            ThreadUtils.createThreadFactory("ConfigurableProducerWorkerThread%d", false));
        this.status = status;
        this.doneFuture = doneFuture;
        executor.submit(new Prepare());
    }

    /**
     * First phase of the workload: validates that exactly one topic was
     * configured, creates the topic(s), then hands off to SendRecords.
     * Any failure aborts the task via WorkerUtils.abort().
     */
    public class Prepare implements Runnable {
        @Override
        public void run() {
            try {
                Map<String, NewTopic> newTopics = new HashMap<>();
                // This workload only supports producing to a single topic.
                if (spec.activeTopic().materialize().size() != 1) {
                    throw new RuntimeException("Can only run against 1 topic.");
                }
                List<TopicPartition> active = new ArrayList<>();
                for (Map.Entry<String, PartitionsSpec> entry :
                        spec.activeTopic().materialize().entrySet()) {
                    String topicName = entry.getKey();
                    PartitionsSpec partSpec = entry.getValue();
                    newTopics.put(topicName, partSpec.newTopic(topicName));
                    for (Integer partitionNumber : partSpec.partitionNumbers()) {
                        active.add(new TopicPartition(topicName, partitionNumber));
                    }
                }
                status.update(new TextNode("Creating " + newTopics.keySet().size() + " topic(s)"));
                WorkerUtils.createTopics(log, spec.bootstrapServers(), spec.commonClientConf(),
                                         spec.adminClientConf(), newTopics, false);
                status.update(new TextNode("Created " + newTopics.keySet().size() + " topic(s)"));
                // The size check above guarantees active refers to the single topic.
                executor.submit(new SendRecords(active.get(0).topic(), spec.activePartition()));
            } catch (Throwable e) {
                WorkerUtils.abort(log, "Prepare", e, doneFuture);
            }
        }
    }

    /**
     * Producer-send callback that records end-to-end send latency (time from
     * send() to broker acknowledgement) into the SendRecords histogram.
     * Send errors are logged but do not abort the workload.
     */
    private static class SendRecordsCallback implements Callback {
        private final SendRecords sendRecords;
        private final long startMs;

        SendRecordsCallback(SendRecords sendRecords, long startMs) {
            this.sendRecords = sendRecords;
            this.startMs = startMs;
        }

        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            long now = Time.SYSTEM.milliseconds();
            long durationMs = now - startMs;
            sendRecords.recordDuration(durationMs);
            if (exception != null) {
                log.error("SendRecordsCallback: error", exception);
            }
        }
    }

    /**
     * Main send loop. Produces records indefinitely (until the executor is
     * shut down or an exception occurs), throttling and flushing according
     * to the spec's generators, and reporting latency status every 30s.
     */
    public class SendRecords implements Callable<Void> {
        private final String activeTopic;
        // Partition to target, or -1 to let the producer pick the partition.
        private final int activePartition;

        // Latency histogram; 10000 buckets of 1 ms each.
        private final Histogram histogram;

        private final Future<?> statusUpdaterFuture;

        private final KafkaProducer<byte[], byte[]> producer;

        private final PayloadIterator keys;

        private final PayloadIterator values;

        // Future of the most recent send; awaited during shutdown so the last
        // in-flight record is accounted for before the producer closes.
        private Future<RecordMetadata> sendFuture;

        SendRecords(String topic, int partition) {
            this.activeTopic = topic;
            this.activePartition = partition;
            this.histogram = new Histogram(10000);

            // Periodic status reports run on the executor's second thread.
            this.statusUpdaterFuture = executor.scheduleWithFixedDelay(
                new StatusUpdater(histogram), 30, 30, TimeUnit.SECONDS);

            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, spec.bootstrapServers());
            // producerConf overrides commonClientConf on key collisions
            // (applied last by addConfigsToProperties).
            WorkerUtils.addConfigsToProperties(props, spec.commonClientConf(), spec.producerConf());
            this.producer = new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer());
            this.keys = new PayloadIterator(spec.keyGenerator());
            this.values = new PayloadIterator(spec.valueGenerator());
        }

        @Override
        public Void call() throws Exception {
            long startTimeMs = Time.SYSTEM.milliseconds();
            try {
                try {
                    // NOTE(review): sentMessages is written but never read; the
                    // final count comes from histogram.summarize().numSamples().
                    long sentMessages = 0;
                    // Runs until interrupted (executor shutdown) or a send fails.
                    while (true) {
                        sendMessage();
                        sentMessages++;
                    }
                } catch (Exception e) {
                    // NOTE(review): catch-and-rethrow is a no-op; the outer
                    // catch below performs the actual abort handling.
                    throw e;
                } finally {
                    // Wait for the last in-flight send before closing, so its
                    // latency is captured; failures here are only logged.
                    if (sendFuture != null) {
                        try {
                            sendFuture.get();
                        } catch (Exception e) {
                            log.error("Exception on final future", e);
                        }
                    }
                    producer.close();
                }
            } catch (Exception e) {
                WorkerUtils.abort(log, "SendRecords", e, doneFuture);
            } finally {
                // Stop periodic updates and emit one final status snapshot.
                statusUpdaterFuture.cancel(false);
                StatusData statusData = new StatusUpdater(histogram).update();
                long curTimeMs = Time.SYSTEM.milliseconds();
                log.info("Sent {} total record(s) in {} ms. status: {}",
                    histogram.summarize().numSamples(), curTimeMs - startTimeMs, statusData);
            }
            doneFuture.complete("");
            return null;
        }

        /**
         * Sends one record, then notifies the flush generator (if any) and
         * blocks in the throughput generator's throttle.
         */
        private void sendMessage() throws InterruptedException {
            ProducerRecord<byte[], byte[]> record;
            if (activePartition != -1) {
                record = new ProducerRecord<>(activeTopic, activePartition, keys.next(), values.next());
            } else {
                record = new ProducerRecord<>(activeTopic, keys.next(), values.next());
            }
            sendFuture = producer.send(record, new SendRecordsCallback(this, Time.SYSTEM.milliseconds()));
            spec.flushGenerator().ifPresent(flushGenerator -> flushGenerator.increment(producer));
            spec.throughputGenerator().throttle();
        }

        // Called from the producer I/O thread via SendRecordsCallback.
        void recordDuration(long durationMs) {
            histogram.add(durationMs);
        }
    }

    /**
     * Publishes histogram-derived latency statistics to the status tracker.
     * Scheduled periodically, and also invoked once for the final summary.
     */
    public class StatusUpdater implements Runnable {
        private final Histogram histogram;

        StatusUpdater(Histogram histogram) {
            this.histogram = histogram;
        }

        @Override
        public void run() {
            try {
                update();
            } catch (Exception e) {
                WorkerUtils.abort(log, "StatusUpdater", e, doneFuture);
            }
        }

        /** Builds a StatusData snapshot, publishes it, and returns it. */
        StatusData update() {
            Histogram.Summary summary = histogram.summarize(StatusData.PERCENTILES);
            // Percentile indices 0/1/2 correspond to PERCENTILES {0.5, 0.95, 0.99}.
            StatusData statusData = new StatusData(summary.numSamples(), summary.average(),
                summary.percentiles().get(0).value(),
                summary.percentiles().get(1).value(),
                summary.percentiles().get(2).value());
            status.update(JsonUtil.JSON_SERDE.valueToTree(statusData));
            return statusData;
        }
    }

    /**
     * JSON-serializable latency status: total records acknowledged, mean
     * latency, and the p50/p95/p99 latencies in milliseconds.
     */
    public static class StatusData {
        private final long totalSent;
        private final float averageLatencyMs;
        private final int p50LatencyMs;
        private final int p95LatencyMs;
        private final int p99LatencyMs;

        /**
         * The percentiles to use when calculating the histogram data.
         * These should match up with the p50LatencyMs, p95LatencyMs, etc. fields.
         */
        final static float[] PERCENTILES = {0.5f, 0.95f, 0.99f};

        @JsonCreator
        StatusData(@JsonProperty("totalSent") long totalSent,
                   @JsonProperty("averageLatencyMs") float averageLatencyMs,
                   @JsonProperty("p50LatencyMs") int p50latencyMs,
                   @JsonProperty("p95LatencyMs") int p95latencyMs,
                   @JsonProperty("p99LatencyMs") int p99latencyMs) {
            this.totalSent = totalSent;
            this.averageLatencyMs = averageLatencyMs;
            this.p50LatencyMs = p50latencyMs;
            this.p95LatencyMs = p95latencyMs;
            this.p99LatencyMs = p99latencyMs;
        }

        @JsonProperty
        public long totalSent() {
            return totalSent;
        }

        @JsonProperty
        public float averageLatencyMs() {
            return averageLatencyMs;
        }

        @JsonProperty
        public int p50LatencyMs() {
            return p50LatencyMs;
        }

        @JsonProperty
        public int p95LatencyMs() {
            return p95LatencyMs;
        }

        @JsonProperty
        public int p99LatencyMs() {
            return p99LatencyMs;
        }
    }

    /**
     * Stops the workload: completes the done future, shuts down the executor
     * (interrupting the send loop), and waits for termination. Throws
     * IllegalStateException if not running.
     */
    @Override
    public void stop(Platform platform) throws Exception {
        if (!running.compareAndSet(true, false)) {
            throw new IllegalStateException("ConfigurableProducerWorker is not running.");
        }
        log.info("{}: Deactivating ConfigurableProducerWorker.", id);
        doneFuture.complete("");
        // shutdownNow() interrupts SendRecords, which then drains and closes
        // the producer in its finally blocks before the executor terminates.
        executor.shutdownNow();
        executor.awaitTermination(1, TimeUnit.DAYS);
        this.executor = null;
        this.status = null;
        this.doneFuture = null;
    }
}
diff --git
a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConstantFlushGenerator.java
b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConstantFlushGenerator.java
new file mode 100644
index 0000000..9d656b2
--- /dev/null
+++
b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConstantFlushGenerator.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.workload;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.common.errors.InterruptException;
+
+/**
+ * This generator will flush the producer after a specific number of messages.
This is useful to simulate a specific
+ * number of messages in a batch regardless of the message size, since batch
flushing is not exposed in the
+ * KafkaProducer client code.
+ *
+ * WARNING: This does not directly control when KafkaProducer will batch, this
only makes best effort. This also
+ * cannot tell when a KafkaProducer batch is closed. If the KafkaProducer
sends a batch before this executes, this
+ * will continue to execute on its own cadence. To alleviate this, make sure
to set `linger.ms` to allow for at least
+ * `messagesPerFlush` messages to be generated, and make sure to set
`batch.size` to allow for all these messages.
+ *
+ * Here is an example spec:
+ *
+ * {
+ * "type": "constant",
+ * "messagesPerFlush": 16
+ * }
+ *
+ * This example will flush the producer every 16 messages.
+ */
+
+public class ConstantFlushGenerator implements FlushGenerator {
+ private final int messagesPerFlush;
+ private int messageTracker = 0;
+
+ @JsonCreator
+ public ConstantFlushGenerator(@JsonProperty("messagesPerFlush") int
messagesPerFlush) {
+ this.messagesPerFlush = messagesPerFlush;
+ }
+
+ @JsonProperty
+ public int messagesPerFlush() {
+ return messagesPerFlush;
+ }
+
+ @Override
+ public synchronized <K, V> void increment(KafkaProducer<K, V> producer) {
+ // Increment the message tracker.
+ messageTracker += 1;
+
+ // Flush when we reach the desired number of messages.
+ if (messageTracker >= messagesPerFlush) {
+ messageTracker = 0;
+ try {
+ producer.flush();
+ } catch (InterruptException e) {
+ // Ignore flush interruption exceptions.
+ }
+ }
+ }
+}
diff --git
a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConstantThroughputGenerator.java
b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConstantThroughputGenerator.java
new file mode 100644
index 0000000..19e5af0
--- /dev/null
+++
b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConstantThroughputGenerator.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.workload;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.common.utils.Time;
+
+/**
+ * This throughput generator configures constant throughput.
+ *
+ * The lower the window size, the smoother the traffic will be. Using a 100ms
window offers no noticeable spikes in
+ * traffic while still being long enough to avoid too much overhead.
+ *
+ * WARNING: Due to binary nature of throughput in terms of messages sent in a
window, each window will send at least 1
+ * message, and each window sends the same number of messages, rounded down.
For example, 99 messages per second with a
+ * 100ms window will only send 90 messages per second, or 9 messages per
window. Another example, in order to send only
+ * 5 messages per second, a window size of 200ms is required. In cases like
these, both the `messagesPerSecond` and
+ * `windowSizeMs` parameters should be adjusted together to achieve more
accurate throughput.
+ *
+ * Here is an example spec:
+ *
+ * {
+ * "type": "constant",
+ * "messagesPerSecond": 500,
+ * "windowSizeMs": 100
+ * }
+ *
+ * This will produce a workload that runs 500 messages per second, with a
maximum resolution of 50 messages per 100
+ * millisecond.
+ */
+
+public class ConstantThroughputGenerator implements ThroughputGenerator {
+ private final int messagesPerSecond;
+ private final int messagesPerWindow;
+ private final long windowSizeMs;
+
+ private long nextWindowStarts = 0;
+ private int messageTracker = 0;
+
+ @JsonCreator
+ public ConstantThroughputGenerator(@JsonProperty("messagesPerSecond") int
messagesPerSecond,
+ @JsonProperty("windowSizeMs") long
windowSizeMs) {
+ // Calcualte the default values.
+ if (windowSizeMs <= 0) {
+ windowSizeMs = 100;
+ }
+ this.windowSizeMs = windowSizeMs;
+ this.messagesPerSecond = messagesPerSecond;
+
+ // Use the rest of the parameters to calculate window properties.
+ this.messagesPerWindow = (int) ((long) messagesPerSecond /
windowSizeMs);
+ calculateNextWindow();
+ }
+
+ @JsonProperty
+ public int messagesPerSecond() {
+ return messagesPerSecond;
+ }
+
+ private void calculateNextWindow() {
+ // Reset the message count.
+ messageTracker = 0;
+
+ // Calculate the next window start time.
+ long now = Time.SYSTEM.milliseconds();
+ if (nextWindowStarts > 0) {
+ while (nextWindowStarts < now) {
+ nextWindowStarts += windowSizeMs;
+ }
+ } else {
+ nextWindowStarts = now + windowSizeMs;
+ }
+ }
+
+ @Override
+ public synchronized void throttle() throws InterruptedException {
+ // Run unthrottled if messagesPerSecond is negative.
+ if (messagesPerSecond < 0) {
+ return;
+ }
+
+ // Calculate the next window if we've moved beyond the current one.
+ if (Time.SYSTEM.milliseconds() >= nextWindowStarts) {
+ calculateNextWindow();
+ }
+
+ // Increment the message tracker.
+ messageTracker += 1;
+
+ // Compare the tracked message count with the throttle limits.
+ if (messageTracker >= messagesPerWindow) {
+
+ // Wait the difference in time between now and when the next
window starts.
+ wait(nextWindowStarts - Time.SYSTEM.milliseconds());
+ }
+ }
+}
diff --git
a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpec.java
b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpec.java
index b7e0172..6909d2d 100644
---
a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpec.java
+++
b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpec.java
@@ -35,6 +35,7 @@ import java.util.Map;
import java.util.HashMap;
import java.util.Set;
import java.util.HashSet;
+import java.util.Optional;
/**
 * The specification for a benchmark that consumes messages from a set of
topic/partitions.
@@ -71,6 +72,9 @@ import java.util.HashSet;
* explicitly specifying partitions in "activeTopics" when there are multiple
"threadsPerWorker"
* and a particular "consumerGroup" will result in an #{@link
ConfigException}, aborting the task.
*
+ * The "recordProcessor" field allows the specification of tasks to run on
records that are consumed. This is run
+ * immediately after the messages are polled. See the `RecordProcessor`
interface for more information.
+ *
* An example JSON representation which will result in a consumer that is part
of the consumer group "cg" and
* subscribed to topics foo1, foo2, foo3 and bar.
* #{@code
@@ -98,6 +102,7 @@ public class ConsumeBenchSpec extends TaskSpec {
private final List<String> activeTopics;
private final String consumerGroup;
private final int threadsPerWorker;
+ private final Optional<RecordProcessor> recordProcessor;
@JsonCreator
public ConsumeBenchSpec(@JsonProperty("startMs") long startMs,
@@ -111,6 +116,7 @@ public class ConsumeBenchSpec extends TaskSpec {
@JsonProperty("commonClientConf") Map<String,
String> commonClientConf,
@JsonProperty("adminClientConf") Map<String,
String> adminClientConf,
@JsonProperty("threadsPerWorker") Integer
threadsPerWorker,
+ @JsonProperty("recordProcessor")
Optional<RecordProcessor> recordProcessor,
@JsonProperty("activeTopics") List<String>
activeTopics) {
super(startMs, durationMs);
this.consumerNode = (consumerNode == null) ? "" : consumerNode;
@@ -123,6 +129,7 @@ public class ConsumeBenchSpec extends TaskSpec {
this.activeTopics = activeTopics == null ? new ArrayList<>() :
activeTopics;
this.consumerGroup = consumerGroup == null ? "" : consumerGroup;
this.threadsPerWorker = threadsPerWorker == null ? 1 :
threadsPerWorker;
+ this.recordProcessor = recordProcessor;
}
@JsonProperty
@@ -156,6 +163,11 @@ public class ConsumeBenchSpec extends TaskSpec {
}
@JsonProperty
+ public Optional<RecordProcessor> recordProcessor() {
+ return this.recordProcessor;
+ }
+
+ @JsonProperty
public Map<String, String> consumerConf() {
return consumerConf;
}
diff --git
a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java
b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java
index 4e77dff..f6067b5 100644
---
a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java
+++
b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java
@@ -47,6 +47,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.Optional;
import java.util.Properties;
import java.util.HashMap;
import java.util.concurrent.Callable;
@@ -123,18 +124,18 @@ public class ConsumeBenchWorker implements TaskWorker {
consumer = consumer(consumerGroup, clientId(0));
if (toUseGroupPartitionAssignment) {
Set<String> topics = partitionsByTopic.keySet();
- tasks.add(new ConsumeMessages(consumer, topics));
+ tasks.add(new ConsumeMessages(consumer,
spec.recordProcessor(), topics));
for (int i = 0; i < consumerCount - 1; i++) {
- tasks.add(new ConsumeMessages(consumer(consumerGroup(),
clientId(i + 1)), topics));
+ tasks.add(new ConsumeMessages(consumer(consumerGroup(),
clientId(i + 1)), spec.recordProcessor(), topics));
}
} else {
List<TopicPartition> partitions =
populatePartitionsByTopic(consumer.consumer(), partitionsByTopic)
.values().stream().flatMap(List::stream).collect(Collectors.toList());
- tasks.add(new ConsumeMessages(consumer, partitions));
+ tasks.add(new ConsumeMessages(consumer,
spec.recordProcessor(), partitions));
for (int i = 0; i < consumerCount - 1; i++) {
- tasks.add(new ConsumeMessages(consumer(consumerGroup(),
clientId(i + 1)), partitions));
+ tasks.add(new ConsumeMessages(consumer(consumerGroup(),
clientId(i + 1)), spec.recordProcessor(), partitions));
}
}
@@ -198,13 +199,15 @@ public class ConsumeBenchWorker implements TaskWorker {
private final Throttle throttle;
private final String clientId;
private final ThreadSafeConsumer consumer;
+ private final Optional<RecordProcessor> recordProcessor;
- private ConsumeMessages(ThreadSafeConsumer consumer) {
+ private ConsumeMessages(ThreadSafeConsumer consumer,
+ Optional<RecordProcessor> recordProcessor) {
this.latencyHistogram = new Histogram(10000);
this.messageSizeHistogram = new Histogram(2 * 1024 * 1024);
this.clientId = consumer.clientId();
this.statusUpdaterFuture = executor.scheduleAtFixedRate(
- new ConsumeStatusUpdater(latencyHistogram,
messageSizeHistogram, consumer), 1, 1, TimeUnit.MINUTES);
+ new ConsumeStatusUpdater(latencyHistogram,
messageSizeHistogram, consumer, recordProcessor), 1, 1, TimeUnit.MINUTES);
int perPeriod;
if (spec.targetMessagesPerSec() <= 0)
perPeriod = Integer.MAX_VALUE;
@@ -213,16 +216,20 @@ public class ConsumeBenchWorker implements TaskWorker {
this.throttle = new Throttle(perPeriod, THROTTLE_PERIOD_MS);
this.consumer = consumer;
+ this.recordProcessor = recordProcessor;
}
- ConsumeMessages(ThreadSafeConsumer consumer, Set<String> topics) {
- this(consumer);
+ ConsumeMessages(ThreadSafeConsumer consumer,
+ Optional<RecordProcessor> recordProcessor,
+ Set<String> topics) {
+ this(consumer, recordProcessor);
log.info("Will consume from topics {} via dynamic group
assignment.", topics);
this.consumer.subscribe(topics);
}
-
- ConsumeMessages(ThreadSafeConsumer consumer, List<TopicPartition>
partitions) {
- this(consumer);
+ ConsumeMessages(ThreadSafeConsumer consumer,
+ Optional<RecordProcessor> recordProcessor,
+ List<TopicPartition> partitions) {
+ this(consumer, recordProcessor);
log.info("Will consume from topic partitions {} via manual
assignment.", partitions);
this.consumer.assign(partitions);
}
@@ -242,6 +249,10 @@ public class ConsumeBenchWorker implements TaskWorker {
}
long endBatchMs = Time.SYSTEM.milliseconds();
long elapsedBatchMs = endBatchMs - startBatchMs;
+
+ // Do the record batch processing immediately to avoid
latency skew.
+ recordProcessor.ifPresent(processor ->
processor.processRecords(records));
+
for (ConsumerRecord<byte[], byte[]> record : records) {
messagesConsumed++;
long messageBytes = 0;
@@ -266,7 +277,7 @@ public class ConsumeBenchWorker implements TaskWorker {
} finally {
statusUpdaterFuture.cancel(false);
StatusData statusData =
- new ConsumeStatusUpdater(latencyHistogram,
messageSizeHistogram, consumer).update();
+ new ConsumeStatusUpdater(latencyHistogram,
messageSizeHistogram, consumer, spec.recordProcessor()).update();
long curTimeMs = Time.SYSTEM.milliseconds();
log.info("{} Consumed total number of messages={}, bytes={} in
{} ms. status: {}",
clientId, messagesConsumed, bytesConsumed, curTimeMs
- startTimeMs, statusData);
@@ -331,11 +342,16 @@ public class ConsumeBenchWorker implements TaskWorker {
private final Histogram latencyHistogram;
private final Histogram messageSizeHistogram;
private final ThreadSafeConsumer consumer;
+ private final Optional<RecordProcessor> recordProcessor;
- ConsumeStatusUpdater(Histogram latencyHistogram, Histogram
messageSizeHistogram, ThreadSafeConsumer consumer) {
+ ConsumeStatusUpdater(Histogram latencyHistogram,
+ Histogram messageSizeHistogram,
+ ThreadSafeConsumer consumer,
+ Optional<RecordProcessor> recordProcessor) {
this.latencyHistogram = latencyHistogram;
this.messageSizeHistogram = messageSizeHistogram;
this.consumer = consumer;
+ this.recordProcessor = recordProcessor;
}
@Override
@@ -350,6 +366,13 @@ public class ConsumeBenchWorker implements TaskWorker {
StatusData update() {
Histogram.Summary latSummary =
latencyHistogram.summarize(StatusData.PERCENTILES);
Histogram.Summary msgSummary =
messageSizeHistogram.summarize(StatusData.PERCENTILES);
+
+ // Parse out the RecordProcessor's status, id specified.
+ Optional<JsonNode> recordProcessorStatus = Optional.empty();
+ if (recordProcessor.isPresent()) {
+ recordProcessorStatus =
Optional.of(recordProcessor.get().processorStatus());
+ }
+
StatusData statusData = new StatusData(
consumer.assignedPartitions(),
latSummary.numSamples(),
@@ -358,7 +381,8 @@ public class ConsumeBenchWorker implements TaskWorker {
latSummary.average(),
latSummary.percentiles().get(0).value(),
latSummary.percentiles().get(1).value(),
- latSummary.percentiles().get(2).value());
+ latSummary.percentiles().get(2).value(),
+ recordProcessorStatus);
statusUpdater.updateConsumeStatus(consumer.clientId(), statusData);
log.info("Status={}", JsonUtil.toJsonString(statusData));
return statusData;
@@ -374,6 +398,7 @@ public class ConsumeBenchWorker implements TaskWorker {
private final int p50LatencyMs;
private final int p95LatencyMs;
private final int p99LatencyMs;
+ private final Optional<JsonNode> recordProcessorStatus;
/**
* The percentiles to use when calculating the histogram data.
@@ -388,7 +413,8 @@ public class ConsumeBenchWorker implements TaskWorker {
@JsonProperty("averageLatencyMs") float averageLatencyMs,
@JsonProperty("p50LatencyMs") int p50latencyMs,
@JsonProperty("p95LatencyMs") int p95latencyMs,
- @JsonProperty("p99LatencyMs") int p99latencyMs) {
+ @JsonProperty("p99LatencyMs") int p99latencyMs,
+ @JsonProperty("recordProcessorStatus") Optional<JsonNode>
recordProcessorStatus) {
this.assignedPartitions = assignedPartitions;
this.totalMessagesReceived = totalMessagesReceived;
this.totalBytesReceived = totalBytesReceived;
@@ -397,6 +423,7 @@ public class ConsumeBenchWorker implements TaskWorker {
this.p50LatencyMs = p50latencyMs;
this.p95LatencyMs = p95latencyMs;
this.p99LatencyMs = p99latencyMs;
+ this.recordProcessorStatus = recordProcessorStatus;
}
@JsonProperty
@@ -438,6 +465,11 @@ public class ConsumeBenchWorker implements TaskWorker {
public int p99LatencyMs() {
return p99LatencyMs;
}
+
+ @JsonProperty
+ public JsonNode recordProcessorStatus() {
+ return recordProcessorStatus.orElse(null);
+ }
}
@Override
diff --git
a/tools/src/main/java/org/apache/kafka/trogdor/workload/PayloadGenerator.java
b/tools/src/main/java/org/apache/kafka/trogdor/workload/FlushGenerator.java
similarity index 54%
copy from
tools/src/main/java/org/apache/kafka/trogdor/workload/PayloadGenerator.java
copy to
tools/src/main/java/org/apache/kafka/trogdor/workload/FlushGenerator.java
index b06ba01..8c0ae62 100644
---
a/tools/src/main/java/org/apache/kafka/trogdor/workload/PayloadGenerator.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/FlushGenerator.java
@@ -19,31 +19,27 @@ package org.apache.kafka.trogdor.workload;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.kafka.clients.producer.KafkaProducer;
/**
- * Generates byte arrays based on a position argument.
+ * This interface is used to facilitate flushing the KafkaProducers on a
cadence specified by the user.
*
- * The array generated at a given position should be the same no matter how
many
- * times generate() is invoked. PayloadGenerator instances should be immutable
- * and thread-safe.
+ * Currently there are 3 flushing methods:
+ *
+ * * Disabled, by not specifying this parameter.
+ * * `constant` will use `ConstantFlushGenerator` to keep the number of
messages per batch constant.
+ * * `gaussian` will use `GaussianFlushGenerator` to vary the number of
messages per batch on a normal distribution.
+ *
+ * Please see the implementation classes for more details.
*/
+
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME,
include = JsonTypeInfo.As.PROPERTY,
property = "type")
@JsonSubTypes(value = {
- @JsonSubTypes.Type(value = ConstantPayloadGenerator.class, name =
"constant"),
- @JsonSubTypes.Type(value = SequentialPayloadGenerator.class, name =
"sequential"),
- @JsonSubTypes.Type(value = UniformRandomPayloadGenerator.class, name =
"uniformRandom"),
- @JsonSubTypes.Type(value = NullPayloadGenerator.class, name = "null"),
- @JsonSubTypes.Type(value = RandomComponentPayloadGenerator.class, name =
"randomComponent")
+ @JsonSubTypes.Type(value = ConstantFlushGenerator.class, name =
"constant"),
+ @JsonSubTypes.Type(value = GaussianFlushGenerator.class, name = "gaussian")
})
-public interface PayloadGenerator {
- /**
- * Generate a payload.
- *
- * @param position The position to use to generate the payload
- *
- * @return A new array object containing the payload.
- */
- byte[] generate(long position);
+public interface FlushGenerator {
+ <K, V> void increment(KafkaProducer<K, V> producer);
}
diff --git
a/tools/src/main/java/org/apache/kafka/trogdor/workload/GaussianFlushGenerator.java
b/tools/src/main/java/org/apache/kafka/trogdor/workload/GaussianFlushGenerator.java
new file mode 100644
index 0000000..a3157db
--- /dev/null
+++
b/tools/src/main/java/org/apache/kafka/trogdor/workload/GaussianFlushGenerator.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.workload;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.common.errors.InterruptException;
+import java.util.Random;
+
+/**
+ * This generator will flush the producer after a specific number of messages,
determined by a gaussian distribution.
+ * This is useful to simulate a specific number of messages in a batch
regardless of the message size, since batch
+ * flushing is not exposed in the KafkaProducer.
+ *
+ * WARNING: This does not directly control when KafkaProducer will batch, this
only makes best effort. This also
+ * cannot tell when a KafkaProducer batch is closed. If the KafkaProducer
sends a batch before this executes, this
+ * will continue to execute on its own cadence. To alleviate this, make sure
to set `linger.ms` to allow for messages
+ * to be generated up to your upper limit threshold, and make sure to set
`batch.size` to allow for all these messages.
+ *
+ * Here is an example spec:
+ *
+ * {
+ * "type": "gaussian",
+ * "messagesPerFlushAverage": 16,
+ * "messagesPerFlushDeviation": 4
+ * }
+ *
+ * This example will flush the producer on average every 16 messages, assuming
`linger.ms` and `batch.size` allow for
+ * it. That average changes based on a normal distribution after each flush:
+ *
+ * An average of the flushes will be at 16 messages.
+ * ~68% of the flushes are at between 12 and 20 messages.
+ * ~95% of the flushes are at between 8 and 24 messages.
+ * ~99% of the flushes are at between 4 and 28 messages.
+ */
+
+public class GaussianFlushGenerator implements FlushGenerator {
+ private final int messagesPerFlushAverage;
+ private final int messagesPerFlushDeviation;
+
+ private final Random random = new Random();
+
+ private int messageTracker = 0;
+ private int flushSize = 0;
+
+ @JsonCreator
+ public GaussianFlushGenerator(@JsonProperty("messagesPerFlushAverage") int
messagesPerFlushAverage,
+ @JsonProperty("messagesPerFlushDeviation")
int messagesPerFlushDeviation) {
+ this.messagesPerFlushAverage = messagesPerFlushAverage;
+ this.messagesPerFlushDeviation = messagesPerFlushDeviation;
+ calculateFlushSize();
+ }
+
+ @JsonProperty
+ public int messagesPerFlushAverage() {
+ return messagesPerFlushAverage;
+ }
+
+ @JsonProperty
+ public long messagesPerFlushDeviation() {
+ return messagesPerFlushDeviation;
+ }
+
+ private synchronized void calculateFlushSize() {
+ flushSize = Math.max((int) (random.nextGaussian() *
messagesPerFlushDeviation) + messagesPerFlushAverage, 1);
+ messageTracker = 0;
+ }
+
+ @Override
+ public synchronized <K, V> void increment(KafkaProducer<K, V> producer) {
+ // Increment the message tracker.
+ messageTracker += 1;
+
+ // Compare the tracked message count with the throttle limits.
+ if (messageTracker >= flushSize) {
+ try {
+ producer.flush();
+ } catch (InterruptException e) {
+ // Ignore flush interruption exceptions.
+ }
+ calculateFlushSize();
+ }
+ }
+}
diff --git
a/tools/src/main/java/org/apache/kafka/trogdor/workload/GaussianThroughputGenerator.java
b/tools/src/main/java/org/apache/kafka/trogdor/workload/GaussianThroughputGenerator.java
new file mode 100644
index 0000000..6d71ff5
--- /dev/null
+++
b/tools/src/main/java/org/apache/kafka/trogdor/workload/GaussianThroughputGenerator.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.workload;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.common.utils.Time;
+import java.util.Random;
+
+/*
+ * This throughput generator configures throughput with a gaussian normal
distribution on a per-window basis. You can
+ * specify how many windows to keep the throughput at the rate before
changing. All traffic will follow a gaussian
+ * distribution centered around `messagesPerSecondAverage` with a deviation of
`messagesPerSecondDeviation`.
+ *
+ * The lower the window size, the smoother the traffic will be. Using a 100ms
window offers no noticeable spikes in
+ * traffic while still being long enough to avoid too much overhead.
+ *
+ * WARNING: Due to binary nature of throughput in terms of messages sent in a
window, this does not work well for an
+ * average throughput of less than 5 messages per window. In cases where you
want lower throughput, please adjust the
+ * `windowSizeMs` accordingly.
+ *
+ * Here is an example spec:
+ *
+ * {
+ * "type": "gaussian",
+ * "messagesPerSecondAverage": 500,
+ * "messagesPerSecondDeviation": 50,
+ * "windowsUntilRateChange": 100,
+ * "windowSizeMs": 100
+ * }
+ *
+ * This will produce a workload that runs on average 500 messages per second,
however that speed will change every 10
+ * seconds due to the `windowSizeMs * windowsUntilRateChange` parameters. The
throughput will have the following
+ * normal distribution:
+ *
+ * An average of the throughput windows of 500 messages per second.
+ * ~68% of the throughput windows are between 450 and 550 messages per
second.
+ * ~95% of the throughput windows are between 400 and 600 messages per
second.
+ * ~99% of the throughput windows are between 350 and 650 messages per
second.
+ *
+ */
+
+public class GaussianThroughputGenerator implements ThroughputGenerator {
+ private final int messagesPerSecondAverage;
+ private final int messagesPerSecondDeviation;
+ private final int messagesPerWindowAverage;
+ private final int messagesPerWindowDeviation;
+ private final int windowsUntilRateChange;
+ private final long windowSizeMs;
+
+ private final Random random = new Random();
+
+ private long nextWindowStarts = 0;
+ private int messageTracker = 0;
+ private int windowTracker = 0;
+ private int throttleMessages = 0;
+
+ @JsonCreator
+ public
GaussianThroughputGenerator(@JsonProperty("messagesPerSecondAverage") int
messagesPerSecondAverage,
+
@JsonProperty("messagesPerSecondDeviation") int messagesPerSecondDeviation,
+ @JsonProperty("windowsUntilRateChange")
int windowsUntilRateChange,
+ @JsonProperty("windowSizeMs") long
windowSizeMs) {
+ // Calcualte the default values.
+ if (windowSizeMs <= 0) {
+ windowSizeMs = 100;
+ }
+ this.windowSizeMs = windowSizeMs;
+ this.messagesPerSecondAverage = messagesPerSecondAverage;
+ this.messagesPerSecondDeviation = messagesPerSecondDeviation;
+ this.windowsUntilRateChange = windowsUntilRateChange;
+
+ // Take per-second calculations and convert them to per-window
calculations.
+ messagesPerWindowAverage = (int) (messagesPerSecondAverage *
windowSizeMs / 1000);
+ messagesPerWindowDeviation = (int) (messagesPerSecondDeviation *
windowSizeMs / 1000);
+
+ // Calcualte the first window.
+ calculateNextWindow(true);
+ }
+
+ @JsonProperty
+ public int messagesPerSecondAverage() {
+ return messagesPerSecondAverage;
+ }
+
+ @JsonProperty
+ public long messagesPerSecondDeviation() {
+ return messagesPerSecondDeviation;
+ }
+
+ @JsonProperty
+ public long windowsUntilRateChange() {
+ return windowsUntilRateChange;
+ }
+
+ private synchronized void calculateNextWindow(boolean force) {
+ // Reset the message count.
+ messageTracker = 0;
+
+ // Calculate the next window start time.
+ long now = Time.SYSTEM.milliseconds();
+ if (nextWindowStarts > 0) {
+ while (nextWindowStarts < now) {
+ nextWindowStarts += windowSizeMs;
+ }
+ } else {
+ nextWindowStarts = now + windowSizeMs;
+ }
+
+ // Check the windows between rate changes.
+ if ((windowTracker > windowsUntilRateChange) || force) {
+ windowTracker = 0;
+
+ // Calculate the number of messages allowed in this window using a
normal distribution.
+ // The formula is: Messages = Gaussian * Deviation + Average
+ throttleMessages = Math.max((int) (random.nextGaussian() *
(double) messagesPerWindowDeviation) + messagesPerWindowAverage, 1);
+ }
+ windowTracker += 1;
+ }
+
+ @Override
+ public synchronized void throttle() throws InterruptedException {
+ // Calculate the next window if we've moved beyond the current one.
+ if (Time.SYSTEM.milliseconds() >= nextWindowStarts) {
+ calculateNextWindow(false);
+ }
+
+ // Increment the message tracker.
+ messageTracker += 1;
+
+ // Compare the tracked message count with the throttle limits.
+ if (messageTracker >= throttleMessages) {
+
+ // Wait the difference in time between now and when the next
window starts.
+ wait(nextWindowStarts - Time.SYSTEM.milliseconds());
+ }
+ }
+}
diff --git
a/tools/src/main/java/org/apache/kafka/trogdor/workload/GaussianTimestampRandomPayloadGenerator.java
b/tools/src/main/java/org/apache/kafka/trogdor/workload/GaussianTimestampRandomPayloadGenerator.java
new file mode 100644
index 0000000..90fe279
--- /dev/null
+++
b/tools/src/main/java/org/apache/kafka/trogdor/workload/GaussianTimestampRandomPayloadGenerator.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.workload;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.common.utils.Time;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Random;
+
+/**
+ * This class behaves identically to TimestampRandomPayloadGenerator, except
the message size follows a gaussian
+ * distribution.
+ *
+ * This should be used in conjunction with TimestampRecordProcessor in the
Consumer to measure true end-to-end latency
+ * of a system.
+ *
+ * `messageSizeAverage` - The average size in bytes of each message.
+ * `messageSizeDeviation` - The standard deviation to use when calculating
message size.
+ * `messagesUntilSizeChange` - The number of messages to keep at the same size.
+ * `seed` - Used to initialize Random() to remove some non-determinism.
+ *
+ * Here is an example spec:
+ *
+ * {
+ * "type": "gaussianTimestampRandom",
+ * "messageSizeAverage": 512,
+ * "messageSizeDeviation": 100,
+ * "messagesUntilSizeChange": 100
+ * }
+ *
+ * This will generate messages on a gaussian distribution with an average size
each 512-bytes. The message sizes will
+ * have a standard deviation of 100 bytes, and the size will only change every
100 messages. The distribution of
+ * messages will be as follows:
+ *
+ * The average size of the messages are 512 bytes.
+ * ~68% of the messages are between 412 and 612 bytes
+ * ~95% of the messages are between 312 and 712 bytes
+ * ~99% of the messages are between 212 and 812 bytes
+ */
+
+public class GaussianTimestampRandomPayloadGenerator implements
PayloadGenerator {
+ private final int messageSizeAverage;
+ private final int messageSizeDeviation;
+ private final int messagesUntilSizeChange;
+ private final long seed;
+
+ private final Random random = new Random();
+ private final ByteBuffer buffer;
+
+ private int messageTracker = 0;
+ private int messageSize = 0;
+
+ @JsonCreator
+ public
GaussianTimestampRandomPayloadGenerator(@JsonProperty("messageSizeAverage") int
messageSizeAverage,
+
@JsonProperty("messageSizeDeviation") int messageSizeDeviation,
+
@JsonProperty("messagesUntilSizeChange") int messagesUntilSizeChange,
+ @JsonProperty("seed") long
seed) {
+ this.messageSizeAverage = messageSizeAverage;
+ this.messageSizeDeviation = messageSizeDeviation;
+ this.seed = seed;
+ this.messagesUntilSizeChange = messagesUntilSizeChange;
+ buffer = ByteBuffer.allocate(Long.BYTES);
+ buffer.order(ByteOrder.LITTLE_ENDIAN);
+ }
+
+ @JsonProperty
+ public int messageSizeAverage() {
+ return messageSizeAverage;
+ }
+
+ @JsonProperty
+ public long messageSizeDeviation() {
+ return messageSizeDeviation;
+ }
+
+ @JsonProperty
+ public long seed() {
+ return seed;
+ }
+
+ @Override
+ public synchronized byte[] generate(long position) {
+ // Make the random number generator deterministic for unit tests.
+ random.setSeed(seed + position);
+
+ // Calculate the next message size based on a gaussian distribution.
+ if ((messageSize == 0) || (messageTracker >= messagesUntilSizeChange))
{
+ messageTracker = 0;
+ messageSize = Math.max((int) (random.nextGaussian() *
messageSizeDeviation) + messageSizeAverage, Long.BYTES);
+ }
+ messageTracker += 1;
+
+ // Generate out of order to prevent inclusion of random number
generation in latency numbers.
+ byte[] result = new byte[messageSize];
+ random.nextBytes(result);
+
+ // Do the timestamp generation as the very last task.
+ buffer.clear();
+ buffer.putLong(Time.SYSTEM.milliseconds());
+ buffer.rewind();
+ System.arraycopy(buffer.array(), 0, result, 0, Long.BYTES);
+ return result;
+ }
+}
diff --git
a/tools/src/main/java/org/apache/kafka/trogdor/workload/PayloadGenerator.java
b/tools/src/main/java/org/apache/kafka/trogdor/workload/PayloadGenerator.java
index b06ba01..225a663 100644
---
a/tools/src/main/java/org/apache/kafka/trogdor/workload/PayloadGenerator.java
+++
b/tools/src/main/java/org/apache/kafka/trogdor/workload/PayloadGenerator.java
@@ -35,7 +35,9 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
@JsonSubTypes.Type(value = SequentialPayloadGenerator.class, name =
"sequential"),
@JsonSubTypes.Type(value = UniformRandomPayloadGenerator.class, name =
"uniformRandom"),
@JsonSubTypes.Type(value = NullPayloadGenerator.class, name = "null"),
- @JsonSubTypes.Type(value = RandomComponentPayloadGenerator.class, name =
"randomComponent")
+ @JsonSubTypes.Type(value = RandomComponentPayloadGenerator.class, name =
"randomComponent"),
+ @JsonSubTypes.Type(value = TimestampRandomPayloadGenerator.class, name =
"timestampRandom"),
+ @JsonSubTypes.Type(value = GaussianTimestampRandomPayloadGenerator.class,
name = "gaussianTimestampRandom")
})
public interface PayloadGenerator {
/**
diff --git
a/tools/src/main/java/org/apache/kafka/trogdor/workload/PayloadGenerator.java
b/tools/src/main/java/org/apache/kafka/trogdor/workload/RecordProcessor.java
similarity index 50%
copy from
tools/src/main/java/org/apache/kafka/trogdor/workload/PayloadGenerator.java
copy to
tools/src/main/java/org/apache/kafka/trogdor/workload/RecordProcessor.java
index b06ba01..500736d 100644
---
a/tools/src/main/java/org/apache/kafka/trogdor/workload/PayloadGenerator.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/RecordProcessor.java
@@ -19,31 +19,20 @@ package org.apache.kafka.trogdor.workload;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
/**
- * Generates byte arrays based on a position argument.
- *
- * The array generated at a given position should be the same no matter how
many
- * times generate() is invoked. PayloadGenerator instances should be immutable
- * and thread-safe.
+ * RecordProcessor allows for acting on data polled from ConsumeBench
workloads.
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME,
- include = JsonTypeInfo.As.PROPERTY,
- property = "type")
+ include = JsonTypeInfo.As.PROPERTY,
+ property = "type")
@JsonSubTypes(value = {
- @JsonSubTypes.Type(value = ConstantPayloadGenerator.class, name =
"constant"),
- @JsonSubTypes.Type(value = SequentialPayloadGenerator.class, name =
"sequential"),
- @JsonSubTypes.Type(value = UniformRandomPayloadGenerator.class, name =
"uniformRandom"),
- @JsonSubTypes.Type(value = NullPayloadGenerator.class, name = "null"),
- @JsonSubTypes.Type(value = RandomComponentPayloadGenerator.class, name =
"randomComponent")
- })
-public interface PayloadGenerator {
- /**
- * Generate a payload.
- *
- * @param position The position to use to generate the payload
- *
- * @return A new array object containing the payload.
- */
- byte[] generate(long position);
+ @JsonSubTypes.Type(value = TimestampRecordProcessor.class, name =
"timestamp"),
+})
+public interface RecordProcessor {
+ void processRecords(ConsumerRecords<byte[], byte[]> consumerRecords);
+ JsonNode processorStatus();
}
+
diff --git
a/tools/src/main/java/org/apache/kafka/trogdor/workload/PayloadGenerator.java
b/tools/src/main/java/org/apache/kafka/trogdor/workload/ThroughputGenerator.java
similarity index 54%
copy from
tools/src/main/java/org/apache/kafka/trogdor/workload/PayloadGenerator.java
copy to
tools/src/main/java/org/apache/kafka/trogdor/workload/ThroughputGenerator.java
index b06ba01..b6be8a9 100644
---
a/tools/src/main/java/org/apache/kafka/trogdor/workload/PayloadGenerator.java
+++
b/tools/src/main/java/org/apache/kafka/trogdor/workload/ThroughputGenerator.java
@@ -21,29 +21,25 @@ import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
/**
- * Generates byte arrays based on a position argument.
+ * This interface is used to facilitate running a configurable number of
messages per second by throttling if the
+ * throughput goes above a certain amount.
*
- * The array generated at a given position should be the same no matter how
many
- * times generate() is invoked. PayloadGenerator instances should be immutable
- * and thread-safe.
+ * Currently there are 2 throughput methods:
+ *
+ * * `constant` will use `ConstantThroughputGenerator` to keep the number of
messages per second constant.
+ * * `gaussian` will use `GaussianThroughputGenerator` to vary the number of
messages per second on a normal
+ * distribution.
+ *
+ * Please see the implementation classes for more details.
*/
+
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME,
include = JsonTypeInfo.As.PROPERTY,
property = "type")
@JsonSubTypes(value = {
- @JsonSubTypes.Type(value = ConstantPayloadGenerator.class, name =
"constant"),
- @JsonSubTypes.Type(value = SequentialPayloadGenerator.class, name =
"sequential"),
- @JsonSubTypes.Type(value = UniformRandomPayloadGenerator.class, name =
"uniformRandom"),
- @JsonSubTypes.Type(value = NullPayloadGenerator.class, name = "null"),
- @JsonSubTypes.Type(value = RandomComponentPayloadGenerator.class, name =
"randomComponent")
+ @JsonSubTypes.Type(value = ConstantThroughputGenerator.class, name =
"constant"),
+ @JsonSubTypes.Type(value = GaussianThroughputGenerator.class, name =
"gaussian")
})
-public interface PayloadGenerator {
- /**
- * Generate a payload.
- *
- * @param position The position to use to generate the payload
- *
- * @return A new array object containing the payload.
- */
- byte[] generate(long position);
+public interface ThroughputGenerator {
+ void throttle() throws InterruptedException;
}
diff --git
a/tools/src/main/java/org/apache/kafka/trogdor/workload/TimestampRandomPayloadGenerator.java
b/tools/src/main/java/org/apache/kafka/trogdor/workload/TimestampRandomPayloadGenerator.java
new file mode 100644
index 0000000..d2cfdc7
--- /dev/null
+++
b/tools/src/main/java/org/apache/kafka/trogdor/workload/TimestampRandomPayloadGenerator.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.workload;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.common.utils.Time;
+
+import java.nio.ByteOrder;
+import java.util.Random;
+import java.nio.ByteBuffer;
+
+/**
+ * A PayloadGenerator which generates a timestamped uniform random payload.
+ *
+ * This generator generates pseudo-random payloads that can be reproduced from
run to run.
+ * The guarantees are the same as those of java.util.Random.
+ *
+ * The timestamp used for this class is in milliseconds since epoch, encoded
directly to the first several bytes of the
+ * payload.
+ *
+ * This should be used in conjunction with TimestampRecordProcessor in the
Consumer to measure true end-to-end latency
+ * of a system.
+ *
+ * `size` - The size in bytes of each message.
+ * `seed` - Used to initialize Random() to remove some non-determinism.
+ *
+ * Here is an example spec:
+ *
+ * {
+ * "type": "timestampRandom",
+ * "size": 512
+ * }
+ *
+ * This will generate a 512-byte random message with the first several bytes
encoded with the timestamp.
+ */
+public class TimestampRandomPayloadGenerator implements PayloadGenerator {
+ private final int size;
+ private final long seed;
+
+ private final byte[] randomBytes;
+ private final ByteBuffer buffer;
+
+ private final Random random = new Random();
+
+ @JsonCreator
+ public TimestampRandomPayloadGenerator(@JsonProperty("size") int size,
+ @JsonProperty("seed") long seed) {
+ this.size = size;
+ this.seed = seed;
+ if (size < Long.BYTES) {
+ throw new RuntimeException("The size of the payload must be
greater than or equal to " + Long.BYTES + ".");
+ }
+ random.setSeed(seed);
+ this.randomBytes = new byte[size - Long.BYTES];
+ buffer = ByteBuffer.allocate(Long.BYTES);
+ buffer.order(ByteOrder.LITTLE_ENDIAN);
+ }
+
+ @JsonProperty
+ public int size() {
+ return size;
+ }
+
+ @JsonProperty
+ public long seed() {
+ return seed;
+ }
+
+ @Override
+ public synchronized byte[] generate(long position) {
+ // Generate out of order to prevent inclusion of random number
generation in latency numbers.
+ byte[] result = new byte[size];
+ if (randomBytes.length > 0) {
+ random.setSeed(seed + position);
+ random.nextBytes(randomBytes);
+ System.arraycopy(randomBytes, 0, result, Long.BYTES,
randomBytes.length);
+ }
+
+ // Do the timestamp generation as the very last task.
+ buffer.clear();
+ buffer.putLong(Time.SYSTEM.milliseconds());
+ buffer.rewind();
+ System.arraycopy(buffer.array(), 0, result, 0, Long.BYTES);
+ return result;
+ }
+}
diff --git
a/tools/src/main/java/org/apache/kafka/trogdor/workload/TimestampRecordProcessor.java
b/tools/src/main/java/org/apache/kafka/trogdor/workload/TimestampRecordProcessor.java
new file mode 100644
index 0000000..658016a
--- /dev/null
+++
b/tools/src/main/java/org/apache/kafka/trogdor/workload/TimestampRecordProcessor.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.workload;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.trogdor.common.JsonUtil;
+import org.apache.kafka.common.utils.Time;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+/**
+ * This class will process records containing timestamps and generate a
histogram based on the data. It will then be
+ * present in the status from the `ConsumeBenchWorker` class. This must be
used with a timestamped PayloadGenerator
+ * implementation.
+ *
+ * Example spec:
+ * {
+ * "type": "timestampRandom",
+ * "histogramMaxMs": 10000,
+ * "histogramMinMs": 0,
+ * "histogramStepMs": 1
+ * }
+ *
+ * This will track total E2E latency up to 10 seconds, using 1ms resolution
and a timestamp size of 8 bytes.
+ */
+
+public class TimestampRecordProcessor implements RecordProcessor {
+ private final int histogramMaxMs;
+ private final int histogramMinMs;
+ private final int histogramStepMs;
+ private final ByteBuffer buffer;
+ private final Histogram histogram;
+
+ private final Logger log =
LoggerFactory.getLogger(TimestampRecordProcessor.class);
+
+ final static float[] PERCENTILES = {0.5f, 0.95f, 0.99f};
+
+ @JsonCreator
+ public TimestampRecordProcessor(@JsonProperty("histogramMaxMs") int
histogramMaxMs,
+ @JsonProperty("histogramMinMs") int
histogramMinMs,
+ @JsonProperty("histogramStepMs") int
histogramStepMs) {
+ this.histogramMaxMs = histogramMaxMs;
+ this.histogramMinMs = histogramMinMs;
+ this.histogramStepMs = histogramStepMs;
+ this.histogram = new Histogram((histogramMaxMs - histogramMinMs) /
histogramStepMs);
+ buffer = ByteBuffer.allocate(Long.BYTES);
+ buffer.order(ByteOrder.LITTLE_ENDIAN);
+ }
+
+ @JsonProperty
+ public int histogramMaxMs() {
+ return histogramMaxMs;
+ }
+
+ @JsonProperty
+ public int histogramMinMs() {
+ return histogramMinMs;
+ }
+
+ @JsonProperty
+ public int histogramStepMs() {
+ return histogramStepMs;
+ }
+
+ private void putHistogram(long latency) {
+ histogram.add(Long.max(0L, (latency - histogramMinMs) /
histogramStepMs));
+ }
+
+ @Override
+ public synchronized void processRecords(ConsumerRecords<byte[], byte[]>
consumerRecords) {
+ // Save the current time to prevent skew by processing time.
+ long curTime = Time.SYSTEM.milliseconds();
+ for (ConsumerRecord<byte[], byte[]> record : consumerRecords) {
+ try {
+ buffer.clear();
+ buffer.put(record.value(), 0, Long.BYTES);
+ buffer.rewind();
+ putHistogram(curTime - buffer.getLong());
+ } catch (RuntimeException e) {
+ log.error("Error in processRecords:", e);
+ }
+ }
+ }
+
+ @Override
+ public JsonNode processorStatus() {
+ Histogram.Summary summary = histogram.summarize(PERCENTILES);
+ StatusData statusData = new StatusData(
+ summary.average() * histogramStepMs + histogramMinMs,
+ summary.percentiles().get(0).value() * histogramStepMs +
histogramMinMs,
+ summary.percentiles().get(1).value() * histogramStepMs +
histogramMinMs,
+ summary.percentiles().get(2).value() * histogramStepMs +
histogramMinMs);
+ return JsonUtil.JSON_SERDE.valueToTree(statusData);
+ }
+
+ private static class StatusData {
+ private final float averageLatencyMs;
+ private final int p50LatencyMs;
+ private final int p95LatencyMs;
+ private final int p99LatencyMs;
+
+ /**
+ * The percentiles to use when calculating the histogram data.
+ * These should match up with the p50LatencyMs, p95LatencyMs, etc.
fields.
+ */
+ final static float[] PERCENTILES = {0.5f, 0.95f, 0.99f};
+
+ @JsonCreator
+ StatusData(@JsonProperty("averageLatencyMs") float averageLatencyMs,
+ @JsonProperty("p50LatencyMs") int p50latencyMs,
+ @JsonProperty("p95LatencyMs") int p95latencyMs,
+ @JsonProperty("p99LatencyMs") int p99latencyMs) {
+ this.averageLatencyMs = averageLatencyMs;
+ this.p50LatencyMs = p50latencyMs;
+ this.p95LatencyMs = p95latencyMs;
+ this.p99LatencyMs = p99latencyMs;
+ }
+
+ @JsonProperty
+ public float averageLatencyMs() {
+ return averageLatencyMs;
+ }
+
+ @JsonProperty
+ public int p50LatencyMs() {
+ return p50LatencyMs;
+ }
+
+ @JsonProperty
+ public int p95LatencyMs() {
+ return p95LatencyMs;
+ }
+
+ @JsonProperty
+ public int p99LatencyMs() {
+ return p99LatencyMs;
+ }
+ }
+}
diff --git
a/tools/src/test/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpecTest.java
b/tools/src/test/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpecTest.java
index 59f76d2..a0e3eb0 100644
---
a/tools/src/test/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpecTest.java
+++
b/tools/src/test/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpecTest.java
@@ -29,6 +29,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
+import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -69,6 +70,7 @@ public class ConsumeBenchSpecTest {
private ConsumeBenchSpec consumeBenchSpec(List<String> activeTopics) {
return new ConsumeBenchSpec(0, 0, "node", "localhost",
123, 1234, "cg-1",
- Collections.emptyMap(), Collections.emptyMap(),
Collections.emptyMap(), 1, activeTopics);
+ Collections.emptyMap(), Collections.emptyMap(),
Collections.emptyMap(), 1,
+ Optional.empty(), activeTopics);
}
}