Negative Accumulators
Hello, I am seeing negative values for accumulators. Here's my implementation in a standalone app on Spark 1.1.1rc:

    implicit object BigIntAccumulatorParam extends AccumulatorParam[BigInt] {
      def addInPlace(t1: Int, t2: BigInt) = BigInt(t1) + t2
      def addInPlace(t1: BigInt, t2: BigInt) = t1 + t2
      def zero(initialValue: BigInt) = BigInt(0)
    }

    val capped_numpings_accu = sc.accumulator(BigInt(0))(BigIntAccumulatorParam)

    myRDD.foreach(x => { capped_numpings_accu += BigInt(x._1).min(threshold_var) })

When I remove the min() condition, I no longer see negative values. Thanks!
Re: Negative Accumulators
Sanity check: could it be that `threshold_var` is negative?

-- FG

On Fri, Jan 30, 2015 at 5:06 PM, Peter Thai <thai.pe...@gmail.com> wrote:
> [quoted message above]
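A minimal sketch of FG's sanity check (the values below are invented for illustration; only `threshold_var` and the min-capping come from the original post). If the threshold ends up negative, min() replaces every element with that negative value, so the accumulated total goes negative even though all inputs are positive:

    // Hypothetical values for illustration only
    val threshold_var = BigInt(-5)                        // suppose the threshold went negative
    val pings = Seq(BigInt(10), BigInt(20), BigInt(30))   // all-positive inputs
    val capped = pings.map(_.min(threshold_var))          // every element becomes -5
    println(capped.sum)                                   // prints -15, not 60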
Re: Negative Accumulators
Similarly, I'm having an issue with the above solution when I use the math.min() function to add to an accumulator: I'm seeing negative overflow numbers again. The code works fine without the math.min(), even when I add an arbitrarily large number like 100:

    // doesn't work
    someRDD.foreach(x => { myAccumulator += math.min(x._1, 100) })

    // works
    someRDD.foreach(x => { myAccumulator += x._1 + 100 })

Any ideas?
Re: Negative Accumulators
To answer my own question: I was declaring the accumulator incorrectly. The code should look like this:

    scala> import org.apache.spark.AccumulatorParam
    import org.apache.spark.AccumulatorParam

    scala> :paste
    // Entering paste mode (ctrl-D to finish)

    implicit object BigIntAccumulatorParam extends AccumulatorParam[BigInt] {
      def addInPlace(t1: BigInt, t2: BigInt) = t1 + t2
      def zero(initialValue: BigInt) = BigInt(0)
    }

    // Exiting paste mode, now interpreting.

    defined module BigIntAccumulatorParam

    scala> val accu = sc.accumulator(BigInt(0))(BigIntAccumulatorParam)
    accu: org.apache.spark.Accumulator[scala.math.BigInt] = 0

    scala> accu += 100

    scala> accu.value
    res1: scala.math.BigInt = 100
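For anyone following along, a sketch of the same fix in a standalone app rather than the REPL (`sc`, `someRDD`, and the `x._1` field are assumed from the earlier posts, not verified here). Because the object is declared implicit, the compiler can supply it on its own, so the explicit second argument list becomes optional:

    import org.apache.spark.AccumulatorParam

    implicit object BigIntAccumulatorParam extends AccumulatorParam[BigInt] {
      def addInPlace(t1: BigInt, t2: BigInt) = t1 + t2  // merge partial sums from tasks
      def zero(initialValue: BigInt) = BigInt(0)        // identity element for the sum
    }

    // sc is an existing SparkContext; someRDD is an RDD of pairs, as in the posts above
    val myAccumulator = sc.accumulator(BigInt(0))       // implicit BigIntAccumulatorParam is resolved
    someRDD.foreach(x => myAccumulator += BigInt(math.min(x._1, 100)))
    println(myAccumulator.value)                        // read the value on the driver only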
Negative Accumulators
Hello! Does anyone know why I may be receiving negative final accumulator values? Thanks!
Re: Negative Accumulators
Int overflow? If so, you can use BigInt like this:

    scala> import org.apache.spark.AccumulatorParam
    import org.apache.spark.AccumulatorParam

    scala> :paste
    // Entering paste mode (ctrl-D to finish)

    implicit object BigIntAccumulatorParam extends AccumulatorParam[BigInt] {
      def addInPlace(t1: BigInt, t2: BigInt) = t1 + t2
      def zero(initialValue: BigInt) = BigInt(0)
    }

    // Exiting paste mode, now interpreting.

    defined module BigIntAccumulatorParam

    scala> val accu = sc.accumulator(BigInt(0))
    accu: org.apache.spark.Accumulator[scala.math.BigInt] = 0

    scala> accu += 100

    scala> accu.value
    res1: scala.math.BigInt = 100

Best Regards,
Shixiong Zhu

2014-11-25 10:31 GMT+08:00 Peter Thai <thai.pe...@gmail.com>:
> [quoted message above]
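For context on the overflow guess: JVM Ints wrap around silently at 2^31 - 1, so a plain Int accumulator that crosses that bound goes negative with no error or warning at runtime. A two-line illustration (standard Scala, nothing Spark-specific):

    println(Int.MaxValue)      // 2147483647
    println(Int.MaxValue + 1)  // -2147483648: silent two's-complement wraparound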
Re: Negative Accumulators
Great! Worked like a charm :)

On Mon, Nov 24, 2014 at 9:56 PM, Shixiong Zhu <zsxw...@gmail.com> wrote:
> [quoted message above]