[BEAM-3035] Introduces Reify transform

Initially contains stuff from ReifyTimestamps,
for reifying/extracting timestamps and windows.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7966b759
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7966b759
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7966b759

Branch: refs/heads/tez-runner
Commit: 7966b759a15a2ea266e6db0d3f884c835d0bf628
Parents: 40100db
Author: wtanaka.com <wtan...@users.noreply.github.com>
Authored: Sat Oct 7 08:45:45 2017 -1000
Committer: Eugene Kirpichov <kirpic...@google.com>
Committed: Thu Nov 16 14:57:47 2017 -0800

----------------------------------------------------------------------
 .../examples/complete/AutoCompleteTest.java     |  20 +-
 .../apache/beam/sdk/testing/GatherAllPanes.java |  15 +-
 .../org/apache/beam/sdk/transforms/Reify.java   | 192 +++++++++++++++++
 .../beam/sdk/transforms/ReifyTimestamps.java    |  55 +++--
 .../apache/beam/sdk/transforms/Reshuffle.java   |   2 +-
 .../apache/beam/sdk/transforms/ReifyTest.java   | 212 +++++++++++++++++++
 .../beam/sdk/transforms/ReshuffleTest.java      |   4 +-
 .../beam/sdk/transforms/SplittableDoFnTest.java |   9 +-
 .../apache/beam/sdk/transforms/WatchTest.java   |   2 +-
 9 files changed, 447 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/7966b759/examples/java/src/test/java/org/apache/beam/examples/complete/AutoCompleteTest.java
----------------------------------------------------------------------
diff --git 
a/examples/java/src/test/java/org/apache/beam/examples/complete/AutoCompleteTest.java
 
b/examples/java/src/test/java/org/apache/beam/examples/complete/AutoCompleteTest.java
index ef57da4..900d966 100644
--- 
a/examples/java/src/test/java/org/apache/beam/examples/complete/AutoCompleteTest.java
+++ 
b/examples/java/src/test/java/org/apache/beam/examples/complete/AutoCompleteTest.java
@@ -27,10 +27,7 @@ import 
org.apache.beam.examples.complete.AutoComplete.ComputeTopCompletions;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Filter;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
@@ -128,9 +125,7 @@ public class AutoCompleteTest implements Serializable {
         TimestampedValue.of("xB", new Instant(2)),
         TimestampedValue.of("xB", new Instant(2)));
 
-    PCollection<String> input = p
-      .apply(Create.of(words))
-      .apply(new ReifyTimestamps<String>());
+    PCollection<String> input = p.apply(Create.timestamped(words));
 
     PCollection<KV<String, List<CompletionCandidate>>> output =
       input.apply(Window.<String>into(SlidingWindows.of(new Duration(2))))
@@ -161,17 +156,4 @@ public class AutoCompleteTest implements Serializable {
     }
     return all;
   }
