// http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/runners/core-java/src/main/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunner.java new file mode 100644 index 0000000..b1442dd --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunner.java @@ -0,0 +1,115 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.sdk.util;

import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.PCollectionView;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;

import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/**
 * A {@link DoFnRunner} that can refuse to process elements whose side inputs are not ready,
 * instead returning them from {@link #processElementInReadyWindows(WindowedValue)}.
 *
 * <p>All {@link DoFnRunner} methods delegate to the wrapped runner; only
 * {@link #processElementInReadyWindows(WindowedValue)} consults the side input reader.
 */
public class PushbackSideInputDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT> {
  private final DoFnRunner<InputT, OutputT> underlying;
  private final Collection<PCollectionView<?>> views;
  private final ReadyCheckingSideInputReader sideInputReader;

  /**
   * Main-input windows already found to be not ready during the current bundle. Created in
   * {@link #startBundle()} and reset to {@code null} in {@link #finishBundle()}.
   */
  private Set<BoundedWindow> notReadyWindows;

  /**
   * Creates a runner wrapping {@code underlying}.
   *
   * @param underlying the runner that actually processes elements
   * @param views the side input views that must be ready before an element is processed
   * @param sideInputReader the reader used to check whether a view is ready for a window
   */
  public static <InputT, OutputT> PushbackSideInputDoFnRunner<InputT, OutputT> create(
      DoFnRunner<InputT, OutputT> underlying,
      Collection<PCollectionView<?>> views,
      ReadyCheckingSideInputReader sideInputReader) {
    return new PushbackSideInputDoFnRunner<>(underlying, views, sideInputReader);
  }

  private PushbackSideInputDoFnRunner(
      DoFnRunner<InputT, OutputT> underlying,
      Collection<PCollectionView<?>> views,
      ReadyCheckingSideInputReader sideInputReader) {
    this.underlying = underlying;
    this.views = views;
    this.sideInputReader = sideInputReader;
  }

  /** Starts a fresh not-ready cache, then calls the underlying {@link DoFnRunner#startBundle()}. */
  @Override
  public void startBundle() {
    notReadyWindows = new HashSet<>();
    underlying.startBundle();
  }

  /**
   * Call the underlying {@link DoFnRunner#processElement(WindowedValue)} for the provided element
   * for each window the element is in that is ready.
   *
   * <p>When there are no side input views the element is processed as-is, without being exploded
   * into its windows.
   *
   * @param elem the element to process in all ready windows
   * @return each element that could not be processed because it requires a side input window
   *     that is not ready.
   */
  public Iterable<WindowedValue<InputT>> processElementInReadyWindows(WindowedValue<InputT> elem) {
    if (views.isEmpty()) {
      processElement(elem);
      return Collections.emptyList();
    }
    ImmutableList.Builder<WindowedValue<InputT>> pushedBack = ImmutableList.builder();
    for (WindowedValue<InputT> windowElem : elem.explodeWindows()) {
      BoundedWindow mainInputWindow = Iterables.getOnlyElement(windowElem.getWindows());
      if (isReady(mainInputWindow)) {
        processElement(windowElem);
      } else {
        notReadyWindows.add(mainInputWindow);
        pushedBack.add(windowElem);
      }
    }
    return pushedBack.build();
  }

  /**
   * Returns whether every side input view is ready for {@code mainInputWindow}.
   *
   * <p>A window already recorded as not ready in this bundle is rejected immediately, so the
   * side input reader is not queried again for it.
   */
  private boolean isReady(BoundedWindow mainInputWindow) {
    if (notReadyWindows.contains(mainInputWindow)) {
      return false;
    }
    for (PCollectionView<?> view : views) {
      BoundedWindow sideInputWindow =
          view.getWindowingStrategyInternal()
              .getWindowFn()
              .getSideInputWindow(mainInputWindow);
      if (!sideInputReader.isReady(view, sideInputWindow)) {
        return false;
      }
    }
    return true;
  }

  /**
   * Call the underlying {@link DoFnRunner#processElement(WindowedValue)} unconditionally; no
   * side input readiness check is made here.
   */
  @Override
  public void processElement(WindowedValue<InputT> elem) {
    underlying.processElement(elem);
  }

