[ https://issues.apache.org/jira/browse/SPARK-24373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16489060#comment-16489060 ]

Li Jin edited comment on SPARK-24373 at 5/24/18 9:00 PM:
---------------------------------------------------------

Here is a reproduction:
{code:java}
val myUDF = udf((x: Long) => { println("xxxx"); x + 1 })

val df1 = spark.range(0, 1).toDF("s").select(myUDF($"s"))
df1.cache()
df1.count()
// No xxxx printed
{code}
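For contrast, an action that needs the UDF's output column should still evaluate it and populate the cache (a hedged sketch for a Spark 2.3 shell; using collect() as the forcing action is my suggestion, not part of the original report):
{code:java}
// Sketch for spark-shell (Spark 2.3): spark and $ are already in scope.
// collect() needs every output column, so the UDF cannot be pruned away;
// running it should print "xxxx" and materialize the cached relation.
val myUDF = udf((x: Long) => { println("xxxx"); x + 1 })
val df1 = spark.range(0, 1).toDF("s").select(myUDF($"s"))
df1.cache()
df1.collect()
{code}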
It appears the issue is related to the UDF; note that the plan below never touches the cached data:
{code:java}
val df1 = spark.range(0, 1).toDF("s").select(myUDF($"s"))
df1.cache()
df1.groupBy().count().explain()

== Physical Plan ==
*(2) HashAggregate(keys=[], functions=[count(1)])
+- Exchange SinglePartition
  +- *(1) HashAggregate(keys=[], functions=[partial_count(1)])
    +- *(1) Project
      +- *(1) Range (0, 1, step=1, splits=2)
{code}
Without the UDF, count() does materialize the cache (note the InMemoryTableScan):
{code:java}
val df1 = spark.range(0, 1).toDF("s").select($"s" + 1)
df1.cache()
df1.groupBy().count().explain()

== Physical Plan ==
*(2) HashAggregate(keys=[], functions=[count(1)])
  +- Exchange SinglePartition
    +- *(1) HashAggregate(keys=[], functions=[partial_count(1)])
      +- *(1) InMemoryTableScan
        +- InMemoryRelation [(s + 1)#179L], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) Project [(id#175L + 1) AS (s + 1)#179L]
+- *(1) Range (0, 1, step=1, splits=2) ,None)
          +- *(1) Project [(id#175L + 1) AS (s + 1)#179L]
             +- *(1) Range (0, 1, step=1, splits=2)

{code}
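To make the comparison easy to check from a shell, here is a small helper (my own sketch against Spark 2.3 internals, not part of the report) that tells whether a query would read from the cache:
{code:java}
// Sketch: relies on Spark internal classes that are reachable from spark-shell.
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.execution.columnar.InMemoryRelation

// True if the optimized plan contains an InMemoryRelation, i.e. the query
// would scan (and thereby materialize) the cached data.
def usesCache(ds: Dataset[_]): Boolean =
  ds.queryExecution.optimizedPlan.find(_.isInstanceOf[InMemoryRelation]).isDefined
{code}
With the two snippets above, usesCache(df1.groupBy().count()) should come back false in the UDF case and true in the non-UDF case, matching the two explain() outputs.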
 


> "df.cache() df.count()" no longer eagerly caches data
> -----------------------------------------------------
>
>                 Key: SPARK-24373
>                 URL: https://issues.apache.org/jira/browse/SPARK-24373
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.3.0
>            Reporter: Wenbo Zhao
>            Priority: Major
>
> Here is the code to reproduce in local mode:
> {code:java}
> scala> val df = sc.range(1, 2).toDF
> df: org.apache.spark.sql.DataFrame = [value: bigint]
> scala> val myudf = udf({x: Long => println("xxxx"); x + 1})
> myudf: org.apache.spark.sql.expressions.UserDefinedFunction = 
> UserDefinedFunction(<function1>,LongType,Some(List(LongType)))
> scala> val df1 = df.withColumn("value1", myudf(col("value")))
> df1: org.apache.spark.sql.DataFrame = [value: bigint, value1: bigint]
> scala> df1.cache
> res0: df1.type = [value: bigint, value1: bigint]
> scala> df1.count
> res1: Long = 1 
> scala> df1.count
> res2: Long = 1
> scala> df1.count
> res3: Long = 1
> {code}
>  
> In Spark 2.2, you could see it print "xxxx".
> In the above example, when you do explain(), you could see:
> {code:java}
> scala> df1.explain(true)
> == Parsed Logical Plan ==
> 'Project [value#2L, UDF('value) AS value1#5]
> +- AnalysisBarrier
>    +- SerializeFromObject [input[0, bigint, false] AS value#2L]
>       +- ExternalRDD [obj#1L]
> == Analyzed Logical Plan ==
> value: bigint, value1: bigint
> Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
>    +- ExternalRDD [obj#1L]
> == Optimized Logical Plan ==
> InMemoryRelation [value#2L, value1#5L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
>    +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L]
>       +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
>          +- Scan ExternalRDDScan[obj#1L]
> == Physical Plan ==
> *(1) InMemoryTableScan [value#2L, value1#5L]
>    +- InMemoryRelation [value#2L, value1#5L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
>       +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L]
>          +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
>             +- Scan ExternalRDDScan[obj#1L]
> {code}
> but the InMemoryTableScan is missing in the following explain():
> {code:java}
> scala> df1.groupBy().count().explain(true)
> == Parsed Logical Plan ==
> Aggregate [count(1) AS count#170L]
> +- Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L]
>    +- SerializeFromObject [input[0, bigint, false] AS value#2L]
>       +- ExternalRDD [obj#1L]
> == Analyzed Logical Plan ==
> count: bigint
> Aggregate [count(1) AS count#170L]
> +- Project [value#2L, if (isnull(value#2L)) null else if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L]
>    +- SerializeFromObject [input[0, bigint, false] AS value#2L]
>       +- ExternalRDD [obj#1L]
> == Optimized Logical Plan ==
> Aggregate [count(1) AS count#170L]
> +- Project
>    +- SerializeFromObject [input[0, bigint, false] AS value#2L]
>       +- ExternalRDD [obj#1L]
> == Physical Plan ==
> *(2) HashAggregate(keys=[], functions=[count(1)], output=[count#170L])
> +- Exchange SinglePartition
>    +- *(1) HashAggregate(keys=[], functions=[partial_count(1)], output=[count#175L])
>       +- *(1) Project
>          +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
>             +- Scan ExternalRDDScan[obj#1L]
> {code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
