Repository: incubator-samza
Updated Branches:
  refs/heads/master a73c4cbc9 -> 0d544aed1


SAMZA-384; eliminate send method in task instance and send messages from 
collector immediately.


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/0d544aed
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/0d544aed
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/0d544aed

Branch: refs/heads/master
Commit: 0d544aed1ac34fdb8d56bada338de83a1f8126a0
Parents: a73c4cb
Author: Chris Riccomini <[email protected]>
Authored: Tue Aug 19 14:21:30 2014 -0700
Committer: Chris Riccomini <[email protected]>
Committed: Tue Aug 19 14:21:30 2014 -0700

----------------------------------------------------------------------
 .../org/apache/samza/container/RunLoop.scala    | 13 ----
 .../apache/samza/container/SamzaContainer.scala | 11 ++-
 .../samza/container/SamzaContainerMetrics.scala |  1 -
 .../apache/samza/container/TaskInstance.scala   | 34 ++-------
 .../samza/container/TaskInstanceMetrics.scala   |  2 +-
 .../apache/samza/task/ReadableCollector.scala   | 37 ----------
 .../samza/task/TaskInstanceCollector.scala      | 72 ++++++++++++++++++++
 .../apache/samza/container/TestRunLoop.scala    |  6 +-
 .../samza/container/TestSamzaContainer.scala    |  6 +-
 .../samza/container/TestTaskInstance.scala      |  4 +-
 .../system/kafka/KafkaSystemProducer.scala      | 67 +++++++++---------
 .../kafka/KafkaSystemProducerMetrics.scala      |  1 +
 .../performance/TestKeyValuePerformance.scala   | 13 +++-
 .../samza/job/yarn/SamzaAppMasterMetrics.scala  |  1 -
 14 files changed, 139 insertions(+), 129 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/0d544aed/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala 
b/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
index 3a264ad..6862460 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
@@ -64,7 +64,6 @@ class RunLoop(
     while (!shutdownNow) {
       process
       window
-      send
       commit
     }
   }
