Updated Branches:
  refs/heads/master bda8df6e2 -> 9b360405c

SAMZA-111; performance fix in SystemConsumers to speed up consumers that have a 
high number of partitions (e.g. 100s or more).


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/9b360405
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/9b360405
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/9b360405

Branch: refs/heads/master
Commit: 9b360405c598a95e08c41f75a29742b9f20bbcee
Parents: bda8df6
Author: Chris Riccomini <[email protected]>
Authored: Thu Jan 2 15:15:19 2014 -0800
Committer: Chris Riccomini <[email protected]>
Committed: Thu Jan 2 15:15:19 2014 -0800

----------------------------------------------------------------------
 .reviewboardrc                                  |   2 +-
 build.gradle                                    |  13 +-
 .../apache/samza/system/SystemConsumers.scala   | 131 +++++++++++++-
 samza-test/java.hprof.txt                       |  65 -------
 .../samza/system/mock/MockSystemAdmin.java      |  56 ++++++
 .../samza/system/mock/MockSystemConsumer.java   | 172 +++++++++++++++++++
 .../samza/system/mock/MockSystemFactory.java    |  99 +++++++++++
 .../test/integration/TestStatefulTask.scala     |   1 -
 .../TestSamzaContainerPerformance.scala         | 156 +++++++++++++++++
 9 files changed, 620 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/9b360405/.reviewboardrc
