Hi all,
I'm incrementing several accumulators inside a foreach. Most of the time,
the accumulators will return the same value for the same dataset. However,
they sometimes differ.
I'm not sure how accumulators are implemented. Could this behavior be caused
by data not arriving before I print the accumulator's value?
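One plausible cause (not confirmed for this job) is that accumulator updates from re-executed tasks, e.g. retries or speculative execution, can be applied more than once, so the total can vary between otherwise identical runs. Here is a toy simulation in plain Scala (no Spark) of a "task" whose updates are merged twice:

```scala
// Plain Scala, no Spark: simulate one task being re-executed so its
// accumulator updates are applied a second time. This illustrates why
// the same dataset can yield different accumulator totals across runs.
val data = Seq(1, 2, 3, 4)

// Clean run: every element contributes exactly once.
var cleanTotal = 0
data.foreach(x => cleanTotal += x)

// Run where the "task" covering the first two elements is retried and
// its updates get merged again.
var retriedTotal = 0
data.foreach(x => retriedTotal += x)
data.take(2).foreach(x => retriedTotal += x) // duplicated updates

println(s"clean=$cleanTotal withRetry=$retriedTotal")
```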
Hello,
I am seeing negative values for accumulators. Here's my implementation in a
standalone app in Spark 1.1.1rc:
implicit object BigIntAccumulatorParam extends AccumulatorParam[BigInt] {
  def addInPlace(t1: Int, t2: BigInt) = BigInt(t1) + t2
  def addInPlace(t1: BigInt, t2: BigInt) = t1 + t2
  def zero(initialValue: BigInt) = BigInt(0)
}
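Negative totals like these are the classic signature of 32-bit Int overflow: if any step of the accumulation happens in Int arithmetic (for example via an Int-typed overload, or a plain `sc.accumulator(0)` which is Int-based), the running total wraps around past Int.MaxValue. A minimal plain-Scala demonstration:

```scala
// 32-bit Int arithmetic wraps around past Int.MaxValue (2147483647),
// which is exactly how a large positive count shows up as negative.
val big = 2000000000            // fits in an Int
val intSum = big + big          // wraps: the true sum 4000000000 doesn't fit
val bigSum = BigInt(big) + big  // BigInt arithmetic never wraps

println(s"intSum=$intSum bigSum=$bigSum")
```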
Similarly, I'm having an issue with the above solution when I use the
math.min() function to compute the value I add to an accumulator. I'm seeing
negative (overflowed) numbers again.
This code works fine without the math.min(), and even if I add an arbitrarily
large number like 100
// doesn't work
someRDD.foreach(x => {
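One thing worth noting about math.min(): with two Int arguments it returns an Int, so the per-record delta is fine, but if the running total is itself kept as an Int it will still wrap negative once it passes Int.MaxValue. A sketch (plain Scala, hypothetical values) of that failure mode next to the BigInt-safe version:

```scala
// math.min(Int, Int) returns an Int. A capped per-record delta is harmless,
// but an Int *running total* still wraps negative past Int.MaxValue;
// a BigInt total does not.
var intTotal = 0
var bigTotal = BigInt(0)
for (_ <- 0 until 25000000) {        // 25M updates of +100 = 2.5 billion
  val delta = math.min(100, 999)
  intTotal += delta
  bigTotal += delta
}

println(s"intTotal=$intTotal bigTotal=$bigTotal")
```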
To answer my own question,
I was declaring the accumulator incorrectly. The code should look like this:
scala> import org.apache.spark.AccumulatorParam
import org.apache.spark.AccumulatorParam

scala> :paste
// Entering paste mode (ctrl-D to finish)

implicit object BigIntAccumulatorParam extends AccumulatorParam[BigInt] {
  def addInPlace(t1: BigInt, t2: BigInt) = t1 + t2
  def zero(initialValue: BigInt) = BigInt(0)
}

scala> val accu = sc.accumulator(BigInt(0))
accu: org.apache.spark.Accumulator[scala.math.BigInt] = 0

scala> accu += 100

scala> accu.value
res1: scala.math.BigInt = 100

Hello!
Does anyone know why I may be receiving negative final accumulator values?
Thanks!

--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/Negative-Accumulators-tp19706.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
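For anyone without a Spark shell handy, the arithmetic that the corrected param performs can be sanity-checked in plain Scala with a stand-in trait of the same shape (the real trait is org.apache.spark.AccumulatorParam; AccumulatorParamLike below is just a local stand-in):

```scala
// Stand-in trait shaped like org.apache.spark.AccumulatorParam, used only
// to exercise the BigInt arithmetic locally without a Spark cluster.
trait AccumulatorParamLike[T] {
  def addInPlace(t1: T, t2: T): T
  def zero(initialValue: T): T
}

object BigIntParam extends AccumulatorParamLike[BigInt] {
  def addInPlace(t1: BigInt, t2: BigInt): BigInt = t1 + t2
  def zero(initialValue: BigInt): BigInt = BigInt(0)
}

// Two updates of 2 billion each: well past Int.MaxValue, yet still positive.
val total = Seq(BigInt(2000000000), BigInt(2000000000))
  .foldLeft(BigIntParam.zero(BigInt(0)))(BigIntParam.addInPlace)

println(total)
```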
Best Regards,
Shixiong Zhu
2014-11-25 10:31 GMT+08:00 Peter Thai thai.pe...@gmail.com:
Hello!
Does anyone know why I may be receiving negative final accumulator values?
Quick update:
It is the filter job that causes the error above, not the reduceByKey.
Why would a filter cause an out-of-memory error?
Here is my code
val inputgsup =
  "hdfs://" + sparkmasterip + "/user/sense/datasets/gsup/binary/30/2014/11/0[1-9]/part*"
val gsupfile =
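(The snippet above lost its string quotes in the archive; as a sketch, with a purely hypothetical value for sparkmasterip, the glob path would be built like this:)

```scala
// Rebuilding the HDFS glob path with the quotes the archive stripped.
// sparkmasterip is a hypothetical placeholder, not the poster's real host.
val sparkmasterip = "master.example.com:8020"
val inputgsup =
  "hdfs://" + sparkmasterip +
    "/user/sense/datasets/gsup/binary/30/2014/11/0[1-9]/part*"

println(inputgsup)
```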