Repository: beam Updated Branches: refs/heads/master 8d71ebf82 -> 50532f0a9
[BEAM-2859] Fixed processing-time timers not being fired when the watermark does not advance, by changing the way the Spark runner delivers timers to ReduceFnRunner in SparkGroupAlsoByWindowViaWindowSet Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c3d4c5d9 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c3d4c5d9 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c3d4c5d9 Branch: refs/heads/master Commit: c3d4c5d98cc115dce7e03e64cd29713562ff62b3 Parents: 8d71ebf Author: Stas Levin <stasle...@apache.org> Authored: Tue Sep 12 10:34:45 2017 +0300 Committer: Stas Levin <stasle...@apache.org> Committed: Wed Sep 13 11:04:08 2017 +0300 ---------------------------------------------------------------------- .../SparkGroupAlsoByWindowViaWindowSet.java | 82 +++++++++++++------- .../spark/stateful/SparkTimerInternals.java | 15 ---- 2 files changed, 56 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/c3d4c5d9/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java index 2258f05..1fb8700 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java @@ -18,7 +18,9 @@ package org.apache.beam.runners.spark.stateful; import com.google.common.base.Joiner; +import com.google.common.base.Predicate; import com.google.common.collect.AbstractIterator; +import com.google.common.collect.FluentIterable; 
import com.google.common.collect.Lists; import com.google.common.collect.Table; import java.io.Serializable; @@ -51,6 +53,7 @@ import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.metrics.MetricName; +import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.WindowedValue; @@ -204,6 +207,32 @@ public class SparkGroupAlsoByWindowViaWindowSet implements Serializable { this.droppedDueToLateness = droppedDueToLateness; } + /** + * Retrieves the timers that are eligible for processing by {@link + * org.apache.beam.runners.core.ReduceFnRunner}. + * + * @return A collection of timers that are eligible for processing. For a {@link + * TimeDomain#EVENT_TIME} timer, this implies that the watermark has passed the timer's + * timestamp. For other <code>TimeDomain</code>s (e.g., {@link + * TimeDomain#PROCESSING_TIME}), a timer is always considered eligible for processing (no + * restrictions). 
+ */ + private Collection<TimerInternals.TimerData> filterTimersEligibleForProcessing( + final Collection<TimerInternals.TimerData> timers, final Instant inputWatermark) { + final Predicate<TimerInternals.TimerData> eligibleForProcessing = + new Predicate<TimerInternals.TimerData>() { + + @Override + public boolean apply(final TimerInternals.TimerData timer) { + return !timer.getDomain().equals(TimeDomain.EVENT_TIME) + || inputWatermark.isAfter(timer.getTimestamp()); + } + }; + + return FluentIterable.from(timers).filter(eligibleForProcessing).toSet(); + } + + @Override protected Tuple2</*K*/ ByteArray, Tuple2<StateAndTimers, /*WV<KV<K, Itr<I>>>*/ List<byte[]>>> computeNext() { @@ -268,16 +297,14 @@ public class SparkGroupAlsoByWindowViaWindowSet implements Serializable { LOG.trace(logPrefix + ": input elements: {}", elements); - /* - Incoming expired windows are filtered based on - timerInternals.currentInputWatermarkTime() and the configured allowed - lateness. Note that this is done prior to calling - timerInternals.advanceWatermark so essentially the inputWatermark is - the highWatermark of the previous batch and the lowWatermark of the - current batch. - The highWatermark of the current batch will only affect filtering - as of the next batch. - */ + // Incoming expired windows are filtered based on + // timerInternals.currentInputWatermarkTime() and the configured allowed + // lateness. Note that this is done prior to calling + // timerInternals.advanceWatermark so essentially the inputWatermark is + // the highWatermark of the previous batch and the lowWatermark of the + // current batch. + // The highWatermark of the current batch will only affect filtering + // as of the next batch. 
final Iterable<WindowedValue<InputT>> nonExpiredElements = Lists.newArrayList( LateDataUtils.dropExpiredWindows( @@ -302,23 +329,26 @@ public class SparkGroupAlsoByWindowViaWindowSet implements Serializable { // store the highWatermark as the new inputWatermark to calculate triggers timerInternals.advanceWatermark(); - LOG.debug( - logPrefix + ": timerInternals after advance are {}", - timerInternals.toString()); - - // call on timers that are ready. - final Collection<TimerInternals.TimerData> readyToProcess = - timerInternals.getTimersReadyToProcess(); - - LOG.debug(logPrefix + ": ready timers are {}", readyToProcess); + final Collection<TimerInternals.TimerData> timersEligibleForProcessing = + filterTimersEligibleForProcessing( + timerInternals.getTimers(), timerInternals.currentInputWatermarkTime()); - /* - Note that at this point, the watermark has already advanced since - timerInternals.advanceWatermark() has been called and the highWatermark - is now stored as the new inputWatermark, according to which triggers are - calculated. - */ - reduceFnRunner.onTimers(readyToProcess); + LOG.debug( + logPrefix + ": timers eligible for processing are {}", timersEligibleForProcessing); + + // Note that at this point, the watermark has already advanced since + // timerInternals.advanceWatermark() has been called and the highWatermark + // is now stored as the new inputWatermark, according to which triggers are + // calculated. + // Note 2: The implicit contract between the runner and reduceFnRunner is that + // event_time based triggers are only delivered if the watermark has passed their + // timestamp. + // Note 3: Timer cleanups are performed by the GC timer scheduled by reduceFnRunner as + // part of processing timers. + // Note 4: Even if a given timer is deemed eligible for processing, it does not + // necessarily mean that it will actually fire (firing is determined by the trigger + // itself, not the TimerInternals/TimerData objects). 
+ reduceFnRunner.onTimers(timersEligibleForProcessing); } catch (final Exception e) { throw new RuntimeException("Failed to process ReduceFnRunner onTimer.", e); } http://git-wip-us.apache.org/repos/asf/beam/blob/c3d4c5d9/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkTimerInternals.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkTimerInternals.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkTimerInternals.java index c998328..4fd8146 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkTimerInternals.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkTimerInternals.java @@ -23,7 +23,6 @@ import com.google.common.collect.Lists; import com.google.common.collect.Sets; import java.util.Collection; import java.util.Collections; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -102,20 +101,6 @@ public class SparkTimerInternals implements TimerInternals { return timers; } - /** This should only be called after processing the element. */ - Collection<TimerData> getTimersReadyToProcess() { - Set<TimerData> toFire = Sets.newHashSet(); - Iterator<TimerData> iterator = timers.iterator(); - while (iterator.hasNext()) { - TimerData timer = iterator.next(); - if (timer.getTimestamp().isBefore(inputWatermark)) { - toFire.add(timer); - iterator.remove(); - } - } - return toFire; - } - void addTimers(Iterable<TimerData> timers) { for (TimerData timer: timers) { this.timers.add(timer);