This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new b377440410 [hive][clone] support clone file format including options (#5575)
b377440410 is described below
commit b377440410d3074e856cb8fb7a0b9f8359da15af
Author: shyjsarah <[email protected]>
AuthorDate: Thu May 8 09:59:41 2025 +0800
[hive][clone] support clone file format including options (#5575)
---
.../paimon/flink/clone/ListCloneFilesFunction.java | 9 ++++
.../apache/paimon/hive/migrate/HiveCloneUtils.java | 53 +++++++++++++++++++++
.../paimon/hive/procedure/CloneActionITCase.java | 55 +++++++++++++++-------
3 files changed, 101 insertions(+), 16 deletions(-)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/ListCloneFilesFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/ListCloneFilesFunction.java
index f41a743536..8f0592302f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/ListCloneFilesFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/ListCloneFilesFunction.java
@@ -43,7 +43,9 @@ import javax.annotation.Nullable;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
+import static org.apache.paimon.CoreOptions.FILE_FORMAT;
import static org.apache.paimon.utils.Preconditions.checkNotNull;
import static org.apache.paimon.utils.Preconditions.checkState;
@@ -136,6 +138,13 @@ public class ListCloneFilesFunction
"Can not clone data to existed paimon table which bucket is
not -1. Existed paimon table is "
+ existedTable.name());
+ // check format
+ checkState(
+ Objects.equals(
+ sourceSchema.options().get(FILE_FORMAT.key()),
+ existedTable.coreOptions().formatType()),
+ "source table format is not compatible with existed paimon
table format.");
+
// check partition keys
List<String> sourcePartitionFields = sourceSchema.partitionKeys();
List<String> existedPartitionFields = existedSchema.partitionKeys();
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveCloneUtils.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveCloneUtils.java
index 359b628b3f..6f88cd32ea 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveCloneUtils.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveCloneUtils.java
@@ -49,9 +49,12 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
+import java.util.Objects;
import java.util.function.Predicate;
import java.util.stream.Collectors;
+import static org.apache.paimon.CoreOptions.FILE_COMPRESSION;
+import static org.apache.paimon.CoreOptions.FILE_FORMAT;
import static org.apache.paimon.hive.HiveTypeUtils.toPaimonType;
/** Utils for cloning Hive table to Paimon table. */
@@ -122,6 +125,20 @@ public class HiveCloneUtils {
paimonOptions.put("hive.comment", hiveTableOptions.get("comment"));
}
+ String format = parseFormat(hiveTable);
+ paimonOptions.put(FILE_FORMAT.key(), format);
+ Map<String, String> formatOptions = getIdentifierPrefixOptions(format,
hiveTableOptions);
+ Map<String, String> sdFormatOptions =
+ getIdentifierPrefixOptions(
+ format,
hiveTable.getSd().getSerdeInfo().getParameters());
+ formatOptions.putAll(sdFormatOptions);
+ paimonOptions.putAll(formatOptions);
+
+ String compression = parseCompression(hiveTable, format,
formatOptions);
+ if (compression != null) {
+ paimonOptions.put(FILE_COMPRESSION.key(), compression);
+ }
+
Schema.Builder schemaBuilder =
Schema.newBuilder()
.comment(hiveTableOptions.get("comment"))
@@ -226,4 +243,40 @@ public class HiveCloneUtils {
}
return format;
}
+
+    /**
+     * Returns the generic {@code compression} serde parameter of the given storage descriptor.
+     *
+     * @param storageDescriptor storage descriptor of the source Hive table
+     * @return the {@code compression} serde parameter, or {@code null} if it is not set or the
+     *     serde has no parameter map
+     */
+    private static String parseCompression(StorageDescriptor storageDescriptor) {
+        Map<String, String> serdeParams = storageDescriptor.getSerdeInfo().getParameters();
+        // The parameter map may be null on the metastore object; treat that as "not configured".
+        return serdeParams == null ? null : serdeParams.get("compression");
+    }
+
+    /**
+     * Resolves the compression codec for the cloned Paimon table.
+     *
+     * <p>The format-specific option ({@code avro.codec}, {@code parquet.compression} or {@code
+     * orc.compress}) found among the format-prefixed Hive table/serde options takes precedence.
+     * When it is not set, the generic {@code compression} serde parameter is used instead.
+     *
+     * @param table the source Hive table
+     * @param format the file format parsed from the Hive table
+     * @param formatOptions the format-prefixed options collected from the Hive table and serde
+     * @return the compression codec, or {@code null} if none is configured or the format is not
+     *     avro, parquet or orc
+     */
+    private static String parseCompression(
+            Table table, String format, Map<String, String> formatOptions) {
+        String compressionKey;
+        if (Objects.equals(format, "avro")) {
+            compressionKey = "avro.codec";
+        } else if (Objects.equals(format, "parquet")) {
+            compressionKey = "parquet.compression";
+        } else if (Objects.equals(format, "orc")) {
+            compressionKey = "orc.compress";
+        } else {
+            // Other formats get no compression option from this method.
+            return null;
+        }
+        String compression = formatOptions.get(compressionKey);
+        // Only consult the serde parameters when the format-specific option is absent.
+        return compression != null ? compression : parseCompression(table.getSd());
+    }
+
+    /**
+     * Collects the options whose key starts with {@code <formatIdentifier>.}, ignoring case.
+     *
+     * <p>The prefix of each returned key is normalized to lower case, and the rest of the key is
+     * kept unchanged. For example, with identifier {@code orc} the entry {@code ORC.compress=zlib}
+     * is returned as {@code orc.compress=zlib}.
+     *
+     * @param formatIdentifier file format identifier, e.g. {@code orc}
+     * @param options options to filter; {@code null} is treated as an empty map
+     * @return a new mutable map containing the matching options
+     */
+    public static Map<String, String> getIdentifierPrefixOptions(
+            String formatIdentifier, Map<String, String> options) {
+        Map<String, String> result = new HashMap<>();
+        if (options == null) {
+            // e.g. a serde that defines no parameters
+            return result;
+        }
+        // Locale.ROOT keeps the prefix independent of the JVM default locale.
+        String prefix = formatIdentifier.toLowerCase(Locale.ROOT) + ".";
+        for (Map.Entry<String, String> entry : options.entrySet()) {
+            String key = entry.getKey();
+            // Case-insensitive, locale-independent prefix match that does not lower-case the key.
+            if (key.regionMatches(true, 0, prefix, 0, prefix.length())) {
+                result.put(prefix + key.substring(prefix.length()), entry.getValue());
+            }
+        }
+        return result;
+    }
}
diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/CloneActionITCase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/CloneActionITCase.java
index 83b948767f..056fdaa76f 100644
--- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/CloneActionITCase.java
+++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/CloneActionITCase.java
@@ -43,6 +43,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.Objects;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
@@ -407,7 +408,7 @@ public class CloneActionITCase extends ActionITCaseBase {
tEnv.executeSql("CREATE DATABASE test");
// create a paimon table with the same name
int ddlIndex = ThreadLocalRandom.current().nextInt(0, 4);
- tEnv.executeSql(ddls()[ddlIndex]);
+ tEnv.executeSql(ddls(format)[ddlIndex]);
List<String> args =
new ArrayList<>(
@@ -428,7 +429,7 @@ public class CloneActionITCase extends ActionITCaseBase {
"--target_catalog_conf",
"warehouse=" + warehouse));
- if (ddlIndex < 3) {
+ if (ddlIndex < 4) {
assertThatThrownBy(() -> createAction(CloneAction.class,
args).run())
.rootCause()
.hasMessageContaining(exceptionMsg()[ddlIndex]);
@@ -499,31 +500,48 @@ public class CloneActionITCase extends ActionITCaseBase {
Assertions.assertThatList(r1).containsExactlyInAnyOrderElementsOf(r2);
}
-    private String[] ddls() {
+    /**
+     * Returns CREATE TABLE statements for a Paimon table that exists before the clone runs.
+     *
+     * <p>Entries 0-3 each differ from the normal table in one respect (see the inline comments)
+     * and are expected to fail with the message at the same index of {@link #exceptionMsg()}.
+     * Entry 4 is the compatible ("normal") table. Every entry except the "different format" one
+     * uses {@code format}, so only the intended mismatch is exercised.
+     *
+     * <p>NOTE(review): the call site draws {@code ddlIndex} with {@code nextInt(0, 4)}, so entry 4
+     * is never selected and the success path never runs. It should draw from {@code [0,
+     * ddls(format).length)}.
+     *
+     * @param format file format used by the compatible DDLs (presumably the source Hive table's
+     *     format; confirm at the call site)
+     */
+    private String[] ddls(String format) {
         // has primary key
         String ddl0 =
                 "CREATE TABLE test.test_table (id string, id2 int, id3 int, PRIMARY KEY (id, id2, id3) NOT ENFORCED) "
-                        + "PARTITIONED BY (id2, id3) with ('bucket' = '-1');";
+                        + "PARTITIONED BY (id2, id3) with ('bucket' = '-1', 'file.format' = '"
+                        + format
+                        + "');";
         // has different partition keys
         String ddl1 =
                 "CREATE TABLE test.test_table (id string, id2 int, id3 int) "
-                        + "PARTITIONED BY (id, id3) with ('bucket' = '-1');";
+                        + "PARTITIONED BY (id, id3) with ('bucket' = '-1', 'file.format' = '"
+                        + format
+                        + "');";
         // size of fields is different
         String ddl2 =
                 "CREATE TABLE test.test_table (id2 int, id3 int) "
-                        + "PARTITIONED BY (id2, id3) with ('bucket' = '-1');";
-        // normal
+                        + "PARTITIONED BY (id2, id3) with ('bucket' = '-1', 'file.format' = '"
+                        + format
+                        + "');";
+
+        // different format: same schema and partitioning as the normal table, so only the file
+        // format differs
         String ddl3 =
+                "CREATE TABLE test.test_table (id string, id2 int, id3 int) "
+                        + "PARTITIONED BY (id2, id3) with ('bucket' = '-1', 'file.format' = '"
+                        + randomFormat(format)
+                        + "');";
+
+        // normal
+        String ddl4 =
                 "CREATE TABLE test.test_table (id string, id2 int, id3 int) "
-                        + "PARTITIONED BY (id2, id3) with ('bucket' = '-1');";
-        return new String[] {ddl0, ddl1, ddl2, ddl3};
+                        + "PARTITIONED BY (id2, id3) with ('bucket' = '-1', 'file.format' = '"
+                        + format
+                        + "');";
+        return new String[] {ddl0, ddl1, ddl2, ddl3, ddl4};
     }
+ /**
+ * Expected root-cause messages for the failing DDLs of ddls(String), indexed the same way:
+ * entry i is matched (as a substring) when the Paimon table was created with DDL i.
+ * Entry 3 must stay in sync with the format-check message in ListCloneFilesFunction.
+ */
private String[] exceptionMsg() {
return new String[] {
"Can not clone data to existed paimon table which has primary
keys",
"source table partition keys is not compatible with existed paimon
table partition keys.",
- "source table partition keys is not compatible with existed paimon
table partition keys."
+ "source table partition keys is not compatible with existed paimon
table partition keys.",
+ "source table format is not compatible with existed paimon table
format."
};
}
@@ -549,13 +567,18 @@ public class CloneActionITCase extends ActionITCaseBase {
+ /** Returns "orc", "parquet" or "avro", chosen uniformly at random. */
private String randomFormat() {
ThreadLocalRandom random = ThreadLocalRandom.current();
int i = random.nextInt(3);
- if (i == 0) {
- return "orc";
- } else if (i == 1) {
- return "parquet";
- } else {
- return "avro";
+ String[] formats = new String[] {"orc", "parquet", "avro"};
+ return formats[i];
+ }
+
+ /**
+ * Returns a random format among "orc", "parquet" and "avro" that is not equal (via
+ * Objects.equals, i.e. case-sensitively) to excludedFormat.
+ *
+ * <p>If the excluded format is drawn, the next format in the array is returned instead, so the
+ * result is not uniformly distributed. That is enough for picking "some other" format.
+ */
+ private String randomFormat(String excludedFormat) {
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ int i = random.nextInt(3);
+ String[] formats = new String[] {"orc", "parquet", "avro"};
+ if (Objects.equals(excludedFormat, formats[i])) {
+ return formats[(i + 1) % 3];
}
+ return formats[i];
}
protected FileStoreTable paimonTable(