Repository: spark
Updated Branches:
  refs/heads/master b804ca577 -> c220cc42a


[SPARK-25028][SQL] Avoid NPE when analyzing partition with NULL values

## What changes were proposed in this pull request?

`ANALYZE TABLE ... PARTITION(...) COMPUTE STATISTICS` can fail with an NPE if 
a partition column contains a NULL value.

This PR avoids the NPE by replacing `NULL` partition column values with the 
default partition placeholder (`ExternalCatalogUtils.DEFAULT_PARTITION_NAME`) 
when building the partition spec.

## How was this patch tested?

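Added a unit test to `StatisticsCollectionSuite` that analyzes a table whose 
partition column contains only NULL values.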

Closes #22036 from mgaido91/SPARK-25028.

Authored-by: Marco Gaido <marcogaid...@gmail.com>
Signed-off-by: Wenchen Fan <wenc...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c220cc42
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c220cc42
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c220cc42

Branch: refs/heads/master
Commit: c220cc42abebbc98a6110b50f787eb6d338c2d97
Parents: b804ca5
Author: Marco Gaido <marcogaid...@gmail.com>
Authored: Tue Aug 14 00:59:18 2018 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Tue Aug 14 00:59:18 2018 +0800

----------------------------------------------------------------------
 .../command/AnalyzePartitionCommand.scala         | 10 ++++++++--
 .../spark/sql/StatisticsCollectionSuite.scala     | 18 ++++++++++++++++++
 2 files changed, 26 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c220cc42/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala
index 5b54b22..18fefa0 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.command
 import org.apache.spark.sql.{AnalysisException, Column, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, 
UnresolvedAttribute}
-import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, 
ExternalCatalogUtils}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{And, EqualTo, Literal}
 import org.apache.spark.sql.execution.datasources.PartitioningUtils
@@ -140,7 +140,13 @@ case class AnalyzePartitionCommand(
     val df = tableDf.filter(Column(filter)).groupBy(partitionColumns: 
_*).count()
 
     df.collect().map { r =>
-      val partitionColumnValues = 
partitionColumns.indices.map(r.get(_).toString)
+      val partitionColumnValues = partitionColumns.indices.map { i =>
+        if (r.isNullAt(i)) {
+          ExternalCatalogUtils.DEFAULT_PARTITION_NAME
+        } else {
+          r.get(i).toString
+        }
+      }
       val spec = 
tableMeta.partitionColumnNames.zip(partitionColumnValues).toMap
       val count = BigInt(r.getLong(partitionColumns.size))
       (spec, count)

http://git-wip-us.apache.org/repos/asf/spark/blob/c220cc42/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
index 60fa951..cb562d6 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
@@ -204,6 +204,24 @@ class StatisticsCollectionSuite extends 
StatisticsCollectionTestBase with Shared
     }
   }
 
+  test("SPARK-25028: column stats collection for null partitioning columns") {
+    val table = "analyze_partition_with_null"
+    withTempDir { dir =>
+      withTable(table) {
+        sql(s"""
+             |CREATE TABLE $table (value string, name string)
+             |USING PARQUET
+             |PARTITIONED BY (name)
+             |LOCATION '${dir.toURI}'""".stripMargin)
+        val df = Seq(("a", null), ("b", null)).toDF("value", "name")
+        df.write.mode("overwrite").insertInto(table)
+        sql(s"ANALYZE TABLE $table PARTITION (name) COMPUTE STATISTICS")
+        val partitions = 
spark.sessionState.catalog.listPartitions(TableIdentifier(table))
+        assert(partitions.head.stats.get.rowCount.get == 2)
+      }
+    }
+  }
+
   test("number format in statistics") {
     val numbers = Seq(
       BigInt(0) -> (("0.0 B", "0")),


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to