Repository: beam
Updated Branches:
  refs/heads/master 4ccbdbc38 -> 72fef99a6


Drop late data in Flink runner


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0454a189
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0454a189
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0454a189

Branch: refs/heads/master
Commit: 0454a1897c645f674754bc9ef69dc7bab2b3c3ba
Parents: 7da5a2c
Author: Kenneth Knowles <k...@google.com>
Authored: Wed Feb 1 18:25:42 2017 -0800
Committer: Kenneth Knowles <k...@google.com>
Committed: Wed Feb 1 18:25:42 2017 -0800

----------------------------------------------------------------------
 .../wrappers/streaming/DoFnOperator.java        | 20 ++++++++++++++++++--
 1 file changed, 18 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/0454a189/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index de0264a..c1d33f7 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -33,6 +33,7 @@ import org.apache.beam.runners.core.AggregatorFactory;
 import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners;
 import org.apache.beam.runners.core.ExecutionContext;
+import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn;
 import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
 import org.apache.beam.runners.core.SideInputHandler;
 import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
@@ -234,6 +235,8 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
 
     doFnInvoker.invokeSetup();
 
+    ExecutionContext.StepContext stepContext = createStepContext();
+
     DoFnRunner<InputT, FnOutputT> doFnRunner = DoFnRunners.simpleRunner(
         serializedOptions.getPipelineOptions(),
         doFn,
@@ -241,13 +244,26 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
         outputManager,
         mainOutputTag,
         sideOutputTags,
-        createStepContext(),
+        stepContext,
         aggregatorFactory,
         windowingStrategy);
 
+    if (doFn instanceof GroupAlsoByWindowViaWindowSetNewDoFn) {
+      // When the doFn is this, we know it came from WindowDoFnOperator and
+      //   InputT = KeyedWorkItem<K, V>
+      //   OutputT = KV<K, V>
+      //
+      // for some K, V
+
+      doFnRunner = DoFnRunners.lateDataDroppingRunner(
+          (DoFnRunner) doFnRunner,
+          stepContext,
+          windowingStrategy,
+          ((GroupAlsoByWindowViaWindowSetNewDoFn) doFn).getDroppedDueToLatenessAggregator());
+    }
+
     pushbackDoFnRunner =
         PushbackSideInputDoFnRunner.create(doFnRunner, sideInputs, sideInputHandler);
-
   }
 
   @Override

Reply via email to