This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 20f0703800 [flink] Rename clone_hive to clone and add procedure (#5519)
20f0703800 is described below

commit 20f0703800ed8aaa087bcfa525d9f42ecd9db922
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Apr 23 13:14:57 2025 +0800

    [flink] Rename clone_hive to clone and add procedure (#5519)
---
 .../{clone-from-hive.md => clone-to-paimon.md}     | 24 ++++--
 .../{CloneHiveAction.java => CloneAction.java}     | 29 ++++---
 ...eActionFactory.java => CloneActionFactory.java} | 14 ++--
 .../flink/clone/{hive => }/CloneFileInfo.java      |  2 +-
 ...eFilesFunction.java => CloneFilesFunction.java} | 21 +++--
 ...cessFunction.java => CloneProcessFunction.java} | 10 +--
 .../{hive/CloneHiveUtils.java => CloneUtils.java}  | 10 +--
 .../clone/{hive => }/CommitTableOperator.java      |  2 +-
 .../flink/clone/{hive => }/DataFileInfo.java       |  2 +-
 ...esFunction.java => ListCloneFilesFunction.java} |  8 +-
 .../paimon/flink/procedure/CloneProcedure.java     | 93 ++++++++++++++++++++++
 .../services/org.apache.paimon.factories.Factory   |  3 +-
 ...onTest.java => ListCloneFilesFunctionTest.java} |  8 +-
 .../paimon/flink/procedure/ProcedureTest.java      |  2 -
 ...iveActionITCase.java => CloneActionITCase.java} | 36 ++++-----
 15 files changed, 187 insertions(+), 77 deletions(-)

diff --git a/docs/content/migration/clone-from-hive.md 
b/docs/content/migration/clone-to-paimon.md
similarity index 75%
rename from docs/content/migration/clone-from-hive.md
rename to docs/content/migration/clone-to-paimon.md
index a804fc07e8..b92aa4ddf2 100644
--- a/docs/content/migration/clone-from-hive.md
+++ b/docs/content/migration/clone-to-paimon.md
@@ -1,9 +1,9 @@
 ---
-title: "Clone From Hive"
+title: "Clone To Paimon"
 weight: 5
 type: docs
 aliases:
-- /migration/clone-from-hive.html
+- /migration/clone.html
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one
@@ -24,20 +24,28 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-# Clone Hive Table
+# Clone To Paimon
 
-Clone Hive Table supports cloning hive tables with parquet, orc and avro 
formats. The cloned table will be
-[append table]({{< ref "append-table/overview" >}}).
+Clone supports cloning tables to Paimon tables.
 
 1. Clone is `OVERWRITE` semantic that will overwrite the partitions of the 
target table according to the data.
 2. Clone is reentrant, but it requires existing tables to contain all fields 
from the source table and have the
    same partition fields.
 
+Currently, clone supports:
+
+1. Clone Hive tables in Hive Catalog to Paimon Catalog; supports Parquet, ORC and
+   Avro formats; the target table will be an append table.
+
+The source tables below are currently under development:
+1. Clone Hudi tables in Hive Catalog to Paimon Catalog; the target table will be
+   an append table.
+2. Clone Paimon tables to Paimon tables; the target table can be a primary key
+   table or an append table.
+
 ## Clone Hive Table
 
 ```bash
 <FLINK_HOME>/flink run ./paimon-flink-action-{{< version >}}.jar \
-clone_hive \
+clone \
 --database default \
 --table hivetable \
 --catalog_conf metastore=hive \
@@ -45,6 +53,7 @@ clone_hive \
 --target_database test \
 --target_table test_table \
 --target_catalog_conf warehouse=my_warehouse \
+--parallelism 10 \
 --where <filter_spec>
 ```
 
