Repository: spark
Updated Branches:
  refs/heads/master d65e531b4 -> 249007e37


[SPARK-19724][SQL] Creating a managed table with an existing default location should throw an exception

## What changes were proposed in this pull request?
This PR finishes https://github.com/apache/spark/pull/17272

This JIRA is follow-up work to SPARK-19583.

As discussed in that PR, the following DDL for a managed table with an existing default location should throw an exception:

CREATE TABLE ... (PARTITIONED BY ...) AS SELECT ...
CREATE TABLE ... (PARTITIONED BY ...)
Currently, there are some situations that are not consistent with the above logic:

CREATE TABLE ... (PARTITIONED BY ...) succeeds with an existing default location
situation: both Hive and data source tables (with HiveExternalCatalog/InMemoryCatalog)

CREATE TABLE ... (PARTITIONED BY ...) AS SELECT ...
situation: a Hive table succeeds with an existing default location

This PR makes the above two situations consistent with the logic that an exception should be thrown when the default location already exists.
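For illustration, a minimal spark-shell sketch of the behavior this PR enforces (the warehouse path and the leftover `.garbage` file are assumed for the example; the error text follows the message produced by the new `SessionCatalog.validateTableLocation` below):

```scala
// Precondition (sketch): the default location of `tab1` already exists and
// is nonempty, e.g. <warehouse-dir>/tab1/.garbage is left over on disk.
spark.sql("CREATE TABLE tab1 USING parquet AS SELECT 1 AS col1")
// org.apache.spark.sql.AnalysisException: Can not create the managed
// table('`tab1`'). The associated location('file:/.../tab1') already exists.
```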
## How was this patch tested?

Unit tests added.

Author: Gengliang Wang <gengliang.w...@databricks.com>

Closes #20886 from gengliangwang/pr-17272.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/249007e3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/249007e3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/249007e3

Branch: refs/heads/master
Commit: 249007e37f51f00d14e596692aeac87fbc10b520
Parents: d65e531
Author: Gengliang Wang <gengliang.w...@databricks.com>
Authored: Thu Apr 5 20:19:25 2018 -0700
Committer: gatorsmile <gatorsm...@gmail.com>
Committed: Thu Apr 5 20:19:25 2018 -0700

----------------------------------------------------------------------
 docs/sql-programming-guide.md                   |  1 +
 .../sql/catalyst/catalog/SessionCatalog.scala   | 23 ++++++-
 .../org/apache/spark/sql/internal/SQLConf.scala | 11 ++++
 .../command/createDataSourceTables.scala        |  5 +-
 .../spark/sql/StatisticsCollectionSuite.scala   |  7 ++
 .../spark/sql/execution/command/DDLSuite.scala  | 67 ++++++++++++++++++++
 6 files changed, 110 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/249007e3/docs/sql-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 2b393f3..9822d66 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1809,6 +1809,7 @@ working with timestamps in `pandas_udf`s to get the best performance, see
  - Since Spark 2.4, expression IDs in UDF arguments do not appear in column names. For example, an column name in Spark 2.4 is not `UDF:f(col0 AS colA#28)` but ``UDF:f(col0 AS `colA`)``.
  - Since Spark 2.4, writing a dataframe with an empty or nested empty schema using any file formats (parquet, orc, json, text, csv etc.) is not allowed. An exception is thrown when attempting to write dataframes with empty schema.
  - Since Spark 2.4, Spark compares a DATE type with a TIMESTAMP type after promotes both sides to TIMESTAMP. To set `false` to `spark.sql.hive.compareDateTimestampInTimestamp` restores the previous behavior. This option will be removed in Spark 3.0.
+ - Since Spark 2.4, creating a managed table with nonempty location is not allowed. An exception is thrown when attempting to create a managed table with nonempty location. To set `true` to `spark.sql.allowCreatingManagedTableUsingNonemptyLocation` restores the previous behavior. This option will be removed in Spark 3.0.
 
 ## Upgrading From Spark SQL 2.2 to 2.3
 
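The migration note above names the escape hatch; a hedged sketch of restoring the pre-2.4 behavior for a single session (the flag is internal and slated for removal in Spark 3.0, and the table name is illustrative):

```scala
// Opt back into the legacy behavior for the current session only.
spark.conf.set("spark.sql.allowCreatingManagedTableUsingNonemptyLocation", "true")
spark.sql("CREATE TABLE tab1 USING parquet AS SELECT 1 AS col1")  // no longer throws
```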

http://git-wip-us.apache.org/repos/asf/spark/blob/249007e3/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 64e7ca1..52ed89e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -289,6 +289,7 @@ class SessionCatalog(
   def createTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit = {
     val db = formatDatabaseName(tableDefinition.identifier.database.getOrElse(getCurrentDatabase))
     val table = formatTableName(tableDefinition.identifier.table)
+    val tableIdentifier = TableIdentifier(table, Some(db))
     validateName(table)
 
     val newTableDefinition = if (tableDefinition.storage.locationUri.isDefined
@@ -298,15 +299,33 @@ class SessionCatalog(
         makeQualifiedPath(tableDefinition.storage.locationUri.get)
       tableDefinition.copy(
         storage = tableDefinition.storage.copy(locationUri = Some(qualifiedTableLocation)),
-        identifier = TableIdentifier(table, Some(db)))
+        identifier = tableIdentifier)
     } else {
-      tableDefinition.copy(identifier = TableIdentifier(table, Some(db)))
+      tableDefinition.copy(identifier = tableIdentifier)
     }
 
     requireDbExists(db)
+    if (!ignoreIfExists) {
+      validateTableLocation(newTableDefinition)
+    }
     externalCatalog.createTable(newTableDefinition, ignoreIfExists)
   }
 
+  def validateTableLocation(table: CatalogTable): Unit = {
+    // SPARK-19724: the default location of a managed table should be non-existent or empty.
+    if (table.tableType == CatalogTableType.MANAGED &&
+      !conf.allowCreatingManagedTableUsingNonemptyLocation) {
+      val tableLocation =
+        new Path(table.storage.locationUri.getOrElse(defaultTablePath(table.identifier)))
+      val fs = tableLocation.getFileSystem(hadoopConf)
+
+      if (fs.exists(tableLocation) && fs.listStatus(tableLocation).nonEmpty) {
+        throw new AnalysisException(s"Can not create the managed table('${table.identifier}')" +
+          s". The associated location('${tableLocation.toString}') already exists.")
+      }
+    }
+  }
+
   /**
    * Alter the metadata of an existing metastore table identified by `tableDefinition`.
    *
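The heart of `validateTableLocation` above is a plain Hadoop `FileSystem` probe. A self-contained sketch of the same check outside `SessionCatalog` (the helper name `locationIsUsable` and its parameters are illustrative, not part of this patch):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// A managed table may only be created at a location that is absent or empty.
def locationIsUsable(tablePath: Path, hadoopConf: Configuration): Boolean = {
  val fs = tablePath.getFileSystem(hadoopConf)
  // Short-circuit: listStatus on a nonexistent path would throw.
  !fs.exists(tablePath) || fs.listStatus(tablePath).isEmpty
}
```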

http://git-wip-us.apache.org/repos/asf/spark/blob/249007e3/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 13f31a6..1c8ab9c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1159,6 +1159,14 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)
 
+  val ALLOW_CREATING_MANAGED_TABLE_USING_NONEMPTY_LOCATION =
+    buildConf("spark.sql.allowCreatingManagedTableUsingNonemptyLocation")
+    .internal()
+    .doc("When this option is set to true, creating managed tables with nonempty location " +
+      "is allowed. Otherwise, an analysis exception is thrown. ")
+    .booleanConf
+    .createWithDefault(false)
+
   val CONTINUOUS_STREAMING_EXECUTOR_QUEUE_SIZE =
     buildConf("spark.sql.streaming.continuous.executorQueueSize")
     .internal()
@@ -1581,6 +1589,9 @@ class SQLConf extends Serializable with Logging {
 
   def eltOutputAsString: Boolean = getConf(ELT_OUTPUT_AS_STRING)
 
+  def allowCreatingManagedTableUsingNonemptyLocation: Boolean =
+    getConf(ALLOW_CREATING_MANAGED_TABLE_USING_NONEMPTY_LOCATION)
+
   def partitionOverwriteMode: PartitionOverwriteMode.Value =
     PartitionOverwriteMode.withName(getConf(PARTITION_OVERWRITE_MODE))
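The new entry can be exercised from tests through the usual `withSQLConf` helper in the SQL test utilities; a hedged sketch (the loop and body are illustrative):

```scala
// Run the same DDL under both values of the flag (sketch).
Seq("true", "false").foreach { allowed =>
  withSQLConf(
      SQLConf.ALLOW_CREATING_MANAGED_TABLE_USING_NONEMPTY_LOCATION.key -> allowed) {
    // CREATE TABLE statements issued here observe the overridden flag.
  }
}
```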
 

http://git-wip-us.apache.org/repos/asf/spark/blob/249007e3/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index e974776..f7c3e9b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -167,7 +167,7 @@ case class CreateDataSourceTableAsSelectCommand(
         sparkSession, table, table.storage.locationUri, child, SaveMode.Append, tableExists = true)
     } else {
       assert(table.schema.isEmpty)
-
+      sparkSession.sessionState.catalog.validateTableLocation(table)
       val tableLocation = if (table.tableType == CatalogTableType.MANAGED) {
         Some(sessionState.catalog.defaultTablePath(table.identifier))
       } else {
@@ -181,7 +181,8 @@ case class CreateDataSourceTableAsSelectCommand(
         // the schema of df). It is important since the nullability may be changed by the relation
         // provider (for example, see org.apache.spark.sql.parquet.DefaultSource).
         schema = result.schema)
-      sessionState.catalog.createTable(newTable, ignoreIfExists = false)
+      // Table location is already validated. No need to check it again during table creation.
+      sessionState.catalog.createTable(newTable, ignoreIfExists = true)
 
       result match {
         case fs: HadoopFsRelation if table.partitionColumnNames.nonEmpty &&
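The switch to `ignoreIfExists = true` follows from the ordering inside this command: the location is validated before the query result is written, so by the time the table is registered the location is necessarily nonempty. A condensed sketch of that flow (the helper `ctasSketch` and its parameters are illustrative, not the command's actual structure):

```scala
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, SessionCatalog}

// Condensed CTAS ordering (sketch): validate, write, then register.
def ctasSketch(catalog: SessionCatalog, table: CatalogTable, writeResult: () => Unit): Unit = {
  catalog.validateTableLocation(table) // fail fast while the location is still empty
  writeResult()                        // after this, the default location is nonempty
  // Re-running the validation inside createTable would now always throw,
  // so the check is skipped by passing ignoreIfExists = true.
  catalog.createTable(table, ignoreIfExists = true)
}
```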

http://git-wip-us.apache.org/repos/asf/spark/blob/249007e3/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
index ed4ea02..14a5658 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql
 
+import java.io.File
+
 import scala.collection.mutable
 
 import org.apache.spark.sql.catalyst.TableIdentifier
@@ -26,6 +28,7 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.test.SQLTestData.ArrayData
 import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
 
 
 /**
@@ -242,6 +245,7 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared
 
   test("change stats after set location command") {
     val table = "change_stats_set_location_table"
+    val tableLoc = new File(spark.sessionState.catalog.defaultTablePath(TableIdentifier(table)))
     Seq(false, true).foreach { autoUpdate =>
       withSQLConf(SQLConf.AUTO_SIZE_UPDATE_ENABLED.key -> autoUpdate.toString) {
         withTable(table) {
@@ -269,6 +273,9 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared
               assert(fetched3.get.sizeInBytes == fetched1.get.sizeInBytes)
             } else {
               checkTableStats(table, hasSizeInBytes = false, expectedRowCounts = None)
+              // SPARK-19724: clean up the previous table location.
+              waitForTasksToFinish()
+              Utils.deleteRecursively(tableLoc)
             }
           }
         }

http://git-wip-us.apache.org/repos/asf/spark/blob/249007e3/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 4df8fbf..4304d0b6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -180,6 +180,13 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
 
   private val escapedIdentifier = "`(.+)`".r
 
+  private def dataSource: String = {
+    if (isUsingHiveMetastore) {
+      "HIVE"
+    } else {
+      "PARQUET"
+    }
+  }
   protected def normalizeCatalogTable(table: CatalogTable): CatalogTable = table
 
   private def normalizeSerdeProp(props: Map[String, String]): Map[String, String] = {
@@ -365,6 +372,66 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
     }
   }
 
+  test("CTAS a managed table with the existing empty directory") {
+    val tableLoc = new File(spark.sessionState.catalog.defaultTablePath(TableIdentifier("tab1")))
+    try {
+      tableLoc.mkdir()
+      withTable("tab1") {
+        sql(s"CREATE TABLE tab1 USING ${dataSource} AS SELECT 1, 'a'")
+        checkAnswer(spark.table("tab1"), Row(1, "a"))
+      }
+    } finally {
+      waitForTasksToFinish()
+      Utils.deleteRecursively(tableLoc)
+    }
+  }
+
+  test("create a managed table with the existing empty directory") {
+    val tableLoc = new File(spark.sessionState.catalog.defaultTablePath(TableIdentifier("tab1")))
+    try {
+      tableLoc.mkdir()
+      withTable("tab1") {
+        sql(s"CREATE TABLE tab1 (col1 int, col2 string) USING ${dataSource}")
+        sql("INSERT INTO tab1 VALUES (1, 'a')")
+        checkAnswer(spark.table("tab1"), Row(1, "a"))
+      }
+    } finally {
+      waitForTasksToFinish()
+      Utils.deleteRecursively(tableLoc)
+    }
+  }
+
+  test("create a managed table with the existing non-empty directory") {
+    withTable("tab1") {
+      val tableLoc = new File(spark.sessionState.catalog.defaultTablePath(TableIdentifier("tab1")))
+      try {
+        // create an empty hidden file
+        tableLoc.mkdir()
+        val hiddenGarbageFile = new File(tableLoc.getCanonicalPath, ".garbage")
+        hiddenGarbageFile.createNewFile()
+        val exMsg = "Can not create the managed table('`tab1`'). The associated location"
+        val exMsgWithDefaultDB =
+          "Can not create the managed table('`default`.`tab1`'). The associated location"
+        var ex = intercept[AnalysisException] {
+          sql(s"CREATE TABLE tab1 USING ${dataSource} AS SELECT 1, 'a'")
+        }.getMessage
+        if (isUsingHiveMetastore) {
+          assert(ex.contains(exMsgWithDefaultDB))
+        } else {
+          assert(ex.contains(exMsg))
+        }
+
+        ex = intercept[AnalysisException] {
+          sql(s"CREATE TABLE tab1 (col1 int, col2 string) USING ${dataSource}")
+        }.getMessage
+        assert(ex.contains(exMsgWithDefaultDB))
+      } finally {
+        waitForTasksToFinish()
+        Utils.deleteRecursively(tableLoc)
+      }
+    }
+  }
+
   private def checkSchemaInCreatedDataSourceTable(
       path: File,
       userSpecifiedSchema: Option[String],