----------------------------------------------------------------------
diff --git a/.reviewboardrc b/.reviewboardrc
index 0ee6a71..0a0a83d 100644
--- a/.reviewboardrc
+++ b/.reviewboardrc
@@ -1 +1 @@
-REPOSITORY = 'git://git.apache.org/incubator-samza.git'
\ No newline at end of file
+REPOSITORY = 'git://git.apache.org/incubator-samza.git'

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/9b360405/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 49a8459..04b0de7 100644
--- a/build.gradle
+++ b/build.gradle
@@ -179,6 +179,7 @@ project(":samza-test_$scalaVersion") {
   dependencies {
     compile project(':samza-api')
     compile project(":samza-kv_$scalaVersion")
+    compile project(":samza-core_$scalaVersion")
     compile "org.scala-lang:scala-library:$scalaVersion"
     compile "org.clapper:grizzled-slf4j_$scalaVersion:$grizzledVersion"
     compile "net.sf.jopt-simple:jopt-simple:$joptSimpleVersion"
@@ -187,12 +188,18 @@ project(":samza-test_$scalaVersion") {
     compile files("../samza-kafka/lib/kafka_$scalaVersion-" + kafkaVersion + 
".jar")
     testCompile files("../samza-kafka/lib/kafka_$scalaVersion-" + kafkaVersion 
+ "-test.jar")
     testCompile "com.101tec:zkclient:$zkClientVersion"
-    testCompile project(":samza-core_$scalaVersion")
     testCompile project(":samza-kafka_$scalaVersion")
+    testRuntime "org.slf4j:slf4j-simple:1.6.2"
   }
 
   test {
-    // Bump up the heap so we can start ZooKeeper and Kafka brokers.
-    maxHeapSize = "1024m"
+    // Bump up the heap so we can start ZooKeeper and Kafka brokers. Also 
+    // required for TestSamzaContainerPerformance when a high thread count 
+    // with a lot of inputs is used.
+    maxHeapSize = "4096m"
+
+    // Forward all samza.* system properties to test subprocesses. This is 
+    // useful for configuring TestSamzaContainerPerformance from the CLI.
+    systemProperties = System.properties.findAll { it.key.startsWith("samza") }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/9b360405/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala 
b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
index 5cbffe5..dd7d357 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
@@ -25,19 +25,116 @@ import org.apache.samza.serializers.SerdeManager
 import grizzled.slf4j.Logging
 import org.apache.samza.system.chooser.MessageChooser
 
+/**
+ * The SystemConsumers class coordinates between all SystemConsumers, the
+ * MessageChooser, and the SamzaContainer. Its job is to poll each
+ * SystemConsumer for messages, update the
+ * {@link org.apache.samza.system.chooser.MessageChooser} with new incoming
+ * messages, poll the MessageChooser for the next message to process, and
+ * return that message to the SamzaContainer.
+ */
 class SystemConsumers(
+
+  /**
+   * The class that determines the order to process incoming messages.
+   */
   chooser: MessageChooser,
+
+  /**
+   * A map of SystemConsumers that should be polled for new messages.
+   */
   consumers: Map[String, SystemConsumer],
+
+  /**
+   * The class that handles deserialization of incoming messages.
+   */
   serdeManager: SerdeManager = new SerdeManager,
+
+  /**
+   * A helper class to hold all of SystemConsumers' metrics.
+   */
   metrics: SystemConsumersMetrics = new SystemConsumersMetrics,
+
+  /**
+   * The maximum number of messages to poll from a single 
SystemStreamPartition.
+   */
   maxMsgsPerStreamPartition: Int = 1000,
+
+  /**
+   * A percentage threshold that determines when a SystemStreamPartition
+   * should be polled again. 0.0 means poll for more messages only when
+   * SystemConsumers' buffer is totally empty. 0.2 means poll for more messages
+   * when SystemConsumers' buffer is 80% empty. SystemConsumers' buffer size
+   * is determined by maxMsgsPerStreamPartition.
+   */
+  fetchThresholdPct: Float = 0f,
+
+  /**
+   * If MessageChooser returns null when it's polled, SystemConsumers will
+   * poll each SystemConsumer with a timeout next time it tries to poll for
+   * messages. Setting the timeout to 0 means that SamzaContainer's main
+   * thread will sit in a tight loop polling every SystemConsumer over and
+   * over again if no new messages are available.
+   */
   noNewMessagesTimeout: Long = 10) extends Logging {
 
+  /**
+   * A buffer of incoming messages grouped by SystemStreamPartition.
+   */
   var unprocessedMessages = Map[SystemStreamPartition, 
Queue[IncomingMessageEnvelope]]()
+
+  /**
+   * The MessageChooser only gets updated with one 
message-per-SystemStreamPartition
+   * at a time. The MessageChooser will not receive a second message from the
+   * same SystemStreamPartition until the first message that it received has
+   * been returned to SystemConsumers. This set keeps track of which
+   * SystemStreamPartitions are valid to give to the MessageChooser.
+   */
   var neededByChooser = Set[SystemStreamPartition]()
+
+  /**
+   * A map of every SystemStreamPartition that SystemConsumers is responsible
+   * for polling. The values are how many messages to poll for during the next
+   * SystemConsumers.poll call.
+   *
+   * If the value for a SystemStreamPartition is maxMsgsPerStreamPartition,
+   * then the implication is that SystemConsumers has no incoming messages in
+   * its buffer for the SystemStreamPartition. If the value is 0 then the
+   * SystemConsumers' buffer is full for the SystemStreamPartition.
+   */
   var fetchMap = Map[SystemStreamPartition, java.lang.Integer]()
+
+  /**
+   * A cache of fetchMap values, grouped according to the system. This is
+   * purely a trick to get better performance out of the SystemConsumers
+   * class, since the map from systemName to its fetchMap is used for every
+   * poll call.
+   */
+  var systemFetchMapCache = Map[String, Map[SystemStreamPartition, 
java.lang.Integer]]()
+
+  /**
+   * Default timeout to noNewMessagesTimeout. Every time SystemConsumers
+   * receives incoming messages, it sets timeout to 0. Every time
+   * SystemConsumers receives no new incoming messages from the MessageChooser,
+   * it sets timeout to noNewMessagesTimeout again.
+   */
   var timeout = noNewMessagesTimeout
 
+  /**
+   * Used to determine when the next poll should take place for a given
+   * SystemStreamPartition. SystemConsumers inspects the value of fetchMap for 
each
+   * SystemStreamPartition, and decides to poll for the SystemStreamPartition
+   * if the fetchMap value is greater than or equal to the
+   * depletedQueueSizeThreshold. For example, suppose the fetchThresholdPct is
+   * 0.2, and the maxMsgsPerStreamPartition is 1000. This would result in
+   * depletedQueueSizeThreshold being 800. Thus a SystemStreamPartition with a
+   * fetchMap value of 936 (64 messages in the buffer is less than 20% of
+   * 1000) would be polled for more messages, while a SystemStream partition
+   * with a fetchMap value of 548 would not be polled for more messages (452
+   * messages in the buffer is greater than 20% of 1000).
+   */
+  val depletedQueueSizeThreshold = (maxMsgsPerStreamPartition * (1 - 
fetchThresholdPct)).toInt
+
   debug("Got stream consumers: %s" format consumers)
   debug("Got max messages per stream: %s" format maxMsgsPerStreamPartition)
   debug("Got no new message timeout: %s" format noNewMessagesTimeout)
@@ -72,7 +169,7 @@ class SystemConsumers(
     debug("Registering stream: %s, %s" format (systemStreamPartition, 
lastReadOffset))
 
     neededByChooser += systemStreamPartition
-    fetchMap += systemStreamPartition -> maxMsgsPerStreamPartition
+    updateFetchMap(systemStreamPartition, maxMsgsPerStreamPartition)
     unprocessedMessages += systemStreamPartition -> 
Queue[IncomingMessageEnvelope]()
     consumers(systemStreamPartition.getSystem).register(systemStreamPartition, 
lastReadOffset)
     chooser.register(systemStreamPartition, lastReadOffset)
@@ -115,11 +212,15 @@ class SystemConsumers(
       // If we have messages for a stream that the chooser needs, then update.
       if (fetchMap(systemStreamPartition).intValue < 
maxMsgsPerStreamPartition) {
         chooser.update(unprocessedMessages(systemStreamPartition).dequeue)
-        fetchMap += systemStreamPartition -> 
(fetchMap(systemStreamPartition).intValue + 1)
+        updateFetchMap(systemStreamPartition)
         neededByChooser -= systemStreamPartition
       })
   }
 
+  /**
+   * Poll a system for new messages from SystemStreamPartitions whose fetchMap
+   * value has reached or exceeded depletedQueueSizeThreshold.
+   */
   private def poll(systemName: String) = {
     debug("Polling system consumer: %s" format systemName)
 
@@ -127,9 +228,9 @@ class SystemConsumers(
 
     val consumer = consumers(systemName)
 
-    debug("Filtering for system: %s, %s" format (systemName, fetchMap))
+    debug("Getting fetch map for system: %s" format systemName)
 
-    val systemFetchMap = fetchMap.filterKeys(_.getSystem.equals(systemName))
+    val systemFetchMap = systemFetchMapCache(systemName)
 
     debug("Fetching: %s" format systemFetchMap)
 
@@ -147,7 +248,7 @@ class SystemConsumers(
 
       debug("Got message for: %s, %s" format (systemStreamPartition, envelope))
 
-      fetchMap += systemStreamPartition -> 
(fetchMap(systemStreamPartition).intValue - 1)
+      updateFetchMap(systemStreamPartition, -1)
 
       debug("Updated fetch map for: %s, %s" format (systemStreamPartition, 
fetchMap))
 
@@ -156,4 +257,24 @@ class SystemConsumers(
       debug("Updated unprocessed messages for: %s, %s" format 
(systemStreamPartition, unprocessedMessages))
     })
   }
+
+  /**
+   * A helper method that updates both fetchMap and systemFetchMapCache
+   * simultaneously. This is a convenience method to make sure that the
+   * systemFetchMapCache stays in sync with fetchMap.
+   */
+  private def updateFetchMap(systemStreamPartition: SystemStreamPartition, 
amount: Int = 1) {
+    val fetchSize = fetchMap.getOrElse(systemStreamPartition, new 
Integer(0)).intValue + amount
+    val systemName = systemStreamPartition.getSystem
+    var systemFetchMap = systemFetchMapCache.getOrElse(systemName, Map())
+
+    if (fetchSize >= depletedQueueSizeThreshold) {
+      systemFetchMap += systemStreamPartition -> fetchSize
+    } else {
+      systemFetchMap -= systemStreamPartition
+    }
+
+    fetchMap += systemStreamPartition -> fetchSize
+    systemFetchMapCache += systemName -> systemFetchMap
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/9b360405/samza-test/java.hprof.txt
----------------------------------------------------------------------
diff --git a/samza-test/java.hprof.txt b/samza-test/java.hprof.txt
deleted file mode 100644
index c5693d2..0000000
--- a/samza-test/java.hprof.txt
+++ /dev/null
@@ -1,65 +0,0 @@
-JAVA PROFILE 1.0.1, created Sat Aug  3 14:27:50 2013
-
-Header for -agentlib:hprof (or -Xrunhprof) ASCII Output (JDK 5.0 JVMTI based)
-
-%W% %E%
-
- Copyright (c) 2006 Sun Microsystems, Inc. All  Rights Reserved.
-
-WARNING!  This file format is under development, and is subject to
-change without notice.
-
-This file contains the following types of records:
-
-THREAD START
-THREAD END      mark the lifetime of Java threads
-
-TRACE           represents a Java stack trace.  Each trace consists
-                of a series of stack frames.  Other records refer to
-                TRACEs to identify (1) where object allocations have
-                taken place, (2) the frames in which GC roots were
-                found, and (3) frequently executed methods.
-
-HEAP DUMP       is a complete snapshot of all live objects in the Java
-                heap.  Following distinctions are made:
-
-                ROOT    root set as determined by GC
-                CLS     classes 
-                OBJ     instances
-                ARR     arrays
-
-SITES           is a sorted list of allocation sites.  This identifies
-                the most heavily allocated object types, and the TRACE
-                at which those allocations occurred.
-
-CPU SAMPLES     is a statistical profile of program execution.  The VM
-                periodically samples all running threads, and assigns
-                a quantum to active TRACEs in those threads.  Entries
-                in this record are TRACEs ranked by the percentage of
-                total quanta they consumed; top-ranked TRACEs are
-                typically hot spots in the program.
-
-CPU TIME        is a profile of program execution obtained by measuring
-                the time spent in individual methods (excluding the time
-                spent in callees), as well as by counting the number of
-                times each method is called. Entries in this record are
-                TRACEs ranked by the percentage of total CPU time. The
-                "count" field indicates the number of times each TRACE 
-                is invoked.
-
-MONITOR TIME    is a profile of monitor contention obtained by measuring
-                the time spent by a thread waiting to enter a monitor.
-                Entries in this record are TRACEs ranked by the percentage
-                of total monitor contention time and a brief description
-                of the monitor.  The "count" field indicates the number of 
-                times the monitor was contended at that TRACE.
-
-MONITOR DUMP    is a complete snapshot of all the monitors and threads in 
-                the System.
-
-HEAP DUMP, SITES, CPU SAMPLES|TIME and MONITOR DUMP|TIME records are generated 
-at program exit.  They can also be obtained during program execution by typing 
-Ctrl-\ (on Solaris) or by typing Ctrl-Break (on Win32).
-
---------
-

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/9b360405/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java 
b/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java
new file mode 100644
index 0000000..13ac689
--- /dev/null
+++ b/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.mock;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.samza.Partition;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemStreamPartition;
+
+/**
+ * A SystemAdmin that returns a constant set of partitions for all streams.
+ */
+public class MockSystemAdmin implements SystemAdmin {
+  private final Set<Partition> partitions;
+
+  public MockSystemAdmin(int partitionCount) {
+    this.partitions = new HashSet<Partition>();
+
+    for (int i = 0; i < partitionCount; ++i) {
+      partitions.add(new Partition(i));
+    }
+  }
+
+  @Override
+  public Set<Partition> getPartitions(String streamName) {
+    // Partitions are immutable, so making the set immutable should make the
+    // partition set fully safe to re-use.
+    return Collections.unmodifiableSet(partitions);
+  }
+
+  @Override
+  public Map<SystemStreamPartition, String> getLastOffsets(Set<String> 
streams) {
+    throw new RuntimeException("MockSystemAdmin doesn't implement this 
method.");
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/9b360405/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemConsumer.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemConsumer.java 
b/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemConsumer.java
new file mode 100644
index 0000000..c0791d7
--- /dev/null
+++ 
b/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemConsumer.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.mock;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.BlockingEnvelopeMap;
+import org.apache.samza.util.Clock;
+
+/**
+ * MockSystemConsumer is a class that simulates a multi-threaded consumer that
+ * uses BlockingEnvelopeMap. The primary use for this class is to do 
performance
+ * testing.
+ * 
+ * This class works by starting up (threadCount) threads. Each thread adds
+ * (messagesPerBatch) messages to the BlockingEnvelopeMap, then sleeps for
+ * (brokerSleepMs). The sleep is important to simulate network latency when
+ * executing a fetch against a remote streaming system (i.e. Kafka).
+ */
+public class MockSystemConsumer extends BlockingEnvelopeMap {
+  private final int messagesPerBatch;
+  private final int threadCount;
+  private final int brokerSleepMs;
+
+  /**
+   * The SystemStreamPartitions that this consumer is in charge of.
+   */
+  private final Set<SystemStreamPartition> ssps;
+
+  /**
+   * The consumer threads that are putting IncomingMessageEnvelopes into
+   * BlockingEnvelopeMap.
+   */
+  private List<Thread> threads;
+
+  /**
+   * 
+   * @param messagesPerBatch
+   *          The number of messages to add to the BlockingEnvelopeMap before
+   *          sleeping.
+   * @param threadCount
+   *          How many threads to run.
+   * @param brokerSleepMs
+   *          How long each thread should sleep between batch writes.
+   */
+  public MockSystemConsumer(int messagesPerBatch, int threadCount, int 
brokerSleepMs) {
+    super(new MetricsRegistryMap("test-container-performance"), new Clock() {
+      @Override
+      public long currentTimeMillis() {
+        return System.currentTimeMillis();
+      }
+    });
+
+    this.messagesPerBatch = messagesPerBatch;
+    this.threadCount = threadCount;
+    this.brokerSleepMs = brokerSleepMs;
+    this.ssps = new HashSet<SystemStreamPartition>();
+    this.threads = new ArrayList<Thread>(threadCount);
+  }
+
+  /**
+   * Assign SystemStreamPartitions to all of the threads, and start them up to
+   * begin simulating consuming messages.
+   */
+  @Override
+  public void start() {
+    for (int i = 0; i < threadCount; ++i) {
+      Set<SystemStreamPartition> threadSsps = new 
HashSet<SystemStreamPartition>();
+
+      // Assign SystemStreamPartitions for this thread.
+      for (SystemStreamPartition ssp : ssps) {
+        if (Math.abs(ssp.hashCode()) % threadCount == i) {
+          threadSsps.add(ssp);
+        }
+      }
+
+      // Start thread.
+      Thread thread = new Thread(new MockSystemConsumerRunnable(threadSsps));
+      thread.setDaemon(true);
+      threads.add(thread);
+      thread.start();
+    }
+  }
+
+  /**
+   * Kill all the threads, and shutdown.
+   */
+  @Override
+  public void stop() {
+    for (Thread thread : threads) {
+      thread.interrupt();
+    }
+
+    try {
+      for (Thread thread : threads) {
+        thread.join();
+      }
+    } catch (InterruptedException e) {
+    }
+  }
+
+  @Override
+  public void register(SystemStreamPartition systemStreamPartition, String 
lastReadOffset) {
+    super.register(systemStreamPartition, lastReadOffset);
+    ssps.add(systemStreamPartition);
+    setIsAtHead(systemStreamPartition, true);
+  }
+
+  /**
+   * The worker thread for MockSystemConsumer that simulates reading messages
+   * from a remote streaming system (i.e. Kafka), and writing them to the
+   * BlockingEnvelopeMap.
+   */
+  public class MockSystemConsumerRunnable implements Runnable {
+    private final Set<SystemStreamPartition> ssps;
+
+    public MockSystemConsumerRunnable(Set<SystemStreamPartition> ssps) {
+      this.ssps = ssps;
+    }
+
+    @Override
+    public void run() {
+      try {
+        while (!Thread.interrupted() && ssps.size() > 0) {
+          Set<SystemStreamPartition> sspsToFetch = new 
HashSet<SystemStreamPartition>();
+
+          // Only fetch messages when there are no outstanding messages left.
+          for (SystemStreamPartition ssp : ssps) {
+            if (getNumMessagesInQueue(ssp) <= 0) {
+              sspsToFetch.add(ssp);
+            }
+          }
+
+          // Simulate a broker fetch request's network latency.
+          Thread.sleep(brokerSleepMs);
+
+          // Add messages to the BlockingEnvelopeMap.
+          for (SystemStreamPartition ssp : sspsToFetch) {
+            for (int i = 0; i < messagesPerBatch; ++i) {
+              add(ssp, new IncomingMessageEnvelope(ssp, "0", "key", "value"));
+            }
+          }
+        }
+      } catch (InterruptedException e) {
+        System.out.println("Got interrupt. Shutting down.");
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/9b360405/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemFactory.java 
b/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemFactory.java
new file mode 100644
index 0000000..02d6d37
--- /dev/null
+++ 
b/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemFactory.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.mock;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemConsumer;
+import org.apache.samza.system.SystemFactory;
+import org.apache.samza.system.SystemProducer;
+
+/**
+ * MockSystemFactory was built to make performance testing easier.
+ */
+public class MockSystemFactory implements SystemFactory {
+  @Override
+  public SystemConsumer getConsumer(String systemName, Config config, 
MetricsRegistry registry) {
+    MockSystemConsumerConfig consumerConfig = new 
MockSystemConsumerConfig(systemName, config);
+
+    return new MockSystemConsumer(consumerConfig.getMessagesPerBatch(), 
consumerConfig.getConsumerThreadCount(), consumerConfig.getBrokerSleepMs());
+  }
+
+  @Override
+  public SystemProducer getProducer(String systemName, Config config, 
MetricsRegistry registry) {
+    throw new RuntimeException("MockSystemProducer not implemented.");
+  }
+
+  @Override
+  public SystemAdmin getAdmin(String systemName, Config config) {
+    MockSystemConsumerConfig consumerConfig = new 
MockSystemConsumerConfig(systemName, config);
+
+    return new MockSystemAdmin(consumerConfig.getPartitionsPerStream());
+  }
+
+  /**
+   * A helper class that's useful for yanking out MockSystem's
+   * configuration.
+   */
+  public static class MockSystemConsumerConfig {
+    public static final int DEFAULT_PARTITION_COUNT = 4;
+    public static final int DEFAULT_MESSAGES_PER_BATCH = 5000;
+    public static final int DEFAULT_CONSUMER_THREAD_COUNT = 12;
+    public static final int DEFAULT_BROKER_SLEEP_MS = 1;
+
+    private final String systemName;
+    private final Config config;
+
+    public MockSystemConsumerConfig(String systemName, Config config) {
+      this.systemName = systemName;
+      this.config = config;
+    }
+
+    /**
+     * @return the partition count to be used for MockSystemAdmin.
+     */
+    public int getPartitionsPerStream() {
+      return config.getInt("systems." + systemName + ".partitions.per.stream", 
DEFAULT_PARTITION_COUNT);
+    }
+
+    /**
+     * @return the messages per batch to be used for the MockSystemConsumer.
+     */
+    public int getMessagesPerBatch() {
+      return config.getInt("systems." + systemName + ".messages.per.batch", 
DEFAULT_MESSAGES_PER_BATCH);
+    }
+
+    /**
+     * @return the number of threads to be used for the MockSystemConsumer.
+     */
+    public int getConsumerThreadCount() {
+      return config.getInt("systems." + systemName + ".consumer.thread.count", 
DEFAULT_CONSUMER_THREAD_COUNT);
+    }
+
+    /**
+     * @return the milliseconds to sleep between each batch in
+     *         MockSystemConsumer.
+     */
+    public int getBrokerSleepMs() {
+      return config.getInt("systems." + systemName + ".broker.sleep.ms", 
DEFAULT_BROKER_SLEEP_MS);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/9b360405/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
 
b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
index 9602a52..55c24f8 100644
--- 
a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
+++ 
b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
@@ -228,7 +228,6 @@ class TestStatefulTask {
     "stores.mystore.msg.serde" -> "string",
     "stores.mystore.changelog" -> "kafka.mystore",
     "systems.kafka.samza.factory" -> 
"org.apache.samza.system.kafka.KafkaSystemFactory",
-    "systems.kafka.samza.partition.manager" -> 
"org.apache.samza.stream.kafka.KafkaPartitionManager",
     "systems.kafka.consumer.zookeeper.connect" -> zkConnect,
     "systems.kafka.consumer.auto.offset.reset" -> "smallest",
     "systems.kafka.producer.metadata.broker.list" -> ("localhost:%s" format 
port1),

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/9b360405/samza-test/src/test/scala/org/apache/samza/test/performance/TestSamzaContainerPerformance.scala
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/scala/org/apache/samza/test/performance/TestSamzaContainerPerformance.scala
 
b/samza-test/src/test/scala/org/apache/samza/test/performance/TestSamzaContainerPerformance.scala
new file mode 100644
index 0000000..e5a676e
--- /dev/null
+++ 
b/samza-test/src/test/scala/org/apache/samza/test/performance/TestSamzaContainerPerformance.scala
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.test.performance
+
+import org.junit.Test
+import org.apache.samza.task.StreamTask
+import org.apache.samza.task.TaskCoordinator
+import org.apache.samza.task.MessageCollector
+import org.apache.samza.system.IncomingMessageEnvelope
+import org.apache.samza.job.local.LocalJobFactory
+import org.apache.samza.config.MapConfig
+import scala.collection.JavaConversions._
+import org.apache.samza.job.ShellCommandBuilder
+import org.apache.samza.task.InitableTask
+import org.apache.samza.task.TaskContext
+import org.apache.samza.config.Config
+import grizzled.slf4j.Logging
+
+/**
+ * A simple unit test that drives the TestPerformanceTask. This unit test can
+ * be triggered by itself using:
+ *
+ * <pre>
+ * ./gradlew :samza-test:test -Dtest.single=TestSamzaContainerPerformance
+ * </pre>
+ *
+ * Once the test is running, you can attach JConsole, VisualVM, or YourKit to
+ * have a look at how things are behaving.
+ *
+ * The test can be configured with the following system properties:
+ *
+ * <pre>
+ * samza.mock.consumer.thread.count
+ * samza.mock.messages.per.batch
+ * samza.mock.input.streams
+ * samza.mock.partitions.per.stream
+ * samza.mock.broker.sleep.ms
+ * samza.task.log.interval
+ * samza.task.max.messages
+ * </pre>
+ *
+ * For example, you might wish to process 10000 messages simulated
+ * from two input streams on one broker:
+ *
+ * <pre>
+ * ./gradlew :samza-test:test \
+ *   -Dtest.single=TestSamzaContainerPerformance \
+ *   -Psamza.mock.input.streams=2 \
+ *   -Psamza.mock.consumer.thread.count=1 \
+ *   -Psamza.task.log.interval=1000 \
+ *   -Psamza.task.max.messages=10000
+ * </pre>
+ */
+class TestSamzaContainerPerformance extends Logging{
+  val consumerThreadCount = 
System.getProperty("samza.mock.consumer.thread.count", "12").toInt
+  val messagesPerBatch = System.getProperty("samza.mock.messages.per.batch", 
"5000").toInt
+  val streamCount = System.getProperty("samza.mock.input.streams", 
"1000").toInt
+  val partitionsPerStreamCount = 
System.getProperty("samza.mock.partitions.per.stream", "4").toInt
+  val brokerSleepMs = System.getProperty("samza.mock.broker.sleep.ms", 
"1").toInt
+  var logInterval = System.getProperty("samza.task.log.interval", 
"10000").toInt
+  var maxMessages = System.getProperty("samza.task.max.messages", 
"100000").toInt
+
+  val jobConfig = Map(
+    "job.factory.class" -> "org.apache.samza.job.local.LocalJobFactory",
+    "job.name" -> "test-container-performance",
+    "task.class" -> classOf[TestPerformanceTask].getName,
+    "task.inputs" -> (0 until streamCount).map(i => "mock.stream" + 
i).mkString(","),
+    "task.log.interval" -> logInterval.toString,
+    "task.max.messages" -> maxMessages.toString,
+    "systems.mock.samza.factory" -> 
"org.apache.samza.system.mock.MockSystemFactory",
+    "systems.mock.partitions.per.stream" -> partitionsPerStreamCount.toString,
+    "systems.mock.messages.per.batch" -> messagesPerBatch.toString,
+    "systems.mock.consumer.thread.count" -> consumerThreadCount.toString,
+    "systems.mock.broker.sleep.ms" -> brokerSleepMs.toString)
+
+  @Test
+  def testContainerPerformance {
+    info("Testing performance with configuration: %s" format jobConfig)
+
+    val jobFactory = new LocalJobFactory
+    val job = jobFactory
+      .getJob(new MapConfig(jobConfig))
+      .submit
+
+    job.waitForFinish(Int.MaxValue)
+  }
+}
+
+object TestPerformanceTask {
+  var messagesProcessed = 0
+  var startTime = 0L
+}
+
+/**
+ * A little test task that prints how many messages a SamzaContainer has
+ * received, and over what period of time. The messages-processed count is
+ * stored statically, so that all tasks in a single SamzaContainer increment
+ * the same counter.
+ *
+ * The log interval is configured with task.log.interval, which defines how many
+ * messages to process before printing a log line. The task will continue running
+ * until task.max.messages have been processed, at which point it will shut
+ * itself down.
+ */
+class TestPerformanceTask extends StreamTask with InitableTask with Logging {
+  import TestPerformanceTask._
+
+  /**
+   * How many messages to process before a log message is printed.
+   */
+  var logInterval = 10000
+
+  /**
+   * How many messages to process before shutting down.
+   */
+  var maxMessages = 100000
+
+  def init(config: Config, context: TaskContext) {
+    logInterval = config.getInt("task.log.interval", 10000)
+    maxMessages = config.getInt("task.max.messages", 100000)
+  }
+
+  def process(envelope: IncomingMessageEnvelope, collector: MessageCollector, 
coordinator: TaskCoordinator) {
+    if (startTime == 0) {
+      startTime = System.currentTimeMillis
+    }
+
+    messagesProcessed += 1
+
+    if (messagesProcessed % logInterval == 0) {
+      val seconds = (System.currentTimeMillis - startTime) / 1000
+      info("Processed %s messages in %s seconds." format (messagesProcessed, 
seconds))
+    }
+
+    if (messagesProcessed >= maxMessages) {
+      coordinator.shutdown
+    }
+  }
+}
\ No newline at end of file

Reply via email to