EmilyMatt commented on code in PR #2615:
URL: https://github.com/apache/datafusion-comet/pull/2615#discussion_r2450992207
##########
spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala:
##########
@@ -246,29 +246,25 @@ case class CometScanExec(
}
}
- override lazy val metrics: Map[String, SQLMetric] = wrapped.metrics ++ {
- // Tracking scan time has overhead, we can't afford to do it for each row, and can only do
- // it for each batch.
- if (supportsColumnar) {
- Map(
- "scanTime" -> SQLMetrics.createNanoTimingMetric(
- sparkContext,
- "scan time")) ++ CometMetricNode.scanMetrics(sparkContext)
- } else {
- Map.empty
- }
- } ++ {
- relation.fileFormat match {
- case f: MetricsSupport => f.initMetrics(sparkContext)
+ override lazy val metrics: Map[String, SQLMetric] =
+ wrapped.driverMetrics ++ (relation.fileFormat match {
+ case m: MetricsSupport => m.getMetrics
case _ => Map.empty
- }
- }
+ })
protected override def doExecute(): RDD[InternalRow] = {
ColumnarToRowExec(this).doExecute()
}
protected override def doExecuteColumnar(): RDD[ColumnarBatch] = {
+ val rdd = inputRDD.asInstanceOf[RDD[ColumnarBatch]]
+
+ // Can skip the following logic if we're using different metrics to calculate this,
+ // e.g., Datafusion reader metrics.
+ if (Seq("numOutputRows", "scanTime").exists(metric => !metrics.contains(metric))) {
Review Comment:
My mistake; I had not understood the importance of these specific metrics
and wanted to use only the native reader metrics where possible.
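
For context, a minimal sketch of the per-batch bookkeeping these two metrics imply. The object and helper names below are assumptions for illustration; `numOutputRows`, `scanTime` and the nano-timing metric come from the quoted diff, and this is not the PR's final code:

```scala
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.vectorized.ColumnarBatch

// Hypothetical sketch: update the Spark-side scan metrics once per
// ColumnarBatch rather than once per row, which is the overhead argument
// made in the removed comment.
object ScanMetricsSketch {
  def withScanMetrics(
      batches: Iterator[ColumnarBatch],
      numOutputRows: SQLMetric,
      scanTime: SQLMetric): Iterator[ColumnarBatch] =
    new Iterator[ColumnarBatch] {
      override def hasNext: Boolean = {
        val start = System.nanoTime()
        val hasMore = batches.hasNext // pulling the next batch may do real IO
        scanTime += System.nanoTime() - start // nano-timing metric, so add nanos
        hasMore
      }
      override def next(): ColumnarBatch = {
        val batch = batches.next()
        numOutputRows += batch.numRows() // one update per batch, not per row
        batch
      }
    }
}
```

In the operator, a wrapper like this would sit inside a `mapPartitions` over `inputRDD`, so each task pays one metric update per batch, which is why per-row tracking is called out as too expensive.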
##########
spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala:
##########
@@ -246,29 +246,25 @@ case class CometScanExec(
}
}
- override lazy val metrics: Map[String, SQLMetric] = wrapped.metrics ++ {
- // Tracking scan time has overhead, we can't afford to do it for each row, and can only do
- // it for each batch.
- if (supportsColumnar) {
- Map(
- "scanTime" -> SQLMetrics.createNanoTimingMetric(
- sparkContext,
- "scan time")) ++ CometMetricNode.scanMetrics(sparkContext)
- } else {
- Map.empty
- }
- } ++ {
- relation.fileFormat match {
- case f: MetricsSupport => f.initMetrics(sparkContext)
+ override lazy val metrics: Map[String, SQLMetric] =
+ wrapped.driverMetrics ++ (relation.fileFormat match {
+ case m: MetricsSupport => m.getMetrics
case _ => Map.empty
- }
- }
+ })
protected override def doExecute(): RDD[InternalRow] = {
ColumnarToRowExec(this).doExecute()
}
protected override def doExecuteColumnar(): RDD[ColumnarBatch] = {
+ val rdd = inputRDD.asInstanceOf[RDD[ColumnarBatch]]
+
+ // Can skip the following logic if we're using different metrics to calculate this,
+ // e.g., Datafusion reader metrics.
+ if (Seq("numOutputRows", "scanTime").exists(metric => !metrics.contains(metric))) {
Review Comment:
Removed
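
For completeness, an illustrative alternative to the removed up-front guard would be to look each metric up lazily and update it only when present. The helper below is hypothetical (the names `GuardFreeMetricsSketch` and `updateScanMetrics` are assumptions), a sketch rather than the PR's code:

```scala
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.vectorized.ColumnarBatch

// Hypothetical alternative: tolerate missing metric entries instead of
// checking for "numOutputRows"/"scanTime" up front before the per-batch loop.
object GuardFreeMetricsSketch {
  def updateScanMetrics(
      metrics: Map[String, SQLMetric],
      batch: ColumnarBatch,
      elapsedNs: Long): Unit = {
    metrics.get("numOutputRows").foreach(_ += batch.numRows())
    metrics.get("scanTime").foreach(_ += elapsedNs)
  }
}
```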
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]