[ https://issues.apache.org/jira/browse/SPARK-11013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14950378#comment-14950378 ]
Shixiong Zhu edited comment on SPARK-11013 at 10/9/15 1:29 PM:
---------------------------------------------------------------

I may be misunderstanding this issue. Do you mean we should not report accumulators that are not updated? I think reporting `zero` is fine. An AccumulableParam should guarantee {{AccumulableParam.addInPlace(AccumulableParam.zero, X) = X}}. Here is an example of a min AccumulableParam:

{code}
import org.apache.spark.AccumulableParam

class LongMinAccumulableParam extends AccumulableParam[Option[Long], Long] {

  // Fold one new value into the running minimum; None means "no value seen yet".
  def addAccumulator(r: Option[Long], t: Long): Option[Long] = r match {
    case Some(l) => Some(l min t)
    case None => Some(t)
  }

  // Merge two partial results; None acts as the identity element.
  def addInPlace(r1: Option[Long], r2: Option[Long]): Option[Long] = r1 match {
    case Some(t1) => r2 match {
      case Some(t2) => Some(t1 min t2)
      case None => r1
    }
    case None => r2
  }

  def zero(initialValue: Option[Long]): Option[Long] = None
}
{code}

was (Author: zsxwing):
I may be misunderstanding this issue. Do you mean we should not report accumulators that are not updated? I think reporting `zero` is fine. An AccumulableParam should guarantee {{AccumulableParam.addInPlace(AccumulableParam.zero, X) = X}}. Here is an example of a min AccumulableParam:

{code}
class LongMinAccumulableParam extends AccumulableParam[Option[Long], Long] {

  def addAccumulator(r: Option[Long], t: Long): Option[Long] = r.map(_ min t)

  def addInPlace(r1: Option[Long], r2: Option[Long]): Option[Long] = r1.flatMap(t1 => r2.map(_ min t1))

  def zero(initialValue: Option[Long]): Option[Long] = None
}
{code}

> SparkPlan may mistakenly register child plan's accumulators for SQL metrics
> ---------------------------------------------------------------------------
>
>                 Key: SPARK-11013
>                 URL: https://issues.apache.org/jira/browse/SPARK-11013
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>            Reporter: Wenchen Fan
>
> The reason is that when we call the RDD API inside SparkPlan, we are very likely
> to reference the SparkPlan in the closure and thus serialize and transfer a
> SparkPlan tree to the executor side.
> When we deserialize it, the accumulators in the
> child SparkPlan are also deserialized and registered, and always report a zero
> value.
> This is not a problem currently because we only have one operation to
> aggregate the accumulators: add. However, if we want to support more complex
> metrics like min, the extra zero values will lead to wrong results.
> Take TungstenAggregate as an example: I logged "stageId, partitionId,
> accumName, accumId" when an accumulator is deserialized and registered, and
> logged the "accumId -> accumValue" map when a task ends. The output is:
> {code}
> scala> val df = Seq(1 -> "a", 2 -> "b").toDF("a", "b").groupBy().count()
> df: org.apache.spark.sql.DataFrame = [count: bigint]
> scala> df.collect
> register: 0 0 Some(number of input rows) 4
> register: 0 0 Some(number of output rows) 5
> register: 1 0 Some(number of input rows) 4
> register: 1 0 Some(number of output rows) 5
> register: 1 0 Some(number of input rows) 2
> register: 1 0 Some(number of output rows) 3
> Map(5 -> 1, 4 -> 2, 6 -> 4458496)
> Map(5 -> 0, 2 -> 1, 7 -> 4458496, 3 -> 1, 4 -> 0)
> res0: Array[org.apache.spark.sql.Row] = Array([2])
> {code}
> The best choice is to avoid serializing and deserializing a SparkPlan tree, which
> can be achieved by LocalNode.
> Or we can work around this serialization problem for the
> problematic SparkPlans like TungstenAggregate and TungstenSort.
> Or we can improve the SQL metrics framework to make it more robust to this
> case.
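
To make the two positions in this thread concrete, here is a minimal sketch that runs without Spark. It contrasts a min accumulator whose zero is the numeric value 0 (the failure mode the issue description worries about) with the Option-based one from the comment, whose zero is None. The names ParamSketch, OptionMin, NaiveMin and MinAccumulatorDemo are hypothetical stand-ins introduced only for this illustration. ParamSketch merely mirrors the three methods of AccumulableParam used in the comment, and the per-task values are made up.

```scala
// Local stand-in mirroring the three AccumulableParam methods used in the
// comment, declared here so the sketch runs without a Spark dependency.
trait ParamSketch[R, T] {
  def addAccumulator(r: R, t: T): R
  def addInPlace(r1: R, r2: R): R
  def zero(initialValue: R): R
}

// Option-based min: None means "never updated" and is the identity of addInPlace.
object OptionMin extends ParamSketch[Option[Long], Long] {
  def addAccumulator(r: Option[Long], t: Long): Option[Long] =
    Some(r.fold(t)(_ min t))

  def addInPlace(r1: Option[Long], r2: Option[Long]): Option[Long] =
    (r1, r2) match {
      case (Some(a), Some(b)) => Some(a min b)
      case _                  => r1.orElse(r2)
    }

  def zero(initialValue: Option[Long]): Option[Long] = None
}

// Naive min (hypothetical): 0L as zero is NOT an identity for min.
object NaiveMin extends ParamSketch[Long, Long] {
  def addAccumulator(r: Long, t: Long): Long = r min t
  def addInPlace(r1: Long, r2: Long): Long = r1 min r2
  def zero(initialValue: Long): Long = 0L
}

object MinAccumulatorDemo {
  // Made-up per-task reports: two real updates plus one accumulator that was
  // registered on the executor but never updated, so it reports its zero.
  val optionReports: Seq[Option[Long]] = Seq(Some(7L), OptionMin.zero(None), Some(3L))
  val naiveReports: Seq[Long] = Seq(7L, NaiveMin.zero(0L), 3L)

  // Driver-side merge of the reported values.
  val optionResult: Option[Long] =
    optionReports.foldLeft(OptionMin.zero(None))(OptionMin.addInPlace)
  val naiveResult: Long = naiveReports.reduce(NaiveMin.addInPlace)

  def main(args: Array[String]): Unit = {
    println(s"Option-based min: $optionResult") // the spurious zero is ignored
    println(s"Naive min:        $naiveResult")  // the spurious zero wins
  }
}
```

Under these assumptions the extra zero report is harmless only when zero is a true identity for the merge operation, which is the guarantee {{addInPlace(zero, X) = X}} stated in the comment. Whether the SQL metrics framework should rely on that guarantee or stop registering the child plan's accumulators is the open question in the issue.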