This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 30834b847e7 [SPARK-39157][SQL] H2Dialect should override getJDBCType so as to make the data type correct
30834b847e7 is described below

commit 30834b847e7577cf694558d43fb618fc0b1eb09e
Author: Jiaan Geng <belie...@163.com>
AuthorDate: Fri May 13 22:01:04 2022 +0800

    [SPARK-39157][SQL] H2Dialect should override getJDBCType so as to make the data type correct
    
    ### What changes were proposed in this pull request?
    Currently, `H2Dialect` does not implement `getJDBCType` of `JdbcDialect`, so DS V2 push-down throws the exception shown below:
    ```
    Job aborted due to stage failure: Task 0 in stage 13.0 failed 1 times, most recent failure: Lost task 0.0 in stage 13.0 (TID 13) (jiaan-gengdembp executor driver):
     org.h2.jdbc.JdbcSQLNonTransientException: Unknown data type: "STRING"; SQL statement:
    SELECT "DEPT","NAME","SALARY","BONUS","IS_MANAGER" FROM "test"."employee"  WHERE ("BONUS" IS NOT NULL) AND ("DEPT" IS NOT NULL) AND (CAST("BONUS" AS string) LIKE '%30%') AND (CAST("DEPT" AS byte) > 1) AND (CAST("DEPT" AS short) > 1) AND (CAST("BONUS" AS decimal(20,2)) > 1200.00)    [50004-210]
    ```
    H2Dialect should implement `getJDBCType` of `JdbcDialect`.
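
    For illustration, a minimal sketch of how the failure can be reproduced. It assumes an H2-backed JDBC catalog named `h2` registered the way `JDBCV2Suite` does; the catalog settings and the in-memory H2 URL below are assumptions for the example, not part of this patch.
    ```
    // Hypothetical setup mirroring JDBCV2Suite: expose an H2 database as catalog "h2".
    spark.conf.set("spark.sql.catalog.h2",
      "org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog")
    spark.conf.set("spark.sql.catalog.h2.url", "jdbc:h2:mem:testdb")  // assumed in-memory H2 URL
    spark.conf.set("spark.sql.catalog.h2.driver", "org.h2.Driver")

    // Without getJDBCType in H2Dialect, the pushed-down casts keep Spark's type names
    // (string, byte, short), which H2 rejects with: Unknown data type: "STRING".
    spark.sql(
      """
        |SELECT * FROM h2.test.employee
        |WHERE CAST(bonus AS string) LIKE '%30%'
        |AND CAST(dept AS byte) > 1
        |""".stripMargin).show()
    ```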
    
    ### Why are the changes needed?
    Make the H2 data type mapping correct for DS V2 push-down.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes.
    This fixes a bug in `H2Dialect`.
    
    ### How was this patch tested?
    New tests.
    
    Closes #36516 from beliefer/SPARK-39157.
    
    Authored-by: Jiaan Geng <belie...@163.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
    (cherry picked from commit fa3f096e02d408fbeab5f69af451ef8bc8f5b3db)
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../scala/org/apache/spark/sql/jdbc/H2Dialect.scala     | 13 ++++++++++++-
 .../scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala   | 17 +++++++++++++++++
 2 files changed, 29 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
index 0aa971c0d3a..56cadbe8e2c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.jdbc
 
-import java.sql.SQLException
+import java.sql.{SQLException, Types}
 import java.util.Locale
 
 import scala.util.control.NonFatal
@@ -27,6 +27,8 @@ import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, NoSuchT
 import org.apache.spark.sql.connector.expressions.Expression
 import org.apache.spark.sql.connector.expressions.aggregate.{AggregateFunc, GeneralAggregateFunc}
 import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
+import org.apache.spark.sql.types.{BooleanType, ByteType, DataType, DecimalType, ShortType, StringType}
 
 private object H2Dialect extends JdbcDialect {
   override def canHandle(url: String): Boolean =
@@ -90,6 +92,15 @@ private object H2Dialect extends JdbcDialect {
     )
   }
 
+  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
+    case StringType => Option(JdbcType("CLOB", Types.CLOB))
+    case BooleanType => Some(JdbcType("BOOLEAN", Types.BOOLEAN))
+    case ShortType | ByteType => Some(JdbcType("SMALLINT", Types.SMALLINT))
+    case t: DecimalType => Some(
+      JdbcType(s"NUMERIC(${t.precision},${t.scale})", Types.NUMERIC))
+    case _ => JdbcUtils.getCommonJDBCType(dt)
+  }
+
   override def classifyException(message: String, e: Throwable): AnalysisException = {
     e match {
       case exception: SQLException =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
index b6f36b912f8..91526cef507 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
@@ -466,6 +466,23 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
         checkFiltersRemoved(df7, false)
         checkPushedInfo(df7, "PushedFilters: [DEPT IS NOT NULL]")
         checkAnswer(df7, Seq(Row(6, "jen", 12000, 1200, true)))
+
+        val df8 = sql(
+          """
+            |SELECT * FROM h2.test.employee
+            |WHERE cast(bonus as string) like '%30%'
+            |AND cast(dept as byte) > 1
+            |AND cast(dept as short) > 1
+            |AND cast(bonus as decimal(20, 2)) > 1200""".stripMargin)
+        checkFiltersRemoved(df8, ansiMode)
+        val expectedPlanFragment8 = if (ansiMode) {
+          "PushedFilters: [BONUS IS NOT NULL, DEPT IS NOT NULL, " +
+            "CAST(BONUS AS string) LIKE '%30%', CAST(DEPT AS byte) > 1, ...,"
+        } else {
+          "PushedFilters: [BONUS IS NOT NULL, DEPT IS NOT NULL],"
+        }
+        checkPushedInfo(df8, expectedPlanFragment8)
+        checkAnswer(df8, Seq(Row(2, "david", 10000, 1300, true)))
       }
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
