This is an automated email from the ASF dual-hosted git repository.

junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 1d3844605 [core] Drop stats in manifest file reading (#4534)
1d3844605 is described below

commit 1d3844605ab71d328d1519e08ebc350eeaaa5f5a
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Nov 19 15:37:54 2024 +0800

    [core] Drop stats in manifest file reading (#4534)
---
 docs/content/maintenance/metrics.md                |  10 -
 .../main/java/org/apache/paimon/utils/Filter.java  |   7 +
 .../org/apache/paimon/utils/ThreadPoolUtils.java   |   6 +-
 .../UnawareAppendTableCompactionCoordinator.java   |   8 +-
 .../java/org/apache/paimon/manifest/FileEntry.java |  25 ++-
 .../paimon/manifest/FilteredManifestEntry.java     |  30 +--
 .../org/apache/paimon/manifest/ManifestFile.java   |  13 --
 .../paimon/operation/AbstractFileStoreScan.java    | 209 +++++++++++++--------
 .../paimon/operation/AppendOnlyFileStoreScan.java  |   7 -
 .../org/apache/paimon/operation/FileStoreScan.java |   2 +
 .../paimon/operation/KeyValueFileStoreScan.java    |  38 +++-
 .../paimon/operation/LocalOrphanFilesClean.java    |   4 +-
 .../paimon/operation/metrics/ScanMetrics.java      |  12 --
 .../apache/paimon/operation/metrics/ScanStats.java |  22 +--
 .../apache/paimon/table/sink/TableCommitImpl.java  |   4 +-
 .../paimon/table/source/DataTableBatchScan.java    |   2 +-
 .../snapshot/IncrementalStartingScanner.java       |   6 +-
 .../table/source/snapshot/SnapshotReader.java      |   2 +
 .../table/source/snapshot/SnapshotReaderImpl.java  |   6 +
 .../apache/paimon/table/system/AuditLogTable.java  |   6 +
 .../paimon/table/system/ReadOptimizedTable.java    |   3 +-
 .../paimon/utils/ManifestReadThreadPool.java       |   4 +-
 .../java/org/apache/paimon/utils/ObjectsCache.java |  17 +-
 .../java/org/apache/paimon/utils/ObjectsFile.java  |  27 ++-
 .../paimon/operation/metrics/ScanMetricsTest.java  |  26 +--
 .../paimon/table/PrimaryKeyFileStoreTableTest.java |  17 --
 .../org/apache/paimon/utils/ObjectsCacheTest.java  |  18 +-
 .../flink/source/ContinuousFileStoreSource.java    |   2 +-
 .../paimon/flink/source/FlinkSourceBuilder.java    |   2 +-
 .../paimon/flink/source/FlinkTableSource.java      |  20 +-
 .../paimon/flink/source/StaticFileStoreSource.java |   2 +-
 .../flink/source/operator/MonitorFunction.java     |   2 +-
 .../paimon/hive/utils/HiveSplitGenerator.java      |   3 +-
 .../paimon/spark/ColumnPruningAndPushDown.scala    |   2 +-
 .../commands/PaimonAnalyzeTableColumnCommand.scala |   7 +-
 .../apache/paimon/spark/sources/StreamHelper.scala |   3 +-
 36 files changed, 299 insertions(+), 275 deletions(-)

diff --git a/docs/content/maintenance/metrics.md 
b/docs/content/maintenance/metrics.md
index 139bdfff6..2c3067267 100644
--- a/docs/content/maintenance/metrics.md
+++ b/docs/content/maintenance/metrics.md
@@ -67,16 +67,6 @@ Below is lists of Paimon built-in metrics. They are 
summarized into types of sca
             <td>Gauge</td>
             <td>Number of scanned manifest files in the last scan.</td>
         </tr>
-        <tr>
-            <td>lastSkippedByPartitionAndStats</td>
-            <td>Gauge</td>
-            <td>Skipped table files by partition filter and value / key stats 
information in the last scan.</td>
-        </tr>
-        <tr>
-            <td>lastSkippedByWholeBucketFilesFilter</td>
-            <td>Gauge</td>
-            <td>Skipped table files by bucket level value filter (only primary 
key table) in the last scan.</td>
-        </tr>
         <tr>
             <td>lastScanSkippedTableFiles</td>
             <td>Gauge</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/Filter.java 
b/paimon-common/src/main/java/org/apache/paimon/utils/Filter.java
index 2764bc773..4d9416252 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/Filter.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/Filter.java
@@ -37,6 +37,13 @@ public interface Filter<T> {
      */
     boolean test(T t);
 
+    default Filter<T> and(Filter<? super T> other) {
+        if (other == null) {
+            return this;
+        }
+        return t -> test(t) && other.test(t);
+    }
+
     @SuppressWarnings({"unchecked", "rawtypes"})
     static <T> Filter<T> alwaysTrue() {
         return (Filter) ALWAYS_TRUE;
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/utils/ThreadPoolUtils.java 
b/paimon-common/src/main/java/org/apache/paimon/utils/ThreadPoolUtils.java
index 02b5d73fc..112b9ad1c 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/ThreadPoolUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/ThreadPoolUtils.java
@@ -110,7 +110,9 @@ public class ThreadPoolUtils {
                                 if (stack.isEmpty()) {
                                     return;
                                 }
-                                activeList = randomlyExecute(executor, 
processor, stack.poll());
+                                activeList =
+                                        randomlyExecuteSequentialReturn(
+                                                executor, processor, 
stack.poll());
                             }
                         }
                     }
@@ -132,7 +134,7 @@ public class ThreadPoolUtils {
         awaitAllFutures(futures);
     }
 
-    public static <U, T> Iterator<T> randomlyExecute(
+    public static <U, T> Iterator<T> randomlyExecuteSequentialReturn(
             ExecutorService executor, Function<U, List<T>> processor, 
Collection<U> input) {
         List<Future<List<T>>> futures = new ArrayList<>(input.size());
         ClassLoader cl = Thread.currentThread().getContextClassLoader();
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/append/UnawareAppendTableCompactionCoordinator.java
 
b/paimon-core/src/main/java/org/apache/paimon/append/UnawareAppendTableCompactionCoordinator.java
index 842b22316..577f28d0f 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/append/UnawareAppendTableCompactionCoordinator.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/append/UnawareAppendTableCompactionCoordinator.java
@@ -27,6 +27,7 @@ import 
org.apache.paimon.deletionvectors.append.UnawareAppendDeletionFileMaintai
 import org.apache.paimon.index.IndexFileHandler;
 import org.apache.paimon.index.IndexFileMeta;
 import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.FileKind;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.table.FileStoreTable;
@@ -441,7 +442,12 @@ public class UnawareAppendTableCompactionCoordinator {
                 }
 
                 if (currentIterator.hasNext()) {
-                    return currentIterator.next();
+                    ManifestEntry entry = currentIterator.next();
+                    if (entry.kind() == FileKind.DELETE) {
+                        continue;
+                    } else {
+                        return entry;
+                    }
                 }
                 currentIterator = null;
             }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java 
b/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
index 3b3e514e0..91e07a369 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
@@ -28,15 +28,17 @@ import javax.annotation.Nullable;
 
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.HashSet;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import static 
org.apache.paimon.utils.ManifestReadThreadPool.randomlyExecuteSequentialReturn;
 import static 
org.apache.paimon.utils.ManifestReadThreadPool.sequentialBatchedExecute;
 
 /** Entry representing a file. */
@@ -214,7 +216,11 @@ public interface FileEntry {
         return readDeletedEntries(
                 m ->
                         manifestFile.read(
-                                m.fileName(), m.fileSize(), 
Filter.alwaysTrue(), deletedFilter()),
+                                m.fileName(),
+                                m.fileSize(),
+                                Filter.alwaysTrue(),
+                                deletedFilter(),
+                                Filter.alwaysTrue()),
                 manifestFiles,
                 manifestReadParallelism);
     }
@@ -234,11 +240,11 @@ public interface FileEntry {
                                 .filter(e -> e.kind() == FileKind.DELETE)
                                 .map(FileEntry::identifier)
                                 .collect(Collectors.toList());
-        Iterable<Identifier> identifiers =
-                sequentialBatchedExecute(processor, manifestFiles, 
manifestReadParallelism);
-        Set<Identifier> result = new HashSet<>();
-        for (Identifier identifier : identifiers) {
-            result.add(identifier);
+        Iterator<Identifier> identifiers =
+                randomlyExecuteSequentialReturn(processor, manifestFiles, 
manifestReadParallelism);
+        Set<Identifier> result = ConcurrentHashMap.newKeySet();
+        while (identifiers.hasNext()) {
+            result.add(identifiers.next());
         }
         return result;
     }
@@ -247,4 +253,9 @@ public interface FileEntry {
         Function<InternalRow, FileKind> getter = 
ManifestEntrySerializer.kindGetter();
         return row -> getter.apply(row) == FileKind.DELETE;
     }
+
+    static Filter<InternalRow> addFilter() {
+        Function<InternalRow, FileKind> getter = 
ManifestEntrySerializer.kindGetter();
+        return row -> getter.apply(row) == FileKind.ADD;
+    }
 }
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/Filter.java 
b/paimon-core/src/main/java/org/apache/paimon/manifest/FilteredManifestEntry.java
similarity index 54%
copy from paimon-common/src/main/java/org/apache/paimon/utils/Filter.java
copy to 
paimon-core/src/main/java/org/apache/paimon/manifest/FilteredManifestEntry.java
index 2764bc773..29ae6f638 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/Filter.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/manifest/FilteredManifestEntry.java
@@ -16,29 +16,19 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.utils;
+package org.apache.paimon.manifest;
 
-import org.apache.paimon.predicate.Predicate;
+/** Wrap a {@link ManifestEntry} to contain {@link #selected}. */
+public class FilteredManifestEntry extends ManifestEntry {
 
-/**
- * Represents a filter (boolean-valued function) of one argument. This class 
is for avoiding name
- * conflicting to {@link Predicate}.
- */
-@FunctionalInterface
-public interface Filter<T> {
-
-    Filter<?> ALWAYS_TRUE = t -> true;
+    private final boolean selected;
 
-    /**
-     * Evaluates this predicate on the given argument.
-     *
-     * @param t the input argument
-     * @return {@code true} if the input argument matches the predicate, 
otherwise {@code false}
-     */
-    boolean test(T t);
+    public FilteredManifestEntry(ManifestEntry entry, boolean selected) {
+        super(entry.kind(), entry.partition(), entry.bucket(), 
entry.totalBuckets(), entry.file());
+        this.selected = selected;
+    }
 
-    @SuppressWarnings({"unchecked", "rawtypes"})
-    static <T> Filter<T> alwaysTrue() {
-        return (Filter) ALWAYS_TRUE;
+    public boolean selected() {
+        return selected;
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java 
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
index 38181a823..128f5262a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
@@ -211,18 +211,5 @@ public class ManifestFile extends 
ObjectsFile<ManifestEntry> {
                     suggestedFileSize,
                     cache);
         }
-
-        public ObjectsFile<SimpleFileEntry> createSimpleFileEntryReader() {
-            RowType entryType = 
VersionedObjectSerializer.versionType(ManifestEntry.SCHEMA);
-            return new ObjectsFile<>(
-                    fileIO,
-                    new SimpleFileEntrySerializer(),
-                    entryType,
-                    fileFormat.createReaderFactory(entryType),
-                    fileFormat.createWriterFactory(entryType),
-                    compression,
-                    pathFactory.manifestFileFactory(),
-                    cache);
-        }
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index 0e1f9357e..98e064451 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -23,7 +23,7 @@ import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.manifest.BucketEntry;
 import org.apache.paimon.manifest.FileEntry;
-import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.FileEntry.Identifier;
 import org.apache.paimon.manifest.ManifestCacheFilter;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.ManifestEntrySerializer;
@@ -43,8 +43,6 @@ import org.apache.paimon.utils.Filter;
 import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.SnapshotManager;
 
-import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
-
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
@@ -62,6 +60,7 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static 
org.apache.paimon.utils.ManifestReadThreadPool.getExecutorService;
+import static 
org.apache.paimon.utils.ManifestReadThreadPool.randomlyExecuteSequentialReturn;
 import static 
org.apache.paimon.utils.ManifestReadThreadPool.sequentialBatchedExecute;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 import static org.apache.paimon.utils.Preconditions.checkState;
@@ -193,6 +192,11 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
         return this;
     }
 
+    @Override
+    public FileStoreScan enableValueFilter() {
+        return this;
+    }
+
     @Override
     public FileStoreScan withManifestEntryFilter(Filter<ManifestEntry> filter) 
{
         this.manifestEntryFilter = filter;
@@ -241,47 +245,46 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
         Snapshot snapshot = manifestsResult.snapshot;
         List<ManifestFileMeta> manifests = manifestsResult.filteredManifests;
 
-        long startDataFiles =
-                manifestsResult.allManifests.stream()
-                        .mapToLong(f -> f.numAddedFiles() - 
f.numDeletedFiles())
-                        .sum();
-
-        Collection<ManifestEntry> mergedEntries =
-                readAndMergeFileEntries(manifests, this::readManifest);
-
-        long skippedByPartitionAndStats = startDataFiles - 
mergedEntries.size();
-
-        // We group files by bucket here, and filter them by the whole bucket 
filter.
-        // Why do this: because in primary key table, we can't just filter the 
value
-        // by the stat in files (see 
`PrimaryKeyFileStoreTable.nonPartitionFilterConsumer`),
-        // but we can do this by filter the whole bucket files
-        List<ManifestEntry> files =
-                mergedEntries.stream()
-                        .collect(
-                                Collectors.groupingBy(
-                                        // we use LinkedHashMap to avoid 
disorder
-                                        file -> Pair.of(file.partition(), 
file.bucket()),
-                                        LinkedHashMap::new,
-                                        Collectors.toList()))
-                        .values()
-                        .stream()
-                        .map(this::filterWholeBucketByStats)
-                        .flatMap(Collection::stream)
-                        .collect(Collectors.toList());
+        Iterator<ManifestEntry> iterator = readManifestEntries(manifests, 
false);
+        List<ManifestEntry> files = new ArrayList<>();
+        while (iterator.hasNext()) {
+            files.add(iterator.next());
+        }
+
+        if (wholeBucketFilterEnabled()) {
+            // We group files by bucket here, and filter them by the whole 
bucket filter.
+            // Why do this: because in primary key table, we can't just filter 
the value
+            // by the stat in files (see 
`PrimaryKeyFileStoreTable.nonPartitionFilterConsumer`),
+            // but we can do this by filter the whole bucket files
+            files =
+                    files.stream()
+                            .collect(
+                                    Collectors.groupingBy(
+                                            // we use LinkedHashMap to avoid 
disorder
+                                            file -> Pair.of(file.partition(), 
file.bucket()),
+                                            LinkedHashMap::new,
+                                            Collectors.toList()))
+                            .values()
+                            .stream()
+                            .map(this::filterWholeBucketByStats)
+                            .flatMap(Collection::stream)
+                            .collect(Collectors.toList());
+        }
+
+        List<ManifestEntry> result = files;
 
-        long skippedByWholeBucketFiles = mergedEntries.size() - files.size();
         long scanDuration = (System.nanoTime() - started) / 1_000_000;
-        checkState(
-                startDataFiles - skippedByPartitionAndStats - 
skippedByWholeBucketFiles
-                        == files.size());
         if (scanMetrics != null) {
+            long allDataFiles =
+                    manifestsResult.allManifests.stream()
+                            .mapToLong(f -> f.numAddedFiles() - 
f.numDeletedFiles())
+                            .sum();
             scanMetrics.reportScan(
                     new ScanStats(
                             scanDuration,
                             manifests.size(),
-                            skippedByPartitionAndStats,
-                            skippedByWholeBucketFiles,
-                            files.size()));
+                            allDataFiles - result.size(),
+                            result.size()));
         }
 
         return new Plan() {
@@ -299,12 +302,7 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
 
             @Override
             public List<ManifestEntry> files() {
-                if (dropStats) {
-                    return files.stream()
-                            .map(ManifestEntry::copyWithoutStats)
-                            .collect(Collectors.toList());
-                }
-                return files;
+                return result;
             }
         };
     }
