[ https://issues.apache.org/jira/browse/FLINK-4586?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Greg Hogan closed FLINK-4586.
-----------------------------
    Resolution: Fixed

Fixed in
1.2.0: 428419d599d138f1647f84807d6d0224652f3d1b
1.1.4: 9c87f92cbb9989281024c5c300cd18b962ac4357

> NumberSequenceIterator and Accumulator threading issue
> ------------------------------------------------------
>
>                 Key: FLINK-4586
>                 URL: https://issues.apache.org/jira/browse/FLINK-4586
>             Project: Flink
>          Issue Type: Bug
>          Components: DataSet API
>    Affects Versions: 1.1.2
>            Reporter: Johannes
>            Assignee: Greg Hogan
>            Priority: Minor
>             Fix For: 1.2.0, 1.1.4
>
>         Attachments: FLINK4586Test.scala
>
>
> There is a strange problem when using the NumberSequenceIterator in
> combination with an AverageAccumulator.
> It seems like the individual accumulators are reinitialized and overwrite
> parts of intermediate solutions.
> The following Scala snippet exemplifies the problem.
> The result should be the correct average, {{50.5}}, but is something
> completely different, such as {{8.08}}, depending on the number of
> cores used.
> If the parallelism is set to {{1}}, the result is correct, which indicates a
> likely threading problem.
> The problem occurs with both the Java and Scala APIs.
> {code}
> env
>   .fromParallelCollection(new NumberSequenceIterator(1, 100))
>   .map(new RichMapFunction[Long, Long] {
>     var a : AverageAccumulator = _
>     override def map(value: Long): Long = {
>       a.add(value)
>       value
>     }
>     override def open(parameters: Configuration): Unit = {
>       a = new AverageAccumulator
>       getRuntimeContext.addAccumulator("test", a)
>     }
>   })
>   .reduce((a, b) => a + b)
>   .print()
> val lastJobExecutionResult: JobExecutionResult = env.getLastJobExecutionResult
> println(lastJobExecutionResult.getAccumulatorResult("test"))
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
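
For reference, the expected value of 50.5 reported in the issue should not depend on how the input 1..100 is split across parallel subtasks, provided each partial accumulator is merged rather than overwritten. The following is a minimal plain-Scala sketch of that expectation. It assumes a hypothetical `Avg` class and `averageWith` helper (neither is part of Flink, and `Avg` is not Flink's `AverageAccumulator`), and it makes no claim about the actual cause of the bug or the content of the fix commits.

```scala
object AverageSketch {
  // Hypothetical stand-in for an average accumulator (NOT Flink's AverageAccumulator).
  // It tracks a running sum and count so that partial results can be merged exactly.
  final case class Avg(sum: Double = 0.0, count: Long = 0L) {
    def add(value: Long): Avg = Avg(sum + value, count + 1)
    def merge(other: Avg): Avg = Avg(sum + other.sum, count + other.count)
    def result: Double = if (count == 0) 0.0 else sum / count
  }

  // Hypothetical helper: split 1..100 into roughly `parallelism` chunks,
  // accumulate each chunk independently, then merge the partial accumulators.
  def averageWith(parallelism: Int): Double = {
    val chunkSize = math.ceil(100.0 / parallelism).toInt
    (1L to 100L)
      .grouped(chunkSize)
      .map(chunk => chunk.foldLeft(Avg())((acc, v) => acc.add(v)))
      .foldLeft(Avg())((left, right) => left.merge(right))
      .result
  }
}
```

With this sketch, `AverageSketch.averageWith(1)`, `AverageSketch.averageWith(4)`, and `AverageSketch.averageWith(8)` all evaluate to `50.5`, which is the behaviour the reporter expected from the job regardless of the number of cores.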