Repository: beam Updated Branches: refs/heads/master f4e109767 -> 0c24286e1
Allow absolute timers Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a93c5c05 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a93c5c05 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a93c5c05 Branch: refs/heads/master Commit: a93c5c0594dcd4519fcf4b842f2fe0b8244a81a3 Parents: f4e1097 Author: Kenneth Knowles <k...@google.com> Authored: Mon Jan 23 20:50:50 2017 -0800 Committer: Kenneth Knowles <k...@google.com> Committed: Mon Feb 6 15:17:23 2017 -0800 ---------------------------------------------------------------------- .../beam/runners/core/SimpleDoFnRunner.java | 83 +++++++++++++++-- .../java/org/apache/beam/sdk/util/Timer.java | 11 +++ .../apache/beam/sdk/transforms/ParDoTest.java | 93 ++++++++++++++++++++ 3 files changed, 178 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/a93c5c05/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java index 8c9b8b7..7a89389 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java @@ -17,6 +17,7 @@ */ package org.apache.beam.runners.core; +import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import com.google.common.collect.Iterables; @@ -94,6 +95,8 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out private final Coder<BoundedWindow> windowCoder; + private final Duration allowedLateness; + // Because of setKey(Object), we really must 
refresh stateInternals() at each access private final StepContext stepContext; @@ -121,6 +124,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out Coder<BoundedWindow> untypedCoder = (Coder<BoundedWindow>) windowingStrategy.getWindowFn().windowCoder(); this.windowCoder = untypedCoder; + this.allowedLateness = windowingStrategy.getAllowedLateness(); this.context = new DoFnContext<>( @@ -182,7 +186,8 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out } OnTimerArgumentProvider<InputT, OutputT> argumentProvider = - new OnTimerArgumentProvider<>(fn, context, window, effectiveTimestamp, timeDomain); + new OnTimerArgumentProvider<>( + fn, context, window, allowedLateness, effectiveTimestamp, timeDomain); invoker.invokeOnTimer(timerId, argumentProvider); } @@ -210,7 +215,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out /** Returns a new {@link DoFn.ProcessContext} for the given element. */ private DoFnProcessContext<InputT, OutputT> createProcessContext(WindowedValue<InputT> elem) { - return new DoFnProcessContext<InputT, OutputT>(fn, context, elem); + return new DoFnProcessContext<InputT, OutputT>(fn, context, elem, allowedLateness); } private RuntimeException wrapUserCodeException(Throwable t) { @@ -465,6 +470,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out final DoFn<InputT, OutputT> fn; final DoFnContext<InputT, OutputT> context; final WindowedValue<InputT> windowedValue; + private final Duration allowedLateness; /** Lazily initialized; should only be accessed via {@link #getNamespace()}. 
*/ @Nullable private StateNamespace namespace; @@ -486,11 +492,13 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out private DoFnProcessContext( DoFn<InputT, OutputT> fn, DoFnContext<InputT, OutputT> context, - WindowedValue<InputT> windowedValue) { + WindowedValue<InputT> windowedValue, + Duration allowedLateness) { fn.super(); this.fn = fn; this.context = context; this.windowedValue = windowedValue; + this.allowedLateness = allowedLateness; } @Override @@ -633,7 +641,8 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out try { TimerSpec spec = (TimerSpec) signature.timerDeclarations().get(timerId).field().get(fn); - return new TimerInternalsTimer(getNamespace(), timerId, spec, stepContext.timerInternals()); + return new TimerInternalsTimer( + window(), getNamespace(), allowedLateness, timerId, spec, stepContext.timerInternals()); } catch (IllegalAccessException e) { throw new RuntimeException(e); } @@ -656,6 +665,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out private final BoundedWindow window; private final Instant timestamp; private final TimeDomain timeDomain; + private final Duration allowedLateness; /** Lazily initialized; should only be accessed via {@link #getNamespace()}. 
*/ private StateNamespace namespace; @@ -678,12 +688,14 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out DoFn<InputT, OutputT> fn, DoFnContext<InputT, OutputT> context, BoundedWindow window, + Duration allowedLateness, Instant timestamp, TimeDomain timeDomain) { fn.super(); this.fn = fn; this.context = context; this.window = window; + this.allowedLateness = allowedLateness; this.timestamp = timestamp; this.timeDomain = timeDomain; } @@ -741,7 +753,8 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out try { TimerSpec spec = (TimerSpec) signature.timerDeclarations().get(timerId).field().get(fn); - return new TimerInternalsTimer(getNamespace(), timerId, spec, stepContext.timerInternals()); + return new TimerInternalsTimer( + window, getNamespace(), allowedLateness, timerId, spec, stepContext.timerInternals()); } catch (IllegalAccessException e) { throw new RuntimeException(e); } @@ -782,12 +795,25 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out private static class TimerInternalsTimer implements Timer { private final TimerInternals timerInternals; + + // The window and the namespace represent the same thing, but the namespace is a cached + // and specially encoded form. Since the namespace can be cached across timers, it is + // passed in whole rather than being computed here. 
+ private final BoundedWindow window; + private final Duration allowedLateness; + private final StateNamespace namespace; private final String timerId; private final TimerSpec spec; - private final StateNamespace namespace; public TimerInternalsTimer( - StateNamespace namespace, String timerId, TimerSpec spec, TimerInternals timerInternals) { + BoundedWindow window, + StateNamespace namespace, + Duration allowedLateness, + String timerId, + TimerSpec spec, + TimerInternals timerInternals) { + this.window = window; + this.allowedLateness = allowedLateness; this.namespace = namespace; this.timerId = timerId; this.spec = spec; @@ -795,9 +821,48 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out } @Override + public void set(Instant target) { + verifyAbsoluteTimeDomain(); + verifyTargetTime(target); + setUnderlyingTimer(target); + } + + @Override public void setForNowPlus(Duration durationFromNow) { - timerInternals.setTimer( - namespace, timerId, getCurrentTime().plus(durationFromNow), spec.getTimeDomain()); + Instant target = getCurrentTime().plus(durationFromNow); + verifyTargetTime(target); + setUnderlyingTimer(target); + } + + /** + * Ensures that the target time is reasonable. For event time timers this means that the + * time should be prior to window GC time. + */ + private void verifyTargetTime(Instant target) { + if (TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) { + Instant windowExpiry = window.maxTimestamp().plus(allowedLateness); + checkArgument(!target.isAfter(windowExpiry), + "Attempted to set event time timer for %s but that is after" + + " the expiration of window %s", target, windowExpiry); + } + } + + /** Verifies that the time domain of this timer is acceptable for absolute timers. */ + private void verifyAbsoluteTimeDomain() { + if (!TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) { + throw new IllegalStateException( + "Can only set relative timers in processing time domain."
+ " Use #setForNowPlus(Duration)"); + } + } + + /** + * Sets the timer for the target time without checking anything about whether it is + * a reasonable thing to do. For example, absolute processing time timers are not + * really sensible since the user has no way to compute a good choice of time. + */ + private void setUnderlyingTimer(Instant target) { + timerInternals.setTimer(namespace, timerId, target, spec.getTimeDomain()); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/a93c5c05/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Timer.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Timer.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Timer.java index 556287d..45a2a66 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Timer.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Timer.java @@ -20,6 +20,7 @@ package org.apache.beam.sdk.util; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.transforms.DoFn; import org.joda.time.Duration; +import org.joda.time.Instant; /** * A timer for a specified time domain that can be set to register the desire for further processing @@ -43,6 +44,16 @@ import org.joda.time.Duration; @Experimental(Experimental.Kind.TIMERS) public interface Timer { /** + * Sets or resets the time in the timer's {@link TimeDomain} at which it should fire. If the timer + * was already set, resets it to the new requested time. + * + * <p>For {@link TimeDomain#PROCESSING_TIME}, the behavior is unpredictable, since processing + * time timers are ignored after a window has expired. Instead, it is recommended to use + * {@link #setForNowPlus(Duration)}. + */ + void set(Instant instant); + + /** * Sets or resets the time relative to the current time in the timer's {@link TimeDomain} at which * this it should fire.
If the timer was already set, resets it to the new requested time. */ http://git-wip-us.apache.org/repos/asf/beam/blob/a93c5c05/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java index 2e3fb85..54aad0c 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java @@ -48,6 +48,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; +import org.apache.beam.sdk.Pipeline.PipelineExecutionException; import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.KvCoder; @@ -1675,6 +1676,98 @@ public class ParDoTest implements Serializable { pipeline.run(); } + /** + * Tests that an event time timer set absolutely for the last possible moment fires and results in + * supplementary output. The test is otherwise identical to {@link #testEventTimeTimerBounded()}. 
+ */ + @Test + @Category({RunnableOnService.class, UsesTimersInParDo.class}) + public void testEventTimeTimerAbsolute() throws Exception { + final String timerId = "foo"; + + DoFn<KV<String, Integer>, Integer> fn = + new DoFn<KV<String, Integer>, Integer>() { + + @TimerId(timerId) + private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME); + + @ProcessElement + public void processElement( + ProcessContext context, @TimerId(timerId) Timer timer, BoundedWindow window) { + timer.set(window.maxTimestamp()); + context.output(3); + } + + @OnTimer(timerId) + public void onTimer(OnTimerContext context) { + context.output(42); + } + }; + + PCollection<Integer> output = pipeline.apply(Create.of(KV.of("hello", 37))).apply(ParDo.of(fn)); + PAssert.that(output).containsInAnyOrder(3, 42); + pipeline.run(); + } + + @Test + @Category({RunnableOnService.class, UsesTimersInParDo.class}) + public void testAbsoluteProcessingTimeTimerRejected() throws Exception { + final String timerId = "foo"; + + DoFn<KV<String, Integer>, Integer> fn = + new DoFn<KV<String, Integer>, Integer>() { + + @TimerId(timerId) + private final TimerSpec spec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME); + + @ProcessElement + public void processElement(ProcessContext context, @TimerId(timerId) Timer timer) { + timer.set(new Instant(0)); + } + + @OnTimer(timerId) + public void onTimer(OnTimerContext context) {} + }; + + PCollection<Integer> output = pipeline.apply(Create.of(KV.of("hello", 37))).apply(ParDo.of(fn)); + thrown.expect(PipelineExecutionException.class); + // Note that runners can reasonably vary their message - this matcher should be flexible + // and can be evolved. 
+ thrown.expectMessage("relative timers"); + thrown.expectMessage("processing time"); + pipeline.run(); + } + + @Test + @Category({RunnableOnService.class, UsesTimersInParDo.class}) + public void testOutOfBoundsEventTimeTimer() throws Exception { + final String timerId = "foo"; + + DoFn<KV<String, Integer>, Integer> fn = + new DoFn<KV<String, Integer>, Integer>() { + + @TimerId(timerId) + private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME); + + @ProcessElement + public void processElement( + ProcessContext context, BoundedWindow window, @TimerId(timerId) Timer timer) { + timer.set(window.maxTimestamp().plus(1L)); + } + + @OnTimer(timerId) + public void onTimer(OnTimerContext context) {} + }; + + PCollection<Integer> output = pipeline.apply(Create.of(KV.of("hello", 37))).apply(ParDo.of(fn)); + thrown.expect(PipelineExecutionException.class); + // Note that runners can reasonably vary their message - this matcher should be flexible + // and can be evolved. + thrown.expectMessage("event time timer"); + thrown.expectMessage("expiration"); + pipeline.run(); + } + @Test @Category({NeedsRunner.class, UsesTimersInParDo.class, UsesTestStream.class}) public void testSimpleProcessingTimerTimer() throws Exception {