@@ -54,10 +63,11 @@ You can use filter spec to specify the filtering condition 
for the partition.
 
 ```bash
 <FLINK_HOME>/flink run ./paimon-flink-action-{{< version >}}.jar \
-clone_hive \
+clone \
 --database default \
 --catalog_conf metastore=hive \
 --catalog_conf uri=thrift://localhost:9088 \
 --target_database test \
+--parallelism 10 \
 --target_catalog_conf warehouse=my_warehouse
 ```
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneHiveAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneAction.java
similarity index 83%
rename from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneHiveAction.java
rename to 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneAction.java
index b418f7a744..313db28336 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneHiveAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneAction.java
@@ -21,12 +21,12 @@ package org.apache.paimon.flink.action;
 import org.apache.paimon.catalog.CachingCatalog;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.flink.clone.hive.CloneFileInfo;
-import org.apache.paimon.flink.clone.hive.CloneHiveUtils;
-import org.apache.paimon.flink.clone.hive.CommitTableOperator;
-import org.apache.paimon.flink.clone.hive.CopyHiveFilesFunction;
-import org.apache.paimon.flink.clone.hive.DataFileInfo;
-import org.apache.paimon.flink.clone.hive.ListHiveFilesFunction;
+import org.apache.paimon.flink.clone.CloneFileInfo;
+import org.apache.paimon.flink.clone.CloneFilesFunction;
+import org.apache.paimon.flink.clone.CloneUtils;
+import org.apache.paimon.flink.clone.CommitTableOperator;
+import org.apache.paimon.flink.clone.DataFileInfo;
+import org.apache.paimon.flink.clone.ListCloneFilesFunction;
 import org.apache.paimon.flink.sink.FlinkStreamPartitioner;
 import org.apache.paimon.hive.HiveCatalog;
 
@@ -39,8 +39,8 @@ import javax.annotation.Nullable;
 
 import java.util.Map;
 
