This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new b377440410 [hive][clone] support clone file format including options 
(#5575)
b377440410 is described below

commit b377440410d3074e856cb8fb7a0b9f8359da15af
Author: shyjsarah <[email protected]>
AuthorDate: Thu May 8 09:59:41 2025 +0800

    [hive][clone] support clone file format including options (#5575)
---
 .../paimon/flink/clone/ListCloneFilesFunction.java |  9 ++++
 .../apache/paimon/hive/migrate/HiveCloneUtils.java | 53 +++++++++++++++++++++
 .../paimon/hive/procedure/CloneActionITCase.java   | 55 +++++++++++++++-------
 3 files changed, 101 insertions(+), 16 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/ListCloneFilesFunction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/ListCloneFilesFunction.java
index f41a743536..8f0592302f 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/ListCloneFilesFunction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/ListCloneFilesFunction.java
@@ -43,7 +43,9 @@ import javax.annotation.Nullable;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 
+import static org.apache.paimon.CoreOptions.FILE_FORMAT;
 import static org.apache.paimon.utils.Preconditions.checkNotNull;
 import static org.apache.paimon.utils.Preconditions.checkState;
 
@@ -136,6 +138,13 @@ public class ListCloneFilesFunction
                 "Can not clone data to existed paimon table which bucket is 
not -1. Existed paimon table is "
                         + existedTable.name());
 
+        // check format
+        checkState(
+                Objects.equals(
+                        sourceSchema.options().get(FILE_FORMAT.key()),
+                        existedTable.coreOptions().formatType()),
+                "source table format is not compatible with existed paimon 
table format.");
+
         // check partition keys
         List<String> sourcePartitionFields = sourceSchema.partitionKeys();
         List<String> existedPartitionFields = existedSchema.partitionKeys();
diff --git 
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveCloneUtils.java
 
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveCloneUtils.java
index 359b628b3f..6f88cd32ea 100644
--- 
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveCloneUtils.java
+++ 
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveCloneUtils.java
@@ -49,9 +49,12 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
+import static org.apache.paimon.CoreOptions.FILE_COMPRESSION;
+import static org.apache.paimon.CoreOptions.FILE_FORMAT;
 import static org.apache.paimon.hive.HiveTypeUtils.toPaimonType;
 
 /** Utils for cloning Hive table to Paimon table. */
@@ -122,6 +125,20 @@ public class HiveCloneUtils {
             paimonOptions.put("hive.comment", hiveTableOptions.get("comment"));
         }
 
+        String format = parseFormat(hiveTable);
+        paimonOptions.put(FILE_FORMAT.key(), format);
+        Map<String, String> formatOptions = getIdentifierPrefixOptions(format, 
hiveTableOptions);
+        Map<String, String> sdFormatOptions =
+                getIdentifierPrefixOptions(
+                        format, 
hiveTable.getSd().getSerdeInfo().getParameters());
+        formatOptions.putAll(sdFormatOptions);
+        paimonOptions.putAll(formatOptions);
+
+        String compression = parseCompression(hiveTable, format, 
formatOptions);
+        if (compression != null) {
+            paimonOptions.put(FILE_COMPRESSION.key(), compression);
+        }
+
         Schema.Builder schemaBuilder =
                 Schema.newBuilder()
                         .comment(hiveTableOptions.get("comment"))
@@ -226,4 +243,40 @@ public class HiveCloneUtils {
         }
         return format;
     }
+
+    private static String parseCompression(StorageDescriptor 
storageDescriptor) {
+        Map<String, String> serdeParams = 
storageDescriptor.getSerdeInfo().getParameters();
+        if (serdeParams.containsKey("compression")) {
+            return serdeParams.get("compression");
+        }
+        return null;
+    }
+
+    private static String parseCompression(
+            Table table, String format, Map<String, String> formatOptions) {
+        String compression = null;
+        if (Objects.equals(format, "avro")) {
+            compression = formatOptions.getOrDefault("avro.codec", 
parseCompression(table.getSd()));
+        } else if (Objects.equals(format, "parquet")) {
+            compression =
+                    formatOptions.getOrDefault(
+                            "parquet.compression", 
parseCompression(table.getSd()));
+        } else if (Objects.equals(format, "orc")) {
+            compression =
+                    formatOptions.getOrDefault("orc.compress", 
parseCompression(table.getSd()));
+        }
+        return compression;
+    }
+
+    public static Map<String, String> getIdentifierPrefixOptions(
+            String formatIdentifier, Map<String, String> options) {
+        Map<String, String> result = new HashMap<>();
+        String prefix = formatIdentifier.toLowerCase() + ".";
+        for (String key : options.keySet()) {
+            if (key.toLowerCase().startsWith(prefix)) {
+                result.put(prefix + key.substring(prefix.length()), 
options.get(key));
+            }
+        }
+        return result;
+    }
 }
