EmilyMatt commented on code in PR #2615:
URL: https://github.com/apache/datafusion-comet/pull/2615#discussion_r2450992207


##########
spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala:
##########
@@ -246,29 +246,25 @@ case class CometScanExec(
     }
   }
 
-  override lazy val metrics: Map[String, SQLMetric] = wrapped.metrics ++ {
-    // Tracking scan time has overhead, we can't afford to do it for each row, and can only do
-    // it for each batch.
-    if (supportsColumnar) {
-      Map(
-        "scanTime" -> SQLMetrics.createNanoTimingMetric(
-          sparkContext,
-          "scan time")) ++ CometMetricNode.scanMetrics(sparkContext)
-    } else {
-      Map.empty
-    }
-  } ++ {
-    relation.fileFormat match {
-      case f: MetricsSupport => f.initMetrics(sparkContext)
+  override lazy val metrics: Map[String, SQLMetric] =
+    wrapped.driverMetrics ++ (relation.fileFormat match {
+      case m: MetricsSupport => m.getMetrics
       case _ => Map.empty
-    }
-  }
+    })
 
   protected override def doExecute(): RDD[InternalRow] = {
     ColumnarToRowExec(this).doExecute()
   }
 
   protected override def doExecuteColumnar(): RDD[ColumnarBatch] = {
+    val rdd = inputRDD.asInstanceOf[RDD[ColumnarBatch]]
+
+    // Can skip the following logic if we're using different metrics to calculate this,
+    // e.g., Datafusion reader metrics.
+    if (Seq("numOutputRows", "scanTime").exists(metric => !metrics.contains(metric))) {

Review Comment:
   My mistake; I failed to understand the importance of these specific metrics and wanted to use only the native reader metrics when possible.
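
   To illustrate the idea (a rough sketch only, not the code that ships in this PR): do the per-batch `numOutputRows`/`scanTime` bookkeeping only when those metrics are present, and otherwise return the input RDD untouched and rely on the native DataFusion reader metrics. The guard and the `inputRDD` cast come from the hunk above; the iterator wrapper follows the usual `FileSourceScanExec` pattern, and `longMetric` is the standard `SparkPlan` helper.

   ```scala
   protected override def doExecuteColumnar(): RDD[ColumnarBatch] = {
     val rdd = inputRDD.asInstanceOf[RDD[ColumnarBatch]]
     // Sketch: if either Spark-facing metric is absent (e.g. the native reader
     // reports its own metrics), skip the per-batch bookkeeping entirely.
     if (Seq("numOutputRows", "scanTime").exists(m => !metrics.contains(m))) {
       rdd
     } else {
       val numOutputRows = longMetric("numOutputRows")
       val scanTime = longMetric("scanTime")
       rdd.mapPartitions { batches =>
         new Iterator[ColumnarBatch] {
           override def hasNext: Boolean = {
             // Time the wait for the next batch and attribute it to scan time.
             val startNs = System.nanoTime()
             val res = batches.hasNext
             scanTime += System.nanoTime() - startNs
             res
           }
           override def next(): ColumnarBatch = {
             val batch = batches.next()
             numOutputRows += batch.numRows()
             batch
           }
         }
       }
     }
   }
   ```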



##########
spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala:
##########
@@ -246,29 +246,25 @@ case class CometScanExec(
     }
   }
 
-  override lazy val metrics: Map[String, SQLMetric] = wrapped.metrics ++ {
-    // Tracking scan time has overhead, we can't afford to do it for each row, and can only do
-    // it for each batch.
-    if (supportsColumnar) {
-      Map(
-        "scanTime" -> SQLMetrics.createNanoTimingMetric(
-          sparkContext,
-          "scan time")) ++ CometMetricNode.scanMetrics(sparkContext)
-    } else {
-      Map.empty
-    }
-  } ++ {
-    relation.fileFormat match {
-      case f: MetricsSupport => f.initMetrics(sparkContext)
+  override lazy val metrics: Map[String, SQLMetric] =
+    wrapped.driverMetrics ++ (relation.fileFormat match {
+      case m: MetricsSupport => m.getMetrics
       case _ => Map.empty
-    }
-  }
+    })
 
   protected override def doExecute(): RDD[InternalRow] = {
     ColumnarToRowExec(this).doExecute()
   }
 
   protected override def doExecuteColumnar(): RDD[ColumnarBatch] = {
+    val rdd = inputRDD.asInstanceOf[RDD[ColumnarBatch]]
+
+    // Can skip the following logic if we're using different metrics to calculate this,
+    // e.g., Datafusion reader metrics.
+    if (Seq("numOutputRows", "scanTime").exists(metric => !metrics.contains(metric))) {

Review Comment:
   Removed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

