Move watermark updates to the end of handleResult. This ensures that any state modifications are visible before watermark advancement permits additional progress.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0cb3832e Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0cb3832e Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0cb3832e Branch: refs/heads/apex-runner Commit: 0cb3832eb21ffeebc33b433d57cb814957788037 Parents: 55d9519 Author: Thomas Groh <tg...@google.com> Authored: Fri Oct 21 10:13:06 2016 -0700 Committer: Thomas Groh <tg...@google.com> Committed: Thu Nov 3 13:45:34 2016 -0700 ---------------------------------------------------------------------- .../apache/beam/runners/direct/EvaluationContext.java | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0cb3832e/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java index e5a30d4..965e77d 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java @@ -179,11 +179,6 @@ class EvaluationContext { : completedBundle.withElements((Iterable) result.getUnprocessedElements()), committedBundles, outputTypes); - watermarkManager.updateWatermarks( - completedBundle, - result.getTimerUpdate().withCompletedTimers(completedTimers), - committedResult, - result.getWatermarkHold()); // Commit aggregator changes if (result.getAggregatorChanges() != null) { result.getAggregatorChanges().commit(); @@ -201,6 +196,13 @@ class EvaluationContext { applicationStateInternals.remove(stepAndKey); } } + // 
Watermarks are updated last to ensure visibility of any global state before progress is + // permitted + watermarkManager.updateWatermarks( + completedBundle, + result.getTimerUpdate().withCompletedTimers(completedTimers), + committedResult, + result.getWatermarkHold()); return committedResult; }