EmilyMatt commented on code in PR #2615:
URL: https://github.com/apache/datafusion-comet/pull/2615#discussion_r2448229340
##########
spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala:
##########
@@ -203,7 +203,7 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
return withInfos(scanExec, fallbackReasons.toSet)
}
- if (scanImpl != CometConf.SCAN_NATIVE_COMET && encryptionEnabled(hadoopConf)) {
+ if (encryptionEnabled(hadoopConf) && scanImpl != CometConf.SCAN_NATIVE_COMET) {
Review Comment:
This had me perplexed for a bit, so I just reversed the ordering; it makes it
much more understandable, IMO.
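To spell out why the flip is safe: `&&` short-circuits and both operands here are side-effect free, so the two orderings are equivalent and only the reading order changes. A minimal sketch with stand-in names (not the actual CometScanRule members):

```scala
// Standalone sketch with hypothetical stand-in parameters; not the real
// CometScanRule code. Both orderings return the same result, but leading with
// the encryption check reads as "encrypted scans fall back unless we're on the
// native_comet scan impl".
def needsFallback(encryptionEnabled: Boolean, scanImpl: String, nativeComet: String): Boolean =
  encryptionEnabled && scanImpl != nativeComet // same as: scanImpl != nativeComet && encryptionEnabled
```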
##########
spark/src/main/scala/org/apache/spark/sql/comet/CometBatchScanExec.scala:
##########
@@ -51,6 +51,14 @@ case class CometBatchScanExec(wrapped: BatchScanExec, runtimeFilters: Seq[Expres
override lazy val inputRDD: RDD[InternalRow] = wrappedScan.inputRDD
override def doExecuteColumnar(): RDD[ColumnarBatch] = {
+ val rdd = inputRDD.asInstanceOf[RDD[ColumnarBatch]]
+
+ // Can skip the following logic if we're using different metrics to calculate this,
+ // e.g., Datafusion reader metrics.
+ if (Seq("numOutputRows", "scanTime").exists(metric => !metrics.contains(metric))) {
Review Comment:
This could have been resolved in a number of ways, but in the end I just
chose the safest one.
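For completeness, one of those other ways, as a sketch only (it assumes `metrics` is the operator's `Map[String, SQLMetric]`; this is not what the PR does): look the metrics up as Options, so a missing entry simply skips the update instead of pre-checking the names.

```scala
import org.apache.spark.sql.execution.metric.SQLMetric

// Hypothetical alternative, not the code in this PR: update the two scan
// metrics only when they are registered, with no name-based pre-check.
def recordBatch(metrics: Map[String, SQLMetric], rows: Long, scanNanos: Long): Unit = {
  metrics.get("numOutputRows").foreach(_.add(rows))
  metrics.get("scanTime").foreach(_.add(scanNanos))
}
```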
##########
common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java:
##########
@@ -214,7 +217,8 @@ private NativeBatchReader(AbstractColumnReader[] columnReaders) {
boolean useLegacyDateTimestamp,
StructType partitionSchema,
InternalRow partitionValues,
- Map<String, SQLMetric> metrics) {
+ Map<String, SQLMetric> metrics,
+ Object metricsNode) {
Review Comment:
Honestly, using Object here is really itchy, but under the constraints it
was the best way.
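If the itch is purely the parameter type, here is a hypothetical direction, assuming the constraint is that this module can't see the concrete metrics type (the names below are made up, not Comet API):

```scala
// Purely hypothetical sketch: a marker trait visible to both modules would let
// the constructor take something more descriptive than Object, at the cost of
// the eventual downcast still happening wherever the concrete node is consumed.
trait MetricsNodeHandle

// The concrete metrics node on the Scala side would just mix it in:
// class SomeScanMetricsNode(...) extends MetricsNodeHandle
```

Not clearly better than Object given the constraints; just noting the trade-off.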
##########
spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala:
##########
@@ -246,29 +246,25 @@ case class CometScanExec(
}
}
- override lazy val metrics: Map[String, SQLMetric] = wrapped.metrics ++ {
- // Tracking scan time has overhead, we can't afford to do it for each row, and can only do
- // it for each batch.
- if (supportsColumnar) {
- Map(
- "scanTime" -> SQLMetrics.createNanoTimingMetric(
- sparkContext,
- "scan time")) ++ CometMetricNode.scanMetrics(sparkContext)
- } else {
- Map.empty
- }
- } ++ {
- relation.fileFormat match {
- case f: MetricsSupport => f.initMetrics(sparkContext)
+ override lazy val metrics: Map[String, SQLMetric] =
+ wrapped.driverMetrics ++ (relation.fileFormat match {
+ case m: MetricsSupport => m.getMetrics
case _ => Map.empty
- }
- }
+ })
protected override def doExecute(): RDD[InternalRow] = {
ColumnarToRowExec(this).doExecute()
}
protected override def doExecuteColumnar(): RDD[ColumnarBatch] = {
+ val rdd = inputRDD.asInstanceOf[RDD[ColumnarBatch]]
+
+ // Can skip the following logic if we're using different metrics to calculate this,
+ // e.g., Datafusion reader metrics.
+ if (Seq("numOutputRows", "scanTime").exists(metric => !metrics.contains(metric))) {
Review Comment:
Likewise