This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new e2b5efa322 [hotfix] Extract some common codes of clone hive and migrate hive (#5497)
e2b5efa322 is described below

commit e2b5efa322e7d4ca569ae750a10c0b7607fa3a14
Author: yuzelin <[email protected]>
AuthorDate: Mon Apr 21 14:13:06 2025 +0800

    [hotfix] Extract some common codes of clone hive and migrate hive (#5497)
---
 .../org/apache/paimon/migrate/FileMetaUtils.java   | 32 ++++++-------
 .../flink/clone/hive/CopyHiveFilesFunction.java    | 56 ++++------------------
 .../apache/paimon/hive/migrate/HiveCloneUtils.java |  4 +-
 .../apache/paimon/hive/migrate/HiveMigrator.java   | 19 +-------
 4 files changed, 29 insertions(+), 82 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java 
b/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java
index e05c6ecfb5..54eff281cd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java
@@ -104,19 +104,7 @@ public class FileMetaUtils {
             Map<Path, Path> rollback) {
 
         try {
-            CoreOptions options = ((FileStoreTable) table).coreOptions();
-            SimpleColStatsCollector.Factory[] factories =
-                    StatsCollectorFactories.createStatsFactories(
-                            options.statsMode(), options, 
table.rowType().getFieldNames());
-
-            SimpleStatsExtractor simpleStatsExtractor =
-                    FileFormat.fromIdentifier(format, 
options.toConfiguration())
-                            .createStatsExtractor(table.rowType(), factories)
-                            .orElseThrow(
-                                    () ->
-                                            new RuntimeException(
-                                                    "Can't get table stats 
extractor for format "
-                                                            + format));
+            SimpleStatsExtractor simpleStatsExtractor = 
createSimpleStatsExtractor(table, format);
             Path newPath = renameFile(fileIO, fileStatus.getPath(), dir, 
format, rollback);
             return constructFileMeta(
                     newPath.getName(),
@@ -169,8 +157,6 @@ public class FileMetaUtils {
         }
     }
 
-    // -----------------------------private 
method---------------------------------------------
-
     private static Path renameFile(
             FileIO fileIO, Path originPath, Path newDir, String format, 
Map<Path, Path> rollback)
             throws IOException {
@@ -184,7 +170,7 @@ public class FileMetaUtils {
         return newPath;
     }
 
-    private static DataFileMeta constructFileMeta(
+    public static DataFileMeta constructFileMeta(
             String fileName,
             long fileSize,
             Path path,
@@ -258,4 +244,18 @@ public class FileMetaUtils {
         binaryRowWriter.complete();
         return binaryRow;
     }
+
+    public static SimpleStatsExtractor createSimpleStatsExtractor(Table table, 
String format) {
+        CoreOptions options = ((FileStoreTable) table).coreOptions();
+        SimpleColStatsCollector.Factory[] factories =
+                StatsCollectorFactories.createStatsFactories(
+                        options.statsMode(), options, 
table.rowType().getFieldNames());
+
+        return FileFormat.fromIdentifier(format, options.toConfiguration())
+                .createStatsExtractor(table.rowType(), factories)
+                .orElseThrow(
+                        () ->
+                                new RuntimeException(
+                                        "Can't get table stats extractor for 
format " + format));
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/CopyHiveFilesFunction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/CopyHiveFilesFunction.java
index 20bca892dd..700bccbf1c 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/CopyHiveFilesFunction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/hive/CopyHiveFilesFunction.java
@@ -18,29 +18,19 @@
 
 package org.apache.paimon.flink.clone.hive;
 
-import org.apache.paimon.CoreOptions;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.format.FileFormat;
-import org.apache.paimon.format.SimpleColStats;
 import org.apache.paimon.format.SimpleStatsExtractor;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.io.DataFileMeta;
-import org.apache.paimon.manifest.FileSource;
-import org.apache.paimon.statistics.SimpleColStatsCollector;
-import org.apache.paimon.stats.SimpleStats;
-import org.apache.paimon.stats.SimpleStatsConverter;
+import org.apache.paimon.migrate.FileMetaUtils;
 import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.IOUtils;
-import org.apache.paimon.utils.Pair;
-import org.apache.paimon.utils.StatsCollectorFactories;
 
 import org.apache.flink.streaming.api.functions.ProcessFunction;
 import org.apache.flink.util.Collector;
 
-import java.util.Collections;
 import java.util.Map;
 
 /** Copy files for table. */
@@ -60,36 +50,12 @@ public class CopyHiveFilesFunction extends 
CopyProcessFunction<CloneFileInfo, Da
             Collector<DataFileInfo> collector)
             throws Exception {
         Identifier identifier = cloneFileInfo.identifier();
-        long fileSize = cloneFileInfo.fileSize();
         String format = cloneFileInfo.format();
         Path path = cloneFileInfo.path();
         BinaryRow partition = cloneFileInfo.partition();
 
         FileIO sourceFileIO = hiveCatalog.fileIO();
         FileStoreTable targetTable = (FileStoreTable) getTable(identifier);
-        // util for collecting stats
-        CoreOptions options = targetTable.coreOptions();
-        SimpleColStatsCollector.Factory[] factories =
-                StatsCollectorFactories.createStatsFactories(
-                        options.statsMode(), options, 
targetTable.rowType().getFieldNames());
-
-        SimpleStatsExtractor simpleStatsExtractor =
-                FileFormat.fromIdentifier(format, options.toConfiguration())
-                        .createStatsExtractor(targetTable.rowType(), factories)
-                        .orElseThrow(
-                                () ->
-                                        new RuntimeException(
-                                                "Can't get table stats 
extractor for format "
-                                                        + format));
-        RowType rowTypeWithSchemaId =
-                
targetTable.schemaManager().schema(targetTable.schema().id()).logicalRowType();
-
-        SimpleStatsConverter statsArraySerializer = new 
SimpleStatsConverter(rowTypeWithSchemaId);
-
-        // extract stats
-        Pair<SimpleColStats[], SimpleStatsExtractor.FileInfo> fileInfo =
-                simpleStatsExtractor.extractWithFileInfo(sourceFileIO, path, 
fileSize);
-        SimpleStats stats = 
statsArraySerializer.toBinaryAllMode(fileInfo.getLeft());
 
         // new file name
         String suffix = "." + format;
@@ -103,20 +69,16 @@ public class CopyHiveFilesFunction extends 
CopyProcessFunction<CloneFileInfo, Da
                 targetTable.fileIO().newOutputStream(new Path(targetFilePath, 
newFileName), false));
 
         // to DataFileMeta
+        SimpleStatsExtractor simpleStatsExtractor =
+                FileMetaUtils.createSimpleStatsExtractor(targetTable, format);
         DataFileMeta dataFileMeta =
-                DataFileMeta.forAppend(
+                FileMetaUtils.constructFileMeta(
                         newFileName,
-                        fileSize,
-                        fileInfo.getRight().getRowCount(),
-                        stats,
-                        0,
-                        0,
-                        targetTable.schema().id(),
-                        Collections.emptyList(),
-                        null,
-                        FileSource.APPEND,
-                        null,
-                        null);
+                        cloneFileInfo.fileSize(),
+                        path,
+                        simpleStatsExtractor,
+                        sourceFileIO,
+                        targetTable);
 
         collector.collect(
                 new DataFileInfo(
diff --git 
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveCloneUtils.java
 
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveCloneUtils.java
index 8532cbecf6..5c982968f0 100644
--- 
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveCloneUtils.java
+++ 
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveCloneUtils.java
@@ -57,7 +57,7 @@ public class HiveCloneUtils {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(HiveCloneUtils.class);
 
-    private static final Predicate<FileStatus> HIDDEN_PATH_FILTER =
+    public static final Predicate<FileStatus> HIDDEN_PATH_FILTER =
             p -> !p.getPath().getName().startsWith("_") && 
!p.getPath().getName().startsWith(".");
 
     public static List<Identifier> listTables(HiveCatalog hiveCatalog) throws 
Exception {
@@ -186,7 +186,7 @@ public class HiveCloneUtils {
         return new HivePartitionFiles(partition, paths, fileSizes, format);
     }
 
-    private static String parseFormat(String serder) {
+    public static String parseFormat(String serder) {
         if (serder.contains("avro")) {
             return "avro";
         } else if (serder.contains("parquet")) {
diff --git 
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java
 
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java
index 6450cb87c2..68215c9c1b 100644
--- 
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java
+++ 
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java
@@ -24,7 +24,6 @@ import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.BinaryWriter;
 import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.fs.FileStatus;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.hive.HiveCatalog;
 import org.apache.paimon.io.DataFileMeta;
@@ -56,10 +55,11 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Future;
 import java.util.concurrent.ThreadPoolExecutor;
-import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.hive.HiveTypeUtils.toPaimonType;
+import static org.apache.paimon.hive.migrate.HiveCloneUtils.HIDDEN_PATH_FILTER;
+import static org.apache.paimon.hive.migrate.HiveCloneUtils.parseFormat;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 import static org.apache.paimon.utils.ThreadPoolUtils.createCachedThreadPool;
 
@@ -69,9 +69,6 @@ public class HiveMigrator implements Migrator {
     private static final Logger LOG = 
LoggerFactory.getLogger(HiveMigrator.class);
     private ThreadPoolExecutor executor;
 
-    private static final Predicate<FileStatus> HIDDEN_PATH_FILTER =
-            p -> !p.getPath().getName().startsWith("_") && 
!p.getPath().getName().startsWith(".");
-
     private static final String PAIMON_SUFFIX = "_paimon_";
 
     private final FileIO fileIO;
@@ -362,18 +359,6 @@ public class HiveMigrator implements Migrator {
         }
     }
 
-    private String parseFormat(String serder) {
-        if (serder.contains("avro")) {
-            return "avro";
-        } else if (serder.contains("parquet")) {
-            return "parquet";
-        } else if (serder.contains("orc")) {
-            return "orc";
-        } else {
-            throw new UnsupportedOperationException("Unknown partition format: 
" + serder);
-        }
-    }
-
     /** One import task for one partition. */
     public static class MigrateTask implements Callable<CommitMessage> {
 

Reply via email to