This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 22586efb3cd4 [SPARK-46933][SQL] Add query execution time metric to 
connectors which use JDBCRDD
22586efb3cd4 is described below

commit 22586efb3cd4a19969d92b249a14326fba3244d2
Author: Uros Stankovic <uros.stanko...@databricks.com>
AuthorDate: Thu Feb 1 22:48:50 2024 +0800

    [SPARK-46933][SQL] Add query execution time metric to connectors which use 
JDBCRDD
    
    ### What changes were proposed in this pull request?
    This pull request should add measuring query execution time on external 
JDBC data source.
    Another change is changing access right for JDBCRDD class, that is needed 
for adding another metric (SQL text) which will be done in some next PR.
    
    ### Why are the changes needed?
    Query execution time is very important metric to have
    
    ### Does this PR introduce _any_ user-facing change?
    User can see query execution time on SparkPlan graph under node metrics tab
    
    ### How was this patch tested?
    Tested using custom image
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #44969 from 
urosstan-db/SPARK-46933-Add-scan-metrics-to-jdbc-connector.
    
    Lead-authored-by: Uros Stankovic <uros.stanko...@databricks.com>
    Co-authored-by: Uros Stankovic 
<155642965+urosstan...@users.noreply.github.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../spark/sql/execution/DataSourceScanExec.scala   | 17 ++++++++++++--
 .../datasources/DataSourceMetricsMixin.scala       | 24 ++++++++++++++++++++
 .../sql/execution/datasources/jdbc/JDBCRDD.scala   | 26 ++++++++++++++++++++--
 3 files changed, 63 insertions(+), 4 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 2622eadaefb3..ec265f4eaea4 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -99,6 +99,10 @@ trait DataSourceScanExec extends LeafExecNode {
   def inputRDDs(): Seq[RDD[InternalRow]]
 }
 
+object DataSourceScanExec {
+  val numOutputRowsKey = "numOutputRows"
+}
+
 /** Physical plan node for scanning data from a relation. */
 case class RowDataSourceScanExec(
     output: Seq[Attribute],
@@ -111,8 +115,17 @@ case class RowDataSourceScanExec(
     tableIdentifier: Option[TableIdentifier])
   extends DataSourceScanExec with InputRDDCodegen {
 
-  override lazy val metrics =
-    Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of 
output rows"))
+  override lazy val metrics: Map[String, SQLMetric] = {
+    val metrics = Map(
+      DataSourceScanExec.numOutputRowsKey ->
+        SQLMetrics.createMetric(sparkContext, "number of output rows")
+    )
+
+    rdd match {
+      case rddWithDSMetrics: DataSourceMetricsMixin => metrics ++ 
rddWithDSMetrics.getMetrics
+      case _ => metrics
+    }
+  }
 
   protected override def doExecute(): RDD[InternalRow] = {
     val numOutputRows = longMetric("numOutputRows")
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceMetricsMixin.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceMetricsMixin.scala
new file mode 100644
index 000000000000..6c1e5e876e99
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceMetricsMixin.scala
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.spark.sql.execution.metric.SQLMetric
+
+trait DataSourceMetricsMixin {
+  def getMetrics: Seq[(String, SQLMetric)]
+}
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
index 934ed9ac2a1b..a436627fd117 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
@@ -26,7 +26,9 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.connector.expressions.filter.Predicate
+import org.apache.spark.sql.execution.datasources.DataSourceMetricsMixin
 import org.apache.spark.sql.execution.datasources.v2.TableSampleInfo
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects}
 import org.apache.spark.sql.types._
 import org.apache.spark.util.CompletionIterator
@@ -157,7 +159,7 @@ object JDBCRDD extends Logging {
  * Both the driver code and the workers must be able to access the database; 
the driver
  * needs to fetch the schema while the workers need to fetch the data.
  */
-private[jdbc] class JDBCRDD(
+class JDBCRDD(
     sc: SparkContext,
     getConnection: Int => Connection,
     schema: StructType,
@@ -171,7 +173,14 @@ private[jdbc] class JDBCRDD(
     limit: Int,
     sortOrders: Array[String],
     offset: Int)
-  extends RDD[InternalRow](sc, Nil) {
+  extends RDD[InternalRow](sc, Nil) with DataSourceMetricsMixin {
+
+  /**
+   * Execution time of the query issued to JDBC connection
+   */
+  val queryExecutionTimeMetric: SQLMetric = SQLMetrics.createNanoTimingMetric(
+    sparkContext,
+    name = "JDBC query execution time")
 
   /**
    * Retrieve the list of partitions corresponding to this RDD.
@@ -272,11 +281,24 @@ private[jdbc] class JDBCRDD(
         ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
     stmt.setFetchSize(options.fetchSize)
     stmt.setQueryTimeout(options.queryTimeout)
+
+    val startTime = System.nanoTime
     rs = stmt.executeQuery()
+    val endTime = System.nanoTime
+
+    val executionTime = endTime - startTime
+    queryExecutionTimeMetric.add(executionTime)
+
     val rowsIterator =
       JdbcUtils.resultSetToSparkInternalRows(rs, dialect, schema, inputMetrics)
 
     CompletionIterator[InternalRow, Iterator[InternalRow]](
       new InterruptibleIterator(context, rowsIterator), close())
   }
+
+  override def getMetrics: Seq[(String, SQLMetric)] = {
+    Seq(
+      "queryExecutionTime" -> queryExecutionTimeMetric
+    )
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to