This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 785e4c77f0 [core] format table: fix two phase stream flush bug (#6620)
785e4c77f0 is described below

commit 785e4c77f06706cd918690e7d45c712af1d5d979
Author: jerry <[email protected]>
AuthorDate: Mon Nov 17 18:54:43 2025 +0800

    [core] format table: fix two phase stream flush bug (#6620)
---
 .../fs/MultiPartUploadTwoPhaseOutputStream.java    | 28 ++++++++---------
 .../MultiPartUploadTwoPhaseOutputStreamTest.java   | 35 +++++++++++++++++++++-
 2 files changed, 46 insertions(+), 17 deletions(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/MultiPartUploadTwoPhaseOutputStream.java b/paimon-common/src/main/java/org/apache/paimon/fs/MultiPartUploadTwoPhaseOutputStream.java
index 3d1b5b0ce4..60f0bc68dd 100644
--- a/paimon-common/src/main/java/org/apache/paimon/fs/MultiPartUploadTwoPhaseOutputStream.java
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/MultiPartUploadTwoPhaseOutputStream.java
@@ -84,9 +84,7 @@ public abstract class MultiPartUploadTwoPhaseOutputStream<T, C> extends TwoPhase
         }
         buffer.write(b);
         position++;
-        if (buffer.size() >= partSizeThreshold()) {
-            uploadPart();
-        }
+        uploadPartIfLargerThanThreshold();
     }
 
     @Override
@@ -102,9 +100,7 @@ public abstract class MultiPartUploadTwoPhaseOutputStream<T, C> extends TwoPhase
         int remaining = len;
         int offset = off;
         while (remaining > 0) {
-            if (buffer.size() >= partSizeThreshold()) {
-                uploadPart();
-            }
+            uploadPartIfLargerThanThreshold();
             int currentSize = buffer.size();
             int space = partSizeThreshold() - currentSize;
             int count = Math.min(remaining, space);
@@ -112,10 +108,7 @@ public abstract class MultiPartUploadTwoPhaseOutputStream<T, C> extends TwoPhase
             offset += count;
             remaining -= count;
             position += count;
-            // consume buffer if it is full
-            if (buffer.size() >= partSizeThreshold()) {
-                uploadPart();
-            }
+            uploadPartIfLargerThanThreshold();
         }
     }
 
@@ -124,7 +117,7 @@ public abstract class MultiPartUploadTwoPhaseOutputStream<T, C> extends TwoPhase
         if (closed) {
             throw new IOException("Stream is closed");
         }
-        uploadPart();
+        uploadPartIfLargerThanThreshold();
     }
 
     @Override
@@ -142,15 +135,18 @@ public abstract class MultiPartUploadTwoPhaseOutputStream<T, C> extends TwoPhase
             throw new IOException("Stream is already closed but committer is null");
         }
         closed = true;
+        // Only last upload part can be smaller than part size threshold
+        uploadPartUtil();
+        return committer();
+    }
 
-        if (buffer.size() > 0) {
-            uploadPart();
+    private void uploadPartIfLargerThanThreshold() throws IOException {
+        if (buffer.size() >= partSizeThreshold()) {
+            uploadPartUtil();
         }
-
-        return committer();
     }
 
-    private void uploadPart() throws IOException {
+    private void uploadPartUtil() throws IOException {
         if (buffer.size() == 0) {
             return;
         }
diff --git a/paimon-common/src/test/java/org/apache/paimon/fs/MultiPartUploadTwoPhaseOutputStreamTest.java b/paimon-common/src/test/java/org/apache/paimon/fs/MultiPartUploadTwoPhaseOutputStreamTest.java
index 0a002fd783..9d3f759737 100644
--- a/paimon-common/src/test/java/org/apache/paimon/fs/MultiPartUploadTwoPhaseOutputStreamTest.java
+++ b/paimon-common/src/test/java/org/apache/paimon/fs/MultiPartUploadTwoPhaseOutputStreamTest.java
@@ -144,7 +144,7 @@ class MultiPartUploadTwoPhaseOutputStreamTest {
         stream.flush();
         assertThat(store.getUploadedParts())
                 .extracting(TestPart::getContent)
-                .containsExactly("abcab", "cdefg", "hij");
+                .containsExactly("abcab", "cdefg");
         TwoPhaseOutputStream.Committer committer = stream.closeForCommit();
         assertThat(store.getUploadedParts()).hasSize(3);
 
@@ -157,6 +157,39 @@ class MultiPartUploadTwoPhaseOutputStreamTest {
         assertThat(store.getAbortedUploadId()).isNull();
     }
 
+    @Test
+    void testFlushWhenBufferSizeIsSmallerThanThresholdDoesNotUpload() throws IOException {
+        TestMultiPartUploadTwoPhaseOutputStream stream =
+                new TestMultiPartUploadTwoPhaseOutputStream(store, objectPath, 10);
+
+        // Write 3 bytes, which is less than the threshold of 10
+        stream.write("abc".getBytes(StandardCharsets.UTF_8));
+        assertThat(store.getUploadedParts()).isEmpty();
+
+        // Flush should not trigger upload since buffer size (3) < threshold (10)
+        stream.flush();
+        assertThat(store.getUploadedParts()).isEmpty();
+        assertThat(stream.getPos()).isEqualTo(3);
+
+        // Write another 4 bytes, total buffer size is 7, still less than threshold
+        stream.write("defg".getBytes(StandardCharsets.UTF_8));
+        assertThat(store.getUploadedParts()).isEmpty();
+
+        // Flush again, should still not upload since buffer size (7) < threshold (10)
+        stream.flush();
+        assertThat(store.getUploadedParts()).isEmpty();
+        assertThat(stream.getPos()).isEqualTo(7);
+
+        // Only when closeForCommit is called, the remaining buffer should be uploaded
+        TwoPhaseOutputStream.Committer committer = stream.closeForCommit();
+        assertThat(store.getUploadedParts()).hasSize(1);
+        assertThat(store.getUploadedParts().get(0).getContent()).isEqualTo("abcdefg");
+        assertThat(store.getUploadedParts().get(0).getPartNumber()).isEqualTo(1);
+
+        committer.commit(fileIO);
+        assertThat(store.getCompletedBytes()).isEqualTo(7);
+    }
+
     /** Fake store implementation for testing. */
     private static class FakeMultiPartUploadStore
             implements MultiPartUploadStore<TestPart, String> {

Reply via email to