This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new c937b53d3b [clone] refactor clone from hive to support csv and json table clone (#6200)
c937b53d3b is described below

commit c937b53d3bdf961993ab53ebe2108d55d8148a4d
Author: shyjsarah <[email protected]>
AuthorDate: Thu Sep 4 21:54:04 2025 +0800

    [clone] refactor clone from hive to support csv and json table clone (#6200)
---
 .../paimon/flink/clone/CloneHiveTableUtils.java    |  77 ++++++++++++-
 .../paimon/flink/clone/ClonePaimonTableUtils.java  |  20 +++-
 .../flink/clone/files/ListCloneFilesFunction.java  |  96 +---------------
 .../CloneHiveSchemaFunction.java}                  | 110 ++++++++----------
 .../ClonePaimonSchemaFunction.java}                |  47 ++++----
 .../paimon/flink/clone/schema/CloneSchemaInfo.java |  46 ++++++++
 .../flink/clone/spits/ListCloneSplitsFunction.java |  48 +-------
 .../paimon/hive/clone/HiveCloneExtractor.java      |   2 +
 .../apache/paimon/hive/clone/HiveCloneUtils.java   |  27 ++++-
 .../paimon/hive/clone/HiveTableCloneExtractor.java |  32 ++++++
 .../paimon/hive/procedure/CloneActionITCase.java   | 124 ++++++++++++++++++++-
 .../apache/paimon/hudi/HudiHiveCloneExtractor.java |   5 +
 .../paimon/iceberg/IcebergHiveCloneExtractor.java  |   5 +
 13 files changed, 402 insertions(+), 237 deletions(-)
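In short, this refactor puts a shared schema-cloning stage in front of both clone pipelines and then routes each table either to the split-based copy path (now also used for Hive csv/json text tables) or to the original file-copy path. As a rough orientation before the diff, the Java fragment below sketches the new topology in CloneHiveTableUtils; it reuses the class names added in this commit, elides the copy/commit/sink stages, and is a sketch rather than a complete, compilable class:

```java
// Shared stage: create target tables and decide, per table, whether the
// split-based path can be used (CloneSchemaInfo.supportCloneSplits()).
DataStream<CloneSchemaInfo> schemaInfos =
        partitionedSource
                .process(new CloneHiveSchemaFunction(sourceCatalogConfig, targetCatalogConfig))
                .name("Clone Schema")
                .setParallelism(parallelism);

// Branch 1: formats readable as splits (including the new csv/json support)
// are listed and copied split by split.
schemaInfos
        .filter(CloneSchemaInfo::supportCloneSplits)
        .rebalance()
        .process(new ListCloneSplitsFunction(sourceCatalogConfig, targetCatalogConfig, whereSql))
        .name("List Splits");

// Branch 2: everything else keeps the original file-copy path.
schemaInfos
        .filter(info -> !info.supportCloneSplits())
        .rebalance()
        .process(new ListCloneFilesFunction(sourceCatalogConfig, targetCatalogConfig, whereSql))
        .name("List Files");
```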

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneHiveTableUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneHiveTableUtils.java
index 3ea672184a..fae777f579 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneHiveTableUtils.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneHiveTableUtils.java
@@ -28,6 +28,14 @@ import org.apache.paimon.flink.clone.files.CloneFilesFunction;
 import org.apache.paimon.flink.clone.files.DataFileInfo;
 import org.apache.paimon.flink.clone.files.ListCloneFilesFunction;
 import org.apache.paimon.flink.clone.files.ShuffleDataFileByTableComputer;
+import org.apache.paimon.flink.clone.schema.CloneHiveSchemaFunction;
+import org.apache.paimon.flink.clone.schema.CloneSchemaInfo;
+import org.apache.paimon.flink.clone.spits.CloneSplitInfo;
+import org.apache.paimon.flink.clone.spits.CloneSplitsFunction;
+import org.apache.paimon.flink.clone.spits.CommitMessageInfo;
+import org.apache.paimon.flink.clone.spits.CommitMessageTableOperator;
+import org.apache.paimon.flink.clone.spits.ListCloneSplitsFunction;
+import org.apache.paimon.flink.clone.spits.ShuffleCommitMessageByTableComputer;
 import org.apache.paimon.flink.sink.FlinkStreamPartitioner;
 import org.apache.paimon.hive.HiveCatalog;
 import org.apache.paimon.utils.StringUtils;
@@ -165,9 +173,72 @@ public class CloneHiveTableUtils {
                 FlinkStreamPartitioner.partition(
                         source, new ShuffleIdentifierByTableComputer(), 
parallelism);
 
-        // create target table, list files and group by <table, partition>
-        DataStream<CloneFileInfo> files =
+        // create target table, check support clone splits
+        DataStream<CloneSchemaInfo> schemaInfos =
                 partitionedSource
+                        .process(
+                                new CloneHiveSchemaFunction(
+                                        sourceCatalogConfig, 
targetCatalogConfig))
+                        .name("Clone Schema")
+                        .setParallelism(parallelism);
+
+        buildForCloneSplits(
+                sourceCatalogConfig, targetCatalogConfig, parallelism, 
whereSql, schemaInfos);
+
+        buildForCloneFile(
+                sourceCatalogConfig, targetCatalogConfig, parallelism, 
whereSql, schemaInfos);
+    }
+
+    public static void buildForCloneSplits(
+            Map<String, String> sourceCatalogConfig,
+            Map<String, String> targetCatalogConfig,
+            int parallelism,
+            @Nullable String whereSql,
+            DataStream<CloneSchemaInfo> schemaInfos) {
+
+        // list splits
+        DataStream<CloneSplitInfo> splits =
+                schemaInfos
+                        .filter(cloneSchemaInfo -> 
cloneSchemaInfo.supportCloneSplits())
+                        .rebalance()
+                        .process(
+                                new ListCloneSplitsFunction(
+                                        sourceCatalogConfig, 
targetCatalogConfig, whereSql))
+                        .name("List Splits")
+                        .setParallelism(parallelism);
+
+        // copy splits and commit
+        DataStream<CommitMessageInfo> commitMessage =
+                splits.rebalance()
+                        .process(new CloneSplitsFunction(sourceCatalogConfig, 
targetCatalogConfig))
+                        .name("Copy Splits")
+                        .setParallelism(parallelism);
+
+        DataStream<CommitMessageInfo> partitionedCommitMessage =
+                FlinkStreamPartitioner.partition(
+                        commitMessage, new 
ShuffleCommitMessageByTableComputer(), parallelism);
+
+        DataStream<Long> committed =
+                partitionedCommitMessage
+                        .transform(
+                                "Commit Table Splits",
+                                BasicTypeInfo.LONG_TYPE_INFO,
+                                new 
CommitMessageTableOperator(targetCatalogConfig))
+                        .setParallelism(parallelism);
+        committed.sinkTo(new DiscardingSink<>()).name("end").setParallelism(1);
+    }
+
+    public static void buildForCloneFile(
+            Map<String, String> sourceCatalogConfig,
+            Map<String, String> targetCatalogConfig,
+            int parallelism,
+            @Nullable String whereSql,
+            DataStream<CloneSchemaInfo> schemaInfos) {
+        // list files and group by <table, partition>
+        DataStream<CloneFileInfo> files =
+                schemaInfos
+                        .filter(cloneSchemaInfo -> 
!cloneSchemaInfo.supportCloneSplits())
+                        .rebalance()
                         .process(
                                 new ListCloneFilesFunction(
                                         sourceCatalogConfig, 
targetCatalogConfig, whereSql))
@@ -188,7 +259,7 @@ public class CloneHiveTableUtils {
         DataStream<Long> committed =
                 partitionedDataFile
                         .transform(
-                                "Commit table",
+                                "Commit Table Files",
                                 BasicTypeInfo.LONG_TYPE_INFO,
                                 new 
CloneFilesCommitOperator(targetCatalogConfig))
                         .setParallelism(parallelism);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/ClonePaimonTableUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/ClonePaimonTableUtils.java
index 7c63df3271..5a66da2c66 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/ClonePaimonTableUtils.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/ClonePaimonTableUtils.java
@@ -21,6 +21,8 @@ package org.apache.paimon.flink.clone;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.action.CloneAction;
+import org.apache.paimon.flink.clone.schema.ClonePaimonSchemaFunction;
+import org.apache.paimon.flink.clone.schema.CloneSchemaInfo;
 import org.apache.paimon.flink.clone.spits.CloneSplitInfo;
 import org.apache.paimon.flink.clone.spits.CloneSplitsFunction;
 import org.apache.paimon.flink.clone.spits.CommitMessageInfo;
@@ -153,19 +155,29 @@ public class ClonePaimonTableUtils {
                 FlinkStreamPartitioner.partition(
                         source, new ShuffleIdentifierByTableComputer(), 
parallelism);
 
-        DataStream<CloneSplitInfo> splits =
+        // create target table
+        DataStream<CloneSchemaInfo> schemaInfos =
                 partitionedSource
+                        .process(
+                                new ClonePaimonSchemaFunction(
+                                        sourceCatalogConfig, 
targetCatalogConfig))
+                        .name("Clone Schema")
+                        .setParallelism(parallelism);
+
+        // list splits
+        DataStream<CloneSplitInfo> splits =
+                schemaInfos
                         .process(
                                 new ListCloneSplitsFunction(
                                         sourceCatalogConfig, 
targetCatalogConfig, whereSql))
-                        .name("List Files")
+                        .name("List Splits")
                         .setParallelism(parallelism);
 
         // copy splits and commit
         DataStream<CommitMessageInfo> commitMessage =
                 splits.rebalance()
                         .process(new CloneSplitsFunction(sourceCatalogConfig, 
targetCatalogConfig))
-                        .name("Copy Files")
+                        .name("Copy Splits")
                         .setParallelism(parallelism);
 
         DataStream<CommitMessageInfo> partitionedCommitMessage =
@@ -175,7 +187,7 @@ public class ClonePaimonTableUtils {
         DataStream<Long> committed =
                 partitionedCommitMessage
                         .transform(
-                                "Commit table",
+                                "Commit Table",
                                 BasicTypeInfo.LONG_TYPE_INFO,
                                 new 
CommitMessageTableOperator(targetCatalogConfig))
                         .setParallelism(parallelism);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/files/ListCloneFilesFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/files/ListCloneFilesFunction.java
index 8957b217d1..332feb7cf1 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/files/ListCloneFilesFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/files/ListCloneFilesFunction.java
@@ -18,19 +18,14 @@
 
 package org.apache.paimon.flink.clone.files;
 
-import org.apache.paimon.CoreOptions;
-import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.clone.schema.CloneSchemaInfo;
 import org.apache.paimon.flink.predicate.SimpleSqlPredicateConvertor;
 import org.apache.paimon.hive.clone.HiveCloneUtils;
 import org.apache.paimon.hive.clone.HivePartitionFiles;
-import org.apache.paimon.options.CatalogOptions;
 import org.apache.paimon.partition.PartitionPredicate;
 import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.schema.Schema;
 import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.Table;
-import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.RowType;
 
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -39,18 +34,12 @@ import org.apache.flink.util.Collector;
 
 import javax.annotation.Nullable;
 
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-
-import static org.apache.paimon.CoreOptions.FILE_FORMAT;
-import static org.apache.paimon.utils.Preconditions.checkNotNull;
-import static org.apache.paimon.utils.Preconditions.checkState;
 
 /** List files for table. */
 public class ListCloneFilesFunction
-        extends CloneFilesProcessFunction<Tuple2<Identifier, Identifier>, 
CloneFileInfo> {
+        extends CloneFilesProcessFunction<CloneSchemaInfo, CloneFileInfo> {
 
     private static final long serialVersionUID = 1L;
 
@@ -66,45 +55,11 @@ public class ListCloneFilesFunction
 
     @Override
     public void processElement(
-            Tuple2<Identifier, Identifier> tuple,
-            ProcessFunction<Tuple2<Identifier, Identifier>, 
CloneFileInfo>.Context context,
+            CloneSchemaInfo cloneSchemaInfo,
+            ProcessFunction<CloneSchemaInfo, CloneFileInfo>.Context context,
             Collector<CloneFileInfo> collector)
             throws Exception {
-        String sourceType = 
sourceCatalogConfig.get(CatalogOptions.METASTORE.key());
-        checkNotNull(sourceType);
-
-        // create database if not exists
-        Map<String, String> databaseOptions =
-                HiveCloneUtils.getDatabaseOptions(hiveCatalog, 
tuple.f0.getDatabaseName());
-        targetCatalog.createDatabase(tuple.f1.getDatabaseName(), true, 
databaseOptions);
-
-        Schema schema = HiveCloneUtils.hiveTableToPaimonSchema(hiveCatalog, 
tuple.f0);
-        Map<String, String> options = schema.options();
-        // only support Hive to unaware-bucket table now
-        options.put(CoreOptions.BUCKET.key(), "-1");
-        schema =
-                new Schema(
-                        schema.fields(),
-                        schema.partitionKeys(),
-                        schema.primaryKeys(),
-                        options,
-                        schema.comment());
-        try {
-            Table existedTable = targetCatalog.getTable(tuple.f1);
-
-            checkState(
-                    existedTable instanceof FileStoreTable,
-                    String.format(
-                            "existed paimon table '%s' is not a 
FileStoreTable, but a %s",
-                            tuple.f1, existedTable.getClass().getName()));
-            checkCompatible(schema, (FileStoreTable) existedTable);
-
-            LOG.info("paimon table '{}' already exists, use it as target 
table.", tuple.f1);
-        } catch (Catalog.TableNotExistException e) {
-            LOG.info("create target paimon table '{}'.", tuple.f1);
-
-            targetCatalog.createTable(tuple.f1, schema, false);
-        }
+        Tuple2<Identifier, Identifier> tuple = 
cloneSchemaInfo.identifierTuple();
 
         FileStoreTable table = (FileStoreTable) 
targetCatalog.getTable(tuple.f1);
         PartitionPredicate predicate =
@@ -127,47 +82,6 @@ public class ListCloneFilesFunction
         }
     }
 
-    private void checkCompatible(Schema sourceSchema, FileStoreTable 
existedTable) {
-        Schema existedSchema = existedTable.schema().toSchema();
-
-        // check primary keys
-        checkState(
-                existedSchema.primaryKeys().isEmpty(),
-                "Can not clone data to existed paimon table which has primary 
keys. Existed paimon table is "
-                        + existedTable.name());
-
-        // check bucket
-        checkState(
-                existedTable.coreOptions().bucket() == -1,
-                "Can not clone data to existed paimon table which bucket is 
not -1. Existed paimon table is "
-                        + existedTable.name());
-
-        // check format
-        checkState(
-                Objects.equals(
-                        sourceSchema.options().get(FILE_FORMAT.key()),
-                        existedTable.coreOptions().formatType()),
-                "source table format is not compatible with existed paimon 
table format.");
-
-        // check partition keys
-        List<String> sourcePartitionFields = sourceSchema.partitionKeys();
-        List<String> existedPartitionFields = existedSchema.partitionKeys();
-
-        checkState(
-                sourcePartitionFields.size() == existedPartitionFields.size()
-                        && new 
HashSet<>(existedPartitionFields).containsAll(sourcePartitionFields),
-                "source table partition keys is not compatible with existed 
paimon table partition keys.");
-
-        // check all fields
-        List<DataField> sourceFields = sourceSchema.fields();
-        List<DataField> existedFields = existedSchema.fields();
-
-        checkState(
-                existedFields.size() >= sourceFields.size()
-                        && new 
HashSet<>(existedPartitionFields).containsAll(sourcePartitionFields),
-                "source table partition keys is not compatible with existed 
paimon table partition keys.");
-    }
-
     @Nullable
     public static PartitionPredicate getPartitionPredicate(
             @Nullable String whereSql, RowType partitionType, Identifier 
tableId) throws Exception {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/files/ListCloneFilesFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/schema/CloneHiveSchemaFunction.java
similarity index 66%
copy from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/files/ListCloneFilesFunction.java
copy to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/schema/CloneHiveSchemaFunction.java
index 8957b217d1..a1d7f374f3 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/files/ListCloneFilesFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/schema/CloneHiveSchemaFunction.java
@@ -16,28 +16,27 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.flink.clone.files;
+package org.apache.paimon.flink.clone.schema;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.flink.predicate.SimpleSqlPredicateConvertor;
+import org.apache.paimon.hive.HiveCatalog;
 import org.apache.paimon.hive.clone.HiveCloneUtils;
-import org.apache.paimon.hive.clone.HivePartitionFiles;
 import org.apache.paimon.options.CatalogOptions;
-import org.apache.paimon.partition.PartitionPredicate;
-import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.types.DataField;
-import org.apache.paimon.types.RowType;
 
+import org.apache.flink.api.common.functions.OpenContext;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.ProcessFunction;
 import org.apache.flink.util.Collector;
-
-import javax.annotation.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.HashSet;
 import java.util.List;
@@ -45,30 +44,52 @@ import java.util.Map;
 import java.util.Objects;
 
 import static org.apache.paimon.CoreOptions.FILE_FORMAT;
+import static org.apache.paimon.flink.FlinkCatalogFactory.createPaimonCatalog;
+import static 
org.apache.paimon.flink.clone.CloneHiveTableUtils.getRootHiveCatalog;
 import static org.apache.paimon.utils.Preconditions.checkNotNull;
 import static org.apache.paimon.utils.Preconditions.checkState;
 
-/** List files for table. */
-public class ListCloneFilesFunction
-        extends CloneFilesProcessFunction<Tuple2<Identifier, Identifier>, 
CloneFileInfo> {
+/** clone schema for hive table. */
+public class CloneHiveSchemaFunction
+        extends ProcessFunction<Tuple2<Identifier, Identifier>, 
CloneSchemaInfo> {
 
     private static final long serialVersionUID = 1L;
 
-    @Nullable private final String whereSql;
+    protected static final Logger LOG = 
LoggerFactory.getLogger(CloneHiveSchemaFunction.class);
+
+    protected final Map<String, String> sourceCatalogConfig;
+    protected final Map<String, String> targetCatalogConfig;
+
+    protected transient HiveCatalog hiveCatalog;
+    protected transient Catalog targetCatalog;
+
+    public CloneHiveSchemaFunction(
+            Map<String, String> sourceCatalogConfig, Map<String, String> 
targetCatalogConfig) {
+        this.sourceCatalogConfig = sourceCatalogConfig;
+        this.targetCatalogConfig = targetCatalogConfig;
+    }
 
-    public ListCloneFilesFunction(
-            Map<String, String> sourceCatalogConfig,
-            Map<String, String> targetCatalogConfig,
-            @Nullable String whereSql) {
-        super(sourceCatalogConfig, targetCatalogConfig);
-        this.whereSql = whereSql;
+    /**
+     * Do not annotate with <code>@override</code> here to maintain 
compatibility with Flink 1.18-.
+     */
+    public void open(OpenContext openContext) throws Exception {
+        open(new Configuration());
+    }
+
+    /**
+     * Do not annotate with <code>@override</code> here to maintain 
compatibility with Flink 2.0+.
+     */
+    public void open(Configuration conf) throws Exception {
+        this.hiveCatalog =
+                
getRootHiveCatalog(createPaimonCatalog(Options.fromMap(sourceCatalogConfig)));
+        this.targetCatalog = 
createPaimonCatalog(Options.fromMap(targetCatalogConfig));
     }
 
     @Override
     public void processElement(
             Tuple2<Identifier, Identifier> tuple,
-            ProcessFunction<Tuple2<Identifier, Identifier>, 
CloneFileInfo>.Context context,
-            Collector<CloneFileInfo> collector)
+            ProcessFunction<Tuple2<Identifier, Identifier>, 
CloneSchemaInfo>.Context context,
+            Collector<CloneSchemaInfo> collector)
             throws Exception {
         String sourceType = 
sourceCatalogConfig.get(CatalogOptions.METASTORE.key());
         checkNotNull(sourceType);
@@ -80,6 +101,12 @@ public class ListCloneFilesFunction
 
         Schema schema = HiveCloneUtils.hiveTableToPaimonSchema(hiveCatalog, 
tuple.f0);
         Map<String, String> options = schema.options();
+
+        // check support clone splits
+        boolean supportCloneSplits =
+                
Boolean.parseBoolean(options.get(HiveCloneUtils.SUPPORT_CLONE_SPLITS));
+        options.remove(HiveCloneUtils.SUPPORT_CLONE_SPLITS);
+
         // only support Hive to unaware-bucket table now
         options.put(CoreOptions.BUCKET.key(), "-1");
         schema =
@@ -106,25 +133,8 @@ public class ListCloneFilesFunction
             targetCatalog.createTable(tuple.f1, schema, false);
         }
 
-        FileStoreTable table = (FileStoreTable) 
targetCatalog.getTable(tuple.f1);
-        PartitionPredicate predicate =
-                getPartitionPredicate(whereSql, 
table.schema().logicalPartitionType(), tuple.f0);
-
-        try {
-            List<HivePartitionFiles> allPartitions =
-                    HiveCloneUtils.listFiles(
-                            hiveCatalog,
-                            tuple.f0,
-                            table.schema().logicalPartitionType(),
-                            table.coreOptions().partitionDefaultName(),
-                            predicate);
-            for (HivePartitionFiles partitionFiles : allPartitions) {
-                CloneFileInfo.fromHive(tuple.f1, 
partitionFiles).forEach(collector::collect);
-            }
-        } catch (Exception e) {
-            throw new Exception(
-                    "Failed to list clone files for table " + 
tuple.f0.getFullName(), e);
-        }
+        CloneSchemaInfo schemaInfo = new CloneSchemaInfo(tuple, 
supportCloneSplits);
+        collector.collect(schemaInfo);
     }
 
     private void checkCompatible(Schema sourceSchema, FileStoreTable 
existedTable) {
@@ -167,26 +177,4 @@ public class ListCloneFilesFunction
                         && new 
HashSet<>(existedPartitionFields).containsAll(sourcePartitionFields),
                 "source table partition keys is not compatible with existed 
paimon table partition keys.");
     }
-
-    @Nullable
-    public static PartitionPredicate getPartitionPredicate(
-            @Nullable String whereSql, RowType partitionType, Identifier 
tableId) throws Exception {
-        if (whereSql == null) {
-            return null;
-        }
-
-        SimpleSqlPredicateConvertor simpleSqlPredicateConvertor =
-                new SimpleSqlPredicateConvertor(partitionType);
-        try {
-            Predicate predicate = 
simpleSqlPredicateConvertor.convertSqlToPredicate(whereSql);
-            return PartitionPredicate.fromPredicate(partitionType, predicate);
-        } catch (Exception e) {
-            throw new RuntimeException(
-                    "Failed to parse partition filter sql '"
-                            + whereSql
-                            + "' for table "
-                            + tableId.getFullName(),
-                    e);
-        }
-    }
 }
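A side note on the two open(...) overloads copied into CloneHiveSchemaFunction above: omitting @Override is deliberate, because (per the javadoc in the patch) open(OpenContext) is not available on Flink 1.18 and earlier while open(Configuration) is gone in Flink 2.0+, so only one of them is a real override on any given Flink version. The following hypothetical stand-alone class (not part of the patch) illustrates the same pattern:

```java
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

/** Hypothetical example of the no-@Override open() pattern used by the clone functions. */
public class VersionTolerantFunction extends ProcessFunction<String, String> {

    // No @Override: open(OpenContext) does not exist on Flink 1.18 and earlier,
    // so the annotation would break compilation there. Newer Flink calls this one.
    public void open(OpenContext openContext) throws Exception {
        open(new Configuration());
    }

    // No @Override: open(Configuration) is dropped in Flink 2.0+, so the annotation
    // would break compilation there. Shared initialization lives in this method.
    public void open(Configuration conf) throws Exception {
        // e.g. create catalogs / metastore clients here
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        out.collect(value);
    }
}
```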
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/spits/ListCloneSplitsFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/schema/ClonePaimonSchemaFunction.java
similarity index 77%
copy from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/spits/ListCloneSplitsFunction.java
copy to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/schema/ClonePaimonSchemaFunction.java
index 9fdb5226b9..23b16560e5 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/spits/ListCloneSplitsFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/schema/ClonePaimonSchemaFunction.java
@@ -16,16 +16,13 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.flink.clone.spits;
+package org.apache.paimon.flink.clone.schema;
 
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.options.Options;
-import org.apache.paimon.partition.PartitionPredicate;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.table.Table;
-import org.apache.paimon.table.source.Split;
-import org.apache.paimon.table.source.TableScan;
 
 import org.apache.flink.api.common.functions.OpenContext;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -33,37 +30,29 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.ProcessFunction;
 import org.apache.flink.util.Collector;
 
-import javax.annotation.Nullable;
-
-import java.util.List;
 import java.util.Map;
 
 import static org.apache.paimon.CoreOptions.BUCKET;
 import static org.apache.paimon.CoreOptions.BUCKET_KEY;
 import static org.apache.paimon.CoreOptions.PATH;
 import static org.apache.paimon.flink.FlinkCatalogFactory.createPaimonCatalog;
-import static 
org.apache.paimon.flink.clone.files.ListCloneFilesFunction.getPartitionPredicate;
 
 /** List splits for table. */
-public class ListCloneSplitsFunction
-        extends ProcessFunction<Tuple2<Identifier, Identifier>, 
CloneSplitInfo> {
+public class ClonePaimonSchemaFunction
+        extends ProcessFunction<Tuple2<Identifier, Identifier>, 
CloneSchemaInfo> {
 
     private static final long serialVersionUID = 1L;
 
     private final Map<String, String> sourceCatalogConfig;
     private final Map<String, String> targetCatalogConfig;
-    @Nullable private final String whereSql;
 
     private transient Catalog sourceCatalog;
     private transient Catalog targetCatalog;
 
-    public ListCloneSplitsFunction(
-            Map<String, String> sourceCatalogConfig,
-            Map<String, String> targetCatalogConfig,
-            @Nullable String whereSql) {
+    public ClonePaimonSchemaFunction(
+            Map<String, String> sourceCatalogConfig, Map<String, String> 
targetCatalogConfig) {
         this.sourceCatalogConfig = sourceCatalogConfig;
         this.targetCatalogConfig = targetCatalogConfig;
-        this.whereSql = whereSql;
     }
 
     /**
@@ -84,8 +73,8 @@ public class ListCloneSplitsFunction
     @Override
     public void processElement(
             Tuple2<Identifier, Identifier> tuple,
-            ProcessFunction<Tuple2<Identifier, Identifier>, 
CloneSplitInfo>.Context context,
-            Collector<CloneSplitInfo> collector)
+            ProcessFunction<Tuple2<Identifier, Identifier>, 
CloneSchemaInfo>.Context context,
+            Collector<CloneSchemaInfo> collector)
             throws Exception {
 
         // create database if not exists
@@ -124,16 +113,18 @@ public class ListCloneSplitsFunction
 
         targetCatalog.createTable(tuple.f1, builder.build(), true);
 
-        PartitionPredicate predicate =
-                getPartitionPredicate(
-                        whereSql,
-                        
sourceTable.rowType().project(sourceTable.partitionKeys()),
-                        tuple.f0);
-        TableScan scan = 
sourceTable.newReadBuilder().withPartitionFilter(predicate).newScan();
-        List<Split> splits = scan.plan().splits();
-        for (Split split : splits) {
-            CloneSplitInfo splitInfo = new CloneSplitInfo(tuple.f0, tuple.f1, 
split);
-            collector.collect(splitInfo);
+        CloneSchemaInfo cloneSchemaInfo = new CloneSchemaInfo(tuple, true);
+        collector.collect(cloneSchemaInfo);
+    }
+
+    @Override
+    public void close() throws Exception {
+        super.close();
+        if (sourceCatalog != null) {
+            this.sourceCatalog.close();
+        }
+        if (targetCatalog != null) {
+            this.targetCatalog.close();
         }
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/schema/CloneSchemaInfo.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/schema/CloneSchemaInfo.java
new file mode 100644
index 0000000000..51f924422e
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/schema/CloneSchemaInfo.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.clone.schema;
+
+import org.apache.paimon.catalog.Identifier;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+
+/** Clone hive table schema with necessary information. */
+public class CloneSchemaInfo {
+
+    private static final long serialVersionUID = 1L;
+
+    private final Tuple2<Identifier, Identifier> identifierTuple;
+    private final boolean supportCloneSplits;
+
+    public CloneSchemaInfo(
+            Tuple2<Identifier, Identifier> identifierTuple, boolean 
supportCloneSplits) {
+        this.identifierTuple = identifierTuple;
+        this.supportCloneSplits = supportCloneSplits;
+    }
+
+    public Tuple2<Identifier, Identifier> identifierTuple() {
+        return identifierTuple;
+    }
+
+    public boolean supportCloneSplits() {
+        return supportCloneSplits;
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/spits/ListCloneSplitsFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/spits/ListCloneSplitsFunction.java
index 9fdb5226b9..f364c43800 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/spits/ListCloneSplitsFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/spits/ListCloneSplitsFunction.java
@@ -20,9 +20,9 @@ package org.apache.paimon.flink.clone.spits;
 
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.clone.schema.CloneSchemaInfo;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.partition.PartitionPredicate;
-import org.apache.paimon.schema.Schema;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.TableScan;
@@ -38,15 +38,11 @@ import javax.annotation.Nullable;
 import java.util.List;
 import java.util.Map;
 
-import static org.apache.paimon.CoreOptions.BUCKET;
-import static org.apache.paimon.CoreOptions.BUCKET_KEY;
-import static org.apache.paimon.CoreOptions.PATH;
 import static org.apache.paimon.flink.FlinkCatalogFactory.createPaimonCatalog;
 import static 
org.apache.paimon.flink.clone.files.ListCloneFilesFunction.getPartitionPredicate;
 
 /** List splits for table. */
-public class ListCloneSplitsFunction
-        extends ProcessFunction<Tuple2<Identifier, Identifier>, 
CloneSplitInfo> {
+public class ListCloneSplitsFunction extends ProcessFunction<CloneSchemaInfo, 
CloneSplitInfo> {
 
     private static final long serialVersionUID = 1L;
 
@@ -83,47 +79,13 @@ public class ListCloneSplitsFunction
 
     @Override
     public void processElement(
-            Tuple2<Identifier, Identifier> tuple,
-            ProcessFunction<Tuple2<Identifier, Identifier>, 
CloneSplitInfo>.Context context,
+            CloneSchemaInfo cloneSchemaInfo,
+            ProcessFunction<CloneSchemaInfo, CloneSplitInfo>.Context context,
             Collector<CloneSplitInfo> collector)
             throws Exception {
-
-        // create database if not exists
-        targetCatalog.createDatabase(tuple.f1.getDatabaseName(), true);
+        Tuple2<Identifier, Identifier> tuple = 
cloneSchemaInfo.identifierTuple();
 
         Table sourceTable = sourceCatalog.getTable(tuple.f0);
-        Schema.Builder builder = Schema.newBuilder();
-        sourceTable
-                .rowType()
-                .getFields()
-                .forEach(
-                        f -> builder.column(f.name(), f.type(), 
f.description(), f.defaultValue()));
-        builder.partitionKeys(sourceTable.partitionKeys());
-        builder.primaryKey(sourceTable.primaryKeys());
-        sourceTable
-                .options()
-                .forEach(
-                        (k, v) -> {
-                            if (k.equalsIgnoreCase(BUCKET.key())
-                                    || k.equalsIgnoreCase(PATH.key())) {
-                                return;
-                            }
-                            builder.option(k, v);
-                        });
-
-        if (sourceTable.primaryKeys().isEmpty()) {
-            // for append table with bucket
-            if (sourceTable.options().containsKey(BUCKET_KEY.key())) {
-                builder.option(BUCKET.key(), 
sourceTable.options().get(BUCKET.key()));
-                builder.option(BUCKET_KEY.key(), 
sourceTable.options().get(BUCKET_KEY.key()));
-            }
-        } else {
-            // for primary key table, only postpone bucket supports clone
-            builder.option(BUCKET.key(), "-2");
-        }
-
-        targetCatalog.createTable(tuple.f1, builder.build(), true);
-
         PartitionPredicate predicate =
                 getPartitionPredicate(
                         whereSql,
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/clone/HiveCloneExtractor.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/clone/HiveCloneExtractor.java
index 51ec34bced..c8573f1a0e 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/clone/HiveCloneExtractor.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/clone/HiveCloneExtractor.java
@@ -57,6 +57,8 @@ public interface HiveCloneExtractor {
 
     Map<String, String> extractOptions(Table table);
 
+    boolean supportCloneSplits(String format);
+
     List<HiveCloneExtractor> EXTRACTORS =
             FactoryUtil.discoverFactories(
                     HiveCloneExtractor.class.getClassLoader(), 
HiveCloneExtractor.class);
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/clone/HiveCloneUtils.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/clone/HiveCloneUtils.java
index 433d536347..2672aaa450 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/clone/HiveCloneUtils.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/clone/HiveCloneUtils.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.PrimaryKeysRequest;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.slf4j.Logger;
@@ -56,6 +57,8 @@ public class HiveCloneUtils {
     public static final Predicate<FileStatus> HIDDEN_PATH_FILTER =
             p -> !p.getPath().getName().startsWith("_") && 
!p.getPath().getName().startsWith(".");
 
+    public static final String SUPPORT_CLONE_SPLITS = "support.clone.splits";
+
     public static Map<String, String> getDatabaseOptions(
             HiveCatalog hiveCatalog, String databaseName) throws Exception {
         IMetaStoreClient client = hiveCatalog.getHmsClient();
@@ -186,14 +189,28 @@ public class HiveCloneUtils {
                         predicate);
     }
 
-    private static String parseFormat(StorageDescriptor storageDescriptor) {
-        String serder = storageDescriptor.getSerdeInfo().toString();
-        if (serder.contains("avro")) {
+    private static String parseFormat(StorageDescriptor sd) {
+        SerDeInfo serdeInfo = sd.getSerdeInfo();
+        if (serdeInfo == null) {
+            return null;
+        }
+        String serLib =
+                serdeInfo.getSerializationLib() == null
+                        ? ""
+                        : serdeInfo.getSerializationLib().toLowerCase();
+        String inputFormat = sd.getInputFormat() == null ? "" : 
sd.getInputFormat();
+        if (serLib.contains("avro")) {
             return "avro";
-        } else if (serder.contains("parquet")) {
+        } else if (serLib.contains("parquet")) {
             return "parquet";
-        } else if (serder.contains("orc")) {
+        } else if (serLib.contains("orc")) {
             return "orc";
+        } else if (inputFormat.contains("Text")) {
+            if (serLib.contains("json")) {
+                return "json";
+            } else {
+                return "csv";
+            }
         }
         return null;
     }
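The reworked parseFormat above keys off the SerDe library and the input format instead of SerDeInfo#toString(), which is what makes textfile-backed Hive tables recognizable as csv or json. A self-contained restatement of that mapping for illustration (the class and method names here are hypothetical, not part of the patch):

```java
import java.util.Locale;

public class FormatDetectionSketch {

    /** Mirrors the idea of HiveCloneUtils#parseFormat: map Hive SerDe/InputFormat to a Paimon format. */
    static String detectFormat(String serializationLib, String inputFormat) {
        String serLib = serializationLib == null ? "" : serializationLib.toLowerCase(Locale.ROOT);
        String input = inputFormat == null ? "" : inputFormat;
        if (serLib.contains("avro")) {
            return "avro";
        } else if (serLib.contains("parquet")) {
            return "parquet";
        } else if (serLib.contains("orc")) {
            return "orc";
        } else if (input.contains("Text")) {
            // Text tables: a JSON SerDe means json, anything else is treated as csv.
            return serLib.contains("json") ? "json" : "csv";
        }
        // null means the format is not recognized for cloning.
        return null;
    }

    public static void main(String[] args) {
        // LazySimpleSerDe + TextInputFormat -> csv
        System.out.println(detectFormat(
                "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
                "org.apache.hadoop.mapred.TextInputFormat"));
        // JsonSerDe + TextInputFormat -> json
        System.out.println(detectFormat(
                "org.apache.hive.hcatalog.data.JsonSerDe",
                "org.apache.hadoop.mapred.TextInputFormat"));
    }
}
```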
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/clone/HiveTableCloneExtractor.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/clone/HiveTableCloneExtractor.java
index d363be73ad..0d3baf2942 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/clone/HiveTableCloneExtractor.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/clone/HiveTableCloneExtractor.java
@@ -26,6 +26,7 @@ import org.apache.paimon.fs.FileStatus;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.migrate.FileMetaUtils;
 import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.table.FormatTable;
 import org.apache.paimon.types.RowType;
 
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
@@ -51,6 +52,7 @@ import java.util.stream.Collectors;
 import static org.apache.paimon.CoreOptions.FILE_COMPRESSION;
 import static org.apache.paimon.CoreOptions.FILE_FORMAT;
 import static org.apache.paimon.hive.clone.HiveCloneUtils.HIDDEN_PATH_FILTER;
+import static org.apache.paimon.hive.clone.HiveCloneUtils.SUPPORT_CLONE_SPLITS;
 import static org.apache.paimon.hive.clone.HiveCloneUtils.parseFormat;
 
 /** A {@link HiveCloneExtractor} for hive tables. */
@@ -110,6 +112,12 @@ public class HiveTableCloneExtractor implements HiveCloneExtractor {
         }
 
         String format = parseFormat(table);
+        if (supportCloneSplits(format)) {
+            Map<String, String> cloneSplitsOptions = 
getOptionsWhenCloneSplits(table, format);
+            paimonOptions.putAll(cloneSplitsOptions);
+            return paimonOptions;
+        }
+
         paimonOptions.put(FILE_FORMAT.key(), format);
         Map<String, String> formatOptions = getIdentifierPrefixOptions(format, 
hiveTableOptions);
         Map<String, String> sdFormatOptions =
@@ -124,6 +132,16 @@ public class HiveTableCloneExtractor implements HiveCloneExtractor {
         return paimonOptions;
     }
 
+    @Override
+    public boolean supportCloneSplits(String format) {
+        for (FormatTable.Format supportFormat : FormatTable.Format.values()) {
+            if (supportFormat.name().equalsIgnoreCase(format)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
     private static List<HivePartitionFiles> listFromPureHiveTable(
             IMetaStoreClient client,
             Identifier identifier,
@@ -216,4 +234,18 @@ public class HiveTableCloneExtractor implements HiveCloneExtractor {
         }
         return result;
     }
+
+    public static Map<String, String> getOptionsWhenCloneSplits(Table table, 
String format) {
+        Map<String, String> result = new HashMap<>();
+        if (FormatTable.Format.JSON.name().equalsIgnoreCase(format)) {
+            result.put(FILE_FORMAT.key(), 
FormatTable.Format.PARQUET.name().toLowerCase());
+        } else if (FormatTable.Format.CSV.name().equalsIgnoreCase(format)) {
+            result.put(FILE_FORMAT.key(), 
FormatTable.Format.PARQUET.name().toLowerCase());
+        } else {
+            result.put(FILE_FORMAT.key(), format);
+        }
+        // only for clone
+        result.put(SUPPORT_CLONE_SPLITS, "true");
+        return result;
+    }
 }
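Taken together, the two additions above mean that a Hive csv or json table is tagged with the transient support.clone.splits option and is written out as a parquet Paimon table, while other split-capable formats keep their own file format. The following self-contained sketch restates that behavior; the enum is a minimal stand-in for org.apache.paimon.table.FormatTable.Format and lists only the members referenced in this patch:

```java
import java.util.HashMap;
import java.util.Map;

public class CloneSplitsOptionsSketch {

    // Minimal stand-in for org.apache.paimon.table.FormatTable.Format.
    enum Format { CSV, JSON, PARQUET }

    static final String FILE_FORMAT = "file.format";               // stand-in for CoreOptions.FILE_FORMAT.key()
    static final String SUPPORT_CLONE_SPLITS = "support.clone.splits";

    /** Same idea as HiveTableCloneExtractor#supportCloneSplits. */
    static boolean supportCloneSplits(String format) {
        for (Format f : Format.values()) {
            if (f.name().equalsIgnoreCase(format)) {
                return true;
            }
        }
        return false;
    }

    /** Same idea as getOptionsWhenCloneSplits: csv/json sources land as parquet Paimon tables. */
    static Map<String, String> optionsWhenCloneSplits(String format) {
        Map<String, String> result = new HashMap<>();
        if (Format.JSON.name().equalsIgnoreCase(format) || Format.CSV.name().equalsIgnoreCase(format)) {
            result.put(FILE_FORMAT, Format.PARQUET.name().toLowerCase());
        } else {
            result.put(FILE_FORMAT, format);
        }
        // Marker consumed (and removed again) by CloneHiveSchemaFunction to pick the splits branch.
        result.put(SUPPORT_CLONE_SPLITS, "true");
        return result;
    }

    public static void main(String[] args) {
        System.out.println(optionsWhenCloneSplits("csv"));   // file.format=parquet plus the clone-splits marker
        System.out.println(optionsWhenCloneSplits("json"));  // file.format=parquet plus the clone-splits marker
    }
}
```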
diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/CloneActionITCase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/CloneActionITCase.java
index d6ff369c1b..9c49ee92d5 100644
--- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/CloneActionITCase.java
+++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/CloneActionITCase.java
@@ -826,6 +826,126 @@ public class CloneActionITCase extends ActionITCaseBase {
         
Assertions.assertThatList(actualDB2Tables).containsExactlyInAnyOrderElementsOf(db2Tables);
     }
 
+    @Test
+    public void testMigrateCsvTable() throws Exception {
+        String format = "textfile";
+        String dbName = "hivedb" + StringUtils.randomNumericString(10);
+        String tableName = "hivetable" + StringUtils.randomNumericString(10);
+
+        TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
+        tEnv.executeSql("CREATE CATALOG HIVE WITH ('type'='hive')");
+        tEnv.useCatalog("HIVE");
+        tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
+        tEnv.executeSql("CREATE DATABASE " + dbName);
+        sql(
+                tEnv,
+                "CREATE TABLE %s.%s (id STRING) PARTITIONED BY (id2 INT, id3 
INT) "
+                        + "ROW FORMAT SERDE 
'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' STORED AS %s ",
+                dbName,
+                tableName,
+                format);
+        sql(tEnv, "INSERT INTO %s.%s VALUES %s", dbName, tableName, data(100));
+
+        tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
+        tEnv.executeSql("CREATE CATALOG PAIMON_GE WITH 
('type'='paimon-generic')");
+        tEnv.useCatalog("PAIMON_GE");
+
+        List<Row> r1 = sql(tEnv, "SELECT * FROM %s.%s", dbName, tableName);
+
+        tEnv.executeSql(
+                "CREATE CATALOG PAIMON WITH ('type'='paimon', 'warehouse' = '" 
+ warehouse + "')");
+        tEnv.useCatalog("PAIMON");
+        tEnv.executeSql("CREATE DATABASE test");
+
+        List<String> args =
+                new ArrayList<>(
+                        Arrays.asList(
+                                "clone",
+                                "--database",
+                                dbName,
+                                "--table",
+                                tableName,
+                                "--catalog_conf",
+                                "metastore=hive",
+                                "--catalog_conf",
+                                "uri=thrift://localhost:" + PORT,
+                                "--target_database",
+                                "test",
+                                "--target_table",
+                                "test_table",
+                                "--target_catalog_conf",
+                                "warehouse=" + warehouse));
+
+        createAction(CloneAction.class, args).run();
+        FileStoreTable paimonTable =
+                paimonTable(tEnv, "PAIMON", Identifier.create("test", 
"test_table"));
+
+        assertThat(paimonTable.partitionKeys()).containsExactly("id2", "id3");
+
+        List<Row> r2 = sql(tEnv, "SELECT * FROM test.test_table");
+        Assertions.assertThatList(r1).containsExactlyInAnyOrderElementsOf(r2);
+    }
+
+    @Test
+    public void testMigrateJsonTable() throws Exception {
+        String format = "textfile";
+        String dbName = "hivedb" + StringUtils.randomNumericString(10);
+        String tableName = "hivetable" + StringUtils.randomNumericString(10);
+
+        TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
+        tEnv.executeSql("CREATE CATALOG HIVE WITH ('type'='hive')");
+        tEnv.useCatalog("HIVE");
+        tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
+        tEnv.executeSql("CREATE DATABASE " + dbName);
+        sql(
+                tEnv,
+                "CREATE TABLE %s.%s (id STRING) PARTITIONED BY (id2 INT, id3 
INT)"
+                        + "ROW FORMAT SERDE 
'org.apache.hive.hcatalog.data.JsonSerDe' STORED AS %s ",
+                dbName,
+                tableName,
+                format);
+        sql(tEnv, "INSERT INTO %s.%s VALUES %s", dbName, tableName, data(100));
+
+        tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
+        tEnv.executeSql("CREATE CATALOG PAIMON_GE WITH 
('type'='paimon-generic')");
+        tEnv.useCatalog("PAIMON_GE");
+
+        List<Row> r1 = sql(tEnv, "SELECT * FROM %s.%s", dbName, tableName);
+
+        tEnv.executeSql(
+                "CREATE CATALOG PAIMON WITH ('type'='paimon', 'warehouse' = '" 
+ warehouse + "')");
+        tEnv.useCatalog("PAIMON");
+        tEnv.executeSql("CREATE DATABASE test");
+
+        List<String> args =
+                new ArrayList<>(
+                        Arrays.asList(
+                                "clone",
+                                "--database",
+                                dbName,
+                                "--table",
+                                tableName,
+                                "--catalog_conf",
+                                "metastore=hive",
+                                "--catalog_conf",
+                                "uri=thrift://localhost:" + PORT,
+                                "--target_database",
+                                "test",
+                                "--target_table",
+                                "test_table",
+                                "--target_catalog_conf",
+                                "warehouse=" + warehouse));
+
+        createAction(CloneAction.class, args).run();
+        FileStoreTable paimonTable =
+                paimonTable(tEnv, "PAIMON", Identifier.create("test", 
"test_table"));
+
+        assertThat(paimonTable.partitionKeys()).containsExactly("id2", "id3");
+
+        List<Row> r2 = sql(tEnv, "SELECT * FROM test.test_table");
+        Assertions.assertThatList(r1).containsExactlyInAnyOrderElementsOf(r2);
+    }
+
     private String[] ddls(String format) {
         // has primary key
         String ddl0 =
@@ -893,14 +1013,14 @@ public class CloneActionITCase extends ActionITCaseBase {
     private String randomFormat() {
         ThreadLocalRandom random = ThreadLocalRandom.current();
         int i = random.nextInt(3);
-        String[] formats = new String[] {"orc", "parquet", "avro"};
+        String[] formats = new String[] {"orc", "parquet", "avro", "textfile"};
         return formats[i];
     }
 
     private String randomFormat(String excludedFormat) {
         ThreadLocalRandom random = ThreadLocalRandom.current();
         int i = random.nextInt(3);
-        String[] formats = new String[] {"orc", "parquet", "avro"};
+        String[] formats = new String[] {"orc", "parquet", "avro", "textfile"};
         if (Objects.equals(excludedFormat, formats[i])) {
             return formats[(i + 1) % 3];
         }
diff --git a/paimon-hudi/src/main/java/org/apache/paimon/hudi/HudiHiveCloneExtractor.java b/paimon-hudi/src/main/java/org/apache/paimon/hudi/HudiHiveCloneExtractor.java
index 9eacb127bf..bd353a240b 100644
--- a/paimon-hudi/src/main/java/org/apache/paimon/hudi/HudiHiveCloneExtractor.java
+++ b/paimon-hudi/src/main/java/org/apache/paimon/hudi/HudiHiveCloneExtractor.java
@@ -112,6 +112,11 @@ public class HudiHiveCloneExtractor extends HiveTableCloneExtractor {
         }
     }
 
+    @Override
+    public boolean supportCloneSplits(String format) {
+        return false;
+    }
+
     private static void checkTableType(Map<String, String> conf) {
         String type = conf.getOrDefault("table.type", 
HoodieTableType.COPY_ON_WRITE.name());
         checkArgument(
diff --git a/paimon-iceberg/src/main/java/org/apache/paimon/iceberg/IcebergHiveCloneExtractor.java b/paimon-iceberg/src/main/java/org/apache/paimon/iceberg/IcebergHiveCloneExtractor.java
index cea2948f14..920dfcf3df 100644
--- a/paimon-iceberg/src/main/java/org/apache/paimon/iceberg/IcebergHiveCloneExtractor.java
+++ b/paimon-iceberg/src/main/java/org/apache/paimon/iceberg/IcebergHiveCloneExtractor.java
@@ -194,6 +194,11 @@ public class IcebergHiveCloneExtractor extends HiveTableCloneExtractor {
         return paimonOptions;
     }
 
+    @Override
+    public boolean supportCloneSplits(String format) {
+        return false;
+    }
+
     private HivePartitionFiles toHivePartitionFiles(List<DataFile> dataFiles, 
BinaryRow partition) {
         List<org.apache.paimon.fs.Path> paths = new 
ArrayList<>(dataFiles.size());
         List<Long> fileSizes = new ArrayList<>(dataFiles.size());
