This is an automated email from the ASF dual-hosted git repository. aljoscha pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
commit 74ac703bbfa40d16e7d1115912768c7a63598d52
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
AuthorDate: Thu Feb 8 13:02:26 2018 +0100

    [BEAM-2140] Use ProcessFnRunner in DoFnRunner for executing SDF

    For this to work, we need to also change how we wrap values in KeyedWorkItems, because ProcessFnRunner expects them to be in the GlobalWindow.
---
 .../flink/FlinkStreamingTransformTranslators.java | 35 ++++++++++++++++++----
 .../wrappers/streaming/DoFnOperator.java | 11 +++++--
 2 files changed, 39 insertions(+), 7 deletions(-)

diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index d39b5c1..f5dc3ce 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -1017,10 +1017,8 @@ class FlinkStreamingTransformTranslators {
 WindowedValue.
- FullWindowedValueCoder<SingletonKeyedWorkItem<K, InputT>> windowedWorkItemCoder = - WindowedValue.getFullCoder( - workItemCoder, - input.getWindowingStrategy().getWindowFn().windowCoder()); + ValueOnlyWindowedValueCoder<SingletonKeyedWorkItem<K, InputT>> windowedWorkItemCoder = + WindowedValue.getValueOnlyCoder(workItemCoder); CoderTypeInformation<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> workItemTypeInfo = new CoderTypeInformation<>(windowedWorkItemCoder); @@ -1029,7 +1027,7 @@ class FlinkStreamingTransformTranslators { DataStream<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> workItemStream = inputDataStream - .flatMap(new ToKeyedWorkItem<>()) + .flatMap(new ToKeyedWorkItemInGlobalWindow<>()) .returns(workItemTypeInfo) .name("ToKeyedWorkItem"); @@ -1041,6 +1039,33 @@ class FlinkStreamingTransformTranslators { } } + private static class ToKeyedWorkItemInGlobalWindow<K, InputT> + extends RichFlatMapFunction< + WindowedValue<KV<K, InputT>>, + WindowedValue<SingletonKeyedWorkItem<K, InputT>>> { + + @Override + public void flatMap( + WindowedValue<KV<K, InputT>> inWithMultipleWindows, + Collector<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> out) throws Exception { + + // we need to wrap each one work item per window for now + // since otherwise the PushbackSideInputRunner will not correctly + // determine whether side inputs are ready + // + // this is tracked as https://issues.apache.org/jira/browse/BEAM-1850 + for (WindowedValue<KV<K, InputT>> in : inWithMultipleWindows.explodeWindows()) { + SingletonKeyedWorkItem<K, InputT> workItem = + new SingletonKeyedWorkItem<>( + in.getValue().getKey(), + in.withValue(in.getValue().getValue())); + + out.collect(WindowedValue.valueInGlobalWindow(workItem)); + } + } + } + + private static class FlattenPCollectionTranslator<T> extends FlinkStreamingPipelineTranslator.StreamTransformTranslator< PTransform<PCollection<T>, PCollection<T>>> { diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index 37f56f5..f9b4ee3 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -39,10 +39,12 @@ import javax.annotation.Nullable; import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.DoFnRunners; import org.apache.beam.runners.core.NullSideInputReader; +import org.apache.beam.runners.core.ProcessFnRunner; import org.apache.beam.runners.core.PushbackSideInputDoFnRunner; import org.apache.beam.runners.core.SideInputHandler; import org.apache.beam.runners.core.SideInputReader; import org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner; +import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems; import org.apache.beam.runners.core.StateInternals; import org.apache.beam.runners.core.StateNamespace; import org.apache.beam.runners.core.StateNamespaces; @@ -352,8 +354,13 @@ public class DoFnOperator<InputT, OutputT> .scheduleAtFixedRate( timestamp -> checkInvokeFinishBundleByTime(), bundleCheckPeriod, bundleCheckPeriod); - pushbackDoFnRunner = - SimplePushbackSideInputDoFnRunner.create(doFnRunner, sideInputs, sideInputHandler); + if (doFn instanceof SplittableParDoViaKeyedWorkItems.ProcessFn) { + pushbackDoFnRunner = + new ProcessFnRunner<>((DoFnRunner) doFnRunner, sideInputs, sideInputHandler); + } else { + pushbackDoFnRunner = + SimplePushbackSideInputDoFnRunner.create(doFnRunner, sideInputs, sideInputHandler); + } } @Override -- To stop receiving notification emails like this one, please contact aljos...@apache.org.