This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 09ad28d0ab [spark] Handle NPE for pushdown aggregate when a datasplit has a null max/min value (#6611)
09ad28d0ab is described below

commit 09ad28d0abaaf79ddb6fd5110edb260d482612e5
Author: xieshuaihu <[email protected]>
AuthorDate: Sun Dec 28 11:59:41 2025 +0800

    [spark] Handle NPE for pushdown aggregate when a datasplit has a null max/min value (#6611)
---
 .../apache/paimon/spark/aggregate/AggFuncEvaluator.scala    |  6 ++++++
 .../apache/paimon/spark/sql/PushDownAggregatesTest.scala    | 13 +++++++++++++
 2 files changed, 19 insertions(+)

diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/aggregate/AggFuncEvaluator.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/aggregate/AggFuncEvaluator.scala
index fcb64e3064..6282b214d4 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/aggregate/AggFuncEvaluator.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/aggregate/AggFuncEvaluator.scala
@@ -59,6 +59,9 @@ case class MinEvaluator(idx: Int, dataField: DataField, evolutions: SimpleStatsE
 
   override def update(dataSplit: DataSplit): Unit = {
     val other = dataSplit.minValue(idx, dataField, evolutions)
+    if (other == null) {
+      return
+    }
     if (_result == null || CompareUtils.compareLiteral(dataField.`type`(), _result, other) > 0) {
       _result = other;
     }
@@ -80,6 +83,9 @@ case class MaxEvaluator(idx: Int, dataField: DataField, evolutions: SimpleStatsE
 
   override def update(dataSplit: DataSplit): Unit = {
     val other = dataSplit.maxValue(idx, dataField, evolutions)
+    if (other == null) {
+      return
+    }
     if (_result == null || CompareUtils.compareLiteral(dataField.`type`(), _result, other) < 0) {
       _result = other
     }
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PushDownAggregatesTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PushDownAggregatesTest.scala
index af99a67b24..766fac386f 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PushDownAggregatesTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PushDownAggregatesTest.scala
@@ -294,4 +294,17 @@ class PushDownAggregatesTest extends PaimonSparkTestBase with AdaptiveSparkPlanH
       Seq(Row(1, "t1"))
     )
   }
+
+  // https://github.com/apache/paimon/issues/6610
+  test("Push down aggregate: aggregate a column in one partition is all null and another is not") {
+    withTable("T") {
+      spark.sql("CREATE TABLE T (c1 INT, c2 LONG) PARTITIONED BY(day STRING)")
+
+      spark.sql("INSERT INTO T VALUES (1, 2, '2025-11-10')")
+      spark.sql("INSERT INTO T VALUES (null, 2, '2025-11-11')")
+
+      runAndCheckAggregate("SELECT MIN(c1) FROM T", Row(1) :: Nil, 0)
+      runAndCheckAggregate("SELECT MAX(c1) FROM T", Row(1) :: Nil, 0)
+    }
+  }
 }

Reply via email to