Repository: spark
Updated Branches:
  refs/heads/master 3d82f6eb7 -> a4002651a


[SPARK-20557][SQL] Only support TIMESTAMP WITH TIME ZONE for Oracle Dialect

## What changes were proposed in this pull request?
In the previous PRs, https://github.com/apache/spark/pull/17832 and 
https://github.com/apache/spark/pull/17835 , we convert `TIMESTAMP WITH TIME 
ZONE` and `TIME WITH TIME ZONE` to `TIMESTAMP` for all the JDBC sources. 
However, this conversion could be risky since it does not respect our SQL 
configuration `spark.sql.session.timeZone`.

In addition, each vendor might have different semantics for these two types. 
For example, Postgres simply returns `TIMESTAMP` types for `TIMESTAMP WITH TIME 
ZONE`. For such supports, we should do it case by case. This PR reverts the 
general support of `TIMESTAMP WITH TIME ZONE` and `TIME WITH TIME ZONE` for 
JDBC sources, except ORACLE Dialect.

When supporting the ORACLE's `TIMESTAMP WITH TIME ZONE`, we only support it 
when the JVM default timezone is the same as the user-specified configuration 
`spark.sql.session.timeZone` (whose default is the JVM default timezone). Now, 
we still treat `TIMESTAMP WITH TIME ZONE` as `TIMESTAMP` when fetching the 
values via the Oracle JDBC connector, whose client converts the timestamp 
values with time zone to the timestamp values using the local JVM default 
timezone (a test case is added to `OracleIntegrationSuite.scala` in this PR for 
showing the behavior). Thus, to avoid any future behavior change, we will not 
support it if JVM default timezone is different from 
`spark.sql.session.timeZone`

No regression because the previous two PRs were just merged to be unreleased 
master branch.

## How was this patch tested?
Added the test cases

Author: gatorsmile <gatorsm...@gmail.com>

Closes #19939 from gatorsmile/timezoneUpdate.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a4002651
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a4002651
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a4002651

Branch: refs/heads/master
Commit: a4002651a3ea673cf3eff7927531c1659663d194
Parents: 3d82f6e
Author: gatorsmile <gatorsm...@gmail.com>
Authored: Mon Dec 11 16:33:06 2017 -0800
Committer: gatorsmile <gatorsm...@gmail.com>
Committed: Mon Dec 11 16:33:06 2017 -0800

----------------------------------------------------------------------
 .../spark/sql/jdbc/OracleIntegrationSuite.scala | 67 +++++++++++++++++++-
 .../sql/jdbc/PostgresIntegrationSuite.scala     |  2 +
 .../execution/datasources/jdbc/JdbcUtils.scala  |  4 +-
 .../apache/spark/sql/jdbc/OracleDialect.scala   | 13 +++-
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala   |  4 +-
 5 files changed, 82 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a4002651/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
----------------------------------------------------------------------
diff --git 
a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
 
b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
index 9034318..8512496 100644
--- 
a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
+++ 
b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
@@ -18,11 +18,12 @@
 package org.apache.spark.sql.jdbc
 
 import java.sql.{Connection, Date, Timestamp}
-import java.util.Properties
+import java.util.{Properties, TimeZone}
 import java.math.BigDecimal
 
-import org.apache.spark.sql.{DataFrame, Row, SaveMode}
-import org.apache.spark.sql.execution.{WholeStageCodegenExec, 
RowDataSourceScanExec}
+import org.apache.spark.sql.{DataFrame, QueryTest, Row, SaveMode}
+import org.apache.spark.sql.execution.{RowDataSourceScanExec, 
WholeStageCodegenExec}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
 import org.apache.spark.tags.DockerTest
@@ -77,6 +78,9 @@ class OracleIntegrationSuite extends 
DockerJDBCIntegrationSuite with SharedSQLCo
     conn.prepareStatement(
       "INSERT INTO ts_with_timezone VALUES " +
         "(1, to_timestamp_tz('1999-12-01 11:00:00 UTC','YYYY-MM-DD HH:MI:SS 
TZR'))").executeUpdate()
+    conn.prepareStatement(
+      "INSERT INTO ts_with_timezone VALUES " +
+        "(2, to_timestamp_tz('1999-12-01 12:00:00 PST','YYYY-MM-DD HH:MI:SS 
TZR'))").executeUpdate()
     conn.commit()
 
     conn.prepareStatement(
@@ -235,6 +239,63 @@ class OracleIntegrationSuite extends 
DockerJDBCIntegrationSuite with SharedSQLCo
     assert(types(1).equals("class java.sql.Timestamp"))
   }
 
+  test("Column type TIMESTAMP with SESSION_LOCAL_TIMEZONE is different from 
default") {
+    val defaultJVMTimeZone = TimeZone.getDefault
+    // Pick the timezone different from the current default time zone of JVM
+    val sofiaTimeZone = TimeZone.getTimeZone("Europe/Sofia")
+    val shanghaiTimeZone = TimeZone.getTimeZone("Asia/Shanghai")
+    val localSessionTimeZone =
+      if (defaultJVMTimeZone == shanghaiTimeZone) sofiaTimeZone else 
shanghaiTimeZone
+
+    withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> 
localSessionTimeZone.getID) {
+      val e = intercept[java.sql.SQLException] {
+        val dfRead = sqlContext.read.jdbc(jdbcUrl, "ts_with_timezone", new 
Properties)
+        dfRead.collect()
+      }.getMessage
+      assert(e.contains("Unrecognized SQL type -101"))
+    }
+  }
+
+  /**
+   * Change the Time Zone `timeZoneId` of JVM before executing `f`, then 
switches back to the
+   * original after `f` returns.
+   * @param timeZoneId the ID for a TimeZone, either an abbreviation such as 
"PST", a full name such
+   *                   as "America/Los_Angeles", or a custom ID such as 
"GMT-8:00".
+   */
+  private def withTimeZone(timeZoneId: String)(f: => Unit): Unit = {
+    val originalLocale = TimeZone.getDefault
+    try {
+      // Add Locale setting
+      TimeZone.setDefault(TimeZone.getTimeZone(timeZoneId))
+      f
+    } finally {
+      TimeZone.setDefault(originalLocale)
+    }
+  }
+
+  test("Column TIMESTAMP with TIME ZONE(JVM timezone)") {
+    def checkRow(row: Row, ts: String): Unit = {
+      assert(row.getTimestamp(1).equals(Timestamp.valueOf(ts)))
+    }
+
+    withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> 
TimeZone.getDefault.getID) {
+      val dfRead = sqlContext.read.jdbc(jdbcUrl, "ts_with_timezone", new 
Properties)
+      withTimeZone("PST") {
+        assert(dfRead.collect().toSet ===
+          Set(
+            Row(BigDecimal.valueOf(1), java.sql.Timestamp.valueOf("1999-12-01 
03:00:00")),
+            Row(BigDecimal.valueOf(2), java.sql.Timestamp.valueOf("1999-12-01 
12:00:00"))))
+      }
+
+      withTimeZone("UTC") {
+        assert(dfRead.collect().toSet ===
+          Set(
+            Row(BigDecimal.valueOf(1), java.sql.Timestamp.valueOf("1999-12-01 
11:00:00")),
+            Row(BigDecimal.valueOf(2), java.sql.Timestamp.valueOf("1999-12-01 
20:00:00"))))
+      }
+    }
+  }
+
   test("SPARK-18004: Make sure date or timestamp related predicate is pushed 
down correctly") {
     val props = new Properties()
     props.put("oracle.jdbc.mapDateToTimestamp", "false")

http://git-wip-us.apache.org/repos/asf/spark/blob/a4002651/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
----------------------------------------------------------------------
diff --git 
a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
 
b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
index 48aba90..be32cb8 100644
--- 
a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
+++ 
b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
@@ -151,6 +151,8 @@ class PostgresIntegrationSuite extends 
DockerJDBCIntegrationSuite {
 
   test("SPARK-20557: column type TIMESTAMP with TIME ZONE and TIME with TIME 
ZONE " +
     "should be recognized") {
+    // When using JDBC to read the columns of TIMESTAMP with TIME ZONE and 
TIME with TIME ZONE
+    // the actual types are java.sql.Types.TIMESTAMP and java.sql.Types.TIME
     val dfRead = sqlContext.read.jdbc(jdbcUrl, "ts_with_timezone", new 
Properties)
     val rows = dfRead.collect()
     val types = rows(0).toSeq.map(x => x.getClass.toString)

http://git-wip-us.apache.org/repos/asf/spark/blob/a4002651/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 75c94fc..bbc95df 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -226,10 +226,10 @@ object JdbcUtils extends Logging {
       case java.sql.Types.STRUCT        => StringType
       case java.sql.Types.TIME          => TimestampType
       case java.sql.Types.TIME_WITH_TIMEZONE
-                                        => TimestampType
+                                        => null
       case java.sql.Types.TIMESTAMP     => TimestampType
       case java.sql.Types.TIMESTAMP_WITH_TIMEZONE
-                                        => TimestampType
+                                        => null
       case java.sql.Types.TINYINT       => IntegerType
       case java.sql.Types.VARBINARY     => BinaryType
       case java.sql.Types.VARCHAR       => StringType

http://git-wip-us.apache.org/repos/asf/spark/blob/a4002651/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala
index e3f106c..6ef77f2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala
@@ -18,7 +18,10 @@
 package org.apache.spark.sql.jdbc
 
 import java.sql.{Date, Timestamp, Types}
+import java.util.TimeZone
 
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
 
@@ -29,6 +32,13 @@ private case object OracleDialect extends JdbcDialect {
 
   override def canHandle(url: String): Boolean = url.startsWith("jdbc:oracle")
 
+  private def supportTimeZoneTypes: Boolean = {
+    val timeZone = DateTimeUtils.getTimeZone(SQLConf.get.sessionLocalTimeZone)
+    // TODO: support timezone types when users are not using the JVM timezone, 
which
+    // is the default value of SESSION_LOCAL_TIMEZONE
+    timeZone == TimeZone.getDefault
+  }
+
   override def getCatalystType(
       sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): 
Option[DataType] = {
     sqlType match {
@@ -49,7 +59,8 @@ private case object OracleDialect extends JdbcDialect {
           case _ if scale == -127L => 
Option(DecimalType(DecimalType.MAX_PRECISION, 10))
           case _ => None
         }
-      case TIMESTAMPTZ => Some(TimestampType) // Value for Timestamp with Time 
Zone in Oracle
+      case TIMESTAMPTZ if supportTimeZoneTypes
+        => Some(TimestampType) // Value for Timestamp with Time Zone in Oracle
       case BINARY_FLOAT => Some(FloatType) // Value for 
OracleTypes.BINARY_FLOAT
       case BINARY_DOUBLE => Some(DoubleType) // Value for 
OracleTypes.BINARY_DOUBLE
       case _ => None

http://git-wip-us.apache.org/repos/asf/spark/blob/a4002651/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 61571bc..0767ca1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -1064,10 +1064,10 @@ class JDBCSuite extends SparkFunSuite
   }
 
   test("unsupported types") {
-    var e = intercept[SparkException] {
+    var e = intercept[SQLException] {
       spark.read.jdbc(urlWithUserAndPass, "TEST.TIMEZONE", new 
Properties()).collect()
     }.getMessage
-    assert(e.contains("java.lang.UnsupportedOperationException: 
unimplemented"))
+    assert(e.contains("Unsupported type TIMESTAMP_WITH_TIMEZONE"))
     e = intercept[SQLException] {
       spark.read.jdbc(urlWithUserAndPass, "TEST.ARRAY", new 
Properties()).collect()
     }.getMessage


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to