This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 87d3ee48b1 [flink] Fix mixed FileIO in CloneAction (#5320)
87d3ee48b1 is described below

commit 87d3ee48b1756cbc8e1c89a76cef022879d6d6df
Author: yunfengzhou-hub <[email protected]>
AuthorDate: Thu Mar 20 18:19:20 2025 +0800

    [flink] Fix mixed FileIO in CloneAction (#5320)
---
 .../flink/clone/CopyMetaFilesForCloneOperator.java | 77 +++++++++++-----------
 1 file changed, 39 insertions(+), 38 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyMetaFilesForCloneOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyMetaFilesForCloneOperator.java
index 4065bfe4c3..7d837d996e 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyMetaFilesForCloneOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyMetaFilesForCloneOperator.java
@@ -24,6 +24,7 @@ import org.apache.paimon.Snapshot;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.FlinkCatalogFactory;
+import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.index.IndexFileHandler;
 import org.apache.paimon.manifest.IndexManifestEntry;
@@ -33,6 +34,7 @@ import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.IOUtils;
 import org.apache.paimon.utils.SnapshotManager;
 
 import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
@@ -108,13 +110,13 @@ public class CopyMetaFilesForCloneOperator extends ProcessFunction<Tuple2<String
         // 2. copy all schema files
         SchemaManager sourceSchemaManager = sourceTable.schemaManager();
         SchemaManager targetSchemaManager = targetTable.schemaManager();
+        FileIO sourceTableFileIO = sourceTable.fileIO();
+        FileIO targetTableFileIO = targetTable.fileIO();
         for (long schemaId : sourceSchemaManager.listAllIds()) {
-            targetTable
-                    .fileIO()
-                    .copyFile(
-                            sourceSchemaManager.toSchemaPath(schemaId),
-                            targetSchemaManager.toSchemaPath(schemaId),
-                            true);
+            IOUtils.copyBytes(
+                    sourceTableFileIO.newInputStream(sourceSchemaManager.toSchemaPath(schemaId)),
+                    targetTableFileIO.newOutputStream(
+                            targetSchemaManager.toSchemaPath(schemaId), true));
         }
 
         // 3. copy latest snapshot files
@@ -125,42 +127,41 @@ public class CopyMetaFilesForCloneOperator extends ProcessFunction<Tuple2<String
         Snapshot latestSnapshot = sourceSnapshotManager.latestSnapshot();
         if (latestSnapshot != null) {
             long snapshotId = latestSnapshot.id();
-            targetTable
-                    .fileIO()
-                    .copyFile(
-                            sourceSnapshotManager.snapshotPath(snapshotId),
-                            targetSnapshotManager.snapshotPath(snapshotId),
-                            true);
+            IOUtils.copyBytes(
+                    sourceTableFileIO.newInputStream(
+                            sourceSnapshotManager.snapshotPath(snapshotId)),
+                    targetTableFileIO.newOutputStream(
+                            targetSnapshotManager.snapshotPath(snapshotId), true));
         }
 
         FileStorePathFactory sourcePathFactory = sourceStore.pathFactory();
         FileStorePathFactory targetPathFactory = targetStore.pathFactory();
         // 4. copy manifest list files
         if (latestSnapshot != null) {
-            targetTable
-                    .fileIO()
-                    .copyFile(
-                            sourcePathFactory.toManifestListPath(latestSnapshot.baseManifestList()),
+            IOUtils.copyBytes(
+                    sourceTableFileIO.newInputStream(
+                            sourcePathFactory.toManifestListPath(
+                                    latestSnapshot.baseManifestList())),
+                    targetTableFileIO.newOutputStream(
                             targetPathFactory.toManifestListPath(latestSnapshot.baseManifestList()),
-                            true);
+                            true));
 
-            targetTable
-                    .fileIO()
-                    .copyFile(
+            IOUtils.copyBytes(
+                    sourceTableFileIO.newInputStream(
                             sourcePathFactory.toManifestListPath(
-                                    latestSnapshot.deltaManifestList()),
+                                    latestSnapshot.deltaManifestList())),
+                    targetTableFileIO.newOutputStream(
                             targetPathFactory.toManifestListPath(
                                     latestSnapshot.deltaManifestList()),
-                            true);
+                            true));
 
             String changelogManifestList = latestSnapshot.changelogManifestList();
             if (changelogManifestList != null) {
-                targetTable
-                        .fileIO()
-                        .copyFile(
-                                sourcePathFactory.toManifestListPath(changelogManifestList),
-                                targetPathFactory.toManifestListPath(changelogManifestList),
-                                true);
+                IOUtils.copyBytes(
+                        sourceTableFileIO.newInputStream(
+                                sourcePathFactory.toManifestListPath(changelogManifestList)),
+                        targetTableFileIO.newOutputStream(
+                                targetPathFactory.toManifestListPath(changelogManifestList), true));
             }
         }
 
@@ -170,12 +171,12 @@ public class CopyMetaFilesForCloneOperator extends ProcessFunction<Tuple2<String
             IndexFileHandler indexFileHandler = sourceStore.newIndexFileHandler();
             String indexManifest = latestSnapshot.indexManifest();
             if (indexManifest != null && indexFileHandler.existsManifest(indexManifest)) {
-                targetTable
-                        .fileIO()
-                        .copyFile(
-                                sourcePathFactory.indexManifestFileFactory().toPath(indexManifest),
+                IOUtils.copyBytes(
+                        sourceTableFileIO.newInputStream(
+                                sourcePathFactory.indexManifestFileFactory().toPath(indexManifest)),
+                        targetTableFileIO.newOutputStream(
                                 targetPathFactory.indexManifestFileFactory().toPath(indexManifest),
-                                true);
+                                true));
 
                 // read index files
                 List<IndexManifestEntry> indexManifestEntries =
@@ -204,16 +205,16 @@ public class CopyMetaFilesForCloneOperator extends ProcessFunction<Tuple2<String
 
         // 6. copy statistics file
         if (latestSnapshot != null && latestSnapshot.statistics() != null) {
-            targetTable
-                    .fileIO()
-                    .copyFile(
+            IOUtils.copyBytes(
+                    sourceTableFileIO.newInputStream(
                             sourcePathFactory
                                     .statsFileFactory()
-                                    .toPath(latestSnapshot.statistics()),
+                                    .toPath(latestSnapshot.statistics())),
+                    targetTableFileIO.newOutputStream(
                             targetPathFactory
                                     .statsFileFactory()
                                     .toPath(latestSnapshot.statistics()),
-                            true);
+                            true));
         }
 
         // pick manifest files

Reply via email to