Spark GABWVOB to use UnsupportedSideInputReader.

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3c94ff20
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3c94ff20
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3c94ff20

Branch: refs/heads/master
Commit: 3c94ff2069bada1ae7194879d96b5e8f12207101
Parents: 62ddca6
Author: Sela <ans...@paypal.com>
Authored: Tue Feb 28 14:53:23 2017 +0200
Committer: Sela <ans...@paypal.com>
Committed: Wed Mar 1 00:18:10 2017 +0200

----------------------------------------------------------------------
 ...SparkGroupAlsoByWindowViaOutputBufferFn.java | 23 ++------------------
 1 file changed, 2 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/3c94ff20/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
index 449e3b6..2b16c60 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
@@ -30,6 +30,7 @@ import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.core.StateInternalsFactory;
 import org.apache.beam.runners.core.SystemReduceFn;
 import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.runners.core.UnsupportedSideInputReader;
 import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
 import org.apache.beam.runners.core.triggers.TriggerStateMachines;
 import org.apache.beam.runners.spark.aggregators.NamedAggregators;
@@ -38,11 +39,9 @@ import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.Triggers;
-import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.spark.Accumulator;
 import org.apache.spark.api.java.function.FlatMapFunction;
@@ -107,25 +106,7 @@ public class SparkGroupAlsoByWindowViaOutputBufferFn<K, 
InputT, W extends Bounde
             stateInternals,
             timerInternals,
             outputter,
-            new SideInputReader() {
-              @Override
-              public <T> T get(PCollectionView<T> view, BoundedWindow 
sideInputWindow) {
-                throw new UnsupportedOperationException(
-                    "GroupAlsoByWindow must not have side inputs");
-              }
-
-              @Override
-              public <T> boolean contains(PCollectionView<T> view) {
-                throw new UnsupportedOperationException(
-                    "GroupAlsoByWindow must not have side inputs");
-              }
-
-              @Override
-              public boolean isEmpty() {
-                throw new UnsupportedOperationException(
-                    "GroupAlsoByWindow must not have side inputs");
-              }
-            },
+            new UnsupportedSideInputReader("GroupAlsoByWindow"),
             droppedDueToClosedWindow,
             reduceFn,
             runtimeContext.getPipelineOptions());

Reply via email to