@@ -312,9 +310,15 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
     @Override
     public List<SimpleFileEntry> readSimpleEntries() {
         List<ManifestFileMeta> manifests = readManifests().filteredManifests;
-        Collection<SimpleFileEntry> mergedEntries =
-                readAndMergeFileEntries(manifests, this::readSimpleEntries);
-        return new ArrayList<>(mergedEntries);
+        Iterator<SimpleFileEntry> iterator =
+                scanMode == ScanMode.ALL
+                        ? readAndMergeFileEntries(manifests, 
SimpleFileEntry::from, false)
+                        : readAndNoMergeFileEntries(manifests, 
SimpleFileEntry::from, false);
+        List<SimpleFileEntry> result = new ArrayList<>();
+        while (iterator.hasNext()) {
+            result.add(iterator.next());
+        }
+        return result;
     }
 
     @Override
@@ -343,23 +347,57 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
 
     @Override
     public Iterator<ManifestEntry> readFileIterator() {
-        List<ManifestFileMeta> manifests = readManifests().filteredManifests;
-        Set<FileEntry.Identifier> deleteEntries =
-                FileEntry.readDeletedEntries(this::readSimpleEntries, 
manifests, parallelism);
-        Iterator<ManifestEntry> iterator =
-                sequentialBatchedExecute(this::readManifest, manifests, 
parallelism).iterator();
-        return Iterators.filter(
-                iterator,
-                entry ->
-                        entry != null
-                                && entry.kind() == FileKind.ADD
-                                && 
!deleteEntries.contains(entry.identifier()));
+        // useSequential: reduce memory and iterator can be stopping
+        return readManifestEntries(readManifests().filteredManifests, true);
+    }
+
+    private Iterator<ManifestEntry> readManifestEntries(
+            List<ManifestFileMeta> manifests, boolean useSequential) {
+        return scanMode == ScanMode.ALL
+                ? readAndMergeFileEntries(manifests, Function.identity(), 
useSequential)
+                : readAndNoMergeFileEntries(manifests, Function.identity(), 
useSequential);
+    }
+
+    private <T extends FileEntry> Iterator<T> readAndMergeFileEntries(
+            List<ManifestFileMeta> manifests,
+            Function<List<ManifestEntry>, List<T>> converter,
+            boolean useSequential) {
+        Set<Identifier> deletedEntries =
+                FileEntry.readDeletedEntries(
+                        manifest -> readManifest(manifest, 
FileEntry.deletedFilter(), null),
+                        manifests,
+                        parallelism);
+
+        manifests =
+                manifests.stream()
+                        .filter(file -> file.numAddedFiles() > 0)
+                        .collect(Collectors.toList());
+
+        Function<ManifestFileMeta, List<T>> processor =
+                manifest ->
+                        converter.apply(
+                                readManifest(
+                                        manifest,
+                                        FileEntry.addFilter(),
+                                        entry -> 
!deletedEntries.contains(entry.identifier())));
+        if (useSequential) {
+            return sequentialBatchedExecute(processor, manifests, 
parallelism).iterator();
+        } else {
+            return randomlyExecuteSequentialReturn(processor, manifests, 
parallelism);
+        }
     }
 
