This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 8526f01c9f [core] Introduce mergedRowCount in Split (#7090)
8526f01c9f is described below
commit 8526f01c9fecf38241a1c6d5ee93f97eed6e5b98
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Jan 21 14:15:20 2026 +0800
[core] Introduce mergedRowCount in Split (#7090)
---
.../apache/paimon/globalindex/IndexedSplit.java | 6 ++++
.../paimon/table/FallbackReadFileStoreTable.java | 7 +++++
.../paimon/table/format/FormatDataSplit.java | 6 ++++
.../paimon/table/object/ObjectTableImpl.java | 6 ++++
.../org/apache/paimon/table/source/ChainSplit.java | 6 ++++
.../org/apache/paimon/table/source/DataSplit.java | 35 +++++++++++++++-------
.../paimon/table/source/DataTableBatchScan.java | 7 +++--
.../apache/paimon/table/source/QueryAuthSplit.java | 7 +++++
.../java/org/apache/paimon/table/source/Split.java | 13 ++++++++
.../table/system/AggregationFieldsTable.java | 6 ++++
.../paimon/table/system/AllPartitionsTable.java | 6 ++++
.../paimon/table/system/AllTableOptionsTable.java | 6 ++++
.../apache/paimon/table/system/AllTablesTable.java | 6 ++++
.../apache/paimon/table/system/BranchesTable.java | 7 +++++
.../apache/paimon/table/system/BucketsTable.java | 6 ++++
.../paimon/table/system/CatalogOptionsTable.java | 6 ++++
.../apache/paimon/table/system/ConsumersTable.java | 6 ++++
.../org/apache/paimon/table/system/FilesTable.java | 6 ++++
.../apache/paimon/table/system/ManifestsTable.java | 6 ++++
.../apache/paimon/table/system/OptionsTable.java | 6 ++++
.../paimon/table/system/PartitionsTable.java | 6 ++++
.../apache/paimon/table/system/SchemasTable.java | 6 ++++
.../apache/paimon/table/system/SnapshotsTable.java | 6 ++++
.../apache/paimon/table/system/StatisticTable.java | 6 ++++
.../paimon/table/system/TableIndexesTable.java | 6 ++++
.../org/apache/paimon/table/system/TagsTable.java | 6 ++++
.../table/source/DataSplitCompatibleTest.java | 14 +++------
.../paimon/flink/source/BaseDataTableSource.java | 12 +++-----
.../paimon/spark/aggregate/AggFuncEvaluator.scala | 3 +-
.../spark/aggregate/AggregatePushDownUtils.scala | 2 +-
30 files changed, 193 insertions(+), 34 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/globalindex/IndexedSplit.java
b/paimon-core/src/main/java/org/apache/paimon/globalindex/IndexedSplit.java
index 2f40137fd8..d9b6fef28c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/globalindex/IndexedSplit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/globalindex/IndexedSplit.java
@@ -35,6 +35,7 @@ import java.io.ObjectOutputStream;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
+import java.util.OptionalLong;
/** Indexed split for global index. */
public class IndexedSplit implements Split {
@@ -71,6 +72,11 @@ public class IndexedSplit implements Split {
return rowRanges.stream().mapToLong(r -> r.to - r.from + 1).sum();
}
+ @Override
+ public OptionalLong mergedRowCount() {
+ return OptionalLong.of(rowCount());
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
index 282b89e2e0..65e875c6a5 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
@@ -63,6 +63,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
+import java.util.OptionalLong;
import java.util.Set;
import java.util.stream.Collectors;
@@ -249,6 +250,7 @@ public class FallbackReadFileStoreTable extends
DelegatedFileStoreTable {
/** Split for fallback read. */
public interface FallbackSplit extends Split {
+
boolean isFallback();
Split wrapped();
@@ -281,6 +283,11 @@ public class FallbackReadFileStoreTable extends
DelegatedFileStoreTable {
public long rowCount() {
return split.rowCount();
}
+
+ @Override
+ public OptionalLong mergedRowCount() {
+ return split.mergedRowCount();
+ }
}
/** DataSplit fallback implementation. */
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatDataSplit.java
b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatDataSplit.java
index 7c2ab8f901..f90d8c9ed3 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatDataSplit.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatDataSplit.java
@@ -25,6 +25,7 @@ import org.apache.paimon.table.source.Split;
import javax.annotation.Nullable;
import java.util.Objects;
+import java.util.OptionalLong;
/** {@link FormatDataSplit} for format table. */
public class FormatDataSplit implements Split {
@@ -85,6 +86,11 @@ public class FormatDataSplit implements Split {
return -1;
}
+ @Override
+ public OptionalLong mergedRowCount() {
+ return OptionalLong.empty();
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectTableImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectTableImpl.java
index b17596df16..1c95708d46 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectTableImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectTableImpl.java
@@ -51,6 +51,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
+import java.util.OptionalLong;
import static org.apache.paimon.data.BinaryString.fromString;
@@ -189,6 +190,11 @@ public class ObjectTableImpl implements ReadonlyTable,
ObjectTable {
public int hashCode() {
return Objects.hash(location);
}
+
+ @Override
+ public OptionalLong mergedRowCount() {
+ return OptionalLong.empty();
+ }
}
private static class ObjectRead implements InnerTableRead {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/ChainSplit.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/ChainSplit.java
index 30733c93e1..5339cd06a6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/ChainSplit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/ChainSplit.java
@@ -35,6 +35,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.OptionalLong;
/**
* A split describes chain table read scope. It follows DataSplit's custom
serialization pattern and
@@ -87,6 +88,11 @@ public class ChainSplit implements Split {
return sum;
}
+ @Override
+ public OptionalLong mergedRowCount() {
+ return OptionalLong.empty();
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
index 3a4c112a95..458c31f73a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
@@ -55,7 +55,6 @@ import java.util.stream.Collectors;
import static org.apache.paimon.io.DataFilePathFactory.INDEX_PATH_SUFFIX;
import static org.apache.paimon.utils.Preconditions.checkArgument;
-import static org.apache.paimon.utils.Preconditions.checkState;
/** Input splits. Needed by most batch computation engines. */
public class DataSplit implements Split {
@@ -143,17 +142,31 @@ public class DataSplit implements Split {
return rowCount;
}
- /** Whether it is possible to calculate the merged row count. */
- public boolean mergedRowCountAvailable() {
- return rawConvertible
- && (dataDeletionFiles == null
- || dataDeletionFiles.stream()
- .allMatch(f -> f == null || f.cardinality() != null));
- }
+ @Override
+ public OptionalLong mergedRowCount() {
+ if (!rawConvertible
+ || (dataDeletionFiles != null
+ && !dataDeletionFiles.stream()
+ .allMatch(f -> f == null || f.cardinality() != null))) {
+ return OptionalLong.empty();
+ }
- public long mergedRowCount() {
- checkState(mergedRowCountAvailable());
- return partialMergedRowCount();
+ long sum = 0L;
+ List<RawFile> rawFiles = convertToRawFiles().orElse(null);
+ if (rawFiles != null) {
+ for (int i = 0; i < rawFiles.size(); i++) {
+ RawFile rawFile = rawFiles.get(i);
+ DeletionFile deletionFile =
+ dataDeletionFiles == null ? null : dataDeletionFiles.get(i);
+ Long cardinality = deletionFile == null ? null : deletionFile.cardinality();
+ if (deletionFile == null) {
+ sum += rawFile.rowCount();
+ } else if (cardinality != null) {
+ sum += rawFile.rowCount() - cardinality;
+ }
+ }
+ }
+ return OptionalLong.of(sum);
}
public Object minValue(int fieldIndex, DataField dataField,
SimpleStatsEvolutions evolutions) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
index 012160f5bd..457229d7ad 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
@@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
+import java.util.OptionalLong;
import static org.apache.paimon.table.source.PushDownUtils.minmaxAvailable;
@@ -144,10 +145,10 @@ public class DataTableBatchScan extends
AbstractDataTableScan {
List<Split> limitedSplits = new ArrayList<>();
for (DataSplit dataSplit : splits) {
- if (dataSplit.rawConvertible()) {
- long partialMergedRowCount = dataSplit.partialMergedRowCount();
+ OptionalLong mergedRowCount = dataSplit.mergedRowCount();
+ if (mergedRowCount.isPresent()) {
limitedSplits.add(dataSplit);
- scannedRowCount += partialMergedRowCount;
+ scannedRowCount += mergedRowCount.getAsLong();
if (scannedRowCount >= pushDownLimit) {
SnapshotReader.Plan newPlan =
new PlanImpl(plan.watermark(), plan.snapshotId(),
limitedSplits);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/QueryAuthSplit.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/QueryAuthSplit.java
index 75da5bda7f..9b9042b7e2 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/QueryAuthSplit.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/QueryAuthSplit.java
@@ -22,6 +22,8 @@ import org.apache.paimon.catalog.TableQueryAuthResult;
import javax.annotation.Nullable;
+import java.util.OptionalLong;
+
/** A wrapper class for {@link Split} that adds query authorization
information. */
public class QueryAuthSplit implements Split {
@@ -48,4 +50,9 @@ public class QueryAuthSplit implements Split {
public long rowCount() {
return split.rowCount();
}
+
+ @Override
+ public OptionalLong mergedRowCount() {
+ return split.mergedRowCount();
+ }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/Split.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/Split.java
index d6cb381e24..0371c324f8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/Split.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/Split.java
@@ -23,6 +23,7 @@ import org.apache.paimon.annotation.Public;
import java.io.Serializable;
import java.util.List;
import java.util.Optional;
+import java.util.OptionalLong;
/**
* An input split for reading.
@@ -32,8 +33,20 @@ import java.util.Optional;
@Public
public interface Split extends Serializable {
+ /**
+ * The row count in files, may be duplicated, such as in the primary key table and the
+ * Data-Evolution Append table.
+ */
long rowCount();
+ /**
+ * Return the merged row count of data files. For example, when the delete vector is enabled in
+ * the primary key table, the number of rows that have been deleted will be subtracted from the
+ * returned result. In the Data Evolution mode of the Append table, the actual number of rows
+ * will be returned.
+ */
+ OptionalLong mergedRowCount();
+
/**
* If all files in this split can be read without merging, returns an
{@link Optional} wrapping
* a list of {@link RawFile}s to be read without merging. Otherwise,
returns {@link
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/AggregationFieldsTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/AggregationFieldsTable.java
index 6be92518d7..98660a6e12 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/AggregationFieldsTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/AggregationFieldsTable.java
@@ -53,6 +53,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.OptionalLong;
import java.util.function.Function;
import static org.apache.paimon.catalog.Identifier.SYSTEM_TABLE_SPLITTER;
@@ -160,6 +161,11 @@ public class AggregationFieldsTable implements
ReadonlyTable {
public int hashCode() {
return Objects.hash(location);
}
+
+ @Override
+ public OptionalLong mergedRowCount() {
+ return OptionalLong.empty();
+ }
}
/** {@link TableRead} implementation for {@link AggregationFieldsTable}. */
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/AllPartitionsTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/AllPartitionsTable.java
index b73a403c53..8925caccf8 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/AllPartitionsTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/AllPartitionsTable.java
@@ -52,6 +52,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.OptionalLong;
/** This is a system table to display all the database-table-partitions. */
public class AllPartitionsTable implements ReadonlyTable {
@@ -180,6 +181,11 @@ public class AllPartitionsTable implements ReadonlyTable {
public int hashCode() {
return Objects.hash(rows);
}
+
+ @Override
+ public OptionalLong mergedRowCount() {
+ return OptionalLong.empty();
+ }
}
private static class AllPartitionsRead implements InnerTableRead {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/AllTableOptionsTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/AllTableOptionsTable.java
index 6e70da201d..4e53cbdd96 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/AllTableOptionsTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/AllTableOptionsTable.java
@@ -50,6 +50,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.OptionalLong;
/**
* This is a system table to display all the database-table properties.
@@ -158,6 +159,11 @@ public class AllTableOptionsTable implements ReadonlyTable
{
public int hashCode() {
return Objects.hash(allOptions);
}
+
+ @Override
+ public OptionalLong mergedRowCount() {
+ return OptionalLong.empty();
+ }
}
private static class AllTableOptionsRead implements InnerTableRead {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/AllTablesTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/AllTablesTable.java
index 4c29a5145b..f58c561b9a 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/AllTablesTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/AllTablesTable.java
@@ -52,6 +52,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.OptionalLong;
import static org.apache.paimon.CoreOptions.TYPE;
import static
org.apache.paimon.rest.responses.AuditRESTResponse.FIELD_CREATED_AT;
@@ -200,6 +201,11 @@ public class AllTablesTable implements ReadonlyTable {
public int hashCode() {
return Objects.hash(rows);
}
+
+ @Override
+ public OptionalLong mergedRowCount() {
+ return OptionalLong.empty();
+ }
}
private static class AllTablesRead implements InnerTableRead {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/BranchesTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/BranchesTable.java
index 490f683c63..f652549703 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/BranchesTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/BranchesTable.java
@@ -65,6 +65,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.OptionalLong;
import static org.apache.paimon.catalog.Identifier.SYSTEM_TABLE_SPLITTER;
@@ -149,6 +150,7 @@ public class BranchesTable implements ReadonlyTable {
}
private static class BranchesSplit extends SingletonSplit {
+
private static final long serialVersionUID = 1L;
private final Path location;
@@ -177,6 +179,11 @@ public class BranchesTable implements ReadonlyTable {
public int hashCode() {
return Objects.hash(location);
}
+
+ @Override
+ public OptionalLong mergedRowCount() {
+ return OptionalLong.empty();
+ }
}
private static class BranchesRead implements InnerTableRead {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java
index 10ca1775a1..18aaee6f86 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java
@@ -57,6 +57,7 @@ import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.OptionalLong;
import static org.apache.paimon.catalog.Identifier.SYSTEM_TABLE_SPLITTER;
@@ -147,6 +148,11 @@ public class BucketsTable implements ReadonlyTable {
public int hashCode() {
return 1;
}
+
+ @Override
+ public OptionalLong mergedRowCount() {
+ return OptionalLong.empty();
+ }
}
private static class BucketsRead implements InnerTableRead {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/CatalogOptionsTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/CatalogOptionsTable.java
index f49b1d9b72..78b281e585 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/CatalogOptionsTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/CatalogOptionsTable.java
@@ -47,6 +47,7 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.OptionalLong;
import static org.apache.paimon.utils.SerializationUtils.newStringType;
@@ -148,6 +149,11 @@ public class CatalogOptionsTable implements ReadonlyTable {
public int hashCode() {
return catalogOptions.hashCode();
}
+
+ @Override
+ public OptionalLong mergedRowCount() {
+ return OptionalLong.empty();
+ }
}
private static class CatalogOptionsRead implements InnerTableRead {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/ConsumersTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/ConsumersTable.java
index 6aad82190b..74c160da96 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/ConsumersTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/ConsumersTable.java
@@ -53,6 +53,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.OptionalLong;
import static org.apache.paimon.catalog.Identifier.SYSTEM_TABLE_SPLITTER;
@@ -158,6 +159,11 @@ public class ConsumersTable implements ReadonlyTable {
public int hashCode() {
return Objects.hash(location);
}
+
+ @Override
+ public OptionalLong mergedRowCount() {
+ return OptionalLong.empty();
+ }
}
/** {@link TableRead} implementation for {@link ConsumersTable}. */
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
index b6cae80180..7e62516e85 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
@@ -78,6 +78,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.OptionalLong;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.function.Supplier;
@@ -283,6 +284,11 @@ public class FilesTable implements ReadonlyTable {
}
return scan.plan();
}
+
+ @Override
+ public OptionalLong mergedRowCount() {
+ return OptionalLong.empty();
+ }
}
private static class FilesRead implements InnerTableRead {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java
index 0813e838f3..ea91154b7d 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java
@@ -60,6 +60,7 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.OptionalLong;
import static org.apache.paimon.catalog.Identifier.SYSTEM_TABLE_SPLITTER;
@@ -159,6 +160,11 @@ public class ManifestsTable implements ReadonlyTable {
}
return o != null && getClass() == o.getClass();
}
+
+ @Override
+ public OptionalLong mergedRowCount() {
+ return OptionalLong.empty();
+ }
}
private static class ManifestsRead implements InnerTableRead {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/OptionsTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/OptionsTable.java
index d9de8bba19..a613a76121 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/OptionsTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/OptionsTable.java
@@ -48,6 +48,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.OptionalLong;
import static org.apache.paimon.catalog.Identifier.SYSTEM_TABLE_SPLITTER;
import static org.apache.paimon.utils.SerializationUtils.newStringType;
@@ -150,6 +151,11 @@ public class OptionsTable implements ReadonlyTable {
public int hashCode() {
return Objects.hash(location);
}
+
+ @Override
+ public OptionalLong mergedRowCount() {
+ return OptionalLong.empty();
+ }
}
private class OptionsRead implements InnerTableRead {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java
index 22d2c85283..296542d70a 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java
@@ -72,6 +72,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.OptionalLong;
import java.util.stream.Collectors;
import static org.apache.paimon.catalog.Identifier.SYSTEM_TABLE_SPLITTER;
@@ -166,6 +167,11 @@ public class PartitionsTable implements ReadonlyTable {
public int hashCode() {
return 1;
}
+
+ @Override
+ public OptionalLong mergedRowCount() {
+ return OptionalLong.empty();
+ }
}
private static class PartitionsRead implements InnerTableRead {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java
index b051605bb6..b5093c3aeb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java
@@ -73,6 +73,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.OptionalLong;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
@@ -172,6 +173,11 @@ public class SchemasTable implements ReadonlyTable {
public int hashCode() {
return 0;
}
+
+ @Override
+ public OptionalLong mergedRowCount() {
+ return OptionalLong.empty();
+ }
}
/** {@link TableRead} implementation for {@link SchemasTable}. */
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java
index d869be3ded..394dc4d58a 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java
@@ -71,6 +71,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
+import java.util.OptionalLong;
import static org.apache.paimon.catalog.Identifier.SYSTEM_TABLE_SPLITTER;
@@ -195,6 +196,11 @@ public class SnapshotsTable implements ReadonlyTable {
public int hashCode() {
return Objects.hash(location);
}
+
+ @Override
+ public OptionalLong mergedRowCount() {
+ return OptionalLong.empty();
+ }
}
private class SnapshotsRead implements InnerTableRead {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/StatisticTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/StatisticTable.java
index 437be76b56..b1d305ffa7 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/StatisticTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/StatisticTable.java
@@ -54,6 +54,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
+import java.util.OptionalLong;
import static org.apache.paimon.catalog.Identifier.SYSTEM_TABLE_SPLITTER;
@@ -156,6 +157,11 @@ public class StatisticTable implements ReadonlyTable {
public int hashCode() {
return Objects.hash(location);
}
+
+ @Override
+ public OptionalLong mergedRowCount() {
+ return OptionalLong.empty();
+ }
}
private static class StatisticRead implements InnerTableRead {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/TableIndexesTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/TableIndexesTable.java
index 9cc1d95962..990a3e8f2d 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/TableIndexesTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/TableIndexesTable.java
@@ -62,6 +62,7 @@ import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.OptionalLong;
import static org.apache.paimon.catalog.Identifier.SYSTEM_TABLE_SPLITTER;
import static org.apache.paimon.utils.SerializationUtils.newStringType;
@@ -154,6 +155,11 @@ public class TableIndexesTable implements ReadonlyTable {
}
return o != null && getClass() == o.getClass();
}
+
+ @Override
+ public OptionalLong mergedRowCount() {
+ return OptionalLong.empty();
+ }
}
private static class IndexesRead implements InnerTableRead {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java
index 236fbe3110..0dd22e79be 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java
@@ -67,6 +67,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
+import java.util.OptionalLong;
import java.util.TreeMap;
import static org.apache.paimon.catalog.Identifier.SYSTEM_TABLE_SPLITTER;
@@ -188,6 +189,11 @@ public class TagsTable implements ReadonlyTable {
public int hashCode() {
return Objects.hash(location);
}
+
+ @Override
+ public OptionalLong mergedRowCount() {
+ return OptionalLong.empty();
+ }
}
private class TagsRead implements InnerTableRead {
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/source/DataSplitCompatibleTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/source/DataSplitCompatibleTest.java
index cfe9086fa0..dcd888b7f1 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/source/DataSplitCompatibleTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/source/DataSplitCompatibleTest.java
@@ -67,14 +67,11 @@ public class DataSplitCompatibleTest {
List<DataFileMeta> dataFiles =
Arrays.asList(newDataFile(1000L), newDataFile(2000L),
newDataFile(3000L));
DataSplit split = newDataSplit(false, dataFiles, null);
- assertThat(split.partialMergedRowCount()).isEqualTo(0L);
- assertThat(split.mergedRowCountAvailable()).isEqualTo(false);
+ assertThat(split.mergedRowCount()).isEmpty();
// rawConvertible without deletion files
split = newDataSplit(true, dataFiles, null);
- assertThat(split.partialMergedRowCount()).isEqualTo(6000L);
- assertThat(split.mergedRowCountAvailable()).isEqualTo(true);
- assertThat(split.mergedRowCount()).isEqualTo(6000L);
+ assertThat(split.mergedRowCount()).hasValue(6000L);
// rawConvertible with deletion files without cardinality
ArrayList<DeletionFile> deletionFiles = new ArrayList<>();
@@ -82,8 +79,7 @@ public class DataSplitCompatibleTest {
deletionFiles.add(new DeletionFile("p", 1, 2, null));
deletionFiles.add(new DeletionFile("p", 1, 2, 100L));
split = newDataSplit(true, dataFiles, deletionFiles);
- assertThat(split.partialMergedRowCount()).isEqualTo(3900L);
- assertThat(split.mergedRowCountAvailable()).isEqualTo(false);
+ assertThat(split.mergedRowCount()).isEmpty();
// rawConvertible with deletion files with cardinality
deletionFiles = new ArrayList<>();
@@ -91,9 +87,7 @@ public class DataSplitCompatibleTest {
deletionFiles.add(new DeletionFile("p", 1, 2, 200L));
deletionFiles.add(new DeletionFile("p", 1, 2, 100L));
split = newDataSplit(true, dataFiles, deletionFiles);
- assertThat(split.partialMergedRowCount()).isEqualTo(5700L);
- assertThat(split.mergedRowCountAvailable()).isEqualTo(true);
- assertThat(split.mergedRowCount()).isEqualTo(5700L);
+ assertThat(split.mergedRowCount()).hasValue(5700L);
}
@Test
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java
index 606b383ab6..8e1ecfc955 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java
@@ -37,7 +37,6 @@ import org.apache.paimon.table.BucketSpec;
import org.apache.paimon.table.DataTable;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
-import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.utils.Projection;
@@ -66,6 +65,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.OptionalLong;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -353,15 +353,11 @@ public abstract class BaseDataTableSource extends
FlinkTableSource
.splits();
long countPushed = 0;
for (Split s : splits) {
- if (!(s instanceof DataSplit)) {
+ OptionalLong mergedRowCount = s.mergedRowCount();
+ if (!mergedRowCount.isPresent()) {
return false;
}
- DataSplit split = (DataSplit) s;
- if (!split.mergedRowCountAvailable()) {
- return false;
- }
-
- countPushed += split.mergedRowCount();
+ countPushed += mergedRowCount.getAsLong();
}
this.countPushed = countPushed;
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/aggregate/AggFuncEvaluator.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/aggregate/AggFuncEvaluator.scala
index 6282b214d4..1f737f602f 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/aggregate/AggFuncEvaluator.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/aggregate/AggFuncEvaluator.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.types.{DataType, LongType}
import org.apache.spark.unsafe.types.UTF8String
trait AggFuncEvaluator[T] {
+
def update(dataSplit: DataSplit): Unit
def result(): T
@@ -42,7 +43,7 @@ class CountStarEvaluator extends AggFuncEvaluator[Long] {
private var _result: Long = 0L
override def update(dataSplit: DataSplit): Unit = {
- _result += dataSplit.mergedRowCount()
+ _result += dataSplit.mergedRowCount().getAsLong
}
val a: Int = 1;
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/aggregate/AggregatePushDownUtils.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/aggregate/AggregatePushDownUtils.scala
index df6af4d483..52ce726c35 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/aggregate/AggregatePushDownUtils.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/aggregate/AggregatePushDownUtils.scala
@@ -70,7 +70,7 @@ object AggregatePushDownUtils {
}
val dataSplits = splits.map(_.asInstanceOf[DataSplit])
- if (!dataSplits.forall(_.mergedRowCountAvailable())) {
+ if (!dataSplits.forall(_.mergedRowCount().isPresent)) {
return None
}