Fix windowing in direct runner Stateful ParDo

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4fb16e8f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4fb16e8f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4fb16e8f

Branch: refs/heads/python-sdk
Commit: 4fb16e8fb9bb087c0975f38c54665634868cfed7
Parents: 7ee8c86
Author: Kenneth Knowles <k...@google.com>
Authored: Tue Dec 20 13:58:29 2016 -0800
Committer: Kenneth Knowles <k...@google.com>
Committed: Wed Dec 21 13:11:22 2016 -0800

----------------------------------------------------------------------
 .../direct/ParDoMultiOverrideFactory.java       | 34 ++++++++++++++++++--
 1 file changed, 31 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4fb16e8f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
index 2cea999..b35df87 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
@@ -34,8 +34,13 @@ import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.ParDo.BoundMulti;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
@@ -92,9 +97,12 @@ class ParDoMultiOverrideFactory<InputT, OutputT>
     @Override
     public PCollectionTuple expand(PCollection<KV<K, InputT>> input) {
 
+      WindowingStrategy<?, ?> inputWindowingStrategy = input.getWindowingStrategy();
+
       // A KvCoder is required since this goes through GBK. Further, WindowedValueCoder
       // is not registered by default, so we explicitly set the relevant coders.
-      checkState(input.getCoder() instanceof KvCoder,
+      checkState(
+          input.getCoder() instanceof KvCoder,
           "Input to a %s using state requires a %s, but the coder was %s",
           ParDo.class.getSimpleName(),
           KvCoder.class.getSimpleName(),
@@ -102,14 +110,27 @@ class ParDoMultiOverrideFactory<InputT, OutputT>
       KvCoder<K, InputT> kvCoder = (KvCoder<K, InputT>) input.getCoder();
       Coder<K> keyCoder = kvCoder.getKeyCoder();
       Coder<? extends BoundedWindow> windowCoder =
-          input.getWindowingStrategy().getWindowFn().windowCoder();
+          inputWindowingStrategy.getWindowFn().windowCoder();
 
-      PCollectionTuple outputs =
+      PCollection<KeyedWorkItem<K, KV<K, InputT>>> adjustedInput =
           input
               // Stash the original timestamps, etc, for when it is fed to the user's DoFn
               .apply("Reify timestamps", ParDo.of(new ReifyWindowedValueFn<K, InputT>()))
               .setCoder(KvCoder.of(keyCoder, WindowedValue.getFullCoder(kvCoder, windowCoder)))
 
+              // We are going to GBK to gather keys and windows but otherwise do not want
+              // to alter the flow of data. This entails:
+              //  - trigger as fast as possible
+              //  - maintain the full timestamps of elements
+              //  - ensure this GBK holds to the minimum of those timestamps (via OutputTimeFn)
+              //  - discard past panes as it is "just a stream" of elements
+              .apply(
+                  Window.<KV<K, WindowedValue<KV<K, InputT>>>>triggering(
+                          Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
+                      .discardingFiredPanes()
+                      .withAllowedLateness(inputWindowingStrategy.getAllowedLateness())
+                      .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()))
+
               // A full GBK to group by key _and_ window
               .apply("Group by key", GroupByKey.<K, WindowedValue<KV<K, InputT>>>create())
 
@@ -117,6 +138,13 @@ class ParDoMultiOverrideFactory<InputT, OutputT>
               .apply("To KeyedWorkItem", ParDo.of(new ToKeyedWorkItem<K, InputT>()))
               .setCoder(KeyedWorkItemCoder.of(keyCoder, kvCoder, windowCoder))
 
+              // Because of the intervening GBK, we may have abused the windowing strategy
+              // of the input, which should be transferred to the output in a straightforward manner
+              // according to what ParDo already does.
+              .setWindowingStrategyInternal(inputWindowingStrategy);
+
+      PCollectionTuple outputs =
+          adjustedInput
               // Explode the resulting iterable into elements that are exactly the ones from
               // the input
               .apply("Stateful ParDo", new StatefulParDo<>(underlyingParDo, input));

Reply via email to