Build trigger state machine from Runner API Trigger proto directly
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/27a482ba Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/27a482ba Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/27a482ba Branch: refs/heads/master Commit: 27a482baf498bab8c931670adec5134c0bdf08ac Parents: bd8b72c Author: Kenneth Knowles <k...@google.com> Authored: Fri Feb 17 16:05:13 2017 -0800 Committer: Kenneth Knowles <k...@google.com> Committed: Fri Feb 24 07:16:36 2017 -0800 ---------------------------------------------------------------------- .../GroupAlsoByWindowViaOutputBufferDoFn.java | 4 +- .../core/GroupAlsoByWindowViaWindowSetDoFn.java | 4 +- .../GroupAlsoByWindowViaWindowSetNewDoFn.java | 4 +- .../core/triggers/AfterAllStateMachine.java | 2 +- .../core/triggers/AfterFirstStateMachine.java | 2 +- .../triggers/AfterWatermarkStateMachine.java | 14 +- .../core/triggers/OrFinallyStateMachine.java | 2 +- .../core/triggers/TriggerStateMachine.java | 2 +- .../core/triggers/TriggerStateMachines.java | 272 ++++++------------- .../beam/runners/core/ReduceFnTester.java | 8 +- .../core/triggers/TriggerStateMachinesTest.java | 160 +++++++---- .../GroupAlsoByWindowEvaluatorFactory.java | 4 +- .../translation/SparkGroupAlsoByWindowFn.java | 40 +-- 13 files changed, 243 insertions(+), 275 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/27a482ba/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java index 5b2e051..e3ce1ef 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java @@ -22,6 +22,7 @@ import java.util.List; import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine; import org.apache.beam.runners.core.triggers.TriggerStateMachines; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.Triggers; import org.apache.beam.sdk.util.SystemDoFnInternal; import org.apache.beam.sdk.util.WindowingStrategy; import org.joda.time.Instant; @@ -63,7 +64,8 @@ public class GroupAlsoByWindowViaOutputBufferDoFn<K, InputT, OutputT, W extends key, strategy, ExecutableTriggerStateMachine.create( - TriggerStateMachines.stateMachineForTrigger(strategy.getTrigger())), + TriggerStateMachines.stateMachineForTrigger( + Triggers.toProto(strategy.getTrigger()))), stateInternals, timerInternals, WindowingInternalsAdapters.outputWindowedValue(c.windowingInternals()), http://git-wip-us.apache.org/repos/asf/beam/blob/27a482ba/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java index ac0b1ab..8dc1502 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java @@ -22,6 +22,7 @@ import org.apache.beam.runners.core.triggers.TriggerStateMachines; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.Triggers; import org.apache.beam.sdk.util.SystemDoFnInternal; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.KV; @@ -77,7 +78,8 @@ public class GroupAlsoByWindowViaWindowSetDoFn< key, windowingStrategy, ExecutableTriggerStateMachine.create( - TriggerStateMachines.stateMachineForTrigger(windowingStrategy.getTrigger())), + TriggerStateMachines.stateMachineForTrigger( + Triggers.toProto(windowingStrategy.getTrigger()))), stateInternals, timerInternals, WindowingInternalsAdapters.outputWindowedValue(c.windowingInternals()), http://git-wip-us.apache.org/repos/asf/beam/blob/27a482ba/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java index 7b65a0b..444f8fe 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java @@ -25,6 +25,7 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.transforms.windowing.Triggers; import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.SystemDoFnInternal; import org.apache.beam.sdk.util.WindowedValue; @@ -128,7 +129,8 @@ public class GroupAlsoByWindowViaWindowSetNewDoFn< key, windowingStrategy, ExecutableTriggerStateMachine.create( - TriggerStateMachines.stateMachineForTrigger(windowingStrategy.getTrigger())), + TriggerStateMachines.stateMachineForTrigger( + Triggers.toProto(windowingStrategy.getTrigger()))), stateInternals, timerInternals, outputWindowedValue(), http://git-wip-us.apache.org/repos/asf/beam/blob/27a482ba/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterAllStateMachine.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterAllStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterAllStateMachine.java index 12cbc3d..0f0c17c 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterAllStateMachine.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterAllStateMachine.java @@ -42,7 +42,7 @@ public class AfterAllStateMachine extends OnceTriggerStateMachine { * Returns an {@code AfterAll} {@code Trigger} with the given subtriggers. */ @SafeVarargs - public static OnceTriggerStateMachine of(OnceTriggerStateMachine... triggers) { + public static OnceTriggerStateMachine of(TriggerStateMachine... triggers) { return new AfterAllStateMachine(Arrays.<TriggerStateMachine>asList(triggers)); } http://git-wip-us.apache.org/repos/asf/beam/blob/27a482ba/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachine.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachine.java index f4b305e..840a65c 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachine.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachine.java @@ -43,7 +43,7 @@ public class AfterFirstStateMachine extends OnceTriggerStateMachine { */ @SafeVarargs public static OnceTriggerStateMachine of( - OnceTriggerStateMachine... triggers) { + TriggerStateMachine... triggers) { return new AfterFirstStateMachine(Arrays.<TriggerStateMachine>asList(triggers)); } http://git-wip-us.apache.org/repos/asf/beam/blob/27a482ba/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java index e83c2f8..0b12005 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java @@ -78,13 +78,13 @@ public class AfterWatermarkStateMachine { private static final int EARLY_INDEX = 0; private static final int LATE_INDEX = 1; - private final OnceTriggerStateMachine earlyTrigger; + private final TriggerStateMachine earlyTrigger; @Nullable - private final OnceTriggerStateMachine lateTrigger; + private final TriggerStateMachine lateTrigger; @SuppressWarnings("unchecked") private AfterWatermarkEarlyAndLate( - OnceTriggerStateMachine earlyTrigger, OnceTriggerStateMachine lateTrigger) { + TriggerStateMachine earlyTrigger, TriggerStateMachine lateTrigger) { super( lateTrigger == null ? ImmutableList.<TriggerStateMachine>of(earlyTrigger) @@ -93,11 +93,11 @@ public class AfterWatermarkStateMachine { this.lateTrigger = lateTrigger; } - public AfterWatermarkEarlyAndLate withEarlyFirings(OnceTriggerStateMachine earlyTrigger) { + public AfterWatermarkEarlyAndLate withEarlyFirings(TriggerStateMachine earlyTrigger) { return new AfterWatermarkEarlyAndLate(earlyTrigger, lateTrigger); } - public AfterWatermarkEarlyAndLate withLateFirings(OnceTriggerStateMachine lateTrigger) { + public AfterWatermarkEarlyAndLate withLateFirings(TriggerStateMachine lateTrigger) { return new AfterWatermarkEarlyAndLate(earlyTrigger, lateTrigger); } @@ -254,7 +254,7 @@ public class AfterWatermarkStateMachine { * Creates a new {@code Trigger} like the this, except that it fires repeatedly whenever * the given {@code Trigger} fires before the watermark has passed the end of the window. */ - public AfterWatermarkEarlyAndLate withEarlyFirings(OnceTriggerStateMachine earlyFirings) { + public AfterWatermarkEarlyAndLate withEarlyFirings(TriggerStateMachine earlyFirings) { checkNotNull(earlyFirings, "Must specify the trigger to use for early firings"); return new AfterWatermarkEarlyAndLate(earlyFirings, null); } @@ -263,7 +263,7 @@ public class AfterWatermarkStateMachine { * Creates a new {@code Trigger} like the this, except that it fires repeatedly whenever * the given {@code Trigger} fires after the watermark has passed the end of the window. */ - public AfterWatermarkEarlyAndLate withLateFirings(OnceTriggerStateMachine lateFirings) { + public AfterWatermarkEarlyAndLate withLateFirings(TriggerStateMachine lateFirings) { checkNotNull(lateFirings, "Must specify the trigger to use for late firings"); return new AfterWatermarkEarlyAndLate(NeverStateMachine.ever(), lateFirings); } http://git-wip-us.apache.org/repos/asf/beam/blob/27a482ba/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/OrFinallyStateMachine.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/OrFinallyStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/OrFinallyStateMachine.java index f9ec5e7..e3bfb4f 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/OrFinallyStateMachine.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/OrFinallyStateMachine.java @@ -29,7 +29,7 @@ class OrFinallyStateMachine extends TriggerStateMachine { private static final int UNTIL = 1; @VisibleForTesting - OrFinallyStateMachine(TriggerStateMachine actual, OnceTriggerStateMachine until) { + OrFinallyStateMachine(TriggerStateMachine actual, TriggerStateMachine until) { super(Arrays.asList(actual, until)); } http://git-wip-us.apache.org/repos/asf/beam/blob/27a482ba/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachine.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachine.java index 8b8d737..d622ac9 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachine.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachine.java @@ -456,7 +456,7 @@ public abstract class TriggerStateMachine implements Serializable { * <p>Note that if {@code t1} is {@link OnceTriggerStateMachine}, then {@code t1.orFinally(t2)} is * the same as {@code AfterFirst.of(t1, t2)}. */ - public TriggerStateMachine orFinally(OnceTriggerStateMachine until) { + public TriggerStateMachine orFinally(TriggerStateMachine until) { return new OrFinallyStateMachine(this, until); } http://git-wip-us.apache.org/repos/asf/beam/blob/27a482ba/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachines.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachines.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachines.java index b13ac40..1088435 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachines.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachines.java @@ -17,32 +17,11 @@ */ package org.apache.beam.runners.core.triggers; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; -import com.google.common.collect.FluentIterable; -import com.google.common.collect.Lists; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; +import java.util.ArrayList; import java.util.List; -import javax.annotation.Nonnull; -import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine; -import org.apache.beam.sdk.transforms.SerializableFunction; -import org.apache.beam.sdk.transforms.windowing.AfterAll; -import org.apache.beam.sdk.transforms.windowing.AfterEach; -import org.apache.beam.sdk.transforms.windowing.AfterFirst; -import org.apache.beam.sdk.transforms.windowing.AfterPane; -import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; -import org.apache.beam.sdk.transforms.windowing.AfterSynchronizedProcessingTime; -import org.apache.beam.sdk.transforms.windowing.AfterWatermark; -import org.apache.beam.sdk.transforms.windowing.DefaultTrigger; -import org.apache.beam.sdk.transforms.windowing.Never.NeverTrigger; -import org.apache.beam.sdk.transforms.windowing.OrFinallyTrigger; -import org.apache.beam.sdk.transforms.windowing.Repeatedly; -import org.apache.beam.sdk.transforms.windowing.TimestampTransform; +import org.apache.beam.sdk.common.runner.v1.RunnerApi; import org.apache.beam.sdk.transforms.windowing.Trigger; -import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger; -import org.apache.beam.sdk.util.ReshuffleTrigger; -import org.apache.beam.sdk.util.TimeDomain; +import org.joda.time.Duration; import org.joda.time.Instant; /** Translates a {@link Trigger} to a {@link TriggerStateMachine}. */ @@ -50,187 +29,96 @@ public class TriggerStateMachines { private TriggerStateMachines() {} - @VisibleForTesting static final StateMachineConverter CONVERTER = new StateMachineConverter(); - - public static TriggerStateMachine stateMachineForTrigger(Trigger trigger) { - return CONVERTER.evaluateTrigger(trigger); - } - - public static OnceTriggerStateMachine stateMachineForOnceTrigger(OnceTrigger trigger) { - return CONVERTER.evaluateOnceTrigger(trigger); - } - - @VisibleForTesting - static class StateMachineConverter { - - public TriggerStateMachine evaluateTrigger(Trigger trigger) { - Method evaluationMethod = getEvaluationMethod(trigger.getClass()); - return tryEvaluate(evaluationMethod, trigger); - } - - public OnceTriggerStateMachine evaluateOnceTrigger(OnceTrigger trigger) { - Method evaluationMethod = getEvaluationMethod(trigger.getClass()); - return (OnceTriggerStateMachine) tryEvaluate(evaluationMethod, trigger); - } - - private TriggerStateMachine tryEvaluate(Method evaluationMethod, Trigger trigger) { - try { - return (TriggerStateMachine) evaluationMethod.invoke(this, trigger); - } catch (InvocationTargetException exc) { - if (exc.getCause() instanceof RuntimeException) { - throw (RuntimeException) exc.getCause(); - } else { - throw new RuntimeException(exc.getCause()); - } - } catch (IllegalAccessException exc) { - throw new IllegalStateException( - String.format("Internal error: could not invoke %s", evaluationMethod)); - } - } - - private Method getEvaluationMethod(Class<?> clazz) { - try { - return getClass().getDeclaredMethod("evaluateSpecific", clazz); - } catch (NoSuchMethodException exc) { - throw new UnsupportedOperationException( - String.format( - "Cannot translate trigger class %s to a state machine.", clazz.getCanonicalName()), - exc); - } - } - - private TriggerStateMachine evaluateSpecific(DefaultTrigger v) { - return DefaultTriggerStateMachine.of(); - } - - private TriggerStateMachine evaluateSpecific(ReshuffleTrigger v) { - return new ReshuffleTriggerStateMachine(); + public static TriggerStateMachine stateMachineForTrigger(RunnerApi.Trigger trigger) { + switch (trigger.getTriggerCase()) { + case AFTER_ALL: + return AfterAllStateMachine.of( + stateMachinesForTriggers(trigger.getAfterAll().getSubtriggersList())); + case AFTER_ANY: + return AfterFirstStateMachine.of( + stateMachinesForTriggers(trigger.getAfterAny().getSubtriggersList())); + case AFTER_END_OF_WIDOW: + return stateMachineForAfterEndOfWindow(trigger.getAfterEndOfWidow()); + case ELEMENT_COUNT: + return AfterPaneStateMachine.elementCountAtLeast( + trigger.getElementCount().getElementCount()); + case AFTER_SYNCHRONIZED_PROCESSING_TIME: + return AfterSynchronizedProcessingTimeStateMachine.ofFirstElement(); + case DEFAULT: + return DefaultTriggerStateMachine.of(); + case NEVER: + return NeverStateMachine.ever(); + case ALWAYS: + return ReshuffleTriggerStateMachine.create(); + case OR_FINALLY: + return stateMachineForTrigger(trigger.getOrFinally().getMain()) + .orFinally(stateMachineForTrigger(trigger.getOrFinally().getFinally())); + case REPEAT: + return RepeatedlyStateMachine.forever( + stateMachineForTrigger(trigger.getRepeat().getSubtrigger())); + case AFTER_EACH: + return AfterEachStateMachine.inOrder( + stateMachinesForTriggers(trigger.getAfterEach().getSubtriggersList())); + case AFTER_PROCESSING_TIME: + return stateMachineForAfterProcessingTime(trigger.getAfterProcessingTime()); + case TRIGGER_NOT_SET: + throw new IllegalArgumentException( + String.format("Required field 'trigger' not set on %s", trigger)); + default: + throw new IllegalArgumentException(String.format("Unknown trigger type %s", trigger)); } + } - private OnceTriggerStateMachine evaluateSpecific(AfterWatermark.FromEndOfWindow v) { + private static TriggerStateMachine stateMachineForAfterEndOfWindow( + RunnerApi.Trigger.AfterEndOfWindow trigger) { + if (!trigger.hasEarlyFirings() && !trigger.hasLateFirings()) { return AfterWatermarkStateMachine.pastEndOfWindow(); - } - - private OnceTriggerStateMachine evaluateSpecific(NeverTrigger v) { - return NeverStateMachine.ever(); - } - - private OnceTriggerStateMachine evaluateSpecific(AfterSynchronizedProcessingTime v) { - return AfterSynchronizedProcessingTimeStateMachine.ofFirstElement(); - } - - private OnceTriggerStateMachine evaluateSpecific(AfterFirst v) { - List<OnceTriggerStateMachine> subStateMachines = - Lists.newArrayListWithCapacity(v.subTriggers().size()); - for (Trigger subtrigger : v.subTriggers()) { - subStateMachines.add(stateMachineForOnceTrigger((OnceTrigger) subtrigger)); - } - return AfterFirstStateMachine.of(subStateMachines); - } - - private OnceTriggerStateMachine evaluateSpecific(AfterAll v) { - List<OnceTriggerStateMachine> subStateMachines = - Lists.newArrayListWithCapacity(v.subTriggers().size()); - for (Trigger subtrigger : v.subTriggers()) { - subStateMachines.add(stateMachineForOnceTrigger((OnceTrigger) subtrigger)); - } - return AfterAllStateMachine.of(subStateMachines); - } - - private OnceTriggerStateMachine evaluateSpecific(AfterPane v) { - return AfterPaneStateMachine.elementCountAtLeast(v.getElementCount()); - } - - private TriggerStateMachine evaluateSpecific(AfterWatermark.AfterWatermarkEarlyAndLate v) { + } else { AfterWatermarkStateMachine.AfterWatermarkEarlyAndLate machine = AfterWatermarkStateMachine.pastEndOfWindow() - .withEarlyFirings(stateMachineForOnceTrigger(v.getEarlyTrigger())); + .withEarlyFirings(stateMachineForTrigger(trigger.getEarlyFirings())); - if (v.getLateTrigger() != null) { - machine = machine.withLateFirings(stateMachineForOnceTrigger(v.getLateTrigger())); + if (trigger.hasLateFirings()) { + machine = machine.withLateFirings(stateMachineForTrigger(trigger.getLateFirings())); } return machine; } + } - private TriggerStateMachine evaluateSpecific(AfterEach v) { - List<TriggerStateMachine> subStateMachines = - Lists.newArrayListWithCapacity(v.subTriggers().size()); - - for (Trigger subtrigger : v.subTriggers()) { - subStateMachines.add(stateMachineForTrigger(subtrigger)); + private static TriggerStateMachine stateMachineForAfterProcessingTime( + RunnerApi.Trigger.AfterProcessingTime trigger) { + AfterDelayFromFirstElementStateMachine stateMachine = + AfterProcessingTimeStateMachine.pastFirstElementInPane(); + for (RunnerApi.TimestampTransform transform : trigger.getTimestampTransformsList()) { + switch (transform.getTimestampTransformCase()) { + case ALIGN_TO: + stateMachine = + stateMachine.alignedTo( + Duration.millis(transform.getAlignTo().getPeriod()), + new Instant(transform.getAlignTo().getOffset())); + break; + case DELAY: + stateMachine = + stateMachine.plusDelayOf(Duration.millis(transform.getDelay().getDelayMillis())); + break; + case TIMESTAMPTRANSFORM_NOT_SET: + throw new IllegalArgumentException( + String.format("Required field 'timestamp_transform' not set in %s", transform)); + default: + throw new IllegalArgumentException( + String.format( + "Unknown timestamp transform case: %s", transform.getTimestampTransformCase())); } - - return AfterEachStateMachine.inOrder(subStateMachines); - } - - private TriggerStateMachine evaluateSpecific(Repeatedly v) { - return RepeatedlyStateMachine.forever(stateMachineForTrigger(v.getRepeatedTrigger())); - } - - private TriggerStateMachine evaluateSpecific(OrFinallyTrigger v) { - return new OrFinallyStateMachine( - stateMachineForTrigger(v.getMainTrigger()), - stateMachineForOnceTrigger(v.getUntilTrigger())); - } - - private OnceTriggerStateMachine evaluateSpecific(AfterProcessingTime v) { - return new AfterDelayFromFirstElementStateMachineAdapter(v); } + return stateMachine; + } - private static class AfterDelayFromFirstElementStateMachineAdapter - extends AfterDelayFromFirstElementStateMachine { - - private static final Function<TimestampTransform, SerializableFunction<Instant, Instant>> - CONVERT_TIMESTAMP_TRANSFORM = - new Function<TimestampTransform, SerializableFunction<Instant, Instant>>() { - @Override - public SerializableFunction<Instant, Instant> apply( - @Nonnull TimestampTransform transform) { - if (transform instanceof TimestampTransform.Delay) { - return new DelayFn(((TimestampTransform.Delay) transform).getDelay()); - } else if (transform instanceof TimestampTransform.AlignTo) { - TimestampTransform.AlignTo alignTo = (TimestampTransform.AlignTo) transform; - return new AlignFn(alignTo.getPeriod(), alignTo.getOffset()); - } else { - throw new IllegalArgumentException( - String.format( - "Unknown %s: %s", TimestampTransform.class.getSimpleName(), transform)); - } - } - }; - - public AfterDelayFromFirstElementStateMachineAdapter(AfterProcessingTime v) { - this( - TimeDomain.PROCESSING_TIME, - FluentIterable.from(v.getTimestampTransforms()) - .transform(CONVERT_TIMESTAMP_TRANSFORM) - .toList()); - } - - private AfterDelayFromFirstElementStateMachineAdapter( - TimeDomain timeDomain, List<SerializableFunction<Instant, Instant>> timestampMappers) { - super(timeDomain, timestampMappers); - } - - @Override - public Instant getCurrentTime(TriggerContext context) { - switch (timeDomain) { - case PROCESSING_TIME: - return context.currentProcessingTime(); - case SYNCHRONIZED_PROCESSING_TIME: - return context.currentSynchronizedProcessingTime(); - case EVENT_TIME: - return context.currentEventTime(); - default: - throw new IllegalArgumentException("A time domain that doesn't exist was received!"); - } - } - - @Override - protected AfterDelayFromFirstElementStateMachine newWith( - List<SerializableFunction<Instant, Instant>> transform) { - return new AfterDelayFromFirstElementStateMachineAdapter(timeDomain, transform); - } + private static List<TriggerStateMachine> stateMachinesForTriggers( + List<RunnerApi.Trigger> triggers) { + List<TriggerStateMachine> stateMachines = new ArrayList<>(triggers.size()); + for (RunnerApi.Trigger trigger : triggers) { + stateMachines.add(stateMachineForTrigger(trigger)); } + return stateMachines; } } http://git-wip-us.apache.org/repos/asf/beam/blob/27a482ba/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java index dab2bf9..d18a1c3 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java @@ -60,6 +60,7 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.Trigger; +import org.apache.beam.sdk.transforms.windowing.Triggers; import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.AppliedCombineFn; @@ -122,7 +123,8 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> { nonCombining(WindowingStrategy<?, W> windowingStrategy) throws Exception { return new ReduceFnTester<Integer, Iterable<Integer>, W>( windowingStrategy, - TriggerStateMachines.stateMachineForTrigger(windowingStrategy.getTrigger()), + TriggerStateMachines.stateMachineForTrigger( + Triggers.toProto(windowingStrategy.getTrigger())), SystemReduceFn.<String, Integer, W>buffering(VarIntCoder.of()), IterableCoder.of(VarIntCoder.of()), PipelineOptionsFactory.create(), @@ -186,7 +188,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> { return combining( strategy, - TriggerStateMachines.stateMachineForTrigger(strategy.getTrigger()), + TriggerStateMachines.stateMachineForTrigger(Triggers.toProto(strategy.getTrigger())), combineFn, outputCoder); } @@ -236,7 +238,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> { return combining( strategy, - TriggerStateMachines.stateMachineForTrigger(strategy.getTrigger()), + TriggerStateMachines.stateMachineForTrigger(Triggers.toProto(strategy.getTrigger())), combineFn, outputCoder, options, http://git-wip-us.apache.org/repos/asf/beam/blob/27a482ba/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachinesTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachinesTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachinesTest.java index 26c0597..497a3c2 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachinesTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachinesTest.java @@ -19,21 +19,10 @@ package org.apache.beam.runners.core.triggers; import static com.google.common.base.Preconditions.checkNotNull; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; import static org.junit.Assert.assertThat; -import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine; -import org.apache.beam.sdk.transforms.windowing.AfterAll; -import org.apache.beam.sdk.transforms.windowing.AfterEach; -import org.apache.beam.sdk.transforms.windowing.AfterFirst; -import org.apache.beam.sdk.transforms.windowing.AfterPane; -import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; -import org.apache.beam.sdk.transforms.windowing.AfterWatermark; -import org.apache.beam.sdk.transforms.windowing.DefaultTrigger; -import org.apache.beam.sdk.transforms.windowing.Never; -import org.apache.beam.sdk.transforms.windowing.Never.NeverTrigger; -import org.apache.beam.sdk.transforms.windowing.OrFinallyTrigger; -import org.apache.beam.sdk.transforms.windowing.Repeatedly; -import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger; +import org.apache.beam.sdk.common.runner.v1.RunnerApi; import org.apache.beam.sdk.util.TimeDomain; import org.joda.time.Duration; import org.junit.Test; @@ -51,49 +40,79 @@ public class TriggerStateMachinesTest { @Test public void testStateMachineForAfterPane() { int count = 37; - AfterPane trigger = AfterPane.elementCountAtLeast(count); + RunnerApi.Trigger trigger = + RunnerApi.Trigger.newBuilder() + .setElementCount(RunnerApi.Trigger.ElementCount.newBuilder().setElementCount(count)) + .build(); + AfterPaneStateMachine machine = - (AfterPaneStateMachine) TriggerStateMachines.stateMachineForOnceTrigger(trigger); + (AfterPaneStateMachine) TriggerStateMachines.stateMachineForTrigger(trigger); - assertThat(machine.getElementCount(), equalTo(trigger.getElementCount())); + assertThat(machine.getElementCount(), equalTo(trigger.getElementCount().getElementCount())); } + // TODO: make these all build the proto @Test public void testStateMachineForAfterProcessingTime() { Duration minutes = Duration.standardMinutes(94); Duration hours = Duration.standardHours(13); - AfterProcessingTime trigger = - AfterProcessingTime.pastFirstElementInPane().plusDelayOf(minutes).alignedTo(hours); + RunnerApi.Trigger trigger = + RunnerApi.Trigger.newBuilder() + .setAfterProcessingTime( + RunnerApi.Trigger.AfterProcessingTime.newBuilder() + .addTimestampTransforms( + RunnerApi.TimestampTransform.newBuilder() + .setDelay( + RunnerApi.TimestampTransform.Delay.newBuilder() + .setDelayMillis(minutes.getMillis()))) + .addTimestampTransforms( + RunnerApi.TimestampTransform.newBuilder() + .setAlignTo( + RunnerApi.TimestampTransform.AlignTo.newBuilder() + .setPeriod(hours.getMillis())))) + .build(); AfterDelayFromFirstElementStateMachine machine = (AfterDelayFromFirstElementStateMachine) - TriggerStateMachines.stateMachineForOnceTrigger(trigger); + TriggerStateMachines.stateMachineForTrigger(trigger); assertThat(machine.getTimeDomain(), equalTo(TimeDomain.PROCESSING_TIME)); } @Test public void testStateMachineForAfterWatermark() { - AfterWatermark.FromEndOfWindow trigger = AfterWatermark.pastEndOfWindow(); + RunnerApi.Trigger trigger = + RunnerApi.Trigger.newBuilder() + .setAfterEndOfWidow(RunnerApi.Trigger.AfterEndOfWindow.getDefaultInstance()) + .build(); AfterWatermarkStateMachine.FromEndOfWindow machine = (AfterWatermarkStateMachine.FromEndOfWindow) - TriggerStateMachines.stateMachineForOnceTrigger(trigger); - // No parameters, so if it doesn't crash, we win! + TriggerStateMachines.stateMachineForTrigger(trigger); + + assertThat( + TriggerStateMachines.stateMachineForTrigger(trigger), + instanceOf(AfterWatermarkStateMachine.FromEndOfWindow.class)); } @Test public void testDefaultTriggerTranslation() { - DefaultTrigger trigger = DefaultTrigger.of(); - DefaultTriggerStateMachine machine = - (DefaultTriggerStateMachine) - checkNotNull(TriggerStateMachines.stateMachineForTrigger(trigger)); - // No parameters, so if it doesn't crash, we win! + RunnerApi.Trigger trigger = + RunnerApi.Trigger.newBuilder() + .setDefault(RunnerApi.Trigger.Default.getDefaultInstance()) + .build(); + + assertThat( + TriggerStateMachines.stateMachineForTrigger(trigger), + instanceOf(DefaultTriggerStateMachine.class)); } @Test public void testNeverTranslation() { - NeverTrigger trigger = Never.ever(); + RunnerApi.Trigger trigger = + RunnerApi.Trigger.newBuilder() + .setNever(RunnerApi.Trigger.Never.getDefaultInstance()) + .build(); NeverStateMachine machine = (NeverStateMachine) checkNotNull(TriggerStateMachines.stateMachineForTrigger(trigger)); // No parameters, so if it doesn't crash, we win! @@ -109,18 +128,35 @@ public class TriggerStateMachinesTest { private static final int ELEM_COUNT = 472; private static final Duration DELAY = Duration.standardSeconds(95673); - private final OnceTrigger subtrigger1 = AfterPane.elementCountAtLeast(ELEM_COUNT); - private final OnceTrigger subtrigger2 = - AfterProcessingTime.pastFirstElementInPane().plusDelayOf(DELAY); + private final RunnerApi.Trigger subtrigger1 = + RunnerApi.Trigger.newBuilder() + .setElementCount(RunnerApi.Trigger.ElementCount.newBuilder().setElementCount(ELEM_COUNT)) + .build(); + private final RunnerApi.Trigger subtrigger2 = + RunnerApi.Trigger.newBuilder() + .setAfterProcessingTime( + RunnerApi.Trigger.AfterProcessingTime.newBuilder() + .addTimestampTransforms( + RunnerApi.TimestampTransform.newBuilder() + .setDelay( + RunnerApi.TimestampTransform.Delay.newBuilder() + .setDelayMillis(DELAY.getMillis())))) + .build(); - private final OnceTriggerStateMachine submachine1 = - TriggerStateMachines.stateMachineForOnceTrigger(subtrigger1); - private final OnceTriggerStateMachine submachine2 = - TriggerStateMachines.stateMachineForOnceTrigger(subtrigger2); + private final TriggerStateMachine submachine1 = + TriggerStateMachines.stateMachineForTrigger(subtrigger1); + private final TriggerStateMachine submachine2 = + TriggerStateMachines.stateMachineForTrigger(subtrigger2); @Test public void testAfterEachTranslation() { - AfterEach trigger = AfterEach.inOrder(subtrigger1, subtrigger2); + RunnerApi.Trigger trigger = + RunnerApi.Trigger.newBuilder() + .setAfterEach( + RunnerApi.Trigger.AfterEach.newBuilder() + .addSubtriggers(subtrigger1) + .addSubtriggers(subtrigger2)) + .build(); AfterEachStateMachine machine = (AfterEachStateMachine) TriggerStateMachines.stateMachineForTrigger(trigger); @@ -129,7 +165,13 @@ public class TriggerStateMachinesTest { @Test public void testAfterFirstTranslation() { - AfterFirst trigger = AfterFirst.of(subtrigger1, subtrigger2); + RunnerApi.Trigger trigger = + RunnerApi.Trigger.newBuilder() + .setAfterAny( + RunnerApi.Trigger.AfterAny.newBuilder() + .addSubtriggers(subtrigger1) + .addSubtriggers(subtrigger2)) + .build(); AfterFirstStateMachine machine = (AfterFirstStateMachine) TriggerStateMachines.stateMachineForTrigger(trigger); @@ -138,7 +180,13 @@ public class TriggerStateMachinesTest { @Test public void testAfterAllTranslation() { - AfterAll trigger = AfterAll.of(subtrigger1, subtrigger2); + RunnerApi.Trigger trigger = + RunnerApi.Trigger.newBuilder() + .setAfterAll( + RunnerApi.Trigger.AfterAll.newBuilder() + .addSubtriggers(subtrigger1) + .addSubtriggers(subtrigger2)) + .build(); AfterAllStateMachine machine = (AfterAllStateMachine) TriggerStateMachines.stateMachineForTrigger(trigger); @@ -147,8 +195,11 @@ public class TriggerStateMachinesTest { @Test public void testAfterWatermarkEarlyTranslation() { - AfterWatermark.AfterWatermarkEarlyAndLate trigger = - AfterWatermark.pastEndOfWindow().withEarlyFirings(subtrigger1); + RunnerApi.Trigger trigger = + RunnerApi.Trigger.newBuilder() + .setAfterEndOfWidow( + RunnerApi.Trigger.AfterEndOfWindow.newBuilder().setEarlyFirings(subtrigger1)) + .build(); AfterWatermarkStateMachine.AfterWatermarkEarlyAndLate machine = (AfterWatermarkStateMachine.AfterWatermarkEarlyAndLate) TriggerStateMachines.stateMachineForTrigger(trigger); @@ -160,8 +211,13 @@ public class TriggerStateMachinesTest { @Test public void testAfterWatermarkEarlyLateTranslation() { - AfterWatermark.AfterWatermarkEarlyAndLate trigger = - AfterWatermark.pastEndOfWindow().withEarlyFirings(subtrigger1).withLateFirings(subtrigger2); + RunnerApi.Trigger trigger = + RunnerApi.Trigger.newBuilder() + .setAfterEndOfWidow( + RunnerApi.Trigger.AfterEndOfWindow.newBuilder() + .setEarlyFirings(subtrigger1) + .setLateFirings(subtrigger2)) + .build(); AfterWatermarkStateMachine.AfterWatermarkEarlyAndLate machine = (AfterWatermarkStateMachine.AfterWatermarkEarlyAndLate) TriggerStateMachines.stateMachineForTrigger(trigger); @@ -176,18 +232,30 @@ public class TriggerStateMachinesTest { @Test public void testOrFinallyTranslation() { - OrFinallyTrigger trigger = subtrigger1.orFinally(subtrigger2); + RunnerApi.Trigger trigger = + RunnerApi.Trigger.newBuilder() + .setOrFinally( + RunnerApi.Trigger.OrFinally.newBuilder() + .setMain(subtrigger1) + .setFinally(subtrigger2)) + .build(); OrFinallyStateMachine machine = - (OrFinallyStateMachine) TriggerStateMachines.stateMachineForTrigger(trigger); + (OrFinallyStateMachine) + TriggerStateMachines.stateMachineForTrigger(trigger); assertThat(machine, equalTo(submachine1.orFinally(submachine2))); } @Test public void testRepeatedlyTranslation() { - Repeatedly trigger = Repeatedly.forever(subtrigger1); + RunnerApi.Trigger trigger = + RunnerApi.Trigger.newBuilder() + .setRepeat( + RunnerApi.Trigger.Repeat.newBuilder() + .setSubtrigger(subtrigger1)).build(); RepeatedlyStateMachine machine = - (RepeatedlyStateMachine) TriggerStateMachines.stateMachineForTrigger(trigger); + (RepeatedlyStateMachine) + TriggerStateMachines.stateMachineForTrigger(trigger); assertThat(machine, equalTo(RepeatedlyStateMachine.forever(submachine1))); } http://git-wip-us.apache.org/repos/asf/beam/blob/27a482ba/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java index ecf4ecd..dc0ac60 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java @@ -46,6 +46,7 @@ import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.transforms.windowing.Triggers; import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.WindowTracing; import org.apache.beam.sdk.util.WindowedValue; @@ -172,7 +173,8 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory { key, windowingStrategy, ExecutableTriggerStateMachine.create( - TriggerStateMachines.stateMachineForTrigger(windowingStrategy.getTrigger())), + TriggerStateMachines.stateMachineForTrigger( + Triggers.toProto(windowingStrategy.getTrigger()))), stateInternals, timerInternals, new OutputWindowedValueToBundle<>(bundle), http://git-wip-us.apache.org/repos/asf/beam/blob/27a482ba/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java index 44d5b7c..bd37fdb 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java @@ -37,6 +37,7 @@ import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.transforms.windowing.Triggers; import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; @@ -101,29 +102,30 @@ public class SparkGroupAlsoByWindowFn<K, InputT, W extends BoundedWindow> key, windowingStrategy, ExecutableTriggerStateMachine.create( - TriggerStateMachines.stateMachineForTrigger(windowingStrategy.getTrigger())), + TriggerStateMachines.stateMachineForTrigger( + Triggers.toProto(windowingStrategy.getTrigger()))), stateInternals, timerInternals, outputter, new SideInputReader() { - @Override - public <T> T get(PCollectionView<T> view, BoundedWindow sideInputWindow) { - throw new UnsupportedOperationException( - "GroupAlsoByWindow must not have side inputs"); - } - - @Override - public <T> boolean contains(PCollectionView<T> view) { - throw new UnsupportedOperationException( - "GroupAlsoByWindow must not have side inputs"); - } - - @Override - public boolean isEmpty() { - throw new UnsupportedOperationException( - "GroupAlsoByWindow must not have side inputs"); - } - }, + @Override + public <T> T get(PCollectionView<T> view, BoundedWindow sideInputWindow) { + throw new UnsupportedOperationException( + "GroupAlsoByWindow must not have side inputs"); + } + + @Override + public <T> boolean contains(PCollectionView<T> view) { + throw new UnsupportedOperationException( + "GroupAlsoByWindow must not have side inputs"); + } + + @Override + public boolean isEmpty() { + throw new UnsupportedOperationException( + "GroupAlsoByWindow must not have side inputs"); + } + }, droppedDueToClosedWindow, reduceFn, runtimeContext.getPipelineOptions());