Make aggregator registration idempotent in FlinkRunner
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2089c5cd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2089c5cd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2089c5cd

Branch: refs/heads/gearpump-runner
Commit: 2089c5cd2662a2eeea39ac7ebd1bfd8bcdc1aa16
Parents: 1919d8b
Author: Kenneth Knowles <k...@google.com>
Authored: Sun Oct 23 21:26:48 2016 -0700
Committer: Kenneth Knowles <k...@google.com>
Committed: Sun Oct 23 21:26:48 2016 -0700

----------------------------------------------------------------------
 .../flink/translation/functions/FlinkProcessContext.java | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2089c5cd/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java
index fa5eb1a..baf97cb 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java
@@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.common.collect.Iterables;
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
@@ -40,6 +41,7 @@ import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
+import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.util.Collector;
 import org.joda.time.Instant;
@@ -316,7 +318,13 @@ class FlinkProcessContext<InputT, OutputT>
   createAggregatorInternal(String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) {
     SerializableFnAggregatorWrapper<AggInputT, AggOutputT> wrapper =
         new SerializableFnAggregatorWrapper<>(combiner);
-    runtimeContext.addAccumulator(name, wrapper);
+    Accumulator<?, ?> existingAccum =
+        (Accumulator<AggInputT, Serializable>) runtimeContext.getAccumulator(name);
+    if (existingAccum != null) {
+      return wrapper;
+    } else {
+      runtimeContext.addAccumulator(name, wrapper);
+    }
     return wrapper;
   }
 }