This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 56a97e91aa [clone] support specifying target prefer file format (#6273)
56a97e91aa is described below

commit 56a97e91aa09cc1f07ebd4d144f733920136c334
Author: shyjsarah <[email protected]>
AuthorDate: Wed Sep 17 13:17:12 2025 +0800

    [clone] support specifying target prefer file format (#6273)
---
 .../apache/paimon/flink/action/CloneAction.java    | 17 +++++-
 .../paimon/flink/action/CloneActionFactory.java    |  4 +-
 .../paimon/flink/clone/CloneFileFormatUtils.java   | 44 +++++++++++++++
 .../paimon/flink/clone/CloneHiveTableUtils.java    |  5 +-
 .../paimon/flink/clone/ClonePaimonTableUtils.java  |  5 +-
 .../clone/schema/CloneHiveSchemaFunction.java      | 14 ++++-
 .../clone/schema/ClonePaimonSchemaFunction.java    | 12 +++-
 .../paimon/flink/procedure/CloneProcedure.java     |  6 ++
 .../paimon/hive/procedure/CloneActionITCase.java   | 64 ++++++++++++++++++++++
 9 files changed, 162 insertions(+), 9 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneAction.java
index 920d8414ef..0280e9badf 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneAction.java
@@ -20,9 +20,11 @@ package org.apache.paimon.flink.action;
 
 import org.apache.paimon.catalog.CachingCatalog;
 import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.flink.clone.CloneFileFormatUtils;
 import org.apache.paimon.flink.clone.CloneHiveTableUtils;
 import org.apache.paimon.flink.clone.ClonePaimonTableUtils;
 import org.apache.paimon.hive.HiveCatalog;
+import org.apache.paimon.utils.StringUtils;
 
 import javax.annotation.Nullable;
 
@@ -44,6 +46,7 @@ public class CloneAction extends ActionBase {
     @Nullable private final String whereSql;
     @Nullable private final List<String> includedTables;
     @Nullable private final List<String> excludedTables;
+    @Nullable private final String preferFileFormat;
     private final String cloneFrom;
 
     public CloneAction(
@@ -57,6 +60,7 @@ public class CloneAction extends ActionBase {
             @Nullable String whereSql,
             @Nullable List<String> includedTables,
             @Nullable List<String> excludedTables,
+            @Nullable String preferFileFormat,
             String cloneFrom) {
         super(sourceCatalogConfig);
 
@@ -84,6 +88,11 @@ public class CloneAction extends ActionBase {
         this.whereSql = whereSql;
         this.includedTables = includedTables;
         this.excludedTables = excludedTables;
+        CloneFileFormatUtils.validateFileFormat(preferFileFormat);
+        this.preferFileFormat =
+                StringUtils.isNullOrWhitespaceOnly(preferFileFormat)
+                        ? preferFileFormat
+                        : preferFileFormat.toLowerCase();
         this.cloneFrom = cloneFrom;
     }
 
@@ -103,7 +112,8 @@ public class CloneAction extends ActionBase {
                         parallelism,
                         whereSql,
                         includedTables,
-                        excludedTables);
+                        excludedTables,
+                        preferFileFormat);
                 break;
             case "paimon":
                 ClonePaimonTableUtils.build(
@@ -118,7 +128,8 @@ public class CloneAction extends ActionBase {
                         parallelism,
                         whereSql,
                         includedTables,
-                        excludedTables);
+                        excludedTables,
+                        preferFileFormat);
                 break;
         }
     }
@@ -128,4 +139,6 @@ public class CloneAction extends ActionBase {
         build();
         execute("Clone job");
     }
+
+    private void validateFileFormat(String preferFileFormat) {}
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneActionFactory.java
index 5c25d7583c..c0258ec70f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneActionFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneActionFactory.java
@@ -38,6 +38,7 @@ public class CloneActionFactory implements ActionFactory {
     private static final String WHERE = "where";
     private static final String INCLUDED_TABLES = "included_tables";
     private static final String EXCLUDED_TABLES = "excluded_tables";
+    private static final String PREFER_FILE_FORMAT = "prefer_file_format";
     private static final String CLONE_FROM = "clone_from";
 
     @Override
@@ -74,7 +75,7 @@ public class CloneActionFactory implements ActionFactory {
         if (StringUtils.isNullOrWhitespaceOnly(cloneFrom)) {
             cloneFrom = "hive";
         }
-
+        String preferFileFormat = params.get(PREFER_FILE_FORMAT);
         CloneAction cloneAction =
                 new CloneAction(
                         params.get(DATABASE),
@@ -87,6 +88,7 @@ public class CloneActionFactory implements ActionFactory {
                         params.get(WHERE),
                         includedTables,
                         excludedTables,
+                        preferFileFormat,
                         cloneFrom);
 
         return Optional.of(cloneAction);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneFileFormatUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneFileFormatUtils.java
new file mode 100644
index 0000000000..b1c73fce7d
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneFileFormatUtils.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.clone;
+
+import org.apache.paimon.flink.action.CloneAction;
+import org.apache.paimon.utils.StringUtils;
+
+/** Utils for file format in {@link CloneAction}. */
+public class CloneFileFormatUtils {
+
+    public static void validateFileFormat(String fileFormat) {
+        if (StringUtils.isNullOrWhitespaceOnly(fileFormat)) {
+            return;
+        }
+        String fileFormatLower = fileFormat.toLowerCase();
+        String[] supportedFileFormat = new String[] {"parquet", "orc", "avro"};
+        for (String supportedFormat : supportedFileFormat) {
+            if (fileFormatLower.equals(supportedFormat)) {
+                return;
+            }
+        }
+        throw new IllegalArgumentException(
+                "Unsupported file format: "
+                        + fileFormat
+                        + ". Supported file formats are: "
+                        + String.join(", ", supportedFileFormat));
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneHiveTableUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneHiveTableUtils.java
index fae777f579..c32c9b53c5 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneHiveTableUtils.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneHiveTableUtils.java
@@ -155,7 +155,8 @@ public class CloneHiveTableUtils {
             int parallelism,
             @Nullable String whereSql,
             @Nullable List<String> includedTables,
-            @Nullable List<String> excludedTables)
+            @Nullable List<String> excludedTables,
+            @Nullable String preferFileFormat)
             throws Exception {
         // list source tables
         DataStream<Tuple2<Identifier, Identifier>> source =
@@ -178,7 +179,7 @@ public class CloneHiveTableUtils {
                 partitionedSource
                         .process(
                                 new CloneHiveSchemaFunction(
-                                        sourceCatalogConfig, targetCatalogConfig))
+                                        sourceCatalogConfig, targetCatalogConfig, preferFileFormat))
                         .name("Clone Schema")
                         .setParallelism(parallelism);
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/ClonePaimonTableUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/ClonePaimonTableUtils.java
index 5a66da2c66..a4984e69ed 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/ClonePaimonTableUtils.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/ClonePaimonTableUtils.java
@@ -137,7 +137,8 @@ public class ClonePaimonTableUtils {
             int parallelism,
             @Nullable String whereSql,
             @Nullable List<String> includedTables,
-            @Nullable List<String> excludedTables)
+            @Nullable List<String> excludedTables,
+            @Nullable String preferFileFormat)
             throws Exception {
         // list source tables
         DataStream<Tuple2<Identifier, Identifier>> source =
@@ -160,7 +161,7 @@ public class ClonePaimonTableUtils {
                 partitionedSource
                         .process(
                                 new ClonePaimonSchemaFunction(
-                                        sourceCatalogConfig, targetCatalogConfig))
+                                        sourceCatalogConfig, targetCatalogConfig, preferFileFormat))
                         .name("Clone Schema")
                         .setParallelism(parallelism);
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/schema/CloneHiveSchemaFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/schema/CloneHiveSchemaFunction.java
index a1d7f374f3..1dceffba4a 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/schema/CloneHiveSchemaFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/schema/CloneHiveSchemaFunction.java
@@ -29,6 +29,7 @@ import org.apache.paimon.schema.Schema;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.types.DataField;
+import org.apache.paimon.utils.StringUtils;
 
 import org.apache.flink.api.common.functions.OpenContext;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -38,6 +39,8 @@ import org.apache.flink.util.Collector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -59,14 +62,18 @@ public class CloneHiveSchemaFunction
 
     protected final Map<String, String> sourceCatalogConfig;
     protected final Map<String, String> targetCatalogConfig;
+    @Nullable protected final String preferFileFormat;
 
     protected transient HiveCatalog hiveCatalog;
     protected transient Catalog targetCatalog;
 
     public CloneHiveSchemaFunction(
-            Map<String, String> sourceCatalogConfig, Map<String, String> targetCatalogConfig) {
+            Map<String, String> sourceCatalogConfig,
+            Map<String, String> targetCatalogConfig,
+            @Nullable String preferFileFormat) {
         this.sourceCatalogConfig = sourceCatalogConfig;
         this.targetCatalogConfig = targetCatalogConfig;
+        this.preferFileFormat = preferFileFormat;
     }
 
     /**
@@ -106,6 +113,11 @@ public class CloneHiveSchemaFunction
         boolean supportCloneSplits =
                 Boolean.parseBoolean(options.get(HiveCloneUtils.SUPPORT_CLONE_SPLITS));
         options.remove(HiveCloneUtils.SUPPORT_CLONE_SPLITS);
+        if (supportCloneSplits) {
+            if (!StringUtils.isNullOrWhitespaceOnly(preferFileFormat)) {
+                options.put(CoreOptions.FILE_FORMAT.key(), preferFileFormat);
+            }
+        }
 
         // only support Hive to unaware-bucket table now
         options.put(CoreOptions.BUCKET.key(), "-1");
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/schema/ClonePaimonSchemaFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/schema/ClonePaimonSchemaFunction.java
index 23b16560e5..73b560f62b 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/schema/ClonePaimonSchemaFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/schema/ClonePaimonSchemaFunction.java
@@ -18,11 +18,13 @@
 
 package org.apache.paimon.flink.clone.schema;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.table.Table;
+import org.apache.paimon.utils.StringUtils;
 
 import org.apache.flink.api.common.functions.OpenContext;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -45,14 +47,18 @@ public class ClonePaimonSchemaFunction
 
     private final Map<String, String> sourceCatalogConfig;
     private final Map<String, String> targetCatalogConfig;
+    private final String preferFileFormat;
 
     private transient Catalog sourceCatalog;
     private transient Catalog targetCatalog;
 
     public ClonePaimonSchemaFunction(
-            Map<String, String> sourceCatalogConfig, Map<String, String> targetCatalogConfig) {
+            Map<String, String> sourceCatalogConfig,
+            Map<String, String> targetCatalogConfig,
+            String preferFileFormat) {
         this.sourceCatalogConfig = sourceCatalogConfig;
         this.targetCatalogConfig = targetCatalogConfig;
+        this.preferFileFormat = preferFileFormat;
     }
 
     /**
@@ -111,6 +117,10 @@ public class ClonePaimonSchemaFunction
             builder.option(BUCKET.key(), "-2");
         }
 
+        if (!StringUtils.isNullOrWhitespaceOnly(preferFileFormat)) {
+            builder.option(CoreOptions.FILE_FORMAT.key(), preferFileFormat);
+        }
+
         targetCatalog.createTable(tuple.f1, builder.build(), true);
 
         CloneSchemaInfo cloneSchemaInfo = new CloneSchemaInfo(tuple, true);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CloneProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CloneProcedure.java
index 67d06d3187..1f60707c7c 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CloneProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CloneProcedure.java
@@ -66,6 +66,10 @@ public class CloneProcedure extends ProcedureBase {
                         name = "excluded_tables",
                         type = @DataTypeHint("STRING"),
                         isOptional = true),
+                @ArgumentHint(
+                        name = "prefer_file_format",
+                        type = @DataTypeHint("STRING"),
+                        isOptional = true),
                 @ArgumentHint(
                         name = "clone_from",
                         type = @DataTypeHint("STRING"),
@@ -83,6 +87,7 @@ public class CloneProcedure extends ProcedureBase {
             String where,
             String includedTablesStr,
             String excludedTablesStr,
+            String preferFileFormat,
             String cloneFrom)
             throws Exception {
         Map<String, String> sourceCatalogConfig =
@@ -112,6 +117,7 @@ public class CloneProcedure extends ProcedureBase {
                         where,
                         includedTables,
                         excludedTables,
+                        preferFileFormat,
                         cloneFrom);
         return execute(procedureContext, action, "Clone Job");
     }
diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/CloneActionITCase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/CloneActionITCase.java
index 9c49ee92d5..76dc132f64 100644
--- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/CloneActionITCase.java
+++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/CloneActionITCase.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.hive.procedure;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
@@ -53,6 +54,7 @@ import java.util.stream.Collectors;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatCode;
 import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
+import static org.junit.Assert.assertEquals;
 
 /** Tests for {@link CloneAction}. */
 public class CloneActionITCase extends ActionITCaseBase {
@@ -946,6 +948,68 @@ public class CloneActionITCase extends ActionITCaseBase {
         Assertions.assertThatList(r1).containsExactlyInAnyOrderElementsOf(r2);
     }
 
+    @Test
+    public void testMigrateWithPreferFileFormat() throws Exception {
+        String format = "orc";
+        String preferFileFormat = "parquet";
+        String dbName = "hivedb" + StringUtils.randomNumericString(10);
+        String tableName = "hivetable" + StringUtils.randomNumericString(10);
+
+        TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
+        tEnv.executeSql("CREATE CATALOG HIVE WITH ('type'='hive')");
+        tEnv.useCatalog("HIVE");
+        tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
+        tEnv.executeSql("CREATE DATABASE " + dbName);
+        sql(
+                tEnv,
+                "CREATE TABLE %s.%s (id STRING) PARTITIONED BY (id2 INT, id3 INT)"
+                        + "STORED AS %s ",
+                dbName,
+                tableName,
+                format);
+        sql(tEnv, "INSERT INTO %s.%s VALUES %s", dbName, tableName, data(100));
+
+        tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
+        tEnv.executeSql("CREATE CATALOG PAIMON_GE WITH ('type'='paimon-generic')");
+        tEnv.useCatalog("PAIMON_GE");
+
+        List<Row> r1 = sql(tEnv, "SELECT * FROM %s.%s", dbName, tableName);
+
+        tEnv.executeSql(
+                "CREATE CATALOG PAIMON WITH ('type'='paimon', 'warehouse' = '" + warehouse + "')");
+        tEnv.useCatalog("PAIMON");
+        tEnv.executeSql("CREATE DATABASE test");
+
+        List<String> args =
+                new ArrayList<>(
+                        Arrays.asList(
+                                "clone",
+                                "--database",
+                                dbName,
+                                "--table",
+                                tableName,
+                                "--catalog_conf",
+                                "metastore=hive",
+                                "--catalog_conf",
+                                "uri=thrift://localhost:" + PORT,
+                                "--target_database",
+                                "test",
+                                "--target_table",
+                                "test_table",
+                                "--target_catalog_conf",
+                                "warehouse=" + warehouse,
+                                "--prefer_file_format",
+                                preferFileFormat));
+
+        createAction(CloneAction.class, args).run();
+        FileStoreTable paimonTable =
+                paimonTable(tEnv, "PAIMON", Identifier.create("test", "test_table"));
+        assertEquals(paimonTable.options().get(CoreOptions.FILE_FORMAT.key()), preferFileFormat);
+
+        List<Row> r2 = sql(tEnv, "SELECT * FROM test.test_table");
+        Assertions.assertThatList(r1).containsExactlyInAnyOrderElementsOf(r2);
+    }
+
     private String[] ddls(String format) {
         // has primary key
         String ddl0 =

Reply via email to