This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-1.3
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/release-1.3 by this push:
new 2e11817b77 [spark] Fix group by partial partition of a multi partition table (#6375)
2e11817b77 is described below
commit 2e11817b77704d447ca9da0cdbaebca6021d8e87
Author: Zouxxyy <[email protected]>
AuthorDate: Fri Oct 10 13:41:04 2025 +0800
[spark] Fix group by partial partition of a multi partition table (#6375)
---
.../paimon/spark/aggregate/LocalAggregator.scala | 15 ++++++------
.../paimon/spark/sql/PushDownAggregatesTest.scala | 27 ++++++++++++++++++++++
2 files changed, 35 insertions(+), 7 deletions(-)
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/aggregate/LocalAggregator.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/aggregate/LocalAggregator.scala
index fb401c78e1..ae62c37c4f 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/aggregate/LocalAggregator.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/aggregate/LocalAggregator.scala
@@ -24,7 +24,8 @@ import org.apache.paimon.spark.data.SparkInternalRow
import org.apache.paimon.stats.SimpleStatsEvolutions
import org.apache.paimon.table.FileStoreTable
import org.apache.paimon.table.source.DataSplit
-import org.apache.paimon.utils.{InternalRowUtils, ProjectedRow}
+import org.apache.paimon.types.RowType
+import org.apache.paimon.utils.ProjectedRow
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.JoinedRow
@@ -40,7 +41,8 @@ class LocalAggregator(table: FileStoreTable) {
private val partitionType = SparkTypeUtils.toPartitionType(table)
private val groupByEvaluatorMap = new mutable.HashMap[InternalRow, Seq[AggFuncEvaluator[_]]]()
private var requiredGroupByType: Seq[DataType] = _
- private var requiredGroupByIndexMapping: Seq[Int] = _
+ private var requiredGroupByIndexMapping: Array[Int] = _
+ private var requiredGroupByPaimonType: RowType = _
private var aggFuncEvaluatorGetter: () => Seq[AggFuncEvaluator[_]] = _
private var isInitialized = false
private lazy val simpleStatsEvolutions = {
@@ -78,15 +80,14 @@ class LocalAggregator(table: FileStoreTable) {
partitionType.getFieldIndex(r.fieldNames().head)
}
+ requiredGroupByPaimonType = partitionType.project(requiredGroupByIndexMapping)
+
isInitialized = true
}
private def requiredGroupByRow(partitionRow: BinaryRow): InternalRow = {
- val projectedRow =
-   ProjectedRow.from(requiredGroupByIndexMapping.toArray).replaceRow(partitionRow)
- // `ProjectedRow` does not support `hashCode`, so do a deep copy
- val genericRow = InternalRowUtils.copyInternalRow(projectedRow, partitionType)
- SparkInternalRow.create(partitionType).replace(genericRow)
+ val projectedRow = ProjectedRow.from(requiredGroupByIndexMapping).replaceRow(partitionRow)
+ SparkInternalRow.create(requiredGroupByPaimonType).replace(projectedRow)
}
def update(dataSplit: DataSplit): Unit = {
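For context on the change above (illustrative, not part of the commit): the old requiredGroupByRow materialized the projected partition row against the full partitionType, which breaks once the GROUP BY keys are only a subset of the partition columns, because the projected row then has fewer fields than that type. Below is a minimal Scala sketch of the arity mismatch, using Paimon's ProjectedRow and RowType.project as the diff does; the concrete types, index mapping, and values are assumptions mirroring the new test.

    import org.apache.paimon.data.{BinaryString, GenericRow}
    import org.apache.paimon.types.{DataTypes, RowType}
    import org.apache.paimon.utils.ProjectedRow

    // Two partition columns (c5 DATE, c1 STRING), grouped by c1 only.
    val partitionType = RowType.of(DataTypes.DATE(), DataTypes.STRING())
    val indexMapping = Array(1) // c1 is field 1 of the partition row

    // 20089 is 2025-01-01 in epoch days.
    val partitionRow =
      GenericRow.of(Integer.valueOf(20089), BinaryString.fromString("t1"))
    val projectedRow = ProjectedRow.from(indexMapping).replaceRow(partitionRow)

    // The projected row has 1 field while the full partition type has 2, so a
    // field-by-field copy against partitionType over-reads the projection.
    // Projecting the type as well, as the fix does, keeps row and type in sync:
    val projectedType = partitionType.project(indexMapping)
    assert(projectedType.getFieldCount == projectedRow.getFieldCount)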
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PushDownAggregatesTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PushDownAggregatesTest.scala
index a60d88aef9..af99a67b24 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PushDownAggregatesTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PushDownAggregatesTest.scala
@@ -267,4 +267,31 @@ class PushDownAggregatesTest extends PaimonSparkTestBase with AdaptiveSparkPlanH
})
})
}
+
+ test("Push down aggregate: group by partial partition of a multi partition
table") {
+ sql(s"""
+ |CREATE TABLE T (
+ |c1 STRING,
+ |c2 STRING,
+ |c3 STRING,
+ |c4 STRING,
+ |c5 DATE)
+ |PARTITIONED BY (c5, c1)
+ |TBLPROPERTIES ('primary-key' = 'c5, c1, c3')
+ |""".stripMargin)
+
+ sql("INSERT INTO T VALUES ('t1', 'k1', 'v1', 'r1', '2025-01-01')")
+ checkAnswer(
+ sql("SELECT COUNT(*) FROM T GROUP BY c1"),
+ Seq(Row(1))
+ )
+ checkAnswer(
+ sql("SELECT c1, COUNT(*) FROM T GROUP BY c1"),
+ Seq(Row("t1", 1))
+ )
+ checkAnswer(
+ sql("SELECT COUNT(*), c1 FROM T GROUP BY c1"),
+ Seq(Row(1, "t1"))
+ )
+ }
}
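As a follow-on illustration of the fixed path against the test above (a plain String key and running count stand in for the SparkInternalRow key and the AggFuncEvaluator state; again an assumption-laden sketch, not the commit's code):

    import scala.collection.mutable

    // Reusing indexMapping and partitionRow from the sketch above: one
    // partition row arrives per (c5, c1) partition, and the aggregator
    // buckets it by the projected group-by key.
    val counts = mutable.HashMap[String, Long]()
    val key = ProjectedRow
      .from(indexMapping)
      .replaceRow(partitionRow)
      .getString(0) // c1
      .toString
    counts.update(key, counts.getOrElse(key, 0L) + 1L)
    // counts == Map("t1" -> 1), matching Row("t1", 1) in the test.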