This is an automated email from the ASF dual-hosted git repository.

junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 30c18cf28a [core] Remove 'write-manifest-cache' because new 
writer-coordinator (#5803)
30c18cf28a is described below

commit 30c18cf28ac0f0b12cb5496ebdb8ea1c8c325154
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Jun 27 16:44:36 2025 +0800

    [core] Remove 'write-manifest-cache' because new writer-coordinator (#5803)
---
 .../shortcodes/generated/core_configuration.html   |  6 ----
 .../main/java/org/apache/paimon/CoreOptions.java   | 11 ------
 .../java/org/apache/paimon/AbstractFileStore.java  | 33 ++++-------------
 .../org/apache/paimon/AppendOnlyFileStore.java     | 23 ++++--------
 .../src/main/java/org/apache/paimon/FileStore.java |  6 +---
 .../java/org/apache/paimon/KeyValueFileStore.java  | 23 ++++--------
 .../java/org/apache/paimon/manifest/FileEntry.java |  6 +---
 .../paimon/manifest/ManifestCacheFilter.java       | 36 -------------------
 .../paimon/operation/AbstractFileStoreScan.java    | 34 ------------------
 .../org/apache/paimon/operation/FileStoreScan.java |  3 --
 .../paimon/privilege/PrivilegedFileStore.java      |  8 ++---
 .../paimon/privilege/PrivilegedFileStoreTable.java |  8 ++---
 .../paimon/table/AppendOnlyFileStoreTable.java     | 10 ++----
 .../paimon/table/DelegatedFileStoreTable.java      |  8 ++---
 .../org/apache/paimon/table/FileStoreTable.java    |  6 +---
 .../paimon/table/PrimaryKeyFileStoreTable.java     | 10 ++----
 .../apache/paimon/table/object/ObjectTable.java    |  6 +---
 .../java/org/apache/paimon/utils/ObjectsCache.java | 14 +++-----
 .../java/org/apache/paimon/utils/ObjectsFile.java  | 12 +++----
 .../apache/paimon/table/SimpleTableTestBase.java   | 41 ----------------------
 .../org/apache/paimon/utils/ObjectsCacheTest.java  | 38 ++------------------
 .../sink/cdc/CdcAppendTableWriteOperator.java      |  2 +-
 .../apache/paimon/flink/sink/FlinkSinkBuilder.java | 10 ++----
 .../apache/paimon/flink/sink/FlinkWriteSink.java   |  2 +-
 .../paimon/flink/sink/NoopStoreSinkWriteState.java |  9 +----
 .../paimon/flink/sink/StoreSinkWriteImpl.java      |  6 +---
 .../paimon/flink/sink/StoreSinkWriteState.java     |  2 --
 .../paimon/flink/sink/StoreSinkWriteStateImpl.java |  5 ---
 .../apache/paimon/flink/BatchFileStoreITCase.java  | 13 -------
 29 files changed, 53 insertions(+), 338 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index ebd4b950cc..096c6b58b4 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -1164,12 +1164,6 @@ If the data size allocated for the sorting task is 
uneven,which may lead to perf
             <td>Boolean</td>
             <td>Whether the write buffer can be spillable. Enabled by default 
when using object storage or when 'target-file-size' is greater than 
'write-buffer-size'.</td>
         </tr>
-        <tr>
-            <td><h5>write-manifest-cache</h5></td>
-            <td style="word-wrap: break-word;">0 bytes</td>
-            <td>MemorySize</td>
-            <td>Cache size for reading manifest files for write 
initialization.</td>
-        </tr>
         <tr>
             <td><h5>write-max-writers-to-spill</h5></td>
             <td style="word-wrap: break-word;">10</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index c269a969a9..5a8ffb3082 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -530,13 +530,6 @@ public class CoreOptions implements Serializable {
                     .withDescription(
                             "When in batch append inserting, if the writer 
number is greater than this option, we open the buffer cache and spill function 
to avoid out-of-memory. ");
 
-    public static final ConfigOption<MemorySize> WRITE_MANIFEST_CACHE =
-            key("write-manifest-cache")
-                    .memoryType()
-                    .defaultValue(MemorySize.ofMebiBytes(0))
-                    .withDescription(
-                            "Cache size for reading manifest files for write 
initialization.");
-
     public static final ConfigOption<Integer> LOCAL_SORT_MAX_NUM_FILE_HANDLES =
             key("local-sort.max-num-file-handles")
                     .intType()
@@ -1868,10 +1861,6 @@ public class CoreOptions implements Serializable {
         return options.get(MANIFEST_FULL_COMPACTION_FILE_SIZE);
     }
 
-    public MemorySize writeManifestCache() {
-        return options.get(WRITE_MANIFEST_CACHE);
-    }
-
     public String partitionDefaultName() {
         return options.get(PARTITION_DEFAULT_NAME);
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 556d8a7d7b..411c8268b9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -38,7 +38,6 @@ import org.apache.paimon.metastore.AddPartitionTagCallback;
 import org.apache.paimon.metastore.TagPreviewCommitCallback;
 import org.apache.paimon.operation.ChangelogDeletion;
 import org.apache.paimon.operation.FileStoreCommitImpl;
-import org.apache.paimon.operation.FileStoreScan;
 import org.apache.paimon.operation.Lock;
 import org.apache.paimon.operation.ManifestsReader;
 import org.apache.paimon.operation.PartitionExpire;
@@ -97,7 +96,6 @@ abstract class AbstractFileStore<T> implements FileStore<T> {
     protected final RowType partitionType;
     protected final CatalogEnvironment catalogEnvironment;
 
-    @Nullable private final SegmentsCache<Path> writeManifestCache;
     @Nullable private SegmentsCache<Path> readManifestCache;
     @Nullable private Cache<Path, Snapshot> snapshotCache;
 
@@ -116,9 +114,6 @@ abstract class AbstractFileStore<T> implements FileStore<T> 
{
         this.options = options;
         this.partitionType = partitionType;
         this.catalogEnvironment = catalogEnvironment;
-        this.writeManifestCache =
-                SegmentsCache.create(
-                        options.pageSize(), options.writeManifestCache(), 
Long.MAX_VALUE);
     }
 
     @Override
@@ -193,10 +188,6 @@ abstract class AbstractFileStore<T> implements 
FileStore<T> {
 
     @Override
     public ManifestFile.Factory manifestFileFactory() {
-        return manifestFileFactory(false);
-    }
-
-    protected ManifestFile.Factory manifestFileFactory(boolean forWrite) {
         return new ManifestFile.Factory(
                 fileIO,
                 schemaManager,
@@ -205,21 +196,17 @@ abstract class AbstractFileStore<T> implements 
FileStore<T> {
                 options.manifestCompression(),
                 pathFactory(),
                 options.manifestTargetSize().getBytes(),
-                forWrite ? writeManifestCache : readManifestCache);
+                readManifestCache);
     }
 
     @Override
     public ManifestList.Factory manifestListFactory() {
-        return manifestListFactory(false);
-    }
-
-    protected ManifestList.Factory manifestListFactory(boolean forWrite) {
         return new ManifestList.Factory(
                 fileIO,
                 FileFormat.manifestFormat(options),
                 options.manifestCompression(),
                 pathFactory(),
-                forWrite ? writeManifestCache : readManifestCache);
+                readManifestCache);
     }
 
     @Override
@@ -256,12 +243,12 @@ abstract class AbstractFileStore<T> implements 
FileStore<T> {
                 new StatsFile(fileIO, pathFactory().statsFileFactory()));
     }
 
-    protected ManifestsReader newManifestsReader(boolean forWrite) {
+    protected ManifestsReader newManifestsReader() {
         return new ManifestsReader(
                 partitionType,
                 options.partitionDefaultName(),
                 snapshotManager(),
-                manifestListFactory(forWrite));
+                manifestListFactory());
     }
 
     @Override
@@ -279,14 +266,6 @@ abstract class AbstractFileStore<T> implements 
FileStore<T> {
         return schemaManager.mergeSchema(rowType, allowExplicitCast);
     }
 
-    protected abstract FileStoreScan newScan(ScanType scanType);
-
-    protected enum ScanType {
-        FOR_READ,
-        FOR_WRITE,
-        FOR_COMMIT
-    }
-
     @Override
     public FileStoreCommitImpl newCommit(String commitUser, FileStoreTable 
table) {
         SnapshotManager snapshotManager = snapshotManager();
@@ -308,7 +287,7 @@ abstract class AbstractFileStore<T> implements FileStore<T> 
{
                 manifestFileFactory(),
                 manifestListFactory(),
                 indexManifestFileFactory(),
-                newScan(ScanType.FOR_COMMIT),
+                newScan(),
                 options.bucket(),
                 options.manifestTargetSize(),
                 options.manifestFullCompactionThresholdSize(),
@@ -448,7 +427,7 @@ abstract class AbstractFileStore<T> implements FileStore<T> 
{
                 expirationTime,
                 checkInterval,
                 expireStrategy,
-                newScan(ScanType.FOR_COMMIT),
+                newScan(),
                 newCommit(commitUser, table),
                 partitionHandler,
                 options.endInputCheckPartitionExpire(),
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
index 39a6649ca7..a277ce6530 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
@@ -22,7 +22,6 @@ import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
 import org.apache.paimon.format.FileFormatDiscover;
 import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.manifest.ManifestCacheFilter;
 import org.apache.paimon.operation.AppendFileStoreWrite;
 import org.apache.paimon.operation.AppendOnlyFileStoreScan;
 import org.apache.paimon.operation.BaseAppendFileStoreWrite;
@@ -72,11 +71,6 @@ public class AppendOnlyFileStore extends 
AbstractFileStore<InternalRow> {
         return options.bucket() == -1 ? BucketMode.BUCKET_UNAWARE : 
BucketMode.HASH_FIXED;
     }
 
-    @Override
-    public AppendOnlyFileStoreScan newScan() {
-        return newScan(ScanType.FOR_READ);
-    }
-
     @Override
     public RawFileSplitRead newRead() {
         return new RawFileSplitRead(
@@ -91,14 +85,11 @@ public class AppendOnlyFileStore extends 
AbstractFileStore<InternalRow> {
 
     @Override
     public BaseAppendFileStoreWrite newWrite(String commitUser) {
-        return newWrite(commitUser, null, null);
+        return newWrite(commitUser, null);
     }
 
     @Override
-    public BaseAppendFileStoreWrite newWrite(
-            String commitUser,
-            @Nullable ManifestCacheFilter manifestFilter,
-            @Nullable Integer writeId) {
+    public BaseAppendFileStoreWrite newWrite(String commitUser, @Nullable 
Integer writeId) {
         DeletionVectorsMaintainer.Factory dvMaintainerFactory =
                 options.deletionVectorsEnabled()
                         ? 
DeletionVectorsMaintainer.factory(newIndexFileHandler())
@@ -112,7 +103,7 @@ public class AppendOnlyFileStore extends 
AbstractFileStore<InternalRow> {
                     partitionType,
                     pathFactory(),
                     snapshotManager(),
-                    
newScan(ScanType.FOR_WRITE).withManifestCacheFilter(manifestFilter),
+                    newScan(),
                     options,
                     dvMaintainerFactory,
                     tableName);
@@ -126,7 +117,7 @@ public class AppendOnlyFileStore extends 
AbstractFileStore<InternalRow> {
                     partitionType,
                     pathFactory(),
                     snapshotManager(),
-                    
newScan(ScanType.FOR_WRITE).withManifestCacheFilter(manifestFilter),
+                    newScan(),
                     options,
                     dvMaintainerFactory,
                     tableName);
@@ -134,7 +125,7 @@ public class AppendOnlyFileStore extends 
AbstractFileStore<InternalRow> {
     }
 
     @Override
-    protected AppendOnlyFileStoreScan newScan(ScanType scanType) {
+    public AppendOnlyFileStoreScan newScan() {
         BucketSelectConverter bucketSelectConverter =
                 predicate -> {
                     if (bucketMode() != BucketMode.HASH_FIXED) {
@@ -158,12 +149,12 @@ public class AppendOnlyFileStore extends 
AbstractFileStore<InternalRow> {
                 };
 
         return new AppendOnlyFileStoreScan(
-                newManifestsReader(scanType == ScanType.FOR_WRITE),
+                newManifestsReader(),
                 bucketSelectConverter,
                 snapshotManager(),
                 schemaManager,
                 schema,
-                manifestFileFactory(scanType == ScanType.FOR_WRITE),
+                manifestFileFactory(),
                 options.scanManifestParallelism(),
                 options.fileIndexReadEnabled());
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/FileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
index 12fc60a383..62fdd751fc 100644
--- a/paimon-core/src/main/java/org/apache/paimon/FileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
@@ -21,7 +21,6 @@ package org.apache.paimon;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.index.IndexFileHandler;
 import org.apache.paimon.manifest.IndexManifestFile;
-import org.apache.paimon.manifest.ManifestCacheFilter;
 import org.apache.paimon.manifest.ManifestFile;
 import org.apache.paimon.manifest.ManifestList;
 import org.apache.paimon.operation.ChangelogDeletion;
@@ -88,10 +87,7 @@ public interface FileStore<T> {
 
     FileStoreWrite<T> newWrite(String commitUser);
 
-    FileStoreWrite<T> newWrite(
-            String commitUser,
-            @Nullable ManifestCacheFilter manifestFilter,
-            @Nullable Integer writeId);
+    FileStoreWrite<T> newWrite(String commitUser, @Nullable Integer writeId);
 
     FileStoreCommit newCommit(String commitUser, FileStoreTable table);
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
index 4687bdc2d4..8b94890c92 100644
--- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
@@ -25,7 +25,6 @@ import org.apache.paimon.format.FileFormatDiscover;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.index.DynamicBucketIndexMaintainer;
 import org.apache.paimon.io.KeyValueFileReaderFactory;
-import org.apache.paimon.manifest.ManifestCacheFilter;
 import org.apache.paimon.mergetree.compact.MergeFunctionFactory;
 import org.apache.paimon.operation.AbstractFileStoreWrite;
 import org.apache.paimon.operation.BucketSelectConverter;
@@ -112,11 +111,6 @@ public class KeyValueFileStore extends 
AbstractFileStore<KeyValue> {
         }
     }
 
-    @Override
-    public KeyValueFileStoreScan newScan() {
-        return newScan(ScanType.FOR_READ);
-    }
-
     @Override
     public MergeFileSplitRead newRead() {
         return new MergeFileSplitRead(
@@ -155,14 +149,11 @@ public class KeyValueFileStore extends 
AbstractFileStore<KeyValue> {
 
     @Override
     public AbstractFileStoreWrite<KeyValue> newWrite(String commitUser) {
-        return newWrite(commitUser, null, null);
+        return newWrite(commitUser, null);
     }
 
     @Override
-    public AbstractFileStoreWrite<KeyValue> newWrite(
-            String commitUser,
-            @Nullable ManifestCacheFilter manifestFilter,
-            @Nullable Integer writeId) {
+    public AbstractFileStoreWrite<KeyValue> newWrite(String commitUser, 
@Nullable Integer writeId) {
         DynamicBucketIndexMaintainer.Factory indexFactory = null;
         if (bucketMode() == BucketMode.HASH_DYNAMIC) {
             indexFactory = new 
DynamicBucketIndexMaintainer.Factory(newIndexFileHandler());
@@ -185,7 +176,7 @@ public class KeyValueFileStore extends 
AbstractFileStore<KeyValue> {
                     this::pathFactory,
                     newReaderFactoryBuilder(),
                     snapshotManager(),
-                    
newScan(ScanType.FOR_WRITE).withManifestCacheFilter(manifestFilter),
+                    newScan(),
                     options,
                     tableName,
                     writeId);
@@ -205,7 +196,7 @@ public class KeyValueFileStore extends 
AbstractFileStore<KeyValue> {
                     pathFactory(),
                     this::pathFactory,
                     snapshotManager(),
-                    
newScan(ScanType.FOR_WRITE).withManifestCacheFilter(manifestFilter),
+                    newScan(),
                     indexFactory,
                     deletionVectorsMaintainerFactory,
                     options,
@@ -215,7 +206,7 @@ public class KeyValueFileStore extends 
AbstractFileStore<KeyValue> {
     }
 
     @Override
-    protected KeyValueFileStoreScan newScan(ScanType scanType) {
+    public KeyValueFileStoreScan newScan() {
         BucketMode bucketMode = bucketMode();
         BucketSelectConverter bucketSelectConverter =
                 keyFilter -> {
@@ -237,13 +228,13 @@ public class KeyValueFileStore extends 
AbstractFileStore<KeyValue> {
                 };
 
         return new KeyValueFileStoreScan(
-                newManifestsReader(scanType == ScanType.FOR_WRITE),
+                newManifestsReader(),
                 bucketSelectConverter,
                 snapshotManager(),
                 schemaManager,
                 schema,
                 keyValueFieldsExtractor,
-                manifestFileFactory(scanType == ScanType.FOR_WRITE),
+                manifestFileFactory(),
                 options.scanManifestParallelism(),
                 options.deletionVectorsEnabled(),
                 options.mergeEngine(),
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java 
b/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
index a09bea2acf..52b029563e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
@@ -232,11 +232,7 @@ public interface FileEntry {
         return readDeletedEntries(
                 m ->
                         manifestFile.read(
-                                m.fileName(),
-                                m.fileSize(),
-                                Filter.alwaysTrue(),
-                                deletedFilter(),
-                                Filter.alwaysTrue()),
+                                m.fileName(), m.fileSize(), deletedFilter(), 
Filter.alwaysTrue()),
                 manifestFiles,
                 manifestReadParallelism);
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestCacheFilter.java 
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestCacheFilter.java
deleted file mode 100644
index 32ef757b5a..0000000000
--- 
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestCacheFilter.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.manifest;
-
-import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.operation.AbstractFileStoreScan;
-
-import javax.annotation.concurrent.ThreadSafe;
-
-/**
- * Filter for manifest cache, this is used in {@link AbstractFileStoreScan} 
for improving cache
- * utilization. NOTE: Please use this interface with caution and make sure 
that only filtered data
- * is required, otherwise it will cause correctness issues.
- */
-@ThreadSafe
-@FunctionalInterface
-public interface ManifestCacheFilter {
-
-    boolean test(BinaryRow partition, int bucket);
-}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index ec856e57b5..f8cd260f52 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -24,7 +24,6 @@ import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.manifest.BucketEntry;
 import org.apache.paimon.manifest.FileEntry;
 import org.apache.paimon.manifest.FileEntry.Identifier;
-import org.apache.paimon.manifest.ManifestCacheFilter;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.ManifestEntrySerializer;
 import org.apache.paimon.manifest.ManifestFile;
@@ -62,7 +61,6 @@ import java.util.stream.Collectors;
 import static 
org.apache.paimon.utils.ManifestReadThreadPool.getExecutorService;
 import static 
org.apache.paimon.utils.ManifestReadThreadPool.randomlyExecuteSequentialReturn;
 import static 
org.apache.paimon.utils.ManifestReadThreadPool.sequentialBatchedExecute;
-import static org.apache.paimon.utils.Preconditions.checkArgument;
 import static org.apache.paimon.utils.ThreadPoolUtils.randomlyOnlyExecute;
 
 /** Default implementation of {@link FileStoreScan}. */
@@ -88,7 +86,6 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
     private Filter<ManifestEntry> manifestEntryFilter = null;
     private Filter<String> fileNameFilter = null;
 
-    private ManifestCacheFilter manifestCacheFilter = null;
     private ScanMetrics scanMetrics = null;
     private boolean dropStats;
 
@@ -162,13 +159,6 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
 
     @Override
     public FileStoreScan withPartitionBucket(BinaryRow partition, int bucket) {
-        if (manifestCacheFilter != null && 
manifestFileFactory.isCacheEnabled()) {
-            checkArgument(
-                    manifestCacheFilter.test(partition, bucket),
-                    String.format(
-                            "This is a bug! The partition %s and bucket %s is 
filtered!",
-                            partition, bucket));
-        }
         withPartitionFilter(Collections.singletonList(partition));
         withBucket(bucket);
         return this;
@@ -216,12 +206,6 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
         return this;
     }
 
-    @Override
-    public FileStoreScan withManifestCacheFilter(ManifestCacheFilter 
manifestFilter) {
-        this.manifestCacheFilter = manifestFilter;
-        return this;
-    }
-
     @Override
     public FileStoreScan withDataFileNameFilter(Filter<String> fileNameFilter) 
{
         this.fileNameFilter = fileNameFilter;
@@ -457,7 +441,6 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
                         .read(
                                 manifest.fileName(),
                                 manifest.fileSize(),
-                                createCacheRowFilter(),
                                 createEntryRowFilter().and(additionalFilter),
                                 entry ->
                                         (additionalTFilter == null || 
additionalTFilter.test(entry))
@@ -478,23 +461,6 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
         return entry.copyWithoutStats();
     }
 
-    /**
-     * According to the {@link ManifestCacheFilter}, entry that needs to be 
cached will be retained,
-     * so the entry that will not be accessed in the future will not be cached.
-     *
-     * <p>Implemented to {@link InternalRow} is for performance (No 
deserialization).
-     */
-    private Filter<InternalRow> createCacheRowFilter() {
-        if (manifestCacheFilter == null) {
-            return Filter.alwaysTrue();
-        }
-
-        Function<InternalRow, BinaryRow> partitionGetter =
-                ManifestEntrySerializer.partitionGetter();
-        Function<InternalRow, Integer> bucketGetter = 
ManifestEntrySerializer.bucketGetter();
-        return row -> manifestCacheFilter.test(partitionGetter.apply(row), 
bucketGetter.apply(row));
-    }
-
     /**
      * Read the corresponding entries based on the current required partition 
and bucket.
      *
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
index e3759960f8..739e8b916f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
@@ -22,7 +22,6 @@ import org.apache.paimon.Snapshot;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.manifest.BucketEntry;
 import org.apache.paimon.manifest.FileKind;
-import org.apache.paimon.manifest.ManifestCacheFilter;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.ManifestFileMeta;
 import org.apache.paimon.manifest.PartitionEntry;
@@ -80,8 +79,6 @@ public interface FileStoreScan {
 
     FileStoreScan withManifestEntryFilter(Filter<ManifestEntry> filter);
 
-    FileStoreScan withManifestCacheFilter(ManifestCacheFilter manifestFilter);
-
     FileStoreScan withDataFileNameFilter(Filter<String> fileNameFilter);
 
     FileStoreScan withMetrics(ScanMetrics metrics);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
 
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
index c30b4a94be..88cac126be 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
@@ -25,7 +25,6 @@ import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.index.IndexFileHandler;
 import org.apache.paimon.manifest.IndexManifestFile;
-import org.apache.paimon.manifest.ManifestCacheFilter;
 import org.apache.paimon.manifest.ManifestFile;
 import org.apache.paimon.manifest.ManifestList;
 import org.apache.paimon.operation.ChangelogDeletion;
@@ -147,12 +146,9 @@ public class PrivilegedFileStore<T> implements 
FileStore<T> {
     }
 
     @Override
-    public FileStoreWrite<T> newWrite(
-            String commitUser,
-            @Nullable ManifestCacheFilter manifestFilter,
-            @Nullable Integer writeId) {
+    public FileStoreWrite<T> newWrite(String commitUser, @Nullable Integer 
writeId) {
         privilegeChecker.assertCanInsert(identifier);
-        return wrapped.newWrite(commitUser, manifestFilter, writeId);
+        return wrapped.newWrite(commitUser, writeId);
     }
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
index 26e9ec1ea1..2aec02d0c8 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
@@ -21,7 +21,6 @@ package org.apache.paimon.privilege;
 import org.apache.paimon.FileStore;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.manifest.ManifestCacheFilter;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.stats.Statistics;
 import org.apache.paimon.table.DelegatedFileStoreTable;
@@ -230,12 +229,9 @@ public class PrivilegedFileStoreTable extends 
DelegatedFileStoreTable {
     }
 
     @Override
-    public TableWriteImpl<?> newWrite(
-            String commitUser,
-            @Nullable ManifestCacheFilter manifestFilter,
-            @Nullable Integer writeId) {
+    public TableWriteImpl<?> newWrite(String commitUser, @Nullable Integer 
writeId) {
         privilegeChecker.assertCanInsert(identifier);
-        return wrapped.newWrite(commitUser, manifestFilter, writeId);
+        return wrapped.newWrite(commitUser, writeId);
     }
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
index 240163cf3d..b060954b6b 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
@@ -23,7 +23,6 @@ import org.apache.paimon.CoreOptions;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
-import org.apache.paimon.manifest.ManifestCacheFilter;
 import org.apache.paimon.operation.AppendOnlyFileStoreScan;
 import org.apache.paimon.operation.BaseAppendFileStoreWrite;
 import org.apache.paimon.operation.FileStoreScan;
@@ -127,15 +126,12 @@ public class AppendOnlyFileStoreTable extends 
AbstractFileStoreTable {
 
     @Override
     public TableWriteImpl<InternalRow> newWrite(String commitUser) {
-        return newWrite(commitUser, null, null);
+        return newWrite(commitUser, null);
     }
 
     @Override
-    public TableWriteImpl<InternalRow> newWrite(
-            String commitUser,
-            @Nullable ManifestCacheFilter manifestFilter,
-            @Nullable Integer writeId) {
-        BaseAppendFileStoreWrite writer = store().newWrite(commitUser, 
manifestFilter, writeId);
+    public TableWriteImpl<InternalRow> newWrite(String commitUser, @Nullable 
Integer writeId) {
+        BaseAppendFileStoreWrite writer = store().newWrite(commitUser, 
writeId);
         return new TableWriteImpl<>(
                 rowType(),
                 writer,
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
index 5f5bc68b3b..a0e33f620a 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
@@ -25,7 +25,6 @@ import org.apache.paimon.consumer.ConsumerManager;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.manifest.IndexManifestEntry;
-import org.apache.paimon.manifest.ManifestCacheFilter;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.ManifestFileMeta;
 import org.apache.paimon.schema.SchemaManager;
@@ -300,11 +299,8 @@ public abstract class DelegatedFileStoreTable implements 
FileStoreTable {
     }
 
     @Override
-    public TableWriteImpl<?> newWrite(
-            String commitUser,
-            @Nullable ManifestCacheFilter manifestFilter,
-            @Nullable Integer writeId) {
-        return wrapped.newWrite(commitUser, manifestFilter, writeId);
+    public TableWriteImpl<?> newWrite(String commitUser, @Nullable Integer 
writeId) {
+        return wrapped.newWrite(commitUser, writeId);
     }
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
index 49aaced3c5..e7b8129e3c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
@@ -23,7 +23,6 @@ import org.apache.paimon.Snapshot;
 import org.apache.paimon.consumer.ConsumerManager;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.fs.Path;
-import org.apache.paimon.manifest.ManifestCacheFilter;
 import org.apache.paimon.operation.LocalOrphanFilesClean;
 import org.apache.paimon.options.ExpireConfig;
 import org.apache.paimon.schema.TableSchema;
@@ -116,10 +115,7 @@ public interface FileStoreTable extends DataTable {
     @Override
     TableWriteImpl<?> newWrite(String commitUser);
 
-    TableWriteImpl<?> newWrite(
-            String commitUser,
-            @Nullable ManifestCacheFilter manifestFilter,
-            @Nullable Integer writeId);
+    TableWriteImpl<?> newWrite(String commitUser, @Nullable Integer writeId);
 
     @Override
     TableCommitImpl newCommit(String commitUser);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
index 1a22822779..4901638b65 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
@@ -24,7 +24,6 @@ import org.apache.paimon.KeyValueFileStore;
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
-import org.apache.paimon.manifest.ManifestCacheFilter;
 import org.apache.paimon.mergetree.compact.LookupMergeFunction;
 import org.apache.paimon.mergetree.compact.MergeFunctionFactory;
 import org.apache.paimon.operation.FileStoreScan;
@@ -152,18 +151,15 @@ public class PrimaryKeyFileStoreTable extends 
AbstractFileStoreTable {
 
     @Override
     public TableWriteImpl<KeyValue> newWrite(String commitUser) {
-        return newWrite(commitUser, null, null);
+        return newWrite(commitUser, null);
     }
 
     @Override
-    public TableWriteImpl<KeyValue> newWrite(
-            String commitUser,
-            @Nullable ManifestCacheFilter manifestFilter,
-            @Nullable Integer writeId) {
+    public TableWriteImpl<KeyValue> newWrite(String commitUser, @Nullable 
Integer writeId) {
         KeyValue kv = new KeyValue();
         return new TableWriteImpl<>(
                 rowType(),
-                store().newWrite(commitUser, manifestFilter, writeId),
+                store().newWrite(commitUser, writeId),
                 createRowKeyExtractor(),
                 (record, rowKind) ->
                         kv.replace(
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectTable.java
index d63d547656..c829d212e9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectTable.java
@@ -19,7 +19,6 @@
 package org.apache.paimon.table.object;
 
 import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.manifest.ManifestCacheFilter;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.DelegatedFileStoreTable;
 import org.apache.paimon.table.FileStoreTable;
@@ -159,10 +158,7 @@ public interface ObjectTable extends FileStoreTable {
         }
 
         @Override
-        public TableWriteImpl<?> newWrite(
-                String commitUser,
-                @Nullable ManifestCacheFilter manifestFilter,
-                @Nullable Integer writeId) {
+        public TableWriteImpl<?> newWrite(String commitUser, @Nullable Integer 
writeId) {
             throw new UnsupportedOperationException("Object table does not 
support Write.");
         }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java
index 8fe13943a3..813e3b84db 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java
@@ -70,11 +70,7 @@ public class ObjectsCache<K, V> {
     }
 
     public List<V> read(
-            K key,
-            @Nullable Long fileSize,
-            Filter<InternalRow> loadFilter,
-            Filter<InternalRow> readFilter,
-            Filter<V> readVFilter)
+            K key, @Nullable Long fileSize, Filter<InternalRow> readFilter, 
Filter<V> readVFilter)
             throws IOException {
         Segments segments = cache.getIfPresents(key);
         if (segments != null) {
@@ -90,7 +86,7 @@ public class ObjectsCache<K, V> {
                 fileSize = fileSizeFunction.apply(key);
             }
             if (fileSize <= cache.maxElementSize()) {
-                segments = readSegments(key, fileSize, loadFilter);
+                segments = readSegments(key, fileSize);
                 cache.put(key, segments);
                 return readFromSegments(segments, readFilter, readVFilter);
             } else {
@@ -124,7 +120,7 @@ public class ObjectsCache<K, V> {
         }
     }
 
-    private Segments readSegments(K key, @Nullable Long fileSize, 
Filter<InternalRow> loadFilter) {
+    private Segments readSegments(K key, @Nullable Long fileSize) {
         InternalRowSerializer formatSerializer = this.formatSerializer.get();
         try (CloseableIterator<InternalRow> iterator = reader.apply(key, 
fileSize)) {
             ArrayList<MemorySegment> segments = new ArrayList<>();
@@ -134,9 +130,7 @@ public class ObjectsCache<K, V> {
                     new SimpleCollectingOutputView(segments, segmentSource, 
cache.pageSize());
             while (iterator.hasNext()) {
                 InternalRow row = iterator.next();
-                if (loadFilter.test(row)) {
-                    formatSerializer.serializeToPages(row, output);
-                }
+                formatSerializer.serializeToPages(row, output);
             }
             return new Segments(segments, 
output.getCurrentPositionInSegment());
         } catch (Exception e) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java
index 54ae78def2..241fd5eb90 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java
@@ -102,8 +102,7 @@ public class ObjectsFile<T> implements SimpleFileReader<T> {
     }
 
     public List<T> read(String fileName, @Nullable Long fileSize) {
-        return read(
-                fileName, fileSize, Filter.alwaysTrue(), Filter.alwaysTrue(), 
Filter.alwaysTrue());
+        return read(fileName, fileSize, Filter.alwaysTrue(), 
Filter.alwaysTrue());
     }
 
     public List<T> readWithIOException(String fileName) throws IOException {
@@ -112,8 +111,7 @@ public class ObjectsFile<T> implements SimpleFileReader<T> {
 
     public List<T> readWithIOException(String fileName, @Nullable Long 
fileSize)
             throws IOException {
-        return readWithIOException(
-                fileName, fileSize, Filter.alwaysTrue(), Filter.alwaysTrue(), 
Filter.alwaysTrue());
+        return readWithIOException(fileName, fileSize, Filter.alwaysTrue(), 
Filter.alwaysTrue());
     }
 
     public boolean exists(String fileName) {
@@ -127,11 +125,10 @@ public class ObjectsFile<T> implements 
SimpleFileReader<T> {
     public List<T> read(
             String fileName,
             @Nullable Long fileSize,
-            Filter<InternalRow> loadFilter,
             Filter<InternalRow> readFilter,
             Filter<T> readTFilter) {
         try {
-            return readWithIOException(fileName, fileSize, loadFilter, 
readFilter, readTFilter);
+            return readWithIOException(fileName, fileSize, readFilter, 
readTFilter);
         } catch (IOException e) {
             throw new RuntimeException("Failed to read " + fileName, e);
         }
@@ -140,13 +137,12 @@ public class ObjectsFile<T> implements 
SimpleFileReader<T> {
     private List<T> readWithIOException(
             String fileName,
             @Nullable Long fileSize,
-            Filter<InternalRow> loadFilter,
             Filter<InternalRow> readFilter,
             Filter<T> readTFilter)
             throws IOException {
         Path path = pathFactory.toPath(fileName);
         if (cache != null) {
-            return cache.read(path, fileSize, loadFilter, readFilter, 
readTFilter);
+            return cache.read(path, fileSize, readFilter, readTFilter);
         }
 
         return readFromIterator(
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java 
b/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java
index 2fd84c43e8..434d1048db 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java
@@ -38,7 +38,6 @@ import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.mergetree.compact.ConcatRecordReader;
 import org.apache.paimon.operation.FileStoreTestUtils;
-import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.reader.ReaderSupplier;
@@ -534,46 +533,6 @@ public abstract class SimpleTableTestBase {
         commit.close();
     }
 
-    @Test
-    public void testManifestCache() throws Exception {
-        FileStoreTable table =
-                createFileStoreTable(
-                        conf ->
-                                conf.set(
-                                        CoreOptions.WRITE_MANIFEST_CACHE,
-                                        MemorySize.ofMebiBytes(1)));
-        StreamTableWrite write = table.newWrite(commitUser);
-        StreamTableCommit commit = table.newCommit(commitUser);
-
-        // lots of commits, produce lots of manifest
-        List<String> expected = new ArrayList<>();
-        int cnt = 50;
-        for (int i = 0; i < cnt; i++) {
-            write.write(rowData(i, i, (long) i));
-            commit.commit(i, write.prepareCommit(false, i));
-            expected.add(
-                    
String.format("%s|%s|%s|binary|varbinary|mapKey:mapVal|multiset", i, i, i));
-        }
-        write.close();
-
-        // create new write and reload manifests
-        write = table.newWrite(commitUser);
-        for (int i = 0; i < cnt; i++) {
-            write.write(rowData(i, i + 1, (long) i + 1));
-            expected.add(
-                    String.format(
-                            
"%s|%s|%s|binary|varbinary|mapKey:mapVal|multiset", i, i + 1, i + 1));
-        }
-        commit.commit(cnt, write.prepareCommit(false, cnt));
-        commit.close();
-
-        // check result
-        List<String> result =
-                getResult(table.newRead(), table.newScan().plan().splits(), 
BATCH_ROW_TO_STRING);
-        assertThat(result.size()).isEqualTo(expected.size());
-        assertThat(result).containsExactlyInAnyOrderElementsOf(expected);
-    }
-
     @Test
     public void testWriteWithoutCompactionAndExpiration() throws Exception {
         FileStoreTable table =
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/utils/ObjectsCacheTest.java 
b/paimon-core/src/test/java/org/apache/paimon/utils/ObjectsCacheTest.java
index 7e52814a82..b22c616ca7 100644
--- a/paimon-core/src/test/java/org/apache/paimon/utils/ObjectsCacheTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/utils/ObjectsCacheTest.java
@@ -62,25 +62,19 @@ public class ObjectsCacheTest {
         cache.withCacheMetrics(scanMetrics.getCacheMetrics());
         // test empty
         map.put("k1", Collections.emptyList());
-        List<String> values =
-                cache.read(
-                        "k1", null, Filter.alwaysTrue(), Filter.alwaysTrue(), 
Filter.alwaysTrue());
+        List<String> values = cache.read("k1", null, Filter.alwaysTrue(), 
Filter.alwaysTrue());
         assertThat(values).isEmpty();
         
assertThat(scanMetrics.getCacheMetrics().getMissedObject()).hasValue(1);
 
         // test values
         List<String> expect = Arrays.asList("v1", "v2", "v3");
         map.put("k2", expect);
-        values =
-                cache.read(
-                        "k2", null, Filter.alwaysTrue(), Filter.alwaysTrue(), 
Filter.alwaysTrue());
+        values = cache.read("k2", null, Filter.alwaysTrue(), 
Filter.alwaysTrue());
         assertThat(values).containsExactlyElementsOf(expect);
         
assertThat(scanMetrics.getCacheMetrics().getMissedObject()).hasValue(2);
 
         // test cache
-        values =
-                cache.read(
-                        "k2", null, Filter.alwaysTrue(), Filter.alwaysTrue(), 
Filter.alwaysTrue());
+        values = cache.read("k2", null, Filter.alwaysTrue(), 
Filter.alwaysTrue());
         assertThat(values).containsExactlyElementsOf(expect);
         assertThat(scanMetrics.getCacheMetrics().getHitObject()).hasValue(1);
 
@@ -89,35 +83,10 @@ public class ObjectsCacheTest {
                 cache.read(
                         "k2",
                         null,
-                        Filter.alwaysTrue(),
                         r -> r.getString(0).toString().endsWith("2"),
                         Filter.alwaysTrue());
         assertThat(values).containsExactly("v2");
 
-        // test load filter
-        expect = Arrays.asList("v1", "v2", "v3");
-        map.put("k3", expect);
-        values =
-                cache.read(
-                        "k3",
-                        null,
-                        r -> r.getString(0).toString().endsWith("2"),
-                        Filter.alwaysTrue(),
-                        Filter.alwaysTrue());
-        assertThat(values).containsExactly("v2");
-
-        // test load filter empty
-        expect = Arrays.asList("v1", "v2", "v3");
-        map.put("k4", expect);
-        values =
-                cache.read(
-                        "k4",
-                        null,
-                        r -> r.getString(0).toString().endsWith("5"),
-                        Filter.alwaysTrue(),
-                        Filter.alwaysTrue());
-        assertThat(values).isEmpty();
-
         // test read concurrently
         map.clear();
         for (int i = 0; i < 10; i++) {
@@ -133,7 +102,6 @@ public class ObjectsCacheTest {
                                                         k,
                                                         null,
                                                         Filter.alwaysTrue(),
-                                                        Filter.alwaysTrue(),
                                                         Filter.alwaysTrue()))
                                         .containsExactly(k);
                             } catch (IOException e) {
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcAppendTableWriteOperator.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcAppendTableWriteOperator.java
index b3e4153727..270ee693b9 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcAppendTableWriteOperator.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcAppendTableWriteOperator.java
@@ -50,7 +50,7 @@ public class CdcAppendTableWriteOperator extends 
CdcRecordStoreWriteOperator {
             StoreSinkWriteState.StateValueFilter stateFilter) {
         // No conflicts will occur in append only unaware bucket writer, so no 
state
         // is needed.
-        return new NoopStoreSinkWriteState(subtaskId, stateFilter);
+        return new NoopStoreSinkWriteState(subtaskId);
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
index 3484210014..c8d635422f 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
@@ -338,19 +338,13 @@ public class FlinkSinkBuilder {
             boolean isStreaming = isStreaming(input);
             boolean isAdaptiveParallelismEnabled =
                     
AdaptiveParallelism.isEnabled(input.getExecutionEnvironment());
-            boolean writeMCacheEnabled = 
table.coreOptions().writeManifestCache().getBytes() > 0;
             boolean hashDynamicMode = table.bucketMode() == 
BucketMode.HASH_DYNAMIC;
             if (parallelismUndefined
                     && !isStreaming
                     && isAdaptiveParallelismEnabled
-                    && (writeMCacheEnabled || hashDynamicMode)) {
+                    && hashDynamicMode) {
                 List<String> messages = new ArrayList<>();
-                if (writeMCacheEnabled) {
-                    messages.add("Write Manifest Cache");
-                }
-                if (hashDynamicMode) {
-                    messages.add("Dynamic Bucket Mode");
-                }
+                messages.add("Dynamic Bucket Mode");
 
                 String parallelismSource;
                 if (input.getParallelism() > 0) {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java
index 8dc2044734..581472d7b8 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java
@@ -91,7 +91,7 @@ public abstract class FlinkWriteSink<T> extends FlinkSink<T> {
                             StoreSinkWriteState.StateValueFilter stateFilter) {
                         // No conflicts will occur in append only unaware 
bucket writer, so no state
                         // is needed.
-                        return new NoopStoreSinkWriteState(subtaskId, 
stateFilter);
+                        return new NoopStoreSinkWriteState(subtaskId);
                     }
 
                     @Override
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/NoopStoreSinkWriteState.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/NoopStoreSinkWriteState.java
index 7e994e2015..484637152f 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/NoopStoreSinkWriteState.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/NoopStoreSinkWriteState.java
@@ -29,16 +29,9 @@ import java.util.List;
 public class NoopStoreSinkWriteState implements StoreSinkWriteState {
 
     private final int subtaskId;
-    private final StateValueFilter stateValueFilter;
 
-    public NoopStoreSinkWriteState(int subtaskId, StateValueFilter 
stateValueFilter) {
+    public NoopStoreSinkWriteState(int subtaskId) {
         this.subtaskId = subtaskId;
-        this.stateValueFilter = stateValueFilter;
-    }
-
-    @Override
-    public StateValueFilter stateValueFilter() {
-        return stateValueFilter;
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
index 4600aa0bbf..163fc6b578 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
@@ -141,11 +141,7 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
                 "memoryPool and memoryPoolFactory cannot be set at the same 
time.");
 
         TableWriteImpl<?> tableWrite =
-                table.newWrite(
-                                commitUser,
-                                (part, bucket) ->
-                                        
state.stateValueFilter().filter(table.name(), part, bucket),
-                                state.getSubtaskId())
+                table.newWrite(commitUser, state.getSubtaskId())
                         .withIOManager(paimonIOManager)
                         .withIgnorePreviousFiles(ignorePreviousFiles)
                         .withExecutionMode(isStreamingMode)
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteState.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteState.java
index e7b3bcf254..b4ff35708c 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteState.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteState.java
@@ -30,8 +30,6 @@ import java.util.List;
  */
 public interface StoreSinkWriteState {
 
-    StoreSinkWriteState.StateValueFilter stateValueFilter();
-
     @Nullable
     List<StoreSinkWriteState.StateValue> get(String tableName, String key);
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteStateImpl.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteStateImpl.java
index 7f4379c241..e9703e2f95 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteStateImpl.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteStateImpl.java
@@ -89,11 +89,6 @@ public class StoreSinkWriteStateImpl implements 
StoreSinkWriteState {
         }
     }
 
-    @Override
-    public StoreSinkWriteState.StateValueFilter stateValueFilter() {
-        return stateValueFilter;
-    }
-
     @Override
     public @Nullable List<StoreSinkWriteState.StateValue> get(String 
tableName, String key) {
         Map<String, List<StoreSinkWriteState.StateValue>> innerMap = 
map.get(tableName);
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index 0398fcae2d..2d968697d2 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -97,19 +97,6 @@ public class BatchFileStoreITCase extends CatalogITCaseBase {
                         Row.of(1, 11, 111), Row.of(2, 22, 222), Row.of(3, 33, 
333));
     }
 
-    @Test
-    public void testAQEWithWriteManifest() {
-        batchSql("ALTER TABLE T SET ('write-manifest-cache' = '1 mb')");
-        batchSql("INSERT INTO T VALUES (1, 11, 111), (2, 22, 222)");
-        batchSql("INSERT INTO T SELECT a, b, c FROM T GROUP BY a,b,c");
-        assertThat(batchSql("SELECT * FROM T"))
-                .containsExactlyInAnyOrder(
-                        Row.of(1, 11, 111),
-                        Row.of(2, 22, 222),
-                        Row.of(1, 11, 111),
-                        Row.of(2, 22, 222));
-    }
-
     @Test
     public void testAQEWithDynamicBucket() {
         batchSql("CREATE TABLE IF NOT EXISTS D_T (a INT PRIMARY KEY NOT 
ENFORCED, b INT, c INT)");

Reply via email to