parthchandra commented on code in PR #2615:
URL: https://github.com/apache/datafusion-comet/pull/2615#discussion_r2449716813


##########
spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala:
##########
@@ -246,29 +246,25 @@ case class CometScanExec(
     }
   }
 
-  override lazy val metrics: Map[String, SQLMetric] = wrapped.metrics ++ {
-    // Tracking scan time has overhead, we can't afford to do it for each row, 
and can only do
-    // it for each batch.
-    if (supportsColumnar) {
-      Map(
-        "scanTime" -> SQLMetrics.createNanoTimingMetric(
-          sparkContext,
-          "scan time")) ++ CometMetricNode.scanMetrics(sparkContext)
-    } else {
-      Map.empty
-    }
-  } ++ {
-    relation.fileFormat match {
-      case f: MetricsSupport => f.initMetrics(sparkContext)
+  override lazy val metrics: Map[String, SQLMetric] =
+    wrapped.driverMetrics ++ (relation.fileFormat match {
+      case m: MetricsSupport => m.getMetrics
       case _ => Map.empty
-    }
-  }
+    })
 
   protected override def doExecute(): RDD[InternalRow] = {
     ColumnarToRowExec(this).doExecute()
   }
 
   protected override def doExecuteColumnar(): RDD[ColumnarBatch] = {
+    val rdd = inputRDD.asInstanceOf[RDD[ColumnarBatch]]
+
+    // Can skip the following logic if we're using different metrics to 
calculate this,
+    // e.g., Datafusion reader metrics.
+    if (Seq("numOutputRows", "scanTime").exists(metric => 
!metrics.contains(metric))) {

Review Comment:
   What is this for?



##########
spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala:
##########
@@ -107,6 +107,94 @@ object CometMetricNode {
         SQLMetrics.createNanoTimingMetric(sc, "Total time for casting 
columns"))
   }
 
+  def parquetScanMetrics(sc: SparkContext): Map[String, SQLMetric] = {
+    Map(
+      "numOutputRows" -> SQLMetrics.createMetric(sc, "number of output rows"),
+      "scanTime" -> SQLMetrics.createNanoTimingMetric(sc, "scan time"),
+      "ParquetRowGroups" -> SQLMetrics.createMetric(sc, "num of Parquet row 
groups read"),
+      "ParquetNativeDecodeTime" -> SQLMetrics.createNanoTimingMetric(
+        sc,
+        "time spent in Parquet native decoding"),
+      "ParquetNativeLoadTime" -> SQLMetrics.createNanoTimingMetric(
+        sc,
+        "time spent in loading Parquet native vectors"),
+      "ParquetLoadRowGroupTime" -> SQLMetrics.createNanoTimingMetric(
+        sc,
+        "time spent in loading Parquet row groups"),
+      "ParquetInputFileReadTime" -> SQLMetrics.createNanoTimingMetric(
+        sc,
+        "time spent in reading Parquet file from storage"),
+      "ParquetInputFileReadSize" -> SQLMetrics.createSizeMetric(
+        sc,
+        "read size when reading Parquet file from storage (MB)"),
+      "ParquetInputFileReadThroughput" -> SQLMetrics.createAverageMetric(
+        sc,
+        "read throughput when reading Parquet file from storage (MB/sec)"))
+  }
+
+  def nativeScanMetrics(sc: SparkContext): Map[String, SQLMetric] = {
+    Map(
+      "output_rows" -> SQLMetrics.createMetric(sc, "number of output rows"),

Review Comment:
   Do we not need the metric to be called `numOutputRows` to match the name 
used in `CometScanExec` (and `CometBatchScanExec`)?



##########
common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java:
##########
@@ -214,7 +217,8 @@ private NativeBatchReader(AbstractColumnReader[] 
columnReaders) {
       boolean useLegacyDateTimestamp,
       StructType partitionSchema,
       InternalRow partitionValues,
-      Map<String, SQLMetric> metrics) {
+      Map<String, SQLMetric> metrics,

Review Comment:
   Do we still need this parameter now? It was a placeholder for eventually 
implementing the metrics.



##########
common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java:
##########
@@ -214,7 +217,8 @@ private NativeBatchReader(AbstractColumnReader[] 
columnReaders) {
       boolean useLegacyDateTimestamp,
       StructType partitionSchema,
       InternalRow partitionValues,
-      Map<String, SQLMetric> metrics) {
+      Map<String, SQLMetric> metrics,
+      Object metricsNode) {

Review Comment:
   I agree, but I don't know if there is a better way.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to