This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 6d8f341276 [flink] Make clone hive tables production-ready (#5490)
6d8f341276 is described below

commit 6d8f341276b4b9c9ba1fbcf580861b70fc316f36
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Apr 18 18:05:44 2025 +0800

    [flink] Make clone hive tables production-ready (#5490)
---
 docs/content/migration/clone-from-hive.md          |  56 +++
 .../paimon/flink/action/CloneHiveAction.java       |  33 +-
 .../apache/paimon/flink/clone/CloneHiveUtils.java  | 452 ---------------------
 .../paimon/flink/clone/hive/CloneFileInfo.java     |  84 ++++
 .../paimon/flink/clone/hive/CloneHiveUtils.java    | 166 ++++++++
 .../flink/clone/hive/CommitTableOperator.java      |  96 +++++
 .../flink/clone/hive/CopyHiveFilesFunction.java    | 125 ++++++
 .../flink/clone/hive/CopyProcessFunction.java      |  83 ++++
 .../paimon/flink/clone/hive/DataFileInfo.java      |  55 +++
 .../flink/clone/hive/ListHiveFilesFunction.java    | 120 ++++++
 .../paimon/flink/clone/CloneHiveUtilsTest.java     |  13 +-
 .../{HiveMigrateUtils.java => HiveCloneUtils.java} |   6 +-
 .../hive/procedure/CloneHiveActionITCase.java      |  17 -
 13 files changed, 818 insertions(+), 488 deletions(-)

diff --git a/docs/content/migration/clone-from-hive.md b/docs/content/migration/clone-from-hive.md
new file mode 100644
index 0000000000..705312d8b8
--- /dev/null
+++ b/docs/content/migration/clone-from-hive.md
@@ -0,0 +1,56 @@
+---
+title: "Clone From Hive"
+weight: 5
+type: docs
+aliases:
+- /migration/clone-from-hive.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Clone Hive Table
+
+Clone Hive Table supports cloning Hive tables in Parquet, ORC and Avro formats. The cloned table will be an
+[append table]({{< ref "append-table/overview" >}}).
+
+## Clone Hive Table
+
+```bash
+<FLINK_HOME>/bin/flink run ./paimon-flink-action-{{< version >}}.jar \
+clone_hive \
+--database default \
+--table hivetable \
+--catalog_conf metastore=hive \
+--catalog_conf uri=thrift://localhost:9088 \
+--target_database test \
+--target_table test_table \
+--target_catalog_conf warehouse=my_warehouse
+```
+
+## Clone Hive Database
+
+```bash
+<FLINK_HOME>/bin/flink run ./paimon-flink-action-{{< version >}}.jar \
+clone_hive \
+--database default \
+--catalog_conf metastore=hive \
+--catalog_conf uri=thrift://localhost:9088 \
+--target_database test \
+--target_catalog_conf warehouse=my_warehouse
+```
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneHiveAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneHiveAction.java
index 9a1666bcea..25b1c5809f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneHiveAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneHiveAction.java
@@ -19,17 +19,22 @@
 package org.apache.paimon.flink.action;
 
 import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.flink.clone.CloneHiveUtils;
+import org.apache.paimon.flink.clone.hive.CloneFileInfo;
+import org.apache.paimon.flink.clone.hive.CloneHiveUtils;
+import org.apache.paimon.flink.clone.hive.CommitTableOperator;
+import org.apache.paimon.flink.clone.hive.CopyHiveFilesFunction;
+import org.apache.paimon.flink.clone.hive.DataFileInfo;
+import org.apache.paimon.flink.clone.hive.ListHiveFilesFunction;
 import org.apache.paimon.flink.sink.FlinkStreamPartitioner;
 import org.apache.paimon.options.CatalogOptions;
 
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
 
 import javax.annotation.Nullable;
 
-import java.util.List;
 import java.util.Map;
 
 /** Clone source files managed by HiveMetaStore and commit metas to construct Paimon table. */
@@ -91,23 +96,33 @@ public class CloneHiveAction extends ActionBase {
                         source, new CloneHiveUtils.TableChannelComputer(), parallelism);
 
         // create target table, list files and group by <table, partition>
-        DataStream<List<CloneHiveUtils.CloneFilesInfo>> files =
+        DataStream<CloneFileInfo> files =
                 partitionedSource
                         .process(
-                                CloneHiveUtils.createTargetTableAndListFilesFunction(
+                                new ListHiveFilesFunction(
                                         sourceCatalogConfig, targetCatalogConfig, whereSql))
                         .name("List Files")
                         .setParallelism(parallelism);
 
         // copy files and commit
-        DataStream<Void> committed =
-                files.forward()
+        DataStream<DataFileInfo> dataFile =
+                files.rebalance()
                         .process(
-                                CloneHiveUtils.copyAndCommitFunction(
-                                        sourceCatalogConfig, targetCatalogConfig))
-                        .name("Copy and Commit")
+                                new CopyHiveFilesFunction(sourceCatalogConfig, targetCatalogConfig))
+                        .name("Copy Files")
                         .setParallelism(parallelism);
 
+        DataStream<DataFileInfo> partitionedDataFile =
+                FlinkStreamPartitioner.partition(
+                        dataFile, new CloneHiveUtils.DataFileChannelComputer(), parallelism);
+
+        DataStream<Long> committed =
+                partitionedDataFile
+                        .transform(
+                                "Commit table",
+                                BasicTypeInfo.LONG_TYPE_INFO,
+                                new CommitTableOperator(targetCatalogConfig))
+                        .setParallelism(parallelism);
         committed.sinkTo(new DiscardingSink<>()).name("end").setParallelism(1);
     }
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneHiveUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneHiveUtils.java
deleted file mode 100644
index f2b8f63cd6..0000000000
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneHiveUtils.java
+++ /dev/null
@@ -1,452 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink.clone;
-
-import org.apache.paimon.CoreOptions;
-import org.apache.paimon.annotation.VisibleForTesting;
-import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.catalog.CatalogContext;
-import org.apache.paimon.catalog.DelegateCatalog;
-import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.flink.FlinkCatalogFactory;
-import org.apache.paimon.flink.action.CloneHiveAction;
-import org.apache.paimon.flink.predicate.SimpleSqlPredicateConvertor;
-import org.apache.paimon.format.FileFormat;
-import org.apache.paimon.format.SimpleColStats;
-import org.apache.paimon.format.SimpleStatsExtractor;
-import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.fs.Path;
-import org.apache.paimon.hive.HiveCatalog;
-import org.apache.paimon.hive.migrate.HiveMigrateUtils;
-import org.apache.paimon.hive.migrate.HivePartitionFiles;
-import org.apache.paimon.io.DataFileMeta;
-import org.apache.paimon.manifest.FileSource;
-import org.apache.paimon.migrate.FileMetaUtils;
-import org.apache.paimon.options.CatalogOptions;
-import org.apache.paimon.options.Options;
-import org.apache.paimon.partition.PartitionPredicate;
-import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.schema.Schema;
-import org.apache.paimon.statistics.SimpleColStatsCollector;
-import org.apache.paimon.stats.SimpleStats;
-import org.apache.paimon.stats.SimpleStatsConverter;
-import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.sink.BatchTableCommit;
-import org.apache.paimon.table.sink.ChannelComputer;
-import org.apache.paimon.table.sink.CommitMessage;
-import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.IOUtils;
-import org.apache.paimon.utils.Pair;
-import org.apache.paimon.utils.StatsCollectorFactories;
-import org.apache.paimon.utils.StringUtils;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.ProcessFunction;
-import org.apache.flink.util.Collector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-
-import static org.apache.paimon.options.CatalogOptions.WAREHOUSE;
-import static org.apache.paimon.utils.Preconditions.checkArgument;
-import static org.apache.paimon.utils.Preconditions.checkNotNull;
-import static org.apache.paimon.utils.Preconditions.checkState;
-
-/** Utils for building {@link CloneHiveAction}. */
-public class CloneHiveUtils {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(CloneHiveUtils.class);
-
-    public static DataStream<Tuple2<Identifier, Identifier>> buildSource(
-            String sourceDatabase,
-            String sourceTableName,
-            String targetDatabase,
-            String targetTableName,
-            Catalog sourceCatalog,
-            StreamExecutionEnvironment env)
-            throws Exception {
-        List<Tuple2<Identifier, Identifier>> result = new ArrayList<>();
-        HiveCatalog hiveCatalog = checkAndGetHiveCatalog(sourceCatalog);
-        if (StringUtils.isNullOrWhitespaceOnly(sourceDatabase)) {
-            checkArgument(
-                    StringUtils.isNullOrWhitespaceOnly(sourceTableName),
-                    "sourceTableName must be blank when database is null.");
-            checkArgument(
-                    StringUtils.isNullOrWhitespaceOnly(targetDatabase),
-                    "targetDatabase must be blank when clone all tables in a 
catalog.");
-            checkArgument(
-                    StringUtils.isNullOrWhitespaceOnly(targetTableName),
-                    "targetTableName must be blank when clone all tables in a 
catalog.");
-
-            for (Identifier identifier : 
HiveMigrateUtils.listTables(hiveCatalog)) {
-                result.add(new Tuple2<>(identifier, identifier));
-            }
-        } else if (StringUtils.isNullOrWhitespaceOnly(sourceTableName)) {
-            checkArgument(
-                    !StringUtils.isNullOrWhitespaceOnly(targetDatabase),
-                    "targetDatabase must not be blank when clone all tables in 
a database.");
-            checkArgument(
-                    StringUtils.isNullOrWhitespaceOnly(targetTableName),
-                    "targetTableName must be blank when clone all tables in a 
catalog.");
-
-            for (Identifier identifier : 
HiveMigrateUtils.listTables(hiveCatalog, sourceDatabase)) {
-                result.add(
-                        new Tuple2<>(
-                                identifier,
-                                Identifier.create(targetDatabase, 
identifier.getObjectName())));
-            }
-        } else {
-            checkArgument(
-                    !StringUtils.isNullOrWhitespaceOnly(targetDatabase),
-                    "targetDatabase must not be blank when clone a table.");
-            checkArgument(
-                    !StringUtils.isNullOrWhitespaceOnly(targetTableName),
-                    "targetTableName must not be blank when clone a table.");
-            result.add(
-                    new Tuple2<>(
-                            Identifier.create(sourceDatabase, sourceTableName),
-                            Identifier.create(targetDatabase, 
targetTableName)));
-        }
-
-        checkState(!result.isEmpty(), "Didn't find any table in source 
catalog.");
-
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("The clone identifiers of source table and target table 
are: {}", result);
-        }
-        return env.fromCollection(result).forceNonParallel();
-    }
-
-    public static ProcessFunction<Tuple2<Identifier, Identifier>, 
List<CloneFilesInfo>>
-            createTargetTableAndListFilesFunction(
-                    Map<String, String> sourceCatalogConfig,
-                    Map<String, String> targetCatalogConfig,
-                    @Nullable String whereSql) {
-        return new ProcessFunction<Tuple2<Identifier, Identifier>, 
List<CloneFilesInfo>>() {
-            @Override
-            public void processElement(
-                    Tuple2<Identifier, Identifier> tuple,
-                    ProcessFunction<Tuple2<Identifier, Identifier>, 
List<CloneFilesInfo>>.Context
-                            context,
-                    Collector<List<CloneFilesInfo>> collector)
-                    throws Exception {
-                String sourceType = 
sourceCatalogConfig.get(CatalogOptions.METASTORE.key());
-                checkNotNull(sourceType);
-                try (Catalog sourceCatalog =
-                                FlinkCatalogFactory.createPaimonCatalog(
-                                        Options.fromMap(sourceCatalogConfig));
-                        Catalog targetCatalog =
-                                FlinkCatalogFactory.createPaimonCatalog(
-                                        Options.fromMap(targetCatalogConfig))) 
{
-
-                    HiveCatalog hiveCatalog = 
checkAndGetHiveCatalog(sourceCatalog);
-
-                    Schema schema = 
HiveMigrateUtils.hiveTableToPaimonSchema(hiveCatalog, tuple.f0);
-                    Map<String, String> options = schema.options();
-                    // only support Hive to unaware-bucket table now
-                    options.put(CoreOptions.BUCKET.key(), "-1");
-                    schema =
-                            new Schema(
-                                    schema.fields(),
-                                    schema.partitionKeys(),
-                                    schema.primaryKeys(),
-                                    options,
-                                    schema.comment());
-                    targetCatalog.createTable(tuple.f1, schema, false);
-                    FileStoreTable table = (FileStoreTable) 
targetCatalog.getTable(tuple.f1);
-                    PartitionPredicate predicate =
-                            getPartitionPredicate(
-                                    whereSql, 
table.schema().logicalPartitionType(), tuple.f0);
-
-                    List<HivePartitionFiles> allPartitions =
-                            HiveMigrateUtils.listFiles(
-                                    hiveCatalog,
-                                    tuple.f0,
-                                    table.schema().logicalPartitionType(),
-                                    table.coreOptions().partitionDefaultName(),
-                                    predicate);
-                    List<CloneFilesInfo> cloneFilesInfos = new ArrayList<>();
-                    for (HivePartitionFiles partitionFiles : allPartitions) {
-                        cloneFilesInfos.add(CloneFilesInfo.fromHive(tuple.f1, 
partitionFiles, 0));
-                    }
-                    collector.collect(cloneFilesInfos);
-                }
-            }
-        };
-    }
-
-    public static ProcessFunction<List<CloneFilesInfo>, Void> 
copyAndCommitFunction(
-            Map<String, String> sourceCatalogConfig, Map<String, String> 
targetCatalogConfig) {
-        return new ProcessFunction<List<CloneFilesInfo>, Void>() {
-
-            @Override
-            public void processElement(
-                    List<CloneFilesInfo> cloneFilesInfos,
-                    ProcessFunction<List<CloneFilesInfo>, Void>.Context 
context,
-                    Collector<Void> collector)
-                    throws Exception {
-                try (Catalog targetCatalog =
-                        FlinkCatalogFactory.createPaimonCatalog(
-                                Options.fromMap(targetCatalogConfig))) {
-
-                    // source FileIO
-                    CatalogContext sourceContext =
-                            
CatalogContext.create(Options.fromMap(sourceCatalogConfig));
-                    String warehouse = 
checkNotNull(sourceContext.options().get(WAREHOUSE));
-                    FileIO sourceFileIO = FileIO.get(new Path(warehouse), 
sourceContext);
-
-                    // group by table
-                    Map<Identifier, List<CloneFilesInfo>> groupedFiles = new 
HashMap<>();
-                    for (CloneFilesInfo files : cloneFilesInfos) {
-                        groupedFiles
-                                .computeIfAbsent(files.identifier(), k -> new 
ArrayList<>())
-                                .add(files);
-                    }
-
-                    for (Map.Entry<Identifier, List<CloneFilesInfo>> entry :
-                            groupedFiles.entrySet()) {
-                        FileStoreTable targetTable =
-                                (FileStoreTable) 
targetCatalog.getTable(entry.getKey());
-                        commit(entry.getValue(), sourceFileIO, targetTable);
-                    }
-                }
-            }
-        };
-    }
-
-    private static void commit(
-            List<CloneFilesInfo> cloneFilesInfos, FileIO sourceFileIO, 
FileStoreTable targetTable)
-            throws Exception {
-        List<CommitMessage> commitMessages = new ArrayList<>();
-        for (CloneFilesInfo onePartitionFiles : cloneFilesInfos) {
-            commitMessages.add(
-                    copyFileAndGetCommitMessage(onePartitionFiles, 
sourceFileIO, targetTable));
-        }
-        try (BatchTableCommit commit = 
targetTable.newBatchWriteBuilder().newCommit()) {
-            commit.commit(commitMessages);
-        }
-    }
-
-    private static CommitMessage copyFileAndGetCommitMessage(
-            CloneFilesInfo cloneFilesInfo, FileIO sourceFileIO, FileStoreTable 
targetTable)
-            throws IOException {
-        // util for collecting stats
-        CoreOptions options = targetTable.coreOptions();
-        SimpleColStatsCollector.Factory[] factories =
-                StatsCollectorFactories.createStatsFactories(
-                        options.statsMode(), options, 
targetTable.rowType().getFieldNames());
-
-        SimpleStatsExtractor simpleStatsExtractor =
-                FileFormat.fromIdentifier(cloneFilesInfo.format(), 
options.toConfiguration())
-                        .createStatsExtractor(targetTable.rowType(), factories)
-                        .orElseThrow(
-                                () ->
-                                        new RuntimeException(
-                                                "Can't get table stats 
extractor for format "
-                                                        + 
cloneFilesInfo.format()));
-        RowType rowTypeWithSchemaId =
-                
targetTable.schemaManager().schema(targetTable.schema().id()).logicalRowType();
-
-        SimpleStatsConverter statsArraySerializer = new 
SimpleStatsConverter(rowTypeWithSchemaId);
-
-        List<Path> paths = cloneFilesInfo.paths();
-        List<Long> fileSizes = cloneFilesInfo.fileSizes();
-        List<DataFileMeta> dataFileMetas = new ArrayList<>();
-        for (int i = 0; i < paths.size(); i++) {
-            Path path = paths.get(i);
-            long fileSize = fileSizes.get(i);
-
-            // extract stats
-            Pair<SimpleColStats[], SimpleStatsExtractor.FileInfo> fileInfo =
-                    simpleStatsExtractor.extractWithFileInfo(sourceFileIO, 
path, fileSize);
-            SimpleStats stats = 
statsArraySerializer.toBinaryAllMode(fileInfo.getLeft());
-
-            // new file name
-            String suffix = "." + cloneFilesInfo.format();
-            String fileName = path.getName();
-            String newFileName = fileName.endsWith(suffix) ? fileName : 
fileName + suffix;
-
-            // copy files
-            Path targetFilePath =
-                    targetTable
-                            .store()
-                            .pathFactory()
-                            .bucketPath(cloneFilesInfo.partition(), 
cloneFilesInfo.bucket());
-            IOUtils.copyBytes(
-                    sourceFileIO.newInputStream(path),
-                    targetTable
-                            .fileIO()
-                            .newOutputStream(new Path(targetFilePath, 
newFileName), false));
-
-            // to DataFileMeta
-            DataFileMeta dataFileMeta =
-                    DataFileMeta.forAppend(
-                            newFileName,
-                            fileSize,
-                            fileInfo.getRight().getRowCount(),
-                            stats,
-                            0,
-                            0,
-                            targetTable.schema().id(),
-                            Collections.emptyList(),
-                            null,
-                            FileSource.APPEND,
-                            null,
-                            null);
-            dataFileMetas.add(dataFileMeta);
-        }
-        return FileMetaUtils.commitFile(
-                cloneFilesInfo.partition(), 
targetTable.coreOptions().bucket(), dataFileMetas);
-    }
-
-    private static HiveCatalog checkAndGetHiveCatalog(Catalog catalog) {
-        Catalog rootCatalog = DelegateCatalog.rootCatalog(catalog);
-        checkArgument(
-                rootCatalog instanceof HiveCatalog,
-                "Only support HiveCatalog now but found %s.",
-                rootCatalog.getClass().getName());
-        return (HiveCatalog) rootCatalog;
-    }
-
-    @VisibleForTesting
-    @Nullable
-    static PartitionPredicate getPartitionPredicate(
-            @Nullable String whereSql, RowType partitionType, Identifier 
tableId) throws Exception {
-        if (whereSql == null) {
-            return null;
-        }
-
-        SimpleSqlPredicateConvertor simpleSqlPredicateConvertor =
-                new SimpleSqlPredicateConvertor(partitionType);
-        try {
-            Predicate predicate = 
simpleSqlPredicateConvertor.convertSqlToPredicate(whereSql);
-            return PartitionPredicate.fromPredicate(partitionType, predicate);
-        } catch (Exception e) {
-            throw new RuntimeException(
-                    "Failed to parse partition filter sql '"
-                            + whereSql
-                            + "' for table "
-                            + tableId.getFullName(),
-                    e);
-        }
-    }
-
-    // ---------------------------------- Classes 
----------------------------------
-
-    /** Shuffle tables. */
-    public static class TableChannelComputer
-            implements ChannelComputer<Tuple2<Identifier, Identifier>> {
-
-        private static final long serialVersionUID = 1L;
-
-        private transient int numChannels;
-
-        @Override
-        public void setup(int numChannels) {
-            this.numChannels = numChannels;
-        }
-
-        @Override
-        public int channel(Tuple2<Identifier, Identifier> record) {
-            return Math.floorMod(
-                    Objects.hash(record.f1.getDatabaseName(), 
record.f1.getTableName()),
-                    numChannels);
-        }
-
-        @Override
-        public String toString() {
-            return "shuffle by identifier hash";
-        }
-    }
-
-    /** Files grouped by (table, partition, bucket) with necessary 
information. */
-    public static class CloneFilesInfo implements Serializable {
-
-        private static final long serialVersionUID = 1L;
-
-        private final Identifier identifier;
-        private final BinaryRow partition;
-        private final int bucket;
-        private final List<Path> paths;
-        private final List<Long> fileSizes;
-        private final String format;
-
-        public CloneFilesInfo(
-                Identifier identifier,
-                BinaryRow partition,
-                int bucket,
-                List<Path> paths,
-                List<Long> fileSizes,
-                String format) {
-            this.identifier = identifier;
-            this.partition = partition;
-            this.bucket = bucket;
-            this.paths = paths;
-            this.fileSizes = fileSizes;
-            this.format = format;
-        }
-
-        public Identifier identifier() {
-            return identifier;
-        }
-
-        public BinaryRow partition() {
-            return partition;
-        }
-
-        public int bucket() {
-            return bucket;
-        }
-
-        public List<Path> paths() {
-            return paths;
-        }
-
-        public List<Long> fileSizes() {
-            return fileSizes;
-        }
-
-        public String format() {
-            return format;
-        }
-
-        public static CloneFilesInfo fromHive(
-                Identifier identifier, HivePartitionFiles hivePartitionFiles, 
int bucket) {
-            return new CloneFilesInfo(
-                    identifier,
-                    hivePartitionFiles.partition(),
-                    bucket,
-                    hivePartitionFiles.paths(),
-                    hivePartitionFiles.fileSizes(),
-                    hivePartitionFiles.format());
-        }
-    }
-}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/CloneFileInfo.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/CloneFileInfo.java
new file mode 100644
index 0000000000..80b228752f
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/CloneFileInfo.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.clone.hive;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.hive.migrate.HivePartitionFiles;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;
+import static org.apache.paimon.utils.SerializationUtils.serializeBinaryRow;
+
+/** Clone File (table, partition) with necessary information. */
+public class CloneFileInfo implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final Identifier identifier;
+    private final byte[] partition;
+    private final Path path;
+    private final long fileSize;
+    private final String format;
+
+    public CloneFileInfo(
+            Identifier identifier, BinaryRow partition, Path path, long fileSize, String format) {
+        this.identifier = identifier;
+        this.partition = serializeBinaryRow(partition);
+        this.path = path;
+        this.fileSize = fileSize;
+        this.format = format;
+    }
+
+    public Identifier identifier() {
+        return identifier;
+    }
+
+    public BinaryRow partition() {
+        return deserializeBinaryRow(partition);
+    }
+
+    public Path path() {
+        return path;
+    }
+
+    public long fileSize() {
+        return fileSize;
+    }
+
+    public String format() {
+        return format;
+    }
+
+    public static List<CloneFileInfo> fromHive(Identifier identifier, HivePartitionFiles files) {
+        List<CloneFileInfo> result = new ArrayList<>();
+        for (int i = 0; i < files.paths().size(); i++) {
+            Path path = files.paths().get(i);
+            long fileSize = files.fileSizes().get(i);
+            result.add(
+                    new CloneFileInfo(
+                            identifier, files.partition(), path, fileSize, files.format()));
+        }
+        return result;
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/CloneHiveUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/CloneHiveUtils.java
new file mode 100644
index 0000000000..1afed4a9ce
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/CloneHiveUtils.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.clone.hive;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.DelegateCatalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.action.CloneHiveAction;
+import org.apache.paimon.hive.HiveCatalog;
+import org.apache.paimon.hive.migrate.HiveCloneUtils;
+import org.apache.paimon.table.sink.ChannelComputer;
+import org.apache.paimon.utils.StringUtils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+import static org.apache.paimon.utils.Preconditions.checkState;
+
+/** Utils for building {@link CloneHiveAction}. */
+public class CloneHiveUtils {
+
+    private static final Logger LOG = LoggerFactory.getLogger(CloneHiveUtils.class);
+
+    public static DataStream<Tuple2<Identifier, Identifier>> buildSource(
+            String sourceDatabase,
+            String sourceTableName,
+            String targetDatabase,
+            String targetTableName,
+            Catalog sourceCatalog,
+            StreamExecutionEnvironment env)
+            throws Exception {
+        List<Tuple2<Identifier, Identifier>> result = new ArrayList<>();
+        HiveCatalog hiveCatalog = getRootHiveCatalog(sourceCatalog);
+        if (StringUtils.isNullOrWhitespaceOnly(sourceDatabase)) {
+            checkArgument(
+                    StringUtils.isNullOrWhitespaceOnly(sourceTableName),
+                    "sourceTableName must be blank when database is null.");
+            checkArgument(
+                    StringUtils.isNullOrWhitespaceOnly(targetDatabase),
+                    "targetDatabase must be blank when clone all tables in a catalog.");
+            checkArgument(
+                    StringUtils.isNullOrWhitespaceOnly(targetTableName),
+                    "targetTableName must be blank when clone all tables in a catalog.");
+
+            for (Identifier identifier : HiveCloneUtils.listTables(hiveCatalog)) {
+                result.add(new Tuple2<>(identifier, identifier));
+            }
+        } else if (StringUtils.isNullOrWhitespaceOnly(sourceTableName)) {
+            checkArgument(
+                    !StringUtils.isNullOrWhitespaceOnly(targetDatabase),
+                    "targetDatabase must not be blank when clone all tables in a database.");
+            checkArgument(
+                    StringUtils.isNullOrWhitespaceOnly(targetTableName),
+                    "targetTableName must be blank when clone all tables in a database.");
+
+            for (Identifier identifier : HiveCloneUtils.listTables(hiveCatalog, sourceDatabase)) {
+                result.add(
+                        new Tuple2<>(
+                                identifier,
+                                Identifier.create(targetDatabase, identifier.getObjectName())));
+            }
+        } else {
+            checkArgument(
+                    !StringUtils.isNullOrWhitespaceOnly(targetDatabase),
+                    "targetDatabase must not be blank when clone a table.");
+            checkArgument(
+                    !StringUtils.isNullOrWhitespaceOnly(targetTableName),
+                    "targetTableName must not be blank when clone a table.");
+            result.add(
+                    new Tuple2<>(
+                            Identifier.create(sourceDatabase, sourceTableName),
+                            Identifier.create(targetDatabase, targetTableName)));
+        }
+
+        checkState(!result.isEmpty(), "Didn't find any table in source catalog.");
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("The clone identifiers of source table and target table are: {}", result);
+        }
+        return env.fromCollection(result).forceNonParallel();
+    }
+
+    public static HiveCatalog getRootHiveCatalog(Catalog catalog) {
+        Catalog rootCatalog = DelegateCatalog.rootCatalog(catalog);
+        checkArgument(
+                rootCatalog instanceof HiveCatalog,
+                "Only support HiveCatalog now but found %s.",
+                rootCatalog.getClass().getName());
+        return (HiveCatalog) rootCatalog;
+    }
+
+    // ---------------------------------- Classes ----------------------------------
+
+    /** Shuffle tables. */
+    public static class TableChannelComputer
+            implements ChannelComputer<Tuple2<Identifier, Identifier>> {
+
+        private static final long serialVersionUID = 1L;
+
+        private transient int numChannels;
+
+        @Override
+        public void setup(int numChannels) {
+            this.numChannels = numChannels;
+        }
+
+        @Override
+        public int channel(Tuple2<Identifier, Identifier> record) {
+            return Math.floorMod(
+                    Objects.hash(record.f1.getDatabaseName(), record.f1.getTableName()),
+                    numChannels);
+        }
+
+        @Override
+        public String toString() {
+            return "shuffle by identifier hash";
+        }
+    }
+
+    /** Shuffle data files by table identifier. */
+    public static class DataFileChannelComputer implements ChannelComputer<DataFileInfo> {
+
+        private static final long serialVersionUID = 1L;
+
+        private transient int numChannels;
+
+        @Override
+        public void setup(int numChannels) {
+            this.numChannels = numChannels;
+        }
+
+        @Override
+        public int channel(DataFileInfo record) {
+            return Math.floorMod(Objects.hash(record.identifier()), numChannels);
+        }
+
+        @Override
+        public String toString() {
+            return "shuffle by identifier hash";
+        }
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/CommitTableOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/CommitTableOperator.java
new file mode 100644
index 0000000000..a51ba6c097
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/CommitTableOperator.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.clone.hive;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataFileMetaSerializer;
+import org.apache.paimon.migrate.FileMetaUtils;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.CommitMessage;
+
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.paimon.flink.FlinkCatalogFactory.createPaimonCatalog;
+
+/** Commit table operator. */
+public class CommitTableOperator extends AbstractStreamOperator<Long>
+        implements OneInputStreamOperator<DataFileInfo, Long>, BoundedOneInput {
+
+    private static final long serialVersionUID = 1L;
+
+    private final Map<String, String> catalogConfig;
+
+    private transient DataFileMetaSerializer dataFileSerializer;
+    private transient Map<Identifier, Map<BinaryRow, List<DataFileMeta>>> files;
+
+    public CommitTableOperator(Map<String, String> catalogConfig) {
+        this.catalogConfig = catalogConfig;
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        this.dataFileSerializer = new DataFileMetaSerializer();
+        this.files = new HashMap<>();
+    }
+
+    @Override
+    public void processElement(StreamRecord<DataFileInfo> streamRecord) throws Exception {
+        DataFileInfo file = streamRecord.getValue();
+        BinaryRow partition = file.partition();
+        List<DataFileMeta> files =
+                this.files
+                        .computeIfAbsent(file.identifier(), k -> new HashMap<>())
+                        .computeIfAbsent(partition, k -> new ArrayList<>());
+        files.add(dataFileSerializer.deserializeFromBytes(file.dataFileMeta()));
+    }
+
+    @Override
+    public void endInput() throws Exception {
+        try (Catalog catalog = createPaimonCatalog(Options.fromMap(catalogConfig))) {
+            for (Map.Entry<Identifier, Map<BinaryRow, List<DataFileMeta>>> entry :
+                    files.entrySet()) {
+                List<CommitMessage> commitMessages = new ArrayList<>();
+                for (Map.Entry<BinaryRow, List<DataFileMeta>> listEntry :
+                        entry.getValue().entrySet()) {
+                    commitMessages.add(
+                            FileMetaUtils.commitFile(listEntry.getKey(), 0, listEntry.getValue()));
+                }
+
+                Table table = catalog.getTable(entry.getKey());
+                try (BatchTableCommit commit = table.newBatchWriteBuilder().newCommit()) {
+                    commit.commit(commitMessages);
+                }
+            }
+        }
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/CopyHiveFilesFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/CopyHiveFilesFunction.java
new file mode 100644
index 0000000000..20bca892dd
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/CopyHiveFilesFunction.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.clone.hive;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.format.FileFormat;
+import org.apache.paimon.format.SimpleColStats;
+import org.apache.paimon.format.SimpleStatsExtractor;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.FileSource;
+import org.apache.paimon.statistics.SimpleColStatsCollector;
+import org.apache.paimon.stats.SimpleStats;
+import org.apache.paimon.stats.SimpleStatsConverter;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.IOUtils;
+import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.StatsCollectorFactories;
+
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.util.Collector;
+
+import java.util.Collections;
+import java.util.Map;
+
+/** Copy files for table. */
+public class CopyHiveFilesFunction extends CopyProcessFunction<CloneFileInfo, DataFileInfo> {
+
+    private static final long serialVersionUID = 1L;
+
+    public CopyHiveFilesFunction(
+            Map<String, String> sourceCatalogConfig, Map<String, String> targetCatalogConfig) {
+        super(sourceCatalogConfig, targetCatalogConfig);
+    }
+
+    @Override
+    public void processElement(
+            CloneFileInfo cloneFileInfo,
+            ProcessFunction<CloneFileInfo, DataFileInfo>.Context context,
+            Collector<DataFileInfo> collector)
+            throws Exception {
+        Identifier identifier = cloneFileInfo.identifier();
+        long fileSize = cloneFileInfo.fileSize();
+        String format = cloneFileInfo.format();
+        Path path = cloneFileInfo.path();
+        BinaryRow partition = cloneFileInfo.partition();
+
+        FileIO sourceFileIO = hiveCatalog.fileIO();
+        FileStoreTable targetTable = (FileStoreTable) getTable(identifier);
+        // util for collecting stats
+        CoreOptions options = targetTable.coreOptions();
+        SimpleColStatsCollector.Factory[] factories =
+                StatsCollectorFactories.createStatsFactories(
+                        options.statsMode(), options, targetTable.rowType().getFieldNames());
+
+        SimpleStatsExtractor simpleStatsExtractor =
+                FileFormat.fromIdentifier(format, options.toConfiguration())
+                        .createStatsExtractor(targetTable.rowType(), factories)
+                        .orElseThrow(
+                                () ->
+                                        new RuntimeException(
+                                                "Can't get table stats extractor for format "
+                                                        + format));
+        RowType rowTypeWithSchemaId =
+                targetTable.schemaManager().schema(targetTable.schema().id()).logicalRowType();
+
+        SimpleStatsConverter statsArraySerializer = new SimpleStatsConverter(rowTypeWithSchemaId);
+
+        // extract stats
+        Pair<SimpleColStats[], SimpleStatsExtractor.FileInfo> fileInfo =
+                simpleStatsExtractor.extractWithFileInfo(sourceFileIO, path, fileSize);
+        SimpleStats stats = statsArraySerializer.toBinaryAllMode(fileInfo.getLeft());
+
+        // new file name
+        String suffix = "." + format;
+        String fileName = path.getName();
+        String newFileName = fileName.endsWith(suffix) ? fileName : fileName + suffix;
+
+        // copy files
+        Path targetFilePath = targetTable.store().pathFactory().bucketPath(partition, 0);
+        IOUtils.copyBytes(
+                sourceFileIO.newInputStream(path),
+                targetTable.fileIO().newOutputStream(new Path(targetFilePath, newFileName), false));
+
+        // to DataFileMeta
+        DataFileMeta dataFileMeta =
+                DataFileMeta.forAppend(
+                        newFileName,
+                        fileSize,
+                        fileInfo.getRight().getRowCount(),
+                        stats,
+                        0,
+                        0,
+                        targetTable.schema().id(),
+                        Collections.emptyList(),
+                        null,
+                        FileSource.APPEND,
+                        null,
+                        null);
+
+        collector.collect(
+                new DataFileInfo(
+                        identifier, partition, dataFileSerializer.serializeToBytes(dataFileMeta)));
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/CopyProcessFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/CopyProcessFunction.java
new file mode 100644
index 0000000000..d083a6dc3d
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/CopyProcessFunction.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.clone.hive;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.hive.HiveCatalog;
+import org.apache.paimon.io.DataFileMetaSerializer;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.Table;
+
+import org.apache.flink.api.common.functions.OpenContext;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.paimon.flink.FlinkCatalogFactory.createPaimonCatalog;
+import static org.apache.paimon.flink.clone.hive.CloneHiveUtils.getRootHiveCatalog;
+
+/** Abstract function for copying tables. */
+public abstract class CopyProcessFunction<I, O> extends ProcessFunction<I, O> {
+
+    protected final Map<String, String> sourceCatalogConfig;
+    protected final Map<String, String> targetCatalogConfig;
+
+    protected transient HiveCatalog hiveCatalog;
+    protected transient Catalog targetCatalog;
+
+    protected transient Map<Identifier, Table> tableCache;
+    protected transient DataFileMetaSerializer dataFileSerializer;
+
+    public CopyProcessFunction(
+            Map<String, String> sourceCatalogConfig, Map<String, String> targetCatalogConfig) {
+        this.sourceCatalogConfig = sourceCatalogConfig;
+        this.targetCatalogConfig = targetCatalogConfig;
+    }
+
+    @Override
+    public void open(OpenContext openContext) throws Exception {
+        super.open(openContext);
+        this.hiveCatalog =
+                getRootHiveCatalog(createPaimonCatalog(Options.fromMap(sourceCatalogConfig)));
+        this.targetCatalog = createPaimonCatalog(Options.fromMap(targetCatalogConfig));
+        this.dataFileSerializer = new DataFileMetaSerializer();
+        this.tableCache = new HashMap<>();
+    }
+
+    protected Table getTable(Identifier identifier) {
+        return tableCache.computeIfAbsent(
+                identifier,
+                k -> {
+                    try {
+                        return targetCatalog.getTable(k);
+                    } catch (Catalog.TableNotExistException e) {
+                        throw new RuntimeException(e);
+                    }
+                });
+    }
+
+    @Override
+    public void close() throws Exception {
+        super.close();
+        this.hiveCatalog.close();
+        this.targetCatalog.close();
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/DataFileInfo.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/DataFileInfo.java
new file mode 100644
index 0000000000..2baa11af61
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/DataFileInfo.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.clone.hive;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryRow;
+
+import java.io.Serializable;
+
+import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;
+import static org.apache.paimon.utils.SerializationUtils.serializeBinaryRow;
+
+/** Data File (table, partition) with necessary information. */
+public class DataFileInfo implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final Identifier identifier;
+    private final byte[] partition;
+    private final byte[] dataFileMeta;
+
+    public DataFileInfo(Identifier identifier, BinaryRow partition, byte[] dataFileMeta) {
+        this.identifier = identifier;
+        this.partition = serializeBinaryRow(partition);
+        this.dataFileMeta = dataFileMeta;
+    }
+
+    public Identifier identifier() {
+        return identifier;
+    }
+
+    public BinaryRow partition() {
+        return deserializeBinaryRow(partition);
+    }
+
+    public byte[] dataFileMeta() {
+        return dataFileMeta;
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/ListHiveFilesFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/ListHiveFilesFunction.java
new file mode 100644
index 0000000000..6e95257929
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/ListHiveFilesFunction.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.clone.hive;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.predicate.SimpleSqlPredicateConvertor;
+import org.apache.paimon.hive.migrate.HiveCloneUtils;
+import org.apache.paimon.hive.migrate.HivePartitionFiles;
+import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.types.RowType;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.util.Collector;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.paimon.utils.Preconditions.checkNotNull;
+
+/** List files for table. */
+public class ListHiveFilesFunction
+        extends CopyProcessFunction<Tuple2<Identifier, Identifier>, CloneFileInfo> {
+
+    private static final long serialVersionUID = 1L;
+
+    @Nullable private final String whereSql;
+
+    public ListHiveFilesFunction(
+            Map<String, String> sourceCatalogConfig,
+            Map<String, String> targetCatalogConfig,
+            @Nullable String whereSql) {
+        super(sourceCatalogConfig, targetCatalogConfig);
+        this.whereSql = whereSql;
+    }
+
+    @Override
+    public void processElement(
+            Tuple2<Identifier, Identifier> tuple,
+            ProcessFunction<Tuple2<Identifier, Identifier>, CloneFileInfo>.Context context,
+            Collector<CloneFileInfo> collector)
+            throws Exception {
+        String sourceType = sourceCatalogConfig.get(CatalogOptions.METASTORE.key());
+        checkNotNull(sourceType);
+
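+        // Derive the Paimon schema from the Hive table definition.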
+        Schema schema = HiveCloneUtils.hiveTableToPaimonSchema(hiveCatalog, tuple.f0);
+        Map<String, String> options = schema.options();
+        // only cloning Hive tables to unaware-bucket tables is supported for now
+        options.put(CoreOptions.BUCKET.key(), "-1");
+        schema =
+                new Schema(
+                        schema.fields(),
+                        schema.partitionKeys(),
+                        schema.primaryKeys(),
+                        options,
+                        schema.comment());
+        targetCatalog.createTable(tuple.f1, schema, false);
+        FileStoreTable table = (FileStoreTable) targetCatalog.getTable(tuple.f1);
+        PartitionPredicate predicate =
+                getPartitionPredicate(whereSql, table.schema().logicalPartitionType(), tuple.f0);
+
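+        // List the data files of every matching Hive partition and emit one CloneFileInfo per file.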
+        List<HivePartitionFiles> allPartitions =
+                HiveCloneUtils.listFiles(
+                        hiveCatalog,
+                        tuple.f0,
+                        table.schema().logicalPartitionType(),
+                        table.coreOptions().partitionDefaultName(),
+                        predicate);
+        for (HivePartitionFiles partitionFiles : allPartitions) {
+            CloneFileInfo.fromHive(tuple.f1, partitionFiles).forEach(collector::collect);
+        }
+    }
+
+    @VisibleForTesting
+    @Nullable
+    public static PartitionPredicate getPartitionPredicate(
+            @Nullable String whereSql, RowType partitionType, Identifier tableId) throws Exception {
+        if (whereSql == null) {
+            return null;
+        }
+
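+        // Parse the WHERE-style SQL filter against the partition row type and
+        // turn it into a PartitionPredicate.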
+        SimpleSqlPredicateConvertor simpleSqlPredicateConvertor =
+                new SimpleSqlPredicateConvertor(partitionType);
+        try {
+            Predicate predicate = simpleSqlPredicateConvertor.convertSqlToPredicate(whereSql);
+            return PartitionPredicate.fromPredicate(partitionType, predicate);
+        } catch (Exception e) {
+            throw new RuntimeException(
+                    "Failed to parse partition filter sql '"
+                            + whereSql
+                            + "' for table "
+                            + tableId.getFullName(),
+                    e);
+        }
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/clone/CloneHiveUtilsTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/clone/CloneHiveUtilsTest.java
index fa43522132..29dc714b72 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/clone/CloneHiveUtilsTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/clone/CloneHiveUtilsTest.java
@@ -22,12 +22,14 @@ import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.BinaryRowWriter;
 import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.flink.clone.hive.CloneHiveUtils;
 import org.apache.paimon.partition.PartitionPredicate;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
 
 import org.junit.jupiter.api.Test;
 
+import static org.apache.paimon.flink.clone.hive.ListHiveFilesFunction.getPartitionPredicate;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -43,25 +45,22 @@ public class CloneHiveUtilsTest {
                         .field("b", DataTypes.STRING())
                         .build();
 
-        PartitionPredicate p = CloneHiveUtils.getPartitionPredicate("a=1", partitionType, tableId);
+        PartitionPredicate p = getPartitionPredicate("a=1", partitionType, tableId);
         assertThat(p.test(twoColumnsPartition(1, "2"))).isTrue();
         assertThat(p.test(twoColumnsPartition(2, "1"))).isFalse();
 
-        p = CloneHiveUtils.getPartitionPredicate("a=1 OR b='2'", partitionType, tableId);
+        p = getPartitionPredicate("a=1 OR b='2'", partitionType, tableId);
         assertThat(p.test(twoColumnsPartition(1, "1"))).isTrue();
         assertThat(p.test(twoColumnsPartition(2, "2"))).isTrue();
         assertThat(p.test(twoColumnsPartition(2, "1"))).isFalse();
 
         // c not in partition fields
-        assertThatThrownBy(
-                        () ->
-                                CloneHiveUtils.getPartitionPredicate(
-                                        "a=1 OR c=1", partitionType, tableId))
+        assertThatThrownBy(() -> getPartitionPredicate("a=1 OR c=1", partitionType, tableId))
                 .hasMessage(
                         "Failed to parse partition filter sql 'a=1 OR c=1' for 
table test_db.test_table");
 
         // no partition keys
-        assertThatThrownBy(() -> CloneHiveUtils.getPartitionPredicate("a=1", RowType.of(), tableId))
+        assertThatThrownBy(() -> getPartitionPredicate("a=1", RowType.of(), tableId))
                 .hasMessage(
                         "Failed to parse partition filter sql 'a=1' for table 
test_db.test_table");
     }
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrateUtils.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveCloneUtils.java
similarity index 98%
rename from paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrateUtils.java
rename to paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveCloneUtils.java
index 260850718d..8532cbecf6 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrateUtils.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveCloneUtils.java
@@ -52,10 +52,10 @@ import java.util.stream.Collectors;
 
 import static org.apache.paimon.hive.HiveTypeUtils.toPaimonType;
 
-/** Utils for migrating Hive table to Paimon table. */
-public class HiveMigrateUtils {
+/** Utils for cloning Hive table to Paimon table. */
+public class HiveCloneUtils {
 
-    private static final Logger LOG = LoggerFactory.getLogger(HiveMigrateUtils.class);
+    private static final Logger LOG = LoggerFactory.getLogger(HiveCloneUtils.class);
 
     private static final Predicate<FileStatus> HIDDEN_PATH_FILTER =
             p -> !p.getPath().getName().startsWith("_") && !p.getPath().getName().startsWith(".");
diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/CloneHiveActionITCase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/CloneHiveActionITCase.java
index c960449ba9..5296c3b0fb 100644
--- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/CloneHiveActionITCase.java
+++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/CloneHiveActionITCase.java
@@ -31,7 +31,6 @@ import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
 import org.apache.flink.table.api.SqlDialect;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.types.Row;
-import org.apache.hadoop.hive.conf.HiveConf;
 import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -97,9 +96,6 @@ public class CloneHiveActionITCase extends ActionITCaseBase {
                         "metastore=hive",
                         "--catalog_conf",
                         "uri=thrift://localhost:" + PORT,
-                        "--catalog_conf",
-                        "warehouse="
-                                + System.getProperty(HiveConf.ConfVars.METASTOREWAREHOUSE.varname),
                         "--target_database",
                         "test",
                         "--target_table",
@@ -160,10 +156,6 @@ public class CloneHiveActionITCase extends ActionITCaseBase {
                                 "metastore=hive",
                                 "--catalog_conf",
                                 "uri=thrift://localhost:" + PORT,
-                                "--catalog_conf",
-                                "warehouse="
-                                        + System.getProperty(
-                                                HiveConf.ConfVars.METASTOREWAREHOUSE.varname),
                                 "--target_database",
                                 "test",
                                 "--target_table",
@@ -218,9 +210,6 @@ public class CloneHiveActionITCase extends ActionITCaseBase {
                         "metastore=hive",
                         "--catalog_conf",
                         "uri=thrift://localhost:" + PORT,
-                        "--catalog_conf",
-                        "warehouse="
-                                + System.getProperty(HiveConf.ConfVars.METASTOREWAREHOUSE.varname),
                         "--target_database",
                         "test",
                         "--target_table",
@@ -278,9 +267,6 @@ public class CloneHiveActionITCase extends ActionITCaseBase {
                         "metastore=hive",
                         "--catalog_conf",
                         "uri=thrift://localhost:" + PORT,
-                        "--catalog_conf",
-                        "warehouse="
-                                + System.getProperty(HiveConf.ConfVars.METASTOREWAREHOUSE.varname),
                         "--target_database",
                         "test",
                         "--target_catalog_conf",
@@ -339,9 +325,6 @@ public class CloneHiveActionITCase extends ActionITCaseBase {
                         "metastore=hive",
                         "--catalog_conf",
                         "uri=thrift://localhost:" + PORT,
-                        "--catalog_conf",
-                        "warehouse="
-                                + System.getProperty(HiveConf.ConfVars.METASTOREWAREHOUSE.varname),
                         "--target_database",
                         "test",
                         "--target_catalog_conf",
