[GitHub] spark pull request #14155: [SPARK-16498][SQL] move hive hack for data source...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r75633327

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala ---

```
@@ -200,22 +375,77 @@ private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configu
    * Alter a table whose name that matches the one specified in `tableDefinition`,
    * assuming the table exists.
    *
-   * Note: As of now, this only supports altering table properties, serde properties,
-   * and num buckets!
+   * Note: As of now, this doesn't support altering table schema, partition column names and bucket
+   * specification. We will ignore them even if users do specify different values for these fields.
    */
   override def alterTable(tableDefinition: CatalogTable): Unit = withClient {
     assert(tableDefinition.identifier.database.isDefined)
     val db = tableDefinition.identifier.database.get
     requireTableExists(db, tableDefinition.identifier.table)
-    client.alterTable(tableDefinition)
+    verifyTableProperties(tableDefinition)
+
+    if (tableDefinition.provider == Some("hive") || tableDefinition.tableType == VIEW) {
+      client.alterTable(tableDefinition)
+    } else {
+      val oldDef = client.getTable(db, tableDefinition.identifier.table)
+      // Sets the `schema`, `partitionColumnNames` and `bucketSpec` from the old table definition,
+      // to retain the spark specific format if it is.
+      // Also add table meta properties to table properties, to retain the data source table format.
+      val newDef = tableDefinition.copy(
+        schema = oldDef.schema,
+        partitionColumnNames = oldDef.partitionColumnNames,
+        bucketSpec = oldDef.bucketSpec,
+        properties = tableMetadataToProperties(tableDefinition) ++ tableDefinition.properties)
+
+      client.alterTable(newDef)
+    }
   }

   override def getTable(db: String, table: String): CatalogTable = withClient {
-    client.getTable(db, table)
+    restoreTableMetadata(client.getTable(db, table))
   }

   override def getTableOption(db: String, table: String): Option[CatalogTable] = withClient {
-    client.getTableOption(db, table)
+    client.getTableOption(db, table).map(restoreTableMetadata)
+  }
+
+  /**
+   * Restores table metadata from the table properties if it's a datasouce table. This method is
+   * kind of a opposite version of [[createTable]].
+   *
+   * It reads table schema, provider, partition column names and bucket specification from table
+   * properties, and filter out these special entries from table properties.
+   */
+  private def restoreTableMetadata(table: CatalogTable): CatalogTable = {
+    if (table.tableType == VIEW) {
+      table
+    } else {
+      getProviderFromTableProperties(table).map { provider =>
+        // SPARK-15269: Persisted data source tables always store the location URI as a storage
+        // property named "path" instead of standard Hive `dataLocation`, because Hive only
+        // allows directory paths as location URIs while Spark SQL data source tables also
+        // allows file paths. So the standard Hive `dataLocation` is meaningless for Spark SQL
+        // data source tables.
+        // Spark SQL may also save external data source in Hive compatible format when
+        // possible, so that these tables can be directly accessed by Hive. For these tables,
+        // `dataLocation` is still necessary. Here we also check for input format because only
+        // these Hive compatible tables set this field.
+        val storage = if (table.tableType == EXTERNAL && table.storage.inputFormat.isEmpty) {
+          table.storage.copy(locationUri = None)
+        } else {
+          table.storage
+        }
+        table.copy(
+          storage = storage,
+          schema = getSchemaFromTableProperties(table),
+          provider = Some(provider),
+          partitionColumnNames = getPartitionColumnsFromTableProperties(table),
+          bucketSpec = getBucketSpecFromTableProperties(table),
+          properties = getOriginalTableProperties(table))
```

--- End diff --

We were talking about storing data source table properties in storage properties; that discussion is finished and we won't do this, see https://github.com/apache/spark/pull/14727
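As a sanity check on what `restoreTableMetadata` gives back to callers, here is a minimal sketch (the catalog instance and table name are placeholders, not from the PR): after `getTable`, the `spark.sql.sources.*` bookkeeping entries written by `createTable` should have been turned back into typed fields and filtered out of `properties`.

```scala
import org.apache.spark.sql.catalyst.catalog.ExternalCatalog

// Hypothetical check, not part of the PR; `catalog` and the table name are assumed.
def assertRestored(catalog: ExternalCatalog): Unit = {
  val restored = catalog.getTable("default", "my_table")
  assert(restored.provider.isDefined)   // read back from the provider table property
  assert(restored.schema.nonEmpty)      // reassembled from the schema part properties
  assert(!restored.properties.keys.exists(_.startsWith("spark.sql.sources.")))
}
```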
[GitHub] spark pull request #14155: [SPARK-16498][SQL] move hive hack for data source...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/14155
[GitHub] spark pull request #14155: [SPARK-16498][SQL] move hive hack for data source...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r75617761

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala --- (the same `alterTable` / `getTable` / `restoreTableMetadata` hunk quoted in full at the top of this thread; the comment is on these lines)

```
        table.copy(
          storage = storage,
          schema = getSchemaFromTableProperties(table),
          provider = Some(provider),
          partitionColumnNames = getPartitionColumnsFromTableProperties(table),
          bucketSpec = getBucketSpecFromTableProperties(table),
          properties = getOriginalTableProperties(table))
```

--- End diff --

Just want to double check: we are not talking about no longer using serde properties to store options, right?
[GitHub] spark pull request #14155: [SPARK-16498][SQL] move hive hack for data source...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r75572741

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala ---

```
@@ -175,7 +127,8 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
     } else {
       val qualifiedTable =
         MetastoreRelation(
-          qualifiedTableName.database, qualifiedTableName.name)(table, client, sparkSession)
+          qualifiedTableName.database, qualifiedTableName.name)(
+          table.copy(provider = Some("hive")), client, sparkSession)
```

--- End diff --

Then we would restore table metadata from table properties twice. As this class will be removed soon, I don't want to change it too much.
[GitHub] spark pull request #14155: [SPARK-16498][SQL] move hive hack for data source...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r75572713

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala --- (the same `alterTable` / `getTable` / `restoreTableMetadata` hunk quoted in full at the top of this thread; the comment is on these lines)

```
        table.copy(
          storage = storage,
          schema = getSchemaFromTableProperties(table),
          provider = Some(provider),
          partitionColumnNames = getPartitionColumnsFromTableProperties(table),
          bucketSpec = getBucketSpecFromTableProperties(table),
          properties = getOriginalTableProperties(table))
```

--- End diff --

The previous code also stores options in serde properties. I'm not going to fix everything in this PR, and I'm not sure whether it's a real problem, but let's continue the discussion in a follow-up.
[GitHub] spark pull request #14155: [SPARK-16498][SQL] move hive hack for data source...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r75565740

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala ---

```
@@ -144,16 +164,162 @@ private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configu
     assert(tableDefinition.identifier.database.isDefined)
     val db = tableDefinition.identifier.database.get
     requireDbExists(db)
+    verifyTableProperties(tableDefinition)
+
+    // Before saving data source table metadata into Hive metastore, we should:
+    // 1. Put table schema, partition column names and bucket specification in table properties.
+    // 2. Check if this table is hive compatible
+    //    2.1 If it's not hive compatible, set schema, partition columns and bucket spec to empty
+    //        and save table metadata to Hive.
+    //    2.1 If it's hive compatible, set serde information in table metadata and try to save
+    //        it to Hive. If it fails, treat it as not hive compatible and go back to 2.1
+    if (DDLUtils.isDatasourceTable(tableDefinition)) {
+      // data source table always have a provider, it's guaranteed by `DDLUtils.isDatasourceTable`.
+      val provider = tableDefinition.provider.get
+      val partitionColumns = tableDefinition.partitionColumnNames
+      val bucketSpec = tableDefinition.bucketSpec
+
+      val tableProperties = new scala.collection.mutable.HashMap[String, String]
+      tableProperties.put(DATASOURCE_PROVIDER, provider)
+
+      // Serialized JSON schema string may be too long to be stored into a single metastore table
+      // property. In this case, we split the JSON string and store each part as a separate table
+      // property.
+      // TODO: the threshold should be set by `spark.sql.sources.schemaStringLengthThreshold`,
+      // however the current SQLConf is session isolated, which is not applicable to external
+      // catalog. We should re-enable this conf instead of hard code the value here, after we have
+      // global SQLConf.
+      val threshold = 4000
+      val schemaJsonString = tableDefinition.schema.json
+      // Split the JSON string.
+      val parts = schemaJsonString.grouped(threshold).toSeq
+      tableProperties.put(DATASOURCE_SCHEMA_NUMPARTS, parts.size.toString)
+      parts.zipWithIndex.foreach { case (part, index) =>
+        tableProperties.put(s"$DATASOURCE_SCHEMA_PART_PREFIX$index", part)
+      }
+
+      if (partitionColumns.nonEmpty) {
+        tableProperties.put(DATASOURCE_SCHEMA_NUMPARTCOLS, partitionColumns.length.toString)
+        partitionColumns.zipWithIndex.foreach { case (partCol, index) =>
+          tableProperties.put(s"$DATASOURCE_SCHEMA_PARTCOL_PREFIX$index", partCol)
+        }
+      }
+
+      if (bucketSpec.isDefined) {
+        val BucketSpec(numBuckets, bucketColumnNames, sortColumnNames) = bucketSpec.get
+
+        tableProperties.put(DATASOURCE_SCHEMA_NUMBUCKETS, numBuckets.toString)
+        tableProperties.put(DATASOURCE_SCHEMA_NUMBUCKETCOLS, bucketColumnNames.length.toString)
+        bucketColumnNames.zipWithIndex.foreach { case (bucketCol, index) =>
+          tableProperties.put(s"$DATASOURCE_SCHEMA_BUCKETCOL_PREFIX$index", bucketCol)
+        }
+
+        if (sortColumnNames.nonEmpty) {
+          tableProperties.put(DATASOURCE_SCHEMA_NUMSORTCOLS, sortColumnNames.length.toString)
+          sortColumnNames.zipWithIndex.foreach { case (sortCol, index) =>
+            tableProperties.put(s"$DATASOURCE_SCHEMA_SORTCOL_PREFIX$index", sortCol)
+          }
+        }
+      }
+
+      // converts the table metadata to Spark SQL specific format, i.e. set schema, partition column
+      // names and bucket specification to empty.
+      def newSparkSQLSpecificMetastoreTable(): CatalogTable = {
+        tableDefinition.copy(
+          schema = new StructType,
+          partitionColumnNames = Nil,
+          bucketSpec = None,
+          properties = tableDefinition.properties ++ tableProperties)
+      }
+
+      // converts the table metadata to Hive compatible format, i.e. set the serde information.
+      def newHiveCompatibleMetastoreTable(serde: HiveSerDe, path: String): CatalogTable = {
+        tableDefinition.copy(
+          storage = tableDefinition.storage.copy(
+            locationUri = Some(new Path(path).toUri.toString),
+            inputFormat = serde.inputFormat,
+            outputFormat = serde.outputFormat,
+            serde = serde.serde
+          ),
+          properties = tableDefinition.properties ++ tableProperties)
+      }
+
+      val
```
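For a concrete picture of the bookkeeping written above, here is an illustrative property map (not from the PR) for a hypothetical Parquet table partitioned by `dt` and bucketed by `id` into 4 buckets, assuming the schema JSON fits in a single part; the key names follow the `DATASOURCE_*` constants quoted later in this thread.

```scala
// Illustrative only; the schema JSON is elided.
val illustrativeTableProperties: Map[String, String] = Map(
  "spark.sql.sources.provider"             -> "parquet",
  "spark.sql.sources.schema.numParts"      -> "1",
  "spark.sql.sources.schema.part.0"        -> """{"type":"struct","fields":[ /* ... */ ]}""",
  "spark.sql.sources.schema.numPartCols"   -> "1",
  "spark.sql.sources.schema.partCol.0"     -> "dt",
  "spark.sql.sources.schema.numBuckets"    -> "4",
  "spark.sql.sources.schema.numBucketCols" -> "1",
  "spark.sql.sources.schema.bucketCol.0"   -> "id")
```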
[GitHub] spark pull request #14155: [SPARK-16498][SQL] move hive hack for data source...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r75556596

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala ---

```
@@ -175,7 +127,8 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
     } else {
       val qualifiedTable =
         MetastoreRelation(
-          qualifiedTableName.database, qualifiedTableName.name)(table, client, sparkSession)
+          qualifiedTableName.database, qualifiedTableName.name)(
+          table.copy(provider = Some("hive")), client, sparkSession)
```

--- End diff --

If we use the `ExternalCatalog` API to fetch table metadata, we do not need this change. That means we just need to update the following line: https://github.com/cloud-fan/spark/blob/6ca8909d355b14abcc0099a53928bba437d98442/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala#L113
[GitHub] spark pull request #14155: [SPARK-16498][SQL] move hive hack for data source...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r75528185

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala --- (the same `alterTable` / `getTable` / `restoreTableMetadata` hunk quoted in full at the top of this thread; the comment is on these lines)

```
        table.copy(
          storage = storage,
          schema = getSchemaFromTableProperties(table),
          provider = Some(provider),
          partitionColumnNames = getPartitionColumnsFromTableProperties(table),
          bucketSpec = getBucketSpecFromTableProperties(table),
          properties = getOriginalTableProperties(table))
```

--- End diff --

Yeah, but when users describe data source tables, they will see these options in the serde properties. The issue becomes more complicated when we support conversion from Hive serde tables to data source tables: the actual table properties will be lost in some cases.
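A hedged repro of that concern (table name and path are hypothetical; the exact layout of the DESCRIBE output is not reproduced here): the user-supplied OPTIONS become storage/serde properties of the persisted table, so commands that surface storage properties expose them.

```scala
// Hypothetical repro against a Hive-enabled SparkSession named `spark`.
spark.sql("CREATE TABLE t (i INT) USING parquet OPTIONS (path '/tmp/t')")
// The "path" option is expected to show up among the table's storage/serde properties.
spark.sql("DESCRIBE FORMATTED t").show(100, truncate = false)
```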
[GitHub] spark pull request #14155: [SPARK-16498][SQL] move hive hack for data source...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r75523912

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala --- (the same `alterTable` / `getTable` / `restoreTableMetadata` hunk quoted in full at the top of this thread; the comment is on these lines)

```
        table.copy(
          storage = storage,
          schema = getSchemaFromTableProperties(table),
          provider = Some(provider),
          partitionColumnNames = getPartitionColumnsFromTableProperties(table),
          bucketSpec = getBucketSpecFromTableProperties(table),
          properties = getOriginalTableProperties(table))
```

--- End diff --

To users, there are no table properties or serde properties; they only see options.
[GitHub] spark pull request #14155: [SPARK-16498][SQL] move hive hack for data source...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r75523842

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala --- (the same `alterTable` / `getTable` / `restoreTableMetadata` hunk quoted in full at the top of this thread; the comment is on these lines)

```
        table.copy(
          storage = storage,
          schema = getSchemaFromTableProperties(table),
          provider = Some(provider),
          partitionColumnNames = getPartitionColumnsFromTableProperties(table),
          bucketSpec = getBucketSpecFromTableProperties(table),
          properties = getOriginalTableProperties(table))
```

--- End diff --

We just use table properties to store the schema and other metadata that is not defined by users. All user options are stored in serde properties.
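A minimal sketch of that division, mirroring the `CatalogTable` construction quoted elsewhere in this thread (the concrete option values and schema are assumptions for illustration):

```scala
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// User-facing OPTIONS live in the storage (serde) properties of the table...
val userOptions = Map("path" -> "/tmp/t")
val table = CatalogTable(
  identifier = TableIdentifier("t", Some("default")),
  tableType = CatalogTableType.EXTERNAL,
  storage = CatalogStorageFormat.empty.copy(properties = userOptions),
  schema = StructType(Seq(StructField("i", IntegerType))),
  provider = Some("parquet"),
  partitionColumnNames = Nil,
  bucketSpec = None)
// ...while `properties` is reserved for the engine's bookkeeping (provider, schema parts, etc.)
// that HiveExternalCatalog adds before persisting, as shown in the createTable hunks in this thread.
```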
[GitHub] spark pull request #14155: [SPARK-16498][SQL] move hive hack for data source...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r75523227

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala --- (the same `alterTable` / `getTable` / `restoreTableMetadata` hunk quoted in full at the top of this thread; the comment is on these lines)

```
        table.copy(
          storage = storage,
          schema = getSchemaFromTableProperties(table),
          provider = Some(provider),
          partitionColumnNames = getPartitionColumnsFromTableProperties(table),
          bucketSpec = getBucketSpecFromTableProperties(table),
          properties = getOriginalTableProperties(table))
```

--- End diff --

We do not have actual table properties for data source tables; the `options` function like table properties here, right? BTW, I found a bug when we convert a Hive serde CTAS to a data source CTAS. Will submit a PR soon.
[GitHub] spark pull request #14155: [SPARK-16498][SQL] move hive hack for data source...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r75451232

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala ---

```
@@ -144,16 +161,147 @@ private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configu
     assert(tableDefinition.identifier.database.isDefined)
     val db = tableDefinition.identifier.database.get
     requireDbExists(db)
+    verifyTableProperties(tableDefinition)
+
+    if (tableDefinition.provider == Some("hive") || tableDefinition.tableType == VIEW) {
+      client.createTable(tableDefinition, ignoreIfExists)
+    } else {
+      val tableProperties = tableMetadataToProperties(tableDefinition)
+
+      def newSparkSQLSpecificMetastoreTable(): CatalogTable = {
+        tableDefinition.copy(
+          schema = new StructType,
+          partitionColumnNames = Nil,
+          bucketSpec = None,
+          properties = tableDefinition.properties ++ tableProperties)
+      }
+
+      def newHiveCompatibleMetastoreTable(serde: HiveSerDe, path: String): CatalogTable = {
+        tableDefinition.copy(
+          storage = tableDefinition.storage.copy(
+            locationUri = Some(new Path(path).toUri.toString),
+            inputFormat = serde.inputFormat,
+            outputFormat = serde.outputFormat,
+            serde = serde.serde
+          ),
+          properties = tableDefinition.properties ++ tableProperties)
+      }
+
+      val qualifiedTableName = tableDefinition.identifier.quotedString
+      val maybeSerde = HiveSerDe.sourceToSerDe(tableDefinition.provider.get)
+      val maybePath = new CaseInsensitiveMap(tableDefinition.storage.properties).get("path")
+      val skipHiveMetadata = tableDefinition.storage.properties
+        .getOrElse("skipHiveMetadata", "false").toBoolean
+
+      val (hiveCompatibleTable, logMessage) = (maybeSerde, maybePath) match {
+        case _ if skipHiveMetadata =>
+          val message =
+            s"Persisting data source table $qualifiedTableName into Hive metastore in" +
+              "Spark SQL specific format, which is NOT compatible with Hive."
+          (None, message)
+
+        // our bucketing is un-compatible with hive(different hash function)
+        case _ if tableDefinition.bucketSpec.nonEmpty =>
+          val message =
+            s"Persisting bucketed data source table $qualifiedTableName into " +
+              "Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. "
+          (None, message)
+
+        case (Some(serde), Some(path)) =>
+          val message =
+            s"Persisting data source table $qualifiedTableName with a single input path " +
+              s"into Hive metastore in Hive compatible format."
+          (Some(newHiveCompatibleMetastoreTable(serde, path)), message)
+
+        case (Some(_), None) =>
+          val message =
+            s"Data source table $qualifiedTableName is not file based. Persisting it into " +
+              s"Hive metastore in Spark SQL specific format, which is NOT compatible with Hive."
+          (None, message)
+
+        case _ =>
+          val provider = tableDefinition.provider.get
+          val message =
+            s"Couldn't find corresponding Hive SerDe for data source provider $provider. " +
+              s"Persisting data source table $qualifiedTableName into Hive metastore in " +
+              s"Spark SQL specific format, which is NOT compatible with Hive."
+          (None, message)
+      }
+
+      (hiveCompatibleTable, logMessage) match {
+        case (Some(table), message) =>
+          // We first try to save the metadata of the table in a Hive compatible way.
+          // If Hive throws an error, we fall back to save its metadata in the Spark SQL
+          // specific way.
+          try {
+            logInfo(message)
+            saveTableIntoHive(table, ignoreIfExists)
+          } catch {
+            case NonFatal(e) =>
+              val warningMessage =
+                s"Could not persist ${tableDefinition.identifier.quotedString} in a Hive " +
+                  "compatible way. Persisting it into Hive metastore in Spark SQL specific format."
+              logWarning(warningMessage, e)
+              saveTableIntoHive(newSparkSQLSpecificMetastoreTable(), ignoreIfExists)
+          }
+
+        case (None, message) =>
+          logWarning(message)
+          saveTableIntoHive(newSparkSQLSpecificMetastoreTable(), ignoreIfExists)
+      }
+    }
+  }
+
+  private def
```
[GitHub] spark pull request #14155: [SPARK-16498][SQL] move hive hack for data source...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r75451088

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala --- (quoting the beginning of the same `createTable` hunk shown above; the comment is on the `maybePath` line)

```
+      val qualifiedTableName = tableDefinition.identifier.quotedString
+      val maybeSerde = HiveSerDe.sourceToSerDe(tableDefinition.provider.get)
+      val maybePath = new CaseInsensitiveMap(tableDefinition.storage.properties).get("path")
```

--- End diff --

Even if it's not set, we will save the table in the Spark SQL specific format, so I think we are still safe.
[GitHub] spark pull request #14155: [SPARK-16498][SQL] move hive hack for data source...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r75450667

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---

```
@@ -584,13 +579,8 @@ case class AlterTableSetLocationCommand(
 object DDLUtils {
-
-  def isDatasourceTable(props: Map[String, String]): Boolean = {
-    props.contains(DATASOURCE_PROVIDER)
-  }
-
   def isDatasourceTable(table: CatalogTable): Boolean = {
-    isDatasourceTable(table.properties)
+    table.provider.isDefined && table.provider.get != "hive"
   }
```

--- End diff --

fixed.
[GitHub] spark pull request #14155: [SPARK-16498][SQL] move hive hack for data source...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r75446103

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala --- (the same `alterTable` / `getTable` / `restoreTableMetadata` hunk quoted in full at the top of this thread; the comment is on these lines)

```
        table.copy(
          storage = storage,
          schema = getSchemaFromTableProperties(table),
          provider = Some(provider),
          partitionColumnNames = getPartitionColumnsFromTableProperties(table),
          bucketSpec = getBucketSpecFromTableProperties(table),
          properties = getOriginalTableProperties(table))
```

--- End diff --

You can take a look at the SQL syntax for data source tables: we can't set table properties for data source tables currently. But the external catalog doesn't need to make this assumption.
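For reference, a hedged sketch of the two syntaxes being contrasted (table names are hypothetical; this reflects the syntax discussed in this thread, where the data source variant has no TBLPROPERTIES clause):

```scala
// Data source table: schema, provider and OPTIONS, but no TBLPROPERTIES at this point.
spark.sql("CREATE TABLE ds_table (i INT) USING parquet OPTIONS (path '/tmp/ds_table')")

// Hive serde table: TBLPROPERTIES is available (requires a Hive-enabled session).
spark.sql("CREATE TABLE hive_table (i INT) TBLPROPERTIES ('owner' = 'me')")
```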
[GitHub] spark pull request #14155: [SPARK-16498][SQL] move hive hack for data source...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r75445428

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---

```
@@ -584,13 +579,8 @@ case class AlterTableSetLocationCommand(
 object DDLUtils {
-
-  def isDatasourceTable(props: Map[String, String]): Boolean = {
-    props.contains(DATASOURCE_PROVIDER)
-  }
-
   def isDatasourceTable(table: CatalogTable): Boolean = {
-    isDatasourceTable(table.properties)
+    table.provider.isDefined && table.provider.get != "hive"
  }
```

--- End diff --

oh good idea
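As a side note (not part of the PR), the same predicate can be written without the `isDefined`/`get` pair:

```scala
import org.apache.spark.sql.catalyst.catalog.CatalogTable

// Equivalent to `table.provider.isDefined && table.provider.get != "hive"`.
def isDatasourceTable(table: CatalogTable): Boolean =
  table.provider.exists(_ != "hive")
```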
[GitHub] spark pull request #14155: [SPARK-16498][SQL] move hive hack for data source...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r75445214

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---

```
@@ -264,10 +261,8 @@ case class AlterTableUnsetPropertiesCommand(
   extends RunnableCommand {

   override def run(sparkSession: SparkSession): Seq[Row] = {
-    val ident = if (isView) "VIEW" else "TABLE"
     val catalog = sparkSession.sessionState.catalog
     DDLUtils.verifyAlterTableType(catalog, tableName, isView)
-    DDLUtils.verifyTableProperties(propKeys, s"ALTER $ident")
```

--- End diff --

then we will hit

```
if (!table.properties.contains(k)) {
  throw new AnalysisException(
    s"Attempted to unset non-existent property '$k' in table '$tableName'")
}
```

I think it's ok
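To illustrate the behavior being accepted here, a hedged example (table and property names are hypothetical):

```scala
// Unsetting a property that was never set is expected to fail analysis in this code path...
spark.sql("ALTER TABLE t UNSET TBLPROPERTIES ('no.such.key')")
// ...while IF EXISTS lets the caller opt out of the error.
spark.sql("ALTER TABLE t UNSET TBLPROPERTIES IF EXISTS ('no.such.key')")
```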
[GitHub] spark pull request #14155: [SPARK-16498][SQL] move hive hack for data source...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r75444916

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala ---

```
@@ -233,226 +229,21 @@ case class CreateDataSourceTableAsSelectCommand(
       // We will use the schema of resolved.relation as the schema of the table (instead of
       // the schema of df). It is important since the nullability may be changed by the relation
       // provider (for example, see org.apache.spark.sql.parquet.DefaultSource).
-      CreateDataSourceTableUtils.createDataSourceTable(
-        sparkSession = sparkSession,
-        tableIdent = tableIdent,
-        schema = result.schema,
-        partitionColumns = partitionColumns,
-        bucketSpec = bucketSpec,
-        provider = provider,
-        options = optionsWithPath,
-        isExternal = isExternal)
+      val schema = result.schema
+      val table = CatalogTable(
+        identifier = tableIdent,
+        tableType = if (isExternal) CatalogTableType.EXTERNAL else CatalogTableType.MANAGED,
+        storage = CatalogStorageFormat.empty.copy(properties = optionsWithPath),
+        schema = schema,
+        provider = Some(provider),
+        partitionColumnNames = partitionColumns,
+        bucketSpec = bucketSpec
+      )
+      sessionState.catalog.createTable(table, ignoreIfExists = false)
     }

     // Refresh the cache of the table in the catalog.
     sessionState.catalog.refreshTable(tableIdent)
     Seq.empty[Row]
   }
 }
-
-
-object CreateDataSourceTableUtils extends Logging {
-
-  val DATASOURCE_PREFIX = "spark.sql.sources."
-  val DATASOURCE_PROVIDER = DATASOURCE_PREFIX + "provider"
-  val DATASOURCE_WRITEJOBUUID = DATASOURCE_PREFIX + "writeJobUUID"
-  val DATASOURCE_OUTPUTPATH = DATASOURCE_PREFIX + "output.path"
-  val DATASOURCE_SCHEMA = DATASOURCE_PREFIX + "schema"
-  val DATASOURCE_SCHEMA_PREFIX = DATASOURCE_SCHEMA + "."
-  val DATASOURCE_SCHEMA_NUMPARTS = DATASOURCE_SCHEMA_PREFIX + "numParts"
-  val DATASOURCE_SCHEMA_NUMPARTCOLS = DATASOURCE_SCHEMA_PREFIX + "numPartCols"
-  val DATASOURCE_SCHEMA_NUMSORTCOLS = DATASOURCE_SCHEMA_PREFIX + "numSortCols"
-  val DATASOURCE_SCHEMA_NUMBUCKETS = DATASOURCE_SCHEMA_PREFIX + "numBuckets"
-  val DATASOURCE_SCHEMA_NUMBUCKETCOLS = DATASOURCE_SCHEMA_PREFIX + "numBucketCols"
-  val DATASOURCE_SCHEMA_PART_PREFIX = DATASOURCE_SCHEMA_PREFIX + "part."
-  val DATASOURCE_SCHEMA_PARTCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "partCol."
-  val DATASOURCE_SCHEMA_BUCKETCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "bucketCol."
-  val DATASOURCE_SCHEMA_SORTCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "sortCol."
-
-  def createDataSourceTable(
-      sparkSession: SparkSession,
-      tableIdent: TableIdentifier,
-      schema: StructType,
-      partitionColumns: Array[String],
-      bucketSpec: Option[BucketSpec],
-      provider: String,
-      options: Map[String, String],
-      isExternal: Boolean): Unit = {
-    val tableProperties = new mutable.HashMap[String, String]
-    tableProperties.put(DATASOURCE_PROVIDER, provider)
-
-    // Serialized JSON schema string may be too long to be stored into a single metastore table
-    // property. In this case, we split the JSON string and store each part as a separate table
-    // property.
-    val threshold = sparkSession.sessionState.conf.schemaStringLengthThreshold
-    val schemaJsonString = schema.json
-    // Split the JSON string.
-    val parts = schemaJsonString.grouped(threshold).toSeq
-    tableProperties.put(DATASOURCE_SCHEMA_NUMPARTS, parts.size.toString)
-    parts.zipWithIndex.foreach { case (part, index) =>
-      tableProperties.put(s"$DATASOURCE_SCHEMA_PART_PREFIX$index", part)
-    }
-
-    if (partitionColumns.length > 0) {
-      tableProperties.put(DATASOURCE_SCHEMA_NUMPARTCOLS, partitionColumns.length.toString)
-      partitionColumns.zipWithIndex.foreach { case (partCol, index) =>
-        tableProperties.put(s"$DATASOURCE_SCHEMA_PARTCOL_PREFIX$index", partCol)
-      }
-    }
-
-    if (bucketSpec.isDefined) {
-      val BucketSpec(numBuckets, bucketColumnNames, sortColumnNames) = bucketSpec.get
-
-      tableProperties.put(DATASOURCE_SCHEMA_NUMBUCKETS, numBuckets.toString)
-      tableProperties.put(DATASOURCE_SCHEMA_NUMBUCKETCOLS, bucketColumnNames.length.toString)
-      bucketColumnNames.zipWithIndex.foreach { case (bucketCol, index) =>
-        tableProperties.put(s"$DATASOURCE_SCHEMA_BUCKETCOL_PREFIX$index", bucketCol)
-      }
-
-      if (sortColumnNames.nonEmpty) {
-
```
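The inverse of the schema splitting above is not quoted in this thread, so here is a minimal, hedged sketch (the helper name and error handling are assumptions) of how the numbered parts could be reassembled and parsed:

```scala
import org.apache.spark.sql.types.{DataType, StructType}

// Glue the numbered schema parts back together, assuming the
// spark.sql.sources.schema.* key layout shown in the removed constants above.
def schemaFromParts(props: Map[String, String]): Option[StructType] = {
  props.get("spark.sql.sources.schema.numParts").map { numParts =>
    val json = (0 until numParts.toInt)
      .map(i => props(s"spark.sql.sources.schema.part.$i"))
      .mkString
    DataType.fromJson(json).asInstanceOf[StructType]
  }
}
```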
[GitHub] spark pull request #14155: [SPARK-16498][SQL] move hive hack for data source...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r75444695

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala ---

```
@@ -97,16 +92,17 @@ case class CreateDataSourceTableCommand(
       }
     }

-    CreateDataSourceTableUtils.createDataSourceTable(
-      sparkSession = sparkSession,
-      tableIdent = tableIdent,
+    val table = CatalogTable(
+      identifier = tableIdent,
+      tableType = if (isExternal) CatalogTableType.EXTERNAL else CatalogTableType.MANAGED,
+      storage = CatalogStorageFormat.empty.copy(properties = optionsWithPath),
       schema = dataSource.schema,
-      partitionColumns = partitionColumns,
-      bucketSpec = bucketSpec,
-      provider = provider,
-      options = optionsWithPath,
-      isExternal = isExternal)
+      provider = Some(provider),
+      partitionColumnNames = partitionColumns,
+      bucketSpec = bucketSpec
+    )
+    sessionState.catalog.createTable(table, ignoreIfExists)
```

--- End diff --

When we hit this branch, the table does not exist, so `ignoreIfExists` doesn't matter here. I'll change it to `false` and add some comments.
[GitHub] spark pull request #14155: [SPARK-16498][SQL] move hive hack for data source...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r75434165 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala --- @@ -200,22 +375,77 @@ private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configu * Alter a table whose name that matches the one specified in `tableDefinition`, * assuming the table exists. * - * Note: As of now, this only supports altering table properties, serde properties, - * and num buckets! + * Note: As of now, this doesn't support altering table schema, partition column names and bucket + * specification. We will ignore them even if users do specify different values for these fields. */ override def alterTable(tableDefinition: CatalogTable): Unit = withClient { assert(tableDefinition.identifier.database.isDefined) val db = tableDefinition.identifier.database.get requireTableExists(db, tableDefinition.identifier.table) -client.alterTable(tableDefinition) +verifyTableProperties(tableDefinition) + +if (tableDefinition.provider == Some("hive") || tableDefinition.tableType == VIEW) { + client.alterTable(tableDefinition) +} else { + val oldDef = client.getTable(db, tableDefinition.identifier.table) + // Sets the `schema`, `partitionColumnNames` and `bucketSpec` from the old table definition, + // to retain the spark specific format if it is. + // Also add table meta properties to table properties, to retain the data source table format. + val newDef = tableDefinition.copy( +schema = oldDef.schema, +partitionColumnNames = oldDef.partitionColumnNames, +bucketSpec = oldDef.bucketSpec, +properties = tableMetadataToProperties(tableDefinition) ++ tableDefinition.properties) + + client.alterTable(newDef) +} } override def getTable(db: String, table: String): CatalogTable = withClient { -client.getTable(db, table) +restoreTableMetadata(client.getTable(db, table)) } override def getTableOption(db: String, table: String): Option[CatalogTable] = withClient { -client.getTableOption(db, table) +client.getTableOption(db, table).map(restoreTableMetadata) + } + + /** + * Restores table metadata from the table properties if it's a datasouce table. This method is + * kind of a opposite version of [[createTable]]. + * + * It reads table schema, provider, partition column names and bucket specification from table + * properties, and filter out these special entries from table properties. + */ + private def restoreTableMetadata(table: CatalogTable): CatalogTable = { +if (table.tableType == VIEW) { + table +} else { + getProviderFromTableProperties(table).map { provider => +// SPARK-15269: Persisted data source tables always store the location URI as a storage +// property named "path" instead of standard Hive `dataLocation`, because Hive only +// allows directory paths as location URIs while Spark SQL data source tables also +// allows file paths. So the standard Hive `dataLocation` is meaningless for Spark SQL +// data source tables. +// Spark SQL may also save external data source in Hive compatible format when +// possible, so that these tables can be directly accessed by Hive. For these tables, +// `dataLocation` is still necessary. Here we also check for input format because only +// these Hive compatible tables set this field. 
+val storage = if (table.tableType == EXTERNAL && table.storage.inputFormat.isEmpty) { + table.storage.copy(locationUri = None) +} else { + table.storage +} +table.copy( + storage = storage, + schema = getSchemaFromTableProperties(table), + provider = Some(provider), + partitionColumnNames = getPartitionColumnsFromTableProperties(table), + bucketSpec = getBucketSpecFromTableProperties(table), + properties = getOriginalTableProperties(table)) --- End diff -- If you see the data flow, you might realize the table properties are always empty for data source tables after CREATE TABLE or CTAS commands. For data source tables, the actual table properties are stored in the storage properties, right? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at
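To make that data flow concrete, here is a self-contained toy model of the point above: after the new code path, the user-supplied options (including `path`) travel as storage properties, while the table-level `properties` map starts out empty. The classes below only mimic, and are not, Spark's catalog types.

```scala
object TablePropertiesSketch extends App {
  case class StorageFormat(properties: Map[String, String] = Map.empty)
  case class Table(storage: StorageFormat, properties: Map[String, String] = Map.empty)

  val optionsWithPath = Map("path" -> "/tmp/t", "compression" -> "snappy")

  // What the CREATE TABLE / CTAS command now hands to the catalog: options go into the
  // storage properties, and the table-level properties map is left empty.
  val table = Table(storage = StorageFormat(properties = optionsWithPath))

  assert(table.properties.isEmpty)                      // empty right after CREATE TABLE / CTAS
  assert(table.storage.properties("path") == "/tmp/t")  // the options, including "path", live here
  println(table)
}
```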
[GitHub] spark pull request #14155: [SPARK-16498][SQL] move hive hack for data source...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r75432710 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -264,10 +261,8 @@ case class AlterTableUnsetPropertiesCommand( extends RunnableCommand { override def run(sparkSession: SparkSession): Seq[Row] = { -val ident = if (isView) "VIEW" else "TABLE" val catalog = sparkSession.sessionState.catalog DDLUtils.verifyAlterTableType(catalog, tableName, isView) -DDLUtils.verifyTableProperties(propKeys, s"ALTER $ident") --- End diff -- this command is removing a property. If we drop this check here, it is too late to catch it in the external catalog. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
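A self-contained toy illustration of the ordering concern, under the assumption that a catalog-side check can only inspect the table definition it is handed: by then the unset key is already gone, so the check cannot reject the command.

```scala
object UnsetPropertyCheckSketch extends App {
  val reservedPrefix = "spark.sql.sources."

  // A catalog-side check can only inspect the table definition it receives.
  def verifyTableProperties(props: Map[String, String]): Unit =
    require(props.keys.forall(!_.startsWith(reservedPrefix)),
      s"Cannot persist reserved '$reservedPrefix' properties")

  val current = Map("spark.sql.sources.provider" -> "parquet", "owner" -> "alice")

  // ALTER TABLE ... UNSET TBLPROPERTIES ('spark.sql.sources.provider')
  val afterUnset = current - "spark.sql.sources.provider"

  // The reserved key is already gone from the definition the catalog would see,
  // so the check passes and cannot reject the command.
  verifyTableProperties(afterUnset)
  println("catalog-side check did not notice that a reserved property was unset")
}
```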
[GitHub] spark pull request #14155: [SPARK-16498][SQL] move hive hack for data source...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r75432607 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -584,13 +579,8 @@ case class AlterTableSetLocationCommand( object DDLUtils { - - def isDatasourceTable(props: Map[String, String]): Boolean = { -props.contains(DATASOURCE_PROVIDER) - } - def isDatasourceTable(table: CatalogTable): Boolean = { -isDatasourceTable(table.properties) +table.provider.isDefined && table.provider.get != "hive" --- End diff -- We are checking the same thing here but in a different way. See the code: https://github.com/cloud-fan/spark/blob/96d57b665ac65750eb5c6f9757e5827ea9c14ca4/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala#L175 Could we make them consistent, or leave a comment to explain it? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
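A hedged sketch of the consistency being requested: one shared helper answering "is this a data source (non-Hive) table?" that both `DDLUtils` and `HiveExternalCatalog` could use instead of two hand-rolled checks. `CatalogTableLike` below is a stand-in, not Spark's `CatalogTable`.

```scala
object ProviderCheckSketch extends App {
  // Stand-in for CatalogTable: only the provider field matters for this check.
  case class CatalogTableLike(provider: Option[String])

  // One shared definition both call sites could use.
  def isDatasourceTable(table: CatalogTableLike): Boolean =
    table.provider.isDefined && table.provider.get != "hive"

  def isHiveTable(table: CatalogTableLike): Boolean =
    table.provider.contains("hive")

  assert(isDatasourceTable(CatalogTableLike(Some("parquet"))))
  assert(!isDatasourceTable(CatalogTableLike(Some("hive"))))
  assert(isHiveTable(CatalogTableLike(Some("hive"))))
  println("checks are consistent for these cases")
}
```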
[GitHub] spark pull request #14155: [SPARK-16498][SQL] move hive hack for data source...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r75432300 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala --- @@ -144,16 +163,172 @@ private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configu assert(tableDefinition.identifier.database.isDefined) val db = tableDefinition.identifier.database.get requireDbExists(db) +verifyTableProperties(tableDefinition) +// We can't create index table currently. +assert(tableDefinition.tableType != INDEX) +// All tables except view must have a provider. +assert(tableDefinition.tableType == VIEW || tableDefinition.provider.isDefined) + +// For view or Hive serde tables, they are guaranteed to be Hive compatible and we save them +// to Hive metastore directly. Otherwise, we need to put table metadata to table properties to +// work around some hive metastore problems, e.g. not case-preserving, bad decimal type support. +if (tableDefinition.provider == Some("hive") || tableDefinition.tableType == VIEW) { + client.createTable(tableDefinition, ignoreIfExists) +} else { + // Before saving data source table metadata into Hive metastore, we should: + // 1. Put table schema, partition column names and bucket specification in table properties. + // 2. Check if this table is hive compatible + //2.1 If it's not hive compatible, set schema, partition columns and bucket spec to empty + // and save table metadata to Hive. + //2.1 If it's hive compatible, set serde information in table metadata and try to save + // it to Hive. If it fails, treat it as not hive compatible and go back to 2.1 + + val tableProperties = tableMetadataToProperties(tableDefinition) + + // converts the table metadata to Spark SQL specific format, i.e. set schema, partition column + // names and bucket specification to empty. + def newSparkSQLSpecificMetastoreTable(): CatalogTable = { +tableDefinition.copy( + schema = new StructType, + partitionColumnNames = Nil, + bucketSpec = None, + properties = tableDefinition.properties ++ tableProperties) + } + + // converts the table metadata to Hive compatible format, i.e. set the serde information. + def newHiveCompatibleMetastoreTable(serde: HiveSerDe, path: String): CatalogTable = { +tableDefinition.copy( + storage = tableDefinition.storage.copy( +locationUri = Some(new Path(path).toUri.toString), +inputFormat = serde.inputFormat, +outputFormat = serde.outputFormat, +serde = serde.serde + ), + properties = tableDefinition.properties ++ tableProperties) + } + + val qualifiedTableName = tableDefinition.identifier.quotedString + val maybeSerde = HiveSerDe.sourceToSerDe(tableDefinition.provider.get) + val maybePath = new CaseInsensitiveMap(tableDefinition.storage.properties).get("path") + val skipHiveMetadata = tableDefinition.storage.properties +.getOrElse("skipHiveMetadata", "false").toBoolean + + val (hiveCompatibleTable, logMessage) = (maybeSerde, maybePath) match { +case _ if skipHiveMetadata => + val message = +s"Persisting data source table $qualifiedTableName into Hive metastore in" + + "Spark SQL specific format, which is NOT compatible with Hive." + (None, message) + +// our bucketing is un-compatible with hive(different hash function) +case _ if tableDefinition.bucketSpec.nonEmpty => + val message = +s"Persisting bucketed data source table $qualifiedTableName into " + + "Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. 
" + (None, message) + +case (Some(serde), Some(path)) => + val message = +s"Persisting data source table $qualifiedTableName with a single input path " + + s"into Hive metastore in Hive compatible format." + (Some(newHiveCompatibleMetastoreTable(serde, path)), message) + +case (Some(_), None) => + val message = +s"Data source table $qualifiedTableName is not file based. Persisting it into " + + s"Hive metastore in Spark SQL specific format, which is NOT compatible with Hive." + (None, message) + +case _ => + val provider = tableDefinition.provider.get + val message = +
[GitHub] spark pull request #14155: [SPARK-16498][SQL] move hive hack for data source...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r75432051 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala --- @@ -985,35 +987,37 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv withTempDir { tempPath => val schema = StructType((1 to 5).map(i => StructField(s"c_$i", StringType))) - createDataSourceTable( -sparkSession = spark, -tableIdent = TableIdentifier("not_skip_hive_metadata"), + val tableDesc1 = CatalogTable( +identifier = TableIdentifier("not_skip_hive_metadata"), +tableType = CatalogTableType.EXTERNAL, +storage = CatalogStorageFormat.empty.copy( + properties = Map("path" -> tempPath.getCanonicalPath, "skipHiveMetadata" -> "false") +), schema = schema, -partitionColumns = Array.empty[String], -bucketSpec = None, -provider = "parquet", -options = Map("path" -> tempPath.getCanonicalPath, "skipHiveMetadata" -> "false"), -isExternal = false) +provider = Some("parquet") + ) + spark.sessionState.catalog.createTable(tableDesc1, ignoreIfExists = false) // As a proxy for verifying that the table was stored in Hive compatible format, // we verify that each column of the table is of native type StringType. - assert(sharedState.externalCatalog.getTable("default", "not_skip_hive_metadata").schema + assert(hiveClient.getTable("default", "not_skip_hive_metadata").schema .forall(_.dataType == StringType)) - createDataSourceTable( -sparkSession = spark, -tableIdent = TableIdentifier("skip_hive_metadata"), + val tableDesc2 = CatalogTable( +identifier = TableIdentifier("skip_hive_metadata", Some("default")), +tableType = CatalogTableType.EXTERNAL, +storage = CatalogStorageFormat.empty.copy( + properties = Map("path" -> tempPath.getCanonicalPath, "skipHiveMetadata" -> "true") +), schema = schema, -partitionColumns = Array.empty[String], -bucketSpec = None, -provider = "parquet", -options = Map("path" -> tempPath.getCanonicalPath, "skipHiveMetadata" -> "true"), -isExternal = false) +provider = Some("parquet") + ) + spark.sessionState.catalog.createTable(tableDesc2, ignoreIfExists = false) // As a proxy for verifying that the table was stored in SparkSQL format, // we verify that the table has a column type as array of StringType. - assert(sharedState.externalCatalog.getTable("default", "skip_hive_metadata") -.schema.forall(_.dataType == ArrayType(StringType))) + assert(hiveClient.getTable("default", "skip_hive_metadata").schema +.forall(_.dataType == ArrayType(StringType))) --- End diff -- we need a comment. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14155: [SPARK-16498][SQL] move hive hack for data source...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r75432071 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala --- @@ -49,6 +49,8 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv jsonFilePath = Utils.getSparkClassLoader.getResource("sample.json").getFile } + private def hiveClient: HiveClient = sharedState.asInstanceOf[HiveSharedState].metadataHive --- End diff -- we need a comment here to explain why we need to access it directly. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14155: [SPARK-16498][SQL] move hive hack for data source...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r75432039 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala --- @@ -985,35 +987,37 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv withTempDir { tempPath => val schema = StructType((1 to 5).map(i => StructField(s"c_$i", StringType))) - createDataSourceTable( -sparkSession = spark, -tableIdent = TableIdentifier("not_skip_hive_metadata"), + val tableDesc1 = CatalogTable( +identifier = TableIdentifier("not_skip_hive_metadata"), +tableType = CatalogTableType.EXTERNAL, +storage = CatalogStorageFormat.empty.copy( + properties = Map("path" -> tempPath.getCanonicalPath, "skipHiveMetadata" -> "false") +), schema = schema, -partitionColumns = Array.empty[String], -bucketSpec = None, -provider = "parquet", -options = Map("path" -> tempPath.getCanonicalPath, "skipHiveMetadata" -> "false"), -isExternal = false) +provider = Some("parquet") + ) + spark.sessionState.catalog.createTable(tableDesc1, ignoreIfExists = false) // As a proxy for verifying that the table was stored in Hive compatible format, // we verify that each column of the table is of native type StringType. - assert(sharedState.externalCatalog.getTable("default", "not_skip_hive_metadata").schema + assert(hiveClient.getTable("default", "not_skip_hive_metadata").schema .forall(_.dataType == StringType)) - createDataSourceTable( -sparkSession = spark, -tableIdent = TableIdentifier("skip_hive_metadata"), + val tableDesc2 = CatalogTable( +identifier = TableIdentifier("skip_hive_metadata", Some("default")), +tableType = CatalogTableType.EXTERNAL, +storage = CatalogStorageFormat.empty.copy( + properties = Map("path" -> tempPath.getCanonicalPath, "skipHiveMetadata" -> "true") +), schema = schema, -partitionColumns = Array.empty[String], -bucketSpec = None, -provider = "parquet", -options = Map("path" -> tempPath.getCanonicalPath, "skipHiveMetadata" -> "true"), -isExternal = false) +provider = Some("parquet") + ) + spark.sessionState.catalog.createTable(tableDesc2, ignoreIfExists = false) // As a proxy for verifying that the table was stored in SparkSQL format, // we verify that the table has a column type as array of StringType. - assert(sharedState.externalCatalog.getTable("default", "skip_hive_metadata") -.schema.forall(_.dataType == ArrayType(StringType))) + assert(hiveClient.getTable("default", "skip_hive_metadata").schema +.forall(_.dataType == ArrayType(StringType))) --- End diff -- oh, i see. You want to bypass external catalog to just get the metadata. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14155: [SPARK-16498][SQL] move hive hack for data source...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r75431775 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala --- @@ -49,6 +49,8 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv jsonFilePath = Utils.getSparkClassLoader.getResource("sample.json").getFile } + private def hiveClient: HiveClient = sharedState.asInstanceOf[HiveSharedState].metadataHive --- End diff -- is it possible to not use hiveClient directly? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14155: [SPARK-16498][SQL] move hive hack for data source...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r75431615 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala --- @@ -144,16 +163,172 @@ private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configu assert(tableDefinition.identifier.database.isDefined) val db = tableDefinition.identifier.database.get requireDbExists(db) +verifyTableProperties(tableDefinition) +// We can't create index table currently. +assert(tableDefinition.tableType != INDEX) +// All tables except view must have a provider. +assert(tableDefinition.tableType == VIEW || tableDefinition.provider.isDefined) + +// For view or Hive serde tables, they are guaranteed to be Hive compatible and we save them +// to Hive metastore directly. Otherwise, we need to put table metadata to table properties to +// work around some hive metastore problems, e.g. not case-preserving, bad decimal type support. +if (tableDefinition.provider == Some("hive") || tableDefinition.tableType == VIEW) { + client.createTable(tableDefinition, ignoreIfExists) +} else { + // Before saving data source table metadata into Hive metastore, we should: + // 1. Put table schema, partition column names and bucket specification in table properties. + // 2. Check if this table is hive compatible + //2.1 If it's not hive compatible, set schema, partition columns and bucket spec to empty + // and save table metadata to Hive. + //2.1 If it's hive compatible, set serde information in table metadata and try to save + // it to Hive. If it fails, treat it as not hive compatible and go back to 2.1 + + val tableProperties = tableMetadataToProperties(tableDefinition) + + // converts the table metadata to Spark SQL specific format, i.e. set schema, partition column + // names and bucket specification to empty. + def newSparkSQLSpecificMetastoreTable(): CatalogTable = { +tableDefinition.copy( + schema = new StructType, + partitionColumnNames = Nil, + bucketSpec = None, + properties = tableDefinition.properties ++ tableProperties) + } + + // converts the table metadata to Hive compatible format, i.e. set the serde information. + def newHiveCompatibleMetastoreTable(serde: HiveSerDe, path: String): CatalogTable = { +tableDefinition.copy( + storage = tableDefinition.storage.copy( +locationUri = Some(new Path(path).toUri.toString), +inputFormat = serde.inputFormat, +outputFormat = serde.outputFormat, +serde = serde.serde + ), + properties = tableDefinition.properties ++ tableProperties) + } + + val qualifiedTableName = tableDefinition.identifier.quotedString + val maybeSerde = HiveSerDe.sourceToSerDe(tableDefinition.provider.get) + val maybePath = new CaseInsensitiveMap(tableDefinition.storage.properties).get("path") + val skipHiveMetadata = tableDefinition.storage.properties +.getOrElse("skipHiveMetadata", "false").toBoolean + + val (hiveCompatibleTable, logMessage) = (maybeSerde, maybePath) match { +case _ if skipHiveMetadata => + val message = +s"Persisting data source table $qualifiedTableName into Hive metastore in" + + "Spark SQL specific format, which is NOT compatible with Hive." + (None, message) + +// our bucketing is un-compatible with hive(different hash function) +case _ if tableDefinition.bucketSpec.nonEmpty => + val message = +s"Persisting bucketed data source table $qualifiedTableName into " + + "Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. 
" + (None, message) + +case (Some(serde), Some(path)) => + val message = +s"Persisting data source table $qualifiedTableName with a single input path " + + s"into Hive metastore in Hive compatible format." + (Some(newHiveCompatibleMetastoreTable(serde, path)), message) + +case (Some(_), None) => + val message = +s"Data source table $qualifiedTableName is not file based. Persisting it into " + + s"Hive metastore in Spark SQL specific format, which is NOT compatible with Hive." + (None, message) + +case _ => + val provider = tableDefinition.provider.get + val message = +
[GitHub] spark pull request #14155: [SPARK-16498][SQL] move hive hack for data source...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r75431528 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala --- @@ -200,22 +348,73 @@ private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configu * Alter a table whose name that matches the one specified in `tableDefinition`, * assuming the table exists. * - * Note: As of now, this only supports altering table properties, serde properties, - * and num buckets! + * Note: As of now, this only supports altering table properties and serde properties. */ override def alterTable(tableDefinition: CatalogTable): Unit = withClient { assert(tableDefinition.identifier.database.isDefined) val db = tableDefinition.identifier.database.get requireTableExists(db, tableDefinition.identifier.table) -client.alterTable(tableDefinition) +verifyTableProperties(tableDefinition) + +if (tableDefinition.provider == Some("hive") || tableDefinition.tableType == VIEW) { + client.alterTable(tableDefinition) +} else { + val oldDef = client.getTable(db, tableDefinition.identifier.table) + // Sets the `schema`, `partitionColumnNames` and `bucketSpec` from the old table definition, + // to retain the spark specific format if it is. + // Also add table meta properties to table properties, to retain the data source table format. + val newDef = tableDefinition.copy( +schema = oldDef.schema, +partitionColumnNames = oldDef.partitionColumnNames, +bucketSpec = oldDef.bucketSpec, +properties = tableMetadataToProperties(tableDefinition) ++ tableDefinition.properties) --- End diff -- if `tableDefinition` has a new schema, `tableMetadataToProperties(tableDefinition)` is actually changing the schema? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
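A self-contained toy illustration of this concern (stand-in types, not Spark's): the altered table keeps the `schema` field from the old definition but regenerates the schema properties from the new one, so a caller that passes a different schema can leave the two out of sync.

```scala
object AlterTableSchemaSketch extends App {
  case class TableDef(schema: Seq[String], properties: Map[String, String] = Map.empty)

  // Stand-in for tableMetadataToProperties: serializes the schema into properties.
  def tableMetadataToProperties(t: TableDef): Map[String, String] =
    Map("spark.sql.sources.schema.numParts" -> "1",
        "spark.sql.sources.schema.part.0" -> t.schema.mkString(", "))

  // Mirrors the alterTable logic quoted above: schema field from the old definition,
  // schema properties regenerated from the new one.
  def alterTable(oldDef: TableDef, newDef: TableDef): TableDef =
    newDef.copy(
      schema = oldDef.schema,
      properties = tableMetadataToProperties(newDef) ++ newDef.properties)

  val oldDef  = TableDef(schema = Seq("a int", "b string"))
  val altered = alterTable(oldDef, TableDef(schema = Seq("a int"))) // caller passes a new schema

  println(altered.schema)                                          // List(a int, b string)
  println(altered.properties("spark.sql.sources.schema.part.0"))   // a int  <- out of sync
}
```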
[GitHub] spark pull request #14155: [SPARK-16498][SQL] move hive hack for data source...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r75431426 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala --- @@ -233,226 +229,21 @@ case class CreateDataSourceTableAsSelectCommand( // We will use the schema of resolved.relation as the schema of the table (instead of // the schema of df). It is important since the nullability may be changed by the relation // provider (for example, see org.apache.spark.sql.parquet.DefaultSource). - CreateDataSourceTableUtils.createDataSourceTable( -sparkSession = sparkSession, -tableIdent = tableIdent, -schema = result.schema, -partitionColumns = partitionColumns, -bucketSpec = bucketSpec, -provider = provider, -options = optionsWithPath, -isExternal = isExternal) + val schema = result.schema + val table = CatalogTable( +identifier = tableIdent, +tableType = if (isExternal) CatalogTableType.EXTERNAL else CatalogTableType.MANAGED, +storage = CatalogStorageFormat.empty.copy(properties = optionsWithPath), +schema = schema, +provider = Some(provider), +partitionColumnNames = partitionColumns, +bucketSpec = bucketSpec + ) + sessionState.catalog.createTable(table, ignoreIfExists = false) } // Refresh the cache of the table in the catalog. sessionState.catalog.refreshTable(tableIdent) Seq.empty[Row] } } - - -object CreateDataSourceTableUtils extends Logging { - - val DATASOURCE_PREFIX = "spark.sql.sources." - val DATASOURCE_PROVIDER = DATASOURCE_PREFIX + "provider" - val DATASOURCE_WRITEJOBUUID = DATASOURCE_PREFIX + "writeJobUUID" - val DATASOURCE_OUTPUTPATH = DATASOURCE_PREFIX + "output.path" - val DATASOURCE_SCHEMA = DATASOURCE_PREFIX + "schema" - val DATASOURCE_SCHEMA_PREFIX = DATASOURCE_SCHEMA + "." - val DATASOURCE_SCHEMA_NUMPARTS = DATASOURCE_SCHEMA_PREFIX + "numParts" - val DATASOURCE_SCHEMA_NUMPARTCOLS = DATASOURCE_SCHEMA_PREFIX + "numPartCols" - val DATASOURCE_SCHEMA_NUMSORTCOLS = DATASOURCE_SCHEMA_PREFIX + "numSortCols" - val DATASOURCE_SCHEMA_NUMBUCKETS = DATASOURCE_SCHEMA_PREFIX + "numBuckets" - val DATASOURCE_SCHEMA_NUMBUCKETCOLS = DATASOURCE_SCHEMA_PREFIX + "numBucketCols" - val DATASOURCE_SCHEMA_PART_PREFIX = DATASOURCE_SCHEMA_PREFIX + "part." - val DATASOURCE_SCHEMA_PARTCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "partCol." - val DATASOURCE_SCHEMA_BUCKETCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "bucketCol." - val DATASOURCE_SCHEMA_SORTCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "sortCol." - - def createDataSourceTable( - sparkSession: SparkSession, - tableIdent: TableIdentifier, - schema: StructType, - partitionColumns: Array[String], - bucketSpec: Option[BucketSpec], - provider: String, - options: Map[String, String], - isExternal: Boolean): Unit = { -val tableProperties = new mutable.HashMap[String, String] -tableProperties.put(DATASOURCE_PROVIDER, provider) - -// Serialized JSON schema string may be too long to be stored into a single metastore table -// property. In this case, we split the JSON string and store each part as a separate table -// property. -val threshold = sparkSession.sessionState.conf.schemaStringLengthThreshold -val schemaJsonString = schema.json -// Split the JSON string. 
-val parts = schemaJsonString.grouped(threshold).toSeq -tableProperties.put(DATASOURCE_SCHEMA_NUMPARTS, parts.size.toString) -parts.zipWithIndex.foreach { case (part, index) => - tableProperties.put(s"$DATASOURCE_SCHEMA_PART_PREFIX$index", part) -} - -if (partitionColumns.length > 0) { - tableProperties.put(DATASOURCE_SCHEMA_NUMPARTCOLS, partitionColumns.length.toString) - partitionColumns.zipWithIndex.foreach { case (partCol, index) => -tableProperties.put(s"$DATASOURCE_SCHEMA_PARTCOL_PREFIX$index", partCol) - } -} - -if (bucketSpec.isDefined) { - val BucketSpec(numBuckets, bucketColumnNames, sortColumnNames) = bucketSpec.get - - tableProperties.put(DATASOURCE_SCHEMA_NUMBUCKETS, numBuckets.toString) - tableProperties.put(DATASOURCE_SCHEMA_NUMBUCKETCOLS, bucketColumnNames.length.toString) - bucketColumnNames.zipWithIndex.foreach { case (bucketCol, index) => -tableProperties.put(s"$DATASOURCE_SCHEMA_BUCKETCOL_PREFIX$index", bucketCol) - } - - if (sortColumnNames.nonEmpty) { -
[GitHub] spark pull request #14155: [SPARK-16498][SQL] move hive hack for data source...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r75431369 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala --- @@ -144,16 +163,172 @@ private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configu assert(tableDefinition.identifier.database.isDefined) val db = tableDefinition.identifier.database.get requireDbExists(db) +verifyTableProperties(tableDefinition) +// We can't create index table currently. +assert(tableDefinition.tableType != INDEX) +// All tables except view must have a provider. +assert(tableDefinition.tableType == VIEW || tableDefinition.provider.isDefined) + +// For view or Hive serde tables, they are guaranteed to be Hive compatible and we save them +// to Hive metastore directly. Otherwise, we need to put table metadata to table properties to +// work around some hive metastore problems, e.g. not case-preserving, bad decimal type support. +if (tableDefinition.provider == Some("hive") || tableDefinition.tableType == VIEW) { + client.createTable(tableDefinition, ignoreIfExists) +} else { + // Before saving data source table metadata into Hive metastore, we should: + // 1. Put table schema, partition column names and bucket specification in table properties. + // 2. Check if this table is hive compatible + //2.1 If it's not hive compatible, set schema, partition columns and bucket spec to empty + // and save table metadata to Hive. + //2.1 If it's hive compatible, set serde information in table metadata and try to save + // it to Hive. If it fails, treat it as not hive compatible and go back to 2.1 + + val tableProperties = tableMetadataToProperties(tableDefinition) + + // converts the table metadata to Spark SQL specific format, i.e. set schema, partition column + // names and bucket specification to empty. + def newSparkSQLSpecificMetastoreTable(): CatalogTable = { +tableDefinition.copy( + schema = new StructType, + partitionColumnNames = Nil, + bucketSpec = None, + properties = tableDefinition.properties ++ tableProperties) + } + + // converts the table metadata to Hive compatible format, i.e. set the serde information. + def newHiveCompatibleMetastoreTable(serde: HiveSerDe, path: String): CatalogTable = { +tableDefinition.copy( + storage = tableDefinition.storage.copy( +locationUri = Some(new Path(path).toUri.toString), +inputFormat = serde.inputFormat, +outputFormat = serde.outputFormat, +serde = serde.serde + ), + properties = tableDefinition.properties ++ tableProperties) + } + + val qualifiedTableName = tableDefinition.identifier.quotedString + val maybeSerde = HiveSerDe.sourceToSerDe(tableDefinition.provider.get) + val maybePath = new CaseInsensitiveMap(tableDefinition.storage.properties).get("path") + val skipHiveMetadata = tableDefinition.storage.properties +.getOrElse("skipHiveMetadata", "false").toBoolean + + val (hiveCompatibleTable, logMessage) = (maybeSerde, maybePath) match { +case _ if skipHiveMetadata => + val message = +s"Persisting data source table $qualifiedTableName into Hive metastore in" + + "Spark SQL specific format, which is NOT compatible with Hive." + (None, message) + +// our bucketing is un-compatible with hive(different hash function) +case _ if tableDefinition.bucketSpec.nonEmpty => + val message = +s"Persisting bucketed data source table $qualifiedTableName into " + + "Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. 
" + (None, message) + +case (Some(serde), Some(path)) => + val message = +s"Persisting data source table $qualifiedTableName with a single input path " + + s"into Hive metastore in Hive compatible format." + (Some(newHiveCompatibleMetastoreTable(serde, path)), message) + +case (Some(_), None) => + val message = +s"Data source table $qualifiedTableName is not file based. Persisting it into " + + s"Hive metastore in Spark SQL specific format, which is NOT compatible with Hive." + (None, message) + +case _ => + val provider = tableDefinition.provider.get + val message = +
[GitHub] spark pull request #14155: [SPARK-16498][SQL] move hive hack for data source...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r75431206 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala --- @@ -144,16 +161,147 @@ private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configu assert(tableDefinition.identifier.database.isDefined) val db = tableDefinition.identifier.database.get requireDbExists(db) +verifyTableProperties(tableDefinition) + +if (tableDefinition.provider == Some("hive") || tableDefinition.tableType == VIEW) { + client.createTable(tableDefinition, ignoreIfExists) +} else { + val tableProperties = tableMetadataToProperties(tableDefinition) + + def newSparkSQLSpecificMetastoreTable(): CatalogTable = { +tableDefinition.copy( + schema = new StructType, + partitionColumnNames = Nil, + bucketSpec = None, + properties = tableDefinition.properties ++ tableProperties) + } + + def newHiveCompatibleMetastoreTable(serde: HiveSerDe, path: String): CatalogTable = { +tableDefinition.copy( + storage = tableDefinition.storage.copy( +locationUri = Some(new Path(path).toUri.toString), +inputFormat = serde.inputFormat, +outputFormat = serde.outputFormat, +serde = serde.serde + ), + properties = tableDefinition.properties ++ tableProperties) + } + + val qualifiedTableName = tableDefinition.identifier.quotedString + val maybeSerde = HiveSerDe.sourceToSerDe(tableDefinition.provider.get) + val maybePath = new CaseInsensitiveMap(tableDefinition.storage.properties).get("path") --- End diff -- oh there is a chance that it is not set. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14155: [SPARK-16498][SQL] move hive hack for data source...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r75424795 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala --- @@ -97,16 +92,17 @@ case class CreateDataSourceTableCommand( } } -CreateDataSourceTableUtils.createDataSourceTable( - sparkSession = sparkSession, - tableIdent = tableIdent, +val table = CatalogTable( + identifier = tableIdent, + tableType = if (isExternal) CatalogTableType.EXTERNAL else CatalogTableType.MANAGED, + storage = CatalogStorageFormat.empty.copy(properties = optionsWithPath), schema = dataSource.schema, - partitionColumns = partitionColumns, - bucketSpec = bucketSpec, - provider = provider, - options = optionsWithPath, - isExternal = isExternal) + provider = Some(provider), + partitionColumnNames = partitionColumns, + bucketSpec = bucketSpec +) +sessionState.catalog.createTable(table, ignoreIfExists) --- End diff -- We made a change here. Previously, `ignoreIfExists` was always set to `false` when we called `createTable`. Now, if we want to let the underlying `createTable` handle it, we should remove the code: https://github.com/cloud-fan/spark/blob/96d57b665ac65750eb5c6f9757e5827ea9c14ca4/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala#L58-L64 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14155: [SPARK-16498][SQL] move hive hack for data source...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r75339230 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala --- @@ -144,16 +163,172 @@ private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configu assert(tableDefinition.identifier.database.isDefined) val db = tableDefinition.identifier.database.get requireDbExists(db) +verifyTableProperties(tableDefinition) +// We can't create index table currently. +assert(tableDefinition.tableType != INDEX) +// All tables except view must have a provider. +assert(tableDefinition.tableType == VIEW || tableDefinition.provider.isDefined) + +// For view or Hive serde tables, they are guaranteed to be Hive compatible and we save them +// to Hive metastore directly. Otherwise, we need to put table metadata to table properties to +// work around some hive metastore problems, e.g. not case-preserving, bad decimal type support. +if (tableDefinition.provider == Some("hive") || tableDefinition.tableType == VIEW) { + client.createTable(tableDefinition, ignoreIfExists) +} else { + // Before saving data source table metadata into Hive metastore, we should: + // 1. Put table schema, partition column names and bucket specification in table properties. + // 2. Check if this table is hive compatible + //2.1 If it's not hive compatible, set schema, partition columns and bucket spec to empty + // and save table metadata to Hive. + //2.1 If it's hive compatible, set serde information in table metadata and try to save + // it to Hive. If it fails, treat it as not hive compatible and go back to 2.1 + + val tableProperties = tableMetadataToProperties(tableDefinition) + + // converts the table metadata to Spark SQL specific format, i.e. set schema, partition column + // names and bucket specification to empty. + def newSparkSQLSpecificMetastoreTable(): CatalogTable = { +tableDefinition.copy( + schema = new StructType, + partitionColumnNames = Nil, + bucketSpec = None, + properties = tableDefinition.properties ++ tableProperties) + } + + // converts the table metadata to Hive compatible format, i.e. set the serde information. + def newHiveCompatibleMetastoreTable(serde: HiveSerDe, path: String): CatalogTable = { +tableDefinition.copy( + storage = tableDefinition.storage.copy( +locationUri = Some(new Path(path).toUri.toString), +inputFormat = serde.inputFormat, +outputFormat = serde.outputFormat, +serde = serde.serde + ), + properties = tableDefinition.properties ++ tableProperties) + } + + val qualifiedTableName = tableDefinition.identifier.quotedString + val maybeSerde = HiveSerDe.sourceToSerDe(tableDefinition.provider.get) + val maybePath = new CaseInsensitiveMap(tableDefinition.storage.properties).get("path") + val skipHiveMetadata = tableDefinition.storage.properties +.getOrElse("skipHiveMetadata", "false").toBoolean + + val (hiveCompatibleTable, logMessage) = (maybeSerde, maybePath) match { +case _ if skipHiveMetadata => + val message = +s"Persisting data source table $qualifiedTableName into Hive metastore in" + + "Spark SQL specific format, which is NOT compatible with Hive." + (None, message) + +// our bucketing is un-compatible with hive(different hash function) +case _ if tableDefinition.bucketSpec.nonEmpty => + val message = +s"Persisting bucketed data source table $qualifiedTableName into " + + "Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. 
" + (None, message) + +case (Some(serde), Some(path)) => + val message = +s"Persisting data source table $qualifiedTableName with a single input path " + + s"into Hive metastore in Hive compatible format." + (Some(newHiveCompatibleMetastoreTable(serde, path)), message) + +case (Some(_), None) => --- End diff -- Agree. Let me continue the review today. : ) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with
[GitHub] spark pull request #14155: [SPARK-16498][SQL] move hive hack for data source...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r75267415 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala --- @@ -144,16 +163,172 @@ private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configu assert(tableDefinition.identifier.database.isDefined) val db = tableDefinition.identifier.database.get requireDbExists(db) +verifyTableProperties(tableDefinition) +// We can't create index table currently. +assert(tableDefinition.tableType != INDEX) +// All tables except view must have a provider. +assert(tableDefinition.tableType == VIEW || tableDefinition.provider.isDefined) + +// For view or Hive serde tables, they are guaranteed to be Hive compatible and we save them +// to Hive metastore directly. Otherwise, we need to put table metadata to table properties to +// work around some hive metastore problems, e.g. not case-preserving, bad decimal type support. +if (tableDefinition.provider == Some("hive") || tableDefinition.tableType == VIEW) { + client.createTable(tableDefinition, ignoreIfExists) +} else { + // Before saving data source table metadata into Hive metastore, we should: + // 1. Put table schema, partition column names and bucket specification in table properties. + // 2. Check if this table is hive compatible + //2.1 If it's not hive compatible, set schema, partition columns and bucket spec to empty + // and save table metadata to Hive. + //2.1 If it's hive compatible, set serde information in table metadata and try to save + // it to Hive. If it fails, treat it as not hive compatible and go back to 2.1 + + val tableProperties = tableMetadataToProperties(tableDefinition) + + // converts the table metadata to Spark SQL specific format, i.e. set schema, partition column + // names and bucket specification to empty. + def newSparkSQLSpecificMetastoreTable(): CatalogTable = { +tableDefinition.copy( + schema = new StructType, + partitionColumnNames = Nil, + bucketSpec = None, + properties = tableDefinition.properties ++ tableProperties) + } + + // converts the table metadata to Hive compatible format, i.e. set the serde information. + def newHiveCompatibleMetastoreTable(serde: HiveSerDe, path: String): CatalogTable = { +tableDefinition.copy( + storage = tableDefinition.storage.copy( +locationUri = Some(new Path(path).toUri.toString), +inputFormat = serde.inputFormat, +outputFormat = serde.outputFormat, +serde = serde.serde + ), + properties = tableDefinition.properties ++ tableProperties) + } + + val qualifiedTableName = tableDefinition.identifier.quotedString + val maybeSerde = HiveSerDe.sourceToSerDe(tableDefinition.provider.get) + val maybePath = new CaseInsensitiveMap(tableDefinition.storage.properties).get("path") + val skipHiveMetadata = tableDefinition.storage.properties +.getOrElse("skipHiveMetadata", "false").toBoolean + + val (hiveCompatibleTable, logMessage) = (maybeSerde, maybePath) match { +case _ if skipHiveMetadata => + val message = +s"Persisting data source table $qualifiedTableName into Hive metastore in" + + "Spark SQL specific format, which is NOT compatible with Hive." + (None, message) + +// our bucketing is un-compatible with hive(different hash function) +case _ if tableDefinition.bucketSpec.nonEmpty => + val message = +s"Persisting bucketed data source table $qualifiedTableName into " + + "Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. 
" + (None, message) + +case (Some(serde), Some(path)) => + val message = +s"Persisting data source table $qualifiedTableName with a single input path " + + s"into Hive metastore in Hive compatible format." + (Some(newHiveCompatibleMetastoreTable(serde, path)), message) + +case (Some(_), None) => --- End diff -- this branch is not reachable, it means the relation is file based but the path is not set, which should never happen. It's copied from previous code, we can leave it for safety. We can clean it up after we consolidate the path/locationUri for data source table and hive serde table. --- If your project is set up for it, you can reply to this email
[GitHub] spark pull request #14155: [SPARK-16498][SQL] move hive hack for data source...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r7528 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala --- @@ -144,16 +161,147 @@ private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configu assert(tableDefinition.identifier.database.isDefined) val db = tableDefinition.identifier.database.get requireDbExists(db) +verifyTableProperties(tableDefinition) + +if (tableDefinition.provider == Some("hive") || tableDefinition.tableType == VIEW) { + client.createTable(tableDefinition, ignoreIfExists) +} else { + val tableProperties = tableMetadataToProperties(tableDefinition) + + def newSparkSQLSpecificMetastoreTable(): CatalogTable = { +tableDefinition.copy( + schema = new StructType, + partitionColumnNames = Nil, + bucketSpec = None, + properties = tableDefinition.properties ++ tableProperties) + } + + def newHiveCompatibleMetastoreTable(serde: HiveSerDe, path: String): CatalogTable = { +tableDefinition.copy( + storage = tableDefinition.storage.copy( +locationUri = Some(new Path(path).toUri.toString), +inputFormat = serde.inputFormat, +outputFormat = serde.outputFormat, +serde = serde.serde + ), + properties = tableDefinition.properties ++ tableProperties) + } + + val qualifiedTableName = tableDefinition.identifier.quotedString + val maybeSerde = HiveSerDe.sourceToSerDe(tableDefinition.provider.get) + val maybePath = new CaseInsensitiveMap(tableDefinition.storage.properties).get("path") + val skipHiveMetadata = tableDefinition.storage.properties +.getOrElse("skipHiveMetadata", "false").toBoolean + + val (hiveCompatibleTable, logMessage) = (maybeSerde, maybePath) match { --- End diff -- I'd leave it there for safety, e.g. the path may not be set(should not happen) even for `HadoopFsRelation`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14155: [SPARK-16498][SQL] move hive hack for data source...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r75258409 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala --- @@ -144,16 +163,172 @@ private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configu assert(tableDefinition.identifier.database.isDefined) val db = tableDefinition.identifier.database.get requireDbExists(db) +verifyTableProperties(tableDefinition) +// We can't create index table currently. +assert(tableDefinition.tableType != INDEX) +// All tables except view must have a provider. +assert(tableDefinition.tableType == VIEW || tableDefinition.provider.isDefined) + +// For view or Hive serde tables, they are guaranteed to be Hive compatible and we save them +// to Hive metastore directly. Otherwise, we need to put table metadata to table properties to +// work around some hive metastore problems, e.g. not case-preserving, bad decimal type support. +if (tableDefinition.provider == Some("hive") || tableDefinition.tableType == VIEW) { + client.createTable(tableDefinition, ignoreIfExists) +} else { + // Before saving data source table metadata into Hive metastore, we should: + // 1. Put table schema, partition column names and bucket specification in table properties. + // 2. Check if this table is hive compatible + //2.1 If it's not hive compatible, set schema, partition columns and bucket spec to empty + // and save table metadata to Hive. + //2.1 If it's hive compatible, set serde information in table metadata and try to save + // it to Hive. If it fails, treat it as not hive compatible and go back to 2.1 + + val tableProperties = tableMetadataToProperties(tableDefinition) + + // converts the table metadata to Spark SQL specific format, i.e. set schema, partition column + // names and bucket specification to empty. + def newSparkSQLSpecificMetastoreTable(): CatalogTable = { +tableDefinition.copy( + schema = new StructType, + partitionColumnNames = Nil, + bucketSpec = None, + properties = tableDefinition.properties ++ tableProperties) + } + + // converts the table metadata to Hive compatible format, i.e. set the serde information. + def newHiveCompatibleMetastoreTable(serde: HiveSerDe, path: String): CatalogTable = { +tableDefinition.copy( + storage = tableDefinition.storage.copy( +locationUri = Some(new Path(path).toUri.toString), +inputFormat = serde.inputFormat, +outputFormat = serde.outputFormat, +serde = serde.serde + ), + properties = tableDefinition.properties ++ tableProperties) + } + + val qualifiedTableName = tableDefinition.identifier.quotedString + val maybeSerde = HiveSerDe.sourceToSerDe(tableDefinition.provider.get) + val maybePath = new CaseInsensitiveMap(tableDefinition.storage.properties).get("path") + val skipHiveMetadata = tableDefinition.storage.properties +.getOrElse("skipHiveMetadata", "false").toBoolean + + val (hiveCompatibleTable, logMessage) = (maybeSerde, maybePath) match { +case _ if skipHiveMetadata => + val message = +s"Persisting data source table $qualifiedTableName into Hive metastore in" + + "Spark SQL specific format, which is NOT compatible with Hive." + (None, message) + +// our bucketing is un-compatible with hive(different hash function) +case _ if tableDefinition.bucketSpec.nonEmpty => + val message = +s"Persisting bucketed data source table $qualifiedTableName into " + + "Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. 
" + (None, message) + +case (Some(serde), Some(path)) => + val message = +s"Persisting data source table $qualifiedTableName with a single input path " + + s"into Hive metastore in Hive compatible format." + (Some(newHiveCompatibleMetastoreTable(serde, path)), message) + +case (Some(_), None) => --- End diff -- This case is to create an empty data source table? Should we think this table is also Hive compatible? Then, we can insert data after creation? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not
[GitHub] spark pull request #14155: [SPARK-16498][SQL] move hive hack for data source...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r75257704 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala --- @@ -144,16 +161,147 @@ private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configu assert(tableDefinition.identifier.database.isDefined) val db = tableDefinition.identifier.database.get requireDbExists(db) +verifyTableProperties(tableDefinition) + +if (tableDefinition.provider == Some("hive") || tableDefinition.tableType == VIEW) { + client.createTable(tableDefinition, ignoreIfExists) +} else { + val tableProperties = tableMetadataToProperties(tableDefinition) + + def newSparkSQLSpecificMetastoreTable(): CatalogTable = { +tableDefinition.copy( + schema = new StructType, + partitionColumnNames = Nil, + bucketSpec = None, + properties = tableDefinition.properties ++ tableProperties) + } + + def newHiveCompatibleMetastoreTable(serde: HiveSerDe, path: String): CatalogTable = { +tableDefinition.copy( + storage = tableDefinition.storage.copy( +locationUri = Some(new Path(path).toUri.toString), +inputFormat = serde.inputFormat, +outputFormat = serde.outputFormat, +serde = serde.serde + ), + properties = tableDefinition.properties ++ tableProperties) + } + + val qualifiedTableName = tableDefinition.identifier.quotedString + val maybeSerde = HiveSerDe.sourceToSerDe(tableDefinition.provider.get) + val maybePath = new CaseInsensitiveMap(tableDefinition.storage.properties).get("path") + val skipHiveMetadata = tableDefinition.storage.properties +.getOrElse("skipHiveMetadata", "false").toBoolean + + val (hiveCompatibleTable, logMessage) = (maybeSerde, maybePath) match { --- End diff -- uh, if `maybeSerde` is true, it must be `HadoopFsRelation`. Then, do we need to update the following case? https://github.com/cloud-fan/spark/blob/96d57b665ac65750eb5c6f9757e5827ea9c14ca4/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala#L236-L240 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14155: [SPARK-16498][SQL] move hive hack for data source...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r75254588 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala --- @@ -144,16 +161,147 @@ private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configu assert(tableDefinition.identifier.database.isDefined) val db = tableDefinition.identifier.database.get requireDbExists(db) +verifyTableProperties(tableDefinition) + +if (tableDefinition.provider == Some("hive") || tableDefinition.tableType == VIEW) { + client.createTable(tableDefinition, ignoreIfExists) +} else { + val tableProperties = tableMetadataToProperties(tableDefinition) + + def newSparkSQLSpecificMetastoreTable(): CatalogTable = { +tableDefinition.copy( + schema = new StructType, + partitionColumnNames = Nil, + bucketSpec = None, + properties = tableDefinition.properties ++ tableProperties) + } + + def newHiveCompatibleMetastoreTable(serde: HiveSerDe, path: String): CatalogTable = { +tableDefinition.copy( + storage = tableDefinition.storage.copy( +locationUri = Some(new Path(path).toUri.toString), +inputFormat = serde.inputFormat, +outputFormat = serde.outputFormat, +serde = serde.serde + ), + properties = tableDefinition.properties ++ tableProperties) + } + + val qualifiedTableName = tableDefinition.identifier.quotedString + val maybeSerde = HiveSerDe.sourceToSerDe(tableDefinition.provider.get) + val maybePath = new CaseInsensitiveMap(tableDefinition.storage.properties).get("path") + val skipHiveMetadata = tableDefinition.storage.properties +.getOrElse("skipHiveMetadata", "false").toBoolean + + val (hiveCompatibleTable, logMessage) = (maybeSerde, maybePath) match { --- End diff -- > Then, we used the generated BaseRelation to find whether this is a hiveCompatibleTable. If this is not a HadoopFsRelation, hiveCompatibleTable will be None. No, previously we use the `maybeSerde` and `BaseRelation` to decide, so the `HadoopFsRelation` check is already done in `maybeSerde`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r75253703 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala --- @@ -144,16 +161,147 @@ private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configu assert(tableDefinition.identifier.database.isDefined) val db = tableDefinition.identifier.database.get requireDbExists(db) +verifyTableProperties(tableDefinition) + +if (tableDefinition.provider == Some("hive") || tableDefinition.tableType == VIEW) { + client.createTable(tableDefinition, ignoreIfExists) +} else { + val tableProperties = tableMetadataToProperties(tableDefinition) + + def newSparkSQLSpecificMetastoreTable(): CatalogTable = { +tableDefinition.copy( + schema = new StructType, + partitionColumnNames = Nil, + bucketSpec = None, + properties = tableDefinition.properties ++ tableProperties) + } + + def newHiveCompatibleMetastoreTable(serde: HiveSerDe, path: String): CatalogTable = { +tableDefinition.copy( + storage = tableDefinition.storage.copy( +locationUri = Some(new Path(path).toUri.toString), +inputFormat = serde.inputFormat, +outputFormat = serde.outputFormat, +serde = serde.serde + ), + properties = tableDefinition.properties ++ tableProperties) + } + + val qualifiedTableName = tableDefinition.identifier.quotedString + val maybeSerde = HiveSerDe.sourceToSerDe(tableDefinition.provider.get) + val maybePath = new CaseInsensitiveMap(tableDefinition.storage.properties).get("path") + val skipHiveMetadata = tableDefinition.storage.properties +.getOrElse("skipHiveMetadata", "false").toBoolean + + val (hiveCompatibleTable, logMessage) = (maybeSerde, maybePath) match { --- End diff -- Previously, we create a DataSource and resolve it by calling `dataSource.resolveRelation`. (FYI, the `resolveRelation` consumes user-specified `options`.) Then, we used the generated `BaseRelation` to find whether this is a `hiveCompatibleTable`. If this is not a `HadoopFsRelation`, `hiveCompatibleTable` will be None. Now, the decision is based on whether the user-specified options has a `path` property or not. This is not always true. Right? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r75251612 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala --- @@ -97,16 +92,17 @@ case class CreateDataSourceTableCommand( } } -CreateDataSourceTableUtils.createDataSourceTable( - sparkSession = sparkSession, - tableIdent = tableIdent, +val table = CatalogTable( + identifier = tableIdent, + tableType = if (isExternal) CatalogTableType.EXTERNAL else CatalogTableType.MANAGED, + storage = CatalogStorageFormat.empty.copy(properties = optionsWithPath), schema = dataSource.schema, - partitionColumns = partitionColumns, - bucketSpec = bucketSpec, - provider = provider, - options = optionsWithPath, --- End diff -- nvm, it sounds like the `write` API is just called by CTAS and the save API of DataFrameWriter. It is OK. Let me read it again and check if we might have an issue for `options` in the CREATE Data Source Table command. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r75251290 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala --- @@ -97,16 +92,17 @@ case class CreateDataSourceTableCommand( } } -CreateDataSourceTableUtils.createDataSourceTable( - sparkSession = sparkSession, - tableIdent = tableIdent, +val table = CatalogTable( + identifier = tableIdent, + tableType = if (isExternal) CatalogTableType.EXTERNAL else CatalogTableType.MANAGED, + storage = CatalogStorageFormat.empty.copy(properties = optionsWithPath), schema = dataSource.schema, - partitionColumns = partitionColumns, - bucketSpec = bucketSpec, - provider = provider, - options = optionsWithPath, --- End diff -- @cloud-fan How about the write path? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r75251168 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala --- @@ -97,16 +92,17 @@ case class CreateDataSourceTableCommand( } } -CreateDataSourceTableUtils.createDataSourceTable( - sparkSession = sparkSession, - tableIdent = tableIdent, +val table = CatalogTable( + identifier = tableIdent, + tableType = if (isExternal) CatalogTableType.EXTERNAL else CatalogTableType.MANAGED, + storage = CatalogStorageFormat.empty.copy(properties = optionsWithPath), schema = dataSource.schema, - partitionColumns = partitionColumns, - bucketSpec = bucketSpec, - provider = provider, - options = optionsWithPath, --- End diff -- That line is just putting the `options` in the storage properties. It works for `path`, but the external data source connectors might [pass some parameters into `createRelation`](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala#L443). I think `option` is a critical parameter-passing channel for the external data source connectors. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
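To illustrate the kind of connector being described, here is a minimal, hypothetical `RelationProvider` (class name and option keys are made up): everything it needs arrives through the `parameters` map rather than through a file path, which is why the options must survive the round trip through the catalog.

```scala
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.{BaseRelation, RelationProvider}
import org.apache.spark.sql.types.StructType

// Hypothetical connector sketch: reads all of its configuration from the options map.
class ExampleJdbcLikeProvider extends RelationProvider {
  override def createRelation(
      ctx: SQLContext,
      parameters: Map[String, String]): BaseRelation = {
    // These options are exactly what Spark must hand back when the table is read again.
    val url = parameters.getOrElse("url", sys.error("missing 'url' option"))
    val table = parameters.getOrElse("dbtable", sys.error("missing 'dbtable' option"))
    new BaseRelation {
      override def sqlContext: SQLContext = ctx
      // A real connector would derive the schema from `url` and `table`.
      override def schema: StructType = StructType(Nil)
    }
  }
}
```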
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r75250996 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala --- @@ -97,16 +92,17 @@ case class CreateDataSourceTableCommand( } } -CreateDataSourceTableUtils.createDataSourceTable( - sparkSession = sparkSession, - tableIdent = tableIdent, +val table = CatalogTable( + identifier = tableIdent, + tableType = if (isExternal) CatalogTableType.EXTERNAL else CatalogTableType.MANAGED, + storage = CatalogStorageFormat.empty.copy(properties = optionsWithPath), schema = dataSource.schema, - partitionColumns = partitionColumns, - bucketSpec = bucketSpec, - provider = provider, - options = optionsWithPath, --- End diff -- I put the options in `CatalogStorageFormat.properties`, and when the table is read back, we will get the storage.properties as the data source options for create relation, see https://github.com/apache/spark/pull/14155/files#diff-d99813bd5bbc18277e4090475e4944cfR214 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
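In other words, the options round-trip through the catalog looks roughly like this (a sketch based on the shapes shown in the diffs in this thread; package locations are assumed for the Spark version under review):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable}
import org.apache.spark.sql.execution.datasources.DataSource

// On create: the user's options (including "path") become the storage properties.
def storageForCreate(optionsWithPath: Map[String, String]): CatalogStorageFormat =
  CatalogStorageFormat.empty.copy(properties = optionsWithPath)

// On read: the storage properties are handed back to DataSource as the options
// used to create the relation.
def dataSourceForRead(spark: SparkSession, table: CatalogTable): DataSource =
  DataSource(
    sparkSession = spark,
    className = table.provider.get,
    options = table.storage.properties)
```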
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r75250883 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala --- @@ -97,16 +92,17 @@ case class CreateDataSourceTableCommand( } } -CreateDataSourceTableUtils.createDataSourceTable( - sparkSession = sparkSession, - tableIdent = tableIdent, +val table = CatalogTable( + identifier = tableIdent, + tableType = if (isExternal) CatalogTableType.EXTERNAL else CatalogTableType.MANAGED, + storage = CatalogStorageFormat.empty.copy(properties = optionsWithPath), schema = dataSource.schema, - partitionColumns = partitionColumns, - bucketSpec = bucketSpec, - provider = provider, - options = optionsWithPath, --- End diff -- Is it different from what we do at line https://github.com/apache/spark/pull/14155/files/96d57b665ac65750eb5c6f9757e5827ea9c14ca4#diff-945e51801b84b92da242fcb42f83f5f5R98? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r75250702 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala --- @@ -97,16 +92,17 @@ case class CreateDataSourceTableCommand( } } -CreateDataSourceTableUtils.createDataSourceTable( - sparkSession = sparkSession, - tableIdent = tableIdent, +val table = CatalogTable( + identifier = tableIdent, + tableType = if (isExternal) CatalogTableType.EXTERNAL else CatalogTableType.MANAGED, + storage = CatalogStorageFormat.empty.copy(properties = optionsWithPath), schema = dataSource.schema, - partitionColumns = partitionColumns, - bucketSpec = bucketSpec, - provider = provider, - options = optionsWithPath, --- End diff -- It sounds like we are not following the previous behaviors. `options` might be consumed by the external Data Source implementors. `options` is not only used for specifying `path`, but also used for a channel to pass extra parameters to the data source. I checked the existing implementation of `createDataSourceTable`. We [pass the original `options` into the constructor of `DataSource`](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala#L336). Then, [the `write` API will pass the `option` to `createRelation`](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala#L443). How about adding it as an independent field in `CatalogTable`? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
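To make the alternative concrete, the suggestion is roughly the following shape (purely hypothetical; it is not what this PR does): keep the user-supplied options as their own field instead of folding them into the storage properties.

```scala
import org.apache.spark.sql.catalyst.TableIdentifier

// Hypothetical sketch of a catalog entry with a dedicated options field.
case class CatalogTableSketch(
    identifier: TableIdentifier,
    provider: Option[String],
    properties: Map[String, String] = Map.empty, // plain table properties
    options: Map[String, String] = Map.empty)    // data source options, passed to createRelation as-is
```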
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r75233753 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala --- @@ -200,22 +348,73 @@ private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configu * Alter a table whose name that matches the one specified in `tableDefinition`, * assuming the table exists. * - * Note: As of now, this only supports altering table properties, serde properties, - * and num buckets! + * Note: As of now, this only supports altering table properties and serde properties. */ override def alterTable(tableDefinition: CatalogTable): Unit = withClient { assert(tableDefinition.identifier.database.isDefined) val db = tableDefinition.identifier.database.get requireTableExists(db, tableDefinition.identifier.table) -client.alterTable(tableDefinition) +verifyTableProperties(tableDefinition) + +if (tableDefinition.provider == Some("hive") || tableDefinition.tableType == VIEW) { + client.alterTable(tableDefinition) +} else { + val oldDef = client.getTable(db, tableDefinition.identifier.table) + // Sets the `schema`, `partitionColumnNames` and `bucketSpec` from the old table definition, + // to retain the spark specific format if it is. + // Also add table meta properties to table properties, to retain the data source table format. + val newDef = tableDefinition.copy( +schema = oldDef.schema, +partitionColumnNames = oldDef.partitionColumnNames, +bucketSpec = oldDef.bucketSpec, +properties = tableMetadataToProperties(tableDefinition) ++ tableDefinition.properties) + + client.alterTable(newDef) +} } override def getTable(db: String, table: String): CatalogTable = withClient { -client.getTable(db, table) +restoreTableMetadata(client.getTable(db, table)) } override def getTableOption(db: String, table: String): Option[CatalogTable] = withClient { -client.getTableOption(db, table) +client.getTableOption(db, table).map(restoreTableMetadata) + } + + /** + * Restores table metadata from the table properties if it's a datasouce table. This method is + * kind of a opposite version of [[createTable]]. + */ + private def restoreTableMetadata(table: CatalogTable): CatalogTable = { --- End diff -- add doc and list what this method do. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r75233379 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala --- @@ -144,16 +161,147 @@ private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configu assert(tableDefinition.identifier.database.isDefined) val db = tableDefinition.identifier.database.get requireDbExists(db) +verifyTableProperties(tableDefinition) + +if (tableDefinition.provider == Some("hive") || tableDefinition.tableType == VIEW) { + client.createTable(tableDefinition, ignoreIfExists) +} else { + val tableProperties = tableMetadataToProperties(tableDefinition) + + def newSparkSQLSpecificMetastoreTable(): CatalogTable = { +tableDefinition.copy( + schema = new StructType, + partitionColumnNames = Nil, + bucketSpec = None, + properties = tableDefinition.properties ++ tableProperties) + } + + def newHiveCompatibleMetastoreTable(serde: HiveSerDe, path: String): CatalogTable = { +tableDefinition.copy( + storage = tableDefinition.storage.copy( +locationUri = Some(new Path(path).toUri.toString), +inputFormat = serde.inputFormat, +outputFormat = serde.outputFormat, +serde = serde.serde + ), + properties = tableDefinition.properties ++ tableProperties) + } + + val qualifiedTableName = tableDefinition.identifier.quotedString + val maybeSerde = HiveSerDe.sourceToSerDe(tableDefinition.provider.get) + val maybePath = new CaseInsensitiveMap(tableDefinition.storage.properties).get("path") + val skipHiveMetadata = tableDefinition.storage.properties +.getOrElse("skipHiveMetadata", "false").toBoolean + + val (hiveCompatibleTable, logMessage) = (maybeSerde, maybePath) match { --- End diff -- The previous code created a DataSource and resolved it, just to get the paths for this data source. We can get the path from the data source options directly. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r75232851 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala --- @@ -81,6 +86,18 @@ private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configu withClient { getTable(db, table) } } + /** + * If the given table properties contains datasource properties, throw an exception. + */ --- End diff -- Let's make it clear that this method is used when we want to write metadata to metastore. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r75066432 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala --- @@ -144,16 +161,147 @@ private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configu assert(tableDefinition.identifier.database.isDefined) val db = tableDefinition.identifier.database.get requireDbExists(db) +verifyTableProperties(tableDefinition) + +if (tableDefinition.provider == Some("hive") || tableDefinition.tableType == VIEW) { + client.createTable(tableDefinition, ignoreIfExists) +} else { + val tableProperties = tableMetadataToProperties(tableDefinition) + + def newSparkSQLSpecificMetastoreTable(): CatalogTable = { +tableDefinition.copy( + schema = new StructType, + partitionColumnNames = Nil, + bucketSpec = None, + properties = tableDefinition.properties ++ tableProperties) + } + + def newHiveCompatibleMetastoreTable(serde: HiveSerDe, path: String): CatalogTable = { +tableDefinition.copy( + storage = tableDefinition.storage.copy( +locationUri = Some(new Path(path).toUri.toString), +inputFormat = serde.inputFormat, +outputFormat = serde.outputFormat, +serde = serde.serde + ), + properties = tableDefinition.properties ++ tableProperties) + } + + val qualifiedTableName = tableDefinition.identifier.quotedString + val maybeSerde = HiveSerDe.sourceToSerDe(tableDefinition.provider.get) + val maybePath = new CaseInsensitiveMap(tableDefinition.storage.properties).get("path") + val skipHiveMetadata = tableDefinition.storage.properties +.getOrElse("skipHiveMetadata", "false").toBoolean + + val (hiveCompatibleTable, logMessage) = (maybeSerde, maybePath) match { +case _ if skipHiveMetadata => + val message = +s"Persisting data source table $qualifiedTableName into Hive metastore in" + + "Spark SQL specific format, which is NOT compatible with Hive." + (None, message) + +// our bucketing is un-compatible with hive(different hash function) +case _ if tableDefinition.bucketSpec.nonEmpty => + val message = +s"Persisting bucketed data source table $qualifiedTableName into " + + "Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. " + (None, message) + +case (Some(serde), Some(path)) => + val message = +s"Persisting data source table $qualifiedTableName with a single input path " + + s"into Hive metastore in Hive compatible format." + (Some(newHiveCompatibleMetastoreTable(serde, path)), message) + +case (Some(_), None) => + val message = +s"Data source table $qualifiedTableName is not file based. Persisting it into " + + s"Hive metastore in Spark SQL specific format, which is NOT compatible with Hive." + (None, message) + +case _ => + val provider = tableDefinition.provider.get + val message = +s"Couldn't find corresponding Hive SerDe for data source provider $provider. " + + s"Persisting data source table $qualifiedTableName into Hive metastore in " + + s"Spark SQL specific format, which is NOT compatible with Hive." + (None, message) + } + + (hiveCompatibleTable, logMessage) match { +case (Some(table), message) => + // We first try to save the metadata of the table in a Hive compatible way. + // If Hive throws an error, we fall back to save its metadata in the Spark SQL + // specific way. + try { +logInfo(message) +saveTableIntoHive(table, ignoreIfExists) + } catch { +case NonFatal(e) => + val warningMessage = +s"Could not persist ${tableDefinition.identifier.quotedString} in a Hive " + + "compatible way. 
Persisting it into Hive metastore in Spark SQL specific format." + logWarning(warningMessage, e) + saveTableIntoHive(newSparkSQLSpecificMetastoreTable(), ignoreIfExists) + } + +case (None, message) => + logWarning(message) + saveTableIntoHive(newSparkSQLSpecificMetastoreTable(), ignoreIfExists) + } +} + } + + private def
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r75066449 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala --- @@ -144,16 +161,147 @@ private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configu assert(tableDefinition.identifier.database.isDefined) val db = tableDefinition.identifier.database.get requireDbExists(db) +verifyTableProperties(tableDefinition) + +if (tableDefinition.provider == Some("hive") || tableDefinition.tableType == VIEW) { + client.createTable(tableDefinition, ignoreIfExists) +} else { + val tableProperties = tableMetadataToProperties(tableDefinition) + + def newSparkSQLSpecificMetastoreTable(): CatalogTable = { +tableDefinition.copy( + schema = new StructType, + partitionColumnNames = Nil, + bucketSpec = None, + properties = tableDefinition.properties ++ tableProperties) + } + + def newHiveCompatibleMetastoreTable(serde: HiveSerDe, path: String): CatalogTable = { +tableDefinition.copy( + storage = tableDefinition.storage.copy( +locationUri = Some(new Path(path).toUri.toString), +inputFormat = serde.inputFormat, +outputFormat = serde.outputFormat, +serde = serde.serde + ), + properties = tableDefinition.properties ++ tableProperties) + } + + val qualifiedTableName = tableDefinition.identifier.quotedString + val maybeSerde = HiveSerDe.sourceToSerDe(tableDefinition.provider.get) + val maybePath = new CaseInsensitiveMap(tableDefinition.storage.properties).get("path") --- End diff -- yup --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r75066364 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala --- @@ -200,22 +348,73 @@ private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configu * Alter a table whose name that matches the one specified in `tableDefinition`, * assuming the table exists. * - * Note: As of now, this only supports altering table properties, serde properties, - * and num buckets! + * Note: As of now, this only supports altering table properties and serde properties. */ override def alterTable(tableDefinition: CatalogTable): Unit = withClient { assert(tableDefinition.identifier.database.isDefined) val db = tableDefinition.identifier.database.get requireTableExists(db, tableDefinition.identifier.table) -client.alterTable(tableDefinition) +verifyTableProperties(tableDefinition) + +if (tableDefinition.provider == Some("hive") || tableDefinition.tableType == VIEW) { + client.alterTable(tableDefinition) +} else { + val oldDef = client.getTable(db, tableDefinition.identifier.table) + // Sets the `schema`, `partitionColumnNames` and `bucketSpec` from the old table definition, + // to retain the spark specific format if it is. + // Also add table meta properties to table properties, to retain the data source table format. + val newDef = tableDefinition.copy( +schema = oldDef.schema, +partitionColumnNames = oldDef.partitionColumnNames, +bucketSpec = oldDef.bucketSpec, +properties = tableMetadataToProperties(tableDefinition) ++ tableDefinition.properties) --- End diff -- The comment in this method says we only support alter table properties and storage format. Maybe we should add an assert to make this assumption clear? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
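One possible form of the suggested assertion, sketched as a standalone helper (hypothetical, not code from the PR):

```scala
import org.apache.spark.sql.catalyst.catalog.CatalogTable

// Hypothetical: fail fast if a caller tries to alter anything beyond table/storage properties.
def assertOnlyPropertiesChanged(newDef: CatalogTable, oldDef: CatalogTable): Unit = {
  assert(
    newDef.schema == oldDef.schema &&
      newDef.partitionColumnNames == oldDef.partitionColumnNames &&
      newDef.bucketSpec == oldDef.bucketSpec,
    s"alterTable on ${newDef.identifier} only supports changing table properties " +
      "and serde/storage properties for data source tables")
}
```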
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r75066296 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala --- @@ -200,22 +348,73 @@ private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configu * Alter a table whose name that matches the one specified in `tableDefinition`, * assuming the table exists. * - * Note: As of now, this only supports altering table properties, serde properties, - * and num buckets! + * Note: As of now, this only supports altering table properties and serde properties. */ override def alterTable(tableDefinition: CatalogTable): Unit = withClient { assert(tableDefinition.identifier.database.isDefined) val db = tableDefinition.identifier.database.get requireTableExists(db, tableDefinition.identifier.table) -client.alterTable(tableDefinition) +verifyTableProperties(tableDefinition) + +if (tableDefinition.provider == Some("hive") || tableDefinition.tableType == VIEW) { + client.alterTable(tableDefinition) +} else { + val oldDef = client.getTable(db, tableDefinition.identifier.table) + // Sets the `schema`, `partitionColumnNames` and `bucketSpec` from the old table definition, + // to retain the spark specific format if it is. + // Also add table meta properties to table properties, to retain the data source table format. + val newDef = tableDefinition.copy( +schema = oldDef.schema, +partitionColumnNames = oldDef.partitionColumnNames, +bucketSpec = oldDef.bucketSpec, +properties = tableMetadataToProperties(tableDefinition) ++ tableDefinition.properties) + + client.alterTable(newDef) +} } override def getTable(db: String, table: String): CatalogTable = withClient { -client.getTable(db, table) +restoreTableMetadata(client.getTable(db, table)) } override def getTableOption(db: String, table: String): Option[CatalogTable] = withClient { -client.getTableOption(db, table) +client.getTableOption(db, table).map(restoreTableMetadata) + } + + /** + * Restores table metadata from the table properties if it's a datasouce table. This method is + * kind of a opposite version of [[createTable]]. + */ + private def restoreTableMetadata(table: CatalogTable): CatalogTable = { +if (table.tableType == VIEW) { + table +} else { + getProviderFromTableProperties(table).map { provider => --- End diff -- no, for `hive` we won't go to the `table meta to properties` branch, thus if we have provider in table properties, it must not be hive. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
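For reference, the lookup in question boils down to reading the provider key that `createTable` writes into the table properties. A sketch (the key name follows the `spark.sql.sources.*` convention used for these properties; not the exact helper from the PR):

```scala
import org.apache.spark.sql.catalyst.catalog.CatalogTable

// Hive-serde tables never go through the "table metadata to properties" branch,
// so this key is only present for non-"hive" providers.
def getProviderFromTableProperties(table: CatalogTable): Option[String] =
  table.properties.get("spark.sql.sources.provider")
```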
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r75066185 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala --- @@ -81,6 +86,18 @@ private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configu withClient { getTable(db, table) } } + /** + * If the given table properties contains datasource properties, throw an exception. + */ + private def verifyTableProperties(table: CatalogTable): Unit = { +val datasourceKeys = table.properties.keys.filter(_.startsWith(DATASOURCE_PREFIX)) +if (datasourceKeys.nonEmpty) { + throw new AnalysisException(s"Cannot persistent ${table.qualifiedName} into hive metastore " + +s"as table property keys may not start with '$DATASOURCE_PREFIX': " + +datasourceKeys.mkString("[", ", ", "]")) +} + } --- End diff -- yea, e.g. `CreateTableLike`. But `restoreTableMetadata` generates table without the data source table properties, and should be ok here. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r75064724 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala --- @@ -81,6 +86,18 @@ private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configu withClient { getTable(db, table) } } + /** + * If the given table properties contains datasource properties, throw an exception. + */ + private def verifyTableProperties(table: CatalogTable): Unit = { +val datasourceKeys = table.properties.keys.filter(_.startsWith(DATASOURCE_PREFIX)) +if (datasourceKeys.nonEmpty) { + throw new AnalysisException(s"Cannot persistent ${table.qualifiedName} into hive metastore " + +s"as table property keys may not start with '$DATASOURCE_PREFIX': " + +datasourceKeys.mkString("[", ", ", "]")) +} + } --- End diff -- Just realized one thing. Is it possible that we somehow create `table` based on a `CatalogTable` generated from `restoreTableMetadata`? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r75064560 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala --- @@ -200,22 +348,73 @@ private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configu * Alter a table whose name that matches the one specified in `tableDefinition`, * assuming the table exists. * - * Note: As of now, this only supports altering table properties, serde properties, - * and num buckets! + * Note: As of now, this only supports altering table properties and serde properties. */ override def alterTable(tableDefinition: CatalogTable): Unit = withClient { assert(tableDefinition.identifier.database.isDefined) val db = tableDefinition.identifier.database.get requireTableExists(db, tableDefinition.identifier.table) -client.alterTable(tableDefinition) +verifyTableProperties(tableDefinition) + +if (tableDefinition.provider == Some("hive") || tableDefinition.tableType == VIEW) { + client.alterTable(tableDefinition) +} else { + val oldDef = client.getTable(db, tableDefinition.identifier.table) + // Sets the `schema`, `partitionColumnNames` and `bucketSpec` from the old table definition, + // to retain the spark specific format if it is. + // Also add table meta properties to table properties, to retain the data source table format. + val newDef = tableDefinition.copy( +schema = oldDef.schema, +partitionColumnNames = oldDef.partitionColumnNames, +bucketSpec = oldDef.bucketSpec, +properties = tableMetadataToProperties(tableDefinition) ++ tableDefinition.properties) + + client.alterTable(newDef) +} } override def getTable(db: String, table: String): CatalogTable = withClient { -client.getTable(db, table) +restoreTableMetadata(client.getTable(db, table)) } override def getTableOption(db: String, table: String): Option[CatalogTable] = withClient { -client.getTableOption(db, table) +client.getTableOption(db, table).map(restoreTableMetadata) + } + + /** + * Restores table metadata from the table properties if it's a datasouce table. This method is + * kind of a opposite version of [[createTable]]. + */ + private def restoreTableMetadata(table: CatalogTable): CatalogTable = { +if (table.tableType == VIEW) { + table +} else { + getProviderFromTableProperties(table).map { provider => --- End diff -- `provider` can be `hive`, right? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r75064362 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala --- @@ -200,22 +348,73 @@ private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configu * Alter a table whose name that matches the one specified in `tableDefinition`, * assuming the table exists. * - * Note: As of now, this only supports altering table properties, serde properties, - * and num buckets! + * Note: As of now, this only supports altering table properties and serde properties. */ override def alterTable(tableDefinition: CatalogTable): Unit = withClient { assert(tableDefinition.identifier.database.isDefined) val db = tableDefinition.identifier.database.get requireTableExists(db, tableDefinition.identifier.table) -client.alterTable(tableDefinition) +verifyTableProperties(tableDefinition) + +if (tableDefinition.provider == Some("hive") || tableDefinition.tableType == VIEW) { + client.alterTable(tableDefinition) +} else { + val oldDef = client.getTable(db, tableDefinition.identifier.table) + // Sets the `schema`, `partitionColumnNames` and `bucketSpec` from the old table definition, + // to retain the spark specific format if it is. + // Also add table meta properties to table properties, to retain the data source table format. + val newDef = tableDefinition.copy( +schema = oldDef.schema, +partitionColumnNames = oldDef.partitionColumnNames, +bucketSpec = oldDef.bucketSpec, +properties = tableMetadataToProperties(tableDefinition) ++ tableDefinition.properties) --- End diff -- If we only look at this method, it is not clear if the new `tableDefinition` changes other fields like `storage`. Also, we are using the existing `bucketSpec`. But, is it possible that we have a new `bucketSpec` in `tableDefinition`? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r75063920 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala --- @@ -144,16 +161,147 @@ private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configu assert(tableDefinition.identifier.database.isDefined) val db = tableDefinition.identifier.database.get requireDbExists(db) +verifyTableProperties(tableDefinition) + +if (tableDefinition.provider == Some("hive") || tableDefinition.tableType == VIEW) { + client.createTable(tableDefinition, ignoreIfExists) +} else { + val tableProperties = tableMetadataToProperties(tableDefinition) + + def newSparkSQLSpecificMetastoreTable(): CatalogTable = { +tableDefinition.copy( + schema = new StructType, + partitionColumnNames = Nil, + bucketSpec = None, + properties = tableDefinition.properties ++ tableProperties) + } + + def newHiveCompatibleMetastoreTable(serde: HiveSerDe, path: String): CatalogTable = { +tableDefinition.copy( + storage = tableDefinition.storage.copy( +locationUri = Some(new Path(path).toUri.toString), +inputFormat = serde.inputFormat, +outputFormat = serde.outputFormat, +serde = serde.serde + ), + properties = tableDefinition.properties ++ tableProperties) + } + + val qualifiedTableName = tableDefinition.identifier.quotedString + val maybeSerde = HiveSerDe.sourceToSerDe(tableDefinition.provider.get) + val maybePath = new CaseInsensitiveMap(tableDefinition.storage.properties).get("path") + val skipHiveMetadata = tableDefinition.storage.properties +.getOrElse("skipHiveMetadata", "false").toBoolean + + val (hiveCompatibleTable, logMessage) = (maybeSerde, maybePath) match { +case _ if skipHiveMetadata => + val message = +s"Persisting data source table $qualifiedTableName into Hive metastore in" + + "Spark SQL specific format, which is NOT compatible with Hive." + (None, message) + +// our bucketing is un-compatible with hive(different hash function) +case _ if tableDefinition.bucketSpec.nonEmpty => + val message = +s"Persisting bucketed data source table $qualifiedTableName into " + + "Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. " + (None, message) + +case (Some(serde), Some(path)) => + val message = +s"Persisting data source table $qualifiedTableName with a single input path " + + s"into Hive metastore in Hive compatible format." + (Some(newHiveCompatibleMetastoreTable(serde, path)), message) + +case (Some(_), None) => + val message = +s"Data source table $qualifiedTableName is not file based. Persisting it into " + + s"Hive metastore in Spark SQL specific format, which is NOT compatible with Hive." + (None, message) + +case _ => + val provider = tableDefinition.provider.get + val message = +s"Couldn't find corresponding Hive SerDe for data source provider $provider. " + + s"Persisting data source table $qualifiedTableName into Hive metastore in " + + s"Spark SQL specific format, which is NOT compatible with Hive." + (None, message) + } + + (hiveCompatibleTable, logMessage) match { +case (Some(table), message) => + // We first try to save the metadata of the table in a Hive compatible way. + // If Hive throws an error, we fall back to save its metadata in the Spark SQL + // specific way. + try { +logInfo(message) +saveTableIntoHive(table, ignoreIfExists) + } catch { +case NonFatal(e) => + val warningMessage = +s"Could not persist ${tableDefinition.identifier.quotedString} in a Hive " + + "compatible way. 
Persisting it into Hive metastore in Spark SQL specific format." + logWarning(warningMessage, e) + saveTableIntoHive(newSparkSQLSpecificMetastoreTable(), ignoreIfExists) + } + +case (None, message) => + logWarning(message) + saveTableIntoHive(newSparkSQLSpecificMetastoreTable(), ignoreIfExists) + } +} + } + + private def
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r75063736 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala --- @@ -144,16 +161,147 @@ private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configu assert(tableDefinition.identifier.database.isDefined) val db = tableDefinition.identifier.database.get requireDbExists(db) +verifyTableProperties(tableDefinition) + +if (tableDefinition.provider == Some("hive") || tableDefinition.tableType == VIEW) { + client.createTable(tableDefinition, ignoreIfExists) +} else { + val tableProperties = tableMetadataToProperties(tableDefinition) + + def newSparkSQLSpecificMetastoreTable(): CatalogTable = { +tableDefinition.copy( + schema = new StructType, + partitionColumnNames = Nil, + bucketSpec = None, + properties = tableDefinition.properties ++ tableProperties) + } + + def newHiveCompatibleMetastoreTable(serde: HiveSerDe, path: String): CatalogTable = { +tableDefinition.copy( + storage = tableDefinition.storage.copy( +locationUri = Some(new Path(path).toUri.toString), +inputFormat = serde.inputFormat, +outputFormat = serde.outputFormat, +serde = serde.serde + ), + properties = tableDefinition.properties ++ tableProperties) + } + + val qualifiedTableName = tableDefinition.identifier.quotedString + val maybeSerde = HiveSerDe.sourceToSerDe(tableDefinition.provider.get) + val maybePath = new CaseInsensitiveMap(tableDefinition.storage.properties).get("path") + val skipHiveMetadata = tableDefinition.storage.properties +.getOrElse("skipHiveMetadata", "false").toBoolean + + val (hiveCompatibleTable, logMessage) = (maybeSerde, maybePath) match { +case _ if skipHiveMetadata => + val message = +s"Persisting data source table $qualifiedTableName into Hive metastore in" + + "Spark SQL specific format, which is NOT compatible with Hive." + (None, message) + +// our bucketing is un-compatible with hive(different hash function) +case _ if tableDefinition.bucketSpec.nonEmpty => + val message = +s"Persisting bucketed data source table $qualifiedTableName into " + + "Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. " + (None, message) + +case (Some(serde), Some(path)) => + val message = +s"Persisting data source table $qualifiedTableName with a single input path " + + s"into Hive metastore in Hive compatible format." + (Some(newHiveCompatibleMetastoreTable(serde, path)), message) + +case (Some(_), None) => + val message = +s"Data source table $qualifiedTableName is not file based. Persisting it into " + + s"Hive metastore in Spark SQL specific format, which is NOT compatible with Hive." + (None, message) + +case _ => + val provider = tableDefinition.provider.get + val message = +s"Couldn't find corresponding Hive SerDe for data source provider $provider. " + + s"Persisting data source table $qualifiedTableName into Hive metastore in " + + s"Spark SQL specific format, which is NOT compatible with Hive." + (None, message) + } + + (hiveCompatibleTable, logMessage) match { +case (Some(table), message) => + // We first try to save the metadata of the table in a Hive compatible way. + // If Hive throws an error, we fall back to save its metadata in the Spark SQL + // specific way. + try { +logInfo(message) +saveTableIntoHive(table, ignoreIfExists) + } catch { +case NonFatal(e) => + val warningMessage = +s"Could not persist ${tableDefinition.identifier.quotedString} in a Hive " + + "compatible way. 
Persisting it into Hive metastore in Spark SQL specific format." + logWarning(warningMessage, e) + saveTableIntoHive(newSparkSQLSpecificMetastoreTable(), ignoreIfExists) + } + +case (None, message) => + logWarning(message) + saveTableIntoHive(newSparkSQLSpecificMetastoreTable(), ignoreIfExists) + } +} + } + + private def
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r75063676 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala --- @@ -144,16 +161,147 @@ private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configu assert(tableDefinition.identifier.database.isDefined) val db = tableDefinition.identifier.database.get requireDbExists(db) +verifyTableProperties(tableDefinition) + +if (tableDefinition.provider == Some("hive") || tableDefinition.tableType == VIEW) { + client.createTable(tableDefinition, ignoreIfExists) +} else { + val tableProperties = tableMetadataToProperties(tableDefinition) + + def newSparkSQLSpecificMetastoreTable(): CatalogTable = { +tableDefinition.copy( + schema = new StructType, + partitionColumnNames = Nil, + bucketSpec = None, + properties = tableDefinition.properties ++ tableProperties) + } + + def newHiveCompatibleMetastoreTable(serde: HiveSerDe, path: String): CatalogTable = { +tableDefinition.copy( + storage = tableDefinition.storage.copy( +locationUri = Some(new Path(path).toUri.toString), +inputFormat = serde.inputFormat, +outputFormat = serde.outputFormat, +serde = serde.serde + ), + properties = tableDefinition.properties ++ tableProperties) + } + + val qualifiedTableName = tableDefinition.identifier.quotedString + val maybeSerde = HiveSerDe.sourceToSerDe(tableDefinition.provider.get) + val maybePath = new CaseInsensitiveMap(tableDefinition.storage.properties).get("path") + val skipHiveMetadata = tableDefinition.storage.properties +.getOrElse("skipHiveMetadata", "false").toBoolean + + val (hiveCompatibleTable, logMessage) = (maybeSerde, maybePath) match { +case _ if skipHiveMetadata => + val message = +s"Persisting data source table $qualifiedTableName into Hive metastore in" + + "Spark SQL specific format, which is NOT compatible with Hive." + (None, message) + +// our bucketing is un-compatible with hive(different hash function) +case _ if tableDefinition.bucketSpec.nonEmpty => + val message = +s"Persisting bucketed data source table $qualifiedTableName into " + + "Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. " + (None, message) + +case (Some(serde), Some(path)) => + val message = +s"Persisting data source table $qualifiedTableName with a single input path " + + s"into Hive metastore in Hive compatible format." + (Some(newHiveCompatibleMetastoreTable(serde, path)), message) + +case (Some(_), None) => + val message = +s"Data source table $qualifiedTableName is not file based. Persisting it into " + + s"Hive metastore in Spark SQL specific format, which is NOT compatible with Hive." + (None, message) + +case _ => + val provider = tableDefinition.provider.get + val message = +s"Couldn't find corresponding Hive SerDe for data source provider $provider. " + + s"Persisting data source table $qualifiedTableName into Hive metastore in " + + s"Spark SQL specific format, which is NOT compatible with Hive." + (None, message) + } + + (hiveCompatibleTable, logMessage) match { +case (Some(table), message) => + // We first try to save the metadata of the table in a Hive compatible way. + // If Hive throws an error, we fall back to save its metadata in the Spark SQL + // specific way. + try { +logInfo(message) +saveTableIntoHive(table, ignoreIfExists) + } catch { +case NonFatal(e) => + val warningMessage = +s"Could not persist ${tableDefinition.identifier.quotedString} in a Hive " + + "compatible way. 
Persisting it into Hive metastore in Spark SQL specific format." + logWarning(warningMessage, e) + saveTableIntoHive(newSparkSQLSpecificMetastoreTable(), ignoreIfExists) + } + +case (None, message) => + logWarning(message) + saveTableIntoHive(newSparkSQLSpecificMetastoreTable(), ignoreIfExists) + } +} + } + + private def
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r75063449 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala --- @@ -144,16 +161,147 @@ private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configu assert(tableDefinition.identifier.database.isDefined) val db = tableDefinition.identifier.database.get requireDbExists(db) +verifyTableProperties(tableDefinition) + +if (tableDefinition.provider == Some("hive") || tableDefinition.tableType == VIEW) { + client.createTable(tableDefinition, ignoreIfExists) +} else { + val tableProperties = tableMetadataToProperties(tableDefinition) + + def newSparkSQLSpecificMetastoreTable(): CatalogTable = { +tableDefinition.copy( + schema = new StructType, + partitionColumnNames = Nil, + bucketSpec = None, + properties = tableDefinition.properties ++ tableProperties) + } + + def newHiveCompatibleMetastoreTable(serde: HiveSerDe, path: String): CatalogTable = { +tableDefinition.copy( + storage = tableDefinition.storage.copy( +locationUri = Some(new Path(path).toUri.toString), +inputFormat = serde.inputFormat, +outputFormat = serde.outputFormat, +serde = serde.serde + ), + properties = tableDefinition.properties ++ tableProperties) + } + + val qualifiedTableName = tableDefinition.identifier.quotedString + val maybeSerde = HiveSerDe.sourceToSerDe(tableDefinition.provider.get) + val maybePath = new CaseInsensitiveMap(tableDefinition.storage.properties).get("path") --- End diff -- I think this path will be set by the ddl command (e.g. `CreateDataSourceTableAsSelectCommand`). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r75063156 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala --- @@ -144,16 +161,147 @@ private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configu assert(tableDefinition.identifier.database.isDefined) val db = tableDefinition.identifier.database.get requireDbExists(db) +verifyTableProperties(tableDefinition) + +if (tableDefinition.provider == Some("hive") || tableDefinition.tableType == VIEW) { + client.createTable(tableDefinition, ignoreIfExists) +} else { + val tableProperties = tableMetadataToProperties(tableDefinition) + + def newSparkSQLSpecificMetastoreTable(): CatalogTable = { +tableDefinition.copy( + schema = new StructType, + partitionColumnNames = Nil, + bucketSpec = None, + properties = tableDefinition.properties ++ tableProperties) + } + + def newHiveCompatibleMetastoreTable(serde: HiveSerDe, path: String): CatalogTable = { +tableDefinition.copy( + storage = tableDefinition.storage.copy( +locationUri = Some(new Path(path).toUri.toString), +inputFormat = serde.inputFormat, +outputFormat = serde.outputFormat, +serde = serde.serde + ), + properties = tableDefinition.properties ++ tableProperties) + } + + val qualifiedTableName = tableDefinition.identifier.quotedString + val maybeSerde = HiveSerDe.sourceToSerDe(tableDefinition.provider.get) + val maybePath = new CaseInsensitiveMap(tableDefinition.storage.properties).get("path") --- End diff -- If the create table command does not specify the location, does this `maybePath` contain the default location? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r75063094 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala --- @@ -144,16 +161,147 @@ private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configu assert(tableDefinition.identifier.database.isDefined) val db = tableDefinition.identifier.database.get requireDbExists(db) +verifyTableProperties(tableDefinition) + +if (tableDefinition.provider == Some("hive") || tableDefinition.tableType == VIEW) { + client.createTable(tableDefinition, ignoreIfExists) +} else { + val tableProperties = tableMetadataToProperties(tableDefinition) --- End diff -- Let's explain what will be put into this `tableProperties`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r75062850 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala --- @@ -144,16 +161,147 @@ private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configu assert(tableDefinition.identifier.database.isDefined) val db = tableDefinition.identifier.database.get requireDbExists(db) +verifyTableProperties(tableDefinition) + +if (tableDefinition.provider == Some("hive") || tableDefinition.tableType == VIEW) { + client.createTable(tableDefinition, ignoreIfExists) +} else { + val tableProperties = tableMetadataToProperties(tableDefinition) + + def newSparkSQLSpecificMetastoreTable(): CatalogTable = { +tableDefinition.copy( + schema = new StructType, + partitionColumnNames = Nil, + bucketSpec = None, + properties = tableDefinition.properties ++ tableProperties) + } + + def newHiveCompatibleMetastoreTable(serde: HiveSerDe, path: String): CatalogTable = { --- End diff -- comment --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14155: [SPARK-16498][SQL] move hive hack for data source...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r75062846 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala --- @@ -144,16 +161,147 @@ private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configu assert(tableDefinition.identifier.database.isDefined) val db = tableDefinition.identifier.database.get requireDbExists(db) +verifyTableProperties(tableDefinition) + +if (tableDefinition.provider == Some("hive") || tableDefinition.tableType == VIEW) { + client.createTable(tableDefinition, ignoreIfExists) +} else { + val tableProperties = tableMetadataToProperties(tableDefinition) + + def newSparkSQLSpecificMetastoreTable(): CatalogTable = { --- End diff -- comment --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14155: [SPARK-16498][SQL] move hive hack for data source...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r75062743 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala --- @@ -144,16 +161,147 @@ private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configu assert(tableDefinition.identifier.database.isDefined) val db = tableDefinition.identifier.database.get requireDbExists(db) +verifyTableProperties(tableDefinition) + +if (tableDefinition.provider == Some("hive") || tableDefinition.tableType == VIEW) { + client.createTable(tableDefinition, ignoreIfExists) +} else { --- End diff -- Let's add comment to explain what we are doing at here. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14155: [SPARK-16498][SQL] move hive hack for data source...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r74852722 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala --- @@ -689,4 +689,38 @@ class HiveDDLSuite )) } } + + test("datasource table property keys are not allowed") { +import org.apache.spark.sql.hive.HiveExternalCatalog.DATASOURCE_PREFIX + +withTable("tbl") { + sql("CREATE TABLE tbl(a INT) STORED AS parquet") + + val e = intercept[AnalysisException] { +sql(s"ALTER TABLE tbl SET TBLPROPERTIES ('${DATASOURCE_PREFIX}foo' = 'loser')") + } + assert(e.getMessage.contains(DATASOURCE_PREFIX + "foo")) --- End diff -- Yea. I think it makes sense to consider `ALTER TABLE tbl SET TBLPROPERTIES` a hive specific command. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14155: [SPARK-16498][SQL] move hive hack for data source...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r74684229 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala --- @@ -144,16 +161,141 @@ private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configu assert(tableDefinition.identifier.database.isDefined) val db = tableDefinition.identifier.database.get requireDbExists(db) +verifyTableProperties(tableDefinition) + +if (tableDefinition.provider == Some("hive") || tableDefinition.tableType == VIEW) { + client.createTable(tableDefinition, ignoreIfExists) +} else { + val provider = tableDefinition.provider.get + val partitionColumns = tableDefinition.partitionColumnNames + val bucketSpec = tableDefinition.bucketSpec + + val tableProperties = new scala.collection.mutable.HashMap[String, String] + tableProperties.put(DATASOURCE_PROVIDER, provider) + + // Serialized JSON schema string may be too long to be stored into a single metastore table + // property. In this case, we split the JSON string and store each part as a separate table + // property. + val threshold = 4000 + val schemaJsonString = tableDefinition.schema.json + // Split the JSON string. + val parts = schemaJsonString.grouped(threshold).toSeq + tableProperties.put(DATASOURCE_SCHEMA_NUMPARTS, parts.size.toString) + parts.zipWithIndex.foreach { case (part, index) => +tableProperties.put(s"$DATASOURCE_SCHEMA_PART_PREFIX$index", part) + } + + if (partitionColumns.nonEmpty) { +tableProperties.put(DATASOURCE_SCHEMA_NUMPARTCOLS, partitionColumns.length.toString) +partitionColumns.zipWithIndex.foreach { case (partCol, index) => + tableProperties.put(s"$DATASOURCE_SCHEMA_PARTCOL_PREFIX$index", partCol) +} + } + + if (bucketSpec.isDefined) { +val BucketSpec(numBuckets, bucketColumnNames, sortColumnNames) = bucketSpec.get + +tableProperties.put(DATASOURCE_SCHEMA_NUMBUCKETS, numBuckets.toString) +tableProperties.put(DATASOURCE_SCHEMA_NUMBUCKETCOLS, bucketColumnNames.length.toString) +bucketColumnNames.zipWithIndex.foreach { case (bucketCol, index) => + tableProperties.put(s"$DATASOURCE_SCHEMA_BUCKETCOL_PREFIX$index", bucketCol) +} + +if (sortColumnNames.nonEmpty) { + tableProperties.put(DATASOURCE_SCHEMA_NUMSORTCOLS, sortColumnNames.length.toString) + sortColumnNames.zipWithIndex.foreach { case (sortCol, index) => + tableProperties.put(s"$DATASOURCE_SCHEMA_SORTCOL_PREFIX$index", sortCol) + } +} + } + + def newSparkSQLSpecificMetastoreTable(): CatalogTable = { +CatalogTable( + identifier = tableDefinition.identifier, + tableType = tableDefinition.tableType, + storage = tableDefinition.storage, + schema = new StructType(), + properties = tableDefinition.properties ++ tableProperties.toMap) + } + + def newHiveCompatibleMetastoreTable(serde: HiveSerDe, path: String): CatalogTable = { +tableDefinition.copy(properties = tableProperties.toMap).withNewStorage( + locationUri = Some(new Path(path).toUri.toString), + inputFormat = serde.inputFormat, + outputFormat = serde.outputFormat, + serde = serde.serde) + } + + val qualifiedTableName = tableDefinition.identifier.quotedString + val maybePath = new CaseInsensitiveMap(tableDefinition.storage.properties).get("path") --- End diff -- Previous code create a `DataSource` and resolve it, just to get the paths for this data source. We can get the path from data source options directly. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
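Since the schema serialization shows up in several of these diffs, here is a small self-contained round trip of the same idea (a sketch only, not Spark's actual helpers): split the schema's JSON into chunks that fit into a single metastore property, then concatenate `part.0`, `part.1`, ... in index order to restore it.

```scala
import org.apache.spark.sql.types.{DataType, LongType, StringType, StructType}

// Split the schema JSON into chunks of at most `threshold` characters,
// one metastore table property per chunk.
val schema = new StructType().add("id", LongType).add("name", StringType)
val threshold = 4000
val parts = schema.json.grouped(threshold).toSeq
val props: Map[String, String] =
  Map("spark.sql.sources.schema.numParts" -> parts.size.toString) ++
    parts.zipWithIndex.map { case (part, i) => s"spark.sql.sources.schema.part.$i" -> part }

// Restore: concatenate the parts in order and parse the JSON back into a StructType.
val numParts = props("spark.sql.sources.schema.numParts").toInt
val restored = DataType.fromJson(
  (0 until numParts).map(i => props(s"spark.sql.sources.schema.part.$i")).mkString
).asInstanceOf[StructType]

assert(restored == schema)
```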
[GitHub] spark pull request #14155: [SPARK-16498][SQL] move hive hack for data source...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r74680540 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala --- @@ -207,15 +310,52 @@ private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configu assert(tableDefinition.identifier.database.isDefined) val db = tableDefinition.identifier.database.get requireTableExists(db, tableDefinition.identifier.table) +verifyTableProperties(tableDefinition) --- End diff -- good catch! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14155: [SPARK-16498][SQL] move hive hack for data source...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r74680428 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala --- @@ -396,40 +393,6 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { assert(e.message == "Found duplicate column(s) in bucket: a") } - test("Describe Table with Corrupted Schema") { -import testImplicits._ - -val tabName = "tab1" -withTempPath { dir => - val path = dir.getCanonicalPath - val df = sparkContext.parallelize(1 to 10).map(i => (i, i.toString)).toDF("col1", "col2") - df.write.format("json").save(path) - - withTable(tabName) { -sql( - s""" - |CREATE TABLE $tabName - |USING json - |OPTIONS ( - | path '$path' - |) - """.stripMargin) - -val catalog = spark.sessionState.catalog -val table = catalog.getTableMetadata(TableIdentifier(tabName)) -val newProperties = table.properties.filterKeys(key => - key != CreateDataSourceTableUtils.DATASOURCE_SCHEMA_NUMPARTS) -val newTable = table.copy(properties = newProperties) -catalog.alterTable(newTable) - -val e = intercept[AnalysisException] { - sql(s"DESC $tabName") -}.getMessage -assert(e.contains(s"Could not read schema from the metastore because it is corrupted")) - } -} - } --- End diff -- sorry forget to comment, it's moved to https://github.com/apache/spark/pull/14155/files#diff-21a2d022682644c5c1cda0a4cf4c4114R1196 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14155: [SPARK-16498][SQL] move hive hack for data source...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r74680404 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala --- @@ -363,3 +503,82 @@ private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configu } } + +object HiveExternalCatalog { + val DATASOURCE_PREFIX = "spark.sql.sources." + val DATASOURCE_PROVIDER = DATASOURCE_PREFIX + "provider" + val DATASOURCE_SCHEMA = DATASOURCE_PREFIX + "schema" + val DATASOURCE_SCHEMA_PREFIX = DATASOURCE_SCHEMA + "." + val DATASOURCE_SCHEMA_NUMPARTS = DATASOURCE_SCHEMA_PREFIX + "numParts" + val DATASOURCE_SCHEMA_NUMPARTCOLS = DATASOURCE_SCHEMA_PREFIX + "numPartCols" + val DATASOURCE_SCHEMA_NUMSORTCOLS = DATASOURCE_SCHEMA_PREFIX + "numSortCols" + val DATASOURCE_SCHEMA_NUMBUCKETS = DATASOURCE_SCHEMA_PREFIX + "numBuckets" + val DATASOURCE_SCHEMA_NUMBUCKETCOLS = DATASOURCE_SCHEMA_PREFIX + "numBucketCols" + val DATASOURCE_SCHEMA_PART_PREFIX = DATASOURCE_SCHEMA_PREFIX + "part." + val DATASOURCE_SCHEMA_PARTCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "partCol." + val DATASOURCE_SCHEMA_BUCKETCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "bucketCol." + val DATASOURCE_SCHEMA_SORTCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "sortCol." + + def getProviderFromTableProperties(metadata: CatalogTable): Option[String] = { +metadata.properties.get(DATASOURCE_PROVIDER) + } + + def getOriginalTableProperties(metadata: CatalogTable): Map[String, String] = { +metadata.properties.filterNot { case (key, _) => key.startsWith(DATASOURCE_PREFIX) } --- End diff -- For a pre-built spark, there is no way to read table properties either. `CatalogTable` is not public, and DESCRIBE TABLE will filter out the data source properties from the table properties. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
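A tiny self-contained illustration of the filtering mentioned above, with made-up property values:

```scala
// Raw metastore properties for a data source table (hypothetical), and the view
// left after the spark.sql.sources.* entries are filtered out.
val rawProperties = Map(
  "spark.sql.sources.provider"        -> "parquet",
  "spark.sql.sources.schema.numParts" -> "1",
  "my_key1"                           -> "v1")

val userVisible = rawProperties.filterNot { case (key, _) => key.startsWith("spark.sql.sources.") }
assert(userVisible == Map("my_key1" -> "v1"))
```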
[GitHub] spark pull request #14155: [SPARK-16498][SQL] move hive hack for data source...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r74680384 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala --- @@ -144,16 +162,101 @@ private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configu assert(tableDefinition.identifier.database.isDefined) val db = tableDefinition.identifier.database.get requireDbExists(db) +verifyTableProperties(tableDefinition) + +if (tableDefinition.provider == Some("hive") || tableDefinition.tableType == VIEW) { + client.createTable(tableDefinition, ignoreIfExists) +} else { + val provider = tableDefinition.provider.get + val partitionColumns = tableDefinition.partitionColumnNames + val bucketSpec = tableDefinition.bucketSpec + + val tableProperties = new scala.collection.mutable.HashMap[String, String] + tableProperties.put(DATASOURCE_PROVIDER, provider) + + // Serialized JSON schema string may be too long to be stored into a single metastore table + // property. In this case, we split the JSON string and store each part as a separate table + // property. + val threshold = 4000 --- End diff -- do you mean put this conf in data source options? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14155: [SPARK-16498][SQL] move hive hack for data source...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r74680344 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala --- @@ -81,6 +86,19 @@ private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configu withClient { getTable(db, table) } } + /** + * If the given table properties contains datasource properties, throw an exception. + */ + private def verifyTableProperties(table: CatalogTable): Unit = { +val datasourceKeys = (table.properties.keys ++ table.storage.properties.keys) + .filter(_.startsWith(DATASOURCE_PREFIX)) --- End diff -- I moved it to https://github.com/apache/spark/pull/14506. If you agree with it now, I can include it in this PR --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14155: [SPARK-16498][SQL] move hive hack for data source...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r74665317 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala --- @@ -207,15 +310,52 @@ private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configu assert(tableDefinition.identifier.database.isDefined) val db = tableDefinition.identifier.database.get requireTableExists(db, tableDefinition.identifier.table) +verifyTableProperties(tableDefinition) --- End diff -- alterTable is my main concern. It is possible that `tableDefinition` is in the standard form while the actual metadata stored in the metastore uses properties to store the schema. In that case, alterTable may simply fail or significantly change the metadata in the metastore. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14155: [SPARK-16498][SQL] move hive hack for data source...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r74665106 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala --- @@ -81,6 +86,19 @@ private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configu withClient { getTable(db, table) } } + /** + * If the given table properties contains datasource properties, throw an exception. + */ + private def verifyTableProperties(table: CatalogTable): Unit = { +val datasourceKeys = (table.properties.keys ++ table.storage.properties.keys) + .filter(_.startsWith(DATASOURCE_PREFIX)) +if (datasourceKeys.nonEmpty) { + throw new AnalysisException(s"Cannot persistent ${table.qualifiedName} into hive metastore " + +s"as table/storage property keys may not start with '$DATASOURCE_PREFIX': " + +datasourceKeys.mkString("[", ", ", "]")) +} + } --- End diff -- nvm. This is for writing. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14155: [SPARK-16498][SQL] move hive hack for data source...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r74665091 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala --- @@ -81,6 +86,19 @@ private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configu withClient { getTable(db, table) } } + /** + * If the given table properties contains datasource properties, throw an exception. + */ + private def verifyTableProperties(table: CatalogTable): Unit = { +val datasourceKeys = (table.properties.keys ++ table.storage.properties.keys) + .filter(_.startsWith(DATASOURCE_PREFIX)) --- End diff -- I think we should not include `table.storage.properties`, right? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14155: [SPARK-16498][SQL] move hive hack for data source...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r74665033 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala --- @@ -363,3 +503,82 @@ private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configu } } + +object HiveExternalCatalog { + val DATASOURCE_PREFIX = "spark.sql.sources." + val DATASOURCE_PROVIDER = DATASOURCE_PREFIX + "provider" + val DATASOURCE_SCHEMA = DATASOURCE_PREFIX + "schema" + val DATASOURCE_SCHEMA_PREFIX = DATASOURCE_SCHEMA + "." + val DATASOURCE_SCHEMA_NUMPARTS = DATASOURCE_SCHEMA_PREFIX + "numParts" + val DATASOURCE_SCHEMA_NUMPARTCOLS = DATASOURCE_SCHEMA_PREFIX + "numPartCols" + val DATASOURCE_SCHEMA_NUMSORTCOLS = DATASOURCE_SCHEMA_PREFIX + "numSortCols" + val DATASOURCE_SCHEMA_NUMBUCKETS = DATASOURCE_SCHEMA_PREFIX + "numBuckets" + val DATASOURCE_SCHEMA_NUMBUCKETCOLS = DATASOURCE_SCHEMA_PREFIX + "numBucketCols" + val DATASOURCE_SCHEMA_PART_PREFIX = DATASOURCE_SCHEMA_PREFIX + "part." + val DATASOURCE_SCHEMA_PARTCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "partCol." + val DATASOURCE_SCHEMA_BUCKETCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "bucketCol." + val DATASOURCE_SCHEMA_SORTCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "sortCol." + + def getProviderFromTableProperties(metadata: CatalogTable): Option[String] = { +metadata.properties.get(DATASOURCE_PROVIDER) + } + + def getOriginalTableProperties(metadata: CatalogTable): Map[String, String] = { +metadata.properties.filterNot { case (key, _) => key.startsWith(DATASOURCE_PREFIX) } --- End diff -- For a pre-built spark, there is no way to read the raw data, right? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14155: [SPARK-16498][SQL] move hive hack for data source...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r74664610 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala --- @@ -207,15 +310,52 @@ private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configu assert(tableDefinition.identifier.database.isDefined) val db = tableDefinition.identifier.database.get requireTableExists(db, tableDefinition.identifier.table) +verifyTableProperties(tableDefinition) --- End diff -- For data source tables, seems we also need to adjust the metadata? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14155: [SPARK-16498][SQL] move hive hack for data source...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r74664134 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala --- @@ -144,16 +162,101 @@ private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configu assert(tableDefinition.identifier.database.isDefined) val db = tableDefinition.identifier.database.get requireDbExists(db) +verifyTableProperties(tableDefinition) + +if (tableDefinition.provider == Some("hive") || tableDefinition.tableType == VIEW) { + client.createTable(tableDefinition, ignoreIfExists) +} else { + val provider = tableDefinition.provider.get + val partitionColumns = tableDefinition.partitionColumnNames + val bucketSpec = tableDefinition.bucketSpec + + val tableProperties = new scala.collection.mutable.HashMap[String, String] + tableProperties.put(DATASOURCE_PROVIDER, provider) + + // Serialized JSON schema string may be too long to be stored into a single metastore table + // property. In this case, we split the JSON string and store each part as a separate table + // property. + val threshold = 4000 --- End diff -- Should we consider this conf as an immutable one and pass the value in when we create the external catalog? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14155: [SPARK-16498][SQL] move hive hack for data source...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r74664001 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala --- @@ -81,6 +86,19 @@ private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configu withClient { getTable(db, table) } } + /** + * If the given table properties contains datasource properties, throw an exception. + */ + private def verifyTableProperties(table: CatalogTable): Unit = { +val datasourceKeys = (table.properties.keys ++ table.storage.properties.keys) + .filter(_.startsWith(DATASOURCE_PREFIX)) +if (datasourceKeys.nonEmpty) { + throw new AnalysisException(s"Cannot persistent ${table.qualifiedName} into hive metastore " + +s"as table/storage property keys may not start with '$DATASOURCE_PREFIX': " + +datasourceKeys.mkString("[", ", ", "]")) +} + } --- End diff -- actually, I am wondering if we should keep it in the properties? So, it is easy for us to debug? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14155: [SPARK-16498][SQL] move hive hack for data source...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r74663776 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala --- @@ -396,40 +393,6 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { assert(e.message == "Found duplicate column(s) in bucket: a") } - test("Describe Table with Corrupted Schema") { -import testImplicits._ - -val tabName = "tab1" -withTempPath { dir => - val path = dir.getCanonicalPath - val df = sparkContext.parallelize(1 to 10).map(i => (i, i.toString)).toDF("col1", "col2") - df.write.format("json").save(path) - - withTable(tabName) { -sql( - s""" - |CREATE TABLE $tabName - |USING json - |OPTIONS ( - | path '$path' - |) - """.stripMargin) - -val catalog = spark.sessionState.catalog -val table = catalog.getTableMetadata(TableIdentifier(tabName)) -val newProperties = table.properties.filterKeys(key => - key != CreateDataSourceTableUtils.DATASOURCE_SCHEMA_NUMPARTS) -val newTable = table.copy(properties = newProperties) -catalog.alterTable(newTable) - -val e = intercept[AnalysisException] { - sql(s"DESC $tabName") -}.getMessage -assert(e.contains(s"Could not read schema from the metastore because it is corrupted")) - } -} - } --- End diff -- do we still have this test? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14155: [SPARK-16498][SQL] move hive hack for data source...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r74184835 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala --- @@ -363,3 +503,82 @@ private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configu } } + +object HiveExternalCatalog { + val DATASOURCE_PREFIX = "spark.sql.sources." + val DATASOURCE_PROVIDER = DATASOURCE_PREFIX + "provider" + val DATASOURCE_SCHEMA = DATASOURCE_PREFIX + "schema" + val DATASOURCE_SCHEMA_PREFIX = DATASOURCE_SCHEMA + "." + val DATASOURCE_SCHEMA_NUMPARTS = DATASOURCE_SCHEMA_PREFIX + "numParts" + val DATASOURCE_SCHEMA_NUMPARTCOLS = DATASOURCE_SCHEMA_PREFIX + "numPartCols" + val DATASOURCE_SCHEMA_NUMSORTCOLS = DATASOURCE_SCHEMA_PREFIX + "numSortCols" + val DATASOURCE_SCHEMA_NUMBUCKETS = DATASOURCE_SCHEMA_PREFIX + "numBuckets" + val DATASOURCE_SCHEMA_NUMBUCKETCOLS = DATASOURCE_SCHEMA_PREFIX + "numBucketCols" + val DATASOURCE_SCHEMA_PART_PREFIX = DATASOURCE_SCHEMA_PREFIX + "part." + val DATASOURCE_SCHEMA_PARTCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "partCol." + val DATASOURCE_SCHEMA_BUCKETCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "bucketCol." + val DATASOURCE_SCHEMA_SORTCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "sortCol." + + def getProviderFromTableProperties(metadata: CatalogTable): Option[String] = { +metadata.properties.get(DATASOURCE_PROVIDER) + } + + def getOriginalTableProperties(metadata: CatalogTable): Map[String, String] = { +metadata.properties.filterNot { case (key, _) => key.startsWith(DATASOURCE_PREFIX) } --- End diff -- For debug, we can read the table metadata from hive client to see the raw data. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14155: [SPARK-16498][SQL] move hive hack for data source...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r74184777 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala --- @@ -81,6 +86,19 @@ private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configu withClient { getTable(db, table) } } + /** + * If the given table properties contains datasource properties, throw an exception. + */ + private def verifyTableProperties(table: CatalogTable): Unit = { +val datasourceKeys = (table.properties.keys ++ table.storage.properties.keys) + .filter(_.startsWith(DATASOURCE_PREFIX)) +if (datasourceKeys.nonEmpty) { + throw new AnalysisException(s"Cannot persistent ${table.qualifiedName} into hive metastore " + +s"as table/storage property keys may not start with '$DATASOURCE_PREFIX': " + +datasourceKeys.mkString("[", ", ", "]")) +} + } --- End diff -- yup, this logic is moved from `DDLUtils.verifyTableProperties` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14155: [SPARK-16498][SQL] move hive hack for data source...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r74184724 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala --- @@ -93,7 +92,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { .add("col2", "string") .add("a", "int") .add("b", "int"), - provider = Some("parquet"), + provider = Some("hive"), --- End diff -- Actually it was a mistake. This suite needs to test `set location` and similar commands which are only valid for hive tables. However, previously we only set the provider name in the provider field, not in the table properties, so the commands would mistakenly treat the table as a hive table and not throw an exception. Now that the hive hacks have all moved to the external catalog, setting the provider name in the provider field is enough, and we need to use `hive` for hive tables. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14155: [SPARK-16498][SQL] move hive hack for data source...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r74166832 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala --- @@ -18,21 +18,32 @@ package org.apache.spark.sql.hive.execution import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode} +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.NoSuchTableException -import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._ +import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType} import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.test.SQLTestUtils +import org.apache.spark.sql.types.StructType class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { import testImplicits._ protected override def beforeAll(): Unit = { super.beforeAll() -sql( - """ -|CREATE TABLE parquet_tab1 (c1 INT, c2 STRING) -|USING org.apache.spark.sql.parquet.DefaultSource - """.stripMargin) --- End diff -- No way. The CREATE TABLE syntax for data source tables doesn't allow specifying table properties, and we didn't support creating data source tables through the session catalog before, because the hive hacks were in the commands. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
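For context on the two syntaxes being contrasted here, a rough sketch (assuming a `SparkSession` named `spark` with Hive support; the table names are made up):

```scala
// Data source (USING) syntax at the time of this PR: no TBLPROPERTIES clause, so
// there is no way to attach a property like my_key1 at creation time.
spark.sql("CREATE TABLE t1 (c1 INT, c2 STRING) USING parquet")

// Hive syntax: TBLPROPERTIES is accepted and written straight to the metastore.
spark.sql(
  "CREATE TABLE t2 (c1 INT, c2 STRING) STORED AS parquet " +
    "TBLPROPERTIES ('my_key1' = 'v1')")
```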
[GitHub] spark pull request #14155: [SPARK-16498][SQL] move hive hack for data source...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r74166431 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala --- @@ -689,4 +689,38 @@ class HiveDDLSuite )) } } + + test("datasource table property keys are not allowed") { +import org.apache.spark.sql.hive.HiveExternalCatalog.DATASOURCE_PREFIX + +withTable("tbl") { + sql("CREATE TABLE tbl(a INT) STORED AS parquet") + + val e = intercept[AnalysisException] { +sql(s"ALTER TABLE tbl SET TBLPROPERTIES ('${DATASOURCE_PREFIX}foo' = 'loser')") + } + assert(e.getMessage.contains(DATASOURCE_PREFIX + "foo")) --- End diff -- The `HiveExternalCatalog`. My concern is that this limitation exists because of the tricks for the hive metastore, so it should only exist in `HiveExternalCatalog`; the `InMemoryCatalog` and our new metastore in the future should not have this limitation. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14155: [SPARK-16498][SQL] move hive hack for data source...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r74166239 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -229,10 +230,8 @@ case class AlterTableSetPropertiesCommand( extends RunnableCommand { override def run(sparkSession: SparkSession): Seq[Row] = { -val ident = if (isView) "VIEW" else "TABLE" val catalog = sparkSession.sessionState.catalog DDLUtils.verifyAlterTableType(catalog, tableName, isView) -DDLUtils.verifyTableProperties(properties.keys.toSeq, s"ALTER $ident") --- End diff -- it's moved to `HiveExternalCatalog` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14155: [SPARK-16498][SQL] move hive hack for data source...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r74131369 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala --- @@ -18,21 +18,32 @@ package org.apache.spark.sql.hive.execution import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode} +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.NoSuchTableException -import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._ +import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType} import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.test.SQLTestUtils +import org.apache.spark.sql.types.StructType class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { import testImplicits._ protected override def beforeAll(): Unit = { super.beforeAll() -sql( - """ -|CREATE TABLE parquet_tab1 (c1 INT, c2 STRING) -|USING org.apache.spark.sql.parquet.DefaultSource - """.stripMargin) --- End diff -- How did we originally add `my_key1`? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14155: [SPARK-16498][SQL] move hive hack for data source...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r74131072 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala --- @@ -70,64 +69,16 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log val cacheLoader = new CacheLoader[QualifiedTableName, LogicalPlan]() { override def load(in: QualifiedTableName): LogicalPlan = { logDebug(s"Creating new cached data source for $in") -val table = client.getTable(in.database, in.name) +val table = sparkSession.sharedState.externalCatalog.getTable(in.database, in.name) -// TODO: the following code is duplicated with FindDataSourceTable.readDataSourceTable --- End diff -- nice! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14155: [SPARK-16498][SQL] move hive hack for data source...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r74130831 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala --- @@ -363,3 +503,82 @@ private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configu } } + +object HiveExternalCatalog { + val DATASOURCE_PREFIX = "spark.sql.sources." + val DATASOURCE_PROVIDER = DATASOURCE_PREFIX + "provider" + val DATASOURCE_SCHEMA = DATASOURCE_PREFIX + "schema" + val DATASOURCE_SCHEMA_PREFIX = DATASOURCE_SCHEMA + "." + val DATASOURCE_SCHEMA_NUMPARTS = DATASOURCE_SCHEMA_PREFIX + "numParts" + val DATASOURCE_SCHEMA_NUMPARTCOLS = DATASOURCE_SCHEMA_PREFIX + "numPartCols" + val DATASOURCE_SCHEMA_NUMSORTCOLS = DATASOURCE_SCHEMA_PREFIX + "numSortCols" + val DATASOURCE_SCHEMA_NUMBUCKETS = DATASOURCE_SCHEMA_PREFIX + "numBuckets" + val DATASOURCE_SCHEMA_NUMBUCKETCOLS = DATASOURCE_SCHEMA_PREFIX + "numBucketCols" + val DATASOURCE_SCHEMA_PART_PREFIX = DATASOURCE_SCHEMA_PREFIX + "part." + val DATASOURCE_SCHEMA_PARTCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "partCol." + val DATASOURCE_SCHEMA_BUCKETCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "bucketCol." + val DATASOURCE_SCHEMA_SORTCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "sortCol." + + def getProviderFromTableProperties(metadata: CatalogTable): Option[String] = { +metadata.properties.get(DATASOURCE_PROVIDER) + } + + def getOriginalTableProperties(metadata: CatalogTable): Map[String, String] = { +metadata.properties.filterNot { case (key, _) => key.startsWith(DATASOURCE_PREFIX) } --- End diff -- I am wondering if it is still helpful to keep those in the properties (they may be useful when we want to debug an issue). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14155: [SPARK-16498][SQL] move hive hack for data source...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r74130452 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala --- @@ -144,16 +162,101 @@ private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configu assert(tableDefinition.identifier.database.isDefined) val db = tableDefinition.identifier.database.get requireDbExists(db) +verifyTableProperties(tableDefinition) + +if (tableDefinition.provider == Some("hive") || tableDefinition.tableType == VIEW) { + client.createTable(tableDefinition, ignoreIfExists) +} else { + val provider = tableDefinition.provider.get + val partitionColumns = tableDefinition.partitionColumnNames + val bucketSpec = tableDefinition.bucketSpec + + val tableProperties = new scala.collection.mutable.HashMap[String, String] + tableProperties.put(DATASOURCE_PROVIDER, provider) + + // Serialized JSON schema string may be too long to be stored into a single metastore table + // property. In this case, we split the JSON string and store each part as a separate table + // property. + val threshold = 4000 --- End diff -- Let's me see if we can still get the conf value. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14155: [SPARK-16498][SQL] move hive hack for data source...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r74130034 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala --- @@ -81,6 +86,19 @@ private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configu withClient { getTable(db, table) } } + /** + * If the given table properties contains datasource properties, throw an exception. + */ + private def verifyTableProperties(table: CatalogTable): Unit = { +val datasourceKeys = (table.properties.keys ++ table.storage.properties.keys) + .filter(_.startsWith(DATASOURCE_PREFIX)) +if (datasourceKeys.nonEmpty) { + throw new AnalysisException(s"Cannot persistent ${table.qualifiedName} into hive metastore " + +s"as table/storage property keys may not start with '$DATASOURCE_PREFIX': " + +datasourceKeys.mkString("[", ", ", "]")) +} + } --- End diff -- Oh, I see. You do not want `CatalogTable`'s properties have keys starting with `DATASOURCE_PREFIX`? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14155: [SPARK-16498][SQL] move hive hack for data source...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r74129714 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala --- @@ -81,6 +86,19 @@ private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configu withClient { getTable(db, table) } } + /** + * If the given table properties contains datasource properties, throw an exception. + */ + private def verifyTableProperties(table: CatalogTable): Unit = { +val datasourceKeys = (table.properties.keys ++ table.storage.properties.keys) + .filter(_.startsWith(DATASOURCE_PREFIX)) +if (datasourceKeys.nonEmpty) { + throw new AnalysisException(s"Cannot persistent ${table.qualifiedName} into hive metastore " + +s"as table/storage property keys may not start with '$DATASOURCE_PREFIX': " + +datasourceKeys.mkString("[", ", ", "]")) +} + } --- End diff -- How is it used? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14155: [SPARK-16498][SQL] move hive hack for data source...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r74129515 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala --- @@ -689,4 +689,38 @@ class HiveDDLSuite )) } } + + test("datasource table property keys are not allowed") { +import org.apache.spark.sql.hive.HiveExternalCatalog.DATASOURCE_PREFIX + +withTable("tbl") { + sql("CREATE TABLE tbl(a INT) STORED AS parquet") + + val e = intercept[AnalysisException] { +sql(s"ALTER TABLE tbl SET TBLPROPERTIES ('${DATASOURCE_PREFIX}foo' = 'loser')") + } + assert(e.getMessage.contains(DATASOURCE_PREFIX + "foo")) --- End diff -- ah, I guess I am confused. Sorry. Who will throw the exception? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14155: [SPARK-16498][SQL] move hive hack for data source...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r74128927 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala --- @@ -93,7 +92,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { .add("col2", "string") .add("a", "int") .add("b", "int"), - provider = Some("parquet"), + provider = Some("hive"), --- End diff -- Will we lose test coverage after changing this line? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14155: [SPARK-16498][SQL] move hive hack for data source...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r74127228 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -229,10 +230,8 @@ case class AlterTableSetPropertiesCommand( extends RunnableCommand { override def run(sparkSession: SparkSession): Seq[Row] = { -val ident = if (isView) "VIEW" else "TABLE" val catalog = sparkSession.sessionState.catalog DDLUtils.verifyAlterTableType(catalog, tableName, isView) -DDLUtils.verifyTableProperties(properties.keys.toSeq, s"ALTER $ident") --- End diff -- I am wondering if it is possible to not change this part in this PR? We can decide whether we want to remove it in its own PR. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14155: [SPARK-16498][SQL] move hive hack for data source...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r73815242 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala --- @@ -1218,11 +1151,6 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { intercept[AnalysisException] { sql("ALTER TABLE does_not_exist SET SERDEPROPERTIES ('x' = 'y')") } -// serde properties must not be a datasource property -val e = intercept[AnalysisException] { --- End diff -- moved to `HiveDDLSuite` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14155: [SPARK-16498][SQL] move hive hack for data source...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r73815246 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala --- @@ -1523,12 +1451,6 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { } } - test("create table with datasource properties (not allowed)") { -assertUnsupported("CREATE TABLE my_tab TBLPROPERTIES ('spark.sql.sources.me'='anything')") -assertUnsupported("CREATE TABLE my_tab ROW FORMAT SERDE 'serde' " + --- End diff -- moved to `HiveDDLSuite` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14155: [SPARK-16498][SQL] move hive hack for data source...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r73815233 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala --- @@ -1109,11 +1047,6 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { // property to unset does not exist, but "IF EXISTS" is specified sql("ALTER TABLE tab1 UNSET TBLPROPERTIES IF EXISTS ('c', 'xyz')") assert(getProps == Map("x" -> "y")) -// datasource table property keys are not allowed -val e2 = intercept[AnalysisException] { --- End diff -- moved to `HiveDDLSuite` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14155: [SPARK-16498][SQL] move hive hack for data source...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/14155#discussion_r73727982 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -301,9 +298,6 @@ case class AlterTableSerDePropertiesCommand( "ALTER TABLE attempted to set neither serde class name nor serde properties") override def run(sparkSession: SparkSession): Seq[Row] = { -DDLUtils.verifyTableProperties( --- End diff -- yea. Let's discuss it. Can we revert these changes for now? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org