Repository: incubator-beam Updated Branches: refs/heads/master 0a2ed832c -> b41a46e86
[FLINK-1102] Fix Aggregator Registration in Flink Batch Runner Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/869b2710 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/869b2710 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/869b2710 Branch: refs/heads/master Commit: 869b2710efdb90bc3ce5b6e9d4f3b49a3a804a63 Parents: 0a2ed83 Author: Aljoscha Krettek <aljoscha.kret...@gmail.com> Authored: Wed Dec 7 13:28:13 2016 +0800 Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com> Committed: Wed Dec 7 15:25:44 2016 +0800 ---------------------------------------------------------------------- .../functions/FlinkProcessContextBase.java | 21 +++++++++----------- 1 file changed, 9 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/869b2710/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java index 42607dd..6afca38 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java @@ -20,7 +20,6 @@ package org.apache.beam.runners.flink.translation.functions; import static com.google.common.base.Preconditions.checkNotNull; import com.google.common.collect.Iterables; -import java.io.Serializable; import java.util.Collection; import java.util.Collections; import java.util.Iterator; @@ -39,7 +38,6 @@ import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; -import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.functions.RuntimeContext; import org.joda.time.Instant; @@ -256,15 +254,14 @@ abstract class FlinkProcessContextBase<InputT, OutputT> @Override protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) { - SerializableFnAggregatorWrapper<AggInputT, AggOutputT> wrapper = - new SerializableFnAggregatorWrapper<>(combiner); - Accumulator<?, ?> existingAccum = - (Accumulator<AggInputT, Serializable>) runtimeContext.getAccumulator(name); - if (existingAccum != null) { - return wrapper; - } else { - runtimeContext.addAccumulator(name, wrapper); + @SuppressWarnings("unchecked") + SerializableFnAggregatorWrapper<AggInputT, AggOutputT> result = + (SerializableFnAggregatorWrapper<AggInputT, AggOutputT>) + runtimeContext.getAccumulator(name); + + if (result == null) { + result = new SerializableFnAggregatorWrapper<>(combiner); + runtimeContext.addAccumulator(name, result); } - return wrapper; - } + return result; } }