This is an automated email from the ASF dual-hosted git repository.

ulyssesyou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new cfa9afa58 [GLUTEN-4835][CORE] Match metric names with Spark (#4834)
cfa9afa58 is described below

commit cfa9afa58ce17ff001282a82f632100d941a0322
Author: Chungmin Lee <l...@chungmin.dev>
AuthorDate: Mon Mar 11 02:21:58 2024 -0700

    [GLUTEN-4835][CORE] Match metric names with Spark (#4834)
---
 .../backendsapi/clickhouse/CHMetricsApi.scala      | 52 +++++++++++-----------
 .../metrics/BatchScanMetricsUpdater.scala          |  6 +--
 .../metrics/ExpandMetricsUpdater.scala             |  4 +-
 .../metrics/FileSourceScanMetricsUpdater.scala     |  6 +--
 .../metrics/FilterMetricsUpdater.scala             |  4 +-
 .../metrics/GenerateMetricsUpdater.scala           |  4 +-
 .../metrics/HashAggregateMetricsUpdater.scala      |  4 +-
 .../metrics/HashJoinMetricsUpdater.scala           |  4 +-
 .../metrics/HiveTableScanMetricsUpdater.scala      |  4 +-
 .../metrics/InputIteratorMetricsUpdater.scala      |  4 +-
 .../metrics/ProjectMetricsUpdater.scala            |  4 +-
 .../glutenproject/metrics/SortMetricsUpdater.scala |  4 +-
 .../metrics/WindowMetricsUpdater.scala             |  4 +-
 .../GlutenClickHouseTPCHBucketSuite.scala          | 18 ++++----
 ...ckHouseTPCHColumnarShuffleParquetAQESuite.scala | 44 +++++++++---------
 .../GlutenClickHouseTPCHParquetBucketSuite.scala   | 18 ++++----
 .../GlutenClickHouseTPCDSMetricsSuite.scala        | 12 ++---
 .../metrics/GlutenClickHouseTPCHMetricsSuite.scala | 36 +++++++--------
 .../benchmarks/CHParquetReadBenchmark.scala        |  2 +-
 .../backendsapi/velox/MetricsApiImpl.scala         | 20 ++++-----
 .../execution/BasicScanExecTransformer.scala       |  2 +-
 .../metrics/BatchScanMetricsUpdater.scala          |  4 +-
 .../metrics/ExpandMetricsUpdater.scala             |  2 +-
 .../metrics/FileSourceScanMetricsUpdater.scala     |  2 +-
 .../metrics/FilterMetricsUpdater.scala             |  2 +-
 .../metrics/HiveTableScanMetricsUpdater.scala      |  2 +-
 .../metrics/InputIteratorMetricsUpdater.scala      |  4 +-
 .../metrics/LimitMetricsUpdater.scala              |  2 +-
 .../metrics/ProjectMetricsUpdater.scala            |  2 +-
 .../glutenproject/metrics/SortMetricsUpdater.scala |  2 +-
 .../metrics/WindowMetricsUpdater.scala             |  2 +-
 .../org/apache/spark/sql/GlutenSQLQuerySuite.scala | 26 +++++++++++
 32 files changed, 165 insertions(+), 141 deletions(-)

diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHMetricsApi.scala b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHMetricsApi.scala
index 838612036..488686e93 100644
--- a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHMetricsApi.scala
+++ b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHMetricsApi.scala
@@ -44,8 +44,8 @@ class CHMetricsApi extends MetricsApi with Logging with 
LogLevelUtil {
       "iterReadTime" -> SQLMetrics.createTimingMetric(
         sparkContext,
         "time of reading from iterator"),
-      "inputRows" -> SQLMetrics.createMetric(sparkContext, "number of input 
rows"),
-      "outputRows" -> SQLMetrics.createMetric(sparkContext, "number of output 
rows"),
+      "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input 
rows"),
+      "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of 
output rows"),
       "fillingRightJoinSideTime" -> SQLMetrics.createTimingMetric(
         sparkContext,
         "filling right join side time")
@@ -59,12 +59,12 @@ class CHMetricsApi extends MetricsApi with Logging with 
LogLevelUtil {
 
   override def genBatchScanTransformerMetrics(sparkContext: SparkContext): 
Map[String, SQLMetric] =
     Map(
-      "inputRows" -> SQLMetrics.createMetric(sparkContext, "number of input 
rows"),
+      "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input 
rows"),
       "inputVectors" -> SQLMetrics.createMetric(sparkContext, "number of input 
vectors"),
       "inputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of 
input bytes"),
       "rawInputRows" -> SQLMetrics.createMetric(sparkContext, "number of raw 
input rows"),
       "rawInputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of 
raw input bytes"),
-      "outputRows" -> SQLMetrics.createMetric(sparkContext, "number of output 
rows"),
+      "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of 
output rows"),
       "outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of 
output vectors"),
       "outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of 
output bytes"),
       "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"),
@@ -79,12 +79,12 @@ class CHMetricsApi extends MetricsApi with Logging with 
LogLevelUtil {
   override def genHiveTableScanTransformerMetrics(
       sparkContext: SparkContext): Map[String, SQLMetric] =
     Map(
-      "inputRows" -> SQLMetrics.createMetric(sparkContext, "number of input 
rows"),
+      "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input 
rows"),
       "inputVectors" -> SQLMetrics.createMetric(sparkContext, "number of input 
vectors"),
       "inputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of 
input bytes"),
       "rawInputRows" -> SQLMetrics.createMetric(sparkContext, "number of raw 
input rows"),
       "rawInputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of 
raw input bytes"),
-      "outputRows" -> SQLMetrics.createMetric(sparkContext, "number of output 
rows"),
+      "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of 
output rows"),
       "outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of 
output vectors"),
       "outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of 
output bytes"),
       "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"),
@@ -107,12 +107,12 @@ class CHMetricsApi extends MetricsApi with Logging with 
LogLevelUtil {
   override def genFileSourceScanTransformerMetrics(
       sparkContext: SparkContext): Map[String, SQLMetric] =
     Map(
-      "inputRows" -> SQLMetrics.createMetric(sparkContext, "number of input 
rows"),
+      "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input 
rows"),
       "inputVectors" -> SQLMetrics.createMetric(sparkContext, "number of input 
vectors"),
       "inputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of 
input bytes"),
       "rawInputRows" -> SQLMetrics.createMetric(sparkContext, "number of raw 
input rows"),
       "rawInputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of 
raw input bytes"),
-      "outputRows" -> SQLMetrics.createMetric(sparkContext, "number of output 
rows"),
+      "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of 
output rows"),
       "outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of 
output vectors"),
       "outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of 
output bytes"),
       "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"),
@@ -133,10 +133,10 @@ class CHMetricsApi extends MetricsApi with Logging with 
LogLevelUtil {
 
   override def genFilterTransformerMetrics(sparkContext: SparkContext): 
Map[String, SQLMetric] =
     Map(
-      "outputRows" -> SQLMetrics.createMetric(sparkContext, "number of output 
rows"),
+      "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of 
output rows"),
       "outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of 
output vectors"),
       "outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of 
output bytes"),
-      "inputRows" -> SQLMetrics.createMetric(sparkContext, "number of input 
rows"),
+      "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input 
rows"),
       "inputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of 
input bytes"),
       "extraTime" -> SQLMetrics.createTimingMetric(sparkContext, "extra 
operators time"),
       "inputWaitTime" -> SQLMetrics.createTimingMetric(sparkContext, "time of 
waiting for data"),
@@ -149,10 +149,10 @@ class CHMetricsApi extends MetricsApi with Logging with 
LogLevelUtil {
 
   override def genProjectTransformerMetrics(sparkContext: SparkContext): 
Map[String, SQLMetric] =
     Map(
-      "outputRows" -> SQLMetrics.createMetric(sparkContext, "number of output 
rows"),
+      "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of 
output rows"),
       "outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of 
output vectors"),
       "outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of 
output bytes"),
-      "inputRows" -> SQLMetrics.createMetric(sparkContext, "number of input 
rows"),
+      "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input 
rows"),
       "inputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of 
input bytes"),
       "extraTime" -> SQLMetrics.createTimingMetric(sparkContext, "extra 
operators time"),
       "inputWaitTime" -> SQLMetrics.createTimingMetric(sparkContext, "time of 
waiting for data"),
@@ -166,10 +166,10 @@ class CHMetricsApi extends MetricsApi with Logging with 
LogLevelUtil {
   override def genHashAggregateTransformerMetrics(
       sparkContext: SparkContext): Map[String, SQLMetric] =
     Map(
-      "outputRows" -> SQLMetrics.createMetric(sparkContext, "number of output 
rows"),
+      "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of 
output rows"),
       "outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of 
output vectors"),
       "outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of 
output bytes"),
-      "inputRows" -> SQLMetrics.createMetric(sparkContext, "number of input 
rows"),
+      "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input 
rows"),
       "inputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of 
input bytes"),
       "extraTime" -> SQLMetrics.createTimingMetric(sparkContext, "extra 
operators time"),
       "inputWaitTime" -> SQLMetrics.createTimingMetric(sparkContext, "time of 
waiting for data"),
@@ -187,10 +187,10 @@ class CHMetricsApi extends MetricsApi with Logging with 
LogLevelUtil {
 
   override def genExpandTransformerMetrics(sparkContext: SparkContext): 
Map[String, SQLMetric] =
     Map(
-      "outputRows" -> SQLMetrics.createMetric(sparkContext, "number of output 
rows"),
+      "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of 
output rows"),
       "outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of 
output vectors"),
       "outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of 
output bytes"),
-      "inputRows" -> SQLMetrics.createMetric(sparkContext, "number of input 
rows"),
+      "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input 
rows"),
       "inputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of 
input bytes"),
       "extraTime" -> SQLMetrics.createTimingMetric(sparkContext, "extra 
operators time"),
       "inputWaitTime" -> SQLMetrics.createTimingMetric(sparkContext, "time of 
waiting for data"),
@@ -233,10 +233,10 @@ class CHMetricsApi extends MetricsApi with Logging with 
LogLevelUtil {
 
   override def genWindowTransformerMetrics(sparkContext: SparkContext): 
Map[String, SQLMetric] =
     Map(
-      "outputRows" -> SQLMetrics.createMetric(sparkContext, "number of output 
rows"),
+      "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of 
output rows"),
       "outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of 
output vectors"),
       "outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of 
output bytes"),
-      "inputRows" -> SQLMetrics.createMetric(sparkContext, "number of input 
rows"),
+      "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input 
rows"),
       "inputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of 
input bytes"),
       "extraTime" -> SQLMetrics.createTimingMetric(sparkContext, "extra 
operators time"),
       "inputWaitTime" -> SQLMetrics.createTimingMetric(sparkContext, "time of 
waiting for data"),
@@ -263,10 +263,10 @@ class CHMetricsApi extends MetricsApi with Logging with 
LogLevelUtil {
 
   override def genLimitTransformerMetrics(sparkContext: SparkContext): 
Map[String, SQLMetric] =
     Map(
-      "outputRows" -> SQLMetrics.createMetric(sparkContext, "number of output 
rows"),
+      "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of 
output rows"),
       "outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of 
output vectors"),
       "outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of 
output bytes"),
-      "inputRows" -> SQLMetrics.createMetric(sparkContext, "number of input 
rows"),
+      "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input 
rows"),
       "inputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of 
input bytes"),
       "extraTime" -> SQLMetrics.createTimingMetric(sparkContext, "extra 
operators time"),
       "inputWaitTime" -> SQLMetrics.createTimingMetric(sparkContext, "time of 
waiting for data"),
@@ -279,10 +279,10 @@ class CHMetricsApi extends MetricsApi with Logging with 
LogLevelUtil {
 
   override def genSortTransformerMetrics(sparkContext: SparkContext): 
Map[String, SQLMetric] =
     Map(
-      "outputRows" -> SQLMetrics.createMetric(sparkContext, "number of output 
rows"),
+      "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of 
output rows"),
       "outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of 
output vectors"),
       "outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of 
output bytes"),
-      "inputRows" -> SQLMetrics.createMetric(sparkContext, "number of input 
rows"),
+      "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input 
rows"),
       "inputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of 
input bytes"),
       "extraTime" -> SQLMetrics.createTimingMetric(sparkContext, "extra 
operators time"),
       "inputWaitTime" -> SQLMetrics.createTimingMetric(sparkContext, "time of 
waiting for data"),
@@ -319,10 +319,10 @@ class CHMetricsApi extends MetricsApi with Logging with 
LogLevelUtil {
 
   override def genHashJoinTransformerMetrics(sparkContext: SparkContext): 
Map[String, SQLMetric] =
     Map(
-      "outputRows" -> SQLMetrics.createMetric(sparkContext, "number of output 
rows"),
+      "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of 
output rows"),
       "outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of 
output vectors"),
       "outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of 
output bytes"),
-      "inputRows" -> SQLMetrics.createMetric(sparkContext, "number of input 
rows"),
+      "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input 
rows"),
       "inputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of 
input bytes"),
       "extraTime" -> SQLMetrics.createTimingMetric(sparkContext, "extra 
operators time"),
       "inputWaitTime" -> SQLMetrics.createTimingMetric(sparkContext, "time of 
waiting for data"),
@@ -358,10 +358,10 @@ class CHMetricsApi extends MetricsApi with Logging with 
LogLevelUtil {
   }
   override def genGenerateTransformerMetrics(sparkContext: SparkContext): 
Map[String, SQLMetric] =
     Map(
-      "outputRows" -> SQLMetrics.createMetric(sparkContext, "number of output 
rows"),
+      "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of 
output rows"),
       "outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of 
output vectors"),
       "outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of 
output bytes"),
-      "inputRows" -> SQLMetrics.createMetric(sparkContext, "number of input 
rows"),
+      "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input 
rows"),
       "inputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of 
input bytes"),
       "extraTime" -> SQLMetrics.createTimingMetric(sparkContext, "extra 
operators time"),
       "inputWaitTime" -> SQLMetrics.createTimingMetric(sparkContext, "time of 
waiting for data"),
diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/BatchScanMetricsUpdater.scala b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/BatchScanMetricsUpdater.scala
index 5cd44a508..d173f1715 100644
--- a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/BatchScanMetricsUpdater.scala
+++ b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/BatchScanMetricsUpdater.scala
@@ -23,10 +23,10 @@ class BatchScanMetricsUpdater(@transient val metrics: 
Map[String, SQLMetric])
   extends MetricsUpdater {
 
   val scanTime: SQLMetric = metrics("scanTime")
-  val outputRows: SQLMetric = metrics("outputRows")
+  val outputRows: SQLMetric = metrics("numOutputRows")
   val outputVectors: SQLMetric = metrics("outputVectors")
   val outputBytes: SQLMetric = metrics("outputBytes")
-  val inputRows: SQLMetric = metrics("inputRows")
+  val inputRows: SQLMetric = metrics("numInputRows")
   val inputBytes: SQLMetric = metrics("inputBytes")
   val extraTime: SQLMetric = metrics("extraTime")
   val inputWaitTime: SQLMetric = metrics("inputWaitTime")
@@ -34,7 +34,7 @@ class BatchScanMetricsUpdater(@transient val metrics: 
Map[String, SQLMetric])
 
   override def updateInputMetrics(inputMetrics: InputMetricsWrapper): Unit = {
     // inputMetrics.bridgeIncBytesRead(metrics("inputBytes").value)
-    // inputMetrics.bridgeIncRecordsRead(metrics("inputRows").value)
+    // inputMetrics.bridgeIncRecordsRead(metrics("numInputRows").value)
   }
 
   override def updateNativeMetrics(opMetrics: IOperatorMetrics): Unit = {
diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/ExpandMetricsUpdater.scala b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/ExpandMetricsUpdater.scala
index 0aa62e875..d43cbcbc6 100644
--- a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/ExpandMetricsUpdater.scala
+++ b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/ExpandMetricsUpdater.scala
@@ -33,9 +33,9 @@ class ExpandMetricsUpdater(val metrics: Map[String, 
SQLMetric]) extends MetricsU
         MetricsUtil.updateExtraTimeMetric(
           metricsData,
           metrics("extraTime"),
-          metrics("outputRows"),
+          metrics("numOutputRows"),
           metrics("outputBytes"),
-          metrics("inputRows"),
+          metrics("numInputRows"),
           metrics("inputBytes"),
           ExpandMetricsUpdater.INCLUDING_PROCESSORS,
           ExpandMetricsUpdater.CH_PLAN_NODE_NAME
diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/FileSourceScanMetricsUpdater.scala b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/FileSourceScanMetricsUpdater.scala
index 7985efbf0..1c6da8dad 100644
--- a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/FileSourceScanMetricsUpdater.scala
+++ b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/FileSourceScanMetricsUpdater.scala
@@ -27,10 +27,10 @@ class FileSourceScanMetricsUpdater(@transient val metrics: 
Map[String, SQLMetric
   extends MetricsUpdater {
 
   val scanTime: SQLMetric = metrics("scanTime")
-  val outputRows: SQLMetric = metrics("outputRows")
+  val outputRows: SQLMetric = metrics("numOutputRows")
   val outputVectors: SQLMetric = metrics("outputVectors")
   val outputBytes: SQLMetric = metrics("outputBytes")
-  val inputRows: SQLMetric = metrics("inputRows")
+  val inputRows: SQLMetric = metrics("numInputRows")
   val inputBytes: SQLMetric = metrics("inputBytes")
   val extraTime: SQLMetric = metrics("extraTime")
   val inputWaitTime: SQLMetric = metrics("inputWaitTime")
@@ -38,7 +38,7 @@ class FileSourceScanMetricsUpdater(@transient val metrics: 
Map[String, SQLMetric
 
   override def updateInputMetrics(inputMetrics: InputMetricsWrapper): Unit = {
     // inputMetrics.bridgeIncBytesRead(metrics("inputBytes").value)
-    // inputMetrics.bridgeIncRecordsRead(metrics("inputRows").value)
+    // inputMetrics.bridgeIncRecordsRead(metrics("numInputRows").value)
   }
 
   override def updateNativeMetrics(opMetrics: IOperatorMetrics): Unit = {
diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/FilterMetricsUpdater.scala b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/FilterMetricsUpdater.scala
index 1a2b77f35..b44a9e382 100644
--- a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/FilterMetricsUpdater.scala
+++ b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/FilterMetricsUpdater.scala
@@ -33,9 +33,9 @@ class FilterMetricsUpdater(val metrics: Map[String, 
SQLMetric]) extends MetricsU
         MetricsUtil.updateExtraTimeMetric(
           metricsData,
           metrics("extraTime"),
-          metrics("outputRows"),
+          metrics("numOutputRows"),
           metrics("outputBytes"),
-          metrics("inputRows"),
+          metrics("numInputRows"),
           metrics("inputBytes"),
           FilterMetricsUpdater.INCLUDING_PROCESSORS,
           FilterMetricsUpdater.INCLUDING_PROCESSORS
diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/GenerateMetricsUpdater.scala b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/GenerateMetricsUpdater.scala
index 166ba5d7e..5f2e3f63c 100644
--- a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/GenerateMetricsUpdater.scala
+++ b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/GenerateMetricsUpdater.scala
@@ -33,9 +33,9 @@ class GenerateMetricsUpdater(val metrics: Map[String, 
SQLMetric]) extends Metric
         MetricsUtil.updateExtraTimeMetric(
           metricsData,
           metrics("extraTime"),
-          metrics("outputRows"),
+          metrics("numOutputRows"),
           metrics("outputBytes"),
-          metrics("inputRows"),
+          metrics("numInputRows"),
           metrics("inputBytes"),
           GenerateMetricsUpdater.INCLUDING_PROCESSORS,
           GenerateMetricsUpdater.CH_PLAN_NODE_NAME
diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/HashAggregateMetricsUpdater.scala b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/HashAggregateMetricsUpdater.scala
index ac55b8d17..8f6e07a94 100644
--- a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/HashAggregateMetricsUpdater.scala
+++ b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/HashAggregateMetricsUpdater.scala
@@ -44,9 +44,9 @@ class HashAggregateMetricsUpdater(val metrics: Map[String, 
SQLMetric])
           MetricsUtil.updateExtraTimeMetric(
             aggMetricsData,
             metrics("extraTime"),
-            metrics("outputRows"),
+            metrics("numOutputRows"),
             metrics("outputBytes"),
-            metrics("inputRows"),
+            metrics("numInputRows"),
             metrics("inputBytes"),
             HashAggregateMetricsUpdater.INCLUDING_PROCESSORS,
             HashAggregateMetricsUpdater.CH_PLAN_NODE_NAME
diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/HashJoinMetricsUpdater.scala b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/HashJoinMetricsUpdater.scala
index 522069aef..180bcc034 100644
--- a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/HashJoinMetricsUpdater.scala
+++ b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/HashJoinMetricsUpdater.scala
@@ -83,9 +83,9 @@ class HashJoinMetricsUpdater(val metrics: Map[String, 
SQLMetric])
                   metrics("extraTime") += (processor.time / 1000L).toLong
                 }
                 if 
(HashJoinMetricsUpdater.CH_PLAN_NODE_NAME.contains(processor.name)) {
-                  metrics("outputRows") += processor.outputRows
+                  metrics("numOutputRows") += processor.outputRows
                   metrics("outputBytes") += processor.outputBytes
-                  metrics("inputRows") += processor.inputRows
+                  metrics("numInputRows") += processor.inputRows
                   metrics("inputBytes") += processor.inputBytes
                 }
               })
diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/HiveTableScanMetricsUpdater.scala b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/HiveTableScanMetricsUpdater.scala
index 4e3682561..89c3198da 100644
--- a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/HiveTableScanMetricsUpdater.scala
+++ b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/HiveTableScanMetricsUpdater.scala
@@ -23,10 +23,10 @@ class HiveTableScanMetricsUpdater(@transient val metrics: 
Map[String, SQLMetric]
   extends MetricsUpdater {
 
   val scanTime: SQLMetric = metrics("scanTime")
-  val outputRows: SQLMetric = metrics("outputRows")
+  val outputRows: SQLMetric = metrics("numOutputRows")
   val outputVectors: SQLMetric = metrics("outputVectors")
   val outputBytes: SQLMetric = metrics("outputBytes")
-  val inputRows: SQLMetric = metrics("inputRows")
+  val inputRows: SQLMetric = metrics("numInputRows")
   val inputBytes: SQLMetric = metrics("inputBytes")
   val extraTime: SQLMetric = metrics("extraTime")
   val inputWaitTime: SQLMetric = metrics("inputWaitTime")
diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/InputIteratorMetricsUpdater.scala b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/InputIteratorMetricsUpdater.scala
index e608cf1e3..3a3477659 100644
--- a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/InputIteratorMetricsUpdater.scala
+++ b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/InputIteratorMetricsUpdater.scala
@@ -33,8 +33,8 @@ case class InputIteratorMetricsUpdater(metrics: Map[String, 
SQLMetric]) extends
               InputIteratorMetricsUpdater.CH_PLAN_NODE_NAME
                 .exists(processor.name.startsWith(_))
             ) {
-              metrics("inputRows") += processor.inputRows
-              metrics("outputRows") += processor.outputRows
+              metrics("numInputRows") += processor.inputRows
+              metrics("numOutputRows") += processor.outputRows
             }
             if (processor.name.equalsIgnoreCase("FillingRightJoinSide")) {
               metrics("fillingRightJoinSideTime") += (processor.time / 
1000L).toLong
diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/ProjectMetricsUpdater.scala b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/ProjectMetricsUpdater.scala
index bfdcdfd02..4bd445590 100644
--- a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/ProjectMetricsUpdater.scala
+++ b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/ProjectMetricsUpdater.scala
@@ -33,9 +33,9 @@ class ProjectMetricsUpdater(val metrics: Map[String, 
SQLMetric]) extends Metrics
         MetricsUtil.updateExtraTimeMetric(
           metricsData,
           metrics("extraTime"),
-          metrics("outputRows"),
+          metrics("numOutputRows"),
           metrics("outputBytes"),
-          metrics("inputRows"),
+          metrics("numInputRows"),
           metrics("inputBytes"),
           ProjectMetricsUpdater.INCLUDING_PROCESSORS,
           ProjectMetricsUpdater.CH_PLAN_NODE_NAME
diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/SortMetricsUpdater.scala b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/SortMetricsUpdater.scala
index 5aba5e290..e53ba6ccf 100644
--- a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/SortMetricsUpdater.scala
+++ b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/SortMetricsUpdater.scala
@@ -33,9 +33,9 @@ class SortMetricsUpdater(val metrics: Map[String, SQLMetric]) 
extends MetricsUpd
         MetricsUtil.updateExtraTimeMetric(
           metricsData,
           metrics("extraTime"),
-          metrics("outputRows"),
+          metrics("numOutputRows"),
           metrics("outputBytes"),
-          metrics("inputRows"),
+          metrics("numInputRows"),
           metrics("inputBytes"),
           SortMetricsUpdater.INCLUDING_PROCESSORS,
           SortMetricsUpdater.CH_PLAN_NODE_NAME
diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/WindowMetricsUpdater.scala b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/WindowMetricsUpdater.scala
index 6943359ae..e36713c34 100644
--- a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/WindowMetricsUpdater.scala
+++ b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/WindowMetricsUpdater.scala
@@ -33,9 +33,9 @@ class WindowMetricsUpdater(val metrics: Map[String, 
SQLMetric]) extends MetricsU
         MetricsUtil.updateExtraTimeMetric(
           metricsData,
           metrics("extraTime"),
-          metrics("outputRows"),
+          metrics("numOutputRows"),
           metrics("outputBytes"),
-          metrics("inputRows"),
+          metrics("numInputRows"),
           metrics("inputBytes"),
           WindowMetricsUpdater.INCLUDING_PROCESSORS,
           WindowMetricsUpdater.CH_PLAN_NODE_NAME
diff --git a/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHBucketSuite.scala b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHBucketSuite.scala
index 23face9ce..02d5bcb63 100644
--- a/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHBucketSuite.scala
+++ b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHBucketSuite.scala
@@ -242,7 +242,7 @@ class GlutenClickHouseTPCHBucketSuite
         
assert(!(plans(0).asInstanceOf[FileSourceScanExecTransformer].bucketedScan))
         assert(plans(0).metrics("numFiles").value === 2)
         assert(plans(0).metrics("pruningTime").value === -1)
-        assert(plans(0).metrics("outputRows").value === 591673)
+        assert(plans(0).metrics("numOutputRows").value === 591673)
       })
   }
 
@@ -301,7 +301,7 @@ class GlutenClickHouseTPCHBucketSuite
           
assert(plans(11).asInstanceOf[FileSourceScanExecTransformer].bucketedScan)
         }
         assert(plans(11).metrics("numFiles").value === 1)
-        assert(plans(11).metrics("outputRows").value === 1000)
+        assert(plans(11).metrics("numOutputRows").value === 1000)
       })
   }
 
@@ -337,11 +337,11 @@ class GlutenClickHouseTPCHBucketSuite
           
assert(plans(2).asInstanceOf[FileSourceScanExecTransformer].bucketedScan)
         }
         assert(plans(2).metrics("numFiles").value === 2)
-        assert(plans(2).metrics("outputRows").value === 3111)
+        assert(plans(2).metrics("numOutputRows").value === 3111)
 
         
assert(!(plans(3).asInstanceOf[FileSourceScanExecTransformer].bucketedScan))
         assert(plans(3).metrics("numFiles").value === 2)
-        assert(plans(3).metrics("outputRows").value === 72678)
+        assert(plans(3).metrics("numOutputRows").value === 72678)
       })
 
     withSQLConf(
@@ -383,11 +383,11 @@ class GlutenClickHouseTPCHBucketSuite
 
         
assert(plans(1).asInstanceOf[FileSourceScanExecTransformer].bucketedScan)
         assert(plans(1).metrics("numFiles").value === 2)
-        assert(plans(1).metrics("outputRows").value === 5552)
+        assert(plans(1).metrics("numOutputRows").value === 5552)
 
         
assert(plans(2).asInstanceOf[FileSourceScanExecTransformer].bucketedScan)
         assert(plans(2).metrics("numFiles").value === 2)
-        assert(plans(2).metrics("outputRows").value === 379809)
+        assert(plans(2).metrics("numOutputRows").value === 379809)
       })
 
     withSQLConf(
@@ -417,7 +417,7 @@ class GlutenClickHouseTPCHBucketSuite
         
assert(!(plans(0).asInstanceOf[FileSourceScanExecTransformer].bucketedScan))
         assert(plans(0).metrics("numFiles").value === 2)
         assert(plans(0).metrics("pruningTime").value === -1)
-        assert(plans(0).metrics("outputRows").value === 11618)
+        assert(plans(0).metrics("numOutputRows").value === 11618)
       })
   }
 
@@ -442,11 +442,11 @@ class GlutenClickHouseTPCHBucketSuite
 
         
assert(plans(1).asInstanceOf[FileSourceScanExecTransformer].bucketedScan)
         assert(plans(1).metrics("numFiles").value === 2)
-        assert(plans(1).metrics("outputRows").value === 150000)
+        assert(plans(1).metrics("numOutputRows").value === 150000)
 
         
assert(plans(2).asInstanceOf[FileSourceScanExecTransformer].bucketedScan)
         assert(plans(2).metrics("numFiles").value === 2)
-        assert(plans(2).metrics("outputRows").value === 3155)
+        assert(plans(2).metrics("numOutputRows").value === 3155)
       })
 
     withSQLConf(
diff --git a/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHColumnarShuffleParquetAQESuite.scala b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHColumnarShuffleParquetAQESuite.scala
index f710333d6..29815aff6 100644
--- a/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHColumnarShuffleParquetAQESuite.scala
+++ b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHColumnarShuffleParquetAQESuite.scala
@@ -67,21 +67,21 @@ class GlutenClickHouseTPCHColumnarShuffleParquetAQESuite
         assert(plans(4).metrics("numFiles").value === 1)
         assert(plans(4).metrics("pruningTime").value === -1)
         assert(plans(4).metrics("filesSize").value === 19230111)
-        assert(plans(4).metrics("outputRows").value === 600572)
+        assert(plans(4).metrics("numOutputRows").value === 600572)
 
-        assert(plans(3).metrics("inputRows").value === 591673)
-        assert(plans(3).metrics("outputRows").value === 4)
+        assert(plans(3).metrics("numInputRows").value === 591673)
+        assert(plans(3).metrics("numOutputRows").value === 4)
         assert(plans(3).metrics("outputVectors").value === 1)
 
-        assert(plans(2).metrics("inputRows").value === 8)
-        assert(plans(2).metrics("outputRows").value === 8)
+        assert(plans(2).metrics("numInputRows").value === 8)
+        assert(plans(2).metrics("numOutputRows").value === 8)
 
         // Execute Sort operator, it will read the data twice.
-        assert(plans(1).metrics("outputRows").value === 8)
+        assert(plans(1).metrics("numOutputRows").value === 8)
         assert(plans(1).metrics("outputVectors").value === 2)
 
-        assert(plans(0).metrics("inputRows").value === 4)
-        assert(plans(0).metrics("outputRows").value === 4)
+        assert(plans(0).metrics("numInputRows").value === 4)
+        assert(plans(0).metrics("numOutputRows").value === 4)
     }
   }
 
@@ -100,12 +100,12 @@ class GlutenClickHouseTPCHColumnarShuffleParquetAQESuite
           assert(plans(2).metrics("pruningTime").value === -1)
           assert(plans(2).metrics("filesSize").value === 19230111)
 
-          assert(plans(1).metrics("inputRows").value === 591673)
-          assert(plans(1).metrics("outputRows").value === 4)
+          assert(plans(1).metrics("numInputRows").value === 591673)
+          assert(plans(1).metrics("numOutputRows").value === 4)
           assert(plans(1).metrics("outputVectors").value === 1)
 
           // Execute Sort operator, it will read the data twice.
-          assert(plans(0).metrics("outputRows").value === 8)
+          assert(plans(0).metrics("numOutputRows").value === 8)
           assert(plans(0).metrics("outputVectors").value === 2)
       }
     }
@@ -138,17 +138,17 @@ class GlutenClickHouseTPCHColumnarShuffleParquetAQESuite
 
           assert(inputIteratorTransformers.size == 4)
 
-          assert(inputIteratorTransformers(3).metrics("inputRows").value === 324322)
-          assert(inputIteratorTransformers(3).metrics("outputRows").value === 324322)
+          assert(inputIteratorTransformers(3).metrics("numInputRows").value === 324322)
+          assert(inputIteratorTransformers(3).metrics("numOutputRows").value === 324322)
 
-          assert(inputIteratorTransformers(2).metrics("inputRows").value === 72678)
-          assert(inputIteratorTransformers(2).metrics("outputRows").value === 72678)
+          assert(inputIteratorTransformers(2).metrics("numInputRows").value === 72678)
+          assert(inputIteratorTransformers(2).metrics("numOutputRows").value === 72678)
 
-          assert(inputIteratorTransformers(1).metrics("inputRows").value === 3111)
-          assert(inputIteratorTransformers(1).metrics("outputRows").value === 3111)
+          assert(inputIteratorTransformers(1).metrics("numInputRows").value === 3111)
+          assert(inputIteratorTransformers(1).metrics("numOutputRows").value === 3111)
 
-          assert(inputIteratorTransformers(0).metrics("inputRows").value === 15224)
-          assert(inputIteratorTransformers(0).metrics("outputRows").value === 15224)
+          assert(inputIteratorTransformers(0).metrics("numInputRows").value === 15224)
+          assert(inputIteratorTransformers(0).metrics("numOutputRows").value === 15224)
       }
     }
   }
@@ -280,10 +280,10 @@ class GlutenClickHouseTPCHColumnarShuffleParquetAQESuite
           case scanExec: BasicScanExecTransformer => scanExec
           case filterExec: FilterExecTransformerBase => filterExec
         }
-        assert(plans(2).metrics("inputRows").value === 600572)
-        assert(plans(2).metrics("outputRows").value === 379809)
+        assert(plans(2).metrics("numInputRows").value === 600572)
+        assert(plans(2).metrics("numOutputRows").value === 379809)
 
-        assert(plans(3).metrics("outputRows").value === 600572)
+        assert(plans(3).metrics("numOutputRows").value === 600572)
     }
   }
 
diff --git 
a/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHParquetBucketSuite.scala
 
b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHParquetBucketSuite.scala
index 427c4a6eb..8c417499e 100644
--- 
a/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHParquetBucketSuite.scala
+++ 
b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHParquetBucketSuite.scala
@@ -267,7 +267,7 @@ class GlutenClickHouseTPCHParquetBucketSuite
         
assert(!(plans(0).asInstanceOf[FileSourceScanExecTransformer].bucketedScan))
         assert(plans(0).metrics("numFiles").value === 4)
         assert(plans(0).metrics("pruningTime").value === -1)
-        assert(plans(0).metrics("outputRows").value === 600572)
+        assert(plans(0).metrics("numOutputRows").value === 600572)
       }
     )
   }
@@ -329,7 +329,7 @@ class GlutenClickHouseTPCHParquetBucketSuite
           
assert(plans(11).asInstanceOf[FileSourceScanExecTransformer].bucketedScan)
         }
         assert(plans(11).metrics("numFiles").value === 1)
-        assert(plans(11).metrics("outputRows").value === 1000)
+        assert(plans(11).metrics("numOutputRows").value === 1000)
       }
     )
   }
@@ -369,11 +369,11 @@ class GlutenClickHouseTPCHParquetBucketSuite
           
assert(plans(2).asInstanceOf[FileSourceScanExecTransformer].bucketedScan)
         }
         assert(plans(2).metrics("numFiles").value === 4)
-        assert(plans(2).metrics("outputRows").value === 15000)
+        assert(plans(2).metrics("numOutputRows").value === 15000)
 
         
assert(!(plans(3).asInstanceOf[FileSourceScanExecTransformer].bucketedScan))
         assert(plans(3).metrics("numFiles").value === 4)
-        assert(plans(3).metrics("outputRows").value === 150000)
+        assert(plans(3).metrics("numOutputRows").value === 150000)
       }
     )
 
@@ -421,11 +421,11 @@ class GlutenClickHouseTPCHParquetBucketSuite
 
         
assert(plans(1).asInstanceOf[FileSourceScanExecTransformer].bucketedScan)
         assert(plans(1).metrics("numFiles").value === 4)
-        assert(plans(1).metrics("outputRows").value === 150000)
+        assert(plans(1).metrics("numOutputRows").value === 150000)
 
         
assert(plans(2).asInstanceOf[FileSourceScanExecTransformer].bucketedScan)
         assert(plans(2).metrics("numFiles").value === 4)
-        assert(plans(2).metrics("outputRows").value === 600572)
+        assert(plans(2).metrics("numOutputRows").value === 600572)
       }
     )
 
@@ -461,7 +461,7 @@ class GlutenClickHouseTPCHParquetBucketSuite
         
assert(!(plans(0).asInstanceOf[FileSourceScanExecTransformer].bucketedScan))
         assert(plans(0).metrics("numFiles").value === 4)
         assert(plans(0).metrics("pruningTime").value === -1)
-        assert(plans(0).metrics("outputRows").value === 600572)
+        assert(plans(0).metrics("numOutputRows").value === 600572)
       }
     )
   }
@@ -489,11 +489,11 @@ class GlutenClickHouseTPCHParquetBucketSuite
 
         
assert(plans(1).asInstanceOf[FileSourceScanExecTransformer].bucketedScan)
         assert(plans(1).metrics("numFiles").value === 4)
-        assert(plans(1).metrics("outputRows").value === 150000)
+        assert(plans(1).metrics("numOutputRows").value === 150000)
 
         
assert(plans(2).asInstanceOf[FileSourceScanExecTransformer].bucketedScan)
         assert(plans(2).metrics("numFiles").value === 4)
-        assert(plans(2).metrics("outputRows").value === 600572)
+        assert(plans(2).metrics("numOutputRows").value === 600572)
       }
     )
 
diff --git 
a/backends-clickhouse/src/test/scala/io/glutenproject/execution/metrics/GlutenClickHouseTPCDSMetricsSuite.scala
 
b/backends-clickhouse/src/test/scala/io/glutenproject/execution/metrics/GlutenClickHouseTPCDSMetricsSuite.scala
index 3a4cee9c0..bffd4f9ed 100644
--- 
a/backends-clickhouse/src/test/scala/io/glutenproject/execution/metrics/GlutenClickHouseTPCDSMetricsSuite.scala
+++ 
b/backends-clickhouse/src/test/scala/io/glutenproject/execution/metrics/GlutenClickHouseTPCDSMetricsSuite.scala
@@ -99,9 +99,9 @@ class GlutenClickHouseTPCDSMetricsSuite extends 
GlutenClickHouseTPCDSAbstractSui
         assert(windowPlan0.metrics("totalTime").value == 2)
         assert(windowPlan0.metrics("inputWaitTime").value == 12)
         assert(windowPlan0.metrics("outputWaitTime").value == 0)
-        assert(windowPlan0.metrics("outputRows").value == 10717)
+        assert(windowPlan0.metrics("numOutputRows").value == 10717)
         assert(windowPlan0.metrics("outputBytes").value == 1224479)
-        assert(windowPlan0.metrics("inputRows").value == 10717)
+        assert(windowPlan0.metrics("numInputRows").value == 10717)
         assert(windowPlan0.metrics("inputBytes").value == 1128026)
 
         val windowPlan1 = allGlutenPlans(5)
@@ -109,18 +109,18 @@ class GlutenClickHouseTPCDSMetricsSuite extends 
GlutenClickHouseTPCDSAbstractSui
         assert(windowPlan1.metrics("extraTime").value == 1)
         assert(windowPlan1.metrics("inputWaitTime").value == 23)
         assert(windowPlan1.metrics("outputWaitTime").value == 2)
-        assert(windowPlan1.metrics("outputRows").value == 12333)
+        assert(windowPlan1.metrics("numOutputRows").value == 12333)
         assert(windowPlan1.metrics("outputBytes").value == 1360484)
-        assert(windowPlan1.metrics("inputRows").value == 12333)
+        assert(windowPlan1.metrics("numInputRows").value == 12333)
         assert(windowPlan1.metrics("inputBytes").value == 1261820)
 
         val sortPlan = allGlutenPlans(6)
         assert(sortPlan.metrics("totalTime").value == 3)
         assert(sortPlan.metrics("inputWaitTime").value == 30)
         assert(sortPlan.metrics("outputWaitTime").value == 1)
-        assert(sortPlan.metrics("outputRows").value == 12333)
+        assert(sortPlan.metrics("numOutputRows").value == 12333)
         assert(sortPlan.metrics("outputBytes").value == 1261820)
-        assert(sortPlan.metrics("inputRows").value == 12333)
+        assert(sortPlan.metrics("numInputRows").value == 12333)
         assert(sortPlan.metrics("inputBytes").value == 1261820)
     }
   }
diff --git 
a/backends-clickhouse/src/test/scala/io/glutenproject/execution/metrics/GlutenClickHouseTPCHMetricsSuite.scala
 
b/backends-clickhouse/src/test/scala/io/glutenproject/execution/metrics/GlutenClickHouseTPCHMetricsSuite.scala
index 7b90a5500..012578317 100644
--- 
a/backends-clickhouse/src/test/scala/io/glutenproject/execution/metrics/GlutenClickHouseTPCHMetricsSuite.scala
+++ 
b/backends-clickhouse/src/test/scala/io/glutenproject/execution/metrics/GlutenClickHouseTPCHMetricsSuite.scala
@@ -68,11 +68,11 @@ class GlutenClickHouseTPCHMetricsSuite extends 
GlutenClickHouseTPCHAbstractSuite
         assert(plans(2).metrics("pruningTime").value === -1)
         assert(plans(2).metrics("filesSize").value === 19230111)
 
-        assert(plans(1).metrics("outputRows").value === 4)
+        assert(plans(1).metrics("numOutputRows").value === 4)
         assert(plans(1).metrics("outputVectors").value === 1)
 
         // Execute Sort operator, it will read the data twice.
-        assert(plans(0).metrics("outputRows").value === 4)
+        assert(plans(0).metrics("numOutputRows").value === 4)
         assert(plans(0).metrics("outputVectors").value === 1)
     }
   }
@@ -89,8 +89,8 @@ class GlutenClickHouseTPCHMetricsSuite extends 
GlutenClickHouseTPCHAbstractSuite
           case generate: GenerateExecTransformer => generate
         }
         assert(plans.size == 1)
-        assert(plans.head.metrics("inputRows").value == 25)
-        assert(plans.head.metrics("outputRows").value == 266)
+        assert(plans.head.metrics("numInputRows").value == 25)
+        assert(plans.head.metrics("numOutputRows").value == 266)
         assert(plans.head.metrics("outputVectors").value == 1)
     }
   }
@@ -109,11 +109,11 @@ class GlutenClickHouseTPCHMetricsSuite extends 
GlutenClickHouseTPCHAbstractSuite
           assert(plans(2).metrics("pruningTime").value === -1)
           assert(plans(2).metrics("filesSize").value === 19230111)
 
-          assert(plans(1).metrics("outputRows").value === 4)
+          assert(plans(1).metrics("numOutputRows").value === 4)
           assert(plans(1).metrics("outputVectors").value === 1)
 
           // Execute Sort operator, it will read the data twice.
-          assert(plans(0).metrics("outputRows").value === 4)
+          assert(plans(0).metrics("numOutputRows").value === 4)
           assert(plans(0).metrics("outputVectors").value === 1)
       }
     }
@@ -193,24 +193,24 @@ class GlutenClickHouseTPCHMetricsSuite extends 
GlutenClickHouseTPCHAbstractSuite
                 assert(s.metrics("scanTime").value == 2)
                 assert(s.metrics("inputWaitTime").value == 4)
                 assert(s.metrics("outputWaitTime").value == 2)
-                assert(s.metrics("outputRows").value == 20000)
+                assert(s.metrics("numOutputRows").value == 20000)
                 assert(s.metrics("outputBytes").value == 1451663)
               case f: FilterExecTransformerBase =>
                 assert(f.metrics("totalTime").value == 3)
                 assert(f.metrics("inputWaitTime").value == 14)
                 assert(f.metrics("outputWaitTime").value == 1)
-                assert(f.metrics("outputRows").value == 73)
+                assert(f.metrics("numOutputRows").value == 73)
                 assert(f.metrics("outputBytes").value == 5304)
-                assert(f.metrics("inputRows").value == 20000)
+                assert(f.metrics("numInputRows").value == 20000)
                 assert(f.metrics("inputBytes").value == 1451663)
                 assert(f.metrics("extraTime").value == 1)
               case p: ProjectExecTransformer =>
                 assert(p.metrics("totalTime").value == 0)
                 assert(p.metrics("inputWaitTime").value == 7)
                 assert(p.metrics("outputWaitTime").value == 0)
-                assert(p.metrics("outputRows").value == 73)
+                assert(p.metrics("numOutputRows").value == 73)
                 assert(p.metrics("outputBytes").value == 2336)
-                assert(p.metrics("inputRows").value == 73)
+                assert(p.metrics("numInputRows").value == 73)
                 assert(p.metrics("inputBytes").value == 5085)
             }
         }
@@ -230,25 +230,25 @@ class GlutenClickHouseTPCHMetricsSuite extends 
GlutenClickHouseTPCHAbstractSuite
             assert(scanPlan.metrics("scanTime").value == 2)
             assert(scanPlan.metrics("inputWaitTime").value == 3)
             assert(scanPlan.metrics("outputWaitTime").value == 1)
-            assert(scanPlan.metrics("outputRows").value == 80000)
+            assert(scanPlan.metrics("numOutputRows").value == 80000)
             assert(scanPlan.metrics("outputBytes").value == 2160000)
 
             val filterPlan = allGlutenPlans(8)
             assert(filterPlan.metrics("totalTime").value == 1)
             assert(filterPlan.metrics("inputWaitTime").value == 13)
             assert(filterPlan.metrics("outputWaitTime").value == 1)
-            assert(filterPlan.metrics("outputRows").value == 80000)
+            assert(filterPlan.metrics("numOutputRows").value == 80000)
             assert(filterPlan.metrics("outputBytes").value == 2160000)
-            assert(filterPlan.metrics("inputRows").value == 80000)
+            assert(filterPlan.metrics("numInputRows").value == 80000)
             assert(filterPlan.metrics("inputBytes").value == 2160000)
 
             val joinPlan = allGlutenPlans(2)
             assert(joinPlan.metrics("totalTime").value == 1)
             assert(joinPlan.metrics("inputWaitTime").value == 6)
             assert(joinPlan.metrics("outputWaitTime").value == 0)
-            assert(joinPlan.metrics("outputRows").value == 292)
+            assert(joinPlan.metrics("numOutputRows").value == 292)
             assert(joinPlan.metrics("outputBytes").value == 16644)
-            assert(joinPlan.metrics("inputRows").value == 80000)
+            assert(joinPlan.metrics("numInputRows").value == 80000)
             assert(joinPlan.metrics("inputBytes").value == 1920000)
         }
 
@@ -269,9 +269,9 @@ class GlutenClickHouseTPCHMetricsSuite extends 
GlutenClickHouseTPCHAbstractSuite
             assert(shjPlan.metrics("totalTime").value == 6)
             assert(shjPlan.metrics("inputWaitTime").value == 5)
             assert(shjPlan.metrics("outputWaitTime").value == 0)
-            assert(shjPlan.metrics("outputRows").value == 44)
+            assert(shjPlan.metrics("numOutputRows").value == 44)
             assert(shjPlan.metrics("outputBytes").value == 3740)
-            assert(shjPlan.metrics("inputRows").value == 11985)
+            assert(shjPlan.metrics("numInputRows").value == 11985)
             assert(shjPlan.metrics("inputBytes").value == 299625)
         }
     }
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/spark/sql/execution/benchmarks/CHParquetReadBenchmark.scala
 
b/backends-clickhouse/src/test/scala/org/apache/spark/sql/execution/benchmarks/CHParquetReadBenchmark.scala
index d6e1d314c..d2687bd69 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/spark/sql/execution/benchmarks/CHParquetReadBenchmark.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/spark/sql/execution/benchmarks/CHParquetReadBenchmark.scala
@@ -98,7 +98,7 @@ object CHParquetReadBenchmark extends SqlBasedBenchmark with 
CHSqlBasedBenchmark
       .take(readFileCnt)
       .map(_.asInstanceOf[FilePartition])
 
-    val numOutputRows = chFileScan.longMetric("outputRows")
+    val numOutputRows = chFileScan.longMetric("numOutputRows")
     val numOutputVectors = chFileScan.longMetric("outputVectors")
     val scanTime = chFileScan.longMetric("scanTime")
     // Generate Substrait plan
diff --git 
a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/MetricsApiImpl.scala
 
b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/MetricsApiImpl.scala
index 2d1eb1315..71c2642ff 100644
--- 
a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/MetricsApiImpl.scala
+++ 
b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/MetricsApiImpl.scala
@@ -42,7 +42,7 @@ class MetricsApiImpl extends MetricsApi with Logging {
     Map(
       "cpuCount" -> SQLMetrics.createMetric(sparkContext, "cpu wall time count"),
       "wallNanos" -> SQLMetrics.createNanoTimingMetric(sparkContext, "totaltime of input iterator"),
-      "outputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
+      "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
       "outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors")
     )
   }
@@ -54,12 +54,12 @@ class MetricsApiImpl extends MetricsApi with Logging {
 
   override def genBatchScanTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] =
     Map(
-      "inputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
+      "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
       "inputVectors" -> SQLMetrics.createMetric(sparkContext, "number of input vectors"),
       "inputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of input bytes"),
       "rawInputRows" -> SQLMetrics.createMetric(sparkContext, "number of raw input rows"),
       "rawInputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of raw input bytes"),
-      "outputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
+      "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
       "outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"),
       "outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"),
       "wallNanos" -> SQLMetrics.createNanoTimingMetric(sparkContext, "totaltime of batch scan"),
@@ -91,7 +91,6 @@ class MetricsApiImpl extends MetricsApi with Logging {
     Map(
       "rawInputRows" -> SQLMetrics.createMetric(sparkContext, "number of raw 
input rows"),
       "rawInputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of 
raw input bytes"),
-      "outputRows" -> SQLMetrics.createMetric(sparkContext, "number of output 
rows"),
       "outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of 
output vectors"),
       "outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of 
output bytes"),
       "scanTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "totaltime 
of scan"),
@@ -132,7 +131,6 @@ class MetricsApiImpl extends MetricsApi with Logging {
     Map(
       "rawInputRows" -> SQLMetrics.createMetric(sparkContext, "number of raw 
input rows"),
       "rawInputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of 
raw input bytes"),
-      "outputRows" -> SQLMetrics.createMetric(sparkContext, "number of output 
rows"),
       "outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of 
output vectors"),
       "outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of 
output bytes"),
       "scanTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "totaltime 
of scan"),
@@ -170,7 +168,7 @@ class MetricsApiImpl extends MetricsApi with Logging {
 
   override def genFilterTransformerMetrics(sparkContext: SparkContext): 
Map[String, SQLMetric] =
     Map(
-      "outputRows" -> SQLMetrics.createMetric(sparkContext, "number of output 
rows"),
+      "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of 
output rows"),
       "outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of 
output vectors"),
       "outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of 
output bytes"),
       "wallNanos" -> SQLMetrics.createNanoTimingMetric(sparkContext, 
"totaltime of filter"),
@@ -186,7 +184,7 @@ class MetricsApiImpl extends MetricsApi with Logging {
 
   override def genProjectTransformerMetrics(sparkContext: SparkContext): 
Map[String, SQLMetric] =
     Map(
-      "outputRows" -> SQLMetrics.createMetric(sparkContext, "number of output 
rows"),
+      "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of 
output rows"),
       "outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of 
output vectors"),
       "outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of 
output bytes"),
       "wallNanos" -> SQLMetrics.createNanoTimingMetric(sparkContext, 
"totaltime of project"),
@@ -243,7 +241,7 @@ class MetricsApiImpl extends MetricsApi with Logging {
 
   override def genExpandTransformerMetrics(sparkContext: SparkContext): 
Map[String, SQLMetric] =
     Map(
-      "outputRows" -> SQLMetrics.createMetric(sparkContext, "number of output 
rows"),
+      "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of 
output rows"),
       "outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of 
output vectors"),
       "outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of 
output bytes"),
       "wallNanos" -> SQLMetrics.createNanoTimingMetric(sparkContext, 
"totaltime of expand"),
@@ -285,7 +283,7 @@ class MetricsApiImpl extends MetricsApi with Logging {
 
   override def genWindowTransformerMetrics(sparkContext: SparkContext): 
Map[String, SQLMetric] =
     Map(
-      "outputRows" -> SQLMetrics.createMetric(sparkContext, "number of output 
rows"),
+      "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of 
output rows"),
       "outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of 
output vectors"),
       "outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of 
output bytes"),
       "wallNanos" -> SQLMetrics.createNanoTimingMetric(sparkContext, 
"totaltime of window"),
@@ -315,7 +313,7 @@ class MetricsApiImpl extends MetricsApi with Logging {
 
   override def genLimitTransformerMetrics(sparkContext: SparkContext): 
Map[String, SQLMetric] =
     Map(
-      "outputRows" -> SQLMetrics.createMetric(sparkContext, "number of output 
rows"),
+      "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of 
output rows"),
       "outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of 
output vectors"),
       "outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of 
output bytes"),
       "wallNanos" -> SQLMetrics.createNanoTimingMetric(sparkContext, 
"totaltime of limit"),
@@ -340,7 +338,7 @@ class MetricsApiImpl extends MetricsApi with Logging {
 
   override def genSortTransformerMetrics(sparkContext: SparkContext): 
Map[String, SQLMetric] =
     Map(
-      "outputRows" -> SQLMetrics.createMetric(sparkContext, "number of output 
rows"),
+      "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of 
output rows"),
       "outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of 
output vectors"),
       "outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of 
output bytes"),
       "wallNanos" -> SQLMetrics.createNanoTimingMetric(sparkContext, 
"totaltime of sort"),
diff --git 
a/gluten-core/src/main/scala/io/glutenproject/execution/BasicScanExecTransformer.scala
 
b/gluten-core/src/main/scala/io/glutenproject/execution/BasicScanExecTransformer.scala
index 3ef1eb75b..625ab6e97 100644
--- 
a/gluten-core/src/main/scala/io/glutenproject/execution/BasicScanExecTransformer.scala
+++ 
b/gluten-core/src/main/scala/io/glutenproject/execution/BasicScanExecTransformer.scala
@@ -67,7 +67,7 @@ trait BasicScanExecTransformer extends LeafTransformSupport 
with BaseDataSource
   }
 
   def doExecuteColumnarInternal(): RDD[ColumnarBatch] = {
-    val numOutputRows = longMetric("outputRows")
+    val numOutputRows = longMetric("numOutputRows")
     val numOutputVectors = longMetric("outputVectors")
     val scanTime = longMetric("scanTime")
     val substraitContext = new SubstraitContext
diff --git 
a/gluten-data/src/main/scala/io/glutenproject/metrics/BatchScanMetricsUpdater.scala
 
b/gluten-data/src/main/scala/io/glutenproject/metrics/BatchScanMetricsUpdater.scala
index 0eb8141e5..32f8cd880 100644
--- 
a/gluten-data/src/main/scala/io/glutenproject/metrics/BatchScanMetricsUpdater.scala
+++ 
b/gluten-data/src/main/scala/io/glutenproject/metrics/BatchScanMetricsUpdater.scala
@@ -29,12 +29,12 @@ class BatchScanMetricsUpdater(val metrics: Map[String, 
SQLMetric]) extends Metri
   override def updateNativeMetrics(opMetrics: IOperatorMetrics): Unit = {
     if (opMetrics != null) {
       val operatorMetrics = opMetrics.asInstanceOf[OperatorMetrics]
-      metrics("inputRows") += operatorMetrics.inputRows
+      metrics("numInputRows") += operatorMetrics.inputRows
       metrics("inputVectors") += operatorMetrics.inputVectors
       metrics("inputBytes") += operatorMetrics.inputBytes
       metrics("rawInputRows") += operatorMetrics.rawInputRows
       metrics("rawInputBytes") += operatorMetrics.rawInputBytes
-      metrics("outputRows") += operatorMetrics.outputRows
+      metrics("numOutputRows") += operatorMetrics.outputRows
       metrics("outputVectors") += operatorMetrics.outputVectors
       metrics("outputBytes") += operatorMetrics.outputBytes
       metrics("cpuCount") += operatorMetrics.cpuCount
diff --git 
a/gluten-data/src/main/scala/io/glutenproject/metrics/ExpandMetricsUpdater.scala
 
b/gluten-data/src/main/scala/io/glutenproject/metrics/ExpandMetricsUpdater.scala
index 1d57fffd2..e254469f8 100644
--- 
a/gluten-data/src/main/scala/io/glutenproject/metrics/ExpandMetricsUpdater.scala
+++ 
b/gluten-data/src/main/scala/io/glutenproject/metrics/ExpandMetricsUpdater.scala
@@ -23,7 +23,7 @@ class ExpandMetricsUpdater(val metrics: Map[String, 
SQLMetric]) extends MetricsU
   override def updateNativeMetrics(opMetrics: IOperatorMetrics): Unit = {
     if (opMetrics != null) {
       val operatorMetrics = opMetrics.asInstanceOf[OperatorMetrics]
-      metrics("outputRows") += operatorMetrics.outputRows
+      metrics("numOutputRows") += operatorMetrics.outputRows
       metrics("outputVectors") += operatorMetrics.outputVectors
       metrics("outputBytes") += operatorMetrics.outputBytes
       metrics("cpuCount") += operatorMetrics.cpuCount
diff --git 
a/gluten-data/src/main/scala/io/glutenproject/metrics/FileSourceScanMetricsUpdater.scala
 
b/gluten-data/src/main/scala/io/glutenproject/metrics/FileSourceScanMetricsUpdater.scala
index cbdb7b9fc..ff8b1a576 100644
--- 
a/gluten-data/src/main/scala/io/glutenproject/metrics/FileSourceScanMetricsUpdater.scala
+++ 
b/gluten-data/src/main/scala/io/glutenproject/metrics/FileSourceScanMetricsUpdater.scala
@@ -28,7 +28,7 @@ class FileSourceScanMetricsUpdater(@transient val metrics: 
Map[String, SQLMetric
 
   val rawInputRows: SQLMetric = metrics("rawInputRows")
   val rawInputBytes: SQLMetric = metrics("rawInputBytes")
-  val outputRows: SQLMetric = metrics("outputRows")
+  val outputRows: SQLMetric = metrics("numOutputRows")
   val outputVectors: SQLMetric = metrics("outputVectors")
   val outputBytes: SQLMetric = metrics("outputBytes")
   val wallNanos: SQLMetric = metrics("wallNanos")
diff --git 
a/gluten-data/src/main/scala/io/glutenproject/metrics/FilterMetricsUpdater.scala
 
b/gluten-data/src/main/scala/io/glutenproject/metrics/FilterMetricsUpdater.scala
index f9e95875a..f29931023 100644
--- 
a/gluten-data/src/main/scala/io/glutenproject/metrics/FilterMetricsUpdater.scala
+++ 
b/gluten-data/src/main/scala/io/glutenproject/metrics/FilterMetricsUpdater.scala
@@ -23,7 +23,7 @@ class FilterMetricsUpdater(val metrics: Map[String, 
SQLMetric]) extends MetricsU
   override def updateNativeMetrics(opMetrics: IOperatorMetrics): Unit = {
     if (opMetrics != null) {
       val operatorMetrics = opMetrics.asInstanceOf[OperatorMetrics]
-      metrics("outputRows") += operatorMetrics.outputRows
+      metrics("numOutputRows") += operatorMetrics.outputRows
       metrics("outputVectors") += operatorMetrics.outputVectors
       metrics("outputBytes") += operatorMetrics.outputBytes
       metrics("cpuCount") += operatorMetrics.cpuCount
diff --git 
a/gluten-data/src/main/scala/io/glutenproject/metrics/HiveTableScanMetricsUpdater.scala
 
b/gluten-data/src/main/scala/io/glutenproject/metrics/HiveTableScanMetricsUpdater.scala
index 26ad731e6..b7a858d44 100644
--- 
a/gluten-data/src/main/scala/io/glutenproject/metrics/HiveTableScanMetricsUpdater.scala
+++ 
b/gluten-data/src/main/scala/io/glutenproject/metrics/HiveTableScanMetricsUpdater.scala
@@ -23,7 +23,7 @@ class HiveTableScanMetricsUpdater(@transient val metrics: 
Map[String, SQLMetric]
   extends MetricsUpdater {
   val rawInputRows: SQLMetric = metrics("rawInputRows")
   val rawInputBytes: SQLMetric = metrics("rawInputBytes")
-  val outputRows: SQLMetric = metrics("outputRows")
+  val outputRows: SQLMetric = metrics("numOutputRows")
   val outputVectors: SQLMetric = metrics("outputVectors")
   val outputBytes: SQLMetric = metrics("outputBytes")
   val wallNanos: SQLMetric = metrics("wallNanos")
diff --git 
a/gluten-data/src/main/scala/io/glutenproject/metrics/InputIteratorMetricsUpdater.scala
 
b/gluten-data/src/main/scala/io/glutenproject/metrics/InputIteratorMetricsUpdater.scala
index 2b3967ae3..87ca348fa 100644
--- 
a/gluten-data/src/main/scala/io/glutenproject/metrics/InputIteratorMetricsUpdater.scala
+++ 
b/gluten-data/src/main/scala/io/glutenproject/metrics/InputIteratorMetricsUpdater.scala
@@ -26,10 +26,10 @@ case class InputIteratorMetricsUpdater(metrics: Map[String, 
SQLMetric]) extends
       if (operatorMetrics.outputRows == 0 && operatorMetrics.outputVectors == 0) {
         // Sometimes, velox does not update metrics for intermediate operator,
         // here we try to use the input metrics
-        metrics("outputRows") += operatorMetrics.inputRows
+        metrics("numOutputRows") += operatorMetrics.inputRows
         metrics("outputVectors") += operatorMetrics.inputVectors
       } else {
-        metrics("outputRows") += operatorMetrics.outputRows
+        metrics("numOutputRows") += operatorMetrics.outputRows
         metrics("outputVectors") += operatorMetrics.outputVectors
       }
     }
diff --git 
a/gluten-data/src/main/scala/io/glutenproject/metrics/LimitMetricsUpdater.scala 
b/gluten-data/src/main/scala/io/glutenproject/metrics/LimitMetricsUpdater.scala
index f61f7443f..a3ab24637 100644
--- 
a/gluten-data/src/main/scala/io/glutenproject/metrics/LimitMetricsUpdater.scala
+++ 
b/gluten-data/src/main/scala/io/glutenproject/metrics/LimitMetricsUpdater.scala
@@ -23,7 +23,7 @@ class LimitMetricsUpdater(val metrics: Map[String, 
SQLMetric]) extends MetricsUp
   override def updateNativeMetrics(opMetrics: IOperatorMetrics): Unit = {
     if (opMetrics != null) {
       val operatorMetrics = opMetrics.asInstanceOf[OperatorMetrics]
-      metrics("outputRows") += operatorMetrics.outputRows
+      metrics("numOutputRows") += operatorMetrics.outputRows
       metrics("outputVectors") += operatorMetrics.outputVectors
       metrics("outputBytes") += operatorMetrics.outputBytes
       metrics("cpuCount") += operatorMetrics.cpuCount
diff --git 
a/gluten-data/src/main/scala/io/glutenproject/metrics/ProjectMetricsUpdater.scala
 
b/gluten-data/src/main/scala/io/glutenproject/metrics/ProjectMetricsUpdater.scala
index b2bb17961..03b41202e 100644
--- 
a/gluten-data/src/main/scala/io/glutenproject/metrics/ProjectMetricsUpdater.scala
+++ 
b/gluten-data/src/main/scala/io/glutenproject/metrics/ProjectMetricsUpdater.scala
@@ -23,7 +23,7 @@ class ProjectMetricsUpdater(val metrics: Map[String, 
SQLMetric]) extends Metrics
   override def updateNativeMetrics(opMetrics: IOperatorMetrics): Unit = {
     if (opMetrics != null) {
       val operatorMetrics = opMetrics.asInstanceOf[OperatorMetrics]
-      metrics("outputRows") += operatorMetrics.outputRows
+      metrics("numOutputRows") += operatorMetrics.outputRows
       metrics("outputVectors") += operatorMetrics.outputVectors
       metrics("outputBytes") += operatorMetrics.outputBytes
       metrics("cpuCount") += operatorMetrics.cpuCount
diff --git 
a/gluten-data/src/main/scala/io/glutenproject/metrics/SortMetricsUpdater.scala 
b/gluten-data/src/main/scala/io/glutenproject/metrics/SortMetricsUpdater.scala
index 2351b1c9f..38414002d 100644
--- 
a/gluten-data/src/main/scala/io/glutenproject/metrics/SortMetricsUpdater.scala
+++ 
b/gluten-data/src/main/scala/io/glutenproject/metrics/SortMetricsUpdater.scala
@@ -23,7 +23,7 @@ class SortMetricsUpdater(val metrics: Map[String, SQLMetric]) 
extends MetricsUpd
   override def updateNativeMetrics(opMetrics: IOperatorMetrics): Unit = {
     if (opMetrics != null) {
       val operatorMetrics = opMetrics.asInstanceOf[OperatorMetrics]
-      metrics("outputRows") += operatorMetrics.outputRows
+      metrics("numOutputRows") += operatorMetrics.outputRows
       metrics("outputVectors") += operatorMetrics.outputVectors
       metrics("outputBytes") += operatorMetrics.outputBytes
       metrics("cpuCount") += operatorMetrics.cpuCount
diff --git 
a/gluten-data/src/main/scala/io/glutenproject/metrics/WindowMetricsUpdater.scala
 
b/gluten-data/src/main/scala/io/glutenproject/metrics/WindowMetricsUpdater.scala
index 032d7aa1b..7b962294e 100644
--- 
a/gluten-data/src/main/scala/io/glutenproject/metrics/WindowMetricsUpdater.scala
+++ 
b/gluten-data/src/main/scala/io/glutenproject/metrics/WindowMetricsUpdater.scala
@@ -23,7 +23,7 @@ class WindowMetricsUpdater(val metrics: Map[String, 
SQLMetric]) extends MetricsU
   override def updateNativeMetrics(opMetrics: IOperatorMetrics): Unit = {
     if (opMetrics != null) {
       val operatorMetrics = opMetrics.asInstanceOf[OperatorMetrics]
-      metrics("outputRows") += operatorMetrics.outputRows
+      metrics("numOutputRows") += operatorMetrics.outputRows
       metrics("outputVectors") += operatorMetrics.outputVectors
       metrics("outputBytes") += operatorMetrics.outputBytes
       metrics("cpuCount") += operatorMetrics.cpuCount
diff --git 
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenSQLQuerySuite.scala
 
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenSQLQuerySuite.scala
index 7199ddd70..bd9699e40 100644
--- 
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenSQLQuerySuite.scala
+++ 
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenSQLQuerySuite.scala
@@ -128,4 +128,30 @@ class GlutenSQLQuerySuite extends SQLQuerySuite with GlutenSQLTestsTrait {
           "Escape character must be followed by '%', '_' or the escape character itself"))
     }
   }
+
+  testGluten("StreamingQueryProgress.numInputRows should be correct") {
+    withTempDir {
+      dir =>
+        val path = dir.toURI.getPath
+        val numRows = 20
+        val df = spark.range(0, numRows)
+        df.write.mode("overwrite").format("parquet").save(path)
+        val q = spark.readStream
+          .format("parquet")
+          .schema(df.schema)
+          .load(path)
+          .writeStream
+          .format("memory")
+          .queryName("test")
+          .start()
+        q.processAllAvailable
+        val inputOutputPairs = q.recentProgress.map(p => (p.numInputRows, p.sink.numOutputRows))
+
+        // numInputRows and sink.numOutputRows must be the same
+        assert(inputOutputPairs.forall(x => x._1 == x._2))
+
+        // Sum of numInputRows must match the total number of rows of the input
+        assert(inputOutputPairs.map(_._1).sum == numRows)
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@gluten.apache.org
For additional commands, e-mail: commits-h...@gluten.apache.org

Reply via email to