This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 79331c7ebb [core] format table: fix two-phase commit for FileIOs
that use multi-part upload (#6525)
79331c7ebb is described below
commit 79331c7ebbe2277988c5c87f8a44d5fee4c3754e
Author: jerry <[email protected]>
AuthorDate: Fri Nov 7 13:40:03 2025 +0800
[core] format table: fix two-phase commit for FileIOs that use
multi-part upload (#6525)
---
.../paimon/fs/BaseMultiPartUploadCommitter.java | 36 +++++++++++++---------
.../fs/MultiPartUploadTwoPhaseOutputStream.java | 21 ++++++++-----
.../paimon/fs/RenamingTwoPhaseOutputStream.java | 2 +-
.../org/apache/paimon/fs/TwoPhaseOutputStream.java | 2 +-
.../org/apache/paimon/rest/RESTTokenFileIO.java | 7 +++++
.../MultiPartUploadTwoPhaseOutputStreamTest.java | 9 +++---
.../paimon/table/format/FormatTableCommit.java | 2 +-
.../apache/paimon/jindo/HadoopCompliantFileIO.java | 2 +-
.../java/org/apache/paimon/jindo/JindoFileIO.java | 3 +-
.../apache/paimon/jindo/JindoMultiPartUpload.java | 13 +++++---
.../jindo/JindoMultiPartUploadCommitter.java | 8 +++--
.../paimon/jindo/JindoTwoPhaseOutputStream.java | 13 ++++----
.../main/java/org/apache/paimon/oss/OSSFileIO.java | 3 +-
.../paimon/oss/OSSMultiPartUploadCommitter.java | 8 +++--
.../apache/paimon/oss/OssTwoPhaseOutputStream.java | 13 ++++----
.../main/java/org/apache/paimon/s3/S3FileIO.java | 3 +-
.../paimon/s3/S3MultiPartUploadCommitter.java | 8 +++--
.../apache/paimon/s3/S3TwoPhaseOutputStream.java | 13 ++++----
18 files changed, 103 insertions(+), 63 deletions(-)
diff --git
a/paimon-common/src/main/java/org/apache/paimon/fs/BaseMultiPartUploadCommitter.java
b/paimon-common/src/main/java/org/apache/paimon/fs/BaseMultiPartUploadCommitter.java
index d0aa8d9e22..a8545bf0b4 100644
---
a/paimon-common/src/main/java/org/apache/paimon/fs/BaseMultiPartUploadCommitter.java
+++
b/paimon-common/src/main/java/org/apache/paimon/fs/BaseMultiPartUploadCommitter.java
@@ -18,6 +18,8 @@
package org.apache.paimon.fs;
+import org.apache.paimon.rest.RESTTokenFileIO;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,13 +38,19 @@ public abstract class BaseMultiPartUploadCommitter<T, C>
implements TwoPhaseOutp
private final String objectName;
private final List<T> uploadedParts;
private final long byteLength;
+ private final Path targetPath;
public BaseMultiPartUploadCommitter(
- String uploadId, List<T> uploadedParts, String objectName, long
byteLength) {
+ String uploadId,
+ List<T> uploadedParts,
+ String objectName,
+ long byteLength,
+ Path targetPath) {
this.uploadId = uploadId;
this.objectName = objectName;
this.uploadedParts = new ArrayList<>(uploadedParts);
this.byteLength = byteLength;
+ this.targetPath = targetPath;
}
protected abstract MultiPartUploadStore<T, C> multiPartUploadStore(
@@ -51,14 +59,9 @@ public abstract class BaseMultiPartUploadCommitter<T, C>
implements TwoPhaseOutp
@Override
public void commit(FileIO fileIO) throws IOException {
try {
- MultiPartUploadStore<T, C> multiPartUploadStore =
- multiPartUploadStore(fileIO, targetFilePath());
+ MultiPartUploadStore<T, C> multiPartUploadStore =
multiPartUploadStore(fileIO);
multiPartUploadStore.completeMultipartUpload(
objectName, uploadId, uploadedParts, byteLength);
- LOG.info(
- "Successfully committed multipart upload with ID: {} for
objectName: {}",
- uploadId,
- objectName);
} catch (Exception e) {
throw new IOException("Failed to commit multipart upload with ID:
" + uploadId, e);
}
@@ -67,23 +70,26 @@ public abstract class BaseMultiPartUploadCommitter<T, C>
implements TwoPhaseOutp
@Override
public void discard(FileIO fileIO) throws IOException {
try {
- MultiPartUploadStore<T, C> multiPartUploadStore =
- multiPartUploadStore(fileIO, targetFilePath());
+ MultiPartUploadStore<T, C> multiPartUploadStore =
multiPartUploadStore(fileIO);
multiPartUploadStore.abortMultipartUpload(objectName, uploadId);
- LOG.info(
- "Successfully discarded multipart upload with ID: {} for
objectName: {}",
- uploadId,
- objectName);
} catch (Exception e) {
LOG.warn("Failed to discard multipart upload with ID: {}",
uploadId, e);
}
}
@Override
- public Path targetFilePath() {
- return new Path(objectName);
+ public Path targetPath() {
+ return this.targetPath;
}
@Override
public void clean(FileIO fileIO) throws IOException {}
+
+ private MultiPartUploadStore<T, C> multiPartUploadStore(FileIO fileIO)
throws IOException {
+ if (fileIO instanceof RESTTokenFileIO) {
+ RESTTokenFileIO restTokenFileIO = (RESTTokenFileIO) fileIO;
+ fileIO = restTokenFileIO.fileIO();
+ }
+ return multiPartUploadStore(fileIO, targetPath());
+ }
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/fs/MultiPartUploadTwoPhaseOutputStream.java
b/paimon-common/src/main/java/org/apache/paimon/fs/MultiPartUploadTwoPhaseOutputStream.java
index ed4cf3809e..3d1b5b0ce4 100644
---
a/paimon-common/src/main/java/org/apache/paimon/fs/MultiPartUploadTwoPhaseOutputStream.java
+++
b/paimon-common/src/main/java/org/apache/paimon/fs/MultiPartUploadTwoPhaseOutputStream.java
@@ -37,22 +37,28 @@ public abstract class
MultiPartUploadTwoPhaseOutputStream<T, C> extends TwoPhase
LoggerFactory.getLogger(MultiPartUploadTwoPhaseOutputStream.class);
private final ByteArrayOutputStream buffer;
- private final List<T> uploadedParts;
private final MultiPartUploadStore<T, C> multiPartUploadStore;
- private final String objectName;
- private String uploadId;
- private long position;
+ protected final String objectName;
+ protected final Path targetPath;
+ protected final String uploadId;
+
+ protected List<T> uploadedParts;
+ protected long position;
+
private boolean closed = false;
private Committer committer;
public MultiPartUploadTwoPhaseOutputStream(
- MultiPartUploadStore<T, C> multiPartUploadStore,
org.apache.hadoop.fs.Path hadoopPath)
+ MultiPartUploadStore<T, C> multiPartUploadStore,
+ org.apache.hadoop.fs.Path hadoopPath,
+ Path path)
throws IOException {
this.multiPartUploadStore = multiPartUploadStore;
this.buffer = new ByteArrayOutputStream();
this.uploadedParts = new ArrayList<>();
this.objectName = multiPartUploadStore.pathToObject(hadoopPath);
+ this.targetPath = path;
this.uploadId = multiPartUploadStore.startMultiPartUpload(objectName);
this.position = 0;
}
@@ -64,8 +70,7 @@ public abstract class MultiPartUploadTwoPhaseOutputStream<T,
C> extends TwoPhase
return 10 << 20;
}
- public abstract Committer committer(
- String uploadId, List<T> uploadedParts, String objectName, long
position);
+ public abstract Committer committer();
@Override
public long getPos() throws IOException {
@@ -142,7 +147,7 @@ public abstract class
MultiPartUploadTwoPhaseOutputStream<T, C> extends TwoPhase
uploadPart();
}
- return committer(uploadId, uploadedParts, objectName, position);
+ return committer();
}
private void uploadPart() throws IOException {
diff --git
a/paimon-common/src/main/java/org/apache/paimon/fs/RenamingTwoPhaseOutputStream.java
b/paimon-common/src/main/java/org/apache/paimon/fs/RenamingTwoPhaseOutputStream.java
index cccf731bff..f5dcdf7a1b 100644
---
a/paimon-common/src/main/java/org/apache/paimon/fs/RenamingTwoPhaseOutputStream.java
+++
b/paimon-common/src/main/java/org/apache/paimon/fs/RenamingTwoPhaseOutputStream.java
@@ -130,7 +130,7 @@ public class RenamingTwoPhaseOutputStream extends
TwoPhaseOutputStream {
}
@Override
- public Path targetFilePath() {
+ public Path targetPath() {
return targetPath;
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/fs/TwoPhaseOutputStream.java
b/paimon-common/src/main/java/org/apache/paimon/fs/TwoPhaseOutputStream.java
index 7975c4ea76..5f85b087d4 100644
--- a/paimon-common/src/main/java/org/apache/paimon/fs/TwoPhaseOutputStream.java
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/TwoPhaseOutputStream.java
@@ -52,7 +52,7 @@ public abstract class TwoPhaseOutputStream extends
PositionOutputStream {
*/
void discard(FileIO fileIO) throws IOException;
- Path targetFilePath();
+ Path targetPath();
void clean(FileIO fileIO) throws IOException;
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java
b/paimon-common/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java
index f5de31afb8..c619851ec5 100644
--- a/paimon-common/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java
+++ b/paimon-common/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java
@@ -25,6 +25,7 @@ import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.PositionOutputStream;
import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.fs.TwoPhaseOutputStream;
import org.apache.paimon.options.ConfigOption;
import org.apache.paimon.options.ConfigOptions;
import org.apache.paimon.options.Options;
@@ -112,6 +113,12 @@ public class RESTTokenFileIO implements FileIO {
return fileIO().newOutputStream(path, overwrite);
}
+ @Override
+ public TwoPhaseOutputStream newTwoPhaseOutputStream(Path path, boolean
overwrite)
+ throws IOException {
+ return fileIO().newTwoPhaseOutputStream(path, overwrite);
+ }
+
@Override
public FileStatus getFileStatus(Path path) throws IOException {
return fileIO().getFileStatus(path);
diff --git
a/paimon-common/src/test/java/org/apache/paimon/fs/MultiPartUploadTwoPhaseOutputStreamTest.java
b/paimon-common/src/test/java/org/apache/paimon/fs/MultiPartUploadTwoPhaseOutputStreamTest.java
index ac20a65852..0a002fd783 100644
---
a/paimon-common/src/test/java/org/apache/paimon/fs/MultiPartUploadTwoPhaseOutputStreamTest.java
+++
b/paimon-common/src/test/java/org/apache/paimon/fs/MultiPartUploadTwoPhaseOutputStreamTest.java
@@ -68,7 +68,7 @@ class MultiPartUploadTwoPhaseOutputStreamTest {
TwoPhaseOutputStream.Committer committer = stream.closeForCommit();
assertThat(store.getUploadedParts()).hasSize(3);
-
assertThat(committer.targetFilePath().toString()).isEqualTo(store.getStartedObjectName());
+
assertThat(committer.targetPath().toString()).isEqualTo(store.getStartedObjectName());
committer.commit(fileIO);
@@ -259,7 +259,7 @@ class MultiPartUploadTwoPhaseOutputStreamTest {
private TestMultiPartUploadTwoPhaseOutputStream(
FakeMultiPartUploadStore store, org.apache.hadoop.fs.Path
path, int threshold)
throws IOException {
- super(store, path);
+ super(store, path, new Path(path.toString()));
this.store = store;
this.threshold = threshold;
}
@@ -270,8 +270,7 @@ class MultiPartUploadTwoPhaseOutputStreamTest {
}
@Override
- public Committer committer(
- String uploadId, List<TestPart> uploadedParts, String
objectName, long position) {
+ public Committer committer() {
return new TestCommitter(store, uploadId, uploadedParts,
objectName, position);
}
}
@@ -319,7 +318,7 @@ class MultiPartUploadTwoPhaseOutputStreamTest {
}
@Override
- public Path targetFilePath() {
+ public Path targetPath() {
return new Path(objectName);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableCommit.java
b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableCommit.java
index e1fd5a903e..235dbd4310 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableCommit.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableCommit.java
@@ -91,7 +91,7 @@ public class FormatTableCommit implements BatchTableCommit {
} else if (overwrite) {
Set<Path> partitionPaths = new HashSet<>();
for (TwoPhaseOutputStream.Committer c : committers) {
- partitionPaths.add(c.targetFilePath().getParent());
+ partitionPaths.add(c.targetPath().getParent());
}
for (Path p : partitionPaths) {
deletePreviousDataFile(p);
diff --git
a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/HadoopCompliantFileIO.java
b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/HadoopCompliantFileIO.java
index 98628d9338..7356b9687a 100644
---
a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/HadoopCompliantFileIO.java
+++
b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/HadoopCompliantFileIO.java
@@ -114,7 +114,7 @@ public abstract class HadoopCompliantFileIO implements
FileIO {
protected org.apache.hadoop.fs.Path path(Path path) {
URI uri = path.toUri();
- if (uri.getScheme().equals("oss") && uri.getUserInfo() != null) {
+ if ("oss".equals(uri.getScheme()) && uri.getUserInfo() != null) {
path = new Path("oss:/" + uri.getPath());
}
return new org.apache.hadoop.fs.Path(path.toUri());
diff --git
a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoFileIO.java
b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoFileIO.java
index 614ca36b79..970cfcc809 100644
---
a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoFileIO.java
+++
b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoFileIO.java
@@ -133,7 +133,8 @@ public class JindoFileIO extends HadoopCompliantFileIO {
org.apache.hadoop.fs.Path hadoopPath = path(path);
Pair<JindoHadoopSystem, String> pair = getFileSystemPair(hadoopPath);
JindoHadoopSystem fs = pair.getKey();
- return new JindoTwoPhaseOutputStream(new JindoMultiPartUpload(fs,
hadoopPath), hadoopPath);
+ return new JindoTwoPhaseOutputStream(
+ new JindoMultiPartUpload(fs, hadoopPath), hadoopPath, path);
}
@Override
diff --git
a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoMultiPartUpload.java
b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoMultiPartUpload.java
index ce5aab4122..6051d315ec 100644
---
a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoMultiPartUpload.java
+++
b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoMultiPartUpload.java
@@ -37,17 +37,22 @@ import java.util.List;
/** Provides the multipart upload by Jindo. */
public class JindoMultiPartUpload implements
MultiPartUploadStore<JdoObjectPart, String> {
- private final JindoHadoopSystem fs;
private final JindoMpuStore mpuStore;
+ private final Path workingDirectory;
public JindoMultiPartUpload(JindoHadoopSystem fs, Path filePath) {
- this.fs = fs;
+ this.workingDirectory = fs.getWorkingDirectory();
this.mpuStore = fs.getMpuStore(filePath);
}
+ @Override
+ public String pathToObject(Path hadoopPath) {
+ return hadoopPath.toString();
+ }
+
@Override
public Path workingDirectory() {
- return fs.getWorkingDirectory();
+ return workingDirectory;
}
@Override
@@ -80,7 +85,7 @@ public class JindoMultiPartUpload implements
MultiPartUploadStore<JdoObjectPart,
ByteBuffer buffer;
try (FileInputStream fis = new FileInputStream(file);
FileChannel channel = fis.getChannel()) {
- buffer = ByteBuffer.allocate(byteLength);
+ buffer = ByteBuffer.allocateDirect(byteLength);
channel.read(buffer);
buffer.flip();
}
diff --git
a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoMultiPartUploadCommitter.java
b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoMultiPartUploadCommitter.java
index 43448431b1..64205e82b0 100644
---
a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoMultiPartUploadCommitter.java
+++
b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoMultiPartUploadCommitter.java
@@ -34,8 +34,12 @@ import java.util.List;
public class JindoMultiPartUploadCommitter
extends BaseMultiPartUploadCommitter<JdoObjectPart, String> {
public JindoMultiPartUploadCommitter(
- String uploadId, List<JdoObjectPart> uploadedParts, String
objectName, long position) {
- super(uploadId, uploadedParts, objectName, position);
+ String uploadId,
+ List<JdoObjectPart> uploadedParts,
+ String objectName,
+ long position,
+ Path targetPath) {
+ super(uploadId, uploadedParts, objectName, position, targetPath);
}
@Override
diff --git
a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoTwoPhaseOutputStream.java
b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoTwoPhaseOutputStream.java
index fb85cbd3c3..9c6fb5df10 100644
---
a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoTwoPhaseOutputStream.java
+++
b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoTwoPhaseOutputStream.java
@@ -20,11 +20,11 @@ package org.apache.paimon.jindo;
import org.apache.paimon.fs.MultiPartUploadStore;
import org.apache.paimon.fs.MultiPartUploadTwoPhaseOutputStream;
+import org.apache.paimon.fs.Path;
import com.aliyun.jindodata.api.spec.protos.JdoObjectPart;
import java.io.IOException;
-import java.util.List;
/** Jindo implementation of TwoPhaseOutputStream using multipart upload. */
public class JindoTwoPhaseOutputStream
@@ -32,14 +32,15 @@ public class JindoTwoPhaseOutputStream
public JindoTwoPhaseOutputStream(
MultiPartUploadStore<JdoObjectPart, String> multiPartUploadStore,
- org.apache.hadoop.fs.Path hadoopPath)
+ org.apache.hadoop.fs.Path hadoopPath,
+ Path targetPath)
throws IOException {
- super(multiPartUploadStore, hadoopPath);
+ super(multiPartUploadStore, hadoopPath, targetPath);
}
@Override
- public Committer committer(
- String uploadId, List<JdoObjectPart> uploadedParts, String
objectName, long position) {
- return new JindoMultiPartUploadCommitter(uploadId, uploadedParts,
objectName, position);
+ public Committer committer() {
+ return new JindoMultiPartUploadCommitter(
+ uploadId, uploadedParts, objectName, position, targetPath);
}
}
diff --git
a/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSFileIO.java
b/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSFileIO.java
index 3737a07f17..bdfa60e4af 100644
---
a/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSFileIO.java
+++
b/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSFileIO.java
@@ -123,7 +123,8 @@ public class OSSFileIO extends HadoopCompliantFileIO {
FileSystem fs = getFileSystem(hadoopPath);
return new OssTwoPhaseOutputStream(
new
OSSMultiPartUpload((org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem) fs),
- hadoopPath);
+ hadoopPath,
+ path);
}
public Options hadoopOptions() {
diff --git
a/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSMultiPartUploadCommitter.java
b/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSMultiPartUploadCommitter.java
index d1e0117ad0..dc2d7e3861 100644
---
a/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSMultiPartUploadCommitter.java
+++
b/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSMultiPartUploadCommitter.java
@@ -34,8 +34,12 @@ import java.util.List;
public class OSSMultiPartUploadCommitter
extends BaseMultiPartUploadCommitter<PartETag,
CompleteMultipartUploadResult> {
public OSSMultiPartUploadCommitter(
- String uploadId, List<PartETag> uploadedParts, String objectName,
long position) {
- super(uploadId, uploadedParts, objectName, position);
+ String uploadId,
+ List<PartETag> uploadedParts,
+ String objectName,
+ long position,
+ Path path) {
+ super(uploadId, uploadedParts, objectName, position, path);
}
@Override
diff --git
a/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OssTwoPhaseOutputStream.java
b/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OssTwoPhaseOutputStream.java
index edc15ac864..901c26c444 100644
---
a/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OssTwoPhaseOutputStream.java
+++
b/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OssTwoPhaseOutputStream.java
@@ -20,12 +20,12 @@ package org.apache.paimon.oss;
import org.apache.paimon.fs.MultiPartUploadStore;
import org.apache.paimon.fs.MultiPartUploadTwoPhaseOutputStream;
+import org.apache.paimon.fs.Path;
import com.aliyun.oss.model.CompleteMultipartUploadResult;
import com.aliyun.oss.model.PartETag;
import java.io.IOException;
-import java.util.List;
/** OSS implementation of TwoPhaseOutputStream using multipart upload. */
public class OssTwoPhaseOutputStream
@@ -33,14 +33,15 @@ public class OssTwoPhaseOutputStream
public OssTwoPhaseOutputStream(
MultiPartUploadStore<PartETag, CompleteMultipartUploadResult>
multiPartUploadStore,
- org.apache.hadoop.fs.Path hadoopPath)
+ org.apache.hadoop.fs.Path hadoopPath,
+ Path targetPath)
throws IOException {
- super(multiPartUploadStore, hadoopPath);
+ super(multiPartUploadStore, hadoopPath, targetPath);
}
@Override
- public Committer committer(
- String uploadId, List<PartETag> uploadedParts, String objectName,
long position) {
- return new OSSMultiPartUploadCommitter(uploadId, uploadedParts,
objectName, position);
+ public Committer committer() {
+ return new OSSMultiPartUploadCommitter(
+ uploadId, uploadedParts, objectName, position, targetPath);
}
}
diff --git
a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3FileIO.java
b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3FileIO.java
index 65017da573..8272518373 100644
---
a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3FileIO.java
+++
b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3FileIO.java
@@ -81,7 +81,8 @@ public class S3FileIO extends HadoopCompliantFileIO {
if (!overwrite && this.exists(path)) {
throw new IOException("File " + path + " already exists.");
}
- return new S3TwoPhaseOutputStream(new S3MultiPartUpload(fs,
fs.getConf()), hadoopPath);
+ return new S3TwoPhaseOutputStream(
+ new S3MultiPartUpload(fs, fs.getConf()), hadoopPath, path);
}
// add additional config entries from the IO config to the Hadoop config
diff --git
a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3MultiPartUploadCommitter.java
b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3MultiPartUploadCommitter.java
index dcf4d24cd6..9d663e7be4 100644
---
a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3MultiPartUploadCommitter.java
+++
b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3MultiPartUploadCommitter.java
@@ -34,8 +34,12 @@ import java.util.List;
public class S3MultiPartUploadCommitter
extends BaseMultiPartUploadCommitter<PartETag,
CompleteMultipartUploadResult> {
public S3MultiPartUploadCommitter(
- String uploadId, List<PartETag> uploadedParts, String objectName,
long position) {
- super(uploadId, uploadedParts, objectName, position);
+ String uploadId,
+ List<PartETag> uploadedParts,
+ String objectName,
+ long position,
+ Path targetPath) {
+ super(uploadId, uploadedParts, objectName, position, targetPath);
}
@Override
diff --git
a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3TwoPhaseOutputStream.java
b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3TwoPhaseOutputStream.java
index 76a424ecd3..03ad1e0977 100644
---
a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3TwoPhaseOutputStream.java
+++
b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3TwoPhaseOutputStream.java
@@ -20,12 +20,12 @@ package org.apache.paimon.s3;
import org.apache.paimon.fs.MultiPartUploadStore;
import org.apache.paimon.fs.MultiPartUploadTwoPhaseOutputStream;
+import org.apache.paimon.fs.Path;
import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
import com.amazonaws.services.s3.model.PartETag;
import java.io.IOException;
-import java.util.List;
/** S3 implementation of TwoPhaseOutputStream using multipart upload. */
public class S3TwoPhaseOutputStream
@@ -33,14 +33,15 @@ public class S3TwoPhaseOutputStream
public S3TwoPhaseOutputStream(
MultiPartUploadStore<PartETag, CompleteMultipartUploadResult>
multiPartUploadStore,
- org.apache.hadoop.fs.Path hadoopPath)
+ org.apache.hadoop.fs.Path hadoopPath,
+ Path targetPath)
throws IOException {
- super(multiPartUploadStore, hadoopPath);
+ super(multiPartUploadStore, hadoopPath, targetPath);
}
@Override
- public Committer committer(
- String uploadId, List<PartETag> uploadedParts, String objectName,
long position) {
- return new S3MultiPartUploadCommitter(uploadId, uploadedParts,
objectName, position);
+ public Committer committer() {
+ return new S3MultiPartUploadCommitter(
+ uploadId, uploadedParts, objectName, position, targetPath);
}
}