diff --git 
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/CloneActionITCase.java
 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/CloneActionITCase.java
index 83b948767f..056fdaa76f 100644
--- 
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/CloneActionITCase.java
+++ 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/CloneActionITCase.java
@@ -43,6 +43,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Objects;
 import java.util.Random;
 import java.util.concurrent.ThreadLocalRandom;
 
@@ -407,7 +408,7 @@ public class CloneActionITCase extends ActionITCaseBase {
         tEnv.executeSql("CREATE DATABASE test");
         // create a paimon table with the same name
-        int ddlIndex = ThreadLocalRandom.current().nextInt(0, 4);
+        int ddlIndex = ThreadLocalRandom.current().nextInt(0, 5);
-        tEnv.executeSql(ddls()[ddlIndex]);
+        tEnv.executeSql(ddls(format)[ddlIndex]);
 
         List<String> args =
                 new ArrayList<>(
@@ -428,7 +429,7 @@ public class CloneActionITCase extends ActionITCaseBase {
                                 "--target_catalog_conf",
                                 "warehouse=" + warehouse));
 
-        if (ddlIndex < 3) {
+        if (ddlIndex < 4) {
             assertThatThrownBy(() -> createAction(CloneAction.class, 
args).run())
                     .rootCause()
                     .hasMessageContaining(exceptionMsg()[ddlIndex]);
@@ -499,31 +500,48 @@ public class CloneActionITCase extends ActionITCaseBase {
         Assertions.assertThatList(r1).containsExactlyInAnyOrderElementsOf(r2);
     }
 
-    private String[] ddls() {
+    private String[] ddls(String format) {
         // has primary key
         String ddl0 =
                 "CREATE TABLE test.test_table (id string, id2 int, id3 int, 
PRIMARY KEY (id, id2, id3) NOT ENFORCED) "
-                        + "PARTITIONED BY (id2, id3) with ('bucket' = '-1');";
+                        + "PARTITIONED BY (id2, id3) with ('bucket' = '-1', 
'file.format' = '"
+                        + format
+                        + "');";
         // has different partition keys
         String ddl1 =
                 "CREATE TABLE test.test_table (id string, id2 int, id3 int) "
-                        + "PARTITIONED BY (id, id3) with ('bucket' = '-1');";
+                        + "PARTITIONED BY (id, id3) with ('bucket' = '-1', 
'file.format' = '"
+                        + format
+                        + "');";
         // size of fields is different
         String ddl2 =
                 "CREATE TABLE test.test_table (id2 int, id3 int) "
-                        + "PARTITIONED BY (id2, id3) with ('bucket' = '-1');";
-        // normal
+                        + "PARTITIONED BY (id2, id3) with ('bucket' = '-1', 
'file.format' = '"
+                        + format
+                        + "');";
+
+        // different format
         String ddl3 =
+                "CREATE TABLE test.test_table (id2 int, id3 int) "
+                        + "PARTITIONED BY (id2, id3) with ('bucket' = '-1', 
'file.format' = '"
+                        + randomFormat(format)
+                        + "');";
+
+        // normal
+        String ddl4 =
                 "CREATE TABLE test.test_table (id string, id2 int, id3 int) "
-                        + "PARTITIONED BY (id2, id3) with ('bucket' = '-1');";
-        return new String[] {ddl0, ddl1, ddl2, ddl3};
+                        + "PARTITIONED BY (id2, id3) with ('bucket' = '-1', 
'file.format' = '"
+                        + format
+                        + "');";
+        return new String[] {ddl0, ddl1, ddl2, ddl3, ddl4};
     }
 
     private String[] exceptionMsg() {
         return new String[] {
             "Can not clone data to existed paimon table which has primary 
keys",
             "source table partition keys is not compatible with existed paimon 
table partition keys.",
-            "source table partition keys is not compatible with existed paimon 
table partition keys."
+            "source table partition keys is not compatible with existed paimon 
table partition keys.",
+            "source table format is not compatible with existed paimon table 
format."
         };
     }
 
@@ -549,13 +567,18 @@ public class CloneActionITCase extends ActionITCaseBase {
     private String randomFormat() {
         ThreadLocalRandom random = ThreadLocalRandom.current();
         int i = random.nextInt(3);
-        if (i == 0) {
-            return "orc";
-        } else if (i == 1) {
-            return "parquet";
-        } else {
-            return "avro";
+        String[] formats = new String[] {"orc", "parquet", "avro"};
+        return formats[i];
+    }
+
+    private String randomFormat(String excludedFormat) {
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        int i = random.nextInt(3);
+        String[] formats = new String[] {"orc", "parquet", "avro"};
+        if (Objects.equals(excludedFormat, formats[i])) {
+            return formats[(i + 1) % 3];
         }
+        return formats[i];
     }
 
     protected FileStoreTable paimonTable(

Reply via email to