This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git
commit 8d05d46f4b2b116dd3bed21566ede230720ccc0f Author: Etienne Chauchot <echauc...@apache.org> AuthorDate: Tue Jun 11 14:40:54 2019 +0200 Fix comment about schemas --- .../batch/AggregatorCombinerGlobally.java | 2 +- .../translation/helpers/ReduceFnRunnerHelpers.java | 77 ++++++++++++++++++++++ 2 files changed, 78 insertions(+), 1 deletion(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombinerGlobally.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombinerGlobally.java index a03c17e..2f8293b 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombinerGlobally.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombinerGlobally.java @@ -43,7 +43,7 @@ class AggregatorCombinerGlobally<InputT, AccumT, OutputT> @Override public AccumT reduce(AccumT accumulator, InputT input) { // because of generic type InputT, spark cannot infer an input type. 
- // it would pass Integer as input if we had a Aggregator<Input, ..., ...> + // it would pass Integer as input if we had a Aggregator<Integer, ..., ...> // without the type inference it stores input in a GenericRowWithSchema Row row = (Row) input; InputT t = RowHelpers.extractObjectFromRow(row); diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/ReduceFnRunnerHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/ReduceFnRunnerHelpers.java new file mode 100644 index 0000000..97a225e --- /dev/null +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/ReduceFnRunnerHelpers.java @@ -0,0 +1,77 @@ +package org.apache.beam.runners.spark.structuredstreaming.translation.helpers; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import org.apache.beam.runners.core.InMemoryTimerInternals; +import org.apache.beam.runners.core.OutputWindowedValue; +import org.apache.beam.runners.core.ReduceFnRunner; +import org.apache.beam.runners.core.TimerInternals; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.TupleTag; +import org.joda.time.Instant; + +/** + * Helpers to use {@link ReduceFnRunner}. 
+ */
+public class ReduceFnRunnerHelpers<K, InputT, W extends BoundedWindow> { // class-level type parameters are not referenced by any member visible here
+  public static <K, InputT, W extends BoundedWindow> void fireEligibleTimers( // drains pending timers and hands them to the runner in batches
+      InMemoryTimerInternals timerInternals, // timers are removed from this object as they are collected
+      ReduceFnRunner<K, InputT, Iterable<InputT>, W> reduceFnRunner) // receives each collected batch through onTimers
+      throws Exception {
+    List<TimerInternals.TimerData> timers = new ArrayList<>(); // batch buffer, reused across passes
+    while (true) { // NOTE(review): presumably repeats because onTimers may make further timers eligible - confirm
+      TimerInternals.TimerData timer;
+      while ((timer = timerInternals.removeNextEventTimer()) != null) { // event timers first; a null return ends this drain
+        timers.add(timer);
+      }
+      while ((timer = timerInternals.removeNextProcessingTimer()) != null) { // then processing timers
+        timers.add(timer);
+      }
+      while ((timer = timerInternals.removeNextSynchronizedProcessingTimer()) != null) { // then synchronized processing timers
+        timers.add(timer);
+      }
+      if (timers.isEmpty()) { // a full pass collected nothing: stop
+        break;
+      }
+      reduceFnRunner.onTimers(timers); // all three kinds are delivered together in a single call
+      timers.clear(); // start the next pass with an empty batch
+    }
+  }
+
+  /**
+   * {@link OutputWindowedValue} for ReduceFnRunner.
+   *
+   * <p>Every main output is wrapped with {@code WindowedValue.of} and appended to an in-memory
+   * list, in call order, for later retrieval through {@link #getOutputs()}. Tagged (additional)
+   * outputs are not supported and are rejected with an {@link UnsupportedOperationException}.
+   */
+  public static class GABWOutputWindowedValue<K, V>
+      implements OutputWindowedValue<KV<K, Iterable<V>>> {
+    private final List<WindowedValue<KV<K, Iterable<V>>>> outputs = new ArrayList<>(); // grows by one element per main output
+
+    @Override
+    public void outputWindowedValue( // main output: record it
+        KV<K, Iterable<V>> output,
+        Instant timestamp,
+        Collection<? extends BoundedWindow> windows,
+        PaneInfo pane) {
+      outputs.add(WindowedValue.of(output, timestamp, windows, pane));
+    }
+
+    @Override
+    public <AdditionalOutputT> void outputWindowedValue( // tagged output: always throws
+        TupleTag<AdditionalOutputT> tag,
+        AdditionalOutputT output,
+        Instant timestamp,
+        Collection<? extends BoundedWindow> windows,
+        PaneInfo pane) {
+      throw new UnsupportedOperationException("GroupAlsoByWindow should not use tagged outputs.");
+    }
+
+    public Iterable<WindowedValue<KV<K, Iterable<V>>>> getOutputs() { // exposes the internal list itself, not a copy
+      return outputs;
+    }
+  }
+
+}