[ https://issues.apache.org/jira/browse/SPARK-24373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16489060#comment-16489060 ]
Li Jin edited comment on SPARK-24373 at 5/24/18 9:00 PM:
---------------------------------------------------------

Here is a reproduction:
{code:java}
val myUDF = udf((x: Long) => { println("xxxx"); x + 1 })
val df1 = spark.range(0, 1).toDF("s").select(myUDF($"s"))
df1.cache()
df1.count() // No "xxxx" printed
{code}
The issue appears to be related to the UDF: with it, the physical plan for count() reads directly from Range instead of from the cache:
{code:java}
val df1 = spark.range(0, 1).toDF("s").select(myUDF($"s"))
df1.cache()
df1.groupBy().count().explain()

== Physical Plan ==
*(2) HashAggregate(keys=[], functions=[count(1)])
+- Exchange SinglePartition
   +- *(1) HashAggregate(keys=[], functions=[partial_count(1)])
      +- *(1) Project
         +- *(1) Range (0, 1, step=1, splits=2)
{code}
Without the UDF, the count() plan scans the cached relation, so count() materializes the cache:
{code:java}
val df1 = spark.range(0, 1).toDF("s").select($"s" + 1)
df1.cache()
df1.groupBy().count().explain()

== Physical Plan ==
*(2) HashAggregate(keys=[], functions=[count(1)])
+- Exchange SinglePartition
   +- *(1) HashAggregate(keys=[], functions=[partial_count(1)])
      +- *(1) InMemoryTableScan
            +- InMemoryRelation [(s + 1)#179L], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) Project [(id#175L + 1) AS (s + 1)#179L]
+- *(1) Range (0, 1, step=1, splits=2)
,None)
               +- *(1) Project [(id#175L + 1) AS (s + 1)#179L]
                  +- *(1) Range (0, 1, step=1, splits=2)
{code}
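To check this programmatically, a small diagnostic sketch can look for an InMemoryTableScan node in the executed physical plan, which is exactly what differs between the two plans above. The usesCache helper below is illustrative, not a Spark API:
{code:java}
import org.apache.spark.sql.DataFrame

// Diagnostic sketch: true if the executed physical plan reads from the
// cache, i.e. contains an InMemoryTableScan node. `usesCache` is a
// hypothetical helper name, not part of Spark.
def usesCache(df: DataFrame): Boolean =
  df.queryExecution.executedPlan.toString.contains("InMemoryTableScan")

usesCache(df1.groupBy().count()) // false with the UDF column, true without it
{code}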
> "df.cache() df.count()" no longer eagerly caches data
> -----------------------------------------------------
>
>                 Key: SPARK-24373
>                 URL: https://issues.apache.org/jira/browse/SPARK-24373
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.3.0
>            Reporter: Wenbo Zhao
>            Priority: Major
>
> Here is the code to reproduce the issue in local mode:
> {code:java}
> scala> val df = sc.range(1, 2).toDF
> df: org.apache.spark.sql.DataFrame = [value: bigint]
>
> scala> val myudf = udf({x: Long => println("xxxx"); x + 1})
> myudf: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,LongType,Some(List(LongType)))
>
> scala> val df1 = df.withColumn("value1", myudf(col("value")))
> df1: org.apache.spark.sql.DataFrame = [value: bigint, value1: bigint]
>
> scala> df1.cache
> res0: df1.type = [value: bigint, value1: bigint]
>
> scala> df1.count
> res1: Long = 1
>
> scala> df1.count
> res2: Long = 1
>
> scala> df1.count
> res3: Long = 1
> {code}
>
> In Spark 2.2, you would see "xxxx" printed here.
> In the above example, explain() shows the cache being used:
> {code:java}
> scala> df1.explain(true)
> == Parsed Logical Plan ==
> 'Project [value#2L, UDF('value) AS value1#5]
> +- AnalysisBarrier
>       +- SerializeFromObject [input[0, bigint, false] AS value#2L]
>          +- ExternalRDD [obj#1L]
>
> == Analyzed Logical Plan ==
> value: bigint, value1: bigint
> Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L]
> +- SerializeFromObject [input[0, bigint, false] AS value#2L]
>    +- ExternalRDD [obj#1L]
>
> == Optimized Logical Plan ==
> InMemoryRelation [value#2L, value1#5L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
>    +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L]
>       +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
>          +- Scan ExternalRDDScan[obj#1L]
>
> == Physical Plan ==
> *(1) InMemoryTableScan [value#2L, value1#5L]
>    +- InMemoryRelation [value#2L, value1#5L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
>          +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L]
>             +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
>                +- Scan ExternalRDDScan[obj#1L]
> {code}
> but the InMemoryTableScan is missing in the following explain():
> {code:java}
> scala> df1.groupBy().count().explain(true)
> == Parsed Logical Plan ==
> Aggregate [count(1) AS count#170L]
> +- Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L]
>    +- SerializeFromObject [input[0, bigint, false] AS value#2L]
>       +- ExternalRDD [obj#1L]
>
> == Analyzed Logical Plan ==
> count: bigint
> Aggregate [count(1) AS count#170L]
> +- Project [value#2L, if (isnull(value#2L)) null else if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L]
>    +- SerializeFromObject [input[0, bigint, false] AS value#2L]
>       +- ExternalRDD [obj#1L]
>
> == Optimized Logical Plan ==
> Aggregate [count(1) AS count#170L]
> +- Project
>    +- SerializeFromObject [input[0, bigint, false] AS value#2L]
>       +- ExternalRDD [obj#1L]
>
> == Physical Plan ==
> *(2) HashAggregate(keys=[], functions=[count(1)], output=[count#170L])
> +- Exchange SinglePartition
>    +- *(1) HashAggregate(keys=[], functions=[partial_count(1)], output=[count#175L])
>       +- *(1) Project
>          +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
>             +- Scan ExternalRDDScan[obj#1L]
> {code}
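One observation from the plans above: in the analyzed plan for groupBy().count(), the UDF ends up wrapped in a second if (isnull(...)) check, which suggests the re-analyzed plan no longer matches the plan that was cached. Until that is resolved, a possible workaround, sketched below under the assumption of Spark 2.3 behavior and not verified against this exact build, is to materialize the cache with an action over df1's own plan, whose optimized form is the cached InMemoryRelation (as in the df1.explain(true) output above):
{code:java}
// Workaround sketch (an assumption, not a confirmed fix). foreach runs over
// df1's own plan, so the UDF column stays in the output schema, the cached
// Project is evaluated, and the cache is populated eagerly.
val myUDF = udf((x: Long) => { println("xxxx"); x + 1 })
val df1 = spark.range(0, 1).toDF("s").select(myUDF($"s"))
df1.cache()
df1.foreach((_: org.apache.spark.sql.Row) => ()) // prints "xxxx" while the cache is built
{code}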