This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new b1ccfb82eb [core] Enables buffer spill when targetFileSize is greater
than write buffer size. (#5121)
b1ccfb82eb is described below
commit b1ccfb82eb23a830e1fdf0452d1f4a42d85c557f
Author: HunterXHunter <[email protected]>
AuthorDate: Fri Feb 28 11:46:01 2025 +0800
[core] Enables buffer spill when targetFileSize is greater than write
buffer size. (#5121)
---
.../shortcodes/generated/core_configuration.html | 2 +-
.../main/java/org/apache/paimon/CoreOptions.java | 11 +++--
.../paimon/operation/AppendOnlyFileStoreWrite.java | 2 +-
.../paimon/operation/KeyValueFileStoreWrite.java | 2 +-
.../apache/paimon/append/AppendOnlyWriterTest.java | 48 ++++++++++++++++++++++
.../paimon/flink/sink/WriterOperatorTest.java | 2 +
6 files changed, 61 insertions(+), 6 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index fc5c04477b..6a2510369d 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -1048,7 +1048,7 @@ If the data size allocated for the sorting task is
uneven,which may lead to perf
<td><h5>write-buffer-spillable</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Boolean</td>
- <td>Whether the write buffer can be spillable. Enabled by default when using object storage.</td>
+ <td>Whether the write buffer can be spillable. Enabled by default when using object storage or when 'target-file-size' is greater than 'write-buffer-size'.</td>
</tr>
<tr>
<td><h5>write-manifest-cache</h5></td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 4aabb8b4d9..dcb5588fa7 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -479,7 +479,7 @@ public class CoreOptions implements Serializable {
.booleanType()
.noDefaultValue()
.withDescription(
- "Whether the write buffer can be spillable. Enabled by default when using object storage.");
+ "Whether the write buffer can be spillable. Enabled by default when using object storage or when 'target-file-size' is greater than 'write-buffer-size'.");
public static final ConfigOption<Boolean> WRITE_BUFFER_FOR_APPEND =
key("write-buffer-for-append")
@@ -1957,9 +1957,14 @@ public class CoreOptions implements Serializable {
return options.get(WRITE_BUFFER_SIZE).getBytes();
}
- public boolean writeBufferSpillable(boolean usingObjectStore, boolean isStreaming) {
+ public boolean writeBufferSpillable(
+ boolean usingObjectStore, boolean isStreaming, boolean hasPrimaryKey) {
// if not streaming mode, we turn spillable on by default.
- return options.getOptional(WRITE_BUFFER_SPILLABLE).orElse(usingObjectStore || !isStreaming);
+ return options.getOptional(WRITE_BUFFER_SPILLABLE)
+ .orElse(
+ usingObjectStore
+ || !isStreaming
+ || targetFileSize(hasPrimaryKey) > writeBufferSize());
}
public MemorySize writeBufferSpillDiskSize() {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
index 4a6196453d..b3007a3620 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
@@ -125,7 +125,7 @@ public abstract class AppendOnlyFileStoreWrite extends
MemoryFileStoreWrite<Inte
pathFactory.createDataFilePathFactory(partition, bucket),
restoreIncrement,
options.useWriteBufferForAppend() || forceBufferSpill,
- options.writeBufferSpillable(fileIO.isObjectStore(), isStreamingMode)
+ options.writeBufferSpillable(fileIO.isObjectStore(), isStreamingMode, false)
|| forceBufferSpill,
options.fileCompression(),
options.spillCompressOptions(),
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index d061e18161..0d2f824282 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -229,7 +229,7 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
@VisibleForTesting
public boolean bufferSpillable() {
- return options.writeBufferSpillable(fileIO.isObjectStore(), isStreamingMode);
+ return options.writeBufferSpillable(fileIO.isObjectStore(), isStreamingMode, true);
}
private CompactManager createCompactManager(
diff --git
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
index d28725e0bb..a87e7c9da2 100644
---
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
@@ -409,6 +409,54 @@ public class AppendOnlyWriterTest {
writer.close();
}
+ @Test
+ public void tesWriteBufferSpillAutoEnabled() {
+ HashMap<String, String> map = new HashMap<>();
+ // This is the default behavior,no object store and streaming mode.
+ Assertions.assertThat(CoreOptions.fromMap(map).writeBufferSpillable(false, true, false))
+ .isFalse();
+
+ // Using object store.
+ Assertions.assertThat(CoreOptions.fromMap(map).writeBufferSpillable(true, true, false))
+ .isTrue();
+
+ // Batch mode.
+ Assertions.assertThat(CoreOptions.fromMap(map).writeBufferSpillable(false, false, false))
+ .isTrue();
+
+ // Append only table.
+ map.put(CoreOptions.WRITE_BUFFER_SIZE.key(), "200 MB");
+ Assertions.assertThat(CoreOptions.fromMap(map).writeBufferSpillable(false, false, false))
+ .isTrue();
+
+ // Primary key table.
+ map.put(CoreOptions.WRITE_BUFFER_SIZE.key(), "100 MB");
+ Assertions.assertThat(CoreOptions.fromMap(map).writeBufferSpillable(false, false, true))
+ .isTrue();
+
+ // targetFileSize is greater than write buffer size.
+ map.clear();
+ map.put(CoreOptions.TARGET_FILE_SIZE.key(), "2 b");
+ map.put(CoreOptions.WRITE_BUFFER_SIZE.key(), "1 b");
+ Assertions.assertThat(CoreOptions.fromMap(map).writeBufferSpillable(false, true, false))
+ .isTrue();
+
+ // target-file-size is smaller than write-buffer-size.
+ map.clear();
+ map.put(CoreOptions.TARGET_FILE_SIZE.key(), "1 b");
+ map.put(CoreOptions.WRITE_BUFFER_SIZE.key(), "2 b");
+ Assertions.assertThat(CoreOptions.fromMap(map).writeBufferSpillable(false, true, false))
+ .isFalse();
+
+ // Set to false manually.
+ map.clear();
+ map.put(CoreOptions.TARGET_FILE_SIZE.key(), "2 b");
+ map.put(CoreOptions.WRITE_BUFFER_SIZE.key(), "1 b");
+ map.put(CoreOptions.WRITE_BUFFER_SPILLABLE.key(), "false");
+ Assertions.assertThat(CoreOptions.fromMap(map).writeBufferSpillable(false, true, false))
+ .isFalse();
+ }
+
@Test
public void testMultipleFlush() throws Exception {
AppendOnlyWriter writer = createEmptyWriter(Long.MAX_VALUE, true);
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java
index 83af157450..2e55fc919b 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java
@@ -87,6 +87,7 @@ public class WriterOperatorTest {
Options options = new Options();
options.set("bucket", "1");
options.set("write-buffer-size", "256 b");
+ options.set("write-buffer-spillable", "false");
options.set("page-size", "32 b");
FileStoreTable table =
@@ -332,6 +333,7 @@ public class WriterOperatorTest {
Options options = new Options();
options.set("bucket", "1");
options.set("write-buffer-size", "256 b");
+ options.set("write-buffer-spillable", "false");
options.set("page-size", "32 b");
FileStoreTable fileStoreTable =