Repository: incubator-samza
Updated Branches:
  refs/heads/master 8acef467f -> 3964ce735


SAMZA-251: add metrics for choose/process/window/commit/send time


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/3964ce73
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/3964ce73
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/3964ce73

Branch: refs/heads/master
Commit: 3964ce73530fac3d658b088e1f005054bc7214d1
Parents: 8acef46
Author: Yan Fang <[email protected]>
Authored: Thu Aug 7 23:01:17 2014 -0700
Committer: Yan Fang <[email protected]>
Committed: Thu Aug 7 23:01:17 2014 -0700

----------------------------------------------------------------------
 .../org/apache/samza/container/RunLoop.scala    | 97 +++++++++++---------
 .../samza/container/SamzaContainerMetrics.scala |  5 +
 .../org/apache/samza/util/TimerUtils.scala      | 41 +++++++++
 .../apache/samza/container/TestRunLoop.scala    | 44 ++++++++-
 4 files changed, 139 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/3964ce73/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala b/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
index 7fb4763..6851731 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
@@ -20,8 +20,9 @@
 package org.apache.samza.container
 
 import grizzled.slf4j.Logging
-import org.apache.samza.system.{SystemStreamPartition, SystemConsumers}
+import org.apache.samza.system.{ SystemStreamPartition, SystemConsumers }
 import org.apache.samza.task.ReadableCoordinator
