This is an automated email from the ASF dual-hosted git repository.

pwason pushed a commit to branch release-0.14.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 6b848f028ecd628673be2b4154c675ff03227e42
Author: Rex(Hui) An <bonean...@gmail.com>
AuthorDate: Tue Aug 15 09:02:04 2023 +0800

    [HUDI-6676] Add command for CreateHoodieTableLike (#9412)
    
    * add command for CreateHoodieTableLike
    * don't support spark2
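
    For illustration, a minimal usage sketch of the new command, mirroring
    the tests below (table and path names are hypothetical):

        // copies the schema and hoodie table properties of an existing table
        spark.sql("CREATE TABLE target_tbl LIKE source_tbl USING hudi")
        // TBLPROPERTIES override the source's; an explicit LOCATION makes the target external
        spark.sql("""CREATE TABLE IF NOT EXISTS target_tbl2 LIKE source_tbl USING hudi
                     TBLPROPERTIES (primaryKey = 'id') LOCATION '/tmp/target_tbl2'""")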
---
 .../spark/sql/HoodieCatalystPlansUtils.scala       |   7 ++
 .../org/apache/spark/sql/hudi/SparkAdapter.scala   |   8 +-
 .../apache/spark/sql/hudi/HoodieOptionConfig.scala |   8 ++
 .../command/CreateHoodieTableLikeCommand.scala     | 110 ++++++++++++++++
 .../spark/sql/hudi/analysis/HoodieAnalysis.scala   |  13 +-
 .../apache/spark/sql/hudi/TestCreateTable.scala    | 139 +++++++++++++++++++++
 .../spark/sql/HoodieSpark2CatalystPlanUtils.scala  |   9 ++
 .../spark/sql/HoodieSpark3CatalystPlanUtils.scala  |  13 +-
 8 files changed, 302 insertions(+), 5 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala
index 58789681c54..9cfe23f86cc 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql
 
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
 import org.apache.spark.sql.catalyst.plans.JoinType
 import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan}
@@ -93,6 +94,12 @@ trait HoodieCatalystPlansUtils {
    */
   def unapplyInsertIntoStatement(plan: LogicalPlan): Option[(LogicalPlan, Map[String, Option[String]], LogicalPlan, Boolean, Boolean)]
 
+  /**
+   * Decomposes [[CreateTableLikeCommand]] into its arguments to accommodate API
+   * changes in Spark 3
+   */
+  def unapplyCreateTableLikeCommand(plan: LogicalPlan): Option[(TableIdentifier, TableIdentifier, CatalogStorageFormat, Option[String], Map[String, String], Boolean)]
+
   /**
    * Rebases instance of {@code InsertIntoStatement} onto provided instance of {@code targetTable} and {@code query}
    */
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
index 041beba95df..1c6111afe47 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
@@ -150,11 +150,11 @@ trait SparkAdapter extends Serializable {
   }
 
   def isHoodieTable(map: java.util.Map[String, String]): Boolean = {
-    map.getOrDefault("provider", "").equals("hudi")
+    isHoodieTable(map.getOrDefault("provider", ""))
   }
 
   def isHoodieTable(table: CatalogTable): Boolean = {
-    table.provider.map(_.toLowerCase(Locale.ROOT)).orNull == "hudi"
+    isHoodieTable(table.provider.map(_.toLowerCase(Locale.ROOT)).orNull)
   }
 
   def isHoodieTable(tableId: TableIdentifier, spark: SparkSession): Boolean = {
@@ -162,6 +162,10 @@ trait SparkAdapter extends Serializable {
     isHoodieTable(table)
   }
 
+  def isHoodieTable(provider: String): Boolean = {
+    "hudi".equalsIgnoreCase(provider)
+  }
+
   /**
    * Create instance of [[ParquetFileFormat]]
    */
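
The new isHoodieTable(provider: String) overload is case-insensitive and
null-safe, since String.equalsIgnoreCase returns false for a null argument. A
quick sketch of the expected behavior, assuming a SparkAdapter instance in scope:

    sparkAdapter.isHoodieTable("HUDI")    // true: case-insensitive match
    sparkAdapter.isHoodieTable("parquet") // false
    sparkAdapter.isHoodieTable(null)      // false, no NPE: the receiver is the literal "hudi"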
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
index d715a108d62..abe98bb46cf 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
@@ -182,6 +182,14 @@ object HoodieOptionConfig {
     options.filterNot(_._1.startsWith("hoodie.")).filterNot(kv => sqlOptionKeyToWriteConfigKey.contains(kv._1))
   }
 
+  /**
+   * The opposite of `deleteHoodieOptions`: this method extracts all hoodie-related
+   * options (those starting with `hoodie.`, plus all SQL options).
+   */
+  def extractHoodieOptions(options: Map[String, String]): Map[String, String] = {
+    options.filter(_._1.startsWith("hoodie.")) ++ extractSqlOptions(options)
+  }
+
   // extract primaryKey, preCombineField, type options
   def extractSqlOptions(options: Map[String, String]): Map[String, String] = {
     val sqlOptions = mapTableConfigsToSqlOptions(options)
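
A rough sketch of the intended filtering (keys are illustrative; the exact set
of recognized SQL options is determined by extractSqlOptions):

    // hypothetical input: one hoodie.* key, one sql option, one unrelated key
    val opts = Map(
      "hoodie.datasource.write.table.type" -> "MERGE_ON_READ", // kept: hoodie.* prefix
      "type" -> "mor",                                         // kept if recognized as a sql option
      "non.hoodie.property" -> "value")                        // dropped
    val hoodieOnly = HoodieOptionConfig.extractHoodieOptions(opts)
    // expected to retain only the first two entries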
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableLikeCommand.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableLikeCommand.scala
new file mode 100644
index 00000000000..dc4458d8ad1
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableLikeCommand.scala
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command
+
+import org.apache.hudi.SparkAdapterSupport
+import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.util.ConfigUtils
+import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, HoodieCatalogTable}
+import org.apache.spark.sql.hudi.HoodieOptionConfig
+
+import scala.util.control.NonFatal
+
+case class CreateHoodieTableLikeCommand(targetTable: TableIdentifier,
+                                        sourceTable: TableIdentifier,
+                                        fileFormat: CatalogStorageFormat,
+                                        properties: Map[String, String] = Map.empty,
+                                        ignoreIfExists: Boolean)
+  extends HoodieLeafRunnableCommand with SparkAdapterSupport {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val catalog = sparkSession.sessionState.catalog
+
+    val tableIsExists = catalog.tableExists(targetTable)
+    if (tableIsExists) {
+      if (ignoreIfExists) {
+        // scalastyle:off
+        return Seq.empty[Row]
+        // scalastyle:on
+      } else {
+        throw new IllegalArgumentException(s"Table $targetTable already exists.")
+      }
+    }
+
+    val sourceTableDesc = catalog.getTempViewOrPermanentTableMetadata(sourceTable)
+
+    val newStorage = if (fileFormat.inputFormat.isDefined) {
+      fileFormat
+    } else {
+      sourceTableDesc.storage.copy(locationUri = fileFormat.locationUri)
+    }
+
+    // If the location is specified, we create an external table internally.
+    // Otherwise create a managed table.
+    val tblType = if (newStorage.locationUri.isEmpty) {
+      CatalogTableType.MANAGED
+    } else {
+      CatalogTableType.EXTERNAL
+    }
+
+    val targetTableProperties = if (sparkAdapter.isHoodieTable(sourceTableDesc)) {
+      HoodieOptionConfig.extractHoodieOptions(sourceTableDesc.properties) ++ properties
+    } else {
+      properties
+    }
+
+    val newTableDesc = CatalogTable(
+      identifier = targetTable,
+      tableType = tblType,
+      storage = newStorage,
+      schema = sourceTableDesc.schema,
+      provider = Some("hudi"),
+      partitionColumnNames = sourceTableDesc.partitionColumnNames,
+      bucketSpec = sourceTableDesc.bucketSpec,
+      properties = targetTableProperties,
+      tracksPartitionsInCatalog = sourceTableDesc.tracksPartitionsInCatalog)
+
+    val hoodieCatalogTable = HoodieCatalogTable(sparkSession, newTableDesc)
+    // check if there are conflicts between table configs defined in the hoodie table and properties defined in the catalog.
+    CreateHoodieTableCommand.validateTblProperties(hoodieCatalogTable)
+
+    val queryAsProp = hoodieCatalogTable.catalogProperties.get(ConfigUtils.IS_QUERY_AS_RO_TABLE)
+    if (queryAsProp.isEmpty) {
+      // init hoodie table for a normal table (not a ro/rt table)
+      hoodieCatalogTable.initHoodieTable()
+    } else {
+      if (!hoodieCatalogTable.hoodieTableExists) {
+        throw new AnalysisException("Creating ro/rt table need the existence 
of the base table.")
+      }
+      if (HoodieTableType.MERGE_ON_READ != hoodieCatalogTable.tableType) {
+        throw new AnalysisException("Creating ro/rt table should only apply to 
a mor table.")
+      }
+    }
+
+    try {
+      // create catalog table for this hoodie table
+      CreateHoodieTableCommand.createTableInCatalog(sparkSession, hoodieCatalogTable, ignoreIfExists, queryAsProp)
+    } catch {
+      case NonFatal(e) =>
+        logWarning("Failed to create catalog table in metastore", e)
+    }
+    Seq.empty[Row]
+  }
+}
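
As the inline comment above notes, the LOCATION clause decides the catalog
table type; a hedged usage sketch (table and path names hypothetical):

    // no LOCATION: locationUri is empty, so the target is created MANAGED
    spark.sql("CREATE TABLE t_managed LIKE src USING hudi")
    // explicit LOCATION: the target is created EXTERNAL at that path
    spark.sql("CREATE TABLE t_external LIKE src USING hudi LOCATION '/tmp/t_external'")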
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
index 3c2d41aa582..24820c1c032 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
@@ -20,8 +20,9 @@ package org.apache.spark.sql.hudi.analysis
 import org.apache.hudi.common.util.ReflectionUtils
 import org.apache.hudi.common.util.ReflectionUtils.loadClass
 import org.apache.hudi.{HoodieSparkUtils, SparkAdapterSupport}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
-import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSeq, Expression, GenericInternalRow}
 import org.apache.spark.sql.catalyst.optimizer.ReplaceExpressions
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -29,7 +30,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.datasources.{CreateTable, LogicalRelation}
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.{isMetaField, removeMetaFields}
-import org.apache.spark.sql.hudi.analysis.HoodieAnalysis.{MatchInsertIntoStatement, MatchMergeIntoTable, ResolvesToHudiTable, sparkAdapter}
+import org.apache.spark.sql.hudi.analysis.HoodieAnalysis.{MatchCreateTableLike, MatchInsertIntoStatement, MatchMergeIntoTable, ResolvesToHudiTable, sparkAdapter}
 import org.apache.spark.sql.hudi.command._
 import org.apache.spark.sql.hudi.command.procedures.{HoodieProcedures, Procedure, ProcedureArgs}
 import org.apache.spark.sql.{AnalysisException, SparkSession}
@@ -348,6 +349,11 @@ object HoodieAnalysis extends SparkAdapterSupport {
       sparkAdapter.resolveHoodieTable(plan)
   }
 
+  private[sql] object MatchCreateTableLike {
+    def unapply(plan: LogicalPlan): Option[(TableIdentifier, TableIdentifier, CatalogStorageFormat, Option[String], Map[String, String], Boolean)] =
+      sparkAdapter.getCatalystPlanUtils.unapplyCreateTableLikeCommand(plan)
+  }
+
   private[sql] def failAnalysis(msg: String): Nothing = {
     throw new AnalysisException(msg)
   }
@@ -504,6 +510,9 @@ case class HoodiePostAnalysisRule(sparkSession: SparkSession) extends Rule[Logic
       case CreateDataSourceTableCommand(table, ignoreIfExists)
         if sparkAdapter.isHoodieTable(table) =>
         CreateHoodieTableCommand(table, ignoreIfExists)
+      case MatchCreateTableLike(targetTable, sourceTable, fileFormat, provider, properties, ifNotExists)
+        if sparkAdapter.isHoodieTable(provider.orNull) =>
+        CreateHoodieTableLikeCommand(targetTable, sourceTable, fileFormat, properties, ifNotExists)
       // Rewrite the DropTableCommand to DropHoodieTableCommand
       case DropTableCommand(tableName, ifExists, false, purge)
         if sparkSession.sessionState.catalog.tableExists(tableName)
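
On Spark 2 the extractor returns None (see HoodieSpark2CatalystPlanUtils
below), so the new case never matches; on Spark 3 the provider from the USING
clause is checked null-safely. A sketch of the guard's behavior (values
hypothetical):

    val explicit: Option[String] = Some("hudi")
    val absent: Option[String] = None
    sparkAdapter.isHoodieTable(explicit.orNull) // true  -> rewritten to CreateHoodieTableLikeCommand
    sparkAdapter.isHoodieTable(absent.orNull)   // false -> left as vanilla CreateTableLikeCommand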
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
index a5ddd7ca854..bc3540ebf50 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
@@ -405,6 +405,145 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
     }
   }
 
+  test("Test create table like") {
+    if (HoodieSparkUtils.gteqSpark3_1) {
+      // 1. Test create table from an existing HUDI table
+      withTempDir { tmp =>
+        Seq("cow", "mor").foreach { tableType =>
+          withTable(generateTableName) { sourceTable =>
+            spark.sql(
+              s"""
+                 |create table $sourceTable (
+                 |  id int,
+                 |  name string,
+                 |  price double,
+                 |  ts long
+                 |) using hudi
+                 | tblproperties (
+                 |  primaryKey = 'id,name',
+                 |  type = '$tableType'
+                 | )
+                 | location '${tmp.getCanonicalPath}/$sourceTable'""".stripMargin)
+
+            // 1.1 Test Managed table
+            withTable(generateTableName) { targetTable =>
+              spark.sql(
+                s"""
+                   |create table $targetTable
+                   |like $sourceTable
+                   |using hudi""".stripMargin)
+
+              val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier(targetTable))
+
+              assertResult(targetTable)(table.identifier.table)
+              assertResult("hudi")(table.provider.get)
+              assertResult(CatalogTableType.MANAGED)(table.tableType)
+              assertResult(
+                HoodieRecord.HOODIE_META_COLUMNS.asScala.map(StructField(_, StringType))
+                  ++ Seq(
+                  StructField("id", IntegerType),
+                  StructField("name", StringType),
+                  StructField("price", DoubleType),
+                  StructField("ts", LongType))
+              )(table.schema.fields)
+              assertResult(tableType)(table.properties("type"))
+              assertResult("id,name")(table.properties("primaryKey"))
+
+              // target table already exists
+              assertThrows[IllegalArgumentException] {
+                spark.sql(
+                  s"""
+                     |create table $targetTable
+                     |like $sourceTable
+                     |using hudi""".stripMargin)
+              }
+
+              // should ignore if the table already exists
+              spark.sql(
+                s"""
+                   |create table if not exists $targetTable
+                   |like $sourceTable
+                   |using hudi""".stripMargin)
+            }
+
+            // 1.2 Test External table
+            withTable(generateTableName) { targetTable =>
+              spark.sql(
+                s"""
+                   |create table $targetTable
+                   |like $sourceTable
+                   |using hudi
+                   |location '${tmp.getCanonicalPath}/$targetTable'""".stripMargin)
+              val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier(targetTable))
+              assertResult(CatalogTableType.EXTERNAL)(table.tableType)
+            }
+
+
+            // 1.3 New target table options should override source table's
+            withTable(generateTableName) { targetTable =>
+              spark.sql(
+                s"""
+                   |create table $targetTable
+                   |like $sourceTable
+                   |using hudi
+                   |tblproperties (primaryKey = 'id')""".stripMargin)
+              val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier(targetTable))
+              assertResult("id")(table.properties("primaryKey"))
+            }
+          }
+        }
+      }
+
+      // 2. Test create table from an existing non-HUDI table
+      withTempDir { tmp =>
+        withTable(generateTableName) { sourceTable =>
+          spark.sql(
+            s"""
+               |create table $sourceTable (
+               |  id int,
+               |  name string,
+               |  price double,
+               |  ts long
+               |) using parquet
+               | tblproperties (
+               |  non.hoodie.property='value'
+               | )
+               | location '${tmp.getCanonicalPath}/$sourceTable'""".stripMargin)
+
+          withTable(generateTableName) { targetTable =>
+            spark.sql(
+              s"""
+                 |create table $targetTable
+                 |like $sourceTable
+                 |using hudi
+                 |tblproperties (
+                 | primaryKey = 'id,name',
+                 | type = 'cow'
+                 |)""".stripMargin)
+            val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier(targetTable))
+
+            assertResult(targetTable)(table.identifier.table)
+            assertResult("hudi")(table.provider.get)
+            assertResult(CatalogTableType.MANAGED)(table.tableType)
+            assertResult(
+              HoodieRecord.HOODIE_META_COLUMNS.asScala.map(StructField(_, StringType))
+                ++ Seq(
+                StructField("id", IntegerType),
+                StructField("name", StringType),
+                StructField("price", DoubleType),
+                StructField("ts", LongType))
+            )(table.schema.fields)
+
+            // Should not include non.hoodie.property
+            assertResult(2)(table.properties.size)
+            assertResult("cow")(table.properties("type"))
+            assertResult("id,name")(table.properties("primaryKey"))
+          }
+        }
+      }
+    }
+  }
+
   test("Test Create Table As Select With Auto record key gen") {
     withTempDir { tmp =>
       // Create Non-Partitioned table
diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala
index cdb4c5226a6..6fb1719cede 100644
--- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala
+++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql
 import org.apache.hudi.SparkHoodieTableFileIndex
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer
+import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
 import org.apache.spark.sql.catalyst.optimizer.SimplifyCasts
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
@@ -68,6 +69,14 @@ object HoodieSpark2CatalystPlanUtils extends HoodieCatalystPlansUtils {
     }
   }
 
+  /**
+   * CreateTableLike is not supported in spark2, since spark2 doesn't support passing a
+   * provider, so HUDI can't identify whether the targetTable is a HUDI table or not.
+   */
+  override def unapplyCreateTableLikeCommand(plan: LogicalPlan): Option[(TableIdentifier, TableIdentifier, CatalogStorageFormat, Option[String], Map[String, String], Boolean)] = {
+    None
+  }
+
   def rebaseInsertIntoStatement(iis: LogicalPlan, targetTable: LogicalPlan, query: LogicalPlan): LogicalPlan =
     iis.asInstanceOf[InsertIntoTable].copy(table = targetTable, query = query)
 
diff --git a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3CatalystPlanUtils.scala b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3CatalystPlanUtils.scala
index cd8d0ca6a70..a01cce70c1f 100644
--- a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3CatalystPlanUtils.scala
+++ b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3CatalystPlanUtils.scala
@@ -18,12 +18,14 @@
 package org.apache.spark.sql
 
 import org.apache.hudi.SparkAdapterSupport
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.TableOutputResolver
+import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Expression, ProjectionOverSchema}
 import org.apache.spark.sql.catalyst.plans.JoinType
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoStatement, Join, JoinHint, LeafNode, LogicalPlan}
 import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
-import org.apache.spark.sql.execution.command.ExplainCommand
+import org.apache.spark.sql.execution.command.{CreateTableLikeCommand, ExplainCommand}
 import org.apache.spark.sql.execution.{ExtendedMode, SimpleMode}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
@@ -63,6 +65,15 @@ trait HoodieSpark3CatalystPlanUtils extends HoodieCatalystPlansUtils {
     }
   }
 
+
+  override def unapplyCreateTableLikeCommand(plan: LogicalPlan): Option[(TableIdentifier, TableIdentifier, CatalogStorageFormat, Option[String], Map[String, String], Boolean)] = {
+    plan match {
+      case CreateTableLikeCommand(targetTable, sourceTable, fileFormat, provider, properties, ifNotExists) =>
+        Some((targetTable, sourceTable, fileFormat, provider, properties, ifNotExists))
+      case _ => None
+    }
+  }
+
   def rebaseInsertIntoStatement(iis: LogicalPlan, targetTable: LogicalPlan, query: LogicalPlan): LogicalPlan =
     iis.asInstanceOf[InsertIntoStatement].copy(table = targetTable, query = query)
 
