This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
     new c937b53d3b [clone] refactor clone from hive to support csv and json table clone (#6200)
c937b53d3b is described below
commit c937b53d3bdf961993ab53ebe2108d55d8148a4d
Author: shyjsarah <[email protected]>
AuthorDate: Thu Sep 4 21:54:04 2025 +0800
[clone] refactor clone from hive to support csv and json table clone (#6200)
---
.../paimon/flink/clone/CloneHiveTableUtils.java | 77 ++++++++++++-
.../paimon/flink/clone/ClonePaimonTableUtils.java | 20 +++-
.../flink/clone/files/ListCloneFilesFunction.java | 96 +---------------
.../CloneHiveSchemaFunction.java} | 110 ++++++++----------
.../ClonePaimonSchemaFunction.java} | 47 ++++----
.../paimon/flink/clone/schema/CloneSchemaInfo.java | 46 ++++++++
.../flink/clone/spits/ListCloneSplitsFunction.java | 48 +-------
.../paimon/hive/clone/HiveCloneExtractor.java | 2 +
.../apache/paimon/hive/clone/HiveCloneUtils.java | 27 ++++-
.../paimon/hive/clone/HiveTableCloneExtractor.java | 32 ++++++
.../paimon/hive/procedure/CloneActionITCase.java | 124 ++++++++++++++++++++-
.../apache/paimon/hudi/HudiHiveCloneExtractor.java | 5 +
.../paimon/iceberg/IcebergHiveCloneExtractor.java | 5 +
13 files changed, 402 insertions(+), 237 deletions(-)
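
At a high level, the refactor splits the Hive clone pipeline into a schema stage followed by two data paths: format tables (csv, json and other FormatTable formats) are cloned split-by-split, while all other Hive tables keep the existing file-copy path. A simplified sketch of the new wiring in CloneHiveTableUtils, using only classes added or kept by this commit (stream setup, shuffling and the commit stages are omitted):

    DataStream<CloneSchemaInfo> schemaInfos =
            partitionedSource
                    .process(new CloneHiveSchemaFunction(sourceCatalogConfig, targetCatalogConfig))
                    .name("Clone Schema")
                    .setParallelism(parallelism);

    // format tables (csv/json/...) go through the new splits path
    schemaInfos
            .filter(info -> info.supportCloneSplits())
            .rebalance()
            .process(new ListCloneSplitsFunction(sourceCatalogConfig, targetCatalogConfig, whereSql));

    // everything else keeps the original file-copy path
    schemaInfos
            .filter(info -> !info.supportCloneSplits())
            .rebalance()
            .process(new ListCloneFilesFunction(sourceCatalogConfig, targetCatalogConfig, whereSql));
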
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneHiveTableUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneHiveTableUtils.java
index 3ea672184a..fae777f579 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneHiveTableUtils.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneHiveTableUtils.java
@@ -28,6 +28,14 @@ import org.apache.paimon.flink.clone.files.CloneFilesFunction;
import org.apache.paimon.flink.clone.files.DataFileInfo;
import org.apache.paimon.flink.clone.files.ListCloneFilesFunction;
import org.apache.paimon.flink.clone.files.ShuffleDataFileByTableComputer;
+import org.apache.paimon.flink.clone.schema.CloneHiveSchemaFunction;
+import org.apache.paimon.flink.clone.schema.CloneSchemaInfo;
+import org.apache.paimon.flink.clone.spits.CloneSplitInfo;
+import org.apache.paimon.flink.clone.spits.CloneSplitsFunction;
+import org.apache.paimon.flink.clone.spits.CommitMessageInfo;
+import org.apache.paimon.flink.clone.spits.CommitMessageTableOperator;
+import org.apache.paimon.flink.clone.spits.ListCloneSplitsFunction;
+import org.apache.paimon.flink.clone.spits.ShuffleCommitMessageByTableComputer;
import org.apache.paimon.flink.sink.FlinkStreamPartitioner;
import org.apache.paimon.hive.HiveCatalog;
import org.apache.paimon.utils.StringUtils;
@@ -165,9 +173,72 @@ public class CloneHiveTableUtils {
FlinkStreamPartitioner.partition(
source, new ShuffleIdentifierByTableComputer(),
parallelism);
- // create target table, list files and group by <table, partition>
- DataStream<CloneFileInfo> files =
+ // create target table, check support clone splits
+ DataStream<CloneSchemaInfo> schemaInfos =
partitionedSource
+ .process(
+ new CloneHiveSchemaFunction(
+ sourceCatalogConfig,
targetCatalogConfig))
+ .name("Clone Schema")
+ .setParallelism(parallelism);
+
+ buildForCloneSplits(
+ sourceCatalogConfig, targetCatalogConfig, parallelism,
whereSql, schemaInfos);
+
+ buildForCloneFile(
+ sourceCatalogConfig, targetCatalogConfig, parallelism,
whereSql, schemaInfos);
+ }
+
+ public static void buildForCloneSplits(
+ Map<String, String> sourceCatalogConfig,
+ Map<String, String> targetCatalogConfig,
+ int parallelism,
+ @Nullable String whereSql,
+ DataStream<CloneSchemaInfo> schemaInfos) {
+
+ // list splits
+ DataStream<CloneSplitInfo> splits =
+ schemaInfos
+ .filter(cloneSchemaInfo ->
cloneSchemaInfo.supportCloneSplits())
+ .rebalance()
+ .process(
+ new ListCloneSplitsFunction(
+ sourceCatalogConfig,
targetCatalogConfig, whereSql))
+ .name("List Splits")
+ .setParallelism(parallelism);
+
+ // copy splits and commit
+ DataStream<CommitMessageInfo> commitMessage =
+ splits.rebalance()
+ .process(new CloneSplitsFunction(sourceCatalogConfig,
targetCatalogConfig))
+ .name("Copy Splits")
+ .setParallelism(parallelism);
+
+ DataStream<CommitMessageInfo> partitionedCommitMessage =
+ FlinkStreamPartitioner.partition(
+ commitMessage, new
ShuffleCommitMessageByTableComputer(), parallelism);
+
+ DataStream<Long> committed =
+ partitionedCommitMessage
+ .transform(
+ "Commit Table Splits",
+ BasicTypeInfo.LONG_TYPE_INFO,
+ new
CommitMessageTableOperator(targetCatalogConfig))
+ .setParallelism(parallelism);
+ committed.sinkTo(new DiscardingSink<>()).name("end").setParallelism(1);
+ }
+
+ public static void buildForCloneFile(
+ Map<String, String> sourceCatalogConfig,
+ Map<String, String> targetCatalogConfig,
+ int parallelism,
+ @Nullable String whereSql,
+ DataStream<CloneSchemaInfo> schemaInfos) {
+ // list files and group by <table, partition>
+ DataStream<CloneFileInfo> files =
+ schemaInfos
+ .filter(cloneSchemaInfo ->
!cloneSchemaInfo.supportCloneSplits())
+ .rebalance()
.process(
new ListCloneFilesFunction(
sourceCatalogConfig,
targetCatalogConfig, whereSql))
@@ -188,7 +259,7 @@ public class CloneHiveTableUtils {
DataStream<Long> committed =
partitionedDataFile
.transform(
- "Commit table",
+ "Commit Table Files",
BasicTypeInfo.LONG_TYPE_INFO,
new
CloneFilesCommitOperator(targetCatalogConfig))
.setParallelism(parallelism);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/ClonePaimonTableUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/ClonePaimonTableUtils.java
index 7c63df3271..5a66da2c66 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/ClonePaimonTableUtils.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/ClonePaimonTableUtils.java
@@ -21,6 +21,8 @@ package org.apache.paimon.flink.clone;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.action.CloneAction;
+import org.apache.paimon.flink.clone.schema.ClonePaimonSchemaFunction;
+import org.apache.paimon.flink.clone.schema.CloneSchemaInfo;
import org.apache.paimon.flink.clone.spits.CloneSplitInfo;
import org.apache.paimon.flink.clone.spits.CloneSplitsFunction;
import org.apache.paimon.flink.clone.spits.CommitMessageInfo;
@@ -153,19 +155,29 @@ public class ClonePaimonTableUtils {
FlinkStreamPartitioner.partition(
source, new ShuffleIdentifierByTableComputer(),
parallelism);
- DataStream<CloneSplitInfo> splits =
+ // create target table
+ DataStream<CloneSchemaInfo> schemaInfos =
partitionedSource
+ .process(
+ new ClonePaimonSchemaFunction(
+ sourceCatalogConfig,
targetCatalogConfig))
+ .name("Clone Schema")
+ .setParallelism(parallelism);
+
+ // list splits
+ DataStream<CloneSplitInfo> splits =
+ schemaInfos
.process(
new ListCloneSplitsFunction(
sourceCatalogConfig,
targetCatalogConfig, whereSql))
- .name("List Files")
+ .name("List Splits")
.setParallelism(parallelism);
// copy splits and commit
DataStream<CommitMessageInfo> commitMessage =
splits.rebalance()
.process(new CloneSplitsFunction(sourceCatalogConfig,
targetCatalogConfig))
- .name("Copy Files")
+ .name("Copy Splits")
.setParallelism(parallelism);
DataStream<CommitMessageInfo> partitionedCommitMessage =
@@ -175,7 +187,7 @@ public class ClonePaimonTableUtils {
DataStream<Long> committed =
partitionedCommitMessage
.transform(
- "Commit table",
+ "Commit Table",
BasicTypeInfo.LONG_TYPE_INFO,
new
CommitMessageTableOperator(targetCatalogConfig))
.setParallelism(parallelism);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/files/ListCloneFilesFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/files/ListCloneFilesFunction.java
index 8957b217d1..332feb7cf1 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/files/ListCloneFilesFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/files/ListCloneFilesFunction.java
@@ -18,19 +18,14 @@
package org.apache.paimon.flink.clone.files;
-import org.apache.paimon.CoreOptions;
-import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.clone.schema.CloneSchemaInfo;
import org.apache.paimon.flink.predicate.SimpleSqlPredicateConvertor;
import org.apache.paimon.hive.clone.HiveCloneUtils;
import org.apache.paimon.hive.clone.HivePartitionFiles;
-import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.Table;
-import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -39,18 +34,12 @@ import org.apache.flink.util.Collector;
import javax.annotation.Nullable;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
-
-import static org.apache.paimon.CoreOptions.FILE_FORMAT;
-import static org.apache.paimon.utils.Preconditions.checkNotNull;
-import static org.apache.paimon.utils.Preconditions.checkState;
/** List files for table. */
public class ListCloneFilesFunction
- extends CloneFilesProcessFunction<Tuple2<Identifier, Identifier>,
CloneFileInfo> {
+ extends CloneFilesProcessFunction<CloneSchemaInfo, CloneFileInfo> {
private static final long serialVersionUID = 1L;
@@ -66,45 +55,11 @@ public class ListCloneFilesFunction
@Override
public void processElement(
- Tuple2<Identifier, Identifier> tuple,
- ProcessFunction<Tuple2<Identifier, Identifier>,
CloneFileInfo>.Context context,
+ CloneSchemaInfo cloneSchemaInfo,
+ ProcessFunction<CloneSchemaInfo, CloneFileInfo>.Context context,
Collector<CloneFileInfo> collector)
throws Exception {
- String sourceType =
sourceCatalogConfig.get(CatalogOptions.METASTORE.key());
- checkNotNull(sourceType);
-
- // create database if not exists
- Map<String, String> databaseOptions =
- HiveCloneUtils.getDatabaseOptions(hiveCatalog,
tuple.f0.getDatabaseName());
- targetCatalog.createDatabase(tuple.f1.getDatabaseName(), true,
databaseOptions);
-
- Schema schema = HiveCloneUtils.hiveTableToPaimonSchema(hiveCatalog,
tuple.f0);
- Map<String, String> options = schema.options();
- // only support Hive to unaware-bucket table now
- options.put(CoreOptions.BUCKET.key(), "-1");
- schema =
- new Schema(
- schema.fields(),
- schema.partitionKeys(),
- schema.primaryKeys(),
- options,
- schema.comment());
- try {
- Table existedTable = targetCatalog.getTable(tuple.f1);
-
- checkState(
- existedTable instanceof FileStoreTable,
- String.format(
- "existed paimon table '%s' is not a
FileStoreTable, but a %s",
- tuple.f1, existedTable.getClass().getName()));
- checkCompatible(schema, (FileStoreTable) existedTable);
-
- LOG.info("paimon table '{}' already exists, use it as target
table.", tuple.f1);
- } catch (Catalog.TableNotExistException e) {
- LOG.info("create target paimon table '{}'.", tuple.f1);
-
- targetCatalog.createTable(tuple.f1, schema, false);
- }
+ Tuple2<Identifier, Identifier> tuple =
cloneSchemaInfo.identifierTuple();
FileStoreTable table = (FileStoreTable)
targetCatalog.getTable(tuple.f1);
PartitionPredicate predicate =
@@ -127,47 +82,6 @@ public class ListCloneFilesFunction
}
}
- private void checkCompatible(Schema sourceSchema, FileStoreTable
existedTable) {
- Schema existedSchema = existedTable.schema().toSchema();
-
- // check primary keys
- checkState(
- existedSchema.primaryKeys().isEmpty(),
- "Can not clone data to existed paimon table which has primary
keys. Existed paimon table is "
- + existedTable.name());
-
- // check bucket
- checkState(
- existedTable.coreOptions().bucket() == -1,
- "Can not clone data to existed paimon table which bucket is
not -1. Existed paimon table is "
- + existedTable.name());
-
- // check format
- checkState(
- Objects.equals(
- sourceSchema.options().get(FILE_FORMAT.key()),
- existedTable.coreOptions().formatType()),
- "source table format is not compatible with existed paimon
table format.");
-
- // check partition keys
- List<String> sourcePartitionFields = sourceSchema.partitionKeys();
- List<String> existedPartitionFields = existedSchema.partitionKeys();
-
- checkState(
- sourcePartitionFields.size() == existedPartitionFields.size()
- && new
HashSet<>(existedPartitionFields).containsAll(sourcePartitionFields),
- "source table partition keys is not compatible with existed
paimon table partition keys.");
-
- // check all fields
- List<DataField> sourceFields = sourceSchema.fields();
- List<DataField> existedFields = existedSchema.fields();
-
- checkState(
- existedFields.size() >= sourceFields.size()
- && new
HashSet<>(existedPartitionFields).containsAll(sourcePartitionFields),
- "source table partition keys is not compatible with existed
paimon table partition keys.");
- }
-
@Nullable
public static PartitionPredicate getPartitionPredicate(
@Nullable String whereSql, RowType partitionType, Identifier
tableId) throws Exception {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/files/ListCloneFilesFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/schema/CloneHiveSchemaFunction.java
similarity index 66%
copy from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/files/ListCloneFilesFunction.java
copy to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/schema/CloneHiveSchemaFunction.java
index 8957b217d1..a1d7f374f3 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/files/ListCloneFilesFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/schema/CloneHiveSchemaFunction.java
@@ -16,28 +16,27 @@
* limitations under the License.
*/
-package org.apache.paimon.flink.clone.files;
+package org.apache.paimon.flink.clone.schema;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.flink.predicate.SimpleSqlPredicateConvertor;
+import org.apache.paimon.hive.HiveCatalog;
import org.apache.paimon.hive.clone.HiveCloneUtils;
-import org.apache.paimon.hive.clone.HivePartitionFiles;
import org.apache.paimon.options.CatalogOptions;
-import org.apache.paimon.partition.PartitionPredicate;
-import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.types.DataField;
-import org.apache.paimon.types.RowType;
+import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
-
-import javax.annotation.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.HashSet;
import java.util.List;
@@ -45,30 +44,52 @@ import java.util.Map;
import java.util.Objects;
import static org.apache.paimon.CoreOptions.FILE_FORMAT;
+import static org.apache.paimon.flink.FlinkCatalogFactory.createPaimonCatalog;
+import static org.apache.paimon.flink.clone.CloneHiveTableUtils.getRootHiveCatalog;
import static org.apache.paimon.utils.Preconditions.checkNotNull;
import static org.apache.paimon.utils.Preconditions.checkState;
-/** List files for table. */
-public class ListCloneFilesFunction
- extends CloneFilesProcessFunction<Tuple2<Identifier, Identifier>,
CloneFileInfo> {
+/** clone schema for hive table. */
+public class CloneHiveSchemaFunction
+ extends ProcessFunction<Tuple2<Identifier, Identifier>,
CloneSchemaInfo> {
private static final long serialVersionUID = 1L;
- @Nullable private final String whereSql;
+ protected static final Logger LOG =
LoggerFactory.getLogger(CloneHiveSchemaFunction.class);
+
+ protected final Map<String, String> sourceCatalogConfig;
+ protected final Map<String, String> targetCatalogConfig;
+
+ protected transient HiveCatalog hiveCatalog;
+ protected transient Catalog targetCatalog;
+
+ public CloneHiveSchemaFunction(
+ Map<String, String> sourceCatalogConfig, Map<String, String>
targetCatalogConfig) {
+ this.sourceCatalogConfig = sourceCatalogConfig;
+ this.targetCatalogConfig = targetCatalogConfig;
+ }
- public ListCloneFilesFunction(
- Map<String, String> sourceCatalogConfig,
- Map<String, String> targetCatalogConfig,
- @Nullable String whereSql) {
- super(sourceCatalogConfig, targetCatalogConfig);
- this.whereSql = whereSql;
+ /**
+ * Do not annotate with <code>@override</code> here to maintain
compatibility with Flink 1.18-.
+ */
+ public void open(OpenContext openContext) throws Exception {
+ open(new Configuration());
+ }
+
+ /**
+ * Do not annotate with <code>@override</code> here to maintain
compatibility with Flink 2.0+.
+ */
+ public void open(Configuration conf) throws Exception {
+ this.hiveCatalog =
+
getRootHiveCatalog(createPaimonCatalog(Options.fromMap(sourceCatalogConfig)));
+ this.targetCatalog =
createPaimonCatalog(Options.fromMap(targetCatalogConfig));
}
@Override
public void processElement(
Tuple2<Identifier, Identifier> tuple,
- ProcessFunction<Tuple2<Identifier, Identifier>,
CloneFileInfo>.Context context,
- Collector<CloneFileInfo> collector)
+ ProcessFunction<Tuple2<Identifier, Identifier>,
CloneSchemaInfo>.Context context,
+ Collector<CloneSchemaInfo> collector)
throws Exception {
String sourceType =
sourceCatalogConfig.get(CatalogOptions.METASTORE.key());
checkNotNull(sourceType);
@@ -80,6 +101,12 @@ public class ListCloneFilesFunction
Schema schema = HiveCloneUtils.hiveTableToPaimonSchema(hiveCatalog,
tuple.f0);
Map<String, String> options = schema.options();
+
+ // check support clone splits
+ boolean supportCloneSplits =
+
Boolean.parseBoolean(options.get(HiveCloneUtils.SUPPORT_CLONE_SPLITS));
+ options.remove(HiveCloneUtils.SUPPORT_CLONE_SPLITS);
+
// only support Hive to unaware-bucket table now
options.put(CoreOptions.BUCKET.key(), "-1");
schema =
@@ -106,25 +133,8 @@ public class ListCloneFilesFunction
targetCatalog.createTable(tuple.f1, schema, false);
}
- FileStoreTable table = (FileStoreTable)
targetCatalog.getTable(tuple.f1);
- PartitionPredicate predicate =
- getPartitionPredicate(whereSql,
table.schema().logicalPartitionType(), tuple.f0);
-
- try {
- List<HivePartitionFiles> allPartitions =
- HiveCloneUtils.listFiles(
- hiveCatalog,
- tuple.f0,
- table.schema().logicalPartitionType(),
- table.coreOptions().partitionDefaultName(),
- predicate);
- for (HivePartitionFiles partitionFiles : allPartitions) {
- CloneFileInfo.fromHive(tuple.f1,
partitionFiles).forEach(collector::collect);
- }
- } catch (Exception e) {
- throw new Exception(
- "Failed to list clone files for table " +
tuple.f0.getFullName(), e);
- }
+ CloneSchemaInfo schemaInfo = new CloneSchemaInfo(tuple,
supportCloneSplits);
+ collector.collect(schemaInfo);
}
private void checkCompatible(Schema sourceSchema, FileStoreTable
existedTable) {
@@ -167,26 +177,4 @@ public class ListCloneFilesFunction
&& new
HashSet<>(existedPartitionFields).containsAll(sourcePartitionFields),
"source table partition keys is not compatible with existed
paimon table partition keys.");
}
-
- @Nullable
- public static PartitionPredicate getPartitionPredicate(
- @Nullable String whereSql, RowType partitionType, Identifier
tableId) throws Exception {
- if (whereSql == null) {
- return null;
- }
-
- SimpleSqlPredicateConvertor simpleSqlPredicateConvertor =
- new SimpleSqlPredicateConvertor(partitionType);
- try {
- Predicate predicate =
simpleSqlPredicateConvertor.convertSqlToPredicate(whereSql);
- return PartitionPredicate.fromPredicate(partitionType, predicate);
- } catch (Exception e) {
- throw new RuntimeException(
- "Failed to parse partition filter sql '"
- + whereSql
- + "' for table "
- + tableId.getFullName(),
- e);
- }
- }
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/spits/ListCloneSplitsFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/schema/ClonePaimonSchemaFunction.java
similarity index 77%
copy from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/spits/ListCloneSplitsFunction.java
copy to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/schema/ClonePaimonSchemaFunction.java
index 9fdb5226b9..23b16560e5 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/spits/ListCloneSplitsFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/schema/ClonePaimonSchemaFunction.java
@@ -16,16 +16,13 @@
* limitations under the License.
*/
-package org.apache.paimon.flink.clone.spits;
+package org.apache.paimon.flink.clone.schema;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.options.Options;
-import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.Table;
-import org.apache.paimon.table.source.Split;
-import org.apache.paimon.table.source.TableScan;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -33,37 +30,29 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
-import javax.annotation.Nullable;
-
-import java.util.List;
import java.util.Map;
import static org.apache.paimon.CoreOptions.BUCKET;
import static org.apache.paimon.CoreOptions.BUCKET_KEY;
import static org.apache.paimon.CoreOptions.PATH;
import static org.apache.paimon.flink.FlinkCatalogFactory.createPaimonCatalog;
-import static org.apache.paimon.flink.clone.files.ListCloneFilesFunction.getPartitionPredicate;
/** List splits for table. */
-public class ListCloneSplitsFunction
- extends ProcessFunction<Tuple2<Identifier, Identifier>,
CloneSplitInfo> {
+public class ClonePaimonSchemaFunction
+ extends ProcessFunction<Tuple2<Identifier, Identifier>,
CloneSchemaInfo> {
private static final long serialVersionUID = 1L;
private final Map<String, String> sourceCatalogConfig;
private final Map<String, String> targetCatalogConfig;
- @Nullable private final String whereSql;
private transient Catalog sourceCatalog;
private transient Catalog targetCatalog;
- public ListCloneSplitsFunction(
- Map<String, String> sourceCatalogConfig,
- Map<String, String> targetCatalogConfig,
- @Nullable String whereSql) {
+ public ClonePaimonSchemaFunction(
+ Map<String, String> sourceCatalogConfig, Map<String, String>
targetCatalogConfig) {
this.sourceCatalogConfig = sourceCatalogConfig;
this.targetCatalogConfig = targetCatalogConfig;
- this.whereSql = whereSql;
}
/**
@@ -84,8 +73,8 @@ public class ListCloneSplitsFunction
@Override
public void processElement(
Tuple2<Identifier, Identifier> tuple,
- ProcessFunction<Tuple2<Identifier, Identifier>,
CloneSplitInfo>.Context context,
- Collector<CloneSplitInfo> collector)
+ ProcessFunction<Tuple2<Identifier, Identifier>,
CloneSchemaInfo>.Context context,
+ Collector<CloneSchemaInfo> collector)
throws Exception {
// create database if not exists
@@ -124,16 +113,18 @@ public class ListCloneSplitsFunction
targetCatalog.createTable(tuple.f1, builder.build(), true);
- PartitionPredicate predicate =
- getPartitionPredicate(
- whereSql,
-
sourceTable.rowType().project(sourceTable.partitionKeys()),
- tuple.f0);
- TableScan scan =
sourceTable.newReadBuilder().withPartitionFilter(predicate).newScan();
- List<Split> splits = scan.plan().splits();
- for (Split split : splits) {
- CloneSplitInfo splitInfo = new CloneSplitInfo(tuple.f0, tuple.f1,
split);
- collector.collect(splitInfo);
+ CloneSchemaInfo cloneSchemaInfo = new CloneSchemaInfo(tuple, true);
+ collector.collect(cloneSchemaInfo);
+ }
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+ if (sourceCatalog != null) {
+ this.sourceCatalog.close();
+ }
+ if (targetCatalog != null) {
+ this.targetCatalog.close();
}
}
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/schema/CloneSchemaInfo.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/schema/CloneSchemaInfo.java
new file mode 100644
index 0000000000..51f924422e
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/schema/CloneSchemaInfo.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.clone.schema;
+
+import org.apache.paimon.catalog.Identifier;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+
+/** Clone hive table schema with necessary information. */
+public class CloneSchemaInfo {
+
+ private static final long serialVersionUID = 1L;
+
+ private final Tuple2<Identifier, Identifier> identifierTuple;
+ private final boolean supportCloneSplits;
+
+ public CloneSchemaInfo(
+ Tuple2<Identifier, Identifier> identifierTuple, boolean
supportCloneSplits) {
+ this.identifierTuple = identifierTuple;
+ this.supportCloneSplits = supportCloneSplits;
+ }
+
+ public Tuple2<Identifier, Identifier> identifierTuple() {
+ return identifierTuple;
+ }
+
+ public boolean supportCloneSplits() {
+ return supportCloneSplits;
+ }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/spits/ListCloneSplitsFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/spits/ListCloneSplitsFunction.java
index 9fdb5226b9..f364c43800 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/spits/ListCloneSplitsFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/spits/ListCloneSplitsFunction.java
@@ -20,9 +20,9 @@ package org.apache.paimon.flink.clone.spits;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.clone.schema.CloneSchemaInfo;
import org.apache.paimon.options.Options;
import org.apache.paimon.partition.PartitionPredicate;
-import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableScan;
@@ -38,15 +38,11 @@ import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
-import static org.apache.paimon.CoreOptions.BUCKET;
-import static org.apache.paimon.CoreOptions.BUCKET_KEY;
-import static org.apache.paimon.CoreOptions.PATH;
import static org.apache.paimon.flink.FlinkCatalogFactory.createPaimonCatalog;
import static org.apache.paimon.flink.clone.files.ListCloneFilesFunction.getPartitionPredicate;
/** List splits for table. */
-public class ListCloneSplitsFunction
- extends ProcessFunction<Tuple2<Identifier, Identifier>,
CloneSplitInfo> {
+public class ListCloneSplitsFunction extends ProcessFunction<CloneSchemaInfo,
CloneSplitInfo> {
private static final long serialVersionUID = 1L;
@@ -83,47 +79,13 @@ public class ListCloneSplitsFunction
@Override
public void processElement(
- Tuple2<Identifier, Identifier> tuple,
- ProcessFunction<Tuple2<Identifier, Identifier>,
CloneSplitInfo>.Context context,
+ CloneSchemaInfo cloneSchemaInfo,
+ ProcessFunction<CloneSchemaInfo, CloneSplitInfo>.Context context,
Collector<CloneSplitInfo> collector)
throws Exception {
-
- // create database if not exists
- targetCatalog.createDatabase(tuple.f1.getDatabaseName(), true);
+ Tuple2<Identifier, Identifier> tuple =
cloneSchemaInfo.identifierTuple();
Table sourceTable = sourceCatalog.getTable(tuple.f0);
- Schema.Builder builder = Schema.newBuilder();
- sourceTable
- .rowType()
- .getFields()
- .forEach(
- f -> builder.column(f.name(), f.type(),
f.description(), f.defaultValue()));
- builder.partitionKeys(sourceTable.partitionKeys());
- builder.primaryKey(sourceTable.primaryKeys());
- sourceTable
- .options()
- .forEach(
- (k, v) -> {
- if (k.equalsIgnoreCase(BUCKET.key())
- || k.equalsIgnoreCase(PATH.key())) {
- return;
- }
- builder.option(k, v);
- });
-
- if (sourceTable.primaryKeys().isEmpty()) {
- // for append table with bucket
- if (sourceTable.options().containsKey(BUCKET_KEY.key())) {
- builder.option(BUCKET.key(),
sourceTable.options().get(BUCKET.key()));
- builder.option(BUCKET_KEY.key(),
sourceTable.options().get(BUCKET_KEY.key()));
- }
- } else {
- // for primary key table, only postpone bucket supports clone
- builder.option(BUCKET.key(), "-2");
- }
-
- targetCatalog.createTable(tuple.f1, builder.build(), true);
-
PartitionPredicate predicate =
getPartitionPredicate(
whereSql,
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/clone/HiveCloneExtractor.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/clone/HiveCloneExtractor.java
index 51ec34bced..c8573f1a0e 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/clone/HiveCloneExtractor.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/clone/HiveCloneExtractor.java
@@ -57,6 +57,8 @@ public interface HiveCloneExtractor {
Map<String, String> extractOptions(Table table);
+ boolean supportCloneSplits(String format);
+
List<HiveCloneExtractor> EXTRACTORS =
FactoryUtil.discoverFactories(
HiveCloneExtractor.class.getClassLoader(),
HiveCloneExtractor.class);
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/clone/HiveCloneUtils.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/clone/HiveCloneUtils.java
index 433d536347..2672aaa450 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/clone/HiveCloneUtils.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/clone/HiveCloneUtils.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.PrimaryKeysRequest;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.slf4j.Logger;
@@ -56,6 +57,8 @@ public class HiveCloneUtils {
public static final Predicate<FileStatus> HIDDEN_PATH_FILTER =
p -> !p.getPath().getName().startsWith("_") &&
!p.getPath().getName().startsWith(".");
+ public static final String SUPPORT_CLONE_SPLITS = "support.clone.splits";
+
public static Map<String, String> getDatabaseOptions(
HiveCatalog hiveCatalog, String databaseName) throws Exception {
IMetaStoreClient client = hiveCatalog.getHmsClient();
@@ -186,14 +189,28 @@ public class HiveCloneUtils {
predicate);
}
- private static String parseFormat(StorageDescriptor storageDescriptor) {
- String serder = storageDescriptor.getSerdeInfo().toString();
- if (serder.contains("avro")) {
+ private static String parseFormat(StorageDescriptor sd) {
+ SerDeInfo serdeInfo = sd.getSerdeInfo();
+ if (serdeInfo == null) {
+ return null;
+ }
+ String serLib =
+ serdeInfo.getSerializationLib() == null
+ ? ""
+ : serdeInfo.getSerializationLib().toLowerCase();
+ String inputFormat = sd.getInputFormat() == null ? "" :
sd.getInputFormat();
+ if (serLib.contains("avro")) {
return "avro";
- } else if (serder.contains("parquet")) {
+ } else if (serLib.contains("parquet")) {
return "parquet";
- } else if (serder.contains("orc")) {
+ } else if (serLib.contains("orc")) {
return "orc";
+ } else if (inputFormat.contains("Text")) {
+ if (serLib.contains("json")) {
+ return "json";
+ } else {
+ return "csv";
+ }
}
return null;
}
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/clone/HiveTableCloneExtractor.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/clone/HiveTableCloneExtractor.java
index d363be73ad..0d3baf2942 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/clone/HiveTableCloneExtractor.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/clone/HiveTableCloneExtractor.java
@@ -26,6 +26,7 @@ import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
import org.apache.paimon.migrate.FileMetaUtils;
import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.table.FormatTable;
import org.apache.paimon.types.RowType;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
@@ -51,6 +52,7 @@ import java.util.stream.Collectors;
import static org.apache.paimon.CoreOptions.FILE_COMPRESSION;
import static org.apache.paimon.CoreOptions.FILE_FORMAT;
import static org.apache.paimon.hive.clone.HiveCloneUtils.HIDDEN_PATH_FILTER;
+import static org.apache.paimon.hive.clone.HiveCloneUtils.SUPPORT_CLONE_SPLITS;
import static org.apache.paimon.hive.clone.HiveCloneUtils.parseFormat;
/** A {@link HiveCloneExtractor} for hive tables. */
@@ -110,6 +112,12 @@ public class HiveTableCloneExtractor implements HiveCloneExtractor {
}
String format = parseFormat(table);
+ if (supportCloneSplits(format)) {
+ Map<String, String> cloneSplitsOptions =
getOptionsWhenCloneSplits(table, format);
+ paimonOptions.putAll(cloneSplitsOptions);
+ return paimonOptions;
+ }
+
paimonOptions.put(FILE_FORMAT.key(), format);
Map<String, String> formatOptions = getIdentifierPrefixOptions(format,
hiveTableOptions);
Map<String, String> sdFormatOptions =
@@ -124,6 +132,16 @@ public class HiveTableCloneExtractor implements HiveCloneExtractor {
return paimonOptions;
}
+ @Override
+ public boolean supportCloneSplits(String format) {
+ for (FormatTable.Format supportFormat : FormatTable.Format.values()) {
+ if (supportFormat.name().equalsIgnoreCase(format)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
private static List<HivePartitionFiles> listFromPureHiveTable(
IMetaStoreClient client,
Identifier identifier,
@@ -216,4 +234,18 @@ public class HiveTableCloneExtractor implements HiveCloneExtractor {
}
return result;
}
+
+ public static Map<String, String> getOptionsWhenCloneSplits(Table table,
String format) {
+ Map<String, String> result = new HashMap<>();
+ if (FormatTable.Format.JSON.name().equalsIgnoreCase(format)) {
+ result.put(FILE_FORMAT.key(),
FormatTable.Format.PARQUET.name().toLowerCase());
+ } else if (FormatTable.Format.CSV.name().equalsIgnoreCase(format)) {
+ result.put(FILE_FORMAT.key(),
FormatTable.Format.PARQUET.name().toLowerCase());
+ } else {
+ result.put(FILE_FORMAT.key(), format);
+ }
+ // only for clone
+ result.put(SUPPORT_CLONE_SPLITS, "true");
+ return result;
+ }
}
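
The net effect of the extractor changes above: for a Hive table stored as csv or json, supportCloneSplits(format) returns true and the clone writes data into the target Paimon table as parquet instead of copying the source files verbatim. A condensed, illustrative reading of getOptionsWhenCloneSplits (the variable hiveTable is hypothetical; option keys are the ones defined in the hunks above):

    Map<String, String> options = getOptionsWhenCloneSplits(hiveTable, "json");
    // options now contains:
    //   "file.format"          -> "parquet"  (csv and json sources are rewritten as parquet)
    //   "support.clone.splits" -> "true"     (read and removed again by CloneHiveSchemaFunction
    //                                         before the target Paimon table is created)
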
diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/CloneActionITCase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/CloneActionITCase.java
index d6ff369c1b..9c49ee92d5 100644
--- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/CloneActionITCase.java
+++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/CloneActionITCase.java
@@ -826,6 +826,126 @@ public class CloneActionITCase extends ActionITCaseBase {
Assertions.assertThatList(actualDB2Tables).containsExactlyInAnyOrderElementsOf(db2Tables);
}
+ @Test
+ public void testMigrateCsvTable() throws Exception {
+ String format = "textfile";
+ String dbName = "hivedb" + StringUtils.randomNumericString(10);
+ String tableName = "hivetable" + StringUtils.randomNumericString(10);
+
+ TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
+ tEnv.executeSql("CREATE CATALOG HIVE WITH ('type'='hive')");
+ tEnv.useCatalog("HIVE");
+ tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
+ tEnv.executeSql("CREATE DATABASE " + dbName);
+ sql(
+ tEnv,
+ "CREATE TABLE %s.%s (id STRING) PARTITIONED BY (id2 INT, id3
INT) "
+ + "ROW FORMAT SERDE
'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' STORED AS %s ",
+ dbName,
+ tableName,
+ format);
+ sql(tEnv, "INSERT INTO %s.%s VALUES %s", dbName, tableName, data(100));
+
+ tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
+ tEnv.executeSql("CREATE CATALOG PAIMON_GE WITH
('type'='paimon-generic')");
+ tEnv.useCatalog("PAIMON_GE");
+
+ List<Row> r1 = sql(tEnv, "SELECT * FROM %s.%s", dbName, tableName);
+
+ tEnv.executeSql(
+ "CREATE CATALOG PAIMON WITH ('type'='paimon', 'warehouse' = '"
+ warehouse + "')");
+ tEnv.useCatalog("PAIMON");
+ tEnv.executeSql("CREATE DATABASE test");
+
+ List<String> args =
+ new ArrayList<>(
+ Arrays.asList(
+ "clone",
+ "--database",
+ dbName,
+ "--table",
+ tableName,
+ "--catalog_conf",
+ "metastore=hive",
+ "--catalog_conf",
+ "uri=thrift://localhost:" + PORT,
+ "--target_database",
+ "test",
+ "--target_table",
+ "test_table",
+ "--target_catalog_conf",
+ "warehouse=" + warehouse));
+
+ createAction(CloneAction.class, args).run();
+ FileStoreTable paimonTable =
+ paimonTable(tEnv, "PAIMON", Identifier.create("test",
"test_table"));
+
+ assertThat(paimonTable.partitionKeys()).containsExactly("id2", "id3");
+
+ List<Row> r2 = sql(tEnv, "SELECT * FROM test.test_table");
+ Assertions.assertThatList(r1).containsExactlyInAnyOrderElementsOf(r2);
+ }
+
+ @Test
+ public void testMigrateJsonTable() throws Exception {
+ String format = "textfile";
+ String dbName = "hivedb" + StringUtils.randomNumericString(10);
+ String tableName = "hivetable" + StringUtils.randomNumericString(10);
+
+ TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
+ tEnv.executeSql("CREATE CATALOG HIVE WITH ('type'='hive')");
+ tEnv.useCatalog("HIVE");
+ tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
+ tEnv.executeSql("CREATE DATABASE " + dbName);
+ sql(
+ tEnv,
+ "CREATE TABLE %s.%s (id STRING) PARTITIONED BY (id2 INT, id3
INT)"
+ + "ROW FORMAT SERDE
'org.apache.hive.hcatalog.data.JsonSerDe' STORED AS %s ",
+ dbName,
+ tableName,
+ format);
+ sql(tEnv, "INSERT INTO %s.%s VALUES %s", dbName, tableName, data(100));
+
+ tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
+ tEnv.executeSql("CREATE CATALOG PAIMON_GE WITH
('type'='paimon-generic')");
+ tEnv.useCatalog("PAIMON_GE");
+
+ List<Row> r1 = sql(tEnv, "SELECT * FROM %s.%s", dbName, tableName);
+
+ tEnv.executeSql(
+ "CREATE CATALOG PAIMON WITH ('type'='paimon', 'warehouse' = '"
+ warehouse + "')");
+ tEnv.useCatalog("PAIMON");
+ tEnv.executeSql("CREATE DATABASE test");
+
+ List<String> args =
+ new ArrayList<>(
+ Arrays.asList(
+ "clone",
+ "--database",
+ dbName,
+ "--table",
+ tableName,
+ "--catalog_conf",
+ "metastore=hive",
+ "--catalog_conf",
+ "uri=thrift://localhost:" + PORT,
+ "--target_database",
+ "test",
+ "--target_table",
+ "test_table",
+ "--target_catalog_conf",
+ "warehouse=" + warehouse));
+
+ createAction(CloneAction.class, args).run();
+ FileStoreTable paimonTable =
+ paimonTable(tEnv, "PAIMON", Identifier.create("test",
"test_table"));
+
+ assertThat(paimonTable.partitionKeys()).containsExactly("id2", "id3");
+
+ List<Row> r2 = sql(tEnv, "SELECT * FROM test.test_table");
+ Assertions.assertThatList(r1).containsExactlyInAnyOrderElementsOf(r2);
+ }
+
private String[] ddls(String format) {
// has primary key
String ddl0 =
@@ -893,14 +1013,14 @@ public class CloneActionITCase extends ActionITCaseBase {
private String randomFormat() {
ThreadLocalRandom random = ThreadLocalRandom.current();
int i = random.nextInt(3);
- String[] formats = new String[] {"orc", "parquet", "avro"};
+ String[] formats = new String[] {"orc", "parquet", "avro", "textfile"};
return formats[i];
}
private String randomFormat(String excludedFormat) {
ThreadLocalRandom random = ThreadLocalRandom.current();
int i = random.nextInt(3);
- String[] formats = new String[] {"orc", "parquet", "avro"};
+ String[] formats = new String[] {"orc", "parquet", "avro", "textfile"};
if (Objects.equals(excludedFormat, formats[i])) {
return formats[(i + 1) % 3];
}
diff --git a/paimon-hudi/src/main/java/org/apache/paimon/hudi/HudiHiveCloneExtractor.java b/paimon-hudi/src/main/java/org/apache/paimon/hudi/HudiHiveCloneExtractor.java
index 9eacb127bf..bd353a240b 100644
--- a/paimon-hudi/src/main/java/org/apache/paimon/hudi/HudiHiveCloneExtractor.java
+++ b/paimon-hudi/src/main/java/org/apache/paimon/hudi/HudiHiveCloneExtractor.java
@@ -112,6 +112,11 @@ public class HudiHiveCloneExtractor extends HiveTableCloneExtractor {
}
}
+ @Override
+ public boolean supportCloneSplits(String format) {
+ return false;
+ }
+
private static void checkTableType(Map<String, String> conf) {
String type = conf.getOrDefault("table.type",
HoodieTableType.COPY_ON_WRITE.name());
checkArgument(
diff --git a/paimon-iceberg/src/main/java/org/apache/paimon/iceberg/IcebergHiveCloneExtractor.java b/paimon-iceberg/src/main/java/org/apache/paimon/iceberg/IcebergHiveCloneExtractor.java
index cea2948f14..920dfcf3df 100644
--- a/paimon-iceberg/src/main/java/org/apache/paimon/iceberg/IcebergHiveCloneExtractor.java
+++ b/paimon-iceberg/src/main/java/org/apache/paimon/iceberg/IcebergHiveCloneExtractor.java
@@ -194,6 +194,11 @@ public class IcebergHiveCloneExtractor extends HiveTableCloneExtractor {
return paimonOptions;
}
+ @Override
+ public boolean supportCloneSplits(String format) {
+ return false;
+ }
+
private HivePartitionFiles toHivePartitionFiles(List<DataFile> dataFiles,
BinaryRow partition) {
List<org.apache.paimon.fs.Path> paths = new
ArrayList<>(dataFiles.size());
List<Long> fileSizes = new ArrayList<>(dataFiles.size());