Repository: incubator-beam
Updated Branches:
  refs/heads/release-0.4.0-incubating b2780881a -> 10bb4767a


Revert "Allow stateful DoFn in DataflowRunner"

This reverts commit 42bb15d2df28b99b6788010450f41f2932095771.

The Dataflow service has introduced a bug that was masked by various
test disabling.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c1af44fa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c1af44fa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c1af44fa

Branch: refs/heads/release-0.4.0-incubating
Commit: c1af44fa27633fd2a9592a13579415f6b974cfe6
Parents: f78d960
Author: Kenneth Knowles <k...@google.com>
Authored: Tue Dec 13 16:36:42 2016 -0800
Committer: Kenneth Knowles <k...@google.com>
Committed: Tue Dec 13 16:57:26 2016 -0800

----------------------------------------------------------------------
 .../dataflow/DataflowPipelineTranslator.java    | 22 +++++++++++++-------
 1 file changed, 14 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c1af44fa/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index 8048df9..a56690c 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -77,7 +77,6 @@ import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.HasDisplayData;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
 import org.apache.beam.sdk.transforms.windowing.Window;
@@ -956,6 +955,7 @@ public class DataflowPipelineTranslator {
           private <InputT, OutputT> void translateMultiHelper(
               ParDo.BoundMulti<InputT, OutputT> transform,
               TranslationContext context) {
+            rejectStatefulDoFn(transform.getNewFn());
 
             context.addStep(transform, "ParallelDo");
             translateInputs(context.getInput(transform), 
transform.getSideInputs(), context);
@@ -985,6 +985,7 @@ public class DataflowPipelineTranslator {
           private <InputT, OutputT> void translateSingleHelper(
               ParDo.Bound<InputT, OutputT> transform,
               TranslationContext context) {
+            rejectStatefulDoFn(transform.getNewFn());
 
             context.addStep(transform, "ParallelDo");
             translateInputs(context.getInput(transform), 
transform.getSideInputs(), context);
@@ -1032,6 +1033,18 @@ public class DataflowPipelineTranslator {
     registerTransformTranslator(Read.Bounded.class, new ReadTranslator());
   }
 
+  private static void rejectStatefulDoFn(DoFn<?, ?> doFn) {
+    if (DoFnSignatures.getSignature(doFn.getClass()).isStateful()) {
+    throw new UnsupportedOperationException(
+        String.format(
+            "Found %s annotations on %s, but %s cannot yet be used with state 
in the %s.",
+            DoFn.StateId.class.getSimpleName(),
+            doFn.getClass().getName(),
+            DoFn.class.getSimpleName(),
+            DataflowRunner.class.getSimpleName()));
+    }
+  }
+
   private static void translateInputs(
       PCollection<?> input,
       List<PCollectionView<?>> sideInputs,
@@ -1063,9 +1076,6 @@ public class DataflowPipelineTranslator {
       TranslationContext context,
       long mainOutput,
       Map<Long, TupleTag<?>> outputMap) {
-
-    DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass());
-
     context.addInput(PropertyNames.USER_FN, fn.getClass().getName());
     context.addInput(
         PropertyNames.SERIALIZED_FN,
@@ -1073,10 +1083,6 @@ public class DataflowPipelineTranslator {
             serializeToByteArray(
                 DoFnInfo.forFn(
                     fn, windowingStrategy, sideInputs, inputCoder, mainOutput, 
outputMap))));
-
-    if (signature.isStateful()) {
-      context.addInput(PropertyNames.USES_KEYED_STATE, "true");
-    }
   }
 
   private static BiMap<Long, TupleTag<?>> translateOutputs(

Reply via email to