This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 9b0c90c476 [flink] Add clone from Paimon tables in clone action (#5911)
9b0c90c476 is described below

commit 9b0c90c4766759e85b2582c203e5f7eed809ab12
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Jul 18 18:17:45 2025 +0800

    [flink] Add clone from Paimon tables in clone action (#5911)
---
 .../apache/paimon/flink/action/CloneAction.java    | 103 ++++------
 .../paimon/flink/action/CloneActionFactory.java    |   9 +-
 .../{CloneUtils.java => CloneHiveTableUtils.java}  | 131 +++++++------
 .../paimon/flink/clone/ClonePaimonTableUtils.java  | 214 +++++++++++++++++++++
 ....java => ShuffleIdentifierByTableComputer.java} |  38 ++--
 .../flink/clone/{ => files}/CloneFileInfo.java     |   2 +-
 .../CloneFilesCommitOperator.java}                 |   6 +-
 .../clone/{ => files}/CloneFilesFunction.java      |   4 +-
 .../CloneFilesProcessFunction.java}                |  10 +-
 .../flink/clone/{ => files}/DataFileInfo.java      |   2 +-
 .../clone/{ => files}/ListCloneFilesFunction.java  |   8 +-
 .../files/ShuffleDataFileByTableComputer.java      |  46 +++++
 .../paimon/flink/clone/spits/CloneSplitInfo.java   |  62 ++++++
 .../flink/clone/spits/CloneSplitsFunction.java     | 108 +++++++++++
 .../CommitMessageInfo.java}                        |  37 ++--
 .../CommitMessageTableOperator.java}               |  43 ++---
 .../flink/clone/spits/ListCloneSplitsFunction.java | 139 +++++++++++++
 .../spits/ShuffleCommitMessageByTableComputer.java |  46 +++++
 .../paimon/flink/procedure/CloneProcedure.java     |  10 +-
 .../flink/clone/ListCloneFilesFunctionTest.java    |   3 +-
 .../paimon/hive/procedure/CloneActionITCase.java   |  86 +++++++++
 21 files changed, 899 insertions(+), 208 deletions(-)
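
Note on the new option (editorial addition, not part of the patch): the clone action now takes a clone_from parameter ("hive" or "paimon"); CloneActionFactory defaults it to "hive" when the parameter is missing or blank, so existing Hive-clone jobs keep working, while "paimon" routes the job through the new ClonePaimonTableUtils pipeline. A minimal sketch of constructing the action programmatically with the new argument follows; the database/table names and warehouse paths are placeholders, and the middle constructor parameters are assumed to follow the same order as the build(...) calls in this patch (sourceDatabase, sourceTableName, sourceCatalogConfig, targetDatabase, targetTableName, targetCatalogConfig, parallelism, whereSql, excludedTables):

    import java.util.Collections;
    import java.util.Map;

    import org.apache.paimon.flink.action.CloneAction;

    public class CloneFromPaimonExample {
        public static void main(String[] args) throws Exception {
            // Placeholder catalog options; a real job would point at actual warehouses.
            Map<String, String> sourceCatalogConfig =
                    Collections.singletonMap("warehouse", "/path/to/source/warehouse");
            Map<String, String> targetCatalogConfig =
                    Collections.singletonMap("warehouse", "/path/to/target/warehouse");

            CloneAction action =
                    new CloneAction(
                            "source_db",       // sourceDatabase
                            "orders",          // sourceTableName (placeholder)
                            sourceCatalogConfig,
                            "target_db",       // targetDatabase
                            "orders_copy",     // targetTableName (placeholder)
                            targetCatalogConfig,
                            4,                 // parallelism (nullable; defaults to env parallelism)
                            null,              // whereSql
                            null,              // excludedTables
                            "paimon");         // cloneFrom: "paimon" or "hive" (default)
            action.run();                      // build() the pipeline, then execute "Clone job"
        }
    }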

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneAction.java
index 03ecdff9b9..8114b43fe1 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneAction.java
@@ -20,21 +20,10 @@ package org.apache.paimon.flink.action;
 
 import org.apache.paimon.catalog.CachingCatalog;
 import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.flink.clone.CloneFileInfo;
-import org.apache.paimon.flink.clone.CloneFilesFunction;
-import org.apache.paimon.flink.clone.CloneUtils;
-import org.apache.paimon.flink.clone.CommitTableOperator;
-import org.apache.paimon.flink.clone.DataFileInfo;
-import org.apache.paimon.flink.clone.ListCloneFilesFunction;
-import org.apache.paimon.flink.sink.FlinkStreamPartitioner;
+import org.apache.paimon.flink.clone.CloneHiveTableUtils;
+import org.apache.paimon.flink.clone.ClonePaimonTableUtils;
 import org.apache.paimon.hive.HiveCatalog;
 
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
-
 import javax.annotation.Nullable;
 
 import java.util.List;
@@ -54,6 +43,7 @@ public class CloneAction extends ActionBase {
     private final int parallelism;
     @Nullable private final String whereSql;
     @Nullable private final List<String> excludedTables;
+    private final String cloneFrom;
 
     public CloneAction(
             String sourceDatabase,
@@ -64,17 +54,20 @@ public class CloneAction extends ActionBase {
             Map<String, String> targetCatalogConfig,
             @Nullable Integer parallelism,
             @Nullable String whereSql,
-            @Nullable List<String> excludedTables) {
+            @Nullable List<String> excludedTables,
+            String cloneFrom) {
         super(sourceCatalogConfig);
 
-        Catalog sourceCatalog = catalog;
-        if (sourceCatalog instanceof CachingCatalog) {
-            sourceCatalog = ((CachingCatalog) sourceCatalog).wrapped();
-        }
-        if (!(sourceCatalog instanceof HiveCatalog)) {
-            throw new UnsupportedOperationException(
-                    "Only support clone hive tables using HiveCatalog, but current source catalog is "
-                            + sourceCatalog.getClass().getName());
+        if (cloneFrom.equalsIgnoreCase("hive")) {
+            Catalog sourceCatalog = catalog;
+            if (sourceCatalog instanceof CachingCatalog) {
+                sourceCatalog = ((CachingCatalog) sourceCatalog).wrapped();
+            }
+            if (!(sourceCatalog instanceof HiveCatalog)) {
+                throw new UnsupportedOperationException(
+                        "Only support clone hive tables using HiveCatalog, but current source catalog is "
+                                + sourceCatalog.getClass().getName());
+            }
         }
 
         this.sourceDatabase = sourceDatabase;
@@ -88,58 +81,46 @@ public class CloneAction extends ActionBase {
         this.parallelism = parallelism == null ? env.getParallelism() : parallelism;
         this.whereSql = whereSql;
         this.excludedTables = excludedTables;
+        this.cloneFrom = cloneFrom;
     }
 
     @Override
     public void build() throws Exception {
-        // list source tables
-        DataStream<Tuple2<Identifier, Identifier>> source =
-                CloneUtils.buildSource(
+        switch (cloneFrom) {
+            case "hive":
+                CloneHiveTableUtils.build(
+                        env,
+                        catalog,
                         sourceDatabase,
                         sourceTableName,
+                        sourceCatalogConfig,
                         targetDatabase,
                         targetTableName,
+                        targetCatalogConfig,
+                        parallelism,
+                        whereSql,
+                        excludedTables);
+                break;
+            case "paimon":
+                ClonePaimonTableUtils.build(
+                        env,
                         catalog,
-                        excludedTables,
-                        env);
-
-        DataStream<Tuple2<Identifier, Identifier>> partitionedSource =
-                FlinkStreamPartitioner.partition(
-                        source, new CloneUtils.TableChannelComputer(), parallelism);
-
-        // create target table, list files and group by <table, partition>
-        DataStream<CloneFileInfo> files =
-                partitionedSource
-                        .process(
-                                new ListCloneFilesFunction(
-                                        sourceCatalogConfig, targetCatalogConfig, whereSql))
-                        .name("List Files")
-                        .setParallelism(parallelism);
-
-        // copy files and commit
-        DataStream<DataFileInfo> dataFile =
-                files.rebalance()
-                        .process(new CloneFilesFunction(sourceCatalogConfig, targetCatalogConfig))
-                        .name("Copy Files")
-                        .setParallelism(parallelism);
-
-        DataStream<DataFileInfo> partitionedDataFile =
-                FlinkStreamPartitioner.partition(
-                        dataFile, new CloneUtils.DataFileChannelComputer(), parallelism);
-
-        DataStream<Long> committed =
-                partitionedDataFile
-                        .transform(
-                                "Commit table",
-                                BasicTypeInfo.LONG_TYPE_INFO,
-                                new CommitTableOperator(targetCatalogConfig))
-                        .setParallelism(parallelism);
-        committed.sinkTo(new DiscardingSink<>()).name("end").setParallelism(1);
+                        sourceDatabase,
+                        sourceTableName,
+                        sourceCatalogConfig,
+                        targetDatabase,
+                        targetTableName,
+                        targetCatalogConfig,
+                        parallelism,
+                        whereSql,
+                        excludedTables);
+                break;
+        }
     }
 
     @Override
     public void run() throws Exception {
         build();
-        execute("Clone Hive job");
+        execute("Clone job");
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneActionFactory.java
index 29d3331914..53b88d5f9c 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneActionFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneActionFactory.java
@@ -37,6 +37,7 @@ public class CloneActionFactory implements ActionFactory {
     private static final String PARALLELISM = "parallelism";
     private static final String WHERE = "where";
     private static final String EXCLUDED_TABLES = "excluded_tables";
+    private static final String CLONE_FROM = "clone_from";
 
     @Override
     public String identifier() {
@@ -62,6 +63,11 @@ public class CloneActionFactory implements ActionFactory {
                         ? null
                         : Arrays.asList(StringUtils.split(excludedTablesStr, ","));
 
+        String cloneFrom = params.get(CLONE_FROM);
+        if (StringUtils.isNullOrWhitespaceOnly(cloneFrom)) {
+            cloneFrom = "hive";
+        }
+
         CloneAction cloneAction =
                 new CloneAction(
                         params.get(DATABASE),
@@ -72,7 +78,8 @@ public class CloneActionFactory implements ActionFactory {
                         targetCatalogConfig,
                         parallelism == null ? null : Integer.parseInt(parallelism),
                         params.get(WHERE),
-                        excludedTables);
+                        excludedTables,
+                        cloneFrom);
 
         return Optional.of(cloneAction);
     }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneHiveTableUtils.java
similarity index 59%
rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneUtils.java
rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneHiveTableUtils.java
index 06ff96af3d..c7cf68da05 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneUtils.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneHiveTableUtils.java
@@ -22,15 +22,22 @@ import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.DelegateCatalog;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.action.CloneAction;
+import org.apache.paimon.flink.clone.files.CloneFileInfo;
+import org.apache.paimon.flink.clone.files.CloneFilesCommitOperator;
+import org.apache.paimon.flink.clone.files.CloneFilesFunction;
+import org.apache.paimon.flink.clone.files.DataFileInfo;
+import org.apache.paimon.flink.clone.files.ListCloneFilesFunction;
+import org.apache.paimon.flink.clone.files.ShuffleDataFileByTableComputer;
+import org.apache.paimon.flink.sink.FlinkStreamPartitioner;
 import org.apache.paimon.hive.HiveCatalog;
-import org.apache.paimon.hive.clone.HiveCloneUtils;
-import org.apache.paimon.table.sink.ChannelComputer;
 import org.apache.paimon.utils.StringUtils;
 
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,15 +45,15 @@ import javax.annotation.Nullable;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Objects;
+import java.util.Map;
 
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 import static org.apache.paimon.utils.Preconditions.checkState;
 
-/** Utils for building {@link CloneAction}. */
-public class CloneUtils {
+/** Utils for building {@link CloneAction} for append tables. */
+public class CloneHiveTableUtils {
 
-    private static final Logger LOG = LoggerFactory.getLogger(CloneUtils.class);
+    private static final Logger LOG = LoggerFactory.getLogger(CloneHiveTableUtils.class);
 
     public static DataStream<Tuple2<Identifier, Identifier>> buildSource(
             String sourceDatabase,
@@ -70,7 +77,9 @@ public class CloneUtils {
                     StringUtils.isNullOrWhitespaceOnly(targetTableName),
                     "targetTableName must be blank when clone all tables in a catalog.");
 
-            for (Identifier identifier : HiveCloneUtils.listTables(hiveCatalog, excludedTables)) {
+            for (Identifier identifier :
+                    org.apache.paimon.hive.clone.HiveCloneUtils.listTables(
+                            hiveCatalog, excludedTables)) {
                 result.add(new Tuple2<>(identifier, identifier));
             }
         } else if (StringUtils.isNullOrWhitespaceOnly(sourceTableName)) {
@@ -82,7 +91,8 @@ public class CloneUtils {
                     "targetTableName must be blank when clone all tables in a catalog.");
 
             for (Identifier identifier :
-                    HiveCloneUtils.listTables(hiveCatalog, sourceDatabase, excludedTables)) {
+                    org.apache.paimon.hive.clone.HiveCloneUtils.listTables(
+                            hiveCatalog, sourceDatabase, excludedTables)) {
                 result.add(
                         new Tuple2<>(
                                 identifier,
@@ -121,54 +131,61 @@ public class CloneUtils {
         return (HiveCatalog) rootCatalog;
     }
 
-    // ---------------------------------- Classes ----------------------------------
-
-    /** Shuffle tables. */
-    public static class TableChannelComputer
-            implements ChannelComputer<Tuple2<Identifier, Identifier>> {
-
-        private static final long serialVersionUID = 1L;
-
-        private transient int numChannels;
-
-        @Override
-        public void setup(int numChannels) {
-            this.numChannels = numChannels;
-        }
-
-        @Override
-        public int channel(Tuple2<Identifier, Identifier> record) {
-            return Math.floorMod(
-                    Objects.hash(record.f1.getDatabaseName(), record.f1.getTableName()),
-                    numChannels);
-        }
-
-        @Override
-        public String toString() {
-            return "shuffle by identifier hash";
-        }
-    }
-
-    /** Shuffle tables. */
-    public static class DataFileChannelComputer implements ChannelComputer<DataFileInfo> {
-
-        private static final long serialVersionUID = 1L;
-
-        private transient int numChannels;
-
-        @Override
-        public void setup(int numChannels) {
-            this.numChannels = numChannels;
-        }
-
-        @Override
-        public int channel(DataFileInfo record) {
-            return Math.floorMod(Objects.hash(record.identifier()), numChannels);
-        }
-
-        @Override
-        public String toString() {
-            return "shuffle by identifier hash";
-        }
+    public static void build(
+            StreamExecutionEnvironment env,
+            Catalog sourceCatalog,
+            String sourceDatabase,
+            String sourceTableName,
+            Map<String, String> sourceCatalogConfig,
+            String targetDatabase,
+            String targetTableName,
+            Map<String, String> targetCatalogConfig,
+            int parallelism,
+            @Nullable String whereSql,
+            @Nullable List<String> excludedTables)
+            throws Exception {
+        // list source tables
+        DataStream<Tuple2<Identifier, Identifier>> source =
+                buildSource(
+                        sourceDatabase,
+                        sourceTableName,
+                        targetDatabase,
+                        targetTableName,
+                        sourceCatalog,
+                        excludedTables,
+                        env);
+
+        DataStream<Tuple2<Identifier, Identifier>> partitionedSource =
+                FlinkStreamPartitioner.partition(
+                        source, new ShuffleIdentifierByTableComputer(), parallelism);
+
+        // create target table, list files and group by <table, partition>
+        DataStream<CloneFileInfo> files =
+                partitionedSource
+                        .process(
+                                new ListCloneFilesFunction(
+                                        sourceCatalogConfig, targetCatalogConfig, whereSql))
+                        .name("List Files")
+                        .setParallelism(parallelism);
+
+        // copy files and commit
+        DataStream<DataFileInfo> dataFile =
+                files.rebalance()
+                        .process(new CloneFilesFunction(sourceCatalogConfig, targetCatalogConfig))
+                        .name("Copy Files")
+                        .setParallelism(parallelism);
+
+        DataStream<DataFileInfo> partitionedDataFile =
+                FlinkStreamPartitioner.partition(
+                        dataFile, new ShuffleDataFileByTableComputer(), parallelism);
+
+        DataStream<Long> committed =
+                partitionedDataFile
+                        .transform(
+                                "Commit table",
+                                BasicTypeInfo.LONG_TYPE_INFO,
+                                new CloneFilesCommitOperator(targetCatalogConfig))
+                        .setParallelism(parallelism);
+        committed.sinkTo(new DiscardingSink<>()).name("end").setParallelism(1);
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/ClonePaimonTableUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/ClonePaimonTableUtils.java
new file mode 100644
index 0000000000..8b539b01e8
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/ClonePaimonTableUtils.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.clone;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.action.CloneAction;
+import org.apache.paimon.flink.clone.spits.CloneSplitInfo;
+import org.apache.paimon.flink.clone.spits.CloneSplitsFunction;
+import org.apache.paimon.flink.clone.spits.CommitMessageInfo;
+import org.apache.paimon.flink.clone.spits.CommitMessageTableOperator;
+import org.apache.paimon.flink.clone.spits.ListCloneSplitsFunction;
+import org.apache.paimon.flink.clone.spits.ShuffleCommitMessageByTableComputer;
+import org.apache.paimon.flink.sink.FlinkStreamPartitioner;
+import org.apache.paimon.utils.StringUtils;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+import static org.apache.paimon.utils.Preconditions.checkState;
+
+/** Utils for building {@link CloneAction} for Paimon tables. */
+public class ClonePaimonTableUtils {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ClonePaimonTableUtils.class);
+
+    public static DataStream<Tuple2<Identifier, Identifier>> buildSource(
+            String sourceDatabase,
+            String sourceTableName,
+            String targetDatabase,
+            String targetTableName,
+            Catalog sourceCatalog,
+            @Nullable List<String> excludedTables,
+            StreamExecutionEnvironment env)
+            throws Exception {
+        List<Tuple2<Identifier, Identifier>> result = new ArrayList<>();
+        if (StringUtils.isNullOrWhitespaceOnly(sourceDatabase)) {
+            checkArgument(
+                    StringUtils.isNullOrWhitespaceOnly(sourceTableName),
+                    "sourceTableName must be blank when database is null.");
+            checkArgument(
+                    StringUtils.isNullOrWhitespaceOnly(targetDatabase),
+                    "targetDatabase must be blank when clone all tables in a catalog.");
+            checkArgument(
+                    StringUtils.isNullOrWhitespaceOnly(targetTableName),
+                    "targetTableName must be blank when clone all tables in a catalog.");
+
+            for (Identifier identifier : listTables(sourceCatalog, excludedTables)) {
+                result.add(new Tuple2<>(identifier, identifier));
+            }
+        } else if (StringUtils.isNullOrWhitespaceOnly(sourceTableName)) {
+            checkArgument(
+                    !StringUtils.isNullOrWhitespaceOnly(targetDatabase),
+                    "targetDatabase must not be blank when clone all tables in a database.");
+            checkArgument(
+                    StringUtils.isNullOrWhitespaceOnly(targetTableName),
+                    "targetTableName must be blank when clone all tables in a catalog.");
+
+            for (Identifier identifier :
+                    listTables(sourceCatalog, sourceDatabase, excludedTables)) {
+                result.add(
+                        new Tuple2<>(
+                                identifier,
+                                Identifier.create(targetDatabase, identifier.getObjectName())));
+            }
+        } else {
+            checkArgument(
+                    !StringUtils.isNullOrWhitespaceOnly(targetDatabase),
+                    "targetDatabase must not be blank when clone a table.");
+            checkArgument(
+                    !StringUtils.isNullOrWhitespaceOnly(targetTableName),
+                    "targetTableName must not be blank when clone a table.");
+            checkArgument(
+                    CollectionUtils.isEmpty(excludedTables),
+                    "excludedTables must be empty when clone a single table.");
+            result.add(
+                    new Tuple2<>(
+                            Identifier.create(sourceDatabase, sourceTableName),
+                            Identifier.create(targetDatabase, targetTableName)));
+        }
+
+        checkState(!result.isEmpty(), "Didn't find any table in source catalog.");
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("The clone identifiers of source table and target table are: {}", result);
+        }
+        return env.fromCollection(result).forceNonParallel();
+    }
+
+    public static void build(
+            StreamExecutionEnvironment env,
+            Catalog sourceCatalog,
+            String sourceDatabase,
+            String sourceTableName,
+            Map<String, String> sourceCatalogConfig,
+            String targetDatabase,
+            String targetTableName,
+            Map<String, String> targetCatalogConfig,
+            int parallelism,
+            @Nullable String whereSql,
+            @Nullable List<String> excludedTables)
+            throws Exception {
+        // list source tables
+        DataStream<Tuple2<Identifier, Identifier>> source =
+                buildSource(
+                        sourceDatabase,
+                        sourceTableName,
+                        targetDatabase,
+                        targetTableName,
+                        sourceCatalog,
+                        excludedTables,
+                        env);
+
+        DataStream<Tuple2<Identifier, Identifier>> partitionedSource =
+                FlinkStreamPartitioner.partition(
+                        source, new ShuffleIdentifierByTableComputer(), parallelism);
+
+        DataStream<CloneSplitInfo> splits =
+                partitionedSource
+                        .process(
+                                new ListCloneSplitsFunction(
+                                        sourceCatalogConfig, targetCatalogConfig, whereSql))
+                        .name("List Files")
+                        .setParallelism(parallelism);
+
+        // copy splits and commit
+        DataStream<CommitMessageInfo> commitMessage =
+                splits.rebalance()
+                        .process(new CloneSplitsFunction(sourceCatalogConfig, targetCatalogConfig))
+                        .name("Copy Files")
+                        .setParallelism(parallelism);
+
+        DataStream<CommitMessageInfo> partitionedCommitMessage =
+                FlinkStreamPartitioner.partition(
+                        commitMessage, new ShuffleCommitMessageByTableComputer(), parallelism);
+
+        DataStream<Long> committed =
+                partitionedCommitMessage
+                        .transform(
+                                "Commit table",
+                                BasicTypeInfo.LONG_TYPE_INFO,
+                                new CommitMessageTableOperator(targetCatalogConfig))
+                        .setParallelism(parallelism);
+        committed.sinkTo(new DiscardingSink<>()).name("end").setParallelism(1);
+    }
+
+    public static List<Identifier> listTables(
+            Catalog catalog, @Nullable List<String> excludedTables) throws Exception {
+        Set<String> excludedTableSet = new HashSet<>();
+        if (CollectionUtils.isNotEmpty(excludedTables)) {
+            excludedTableSet.addAll(excludedTables);
+        }
+        List<Identifier> results = new ArrayList<>();
+        for (String database : catalog.listDatabases()) {
+            for (String table : catalog.listTables(database)) {
+                Identifier identifier = Identifier.create(database, table);
+                if (excludedTableSet.contains(identifier.getFullName())) {
+                    continue;
+                }
+                results.add(identifier);
+            }
+        }
+        return results;
+    }
+
+    public static List<Identifier> listTables(
+            Catalog catalog, String database, @Nullable List<String> excludedTables)
+            throws Exception {
+        Set<String> excludedTableSet = new HashSet<>();
+        if (CollectionUtils.isNotEmpty(excludedTables)) {
+            excludedTableSet.addAll(excludedTables);
+        }
+        List<Identifier> results = new ArrayList<>();
+        for (String table : catalog.listTables(database)) {
+            Identifier identifier = Identifier.create(database, table);
+            if (excludedTableSet.contains(identifier.getFullName())) {
+                continue;
+            }
+            results.add(identifier);
+        }
+        return results;
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/DataFileInfo.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/ShuffleIdentifierByTableComputer.java
similarity index 51%
copy from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/DataFileInfo.java
copy to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/ShuffleIdentifierByTableComputer.java
index 25bfa6a757..d58dbc5117 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/DataFileInfo.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/ShuffleIdentifierByTableComputer.java
@@ -19,37 +19,33 @@
 package org.apache.paimon.flink.clone;
 
 import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.table.sink.ChannelComputer;
 
-import java.io.Serializable;
+import org.apache.flink.api.java.tuple.Tuple2;
 
-import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;
-import static org.apache.paimon.utils.SerializationUtils.serializeBinaryRow;
+import java.util.Objects;
 
-/** Data File (table, partition) with necessary information. */
-public class DataFileInfo implements Serializable {
+/** Shuffle tables from source identifier and target identifier by target identifier. */
+public class ShuffleIdentifierByTableComputer
+        implements ChannelComputer<Tuple2<Identifier, Identifier>> {
 
     private static final long serialVersionUID = 1L;
 
-    private final Identifier identifier;
-    private final byte[] partition;
-    private final byte[] dataFileMeta;
+    private transient int numChannels;
 
-    public DataFileInfo(Identifier identifier, BinaryRow partition, byte[] dataFileMeta) {
-        this.identifier = identifier;
-        this.partition = serializeBinaryRow(partition);
-        this.dataFileMeta = dataFileMeta;
+    @Override
+    public void setup(int numChannels) {
+        this.numChannels = numChannels;
     }
 
-    public Identifier identifier() {
-        return identifier;
+    @Override
+    public int channel(Tuple2<Identifier, Identifier> record) {
+        return Math.floorMod(
+                Objects.hash(record.f1.getDatabaseName(), record.f1.getTableName()), numChannels);
     }
 
-    public BinaryRow partition() {
-        return deserializeBinaryRow(partition);
-    }
-
-    public byte[] dataFileMeta() {
-        return dataFileMeta;
+    @Override
+    public String toString() {
+        return "shuffle by identifier hash";
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneFileInfo.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/files/CloneFileInfo.java
similarity index 98%
rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneFileInfo.java
rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/files/CloneFileInfo.java
index 649a509d62..c50dbec69f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneFileInfo.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/files/CloneFileInfo.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.flink.clone;
+package org.apache.paimon.flink.clone.files;
 
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.BinaryRow;
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CommitTableOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/files/CloneFilesCommitOperator.java
similarity index 95%
copy from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CommitTableOperator.java
copy to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/files/CloneFilesCommitOperator.java
index 57093e9fb8..1490c0912b 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CommitTableOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/files/CloneFilesCommitOperator.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.flink.clone;
+package org.apache.paimon.flink.clone.files;
 
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
@@ -42,7 +42,7 @@ import java.util.Map;
 import static org.apache.paimon.flink.FlinkCatalogFactory.createPaimonCatalog;
 
 /** Commit table operator. */
-public class CommitTableOperator extends AbstractStreamOperator<Long>
+public class CloneFilesCommitOperator extends AbstractStreamOperator<Long>
         implements OneInputStreamOperator<DataFileInfo, Long>, BoundedOneInput {
 
     private static final long serialVersionUID = 1L;
@@ -52,7 +52,7 @@ public class CommitTableOperator extends AbstractStreamOperator<Long>
     private transient DataFileMetaSerializer dataFileSerializer;
     private transient Map<Identifier, Map<BinaryRow, List<DataFileMeta>>> files;
 
-    public CommitTableOperator(Map<String, String> catalogConfig) {
+    public CloneFilesCommitOperator(Map<String, String> catalogConfig) {
         this.catalogConfig = catalogConfig;
     }
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneFilesFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/files/CloneFilesFunction.java
similarity index 96%
rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneFilesFunction.java
rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/files/CloneFilesFunction.java
index 145770fa69..57de8621ea 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneFilesFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/files/CloneFilesFunction.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.flink.clone;
+package org.apache.paimon.flink.clone.files;
 
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.BinaryRow;
@@ -37,7 +37,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 /** Clone files for table. */
-public class CloneFilesFunction extends CloneProcessFunction<CloneFileInfo, DataFileInfo> {
+public class CloneFilesFunction extends CloneFilesProcessFunction<CloneFileInfo, DataFileInfo> {
 
     private static final long serialVersionUID = 1L;
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneProcessFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/files/CloneFilesProcessFunction.java
similarity index 92%
rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneProcessFunction.java
rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/files/CloneFilesProcessFunction.java
index 3ee51fe444..3733052448 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneProcessFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/files/CloneFilesProcessFunction.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.flink.clone;
+package org.apache.paimon.flink.clone.files;
 
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
@@ -35,12 +35,12 @@ import java.util.HashMap;
 import java.util.Map;
 
 import static org.apache.paimon.flink.FlinkCatalogFactory.createPaimonCatalog;
-import static org.apache.paimon.flink.clone.CloneUtils.getRootHiveCatalog;
+import static org.apache.paimon.flink.clone.CloneHiveTableUtils.getRootHiveCatalog;
 
 /** Abstract function for copying tables. */
-public abstract class CloneProcessFunction<I, O> extends ProcessFunction<I, O> {
+public abstract class CloneFilesProcessFunction<I, O> extends ProcessFunction<I, O> {
 
-    protected static final Logger LOG = LoggerFactory.getLogger(CloneProcessFunction.class);
+    protected static final Logger LOG = LoggerFactory.getLogger(CloneFilesProcessFunction.class);
 
     protected final Map<String, String> sourceCatalogConfig;
     protected final Map<String, String> targetCatalogConfig;
@@ -51,7 +51,7 @@ public abstract class CloneProcessFunction<I, O> extends ProcessFunction<I, O> {
     protected transient Map<Identifier, Table> tableCache;
     protected transient DataFileMetaSerializer dataFileSerializer;
 
-    public CloneProcessFunction(
+    public CloneFilesProcessFunction(
             Map<String, String> sourceCatalogConfig, Map<String, String> targetCatalogConfig) {
         this.sourceCatalogConfig = sourceCatalogConfig;
         this.targetCatalogConfig = targetCatalogConfig;
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/DataFileInfo.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/files/DataFileInfo.java
similarity index 97%
copy from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/DataFileInfo.java
copy to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/files/DataFileInfo.java
index 25bfa6a757..4bf0f62990 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/DataFileInfo.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/files/DataFileInfo.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.flink.clone;
+package org.apache.paimon.flink.clone.files;
 
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.BinaryRow;
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/ListCloneFilesFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/files/ListCloneFilesFunction.java
similarity index 96%
rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/ListCloneFilesFunction.java
rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/files/ListCloneFilesFunction.java
index 40dd2d53c6..8957b217d1 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/ListCloneFilesFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/files/ListCloneFilesFunction.java
@@ -16,10 +16,9 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.flink.clone;
+package org.apache.paimon.flink.clone.files;
 
 import org.apache.paimon.CoreOptions;
-import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.predicate.SimpleSqlPredicateConvertor;
@@ -51,7 +50,7 @@ import static org.apache.paimon.utils.Preconditions.checkState;
 
 /** List files for table. */
 public class ListCloneFilesFunction
-        extends CloneProcessFunction<Tuple2<Identifier, Identifier>, CloneFileInfo> {
+        extends CloneFilesProcessFunction<Tuple2<Identifier, Identifier>, CloneFileInfo> {
 
     private static final long serialVersionUID = 1L;
 
@@ -169,9 +168,8 @@ public class ListCloneFilesFunction
                 "source table partition keys is not compatible with existed paimon table partition keys.");
     }
 
-    @VisibleForTesting
     @Nullable
-    static PartitionPredicate getPartitionPredicate(
+    public static PartitionPredicate getPartitionPredicate(
             @Nullable String whereSql, RowType partitionType, Identifier 
tableId) throws Exception {
         if (whereSql == null) {
             return null;
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/files/ShuffleDataFileByTableComputer.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/files/ShuffleDataFileByTableComputer.java
new file mode 100644
index 0000000000..f8ac669664
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/files/ShuffleDataFileByTableComputer.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.clone.files;
+
+import org.apache.paimon.table.sink.ChannelComputer;
+
+import java.util.Objects;
+
+/** Shuffle DataFile By Table Computer. */
+public class ShuffleDataFileByTableComputer implements ChannelComputer<DataFileInfo> {
+
+    private static final long serialVersionUID = 1L;
+
+    private transient int numChannels;
+
+    @Override
+    public void setup(int numChannels) {
+        this.numChannels = numChannels;
+    }
+
+    @Override
+    public int channel(DataFileInfo record) {
+        return Math.floorMod(Objects.hash(record.identifier()), numChannels);
+    }
+
+    @Override
+    public String toString() {
+        return "shuffle by identifier hash";
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/spits/CloneSplitInfo.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/spits/CloneSplitInfo.java
new file mode 100644
index 0000000000..96d8d737d0
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/spits/CloneSplitInfo.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.clone.spits;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.utils.InstantiationUtil;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/** Clone split with necessary information. */
+public class CloneSplitInfo implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final Identifier sourceIdentifier;
+    private final Identifier targetidentifier;
+    private final byte[] split;
+
+    public CloneSplitInfo(Identifier sourceIdentifier, Identifier targetidentifier, Split split) {
+        this.sourceIdentifier = sourceIdentifier;
+        this.targetidentifier = targetidentifier;
+        try {
+            this.split = InstantiationUtil.serializeObject(split);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public Identifier sourceIdentifier() {
+        return sourceIdentifier;
+    }
+
+    public Identifier targetidentifier() {
+        return targetidentifier;
+    }
+
+    public Split split() {
+        try {
+            return InstantiationUtil.deserializeObject(split, getClass().getClassLoader());
+        } catch (IOException | ClassNotFoundException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/spits/CloneSplitsFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/spits/CloneSplitsFunction.java
new file mode 100644
index 0000000000..4b5bec097d
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/spits/CloneSplitsFunction.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.clone.spits;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.source.TableRead;
+
+import org.apache.flink.api.common.functions.OpenContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.util.Collector;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.paimon.flink.FlinkCatalogFactory.createPaimonCatalog;
+
+/** Clone splits for table. */
+public class CloneSplitsFunction extends ProcessFunction<CloneSplitInfo, CommitMessageInfo> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final Map<String, String> sourceCatalogConfig;
+    private final Map<String, String> targetCatalogConfig;
+
+    private transient Catalog sourceCatalog;
+    private transient Catalog targetCatalog;
+
+    public CloneSplitsFunction(
+            Map<String, String> sourceCatalogConfig, Map<String, String> targetCatalogConfig) {
+        this.sourceCatalogConfig = sourceCatalogConfig;
+        this.targetCatalogConfig = targetCatalogConfig;
+    }
+
+    /**
+     * Do not annotate with <code>@override</code> here to maintain compatibility with Flink 1.18-.
+     */
+    public void open(OpenContext openContext) throws Exception {
+        open(new Configuration());
+    }
+
+    /**
+     * Do not annotate with <code>@override</code> here to maintain compatibility with Flink 2.0+.
+     */
+    public void open(Configuration conf) throws Exception {
+        this.sourceCatalog = createPaimonCatalog(Options.fromMap(sourceCatalogConfig));
+        this.targetCatalog = createPaimonCatalog(Options.fromMap(targetCatalogConfig));
+    }
+
+    @Override
+    public void processElement(
+            CloneSplitInfo cloneSplitInfo,
+            ProcessFunction<CloneSplitInfo, CommitMessageInfo>.Context context,
+            Collector<CommitMessageInfo> collector)
+            throws Exception {
+
+        // TODO reuse table read?
+        TableRead tableRead =
+                sourceCatalog
+                        .getTable(cloneSplitInfo.sourceIdentifier())
+                        .newReadBuilder()
+                        .newRead();
+        // TODO reuse table write
+        BatchTableWrite write =
+                targetCatalog
+                        .getTable(cloneSplitInfo.targetidentifier())
+                        .newBatchWriteBuilder()
+                        .newWrite();
+
+        tableRead
+                .createReader(cloneSplitInfo.split())
+                .forEachRemaining(
+                        row -> {
+                            try {
+                                write.write(row);
+                            } catch (Exception e) {
+                                throw new RuntimeException(e);
+                            }
+                        });
+
+        List<CommitMessage> commitMessages = write.prepareCommit();
+        for (CommitMessage commitMessage : commitMessages) {
+            CommitMessageInfo messageInfo =
+                    new CommitMessageInfo(cloneSplitInfo.targetidentifier(), commitMessage);
+            collector.collect(messageInfo);
+        }
+        write.close();
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/DataFileInfo.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/spits/CommitMessageInfo.java
similarity index 56%
rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/DataFileInfo.java
rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/spits/CommitMessageInfo.java
index 25bfa6a757..20ea096a75 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/DataFileInfo.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/spits/CommitMessageInfo.java
@@ -16,40 +16,41 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.flink.clone;
+package org.apache.paimon.flink.clone.spits;
 
 import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.utils.InstantiationUtil;
 
+import java.io.IOException;
 import java.io.Serializable;
 
-import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;
-import static org.apache.paimon.utils.SerializationUtils.serializeBinaryRow;
-
-/** Data File (table, partition) with necessary information. */
-public class DataFileInfo implements Serializable {
+/** Commit message with necessary information. */
+public class CommitMessageInfo implements Serializable {
 
     private static final long serialVersionUID = 1L;
 
     private final Identifier identifier;
-    private final byte[] partition;
-    private final byte[] dataFileMeta;
+    private final byte[] commitMessage;
 
-    public DataFileInfo(Identifier identifier, BinaryRow partition, byte[] dataFileMeta) {
+    public CommitMessageInfo(Identifier identifier, CommitMessage commitMessage) {
         this.identifier = identifier;
-        this.partition = serializeBinaryRow(partition);
-        this.dataFileMeta = dataFileMeta;
+        try {
+            this.commitMessage = InstantiationUtil.serializeObject(commitMessage);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
     }
 
     public Identifier identifier() {
         return identifier;
     }
 
-    public BinaryRow partition() {
-        return deserializeBinaryRow(partition);
-    }
-
-    public byte[] dataFileMeta() {
-        return dataFileMeta;
+    public CommitMessage commitMessage() {
+        try {
+            return InstantiationUtil.deserializeObject(commitMessage, getClass().getClassLoader());
+        } catch (IOException | ClassNotFoundException e) {
+            throw new RuntimeException(e);
+        }
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CommitTableOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/spits/CommitMessageTableOperator.java
similarity index 57%
rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CommitTableOperator.java
rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/spits/CommitMessageTableOperator.java
index 57093e9fb8..ad71ebf01a 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CommitTableOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/spits/CommitMessageTableOperator.java
@@ -16,14 +16,10 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.flink.clone;
+package org.apache.paimon.flink.clone.spits;
 
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.io.DataFileMeta;
-import org.apache.paimon.io.DataFileMetaSerializer;
-import org.apache.paimon.migrate.FileMetaUtils;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.sink.BatchTableCommit;
@@ -42,51 +38,38 @@ import java.util.Map;
 import static org.apache.paimon.flink.FlinkCatalogFactory.createPaimonCatalog;
 
 /** Commit table operator. */
-public class CommitTableOperator extends AbstractStreamOperator<Long>
-        implements OneInputStreamOperator<DataFileInfo, Long>, BoundedOneInput {
+public class CommitMessageTableOperator extends AbstractStreamOperator<Long>
+        implements OneInputStreamOperator<CommitMessageInfo, Long>, BoundedOneInput {
 
     private static final long serialVersionUID = 1L;
 
     private final Map<String, String> catalogConfig;
 
-    private transient DataFileMetaSerializer dataFileSerializer;
-    private transient Map<Identifier, Map<BinaryRow, List<DataFileMeta>>> files;
+    private transient Map<Identifier, List<CommitMessage>> commitMessages;
 
-    public CommitTableOperator(Map<String, String> catalogConfig) {
+    public CommitMessageTableOperator(Map<String, String> catalogConfig) {
         this.catalogConfig = catalogConfig;
     }
 
     @Override
     public void open() throws Exception {
         super.open();
-        this.dataFileSerializer = new DataFileMetaSerializer();
-        this.files = new HashMap<>();
+        this.commitMessages = new HashMap<>();
     }
 
     @Override
-    public void processElement(StreamRecord<DataFileInfo> streamRecord) throws Exception {
-        DataFileInfo file = streamRecord.getValue();
-        BinaryRow partition = file.partition();
-        List<DataFileMeta> files =
-                this.files
-                        .computeIfAbsent(file.identifier(), k -> new HashMap<>())
-                        .computeIfAbsent(partition, k -> new ArrayList<>());
-        files.add(dataFileSerializer.deserializeFromBytes(file.dataFileMeta()));
+    public void processElement(StreamRecord<CommitMessageInfo> streamRecord) throws Exception {
+        CommitMessageInfo info = streamRecord.getValue();
+        List<CommitMessage> messages =
+                this.commitMessages.computeIfAbsent(info.identifier(), k -> new ArrayList<>());
+        messages.add(info.commitMessage());
     }
 
     @Override
     public void endInput() throws Exception {
         try (Catalog catalog = createPaimonCatalog(Options.fromMap(catalogConfig))) {
-            for (Map.Entry<Identifier, Map<BinaryRow, List<DataFileMeta>>> entry :
-                    files.entrySet()) {
-                List<CommitMessage> commitMessages = new ArrayList<>();
-                for (Map.Entry<BinaryRow, List<DataFileMeta>> listEntry :
-                        entry.getValue().entrySet()) {
-                    commitMessages.add(
-                            FileMetaUtils.createCommitMessage(
-                                    listEntry.getKey(), 0, 
listEntry.getValue()));
-                }
-
+            for (Map.Entry<Identifier, List<CommitMessage>> entry : 
commitMessages.entrySet()) {
+                List<CommitMessage> commitMessages = entry.getValue();
                 Table table = catalog.getTable(entry.getKey());
                 try (BatchTableCommit commit =
                         
table.newBatchWriteBuilder().withOverwrite().newCommit()) {
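
For readers tracing the new commit path, a condensed, hypothetical sketch of the per-table
commit pattern above, written as standalone Java: the class and method names are invented
for illustration, and the final commit.commit(...) call is an assumption (the actual call
sits in context lines not shown in this hunk); the per-Identifier grouping and the builder
chain come from the diff.

    import org.apache.paimon.catalog.Catalog;
    import org.apache.paimon.catalog.Identifier;
    import org.apache.paimon.options.Options;
    import org.apache.paimon.table.Table;
    import org.apache.paimon.table.sink.BatchTableCommit;
    import org.apache.paimon.table.sink.CommitMessage;

    import java.util.List;
    import java.util.Map;

    import static org.apache.paimon.flink.FlinkCatalogFactory.createPaimonCatalog;

    /** Hypothetical standalone sketch of the endInput commit loop. */
    public class PerTableCommitSketch {

        public static void commitAll(
                Map<String, String> catalogConfig,
                Map<Identifier, List<CommitMessage>> messagesPerTable) throws Exception {
            try (Catalog catalog = createPaimonCatalog(Options.fromMap(catalogConfig))) {
                for (Map.Entry<Identifier, List<CommitMessage>> entry : messagesPerTable.entrySet()) {
                    // one overwrite commit per target table, fed by the shuffled commit messages
                    Table table = catalog.getTable(entry.getKey());
                    try (BatchTableCommit commit =
                            table.newBatchWriteBuilder().withOverwrite().newCommit()) {
                        commit.commit(entry.getValue());
                    }
                }
            }
        }
    }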
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/spits/ListCloneSplitsFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/spits/ListCloneSplitsFunction.java
new file mode 100644
index 0000000000..9fdb5226b9
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/spits/ListCloneSplitsFunction.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.clone.spits;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.TableScan;
+
+import org.apache.flink.api.common.functions.OpenContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.util.Collector;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.paimon.CoreOptions.BUCKET;
+import static org.apache.paimon.CoreOptions.BUCKET_KEY;
+import static org.apache.paimon.CoreOptions.PATH;
+import static org.apache.paimon.flink.FlinkCatalogFactory.createPaimonCatalog;
+import static org.apache.paimon.flink.clone.files.ListCloneFilesFunction.getPartitionPredicate;
+
+/** List splits for table. */
+public class ListCloneSplitsFunction
+        extends ProcessFunction<Tuple2<Identifier, Identifier>, CloneSplitInfo> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final Map<String, String> sourceCatalogConfig;
+    private final Map<String, String> targetCatalogConfig;
+    @Nullable private final String whereSql;
+
+    private transient Catalog sourceCatalog;
+    private transient Catalog targetCatalog;
+
+    public ListCloneSplitsFunction(
+            Map<String, String> sourceCatalogConfig,
+            Map<String, String> targetCatalogConfig,
+            @Nullable String whereSql) {
+        this.sourceCatalogConfig = sourceCatalogConfig;
+        this.targetCatalogConfig = targetCatalogConfig;
+        this.whereSql = whereSql;
+    }
+
+    /**
+     * Do not annotate with <code>@override</code> here to maintain compatibility with Flink 1.18-.
+     */
+    public void open(OpenContext openContext) throws Exception {
+        open(new Configuration());
+    }
+
+    /**
+     * Do not annotate with <code>@override</code> here to maintain compatibility with Flink 2.0+.
+     */
+    public void open(Configuration conf) throws Exception {
+        this.sourceCatalog = createPaimonCatalog(Options.fromMap(sourceCatalogConfig));
+        this.targetCatalog = createPaimonCatalog(Options.fromMap(targetCatalogConfig));
+    }
+
+    @Override
+    public void processElement(
+            Tuple2<Identifier, Identifier> tuple,
+            ProcessFunction<Tuple2<Identifier, Identifier>, CloneSplitInfo>.Context context,
+            Collector<CloneSplitInfo> collector)
+            throws Exception {
+
+        // create database if not exists
+        targetCatalog.createDatabase(tuple.f1.getDatabaseName(), true);
+
+        Table sourceTable = sourceCatalog.getTable(tuple.f0);
+        Schema.Builder builder = Schema.newBuilder();
+        sourceTable
+                .rowType()
+                .getFields()
+                .forEach(
+                        f -> builder.column(f.name(), f.type(), f.description(), f.defaultValue()));
+        builder.partitionKeys(sourceTable.partitionKeys());
+        builder.primaryKey(sourceTable.primaryKeys());
+        sourceTable
+                .options()
+                .forEach(
+                        (k, v) -> {
+                            if (k.equalsIgnoreCase(BUCKET.key())
+                                    || k.equalsIgnoreCase(PATH.key())) {
+                                return;
+                            }
+                            builder.option(k, v);
+                        });
+
+        if (sourceTable.primaryKeys().isEmpty()) {
+            // for append table with bucket
+            if (sourceTable.options().containsKey(BUCKET_KEY.key())) {
+                builder.option(BUCKET.key(), sourceTable.options().get(BUCKET.key()));
+                builder.option(BUCKET_KEY.key(), sourceTable.options().get(BUCKET_KEY.key()));
+            }
+        } else {
+            // for primary key table, only postpone bucket supports clone
+            builder.option(BUCKET.key(), "-2");
+        }
+
+        targetCatalog.createTable(tuple.f1, builder.build(), true);
+
+        PartitionPredicate predicate =
+                getPartitionPredicate(
+                        whereSql,
+                        sourceTable.rowType().project(sourceTable.partitionKeys()),
+                        tuple.f0);
+        TableScan scan = sourceTable.newReadBuilder().withPartitionFilter(predicate).newScan();
+        List<Split> splits = scan.plan().splits();
+        for (Split split : splits) {
+            CloneSplitInfo splitInfo = new CloneSplitInfo(tuple.f0, tuple.f1, split);
+            collector.collect(splitInfo);
+        }
+    }
+}
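
The interesting part of the new ListCloneSplitsFunction is how it derives the target table
schema; a compact, hypothetical restatement of that rule as a standalone helper follows
(class and method names are invented, the option handling mirrors the new file): copy
columns, partition keys and primary keys, drop bucket and path from the options, keep
bucket/bucket-key only for bucketed append tables, and force bucket = -2 (postpone bucket)
for primary-key tables.

    import org.apache.paimon.schema.Schema;
    import org.apache.paimon.table.Table;

    import static org.apache.paimon.CoreOptions.BUCKET;
    import static org.apache.paimon.CoreOptions.BUCKET_KEY;
    import static org.apache.paimon.CoreOptions.PATH;

    /** Hypothetical extraction of the schema-copy rule in ListCloneSplitsFunction. */
    public class TargetSchemaSketch {

        public static Schema copySchema(Table sourceTable) {
            Schema.Builder builder = Schema.newBuilder();
            sourceTable
                    .rowType()
                    .getFields()
                    .forEach(f -> builder.column(f.name(), f.type(), f.description(), f.defaultValue()));
            builder.partitionKeys(sourceTable.partitionKeys());
            builder.primaryKey(sourceTable.primaryKeys());
            // copy every option except bucket and path, which are re-derived below
            sourceTable
                    .options()
                    .forEach(
                            (k, v) -> {
                                if (!k.equalsIgnoreCase(BUCKET.key()) && !k.equalsIgnoreCase(PATH.key())) {
                                    builder.option(k, v);
                                }
                            });
            if (sourceTable.primaryKeys().isEmpty()) {
                // bucketed append table: preserve its fixed bucket layout
                if (sourceTable.options().containsKey(BUCKET_KEY.key())) {
                    builder.option(BUCKET.key(), sourceTable.options().get(BUCKET.key()));
                    builder.option(BUCKET_KEY.key(), sourceTable.options().get(BUCKET_KEY.key()));
                }
            } else {
                // primary key table: only postpone bucket (-2) supports clone
                builder.option(BUCKET.key(), "-2");
            }
            return builder.build();
        }
    }

After the target table exists, the function simply plans a scan with the optional partition
predicate and emits one CloneSplitInfo per split, as shown in processElement above.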
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/spits/ShuffleCommitMessageByTableComputer.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/spits/ShuffleCommitMessageByTableComputer.java
new file mode 100644
index 0000000000..974c879e7e
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/spits/ShuffleCommitMessageByTableComputer.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.clone.spits;
+
+import org.apache.paimon.table.sink.ChannelComputer;
+
+import java.util.Objects;
+
+/** Shuffle by table. */
+public class ShuffleCommitMessageByTableComputer implements ChannelComputer<CommitMessageInfo> {
+
+    private static final long serialVersionUID = 1L;
+
+    private transient int numChannels;
+
+    @Override
+    public void setup(int numChannels) {
+        this.numChannels = numChannels;
+    }
+
+    @Override
+    public int channel(CommitMessageInfo record) {
+        return Math.floorMod(Objects.hash(record.identifier()), numChannels);
+    }
+
+    @Override
+    public String toString() {
+        return "shuffle by identifier hash";
+    }
+}
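
The shuffle ensures that all commit messages of one table land on the same committer
subtask. A tiny, self-contained illustration of the floorMod channel assignment (plain
strings stand in for Identifier; the class name is hypothetical):

    import java.util.Objects;

    /** Minimal demo of the floorMod-based channel assignment used above. */
    public class ShuffleDemo {

        static int channel(Object key, int numChannels) {
            // Math.floorMod keeps the result in [0, numChannels) even for negative hashes
            return Math.floorMod(Objects.hash(key), numChannels);
        }

        public static void main(String[] args) {
            int numChannels = 4;
            // the same key always maps to the same channel, so one subtask
            // receives every commit message of a given table
            System.out.println(channel("default.orders", numChannels));
            System.out.println(channel("default.orders", numChannels));
            System.out.println(channel("default.users", numChannels));
        }
    }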
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CloneProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CloneProcedure.java
index 55647848ba..287b9758a3 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CloneProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CloneProcedure.java
@@ -61,6 +61,10 @@ public class CloneProcedure extends ProcedureBase {
                 @ArgumentHint(
                         name = "excluded_tables",
                         type = @DataTypeHint("STRING"),
+                        isOptional = true),
+                @ArgumentHint(
+                        name = "clone_from",
+                        type = @DataTypeHint("STRING"),
                         isOptional = true)
             })
     public String[] call(
@@ -73,7 +77,8 @@ public class CloneProcedure extends ProcedureBase {
             String targetCatalogConfigStr,
             Integer parallelism,
             String where,
-            String excludedTablesStr)
+            String excludedTablesStr,
+            String cloneFrom)
             throws Exception {
         Map<String, String> sourceCatalogConfig =
                 new HashMap<>(optionalConfigMap(sourceCatalogConfigStr));
@@ -96,7 +101,8 @@ public class CloneProcedure extends ProcedureBase {
                         targetCatalogConfig,
                         parallelism,
                         where,
-                        excludedTables);
+                        excludedTables,
+                        cloneFrom);
         return execute(procedureContext, action, "Clone Job");
     }
 
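With the new optional argument, the procedure can clone from an existing Paimon catalog
instead of a Hive source. A hypothetical invocation from a Flink TableEnvironment is
sketched below; only excluded_tables and clone_from are visible in this hunk, so the
remaining named parameters and the sys.clone identifier are assumptions for illustration,
and the session is assumed to already use a Paimon catalog.

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    /** Hypothetical call of the extended clone procedure; argument names are assumed. */
    public class CloneFromPaimonCallSketch {

        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
            // assumes a Paimon catalog was registered and made current beforehand
            tEnv.executeSql(
                    "CALL sys.clone("
                            + "`database` => 'default', "
                            + "`table` => 'src', "
                            + "target_database => 'default', "
                            + "target_table => 'target', "
                            + "clone_from => 'paimon')");
        }
    }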
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/clone/ListCloneFilesFunctionTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/clone/ListCloneFilesFunctionTest.java
index 880b65ab5f..5d63e32b3e 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/clone/ListCloneFilesFunctionTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/clone/ListCloneFilesFunctionTest.java
@@ -22,13 +22,14 @@ import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.BinaryRowWriter;
 import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.flink.clone.files.ListCloneFilesFunction;
 import org.apache.paimon.partition.PartitionPredicate;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
 
 import org.junit.jupiter.api.Test;
 
-import static org.apache.paimon.flink.clone.ListCloneFilesFunction.getPartitionPredicate;
+import static org.apache.paimon.flink.clone.files.ListCloneFilesFunction.getPartitionPredicate;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/CloneActionITCase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/CloneActionITCase.java
index b15a6f84ae..7858e33839 100644
--- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/CloneActionITCase.java
+++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/CloneActionITCase.java
@@ -34,6 +34,7 @@ import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
 
 import org.apache.flink.table.api.SqlDialect;
 import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.config.TableConfigOptions;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CloseableIterator;
 import org.assertj.core.api.Assertions;
@@ -70,6 +71,91 @@ public class CloneActionITCase extends ActionITCaseBase {
         TEST_HIVE_METASTORE.stop();
     }
 
+    @Test
+    public void testClonePKTableFromPaimon() throws Exception {
+        TableEnvironment tEnv =
+                tableEnvironmentBuilder()
+                        .batchMode()
+                        .setConf(TableConfigOptions.TABLE_DML_SYNC, true)
+                        .build();
+        String warehouse1 = getTempDirPath();
+        String warehouse2 = getTempDirPath();
+        sql(tEnv, "CREATE CATALOG catalog1 WITH ('type'='paimon', 'warehouse' 
= '%s')", warehouse1);
+        sql(tEnv, "CREATE CATALOG catalog2 WITH ('type'='paimon', 'warehouse' 
= '%s')", warehouse2);
+
+        sql(
+                tEnv,
+                "CREATE TABLE catalog1.`default`.src (a INT, b INT, PRIMARY 
KEY (a) NOT ENFORCED)");
+        sql(tEnv, "INSERT INTO catalog1.`default`.src VALUES (1, 1), (2, 2)");
+        createAction(
+                        CloneAction.class,
+                        "clone",
+                        "--database",
+                        "default",
+                        "--table",
+                        "src",
+                        "--catalog_conf",
+                        "warehouse=" + warehouse1,
+                        "--target_database",
+                        "default",
+                        "--target_table",
+                        "target",
+                        "--target_catalog_conf",
+                        "warehouse=" + warehouse2,
+                        "--clone_from",
+                        "paimon")
+                .run();
+
+        sql(tEnv, "CALL catalog2.sys.compact(`table` => 'default.target')");
+        List<Row> result = sql(tEnv, "SELECT * FROM catalog2.`default`.target");
+        assertThat(result).containsExactlyInAnyOrder(Row.of(1, 1), Row.of(2, 2));
+        List<Row> show = sql(tEnv, "SHOW CREATE TABLE catalog2.`default`.target");
+        assertThat(show.toString()).contains("PRIMARY KEY");
+    }
+
+    @Test
+    public void testCloneBucketedAppendFromPaimon() throws Exception {
+        TableEnvironment tEnv =
+                tableEnvironmentBuilder()
+                        .batchMode()
+                        .setConf(TableConfigOptions.TABLE_DML_SYNC, true)
+                        .build();
+        String warehouse1 = getTempDirPath();
+        String warehouse2 = getTempDirPath();
+        sql(tEnv, "CREATE CATALOG catalog1 WITH ('type'='paimon', 'warehouse' 
= '%s')", warehouse1);
+        sql(tEnv, "CREATE CATALOG catalog2 WITH ('type'='paimon', 'warehouse' 
= '%s')", warehouse2);
+
+        sql(
+                tEnv,
+                "CREATE TABLE catalog1.`default`.src (a INT, b INT) WITH 
('bucket' = '2', 'bucket-key' = 'b')");
+        sql(tEnv, "INSERT INTO catalog1.`default`.src VALUES (1, 1), (2, 2)");
+        createAction(
+                        CloneAction.class,
+                        "clone",
+                        "--database",
+                        "default",
+                        "--table",
+                        "src",
+                        "--catalog_conf",
+                        "warehouse=" + warehouse1,
+                        "--target_database",
+                        "default",
+                        "--target_table",
+                        "target",
+                        "--target_catalog_conf",
+                        "warehouse=" + warehouse2,
+                        "--clone_from",
+                        "paimon")
+                .run();
+
+        sql(tEnv, "CALL catalog2.sys.compact(`table` => 'default.target')");
+        List<Row> result = sql(tEnv, "SELECT * FROM catalog2.`default`.target");
+        assertThat(result).containsExactlyInAnyOrder(Row.of(1, 1), Row.of(2, 2));
+        List<Row> show = sql(tEnv, "SHOW CREATE TABLE catalog2.`default`.target");
+        assertThat(show.toString()).contains("'bucket' = '2'");
+        assertThat(show.toString()).contains("'bucket-key' = 'b'");
+    }
+
     @Test
     public void testMigrateOneNonPartitionedTable() throws Exception {
         String format = randomFormat();
