This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new b1ccfb82eb [core] Enables buffer spill when targetFileSize is greater than write buffer size. (#5121)
b1ccfb82eb is described below

commit b1ccfb82eb23a830e1fdf0452d1f4a42d85c557f
Author: HunterXHunter <[email protected]>
AuthorDate: Fri Feb 28 11:46:01 2025 +0800

    [core] Enables buffer spill when targetFileSize is greater than write buffer size. (#5121)
---
 .../shortcodes/generated/core_configuration.html   |  2 +-
 .../main/java/org/apache/paimon/CoreOptions.java   | 11 +++--
 .../paimon/operation/AppendOnlyFileStoreWrite.java |  2 +-
 .../paimon/operation/KeyValueFileStoreWrite.java   |  2 +-
 .../apache/paimon/append/AppendOnlyWriterTest.java | 48 ++++++++++++++++++++++
 .../paimon/flink/sink/WriterOperatorTest.java      |  2 +
 6 files changed, 61 insertions(+), 6 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index fc5c04477b..6a2510369d 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -1048,7 +1048,7 @@ If the data size allocated for the sorting task is 
uneven,which may lead to perf
             <td><h5>write-buffer-spillable</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>Boolean</td>
-            <td>Whether the write buffer can be spillable. Enabled by default 
when using object storage.</td>
+            <td>Whether the write buffer can be spillable. Enabled by default 
when using object storage or when 'target-file-size' is greater than 
'write-buffer-size'.</td>
         </tr>
         <tr>
             <td><h5>write-manifest-cache</h5></td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 4aabb8b4d9..dcb5588fa7 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -479,7 +479,7 @@ public class CoreOptions implements Serializable {
                     .booleanType()
                     .noDefaultValue()
                     .withDescription(
-                            "Whether the write buffer can be spillable. 
Enabled by default when using object storage.");
+                            "Whether the write buffer can be spillable. 
Enabled by default when using object storage or when 'target-file-size' is 
greater than 'write-buffer-size'.");
 
     public static final ConfigOption<Boolean> WRITE_BUFFER_FOR_APPEND =
             key("write-buffer-for-append")
@@ -1957,9 +1957,14 @@ public class CoreOptions implements Serializable {
         return options.get(WRITE_BUFFER_SIZE).getBytes();
     }
 
-    public boolean writeBufferSpillable(boolean usingObjectStore, boolean 
isStreaming) {
+    public boolean writeBufferSpillable(
+            boolean usingObjectStore, boolean isStreaming, boolean 
hasPrimaryKey) {
         // if not streaming mode, we turn spillable on by default.
-        return 
options.getOptional(WRITE_BUFFER_SPILLABLE).orElse(usingObjectStore || 
!isStreaming);
+        return options.getOptional(WRITE_BUFFER_SPILLABLE)
+                .orElse(
+                        usingObjectStore
+                                || !isStreaming
+                                || targetFileSize(hasPrimaryKey) > 
writeBufferSize());
     }
 
     public MemorySize writeBufferSpillDiskSize() {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
index 4a6196453d..b3007a3620 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
@@ -125,7 +125,7 @@ public abstract class AppendOnlyFileStoreWrite extends 
MemoryFileStoreWrite<Inte
                 pathFactory.createDataFilePathFactory(partition, bucket),
                 restoreIncrement,
                 options.useWriteBufferForAppend() || forceBufferSpill,
-                options.writeBufferSpillable(fileIO.isObjectStore(), 
isStreamingMode)
+                options.writeBufferSpillable(fileIO.isObjectStore(), 
isStreamingMode, false)
                         || forceBufferSpill,
                 options.fileCompression(),
                 options.spillCompressOptions(),
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index d061e18161..0d2f824282 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -229,7 +229,7 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
 
     @VisibleForTesting
     public boolean bufferSpillable() {
-        return options.writeBufferSpillable(fileIO.isObjectStore(), 
isStreamingMode);
+        return options.writeBufferSpillable(fileIO.isObjectStore(), 
isStreamingMode, true);
     }
 
     private CompactManager createCompactManager(
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java 
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
index d28725e0bb..a87e7c9da2 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
@@ -409,6 +409,54 @@ public class AppendOnlyWriterTest {
         writer.close();
     }
 
+    @Test
+    public void tesWriteBufferSpillAutoEnabled() {
+        HashMap<String, String> map = new HashMap<>();
+        // This is the default behavior, no object store and streaming mode.
+        
Assertions.assertThat(CoreOptions.fromMap(map).writeBufferSpillable(false, 
true, false))
+                .isFalse();
+
+        // Using object store.
+        
Assertions.assertThat(CoreOptions.fromMap(map).writeBufferSpillable(true, true, 
false))
+                .isTrue();
+
+        // Batch mode.
+        
Assertions.assertThat(CoreOptions.fromMap(map).writeBufferSpillable(false, 
false, false))
+                .isTrue();
+
+        // Append only table.
+        map.put(CoreOptions.WRITE_BUFFER_SIZE.key(), "200 MB");
+        
Assertions.assertThat(CoreOptions.fromMap(map).writeBufferSpillable(false, 
false, false))
+                .isTrue();
+
+        // Primary key table.
+        map.put(CoreOptions.WRITE_BUFFER_SIZE.key(), "100 MB");
+        
Assertions.assertThat(CoreOptions.fromMap(map).writeBufferSpillable(false, 
false, true))
+                .isTrue();
+
+        // target-file-size is greater than write-buffer-size.
+        map.clear();
+        map.put(CoreOptions.TARGET_FILE_SIZE.key(), "2 b");
+        map.put(CoreOptions.WRITE_BUFFER_SIZE.key(), "1 b");
+        
Assertions.assertThat(CoreOptions.fromMap(map).writeBufferSpillable(false, 
true, false))
+                .isTrue();
+
+        // target-file-size is smaller than write-buffer-size.
+        map.clear();
+        map.put(CoreOptions.TARGET_FILE_SIZE.key(), "1 b");
+        map.put(CoreOptions.WRITE_BUFFER_SIZE.key(), "2 b");
+        
Assertions.assertThat(CoreOptions.fromMap(map).writeBufferSpillable(false, 
true, false))
+                .isFalse();
+
+        // Set to false manually.
+        map.clear();
+        map.put(CoreOptions.TARGET_FILE_SIZE.key(), "2 b");
+        map.put(CoreOptions.WRITE_BUFFER_SIZE.key(), "1 b");
+        map.put(CoreOptions.WRITE_BUFFER_SPILLABLE.key(), "false");
+        
Assertions.assertThat(CoreOptions.fromMap(map).writeBufferSpillable(false, 
true, false))
+                .isFalse();
+    }
+
     @Test
     public void testMultipleFlush() throws Exception {
         AppendOnlyWriter writer = createEmptyWriter(Long.MAX_VALUE, true);
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java
index 83af157450..2e55fc919b 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java
@@ -87,6 +87,7 @@ public class WriterOperatorTest {
         Options options = new Options();
         options.set("bucket", "1");
         options.set("write-buffer-size", "256 b");
+        options.set("write-buffer-spillable", "false");
         options.set("page-size", "32 b");
 
         FileStoreTable table =
@@ -332,6 +333,7 @@ public class WriterOperatorTest {
         Options options = new Options();
         options.set("bucket", "1");
         options.set("write-buffer-size", "256 b");
+        options.set("write-buffer-spillable", "false");
         options.set("page-size", "32 b");
 
         FileStoreTable fileStoreTable =

Reply via email to