[ https://issues.apache.org/jira/browse/KAFKA-7597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16701009#comment-16701009 ]
ASF GitHub Bot commented on KAFKA-7597: --------------------------------------- cmccabe closed pull request #5885: KAFKA-7597: Make Trogdor ProduceBenchWorker support transactions URL: https://github.com/apache/kafka/pull/5885 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/build.gradle b/build.gradle index 4d514dfcf97..5ce648adf77 100644 --- a/build.gradle +++ b/build.gradle @@ -565,6 +565,7 @@ project(':core') { dependencies { compile project(':clients') compile libs.jacksonDatabind + compile libs.jacksonJDK8Datatypes compile libs.joptSimple compile libs.metrics compile libs.scalaLibrary @@ -830,6 +831,7 @@ project(':clients') { compile libs.snappy compile libs.slf4jApi compileOnly libs.jacksonDatabind // for SASL/OAUTHBEARER bearer token parsing + compileOnly libs.jacksonJDK8Datatypes jacksonDatabindConfig libs.jacksonDatabind // to publish as provided scope dependency. @@ -839,6 +841,7 @@ project(':clients') { testRuntime libs.slf4jlog4j testRuntime libs.jacksonDatabind + testRuntime libs.jacksonJDK8Datatypes } task determineCommitId { @@ -918,6 +921,7 @@ project(':tools') { compile project(':log4j-appender') compile libs.argparse4j compile libs.jacksonDatabind + compile libs.jacksonJDK8Datatypes compile libs.slf4jApi compile libs.jacksonJaxrsJsonProvider @@ -1347,6 +1351,7 @@ project(':connect:json') { dependencies { compile project(':connect:api') compile libs.jacksonDatabind + compile libs.jacksonJDK8Datatypes compile libs.slf4jApi testCompile libs.easymock diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index 7dd3604db08..59f56fcd4ab 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -103,6 +103,7 @@ libs += [ bcpkix: "org.bouncycastle:bcpkix-jdk15on:$versions.bcpkix", easymock: "org.easymock:easymock:$versions.easymock", jacksonDatabind: "com.fasterxml.jackson.core:jackson-databind:$versions.jackson", + jacksonJDK8Datatypes: "com.fasterxml.jackson.datatype:jackson-datatype-jdk8:$versions.jackson", jacksonJaxrsJsonProvider: "com.fasterxml.jackson.jaxrs:jackson-jaxrs-json-provider:$versions.jackson", jaxbApi: "javax.xml.bind:jaxb-api:$versions.jaxb", jaxrsApi: "javax.ws.rs:javax.ws.rs-api:$versions.jaxrs", diff --git a/tests/bin/trogdor-run-transactional-produce-bench.sh b/tests/bin/trogdor-run-transactional-produce-bench.sh new file mode 100755 index 00000000000..fd5ff0a01f2 --- /dev/null +++ b/tests/bin/trogdor-run-transactional-produce-bench.sh @@ -0,0 +1,51 @@ +#!/usr/bin/env bash +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +COORDINATOR_ENDPOINT="localhost:8889" +TASK_ID="produce_bench_$RANDOM" +TASK_SPEC=$( +cat <<EOF +{ + "id": "$TASK_ID", + "spec": { + "class": "org.apache.kafka.trogdor.workload.ProduceBenchSpec", + "durationMs": 10000000, + "producerNode": "node0", + "bootstrapServers": "localhost:9092", + "targetMessagesPerSec": 100, + "maxMessages": 500, + "transactionGenerator" : { + "type" : "uniform", + "messagesPerTransaction" : 50 + }, + "activeTopics": { + "foo[1-3]": { + "numPartitions": 3, + "replicationFactor": 1 + } + }, + "inactiveTopics": { + "foo[4-5]": { + "numPartitions": 3, + "replicationFactor": 1 + } + } + } +} +EOF +) +./bin/trogdor.sh client --create-task "${TASK_SPEC}" "${COORDINATOR_ENDPOINT}" +echo "\$TASK_ID = $TASK_ID" diff --git a/tests/kafkatest/services/trogdor/produce_bench_workload.py b/tests/kafkatest/services/trogdor/produce_bench_workload.py index cf6a962b055..9afc81462ae 100644 --- a/tests/kafkatest/services/trogdor/produce_bench_workload.py +++ b/tests/kafkatest/services/trogdor/produce_bench_workload.py @@ -21,7 +21,8 @@ class ProduceBenchWorkloadSpec(TaskSpec): def __init__(self, start_ms, duration_ms, producer_node, bootstrap_servers, target_messages_per_sec, max_messages, producer_conf, admin_client_conf, - common_client_conf, inactive_topics, active_topics): + common_client_conf, inactive_topics, active_topics, + transaction_generator=None): super(ProduceBenchWorkloadSpec, self).__init__(start_ms, duration_ms) self.message["class"] = "org.apache.kafka.trogdor.workload.ProduceBenchSpec" self.message["producerNode"] = producer_node @@ -29,6 +30,7 @@ def __init__(self, start_ms, duration_ms, producer_node, bootstrap_servers, self.message["targetMessagesPerSec"] = target_messages_per_sec self.message["maxMessages"] = max_messages self.message["producerConf"] = producer_conf + self.message["transactionGenerator"] = transaction_generator self.message["adminClientConf"] = admin_client_conf self.message["commonClientConf"] = common_client_conf self.message["inactiveTopics"] = inactive_topics diff --git a/tests/kafkatest/tests/core/produce_bench_test.py b/tests/kafkatest/tests/core/produce_bench_test.py index 125ee941eb5..a316520335b 100644 --- a/tests/kafkatest/tests/core/produce_bench_test.py +++ b/tests/kafkatest/tests/core/produce_bench_test.py @@ -31,6 +31,8 @@ def __init__(self, test_context): self.workload_service = ProduceBenchWorkloadService(test_context, self.kafka) self.trogdor = TrogdorService(context=self.test_context, client_services=[self.kafka, self.workload_service]) + self.active_topics = {"produce_bench_topic[0-1]": {"numPartitions": 1, "replicationFactor": 3}} + self.inactive_topics = {"produce_bench_topic[2-9]": {"numPartitions": 1, "replicationFactor": 3}} def setUp(self): self.trogdor.start() @@ -43,8 +45,6 @@ def teardown(self): self.zk.stop() def test_produce_bench(self): - active_topics={"produce_bench_topic[0-1]":{"numPartitions":1, "replicationFactor":3}} - inactive_topics={"produce_bench_topic[2-9]":{"numPartitions":1, "replicationFactor":3}} spec = ProduceBenchWorkloadSpec(0, TaskSpec.MAX_DURATION_MS, self.workload_service.producer_node, self.workload_service.bootstrap_servers, @@ -53,8 +53,29 @@ def test_produce_bench(self): producer_conf={}, admin_client_conf={}, common_client_conf={}, - inactive_topics=inactive_topics, - active_topics=active_topics) + inactive_topics=self.inactive_topics, + active_topics=self.active_topics) + workload1 = self.trogdor.create_task("workload1", spec) + workload1.wait_for_done(timeout_sec=360) + tasks = self.trogdor.tasks() + self.logger.info("TASKS: %s\n" % json.dumps(tasks, sort_keys=True, indent=2)) + + def test_produce_bench_transactions(self): + spec = ProduceBenchWorkloadSpec(0, TaskSpec.MAX_DURATION_MS, + self.workload_service.producer_node, + self.workload_service.bootstrap_servers, + target_messages_per_sec=1000, + max_messages=100000, + producer_conf={}, + admin_client_conf={}, + common_client_conf={}, + inactive_topics=self.inactive_topics, + active_topics=self.active_topics, + transaction_generator={ + # 10 transactions with 10k messages + "type": "uniform", + "messagesPerTransaction": "10000" + }) workload1 = self.trogdor.create_task("workload1", spec) workload1.wait_for_done(timeout_sec=360) tasks = self.trogdor.tasks() diff --git a/tools/src/main/java/org/apache/kafka/trogdor/common/JsonUtil.java b/tools/src/main/java/org/apache/kafka/trogdor/common/JsonUtil.java index 70193c3beef..ad90ffc6e84 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/common/JsonUtil.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/common/JsonUtil.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.datatype.jdk8.Jdk8Module; /** * Utilities for working with JSON. @@ -33,6 +34,7 @@ JSON_SERDE = new ObjectMapper(); JSON_SERDE.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false); JSON_SERDE.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, true); + JSON_SERDE.registerModule(new Jdk8Module()); JSON_SERDE.setSerializationInclusion(JsonInclude.Include.NON_EMPTY); } diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java index 30878bf303f..c0bbd7eb012 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java @@ -26,10 +26,39 @@ import java.util.Collections; import java.util.Map; +import java.util.Optional; import java.util.Set; /** * The specification for a benchmark that produces messages to a set of topics. + * + * To configure a transactional producer, a #{@link TransactionGenerator} must be passed in. + * Said generator works in lockstep with the producer by instructing it what action to take next in regards to a transaction. + * + * An example JSON representation which will result in a producer that creates three topics (foo1, foo2, foo3) + * with three partitions each and produces to them: + * #{@code + * { + * "class": "org.apache.kafka.trogdor.workload.ProduceBenchSpec", + * "durationMs": 10000000, + * "producerNode": "node0", + * "bootstrapServers": "localhost:9092", + * "targetMessagesPerSec": 10, + * "maxMessages": 100, + * "activeTopics": { + * "foo[1-3]": { + * "numPartitions": 3, + * "replicationFactor": 1 + * } + * }, + * "inactiveTopics": { + * "foo[4-5]": { + * "numPartitions": 3, + * "replicationFactor": 1 + * } + * } + * } + * } */ public class ProduceBenchSpec extends TaskSpec { private final String producerNode; @@ -38,6 +67,7 @@ private final int maxMessages; private final PayloadGenerator keyGenerator; private final PayloadGenerator valueGenerator; + private final Optional<TransactionGenerator> transactionGenerator; private final Map<String, String> producerConf; private final Map<String, String> adminClientConf; private final Map<String, String> commonClientConf; @@ -53,6 +83,7 @@ public ProduceBenchSpec(@JsonProperty("startMs") long startMs, @JsonProperty("maxMessages") int maxMessages, @JsonProperty("keyGenerator") PayloadGenerator keyGenerator, @JsonProperty("valueGenerator") PayloadGenerator valueGenerator, + @JsonProperty("transactionGenerator") Optional<TransactionGenerator> txGenerator, @JsonProperty("producerConf") Map<String, String> producerConf, @JsonProperty("commonClientConf") Map<String, String> commonClientConf, @JsonProperty("adminClientConf") Map<String, String> adminClientConf, @@ -67,6 +98,7 @@ public ProduceBenchSpec(@JsonProperty("startMs") long startMs, new SequentialPayloadGenerator(4, 0) : keyGenerator; this.valueGenerator = valueGenerator == null ? new ConstantPayloadGenerator(512, new byte[0]) : valueGenerator; + this.transactionGenerator = txGenerator == null ? Optional.empty() : txGenerator; this.producerConf = configOrEmptyMap(producerConf); this.commonClientConf = configOrEmptyMap(commonClientConf); this.adminClientConf = configOrEmptyMap(adminClientConf); @@ -106,6 +138,11 @@ public PayloadGenerator valueGenerator() { return valueGenerator; } + @JsonProperty + public Optional<TransactionGenerator> transactionGenerator() { + return transactionGenerator; + } + @JsonProperty public Map<String, String> producerConf() { return producerConf; diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java index dc749eb65a4..abf59768921 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java @@ -36,6 +36,7 @@ import org.apache.kafka.trogdor.common.WorkerUtils; import org.apache.kafka.trogdor.task.TaskWorker; import org.apache.kafka.trogdor.task.WorkerStatusTracker; +import org.apache.kafka.trogdor.workload.TransactionGenerator.TransactionAction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,13 +44,16 @@ import java.util.HashSet; import java.util.Iterator; import java.util.Map; +import java.util.Optional; import java.util.Properties; +import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; public class ProduceBenchWorker implements TaskWorker { private static final Logger log = LoggerFactory.getLogger(ProduceBenchWorker.class); @@ -179,18 +183,33 @@ protected synchronized void delay(long amount) throws InterruptedException { private final PayloadIterator values; + private final Optional<TransactionGenerator> transactionGenerator; + private final Throttle throttle; + private Iterator<TopicPartition> partitionsIterator; + private Future<RecordMetadata> sendFuture; + private AtomicLong transactionsCommitted; + private boolean enableTransactions; + SendRecords(HashSet<TopicPartition> activePartitions) { this.activePartitions = activePartitions; + this.partitionsIterator = activePartitions.iterator(); this.histogram = new Histogram(5000); + + this.transactionGenerator = spec.transactionGenerator(); + this.enableTransactions = this.transactionGenerator.isPresent(); + this.transactionsCommitted = new AtomicLong(); + int perPeriod = WorkerUtils.perSecToPerPeriod(spec.targetMessagesPerSec(), THROTTLE_PERIOD_MS); this.statusUpdaterFuture = executor.scheduleWithFixedDelay( - new StatusUpdater(histogram), 30, 30, TimeUnit.SECONDS); + new StatusUpdater(histogram, transactionsCommitted), 30, 30, TimeUnit.SECONDS); + Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, spec.bootstrapServers()); - // add common client configs to producer properties, and then user-specified producer - // configs + if (enableTransactions) + props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "produce-bench-transaction-id-" + UUID.randomUUID()); + // add common client configs to producer properties, and then user-specified producer configs WorkerUtils.addConfigsToProperties(props, spec.commonClientConf(), spec.producerConf()); this.producer = new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer()); this.keys = new PayloadIterator(spec.keyGenerator()); @@ -202,23 +221,29 @@ protected synchronized void delay(long amount) throws InterruptedException { public Void call() throws Exception { long startTimeMs = Time.SYSTEM.milliseconds(); try { - Future<RecordMetadata> future = null; try { - Iterator<TopicPartition> iter = activePartitions.iterator(); - for (int m = 0; m < spec.maxMessages(); m++) { - if (!iter.hasNext()) { - iter = activePartitions.iterator(); + if (enableTransactions) + producer.initTransactions(); + + int sentMessages = 0; + while (sentMessages < spec.maxMessages()) { + if (enableTransactions) { + boolean tookAction = takeTransactionAction(); + if (tookAction) + continue; } - TopicPartition partition = iter.next(); - ProducerRecord<byte[], byte[]> record = new ProducerRecord<>( - partition.topic(), partition.partition(), keys.next(), values.next()); - future = producer.send(record, - new SendRecordsCallback(this, Time.SYSTEM.milliseconds())); - throttle.increment(); + sendMessage(); + sentMessages++; } + if (enableTransactions) + takeTransactionAction(); // give the transactionGenerator a chance to commit if configured evenly + } catch (Exception e) { + if (enableTransactions) + producer.abortTransaction(); + throw e; } finally { - if (future != null) { - future.get(); + if (sendFuture != null) { + sendFuture.get(); } producer.close(); } @@ -226,7 +251,7 @@ public Void call() throws Exception { WorkerUtils.abort(log, "SendRecords", e, doneFuture); } finally { statusUpdaterFuture.cancel(false); - StatusData statusData = new StatusUpdater(histogram).update(); + StatusData statusData = new StatusUpdater(histogram, transactionsCommitted).update(); long curTimeMs = Time.SYSTEM.milliseconds(); log.info("Sent {} total record(s) in {} ms. status: {}", histogram.summarize().numSamples(), curTimeMs - startTimeMs, statusData); @@ -235,6 +260,42 @@ public Void call() throws Exception { return null; } + private boolean takeTransactionAction() { + boolean tookAction = true; + TransactionAction nextAction = transactionGenerator.get().nextAction(); + switch (nextAction) { + case BEGIN_TRANSACTION: + log.debug("Beginning transaction."); + producer.beginTransaction(); + break; + case COMMIT_TRANSACTION: + log.debug("Committing transaction."); + producer.commitTransaction(); + transactionsCommitted.getAndIncrement(); + break; + case ABORT_TRANSACTION: + log.debug("Aborting transaction."); + producer.abortTransaction(); + break; + case NO_OP: + tookAction = false; + break; + } + return tookAction; + } + + private void sendMessage() throws InterruptedException { + if (!partitionsIterator.hasNext()) + partitionsIterator = activePartitions.iterator(); + + TopicPartition partition = partitionsIterator.next(); + ProducerRecord<byte[], byte[]> record = new ProducerRecord<>( + partition.topic(), partition.partition(), keys.next(), values.next()); + sendFuture = producer.send(record, + new SendRecordsCallback(this, Time.SYSTEM.milliseconds())); + throttle.increment(); + } + void recordDuration(long durationMs) { histogram.add(durationMs); } @@ -242,9 +303,11 @@ void recordDuration(long durationMs) { public class StatusUpdater implements Runnable { private final Histogram histogram; + private final AtomicLong transactionsCommitted; - StatusUpdater(Histogram histogram) { + StatusUpdater(Histogram histogram, AtomicLong transactionsCommitted) { this.histogram = histogram; + this.transactionsCommitted = transactionsCommitted; } @Override @@ -261,7 +324,8 @@ StatusData update() { StatusData statusData = new StatusData(summary.numSamples(), summary.average(), summary.percentiles().get(0).value(), summary.percentiles().get(1).value(), - summary.percentiles().get(2).value()); + summary.percentiles().get(2).value(), + transactionsCommitted.get()); status.update(JsonUtil.JSON_SERDE.valueToTree(statusData)); return statusData; } @@ -273,6 +337,7 @@ StatusData update() { private final int p50LatencyMs; private final int p95LatencyMs; private final int p99LatencyMs; + private final long transactionsCommitted; /** * The percentiles to use when calculating the histogram data. @@ -285,12 +350,14 @@ StatusData update() { @JsonProperty("averageLatencyMs") float averageLatencyMs, @JsonProperty("p50LatencyMs") int p50latencyMs, @JsonProperty("p95LatencyMs") int p95latencyMs, - @JsonProperty("p99LatencyMs") int p99latencyMs) { + @JsonProperty("p99LatencyMs") int p99latencyMs, + @JsonProperty("transactionsCommitted") long transactionsCommitted) { this.totalSent = totalSent; this.averageLatencyMs = averageLatencyMs; this.p50LatencyMs = p50latencyMs; this.p95LatencyMs = p95latencyMs; this.p99LatencyMs = p99latencyMs; + this.transactionsCommitted = transactionsCommitted; } @JsonProperty @@ -298,6 +365,11 @@ public long totalSent() { return totalSent; } + @JsonProperty + public long transactionsCommitted() { + return transactionsCommitted; + } + @JsonProperty public float averageLatencyMs() { return averageLatencyMs; diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/TransactionGenerator.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/TransactionGenerator.java new file mode 100644 index 00000000000..5ec47ec91c1 --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/TransactionGenerator.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.trogdor.workload; + +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; + +/** + * Generates actions that should be taken by a producer that uses transactions. + */ +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, + include = JsonTypeInfo.As.PROPERTY, + property = "type") +@JsonSubTypes(value = { + @JsonSubTypes.Type(value = UniformTransactionsGenerator.class, name = "uniform"), +}) +public interface TransactionGenerator { + enum TransactionAction { + BEGIN_TRANSACTION, COMMIT_TRANSACTION, ABORT_TRANSACTION, NO_OP + } + + /** + * Returns the next action that the producer should take in regards to transactions. + * This method should be called every time before a producer sends a message. + * This means that most of the time it should return #{@link TransactionAction#NO_OP} + * to signal the producer that its next step should be to send a message. + */ + TransactionAction nextAction(); +} diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/UniformTransactionsGenerator.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/UniformTransactionsGenerator.java new file mode 100644 index 00000000000..1fbfbc23381 --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/UniformTransactionsGenerator.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.trogdor.workload; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + * A uniform transactions generator where every N records are grouped in a separate transaction + */ +public class UniformTransactionsGenerator implements TransactionGenerator { + + private final int messagesPerTransaction; + private int messagesInTransaction = -1; + + @JsonCreator + public UniformTransactionsGenerator(@JsonProperty("messagesPerTransaction") int messagesPerTransaction) { + if (messagesPerTransaction < 1) + throw new IllegalArgumentException("Cannot have less than one message per transaction."); + + this.messagesPerTransaction = messagesPerTransaction; + } + + @JsonProperty + public int messagesPerTransaction() { + return messagesPerTransaction; + } + + @Override + public synchronized TransactionAction nextAction() { + if (messagesInTransaction == -1) { + messagesInTransaction = 0; + return TransactionAction.BEGIN_TRANSACTION; + } + if (messagesInTransaction == messagesPerTransaction) { + messagesInTransaction = -1; + return TransactionAction.COMMIT_TRANSACTION; + } + + messagesInTransaction += 1; + return TransactionAction.NO_OP; + } +} diff --git a/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java b/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java index 5e6ff81e28b..c324ec4c708 100644 --- a/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java +++ b/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java @@ -37,6 +37,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import static org.junit.Assert.assertNotNull; @@ -54,7 +55,7 @@ public void testDeserializationDoesNotProduceNulls() throws Exception { verify(new WorkerRunning(null, null, 0, null)); verify(new WorkerStopping(null, null, 0, null)); verify(new ProduceBenchSpec(0, 0, null, null, - 0, 0, null, null, null, null, null, null, null)); + 0, 0, null, null, Optional.empty(), null, null, null, null, null)); verify(new RoundTripWorkloadSpec(0, 0, null, null, null, null, null, null, 0, null, null, 0)); verify(new TopicsSpec()); ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Trogdor - Support transactions in ProduceBenchWorker > ---------------------------------------------------- > > Key: KAFKA-7597 > URL: https://issues.apache.org/jira/browse/KAFKA-7597 > Project: Kafka > Issue Type: Improvement > Reporter: Stanislav Kozlovski > Assignee: Stanislav Kozlovski > Priority: Minor > > The `ProduceBenchWorker` allows you to schedule a benchmark of a Kafka > Producer. > It would prove useful if we supported transactions in this producer, as to > allow benchmarks with transactions. -- This message was sent by Atlassian JIRA (v7.6.3#76005)