[ https://issues.apache.org/jira/browse/SPARK-6606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14481639#comment-14481639 ]
Apache Spark commented on SPARK-6606:
-------------------------------------

User 'kayousterhout' has created a pull request for this issue:
https://github.com/apache/spark/pull/4145

> Accumulator deserialized twice because the NarrowCoGroupSplitDep contains an
> rdd object
> -------------------------------------------------------------------------------------
>
>                 Key: SPARK-6606
>                 URL: https://issues.apache.org/jira/browse/SPARK-6606
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.2.0, 1.3.0
>            Reporter: SuYan
>
> 1. Running code like the example below shows that the accumulator is
> deserialized twice.
> First:
> {code}
> task = ser.deserialize[Task[Any]](taskBytes,
>   Thread.currentThread.getContextClassLoader)
> {code}
> Second:
> {code}
> val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
>   ByteBuffer.wrap(taskBinary.value),
>   Thread.currentThread.getContextClassLoader)
> {code}
> The first deserialization is not expected. A ResultTask or ShuffleMapTask
> carries a partition object, and in the class
> {code}
> CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part: Partitioner)
> {code}
> the CoGroupPartition may contain a CoGroupSplitDep:
> {code}
> NarrowCoGroupSplitDep(
>     rdd: RDD[_],
>     splitIndex: Int,
>     var split: Partition
>   ) extends CoGroupSplitDep
> {code}
> That *NarrowCoGroupSplitDep* holds a reference to the rdd object, which is
> what pulls the RDD into the first deserialization.
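[Editor's note] The mechanism described above can be illustrated outside Spark. The sketch below (plain Java serialization, not Spark code; all class and method names are hypothetical) shows that when one object is reachable from two independently serialized byte streams, deserializing both streams runs the object's deserialization twice and yields two distinct copies. This mirrors how the task bytes (via the partition's NarrowCoGroupSplitDep rdd reference) and the broadcast taskBinary each pull in the RDD, so its accumulators are deserialized twice.

```java
import java.io.*;

public class DoubleDeserialize {
    // Stands in for the RDD/accumulator: its deserialization has an
    // observable side effect, so we can count how often it happens.
    static class Counter implements Serializable {
        static int deserializations = 0;  // incremented on every readObject
        int value = 0;

        private void readObject(ObjectInputStream in)
                throws IOException, ClassNotFoundException {
            in.defaultReadObject();
            deserializations++;
        }
    }

    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(o);
        }
        return bos.toByteArray();
    }

    static Object deserialize(byte[] bytes) throws Exception {
        try (ObjectInputStream ois =
                new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Counter shared = new Counter();

        // Two separate blobs both embed `shared`, analogous to the task
        // bytes and the broadcast taskBinary each embedding the RDD.
        byte[] blobA = serialize(shared);
        byte[] blobB = serialize(shared);

        Counter copyA = (Counter) deserialize(blobA);
        Counter copyB = (Counter) deserialize(blobB);

        System.out.println(Counter.deserializations);  // 2: deserialized twice
        System.out.println(copyA != copyB);            // true: distinct copies
    }
}
```

Within a single ObjectOutputStream, Java serialization deduplicates references; the duplication arises precisely because there are two independent serialized blobs, which is the situation the issue describes.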
> Example:
> {code}
> val acc1 = sc.accumulator(0, "test1")
> val acc2 = sc.accumulator(0, "test2")
> val rdd1 = sc.parallelize((1 to 10).toSeq, 3)
> val rdd2 = sc.parallelize((1 to 10).toSeq, 3)
> val combine1 = rdd1.map { case a => (a, 1) }.combineByKey(
>   a => {
>     acc1 += 1
>     a
>   },
>   (a: Int, b: Int) => a + b,
>   (a: Int, b: Int) => a + b,
>   new HashPartitioner(3), mapSideCombine = false)
> val combine2 = rdd2.map { case a => (a, 1) }.combineByKey(
>   a => {
>     acc2 += 1
>     a
>   },
>   (a: Int, b: Int) => a + b,
>   (a: Int, b: Int) => a + b,
>   new HashPartitioner(3), mapSideCombine = false)
> combine1.cogroup(combine2, new HashPartitioner(3)).count()
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org