Kevin Appel created SPARK-37259: ----------------------------------- Summary: JDBC read is always going to wrap the query in a select statement Key: SPARK-37259 URL: https://issues.apache.org/jira/browse/SPARK-37259 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 3.1.2 Reporter: Kevin Appel
The read jdbc is wrapping the query it sends to the database server inside a select statement and there is no way to override this currently. Initially I ran into this issue when trying to run a CTE query against SQL server and it fails, the details of the failure is in these cases: [https://github.com/microsoft/mssql-jdbc/issues/1340] [https://github.com/microsoft/mssql-jdbc/issues/1657] [https://github.com/microsoft/sql-spark-connector/issues/147] https://issues.apache.org/jira/browse/SPARK-32825 https://issues.apache.org/jira/browse/SPARK-34928 I started to patch the code to get the query to run and ran into a few different items, if there is a way to add these features to allow this code path to run, this would be extremely helpful to running these type of edge case queries. These are basic examples here the actual queries are much more complex and would require significant time to rewrite. Inside JDBCOptions.scala the query is being set to either, using the dbtable this allows the query to be passed without modification {code:java} name.trim or s"(${subquery}) SPARK_GEN_SUBQ_${curId.getAndIncrement()}" {code} Inside JDBCRelation.scala this is going to try to get the schema for this query, and this ends up running dialect.getSchemaQuery which is doing: {code:java} s"SELECT * FROM $table WHERE 1=0"{code} Overriding the dialect here and initially just passing back the $table gets passed here and to the next issue which is in the compute function in JDBCRDD.scala {code:java} val sqlText = s"SELECT $columnList FROM ${options.tableOrQuery} $myTableSampleClause" + s" $myWhereClause $getGroupByClause $myLimitClause" {code} For these two queries, about a CTE query and using temp tables, finding out the schema is difficult without actually running the query and for the temp table if you run it in the schema check that will have the table now exist and fail when it runs the actual query. The way I patched these is by doing these two items: JDBCRDD.scala (compute) {code:java} val runQueryAsIs = options.parameters.getOrElse("runQueryAsIs", "false").toBoolean val sqlText = if (runQueryAsIs) { s"${options.tableOrQuery}" } else { s"SELECT $columnList FROM ${options.tableOrQuery} $myWhereClause" } JDBC {code} Relation.scala (getSchema) {code:java} val useCustomSchema = jdbcOptions.parameters.getOrElse("useCustomSchema", "false").toBoolean if (useCustomSchema) { val myCustomSchema = jdbcOptions.parameters.getOrElse("customSchema", "").toString val newSchema = CatalystSqlParser.parseTableSchema(myCustomSchema) logInfo(s"Going to return the new $newSchema because useCustomSchema is $useCustomSchema and passed in $myCustomSchema") newSchema } else { val tableSchema = JDBCRDD.resolveTable(jdbcOptions) jdbcOptions.customSchema match { case Some(customSchema) => JdbcUtils.getCustomSchema( tableSchema, customSchema, resolver) case None => tableSchema } }{code} This is allowing the query to run as is, by using the dbtable option and then provide a custom schema that will bypass the dialect schema check Test queries {code:java} query1 = """ SELECT 1 as DummyCOL """ query2 = """ WITH DummyCTE AS ( SELECT 1 as DummyCOL ) SELECT * FROM DummyCTE """ query3 = """ (SELECT * INTO #Temp1a FROM (SELECT @@VERSION as version) data ) (SELECT * FROM #Temp1a) """ {code} Test schema {code:java} schema1 = """ DummyXCOL INT """ schema2 = """ DummyXCOL STRING """ {code} Test code {code:java} jdbcDFWorking = ( spark.read.format("jdbc") .option("url", f"jdbc:sqlserver://{server}:{port};databaseName={database};") .option("user", user) .option("password", password) .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") .option("dbtable", queryx) .option("customSchema", schemax) .option("useCustomSchema", "true") .option("runQueryAsIs", "true") .load() ) {code} Currently we ran into this on these two special SQL server queries however we aren't sure if there is other DB's we are using that we haven't hit this type of issue yet, without going through this I didn't realize the query is always wrapped in the SELECT no matter what you do. This is on the Spark 3.1.2 and using the PySpark with the Python 3.7.11 Thank you for your consideration and assistance to a way to fix this Kevin -- This message was sent by Atlassian Jira (v8.20.1#820001) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org