[ https://issues.apache.org/jira/browse/SPARK-11013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14950378#comment-14950378 ]

Shixiong Zhu edited comment on SPARK-11013 at 10/9/15 1:29 PM:
---------------------------------------------------------------

I may be misunderstanding this issue.

Do you mean we should not report accumulators that are not updated? I think
reporting {{zero}} is fine.

An AccumulableParam should guarantee
{{AccumulableParam.addInPlace(AccumulableParam.zero, X) = X}}. Here is an
example of a min AccumulableParam:

{code}
import org.apache.spark.AccumulableParam

class LongMinAccumulableParam extends AccumulableParam[Option[Long], Long] {

  // Merge a new value into the accumulator; an empty accumulator adopts the value.
  def addAccumulator(r: Option[Long], t: Long): Option[Long] = r match {
    case Some(l) => Some(l min t)
    case None => Some(t)
  }

  // Merge two partial results; an empty side never masks the other side's value.
  def addInPlace(r1: Option[Long], r2: Option[Long]): Option[Long] = r1 match {
    case Some(t1) =>
      r2 match {
        case Some(t2) => Some(t1 min t2)
        case None => r1
      }
    case None => r2
  }

  // The zero element: no value has been observed yet.
  def zero(initialValue: Option[Long]): Option[Long] = None
}

{code}
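
As a quick sanity check of the guarantee above, here is a minimal sketch (plain Scala, runnable without a Spark cluster; the object name is only for illustration and assumes the class above is on the classpath):

{code}
// Checks that LongMinAccumulableParam satisfies addInPlace(zero, X) = X,
// plus the usual min semantics.
object LongMinAccumulableParamCheck {
  def main(args: Array[String]): Unit = {
    val param = new LongMinAccumulableParam
    val zero = param.zero(None)  // None

    // Merging zero with any value must leave that value unchanged.
    assert(param.addInPlace(zero, Some(5L)) == Some(5L))
    assert(param.addInPlace(Some(5L), zero) == Some(5L))

    // Regular merges still compute the minimum.
    assert(param.addInPlace(Some(3L), Some(7L)) == Some(3L))
    assert(param.addAccumulator(None, 9L) == Some(9L))

    println("addInPlace(zero, X) = X holds for LongMinAccumulableParam")
  }
}
{code}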




was (Author: zsxwing):
I may misunderstand this issue. 

Do you mean we should not report accumulators that are not updated? I think 
reporting `zero` is fine.

An AccumulableParam should guarantee 
{{AccumulableParam.addInPlace(AccumulableParam.zero, X) = X}}. Here is an 
example of min AccumulableParam

{code}
class LongMinAccumulableParam extends AccumulableParam[Option[Long], Long] {

  def addAccumulator(r: Option[Long], t: Long): Option[Long] = r.map(_ min t)

  def addInPlace(r1: Option[Long], r2: Option[Long]): Option[Long] = 
r1.flatMap(t1 => r2.map(_ min t1))

  def zero(initialValue: Option[Long]): Option[Long] = None
}
{code}



> SparkPlan may mistakenly register child plan's accumulators for SQL metrics
> ---------------------------------------------------------------------------
>
>                 Key: SPARK-11013
>                 URL: https://issues.apache.org/jira/browse/SPARK-11013
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>            Reporter: Wenchen Fan
>
> The reason is that when we call the RDD API inside a SparkPlan, we are very likely
> to reference the SparkPlan in the closure and thus serialize and transfer a whole
> SparkPlan tree to the executor side. When it is deserialized, the accumulators in
> the child SparkPlans are also deserialized and registered, and they always report a
> zero value.
> This is not a problem currently because we only have one operation to
> aggregate the accumulators: add. However, if we want to support more complex
> metrics like min, the extra zero values will lead to wrong results (a small
> illustration follows after this quoted description).
> Take TungstenAggregate as an example: I logged "stageId, partitionId,
> accumName, accumId" whenever an accumulator was deserialized and registered, and
> logged the "accumId -> accumValue" map when a task ended. The output is:
> {code}
> scala> val df = Seq(1 -> "a", 2 -> "b").toDF("a", "b").groupBy().count()
> df: org.apache.spark.sql.DataFrame = [count: bigint]
> scala> df.collect
> register: 0 0 Some(number of input rows) 4
> register: 0 0 Some(number of output rows) 5
> register: 1 0 Some(number of input rows) 4
> register: 1 0 Some(number of output rows) 5
> register: 1 0 Some(number of input rows) 2
> register: 1 0 Some(number of output rows) 3
> Map(5 -> 1, 4 -> 2, 6 -> 4458496)
> Map(5 -> 0, 2 -> 1, 7 -> 4458496, 3 -> 1, 4 -> 0)
> res0: Array[org.apache.spark.sql.Row] = Array([2])
> {code}
> The best choice is to avoid serializing and deserializing a SparkPlan tree, which
> can be achieved by LocalNode.
> Alternatively, we can work around this serialization problem for the problematic
> SparkPlans like TungstenAggregate and TungstenSort.
> Or we can improve the SQL metrics framework to make it more robust against this
> case.
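
To make the min-vs-add point above concrete, here is a small sketch (plain Scala, not Spark code; the names and values are only illustrative) of why spurious zero updates are harmless when metric updates are summed but corrupt a min metric:

{code}
// Illustrative only: "realUpdates" stands for genuine per-task metric values,
// "spuriousZeros" for the zero values reported by the accidentally
// re-registered child-plan accumulators described above.
object SpuriousZeroIllustration {
  def main(args: Array[String]): Unit = {
    val realUpdates = Seq(4L, 2L)
    val spuriousZeros = Seq(0L, 0L)
    val all = realUpdates ++ spuriousZeros

    // "add" semantics: the extra zeros do not change the result.
    println(all.sum)  // 6, same as realUpdates.sum

    // "min" semantics: the extra zeros drag the result down to 0,
    // even though the true minimum over the real updates is 2.
    println(all.min)  // 0
  }
}
{code}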


