Fix windowing in direct runner Stateful ParDo
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4fb16e8f Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4fb16e8f Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4fb16e8f Branch: refs/heads/master Commit: 4fb16e8fb9bb087c0975f38c54665634868cfed7 Parents: 7ee8c86 Author: Kenneth Knowles <k...@google.com> Authored: Tue Dec 20 13:58:29 2016 -0800 Committer: Kenneth Knowles <k...@google.com> Committed: Wed Dec 21 13:11:22 2016 -0800 ---------------------------------------------------------------------- .../direct/ParDoMultiOverrideFactory.java | 34 ++++++++++++++++++-- 1 file changed, 31 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4fb16e8f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java index 2cea999..b35df87 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java @@ -34,8 +34,13 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.ParDo.BoundMulti; import org.apache.beam.sdk.transforms.reflect.DoFnSignature; import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; +import org.apache.beam.sdk.transforms.windowing.AfterPane; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; +import org.apache.beam.sdk.transforms.windowing.Repeatedly; +import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; @@ -92,9 +97,12 @@ class ParDoMultiOverrideFactory<InputT, OutputT> @Override public PCollectionTuple expand(PCollection<KV<K, InputT>> input) { + WindowingStrategy<?, ?> inputWindowingStrategy = input.getWindowingStrategy(); + // A KvCoder is required since this goes through GBK. Further, WindowedValueCoder // is not registered by default, so we explicitly set the relevant coders. - checkState(input.getCoder() instanceof KvCoder, + checkState( + input.getCoder() instanceof KvCoder, "Input to a %s using state requires a %s, but the coder was %s", ParDo.class.getSimpleName(), KvCoder.class.getSimpleName(), @@ -102,14 +110,27 @@ class ParDoMultiOverrideFactory<InputT, OutputT> KvCoder<K, InputT> kvCoder = (KvCoder<K, InputT>) input.getCoder(); Coder<K> keyCoder = kvCoder.getKeyCoder(); Coder<? extends BoundedWindow> windowCoder = - input.getWindowingStrategy().getWindowFn().windowCoder(); + inputWindowingStrategy.getWindowFn().windowCoder(); - PCollectionTuple outputs = + PCollection<KeyedWorkItem<K, KV<K, InputT>>> adjustedInput = input // Stash the original timestamps, etc, for when it is fed to the user's DoFn .apply("Reify timestamps", ParDo.of(new ReifyWindowedValueFn<K, InputT>())) .setCoder(KvCoder.of(keyCoder, WindowedValue.getFullCoder(kvCoder, windowCoder))) + // We are going to GBK to gather keys and windows but otherwise do not want + // to alter the flow of data. This entails: + // - trigger as fast as possible + // - maintain the full timestamps of elements + // - ensure this GBK holds to the minimum of those timestamps (via OutputTimeFn) + // - discard past panes as it is "just a stream" of elements + .apply( + Window.<KV<K, WindowedValue<KV<K, InputT>>>>triggering( + Repeatedly.forever(AfterPane.elementCountAtLeast(1))) + .discardingFiredPanes() + .withAllowedLateness(inputWindowingStrategy.getAllowedLateness()) + .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())) + // A full GBK to group by key _and_ window .apply("Group by key", GroupByKey.<K, WindowedValue<KV<K, InputT>>>create()) @@ -117,6 +138,13 @@ class ParDoMultiOverrideFactory<InputT, OutputT> .apply("To KeyedWorkItem", ParDo.of(new ToKeyedWorkItem<K, InputT>())) .setCoder(KeyedWorkItemCoder.of(keyCoder, kvCoder, windowCoder)) + // Because of the intervening GBK, we may have abused the windowing strategy + // of the input, which should be transferred to the output in a straightforward manner + // according to what ParDo already does. + .setWindowingStrategyInternal(inputWindowingStrategy); + + PCollectionTuple outputs = + adjustedInput // Explode the resulting iterable into elements that are exactly the ones from // the input .apply("Stateful ParDo", new StatefulParDo<>(underlyingParDo, input));