This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 79331c7ebb [core] format table: fix two-phase commit for FileIO 
implementations that use multi-part upload (#6525)
79331c7ebb is described below

commit 79331c7ebbe2277988c5c87f8a44d5fee4c3754e
Author: jerry <[email protected]>
AuthorDate: Fri Nov 7 13:40:03 2025 +0800

    [core] format table: fix two-phase commit for FileIO implementations 
that use multi-part upload (#6525)
---
 .../paimon/fs/BaseMultiPartUploadCommitter.java    | 36 +++++++++++++---------
 .../fs/MultiPartUploadTwoPhaseOutputStream.java    | 21 ++++++++-----
 .../paimon/fs/RenamingTwoPhaseOutputStream.java    |  2 +-
 .../org/apache/paimon/fs/TwoPhaseOutputStream.java |  2 +-
 .../org/apache/paimon/rest/RESTTokenFileIO.java    |  7 +++++
 .../MultiPartUploadTwoPhaseOutputStreamTest.java   |  9 +++---
 .../paimon/table/format/FormatTableCommit.java     |  2 +-
 .../apache/paimon/jindo/HadoopCompliantFileIO.java |  2 +-
 .../java/org/apache/paimon/jindo/JindoFileIO.java  |  3 +-
 .../apache/paimon/jindo/JindoMultiPartUpload.java  | 13 +++++---
 .../jindo/JindoMultiPartUploadCommitter.java       |  8 +++--
 .../paimon/jindo/JindoTwoPhaseOutputStream.java    | 13 ++++----
 .../main/java/org/apache/paimon/oss/OSSFileIO.java |  3 +-
 .../paimon/oss/OSSMultiPartUploadCommitter.java    |  8 +++--
 .../apache/paimon/oss/OssTwoPhaseOutputStream.java | 13 ++++----
 .../main/java/org/apache/paimon/s3/S3FileIO.java   |  3 +-
 .../paimon/s3/S3MultiPartUploadCommitter.java      |  8 +++--
 .../apache/paimon/s3/S3TwoPhaseOutputStream.java   | 13 ++++----
 18 files changed, 103 insertions(+), 63 deletions(-)

diff --git 
a/paimon-common/src/main/java/org/apache/paimon/fs/BaseMultiPartUploadCommitter.java
 
b/paimon-common/src/main/java/org/apache/paimon/fs/BaseMultiPartUploadCommitter.java
index d0aa8d9e22..a8545bf0b4 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/fs/BaseMultiPartUploadCommitter.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/fs/BaseMultiPartUploadCommitter.java
@@ -18,6 +18,8 @@
 
 package org.apache.paimon.fs;
 
+import org.apache.paimon.rest.RESTTokenFileIO;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,13 +38,19 @@ public abstract class BaseMultiPartUploadCommitter<T, C> 
implements TwoPhaseOutp
     private final String objectName;
     private final List<T> uploadedParts;
     private final long byteLength;
+    private final Path targetPath;
 
     public BaseMultiPartUploadCommitter(
-            String uploadId, List<T> uploadedParts, String objectName, long 
byteLength) {
+            String uploadId,
+            List<T> uploadedParts,
+            String objectName,
+            long byteLength,
+            Path targetPath) {
         this.uploadId = uploadId;
         this.objectName = objectName;
         this.uploadedParts = new ArrayList<>(uploadedParts);
         this.byteLength = byteLength;
+        this.targetPath = targetPath;
     }
 
     protected abstract MultiPartUploadStore<T, C> multiPartUploadStore(
@@ -51,14 +59,9 @@ public abstract class BaseMultiPartUploadCommitter<T, C> 
implements TwoPhaseOutp
     @Override
     public void commit(FileIO fileIO) throws IOException {
         try {
-            MultiPartUploadStore<T, C> multiPartUploadStore =
-                    multiPartUploadStore(fileIO, targetFilePath());
+            MultiPartUploadStore<T, C> multiPartUploadStore = 
multiPartUploadStore(fileIO);
             multiPartUploadStore.completeMultipartUpload(
                     objectName, uploadId, uploadedParts, byteLength);
-            LOG.info(
-                    "Successfully committed multipart upload with ID: {} for 
objectName: {}",
-                    uploadId,
-                    objectName);
         } catch (Exception e) {
             throw new IOException("Failed to commit multipart upload with ID: 
" + uploadId, e);
         }
@@ -67,23 +70,26 @@ public abstract class BaseMultiPartUploadCommitter<T, C> 
implements TwoPhaseOutp
     @Override
     public void discard(FileIO fileIO) throws IOException {
         try {
-            MultiPartUploadStore<T, C> multiPartUploadStore =
-                    multiPartUploadStore(fileIO, targetFilePath());
+            MultiPartUploadStore<T, C> multiPartUploadStore = 
multiPartUploadStore(fileIO);
             multiPartUploadStore.abortMultipartUpload(objectName, uploadId);
-            LOG.info(
-                    "Successfully discarded multipart upload with ID: {} for 
objectName: {}",
-                    uploadId,
-                    objectName);
         } catch (Exception e) {
             LOG.warn("Failed to discard multipart upload with ID: {}", 
uploadId, e);
         }
     }
 
     @Override
-    public Path targetFilePath() {
-        return new Path(objectName);
+    public Path targetPath() {
+        return this.targetPath;
     }
 
     @Override
     public void clean(FileIO fileIO) throws IOException {}
+
+    private MultiPartUploadStore<T, C> multiPartUploadStore(FileIO fileIO) 
throws IOException {
+        if (fileIO instanceof RESTTokenFileIO) {
+            RESTTokenFileIO restTokenFileIO = (RESTTokenFileIO) fileIO;
+            fileIO = restTokenFileIO.fileIO();
+        }
+        return multiPartUploadStore(fileIO, targetPath());
+    }
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/fs/MultiPartUploadTwoPhaseOutputStream.java
 
b/paimon-common/src/main/java/org/apache/paimon/fs/MultiPartUploadTwoPhaseOutputStream.java
index ed4cf3809e..3d1b5b0ce4 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/fs/MultiPartUploadTwoPhaseOutputStream.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/fs/MultiPartUploadTwoPhaseOutputStream.java
@@ -37,22 +37,28 @@ public abstract class 
MultiPartUploadTwoPhaseOutputStream<T, C> extends TwoPhase
             LoggerFactory.getLogger(MultiPartUploadTwoPhaseOutputStream.class);
 
     private final ByteArrayOutputStream buffer;
-    private final List<T> uploadedParts;
     private final MultiPartUploadStore<T, C> multiPartUploadStore;
-    private final String objectName;
 
-    private String uploadId;
-    private long position;
+    protected final String objectName;
+    protected final Path targetPath;
+    protected final String uploadId;
+
+    protected List<T> uploadedParts;
+    protected long position;
+
     private boolean closed = false;
     private Committer committer;
 
     public MultiPartUploadTwoPhaseOutputStream(
-            MultiPartUploadStore<T, C> multiPartUploadStore, 
org.apache.hadoop.fs.Path hadoopPath)
+            MultiPartUploadStore<T, C> multiPartUploadStore,
+            org.apache.hadoop.fs.Path hadoopPath,
+            Path path)
             throws IOException {
         this.multiPartUploadStore = multiPartUploadStore;
         this.buffer = new ByteArrayOutputStream();
         this.uploadedParts = new ArrayList<>();
         this.objectName = multiPartUploadStore.pathToObject(hadoopPath);
+        this.targetPath = path;
         this.uploadId = multiPartUploadStore.startMultiPartUpload(objectName);
         this.position = 0;
     }
@@ -64,8 +70,7 @@ public abstract class MultiPartUploadTwoPhaseOutputStream<T, 
C> extends TwoPhase
         return 10 << 20;
     }
 
-    public abstract Committer committer(
-            String uploadId, List<T> uploadedParts, String objectName, long 
position);
+    public abstract Committer committer();
 
     @Override
     public long getPos() throws IOException {
@@ -142,7 +147,7 @@ public abstract class 
MultiPartUploadTwoPhaseOutputStream<T, C> extends TwoPhase
             uploadPart();
         }
 
-        return committer(uploadId, uploadedParts, objectName, position);
+        return committer();
     }
 
     private void uploadPart() throws IOException {
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/fs/RenamingTwoPhaseOutputStream.java
 
b/paimon-common/src/main/java/org/apache/paimon/fs/RenamingTwoPhaseOutputStream.java
index cccf731bff..f5dcdf7a1b 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/fs/RenamingTwoPhaseOutputStream.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/fs/RenamingTwoPhaseOutputStream.java
@@ -130,7 +130,7 @@ public class RenamingTwoPhaseOutputStream extends 
TwoPhaseOutputStream {
         }
 
         @Override
-        public Path targetFilePath() {
+        public Path targetPath() {
             return targetPath;
         }
 
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/fs/TwoPhaseOutputStream.java 
b/paimon-common/src/main/java/org/apache/paimon/fs/TwoPhaseOutputStream.java
index 7975c4ea76..5f85b087d4 100644
--- a/paimon-common/src/main/java/org/apache/paimon/fs/TwoPhaseOutputStream.java
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/TwoPhaseOutputStream.java
@@ -52,7 +52,7 @@ public abstract class TwoPhaseOutputStream extends 
PositionOutputStream {
          */
         void discard(FileIO fileIO) throws IOException;
 
-        Path targetFilePath();
+        Path targetPath();
 
         void clean(FileIO fileIO) throws IOException;
     }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java 
b/paimon-common/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java
index f5de31afb8..c619851ec5 100644
--- a/paimon-common/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java
+++ b/paimon-common/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java
@@ -25,6 +25,7 @@ import org.apache.paimon.fs.FileStatus;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.PositionOutputStream;
 import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.fs.TwoPhaseOutputStream;
 import org.apache.paimon.options.ConfigOption;
 import org.apache.paimon.options.ConfigOptions;
 import org.apache.paimon.options.Options;
@@ -112,6 +113,12 @@ public class RESTTokenFileIO implements FileIO {
         return fileIO().newOutputStream(path, overwrite);
     }
 
+    @Override
+    public TwoPhaseOutputStream newTwoPhaseOutputStream(Path path, boolean 
overwrite)
+            throws IOException {
+        return fileIO().newTwoPhaseOutputStream(path, overwrite);
+    }
+
     @Override
     public FileStatus getFileStatus(Path path) throws IOException {
         return fileIO().getFileStatus(path);
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/fs/MultiPartUploadTwoPhaseOutputStreamTest.java
 
b/paimon-common/src/test/java/org/apache/paimon/fs/MultiPartUploadTwoPhaseOutputStreamTest.java
index ac20a65852..0a002fd783 100644
--- 
a/paimon-common/src/test/java/org/apache/paimon/fs/MultiPartUploadTwoPhaseOutputStreamTest.java
+++ 
b/paimon-common/src/test/java/org/apache/paimon/fs/MultiPartUploadTwoPhaseOutputStreamTest.java
@@ -68,7 +68,7 @@ class MultiPartUploadTwoPhaseOutputStreamTest {
 
         TwoPhaseOutputStream.Committer committer = stream.closeForCommit();
         assertThat(store.getUploadedParts()).hasSize(3);
-        
assertThat(committer.targetFilePath().toString()).isEqualTo(store.getStartedObjectName());
+        
assertThat(committer.targetPath().toString()).isEqualTo(store.getStartedObjectName());
 
         committer.commit(fileIO);
 
@@ -259,7 +259,7 @@ class MultiPartUploadTwoPhaseOutputStreamTest {
         private TestMultiPartUploadTwoPhaseOutputStream(
                 FakeMultiPartUploadStore store, org.apache.hadoop.fs.Path 
path, int threshold)
                 throws IOException {
-            super(store, path);
+            super(store, path, new Path(path.toString()));
             this.store = store;
             this.threshold = threshold;
         }
@@ -270,8 +270,7 @@ class MultiPartUploadTwoPhaseOutputStreamTest {
         }
 
         @Override
-        public Committer committer(
-                String uploadId, List<TestPart> uploadedParts, String 
objectName, long position) {
+        public Committer committer() {
             return new TestCommitter(store, uploadId, uploadedParts, 
objectName, position);
         }
     }
@@ -319,7 +318,7 @@ class MultiPartUploadTwoPhaseOutputStreamTest {
         }
 
         @Override
-        public Path targetFilePath() {
+        public Path targetPath() {
             return new Path(objectName);
         }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableCommit.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableCommit.java
index e1fd5a903e..235dbd4310 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableCommit.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableCommit.java
@@ -91,7 +91,7 @@ public class FormatTableCommit implements BatchTableCommit {
             } else if (overwrite) {
                 Set<Path> partitionPaths = new HashSet<>();
                 for (TwoPhaseOutputStream.Committer c : committers) {
-                    partitionPaths.add(c.targetFilePath().getParent());
+                    partitionPaths.add(c.targetPath().getParent());
                 }
                 for (Path p : partitionPaths) {
                     deletePreviousDataFile(p);
diff --git 
a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/HadoopCompliantFileIO.java
 
b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/HadoopCompliantFileIO.java
index 98628d9338..7356b9687a 100644
--- 
a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/HadoopCompliantFileIO.java
+++ 
b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/HadoopCompliantFileIO.java
@@ -114,7 +114,7 @@ public abstract class HadoopCompliantFileIO implements 
FileIO {
 
     protected org.apache.hadoop.fs.Path path(Path path) {
         URI uri = path.toUri();
-        if (uri.getScheme().equals("oss") && uri.getUserInfo() != null) {
+        if ("oss".equals(uri.getScheme()) && uri.getUserInfo() != null) {
             path = new Path("oss:/" + uri.getPath());
         }
         return new org.apache.hadoop.fs.Path(path.toUri());
diff --git 
a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoFileIO.java
 
b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoFileIO.java
index 614ca36b79..970cfcc809 100644
--- 
a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoFileIO.java
+++ 
b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoFileIO.java
@@ -133,7 +133,8 @@ public class JindoFileIO extends HadoopCompliantFileIO {
         org.apache.hadoop.fs.Path hadoopPath = path(path);
         Pair<JindoHadoopSystem, String> pair = getFileSystemPair(hadoopPath);
         JindoHadoopSystem fs = pair.getKey();
-        return new JindoTwoPhaseOutputStream(new JindoMultiPartUpload(fs, 
hadoopPath), hadoopPath);
+        return new JindoTwoPhaseOutputStream(
+                new JindoMultiPartUpload(fs, hadoopPath), hadoopPath, path);
     }
 
     @Override
diff --git 
a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoMultiPartUpload.java
 
b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoMultiPartUpload.java
index ce5aab4122..6051d315ec 100644
--- 
a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoMultiPartUpload.java
+++ 
b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoMultiPartUpload.java
@@ -37,17 +37,22 @@ import java.util.List;
 /** Provides the multipart upload by Jindo. */
 public class JindoMultiPartUpload implements 
MultiPartUploadStore<JdoObjectPart, String> {
 
-    private final JindoHadoopSystem fs;
     private final JindoMpuStore mpuStore;
+    private final Path workingDirectory;
 
     public JindoMultiPartUpload(JindoHadoopSystem fs, Path filePath) {
-        this.fs = fs;
+        this.workingDirectory = fs.getWorkingDirectory();
         this.mpuStore = fs.getMpuStore(filePath);
     }
 
+    @Override
+    public String pathToObject(Path hadoopPath) {
+        return hadoopPath.toString();
+    }
+
     @Override
     public Path workingDirectory() {
-        return fs.getWorkingDirectory();
+        return workingDirectory;
     }
 
     @Override
@@ -80,7 +85,7 @@ public class JindoMultiPartUpload implements 
MultiPartUploadStore<JdoObjectPart,
             ByteBuffer buffer;
             try (FileInputStream fis = new FileInputStream(file);
                     FileChannel channel = fis.getChannel()) {
-                buffer = ByteBuffer.allocate(byteLength);
+                buffer = ByteBuffer.allocateDirect(byteLength);
                 channel.read(buffer);
                 buffer.flip();
             }
diff --git 
a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoMultiPartUploadCommitter.java
 
b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoMultiPartUploadCommitter.java
index 43448431b1..64205e82b0 100644
--- 
a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoMultiPartUploadCommitter.java
+++ 
b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoMultiPartUploadCommitter.java
@@ -34,8 +34,12 @@ import java.util.List;
 public class JindoMultiPartUploadCommitter
         extends BaseMultiPartUploadCommitter<JdoObjectPart, String> {
     public JindoMultiPartUploadCommitter(
-            String uploadId, List<JdoObjectPart> uploadedParts, String 
objectName, long position) {
-        super(uploadId, uploadedParts, objectName, position);
+            String uploadId,
+            List<JdoObjectPart> uploadedParts,
+            String objectName,
+            long position,
+            Path targetPath) {
+        super(uploadId, uploadedParts, objectName, position, targetPath);
     }
 
     @Override
diff --git 
a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoTwoPhaseOutputStream.java
 
b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoTwoPhaseOutputStream.java
index fb85cbd3c3..9c6fb5df10 100644
--- 
a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoTwoPhaseOutputStream.java
+++ 
b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoTwoPhaseOutputStream.java
@@ -20,11 +20,11 @@ package org.apache.paimon.jindo;
 
 import org.apache.paimon.fs.MultiPartUploadStore;
 import org.apache.paimon.fs.MultiPartUploadTwoPhaseOutputStream;
+import org.apache.paimon.fs.Path;
 
 import com.aliyun.jindodata.api.spec.protos.JdoObjectPart;
 
 import java.io.IOException;
-import java.util.List;
 
 /** Jindo implementation of TwoPhaseOutputStream using multipart upload. */
 public class JindoTwoPhaseOutputStream
@@ -32,14 +32,15 @@ public class JindoTwoPhaseOutputStream
 
     public JindoTwoPhaseOutputStream(
             MultiPartUploadStore<JdoObjectPart, String> multiPartUploadStore,
-            org.apache.hadoop.fs.Path hadoopPath)
+            org.apache.hadoop.fs.Path hadoopPath,
+            Path targetPath)
             throws IOException {
-        super(multiPartUploadStore, hadoopPath);
+        super(multiPartUploadStore, hadoopPath, targetPath);
     }
 
     @Override
-    public Committer committer(
-            String uploadId, List<JdoObjectPart> uploadedParts, String 
objectName, long position) {
-        return new JindoMultiPartUploadCommitter(uploadId, uploadedParts, 
objectName, position);
+    public Committer committer() {
+        return new JindoMultiPartUploadCommitter(
+                uploadId, uploadedParts, objectName, position, targetPath);
     }
 }
diff --git 
a/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSFileIO.java
 
b/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSFileIO.java
index 3737a07f17..bdfa60e4af 100644
--- 
a/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSFileIO.java
+++ 
b/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSFileIO.java
@@ -123,7 +123,8 @@ public class OSSFileIO extends HadoopCompliantFileIO {
         FileSystem fs = getFileSystem(hadoopPath);
         return new OssTwoPhaseOutputStream(
                 new 
OSSMultiPartUpload((org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem) fs),
-                hadoopPath);
+                hadoopPath,
+                path);
     }
 
     public Options hadoopOptions() {
diff --git 
a/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSMultiPartUploadCommitter.java
 
b/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSMultiPartUploadCommitter.java
index d1e0117ad0..dc2d7e3861 100644
--- 
a/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSMultiPartUploadCommitter.java
+++ 
b/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSMultiPartUploadCommitter.java
@@ -34,8 +34,12 @@ import java.util.List;
 public class OSSMultiPartUploadCommitter
         extends BaseMultiPartUploadCommitter<PartETag, 
CompleteMultipartUploadResult> {
     public OSSMultiPartUploadCommitter(
-            String uploadId, List<PartETag> uploadedParts, String objectName, 
long position) {
-        super(uploadId, uploadedParts, objectName, position);
+            String uploadId,
+            List<PartETag> uploadedParts,
+            String objectName,
+            long position,
+            Path path) {
+        super(uploadId, uploadedParts, objectName, position, path);
     }
 
     @Override
diff --git 
a/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OssTwoPhaseOutputStream.java
 
b/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OssTwoPhaseOutputStream.java
index edc15ac864..901c26c444 100644
--- 
a/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OssTwoPhaseOutputStream.java
+++ 
b/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OssTwoPhaseOutputStream.java
@@ -20,12 +20,12 @@ package org.apache.paimon.oss;
 
 import org.apache.paimon.fs.MultiPartUploadStore;
 import org.apache.paimon.fs.MultiPartUploadTwoPhaseOutputStream;
+import org.apache.paimon.fs.Path;
 
 import com.aliyun.oss.model.CompleteMultipartUploadResult;
 import com.aliyun.oss.model.PartETag;
 
 import java.io.IOException;
-import java.util.List;
 
 /** OSS implementation of TwoPhaseOutputStream using multipart upload. */
 public class OssTwoPhaseOutputStream
@@ -33,14 +33,15 @@ public class OssTwoPhaseOutputStream
 
     public OssTwoPhaseOutputStream(
             MultiPartUploadStore<PartETag, CompleteMultipartUploadResult> 
multiPartUploadStore,
-            org.apache.hadoop.fs.Path hadoopPath)
+            org.apache.hadoop.fs.Path hadoopPath,
+            Path targetPath)
             throws IOException {
-        super(multiPartUploadStore, hadoopPath);
+        super(multiPartUploadStore, hadoopPath, targetPath);
     }
 
     @Override
-    public Committer committer(
-            String uploadId, List<PartETag> uploadedParts, String objectName, 
long position) {
-        return new OSSMultiPartUploadCommitter(uploadId, uploadedParts, 
objectName, position);
+    public Committer committer() {
+        return new OSSMultiPartUploadCommitter(
+                uploadId, uploadedParts, objectName, position, targetPath);
     }
 }
diff --git 
a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3FileIO.java
 
b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3FileIO.java
index 65017da573..8272518373 100644
--- 
a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3FileIO.java
+++ 
b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3FileIO.java
@@ -81,7 +81,8 @@ public class S3FileIO extends HadoopCompliantFileIO {
         if (!overwrite && this.exists(path)) {
             throw new IOException("File " + path + " already exists.");
         }
-        return new S3TwoPhaseOutputStream(new S3MultiPartUpload(fs, 
fs.getConf()), hadoopPath);
+        return new S3TwoPhaseOutputStream(
+                new S3MultiPartUpload(fs, fs.getConf()), hadoopPath, path);
     }
 
     // add additional config entries from the IO config to the Hadoop config
diff --git 
a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3MultiPartUploadCommitter.java
 
b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3MultiPartUploadCommitter.java
index dcf4d24cd6..9d663e7be4 100644
--- 
a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3MultiPartUploadCommitter.java
+++ 
b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3MultiPartUploadCommitter.java
@@ -34,8 +34,12 @@ import java.util.List;
 public class S3MultiPartUploadCommitter
         extends BaseMultiPartUploadCommitter<PartETag, 
CompleteMultipartUploadResult> {
     public S3MultiPartUploadCommitter(
-            String uploadId, List<PartETag> uploadedParts, String objectName, 
long position) {
-        super(uploadId, uploadedParts, objectName, position);
+            String uploadId,
+            List<PartETag> uploadedParts,
+            String objectName,
+            long position,
+            Path targetPath) {
+        super(uploadId, uploadedParts, objectName, position, targetPath);
     }
 
     @Override
diff --git 
a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3TwoPhaseOutputStream.java
 
b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3TwoPhaseOutputStream.java
index 76a424ecd3..03ad1e0977 100644
--- 
a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3TwoPhaseOutputStream.java
+++ 
b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3TwoPhaseOutputStream.java
@@ -20,12 +20,12 @@ package org.apache.paimon.s3;
 
 import org.apache.paimon.fs.MultiPartUploadStore;
 import org.apache.paimon.fs.MultiPartUploadTwoPhaseOutputStream;
+import org.apache.paimon.fs.Path;
 
 import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
 import com.amazonaws.services.s3.model.PartETag;
 
 import java.io.IOException;
-import java.util.List;
 
 /** S3 implementation of TwoPhaseOutputStream using multipart upload. */
 public class S3TwoPhaseOutputStream
@@ -33,14 +33,15 @@ public class S3TwoPhaseOutputStream
 
     public S3TwoPhaseOutputStream(
             MultiPartUploadStore<PartETag, CompleteMultipartUploadResult> 
multiPartUploadStore,
-            org.apache.hadoop.fs.Path hadoopPath)
+            org.apache.hadoop.fs.Path hadoopPath,
+            Path targetPath)
             throws IOException {
-        super(multiPartUploadStore, hadoopPath);
+        super(multiPartUploadStore, hadoopPath, targetPath);
     }
 
     @Override
-    public Committer committer(
-            String uploadId, List<PartETag> uploadedParts, String objectName, 
long position) {
-        return new S3MultiPartUploadCommitter(uploadId, uploadedParts, 
objectName, position);
+    public Committer committer() {
+        return new S3MultiPartUploadCommitter(
+                uploadId, uploadedParts, objectName, position, targetPath);
     }
 }

Reply via email to