Repository: beam
Updated Branches:
  refs/heads/master f4e109767 -> 0c24286e1


Allow absolute timers


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a93c5c05
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a93c5c05
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a93c5c05

Branch: refs/heads/master
Commit: a93c5c0594dcd4519fcf4b842f2fe0b8244a81a3
Parents: f4e1097
Author: Kenneth Knowles <k...@google.com>
Authored: Mon Jan 23 20:50:50 2017 -0800
Committer: Kenneth Knowles <k...@google.com>
Committed: Mon Feb 6 15:17:23 2017 -0800

----------------------------------------------------------------------
 .../beam/runners/core/SimpleDoFnRunner.java     | 83 +++++++++++++++--
 .../java/org/apache/beam/sdk/util/Timer.java    | 11 +++
 .../apache/beam/sdk/transforms/ParDoTest.java   | 93 ++++++++++++++++++++
 3 files changed, 178 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/a93c5c05/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index 8c9b8b7..7a89389 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.core;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.common.collect.Iterables;
@@ -94,6 +95,8 @@ public class SimpleDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT, Out
 
   private final Coder<BoundedWindow> windowCoder;
 
+  private final Duration allowedLateness;
+
   // Because of setKey(Object), we really must refresh stateInternals() at 
each access
   private final StepContext stepContext;
 
@@ -121,6 +124,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT, Out
     Coder<BoundedWindow> untypedCoder =
         (Coder<BoundedWindow>) windowingStrategy.getWindowFn().windowCoder();
     this.windowCoder = untypedCoder;
+    this.allowedLateness = windowingStrategy.getAllowedLateness();
 
     this.context =
         new DoFnContext<>(
@@ -182,7 +186,8 @@ public class SimpleDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT, Out
     }
 
     OnTimerArgumentProvider<InputT, OutputT> argumentProvider =
-        new OnTimerArgumentProvider<>(fn, context, window, effectiveTimestamp, 
timeDomain);
+        new OnTimerArgumentProvider<>(
+            fn, context, window, allowedLateness, effectiveTimestamp, 
timeDomain);
     invoker.invokeOnTimer(timerId, argumentProvider);
   }
 
@@ -210,7 +215,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT, Out
 
   /** Returns a new {@link DoFn.ProcessContext} for the given element. */
   private DoFnProcessContext<InputT, OutputT> 
createProcessContext(WindowedValue<InputT> elem) {
-    return new DoFnProcessContext<InputT, OutputT>(fn, context, elem);
+    return new DoFnProcessContext<InputT, OutputT>(fn, context, elem, 
allowedLateness);
   }
 
   private RuntimeException wrapUserCodeException(Throwable t) {
@@ -465,6 +470,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT, Out
     final DoFn<InputT, OutputT> fn;
     final DoFnContext<InputT, OutputT> context;
     final WindowedValue<InputT> windowedValue;
+    private final Duration allowedLateness;
 
     /** Lazily initialized; should only be accessed via {@link 
#getNamespace()}. */
     @Nullable private StateNamespace namespace;
@@ -486,11 +492,13 @@ public class SimpleDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT, Out
     private DoFnProcessContext(
         DoFn<InputT, OutputT> fn,
         DoFnContext<InputT, OutputT> context,
-        WindowedValue<InputT> windowedValue) {
+        WindowedValue<InputT> windowedValue,
+        Duration allowedLateness) {
       fn.super();
       this.fn = fn;
       this.context = context;
       this.windowedValue = windowedValue;
+      this.allowedLateness = allowedLateness;
     }
 
     @Override
@@ -633,7 +641,8 @@ public class SimpleDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT, Out
       try {
         TimerSpec spec =
             (TimerSpec) 
signature.timerDeclarations().get(timerId).field().get(fn);
-        return new TimerInternalsTimer(getNamespace(), timerId, spec, 
stepContext.timerInternals());
+        return new TimerInternalsTimer(
+            window(), getNamespace(), allowedLateness, timerId, spec, 
stepContext.timerInternals());
       } catch (IllegalAccessException e) {
         throw new RuntimeException(e);
       }
@@ -656,6 +665,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT, Out
     private final BoundedWindow window;
     private final Instant timestamp;
     private final TimeDomain timeDomain;
+    private final Duration allowedLateness;
 
     /** Lazily initialized; should only be accessed via {@link 
#getNamespace()}. */
     private StateNamespace namespace;
@@ -678,12 +688,14 @@ public class SimpleDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT, Out
         DoFn<InputT, OutputT> fn,
         DoFnContext<InputT, OutputT> context,
         BoundedWindow window,
+        Duration allowedLateness,
         Instant timestamp,
         TimeDomain timeDomain) {
       fn.super();
       this.fn = fn;
       this.context = context;
       this.window = window;
+      this.allowedLateness = allowedLateness;
       this.timestamp = timestamp;
       this.timeDomain = timeDomain;
     }
