Reject timers for ParDo in FlinkRunner
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/69e0ea25 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/69e0ea25 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/69e0ea25 Branch: refs/heads/master Commit: 69e0ea25f24597b84c93137dd94e2f25a9b88a15 Parents: 18db3ac Author: Kenneth Knowles <k...@google.com> Authored: Wed Dec 7 20:34:59 2016 -0800 Committer: Kenneth Knowles <k...@google.com> Committed: Thu Dec 8 09:53:08 2016 -0800 ---------------------------------------------------------------------- .../FlinkBatchTransformTranslators.java | 46 ++++++++++++-------- .../FlinkStreamingTransformTranslators.java | 45 +++++++++++-------- 2 files changed, 54 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69e0ea25/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java index 474d4e3..9ac907f 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java @@ -56,6 +56,7 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.join.RawUnionValue; import org.apache.beam.sdk.transforms.join.UnionCoder; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature; import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; @@ -483,6 +484,30 @@ class FlinkBatchTransformTranslators { } } + private static void rejectStateAndTimers(DoFn<?, ?> doFn) { + DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass()); + + if (signature.stateDeclarations().size() > 0) { + throw new UnsupportedOperationException( + String.format( + "Found %s annotations on %s, but %s cannot yet be used with state in the %s.", + DoFn.StateId.class.getSimpleName(), + doFn.getClass().getName(), + DoFn.class.getSimpleName(), + FlinkRunner.class.getSimpleName())); + } + + if (signature.timerDeclarations().size() > 0) { + throw new UnsupportedOperationException( + String.format( + "Found %s annotations on %s, but %s cannot yet be used with timers in the %s.", + DoFn.TimerId.class.getSimpleName(), + doFn.getClass().getName(), + DoFn.class.getSimpleName(), + FlinkRunner.class.getSimpleName())); + } + } + private static class ParDoBoundTranslatorBatch<InputT, OutputT> implements FlinkBatchPipelineTranslator.BatchTransformTranslator< ParDo.Bound<InputT, OutputT>> { @@ -493,15 +518,7 @@ class FlinkBatchTransformTranslators { FlinkBatchTranslationContext context) { DoFn<InputT, OutputT> doFn = transform.getNewFn(); - if (DoFnSignatures.getSignature(doFn.getClass()).stateDeclarations().size() > 0) { - throw new UnsupportedOperationException( - String.format( - "Found %s annotations on %s, but %s cannot yet be used with state in the %s.", - DoFn.StateId.class.getSimpleName(), - doFn.getClass().getName(), - DoFn.class.getSimpleName(), - FlinkRunner.class.getSimpleName())); - } + rejectStateAndTimers(doFn); DataSet<WindowedValue<InputT>> inputDataSet = context.getInputDataSet(context.getInput(transform)); @@ -549,16 +566,7 @@ class FlinkBatchTransformTranslators { ParDo.BoundMulti<InputT, OutputT> transform, FlinkBatchTranslationContext context) { DoFn<InputT, OutputT> doFn = transform.getNewFn(); - if (DoFnSignatures.getSignature(doFn.getClass()).stateDeclarations().size() > 0) { - throw new UnsupportedOperationException( - String.format( - "Found %s annotations on %s, but %s cannot yet be used with state in the %s.", - DoFn.StateId.class.getSimpleName(), - doFn.getClass().getName(), - DoFn.class.getSimpleName(), - FlinkRunner.class.getSimpleName())); - } - + rejectStateAndTimers(doFn); DataSet<WindowedValue<InputT>> inputDataSet = context.getInputDataSet(context.getInput(transform)); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69e0ea25/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java index 7b32c76..042f8df 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java @@ -57,6 +57,7 @@ import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.join.RawUnionValue; import org.apache.beam.sdk.transforms.join.UnionCoder; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature; import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; @@ -304,6 +305,30 @@ public class FlinkStreamingTransformTranslators { } } + private static void rejectStateAndTimers(DoFn<?, ?> doFn) { + DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass()); + + if (signature.stateDeclarations().size() > 0) { + throw new UnsupportedOperationException( + String.format( + "Found %s annotations on %s, but %s cannot yet be used with state in the %s.", + DoFn.StateId.class.getSimpleName(), + doFn.getClass().getName(), + DoFn.class.getSimpleName(), + FlinkRunner.class.getSimpleName())); + } + + if (signature.timerDeclarations().size() > 0) { + throw new UnsupportedOperationException( + String.format( + "Found %s annotations on %s, but %s cannot yet be used with timers in the %s.", + DoFn.TimerId.class.getSimpleName(), + doFn.getClass().getName(), + DoFn.class.getSimpleName(), + FlinkRunner.class.getSimpleName())); + } + } + private static class ParDoBoundStreamingTranslator<InputT, OutputT> extends FlinkStreamingPipelineTranslator.StreamTransformTranslator< ParDo.Bound<InputT, OutputT>> { @@ -314,15 +339,7 @@ public class FlinkStreamingTransformTranslators { FlinkStreamingTranslationContext context) { DoFn<InputT, OutputT> doFn = transform.getNewFn(); - if (DoFnSignatures.getSignature(doFn.getClass()).stateDeclarations().size() > 0) { - throw new UnsupportedOperationException( - String.format( - "Found %s annotations on %s, but %s cannot yet be used with state in the %s.", - DoFn.StateId.class.getSimpleName(), - doFn.getClass().getName(), - DoFn.class.getSimpleName(), - FlinkRunner.class.getSimpleName())); - } + rejectStateAndTimers(doFn); WindowingStrategy<?, ?> windowingStrategy = context.getOutput(transform).getWindowingStrategy(); @@ -474,15 +491,7 @@ public class FlinkStreamingTransformTranslators { FlinkStreamingTranslationContext context) { DoFn<InputT, OutputT> doFn = transform.getNewFn(); - if (DoFnSignatures.getSignature(doFn.getClass()).stateDeclarations().size() > 0) { - throw new UnsupportedOperationException( - String.format( - "Found %s annotations on %s, but %s cannot yet be used with state in the %s.", - DoFn.StateId.class.getSimpleName(), - doFn.getClass().getName(), - DoFn.class.getSimpleName(), - FlinkRunner.class.getSimpleName())); - } + rejectStateAndTimers(doFn); // we assume that the transformation does not change the windowing strategy. WindowingStrategy<?, ?> windowingStrategy =