[jira] [Updated] (FLINK-4586) NumberSequenceIterator and Accumulator threading issue
[ https://issues.apache.org/jira/browse/FLINK-4586?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Greg Hogan updated FLINK-4586:
--
Fix Version/s: 1.1.4
               1.2.0

> NumberSequenceIterator and Accumulator threading issue
> --
>
> Key: FLINK-4586
> URL: https://issues.apache.org/jira/browse/FLINK-4586
> Project: Flink
> Issue Type: Bug
> Components: DataSet API
> Affects Versions: 1.1.2
> Reporter: Johannes
> Assignee: Greg Hogan
> Priority: Minor
> Fix For: 1.2.0, 1.1.4
>
> Attachments: FLINK4586Test.scala
>
>
> There is a strange problem when using the NumberSequenceIterator in
> combination with an AverageAccumulator.
> It seems that the individual accumulators are reinitialized and overwrite
> parts of the intermediate solutions.
> The following Scala snippet exemplifies the problem.
> The result should be the correct average, {{50.5}}, but the job prints
> something completely different, such as {{8.08}}, depending on the number
> of cores used.
> If the parallelism is set to {{1}}, the result is correct, which indicates a
> likely threading problem.
> The problem occurs with both the Java and the Scala API.
> {code}
> env
>   .fromParallelCollection(new NumberSequenceIterator(1, 100))
>   .map(new RichMapFunction[Long, Long] {
>     var a: AverageAccumulator = _
>
>     override def map(value: Long): Long = {
>       a.add(value)
>       value
>     }
>
>     override def open(parameters: Configuration): Unit = {
>       a = new AverageAccumulator
>       getRuntimeContext.addAccumulator("test", a)
>     }
>   })
>   .reduce((a, b) => a + b)
>   .print()
>
> val lastJobExecutionResult: JobExecutionResult = env.getLastJobExecutionResult
> println(lastJobExecutionResult.getAccumulatorResult("test"))
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
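For reference, the expected value {{50.5}} is the mean of 1 to 100, and it should not depend on the parallelism as long as each parallel subtask's partial result is merged by adding both its running sum and its element count. The following is a minimal, Flink-independent sketch of that merge behaviour under the assumption that each subtask owns one accumulator; `SumCountAverage` and `ExpectedAverage` are hypothetical names for illustration only and are not Flink's `AverageAccumulator` implementation.

```scala
// Hypothetical sketch, not Flink's AverageAccumulator: a mergeable sum/count average.
final class SumCountAverage {
  private var sum: Double = 0.0
  private var count: Long = 0L

  // Called once per element inside a single (simulated) subtask.
  def add(value: Long): Unit = {
    sum += value
    count += 1
  }

  // Combines another subtask's partial result: both the sum and the count must be added.
  def merge(other: SumCountAverage): Unit = {
    sum += other.sum
    count += other.count
  }

  def result: Double = if (count == 0) 0.0 else sum / count
}

object ExpectedAverage {
  // Splits 1..100 into `parallelism` chunks, gives each chunk its own accumulator,
  // then merges all partial accumulators into one final result.
  def averageWithParallelism(parallelism: Int): Double = {
    require(parallelism > 0, "parallelism must be positive")
    val values = (1L to 100L).toVector
    val chunkSize = math.ceil(values.size.toDouble / parallelism).toInt
    val partials = values.grouped(chunkSize).map { chunk =>
      val acc = new SumCountAverage
      chunk.foreach(acc.add)
      acc
    }.toList

    val merged = new SumCountAverage
    partials.foreach(merged.merge)
    merged.result
  }

  def main(args: Array[String]): Unit =
    for (p <- Seq(1, 2, 4, 8)) println(s"parallelism=$p -> ${averageWithParallelism(p)}")
}
```

Running `main` prints 50.5 for every parallelism in the loop, which is the value the job in the report is expected to produce regardless of the number of cores.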
[jira] [Updated] (FLINK-4586) NumberSequenceIterator and Accumulator threading issue
[ https://issues.apache.org/jira/browse/FLINK-4586?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Johannes updated FLINK-4586:
--
Attachment: FLINK4586Test.scala

Scala unit test
[jira] [Updated] (FLINK-4586) NumberSequenceIterator and Accumulator threading issue
[ https://issues.apache.org/jira/browse/FLINK-4586?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Johannes updated FLINK-4586:
--
Attachment: (was: FLINK4586Test.scala)
[jira] [Updated] (FLINK-4586) NumberSequenceIterator and Accumulator threading issue
[ https://issues.apache.org/jira/browse/FLINK-4586?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Johannes updated FLINK-4586:
--
Description:
There is a strange problem when using the NumberSequenceIterator in combination with an AverageAccumulator.
It seems that the individual accumulators are reinitialized and overwrite parts of the intermediate solutions.
The following Scala snippet exemplifies the problem.
The result should be the correct average, {{50.5}}, but the job prints something completely different, such as {{8.08}}, depending on the number of cores used.
If the parallelism is set to {{1}}, the result is correct, which indicates a likely threading problem.
The problem occurs with both the Java and the Scala API.

{code}
env
  .fromParallelCollection(new NumberSequenceIterator(1, 100))
  .map(new RichMapFunction[Long, Long] {
    var a: AverageAccumulator = _

    override def map(value: Long): Long = {
      a.add(value)
      value
    }

    override def open(parameters: Configuration): Unit = {
      a = new AverageAccumulator
      getRuntimeContext.addAccumulator("test", a)
    }
  })
  .reduce((a, b) => a + b)
  .print()

val lastJobExecutionResult: JobExecutionResult = env.getLastJobExecutionResult
println(lastJobExecutionResult.getAccumulatorResult("test"))
{code}

  was:
There is a strange problem when using the NumberSequenceIterator in combination with an AverageAccumulator.
It seems that the individual accumulators are reinitialized and overwrite parts of the intermediate solution.
The following Scala snippet exemplifies the problem.
The result should be the correct average, {{50.5}}, but the job prints something completely different, such as {{8.08}}, depending on the number of cores used.
If the parallelism is set to {{1}}, the result is correct, which indicates a likely threading problem.
The problem occurs with both the Java and the Scala API.

{code}
env
  .fromParallelCollection(new NumberSequenceIterator(1, 100))
  .map(new RichMapFunction[Long, Long] {
    var a: AverageAccumulator = _

    override def map(value: Long): Long = {
      a.add(value)
      value
    }

    override def open(parameters: Configuration): Unit = {
      a = new AverageAccumulator
      getRuntimeContext.addAccumulator("test", a)
    }
  })
  .reduce((a, b) => a + b)
  .print()

val lastJobExecutionResult: JobExecutionResult = env.getLastJobExecutionResult
println(lastJobExecutionResult.getAccumulatorResult("test"))
{code}
[jira] [Updated] (FLINK-4586) NumberSequenceIterator and Accumulator threading issue
[ https://issues.apache.org/jira/browse/FLINK-4586?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Johannes updated FLINK-4586:
--
Description:
There is a strange problem when using the NumberSequenceIterator in combination with an AverageAccumulator.
It seems that the individual accumulators are reinitialized and overwrite parts of the intermediate solution.
The following Scala snippet exemplifies the problem.
The result should be the correct average, {{50.5}}, but the job prints something completely different, such as {{8.08}}, depending on the number of cores used.
If the parallelism is set to {{1}}, the result is correct, which indicates a likely threading problem.
The problem occurs with both the Java and the Scala API.

{code}
env
  .fromParallelCollection(new NumberSequenceIterator(1, 100))
  .map(new RichMapFunction[Long, Long] {
    var a: AverageAccumulator = _

    override def map(value: Long): Long = {
      a.add(value)
      value
    }

    override def open(parameters: Configuration): Unit = {
      a = new AverageAccumulator
      getRuntimeContext.addAccumulator("test", a)
    }
  })
  .reduce((a, b) => a + b)
  .print()

val lastJobExecutionResult: JobExecutionResult = env.getLastJobExecutionResult
println(lastJobExecutionResult.getAccumulatorResult("test"))
{code}

  was:
There is a strange problem when using the NumberSequenceIterator in combination with an AverageAccumulator.
It seems that the individual accumulators are reinitialized and overwrite parts of the intermediate solution.
The following Scala snippet exemplifies the problem.
The result should be the correct average, {{50.5}}, but the job prints something completely different, such as {{8.08}}, depending on the number of cores used.
If the parallelism is set to {{1}}, the result is correct, which seems like there is a problem with threading.
The problem occurs with both the Java and the Scala API.

{code}
env
  .fromParallelCollection(new NumberSequenceIterator(1, 100))
  .map(new RichMapFunction[Long, Long] {
    var a: AverageAccumulator = _

    override def map(value: Long): Long = {
      a.add(value)
      value
    }

    override def open(parameters: Configuration): Unit = {
      a = new AverageAccumulator
      getRuntimeContext.addAccumulator("test", a)
    }
  })
  .reduce((a, b) => a + b)
  .print()

val lastJobExecutionResult: JobExecutionResult = env.getLastJobExecutionResult
println(lastJobExecutionResult.getAccumulatorResult("test"))
{code}
[jira] [Updated] (FLINK-4586) NumberSequenceIterator and Accumulator threading issue
[ https://issues.apache.org/jira/browse/FLINK-4586?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Johannes updated FLINK-4586:
--
Attachment: FLINK4586Test.scala

Complete Scala Testcase