Repository: spark
Updated Branches:
  refs/heads/master 8e9863531 -> 9b262f6a0


[SPARK-22356][SQL] data source table should support overlapped columns between 
data and partition schema

## What changes were proposed in this pull request?

This is a regression introduced by #14207. After Spark 2.1, we store the 
inferred schema when creating the table, to avoid inferring schema again at 
read path. However, there is one special case: overlapped columns between data 
and partition. For this case, it breaks the assumption of table schema that 
there is no overlap between data and partition schema, and partition columns 
should be at the end. The result is, for Spark 2.1, the table scan has 
incorrect schema that puts partition columns at the end. For Spark 2.2, we add 
a check in CatalogTable to validate table schema, which fails at this case.

To fix this issue, a simple and safe approach is to fall back to the old behavior 
when overlapped columns are detected, i.e. store an empty schema in the metastore.

## How was this patch tested?

new regression test

Author: Wenchen Fan <wenc...@databricks.com>

Closes #19579 from cloud-fan/bug2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9b262f6a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9b262f6a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9b262f6a

Branch: refs/heads/master
Commit: 9b262f6a08c0c1b474d920d49b9fdd574c401d39
Parents: 8e98635
Author: Wenchen Fan <wenc...@databricks.com>
Authored: Thu Oct 26 17:39:53 2017 -0700
Committer: gatorsmile <gatorsm...@gmail.com>
Committed: Thu Oct 26 17:39:53 2017 -0700

----------------------------------------------------------------------
 .../command/createDataSourceTables.scala        | 35 +++++++++++++-----
 .../datasources/HadoopFsRelation.scala          | 25 ++++++++-----
 .../org/apache/spark/sql/SQLQuerySuite.scala    | 16 +++++++++
 .../hive/HiveExternalCatalogVersionsSuite.scala | 38 +++++++++++++++-----
 4 files changed, 89 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9b262f6a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 9e39079..306f43d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources.BaseRelation
