Actually propagate and commit state in direct runner
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/55176c38
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/55176c38
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/55176c38

Branch: refs/heads/python-sdk
Commit: 55176c385cc802be42b5467fbb2dcc9a1c7467ea
Parents: 4fb16e8
Author: Kenneth Knowles <k...@google.com>
Authored: Tue Dec 20 15:59:45 2016 -0800
Committer: Kenneth Knowles <k...@google.com>
Committed: Wed Dec 21 13:11:22 2016 -0800

----------------------------------------------------------------------
 .../apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/55176c38/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
index 5f9d8f4..003df0f 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
@@ -233,6 +233,7 @@ final class StatefulParDoEvaluatorFactory<K, InputT, OutputT> implements Transfo
       StepTransformResult.<KeyedWorkItem<K, KV<K, InputT>>>withHold(
               delegateResult.getTransform(), delegateResult.getWatermarkHold())
           .withTimerUpdate(delegateResult.getTimerUpdate())
+          .withState(delegateResult.getState())
           .withAggregatorChanges(delegateResult.getAggregatorChanges())
           .withMetricUpdates(delegateResult.getLogicalMetricUpdates())
           .addOutput(Lists.newArrayList(delegateResult.getOutputBundles()));