Repository: incubator-beam
Updated Branches:
  refs/heads/master 0a2ed832c -> b41a46e86


[FLINK-1102] Fix Aggregator Registration in Flink Batch Runner


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/869b2710
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/869b2710
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/869b2710

Branch: refs/heads/master
Commit: 869b2710efdb90bc3ce5b6e9d4f3b49a3a804a63
Parents: 0a2ed83
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Authored: Wed Dec 7 13:28:13 2016 +0800
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Wed Dec 7 15:25:44 2016 +0800

----------------------------------------------------------------------
 .../functions/FlinkProcessContextBase.java      | 21 +++++++++-----------
 1 file changed, 9 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/869b2710/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java
index 42607dd..6afca38 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java
@@ -20,7 +20,6 @@ package org.apache.beam.runners.flink.translation.functions;
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.common.collect.Iterables;
-import java.io.Serializable;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
@@ -39,7 +38,6 @@ import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
-import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.joda.time.Instant;
 
@@ -256,15 +254,14 @@ abstract class FlinkProcessContextBase<InputT, OutputT>
   @Override
   protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT>
   createAggregatorInternal(String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) {
-    SerializableFnAggregatorWrapper<AggInputT, AggOutputT> wrapper =
-        new SerializableFnAggregatorWrapper<>(combiner);
-    Accumulator<?, ?> existingAccum =
-        (Accumulator<AggInputT, Serializable>) runtimeContext.getAccumulator(name);
-    if (existingAccum != null) {
-      return wrapper;
-    } else {
-      runtimeContext.addAccumulator(name, wrapper);
+    @SuppressWarnings("unchecked")
+    SerializableFnAggregatorWrapper<AggInputT, AggOutputT> result =
+        (SerializableFnAggregatorWrapper<AggInputT, AggOutputT>)
+            runtimeContext.getAccumulator(name);
+
+    if (result == null) {
+      result = new SerializableFnAggregatorWrapper<>(combiner);
+      runtimeContext.addAccumulator(name, result);
     }
-    return wrapper;
-  }
+    return result;  }
 }

Reply via email to