This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.3 by this push: new 30834b847e7 [SPARK-39157][SQL] H2Dialect should override getJDBCType so as make the data type is correct 30834b847e7 is described below commit 30834b847e7577cf694558d43fb618fc0b1eb09e Author: Jiaan Geng <belie...@163.com> AuthorDate: Fri May 13 22:01:04 2022 +0800 [SPARK-39157][SQL] H2Dialect should override getJDBCType so as make the data type is correct ### What changes were proposed in this pull request? Currently, `H2Dialect` does not implement `getJDBCType` of `JdbcDialect`, so the DS V2 push-down will throw the exception shown below: ``` Job aborted due to stage failure: Task 0 in stage 13.0 failed 1 times, most recent failure: Lost task 0.0 in stage 13.0 (TID 13) (jiaan-gengdembp executor driver): org.h2.jdbc.JdbcSQLNonTransientException: Unknown data type: "STRING"; SQL statement: SELECT "DEPT","NAME","SALARY","BONUS","IS_MANAGER" FROM "test"."employee" WHERE ("BONUS" IS NOT NULL) AND ("DEPT" IS NOT NULL) AND (CAST("BONUS" AS string) LIKE '%30%') AND (CAST("DEPT" AS byte) > 1) AND (CAST("DEPT" AS short) > 1) AND (CAST("BONUS" AS decimal(20,2)) > 1200.00) [50004-210] ``` H2Dialect should implement `getJDBCType` of `JdbcDialect`. ### Why are the changes needed? Make the H2 data type correct. ### Does this PR introduce _any_ user-facing change? 'Yes'. Fixes a bug in `H2Dialect`. ### How was this patch tested? New tests. Closes #36516 from beliefer/SPARK-39157. 
Authored-by: Jiaan Geng <belie...@163.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> (cherry picked from commit fa3f096e02d408fbeab5f69af451ef8bc8f5b3db) Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../scala/org/apache/spark/sql/jdbc/H2Dialect.scala | 13 ++++++++++++- .../scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala | 17 +++++++++++++++++ 2 files changed, 29 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala index 0aa971c0d3a..56cadbe8e2c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.jdbc -import java.sql.SQLException +import java.sql.{SQLException, Types} import java.util.Locale import scala.util.control.NonFatal @@ -27,6 +27,8 @@ import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, NoSuchT import org.apache.spark.sql.connector.expressions.Expression import org.apache.spark.sql.connector.expressions.aggregate.{AggregateFunc, GeneralAggregateFunc} import org.apache.spark.sql.errors.QueryCompilationErrors +import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils +import org.apache.spark.sql.types.{BooleanType, ByteType, DataType, DecimalType, ShortType, StringType} private object H2Dialect extends JdbcDialect { override def canHandle(url: String): Boolean = @@ -90,6 +92,15 @@ private object H2Dialect extends JdbcDialect { ) } + override def getJDBCType(dt: DataType): Option[JdbcType] = dt match { + case StringType => Option(JdbcType("CLOB", Types.CLOB)) + case BooleanType => Some(JdbcType("BOOLEAN", Types.BOOLEAN)) + case ShortType | ByteType => Some(JdbcType("SMALLINT", Types.SMALLINT)) + case t: DecimalType => Some( + JdbcType(s"NUMERIC(${t.precision},${t.scale})", Types.NUMERIC)) + case _ => JdbcUtils.getCommonJDBCType(dt) + } + override 
def classifyException(message: String, e: Throwable): AnalysisException = { e match { case exception: SQLException => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala index b6f36b912f8..91526cef507 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala @@ -466,6 +466,23 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel checkFiltersRemoved(df7, false) checkPushedInfo(df7, "PushedFilters: [DEPT IS NOT NULL]") checkAnswer(df7, Seq(Row(6, "jen", 12000, 1200, true))) + + val df8 = sql( + """ + |SELECT * FROM h2.test.employee + |WHERE cast(bonus as string) like '%30%' + |AND cast(dept as byte) > 1 + |AND cast(dept as short) > 1 + |AND cast(bonus as decimal(20, 2)) > 1200""".stripMargin) + checkFiltersRemoved(df8, ansiMode) + val expectedPlanFragment8 = if (ansiMode) { + "PushedFilters: [BONUS IS NOT NULL, DEPT IS NOT NULL, " + + "CAST(BONUS AS string) LIKE '%30%', CAST(DEPT AS byte) > 1, ...," + } else { + "PushedFilters: [BONUS IS NOT NULL, DEPT IS NOT NULL]," + } + checkPushedInfo(df8, expectedPlanFragment8) + checkAnswer(df8, Seq(Row(2, "david", 10000, 1300, true))) } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org