Makes DoFnTester use the new DoFn internally. There were 2 remaining users of DoFnTester.of(OldDoFn):
- SplittableParDo.ProcessElements: this is fixed in https://github.com/apache/incubator-beam/pull/1261
- GroupAlsoByWindowsProperties: this one is harder. Various GABWDoFns use OldDoFn.windowingInternals, which cannot be passed through a new DoFn. So instead I removed the usage of DoFnTester from GroupAlsoByWindowsProperties in favor of a tiny hand-coded solution.
So after #1261 DoFnTester.of(OldDoFn) can be deleted. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/96455768 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/96455768 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/96455768 Branch: refs/heads/master Commit: 96455768568616141a95833380f37c478a989397 Parents: e04cd47 Author: Eugene Kirpichov <kirpic...@google.com> Authored: Fri Nov 18 13:10:22 2016 -0800 Committer: Kenneth Knowles <k...@google.com> Committed: Fri Dec 2 15:42:33 2016 -0800 ---------------------------------------------------------------------- .../GroupAlsoByWindowsViaOutputBufferDoFn.java | 6 +- .../core/GroupByKeyViaGroupByKeyOnly.java | 22 +- .../core/GroupAlsoByWindowsProperties.java | 590 +++++++++++-------- .../beam/sdk/transforms/DoFnAdapters.java | 2 + .../apache/beam/sdk/transforms/DoFnTester.java | 130 ++-- .../sdk/transforms/reflect/DoFnInvokers.java | 11 - .../beam/sdk/transforms/DoFnTesterTest.java | 4 +- 7 files changed, 394 insertions(+), 371 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96455768/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java index f8f6207..b4b366c 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java @@ -21,7 +21,6 @@ import com.google.common.collect.Iterables; import java.util.List; import 
org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine; import org.apache.beam.runners.core.triggers.TriggerStateMachines; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.SystemDoFnInternal; import org.apache.beam.sdk.util.WindowedValue; @@ -30,7 +29,6 @@ import org.apache.beam.sdk.util.state.InMemoryTimerInternals; import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.util.state.StateInternalsFactory; import org.apache.beam.sdk.util.state.TimerCallback; -import org.apache.beam.sdk.values.KV; import org.joda.time.Instant; /** @@ -55,9 +53,7 @@ public class GroupAlsoByWindowsViaOutputBufferDoFn<K, InputT, OutputT, W extends } @Override - public void processElement( - OldDoFn<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>>.ProcessContext c) - throws Exception { + public void processElement(ProcessContext c) throws Exception { K key = c.element().getKey(); // Used with Batch, we know that all the data is available for this key. We can't use the // timer manager from the context because it doesn't exist. 
So we create one and emulate the http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96455768/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupByKeyViaGroupByKeyOnly.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupByKeyViaGroupByKeyOnly.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupByKeyViaGroupByKeyOnly.java index 79d2252..43047ca 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupByKeyViaGroupByKeyOnly.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupByKeyViaGroupByKeyOnly.java @@ -26,15 +26,13 @@ import java.util.List; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.GroupByKey; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder; import org.apache.beam.sdk.util.WindowingStrategy; -import org.apache.beam.sdk.util.state.StateInternalsFactory; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; @@ -135,10 +133,9 @@ public class GroupByKeyViaGroupByKeyOnly<K, V> return input .apply( ParDo.of( - new OldDoFn< - KV<K, Iterable<WindowedValue<V>>>, - KV<K, Iterable<WindowedValue<V>>>>() { - @Override + new DoFn<KV<K, Iterable<WindowedValue<V>>>, + KV<K, Iterable<WindowedValue<V>>>>() { + @ProcessElement public void processElement(ProcessContext c) { KV<K, Iterable<WindowedValue<V>>> kvs = c.element(); K key = kvs.getKey(); @@ -251,16 +248,5 @@ public class GroupByKeyViaGroupByKeyOnly<K, V> 
input.getPipeline(), windowingStrategy, input.isBounded()) .setCoder(outputKvCoder); } - - private <W extends BoundedWindow> - GroupAlsoByWindowsViaOutputBufferDoFn<K, V, Iterable<V>, W> groupAlsoByWindowsFn( - WindowingStrategy<?, W> strategy, - StateInternalsFactory<K> stateInternalsFactory, - Coder<V> inputIterableElementValueCoder) { - return new GroupAlsoByWindowsViaOutputBufferDoFn<K, V, Iterable<V>, W>( - strategy, - stateInternalsFactory, - SystemReduceFn.<K, V, W>buffering(inputIterableElementValueCoder)); - } } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96455768/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java index d1e0c68..97b67c6 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java @@ -23,50 +23,60 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; import static org.junit.Assert.assertThat; +import com.google.common.base.Predicate; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.List; +import javax.annotation.Nullable; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.transforms.Aggregator; import 
org.apache.beam.sdk.transforms.Combine.CombineFn; -import org.apache.beam.sdk.transforms.DoFnTester; -import org.apache.beam.sdk.transforms.DoFnTester.CloningBehavior; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.Sessions; import org.apache.beam.sdk.transforms.windowing.SlidingWindows; +import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowingInternals; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.state.InMemoryStateInternals; import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.util.state.StateInternalsFactory; import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TimestampedValue; +import org.apache.beam.sdk.values.TupleTag; import org.joda.time.Duration; import org.joda.time.Instant; /** * Properties of {@link GroupAlsoByWindowsDoFn}. * - * <p>Some properties may not hold of some implementations, due to restrictions on the context - * in which the implementation is applicable. For example, some {@code GroupAlsoByWindows} may not + * <p>Some properties may not hold of some implementations, due to restrictions on the context in + * which the implementation is applicable. For example, some {@code GroupAlsoByWindows} may not * support merging windows. */ public class GroupAlsoByWindowsProperties { /** - * A factory of {@link GroupAlsoByWindowsDoFn} so that the various properties can provide - * the appropriate windowing strategy under test. 
+ * A factory of {@link GroupAlsoByWindowsDoFn} so that the various properties can provide the + * appropriate windowing strategy under test. */ public interface GroupAlsoByWindowsDoFnFactory<K, InputT, OutputT> { - <W extends BoundedWindow> GroupAlsoByWindowsDoFn<K, InputT, OutputT, W> - forStrategy(WindowingStrategy<?, W> strategy, StateInternalsFactory<K> stateInternalsFactory); + <W extends BoundedWindow> GroupAlsoByWindowsDoFn<K, InputT, OutputT, W> forStrategy( + WindowingStrategy<?, W> strategy, StateInternalsFactory<K> stateInternalsFactory); } /** @@ -76,8 +86,7 @@ public class GroupAlsoByWindowsProperties { * <p>The input type is deliberately left as a wildcard, since it is not relevant. */ public static <K, InputT, OutputT> void emptyInputEmptyOutput( - GroupAlsoByWindowsDoFnFactory<K, InputT, OutputT> gabwFactory) - throws Exception { + GroupAlsoByWindowsDoFnFactory<K, InputT, OutputT> gabwFactory) throws Exception { WindowingStrategy<?, IntervalWindow> windowingStrategy = WindowingStrategy.of(FixedWindows.of(Duration.millis(10))); @@ -87,13 +96,14 @@ public class GroupAlsoByWindowsProperties { @SuppressWarnings("unchecked") K fakeKey = (K) "this key should never be used"; - DoFnTester<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>> result = runGABW( - gabwFactory, - windowingStrategy, - fakeKey, - Collections.<WindowedValue<InputT>>emptyList()); + List<WindowedValue<KV<K, OutputT>>> result = + runGABW( + gabwFactory, + windowingStrategy, + fakeKey, + Collections.<WindowedValue<InputT>>emptyList()); - assertThat(result.peekOutputElements(), hasSize(0)); + assertThat(result, hasSize(0)); } /** @@ -102,38 +112,32 @@ public class GroupAlsoByWindowsProperties { */ public static void groupsElementsIntoFixedWindows( GroupAlsoByWindowsDoFnFactory<String, String, Iterable<String>> gabwFactory) - throws Exception { + throws Exception { WindowingStrategy<?, IntervalWindow> windowingStrategy = WindowingStrategy.of(FixedWindows.of(Duration.millis(10))); - 
DoFnTester<KV<String, Iterable<WindowedValue<String>>>, KV<String, Iterable<String>>> result = - runGABW(gabwFactory, windowingStrategy, "key", + List<WindowedValue<KV<String, Iterable<String>>>> result = + runGABW( + gabwFactory, + windowingStrategy, + "key", WindowedValue.of( - "v1", - new Instant(1), - Arrays.asList(window(0, 10)), - PaneInfo.NO_FIRING), + "v1", new Instant(1), Arrays.asList(window(0, 10)), PaneInfo.NO_FIRING), WindowedValue.of( - "v2", - new Instant(2), - Arrays.asList(window(0, 10)), - PaneInfo.NO_FIRING), + "v2", new Instant(2), Arrays.asList(window(0, 10)), PaneInfo.NO_FIRING), WindowedValue.of( - "v3", - new Instant(13), - Arrays.asList(window(10, 20)), - PaneInfo.NO_FIRING)); + "v3", new Instant(13), Arrays.asList(window(10, 20)), PaneInfo.NO_FIRING)); - assertThat(result.peekOutputElements(), hasSize(2)); + assertThat(result, hasSize(2)); TimestampedValue<KV<String, Iterable<String>>> item0 = - Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(0, 10))); + getOnlyElementInWindow(result, window(0, 10)); assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v2")); assertThat(item0.getTimestamp(), equalTo(window(0, 10).maxTimestamp())); TimestampedValue<KV<String, Iterable<String>>> item1 = - Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(10, 20))); + getOnlyElementInWindow(result, window(10, 20)); assertThat(item1.getValue().getValue(), contains("v3")); assertThat(item1.getTimestamp(), equalTo(window(10, 20).maxTimestamp())); } @@ -146,14 +150,17 @@ public class GroupAlsoByWindowsProperties { */ public static void groupsElementsIntoSlidingWindowsWithMinTimestamp( GroupAlsoByWindowsDoFnFactory<String, String, Iterable<String>> gabwFactory) - throws Exception { + throws Exception { - WindowingStrategy<?, IntervalWindow> windowingStrategy = WindowingStrategy.of( - SlidingWindows.of(Duration.millis(20)).every(Duration.millis(10))) - .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()); 
+ WindowingStrategy<?, IntervalWindow> windowingStrategy = + WindowingStrategy.of(SlidingWindows.of(Duration.millis(20)).every(Duration.millis(10))) + .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()); - DoFnTester<KV<String, Iterable<WindowedValue<String>>>, KV<String, Iterable<String>>> result = - runGABW(gabwFactory, windowingStrategy, "key", + List<WindowedValue<KV<String, Iterable<String>>>> result = + runGABW( + gabwFactory, + windowingStrategy, + "key", WindowedValue.of( "v1", new Instant(5), @@ -165,21 +172,21 @@ public class GroupAlsoByWindowsProperties { Arrays.asList(window(0, 20), window(10, 30)), PaneInfo.NO_FIRING)); - assertThat(result.peekOutputElements(), hasSize(3)); + assertThat(result, hasSize(3)); TimestampedValue<KV<String, Iterable<String>>> item0 = - Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(-10, 10))); + getOnlyElementInWindow(result, window(-10, 10)); assertThat(item0.getValue().getValue(), contains("v1")); assertThat(item0.getTimestamp(), equalTo(new Instant(5))); TimestampedValue<KV<String, Iterable<String>>> item1 = - Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(0, 20))); + getOnlyElementInWindow(result, window(0, 20)); assertThat(item1.getValue().getValue(), containsInAnyOrder("v1", "v2")); // Timestamp adjusted by WindowFn to exceed the end of the prior sliding window assertThat(item1.getTimestamp(), equalTo(new Instant(10))); TimestampedValue<KV<String, Iterable<String>>> item2 = - Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(10, 30))); + getOnlyElementInWindow(result, window(10, 30)); assertThat(item2.getValue().getValue(), contains("v2")); // Timestamp adjusted by WindowFn to exceed the end of the prior sliding window assertThat(item2.getTimestamp(), equalTo(new Instant(20))); @@ -194,14 +201,17 @@ public class GroupAlsoByWindowsProperties { public static void combinesElementsInSlidingWindows( GroupAlsoByWindowsDoFnFactory<String, Long, Long> 
gabwFactory, CombineFn<Long, ?, Long> combineFn) - throws Exception { + throws Exception { WindowingStrategy<?, IntervalWindow> windowingStrategy = WindowingStrategy.of(SlidingWindows.of(Duration.millis(20)).every(Duration.millis(10))) .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()); - DoFnTester<KV<String, Iterable<WindowedValue<Long>>>, KV<String, Long>> result = - runGABW(gabwFactory, windowingStrategy, "k", + List<WindowedValue<KV<String, Long>>> result = + runGABW( + gabwFactory, + windowingStrategy, + "k", WindowedValue.of( 1L, new Instant(5), @@ -218,23 +228,20 @@ public class GroupAlsoByWindowsProperties { Arrays.asList(window(0, 20), window(10, 30)), PaneInfo.NO_FIRING)); - assertThat(result.peekOutputElements(), hasSize(3)); + assertThat(result, hasSize(3)); - TimestampedValue<KV<String, Long>> item0 = - Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(-10, 10))); + TimestampedValue<KV<String, Long>> item0 = getOnlyElementInWindow(result, window(-10, 10)); assertThat(item0.getValue().getKey(), equalTo("k")); assertThat(item0.getValue().getValue(), equalTo(combineFn.apply(ImmutableList.of(1L)))); assertThat(item0.getTimestamp(), equalTo(new Instant(5L))); - TimestampedValue<KV<String, Long>> item1 = - Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(0, 20))); + TimestampedValue<KV<String, Long>> item1 = getOnlyElementInWindow(result, window(0, 20)); assertThat(item1.getValue().getKey(), equalTo("k")); assertThat(item1.getValue().getValue(), equalTo(combineFn.apply(ImmutableList.of(1L, 2L, 4L)))); // Timestamp adjusted by WindowFn to exceed the end of the prior sliding window assertThat(item1.getTimestamp(), equalTo(new Instant(10L))); - TimestampedValue<KV<String, Long>> item2 = - Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(10, 30))); + TimestampedValue<KV<String, Long>> item2 = getOnlyElementInWindow(result, window(10, 30)); assertThat(item2.getValue().getKey(), equalTo("k")); 
assertThat(item2.getValue().getValue(), equalTo(combineFn.apply(ImmutableList.of(2L, 4L)))); // Timestamp adjusted by WindowFn to exceed the end of the prior sliding window @@ -247,79 +254,63 @@ public class GroupAlsoByWindowsProperties { */ public static void groupsIntoOverlappingNonmergingWindows( GroupAlsoByWindowsDoFnFactory<String, String, Iterable<String>> gabwFactory) - throws Exception { + throws Exception { WindowingStrategy<?, IntervalWindow> windowingStrategy = WindowingStrategy.of(FixedWindows.of(Duration.millis(10))); - DoFnTester<KV<String, Iterable<WindowedValue<String>>>, KV<String, Iterable<String>>> result = - runGABW(gabwFactory, windowingStrategy, "key", - WindowedValue.of( - "v1", - new Instant(1), - Arrays.asList(window(0, 5)), - PaneInfo.NO_FIRING), - WindowedValue.of( - "v2", - new Instant(4), - Arrays.asList(window(1, 5)), - PaneInfo.NO_FIRING), + List<WindowedValue<KV<String, Iterable<String>>>> result = + runGABW( + gabwFactory, + windowingStrategy, + "key", + WindowedValue.of("v1", new Instant(1), Arrays.asList(window(0, 5)), PaneInfo.NO_FIRING), + WindowedValue.of("v2", new Instant(4), Arrays.asList(window(1, 5)), PaneInfo.NO_FIRING), WindowedValue.of( - "v3", - new Instant(4), - Arrays.asList(window(0, 5)), - PaneInfo.NO_FIRING)); + "v3", new Instant(4), Arrays.asList(window(0, 5)), PaneInfo.NO_FIRING)); - assertThat(result.peekOutputElements(), hasSize(2)); + assertThat(result, hasSize(2)); TimestampedValue<KV<String, Iterable<String>>> item0 = - Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(0, 5))); + getOnlyElementInWindow(result, window(0, 5)); assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v3")); assertThat(item0.getTimestamp(), equalTo(window(1, 5).maxTimestamp())); TimestampedValue<KV<String, Iterable<String>>> item1 = - Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(1, 5))); + getOnlyElementInWindow(result, window(1, 5)); assertThat(item1.getValue().getValue(), 
contains("v2")); assertThat(item1.getTimestamp(), equalTo(window(0, 5).maxTimestamp())); } - /** - * Tests that the given GABW implementation correctly groups elements into merged sessions. - */ + /** Tests that the given GABW implementation correctly groups elements into merged sessions. */ public static void groupsElementsInMergedSessions( GroupAlsoByWindowsDoFnFactory<String, String, Iterable<String>> gabwFactory) - throws Exception { + throws Exception { WindowingStrategy<?, IntervalWindow> windowingStrategy = WindowingStrategy.of(Sessions.withGapDuration(Duration.millis(10))); - DoFnTester<KV<String, Iterable<WindowedValue<String>>>, KV<String, Iterable<String>>> result = - runGABW(gabwFactory, windowingStrategy, "key", + List<WindowedValue<KV<String, Iterable<String>>>> result = + runGABW( + gabwFactory, + windowingStrategy, + "key", WindowedValue.of( - "v1", - new Instant(0), - Arrays.asList(window(0, 10)), - PaneInfo.NO_FIRING), + "v1", new Instant(0), Arrays.asList(window(0, 10)), PaneInfo.NO_FIRING), WindowedValue.of( - "v2", - new Instant(5), - Arrays.asList(window(5, 15)), - PaneInfo.NO_FIRING), + "v2", new Instant(5), Arrays.asList(window(5, 15)), PaneInfo.NO_FIRING), WindowedValue.of( - "v3", - new Instant(15), - Arrays.asList(window(15, 25)), - PaneInfo.NO_FIRING)); + "v3", new Instant(15), Arrays.asList(window(15, 25)), PaneInfo.NO_FIRING)); - assertThat(result.peekOutputElements(), hasSize(2)); + assertThat(result, hasSize(2)); TimestampedValue<KV<String, Iterable<String>>> item0 = - Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(0, 15))); + getOnlyElementInWindow(result, window(0, 15)); assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v2")); assertThat(item0.getTimestamp(), equalTo(window(0, 15).maxTimestamp())); TimestampedValue<KV<String, Iterable<String>>> item1 = - Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(15, 25))); + getOnlyElementInWindow(result, window(15, 25)); 
assertThat(item1.getValue().getValue(), contains("v3")); assertThat(item1.getTimestamp(), equalTo(window(15, 25).maxTimestamp())); } @@ -331,39 +322,29 @@ public class GroupAlsoByWindowsProperties { public static void combinesElementsPerSession( GroupAlsoByWindowsDoFnFactory<String, Long, Long> gabwFactory, CombineFn<Long, ?, Long> combineFn) - throws Exception { + throws Exception { WindowingStrategy<?, IntervalWindow> windowingStrategy = WindowingStrategy.of(Sessions.withGapDuration(Duration.millis(10))); - DoFnTester<KV<String, Iterable<WindowedValue<Long>>>, KV<String, Long>> result = - runGABW(gabwFactory, windowingStrategy, "k", - WindowedValue.of( - 1L, - new Instant(0), - Arrays.asList(window(0, 10)), - PaneInfo.NO_FIRING), + List<WindowedValue<KV<String, Long>>> result = + runGABW( + gabwFactory, + windowingStrategy, + "k", + WindowedValue.of(1L, new Instant(0), Arrays.asList(window(0, 10)), PaneInfo.NO_FIRING), + WindowedValue.of(2L, new Instant(5), Arrays.asList(window(5, 15)), PaneInfo.NO_FIRING), WindowedValue.of( - 2L, - new Instant(5), - Arrays.asList(window(5, 15)), - PaneInfo.NO_FIRING), - WindowedValue.of( - 4L, - new Instant(15), - Arrays.asList(window(15, 25)), - PaneInfo.NO_FIRING)); + 4L, new Instant(15), Arrays.asList(window(15, 25)), PaneInfo.NO_FIRING)); - assertThat(result.peekOutputElements(), hasSize(2)); + assertThat(result, hasSize(2)); - TimestampedValue<KV<String, Long>> item0 = - Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(0, 15))); + TimestampedValue<KV<String, Long>> item0 = getOnlyElementInWindow(result, window(0, 15)); assertThat(item0.getValue().getKey(), equalTo("k")); assertThat(item0.getValue().getValue(), equalTo(combineFn.apply(ImmutableList.of(1L, 2L)))); assertThat(item0.getTimestamp(), equalTo(window(0, 15).maxTimestamp())); - TimestampedValue<KV<String, Long>> item1 = - Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(15, 25))); + TimestampedValue<KV<String, Long>> item1 = 
getOnlyElementInWindow(result, window(15, 25)); assertThat(item1.getValue().getKey(), equalTo("k")); assertThat(item1.getValue().getValue(), equalTo(combineFn.apply(ImmutableList.of(4L)))); assertThat(item1.getTimestamp(), equalTo(window(15, 25).maxTimestamp())); @@ -371,176 +352,152 @@ public class GroupAlsoByWindowsProperties { /** * Tests that for a simple sequence of elements on the same key, the given GABW implementation - * correctly groups them according to fixed windows and also sets the output timestamp - * according to the policy {@link OutputTimeFns#outputAtEndOfWindow()}. + * correctly groups them according to fixed windows and also sets the output timestamp according + * to the policy {@link OutputTimeFns#outputAtEndOfWindow()}. */ public static void groupsElementsIntoFixedWindowsWithEndOfWindowTimestamp( GroupAlsoByWindowsDoFnFactory<String, String, Iterable<String>> gabwFactory) - throws Exception { + throws Exception { WindowingStrategy<?, IntervalWindow> windowingStrategy = WindowingStrategy.of(FixedWindows.of(Duration.millis(10))) - .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow()); + .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow()); - DoFnTester<KV<String, Iterable<WindowedValue<String>>>, KV<String, Iterable<String>>> result = - runGABW(gabwFactory, windowingStrategy, "key", + List<WindowedValue<KV<String, Iterable<String>>>> result = + runGABW( + gabwFactory, + windowingStrategy, + "key", WindowedValue.of( - "v1", - new Instant(1), - Arrays.asList(window(0, 10)), - PaneInfo.NO_FIRING), + "v1", new Instant(1), Arrays.asList(window(0, 10)), PaneInfo.NO_FIRING), WindowedValue.of( - "v2", - new Instant(2), - Arrays.asList(window(0, 10)), - PaneInfo.NO_FIRING), + "v2", new Instant(2), Arrays.asList(window(0, 10)), PaneInfo.NO_FIRING), WindowedValue.of( - "v3", - new Instant(13), - Arrays.asList(window(10, 20)), - PaneInfo.NO_FIRING)); + "v3", new Instant(13), Arrays.asList(window(10, 20)), PaneInfo.NO_FIRING)); - 
assertThat(result.peekOutputElements(), hasSize(2)); + assertThat(result, hasSize(2)); TimestampedValue<KV<String, Iterable<String>>> item0 = - Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(0, 10))); + getOnlyElementInWindow(result, window(0, 10)); assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v2")); assertThat(item0.getTimestamp(), equalTo(window(0, 10).maxTimestamp())); TimestampedValue<KV<String, Iterable<String>>> item1 = - Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(10, 20))); + getOnlyElementInWindow(result, window(10, 20)); assertThat(item1.getValue().getValue(), contains("v3")); assertThat(item1.getTimestamp(), equalTo(window(10, 20).maxTimestamp())); } /** * Tests that for a simple sequence of elements on the same key, the given GABW implementation - * correctly groups them according to fixed windows and also sets the output timestamp - * according to the policy {@link OutputTimeFns#outputAtLatestInputTimestamp()}. + * correctly groups them according to fixed windows and also sets the output timestamp according + * to the policy {@link OutputTimeFns#outputAtLatestInputTimestamp()}. 
*/ public static void groupsElementsIntoFixedWindowsWithLatestTimestamp( GroupAlsoByWindowsDoFnFactory<String, String, Iterable<String>> gabwFactory) - throws Exception { + throws Exception { WindowingStrategy<?, IntervalWindow> windowingStrategy = WindowingStrategy.of(FixedWindows.of(Duration.millis(10))) - .withOutputTimeFn(OutputTimeFns.outputAtLatestInputTimestamp()); + .withOutputTimeFn(OutputTimeFns.outputAtLatestInputTimestamp()); - DoFnTester<KV<String, Iterable<WindowedValue<String>>>, KV<String, Iterable<String>>> result = - runGABW(gabwFactory, windowingStrategy, "k", + List<WindowedValue<KV<String, Iterable<String>>>> result = + runGABW( + gabwFactory, + windowingStrategy, + "k", WindowedValue.of( - "v1", - new Instant(1), - Arrays.asList(window(0, 10)), - PaneInfo.NO_FIRING), + "v1", new Instant(1), Arrays.asList(window(0, 10)), PaneInfo.NO_FIRING), WindowedValue.of( - "v2", - new Instant(2), - Arrays.asList(window(0, 10)), - PaneInfo.NO_FIRING), + "v2", new Instant(2), Arrays.asList(window(0, 10)), PaneInfo.NO_FIRING), WindowedValue.of( - "v3", - new Instant(13), - Arrays.asList(window(10, 20)), - PaneInfo.NO_FIRING)); + "v3", new Instant(13), Arrays.asList(window(10, 20)), PaneInfo.NO_FIRING)); - assertThat(result.peekOutputElements(), hasSize(2)); + assertThat(result, hasSize(2)); TimestampedValue<KV<String, Iterable<String>>> item0 = - Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(0, 10))); + getOnlyElementInWindow(result, window(0, 10)); assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v2")); assertThat(item0.getTimestamp(), equalTo(new Instant(2))); TimestampedValue<KV<String, Iterable<String>>> item1 = - Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(10, 20))); + getOnlyElementInWindow(result, window(10, 20)); assertThat(item1.getValue().getValue(), contains("v3")); assertThat(item1.getTimestamp(), equalTo(new Instant(13))); } /** - * Tests that the given GABW implementation correctly 
groups elements into merged sessions - * with output timestamps at the end of the merged window. + * Tests that the given GABW implementation correctly groups elements into merged sessions with + * output timestamps at the end of the merged window. */ public static void groupsElementsInMergedSessionsWithEndOfWindowTimestamp( GroupAlsoByWindowsDoFnFactory<String, String, Iterable<String>> gabwFactory) - throws Exception { + throws Exception { WindowingStrategy<?, IntervalWindow> windowingStrategy = WindowingStrategy.of(Sessions.withGapDuration(Duration.millis(10))) .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow()); - DoFnTester<KV<String, Iterable<WindowedValue<String>>>, KV<String, Iterable<String>>> result = - runGABW(gabwFactory, windowingStrategy, "k", + List<WindowedValue<KV<String, Iterable<String>>>> result = + runGABW( + gabwFactory, + windowingStrategy, + "k", WindowedValue.of( - "v1", - new Instant(0), - Arrays.asList(window(0, 10)), - PaneInfo.NO_FIRING), + "v1", new Instant(0), Arrays.asList(window(0, 10)), PaneInfo.NO_FIRING), WindowedValue.of( - "v2", - new Instant(5), - Arrays.asList(window(5, 15)), - PaneInfo.NO_FIRING), + "v2", new Instant(5), Arrays.asList(window(5, 15)), PaneInfo.NO_FIRING), WindowedValue.of( - "v3", - new Instant(15), - Arrays.asList(window(15, 25)), - PaneInfo.NO_FIRING)); + "v3", new Instant(15), Arrays.asList(window(15, 25)), PaneInfo.NO_FIRING)); - assertThat(result.peekOutputElements(), hasSize(2)); + assertThat(result, hasSize(2)); TimestampedValue<KV<String, Iterable<String>>> item0 = - Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(0, 15))); + getOnlyElementInWindow(result, window(0, 15)); assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v2")); assertThat(item0.getTimestamp(), equalTo(window(0, 15).maxTimestamp())); TimestampedValue<KV<String, Iterable<String>>> item1 = - Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(15, 25))); + getOnlyElementInWindow(result, 
window(15, 25)); assertThat(item1.getValue().getValue(), contains("v3")); assertThat(item1.getTimestamp(), equalTo(window(15, 25).maxTimestamp())); } /** - * Tests that the given GABW implementation correctly groups elements into merged sessions - * with output timestamps at the end of the merged window. + * Tests that the given GABW implementation correctly groups elements into merged sessions with + * output timestamps at the end of the merged window. */ public static void groupsElementsInMergedSessionsWithLatestTimestamp( GroupAlsoByWindowsDoFnFactory<String, String, Iterable<String>> gabwFactory) - throws Exception { + throws Exception { WindowingStrategy<?, IntervalWindow> windowingStrategy = WindowingStrategy.of(Sessions.withGapDuration(Duration.millis(10))) .withOutputTimeFn(OutputTimeFns.outputAtLatestInputTimestamp()); BoundedWindow unmergedWindow = window(15, 25); - DoFnTester<KV<String, Iterable<WindowedValue<String>>>, KV<String, Iterable<String>>> result = - runGABW(gabwFactory, windowingStrategy, "k", + List<WindowedValue<KV<String, Iterable<String>>>> result = + runGABW( + gabwFactory, + windowingStrategy, + "k", WindowedValue.of( - "v1", - new Instant(0), - Arrays.asList(window(0, 10)), - PaneInfo.NO_FIRING), + "v1", new Instant(0), Arrays.asList(window(0, 10)), PaneInfo.NO_FIRING), WindowedValue.of( - "v2", - new Instant(5), - Arrays.asList(window(5, 15)), - PaneInfo.NO_FIRING), + "v2", new Instant(5), Arrays.asList(window(5, 15)), PaneInfo.NO_FIRING), WindowedValue.of( - "v3", - new Instant(15), - Arrays.asList(unmergedWindow), - PaneInfo.NO_FIRING)); + "v3", new Instant(15), Arrays.asList(unmergedWindow), PaneInfo.NO_FIRING)); - assertThat(result.peekOutputElements(), hasSize(2)); + assertThat(result, hasSize(2)); BoundedWindow mergedWindow = window(0, 15); TimestampedValue<KV<String, Iterable<String>>> item0 = - Iterables.getOnlyElement(result.peekOutputElementsInWindow(mergedWindow)); + getOnlyElementInWindow(result, mergedWindow); 
assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v2")); assertThat(item0.getTimestamp(), equalTo(new Instant(5))); TimestampedValue<KV<String, Iterable<String>>> item1 = - Iterables.getOnlyElement(result.peekOutputElementsInWindow(unmergedWindow)); + getOnlyElementInWindow(result, unmergedWindow); assertThat(item1.getValue().getValue(), contains("v3")); assertThat(item1.getTimestamp(), equalTo(new Instant(15))); } @@ -552,81 +509,66 @@ public class GroupAlsoByWindowsProperties { public static void combinesElementsPerSessionWithEndOfWindowTimestamp( GroupAlsoByWindowsDoFnFactory<String, Long, Long> gabwFactory, CombineFn<Long, ?, Long> combineFn) - throws Exception { + throws Exception { WindowingStrategy<?, IntervalWindow> windowingStrategy = WindowingStrategy.of(Sessions.withGapDuration(Duration.millis(10))) - .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow()); + .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow()); BoundedWindow secondWindow = window(15, 25); - DoFnTester<?, KV<String, Long>> result = - runGABW(gabwFactory, windowingStrategy, "k", - WindowedValue.of( - 1L, - new Instant(0), - Arrays.asList(window(0, 10)), - PaneInfo.NO_FIRING), - WindowedValue.of( - 2L, - new Instant(5), - Arrays.asList(window(5, 15)), - PaneInfo.NO_FIRING), - WindowedValue.of( - 4L, - new Instant(15), - Arrays.asList(secondWindow), - PaneInfo.NO_FIRING)); + List<WindowedValue<KV<String, Long>>> result = + runGABW( + gabwFactory, + windowingStrategy, + "k", + WindowedValue.of(1L, new Instant(0), Arrays.asList(window(0, 10)), PaneInfo.NO_FIRING), + WindowedValue.of(2L, new Instant(5), Arrays.asList(window(5, 15)), PaneInfo.NO_FIRING), + WindowedValue.of(4L, new Instant(15), Arrays.asList(secondWindow), PaneInfo.NO_FIRING)); - assertThat(result.peekOutputElements(), hasSize(2)); + assertThat(result, hasSize(2)); BoundedWindow firstResultWindow = window(0, 15); - TimestampedValue<KV<String, Long>> item0 = - 
Iterables.getOnlyElement(result.peekOutputElementsInWindow(firstResultWindow)); + TimestampedValue<KV<String, Long>> item0 = getOnlyElementInWindow(result, firstResultWindow); assertThat(item0.getValue().getValue(), equalTo(combineFn.apply(ImmutableList.of(1L, 2L)))); assertThat(item0.getTimestamp(), equalTo(firstResultWindow.maxTimestamp())); - TimestampedValue<KV<String, Long>> item1 = - Iterables.getOnlyElement(result.peekOutputElementsInWindow(secondWindow)); + TimestampedValue<KV<String, Long>> item1 = getOnlyElementInWindow(result, secondWindow); assertThat(item1.getValue().getValue(), equalTo(combineFn.apply(ImmutableList.of(4L)))); - assertThat(item1.getTimestamp(), - equalTo(secondWindow.maxTimestamp())); + assertThat(item1.getTimestamp(), equalTo(secondWindow.maxTimestamp())); } @SafeVarargs private static <K, InputT, OutputT, W extends BoundedWindow> - DoFnTester<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>> runGABW( - GroupAlsoByWindowsDoFnFactory<K, InputT, OutputT> gabwFactory, - WindowingStrategy<?, W> windowingStrategy, - K key, - WindowedValue<InputT>... values) throws Exception { + List<WindowedValue<KV<K, OutputT>>> runGABW( + GroupAlsoByWindowsDoFnFactory<K, InputT, OutputT> gabwFactory, + WindowingStrategy<?, W> windowingStrategy, + K key, + WindowedValue<InputT>... 
values) + throws Exception { return runGABW(gabwFactory, windowingStrategy, key, Arrays.asList(values)); } private static <K, InputT, OutputT, W extends BoundedWindow> - DoFnTester<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>> runGABW( - GroupAlsoByWindowsDoFnFactory<K, InputT, OutputT> gabwFactory, - WindowingStrategy<?, W> windowingStrategy, - K key, - Collection<WindowedValue<InputT>> values) throws Exception { + List<WindowedValue<KV<K, OutputT>>> runGABW( + GroupAlsoByWindowsDoFnFactory<K, InputT, OutputT> gabwFactory, + WindowingStrategy<?, W> windowingStrategy, + K key, + Collection<WindowedValue<InputT>> values) + throws Exception { final StateInternalsFactory<K> stateInternalsCache = new CachingStateInternalsFactory<K>(); - DoFnTester<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>> tester = - DoFnTester.of(gabwFactory.forStrategy(windowingStrategy, stateInternalsCache)); - - // Though we use a DoFnTester, the function itself is instantiated directly by the - // runner and should not be serialized; it may not even be serializable. 
- tester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE); - tester.startBundle(); - tester.processElement(KV.<K, Iterable<WindowedValue<InputT>>>of(key, values)); - tester.finishBundle(); + List<WindowedValue<KV<K, OutputT>>> output = + processElement( + gabwFactory.forStrategy(windowingStrategy, stateInternalsCache), + KV.<K, Iterable<WindowedValue<InputT>>>of(key, values)); // Sanity check for corruption - for (KV<K, OutputT> elem : tester.peekOutputElements()) { - assertThat(elem.getKey(), equalTo(key)); + for (WindowedValue<KV<K, OutputT>> value : output) { + assertThat(value.getValue().getKey(), equalTo(key)); } - return tester; + return output; } private static BoundedWindow window(long start, long end) { @@ -657,4 +599,158 @@ public class GroupAlsoByWindowsProperties { return InMemoryStateInternals.forKey(key); } } + + private static <K, InputT, OutputT, W extends BoundedWindow> + List<WindowedValue<KV<K, OutputT>>> processElement( + GroupAlsoByWindowsDoFn<K, InputT, OutputT, W> fn, + KV<K, Iterable<WindowedValue<InputT>>> element) + throws Exception { + TestProcessContext<K, InputT, OutputT, W> c = new TestProcessContext<>(fn, element); + fn.processElement(c); + return c.getOutput(); + } + + private static <K, OutputT> TimestampedValue<KV<K, OutputT>> getOnlyElementInWindow( + List<WindowedValue<KV<K, OutputT>>> output, final BoundedWindow window) { + WindowedValue<KV<K, OutputT>> res = + Iterables.getOnlyElement( + Iterables.filter( + output, + new Predicate<WindowedValue<KV<K, OutputT>>>() { + @Override + public boolean apply(@Nullable WindowedValue<KV<K, OutputT>> input) { + return input.getWindows().contains(window); + } + })); + return TimestampedValue.of(res.getValue(), res.getTimestamp()); + } + + /** + * A {@link GroupAlsoByWindowsDoFn.ProcessContext} providing just enough context for a {@link + * GroupAlsoByWindowsDoFn} - namely, information about the element and output via {@link + * WindowingInternals}, but no side inputs/outputs and no normal 
output. + */ + private static class TestProcessContext<K, InputT, OutputT, W extends BoundedWindow> + extends GroupAlsoByWindowsDoFn<K, InputT, OutputT, W>.ProcessContext { + private final PipelineOptions options = PipelineOptionsFactory.create(); + private final KV<K, Iterable<WindowedValue<InputT>>> element; + private final List<WindowedValue<KV<K, OutputT>>> output = new ArrayList<>(); + + private TestProcessContext( + GroupAlsoByWindowsDoFn<K, InputT, OutputT, W> fn, + KV<K, Iterable<WindowedValue<InputT>>> element) { + fn.super(); + this.element = element; + } + + @Override + public KV<K, Iterable<WindowedValue<InputT>>> element() { + return element; + } + + @Override + public Instant timestamp() { + return BoundedWindow.TIMESTAMP_MIN_VALUE; + } + + @Override + public BoundedWindow window() { + return GlobalWindow.INSTANCE; + } + + @Override + public PaneInfo pane() { + return PaneInfo.NO_FIRING; + } + + @Override + public <T> T sideInput(PCollectionView<T> view) { + throw new UnsupportedOperationException(); + } + + @Override + public WindowingInternals<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>> + windowingInternals() { + return new WindowingInternals<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>>() { + @Override + public void outputWindowedValue( + KV<K, OutputT> output, + Instant timestamp, + Collection<? extends BoundedWindow> windows, + PaneInfo pane) { + TestProcessContext.this.output.add(WindowedValue.of(output, timestamp, windows, pane)); + } + + @Override + public <SideOutputT> void sideOutputWindowedValue( + TupleTag<SideOutputT> tag, + SideOutputT output, + Instant timestamp, + Collection<? 
extends BoundedWindow> windows, + PaneInfo pane) { + throw new UnsupportedOperationException(); + } + + @Override + public StateInternals<?> stateInternals() { + throw new UnsupportedOperationException(); + } + + @Override + public TimerInternals timerInternals() { + throw new UnsupportedOperationException(); + } + + @Override + public Collection<? extends BoundedWindow> windows() { + return ImmutableList.of(GlobalWindow.INSTANCE); + } + + @Override + public PaneInfo pane() { + return PaneInfo.NO_FIRING; + } + + @Override + public <T> T sideInput(PCollectionView<T> view, BoundedWindow sideInputWindow) { + throw new UnsupportedOperationException(); + } + }; + } + + @Override + public PipelineOptions getPipelineOptions() { + return options; + } + + @Override + public void output(KV<K, OutputT> output) { + throw new UnsupportedOperationException(); + } + + @Override + public void outputWithTimestamp(KV<K, OutputT> output, Instant timestamp) { + throw new UnsupportedOperationException(); + } + + @Override + public <T> void sideOutput(TupleTag<T> tag, T output) { + throw new UnsupportedOperationException(); + } + + @Override + public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) { + throw new UnsupportedOperationException(); + } + + @Override + protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal( + String name, CombineFn<AggInputT, ?, AggOutputT> combiner) { + throw new UnsupportedOperationException(); + } + + public List<WindowedValue<KV<K, OutputT>>> getOutput() { + return output; + } + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96455768/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java index 1a74ae7..6ee42e7 
100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java @@ -19,6 +19,7 @@ package org.apache.beam.sdk.transforms; import java.io.IOException; import java.util.Collection; +import javax.annotation.Nullable; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.DoFn.Context; @@ -185,6 +186,7 @@ public class DoFnAdapters { * If the fn was created using {@link #toOldDoFn}, returns the original {@link DoFn}. Otherwise, * returns {@code null}. */ + @Nullable public static <InputT, OutputT> DoFn<InputT, OutputT> getDoFn(OldDoFn<InputT, OutputT> fn) { if (fn instanceof SimpleDoFnAdapter) { return ((SimpleDoFnAdapter<InputT, OutputT>) fn).fn; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96455768/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java index 17fa612..a9f93dd 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java @@ -23,10 +23,10 @@ import static com.google.common.base.Preconditions.checkState; import com.google.common.base.Function; import com.google.common.base.MoreObjects; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -36,6 +36,8 @@ import org.apache.beam.sdk.options.PipelineOptions; import 
org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.ValueInSingleWindow; import org.apache.beam.sdk.transforms.Combine.CombineFn; +import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; +import org.apache.beam.sdk.transforms.reflect.DoFnInvokers; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; @@ -43,7 +45,6 @@ import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.WindowingInternals; import org.apache.beam.sdk.util.state.InMemoryStateInternals; import org.apache.beam.sdk.util.state.InMemoryTimerInternals; import org.apache.beam.sdk.util.state.StateInternals; @@ -86,7 +87,8 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable { */ @SuppressWarnings("unchecked") public static <InputT, OutputT> DoFnTester<InputT, OutputT> of(DoFn<InputT, OutputT> fn) { - return new DoFnTester<>(DoFnAdapters.toOldDoFn(fn)); + checkNotNull(fn, "fn can't be null"); + return new DoFnTester<>(fn); } /** @@ -96,9 +98,11 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable { * @see #of(DoFn) */ @SuppressWarnings("unchecked") - public static <InputT, OutputT> DoFnTester<InputT, OutputT> + @Deprecated + public static <InputT, OutputT> DoFnTester<InputT, OutputT> of(OldDoFn<InputT, OutputT> fn) { - return new DoFnTester<>(fn); + checkNotNull(fn, "fn can't be null"); + return new DoFnTester<>(fn.toDoFn()); } /** @@ -238,7 +242,7 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable { stateInternals = InMemoryStateInternals.forKey(new Object()); timerInternals = new InMemoryTimerInternals(); try { - fn.startBundle(context); + fnInvoker.invokeStartBundle(context); } catch (UserCodeException e) { 
unwrapUserCodeException(e); } @@ -271,8 +275,8 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable { } /** - * Calls {@link OldDoFn#processElement} on the {@code OldDoFn} under test, in a - * context where {@link OldDoFn.ProcessContext#element} returns the + * Calls {@link DoFn.ProcessElement} on the {@code DoFn} under test, in a + * context where {@link DoFn.ProcessContext#element} returns the * given element and timestamp. * * <p>Will call {@link #startBundle} automatically, if it hasn't @@ -286,7 +290,13 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable { startBundle(); } try { - fn.processElement(createProcessContext(element)); + final TestProcessContext processContext = createProcessContext(element); + fnInvoker.invokeProcessElement(new DoFnInvoker.FakeArgumentProvider<InputT, OutputT>() { + @Override + public DoFn<InputT, OutputT>.ProcessContext processContext(DoFn<InputT, OutputT> doFn) { + return processContext; + } + }); } catch (UserCodeException e) { unwrapUserCodeException(e); } @@ -308,13 +318,14 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable { "Must be inside bundle to call finishBundle, but was: %s", state); try { - fn.finishBundle(createContext(fn)); + fnInvoker.invokeFinishBundle(createContext(fn)); } catch (UserCodeException e) { unwrapUserCodeException(e); } if (cloningBehavior == CloningBehavior.CLONE_PER_BUNDLE) { - fn.teardown(); + fnInvoker.invokeTeardown(); fn = null; + fnInvoker = null; state = State.UNINITIALIZED; } else { state = State.BUNDLE_FINISHED; @@ -532,11 +543,11 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable { return mainOutputTag; } - private TestContext createContext(OldDoFn<InputT, OutputT> fn) { + private TestContext createContext(DoFn<InputT, OutputT> fn) { return new TestContext(); } - private class TestContext extends OldDoFn<InputT, OutputT>.Context { + private class TestContext extends DoFn<InputT, OutputT>.Context { TestContext() { 
fn.super(); } @@ -557,7 +568,7 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable { } @Override - protected <AggInT, AggOutT> Aggregator<AggInT, AggOutT> createAggregatorInternal( + protected <AggInT, AggOutT> Aggregator<AggInT, AggOutT> createAggregator( final String name, final CombineFn<AggInT, ?, AggOutT> combiner) { return aggregator(name, combiner); } @@ -624,7 +635,7 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable { elem.getValue(), elem.getTimestamp(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING)); } - private class TestProcessContext extends OldDoFn<InputT, OutputT>.ProcessContext { + private class TestProcessContext extends DoFn<InputT, OutputT>.ProcessContext { private final TestContext context; private final ValueInSingleWindow<InputT> element; @@ -644,7 +655,9 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable { Map<BoundedWindow, ?> viewValues = sideInputs.get(view); if (viewValues != null) { BoundedWindow sideInputWindow = - view.getWindowingStrategyInternal().getWindowFn().getSideInputWindow(window()); + view.getWindowingStrategyInternal() + .getWindowFn() + .getSideInputWindow(element.getWindow()); @SuppressWarnings("unchecked") T windowValue = (T) viewValues.get(sideInputWindow); if (windowValue != null) { @@ -660,73 +673,11 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable { } @Override - public BoundedWindow window() { - return element.getWindow(); - } - - @Override public PaneInfo pane() { return element.getPane(); } @Override - public WindowingInternals<InputT, OutputT> windowingInternals() { - return new WindowingInternals<InputT, OutputT>() { - @Override - public StateInternals<?> stateInternals() { - return stateInternals; - } - - @Override - public void outputWindowedValue( - OutputT output, - Instant timestamp, - Collection<? 
extends BoundedWindow> windows, - PaneInfo pane) { - for (BoundedWindow window : windows) { - context.noteOutput( - mainOutputTag, ValueInSingleWindow.of(output, timestamp, window, pane)); - } - } - - @Override - public <SideOutputT> void sideOutputWindowedValue( - TupleTag<SideOutputT> tag, - SideOutputT output, - Instant timestamp, - Collection<? extends BoundedWindow> windows, - PaneInfo pane) { - for (BoundedWindow window : windows) { - context.noteOutput( - tag, ValueInSingleWindow.of(output, timestamp, window, pane)); - } - } - - @Override - public TimerInternals timerInternals() { - return timerInternals; - } - - @Override - public Collection<? extends BoundedWindow> windows() { - return Collections.singleton(element.getWindow()); - } - - @Override - public PaneInfo pane() { - return element.getPane(); - } - - @Override - public <T> T sideInput( - PCollectionView<T> view, BoundedWindow sideInputWindow) { - throw new UnsupportedOperationException( - "SideInput from WindowingInternals is not supported in in the context of DoFnTester"); - } - }; - } - - @Override public PipelineOptions getPipelineOptions() { return context.getPipelineOptions(); } @@ -753,10 +704,10 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable { } @Override - protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal( + protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregator( String name, CombineFn<AggInputT, ?, AggOutputT> combiner) { throw new IllegalStateException("Aggregators should not be created within ProcessContext. 
" - + "Instead, create an aggregator at OldDoFn construction time with" + + "Instead, create an aggregator at DoFn construction time with" + " createAggregator, and ensure they are set up by the time startBundle is" + " called with setupDelegateAggregators."); } @@ -768,8 +719,9 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable { finishBundle(); } if (state == State.BUNDLE_FINISHED) { - fn.teardown(); + fnInvoker.invokeTeardown(); fn = null; + fnInvoker = null; } state = State.TORN_DOWN; } @@ -786,8 +738,8 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable { private final PipelineOptions options = PipelineOptionsFactory.create(); - /** The original {@link OldDoFn} under test. */ - private final OldDoFn<InputT, OutputT> origFn; + /** The original {@link DoFn} under test. */ + private final DoFn<InputT, OutputT> origFn; /** * Whether to clone the original {@link DoFn} or just use it as-is. @@ -805,8 +757,9 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable { /** The output tags used by the {@link DoFn} under test. */ private TupleTag<OutputT> mainOutputTag = new TupleTag<>(); - /** The original OldDoFn under test, if started. */ - OldDoFn<InputT, OutputT> fn; + /** The original DoFn under test, if started. */ + private DoFn<InputT, OutputT> fn; + private DoFnInvoker<InputT, OutputT> fnInvoker; /** The outputs from the {@link DoFn} under test. */ private Map<TupleTag<?>, List<ValueInSingleWindow<?>>> outputs; @@ -817,7 +770,7 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable { /** The state of processing of the {@link DoFn} under test. 
*/ private State state = State.UNINITIALIZED; - private DoFnTester(OldDoFn<InputT, OutputT> origFn) { + private DoFnTester(DoFn<InputT, OutputT> origFn) { this.origFn = origFn; } @@ -828,12 +781,13 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable { if (cloningBehavior.equals(CloningBehavior.DO_NOT_CLONE)) { fn = origFn; } else { - fn = (OldDoFn<InputT, OutputT>) + fn = (DoFn<InputT, OutputT>) SerializableUtils.deserializeFromByteArray( SerializableUtils.serializeToByteArray(origFn), origFn.toString()); } - fn.setup(); + fnInvoker = DoFnInvokers.invokerFor(fn); + fnInvoker.invokeSetup(); outputs = new HashMap<>(); accumulators = new HashMap<>(); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96455768/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java index 4ad7dad..50a7082 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java @@ -18,9 +18,6 @@ package org.apache.beam.sdk.transforms.reflect; import java.io.Serializable; -import java.lang.reflect.Constructor; -import java.util.LinkedHashMap; -import java.util.Map; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.transforms.DoFn; @@ -45,14 +42,6 @@ public class DoFnInvokers { return ByteBuddyDoFnInvokerFactory.only().newByteBuddyInvoker(fn); } - /** - * A cache of constructors of generated {@link DoFnInvoker} classes, keyed by {@link DoFn} class. - * Needed because generating an invoker class is expensive, and to avoid generating an excessive - * number of classes consuming PermGen memory. 
- */ - private final Map<Class<?>, Constructor<?>> byteBuddyInvokerConstructorCache = - new LinkedHashMap<>(); - private DoFnInvokers() {} /** http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96455768/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java index ac76b2e..ff8a9bc 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java @@ -350,14 +350,14 @@ public class DoFnTesterTest { } } - private static class SideInputDoFn extends OldDoFn<Integer, Integer> { + private static class SideInputDoFn extends DoFn<Integer, Integer> { private final PCollectionView<Integer> value; private SideInputDoFn(PCollectionView<Integer> value) { this.value = value; } - @Override + @ProcessElement public void processElement(ProcessContext c) throws Exception { c.output(c.sideInput(value)); }