@@ -121,18 +120,6 @@ class RunLoop(
   }
 
   /**
-   * If task instances published any messages to output streams, this flushes
-   * them to the underlying systems.
-   */
-  private def send {
-    updateTimer(metrics.sendMs) {
-      trace("Triggering send in task instances.")
-      metrics.sends.inc
-      taskInstances.values.foreach(_.send)
-    }
-  }
-
-  /**
    * Commits task state as a checkpoint, if necessary.
    */
   private def commit {

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/0d544aed/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index d574ac4..0ab8a55 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -55,12 +55,12 @@ import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.system.chooser.DefaultChooser
 import org.apache.samza.system.chooser.MessageChooserFactory
 import org.apache.samza.system.chooser.RoundRobinChooserFactory
-import org.apache.samza.task.ReadableCollector
 import org.apache.samza.task.StreamTask
 import org.apache.samza.task.TaskLifecycleListener
 import org.apache.samza.task.TaskLifecycleListenerFactory
 import org.apache.samza.util.Util
 import scala.collection.JavaConversions._
+import org.apache.samza.task.TaskInstanceCollector
 
 object SamzaContainer extends Logging {
 
@@ -410,10 +410,10 @@ object SamzaContainer extends Logging {
 
       val task = Util.getObj[StreamTask](taskClassName)
 
-      val collector = new ReadableCollector
-
       val taskInstanceMetrics = new TaskInstanceMetrics("TaskName-%s" format 
taskName)
 
+      val collector = new TaskInstanceCollector(producerMultiplexer, 
taskInstanceMetrics)
+
       val storeConsumers = changeLogSystemStreams
         .map {
           case (storeName, changeLogSystemStream) =>
@@ -481,13 +481,12 @@ object SamzaContainer extends Logging {
         config = config,
         metrics = taskInstanceMetrics,
         consumerMultiplexer = consumerMultiplexer,
-        producerMultiplexer = producerMultiplexer,
+        collector = collector,
         offsetManager = offsetManager,
         storageManager = storageManager,
         reporters = reporters,
         listeners = listeners,
-        systemStreamPartitions = systemStreamPartitions,
-        collector = collector)
+        systemStreamPartitions = systemStreamPartitions)
 
       (taskName, taskInstance)
     }).toMap

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/0d544aed/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
 
b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
index 44d5dff..7d9ff00 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
+++ 
b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
@@ -37,5 +37,4 @@ class SamzaContainerMetrics(
   val windowMs = newTimer("window-ms")
   val processMs = newTimer("process-ms")
   val commitMs = newTimer("commit-ms")
-  val sendMs = newTimer("send-ms")
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/0d544aed/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala 
b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index 9484ddb..92d48eb 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -31,13 +31,12 @@ import org.apache.samza.system.IncomingMessageEnvelope
 import org.apache.samza.task.WindowableTask
 import org.apache.samza.task.TaskLifecycleListener
 import org.apache.samza.task.StreamTask
-import org.apache.samza.task.ReadableCollector
 import org.apache.samza.system.SystemConsumers
-import org.apache.samza.system.SystemProducers
 import org.apache.samza.task.ReadableCoordinator
 import org.apache.samza.checkpoint.OffsetManager
 import org.apache.samza.SamzaException
 import scala.collection.JavaConversions._
+import org.apache.samza.task.TaskInstanceCollector
 
 class TaskInstance(
   task: StreamTask,
@@ -45,13 +44,12 @@ class TaskInstance(
   config: Config,
   metrics: TaskInstanceMetrics,
   consumerMultiplexer: SystemConsumers,
-  producerMultiplexer: SystemProducers,
+  collector: TaskInstanceCollector,
   offsetManager: OffsetManager = new OffsetManager,
   storageManager: TaskStorageManager = null,
   reporters: Map[String, MetricsReporter] = Map(),
   listeners: Seq[TaskLifecycleListener] = Seq(),
-  val systemStreamPartitions: Set[SystemStreamPartition] = Set(),
-  collector: ReadableCollector = new ReadableCollector) extends Logging {
+  val systemStreamPartitions: Set[SystemStreamPartition] = Set()) extends 
Logging {
   val isInitableTask = task.isInstanceOf[InitableTask]
   val isWindowableTask = task.isInstanceOf[WindowableTask]
   val isClosableTask = task.isInstanceOf[ClosableTask]
@@ -107,7 +105,7 @@ class TaskInstance(
   def registerProducers {
     debug("Registering producers for taskName: %s" format taskName)
 
-    producerMultiplexer.register(metrics.source)
+    collector.register
   }
 
   def registerConsumers {
@@ -151,25 +149,6 @@ class TaskInstance(
     }
   }
 
-  def send {
-    if (collector.envelopes.size > 0) {
-      trace("Sending messages for taskName: %s, %s" format (taskName, 
collector.envelopes.size))
-
-      metrics.sends.inc
-      metrics.messagesSent.inc(collector.envelopes.size)
-
-      collector.envelopes.foreach(envelope => 
producerMultiplexer.send(metrics.source, envelope))
-
-      trace("Resetting collector for taskName: %s" format taskName)
-
-      collector.reset
-    } else {
-      trace("Skipping send for taskName %s because no messages were 
collected." format taskName)
-
-      metrics.sendsSkipped.inc
-    }
-  }
-
   def commit {
     trace("Flushing state stores for taskName: %s" format taskName)
 
@@ -179,7 +158,7 @@ class TaskInstance(
 
     trace("Flushing producers for taskName: %s" format taskName)
 
-    producerMultiplexer.flush(metrics.source)
+    collector.flush
 
     trace("Committing offset manager for taskName: %s" format taskName)
 
@@ -212,6 +191,5 @@ class TaskInstance(
 
   override def toString() = "TaskInstance for class %s and taskName %s." 
format (task.getClass.getName, taskName)
 
-  def toDetailedString() = "TaskInstance [taskName = %s, windowable=%s, 
closable=%s, collector_size=%s]" format (taskName, isWindowableTask, 
isClosableTask, collector.envelopes.size)
-
+  def toDetailedString() = "TaskInstance [taskName = %s, windowable=%s, 
closable=%s]" format (taskName, isWindowableTask, isClosableTask)
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/0d544aed/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala
 
b/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala
index aae3f87..9dc7051 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala
+++ 
b/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala
@@ -32,7 +32,7 @@ class TaskInstanceMetrics(
   val windows = newCounter("window-calls")
   val processes = newCounter("process-calls")
   val sends = newCounter("send-calls")
-  val sendsSkipped = newCounter("send-skipped")
+  val flushes = newCounter("flush-calls")
   val messagesSent = newCounter("messages-sent")
 
   def addOffsetGauge(systemStreamPartition: SystemStreamPartition, getValue: 
() => String) {

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/0d544aed/samza-core/src/main/scala/org/apache/samza/task/ReadableCollector.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/task/ReadableCollector.scala 
b/samza-core/src/main/scala/org/apache/samza/task/ReadableCollector.scala
deleted file mode 100644
index 444bf37..0000000
--- a/samza-core/src/main/scala/org/apache/samza/task/ReadableCollector.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.task
-
-import scala.collection.mutable
-
-import org.apache.samza.system.OutgoingMessageEnvelope
-
-/** An in-memory implementation of MessageCollector that stores all outgoing 
messages in a list */
-class ReadableCollector extends MessageCollector {
-  val envelopes = new mutable.ArrayBuffer[OutgoingMessageEnvelope]()
-
-  def send(envelope: OutgoingMessageEnvelope) {
-    envelopes += envelope
-  }
-
-  def getEnvelopes = envelopes
-
-  def reset() = envelopes.clear
-}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/0d544aed/samza-core/src/main/scala/org/apache/samza/task/TaskInstanceCollector.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/task/TaskInstanceCollector.scala 
b/samza-core/src/main/scala/org/apache/samza/task/TaskInstanceCollector.scala
new file mode 100644
index 0000000..ec6b6f4
--- /dev/null
+++ 
b/samza-core/src/main/scala/org/apache/samza/task/TaskInstanceCollector.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.task
+
+import org.apache.samza.system.OutgoingMessageEnvelope
+import org.apache.samza.system.SystemProducers
+import org.apache.samza.container.TaskInstanceMetrics
+import grizzled.slf4j.Logging
+
+/**
+ * TaskInstanceCollector is an implementation of MessageCollector that sends
+ * messages to the underlying SystemProducers class immediately. Since the
+ * SystemProducers object is typically shared between TaskInstances (in order
+ * to share connections to the underlying system), this class is necessary as
+ * a way to fill in the "source" for the underlying SystemProducers. If a
+ * StreamTask calls collector.send, the messages will be immediately given to
+ * the SystemProducers, which will in turn forward the outgoing message
+ * immediately to the underlying producer for the system. Note that, if the
+ * underlying system producer buffers messages, then using this collector will
+ * still not result in an immediate send, but calling flush on it should.
+ */
+class TaskInstanceCollector(
+  producerMultiplexer: SystemProducers,
+  metrics: TaskInstanceMetrics = new TaskInstanceMetrics) extends 
MessageCollector with Logging {
+
+  /**
+   * Register as a new source with SystemProducers. This allows this collector
+   * to send messages to the SystemProducers.
+   */
+  def register {
+    debug("Registering source: %s" format metrics.source)
+    producerMultiplexer.register(metrics.source)
+  }
+
+  /**
+   * Sends a message to the underlying SystemProducers.
+   *
+   * @param envelope An outgoing envelope that's to be sent to SystemProducers.
+   */
+  def send(envelope: OutgoingMessageEnvelope) {
+    trace("Sending message from source: %s, %s" format (metrics.source, 
envelope))
+    metrics.sends.inc
+    metrics.messagesSent.inc
+    producerMultiplexer.send(metrics.source, envelope)
+  }
+
+  /**
+   * Flushes the underlying SystemProducers.
+   */
+  def flush {
+    trace("Flushing messages from source: %s" format metrics.source)
+    metrics.flushes.inc
+    producerMultiplexer.flush(metrics.source)
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/0d544aed/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala 
b/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
index 86b7f31..ff425da 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
@@ -191,9 +191,9 @@ class TestRunLoop extends AssertionsForJUnit with 
MockitoSugar with ShouldMatche
       commitMs = 1L,
       clock = () => {
         now += 1L
-        // clock() is called 15 times totally in RunLoop
+        // clock() is called 13 times totally in RunLoop
         // stop the runLoop after one run
-        if (now == 15L) throw new StopRunLoop
+        if (now == 13L) throw new StopRunLoop
         now
       })
     intercept[StopRunLoop] { runLoop.run }
@@ -202,7 +202,6 @@ class TestRunLoop extends AssertionsForJUnit with 
MockitoSugar with ShouldMatche
     testMetrics.windowMs.getSnapshot.getAverage should equal(3L)
     testMetrics.processMs.getSnapshot.getAverage should equal(3L)
     testMetrics.commitMs.getSnapshot.getAverage should equal(3L)
-    testMetrics.sendMs.getSnapshot.getAverage should equal(1L)
 
     now = 0L
     intercept[StopRunLoop] { runLoop.run }
@@ -211,6 +210,5 @@ class TestRunLoop extends AssertionsForJUnit with 
MockitoSugar with ShouldMatche
     testMetrics.windowMs.getSnapshot.getSize should equal(2)
     testMetrics.processMs.getSnapshot.getSize should equal(2)
     testMetrics.commitMs.getSnapshot.getSize should equal(2)
-    testMetrics.sendMs.getSnapshot.getSize should equal(2)
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/0d544aed/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 
b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
index e3c7fe3..9fc6771 100644
--- 
a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
+++ 
b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
@@ -41,6 +41,7 @@ import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin
 import org.apache.samza.system.SystemStream
 import org.apache.samza.system.StreamMetadataCache
+import org.apache.samza.task.TaskInstanceCollector
 
 class TestSamzaContainer {
   @Test
@@ -86,13 +87,14 @@ class TestSamzaContainer {
     val producerMultiplexer = new SystemProducers(
       Map[String, SystemProducer](),
       new SerdeManager)
+    val collector = new TaskInstanceCollector(producerMultiplexer)
     val taskInstance: TaskInstance = new TaskInstance(
       task,
       taskName,
       config,
       new TaskInstanceMetrics,
-      consumerMultiplexer: SystemConsumers,
-      producerMultiplexer: SystemProducers)
+      consumerMultiplexer,
+      collector)
     val runLoop = new RunLoop(
       taskInstances = Map(taskName -> taskInstance),
       consumerMultiplexer = consumerMultiplexer,

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/0d544aed/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala 
b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
index 9d5ff13..be53373 100644
--- 
a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
+++ 
b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
@@ -40,6 +40,7 @@ import org.apache.samza.checkpoint.OffsetManager
 import org.apache.samza.system.SystemStreamMetadata
 import 
org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
 import scala.collection.JavaConversions._
+import org.apache.samza.task.TaskInstanceCollector
 
 class TestTaskInstance {
   @Test
@@ -62,13 +63,14 @@ class TestTaskInstance {
     val testSystemStreamMetadata = new 
SystemStreamMetadata(systemStream.getStream, Map(partition -> new 
SystemStreamPartitionMetadata("0", "1", "2")))
     val offsetManager = OffsetManager(Map(systemStream -> 
testSystemStreamMetadata), config)
     val taskName = new TaskName("taskName")
+    val collector = new TaskInstanceCollector(producerMultiplexer)
     val taskInstance: TaskInstance = new TaskInstance(
       task,
       taskName,
       config,
       new TaskInstanceMetrics,
       consumerMultiplexer,
-      producerMultiplexer,
+      collector,
       offsetManager)
     // Pretend we got a message with offset 2 and next offset 3.
     val coordinator = new ReadableCoordinator(taskName)

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/0d544aed/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
index 22c8f0c..3264cbd 100644
--- 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
+++ 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
@@ -26,13 +26,15 @@ import kafka.producer.Producer
 import org.apache.samza.system.SystemProducer
 import org.apache.samza.system.OutgoingMessageEnvelope
 import org.apache.samza.util.ExponentialSleepStrategy
+import org.apache.samza.util.TimerUtils
 
 class KafkaSystemProducer(
   systemName: String,
   batchSize: Int,
   retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy,
   getProducer: () => Producer[Object, Object],
-  metrics: KafkaSystemProducerMetrics) extends SystemProducer with Logging {
+  metrics: KafkaSystemProducerMetrics,
+  val clock: () => Long = () => System.currentTimeMillis) extends 
SystemProducer with Logging with TimerUtils {
 
   var sourceBuffers = Map[String, ArrayBuffer[KeyedMessage[Object, Object]]]()
   var producer: Producer[Object, Object] = null
@@ -69,36 +71,37 @@ class KafkaSystemProducer(
   }
 
   def flush(source: String) {
-    val buffer = sourceBuffers(source)
-    trace("Flushing buffer with size: %s." format buffer.size)
-    metrics.flushes.inc
-
-    retryBackoff.run(
-      loop => {
-        if (producer == null) {
-          info("Creating a new producer for system %s." format systemName)
-          producer = getProducer()
-          debug("Created a new producer for system %s." format systemName)
-        }
-
-        producer.send(buffer: _*)
-        loop.done
-        metrics.flushSizes.inc(buffer.size)
-      },
-
-      (exception, loop) => {
-        warn("Triggering a reconnect for %s because connection failed: %s" 
format (systemName, exception))
-        debug("Exception detail: ", exception)
-        metrics.reconnects.inc
-
-        if (producer != null) {
-          producer.close
-          producer = null
-        }
-      }
-    )
-
-    buffer.clear
-    trace("Flushed buffer.")
+    updateTimer(metrics.flushMs) {
+      val buffer = sourceBuffers(source)
+      trace("Flushing buffer with size: %s." format buffer.size)
+      metrics.flushes.inc
+
+      retryBackoff.run(
+        loop => {
+          if (producer == null) {
+            info("Creating a new producer for system %s." format systemName)
+            producer = getProducer()
+            debug("Created a new producer for system %s." format systemName)
+          }
+
+          producer.send(buffer: _*)
+          loop.done
+          metrics.flushSizes.inc(buffer.size)
+        },
+
+        (exception, loop) => {
+          warn("Triggering a reconnect for %s because connection failed: %s" 
format (systemName, exception))
+          debug("Exception detail: ", exception)
+          metrics.reconnects.inc
+
+          if (producer != null) {
+            producer.close
+            producer = null
+          }
+        })
+
+      buffer.clear
+      trace("Flushed buffer.")
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/0d544aed/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducerMetrics.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducerMetrics.scala
 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducerMetrics.scala
index ad39157..7e1383f 100644
--- 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducerMetrics.scala
+++ 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducerMetrics.scala
@@ -32,6 +32,7 @@ class KafkaSystemProducerMetrics(val systemName: String = 
"unknown", val registr
   val sends = newCounter("producer-sends")
   val flushes = newCounter("flushes")
   val flushSizes = newCounter("flush-sizes")
+  val flushMs = newTimer("flush-ms")
 
   def setBufferSize(source: String, getValue: () => Int) {
     newGauge("%s-producer-buffer-size" format source, getValue)

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/0d544aed/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
----------------------------------------------------------------------
diff --git 
a/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
 
b/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
index 7d0b8db..7f8663d 100644
--- 
a/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
+++ 
b/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
@@ -22,17 +22,20 @@ package org.apache.samza.test.performance
 import grizzled.slf4j.Logging
 import org.apache.samza.config.Config
 import org.apache.samza.config.StorageConfig._
-import org.apache.samza.container.{TaskName, SamzaContainerContext}
+import org.apache.samza.container.{ TaskName, SamzaContainerContext }
 import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.storage.kv.KeyValueStore
 import org.apache.samza.storage.kv.KeyValueStorageEngine
 import org.apache.samza.storage.StorageEngineFactory
-import org.apache.samza.task.ReadableCollector
 import org.apache.samza.util.CommandLine
 import org.apache.samza.util.Util
 import org.apache.samza.serializers.ByteSerde
 import org.apache.samza.Partition
 import org.apache.samza.SamzaException
+import org.apache.samza.task.TaskInstanceCollector
+import org.apache.samza.system.SystemProducers
+import org.apache.samza.system.SystemProducer
+import org.apache.samza.serializers.SerdeManager
 import java.io.File
 import java.util.UUID
 
@@ -97,6 +100,10 @@ object TestKeyValuePerformance extends Logging {
         (storeName, Util.getObj[StorageEngineFactory[Array[Byte], 
Array[Byte]]](storageFactoryClassName))
       }).toMap
 
+    val producerMultiplexer = new SystemProducers(
+      Map[String, SystemProducer](),
+      new SerdeManager)
+
     for ((storeName, storageEngine) <- storageEngineFactories) {
       val output = new File("/tmp/" + UUID.randomUUID)
 
@@ -107,7 +114,7 @@ object TestKeyValuePerformance extends Logging {
         output,
         serde,
         serde,
-        new ReadableCollector,
+        new TaskInstanceCollector(producerMultiplexer),
         new MetricsRegistryMap,
         null,
         new SamzaContainerContext("test", config, taskNames))

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/0d544aed/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala
----------------------------------------------------------------------
diff --git 
a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala
 
b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala
index 851aae6..28ed2c8 100644
--- 
a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala
+++ 
b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala
@@ -33,7 +33,6 @@ import grizzled.slf4j.Logging
 import org.apache.samza.SamzaException
 import java.util.Timer
 import java.util.TimerTask
-import org.apache.samza.task.ReadableCollector
 import org.apache.samza.metrics.MetricsHelper
 
 object SamzaAppMasterMetrics {

Reply via email to