This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2c2b30d  MINOR: Add RandomComponentPayloadGenerator and update Trogdor documentation (#7103)
2c2b30d is described below

commit 2c2b30d96b7d30037d3ee69ebbf985cd88557af9
Author: jolshan <jols...@confluent.io>
AuthorDate: Wed Jul 31 14:00:49 2019 -0700

    MINOR: Add RandomComponentPayloadGenerator and update Trogdor documentation (#7103)
    
    Add a new RandomComponentPayloadGenerator that gives a payload based on random selection of another PayloadGenerator.  Additionally, add an example that uses a non-default PayloadGenerator configuration to TROGDOR.md.
    
    Reviewers: Colin P. McCabe <cmcc...@apache.org>
---
 TROGDOR.md                                         |  30 ++++++
 .../kafka/trogdor/workload/PayloadGenerator.java   |   3 +-
 .../kafka/trogdor/workload/RandomComponent.java    |  49 +++++++++
 .../workload/RandomComponentPayloadGenerator.java  | 114 +++++++++++++++++++++
 .../trogdor/workload/PayloadGeneratorTest.java     |  97 +++++++++++++++++-
 5 files changed, 290 insertions(+), 3 deletions(-)

diff --git a/TROGDOR.md b/TROGDOR.md
index ad8d8af..3891857 100644
--- a/TROGDOR.md
+++ b/TROGDOR.md
@@ -87,6 +87,36 @@ The task specification is usually written as JSON.  For example, this task speci
         "durationMs": 30000,
         "partitions": [["node1", "node2"], ["node3"]]
     }
+    
+This task runs a simple ProduceBench test on a cluster with one producer node, 5 topics, and 10,000 messages per second. 
+The keys are generated sequentially and the configured partitioner (DefaultPartitioner) is used. 
+
+    {
+        "class": "org.apache.kafka.trogdor.workload.ProduceBenchSpec",
+        "durationMs": 10000000,
+        "producerNode": "node0",
+        "bootstrapServers": "localhost:9092",
+        "targetMessagesPerSec": 10000,
+        "maxMessages": 50000,
+        "activeTopics": {
+            "foo[1-3]": {
+                "numPartitions": 10,
+                "replicationFactor": 1
+            }
+        },
+        "inactiveTopics": {
+             "foo[4-5]": {
+                 "numPartitions": 10,
+                 "replicationFactor": 1
+             }
+        },
+        "keyGenerator": {
+             "type": "sequential",
+             "size": 8,
+             "offset": 1
+        },
+        "useConfiguredPartitioner": true
+     }
 
 Tasks are submitted to the coordinator.  Once the coordinator determines that it is time for the task to start, it creates workers on agent processes.  The workers run until the task is done.
 
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/PayloadGenerator.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/PayloadGenerator.java
index 3c574ba..b06ba01 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/PayloadGenerator.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/PayloadGenerator.java
@@ -34,7 +34,8 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
     @JsonSubTypes.Type(value = ConstantPayloadGenerator.class, name = "constant"),
     @JsonSubTypes.Type(value = SequentialPayloadGenerator.class, name = "sequential"),
     @JsonSubTypes.Type(value = UniformRandomPayloadGenerator.class, name = "uniformRandom"),
-    @JsonSubTypes.Type(value = NullPayloadGenerator.class, name = "null")
+    @JsonSubTypes.Type(value = NullPayloadGenerator.class, name = "null"),
+    @JsonSubTypes.Type(value = RandomComponentPayloadGenerator.class, name = "randomComponent")
     })
 public interface PayloadGenerator {
     /**
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/RandomComponent.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/RandomComponent.java
new file mode 100644
index 0000000..b5973a8
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/RandomComponent.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.workload;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Contains a percent value represented as an integer between 1 and 100 and a PayloadGenerator to specify
+ * how often that PayloadGenerator should be used. 
+ */
+public class RandomComponent {
+    private final int percent;
+    private final PayloadGenerator component;
+    
+
+    @JsonCreator
+    public RandomComponent(@JsonProperty("percent") int percent,
+                           @JsonProperty("component") PayloadGenerator component) {
+        this.percent = percent;
+        this.component = component;
+    }
+
+    @JsonProperty
+    public int percent() {
+        return percent;
+    }
+
+    @JsonProperty
+    public PayloadGenerator component() {
+        return component;
+    }
+}
+
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/RandomComponentPayloadGenerator.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/RandomComponentPayloadGenerator.java
new file mode 100644
index 0000000..be50a44
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/RandomComponentPayloadGenerator.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.workload;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+
+/**
+ * A PayloadGenerator which generates pseudo-random payloads based on other PayloadGenerators.
+ *
+ * Given a seed and non-null list of RandomComponents, RandomComponentPayloadGenerator
+ * will use any given generator in its list of components a percentage of the time based on the 
+ * percent field in the RandomComponent. These percent fields must be integers greater than 0 
+ * and together add up to 100. The payloads generated can be reproduced from run to run.
+ * 
+ * An example of how to include this generator in a Trogdor taskSpec is shown below.
+ * #{@code
+ *    "keyGenerator": {
+ *        "type": "randomComponent",
+ *        "seed": 456,
+ *        "components": [
+ *          {
+ *            "percent": 50,
+ *            "component": {
+ *              "type": "null"
+ *            }
+ *          },
+ *          {
+ *            "percent": 50,
+ *            "component": {
+ *              "type": "uniformRandom",
+ *              "size": 4,
+ *              "seed": 123,
+ *              "padding": 0
+ *            }
+ *          }
+ *        ]
+ *    }
+ * }
+ */
+public class RandomComponentPayloadGenerator implements PayloadGenerator {
+    private final long seed;
+    private final List<RandomComponent> components;
+    private final Random random = new Random();
+
+    @JsonCreator
+    public RandomComponentPayloadGenerator(@JsonProperty("seed") long seed,
+                                           @JsonProperty("components") List<RandomComponent> components) {
+        this.seed = seed;
+        if (components == null || components.isEmpty()) {
+            throw new IllegalArgumentException("Components must be a specified, non-empty list of RandomComponents.");
+        }
+        int sum = 0;
+        for (RandomComponent component : components) {
+            if (component.percent() < 1) {
+                throw new IllegalArgumentException("Percent value must be greater than zero.");
+            }
+            sum += component.percent();
+        }
+        if (sum != 100) {
+            throw new IllegalArgumentException("Components must be a list of RandomComponents such that the percent fields sum to 100");
+        }
+        this.components = new ArrayList<>(components);
+    }
+
+    @JsonProperty
+    public long seed() {
+        return seed;
+    }
+
+    @JsonProperty
+    public List<RandomComponent> components() {
+        return components;
+    }
+
+    @Override
+    public byte[] generate(long position) {
+        int randPercent;
+        synchronized (random) {
+            random.setSeed(seed + position);
+            randPercent = random.nextInt(100);
+        }
+        int curPercent = 0;
+        RandomComponent com = components.get(0);
+        for (RandomComponent component : components) {
+            curPercent += component.percent();
+            if (curPercent > randPercent) {
+                com = component;
+                break;
+            }
+        }
+        return com.component().generate(position);
+    }
+}
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/workload/PayloadGeneratorTest.java b/tools/src/test/java/org/apache/kafka/trogdor/workload/PayloadGeneratorTest.java
index 0909dc0..9ee654b 100644
--- a/tools/src/test/java/org/apache/kafka/trogdor/workload/PayloadGeneratorTest.java
+++ b/tools/src/test/java/org/apache/kafka/trogdor/workload/PayloadGeneratorTest.java
@@ -24,10 +24,13 @@ import org.junit.rules.Timeout;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.List;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
-
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
 
 public class PayloadGeneratorTest {
     @Rule
@@ -104,7 +107,11 @@ public class PayloadGeneratorTest {
         byte[] val = generator.generate(123);
         generator.generate(456);
         byte[] val2 = generator.generate(123);
-        assertArrayEquals(val, val2);
+        if (val == null) {
+            assertNull(val2);
+        } else {
+            assertArrayEquals(val, val2);
+        }
     }
 
     @Test
@@ -123,6 +130,92 @@ public class PayloadGeneratorTest {
         assertArrayEquals(val1End, val2End);
         assertArrayEquals(val1End, val3End);
     }
+    
+    @Test
+    public void testRandomComponentPayloadGenerator() {
+        NullPayloadGenerator nullGenerator = new NullPayloadGenerator();
+        RandomComponent nullConfig = new RandomComponent(50, nullGenerator);
+        
+        UniformRandomPayloadGenerator uniformGenerator =
+            new UniformRandomPayloadGenerator(5, 123, 0);
+        RandomComponent uniformConfig = new RandomComponent(50, uniformGenerator);
+        
+        SequentialPayloadGenerator sequentialGenerator =
+            new SequentialPayloadGenerator(4, 10);
+        RandomComponent sequentialConfig = new RandomComponent(75, sequentialGenerator);
+        
+        ConstantPayloadGenerator constantGenerator =
+            new ConstantPayloadGenerator(4, new byte[0]);
+        RandomComponent constantConfig = new RandomComponent(25, constantGenerator);
+        
+        List<RandomComponent> components1 = new ArrayList<>(Arrays.asList(nullConfig, uniformConfig));
+        List<RandomComponent> components2 = new ArrayList<>(Arrays.asList(sequentialConfig, constantConfig));
+        byte[] expected = new byte[4];
+
+        PayloadIterator iter = new PayloadIterator(
+            new RandomComponentPayloadGenerator(4, components1));
+        int notNull = 0;
+        int isNull = 0;
+        while (notNull < 1000 || isNull < 1000) {
+            byte[] cur = iter.next();
+            if (cur == null) {
+                isNull++;
+            } else {
+                notNull++;
+            }
+        }
+        
+        iter = new PayloadIterator(
+            new RandomComponentPayloadGenerator(123, components2));
+        int isZeroBytes = 0;
+        int isNotZeroBytes = 0;
+        while (isZeroBytes < 500 || isNotZeroBytes < 1500) {
+            byte[] cur = iter.next();
+            if (Arrays.equals(expected, cur)) {
+                isZeroBytes++;
+            } else {
+                isNotZeroBytes++;
+            }
+        }
+        
+        RandomComponent uniformConfig2 = new RandomComponent(25, uniformGenerator);
+        RandomComponent sequentialConfig2 = new RandomComponent(25, sequentialGenerator);
+        RandomComponent nullConfig2 = new RandomComponent(25, nullGenerator);
+        
+        List<RandomComponent> components3 = new ArrayList<>(Arrays.asList(sequentialConfig2, uniformConfig2, nullConfig));
+        List<RandomComponent> components4 = new ArrayList<>(Arrays.asList(uniformConfig2, sequentialConfig2, constantConfig, nullConfig2));
+        
+        testReproducible(new RandomComponentPayloadGenerator(4, components1));
+        testReproducible(new RandomComponentPayloadGenerator(123, components2));
+        testReproducible(new RandomComponentPayloadGenerator(50, components3));
+        testReproducible(new RandomComponentPayloadGenerator(0, components4));
+    } 
+    
+    @Test
+    public void testRandomComponentPayloadGeneratorErrors() {
+        NullPayloadGenerator nullGenerator = new NullPayloadGenerator();
+        RandomComponent nullConfig = new RandomComponent(25, nullGenerator);
+        UniformRandomPayloadGenerator uniformGenerator =
+            new UniformRandomPayloadGenerator(5, 123, 0);
+        RandomComponent uniformConfig = new RandomComponent(25, uniformGenerator);
+        ConstantPayloadGenerator constantGenerator =
+            new ConstantPayloadGenerator(4, new byte[0]);
+        RandomComponent constantConfig = new RandomComponent(-25, constantGenerator);
+        
+        List<RandomComponent> components1 = new ArrayList<>(Arrays.asList(nullConfig, uniformConfig));
+        List<RandomComponent> components2 = new ArrayList<>(Arrays.asList(
+             nullConfig, constantConfig, uniformConfig, nullConfig, uniformConfig, uniformConfig));
+     
+        assertThrows(IllegalArgumentException.class, () -> {
+            new PayloadIterator(new RandomComponentPayloadGenerator(1, new ArrayList<>()));
+        });
+        assertThrows(IllegalArgumentException.class, () -> {
+            new PayloadIterator(new RandomComponentPayloadGenerator(13, components2));
+        });
+        assertThrows(IllegalArgumentException.class, () -> {
+            new PayloadIterator(new RandomComponentPayloadGenerator(123, components1));
+        });
+    }  
 
     @Test
     public void testPayloadIterator() {

Reply via email to