-    public <T extends FileEntry> Collection<T> readAndMergeFileEntries(
-            List<ManifestFileMeta> manifests, Function<ManifestFileMeta, 
List<T>> manifestReader) {
-        return FileEntry.mergeEntries(
-                sequentialBatchedExecute(manifestReader, manifests, 
parallelism));
+    private <T extends FileEntry> Iterator<T> readAndNoMergeFileEntries(
+            List<ManifestFileMeta> manifests,
+            Function<List<ManifestEntry>, List<T>> converter,
+            boolean useSequential) {
+        Function<ManifestFileMeta, List<T>> reader =
+                manifest -> converter.apply(readManifest(manifest));
+        if (useSequential) {
+            return sequentialBatchedExecute(reader, manifests, 
parallelism).iterator();
+        } else {
+            return randomlyExecuteSequentialReturn(reader, manifests, 
parallelism);
+        }
     }
 
     private ManifestsReader.Result readManifests() {
@@ -384,12 +422,24 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
     /** Note: Keep this thread-safe. */
     protected abstract boolean filterByStats(ManifestEntry entry);
 
-    /** Note: Keep this thread-safe. */
-    protected abstract List<ManifestEntry> 
filterWholeBucketByStats(List<ManifestEntry> entries);
+    protected boolean wholeBucketFilterEnabled() {
+        return false;
+    }
+
+    protected List<ManifestEntry> filterWholeBucketByStats(List<ManifestEntry> 
entries) {
+        return entries;
+    }
 
     /** Note: Keep this thread-safe. */
     @Override
     public List<ManifestEntry> readManifest(ManifestFileMeta manifest) {
+        return readManifest(manifest, null, null);
+    }
+
+    private List<ManifestEntry> readManifest(
+            ManifestFileMeta manifest,
+            @Nullable Filter<InternalRow> additionalFilter,
+            @Nullable Filter<ManifestEntry> additionalTFilter) {
         List<ManifestEntry> entries =
                 manifestFileFactory
                         .create()
@@ -397,29 +447,24 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
                                 manifest.fileName(),
                                 manifest.fileSize(),
                                 createCacheRowFilter(),
-                                createEntryRowFilter());
-        List<ManifestEntry> filteredEntries = new ArrayList<>(entries.size());
-        for (ManifestEntry entry : entries) {
-            if ((manifestEntryFilter == null || 
manifestEntryFilter.test(entry))
-                    && filterByStats(entry)) {
-                filteredEntries.add(entry);
+                                createEntryRowFilter().and(additionalFilter),
+                                entry ->
+                                        (additionalTFilter == null || 
additionalTFilter.test(entry))
+                                                && (manifestEntryFilter == null
+                                                        || 
manifestEntryFilter.test(entry))
+                                                && filterByStats(entry));
+        if (dropStats) {
+            List<ManifestEntry> copied = new ArrayList<>(entries.size());
+            for (ManifestEntry entry : entries) {
+                copied.add(dropStats(entry));
             }
+            entries = copied;
         }
-        return filteredEntries;
+        return entries;
     }
 
-    /** Note: Keep this thread-safe. */
-    private List<SimpleFileEntry> readSimpleEntries(ManifestFileMeta manifest) 
{
-        return manifestFileFactory
-                .createSimpleFileEntryReader()
-                .read(
-                        manifest.fileName(),
-                        manifest.fileSize(),
-                        // use filter for ManifestEntry
-                        // currently, projection is not pushed down to file 
format
-                        // see SimpleFileEntrySerializer
-                        createCacheRowFilter(),
-                        createEntryRowFilter());
+    protected ManifestEntry dropStats(ManifestEntry entry) {
+        return entry.copyWithoutStats();
     }
 
     /**
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
index 60b4e7933..d2ca5da42 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
@@ -34,7 +34,6 @@ import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
 /** {@link FileStoreScan} for {@link AppendOnlyFileStore}. */
@@ -100,12 +99,6 @@ public class AppendOnlyFileStoreScan extends 
AbstractFileStoreScan {
                 && (!fileIndexReadEnabled || 
testFileIndex(entry.file().embeddedIndex(), entry));
     }
 
-    @Override
-    protected List<ManifestEntry> filterWholeBucketByStats(List<ManifestEntry> 
entries) {
-        // We don't need to filter per-bucket entries here
-        return entries;
-    }
-
     private boolean testFileIndex(@Nullable byte[] embeddedIndexBytes, 
ManifestEntry entry) {
         if (embeddedIndexBytes == null) {
             return true;
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
index e643bf161..7663f4822 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
@@ -73,6 +73,8 @@ public interface FileStoreScan {
 
     FileStoreScan withLevelFilter(Filter<Integer> levelFilter);
 
+    FileStoreScan enableValueFilter();
+
     FileStoreScan withManifestEntryFilter(Filter<ManifestEntry> filter);
 
     FileStoreScan withManifestCacheFilter(ManifestCacheFilter manifestFilter);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
index c368d9e51..8d8c51996 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
@@ -23,6 +23,7 @@ import org.apache.paimon.CoreOptions.MergeEngine;
 import org.apache.paimon.KeyValueFileStore;
 import org.apache.paimon.fileindex.FileIndexPredicate;
 import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.FilteredManifestEntry;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.ManifestFile;
 import org.apache.paimon.predicate.Predicate;
@@ -45,7 +46,6 @@ import java.util.List;
 import java.util.Map;
 
 import static org.apache.paimon.CoreOptions.MergeEngine.AGGREGATE;
-import static org.apache.paimon.CoreOptions.MergeEngine.FIRST_ROW;
 import static org.apache.paimon.CoreOptions.MergeEngine.PARTIAL_UPDATE;
 
 /** {@link FileStoreScan} for {@link KeyValueFileStore}. */
@@ -64,6 +64,8 @@ public class KeyValueFileStoreScan extends 
AbstractFileStoreScan {
     private final boolean fileIndexReadEnabled;
     private final Map<Long, Predicate> schemaId2DataFilter = new HashMap<>();
 
+    private boolean valueFilterForceEnabled = false;
+
     public KeyValueFileStoreScan(
             ManifestsReader manifestsReader,
             BucketSelectConverter bucketSelectConverter,
@@ -110,11 +112,17 @@ public class KeyValueFileStoreScan extends 
AbstractFileStoreScan {
         return this;
     }
 
+    @Override
+    public FileStoreScan enableValueFilter() {
+        this.valueFilterForceEnabled = true;
+        return this;
+    }
+
     /** Note: Keep this thread-safe. */
     @Override
     protected boolean filterByStats(ManifestEntry entry) {
         DataFileMeta file = entry.file();
-        if (isValueFilterEnabled(entry) && !filterByValueFilter(entry)) {
+        if (isValueFilterEnabled() && !filterByValueFilter(entry)) {
             return false;
         }
 
@@ -130,6 +138,14 @@ public class KeyValueFileStoreScan extends 
AbstractFileStoreScan {
         return true;
     }
 
+    @Override
+    protected ManifestEntry dropStats(ManifestEntry entry) {
+        if (!isValueFilterEnabled() && wholeBucketFilterEnabled()) {
+            return new FilteredManifestEntry(entry.copyWithoutStats(), 
filterByValueFilter(entry));
+        }
+        return entry.copyWithoutStats();
+    }
+
     private boolean filterByFileIndex(@Nullable byte[] embeddedIndexBytes, 
ManifestEntry entry) {
         if (embeddedIndexBytes == null) {
             return true;
@@ -150,14 +166,14 @@ public class KeyValueFileStoreScan extends 
AbstractFileStoreScan {
         }
     }
 
-    private boolean isValueFilterEnabled(ManifestEntry entry) {
+    private boolean isValueFilterEnabled() {
         if (valueFilter == null) {
             return false;
         }
 
         switch (scanMode) {
             case ALL:
-                return (deletionVectorsEnabled || mergeEngine == FIRST_ROW) && 
entry.level() > 0;
+                return valueFilterForceEnabled;
             case DELTA:
                 return false;
             case CHANGELOG:
@@ -168,13 +184,13 @@ public class KeyValueFileStoreScan extends 
AbstractFileStoreScan {
         }
     }
 
-    /** Note: Keep this thread-safe. */
     @Override
-    protected List<ManifestEntry> filterWholeBucketByStats(List<ManifestEntry> 
entries) {
-        if (valueFilter == null || scanMode != ScanMode.ALL) {
-            return entries;
-        }
+    protected boolean wholeBucketFilterEnabled() {
+        return valueFilter != null && scanMode == ScanMode.ALL;
+    }
 
+    @Override
+    protected List<ManifestEntry> filterWholeBucketByStats(List<ManifestEntry> 
entries) {
         return noOverlapping(entries)
                 ? filterWholeBucketPerFile(entries)
                 : filterWholeBucketAllFiles(entries);
@@ -207,6 +223,10 @@ public class KeyValueFileStoreScan extends 
AbstractFileStoreScan {
     }
 
     private boolean filterByValueFilter(ManifestEntry entry) {
+        if (entry instanceof FilteredManifestEntry) {
+            return ((FilteredManifestEntry) entry).selected();
+        }
+
         DataFileMeta file = entry.file();
         SimpleStatsEvolution.Result result =
                 fieldValueStatsConverters
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java
index 3ee108c10..a5eea6d65 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java
@@ -53,7 +53,7 @@ import java.util.stream.Collectors;
 
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 import static org.apache.paimon.utils.ThreadPoolUtils.createCachedThreadPool;
-import static org.apache.paimon.utils.ThreadPoolUtils.randomlyExecute;
+import static 
org.apache.paimon.utils.ThreadPoolUtils.randomlyExecuteSequentialReturn;
 import static org.apache.paimon.utils.ThreadPoolUtils.randomlyOnlyExecute;
 
 /**
@@ -180,7 +180,7 @@ public class LocalOrphanFilesClean extends OrphanFilesClean 
{
                                 .filter(this::oldEnough)
                                 .map(FileStatus::getPath)
                                 .collect(Collectors.toList());
-        Iterator<Path> allPaths = randomlyExecute(executor, processor, 
fileDirs);
+        Iterator<Path> allPaths = randomlyExecuteSequentialReturn(executor, 
processor, fileDirs);
         Map<String, Path> result = new HashMap<>();
         while (allPaths.hasNext()) {
             Path next = allPaths.next();
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/ScanMetrics.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/ScanMetrics.java
index 9fcbb8960..96f0aec1c 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/ScanMetrics.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/ScanMetrics.java
@@ -49,12 +49,6 @@ public class ScanMetrics {
     public static final String SCAN_DURATION = "scanDuration";
     public static final String LAST_SCANNED_MANIFESTS = "lastScannedManifests";
 
-    public static final String LAST_SKIPPED_BY_PARTITION_AND_STATS =
-            "lastSkippedByPartitionAndStats";
-
-    public static final String LAST_SKIPPED_BY_WHOLE_BUCKET_FILES_FILTER =
-            "lastSkippedByWholeBucketFilesFilter";
-
     public static final String LAST_SCAN_SKIPPED_TABLE_FILES = 
"lastScanSkippedTableFiles";
 
     public static final String LAST_SCAN_RESULTED_TABLE_FILES = 
"lastScanResultedTableFiles";
@@ -66,12 +60,6 @@ public class ScanMetrics {
         metricGroup.gauge(
                 LAST_SCANNED_MANIFESTS,
                 () -> latestScan == null ? 0L : 
latestScan.getScannedManifests());
-        metricGroup.gauge(
-                LAST_SKIPPED_BY_PARTITION_AND_STATS,
-                () -> latestScan == null ? 0L : 
latestScan.getSkippedByPartitionAndStats());
-        metricGroup.gauge(
-                LAST_SKIPPED_BY_WHOLE_BUCKET_FILES_FILTER,
-                () -> latestScan == null ? 0L : 
latestScan.getSkippedByWholeBucketFiles());
         metricGroup.gauge(
                 LAST_SCAN_SKIPPED_TABLE_FILES,
                 () -> latestScan == null ? 0L : 
latestScan.getSkippedTableFiles());
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/ScanStats.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/ScanStats.java
index e760282e6..700619c36 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/ScanStats.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/ScanStats.java
@@ -25,23 +25,15 @@ public class ScanStats {
     // the unit is milliseconds
     private final long duration;
     private final long scannedManifests;
-    private final long skippedByPartitionAndStats;
 
-    private final long skippedByWholeBucketFiles;
     private final long skippedTableFiles;
     private final long resultedTableFiles;
 
     public ScanStats(
-            long duration,
-            long scannedManifests,
-            long skippedByPartitionAndStats,
-            long skippedByWholeBucketFiles,
-            long resultedTableFiles) {
+            long duration, long scannedManifests, long skippedTableFiles, long 
resultedTableFiles) {
         this.duration = duration;
         this.scannedManifests = scannedManifests;
-        this.skippedByPartitionAndStats = skippedByPartitionAndStats;
-        this.skippedByWholeBucketFiles = skippedByWholeBucketFiles;
-        this.skippedTableFiles = skippedByPartitionAndStats + 
skippedByWholeBucketFiles;
+        this.skippedTableFiles = skippedTableFiles;
         this.resultedTableFiles = resultedTableFiles;
     }
 
@@ -60,16 +52,6 @@ public class ScanStats {
         return resultedTableFiles;
     }
 
-    @VisibleForTesting
-    protected long getSkippedByPartitionAndStats() {
-        return skippedByPartitionAndStats;
-    }
-
-    @VisibleForTesting
-    protected long getSkippedByWholeBucketFiles() {
-        return skippedByWholeBucketFiles;
-    }
-
     @VisibleForTesting
     protected long getDuration() {
         return duration;
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
index b4f8fa47d..73c55942a 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
@@ -68,7 +68,7 @@ import static 
org.apache.paimon.CoreOptions.ExpireExecutionMode;
 import static org.apache.paimon.table.sink.BatchWriteBuilder.COMMIT_IDENTIFIER;
 import static 
org.apache.paimon.utils.ManifestReadThreadPool.getExecutorService;
 import static org.apache.paimon.utils.Preconditions.checkState;
-import static org.apache.paimon.utils.ThreadPoolUtils.randomlyExecute;
+import static 
org.apache.paimon.utils.ThreadPoolUtils.randomlyExecuteSequentialReturn;
 
 /** An abstraction layer above {@link FileStoreCommit} to provide snapshot 
commit and expiration. */
 public class TableCommitImpl implements InnerTableCommit {
@@ -292,7 +292,7 @@ public class TableCommitImpl implements InnerTableCommit {
 
         List<Path> nonExistFiles =
                 Lists.newArrayList(
-                        randomlyExecute(
+                        randomlyExecuteSequentialReturn(
                                 getExecutorService(null),
                                 f -> nonExists.test(f) ? singletonList(f) : 
emptyList(),
                                 files));
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
index d3e8a2adb..635802cc9 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
@@ -51,7 +51,7 @@ public class DataTableBatchScan extends AbstractDataTableScan 
{
         this.hasNext = true;
         this.defaultValueAssigner = defaultValueAssigner;
         if (pkTable && (options.deletionVectorsEnabled() || 
options.mergeEngine() == FIRST_ROW)) {
-            snapshotReader.withLevelFilter(level -> level > 0);
+            snapshotReader.withLevelFilter(level -> level > 
0).enableValueFilter();
         }
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
index 358d86cbe..9bfb54f2c 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
@@ -31,7 +31,6 @@ import org.apache.paimon.table.source.PlanImpl;
 import org.apache.paimon.table.source.ScanMode;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.SplitGenerator;
-import org.apache.paimon.utils.ManifestReadThreadPool;
 import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.SnapshotManager;
 
@@ -50,6 +49,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 import java.util.stream.LongStream;
 
+import static 
org.apache.paimon.utils.ManifestReadThreadPool.randomlyExecuteSequentialReturn;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** {@link StartingScanner} for incremental changes by snapshot. */
@@ -84,7 +84,7 @@ public class IncrementalStartingScanner extends 
AbstractStartingScanner {
                         .collect(Collectors.toList());
 
         Iterator<ManifestFileMeta> manifests =
-                ManifestReadThreadPool.randomlyExecute(
+                randomlyExecuteSequentialReturn(
                         id -> {
                             Snapshot snapshot = snapshotManager.snapshot(id);
                             switch (scanMode) {
@@ -111,7 +111,7 @@ public class IncrementalStartingScanner extends 
AbstractStartingScanner {
                         reader.parallelism());
 
         Iterator<ManifestEntry> entries =
-                ManifestReadThreadPool.randomlyExecute(
+                randomlyExecuteSequentialReturn(
                         reader::readManifest, Lists.newArrayList(manifests), 
reader.parallelism());
 
         while (entries.hasNext()) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
index b59cf98bb..f3e0a92b8 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
@@ -77,6 +77,8 @@ public interface SnapshotReader {
 
     SnapshotReader withLevelFilter(Filter<Integer> levelFilter);
 
+    SnapshotReader enableValueFilter();
+
     SnapshotReader withManifestEntryFilter(Filter<ManifestEntry> filter);
 
     SnapshotReader withBucket(int bucket);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index 7ce537ee5..ce01bdba9 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -234,6 +234,12 @@ public class SnapshotReaderImpl implements SnapshotReader {
         return this;
     }
 
+    @Override
+    public SnapshotReader enableValueFilter() {
+        scan.enableValueFilter();
+        return this;
+    }
+
     @Override
     public SnapshotReader withManifestEntryFilter(Filter<ManifestEntry> 
filter) {
         scan.withManifestEntryFilter(filter);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index e56ee9041..b0cbe0772 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -319,6 +319,12 @@ public class AuditLogTable implements DataTable, 
ReadonlyTable {
             return this;
         }
 
+        @Override
+        public SnapshotReader enableValueFilter() {
+            wrapped.enableValueFilter();
+            return this;
+        }
+
         @Override
         public SnapshotReader withManifestEntryFilter(Filter<ManifestEntry> 
filter) {
             wrapped.withManifestEntryFilter(filter);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
index e28ae3760..deb149791 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
@@ -120,7 +120,8 @@ public class ReadOptimizedTable implements DataTable, 
ReadonlyTable {
     public SnapshotReader newSnapshotReader() {
         if (wrapped.schema().primaryKeys().size() > 0) {
             return wrapped.newSnapshotReader()
-                    .withLevelFilter(level -> level == 
coreOptions().numLevels() - 1);
+                    .withLevelFilter(level -> level == 
coreOptions().numLevels() - 1)
+                    .enableValueFilter();
         } else {
             return wrapped.newSnapshotReader();
         }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/ManifestReadThreadPool.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/ManifestReadThreadPool.java
index d967e778f..49fcfc8bd 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/utils/ManifestReadThreadPool.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/utils/ManifestReadThreadPool.java
@@ -54,9 +54,9 @@ public class ManifestReadThreadPool {
     }
 
     /** This method aims to parallel process tasks with randomly but return 
values sequentially. */
-    public static <T, U> Iterator<T> randomlyExecute(
+    public static <T, U> Iterator<T> randomlyExecuteSequentialReturn(
             Function<U, List<T>> processor, List<U> input, @Nullable Integer 
threadNum) {
         ThreadPoolExecutor executor = getExecutorService(threadNum);
-        return ThreadPoolUtils.randomlyExecute(executor, processor, input);
+        return ThreadPoolUtils.randomlyExecuteSequentialReturn(executor, 
processor, input);
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java
index 8c490e008..1c9d9664f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java
@@ -66,11 +66,12 @@ public class ObjectsCache<K, V> {
             K key,
             @Nullable Long fileSize,
             Filter<InternalRow> loadFilter,
-            Filter<InternalRow> readFilter)
+            Filter<InternalRow> readFilter,
+            Filter<V> readVFilter)
             throws IOException {
         Segments segments = cache.getIfPresents(key);
         if (segments != null) {
-            return readFromSegments(segments, readFilter);
+            return readFromSegments(segments, readFilter, readVFilter);
         } else {
             if (fileSize == null) {
                 fileSize = fileSizeFunction.apply(key);
@@ -78,15 +79,16 @@ public class ObjectsCache<K, V> {
             if (fileSize <= cache.maxElementSize()) {
                 segments = readSegments(key, fileSize, loadFilter);
                 cache.put(key, segments);
-                return readFromSegments(segments, readFilter);
+                return readFromSegments(segments, readFilter, readVFilter);
             } else {
                 return readFromIterator(
-                        reader.apply(key, fileSize), projectedSerializer, 
readFilter);
+                        reader.apply(key, fileSize), projectedSerializer, 
readFilter, readVFilter);
             }
         }
     }
 
-    private List<V> readFromSegments(Segments segments, Filter<InternalRow> 
readFilter)
+    private List<V> readFromSegments(
+            Segments segments, Filter<InternalRow> readFilter, Filter<V> 
readVFilter)
             throws IOException {
         InternalRowSerializer formatSerializer = this.formatSerializer.get();
         List<V> entries = new ArrayList<>();
@@ -98,7 +100,10 @@ public class ObjectsCache<K, V> {
             try {
                 formatSerializer.mapFromPages(binaryRow, view);
                 if (readFilter.test(binaryRow)) {
-                    entries.add(projectedSerializer.fromRow(binaryRow));
+                    V v = projectedSerializer.fromRow(binaryRow);
+                    if (readVFilter.test(v)) {
+                        entries.add(v);
+                    }
                 }
             } catch (EOFException e) {
                 return entries;
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java
index 3c261f410..b0bea8e66 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java
@@ -94,7 +94,8 @@ public class ObjectsFile<T> implements SimpleFileReader<T> {
     }
 
     public List<T> read(String fileName, @Nullable Long fileSize) {
-        return read(fileName, fileSize, Filter.alwaysTrue(), 
Filter.alwaysTrue());
+        return read(
+                fileName, fileSize, Filter.alwaysTrue(), Filter.alwaysTrue(), 
Filter.alwaysTrue());
     }
 
     public List<T> readWithIOException(String fileName) throws IOException {
@@ -103,7 +104,8 @@ public class ObjectsFile<T> implements SimpleFileReader<T> {
 
     public List<T> readWithIOException(String fileName, @Nullable Long 
fileSize)
             throws IOException {
-        return readWithIOException(fileName, fileSize, Filter.alwaysTrue(), 
Filter.alwaysTrue());
+        return readWithIOException(
+                fileName, fileSize, Filter.alwaysTrue(), Filter.alwaysTrue(), 
Filter.alwaysTrue());
     }
 
     public boolean exists(String fileName) {
@@ -118,9 +120,10 @@ public class ObjectsFile<T> implements SimpleFileReader<T> 
{
             String fileName,
             @Nullable Long fileSize,
             Filter<InternalRow> loadFilter,
-            Filter<InternalRow> readFilter) {
+            Filter<InternalRow> readFilter,
+            Filter<T> readTFilter) {
         try {
-            return readWithIOException(fileName, fileSize, loadFilter, 
readFilter);
+            return readWithIOException(fileName, fileSize, loadFilter, 
readFilter, readTFilter);
         } catch (IOException e) {
             throw new RuntimeException("Failed to read " + fileName, e);
         }
@@ -130,14 +133,16 @@ public class ObjectsFile<T> implements 
SimpleFileReader<T> {
             String fileName,
             @Nullable Long fileSize,
             Filter<InternalRow> loadFilter,
-            Filter<InternalRow> readFilter)
+            Filter<InternalRow> readFilter,
+            Filter<T> readTFilter)
             throws IOException {
         Path path = pathFactory.toPath(fileName);
         if (cache != null) {
-            return cache.read(path, fileSize, loadFilter, readFilter);
+            return cache.read(path, fileSize, loadFilter, readFilter, 
readTFilter);
         }
 
-        return readFromIterator(createIterator(path, fileSize), serializer, 
readFilter);
+        return readFromIterator(
+                createIterator(path, fileSize), serializer, readFilter, 
readTFilter);
     }
 
     public String writeWithoutRolling(Collection<T> records) {
@@ -184,13 +189,17 @@ public class ObjectsFile<T> implements 
SimpleFileReader<T> {
     public static <V> List<V> readFromIterator(
             CloseableIterator<InternalRow> inputIterator,
             ObjectSerializer<V> serializer,
-            Filter<InternalRow> readFilter) {
+            Filter<InternalRow> readFilter,
+            Filter<V> readVFilter) {
         try (CloseableIterator<InternalRow> iterator = inputIterator) {
             List<V> result = new ArrayList<>();
             while (iterator.hasNext()) {
                 InternalRow row = iterator.next();
                 if (readFilter.test(row)) {
-                    result.add(serializer.fromRow(row));
+                    V v = serializer.fromRow(row);
+                    if (readVFilter.test(v)) {
+                        result.add(v);
+                    }
                 }
             }
             return result;
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/metrics/ScanMetricsTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/operation/metrics/ScanMetricsTest.java
index 2b9d0e0cb..a0427d95c 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/operation/metrics/ScanMetricsTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/metrics/ScanMetricsTest.java
@@ -48,9 +48,7 @@ public class ScanMetricsTest {
                         ScanMetrics.SCAN_DURATION,
                         ScanMetrics.LAST_SCANNED_MANIFESTS,
                         ScanMetrics.LAST_SCAN_SKIPPED_TABLE_FILES,
-                        ScanMetrics.LAST_SCAN_RESULTED_TABLE_FILES,
-                        ScanMetrics.LAST_SKIPPED_BY_PARTITION_AND_STATS,
-                        ScanMetrics.LAST_SKIPPED_BY_WHOLE_BUCKET_FILES_FILTER);
+                        ScanMetrics.LAST_SCAN_RESULTED_TABLE_FILES);
     }
 
     /** Tests that the metrics are updated properly. */
@@ -66,14 +64,6 @@ public class ScanMetricsTest {
                 (Histogram) 
registeredGenericMetrics.get(ScanMetrics.SCAN_DURATION);
         Gauge<Long> lastScannedManifests =
                 (Gauge<Long>) 
registeredGenericMetrics.get(ScanMetrics.LAST_SCANNED_MANIFESTS);
-        Gauge<Long> lastSkippedByPartitionAndStats =
-                (Gauge<Long>)
-                        registeredGenericMetrics.get(
-                                
ScanMetrics.LAST_SKIPPED_BY_PARTITION_AND_STATS);
-        Gauge<Long> lastSkippedByWholeBucketFilesFilter =
-                (Gauge<Long>)
-                        registeredGenericMetrics.get(
-                                
ScanMetrics.LAST_SKIPPED_BY_WHOLE_BUCKET_FILES_FILTER);
         Gauge<Long> lastScanSkippedTableFiles =
                 (Gauge<Long>)
                         
registeredGenericMetrics.get(ScanMetrics.LAST_SCAN_SKIPPED_TABLE_FILES);
@@ -85,8 +75,6 @@ public class ScanMetricsTest {
         assertThat(scanDuration.getCount()).isEqualTo(0);
         assertThat(scanDuration.getStatistics().size()).isEqualTo(0);
         assertThat(lastScannedManifests.getValue()).isEqualTo(0);
-        assertThat(lastSkippedByPartitionAndStats.getValue()).isEqualTo(0);
-        
assertThat(lastSkippedByWholeBucketFilesFilter.getValue()).isEqualTo(0);
         assertThat(lastScanSkippedTableFiles.getValue()).isEqualTo(0);
         assertThat(lastScanResultedTableFiles.getValue()).isEqualTo(0);
 
@@ -104,9 +92,7 @@ public class ScanMetricsTest {
         assertThat(scanDuration.getStatistics().getMax()).isEqualTo(200);
         assertThat(scanDuration.getStatistics().getStdDev()).isEqualTo(0);
         assertThat(lastScannedManifests.getValue()).isEqualTo(20);
-        assertThat(lastSkippedByPartitionAndStats.getValue()).isEqualTo(25);
-        
assertThat(lastSkippedByWholeBucketFilesFilter.getValue()).isEqualTo(32);
-        assertThat(lastScanSkippedTableFiles.getValue()).isEqualTo(57);
+        assertThat(lastScanSkippedTableFiles.getValue()).isEqualTo(25);
         assertThat(lastScanResultedTableFiles.getValue()).isEqualTo(10);
 
         // report again
@@ -123,19 +109,17 @@ public class ScanMetricsTest {
         assertThat(scanDuration.getStatistics().getMax()).isEqualTo(500);
         
assertThat(scanDuration.getStatistics().getStdDev()).isCloseTo(212.132, 
offset(0.001));
         assertThat(lastScannedManifests.getValue()).isEqualTo(22);
-        assertThat(lastSkippedByPartitionAndStats.getValue()).isEqualTo(30);
-        
assertThat(lastSkippedByWholeBucketFilesFilter.getValue()).isEqualTo(33);
-        assertThat(lastScanSkippedTableFiles.getValue()).isEqualTo(63);
+        assertThat(lastScanSkippedTableFiles.getValue()).isEqualTo(30);
         assertThat(lastScanResultedTableFiles.getValue()).isEqualTo(8);
     }
 
     private void reportOnce(ScanMetrics scanMetrics) {
-        ScanStats scanStats = new ScanStats(200, 20, 25, 32, 10);
+        ScanStats scanStats = new ScanStats(200, 20, 25, 10);
         scanMetrics.reportScan(scanStats);
     }
 
     private void reportAgain(ScanMetrics scanMetrics) {
-        ScanStats scanStats = new ScanStats(500, 22, 30, 33, 8);
+        ScanStats scanStats = new ScanStats(500, 22, 30, 8);
         scanMetrics.reportScan(scanStats);
     }
 
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
index 133913c48..51c8b328d 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
@@ -121,17 +121,6 @@ import static 
org.assertj.core.api.Assertions.assertThatThrownBy;
 /** Tests for {@link PrimaryKeyFileStoreTable}. */
 public class PrimaryKeyFileStoreTableTest extends FileStoreTableTestBase {
 
-    protected static final RowType COMPATIBILITY_ROW_TYPE =
-            RowType.of(
-                    new DataType[] {
-                        DataTypes.INT(),
-                        DataTypes.INT(),
-                        DataTypes.BIGINT(),
-                        DataTypes.BINARY(1),
-                        DataTypes.VARBINARY(1)
-                    },
-                    new String[] {"pt", "a", "b", "c", "d"});
-
     protected static final Function<InternalRow, String> 
COMPATIBILITY_BATCH_ROW_TO_STRING =
             rowData ->
                     rowData.getInt(0)
@@ -144,12 +133,6 @@ public class PrimaryKeyFileStoreTableTest extends 
FileStoreTableTestBase {
                             + "|"
                             + new String(rowData.getBinary(4));
 
-    protected static final Function<InternalRow, String> 
COMPATIBILITY_CHANGELOG_ROW_TO_STRING =
-            rowData ->
-                    rowData.getRowKind().shortString()
-                            + " "
-                            + COMPATIBILITY_BATCH_ROW_TO_STRING.apply(rowData);
-
     @Test
     public void testMultipleWriters() throws Exception {
         WriteSelector selector =
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/utils/ObjectsCacheTest.java 
b/paimon-core/src/test/java/org/apache/paimon/utils/ObjectsCacheTest.java
index 8a4f0b061..9d3275e3a 100644
--- a/paimon-core/src/test/java/org/apache/paimon/utils/ObjectsCacheTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/utils/ObjectsCacheTest.java
@@ -58,17 +58,23 @@ public class ObjectsCacheTest {
 
         // test empty
         map.put("k1", Collections.emptyList());
-        List<String> values = cache.read("k1", null, Filter.alwaysTrue(), 
Filter.alwaysTrue());
+        List<String> values =
+                cache.read(
+                        "k1", null, Filter.alwaysTrue(), Filter.alwaysTrue(), 
Filter.alwaysTrue());
         assertThat(values).isEmpty();
 
         // test values
         List<String> expect = Arrays.asList("v1", "v2", "v3");
         map.put("k2", expect);
-        values = cache.read("k2", null, Filter.alwaysTrue(), 
Filter.alwaysTrue());
+        values =
+                cache.read(
+                        "k2", null, Filter.alwaysTrue(), Filter.alwaysTrue(), 
Filter.alwaysTrue());
         assertThat(values).containsExactlyElementsOf(expect);
 
         // test cache
-        values = cache.read("k2", null, Filter.alwaysTrue(), 
Filter.alwaysTrue());
+        values =
+                cache.read(
+                        "k2", null, Filter.alwaysTrue(), Filter.alwaysTrue(), 
Filter.alwaysTrue());
         assertThat(values).containsExactlyElementsOf(expect);
 
         // test filter
@@ -77,7 +83,8 @@ public class ObjectsCacheTest {
                         "k2",
                         null,
                         Filter.alwaysTrue(),
-                        r -> r.getString(0).toString().endsWith("2"));
+                        r -> r.getString(0).toString().endsWith("2"),
+                        Filter.alwaysTrue());
         assertThat(values).containsExactly("v2");
 
         // test load filter
@@ -88,6 +95,7 @@ public class ObjectsCacheTest {
                         "k3",
                         null,
                         r -> r.getString(0).toString().endsWith("2"),
+                        Filter.alwaysTrue(),
                         Filter.alwaysTrue());
         assertThat(values).containsExactly("v2");
 
@@ -99,6 +107,7 @@ public class ObjectsCacheTest {
                         "k4",
                         null,
                         r -> r.getString(0).toString().endsWith("5"),
+                        Filter.alwaysTrue(),
                         Filter.alwaysTrue());
         assertThat(values).isEmpty();
 
@@ -117,6 +126,7 @@ public class ObjectsCacheTest {
                                                         k,
                                                         null,
                                                         Filter.alwaysTrue(),
+                                                        Filter.alwaysTrue(),
                                                         Filter.alwaysTrue()))
                                         .containsExactly(k);
                             } catch (IOException e) {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
index b7eb1d625..559976921 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
@@ -77,7 +77,7 @@ public class ContinuousFileStoreSource extends FlinkSource {
             nextSnapshotId = checkpoint.currentSnapshotId();
             splits = checkpoint.splits();
         }
-        StreamTableScan scan = readBuilder.dropStats().newStreamScan();
+        StreamTableScan scan = readBuilder.newStreamScan();
         if (metricGroup(context) != null) {
             ((StreamDataTableScan) scan)
                     .withMetricsRegistry(new 
FlinkMetricRegistry(context.metricGroup()));
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
index a648bfba6..b3dcd4840 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
@@ -178,7 +178,7 @@ public class FlinkSourceBuilder {
         if (limit != null) {
             readBuilder.withLimit(limit.intValue());
         }
-        return readBuilder;
+        return readBuilder.dropStats();
     }
 
     private DataStream<RowData> buildStaticFileSource() {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
index 9bfd36fdf..12b579589 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
@@ -31,6 +31,7 @@ import org.apache.paimon.predicate.PredicateVisitor;
 import org.apache.paimon.table.DataTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.TableScan;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -172,12 +173,7 @@ public abstract class FlinkTableSource
     protected void scanSplitsForInference() {
         if (splitStatistics == null) {
             if (table instanceof DataTable) {
-                List<PartitionEntry> partitionEntries =
-                        table.newReadBuilder()
-                                .withFilter(predicate)
-                                .dropStats()
-                                .newScan()
-                                .listPartitionEntries();
                List<PartitionEntry> partitionEntries = newTableScan().listPartitionEntries();
                 long totalSize = 0;
                 long rowCount = 0;
                 for (PartitionEntry entry : partitionEntries) {
@@ -188,13 +184,7 @@ public abstract class FlinkTableSource
                 splitStatistics =
                        new SplitStatistics((int) (totalSize / splitTargetSize + 1), rowCount);
             } else {
-                List<Split> splits =
-                        table.newReadBuilder()
-                                .withFilter(predicate)
-                                .dropStats()
-                                .newScan()
-                                .plan()
-                                .splits();
+                List<Split> splits = newTableScan().plan().splits();
                 splitStatistics =
                         new SplitStatistics(
                                splits.size(), splits.stream().mapToLong(Split::rowCount).sum());
@@ -202,6 +192,10 @@ public abstract class FlinkTableSource
         }
     }
 
+    private TableScan newTableScan() {
+        return table.newReadBuilder().dropStats().withFilter(predicate).newScan();
+    }
+
     /** Split statistics for inferring row count and parallelism size. */
     protected static class SplitStatistics {
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java
index c388a6dcc..af425aab5 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java
@@ -87,7 +87,7 @@ public class StaticFileStoreSource extends FlinkSource {
 
    private List<FileStoreSourceSplit> getSplits(SplitEnumeratorContext context) {
        FileStoreSourceSplitGenerator splitGenerator = new FileStoreSourceSplitGenerator();
-        TableScan scan = readBuilder.dropStats().newScan();
+        TableScan scan = readBuilder.newScan();
         // register scan metrics
         if (context.metricGroup() != null) {
             ((InnerTableScan) scan)
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorFunction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorFunction.java
index f21922670..3805f6f8c 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorFunction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorFunction.java
@@ -106,7 +106,7 @@ public class MonitorFunction extends RichSourceFunction<Split>
 
     @Override
     public void initializeState(FunctionInitializationContext context) throws Exception {
-        this.scan = readBuilder.dropStats().newStreamScan();
+        this.scan = readBuilder.newStreamScan();
 
         this.checkpointState =
                 context.getOperatorStateStore()
diff --git 
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveSplitGenerator.java
 
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveSplitGenerator.java
index 33cbc19e0..144afab8e 100644
--- 
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveSplitGenerator.java
+++ 
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveSplitGenerator.java
@@ -96,7 +96,8 @@ public class HiveSplitGenerator {
                     
scan.withFilter(PredicateBuilder.and(predicatePerPartition));
                 }
             }
-            scan.plan()
+            scan.dropStats()
+                    .plan()
                     .splits()
                     .forEach(
                             split ->
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ColumnPruningAndPushDown.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ColumnPruningAndPushDown.scala
index 95c8f4b3a..f29c146b7 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ColumnPruningAndPushDown.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ColumnPruningAndPushDown.scala
@@ -62,7 +62,7 @@ trait ColumnPruningAndPushDown extends Scan with Logging {
       _readBuilder.withFilter(pushedPredicate)
     }
     pushDownLimit.foreach(_readBuilder.withLimit)
-    _readBuilder
+    _readBuilder.dropStats()
   }
 
   final def metadataColumns: Seq[PaimonMetadataColumn] = {
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonAnalyzeTableColumnCommand.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonAnalyzeTableColumnCommand.scala
index 19f73cb6c..9a88ca2e4 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonAnalyzeTableColumnCommand.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonAnalyzeTableColumnCommand.scala
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.spark.commands
 
+import org.apache.paimon.manifest.PartitionEntry
 import org.apache.paimon.schema.TableSchema
 import org.apache.paimon.spark.SparkTable
 import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand
@@ -64,11 +65,9 @@ case class PaimonAnalyzeTableColumnCommand(
     // compute stats
     val totalSize = table
       .newScan()
-      .plan()
-      .splits()
+      .listPartitionEntries()
       .asScala
-      .flatMap { case split: DataSplit => split.dataFiles().asScala }
-      .map(_.fileSize())
+      .map(_.fileSizeInBytes())
       .sum
     val (mergedRecordCount, colStats) =
       PaimonStatsUtils.computeColumnStats(sparkSession, relation, attributes)
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/StreamHelper.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/StreamHelper.scala
index b44a66fce..7e61d71ac 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/StreamHelper.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/StreamHelper.scala
@@ -44,7 +44,8 @@ private[spark] trait StreamHelper {
 
   var lastTriggerMillis: Long
 
-  private lazy val streamScan: StreamDataTableScan = table.newStreamScan()
+  private lazy val streamScan: StreamDataTableScan =
+    table.newStreamScan().dropStats().asInstanceOf[StreamDataTableScan]
 
   private lazy val partitionSchema: StructType =
    SparkTypeUtils.fromPaimonRowType(TypeUtils.project(table.rowType(), table.partitionKeys()))

Reply via email to