This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 09ad28d0ab [spark] Handle NPE for pushdown aggregate when a datasplit has a null max/min value (#6611)
09ad28d0ab is described below
commit 09ad28d0abaaf79ddb6fd5110edb260d482612e5
Author: xieshuaihu <[email protected]>
AuthorDate: Sun Dec 28 11:59:41 2025 +0800
[spark] Handle NPE for pushdown aggregate when a datasplit has a null max/min value (#6611)
---
.../apache/paimon/spark/aggregate/AggFuncEvaluator.scala | 6 ++++++
.../apache/paimon/spark/sql/PushDownAggregatesTest.scala | 13 +++++++++++++
2 files changed, 19 insertions(+)
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/aggregate/AggFuncEvaluator.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/aggregate/AggFuncEvaluator.scala
index fcb64e3064..6282b214d4 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/aggregate/AggFuncEvaluator.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/aggregate/AggFuncEvaluator.scala
@@ -59,6 +59,9 @@ case class MinEvaluator(idx: Int, dataField: DataField, evolutions: SimpleStatsE
override def update(dataSplit: DataSplit): Unit = {
val other = dataSplit.minValue(idx, dataField, evolutions)
+ if (other == null) {
+ return
+ }
if (_result == null || CompareUtils.compareLiteral(dataField.`type`(), _result, other) > 0) {
_result = other;
}
@@ -80,6 +83,9 @@ case class MaxEvaluator(idx: Int, dataField: DataField, evolutions: SimpleStatsE
override def update(dataSplit: DataSplit): Unit = {
val other = dataSplit.maxValue(idx, dataField, evolutions)
+ if (other == null) {
+ return
+ }
if (_result == null || CompareUtils.compareLiteral(dataField.`type`(), _result, other) < 0) {
_result = other
}
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PushDownAggregatesTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PushDownAggregatesTest.scala
index af99a67b24..766fac386f 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PushDownAggregatesTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PushDownAggregatesTest.scala
@@ -294,4 +294,17 @@ class PushDownAggregatesTest extends PaimonSparkTestBase with AdaptiveSparkPlanH
Seq(Row(1, "t1"))
)
}
+
+ // https://github.com/apache/paimon/issues/6610
+ test("Push down aggregate: aggregate a column in one partition is all null
and another is not") {
+ withTable("T") {
+ spark.sql("CREATE TABLE T (c1 INT, c2 LONG) PARTITIONED BY(day STRING)")
+
+ spark.sql("INSERT INTO T VALUES (1, 2, '2025-11-10')")
+ spark.sql("INSERT INTO T VALUES (null, 2, '2025-11-11')")
+
+ runAndCheckAggregate("SELECT MIN(c1) FROM T", Row(1) :: Nil, 0)
+ runAndCheckAggregate("SELECT MAX(c1) FROM T", Row(1) :: Nil, 0)
+ }
+ }
}