Translate ReshuffleTrigger into Always trigger proto
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/bfdbebdd
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/bfdbebdd
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/bfdbebdd

Branch: refs/heads/master
Commit: bfdbebdd6ecdc29d9eb12590399ebae45466a038
Parents: 9dbaeaf
Author: Kenneth Knowles <k...@google.com>
Authored: Sat Feb 18 19:19:33 2017 -0800
Committer: Kenneth Knowles <k...@google.com>
Committed: Fri Feb 24 07:16:35 2017 -0800

----------------------------------------------------------------------
 sdks/common/runner-api/src/main/proto/beam_runner_api.proto | 6 ++++++
 .../org/apache/beam/sdk/transforms/windowing/Triggers.java | 7 +++++++
 2 files changed, 13 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/beam/blob/bfdbebdd/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
----------------------------------------------------------------------
diff --git a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
index ce089f5..c030e73 100644
--- a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
+++ b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
@@ -540,6 +540,11 @@ message Trigger {
   message Never {
   }
 
+  // Always ready. This can also be expressed as ElementCount(1) but
+  // is more explicit.
+  message Always {
+  }
+
   // Ready whenever either of its subtriggers are ready, but finishes output
   // when the finally subtrigger fires.
   message OrFinally {
@@ -566,6 +571,7 @@ message Trigger {
     AfterEndOfWindow after_end_of_widow = 4;
     AfterProcessingTime after_processing_time = 5;
     AfterSynchronizedProcessingTime after_synchronized_processing_time = 6;
+    Always always = 12;
     Default default = 7;
     ElementCount element_count = 8;
     Never never = 9;

http://git-wip-us.apache.org/repos/asf/beam/blob/bfdbebdd/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Triggers.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Triggers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Triggers.java
index 8ac904c..d788ca2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Triggers.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Triggers.java
@@ -28,6 +28,7 @@ import org.apache.beam.sdk.common.runner.v1.RunnerApi;
 import org.apache.beam.sdk.transforms.windowing.AfterWatermark.AfterWatermarkEarlyAndLate;
 import org.apache.beam.sdk.transforms.windowing.Never.NeverTrigger;
 import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
+import org.apache.beam.sdk.util.ReshuffleTrigger;
 import org.apache.beam.sdk.util.TimeDomain;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
@@ -95,6 +96,12 @@ public class Triggers implements Serializable {
           .build();
     }
 
+    private RunnerApi.Trigger convertSpecific(ReshuffleTrigger v) {
+      return RunnerApi.Trigger.newBuilder()
+          .setAlways(RunnerApi.Trigger.Always.getDefaultInstance())
+          .build();
+    }
+
     private RunnerApi.Trigger convertSpecific(AfterSynchronizedProcessingTime v) {
       return RunnerApi.Trigger.newBuilder()
           .setAfterSynchronizedProcessingTime(