This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 6d8f341276 [flink] Make clone hive tables production-ready (#5490)
6d8f341276 is described below
commit 6d8f341276b4b9c9ba1fbcf580861b70fc316f36
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Apr 18 18:05:44 2025 +0800
[flink] Make clone hive tables production-ready (#5490)
---
docs/content/migration/clone-from-hive.md | 56 +++
.../paimon/flink/action/CloneHiveAction.java | 33 +-
.../apache/paimon/flink/clone/CloneHiveUtils.java | 452 ---------------------
.../paimon/flink/clone/hive/CloneFileInfo.java | 84 ++++
.../paimon/flink/clone/hive/CloneHiveUtils.java | 166 ++++++++
.../flink/clone/hive/CommitTableOperator.java | 96 +++++
.../flink/clone/hive/CopyHiveFilesFunction.java | 125 ++++++
.../flink/clone/hive/CopyProcessFunction.java | 83 ++++
.../paimon/flink/clone/hive/DataFileInfo.java | 55 +++
.../flink/clone/hive/ListHiveFilesFunction.java | 120 ++++++
.../paimon/flink/clone/CloneHiveUtilsTest.java | 13 +-
.../{HiveMigrateUtils.java => HiveCloneUtils.java} | 6 +-
.../hive/procedure/CloneHiveActionITCase.java | 17 -
13 files changed, 818 insertions(+), 488 deletions(-)
diff --git a/docs/content/migration/clone-from-hive.md b/docs/content/migration/clone-from-hive.md
new file mode 100644
index 0000000000..705312d8b8
--- /dev/null
+++ b/docs/content/migration/clone-from-hive.md
@@ -0,0 +1,56 @@
+---
+title: "Clone From Hive"
+weight: 5
+type: docs
+aliases:
+- /migration/clone-from-hive.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Clone Hive Table
+
+Clone Hive Table supports cloning Hive tables in parquet, orc and avro formats. The cloned table will be an
+[append table]({{< ref "append-table/overview" >}}).
+
+## Clone Hive Table
+
+```bash
+<FLINK_HOME>/bin/flink run ./paimon-flink-action-{{< version >}}.jar \
+clone_hive \
+--database default \
+--table hivetable \
+--catalog_conf metastore=hive \
+--catalog_conf uri=thrift://localhost:9088 \
+--target_database test \
+--target_table test_table \
+--target_catalog_conf warehouse=my_warehouse
+```
+
+## Clone Hive Database
+
+```bash
+<FLINK_HOME>/bin/flink run ./paimon-flink-action-{{< version >}}.jar \
+clone_hive \
+--database default \
+--catalog_conf metastore=hive \
+--catalog_conf uri=thrift://localhost:9088 \
+--target_database test \
+--target_catalog_conf warehouse=my_warehouse
+```
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneHiveAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneHiveAction.java
index 9a1666bcea..25b1c5809f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneHiveAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneHiveAction.java
@@ -19,17 +19,22 @@
package org.apache.paimon.flink.action;
import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.flink.clone.CloneHiveUtils;
+import org.apache.paimon.flink.clone.hive.CloneFileInfo;
+import org.apache.paimon.flink.clone.hive.CloneHiveUtils;
+import org.apache.paimon.flink.clone.hive.CommitTableOperator;
+import org.apache.paimon.flink.clone.hive.CopyHiveFilesFunction;
+import org.apache.paimon.flink.clone.hive.DataFileInfo;
+import org.apache.paimon.flink.clone.hive.ListHiveFilesFunction;
import org.apache.paimon.flink.sink.FlinkStreamPartitioner;
import org.apache.paimon.options.CatalogOptions;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
import javax.annotation.Nullable;
-import java.util.List;
import java.util.Map;
/** Clone source files managed by HiveMetaStore and commit metas to construct Paimon table. */
@@ -91,23 +96,33 @@ public class CloneHiveAction extends ActionBase {
source, new CloneHiveUtils.TableChannelComputer(), parallelism);
// create target table, list files and group by <table, partition>
- DataStream<List<CloneHiveUtils.CloneFilesInfo>> files =
+ DataStream<CloneFileInfo> files =
partitionedSource
.process(
- CloneHiveUtils.createTargetTableAndListFilesFunction(
+ new ListHiveFilesFunction(
sourceCatalogConfig, targetCatalogConfig, whereSql))
.name("List Files")
.setParallelism(parallelism);
// copy files and commit
- DataStream<Void> committed =
- files.forward()
+ DataStream<DataFileInfo> dataFile =
+ files.rebalance()
.process(
- CloneHiveUtils.copyAndCommitFunction(
- sourceCatalogConfig, targetCatalogConfig))
- .name("Copy and Commit")
+ new CopyHiveFilesFunction(sourceCatalogConfig, targetCatalogConfig))
+ .name("Copy Files")
.setParallelism(parallelism);
+ DataStream<DataFileInfo> partitionedDataFile =
+ FlinkStreamPartitioner.partition(
+ dataFile, new CloneHiveUtils.DataFileChannelComputer(), parallelism);
+
+ DataStream<Long> committed =
+ partitionedDataFile
+ .transform(
+ "Commit table",
+ BasicTypeInfo.LONG_TYPE_INFO,
+ new CommitTableOperator(targetCatalogConfig))
+ .setParallelism(parallelism);
committed.sinkTo(new DiscardingSink<>()).name("end").setParallelism(1);
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneHiveUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneHiveUtils.java
deleted file mode 100644
index f2b8f63cd6..0000000000
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneHiveUtils.java
+++ /dev/null
@@ -1,452 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink.clone;
-
-import org.apache.paimon.CoreOptions;
-import org.apache.paimon.annotation.VisibleForTesting;
-import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.catalog.CatalogContext;
-import org.apache.paimon.catalog.DelegateCatalog;
-import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.flink.FlinkCatalogFactory;
-import org.apache.paimon.flink.action.CloneHiveAction;
-import org.apache.paimon.flink.predicate.SimpleSqlPredicateConvertor;
-import org.apache.paimon.format.FileFormat;
-import org.apache.paimon.format.SimpleColStats;
-import org.apache.paimon.format.SimpleStatsExtractor;
-import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.fs.Path;
-import org.apache.paimon.hive.HiveCatalog;
-import org.apache.paimon.hive.migrate.HiveMigrateUtils;
-import org.apache.paimon.hive.migrate.HivePartitionFiles;
-import org.apache.paimon.io.DataFileMeta;
-import org.apache.paimon.manifest.FileSource;
-import org.apache.paimon.migrate.FileMetaUtils;
-import org.apache.paimon.options.CatalogOptions;
-import org.apache.paimon.options.Options;
-import org.apache.paimon.partition.PartitionPredicate;
-import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.schema.Schema;
-import org.apache.paimon.statistics.SimpleColStatsCollector;
-import org.apache.paimon.stats.SimpleStats;
-import org.apache.paimon.stats.SimpleStatsConverter;
-import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.sink.BatchTableCommit;
-import org.apache.paimon.table.sink.ChannelComputer;
-import org.apache.paimon.table.sink.CommitMessage;
-import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.IOUtils;
-import org.apache.paimon.utils.Pair;
-import org.apache.paimon.utils.StatsCollectorFactories;
-import org.apache.paimon.utils.StringUtils;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.ProcessFunction;
-import org.apache.flink.util.Collector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-
-import static org.apache.paimon.options.CatalogOptions.WAREHOUSE;
-import static org.apache.paimon.utils.Preconditions.checkArgument;
-import static org.apache.paimon.utils.Preconditions.checkNotNull;
-import static org.apache.paimon.utils.Preconditions.checkState;
-
-/** Utils for building {@link CloneHiveAction}. */
-public class CloneHiveUtils {
-
- private static final Logger LOG =
LoggerFactory.getLogger(CloneHiveUtils.class);
-
- public static DataStream<Tuple2<Identifier, Identifier>> buildSource(
- String sourceDatabase,
- String sourceTableName,
- String targetDatabase,
- String targetTableName,
- Catalog sourceCatalog,
- StreamExecutionEnvironment env)
- throws Exception {
- List<Tuple2<Identifier, Identifier>> result = new ArrayList<>();
- HiveCatalog hiveCatalog = checkAndGetHiveCatalog(sourceCatalog);
- if (StringUtils.isNullOrWhitespaceOnly(sourceDatabase)) {
- checkArgument(
- StringUtils.isNullOrWhitespaceOnly(sourceTableName),
- "sourceTableName must be blank when database is null.");
- checkArgument(
- StringUtils.isNullOrWhitespaceOnly(targetDatabase),
- "targetDatabase must be blank when clone all tables in a
catalog.");
- checkArgument(
- StringUtils.isNullOrWhitespaceOnly(targetTableName),
- "targetTableName must be blank when clone all tables in a
catalog.");
-
- for (Identifier identifier :
HiveMigrateUtils.listTables(hiveCatalog)) {
- result.add(new Tuple2<>(identifier, identifier));
- }
- } else if (StringUtils.isNullOrWhitespaceOnly(sourceTableName)) {
- checkArgument(
- !StringUtils.isNullOrWhitespaceOnly(targetDatabase),
- "targetDatabase must not be blank when clone all tables in
a database.");
- checkArgument(
- StringUtils.isNullOrWhitespaceOnly(targetTableName),
- "targetTableName must be blank when clone all tables in a
catalog.");
-
- for (Identifier identifier :
HiveMigrateUtils.listTables(hiveCatalog, sourceDatabase)) {
- result.add(
- new Tuple2<>(
- identifier,
- Identifier.create(targetDatabase,
identifier.getObjectName())));
- }
- } else {
- checkArgument(
- !StringUtils.isNullOrWhitespaceOnly(targetDatabase),
- "targetDatabase must not be blank when clone a table.");
- checkArgument(
- !StringUtils.isNullOrWhitespaceOnly(targetTableName),
- "targetTableName must not be blank when clone a table.");
- result.add(
- new Tuple2<>(
- Identifier.create(sourceDatabase, sourceTableName),
- Identifier.create(targetDatabase,
targetTableName)));
- }
-
- checkState(!result.isEmpty(), "Didn't find any table in source
catalog.");
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("The clone identifiers of source table and target table
are: {}", result);
- }
- return env.fromCollection(result).forceNonParallel();
- }
-
- public static ProcessFunction<Tuple2<Identifier, Identifier>,
List<CloneFilesInfo>>
- createTargetTableAndListFilesFunction(
- Map<String, String> sourceCatalogConfig,
- Map<String, String> targetCatalogConfig,
- @Nullable String whereSql) {
- return new ProcessFunction<Tuple2<Identifier, Identifier>,
List<CloneFilesInfo>>() {
- @Override
- public void processElement(
- Tuple2<Identifier, Identifier> tuple,
- ProcessFunction<Tuple2<Identifier, Identifier>,
List<CloneFilesInfo>>.Context
- context,
- Collector<List<CloneFilesInfo>> collector)
- throws Exception {
- String sourceType =
sourceCatalogConfig.get(CatalogOptions.METASTORE.key());
- checkNotNull(sourceType);
- try (Catalog sourceCatalog =
- FlinkCatalogFactory.createPaimonCatalog(
- Options.fromMap(sourceCatalogConfig));
- Catalog targetCatalog =
- FlinkCatalogFactory.createPaimonCatalog(
- Options.fromMap(targetCatalogConfig)))
{
-
- HiveCatalog hiveCatalog =
checkAndGetHiveCatalog(sourceCatalog);
-
- Schema schema =
HiveMigrateUtils.hiveTableToPaimonSchema(hiveCatalog, tuple.f0);
- Map<String, String> options = schema.options();
- // only support Hive to unaware-bucket table now
- options.put(CoreOptions.BUCKET.key(), "-1");
- schema =
- new Schema(
- schema.fields(),
- schema.partitionKeys(),
- schema.primaryKeys(),
- options,
- schema.comment());
- targetCatalog.createTable(tuple.f1, schema, false);
- FileStoreTable table = (FileStoreTable)
targetCatalog.getTable(tuple.f1);
- PartitionPredicate predicate =
- getPartitionPredicate(
- whereSql,
table.schema().logicalPartitionType(), tuple.f0);
-
- List<HivePartitionFiles> allPartitions =
- HiveMigrateUtils.listFiles(
- hiveCatalog,
- tuple.f0,
- table.schema().logicalPartitionType(),
- table.coreOptions().partitionDefaultName(),
- predicate);
- List<CloneFilesInfo> cloneFilesInfos = new ArrayList<>();
- for (HivePartitionFiles partitionFiles : allPartitions) {
- cloneFilesInfos.add(CloneFilesInfo.fromHive(tuple.f1,
partitionFiles, 0));
- }
- collector.collect(cloneFilesInfos);
- }
- }
- };
- }
-
- public static ProcessFunction<List<CloneFilesInfo>, Void>
copyAndCommitFunction(
- Map<String, String> sourceCatalogConfig, Map<String, String>
targetCatalogConfig) {
- return new ProcessFunction<List<CloneFilesInfo>, Void>() {
-
- @Override
- public void processElement(
- List<CloneFilesInfo> cloneFilesInfos,
- ProcessFunction<List<CloneFilesInfo>, Void>.Context
context,
- Collector<Void> collector)
- throws Exception {
- try (Catalog targetCatalog =
- FlinkCatalogFactory.createPaimonCatalog(
- Options.fromMap(targetCatalogConfig))) {
-
- // source FileIO
- CatalogContext sourceContext =
-
CatalogContext.create(Options.fromMap(sourceCatalogConfig));
- String warehouse =
checkNotNull(sourceContext.options().get(WAREHOUSE));
- FileIO sourceFileIO = FileIO.get(new Path(warehouse),
sourceContext);
-
- // group by table
- Map<Identifier, List<CloneFilesInfo>> groupedFiles = new
HashMap<>();
- for (CloneFilesInfo files : cloneFilesInfos) {
- groupedFiles
- .computeIfAbsent(files.identifier(), k -> new
ArrayList<>())
- .add(files);
- }
-
- for (Map.Entry<Identifier, List<CloneFilesInfo>> entry :
- groupedFiles.entrySet()) {
- FileStoreTable targetTable =
- (FileStoreTable)
targetCatalog.getTable(entry.getKey());
- commit(entry.getValue(), sourceFileIO, targetTable);
- }
- }
- }
- };
- }
-
- private static void commit(
- List<CloneFilesInfo> cloneFilesInfos, FileIO sourceFileIO,
FileStoreTable targetTable)
- throws Exception {
- List<CommitMessage> commitMessages = new ArrayList<>();
- for (CloneFilesInfo onePartitionFiles : cloneFilesInfos) {
- commitMessages.add(
- copyFileAndGetCommitMessage(onePartitionFiles,
sourceFileIO, targetTable));
- }
- try (BatchTableCommit commit =
targetTable.newBatchWriteBuilder().newCommit()) {
- commit.commit(commitMessages);
- }
- }
-
- private static CommitMessage copyFileAndGetCommitMessage(
- CloneFilesInfo cloneFilesInfo, FileIO sourceFileIO, FileStoreTable
targetTable)
- throws IOException {
- // util for collecting stats
- CoreOptions options = targetTable.coreOptions();
- SimpleColStatsCollector.Factory[] factories =
- StatsCollectorFactories.createStatsFactories(
- options.statsMode(), options,
targetTable.rowType().getFieldNames());
-
- SimpleStatsExtractor simpleStatsExtractor =
- FileFormat.fromIdentifier(cloneFilesInfo.format(),
options.toConfiguration())
- .createStatsExtractor(targetTable.rowType(), factories)
- .orElseThrow(
- () ->
- new RuntimeException(
- "Can't get table stats
extractor for format "
- +
cloneFilesInfo.format()));
- RowType rowTypeWithSchemaId =
-
targetTable.schemaManager().schema(targetTable.schema().id()).logicalRowType();
-
- SimpleStatsConverter statsArraySerializer = new
SimpleStatsConverter(rowTypeWithSchemaId);
-
- List<Path> paths = cloneFilesInfo.paths();
- List<Long> fileSizes = cloneFilesInfo.fileSizes();
- List<DataFileMeta> dataFileMetas = new ArrayList<>();
- for (int i = 0; i < paths.size(); i++) {
- Path path = paths.get(i);
- long fileSize = fileSizes.get(i);
-
- // extract stats
- Pair<SimpleColStats[], SimpleStatsExtractor.FileInfo> fileInfo =
- simpleStatsExtractor.extractWithFileInfo(sourceFileIO,
path, fileSize);
- SimpleStats stats =
statsArraySerializer.toBinaryAllMode(fileInfo.getLeft());
-
- // new file name
- String suffix = "." + cloneFilesInfo.format();
- String fileName = path.getName();
- String newFileName = fileName.endsWith(suffix) ? fileName :
fileName + suffix;
-
- // copy files
- Path targetFilePath =
- targetTable
- .store()
- .pathFactory()
- .bucketPath(cloneFilesInfo.partition(),
cloneFilesInfo.bucket());
- IOUtils.copyBytes(
- sourceFileIO.newInputStream(path),
- targetTable
- .fileIO()
- .newOutputStream(new Path(targetFilePath,
newFileName), false));
-
- // to DataFileMeta
- DataFileMeta dataFileMeta =
- DataFileMeta.forAppend(
- newFileName,
- fileSize,
- fileInfo.getRight().getRowCount(),
- stats,
- 0,
- 0,
- targetTable.schema().id(),
- Collections.emptyList(),
- null,
- FileSource.APPEND,
- null,
- null);
- dataFileMetas.add(dataFileMeta);
- }
- return FileMetaUtils.commitFile(
- cloneFilesInfo.partition(),
targetTable.coreOptions().bucket(), dataFileMetas);
- }
-
- private static HiveCatalog checkAndGetHiveCatalog(Catalog catalog) {
- Catalog rootCatalog = DelegateCatalog.rootCatalog(catalog);
- checkArgument(
- rootCatalog instanceof HiveCatalog,
- "Only support HiveCatalog now but found %s.",
- rootCatalog.getClass().getName());
- return (HiveCatalog) rootCatalog;
- }
-
- @VisibleForTesting
- @Nullable
- static PartitionPredicate getPartitionPredicate(
- @Nullable String whereSql, RowType partitionType, Identifier
tableId) throws Exception {
- if (whereSql == null) {
- return null;
- }
-
- SimpleSqlPredicateConvertor simpleSqlPredicateConvertor =
- new SimpleSqlPredicateConvertor(partitionType);
- try {
- Predicate predicate =
simpleSqlPredicateConvertor.convertSqlToPredicate(whereSql);
- return PartitionPredicate.fromPredicate(partitionType, predicate);
- } catch (Exception e) {
- throw new RuntimeException(
- "Failed to parse partition filter sql '"
- + whereSql
- + "' for table "
- + tableId.getFullName(),
- e);
- }
- }
-
- // ---------------------------------- Classes
----------------------------------
-
- /** Shuffle tables. */
- public static class TableChannelComputer
- implements ChannelComputer<Tuple2<Identifier, Identifier>> {
-
- private static final long serialVersionUID = 1L;
-
- private transient int numChannels;
-
- @Override
- public void setup(int numChannels) {
- this.numChannels = numChannels;
- }
-
- @Override
- public int channel(Tuple2<Identifier, Identifier> record) {
- return Math.floorMod(
- Objects.hash(record.f1.getDatabaseName(),
record.f1.getTableName()),
- numChannels);
- }
-
- @Override
- public String toString() {
- return "shuffle by identifier hash";
- }
- }
-
- /** Files grouped by (table, partition, bucket) with necessary
information. */
- public static class CloneFilesInfo implements Serializable {
-
- private static final long serialVersionUID = 1L;
-
- private final Identifier identifier;
- private final BinaryRow partition;
- private final int bucket;
- private final List<Path> paths;
- private final List<Long> fileSizes;
- private final String format;
-
- public CloneFilesInfo(
- Identifier identifier,
- BinaryRow partition,
- int bucket,
- List<Path> paths,
- List<Long> fileSizes,
- String format) {
- this.identifier = identifier;
- this.partition = partition;
- this.bucket = bucket;
- this.paths = paths;
- this.fileSizes = fileSizes;
- this.format = format;
- }
-
- public Identifier identifier() {
- return identifier;
- }
-
- public BinaryRow partition() {
- return partition;
- }
-
- public int bucket() {
- return bucket;
- }
-
- public List<Path> paths() {
- return paths;
- }
-
- public List<Long> fileSizes() {
- return fileSizes;
- }
-
- public String format() {
- return format;
- }
-
- public static CloneFilesInfo fromHive(
- Identifier identifier, HivePartitionFiles hivePartitionFiles,
int bucket) {
- return new CloneFilesInfo(
- identifier,
- hivePartitionFiles.partition(),
- bucket,
- hivePartitionFiles.paths(),
- hivePartitionFiles.fileSizes(),
- hivePartitionFiles.format());
- }
- }
-}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/CloneFileInfo.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/CloneFileInfo.java
new file mode 100644
index 0000000000..80b228752f
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/CloneFileInfo.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.clone.hive;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.hive.migrate.HivePartitionFiles;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;
+import static org.apache.paimon.utils.SerializationUtils.serializeBinaryRow;
+
+/** Clone File (table, partition) with necessary information. */
+public class CloneFileInfo implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private final Identifier identifier;
+ private final byte[] partition;
+ private final Path path;
+ private final long fileSize;
+ private final String format;
+
+ public CloneFileInfo(
+ Identifier identifier, BinaryRow partition, Path path, long
fileSize, String format) {
+ this.identifier = identifier;
+ this.partition = serializeBinaryRow(partition);
+ this.path = path;
+ this.fileSize = fileSize;
+ this.format = format;
+ }
+
+ public Identifier identifier() {
+ return identifier;
+ }
+
+ public BinaryRow partition() {
+ return deserializeBinaryRow(partition);
+ }
+
+ public Path path() {
+ return path;
+ }
+
+ public long fileSize() {
+ return fileSize;
+ }
+
+ public String format() {
+ return format;
+ }
+
+ public static List<CloneFileInfo> fromHive(Identifier identifier,
HivePartitionFiles files) {
+ List<CloneFileInfo> result = new ArrayList<>();
+ for (int i = 0; i < files.paths().size(); i++) {
+ Path path = files.paths().get(i);
+ long fileSize = files.fileSizes().get(i);
+ result.add(
+ new CloneFileInfo(
+ identifier, files.partition(), path, fileSize,
files.format()));
+ }
+ return result;
+ }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/CloneHiveUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/CloneHiveUtils.java
new file mode 100644
index 0000000000..1afed4a9ce
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/CloneHiveUtils.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.clone.hive;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.DelegateCatalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.action.CloneHiveAction;
+import org.apache.paimon.hive.HiveCatalog;
+import org.apache.paimon.hive.migrate.HiveCloneUtils;
+import org.apache.paimon.table.sink.ChannelComputer;
+import org.apache.paimon.utils.StringUtils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+import static org.apache.paimon.utils.Preconditions.checkState;
+
+/** Utils for building {@link CloneHiveAction}. */
+public class CloneHiveUtils {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(CloneHiveUtils.class);
+
+ public static DataStream<Tuple2<Identifier, Identifier>> buildSource(
+ String sourceDatabase,
+ String sourceTableName,
+ String targetDatabase,
+ String targetTableName,
+ Catalog sourceCatalog,
+ StreamExecutionEnvironment env)
+ throws Exception {
+ List<Tuple2<Identifier, Identifier>> result = new ArrayList<>();
+ HiveCatalog hiveCatalog = getRootHiveCatalog(sourceCatalog);
+ if (StringUtils.isNullOrWhitespaceOnly(sourceDatabase)) {
+ checkArgument(
+ StringUtils.isNullOrWhitespaceOnly(sourceTableName),
+ "sourceTableName must be blank when database is null.");
+ checkArgument(
+ StringUtils.isNullOrWhitespaceOnly(targetDatabase),
+ "targetDatabase must be blank when clone all tables in a
catalog.");
+ checkArgument(
+ StringUtils.isNullOrWhitespaceOnly(targetTableName),
+ "targetTableName must be blank when clone all tables in a
catalog.");
+
+ for (Identifier identifier :
HiveCloneUtils.listTables(hiveCatalog)) {
+ result.add(new Tuple2<>(identifier, identifier));
+ }
+ } else if (StringUtils.isNullOrWhitespaceOnly(sourceTableName)) {
+ checkArgument(
+ !StringUtils.isNullOrWhitespaceOnly(targetDatabase),
+ "targetDatabase must not be blank when clone all tables in
a database.");
+ checkArgument(
+ StringUtils.isNullOrWhitespaceOnly(targetTableName),
+ "targetTableName must be blank when clone all tables in a
catalog.");
+
+ for (Identifier identifier :
HiveCloneUtils.listTables(hiveCatalog, sourceDatabase)) {
+ result.add(
+ new Tuple2<>(
+ identifier,
+ Identifier.create(targetDatabase,
identifier.getObjectName())));
+ }
+ } else {
+ checkArgument(
+ !StringUtils.isNullOrWhitespaceOnly(targetDatabase),
+ "targetDatabase must not be blank when clone a table.");
+ checkArgument(
+ !StringUtils.isNullOrWhitespaceOnly(targetTableName),
+ "targetTableName must not be blank when clone a table.");
+ result.add(
+ new Tuple2<>(
+ Identifier.create(sourceDatabase, sourceTableName),
+ Identifier.create(targetDatabase,
targetTableName)));
+ }
+
+ checkState(!result.isEmpty(), "Didn't find any table in source
catalog.");
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("The clone identifiers of source table and target table
are: {}", result);
+ }
+ return env.fromCollection(result).forceNonParallel();
+ }
+
+ public static HiveCatalog getRootHiveCatalog(Catalog catalog) {
+ Catalog rootCatalog = DelegateCatalog.rootCatalog(catalog);
+ checkArgument(
+ rootCatalog instanceof HiveCatalog,
+ "Only support HiveCatalog now but found %s.",
+ rootCatalog.getClass().getName());
+ return (HiveCatalog) rootCatalog;
+ }
+
+ // ---------------------------------- Classes
----------------------------------
+
+ /** Shuffle tables. */
+ public static class TableChannelComputer
+ implements ChannelComputer<Tuple2<Identifier, Identifier>> {
+
+ private static final long serialVersionUID = 1L;
+
+ private transient int numChannels;
+
+ @Override
+ public void setup(int numChannels) {
+ this.numChannels = numChannels;
+ }
+
+ @Override
+ public int channel(Tuple2<Identifier, Identifier> record) {
+ return Math.floorMod(
+ Objects.hash(record.f1.getDatabaseName(),
record.f1.getTableName()),
+ numChannels);
+ }
+
+ @Override
+ public String toString() {
+ return "shuffle by identifier hash";
+ }
+ }
+
+ /** Shuffle tables. */
+ public static class DataFileChannelComputer implements
ChannelComputer<DataFileInfo> {
+
+ private static final long serialVersionUID = 1L;
+
+ private transient int numChannels;
+
+ @Override
+ public void setup(int numChannels) {
+ this.numChannels = numChannels;
+ }
+
+ @Override
+ public int channel(DataFileInfo record) {
+ return Math.floorMod(Objects.hash(record.identifier()),
numChannels);
+ }
+
+ @Override
+ public String toString() {
+ return "shuffle by identifier hash";
+ }
+ }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/CommitTableOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/CommitTableOperator.java
new file mode 100644
index 0000000000..a51ba6c097
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/CommitTableOperator.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.clone.hive;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataFileMetaSerializer;
+import org.apache.paimon.migrate.FileMetaUtils;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.CommitMessage;
+
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.paimon.flink.FlinkCatalogFactory.createPaimonCatalog;
+
+/** Commit table operator. */
+public class CommitTableOperator extends AbstractStreamOperator<Long>
+ implements OneInputStreamOperator<DataFileInfo, Long>, BoundedOneInput
{
+
+ private static final long serialVersionUID = 1L;
+
+ private final Map<String, String> catalogConfig;
+
+ private transient DataFileMetaSerializer dataFileSerializer;
+ private transient Map<Identifier, Map<BinaryRow, List<DataFileMeta>>>
files;
+
+ public CommitTableOperator(Map<String, String> catalogConfig) {
+ this.catalogConfig = catalogConfig;
+ }
+
+ @Override
+ public void open() throws Exception {
+ super.open();
+ this.dataFileSerializer = new DataFileMetaSerializer();
+ this.files = new HashMap<>();
+ }
+
+ @Override
+ public void processElement(StreamRecord<DataFileInfo> streamRecord) throws
Exception {
+ DataFileInfo file = streamRecord.getValue();
+ BinaryRow partition = file.partition();
+ List<DataFileMeta> files =
+ this.files
+ .computeIfAbsent(file.identifier(), k -> new
HashMap<>())
+ .computeIfAbsent(partition, k -> new ArrayList<>());
+
files.add(dataFileSerializer.deserializeFromBytes(file.dataFileMeta()));
+ }
+
+ @Override
+ public void endInput() throws Exception {
+ try (Catalog catalog =
createPaimonCatalog(Options.fromMap(catalogConfig))) {
+ for (Map.Entry<Identifier, Map<BinaryRow, List<DataFileMeta>>>
entry :
+ files.entrySet()) {
+ List<CommitMessage> commitMessages = new ArrayList<>();
+ for (Map.Entry<BinaryRow, List<DataFileMeta>> listEntry :
+ entry.getValue().entrySet()) {
+ commitMessages.add(
+ FileMetaUtils.commitFile(listEntry.getKey(), 0,
listEntry.getValue()));
+ }
+
+ Table table = catalog.getTable(entry.getKey());
+ try (BatchTableCommit commit =
table.newBatchWriteBuilder().newCommit()) {
+ commit.commit(commitMessages);
+ }
+ }
+ }
+ }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/CopyHiveFilesFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/CopyHiveFilesFunction.java
new file mode 100644
index 0000000000..20bca892dd
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/CopyHiveFilesFunction.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.clone.hive;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.format.FileFormat;
+import org.apache.paimon.format.SimpleColStats;
+import org.apache.paimon.format.SimpleStatsExtractor;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.FileSource;
+import org.apache.paimon.statistics.SimpleColStatsCollector;
+import org.apache.paimon.stats.SimpleStats;
+import org.apache.paimon.stats.SimpleStatsConverter;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.IOUtils;
+import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.StatsCollectorFactories;
+
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.util.Collector;
+
+import java.util.Collections;
+import java.util.Map;
+
+/** Copy files for table. */
+public class CopyHiveFilesFunction extends CopyProcessFunction<CloneFileInfo,
DataFileInfo> {
+
+ private static final long serialVersionUID = 1L;
+
+ public CopyHiveFilesFunction(
+ Map<String, String> sourceCatalogConfig, Map<String, String>
targetCatalogConfig) {
+ super(sourceCatalogConfig, targetCatalogConfig);
+ }
+
+ @Override
+ public void processElement(
+ CloneFileInfo cloneFileInfo,
+ ProcessFunction<CloneFileInfo, DataFileInfo>.Context context,
+ Collector<DataFileInfo> collector)
+ throws Exception {
+ Identifier identifier = cloneFileInfo.identifier();
+ long fileSize = cloneFileInfo.fileSize();
+ String format = cloneFileInfo.format();
+ Path path = cloneFileInfo.path();
+ BinaryRow partition = cloneFileInfo.partition();
+
+ FileIO sourceFileIO = hiveCatalog.fileIO();
+ FileStoreTable targetTable = (FileStoreTable) getTable(identifier);
+ // util for collecting stats
+ CoreOptions options = targetTable.coreOptions();
+ SimpleColStatsCollector.Factory[] factories =
+ StatsCollectorFactories.createStatsFactories(
+ options.statsMode(), options,
targetTable.rowType().getFieldNames());
+
+ SimpleStatsExtractor simpleStatsExtractor =
+ FileFormat.fromIdentifier(format, options.toConfiguration())
+ .createStatsExtractor(targetTable.rowType(), factories)
+ .orElseThrow(
+ () ->
+ new RuntimeException(
+ "Can't get table stats
extractor for format "
+ + format));
+ RowType rowTypeWithSchemaId =
+
targetTable.schemaManager().schema(targetTable.schema().id()).logicalRowType();
+
+ SimpleStatsConverter statsArraySerializer = new
SimpleStatsConverter(rowTypeWithSchemaId);
+
+ // extract stats
+ Pair<SimpleColStats[], SimpleStatsExtractor.FileInfo> fileInfo =
+ simpleStatsExtractor.extractWithFileInfo(sourceFileIO, path,
fileSize);
+ SimpleStats stats =
statsArraySerializer.toBinaryAllMode(fileInfo.getLeft());
+
+ // new file name
+ String suffix = "." + format;
+ String fileName = path.getName();
+ String newFileName = fileName.endsWith(suffix) ? fileName : fileName +
suffix;
+
+ // copy files
+ Path targetFilePath =
targetTable.store().pathFactory().bucketPath(partition, 0);
+ IOUtils.copyBytes(
+ sourceFileIO.newInputStream(path),
+ targetTable.fileIO().newOutputStream(new Path(targetFilePath,
newFileName), false));
+
+ // to DataFileMeta
+ DataFileMeta dataFileMeta =
+ DataFileMeta.forAppend(
+ newFileName,
+ fileSize,
+ fileInfo.getRight().getRowCount(),
+ stats,
+ 0,
+ 0,
+ targetTable.schema().id(),
+ Collections.emptyList(),
+ null,
+ FileSource.APPEND,
+ null,
+ null);
+
+ collector.collect(
+ new DataFileInfo(
+ identifier, partition,
dataFileSerializer.serializeToBytes(dataFileMeta)));
+ }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/CopyProcessFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/CopyProcessFunction.java
new file mode 100644
index 0000000000..d083a6dc3d
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/CopyProcessFunction.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.clone.hive;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.hive.HiveCatalog;
+import org.apache.paimon.io.DataFileMetaSerializer;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.Table;
+
+import org.apache.flink.api.common.functions.OpenContext;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.paimon.flink.FlinkCatalogFactory.createPaimonCatalog;
+import static
org.apache.paimon.flink.clone.hive.CloneHiveUtils.getRootHiveCatalog;
+
+/** Abstract function for copying tables. */
+public abstract class CopyProcessFunction<I, O> extends ProcessFunction<I, O> {
+
+ protected final Map<String, String> sourceCatalogConfig;
+ protected final Map<String, String> targetCatalogConfig;
+
+ protected transient HiveCatalog hiveCatalog;
+ protected transient Catalog targetCatalog;
+
+ protected transient Map<Identifier, Table> tableCache;
+ protected transient DataFileMetaSerializer dataFileSerializer;
+
+ public CopyProcessFunction(
+ Map<String, String> sourceCatalogConfig, Map<String, String>
targetCatalogConfig) {
+ this.sourceCatalogConfig = sourceCatalogConfig;
+ this.targetCatalogConfig = targetCatalogConfig;
+ }
+
+ @Override
+ public void open(OpenContext openContext) throws Exception {
+ super.open(openContext);
+ this.hiveCatalog =
+
getRootHiveCatalog(createPaimonCatalog(Options.fromMap(sourceCatalogConfig)));
+ this.targetCatalog =
createPaimonCatalog(Options.fromMap(targetCatalogConfig));
+ this.dataFileSerializer = new DataFileMetaSerializer();
+ this.tableCache = new HashMap<>();
+ }
+
+ protected Table getTable(Identifier identifier) {
+ return tableCache.computeIfAbsent(
+ identifier,
+ k -> {
+ try {
+ return targetCatalog.getTable(k);
+ } catch (Catalog.TableNotExistException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+ this.hiveCatalog.close();
+ this.targetCatalog.close();
+ }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/DataFileInfo.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/DataFileInfo.java
new file mode 100644
index 0000000000..2baa11af61
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/DataFileInfo.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.clone.hive;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryRow;
+
+import java.io.Serializable;
+
+import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;
+import static org.apache.paimon.utils.SerializationUtils.serializeBinaryRow;
+
+/** Data File (table, partition) with necessary information. */
+public class DataFileInfo implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private final Identifier identifier;
+ private final byte[] partition;
+ private final byte[] dataFileMeta;
+
+ public DataFileInfo(Identifier identifier, BinaryRow partition, byte[]
dataFileMeta) {
+ this.identifier = identifier;
+ this.partition = serializeBinaryRow(partition);
+ this.dataFileMeta = dataFileMeta;
+ }
+
+ public Identifier identifier() {
+ return identifier;
+ }
+
+ public BinaryRow partition() {
+ return deserializeBinaryRow(partition);
+ }
+
+ public byte[] dataFileMeta() {
+ return dataFileMeta;
+ }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/ListHiveFilesFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/ListHiveFilesFunction.java
new file mode 100644
index 0000000000..6e95257929
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/ListHiveFilesFunction.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.clone.hive;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.predicate.SimpleSqlPredicateConvertor;
+import org.apache.paimon.hive.migrate.HiveCloneUtils;
+import org.apache.paimon.hive.migrate.HivePartitionFiles;
+import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.types.RowType;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.util.Collector;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.paimon.utils.Preconditions.checkNotNull;
+
+/** List files for table. */
+public class ListHiveFilesFunction
+ extends CopyProcessFunction<Tuple2<Identifier, Identifier>,
CloneFileInfo> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Nullable private final String whereSql;
+
+ public ListHiveFilesFunction(
+ Map<String, String> sourceCatalogConfig,
+ Map<String, String> targetCatalogConfig,
+ @Nullable String whereSql) {
+ super(sourceCatalogConfig, targetCatalogConfig);
+ this.whereSql = whereSql;
+ }
+
+ @Override
+ public void processElement(
+ Tuple2<Identifier, Identifier> tuple,
+ ProcessFunction<Tuple2<Identifier, Identifier>,
CloneFileInfo>.Context context,
+ Collector<CloneFileInfo> collector)
+ throws Exception {
+ String sourceType =
sourceCatalogConfig.get(CatalogOptions.METASTORE.key());
+ checkNotNull(sourceType);
+
+ Schema schema = HiveCloneUtils.hiveTableToPaimonSchema(hiveCatalog,
tuple.f0);
+ Map<String, String> options = schema.options();
+ // only support Hive to unaware-bucket table now
+ options.put(CoreOptions.BUCKET.key(), "-1");
+ schema =
+ new Schema(
+ schema.fields(),
+ schema.partitionKeys(),
+ schema.primaryKeys(),
+ options,
+ schema.comment());
+ targetCatalog.createTable(tuple.f1, schema, false);
+ FileStoreTable table = (FileStoreTable)
targetCatalog.getTable(tuple.f1);
+ PartitionPredicate predicate =
+ getPartitionPredicate(whereSql,
table.schema().logicalPartitionType(), tuple.f0);
+
+ List<HivePartitionFiles> allPartitions =
+ HiveCloneUtils.listFiles(
+ hiveCatalog,
+ tuple.f0,
+ table.schema().logicalPartitionType(),
+ table.coreOptions().partitionDefaultName(),
+ predicate);
+ for (HivePartitionFiles partitionFiles : allPartitions) {
+ CloneFileInfo.fromHive(tuple.f1,
partitionFiles).forEach(collector::collect);
+ }
+ }
+
+ @VisibleForTesting
+ @Nullable
+ public static PartitionPredicate getPartitionPredicate(
+ @Nullable String whereSql, RowType partitionType, Identifier
tableId) throws Exception {
+ if (whereSql == null) {
+ return null;
+ }
+
+ SimpleSqlPredicateConvertor simpleSqlPredicateConvertor =
+ new SimpleSqlPredicateConvertor(partitionType);
+ try {
+ Predicate predicate =
simpleSqlPredicateConvertor.convertSqlToPredicate(whereSql);
+ return PartitionPredicate.fromPredicate(partitionType, predicate);
+ } catch (Exception e) {
+ throw new RuntimeException(
+ "Failed to parse partition filter sql '"
+ + whereSql
+ + "' for table "
+ + tableId.getFullName(),
+ e);
+ }
+ }
+}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/clone/CloneHiveUtilsTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/clone/CloneHiveUtilsTest.java
index fa43522132..29dc714b72 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/clone/CloneHiveUtilsTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/clone/CloneHiveUtilsTest.java
@@ -22,12 +22,14 @@ import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryRowWriter;
import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.flink.clone.hive.CloneHiveUtils;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.junit.jupiter.api.Test;
+import static org.apache.paimon.flink.clone.hive.ListHiveFilesFunction.getPartitionPredicate;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -43,25 +45,22 @@ public class CloneHiveUtilsTest {
.field("b", DataTypes.STRING())
.build();
- PartitionPredicate p = CloneHiveUtils.getPartitionPredicate("a=1", partitionType, tableId);
+ PartitionPredicate p = getPartitionPredicate("a=1", partitionType, tableId);
assertThat(p.test(twoColumnsPartition(1, "2"))).isTrue();
assertThat(p.test(twoColumnsPartition(2, "1"))).isFalse();
- p = CloneHiveUtils.getPartitionPredicate("a=1 OR b='2'", partitionType, tableId);
+ p = getPartitionPredicate("a=1 OR b='2'", partitionType, tableId);
assertThat(p.test(twoColumnsPartition(1, "1"))).isTrue();
assertThat(p.test(twoColumnsPartition(2, "2"))).isTrue();
assertThat(p.test(twoColumnsPartition(2, "1"))).isFalse();
// c not in partition fields
- assertThatThrownBy(
- () ->
- CloneHiveUtils.getPartitionPredicate(
- "a=1 OR c=1", partitionType, tableId))
+ assertThatThrownBy(() -> getPartitionPredicate("a=1 OR c=1", partitionType, tableId))
.hasMessage(
"Failed to parse partition filter sql 'a=1 OR c=1' for
table test_db.test_table");
// no partition keys
- assertThatThrownBy(() -> CloneHiveUtils.getPartitionPredicate("a=1", RowType.of(), tableId))
+ assertThatThrownBy(() -> getPartitionPredicate("a=1", RowType.of(), tableId))
.hasMessage(
"Failed to parse partition filter sql 'a=1' for table
test_db.test_table");
}
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrateUtils.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveCloneUtils.java
similarity index 98%
rename from paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrateUtils.java
rename to paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveCloneUtils.java
index 260850718d..8532cbecf6 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrateUtils.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveCloneUtils.java
@@ -52,10 +52,10 @@ import java.util.stream.Collectors;
import static org.apache.paimon.hive.HiveTypeUtils.toPaimonType;
-/** Utils for migrating Hive table to Paimon table. */
-public class HiveMigrateUtils {
+/** Utils for cloning Hive table to Paimon table. */
+public class HiveCloneUtils {
- private static final Logger LOG = LoggerFactory.getLogger(HiveMigrateUtils.class);
+ private static final Logger LOG = LoggerFactory.getLogger(HiveCloneUtils.class);
private static final Predicate<FileStatus> HIDDEN_PATH_FILTER =
p -> !p.getPath().getName().startsWith("_") && !p.getPath().getName().startsWith(".");
diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/CloneHiveActionITCase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/CloneHiveActionITCase.java
index c960449ba9..5296c3b0fb 100644
--- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/CloneHiveActionITCase.java
+++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/CloneHiveActionITCase.java
@@ -31,7 +31,6 @@ import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.types.Row;
-import org.apache.hadoop.hive.conf.HiveConf;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -97,9 +96,6 @@ public class CloneHiveActionITCase extends ActionITCaseBase {
"metastore=hive",
"--catalog_conf",
"uri=thrift://localhost:" + PORT,
- "--catalog_conf",
- "warehouse="
- + System.getProperty(HiveConf.ConfVars.METASTOREWAREHOUSE.varname),
"--target_database",
"test",
"--target_table",
@@ -160,10 +156,6 @@ public class CloneHiveActionITCase extends ActionITCaseBase {
"metastore=hive",
"--catalog_conf",
"uri=thrift://localhost:" + PORT,
- "--catalog_conf",
- "warehouse="
- + System.getProperty(
- HiveConf.ConfVars.METASTOREWAREHOUSE.varname),
"--target_database",
"test",
"--target_table",
@@ -218,9 +210,6 @@ public class CloneHiveActionITCase extends ActionITCaseBase {
"metastore=hive",
"--catalog_conf",
"uri=thrift://localhost:" + PORT,
- "--catalog_conf",
- "warehouse="
- + System.getProperty(HiveConf.ConfVars.METASTOREWAREHOUSE.varname),
"--target_database",
"test",
"--target_table",
@@ -278,9 +267,6 @@ public class CloneHiveActionITCase extends ActionITCaseBase {
"metastore=hive",
"--catalog_conf",
"uri=thrift://localhost:" + PORT,
- "--catalog_conf",
- "warehouse="
- + System.getProperty(HiveConf.ConfVars.METASTOREWAREHOUSE.varname),
"--target_database",
"test",
"--target_catalog_conf",
@@ -339,9 +325,6 @@ public class CloneHiveActionITCase extends ActionITCaseBase {
"metastore=hive",
"--catalog_conf",
"uri=thrift://localhost:" + PORT,
- "--catalog_conf",
- "warehouse="
- + System.getProperty(HiveConf.ConfVars.METASTOREWAREHOUSE.varname),
"--target_database",
"test",
"--target_catalog_conf",