Non-deterministic Accumulator Values

2015-02-26 Thread Peter Thai
Hi all, I'm incrementing several accumulators inside a foreach. Most of the time the accumulators return the same values for the same dataset, but they sometimes differ. I'm not sure how accumulators are implemented. Could this behavior be caused by data not arriving before I print?
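One documented source of non-determinism, per the Spark 1.x accumulator semantics: only updates made inside actions are guaranteed to be applied exactly once per task, while updates made inside transformations can be re-applied whenever a task is retried or a stage is recomputed, which makes the final value run-dependent. A minimal sketch, assuming a local master and hypothetical data:

    import org.apache.spark.{SparkConf, SparkContext}

    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("acc-demo"))
    val acc = sc.accumulator(0)
    val data = sc.parallelize(1 to 1000)

    // Inside a transformation: updates may be applied more than once if the
    // stage is recomputed (task failure, speculation, cache eviction).
    val mapped = data.map { x => acc += 1; x }
    mapped.count()

    // Inside an action: Spark guarantees each task's update is applied once.
    data.foreach { _ => acc += 1 }
    println(acc.value)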

Negative Accumulators

2015-01-30 Thread Peter Thai
Hello, I am seeing negative values for accumulators. Here's my implementation in a standalone app on Spark 1.1.1-rc:

    implicit object BigIntAccumulatorParam extends AccumulatorParam[BigInt] {
      def addInPlace(t1: Int, t2: BigInt) = BigInt(t1) + t2
      def addInPlace(t1: BigInt, t2: BigInt) =

Re: Negative Accumulators

2014-12-02 Thread Peter Thai
Similarly, I'm having an issue with the above solution when I use the math.min() function to add to an accumulator: I'm seeing negative overflow numbers again. The code works fine without the math.min(), and even if I add an arbitrarily large number like 100.

    // doesn't work
    someRDD.foreach(x => {
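A hedged guess at the failure mode, since the snippet is cut off above: with two Int arguments math.min returns an Int, so any arithmetic done before the += still happens in 32-bit space and can wrap negative before it ever reaches the BigInt accumulator. Widening first avoids the wrap (someCount and acc are hypothetical stand-ins):

    val someCount: Int = 2000000000

    // Int arithmetic: the sum wraps negative before min() even runs.
    val cappedInt: Int = math.min(someCount + someCount, 1000000)   // -294967296

    // BigInt arithmetic throughout: exact, no wrap.
    val cappedBig: BigInt = (BigInt(someCount) + BigInt(someCount)) min BigInt(1000000)
    acc += cappedBig   // acc: the Accumulator[BigInt] from the earlier posts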

Re: Negative Accumulators

2014-12-02 Thread Peter Thai
To answer my own question, I was declaring the accumulator incorrectly. The code should look like this:

    scala> import org.apache.spark.AccumulatorParam
    import org.apache.spark.AccumulatorParam

    scala> :paste
    // Entering paste mode (ctrl-D to finish)

    implicit object BigIntAccumulatorParam extends
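The preview cuts off mid-paste; judging from the AccumulatorParam contract and Shixiong Zhu's reply below, the pasted object presumably continued along these lines (a reconstruction, not the verbatim original):

    implicit object BigIntAccumulatorParam extends AccumulatorParam[BigInt] {
      // The trait requires exactly these two members for an Accumulator[BigInt].
      def addInPlace(t1: BigInt, t2: BigInt): BigInt = t1 + t2
      def zero(initialValue: BigInt): BigInt = BigInt(0)
    }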

Negative Accumulators

2014-11-24 Thread Peter Thai
Hello! Does anyone know why I may be receiving negative final accumulator values? Thanks!
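The usual culprit, judging from where this thread lands, is 32-bit overflow: an Accumulator[Int] whose running sum passes Int.MaxValue wraps around to a negative number. A minimal sketch with hypothetical figures:

    import org.apache.spark.{SparkConf, SparkContext}

    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("overflow-demo"))
    val acc = sc.accumulator(0)   // Accumulator[Int]: 32-bit arithmetic

    // Two updates of 1.5 billion sum to 3 billion, past Int.MaxValue (~2.15e9),
    // so the final value wraps to -1294967296.
    sc.parallelize(Seq(1500000000, 1500000000)).foreach(n => acc += n)
    println(acc.value)   // negative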

Re: Negative Accumulators

2014-11-24 Thread Peter Thai
    scala> val accu = sc.accumulator(BigInt(0))
    accu: org.apache.spark.Accumulator[scala.math.BigInt] = 0

    scala> accu += 100

    scala> accu.value
    res1: scala.math.BigInt = 100

Best Regards,
Shixiong Zhu

Re: JVM Memory Woes

2014-11-21 Thread Peter Thai
Quick update: it is a filter job that creates the error above, not the reduceByKey. Why would a filter cause an out-of-memory error? Here is my code:

    val inputgsup = "hdfs://" + sparkmasterip + "/user/sense/datasets/gsup/binary/30/2014/11/0[1-9]/part*"
    val gsupfile =
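One general Spark point worth noting here (not specific to this job): filter is a lazy transformation, so the memory error actually surfaces in whatever stage materializes it, when the input partitions are read and deserialized; the filter line itself allocates nothing. A sketch, with the input format and predicate as pure assumptions:

    val gsup = sc.textFile(inputgsup)    // assumes line-oriented input
    val kept = gsup.filter(_.nonEmpty)   // lazy: nothing runs here
    kept.count()                         // any OOM surfaces here, when the stage
                                         // reads and deserializes the partitions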