Repository: samza
Updated Branches:
  refs/heads/master 5b953ac3e -> 12968cfb6


SAMZA-1666: Benchmark SystemProducer and SystemConsumer

* Tests to benchmark the performance of the system consumers and producers.
* Config to test the benchmark for the event hub system producer and consumer.

SystemConsumerBench and SystemProducerBench provide a generic base 
implementation for benchmarking system producers and consumers. 
Any new system that needs a benchmark test needs a properties file.

The benchmark test itself is single threaded in the way it consumes and 
produces events. Scaling the benchmark tests right now involves running 
multiple processes of these tests in parallel.

Right now we just calculate the event rate, but in the future we could create a 
logging metrics registry to hook up other metrics and log them to the console 
along with the event rate while the benchmark tests are being run.

Author: Srinivasulu Punuru <spun...@linkedin.com>

Reviewers: Jagadish <jagad...@apache.org>, Wei Song<ws...@linkedin.com>

Closes #473 from srinipunuru/benchmark.1


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/12968cfb
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/12968cfb
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/12968cfb

Branch: refs/heads/master
Commit: 12968cfb6024705d6412b9b9917897f0eb794c3e
Parents: 5b953ac
Author: Srinivasulu Punuru <spun...@linkedin.com>
Authored: Tue Apr 17 15:40:59 2018 -0700
Committer: Jagadish <jvenkatra...@linkedin.com>
Committed: Tue Apr 17 15:40:59 2018 -0700

----------------------------------------------------------------------
 samza-tools/config/bench-log4j.xml              |  35 +++++
 samza-tools/config/eh-bench.properties          |  26 ++++
 samza-tools/scripts/system-consumer-bench.sh    |  34 +++++
 .../scripts/system-consumer-with-samza-bench.sh |  34 +++++
 samza-tools/scripts/system-producer-bench.sh    |  34 +++++
 .../tools/benchmark/AbstractSamzaBench.java     | 153 +++++++++++++++++++
 .../benchmark/ConfigBasedSspGrouperFactory.java |  87 +++++++++++
 .../tools/benchmark/SystemConsumerBench.java    |  91 +++++++++++
 .../benchmark/SystemConsumerWithSamzaBench.java | 117 ++++++++++++++
 .../tools/benchmark/SystemProducerBench.java    | 124 +++++++++++++++
 10 files changed, 735 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/12968cfb/samza-tools/config/bench-log4j.xml
----------------------------------------------------------------------
diff --git a/samza-tools/config/bench-log4j.xml 
b/samza-tools/config/bench-log4j.xml
new file mode 100644
index 0000000..02f5ec8
--- /dev/null
+++ b/samza-tools/config/bench-log4j.xml
@@ -0,0 +1,35 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+
+-->
+<!DOCTYPE log4j:configuration PUBLIC "-//APACHE//DTD LOG4J 1.2//EN" 
"log4j.dtd">
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+  <appender name="fileAppender" class="org.apache.log4j.FileAppender">
+    <param name="File"   value="./eh-bench.log" />
+    <param name="Append" value="false" />
+    <layout class="org.apache.log4j.PatternLayout">
+      <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss} %-5p 
[%c{1}:%L] - %m%n"/>
+    </layout>
+  </appender>
+  <root>
+    <priority value ="info" />
+    <appender-ref ref="fileAppender" />
+  </root>
+</log4j:configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/12968cfb/samza-tools/config/eh-bench.properties
----------------------------------------------------------------------
diff --git a/samza-tools/config/eh-bench.properties 
b/samza-tools/config/eh-bench.properties
new file mode 100644
index 0000000..13b96a8
--- /dev/null
+++ b/samza-tools/config/eh-bench.properties
@@ -0,0 +1,26 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+systems.eventhub.samza.factory=org.apache.samza.system.eventhub.EventHubSystemFactory
+systems.eventhub.stream.list=eh
+streams.eh.eventhubs.namespace=
+streams.eh.eventhubs.entitypath=
+sensitive.streams.eh.eventhubs.sas.keyname=
+sensitive.streams.eh.eventhubs.sas.token=
+streams.eh.eventhubs.consumer.group=
+streams.eh.samza.physical.name=
+streams.eh.samza.system=eventhub
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/12968cfb/samza-tools/scripts/system-consumer-bench.sh
----------------------------------------------------------------------
diff --git a/samza-tools/scripts/system-consumer-bench.sh 
b/samza-tools/scripts/system-consumer-bench.sh
new file mode 100755
index 0000000..01240c3
--- /dev/null
+++ b/samza-tools/scripts/system-consumer-bench.sh
@@ -0,0 +1,34 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Resolve the directory that contains this script. Every expansion is quoted so
+# that an install path containing spaces does not get word-split.
+if [ "$(uname)" = 'Linux' ]; then
+  base_dir=$(readlink -f "$(dirname "$0")")
+else
+  base_dir=$(dirname "$0")
+fi
+
+# Default to the benchmark log4j configuration unless the caller supplied one.
+if [ -z "$LOG4J_OPTS" ]; then
+    export LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/bench-log4j.xml"
+fi
+
+# Default heap settings unless the caller supplied them.
+if [ -z "$HEAP_OPTS" ]; then
+    export HEAP_OPTS="-Xmx1G -Xms1G"
+fi
+
+exec "$base_dir/run-class.sh" org.apache.samza.tools.benchmark.SystemConsumerBench "$@"

