Repository: incubator-beam Updated Branches: refs/heads/release-0.4.0-incubating b2780881a -> 10bb4767a
Revert "Allow stateful DoFn in DataflowRunner" This reverts commit 42bb15d2df28b99b6788010450f41f2932095771. The Dataflow service has introduced a bug that was masked because various tests had been disabled. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c1af44fa Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c1af44fa Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c1af44fa Branch: refs/heads/release-0.4.0-incubating Commit: c1af44fa27633fd2a9592a13579415f6b974cfe6 Parents: f78d960 Author: Kenneth Knowles <k...@google.com> Authored: Tue Dec 13 16:36:42 2016 -0800 Committer: Kenneth Knowles <k...@google.com> Committed: Tue Dec 13 16:57:26 2016 -0800 ---------------------------------------------------------------------- .../dataflow/DataflowPipelineTranslator.java | 22 +++++++++++++------- 1 file changed, 14 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c1af44fa/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java index 8048df9..a56690c 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java @@ -77,7 +77,6 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.display.DisplayData; import
org.apache.beam.sdk.transforms.display.HasDisplayData; -import org.apache.beam.sdk.transforms.reflect.DoFnSignature; import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; import org.apache.beam.sdk.transforms.windowing.DefaultTrigger; import org.apache.beam.sdk.transforms.windowing.Window; @@ -956,6 +955,7 @@ public class DataflowPipelineTranslator { private <InputT, OutputT> void translateMultiHelper( ParDo.BoundMulti<InputT, OutputT> transform, TranslationContext context) { + rejectStatefulDoFn(transform.getNewFn()); context.addStep(transform, "ParallelDo"); translateInputs(context.getInput(transform), transform.getSideInputs(), context); @@ -985,6 +985,7 @@ public class DataflowPipelineTranslator { private <InputT, OutputT> void translateSingleHelper( ParDo.Bound<InputT, OutputT> transform, TranslationContext context) { + rejectStatefulDoFn(transform.getNewFn()); context.addStep(transform, "ParallelDo"); translateInputs(context.getInput(transform), transform.getSideInputs(), context); @@ -1032,6 +1033,18 @@ public class DataflowPipelineTranslator { registerTransformTranslator(Read.Bounded.class, new ReadTranslator()); } + private static void rejectStatefulDoFn(DoFn<?, ?> doFn) { + if (DoFnSignatures.getSignature(doFn.getClass()).isStateful()) { + throw new UnsupportedOperationException( + String.format( + "Found %s annotations on %s, but %s cannot yet be used with state in the %s.", + DoFn.StateId.class.getSimpleName(), + doFn.getClass().getName(), + DoFn.class.getSimpleName(), + DataflowRunner.class.getSimpleName())); + } + } + private static void translateInputs( PCollection<?> input, List<PCollectionView<?>> sideInputs, @@ -1063,9 +1076,6 @@ public class DataflowPipelineTranslator { TranslationContext context, long mainOutput, Map<Long, TupleTag<?>> outputMap) { - - DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass()); - context.addInput(PropertyNames.USER_FN, fn.getClass().getName()); context.addInput( PropertyNames.SERIALIZED_FN, @@ 
-1073,10 +1083,6 @@ public class DataflowPipelineTranslator { serializeToByteArray( DoFnInfo.forFn( fn, windowingStrategy, sideInputs, inputCoder, mainOutput, outputMap)))); - - if (signature.isStateful()) { - context.addInput(PropertyNames.USES_KEYED_STATE, "true"); - } } private static BiMap<Long, TupleTag<?>> translateOutputs(