Repository: incubator-samza
Updated Branches:
  refs/heads/0.7.0 5b81c0402 -> 477ad024e


SAMZA-258; make task.window.ms and task.commit.ms accurate


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/477ad024
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/477ad024
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/477ad024

Branch: refs/heads/0.7.0
Commit: 477ad024e979e4b64468943645bb3c0909fa0048
Parents: 5b81c04
Author: Chris Riccomini <[email protected]>
Authored: Mon May 5 12:20:17 2014 -0700
Committer: Chris Riccomini <[email protected]>
Committed: Mon May 5 12:22:34 2014 -0700

----------------------------------------------------------------------
 .../main/scala/org/apache/samza/container/TaskInstance.scala   | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/477ad024/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index c4b135c..049fd1f 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -161,8 +161,8 @@ class TaskInstance(
 
       metrics.windows.inc
 
-      task.asInstanceOf[WindowableTask].window(collector, coordinator)
       lastWindowMs = clock()
+      task.asInstanceOf[WindowableTask].window(collector, coordinator)
 
       trace("Assigned last window time for partition: %s, %s" format (partition, lastWindowMs))
     } else {
@@ -197,6 +197,8 @@ class TaskInstance(
 
       metrics.commits.inc
 
+      lastCommitMs = clock()
+
       storageManager.flush
 
       trace("Flushing producers for partition: %s" format partition)
@@ -206,8 +208,6 @@ class TaskInstance(
       trace("Committing offset manager for partition: %s" format partition)
 
       offsetManager.checkpoint(partition)
-
-      lastCommitMs = clock()
     } else {
       trace("Skipping commit for partition: %s" format partition)
 

Reply via email to