Repository: beam Updated Branches: refs/heads/master b3d962df2 -> 7564486f5
Remove extraneous chunking from GroupAlsoByWindowsViaOutputBufferDoFn

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4800a3eb
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4800a3eb
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4800a3eb

Branch: refs/heads/master
Commit: 4800a3eb42e1c92a307578a7b42fb6a4024eb27f
Parents: b3d962d
Author: Kenneth Knowles <k...@google.com>
Authored: Sun Feb 5 20:42:03 2017 -0800
Committer: Kenneth Knowles <k...@google.com>
Committed: Mon Feb 6 08:30:16 2017 -0800

----------------------------------------------------------------------
 .../GroupAlsoByWindowsViaOutputBufferDoFn.java | 23 ++------------------
 1 file changed, 2 insertions(+), 21 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/beam/blob/4800a3eb/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
index efcd771..d83060f 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.runners.core;
 
-import com.google.common.collect.Iterables;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
@@ -25,7 +24,6 @@ import org.apache.beam.runners.core.triggers.TriggerStateMachines;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.SystemDoFnInternal;
 import org.apache.beam.sdk.util.TimerInternals;
-import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.util.state.StateInternalsFactory;
@@ -77,25 +75,8 @@ public class GroupAlsoByWindowsViaOutputBufferDoFn<K, InputT, OutputT, W extends
             reduceFn,
             c.getPipelineOptions());
 
-    Iterable<List<WindowedValue<InputT>>> chunks =
-        Iterables.partition(c.element().getValue(), 1000);
-    for (Iterable<WindowedValue<InputT>> chunk : chunks) {
-      // Process the chunk of elements.
-      reduceFnRunner.processElements(chunk);
-
-      // Then, since elements are sorted by their timestamp, advance the input watermark
-      // to the first element.
-      timerInternals.advanceInputWatermark(chunk.iterator().next().getTimestamp());
-      // Advance the processing times.
-      timerInternals.advanceProcessingTime(Instant.now());
-      timerInternals.advanceSynchronizedProcessingTime(Instant.now());
-
-      // Fire all the eligible timers.
-      fireEligibleTimers(timerInternals, reduceFnRunner);
-
-      // Leave the output watermark undefined. Since there's no late data in batch mode
-      // there's really no need to track it as we do for streaming.
-    }
+    // Process the elements.
+    reduceFnRunner.processElements(c.element().getValue());
 
     // Finish any pending windows by advancing the input watermark to infinity.
     timerInternals.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);