This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 8526f01c9f [core] Introduce mergedRowCount in Split (#7090)
8526f01c9f is described below

commit 8526f01c9fecf38241a1c6d5ee93f97eed6e5b98
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Jan 21 14:15:20 2026 +0800

    [core] Introduce mergedRowCount in Split (#7090)
---
 .../apache/paimon/globalindex/IndexedSplit.java    |  6 ++++
 .../paimon/table/FallbackReadFileStoreTable.java   |  7 +++++
 .../paimon/table/format/FormatDataSplit.java       |  6 ++++
 .../paimon/table/object/ObjectTableImpl.java       |  6 ++++
 .../org/apache/paimon/table/source/ChainSplit.java |  6 ++++
 .../org/apache/paimon/table/source/DataSplit.java  | 35 +++++++++++++++-------
 .../paimon/table/source/DataTableBatchScan.java    |  7 +++--
 .../apache/paimon/table/source/QueryAuthSplit.java |  7 +++++
 .../java/org/apache/paimon/table/source/Split.java | 13 ++++++++
 .../table/system/AggregationFieldsTable.java       |  6 ++++
 .../paimon/table/system/AllPartitionsTable.java    |  6 ++++
 .../paimon/table/system/AllTableOptionsTable.java  |  6 ++++
 .../apache/paimon/table/system/AllTablesTable.java |  6 ++++
 .../apache/paimon/table/system/BranchesTable.java  |  7 +++++
 .../apache/paimon/table/system/BucketsTable.java   |  6 ++++
 .../paimon/table/system/CatalogOptionsTable.java   |  6 ++++
 .../apache/paimon/table/system/ConsumersTable.java |  6 ++++
 .../org/apache/paimon/table/system/FilesTable.java |  6 ++++
 .../apache/paimon/table/system/ManifestsTable.java |  6 ++++
 .../apache/paimon/table/system/OptionsTable.java   |  6 ++++
 .../paimon/table/system/PartitionsTable.java       |  6 ++++
 .../apache/paimon/table/system/SchemasTable.java   |  6 ++++
 .../apache/paimon/table/system/SnapshotsTable.java |  6 ++++
 .../apache/paimon/table/system/StatisticTable.java |  6 ++++
 .../paimon/table/system/TableIndexesTable.java     |  6 ++++
 .../org/apache/paimon/table/system/TagsTable.java  |  6 ++++
 .../table/source/DataSplitCompatibleTest.java      | 14 +++------
 .../paimon/flink/source/BaseDataTableSource.java   | 12 +++-----
 .../paimon/spark/aggregate/AggFuncEvaluator.scala  |  3 +-
 .../spark/aggregate/AggregatePushDownUtils.scala   |  2 +-
 30 files changed, 193 insertions(+), 34 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/globalindex/IndexedSplit.java b/paimon-core/src/main/java/org/apache/paimon/globalindex/IndexedSplit.java
index 2f40137fd8..d9b6fef28c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/globalindex/IndexedSplit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/globalindex/IndexedSplit.java
@@ -35,6 +35,7 @@ import java.io.ObjectOutputStream;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Objects;
+import java.util.OptionalLong;
 
 /** Indexed split for global index. */
 public class IndexedSplit implements Split {
@@ -71,6 +72,11 @@ public class IndexedSplit implements Split {
         return rowRanges.stream().mapToLong(r -> r.to - r.from + 1).sum();
     }
 
+    @Override
+    public OptionalLong mergedRowCount() {
+        return OptionalLong.of(rowCount());
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
index 282b89e2e0..65e875c6a5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
@@ -63,6 +63,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.OptionalLong;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -249,6 +250,7 @@ public class FallbackReadFileStoreTable extends DelegatedFileStoreTable {
 
     /** Split for fallback read. */
     public interface FallbackSplit extends Split {
+
         boolean isFallback();
 
         Split wrapped();
@@ -281,6 +283,11 @@ public class FallbackReadFileStoreTable extends DelegatedFileStoreTable {
         public long rowCount() {
             return split.rowCount();
         }
+
+        @Override
+        public OptionalLong mergedRowCount() {
+            return split.mergedRowCount();
+        }
     }
 
     /** DataSplit fallback implementation. */
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatDataSplit.java b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatDataSplit.java
index 7c2ab8f901..f90d8c9ed3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatDataSplit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatDataSplit.java
@@ -25,6 +25,7 @@ import org.apache.paimon.table.source.Split;
 import javax.annotation.Nullable;
 
 import java.util.Objects;
+import java.util.OptionalLong;
 
 /** {@link FormatDataSplit} for format table. */
 public class FormatDataSplit implements Split {
@@ -85,6 +86,11 @@ public class FormatDataSplit implements Split {
         return -1;
     }
 
+    @Override
+    public OptionalLong mergedRowCount() {
+        return OptionalLong.empty();
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectTableImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectTableImpl.java
index b17596df16..1c95708d46 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectTableImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectTableImpl.java
@@ -51,6 +51,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.OptionalLong;
 
 import static org.apache.paimon.data.BinaryString.fromString;
 
@@ -189,6 +190,11 @@ public class ObjectTableImpl implements ReadonlyTable, ObjectTable {
         public int hashCode() {
             return Objects.hash(location);
         }
+
+        @Override
+        public OptionalLong mergedRowCount() {
+            return OptionalLong.empty();
+        }
     }
 
     private static class ObjectRead implements InnerTableRead {
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/ChainSplit.java b/paimon-core/src/main/java/org/apache/paimon/table/source/ChainSplit.java
index 30733c93e1..5339cd06a6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/ChainSplit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/ChainSplit.java
@@ -35,6 +35,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.OptionalLong;
 
 /**
  * A split describes chain table read scope. It follows DataSplit's custom serialization pattern and
@@ -87,6 +88,11 @@ public class ChainSplit implements Split {
         return sum;
     }
 
+    @Override
+    public OptionalLong mergedRowCount() {
+        return OptionalLong.empty();
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
index 3a4c112a95..458c31f73a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
@@ -55,7 +55,6 @@ import java.util.stream.Collectors;
 
 import static org.apache.paimon.io.DataFilePathFactory.INDEX_PATH_SUFFIX;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
-import static org.apache.paimon.utils.Preconditions.checkState;
 
 /** Input splits. Needed by most batch computation engines. */
 public class DataSplit implements Split {
@@ -143,17 +142,31 @@ public class DataSplit implements Split {
         return rowCount;
     }
 
-    /** Whether it is possible to calculate the merged row count. */
-    public boolean mergedRowCountAvailable() {
-        return rawConvertible
-                && (dataDeletionFiles == null
-                        || dataDeletionFiles.stream()
-                                .allMatch(f -> f == null || f.cardinality() != null));
-    }
+    @Override
+    public OptionalLong mergedRowCount() {
+        if (!rawConvertible
+                || (dataDeletionFiles != null
+                        && !dataDeletionFiles.stream()
+                                .allMatch(f -> f == null || f.cardinality() != null))) {
+            return OptionalLong.empty();
+        }
 
-    public long mergedRowCount() {
-        checkState(mergedRowCountAvailable());
-        return partialMergedRowCount();
+        long sum = 0L;
+        List<RawFile> rawFiles = convertToRawFiles().orElse(null);
+        if (rawFiles != null) {
+            for (int i = 0; i < rawFiles.size(); i++) {
+                RawFile rawFile = rawFiles.get(i);
+                DeletionFile deletionFile =
+                        dataDeletionFiles == null ? null : dataDeletionFiles.get(i);
+                Long cardinality = deletionFile == null ? null : deletionFile.cardinality();
+                if (deletionFile == null) {
+                    sum += rawFile.rowCount();
+                } else if (cardinality != null) {
+                    sum += rawFile.rowCount() - cardinality;
+                }
+            }
+        }
+        return OptionalLong.of(sum);
     }
 
     public Object minValue(int fieldIndex, DataField dataField, SimpleStatsEvolutions evolutions) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
index 012160f5bd..457229d7ad 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
@@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
+import java.util.OptionalLong;
 
 import static org.apache.paimon.table.source.PushDownUtils.minmaxAvailable;
 
@@ -144,10 +145,10 @@ public class DataTableBatchScan extends AbstractDataTableScan {
 
         List<Split> limitedSplits = new ArrayList<>();
         for (DataSplit dataSplit : splits) {
-            if (dataSplit.rawConvertible()) {
-                long partialMergedRowCount = dataSplit.partialMergedRowCount();
+            OptionalLong mergedRowCount = dataSplit.mergedRowCount();
+            if (mergedRowCount.isPresent()) {
                 limitedSplits.add(dataSplit);
-                scannedRowCount += partialMergedRowCount;
+                scannedRowCount += mergedRowCount.getAsLong();
                 if (scannedRowCount >= pushDownLimit) {
                     SnapshotReader.Plan newPlan =
                             new PlanImpl(plan.watermark(), plan.snapshotId(), limitedSplits);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/QueryAuthSplit.java b/paimon-core/src/main/java/org/apache/paimon/table/source/QueryAuthSplit.java
index 75da5bda7f..9b9042b7e2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/QueryAuthSplit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/QueryAuthSplit.java
@@ -22,6 +22,8 @@ import org.apache.paimon.catalog.TableQueryAuthResult;
 
 import javax.annotation.Nullable;
 
+import java.util.OptionalLong;
+
 /** A wrapper class for {@link Split} that adds query authorization information. */
 public class QueryAuthSplit implements Split {
 
@@ -48,4 +50,9 @@ public class QueryAuthSplit implements Split {
     public long rowCount() {
         return split.rowCount();
     }
+
+    @Override
+    public OptionalLong mergedRowCount() {
+        return split.mergedRowCount();
+    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/Split.java b/paimon-core/src/main/java/org/apache/paimon/table/source/Split.java
index d6cb381e24..0371c324f8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/Split.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/Split.java
@@ -23,6 +23,7 @@ import org.apache.paimon.annotation.Public;
 import java.io.Serializable;
 import java.util.List;
 import java.util.Optional;
+import java.util.OptionalLong;
 
 /**
  * An input split for reading.
@@ -32,8 +33,20 @@ import java.util.Optional;
 @Public
 public interface Split extends Serializable {
 
+    /**
+     * The row count in files, may be duplicated, such as in the primary key table and the
+     * Data-Evolution Append table.
+     */
     long rowCount();
 
+    /**
+     * Return the merged row count of data files. For example, when the delete vector is enabled in
+     * the primary key table, the number of rows that have been deleted will be subtracted from the
+     * returned result. In the Data Evolution mode of the Append table, the actual number of rows
+     * will be returned.
+     */
+    OptionalLong mergedRowCount();
+
     /**
      * If all files in this split can be read without merging, returns an {@link Optional} wrapping
      * a list of {@link RawFile}s to be read without merging. Otherwise, returns {@link
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/AggregationFieldsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/AggregationFieldsTable.java
index 6be92518d7..98660a6e12 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/AggregationFieldsTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/AggregationFieldsTable.java
@@ -53,6 +53,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.OptionalLong;
 import java.util.function.Function;
 
 import static org.apache.paimon.catalog.Identifier.SYSTEM_TABLE_SPLITTER;
@@ -160,6 +161,11 @@ public class AggregationFieldsTable implements ReadonlyTable {
         public int hashCode() {
             return Objects.hash(location);
         }
+
+        @Override
+        public OptionalLong mergedRowCount() {
+            return OptionalLong.empty();
+        }
     }
 
     /** {@link TableRead} implementation for {@link AggregationFieldsTable}. */
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/AllPartitionsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/AllPartitionsTable.java
index b73a403c53..8925caccf8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/AllPartitionsTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/AllPartitionsTable.java
@@ -52,6 +52,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.OptionalLong;
 
 /** This is a system table to display all the database-table-partitions. */
 public class AllPartitionsTable implements ReadonlyTable {
@@ -180,6 +181,11 @@ public class AllPartitionsTable implements ReadonlyTable {
         public int hashCode() {
             return Objects.hash(rows);
         }
+
+        @Override
+        public OptionalLong mergedRowCount() {
+            return OptionalLong.empty();
+        }
     }
 
     private static class AllPartitionsRead implements InnerTableRead {
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/AllTableOptionsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/AllTableOptionsTable.java
index 6e70da201d..4e53cbdd96 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/AllTableOptionsTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/AllTableOptionsTable.java
@@ -50,6 +50,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.OptionalLong;
 
 /**
  * This is a system table to display all the database-table properties.
@@ -158,6 +159,11 @@ public class AllTableOptionsTable implements ReadonlyTable {
         public int hashCode() {
             return Objects.hash(allOptions);
         }
+
+        @Override
+        public OptionalLong mergedRowCount() {
+            return OptionalLong.empty();
+        }
     }
 
     private static class AllTableOptionsRead implements InnerTableRead {
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/AllTablesTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/AllTablesTable.java
index 4c29a5145b..f58c561b9a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/AllTablesTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/AllTablesTable.java
@@ -52,6 +52,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.OptionalLong;
 
 import static org.apache.paimon.CoreOptions.TYPE;
 import static org.apache.paimon.rest.responses.AuditRESTResponse.FIELD_CREATED_AT;
@@ -200,6 +201,11 @@ public class AllTablesTable implements ReadonlyTable {
         public int hashCode() {
             return Objects.hash(rows);
         }
+
+        @Override
+        public OptionalLong mergedRowCount() {
+            return OptionalLong.empty();
+        }
     }
 
     private static class AllTablesRead implements InnerTableRead {
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/BranchesTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/BranchesTable.java
index 490f683c63..f652549703 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/BranchesTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/BranchesTable.java
@@ -65,6 +65,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.OptionalLong;
 
 import static org.apache.paimon.catalog.Identifier.SYSTEM_TABLE_SPLITTER;
 
@@ -149,6 +150,7 @@ public class BranchesTable implements ReadonlyTable {
     }
 
     private static class BranchesSplit extends SingletonSplit {
+
         private static final long serialVersionUID = 1L;
 
         private final Path location;
@@ -177,6 +179,11 @@ public class BranchesTable implements ReadonlyTable {
         public int hashCode() {
             return Objects.hash(location);
         }
+
+        @Override
+        public OptionalLong mergedRowCount() {
+            return OptionalLong.empty();
+        }
     }
 
     private static class BranchesRead implements InnerTableRead {
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java
index 10ca1775a1..18aaee6f86 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java
@@ -57,6 +57,7 @@ import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.OptionalLong;
 
 import static org.apache.paimon.catalog.Identifier.SYSTEM_TABLE_SPLITTER;
 
@@ -147,6 +148,11 @@ public class BucketsTable implements ReadonlyTable {
         public int hashCode() {
             return 1;
         }
+
+        @Override
+        public OptionalLong mergedRowCount() {
+            return OptionalLong.empty();
+        }
     }
 
     private static class BucketsRead implements InnerTableRead {
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/CatalogOptionsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/CatalogOptionsTable.java
index f49b1d9b72..78b281e585 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/CatalogOptionsTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/CatalogOptionsTable.java
@@ -47,6 +47,7 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.OptionalLong;
 
 import static org.apache.paimon.utils.SerializationUtils.newStringType;
 
@@ -148,6 +149,11 @@ public class CatalogOptionsTable implements ReadonlyTable {
         public int hashCode() {
             return catalogOptions.hashCode();
         }
+
+        @Override
+        public OptionalLong mergedRowCount() {
+            return OptionalLong.empty();
+        }
     }
 
     private static class CatalogOptionsRead implements InnerTableRead {
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/ConsumersTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/ConsumersTable.java
index 6aad82190b..74c160da96 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/ConsumersTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/ConsumersTable.java
@@ -53,6 +53,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.OptionalLong;
 
 import static org.apache.paimon.catalog.Identifier.SYSTEM_TABLE_SPLITTER;
 
@@ -158,6 +159,11 @@ public class ConsumersTable implements ReadonlyTable {
         public int hashCode() {
             return Objects.hash(location);
         }
+
+        @Override
+        public OptionalLong mergedRowCount() {
+            return OptionalLong.empty();
+        }
     }
 
     /** {@link TableRead} implementation for {@link ConsumersTable}. */
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
index b6cae80180..7e62516e85 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
@@ -78,6 +78,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.OptionalLong;
 import java.util.TreeMap;
 import java.util.function.Function;
 import java.util.function.Supplier;
@@ -283,6 +284,11 @@ public class FilesTable implements ReadonlyTable {
             }
             return scan.plan();
         }
+
+        @Override
+        public OptionalLong mergedRowCount() {
+            return OptionalLong.empty();
+        }
     }
 
     private static class FilesRead implements InnerTableRead {
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java
index 0813e838f3..ea91154b7d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java
@@ -60,6 +60,7 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.OptionalLong;
 
 import static org.apache.paimon.catalog.Identifier.SYSTEM_TABLE_SPLITTER;
 
@@ -159,6 +160,11 @@ public class ManifestsTable implements ReadonlyTable {
             }
             return o != null && getClass() == o.getClass();
         }
+
+        @Override
+        public OptionalLong mergedRowCount() {
+            return OptionalLong.empty();
+        }
     }
 
     private static class ManifestsRead implements InnerTableRead {
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/OptionsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/OptionsTable.java
index d9de8bba19..a613a76121 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/OptionsTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/OptionsTable.java
@@ -48,6 +48,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.OptionalLong;
 
 import static org.apache.paimon.catalog.Identifier.SYSTEM_TABLE_SPLITTER;
 import static org.apache.paimon.utils.SerializationUtils.newStringType;
@@ -150,6 +151,11 @@ public class OptionsTable implements ReadonlyTable {
         public int hashCode() {
             return Objects.hash(location);
         }
+
+        @Override
+        public OptionalLong mergedRowCount() {
+            return OptionalLong.empty();
+        }
     }
 
     private class OptionsRead implements InnerTableRead {
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java
index 22d2c85283..296542d70a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java
@@ -72,6 +72,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.OptionalLong;
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.catalog.Identifier.SYSTEM_TABLE_SPLITTER;
@@ -166,6 +167,11 @@ public class PartitionsTable implements ReadonlyTable {
         public int hashCode() {
             return 1;
         }
+
+        @Override
+        public OptionalLong mergedRowCount() {
+            return OptionalLong.empty();
+        }
     }
 
     private static class PartitionsRead implements InnerTableRead {
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java
index b051605bb6..b5093c3aeb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java
@@ -73,6 +73,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.OptionalLong;
 import java.util.stream.Collectors;
 import java.util.stream.LongStream;
 
@@ -172,6 +173,11 @@ public class SchemasTable implements ReadonlyTable {
         public int hashCode() {
             return 0;
         }
+
+        @Override
+        public OptionalLong mergedRowCount() {
+            return OptionalLong.empty();
+        }
     }
 
     /** {@link TableRead} implementation for {@link SchemasTable}. */
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java
index d869be3ded..394dc4d58a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java
@@ -71,6 +71,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.OptionalLong;
 
 import static org.apache.paimon.catalog.Identifier.SYSTEM_TABLE_SPLITTER;
 
@@ -195,6 +196,11 @@ public class SnapshotsTable implements ReadonlyTable {
         public int hashCode() {
             return Objects.hash(location);
         }
+
+        @Override
+        public OptionalLong mergedRowCount() {
+            return OptionalLong.empty();
+        }
     }
 
     private class SnapshotsRead implements InnerTableRead {
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/StatisticTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/StatisticTable.java
index 437be76b56..b1d305ffa7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/StatisticTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/StatisticTable.java
@@ -54,6 +54,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.OptionalLong;
 
 import static org.apache.paimon.catalog.Identifier.SYSTEM_TABLE_SPLITTER;
 
@@ -156,6 +157,11 @@ public class StatisticTable implements ReadonlyTable {
         public int hashCode() {
             return Objects.hash(location);
         }
+
+        @Override
+        public OptionalLong mergedRowCount() {
+            return OptionalLong.empty();
+        }
     }
 
     private static class StatisticRead implements InnerTableRead {
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/TableIndexesTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/TableIndexesTable.java
index 9cc1d95962..990a3e8f2d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/TableIndexesTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/TableIndexesTable.java
@@ -62,6 +62,7 @@ import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.OptionalLong;
 
 import static org.apache.paimon.catalog.Identifier.SYSTEM_TABLE_SPLITTER;
 import static org.apache.paimon.utils.SerializationUtils.newStringType;
@@ -154,6 +155,11 @@ public class TableIndexesTable implements ReadonlyTable {
             }
             return o != null && getClass() == o.getClass();
         }
+
+        @Override
+        public OptionalLong mergedRowCount() {
+            return OptionalLong.empty();
+        }
     }
 
     private static class IndexesRead implements InnerTableRead {
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java
index 236fbe3110..0dd22e79be 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java
@@ -67,6 +67,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.OptionalLong;
 import java.util.TreeMap;
 
 import static org.apache.paimon.catalog.Identifier.SYSTEM_TABLE_SPLITTER;
@@ -188,6 +189,11 @@ public class TagsTable implements ReadonlyTable {
         public int hashCode() {
             return Objects.hash(location);
         }
+
+        @Override
+        public OptionalLong mergedRowCount() {
+            return OptionalLong.empty();
+        }
     }
 
     private class TagsRead implements InnerTableRead {
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/DataSplitCompatibleTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/DataSplitCompatibleTest.java
index cfe9086fa0..dcd888b7f1 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/DataSplitCompatibleTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/DataSplitCompatibleTest.java
@@ -67,14 +67,11 @@ public class DataSplitCompatibleTest {
         List<DataFileMeta> dataFiles =
                 Arrays.asList(newDataFile(1000L), newDataFile(2000L), newDataFile(3000L));
         DataSplit split = newDataSplit(false, dataFiles, null);
-        assertThat(split.partialMergedRowCount()).isEqualTo(0L);
-        assertThat(split.mergedRowCountAvailable()).isEqualTo(false);
+        assertThat(split.mergedRowCount()).isEmpty();
 
         // rawConvertible without deletion files
         split = newDataSplit(true, dataFiles, null);
-        assertThat(split.partialMergedRowCount()).isEqualTo(6000L);
-        assertThat(split.mergedRowCountAvailable()).isEqualTo(true);
-        assertThat(split.mergedRowCount()).isEqualTo(6000L);
+        assertThat(split.mergedRowCount()).hasValue(6000L);
 
         // rawConvertible with deletion files without cardinality
         ArrayList<DeletionFile> deletionFiles = new ArrayList<>();
@@ -82,8 +79,7 @@ public class DataSplitCompatibleTest {
         deletionFiles.add(new DeletionFile("p", 1, 2, null));
         deletionFiles.add(new DeletionFile("p", 1, 2, 100L));
         split = newDataSplit(true, dataFiles, deletionFiles);
-        assertThat(split.partialMergedRowCount()).isEqualTo(3900L);
-        assertThat(split.mergedRowCountAvailable()).isEqualTo(false);
+        assertThat(split.mergedRowCount()).isEmpty();
 
         // rawConvertible with deletion files with cardinality
         deletionFiles = new ArrayList<>();
@@ -91,9 +87,7 @@ public class DataSplitCompatibleTest {
         deletionFiles.add(new DeletionFile("p", 1, 2, 200L));
         deletionFiles.add(new DeletionFile("p", 1, 2, 100L));
         split = newDataSplit(true, dataFiles, deletionFiles);
-        assertThat(split.partialMergedRowCount()).isEqualTo(5700L);
-        assertThat(split.mergedRowCountAvailable()).isEqualTo(true);
-        assertThat(split.mergedRowCount()).isEqualTo(5700L);
+        assertThat(split.mergedRowCount()).hasValue(5700L);
     }
 
     @Test
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java
index 606b383ab6..8e1ecfc955 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java
@@ -37,7 +37,6 @@ import org.apache.paimon.table.BucketSpec;
 import org.apache.paimon.table.DataTable;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
-import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.utils.Projection;
 
@@ -66,6 +65,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.OptionalLong;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
@@ -353,15 +353,11 @@ public abstract class BaseDataTableSource extends FlinkTableSource
                         .splits();
         long countPushed = 0;
         for (Split s : splits) {
-            if (!(s instanceof DataSplit)) {
+            OptionalLong mergedRowCount = s.mergedRowCount();
+            if (!mergedRowCount.isPresent()) {
                 return false;
             }
-            DataSplit split = (DataSplit) s;
-            if (!split.mergedRowCountAvailable()) {
-                return false;
-            }
-
-            countPushed += split.mergedRowCount();
+            countPushed += mergedRowCount.getAsLong();
         }
 
         this.countPushed = countPushed;
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/aggregate/AggFuncEvaluator.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/aggregate/AggFuncEvaluator.scala
index 6282b214d4..1f737f602f 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/aggregate/AggFuncEvaluator.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/aggregate/AggFuncEvaluator.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.types.{DataType, LongType}
 import org.apache.spark.unsafe.types.UTF8String
 
 trait AggFuncEvaluator[T] {
+
   def update(dataSplit: DataSplit): Unit
 
   def result(): T
@@ -42,7 +43,7 @@ class CountStarEvaluator extends AggFuncEvaluator[Long] {
   private var _result: Long = 0L
 
   override def update(dataSplit: DataSplit): Unit = {
-    _result += dataSplit.mergedRowCount()
+    _result += dataSplit.mergedRowCount().getAsLong
   }
 
   val a: Int = 1;
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/aggregate/AggregatePushDownUtils.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/aggregate/AggregatePushDownUtils.scala
index df6af4d483..52ce726c35 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/aggregate/AggregatePushDownUtils.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/aggregate/AggregatePushDownUtils.scala
@@ -70,7 +70,7 @@ object AggregatePushDownUtils {
     }
     val dataSplits = splits.map(_.asInstanceOf[DataSplit])
 
-    if (!dataSplits.forall(_.mergedRowCountAvailable())) {
+    if (!dataSplits.forall(_.mergedRowCount().isPresent)) {
       return None
     }
 


Reply via email to