This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new e6a3385e27fa [SPARK-47044][SQL] Add executed query for JDBC external datasources to explain output e6a3385e27fa is described below commit e6a3385e27fa95391433ea02fa053540fe101d40 Author: Uros Stankovic <uros.stanko...@databricks.com> AuthorDate: Tue Feb 20 22:03:28 2024 +0800 [SPARK-47044][SQL] Add executed query for JDBC external datasources to explain output ### What changes were proposed in this pull request? Add the generated JDBC query to the output of the EXPLAIN FORMATTED command when the physical Scan node accesses a JDBC source to create its RDD. Output of EXPLAIN FORMATTED with this change, taken from the newly added test: ``` == Physical Plan == * Project (2) +- * Scan org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCScan$$anon$14349389d (1) (1) Scan org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCScan$$anon$14349389d [codegen id : 1] Output [1]: [MAX(ID)#x] Arguments: [MAX(ID)#x], [StructField(MAX(ID),IntegerType,true)], PushedDownOperators(Some(org.apache.spark.sql.connector.expressions.aggregate.Aggregation647d3279),None,None,None,List(),ArraySeq(ID IS NOT NULL, ID > 1)), JDBCRDD[0] at $anonfun$executePhase$2 at LexicalThreadLocal.scala:63, org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCScan$$anon$14349389d, Statistics(sizeInBytes=8.0 EiB, ColumnStat: N/A) External engine query: SELECT MAX("ID") FROM "test"."people" WHERE ("ID" IS NOT NULL) AND ("ID" > 1) (2) Project [codegen id : 1] Output [1]: [MAX(ID)#x AS max(id)#x] Input [1]: [MAX(ID)#x] ``` ### Why are the changes needed? This change allows users to see the query text that is sent to external JDBC sources. ### Does this PR introduce _any_ user-facing change? Yes. Users will see an additional field in the EXPLAIN FORMATTED output for the RowDataSourceScanExec node. ### How was this patch tested? Tested with a new unit test in the JDBC V2 suite. ### Was this patch authored or co-authored using generative AI tooling? 
No Closes #45102 from urosstan-db/add-sql-query-for-external-datasources. Authored-by: Uros Stankovic <uros.stanko...@databricks.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../apache/spark/sql/catalyst/trees/TreeNode.scala | 8 ++-- .../spark/sql/execution/DataSourceScanExec.scala | 10 ++++ .../datasources/ExternalEngineDatasourceRDD.scala | 26 ++++++++++ .../sql/execution/datasources/jdbc/JDBCRDD.scala | 56 ++++++++++++---------- .../org/apache/spark/sql/jdbc/JDBCV2Suite.scala | 7 +++ 5 files changed, 78 insertions(+), 29 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala index dbacb833ef59..10e2718da833 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala @@ -1000,12 +1000,10 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] val str = if (verbose) { if (addSuffix) verboseStringWithSuffix(maxFields) else verboseString(maxFields) + } else if (printNodeId) { + simpleStringWithNodeId() } else { - if (printNodeId) { - simpleStringWithNodeId() - } else { - simpleString(maxFields) - } + simpleString(maxFields) } append(prefix) append(str) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index ec265f4eaea4..474d65a251ba 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -127,6 +127,16 @@ case class RowDataSourceScanExec( } } + override def verboseStringWithOperatorId(): String = { + super.verboseStringWithOperatorId() + (rdd match { + case externalEngineDatasourceRdd: ExternalEngineDatasourceRDD => + "External engine query: " + + 
externalEngineDatasourceRdd.getExternalEngineQuery + + System.lineSeparator() + case _ => "" + }) + } + protected override def doExecute(): RDD[InternalRow] = { val numOutputRows = longMetric("numOutputRows") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ExternalEngineDatasourceRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ExternalEngineDatasourceRDD.scala new file mode 100644 index 000000000000..14ca824596f9 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ExternalEngineDatasourceRDD.scala @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources + +/** + * Represents a trait that should be implemented by relations which + * access external database engines + */ +trait ExternalEngineDatasourceRDD { + def getExternalEngineQuery: String +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala index a436627fd117..395d4a339d90 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala @@ -26,7 +26,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.connector.expressions.filter.Predicate -import org.apache.spark.sql.execution.datasources.DataSourceMetricsMixin +import org.apache.spark.sql.execution.datasources.{DataSourceMetricsMixin, ExternalEngineDatasourceRDD} import org.apache.spark.sql.execution.datasources.v2.TableSampleInfo import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects} @@ -173,7 +173,7 @@ class JDBCRDD( limit: Int, sortOrders: Array[String], offset: Int) - extends RDD[InternalRow](sc, Nil) with DataSourceMetricsMixin { + extends RDD[InternalRow](sc, Nil) with DataSourceMetricsMixin with ExternalEngineDatasourceRDD { /** * Execution time of the query issued to JDBC connection @@ -182,11 +182,40 @@ class JDBCRDD( sparkContext, name = "JDBC query execution time") + private lazy val dialect = JdbcDialects.get(url) + + def generateJdbcQuery(partition: Option[JDBCPartition]): String = { + // H2's JDBC driver does not support the setSchema() method. We pass a + // fully-qualified table name in the SELECT statement. I don't know how to + // talk about a table in a completely portable way. 
+ var builder = dialect + .getJdbcSQLQueryBuilder(options) + .withPredicates(predicates, partition.getOrElse(JDBCPartition(whereClause = null, idx = 1))) + .withColumns(columns) + .withSortOrders(sortOrders) + .withLimit(limit) + .withOffset(offset) + + groupByColumns.foreach { groupByKeys => + builder = builder.withGroupByColumns(groupByKeys) + } + + sample.foreach { tableSampleInfo => + builder = builder.withTableSample(tableSampleInfo) + } + + builder.build() + } + /** * Retrieve the list of partitions corresponding to this RDD. */ override def getPartitions: Array[Partition] = partitions + override def getExternalEngineQuery: String = { + generateJdbcQuery(partition = None) + } + /** * Runs the SQL query against the JDBC driver. * @@ -236,7 +265,6 @@ class JDBCRDD( val inputMetrics = context.taskMetrics().inputMetrics val part = thePart.asInstanceOf[JDBCPartition] conn = getConnection(part.idx) - val dialect = JdbcDialects.get(url) import scala.jdk.CollectionConverters._ dialect.beforeFetch(conn, options.asProperties.asScala.toMap) @@ -256,27 +284,7 @@ class JDBCRDD( case None => } - // H2's JDBC driver does not support the setSchema() method. We pass a - // fully-qualified table name in the SELECT statement. I don't know how to - // talk about a table in a completely portable way. 
- - var builder = dialect - .getJdbcSQLQueryBuilder(options) - .withColumns(columns) - .withPredicates(predicates, part) - .withSortOrders(sortOrders) - .withLimit(limit) - .withOffset(offset) - - groupByColumns.foreach { groupByKeys => - builder = builder.withGroupByColumns(groupByKeys) - } - - sample.foreach { tableSampleInfo => - builder = builder.withTableSample(tableSampleInfo) - } - - val sqlText = builder.build() + val sqlText = generateJdbcQuery(Some(part)) stmt = conn.prepareStatement(sqlText, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY) stmt.setFetchSize(options.fetchSize) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala index c258da07cac9..7bae2d77a161 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala @@ -34,6 +34,7 @@ import org.apache.spark.sql.connector.catalog.{Catalogs, Identifier, TableCatalo import org.apache.spark.sql.connector.catalog.functions.{ScalarFunction, UnboundFunction} import org.apache.spark.sql.connector.catalog.index.SupportsIndex import org.apache.spark.sql.connector.expressions.Expression +import org.apache.spark.sql.execution.FormattedMode import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanRelation, V1ScanWrapper} import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog import org.apache.spark.sql.functions.{abs, acos, asin, atan, atan2, avg, ceil, coalesce, cos, cosh, cot, count, count_distinct, degrees, exp, floor, lit, log => logarithm, log10, not, pow, radians, round, signum, sin, sinh, sqrt, sum, tan, tanh, udf, when} @@ -3021,4 +3022,10 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel JdbcDialects.registerDialect(H2Dialect) } } + + test("Explain shows executed SQL query") { + val df = sql("SELECT max(id) FROM h2.test.people WHERE id > 1") 
+ val explained = getNormalizedExplain(df, FormattedMode) + assert(explained.contains("External engine query:")) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org