Repository: incubator-gearpump
Updated Branches:
  refs/heads/master d6cc5da7b -> fb0f1ef31


[GEARPUMP-271] Don't count self messages in receiveThroughput

Author: manuzhang <[email protected]>

Closes #147 from manuzhang/source.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/fb0f1ef3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/fb0f1ef3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/fb0f1ef3

Branch: refs/heads/master
Commit: fb0f1ef312e39fde7b987be9a55d2e58115e53e5
Parents: d6cc5da
Author: manuzhang <[email protected]>
Authored: Tue Feb 14 06:10:39 2017 +0800
Committer: manuzhang <[email protected]>
Committed: Tue Feb 14 06:10:46 2017 +0800

----------------------------------------------------------------------
 .../apache/gearpump/streaming/task/TaskActor.scala | 17 ++++++++++-------
 1 file changed, 10 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fb0f1ef3/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
index c814fa5..14c2b59 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
@@ -147,11 +147,10 @@ class TaskActor(
     subscriptions.forall(_._2.allowSendingMoreMessages())
   }
 
-  private def doHandleMessage(): Unit = {
+  private def doHandleMessage(): Int = {
     var done = false
 
     var count = 0
-    val start = System.currentTimeMillis()
 
     while (allowSendingMoreMessages() && !done) {
       val msg = queue.poll()
@@ -171,10 +170,7 @@ class TaskActor(
       }
     }
 
-    receiveThroughput.mark(count)
-    if (count > 0) {
-      processTime.update((System.currentTimeMillis() - start) / count)
-    }
+    count
   }
 
   private def onStartClock(): Unit = {
@@ -296,7 +292,14 @@ class TaskActor(
     messageAfterCheck match {
       case Some(m) =>
         queue.add(m)
-        doHandleMessage()
+        val start = System.currentTimeMillis()
+        val count = doHandleMessage()
+        if (!self.eq(sender)) {
+          receiveThroughput.mark(count)
+        }
+        if (count > 0) {
+          processTime.update((System.currentTimeMillis() - start) / count)
+        }
       case None =>
       // TODO: Indicate the error and avoid the LOG flood
       // LOG.error(s"Task $taskId drop message $msg")

Reply via email to