Repository: beam
Updated Branches:
  refs/heads/master b3d962df2 -> 7564486f5


Remove extraneous chunking from GroupAlsoByWindowsViaOutputBufferDoFn


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4800a3eb
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4800a3eb
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4800a3eb

Branch: refs/heads/master
Commit: 4800a3eb42e1c92a307578a7b42fb6a4024eb27f
Parents: b3d962d
Author: Kenneth Knowles <k...@google.com>
Authored: Sun Feb 5 20:42:03 2017 -0800
Committer: Kenneth Knowles <k...@google.com>
Committed: Mon Feb 6 08:30:16 2017 -0800

----------------------------------------------------------------------
 .../GroupAlsoByWindowsViaOutputBufferDoFn.java  | 23 ++------------------
 1 file changed, 2 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/4800a3eb/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
index efcd771..d83060f 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.runners.core;
 
-import com.google.common.collect.Iterables;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
@@ -25,7 +24,6 @@ import org.apache.beam.runners.core.triggers.TriggerStateMachines;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.SystemDoFnInternal;
 import org.apache.beam.sdk.util.TimerInternals;
-import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.util.state.StateInternalsFactory;
@@ -77,25 +75,8 @@ public class GroupAlsoByWindowsViaOutputBufferDoFn<K, InputT, OutputT, W extends
             reduceFn,
             c.getPipelineOptions());
 
-    Iterable<List<WindowedValue<InputT>>> chunks =
-        Iterables.partition(c.element().getValue(), 1000);
-    for (Iterable<WindowedValue<InputT>> chunk : chunks) {
-      // Process the chunk of elements.
-      reduceFnRunner.processElements(chunk);
-
-      // Then, since elements are sorted by their timestamp, advance the input watermark
-      // to the first element.
-      timerInternals.advanceInputWatermark(chunk.iterator().next().getTimestamp());
-      // Advance the processing times.
-      timerInternals.advanceProcessingTime(Instant.now());
-      timerInternals.advanceSynchronizedProcessingTime(Instant.now());
-
-      // Fire all the eligible timers.
-      fireEligibleTimers(timerInternals, reduceFnRunner);
-
-      // Leave the output watermark undefined. Since there's no late data in batch mode
-      // there's really no need to track it as we do for streaming.
-    }
+    // Process the elements.
+    reduceFnRunner.processElements(c.element().getValue());
 
     // Finish any pending windows by advancing the input watermark to infinity.
     timerInternals.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);

Reply via email to