Johannes created FLINK-4586:
-------------------------------

             Summary: NumberSequenceIterator and Accumulator threading issue
                 Key: FLINK-4586
                 URL: https://issues.apache.org/jira/browse/FLINK-4586
             Project: Flink
          Issue Type: Bug
          Components: DataSet API
    Affects Versions: 1.1.2
            Reporter: Johannes
            Priority: Minor


There is a strange problem when using the NumberSequenceIterator in combination 
with an AverageAccumulator.

It seems as if the individual accumulators are reinitialized and overwrite parts
of the intermediate result.

The following Scala snippet exemplifies the problem.
The printed average should be {{50.5}} (the mean of 1 to 100), but it is
something completely different, such as {{8.08}}, depending on the number of
cores used.
If the parallelism is set to {{1}}, the result is correct, which suggests a
threading problem. The problem occurs with both the Java and the Scala
API.

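{code}
import org.apache.flink.api.common.JobExecutionResult
import org.apache.flink.api.common.accumulators.AverageAccumulator
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.NumberSequenceIterator

val env = ExecutionEnvironment.getExecutionEnvironment

env
  // NumberSequenceIterator emits java.lang.Long values 1..100, split across parallel subtasks
  .fromParallelCollection(new NumberSequenceIterator(1, 100))
  .map(new RichMapFunction[java.lang.Long, Long] {
        // one AverageAccumulator per parallel subtask, all registered under the name "test"
        var a: AverageAccumulator = _

        override def map(value: java.lang.Long): Long = {
          a.add(value.longValue)
          value.longValue
        }

        override def open(parameters: Configuration): Unit = {
          a = new AverageAccumulator
          getRuntimeContext.addAccumulator("test", a)
        }
  })
  .reduce((a, b) => a + b)
  .print() // print() triggers execution of the job in the DataSet API

val lastJobExecutionResult: JobExecutionResult = env.getLastJobExecutionResult

// expected: 50.5, independent of the parallelism
println(lastJobExecutionResult.getAccumulatorResult[java.lang.Double]("test"))
{code}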
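The parallelism dependence described above can be reasoned about with a small Flink-free model. This is a minimal sketch under stated assumptions, not Flink's AverageAccumulator and not a confirmed root cause: it assumes that each parallel subtask keeps its own partial average and that the partials are merged pairwise after the job finishes. PartialAvg, faultyMerge and AverageMergeSketch are hypothetical names. Merging the raw (sum, count) pairs gives {{50.5}} for every parallelism, while a hypothetical merge that adds another partial's average into the running sum is only correct at parallelism {{1}}, which is the same pattern of symptoms reported here.

{code:scala}
// Hypothetical model of per-subtask average accumulators (NOT Flink's AverageAccumulator).
final case class PartialAvg(sum: Double, count: Long) {
  def add(v: Long): PartialAvg = PartialAvg(sum + v, count + 1)
  def average: Double = if (count == 0) 0.0 else sum / count
  // Correct merge: combine the raw sums and counts of both partials.
  def merge(other: PartialAvg): PartialAvg =
    PartialAvg(sum + other.sum, count + other.count)
  // Hypothetical faulty merge: adds the other partial's AVERAGE to the sum.
  def faultyMerge(other: PartialAvg): PartialAvg =
    PartialAvg(sum + other.average, count + other.count)
}

object AverageMergeSketch {
  // Split 1..100 into contiguous chunks, one per hypothetical parallel subtask,
  // and fold each chunk into its own partial accumulator.
  def partials(parallelism: Int): List[PartialAvg] = {
    val values = 1L to 100L
    val size = math.ceil(values.size.toDouble / parallelism).toInt
    values.grouped(size).map(_.foldLeft(PartialAvg(0.0, 0L))(_ add _)).toList
  }

  def correct(parallelism: Int): Double =
    partials(parallelism).reduce(_ merge _).average

  def faulty(parallelism: Int): Double =
    partials(parallelism).reduce(_ faultyMerge _).average

  def main(args: Array[String]): Unit =
    for (p <- Seq(1, 2, 4, 8))
      println(f"parallelism=$p%d correct=${correct(p)}%.2f faulty=${faulty(p)}%.2f")
}
{code}

With parallelism {{2}}, for example, the faulty merge computes (1275 + 75.5) / 100 = 13.505 instead of 50.5.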



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
