[ https://issues.apache.org/jira/browse/BEAM-1102?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15727742#comment-15727742 ]
Aljoscha Krettek commented on BEAM-1102: ---------------------------------------- The problem is this part in {{FlinkProcessContextBase}}: {code} @Override protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) { SerializableFnAggregatorWrapper<AggInputT, AggOutputT> wrapper = new SerializableFnAggregatorWrapper<>(combiner); Accumulator<?, ?> existingAccum = (Accumulator<AggInputT, Serializable>) runtimeContext.getAccumulator(name); if (existingAccum != null) { return wrapper; } else { runtimeContext.addAccumulator(name, wrapper); } return wrapper; } {code} Notice how the newly created wrapper is returned if the accumulator already exists. > Flink Batch Runner does not populate aggregator values > ------------------------------------------------------ > > Key: BEAM-1102 > URL: https://issues.apache.org/jira/browse/BEAM-1102 > Project: Beam > Issue Type: Bug > Components: runner-flink > Affects Versions: 0.3.0-incubating > Reporter: Daniel Halperin > Assignee: Aljoscha Krettek > Priority: Minor > > Running the quickstart gives 0 for emptyLines. > Running with {{--streaming=true}} gives the correct value (for my input file, > the default examples archetype {{pom.xml}}, the true value is 27 at the time > of writing). > Streaming output: > {code} > Dec 07, 2016 1:00:53 PM org.apache.beam.runners.flink.FlinkRunner run > INFO: Final aggregator values: > Dec 07, 2016 1:00:53 PM org.apache.beam.runners.flink.FlinkRunner run > INFO: DroppedDueToLateness : 0 > Dec 07, 2016 1:00:53 PM org.apache.beam.runners.flink.FlinkRunner run > INFO: emptyLines : 27 > Dec 07, 2016 1:00:53 PM org.apache.beam.runners.flink.FlinkRunner run > INFO: DroppedDueToClosedWindow : 0 > {code} > Non-streaming output: > {code} > Dec 07, 2016 12:58:07 PM org.apache.beam.runners.flink.FlinkRunner run > INFO: Final aggregator values: > Dec 07, 2016 12:58:07 PM org.apache.beam.runners.flink.FlinkRunner run > INFO: emptyLines : 0 > {code} > (Note also that the lateness etc. aggregators are missing entirely, may be > expected). -- This message was sent by Atlassian JIRA (v6.3.4#6332)