Translate ReshuffleTrigger into Always trigger proto

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/bfdbebdd
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/bfdbebdd
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/bfdbebdd

Branch: refs/heads/master
Commit: bfdbebdd6ecdc29d9eb12590399ebae45466a038
Parents: 9dbaeaf
Author: Kenneth Knowles <k...@google.com>
Authored: Sat Feb 18 19:19:33 2017 -0800
Committer: Kenneth Knowles <k...@google.com>
Committed: Fri Feb 24 07:16:35 2017 -0800

----------------------------------------------------------------------
 sdks/common/runner-api/src/main/proto/beam_runner_api.proto   | 6 ++++++
 .../org/apache/beam/sdk/transforms/windowing/Triggers.java    | 7 +++++++
 2 files changed, 13 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/bfdbebdd/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
----------------------------------------------------------------------
diff --git a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
index ce089f5..c030e73 100644
--- a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
+++ b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
@@ -540,6 +540,11 @@ message Trigger {
   message Never {
   }
 
+  // Always ready. This can also be expressed as ElementCount(1) but
+  // is more explicit.
+  message Always {
+  }
+
   // Ready whenever either of its subtriggers are ready, but finishes output
   // when the finally subtrigger fires.
   message OrFinally {
@@ -566,6 +571,7 @@ message Trigger {
     AfterEndOfWindow after_end_of_widow = 4;
     AfterProcessingTime after_processing_time = 5;
     AfterSynchronizedProcessingTime after_synchronized_processing_time = 6;
+    Always always = 12;
     Default default = 7;
     ElementCount element_count = 8;
     Never never = 9;

http://git-wip-us.apache.org/repos/asf/beam/blob/bfdbebdd/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Triggers.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Triggers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Triggers.java
index 8ac904c..d788ca2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Triggers.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Triggers.java
@@ -28,6 +28,7 @@ import org.apache.beam.sdk.common.runner.v1.RunnerApi;
 import org.apache.beam.sdk.transforms.windowing.AfterWatermark.AfterWatermarkEarlyAndLate;
 import org.apache.beam.sdk.transforms.windowing.Never.NeverTrigger;
 import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
+import org.apache.beam.sdk.util.ReshuffleTrigger;
 import org.apache.beam.sdk.util.TimeDomain;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
@@ -95,6 +96,12 @@ public class Triggers implements Serializable {
           .build();
     }
 
+    private RunnerApi.Trigger convertSpecific(ReshuffleTrigger v) {
+      return RunnerApi.Trigger.newBuilder()
+          .setAlways(RunnerApi.Trigger.Always.getDefaultInstance())
+          .build();
+    }
+
     private RunnerApi.Trigger convertSpecific(AfterSynchronizedProcessingTime v) {
       return RunnerApi.Trigger.newBuilder()
           .setAfterSynchronizedProcessingTime(

Reply via email to