+import org.apache.spark.sql.types.StructType
 
 /**
  * A command used to create a data source table.
@@ -85,14 +86,32 @@ case class CreateDataSourceTableCommand(table: 
CatalogTable, ignoreIfExists: Boo
       }
     }
 
-    val newTable = table.copy(
-      schema = dataSource.schema,
-      partitionColumnNames = partitionColumnNames,
-      // If metastore partition management for file source tables is enabled, 
we start off with
-      // partition provider hive, but no partitions in the metastore. The user 
has to call
-      // `msck repair table` to populate the table partitions.
-      tracksPartitionsInCatalog = partitionColumnNames.nonEmpty &&
-        sessionState.conf.manageFilesourcePartitions)
+    val newTable = dataSource match {
+      // Since Spark 2.1, we store the inferred schema of data source in 
metastore, to avoid
+      // inferring the schema again at read path. However if the data source 
has overlapped columns
+      // between data and partition schema, we can't store it in metastore as 
it breaks the
+      // assumption of table schema. Here we fallback to the behavior of Spark 
prior to 2.1, store
+      // empty schema in metastore and infer it at runtime. Note that this 
also means the new
+      // scalable partitioning handling feature(introduced at Spark 2.1) is 
disabled in this case.
+      case r: HadoopFsRelation if r.overlappedPartCols.nonEmpty =>
+        logWarning("It is not recommended to create a table with overlapped 
data and partition " +
+          "columns, as Spark cannot store a valid table schema and has to 
infer it at runtime, " +
+          "which hurts performance. Please check your data files and remove 
the partition " +
+          "columns in it.")
+        table.copy(schema = new StructType(), partitionColumnNames = Nil)
+
+      case _ =>
+        table.copy(
+          schema = dataSource.schema,
+          partitionColumnNames = partitionColumnNames,
+          // If metastore partition management for file source tables is 
enabled, we start off with
+          // partition provider hive, but no partitions in the metastore. The 
user has to call
+          // `msck repair table` to populate the table partitions.
+          tracksPartitionsInCatalog = partitionColumnNames.nonEmpty &&
+            sessionState.conf.manageFilesourcePartitions)
+
+    }
+
     // We will return Nil or throw exception at the beginning if the table 
already exists, so when
     // we reach here, the table should not exist and we should set 
`ignoreIfExists` to false.
     sessionState.catalog.createTable(newTable, ignoreIfExists = false)

http://git-wip-us.apache.org/repos/asf/spark/blob/9b262f6a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
index 9a08524..89d8a85 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.datasources
 
+import java.util.Locale
+
 import scala.collection.mutable
 
 import org.apache.spark.sql.{SparkSession, SQLContext}
@@ -50,15 +52,22 @@ case class HadoopFsRelation(
 
   override def sqlContext: SQLContext = sparkSession.sqlContext
 
-  val schema: StructType = {
-    val getColName: (StructField => String) =
-      if (sparkSession.sessionState.conf.caseSensitiveAnalysis) _.name else 
_.name.toLowerCase
-    val overlappedPartCols = mutable.Map.empty[String, StructField]
-    partitionSchema.foreach { partitionField =>
-      if (dataSchema.exists(getColName(_) == getColName(partitionField))) {
-        overlappedPartCols += getColName(partitionField) -> partitionField
-      }
+  private def getColName(f: StructField): String = {
+    if (sparkSession.sessionState.conf.caseSensitiveAnalysis) {
+      f.name
+    } else {
+      f.name.toLowerCase(Locale.ROOT)
+    }
+  }
+
+  val overlappedPartCols = mutable.Map.empty[String, StructField]
+  partitionSchema.foreach { partitionField =>
+    if (dataSchema.exists(getColName(_) == getColName(partitionField))) {
+      overlappedPartCols += getColName(partitionField) -> partitionField
     }
+  }
+
+  val schema: StructType = {
     StructType(dataSchema.map(f => overlappedPartCols.getOrElse(getColName(f), 
f)) ++
       partitionSchema.filterNot(f => 
overlappedPartCols.contains(getColName(f))))
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/9b262f6a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index caf332d..5d0bba6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -2741,4 +2741,20 @@ class SQLQuerySuite extends QueryTest with 
SharedSQLContext {
     assert (aggregateExpressions.isDefined)
     assert (aggregateExpressions.get.size == 2)
   }
+
+  test("SPARK-22356: overlapped columns between data and partition schema in 
data source tables") {
+    withTempPath { path =>
+      Seq((1, 1, 1), (1, 2, 1)).toDF("i", "p", "j")
+        .write.mode("overwrite").parquet(new File(path, 
"p=1").getCanonicalPath)
+      withTable("t") {
+        sql(s"create table t using parquet 
options(path='${path.getCanonicalPath}')")
+        // We should respect the column order in data schema.
+        assert(spark.table("t").columns === Array("i", "p", "j"))
+        checkAnswer(spark.table("t"), Row(1, 1, 1) :: Row(1, 1, 1) :: Nil)
+        // The DESC TABLE should report same schema as table scan.
+        assert(sql("desc t").select("col_name")
+          .as[String].collect().mkString(",").contains("i,p,j"))
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/9b262f6a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala
index 5f8c9d5..6859432 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala
@@ -40,7 +40,7 @@ class HiveExternalCatalogVersionsSuite extends 
SparkSubmitTestUtils {
   private val tmpDataDir = Utils.createTempDir(namePrefix = "test-data")
   // For local test, you can set `sparkTestingDir` to a static value like 
`/tmp/test-spark`, to
   // avoid downloading Spark of different versions in each run.
-  private val sparkTestingDir = Utils.createTempDir(namePrefix = "test-spark")
+  private val sparkTestingDir = new File("/tmp/test-spark")
   private val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
 
   override def afterAll(): Unit = {
@@ -77,35 +77,38 @@ class HiveExternalCatalogVersionsSuite extends 
SparkSubmitTestUtils {
     super.beforeAll()
 
     val tempPyFile = File.createTempFile("test", ".py")
+    // scalastyle:off line.size.limit
     Files.write(tempPyFile.toPath,
       s"""
         |from pyspark.sql import SparkSession
+        |import os
         |
         |spark = SparkSession.builder.enableHiveSupport().getOrCreate()
         |version_index = spark.conf.get("spark.sql.test.version.index", None)
         |
         |spark.sql("create table data_source_tbl_{} using json as select 1 
i".format(version_index))
         |
-        |spark.sql("create table hive_compatible_data_source_tbl_" + 
version_index + \\
-        |          " using parquet as select 1 i")
+        |spark.sql("create table hive_compatible_data_source_tbl_{} using 
parquet as select 1 i".format(version_index))
         |
         |json_file = "${genDataDir("json_")}" + str(version_index)
         |spark.range(1, 2).selectExpr("cast(id as int) as 
i").write.json(json_file)
-        |spark.sql("create table external_data_source_tbl_" + version_index + 
\\
-        |          "(i int) using json options (path '{}')".format(json_file))
+        |spark.sql("create table external_data_source_tbl_{}(i int) using json 
options (path '{}')".format(version_index, json_file))
         |
         |parquet_file = "${genDataDir("parquet_")}" + str(version_index)
         |spark.range(1, 2).selectExpr("cast(id as int) as 
i").write.parquet(parquet_file)
-        |spark.sql("create table hive_compatible_external_data_source_tbl_" + 
version_index + \\
-        |          "(i int) using parquet options (path 
'{}')".format(parquet_file))
+        |spark.sql("create table hive_compatible_external_data_source_tbl_{}(i 
int) using parquet options (path '{}')".format(version_index, parquet_file))
         |
         |json_file2 = "${genDataDir("json2_")}" + str(version_index)
         |spark.range(1, 2).selectExpr("cast(id as int) as 
i").write.json(json_file2)
-        |spark.sql("create table external_table_without_schema_" + 
version_index + \\
-        |          " using json options (path '{}')".format(json_file2))
+        |spark.sql("create table external_table_without_schema_{} using json 
options (path '{}')".format(version_index, json_file2))
+        |
+        |parquet_file2 = "${genDataDir("parquet2_")}" + str(version_index)
+        |spark.range(1, 3).selectExpr("1 as i", "cast(id as int) as p", "1 as 
j").write.parquet(os.path.join(parquet_file2, "p=1"))
+        |spark.sql("create table tbl_with_col_overlap_{} using parquet 
options(path '{}')".format(version_index, parquet_file2))
         |
         |spark.sql("create view v_{} as select 1 i".format(version_index))
       """.stripMargin.getBytes("utf8"))
+    // scalastyle:on line.size.limit
 
     PROCESS_TABLES.testingVersions.zipWithIndex.foreach { case (version, 
index) =>
       val sparkHome = new File(sparkTestingDir, s"spark-$version")
@@ -153,6 +156,7 @@ object PROCESS_TABLES extends QueryTest with SQLTestUtils {
       .enableHiveSupport()
       .getOrCreate()
     spark = session
+    import session.implicits._
 
     testingVersions.indices.foreach { index =>
       Seq(
@@ -194,6 +198,22 @@ object PROCESS_TABLES extends QueryTest with SQLTestUtils {
 
       // test permanent view
       checkAnswer(sql(s"select i from v_$index"), Row(1))
+
+      // SPARK-22356: overlapped columns between data and partition schema in 
data source tables
+      val tbl_with_col_overlap = s"tbl_with_col_overlap_$index"
+      // For Spark 2.2.0 and 2.1.x, the behavior is different from Spark 2.0.
+      if (testingVersions(index).startsWith("2.1") || testingVersions(index) 
== "2.2.0") {
+        spark.sql("msck repair table " + tbl_with_col_overlap)
+        assert(spark.table(tbl_with_col_overlap).columns === Array("i", "j", 
"p"))
+        checkAnswer(spark.table(tbl_with_col_overlap), Row(1, 1, 1) :: Row(1, 
1, 1) :: Nil)
+        assert(sql("desc " + tbl_with_col_overlap).select("col_name")
+          .as[String].collect().mkString(",").contains("i,j,p"))
+      } else {
+        assert(spark.table(tbl_with_col_overlap).columns === Array("i", "p", 
"j"))
+        checkAnswer(spark.table(tbl_with_col_overlap), Row(1, 1, 1) :: Row(1, 
1, 1) :: Nil)
+        assert(sql("desc " + tbl_with_col_overlap).select("col_name")
+          .as[String].collect().mkString(",").contains("i,p,j"))
+      }
     }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to