[ 
https://issues.apache.org/jira/browse/SPARK-24373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16491304#comment-16491304
 ] 

Xiao Li commented on SPARK-24373:
---------------------------------

In the above example, each time we re-analyze a plan that is recreated 
through the Dataset APIs count(), groupBy(), rollup(), cube(), pivot() 
and groupByKey(), the Analyzer rule HandleNullInputsForUDF adds an extra 
IF expression above the UDF in the previously resolved sub-plan. Note that 
this is not the only rule that could change an analyzed plan if we re-run 
the analyzer.
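
To make the effect concrete, here is a minimal sketch in plain Scala. It is a toy model, not Spark's actual classes: Expr, Attr, Udf, IsNull, NullLit, If and ToyAnalyzer are all hypothetical names made up for illustration. It only assumes that the rule wraps every UDF call in a null check without remembering that the call was already wrapped, and that a cache lookup compares plans for equality.

```scala
// Toy expression tree. These are NOT Spark's Catalyst classes.
sealed trait Expr
case class Attr(name: String) extends Expr
case class Udf(arg: Expr) extends Expr
case class IsNull(child: Expr) extends Expr
case object NullLit extends Expr
case class If(cond: Expr, thenExpr: Expr, elseExpr: Expr) extends Expr

object ToyAnalyzer {
  // Hypothetical stand-in for a null-guard rule: wrap every UDF call in
  // "if (isnull(arg)) null else UDF(arg)". It does not check whether the
  // UDF is already guarded, so applying it twice nests a second IF.
  def addNullGuard(e: Expr): Expr = e match {
    case Udf(arg)    => If(IsNull(arg), NullLit, Udf(arg))
    case If(c, t, f) => If(addNullGuard(c), addNullGuard(t), addNullGuard(f))
    case IsNull(c)   => IsNull(addNullGuard(c))
    case other       => other
  }

  // First analysis of UDF(value): one null guard.
  val once: Expr = addNullGuard(Udf(Attr("value")))
  // Re-analysis of the already analyzed expression: a second, nested guard.
  val twice: Expr = addNullGuard(once)

  // Toy cache keyed by the analyzed expression (an assumption of this sketch).
  val cache: Map[Expr, String] = Map(once -> "cached data")

  def main(args: Array[String]): Unit = {
    println(once)
    println(twice)
    println(cache.get(once))  // hit: the key is the plan that was cached
    println(cache.get(twice)) // miss: the re-analyzed plan is a different tree
  }
}
```

In this sketch the second pass yields the nested "if (isnull(value)) null else if (isnull(value)) null else UDF(value)" shape that appears in the analyzed plan of df1.groupBy().count() quoted below, and the lookup with the re-analyzed tree misses the cached entry.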

This is a regression introduced by 
[https://github.com/apache/spark/pull/17770]. We replaced the original solution 
(based on the analyzed flag) with the AnalysisBarrier. However, we did not add 
the AnalysisBarrier to the APIs of RelationalGroupedDataset and 
KeyValueGroupedDataset.
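
The role of the barrier can be sketched in the same spirit. Again this is a toy model in plain Scala and not Spark's implementation: Plan, Leaf, Project, Aggregate, Barrier and BarrierDemo are hypothetical names, and the Int field on Project simply counts how many null guards have been added around the UDF. The sketch assumes that analysis does not descend below a barrier and that barriers are stripped once analysis is finished.

```scala
// Toy logical plan. These are NOT Spark's Catalyst classes.
sealed trait Plan
case class Leaf(name: String) extends Plan
// guards = number of null-guard IFs wrapped around the UDF in this projection.
case class Project(guards: Int, child: Plan) extends Plan
case class Aggregate(child: Plan) extends Plan
// Marks an already analyzed sub-plan that analysis must not touch again.
case class Barrier(child: Plan) extends Plan

object BarrierDemo {
  // Hypothetical analyzer pass: adds one guard per Project it visits,
  // but stops at a Barrier instead of descending into it.
  def analyze(p: Plan): Plan = p match {
    case Project(g, c) => Project(g + 1, analyze(c))
    case Aggregate(c)  => Aggregate(analyze(c))
    case b: Barrier    => b
    case leaf          => leaf
  }

  // Remove barrier markers once analysis is done.
  def stripBarriers(p: Plan): Plan = p match {
    case Barrier(c)    => stripBarriers(c)
    case Project(g, c) => Project(g, stripBarriers(c))
    case Aggregate(c)  => Aggregate(stripBarriers(c))
    case leaf          => leaf
  }

  // The plan of the cached DataFrame after its first analysis: one guard.
  val analyzedDf: Plan = analyze(Project(0, Leaf("rdd")))

  // Grouped API without a barrier: the child is analyzed a second time.
  val unprotected: Plan = analyze(Aggregate(analyzedDf))

  // Grouped API with a barrier around the already analyzed child.
  val guarded: Plan = stripBarriers(analyze(Aggregate(Barrier(analyzedDf))))

  def main(args: Array[String]): Unit = {
    println(unprotected) // child now carries two guards, differs from analyzedDf
    println(guarded)     // child is still identical to analyzedDf
  }
}
```

Without the barrier the aggregate's child no longer equals the plan that was cached, which is the situation described above for the grouped Dataset APIs; with the barrier the child stays identical to it.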

To fix it, we will need to change the plan again, and we might face some 
unknown issues. How about adding a temporary flag in Spark 2.3.1, so that if 
anything unexpected happens, our users can still change it back to the 
Spark 2.3.0 behavior?

> "df.cache() df.count()" no longer eagerly caches data when the analyzed plans 
> are different after re-analyzing the plans
> ------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-24373
>                 URL: https://issues.apache.org/jira/browse/SPARK-24373
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.3.0
>            Reporter: Wenbo Zhao
>            Priority: Blocker
>
> Here is the code to reproduce in local mode
> {code:java}
> scala> val df = sc.range(1, 2).toDF
> df: org.apache.spark.sql.DataFrame = [value: bigint]
> scala> val myudf = udf({x: Long => println("xxxx"); x + 1})
> myudf: org.apache.spark.sql.expressions.UserDefinedFunction = 
> UserDefinedFunction(<function1>,LongType,Some(List(LongType)))
> scala> val df1 = df.withColumn("value1", myudf(col("value")))
> df1: org.apache.spark.sql.DataFrame = [value: bigint, value1: bigint]
> scala> df1.cache
> res0: df1.type = [value: bigint, value1: bigint]
> scala> df1.count
> res1: Long = 1 
> scala> df1.count
> res2: Long = 1
> scala> df1.count
> res3: Long = 1
> {code}
>  
> In Spark 2.2, you could see it print "xxxx". 
> In the above example, when you run explain, you see
> {code:java}
> scala> df1.explain(true)
> == Parsed Logical Plan ==
> 'Project [value#2L, UDF('value) AS value1#5]
> +- AnalysisBarrier
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Analyzed Logical Plan ==
> value: bigint, value1: bigint
> Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Optimized Logical Plan ==
> InMemoryRelation [value#2L, value1#5L], true, 10000, StorageLevel(disk, 
> memory, deserialized, 1 replicas)
> +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L]
> +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- Scan ExternalRDDScan[obj#1L]
> == Physical Plan ==
> *(1) InMemoryTableScan [value#2L, value1#5L]
> +- InMemoryRelation [value#2L, value1#5L], true, 10000, StorageLevel(disk, 
> memory, deserialized, 1 replicas)
> +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L]
> +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- Scan ExternalRDDScan[obj#1L]
> {code}
> but the InMemoryTableScan is missing in the following explain()
> {code:java}
> scala> df1.groupBy().count().explain(true)
> == Parsed Logical Plan ==
> Aggregate [count(1) AS count#170L]
> +- Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS 
> value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Analyzed Logical Plan ==
> count: bigint
> Aggregate [count(1) AS count#170L]
> +- Project [value#2L, if (isnull(value#2L)) null else if (isnull(value#2L)) 
> null else UDF(value#2L) AS value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Optimized Logical Plan ==
> Aggregate [count(1) AS count#170L]
> +- Project
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- ExternalRDD [obj#1L]
> == Physical Plan ==
> *(2) HashAggregate(keys=[], functions=[count(1)], output=[count#170L])
> +- Exchange SinglePartition
> +- *(1) HashAggregate(keys=[], functions=[partial_count(1)], 
> output=[count#175L])
> +- *(1) Project
> +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
> +- Scan ExternalRDDScan[obj#1L]
> {code}
>  
>  