Actually propagate and commit state in direct runner

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/55176c38
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/55176c38
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/55176c38

Branch: refs/heads/python-sdk
Commit: 55176c385cc802be42b5467fbb2dcc9a1c7467ea
Parents: 4fb16e8
Author: Kenneth Knowles <k...@google.com>
Authored: Tue Dec 20 15:59:45 2016 -0800
Committer: Kenneth Knowles <k...@google.com>
Committed: Wed Dec 21 13:11:22 2016 -0800

----------------------------------------------------------------------
 .../apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java   | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/55176c38/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
index 5f9d8f4..003df0f 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
@@ -233,6 +233,7 @@ final class StatefulParDoEvaluatorFactory<K, InputT, 
OutputT> implements Transfo
           StepTransformResult.<KeyedWorkItem<K, KV<K, InputT>>>withHold(
                   delegateResult.getTransform(), 
delegateResult.getWatermarkHold())
               .withTimerUpdate(delegateResult.getTimerUpdate())
+              .withState(delegateResult.getState())
               .withAggregatorChanges(delegateResult.getAggregatorChanges())
               .withMetricUpdates(delegateResult.getLogicalMetricUpdates())
               
.addOutput(Lists.newArrayList(delegateResult.getOutputBundles()));

Reply via email to