[ 
https://issues.apache.org/jira/browse/SPARK-24373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenbo Zhao updated SPARK-24373:
-------------------------------
    Description: 
Here is the code to reproduce the issue in local mode:
{code:java}
scala> val df = sc.range(1, 2).toDF
df: org.apache.spark.sql.DataFrame = [value: bigint]

scala> val myudf = udf({x: Long => println("xxxx"); x + 1})
myudf: org.apache.spark.sql.expressions.UserDefinedFunction = 
UserDefinedFunction(<function1>,LongType,Some(List(LongType)))

scala> val df1 = df.withColumn("value1", myudf(col("value")))
df1: org.apache.spark.sql.DataFrame = [value: bigint, value1: bigint]

scala> df1.cache
res0: df1.type = [value: bigint, value1: bigint]

scala> df1.count
res1: Long = 1 

scala> df1.count
res2: Long = 1

scala> df1.count
res3: Long = 1
{code}
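As a quick sanity check (my addition, not in the original transcript), Dataset.storageLevel confirms that df1 really is registered as cached:
{code:java}
// Sanity check: Dataset.storageLevel (public API since Spark 2.1) reports the
// persistence level, or StorageLevel.NONE if the DataFrame was never cached.
// Here it should report StorageLevel(disk, memory, deserialized, 1 replicas),
// matching the InMemoryRelation shown in the explain output below.
scala> df1.storageLevel
{code}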
 

In Spark 2.2, you could see it print "xxxx"; in Spark 2.3 nothing is printed, because the counts bypass the cached plan and the optimizer prunes the UDF column away.
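A println inside a UDF can also be swallowed when tasks run on remote executors, so here is a minimal sketch (my addition; it assumes the same spark-shell session, where spark, udf and col are already in scope) that counts UDF invocations with an accumulator instead:
{code:java}
// A LongAccumulator makes the "did the UDF run?" check deterministic.
val udfCalls = spark.sparkContext.longAccumulator("udfCalls")
val countingUdf = udf { x: Long => udfCalls.add(1); x + 1 }
val df2 = df.withColumn("value1", countingUdf(col("value")))
df2.cache()
df2.count()
// Spark 2.2: 1, the UDF ran once while the cache was built.
// Spark 2.3: 0, count() neither populates nor reads the cache and the
// UDF column is pruned away.
println(udfCalls.value)
{code}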

In the above example, when you run explain(), you can see:
{code:java}
scala> df1.explain(true)
== Parsed Logical Plan ==
'Project [value#2L, UDF('value) AS value1#5]
+- AnalysisBarrier
   +- SerializeFromObject [input[0, bigint, false] AS value#2L]
      +- ExternalRDD [obj#1L]

== Analyzed Logical Plan ==
value: bigint, value1: bigint
Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L]
+- SerializeFromObject [input[0, bigint, false] AS value#2L]
   +- ExternalRDD [obj#1L]

== Optimized Logical Plan ==
InMemoryRelation [value#2L, value1#5L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
   +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L]
      +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
         +- Scan ExternalRDDScan[obj#1L]

== Physical Plan ==
*(1) InMemoryTableScan [value#2L, value1#5L]
   +- InMemoryRelation [value#2L, value1#5L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L]
            +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
               +- Scan ExternalRDDScan[obj#1L]

{code}
but the InMemoryTableScan is missing in the following explain(). Note that in the analyzed plan below, the null-safety check has been applied to the UDF twice (if (isnull(value#2L)) null else if (isnull(value#2L)) null else UDF(value#2L)), so the plan no longer matches the plan that was cached and the cache lookup fails:
{code:java}
scala> df1.groupBy().count().explain(true)
== Parsed Logical Plan ==
Aggregate [count(1) AS count#170L]
+- Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L]
   +- SerializeFromObject [input[0, bigint, false] AS value#2L]
      +- ExternalRDD [obj#1L]

== Analyzed Logical Plan ==
count: bigint
Aggregate [count(1) AS count#170L]
+- Project [value#2L, if (isnull(value#2L)) null else if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L]
   +- SerializeFromObject [input[0, bigint, false] AS value#2L]
      +- ExternalRDD [obj#1L]

== Optimized Logical Plan ==
Aggregate [count(1) AS count#170L]
+- Project
   +- SerializeFromObject [input[0, bigint, false] AS value#2L]
      +- ExternalRDD [obj#1L]

== Physical Plan ==
*(2) HashAggregate(keys=[], functions=[count(1)], output=[count#170L])
+- Exchange SinglePartition
   +- *(1) HashAggregate(keys=[], functions=[partial_count(1)], output=[count#175L])
      +- *(1) Project
         +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
            +- Scan ExternalRDDScan[obj#1L]
{code}
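A regression test could assert on the physical plan directly; here is a minimal sketch (my addition, same spark-shell session assumed):
{code:java}
// A query derived from a cached DataFrame should read from the cache, i.e.
// its physical plan should contain an InMemoryTableScan node.
val aggPlan = df1.groupBy().count().queryExecution.executedPlan
assert(aggPlan.toString.contains("InMemoryTableScan"),
  "groupBy().count() over the cached df1 does not scan the cache")
{code}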
 

 


> Spark Dataset groupby.agg/count doesn't respect cache with UDF
> --------------------------------------------------------------
>
>                 Key: SPARK-24373
>                 URL: https://issues.apache.org/jira/browse/SPARK-24373
>             Project: Spark
>          Issue Type: Bug
>          Components: Input/Output
>    Affects Versions: 2.3.0
>            Reporter: Wenbo Zhao
>            Priority: Major
>



