This is an automated email from the ASF dual-hosted git repository.

viirya pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 869da2d  feat: Improve CometHashJoin statistics (#309)
869da2d is described below

commit 869da2dcf21db4e63eff6959cc67670cf731530a
Author: Pablo Langa <[email protected]>
AuthorDate: Wed Apr 24 14:26:42 2024 -0400

    feat: Improve CometHashJoin statistics (#309)
    
    * HashJoin metrics
    
    * HashJoin metrics test
    
    * Fix test
    
    * Fix format
    
    * Fix descriptions
    
    * Fix imports
    
    * Update spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
    
    Co-authored-by: Liang-Chi Hsieh <[email protected]>
    
    * delete conf
    
    * Fix
    
    ---------
    
    Co-authored-by: Liang-Chi Hsieh <[email protected]>
---
 .../org/apache/spark/sql/comet/operators.scala     | 20 +++++++++++++
 .../org/apache/comet/exec/CometExecSuite.scala     | 35 +++++++++++++++++++++-
 2 files changed, 54 insertions(+), 1 deletion(-)

diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
index 571ec22..a857975 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
@@ -694,6 +694,26 @@ case class CometHashJoinExec(
 
   override def hashCode(): Int =
     Objects.hashCode(leftKeys, rightKeys, condition, buildSide, left, right)
+
+  override lazy val metrics: Map[String, SQLMetric] =
+    Map(
+      "build_time" ->
+        SQLMetrics.createNanoTimingMetric(
+          sparkContext,
+          "Total time for collecting build-side of join"),
+      "build_input_batches" ->
+        SQLMetrics.createMetric(sparkContext, "Number of batches consumed by build-side"),
+      "build_input_rows" ->
+        SQLMetrics.createMetric(sparkContext, "Number of rows consumed by build-side"),
+      "build_mem_used" ->
+        SQLMetrics.createSizeMetric(sparkContext, "Memory used by build-side"),
+      "input_batches" ->
+        SQLMetrics.createMetric(sparkContext, "Number of batches consumed by probe-side"),
+      "input_rows" ->
+        SQLMetrics.createMetric(sparkContext, "Number of rows consumed by probe-side"),
+      "output_batches" -> SQLMetrics.createMetric(sparkContext, "Number of batches produced"),
+      "output_rows" -> SQLMetrics.createMetric(sparkContext, "Number of rows produced"),
+      "join_time" -> SQLMetrics.createNanoTimingMetric(sparkContext, "Total time for joining"))
 }
 
 case class CometBroadcastHashJoinExec(
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
index 264ea4c..e5b3523 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStatistics, CatalogTable}
 import org.apache.spark.sql.catalyst.expressions.Hex
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateMode
-import org.apache.spark.sql.comet.{CometBroadcastExchangeExec, CometCollectLimitExec, CometFilterExec, CometHashAggregateExec, CometProjectExec, CometRowToColumnarExec, CometScanExec, CometSortExec, CometSortMergeJoinExec, CometTakeOrderedAndProjectExec}
+import org.apache.spark.sql.comet.{CometBroadcastExchangeExec, CometCollectLimitExec, CometFilterExec, CometHashAggregateExec, CometHashJoinExec, CometProjectExec, CometRowToColumnarExec, CometScanExec, CometSortExec, CometSortMergeJoinExec, CometTakeOrderedAndProjectExec}
 import org.apache.spark.sql.comet.execution.shuffle.{CometColumnarShuffle, CometShuffleExchangeExec}
 import org.apache.spark.sql.execution.{CollectLimitExec, ProjectExec, SQLExecution, UnionExec}
 import org.apache.spark.sql.execution.exchange.BroadcastExchangeExec
@@ -331,6 +331,39 @@ class CometExecSuite extends CometTestBase {
     }
   }
 
+  test("Comet native metrics: HashJoin") {
+    withParquetTable((0 until 5).map(i => (i, i + 1)), "t1") {
+      withParquetTable((0 until 5).map(i => (i, i + 1)), "t2") {
+        val df = sql("SELECT /*+ SHUFFLE_HASH(t1) */ * FROM t1 INNER JOIN t2 ON t1._1 = t2._1")
+        df.collect()
+
+        val metrics = find(df.queryExecution.executedPlan) {
+          case _: CometHashJoinExec => true
+          case _ => false
+        }.map(_.metrics).get
+
+        assert(metrics.contains("build_time"))
+        assert(metrics("build_time").value > 1L)
+        assert(metrics.contains("build_input_batches"))
+        assert(metrics("build_input_batches").value == 5L)
+        assert(metrics.contains("build_mem_used"))
+        assert(metrics("build_mem_used").value > 1L)
+        assert(metrics.contains("build_input_rows"))
+        assert(metrics("build_input_rows").value == 5L)
+        assert(metrics.contains("input_batches"))
+        assert(metrics("input_batches").value == 5L)
+        assert(metrics.contains("input_rows"))
+        assert(metrics("input_rows").value == 5L)
+        assert(metrics.contains("output_batches"))
+        assert(metrics("output_batches").value == 5L)
+        assert(metrics.contains("output_rows"))
+        assert(metrics("output_rows").value == 5L)
+        assert(metrics.contains("join_time"))
+        assert(metrics("join_time").value > 1L)
+      }
+    }
+  }
+
   test(
     "fix: ReusedExchangeExec + CometShuffleExchangeExec under QueryStageExec " 
+
       "should be CometRoot") {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