-/** Clone source files managed by HiveMetaStore and commit metas to construct 
Paimon table. */
-public class CloneHiveAction extends ActionBase {
+/** Clone source table to target table. */
+public class CloneAction extends ActionBase {
 
     private final Map<String, String> sourceCatalogConfig;
     private final String sourceDatabase;
@@ -53,7 +53,7 @@ public class CloneHiveAction extends ActionBase {
     private final int parallelism;
     @Nullable private final String whereSql;
 
-    public CloneHiveAction(
+    public CloneAction(
             String sourceDatabase,
             String sourceTableName,
             Map<String, String> sourceCatalogConfig,
@@ -90,7 +90,7 @@ public class CloneHiveAction extends ActionBase {
     public void build() throws Exception {
         // list source tables
         DataStream<Tuple2<Identifier, Identifier>> source =
-                CloneHiveUtils.buildSource(
+                CloneUtils.buildSource(
                         sourceDatabase,
                         sourceTableName,
                         targetDatabase,
@@ -100,13 +100,13 @@ public class CloneHiveAction extends ActionBase {
 
         DataStream<Tuple2<Identifier, Identifier>> partitionedSource =
                 FlinkStreamPartitioner.partition(
-                        source, new CloneHiveUtils.TableChannelComputer(), 
parallelism);
+                        source, new CloneUtils.TableChannelComputer(), 
parallelism);
 
         // create target table, list files and group by <table, partition>
         DataStream<CloneFileInfo> files =
                 partitionedSource
                         .process(
-                                new ListHiveFilesFunction(
+                                new ListCloneFilesFunction(
                                         sourceCatalogConfig, 
targetCatalogConfig, whereSql))
                         .name("List Files")
                         .setParallelism(parallelism);
@@ -114,14 +114,13 @@ public class CloneHiveAction extends ActionBase {
         // copy files and commit
         DataStream<DataFileInfo> dataFile =
                 files.rebalance()
-                        .process(
-                                new CopyHiveFilesFunction(sourceCatalogConfig, 
targetCatalogConfig))
+                        .process(new CloneFilesFunction(sourceCatalogConfig, 
targetCatalogConfig))
                         .name("Copy Files")
                         .setParallelism(parallelism);
 
         DataStream<DataFileInfo> partitionedDataFile =
                 FlinkStreamPartitioner.partition(
-                        dataFile, new 
CloneHiveUtils.DataFileChannelComputer(), parallelism);
+                        dataFile, new CloneUtils.DataFileChannelComputer(), 
parallelism);
 
         DataStream<Long> committed =
                 partitionedDataFile
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneHiveActionFactory.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneActionFactory.java
similarity index 86%
rename from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneHiveActionFactory.java
rename to 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneActionFactory.java
index 003d15cd1d..fd5cc8d58b 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneHiveActionFactory.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneActionFactory.java
@@ -22,10 +22,10 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 
-/** Factory to create {@link CloneHiveAction}. */
-public class CloneHiveActionFactory implements ActionFactory {
+/** Factory to create {@link CloneAction}. */
+public class CloneActionFactory implements ActionFactory {
 
-    private static final String IDENTIFIER = "clone_hive";
+    private static final String IDENTIFIER = "clone";
     private static final String TARGET_WAREHOUSE = "target_warehouse";
     private static final String TARGET_DATABASE = "target_database";
     private static final String TARGET_TABLE = "target_table";
@@ -51,8 +51,8 @@ public class CloneHiveActionFactory implements ActionFactory {
 
         String parallelism = params.get(PARALLELISM);
 
-        CloneHiveAction cloneHiveAction =
-                new CloneHiveAction(
+        CloneAction cloneAction =
+                new CloneAction(
                         params.get(DATABASE),
                         params.get(TABLE),
                         catalogConfig,
@@ -62,13 +62,13 @@ public class CloneHiveActionFactory implements 
ActionFactory {
                         parallelism == null ? null : 
Integer.parseInt(parallelism),
                         params.get(WHERE));
 
-        return Optional.of(cloneHiveAction);
+        return Optional.of(cloneAction);
     }
 
     @Override
     public void printHelp() {
         System.out.println(
-                "Action \"clone_hive\" clones the source files and migrate 
them to paimon table.");
+                "Action \"clone\" clones the source files and migrates them to 
the Paimon table.");
         System.out.println();
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/CloneFileInfo.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneFileInfo.java
similarity index 98%
rename from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/CloneFileInfo.java
rename to 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneFileInfo.java
index 80b228752f..f0c3382b02 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/CloneFileInfo.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneFileInfo.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.flink.clone.hive;
+package org.apache.paimon.flink.clone;
 
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.BinaryRow;
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/CopyHiveFilesFunction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneFilesFunction.java
similarity index 82%
rename from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/CopyHiveFilesFunction.java
rename to 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneFilesFunction.java
index 7f82124aed..145770fa69 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/CopyHiveFilesFunction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneFilesFunction.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.flink.clone.hive;
+package org.apache.paimon.flink.clone;
 
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.BinaryRow;
@@ -36,14 +36,14 @@ import org.apache.flink.util.Collector;
 import java.util.HashMap;
 import java.util.Map;
 
-/** Copy files for table. */
-public class CopyHiveFilesFunction extends CopyProcessFunction<CloneFileInfo, 
DataFileInfo> {
+/** Clone files for table. */
+public class CloneFilesFunction extends CloneProcessFunction<CloneFileInfo, 
DataFileInfo> {
 
     private static final long serialVersionUID = 1L;
 
     private transient Map<Identifier, Map<BinaryRow, DataFilePathFactory>> 
pathFactoryMap;
 
-    public CopyHiveFilesFunction(
+    public CloneFilesFunction(
             Map<String, String> sourceCatalogConfig, Map<String, String> 
targetCatalogConfig) {
         super(sourceCatalogConfig, targetCatalogConfig);
     }
@@ -92,7 +92,16 @@ public class CopyHiveFilesFunction extends 
CopyProcessFunction<CloneFileInfo, Da
     }
 
     private DataFilePathFactory pathFactory(Identifier identifier, BinaryRow 
part) {
-        FileStoreTable targetTable = (FileStoreTable) getTable(identifier);
-        return 
targetTable.store().pathFactory().createDataFilePathFactory(part, 0);
+        return pathFactoryMap
+                .computeIfAbsent(identifier, k -> new HashMap<>())
+                .computeIfAbsent(
+                        part,
+                        k -> {
+                            FileStoreTable targetTable = (FileStoreTable) 
getTable(identifier);
+                            return targetTable
+                                    .store()
+                                    .pathFactory()
+                                    .createDataFilePathFactory(part, 0);
+                        });
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/CopyProcessFunction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneProcessFunction.java
similarity index 92%
rename from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/CopyProcessFunction.java
rename to 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneProcessFunction.java
index 76834c8857..3ee51fe444 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/CopyProcessFunction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneProcessFunction.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.flink.clone.hive;
+package org.apache.paimon.flink.clone;
 
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
@@ -35,12 +35,12 @@ import java.util.HashMap;
 import java.util.Map;
 
 import static org.apache.paimon.flink.FlinkCatalogFactory.createPaimonCatalog;
-import static 
org.apache.paimon.flink.clone.hive.CloneHiveUtils.getRootHiveCatalog;
+import static org.apache.paimon.flink.clone.CloneUtils.getRootHiveCatalog;
 
 /** Abstract function for copying tables. */
-public abstract class CopyProcessFunction<I, O> extends ProcessFunction<I, O> {
+public abstract class CloneProcessFunction<I, O> extends ProcessFunction<I, O> 
{
 
-    protected static final Logger LOG = 
LoggerFactory.getLogger(CopyProcessFunction.class);
+    protected static final Logger LOG = 
LoggerFactory.getLogger(CloneProcessFunction.class);
 
     protected final Map<String, String> sourceCatalogConfig;
     protected final Map<String, String> targetCatalogConfig;
@@ -51,7 +51,7 @@ public abstract class CopyProcessFunction<I, O> extends 
ProcessFunction<I, O> {
     protected transient Map<Identifier, Table> tableCache;
     protected transient DataFileMetaSerializer dataFileSerializer;
 
-    public CopyProcessFunction(
+    public CloneProcessFunction(
             Map<String, String> sourceCatalogConfig, Map<String, String> 
targetCatalogConfig) {
         this.sourceCatalogConfig = sourceCatalogConfig;
         this.targetCatalogConfig = targetCatalogConfig;
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/CloneHiveUtils.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneUtils.java
similarity index 96%
rename from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/CloneHiveUtils.java
rename to 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneUtils.java
index 1afed4a9ce..6dbe35ee85 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/CloneHiveUtils.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneUtils.java
@@ -16,12 +16,12 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.flink.clone.hive;
+package org.apache.paimon.flink.clone;
 
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.DelegateCatalog;
 import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.flink.action.CloneHiveAction;
+import org.apache.paimon.flink.action.CloneAction;
 import org.apache.paimon.hive.HiveCatalog;
 import org.apache.paimon.hive.migrate.HiveCloneUtils;
 import org.apache.paimon.table.sink.ChannelComputer;
@@ -40,10 +40,10 @@ import java.util.Objects;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 import static org.apache.paimon.utils.Preconditions.checkState;
 
-/** Utils for building {@link CloneHiveAction}. */
-public class CloneHiveUtils {
+/** Utils for building {@link CloneAction}. */
+public class CloneUtils {
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(CloneHiveUtils.class);
+    private static final Logger LOG = 
LoggerFactory.getLogger(CloneUtils.class);
 
     public static DataStream<Tuple2<Identifier, Identifier>> buildSource(
             String sourceDatabase,
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/CommitTableOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CommitTableOperator.java
similarity index 98%
rename from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/CommitTableOperator.java
rename to 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CommitTableOperator.java
index 954beaa079..57093e9fb8 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/CommitTableOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CommitTableOperator.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.flink.clone.hive;
+package org.apache.paimon.flink.clone;
 
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/DataFileInfo.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/DataFileInfo.java
similarity index 97%
rename from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/DataFileInfo.java
rename to 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/DataFileInfo.java
index 2baa11af61..25bfa6a757 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/DataFileInfo.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/DataFileInfo.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.flink.clone.hive;
+package org.apache.paimon.flink.clone;
 
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.BinaryRow;
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/ListHiveFilesFunction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/ListCloneFilesFunction.java
similarity index 97%
rename from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/ListHiveFilesFunction.java
rename to 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/ListCloneFilesFunction.java
index a49147e826..6e933ac434 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/ListHiveFilesFunction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/ListCloneFilesFunction.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.flink.clone.hive;
+package org.apache.paimon.flink.clone;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.annotation.VisibleForTesting;
@@ -48,14 +48,14 @@ import static 
org.apache.paimon.utils.Preconditions.checkNotNull;
 import static org.apache.paimon.utils.Preconditions.checkState;
 
 /** List files for table. */
-public class ListHiveFilesFunction
-        extends CopyProcessFunction<Tuple2<Identifier, Identifier>, 
CloneFileInfo> {
+public class ListCloneFilesFunction
+        extends CloneProcessFunction<Tuple2<Identifier, Identifier>, 
CloneFileInfo> {
 
     private static final long serialVersionUID = 1L;
 
     @Nullable private final String whereSql;
 
-    public ListHiveFilesFunction(
+    public ListCloneFilesFunction(
             Map<String, String> sourceCatalogConfig,
             Map<String, String> targetCatalogConfig,
             @Nullable String whereSql) {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CloneProcedure.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CloneProcedure.java
new file mode 100644
index 0000000000..94233eed12
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CloneProcedure.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.flink.action.CloneAction;
+
+import org.apache.flink.table.annotation.ArgumentHint;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.ProcedureHint;
+import org.apache.flink.table.procedure.ProcedureContext;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Clone tables procedure. */
+public class CloneProcedure extends ProcedureBase {
+
+    public static final String IDENTIFIER = "clone";
+
+    @ProcedureHint(
+            argument = {
+                @ArgumentHint(name = "database", type = 
@DataTypeHint("STRING"), isOptional = true),
+                @ArgumentHint(name = "table", type = @DataTypeHint("STRING"), 
isOptional = true),
+                @ArgumentHint(
+                        name = "catalog_conf",
+                        type = @DataTypeHint("STRING"),
+                        isOptional = true),
+                @ArgumentHint(
+                        name = "target_database",
+                        type = @DataTypeHint("STRING"),
+                        isOptional = true),
+                @ArgumentHint(
+                        name = "target_table",
+                        type = @DataTypeHint("STRING"),
+                        isOptional = true),
+                @ArgumentHint(
+                        name = "target_catalog_conf",
+                        type = @DataTypeHint("STRING"),
+                        isOptional = true),
+                @ArgumentHint(name = "parallelism", type = 
@DataTypeHint("INT"), isOptional = true),
+                @ArgumentHint(name = "where", type = @DataTypeHint("STRING"), 
isOptional = true)
+            })
+    public String[] call(
+            ProcedureContext procedureContext,
+            String database,
+            String tableName,
+            String sourceCatalogConfigStr,
+            String targetDatabase,
+            String targetTableName,
+            String targetCatalogConfigStr,
+            Integer parallelism,
+            String where)
+            throws Exception {
+        Map<String, String> sourceCatalogConfig =
+                new HashMap<>(optionalConfigMap(sourceCatalogConfigStr));
+
+        Map<String, String> targetCatalogConfig =
+                new HashMap<>(optionalConfigMap(targetCatalogConfigStr));
+
+        CloneAction action =
+                new CloneAction(
+                        database,
+                        tableName,
+                        sourceCatalogConfig,
+                        targetDatabase,
+                        targetTableName,
+                        targetCatalogConfig,
+                        parallelism,
+                        where);
+        return execute(procedureContext, action, "Clone Job");
+    }
+
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
 
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index c98ef99b94..b344ea56af 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ 
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -46,7 +46,7 @@ org.apache.paimon.flink.action.ExpireChangelogsActionFactory
 org.apache.paimon.flink.action.RemoveUnexistingFilesActionFactory
 org.apache.paimon.flink.action.ClearConsumerActionFactory
 org.apache.paimon.flink.action.RescaleActionFactory
-org.apache.paimon.flink.action.CloneHiveActionFactory
+org.apache.paimon.flink.action.CloneActionFactory
 
 ### procedure factories
 org.apache.paimon.flink.procedure.CompactDatabaseProcedure
@@ -84,6 +84,7 @@ org.apache.paimon.flink.procedure.RenameTagProcedure
 org.apache.paimon.flink.procedure.FastForwardProcedure
 org.apache.paimon.flink.procedure.MarkPartitionDoneProcedure
 org.apache.paimon.flink.procedure.CopyFilesProcedure
+org.apache.paimon.flink.procedure.CloneProcedure
 org.apache.paimon.flink.procedure.CompactManifestProcedure
 org.apache.paimon.flink.procedure.RefreshObjectTableProcedure
 org.apache.paimon.flink.procedure.RemoveUnexistingFilesProcedure
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/clone/hive/ListHiveFilesFunctionTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/clone/ListCloneFilesFunctionTest.java
similarity index 92%
rename from 
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/clone/hive/ListHiveFilesFunctionTest.java
rename to 
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/clone/ListCloneFilesFunctionTest.java
index 098a3ba1ff..880b65ab5f 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/clone/hive/ListHiveFilesFunctionTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/clone/ListCloneFilesFunctionTest.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.flink.clone.hive;
+package org.apache.paimon.flink.clone;
 
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.BinaryRow;
@@ -28,12 +28,12 @@ import org.apache.paimon.types.RowType;
 
 import org.junit.jupiter.api.Test;
 
-import static 
org.apache.paimon.flink.clone.hive.ListHiveFilesFunction.getPartitionPredicate;
+import static 
org.apache.paimon.flink.clone.ListCloneFilesFunction.getPartitionPredicate;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
-/** Test for {@link ListHiveFilesFunction}. */
-public class ListHiveFilesFunctionTest {
+/** Test for {@link ListCloneFilesFunction}. */
+public class ListCloneFilesFunctionTest {
 
     @Test
     public void testConvertSqlToPartitionPredicate() throws Exception {
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ProcedureTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ProcedureTest.java
index ee0f7875f9..74e3aeeac5 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ProcedureTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ProcedureTest.java
@@ -40,8 +40,6 @@ public class ProcedureTest {
         Set<String> expectedExclusions = new HashSet<>();
         // Can be covered by `DELETE FROM` syntax. No procedure needed.
         expectedExclusions.add("delete");
-        // TODO: implement later
-        expectedExclusions.add("clone_hive");
         List<String> actionIdentifiers =
                 FactoryUtil.discoverIdentifiers(
                         ActionFactory.class.getClassLoader(), 
ActionFactory.class);
diff --git 
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/CloneHiveActionITCase.java
 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/CloneActionITCase.java
similarity index 95%
rename from 
paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/CloneHiveActionITCase.java
rename to 
paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/CloneActionITCase.java
index 4e9ad2ed7f..0a4deee915 100644
--- 
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/CloneHiveActionITCase.java
+++ 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/CloneActionITCase.java
@@ -23,7 +23,7 @@ import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.FlinkCatalog;
 import org.apache.paimon.flink.action.ActionITCaseBase;
-import org.apache.paimon.flink.action.CloneHiveAction;
+import org.apache.paimon.flink.action.CloneAction;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.hive.TestHiveMetastore;
 import org.apache.paimon.manifest.ManifestFileMeta;
@@ -49,8 +49,8 @@ import java.util.concurrent.ThreadLocalRandom;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
 
-/** Tests for {@link CloneHiveAction}. */
-public class CloneHiveActionITCase extends ActionITCaseBase {
+/** Tests for {@link CloneAction}. */
+public class CloneActionITCase extends ActionITCaseBase {
 
     private static final TestHiveMetastore TEST_HIVE_METASTORE = new 
TestHiveMetastore();
 
@@ -90,8 +90,8 @@ public class CloneHiveActionITCase extends ActionITCaseBase {
         tEnv.executeSql("CREATE DATABASE test");
 
         createAction(
-                        CloneHiveAction.class,
-                        "clone_hive",
+                        CloneAction.class,
+                        "clone",
                         "--database",
                         "default",
                         "--table",
@@ -159,7 +159,7 @@ public class CloneHiveActionITCase extends ActionITCaseBase 
{
         List<String> args =
                 new ArrayList<>(
                         Arrays.asList(
-                                "clone_hive",
+                                "clone",
                                 "--database",
                                 "default",
                                 "--table",
@@ -179,7 +179,7 @@ public class CloneHiveActionITCase extends ActionITCaseBase 
{
             args.add(whereSql);
         }
 
-        createAction(CloneHiveAction.class, args).run();
+        createAction(CloneAction.class, args).run();
         FileStoreTable paimonTable =
                 paimonTable(tEnv, "PAIMON", Identifier.create("test", 
"test_table"));
 
@@ -195,7 +195,7 @@ public class CloneHiveActionITCase extends ActionITCaseBase 
{
             // drop where
             args = new ArrayList<>(args.subList(0, args.size() - 1));
             args.add("id2 <> 1 AND id3 <> 1");
-            createAction(CloneHiveAction.class, args).run();
+            createAction(CloneAction.class, args).run();
 
             // assert not file deleted
             Snapshot snapshot = paimonTable.latestSnapshot().get();
@@ -213,7 +213,7 @@ public class CloneHiveActionITCase extends ActionITCaseBase 
{
             
Assertions.assertThatList(r1).containsExactlyInAnyOrderElementsOf(r2);
         } else {
             // run again, validate overwrite
-            createAction(CloneHiveAction.class, args).run();
+            createAction(CloneAction.class, args).run();
             r2 = ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM 
test.test_table").collect());
             
Assertions.assertThatList(r1).containsExactlyInAnyOrderElementsOf(r2);
         }
@@ -240,7 +240,7 @@ public class CloneHiveActionITCase extends ActionITCaseBase 
{
 
         List<String> args =
                 Arrays.asList(
-                        "clone_hive",
+                        "clone",
                         "--database",
                         "default",
                         "--table",
@@ -259,7 +259,7 @@ public class CloneHiveActionITCase extends ActionITCaseBase 
{
                         // the data won't < 0
                         "id2 < 0");
 
-        createAction(CloneHiveAction.class, args).run();
+        createAction(CloneAction.class, args).run();
 
         // table exists but no data
         FileStoreTable paimonTable =
@@ -298,8 +298,8 @@ public class CloneHiveActionITCase extends ActionITCaseBase 
{
         tEnv.executeSql("CREATE DATABASE test");
 
         createAction(
-                        CloneHiveAction.class,
-                        "clone_hive",
+                        CloneAction.class,
+                        "clone",
                         "--database",
                         "hivedb",
                         "--catalog_conf",
@@ -356,8 +356,8 @@ public class CloneHiveActionITCase extends ActionITCaseBase 
{
         tEnv.executeSql("CREATE DATABASE test");
 
         createAction(
-                        CloneHiveAction.class,
-                        "clone_hive",
+                        CloneAction.class,
+                        "clone",
                         "--database",
                         "hivedb",
                         "--catalog_conf",
@@ -412,7 +412,7 @@ public class CloneHiveActionITCase extends ActionITCaseBase 
{
         List<String> args =
                 new ArrayList<>(
                         Arrays.asList(
-                                "clone_hive",
+                                "clone",
                                 "--database",
                                 "default",
                                 "--table",
@@ -429,11 +429,11 @@ public class CloneHiveActionITCase extends 
ActionITCaseBase {
                                 "warehouse=" + warehouse));
 
         if (ddlIndex < 3) {
-            assertThatThrownBy(() -> createAction(CloneHiveAction.class, 
args).run())
+            assertThatThrownBy(() -> createAction(CloneAction.class, 
args).run())
                     .rootCause()
                     .hasMessageContaining(exceptionMsg()[ddlIndex]);
         } else {
-            createAction(CloneHiveAction.class, args).run();
+            createAction(CloneAction.class, args).run();
             FileStoreTable paimonTable =
                     paimonTable(tEnv, "PAIMON", Identifier.create("test", 
"test_table"));
 


Reply via email to