Utility classes supporting SparkGroupAlsoByWindowViaWindowSet.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8c379704 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8c379704 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8c379704 Branch: refs/heads/master Commit: 8c3797047a6e971fdeb7882d1765f66a63109255 Parents: 32a9d61 Author: Sela <ans...@paypal.com> Authored: Mon Feb 13 16:33:14 2017 +0200 Committer: Sela <ans...@paypal.com> Committed: Wed Mar 1 00:17:59 2017 +0200 ---------------------------------------------------------------------- .../spark/stateful/SparkStateInternals.java | 402 +++++++++++++++++++ .../spark/stateful/SparkTimerInternals.java | 173 ++++++++ .../beam/runners/spark/util/LateDataUtils.java | 92 +++++ .../spark/util/UnsupportedSideInputReader.java | 52 +++ 4 files changed, 719 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/8c379704/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java new file mode 100644 index 0000000..e628d31 --- /dev/null +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java @@ -0,0 +1,402 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.spark.stateful; + +import com.google.common.collect.HashBasedTable; +import com.google.common.collect.Table; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import org.apache.beam.runners.spark.coders.CoderHelpers; +import org.apache.beam.runners.core.StateInternals; +import org.apache.beam.runners.core.StateNamespace; +import org.apache.beam.runners.core.StateTag; +import org.apache.beam.runners.core.StateTag.StateBinder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.InstantCoder; +import org.apache.beam.sdk.coders.ListCoder; +import org.apache.beam.sdk.transforms.Combine.CombineFn; +import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn; +import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; +import org.apache.beam.sdk.util.CombineFnUtil; +import org.apache.beam.sdk.util.state.AccumulatorCombiningState; +import org.apache.beam.sdk.util.state.BagState; +import org.apache.beam.sdk.util.state.ReadableState; +import org.apache.beam.sdk.util.state.State; +import org.apache.beam.sdk.util.state.StateContext; +import org.apache.beam.sdk.util.state.StateContexts; +import org.apache.beam.sdk.util.state.ValueState; +import org.apache.beam.sdk.util.state.WatermarkHoldState; +import org.joda.time.Instant; + + +/** + * An implementation of {@link StateInternals} for the SparkRunner. 
+ */ +class SparkStateInternals<K> implements StateInternals<K> { + + private final K key; + //Serializable state for internals (namespace to state tag to coded value). + private final Table<String, String, byte[]> stateTable; + + private SparkStateInternals(K key) { + this.key = key; + this.stateTable = HashBasedTable.create(); + } + + private SparkStateInternals(K key, Table<String, String, byte[]> stateTable) { + this.key = key; + this.stateTable = stateTable; + } + + static <K> SparkStateInternals<K> forKey(K key) { + return new SparkStateInternals<>(key); + } + + static <K> SparkStateInternals<K> + forKeyAndState(K key, Table<String, String, byte[]> stateTable) { + return new SparkStateInternals<>(key, stateTable); + } + + public Table<String, String, byte[]> getState() { + return stateTable; + } + + @Override + public K getKey() { + return key; + } + + @Override + public <T extends State> T state(StateNamespace namespace, StateTag<? super K, T> address) { + return state(namespace, address, StateContexts.nullContext()); + } + + @Override + public <T extends State> T state( + StateNamespace namespace, + StateTag<? super K, T> address, + StateContext<?> c) { + return address.bind(new SparkStateBinder(key, namespace, c)); + } + + private class SparkStateBinder implements StateBinder<K> { + private final K key; + private final StateNamespace namespace; + private final StateContext<?> c; + + private SparkStateBinder(K key, + StateNamespace namespace, + StateContext<?> c) { + this.key = key; + this.namespace = namespace; + this.c = c; + } + + @Override + public <T> ValueState<T> bindValue(StateTag<? super K, ValueState<T>> address, Coder<T> coder) { + return new SparkValueState<>(namespace, address, coder); + } + + @Override + public <T> BagState<T> bindBag(StateTag<? 
super K, BagState<T>> address, Coder<T> elemCoder) { + return new SparkBagState<>(namespace, address, elemCoder); + } + + @Override + public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> + bindCombiningValue( + StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address, + Coder<AccumT> accumCoder, + CombineFn<InputT, AccumT, OutputT> combineFn) { + return new SparkAccumulatorCombiningState<>(namespace, address, accumCoder, key, + combineFn.<K>asKeyedFn()); + } + + @Override + public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> + bindKeyedCombiningValue( + StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address, + Coder<AccumT> accumCoder, + KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) { + return new SparkAccumulatorCombiningState<>(namespace, address, accumCoder, key, combineFn); + } + + @Override + public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> + bindKeyedCombiningValueWithContext( + StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address, + Coder<AccumT> accumCoder, + KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn) { + return new SparkAccumulatorCombiningState<>(namespace, address, accumCoder, key, + CombineFnUtil.bindContext(combineFn, c)); + } + + @Override + public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark( + StateTag<? super K, WatermarkHoldState<W>> address, + OutputTimeFn<? super W> outputTimeFn) { + return new SparkWatermarkHoldState<>(namespace, address, outputTimeFn); + } + } + + private class AbstractState<T> { + final StateNamespace namespace; + final StateTag<?, ? extends State> address; + final Coder<T> coder; + + private AbstractState( + StateNamespace namespace, + StateTag<?, ? 
extends State> address, + Coder<T> coder) { + this.namespace = namespace; + this.address = address; + this.coder = coder; + } + + T readValue() { + byte[] buf = stateTable.get(namespace.stringKey(), address.getId()); + if (buf != null) { + return CoderHelpers.fromByteArray(buf, coder); + } + return null; + } + + void writeValue(T input) { + stateTable.put(namespace.stringKey(), address.getId(), + CoderHelpers.toByteArray(input, coder)); + } + + public void clear() { + stateTable.remove(namespace.stringKey(), address.getId()); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + @SuppressWarnings("unchecked") + AbstractState<?> that = (AbstractState<?>) o; + return namespace.equals(that.namespace) && address.equals(that.address); + } + + @Override + public int hashCode() { + int result = namespace.hashCode(); + result = 31 * result + address.hashCode(); + return result; + } + } + + private class SparkValueState<T> extends AbstractState<T> implements ValueState<T> { + + private SparkValueState( + StateNamespace namespace, + StateTag<?, ValueState<T>> address, + Coder<T> coder) { + super(namespace, address, coder); + } + + @Override + public SparkValueState<T> readLater() { + return this; + } + + @Override + public T read() { + return readValue(); + } + + @Override + public void write(T input) { + writeValue(input); + } + } + + private class SparkWatermarkHoldState<W extends BoundedWindow> + extends AbstractState<Instant> implements WatermarkHoldState<W> { + + private final OutputTimeFn<? super W> outputTimeFn; + + public SparkWatermarkHoldState( + StateNamespace namespace, + StateTag<?, WatermarkHoldState<W>> address, + OutputTimeFn<? 
super W> outputTimeFn) { + super(namespace, address, InstantCoder.of()); + this.outputTimeFn = outputTimeFn; + } + + @Override + public SparkWatermarkHoldState<W> readLater() { + return this; + } + + @Override + public Instant read() { + return readValue(); + } + + @Override + public void add(Instant outputTime) { + Instant combined = read(); + combined = (combined == null) ? outputTime : outputTimeFn.combine(combined, outputTime); + writeValue(combined); + } + + @Override + public ReadableState<Boolean> isEmpty() { + return new ReadableState<Boolean>() { + @Override + public ReadableState<Boolean> readLater() { + return this; + } + @Override + public Boolean read() { + return stateTable.get(namespace.stringKey(), address.getId()) == null; + } + }; + } + + @Override + public OutputTimeFn<? super W> getOutputTimeFn() { + return outputTimeFn; + } + } + + private class SparkAccumulatorCombiningState<K, InputT, AccumT, OutputT> + extends AbstractState<AccumT> + implements AccumulatorCombiningState<InputT, AccumT, OutputT> { + + private final K key; + private final KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn; + + private SparkAccumulatorCombiningState( + StateNamespace namespace, + StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address, + Coder<AccumT> coder, + K key, + KeyedCombineFn<? 
super K, InputT, AccumT, OutputT> combineFn) { + super(namespace, address, coder); + this.key = key; + this.combineFn = combineFn; + } + + @Override + public SparkAccumulatorCombiningState<K, InputT, AccumT, OutputT> readLater() { + return this; + } + + @Override + public OutputT read() { + return combineFn.extractOutput(key, getAccum()); + } + + @Override + public void add(InputT input) { + AccumT accum = getAccum(); + combineFn.addInput(key, accum, input); + writeValue(accum); + } + + @Override + public AccumT getAccum() { + AccumT accum = readValue(); + if (accum == null) { + accum = combineFn.createAccumulator(key); + } + return accum; + } + + @Override + public ReadableState<Boolean> isEmpty() { + return new ReadableState<Boolean>() { + @Override + public ReadableState<Boolean> readLater() { + return this; + } + @Override + public Boolean read() { + return stateTable.get(namespace.stringKey(), address.getId()) == null; + } + }; + } + + @Override + public void addAccum(AccumT accum) { + accum = combineFn.mergeAccumulators(key, Arrays.asList(getAccum(), accum)); + writeValue(accum); + } + + @Override + public AccumT mergeAccumulators(Iterable<AccumT> accumulators) { + return combineFn.mergeAccumulators(key, accumulators); + } + + } + + private final class SparkBagState<T> extends AbstractState<List<T>> implements BagState<T> { + private SparkBagState( + StateNamespace namespace, + StateTag<?, BagState<T>> address, + Coder<T> coder) { + super(namespace, address, ListCoder.of(coder)); + } + + @Override + public SparkBagState<T> readLater() { + return this; + } + + @Override + public List<T> read() { + List<T> value = super.readValue(); + if (value == null) { + value = new ArrayList<>(); + } + return value; + } + + @Override + public void add(T input) { + List<T> value = read(); + value.add(input); + writeValue(value); + } + + @Override + public ReadableState<Boolean> isEmpty() { + return new ReadableState<Boolean>() { + @Override + public ReadableState<Boolean> 
readLater() { + return this; + } + + @Override + public Boolean read() { + return stateTable.get(namespace.stringKey(), address.getId()) == null; + } + }; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/beam/blob/8c379704/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkTimerInternals.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkTimerInternals.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkTimerInternals.java new file mode 100644 index 0000000..65225c5 --- /dev/null +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkTimerInternals.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * An implementation of {@link TimerInternals} for the SparkRunner.
 *
 * <p>Timers are kept in an in-memory set; watermarks are derived from the broadcast
 * {@link SparkWatermarks} of the feeding source streams (the slowest source wins).
 */
class SparkTimerInternals implements TimerInternals {
  // NOTE(review): lowWatermark is stored but never read by this class; retained for completeness.
  private final Instant lowWatermark;
  private final Instant highWatermark;
  private final Instant synchronizedProcessingTime;
  private final Set<TimerData> timers = Sets.newHashSet();

  // Null until advanceWatermark() is first called.
  private Instant inputWatermark;

  private SparkTimerInternals(
      Instant lowWatermark, Instant highWatermark, Instant synchronizedProcessingTime) {
    this.lowWatermark = lowWatermark;
    this.highWatermark = highWatermark;
    this.synchronizedProcessingTime = synchronizedProcessingTime;
  }

  /**
   * Build the {@link TimerInternals} according to the feeding streams.
   *
   * <p>If the broadcast watermarks are missing or disjoint from {@code sourceIds}, default
   * (minimal) watermarks are used. Otherwise the slowest low/high watermarks across the
   * matching sources are taken, and the synchronized processing time is required to agree
   * across all of them.
   */
  public static SparkTimerInternals forStreamFromSources(
      List<Integer> sourceIds,
      @Nullable Broadcast<Map<Integer, SparkWatermarks>> broadcast) {
    // if broadcast is invalid for the specific ids, use defaults.
    if (broadcast == null || broadcast.getValue().isEmpty()
        || Collections.disjoint(sourceIds, broadcast.getValue().keySet())) {
      return new SparkTimerInternals(
          BoundedWindow.TIMESTAMP_MIN_VALUE, BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(0));
    }
    // There might be more than one stream feeding this stream; the slowest WM is the right one.
    Instant slowestLowWatermark = BoundedWindow.TIMESTAMP_MAX_VALUE;
    Instant slowestHighWatermark = BoundedWindow.TIMESTAMP_MAX_VALUE;
    // Synchronized processing time should clearly be synchronized across sources.
    Instant synchronizedProcessingTime = null;
    for (Integer sourceId: sourceIds) {
      SparkWatermarks sparkWatermarks = broadcast.getValue().get(sourceId);
      if (sparkWatermarks != null) {
        // Keep the slowest (earliest) watermarks seen so far.
        slowestLowWatermark = slowestLowWatermark.isBefore(sparkWatermarks.getLowWatermark())
            ? slowestLowWatermark : sparkWatermarks.getLowWatermark();
        slowestHighWatermark = slowestHighWatermark.isBefore(sparkWatermarks.getHighWatermark())
            ? slowestHighWatermark : sparkWatermarks.getHighWatermark();
        if (synchronizedProcessingTime == null) {
          // First time set.
          synchronizedProcessingTime = sparkWatermarks.getSynchronizedProcessingTime();
        } else {
          // Every subsequent source must report the same synchronized processing time.
          checkArgument(
              sparkWatermarks.getSynchronizedProcessingTime().equals(synchronizedProcessingTime),
              "Synchronized time is expected to keep synchronized across sources.");
        }
      }
    }
    return new SparkTimerInternals(
        slowestLowWatermark, slowestHighWatermark, synchronizedProcessingTime);
  }

  /** Returns a live view of the currently registered timers. */
  Collection<TimerData> getTimers() {
    return timers;
  }

  /**
   * Returns (and removes) all timers whose timestamp is strictly before the current input
   * watermark. This should only be called after processing the element — i.e. after
   * {@link #advanceWatermark()}; before that {@code inputWatermark} is still null and this
   * method would throw a {@link NullPointerException}.
   */
  Collection<TimerData> getTimersReadyToProcess() {
    Set<TimerData> toFire = Sets.newHashSet();
    Iterator<TimerData> iterator = timers.iterator();
    while (iterator.hasNext()) {
      TimerData timer = iterator.next();
      if (timer.getTimestamp().isBefore(inputWatermark)) {
        toFire.add(timer);
        iterator.remove();
      }
    }
    return toFire;
  }

  /** Registers a collection of timers, e.g. when restoring checkpointed state. */
  void addTimers(Collection<TimerData> timers) {
    this.timers.addAll(timers);
  }

  @Override
  public void setTimer(TimerData timer) {
    this.timers.add(timer);
  }

  @Override
  public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) {
    throw new UnsupportedOperationException("Deleting a timer by ID is not yet supported.");
  }

  @Override
  public void deleteTimer(TimerData timer) {
    this.timers.remove(timer);
  }

  @Override
  public Instant currentProcessingTime() {
    // Wall-clock time of the executing worker.
    return Instant.now();
  }

  @Nullable
  @Override
  public Instant currentSynchronizedProcessingTime() {
    return synchronizedProcessingTime;
  }

  @Override
  public Instant currentInputWatermarkTime() {
    // Null until advanceWatermark() has been called.
    return inputWatermark;
  }

  /**
   * Advances the input watermark to the (slowest) high watermark captured at construction
   * time, making the timers before it eligible via {@link #getTimersReadyToProcess()}.
   */
  public void advanceWatermark() {
    inputWatermark = highWatermark;
  }

  @Nullable
  @Override
  public Instant currentOutputWatermarkTime() {
    // No output watermark is tracked by this implementation.
    return null;
  }

  @Override
  public void setTimer(
      StateNamespace namespace,
      String timerId,
      Instant target,
      TimeDomain timeDomain) {
    throw new UnsupportedOperationException("Setting a timer by ID not yet supported.");
  }

  @Override
  public void deleteTimer(StateNamespace namespace, String timerId) {
    throw new UnsupportedOperationException("Deleting a timer by ID is not yet supported.");
  }

}
a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/LateDataUtils.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/LateDataUtils.java new file mode 100644 index 0000000..96e6ee5 --- /dev/null +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/LateDataUtils.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.spark.util; + +import com.google.common.base.Function; +import com.google.common.base.Predicate; +import com.google.common.collect.FluentIterable; +import com.google.common.collect.Iterables; +import javax.annotation.Nullable; +import org.apache.beam.runners.core.TimerInternals; +import org.apache.beam.sdk.transforms.Aggregator; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.WindowTracing; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowingStrategy; + + +/** + * Utils to handle late data. + */ +public class LateDataUtils { + + /** + * Returns an {@code Iterable<WindowedValue<InputT>>} that only contains non-late input + * elements. 
+ * Taken from Thomas Groh's implementation in the DirectRunner's + * GroupAlsoByWindowEvaluatorFactory. + */ + public static <K, V> Iterable<WindowedValue<V>> dropExpiredWindows( + final K key, + Iterable<WindowedValue<V>> elements, + final TimerInternals timerInternals, + final WindowingStrategy<?, ?> windowingStrategy, + final Aggregator<Long, Long> droppedDueToLateness) { + return FluentIterable.from(elements) + .transformAndConcat( + // Explode windows to filter out expired ones + new Function<WindowedValue<V>, Iterable<WindowedValue<V>>>() { + @Override + public Iterable<WindowedValue<V>> apply(@Nullable WindowedValue<V> input) { + if (input == null) { + return null; + } + return input.explodeWindows(); + } + }) + .filter( + new Predicate<WindowedValue<V>>() { + @Override + public boolean apply(@Nullable WindowedValue<V> input) { + if (input == null) { + // drop null elements. + return false; + } + BoundedWindow window = Iterables.getOnlyElement(input.getWindows()); + boolean expired = + window + .maxTimestamp() + .plus(windowingStrategy.getAllowedLateness()) + .isBefore(timerInternals.currentInputWatermarkTime()); + if (expired) { + // The element is too late for this window. + droppedDueToLateness.addValue(1L); + WindowTracing.debug( + "GroupAlsoByWindow: Dropping element at {} for key: {}; " + + "window: {} since it is too far behind inputWatermark: {}", + input.getTimestamp(), + key, + window, + timerInternals.currentInputWatermarkTime()); + } + // Keep the element if the window is not expired. 
+ return !expired; + } + }); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/beam/blob/8c379704/runners/spark/src/main/java/org/apache/beam/runners/spark/util/UnsupportedSideInputReader.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/UnsupportedSideInputReader.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/UnsupportedSideInputReader.java new file mode 100644 index 0000000..6de7e86 --- /dev/null +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/UnsupportedSideInputReader.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.spark.util; + +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.SideInputReader; +import org.apache.beam.sdk.values.PCollectionView; + + +/** + * An implementation of a {@link SideInputReader} that actually does not support side-inputs. 
+ */ +public class UnsupportedSideInputReader implements SideInputReader { + private final String transformName; + + public UnsupportedSideInputReader(String transformName) { + this.transformName = transformName; + } + + @Override + public <T> T get(PCollectionView<T> view, BoundedWindow window) { + throw new UnsupportedOperationException( + String.format("%s does not support side inputs.", transformName)); + } + + @Override + public <T> boolean contains(PCollectionView<T> view) { + throw new UnsupportedOperationException( + String.format("%s does not support side inputs.", transformName)); + } + + @Override + public boolean isEmpty() { + throw new UnsupportedOperationException( + String.format("%s does not support side inputs.", transformName)); + } +} \ No newline at end of file