This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 87d3ee48b1 [flink] Fix mixed FileIO in CloneAction (#5320)
87d3ee48b1 is described below
commit 87d3ee48b1756cbc8e1c89a76cef022879d6d6df
Author: yunfengzhou-hub <[email protected]>
AuthorDate: Thu Mar 20 18:19:20 2025 +0800
[flink] Fix mixed FileIO in CloneAction (#5320)
---
.../flink/clone/CopyMetaFilesForCloneOperator.java | 77 +++++++++++-----------
1 file changed, 39 insertions(+), 38 deletions(-)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyMetaFilesForCloneOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyMetaFilesForCloneOperator.java
index 4065bfe4c3..7d837d996e 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyMetaFilesForCloneOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyMetaFilesForCloneOperator.java
@@ -24,6 +24,7 @@ import org.apache.paimon.Snapshot;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkCatalogFactory;
+import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.index.IndexFileHandler;
import org.apache.paimon.manifest.IndexManifestEntry;
@@ -33,6 +34,7 @@ import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.IOUtils;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
@@ -108,13 +110,13 @@ public class CopyMetaFilesForCloneOperator extends ProcessFunction<Tuple2<String
// 2. copy all schema files
SchemaManager sourceSchemaManager = sourceTable.schemaManager();
SchemaManager targetSchemaManager = targetTable.schemaManager();
+ FileIO sourceTableFileIO = sourceTable.fileIO();
+ FileIO targetTableFileIO = targetTable.fileIO();
for (long schemaId : sourceSchemaManager.listAllIds()) {
- targetTable
- .fileIO()
- .copyFile(
- sourceSchemaManager.toSchemaPath(schemaId),
- targetSchemaManager.toSchemaPath(schemaId),
- true);
+ IOUtils.copyBytes(
+
sourceTableFileIO.newInputStream(sourceSchemaManager.toSchemaPath(schemaId)),
+ targetTableFileIO.newOutputStream(
+ targetSchemaManager.toSchemaPath(schemaId), true));
}
// 3. copy latest snapshot files
@@ -125,42 +127,41 @@ public class CopyMetaFilesForCloneOperator extends ProcessFunction<Tuple2<String
Snapshot latestSnapshot = sourceSnapshotManager.latestSnapshot();
if (latestSnapshot != null) {
long snapshotId = latestSnapshot.id();
- targetTable
- .fileIO()
- .copyFile(
- sourceSnapshotManager.snapshotPath(snapshotId),
- targetSnapshotManager.snapshotPath(snapshotId),
- true);
+ IOUtils.copyBytes(
+ sourceTableFileIO.newInputStream(
+ sourceSnapshotManager.snapshotPath(snapshotId)),
+ targetTableFileIO.newOutputStream(
+ targetSnapshotManager.snapshotPath(snapshotId),
true));
}
FileStorePathFactory sourcePathFactory = sourceStore.pathFactory();
FileStorePathFactory targetPathFactory = targetStore.pathFactory();
// 4. copy manifest list files
if (latestSnapshot != null) {
- targetTable
- .fileIO()
- .copyFile(
-
sourcePathFactory.toManifestListPath(latestSnapshot.baseManifestList()),
+ IOUtils.copyBytes(
+ sourceTableFileIO.newInputStream(
+ sourcePathFactory.toManifestListPath(
+ latestSnapshot.baseManifestList())),
+ targetTableFileIO.newOutputStream(
targetPathFactory.toManifestListPath(latestSnapshot.baseManifestList()),
- true);
+ true));
- targetTable
- .fileIO()
- .copyFile(
+ IOUtils.copyBytes(
+ sourceTableFileIO.newInputStream(
sourcePathFactory.toManifestListPath(
- latestSnapshot.deltaManifestList()),
+ latestSnapshot.deltaManifestList())),
+ targetTableFileIO.newOutputStream(
targetPathFactory.toManifestListPath(
latestSnapshot.deltaManifestList()),
- true);
+ true));
String changelogManifestList =
latestSnapshot.changelogManifestList();
if (changelogManifestList != null) {
- targetTable
- .fileIO()
- .copyFile(
-
sourcePathFactory.toManifestListPath(changelogManifestList),
-
targetPathFactory.toManifestListPath(changelogManifestList),
- true);
+ IOUtils.copyBytes(
+ sourceTableFileIO.newInputStream(
+
sourcePathFactory.toManifestListPath(changelogManifestList)),
+ targetTableFileIO.newOutputStream(
+
targetPathFactory.toManifestListPath(changelogManifestList), true));
}
}
@@ -170,12 +171,12 @@ public class CopyMetaFilesForCloneOperator extends ProcessFunction<Tuple2<String
IndexFileHandler indexFileHandler =
sourceStore.newIndexFileHandler();
String indexManifest = latestSnapshot.indexManifest();
if (indexManifest != null &&
indexFileHandler.existsManifest(indexManifest)) {
- targetTable
- .fileIO()
- .copyFile(
-
sourcePathFactory.indexManifestFileFactory().toPath(indexManifest),
+ IOUtils.copyBytes(
+ sourceTableFileIO.newInputStream(
+
sourcePathFactory.indexManifestFileFactory().toPath(indexManifest)),
+ targetTableFileIO.newOutputStream(
targetPathFactory.indexManifestFileFactory().toPath(indexManifest),
- true);
+ true));
// read index files
List<IndexManifestEntry> indexManifestEntries =
@@ -204,16 +205,16 @@ public class CopyMetaFilesForCloneOperator extends ProcessFunction<Tuple2<String
// 6. copy statistics file
if (latestSnapshot != null && latestSnapshot.statistics() != null) {
- targetTable
- .fileIO()
- .copyFile(
+ IOUtils.copyBytes(
+ sourceTableFileIO.newInputStream(
sourcePathFactory
.statsFileFactory()
- .toPath(latestSnapshot.statistics()),
+ .toPath(latestSnapshot.statistics())),
+ targetTableFileIO.newOutputStream(
targetPathFactory
.statsFileFactory()
.toPath(latestSnapshot.statistics()),
- true);
+ true));
}
// pick manifest files