Change Spark GABWVOB (SparkGroupAlsoByWindowViaOutputBufferFn) to use UnsupportedSideInputReader.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3c94ff20 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3c94ff20 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3c94ff20 Branch: refs/heads/master Commit: 3c94ff2069bada1ae7194879d96b5e8f12207101 Parents: 62ddca6 Author: Sela <ans...@paypal.com> Authored: Tue Feb 28 14:53:23 2017 +0200 Committer: Sela <ans...@paypal.com> Committed: Wed Mar 1 00:18:10 2017 +0200 ---------------------------------------------------------------------- ...SparkGroupAlsoByWindowViaOutputBufferFn.java | 23 ++------------------ 1 file changed, 2 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/3c94ff20/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java index 449e3b6..2b16c60 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java @@ -30,6 +30,7 @@ import org.apache.beam.runners.core.StateInternals; import org.apache.beam.runners.core.StateInternalsFactory; import org.apache.beam.runners.core.SystemReduceFn; import org.apache.beam.runners.core.TimerInternals; +import org.apache.beam.runners.core.UnsupportedSideInputReader; import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine; import org.apache.beam.runners.core.triggers.TriggerStateMachines; import 
org.apache.beam.runners.spark.aggregators.NamedAggregators; @@ -38,11 +39,9 @@ import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.Triggers; -import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; import org.apache.spark.Accumulator; import org.apache.spark.api.java.function.FlatMapFunction; @@ -107,25 +106,7 @@ public class SparkGroupAlsoByWindowViaOutputBufferFn<K, InputT, W extends Bounde stateInternals, timerInternals, outputter, - new SideInputReader() { - @Override - public <T> T get(PCollectionView<T> view, BoundedWindow sideInputWindow) { - throw new UnsupportedOperationException( - "GroupAlsoByWindow must not have side inputs"); - } - - @Override - public <T> boolean contains(PCollectionView<T> view) { - throw new UnsupportedOperationException( - "GroupAlsoByWindow must not have side inputs"); - } - - @Override - public boolean isEmpty() { - throw new UnsupportedOperationException( - "GroupAlsoByWindow must not have side inputs"); - } - }, + new UnsupportedSideInputReader("GroupAlsoByWindow"), droppedDueToClosedWindow, reduceFn, runtimeContext.getPipelineOptions());