Hi all,
I wrote a custom AccumulatorParam, but in my job it does not seem to add the
values together. When I tried a plain Int accumulator in the same job, its value
was incremented as expected.
import org.apache.spark.AccumulatorParam

object MapAccumulatorParam extends AccumulatorParam[Map[Long, Int]] {
  // Identity element for the merge below: an empty map.
  def zero(initialValue: Map[Long, Int]): Map[Long, Int] = Map.empty

  // Merge two partial results, summing the counts of keys present in both.
  // The summed entries sit on the right of ++ so they win over m1's originals.
  override def addInPlace(m1: Map[Long, Int], m2: Map[Long, Int]): Map[Long, Int] = {
    val res = m1 ++ m2.map { case (k, v) => k -> (v + m1.getOrElse(k, 0)) }
    println(res)
    res
  }

  // Adding a single update uses the same merge.
  override def addAccumulator(t1: Map[Long, Int], t2: Map[Long, Int]): Map[Long, Int] =
    addInPlace(t1, t2)
}
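To check the merge logic in isolation, without a SparkContext, the summing step can be pulled out into a plain function. This is only a sketch: `MapMerge` and `merge` are hypothetical names, not part of Spark. One Scala detail worth keeping in mind: in `a ++ b`, entries from `b` replace entries from `a` that have the same key, so the operand holding the summed values decides whether the sums survive.

```scala
// Hypothetical helper (not a Spark API): the summing merge, testable without Spark.
object MapMerge {
  // Keys present in both maps get the sum of their counts; keys present in only
  // one map keep their count. The summed entries are on the right of ++ so they
  // replace m1's original values for shared keys.
  def merge(m1: Map[Long, Int], m2: Map[Long, Int]): Map[Long, Int] =
    m1 ++ m2.map { case (k, v) => k -> (v + m1.getOrElse(k, 0)) }

  def main(args: Array[String]): Unit = {
    val a = Map(1L -> 1, 2L -> 5)
    val b = Map(1L -> 3, 3L -> 7)
    // For counting maps the merge should give the same result in either order.
    println(merge(a, b))
    println(merge(b, a))
  }
}
```

Checking both argument orders is a cheap way to catch a merge that silently keeps one side's raw values instead of the sum.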
To use it I am doing something like this:
val acc1 = sparkEnv.sc.accumulator(Map.empty[Long, Int])(MapAccumulatorParam)
val acc2 = sparkEnv.sc.accumulator(Map.empty[Long, Int])(MapAccumulatorParam)
new SparkJob(sparkEnv, parsedArgs, unpricedCallsAcc, acc1, acc2).runJob() match { ...
Here sparkEnv is a wrapper around both the SQLContext and the SparkContext. Is
there any reason my values might not be getting added together? Did I
initialize the accumulators in the wrong place? (I create them where my Spark
contexts get created.) When I step through the function, I see that I get back
Map(1 -> 1) when I call

acc1.add(Map(1L -> 1))

but the add never starts from a nonempty Map. Any ideas? Thanks in advance.
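To reason about why `add` inside a task always seems to begin from an empty map, here is a small Spark-free model of the update flow as I understand it for Spark 1.x accumulators (an assumption, not verified against this job): each task works on its own copy of the accumulator reset to `zero`, applies its adds to that copy, and the driver later folds each task's partial result into the driver-side value with `addInPlace`. `AccumulatorFlowModel`, `runTask`, and `runJob` are hypothetical names for illustration, not Spark classes, and the model ignores retries and speculative execution.

```scala
// Hypothetical, Spark-free model of the accumulator update flow (not Spark's real classes).
object AccumulatorFlowModel {
  type Counts = Map[Long, Int]

  def zero: Counts = Map.empty

  // Same summing merge as MapAccumulatorParam.addInPlace.
  def addInPlace(m1: Counts, m2: Counts): Counts =
    m1 ++ m2.map { case (k, v) => k -> (v + m1.getOrElse(k, 0)) }

  // Assumed behavior: a task starts from zero, not from the driver's current
  // value, and folds its own adds into that local copy.
  def runTask(adds: Seq[Counts]): Counts = adds.foldLeft(zero)(addInPlace)

  // The driver merges each finished task's partial result into its own value.
  def runJob(driverValue: Counts, partitions: Seq[Seq[Counts]]): Counts =
    partitions.map(runTask).foldLeft(driverValue)(addInPlace)

  def main(args: Array[String]): Unit = {
    val partitions = Seq(
      Seq(Map(1L -> 1), Map(1L -> 1)), // two adds in the first task
      Seq(Map(1L -> 1, 2L -> 1))       // one add in the second task
    )
    // What a debugger inside the first task would see: only that task's adds.
    println(runTask(partitions.head))
    // What the driver would see after the action completes.
    println(runJob(zero, partitions))
  }
}
```

Under this model, seeing an empty starting map inside a task is expected; the combined total only appears on the driver (via `acc1.value`) after an action has run.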