Flesh out triggers in Runner API proto

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/661cd8d7
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/661cd8d7
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/661cd8d7

Branch: refs/heads/master
Commit: 661cd8d7407d5f414d5d94badacdeadb519107b7
Parents: 67854e6
Author: Kenneth Knowles <k...@google.com>
Authored: Sat Feb 11 17:32:21 2017 -0800
Committer: Kenneth Knowles <k...@google.com>
Committed: Tue Feb 14 14:55:49 2017 -0800

----------------------------------------------------------------------
 .../src/main/proto/beam_runner_api.proto        | 109 ++++++++++++++-----
 1 file changed, 83 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/661cd8d7/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
----------------------------------------------------------------------
diff --git a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
index 370b57c..91f1558 100644
--- a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
+++ b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
@@ -29,7 +29,6 @@ option java_package = "org.apache.beam.sdk.common.runner_api.v1";
 option java_outer_classname = "RunnerApi";
 
 import "google/protobuf/any.proto";
-import "google/protobuf/timestamp.proto";
 
 // A Pipeline is a hierarchical graph of PTransforms, linked
 // by PCollections.
@@ -402,6 +401,24 @@ enum OutputTime {
   EARLIEST_IN_PANE = 2;
 }
 
+// The different time domains in the Beam model.
+enum TimeDomain {
+
+  // Event time is time from the perspective of the data
+  EVENT_TIME = 0;
+
+  // Processing time is time from the perspective of the
+  // execution of your pipeline
+  PROCESSING_TIME = 1;
+
+  // Synchronized processing time is the minimum of the
+  // processing time of all pending elements.
+  //
+  // The "processing time" of an element refers to
+  // the local processing time at which it was emitted
+  SYNCHRONIZED_PROCESSING_TIME = 2;
+}
+
 // A small DSL for expressing when to emit new aggregations
 // from a GroupByKey or CombinePerKey
 //
@@ -439,27 +456,31 @@ message Trigger {
   }
 
   // After input arrives, ready when the specified delay has passed.
-  message AfterProcessingTimeDelay {
-    // (Required) The delay, in milliseconds.
-    int64 delay_millis = 1;
+  message AfterProcessingTime {
+
+    // (Required) The transforms to apply to an arriving element's timestamp,
+    // in order
+    repeated TimestampTransform timestamp_transforms = 1;
   }
 
-  // After input arrives, ready when the synchronized processing time
-  // progresses as far as the given delay.
-  message AfterSynchronizedProcessingTimeDelay {
-    // (Required) The delay, in milliseconds.
-    int64 delay_millis = 1;
+  // Ready whenever upstream processing time has all caught up with
+  // the arrival time of an input element
+  message AfterSynchronizedProcessingTime {
+  }
+
+  // The default trigger. Equivalent to Repeat { AfterEndOfWindow } but
+  // specially denoted to indicate the user did not alter the triggering.
+  message Default {
+  }
+
+  // Ready whenever the requisite number of input elements have arrived
+  message ElementCount {
+    int32 element_count = 1;
   }
 
   // Never ready. There will only be an ON_TIME output and a final
   // output at window expiration.
-  message Never { }
-
-  // Ready whenever the subtrigger is ready; resets state when the subtrigger
-  // completes.
-  message Repeat {
-    // (Require) Trigger that is run repeatedly.
-    Trigger subtrigger = 1;
+  message Never {
   }
 
   // Ready whenever either of its subtriggers are ready, but finishes output
@@ -473,9 +494,12 @@ message Trigger {
     Trigger finally = 2;
   }
 
-  // The default trigger. Equivalent to Repeat { AfterEndOfWindow } but
-  // specially denoted to indicate the user did not alter the triggering.
-  message Default { }
+  // Ready whenever the subtrigger is ready; resets state when the subtrigger
+  // completes.
+  message Repeat {
+    // (Required) Trigger that is run repeatedly.
+    Trigger subtrigger = 1;
+  }
 
   // The full disjoint union of possible triggers.
   oneof trigger {
@@ -483,12 +507,39 @@ message Trigger {
     AfterAny after_any = 2;
     AfterEach after_each = 3;
     AfterEndOfWindow after_end_of_widow = 4;
-    AfterProcessingTimeDelay after_processing_time_delay = 5;
-    AfterSynchronizedProcessingTimeDelay after_synchronized_processing_time_delay = 6;
-    Never never = 7;
-    Repeat repeat = 8;
-    OrFinally or_finally = 9;
-    Default default = 10;
+    AfterProcessingTime after_processing_time = 5;
+    AfterSynchronizedProcessingTime after_synchronized_processing_time = 6;
+    Default default = 7;
+    ElementCount element_count = 8;
+    Never never = 9;
+    OrFinally or_finally = 10;
+    Repeat repeat = 11;
+  }
+}
+
+// A specification for a transformation on a timestamp.
+//
+// Primarily used by AfterProcessingTime triggers to transform
+// the arrival time of input to a target time for firing.
+message TimestampTransform {
+  oneof timestamp_transform {
+    Delay delay = 1;
+    AlignTo align_to = 2;
+  }
+
+  message Delay {
+    // (Required) The delay, in milliseconds.
+    int64 delay_millis = 1;
+  }
+
+  message AlignTo {
+    // (Required) A duration to which delays should be quantized
+    // in milliseconds.
+    int64 period = 3;
+
+    // (Required) An offset from 0 for the quantization specified by
+    // period, in milliseconds.
+    int64 offset = 4;
   }
 }
 
@@ -633,6 +684,12 @@ message DisplayData {
   }
 
   enum Type {
-    STRING = 0; INTEGER = 1; FLOAT = 2; BOOLEAN = 3; TIMESTAMP = 4; DURATION = 5; JAVA_CLASS = 6;
+    STRING = 0;
+    INTEGER = 1;
+    FLOAT = 2;
+    BOOLEAN = 3;
+    TIMESTAMP = 4;
+    DURATION = 5;
+    JAVA_CLASS = 6;
   }
 }

Reply via email to