http://git-wip-us.apache.org/repos/asf/samza/blob/12968cfb/samza-tools/scripts/system-consumer-with-samza-bench.sh
----------------------------------------------------------------------
diff --git a/samza-tools/scripts/system-consumer-with-samza-bench.sh 
b/samza-tools/scripts/system-consumer-with-samza-bench.sh
new file mode 100755
index 0000000..a544133
--- /dev/null
+++ b/samza-tools/scripts/system-consumer-with-samza-bench.sh
@@ -0,0 +1,34 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Resolve the directory that contains this script. Every expansion is quoted so
+# that an install path containing spaces does not get word-split.
+if [ "$(uname)" = 'Linux' ]; then
+  base_dir=$(readlink -f "$(dirname "$0")")
+else
+  base_dir=$(dirname "$0")
+fi
+
+# Default to the benchmark log4j configuration unless the caller supplied one.
+if [ -z "$LOG4J_OPTS" ]; then
+    export LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/bench-log4j.xml"
+fi
+
+# Default heap settings unless the caller supplied them.
+if [ -z "$HEAP_OPTS" ]; then
+    export HEAP_OPTS="-Xmx1G -Xms1G"
+fi
+
+exec "$base_dir/run-class.sh" org.apache.samza.tools.benchmark.SystemConsumerWithSamzaBench "$@"

http://git-wip-us.apache.org/repos/asf/samza/blob/12968cfb/samza-tools/scripts/system-producer-bench.sh
----------------------------------------------------------------------
diff --git a/samza-tools/scripts/system-producer-bench.sh 
b/samza-tools/scripts/system-producer-bench.sh
new file mode 100755
index 0000000..3665820
--- /dev/null
+++ b/samza-tools/scripts/system-producer-bench.sh
@@ -0,0 +1,34 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Resolve the directory that contains this script. Every expansion is quoted so
+# that an install path containing spaces does not get word-split.
+if [ "$(uname)" = 'Linux' ]; then
+  base_dir=$(readlink -f "$(dirname "$0")")
+else
+  base_dir=$(dirname "$0")
+fi
+
+# Default to the benchmark log4j configuration unless the caller supplied one.
+if [ -z "$LOG4J_OPTS" ]; then
+    export LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/bench-log4j.xml"
+fi
+
+# Default heap settings unless the caller supplied them.
+if [ -z "$HEAP_OPTS" ]; then
+    export HEAP_OPTS="-Xmx1G -Xms1G"
+fi
+
+exec "$base_dir/run-class.sh" org.apache.samza.tools.benchmark.SystemProducerBench "$@"

http://git-wip-us.apache.org/repos/asf/samza/blob/12968cfb/samza-tools/src/main/java/org/apache/samza/tools/benchmark/AbstractSamzaBench.java
----------------------------------------------------------------------
diff --git 
a/samza-tools/src/main/java/org/apache/samza/tools/benchmark/AbstractSamzaBench.java
 