+import org.apache.samza.util.TimerUtils
 
 /**
  * Each {@link SamzaContainer} uses a single-threaded execution model: 
activities for
@@ -38,7 +39,7 @@ class RunLoop(
   val metrics: SamzaContainerMetrics,
   val windowMs: Long = -1,
   val commitMs: Long = 60000,
-  val clock: () => Long = { System.currentTimeMillis }) extends Runnable with 
Logging {
+  val clock: () => Long = { System.currentTimeMillis }) extends Runnable with 
TimerUtils with Logging {
 
   private var lastWindowMs = 0L
   private var lastCommitMs = 0L
@@ -52,7 +53,7 @@ class RunLoop(
     // We could just pass in the SystemStreamPartitionMap during construction, 
but it's safer and cleaner to derive the information directly
     def getSystemStreamPartitionToTaskInstance(taskInstance: TaskInstance) = 
taskInstance.systemStreamPartitions.map(_ -> taskInstance).toMap
 
-    taskInstances.values.map{ getSystemStreamPartitionToTaskInstance 
}.flatten.toMap
+    taskInstances.values.map { getSystemStreamPartitionToTaskInstance 
}.flatten.toMap
   }
 
   /**
@@ -76,22 +77,27 @@ class RunLoop(
     trace("Attempting to choose a message to process.")
     metrics.processes.inc
 
-    val envelope = consumerMultiplexer.choose
+    updateTimer(metrics.processMs) {
 
-    if (envelope != null) {
-      val ssp = envelope.getSystemStreamPartition
+      val envelope = updateTimer(metrics.chooseMs) {
+        consumerMultiplexer.choose
+      }
+
+      if (envelope != null) {
+        val ssp = envelope.getSystemStreamPartition
 
-      trace("Processing incoming message envelope for SSP %s." format ssp)
-      metrics.envelopes.inc
+        trace("Processing incoming message envelope for SSP %s." format ssp)
+        metrics.envelopes.inc
 
-      val taskInstance = systemStreamPartitionToTaskInstance(ssp)
+        val taskInstance = systemStreamPartitionToTaskInstance(ssp)
 
-      val coordinator = new ReadableCoordinator(taskInstance.taskName)
-      taskInstance.process(envelope, coordinator)
-      checkCoordinator(coordinator)
-    } else {
-      trace("No incoming message envelope was available.")
-      metrics.nullEnvelopes.inc
+        val coordinator = new ReadableCoordinator(taskInstance.taskName)
+        taskInstance.process(envelope, coordinator)
+        checkCoordinator(coordinator)
+      } else {
+        trace("No incoming message envelope was available.")
+        metrics.nullEnvelopes.inc
+      }
     }
   }
 
@@ -99,52 +105,56 @@ class RunLoop(
    * Invokes WindowableTask.window on all tasks if it's time to do so.
    */
   private def window {
-    if (windowMs >= 0 && lastWindowMs + windowMs < clock()) {
-      trace("Windowing stream tasks.")
-      lastWindowMs = clock()
-      metrics.windows.inc
-
-      taskInstances.foreach { case (taskName, task) =>
-        val coordinator = new ReadableCoordinator(taskName)
-        task.window(coordinator)
-        checkCoordinator(coordinator)
+    updateTimer(metrics.windowMs) {
+      if (windowMs >= 0 && lastWindowMs + windowMs < clock()) {
+        trace("Windowing stream tasks.")
+        lastWindowMs = clock()
+        metrics.windows.inc
+
+        taskInstances.foreach {
+          case (taskName, task) =>
+            val coordinator = new ReadableCoordinator(taskName)
+            task.window(coordinator)
+            checkCoordinator(coordinator)
+        }
       }
     }
   }
 
-
   /**
    * If task instances published any messages to output streams, this flushes
    * them to the underlying systems.
    */
   private def send {
-    trace("Triggering send in task instances.")
-    metrics.sends.inc
-    taskInstances.values.foreach(_.send)
+    updateTimer(metrics.sendMs) {
+      trace("Triggering send in task instances.")
+      metrics.sends.inc
+      taskInstances.values.foreach(_.send)
+    }
   }
 
-
   /**
    * Commits task state as a a checkpoint, if necessary.
    */
   private def commit {
-    if (commitMs >= 0 && lastCommitMs + commitMs < clock()) {
-      trace("Committing task instances because the commit interval has 
elapsed.")
-      lastCommitMs = clock()
-      metrics.commits.inc
-      taskInstances.values.foreach(_.commit)
-    } else if (!taskCommitRequests.isEmpty) {
-      trace("Committing due to explicit commit request.")
-      metrics.commits.inc
-      taskCommitRequests.foreach(taskName => {
-        taskInstances(taskName).commit
-      })
-    }
+    updateTimer(metrics.commitMs) {
+      if (commitMs >= 0 && lastCommitMs + commitMs < clock()) {
+        trace("Committing task instances because the commit interval has 
elapsed.")
+        lastCommitMs = clock()
+        metrics.commits.inc
+        taskInstances.values.foreach(_.commit)
+      } else if (!taskCommitRequests.isEmpty) {
+        trace("Committing due to explicit commit request.")
+        metrics.commits.inc
+        taskCommitRequests.foreach(taskName => {
+          taskInstances(taskName).commit
+        })
+      }
 
-    taskCommitRequests = Set()
+      taskCommitRequests = Set()
+    }
   }
 
-
   /**
    * A new TaskCoordinator object is passed to a task on every call to 
StreamTask.process
    * and WindowableTask.window. This method checks whether the task requested 
that we
@@ -172,5 +182,4 @@ class RunLoop(
       shutdownNow = true
     }
   }
-
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/3964ce73/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
index bcb3fa3..44d5dff 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
@@ -33,4 +33,9 @@ class SamzaContainerMetrics(
   val sends = newCounter("send-calls")
   val envelopes = newCounter("process-envelopes")
   val nullEnvelopes = newCounter("process-null-envelopes")
+  val chooseMs = newTimer("choose-ms")
+  val windowMs = newTimer("window-ms")
+  val processMs = newTimer("process-ms")
+  val commitMs = newTimer("commit-ms")
+  val sendMs = newTimer("send-ms")
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/3964ce73/samza-core/src/main/scala/org/apache/samza/util/TimerUtils.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/TimerUtils.scala b/samza-core/src/main/scala/org/apache/samza/util/TimerUtils.scala
new file mode 100644
index 0000000..6fedb62
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/util/TimerUtils.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.util
+
+import org.apache.samza.metrics.Timer
+
+/**
+ * a helper class to facilitate update {@link org.apache.samza.metrics.Timer} 
metric
+ */
+trait TimerUtils {
+  val clock: () => Long
+
+  /**
+   * A helper method to update the {@link org.apache.samza.metrics.Timer} 
metric.
+   * It accepts a {@link org.apache.samza.metrics.Timer} instance and a code 
block.
+   * It updates the Timer instance with the duration of running code block.
+   */
+  def updateTimer[T](timer: Timer)(runCodeBlock: => T): T = {
+    val startingTime = clock()
+    val returnValue = runCodeBlock
+    timer.update(clock() - startingTime)
+    returnValue
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/3964ce73/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala b/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
index d4ceffc..86b7f31 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
@@ -28,7 +28,7 @@ import org.scalatest.junit.AssertionsForJUnit
 import org.scalatest.matchers.ShouldMatchers
 import org.scalatest.mock.MockitoSugar
 import org.apache.samza.Partition
-import org.apache.samza.system.{IncomingMessageEnvelope, SystemConsumers, 
SystemStreamPartition}
+import org.apache.samza.system.{ IncomingMessageEnvelope, SystemConsumers, 
SystemStreamPartition }
 import org.apache.samza.task.ReadableCoordinator
 import org.apache.samza.task.TaskCoordinator.RequestScope
 
@@ -70,7 +70,6 @@ class TestRunLoop extends AssertionsForJUnit with MockitoSugar with ShouldMatche
     runLoop.metrics.nullEnvelopes.getCount should equal(0L)
   }
 
-
   @Test
   def testNullMessageFromChooser {
     val consumers = mock[SystemConsumers]
@@ -171,10 +170,47 @@ class TestRunLoop extends AssertionsForJUnit with MockitoSugar with ShouldMatche
   def stubProcess(taskInstance: TaskInstance, process: 
(IncomingMessageEnvelope, ReadableCoordinator) => Unit) {
     when(taskInstance.process(anyObject, anyObject)).thenAnswer(new 
Answer[Unit]() {
       override def answer(invocation: InvocationOnMock) {
-        val envelope    = 
invocation.getArguments()(0).asInstanceOf[IncomingMessageEnvelope]
+        val envelope = 
invocation.getArguments()(0).asInstanceOf[IncomingMessageEnvelope]
         val coordinator = 
invocation.getArguments()(1).asInstanceOf[ReadableCoordinator]
         process(envelope, coordinator)
       }
     })
   }
-}
+
+  @Test
+  def testUpdateTimerCorrectly {
+    var now = 0L
+    val consumers = mock[SystemConsumers]
+    when(consumers.choose).thenReturn(envelope0)
+    val testMetrics = new SamzaContainerMetrics
+    val runLoop = new RunLoop(
+      taskInstances = getMockTaskInstances,
+      consumerMultiplexer = consumers,
+      metrics = testMetrics,
+      windowMs = 1L,
+      commitMs = 1L,
+      clock = () => {
+        now += 1L
+        // clock() is called 15 times totally in RunLoop
+        // stop the runLoop after one run
+        if (now == 15L) throw new StopRunLoop
+        now
+      })
+    intercept[StopRunLoop] { runLoop.run }
+
+    testMetrics.chooseMs.getSnapshot.getAverage should equal(1L)
+    testMetrics.windowMs.getSnapshot.getAverage should equal(3L)
+    testMetrics.processMs.getSnapshot.getAverage should equal(3L)
+    testMetrics.commitMs.getSnapshot.getAverage should equal(3L)
+    testMetrics.sendMs.getSnapshot.getAverage should equal(1L)
+
+    now = 0L
+    intercept[StopRunLoop] { runLoop.run }
+    // after two loops
+    testMetrics.chooseMs.getSnapshot.getSize should equal(2)
+    testMetrics.windowMs.getSnapshot.getSize should equal(2)
+    testMetrics.processMs.getSnapshot.getSize should equal(2)
+    testMetrics.commitMs.getSnapshot.getSize should equal(2)
+    testMetrics.sendMs.getSnapshot.getSize should equal(2)
+  }
+}
\ No newline at end of file

Reply via email to