Move StateMerging to runners/core-java
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9c0047f7 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9c0047f7 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9c0047f7 Branch: refs/heads/master Commit: 9c0047f74635319e28813dd992d5aa3923dac713 Parents: 459a8f8 Author: Kenneth Knowles <k...@google.com> Authored: Fri Feb 3 19:40:46 2017 -0800 Committer: Kenneth Knowles <k...@google.com> Committed: Tue Feb 7 11:42:54 2017 -0800 ---------------------------------------------------------------------- .../utils/ApexStateInternalsTest.java | 2 +- .../apache/beam/runners/core/NonEmptyPanes.java | 1 - .../apache/beam/runners/core/StateMerging.java | 267 +++++++++++++++++++ .../beam/runners/core/SystemReduceFn.java | 1 - .../apache/beam/runners/core/WatermarkHold.java | 1 - .../AfterDelayFromFirstElementStateMachine.java | 2 +- .../core/triggers/AfterPaneStateMachine.java | 2 +- .../core/InMemoryStateInternalsTest.java | 1 - .../streaming/FlinkStateInternalsTest.java | 2 +- .../src/main/resources/beam/findbugs-filter.xml | 2 +- .../beam/sdk/util/state/StateMerging.java | 259 ------------------ 11 files changed, 272 insertions(+), 268 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/9c0047f7/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java index 75f648b..d6a4515 100644 --- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java @@ -24,6 +24,7 @@ import static org.junit.Assert.assertThat; import com.datatorrent.lib.util.KryoCloneUtils; import java.util.Arrays; +import org.apache.beam.runners.core.StateMerging; import org.apache.beam.runners.core.StateNamespace; import org.apache.beam.runners.core.StateNamespaceForTest; import org.apache.beam.sdk.coders.StringUtf8Coder; @@ -36,7 +37,6 @@ import org.apache.beam.sdk.util.state.AccumulatorCombiningState; import org.apache.beam.sdk.util.state.BagState; import org.apache.beam.sdk.util.state.CombiningState; import org.apache.beam.sdk.util.state.ReadableState; -import org.apache.beam.sdk.util.state.StateMerging; import org.apache.beam.sdk.util.state.StateTag; import org.apache.beam.sdk.util.state.StateTags; import org.apache.beam.sdk.util.state.ValueState; http://git-wip-us.apache.org/repos/asf/beam/blob/9c0047f7/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java index 0a6fd93..4d67c66 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java @@ -26,7 +26,6 @@ import org.apache.beam.sdk.util.state.AccumulatorCombiningState; import org.apache.beam.sdk.util.state.MergingStateAccessor; import org.apache.beam.sdk.util.state.ReadableState; import org.apache.beam.sdk.util.state.StateAccessor; -import org.apache.beam.sdk.util.state.StateMerging; import org.apache.beam.sdk.util.state.StateTag; import org.apache.beam.sdk.util.state.StateTags; http://git-wip-us.apache.org/repos/asf/beam/blob/9c0047f7/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java new file mode 100644 index 0000000..c85458c --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core; + +import static com.google.common.base.Preconditions.checkState; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.state.AccumulatorCombiningState; +import org.apache.beam.sdk.util.state.BagState; +import org.apache.beam.sdk.util.state.CombiningState; +import org.apache.beam.sdk.util.state.MergingStateAccessor; +import org.apache.beam.sdk.util.state.ReadableState; +import org.apache.beam.sdk.util.state.State; +import org.apache.beam.sdk.util.state.StateTag; +import org.apache.beam.sdk.util.state.WatermarkHoldState; +import org.joda.time.Instant; + +/** + * Helpers for merging state. + */ +public class StateMerging { + /** + * Clear all state in {@code address} in all windows under merge (even result windows) + * in {@code context}. + */ + public static <K, StateT extends State, W extends BoundedWindow> void clear( + MergingStateAccessor<K, W> context, StateTag<? super K, StateT> address) { + for (StateT state : context.accessInEachMergingWindow(address).values()) { + state.clear(); + } + } + + /** + * Prefetch all bag state in {@code address} across all windows under merge in + * {@code context}, except for the bag state in the final state address window which we can + * blindly append to. + */ + public static <K, T, W extends BoundedWindow> void prefetchBags( + MergingStateAccessor<K, W> context, StateTag<? super K, BagState<T>> address) { + Map<W, BagState<T>> map = context.accessInEachMergingWindow(address); + if (map.isEmpty()) { + // Nothing to prefetch. + return; + } + BagState<T> result = context.access(address); + // Prefetch everything except what's already in result. + for (BagState<T> source : map.values()) { + if (!source.equals(result)) { + prefetchRead(source); + } + } + } + + /** + * Merge all bag state in {@code address} across all windows under merge. + */ + public static <K, T, W extends BoundedWindow> void mergeBags( + MergingStateAccessor<K, W> context, StateTag<? super K, BagState<T>> address) { + mergeBags(context.accessInEachMergingWindow(address).values(), context.access(address)); + } + + /** + * Merge all bag state in {@code sources} (which may include {@code result}) into {@code result}. + */ + public static <T, W extends BoundedWindow> void mergeBags( + Collection<BagState<T>> sources, BagState<T> result) { + if (sources.isEmpty()) { + // Nothing to merge. + return; + } + // Prefetch everything except what's already in result. + List<ReadableState<Iterable<T>>> futures = new ArrayList<>(sources.size()); + for (BagState<T> source : sources) { + if (!source.equals(result)) { + prefetchRead(source); + futures.add(source); + } + } + if (futures.isEmpty()) { + // Result already holds all the values. + return; + } + // Transfer from sources to result. + for (ReadableState<Iterable<T>> future : futures) { + for (T element : future.read()) { + result.add(element); + } + } + // Clear sources except for result. + for (BagState<T> source : sources) { + if (!source.equals(result)) { + source.clear(); + } + } + } + + /** + * Prefetch all combining value state for {@code address} across all merging windows in {@code + * context}. + */ + public static <K, StateT extends CombiningState<?, ?>, W extends BoundedWindow> void + prefetchCombiningValues(MergingStateAccessor<K, W> context, + StateTag<? super K, StateT> address) { + for (StateT state : context.accessInEachMergingWindow(address).values()) { + prefetchRead(state); + } + } + + /** + * Merge all value state in {@code address} across all merging windows in {@code context}. + */ + public static <K, InputT, AccumT, OutputT, W extends BoundedWindow> void mergeCombiningValues( + MergingStateAccessor<K, W> context, + StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address) { + mergeCombiningValues( + context.accessInEachMergingWindow(address).values(), context.access(address)); + } + + /** + * Merge all value state from {@code sources} (which may include {@code result}) into + * {@code result}. + */ + public static <InputT, AccumT, OutputT, W extends BoundedWindow> void mergeCombiningValues( + Collection<AccumulatorCombiningState<InputT, AccumT, OutputT>> sources, + AccumulatorCombiningState<InputT, AccumT, OutputT> result) { + if (sources.isEmpty()) { + // Nothing to merge. + return; + } + if (sources.size() == 1 && sources.contains(result)) { + // Result already holds combined value. + return; + } + // Prefetch. + List<ReadableState<AccumT>> futures = new ArrayList<>(sources.size()); + for (AccumulatorCombiningState<InputT, AccumT, OutputT> source : sources) { + prefetchRead(source); + } + // Read. + List<AccumT> accumulators = new ArrayList<>(futures.size()); + for (AccumulatorCombiningState<InputT, AccumT, OutputT> source : sources) { + accumulators.add(source.getAccum()); + } + // Merge (possibly update and return one of the existing accumulators). + AccumT merged = result.mergeAccumulators(accumulators); + // Clear sources. + for (AccumulatorCombiningState<InputT, AccumT, OutputT> source : sources) { + source.clear(); + } + // Update result. + result.addAccum(merged); + } + + /** + * Prefetch all watermark state for {@code address} across all merging windows in + * {@code context}. + */ + public static <K, W extends BoundedWindow> void prefetchWatermarks( + MergingStateAccessor<K, W> context, + StateTag<? super K, WatermarkHoldState<W>> address) { + Map<W, WatermarkHoldState<W>> map = context.accessInEachMergingWindow(address); + WatermarkHoldState<W> result = context.access(address); + if (map.isEmpty()) { + // Nothing to prefetch. + return; + } + if (map.size() == 1 && map.values().contains(result) + && result.getOutputTimeFn().dependsOnlyOnEarliestInputTimestamp()) { + // Nothing to change. + return; + } + if (result.getOutputTimeFn().dependsOnlyOnWindow()) { + // No need to read existing holds. + return; + } + // Prefetch. + for (WatermarkHoldState<W> source : map.values()) { + prefetchRead(source); + } + } + + private static void prefetchRead(ReadableState<?> source) { + source.readLater(); + } + + /** + * Merge all watermark state in {@code address} across all merging windows in {@code context}, + * where the final merge result window is {@code mergeResult}. + */ + public static <K, W extends BoundedWindow> void mergeWatermarks( + MergingStateAccessor<K, W> context, + StateTag<? super K, WatermarkHoldState<W>> address, + W mergeResult) { + mergeWatermarks( + context.accessInEachMergingWindow(address).values(), context.access(address), mergeResult); + } + + /** + * Merge all watermark state in {@code sources} (which must include {@code result} if non-empty) + * into {@code result}, where the final merge result window is {@code mergeResult}. + */ + public static <W extends BoundedWindow> void mergeWatermarks( + Collection<WatermarkHoldState<W>> sources, WatermarkHoldState<W> result, + W resultWindow) { + if (sources.isEmpty()) { + // Nothing to merge. + return; + } + if (sources.size() == 1 && sources.contains(result) + && result.getOutputTimeFn().dependsOnlyOnEarliestInputTimestamp()) { + // Nothing to merge. + return; + } + if (result.getOutputTimeFn().dependsOnlyOnWindow()) { + // Clear sources. + for (WatermarkHoldState<W> source : sources) { + source.clear(); + } + // Update directly from window-derived hold. + Instant hold = result.getOutputTimeFn().assignOutputTime( + BoundedWindow.TIMESTAMP_MIN_VALUE, resultWindow); + checkState(hold.isAfter(BoundedWindow.TIMESTAMP_MIN_VALUE)); + result.add(hold); + } else { + // Prefetch. + List<ReadableState<Instant>> futures = new ArrayList<>(sources.size()); + for (WatermarkHoldState<W> source : sources) { + futures.add(source); + } + // Read. + List<Instant> outputTimesToMerge = new ArrayList<>(sources.size()); + for (ReadableState<Instant> future : futures) { + Instant sourceOutputTime = future.read(); + if (sourceOutputTime != null) { + outputTimesToMerge.add(sourceOutputTime); + } + } + // Clear sources. + for (WatermarkHoldState<W> source : sources) { + source.clear(); + } + if (!outputTimesToMerge.isEmpty()) { + // Merge and update. + result.add(result.getOutputTimeFn().merge(resultWindow, outputTimesToMerge)); + } + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/9c0047f7/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java index 6c12bad..4c05949 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java @@ -31,7 +31,6 @@ import org.apache.beam.sdk.util.state.CombiningState; import org.apache.beam.sdk.util.state.MergingStateAccessor; import org.apache.beam.sdk.util.state.ReadableState; import org.apache.beam.sdk.util.state.StateAccessor; -import org.apache.beam.sdk.util.state.StateMerging; import org.apache.beam.sdk.util.state.StateTag; import org.apache.beam.sdk.util.state.StateTags; http://git-wip-us.apache.org/repos/asf/beam/blob/9c0047f7/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java index a7968db..727d275 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java @@ -31,7 +31,6 @@ import org.apache.beam.sdk.util.WindowTracing; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.state.MergingStateAccessor; import org.apache.beam.sdk.util.state.ReadableState; -import org.apache.beam.sdk.util.state.StateMerging; import org.apache.beam.sdk.util.state.StateTag; import org.apache.beam.sdk.util.state.StateTags; import org.apache.beam.sdk.util.state.WatermarkHoldState; http://git-wip-us.apache.org/repos/asf/beam/blob/9c0047f7/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java index b60c690..cc4a8ae 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.Locale; import java.util.Objects; import javax.annotation.Nullable; +import org.apache.beam.runners.core.StateMerging; import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.coders.InstantCoder; @@ -33,7 +34,6 @@ import org.apache.beam.sdk.util.state.AccumulatorCombiningState; import org.apache.beam.sdk.util.state.CombiningState; import org.apache.beam.sdk.util.state.MergingStateAccessor; import org.apache.beam.sdk.util.state.StateAccessor; -import org.apache.beam.sdk.util.state.StateMerging; import org.apache.beam.sdk.util.state.StateTag; import org.apache.beam.sdk.util.state.StateTags; import org.joda.time.Duration; http://git-wip-us.apache.org/repos/asf/beam/blob/9c0047f7/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java index d8ad370..b2798aa 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java @@ -18,6 +18,7 @@ package org.apache.beam.runners.core.triggers; import java.util.Objects; +import org.apache.beam.runners.core.StateMerging; import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.coders.VarLongCoder; @@ -25,7 +26,6 @@ import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.util.state.AccumulatorCombiningState; import org.apache.beam.sdk.util.state.MergingStateAccessor; import org.apache.beam.sdk.util.state.StateAccessor; -import org.apache.beam.sdk.util.state.StateMerging; import org.apache.beam.sdk.util.state.StateTag; import org.apache.beam.sdk.util.state.StateTags; http://git-wip-us.apache.org/repos/asf/beam/blob/9c0047f7/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java index 8bcd177..ca0a8e5 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java @@ -32,7 +32,6 @@ import org.apache.beam.sdk.util.state.AccumulatorCombiningState; import org.apache.beam.sdk.util.state.BagState; import org.apache.beam.sdk.util.state.CombiningState; import org.apache.beam.sdk.util.state.ReadableState; -import org.apache.beam.sdk.util.state.StateMerging; import org.apache.beam.sdk.util.state.StateTag; import org.apache.beam.sdk.util.state.StateTags; import org.apache.beam.sdk.util.state.ValueState; http://git-wip-us.apache.org/repos/asf/beam/blob/9c0047f7/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java index 5784b68..6a086a7 100644 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java @@ -24,6 +24,7 @@ import static org.junit.Assert.assertThat; import java.nio.ByteBuffer; import java.util.Arrays; +import org.apache.beam.runners.core.StateMerging; import org.apache.beam.runners.core.StateNamespace; import org.apache.beam.runners.core.StateNamespaceForTest; import org.apache.beam.runners.flink.translation.wrappers.streaming.FlinkStateInternals; @@ -39,7 +40,6 @@ import org.apache.beam.sdk.util.state.AccumulatorCombiningState; import org.apache.beam.sdk.util.state.BagState; import org.apache.beam.sdk.util.state.CombiningState; import org.apache.beam.sdk.util.state.ReadableState; -import org.apache.beam.sdk.util.state.StateMerging; import org.apache.beam.sdk.util.state.StateTag; import org.apache.beam.sdk.util.state.StateTags; import org.apache.beam.sdk.util.state.ValueState; http://git-wip-us.apache.org/repos/asf/beam/blob/9c0047f7/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml ---------------------------------------------------------------------- diff --git a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml index 91ab9be..24eaec2 100644 --- a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml +++ b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml @@ -189,7 +189,7 @@ </Match> <Match> - <Class name="org.apache.beam.sdk.util.state.StateMerging"/> + <Class name="org.apache.beam.runners.core.StateMerging"/> <Method name="prefetchRead" /> <Bug pattern="RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT"/> <!-- prefetch call readLater --> http://git-wip-us.apache.org/repos/asf/beam/blob/9c0047f7/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateMerging.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateMerging.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateMerging.java deleted file mode 100644 index 457b213..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateMerging.java +++ /dev/null @@ -1,259 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util.state; - -import static com.google.common.base.Preconditions.checkState; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.joda.time.Instant; - -/** - * Helpers for merging state. - */ -public class StateMerging { - /** - * Clear all state in {@code address} in all windows under merge (even result windows) - * in {@code context}. - */ - public static <K, StateT extends State, W extends BoundedWindow> void clear( - MergingStateAccessor<K, W> context, StateTag<? super K, StateT> address) { - for (StateT state : context.accessInEachMergingWindow(address).values()) { - state.clear(); - } - } - - /** - * Prefetch all bag state in {@code address} across all windows under merge in - * {@code context}, except for the bag state in the final state address window which we can - * blindly append to. - */ - public static <K, T, W extends BoundedWindow> void prefetchBags( - MergingStateAccessor<K, W> context, StateTag<? super K, BagState<T>> address) { - Map<W, BagState<T>> map = context.accessInEachMergingWindow(address); - if (map.isEmpty()) { - // Nothing to prefetch. - return; - } - BagState<T> result = context.access(address); - // Prefetch everything except what's already in result. - for (BagState<T> source : map.values()) { - if (!source.equals(result)) { - prefetchRead(source); - } - } - } - - /** - * Merge all bag state in {@code address} across all windows under merge. - */ - public static <K, T, W extends BoundedWindow> void mergeBags( - MergingStateAccessor<K, W> context, StateTag<? super K, BagState<T>> address) { - mergeBags(context.accessInEachMergingWindow(address).values(), context.access(address)); - } - - /** - * Merge all bag state in {@code sources} (which may include {@code result}) into {@code result}. - */ - public static <T, W extends BoundedWindow> void mergeBags( - Collection<BagState<T>> sources, BagState<T> result) { - if (sources.isEmpty()) { - // Nothing to merge. - return; - } - // Prefetch everything except what's already in result. - List<ReadableState<Iterable<T>>> futures = new ArrayList<>(sources.size()); - for (BagState<T> source : sources) { - if (!source.equals(result)) { - prefetchRead(source); - futures.add(source); - } - } - if (futures.isEmpty()) { - // Result already holds all the values. - return; - } - // Transfer from sources to result. - for (ReadableState<Iterable<T>> future : futures) { - for (T element : future.read()) { - result.add(element); - } - } - // Clear sources except for result. - for (BagState<T> source : sources) { - if (!source.equals(result)) { - source.clear(); - } - } - } - - /** - * Prefetch all combining value state for {@code address} across all merging windows in {@code - * context}. - */ - public static <K, StateT extends CombiningState<?, ?>, W extends BoundedWindow> void - prefetchCombiningValues(MergingStateAccessor<K, W> context, - StateTag<? super K, StateT> address) { - for (StateT state : context.accessInEachMergingWindow(address).values()) { - prefetchRead(state); - } - } - - /** - * Merge all value state in {@code address} across all merging windows in {@code context}. - */ - public static <K, InputT, AccumT, OutputT, W extends BoundedWindow> void mergeCombiningValues( - MergingStateAccessor<K, W> context, - StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address) { - mergeCombiningValues( - context.accessInEachMergingWindow(address).values(), context.access(address)); - } - - /** - * Merge all value state from {@code sources} (which may include {@code result}) into - * {@code result}. - */ - public static <InputT, AccumT, OutputT, W extends BoundedWindow> void mergeCombiningValues( - Collection<AccumulatorCombiningState<InputT, AccumT, OutputT>> sources, - AccumulatorCombiningState<InputT, AccumT, OutputT> result) { - if (sources.isEmpty()) { - // Nothing to merge. - return; - } - if (sources.size() == 1 && sources.contains(result)) { - // Result already holds combined value. - return; - } - // Prefetch. - List<ReadableState<AccumT>> futures = new ArrayList<>(sources.size()); - for (AccumulatorCombiningState<InputT, AccumT, OutputT> source : sources) { - prefetchRead(source); - } - // Read. - List<AccumT> accumulators = new ArrayList<>(futures.size()); - for (AccumulatorCombiningState<InputT, AccumT, OutputT> source : sources) { - accumulators.add(source.getAccum()); - } - // Merge (possibly update and return one of the existing accumulators). - AccumT merged = result.mergeAccumulators(accumulators); - // Clear sources. - for (AccumulatorCombiningState<InputT, AccumT, OutputT> source : sources) { - source.clear(); - } - // Update result. - result.addAccum(merged); - } - - /** - * Prefetch all watermark state for {@code address} across all merging windows in - * {@code context}. - */ - public static <K, W extends BoundedWindow> void prefetchWatermarks( - MergingStateAccessor<K, W> context, - StateTag<? super K, WatermarkHoldState<W>> address) { - Map<W, WatermarkHoldState<W>> map = context.accessInEachMergingWindow(address); - WatermarkHoldState<W> result = context.access(address); - if (map.isEmpty()) { - // Nothing to prefetch. - return; - } - if (map.size() == 1 && map.values().contains(result) - && result.getOutputTimeFn().dependsOnlyOnEarliestInputTimestamp()) { - // Nothing to change. - return; - } - if (result.getOutputTimeFn().dependsOnlyOnWindow()) { - // No need to read existing holds. - return; - } - // Prefetch. - for (WatermarkHoldState<W> source : map.values()) { - prefetchRead(source); - } - } - - private static void prefetchRead(ReadableState<?> source) { - source.readLater(); - } - - /** - * Merge all watermark state in {@code address} across all merging windows in {@code context}, - * where the final merge result window is {@code mergeResult}. - */ - public static <K, W extends BoundedWindow> void mergeWatermarks( - MergingStateAccessor<K, W> context, - StateTag<? super K, WatermarkHoldState<W>> address, - W mergeResult) { - mergeWatermarks( - context.accessInEachMergingWindow(address).values(), context.access(address), mergeResult); - } - - /** - * Merge all watermark state in {@code sources} (which must include {@code result} if non-empty) - * into {@code result}, where the final merge result window is {@code mergeResult}. - */ - public static <W extends BoundedWindow> void mergeWatermarks( - Collection<WatermarkHoldState<W>> sources, WatermarkHoldState<W> result, - W resultWindow) { - if (sources.isEmpty()) { - // Nothing to merge. - return; - } - if (sources.size() == 1 && sources.contains(result) - && result.getOutputTimeFn().dependsOnlyOnEarliestInputTimestamp()) { - // Nothing to merge. - return; - } - if (result.getOutputTimeFn().dependsOnlyOnWindow()) { - // Clear sources. - for (WatermarkHoldState<W> source : sources) { - source.clear(); - } - // Update directly from window-derived hold. - Instant hold = result.getOutputTimeFn().assignOutputTime( - BoundedWindow.TIMESTAMP_MIN_VALUE, resultWindow); - checkState(hold.isAfter(BoundedWindow.TIMESTAMP_MIN_VALUE)); - result.add(hold); - } else { - // Prefetch. - List<ReadableState<Instant>> futures = new ArrayList<>(sources.size()); - for (WatermarkHoldState<W> source : sources) { - futures.add(source); - } - // Read. - List<Instant> outputTimesToMerge = new ArrayList<>(sources.size()); - for (ReadableState<Instant> future : futures) { - Instant sourceOutputTime = future.read(); - if (sourceOutputTime != null) { - outputTimesToMerge.add(sourceOutputTime); - } - } - // Clear sources. - for (WatermarkHoldState<W> source : sources) { - source.clear(); - } - if (!outputTimesToMerge.isEmpty()) { - // Merge and update. - result.add(result.getOutputTimeFn().merge(resultWindow, outputTimesToMerge)); - } - } - } -}