Add conversion to/from Runner API proto for WindowingStrategy
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7dd57105 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7dd57105 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7dd57105 Branch: refs/heads/master Commit: 7dd571059de5eccce19c237ff7b4fe03eceb4806 Parents: aac38d6 Author: Kenneth Knowles <k...@google.com> Authored: Thu Feb 16 20:26:45 2017 -0800 Committer: Kenneth Knowles <k...@google.com> Committed: Tue Feb 21 11:50:53 2017 -0800 ---------------------------------------------------------------------- .../sdk/transforms/windowing/GlobalWindows.java | 10 + .../sdk/transforms/windowing/OutputTimeFns.java | 45 ++++ .../beam/sdk/transforms/windowing/Window.java | 3 +- .../beam/sdk/util/WindowingStrategies.java | 241 +++++++++++++++++++ .../apache/beam/sdk/util/WindowingStrategy.java | 53 +++- .../transforms/windowing/OutputTimeFnsTest.java | 51 ++++ .../beam/sdk/util/WindowingStrategiesTest.java | 91 +++++++ 7 files changed, 490 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/7dd57105/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java index aba00a3..6606a5a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java @@ -54,4 +54,14 @@ public class GlobalWindows extends NonMergingWindowFn<Object, GlobalWindow> { public Instant getOutputTime(Instant inputTimestamp, GlobalWindow window) { return inputTimestamp; } + + @Override + public boolean equals(Object other) { + return other instanceof GlobalWindows; + } + + @Override + public String toString() { + return getClass().getCanonicalName(); + } } http://git-wip-us.apache.org/repos/asf/beam/blob/7dd57105/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFns.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFns.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFns.java index 2bcd319..b5d67fa 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFns.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFns.java @@ -23,6 +23,7 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Ordering; import javax.annotation.Nullable; import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.common.runner.v1.RunnerApi; import org.joda.time.Instant; /** @@ -163,5 +164,49 @@ public class OutputTimeFns { protected Instant assignOutputTime(BoundedWindow window) { return window.maxTimestamp(); } + + @Override + public String toString() { + return getClass().getCanonicalName(); + } + } + + public static RunnerApi.OutputTime toProto(OutputTimeFn<?> outputTimeFn) { + if (outputTimeFn instanceof OutputAtEarliestInputTimestamp) { + return RunnerApi.OutputTime.EARLIEST_IN_PANE; + } else if (outputTimeFn instanceof OutputAtLatestInputTimestamp) { + return RunnerApi.OutputTime.LATEST_IN_PANE; + } else if (outputTimeFn instanceof OutputAtEndOfWindow) { + return RunnerApi.OutputTime.END_OF_WINDOW; + } else { + throw new IllegalArgumentException( + String.format( + "Cannot convert %s to %s: %s", + OutputTimeFn.class.getCanonicalName(), + RunnerApi.OutputTime.class.getCanonicalName(), + outputTimeFn)); + } + } + + public static OutputTimeFn<?> fromProto(RunnerApi.OutputTime proto) { + switch (proto) { + case EARLIEST_IN_PANE: + return OutputTimeFns.outputAtEarliestInputTimestamp(); + case LATEST_IN_PANE: + return OutputTimeFns.outputAtLatestInputTimestamp(); + case END_OF_WINDOW: + return OutputTimeFns.outputAtEndOfWindow(); + case UNRECOGNIZED: + default: + // Whether or not it is proto that cannot recognize it (due to the version of the + // generated code we link to) or the switch hasn't been updated to handle it, + // the situation is the same: we don't know what this OutputTime means + throw new IllegalArgumentException( + String.format( + "Cannot convert unknown %s to %s: %s", + RunnerApi.OutputTime.class.getCanonicalName(), + OutputTimeFn.class.getCanonicalName(), + proto)); + } } } http://git-wip-us.apache.org/repos/asf/beam/blob/7dd57105/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java index eac1c97..65dfaa9 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java @@ -152,7 +152,8 @@ public class Window { * * <p>This is the default behavior. */ - FIRE_IF_NON_EMPTY + FIRE_IF_NON_EMPTY; + } /** http://git-wip-us.apache.org/repos/asf/beam/blob/7dd57105/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategies.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategies.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategies.java new file mode 100644 index 0000000..1af7719 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategies.java @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.protobuf.ByteString; +import java.io.IOException; +import java.io.Serializable; +import java.util.UUID; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.common.runner.v1.RunnerApi; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; +import org.apache.beam.sdk.transforms.windowing.Trigger; +import org.apache.beam.sdk.transforms.windowing.Triggers; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode; +import org.apache.beam.sdk.util.WindowingStrategy.CombineWindowFnOutputTimes; +import org.joda.time.Duration; + +/** Utilities for working with {@link WindowingStrategy WindowingStrategies}. */ +public class WindowingStrategies implements Serializable { + + public static AccumulationMode fromProto(RunnerApi.AccumulationMode proto) { + switch (proto) { + case DISCARDING: + return AccumulationMode.DISCARDING_FIRED_PANES; + case ACCUMULATING: + return AccumulationMode.ACCUMULATING_FIRED_PANES; + case UNRECOGNIZED: + default: + // Whether or not it is proto that cannot recognize it (due to the version of the + // generated code we link to) or the switch hasn't been updated to handle it, + // the situation is the same: we don't know what this OutputTime means + throw new IllegalArgumentException( + String.format( + "Cannot convert unknown %s to %s: %s", + RunnerApi.AccumulationMode.class.getCanonicalName(), + AccumulationMode.class.getCanonicalName(), + proto)); + } + } + + public static RunnerApi.AccumulationMode toProto(AccumulationMode accumulationMode) { + switch (accumulationMode) { + case DISCARDING_FIRED_PANES: + return RunnerApi.AccumulationMode.DISCARDING; + case ACCUMULATING_FIRED_PANES: + return RunnerApi.AccumulationMode.ACCUMULATING; + default: + throw new IllegalArgumentException( + String.format( + "Cannot convert unknown %s to %s: %s", + AccumulationMode.class.getCanonicalName(), + RunnerApi.AccumulationMode.class.getCanonicalName(), + accumulationMode)); + } + } + + public static RunnerApi.ClosingBehavior toProto(Window.ClosingBehavior closingBehavior) { + switch (closingBehavior) { + case FIRE_ALWAYS: + return RunnerApi.ClosingBehavior.EMIT_ALWAYS; + case FIRE_IF_NON_EMPTY: + return RunnerApi.ClosingBehavior.EMIT_IF_NONEMPTY; + default: + throw new IllegalArgumentException( + String.format( + "Cannot convert unknown %s to %s: %s", + ClosingBehavior.class.getCanonicalName(), + RunnerApi.ClosingBehavior.class.getCanonicalName(), + closingBehavior)); + } + } + + public static ClosingBehavior fromProto(RunnerApi.ClosingBehavior proto) { + switch (proto) { + case EMIT_ALWAYS: + return ClosingBehavior.FIRE_ALWAYS; + case EMIT_IF_NONEMPTY: + return ClosingBehavior.FIRE_IF_NON_EMPTY; + case UNRECOGNIZED: + default: + // Whether or not it is proto that cannot recognize it (due to the version of the + // generated code we link to) or the switch hasn't been updated to handle it, + // the situation is the same: we don't know what this OutputTime means + throw new IllegalArgumentException( + String.format( + "Cannot convert unknown %s to %s: %s", + RunnerApi.ClosingBehavior.class.getCanonicalName(), + ClosingBehavior.class.getCanonicalName(), + proto)); + + } + } + + public static RunnerApi.OutputTime toProto(OutputTimeFn<?> outputTimeFn) { + if (outputTimeFn instanceof WindowingStrategy.CombineWindowFnOutputTimes) { + return OutputTimeFns.toProto( + ((CombineWindowFnOutputTimes<?>) outputTimeFn).getOutputTimeFn()); + } else { + return OutputTimeFns.toProto(outputTimeFn); + } + } + + // This URN says that the coder is just a UDF blob the indicated SDK understands + private static final String CUSTOM_CODER_URN = "urn:beam:coders:custom:1.0"; + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + public static RunnerApi.MessageWithComponents toProto(WindowFn<?, ?> windowFn) + throws IOException { + Coder<?> windowCoder = windowFn.windowCoder(); + + // TODO: re-use components + String windowCoderId = UUID.randomUUID().toString(); + String customCoderId = UUID.randomUUID().toString(); + + return RunnerApi.MessageWithComponents.newBuilder() + .setFunctionSpec( + RunnerApi.FunctionSpec.newBuilder() + .setSdkFnSpec( + RunnerApi.SdkFunctionSpec.newBuilder() + .setData( + ByteString.copyFrom(SerializableUtils.serializeToByteArray(windowFn))))) + .setComponents( + Components.newBuilder() + .putCoders( + windowCoderId, + RunnerApi.Coder.newBuilder() + .setUrn(CUSTOM_CODER_URN) + .setCustomCoderFnId(customCoderId) + .build()) + .putFunctionSpecs( + customCoderId, + RunnerApi.FunctionSpec.newBuilder() + .setSdkFnSpec( + RunnerApi.SdkFunctionSpec.newBuilder() + .setData( + ByteString.copyFrom( + OBJECT_MAPPER.writeValueAsBytes( + windowCoder.asCloudObject())))) + .build())) + .build(); + } + + public static RunnerApi.MessageWithComponents toProto(WindowingStrategy<?, ?> windowingStrategy) + throws IOException { + + // TODO: have an inverted components to find the id for a thing already + // in the components + String windowFnId = UUID.randomUUID().toString(); + + RunnerApi.MessageWithComponents windowFnWithComponents = + toProto(windowingStrategy.getWindowFn()); + + RunnerApi.WindowingStrategy.Builder windowingStrategyProto = + RunnerApi.WindowingStrategy.newBuilder() + .setOutputTime(toProto(windowingStrategy.getOutputTimeFn())) + .setAccumulationMode(toProto(windowingStrategy.getMode())) + .setClosingBehavior(toProto(windowingStrategy.getClosingBehavior())) + .setAllowedLateness(windowingStrategy.getAllowedLateness().getMillis()) + .setTrigger(Triggers.toProto(windowingStrategy.getTrigger())) + .setFnId(windowFnId); + + return RunnerApi.MessageWithComponents.newBuilder() + .setWindowingStrategy(windowingStrategyProto) + .setComponents( + windowFnWithComponents + .getComponents() + .toBuilder() + .putFunctionSpecs(windowFnId, windowFnWithComponents.getFunctionSpec())) + .build(); + } + + /** + * Converts from a {@link RunnerApi.WindowingStrategy} accompanied by {@link RunnerApi.Components} + * to the SDK's {@link WindowingStrategy}. + */ + public static WindowingStrategy<?, ?> fromProto(RunnerApi.MessageWithComponents proto) { + switch (proto.getRootCase()) { + case WINDOWING_STRATEGY: + return fromProto(proto.getWindowingStrategy(), proto.getComponents()); + default: + throw new IllegalArgumentException( + String.format( + "Expected a %s with components but received %s", + RunnerApi.WindowingStrategy.class.getCanonicalName(), proto)); + } + } + + /** + * Converts from {@link RunnerApi.WindowingStrategy} to the SDK's {@link WindowingStrategy} using + * the provided components to dereferences identifiers found in the proto. + */ + public static WindowingStrategy<?, ?> fromProto( + RunnerApi.WindowingStrategy proto, RunnerApi.Components components) { + Object deserializedWindowFn = + SerializableUtils.deserializeFromByteArray( + components + .getFunctionSpecsMap() + .get(proto.getFnId()) + .getSdkFnSpec() + .getData() + .toByteArray(), + "WindowFn"); + + WindowFn<?, ?> windowFn = (WindowFn<?, ?>) deserializedWindowFn; + OutputTimeFn<?> outputTimeFn = OutputTimeFns.fromProto(proto.getOutputTime()); + AccumulationMode accumulationMode = fromProto(proto.getAccumulationMode()); + Trigger trigger = Triggers.fromProto(proto.getTrigger()); + ClosingBehavior closingBehavior = fromProto(proto.getClosingBehavior()); + Duration allowedLateness = Duration.millis(proto.getAllowedLateness()); + + return WindowingStrategy.of(windowFn) + .withAllowedLateness(allowedLateness) + .withMode(accumulationMode) + .withTrigger(trigger) + .withOutputTimeFn(outputTimeFn) + .withClosingBehavior(closingBehavior); + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/7dd57105/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategy.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategy.java index 137f108..c14523c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategy.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategy.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.util; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; import java.io.Serializable; import java.util.Collections; @@ -50,7 +51,7 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab */ public enum AccumulationMode { DISCARDING_FIRED_PANES, - ACCUMULATING_FIRED_PANES + ACCUMULATING_FIRED_PANES; } private static final Duration DEFAULT_ALLOWED_LATENESS = Duration.ZERO; @@ -99,7 +100,7 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab DefaultTrigger.of(), false, AccumulationMode.DISCARDING_FIRED_PANES, false, DEFAULT_ALLOWED_LATENESS, false, - OutputTimeFns.outputAtEndOfWindow(), false, + new CombineWindowFnOutputTimes(OutputTimeFns.outputAtEndOfWindow(), windowFn), false, ClosingBehavior.FIRE_IF_NON_EMPTY); } @@ -235,6 +236,21 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab closingBehavior); } + /** + * Fixes all the defaults so that equals can be used to check that two strategies are the same, + * regardless of the state of "defaulted-ness". + */ + @VisibleForTesting + public WindowingStrategy<T, W> fixDefaults() { + return new WindowingStrategy<>( + windowFn, + trigger, true, + mode, true, + allowedLateness, true, + outputTimeFn, true, + closingBehavior); + } + @Override public String toString() { return MoreObjects.toStringHelper(this) @@ -283,7 +299,7 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab * is calculated using {@link OutputTimeFn#merge}.</li> * </ul> */ - private static class CombineWindowFnOutputTimes<W extends BoundedWindow> + public static class CombineWindowFnOutputTimes<W extends BoundedWindow> extends OutputTimeFn<W> { private final OutputTimeFn<? super W> outputTimeFn; @@ -295,6 +311,10 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab this.windowFn = windowFn; } + public OutputTimeFn<? super W> getOutputTimeFn() { + return outputTimeFn; + } + @Override public Instant assignOutputTime(Instant inputTimestamp, W window) { return outputTimeFn.merge( @@ -320,5 +340,32 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab public boolean dependsOnlyOnEarliestInputTimestamp() { return outputTimeFn.dependsOnlyOnEarliestInputTimestamp(); } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + + if (!(obj instanceof CombineWindowFnOutputTimes)) { + return false; + } + + CombineWindowFnOutputTimes<?> that = (CombineWindowFnOutputTimes<?>) obj; + return outputTimeFn.equals(that.outputTimeFn) && windowFn.equals(that.windowFn); + } + + @Override + public int hashCode() { + return Objects.hash(outputTimeFn, windowFn); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("outputTimeFn", outputTimeFn) + .add("windowFn", windowFn) + .toString(); + } } } http://git-wip-us.apache.org/repos/asf/beam/blob/7dd57105/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFnsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFnsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFnsTest.java new file mode 100644 index 0000000..78d7a2f --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFnsTest.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms.windowing; + +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; + +import com.google.common.collect.ImmutableList; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; + +/** Tests for {@link OutputTimeFns}. */ +@RunWith(Parameterized.class) +public class OutputTimeFnsTest { + + @Parameters(name = "{index}: {0}") + public static Iterable<OutputTimeFn<BoundedWindow>> data() { + return ImmutableList.of( + OutputTimeFns.outputAtEarliestInputTimestamp(), + OutputTimeFns.outputAtLatestInputTimestamp(), + OutputTimeFns.outputAtEndOfWindow()); + } + + @Parameter(0) + public OutputTimeFn<?> outputTimeFn; + + @Test + public void testToProtoAndBack() throws Exception { + OutputTimeFn<?> result = OutputTimeFns.fromProto(OutputTimeFns.toProto(outputTimeFn)); + + assertThat(result, equalTo((OutputTimeFn) outputTimeFn)); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/7dd57105/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowingStrategiesTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowingStrategiesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowingStrategiesTest.java new file mode 100644 index 0000000..5d3de51 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowingStrategiesTest.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; + +import com.google.auto.value.AutoValue; +import com.google.common.collect.ImmutableList; +import org.apache.beam.sdk.transforms.windowing.AfterWatermark; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; +import org.apache.beam.sdk.transforms.windowing.Trigger; +import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode; +import org.joda.time.Duration; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; + +/** Unit tests for {@link WindowingStrategy}. */ +@RunWith(Parameterized.class) +public class WindowingStrategiesTest { + + // Each spec activates tests of all subsets of its fields + @AutoValue + abstract static class ToProtoAndBackSpec { + abstract WindowingStrategy getWindowingStrategy(); + } + + private static ToProtoAndBackSpec toProtoAndBackSpec(WindowingStrategy windowingStrategy) { + return new AutoValue_WindowingStrategiesTest_ToProtoAndBackSpec(windowingStrategy); + } + + private static final WindowFn<?, ?> REPRESENTATIVE_WINDOW_FN = + FixedWindows.of(Duration.millis(12)); + + private static final Trigger REPRESENTATIVE_TRIGGER = AfterWatermark.pastEndOfWindow(); + + @Parameters(name = "{index}: {0}") + public static Iterable<ToProtoAndBackSpec> data() { + return ImmutableList.of( + toProtoAndBackSpec(WindowingStrategy.globalDefault()), + toProtoAndBackSpec( + WindowingStrategy.of(REPRESENTATIVE_WINDOW_FN) + .withClosingBehavior(ClosingBehavior.FIRE_ALWAYS) + .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES) + .withTrigger(REPRESENTATIVE_TRIGGER) + .withAllowedLateness(Duration.millis(71)) + .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())), + toProtoAndBackSpec( + WindowingStrategy.of(REPRESENTATIVE_WINDOW_FN) + .withClosingBehavior(ClosingBehavior.FIRE_IF_NON_EMPTY) + .withMode(AccumulationMode.DISCARDING_FIRED_PANES) + .withTrigger(REPRESENTATIVE_TRIGGER) + .withAllowedLateness(Duration.millis(93)) + .withOutputTimeFn(OutputTimeFns.outputAtLatestInputTimestamp()))); + } + + @Parameter(0) + public ToProtoAndBackSpec toProtoAndBackSpec; + + @Test + public void testToProtoAndBack() throws Exception { + WindowingStrategy<?, ?> windowingStrategy = toProtoAndBackSpec.getWindowingStrategy(); + WindowingStrategy<?, ?> toProtoAndBackWindowingStrategy = + WindowingStrategies.fromProto(WindowingStrategies.toProto(windowingStrategy)); + + assertThat( + toProtoAndBackWindowingStrategy, + equalTo((WindowingStrategy) windowingStrategy.fixDefaults())); + } +}