This is an automated email from the ASF dual-hosted git repository.
junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 30c18cf28a [core] Remove 'write-manifest-cache' because new
writer-coordinator (#5803)
30c18cf28a is described below
commit 30c18cf28ac0f0b12cb5496ebdb8ea1c8c325154
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Jun 27 16:44:36 2025 +0800
[core] Remove 'write-manifest-cache' because new writer-coordinator (#5803)
---
.../shortcodes/generated/core_configuration.html | 6 ----
.../main/java/org/apache/paimon/CoreOptions.java | 11 ------
.../java/org/apache/paimon/AbstractFileStore.java | 33 ++++-------------
.../org/apache/paimon/AppendOnlyFileStore.java | 23 ++++--------
.../src/main/java/org/apache/paimon/FileStore.java | 6 +---
.../java/org/apache/paimon/KeyValueFileStore.java | 23 ++++--------
.../java/org/apache/paimon/manifest/FileEntry.java | 6 +---
.../paimon/manifest/ManifestCacheFilter.java | 36 -------------------
.../paimon/operation/AbstractFileStoreScan.java | 34 ------------------
.../org/apache/paimon/operation/FileStoreScan.java | 3 --
.../paimon/privilege/PrivilegedFileStore.java | 8 ++---
.../paimon/privilege/PrivilegedFileStoreTable.java | 8 ++---
.../paimon/table/AppendOnlyFileStoreTable.java | 10 ++----
.../paimon/table/DelegatedFileStoreTable.java | 8 ++---
.../org/apache/paimon/table/FileStoreTable.java | 6 +---
.../paimon/table/PrimaryKeyFileStoreTable.java | 10 ++----
.../apache/paimon/table/object/ObjectTable.java | 6 +---
.../java/org/apache/paimon/utils/ObjectsCache.java | 14 +++-----
.../java/org/apache/paimon/utils/ObjectsFile.java | 12 +++----
.../apache/paimon/table/SimpleTableTestBase.java | 41 ----------------------
.../org/apache/paimon/utils/ObjectsCacheTest.java | 38 ++------------------
.../sink/cdc/CdcAppendTableWriteOperator.java | 2 +-
.../apache/paimon/flink/sink/FlinkSinkBuilder.java | 10 ++----
.../apache/paimon/flink/sink/FlinkWriteSink.java | 2 +-
.../paimon/flink/sink/NoopStoreSinkWriteState.java | 9 +----
.../paimon/flink/sink/StoreSinkWriteImpl.java | 6 +---
.../paimon/flink/sink/StoreSinkWriteState.java | 2 --
.../paimon/flink/sink/StoreSinkWriteStateImpl.java | 5 ---
.../apache/paimon/flink/BatchFileStoreITCase.java | 13 -------
29 files changed, 53 insertions(+), 338 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index ebd4b950cc..096c6b58b4 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -1164,12 +1164,6 @@ If the data size allocated for the sorting task is
uneven,which may lead to perf
<td>Boolean</td>
<td>Whether the write buffer can be spillable. Enabled by default
when using object storage or when 'target-file-size' is greater than
'write-buffer-size'.</td>
</tr>
- <tr>
- <td><h5>write-manifest-cache</h5></td>
- <td style="word-wrap: break-word;">0 bytes</td>
- <td>MemorySize</td>
- <td>Cache size for reading manifest files for write
initialization.</td>
- </tr>
<tr>
<td><h5>write-max-writers-to-spill</h5></td>
<td style="word-wrap: break-word;">10</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index c269a969a9..5a8ffb3082 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -530,13 +530,6 @@ public class CoreOptions implements Serializable {
.withDescription(
"When in batch append inserting, if the writer
number is greater than this option, we open the buffer cache and spill function
to avoid out-of-memory. ");
- public static final ConfigOption<MemorySize> WRITE_MANIFEST_CACHE =
- key("write-manifest-cache")
- .memoryType()
- .defaultValue(MemorySize.ofMebiBytes(0))
- .withDescription(
- "Cache size for reading manifest files for write
initialization.");
-
public static final ConfigOption<Integer> LOCAL_SORT_MAX_NUM_FILE_HANDLES =
key("local-sort.max-num-file-handles")
.intType()
@@ -1868,10 +1861,6 @@ public class CoreOptions implements Serializable {
return options.get(MANIFEST_FULL_COMPACTION_FILE_SIZE);
}
- public MemorySize writeManifestCache() {
- return options.get(WRITE_MANIFEST_CACHE);
- }
-
public String partitionDefaultName() {
return options.get(PARTITION_DEFAULT_NAME);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 556d8a7d7b..411c8268b9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -38,7 +38,6 @@ import org.apache.paimon.metastore.AddPartitionTagCallback;
import org.apache.paimon.metastore.TagPreviewCommitCallback;
import org.apache.paimon.operation.ChangelogDeletion;
import org.apache.paimon.operation.FileStoreCommitImpl;
-import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.operation.Lock;
import org.apache.paimon.operation.ManifestsReader;
import org.apache.paimon.operation.PartitionExpire;
@@ -97,7 +96,6 @@ abstract class AbstractFileStore<T> implements FileStore<T> {
protected final RowType partitionType;
protected final CatalogEnvironment catalogEnvironment;
- @Nullable private final SegmentsCache<Path> writeManifestCache;
@Nullable private SegmentsCache<Path> readManifestCache;
@Nullable private Cache<Path, Snapshot> snapshotCache;
@@ -116,9 +114,6 @@ abstract class AbstractFileStore<T> implements FileStore<T>
{
this.options = options;
this.partitionType = partitionType;
this.catalogEnvironment = catalogEnvironment;
- this.writeManifestCache =
- SegmentsCache.create(
- options.pageSize(), options.writeManifestCache(),
Long.MAX_VALUE);
}
@Override
@@ -193,10 +188,6 @@ abstract class AbstractFileStore<T> implements
FileStore<T> {
@Override
public ManifestFile.Factory manifestFileFactory() {
- return manifestFileFactory(false);
- }
-
- protected ManifestFile.Factory manifestFileFactory(boolean forWrite) {
return new ManifestFile.Factory(
fileIO,
schemaManager,
@@ -205,21 +196,17 @@ abstract class AbstractFileStore<T> implements
FileStore<T> {
options.manifestCompression(),
pathFactory(),
options.manifestTargetSize().getBytes(),
- forWrite ? writeManifestCache : readManifestCache);
+ readManifestCache);
}
@Override
public ManifestList.Factory manifestListFactory() {
- return manifestListFactory(false);
- }
-
- protected ManifestList.Factory manifestListFactory(boolean forWrite) {
return new ManifestList.Factory(
fileIO,
FileFormat.manifestFormat(options),
options.manifestCompression(),
pathFactory(),
- forWrite ? writeManifestCache : readManifestCache);
+ readManifestCache);
}
@Override
@@ -256,12 +243,12 @@ abstract class AbstractFileStore<T> implements
FileStore<T> {
new StatsFile(fileIO, pathFactory().statsFileFactory()));
}
- protected ManifestsReader newManifestsReader(boolean forWrite) {
+ protected ManifestsReader newManifestsReader() {
return new ManifestsReader(
partitionType,
options.partitionDefaultName(),
snapshotManager(),
- manifestListFactory(forWrite));
+ manifestListFactory());
}
@Override
@@ -279,14 +266,6 @@ abstract class AbstractFileStore<T> implements
FileStore<T> {
return schemaManager.mergeSchema(rowType, allowExplicitCast);
}
- protected abstract FileStoreScan newScan(ScanType scanType);
-
- protected enum ScanType {
- FOR_READ,
- FOR_WRITE,
- FOR_COMMIT
- }
-
@Override
public FileStoreCommitImpl newCommit(String commitUser, FileStoreTable
table) {
SnapshotManager snapshotManager = snapshotManager();
@@ -308,7 +287,7 @@ abstract class AbstractFileStore<T> implements FileStore<T>
{
manifestFileFactory(),
manifestListFactory(),
indexManifestFileFactory(),
- newScan(ScanType.FOR_COMMIT),
+ newScan(),
options.bucket(),
options.manifestTargetSize(),
options.manifestFullCompactionThresholdSize(),
@@ -448,7 +427,7 @@ abstract class AbstractFileStore<T> implements FileStore<T>
{
expirationTime,
checkInterval,
expireStrategy,
- newScan(ScanType.FOR_COMMIT),
+ newScan(),
newCommit(commitUser, table),
partitionHandler,
options.endInputCheckPartitionExpire(),
diff --git
a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
index 39a6649ca7..a277ce6530 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
@@ -22,7 +22,6 @@ import org.apache.paimon.data.InternalRow;
import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
import org.apache.paimon.format.FileFormatDiscover;
import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.manifest.ManifestCacheFilter;
import org.apache.paimon.operation.AppendFileStoreWrite;
import org.apache.paimon.operation.AppendOnlyFileStoreScan;
import org.apache.paimon.operation.BaseAppendFileStoreWrite;
@@ -72,11 +71,6 @@ public class AppendOnlyFileStore extends
AbstractFileStore<InternalRow> {
return options.bucket() == -1 ? BucketMode.BUCKET_UNAWARE :
BucketMode.HASH_FIXED;
}
- @Override
- public AppendOnlyFileStoreScan newScan() {
- return newScan(ScanType.FOR_READ);
- }
-
@Override
public RawFileSplitRead newRead() {
return new RawFileSplitRead(
@@ -91,14 +85,11 @@ public class AppendOnlyFileStore extends
AbstractFileStore<InternalRow> {
@Override
public BaseAppendFileStoreWrite newWrite(String commitUser) {
- return newWrite(commitUser, null, null);
+ return newWrite(commitUser, null);
}
@Override
- public BaseAppendFileStoreWrite newWrite(
- String commitUser,
- @Nullable ManifestCacheFilter manifestFilter,
- @Nullable Integer writeId) {
+ public BaseAppendFileStoreWrite newWrite(String commitUser, @Nullable
Integer writeId) {
DeletionVectorsMaintainer.Factory dvMaintainerFactory =
options.deletionVectorsEnabled()
?
DeletionVectorsMaintainer.factory(newIndexFileHandler())
@@ -112,7 +103,7 @@ public class AppendOnlyFileStore extends
AbstractFileStore<InternalRow> {
partitionType,
pathFactory(),
snapshotManager(),
-
newScan(ScanType.FOR_WRITE).withManifestCacheFilter(manifestFilter),
+ newScan(),
options,
dvMaintainerFactory,
tableName);
@@ -126,7 +117,7 @@ public class AppendOnlyFileStore extends
AbstractFileStore<InternalRow> {
partitionType,
pathFactory(),
snapshotManager(),
-
newScan(ScanType.FOR_WRITE).withManifestCacheFilter(manifestFilter),
+ newScan(),
options,
dvMaintainerFactory,
tableName);
@@ -134,7 +125,7 @@ public class AppendOnlyFileStore extends
AbstractFileStore<InternalRow> {
}
@Override
- protected AppendOnlyFileStoreScan newScan(ScanType scanType) {
+ public AppendOnlyFileStoreScan newScan() {
BucketSelectConverter bucketSelectConverter =
predicate -> {
if (bucketMode() != BucketMode.HASH_FIXED) {
@@ -158,12 +149,12 @@ public class AppendOnlyFileStore extends
AbstractFileStore<InternalRow> {
};
return new AppendOnlyFileStoreScan(
- newManifestsReader(scanType == ScanType.FOR_WRITE),
+ newManifestsReader(),
bucketSelectConverter,
snapshotManager(),
schemaManager,
schema,
- manifestFileFactory(scanType == ScanType.FOR_WRITE),
+ manifestFileFactory(),
options.scanManifestParallelism(),
options.fileIndexReadEnabled());
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/FileStore.java
b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
index 12fc60a383..62fdd751fc 100644
--- a/paimon-core/src/main/java/org/apache/paimon/FileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
@@ -21,7 +21,6 @@ package org.apache.paimon;
import org.apache.paimon.fs.Path;
import org.apache.paimon.index.IndexFileHandler;
import org.apache.paimon.manifest.IndexManifestFile;
-import org.apache.paimon.manifest.ManifestCacheFilter;
import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.operation.ChangelogDeletion;
@@ -88,10 +87,7 @@ public interface FileStore<T> {
FileStoreWrite<T> newWrite(String commitUser);
- FileStoreWrite<T> newWrite(
- String commitUser,
- @Nullable ManifestCacheFilter manifestFilter,
- @Nullable Integer writeId);
+ FileStoreWrite<T> newWrite(String commitUser, @Nullable Integer writeId);
FileStoreCommit newCommit(String commitUser, FileStoreTable table);
diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
index 4687bdc2d4..8b94890c92 100644
--- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
@@ -25,7 +25,6 @@ import org.apache.paimon.format.FileFormatDiscover;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.index.DynamicBucketIndexMaintainer;
import org.apache.paimon.io.KeyValueFileReaderFactory;
-import org.apache.paimon.manifest.ManifestCacheFilter;
import org.apache.paimon.mergetree.compact.MergeFunctionFactory;
import org.apache.paimon.operation.AbstractFileStoreWrite;
import org.apache.paimon.operation.BucketSelectConverter;
@@ -112,11 +111,6 @@ public class KeyValueFileStore extends
AbstractFileStore<KeyValue> {
}
}
- @Override
- public KeyValueFileStoreScan newScan() {
- return newScan(ScanType.FOR_READ);
- }
-
@Override
public MergeFileSplitRead newRead() {
return new MergeFileSplitRead(
@@ -155,14 +149,11 @@ public class KeyValueFileStore extends
AbstractFileStore<KeyValue> {
@Override
public AbstractFileStoreWrite<KeyValue> newWrite(String commitUser) {
- return newWrite(commitUser, null, null);
+ return newWrite(commitUser, null);
}
@Override
- public AbstractFileStoreWrite<KeyValue> newWrite(
- String commitUser,
- @Nullable ManifestCacheFilter manifestFilter,
- @Nullable Integer writeId) {
+ public AbstractFileStoreWrite<KeyValue> newWrite(String commitUser,
@Nullable Integer writeId) {
DynamicBucketIndexMaintainer.Factory indexFactory = null;
if (bucketMode() == BucketMode.HASH_DYNAMIC) {
indexFactory = new
DynamicBucketIndexMaintainer.Factory(newIndexFileHandler());
@@ -185,7 +176,7 @@ public class KeyValueFileStore extends
AbstractFileStore<KeyValue> {
this::pathFactory,
newReaderFactoryBuilder(),
snapshotManager(),
-
newScan(ScanType.FOR_WRITE).withManifestCacheFilter(manifestFilter),
+ newScan(),
options,
tableName,
writeId);
@@ -205,7 +196,7 @@ public class KeyValueFileStore extends
AbstractFileStore<KeyValue> {
pathFactory(),
this::pathFactory,
snapshotManager(),
-
newScan(ScanType.FOR_WRITE).withManifestCacheFilter(manifestFilter),
+ newScan(),
indexFactory,
deletionVectorsMaintainerFactory,
options,
@@ -215,7 +206,7 @@ public class KeyValueFileStore extends
AbstractFileStore<KeyValue> {
}
@Override
- protected KeyValueFileStoreScan newScan(ScanType scanType) {
+ public KeyValueFileStoreScan newScan() {
BucketMode bucketMode = bucketMode();
BucketSelectConverter bucketSelectConverter =
keyFilter -> {
@@ -237,13 +228,13 @@ public class KeyValueFileStore extends
AbstractFileStore<KeyValue> {
};
return new KeyValueFileStoreScan(
- newManifestsReader(scanType == ScanType.FOR_WRITE),
+ newManifestsReader(),
bucketSelectConverter,
snapshotManager(),
schemaManager,
schema,
keyValueFieldsExtractor,
- manifestFileFactory(scanType == ScanType.FOR_WRITE),
+ manifestFileFactory(),
options.scanManifestParallelism(),
options.deletionVectorsEnabled(),
options.mergeEngine(),
diff --git
a/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
b/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
index a09bea2acf..52b029563e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
@@ -232,11 +232,7 @@ public interface FileEntry {
return readDeletedEntries(
m ->
manifestFile.read(
- m.fileName(),
- m.fileSize(),
- Filter.alwaysTrue(),
- deletedFilter(),
- Filter.alwaysTrue()),
+ m.fileName(), m.fileSize(), deletedFilter(),
Filter.alwaysTrue()),
manifestFiles,
manifestReadParallelism);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestCacheFilter.java
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestCacheFilter.java
deleted file mode 100644
index 32ef757b5a..0000000000
---
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestCacheFilter.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.manifest;
-
-import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.operation.AbstractFileStoreScan;
-
-import javax.annotation.concurrent.ThreadSafe;
-
-/**
- * Filter for manifest cache, this is used in {@link AbstractFileStoreScan}
for improving cache
- * utilization. NOTE: Please use this interface with caution and make sure
that only filtered data
- * is required, otherwise it will cause correctness issues.
- */
-@ThreadSafe
-@FunctionalInterface
-public interface ManifestCacheFilter {
-
- boolean test(BinaryRow partition, int bucket);
-}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index ec856e57b5..f8cd260f52 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -24,7 +24,6 @@ import org.apache.paimon.data.InternalRow;
import org.apache.paimon.manifest.BucketEntry;
import org.apache.paimon.manifest.FileEntry;
import org.apache.paimon.manifest.FileEntry.Identifier;
-import org.apache.paimon.manifest.ManifestCacheFilter;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestEntrySerializer;
import org.apache.paimon.manifest.ManifestFile;
@@ -62,7 +61,6 @@ import java.util.stream.Collectors;
import static
org.apache.paimon.utils.ManifestReadThreadPool.getExecutorService;
import static
org.apache.paimon.utils.ManifestReadThreadPool.randomlyExecuteSequentialReturn;
import static
org.apache.paimon.utils.ManifestReadThreadPool.sequentialBatchedExecute;
-import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.ThreadPoolUtils.randomlyOnlyExecute;
/** Default implementation of {@link FileStoreScan}. */
@@ -88,7 +86,6 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
private Filter<ManifestEntry> manifestEntryFilter = null;
private Filter<String> fileNameFilter = null;
- private ManifestCacheFilter manifestCacheFilter = null;
private ScanMetrics scanMetrics = null;
private boolean dropStats;
@@ -162,13 +159,6 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
@Override
public FileStoreScan withPartitionBucket(BinaryRow partition, int bucket) {
- if (manifestCacheFilter != null &&
manifestFileFactory.isCacheEnabled()) {
- checkArgument(
- manifestCacheFilter.test(partition, bucket),
- String.format(
- "This is a bug! The partition %s and bucket %s is
filtered!",
- partition, bucket));
- }
withPartitionFilter(Collections.singletonList(partition));
withBucket(bucket);
return this;
@@ -216,12 +206,6 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
return this;
}
- @Override
- public FileStoreScan withManifestCacheFilter(ManifestCacheFilter
manifestFilter) {
- this.manifestCacheFilter = manifestFilter;
- return this;
- }
-
@Override
public FileStoreScan withDataFileNameFilter(Filter<String> fileNameFilter)
{
this.fileNameFilter = fileNameFilter;
@@ -457,7 +441,6 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
.read(
manifest.fileName(),
manifest.fileSize(),
- createCacheRowFilter(),
createEntryRowFilter().and(additionalFilter),
entry ->
(additionalTFilter == null ||
additionalTFilter.test(entry))
@@ -478,23 +461,6 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
return entry.copyWithoutStats();
}
- /**
- * According to the {@link ManifestCacheFilter}, entry that needs to be
cached will be retained,
- * so the entry that will not be accessed in the future will not be cached.
- *
- * <p>Implemented to {@link InternalRow} is for performance (No
deserialization).
- */
- private Filter<InternalRow> createCacheRowFilter() {
- if (manifestCacheFilter == null) {
- return Filter.alwaysTrue();
- }
-
- Function<InternalRow, BinaryRow> partitionGetter =
- ManifestEntrySerializer.partitionGetter();
- Function<InternalRow, Integer> bucketGetter =
ManifestEntrySerializer.bucketGetter();
- return row -> manifestCacheFilter.test(partitionGetter.apply(row),
bucketGetter.apply(row));
- }
-
/**
* Read the corresponding entries based on the current required partition
and bucket.
*
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
index e3759960f8..739e8b916f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
@@ -22,7 +22,6 @@ import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.manifest.BucketEntry;
import org.apache.paimon.manifest.FileKind;
-import org.apache.paimon.manifest.ManifestCacheFilter;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.manifest.PartitionEntry;
@@ -80,8 +79,6 @@ public interface FileStoreScan {
FileStoreScan withManifestEntryFilter(Filter<ManifestEntry> filter);
- FileStoreScan withManifestCacheFilter(ManifestCacheFilter manifestFilter);
-
FileStoreScan withDataFileNameFilter(Filter<String> fileNameFilter);
FileStoreScan withMetrics(ScanMetrics metrics);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
index c30b4a94be..88cac126be 100644
---
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
+++
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
@@ -25,7 +25,6 @@ import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.fs.Path;
import org.apache.paimon.index.IndexFileHandler;
import org.apache.paimon.manifest.IndexManifestFile;
-import org.apache.paimon.manifest.ManifestCacheFilter;
import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.operation.ChangelogDeletion;
@@ -147,12 +146,9 @@ public class PrivilegedFileStore<T> implements
FileStore<T> {
}
@Override
- public FileStoreWrite<T> newWrite(
- String commitUser,
- @Nullable ManifestCacheFilter manifestFilter,
- @Nullable Integer writeId) {
+ public FileStoreWrite<T> newWrite(String commitUser, @Nullable Integer
writeId) {
privilegeChecker.assertCanInsert(identifier);
- return wrapped.newWrite(commitUser, manifestFilter, writeId);
+ return wrapped.newWrite(commitUser, writeId);
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
index 26e9ec1ea1..2aec02d0c8 100644
---
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
@@ -21,7 +21,6 @@ package org.apache.paimon.privilege;
import org.apache.paimon.FileStore;
import org.apache.paimon.Snapshot;
import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.manifest.ManifestCacheFilter;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.stats.Statistics;
import org.apache.paimon.table.DelegatedFileStoreTable;
@@ -230,12 +229,9 @@ public class PrivilegedFileStoreTable extends
DelegatedFileStoreTable {
}
@Override
- public TableWriteImpl<?> newWrite(
- String commitUser,
- @Nullable ManifestCacheFilter manifestFilter,
- @Nullable Integer writeId) {
+ public TableWriteImpl<?> newWrite(String commitUser, @Nullable Integer
writeId) {
privilegeChecker.assertCanInsert(identifier);
- return wrapped.newWrite(commitUser, manifestFilter, writeId);
+ return wrapped.newWrite(commitUser, writeId);
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
index 240163cf3d..b060954b6b 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
@@ -23,7 +23,6 @@ import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
-import org.apache.paimon.manifest.ManifestCacheFilter;
import org.apache.paimon.operation.AppendOnlyFileStoreScan;
import org.apache.paimon.operation.BaseAppendFileStoreWrite;
import org.apache.paimon.operation.FileStoreScan;
@@ -127,15 +126,12 @@ public class AppendOnlyFileStoreTable extends
AbstractFileStoreTable {
@Override
public TableWriteImpl<InternalRow> newWrite(String commitUser) {
- return newWrite(commitUser, null, null);
+ return newWrite(commitUser, null);
}
@Override
- public TableWriteImpl<InternalRow> newWrite(
- String commitUser,
- @Nullable ManifestCacheFilter manifestFilter,
- @Nullable Integer writeId) {
- BaseAppendFileStoreWrite writer = store().newWrite(commitUser,
manifestFilter, writeId);
+ public TableWriteImpl<InternalRow> newWrite(String commitUser, @Nullable
Integer writeId) {
+ BaseAppendFileStoreWrite writer = store().newWrite(commitUser,
writeId);
return new TableWriteImpl<>(
rowType(),
writer,
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
index 5f5bc68b3b..a0e33f620a 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
@@ -25,7 +25,6 @@ import org.apache.paimon.consumer.ConsumerManager;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.IndexManifestEntry;
-import org.apache.paimon.manifest.ManifestCacheFilter;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.schema.SchemaManager;
@@ -300,11 +299,8 @@ public abstract class DelegatedFileStoreTable implements
FileStoreTable {
}
@Override
- public TableWriteImpl<?> newWrite(
- String commitUser,
- @Nullable ManifestCacheFilter manifestFilter,
- @Nullable Integer writeId) {
- return wrapped.newWrite(commitUser, manifestFilter, writeId);
+ public TableWriteImpl<?> newWrite(String commitUser, @Nullable Integer
writeId) {
+ return wrapped.newWrite(commitUser, writeId);
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
index 49aaced3c5..e7b8129e3c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
@@ -23,7 +23,6 @@ import org.apache.paimon.Snapshot;
import org.apache.paimon.consumer.ConsumerManager;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.fs.Path;
-import org.apache.paimon.manifest.ManifestCacheFilter;
import org.apache.paimon.operation.LocalOrphanFilesClean;
import org.apache.paimon.options.ExpireConfig;
import org.apache.paimon.schema.TableSchema;
@@ -116,10 +115,7 @@ public interface FileStoreTable extends DataTable {
@Override
TableWriteImpl<?> newWrite(String commitUser);
- TableWriteImpl<?> newWrite(
- String commitUser,
- @Nullable ManifestCacheFilter manifestFilter,
- @Nullable Integer writeId);
+ TableWriteImpl<?> newWrite(String commitUser, @Nullable Integer writeId);
@Override
TableCommitImpl newCommit(String commitUser);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
index 1a22822779..4901638b65 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
@@ -24,7 +24,6 @@ import org.apache.paimon.KeyValueFileStore;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
-import org.apache.paimon.manifest.ManifestCacheFilter;
import org.apache.paimon.mergetree.compact.LookupMergeFunction;
import org.apache.paimon.mergetree.compact.MergeFunctionFactory;
import org.apache.paimon.operation.FileStoreScan;
@@ -152,18 +151,15 @@ public class PrimaryKeyFileStoreTable extends
AbstractFileStoreTable {
@Override
public TableWriteImpl<KeyValue> newWrite(String commitUser) {
- return newWrite(commitUser, null, null);
+ return newWrite(commitUser, null);
}
@Override
- public TableWriteImpl<KeyValue> newWrite(
- String commitUser,
- @Nullable ManifestCacheFilter manifestFilter,
- @Nullable Integer writeId) {
+ public TableWriteImpl<KeyValue> newWrite(String commitUser, @Nullable
Integer writeId) {
KeyValue kv = new KeyValue();
return new TableWriteImpl<>(
rowType(),
- store().newWrite(commitUser, manifestFilter, writeId),
+ store().newWrite(commitUser, writeId),
createRowKeyExtractor(),
(record, rowKind) ->
kv.replace(
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectTable.java
index d63d547656..c829d212e9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectTable.java
@@ -19,7 +19,6 @@
package org.apache.paimon.table.object;
import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.manifest.ManifestCacheFilter;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.DelegatedFileStoreTable;
import org.apache.paimon.table.FileStoreTable;
@@ -159,10 +158,7 @@ public interface ObjectTable extends FileStoreTable {
}
@Override
- public TableWriteImpl<?> newWrite(
- String commitUser,
- @Nullable ManifestCacheFilter manifestFilter,
- @Nullable Integer writeId) {
+ public TableWriteImpl<?> newWrite(String commitUser, @Nullable Integer
writeId) {
throw new UnsupportedOperationException("Object table does not
support Write.");
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java
b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java
index 8fe13943a3..813e3b84db 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java
@@ -70,11 +70,7 @@ public class ObjectsCache<K, V> {
}
public List<V> read(
- K key,
- @Nullable Long fileSize,
- Filter<InternalRow> loadFilter,
- Filter<InternalRow> readFilter,
- Filter<V> readVFilter)
+ K key, @Nullable Long fileSize, Filter<InternalRow> readFilter, Filter<V> readVFilter)
throws IOException {
Segments segments = cache.getIfPresents(key);
if (segments != null) {
@@ -90,7 +86,7 @@ public class ObjectsCache<K, V> {
fileSize = fileSizeFunction.apply(key);
}
if (fileSize <= cache.maxElementSize()) {
- segments = readSegments(key, fileSize, loadFilter);
+ segments = readSegments(key, fileSize);
cache.put(key, segments);
return readFromSegments(segments, readFilter, readVFilter);
} else {
@@ -124,7 +120,7 @@ public class ObjectsCache<K, V> {
}
}
- private Segments readSegments(K key, @Nullable Long fileSize, Filter<InternalRow> loadFilter) {
+ private Segments readSegments(K key, @Nullable Long fileSize) {
InternalRowSerializer formatSerializer = this.formatSerializer.get();
try (CloseableIterator<InternalRow> iterator = reader.apply(key, fileSize)) {
ArrayList<MemorySegment> segments = new ArrayList<>();
@@ -134,9 +130,7 @@ public class ObjectsCache<K, V> {
new SimpleCollectingOutputView(segments, segmentSource, cache.pageSize());
while (iterator.hasNext()) {
InternalRow row = iterator.next();
- if (loadFilter.test(row)) {
- formatSerializer.serializeToPages(row, output);
- }
+ formatSerializer.serializeToPages(row, output);
}
return new Segments(segments, output.getCurrentPositionInSegment());
} catch (Exception e) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java
index 54ae78def2..241fd5eb90 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java
@@ -102,8 +102,7 @@ public class ObjectsFile<T> implements SimpleFileReader<T> {
}
public List<T> read(String fileName, @Nullable Long fileSize) {
- return read(
- fileName, fileSize, Filter.alwaysTrue(), Filter.alwaysTrue(), Filter.alwaysTrue());
+ return read(fileName, fileSize, Filter.alwaysTrue(), Filter.alwaysTrue());
}
public List<T> readWithIOException(String fileName) throws IOException {
@@ -112,8 +111,7 @@ public class ObjectsFile<T> implements SimpleFileReader<T> {
public List<T> readWithIOException(String fileName, @Nullable Long fileSize)
throws IOException {
- return readWithIOException(
- fileName, fileSize, Filter.alwaysTrue(), Filter.alwaysTrue(), Filter.alwaysTrue());
+ return readWithIOException(fileName, fileSize, Filter.alwaysTrue(), Filter.alwaysTrue());
}
public boolean exists(String fileName) {
@@ -127,11 +125,10 @@ public class ObjectsFile<T> implements SimpleFileReader<T> {
public List<T> read(
String fileName,
@Nullable Long fileSize,
- Filter<InternalRow> loadFilter,
Filter<InternalRow> readFilter,
Filter<T> readTFilter) {
try {
- return readWithIOException(fileName, fileSize, loadFilter, readFilter, readTFilter);
+ return readWithIOException(fileName, fileSize, readFilter, readTFilter);
} catch (IOException e) {
throw new RuntimeException("Failed to read " + fileName, e);
}
@@ -140,13 +137,12 @@ public class ObjectsFile<T> implements SimpleFileReader<T> {
private List<T> readWithIOException(
String fileName,
@Nullable Long fileSize,
- Filter<InternalRow> loadFilter,
Filter<InternalRow> readFilter,
Filter<T> readTFilter)
throws IOException {
Path path = pathFactory.toPath(fileName);
if (cache != null) {
- return cache.read(path, fileSize, loadFilter, readFilter, readTFilter);
+ return cache.read(path, fileSize, readFilter, readTFilter);
}
return readFromIterator(
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java
index 2fd84c43e8..434d1048db 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java
@@ -38,7 +38,6 @@ import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.mergetree.compact.ConcatRecordReader;
import org.apache.paimon.operation.FileStoreTestUtils;
-import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.reader.ReaderSupplier;
@@ -534,46 +533,6 @@ public abstract class SimpleTableTestBase {
commit.close();
}
- @Test
- public void testManifestCache() throws Exception {
- FileStoreTable table =
- createFileStoreTable(
- conf ->
- conf.set(
- CoreOptions.WRITE_MANIFEST_CACHE,
- MemorySize.ofMebiBytes(1)));
- StreamTableWrite write = table.newWrite(commitUser);
- StreamTableCommit commit = table.newCommit(commitUser);
-
- // lots of commits, produce lots of manifest
- List<String> expected = new ArrayList<>();
- int cnt = 50;
- for (int i = 0; i < cnt; i++) {
- write.write(rowData(i, i, (long) i));
- commit.commit(i, write.prepareCommit(false, i));
- expected.add(
- String.format("%s|%s|%s|binary|varbinary|mapKey:mapVal|multiset", i, i, i));
- }
- write.close();
-
- // create new write and reload manifests
- write = table.newWrite(commitUser);
- for (int i = 0; i < cnt; i++) {
- write.write(rowData(i, i + 1, (long) i + 1));
- expected.add(
- String.format(
- "%s|%s|%s|binary|varbinary|mapKey:mapVal|multiset", i, i + 1, i + 1));
- }
- commit.commit(cnt, write.prepareCommit(false, cnt));
- commit.close();
-
- // check result
- List<String> result =
- getResult(table.newRead(), table.newScan().plan().splits(), BATCH_ROW_TO_STRING);
- assertThat(result.size()).isEqualTo(expected.size());
- assertThat(result).containsExactlyInAnyOrderElementsOf(expected);
- }
-
@Test
public void testWriteWithoutCompactionAndExpiration() throws Exception {
FileStoreTable table =
diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/ObjectsCacheTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/ObjectsCacheTest.java
index 7e52814a82..b22c616ca7 100644
--- a/paimon-core/src/test/java/org/apache/paimon/utils/ObjectsCacheTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/utils/ObjectsCacheTest.java
@@ -62,25 +62,19 @@ public class ObjectsCacheTest {
cache.withCacheMetrics(scanMetrics.getCacheMetrics());
// test empty
map.put("k1", Collections.emptyList());
- List<String> values =
- cache.read(
- "k1", null, Filter.alwaysTrue(), Filter.alwaysTrue(), Filter.alwaysTrue());
+ List<String> values = cache.read("k1", null, Filter.alwaysTrue(), Filter.alwaysTrue());
assertThat(values).isEmpty();
assertThat(scanMetrics.getCacheMetrics().getMissedObject()).hasValue(1);
// test values
List<String> expect = Arrays.asList("v1", "v2", "v3");
map.put("k2", expect);
- values =
- cache.read(
- "k2", null, Filter.alwaysTrue(), Filter.alwaysTrue(), Filter.alwaysTrue());
+ values = cache.read("k2", null, Filter.alwaysTrue(), Filter.alwaysTrue());
assertThat(values).containsExactlyElementsOf(expect);
assertThat(scanMetrics.getCacheMetrics().getMissedObject()).hasValue(2);
// test cache
- values =
- cache.read(
- "k2", null, Filter.alwaysTrue(), Filter.alwaysTrue(), Filter.alwaysTrue());
+ values = cache.read("k2", null, Filter.alwaysTrue(), Filter.alwaysTrue());
assertThat(values).containsExactlyElementsOf(expect);
assertThat(scanMetrics.getCacheMetrics().getHitObject()).hasValue(1);
@@ -89,35 +83,10 @@ public class ObjectsCacheTest {
cache.read(
"k2",
null,
- Filter.alwaysTrue(),
r -> r.getString(0).toString().endsWith("2"),
Filter.alwaysTrue());
assertThat(values).containsExactly("v2");
- // test load filter
- expect = Arrays.asList("v1", "v2", "v3");
- map.put("k3", expect);
- values =
- cache.read(
- "k3",
- null,
- r -> r.getString(0).toString().endsWith("2"),
- Filter.alwaysTrue(),
- Filter.alwaysTrue());
- assertThat(values).containsExactly("v2");
-
- // test load filter empty
- expect = Arrays.asList("v1", "v2", "v3");
- map.put("k4", expect);
- values =
- cache.read(
- "k4",
- null,
- r -> r.getString(0).toString().endsWith("5"),
- Filter.alwaysTrue(),
- Filter.alwaysTrue());
- assertThat(values).isEmpty();
-
// test read concurrently
map.clear();
for (int i = 0; i < 10; i++) {
@@ -133,7 +102,6 @@ public class ObjectsCacheTest {
k,
null,
Filter.alwaysTrue(),
- Filter.alwaysTrue(),
Filter.alwaysTrue()))
.containsExactly(k);
} catch (IOException e) {
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcAppendTableWriteOperator.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcAppendTableWriteOperator.java
index b3e4153727..270ee693b9 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcAppendTableWriteOperator.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcAppendTableWriteOperator.java
@@ -50,7 +50,7 @@ public class CdcAppendTableWriteOperator extends CdcRecordStoreWriteOperator {
StoreSinkWriteState.StateValueFilter stateFilter) {
// No conflicts will occur in append only unaware bucket writer, so no state
// is needed.
- return new NoopStoreSinkWriteState(subtaskId, stateFilter);
+ return new NoopStoreSinkWriteState(subtaskId);
}
@Override
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
index 3484210014..c8d635422f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
@@ -338,19 +338,13 @@ public class FlinkSinkBuilder {
boolean isStreaming = isStreaming(input);
boolean isAdaptiveParallelismEnabled =
AdaptiveParallelism.isEnabled(input.getExecutionEnvironment());
- boolean writeMCacheEnabled = table.coreOptions().writeManifestCache().getBytes() > 0;
boolean hashDynamicMode = table.bucketMode() == BucketMode.HASH_DYNAMIC;
if (parallelismUndefined
&& !isStreaming
&& isAdaptiveParallelismEnabled
- && (writeMCacheEnabled || hashDynamicMode)) {
+ && hashDynamicMode) {
List<String> messages = new ArrayList<>();
- if (writeMCacheEnabled) {
- messages.add("Write Manifest Cache");
- }
- if (hashDynamicMode) {
- messages.add("Dynamic Bucket Mode");
- }
+ messages.add("Dynamic Bucket Mode");
String parallelismSource;
if (input.getParallelism() > 0) {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java
index 8dc2044734..581472d7b8 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java
@@ -91,7 +91,7 @@ public abstract class FlinkWriteSink<T> extends FlinkSink<T> {
StoreSinkWriteState.StateValueFilter stateFilter) {
// No conflicts will occur in append only unaware bucket writer, so no state
// is needed.
- return new NoopStoreSinkWriteState(subtaskId, stateFilter);
+ return new NoopStoreSinkWriteState(subtaskId);
}
@Override
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/NoopStoreSinkWriteState.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/NoopStoreSinkWriteState.java
index 7e994e2015..484637152f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/NoopStoreSinkWriteState.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/NoopStoreSinkWriteState.java
@@ -29,16 +29,9 @@ import java.util.List;
public class NoopStoreSinkWriteState implements StoreSinkWriteState {
private final int subtaskId;
- private final StateValueFilter stateValueFilter;
- public NoopStoreSinkWriteState(int subtaskId, StateValueFilter stateValueFilter) {
+ public NoopStoreSinkWriteState(int subtaskId) {
this.subtaskId = subtaskId;
- this.stateValueFilter = stateValueFilter;
- }
-
- @Override
- public StateValueFilter stateValueFilter() {
- return stateValueFilter;
}
@Override
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
index 4600aa0bbf..163fc6b578 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
@@ -141,11 +141,7 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
"memoryPool and memoryPoolFactory cannot be set at the same time.");
TableWriteImpl<?> tableWrite =
- table.newWrite(
- commitUser,
- (part, bucket) ->
- state.stateValueFilter().filter(table.name(), part, bucket),
- state.getSubtaskId())
+ table.newWrite(commitUser, state.getSubtaskId())
.withIOManager(paimonIOManager)
.withIgnorePreviousFiles(ignorePreviousFiles)
.withExecutionMode(isStreamingMode)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteState.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteState.java
index e7b3bcf254..b4ff35708c 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteState.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteState.java
@@ -30,8 +30,6 @@ import java.util.List;
*/
public interface StoreSinkWriteState {
- StoreSinkWriteState.StateValueFilter stateValueFilter();
-
@Nullable
List<StoreSinkWriteState.StateValue> get(String tableName, String key);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteStateImpl.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteStateImpl.java
index 7f4379c241..e9703e2f95 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteStateImpl.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteStateImpl.java
@@ -89,11 +89,6 @@ public class StoreSinkWriteStateImpl implements StoreSinkWriteState {
}
}
- @Override
- public StoreSinkWriteState.StateValueFilter stateValueFilter() {
- return stateValueFilter;
- }
-
@Override
public @Nullable List<StoreSinkWriteState.StateValue> get(String tableName, String key) {
Map<String, List<StoreSinkWriteState.StateValue>> innerMap = map.get(tableName);
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index 0398fcae2d..2d968697d2 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -97,19 +97,6 @@ public class BatchFileStoreITCase extends CatalogITCaseBase {
Row.of(1, 11, 111), Row.of(2, 22, 222), Row.of(3, 33, 333));
}
- @Test
- public void testAQEWithWriteManifest() {
- batchSql("ALTER TABLE T SET ('write-manifest-cache' = '1 mb')");
- batchSql("INSERT INTO T VALUES (1, 11, 111), (2, 22, 222)");
- batchSql("INSERT INTO T SELECT a, b, c FROM T GROUP BY a,b,c");
- assertThat(batchSql("SELECT * FROM T"))
- .containsExactlyInAnyOrder(
- Row.of(1, 11, 111),
- Row.of(2, 22, 222),
- Row.of(1, 11, 111),
- Row.of(2, 22, 222));
- }
-
@Test
public void testAQEWithDynamicBucket() {
batchSql("CREATE TABLE IF NOT EXISTS D_T (a INT PRIMARY KEY NOT ENFORCED, b INT, c INT)");