Move watermark updates to the end of handleResult

This ensures that any state modifications are visible before watermark
advancement permits additional progress.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0cb3832e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0cb3832e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0cb3832e

Branch: refs/heads/apex-runner
Commit: 0cb3832eb21ffeebc33b433d57cb814957788037
Parents: 55d9519
Author: Thomas Groh <tg...@google.com>
Authored: Fri Oct 21 10:13:06 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Thu Nov 3 13:45:34 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/runners/direct/EvaluationContext.java   | 12 +++++++-----
 1 file changed, 7 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0cb3832e/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
index e5a30d4..965e77d 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
@@ -179,11 +179,6 @@ class EvaluationContext {
             : completedBundle.withElements((Iterable) result.getUnprocessedElements()),
         committedBundles,
         outputTypes);
-    watermarkManager.updateWatermarks(
-        completedBundle,
-        result.getTimerUpdate().withCompletedTimers(completedTimers),
-        committedResult,
-        result.getWatermarkHold());
     // Commit aggregator changes
     if (result.getAggregatorChanges() != null) {
       result.getAggregatorChanges().commit();
@@ -201,6 +196,13 @@ class EvaluationContext {
         applicationStateInternals.remove(stepAndKey);
       }
     }
+    // Watermarks are updated last to ensure visibility of any global state before progress is
+    // permitted
+    watermarkManager.updateWatermarks(
+        completedBundle,
+        result.getTimerUpdate().withCompletedTimers(completedTimers),
+        committedResult,
+        result.getWatermarkHold());
     return committedResult;
   }
 

Reply via email to