[BEAM-854] Removes ReifyTimestampsAndWindows. It was effectively used in two places: - GatherAllPanes (used in PAssert): replaced with a similar custom DoFn. - A couple of GBK-related transforms: folded into GroupByKeyOnly.
Also makes GBKO produce globally-windowed KWIs. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e77d881a Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e77d881a Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e77d881a Branch: refs/heads/apex-runner Commit: e77d881aecc3f5fdbc407a252f9924c1226bcb4a Parents: 529f266 Author: Eugene Kirpichov <kirpic...@google.com> Authored: Fri Oct 28 10:40:17 2016 -0700 Committer: Thomas Groh <tg...@google.com> Committed: Wed Nov 2 10:18:59 2016 -0700 ---------------------------------------------------------------------- .../core/GroupByKeyViaGroupByKeyOnly.java | 13 ++-- ...ectGBKIntoKeyedWorkItemsOverrideFactory.java | 2 - .../beam/runners/direct/DirectGroupByKey.java | 42 ++++++------- .../GroupAlsoByWindowEvaluatorFactory.java | 4 +- .../direct/GroupByKeyOnlyEvaluatorFactory.java | 31 +++++----- .../beam/runners/direct/ParDoEvaluator.java | 5 +- .../direct/ParDoMultiEvaluatorHooks.java | 1 + .../direct/ParDoSingleEvaluatorHooks.java | 1 + .../direct/GroupByKeyEvaluatorFactoryTest.java | 29 ++++----- .../GroupByKeyOnlyEvaluatorFactoryTest.java | 31 +++++----- .../beam/runners/direct/ParDoEvaluatorTest.java | 5 +- .../apache/beam/sdk/util/GatherAllPanes.java | 18 +++++- .../sdk/util/ReifyTimestampAndWindowsDoFn.java | 41 ------------- .../sdk/util/ReifyTimestampsAndWindows.java | 63 -------------------- 14 files changed, 94 insertions(+), 192 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e77d881a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupByKeyViaGroupByKeyOnly.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupByKeyViaGroupByKeyOnly.java 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupByKeyViaGroupByKeyOnly.java index b521425..79d2252 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupByKeyViaGroupByKeyOnly.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupByKeyViaGroupByKeyOnly.java @@ -31,7 +31,6 @@ import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.ReifyTimestampsAndWindows; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder; import org.apache.beam.sdk.util.WindowingStrategy; @@ -84,15 +83,11 @@ public class GroupByKeyViaGroupByKeyOnly<K, V> WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy(); return input - // Make each input element's timestamp and assigned windows - // explicit, in the value part. - .apply(new ReifyTimestampsAndWindows<K, V>()) - // Group by just the key. // Combiner lifting will not happen regardless of the disallowCombinerLifting value. // There will be no combiners right after the GroupByKeyOnly because of the two ParDos // introduced in here. - .apply(new GroupByKeyOnly<K, WindowedValue<V>>()) + .apply(new GroupByKeyOnly<K, V>()) // Sort each key's values by timestamp. GroupAlsoByWindow requires // its input to be sorted by timestamp. @@ -112,12 +107,12 @@ public class GroupByKeyViaGroupByKeyOnly<K, V> * or evaluate this class. 
*/ public static class GroupByKeyOnly<K, V> - extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> { + extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<WindowedValue<V>>>>> { @SuppressWarnings({"rawtypes", "unchecked"}) @Override - public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) { - return PCollection.<KV<K, Iterable<V>>>createPrimitiveOutputInternal( + public PCollection<KV<K, Iterable<WindowedValue<V>>>> apply(PCollection<KV<K, V>> input) { + return PCollection.createPrimitiveOutputInternal( input.getPipeline(), input.getWindowingStrategy(), input.isBounded()); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e77d881a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java index b63e23b..680a971 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java @@ -24,7 +24,6 @@ import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.util.KeyedWorkItem; import org.apache.beam.sdk.util.KeyedWorkItemCoder; -import org.apache.beam.sdk.util.ReifyTimestampsAndWindows; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; @@ -52,7 +51,6 @@ class DirectGBKIntoKeyedWorkItemsOverrideFactory<KeyT, InputT> checkArgument(input.getCoder() instanceof KvCoder); KvCoder<KeyT, InputT> kvCoder = (KvCoder<KeyT, InputT>) 
input.getCoder(); return input - .apply(new ReifyTimestampsAndWindows<KeyT, InputT>()) // TODO: Perhaps windowing strategy should instead be set by ReifyTAW, or by DGBKO .setWindowingStrategyInternal(WindowingStrategy.globalDefault()) .apply(new DirectGroupByKey.DirectGroupByKeyOnly<KeyT, InputT>()) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e77d881a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java index 14103a6..219314a 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java @@ -27,8 +27,6 @@ import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.util.KeyedWorkItem; import org.apache.beam.sdk.util.KeyedWorkItemCoder; -import org.apache.beam.sdk.util.ReifyTimestampsAndWindows; -import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; @@ -55,13 +53,13 @@ class DirectGroupByKey<K, V> // merging windows as needed, using the windows assigned to the // key/value input elements and the window merge operation of the // window function associated with the input PCollection. - WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy(); + WindowingStrategy<?, ?> inputWindowingStrategy = input.getWindowingStrategy(); + // Update the windowing strategy as appropriate. 
+ WindowingStrategy<?, ?> outputWindowingStrategy = + original.updateWindowingStrategy(inputWindowingStrategy); // By default, implement GroupByKey via a series of lower-level operations. return input - // Make each input element's timestamp and assigned windows - // explicit, in the value part. - .apply(new ReifyTimestampsAndWindows<K, V>()) .apply(new DirectGroupByKeyOnly<K, V>()) .setCoder( KeyedWorkItemCoder.of( @@ -70,20 +68,20 @@ class DirectGroupByKey<K, V> input.getWindowingStrategy().getWindowFn().windowCoder())) // Group each key's values by window, merging windows as needed. - .apply("GroupAlsoByWindow", new DirectGroupAlsoByWindow<K, V>(windowingStrategy)) + .apply( + "GroupAlsoByWindow", + new DirectGroupAlsoByWindow<K, V>(inputWindowingStrategy, outputWindowingStrategy)) - // And update the windowing strategy as appropriate. - .setWindowingStrategyInternal(original.updateWindowingStrategy(windowingStrategy)) .setCoder( KvCoder.of(inputCoder.getKeyCoder(), IterableCoder.of(inputCoder.getValueCoder()))); } static final class DirectGroupByKeyOnly<K, V> - extends PTransform<PCollection<KV<K, WindowedValue<V>>>, PCollection<KeyedWorkItem<K, V>>> { + extends PTransform<PCollection<KV<K, V>>, PCollection<KeyedWorkItem<K, V>>> { @Override - public PCollection<KeyedWorkItem<K, V>> apply(PCollection<KV<K, WindowedValue<V>>> input) { - return PCollection.<KeyedWorkItem<K, V>>createPrimitiveOutputInternal( - input.getPipeline(), input.getWindowingStrategy(), input.isBounded()); + public PCollection<KeyedWorkItem<K, V>> apply(PCollection<KV<K, V>> input) { + return PCollection.createPrimitiveOutputInternal( + input.getPipeline(), WindowingStrategy.globalDefault(), input.isBounded()); } DirectGroupByKeyOnly() {} @@ -92,14 +90,18 @@ class DirectGroupByKey<K, V> static final class DirectGroupAlsoByWindow<K, V> extends PTransform<PCollection<KeyedWorkItem<K, V>>, PCollection<KV<K, Iterable<V>>>> { - private final WindowingStrategy<?, ?> windowingStrategy; + private 
final WindowingStrategy<?, ?> inputWindowingStrategy; + private final WindowingStrategy<?, ?> outputWindowingStrategy; - public DirectGroupAlsoByWindow(WindowingStrategy<?, ?> windowingStrategy) { - this.windowingStrategy = windowingStrategy; + public DirectGroupAlsoByWindow( + WindowingStrategy<?, ?> inputWindowingStrategy, + WindowingStrategy<?, ?> outputWindowingStrategy) { + this.inputWindowingStrategy = inputWindowingStrategy; + this.outputWindowingStrategy = outputWindowingStrategy; } - public WindowingStrategy<?, ?> getWindowingStrategy() { - return windowingStrategy; + public WindowingStrategy<?, ?> getInputWindowingStrategy() { + return inputWindowingStrategy; } private KeyedWorkItemCoder<K, V> getKeyedWorkItemCoder(Coder<KeyedWorkItem<K, V>> inputCoder) { @@ -125,8 +127,8 @@ class DirectGroupByKey<K, V> @Override public PCollection<KV<K, Iterable<V>>> apply(PCollection<KeyedWorkItem<K, V>> input) { - return PCollection.<KV<K, Iterable<V>>>createPrimitiveOutputInternal( - input.getPipeline(), input.getWindowingStrategy(), input.isBounded()); + return PCollection.createPrimitiveOutputInternal( + input.getPipeline(), outputWindowingStrategy, input.isBounded()); } } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e77d881a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java index 4115bb7..37cc319 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java @@ -103,7 +103,8 @@ class GroupAlsoByWindowEvaluatorFactory implements 
TransformEvaluatorFactory { @SuppressWarnings("unchecked") WindowingStrategy<?, BoundedWindow> windowingStrategy = - (WindowingStrategy<?, BoundedWindow>) application.getTransform().getWindowingStrategy(); + (WindowingStrategy<?, BoundedWindow>) + application.getTransform().getInputWindowingStrategy(); DirectStepContext stepContext = evaluationContext @@ -125,6 +126,7 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory { evaluationContext, stepContext, application, + windowingStrategy, gabwDoFn, Collections.<PCollectionView<?>>emptyList(), MAIN_OUTPUT_TAG, http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e77d881a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java index 2ead782..0fa7ebd 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java @@ -67,14 +67,14 @@ class GroupByKeyOnlyEvaluatorFactory implements TransformEvaluatorFactory { @Override public void cleanup() {} - private <K, V> TransformEvaluator<KV<K, WindowedValue<V>>> createEvaluator( + private <K, V> TransformEvaluator<KV<K, V>> createEvaluator( final AppliedPTransform< - PCollection<KV<K, WindowedValue<V>>>, + PCollection<KV<K, V>>, PCollection<KeyedWorkItem<K, V>>, DirectGroupByKeyOnly<K, V>> application, - final CommittedBundle<KV<K, WindowedValue<V>>> inputBundle) { - return new GroupByKeyOnlyEvaluator<>(evaluationContext, inputBundle, application); + final CommittedBundle<KV<K, V>> inputBundle) { + return new GroupByKeyOnlyEvaluator<>(evaluationContext, application); 
} /** @@ -84,12 +84,11 @@ class GroupByKeyOnlyEvaluatorFactory implements TransformEvaluatorFactory { * @see GroupByKeyViaGroupByKeyOnly */ private static class GroupByKeyOnlyEvaluator<K, V> - implements TransformEvaluator<KV<K, WindowedValue<V>>> { + implements TransformEvaluator<KV<K, V>> { private final EvaluationContext evaluationContext; - private final CommittedBundle<KV<K, WindowedValue<V>>> inputBundle; private final AppliedPTransform< - PCollection<KV<K, WindowedValue<V>>>, + PCollection<KV<K, V>>, PCollection<KeyedWorkItem<K, V>>, DirectGroupByKeyOnly<K, V>> application; private final Coder<K> keyCoder; @@ -97,19 +96,17 @@ class GroupByKeyOnlyEvaluatorFactory implements TransformEvaluatorFactory { public GroupByKeyOnlyEvaluator( EvaluationContext evaluationContext, - CommittedBundle<KV<K, WindowedValue<V>>> inputBundle, AppliedPTransform< - PCollection<KV<K, WindowedValue<V>>>, - PCollection<KeyedWorkItem<K, V>>, + PCollection<KV<K, V>>, + PCollection<KeyedWorkItem<K, V>>, DirectGroupByKeyOnly<K, V>> application) { this.evaluationContext = evaluationContext; - this.inputBundle = inputBundle; this.application = application; this.keyCoder = getKeyCoder(application.getInput().getCoder()); this.groupingMap = new HashMap<>(); } - private Coder<K> getKeyCoder(Coder<KV<K, WindowedValue<V>>> coder) { + private Coder<K> getKeyCoder(Coder<KV<K, V>> coder) { checkState( coder instanceof KvCoder, "%s requires a coder of class %s." 
@@ -118,13 +115,13 @@ class GroupByKeyOnlyEvaluatorFactory implements TransformEvaluatorFactory { getClass().getSimpleName(), KvCoder.class.getSimpleName()); @SuppressWarnings("unchecked") - Coder<K> keyCoder = ((KvCoder<K, WindowedValue<V>>) coder).getKeyCoder(); + Coder<K> keyCoder = ((KvCoder<K, V>) coder).getKeyCoder(); return keyCoder; } @Override - public void processElement(WindowedValue<KV<K, WindowedValue<V>>> element) { - KV<K, WindowedValue<V>> kv = element.getValue(); + public void processElement(WindowedValue<KV<K, V>> element) { + KV<K, V> kv = element.getValue(); K key = kv.getKey(); byte[] encodedKey; try { @@ -139,10 +136,10 @@ class GroupByKeyOnlyEvaluatorFactory implements TransformEvaluatorFactory { GroupingKey<K> groupingKey = new GroupingKey<>(key, encodedKey); List<WindowedValue<V>> values = groupingMap.get(groupingKey); if (values == null) { - values = new ArrayList<WindowedValue<V>>(); + values = new ArrayList<>(); groupingMap.put(groupingKey, values); } - values.add(kv.getValue()); + values.add(element.withValue(kv.getValue())); } @Override http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e77d881a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java index 5913379..6f91319 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java @@ -31,9 +31,11 @@ import org.apache.beam.runners.core.PushbackSideInputDoFnRunner; import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext; import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; import org.apache.beam.sdk.transforms.AppliedPTransform; 
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.ReadyCheckingSideInputReader; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; @@ -44,6 +46,7 @@ class ParDoEvaluator<InputT, OutputT> implements TransformEvaluator<InputT> { EvaluationContext evaluationContext, DirectStepContext stepContext, AppliedPTransform<PCollection<InputT>, ?, ?> application, + WindowingStrategy<?, ? extends BoundedWindow> windowingStrategy, Serializable fn, // may be OldDoFn or DoFn List<PCollectionView<?>> sideInputs, TupleTag<OutputT> mainOutputTag, @@ -70,7 +73,7 @@ class ParDoEvaluator<InputT, OutputT> implements TransformEvaluator<InputT> { sideOutputTags, stepContext, aggregatorChanges, - application.getInput().getWindowingStrategy()); + windowingStrategy); PushbackSideInputDoFnRunner<InputT, OutputT> runner = PushbackSideInputDoFnRunner.create(underlying, sideInputs, sideInputReader); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e77d881a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorHooks.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorHooks.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorHooks.java index a566154..f30f209 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorHooks.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorHooks.java @@ -45,6 +45,7 @@ class ParDoMultiEvaluatorHooks<InputT, OutputT> evaluationContext, stepContext, application, + 
application.getInput().getWindowingStrategy(), fnLocal, transform.getSideInputs(), transform.getMainOutputTag(), http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e77d881a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorHooks.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorHooks.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorHooks.java index b554f41..6d284c2 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorHooks.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorHooks.java @@ -48,6 +48,7 @@ class ParDoSingleEvaluatorHooks<InputT, OutputT> evaluationContext, stepContext, application, + application.getInput().getWindowingStrategy(), fnLocal, transform.getSideInputs(), mainOutputTag, http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e77d881a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java index 49d7d90..a726817 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java @@ -35,7 +35,6 @@ import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.util.KeyedWorkItem; import org.apache.beam.sdk.util.KeyedWorkItems; -import org.apache.beam.sdk.util.ReifyTimestampsAndWindows; import 
org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; @@ -64,13 +63,11 @@ public class GroupByKeyEvaluatorFactoryTest { KV<String, Integer> firstBaz = KV.of("baz", Integer.MAX_VALUE); PCollection<KV<String, Integer>> values = p.apply(Create.of(firstFoo, firstBar, secondFoo, firstBaz, secondBar, thirdFoo)); - PCollection<KV<String, WindowedValue<Integer>>> kvs = - values.apply(new ReifyTimestampsAndWindows<String, Integer>()); PCollection<KeyedWorkItem<String, Integer>> groupedKvs = - kvs.apply(new DirectGroupByKeyOnly<String, Integer>()); + values.apply(new DirectGroupByKeyOnly<String, Integer>()); - CommittedBundle<KV<String, WindowedValue<Integer>>> inputBundle = - bundleFactory.createBundle(kvs).commit(Instant.now()); + CommittedBundle<KV<String, Integer>> inputBundle = + bundleFactory.createBundle(values).commit(Instant.now()); EvaluationContext evaluationContext = mock(EvaluationContext.class); StructuralKey<String> fooKey = StructuralKey.of("foo", StringUtf8Coder.of()); UncommittedBundle<KeyedWorkItem<String, Integer>> fooBundle = @@ -97,17 +94,17 @@ public class GroupByKeyEvaluatorFactoryTest { // The input to a GroupByKey is assumed to be a KvCoder @SuppressWarnings("unchecked") Coder<String> keyCoder = - ((KvCoder<String, WindowedValue<Integer>>) kvs.getCoder()).getKeyCoder(); - TransformEvaluator<KV<String, WindowedValue<Integer>>> evaluator = + ((KvCoder<String, Integer>) values.getCoder()).getKeyCoder(); + TransformEvaluator<KV<String, Integer>> evaluator = new GroupByKeyOnlyEvaluatorFactory(evaluationContext) .forApplication(groupedKvs.getProducingTransformInternal(), inputBundle); - evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(firstFoo))); - evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(secondFoo))); - evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(thirdFoo))); - 
evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(firstBar))); - evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(secondBar))); - evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(firstBaz))); + evaluator.processElement(WindowedValue.valueInGlobalWindow(firstFoo)); + evaluator.processElement(WindowedValue.valueInGlobalWindow(secondFoo)); + evaluator.processElement(WindowedValue.valueInGlobalWindow(thirdFoo)); + evaluator.processElement(WindowedValue.valueInGlobalWindow(firstBar)); + evaluator.processElement(WindowedValue.valueInGlobalWindow(secondBar)); + evaluator.processElement(WindowedValue.valueInGlobalWindow(firstBaz)); evaluator.finishBundle(); @@ -142,10 +139,6 @@ public class GroupByKeyEvaluatorFactoryTest { keyCoder))); } - private <K, V> KV<K, WindowedValue<V>> gwValue(KV<K, V> kv) { - return KV.of(kv.getKey(), WindowedValue.valueInGlobalWindow(kv.getValue())); - } - private static class KeyedWorkItemMatcher<K, V> extends BaseMatcher<WindowedValue<KeyedWorkItem<K, V>>> { private final KeyedWorkItem<K, V> myWorkItem; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e77d881a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java index 3b9dc39..3e5af14 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java @@ -35,7 +35,6 @@ import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.util.KeyedWorkItem; import 
org.apache.beam.sdk.util.KeyedWorkItems; -import org.apache.beam.sdk.util.ReifyTimestampsAndWindows; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; @@ -64,13 +63,11 @@ public class GroupByKeyOnlyEvaluatorFactoryTest { KV<String, Integer> firstBaz = KV.of("baz", Integer.MAX_VALUE); PCollection<KV<String, Integer>> values = p.apply(Create.of(firstFoo, firstBar, secondFoo, firstBaz, secondBar, thirdFoo)); - PCollection<KV<String, WindowedValue<Integer>>> kvs = - values.apply(new ReifyTimestampsAndWindows<String, Integer>()); PCollection<KeyedWorkItem<String, Integer>> groupedKvs = - kvs.apply(new DirectGroupByKeyOnly<String, Integer>()); + values.apply(new DirectGroupByKeyOnly<String, Integer>()); - CommittedBundle<KV<String, WindowedValue<Integer>>> inputBundle = - bundleFactory.createBundle(kvs).commit(Instant.now()); + CommittedBundle<KV<String, Integer>> inputBundle = + bundleFactory.createBundle(values).commit(Instant.now()); EvaluationContext evaluationContext = mock(EvaluationContext.class); StructuralKey<String> fooKey = StructuralKey.of("foo", StringUtf8Coder.of()); @@ -90,25 +87,25 @@ public class GroupByKeyOnlyEvaluatorFactoryTest { // The input to a GroupByKey is assumed to be a KvCoder @SuppressWarnings("unchecked") Coder<String> keyCoder = - ((KvCoder<String, WindowedValue<Integer>>) kvs.getCoder()).getKeyCoder(); - TransformEvaluator<KV<String, WindowedValue<Integer>>> evaluator = + ((KvCoder<String, Integer>) values.getCoder()).getKeyCoder(); + TransformEvaluator<KV<String, Integer>> evaluator = new GroupByKeyOnlyEvaluatorFactory(evaluationContext) .forApplication( groupedKvs.getProducingTransformInternal(), inputBundle); - evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(firstFoo))); - evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(secondFoo))); - evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(thirdFoo))); - 
evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(firstBar))); - evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(secondBar))); - evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(firstBaz))); + evaluator.processElement(WindowedValue.valueInGlobalWindow(firstFoo)); + evaluator.processElement(WindowedValue.valueInGlobalWindow(secondFoo)); + evaluator.processElement(WindowedValue.valueInGlobalWindow(thirdFoo)); + evaluator.processElement(WindowedValue.valueInGlobalWindow(firstBar)); + evaluator.processElement(WindowedValue.valueInGlobalWindow(secondBar)); + evaluator.processElement(WindowedValue.valueInGlobalWindow(firstBaz)); evaluator.finishBundle(); assertThat( fooBundle.commit(Instant.now()).getElements(), contains( - new KeyedWorkItemMatcher<String, Integer>( + new KeyedWorkItemMatcher<>( KeyedWorkItems.elementsWorkItem( "foo", ImmutableSet.of( @@ -119,7 +116,7 @@ public class GroupByKeyOnlyEvaluatorFactoryTest { assertThat( barBundle.commit(Instant.now()).getElements(), contains( - new KeyedWorkItemMatcher<String, Integer>( + new KeyedWorkItemMatcher<>( KeyedWorkItems.elementsWorkItem( "bar", ImmutableSet.of( @@ -129,7 +126,7 @@ public class GroupByKeyOnlyEvaluatorFactoryTest { assertThat( bazBundle.commit(Instant.now()).getElements(), contains( - new KeyedWorkItemMatcher<String, Integer>( + new KeyedWorkItemMatcher<>( KeyedWorkItems.elementsWorkItem( "baz", ImmutableSet.of(WindowedValue.valueInGlobalWindow(Integer.MAX_VALUE))), http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e77d881a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java index 8254413..eab92f4 100644 --- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java @@ -152,10 +152,13 @@ public class ParDoEvaluatorTest { when(evaluationContext.getAggregatorContainer()).thenReturn(container); when(evaluationContext.getAggregatorMutator()).thenReturn(mutator); + AppliedPTransform<PCollection<Integer>, ?, ?> transform = + (AppliedPTransform<PCollection<Integer>, ?, ?>) output.getProducingTransformInternal(); return ParDoEvaluator.create( evaluationContext, stepContext, - (AppliedPTransform<PCollection<Integer>, ?, ?>) output.getProducingTransformInternal(), + transform, + transform.getInput().getWindowingStrategy(), fn, ImmutableList.<PCollectionView<?>>of(singletonView), mainOutputTag, http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e77d881a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GatherAllPanes.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GatherAllPanes.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GatherAllPanes.java index a2a6e17..52a2ba8 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GatherAllPanes.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GatherAllPanes.java @@ -17,10 +17,13 @@ */ package org.apache.beam.sdk.util; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.Values; import org.apache.beam.sdk.transforms.WithKeys; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.Never; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.transforms.windowing.WindowFn; @@ -55,8 +58,12 @@ public class 
GatherAllPanes<T> WindowFn<?, ?> originalWindowFn = input.getWindowingStrategy().getWindowFn(); return input - .apply(WithKeys.<Integer, T>of(0).withKeyType(new TypeDescriptor<Integer>() {})) - .apply(new ReifyTimestampsAndWindows<Integer, T>()) + .apply(ParDo.of(new ReifyTimestampsAndWindowsFn<T>())) + .setCoder( + WindowedValue.FullWindowedValueCoder.of( + input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder())) + .apply( + WithKeys.<Integer, WindowedValue<T>>of(0).withKeyType(new TypeDescriptor<Integer>() {})) .apply( Window.into( new IdentityWindowFn<KV<Integer, WindowedValue<T>>>( @@ -69,4 +76,11 @@ public class GatherAllPanes<T> .apply(Values.<Iterable<WindowedValue<T>>>create()) .setWindowingStrategyInternal(input.getWindowingStrategy()); } + + private static class ReifyTimestampsAndWindowsFn<T> extends DoFn<T, WindowedValue<T>> { + @DoFn.ProcessElement + public void processElement(ProcessContext c, BoundedWindow window) { + c.output(WindowedValue.of(c.element(), c.timestamp(), window, c.pane())); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e77d881a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestampAndWindowsDoFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestampAndWindowsDoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestampAndWindowsDoFn.java deleted file mode 100644 index 6da4da0..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestampAndWindowsDoFn.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -import org.apache.beam.sdk.transforms.OldDoFn; -import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess; -import org.apache.beam.sdk.values.KV; - -/** - * {@link OldDoFn} that makes timestamps and window assignments explicit in the value part of each - * key/value pair. - * - * @param <K> the type of the keys of the input and output {@code PCollection}s - * @param <V> the type of the values of the input {@code PCollection} - */ -@SystemDoFnInternal -public class ReifyTimestampAndWindowsDoFn<K, V> extends OldDoFn<KV<K, V>, KV<K, WindowedValue<V>>> - implements RequiresWindowAccess { - @Override - public void processElement(ProcessContext c) throws Exception { - KV<K, V> kv = c.element(); - K key = kv.getKey(); - V value = kv.getValue(); - c.output(KV.of(key, WindowedValue.of(value, c.timestamp(), c.window(), c.pane()))); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e77d881a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestampsAndWindows.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestampsAndWindows.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestampsAndWindows.java deleted file mode 100644 index d129c8e..0000000 --- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestampsAndWindows.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.util; - -import static com.google.common.base.Preconditions.checkArgument; - -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.transforms.GroupByKey; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; - -/** - * Helper transform that makes timestamps and window assignments explicit in the value part of - * each key/value pair. - */ -public class ReifyTimestampsAndWindows<K, V> - extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, WindowedValue<V>>>> { - - @Override - public PCollection<KV<K, WindowedValue<V>>> apply(PCollection<KV<K, V>> input) { - - // The requirement to use a KvCoder *is* actually a model-level requirement, not specific - // to this implementation of GBK. All runners need a way to get the key. 
- checkArgument( - input.getCoder() instanceof KvCoder, - "%s requires its input to use a %s", - GroupByKey.class.getSimpleName(), - KvCoder.class.getSimpleName()); - - @SuppressWarnings("unchecked") - KvCoder<K, V> inputKvCoder = (KvCoder<K, V>) input.getCoder(); - Coder<K> keyCoder = inputKvCoder.getKeyCoder(); - Coder<V> inputValueCoder = inputKvCoder.getValueCoder(); - Coder<WindowedValue<V>> outputValueCoder = - FullWindowedValueCoder.of( - inputValueCoder, input.getWindowingStrategy().getWindowFn().windowCoder()); - Coder<KV<K, WindowedValue<V>>> outputKvCoder = KvCoder.of(keyCoder, outputValueCoder); - return input - .apply(ParDo.of(new ReifyTimestampAndWindowsDoFn<K, V>())) - .setCoder(outputKvCoder); - } -}