Reject timers for ParDo in FlinkRunner

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/69e0ea25
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/69e0ea25
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/69e0ea25

Branch: refs/heads/master
Commit: 69e0ea25f24597b84c93137dd94e2f25a9b88a15
Parents: 18db3ac
Author: Kenneth Knowles <k...@google.com>
Authored: Wed Dec 7 20:34:59 2016 -0800
Committer: Kenneth Knowles <k...@google.com>
Committed: Thu Dec 8 09:53:08 2016 -0800

----------------------------------------------------------------------
 .../FlinkBatchTransformTranslators.java         | 46 ++++++++++++--------
 .../FlinkStreamingTransformTranslators.java     | 45 +++++++++++--------
 2 files changed, 54 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69e0ea25/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
index 474d4e3..9ac907f 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
@@ -56,6 +56,7 @@ import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.join.RawUnionValue;
 import org.apache.beam.sdk.transforms.join.UnionCoder;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
@@ -483,6 +484,30 @@ class FlinkBatchTransformTranslators {
     }
   }
 
+  private static void rejectStateAndTimers(DoFn<?, ?> doFn) {
+    DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
+
+    if (signature.stateDeclarations().size() > 0) {
+      throw new UnsupportedOperationException(
+          String.format(
+              "Found %s annotations on %s, but %s cannot yet be used with state in the %s.",
+              DoFn.StateId.class.getSimpleName(),
+              doFn.getClass().getName(),
+              DoFn.class.getSimpleName(),
+              FlinkRunner.class.getSimpleName()));
+    }
+
+    if (signature.timerDeclarations().size() > 0) {
+      throw new UnsupportedOperationException(
+          String.format(
+              "Found %s annotations on %s, but %s cannot yet be used with timers in the %s.",
+              DoFn.TimerId.class.getSimpleName(),
+              doFn.getClass().getName(),
+              DoFn.class.getSimpleName(),
+              FlinkRunner.class.getSimpleName()));
+    }
+  }
+
   private static class ParDoBoundTranslatorBatch<InputT, OutputT>
       implements FlinkBatchPipelineTranslator.BatchTransformTranslator<
           ParDo.Bound<InputT, OutputT>> {
@@ -493,15 +518,7 @@ class FlinkBatchTransformTranslators {
 
         FlinkBatchTranslationContext context) {
       DoFn<InputT, OutputT> doFn = transform.getNewFn();
-      if (DoFnSignatures.getSignature(doFn.getClass()).stateDeclarations().size() > 0) {
-        throw new UnsupportedOperationException(
-            String.format(
-                "Found %s annotations on %s, but %s cannot yet be used with state in the %s.",
-                DoFn.StateId.class.getSimpleName(),
-                doFn.getClass().getName(),
-                DoFn.class.getSimpleName(),
-                FlinkRunner.class.getSimpleName()));
-      }
+      rejectStateAndTimers(doFn);
 
       DataSet<WindowedValue<InputT>> inputDataSet =
           context.getInputDataSet(context.getInput(transform));
@@ -549,16 +566,7 @@ class FlinkBatchTransformTranslators {
         ParDo.BoundMulti<InputT, OutputT> transform,
         FlinkBatchTranslationContext context) {
       DoFn<InputT, OutputT> doFn = transform.getNewFn();
-      if (DoFnSignatures.getSignature(doFn.getClass()).stateDeclarations().size() > 0) {
-        throw new UnsupportedOperationException(
-            String.format(
-                "Found %s annotations on %s, but %s cannot yet be used with state in the %s.",
-                DoFn.StateId.class.getSimpleName(),
-                doFn.getClass().getName(),
-                DoFn.class.getSimpleName(),
-                FlinkRunner.class.getSimpleName()));
-      }
-
+      rejectStateAndTimers(doFn);
       DataSet<WindowedValue<InputT>> inputDataSet =
           context.getInputDataSet(context.getInput(transform));
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69e0ea25/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
index 7b32c76..042f8df 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
@@ -57,6 +57,7 @@ import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.join.RawUnionValue;
 import org.apache.beam.sdk.transforms.join.UnionCoder;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
@@ -304,6 +305,30 @@ public class FlinkStreamingTransformTranslators {
     }
   }
 
+  private static void rejectStateAndTimers(DoFn<?, ?> doFn) {
+    DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
+
+    if (signature.stateDeclarations().size() > 0) {
+      throw new UnsupportedOperationException(
+          String.format(
+              "Found %s annotations on %s, but %s cannot yet be used with state in the %s.",
+              DoFn.StateId.class.getSimpleName(),
+              doFn.getClass().getName(),
+              DoFn.class.getSimpleName(),
+              FlinkRunner.class.getSimpleName()));
+    }
+
+    if (signature.timerDeclarations().size() > 0) {
+      throw new UnsupportedOperationException(
+          String.format(
+              "Found %s annotations on %s, but %s cannot yet be used with timers in the %s.",
+              DoFn.TimerId.class.getSimpleName(),
+              doFn.getClass().getName(),
+              DoFn.class.getSimpleName(),
+              FlinkRunner.class.getSimpleName()));
+    }
+  }
+
   private static class ParDoBoundStreamingTranslator<InputT, OutputT>
       extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
         ParDo.Bound<InputT, OutputT>> {
@@ -314,15 +339,7 @@ public class FlinkStreamingTransformTranslators {
         FlinkStreamingTranslationContext context) {
 
       DoFn<InputT, OutputT> doFn = transform.getNewFn();
-      if (DoFnSignatures.getSignature(doFn.getClass()).stateDeclarations().size() > 0) {
-        throw new UnsupportedOperationException(
-            String.format(
-                "Found %s annotations on %s, but %s cannot yet be used with state in the %s.",
-                DoFn.StateId.class.getSimpleName(),
-                doFn.getClass().getName(),
-                DoFn.class.getSimpleName(),
-                FlinkRunner.class.getSimpleName()));
-      }
+      rejectStateAndTimers(doFn);
 
       WindowingStrategy<?, ?> windowingStrategy =
           context.getOutput(transform).getWindowingStrategy();
@@ -474,15 +491,7 @@ public class FlinkStreamingTransformTranslators {
         FlinkStreamingTranslationContext context) {
 
       DoFn<InputT, OutputT> doFn = transform.getNewFn();
-      if (DoFnSignatures.getSignature(doFn.getClass()).stateDeclarations().size() > 0) {
-        throw new UnsupportedOperationException(
-            String.format(
-                "Found %s annotations on %s, but %s cannot yet be used with state in the %s.",
-                DoFn.StateId.class.getSimpleName(),
-                doFn.getClass().getName(),
-                DoFn.class.getSimpleName(),
-                FlinkRunner.class.getSimpleName()));
-      }
+      rejectStateAndTimers(doFn);
 
       // we assume that the transformation does not change the windowing strategy.
       WindowingStrategy<?, ?> windowingStrategy =

Reply via email to