Negative Accumulators

2015-01-30 Thread Peter Thai
Hello,

I am seeing negative values for accumulators. Here's my implementation in a
standalone app in Spark 1.1.1rc:

  implicit object BigIntAccumulatorParam extends AccumulatorParam[BigInt] {
    def addInPlace(t1: Int, t2: BigInt) = BigInt(t1) + t2
    def addInPlace(t1: BigInt, t2: BigInt) = t1 + t2
    def zero(initialValue: BigInt) = BigInt(0)
  }

  val capped_numpings_accu = sc.accumulator(BigInt(0))(BigIntAccumulatorParam)
  myRDD.foreach(x => { capped_numpings_accu += BigInt(x._1).min(threshold_var) })

When I remove the min() call, I no longer see negative values.

Thanks!




Re: Negative Accumulators

2015-01-30 Thread francois . garillot
Sanity check: could `threshold_var` be negative?
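
If it is, every capped term comes out negative and the accumulator total goes negative with it. A quick REPL sketch (the value of `threshold_var` here is hypothetical):

scala> val threshold_var = BigInt(-5)
threshold_var: scala.math.BigInt = -5

scala> Seq(1, 10, 100).map(x => BigInt(x).min(threshold_var)).sum
res0: scala.math.BigInt = -15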



—
FG


Re: Negative Accumulators

2014-12-02 Thread Peter Thai
Similarly, I'm having an issue with the above solution when I use the
math.min() function to add to an accumulator: I'm seeing negative,
overflowed numbers again.

This code works fine without the math.min() call, and even if I add an
arbitrarily large number like 100:

// doesn't work
someRDD.foreach(x => {
  myAccumulator += math.min(x._1, 100)
})

// works
someRDD.foreach(x => {
  myAccumulator += x._1 + 100
})

Any ideas? 






Re: Negative Accumulators

2014-12-02 Thread Peter Thai
To answer my own question: I was declaring the accumulator incorrectly. The code should look like this:

scala> import org.apache.spark.AccumulatorParam
import org.apache.spark.AccumulatorParam

scala> :paste
// Entering paste mode (ctrl-D to finish)

implicit object BigIntAccumulatorParam extends AccumulatorParam[BigInt] {
  def addInPlace(t1: BigInt, t2: BigInt) = t1 + t2
  def zero(initialValue: BigInt) = BigInt(0)
}

// Exiting paste mode, now interpreting.

defined module BigIntAccumulatorParam   

scala> val accu = sc.accumulator(BigInt(0))(BigIntAccumulatorParam)
accu: org.apache.spark.Accumulator[scala.math.BigInt] = 0

scala> accu += 100

scala> accu.value
res1: scala.math.BigInt = 100
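
With that in scope, the capped sum from my earlier post can presumably be written by converting the Int result of math.min to BigInt before adding (a sketch, reusing someRDD and the cap of 100 from above):

scala> someRDD.foreach(x => accu += BigInt(math.min(x._1, 100)))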






Negative Accumulators

2014-11-24 Thread Peter Thai
Hello!

Does anyone know why I may be receiving negative final accumulator values? 

Thanks!






Re: Negative Accumulators

2014-11-24 Thread Shixiong Zhu
Int overflow? If so, you can use BigInt like this:

scala> import org.apache.spark.AccumulatorParam
import org.apache.spark.AccumulatorParam

scala> :paste
// Entering paste mode (ctrl-D to finish)

implicit object BigIntAccumulatorParam extends AccumulatorParam[BigInt] {
  def addInPlace(t1: BigInt, t2: BigInt) = t1 + t2
  def zero(initialValue: BigInt) = BigInt(0)
}

// Exiting paste mode, now interpreting.

defined module BigIntAccumulatorParam

scala> val accu = sc.accumulator(BigInt(0))
accu: org.apache.spark.Accumulator[scala.math.BigInt] = 0

scala> accu += 100

scala> accu.value
res1: scala.math.BigInt = 100
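
For reference, the wrap-around that produces the negative values is easy to reproduce; a default sc.accumulator(0) is an Accumulator[Int], so its running total wraps the same way:

scala> Int.MaxValue + 1   // Int arithmetic silently wraps around
res2: Int = -2147483648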


Best Regards,
Shixiong Zhu




Re: Negative Accumulators

2014-11-24 Thread Peter Thai
Great! Worked like a charm :)
