Add conversion to/from Java SDK trigger to runner API proto

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f4ceaeef
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f4ceaeef
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f4ceaeef

Branch: refs/heads/master
Commit: f4ceaeefe9e8e9d069b760e166c7057a00465360
Parents: 2803864
Author: Kenneth Knowles <k...@google.com>
Authored: Sat Feb 11 17:50:27 2017 -0800
Committer: Kenneth Knowles <k...@google.com>
Committed: Tue Feb 14 14:55:49 2017 -0800

----------------------------------------------------------------------
 .../beam/sdk/transforms/windowing/Triggers.java | 313 +++++++++++++++++++
 .../sdk/transforms/windowing/TriggersTest.java  | 100 ++++++
 2 files changed, 413 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/f4ceaeef/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Triggers.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Triggers.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Triggers.java
new file mode 100644
index 0000000..8ac904c
--- /dev/null
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Triggers.java
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import java.io.Serializable;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.List;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
+import 
org.apache.beam.sdk.transforms.windowing.AfterWatermark.AfterWatermarkEarlyAndLate;
+import org.apache.beam.sdk.transforms.windowing.Never.NeverTrigger;
+import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/** Utilities for working with {@link Triggers Triggers}. */
+@Experimental(Experimental.Kind.TRIGGER)
+public class Triggers implements Serializable {
+
+  @VisibleForTesting static final ProtoConverter CONVERTER = new 
ProtoConverter();
+
+  public static RunnerApi.Trigger toProto(Trigger trigger) {
+    return CONVERTER.convertTrigger(trigger);
+  }
+
+  @VisibleForTesting
+  static class ProtoConverter {
+
+    public RunnerApi.Trigger convertTrigger(Trigger trigger) {
+      Method evaluationMethod = getEvaluationMethod(trigger.getClass());
+      return tryConvert(evaluationMethod, trigger);
+    }
+
+    private RunnerApi.Trigger tryConvert(Method evaluationMethod, Trigger 
trigger) {
+      try {
+        return (RunnerApi.Trigger) evaluationMethod.invoke(this, trigger);
+      } catch (InvocationTargetException exc) {
+        if (exc.getCause() instanceof RuntimeException) {
+          throw (RuntimeException) exc.getCause();
+        } else {
+          throw new RuntimeException(exc.getCause());
+        }
+      } catch (IllegalAccessException exc) {
+        throw new IllegalStateException(
+            String.format("Internal error: could not invoke %s", 
evaluationMethod));
+      }
+    }
+
+    private Method getEvaluationMethod(Class<?> clazz) {
+      try {
+        return getClass().getDeclaredMethod("convertSpecific", clazz);
+      } catch (NoSuchMethodException exc) {
+        throw new IllegalArgumentException(
+            String.format(
+                "Cannot translate trigger class %s to a runner-API proto.",
+                clazz.getCanonicalName()),
+            exc);
+      }
+    }
+
+    private RunnerApi.Trigger convertSpecific(DefaultTrigger v) {
+      return RunnerApi.Trigger.newBuilder()
+          .setDefault(RunnerApi.Trigger.Default.getDefaultInstance())
+          .build();
+    }
+
+    private RunnerApi.Trigger convertSpecific(AfterWatermark.FromEndOfWindow 
v) {
+      return RunnerApi.Trigger.newBuilder()
+          .setAfterEndOfWidow(RunnerApi.Trigger.AfterEndOfWindow.newBuilder())
+          .build();
+    }
+
+    private RunnerApi.Trigger convertSpecific(NeverTrigger v) {
+      return RunnerApi.Trigger.newBuilder()
+          .setNever(RunnerApi.Trigger.Never.getDefaultInstance())
+          .build();
+    }
+
+    private RunnerApi.Trigger convertSpecific(AfterSynchronizedProcessingTime 
v) {
+      return RunnerApi.Trigger.newBuilder()
+          .setAfterSynchronizedProcessingTime(
+              
RunnerApi.Trigger.AfterSynchronizedProcessingTime.getDefaultInstance())
+          .build();
+    }
+
+    private RunnerApi.TimeDomain convertTimeDomain(TimeDomain timeDomain) {
+      switch (timeDomain) {
+        case EVENT_TIME:
+          return RunnerApi.TimeDomain.EVENT_TIME;
+        case PROCESSING_TIME:
+          return RunnerApi.TimeDomain.PROCESSING_TIME;
+        case SYNCHRONIZED_PROCESSING_TIME:
+          return RunnerApi.TimeDomain.SYNCHRONIZED_PROCESSING_TIME;
+        default:
+          throw new IllegalArgumentException(String.format("Unknown time 
domain: %s", timeDomain));
+      }
+    }
+
+    private RunnerApi.Trigger convertSpecific(AfterFirst v) {
+      RunnerApi.Trigger.AfterAny.Builder builder = 
RunnerApi.Trigger.AfterAny.newBuilder();
+
+      for (Trigger subtrigger : v.subTriggers()) {
+        builder.addSubtriggers(toProto(subtrigger));
+      }
+
+      return RunnerApi.Trigger.newBuilder().setAfterAny(builder).build();
+    }
+
+    private RunnerApi.Trigger convertSpecific(AfterAll v) {
+      RunnerApi.Trigger.AfterAll.Builder builder = 
RunnerApi.Trigger.AfterAll.newBuilder();
+
+      for (Trigger subtrigger : v.subTriggers()) {
+        builder.addSubtriggers(toProto(subtrigger));
+      }
+
+      return RunnerApi.Trigger.newBuilder().setAfterAll(builder).build();
+    }
+
+    private RunnerApi.Trigger convertSpecific(AfterPane v) {
+      return RunnerApi.Trigger.newBuilder()
+          .setElementCount(
+              
RunnerApi.Trigger.ElementCount.newBuilder().setElementCount(v.getElementCount()))
+          .build();
+    }
+
+    private RunnerApi.Trigger 
convertSpecific(AfterWatermark.AfterWatermarkEarlyAndLate v) {
+      RunnerApi.Trigger.AfterEndOfWindow.Builder builder =
+          RunnerApi.Trigger.AfterEndOfWindow.newBuilder();
+
+      builder.setEarlyFirings(toProto(v.getEarlyTrigger()));
+      if (v.getLateTrigger() != null) {
+        builder.setLateFirings(toProto(v.getLateTrigger()));
+      }
+
+      return 
RunnerApi.Trigger.newBuilder().setAfterEndOfWidow(builder).build();
+    }
+
+    private RunnerApi.Trigger convertSpecific(AfterEach v) {
+      RunnerApi.Trigger.AfterEach.Builder builder = 
RunnerApi.Trigger.AfterEach.newBuilder();
+
+      for (Trigger subtrigger : v.subTriggers()) {
+        builder.addSubtriggers(toProto(subtrigger));
+      }
+
+      return RunnerApi.Trigger.newBuilder().setAfterEach(builder).build();
+    }
+
+    private RunnerApi.Trigger convertSpecific(Repeatedly v) {
+      return RunnerApi.Trigger.newBuilder()
+          .setRepeat(
+              RunnerApi.Trigger.Repeat.newBuilder()
+                  .setSubtrigger(toProto(v.getRepeatedTrigger())))
+          .build();
+    }
+
+    private RunnerApi.Trigger convertSpecific(OrFinallyTrigger v) {
+      return RunnerApi.Trigger.newBuilder()
+          .setOrFinally(
+              RunnerApi.Trigger.OrFinally.newBuilder()
+                  .setMain(toProto(v.getMainTrigger()))
+                  .setFinally(toProto(v.getUntilTrigger())))
+          .build();
+    }
+
+    private RunnerApi.Trigger convertSpecific(AfterProcessingTime v) {
+      RunnerApi.Trigger.AfterProcessingTime.Builder builder =
+          RunnerApi.Trigger.AfterProcessingTime.newBuilder();
+
+      for (TimestampTransform transform : v.getTimestampTransforms()) {
+        builder.addTimestampTransforms(convertTimestampTransform(transform));
+      }
+
+      return 
RunnerApi.Trigger.newBuilder().setAfterProcessingTime(builder).build();
+    }
+
+    private RunnerApi.TimestampTransform 
convertTimestampTransform(TimestampTransform transform) {
+      if (transform instanceof TimestampTransform.Delay) {
+        return RunnerApi.TimestampTransform.newBuilder()
+            .setDelay(
+                RunnerApi.TimestampTransform.Delay.newBuilder()
+                    .setDelayMillis(((TimestampTransform.Delay) 
transform).getDelay().getMillis()))
+            .build();
+      } else if (transform instanceof TimestampTransform.AlignTo) {
+        TimestampTransform.AlignTo alignTo = (TimestampTransform.AlignTo) 
transform;
+        return RunnerApi.TimestampTransform.newBuilder()
+            .setAlignTo(
+                RunnerApi.TimestampTransform.AlignTo.newBuilder()
+                    .setPeriod(alignTo.getPeriod().getMillis())
+                    .setOffset(alignTo.getOffset().getMillis()))
+            .build();
+
+      } else {
+        throw new IllegalArgumentException(
+            String.format("Unknown %s: %s", 
TimestampTransform.class.getSimpleName(), transform));
+      }
+    }
+  }
+
+  public static Trigger fromProto(RunnerApi.Trigger triggerProto) {
+    switch (triggerProto.getTriggerCase()) {
+      case AFTER_ALL:
+        return 
AfterAll.of(protosToTriggers(triggerProto.getAfterAll().getSubtriggersList()));
+      case AFTER_ANY:
+        return 
AfterFirst.of(protosToTriggers(triggerProto.getAfterAny().getSubtriggersList()));
+      case AFTER_EACH:
+        return AfterEach.inOrder(
+            
protosToTriggers(triggerProto.getAfterEach().getSubtriggersList()));
+      case AFTER_END_OF_WIDOW:
+        RunnerApi.Trigger.AfterEndOfWindow eowProto = 
triggerProto.getAfterEndOfWidow();
+
+        if (!eowProto.hasEarlyFirings() && !eowProto.hasLateFirings()) {
+          return AfterWatermark.pastEndOfWindow();
+        }
+
+        // It either has early or late firings or both; our typing in Java 
makes this a smidge
+        // annoying
+        if (triggerProto.getAfterEndOfWidow().hasEarlyFirings()) {
+          AfterWatermarkEarlyAndLate trigger =
+              AfterWatermark.pastEndOfWindow()
+                  .withEarlyFirings(
+                      (OnceTrigger)
+                          
fromProto(triggerProto.getAfterEndOfWidow().getEarlyFirings()));
+
+          if (triggerProto.getAfterEndOfWidow().hasLateFirings()) {
+            trigger =
+                trigger.withLateFirings(
+                    (OnceTrigger)
+                        
fromProto(triggerProto.getAfterEndOfWidow().getLateFirings()));
+          }
+          return trigger;
+        } else {
+          // only late firings, so return directly
+          return AfterWatermark.pastEndOfWindow()
+              .withLateFirings((OnceTrigger) 
fromProto(eowProto.getLateFirings()));
+        }
+      case AFTER_PROCESSING_TIME:
+        AfterProcessingTime trigger = 
AfterProcessingTime.pastFirstElementInPane();
+        for (RunnerApi.TimestampTransform transform :
+            
triggerProto.getAfterProcessingTime().getTimestampTransformsList()) {
+          switch (transform.getTimestampTransformCase()) {
+            case ALIGN_TO:
+              trigger =
+                  trigger.alignedTo(
+                      Duration.millis(transform.getAlignTo().getPeriod()),
+                      new Instant(transform.getAlignTo().getOffset()));
+              break;
+            case DELAY:
+              trigger = 
trigger.plusDelayOf(Duration.millis(transform.getDelay().getDelayMillis()));
+              break;
+            case TIMESTAMPTRANSFORM_NOT_SET:
+              throw new IllegalArgumentException(
+                  String.format(
+                      "Required field 'timestamp_transform' not set in %s", 
transform));
+            default:
+              throw new IllegalArgumentException(
+                  String.format(
+                      "Unknown timestamp transform case: %s",
+                      transform.getTimestampTransformCase()));
+          }
+        }
+        return trigger;
+      case AFTER_SYNCHRONIZED_PROCESSING_TIME:
+        return AfterSynchronizedProcessingTime.ofFirstElement();
+      case ELEMENT_COUNT:
+        return 
AfterPane.elementCountAtLeast(triggerProto.getElementCount().getElementCount());
+      case NEVER:
+        return Never.ever();
+      case OR_FINALLY:
+        return fromProto(triggerProto.getOrFinally().getMain())
+            .orFinally((OnceTrigger) 
fromProto(triggerProto.getOrFinally().getFinally()));
+      case REPEAT:
+        return 
Repeatedly.forever(fromProto(triggerProto.getRepeat().getSubtrigger()));
+      case DEFAULT:
+        return DefaultTrigger.of();
+      case TRIGGER_NOT_SET:
+        throw new IllegalArgumentException(
+            String.format("Required field 'trigger' not set in %s", 
triggerProto));
+      default:
+        throw new IllegalArgumentException(
+            String.format("Unknown trigger case: %s", 
triggerProto.getTriggerCase()));
+    }
+  }
+
+  private static List<Trigger> protosToTriggers(List<RunnerApi.Trigger> 
triggers) {
+    List<Trigger> result = Lists.newArrayList();
+    for (RunnerApi.Trigger trigger : triggers) {
+      result.add(fromProto(trigger));
+    }
+    return result;
+  }
+
+  // Do not instantiate
+  private Triggers() {}
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/f4ceaeef/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/TriggersTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/TriggersTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/TriggersTest.java
new file mode 100644
index 0000000..0ac5966
--- /dev/null
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/TriggersTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.collect.ImmutableList;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+/** Tests for utilities in {@link Triggers}. */
+@RunWith(Parameterized.class)
+public class TriggersTest {
+
+  @AutoValue
+  abstract static class ToProtoAndBackSpec {
+    abstract Trigger getTrigger();
+  }
+
+  private static ToProtoAndBackSpec toProtoAndBackSpec(Trigger trigger) {
+    return new AutoValue_TriggersTest_ToProtoAndBackSpec(trigger);
+  }
+
+  @Parameters(name = "{index}: {0}")
+  public static Iterable<ToProtoAndBackSpec> data() {
+    return ImmutableList.of(
+        // Atomic triggers
+        toProtoAndBackSpec(AfterWatermark.pastEndOfWindow()),
+        toProtoAndBackSpec(AfterPane.elementCountAtLeast(73)),
+        toProtoAndBackSpec(AfterSynchronizedProcessingTime.ofFirstElement()),
+        toProtoAndBackSpec(Never.ever()),
+        toProtoAndBackSpec(DefaultTrigger.of()),
+        toProtoAndBackSpec(AfterProcessingTime.pastFirstElementInPane()),
+        toProtoAndBackSpec(
+            
AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(23))),
+        toProtoAndBackSpec(
+            AfterProcessingTime.pastFirstElementInPane()
+                .alignedTo(Duration.millis(5), new Instant(27))),
+        toProtoAndBackSpec(
+            AfterProcessingTime.pastFirstElementInPane()
+                .plusDelayOf(Duration.standardSeconds(3))
+                .alignedTo(Duration.millis(5), new Instant(27))
+                .plusDelayOf(Duration.millis(13))),
+
+        // Composite triggers
+
+        toProtoAndBackSpec(
+            AfterAll.of(AfterPane.elementCountAtLeast(79), 
AfterWatermark.pastEndOfWindow())),
+        toProtoAndBackSpec(
+            AfterEach.inOrder(AfterPane.elementCountAtLeast(79), 
AfterPane.elementCountAtLeast(3))),
+        toProtoAndBackSpec(
+            AfterFirst.of(AfterWatermark.pastEndOfWindow(), 
AfterPane.elementCountAtLeast(3))),
+        toProtoAndBackSpec(
+            
AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(3))),
+        toProtoAndBackSpec(
+            
AfterWatermark.pastEndOfWindow().withLateFirings(AfterPane.elementCountAtLeast(3))),
+        toProtoAndBackSpec(
+            AfterWatermark.pastEndOfWindow()
+                .withEarlyFirings(
+                    
AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(42)))
+                .withLateFirings(AfterPane.elementCountAtLeast(3))),
+        
toProtoAndBackSpec(Repeatedly.forever(AfterWatermark.pastEndOfWindow())),
+        toProtoAndBackSpec(
+            Repeatedly.forever(AfterPane.elementCountAtLeast(1))
+                .orFinally(AfterWatermark.pastEndOfWindow())));
+  }
+
+  @Parameter(0)
+  public ToProtoAndBackSpec toProtoAndBackSpec;
+
+  @Test
+  public void testToProtoAndBack() throws Exception {
+    Trigger trigger = toProtoAndBackSpec.getTrigger();
+    Trigger toProtoAndBackTrigger = 
Triggers.fromProto(Triggers.toProto(trigger));
+
+    assertThat(toProtoAndBackTrigger, equalTo(trigger));
+  }
+}

Reply via email to