This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 646f9a0 feat: Improve CometSortMergeJoin statistics (#304)
646f9a0 is described below
commit 646f9a021a50dc35a67c46573cbda7956b068c66
Author: Pablo Langa <[email protected]>
AuthorDate: Tue Apr 23 09:52:00 2024 -0400
feat: Improve CometSortMergeJoin statistics (#304)
* Improve CometSortMergeJoin statistics
* Add tests
---------
Co-authored-by: Pablo Langa <[email protected]>
---
.../org/apache/spark/sql/comet/operators.scala | 10 +++++++
.../org/apache/comet/exec/CometExecSuite.scala | 35 +++++++++++++++++++++-
2 files changed, 44 insertions(+), 1 deletion(-)
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
index 8545eee..1065367 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
@@ -727,6 +727,16 @@ case class CometSortMergeJoinExec(
override def hashCode(): Int =
Objects.hashCode(leftKeys, rightKeys, condition, left, right)
+
+ override lazy val metrics: Map[String, SQLMetric] =
+ Map(
+ "input_batches" -> SQLMetrics.createMetric(sparkContext, "Number of
batches consumed"),
+ "input_rows" -> SQLMetrics.createMetric(sparkContext, "Number of rows
consumed"),
+ "output_batches" -> SQLMetrics.createMetric(sparkContext, "Number of
batches produced"),
+ "output_rows" -> SQLMetrics.createMetric(sparkContext, "Number of rows
produced"),
+ "peak_mem_used" ->
+ SQLMetrics.createSizeMetric(sparkContext, "Peak memory used for
buffered data"),
+ "join_time" -> SQLMetrics.createNanoTimingMetric(sparkContext, "Total
time for joining"))
}
case class CometScanWrapper(override val nativeOp: Operator, override val
originalPlan: SparkPlan)
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
index 5e99073..cc968a6 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStatistics,
CatalogTable}
import org.apache.spark.sql.catalyst.expressions.Hex
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateMode
-import org.apache.spark.sql.comet.{CometBroadcastExchangeExec,
CometCollectLimitExec, CometFilterExec, CometHashAggregateExec,
CometProjectExec, CometRowToColumnarExec, CometScanExec,
CometTakeOrderedAndProjectExec}
+import org.apache.spark.sql.comet.{CometBroadcastExchangeExec,
CometCollectLimitExec, CometFilterExec, CometHashAggregateExec,
CometProjectExec, CometRowToColumnarExec, CometScanExec,
CometSortMergeJoinExec, CometTakeOrderedAndProjectExec}
import org.apache.spark.sql.comet.execution.shuffle.{CometColumnarShuffle,
CometShuffleExchangeExec}
import org.apache.spark.sql.execution.{CollectLimitExec, ProjectExec,
SQLExecution, UnionExec}
import org.apache.spark.sql.execution.exchange.BroadcastExchangeExec
@@ -275,6 +275,39 @@ class CometExecSuite extends CometTestBase {
}
}
+ test("Comet native metrics: SortMergeJoin") {
+ withSQLConf(
+ CometConf.COMET_EXEC_ENABLED.key -> "true",
+ CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true",
+ "spark.sql.autoBroadcastJoinThreshold" -> "0",
+ "spark.sql.join.preferSortMergeJoin" -> "true") {
+ withParquetTable((0 until 5).map(i => (i, i + 1)), "tbl1") {
+ withParquetTable((0 until 5).map(i => (i, i + 1)), "tbl2") {
+ val df = sql("SELECT * FROM tbl1 INNER JOIN tbl2 ON tbl1._1 =
tbl2._1")
+ df.collect()
+
+ val metrics = find(df.queryExecution.executedPlan) {
+ case _: CometSortMergeJoinExec => true
+ case _ => false
+ }.map(_.metrics).get
+
+ assert(metrics.contains("input_batches"))
+ assert(metrics("input_batches").value == 2L)
+ assert(metrics.contains("input_rows"))
+ assert(metrics("input_rows").value == 10L)
+ assert(metrics.contains("output_batches"))
+ assert(metrics("output_batches").value == 1L)
+ assert(metrics.contains("output_rows"))
+ assert(metrics("output_rows").value == 5L)
+ assert(metrics.contains("peak_mem_used"))
+ assert(metrics("peak_mem_used").value > 1L)
+ assert(metrics.contains("join_time"))
+ assert(metrics("join_time").value > 1L)
+ }
+ }
+ }
+ }
+
test(
"fix: ReusedExchangeExec + CometShuffleExchangeExec under QueryStageExec "
+
"should be CometRoot") {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]