Hi all,

I wrote an AccumulatorParam, but in my job it does not seem to be adding the values. When I tried an Int accumulator in the same job, its value was updated as expected.

    import org.apache.spark.AccumulatorParam

    object MapAccumulatorParam extends AccumulatorParam[Map[Long, Int]] {
      def zero(initialValue: Map[Long, Int] = Map.empty): Map[Long, Int] = {
        Map.empty
      }

      override def addInPlace(m1: Map[Long, Int], m2: Map[Long, Int]): Map[Long, Int] = {
        val res = m1.map {
          case (k, v) => {
            k -> (v + m2.getOrElse(k, 0))
          }
        } ++ m2
        println(res)
        res
      }

      override def addAccumulator(t1: Map[Long, Int], t2: Map[Long, Int]): Map[Long, Int] = {
        val res = t1.map {
          case (k, v) => {
            k -> (v + t2.getOrElse(k, 0))
          }
        } ++ t2
        println(res)
        res
      }
    }

To use it I am doing something like this:

    val acc1 = sparkEnv.sc.accumulator(Map.empty[Long, Int])(MapAccumulatorParam)
    val acc2 = sparkEnv.sc.accumulator(Map.empty[Long, Int])(MapAccumulatorParam)
    new SparkJob(sparkEnv, parsedArgs, unpricedCallsAcc, acc1, acc2).runJob() match { ...

where sparkEnv is a wrapper for both the SQLContext and the SparkContext.

Is there any reason my values might not be getting added together? Did I initialize the accumulators in the wrong place (it is where my Spark contexts get created)?

When I step through the function, I see that I get back Map(1 -> 1) when I call acc1.add(Map(1 -> 1)), but the add never starts with a nonempty Map.

Any ideas?

Thanks in advance.
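
One thing that can be checked without Spark is the merge expression itself. The following is a minimal sketch, assuming plain Scala immutable maps. The object name MergeCheck and both function names are hypothetical and not part of the job above. mergeAsPosted copies the body of addInPlace as posted; mergeSummed is one possible alternative that folds the second map into the first. Because `++` keeps the right-hand value for any key present in both maps, the posted expression ends up with m2's value for shared keys rather than the sum.

```scala
object MergeCheck {
  // Same expression as the posted addInPlace body (without the println).
  // For a key present in both maps, the trailing `++ m2` replaces the
  // summed value with m2's value.
  def mergeAsPosted(m1: Map[Long, Int], m2: Map[Long, Int]): Map[Long, Int] =
    m1.map { case (k, v) => k -> (v + m2.getOrElse(k, 0)) } ++ m2

  // Hypothetical alternative: fold each entry of m2 into m1, adding to
  // whatever count is already stored under that key.
  def mergeSummed(m1: Map[Long, Int], m2: Map[Long, Int]): Map[Long, Int] =
    m2.foldLeft(m1) { case (acc, (k, v)) =>
      acc.updated(k, acc.getOrElse(k, 0) + v)
    }

  def main(args: Array[String]): Unit = {
    val a = Map(1L -> 1, 2L -> 5)
    val b = Map(1L -> 1, 3L -> 7)
    println(mergeAsPosted(a, b)) // key 1 maps to 1 (b's value wins)
    println(mergeSummed(a, b))   // key 1 maps to 2 (values added)
  }
}
```

If the job only ever merges maps that share keys, this alone could make it look as though nothing is being accumulated; whether it is the whole story for the job above is not something this sketch can show.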