Upgrade Java triggers to support runner API deserialization

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5d5602db
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5d5602db
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5d5602db

Branch: refs/heads/master
Commit: 5d5602dbff41ef48add2ea763527f8c0901f0bc0
Parents: 40c4a5c
Author: Kenneth Knowles <k...@google.com>
Authored: Sun Feb 12 15:53:17 2017 -0800
Committer: Kenneth Knowles <k...@google.com>
Committed: Tue Feb 14 14:55:49 2017 -0800

----------------------------------------------------------------------
 .../AfterSynchronizedProcessingTimeStateMachine.java      |  6 +++++-
 .../beam/runners/core/triggers/TriggerStateMachines.java  |  2 +-
 .../AfterSynchronizedProcessingTimeStateMachineTest.java  |  3 ++-
 .../apache/beam/sdk/transforms/windowing/AfterAll.java    |  7 +++++++
 .../apache/beam/sdk/transforms/windowing/AfterEach.java   |  7 +++++++
 .../apache/beam/sdk/transforms/windowing/AfterFirst.java  |  7 +++++++
 .../windowing/AfterSynchronizedProcessingTime.java        | 10 +++++++---
 .../windowing/AfterSynchronizedProcessingTimeTest.java    |  2 +-
 8 files changed, 37 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/5d5602db/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterSynchronizedProcessingTimeStateMachine.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterSynchronizedProcessingTimeStateMachine.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterSynchronizedProcessingTimeStateMachine.java
index 1319a13..07fab22 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterSynchronizedProcessingTimeStateMachine.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterSynchronizedProcessingTimeStateMachine.java
@@ -28,13 +28,17 @@ import org.joda.time.Instant;
 // This should not really have the superclass 
https://issues.apache.org/jira/browse/BEAM-1486
 class AfterSynchronizedProcessingTimeStateMachine extends 
AfterDelayFromFirstElementStateMachine {
 
+  public static AfterSynchronizedProcessingTimeStateMachine ofFirstElement() {
+    return new AfterSynchronizedProcessingTimeStateMachine();
+  }
+
   @Override
   @Nullable
   public Instant getCurrentTime(TriggerStateMachine.TriggerContext context) {
     return context.currentSynchronizedProcessingTime();
   }
 
-  public AfterSynchronizedProcessingTimeStateMachine() {
+  private AfterSynchronizedProcessingTimeStateMachine() {
     super(TimeDomain.SYNCHRONIZED_PROCESSING_TIME,
         Collections.<SerializableFunction<Instant, Instant>>emptyList());
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/5d5602db/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachines.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachines.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachines.java
index f0e9d21..b13ac40 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachines.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachines.java
@@ -116,7 +116,7 @@ public class TriggerStateMachines {
     }
 
     private OnceTriggerStateMachine 
evaluateSpecific(AfterSynchronizedProcessingTime v) {
-      return new AfterSynchronizedProcessingTimeStateMachine();
+      return AfterSynchronizedProcessingTimeStateMachine.ofFirstElement();
     }
 
     private OnceTriggerStateMachine evaluateSpecific(AfterFirst v) {

http://git-wip-us.apache.org/repos/asf/beam/blob/5d5602db/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterSynchronizedProcessingTimeStateMachineTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterSynchronizedProcessingTimeStateMachineTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterSynchronizedProcessingTimeStateMachineTest.java
index 140bd62..7bfd48d 100644
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterSynchronizedProcessingTimeStateMachineTest.java
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterSynchronizedProcessingTimeStateMachineTest.java
@@ -36,7 +36,8 @@ import org.junit.runners.JUnit4;
 @RunWith(JUnit4.class)
 public class AfterSynchronizedProcessingTimeStateMachineTest {
 
-  private TriggerStateMachine underTest = new 
AfterSynchronizedProcessingTimeStateMachine();
+  private TriggerStateMachine underTest =
+      AfterSynchronizedProcessingTimeStateMachine.ofFirstElement();
 
   @Test
   public void testAfterProcessingTimeWithFixedWindows() throws Exception {

http://git-wip-us.apache.org/repos/asf/beam/blob/5d5602db/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java
index c3f0848..2747311 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java
@@ -44,6 +44,13 @@ public class AfterAll extends OnceTrigger {
     return new AfterAll(Arrays.<Trigger>asList(triggers));
   }
 
+  /**
+   * Returns an {@code AfterAll} {@code Trigger} with the given subtriggers.
+   */
+  public static AfterAll of(List<Trigger> triggers) {
+    return new AfterAll(triggers);
+  }
+
   @Override
   public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
     // This trigger will fire after the latest of its sub-triggers.

http://git-wip-us.apache.org/repos/asf/beam/blob/5d5602db/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java
index 872ad46..56a9d14 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java
@@ -57,6 +57,13 @@ public class AfterEach extends Trigger {
     return new AfterEach(Arrays.<Trigger>asList(triggers));
   }
 
+  /**
+   * Returns an {@code AfterEach} {@code Trigger} with the given subtriggers.
+   */
+  public static AfterEach inOrder(List<Trigger> triggers) {
+    return new AfterEach(triggers);
+  }
+
   @Override
   public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
     // This trigger will fire at least once when the first trigger in the 
sequence

http://git-wip-us.apache.org/repos/asf/beam/blob/5d5602db/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java
index a742b43..79fd639 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java
@@ -45,6 +45,13 @@ public class AfterFirst extends OnceTrigger {
     return new AfterFirst(Arrays.<Trigger>asList(triggers));
   }
 
+  /**
+   * Returns an {@code AfterFirst} {@code Trigger} with the given subtriggers.
+   */
+  public static AfterFirst of(List<Trigger> triggers) {
+    return new AfterFirst(triggers);
+  }
+
   @Override
   public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
     // This trigger will fire after the earliest of its sub-triggers.

http://git-wip-us.apache.org/repos/asf/beam/blob/5d5602db/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java
index 6249954..f982699 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java
@@ -23,8 +23,8 @@ import 
org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
 import org.joda.time.Instant;
 
 /**
- * A trigger that fires after synchronized processing time has reached the 
processing time of the
- * first element's arrival.
+ * <i><b>FOR INTERNAL USE ONLY</b></i>. A trigger that fires after 
synchronized processing time has
+ * reached the processing time of the first element's arrival.
  *
  * <p>This is for internal, primarily as a "continuation trigger" for {@link 
AfterProcessingTime}
  * triggers. In that use, this trigger is ready as soon as all upstream 
workers processing time
@@ -32,7 +32,11 @@ import org.joda.time.Instant;
  */
 public class AfterSynchronizedProcessingTime extends OnceTrigger {
 
-  public AfterSynchronizedProcessingTime() {
+  public static AfterSynchronizedProcessingTime ofFirstElement() {
+    return new AfterSynchronizedProcessingTime();
+  }
+
+  private AfterSynchronizedProcessingTime() {
     super(null);
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/5d5602db/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTimeTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTimeTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTimeTest.java
index 49d44c5..e2cfdd2 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTimeTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTimeTest.java
@@ -30,7 +30,7 @@ import org.junit.runners.JUnit4;
 @RunWith(JUnit4.class)
 public class AfterSynchronizedProcessingTimeTest {
 
-  private Trigger underTest = new AfterSynchronizedProcessingTime();
+  private Trigger underTest = AfterSynchronizedProcessingTime.ofFirstElement();
 
   @Test
   public void testFireDeadline() throws Exception {

Reply via email to