This is an automated email from the ASF dual-hosted git repository. aromanenko pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/spark-runner_structured-streaming by this push: new 16c57c3 First attempt for ParDo primitive implementation 16c57c3 is described below commit 16c57c30f52f0d1b76423a68b4321ee602c1e7c0 Author: Alexey Romanenko <aromanenko....@gmail.com> AuthorDate: Mon Jan 7 10:47:04 2019 +0100 First attempt for ParDo primitive implementation --- .../translation/TranslationContext.java | 12 ++ .../translation/batch/DoFnFunction.java | 137 ++++++++++++++++ .../translation/batch/ParDoTranslatorBatch.java | 174 ++++++++++++++++++++- .../translation/batch/SparkProcessContext.java | 149 ++++++++++++++++++ .../SparkNoOpStepContext.java} | 24 +-- .../batch/functions/SparkSideInputReader.java | 62 ++++++++ 6 files changed, 545 insertions(+), 13 deletions(-) diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java index 9a3330a..33706bd 100644 --- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java +++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java @@ -108,6 +108,13 @@ public class TranslationContext { return (Dataset<WindowedValue<T>>) dataset; } + public void putDatasetWildcard(PValue value, Dataset<WindowedValue<?>> dataset) { + if (!datasets.containsKey(value)) { + datasets.put(value, dataset); + leaves.add(dataset); + } + } + public <T> void putDataset(PValue value, Dataset<WindowedValue<T>> dataset) { if (!datasets.containsKey(value)) { datasets.put(value, dataset); @@ -131,6 +138,11 @@ public class TranslationContext { } @SuppressWarnings("unchecked") + public <T extends PValue> T getInput(PTransform<T, ?> transform) { + return (T) Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(currentTransform)); + } + + @SuppressWarnings("unchecked") public Map<TupleTag<?>, PValue> getInputs() { return currentTransform.getInputs(); } diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnFunction.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnFunction.java new file mode 100644 index 0000000..35204bc --- /dev/null +++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnFunction.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.spark.structuredstreaming.translation.batch; + +import com.google.common.base.Function; +import com.google.common.collect.Iterators; +import com.google.common.collect.LinkedListMultimap; +import com.google.common.collect.Lists; +import com.google.common.collect.Multimap; +import org.apache.beam.runners.core.DoFnRunner; +import org.apache.beam.runners.core.DoFnRunners; +import org.apache.beam.runners.core.construction.SerializablePipelineOptions; +import org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions.SparkNoOpStepContext; +import org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions.SparkSideInputReader; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; +import org.apache.beam.sdk.util.WindowedValue; + +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.WindowingStrategy; +import org.apache.spark.api.java.function.MapPartitionsFunction; +import scala.Tuple2; + +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +public class DoFnFunction<InputT, OutputT> + implements MapPartitionsFunction<WindowedValue<InputT>, Tuple2<TupleTag<?>, WindowedValue<?>>> { + + private final SerializablePipelineOptions serializedOptions; + + private final DoFn<InputT, OutputT> doFn; + private final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs; + + private final WindowingStrategy<?, ?> windowingStrategy; + + private final Map<TupleTag<?>, Integer> outputMap; + private final TupleTag<OutputT> mainOutputTag; + private final Coder<InputT> inputCoder; + private final Map<TupleTag<?>, Coder<?>> outputCoderMap; + + private transient DoFnInvoker<InputT, OutputT> doFnInvoker; + + public DoFnFunction( + DoFn<InputT, OutputT> doFn, + WindowingStrategy<?, ?> windowingStrategy, + Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs, + PipelineOptions options, + Map<TupleTag<?>, Integer> outputMap, + TupleTag<OutputT> mainOutputTag, + Coder<InputT> inputCoder, + Map<TupleTag<?>, Coder<?>> outputCoderMap) { + + this.doFn = doFn; + this.sideInputs = sideInputs; + this.serializedOptions = new SerializablePipelineOptions(options); + this.windowingStrategy = windowingStrategy; + this.outputMap = outputMap; + this.mainOutputTag = mainOutputTag; + this.inputCoder = inputCoder; + this.outputCoderMap = outputCoderMap; + } + + @Override + public Iterator<Tuple2<TupleTag<?>, WindowedValue<?>>> call(Iterator<WindowedValue<InputT>> iter) + throws Exception { + + DoFnOutputManager outputManager = new DoFnOutputManager(); + + List<TupleTag<?>> additionalOutputTags = Lists.newArrayList(outputMap.keySet()); + + DoFnRunner<InputT, OutputT> doFnRunner = + DoFnRunners.simpleRunner( + serializedOptions.get(), + doFn, + new SparkSideInputReader(sideInputs), + outputManager, + mainOutputTag, + additionalOutputTags, + new SparkNoOpStepContext(), + inputCoder, + outputCoderMap, + windowingStrategy); + + return new SparkProcessContext<>(doFn, doFnRunner, outputManager, Collections.emptyIterator()) + .processPartition(iter) + .iterator(); + } + + private class DoFnOutputManager + implements SparkProcessContext.SparkOutputManager<Tuple2<TupleTag<?>, WindowedValue<?>>> { + + private final Multimap<TupleTag<?>, WindowedValue<?>> outputs = LinkedListMultimap.create(); + + @Override + public void clear() { + outputs.clear(); + } + + @Override + public Iterator<Tuple2<TupleTag<?>, WindowedValue<?>>> iterator() { + Iterator<Map.Entry<TupleTag<?>, WindowedValue<?>>> entryIter = outputs.entries().iterator(); + return Iterators.transform(entryIter, this.entryToTupleFn()); + } + + private <K, V> Function<Map.Entry<K, V>, Tuple2<K, V>> entryToTupleFn() { + return en -> new Tuple2<>(en.getKey(), en.getValue()); + } + + @Override + @SuppressWarnings("unchecked") + public synchronized <T> void output(TupleTag<T> tag, WindowedValue<T> output) { + outputs.put(tag, output); + } + } +} diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java index 1e57098..1ad1e3b 100644 --- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java +++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java @@ -17,16 +17,184 @@ */ package org.apache.beam.runners.spark.structuredstreaming.translation.batch; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.beam.runners.core.construction.ParDoTranslation; +import org.apache.beam.runners.spark.structuredstreaming.translation.EncoderHelpers; import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator; import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.transforms.join.RawUnionValue; +import org.apache.beam.sdk.transforms.join.UnionCoder; +import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.*; +import org.apache.spark.api.java.function.FilterFunction; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.Dataset; +import scala.Tuple2; +import java.io.IOException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import static com.google.common.base.Preconditions.checkState; + +/** + * TODO: Add support of state and timers TODO: Add support of side inputs + * + * @param <InputT> + * @param <OutputT> + */ class ParDoTranslatorBatch<InputT, OutputT> implements TransformTranslator<PTransform<PCollection<InputT>, PCollectionTuple>> { @Override public void translateTransform( - PTransform<PCollection<InputT>, PCollectionTuple> transform, TranslationContext context) {} + PTransform<PCollection<InputT>, PCollectionTuple> transform, TranslationContext context) { + + DoFn<InputT, OutputT> doFn = getDoFn(context); + checkState( + !DoFnSignatures.signatureForDoFn(doFn).processElement().isSplittable(), + "Not expected to directly translate splittable DoFn, should have been overridden: %s", + doFn); + + Dataset<WindowedValue<InputT>> inputDataSet = context.getDataset(context.getInput()); + Map<TupleTag<?>, PValue> outputs = context.getOutputs(); + TupleTag<?> mainOutputTag = getTupleTag(context); + + Map<TupleTag<?>, Integer> outputMap = Maps.newHashMap(); + + outputMap.put(mainOutputTag, 0); + int count = 1; + for (TupleTag<?> tag : outputs.keySet()) { + if (!outputMap.containsKey(tag)) { + outputMap.put(tag, count++); + } + } + + // Union coder elements must match the order of the output tags. + Map<Integer, TupleTag<?>> indexMap = Maps.newTreeMap(); + for (Map.Entry<TupleTag<?>, Integer> entry : outputMap.entrySet()) { + indexMap.put(entry.getValue(), entry.getKey()); + } + + // assume that the windowing strategy is the same for all outputs + WindowingStrategy<?, ?> windowingStrategy = null; + + // collect all output Coders and create a UnionCoder for our tagged outputs + List<Coder<?>> outputCoders = Lists.newArrayList(); + for (TupleTag<?> tag : indexMap.values()) { + PValue taggedValue = outputs.get(tag); + checkState( + taggedValue instanceof PCollection, + "Within ParDo, got a non-PCollection output %s of type %s", + taggedValue, + taggedValue.getClass().getSimpleName()); + PCollection<?> coll = (PCollection<?>) taggedValue; + outputCoders.add(coll.getCoder()); + windowingStrategy = coll.getWindowingStrategy(); + } + + if (windowingStrategy == null) { + throw new IllegalStateException("No outputs defined."); + } + + UnionCoder unionCoder = UnionCoder.of(outputCoders); + + List<PCollectionView<?>> sideInputs = getSideInputs(context); + + // construct a map from side input to WindowingStrategy so that + // the DoFn runner can map main-input windows to side input windows + Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputStrategies = new HashMap<>(); + for (PCollectionView<?> sideInput : sideInputs) { + sideInputStrategies.put(sideInput, sideInput.getWindowingStrategyInternal()); + } + + Map<TupleTag<?>, Coder<?>> outputCoderMap = context.getOutputCoders(); + + @SuppressWarnings("unchecked") + DoFnFunction<InputT, OutputT> doFnWrapper = + new DoFnFunction( + doFn, + windowingStrategy, + sideInputStrategies, + context.getOptions(), + outputMap, + mainOutputTag, + context.getInput(transform).getCoder(), + outputCoderMap); + + Dataset<Tuple2<TupleTag<?>, WindowedValue<?>>> allOutputsDataset = + inputDataSet.mapPartitions(doFnWrapper, EncoderHelpers.encoder()); + + for (Map.Entry<TupleTag<?>, PValue> output : outputs.entrySet()) { + pruneOutput(context, allOutputsDataset, output); + } + } + + private List<PCollectionView<?>> getSideInputs(TranslationContext context) { + List<PCollectionView<?>> sideInputs; + try { + sideInputs = ParDoTranslation.getSideInputs(context.getCurrentTransform()); + } catch (IOException e) { + throw new RuntimeException(e); + } + return sideInputs; + } + + private TupleTag<?> getTupleTag(TranslationContext context) { + TupleTag<?> mainOutputTag; + try { + mainOutputTag = ParDoTranslation.getMainOutputTag(context.getCurrentTransform()); + } catch (IOException e) { + throw new RuntimeException(e); + } + return mainOutputTag; + } + + @SuppressWarnings("unchecked") + private DoFn<InputT, OutputT> getDoFn(TranslationContext context) { + DoFn<InputT, OutputT> doFn; + try { + doFn = (DoFn<InputT, OutputT>) ParDoTranslation.getDoFn(context.getCurrentTransform()); + } catch (IOException e) { + throw new RuntimeException(e); + } + return doFn; + } + + private <T> void pruneOutput( + TranslationContext context, + Dataset<Tuple2<TupleTag<?>, WindowedValue<?>>> tmpDataset, + Map.Entry<TupleTag<?>, PValue> output) { + Dataset<Tuple2<TupleTag<?>, WindowedValue<?>>> filteredDataset = + tmpDataset.filter(new SparkDoFnFilterFunction(output.getKey())); + Dataset<WindowedValue<?>> outputDataset = + filteredDataset.map( + (MapFunction<Tuple2<TupleTag<?>, WindowedValue<?>>, WindowedValue<?>>) + value -> value._2, + EncoderHelpers.encoder()); + context.putDatasetWildcard(output.getValue(), outputDataset); + } + + class SparkDoFnFilterFunction implements FilterFunction<Tuple2<TupleTag<?>, WindowedValue<?>>> { + + private final TupleTag<?> key; + + public SparkDoFnFilterFunction(TupleTag<?> key) { + this.key = key; + } + + @Override + public boolean call(Tuple2<TupleTag<?>, WindowedValue<?>> value) { + return value._1.equals(key); + } + } } diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/SparkProcessContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/SparkProcessContext.java new file mode 100644 index 0000000..720b7ab --- /dev/null +++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/SparkProcessContext.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.spark.structuredstreaming.translation.batch; + +import com.google.common.collect.AbstractIterator; +import org.apache.beam.runners.core.*; +import org.apache.beam.runners.core.DoFnRunners.OutputManager; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.reflect.DoFnInvokers; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.WindowedValue; + +import java.util.ArrayList; +import java.util.Iterator; + +import static com.google.common.base.Preconditions.checkArgument; + +/** Spark runner process context processes Spark partitions using Beam's {@link DoFnRunner}. */ +class SparkProcessContext<FnInputT, FnOutputT, OutputT> { + + private final DoFn<FnInputT, FnOutputT> doFn; + private final DoFnRunner<FnInputT, FnOutputT> doFnRunner; + private final SparkOutputManager<OutputT> outputManager; + private Iterator<TimerInternals.TimerData> timerDataIterator; + + SparkProcessContext( + DoFn<FnInputT, FnOutputT> doFn, + DoFnRunner<FnInputT, FnOutputT> doFnRunner, + SparkOutputManager<OutputT> outputManager, + Iterator<TimerInternals.TimerData> timerDataIterator) { + + this.doFn = doFn; + this.doFnRunner = doFnRunner; + this.outputManager = outputManager; + this.timerDataIterator = timerDataIterator; + } + + Iterable<OutputT> processPartition(Iterator<WindowedValue<FnInputT>> partition) throws Exception { + + // skip if partition is empty. + if (!partition.hasNext()) { + return new ArrayList<>(); + } + + // process the partition; finishBundle() is called from within the output iterator. + return this.getOutputIterable(partition, doFnRunner); + } + + private void clearOutput() { + outputManager.clear(); + } + + private Iterator<OutputT> getOutputIterator() { + return outputManager.iterator(); + } + + private Iterable<OutputT> getOutputIterable( + final Iterator<WindowedValue<FnInputT>> iter, + final DoFnRunner<FnInputT, FnOutputT> doFnRunner) { + return () -> new ProcCtxtIterator(iter, doFnRunner); + } + + interface SparkOutputManager<T> extends OutputManager, Iterable<T> { + void clear(); + } + + private class ProcCtxtIterator extends AbstractIterator<OutputT> { + + private final Iterator<WindowedValue<FnInputT>> inputIterator; + private final DoFnRunner<FnInputT, FnOutputT> doFnRunner; + private Iterator<OutputT> outputIterator; + private boolean isBundleStarted; + private boolean isBundleFinished; + + ProcCtxtIterator( + Iterator<WindowedValue<FnInputT>> iterator, DoFnRunner<FnInputT, FnOutputT> doFnRunner) { + this.inputIterator = iterator; + this.doFnRunner = doFnRunner; + this.outputIterator = getOutputIterator(); + } + + @Override + protected OutputT computeNext() { + // Process each element from the (input) iterator, which produces, zero, one or more + // output elements (of type V) in the output iterator. Note that the output + // collection (and iterator) is reset between each call to processElement, so the + // collection only holds the output values for each call to processElement, rather + // than for the whole partition (which would use too much memory). + if (!isBundleStarted) { + isBundleStarted = true; + // call startBundle() before beginning to process the partition. + doFnRunner.startBundle(); + } + + try { + while (true) { + if (outputIterator.hasNext()) { + return outputIterator.next(); + } + + clearOutput(); + if (inputIterator.hasNext()) { + // grab the next element and process it. + doFnRunner.processElement(inputIterator.next()); + outputIterator = getOutputIterator(); + } else if (timerDataIterator.hasNext()) { + fireTimer(timerDataIterator.next()); + outputIterator = getOutputIterator(); + } else { + // no more input to consume, but finishBundle can produce more output + if (!isBundleFinished) { + isBundleFinished = true; + doFnRunner.finishBundle(); + outputIterator = getOutputIterator(); + continue; // try to consume outputIterator from start of loop + } + DoFnInvokers.invokerFor(doFn).invokeTeardown(); + return endOfData(); + } + } + } catch (final RuntimeException re) { + DoFnInvokers.invokerFor(doFn).invokeTeardown(); + throw re; + } + } + + private void fireTimer(TimerInternals.TimerData timer) { + StateNamespace namespace = timer.getNamespace(); + checkArgument(namespace instanceof StateNamespaces.WindowNamespace); + BoundedWindow window = ((StateNamespaces.WindowNamespace) namespace).getWindow(); + doFnRunner.onTimer(timer.getTimerId(), window, timer.getTimestamp(), timer.getDomain()); + } + } +} diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/functions/SparkNoOpStepContext.java similarity index 59% copy from runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java copy to runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/functions/SparkNoOpStepContext.java index 1e57098..889cdf5 100644 --- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java +++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/functions/SparkNoOpStepContext.java @@ -15,18 +15,22 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.runners.spark.structuredstreaming.translation.batch; +package org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions; -import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator; -import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.runners.core.StateInternals; +import org.apache.beam.runners.core.StepContext; +import org.apache.beam.runners.core.TimerInternals; -class ParDoTranslatorBatch<InputT, OutputT> - implements TransformTranslator<PTransform<PCollection<InputT>, PCollectionTuple>> { +/** A {@link StepContext} for Spark Batch Runner execution. */ +public class SparkNoOpStepContext implements StepContext { @Override - public void translateTransform( - PTransform<PCollection<InputT>, PCollectionTuple> transform, TranslationContext context) {} + public StateInternals stateInternals() { + throw new UnsupportedOperationException("stateInternals is not supported"); + } + + @Override + public TimerInternals timerInternals() { + throw new UnsupportedOperationException("timerInternals is not supported"); + } } diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/functions/SparkSideInputReader.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/functions/SparkSideInputReader.java new file mode 100644 index 0000000..da75101 --- /dev/null +++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/functions/SparkSideInputReader.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions; + +import org.apache.beam.runners.core.SideInputReader; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.WindowingStrategy; + +import javax.annotation.Nullable; +import java.util.HashMap; +import java.util.Map; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * TODO: Need to be implemented + * + * A {@link SideInputReader} for the Spark Batch Runner. + */ +public class SparkSideInputReader implements SideInputReader { + private final Map<TupleTag<?>, WindowingStrategy<?, ?>> sideInputs; + + + public SparkSideInputReader( + Map<PCollectionView<?>, WindowingStrategy<?, ?>> indexByView) { + sideInputs = new HashMap<>(); + } + + @Nullable + @Override + public <T> T get(PCollectionView<T> view, BoundedWindow window) { + return null; + } + + @Override + public <T> boolean contains(PCollectionView<T> view) { + return sideInputs.containsKey(view.getTagInternal()); + } + + @Override + public boolean isEmpty() { + return sideInputs.isEmpty(); + } +}