Repository: spark Updated Branches: refs/heads/master 97ba49183 -> 13785daa8
[SPARK-21599][SQL] Collecting column statistics for datasource tables may fail with java.util.NoSuchElementException ## What changes were proposed in this pull request? In the case of datasource tables (when they are stored in a non-Hive-compatible way), the schema information is recorded as table properties in the Hive metastore. The alterTableStats method needs to get the schema information from table properties for data source tables before recording the column level statistics. Currently, we don't get the correct schema information and fail with a java.util.NoSuchElementException. ## How was this patch tested? A new test case is added in StatisticsSuite. Author: Dilip Biswal <dbis...@us.ibm.com> Closes #18804 from dilipbiswal/datasource_stats. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/13785daa Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/13785daa Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/13785daa Branch: refs/heads/master Commit: 13785daa8df22cbe3541024d9118600a1e023afe Parents: 97ba491 Author: Dilip Biswal <dbis...@us.ibm.com> Authored: Thu Aug 3 09:25:48 2017 -0700 Committer: gatorsmile <gatorsm...@gmail.com> Committed: Thu Aug 3 09:25:48 2017 -0700 ---------------------------------------------------------------------- .../spark/sql/hive/HiveExternalCatalog.scala | 7 +- .../apache/spark/sql/hive/StatisticsSuite.scala | 76 +++++++++++++++++++- 2 files changed, 81 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/13785daa/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index 70d7dd2..172317c 100644 --- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -642,8 +642,13 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat if (stats.get.rowCount.isDefined) { statsProperties += STATISTICS_NUM_ROWS -> stats.get.rowCount.get.toString() } + + // For datasource tables and hive serde tables created by spark 2.1 or higher, + // the data schema is stored in the table properties. + val schema = restoreTableMetadata(rawTable).schema + val colNameTypeMap: Map[String, DataType] = - rawTable.schema.fields.map(f => (f.name, f.dataType)).toMap + schema.fields.map(f => (f.name, f.dataType)).toMap stats.get.colStats.foreach { case (colName, colStat) => colStat.toMap(colName, colNameTypeMap(colName)).foreach { case (k, v) => statsProperties += (columnStatKeyPropName(colName, k) -> v) http://git-wip-us.apache.org/repos/asf/spark/blob/13785daa/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index 84bcea3..36566bf 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -19,10 +19,11 @@ package org.apache.spark.sql.hive import java.io.{File, PrintWriter} -import org.apache.hadoop.hive.common.StatsSetupConst import scala.reflect.ClassTag import scala.util.matching.Regex +import org.apache.hadoop.hive.common.StatsSetupConst + import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics} @@ -34,9 +35,16 @@ import org.apache.spark.sql.execution.joins._ import 
org.apache.spark.sql.hive.HiveExternalCatalog._ import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types._ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleton { + private def dropMetadata(schema: StructType): StructType = { + val newFields = schema.fields.map { f => + StructField(f.name, f.dataType, f.nullable, Metadata.empty) + } + StructType(newFields) + } test("Hive serde tables should fallback to HDFS for size estimation") { withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> "true") { @@ -117,6 +125,72 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto } } + test("analyze non hive compatible datasource tables") { + val table = "parquet_tab" + withTable(table) { + sql( + s""" + |CREATE TABLE $table (a int, b int) + |USING parquet + |OPTIONS (skipHiveMetadata true) + """.stripMargin) + + // Verify that the schema stored in catalog is a dummy one used for + // data source tables. The actual schema is stored in table properties. 
+ val rawSchema = dropMetadata(hiveClient.getTable("default", table).schema) + val expectedRawSchema = new StructType() + .add("col", "array<string>") + assert(rawSchema == expectedRawSchema) + + val actualSchema = spark.sharedState.externalCatalog.getTable("default", table).schema + val expectedActualSchema = new StructType() + .add("a", "int") + .add("b", "int") + assert(actualSchema == expectedActualSchema) + + sql(s"INSERT INTO $table VALUES (1, 1)") + sql(s"INSERT INTO $table VALUES (2, 1)") + sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS a, b") + val fetchedStats0 = + checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(2)) + assert(fetchedStats0.get.colStats == Map( + "a" -> ColumnStat(2, Some(1), Some(2), 0, 4, 4), + "b" -> ColumnStat(1, Some(1), Some(1), 0, 4, 4))) + } + } + + test("Analyze hive serde tables when schema is not same as schema in table properties") { + + val table = "hive_serde" + withTable(table) { + sql(s"CREATE TABLE $table (C1 INT, C2 STRING, C3 DOUBLE)") + + // Verify that the table schema stored in hive catalog is + // different than the schema stored in table properties. 
+ val rawSchema = dropMetadata(hiveClient.getTable("default", table).schema) + val expectedRawSchema = new StructType() + .add("c1", "int") + .add("c2", "string") + .add("c3", "double") + assert(rawSchema == expectedRawSchema) + + val actualSchema = spark.sharedState.externalCatalog.getTable("default", table).schema + val expectedActualSchema = new StructType() + .add("C1", "int") + .add("C2", "string") + .add("C3", "double") + assert(actualSchema == expectedActualSchema) + + sql(s"INSERT INTO TABLE $table SELECT 1, 'a', 10.0") + sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS C1") + val fetchedStats1 = + checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(1)).get + assert(fetchedStats1.colStats == Map( + "C1" -> ColumnStat(distinctCount = 1, min = Some(1), max = Some(1), nullCount = 0, + avgLen = 4, maxLen = 4))) + } + } + test("SPARK-21079 - analyze table with location different than that of individual partitions") { val tableName = "analyzeTable_part" withTable(tableName) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org