This is an automated email from the ASF dual-hosted git repository. yamamuro pushed a commit to branch branch-3.1 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push: new 4ca628eb [SPARK-33867][SQL] Instant and LocalDate values aren't handled when generating SQL queries 4ca628eb is described below commit 4ca628eb2f54c3e039867c5ccbb0cde7413c18e4 Author: Chircu <chi...@arezzosky.com> AuthorDate: Thu Jan 28 11:58:20 2021 +0900 [SPARK-33867][SQL] Instant and LocalDate values aren't handled when generating SQL queries ### What changes were proposed in this pull request? When generating SQL queries only the old date time API types are handled for values in org.apache.spark.sql.jdbc.JdbcDialect#compileValue. If the new API is used (spark.sql.datetime.java8API.enabled=true) Instant and LocalDate values are not quoted and errors are thrown. The change proposed is to handle Instant and LocalDate values the same way that Timestamp and Date are. ### Why are the changes needed? In the current state if an Instant is used in a filter, an exception will be thrown. Ex (dataset was read from PostgreSQL): dataset.filter(current_timestamp().gt(col(VALID_FROM))) Stacktrace (the T11 is from an instant formatted like yyyy-MM-dd'T'HH:mm:ss.SSSSSS'Z'): Caused by: org.postgresql.util.PSQLException: ERROR: syntax error at or near "T11" Position: 285 at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2103) at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:1836) at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:257) at org.postgresql.jdbc2.AbstractJdbc2Statement. [...] ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Test added Closes #31148 from cristichircu/SPARK-33867. 
Lead-authored-by: Chircu <chi...@arezzosky.com> Co-authored-by: Cristi Chircu <chi...@arezzosky.com> Signed-off-by: Takeshi Yamamuro <yamam...@apache.org> (cherry picked from commit 829f118f98ef0732c8dd784f06298465e47ee3a0) Signed-off-by: Takeshi Yamamuro <yamam...@apache.org> --- .../scala/org/apache/spark/sql/jdbc/JdbcDialects.scala | 10 ++++++++++ .../test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala | 14 ++++++++++++++ 2 files changed, 24 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala index ead0a1a..6c72172 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.jdbc import java.sql.{Connection, Date, Timestamp} +import java.time.{Instant, LocalDate} import scala.collection.mutable.ArrayBuilder @@ -26,9 +27,11 @@ import org.apache.commons.lang3.StringUtils import org.apache.spark.annotation.{DeveloperApi, Since} import org.apache.spark.internal.Logging import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeUtils, TimestampFormatter} import org.apache.spark.sql.connector.catalog.TableChange import org.apache.spark.sql.connector.catalog.TableChange._ import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ /** @@ -175,7 +178,14 @@ abstract class JdbcDialect extends Serializable with Logging{ def compileValue(value: Any): Any = value match { case stringValue: String => s"'${escapeSql(stringValue)}'" case timestampValue: Timestamp => "'" + timestampValue + "'" + case timestampValue: Instant => + val timestampFormatter = TimestampFormatter.getFractionFormatter( + DateTimeUtils.getZoneId(SQLConf.get.sessionLocalTimeZone)) + 
s"'${timestampFormatter.format(timestampValue)}'" case dateValue: Date => "'" + dateValue + "'" + case dateValue: LocalDate => + val dateFormatter = DateFormatter(DateTimeUtils.getZoneId(SQLConf.get.sessionLocalTimeZone)) + s"'${dateFormatter.format(dateValue)}'" case arrayValue: Array[Any] => arrayValue.map(compileValue).mkString(", ") case _ => value } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala index b81824d..70f5508 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.jdbc import java.math.BigDecimal import java.sql.{Date, DriverManager, SQLException, Timestamp} +import java.time.{Instant, LocalDate} import java.util.{Calendar, GregorianCalendar, Properties} import scala.collection.JavaConverters._ @@ -1005,6 +1006,19 @@ class JDBCSuite extends QueryTest === java.sql.Timestamp.valueOf("2002-02-20 11:22:33.543543")) } + test("SPARK-33867: Test DataFrame.where for LocalDate and Instant") { + // Test for SPARK-33867 + val timestamp = Instant.parse("2001-02-20T11:22:33.543543Z") + val date = LocalDate.parse("1995-01-01") + withSQLConf(SQLConf.DATETIME_JAVA8API_ENABLED.key -> "true") { + val jdbcDf = spark.read.jdbc(urlWithUserAndPass, "TEST.TIMETYPES", new Properties()) + val rows = jdbcDf.where($"B" > date && $"C" > timestamp).collect() + assert(rows(0).getAs[LocalDate](1) === LocalDate.parse("1996-01-01")) + // 8 hour difference since saved time was America/Los_Angeles and Instant is GMT + assert(rows(0).getAs[Instant](2) === Instant.parse("2002-02-20T19:22:33.543543Z")) + } + } + test("test credentials in the properties are not in plan output") { val df = sql("SELECT * FROM parts") val explain = ExplainCommand(df.queryExecution.logical, ExtendedMode) --------------------------------------------------------------------- 
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org