Repository: beam
Updated Branches:
  refs/heads/master 8d71ebf82 -> 50532f0a9


[BEAM-2859] Fixed processing timers not being properly fired when the watermark
stays put, by tweaking the way the Spark runner delivers timers to the
reduceFnRunner in SparkGroupAlsoByWindowViaWindowSet
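
For context, the gist of the change: the Spark runner used to deliver a timer to
the ReduceFnRunner only once its timestamp fell behind the input watermark, so
processing-time timers never fired while the watermark stayed put. The patch
below replaces that check with a domain-aware one. The following self-contained
sketch contrasts the two checks; TimerStub, its fields, and the millisecond
values are hypothetical stand-ins for TimerInternals.TimerData, and only the two
predicate methods mirror the patch:

    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;

    // Minimal, hypothetical stand-ins for TimerInternals.TimerData and TimeDomain;
    // only the two predicates below mirror the logic touched by this patch.
    public class TimerEligibilitySketch {

      enum TimeDomain { EVENT_TIME, PROCESSING_TIME }

      static final class TimerStub {
        final TimeDomain domain;
        final long timestampMillis;

        TimerStub(TimeDomain domain, long timestampMillis) {
          this.domain = domain;
          this.timestampMillis = timestampMillis;
        }

        @Override
        public String toString() {
          return domain + "@" + timestampMillis;
        }
      }

      // Old behaviour (the removed getTimersReadyToProcess): a timer was delivered only
      // if its timestamp was behind the input watermark, regardless of its time domain.
      static boolean oldReadyToProcess(TimerStub timer, long inputWatermarkMillis) {
        return timer.timestampMillis < inputWatermarkMillis;
      }

      // New behaviour (filterTimersEligibleForProcessing): only EVENT_TIME timers wait
      // for the watermark; PROCESSING_TIME timers are always eligible.
      static boolean newEligibleForProcessing(TimerStub timer, long inputWatermarkMillis) {
        return timer.domain != TimeDomain.EVENT_TIME
            || inputWatermarkMillis > timer.timestampMillis;
      }

      public static void main(String[] args) {
        final long watermark = 1_000L; // the watermark "stays put" at this value
        List<TimerStub> timers =
            Arrays.asList(
                new TimerStub(TimeDomain.EVENT_TIME, 500L),         // behind the watermark
                new TimerStub(TimeDomain.PROCESSING_TIME, 2_000L)); // ahead of the watermark

        System.out.println(
            "old: "
                + timers.stream()
                    .filter(t -> oldReadyToProcess(t, watermark))
                    .collect(Collectors.toList()));
        // old: [EVENT_TIME@500] -- the processing-time timer is never delivered

        System.out.println(
            "new: "
                + timers.stream()
                    .filter(t -> newEligibleForProcessing(t, watermark))
                    .collect(Collectors.toList()));
        // new: [EVENT_TIME@500, PROCESSING_TIME@2000]
      }
    }

Run with a plain JDK 8+, the old check drops the processing-time timer while the
new one keeps it, which is the behaviour change tracked by BEAM-2859.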


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c3d4c5d9
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c3d4c5d9
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c3d4c5d9

Branch: refs/heads/master
Commit: c3d4c5d98cc115dce7e03e64cd29713562ff62b3
Parents: 8d71ebf
Author: Stas Levin <stasle...@apache.org>
Authored: Tue Sep 12 10:34:45 2017 +0300
Committer: Stas Levin <stasle...@apache.org>
Committed: Wed Sep 13 11:04:08 2017 +0300

----------------------------------------------------------------------
 .../SparkGroupAlsoByWindowViaWindowSet.java     | 82 +++++++++++++-------
 .../spark/stateful/SparkTimerInternals.java     | 15 ----
 2 files changed, 56 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/c3d4c5d9/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
index 2258f05..1fb8700 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
@@ -18,7 +18,9 @@
 package org.apache.beam.runners.spark.stateful;
 
 import com.google.common.base.Joiner;
+import com.google.common.base.Predicate;
 import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.FluentIterable;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Table;
 import java.io.Serializable;
@@ -51,6 +53,7 @@ import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.state.TimeDomain;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -204,6 +207,32 @@ public class SparkGroupAlsoByWindowViaWindowSet implements Serializable {
         this.droppedDueToLateness = droppedDueToLateness;
       }
 
+      /**
+       * Retrieves the timers that are eligible for processing by {@link
+       * org.apache.beam.runners.core.ReduceFnRunner}.
+       *
+       * @return A collection of timers that are eligible for processing. For a {@link
+       *     TimeDomain#EVENT_TIME} timer, this implies that the watermark has passed the timer's
+       *     timestamp. For other <code>TimeDomain</code>s (e.g., {@link
+       *     TimeDomain#PROCESSING_TIME}), a timer is always considered eligible for processing (no
+       *     restrictions).
+       */
+      private Collection<TimerInternals.TimerData> filterTimersEligibleForProcessing(
+          final Collection<TimerInternals.TimerData> timers, final Instant inputWatermark) {
+        final Predicate<TimerInternals.TimerData> eligibleForProcessing =
+            new Predicate<TimerInternals.TimerData>() {
+
+              @Override
+              public boolean apply(final TimerInternals.TimerData timer) {
+                return !timer.getDomain().equals(TimeDomain.EVENT_TIME)
+                    || inputWatermark.isAfter(timer.getTimestamp());
+              }
+            };
+
+        return FluentIterable.from(timers).filter(eligibleForProcessing).toSet();
+      }
+
+
       @Override
       protected Tuple2</*K*/ ByteArray, Tuple2<StateAndTimers, /*WV<KV<K, Itr<I>>>*/ List<byte[]>>>
           computeNext() {
@@ -268,16 +297,14 @@ public class SparkGroupAlsoByWindowViaWindowSet implements Serializable {
 
               LOG.trace(logPrefix + ": input elements: {}", elements);
 
-              /*
-              Incoming expired windows are filtered based on
-              timerInternals.currentInputWatermarkTime() and the configured allowed
-              lateness. Note that this is done prior to calling
-              timerInternals.advanceWatermark so essentially the inputWatermark is
-              the highWatermark of the previous batch and the lowWatermark of the
-              current batch.
-              The highWatermark of the current batch will only affect filtering
-              as of the next batch.
-               */
+              // Incoming expired windows are filtered based on
+              // timerInternals.currentInputWatermarkTime() and the configured allowed
+              // lateness. Note that this is done prior to calling
+              // timerInternals.advanceWatermark so essentially the inputWatermark is
+              // the highWatermark of the previous batch and the lowWatermark of the
+              // current batch.
+              // The highWatermark of the current batch will only affect filtering
+              // as of the next batch.
               final Iterable<WindowedValue<InputT>> nonExpiredElements =
                   Lists.newArrayList(
                       LateDataUtils.dropExpiredWindows(
@@ -302,23 +329,26 @@ public class SparkGroupAlsoByWindowViaWindowSet implements Serializable {
             // store the highWatermark as the new inputWatermark to calculate triggers
             timerInternals.advanceWatermark();
 
-            LOG.debug(
-                logPrefix + ": timerInternals after advance are {}",
-                timerInternals.toString());
-
-            // call on timers that are ready.
-            final Collection<TimerInternals.TimerData> readyToProcess =
-                timerInternals.getTimersReadyToProcess();
-
-            LOG.debug(logPrefix + ": ready timers are {}", readyToProcess);
+            final Collection<TimerInternals.TimerData> timersEligibleForProcessing =
+                filterTimersEligibleForProcessing(
+                    timerInternals.getTimers(), timerInternals.currentInputWatermarkTime());
 
-            /*
-            Note that at this point, the watermark has already advanced since
-            timerInternals.advanceWatermark() has been called and the highWatermark
-            is now stored as the new inputWatermark, according to which triggers are
-            calculated.
-             */
-            reduceFnRunner.onTimers(readyToProcess);
+            LOG.debug(
+                logPrefix + ": timers eligible for processing are {}", timersEligibleForProcessing);
+
+            // Note that at this point, the watermark has already advanced since
+            // timerInternals.advanceWatermark() has been called and the highWatermark
+            // is now stored as the new inputWatermark, according to which triggers are
+            // calculated.
+            // Note 2: The implicit contract between the runner and reduceFnRunner is that
+            // event_time based triggers are only delivered if the watermark has passed their
+            // timestamp.
+            // Note 3: Timer cleanups are performed by the GC timer scheduled by reduceFnRunner as
+            // part of processing timers.
+            // Note 4: Even if a given timer is deemed eligible for processing, it does not
+            // necessarily mean that it will actually fire (firing is determined by the trigger
+            // itself, not the TimerInternals/TimerData objects).
+            reduceFnRunner.onTimers(timersEligibleForProcessing);
           } catch (final Exception e) {
            throw new RuntimeException("Failed to process ReduceFnRunner onTimer.", e);
           }

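The filterTimersEligibleForProcessing helper above is built on Guava's Predicate
and FluentIterable plus a Joda-Time Instant watermark. A minimal standalone
sketch of that idiom, assuming only Guava and Joda-Time on the classpath
(TimerLike is a hypothetical placeholder for TimerInternals.TimerData):

    import com.google.common.base.Predicate;
    import com.google.common.collect.FluentIterable;
    import java.util.Arrays;
    import java.util.Set;
    import org.joda.time.Instant;

    // Standalone illustration of the Guava filtering idiom used by
    // filterTimersEligibleForProcessing; TimerLike is a hypothetical placeholder
    // for TimerInternals.TimerData.
    public class EligibleTimerFilterSketch {

      enum TimeDomain { EVENT_TIME, PROCESSING_TIME }

      static final class TimerLike {
        final TimeDomain domain;
        final Instant timestamp;

        TimerLike(TimeDomain domain, Instant timestamp) {
          this.domain = domain;
          this.timestamp = timestamp;
        }
      }

      static Set<TimerLike> eligible(Iterable<TimerLike> timers, final Instant inputWatermark) {
        final Predicate<TimerLike> eligibleForProcessing =
            new Predicate<TimerLike>() {
              @Override
              public boolean apply(final TimerLike timer) {
                // Event-time timers wait for the watermark; all other domains are always eligible.
                return timer.domain != TimeDomain.EVENT_TIME
                    || inputWatermark.isAfter(timer.timestamp);
              }
            };
        // filter(...) does not mutate the input collection; toSet() returns an ImmutableSet.
        return FluentIterable.from(timers).filter(eligibleForProcessing).toSet();
      }

      public static void main(String[] args) {
        Set<TimerLike> result =
            eligible(
                Arrays.asList(
                    new TimerLike(TimeDomain.EVENT_TIME, new Instant(500L)),
                    new TimerLike(TimeDomain.EVENT_TIME, new Instant(5_000L)),
                    new TimerLike(TimeDomain.PROCESSING_TIME, new Instant(5_000L))),
                new Instant(1_000L));
        System.out.println(result.size()); // 2: the early event-time timer and the processing-time one
      }
    }

Unlike the removed getTimersReadyToProcess, the filter neither mutates the
underlying timer collection nor removes delivered timers from SparkTimerInternals;
per Note 3 in the patch, cleanup is left to the GC timer scheduled by the
ReduceFnRunner.
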
http://git-wip-us.apache.org/repos/asf/beam/blob/c3d4c5d9/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkTimerInternals.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkTimerInternals.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkTimerInternals.java
index c998328..4fd8146 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkTimerInternals.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkTimerInternals.java
@@ -23,7 +23,6 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -102,20 +101,6 @@ public class SparkTimerInternals implements TimerInternals {
     return timers;
   }
 
-  /** This should only be called after processing the element. */
-  Collection<TimerData> getTimersReadyToProcess() {
-    Set<TimerData> toFire = Sets.newHashSet();
-    Iterator<TimerData> iterator = timers.iterator();
-    while (iterator.hasNext()) {
-      TimerData timer = iterator.next();
-      if (timer.getTimestamp().isBefore(inputWatermark)) {
-        toFire.add(timer);
-        iterator.remove();
-      }
-    }
-    return toFire;
-  }
-
   void addTimers(Iterable<TimerData> timers) {
     for (TimerData timer: timers) {
       this.timers.add(timer);
