This is an automated email from the ASF dual-hosted git repository.

yamamuro pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 4ca628eb [SPARK-33867][SQL] Instant and LocalDate values aren't handled when generating SQL queries
4ca628eb is described below

commit 4ca628eb2f54c3e039867c5ccbb0cde7413c18e4
Author: Chircu <chi...@arezzosky.com>
AuthorDate: Thu Jan 28 11:58:20 2021 +0900

    [SPARK-33867][SQL] Instant and LocalDate values aren't handled when generating SQL queries
    
    ### What changes were proposed in this pull request?
    
    When generating SQL queries, only the legacy date/time API types are
    handled for values in org.apache.spark.sql.jdbc.JdbcDialect#compileValue.
    If the Java 8 time API is enabled (spark.sql.datetime.java8API.enabled=true),
    Instant and LocalDate values are not quoted and errors are thrown. The
    proposed change is to handle Instant and LocalDate values the same way
    that Timestamp and Date are.
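
    For context, a minimal sketch of the scenario this targets (the URL,
    table and column names below are hypothetical):

        import java.time.Instant
        import org.apache.spark.sql.functions.{col, lit}

        // With the Java 8 time API enabled, timestamp values surface as
        // java.time.Instant and date values as java.time.LocalDate.
        spark.conf.set("spark.sql.datetime.java8API.enabled", "true")

        val df = spark.read.jdbc(url, "events", props)
        // This comparison is pushed down to the database, so compileValue
        // must render the Instant as a quoted SQL literal for the WHERE
        // clause to be valid.
        df.filter(col("created_at") > lit(Instant.parse("2021-01-01T00:00:00Z")))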
    
    ### Why are the changes needed?
    
    In the current state, if an Instant is used in a filter, an exception
    will be thrown.
    Example (dataset read from PostgreSQL):
    dataset.filter(current_timestamp().gt(col(VALID_FROM)))
    Stacktrace (the "T11" comes from an Instant formatted as
    yyyy-MM-dd'T'HH:mm:ss.SSSSSS'Z'):
    Caused by: org.postgresql.util.PSQLException: ERROR: syntax error at or near "T11"  Position: 285
      at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2103)
      at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:1836)
      at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:257)
      at org.postgresql.jdbc2.AbstractJdbc2Statement. [...]
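
    The root cause: without the new cases, compileValue falls through to
    "case _ => value", so the Instant's ISO-8601 toString (with the bare
    'T' separator) is spliced into the WHERE clause unquoted. A rough
    before/after illustration (the H2 URL is only there to pick a dialect;
    any dialect that does not override compileValue behaves the same):

        import java.time.Instant
        import org.apache.spark.sql.jdbc.JdbcDialects

        val dialect = JdbcDialects.get("jdbc:h2:mem:testdb")
        val instant = Instant.parse("2021-01-28T11:22:33.543543Z")
        dialect.compileValue(instant)
        // before: 2021-01-28T11:22:33.543543Z   (unquoted, invalid SQL)
        // after:  a quoted literal rendered in the session time zone,
        //         e.g. '2021-01-28 11:22:33.543543' when it is UTC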
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Test added
    
    Closes #31148 from cristichircu/SPARK-33867.
    
    Lead-authored-by: Chircu <chi...@arezzosky.com>
    Co-authored-by: Cristi Chircu <chi...@arezzosky.com>
    Signed-off-by: Takeshi Yamamuro <yamam...@apache.org>
    (cherry picked from commit 829f118f98ef0732c8dd784f06298465e47ee3a0)
    Signed-off-by: Takeshi Yamamuro <yamam...@apache.org>
---
 .../scala/org/apache/spark/sql/jdbc/JdbcDialects.scala     | 10 ++++++++++
 .../test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala   | 14 ++++++++++++++
 2 files changed, 24 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
index ead0a1a..6c72172 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.jdbc
 
 import java.sql.{Connection, Date, Timestamp}
+import java.time.{Instant, LocalDate}
 
 import scala.collection.mutable.ArrayBuilder
 
@@ -26,9 +27,11 @@ import org.apache.commons.lang3.StringUtils
 import org.apache.spark.annotation.{DeveloperApi, Since}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeUtils, TimestampFormatter}
 import org.apache.spark.sql.connector.catalog.TableChange
 import org.apache.spark.sql.connector.catalog.TableChange._
 import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
 /**
@@ -175,7 +178,14 @@ abstract class JdbcDialect extends Serializable with Logging {
   def compileValue(value: Any): Any = value match {
     case stringValue: String => s"'${escapeSql(stringValue)}'"
     case timestampValue: Timestamp => "'" + timestampValue + "'"
+    case timestampValue: Instant =>
+      val timestampFormatter = TimestampFormatter.getFractionFormatter(
+        DateTimeUtils.getZoneId(SQLConf.get.sessionLocalTimeZone))
+      s"'${timestampFormatter.format(timestampValue)}'"
     case dateValue: Date => "'" + dateValue + "'"
+    case dateValue: LocalDate =>
+      val dateFormatter = DateFormatter(DateTimeUtils.getZoneId(SQLConf.get.sessionLocalTimeZone))
+      s"'${dateFormatter.format(dateValue)}'"
     case arrayValue: Array[Any] => arrayValue.map(compileValue).mkString(", ")
     case _ => value
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index b81824d..70f5508 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.jdbc
 
 import java.math.BigDecimal
 import java.sql.{Date, DriverManager, SQLException, Timestamp}
+import java.time.{Instant, LocalDate}
 import java.util.{Calendar, GregorianCalendar, Properties}
 
 import scala.collection.JavaConverters._
@@ -1005,6 +1006,19 @@ class JDBCSuite extends QueryTest
       === java.sql.Timestamp.valueOf("2002-02-20 11:22:33.543543"))
   }
 
+  test("SPARK-33867: Test DataFrame.where for LocalDate and Instant") {
+    // Test for SPARK-33867
+    val timestamp = Instant.parse("2001-02-20T11:22:33.543543Z")
+    val date = LocalDate.parse("1995-01-01")
+    withSQLConf(SQLConf.DATETIME_JAVA8API_ENABLED.key -> "true") {
+      val jdbcDf = spark.read.jdbc(urlWithUserAndPass, "TEST.TIMETYPES", new Properties())
+      val rows = jdbcDf.where($"B" > date && $"C" > timestamp).collect()
+      assert(rows(0).getAs[LocalDate](1) === LocalDate.parse("1996-01-01"))
+      // 8 hour difference since saved time was America/Los_Angeles and Instant is GMT
+      assert(rows(0).getAs[Instant](2) === Instant.parse("2002-02-20T19:22:33.543543Z"))
+    }
+  }
+
   test("test credentials in the properties are not in plan output") {
     val df = sql("SELECT * FROM parts")
     val explain = ExplainCommand(df.queryExecution.logical, ExtendedMode)
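
As a quick sanity check of the new LocalDate branch (a sketch against the
patched build; again the H2 URL only selects a dialect):

    import java.time.LocalDate
    import org.apache.spark.sql.jdbc.JdbcDialects

    val dialect = JdbcDialects.get("jdbc:h2:mem:testdb")
    // DateFormatter renders the LocalDate with the default yyyy-MM-dd
    // pattern and the result is wrapped in single quotes, matching the
    // existing java.sql.Date branch.
    assert(dialect.compileValue(LocalDate.parse("1995-01-01")) == "'1995-01-01'")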