@@ -741,7 +753,8 @@ public class SimpleDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT, Out
       try {
         TimerSpec spec =
             (TimerSpec) 
signature.timerDeclarations().get(timerId).field().get(fn);
-        return new TimerInternalsTimer(getNamespace(), timerId, spec, 
stepContext.timerInternals());
+        return new TimerInternalsTimer(
+            window, getNamespace(), allowedLateness, timerId, spec, 
stepContext.timerInternals());
       } catch (IllegalAccessException e) {
         throw new RuntimeException(e);
       }
@@ -782,12 +795,25 @@ public class SimpleDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT, Out
 
   private static class TimerInternalsTimer implements Timer {
     private final TimerInternals timerInternals;
+
+    // The window and the namespace represent the same thing, but the 
namespace is a cached
+    // and specially encoded form. Since the namespace can be cached across 
timers, it is
+    // passed in whole rather than being computed here.
+    private final BoundedWindow window;
+    private final Duration allowedLateness;
+    private final StateNamespace namespace;
     private final String timerId;
     private final TimerSpec spec;
-    private final StateNamespace namespace;
 
     public TimerInternalsTimer(
-        StateNamespace namespace, String timerId, TimerSpec spec, 
TimerInternals timerInternals) {
+        BoundedWindow window,
+        StateNamespace namespace,
+        Duration allowedLateness,
+        String timerId,
+        TimerSpec spec,
+        TimerInternals timerInternals) {
+      this.window = window;
+      this.allowedLateness = allowedLateness;
       this.namespace = namespace;
       this.timerId = timerId;
       this.spec = spec;
@@ -795,9 +821,48 @@ public class SimpleDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT, Out
     }
 
     @Override
+    public void set(Instant target) {
+      verifyAbsoluteTimeDomain();
+      verifyTargetTime(target);
+      setUnderlyingTimer(target);
+    }
+
+    @Override
     public void setForNowPlus(Duration durationFromNow) {
-      timerInternals.setTimer(
-          namespace, timerId, getCurrentTime().plus(durationFromNow), 
spec.getTimeDomain());
+      Instant target = getCurrentTime().plus(durationFromNow);
+      verifyTargetTime(target);
+      setUnderlyingTimer(target);
+    }
+
+    /**
+     * Ensures that the target time is reasonable. For event time timers this 
means that the
+     * time should be prior to window GC time.
+     */
+    private void verifyTargetTime(Instant target) {
+      if (TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) {
+        Instant windowExpiry = window.maxTimestamp().plus(allowedLateness);
+        checkArgument(!target.isAfter(windowExpiry),
+            "Attempted to set event time timer for %s but that is after"
+            + " the expiration of window %s", target, windowExpiry);
+      }
+    }
+
+    /** Verifies that the time domain of this timer is acceptable for absolute 
timers. */
+    private void verifyAbsoluteTimeDomain() {
+      if (!TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) {
+        throw new IllegalStateException(
+            "Can only set relative timers in processing time domain."
+                + " Use #setForNowPlus(Duration)");
+      }
+    }
+
+    /**
+     * Sets the timer for the target time without checking anything about 
whether it is
+     * a reasonable thing to do. For example, absolute processing time timers 
are not
+     * really sensible since the user has no way to compute a good choice of 
time.
+     */
+    private void setUnderlyingTimer(Instant target) {
+      timerInternals.setTimer(namespace, timerId, target, 
spec.getTimeDomain());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/a93c5c05/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Timer.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Timer.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Timer.java
index 556287d..45a2a66 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Timer.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Timer.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.util;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.joda.time.Duration;
+import org.joda.time.Instant;
 
 /**
  * A timer for a specified time domain that can be set to register the desire 
for further processing
@@ -43,6 +44,16 @@ import org.joda.time.Duration;
 @Experimental(Experimental.Kind.TIMERS)
 public interface Timer {
   /**
+   * Sets or resets the time in the timer's {@link TimeDomain} at which it 
should fire. If the timer
+   * was already set, resets it to the new requested time.
+   *
+   * <p>For {@link TimeDomain#PROCESSING_TIME}, the behavior is 
unpredictable, since processing
+   * time timers are ignored after a window has expired. Instead, it is 
recommended to use
+   * {@link #setForNowPlus(Duration)}.
+   */
+  void set(Instant instant);
+
+  /**
    * Sets or resets the time relative to the current time in the timer's 
{@link TimeDomain} at which
    * this it should fire. If the timer was already set, resets it to the new 
requested time.
    */

http://git-wip-us.apache.org/repos/asf/beam/blob/a93c5c05/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 2e3fb85..54aad0c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -48,6 +48,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
 import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.KvCoder;
@@ -1675,6 +1676,98 @@ public class ParDoTest implements Serializable {
     pipeline.run();
   }
 
+  /**
+   * Tests that an event time timer set absolutely for the last possible 
moment fires and results in
+   * supplementary output. The test is otherwise identical to {@link 
#testEventTimeTimerBounded()}.
+   */
+  @Test
+  @Category({RunnableOnService.class, UsesTimersInParDo.class})
+  public void testEventTimeTimerAbsolute() throws Exception {
+    final String timerId = "foo";
+
+    DoFn<KV<String, Integer>, Integer> fn =
+        new DoFn<KV<String, Integer>, Integer>() {
+
+          @TimerId(timerId)
+          private final TimerSpec spec = 
TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+          @ProcessElement
+          public void processElement(
+              ProcessContext context, @TimerId(timerId) Timer timer, 
BoundedWindow window) {
+            timer.set(window.maxTimestamp());
+            context.output(3);
+          }
+
+          @OnTimer(timerId)
+          public void onTimer(OnTimerContext context) {
+            context.output(42);
+          }
+        };
+
+    PCollection<Integer> output = pipeline.apply(Create.of(KV.of("hello", 
37))).apply(ParDo.of(fn));
+    PAssert.that(output).containsInAnyOrder(3, 42);
+    pipeline.run();
+  }
+
+  @Test
+  @Category({RunnableOnService.class, UsesTimersInParDo.class})
+  public void testAbsoluteProcessingTimeTimerRejected() throws Exception {
+    final String timerId = "foo";
+
+    DoFn<KV<String, Integer>, Integer> fn =
+        new DoFn<KV<String, Integer>, Integer>() {
+
+          @TimerId(timerId)
+          private final TimerSpec spec = 
TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+
+          @ProcessElement
+          public void processElement(ProcessContext context, @TimerId(timerId) 
Timer timer) {
+            timer.set(new Instant(0));
+          }
+
+          @OnTimer(timerId)
+          public void onTimer(OnTimerContext context) {}
+        };
+
+    PCollection<Integer> output = pipeline.apply(Create.of(KV.of("hello", 
37))).apply(ParDo.of(fn));
+    thrown.expect(PipelineExecutionException.class);
+    // Note that runners can reasonably vary their message - this matcher 
should be flexible
+    // and can be evolved.
+    thrown.expectMessage("relative timers");
+    thrown.expectMessage("processing time");
+    pipeline.run();
+  }
+
+  @Test
+  @Category({RunnableOnService.class, UsesTimersInParDo.class})
+  public void testOutOfBoundsEventTimeTimer() throws Exception {
+    final String timerId = "foo";
+
+    DoFn<KV<String, Integer>, Integer> fn =
+        new DoFn<KV<String, Integer>, Integer>() {
+
+          @TimerId(timerId)
+          private final TimerSpec spec = 
TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+          @ProcessElement
+          public void processElement(
+              ProcessContext context, BoundedWindow window, @TimerId(timerId) 
Timer timer) {
+            timer.set(window.maxTimestamp().plus(1L));
+          }
+
+          @OnTimer(timerId)
+          public void onTimer(OnTimerContext context) {}
+        };
+
+    PCollection<Integer> output = pipeline.apply(Create.of(KV.of("hello", 
37))).apply(ParDo.of(fn));
+    thrown.expect(PipelineExecutionException.class);
+    // Note that runners can reasonably vary their message - this matcher 
should be flexible
+    // and can be evolved.
+    thrown.expectMessage("event time timer");
+    thrown.expectMessage("expiration");
+    pipeline.run();
+  }
+
   @Test
   @Category({NeedsRunner.class, UsesTimersInParDo.class, UsesTestStream.class})
   public void testSimpleProcessingTimerTimer() throws Exception {

Reply via email to