This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 74ac703bbfa40d16e7d1115912768c7a63598d52
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
AuthorDate: Thu Feb 8 13:02:26 2018 +0100

    [BEAM-2140] Use ProcessFnRunner in DoFnRunner for executing SDF
    
    For this to work, we need to also change how we wrap values in
    KeyedWorkItems, because ProcessFnRunner expects them to be in the
    GlobalWindow.
---
 .../flink/FlinkStreamingTransformTranslators.java  | 35 ++++++++++++++++++----
 .../wrappers/streaming/DoFnOperator.java           | 11 +++++--
 2 files changed, 39 insertions(+), 7 deletions(-)

diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index d39b5c1..f5dc3ce 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -1017,10 +1017,8 @@ class FlinkStreamingTransformTranslators {
 
 
       WindowedValue.
-          FullWindowedValueCoder<SingletonKeyedWorkItem<K, InputT>> windowedWorkItemCoder =
-          WindowedValue.getFullCoder(
-              workItemCoder,
-              input.getWindowingStrategy().getWindowFn().windowCoder());
+          ValueOnlyWindowedValueCoder<SingletonKeyedWorkItem<K, InputT>> windowedWorkItemCoder =
+          WindowedValue.getValueOnlyCoder(workItemCoder);
 
       CoderTypeInformation<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> workItemTypeInfo =
           new CoderTypeInformation<>(windowedWorkItemCoder);
@@ -1029,7 +1027,7 @@ class FlinkStreamingTransformTranslators {
 
       DataStream<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> workItemStream =
           inputDataStream
-              .flatMap(new ToKeyedWorkItem<>())
+              .flatMap(new ToKeyedWorkItemInGlobalWindow<>())
               .returns(workItemTypeInfo)
               .name("ToKeyedWorkItem");
 
@@ -1041,6 +1039,33 @@ class FlinkStreamingTransformTranslators {
     }
   }
 
+  private static class ToKeyedWorkItemInGlobalWindow<K, InputT>
+      extends RichFlatMapFunction<
+      WindowedValue<KV<K, InputT>>,
+      WindowedValue<SingletonKeyedWorkItem<K, InputT>>> {
+
+    @Override
+    public void flatMap(
+        WindowedValue<KV<K, InputT>> inWithMultipleWindows,
+        Collector<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> out) throws Exception {
+
+      // we need to wrap each one work item per window for now
+      // since otherwise the PushbackSideInputRunner will not correctly
+      // determine whether side inputs are ready
+      //
+      // this is tracked as https://issues.apache.org/jira/browse/BEAM-1850
+      for (WindowedValue<KV<K, InputT>> in : inWithMultipleWindows.explodeWindows()) {
+        SingletonKeyedWorkItem<K, InputT> workItem =
+            new SingletonKeyedWorkItem<>(
+                in.getValue().getKey(),
+                in.withValue(in.getValue().getValue()));
+
+        out.collect(WindowedValue.valueInGlobalWindow(workItem));
+      }
+    }
+  }
+
+
   private static class FlattenPCollectionTranslator<T>
       extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
         PTransform<PCollection<T>, PCollection<T>>> {
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 37f56f5..f9b4ee3 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -39,10 +39,12 @@ import javax.annotation.Nullable;
 import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners;
 import org.apache.beam.runners.core.NullSideInputReader;
+import org.apache.beam.runners.core.ProcessFnRunner;
 import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
 import org.apache.beam.runners.core.SideInputHandler;
 import org.apache.beam.runners.core.SideInputReader;
 import org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner;
+import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems;
 import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.core.StateNamespace;
 import org.apache.beam.runners.core.StateNamespaces;
@@ -352,8 +354,13 @@ public class DoFnOperator<InputT, OutputT>
             .scheduleAtFixedRate(
                 timestamp -> checkInvokeFinishBundleByTime(), bundleCheckPeriod, bundleCheckPeriod);
 
-    pushbackDoFnRunner =
-        SimplePushbackSideInputDoFnRunner.create(doFnRunner, sideInputs, sideInputHandler);
+    if (doFn instanceof SplittableParDoViaKeyedWorkItems.ProcessFn) {
+      pushbackDoFnRunner =
+          new ProcessFnRunner<>((DoFnRunner) doFnRunner, sideInputs, sideInputHandler);
+    } else {
+      pushbackDoFnRunner =
+          SimplePushbackSideInputDoFnRunner.create(doFnRunner, sideInputs, sideInputHandler);
+    }
   }
 
   @Override

-- 
To stop receiving notification emails like this one, please contact
aljos...@apache.org.

Reply via email to