This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 2c2b30d MINOR: Add RandomComponentPayloadGenerator and update Trogdor documentation (#7103) 2c2b30d is described below commit 2c2b30d96b7d30037d3ee69ebbf985cd88557af9 Author: jolshan <jols...@confluent.io> AuthorDate: Wed Jul 31 14:00:49 2019 -0700 MINOR: Add RandomComponentPayloadGenerator and update Trogdor documentation (#7103) Add a new RandomComponentPayloadGenerator that gives a payload based on random selection of another PayloadGenerator. Additionally, add an example that uses a non-default PayloadGenerator configuration to TROGDOR.md. Reviewers: Colin P. McCabe <cmcc...@apache.org> --- TROGDOR.md | 30 ++++++ .../kafka/trogdor/workload/PayloadGenerator.java | 3 +- .../kafka/trogdor/workload/RandomComponent.java | 49 +++++++++ .../workload/RandomComponentPayloadGenerator.java | 114 +++++++++++++++++++++ .../trogdor/workload/PayloadGeneratorTest.java | 97 +++++++++++++++++- 5 files changed, 290 insertions(+), 3 deletions(-) diff --git a/TROGDOR.md b/TROGDOR.md index ad8d8af..3891857 100644 --- a/TROGDOR.md +++ b/TROGDOR.md @@ -87,6 +87,36 @@ The task specification is usually written as JSON. For example, this task speci "durationMs": 30000, "partitions": [["node1", "node2"], ["node3"]] } + +This task runs a simple ProduceBench test on a cluster with one producer node, 5 topics, and 10,000 messages per second. +The keys are generated sequentially and the configured partitioner (DefaultPartitioner) is used. + + { + "class": "org.apache.kafka.trogdor.workload.ProduceBenchSpec", + "durationMs": 10000000, + "producerNode": "node0", + "bootstrapServers": "localhost:9092", + "targetMessagesPerSec": 10000, + "maxMessages": 50000, + "activeTopics": { + "foo[1-3]": { + "numPartitions": 10, + "replicationFactor": 1 + } + }, + "inactiveTopics": { + "foo[4-5]": { + "numPartitions": 10, + "replicationFactor": 1 + } + }, + "keyGenerator": { + "type": "sequential", + "size": 8, + "offset": 1 + }, + "useConfiguredPartitioner": true + } Tasks are submitted to the coordinator. Once the coordinator determines that it is time for the task to start, it creates workers on agent processes. The workers run until the task is done. diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/PayloadGenerator.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/PayloadGenerator.java index 3c574ba..b06ba01 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/workload/PayloadGenerator.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/PayloadGenerator.java @@ -34,7 +34,8 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo; @JsonSubTypes.Type(value = ConstantPayloadGenerator.class, name = "constant"), @JsonSubTypes.Type(value = SequentialPayloadGenerator.class, name = "sequential"), @JsonSubTypes.Type(value = UniformRandomPayloadGenerator.class, name = "uniformRandom"), - @JsonSubTypes.Type(value = NullPayloadGenerator.class, name = "null") + @JsonSubTypes.Type(value = NullPayloadGenerator.class, name = "null"), + @JsonSubTypes.Type(value = RandomComponentPayloadGenerator.class, name = "randomComponent") }) public interface PayloadGenerator { /** diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/RandomComponent.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/RandomComponent.java new file mode 100644 index 0000000..b5973a8 --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/RandomComponent.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.trogdor.workload; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + * Contains a percent value represented as an integer between 1 and 100 and a PayloadGenerator to specify + * how often that PayloadGenerator should be used. + */ +public class RandomComponent { + private final int percent; + private final PayloadGenerator component; + + + @JsonCreator + public RandomComponent(@JsonProperty("percent") int percent, + @JsonProperty("component") PayloadGenerator component) { + this.percent = percent; + this.component = component; + } + + @JsonProperty + public int percent() { + return percent; + } + + @JsonProperty + public PayloadGenerator component() { + return component; + } +} + diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/RandomComponentPayloadGenerator.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/RandomComponentPayloadGenerator.java new file mode 100644 index 0000000..be50a44 --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/RandomComponentPayloadGenerator.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.trogdor.workload; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + + +/** + * A PayloadGenerator which generates pseudo-random payloads based on other PayloadGenerators. + * + * Given a seed and non-null list of RandomComponents, RandomComponentPayloadGenerator + * will use any given generator in its list of components a percentage of the time based on the + * percent field in the RandomComponent. These percent fields must be integers greater than 0 + * and together add up to 100. The payloads generated can be reproduced from run to run. + * + * An example of how to include this generator in a Trogdor taskSpec is shown below. + * #{@code + * "keyGenerator": { + * "type": "randomComponent", + * "seed": 456, + * "components": [ + * { + * "percent": 50, + * "component": { + * "type": "null" + * } + * }, + * { + * "percent": 50, + * "component": { + * "type": "uniformRandom", + * "size": 4, + * "seed": 123, + * "padding": 0 + * } + * } + * ] + * } + * } + */ +public class RandomComponentPayloadGenerator implements PayloadGenerator { + private final long seed; + private final List<RandomComponent> components; + private final Random random = new Random(); + + @JsonCreator + public RandomComponentPayloadGenerator(@JsonProperty("seed") long seed, + @JsonProperty("components") List<RandomComponent> components) { + this.seed = seed; + if (components == null || components.isEmpty()) { + throw new IllegalArgumentException("Components must be a specified, non-empty list of RandomComponents."); + } + int sum = 0; + for (RandomComponent component : components) { + if (component.percent() < 1) { + throw new IllegalArgumentException("Percent value must be greater than zero."); + } + sum += component.percent(); + } + if (sum != 100) { + throw new IllegalArgumentException("Components must be a list of RandomComponents such that the percent fields sum to 100"); + } + this.components = new ArrayList<>(components); + } + + @JsonProperty + public long seed() { + return seed; + } + + @JsonProperty + public List<RandomComponent> components() { + return components; + } + + @Override + public byte[] generate(long position) { + int randPercent; + synchronized (random) { + random.setSeed(seed + position); + randPercent = random.nextInt(100); + } + int curPercent = 0; + RandomComponent com = components.get(0); + for (RandomComponent component : components) { + curPercent += component.percent(); + if (curPercent > randPercent) { + com = component; + break; + } + } + return com.component().generate(position); + } +} diff --git a/tools/src/test/java/org/apache/kafka/trogdor/workload/PayloadGeneratorTest.java b/tools/src/test/java/org/apache/kafka/trogdor/workload/PayloadGeneratorTest.java index 0909dc0..9ee654b 100644 --- a/tools/src/test/java/org/apache/kafka/trogdor/workload/PayloadGeneratorTest.java +++ b/tools/src/test/java/org/apache/kafka/trogdor/workload/PayloadGeneratorTest.java @@ -24,10 +24,13 @@ import org.junit.rules.Timeout; import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.util.Arrays; +import java.util.ArrayList; +import java.util.List; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; - +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThrows; public class PayloadGeneratorTest { @Rule @@ -104,7 +107,11 @@ public class PayloadGeneratorTest { byte[] val = generator.generate(123); generator.generate(456); byte[] val2 = generator.generate(123); - assertArrayEquals(val, val2); + if (val == null) { + assertNull(val2); + } else { + assertArrayEquals(val, val2); + } } @Test @@ -123,6 +130,92 @@ public class PayloadGeneratorTest { assertArrayEquals(val1End, val2End); assertArrayEquals(val1End, val3End); } + + @Test + public void testRandomComponentPayloadGenerator() { + NullPayloadGenerator nullGenerator = new NullPayloadGenerator(); + RandomComponent nullConfig = new RandomComponent(50, nullGenerator); + + UniformRandomPayloadGenerator uniformGenerator = + new UniformRandomPayloadGenerator(5, 123, 0); + RandomComponent uniformConfig = new RandomComponent(50, uniformGenerator); + + SequentialPayloadGenerator sequentialGenerator = + new SequentialPayloadGenerator(4, 10); + RandomComponent sequentialConfig = new RandomComponent(75, sequentialGenerator); + + ConstantPayloadGenerator constantGenerator = + new ConstantPayloadGenerator(4, new byte[0]); + RandomComponent constantConfig = new RandomComponent(25, constantGenerator); + + List<RandomComponent> components1 = new ArrayList<>(Arrays.asList(nullConfig, uniformConfig)); + List<RandomComponent> components2 = new ArrayList<>(Arrays.asList(sequentialConfig, constantConfig)); + byte[] expected = new byte[4]; + + PayloadIterator iter = new PayloadIterator( + new RandomComponentPayloadGenerator(4, components1)); + int notNull = 0; + int isNull = 0; + while (notNull < 1000 || isNull < 1000) { + byte[] cur = iter.next(); + if (cur == null) { + isNull++; + } else { + notNull++; + } + } + + iter = new PayloadIterator( + new RandomComponentPayloadGenerator(123, components2)); + int isZeroBytes = 0; + int isNotZeroBytes = 0; + while (isZeroBytes < 500 || isNotZeroBytes < 1500) { + byte[] cur = iter.next(); + if (Arrays.equals(expected, cur)) { + isZeroBytes++; + } else { + isNotZeroBytes++; + } + } + + RandomComponent uniformConfig2 = new RandomComponent(25, uniformGenerator); + RandomComponent sequentialConfig2 = new RandomComponent(25, sequentialGenerator); + RandomComponent nullConfig2 = new RandomComponent(25, nullGenerator); + + List<RandomComponent> components3 = new ArrayList<>(Arrays.asList(sequentialConfig2, uniformConfig2, nullConfig)); + List<RandomComponent> components4 = new ArrayList<>(Arrays.asList(uniformConfig2, sequentialConfig2, constantConfig, nullConfig2)); + + testReproducible(new RandomComponentPayloadGenerator(4, components1)); + testReproducible(new RandomComponentPayloadGenerator(123, components2)); + testReproducible(new RandomComponentPayloadGenerator(50, components3)); + testReproducible(new RandomComponentPayloadGenerator(0, components4)); + } + + @Test + public void testRandomComponentPayloadGeneratorErrors() { + NullPayloadGenerator nullGenerator = new NullPayloadGenerator(); + RandomComponent nullConfig = new RandomComponent(25, nullGenerator); + UniformRandomPayloadGenerator uniformGenerator = + new UniformRandomPayloadGenerator(5, 123, 0); + RandomComponent uniformConfig = new RandomComponent(25, uniformGenerator); + ConstantPayloadGenerator constantGenerator = + new ConstantPayloadGenerator(4, new byte[0]); + RandomComponent constantConfig = new RandomComponent(-25, constantGenerator); + + List<RandomComponent> components1 = new ArrayList<>(Arrays.asList(nullConfig, uniformConfig)); + List<RandomComponent> components2 = new ArrayList<>(Arrays.asList( + nullConfig, constantConfig, uniformConfig, nullConfig, uniformConfig, uniformConfig)); + + assertThrows(IllegalArgumentException.class, () -> { + new PayloadIterator(new RandomComponentPayloadGenerator(1, new ArrayList<>())); + }); + assertThrows(IllegalArgumentException.class, () -> { + new PayloadIterator(new RandomComponentPayloadGenerator(13, components2)); + }); + assertThrows(IllegalArgumentException.class, () -> { + new PayloadIterator(new RandomComponentPayloadGenerator(123, components1)); + }); + } @Test public void testPayloadIterator() {