This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new f15414e9180 [SPARK-42534][SQL] Fix DB2Dialect Limit clause f15414e9180 is described below commit f15414e91805e050dac8e4624298a2047ab2c5e9 Author: Ivan Sadikov <ivan.sadi...@databricks.com> AuthorDate: Fri Feb 24 10:19:46 2023 +0800 [SPARK-42534][SQL] Fix DB2Dialect Limit clause ### What changes were proposed in this pull request? The PR fixes DB2 Limit clause syntax. Although DB2 supports the LIMIT keyword, it seems that this support varies across databases and versions and the recommended way is to use `FETCH FIRST x ROWS ONLY`. In fact, some versions don't support LIMIT at all. Doc: https://www.ibm.com/docs/en/db2/11.5?topic=subselect-fetch-clause, usage example: https://www.mullinsconsulting.com/dbu_0502.htm. ### Why are the changes needed? Fixes the incorrect Limit clause which could cause errors when used against DB2 versions that don't support LIMIT. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? I added a unit test and an integration test to cover this functionality. Closes #40134 from sadikovi/db2-limit-fix. 
Authored-by: Ivan Sadikov <ivan.sadi...@databricks.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../apache/spark/sql/jdbc/DB2IntegrationSuite.scala | 21 +++++++++++++++++++++ .../org/apache/spark/sql/jdbc/DB2Dialect.scala | 4 ++++ .../org/apache/spark/sql/jdbc/JdbcDialects.scala | 4 ++-- .../scala/org/apache/spark/sql/jdbc/JDBCSuite.scala | 13 +++++++++++++ 4 files changed, 40 insertions(+), 2 deletions(-) diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DB2IntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DB2IntegrationSuite.scala index 6cee6622e1c..e4251512e43 100644 --- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DB2IntegrationSuite.scala +++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DB2IntegrationSuite.scala @@ -217,4 +217,25 @@ class DB2IntegrationSuite extends DockerJDBCIntegrationSuite { assert(actual.length === 2) assert(actual.toSet === expectedResult) } + + test("SPARK-42534: DB2 Limit pushdown test") { + val actual = sqlContext.read + .format("jdbc") + .option("url", jdbcUrl) + .option("dbtable", "tbl") + .load() + .limit(2) + .select("x", "y") + .orderBy("x") + .collect() + + val expected = sqlContext.read + .format("jdbc") + .option("url", jdbcUrl) + .option("query", "SELECT x, y FROM tbl ORDER BY x FETCH FIRST 2 ROWS ONLY") + .load() + .collect() + + assert(actual === expected) + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala index 6c7c1bfe737..5889be880dd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala @@ -160,4 +160,8 @@ private object DB2Dialect extends JdbcDialect { s"DROP SCHEMA ${quoteIdentifier(schema)} RESTRICT" } } + + override def getLimitClause(limit: 
Integer): String = { + if (limit > 0) s"FETCH FIRST $limit ROWS ONLY" else "" + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala index 2e9477356e6..855fa6857af 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala @@ -541,14 +541,14 @@ abstract class JdbcDialect extends Serializable with Logging { * Returns the LIMIT clause for the SELECT statement */ def getLimitClause(limit: Integer): String = { - if (limit > 0 ) s"LIMIT $limit" else "" + if (limit > 0) s"LIMIT $limit" else "" } /** * Returns the OFFSET clause for the SELECT statement */ def getOffsetClause(offset: Integer): String = { - if (offset > 0 ) s"OFFSET $offset" else "" + if (offset > 0) s"OFFSET $offset" else "" } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala index 27609de5433..34c5fada19c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala @@ -1028,6 +1028,19 @@ class JDBCSuite extends QueryTest with SharedSparkSession { "SELECT TOP (123) a,b FROM test") } + test("SPARK-42534: DB2Dialect Limit query test") { + // JDBC url is a required option but is not used in this test. 
+ val options = new JDBCOptions(Map("url" -> "jdbc:db2://host:port", "dbtable" -> "test")) + assert( + DB2Dialect + .getJdbcSQLQueryBuilder(options) + .withColumns(Array("a", "b")) + .withLimit(123) + .build() + .trim() == + "SELECT a,b FROM test FETCH FIRST 123 ROWS ONLY") + } + test("table exists query by jdbc dialect") { val MySQL = JdbcDialects.get("jdbc:mysql://127.0.0.1/db") val Postgres = JdbcDialects.get("jdbc:postgresql://127.0.0.1/db") --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org