martin-g commented on code in PR #3842:
URL: https://github.com/apache/datafusion-comet/pull/3842#discussion_r3013790462
##########
spark/src/main/scala/org/apache/spark/sql/comet/CometExecRDD.scala:
##########
@@ -139,6 +139,13 @@ private[spark] class CometExecRDD(
ctx.addTaskCompletionListener[Unit] { _ =>
it.close()
subqueries.foreach(sub => CometScalarSubquery.removeSubquery(it.id,
sub))
+
+ nativeMetrics.metrics
+ .get("bytes_scanned")
Review Comment:
Before ac6b869 the logic was looking into the children (and collected just
the first one).
Now it looks only in the root node.
Shouldn't it look into all Scan nodes ? Or maybe the logic should be moved
to CometNativeScanExec#doExecuteColumnar() ?!
##########
spark/src/test/scala/org/apache/spark/sql/comet/CometTaskMetricsSuite.scala:
##########
@@ -91,4 +94,66 @@ class CometTaskMetricsSuite extends CometTestBase with
AdaptiveSparkPlanHelper {
}
}
}
+
+ test("native_datafusion scan reports task-level input metrics matching
Spark") {
+ withParquetTable((0 until 10000).map(i => (i, (i + 1).toLong)), "tbl") {
+ // Collect baseline input metrics from vanilla Spark (Comet disabled)
+ val (sparkBytes, sparkRecords) =
collectInputMetrics(CometConf.COMET_ENABLED.key -> "false")
+
+ // Collect input metrics from Comet native_datafusion scan
+ val (cometBytes, cometRecords) = collectInputMetrics(
+ CometConf.COMET_NATIVE_SCAN_IMPL.key ->
CometConf.SCAN_NATIVE_DATAFUSION)
+
+ // Records must match exactly
+ assert(
+ cometRecords == sparkRecords,
+ s"recordsRead mismatch: comet=$cometRecords, spark=$sparkRecords")
+
+ // Bytes should be in the same ballpark -- both read the same Parquet
file(s),
+ // but the exact byte count can differ due to reader implementation
details
+ // (e.g. footer reads, page headers, buffering granularity).
+ assert(sparkBytes > 0, s"Spark bytesRead should be > 0, got $sparkBytes")
+ assert(cometBytes > 0, s"Comet bytesRead should be > 0, got $cometBytes")
+ val ratio = cometBytes.toDouble / sparkBytes.toDouble
+ assert(
+ ratio >= 0.8 && ratio <= 1.2,
+ s"bytesRead ratio out of range: comet=$cometBytes, spark=$sparkBytes,
ratio=$ratio")
+ }
+ }
+
+ /**
+ * Runs `SELECT * FROM tbl` with the given SQL config overrides and returns
the aggregated
+ * (bytesRead, recordsRead) across all tasks.
+ */
+ private def collectInputMetrics(confs: (String, String)*): (Long, Long) = {
+ val inputMetricsList = mutable.ArrayBuffer.empty[InputMetrics]
+
+ val listener = new SparkListener {
+ override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
+ val im = taskEnd.taskMetrics.inputMetrics
+ inputMetricsList.synchronized {
+ inputMetricsList += im
+ }
+ }
+ }
+
+ spark.sparkContext.addSparkListener(listener)
+ try {
+ // Drain any earlier events
+ spark.sparkContext.listenerBus.waitUntilEmpty()
+
+ withSQLConf(confs: _*) {
+ sql("SELECT * FROM tbl").collect()
Review Comment:
```suggestion
sql("SELECT * FROM tbl WHERE _1 > 5000").collect()
```
add a filter to make it more realistic
##########
spark/src/test/scala/org/apache/spark/sql/comet/CometTaskMetricsSuite.scala:
##########
@@ -91,4 +94,66 @@ class CometTaskMetricsSuite extends CometTestBase with
AdaptiveSparkPlanHelper {
}
}
}
+
+ test("native_datafusion scan reports task-level input metrics matching
Spark") {
+ withParquetTable((0 until 10000).map(i => (i, (i + 1).toLong)), "tbl") {
+ // Collect baseline input metrics from vanilla Spark (Comet disabled)
+ val (sparkBytes, sparkRecords) =
collectInputMetrics(CometConf.COMET_ENABLED.key -> "false")
+
+ // Collect input metrics from Comet native_datafusion scan
+ val (cometBytes, cometRecords) = collectInputMetrics(
+ CometConf.COMET_NATIVE_SCAN_IMPL.key ->
CometConf.SCAN_NATIVE_DATAFUSION)
Review Comment:
```suggestion
val (cometBytes, cometRecords) = collectInputMetrics(
CometConf.COMET_ENABLED.key -> "true",
CometConf.COMET_NATIVE_SCAN_IMPL.key ->
CometConf.SCAN_NATIVE_DATAFUSION)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]