Utils for SparkGroupAlsoByWindowViaWindowSet.

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8c379704
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8c379704
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8c379704

Branch: refs/heads/master
Commit: 8c3797047a6e971fdeb7882d1765f66a63109255
Parents: 32a9d61
Author: Sela <ans...@paypal.com>
Authored: Mon Feb 13 16:33:14 2017 +0200
Committer: Sela <ans...@paypal.com>
Committed: Wed Mar 1 00:17:59 2017 +0200

----------------------------------------------------------------------
 .../spark/stateful/SparkStateInternals.java     | 402 +++++++++++++++++++
 .../spark/stateful/SparkTimerInternals.java     | 173 ++++++++
 .../beam/runners/spark/util/LateDataUtils.java  |  92 +++++
 .../spark/util/UnsupportedSideInputReader.java  |  52 +++
 4 files changed, 719 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/8c379704/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java
new file mode 100644
index 0000000..e628d31
--- /dev/null
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java
@@ -0,0 +1,402 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.stateful;
+
+import com.google.common.collect.HashBasedTable;
+import com.google.common.collect.Table;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.runners.spark.coders.CoderHelpers;
+import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.runners.core.StateTag;
+import org.apache.beam.runners.core.StateTag.StateBinder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.InstantCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
+import 
org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.util.CombineFnUtil;
+import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
+import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.ReadableState;
+import org.apache.beam.sdk.util.state.State;
+import org.apache.beam.sdk.util.state.StateContext;
+import org.apache.beam.sdk.util.state.StateContexts;
+import org.apache.beam.sdk.util.state.ValueState;
+import org.apache.beam.sdk.util.state.WatermarkHoldState;
+import org.joda.time.Instant;
+
+
+/**
+ * An implementation of {@link StateInternals} for the SparkRunner.
+ */
+class SparkStateInternals<K> implements StateInternals<K> {
+
+  private final K key;
+  //Serializable state for internals (namespace to state tag to coded value).
+  private final Table<String, String, byte[]> stateTable;
+
+  private SparkStateInternals(K key) {
+    this.key = key;
+    this.stateTable = HashBasedTable.create();
+  }
+
+  private SparkStateInternals(K key, Table<String, String, byte[]> stateTable) 
{
+    this.key = key;
+    this.stateTable = stateTable;
+  }
+
+  static <K> SparkStateInternals<K> forKey(K key) {
+    return new SparkStateInternals<>(key);
+  }
+
+  static <K> SparkStateInternals<K>
+      forKeyAndState(K key, Table<String, String, byte[]> stateTable) {
+    return new SparkStateInternals<>(key, stateTable);
+  }
+
+  public Table<String, String, byte[]> getState() {
+    return stateTable;
+  }
+
+  @Override
+  public K getKey() {
+    return key;
+  }
+
+  @Override
+  public <T extends State> T state(StateNamespace namespace, StateTag<? super 
K, T> address) {
+    return state(namespace, address, StateContexts.nullContext());
+  }
+
+  @Override
+  public <T extends State> T state(
+      StateNamespace namespace,
+      StateTag<? super K, T> address,
+      StateContext<?> c) {
+    return address.bind(new SparkStateBinder(key, namespace, c));
+  }
+
+  private class SparkStateBinder implements StateBinder<K> {
+    private final K key;
+    private final StateNamespace namespace;
+    private final StateContext<?> c;
+
+    private SparkStateBinder(K key,
+                             StateNamespace namespace,
+                             StateContext<?> c) {
+      this.key = key;
+      this.namespace = namespace;
+      this.c = c;
+    }
+
+    @Override
+    public <T> ValueState<T> bindValue(StateTag<? super K, ValueState<T>> 
address, Coder<T> coder) {
+      return new SparkValueState<>(namespace, address, coder);
+    }
+
+    @Override
+    public <T> BagState<T> bindBag(StateTag<? super K, BagState<T>> address, 
Coder<T> elemCoder) {
+      return new SparkBagState<>(namespace, address, elemCoder);
+    }
+
+    @Override
+    public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, 
OutputT>
+        bindCombiningValue(
+            StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, 
OutputT>> address,
+            Coder<AccumT> accumCoder,
+            CombineFn<InputT, AccumT, OutputT> combineFn) {
+      return new SparkAccumulatorCombiningState<>(namespace, address, 
accumCoder, key,
+          combineFn.<K>asKeyedFn());
+    }
+
+    @Override
+    public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, 
OutputT>
+        bindKeyedCombiningValue(
+            StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, 
OutputT>> address,
+            Coder<AccumT> accumCoder,
+            KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
+      return new SparkAccumulatorCombiningState<>(namespace, address, 
accumCoder, key, combineFn);
+    }
+
+    @Override
+    public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, 
OutputT>
+        bindKeyedCombiningValueWithContext(
+            StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, 
OutputT>> address,
+            Coder<AccumT> accumCoder,
+            KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> 
combineFn) {
+      return new SparkAccumulatorCombiningState<>(namespace, address, 
accumCoder, key,
+          CombineFnUtil.bindContext(combineFn, c));
+    }
+
+    @Override
+    public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
+        StateTag<? super K, WatermarkHoldState<W>> address,
+        OutputTimeFn<? super W> outputTimeFn) {
+      return new SparkWatermarkHoldState<>(namespace, address, outputTimeFn);
+    }
+  }
+
+  private class AbstractState<T> {
+    final StateNamespace namespace;
+    final StateTag<?, ? extends State> address;
+    final Coder<T> coder;
+
+    private AbstractState(
+        StateNamespace namespace,
+        StateTag<?, ? extends State> address,
+        Coder<T> coder) {
+      this.namespace = namespace;
+      this.address = address;
+      this.coder = coder;
+    }
+
+    T readValue() {
+      byte[] buf = stateTable.get(namespace.stringKey(), address.getId());
+      if (buf != null) {
+        return CoderHelpers.fromByteArray(buf, coder);
+      }
+      return null;
+    }
+
+    void writeValue(T input) {
+      stateTable.put(namespace.stringKey(), address.getId(),
+          CoderHelpers.toByteArray(input, coder));
+    }
+
+    public void clear() {
+      stateTable.remove(namespace.stringKey(), address.getId());
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      @SuppressWarnings("unchecked")
+      AbstractState<?> that = (AbstractState<?>) o;
+      return namespace.equals(that.namespace) && address.equals(that.address);
+    }
+
+    @Override
+    public int hashCode() {
+      int result = namespace.hashCode();
+      result = 31 * result + address.hashCode();
+      return result;
+    }
+  }
+
+  private class SparkValueState<T> extends AbstractState<T> implements 
ValueState<T> {
+
+    private SparkValueState(
+            StateNamespace namespace,
+            StateTag<?, ValueState<T>> address,
+            Coder<T> coder) {
+      super(namespace, address, coder);
+    }
+
+    @Override
+    public SparkValueState<T> readLater() {
+      return this;
+    }
+
+    @Override
+    public T read() {
+      return readValue();
+    }
+
+    @Override
+    public void write(T input) {
+      writeValue(input);
+    }
+  }
+
+  private class SparkWatermarkHoldState<W extends BoundedWindow>
+      extends AbstractState<Instant> implements WatermarkHoldState<W> {
+
+    private final OutputTimeFn<? super W> outputTimeFn;
+
+    public SparkWatermarkHoldState(
+        StateNamespace namespace,
+        StateTag<?, WatermarkHoldState<W>> address,
+        OutputTimeFn<? super W> outputTimeFn) {
+      super(namespace, address, InstantCoder.of());
+      this.outputTimeFn = outputTimeFn;
+    }
+
+    @Override
+    public SparkWatermarkHoldState<W> readLater() {
+      return this;
+    }
+
+    @Override
+    public Instant read() {
+      return readValue();
+    }
+
+    @Override
+    public void add(Instant outputTime) {
+      Instant combined = read();
+      combined = (combined == null) ? outputTime : 
outputTimeFn.combine(combined, outputTime);
+      writeValue(combined);
+    }
+
+    @Override
+    public ReadableState<Boolean> isEmpty() {
+      return new ReadableState<Boolean>() {
+        @Override
+        public ReadableState<Boolean> readLater() {
+          return this;
+        }
+        @Override
+        public Boolean read() {
+          return stateTable.get(namespace.stringKey(), address.getId()) == 
null;
+        }
+      };
+    }
+
+    @Override
+    public OutputTimeFn<? super W> getOutputTimeFn() {
+      return outputTimeFn;
+    }
+  }
+
+  private class SparkAccumulatorCombiningState<K, InputT, AccumT, OutputT>
+      extends AbstractState<AccumT>
+          implements AccumulatorCombiningState<InputT, AccumT, OutputT> {
+
+    private final K key;
+    private final KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn;
+
+    private SparkAccumulatorCombiningState(
+        StateNamespace namespace,
+        StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, 
OutputT>> address,
+        Coder<AccumT> coder,
+        K key,
+        KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
+      super(namespace, address, coder);
+      this.key = key;
+      this.combineFn = combineFn;
+    }
+
+    @Override
+    public SparkAccumulatorCombiningState<K, InputT, AccumT, OutputT> 
readLater() {
+      return this;
+    }
+
+    @Override
+    public OutputT read() {
+      return combineFn.extractOutput(key, getAccum());
+    }
+
+    @Override
+    public void add(InputT input) {
+      AccumT accum = getAccum();
+      combineFn.addInput(key, accum, input);
+      writeValue(accum);
+    }
+
+    @Override
+    public AccumT getAccum() {
+      AccumT accum = readValue();
+      if (accum == null) {
+        accum = combineFn.createAccumulator(key);
+      }
+      return accum;
+    }
+
+    @Override
+    public ReadableState<Boolean> isEmpty() {
+      return new ReadableState<Boolean>() {
+        @Override
+        public ReadableState<Boolean> readLater() {
+          return this;
+        }
+        @Override
+        public Boolean read() {
+          return stateTable.get(namespace.stringKey(), address.getId()) == 
null;
+        }
+      };
+    }
+
+    @Override
+    public void addAccum(AccumT accum) {
+      accum = combineFn.mergeAccumulators(key, Arrays.asList(getAccum(), 
accum));
+      writeValue(accum);
+    }
+
+    @Override
+    public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
+      return combineFn.mergeAccumulators(key, accumulators);
+    }
+
+  }
+
+  private final class SparkBagState<T> extends AbstractState<List<T>> 
implements BagState<T> {
+    private SparkBagState(
+        StateNamespace namespace,
+        StateTag<?, BagState<T>> address,
+        Coder<T> coder) {
+      super(namespace, address, ListCoder.of(coder));
+    }
+
+    @Override
+    public SparkBagState<T> readLater() {
+      return this;
+    }
+
+    @Override
+    public List<T> read() {
+      List<T> value = super.readValue();
+      if (value == null) {
+        value = new ArrayList<>();
+      }
+      return value;
+    }
+
+    @Override
+    public void add(T input) {
+      List<T> value = read();
+      value.add(input);
+      writeValue(value);
+    }
+
+    @Override
+    public ReadableState<Boolean> isEmpty() {
+      return new ReadableState<Boolean>() {
+        @Override
+        public ReadableState<Boolean> readLater() {
+          return this;
+        }
+
+        @Override
+        public Boolean read() {
+          return stateTable.get(namespace.stringKey(), address.getId()) == 
null;
+        }
+      };
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/beam/blob/8c379704/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkTimerInternals.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkTimerInternals.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkTimerInternals.java
new file mode 100644
index 0000000..65225c5
--- /dev/null
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkTimerInternals.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.stateful;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.collect.Sets;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import 
org.apache.beam.runners.spark.util.GlobalWatermarkHolder.SparkWatermarks;
+import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.spark.broadcast.Broadcast;
+import org.joda.time.Instant;
+
+
+/**
+ * An implementation of {@link TimerInternals} for the SparkRunner.
+ */
+class SparkTimerInternals implements TimerInternals {
+  private final Instant lowWatermark;
+  private final Instant highWatermark;
+  private final Instant synchronizedProcessingTime;
+  private final Set<TimerData> timers = Sets.newHashSet();
+
+  private Instant inputWatermark;
+
+  private SparkTimerInternals(
+      Instant lowWatermark, Instant highWatermark, Instant 
synchronizedProcessingTime) {
+    this.lowWatermark = lowWatermark;
+    this.highWatermark = highWatermark;
+    this.synchronizedProcessingTime = synchronizedProcessingTime;
+  }
+
+  /** Build the {@link TimerInternals} according to the feeding streams. */
+  public static SparkTimerInternals forStreamFromSources(
+      List<Integer> sourceIds,
+      @Nullable Broadcast<Map<Integer, SparkWatermarks>> broadcast) {
+    // if broadcast is invalid for the specific ids, use defaults.
+    if (broadcast == null || broadcast.getValue().isEmpty()
+        || Collections.disjoint(sourceIds, broadcast.getValue().keySet())) {
+      return new SparkTimerInternals(
+          BoundedWindow.TIMESTAMP_MIN_VALUE, 
BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(0));
+    }
+    // there might be more than one stream feeding this stream, slowest WM is 
the right one.
+    Instant slowestLowWatermark = BoundedWindow.TIMESTAMP_MAX_VALUE;
+    Instant slowestHighWatermark = BoundedWindow.TIMESTAMP_MAX_VALUE;
+    // synchronized processing time should clearly be synchronized.
+    Instant synchronizedProcessingTime = null;
+    for (Integer sourceId: sourceIds) {
+      SparkWatermarks sparkWatermarks = broadcast.getValue().get(sourceId);
+      if (sparkWatermarks != null) {
+        // keep slowest WMs.
+        slowestLowWatermark = 
slowestLowWatermark.isBefore(sparkWatermarks.getLowWatermark())
+            ? slowestLowWatermark : sparkWatermarks.getLowWatermark();
+        slowestHighWatermark = 
slowestHighWatermark.isBefore(sparkWatermarks.getHighWatermark())
+            ? slowestHighWatermark : sparkWatermarks.getHighWatermark();
+        if (synchronizedProcessingTime == null) {
+          // firstime set.
+          synchronizedProcessingTime = 
sparkWatermarks.getSynchronizedProcessingTime();
+        } else {
+          // assert on following.
+          checkArgument(
+              
sparkWatermarks.getSynchronizedProcessingTime().equals(synchronizedProcessingTime),
+              "Synchronized time is expected to keep synchronized across 
sources.");
+        }
+      }
+    }
+    return new SparkTimerInternals(
+        slowestLowWatermark, slowestHighWatermark, synchronizedProcessingTime);
+  }
+
+  Collection<TimerData> getTimers() {
+    return timers;
+  }
+
+  /** This should only be called after processing the element. */
+  Collection<TimerData> getTimersReadyToProcess() {
+    Set<TimerData> toFire = Sets.newHashSet();
+    Iterator<TimerData> iterator = timers.iterator();
+    while (iterator.hasNext()) {
+      TimerData timer = iterator.next();
+      if (timer.getTimestamp().isBefore(inputWatermark)) {
+        toFire.add(timer);
+        iterator.remove();
+      }
+    }
+    return toFire;
+  }
+
+  void addTimers(Collection<TimerData> timers) {
+    this.timers.addAll(timers);
+  }
+
+  @Override
+  public void setTimer(TimerData timer) {
+    this.timers.add(timer);
+  }
+
+  @Override
+  public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain 
timeDomain) {
+    throw new UnsupportedOperationException("Deleting a timer by ID is not yet 
supported.");
+  }
+
+  @Override
+  public void deleteTimer(TimerData timer) {
+    this.timers.remove(timer);
+  }
+
+  @Override
+  public Instant currentProcessingTime() {
+    return Instant.now();
+  }
+
+  @Nullable
+  @Override
+  public Instant currentSynchronizedProcessingTime() {
+    return synchronizedProcessingTime;
+  }
+
+  @Override
+  public Instant currentInputWatermarkTime() {
+    return inputWatermark;
+  }
+
+  /** Advances the watermark - since */
+  public void advanceWatermark() {
+    inputWatermark = highWatermark;
+  }
+
+  @Nullable
+  @Override
+  public Instant currentOutputWatermarkTime() {
+    return null;
+  }
+
+  @Override
+  public void setTimer(
+      StateNamespace namespace,
+      String timerId,
+      Instant target,
+      TimeDomain timeDomain) {
+    throw new UnsupportedOperationException("Setting a timer by ID not yet 
supported.");
+  }
+
+  @Override
+  public void deleteTimer(StateNamespace namespace, String timerId) {
+    throw new UnsupportedOperationException("Deleting a timer by ID is not yet 
supported.");
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/beam/blob/8c379704/runners/spark/src/main/java/org/apache/beam/runners/spark/util/LateDataUtils.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/LateDataUtils.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/LateDataUtils.java
new file mode 100644
index 0000000..96e6ee5
--- /dev/null
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/LateDataUtils.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.util;
+
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Iterables;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowTracing;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+
+
+/**
+ * Utils to handle late data.
+ */
+public class LateDataUtils {
+
+  /**
+   * Returns an {@code Iterable<WindowedValue<InputT>>} that only contains 
non-late input
+   * elements.
+   * Taken from Thomas Groh's implementation in the DirectRunner's
+   * GroupAlsoByWindowEvaluatorFactory.
+   */
+  public static <K, V> Iterable<WindowedValue<V>> dropExpiredWindows(
+      final K key,
+      Iterable<WindowedValue<V>> elements,
+      final TimerInternals timerInternals,
+      final WindowingStrategy<?, ?> windowingStrategy,
+      final Aggregator<Long, Long> droppedDueToLateness) {
+    return FluentIterable.from(elements)
+        .transformAndConcat(
+            // Explode windows to filter out expired ones
+            new Function<WindowedValue<V>, Iterable<WindowedValue<V>>>() {
+              @Override
+              public Iterable<WindowedValue<V>> apply(@Nullable 
WindowedValue<V> input) {
+                if (input == null) {
+                  return null;
+                }
+                return input.explodeWindows();
+              }
+            })
+        .filter(
+            new Predicate<WindowedValue<V>>() {
+              @Override
+              public boolean apply(@Nullable WindowedValue<V> input) {
+                if (input == null) {
+                  // drop null elements.
+                  return false;
+                }
+                BoundedWindow window = 
Iterables.getOnlyElement(input.getWindows());
+                boolean expired =
+                    window
+                        .maxTimestamp()
+                        .plus(windowingStrategy.getAllowedLateness())
+                        .isBefore(timerInternals.currentInputWatermarkTime());
+                if (expired) {
+                  // The element is too late for this window.
+                  droppedDueToLateness.addValue(1L);
+                  WindowTracing.debug(
+                      "GroupAlsoByWindow: Dropping element at {} for key: {}; "
+                          + "window: {} since it is too far behind 
inputWatermark: {}",
+                      input.getTimestamp(),
+                      key,
+                      window,
+                      timerInternals.currentInputWatermarkTime());
+                }
+                // Keep the element if the window is not expired.
+                return !expired;
+              }
+            });
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/beam/blob/8c379704/runners/spark/src/main/java/org/apache/beam/runners/spark/util/UnsupportedSideInputReader.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/UnsupportedSideInputReader.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/UnsupportedSideInputReader.java
new file mode 100644
index 0000000..6de7e86
--- /dev/null
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/UnsupportedSideInputReader.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.util;
+
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.SideInputReader;
+import org.apache.beam.sdk.values.PCollectionView;
+
+
+/**
+ * An implementation of a {@link SideInputReader} that actually does not 
support side-inputs.
+ */
+public class UnsupportedSideInputReader implements SideInputReader {
+  private final String transformName;
+
+  public UnsupportedSideInputReader(String transformName) {
+    this.transformName = transformName;
+  }
+
+  @Override
+  public <T> T get(PCollectionView<T> view, BoundedWindow window) {
+    throw new UnsupportedOperationException(
+        String.format("%s does not support side inputs.", transformName));
+  }
+
+  @Override
+  public <T> boolean contains(PCollectionView<T> view) {
+    throw new UnsupportedOperationException(
+        String.format("%s does not support side inputs.", transformName));
+  }
+
+  @Override
+  public boolean isEmpty() {
+    throw new UnsupportedOperationException(
+        String.format("%s does not support side inputs.", transformName));
+  }
+}
\ No newline at end of file

Reply via email to