[ https://issues.apache.org/jira/browse/SPARK-24373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Wenbo Zhao updated SPARK-24373:
-------------------------------
Description:

Here is the code to reproduce in local mode:
{code:java}
scala> val df = sc.range(1, 2).toDF
df: org.apache.spark.sql.DataFrame = [value: bigint]

scala> val myudf = udf({x: Long => println("xxxx"); x + 1})
myudf: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,LongType,Some(List(LongType)))

scala> val df1 = df.withColumn("value1", myudf(col("value")))
df1: org.apache.spark.sql.DataFrame = [value: bigint, value1: bigint]

scala> df1.cache
res0: df1.type = [value: bigint, value1: bigint]

scala> df1.count
res1: Long = 1

scala> df1.count
res2: Long = 1

scala> df1.count
res3: Long = 1
{code}
In Spark 2.2, you could see it print "xxxx".

When you run explain on the above example, you can see:
{code:java}
scala> df1.explain(true)
== Parsed Logical Plan ==
'Project [value#2L, UDF('value) AS value1#5]
+- AnalysisBarrier
   +- SerializeFromObject [input[0, bigint, false] AS value#2L]
      +- ExternalRDD [obj#1L]

== Analyzed Logical Plan ==
value: bigint, value1: bigint
Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L]
+- SerializeFromObject [input[0, bigint, false] AS value#2L]
   +- ExternalRDD [obj#1L]

== Optimized Logical Plan ==
InMemoryRelation [value#2L, value1#5L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
   +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L]
      +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
         +- Scan ExternalRDDScan[obj#1L]

== Physical Plan ==
*(1) InMemoryTableScan [value#2L, value1#5L]
   +- InMemoryRelation [value#2L, value1#5L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L]
            +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
               +- Scan ExternalRDDScan[obj#1L]
{code}
but the InMemoryTableScan is missing in the following explain():
{code:java}
scala> df1.groupBy().count().explain(true)
== Parsed Logical Plan ==
Aggregate [count(1) AS count#170L]
+- Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L]
   +- SerializeFromObject [input[0, bigint, false] AS value#2L]
      +- ExternalRDD [obj#1L]

== Analyzed Logical Plan ==
count: bigint
Aggregate [count(1) AS count#170L]
+- Project [value#2L, if (isnull(value#2L)) null else if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L]
   +- SerializeFromObject [input[0, bigint, false] AS value#2L]
      +- ExternalRDD [obj#1L]

== Optimized Logical Plan ==
Aggregate [count(1) AS count#170L]
+- Project
   +- SerializeFromObject [input[0, bigint, false] AS value#2L]
      +- ExternalRDD [obj#1L]

== Physical Plan ==
*(2) HashAggregate(keys=[], functions=[count(1)], output=[count#170L])
+- Exchange SinglePartition
   +- *(1) HashAggregate(keys=[], functions=[partial_count(1)], output=[count#175L])
      +- *(1) Project
         +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
            +- Scan ExternalRDDScan[obj#1L]
{code}
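A plausible reading of the plans above (an illustration, not a confirmed root-cause analysis): in the analyzed plan for `df1.groupBy().count()`, the null-propagation guard around the UDF is applied twice (`if (isnull(value#2L)) null else if (isnull(value#2L)) null else UDF(value#2L)`), so the plan fragment no longer compares equal to the single-guard fragment stored when `df1.cache` was registered, and the cache lookup misses. A minimal plain-Scala sketch of that structural mismatch — the `Expr` ADT and names here are hypothetical stand-ins, not Spark's internal Catalyst classes:

```scala
object CacheMissSketch {
  // Hypothetical miniature of an expression tree, only to illustrate
  // the structural-equality cache miss; not Spark's real classes.
  sealed trait Expr
  case class Attr(name: String) extends Expr
  case class IsNull(e: Expr) extends Expr
  case class If(cond: Expr, onTrue: Expr, onFalse: Expr) extends Expr
  case class Udf(e: Expr) extends Expr
  case object NullLit extends Expr

  // Wrap an expression in the null-propagation guard the analyzer inserts
  // around deterministic UDFs.
  def nullSafe(e: Expr): Expr = If(IsNull(Attr("value")), NullLit, e)

  // What the cache stored: the guard applied once.
  val cachedColumn: Expr = nullSafe(Udf(Attr("value")))

  // What re-analysis under groupBy().count() produced: the guard applied twice.
  val reanalyzedColumn: Expr = nullSafe(nullSafe(Udf(Attr("value"))))

  def main(args: Array[String]): Unit = {
    // Case classes compare structurally; a cache keyed on structural plan
    // equality cannot match these two, so the InMemoryTableScan is skipped.
    println(cachedColumn == reanalyzedColumn) // prints: false
  }
}
```

Under this reading, the bug is not that the cache was dropped but that the re-analyzed plan fails to line up with the cached one, so the UDF is recomputed from the source RDD.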
> Spark Dataset groupby.agg/count doesn't respect cache with UDF
> --------------------------------------------------------------
>
>                 Key: SPARK-24373
>                 URL: https://issues.apache.org/jira/browse/SPARK-24373
>             Project: Spark
>          Issue Type: Bug
>          Components: Input/Output
>    Affects Versions: 2.3.0
>            Reporter: Wenbo Zhao
>            Priority: Major
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)