b/samza-tools/src/main/java/org/apache/samza/tools/benchmark/AbstractSamzaBench.java
new file mode 100644
index 0000000..9392aab
--- /dev/null
+++ 
b/samza-tools/src/main/java/org/apache/samza/tools/benchmark/AbstractSamzaBench.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.tools.benchmark;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Properties;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.commons.cli.BasicParser;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.sql.testutil.ReflectionUtils;
+import org.apache.samza.system.SystemFactory;
+import org.apache.samza.tools.CommandLineHelper;
+
+
+/**
+ * Base class for the samza benchmark tests
+ */
+
+public abstract class AbstractSamzaBench {
+  protected static final String OPT_SHORT_PROPERTIES_FILE = "p";
+  protected static final String OPT_LONG_PROPERTIES_FILE = "props";
+  protected static final String OPT_ARG_PROPERTIES_FILE = "PROPERTIES_FILE";
+  protected static final String OPT_DESC_PROPERTIES_FILE = "Path to the 
properties file.";
+
+  protected static final String OPT_SHORT_NUM_EVENTS = "n";
+  protected static final String OPT_LONG_NUM_EVENTS = "numEvents";
+  protected static final String OPT_ARG_NUM_EVENTS = "NUMBER_EVENTS";
+  protected static final String OPT_DESC_NUM_EVENTS = "Total number of events 
to consume.";
+
+  protected static final String OPT_SHORT_START_PARTITION = "sp";
+  protected static final String OPT_LONG_START_PARTITION = "startPartition";
+  protected static final String OPT_ARG_START_PARTITION = "START_PARTITION";
+  protected static final String OPT_DESC_START_PARTITION = "Start partition.";
+
+  protected static final String OPT_SHORT_END_PARTITION = "ep";
+  protected static final String OPT_LONG_END_PARTITION = "endPartition";
+  protected static final String OPT_ARG_END_PARTITION = "END_PARTITION";
+  protected static final String OPT_DESC_END_PARTITION = "End partition.";
+
+  protected static final String OPT_SHORT_STREAM = "s";
+  protected static final String OPT_LONG_STREAM = "streamId";
+  protected static final String OPT_ARG_STREAM = "STREAM_ID";
+  protected static final String OPT_DESC_STREAM = "STREAM ID.";
+  protected static final String CFG_STREAM_SYSTEM_NAME = 
"streams.%s.samza.system";
+  protected static final String CFG_SYSTEM_FACTORY = 
"systems.%s.samza.factory";
+  protected static final String CFG_PHYSICAL_STREAM_NAME = 
"streams.%s.samza.physical.name";
+  protected final Options options;
+  protected final CommandLine cmd;
+  protected SystemFactory factory;
+  protected Config config;
+  protected String systemName;
+  protected String physicalStreamName;
+  protected int startPartition;
+  protected int endPartition;
+  protected int totalEvents;
+  protected String streamId;
+
+  public AbstractSamzaBench(String scriptName, String args[]) throws 
ParseException {
+    options = new Options();
+    options.addOption(
+        CommandLineHelper.createOption(OPT_SHORT_PROPERTIES_FILE, 
OPT_LONG_PROPERTIES_FILE, OPT_ARG_PROPERTIES_FILE,
+            true, OPT_DESC_PROPERTIES_FILE));
+    options.addOption(
+        CommandLineHelper.createOption(OPT_SHORT_NUM_EVENTS, 
OPT_LONG_NUM_EVENTS, OPT_ARG_NUM_EVENTS, true,
+            OPT_DESC_NUM_EVENTS));
+
+    options.addOption(
+        CommandLineHelper.createOption(OPT_SHORT_START_PARTITION, 
OPT_LONG_START_PARTITION, OPT_ARG_START_PARTITION,
+            true, OPT_DESC_START_PARTITION));
+    options.addOption(
+        CommandLineHelper.createOption(OPT_SHORT_END_PARTITION, 
OPT_LONG_END_PARTITION, OPT_ARG_END_PARTITION, true,
+            OPT_DESC_END_PARTITION));
+    options.addOption(
+        CommandLineHelper.createOption(OPT_SHORT_STREAM, OPT_LONG_STREAM, 
OPT_ARG_STREAM, true, OPT_DESC_STREAM));
+
+    addOptions(options);
+
+    CommandLineParser parser = new BasicParser();
+    try {
+      cmd = parser.parse(options, args);
+    } catch (Exception e) {
+      HelpFormatter formatter = new HelpFormatter();
+      formatter.printHelp(String.format("Error: %s.sh", scriptName), options);
+      throw e;
+    }
+  }
+
+  /**
+   * Reads the parsed command line values, loads the properties file into a Samza config and
+   * instantiates the system factory configured for the system that the stream belongs to.
+   *
+   * @throws IOException if the properties file cannot be opened or read.
+   * @throws InterruptedException declared so that overriding benchmarks may block while running.
+   */
+  public void start() throws IOException, InterruptedException {
+    startPartition = Integer.parseInt(cmd.getOptionValue(OPT_SHORT_START_PARTITION));
+    endPartition = Integer.parseInt(cmd.getOptionValue(OPT_SHORT_END_PARTITION));
+    totalEvents = Integer.parseInt(cmd.getOptionValue(OPT_SHORT_NUM_EVENTS));
+    String propsFile = cmd.getOptionValue(OPT_SHORT_PROPERTIES_FILE);
+    streamId = cmd.getOptionValue(OPT_SHORT_STREAM);
+    Properties props = new Properties();
+    // try-with-resources closes the file handle as soon as the properties are loaded.
+    try (FileInputStream propsStream = new FileInputStream(propsFile)) {
+      props.load(propsStream);
+    }
+    // Give the derived benchmark a chance to add configs before the Samza config is built.
+    addMoreSystemConfigs(props);
+    config = convertToSamzaConfig(props);
+    systemName = config.get(String.format(CFG_STREAM_SYSTEM_NAME, streamId));
+    String systemFactory = config.get(String.format(CFG_SYSTEM_FACTORY, systemName));
+    physicalStreamName = config.get(String.format(CFG_PHYSICAL_STREAM_NAME, streamId));
+
+    factory = ReflectionUtils.createInstance(systemFactory);
+    if (factory == null) {
+      throw new RuntimeException("Cannot instantiate systemfactory " + systemFactory);
+    }
+  }
+
+  /**
+   * Derived classes can override this method to add any additional properties 
needed to create the System
+   * @param props Properties to which system configs can be added.
+   */
+  protected void addMoreSystemConfigs(Properties props) {
+  }
+
+  /**
+   * Derived classes can override this method to add any additional options 
that benchmark test may need.
+   * @param options Options to which additional command line options can be 
added.
+   */
+  protected void addOptions(Options options) {
+  }
+
+  /**
+   * Copies every string-valued property into a Samza {@link MapConfig}.
+   *
+   * @param props properties to convert.
+   * @return config holding one entry per string property name of {@code props}.
+   */
+  Config convertToSamzaConfig(Properties props) {
+    return new MapConfig(
+        props.stringPropertyNames().stream().collect(Collectors.toMap(key -> key, key -> props.getProperty(key))));
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/12968cfb/samza-tools/src/main/java/org/apache/samza/tools/benchmark/ConfigBasedSspGrouperFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-tools/src/main/java/org/apache/samza/tools/benchmark/ConfigBasedSspGrouperFactory.java
 
b/samza-tools/src/main/java/org/apache/samza/tools/benchmark/ConfigBasedSspGrouperFactory.java
new file mode 100644
index 0000000..073fbb0
--- /dev/null
+++ 
b/samza-tools/src/main/java/org/apache/samza/tools/benchmark/ConfigBasedSspGrouperFactory.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.tools.benchmark;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.samza.config.Config;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouper;
+import 
org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouperFactory;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+
+
+/**
+ * Grouper that assigns only the subset of partitions configured to the task. 
This can be used only
+ * with {@link org.apache.samza.standalone.PassthroughJobCoordinator}.
+ */
+// Public because the benchmark passes this class to the framework by name (as the grouper
+// factory config value). NOTE(review): assumes the framework instantiates it reflectively from
+// another package, which a package-private class would not allow - confirm against the loader.
+public class ConfigBasedSspGrouperFactory implements SystemStreamPartitionGrouperFactory {
+
+  /**
+   * Comma separated list of partitions that needs to be assigned to this task.
+   */
+  public static final String CONFIG_STREAM_PARTITIONS = "streams.%s.partitions";
+  private static final String CFG_PARTITIONS_DELIMITER = ",";
+
+  @Override
+  public SystemStreamPartitionGrouper getSystemStreamPartitionGrouper(Config config) {
+    return new ConfigBasedSspGrouper(config);
+  }
+
+  /**
+   * Grouper that keeps only the partitions listed in the config for each stream and assigns all
+   * of them to a single task. Static because it does not use the enclosing factory instance.
+   */
+  private static class ConfigBasedSspGrouper implements SystemStreamPartitionGrouper {
+    private final Config config;
+    // Cache of stream name to configured partition ids, filled lazily on first lookup.
+    private final Map<String, Set<Integer>> streamPartitionsMap = new HashMap<>();
+
+    public ConfigBasedSspGrouper(Config config) {
+      this.config = config;
+    }
+
+    /**
+     * @param ssps all the system stream partitions offered by the framework.
+     * @return a single-entry map that assigns the configured subset of {@code ssps} to "TestTask".
+     */
+    @Override
+    public Map<TaskName, Set<SystemStreamPartition>> group(Set<SystemStreamPartition> ssps) {
+      Set<SystemStreamPartition> filteredSsps = new HashSet<>();
+      for (SystemStreamPartition ssp : ssps) {
+        Set<Integer> partitions = getPartitions(ssp.getSystemStream());
+        if (partitions.contains(ssp.getPartition().getPartitionId())) {
+          filteredSsps.add(ssp);
+        }
+      }
+      Map<TaskName, Set<SystemStreamPartition>> group = new HashMap<>();
+      group.put(new TaskName("TestTask"), filteredSsps);
+      return group;
+    }
+
+    /**
+     * Returns the partition ids configured for the stream, parsing the config on first use.
+     */
+    private Set<Integer> getPartitions(SystemStream systemStream) {
+      return streamPartitionsMap.computeIfAbsent(systemStream.getStream(), this::parsePartitions);
+    }
+
+    /**
+     * Parses the comma separated partition list configured for the given stream name.
+     *
+     * @throws IllegalArgumentException if no partition list is configured for the stream.
+     */
+    private Set<Integer> parsePartitions(String streamName) {
+      String configKey = String.format(CONFIG_STREAM_PARTITIONS, streamName);
+      String partitions = config.get(configKey);
+      if (partitions == null) {
+        // Fail with the offending key instead of a bare NullPointerException from split().
+        throw new IllegalArgumentException("Missing partition list config " + configKey);
+      }
+      return Arrays.stream(partitions.split(CFG_PARTITIONS_DELIMITER))
+          .map(String::trim)
+          .filter(token -> !token.isEmpty())
+          .map(Integer::parseInt)
+          .collect(Collectors.toSet());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/12968cfb/samza-tools/src/main/java/org/apache/samza/tools/benchmark/SystemConsumerBench.java
----------------------------------------------------------------------
diff --git 
a/samza-tools/src/main/java/org/apache/samza/tools/benchmark/SystemConsumerBench.java
 
b/samza-tools/src/main/java/org/apache/samza/tools/benchmark/SystemConsumerBench.java
new file mode 100644
index 0000000..cbfc865
--- /dev/null
+++ 
b/samza-tools/src/main/java/org/apache/samza/tools/benchmark/SystemConsumerBench.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.tools.benchmark;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.commons.cli.ParseException;
+import org.apache.samza.Partition;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemConsumer;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.NoOpMetricsRegistry;
+
+
+/**
+ * Generic benchmark test for {@link SystemConsumer}.
+ */
+public class SystemConsumerBench extends AbstractSamzaBench {
+
+  public static void main(String args[]) throws Exception {
+    SystemConsumerBench bench = new SystemConsumerBench(args);
+    bench.start();
+  }
+
+  public SystemConsumerBench(String args[]) throws ParseException {
+    super("system-consumer-bench", args);
+  }
+
+  /**
+   * Registers the partitions in the configured range at their oldest offsets, polls the consumer
+   * until at least {@code totalEvents} messages have been read, prints the observed event rate
+   * and exits the JVM.
+   *
+   * @throws IOException if the properties file cannot be read.
+   * @throws InterruptedException if polling the consumer is interrupted.
+   */
+  @Override
+  public void start() throws IOException, InterruptedException {
+    super.start();
+    SystemAdmin systemAdmin = factory.getAdmin(systemName, config);
+    SystemStreamMetadata ssm =
+        systemAdmin.getSystemStreamMetadata(Collections.singleton(physicalStreamName)).get(physicalStreamName);
+
+    NoOpMetricsRegistry metricsRegistry = new NoOpMetricsRegistry();
+    Set<SystemStreamPartition> ssps = createSSPs(systemName, physicalStreamName, startPartition, endPartition);
+    SystemConsumer consumer = factory.getConsumer(systemName, config, metricsRegistry);
+    for (SystemStreamPartition ssp : ssps) {
+      consumer.register(ssp, ssm.getSystemStreamPartitionMetadata().get(ssp.getPartition()).getOldestOffset());
+    }
+
+    consumer.start();
+
+    System.out.println("starting consumption at " + Instant.now());
+    Instant startTime = Instant.now();
+    int numEvents = 0;
+    while (numEvents < totalEvents) {
+      Map<SystemStreamPartition, List<IncomingMessageEnvelope>> pollResult = consumer.poll(ssps, 2000);
+      numEvents += pollResult.values().stream().mapToInt(List::size).sum();
+    }
+
+    // Sample the end time once so the printed timestamp and the rate agree.
+    Instant endTime = Instant.now();
+    // Clamp to 1 ms so a run that finishes within the same millisecond does not divide by zero.
+    long elapsedMillis = Math.max(1L, Duration.between(startTime, endTime).toMillis());
+    System.out.println("Ending consumption at " + endTime);
+    // Multiply as long: numEvents * 1000 overflows int above roughly 2.1 million events.
+    System.out.println(String.format("Event Rate is %s Messages/Sec ", numEvents * 1000L / elapsedMillis));
+    consumer.stop();
+    System.exit(0);
+  }
+
+  Set<SystemStreamPartition> createSSPs(String systemName, String 
physicalStreamName, int startPartition,
+      int endPartition) {
+    return IntStream.range(startPartition, endPartition)
+        .mapToObj(x -> new SystemStreamPartition(systemName, 
physicalStreamName, new Partition(x)))
+        .collect(Collectors.toSet());
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/12968cfb/samza-tools/src/main/java/org/apache/samza/tools/benchmark/SystemConsumerWithSamzaBench.java
----------------------------------------------------------------------
diff --git 
a/samza-tools/src/main/java/org/apache/samza/tools/benchmark/SystemConsumerWithSamzaBench.java
 
b/samza-tools/src/main/java/org/apache/samza/tools/benchmark/SystemConsumerWithSamzaBench.java
new file mode 100644
index 0000000..5456db6
--- /dev/null
+++ 
b/samza-tools/src/main/java/org/apache/samza/tools/benchmark/SystemConsumerWithSamzaBench.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.tools.benchmark;
+
+import com.google.common.base.Joiner;
+import java.io.IOException;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.commons.cli.ParseException;
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.JobCoordinatorConfig;
+import org.apache.samza.config.TaskConfig;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.functions.MapFunction;
+import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
+
+
+/**
+ * Generic benchmark test for a test consumer but with samza framework
+ */
+public class SystemConsumerWithSamzaBench extends AbstractSamzaBench {
+  public SystemConsumerWithSamzaBench(String[] args) throws ParseException {
+    super("system-consumer-with-samza-bench", args);
+  }
+
+  /**
+   * Entry point of the benchmark that consumes through the Samza framework.
+   *
+   * @param args command line arguments, parsed by the constructor.
+   */
+  public static void main(String args[]) throws Exception {
+    // Instantiate this benchmark; the original created a SystemConsumerBench by mistake.
+    SystemConsumerWithSamzaBench bench = new SystemConsumerWithSamzaBench(args);
+    bench.start();
+  }
+
+  @Override
+  public void addMoreSystemConfigs(Properties props) {
+    props.put("app.runner.class", LocalApplicationRunner.class.getName());
+    List<Integer> partitions = IntStream.range(startPartition, 
endPartition).boxed().collect(Collectors.toList());
+    props.put(JobConfig.JOB_NAME(), "SamzaBench");
+    props.put(JobConfig.PROCESSOR_ID(), "1");
+    props.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, 
PassthroughJobCoordinatorFactory.class.getName());
+    
props.put(String.format(ConfigBasedSspGrouperFactory.CONFIG_STREAM_PARTITIONS, 
streamId),
+        Joiner.on(",").join(partitions));
+    props.put(TaskConfig.GROUPER_FACTORY(), 
ConfigBasedSspGrouperFactory.class.getName());
+  }
+
+  public void start() throws IOException, InterruptedException {
+    LocalApplicationRunner runner = new LocalApplicationRunner(config);
+    super.start();
+    MessageConsumer consumeFn = new MessageConsumer();
+    StreamApplication app = (graph, config) -> {
+      MessageStream<Object> stream = graph.getInputStream(streamId);
+      stream.map(consumeFn);
+    };
+
+    runner.run(app);
+
+    while (consumeFn.getEventsConsumed() < totalEvents) {
+      Thread.sleep(10);
+    }
+
+    Instant endTime = Instant.now();
+
+    runner.kill(app);
+
+    System.out.println("\n*******************");
+    System.out.println(String.format("Started at %s Ending at %s ", 
consumeFn.startTime, endTime));
+    System.out.println(String.format("Event Rate is %s Messages/Sec ",
+        (consumeFn.getEventsConsumed() * 1000 / 
Duration.between(consumeFn.startTime, Instant.now()).toMillis())));
+
+    System.out.println(
+        "Event Rate is " + consumeFn.getEventsConsumed() * 1000 / 
Duration.between(consumeFn.startTime, endTime).toMillis());
+    System.out.println("*******************\n");
+
+    System.exit(0);
+  }
+
+  /**
+   * Pass-through map function that counts the messages it sees and records the time at which
+   * the first message arrived.
+   */
+  private class MessageConsumer implements MapFunction<Object, Object> {
+    AtomicInteger eventsConsumed = new AtomicInteger(0);
+    // Written by the thread that applies the function and read by the benchmark thread.
+    volatile Instant startTime;
+
+    @Override
+    public Object apply(Object message) {
+      // Use the value returned by the increment itself: a separate get() could observe a
+      // later count and never equal 1.
+      if (eventsConsumed.incrementAndGet() == 1) {
+        startTime = Instant.now();
+      }
+      return message;
+    }
+
+    /**
+     * @return number of messages seen so far.
+     */
+    public int getEventsConsumed() {
+      return eventsConsumed.get();
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/samza/blob/12968cfb/samza-tools/src/main/java/org/apache/samza/tools/benchmark/SystemProducerBench.java
----------------------------------------------------------------------
diff --git 
a/samza-tools/src/main/java/org/apache/samza/tools/benchmark/SystemProducerBench.java
 
b/samza-tools/src/main/java/org/apache/samza/tools/benchmark/SystemProducerBench.java
new file mode 100644
index 0000000..6c2a5f2
--- /dev/null
+++ 
b/samza-tools/src/main/java/org/apache/samza/tools/benchmark/SystemProducerBench.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.tools.benchmark;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.samza.Partition;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemProducer;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.tools.CommandLineHelper;
+import org.apache.samza.tools.RandomValueGenerator;
+import org.apache.samza.util.NoOpMetricsRegistry;
+
+
+/**
+ * Generic benchmark test for a {@link SystemProducer}.
+ *
+ * <p>Sends {@code totalEvents} messages from a single thread, cycling over the configured
+ * partitions, and prints the observed event rate both before and after flushing the producer.
+ */
+public class SystemProducerBench extends AbstractSamzaBench {
+
+  private static final String OPT_SHORT_MESSAGE_SIZE = "sz";
+  private static final String OPT_LONG_MESSAGE_SIZE = "size";
+  private static final String OPT_ARG_MESSAGE_SIZE = "MESSAGE_SIZE";
+  private static final String OPT_DESC_MESSAGE_SIZE = "Size of the message in bytes.";
+
+  private static final long MILLIS_PER_SECOND = 1000L;
+
+  /** Payload shared by every message; populated in {@link #start()} before the first send. */
+  private byte[] value;
+
+  public static void main(String[] args) throws Exception {
+    SystemProducerBench bench = new SystemProducerBench(args);
+    bench.start();
+  }
+
+  /**
+   * @param args command line arguments, handed to the base class as-is
+   * @throws ParseException declared by the base class constructor
+   */
+  public SystemProducerBench(String[] args) throws ParseException {
+    super("system-producer", args);
+  }
+
+  /**
+   * Registers the message size option on top of whatever options are already present.
+   *
+   * @param options option set to add the message size option to
+   */
+  public void addOptions(Options options) {
+    options.addOption(
+        CommandLineHelper.createOption(OPT_SHORT_MESSAGE_SIZE, OPT_LONG_MESSAGE_SIZE, OPT_ARG_MESSAGE_SIZE, true,
+            OPT_DESC_MESSAGE_SIZE));
+  }
+
+  /**
+   * Runs the benchmark: builds the payload, sends {@code totalEvents} messages, flushes, prints
+   * the event rates, stops the producer and exits the JVM with status 0.
+   *
+   * @throws IllegalArgumentException if the configured partition range contains no partition
+   */
+  @Override
+  public void start() throws IOException, InterruptedException {
+    super.start();
+    String source = "SystemProducerBench";
+
+    int size = Integer.parseInt(cmd.getOptionValue(OPT_SHORT_MESSAGE_SIZE));
+    RandomValueGenerator randGenerator = new RandomValueGenerator(System.currentTimeMillis());
+    // Encode with a fixed charset so the payload bytes do not depend on the platform default.
+    value = randGenerator.getNextString(size, size).getBytes(StandardCharsets.UTF_8);
+
+    NoOpMetricsRegistry metricsRegistry = new NoOpMetricsRegistry();
+    List<SystemStreamPartition> ssps = createSSPs(systemName, physicalStreamName, startPartition, endPartition);
+    if (ssps.isEmpty()) {
+      // Without this check the send loop would fail with a division by zero on ssps.size().
+      throw new IllegalArgumentException(
+          String.format("No partitions to produce to: start partition %s, end partition %s (exclusive)",
+              startPartition, endPartition));
+    }
+
+    SystemProducer producer = factory.getProducer(systemName, config, metricsRegistry);
+    producer.register(source);
+    producer.start();
+
+    try {
+      Instant startTime = Instant.now();
+      System.out.println("starting production at " + startTime);
+      for (int index = 0; index < totalEvents; index++) {
+        SystemStreamPartition ssp = ssps.get(index % ssps.size());
+        OutgoingMessageEnvelope messageEnvelope = createMessageEnvelope(ssp, index);
+        producer.send(source, messageEnvelope);
+      }
+
+      // Capture each end time once so the printed timestamp and the rate use the same instant.
+      Instant sendEndTime = Instant.now();
+      System.out.println("Ending production at " + sendEndTime);
+      System.out.println(String.format("Event Rate is %s Messages/Sec",
+          eventsPerSecond(totalEvents, Duration.between(startTime, sendEndTime))));
+
+      producer.flush(source);
+
+      Instant flushEndTime = Instant.now();
+      System.out.println("Ending flush at " + flushEndTime);
+      System.out.println(String.format("Event Rate with flush is %s Messages/Sec",
+          eventsPerSecond(totalEvents, Duration.between(startTime, flushEndTime))));
+    } finally {
+      // Stop the producer even when send or flush fails.
+      producer.stop();
+    }
+    // Kept outside the try block: System.exit does not run pending finally blocks.
+    System.exit(0);
+  }
+
+  /**
+   * Computes a whole-number events-per-second rate.
+   *
+   * <p>The multiplication is done in {@code long} so large event counts do not overflow, and an
+   * elapsed time below one millisecond is treated as one millisecond to avoid dividing by zero.
+   */
+  private static long eventsPerSecond(long events, Duration elapsed) {
+    long elapsedMillis = Math.max(1L, elapsed.toMillis());
+    return events * MILLIS_PER_SECOND / elapsedMillis;
+  }
+
+  /**
+   * Naive create message implementation that uses the same random payload for every message and
+   * the message index as the key.
+   * If a system producer wants to test with a specific type of message, it needs to override this method.
+   *
+   * @param ssp partition whose system stream the message is addressed to
+   * @param index position of the message in the run; its string form becomes the key
+   */
+  OutgoingMessageEnvelope createMessageEnvelope(SystemStreamPartition ssp, int index) {
+    return new OutgoingMessageEnvelope(ssp.getSystemStream(), String.valueOf(index), value);
+  }
+
+  /**
+   * Simple implementation to create SSPs that assumes the partitions are an ordered list of integers.
+   *
+   * @param startPartition first partition id, inclusive
+   * @param endPartition last partition id, exclusive
+   * @return one {@link SystemStreamPartition} per id in the range; empty when the range is empty
+   */
+  List<SystemStreamPartition> createSSPs(String systemName, String physicalStreamName, int startPartition,
+      int endPartition) {
+    return IntStream.range(startPartition, endPartition)
+        .mapToObj(x -> new SystemStreamPartition(systemName, physicalStreamName, new Partition(x)))
+        .collect(Collectors.toList());
+  }
+}

Reply via email to