Remove KeyedCombineFn
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7e04924e Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7e04924e Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7e04924e Branch: refs/heads/master Commit: 7e04924ee7b31e28326f761618173749a55789d0 Parents: a198f8d Author: Kenneth Knowles <k...@google.com> Authored: Fri Apr 21 14:04:02 2017 -0700 Committer: Kenneth Knowles <k...@google.com> Committed: Sun Apr 30 18:17:42 2017 -0700 ---------------------------------------------------------------------- .../translation/utils/ApexStateInternals.java | 38 +- .../runners/core/GlobalCombineFnRunner.java | 78 +++ .../runners/core/GlobalCombineFnRunners.java | 193 ++++++ .../runners/core/InMemoryStateInternals.java | 50 +- .../runners/core/PerKeyCombineFnRunner.java | 79 --- .../runners/core/PerKeyCombineFnRunners.java | 161 ----- .../org/apache/beam/runners/core/StateTag.java | 18 +- .../org/apache/beam/runners/core/StateTags.java | 43 +- .../beam/runners/core/SystemReduceFn.java | 15 +- .../beam/runners/core/ReduceFnRunnerTest.java | 36 +- .../beam/runners/core/ReduceFnTester.java | 15 +- .../apache/beam/runners/core/StateTagTest.java | 22 +- .../CopyOnAccessInMemoryStateInternals.java | 66 +- .../CopyOnAccessInMemoryStateInternalsTest.java | 34 - .../flink/FlinkBatchTransformTranslators.java | 9 +- .../functions/AbstractFlinkCombineRunner.java | 44 +- .../FlinkMergingNonShuffleReduceFunction.java | 10 +- .../functions/FlinkPartialReduceFunction.java | 6 +- .../functions/FlinkReduceFunction.java | 10 +- .../functions/SortingFlinkCombineRunner.java | 1 - .../state/FlinkBroadcastStateInternals.java | 173 ++--- .../state/FlinkKeyGroupStateInternals.java | 119 ++-- .../state/FlinkSplitStateInternals.java | 119 ++-- .../streaming/state/FlinkStateInternals.java | 173 ++--- .../spark/stateful/SparkStateInternals.java | 40 +- .../spark/translation/SparkKeyedCombineFn.java | 26 +- .../spark/translation/TransformTranslator.java | 44 +- .../streaming/StreamingTransformTranslator.java | 4 +- .../runners/spark/SparkRunnerDebuggerTest.java | 7 +- .../src/main/resources/beam/findbugs-filter.xml | 2 +- .../sdk/transforms/ApproximateQuantiles.java | 8 +- .../beam/sdk/transforms/ApproximateUnique.java | 3 +- .../org/apache/beam/sdk/transforms/Combine.java | 672 +++++-------------- .../beam/sdk/transforms/CombineFnBase.java | 136 ---- .../apache/beam/sdk/transforms/CombineFns.java | 448 +------------ .../beam/sdk/transforms/CombineWithContext.java | 174 +---- .../org/apache/beam/sdk/transforms/Top.java | 6 +- .../org/apache/beam/sdk/transforms/View.java | 2 +- .../apache/beam/sdk/util/AppliedCombineFn.java | 35 +- .../org/apache/beam/sdk/util/CombineFnUtil.java | 123 ++-- .../apache/beam/sdk/util/state/StateBinder.java | 19 +- .../apache/beam/sdk/util/state/StateSpecs.java | 177 ++--- .../beam/sdk/transforms/CombineFnsTest.java | 114 ++-- .../apache/beam/sdk/transforms/CombineTest.java | 213 +++--- .../apache/beam/sdk/transforms/ParDoTest.java | 2 +- .../apache/beam/sdk/transforms/ViewTest.java | 2 +- .../apache/beam/sdk/util/CombineFnUtilTest.java | 18 +- 47 files changed, 1178 insertions(+), 2609 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java index ec8f666..e682894 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java @@ -42,8 +42,7 @@ import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.InstantCoder; import org.apache.beam.sdk.coders.ListCoder; import org.apache.beam.sdk.transforms.Combine.CombineFn; -import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn; -import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext; +import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.sdk.util.CoderUtils; @@ -145,7 +144,7 @@ public class ApexStateInternals<K> implements StateInternals<K> { address, accumCoder, key, - combineFn.<K>asKeyedFn() + combineFn ); } @@ -158,24 +157,11 @@ public class ApexStateInternals<K> implements StateInternals<K> { @Override public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> - bindKeyedCombiningValue( + bindCombiningValueWithContext( StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address, Coder<AccumT> accumCoder, - KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) { - return new ApexCombiningState<>( - namespace, - address, - accumCoder, - key, combineFn); - } - - @Override - public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> - bindKeyedCombiningValueWithContext( - StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address, - Coder<AccumT> accumCoder, - KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn) { - return bindKeyedCombiningValue(address, accumCoder, CombineFnUtil.bindContext(combineFn, c)); + CombineFnWithContext<InputT, AccumT, OutputT> combineFn) { + return bindCombiningValue(address, accumCoder, CombineFnUtil.bindContext(combineFn, c)); } } @@ -323,12 +309,12 @@ public class ApexStateInternals<K> implements StateInternals<K> { extends AbstractState<AccumT> implements CombiningState<InputT, AccumT, OutputT> { private final K key; - private final KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn; + private final CombineFn<InputT, AccumT, OutputT> combineFn; private ApexCombiningState(StateNamespace namespace, StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address, Coder<AccumT> coder, - K key, KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) { + K key, CombineFn<InputT, AccumT, OutputT> combineFn) { super(namespace, address, coder); this.key = key; this.combineFn = combineFn; @@ -341,13 +327,13 @@ public class ApexStateInternals<K> implements StateInternals<K> { @Override public OutputT read() { - return combineFn.extractOutput(key, getAccum()); + return combineFn.extractOutput(getAccum()); } @Override public void add(InputT input) { AccumT accum = getAccum(); - combineFn.addInput(key, accum, input); + combineFn.addInput(accum, input); writeValue(accum); } @@ -355,7 +341,7 @@ public class ApexStateInternals<K> implements StateInternals<K> { public AccumT getAccum() { AccumT accum = readValue(); if (accum == null) { - accum = combineFn.createAccumulator(key); + accum = combineFn.createAccumulator(); } return accum; } @@ -376,13 +362,13 @@ public class ApexStateInternals<K> implements StateInternals<K> { @Override public void addAccum(AccumT accum) { - accum = combineFn.mergeAccumulators(key, Arrays.asList(getAccum(), accum)); + accum = combineFn.mergeAccumulators(Arrays.asList(getAccum(), accum)); writeValue(accum); } @Override public AccumT mergeAccumulators(Iterable<AccumT> accumulators) { - return combineFn.mergeAccumulators(key, accumulators); + return combineFn.mergeAccumulators(accumulators); } } http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/core-java/src/main/java/org/apache/beam/runners/core/GlobalCombineFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GlobalCombineFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GlobalCombineFnRunner.java new file mode 100644 index 0000000..5325ba6 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GlobalCombineFnRunner.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core; + +import java.io.Serializable; +import java.util.Collection; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.CombineFnBase.GlobalCombineFn; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.SideInputReader; + +/** + * An interface that runs a {@link GlobalCombineFn} with unified APIs. + * + * <p>Different combine functions have their own implementations. For example, the implementation + * can skip allocating {@code Combine.Context}, if the combine function doesn't use it. + */ +public interface GlobalCombineFnRunner<InputT, AccumT, OutputT> extends Serializable { + /** + * Forwards the call to a {@link GlobalCombineFn} to create the accumulator. + * + * <p>It constructs a {@code CombineWithContext.Context} from + * {@link PipelineOptions} and {@link SideInputReader} if it is required. + */ + AccumT createAccumulator(PipelineOptions options, + SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows); + + /** + * Forwards the call to a {@link GlobalCombineFn} to add the input. + * + * <p>It constructs a {@code CombineWithContext.Context} from + * {@link PipelineOptions} and {@link SideInputReader} if it is required. + */ + AccumT addInput(AccumT accumulator, InputT value, PipelineOptions options, + SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows); + + /** + * Forwards the call to a {@link GlobalCombineFn} to merge accumulators. + * + * <p>It constructs a {@code CombineWithContext.Context} from + * {@link PipelineOptions} and {@link SideInputReader} if it is required. + */ + AccumT mergeAccumulators(Iterable<AccumT> accumulators, PipelineOptions options, + SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows); + + /** + * Forwards the call to a {@link GlobalCombineFn} to extract the output. + * + * <p>It constructs a {@code CombineWithContext.Context} from + * {@link PipelineOptions} and {@link SideInputReader} if it is required. + */ + OutputT extractOutput(AccumT accumulator, PipelineOptions options, + SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows); + + /** + * Forwards the call to a {@link GlobalCombineFn} to compact the accumulator. + * + * <p>It constructs a {@code CombineWithContext.Context} from + * {@link PipelineOptions} and {@link SideInputReader} if it is required. + */ + AccumT compact(AccumT accumulator, PipelineOptions options, + SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows); +} http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/core-java/src/main/java/org/apache/beam/runners/core/GlobalCombineFnRunners.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GlobalCombineFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GlobalCombineFnRunners.java new file mode 100644 index 0000000..d45b503 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GlobalCombineFnRunners.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core; + +import com.google.common.collect.Iterables; +import java.util.Collection; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.Combine.CombineFn; +import org.apache.beam.sdk.transforms.CombineFnBase.GlobalCombineFn; +import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.CombineContextFactory; +import org.apache.beam.sdk.util.SideInputReader; + +/** + * Static utility methods that provide {@link GlobalCombineFnRunner} implementations for different + * combine functions. + */ +public class GlobalCombineFnRunners { + /** Returns a {@link GlobalCombineFnRunner} from a {@link GlobalCombineFn}. */ + public static <InputT, AccumT, OutputT> GlobalCombineFnRunner<InputT, AccumT, OutputT> create( + GlobalCombineFn<InputT, AccumT, OutputT> globalCombineFn) { + if (globalCombineFn instanceof CombineFnWithContext) { + return new CombineFnWithContextRunner<>( + (CombineFnWithContext<InputT, AccumT, OutputT>) globalCombineFn); + } else if (globalCombineFn instanceof CombineFn) { + return new CombineFnRunner<>((CombineFn<InputT, AccumT, OutputT>) globalCombineFn); + } else { + throw new IllegalStateException( + String.format("Unknown type of CombineFn: %s", globalCombineFn.getClass())); + } + } + + /** + * An implementation of {@link GlobalCombineFnRunner} with {@link CombineFn}. + * + * <p>It forwards functions calls to the {@link CombineFn}. + */ + private static class CombineFnRunner<InputT, AccumT, OutputT> + implements org.apache.beam.runners.core.GlobalCombineFnRunner<InputT, AccumT, OutputT> { + private final CombineFn<InputT, AccumT, OutputT> combineFn; + + private CombineFnRunner(CombineFn<InputT, AccumT, OutputT> combineFn) { + this.combineFn = combineFn; + } + + @Override + public String toString() { + return combineFn.toString(); + } + + @Override + public AccumT createAccumulator( + PipelineOptions options, + SideInputReader sideInputReader, + Collection<? extends BoundedWindow> windows) { + return combineFn.createAccumulator(); + } + + @Override + public AccumT addInput( + AccumT accumulator, + InputT input, + PipelineOptions options, + SideInputReader sideInputReader, + Collection<? extends BoundedWindow> windows) { + return combineFn.addInput(accumulator, input); + } + + @Override + public AccumT mergeAccumulators( + Iterable<AccumT> accumulators, + PipelineOptions options, + SideInputReader sideInputReader, + Collection<? extends BoundedWindow> windows) { + return combineFn.mergeAccumulators(accumulators); + } + + @Override + public OutputT extractOutput( + AccumT accumulator, + PipelineOptions options, + SideInputReader sideInputReader, + Collection<? extends BoundedWindow> windows) { + return combineFn.extractOutput(accumulator); + } + + @Override + public AccumT compact( + AccumT accumulator, + PipelineOptions options, + SideInputReader sideInputReader, + Collection<? extends BoundedWindow> windows) { + return combineFn.compact(accumulator); + } + } + + /** + * An implementation of {@link org.apache.beam.runners.core.GlobalCombineFnRunner} with {@link + * CombineFnWithContext}. + * + * <p>It forwards functions calls to the {@link CombineFnWithContext}. + */ + private static class CombineFnWithContextRunner<InputT, AccumT, OutputT> + implements org.apache.beam.runners.core.GlobalCombineFnRunner<InputT, AccumT, OutputT> { + private final CombineFnWithContext<InputT, AccumT, OutputT> combineFnWithContext; + + private CombineFnWithContextRunner( + CombineFnWithContext<InputT, AccumT, OutputT> combineFnWithContext) { + this.combineFnWithContext = combineFnWithContext; + } + + @Override + public String toString() { + return combineFnWithContext.toString(); + } + + @Override + public AccumT createAccumulator( + PipelineOptions options, + SideInputReader sideInputReader, + Collection<? extends BoundedWindow> windows) { + return combineFnWithContext.createAccumulator( + CombineContextFactory.createFromComponents( + options, sideInputReader, Iterables.getOnlyElement(windows))); + } + + @Override + public AccumT addInput( + AccumT accumulator, + InputT input, + PipelineOptions options, + SideInputReader sideInputReader, + Collection<? extends BoundedWindow> windows) { + return combineFnWithContext.addInput( + accumulator, + input, + CombineContextFactory.createFromComponents( + options, sideInputReader, Iterables.getOnlyElement(windows))); + } + + @Override + public AccumT mergeAccumulators( + Iterable<AccumT> accumulators, + PipelineOptions options, + SideInputReader sideInputReader, + Collection<? extends BoundedWindow> windows) { + return combineFnWithContext.mergeAccumulators( + accumulators, + CombineContextFactory.createFromComponents( + options, sideInputReader, Iterables.getOnlyElement(windows))); + } + + @Override + public OutputT extractOutput( + AccumT accumulator, + PipelineOptions options, + SideInputReader sideInputReader, + Collection<? extends BoundedWindow> windows) { + return combineFnWithContext.extractOutput( + accumulator, + CombineContextFactory.createFromComponents( + options, sideInputReader, Iterables.getOnlyElement(windows))); + } + + @Override + public AccumT compact( + AccumT accumulator, + PipelineOptions options, + SideInputReader sideInputReader, + Collection<? extends BoundedWindow> windows) { + return combineFnWithContext.compact( + accumulator, + CombineContextFactory.createFromComponents( + options, sideInputReader, Iterables.getOnlyElement(windows))); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java index 9fb8e3f..2c02282 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java @@ -31,8 +31,7 @@ import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.Combine.CombineFn; -import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn; -import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext; +import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.sdk.util.CombineFnUtil; @@ -152,7 +151,7 @@ public class InMemoryStateInternals<K> implements StateInternals<K> { StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address, Coder<AccumT> accumCoder, final CombineFn<InputT, AccumT, OutputT> combineFn) { - return new InMemoryCombiningState<K, InputT, AccumT, OutputT>(key, combineFn.<K>asKeyedFn()); + return new InMemoryCombiningState<>(combineFn); } @Override @@ -164,20 +163,11 @@ public class InMemoryStateInternals<K> implements StateInternals<K> { @Override public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> - bindKeyedCombiningValue( + bindCombiningValueWithContext( StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address, Coder<AccumT> accumCoder, - KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) { - return new InMemoryCombiningState<K, InputT, AccumT, OutputT>(key, combineFn); - } - - @Override - public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> - bindKeyedCombiningValueWithContext( - StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address, - Coder<AccumT> accumCoder, - KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn) { - return bindKeyedCombiningValue(address, accumCoder, CombineFnUtil.bindContext(combineFn, c)); + CombineFnWithContext<InputT, AccumT, OutputT> combineFn) { + return bindCombiningValue(address, accumCoder, CombineFnUtil.bindContext(combineFn, c)); } } @@ -310,23 +300,21 @@ public class InMemoryStateInternals<K> implements StateInternals<K> { /** * An {@link InMemoryState} implementation of {@link CombiningState}. */ - public static final class InMemoryCombiningState<K, InputT, AccumT, OutputT> + public static final class InMemoryCombiningState<InputT, AccumT, OutputT> implements CombiningState<InputT, AccumT, OutputT>, - InMemoryState<InMemoryCombiningState<K, InputT, AccumT, OutputT>> { - private final K key; + InMemoryState<InMemoryCombiningState<InputT, AccumT, OutputT>> { private boolean isCleared = true; - private final KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn; + private final CombineFn<InputT, AccumT, OutputT> combineFn; private AccumT accum; public InMemoryCombiningState( - K key, KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) { - this.key = key; + CombineFn<InputT, AccumT, OutputT> combineFn) { this.combineFn = combineFn; - accum = combineFn.createAccumulator(key); + accum = combineFn.createAccumulator(); } @Override - public InMemoryCombiningState<K, InputT, AccumT, OutputT> readLater() { + public InMemoryCombiningState<InputT, AccumT, OutputT> readLater() { return this; } @@ -334,19 +322,19 @@ public class InMemoryStateInternals<K> implements StateInternals<K> { public void clear() { // Even though we're clearing we can't remove this from the in-memory state map, since // other users may already have a handle on this CombiningValue. - accum = combineFn.createAccumulator(key); + accum = combineFn.createAccumulator(); isCleared = true; } @Override public OutputT read() { - return combineFn.extractOutput(key, accum); + return combineFn.extractOutput(accum); } @Override public void add(InputT input) { isCleared = false; - accum = combineFn.addInput(key, accum, input); + accum = combineFn.addInput(accum, input); } @Override @@ -371,12 +359,12 @@ public class InMemoryStateInternals<K> implements StateInternals<K> { @Override public void addAccum(AccumT accum) { isCleared = false; - this.accum = combineFn.mergeAccumulators(key, Arrays.asList(this.accum, accum)); + this.accum = combineFn.mergeAccumulators(Arrays.asList(this.accum, accum)); } @Override public AccumT mergeAccumulators(Iterable<AccumT> accumulators) { - return combineFn.mergeAccumulators(key, accumulators); + return combineFn.mergeAccumulators(accumulators); } @Override @@ -385,9 +373,9 @@ public class InMemoryStateInternals<K> implements StateInternals<K> { } @Override - public InMemoryCombiningState<K, InputT, AccumT, OutputT> copy() { - InMemoryCombiningState<K, InputT, AccumT, OutputT> that = - new InMemoryCombiningState<>(key, combineFn); + public InMemoryCombiningState<InputT, AccumT, OutputT> copy() { + InMemoryCombiningState<InputT, AccumT, OutputT> that = + new InMemoryCombiningState<>(combineFn); if (!this.isCleared) { that.isCleared = this.isCleared; that.addAccum(accum); http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunner.java deleted file mode 100644 index a6608a7..0000000 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunner.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.core; - -import java.io.Serializable; -import java.util.Collection; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.CombineFnBase.PerKeyCombineFn; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.SideInputReader; - -/** - * An interface that runs a {@link PerKeyCombineFn} with unified APIs. - * - * <p>Different keyed combine functions have their own implementations. - * For example, the implementation can skip allocating {@code Combine.Context}, - * if the keyed combine function doesn't use it. - */ -public interface PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> extends Serializable { - /** - * Forwards the call to a {@link PerKeyCombineFn} to create the accumulator. - * - * <p>It constructs a {@code CombineWithContext.Context} from - * {@link PipelineOptions} and {@link SideInputReader} if it is required. - */ - AccumT createAccumulator(K key, PipelineOptions options, - SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows); - - /** - * Forwards the call to a {@link PerKeyCombineFn} to add the input. - * - * <p>It constructs a {@code CombineWithContext.Context} from - * {@link PipelineOptions} and {@link SideInputReader} if it is required. - */ - AccumT addInput(K key, AccumT accumulator, InputT value, PipelineOptions options, - SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows); - - /** - * Forwards the call to a {@link PerKeyCombineFn} to merge accumulators. - * - * <p>It constructs a {@code CombineWithContext.Context} from - * {@link PipelineOptions} and {@link SideInputReader} if it is required. - */ - AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, PipelineOptions options, - SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows); - - /** - * Forwards the call to a {@link PerKeyCombineFn} to extract the output. - * - * <p>It constructs a {@code CombineWithContext.Context} from - * {@link PipelineOptions} and {@link SideInputReader} if it is required. - */ - OutputT extractOutput(K key, AccumT accumulator, PipelineOptions options, - SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows); - - /** - * Forwards the call to a {@link PerKeyCombineFn} to compact the accumulator. - * - * <p>It constructs a {@code CombineWithContext.Context} from - * {@link PipelineOptions} and {@link SideInputReader} if it is required. - */ - AccumT compact(K key, AccumT accumulator, PipelineOptions options, - SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows); -} http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunners.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunners.java deleted file mode 100644 index 7736758..0000000 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/PerKeyCombineFnRunners.java +++ /dev/null @@ -1,161 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.core; - -import com.google.common.collect.Iterables; -import java.util.Collection; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn; -import org.apache.beam.sdk.transforms.CombineFnBase.PerKeyCombineFn; -import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.CombineContextFactory; -import org.apache.beam.sdk.util.SideInputReader; - -/** - * Static utility methods that provide {@link PerKeyCombineFnRunner} implementations - * for different keyed combine functions. - */ -public class PerKeyCombineFnRunners { - /** - * Returns a {@link PerKeyCombineFnRunner} from a {@link PerKeyCombineFn}. - */ - public static <K, InputT, AccumT, OutputT> PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> - create(PerKeyCombineFn<K, InputT, AccumT, OutputT> perKeyCombineFn) { - if (perKeyCombineFn instanceof KeyedCombineFnWithContext) { - return new KeyedCombineFnWithContextRunner<>( - (KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>) perKeyCombineFn); - } else if (perKeyCombineFn instanceof KeyedCombineFn) { - return new KeyedCombineFnRunner<>( - (KeyedCombineFn<K, InputT, AccumT, OutputT>) perKeyCombineFn); - } else { - throw new IllegalStateException( - String.format("Unknown type of CombineFn: %s", perKeyCombineFn.getClass())); - } - } - - /** - * An implementation of {@link PerKeyCombineFnRunner} with {@link KeyedCombineFn}. - * - * <p>It forwards functions calls to the {@link KeyedCombineFn}. - */ - private static class KeyedCombineFnRunner<K, InputT, AccumT, OutputT> - implements PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> { - private final KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn; - - private KeyedCombineFnRunner( - KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn) { - this.keyedCombineFn = keyedCombineFn; - } - - @Override - public String toString() { - return keyedCombineFn.toString(); - } - - @Override - public AccumT createAccumulator(K key, PipelineOptions options, - SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) { - return keyedCombineFn.createAccumulator(key); - } - - @Override - public AccumT addInput(K key, AccumT accumulator, InputT input, PipelineOptions options, - SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) { - return keyedCombineFn.addInput(key, accumulator, input); - } - - @Override - public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, PipelineOptions options, - SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) { - return keyedCombineFn.mergeAccumulators(key, accumulators); - } - - @Override - public OutputT extractOutput(K key, AccumT accumulator, PipelineOptions options, - SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) { - return keyedCombineFn.extractOutput(key, accumulator); - } - - @Override - public AccumT compact(K key, AccumT accumulator, PipelineOptions options, - SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) { - return keyedCombineFn.compact(key, accumulator); - } - } - - /** - * An implementation of {@link PerKeyCombineFnRunner} with {@link KeyedCombineFnWithContext}. - * - * <p>It forwards functions calls to the {@link KeyedCombineFnWithContext}. - */ - private static class KeyedCombineFnWithContextRunner<K, InputT, AccumT, OutputT> - implements PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> { - private final KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> keyedCombineFnWithContext; - - private KeyedCombineFnWithContextRunner( - KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> keyedCombineFnWithContext) { - this.keyedCombineFnWithContext = keyedCombineFnWithContext; - } - - @Override - public String toString() { - return keyedCombineFnWithContext.toString(); - } - - @Override - public AccumT createAccumulator(K key, PipelineOptions options, SideInputReader sideInputReader, - Collection<? extends BoundedWindow> windows) { - return keyedCombineFnWithContext.createAccumulator(key, - CombineContextFactory.createFromComponents( - options, sideInputReader, Iterables.getOnlyElement(windows))); - } - - @Override - public AccumT addInput(K key, AccumT accumulator, InputT input, PipelineOptions options, - SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) { - return keyedCombineFnWithContext.addInput(key, accumulator, input, - CombineContextFactory.createFromComponents( - options, sideInputReader, Iterables.getOnlyElement(windows))); - } - - @Override - public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, PipelineOptions options, - SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) { - return keyedCombineFnWithContext.mergeAccumulators(key, accumulators, - CombineContextFactory.createFromComponents( - options, sideInputReader, Iterables.getOnlyElement(windows))); - } - - @Override - public OutputT extractOutput(K key, AccumT accumulator, PipelineOptions options, - SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) { - return keyedCombineFnWithContext.extractOutput(key, accumulator, - CombineContextFactory.createFromComponents( - options, sideInputReader, Iterables.getOnlyElement(windows))); - } - - @Override - public AccumT compact(K key, AccumT accumulator, PipelineOptions options, - SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) { - return keyedCombineFnWithContext.compact(key, accumulator, - CombineContextFactory.createFromComponents( - options, sideInputReader, Iterables.getOnlyElement(windows))); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java index a5d262a..aaeecf0 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java @@ -23,8 +23,7 @@ import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.Combine.CombineFn; -import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn; -import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext; +import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; @@ -100,17 +99,10 @@ public interface StateTag<K, StateT extends State> extends Serializable { CombineFn<InputT, AccumT, OutputT> combineFn); <InputT, AccumT, OutputT> - CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue( - StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> spec, - Coder<AccumT> accumCoder, - KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn); - - <InputT, AccumT, OutputT> - CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext( - StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> spec, - Coder<AccumT> accumCoder, - KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> - combineFn); + CombiningState<InputT, AccumT, OutputT> bindCombiningValueWithContext( + StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> spec, + Coder<AccumT> accumCoder, + CombineFnWithContext<InputT, AccumT, OutputT> combineFn); /** * Bind to a watermark {@link StateSpec}. http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java index 2b3f4b8..fe99f27 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java @@ -26,8 +26,7 @@ import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.transforms.Combine.CombineFn; -import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn; -import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext; +import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.sdk.util.state.BagState; @@ -90,22 +89,12 @@ public class StateTags { @Override public <InputT, AccumT, OutputT> - CombiningState<InputT, AccumT, OutputT> bindKeyedCombining( + CombiningState<InputT, AccumT, OutputT> bindCombiningWithContext( String id, StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> spec, Coder<AccumT> accumCoder, - KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) { - return binder.bindKeyedCombiningValue(tagForSpec(id, spec), accumCoder, combineFn); - } - - @Override - public <InputT, AccumT, OutputT> - CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningWithContext( - String id, - StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> spec, - Coder<AccumT> accumCoder, - KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn) { - return binder.bindKeyedCombiningValueWithContext( + CombineFnWithContext<InputT, AccumT, OutputT> combineFn) { + return binder.bindCombiningValueWithContext( tagForSpec(id, spec), accumCoder, combineFn); } @@ -162,29 +151,17 @@ public class StateTags { } /** - * Create a state tag for values that use a {@link KeyedCombineFn} to automatically merge - * multiple {@code InputT}s into a single {@code OutputT}. - */ - public static <K, InputT, AccumT, - OutputT> StateTag<K, CombiningState<InputT, AccumT, OutputT>> - keyedCombiningValue(String id, Coder<AccumT> accumCoder, - KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn) { - return new SimpleStateTag<>( - new StructuredId(id), StateSpecs.keyedCombining(accumCoder, combineFn)); - } - - /** - * Create a state tag for values that use a {@link KeyedCombineFnWithContext} to automatically + * Create a state tag for values that use a {@link CombineFnWithContext} to automatically * merge multiple {@code InputT}s into a single {@code OutputT}. */ - public static <K, InputT, AccumT, OutputT> - StateTag<K, CombiningState<InputT, AccumT, OutputT>> - keyedCombiningValueWithContext( + public static <InputT, AccumT, OutputT> + StateTag<Object, CombiningState<InputT, AccumT, OutputT>> + combiningValueWithContext( String id, Coder<AccumT> accumCoder, - KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn) { + CombineFnWithContext<InputT, AccumT, OutputT> combineFn) { return new SimpleStateTag<>( - new StructuredId(id), StateSpecs.keyedCombiningWithContext(accumCoder, combineFn)); + new StructuredId(id), StateSpecs.combining(accumCoder, combineFn)); } /** http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java index f618d88..86a7fd7 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java @@ -20,8 +20,7 @@ package org.apache.beam.runners.core; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.Combine.CombineFn; -import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn; -import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext; +import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.AppliedCombineFn; @@ -71,18 +70,18 @@ public abstract class SystemReduceFn<K, InputT, AccumT, OutputT, W extends Bound AccumT, OutputT, W> combining( final Coder<K> keyCoder, final AppliedCombineFn<K, InputT, AccumT, OutputT> combineFn) { - final StateTag<K, CombiningState<InputT, AccumT, OutputT>> bufferTag; - if (combineFn.getFn() instanceof KeyedCombineFnWithContext) { + final StateTag<Object, CombiningState<InputT, AccumT, OutputT>> bufferTag; + if (combineFn.getFn() instanceof CombineFnWithContext) { bufferTag = StateTags.makeSystemTagInternal( - StateTags.<K, InputT, AccumT, OutputT>keyedCombiningValueWithContext( + StateTags.<InputT, AccumT, OutputT>combiningValueWithContext( BUFFER_NAME, combineFn.getAccumulatorCoder(), - (KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>) combineFn.getFn())); + (CombineFnWithContext<InputT, AccumT, OutputT>) combineFn.getFn())); } else { bufferTag = StateTags.makeSystemTagInternal( - StateTags.<K, InputT, AccumT, OutputT>keyedCombiningValue( + StateTags.<InputT, AccumT, OutputT>combiningValue( BUFFER_NAME, combineFn.getAccumulatorCoder(), - (KeyedCombineFn<K, InputT, AccumT, OutputT>) combineFn.getFn())); + (CombineFn<InputT, AccumT, OutputT>) combineFn.getFn())); } return new SystemReduceFn<K, InputT, AccumT, OutputT, W>(bufferTag) { @Override http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java index 44bc538..ec2e7a3 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java @@ -218,7 +218,7 @@ public class ReduceFnRunnerTest { ReduceFnTester.combining( strategy, mockTriggerStateMachine, - Sum.ofIntegers().<String>asKeyedFn(), + Sum.ofIntegers(), VarIntCoder.of()); injectElement(tester, 2); @@ -291,7 +291,7 @@ public class ReduceFnRunnerTest { ReduceFnTester<Integer, Integer, IntervalWindow> tester = ReduceFnTester - .combining(strategy, Sum.ofIntegers().<String>asKeyedFn(), VarIntCoder.of()); + .combining(strategy, Sum.ofIntegers(), VarIntCoder.of()); tester.injectElements(TimestampedValue.of(13, elementTimestamp)); @@ -323,7 +323,7 @@ public class ReduceFnRunnerTest { ReduceFnTester.combining( strategy, mockTriggerStateMachine, - Sum.ofIntegers().<String>asKeyedFn(), + Sum.ofIntegers(), VarIntCoder.of()); injectElement(tester, 1); @@ -387,9 +387,14 @@ public class ReduceFnRunnerTest { }); SumAndVerifyContextFn combineFn = new SumAndVerifyContextFn(mockView, expectedValue); - ReduceFnTester<Integer, Integer, IntervalWindow> tester = ReduceFnTester.combining( - mainInputWindowingStrategy, mockTriggerStateMachine, combineFn.<String>asKeyedFn(), - VarIntCoder.of(), options, mockSideInputReader); + ReduceFnTester<Integer, Integer, IntervalWindow> tester = + ReduceFnTester.combining( + mainInputWindowingStrategy, + mockTriggerStateMachine, + combineFn, + VarIntCoder.of(), + options, + mockSideInputReader); when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true); for (int i = 0; i < 8; ++i) { @@ -1062,12 +1067,13 @@ public class ReduceFnRunnerTest { */ @Test public void testDropDataMultipleWindowsFinishedTrigger() throws Exception { - ReduceFnTester<Integer, Integer, IntervalWindow> tester = ReduceFnTester.combining( - WindowingStrategy.of( - SlidingWindows.of(Duration.millis(100)).every(Duration.millis(30))) - .withTrigger(AfterWatermark.pastEndOfWindow()) - .withAllowedLateness(Duration.millis(1000)), - Sum.ofIntegers().<String>asKeyedFn(), VarIntCoder.of()); + ReduceFnTester<Integer, Integer, IntervalWindow> tester = + ReduceFnTester.combining( + WindowingStrategy.of(SlidingWindows.of(Duration.millis(100)).every(Duration.millis(30))) + .withTrigger(AfterWatermark.pastEndOfWindow()) + .withAllowedLateness(Duration.millis(1000)), + Sum.ofIntegers(), + VarIntCoder.of()); tester.injectElements( // assigned to [-60, 40), [-30, 70), [0, 100) @@ -1209,8 +1215,7 @@ public class ReduceFnRunnerTest { .withAllowedLateness(Duration.millis(100)); ReduceFnTester<Integer, Integer, IntervalWindow> tester = - ReduceFnTester - .combining(strategy, Sum.ofIntegers().<String>asKeyedFn(), VarIntCoder.of()); + ReduceFnTester.combining(strategy, Sum.ofIntegers(), VarIntCoder.of()); tester.advanceInputWatermark(new Instant(0)); tester.advanceProcessingTime(new Instant(0)); @@ -1265,8 +1270,7 @@ public class ReduceFnRunnerTest { .withAllowedLateness(Duration.millis(100)); ReduceFnTester<Integer, Integer, IntervalWindow> tester = - ReduceFnTester - .combining(strategy, Sum.ofIntegers().<String>asKeyedFn(), VarIntCoder.of()); + ReduceFnTester.combining(strategy, Sum.ofIntegers(), VarIntCoder.of()); tester.advanceInputWatermark(new Instant(0)); tester.advanceProcessingTime(new Instant(0)); http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java index b5b5492..dfb769f 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java @@ -53,8 +53,7 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Combine.CombineFn; -import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn; -import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext; +import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext; import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; @@ -170,13 +169,13 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> { /** * Creates a {@link ReduceFnTester} for the given {@link WindowingStrategy} and - * {@link KeyedCombineFn}, creating a {@link TriggerStateMachine} from the + * {@link CombineFn}, creating a {@link TriggerStateMachine} from the * {@link Trigger} in the {@link WindowingStrategy}. */ public static <W extends BoundedWindow, AccumT, OutputT> ReduceFnTester<Integer, OutputT, W> combining( WindowingStrategy<?, W> strategy, - KeyedCombineFn<String, Integer, AccumT, OutputT> combineFn, + CombineFn<Integer, AccumT, OutputT> combineFn, Coder<OutputT> outputCoder) throws Exception { @@ -194,7 +193,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> { /** * Creates a {@link ReduceFnTester} for the given {@link WindowingStrategy}, - * {@link KeyedCombineFn}, and {@link TriggerStateMachine}, for mocking the interaction + * {@link CombineFn}, and {@link TriggerStateMachine}, for mocking the interaction * between {@link ReduceFnRunner} and the {@link TriggerStateMachine}. * Ignores the {@link Trigger} in the {@link WindowingStrategy}. */ @@ -202,7 +201,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> { ReduceFnTester<Integer, OutputT, W> combining( WindowingStrategy<?, W> strategy, TriggerStateMachine triggerStateMachine, - KeyedCombineFn<String, Integer, AccumT, OutputT> combineFn, + CombineFn<Integer, AccumT, OutputT> combineFn, Coder<OutputT> outputCoder) throws Exception { @@ -223,7 +222,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> { public static <W extends BoundedWindow, AccumT, OutputT> ReduceFnTester<Integer, OutputT, W> combining( WindowingStrategy<?, W> strategy, - KeyedCombineFnWithContext<String, Integer, AccumT, OutputT> combineFn, + CombineFnWithContext<Integer, AccumT, OutputT> combineFn, Coder<OutputT> outputCoder, PipelineOptions options, SideInputReader sideInputReader) @@ -246,7 +245,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> { ReduceFnTester<Integer, OutputT, W> combining( WindowingStrategy<?, W> strategy, TriggerStateMachine triggerStateMachine, - KeyedCombineFnWithContext<String, Integer, AccumT, OutputT> combineFn, + CombineFnWithContext<Integer, AccumT, OutputT> combineFn, Coder<OutputT> outputCoder, PipelineOptions options, SideInputReader sideInputReader) http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java index 10dcb62..9a8b75c 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java @@ -162,17 +162,17 @@ public class StateTagTest { Coder<int[]> accum1 = maxFn.getAccumulatorCoder(registry, VarIntCoder.of()); Coder<int[]> accum2 = minFn.getAccumulatorCoder(registry, BigEndianIntegerCoder.of()); - StateTag<?, ?> fooCoder1Max1 = StateTags.keyedCombiningValueWithContext( - "foo", accum1, CombineFnUtil.toFnWithContext(maxFn).<String>asKeyedFn()); - StateTag<?, ?> fooCoder1Max2 = StateTags.keyedCombiningValueWithContext( - "foo", accum1, CombineFnUtil.toFnWithContext(maxFn).asKeyedFn()); - StateTag<?, ?> fooCoder1Min = StateTags.keyedCombiningValueWithContext( - "foo", accum1, CombineFnUtil.toFnWithContext(minFn).asKeyedFn()); - - StateTag<?, ?> fooCoder2Max = StateTags.keyedCombiningValueWithContext( - "foo", accum2, CombineFnUtil.toFnWithContext(maxFn).asKeyedFn()); - StateTag<?, ?> barCoder1Max = StateTags.keyedCombiningValueWithContext( - "bar", accum1, CombineFnUtil.toFnWithContext(maxFn).asKeyedFn()); + StateTag<?, ?> fooCoder1Max1 = StateTags.combiningValueWithContext( + "foo", accum1, CombineFnUtil.toFnWithContext(maxFn)); + StateTag<?, ?> fooCoder1Max2 = StateTags.combiningValueWithContext( + "foo", accum1, CombineFnUtil.toFnWithContext(maxFn)); + StateTag<?, ?> fooCoder1Min = StateTags.combiningValueWithContext( + "foo", accum1, CombineFnUtil.toFnWithContext(minFn)); + + StateTag<?, ?> fooCoder2Max = StateTags.combiningValueWithContext( + "foo", accum2, CombineFnUtil.toFnWithContext(maxFn)); + StateTag<?, ?> barCoder1Max = StateTags.combiningValueWithContext( + "bar", accum1, CombineFnUtil.toFnWithContext(maxFn)); // Same name, coder and combineFn assertEquals(fooCoder1Max1, fooCoder1Max2); http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java index 068b37f..92d87b5 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java @@ -40,8 +40,7 @@ import org.apache.beam.runners.core.StateTag; import org.apache.beam.runners.core.StateTag.StateBinder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.Combine.CombineFn; -import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn; -import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext; +import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.sdk.util.CombineFnUtil; @@ -283,11 +282,10 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K> @SuppressWarnings("unchecked") InMemoryState<? extends WatermarkHoldState> existingState = (InMemoryState<? extends WatermarkHoldState>) - underlying.get().get(namespace, address, c); + underlying.get().get(namespace, address, c); return existingState.copy(); } else { - return new InMemoryWatermarkHold<>( - timestampCombiner); + return new InMemoryWatermarkHold<>(timestampCombiner); } } @@ -298,7 +296,7 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K> @SuppressWarnings("unchecked") InMemoryState<? extends ValueState<T>> existingState = (InMemoryState<? extends ValueState<T>>) - underlying.get().get(namespace, address, c); + underlying.get().get(namespace, address, c); return existingState.copy(); } else { return new InMemoryValue<>(); @@ -306,10 +304,11 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K> } @Override - public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> - bindCombiningValue( + public <InputT, AccumT, OutputT> + CombiningState<InputT, AccumT, OutputT> bindCombiningValue( StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address, - Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) { + Coder<AccumT> accumCoder, + CombineFn<InputT, AccumT, OutputT> combineFn) { if (containedInUnderlying(namespace, address)) { @SuppressWarnings("unchecked") InMemoryState<? extends CombiningState<InputT, AccumT, OutputT>> existingState = @@ -317,8 +316,7 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K> underlying.get().get(namespace, address, c); return existingState.copy(); } else { - return new InMemoryCombiningState<>( - key, combineFn.asKeyedFn()); + return new InMemoryCombiningState<>(combineFn); } } @@ -329,7 +327,7 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K> @SuppressWarnings("unchecked") InMemoryState<? extends BagState<T>> existingState = (InMemoryState<? extends BagState<T>>) - underlying.get().get(namespace, address, c); + underlying.get().get(namespace, address, c); return existingState.copy(); } else { return new InMemoryBag<>(); @@ -353,7 +351,8 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K> @Override public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap( StateTag<? super K, MapState<KeyT, ValueT>> address, - Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder) { + Coder<KeyT> mapKeyCoder, + Coder<ValueT> mapValueCoder) { if (containedInUnderlying(namespace, address)) { @SuppressWarnings("unchecked") InMemoryState<? extends MapState<KeyT, ValueT>> existingState = @@ -366,30 +365,12 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K> } @Override - public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> - bindKeyedCombiningValue( + public <InputT, AccumT, OutputT> + CombiningState<InputT, AccumT, OutputT> bindCombiningValueWithContext( StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address, Coder<AccumT> accumCoder, - KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) { - if (containedInUnderlying(namespace, address)) { - @SuppressWarnings("unchecked") - InMemoryState<? extends CombiningState<InputT, AccumT, OutputT>> existingState = - (InMemoryState<? extends CombiningState<InputT, AccumT, OutputT>>) - underlying.get().get(namespace, address, c); - return existingState.copy(); - } else { - return new InMemoryCombiningState<>(key, combineFn); - } - } - - @Override - public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> - bindKeyedCombiningValueWithContext( - StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address, - Coder<AccumT> accumCoder, - KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn) { - return bindKeyedCombiningValue( - address, accumCoder, CombineFnUtil.bindContext(combineFn, c)); + CombineFnWithContext<InputT, AccumT, OutputT> combineFn) { + return bindCombiningValue(address, accumCoder, CombineFnUtil.bindContext(combineFn, c)); } }; } @@ -475,20 +456,11 @@ public class CopyOnAccessInMemoryStateInternals<K> implements StateInternals<K> @Override public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> - bindKeyedCombiningValue( - StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address, - Coder<AccumT> accumCoder, - KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) { - return underlying.get(namespace, address, c); - } - - @Override - public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> - bindKeyedCombiningValueWithContext( + bindCombiningValueWithContext( StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address, Coder<AccumT> accumCoder, - KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn) { - return bindKeyedCombiningValue( + CombineFnWithContext<InputT, AccumT, OutputT> combineFn) { + return bindCombiningValue( address, accumCoder, CombineFnUtil.bindContext(combineFn, c)); } }; http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java index f0aeece..4d04745 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java @@ -40,7 +40,6 @@ import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Combine.CombineFn; -import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn; import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; @@ -251,39 +250,6 @@ public class CopyOnAccessInMemoryStateInternalsTest { } @Test - public void testKeyedAccumulatorCombiningStateWithUnderlying() throws Exception { - CopyOnAccessInMemoryStateInternals<String> underlying = - CopyOnAccessInMemoryStateInternals.withUnderlying(key, null); - KeyedCombineFn<String, Long, long[], Long> sumLongFn = Sum.ofLongs().asKeyedFn(); - - StateNamespace namespace = new StateNamespaceForTest("foo"); - CoderRegistry reg = pipeline.getCoderRegistry(); - StateTag<String, CombiningState<Long, long[], Long>> stateTag = - StateTags.keyedCombiningValue( - "summer", - sumLongFn.getAccumulatorCoder( - reg, StringUtf8Coder.of(), reg.getDefaultCoder(Long.class)), - sumLongFn); - GroupingState<Long, Long> underlyingValue = underlying.state(namespace, stateTag); - assertThat(underlyingValue.read(), equalTo(0L)); - - underlyingValue.add(1L); - assertThat(underlyingValue.read(), equalTo(1L)); - - CopyOnAccessInMemoryStateInternals<String> internals = - CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying); - GroupingState<Long, Long> copyOnAccessState = internals.state(namespace, stateTag); - assertThat(copyOnAccessState.read(), equalTo(1L)); - - copyOnAccessState.add(4L); - assertThat(copyOnAccessState.read(), equalTo(5L)); - assertThat(underlyingValue.read(), equalTo(1L)); - - GroupingState<Long, Long> reReadUnderlyingValue = underlying.state(namespace, stateTag); - assertThat(underlyingValue.read(), equalTo(reReadUnderlyingValue.read())); - } - - @Test public void testWatermarkHoldStateWithUnderlying() { CopyOnAccessInMemoryStateInternals<String> underlying = CopyOnAccessInMemoryStateInternals.withUnderlying(key, null); http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java index 99de5be..6a7689a 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java @@ -188,8 +188,7 @@ class FlinkBatchTransformTranslators { DataSet<WindowedValue<KV<K, InputT>>> inputDataSet = context.getInputDataSet(context.getInput(transform)); - Combine.KeyedCombineFn<K, InputT, List<InputT>, List<InputT>> combineFn = - new Concatenate<InputT>().asKeyedFn(); + Combine.CombineFn<InputT, List<InputT>, List<InputT>> combineFn = new Concatenate<>(); KvCoder<K, InputT> inputCoder = (KvCoder<K, InputT>) context.getInput(transform).getCoder(); @@ -200,7 +199,6 @@ class FlinkBatchTransformTranslators { accumulatorCoder = combineFn.getAccumulatorCoder( context.getInput(transform).getPipeline().getCoderRegistry(), - inputCoder.getKeyCoder(), inputCoder.getValueCoder()); } catch (CannotProvideCoderException e) { throw new RuntimeException(e); @@ -337,8 +335,8 @@ class FlinkBatchTransformTranslators { DataSet<WindowedValue<KV<K, InputT>>> inputDataSet = context.getInputDataSet(context.getInput(transform)); - CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, OutputT> combineFn = - (CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, OutputT>) transform.getFn(); + CombineFnBase.GlobalCombineFn<InputT, AccumT, OutputT> combineFn = + (CombineFnBase.GlobalCombineFn<InputT, AccumT, OutputT>) transform.getFn(); KvCoder<K, InputT> inputCoder = (KvCoder<K, InputT>) context.getInput(transform).getCoder(); @@ -349,7 +347,6 @@ class FlinkBatchTransformTranslators { accumulatorCoder = combineFn.getAccumulatorCoder( context.getInput(transform).getPipeline().getCoderRegistry(), - inputCoder.getKeyCoder(), inputCoder.getValueCoder()); } catch (CannotProvideCoderException e) { throw new RuntimeException(e); http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/AbstractFlinkCombineRunner.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/AbstractFlinkCombineRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/AbstractFlinkCombineRunner.java index 83ff70d..6e27057 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/AbstractFlinkCombineRunner.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/AbstractFlinkCombineRunner.java @@ -19,8 +19,8 @@ package org.apache.beam.runners.flink.translation.functions; import com.google.common.collect.ImmutableList; import java.util.Collection; -import org.apache.beam.runners.core.PerKeyCombineFnRunner; -import org.apache.beam.runners.core.PerKeyCombineFnRunners; +import org.apache.beam.runners.core.GlobalCombineFnRunner; +import org.apache.beam.runners.core.GlobalCombineFnRunners; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.CombineFnBase; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -53,7 +53,7 @@ public abstract class AbstractFlinkCombineRunner< Collector<WindowedValue<KV<K, OutputT>>> out) throws Exception; /** - * Adapter interface that allows using a {@link CombineFnBase.PerKeyCombineFn} to either produce + * Adapter interface that allows using a {@link CombineFnBase.GlobalCombineFn} to either produce * the {@code AccumT} as output or to combine several accumulators into an {@code OutputT}. * The former would be used for a partial combine while the latter is used for the final merging * of accumulators. @@ -72,17 +72,17 @@ public abstract class AbstractFlinkCombineRunner< } /** - * A straight wrapper of {@link CombineFnBase.PerKeyCombineFn} that takes in {@code InputT} + * A straight wrapper of {@link CombineFnBase.GlobalCombineFn} that takes in {@code InputT} * and emits {@code OutputT}. */ public static class CompleteFlinkCombiner<K, InputT, AccumT, OutputT> implements FlinkCombiner<K, InputT, AccumT, OutputT> { - private final PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> combineFnRunner; + private final GlobalCombineFnRunner<InputT, AccumT, OutputT> combineFnRunner; public CompleteFlinkCombiner( - CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, OutputT> combineFn) { - combineFnRunner = PerKeyCombineFnRunners.create(combineFn); + CombineFnBase.GlobalCombineFn<InputT, AccumT, OutputT> combineFn) { + combineFnRunner = GlobalCombineFnRunners.create(combineFn); } @Override @@ -90,22 +90,22 @@ public abstract class AbstractFlinkCombineRunner< K key, InputT value, PipelineOptions options, SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) { AccumT accumulator = - combineFnRunner.createAccumulator(key, options, sideInputReader, windows); - return combineFnRunner.addInput(key, accumulator, value, options, sideInputReader, windows); + combineFnRunner.createAccumulator(options, sideInputReader, windows); + return combineFnRunner.addInput(accumulator, value, options, sideInputReader, windows); } @Override public AccumT addInput( K key, AccumT accumulator, InputT value, PipelineOptions options, SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) { - return combineFnRunner.addInput(key, accumulator, value, options, sideInputReader, windows); + return combineFnRunner.addInput(accumulator, value, options, sideInputReader, windows); } @Override public OutputT extractOutput( K key, AccumT accumulator, PipelineOptions options, SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) { - return combineFnRunner.extractOutput(key, accumulator, options, sideInputReader, windows); + return combineFnRunner.extractOutput(accumulator, options, sideInputReader, windows); } } @@ -115,10 +115,10 @@ public abstract class AbstractFlinkCombineRunner< public static class PartialFlinkCombiner<K, InputT, AccumT> implements FlinkCombiner<K, InputT, AccumT, AccumT> { - private final PerKeyCombineFnRunner<K, InputT, AccumT, ?> combineFnRunner; + private final GlobalCombineFnRunner<InputT, AccumT, ?> combineFnRunner; - public PartialFlinkCombiner(CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, ?> combineFn) { - combineFnRunner = PerKeyCombineFnRunners.create(combineFn); + public PartialFlinkCombiner(CombineFnBase.GlobalCombineFn<InputT, AccumT, ?> combineFn) { + combineFnRunner = GlobalCombineFnRunners.create(combineFn); } @Override @@ -126,15 +126,15 @@ public abstract class AbstractFlinkCombineRunner< K key, InputT value, PipelineOptions options, SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) { AccumT accumulator = - combineFnRunner.createAccumulator(key, options, sideInputReader, windows); - return combineFnRunner.addInput(key, accumulator, value, options, sideInputReader, windows); + combineFnRunner.createAccumulator(options, sideInputReader, windows); + return combineFnRunner.addInput(accumulator, value, options, sideInputReader, windows); } @Override public AccumT addInput( K key, AccumT accumulator, InputT value, PipelineOptions options, SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) { - return combineFnRunner.addInput(key, accumulator, value, options, sideInputReader, windows); + return combineFnRunner.addInput(accumulator, value, options, sideInputReader, windows); } @Override @@ -151,10 +151,10 @@ public abstract class AbstractFlinkCombineRunner< public static class FinalFlinkCombiner<K, AccumT, OutputT> implements FlinkCombiner<K, AccumT, AccumT, OutputT> { - private final PerKeyCombineFnRunner<K, ?, AccumT, OutputT> combineFnRunner; + private final GlobalCombineFnRunner<?, AccumT, OutputT> combineFnRunner; - public FinalFlinkCombiner(CombineFnBase.PerKeyCombineFn<K, ?, AccumT, OutputT> combineFn) { - combineFnRunner = PerKeyCombineFnRunners.create(combineFn); + public FinalFlinkCombiner(CombineFnBase.GlobalCombineFn<?, AccumT, OutputT> combineFn) { + combineFnRunner = GlobalCombineFnRunners.create(combineFn); } @Override @@ -169,14 +169,14 @@ public abstract class AbstractFlinkCombineRunner< K key, AccumT accumulator, AccumT value, PipelineOptions options, SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) { return combineFnRunner.mergeAccumulators( - key, ImmutableList.of(accumulator, value), options, sideInputReader, windows); + ImmutableList.of(accumulator, value), options, sideInputReader, windows); } @Override public OutputT extractOutput( K key, AccumT accumulator, PipelineOptions options, SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) { - return combineFnRunner.extractOutput(key, accumulator, options, sideInputReader, windows); + return combineFnRunner.extractOutput(accumulator, options, sideInputReader, windows); } } } http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java index 3712598..9ccf079 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java @@ -41,7 +41,7 @@ public class FlinkMergingNonShuffleReduceFunction< K, InputT, AccumT, OutputT, W extends BoundedWindow> extends RichGroupReduceFunction<WindowedValue<KV<K, InputT>>, WindowedValue<KV<K, OutputT>>> { - private final CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, OutputT> combineFn; + private final CombineFnBase.GlobalCombineFn<InputT, AccumT, OutputT> combineFn; private final WindowingStrategy<Object, W> windowingStrategy; @@ -50,12 +50,12 @@ public class FlinkMergingNonShuffleReduceFunction< private final SerializedPipelineOptions serializedOptions; public FlinkMergingNonShuffleReduceFunction( - CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn, + CombineFnBase.GlobalCombineFn<InputT, AccumT, OutputT> combineFn, WindowingStrategy<Object, W> windowingStrategy, Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs, PipelineOptions pipelineOptions) { - this.combineFn = keyedCombineFn; + this.combineFn = combineFn; this.windowingStrategy = windowingStrategy; this.sideInputs = sideInputs; @@ -75,7 +75,6 @@ public class FlinkMergingNonShuffleReduceFunction< new FlinkSideInputReader(sideInputs, getRuntimeContext()); AbstractFlinkCombineRunner<K, InputT, AccumT, OutputT, W> reduceRunner; - if (windowingStrategy.getWindowFn().windowCoder().equals(IntervalWindow.getCoder())) { reduceRunner = new SortingFlinkCombineRunner<>(); } else { @@ -83,13 +82,12 @@ public class FlinkMergingNonShuffleReduceFunction< } reduceRunner.combine( - new AbstractFlinkCombineRunner.CompleteFlinkCombiner<>(combineFn), + new AbstractFlinkCombineRunner.CompleteFlinkCombiner<K, InputT, AccumT, OutputT>(combineFn), windowingStrategy, sideInputReader, options, elements, out); - } } http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java index 9a44840..4099f52 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java @@ -42,7 +42,7 @@ import org.apache.flink.util.Collector; public class FlinkPartialReduceFunction<K, InputT, AccumT, W extends BoundedWindow> extends RichGroupCombineFunction<WindowedValue<KV<K, InputT>>, WindowedValue<KV<K, AccumT>>> { - protected final CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, ?> combineFn; + protected final CombineFnBase.GlobalCombineFn<InputT, AccumT, ?> combineFn; protected final WindowingStrategy<Object, W> windowingStrategy; @@ -51,7 +51,7 @@ public class FlinkPartialReduceFunction<K, InputT, AccumT, W extends BoundedWind protected final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs; public FlinkPartialReduceFunction( - CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, ?> combineFn, + CombineFnBase.GlobalCombineFn<InputT, AccumT, ?> combineFn, WindowingStrategy<Object, W> windowingStrategy, Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs, PipelineOptions pipelineOptions) { @@ -83,7 +83,7 @@ public class FlinkPartialReduceFunction<K, InputT, AccumT, W extends BoundedWind } reduceRunner.combine( - new AbstractFlinkCombineRunner.PartialFlinkCombiner<>(combineFn), + new AbstractFlinkCombineRunner.PartialFlinkCombiner<K, InputT, AccumT>(combineFn), windowingStrategy, sideInputReader, options, http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java index 6c1a2e4..90dcbff 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java @@ -42,7 +42,7 @@ import org.apache.flink.util.Collector; public class FlinkReduceFunction<K, AccumT, OutputT, W extends BoundedWindow> extends RichGroupReduceFunction<WindowedValue<KV<K, AccumT>>, WindowedValue<KV<K, OutputT>>> { - protected final CombineFnBase.PerKeyCombineFn<K, ?, AccumT, OutputT> combineFn; + protected final CombineFnBase.GlobalCombineFn<?, AccumT, OutputT> combineFn; protected final WindowingStrategy<Object, W> windowingStrategy; @@ -51,12 +51,12 @@ public class FlinkReduceFunction<K, AccumT, OutputT, W extends BoundedWindow> protected final SerializedPipelineOptions serializedOptions; public FlinkReduceFunction( - CombineFnBase.PerKeyCombineFn<K, ?, AccumT, OutputT> keyedCombineFn, + CombineFnBase.GlobalCombineFn<?, AccumT, OutputT> combineFn, WindowingStrategy<Object, W> windowingStrategy, Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs, PipelineOptions pipelineOptions) { - this.combineFn = keyedCombineFn; + this.combineFn = combineFn; this.windowingStrategy = windowingStrategy; this.sideInputs = sideInputs; @@ -83,15 +83,13 @@ public class FlinkReduceFunction<K, AccumT, OutputT, W extends BoundedWindow> } else { reduceRunner = new SortingFlinkCombineRunner<>(); } - reduceRunner.combine( - new AbstractFlinkCombineRunner.FinalFlinkCombiner<>(combineFn), + new AbstractFlinkCombineRunner.FinalFlinkCombiner<K, AccumT, OutputT>(combineFn), windowingStrategy, sideInputReader, options, elements, out); - } } http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SortingFlinkCombineRunner.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SortingFlinkCombineRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SortingFlinkCombineRunner.java index eac465c..4aacb4a 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SortingFlinkCombineRunner.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SortingFlinkCombineRunner.java @@ -43,7 +43,6 @@ import org.joda.time.Instant; public class SortingFlinkCombineRunner<K, InputT, AccumT, OutputT, W extends BoundedWindow> extends AbstractFlinkCombineRunner<K, InputT, AccumT, OutputT, W> { - @Override public void combine( FlinkCombiner<K, InputT, AccumT, OutputT> flinkCombiner,