Wenbo Zhao created SPARK-24373:
----------------------------------

             Summary: Spark Dataset groupby.agg/count doesn't respect cache
                 Key: SPARK-24373
                 URL: https://issues.apache.org/jira/browse/SPARK-24373
             Project: Spark
          Issue Type: Bug
          Components: Input/Output
    Affects Versions: 2.3.0
            Reporter: Wenbo Zhao

Here is the code to reproduce the issue in local mode:

{code:java}
val df = sc.range(1, 2).toDF
val myudf = udf({x: Long => println("xxxx"); x + 1})
// df1 is not defined in the original message; this definition is inferred from the plans below
val df1 = df.withColumn("value1", myudf(col("value")))

scala> df1.cache()
res0: df1.type = [value: bigint, value1: bigint]

scala> df1.count
res1: Long = 1

scala> df1.count
res2: Long = 1

scala> df1.count
res3: Long = 1
{code}

In Spark 2.2, you could see it print "xxxx".

In the above example, explain shows the following:

{code:java}
scala> df1.explain(true)
== Parsed Logical Plan ==
'Project [value#2L, UDF('value) AS value1#5]
+- AnalysisBarrier
   +- SerializeFromObject [input[0, bigint, false] AS value#2L]
      +- ExternalRDD [obj#1L]

== Analyzed Logical Plan ==
value: bigint, value1: bigint
Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L]
+- SerializeFromObject [input[0, bigint, false] AS value#2L]
   +- ExternalRDD [obj#1L]

== Optimized Logical Plan ==
InMemoryRelation [value#2L, value1#5L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
+- *(1) Project [value#2L, UDF(value#2L) AS value1#5L]
   +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
      +- Scan ExternalRDDScan[obj#1L]

== Physical Plan ==
*(1) InMemoryTableScan [value#2L, value1#5L]
+- InMemoryRelation [value#2L, value1#5L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
   +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L]
      +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
         +- Scan ExternalRDDScan[obj#1L]
{code}

But the InMemoryTableScan is missing from the following explain():

{code:java}
scala> df1.groupBy().count().explain(true)
== Parsed Logical Plan ==
Aggregate [count(1) AS count#170L]
+- Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L]
   +- SerializeFromObject [input[0, bigint, false] AS value#2L]
      +- ExternalRDD [obj#1L]

== Analyzed Logical Plan ==
count: bigint
Aggregate [count(1) AS count#170L]
+- Project [value#2L, if (isnull(value#2L)) null else if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L]
   +- SerializeFromObject [input[0, bigint, false] AS value#2L]
      +- ExternalRDD [obj#1L]

== Optimized Logical Plan ==
Aggregate [count(1) AS count#170L]
+- Project
   +- SerializeFromObject [input[0, bigint, false] AS value#2L]
      +- ExternalRDD [obj#1L]

== Physical Plan ==
*(2) HashAggregate(keys=[], functions=[count(1)], output=[count#170L])
+- Exchange SinglePartition
   +- *(1) HashAggregate(keys=[], functions=[partial_count(1)], output=[count#175L])
      +- *(1) Project
         +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
            +- Scan ExternalRDDScan[obj#1L]
{code}
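
The comparison of the two physical plans above can also be done programmatically rather than by eye. The following is only a minimal sketch: `CacheCheck` and `readsFromCache` are hypothetical names, not part of Spark, and the check is a plain substring match on the rendered plan text, which assumes the scan node is printed as `InMemoryTableScan` as in the output above.

```scala
// Hypothetical helper (not part of Spark): reports whether a physical plan,
// rendered as text, contains an InMemoryTableScan node, i.e. reads from the cache.
object CacheCheck {
  def readsFromCache(physicalPlan: String): Boolean =
    physicalPlan.contains("InMemoryTableScan")

  def main(args: Array[String]): Unit = {
    // Physical plan reported for df1.explain(true).
    val cachedPlan =
      """*(1) InMemoryTableScan [value#2L, value1#5L]
        |+- InMemoryRelation [value#2L, value1#5L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
        |   +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L]
        |      +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
        |         +- Scan ExternalRDDScan[obj#1L]""".stripMargin

    // Physical plan reported for df1.groupBy().count().explain(true).
    val aggregatePlan =
      """*(2) HashAggregate(keys=[], functions=[count(1)], output=[count#170L])
        |+- Exchange SinglePartition
        |   +- *(1) HashAggregate(keys=[], functions=[partial_count(1)], output=[count#175L])
        |      +- *(1) Project
        |         +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
        |            +- Scan ExternalRDDScan[obj#1L]""".stripMargin

    println(readsFromCache(cachedPlan))    // the cached plan contains the scan node
    println(readsFromCache(aggregatePlan)) // the aggregate plan does not
  }
}
```

In a spark-shell session, the same helper could be applied to a live plan with `CacheCheck.readsFromCache(df1.groupBy().count().queryExecution.executedPlan.toString)`, which avoids copying explain output by hand.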