[ https://issues.apache.org/jira/browse/FLINK-4586?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Johannes updated FLINK-4586:
----------------------------
    Attachment: FLINK4586Test.scala

Scala unit test

> NumberSequenceIterator and Accumulator threading issue
> ------------------------------------------------------
>
>                 Key: FLINK-4586
>                 URL: https://issues.apache.org/jira/browse/FLINK-4586
>             Project: Flink
>          Issue Type: Bug
>          Components: DataSet API
>    Affects Versions: 1.1.2
>            Reporter: Johannes
>            Priority: Minor
>         Attachments: FLINK4586Test.scala
>
>
> There is a strange problem when using the NumberSequenceIterator in
> combination with an AverageAccumulator.
> It seems as if the individual accumulators are reinitialized and overwrite
> parts of the intermediate results.
> The following Scala snippet demonstrates the problem.
> The result should be {{50.5}}, but instead of the correct average the job
> prints something completely different, such as {{8.08}}, depending on the
> number of cores used.
> If the parallelism is set to {{1}}, the result is correct, which points to a
> likely threading problem.
> The problem occurs with both the Java and the Scala API.
> {code}
> env
>   .fromParallelCollection(new NumberSequenceIterator(1, 100))
>   .map(new RichMapFunction[Long, Long] {
>     var a : AverageAccumulator = _
>     override def map(value: Long): Long = {
>       a.add(value)
>       value
>     }
>     override def open(parameters: Configuration): Unit = {
>       a = new AverageAccumulator
>       getRuntimeContext.addAccumulator("test", a)
>     }
>   })
>   .reduce((a, b) => a + b)
>   .print()
> val lastJobExecutionResult: JobExecutionResult = env.getLastJobExecutionResult
> println(lastJobExecutionResult.getAccumulatorResult("test"))
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
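For comparison, the expected value of 50.5 can be checked without Flink. The following plain-Scala sketch models what a parallel average is assumed to need: each parallel instance keeps a (sum, count) pair, and the partial pairs are merged by adding sums and counts, never partial averages. The names `AvgState` and `averageWithParallelism` are hypothetical and are not Flink's `AverageAccumulator` API; the contiguous split of 1 to 100 is also an assumption and need not match how `NumberSequenceIterator` splits its range.

```scala
// Plain-Scala sketch with no Flink dependency. AvgState and
// averageWithParallelism are hypothetical names for illustration only.
object AverageCheck {

  // Partial state of one parallel instance: running sum and element count.
  final case class AvgState(sum: Double, count: Long) {
    def add(value: Long): AvgState = AvgState(sum + value, count + 1)

    // Merging combines raw sums and counts, not the partial averages,
    // so the final result does not depend on how the input was split.
    def merge(other: AvgState): AvgState =
      AvgState(sum + other.sum, count + other.count)

    def average: Double = if (count == 0) 0.0 else sum / count
  }

  object AvgState {
    val empty: AvgState = AvgState(0.0, 0L)
  }

  // Splits from..to into at most `parallelism` contiguous chunks (an assumed
  // partitioning), accumulates each chunk separately, then merges the partials.
  def averageWithParallelism(from: Long, to: Long, parallelism: Int): Double = {
    val values = (from to to).toVector
    val chunkSize = math.ceil(values.size.toDouble / parallelism).toInt
    val partials = values
      .grouped(chunkSize)
      .map(_.foldLeft(AvgState.empty)(_ add _))
      .toVector
    partials.foldLeft(AvgState.empty)(_ merge _).average
  }

  def main(args: Array[String]): Unit = {
    for (p <- Seq(1, 2, 4, 8, 16))
      println(s"parallelism=$p average=${averageWithParallelism(1, 100, p)}")
  }
}
```

Running `main` prints an average of 50.5 for every parallelism listed, which is the value the report expects the accumulator to return regardless of the number of cores.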