Upgrade Java triggers to support runner API deserialization
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5d5602db Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5d5602db Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5d5602db Branch: refs/heads/master Commit: 5d5602dbff41ef48add2ea763527f8c0901f0bc0 Parents: 40c4a5c Author: Kenneth Knowles <k...@google.com> Authored: Sun Feb 12 15:53:17 2017 -0800 Committer: Kenneth Knowles <k...@google.com> Committed: Tue Feb 14 14:55:49 2017 -0800 ---------------------------------------------------------------------- .../AfterSynchronizedProcessingTimeStateMachine.java | 6 +++++- .../beam/runners/core/triggers/TriggerStateMachines.java | 2 +- .../AfterSynchronizedProcessingTimeStateMachineTest.java | 3 ++- .../apache/beam/sdk/transforms/windowing/AfterAll.java | 7 +++++++ .../apache/beam/sdk/transforms/windowing/AfterEach.java | 7 +++++++ .../apache/beam/sdk/transforms/windowing/AfterFirst.java | 7 +++++++ .../windowing/AfterSynchronizedProcessingTime.java | 10 +++++++--- .../windowing/AfterSynchronizedProcessingTimeTest.java | 2 +- 8 files changed, 37 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/5d5602db/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterSynchronizedProcessingTimeStateMachine.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterSynchronizedProcessingTimeStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterSynchronizedProcessingTimeStateMachine.java index 1319a13..07fab22 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterSynchronizedProcessingTimeStateMachine.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterSynchronizedProcessingTimeStateMachine.java @@ -28,13 +28,17 @@ import org.joda.time.Instant; // This should not really have the superclass https://issues.apache.org/jira/browse/BEAM-1486 class AfterSynchronizedProcessingTimeStateMachine extends AfterDelayFromFirstElementStateMachine { + public static AfterSynchronizedProcessingTimeStateMachine ofFirstElement() { + return new AfterSynchronizedProcessingTimeStateMachine(); + } + @Override @Nullable public Instant getCurrentTime(TriggerStateMachine.TriggerContext context) { return context.currentSynchronizedProcessingTime(); } - public AfterSynchronizedProcessingTimeStateMachine() { + private AfterSynchronizedProcessingTimeStateMachine() { super(TimeDomain.SYNCHRONIZED_PROCESSING_TIME, Collections.<SerializableFunction<Instant, Instant>>emptyList()); } http://git-wip-us.apache.org/repos/asf/beam/blob/5d5602db/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachines.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachines.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachines.java index f0e9d21..b13ac40 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachines.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachines.java @@ -116,7 +116,7 @@ public class TriggerStateMachines { } private OnceTriggerStateMachine evaluateSpecific(AfterSynchronizedProcessingTime v) { - return new AfterSynchronizedProcessingTimeStateMachine(); + return AfterSynchronizedProcessingTimeStateMachine.ofFirstElement(); } private OnceTriggerStateMachine evaluateSpecific(AfterFirst v) { http://git-wip-us.apache.org/repos/asf/beam/blob/5d5602db/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterSynchronizedProcessingTimeStateMachineTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterSynchronizedProcessingTimeStateMachineTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterSynchronizedProcessingTimeStateMachineTest.java index 140bd62..7bfd48d 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterSynchronizedProcessingTimeStateMachineTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterSynchronizedProcessingTimeStateMachineTest.java @@ -36,7 +36,8 @@ import org.junit.runners.JUnit4; @RunWith(JUnit4.class) public class AfterSynchronizedProcessingTimeStateMachineTest { - private TriggerStateMachine underTest = new AfterSynchronizedProcessingTimeStateMachine(); + private TriggerStateMachine underTest = + AfterSynchronizedProcessingTimeStateMachine.ofFirstElement(); @Test public void testAfterProcessingTimeWithFixedWindows() throws Exception { http://git-wip-us.apache.org/repos/asf/beam/blob/5d5602db/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java index c3f0848..2747311 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java @@ -44,6 +44,13 @@ public class AfterAll extends OnceTrigger { return new AfterAll(Arrays.<Trigger>asList(triggers)); } + /** + * Returns an {@code AfterAll} {@code Trigger} with the given subtriggers. + */ + public static AfterAll of(List<Trigger> triggers) { + return new AfterAll(triggers); + } + @Override public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { // This trigger will fire after the latest of its sub-triggers. http://git-wip-us.apache.org/repos/asf/beam/blob/5d5602db/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java index 872ad46..56a9d14 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java @@ -57,6 +57,13 @@ public class AfterEach extends Trigger { return new AfterEach(Arrays.<Trigger>asList(triggers)); } + /** + * Returns an {@code AfterEach} {@code Trigger} with the given subtriggers. + */ + public static AfterEach inOrder(List<Trigger> triggers) { + return new AfterEach(triggers); + } + @Override public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { // This trigger will fire at least once when the first trigger in the sequence http://git-wip-us.apache.org/repos/asf/beam/blob/5d5602db/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java index a742b43..79fd639 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java @@ -45,6 +45,13 @@ public class AfterFirst extends OnceTrigger { return new AfterFirst(Arrays.<Trigger>asList(triggers)); } + /** + * Returns an {@code AfterFirst} {@code Trigger} with the given subtriggers. + */ + public static AfterFirst of(List<Trigger> triggers) { + return new AfterFirst(triggers); + } + @Override public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { // This trigger will fire after the earliest of its sub-triggers. http://git-wip-us.apache.org/repos/asf/beam/blob/5d5602db/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java index 6249954..f982699 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java @@ -23,8 +23,8 @@ import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger; import org.joda.time.Instant; /** - * A trigger that fires after synchronized processing time has reached the processing time of the - * first element's arrival. + * <i><b>FOR INTERNAL USE ONLY</b></i>. A trigger that fires after synchronized processing time has + * reached the processing time of the first element's arrival. * * <p>This is for internal, primarily as a "continuation trigger" for {@link AfterProcessingTime} * triggers. In that use, this trigger is ready as soon as all upstream workers processing time @@ -32,7 +32,11 @@ import org.joda.time.Instant; */ public class AfterSynchronizedProcessingTime extends OnceTrigger { - public AfterSynchronizedProcessingTime() { + public static AfterSynchronizedProcessingTime ofFirstElement() { + return new AfterSynchronizedProcessingTime(); + } + + private AfterSynchronizedProcessingTime() { super(null); } http://git-wip-us.apache.org/repos/asf/beam/blob/5d5602db/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTimeTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTimeTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTimeTest.java index 49d44c5..e2cfdd2 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTimeTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTimeTest.java @@ -30,7 +30,7 @@ import org.junit.runners.JUnit4; @RunWith(JUnit4.class) public class AfterSynchronizedProcessingTimeTest { - private Trigger underTest = new AfterSynchronizedProcessingTime(); + private Trigger underTest = AfterSynchronizedProcessingTime.ofFirstElement(); @Test public void testFireDeadline() throws Exception {