This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 56a97e91aa [clone] support specifying target prefer file format (#6273)
56a97e91aa is described below
commit 56a97e91aa09cc1f07ebd4d144f733920136c334
Author: shyjsarah <[email protected]>
AuthorDate: Wed Sep 17 13:17:12 2025 +0800
[clone] support specifying target prefer file format (#6273)
---
.../apache/paimon/flink/action/CloneAction.java | 17 +++++-
.../paimon/flink/action/CloneActionFactory.java | 4 +-
.../paimon/flink/clone/CloneFileFormatUtils.java | 44 +++++++++++++++
.../paimon/flink/clone/CloneHiveTableUtils.java | 5 +-
.../paimon/flink/clone/ClonePaimonTableUtils.java | 5 +-
.../clone/schema/CloneHiveSchemaFunction.java | 14 ++++-
.../clone/schema/ClonePaimonSchemaFunction.java | 12 +++-
.../paimon/flink/procedure/CloneProcedure.java | 6 ++
.../paimon/hive/procedure/CloneActionITCase.java | 64 ++++++++++++++++++++++
9 files changed, 162 insertions(+), 9 deletions(-)
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneAction.java
index 920d8414ef..0280e9badf 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneAction.java
@@ -20,9 +20,11 @@ package org.apache.paimon.flink.action;
import org.apache.paimon.catalog.CachingCatalog;
import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.flink.clone.CloneFileFormatUtils;
import org.apache.paimon.flink.clone.CloneHiveTableUtils;
import org.apache.paimon.flink.clone.ClonePaimonTableUtils;
import org.apache.paimon.hive.HiveCatalog;
+import org.apache.paimon.utils.StringUtils;
import javax.annotation.Nullable;
@@ -44,6 +46,7 @@ public class CloneAction extends ActionBase {
@Nullable private final String whereSql;
@Nullable private final List<String> includedTables;
@Nullable private final List<String> excludedTables;
+ @Nullable private final String preferFileFormat;
private final String cloneFrom;
public CloneAction(
@@ -57,6 +60,7 @@ public class CloneAction extends ActionBase {
@Nullable String whereSql,
@Nullable List<String> includedTables,
@Nullable List<String> excludedTables,
+ @Nullable String preferFileFormat,
String cloneFrom) {
super(sourceCatalogConfig);
@@ -84,6 +88,11 @@ public class CloneAction extends ActionBase {
this.whereSql = whereSql;
this.includedTables = includedTables;
this.excludedTables = excludedTables;
+ CloneFileFormatUtils.validateFileFormat(preferFileFormat);
+ this.preferFileFormat =
+ StringUtils.isNullOrWhitespaceOnly(preferFileFormat)
+ ? preferFileFormat
+ : preferFileFormat.toLowerCase();
this.cloneFrom = cloneFrom;
}
@@ -103,7 +112,8 @@ public class CloneAction extends ActionBase {
parallelism,
whereSql,
includedTables,
- excludedTables);
+ excludedTables,
+ preferFileFormat);
break;
case "paimon":
ClonePaimonTableUtils.build(
@@ -118,7 +128,8 @@ public class CloneAction extends ActionBase {
parallelism,
whereSql,
includedTables,
- excludedTables);
+ excludedTables,
+ preferFileFormat);
break;
}
}
@@ -128,4 +139,6 @@ public class CloneAction extends ActionBase {
build();
execute("Clone job");
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneActionFactory.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneActionFactory.java
index 5c25d7583c..c0258ec70f 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneActionFactory.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneActionFactory.java
@@ -38,6 +38,7 @@ public class CloneActionFactory implements ActionFactory {
private static final String WHERE = "where";
private static final String INCLUDED_TABLES = "included_tables";
private static final String EXCLUDED_TABLES = "excluded_tables";
+ private static final String PREFER_FILE_FORMAT = "prefer_file_format";
private static final String CLONE_FROM = "clone_from";
@Override
@@ -74,7 +75,7 @@ public class CloneActionFactory implements ActionFactory {
if (StringUtils.isNullOrWhitespaceOnly(cloneFrom)) {
cloneFrom = "hive";
}
-
+ String preferFileFormat = params.get(PREFER_FILE_FORMAT);
CloneAction cloneAction =
new CloneAction(
params.get(DATABASE),
@@ -87,6 +88,7 @@ public class CloneActionFactory implements ActionFactory {
params.get(WHERE),
includedTables,
excludedTables,
+ preferFileFormat,
cloneFrom);
return Optional.of(cloneAction);
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneFileFormatUtils.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneFileFormatUtils.java
new file mode 100644
index 0000000000..b1c73fce7d
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneFileFormatUtils.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.clone;
+
+import org.apache.paimon.flink.action.CloneAction;
+import org.apache.paimon.utils.StringUtils;
+
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Locale;
+
+/**
+ * Utils for validating the preferred target file format passed to {@link CloneAction}.
+ *
+ * <p>Validation is case-insensitive. It does not normalize the value: callers that forward the
+ * format as a table option should lower-case it themselves.
+ */
+public final class CloneFileFormatUtils {
+
+    /** File formats that may be requested as the preferred format of a cloned table. */
+    private static final List<String> SUPPORTED_FILE_FORMATS =
+            Collections.unmodifiableList(Arrays.asList("parquet", "orc", "avro"));
+
+    private CloneFileFormatUtils() {}
+
+    /**
+     * Checks that {@code fileFormat} names a supported file format.
+     *
+     * <p>A {@code null}, empty, or whitespace-only value means "no preference" and is accepted.
+     * The comparison ignores case and does not depend on the JVM default locale.
+     *
+     * @param fileFormat the requested file format, may be {@code null}
+     * @throws IllegalArgumentException if {@code fileFormat} is non-blank and is not one of the
+     *     supported formats
+     */
+    public static void validateFileFormat(@Nullable String fileFormat) {
+        if (StringUtils.isNullOrWhitespaceOnly(fileFormat)) {
+            return;
+        }
+        // Locale.ROOT keeps case folding independent of the default locale (e.g. Turkish 'I').
+        if (!SUPPORTED_FILE_FORMATS.contains(fileFormat.toLowerCase(Locale.ROOT))) {
+            throw new IllegalArgumentException(
+                    "Unsupported file format: "
+                            + fileFormat
+                            + ". Supported file formats are: "
+                            + String.join(", ", SUPPORTED_FILE_FORMATS));
+        }
+    }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneHiveTableUtils.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneHiveTableUtils.java
index fae777f579..c32c9b53c5 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneHiveTableUtils.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneHiveTableUtils.java
@@ -155,7 +155,8 @@ public class CloneHiveTableUtils {
int parallelism,
@Nullable String whereSql,
@Nullable List<String> includedTables,
- @Nullable List<String> excludedTables)
+ @Nullable List<String> excludedTables,
+ @Nullable String preferFileFormat)
throws Exception {
// list source tables
DataStream<Tuple2<Identifier, Identifier>> source =
@@ -178,7 +179,7 @@ public class CloneHiveTableUtils {
partitionedSource
.process(
new CloneHiveSchemaFunction(
- sourceCatalogConfig,
targetCatalogConfig))
+ sourceCatalogConfig,
targetCatalogConfig, preferFileFormat))
.name("Clone Schema")
.setParallelism(parallelism);
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/ClonePaimonTableUtils.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/ClonePaimonTableUtils.java
index 5a66da2c66..a4984e69ed 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/ClonePaimonTableUtils.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/ClonePaimonTableUtils.java
@@ -137,7 +137,8 @@ public class ClonePaimonTableUtils {
int parallelism,
@Nullable String whereSql,
@Nullable List<String> includedTables,
- @Nullable List<String> excludedTables)
+ @Nullable List<String> excludedTables,
+ @Nullable String preferFileFormat)
throws Exception {
// list source tables
DataStream<Tuple2<Identifier, Identifier>> source =
@@ -160,7 +161,7 @@ public class ClonePaimonTableUtils {
partitionedSource
.process(
new ClonePaimonSchemaFunction(
- sourceCatalogConfig,
targetCatalogConfig))
+ sourceCatalogConfig,
targetCatalogConfig, preferFileFormat))
.name("Clone Schema")
.setParallelism(parallelism);
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/schema/CloneHiveSchemaFunction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/schema/CloneHiveSchemaFunction.java
index a1d7f374f3..1dceffba4a 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/schema/CloneHiveSchemaFunction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/schema/CloneHiveSchemaFunction.java
@@ -29,6 +29,7 @@ import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.types.DataField;
+import org.apache.paimon.utils.StringUtils;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -38,6 +39,8 @@ import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -59,14 +62,18 @@ public class CloneHiveSchemaFunction
protected final Map<String, String> sourceCatalogConfig;
protected final Map<String, String> targetCatalogConfig;
+ @Nullable protected final String preferFileFormat;
protected transient HiveCatalog hiveCatalog;
protected transient Catalog targetCatalog;
public CloneHiveSchemaFunction(
- Map<String, String> sourceCatalogConfig, Map<String, String>
targetCatalogConfig) {
+ Map<String, String> sourceCatalogConfig,
+ Map<String, String> targetCatalogConfig,
+ @Nullable String preferFileFormat) {
this.sourceCatalogConfig = sourceCatalogConfig;
this.targetCatalogConfig = targetCatalogConfig;
+ this.preferFileFormat = preferFileFormat;
}
/**
@@ -106,6 +113,11 @@ public class CloneHiveSchemaFunction
boolean supportCloneSplits =
Boolean.parseBoolean(options.get(HiveCloneUtils.SUPPORT_CLONE_SPLITS));
options.remove(HiveCloneUtils.SUPPORT_CLONE_SPLITS);
+ if (supportCloneSplits) {
+ if (!StringUtils.isNullOrWhitespaceOnly(preferFileFormat)) {
+ options.put(CoreOptions.FILE_FORMAT.key(), preferFileFormat);
+ }
+ }
// only support Hive to unaware-bucket table now
options.put(CoreOptions.BUCKET.key(), "-1");
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/schema/ClonePaimonSchemaFunction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/schema/ClonePaimonSchemaFunction.java
index 23b16560e5..73b560f62b 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/schema/ClonePaimonSchemaFunction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/schema/ClonePaimonSchemaFunction.java
@@ -18,11 +18,13 @@
package org.apache.paimon.flink.clone.schema;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.Table;
+import org.apache.paimon.utils.StringUtils;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -45,14 +47,18 @@ public class ClonePaimonSchemaFunction
private final Map<String, String> sourceCatalogConfig;
private final Map<String, String> targetCatalogConfig;
+ private final String preferFileFormat;
private transient Catalog sourceCatalog;
private transient Catalog targetCatalog;
public ClonePaimonSchemaFunction(
- Map<String, String> sourceCatalogConfig, Map<String, String>
targetCatalogConfig) {
+ Map<String, String> sourceCatalogConfig,
+ Map<String, String> targetCatalogConfig,
+ String preferFileFormat) {
this.sourceCatalogConfig = sourceCatalogConfig;
this.targetCatalogConfig = targetCatalogConfig;
+ this.preferFileFormat = preferFileFormat;
}
/**
@@ -111,6 +117,10 @@ public class ClonePaimonSchemaFunction
builder.option(BUCKET.key(), "-2");
}
+ if (!StringUtils.isNullOrWhitespaceOnly(preferFileFormat)) {
+ builder.option(CoreOptions.FILE_FORMAT.key(), preferFileFormat);
+ }
+
targetCatalog.createTable(tuple.f1, builder.build(), true);
CloneSchemaInfo cloneSchemaInfo = new CloneSchemaInfo(tuple, true);
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CloneProcedure.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CloneProcedure.java
index 67d06d3187..1f60707c7c 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CloneProcedure.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CloneProcedure.java
@@ -66,6 +66,10 @@ public class CloneProcedure extends ProcedureBase {
name = "excluded_tables",
type = @DataTypeHint("STRING"),
isOptional = true),
+ @ArgumentHint(
+ name = "prefer_file_format",
+ type = @DataTypeHint("STRING"),
+ isOptional = true),
@ArgumentHint(
name = "clone_from",
type = @DataTypeHint("STRING"),
@@ -83,6 +87,7 @@ public class CloneProcedure extends ProcedureBase {
String where,
String includedTablesStr,
String excludedTablesStr,
+ String preferFileFormat,
String cloneFrom)
throws Exception {
Map<String, String> sourceCatalogConfig =
@@ -112,6 +117,7 @@ public class CloneProcedure extends ProcedureBase {
where,
includedTables,
excludedTables,
+ preferFileFormat,
cloneFrom);
return execute(procedureContext, action, "Clone Job");
}
diff --git
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/CloneActionITCase.java
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/CloneActionITCase.java
index 9c49ee92d5..76dc132f64 100644
---
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/CloneActionITCase.java
+++
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/CloneActionITCase.java
@@ -18,6 +18,7 @@
package org.apache.paimon.hive.procedure;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
@@ -53,6 +54,7 @@ import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
+import static org.junit.Assert.assertEquals;
/** Tests for {@link CloneAction}. */
public class CloneActionITCase extends ActionITCaseBase {
@@ -946,6 +948,68 @@ public class CloneActionITCase extends ActionITCaseBase {
Assertions.assertThatList(r1).containsExactlyInAnyOrderElementsOf(r2);
}
+    @Test
+    public void testMigrateWithPreferFileFormat() throws Exception {
+        // The source Hive table is stored as ORC; the clone must honor the requested Parquet format.
+        String format = "orc";
+        String preferFileFormat = "parquet";
+        String dbName = "hivedb" + StringUtils.randomNumericString(10);
+        String tableName = "hivetable" + StringUtils.randomNumericString(10);
+
+        TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
+        tEnv.executeSql("CREATE CATALOG HIVE WITH ('type'='hive')");
+        tEnv.useCatalog("HIVE");
+        tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
+        tEnv.executeSql("CREATE DATABASE " + dbName);
+        sql(
+                tEnv,
+                "CREATE TABLE %s.%s (id STRING) PARTITIONED BY (id2 INT, id3 INT) "
+                        + "STORED AS %s",
+                dbName,
+                tableName,
+                format);
+        sql(tEnv, "INSERT INTO %s.%s VALUES %s", dbName, tableName, data(100));
+
+        tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
+        tEnv.executeSql("CREATE CATALOG PAIMON_GE WITH ('type'='paimon-generic')");
+        tEnv.useCatalog("PAIMON_GE");
+
+        // Snapshot the source rows before cloning so the clone can be compared against them.
+        List<Row> r1 = sql(tEnv, "SELECT * FROM %s.%s", dbName, tableName);
+
+        tEnv.executeSql(
+                "CREATE CATALOG PAIMON WITH ('type'='paimon', 'warehouse' = '" + warehouse + "')");
+        tEnv.useCatalog("PAIMON");
+        tEnv.executeSql("CREATE DATABASE test");
+
+        List<String> args =
+                new ArrayList<>(
+                        Arrays.asList(
+                                "clone",
+                                "--database",
+                                dbName,
+                                "--table",
+                                tableName,
+                                "--catalog_conf",
+                                "metastore=hive",
+                                "--catalog_conf",
+                                "uri=thrift://localhost:" + PORT,
+                                "--target_database",
+                                "test",
+                                "--target_table",
+                                "test_table",
+                                "--target_catalog_conf",
+                                "warehouse=" + warehouse,
+                                "--prefer_file_format",
+                                preferFileFormat));
+
+        createAction(CloneAction.class, args).run();
+        FileStoreTable paimonTable =
+                paimonTable(tEnv, "PAIMON", Identifier.create("test", "test_table"));
+        // JUnit's assertEquals takes (expected, actual); keep that order for correct failure text.
+        assertEquals(preferFileFormat, paimonTable.options().get(CoreOptions.FILE_FORMAT.key()));
+
+        List<Row> r2 = sql(tEnv, "SELECT * FROM test.test_table");
+        Assertions.assertThatList(r1).containsExactlyInAnyOrderElementsOf(r2);
+    }
+
private String[] ddls(String format) {
// has primary key
String ddl0 =