This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 0d61b363a2 [core] file io support TwoPhaseOutputStream (#6287)
0d61b363a2 is described below
commit 0d61b363a2598b883e5ed3a18bf9d7ebd8d9691c
Author: jerry <[email protected]>
AuthorDate: Fri Sep 19 15:48:16 2025 +0800
[core] file io support TwoPhaseOutputStream (#6287)
---
.../src/main/java/org/apache/paimon/fs/FileIO.java | 20 ++
.../org/apache/paimon/fs/MultiPartUploadStore.java | 50 +++
.../fs/MultiPartUploadTwoPhaseOutputStream.java | 222 ++++++++++++
.../paimon/fs/RenamingTwoPhaseOutputStream.java | 145 ++++++++
.../org/apache/paimon/fs/TwoPhaseOutputStream.java | 54 +++
.../fs/RenamingTwoPhaseOutputStreamTest.java | 141 ++++++++
.../main/java/org/apache/paimon/oss/OSSFileIO.java | 15 +
.../org/apache/paimon/oss/OSSMultiPartUpload.java | 72 ++++
.../apache/paimon/oss/OssTwoPhaseOutputStream.java | 44 +++
.../paimon/oss/OssTwoPhaseOutputStreamTest.java | 382 +++++++++++++++++++++
.../apache/paimon/s3/HadoopCompliantFileIO.java | 4 +-
.../main/java/org/apache/paimon/s3/S3FileIO.java | 13 +
.../org/apache/paimon/s3/S3MultiPartUpload.java | 120 +++++++
.../apache/paimon/s3/S3TwoPhaseOutputStream.java | 44 +++
.../paimon/s3/S3TwoPhaseOutputStreamTest.java | 368 ++++++++++++++++++++
15 files changed, 1692 insertions(+), 2 deletions(-)
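
For context, a minimal sketch of how callers are expected to use the new API (assuming a
FileIO instance and a target Path are already in hand; variable names are illustrative):

    TwoPhaseOutputStream out = fileIO.newTwoPhaseOutputStream(path, false);
    TwoPhaseOutputStream.Committer committer = null;
    try {
        out.write(data);                  // phase one: data is staged, not yet visible
        committer = out.closeForCommit();
        committer.commit();               // phase two: make the data visible
    } catch (IOException e) {
        if (committer != null) {
            committer.discard();          // roll back the staged data
        }
        throw e;
    }
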
diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java
index b7037693ee..5e5fe9fbfe 100644
--- a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java
@@ -93,6 +93,26 @@ public interface FileIO extends Serializable, Closeable {
*/
    PositionOutputStream newOutputStream(Path path, boolean overwrite) throws IOException;
+    /**
+     * Opens a TwoPhaseOutputStream at the indicated Path for transactional writing.
+     *
+     * <p>This method creates a stream that supports transactional writing operations. The written
+     * data becomes visible only after calling commit on the committer returned by closeForCommit.
+     *
+     * @param path the target file path
+     * @param overwrite if a file with this name already exists, then if true, the file will be
+     *     overwritten, and if false an error will be thrown.
+     * @return a TwoPhaseOutputStream that supports transactional writes
+     * @throws IOException Thrown if the stream could not be opened because of an I/O error, or
+     *     because a file already exists at that path and the write mode indicates not to
+     *     overwrite the file.
+     * @throws UnsupportedOperationException if the filesystem does not support transactional
+     *     writes
+     */
+    default TwoPhaseOutputStream newTwoPhaseOutputStream(Path path, boolean overwrite)
+            throws IOException {
+        return new RenamingTwoPhaseOutputStream(this, path, overwrite);
+    }
+
/**
* Return a file status object that represents the path.
*
diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/MultiPartUploadStore.java b/paimon-common/src/main/java/org/apache/paimon/fs/MultiPartUploadStore.java
new file mode 100644
index 0000000000..2a18bed8a2
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/MultiPartUploadStore.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fs;
+
+import org.apache.hadoop.fs.Path;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+/** A store that provides multipart upload operations, used to implement two-phase commit. */
+public interface MultiPartUploadStore<T, C> {
+
+ default String pathToObject(Path hadoopPath) {
+ if (!hadoopPath.isAbsolute()) {
+ hadoopPath = new Path(workingDirectory(), hadoopPath);
+ }
+
+ return hadoopPath.toUri().getPath().substring(1);
+ }
+
+ Path workingDirectory();
+
+ String startMultiPartUpload(String objectName) throws IOException;
+
+    C completeMultipartUpload(
+            String objectName, String uploadId, List<T> partETags, long numBytesInParts)
+            throws IOException;
+
+    T uploadPart(String objectName, String uploadId, int partNumber, File file, long byteLength)
+            throws IOException;
+
+    void abortMultipartUpload(String objectName, String uploadId) throws IOException;
+}
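
For orientation, the call sequence that the abstract stream below drives against this
interface, as a rough sketch (names illustrative, error handling omitted):

    String uploadId = store.startMultiPartUpload(objectName);   // at stream creation
    List<T> etags = new ArrayList<>();
    // one uploadPart call per buffered chunk that reaches the part-size threshold:
    etags.add(store.uploadPart(objectName, uploadId, 1, partFile, partLength));
    // Committer.commit() completes the upload and makes the object visible:
    store.completeMultipartUpload(objectName, uploadId, etags, totalBytes);
    // or Committer.discard() aborts it instead:
    store.abortMultipartUpload(objectName, uploadId);
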
diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/MultiPartUploadTwoPhaseOutputStream.java b/paimon-common/src/main/java/org/apache/paimon/fs/MultiPartUploadTwoPhaseOutputStream.java
new file mode 100644
index 0000000000..362635b4df
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/MultiPartUploadTwoPhaseOutputStream.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fs;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+/** A {@link TwoPhaseOutputStream} that uses multipart upload to implement two-phase commit. */
+public abstract class MultiPartUploadTwoPhaseOutputStream<T, C> extends TwoPhaseOutputStream {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(MultiPartUploadTwoPhaseOutputStream.class);
+
+ private final ByteArrayOutputStream buffer;
+ private final List<T> uploadedParts;
+ private final MultiPartUploadStore<T, C> multiPartUploadStore;
+ private final String objectName;
+
+ private String uploadId;
+ private long position;
+ private boolean closed = false;
+
+ public MultiPartUploadTwoPhaseOutputStream(
+            MultiPartUploadStore<T, C> multiPartUploadStore, org.apache.hadoop.fs.Path hadoopPath)
+ throws IOException {
+ this.multiPartUploadStore = multiPartUploadStore;
+ this.buffer = new ByteArrayOutputStream();
+ this.uploadedParts = new ArrayList<>();
+ this.objectName = multiPartUploadStore.pathToObject(hadoopPath);
+ this.uploadId = multiPartUploadStore.startMultiPartUpload(objectName);
+ this.position = 0;
+ }
+
+ public abstract long partSizeThreshold();
+
+ @Override
+ public long getPos() throws IOException {
+ return position;
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ if (closed) {
+ throw new IOException("Stream is closed");
+ }
+ buffer.write(b);
+ position++;
+ if (buffer.size() >= partSizeThreshold()) {
+ uploadPart();
+ }
+ }
+
+ @Override
+ public void write(byte[] b) throws IOException {
+ write(b, 0, b.length);
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ if (closed) {
+ throw new IOException("Stream is closed");
+ }
+ buffer.write(b, off, len);
+ position += len;
+ if (buffer.size() >= partSizeThreshold()) {
+ uploadPart();
+ }
+ }
+
+ @Override
+ public void flush() throws IOException {
+ if (closed) {
+ throw new IOException("Stream is closed");
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (!closed) {
+ Committer committer = closeForCommit();
+ committer.commit();
+ }
+ }
+
+ @Override
+ public Committer closeForCommit() throws IOException {
+ if (closed) {
+ throw new IOException("Stream is already closed");
+ }
+ closed = true;
+
+ if (buffer.size() > 0) {
+ uploadPart();
+ }
+
+ return new MultiPartUploadCommitter(
+                multiPartUploadStore, uploadId, uploadedParts, objectName, position);
+ }
+
+ private void uploadPart() throws IOException {
+ if (buffer.size() == 0) {
+ return;
+ }
+
+ File tempFile = null;
+ try {
+ byte[] data = buffer.toByteArray();
+            tempFile = Files.createTempFile("multi-part-" + UUID.randomUUID(), ".tmp").toFile();
+ try (FileOutputStream fos = new FileOutputStream(tempFile)) {
+ fos.write(data);
+ fos.flush();
+ }
+ T partETag =
+ multiPartUploadStore.uploadPart(
+                            objectName, uploadId, uploadedParts.size() + 1, tempFile, data.length);
+ uploadedParts.add(partETag);
+ buffer.reset();
+ } catch (Exception e) {
+ throw new IOException(
+ "Failed to upload part "
+ + (uploadedParts.size() + 1)
+ + " for upload ID: "
+ + uploadId,
+ e);
+ } finally {
+ if (tempFile != null && tempFile.exists()) {
+ if (!tempFile.delete()) {
+                    LOG.warn("Failed to delete temporary file: {}", tempFile.getAbsolutePath());
+ }
+ }
+ }
+ }
+
+ private static class MultiPartUploadCommitter<T, C> implements Committer {
+
+ private final MultiPartUploadStore<T, C> multiPartUploadStore;
+ private final String uploadId;
+ private final String objectName;
+ private final List<T> uploadedParts;
+ private final long byteLength;
+ private boolean committed = false;
+ private boolean discarded = false;
+
+ public MultiPartUploadCommitter(
+ MultiPartUploadStore<T, C> multiPartUploadStore,
+ String uploadId,
+ List<T> uploadedParts,
+ String objectName,
+ long byteLength) {
+ this.multiPartUploadStore = multiPartUploadStore;
+ this.uploadId = uploadId;
+ this.objectName = objectName;
+ this.uploadedParts = new ArrayList<>(uploadedParts);
+ this.byteLength = byteLength;
+ }
+
+ @Override
+ public void commit() throws IOException {
+ if (committed) {
+ return;
+ }
+ if (discarded) {
+                throw new IOException("Cannot commit: committer has been discarded");
+ }
+
+ try {
+ multiPartUploadStore.completeMultipartUpload(
+ objectName, uploadId, uploadedParts, byteLength);
+ committed = true;
+ LOG.info(
+                        "Successfully committed multipart upload with ID: {} for objectName: {}",
+ uploadId,
+ objectName);
+ } catch (Exception e) {
+                throw new IOException("Failed to commit multipart upload with ID: " + uploadId, e);
+ }
+ }
+
+ @Override
+ public void discard() throws IOException {
+ if (discarded) {
+ return;
+ }
+
+ try {
+                multiPartUploadStore.abortMultipartUpload(objectName, uploadId);
+ discarded = true;
+ LOG.info(
+                        "Successfully discarded multipart upload with ID: {} for objectName: {}",
+ uploadId,
+ objectName);
+ } catch (Exception e) {
+                LOG.warn("Failed to discard multipart upload with ID: {}", uploadId, e);
+ }
+ }
+ }
+}
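
A concrete stream on top of this base class only needs a store and a part-size threshold, as
the OSS and S3 subclasses below show. A minimal sketch with hypothetical MyTag and MyResult
types:

    public class MyTwoPhaseOutputStream
            extends MultiPartUploadTwoPhaseOutputStream<MyTag, MyResult> {

        public MyTwoPhaseOutputStream(
                MultiPartUploadStore<MyTag, MyResult> store, org.apache.hadoop.fs.Path path)
                throws IOException {
            super(store, path); // starts the multipart upload immediately
        }

        @Override
        public long partSizeThreshold() {
            return 8L << 20; // buffer 8 MiB before each part upload
        }
    }
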
diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/RenamingTwoPhaseOutputStream.java b/paimon-common/src/main/java/org/apache/paimon/fs/RenamingTwoPhaseOutputStream.java
new file mode 100644
index 0000000000..0da827d920
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/RenamingTwoPhaseOutputStream.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fs;
+
+import org.apache.paimon.annotation.Public;
+
+import java.io.IOException;
+import java.util.UUID;
+
+/**
+ * A {@link TwoPhaseOutputStream} implementation that writes to a temporary file and commits by
+ * renaming to the target path. This follows HDFS-style commit semantics.
+ */
+@Public
+public class RenamingTwoPhaseOutputStream extends TwoPhaseOutputStream {
+
+ private final FileIO fileIO;
+ private final Path targetPath;
+ private final Path tempPath;
+ private final PositionOutputStream tempOutputStream;
+
+    public RenamingTwoPhaseOutputStream(FileIO fileIO, Path targetPath, boolean overwrite)
+ throws IOException {
+ if (!overwrite && fileIO.exists(targetPath)) {
+ throw new IOException("File " + targetPath + " already exists.");
+ }
+ this.fileIO = fileIO;
+ this.targetPath = targetPath;
+ this.tempPath = generateTempPath(targetPath);
+
+ // Create temporary file
+ this.tempOutputStream = fileIO.newOutputStream(tempPath, overwrite);
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ tempOutputStream.write(b);
+ }
+
+ @Override
+ public void write(byte[] b) throws IOException {
+ tempOutputStream.write(b);
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ tempOutputStream.write(b, off, len);
+ }
+
+ @Override
+ public void flush() throws IOException {
+ tempOutputStream.flush();
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return tempOutputStream.getPos();
+ }
+
+ @Override
+ public void close() throws IOException {
+ tempOutputStream.close();
+ }
+
+ @Override
+ public Committer closeForCommit() throws IOException {
+ close();
+ return new TempFileCommitter(fileIO, tempPath, targetPath);
+ }
+
+ /**
+     * Generate a temporary file path based on the target path. The temp file will be in the same
+ * directory as the target with a unique suffix.
+ */
+ private Path generateTempPath(Path targetPath) {
+ String tempFileName = ".tmp." + UUID.randomUUID();
+ return new Path(targetPath.getParent(), tempFileName);
+ }
+
+ /** Committer implementation that renames temporary file to target path. */
+ private static class TempFileCommitter implements Committer {
+
+ private final FileIO fileIO;
+ private final Path tempPath;
+ private final Path targetPath;
+ private boolean committed = false;
+ private boolean discarded = false;
+
+        public TempFileCommitter(FileIO fileIO, Path tempPath, Path targetPath) {
+ this.fileIO = fileIO;
+ this.tempPath = tempPath;
+ this.targetPath = targetPath;
+ }
+
+ @Override
+ public void commit() throws IOException {
+ if (committed || discarded) {
+ throw new IOException("Committer has already been used");
+ }
+
+ try {
+ Path parentDir = targetPath.getParent();
+ if (parentDir != null && !fileIO.exists(parentDir)) {
+ fileIO.mkdirs(parentDir);
+ }
+
+ if (!fileIO.rename(tempPath, targetPath)) {
+                    throw new IOException("Failed to rename " + tempPath + " to " + targetPath);
+ }
+
+ committed = true;
+
+ } catch (IOException e) {
+ // Clean up temp file on failure
+ fileIO.deleteQuietly(tempPath);
+ throw new IOException(
+                        "Failed to commit temporary file " + tempPath + " to " + targetPath, e);
+ }
+ }
+
+ @Override
+ public void discard() {
+ if (!committed && !discarded) {
+ fileIO.deleteQuietly(tempPath);
+ discarded = true;
+ }
+ }
+ }
+}
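
To illustrate the rename-based semantics, a small local-filesystem sketch (LocalFileIO is the
local implementation exercised by the test below; the target path is illustrative):

    FileIO fileIO = new LocalFileIO();
    Path target = new Path("/tmp/demo/part-0.txt");
    TwoPhaseOutputStream out = new RenamingTwoPhaseOutputStream(fileIO, target, false);
    out.write("hello".getBytes());
    TwoPhaseOutputStream.Committer committer = out.closeForCommit();
    // the bytes now sit in /tmp/demo/.tmp.<uuid>; the target path does not exist yet
    committer.commit(); // renames the temp file onto the target path
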
diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/TwoPhaseOutputStream.java b/paimon-common/src/main/java/org/apache/paimon/fs/TwoPhaseOutputStream.java
new file mode 100644
index 0000000000..401486e91b
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/TwoPhaseOutputStream.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fs;
+
+import java.io.IOException;
+
+/** TwoPhaseOutputStream provides a way to write to a file and get a committer that can commit. */
+public abstract class TwoPhaseOutputStream extends PositionOutputStream {
+ /**
+     * Closes the stream for writing and returns a committer that can be used to make the written
+     * data visible.
+     *
+     * <p>After calling this method, the stream should not be used for writing anymore. The
+     * returned committer can be used to commit the data or discard it.
+ *
+ * @return A committer that can be used to commit the data
+ * @throws IOException if an I/O error occurs during closing
+ */
+ public abstract Committer closeForCommit() throws IOException;
+
+ /** A committer interface that can commit or discard the written data. */
+ public interface Committer {
+
+ /**
+ * Commits the written data, making it visible.
+ *
+ * @throws IOException if an I/O error occurs during commit
+ */
+ void commit() throws IOException;
+
+ /**
+         * Discards the written data, cleaning up any temporary files or resources.
+ *
+ * @throws IOException if an I/O error occurs during discard
+ */
+ void discard() throws IOException;
+ }
+}
diff --git a/paimon-common/src/test/java/org/apache/paimon/fs/RenamingTwoPhaseOutputStreamTest.java b/paimon-common/src/test/java/org/apache/paimon/fs/RenamingTwoPhaseOutputStreamTest.java
new file mode 100644
index 0000000000..a68ccb8700
--- /dev/null
+++ b/paimon-common/src/test/java/org/apache/paimon/fs/RenamingTwoPhaseOutputStreamTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fs;
+
+import org.apache.paimon.fs.local.LocalFileIO;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link RenamingTwoPhaseOutputStream}. */
+public class RenamingTwoPhaseOutputStreamTest {
+
+ @TempDir java.nio.file.Path tempDir;
+
+ private FileIO fileIO;
+ private Path targetPath;
+
+ @BeforeEach
+ void setup() {
+ fileIO = new LocalFileIO();
+ targetPath = new Path(tempDir.resolve("target-file.txt").toString());
+ }
+
+ @Test
+ void testSuccessfulCommit() throws IOException {
+ RenamingTwoPhaseOutputStream stream =
+ new RenamingTwoPhaseOutputStream(fileIO, targetPath, false);
+
+ // Write some data
+ String testData = "Hello, World!";
+ stream.write(testData.getBytes());
+
+ // Close for commit
+ TwoPhaseOutputStream.Committer committer = stream.closeForCommit();
+
+ // Target file should not exist yet
+ assertThat(fileIO.exists(targetPath)).isFalse();
+
+ // Commit the file
+ committer.commit();
+
+ // Now target file should exist with correct content
+ assertThat(fileIO.exists(targetPath)).isTrue();
+
+ // Read and verify content
+ byte[] content = Files.readAllBytes(Paths.get(targetPath.toString()));
+ assertThat(new String(content)).isEqualTo(testData);
+ }
+
+ @Test
+ void testDiscard() throws IOException {
+ RenamingTwoPhaseOutputStream stream =
+ new RenamingTwoPhaseOutputStream(fileIO, targetPath, false);
+
+ // Write some data
+ stream.write("Some data".getBytes());
+
+ // Close for commit
+ TwoPhaseOutputStream.Committer committer = stream.closeForCommit();
+
+ // Discard instead of commit
+ committer.discard();
+
+ // Target file should not exist
+ assertThat(fileIO.exists(targetPath)).isFalse();
+ }
+
+ @Test
+ void testCloseWithoutCommit() throws IOException {
+ RenamingTwoPhaseOutputStream stream =
+ new RenamingTwoPhaseOutputStream(fileIO, targetPath, false);
+ // Write some data
+ stream.write("Some data".getBytes());
+
+ // Just close (not closeForCommit)
+ stream.close();
+
+ // Target file should not exist (temp file cleaned up)
+ assertThat(fileIO.exists(targetPath)).isFalse();
+ }
+
+ @Test
+ void testDoubleCommitThrows() throws IOException {
+ RenamingTwoPhaseOutputStream stream =
+ new RenamingTwoPhaseOutputStream(fileIO, targetPath, false);
+
+ stream.write("data".getBytes());
+ TwoPhaseOutputStream.Committer committer = stream.closeForCommit();
+
+ // First commit should succeed
+ committer.commit();
+
+ // Second commit should throw
+ assertThatThrownBy(committer::commit).isInstanceOf(IOException.class);
+ }
+
+ @Test
+ void testPositionTracking() throws IOException {
+ RenamingTwoPhaseOutputStream stream =
+ new RenamingTwoPhaseOutputStream(fileIO, targetPath, false);
+
+ assertThat(stream.getPos()).isEqualTo(0);
+
+ stream.write("Hello".getBytes());
+ assertThat(stream.getPos()).isEqualTo(5);
+
+ stream.write(" World!".getBytes());
+ assertThat(stream.getPos()).isEqualTo(12);
+
+ TwoPhaseOutputStream.Committer committer = stream.closeForCommit();
+ committer.commit();
+
+ // Verify final content
+ byte[] content = Files.readAllBytes(Paths.get(targetPath.toString()));
+ assertThat(new String(content)).isEqualTo("Hello World!");
+ }
+}
diff --git a/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSFileIO.java b/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSFileIO.java
index 4c457598e7..7e9d8fc68a 100644
--- a/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSFileIO.java
+++ b/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSFileIO.java
@@ -20,6 +20,8 @@ package org.apache.paimon.oss;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.TwoPhaseOutputStream;
import org.apache.paimon.options.Options;
import org.apache.paimon.utils.IOUtils;
@@ -106,6 +108,19 @@ public class OSSFileIO extends HadoopCompliantFileIO {
}
}
+ @Override
+    public TwoPhaseOutputStream newTwoPhaseOutputStream(Path path, boolean overwrite)
+ throws IOException {
+ if (!overwrite && this.exists(path)) {
+ throw new IOException("File " + path + " already exists.");
+ }
+ org.apache.hadoop.fs.Path hadoopPath = path(path);
+ FileSystem fs = getFileSystem(hadoopPath);
+ return new OssTwoPhaseOutputStream(
+                new OSSMultiPartUpload((org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem) fs),
+ hadoopPath);
+ }
+
public Options hadoopOptions() {
return hadoopOptions;
}
diff --git a/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSMultiPartUpload.java b/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSMultiPartUpload.java
new file mode 100644
index 0000000000..6ea486f683
--- /dev/null
+++ b/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSMultiPartUpload.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.oss;
+
+import org.apache.paimon.fs.MultiPartUploadStore;
+
+import com.aliyun.oss.model.CompleteMultipartUploadResult;
+import com.aliyun.oss.model.PartETag;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem;
+import org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystemStore;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+/** Provides the multipart upload by Aliyun OSS. */
+public class OSSMultiPartUpload
+        implements MultiPartUploadStore<PartETag, CompleteMultipartUploadResult> {
+
+ private AliyunOSSFileSystem fs;
+ private AliyunOSSFileSystemStore store;
+
+ public OSSMultiPartUpload(AliyunOSSFileSystem fs) {
+ this.fs = fs;
+ this.store = fs.getStore();
+ }
+
+ @Override
+ public Path workingDirectory() {
+ return fs.getWorkingDirectory();
+ }
+
+ @Override
+ public String startMultiPartUpload(String objectName) throws IOException {
+ return store.getUploadId(objectName);
+ }
+
+ @Override
+ public CompleteMultipartUploadResult completeMultipartUpload(
+            String objectName, String uploadId, List<PartETag> partETags, long numBytesInParts) {
+ return store.completeMultipartUpload(objectName, uploadId, partETags);
+ }
+
+ @Override
+ public PartETag uploadPart(
+            String objectName, String uploadId, int partNumber, File file, long byteLength)
+ throws IOException {
+ return store.uploadPart(file, objectName, uploadId, partNumber);
+ }
+
+ @Override
+ public void abortMultipartUpload(String objectName, String uploadId) {
+ store.abortMultipartUpload(objectName, uploadId);
+ }
+}
diff --git a/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OssTwoPhaseOutputStream.java b/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OssTwoPhaseOutputStream.java
new file mode 100644
index 0000000000..cce331d8d0
--- /dev/null
+++ b/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OssTwoPhaseOutputStream.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.oss;
+
+import org.apache.paimon.fs.MultiPartUploadStore;
+import org.apache.paimon.fs.MultiPartUploadTwoPhaseOutputStream;
+
+import com.aliyun.oss.model.CompleteMultipartUploadResult;
+import com.aliyun.oss.model.PartETag;
+
+import java.io.IOException;
+
+/** OSS implementation of TwoPhaseOutputStream using multipart upload. */
+public class OssTwoPhaseOutputStream
+        extends MultiPartUploadTwoPhaseOutputStream<PartETag, CompleteMultipartUploadResult> {
+
+ public OssTwoPhaseOutputStream(
+            MultiPartUploadStore<PartETag, CompleteMultipartUploadResult> multiPartUploadStore,
+ org.apache.hadoop.fs.Path hadoopPath)
+ throws IOException {
+ super(multiPartUploadStore, hadoopPath);
+ }
+
+ @Override
+ public long partSizeThreshold() {
+ return 10L << 20;
+ }
+}
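
Note: 10L << 20 is 10 MiB, so the OSS stream uploads one part for roughly every 10 MiB of
buffered data.
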
diff --git a/paimon-filesystems/paimon-oss-impl/src/test/java/org/apache/paimon/oss/OssTwoPhaseOutputStreamTest.java b/paimon-filesystems/paimon-oss-impl/src/test/java/org/apache/paimon/oss/OssTwoPhaseOutputStreamTest.java
new file mode 100644
index 0000000000..ff71b571dc
--- /dev/null
+++ b/paimon-filesystems/paimon-oss-impl/src/test/java/org/apache/paimon/oss/OssTwoPhaseOutputStreamTest.java
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.oss;
+
+import org.apache.paimon.fs.TwoPhaseOutputStream;
+
+import com.aliyun.oss.model.CompleteMultipartUploadResult;
+import com.aliyun.oss.model.PartETag;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link OssTwoPhaseOutputStream}. */
+public class OssTwoPhaseOutputStreamTest {
+
+ @TempDir java.nio.file.Path tempDir;
+
+ private OssTwoPhaseOutputStream stream;
+ private MockOSSMultiPartUpload mockAccessor;
+ private org.apache.hadoop.fs.Path hadoopPath;
+ private File targetFile;
+
+ @BeforeEach
+ void setup() throws IOException {
+ hadoopPath = new org.apache.hadoop.fs.Path("/test/file.parquet");
+ targetFile = tempDir.resolve("target-file.parquet").toFile();
+ targetFile.getParentFile().mkdirs();
+
+ mockAccessor = new MockOSSMultiPartUpload(targetFile);
+ }
+
+ private OssTwoPhaseOutputStream createStream() throws IOException {
+ return new OssTwoPhaseOutputStream(mockAccessor, hadoopPath);
+ }
+
+ @Test
+ void testLargeDataMultipleParts() throws IOException {
+ stream = createStream();
+
+        // Write data larger than MIN_PART_SIZE to trigger automatic part upload,
+        // plus some extra data to ensure there's remaining data for final upload
+        byte[] largeData = new byte[120 * 1024 * 1024]; // 120MB - will trigger first upload
+ for (int i = 0; i < largeData.length; i++) {
+ largeData[i] = (byte) (i % 256);
+ }
+
+ stream.write(largeData);
+
+ // Write additional data that will remain in buffer for final upload
+ byte[] extraData = "Additional data for final part".getBytes();
+ stream.write(extraData);
+
+        assertThat(stream.getPos()).isEqualTo(largeData.length + extraData.length);
+
+ // Should have triggered automatic part upload
+ assertThat(mockAccessor.startMultipartUploadCalled).isTrue();
+        assertThat(mockAccessor.uploadPartCalls).isEqualTo(1); // One part uploaded automatically
+
+ // Close for commit (uploads remaining data)
+ TwoPhaseOutputStream.Committer committer = stream.closeForCommit();
+
+ // Should have uploaded the remaining data as a final part
+        assertThat(mockAccessor.uploadPartCalls).isEqualTo(2); // Initial + final part
+
+ // Commit
+ committer.commit();
+
+ assertThat(mockAccessor.completeMultipartUploadCalled).isTrue();
+
+ // Verify target file contains all the data
+ assertThat(targetFile.exists()).isTrue();
+ byte[] writtenContent = Files.readAllBytes(targetFile.toPath());
+
+ // Combine expected data
+ byte[] expectedData = new byte[largeData.length + extraData.length];
+ System.arraycopy(largeData, 0, expectedData, 0, largeData.length);
+        System.arraycopy(extraData, 0, expectedData, largeData.length, extraData.length);
+
+ assertThat(writtenContent).isEqualTo(expectedData);
+ }
+
+ @Test
+ void testDiscard() throws IOException {
+ stream = createStream();
+
+ stream.write("Some data".getBytes());
+ TwoPhaseOutputStream.Committer committer = stream.closeForCommit();
+
+ // Discard instead of commit
+ committer.discard();
+
+ // Verify abort was called, not complete
+ assertThat(mockAccessor.abortMultipartUploadCalled).isTrue();
+ assertThat(mockAccessor.completeMultipartUploadCalled).isFalse();
+
+ // Target file should not exist
+ assertThat(targetFile.exists()).isFalse();
+ }
+
+ @Test
+ void testCommitFailure() throws IOException {
+ stream = createStream();
+ mockAccessor.completeMultipartUploadShouldFail = true;
+
+ stream.write("data".getBytes());
+ TwoPhaseOutputStream.Committer committer = stream.closeForCommit();
+
+ assertThatThrownBy(committer::commit)
+ .isInstanceOf(IOException.class)
+ .hasMessageContaining("Failed to commit multipart upload");
+
+ // Target file should not exist on failed commit
+ assertThat(targetFile.exists()).isFalse();
+ }
+
+ @Test
+ void testUploadPartFailure() throws IOException {
+ stream = createStream();
+ mockAccessor.uploadPartShouldFail = true;
+
+        // Write data and then close to trigger uploadPart during closeForCommit
+ stream.write("test data".getBytes());
+
+ assertThatThrownBy(() -> stream.closeForCommit())
+ .isInstanceOf(IOException.class)
+ .hasMessageContaining("Failed to upload part");
+ }
+
+ @Test
+ void testPositionTracking() throws IOException {
+ stream = createStream();
+
+ assertThat(stream.getPos()).isEqualTo(0);
+
+ stream.write("Hello".getBytes());
+ assertThat(stream.getPos()).isEqualTo(5);
+
+ stream.write(" OSS".getBytes());
+ assertThat(stream.getPos()).isEqualTo(9);
+
+ stream.write(" World!".getBytes());
+ assertThat(stream.getPos()).isEqualTo(16);
+
+ TwoPhaseOutputStream.Committer committer = stream.closeForCommit();
+ committer.commit();
+
+ assertThat(mockAccessor.completeMultipartUploadCalled).isTrue();
+
+ // Verify final content
+        String writtenContent = new String(Files.readAllBytes(targetFile.toPath()));
+ assertThat(writtenContent).isEqualTo("Hello OSS World!");
+ }
+
+ @Test
+ void testCommitAfterDiscard() throws IOException {
+ stream = createStream();
+ stream.write("data".getBytes());
+ TwoPhaseOutputStream.Committer committer = stream.closeForCommit();
+
+ committer.discard();
+
+ assertThatThrownBy(committer::commit)
+ .isInstanceOf(IOException.class)
+                .hasMessageContaining("Cannot commit: committer has been discarded");
+
+ // Target file should not exist
+ assertThat(targetFile.exists()).isFalse();
+ }
+
+ /**
+     * Mock implementation that actually uses local files to simulate OSS multipart upload
+     * behavior. Extends OSSMultiPartUpload but overrides all methods to avoid initialization
+     * issues.
+ */
+ private static class MockOSSMultiPartUpload extends OSSMultiPartUpload {
+
+ boolean startMultipartUploadCalled = false;
+ int uploadPartCalls = 0;
+ boolean completeMultipartUploadCalled = false;
+ int completeMultipartUploadCallCount = 0;
+ boolean abortMultipartUploadCalled = false;
+
+ boolean uploadPartShouldFail = false;
+ boolean completeMultipartUploadShouldFail = false;
+
+ private final String mockUploadId = "mock-upload-" + UUID.randomUUID();
+ private final List<File> tempPartFiles = new ArrayList<>();
+ private final File targetFile;
+
+ @SuppressWarnings("unused")
+ public MockOSSMultiPartUpload(File targetFile) {
+            super(createStubFileSystem()); // Create minimal stub to avoid null pointer
+ this.targetFile = targetFile;
+ }
+
+        private static org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem createStubFileSystem() {
+            // Create a minimal stub to avoid NullPointerException during initialization
+ return new StubAliyunOSSFileSystem();
+ }
+
+ @Override
+ public String pathToObject(org.apache.hadoop.fs.Path hadoopPath) {
+ return hadoopPath.toUri().getPath().substring(1);
+ }
+
+ @Override
+ public String startMultiPartUpload(String objectName) {
+ startMultipartUploadCalled = true;
+ return mockUploadId;
+ }
+
+ @Override
+ public PartETag uploadPart(
+                String objectName, String uploadId, int partNumber, File file, long byteLength)
+ throws IOException {
+ uploadPartCalls++;
+
+ if (uploadPartShouldFail) {
+ throw new IOException("Mock upload part failure");
+ }
+
+ // Verify file exists and has content
+ if (!file.exists() || file.length() == 0) {
+ throw new IOException("Invalid file for upload: " + file);
+ }
+
+            // Store the part file in a temporary location (simulating storing in OSS)
+            File partFile =
+                    Files.createTempFile("mock-oss-part-" + partNumber + "-", ".tmp").toFile();
+            Files.copy(file.toPath(), partFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
+            tempPartFiles.add(partFile);
+
+            return new MockPartETag(partNumber, "mock-etag-" + partNumber);
+ }
+
+ @Override
+ public CompleteMultipartUploadResult completeMultipartUpload(
+ String objectName,
+ String uploadId,
+ List<PartETag> partETags,
+ long numBytesInParts) {
+ completeMultipartUploadCalled = true;
+ completeMultipartUploadCallCount++;
+
+ if (completeMultipartUploadShouldFail) {
+                throw new RuntimeException("Mock complete multipart upload failure");
+ }
+
+ // Simulate combining all parts into the final target file
+ try {
+ try (FileOutputStream fos = new FileOutputStream(targetFile)) {
+ for (File partFile : tempPartFiles) {
+                    try (FileInputStream fis = new FileInputStream(partFile)) {
+ byte[] buffer = new byte[8192];
+ int bytesRead;
+ while ((bytesRead = fis.read(buffer)) != -1) {
+ fos.write(buffer, 0, bytesRead);
+ }
+ }
+ }
+ }
+ } catch (IOException e) {
+                throw new RuntimeException("Failed to complete multipart upload", e);
+ }
+
+ // Clean up temp part files
+ for (File partFile : tempPartFiles) {
+ partFile.delete();
+ }
+ tempPartFiles.clear();
+
+            return new MockCompleteMultipartUploadResult(objectName, "mock-final-etag");
+ }
+
+ @Override
+ public void abortMultipartUpload(String objectName, String uploadId) {
+ abortMultipartUploadCalled = true;
+
+ // Clean up temp part files on abort
+ for (File partFile : tempPartFiles) {
+ partFile.delete();
+ }
+ tempPartFiles.clear();
+
+ // Ensure target file doesn't exist
+ if (targetFile.exists()) {
+ targetFile.delete();
+ }
+ }
+ }
+
+ /** Mock implementation of PartETag. */
+ private static class MockPartETag extends PartETag {
+ private final int partNumber;
+ private final String eTag;
+
+ public MockPartETag(int partNumber, String eTag) {
+ super(partNumber, eTag);
+ this.partNumber = partNumber;
+ this.eTag = eTag;
+ }
+
+ @Override
+ public int getPartNumber() {
+ return partNumber;
+ }
+
+ @Override
+ public String getETag() {
+ return eTag;
+ }
+ }
+
+ /** Mock implementation of CompleteMultipartUploadResult. */
+    private static class MockCompleteMultipartUploadResult extends CompleteMultipartUploadResult {
+ private final String key;
+ private final String eTag;
+
+ public MockCompleteMultipartUploadResult(String key, String eTag) {
+ this.key = key;
+ this.eTag = eTag;
+ }
+
+ @Override
+ public String getKey() {
+ return key;
+ }
+
+ @Override
+ public String getETag() {
+ return eTag;
+ }
+ }
+
+ /**
+     * Minimal stub implementation to avoid NullPointerException during OSSMultiPartUpload
+     * initialization.
+ */
+ private static class StubAliyunOSSFileSystem
+ extends org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem {
+        private final StubAliyunOSSFileSystemStore stubStore = new StubAliyunOSSFileSystemStore();
+
+ @Override
+        public org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystemStore getStore() {
+ return stubStore;
+ }
+ }
+
+ /** Minimal stub implementation for the store. */
+ private static class StubAliyunOSSFileSystemStore
+ extends org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystemStore {
+        // Empty stub - we override all methods in MockOSSMultiPartUpload anyway
+ }
+}
diff --git a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/HadoopCompliantFileIO.java b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/HadoopCompliantFileIO.java
index 9238251033..a662e8a075 100644
--- a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/HadoopCompliantFileIO.java
+++ b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/HadoopCompliantFileIO.java
@@ -123,11 +123,11 @@ public abstract class HadoopCompliantFileIO implements FileIO {
return getFileSystem(hadoopSrc).rename(hadoopSrc, hadoopDst);
}
- private org.apache.hadoop.fs.Path path(Path path) {
+ protected org.apache.hadoop.fs.Path path(Path path) {
return new org.apache.hadoop.fs.Path(path.toUri());
}
-    private FileSystem getFileSystem(org.apache.hadoop.fs.Path path) throws IOException {
+    protected FileSystem getFileSystem(org.apache.hadoop.fs.Path path) throws IOException {
if (fsMap == null) {
synchronized (this) {
if (fsMap == null) {
diff --git a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3FileIO.java b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3FileIO.java
index a58aa18eac..65017da573 100644
--- a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3FileIO.java
+++ b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3FileIO.java
@@ -20,6 +20,8 @@ package org.apache.paimon.s3;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.TwoPhaseOutputStream;
import org.apache.paimon.options.Options;
import org.apache.hadoop.conf.Configuration;
@@ -71,6 +73,17 @@ public class S3FileIO extends HadoopCompliantFileIO {
        this.hadoopOptions = mirrorCertainHadoopConfig(loadHadoopConfigFromContext(context));
}
+ @Override
+    public TwoPhaseOutputStream newTwoPhaseOutputStream(Path path, boolean overwrite)
+ throws IOException {
+ org.apache.hadoop.fs.Path hadoopPath = path(path);
+ S3AFileSystem fs = (S3AFileSystem) getFileSystem(hadoopPath);
+ if (!overwrite && this.exists(path)) {
+ throw new IOException("File " + path + " already exists.");
+ }
+        return new S3TwoPhaseOutputStream(new S3MultiPartUpload(fs, fs.getConf()), hadoopPath);
+ }
+
// add additional config entries from the IO config to the Hadoop config
private Options loadHadoopConfigFromContext(CatalogContext context) {
Options hadoopConfig = new Options();
diff --git a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3MultiPartUpload.java b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3MultiPartUpload.java
new file mode 100644
index 0000000000..4fd3590027
--- /dev/null
+++ b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3MultiPartUpload.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.s3;
+
+import org.apache.paimon.fs.MultiPartUploadStore;
+
+import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
+import com.amazonaws.services.s3.model.PartETag;
+import com.amazonaws.services.s3.model.UploadPartRequest;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.WriteOperationHelper;
+import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext;
+import org.apache.hadoop.fs.store.audit.AuditSpan;
+import org.apache.hadoop.fs.store.audit.AuditSpanSource;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.paimon.utils.Preconditions.checkNotNull;
+
+/** Provides the multipart upload by Amazon S3. */
+public class S3MultiPartUpload
+        implements MultiPartUploadStore<PartETag, CompleteMultipartUploadResult> {
+
+ private final S3AFileSystem s3a;
+
+ private final InternalWriteOperationHelper s3accessHelper;
+
+ public S3MultiPartUpload(S3AFileSystem s3a, Configuration conf) {
+ checkNotNull(s3a);
+ this.s3accessHelper =
+ new InternalWriteOperationHelper(
+ s3a,
+ checkNotNull(conf),
+ s3a.createStoreContext().getInstrumentation(),
+ s3a.getAuditSpanSource(),
+ s3a.getActiveAuditSpan());
+ this.s3a = s3a;
+ }
+
+ @Override
+ public Path workingDirectory() {
+ return s3a.getWorkingDirectory();
+ }
+
+ @Override
+ public String startMultiPartUpload(String objectName) throws IOException {
+ return s3accessHelper.initiateMultiPartUpload(objectName);
+ }
+
+ @Override
+ public CompleteMultipartUploadResult completeMultipartUpload(
+            String objectName, String uploadId, List<PartETag> partETags, long numBytesInParts)
+ throws IOException {
+ return s3accessHelper.completeMPUwithRetries(
+                objectName, uploadId, partETags, numBytesInParts, new AtomicInteger(0));
+ }
+
+ @Override
+ public PartETag uploadPart(
+            String objectName, String uploadId, int partNumber, File file, long byteLength)
+ throws IOException {
+ final UploadPartRequest uploadRequest =
+ s3accessHelper.newUploadPartRequest(
+ objectName,
+ uploadId,
+ partNumber,
+ checkedDownCast(byteLength),
+ null,
+ file,
+ 0L);
+ return s3accessHelper.uploadPart(uploadRequest).getPartETag();
+ }
+
+ @Override
+    public void abortMultipartUpload(String destKey, String uploadId) throws IOException {
+ s3accessHelper.abortMultipartUpload(destKey, uploadId, false, null);
+ }
+
+    private static final class InternalWriteOperationHelper extends WriteOperationHelper {
+
+ InternalWriteOperationHelper(
+ S3AFileSystem owner,
+ Configuration conf,
+ S3AStatisticsContext statisticsContext,
+ AuditSpanSource auditSpanSource,
+ AuditSpan auditSpan) {
+ super(owner, conf, statisticsContext, auditSpanSource, auditSpan);
+ }
+ }
+
+ private static int checkedDownCast(long value) {
+ int downCast = (int) value;
+ if (downCast != value) {
+ throw new IllegalArgumentException(
+ "Cannot downcast long value " + value + " to integer.");
+ }
+ return downCast;
+ }
+}
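
A note on checkedDownCast: Hadoop's WriteOperationHelper.newUploadPartRequest takes the part
size as an int, so the long byte length is narrowed with an explicit overflow check; with the
5 MiB part threshold used by S3TwoPhaseOutputStream below, part sizes stay far below
Integer.MAX_VALUE.
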
diff --git a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3TwoPhaseOutputStream.java b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3TwoPhaseOutputStream.java
new file mode 100644
index 0000000000..d66b134d5c
--- /dev/null
+++ b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3TwoPhaseOutputStream.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.s3;
+
+import org.apache.paimon.fs.MultiPartUploadStore;
+import org.apache.paimon.fs.MultiPartUploadTwoPhaseOutputStream;
+
+import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
+import com.amazonaws.services.s3.model.PartETag;
+
+import java.io.IOException;
+
+/** S3 implementation of TwoPhaseOutputStream using multipart upload. */
+public class S3TwoPhaseOutputStream
+        extends MultiPartUploadTwoPhaseOutputStream<PartETag, CompleteMultipartUploadResult> {
+
+ public S3TwoPhaseOutputStream(
+            MultiPartUploadStore<PartETag, CompleteMultipartUploadResult> multiPartUploadStore,
+ org.apache.hadoop.fs.Path hadoopPath)
+ throws IOException {
+ super(multiPartUploadStore, hadoopPath);
+ }
+
+ @Override
+ public long partSizeThreshold() {
+ return 5L << 20;
+ }
+}
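
Note: 5L << 20 is 5 MiB, which matches the S3 multipart upload minimum part size (every part
except the last must be at least 5 MiB), so multi-part commits stay valid.
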
diff --git a/paimon-filesystems/paimon-s3-impl/src/test/java/org/apache/paimon/s3/S3TwoPhaseOutputStreamTest.java b/paimon-filesystems/paimon-s3-impl/src/test/java/org/apache/paimon/s3/S3TwoPhaseOutputStreamTest.java
new file mode 100644
index 0000000000..3712916e00
--- /dev/null
+++ b/paimon-filesystems/paimon-s3-impl/src/test/java/org/apache/paimon/s3/S3TwoPhaseOutputStreamTest.java
@@ -0,0 +1,368 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.s3;
+
+import org.apache.paimon.fs.TwoPhaseOutputStream;
+
+import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
+import com.amazonaws.services.s3.model.PartETag;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link S3TwoPhaseOutputStream}. */
+class S3TwoPhaseOutputStreamTest {
+
+ @TempDir java.nio.file.Path tempDir;
+
+ private MockS3MultiPartUpload mockAccessor;
+ private S3TwoPhaseOutputStream stream;
+ private File targetFile;
+ private org.apache.hadoop.fs.Path hadoopPath;
+
+ @BeforeEach
+ void setUp() throws IOException {
+ hadoopPath = new org.apache.hadoop.fs.Path("/test/file.parquet");
+ targetFile = tempDir.resolve("target-file.parquet").toFile();
+ targetFile.getParentFile().mkdirs();
+
+ mockAccessor = new MockS3MultiPartUpload(targetFile);
+ }
+
+ private S3TwoPhaseOutputStream createStream() throws IOException {
+ return new S3TwoPhaseOutputStream(mockAccessor, hadoopPath);
+ }
+
+ @Test
+ void testLargeDataMultipleParts() throws IOException {
+ stream = createStream();
+
+        // Create data larger than MIN_PART_SIZE (5MB) to trigger multiple part uploads
+ byte[] bigData = new byte[6 * 1024 * 1024]; // 6MB
+ for (int i = 0; i < bigData.length; i++) {
+ bigData[i] = (byte) (i % 256);
+ }
+
+ // Write the large data
+ stream.write(bigData);
+
+ // This should trigger automatic part uploads during write
+ assertThat(mockAccessor.startMultipartUploadCalled).isTrue();
+ assertThat(mockAccessor.uploadPartCalls).isGreaterThan(0);
+
+ // Add more data to ensure closeForCommit creates another part
+ String additionalData = "Additional data for final part";
+ stream.write(additionalData.getBytes());
+
+        assertThat(stream.getPos()).isEqualTo(bigData.length + additionalData.length());
+
+ // Close for commit
+ TwoPhaseOutputStream.Committer committer = stream.closeForCommit();
+
+ // Should have uploaded multiple parts
+ assertThat(mockAccessor.uploadPartCalls).isGreaterThan(1);
+ assertThat(mockAccessor.completeMultipartUploadCalled).isFalse();
+
+ // Target file should not exist yet
+ assertThat(targetFile.exists()).isFalse();
+
+ // Commit
+ committer.commit();
+
+ // Verify complete multipart upload was called
+ assertThat(mockAccessor.completeMultipartUploadCalled).isTrue();
+ assertThat(mockAccessor.abortMultipartUploadCalled).isFalse();
+
+ // Target file should now exist with correct content
+ assertThat(targetFile.exists()).isTrue();
+
+ // Verify the content is correct by reading it back
+ byte[] writtenContent = Files.readAllBytes(targetFile.toPath());
+        assertThat(writtenContent).hasSize(bigData.length + additionalData.length());
+
+ // Check first part (bigData)
+ for (int i = 0; i < bigData.length; i++) {
+ assertThat(writtenContent[i]).isEqualTo(bigData[i]);
+ }
+
+ // Check additional data at the end
+ byte[] additionalBytes = additionalData.getBytes();
+ for (int i = 0; i < additionalBytes.length; i++) {
+            assertThat(writtenContent[bigData.length + i]).isEqualTo(additionalBytes[i]);
+ }
+ }
+
+ @Test
+ void testDiscard() throws IOException {
+ stream = createStream();
+ stream.write("Hello S3 World!".getBytes());
+
+ TwoPhaseOutputStream.Committer committer = stream.closeForCommit();
+
+ // Verify initial state
+ assertThat(mockAccessor.startMultipartUploadCalled).isTrue();
+ assertThat(mockAccessor.uploadPartCalls).isEqualTo(1);
+ assertThat(targetFile.exists()).isFalse();
+
+ // Discard instead of commit
+ committer.discard();
+
+ // Verify abort was called
+ assertThat(mockAccessor.abortMultipartUploadCalled).isTrue();
+ assertThat(mockAccessor.completeMultipartUploadCalled).isFalse();
+
+ // Target file should not exist
+ assertThat(targetFile.exists()).isFalse();
+ }
+
+ @Test
+ void testCommitAfterDiscard() throws IOException {
+ stream = createStream();
+ stream.write("data".getBytes());
+ TwoPhaseOutputStream.Committer committer = stream.closeForCommit();
+
+ committer.discard();
+
+ assertThatThrownBy(() -> committer.commit())
+ .isInstanceOf(IOException.class)
+                .hasMessageContaining("Cannot commit: committer has been discarded");
+ }
+
+ @Test
+ void testSimpleCommit() throws IOException {
+ stream = createStream();
+
+ String testData = "Hello S3 World!";
+ stream.write(testData.getBytes());
+
+ assertThat(stream.getPos()).isEqualTo(testData.length());
+
+ // Close for commit
+ TwoPhaseOutputStream.Committer committer = stream.closeForCommit();
+
+ // Target file should not exist yet
+ assertThat(targetFile.exists()).isFalse();
+
+ // Commit
+ committer.commit();
+
+ // Verify upload completed
+ assertThat(mockAccessor.startMultipartUploadCalled).isTrue();
+ assertThat(mockAccessor.uploadPartCalls).isEqualTo(1);
+ assertThat(mockAccessor.completeMultipartUploadCalled).isTrue();
+
+ // Target file should exist with correct content
+ assertThat(targetFile.exists()).isTrue();
+        String writtenContent = new String(Files.readAllBytes(targetFile.toPath()));
+ assertThat(writtenContent).isEqualTo(testData);
+ }
+
+ @Test
+ void testDoubleDiscard() throws IOException {
+ stream = createStream();
+ stream.write("data".getBytes());
+ TwoPhaseOutputStream.Committer committer = stream.closeForCommit();
+
+ committer.discard();
+ // Second discard should be safe (no-op)
+ committer.discard();
+
+ // Abort should only be called once
+ assertThat(mockAccessor.abortMultipartUploadCallCount).isEqualTo(1);
+
+ // Target file should not exist
+ assertThat(targetFile.exists()).isFalse();
+ }
+
+ /**
+     * Mock implementation that uses local files to simulate S3 multipart upload behavior. Extends
+     * S3MultiPartUpload but overrides all methods to avoid initialization issues.
+ */
+ private static class MockS3MultiPartUpload extends S3MultiPartUpload {
+ private final List<File> tempPartFiles = new ArrayList<>();
+ private final File targetFile;
+ private final String mockUploadId = "mock-upload-id-12345";
+
+ // Test tracking variables
+ boolean startMultipartUploadCalled = false;
+ int uploadPartCalls = 0;
+ boolean completeMultipartUploadCalled = false;
+ boolean abortMultipartUploadCalled = false;
+ int abortMultipartUploadCallCount = 0;
+
+ @SuppressWarnings("unused")
+ public MockS3MultiPartUpload(File targetFile) {
+ super(createStubFileSystem(), new Configuration());
+ this.targetFile = targetFile;
+ }
+
+ private static S3AFileSystem createStubFileSystem() {
+            // Create minimal stub to avoid NullPointerException during initialization
+ return new StubS3AFileSystem();
+ }
+
+ @Override
+ public String pathToObject(org.apache.hadoop.fs.Path hadoopPath) {
+ return hadoopPath.toUri().getPath().substring(1);
+ }
+
+ @Override
+ public String startMultiPartUpload(String key) {
+ startMultipartUploadCalled = true;
+ return mockUploadId;
+ }
+
+ @Override
+ public PartETag uploadPart(
+                String key, String uploadId, int partNumber, File inputFile, long byteLength)
+ throws IOException {
+ uploadPartCalls++;
+
+ // Create a temporary copy of the part file
+            File tempPartFile = Files.createTempFile("s3-part-" + partNumber, ".tmp").toFile();
+            try (FileInputStream fis = new FileInputStream(inputFile);
+                    FileOutputStream fos = new FileOutputStream(tempPartFile)) {
+ byte[] buffer = new byte[8192];
+ int bytesRead;
+ while ((bytesRead = fis.read(buffer)) != -1) {
+ fos.write(buffer, 0, bytesRead);
+ }
+ }
+ tempPartFiles.add(tempPartFile);
+
+ // Return mock UploadPartResult
+ return new PartETag(partNumber, "etag-" + partNumber);
+ }
+
+ @Override
+ public CompleteMultipartUploadResult completeMultipartUpload(
+                String destKey, String uploadId, List<PartETag> partETags, long length) {
+ completeMultipartUploadCalled = true;
+
+ // Simulate combining all parts into the final target file
+ try {
+ try (FileOutputStream fos = new FileOutputStream(targetFile)) {
+ for (File partFile : tempPartFiles) {
+                        try (FileInputStream fis = new FileInputStream(partFile)) {
+ byte[] buffer = new byte[8192];
+ int bytesRead;
+ while ((bytesRead = fis.read(buffer)) != -1) {
+ fos.write(buffer, 0, bytesRead);
+ }
+ }
+ }
+ }
+
+ // Clean up temp files
+ for (File partFile : tempPartFiles) {
+ if (partFile.exists()) {
+ partFile.delete();
+ }
+ }
+ tempPartFiles.clear();
+
+                return new MockCompleteMultipartUploadResult("mock-bucket", destKey, "mock-etag");
+ } catch (IOException e) {
+                throw new RuntimeException("Failed to complete multipart upload", e);
+ }
+ }
+
+ @Override
+ public void abortMultipartUpload(String destKey, String uploadId) {
+ abortMultipartUploadCalled = true;
+ abortMultipartUploadCallCount++;
+
+ // Clean up temp files
+ for (File partFile : tempPartFiles) {
+ if (partFile.exists()) {
+ partFile.delete();
+ }
+ }
+ tempPartFiles.clear();
+
+ // Clean up target file if it exists
+ if (targetFile.exists()) {
+ targetFile.delete();
+ }
+ }
+ }
+
+ /** Mock implementation of PartETag. */
+ private static class MockPartETag extends PartETag {
+ private final String eTag;
+
+ public MockPartETag(String eTag, int partNumber) {
+ super(partNumber, eTag);
+ this.eTag = eTag;
+ }
+
+ @Override
+ public String getETag() {
+ return eTag;
+ }
+ }
+
+ /** Mock implementation of CompleteMultipartUploadResult. */
+    private static class MockCompleteMultipartUploadResult extends CompleteMultipartUploadResult {
+ private final String bucketName;
+ private final String key;
+ private final String eTag;
+
+        public MockCompleteMultipartUploadResult(String bucketName, String key, String eTag) {
+ this.bucketName = bucketName;
+ this.key = key;
+ this.eTag = eTag;
+ }
+
+ @Override
+ public String getBucketName() {
+ return bucketName;
+ }
+
+ @Override
+ public String getKey() {
+ return key;
+ }
+
+ @Override
+ public String getETag() {
+ return eTag;
+ }
+ }
+
+ /**
+     * Minimal stub implementation to avoid NullPointerException during S3MultiPartUpload
+     * initialization.
+ */
+ private static class StubS3AFileSystem extends S3AFileSystem {
+ // Minimal stub - no implementation needed for our mock
+ }
+}