[BEAM-3035] Introduces Reify transform. Initially contains the functionality from ReifyTimestamps, for reifying/extracting timestamps and windows.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7966b759 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7966b759 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7966b759 Branch: refs/heads/tez-runner Commit: 7966b759a15a2ea266e6db0d3f884c835d0bf628 Parents: 40100db Author: wtanaka.com <wtan...@users.noreply.github.com> Authored: Sat Oct 7 08:45:45 2017 -1000 Committer: Eugene Kirpichov <kirpic...@google.com> Committed: Thu Nov 16 14:57:47 2017 -0800 ---------------------------------------------------------------------- .../examples/complete/AutoCompleteTest.java | 20 +- .../apache/beam/sdk/testing/GatherAllPanes.java | 15 +- .../org/apache/beam/sdk/transforms/Reify.java | 192 +++++++++++++++++ .../beam/sdk/transforms/ReifyTimestamps.java | 55 +++-- .../apache/beam/sdk/transforms/Reshuffle.java | 2 +- .../apache/beam/sdk/transforms/ReifyTest.java | 212 +++++++++++++++++++ .../beam/sdk/transforms/ReshuffleTest.java | 4 +- .../beam/sdk/transforms/SplittableDoFnTest.java | 9 +- .../apache/beam/sdk/transforms/WatchTest.java | 2 +- 9 files changed, 447 insertions(+), 64 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/7966b759/examples/java/src/test/java/org/apache/beam/examples/complete/AutoCompleteTest.java ---------------------------------------------------------------------- diff --git a/examples/java/src/test/java/org/apache/beam/examples/complete/AutoCompleteTest.java b/examples/java/src/test/java/org/apache/beam/examples/complete/AutoCompleteTest.java index ef57da4..900d966 100644 --- a/examples/java/src/test/java/org/apache/beam/examples/complete/AutoCompleteTest.java +++ b/examples/java/src/test/java/org/apache/beam/examples/complete/AutoCompleteTest.java @@ -27,10 +27,7 @@ import org.apache.beam.examples.complete.AutoComplete.ComputeTopCompletions; import org.apache.beam.sdk.testing.PAssert; 
import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Filter; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.windowing.SlidingWindows; import org.apache.beam.sdk.transforms.windowing.Window; @@ -128,9 +125,7 @@ public class AutoCompleteTest implements Serializable { TimestampedValue.of("xB", new Instant(2)), TimestampedValue.of("xB", new Instant(2))); - PCollection<String> input = p - .apply(Create.of(words)) - .apply(new ReifyTimestamps<String>()); + PCollection<String> input = p.apply(Create.timestamped(words)); PCollection<KV<String, List<CompletionCandidate>>> output = input.apply(Window.<String>into(SlidingWindows.of(new Duration(2)))) @@ -161,17 +156,4 @@ public class AutoCompleteTest implements Serializable { } return all; } - - private static class ReifyTimestamps<T> - extends PTransform<PCollection<TimestampedValue<T>>, PCollection<T>> { - @Override - public PCollection<T> expand(PCollection<TimestampedValue<T>> input) { - return input.apply(ParDo.of(new DoFn<TimestampedValue<T>, T>() { - @ProcessElement - public void processElement(ProcessContext c) { - c.outputWithTimestamp(c.element().getValue(), c.element().getTimestamp()); - } - })); - } - } } http://git-wip-us.apache.org/repos/asf/beam/blob/7966b759/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/GatherAllPanes.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/GatherAllPanes.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/GatherAllPanes.java index 6b24d95..979e979 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/GatherAllPanes.java +++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/GatherAllPanes.java @@ -17,13 +17,11 @@ */ package org.apache.beam.sdk.testing; -import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.Reify; import org.apache.beam.sdk.transforms.Values; import org.apache.beam.sdk.transforms.WithKeys; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.Never; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.transforms.windowing.WindowFn; @@ -60,10 +58,7 @@ class GatherAllPanes<T> WindowFn<?, ?> originalWindowFn = input.getWindowingStrategy().getWindowFn(); return input - .apply(ParDo.of(new ReifyTimestampsAndWindowsFn<T>())) - .setCoder( - ValueInSingleWindow.Coder.of( - input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder())) + .apply(Reify.<T>windows()) .apply( WithKeys.<Integer, ValueInSingleWindow<T>>of(0) .withKeyType(new TypeDescriptor<Integer>() {})) @@ -80,10 +75,4 @@ class GatherAllPanes<T> .setWindowingStrategyInternal(input.getWindowingStrategy()); } - private static class ReifyTimestampsAndWindowsFn<T> extends DoFn<T, ValueInSingleWindow<T>> { - @DoFn.ProcessElement - public void processElement(ProcessContext c, BoundedWindow window) { - c.output(ValueInSingleWindow.of(c.element(), c.timestamp(), window, c.pane())); - } - } } http://git-wip-us.apache.org/repos/asf/beam/blob/7966b759/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reify.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reify.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reify.java new file mode 100644 index 0000000..caa89e6 --- /dev/null +++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reify.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.transforms; + +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TimestampedValue; +import org.apache.beam.sdk.values.TimestampedValue.TimestampedValueCoder; +import org.apache.beam.sdk.values.ValueInSingleWindow; +import org.joda.time.Duration; + +/** {@link PTransform PTransforms} for reifying the timestamp, window and pane of values. */ +public class Reify { + /** Private implementation of {@link #windows()}. 
*/ + private static class Window<T> + extends PTransform<PCollection<T>, PCollection<ValueInSingleWindow<T>>> { + @Override + public PCollection<ValueInSingleWindow<T>> expand(PCollection<T> input) { + return input + .apply( + ParDo.of( + new DoFn<T, ValueInSingleWindow<T>>() { + @ProcessElement + public void processElement(ProcessContext c, BoundedWindow window) { + c.outputWithTimestamp( + ValueInSingleWindow.of(c.element(), c.timestamp(), window, c.pane()), + c.timestamp()); + } + })) + .setCoder( + ValueInSingleWindow.Coder.of( + input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder())); + } + } + + private static class Timestamp<T> + extends PTransform<PCollection<T>, PCollection<TimestampedValue<T>>> { + @Override + public PCollection<TimestampedValue<T>> expand(PCollection<T> input) { + return input + .apply( + ParDo.of( + new DoFn<T, TimestampedValue<T>>() { + @ProcessElement + public void processElement(ProcessContext context) { + context.output(TimestampedValue.of(context.element(), context.timestamp())); + } + })) + .setCoder(TimestampedValueCoder.of(input.getCoder())); + } + } + + private static class WindowInValue<K, V> + extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, ValueInSingleWindow<V>>>> { + @Override + public PCollection<KV<K, ValueInSingleWindow<V>>> expand(PCollection<KV<K, V>> input) { + KvCoder<K, V> coder = (KvCoder<K, V>) input.getCoder(); + return input + .apply( + ParDo.of( + new DoFn<KV<K, V>, KV<K, ValueInSingleWindow<V>>>() { + @ProcessElement + public void processElement(ProcessContext c, BoundedWindow window) { + c.output( + KV.of( + c.element().getKey(), + ValueInSingleWindow.of( + c.element().getValue(), c.timestamp(), window, c.pane()))); + } + })) + .setCoder( + KvCoder.of( + coder.getKeyCoder(), + ValueInSingleWindow.Coder.of( + coder.getValueCoder(), + input.getWindowingStrategy().getWindowFn().windowCoder()))); + } + } + + private static class TimestampInValue<K, V> + extends 
PTransform<PCollection<KV<K, V>>, PCollection<KV<K, TimestampedValue<V>>>> { + @Override + public PCollection<KV<K, TimestampedValue<V>>> expand(PCollection<KV<K, V>> input) { + KvCoder<K, V> coder = (KvCoder<K, V>) input.getCoder(); + return input + .apply( + ParDo.of( + new DoFn<KV<K, V>, KV<K, TimestampedValue<V>>>() { + @ProcessElement + public void processElement(ProcessContext context) { + context.output( + KV.of( + context.element().getKey(), + TimestampedValue.of( + context.element().getValue(), context.timestamp()))); + } + })) + .setCoder( + KvCoder.of(coder.getKeyCoder(), TimestampedValueCoder.of(coder.getValueCoder()))); + } + } + + private static class ExtractTimestampsFromValues<K, V> + extends PTransform<PCollection<KV<K, TimestampedValue<V>>>, PCollection<KV<K, V>>> { + @Override + public PCollection<KV<K, V>> expand(PCollection<KV<K, TimestampedValue<V>>> input) { + KvCoder<K, TimestampedValue<V>> kvCoder = (KvCoder<K, TimestampedValue<V>>) input.getCoder(); + TimestampedValueCoder<V> tvCoder = (TimestampedValueCoder<V>) kvCoder.getValueCoder(); + return input + .apply( + ParDo.of( + new DoFn<KV<K, TimestampedValue<V>>, KV<K, V>>() { + @Override + public Duration getAllowedTimestampSkew() { + return Duration.millis(Long.MAX_VALUE); + } + + @ProcessElement + public void processElement(ProcessContext context) { + KV<K, TimestampedValue<V>> kv = context.element(); + context.outputWithTimestamp( + KV.of(kv.getKey(), kv.getValue().getValue()), + kv.getValue().getTimestamp()); + } + })) + .setCoder(KvCoder.of(kvCoder.getKeyCoder(), tvCoder.getValueCoder())); + } + } + + private Reify() {} + + /** + * Create a {@link PTransform} that will output all inputs wrapped in a {@link TimestampedValue}. + */ + public static <T> PTransform<PCollection<T>, PCollection<TimestampedValue<T>>> timestamps() { + return new Timestamp<>(); + } + + /** + * Create a {@link PTransform} that will output all input {@link KV KVs} with the timestamp inside + * the value. 
+ */ + public static <K, V> + PTransform<PCollection<KV<K, V>>, PCollection<KV<K, TimestampedValue<V>>>> + timestampsInValue() { + return new TimestampInValue<>(); + } + + /** + * Create a {@link PTransform} that will reify information from the processing context into + * instances of {@link ValueInSingleWindow}. + * + * @param <T> element type + */ + public static <T> PTransform<PCollection<T>, PCollection<ValueInSingleWindow<T>>> windows() { + return new Window<>(); + } + + /** + * Create a {@link PTransform} that will output all input {@link KV KVs} with the window pane info + * inside the value. + */ + public static <K, V> + PTransform<PCollection<KV<K, V>>, PCollection<KV<K, ValueInSingleWindow<V>>>> + windowsInValue() { + return new WindowInValue<>(); + } + + public static <K, V> + PTransform<PCollection<KV<K, TimestampedValue<V>>>, PCollection<KV<K, V>>> + extractTimestampsFromValues() { + return new ExtractTimestampsFromValues<>(); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/7966b759/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ReifyTimestamps.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ReifyTimestamps.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ReifyTimestamps.java index 990f235..583dc38 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ReifyTimestamps.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ReifyTimestamps.java @@ -21,59 +21,74 @@ package org.apache.beam.sdk.transforms; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TimestampedValue; -import org.joda.time.Duration; /** * {@link PTransform PTransforms} for reifying the timestamp of values and reemitting the original * value with the original timestamp. 
+ * + * @deprecated Use {@link Reify} */ +@Deprecated class ReifyTimestamps { private ReifyTimestamps() {} /** * Create a {@link PTransform} that will output all input {@link KV KVs} with the timestamp inside * the value. + * + * @deprecated Use {@link Reify#timestampsInValue()} */ + @Deprecated public static <K, V> PTransform<PCollection<? extends KV<K, V>>, PCollection<KV<K, TimestampedValue<V>>>> inValues() { - return ParDo.of(new ReifyValueTimestampDoFn<K, V>()); + return new InValues<>(); } /** * Create a {@link PTransform} that consumes {@link KV KVs} with a {@link TimestampedValue} as the * value, and outputs the {@link KV} of the input key and value at the timestamp specified by the * {@link TimestampedValue}. + * + * @deprecated Use {@link Reify#extractTimestampsFromValues()}. */ + @Deprecated public static <K, V> PTransform<PCollection<? extends KV<K, TimestampedValue<V>>>, PCollection<KV<K, V>>> extractFromValues() { - return ParDo.of(new ExtractTimestampedValueDoFn<K, V>()); + return new ExtractTimestampsFromValues<>(); } - private static class ReifyValueTimestampDoFn<K, V> - extends DoFn<KV<K, V>, KV<K, TimestampedValue<V>>> { - @ProcessElement - public void processElement(ProcessContext context) { - context.output( - KV.of( - context.element().getKey(), - TimestampedValue.of(context.element().getValue(), context.timestamp()))); + private static class RemoveWildcard<T> + extends PTransform<PCollection<? extends T>, PCollection<T>> { + @Override + public PCollection<T> expand(PCollection<? extends T> input) { + return input.apply( + ParDo.of( + new DoFn<T, T>() { + @ProcessElement + public void process(ProcessContext c) { + c.output(c.element()); + } + })); } } - private static class ExtractTimestampedValueDoFn<K, V> - extends DoFn<KV<K, TimestampedValue<V>>, KV<K, V>> { + private static class InValues<K, V> + extends PTransform<PCollection<? 
extends KV<K, V>>, PCollection<KV<K, TimestampedValue<V>>>> { @Override - public Duration getAllowedTimestampSkew() { - return Duration.millis(Long.MAX_VALUE); + public PCollection<KV<K, TimestampedValue<V>>> expand(PCollection<? extends KV<K, V>> input) { + return input.apply(new RemoveWildcard<KV<K, V>>()).apply(Reify.<K, V>timestampsInValue()); } + } - @ProcessElement - public void processElement(ProcessContext context) { - KV<K, TimestampedValue<V>> kv = context.element(); - context.outputWithTimestamp( - KV.of(kv.getKey(), kv.getValue().getValue()), kv.getValue().getTimestamp()); + private static class ExtractTimestampsFromValues<K, V> + extends PTransform<PCollection<? extends KV<K, TimestampedValue<V>>>, PCollection<KV<K, V>>> { + @Override + public PCollection<KV<K, V>> expand(PCollection<? extends KV<K, TimestampedValue<V>>> input) { + return input + .apply(new RemoveWildcard<KV<K, TimestampedValue<V>>>()) + .apply(Reify.<K, V>extractTimestampsFromValues()); } } } http://git-wip-us.apache.org/repos/asf/beam/blob/7966b759/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java index 68e4560..8920559 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java @@ -84,7 +84,7 @@ public class Reshuffle<K, V> extends PTransform<PCollection<KV<K, V>>, PCollecti return input .apply(rewindow) - .apply("ReifyOriginalTimestamps", ReifyTimestamps.<K, V>inValues()) + .apply("ReifyOriginalTimestamps", Reify.<K, V>timestampsInValue()) .apply(GroupByKey.<K, TimestampedValue<V>>create()) // Set the windowing strategy directly, so that it doesn't get counted as the user having // set allowed lateness. 
http://git-wip-us.apache.org/repos/asf/beam/blob/7966b759/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReifyTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReifyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReifyTest.java new file mode 100644 index 0000000..9e5ce9f --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReifyTest.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.transforms; + +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; + +import java.io.Serializable; + +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.testing.NeedsRunner; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.testing.TestStream; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TimestampedValue; +import org.apache.beam.sdk.values.ValueInSingleWindow; +import org.joda.time.Instant; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Utility transforms for reifying implicit context into explicit fields. 
*/ +@RunWith(JUnit4.class) +public class ReifyTest implements Serializable { + public static final WithTimestamps<KV<String, Integer>> TIMESTAMP_FROM_V = + WithTimestamps.of( + new SerializableFunction<KV<String, Integer>, Instant>() { + @Override + public Instant apply(KV<String, Integer> input) { + return new Instant(input.getValue().longValue()); + } + }); + @Rule public transient TestPipeline pipeline = TestPipeline.create(); + + @Test + @Category(NeedsRunner.class) + public void extractFromValuesSucceeds() { + PCollection<KV<String, TimestampedValue<Integer>>> preified = + pipeline.apply( + Create.of( + KV.of("foo", TimestampedValue.of(0, new Instant((0)))), + KV.of("foo", TimestampedValue.of(1, new Instant(1))), + KV.of("bar", TimestampedValue.of(2, new Instant(2))), + KV.of("baz", TimestampedValue.of(3, new Instant(3))))); + + PCollection<KV<String, Integer>> timestamped = + preified.apply(Reify.<String, Integer>extractTimestampsFromValues()); + + PAssert.that(timestamped) + .containsInAnyOrder(KV.of("foo", 0), KV.of("foo", 1), KV.of("bar", 2), KV.of("baz", 3)); + + timestamped.apply( + "AssertElementTimestamps", + ParDo.of( + new DoFn<KV<String, Integer>, Void>() { + @ProcessElement + public void verifyTimestampsEqualValue(ProcessContext context) { + assertThat( + new Instant(context.element().getValue().longValue()), + equalTo(context.timestamp())); + } + })); + + pipeline.run(); + } + + @Test + @Category(NeedsRunner.class) + public void extractFromValuesWhenValueTimestampedLaterSucceeds() { + PCollection<KV<String, TimestampedValue<Integer>>> preified = + pipeline.apply( + Create.timestamped( + TimestampedValue.of( + KV.of("foo", TimestampedValue.of(0, new Instant((0)))), new Instant(100)), + TimestampedValue.of( + KV.of("foo", TimestampedValue.of(1, new Instant(1))), new Instant(101L)), + TimestampedValue.of( + KV.of("bar", TimestampedValue.of(2, new Instant(2))), new Instant(102L)), + TimestampedValue.of( + KV.of("baz", TimestampedValue.of(3, new 
Instant(3))), new Instant(103L)))); + + PCollection<KV<String, Integer>> timestamped = + preified.apply(ReifyTimestamps.<String, Integer>extractFromValues()); + + PAssert.that(timestamped) + .containsInAnyOrder(KV.of("foo", 0), KV.of("foo", 1), KV.of("bar", 2), KV.of("baz", 3)); + + timestamped.apply( + "AssertElementTimestamps", + ParDo.of( + new DoFn<KV<String, Integer>, Void>() { + @ProcessElement + public void verifyTimestampsEqualValue(ProcessContext context) { + assertThat( + new Instant(context.element().getValue().longValue()), + equalTo(context.timestamp())); + } + })); + + pipeline.run(); + } + + @Test + @Category(NeedsRunner.class) + public void globalWindowNoKeys() { + PCollection<ValueInSingleWindow<String>> result = + pipeline + .apply( + TestStream.create(StringUtf8Coder.of()) + .addElements(TimestampedValue.of("dei", new Instant(123L))) + .advanceWatermarkToInfinity()) + .apply(Reify.<String>windows()); + PAssert.that(result) + .containsInAnyOrder( + ValueInSingleWindow.of( + "dei", new Instant(123L), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING)); + pipeline.run(); + } + + @Test + @Category(NeedsRunner.class) + public void timestampedValuesSucceeds() { + PCollection<KV<String, Integer>> timestamped = + pipeline + .apply(Create.of(KV.of("foo", 0), KV.of("foo", 1), KV.of("bar", 2), KV.of("baz", 3))) + .apply(TIMESTAMP_FROM_V); + + PCollection<KV<String, TimestampedValue<Integer>>> reified = + timestamped.apply(Reify.<String, Integer>timestampsInValue()); + + PAssert.that(reified) + .containsInAnyOrder( + KV.of("foo", TimestampedValue.of(0, new Instant(0))), + KV.of("foo", TimestampedValue.of(1, new Instant(1))), + KV.of("bar", TimestampedValue.of(2, new Instant(2))), + KV.of("baz", TimestampedValue.of(3, new Instant(3)))); + + pipeline.run(); + } + + @Test + @Category(NeedsRunner.class) + public void timestampsSucceeds() { + PCollection<String> timestamped = + pipeline.apply( + Create.timestamped( + TimestampedValue.of("foo", new Instant(0L)), + 
TimestampedValue.of("bar", new Instant(1L)))); + + PCollection<TimestampedValue<String>> reified = timestamped.apply(Reify.<String>timestamps()); + + PAssert.that(reified) + .containsInAnyOrder( + TimestampedValue.of("foo", new Instant(0)), TimestampedValue.of("bar", new Instant(1))); + + pipeline.run(); + } + + @Test + @Category(NeedsRunner.class) + public void windowsInValueSucceeds() { + PCollection<KV<String, Integer>> timestamped = + pipeline + .apply(Create.of(KV.of("foo", 0), KV.of("foo", 1), KV.of("bar", 2), KV.of("baz", 3))) + .apply(TIMESTAMP_FROM_V); + + PCollection<KV<String, ValueInSingleWindow<Integer>>> reified = + timestamped.apply(Reify.<String, Integer>windowsInValue()); + + PAssert.that(reified) + .containsInAnyOrder( + KV.of( + "foo", + ValueInSingleWindow.of( + 0, new Instant(0), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING)), + KV.of( + "foo", + ValueInSingleWindow.of( + 1, new Instant(1), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING)), + KV.of( + "bar", + ValueInSingleWindow.of( + 2, new Instant(2), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING)), + KV.of( + "baz", + ValueInSingleWindow.of( + 3, new Instant(3), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING))); + + pipeline.run(); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/7966b759/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReshuffleTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReshuffleTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReshuffleTest.java index 0eb8e2d..12eddf2 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReshuffleTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReshuffleTest.java @@ -131,7 +131,7 @@ public class ReshuffleTest implements Serializable { return input; } })) - .apply("ReifyOriginalTimestamps", ReifyTimestamps.<String, String>inValues()); + 
.apply("ReifyOriginalTimestamps", Reify.<String, String>timestampsInValue()); // The outer TimestampedValue is the reified timestamp post-reshuffle. The inner // TimestampedValue is the pre-reshuffle timestamp. @@ -140,7 +140,7 @@ public class ReshuffleTest implements Serializable { .apply(Reshuffle.<String, TimestampedValue<String>>of()) .apply( "ReifyReshuffledTimestamps", - ReifyTimestamps.<String, TimestampedValue<String>>inValues()) + Reify.<String, TimestampedValue<String>>timestampsInValue()) .apply(Values.<TimestampedValue<TimestampedValue<String>>>create()); PAssert.that(output) http://git-wip-us.apache.org/repos/asf/beam/blob/7966b759/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java index d2d2529..f70dfbb 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java @@ -98,13 +98,6 @@ public class SplittableDoFnTest implements Serializable { } } - private static class ReifyTimestampsFn<T> extends DoFn<T, TimestampedValue<T>> { - @ProcessElement - public void process(ProcessContext c) { - c.output(TimestampedValue.of(c.element(), c.timestamp())); - } - } - private static PipelineOptions streamingTestPipelineOptions() { // Using testing options with streaming=true makes it possible to enable UsesSplittableParDo // tests in Dataflow runner, because as of writing, it can run Splittable DoFn only in @@ -176,7 +169,7 @@ public class SplittableDoFnTest implements Serializable { assertEquals(windowFn, res.getWindowingStrategy().getWindowFn()); PCollection<TimestampedValue<KV<String, Integer>>> timestamped = - res.apply("Reify timestamps", ParDo.of(new 
ReifyTimestampsFn<KV<String, Integer>>())); + res.apply(Reify.<KV<String, Integer>>timestamps()); for (int i = 0; i < 4; ++i) { Instant base = now.minus(Duration.standardSeconds(i)); http://git-wip-us.apache.org/repos/asf/beam/blob/7966b759/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WatchTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WatchTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WatchTest.java index 113e8fe..8904376 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WatchTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WatchTest.java @@ -272,7 +272,7 @@ public class WatchTest implements Serializable { standardSeconds(30) /* timeToFail */)) .withPollInterval(Duration.millis(500)) .withOutputCoder(VarIntCoder.of())) - .apply(ReifyTimestamps.<String, Integer>inValues()) + .apply(Reify.<String, Integer>timestampsInValue()) .apply("Drop timestamped input", Values.<TimestampedValue<Integer>>create()); PAssert.that(res)