Repository: incubator-samza Updated Branches: refs/heads/master a79681327 -> cca93bd2d
SAMAZ-258; make task.window.ms and task.commit.ms accurate Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/cca93bd2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/cca93bd2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/cca93bd2 Branch: refs/heads/master Commit: cca93bd2d7ad5e8ea57867b919008ef406704b95 Parents: a796813 Author: Chris Riccomini <[email protected]> Authored: Mon May 5 12:20:17 2014 -0700 Committer: Chris Riccomini <[email protected]> Committed: Mon May 5 12:20:17 2014 -0700 ---------------------------------------------------------------------- .../main/scala/org/apache/samza/container/TaskInstance.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/cca93bd2/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala index c4b135c..049fd1f 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala @@ -161,8 +161,8 @@ class TaskInstance( metrics.windows.inc - task.asInstanceOf[WindowableTask].window(collector, coordinator) lastWindowMs = clock() + task.asInstanceOf[WindowableTask].window(collector, coordinator) trace("Assigned last window time for partition: %s, %s" format (partition, lastWindowMs)) } else { @@ -197,6 +197,8 @@ class TaskInstance( metrics.commits.inc + lastCommitMs = clock() + storageManager.flush trace("Flushing producers for partition: %s" format partition) @@ -206,8 +208,6 @@ class TaskInstance( trace("Committing offset manager for partition: %s" format partition) offsetManager.checkpoint(partition) - - lastCommitMs = clock() } else { trace("Skipping commit for partition: %s" format partition)
