This is an automated email from the ASF dual-hosted git repository.

zhztheplayer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new b8c5711d67 [VL] Add metric abandonedPartialAggregation (#12106)
b8c5711d67 is described below

commit b8c5711d672c40ef3cc0717646074fa1bdeac46e
Author: Hongze Zhang <[email protected]>
AuthorDate: Mon May 25 16:27:40 2026 +0100

    [VL] Add metric abandonedPartialAggregation (#12106)
---
 .../java/org/apache/gluten/metrics/Metrics.java    |  4 ++++
 .../org/apache/gluten/metrics/OperatorMetrics.java |  3 +++
 .../gluten/backendsapi/velox/VeloxMetricsApi.scala |  3 +++
 .../metrics/HashAggregateMetricsUpdater.scala      |  2 ++
 .../org/apache/gluten/metrics/MetricsUtil.scala    |  3 +++
 .../gluten/execution/VeloxMetricsSuite.scala       | 27 +++++++++++++++++++++-
 cpp/core/jni/JniWrapper.cc                         |  3 ++-
 cpp/core/utils/Metrics.h                           |  1 +
 cpp/velox/compute/WholeStageResultIterator.cc      |  3 +++
 9 files changed, 47 insertions(+), 2 deletions(-)

diff --git a/backends-velox/src/main/java/org/apache/gluten/metrics/Metrics.java b/backends-velox/src/main/java/org/apache/gluten/metrics/Metrics.java
index 1bcfd67392..12f38d84e5 100644
--- a/backends-velox/src/main/java/org/apache/gluten/metrics/Metrics.java
+++ b/backends-velox/src/main/java/org/apache/gluten/metrics/Metrics.java
@@ -42,6 +42,7 @@ public class Metrics implements IMetrics {
   public long[] numReplacedWithDynamicFilterRows;
   public long[] numDynamicFilterInputRows;
   public long[] flushRowCount;
+  public long[] abandonedPartialAggregationRows;
   public long[] loadedToValueHook;
   public long[] bloomFilterBlocksByteSize;
   public long[] skippedSplits;
@@ -94,6 +95,7 @@ public class Metrics implements IMetrics {
       long[] numReplacedWithDynamicFilterRows,
       long[] numDynamicFilterInputRows,
       long[] flushRowCount,
+      long[] abandonedPartialAggregationRows,
       long[] loadedToValueHook,
       long[] bloomFilterBlocksByteSize,
       long[] scanTime,
@@ -140,6 +142,7 @@ public class Metrics implements IMetrics {
     this.numReplacedWithDynamicFilterRows = numReplacedWithDynamicFilterRows;
     this.numDynamicFilterInputRows = numDynamicFilterInputRows;
     this.flushRowCount = flushRowCount;
+    this.abandonedPartialAggregationRows = abandonedPartialAggregationRows;
     this.loadedToValueHook = loadedToValueHook;
     this.bloomFilterBlocksByteSize = bloomFilterBlocksByteSize;
     this.skippedSplits = skippedSplits;
@@ -192,6 +195,7 @@ public class Metrics implements IMetrics {
         numReplacedWithDynamicFilterRows[index],
         numDynamicFilterInputRows[index],
         flushRowCount[index],
+        abandonedPartialAggregationRows[index],
         loadedToValueHook[index],
         bloomFilterBlocksByteSize[index],
         scanTime[index],
diff --git a/backends-velox/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java b/backends-velox/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java
index d52245a334..18feed0b9e 100644
--- a/backends-velox/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java
+++ b/backends-velox/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java
@@ -40,6 +40,7 @@ public class OperatorMetrics implements IOperatorMetrics {
   public long numReplacedWithDynamicFilterRows;
   public long numDynamicFilterInputRows;
   public long flushRowCount;
+  public long abandonedPartialAggregationRows;
   public long loadedToValueHook;
   public long bloomFilterBlocksByteSize;
   public long skippedSplits;
@@ -87,6 +88,7 @@ public class OperatorMetrics implements IOperatorMetrics {
       long numReplacedWithDynamicFilterRows,
       long numDynamicFilterInputRows,
       long flushRowCount,
+      long abandonedPartialAggregationRows,
       long loadedToValueHook,
       long bloomFilterBlocksByteSize,
       long scanTime,
@@ -131,6 +133,7 @@ public class OperatorMetrics implements IOperatorMetrics {
     this.numReplacedWithDynamicFilterRows = numReplacedWithDynamicFilterRows;
     this.numDynamicFilterInputRows = numDynamicFilterInputRows;
     this.flushRowCount = flushRowCount;
+    this.abandonedPartialAggregationRows = abandonedPartialAggregationRows;
     this.loadedToValueHook = loadedToValueHook;
     this.bloomFilterBlocksByteSize = bloomFilterBlocksByteSize;
     this.skippedSplits = skippedSplits;
diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
index 69ed2f09a9..800fd758e8 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
@@ -311,6 +311,9 @@ class VeloxMetricsApi extends MetricsApi with Logging {
         "number of spilled partitions"),
       "aggSpilledFiles" -> SQLMetrics.createMetric(sparkContext, "number of 
spilled files"),
       "flushRowCount" -> SQLMetrics.createMetric(sparkContext, "number of 
flushed rows"),
+      "abandonedPartialAggregationRows" -> SQLMetrics.createMetric(
+        sparkContext,
+        "number of rows after partial aggregation abandonment"),
       "loadedToValueHook" -> SQLMetrics.createMetric(
         sparkContext,
         "number of pushdown aggregations"),
diff --git a/backends-velox/src/main/scala/org/apache/gluten/metrics/HashAggregateMetricsUpdater.scala b/backends-velox/src/main/scala/org/apache/gluten/metrics/HashAggregateMetricsUpdater.scala
index 3843a34b83..dcfa0d7cf3 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/metrics/HashAggregateMetricsUpdater.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/metrics/HashAggregateMetricsUpdater.scala
@@ -44,6 +44,7 @@ class HashAggregateMetricsUpdaterImpl(val metrics: Map[String, SQLMetric])
   val aggSpilledPartitions: SQLMetric = metrics("aggSpilledPartitions")
   val aggSpilledFiles: SQLMetric = metrics("aggSpilledFiles")
   val flushRowCount: SQLMetric = metrics("flushRowCount")
+  val abandonedPartialAggregationRows: SQLMetric = 
metrics("abandonedPartialAggregationRows")
   val loadedToValueHook: SQLMetric = metrics("loadedToValueHook")
 
   val rowConstructionCpuCount: SQLMetric = metrics("rowConstructionCpuCount")
@@ -81,6 +82,7 @@ class HashAggregateMetricsUpdaterImpl(val metrics: Map[String, SQLMetric])
     aggSpilledPartitions += aggMetrics.spilledPartitions
     aggSpilledFiles += aggMetrics.spilledFiles
     flushRowCount += aggMetrics.flushRowCount
+    abandonedPartialAggregationRows += 
aggMetrics.abandonedPartialAggregationRows
     loadedToValueHook += aggMetrics.loadedToValueHook
     idx += 1
 
diff --git a/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala b/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala
index f2c9214b06..24930ea153 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala
@@ -122,6 +122,7 @@ object MetricsUtil extends Logging {
     var numReplacedWithDynamicFilterRows: Long = 0
     var numDynamicFilterInputRows: Long = 0
     var flushRowCount: Long = 0
+    var abandonedPartialAggregationRows: Long = 0
     var loadedToValueHook: Long = 0
     var bloomFilterBlocksByteSize: Long = 0
     var scanTime: Long = 0
@@ -159,6 +160,7 @@ object MetricsUtil extends Logging {
       numReplacedWithDynamicFilterRows += 
metrics.numReplacedWithDynamicFilterRows
       numDynamicFilterInputRows += metrics.numDynamicFilterInputRows
       flushRowCount += metrics.flushRowCount
+      abandonedPartialAggregationRows += 
metrics.abandonedPartialAggregationRows
       loadedToValueHook += metrics.loadedToValueHook
       bloomFilterBlocksByteSize += metrics.bloomFilterBlocksByteSize
       scanTime += metrics.scanTime
@@ -203,6 +205,7 @@ object MetricsUtil extends Logging {
       numReplacedWithDynamicFilterRows,
       numDynamicFilterInputRows,
       flushRowCount,
+      abandonedPartialAggregationRows,
       loadedToValueHook,
       bloomFilterBlocksByteSize,
       scanTime,
diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxMetricsSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxMetricsSuite.scala
index 35edc4fa6e..861729e349 100644
--- a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxMetricsSuite.scala
+++ b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxMetricsSuite.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.gluten.execution
 
-import org.apache.gluten.config.GlutenConfig
+import org.apache.gluten.config.{GlutenConfig, VeloxConfig}
 import org.apache.gluten.sql.shims.SparkShimLoader
 
 import org.apache.spark.SparkConf
@@ -189,6 +189,31 @@ class VeloxMetricsSuite extends VeloxWholeStageTransformerSuite with AdaptiveSpa
     }
   }
 
+  test("Hash aggregate metrics include abandoned partial aggregation rows") {
+    withSQLConf(
+      GlutenConfig.COLUMNAR_MAX_BATCH_SIZE.key -> "10",
+      VeloxConfig.ABANDON_PARTIAL_AGGREGATION_MIN_ROWS.key -> "0",
+      VeloxConfig.ABANDON_PARTIAL_AGGREGATION_MIN_PCT.key -> "0"
+    ) {
+      runQueryAndCompare("SELECT c2, sum(c1) FROM metrics_t1 GROUP BY c2") {
+        df =>
+          val aggregates = collect(df.queryExecution.executedPlan) {
+            case agg: HashAggregateExecBaseTransformer => agg
+          }
+          assert(aggregates.nonEmpty)
+          val numTotalAbandonedPartialAggregationRows = aggregates.map {
+            agg =>
+              val metrics = agg.metrics
+              assert(metrics.contains("abandonedPartialAggregationRows"))
+              val num = metrics("abandonedPartialAggregationRows").value
+              assert(num >= 0)
+              num
+          }.sum
+          assert(numTotalAbandonedPartialAggregationRows > 0)
+      }
+    }
+  }
+
   test("Metrics of noop filter's children") {
     runQueryAndCompare("SELECT c1, c2 FROM metrics_t1 where c1 < 50") {
       df =>
diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc
index 46b9d7603c..a726d3be91 100644
--- a/cpp/core/jni/JniWrapper.cc
+++ b/cpp/core/jni/JniWrapper.cc
@@ -277,7 +277,7 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) {
       env,
       metricsBuilderClass,
       "<init>",
-      
"([J[J[J[J[J[J[J[J[J[JJ[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[JLjava/lang/String;)V");
+      
"([J[J[J[J[J[J[J[J[J[JJ[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[JLjava/lang/String;)V");
 
   nativeColumnarToRowInfoClass =
       createGlobalClassReferenceOrError(env, 
"Lorg/apache/gluten/vectorized/NativeColumnarToRowInfo;");
@@ -595,6 +595,7 @@ JNIEXPORT jobject JNICALL Java_org_apache_gluten_metrics_IteratorMetricsJniWrapp
       longArray[Metrics::kNumReplacedWithDynamicFilterRows],
       longArray[Metrics::kNumDynamicFilterInputRows],
       longArray[Metrics::kFlushRowCount],
+      longArray[Metrics::kAbandonedPartialAggregationRows],
       longArray[Metrics::kLoadedToValueHook],
       longArray[Metrics::kBloomFilterBlocksByteSize],
       longArray[Metrics::kScanTime],
diff --git a/cpp/core/utils/Metrics.h b/cpp/core/utils/Metrics.h
index 55f2bb4ef5..67c0b485c7 100644
--- a/cpp/core/utils/Metrics.h
+++ b/cpp/core/utils/Metrics.h
@@ -69,6 +69,7 @@ struct Metrics {
     kNumReplacedWithDynamicFilterRows,
     kNumDynamicFilterInputRows,
     kFlushRowCount,
+    kAbandonedPartialAggregationRows,
     kLoadedToValueHook,
     kBloomFilterBlocksByteSize,
     kScanTime,
diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc
index ccc1917f41..2b957ce54d 100644
--- a/cpp/velox/compute/WholeStageResultIterator.cc
+++ b/cpp/velox/compute/WholeStageResultIterator.cc
@@ -44,6 +44,7 @@ const std::string kDynamicFiltersAccepted = "dynamicFiltersAccepted";
 const std::string kReplacedWithDynamicFilterRows = 
"replacedWithDynamicFilterRows";
 const std::string kDynamicFilterInputRows = "dynamicFilterInputRows";
 const std::string kFlushRowCount = "flushRowCount";
+const std::string kAbandonedPartialAggregationRows = 
"abandonedPartialAggregationRows";
 const std::string kLoadedToValueHook = "loadedToValueHook";
 const std::string kBloomFilterBlocksByteSize = "bloomFilterSize";
 const std::string kTotalScanTime = "totalScanTime";
@@ -504,6 +505,8 @@ void WholeStageResultIterator::collectMetrics() {
       metrics_->get(Metrics::kNumDynamicFilterInputRows)[metricIndex] =
           runtimeMetric("sum", second->customStats, kDynamicFilterInputRows);
       metrics_->get(Metrics::kFlushRowCount)[metricIndex] = 
runtimeMetric("sum", second->customStats, kFlushRowCount);
+      metrics_->get(Metrics::kAbandonedPartialAggregationRows)[metricIndex] =
+          runtimeMetric("sum", second->customStats, 
kAbandonedPartialAggregationRows);
       metrics_->get(Metrics::kLoadedToValueHook)[metricIndex] =
           runtimeMetric("sum", second->customStats, kLoadedToValueHook);
       metrics_->get(Metrics::kBloomFilterBlocksByteSize)[metricIndex] =


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to