Add conversion to/from Runner API proto for WindowingStrategy

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7dd57105
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7dd57105
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7dd57105

Branch: refs/heads/master
Commit: 7dd571059de5eccce19c237ff7b4fe03eceb4806
Parents: aac38d6
Author: Kenneth Knowles <k...@google.com>
Authored: Thu Feb 16 20:26:45 2017 -0800
Committer: Kenneth Knowles <k...@google.com>
Committed: Tue Feb 21 11:50:53 2017 -0800

----------------------------------------------------------------------
 .../sdk/transforms/windowing/GlobalWindows.java |  10 +
 .../sdk/transforms/windowing/OutputTimeFns.java |  45 ++++
 .../beam/sdk/transforms/windowing/Window.java   |   3 +-
 .../beam/sdk/util/WindowingStrategies.java      | 241 +++++++++++++++++++
 .../apache/beam/sdk/util/WindowingStrategy.java |  53 +++-
 .../transforms/windowing/OutputTimeFnsTest.java |  51 ++++
 .../beam/sdk/util/WindowingStrategiesTest.java  |  91 +++++++
 7 files changed, 490 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/7dd57105/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java
index aba00a3..6606a5a 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java
@@ -54,4 +54,14 @@ public class GlobalWindows extends 
NonMergingWindowFn<Object, GlobalWindow> {
   public Instant getOutputTime(Instant inputTimestamp, GlobalWindow window) {
     return inputTimestamp;
   }
+
+  @Override
+  public boolean equals(Object other) {
+    return other instanceof GlobalWindows;
+  }
+
+  @Override
+  public String toString() {
+    return getClass().getCanonicalName();
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7dd57105/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFns.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFns.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFns.java
index 2bcd319..b5d67fa 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFns.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFns.java
@@ -23,6 +23,7 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.Ordering;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
 import org.joda.time.Instant;
 
 /**
@@ -163,5 +164,49 @@ public class OutputTimeFns {
     protected Instant assignOutputTime(BoundedWindow window) {
       return window.maxTimestamp();
     }
+
+    @Override
+    public String toString() {
+      return getClass().getCanonicalName();
+    }
+  }
+
+  public static RunnerApi.OutputTime toProto(OutputTimeFn<?> outputTimeFn) {
+    if (outputTimeFn instanceof OutputAtEarliestInputTimestamp) {
+      return RunnerApi.OutputTime.EARLIEST_IN_PANE;
+    } else if (outputTimeFn instanceof OutputAtLatestInputTimestamp) {
+      return RunnerApi.OutputTime.LATEST_IN_PANE;
+    } else if (outputTimeFn instanceof OutputAtEndOfWindow) {
+      return RunnerApi.OutputTime.END_OF_WINDOW;
+    } else {
+      throw new IllegalArgumentException(
+          String.format(
+              "Cannot convert %s to %s: %s",
+              OutputTimeFn.class.getCanonicalName(),
+              RunnerApi.OutputTime.class.getCanonicalName(),
+              outputTimeFn));
+    }
+  }
+
+  public static OutputTimeFn<?> fromProto(RunnerApi.OutputTime proto) {
+    switch (proto) {
+      case EARLIEST_IN_PANE:
+        return OutputTimeFns.outputAtEarliestInputTimestamp();
+      case LATEST_IN_PANE:
+        return OutputTimeFns.outputAtLatestInputTimestamp();
+      case END_OF_WINDOW:
+        return OutputTimeFns.outputAtEndOfWindow();
+      case UNRECOGNIZED:
+      default:
+        // Whether or not it is proto that cannot recognize it (due to the 
version of the
+        // generated code we link to) or the switch hasn't been updated to 
handle it,
+        // the situation is the same: we don't know what this OutputTime means
+        throw new IllegalArgumentException(
+            String.format(
+                "Cannot convert unknown %s to %s: %s",
+                RunnerApi.OutputTime.class.getCanonicalName(),
+                OutputTimeFn.class.getCanonicalName(),
+                proto));
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7dd57105/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
index eac1c97..65dfaa9 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
@@ -152,7 +152,8 @@ public class Window {
      *
      * <p>This is the default behavior.
      */
-    FIRE_IF_NON_EMPTY
+    FIRE_IF_NON_EMPTY;
+
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/7dd57105/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategies.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategies.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategies.java
new file mode 100644
index 0000000..1af7719
--- /dev/null
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategies.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.protobuf.ByteString;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.UUID;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
+import org.apache.beam.sdk.transforms.windowing.Trigger;
+import org.apache.beam.sdk.transforms.windowing.Triggers;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
+import org.apache.beam.sdk.util.WindowingStrategy.CombineWindowFnOutputTimes;
+import org.joda.time.Duration;
+
+/** Utilities for working with {@link WindowingStrategy WindowingStrategies}. 
*/
+public class WindowingStrategies implements Serializable {
+
+  public static AccumulationMode fromProto(RunnerApi.AccumulationMode proto) {
+    switch (proto) {
+      case DISCARDING:
+        return AccumulationMode.DISCARDING_FIRED_PANES;
+      case ACCUMULATING:
+        return AccumulationMode.ACCUMULATING_FIRED_PANES;
+      case UNRECOGNIZED:
+      default:
+        // Whether or not it is proto that cannot recognize it (due to the 
version of the
+        // generated code we link to) or the switch hasn't been updated to 
handle it,
+        // the situation is the same: we don't know what this AccumulationMode means
+        throw new IllegalArgumentException(
+            String.format(
+                "Cannot convert unknown %s to %s: %s",
+                RunnerApi.AccumulationMode.class.getCanonicalName(),
+                AccumulationMode.class.getCanonicalName(),
+                proto));
+    }
+  }
+
+  public static RunnerApi.AccumulationMode toProto(AccumulationMode 
accumulationMode) {
+    switch (accumulationMode) {
+      case DISCARDING_FIRED_PANES:
+        return RunnerApi.AccumulationMode.DISCARDING;
+      case ACCUMULATING_FIRED_PANES:
+        return RunnerApi.AccumulationMode.ACCUMULATING;
+      default:
+        throw new IllegalArgumentException(
+            String.format(
+                "Cannot convert unknown %s to %s: %s",
+                AccumulationMode.class.getCanonicalName(),
+                RunnerApi.AccumulationMode.class.getCanonicalName(),
+                accumulationMode));
+    }
+  }
+
+  public static RunnerApi.ClosingBehavior toProto(Window.ClosingBehavior 
closingBehavior) {
+    switch (closingBehavior) {
+      case FIRE_ALWAYS:
+        return RunnerApi.ClosingBehavior.EMIT_ALWAYS;
+      case FIRE_IF_NON_EMPTY:
+        return RunnerApi.ClosingBehavior.EMIT_IF_NONEMPTY;
+      default:
+        throw new IllegalArgumentException(
+            String.format(
+                "Cannot convert unknown %s to %s: %s",
+                ClosingBehavior.class.getCanonicalName(),
+                RunnerApi.ClosingBehavior.class.getCanonicalName(),
+                closingBehavior));
+    }
+  }
+
+  public static ClosingBehavior fromProto(RunnerApi.ClosingBehavior proto) {
+    switch (proto) {
+      case EMIT_ALWAYS:
+        return ClosingBehavior.FIRE_ALWAYS;
+      case EMIT_IF_NONEMPTY:
+        return ClosingBehavior.FIRE_IF_NON_EMPTY;
+      case UNRECOGNIZED:
+      default:
+        // Whether or not it is proto that cannot recognize it (due to the 
version of the
+        // generated code we link to) or the switch hasn't been updated to 
handle it,
+        // the situation is the same: we don't know what this ClosingBehavior means
+        throw new IllegalArgumentException(
+            String.format(
+                "Cannot convert unknown %s to %s: %s",
+                RunnerApi.ClosingBehavior.class.getCanonicalName(),
+                ClosingBehavior.class.getCanonicalName(),
+                proto));
+
+    }
+  }
+
+  public static RunnerApi.OutputTime toProto(OutputTimeFn<?> outputTimeFn) {
+    if (outputTimeFn instanceof WindowingStrategy.CombineWindowFnOutputTimes) {
+      return OutputTimeFns.toProto(
+          ((CombineWindowFnOutputTimes<?>) outputTimeFn).getOutputTimeFn());
+    } else {
+      return OutputTimeFns.toProto(outputTimeFn);
+    }
+  }
+
+  // This URN says that the coder is just a UDF blob the indicated SDK 
understands
+  private static final String CUSTOM_CODER_URN = "urn:beam:coders:custom:1.0";
+
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+  public static RunnerApi.MessageWithComponents toProto(WindowFn<?, ?> 
windowFn)
+      throws IOException {
+    Coder<?> windowCoder = windowFn.windowCoder();
+
+    // TODO: re-use components
+    String windowCoderId = UUID.randomUUID().toString();
+    String customCoderId = UUID.randomUUID().toString();
+
+    return RunnerApi.MessageWithComponents.newBuilder()
+        .setFunctionSpec(
+            RunnerApi.FunctionSpec.newBuilder()
+                .setSdkFnSpec(
+                    RunnerApi.SdkFunctionSpec.newBuilder()
+                        .setData(
+                            
ByteString.copyFrom(SerializableUtils.serializeToByteArray(windowFn)))))
+        .setComponents(
+            Components.newBuilder()
+                .putCoders(
+                    windowCoderId,
+                    RunnerApi.Coder.newBuilder()
+                        .setUrn(CUSTOM_CODER_URN)
+                        .setCustomCoderFnId(customCoderId)
+                        .build())
+                .putFunctionSpecs(
+                    customCoderId,
+                    RunnerApi.FunctionSpec.newBuilder()
+                        .setSdkFnSpec(
+                            RunnerApi.SdkFunctionSpec.newBuilder()
+                                .setData(
+                                    ByteString.copyFrom(
+                                        OBJECT_MAPPER.writeValueAsBytes(
+                                            windowCoder.asCloudObject()))))
+                        .build()))
+        .build();
+  }
+
+  public static RunnerApi.MessageWithComponents toProto(WindowingStrategy<?, 
?> windowingStrategy)
+      throws IOException {
+
+    // TODO: maintain an inverted index of the components so that the id of a thing
+    // already in the components can be looked up
+    String windowFnId = UUID.randomUUID().toString();
+
+    RunnerApi.MessageWithComponents windowFnWithComponents =
+        toProto(windowingStrategy.getWindowFn());
+
+    RunnerApi.WindowingStrategy.Builder windowingStrategyProto =
+        RunnerApi.WindowingStrategy.newBuilder()
+            .setOutputTime(toProto(windowingStrategy.getOutputTimeFn()))
+            .setAccumulationMode(toProto(windowingStrategy.getMode()))
+            
.setClosingBehavior(toProto(windowingStrategy.getClosingBehavior()))
+            
.setAllowedLateness(windowingStrategy.getAllowedLateness().getMillis())
+            .setTrigger(Triggers.toProto(windowingStrategy.getTrigger()))
+            .setFnId(windowFnId);
+
+    return RunnerApi.MessageWithComponents.newBuilder()
+        .setWindowingStrategy(windowingStrategyProto)
+        .setComponents(
+            windowFnWithComponents
+                .getComponents()
+                .toBuilder()
+                .putFunctionSpecs(windowFnId, 
windowFnWithComponents.getFunctionSpec()))
+        .build();
+  }
+
+  /**
+   * Converts from a {@link RunnerApi.WindowingStrategy} accompanied by {@link 
RunnerApi.Components}
+   * to the SDK's {@link WindowingStrategy}.
+   */
+  public static WindowingStrategy<?, ?> 
fromProto(RunnerApi.MessageWithComponents proto) {
+    switch (proto.getRootCase()) {
+      case WINDOWING_STRATEGY:
+        return fromProto(proto.getWindowingStrategy(), proto.getComponents());
+      default:
+        throw new IllegalArgumentException(
+            String.format(
+                "Expected a %s with components but received %s",
+                RunnerApi.WindowingStrategy.class.getCanonicalName(), proto));
+    }
+  }
+
+  /**
+   * Converts from {@link RunnerApi.WindowingStrategy} to the SDK's {@link 
WindowingStrategy} using
+   * the provided components to dereference identifiers found in the proto.
+   */
+  public static WindowingStrategy<?, ?> fromProto(
+      RunnerApi.WindowingStrategy proto, RunnerApi.Components components) {
+    Object deserializedWindowFn =
+        SerializableUtils.deserializeFromByteArray(
+            components
+                .getFunctionSpecsMap()
+                .get(proto.getFnId())
+                .getSdkFnSpec()
+                .getData()
+                .toByteArray(),
+            "WindowFn");
+
+    WindowFn<?, ?> windowFn = (WindowFn<?, ?>) deserializedWindowFn;
+    OutputTimeFn<?> outputTimeFn = 
OutputTimeFns.fromProto(proto.getOutputTime());
+    AccumulationMode accumulationMode = fromProto(proto.getAccumulationMode());
+    Trigger trigger = Triggers.fromProto(proto.getTrigger());
+    ClosingBehavior closingBehavior = fromProto(proto.getClosingBehavior());
+    Duration allowedLateness = Duration.millis(proto.getAllowedLateness());
+
+    return WindowingStrategy.of(windowFn)
+        .withAllowedLateness(allowedLateness)
+        .withMode(accumulationMode)
+        .withTrigger(trigger)
+        .withOutputTimeFn(outputTimeFn)
+        .withClosingBehavior(closingBehavior);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/7dd57105/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategy.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategy.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategy.java
index 137f108..c14523c 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategy.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategy.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.util;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.MoreObjects;
 import java.io.Serializable;
 import java.util.Collections;
@@ -50,7 +51,7 @@ public class WindowingStrategy<T, W extends BoundedWindow> 
implements Serializab
    */
   public enum AccumulationMode {
     DISCARDING_FIRED_PANES,
-    ACCUMULATING_FIRED_PANES
+    ACCUMULATING_FIRED_PANES;
   }
 
   private static final Duration DEFAULT_ALLOWED_LATENESS = Duration.ZERO;
@@ -99,7 +100,7 @@ public class WindowingStrategy<T, W extends BoundedWindow> 
implements Serializab
         DefaultTrigger.of(), false,
         AccumulationMode.DISCARDING_FIRED_PANES, false,
         DEFAULT_ALLOWED_LATENESS, false,
-        OutputTimeFns.outputAtEndOfWindow(), false,
+        new CombineWindowFnOutputTimes(OutputTimeFns.outputAtEndOfWindow(), 
windowFn), false,
         ClosingBehavior.FIRE_IF_NON_EMPTY);
   }
 
@@ -235,6 +236,21 @@ public class WindowingStrategy<T, W extends BoundedWindow> 
implements Serializab
         closingBehavior);
   }
 
