This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
     new 039f2ff5 [FLINK-30082] Enable write-buffer-spillable by default only for object storage
039f2ff5 is described below

commit 039f2ff5d2e25154b5ab4b04f4c36086ce51d9c3
Author: Jingsong Lee <jingsongl...@gmail.com>
AuthorDate: Tue Nov 22 10:53:26 2022 +0800

    [FLINK-30082] Enable write-buffer-spillable by default only for object storage

    This closes #389
---
 .../layouts/shortcodes/generated/core_configuration.html |  4 ++--
 .../java/org/apache/flink/table/store/CoreOptions.java   |  9 +++++----
 .../store/file/operation/KeyValueFileStoreWrite.java     | 16 +++++++++++++++-
 3 files changed, 22 insertions(+), 7 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 4a1a6b26..e6efa021 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -250,9 +250,9 @@
         </tr>
         <tr>
             <td><h5>write-buffer-spillable</h5></td>
-            <td style="word-wrap: break-word;">true</td>
+            <td style="word-wrap: break-word;">(none)</td>
             <td>Boolean</td>
-            <td>Whether the write buffer can be spillable.</td>
+            <td>Whether the write buffer can be spillable. Enabled by default when using object storage.</td>
         </tr>
         <tr>
             <td><h5>write-mode</h5></td>
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
index f54e8461..8e4d65dd 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
@@ -192,8 +192,9 @@ public class CoreOptions implements Serializable {
     public static final ConfigOption<Boolean> WRITE_BUFFER_SPILLABLE =
             ConfigOptions.key("write-buffer-spillable")
                     .booleanType()
-                    .defaultValue(true)
-                    .withDescription("Whether the write buffer can be spillable.");
+                    .noDefaultValue()
+                    .withDescription(
+                            "Whether the write buffer can be spillable. Enabled by default when using object storage.");
 
     public static final ConfigOption<Integer> LOCAL_SORT_MAX_NUM_FILE_HANDLES =
             ConfigOptions.key("local-sort.max-num-file-handles")
@@ -459,8 +460,8 @@ public class CoreOptions implements Serializable {
         return options.get(WRITE_BUFFER_SIZE).getBytes();
     }
 
-    public boolean writeBufferSpillable() {
-        return options.get(WRITE_BUFFER_SPILLABLE);
+    public boolean writeBufferSpillable(boolean usingObjectStore) {
+        return options.getOptional(WRITE_BUFFER_SPILLABLE).orElse(usingObjectStore);
     }
 
     public int localSortMaxNumFileHandles() {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
index 1cbef1b9..e06ea0c3 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.store.file.operation;
 
+import org.apache.flink.core.fs.FileSystemKind;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.store.CoreOptions;
@@ -45,6 +46,8 @@ import org.apache.flink.table.types.logical.RowType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
@@ -63,6 +66,7 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
     private final Supplier<Comparator<RowData>> keyComparatorSupplier;
     private final MergeFunction<KeyValue> mergeFunction;
     private final CoreOptions options;
+    private final FileStorePathFactory pathFactory;
 
     public KeyValueFileStoreWrite(
             SchemaManager schemaManager,
@@ -96,6 +100,7 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
         this.keyComparatorSupplier = keyComparatorSupplier;
         this.mergeFunction = mergeFunction;
         this.options = options;
+        this.pathFactory = pathFactory;
     }
 
     @Override
@@ -140,7 +145,7 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
                         compactExecutor,
                         levels);
         return new MergeTreeWriter(
-                options.writeBufferSpillable(),
+                bufferSpillable(),
                 options.localSortMaxNumFileHandles(),
                 ioManager,
                 compactManager,
@@ -152,6 +157,15 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
                 options.changelogProducer());
     }
 
+    private boolean bufferSpillable() {
+        try {
+            return options.writeBufferSpillable(
+                    pathFactory.root().getFileSystem().getKind() != FileSystemKind.FILE_SYSTEM);
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+    }
+
     private CompactManager createCompactManager(
             BinaryRowData partition,
             int bucket,