This is an automated email from the ASF dual-hosted git repository. pwason pushed a commit to branch release-0.14.0 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 6b848f028ecd628673be2b4154c675ff03227e42 Author: Rex(Hui) An <bonean...@gmail.com> AuthorDate: Tue Aug 15 09:02:04 2023 +0800 [HUDI-6676] Add command for CreateHoodieTableLike (#9412) * add command for CreateHoodieTableLike * don't support spark2 --- .../spark/sql/HoodieCatalystPlansUtils.scala | 7 ++ .../org/apache/spark/sql/hudi/SparkAdapter.scala | 8 +- .../apache/spark/sql/hudi/HoodieOptionConfig.scala | 8 ++ .../command/CreateHoodieTableLikeCommand.scala | 110 ++++++++++++++++ .../spark/sql/hudi/analysis/HoodieAnalysis.scala | 13 +- .../apache/spark/sql/hudi/TestCreateTable.scala | 139 +++++++++++++++++++++ .../spark/sql/HoodieSpark2CatalystPlanUtils.scala | 9 ++ .../spark/sql/HoodieSpark3CatalystPlanUtils.scala | 13 +- 8 files changed, 302 insertions(+), 5 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala index 58789681c54..9cfe23f86cc 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} import org.apache.spark.sql.catalyst.plans.JoinType import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan} @@ -93,6 +94,12 @@ trait HoodieCatalystPlansUtils { */ def unapplyInsertIntoStatement(plan: LogicalPlan): Option[(LogicalPlan, Map[String, Option[String]], LogicalPlan, Boolean, Boolean)] + /** + * Decomposes [[CreateTableLikeCommand]] into its arguments, hiding API differences between + * Spark versions; returns None when `plan` is not a supported CreateTableLikeCommand. + */ + def unapplyCreateTableLikeCommand(plan: LogicalPlan):
Option[(TableIdentifier, TableIdentifier, CatalogStorageFormat, Option[String], Map[String, String], Boolean)] + /** * Rebases instance of {@code InsertIntoStatement} onto provided instance of {@code targetTable} and {@code query} */ diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala index 041beba95df..1c6111afe47 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala @@ -150,11 +150,11 @@ trait SparkAdapter extends Serializable { } def isHoodieTable(map: java.util.Map[String, String]): Boolean = { - map.getOrDefault("provider", "").equals("hudi") + isHoodieTable(map.getOrDefault("provider", "")) } def isHoodieTable(table: CatalogTable): Boolean = { - table.provider.map(_.toLowerCase(Locale.ROOT)).orNull == "hudi" + isHoodieTable(table.provider.map(_.toLowerCase(Locale.ROOT)).orNull) } def isHoodieTable(tableId: TableIdentifier, spark: SparkSession): Boolean = { @@ -162,6 +162,10 @@ trait SparkAdapter extends Serializable { isHoodieTable(table) } + def isHoodieTable(provider: String): Boolean = { + "hudi".equalsIgnoreCase(provider) // case-insensitive; false for a null provider + } + /** * Create instance of [[ParquetFileFormat]] */ diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala index d715a108d62..abe98bb46cf 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala @@ -182,6 +182,14 @@ object HoodieOptionConfig { options.filterNot(_._1.startsWith("hoodie.")).filterNot(kv =>
sqlOptionKeyToWriteConfigKey.contains(kv._1)) } + /** + * The opposite of `deleteHoodieOptions`: extracts all Hudi-related options, i.e. keys that + * start with `hoodie.` plus the SQL options returned by `extractSqlOptions`. + */ + def extractHoodieOptions(options: Map[String, String]): Map[String, String] = { + options.filter(_._1.startsWith("hoodie.")) ++ extractSqlOptions(options) + } + // extract primaryKey, preCombineField, type options def extractSqlOptions(options: Map[String, String]): Map[String, String] = { val sqlOptions = mapTableConfigsToSqlOptions(options) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableLikeCommand.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableLikeCommand.scala new file mode 100644 index 00000000000..dc4458d8ad1 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableLikeCommand.scala @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.spark.sql.hudi.command + +import org.apache.hudi.SparkAdapterSupport +import org.apache.hudi.common.model.HoodieTableType +import org.apache.hudi.common.util.ConfigUtils +import org.apache.spark.sql.{AnalysisException, Row, SparkSession} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, HoodieCatalogTable} +import org.apache.spark.sql.hudi.HoodieOptionConfig + +import scala.util.control.NonFatal + +case class CreateHoodieTableLikeCommand(targetTable: TableIdentifier, + sourceTable: TableIdentifier, + fileFormat: CatalogStorageFormat, + properties: Map[String, String] = Map.empty, + ignoreIfExists: Boolean) + extends HoodieLeafRunnableCommand with SparkAdapterSupport { // Hudi counterpart of Spark's CreateTableLikeCommand, substituted by HoodiePostAnalysisRule + + override def run(sparkSession: SparkSession): Seq[Row] = { + val catalog = sparkSession.sessionState.catalog + + val tableIsExists = catalog.tableExists(targetTable) + if (tableIsExists) { + if (ignoreIfExists) { + // scalastyle:off + return Seq.empty[Row] + // scalastyle:on + } else { + throw new IllegalArgumentException(s"Table $targetTable already exists.") + } + } + + val sourceTableDesc = catalog.getTempViewOrPermanentTableMetadata(sourceTable) // the source may be a temp view or a permanent table + + val newStorage = if (fileFormat.inputFormat.isDefined) { // an explicitly given storage format is used as-is; otherwise copy the source's storage with the requested location + fileFormat + } else { + sourceTableDesc.storage.copy(locationUri = fileFormat.locationUri) + } + + // If the location is specified, we create an external table internally. + // Otherwise create a managed table.
+ val tblType = if (newStorage.locationUri.isEmpty) { + CatalogTableType.MANAGED + } else { + CatalogTableType.EXTERNAL + } + + val targetTableProperties = if (sparkAdapter.isHoodieTable(sourceTableDesc)) { + HoodieOptionConfig.extractHoodieOptions(sourceTableDesc.properties) ++ properties // only Hudi options are inherited from a Hudi source; explicitly given properties take precedence + } else { + properties // nothing is inherited from a non-Hudi source + } + + val newTableDesc = CatalogTable( + identifier = targetTable, + tableType = tblType, + storage = newStorage, + schema = sourceTableDesc.schema, + provider = Some("hudi"), + partitionColumnNames = sourceTableDesc.partitionColumnNames, + bucketSpec = sourceTableDesc.bucketSpec, + properties = targetTableProperties, + tracksPartitionsInCatalog = sourceTableDesc.tracksPartitionsInCatalog) + + val hoodieCatalogTable = HoodieCatalogTable(sparkSession, newTableDesc) + // check whether table configs defined in the hoodie table conflict with the properties defined in the catalog. + CreateHoodieTableCommand.validateTblProperties(hoodieCatalogTable) + + val queryAsProp = hoodieCatalogTable.catalogProperties.get(ConfigUtils.IS_QUERY_AS_RO_TABLE) + if (queryAsProp.isEmpty) { + // init hoodie table for a normal table (not a ro/rt table) + hoodieCatalogTable.initHoodieTable() + } else { + if (!hoodieCatalogTable.hoodieTableExists) { + throw new AnalysisException("Creating ro/rt table need the existence of the base table.") + } + if (HoodieTableType.MERGE_ON_READ != hoodieCatalogTable.tableType) { + throw new AnalysisException("Creating ro/rt table should only apply to a mor table.") + } + } + + try { + // create catalog table for this hoodie table + CreateHoodieTableCommand.createTableInCatalog(sparkSession, hoodieCatalogTable, ignoreIfExists, queryAsProp) + } catch { + case NonFatal(e) => + logWarning("Failed to create catalog table in metastore", e) // NOTE(review): failure is only logged; the command still returns successfully + } + Seq.empty[Row] + } +} diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala index 3c2d41aa582..24820c1c032 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala @@ -20,8 +20,9 @@ package org.apache.spark.sql.hudi.analysis import org.apache.hudi.common.util.ReflectionUtils import org.apache.hudi.common.util.ReflectionUtils.loadClass import org.apache.hudi.{HoodieSparkUtils, SparkAdapterSupport} +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute -import org.apache.spark.sql.catalyst.catalog.CatalogTable +import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable} import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSeq, Expression, GenericInternalRow} import org.apache.spark.sql.catalyst.optimizer.ReplaceExpressions import org.apache.spark.sql.catalyst.plans.logical._ @@ -29,7 +30,7 @@ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.command._ import org.apache.spark.sql.execution.datasources.{CreateTable, LogicalRelation} import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.{isMetaField, removeMetaFields} -import org.apache.spark.sql.hudi.analysis.HoodieAnalysis.{MatchInsertIntoStatement, MatchMergeIntoTable, ResolvesToHudiTable, sparkAdapter} +import org.apache.spark.sql.hudi.analysis.HoodieAnalysis.{MatchCreateTableLike, MatchInsertIntoStatement, MatchMergeIntoTable, ResolvesToHudiTable, sparkAdapter} import org.apache.spark.sql.hudi.command._ import org.apache.spark.sql.hudi.command.procedures.{HoodieProcedures, Procedure, ProcedureArgs} import org.apache.spark.sql.{AnalysisException, SparkSession} @@ -348,6 +349,11 @@ object HoodieAnalysis extends SparkAdapterSupport { sparkAdapter.resolveHoodieTable(plan) } + 
private[sql] object MatchCreateTableLike { // version-agnostic extractor for CreateTableLikeCommand (never matches on Spark 2) + def unapply(plan: LogicalPlan): Option[(TableIdentifier, TableIdentifier, CatalogStorageFormat, Option[String], Map[String, String], Boolean)] = + sparkAdapter.getCatalystPlanUtils.unapplyCreateTableLikeCommand(plan) + } + private[sql] def failAnalysis(msg: String): Nothing = { throw new AnalysisException(msg) } @@ -504,6 +510,9 @@ case class HoodiePostAnalysisRule(sparkSession: SparkSession) extends Rule[Logic case CreateDataSourceTableCommand(table, ignoreIfExists) if sparkAdapter.isHoodieTable(table) => CreateHoodieTableCommand(table, ignoreIfExists) + case MatchCreateTableLike(targetTable, sourceTable, fileFormat, provider, properties, ifNotExists) + if sparkAdapter.isHoodieTable(provider.orNull) => // only rewrite CREATE TABLE ... LIKE ... USING hudi + CreateHoodieTableLikeCommand(targetTable, sourceTable, fileFormat, properties, ifNotExists) // Rewrite the DropTableCommand to DropHoodieTableCommand case DropTableCommand(tableName, ifExists, false, purge) if sparkSession.sessionState.catalog.tableExists(tableName) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala index a5ddd7ca854..bc3540ebf50 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala @@ -405,6 +405,145 @@ class TestCreateTable extends HoodieSparkSqlTestBase { } } + test("Test create table like") { + if (HoodieSparkUtils.gteqSpark3_1) { + // 1.
Test create table from an existing HUDI table + withTempDir { tmp => + Seq("cow", "mor").foreach { tableType => + withTable(generateTableName) { sourceTable => + spark.sql( + s""" + |create table $sourceTable ( + | id int, + | name string, + | price double, + | ts long + |) using hudi + | tblproperties ( + | primaryKey = 'id,name', + | type = '$tableType' + | ) + | location '${tmp.getCanonicalPath}/$sourceTable'""".stripMargin) + + // 1.1 Test Managed table + withTable(generateTableName) { targetTable => + spark.sql( + s""" + |create table $targetTable + |like $sourceTable + |using hudi""".stripMargin) + + val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier(targetTable)) + + assertResult(targetTable)(table.identifier.table) + assertResult("hudi")(table.provider.get) + assertResult(CatalogTableType.MANAGED)(table.tableType) + assertResult( + HoodieRecord.HOODIE_META_COLUMNS.asScala.map(StructField(_, StringType)) + ++ Seq( + StructField("id", IntegerType), + StructField("name", StringType), + StructField("price", DoubleType), + StructField("ts", LongType)) + )(table.schema.fields) + assertResult(tableType)(table.properties("type")) + assertResult("id,name")(table.properties("primaryKey")) + + // target table already exists: a plain CREATE TABLE ... LIKE must fail + assertThrows[IllegalArgumentException] { + spark.sql( + s""" + |create table $targetTable + |like $sourceTable + |using hudi""".stripMargin) + } + + // IF NOT EXISTS must be a no-op when the target table already exists + spark.sql( + s""" + |create table if not exists $targetTable + |like $sourceTable + |using hudi""".stripMargin) + } + + // 1.2 Test External table + withTable(generateTableName) { targetTable => + spark.sql( + s""" + |create table $targetTable + |like $sourceTable + |using hudi + |location '${tmp.getCanonicalPath}/$targetTable'""".stripMargin) + val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier(targetTable)) + assertResult(CatalogTableType.EXTERNAL)(table.tableType) + } + + + // 1.3 New target table options should
override the source table's options + withTable(generateTableName) { targetTable => + spark.sql( + s""" + |create table $targetTable + |like $sourceTable + |using hudi + |tblproperties (primaryKey = 'id')""".stripMargin) + val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier(targetTable)) + assertResult("id")(table.properties("primaryKey")) + } + } + } + } + + // 2. Test create table from an existing non-HUDI table + withTempDir { tmp => + withTable(generateTableName) { sourceTable => + spark.sql( + s""" + |create table $sourceTable ( + | id int, + | name string, + | price double, + | ts long + |) using parquet + | tblproperties ( + | non.hoodie.property='value' + | ) + | location '${tmp.getCanonicalPath}/$sourceTable'""".stripMargin) + + withTable(generateTableName) { targetTable => + spark.sql( + s""" + |create table $targetTable + |like $sourceTable + |using hudi + |tblproperties ( + | primaryKey = 'id,name', + | type = 'cow' + |)""".stripMargin) + val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier(targetTable)) + + assertResult(targetTable)(table.identifier.table) + assertResult("hudi")(table.provider.get) + assertResult(CatalogTableType.MANAGED)(table.tableType) + assertResult( + HoodieRecord.HOODIE_META_COLUMNS.asScala.map(StructField(_, StringType)) + ++ Seq( + StructField("id", IntegerType), + StructField("name", StringType), + StructField("price", DoubleType), + StructField("ts", LongType)) + )(table.schema.fields) + + // Only the two explicitly given options remain; non.hoodie.property must not be inherited + assertResult(2)(table.properties.size) + assertResult("cow")(table.properties("type")) + assertResult("id,name")(table.properties("primaryKey")) + } + } + } + } + } + test("Test Create Table As Select With Auto record key gen") { withTempDir { tmp => // Create Non-Partitioned table diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala
b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala index cdb4c5226a6..6fb1719cede 100644 --- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala +++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql import org.apache.hudi.SparkHoodieTableFileIndex import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer +import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} import org.apache.spark.sql.catalyst.optimizer.SimplifyCasts import org.apache.spark.sql.catalyst.planning.PhysicalOperation @@ -68,6 +69,14 @@ object HoodieSpark2CatalystPlanUtils extends HoodieCatalystPlansUtils { } } + /** + * CREATE TABLE ... LIKE is not supported on Spark 2: its CreateTableLikeCommand carries no + * provider, so Hudi cannot tell whether the target table is meant to be a Hudi table.
+ */ + override def unapplyCreateTableLikeCommand(plan: LogicalPlan): Option[(TableIdentifier, TableIdentifier, CatalogStorageFormat, Option[String], Map[String, String], Boolean)] = { + None + } + def rebaseInsertIntoStatement(iis: LogicalPlan, targetTable: LogicalPlan, query: LogicalPlan): LogicalPlan = iis.asInstanceOf[InsertIntoTable].copy(table = targetTable, query = query) diff --git a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3CatalystPlanUtils.scala b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3CatalystPlanUtils.scala index cd8d0ca6a70..a01cce70c1f 100644 --- a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3CatalystPlanUtils.scala +++ b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3CatalystPlanUtils.scala @@ -18,12 +18,14 @@ package org.apache.spark.sql import org.apache.hudi.SparkAdapterSupport +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.TableOutputResolver +import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Expression, ProjectionOverSchema} import org.apache.spark.sql.catalyst.plans.JoinType import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoStatement, Join, JoinHint, LeafNode, LogicalPlan} import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog} -import org.apache.spark.sql.execution.command.ExplainCommand +import org.apache.spark.sql.execution.command.{CreateTableLikeCommand, ExplainCommand} import org.apache.spark.sql.execution.{ExtendedMode, SimpleMode} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.StructType @@ -63,6 +65,15 @@ trait HoodieSpark3CatalystPlanUtils extends HoodieCatalystPlansUtils { } } + + override def unapplyCreateTableLikeCommand(plan: LogicalPlan):
Option[(TableIdentifier, TableIdentifier, CatalogStorageFormat, Option[String], Map[String, String], Boolean)] = { + plan match { + case CreateTableLikeCommand(targetTable, sourceTable, fileFormat, provider, properties, ifNotExists) => + Some((targetTable, sourceTable, fileFormat, provider, properties, ifNotExists)) // explicit tuple instead of relying on argument auto-tupling + case _ => None + } + } + def rebaseInsertIntoStatement(iis: LogicalPlan, targetTable: LogicalPlan, query: LogicalPlan): LogicalPlan = iis.asInstanceOf[InsertIntoStatement].copy(table = targetTable, query = query)