+  /**
+   * Fixes all the defaults so that equals can be used to check that two 
strategies are the same,
+   * regardless of the state of "defaulted-ness".
+   */
+  @VisibleForTesting
+  public WindowingStrategy<T, W> fixDefaults() {
+    return new WindowingStrategy<>(
+        windowFn,
+        trigger, true,
+        mode, true,
+        allowedLateness, true,
+        outputTimeFn, true,
+        closingBehavior);
+  }
+
   @Override
   public String toString() {
     return MoreObjects.toStringHelper(this)
@@ -283,7 +299,7 @@ public class WindowingStrategy<T, W extends BoundedWindow> 
implements Serializab
    *       is calculated using {@link OutputTimeFn#merge}.</li>
    * </ul>
    */
-  private static class CombineWindowFnOutputTimes<W extends BoundedWindow>
+  public static class CombineWindowFnOutputTimes<W extends BoundedWindow>
       extends OutputTimeFn<W> {
 
     private final OutputTimeFn<? super W> outputTimeFn;
@@ -295,6 +311,10 @@ public class WindowingStrategy<T, W extends BoundedWindow> 
implements Serializab
       this.windowFn = windowFn;
     }
 
+    public OutputTimeFn<? super W> getOutputTimeFn() {
+      return outputTimeFn;
+    }
+
     @Override
     public Instant assignOutputTime(Instant inputTimestamp, W window) {
       return outputTimeFn.merge(
@@ -320,5 +340,32 @@ public class WindowingStrategy<T, W extends BoundedWindow> 
implements Serializab
     public boolean dependsOnlyOnEarliestInputTimestamp() {
       return outputTimeFn.dependsOnlyOnEarliestInputTimestamp();
     }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (obj == this) {
+        return true;
+      }
+
+      if (!(obj instanceof CombineWindowFnOutputTimes)) {
+        return false;
+      }
+
+      CombineWindowFnOutputTimes<?> that = (CombineWindowFnOutputTimes<?>) obj;
+      return outputTimeFn.equals(that.outputTimeFn) && 
windowFn.equals(that.windowFn);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(outputTimeFn, windowFn);
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(this)
+          .add("outputTimeFn", outputTimeFn)
+          .add("windowFn", windowFn)
+          .toString();
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7dd57105/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFnsTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFnsTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFnsTest.java
new file mode 100644
index 0000000..78d7a2f
--- /dev/null
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFnsTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.ImmutableList;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+/** Tests for {@link OutputTimeFns}. */
+@RunWith(Parameterized.class)
+public class OutputTimeFnsTest {
+
+  @Parameters(name = "{index}: {0}")
+  public static Iterable<OutputTimeFn<BoundedWindow>> data() {
+    return ImmutableList.of(
+        OutputTimeFns.outputAtEarliestInputTimestamp(),
+        OutputTimeFns.outputAtLatestInputTimestamp(),
+        OutputTimeFns.outputAtEndOfWindow());
+  }
+
+  @Parameter(0)
+  public OutputTimeFn<?> outputTimeFn;
+
+  @Test
+  public void testToProtoAndBack() throws Exception {
+    OutputTimeFn<?> result = 
OutputTimeFns.fromProto(OutputTimeFns.toProto(outputTimeFn));
+
+    assertThat(result, equalTo((OutputTimeFn) outputTimeFn));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/7dd57105/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowingStrategiesTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowingStrategiesTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowingStrategiesTest.java
new file mode 100644
index 0000000..5d3de51
--- /dev/null
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowingStrategiesTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.collect.ImmutableList;
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
+import org.apache.beam.sdk.transforms.windowing.Trigger;
+import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
+import org.joda.time.Duration;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+/** Unit tests for {@link WindowingStrategies}. */
+@RunWith(Parameterized.class)
+public class WindowingStrategiesTest {
+
+  // Each spec carries one windowing strategy to round-trip through the proto conversion
+  @AutoValue
+  abstract static class ToProtoAndBackSpec {
+    abstract WindowingStrategy getWindowingStrategy();
+  }
+
+  private static ToProtoAndBackSpec toProtoAndBackSpec(WindowingStrategy 
windowingStrategy) {
+    return new 
AutoValue_WindowingStrategiesTest_ToProtoAndBackSpec(windowingStrategy);
+  }
+
+  private static final WindowFn<?, ?> REPRESENTATIVE_WINDOW_FN =
+      FixedWindows.of(Duration.millis(12));
+
+  private static final Trigger REPRESENTATIVE_TRIGGER = 
AfterWatermark.pastEndOfWindow();
+
+  @Parameters(name = "{index}: {0}")
+  public static Iterable<ToProtoAndBackSpec> data() {
+    return ImmutableList.of(
+        toProtoAndBackSpec(WindowingStrategy.globalDefault()),
+        toProtoAndBackSpec(
+            WindowingStrategy.of(REPRESENTATIVE_WINDOW_FN)
+                .withClosingBehavior(ClosingBehavior.FIRE_ALWAYS)
+                .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
+                .withTrigger(REPRESENTATIVE_TRIGGER)
+                .withAllowedLateness(Duration.millis(71))
+                
.withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())),
+        toProtoAndBackSpec(
+            WindowingStrategy.of(REPRESENTATIVE_WINDOW_FN)
+                .withClosingBehavior(ClosingBehavior.FIRE_IF_NON_EMPTY)
+                .withMode(AccumulationMode.DISCARDING_FIRED_PANES)
+                .withTrigger(REPRESENTATIVE_TRIGGER)
+                .withAllowedLateness(Duration.millis(93))
+                
.withOutputTimeFn(OutputTimeFns.outputAtLatestInputTimestamp())));
+  }
+
+  @Parameter(0)
+  public ToProtoAndBackSpec toProtoAndBackSpec;
+
+  @Test
+  public void testToProtoAndBack() throws Exception {
+    WindowingStrategy<?, ?> windowingStrategy = 
toProtoAndBackSpec.getWindowingStrategy();
+    WindowingStrategy<?, ?> toProtoAndBackWindowingStrategy =
+        
WindowingStrategies.fromProto(WindowingStrategies.toProto(windowingStrategy));
+
+    assertThat(
+        toProtoAndBackWindowingStrategy,
+        equalTo((WindowingStrategy) windowingStrategy.fixDefaults()));
+  }
+}

Reply via email to