Repository: spark
Updated Branches:
  refs/heads/master 97ba49183 -> 13785daa8


[SPARK-21599][SQL] Collecting column statistics for datasource tables may fail with java.util.NoSuchElementException

## What changes were proposed in this pull request?
In the case of datasource tables (when they are stored in a non-Hive-compatible way), the schema information is recorded as table properties in the Hive metastore. The alterTableStats method needs to read the schema from the table properties for data source tables before recording the column-level statistics. Currently, we don't pick up the correct schema information and fail with a java.util.NoSuchElementException.
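For illustration, a minimal repro sketch (assuming a spark-shell style `spark` SparkSession; the statements mirror the new test case added below, where `skipHiveMetadata true` forces the non-Hive-compatible layout):

```scala
// Sketch of the failing scenario (mirrors the new StatisticsSuite test).
// With skipHiveMetadata=true the table is stored in a non-Hive-compatible way,
// so the real schema (a int, b int) lives only in the table properties and the
// raw Hive schema is the dummy `col array<string>`.
spark.sql(
  """CREATE TABLE parquet_tab (a INT, b INT)
    |USING parquet
    |OPTIONS (skipHiveMetadata true)
  """.stripMargin)

spark.sql("INSERT INTO parquet_tab VALUES (1, 1)")

// Before this fix, alterTableStats built its column-name -> DataType map from
// the raw catalog schema, so the lookup for column "a" threw
// java.util.NoSuchElementException. With the fix, the schema is restored from
// the table properties first and the command succeeds.
spark.sql("ANALYZE TABLE parquet_tab COMPUTE STATISTICS FOR COLUMNS a, b")
```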

## How was this patch tested?
New test cases are added in StatisticsSuite.

Author: Dilip Biswal <dbis...@us.ibm.com>

Closes #18804 from dilipbiswal/datasource_stats.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/13785daa
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/13785daa
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/13785daa

Branch: refs/heads/master
Commit: 13785daa8df22cbe3541024d9118600a1e023afe
Parents: 97ba491
Author: Dilip Biswal <dbis...@us.ibm.com>
Authored: Thu Aug 3 09:25:48 2017 -0700
Committer: gatorsmile <gatorsm...@gmail.com>
Committed: Thu Aug 3 09:25:48 2017 -0700

----------------------------------------------------------------------
 .../spark/sql/hive/HiveExternalCatalog.scala    |  7 +-
 .../apache/spark/sql/hive/StatisticsSuite.scala | 76 +++++++++++++++++++-
 2 files changed, 81 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/13785daa/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 70d7dd2..172317c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -642,8 +642,13 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       if (stats.get.rowCount.isDefined) {
         statsProperties += STATISTICS_NUM_ROWS -> stats.get.rowCount.get.toString()
       }
+
+      // For datasource tables and hive serde tables created by spark 2.1 or higher,
+      // the data schema is stored in the table properties.
+      val schema = restoreTableMetadata(rawTable).schema
+
       val colNameTypeMap: Map[String, DataType] =
-        rawTable.schema.fields.map(f => (f.name, f.dataType)).toMap
+        schema.fields.map(f => (f.name, f.dataType)).toMap
       stats.get.colStats.foreach { case (colName, colStat) =>
         colStat.toMap(colName, colNameTypeMap(colName)).foreach { case (k, v) =>
           statsProperties += (columnStatKeyPropName(colName, k) -> v)

http://git-wip-us.apache.org/repos/asf/spark/blob/13785daa/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 84bcea3..36566bf 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -19,10 +19,11 @@ package org.apache.spark.sql.hive
 
 import java.io.{File, PrintWriter}
 
-import org.apache.hadoop.hive.common.StatsSetupConst
 import scala.reflect.ClassTag
 import scala.util.matching.Regex
 
+import org.apache.hadoop.hive.common.StatsSetupConst
+
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics}
@@ -34,9 +35,16 @@ import org.apache.spark.sql.execution.joins._
 import org.apache.spark.sql.hive.HiveExternalCatalog._
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
 
 
 class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleton {
+  private def dropMetadata(schema: StructType): StructType = {
+    val newFields = schema.fields.map { f =>
+      StructField(f.name, f.dataType, f.nullable, Metadata.empty)
+    }
+    StructType(newFields)
+  }
 
   test("Hive serde tables should fallback to HDFS for size estimation") {
     withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> "true") {
@@ -117,6 +125,72 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
     }
   }
 
+  test("analyze non hive compatible datasource tables") {
+    val table = "parquet_tab"
+    withTable(table) {
+      sql(
+        s"""
+          |CREATE TABLE $table (a int, b int)
+          |USING parquet
+          |OPTIONS (skipHiveMetadata true)
+        """.stripMargin)
+
+      // Verify that the schema stored in catalog is a dummy one used for
+      // data source tables. The actual schema is stored in table properties.
+      val rawSchema = dropMetadata(hiveClient.getTable("default", table).schema)
+      val expectedRawSchema = new StructType()
+        .add("col", "array<string>")
+      assert(rawSchema == expectedRawSchema)
+
+      val actualSchema = spark.sharedState.externalCatalog.getTable("default", table).schema
+      val expectedActualSchema = new StructType()
+        .add("a", "int")
+        .add("b", "int")
+      assert(actualSchema == expectedActualSchema)
+
+      sql(s"INSERT INTO $table VALUES (1, 1)")
+      sql(s"INSERT INTO $table VALUES (2, 1)")
+      sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS a, b")
+      val fetchedStats0 =
+        checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(2))
+      assert(fetchedStats0.get.colStats == Map(
+        "a" -> ColumnStat(2, Some(1), Some(2), 0, 4, 4),
+        "b" -> ColumnStat(1, Some(1), Some(1), 0, 4, 4)))
+    }
+  }
+
+  test("Analyze hive serde tables when schema is not same as schema in table properties") {
+
+    val table = "hive_serde"
+    withTable(table) {
+      sql(s"CREATE TABLE $table (C1 INT, C2 STRING, C3 DOUBLE)")
+
+      // Verify that the table schema stored in hive catalog is
+      // different than the schema stored in table properties.
+      val rawSchema = dropMetadata(hiveClient.getTable("default", table).schema)
+      val expectedRawSchema = new StructType()
+        .add("c1", "int")
+        .add("c2", "string")
+        .add("c3", "double")
+      assert(rawSchema == expectedRawSchema)
+
+      val actualSchema = spark.sharedState.externalCatalog.getTable("default", table).schema
+      val expectedActualSchema = new StructType()
+        .add("C1", "int")
+        .add("C2", "string")
+        .add("C3", "double")
+      assert(actualSchema == expectedActualSchema)
+
+      sql(s"INSERT INTO TABLE $table SELECT 1, 'a', 10.0")
+      sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS C1")
+      val fetchedStats1 =
+        checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(1)).get
+      assert(fetchedStats1.colStats == Map(
+        "C1" -> ColumnStat(distinctCount = 1, min = Some(1), max = Some(1), nullCount = 0,
+          avgLen = 4, maxLen = 4)))
+    }
+  }
+
  test("SPARK-21079 - analyze table with location different than that of individual partitions") {
     val tableName = "analyzeTable_part"
     withTable(tableName) {