-
-  private static class ReifyTimestamps<T>
-      extends PTransform<PCollection<TimestampedValue<T>>, PCollection<T>> {
-    @Override
-    public PCollection<T> expand(PCollection<TimestampedValue<T>> input) {
-      return input.apply(ParDo.of(new DoFn<TimestampedValue<T>, T>() {
-        @ProcessElement
-        public void processElement(ProcessContext c) {
-          c.outputWithTimestamp(c.element().getValue(), 
c.element().getTimestamp());
-        }
-      }));
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7966b759/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/GatherAllPanes.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/GatherAllPanes.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/GatherAllPanes.java
index 6b24d95..979e979 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/GatherAllPanes.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/GatherAllPanes.java
@@ -17,13 +17,11 @@
  */
 package org.apache.beam.sdk.testing;
 
-import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Reify;
 import org.apache.beam.sdk.transforms.Values;
 import org.apache.beam.sdk.transforms.WithKeys;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.Never;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
@@ -60,10 +58,7 @@ class GatherAllPanes<T>
     WindowFn<?, ?> originalWindowFn = 
input.getWindowingStrategy().getWindowFn();
 
     return input
-        .apply(ParDo.of(new ReifyTimestampsAndWindowsFn<T>()))
-        .setCoder(
-            ValueInSingleWindow.Coder.of(
-                input.getCoder(), 
input.getWindowingStrategy().getWindowFn().windowCoder()))
+        .apply(Reify.<T>windows())
         .apply(
             WithKeys.<Integer, ValueInSingleWindow<T>>of(0)
                 .withKeyType(new TypeDescriptor<Integer>() {}))
@@ -80,10 +75,4 @@ class GatherAllPanes<T>
         .setWindowingStrategyInternal(input.getWindowingStrategy());
   }
 
-  private static class ReifyTimestampsAndWindowsFn<T> extends DoFn<T, 
ValueInSingleWindow<T>> {
-    @DoFn.ProcessElement
-    public void processElement(ProcessContext c, BoundedWindow window) {
-      c.output(ValueInSingleWindow.of(c.element(), c.timestamp(), window, 
c.pane()));
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7966b759/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reify.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reify.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reify.java
new file mode 100644
index 0000000..caa89e6
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reify.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.transforms;
+
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TimestampedValue.TimestampedValueCoder;
+import org.apache.beam.sdk.values.ValueInSingleWindow;
+import org.joda.time.Duration;
+
+/** {@link PTransform PTransforms} for reifying the timestamp, window and pane 
of values. */
+public class Reify {
+  /** Private implementation of {@link #windows()}. */
+  private static class Window<T>
+      extends PTransform<PCollection<T>, PCollection<ValueInSingleWindow<T>>> {
+    @Override
+    public PCollection<ValueInSingleWindow<T>> expand(PCollection<T> input) {
+      return input
+          .apply(
+              ParDo.of(
+                  new DoFn<T, ValueInSingleWindow<T>>() {
+                    @ProcessElement
+                    public void processElement(ProcessContext c, BoundedWindow 
window) {
+                      c.outputWithTimestamp(
+                          ValueInSingleWindow.of(c.element(), c.timestamp(), 
window, c.pane()),
+                          c.timestamp());
+                    }
+                  }))
+          .setCoder(
+              ValueInSingleWindow.Coder.of(
+                  input.getCoder(), 
input.getWindowingStrategy().getWindowFn().windowCoder()));
+    }
+  }
+
+  private static class Timestamp<T>
+      extends PTransform<PCollection<T>, PCollection<TimestampedValue<T>>> {
+    @Override
+    public PCollection<TimestampedValue<T>> expand(PCollection<T> input) {
+      return input
+          .apply(
+              ParDo.of(
+                  new DoFn<T, TimestampedValue<T>>() {
+                    @ProcessElement
+                    public void processElement(ProcessContext context) {
+                      context.output(TimestampedValue.of(context.element(), 
context.timestamp()));
+                    }
+                  }))
+          .setCoder(TimestampedValueCoder.of(input.getCoder()));
+    }
+  }
+
+  private static class WindowInValue<K, V>
+      extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, 
ValueInSingleWindow<V>>>> {
+    @Override
+    public PCollection<KV<K, ValueInSingleWindow<V>>> expand(PCollection<KV<K, 
V>> input) {
+      KvCoder<K, V> coder = (KvCoder<K, V>) input.getCoder();
+      return input
+          .apply(
+              ParDo.of(
+                  new DoFn<KV<K, V>, KV<K, ValueInSingleWindow<V>>>() {
+                    @ProcessElement
+                    public void processElement(ProcessContext c, BoundedWindow 
window) {
+                      c.output(
+                          KV.of(
+                              c.element().getKey(),
+                              ValueInSingleWindow.of(
+                                  c.element().getValue(), c.timestamp(), 
window, c.pane())));
+                    }
+                  }))
+          .setCoder(
+              KvCoder.of(
+                  coder.getKeyCoder(),
+                  ValueInSingleWindow.Coder.of(
+                      coder.getValueCoder(),
+                      
input.getWindowingStrategy().getWindowFn().windowCoder())));
+    }
+  }
+
+  private static class TimestampInValue<K, V>
+      extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, 
TimestampedValue<V>>>> {
+    @Override
+    public PCollection<KV<K, TimestampedValue<V>>> expand(PCollection<KV<K, 
V>> input) {
+      KvCoder<K, V> coder = (KvCoder<K, V>) input.getCoder();
+      return input
+          .apply(
+              ParDo.of(
+                  new DoFn<KV<K, V>, KV<K, TimestampedValue<V>>>() {
+                    @ProcessElement
+                    public void processElement(ProcessContext context) {
+                      context.output(
+                          KV.of(
+                              context.element().getKey(),
+                              TimestampedValue.of(
+                                  context.element().getValue(), 
context.timestamp())));
+                    }
+                  }))
+          .setCoder(
+              KvCoder.of(coder.getKeyCoder(), 
TimestampedValueCoder.of(coder.getValueCoder())));
+    }
+  }
+
+  private static class ExtractTimestampsFromValues<K, V>
+      extends PTransform<PCollection<KV<K, TimestampedValue<V>>>, 
PCollection<KV<K, V>>> {
+    @Override
+    public PCollection<KV<K, V>> expand(PCollection<KV<K, 
TimestampedValue<V>>> input) {
+      KvCoder<K, TimestampedValue<V>> kvCoder = (KvCoder<K, 
TimestampedValue<V>>) input.getCoder();
+      TimestampedValueCoder<V> tvCoder = (TimestampedValueCoder<V>) 
kvCoder.getValueCoder();
+      return input
+          .apply(
+              ParDo.of(
+                  new DoFn<KV<K, TimestampedValue<V>>, KV<K, V>>() {
+                    @Override
+                    public Duration getAllowedTimestampSkew() {
+                      return Duration.millis(Long.MAX_VALUE);
+                    }
+
+                    @ProcessElement
+                    public void processElement(ProcessContext context) {
+                      KV<K, TimestampedValue<V>> kv = context.element();
+                      context.outputWithTimestamp(
+                          KV.of(kv.getKey(), kv.getValue().getValue()),
+                          kv.getValue().getTimestamp());
+                    }
+                  }))
+          .setCoder(KvCoder.of(kvCoder.getKeyCoder(), 
tvCoder.getValueCoder()));
+    }
+  }
+
+  private Reify() {}
+
+  /**
+   * Create a {@link PTransform} that will output all inputs wrapped in a 
{@link TimestampedValue}.
+   */
+  public static <T> PTransform<PCollection<T>, 
PCollection<TimestampedValue<T>>> timestamps() {
+    return new Timestamp<>();
+  }
+
+  /**
+   * Create a {@link PTransform} that will output all input {@link KV KVs} 
with the timestamp inside
+   * the value.
+   */
+  public static <K, V>
+      PTransform<PCollection<KV<K, V>>, PCollection<KV<K, 
TimestampedValue<V>>>>
+          timestampsInValue() {
+    return new TimestampInValue<>();
+  }
+
+  /**
+   * Create a {@link PTransform} that will reify information from the 
processing context into
+   * instances of {@link ValueInSingleWindow}.
+   *
+   * @param <T> element type
+   */
+  public static <T> PTransform<PCollection<T>, 
PCollection<ValueInSingleWindow<T>>> windows() {
+    return new Window<>();
+  }
+
+  /**
+   * Create a {@link PTransform} that will output all input {@link KV KVs} with the timestamp,
+   * window and pane info inside the value.
+   */
+  public static <K, V>
+      PTransform<PCollection<KV<K, V>>, PCollection<KV<K, 
ValueInSingleWindow<V>>>>
+          windowsInValue() {
+    return new WindowInValue<>();
+  }
+
+  public static <K, V>
+      PTransform<PCollection<KV<K, TimestampedValue<V>>>, PCollection<KV<K, 
V>>>
+          extractTimestampsFromValues() {
+    return new ExtractTimestampsFromValues<>();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/7966b759/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ReifyTimestamps.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ReifyTimestamps.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ReifyTimestamps.java
index 990f235..583dc38 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ReifyTimestamps.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ReifyTimestamps.java
@@ -21,59 +21,74 @@ package org.apache.beam.sdk.transforms;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TimestampedValue;
-import org.joda.time.Duration;
 
 /**
  * {@link PTransform PTransforms} for reifying the timestamp of values and 
reemitting the original
  * value with the original timestamp.
+ *
+ * @deprecated Use {@link Reify}
  */
+@Deprecated
 class ReifyTimestamps {
   private ReifyTimestamps() {}
 
   /**
    * Create a {@link PTransform} that will output all input {@link KV KVs} 
with the timestamp inside
    * the value.
+   *
+   * @deprecated Use {@link Reify#timestampsInValue()}
    */
+  @Deprecated
   public static <K, V>
       PTransform<PCollection<? extends KV<K, V>>, PCollection<KV<K, 
TimestampedValue<V>>>>
           inValues() {
-    return ParDo.of(new ReifyValueTimestampDoFn<K, V>());
+    return new InValues<>();
   }
 
   /**
    * Create a {@link PTransform} that consumes {@link KV KVs} with a {@link 
TimestampedValue} as the
    * value, and outputs the {@link KV} of the input key and value at the 
timestamp specified by the
    * {@link TimestampedValue}.
+   *
+   * @deprecated Use {@link Reify#extractTimestampsFromValues()}.
    */
+  @Deprecated
   public static <K, V>
       PTransform<PCollection<? extends KV<K, TimestampedValue<V>>>, 
PCollection<KV<K, V>>>
           extractFromValues() {
-    return ParDo.of(new ExtractTimestampedValueDoFn<K, V>());
+    return new ExtractTimestampsFromValues<>();
   }
 
-  private static class ReifyValueTimestampDoFn<K, V>
-      extends DoFn<KV<K, V>, KV<K, TimestampedValue<V>>> {
-    @ProcessElement
-    public void processElement(ProcessContext context) {
-      context.output(
-          KV.of(
-              context.element().getKey(),
-              TimestampedValue.of(context.element().getValue(), 
context.timestamp())));
+  private static class RemoveWildcard<T>
+      extends PTransform<PCollection<? extends T>, PCollection<T>> {
+    @Override
+    public PCollection<T> expand(PCollection<? extends T> input) {
+      return input.apply(
+          ParDo.of(
+              new DoFn<T, T>() {
+                @ProcessElement
+                public void process(ProcessContext c) {
+                  c.output(c.element());
+                }
+              }));
     }
   }
 
-  private static class ExtractTimestampedValueDoFn<K, V>
-      extends DoFn<KV<K, TimestampedValue<V>>, KV<K, V>> {
+  private static class InValues<K, V>
+      extends PTransform<PCollection<? extends KV<K, V>>, PCollection<KV<K, 
TimestampedValue<V>>>> {
     @Override
-    public Duration getAllowedTimestampSkew() {
-      return Duration.millis(Long.MAX_VALUE);
+    public PCollection<KV<K, TimestampedValue<V>>> expand(PCollection<? 
extends KV<K, V>> input) {
+      return input.apply(new RemoveWildcard<KV<K, V>>()).apply(Reify.<K, 
V>timestampsInValue());
     }
+  }
 
-    @ProcessElement
-    public void processElement(ProcessContext context) {
-      KV<K, TimestampedValue<V>> kv = context.element();
-      context.outputWithTimestamp(
-          KV.of(kv.getKey(), kv.getValue().getValue()), 
kv.getValue().getTimestamp());
+  private static class ExtractTimestampsFromValues<K, V>
+      extends PTransform<PCollection<? extends KV<K, TimestampedValue<V>>>, 
PCollection<KV<K, V>>> {
+    @Override
+    public PCollection<KV<K, V>> expand(PCollection<? extends KV<K, 
TimestampedValue<V>>> input) {
+      return input
+          .apply(new RemoveWildcard<KV<K, TimestampedValue<V>>>())
+          .apply(Reify.<K, V>extractTimestampsFromValues());
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7966b759/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java
index 68e4560..8920559 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java
@@ -84,7 +84,7 @@ public class Reshuffle<K, V> extends 
PTransform<PCollection<KV<K, V>>, PCollecti
 
     return input
         .apply(rewindow)
-        .apply("ReifyOriginalTimestamps", ReifyTimestamps.<K, V>inValues())
+        .apply("ReifyOriginalTimestamps", Reify.<K, V>timestampsInValue())
         .apply(GroupByKey.<K, TimestampedValue<V>>create())
         // Set the windowing strategy directly, so that it doesn't get counted 
as the user having
         // set allowed lateness.

http://git-wip-us.apache.org/repos/asf/beam/blob/7966b759/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReifyTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReifyTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReifyTest.java
new file mode 100644
index 0000000..9e5ce9f
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReifyTest.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import java.io.Serializable;
+
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestStream;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.ValueInSingleWindow;
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link Reify}. */
+@RunWith(JUnit4.class)
+public class ReifyTest implements Serializable {
+  public static final WithTimestamps<KV<String, Integer>> TIMESTAMP_FROM_V =
+      WithTimestamps.of(
+          new SerializableFunction<KV<String, Integer>, Instant>() {
+            @Override
+            public Instant apply(KV<String, Integer> input) {
+              return new Instant(input.getValue().longValue());
+            }
+          });
+  @Rule public transient TestPipeline pipeline = TestPipeline.create();
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void extractFromValuesSucceeds() {
+    PCollection<KV<String, TimestampedValue<Integer>>> preified =
+        pipeline.apply(
+            Create.of(
+                KV.of("foo", TimestampedValue.of(0, new Instant((0)))),
+                KV.of("foo", TimestampedValue.of(1, new Instant(1))),
+                KV.of("bar", TimestampedValue.of(2, new Instant(2))),
+                KV.of("baz", TimestampedValue.of(3, new Instant(3)))));
+
+    PCollection<KV<String, Integer>> timestamped =
+        preified.apply(Reify.<String, Integer>extractTimestampsFromValues());
+
+    PAssert.that(timestamped)
+        .containsInAnyOrder(KV.of("foo", 0), KV.of("foo", 1), KV.of("bar", 2), 
KV.of("baz", 3));
+
+    timestamped.apply(
+        "AssertElementTimestamps",
+        ParDo.of(
+            new DoFn<KV<String, Integer>, Void>() {
+              @ProcessElement
+              public void verifyTimestampsEqualValue(ProcessContext context) {
+                assertThat(
+                    new Instant(context.element().getValue().longValue()),
+                    equalTo(context.timestamp()));
+              }
+            }));
+
+    pipeline.run();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void extractFromValuesWhenValueTimestampedLaterSucceeds() {
+    PCollection<KV<String, TimestampedValue<Integer>>> preified =
+        pipeline.apply(
+            Create.timestamped(
+                TimestampedValue.of(
+                    KV.of("foo", TimestampedValue.of(0, new Instant((0)))), 
new Instant(100)),
+                TimestampedValue.of(
+                    KV.of("foo", TimestampedValue.of(1, new Instant(1))), new 
Instant(101L)),
+                TimestampedValue.of(
+                    KV.of("bar", TimestampedValue.of(2, new Instant(2))), new 
Instant(102L)),
+                TimestampedValue.of(
+                    KV.of("baz", TimestampedValue.of(3, new Instant(3))), new 
Instant(103L))));
+
+    PCollection<KV<String, Integer>> timestamped =
+        preified.apply(ReifyTimestamps.<String, Integer>extractFromValues());
+
+    PAssert.that(timestamped)
+        .containsInAnyOrder(KV.of("foo", 0), KV.of("foo", 1), KV.of("bar", 2), 
KV.of("baz", 3));
+
+    timestamped.apply(
+        "AssertElementTimestamps",
+        ParDo.of(
+            new DoFn<KV<String, Integer>, Void>() {
+              @ProcessElement
+              public void verifyTimestampsEqualValue(ProcessContext context) {
+                assertThat(
+                    new Instant(context.element().getValue().longValue()),
+                    equalTo(context.timestamp()));
+              }
+            }));
+
+    pipeline.run();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void globalWindowNoKeys() {
+    PCollection<ValueInSingleWindow<String>> result =
+        pipeline
+            .apply(
+                TestStream.create(StringUtf8Coder.of())
+                    .addElements(TimestampedValue.of("dei", new Instant(123L)))
+                    .advanceWatermarkToInfinity())
+            .apply(Reify.<String>windows());
+    PAssert.that(result)
+        .containsInAnyOrder(
+            ValueInSingleWindow.of(
+                "dei", new Instant(123L), GlobalWindow.INSTANCE, 
PaneInfo.NO_FIRING));
+    pipeline.run();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void timestampedValuesSucceeds() {
+    PCollection<KV<String, Integer>> timestamped =
+        pipeline
+            .apply(Create.of(KV.of("foo", 0), KV.of("foo", 1), KV.of("bar", 
2), KV.of("baz", 3)))
+            .apply(TIMESTAMP_FROM_V);
+
+    PCollection<KV<String, TimestampedValue<Integer>>> reified =
+        timestamped.apply(Reify.<String, Integer>timestampsInValue());
+
+    PAssert.that(reified)
+        .containsInAnyOrder(
+            KV.of("foo", TimestampedValue.of(0, new Instant(0))),
+            KV.of("foo", TimestampedValue.of(1, new Instant(1))),
+            KV.of("bar", TimestampedValue.of(2, new Instant(2))),
+            KV.of("baz", TimestampedValue.of(3, new Instant(3))));
+
+    pipeline.run();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void timestampsSucceeds() {
+    PCollection<String> timestamped =
+        pipeline.apply(
+            Create.timestamped(
+                TimestampedValue.of("foo", new Instant(0L)),
+                TimestampedValue.of("bar", new Instant(1L))));
+
+    PCollection<TimestampedValue<String>> reified = 
timestamped.apply(Reify.<String>timestamps());
+
+    PAssert.that(reified)
+        .containsInAnyOrder(
+            TimestampedValue.of("foo", new Instant(0)), 
TimestampedValue.of("bar", new Instant(1)));
+
+    pipeline.run();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void windowsInValueSucceeds() {
+    PCollection<KV<String, Integer>> timestamped =
+        pipeline
+            .apply(Create.of(KV.of("foo", 0), KV.of("foo", 1), KV.of("bar", 
2), KV.of("baz", 3)))
+            .apply(TIMESTAMP_FROM_V);
+
+    PCollection<KV<String, ValueInSingleWindow<Integer>>> reified =
+        timestamped.apply(Reify.<String, Integer>windowsInValue());
+
+    PAssert.that(reified)
+        .containsInAnyOrder(
+            KV.of(
+                "foo",
+                ValueInSingleWindow.of(
+                    0, new Instant(0), GlobalWindow.INSTANCE, 
PaneInfo.NO_FIRING)),
+            KV.of(
+                "foo",
+                ValueInSingleWindow.of(
+                    1, new Instant(1), GlobalWindow.INSTANCE, 
PaneInfo.NO_FIRING)),
+            KV.of(
+                "bar",
+                ValueInSingleWindow.of(
+                    2, new Instant(2), GlobalWindow.INSTANCE, 
PaneInfo.NO_FIRING)),
+            KV.of(
+                "baz",
+                ValueInSingleWindow.of(
+                    3, new Instant(3), GlobalWindow.INSTANCE, 
PaneInfo.NO_FIRING)));
+
+    pipeline.run();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/7966b759/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReshuffleTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReshuffleTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReshuffleTest.java
index 0eb8e2d..12eddf2 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReshuffleTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReshuffleTest.java
@@ -131,7 +131,7 @@ public class ReshuffleTest implements Serializable {
                         return input;
                       }
                     }))
-            .apply("ReifyOriginalTimestamps", ReifyTimestamps.<String, 
String>inValues());
+            .apply("ReifyOriginalTimestamps", Reify.<String, 
String>timestampsInValue());
 
     // The outer TimestampedValue is the reified timestamp post-reshuffle. The 
inner
     // TimestampedValue is the pre-reshuffle timestamp.
@@ -140,7 +140,7 @@ public class ReshuffleTest implements Serializable {
             .apply(Reshuffle.<String, TimestampedValue<String>>of())
             .apply(
                 "ReifyReshuffledTimestamps",
-                ReifyTimestamps.<String, TimestampedValue<String>>inValues())
+                Reify.<String, TimestampedValue<String>>timestampsInValue())
             
.apply(Values.<TimestampedValue<TimestampedValue<String>>>create());
 
     PAssert.that(output)

http://git-wip-us.apache.org/repos/asf/beam/blob/7966b759/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
index d2d2529..f70dfbb 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
@@ -98,13 +98,6 @@ public class SplittableDoFnTest implements Serializable {
     }
   }
 
-  private static class ReifyTimestampsFn<T> extends DoFn<T, 
TimestampedValue<T>> {
-    @ProcessElement
-    public void process(ProcessContext c) {
-      c.output(TimestampedValue.of(c.element(), c.timestamp()));
-    }
-  }
-
   private static PipelineOptions streamingTestPipelineOptions() {
     // Using testing options with streaming=true makes it possible to enable 
UsesSplittableParDo
     // tests in Dataflow runner, because as of writing, it can run Splittable 
DoFn only in
@@ -176,7 +169,7 @@ public class SplittableDoFnTest implements Serializable {
     assertEquals(windowFn, res.getWindowingStrategy().getWindowFn());
 
     PCollection<TimestampedValue<KV<String, Integer>>> timestamped =
-        res.apply("Reify timestamps", ParDo.of(new 
ReifyTimestampsFn<KV<String, Integer>>()));
+        res.apply(Reify.<KV<String, Integer>>timestamps());
 
     for (int i = 0; i < 4; ++i) {
       Instant base = now.minus(Duration.standardSeconds(i));

http://git-wip-us.apache.org/repos/asf/beam/blob/7966b759/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WatchTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WatchTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WatchTest.java
index 113e8fe..8904376 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WatchTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WatchTest.java
@@ -272,7 +272,7 @@ public class WatchTest implements Serializable {
                             standardSeconds(30) /* timeToFail */))
                     .withPollInterval(Duration.millis(500))
                     .withOutputCoder(VarIntCoder.of()))
-            .apply(ReifyTimestamps.<String, Integer>inValues())
+            .apply(Reify.<String, Integer>timestampsInValue())
             .apply("Drop timestamped input", 
Values.<TimestampedValue<Integer>>create());
 
     PAssert.that(res)

Reply via email to