SuYan created SPARK-6606:
----------------------------

             Summary: Accumulator deserialized twice because the NarrowCoGroupSplitDep contains rdd object.
                 Key: SPARK-6606
                 URL: https://issues.apache.org/jira/browse/SPARK-6606
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 1.3.0, 1.2.0
            Reporter: SuYan
1. Running code like the example below, we found the accumulator is deserialized twice.

First:
{code}
task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)
{code}

Second:
{code}
val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
  ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
{code}

The first deserialization is not expected. It happens because a ResultTask or ShuffleMapTask carries a Partition object. In
{code}
CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part: Partitioner)
{code}
the CoGroupPartition may contain a CoGroupSplitDep:
{code}
NarrowCoGroupSplitDep(
    rdd: RDD[_],
    splitIndex: Int,
    var split: Partition
  ) extends CoGroupSplitDep {
{code}
That *NarrowCoGroupSplitDep* holds a reference to the rdd object, so serializing the task's partition drags the rdd (and the accumulators captured in its closures) along with it, which causes the first deserialization.

Example:
{code}
val acc1 = sc.accumulator(0, "test1")
val acc2 = sc.accumulator(0, "test2")
val rdd1 = sc.parallelize((1 to 10).toSeq, 3)
val rdd2 = sc.parallelize((1 to 10).toSeq, 3)

val combine1 = rdd1.map { case a => (a, 1) }.combineByKey(
  a => {
    acc1 += 1
    a
  },
  (a: Int, b: Int) => a + b,
  (a: Int, b: Int) => a + b,
  new HashPartitioner(3),
  mapSideCombine = false)

val combine2 = rdd2.map { case a => (a, 1) }.combineByKey(
  a => {
    acc2 += 1
    a
  },
  (a: Int, b: Int) => a + b,
  (a: Int, b: Int) => a + b,
  new HashPartitioner(3),
  mapSideCombine = false)

combine1.cogroup(combine2, new HashPartitioner(3)).count()
{code}
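To make the mechanism concrete, here is a minimal, Spark-free sketch of why a stray rdd reference matters. All class names (FakeAccumulator, FakeRdd, FakeSplitDep) are hypothetical stand-ins, not Spark APIs; the point is only that a deserialization side effect (like an accumulator re-registering itself in readObject) fires once per serialized copy of the object graph, so reaching the rdd from both the partition and the taskBinary triggers it twice:

{code}
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Stand-in for an accumulator: deserialization has a visible side effect,
// mirroring how Spark accumulators re-register themselves in readObject.
class FakeAccumulator extends Serializable {
  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    println("accumulator deserialized") // runs once per deserialized copy
  }
}

// Stand-in for the RDD whose closures capture the accumulator.
class FakeRdd(val acc: FakeAccumulator) extends Serializable

// Stand-in for NarrowCoGroupSplitDep: the partition keeps a reference to the rdd.
class FakeSplitDep(val rdd: FakeRdd) extends Serializable

object DoubleDeserialize {
  private def serialize(obj: AnyRef): Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bos)
    out.writeObject(obj)
    out.close()
    bos.toByteArray
  }

  private def deserialize(bytes: Array[Byte]): AnyRef = {
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try in.readObject() finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val rdd = new FakeRdd(new FakeAccumulator)

    // The task's partition drags the rdd (and accumulator) along...
    val taskBytes = serialize(new FakeSplitDep(rdd))
    // ...and the broadcast taskBinary contains the rdd again.
    val binaryBytes = serialize(rdd)

    deserialize(taskBytes)   // prints "accumulator deserialized" (unexpected copy)
    deserialize(binaryBytes) // prints "accumulator deserialized" (expected copy)
  }
}
{code}

One possible direction for a fix would be to stop NarrowCoGroupSplitDep from holding the rdd at serialization time (for example, dropping the field or marking it transient and re-attaching it on the executor), so the partition's serialized form no longer reaches the accumulator; whether that is the approach ultimately taken for this ticket is a separate question.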