Repository: spark
Updated Branches:
  refs/heads/master 6ab973ec5 -> 3fca635b4
[SPARK-15583][SQL] Disallow altering datasource properties

## What changes were proposed in this pull request?

Certain table properties (and SerDe properties) are in the protected namespace
`spark.sql.sources.`, which we use internally for datasource tables. The user
should not be allowed to (1) create a Hive table that sets these properties, or
(2) alter these properties in an existing table.

Previously, we threw an exception whenever the user tried to alter the
properties of an existing datasource table. However, that was overly
restrictive for datasource tables and offered no protection at all for Hive
tables.

## How was this patch tested?

DDLSuite

Author: Andrew Or <and...@databricks.com>

Closes #13341 from andrewor14/alter-table-props.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3fca635b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3fca635b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3fca635b

Branch: refs/heads/master
Commit: 3fca635b4ed322208debcd89a539e42cdde6bbd4
Parents: 6ab973e
Author: Andrew Or <and...@databricks.com>
Authored: Thu May 26 20:11:09 2016 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Thu May 26 20:11:09 2016 -0700

----------------------------------------------------------------------
 .../command/createDataSourceTables.scala        |  17 +++
 .../spark/sql/execution/command/ddl.scala       |  37 +++--
 .../spark/sql/execution/command/tables.scala    |   2 +
 .../spark/sql/execution/command/DDLSuite.scala  | 148 ++++++++++++-------
 .../spark/sql/hive/execution/HiveDDLSuite.scala |   2 +-
 5 files changed, 139 insertions(+), 67 deletions(-)
----------------------------------------------------------------------
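To make the new behavior concrete, here is a rough sketch of what a user now
sees (the table names and the `spark` session are illustrative, not part of
the patch; the error text is taken from `DDLUtils.verifyTableProperties` in
the diff below):

    // Ordinary keys may now be set even on a datasource table.
    spark.sql("ALTER TABLE some_datasource_tab SET TBLPROPERTIES ('owner' = 'andrew')")

    // Keys under the protected namespace are rejected for any table. Throws:
    //   org.apache.spark.sql.AnalysisException: Operation not allowed: ALTER TABLE
    //   property keys may not start with 'spark.sql.sources.': [spark.sql.sources.provider]
    spark.sql(
      "ALTER TABLE some_datasource_tab SET TBLPROPERTIES ('spark.sql.sources.provider' = 'csv')")

    // The same guard now also fires when creating a Hive table.
    spark.sql(
      "CREATE TABLE some_hive_tab (id INT) TBLPROPERTIES ('spark.sql.sources.schema.numParts' = '1')")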
http://git-wip-us.apache.org/repos/asf/spark/blob/3fca635b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 6ca66a2..deedb68 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -255,6 +255,23 @@ case class CreateDataSourceTableAsSelectCommand(
 
 object CreateDataSourceTableUtils extends Logging {
+
+  // TODO: Actually replace usages with these variables (SPARK-15584)
+
+  val DATASOURCE_PREFIX = "spark.sql.sources."
+  val DATASOURCE_PROVIDER = DATASOURCE_PREFIX + "provider"
+  val DATASOURCE_WRITEJOBUUID = DATASOURCE_PREFIX + "writeJobUUID"
+  val DATASOURCE_OUTPUTPATH = DATASOURCE_PREFIX + "output.path"
+  val DATASOURCE_SCHEMA_PREFIX = DATASOURCE_PREFIX + "schema."
+  val DATASOURCE_SCHEMA_NUMPARTS = DATASOURCE_SCHEMA_PREFIX + "numParts"
+  val DATASOURCE_SCHEMA_NUMPARTCOLS = DATASOURCE_SCHEMA_PREFIX + "numPartCols"
+  val DATASOURCE_SCHEMA_NUMBUCKETS = DATASOURCE_SCHEMA_PREFIX + "numBuckets"
+  val DATASOURCE_SCHEMA_NUMBUCKETCOLS = DATASOURCE_SCHEMA_PREFIX + "numBucketCols"
+  val DATASOURCE_SCHEMA_PART_PREFIX = DATASOURCE_SCHEMA_PREFIX + "part."
+  val DATASOURCE_SCHEMA_PARTCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "partCol."
+  val DATASOURCE_SCHEMA_BUCKETCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "bucketCol."
+  val DATASOURCE_SCHEMA_SORTCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "sortCol."
+
   /**
    * Checks if the given name conforms the Hive standard ("[a-zA-z_0-9]+"),
    * i.e. if this name only contains characters, numbers, and _.
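For background on why these keys exist: Spark persists a datasource table's
schema in the metastore as string-valued table properties, splitting the
schema JSON across numbered `part.` keys because metastores cap the length of
a single property value. A minimal sketch of reading such a schema back,
assuming that storage scheme (the real read path lives in Spark's Hive
catalog code, not in this patch):

    import org.apache.spark.sql.types.{DataType, StructType}

    // Reassemble a schema stored as 'numParts' plus 'part.0', 'part.1', ...
    def schemaFromProps(props: Map[String, String]): Option[StructType] =
      props.get("spark.sql.sources.schema.numParts").map { n =>
        val json = (0 until n.toInt)
          .map(i => props(s"spark.sql.sources.schema.part.$i"))
          .mkString
        DataType.fromJson(json).asInstanceOf[StructType]
      }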
tables") + throw new AnalysisException("Operation not allowed: ALTER TABLE SET SERDE is " + + "not supported for tables created with the datasource API") } val newTable = table.withNewStorage( serde = serdeClassName.orElse(table.storage.serde), @@ -489,6 +490,18 @@ object DDLUtils { }) } + /** + * If the given table properties (or SerDe properties) contains datasource properties, + * throw an exception. + */ + def verifyTableProperties(propKeys: Seq[String], operation: String): Unit = { + val datasourceKeys = propKeys.filter(_.startsWith(DATASOURCE_PREFIX)) + if (datasourceKeys.nonEmpty) { + throw new AnalysisException(s"Operation not allowed: $operation property keys may not " + + s"start with '$DATASOURCE_PREFIX': ${datasourceKeys.mkString("[", ", ", "]")}") + } + } + def isTablePartitioned(table: CatalogTable): Boolean = { table.partitionColumns.nonEmpty || table.properties.contains("spark.sql.sources.schema.numPartCols") http://git-wip-us.apache.org/repos/asf/spark/blob/3fca635b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index e34beec..d102409 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -118,6 +118,8 @@ case class CreateTableLikeCommand( case class CreateTableCommand(table: CatalogTable, ifNotExists: Boolean) extends RunnableCommand { override def run(sparkSession: SparkSession): Seq[Row] = { + DDLUtils.verifyTableProperties(table.properties.keys.toSeq, "CREATE TABLE") + DDLUtils.verifyTableProperties(table.storage.serdeProperties.keys.toSeq, "CREATE TABLE") sparkSession.sessionState.catalog.createTable(table, ifNotExists) Seq.empty[Row] } http://git-wip-us.apache.org/repos/asf/spark/blob/3fca635b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index 6c038c7..ff56749 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogStorageFor import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, CatalogTableType} import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, SessionCatalog} import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec +import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils.DATASOURCE_PREFIX import org.apache.spark.sql.execution.datasources.BucketSpec import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext @@ -489,63 +490,19 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { } test("alter table: set properties") { - val catalog = spark.sessionState.catalog - val tableIdent = TableIdentifier("tab1", Some("dbx")) - createDatabase(catalog, "dbx") - createTable(catalog, tableIdent) - assert(catalog.getTableMetadata(tableIdent).properties.isEmpty) - // set table properties - 
sql("ALTER TABLE dbx.tab1 SET TBLPROPERTIES ('andrew' = 'or14', 'kor' = 'bel')") - assert(catalog.getTableMetadata(tableIdent).properties == - Map("andrew" -> "or14", "kor" -> "bel")) - // set table properties without explicitly specifying database - catalog.setCurrentDatabase("dbx") - sql("ALTER TABLE tab1 SET TBLPROPERTIES ('kor' = 'belle', 'kar' = 'bol')") - assert(catalog.getTableMetadata(tableIdent).properties == - Map("andrew" -> "or14", "kor" -> "belle", "kar" -> "bol")) - // table to alter does not exist - intercept[AnalysisException] { - sql("ALTER TABLE does_not_exist SET TBLPROPERTIES ('winner' = 'loser')") - } - // throw exception for datasource tables - convertToDatasourceTable(catalog, tableIdent) - val e = intercept[AnalysisException] { - sql("ALTER TABLE tab1 SET TBLPROPERTIES ('sora' = 'bol')") - } - assert(e.getMessage.contains("datasource")) + testSetProperties(isDatasourceTable = false) + } + + test("alter table: set properties (datasource table)") { + testSetProperties(isDatasourceTable = true) } test("alter table: unset properties") { - val catalog = spark.sessionState.catalog - val tableIdent = TableIdentifier("tab1", Some("dbx")) - createDatabase(catalog, "dbx") - createTable(catalog, tableIdent) - // unset table properties - sql("ALTER TABLE dbx.tab1 SET TBLPROPERTIES ('j' = 'am', 'p' = 'an', 'c' = 'lan')") - sql("ALTER TABLE dbx.tab1 UNSET TBLPROPERTIES ('j')") - assert(catalog.getTableMetadata(tableIdent).properties == Map("p" -> "an", "c" -> "lan")) - // unset table properties without explicitly specifying database - catalog.setCurrentDatabase("dbx") - sql("ALTER TABLE tab1 UNSET TBLPROPERTIES ('p')") - assert(catalog.getTableMetadata(tableIdent).properties == Map("c" -> "lan")) - // table to alter does not exist - intercept[AnalysisException] { - sql("ALTER TABLE does_not_exist UNSET TBLPROPERTIES ('c' = 'lan')") - } - // property to unset does not exist - val e = intercept[AnalysisException] { - sql("ALTER TABLE tab1 UNSET TBLPROPERTIES ('c', 'xyz')") - } - assert(e.getMessage.contains("xyz")) - // property to unset does not exist, but "IF EXISTS" is specified - sql("ALTER TABLE tab1 UNSET TBLPROPERTIES IF EXISTS ('c', 'xyz')") - assert(catalog.getTableMetadata(tableIdent).properties.isEmpty) - // throw exception for datasource tables - convertToDatasourceTable(catalog, tableIdent) - val e1 = intercept[AnalysisException] { - sql("ALTER TABLE tab1 UNSET TBLPROPERTIES ('sora')") - } - assert(e1.getMessage.contains("datasource")) + testUnsetProperties(isDatasourceTable = false) + } + + test("alter table: unset properties (datasource table)") { + testUnsetProperties(isDatasourceTable = true) } test("alter table: set serde") { @@ -768,6 +725,78 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { properties = Map("spark.sql.sources.provider" -> "csv"))) } + private def testSetProperties(isDatasourceTable: Boolean): Unit = { + val catalog = spark.sessionState.catalog + val tableIdent = TableIdentifier("tab1", Some("dbx")) + createDatabase(catalog, "dbx") + createTable(catalog, tableIdent) + if (isDatasourceTable) { + convertToDatasourceTable(catalog, tableIdent) + } + def getProps: Map[String, String] = { + catalog.getTableMetadata(tableIdent).properties.filterKeys { k => + !isDatasourceTable || !k.startsWith(DATASOURCE_PREFIX) + } + } + assert(getProps.isEmpty) + // set table properties + sql("ALTER TABLE dbx.tab1 SET TBLPROPERTIES ('andrew' = 'or14', 'kor' = 'bel')") + assert(getProps == Map("andrew" -> "or14", "kor" -> "bel")) + // 
+    // set table properties without explicitly specifying database
+    catalog.setCurrentDatabase("dbx")
+    sql("ALTER TABLE tab1 SET TBLPROPERTIES ('kor' = 'belle', 'kar' = 'bol')")
+    assert(getProps == Map("andrew" -> "or14", "kor" -> "belle", "kar" -> "bol"))
+    // table to alter does not exist
+    intercept[AnalysisException] {
+      sql("ALTER TABLE does_not_exist SET TBLPROPERTIES ('winner' = 'loser')")
+    }
+    // datasource table property keys are not allowed
+    val e = intercept[AnalysisException] {
+      sql(s"ALTER TABLE tab1 SET TBLPROPERTIES ('${DATASOURCE_PREFIX}foo' = 'loser')")
+    }
+    assert(e.getMessage.contains(DATASOURCE_PREFIX + "foo"))
+  }
+
+  private def testUnsetProperties(isDatasourceTable: Boolean): Unit = {
+    val catalog = spark.sessionState.catalog
+    val tableIdent = TableIdentifier("tab1", Some("dbx"))
+    createDatabase(catalog, "dbx")
+    createTable(catalog, tableIdent)
+    if (isDatasourceTable) {
+      convertToDatasourceTable(catalog, tableIdent)
+    }
+    def getProps: Map[String, String] = {
+      catalog.getTableMetadata(tableIdent).properties.filterKeys { k =>
+        !isDatasourceTable || !k.startsWith(DATASOURCE_PREFIX)
+      }
+    }
+    // unset table properties
+    sql("ALTER TABLE dbx.tab1 SET TBLPROPERTIES ('j' = 'am', 'p' = 'an', 'c' = 'lan', 'x' = 'y')")
+    sql("ALTER TABLE dbx.tab1 UNSET TBLPROPERTIES ('j')")
+    assert(getProps == Map("p" -> "an", "c" -> "lan", "x" -> "y"))
+    // unset table properties without explicitly specifying database
+    catalog.setCurrentDatabase("dbx")
+    sql("ALTER TABLE tab1 UNSET TBLPROPERTIES ('p')")
+    assert(getProps == Map("c" -> "lan", "x" -> "y"))
+    // table to alter does not exist
+    intercept[AnalysisException] {
+      sql("ALTER TABLE does_not_exist UNSET TBLPROPERTIES ('c' = 'lan')")
+    }
+    // property to unset does not exist
+    val e = intercept[AnalysisException] {
+      sql("ALTER TABLE tab1 UNSET TBLPROPERTIES ('c', 'xyz')")
+    }
+    assert(e.getMessage.contains("xyz"))
+    // property to unset does not exist, but "IF EXISTS" is specified
+    sql("ALTER TABLE tab1 UNSET TBLPROPERTIES IF EXISTS ('c', 'xyz')")
+    assert(getProps == Map("x" -> "y"))
+    // datasource table property keys are not allowed
+    val e2 = intercept[AnalysisException] {
+      sql(s"ALTER TABLE tab1 UNSET TBLPROPERTIES ('${DATASOURCE_PREFIX}foo')")
+    }
+    assert(e2.getMessage.contains(DATASOURCE_PREFIX + "foo"))
+  }
+
   private def testSetLocation(isDatasourceTable: Boolean): Unit = {
     val catalog = spark.sessionState.catalog
     val tableIdent = TableIdentifier("tab1", Some("dbx"))
@@ -870,6 +899,11 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     intercept[AnalysisException] {
       sql("ALTER TABLE does_not_exist SET SERDEPROPERTIES ('x' = 'y')")
     }
+    // serde properties must not be a datasource property
+    val e = intercept[AnalysisException] {
+      sql(s"ALTER TABLE tab1 SET SERDEPROPERTIES ('${DATASOURCE_PREFIX}foo'='wah')")
+    }
+    assert(e.getMessage.contains(DATASOURCE_PREFIX + "foo"))
   }
 
   private def testAddPartitions(isDatasourceTable: Boolean): Unit = {
@@ -1091,6 +1125,12 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     }
   }
 
+  test("create table with datasource properties (not allowed)") {
+    assertUnsupported("CREATE TABLE my_tab TBLPROPERTIES ('spark.sql.sources.me'='anything')")
+    assertUnsupported("CREATE TABLE my_tab ROW FORMAT SERDE 'serde' " +
+      "WITH SERDEPROPERTIES ('spark.sql.sources.me'='anything')")
+  }
+
   test("drop default database") {
     Seq("true", "false").foreach { caseSensitive =>
       withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive) {
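The `assertUnsupported` helper is not shown in this diff; assuming it simply
intercepts the failure and checks the message, each guard test above reduces
to roughly this sketch:

    val e = intercept[AnalysisException] {
      sql("CREATE TABLE my_tab TBLPROPERTIES ('spark.sql.sources.me' = 'anything')")
    }
    assert(e.getMessage.toLowerCase.contains("operation not allowed"))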
http://git-wip-us.apache.org/repos/asf/spark/blob/3fca635b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index e2cef38..80e6f4e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -287,7 +287,7 @@ class HiveDDLSuite
         sql(s"ALTER VIEW $viewName UNSET TBLPROPERTIES ('p')")
       }.getMessage
       assert(message.contains(
-        "attempted to unset non-existent property 'p' in table '`view1`'"))
+        "Attempted to unset non-existent property 'p' in table '`view1`'"))
     }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org