Add conversion to/from Java SDK trigger to runner API proto
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f4ceaeef
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f4ceaeef
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f4ceaeef

Branch: refs/heads/master
Commit: f4ceaeefe9e8e9d069b760e166c7057a00465360
Parents: 2803864
Author: Kenneth Knowles <k...@google.com>
Authored: Sat Feb 11 17:50:27 2017 -0800
Committer: Kenneth Knowles <k...@google.com>
Committed: Tue Feb 14 14:55:49 2017 -0800

----------------------------------------------------------------------
 .../beam/sdk/transforms/windowing/Triggers.java | 313 +++++++++++++++++++
 .../sdk/transforms/windowing/TriggersTest.java  | 100 ++++++
 2 files changed, 413 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/beam/blob/f4ceaeef/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Triggers.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Triggers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Triggers.java
new file mode 100644
index 0000000..8ac904c
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Triggers.java
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms.windowing; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; +import java.io.Serializable; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.List; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.common.runner.v1.RunnerApi; +import org.apache.beam.sdk.transforms.windowing.AfterWatermark.AfterWatermarkEarlyAndLate; +import org.apache.beam.sdk.transforms.windowing.Never.NeverTrigger; +import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger; +import org.apache.beam.sdk.util.TimeDomain; +import org.joda.time.Duration; +import org.joda.time.Instant; + +/** Utilities for working with {@link Triggers Triggers}. 
*/ +@Experimental(Experimental.Kind.TRIGGER) +public class Triggers implements Serializable { + + @VisibleForTesting static final ProtoConverter CONVERTER = new ProtoConverter(); + + public static RunnerApi.Trigger toProto(Trigger trigger) { + return CONVERTER.convertTrigger(trigger); + } + + @VisibleForTesting + static class ProtoConverter { + + public RunnerApi.Trigger convertTrigger(Trigger trigger) { + Method evaluationMethod = getEvaluationMethod(trigger.getClass()); + return tryConvert(evaluationMethod, trigger); + } + + private RunnerApi.Trigger tryConvert(Method evaluationMethod, Trigger trigger) { + try { + return (RunnerApi.Trigger) evaluationMethod.invoke(this, trigger); + } catch (InvocationTargetException exc) { + if (exc.getCause() instanceof RuntimeException) { + throw (RuntimeException) exc.getCause(); + } else { + throw new RuntimeException(exc.getCause()); + } + } catch (IllegalAccessException exc) { + throw new IllegalStateException( + String.format("Internal error: could not invoke %s", evaluationMethod)); + } + } + + private Method getEvaluationMethod(Class<?> clazz) { + try { + return getClass().getDeclaredMethod("convertSpecific", clazz); + } catch (NoSuchMethodException exc) { + throw new IllegalArgumentException( + String.format( + "Cannot translate trigger class %s to a runner-API proto.", + clazz.getCanonicalName()), + exc); + } + } + + private RunnerApi.Trigger convertSpecific(DefaultTrigger v) { + return RunnerApi.Trigger.newBuilder() + .setDefault(RunnerApi.Trigger.Default.getDefaultInstance()) + .build(); + } + + private RunnerApi.Trigger convertSpecific(AfterWatermark.FromEndOfWindow v) { + return RunnerApi.Trigger.newBuilder() + .setAfterEndOfWidow(RunnerApi.Trigger.AfterEndOfWindow.newBuilder()) + .build(); + } + + private RunnerApi.Trigger convertSpecific(NeverTrigger v) { + return RunnerApi.Trigger.newBuilder() + .setNever(RunnerApi.Trigger.Never.getDefaultInstance()) + .build(); + } + + private RunnerApi.Trigger 
convertSpecific(AfterSynchronizedProcessingTime v) { + return RunnerApi.Trigger.newBuilder() + .setAfterSynchronizedProcessingTime( + RunnerApi.Trigger.AfterSynchronizedProcessingTime.getDefaultInstance()) + .build(); + } + + private RunnerApi.TimeDomain convertTimeDomain(TimeDomain timeDomain) { + switch (timeDomain) { + case EVENT_TIME: + return RunnerApi.TimeDomain.EVENT_TIME; + case PROCESSING_TIME: + return RunnerApi.TimeDomain.PROCESSING_TIME; + case SYNCHRONIZED_PROCESSING_TIME: + return RunnerApi.TimeDomain.SYNCHRONIZED_PROCESSING_TIME; + default: + throw new IllegalArgumentException(String.format("Unknown time domain: %s", timeDomain)); + } + } + + private RunnerApi.Trigger convertSpecific(AfterFirst v) { + RunnerApi.Trigger.AfterAny.Builder builder = RunnerApi.Trigger.AfterAny.newBuilder(); + + for (Trigger subtrigger : v.subTriggers()) { + builder.addSubtriggers(toProto(subtrigger)); + } + + return RunnerApi.Trigger.newBuilder().setAfterAny(builder).build(); + } + + private RunnerApi.Trigger convertSpecific(AfterAll v) { + RunnerApi.Trigger.AfterAll.Builder builder = RunnerApi.Trigger.AfterAll.newBuilder(); + + for (Trigger subtrigger : v.subTriggers()) { + builder.addSubtriggers(toProto(subtrigger)); + } + + return RunnerApi.Trigger.newBuilder().setAfterAll(builder).build(); + } + + private RunnerApi.Trigger convertSpecific(AfterPane v) { + return RunnerApi.Trigger.newBuilder() + .setElementCount( + RunnerApi.Trigger.ElementCount.newBuilder().setElementCount(v.getElementCount())) + .build(); + } + + private RunnerApi.Trigger convertSpecific(AfterWatermark.AfterWatermarkEarlyAndLate v) { + RunnerApi.Trigger.AfterEndOfWindow.Builder builder = + RunnerApi.Trigger.AfterEndOfWindow.newBuilder(); + + builder.setEarlyFirings(toProto(v.getEarlyTrigger())); + if (v.getLateTrigger() != null) { + builder.setLateFirings(toProto(v.getLateTrigger())); + } + + return RunnerApi.Trigger.newBuilder().setAfterEndOfWidow(builder).build(); + } + + private RunnerApi.Trigger 
convertSpecific(AfterEach v) { + RunnerApi.Trigger.AfterEach.Builder builder = RunnerApi.Trigger.AfterEach.newBuilder(); + + for (Trigger subtrigger : v.subTriggers()) { + builder.addSubtriggers(toProto(subtrigger)); + } + + return RunnerApi.Trigger.newBuilder().setAfterEach(builder).build(); + } + + private RunnerApi.Trigger convertSpecific(Repeatedly v) { + return RunnerApi.Trigger.newBuilder() + .setRepeat( + RunnerApi.Trigger.Repeat.newBuilder() + .setSubtrigger(toProto(v.getRepeatedTrigger()))) + .build(); + } + + private RunnerApi.Trigger convertSpecific(OrFinallyTrigger v) { + return RunnerApi.Trigger.newBuilder() + .setOrFinally( + RunnerApi.Trigger.OrFinally.newBuilder() + .setMain(toProto(v.getMainTrigger())) + .setFinally(toProto(v.getUntilTrigger()))) + .build(); + } + + private RunnerApi.Trigger convertSpecific(AfterProcessingTime v) { + RunnerApi.Trigger.AfterProcessingTime.Builder builder = + RunnerApi.Trigger.AfterProcessingTime.newBuilder(); + + for (TimestampTransform transform : v.getTimestampTransforms()) { + builder.addTimestampTransforms(convertTimestampTransform(transform)); + } + + return RunnerApi.Trigger.newBuilder().setAfterProcessingTime(builder).build(); + } + + private RunnerApi.TimestampTransform convertTimestampTransform(TimestampTransform transform) { + if (transform instanceof TimestampTransform.Delay) { + return RunnerApi.TimestampTransform.newBuilder() + .setDelay( + RunnerApi.TimestampTransform.Delay.newBuilder() + .setDelayMillis(((TimestampTransform.Delay) transform).getDelay().getMillis())) + .build(); + } else if (transform instanceof TimestampTransform.AlignTo) { + TimestampTransform.AlignTo alignTo = (TimestampTransform.AlignTo) transform; + return RunnerApi.TimestampTransform.newBuilder() + .setAlignTo( + RunnerApi.TimestampTransform.AlignTo.newBuilder() + .setPeriod(alignTo.getPeriod().getMillis()) + .setOffset(alignTo.getOffset().getMillis())) + .build(); + + } else { + throw new IllegalArgumentException( + 
String.format("Unknown %s: %s", TimestampTransform.class.getSimpleName(), transform)); + } + } + } + + public static Trigger fromProto(RunnerApi.Trigger triggerProto) { + switch (triggerProto.getTriggerCase()) { + case AFTER_ALL: + return AfterAll.of(protosToTriggers(triggerProto.getAfterAll().getSubtriggersList())); + case AFTER_ANY: + return AfterFirst.of(protosToTriggers(triggerProto.getAfterAny().getSubtriggersList())); + case AFTER_EACH: + return AfterEach.inOrder( + protosToTriggers(triggerProto.getAfterEach().getSubtriggersList())); + case AFTER_END_OF_WIDOW: + RunnerApi.Trigger.AfterEndOfWindow eowProto = triggerProto.getAfterEndOfWidow(); + + if (!eowProto.hasEarlyFirings() && !eowProto.hasLateFirings()) { + return AfterWatermark.pastEndOfWindow(); + } + + // It either has early or late firings or both; our typing in Java makes this a smidge + // annoying + if (triggerProto.getAfterEndOfWidow().hasEarlyFirings()) { + AfterWatermarkEarlyAndLate trigger = + AfterWatermark.pastEndOfWindow() + .withEarlyFirings( + (OnceTrigger) + fromProto(triggerProto.getAfterEndOfWidow().getEarlyFirings())); + + if (triggerProto.getAfterEndOfWidow().hasLateFirings()) { + trigger = + trigger.withLateFirings( + (OnceTrigger) + fromProto(triggerProto.getAfterEndOfWidow().getLateFirings())); + } + return trigger; + } else { + // only late firings, so return directly + return AfterWatermark.pastEndOfWindow() + .withLateFirings((OnceTrigger) fromProto(eowProto.getLateFirings())); + } + case AFTER_PROCESSING_TIME: + AfterProcessingTime trigger = AfterProcessingTime.pastFirstElementInPane(); + for (RunnerApi.TimestampTransform transform : + triggerProto.getAfterProcessingTime().getTimestampTransformsList()) { + switch (transform.getTimestampTransformCase()) { + case ALIGN_TO: + trigger = + trigger.alignedTo( + Duration.millis(transform.getAlignTo().getPeriod()), + new Instant(transform.getAlignTo().getOffset())); + break; + case DELAY: + trigger = 
trigger.plusDelayOf(Duration.millis(transform.getDelay().getDelayMillis())); + break; + case TIMESTAMPTRANSFORM_NOT_SET: + throw new IllegalArgumentException( + String.format( + "Required field 'timestamp_transform' not set in %s", transform)); + default: + throw new IllegalArgumentException( + String.format( + "Unknown timestamp transform case: %s", + transform.getTimestampTransformCase())); + } + } + return trigger; + case AFTER_SYNCHRONIZED_PROCESSING_TIME: + return AfterSynchronizedProcessingTime.ofFirstElement(); + case ELEMENT_COUNT: + return AfterPane.elementCountAtLeast(triggerProto.getElementCount().getElementCount()); + case NEVER: + return Never.ever(); + case OR_FINALLY: + return fromProto(triggerProto.getOrFinally().getMain()) + .orFinally((OnceTrigger) fromProto(triggerProto.getOrFinally().getFinally())); + case REPEAT: + return Repeatedly.forever(fromProto(triggerProto.getRepeat().getSubtrigger())); + case DEFAULT: + return DefaultTrigger.of(); + case TRIGGER_NOT_SET: + throw new IllegalArgumentException( + String.format("Required field 'trigger' not set in %s", triggerProto)); + default: + throw new IllegalArgumentException( + String.format("Unknown trigger case: %s", triggerProto.getTriggerCase())); + } + } + + private static List<Trigger> protosToTriggers(List<RunnerApi.Trigger> triggers) { + List<Trigger> result = Lists.newArrayList(); + for (RunnerApi.Trigger trigger : triggers) { + result.add(fromProto(trigger)); + } + return result; + } + + // Do not instantiate + private Triggers() {} +} http://git-wip-us.apache.org/repos/asf/beam/blob/f4ceaeef/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/TriggersTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/TriggersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/TriggersTest.java new file mode 100644 index 0000000..0ac5966 --- 
/dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/TriggersTest.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms.windowing; + +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; + +import com.google.auto.value.AutoValue; +import com.google.common.collect.ImmutableList; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; + +/** Tests for utilities in {@link Triggers}. 
*/ +@RunWith(Parameterized.class) +public class TriggersTest { + + @AutoValue + abstract static class ToProtoAndBackSpec { + abstract Trigger getTrigger(); + } + + private static ToProtoAndBackSpec toProtoAndBackSpec(Trigger trigger) { + return new AutoValue_TriggersTest_ToProtoAndBackSpec(trigger); + } + + @Parameters(name = "{index}: {0}") + public static Iterable<ToProtoAndBackSpec> data() { + return ImmutableList.of( + // Atomic triggers + toProtoAndBackSpec(AfterWatermark.pastEndOfWindow()), + toProtoAndBackSpec(AfterPane.elementCountAtLeast(73)), + toProtoAndBackSpec(AfterSynchronizedProcessingTime.ofFirstElement()), + toProtoAndBackSpec(Never.ever()), + toProtoAndBackSpec(DefaultTrigger.of()), + toProtoAndBackSpec(AfterProcessingTime.pastFirstElementInPane()), + toProtoAndBackSpec( + AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(23))), + toProtoAndBackSpec( + AfterProcessingTime.pastFirstElementInPane() + .alignedTo(Duration.millis(5), new Instant(27))), + toProtoAndBackSpec( + AfterProcessingTime.pastFirstElementInPane() + .plusDelayOf(Duration.standardSeconds(3)) + .alignedTo(Duration.millis(5), new Instant(27)) + .plusDelayOf(Duration.millis(13))), + + // Composite triggers + + toProtoAndBackSpec( + AfterAll.of(AfterPane.elementCountAtLeast(79), AfterWatermark.pastEndOfWindow())), + toProtoAndBackSpec( + AfterEach.inOrder(AfterPane.elementCountAtLeast(79), AfterPane.elementCountAtLeast(3))), + toProtoAndBackSpec( + AfterFirst.of(AfterWatermark.pastEndOfWindow(), AfterPane.elementCountAtLeast(3))), + toProtoAndBackSpec( + AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(3))), + toProtoAndBackSpec( + AfterWatermark.pastEndOfWindow().withLateFirings(AfterPane.elementCountAtLeast(3))), + toProtoAndBackSpec( + AfterWatermark.pastEndOfWindow() + .withEarlyFirings( + AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(42))) + .withLateFirings(AfterPane.elementCountAtLeast(3))), + 
toProtoAndBackSpec(Repeatedly.forever(AfterWatermark.pastEndOfWindow())), + toProtoAndBackSpec( + Repeatedly.forever(AfterPane.elementCountAtLeast(1)) + .orFinally(AfterWatermark.pastEndOfWindow()))); + } + + @Parameter(0) + public ToProtoAndBackSpec toProtoAndBackSpec; + + @Test + public void testToProtoAndBack() throws Exception { + Trigger trigger = toProtoAndBackSpec.getTrigger(); + Trigger toProtoAndBackTrigger = Triggers.fromProto(Triggers.toProto(trigger)); + + assertThat(toProtoAndBackTrigger, equalTo(trigger)); + } +}