  /**
   * Drop the not-ready cache, then call the underlying {@link DoFnRunner#finishBundle()}.
   */
  @Override
  public void finishBundle() {
    notReadyWindows = null;
    underlying.finishBundle();
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFn.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFn.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFn.java new file mode 100644 index 0000000..c5ee1e1 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFn.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.state.MergingStateAccessor; +import org.apache.beam.sdk.util.state.ReadableState; +import org.apache.beam.sdk.util.state.StateAccessor; + +import org.joda.time.Instant; + +import java.io.Serializable; + +/** + * Specification for processing to happen after elements have been grouped by key. + * + * @param <K> The type of key being processed. + * @param <InputT> The type of input values associated with the key. 
+ * @param <OutputT> The output type that will be produced for each key. + * @param <W> The type of windows this operates on. + */ +public abstract class ReduceFn<K, InputT, OutputT, W extends BoundedWindow> + implements Serializable { + + /** Information accessible to all the processing methods in this {@code ReduceFn}. */ + public abstract class Context { + /** Return the key that is being processed. */ + public abstract K key(); + + /** Return the window that is being processed. */ + public abstract W window(); + + /** Access the current {@link WindowingStrategy}. */ + public abstract WindowingStrategy<?, W> windowingStrategy(); + + /** Return the interface for accessing state. */ + public abstract StateAccessor<K> state(); + + /** Return the interface for accessing timers. */ + public abstract Timers timers(); + } + + /** Information accessible within {@link #processValue}. */ + public abstract class ProcessValueContext extends Context { + /** Return the actual value being processed. */ + public abstract InputT value(); + + /** Return the timestamp associated with the value. */ + public abstract Instant timestamp(); + } + + /** Information accessible within {@link #onMerge}. */ + public abstract class OnMergeContext extends Context { + /** Return the interface for accessing state, narrowed to a {@link MergingStateAccessor}. */ + @Override + public abstract MergingStateAccessor<K, W> state(); + } + + /** Information accessible within {@link #onTrigger}. */ + public abstract class OnTriggerContext extends Context { + /** Returns the {@link PaneInfo} for the trigger firing being processed. */ + public abstract PaneInfo paneInfo(); + + /** Output the given value in the current window. */ + public abstract void output(OutputT value); + } + + ////////////////////////////////////////////////////////////////////////////////////////////////// + + /** + * Called for each value of type {@code InputT} associated with the current key. 
+ * + * @param c context exposing the value, its timestamp, and the state and timers of its window. + */ + public abstract void processValue(ProcessValueContext c) throws Exception; + + /** + * Called when windows are merged. + * + * @param context context whose {@link OnMergeContext#state} is a {@link MergingStateAccessor}. + */ + public abstract void onMerge(OnMergeContext context) throws Exception; + + /** + * Called when triggers fire. + * + * <p>Implementations of {@link ReduceFn} should call {@link OnTriggerContext#output} to emit + * any results that should be included in the pane produced by this trigger firing. + * + * @param context context exposing the pane info and the output method for this firing. + */ + public abstract void onTrigger(OnTriggerContext context) throws Exception; + + /** + * Called before {@link #onMerge} is invoked to provide an opportunity to prefetch any needed + * state. The default implementation does nothing. + * + * @param c State accessor to prefetch from. + */ + public void prefetchOnMerge(MergingStateAccessor<K, W> c) throws Exception {} + + /** + * Called before {@link #onTrigger} is invoked to provide an opportunity to prefetch any needed + * state. The default implementation does nothing. + * + * @param context State accessor to prefetch from. + */ + public void prefetchOnTrigger(StateAccessor<K> context) {} + + /** + * Called to clear any persisted state that the {@link ReduceFn} may be holding. This will be + * called when the window is closing and will receive no further interactions. + * + * @param context context for the window whose state should be cleared. + */ + public abstract void clearState(Context context) throws Exception; + + /** + * Returns true if there is no buffered state. 
+ * + * @param context State accessor used to inspect the buffered state. + */ + public abstract ReadableState<Boolean> isEmpty(StateAccessor<K> context); +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFnContextFactory.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFnContextFactory.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFnContextFactory.java new file mode 100644 index 0000000..c90940e --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFnContextFactory.java @@ -0,0 +1,497 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.util; + +import static com.google.common.base.Preconditions.checkNotNull; + +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.TimerInternals.TimerData; +import org.apache.beam.sdk.util.state.MergingStateAccessor; +import org.apache.beam.sdk.util.state.ReadableState; +import org.apache.beam.sdk.util.state.State; +import org.apache.beam.sdk.util.state.StateAccessor; +import org.apache.beam.sdk.util.state.StateContext; +import org.apache.beam.sdk.util.state.StateContexts; +import org.apache.beam.sdk.util.state.StateInternals; +import org.apache.beam.sdk.util.state.StateNamespace; +import org.apache.beam.sdk.util.state.StateNamespaces; +import org.apache.beam.sdk.util.state.StateNamespaces.WindowNamespace; +import org.apache.beam.sdk.util.state.StateTag; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; + +import org.joda.time.Instant; + +import java.util.Collection; +import java.util.Map; + +import javax.annotation.Nullable; + +/** + * Factory for creating instances of the various {@link ReduceFn} contexts. 
+ */ +class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> { + /** Sink for values emitted through {@link ReduceFn.OnTriggerContext#output}. */ + public interface OnTriggerCallbacks<OutputT> { + void output(OutputT toOutput); + } + + private final K key; + private final ReduceFn<K, InputT, OutputT, W> reduceFn; + private final WindowingStrategy<?, W> windowingStrategy; + private final StateInternals<K> stateInternals; + private final ActiveWindowSet<W> activeWindows; + private final TimerInternals timerInternals; + private final WindowingInternals<?, ?> windowingInternals; + private final PipelineOptions options; + + /** Captures the collaborators shared by every context this factory creates. */ + ReduceFnContextFactory(K key, ReduceFn<K, InputT, OutputT, W> reduceFn, + WindowingStrategy<?, W> windowingStrategy, StateInternals<K> stateInternals, + ActiveWindowSet<W> activeWindows, TimerInternals timerInternals, + WindowingInternals<?, ?> windowingInternals, PipelineOptions options) { + this.key = key; + this.reduceFn = reduceFn; + this.windowingStrategy = windowingStrategy; + this.stateInternals = stateInternals; + this.activeWindows = activeWindows; + this.timerInternals = timerInternals; + this.windowingInternals = windowingInternals; + this.options = options; + } + + /** Where should we look for state associated with a given window? */ + public static enum StateStyle { + /** All state is associated with the window itself. */ + DIRECT, + /** State is associated with the 'state address' windows tracked by the active window set. 
*/ + RENAMED + } + + /** Build a state accessor for {@code window} that resolves state using {@code style}. */ + private StateAccessorImpl<K, W> stateAccessor(W window, StateStyle style) { + return new StateAccessorImpl<K, W>( + activeWindows, windowingStrategy.getWindowFn().windowCoder(), + stateInternals, StateContexts.createFromComponents(options, windowingInternals, window), + style); + } + + /** Create a plain {@link ReduceFn.Context} for {@code window}. */ + public ReduceFn<K, InputT, OutputT, W>.Context base(W window, StateStyle style) { + return new ContextImpl(stateAccessor(window, style)); + } + + /** Create a {@link ReduceFn.ProcessValueContext} for {@code value} in {@code window}. */ + public ReduceFn<K, InputT, OutputT, W>.ProcessValueContext forValue( + W window, InputT value, Instant timestamp, StateStyle style) { + return new ProcessValueContextImpl(stateAccessor(window, style), value, timestamp); + } + + /** Create a {@link ReduceFn.OnTriggerContext} for {@code window}; output goes to {@code callbacks}. */ + public ReduceFn<K, InputT, OutputT, W>.OnTriggerContext forTrigger(W window, + ReadableState<PaneInfo> pane, StateStyle style, OnTriggerCallbacks<OutputT> callbacks) { + return new OnTriggerContextImpl(stateAccessor(window, style), pane, callbacks); + } + + /** Create a {@link ReduceFn.OnMergeContext} for merging {@code activeToBeMerged} into {@code mergeResult}. */ + public ReduceFn<K, InputT, OutputT, W>.OnMergeContext forMerge( + Collection<W> activeToBeMerged, W mergeResult, StateStyle style) { + return new OnMergeContextImpl( + new MergingStateAccessorImpl<K, W>(activeWindows, + windowingStrategy.getWindowFn().windowCoder(), + stateInternals, style, activeToBeMerged, mergeResult)); + } + + /** Create a {@link ReduceFn.OnMergeContext} backed by a {@link PremergingStateAccessorImpl} for {@code window}. 
*/ + public ReduceFn<K, InputT, OutputT, W>.OnMergeContext forPremerge(W window) { + return new OnPremergeContextImpl(new PremergingStateAccessorImpl<K, W>( + activeWindows, windowingStrategy.getWindowFn().windowCoder(), stateInternals, window)); + } + + /** {@link Timers} that sets and deletes timers in one fixed window namespace via {@link TimerInternals}. */ + private class TimersImpl implements Timers { + private final StateNamespace namespace; + + public TimersImpl(StateNamespace namespace) { + Preconditions.checkArgument(namespace instanceof WindowNamespace); + this.namespace = namespace; + } + + @Override + public void setTimer(Instant timestamp, TimeDomain timeDomain) { + timerInternals.setTimer(TimerData.of(namespace, timestamp, timeDomain)); + } + + @Override + public void deleteTimer(Instant timestamp, TimeDomain 
timeDomain) { + timerInternals.deleteTimer(TimerData.of(namespace, timestamp, timeDomain)); + } + + @Override + public Instant currentProcessingTime() { + return timerInternals.currentProcessingTime(); + } + + @Override + @Nullable + public Instant currentSynchronizedProcessingTime() { + return timerInternals.currentSynchronizedProcessingTime(); + } + + /** Reports the input watermark time of {@link TimerInternals} as the current event time. */ + @Override + public Instant currentEventTime() { + return timerInternals.currentInputWatermarkTime(); + } + } + + // ====================================================================== + // StateAccessors + // ====================================================================== + /** {@link StateAccessor} for one window; {@link StateStyle} selects the namespace state is resolved in. */ + static class StateAccessorImpl<K, W extends BoundedWindow> implements StateAccessor<K> { + + + protected final ActiveWindowSet<W> activeWindows; + protected final StateContext<W> context; + protected final StateNamespace windowNamespace; + protected final Coder<W> windowCoder; + protected final StateInternals<K> stateInternals; + protected final StateStyle style; + + public StateAccessorImpl(ActiveWindowSet<W> activeWindows, Coder<W> windowCoder, + StateInternals<K> stateInternals, StateContext<W> context, StateStyle style) { + + this.activeWindows = activeWindows; + this.windowCoder = windowCoder; + this.stateInternals = stateInternals; + this.context = checkNotNull(context); + this.windowNamespace = namespaceFor(context.window()); + this.style = style; + } + + /** Return the window namespace for {@code window}, encoded with this accessor's window coder. */ + protected StateNamespace namespaceFor(W window) { + return StateNamespaces.window(windowCoder, window); + } + + protected StateNamespace windowNamespace() { + return windowNamespace; + } + + W window() { + return context.window(); + } + + /** Return the namespace of the context window itself, regardless of {@link StateStyle}. */ + StateNamespace namespace() { + return windowNamespace(); + } + + /** DIRECT reads the context window's own namespace; RENAMED reads its write state address window's. */ + @Override + public <StateT extends State> StateT access(StateTag<? 
super K, StateT> address) { + switch (style) { + case DIRECT: + return stateInternals.state(windowNamespace(), address, context); + case RENAMED: + return stateInternals.state( + namespaceFor(activeWindows.writeStateAddress(context.window())), address, context); + } + throw new RuntimeException(); // unreachable: cases are exhaustive. + } + } + + /** {@link MergingStateAccessor} whose window is the merge result and whose {@link #accessInEachMergingWindow} covers {@code activeToBeMerged}. */ + static class MergingStateAccessorImpl<K, W extends BoundedWindow> + extends StateAccessorImpl<K, W> implements MergingStateAccessor<K, W> { + private final Collection<W> activeToBeMerged; + + public MergingStateAccessorImpl(ActiveWindowSet<W> activeWindows, Coder<W> windowCoder, + StateInternals<K> stateInternals, StateStyle style, Collection<W> activeToBeMerged, + W mergeResult) { + super(activeWindows, windowCoder, stateInternals, + StateContexts.windowOnly(mergeResult), style); + this.activeToBeMerged = activeToBeMerged; + } + + /** As the superclass, except RENAMED resolves the namespace via {@code mergedWriteStateAddress}. */ + @Override + public <StateT extends State> StateT access(StateTag<? super K, StateT> address) { + switch (style) { + case DIRECT: + return stateInternals.state(windowNamespace(), address, context); + case RENAMED: + return stateInternals.state( + namespaceFor(activeWindows.mergedWriteStateAddress( + activeToBeMerged, context.window())), + address, + context); + } + throw new RuntimeException(); // unreachable: cases are exhaustive. + } + + /** Return the state at {@code address} for each window in {@code activeToBeMerged}, keyed by that window. */ + @Override + public <StateT extends State> Map<W, StateT> accessInEachMergingWindow( + StateTag<? super K, StateT> address) { + ImmutableMap.Builder<W, StateT> builder = ImmutableMap.builder(); + for (W mergingWindow : activeToBeMerged) { + StateNamespace namespace = null; + switch (style) { + case DIRECT: + namespace = namespaceFor(mergingWindow); + break; + case RENAMED: + namespace = namespaceFor(activeWindows.writeStateAddress(mergingWindow)); + break; + } + Preconditions.checkNotNull(namespace); // cases are exhaustive. 
+ builder.put(mergingWindow, stateInternals.state(namespace, address, context)); + } + return builder.build(); + } + } + + /** {@link MergingStateAccessor} for a single window, always {@link StateStyle#RENAMED}; its merging windows are that window's read state addresses. */ + static class PremergingStateAccessorImpl<K, W extends BoundedWindow> + extends StateAccessorImpl<K, W> implements MergingStateAccessor<K, W> { + public PremergingStateAccessorImpl(ActiveWindowSet<W> activeWindows, Coder<W> windowCoder, + StateInternals<K> stateInternals, W window) { + super(activeWindows, windowCoder, stateInternals, + StateContexts.windowOnly(window), StateStyle.RENAMED); + } + + /** Return the read state address windows recorded for this accessor's window. */ + Collection<W> mergingWindows() { + return activeWindows.readStateAddresses(context.window()); + } + + /** Return the state at {@code address} for each read state address window, keyed by that window. */ + @Override + public <StateT extends State> Map<W, StateT> accessInEachMergingWindow( + StateTag<? super K, StateT> address) { + ImmutableMap.Builder<W, StateT> builder = ImmutableMap.builder(); + for (W stateAddressWindow : activeWindows.readStateAddresses(context.window())) { + StateT stateForWindow = + stateInternals.state(namespaceFor(stateAddressWindow), address, context); + builder.put(stateAddressWindow, stateForWindow); + } + return builder.build(); + } + } + + // ====================================================================== + // Contexts + // ====================================================================== + + /** Base context. {@code reduceFn.super()} is required because {@code Context} is an inner class of {@link ReduceFn}. */ + private class ContextImpl extends ReduceFn<K, InputT, OutputT, W>.Context { + private final StateAccessorImpl<K, W> state; + private final TimersImpl timers; + + private ContextImpl(StateAccessorImpl<K, W> state) { + reduceFn.super(); + this.state = state; + this.timers = new TimersImpl(state.namespace()); + } + + @Override + public K key() { + return key; + } + + @Override + public W window() { + return state.window(); + } + + @Override + public WindowingStrategy<?, W> windowingStrategy() { + return windowingStrategy; + } + + @Override + public StateAccessor<K> state() { + return state; + } + + @Override + public Timers timers() { + return timers; + } + } + + /** Context carrying a single value and its timestamp in addition to the base accessors. */ + private class ProcessValueContextImpl + extends ReduceFn<K, 
InputT, OutputT, W>.ProcessValueContext { + private final InputT value; + private final Instant timestamp; + private final StateAccessorImpl<K, W> state; + private final TimersImpl timers; + + private ProcessValueContextImpl(StateAccessorImpl<K, W> state, + InputT value, Instant timestamp) { + reduceFn.super(); + this.state = state; + this.value = value; + this.timestamp = timestamp; + this.timers = new TimersImpl(state.namespace()); + } + + @Override + public K key() { + return key; + } + + @Override + public W window() { + return state.window(); + } + + @Override + public WindowingStrategy<?, W> windowingStrategy() { + return windowingStrategy; + } + + @Override + public StateAccessor<K> state() { + return state; + } + + @Override + public InputT value() { + return value; + } + + @Override + public Instant timestamp() { + return timestamp; + } + + @Override + public Timers timers() { + return timers; + } + } + + /** Trigger-time context: pane info is read from {@code pane} on each call; output goes to {@code callbacks}. */ + private class OnTriggerContextImpl extends ReduceFn<K, InputT, OutputT, W>.OnTriggerContext { + private final StateAccessorImpl<K, W> state; + private final ReadableState<PaneInfo> pane; + private final OnTriggerCallbacks<OutputT> callbacks; + private final TimersImpl timers; + + private OnTriggerContextImpl(StateAccessorImpl<K, W> state, ReadableState<PaneInfo> pane, + OnTriggerCallbacks<OutputT> callbacks) { + reduceFn.super(); + this.state = state; + this.pane = pane; + this.callbacks = callbacks; + this.timers = new TimersImpl(state.namespace()); + } + + @Override + public K key() { + return key; + } + + @Override + public W window() { + return state.window(); + } + + @Override + public WindowingStrategy<?, W> windowingStrategy() { + return windowingStrategy; + } + + @Override + public StateAccessor<K> state() { + return state; + } + + @Override + public PaneInfo paneInfo() { + return pane.read(); + } + + @Override + public void output(OutputT value) { + callbacks.output(value); + } + + @Override + public Timers timers() { + return timers; + } + } + + 
/** Merge-time context backed by a {@link MergingStateAccessorImpl}. */ + private class OnMergeContextImpl extends ReduceFn<K, InputT, OutputT, W>.OnMergeContext { + private final MergingStateAccessorImpl<K, W> state; + private final TimersImpl timers; + + private OnMergeContextImpl(MergingStateAccessorImpl<K, W> state) { + reduceFn.super(); + this.state = state; + this.timers = new TimersImpl(state.namespace()); + } + + @Override + public K key() { + return key; + } + + @Override + public WindowingStrategy<?, W> windowingStrategy() { + return windowingStrategy; + } + + @Override + public MergingStateAccessor<K, W> state() { + return state; + } + + @Override + public W window() { + return state.window(); + } + + @Override + public Timers timers() { + return timers; + } + } + + /** Like {@link OnMergeContextImpl}, but backed by a {@link PremergingStateAccessorImpl}. */ + private class OnPremergeContextImpl extends ReduceFn<K, InputT, OutputT, W>.OnMergeContext { + private final PremergingStateAccessorImpl<K, W> state; + private final TimersImpl timers; + + private OnPremergeContextImpl(PremergingStateAccessorImpl<K, W> state) { + reduceFn.super(); + this.state = state; + this.timers = new TimersImpl(state.namespace()); + } + + @Override + public K key() { + return key; + } + + @Override + public WindowingStrategy<?, W> windowingStrategy() { + return windowingStrategy; + } + + @Override + public MergingStateAccessor<K, W> state() { + return state; + } + + @Override + public W window() { + return state.window(); + } + + @Override + public Timers timers() { + return timers; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java new file mode 100644 index 0000000..864e8e7 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java @@ -0,0 +1,985 @@ +/* + * Licensed 
to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.Aggregator; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.windowing.AfterWatermark; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing; +import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly; +import org.apache.beam.sdk.util.ReduceFnContextFactory.OnTriggerCallbacks; +import org.apache.beam.sdk.util.ReduceFnContextFactory.StateStyle; +import org.apache.beam.sdk.util.TimerInternals.TimerData; +import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode; +import org.apache.beam.sdk.util.state.ReadableState; +import org.apache.beam.sdk.util.state.StateInternals; +import 
org.apache.beam.sdk.util.state.StateNamespaces.WindowNamespace; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; + +import org.joda.time.Duration; +import org.joda.time.Instant; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import javax.annotation.Nullable; + +/** + * Manages the execution of a {@link ReduceFn} after a {@link GroupByKeyOnly} has partitioned the + * {@link PCollection} by key. + * + * <p>The {@link #onTrigger} relies on a {@link TriggerRunner} to manage the execution of + * the triggering logic. The {@code ReduceFnRunner}s responsibilities are: + * + * <ul> + * <li>Tracking the windows that are active (have buffered data) as elements arrive and + * triggers are fired. + * <li>Holding the watermark based on the timestamps of elements in a pane and releasing it + * when the trigger fires. + * <li>Calling the appropriate callbacks on {@link ReduceFn} based on trigger execution, timer + * firings, etc, and providing appropriate contexts to the {@link ReduceFn} for actions + * such as output. + * <li>Scheduling garbage collection of state associated with a specific window, and making that + * happen when the appropriate timer fires. + * </ul> + * + * @param <K> The type of key being processed. + * @param <InputT> The type of values associated with the key. + * @param <OutputT> The output type that will be produced for each key. + * @param <W> The type of windows this operates on. + */ +public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> { + + /** + * The {@link ReduceFnRunner} depends on most aspects of the {@link WindowingStrategy}. 
+ * + * <ul> + * <li>It runs the trigger from the {@link WindowingStrategy}.</li> + * <li>It merges windows according to the {@link WindowingStrategy}.</li> + * <li>It chooses how to track active windows and clear out expired windows + * according to the {@link WindowingStrategy}, based on the allowed lateness and + * whether windows can merge.</li> + * <li>It decides whether to emit empty final panes according to whether the + * {@link WindowingStrategy} requires it.</li> + * <li>It uses discarding or accumulation mode according to the {@link WindowingStrategy}.</li> + * </ul> + */ + private final WindowingStrategy<Object, W> windowingStrategy; + + private final OutputWindowedValue<KV<K, OutputT>> outputter; + + private final StateInternals<K> stateInternals; + + private final Aggregator<Long, Long> droppedDueToClosedWindow; + + private final K key; + + /** + * Track which windows are still active and the 'state address' windows which hold their state. + * + * <ul> + * <li>State: Global map for all active windows for this computation and key. + * <li>Lifetime: Cleared when no active windows need to be tracked. A window lives within + * the active window set until its trigger is closed or the window is garbage collected. + * </ul> + */ + private final ActiveWindowSet<W> activeWindows; + + /** + * Always a {@link SystemReduceFn}. + * + * <ul> + * <li>State: A bag of accumulated values, or the intermediate result of a combiner. + * <li>State style: RENAMED + * <li>Merging: Concatenate or otherwise combine the state from each merged window. + * <li>Lifetime: Cleared when a pane fires if DISCARDING_FIRED_PANES. Otherwise cleared + * when trigger is finished or when the window is garbage collected. + * </ul> + */ + private final ReduceFn<K, InputT, OutputT, W> reduceFn; + + /** + * Manage the setting and firing of timer events. + * + * <ul> + * <li>Merging: End-of-window and garbage collection timers are cancelled when windows are + * merged away. 
Timers created by triggers are never garbage collected and are left to + * fire and be ignored. + * <li>Lifetime: Timers automatically disappear after they fire. + * </ul> + */ + private final TimerInternals timerInternals; + + /** + * Manage the execution and state for triggers. + * + * <ul> + * <li>State: Tracks which sub-triggers have finished, and any additional state needed to + * determine when the trigger should fire. + * <li>State style: DIRECT + * <li>Merging: Finished bits are explicitly managed. Other state is eagerly merged as + * needed. + * <li>Lifetime: Most trigger state is cleared when the final pane is emitted. However + * the finished bits are left behind and must be cleared when the window is + * garbage collected. + * </ul> + */ + private final TriggerRunner<W> triggerRunner; + + /** + * Store the output watermark holds for each window. + * + * <ul> + * <li>State: Bag of hold timestamps. + * <li>State style: RENAMED + * <li>Merging: Depending on {@link OutputTimeFn}, may need to be recalculated on merging. + * When a pane fires it may be necessary to add (back) an end-of-window or garbage collection + * hold. + * <li>Lifetime: Cleared when a pane fires or when the window is garbage collected. + * </ul> + */ + private final WatermarkHold<W> watermarkHold; + + private final ReduceFnContextFactory<K, InputT, OutputT, W> contextFactory; + + /** + * Store the previously emitted pane (if any) for each window. + * + * <ul> + * <li>State: The previous {@link PaneInfo} passed to the user's {@link DoFn#processElement}, + * if any. + * <li>State style: DIRECT + * <li>Merging: Always keyed by actual window, so does not depend on {@link #activeWindows}. + * Cleared when window is merged away. + * <li>Lifetime: Cleared when trigger is closed or window is garbage collected. + * </ul> + */ + private final PaneInfoTracker paneInfoTracker; + + /** + * Store whether we've seen any elements for a window since the last pane was emitted. 
+ * + * <ul> + * <li>State: Unless DISCARDING_FIRED_PANES, a count of number of elements added so far. + * <li>State style: RENAMED. + * <li>Merging: Counts are summed when windows are merged. + * <li>Lifetime: Cleared when pane fires or window is garbage collected. + * </ul> + */ + private final NonEmptyPanes<K, W> nonEmptyPanes; + + public ReduceFnRunner( + K key, + WindowingStrategy<?, W> windowingStrategy, + StateInternals<K> stateInternals, + TimerInternals timerInternals, + WindowingInternals<?, KV<K, OutputT>> windowingInternals, + Aggregator<Long, Long> droppedDueToClosedWindow, + ReduceFn<K, InputT, OutputT, W> reduceFn, + PipelineOptions options) { + this.key = key; + this.timerInternals = timerInternals; + this.paneInfoTracker = new PaneInfoTracker(timerInternals); + this.stateInternals = stateInternals; + this.outputter = new OutputViaWindowingInternals<>(windowingInternals); + this.droppedDueToClosedWindow = droppedDueToClosedWindow; + this.reduceFn = reduceFn; + + @SuppressWarnings("unchecked") + WindowingStrategy<Object, W> objectWindowingStrategy = + (WindowingStrategy<Object, W>) windowingStrategy; + this.windowingStrategy = objectWindowingStrategy; + + this.nonEmptyPanes = NonEmptyPanes.create(this.windowingStrategy, this.reduceFn); + + // Note this may incur I/O to load persisted window set data. + this.activeWindows = createActiveWindowSet(); + + this.contextFactory = + new ReduceFnContextFactory<K, InputT, OutputT, W>(key, reduceFn, this.windowingStrategy, + stateInternals, this.activeWindows, timerInternals, windowingInternals, options); + + this.watermarkHold = new WatermarkHold<>(timerInternals, windowingStrategy); + this.triggerRunner = + new TriggerRunner<>( + windowingStrategy.getTrigger(), + new TriggerContextFactory<>(windowingStrategy, stateInternals, activeWindows)); + } + + private ActiveWindowSet<W> createActiveWindowSet() { + return windowingStrategy.getWindowFn().isNonMerging() + ? 
new NonMergingActiveWindowSet<W>() + : new MergingActiveWindowSet<W>(windowingStrategy.getWindowFn(), stateInternals); + } + + @VisibleForTesting + boolean isFinished(W window) { + return triggerRunner.isClosed(contextFactory.base(window, StateStyle.DIRECT).state()); + } + + @VisibleForTesting + boolean hasNoActiveWindows() { + return activeWindows.getActiveAndNewWindows().isEmpty(); + } + + /** + * Incorporate {@code values} into the underlying reduce function, and manage holds, timers, + * triggers, and window merging. + * + * <p>The general strategy is: + * <ol> + * <li>Use {@link WindowedValue#getWindows} (itself determined using + * {@link WindowFn#assignWindows}) to determine which windows each element belongs to. Some + * of those windows will already have state associated with them. The rest are considered + * NEW. + * <li>Use {@link WindowFn#mergeWindows} to attempt to merge currently ACTIVE and NEW windows. + * Each NEW window will become either ACTIVE or be discarded. + * (See {@link ActiveWindowSet} for definitions of these terms.) + * <li>If at all possible, eagerly substitute NEW windows with their ACTIVE state address + * windows before any state is associated with the NEW window. In the common case that + * windows for new elements are merged into existing ACTIVE windows then no additional + * storage or merging overhead will be incurred. + * <li>Otherwise, keep track of the state address windows for ACTIVE windows so that their + * states can be merged on-demand when a pane fires. + * <li>Process the element for each of the windows its windows have been merged into according + * to {@link ActiveWindowSet}. Processing may require running triggers, setting timers, + * setting holds, and invoking {@link ReduceFn#onTrigger}. + * </ol> + */ + public void processElements(Iterable<WindowedValue<InputT>> values) throws Exception { + // If an incoming element introduces a new window, attempt to merge it into an existing + // window eagerly. 
+ Map<W, W> windowToMergeResult = collectAndMergeWindows(values); + + Set<W> windowsToConsider = new HashSet<>(); + + // Process each element, using the updated activeWindows determined by collectAndMergeWindows. + for (WindowedValue<InputT> value : values) { + windowsToConsider.addAll(processElement(windowToMergeResult, value)); + } + + // Trigger output from any window for which the trigger is ready + for (W mergedWindow : windowsToConsider) { + ReduceFn<K, InputT, OutputT, W>.Context directContext = + contextFactory.base(mergedWindow, StateStyle.DIRECT); + ReduceFn<K, InputT, OutputT, W>.Context renamedContext = + contextFactory.base(mergedWindow, StateStyle.RENAMED); + triggerRunner.prefetchShouldFire(mergedWindow, directContext.state()); + emitIfAppropriate(directContext, renamedContext); + } + + // We're all done with merging and emitting elements so can compress the activeWindow state. + // Any windows which are still NEW must have come in on a new element which was then discarded + // due to the window's trigger being closed. We can thus delete them. + activeWindows.cleanupTemporaryWindows(); + } + + public void persist() { + activeWindows.persist(); + } + + /** + * Extract the windows associated with the values, and invoke merge. Return a map + * from windows to the merge result window. If a window is not in the domain of + * the result map then it did not get merged into a different window. + */ + private Map<W, W> collectAndMergeWindows(Iterable<WindowedValue<InputT>> values) + throws Exception { + // No-op if no merging can take place + if (windowingStrategy.getWindowFn().isNonMerging()) { + return ImmutableMap.of(); + } + + // Collect the windows from all elements (except those which are too late) and + // make sure they are already in the active window set or are added as NEW windows. 
+ for (WindowedValue<?> value : values) { + for (BoundedWindow untypedWindow : value.getWindows()) { + @SuppressWarnings("unchecked") + W window = (W) untypedWindow; + + // For backwards compat with pre 1.4 only. + // We may still have ACTIVE windows with multiple state addresses, representing + // a window whose state has not yet been eagerly merged. + // We'll go ahead and merge that state now so that we don't have to worry about + // this legacy case anywhere else. + if (activeWindows.isActive(window)) { + Set<W> stateAddressWindows = activeWindows.readStateAddresses(window); + if (stateAddressWindows.size() > 1) { + // This is a legacy window whose state has not been eagerly merged. + // Do that now. + ReduceFn<K, InputT, OutputT, W>.OnMergeContext premergeContext = + contextFactory.forPremerge(window); + reduceFn.onMerge(premergeContext); + watermarkHold.onMerge(premergeContext); + activeWindows.merged(window); + } + } + + // Add this window as NEW if it is not currently ACTIVE. + // If we had already seen this window and closed its trigger, then the + // window will not be currently ACTIVE. It will then be added as NEW here, + // and fall into the merging logic as usual. + activeWindows.ensureWindowExists(window); + } + } + + // Merge all of the active windows and retain a mapping from source windows to result windows. + Map<W, W> windowToMergeResult = new HashMap<>(); + activeWindows.merge(new OnMergeCallback(windowToMergeResult)); + return windowToMergeResult; + } + + private class OnMergeCallback implements ActiveWindowSet.MergeCallback<W> { + private final Map<W, W> windowToMergeResult; + + OnMergeCallback(Map<W, W> windowToMergeResult) { + this.windowToMergeResult = windowToMergeResult; + } + + /** + * Return the subset of {@code windows} which are currently ACTIVE. We only need to worry + * about merging state from ACTIVE windows. NEW windows by definition have no existing state. 
+ */ + private List<W> activeWindows(Iterable<W> windows) { + List<W> active = new ArrayList<>(); + for (W window : windows) { + if (activeWindows.isActive(window)) { + active.add(window); + } + } + return active; + } + + /** + * Called from the active window set to indicate {@code toBeMerged} (of which only + * {@code activeToBeMerged} are ACTIVE and thus have state associated with them) will later + * be merged into {@code mergeResult}. + */ + @Override + public void prefetchOnMerge( + Collection<W> toBeMerged, W mergeResult) throws Exception { + List<W> activeToBeMerged = activeWindows(toBeMerged); + ReduceFn<K, InputT, OutputT, W>.OnMergeContext directMergeContext = + contextFactory.forMerge(activeToBeMerged, mergeResult, StateStyle.DIRECT); + ReduceFn<K, InputT, OutputT, W>.OnMergeContext renamedMergeContext = + contextFactory.forMerge(activeToBeMerged, mergeResult, StateStyle.RENAMED); + + // Prefetch various state. + triggerRunner.prefetchForMerge(mergeResult, activeToBeMerged, directMergeContext.state()); + reduceFn.prefetchOnMerge(renamedMergeContext.state()); + watermarkHold.prefetchOnMerge(renamedMergeContext.state()); + nonEmptyPanes.prefetchOnMerge(renamedMergeContext.state()); + } + + /** + * Called from the active window set to indicate {@code toBeMerged} (of which only + * {@code activeToBeMerged} are ACTIVE and thus have state associated with them) are about + * to be merged into {@code mergeResult}. + */ + @Override + public void onMerge(Collection<W> toBeMerged, W mergeResult) throws Exception { + // Remember we have merged these windows. + for (W window : toBeMerged) { + windowToMergeResult.put(window, mergeResult); + } + + // At this point activeWindows has NOT incorporated the results of the merge. 
+ List<W> activeToBeMerged = activeWindows(toBeMerged); + ReduceFn<K, InputT, OutputT, W>.OnMergeContext directMergeContext = + contextFactory.forMerge(activeToBeMerged, mergeResult, StateStyle.DIRECT); + ReduceFn<K, InputT, OutputT, W>.OnMergeContext renamedMergeContext = + contextFactory.forMerge(activeToBeMerged, mergeResult, StateStyle.RENAMED); + + // Run the reduceFn to perform any needed merging. + reduceFn.onMerge(renamedMergeContext); + + // Merge the watermark holds. + watermarkHold.onMerge(renamedMergeContext); + + // Merge non-empty pane state. + nonEmptyPanes.onMerge(renamedMergeContext.state()); + + // Have the trigger merge state as needed. + triggerRunner.onMerge( + directMergeContext.window(), directMergeContext.timers(), directMergeContext.state()); + + for (W active : activeToBeMerged) { + if (active.equals(mergeResult)) { + // Not merged away. + continue; + } + // Cleanup flavor A: Currently ACTIVE window is about to be merged away. + // Clear any state not already cleared by the onMerge calls above. + WindowTracing.debug("ReduceFnRunner.onMerge: Merging {} into {}", active, mergeResult); + ReduceFn<K, InputT, OutputT, W>.Context directClearContext = + contextFactory.base(active, StateStyle.DIRECT); + // No need for the end-of-window or garbage collection timers. + // We will establish a new end-of-window or garbage collection timer for the mergeResult + // window in processElement below. There must be at least one element for the mergeResult + // window since a new element with a new window must have triggered this onMerge. + cancelEndOfWindowAndGarbageCollectionTimers(directClearContext); + // We no longer care about any previous panes of merged away windows. The + // merge result window gets to start fresh if it is new. + paneInfoTracker.clear(directClearContext.state()); + } + } + } + + /** + * Process an element. 
+ * + * @param value the value being processed + * @return the set of windows in which the element was actually processed + */ + private Collection<W> processElement(Map<W, W> windowToMergeResult, WindowedValue<InputT> value) + throws Exception { + // Redirect element windows to the ACTIVE windows they have been merged into. + // The compressed representation (value, {window1, window2, ...}) actually represents + // distinct elements (value, window1), (value, window2), ... + // so if window1 and window2 merge, the resulting window will contain both copies + // of the value. + Collection<W> windows = new ArrayList<>(); + for (BoundedWindow untypedWindow : value.getWindows()) { + @SuppressWarnings("unchecked") + W window = (W) untypedWindow; + W mergeResult = windowToMergeResult.get(window); + if (mergeResult == null) { + mergeResult = window; + } + windows.add(mergeResult); + } + + // Prefetch in each of the windows if we're going to need to process triggers + for (W window : windows) { + ReduceFn<K, InputT, OutputT, W>.ProcessValueContext directContext = contextFactory.forValue( + window, value.getValue(), value.getTimestamp(), StateStyle.DIRECT); + triggerRunner.prefetchForValue(window, directContext.state()); + } + + // Process the element for each (mergeResultWindow, not closed) window it belongs to. + List<W> triggerableWindows = new ArrayList<>(windows.size()); + for (W window : windows) { + ReduceFn<K, InputT, OutputT, W>.ProcessValueContext directContext = contextFactory.forValue( + window, value.getValue(), value.getTimestamp(), StateStyle.DIRECT); + if (triggerRunner.isClosed(directContext.state())) { + // This window has already been closed. 
+ droppedDueToClosedWindow.addValue(1L); + WindowTracing.debug( + "ReduceFnRunner.processElement: Dropping element at {} for key:{}; window:{} " + + "since window is no longer active at inputWatermark:{}; outputWatermark:{}", + value.getTimestamp(), key, window, timerInternals.currentInputWatermarkTime(), + timerInternals.currentOutputWatermarkTime()); + continue; + } + + triggerableWindows.add(window); + activeWindows.ensureWindowIsActive(window); + ReduceFn<K, InputT, OutputT, W>.ProcessValueContext renamedContext = contextFactory.forValue( + window, value.getValue(), value.getTimestamp(), StateStyle.RENAMED); + + nonEmptyPanes.recordContent(renamedContext.state()); + + // Make sure we've scheduled the end-of-window or garbage collection timer for this window. + Instant timer = scheduleEndOfWindowOrGarbageCollectionTimer(directContext); + + // Hold back progress of the output watermark until we have processed the pane this + // element will be included within. If the element is too late for that, place a hold at + // the end-of-window or garbage collection time to allow empty panes to contribute elements + // which won't be dropped due to lateness by a following computation (assuming the following + // computation uses the same allowed lateness value...) + @Nullable Instant hold = watermarkHold.addHolds(renamedContext); + + if (hold != null) { + // Assert that holds have a proximate timer. 
+ boolean holdInWindow = !hold.isAfter(window.maxTimestamp()); + boolean timerInWindow = !timer.isAfter(window.maxTimestamp()); + Preconditions.checkState( + holdInWindow == timerInWindow, + "set a hold at %s, a timer at %s, which disagree as to whether they are in window %s", + hold, + timer, + directContext.window()); + } + + // Execute the reduceFn, which will buffer the value as appropriate + reduceFn.processValue(renamedContext); + + // Run the trigger to update its state + triggerRunner.processValue( + directContext.window(), + directContext.timestamp(), + directContext.timers(), + directContext.state()); + + // At this point, if triggerRunner.shouldFire before the processValue then + // triggerRunner.shouldFire after the processValue. In other words adding values + // cannot take a trigger state from firing to non-firing. + // (We don't actually assert this since it is too slow.) + } + + return triggerableWindows; + } + + /** + * Called when an end-of-window, garbage collection, or trigger-specific timer fires. + */ + public void onTimer(TimerData timer) throws Exception { + // Which window is the timer for? + Preconditions.checkArgument(timer.getNamespace() instanceof WindowNamespace, + "Expected timer to be in WindowNamespace, but was in %s", timer.getNamespace()); + @SuppressWarnings("unchecked") + WindowNamespace<W> windowNamespace = (WindowNamespace<W>) timer.getNamespace(); + W window = windowNamespace.getWindow(); + ReduceFn<K, InputT, OutputT, W>.Context directContext = + contextFactory.base(window, StateStyle.DIRECT); + ReduceFn<K, InputT, OutputT, W>.Context renamedContext = + contextFactory.base(window, StateStyle.RENAMED); + + // Has this window had its trigger finish? + // - The trigger may implement isClosed as constant false. + // - If the window function does not support windowing then all windows will be considered + // active. + // So we must take conjunction of activeWindows and triggerRunner state. 
+ boolean windowIsActiveAndOpen = + activeWindows.isActive(window) && !triggerRunner.isClosed(directContext.state()); + + if (!windowIsActiveAndOpen) { + WindowTracing.debug( + "ReduceFnRunner.onTimer: Note that timer {} is for non-ACTIVE window {}", timer, window); + } + + // If this is an end-of-window timer then we may need to set a garbage collection timer + // if allowed lateness is non-zero. + boolean isEndOfWindow = TimeDomain.EVENT_TIME == timer.getDomain() + && timer.getTimestamp().equals(window.maxTimestamp()); + + // If this is a garbage collection timer then we should trigger and garbage collect the window. + // We'll consider any timer at or after the end-of-window time to be a signal to garbage + // collect. + Instant cleanupTime = garbageCollectionTime(window); + boolean isGarbageCollection = TimeDomain.EVENT_TIME == timer.getDomain() + && !timer.getTimestamp().isBefore(cleanupTime); + + if (isGarbageCollection) { + WindowTracing.debug( + "ReduceFnRunner.onTimer: Cleaning up for key:{}; window:{} at {} with " + + "inputWatermark:{}; outputWatermark:{}", + key, window, timer.getTimestamp(), timerInternals.currentInputWatermarkTime(), + timerInternals.currentOutputWatermarkTime()); + + if (windowIsActiveAndOpen) { + // We need to call onTrigger to emit the final pane if required. + // The final pane *may* be ON_TIME if no prior ON_TIME pane has been emitted, + // and the watermark has passed the end of the window. + @Nullable Instant newHold = + onTrigger(directContext, renamedContext, true/* isFinished */, isEndOfWindow); + Preconditions.checkState(newHold == null, + "Hold placed at %s despite isFinished being true.", newHold); + } + + // Cleanup flavor B: Clear all the remaining state for this window since we'll never + // see elements for it again. 
+ clearAllState(directContext, renamedContext, windowIsActiveAndOpen); + } else { + WindowTracing.debug( + "ReduceFnRunner.onTimer: Triggering for key:{}; window:{} at {} with " + + "inputWatermark:{}; outputWatermark:{}", + key, window, timer.getTimestamp(), timerInternals.currentInputWatermarkTime(), + timerInternals.currentOutputWatermarkTime()); + if (windowIsActiveAndOpen) { + emitIfAppropriate(directContext, renamedContext); + } + + if (isEndOfWindow) { + // If the window strategy trigger includes a watermark trigger then at this point + // there should be no data holds, either because we'd already cleared them on an + // earlier onTrigger, or because we just cleared them on the above emitIfAppropriate. + // We could assert this but it is very expensive. + + // Since we are processing an on-time firing we should schedule the garbage collection + // timer. (If getAllowedLateness is zero then the timer event will be considered a + // cleanup event and handled by the above). + // Note we must do this even if the trigger is finished so that we are sure to cleanup + // any final trigger finished bits. + Preconditions.checkState( + windowingStrategy.getAllowedLateness().isLongerThan(Duration.ZERO), + "Unexpected zero getAllowedLateness"); + WindowTracing.debug( + "ReduceFnRunner.onTimer: Scheduling cleanup timer for key:{}; window:{} at {} with " + + "inputWatermark:{}; outputWatermark:{}", + key, directContext.window(), cleanupTime, timerInternals.currentInputWatermarkTime(), + timerInternals.currentOutputWatermarkTime()); + Preconditions.checkState(!cleanupTime.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE), + "Cleanup time %s is beyond end-of-time", cleanupTime); + directContext.timers().setTimer(cleanupTime, TimeDomain.EVENT_TIME); + } + } + } + + /** + * Clear all the state associated with {@code context}'s window. + * Should only be invoked if we know all future elements for this window will be considered + * beyond allowed lateness. 
+ * This is a superset of the clearing done by {@link #emitIfAppropriate} below since: + * <ol> + * <li>We can clear the trigger finished bits since we'll never need to ask if the trigger is + * closed again. + * <li>We can clear any remaining garbage collection hold. + * </ol> + */ + private void clearAllState( + ReduceFn<K, InputT, OutputT, W>.Context directContext, + ReduceFn<K, InputT, OutputT, W>.Context renamedContext, + boolean windowIsActiveAndOpen) + throws Exception { + if (windowIsActiveAndOpen) { + // Since both the window is in the active window set AND the trigger was not yet closed, + // it is possible we still have state. + reduceFn.clearState(renamedContext); + watermarkHold.clearHolds(renamedContext); + nonEmptyPanes.clearPane(renamedContext.state()); + // These calls work irrespective of whether the window is active or not, but + // are unnecessary if the window is not active. + triggerRunner.clearState( + directContext.window(), directContext.timers(), directContext.state()); + paneInfoTracker.clear(directContext.state()); + } else { + // If !windowIsActiveAndOpen then !activeWindows.isActive (1) or triggerRunner.isClosed (2). + // For (1), if !activeWindows.isActive then the window must be merging and has been + // explicitly removed by emitIfAppropriate. But in that case the trigger must have fired + // and been closed, so this case reduces to (2). + // For (2), if triggerRunner.isClosed then the trigger was fired and entered the + // closed state. In that case emitIfAppropriate will have cleared all state in + // reduceFn, triggerRunner (except for finished bits), paneInfoTracker and activeWindows. + // We also know nonEmptyPanes must have been unconditionally cleared by the trigger. + // Since the trigger fired the existing watermark holds must have been cleared, and since + // the trigger closed no new end of window or garbage collection hold will have been + // placed by WatermarkHold.extractAndRelease. 
+ // Thus all the state clearing above is unnecessary. + // + // But(!) for backwards compatibility we must allow a pipeline to be updated from + // an sdk version <= 1.3. In that case it is possible we have an end-of-window or + // garbage collection hold keyed by the current window (reached via directContext) rather + // than the state address window (reached via renamedContext). + // However this can only happen if: + // - We have merging windows. + // - We are DISCARDING_FIRED_PANES. + // - A pane has fired. + // - But the trigger is not (yet) closed. + if (windowingStrategy.getMode() == AccumulationMode.DISCARDING_FIRED_PANES + && !windowingStrategy.getWindowFn().isNonMerging()) { + watermarkHold.clearHolds(directContext); + } + } + + // Don't need to track address state windows anymore. + activeWindows.remove(directContext.window()); + // We'll never need to test for the trigger being closed again. + triggerRunner.clearFinished(directContext.state()); + } + + /** Should the reduce function state be cleared? */ + private boolean shouldDiscardAfterFiring(boolean isFinished) { + if (isFinished) { + // This is the last firing for trigger. + return true; + } + if (windowingStrategy.getMode() == AccumulationMode.DISCARDING_FIRED_PANES) { + // Nothing should be accumulated between panes. + return true; + } + return false; + } + + /** + * Possibly emit a pane if a trigger is ready to fire or timers require it, and cleanup state. 
+ */ + private void emitIfAppropriate(ReduceFn<K, InputT, OutputT, W>.Context directContext, + ReduceFn<K, InputT, OutputT, W>.Context renamedContext) + throws Exception { + if (!triggerRunner.shouldFire( + directContext.window(), directContext.timers(), directContext.state())) { + // Ignore unless trigger is ready to fire + return; + } + + // Inform the trigger of the transition to see if it is finished + triggerRunner.onFire(directContext.window(), directContext.timers(), directContext.state()); + boolean isFinished = triggerRunner.isClosed(directContext.state()); + + // Will be able to clear all element state after triggering? + boolean shouldDiscard = shouldDiscardAfterFiring(isFinished); + + // Run onTrigger to produce the actual pane contents. + // As a side effect it will clear all element holds, but not necessarily any + // end-of-window or garbage collection holds. + onTrigger(directContext, renamedContext, isFinished, false /*isEndOfWindow*/); + + // Now that we've triggered, the pane is empty. + nonEmptyPanes.clearPane(renamedContext.state()); + + // Cleanup buffered data if appropriate + if (shouldDiscard) { + // Cleanup flavor C: The user does not want any buffered data to persist between panes. + reduceFn.clearState(renamedContext); + } + + if (isFinished) { + // Cleanup flavor D: If trigger is closed we will ignore all new incoming elements. + // Clear state not otherwise cleared by onTrigger and clearPane above. + // Remember the trigger is, indeed, closed until the window is garbage collected. + triggerRunner.clearState( + directContext.window(), directContext.timers(), directContext.state()); + paneInfoTracker.clear(directContext.state()); + activeWindows.remove(directContext.window()); + } + } + + /** + * Do we need to emit a pane? + */ + private boolean needToEmit(boolean isEmpty, boolean isFinished, PaneInfo.Timing timing) { + if (!isEmpty) { + // The pane has elements. 
+ return true; + } + if (timing == Timing.ON_TIME) { + // This is the unique ON_TIME pane. + return true; + } + if (isFinished && windowingStrategy.getClosingBehavior() == ClosingBehavior.FIRE_ALWAYS) { + // This is known to be the final pane, and the user has requested it even when empty. + return true; + } + return false; + } + + /** + * Run the {@link ReduceFn#onTrigger} method and produce any necessary output. + * + * @return output watermark hold added, or {@literal null} if none. + */ + @Nullable + private Instant onTrigger( + final ReduceFn<K, InputT, OutputT, W>.Context directContext, + ReduceFn<K, InputT, OutputT, W>.Context renamedContext, + boolean isFinished, boolean isEndOfWindow) + throws Exception { + Instant inputWM = timerInternals.currentInputWatermarkTime(); + + // Prefetch necessary states + ReadableState<WatermarkHold.OldAndNewHolds> outputTimestampFuture = + watermarkHold.extractAndRelease(renamedContext, isFinished).readLater(); + ReadableState<PaneInfo> paneFuture = + paneInfoTracker.getNextPaneInfo(directContext, isFinished).readLater(); + ReadableState<Boolean> isEmptyFuture = + nonEmptyPanes.isEmpty(renamedContext.state()).readLater(); + + reduceFn.prefetchOnTrigger(directContext.state()); + triggerRunner.prefetchOnFire(directContext.window(), directContext.state()); + + // Calculate the pane info. + final PaneInfo pane = paneFuture.read(); + // Extract the window hold, and as a side effect clear it. + + WatermarkHold.OldAndNewHolds pair = outputTimestampFuture.read(); + final Instant outputTimestamp = pair.oldHold; + @Nullable Instant newHold = pair.newHold; + + if (newHold != null) { + // We can't be finished yet. + Preconditions.checkState( + !isFinished, "new hold at %s but finished %s", newHold, directContext.window()); + // The hold cannot be behind the input watermark. 
+ Preconditions.checkState( + !newHold.isBefore(inputWM), "new hold %s is before input watermark %s", newHold, inputWM); + if (newHold.isAfter(directContext.window().maxTimestamp())) { + // The hold must be for garbage collection, which can't have happened yet. + Preconditions.checkState( + newHold.isEqual(garbageCollectionTime(directContext.window())), + "new hold %s should be at garbage collection for window %s plus %s", + newHold, + directContext.window(), + windowingStrategy.getAllowedLateness()); + } else { + // The hold must be for the end-of-window, which can't have happened yet. + Preconditions.checkState( + newHold.isEqual(directContext.window().maxTimestamp()), + "new hold %s should be at end of window %s", + newHold, + directContext.window()); + Preconditions.checkState( + !isEndOfWindow, + "new hold at %s for %s but this is the watermark trigger", + newHold, + directContext.window()); + } + } + + // Only emit a pane if it has data or empty panes are observable. + if (needToEmit(isEmptyFuture.read(), isFinished, pane.getTiming())) { + // Run reduceFn.onTrigger method. + final List<W> windows = Collections.singletonList(directContext.window()); + ReduceFn<K, InputT, OutputT, W>.OnTriggerContext renamedTriggerContext = + contextFactory.forTrigger(directContext.window(), paneFuture, StateStyle.RENAMED, + new OnTriggerCallbacks<OutputT>() { + @Override + public void output(OutputT toOutput) { + // We're going to output panes, so commit the (now used) PaneInfo. + // TODO: This is unnecessary if the trigger isFinished since the saved + // state will be immediately deleted. + paneInfoTracker.storeCurrentPaneInfo(directContext, pane); + + // Output the actual value. + outputter.outputWindowedValue( + KV.of(key, toOutput), outputTimestamp, windows, pane); + } + }); + + reduceFn.onTrigger(renamedTriggerContext); + } + + return newHold; + } + + /** + * Make sure we'll eventually have a timer fire which will tell us to garbage collect + * the window state. 
For efficiency we may need to do this in two steps rather + * than one. Return the time at which the timer will fire. + * + * <ul> + * <li>If allowedLateness is zero then we'll garbage collect at the end of the window. + * For simplicity we'll set our own timer for this situation even though an + * {@link AfterWatermark} trigger may have also set an end-of-window timer. + * ({@code setTimer} is idempotent.) + * <li>If allowedLateness is non-zero then we could just always set a timer for the garbage + * collection time. However if the windows are large (eg hourly) and the allowedLateness is small + * (eg seconds) then we'll end up with nearly twice the number of timers in-flight. So we + * instead set an end-of-window timer and then roll that forward to a garbage collection timer + * when it fires. We use the input watermark to distinguish those cases. + * </ul> + */ + private Instant scheduleEndOfWindowOrGarbageCollectionTimer( + ReduceFn<?, ?, ?, W>.Context directContext) { + Instant inputWM = timerInternals.currentInputWatermarkTime(); + Instant endOfWindow = directContext.window().maxTimestamp(); + String which; + Instant timer; + if (endOfWindow.isBefore(inputWM)) { + timer = garbageCollectionTime(directContext.window()); + which = "garbage collection"; + } else { + timer = endOfWindow; + which = "end-of-window"; + } + WindowTracing.trace( + "ReduceFnRunner.scheduleEndOfWindowOrGarbageCollectionTimer: Scheduling {} timer at {} for " + + "key:{}; window:{} where inputWatermark:{}; outputWatermark:{}", + which, + timer, + key, + directContext.window(), + inputWM, + timerInternals.currentOutputWatermarkTime()); + Preconditions.checkState(!timer.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE), + "Timer %s is beyond end-of-time", timer); + directContext.timers().setTimer(timer, TimeDomain.EVENT_TIME); + return timer; + } + + /** + * Delete the end-of-window timer and, when it is scheduled at a later time, the garbage + * collection timer for {@code directContext}'s window. + */ + private void cancelEndOfWindowAndGarbageCollectionTimers( + ReduceFn<?, ?, ?, W>.Context directContext) { + WindowTracing.debug( + 
"ReduceFnRunner.cancelEndOfWindowAndGarbageCollectionTimers: Deleting timers for " + + "key:{}; window:{} where inputWatermark:{}; outputWatermark:{}", + key, directContext.window(), timerInternals.currentInputWatermarkTime(), + timerInternals.currentOutputWatermarkTime()); + Instant eow = directContext.window().maxTimestamp(); + directContext.timers().deleteTimer(eow, TimeDomain.EVENT_TIME); + Instant gc = garbageCollectionTime(directContext.window()); + if (gc.isAfter(eow)) { + // The garbage collection timer is distinct from the end-of-window timer only when it is + // later than the end of the window, so it must be deleted at gc rather than at eow again. + directContext.timers().deleteTimer(gc, TimeDomain.EVENT_TIME); + } + } + + /** + * Return when {@code window} should be garbage collected. If the window's expiration time is on + * or after the end of the global window, it will be truncated to the end of the global window. + */ + private Instant garbageCollectionTime(W window) { + + // If the end of the window + allowed lateness is beyond the "end of time" aka the end of the + // global window, then we truncate it. The conditional is phrased like it is because the + // addition of EOW + allowed lateness might even overflow the maximum allowed Instant + if (GlobalWindow.INSTANCE + .maxTimestamp() + .minus(windowingStrategy.getAllowedLateness()) + .isBefore(window.maxTimestamp())) { + return GlobalWindow.INSTANCE.maxTimestamp(); + } else { + return window.maxTimestamp().plus(windowingStrategy.getAllowedLateness()); + } + } + + /** + * An object that can output a value with all of its windowing information. This is a deliberately + * restricted subinterface of {@link WindowingInternals} to express how it is used here. + */ + private interface OutputWindowedValue<OutputT> { + void outputWindowedValue(OutputT output, Instant timestamp, + Collection<? 
extends BoundedWindow> windows, PaneInfo pane); + } + + private static class OutputViaWindowingInternals<OutputT> + implements OutputWindowedValue<OutputT> { + + private final WindowingInternals<?, OutputT> windowingInternals; + + public OutputViaWindowingInternals(WindowingInternals<?, OutputT> windowingInternals) { + this.windowingInternals = windowingInternals; + } + + @Override + public void outputWindowedValue( + OutputT output, + Instant timestamp, + Collection<? extends BoundedWindow> windows, + PaneInfo pane) { + windowingInternals.outputWindowedValue(output, timestamp, windows, pane); + } + + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/runners/core-java/src/main/java/org/apache/beam/sdk/util/SimpleDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/SimpleDoFnRunner.java new file mode 100644 index 0000000..78377c8 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/SimpleDoFnRunner.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.util; + +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.util.DoFnRunners.OutputManager; +import org.apache.beam.sdk.util.ExecutionContext.StepContext; +import org.apache.beam.sdk.util.common.CounterSet.AddCounterMutator; +import org.apache.beam.sdk.values.TupleTag; + +import java.util.List; + +/** + * Runs a {@link DoFn} by constructing the appropriate contexts and passing them in. + * + * @param <InputT> the type of the DoFn's (main) input elements + * @param <OutputT> the type of the DoFn's (main) output elements + */ +public class SimpleDoFnRunner<InputT, OutputT> extends DoFnRunnerBase<InputT, OutputT>{ + + protected SimpleDoFnRunner(PipelineOptions options, DoFn<InputT, OutputT> fn, + SideInputReader sideInputReader, + OutputManager outputManager, + TupleTag<OutputT> mainOutputTag, List<TupleTag<?>> sideOutputTags, StepContext stepContext, + AddCounterMutator addCounterMutator, WindowingStrategy<?, ?> windowingStrategy) { + super(options, fn, sideInputReader, outputManager, mainOutputTag, sideOutputTags, stepContext, + addCounterMutator, windowingStrategy); + } + + @Override + protected void invokeProcessElement(WindowedValue<InputT> elem) { + final DoFn<InputT, OutputT>.ProcessContext processContext = createProcessContext(elem); + // This can contain user code. Wrap it in case it throws an exception. 
+ try { + fn.processElement(processContext); + } catch (Exception ex) { + throw wrapUserCodeException(ex); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/runners/core-java/src/main/java/org/apache/beam/sdk/util/SystemReduceFn.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/SystemReduceFn.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/SystemReduceFn.java new file mode 100644 index 0000000..2eeee54 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/SystemReduceFn.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.util; + + +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.Combine.CombineFn; +import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn; +import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.state.AccumulatorCombiningState; +import org.apache.beam.sdk.util.state.BagState; +import org.apache.beam.sdk.util.state.CombiningState; +import org.apache.beam.sdk.util.state.MergingStateAccessor; +import org.apache.beam.sdk.util.state.ReadableState; +import org.apache.beam.sdk.util.state.StateAccessor; +import org.apache.beam.sdk.util.state.StateMerging; +import org.apache.beam.sdk.util.state.StateTag; +import org.apache.beam.sdk.util.state.StateTags; + +/** + * {@link ReduceFn} implementing the default reduction behaviors of {@link GroupByKey}. + * + * @param <K> The type of key being processed. + * @param <InputT> The type of values associated with the key. + * @param <OutputT> The output type that will be produced for each key. + * @param <W> The type of windows this operates on. + */ +public abstract class SystemReduceFn<K, InputT, AccumT, OutputT, W extends BoundedWindow> + extends ReduceFn<K, InputT, OutputT, W> { + private static final String BUFFER_NAME = "buf"; + + /** + * Create a factory that produces {@link SystemReduceFn} instances that that buffer all of the + * input values in persistent state and produces an {@code Iterable<T>}. 
+ */ + public static <K, T, W extends BoundedWindow> SystemReduceFn<K, T, Iterable<T>, Iterable<T>, W> + buffering(final Coder<T> inputCoder) { + final StateTag<Object, BagState<T>> bufferTag = + StateTags.makeSystemTagInternal(StateTags.bag(BUFFER_NAME, inputCoder)); + return new SystemReduceFn<K, T, Iterable<T>, Iterable<T>, W>(bufferTag) { + @Override + public void prefetchOnMerge(MergingStateAccessor<K, W> state) throws Exception { + StateMerging.prefetchBags(state, bufferTag); + } + + @Override + public void onMerge(OnMergeContext c) throws Exception { + StateMerging.mergeBags(c.state(), bufferTag); + } + }; + } + + /** + * Create a factory that produces {@link SystemReduceFn} instances that combine all of the input + * values using a {@link CombineFn}. + */ + public static <K, InputT, AccumT, OutputT, W extends BoundedWindow> SystemReduceFn<K, InputT, + AccumT, OutputT, W> + combining( + final Coder<K> keyCoder, final AppliedCombineFn<K, InputT, AccumT, OutputT> combineFn) { + final StateTag<K, AccumulatorCombiningState<InputT, AccumT, OutputT>> bufferTag; + if (combineFn.getFn() instanceof KeyedCombineFnWithContext) { + bufferTag = StateTags.makeSystemTagInternal( + StateTags.<K, InputT, AccumT, OutputT>keyedCombiningValueWithContext( + BUFFER_NAME, combineFn.getAccumulatorCoder(), + (KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>) combineFn.getFn())); + + } else { + bufferTag = StateTags.makeSystemTagInternal( + StateTags.<K, InputT, AccumT, OutputT>keyedCombiningValue( + BUFFER_NAME, combineFn.getAccumulatorCoder(), + (KeyedCombineFn<K, InputT, AccumT, OutputT>) combineFn.getFn())); + } + return new SystemReduceFn<K, InputT, AccumT, OutputT, W>(bufferTag) { + @Override + public void prefetchOnMerge(MergingStateAccessor<K, W> state) throws Exception { + StateMerging.prefetchCombiningValues(state, bufferTag); + } + + @Override + public void onMerge(OnMergeContext c) throws Exception { + StateMerging.mergeCombiningValues(c.state(), bufferTag); + } + }; 
+ } + + private StateTag<? super K, ? extends CombiningState<InputT, OutputT>> bufferTag; + + public SystemReduceFn( + StateTag<? super K, ? extends CombiningState<InputT, OutputT>> bufferTag) { + this.bufferTag = bufferTag; + } + + @Override + public void processValue(ProcessValueContext c) throws Exception { + c.state().access(bufferTag).add(c.value()); + } + + @Override + public void prefetchOnTrigger(StateAccessor<K> state) { + state.access(bufferTag).readLater(); + } + + @Override + public void onTrigger(OnTriggerContext c) throws Exception { + c.output(c.state().access(bufferTag).read()); + } + + @Override + public void clearState(Context c) throws Exception { + c.state().access(bufferTag).clear(); + } + + @Override + public ReadableState<Boolean> isEmpty(StateAccessor<K> state) { + return state.access(bufferTag).isEmpty(); + } +}