Repository: spark
Updated Branches:
  refs/heads/master b433acae7 -> cafca54c0


[SPARK-20557][SQL] Support JDBC data type Time with Time Zone

### What changes were proposed in this pull request?

This PR adds support for the JDBC data type TIME WITH TIME ZONE by converting it to TIMESTAMP.
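
A minimal sketch of the new behavior, assuming a running SparkSession `spark` and a `jdbcUrl` pointing at a database with the `ts_with_timezone` table created in the Postgres integration suite below:

```
import java.util.Properties

// Columns declared as TIME WITH TIME ZONE (and TIMESTAMP WITH TIME ZONE)
// now map to Spark's TimestampType instead of failing with "Unsupported type".
val df = spark.read.jdbc(jdbcUrl, "ts_with_timezone", new Properties)
df.printSchema()
// root
//  |-- id: integer (nullable = true)
//  |-- tstz: timestamp (nullable = true)
//  |-- ttz: timestamp (nullable = true)
```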

In addition, before this PR, for unsupported data types we simply output the numeric type code instead of the type name:

```
java.sql.SQLException: Unsupported type 2014
```
After this PR, the message shows the type name:
```
java.sql.SQLException: Unsupported type TIMESTAMP_WITH_TIMEZONE
```
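
The readable name comes from the standard `java.sql.JDBCType` enum, which resolves the numeric constants defined in `java.sql.Types`; for example:

```
import java.sql.JDBCType

// 2014 is the value of java.sql.Types.TIMESTAMP_WITH_TIMEZONE.
JDBCType.valueOf(2014).getName  // returns "TIMESTAMP_WITH_TIMEZONE"
```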

- This PR also upgrades H2 to `1.4.195`, which includes the type fix for "TIMESTAMP WITH TIME ZONE". H2 still does not fully support this type, so the new test captures the resulting exception; the test is kept because it provides at least partial coverage of "TIMESTAMP WITH TIME ZONE", given that the Docker integration tests are not run regularly.
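
As a sketch of that partial coverage (the full test is in `JDBCSuite` below, which provides `spark` and `urlWithUserAndPass`), the H2 read is expected to fail at collect time, so the test asserts on the wrapped exception message:

```
import java.util.Properties

// H2 1.4.195 accepts the TIMESTAMP WITH TIME ZONE DDL, but its driver cannot
// yet materialize the value, so collect() fails inside the scan.
val e = intercept[SparkException] {
  spark.read.jdbc(urlWithUserAndPass, "TEST.TIMEZONE", new Properties()).collect()
}.getMessage
assert(e.contains("java.lang.UnsupportedOperationException: unimplemented"))
```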

### How was this patch tested?
Added test cases.

Author: Xiao Li <gatorsm...@gmail.com>

Closes #17835 from gatorsmile/h2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cafca54c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cafca54c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cafca54c

Branch: refs/heads/master
Commit: cafca54c0ea8bd9c3b80dcbc88d9f2b8d708a026
Parents: b433aca
Author: Xiao Li <gatorsm...@gmail.com>
Authored: Sat May 6 22:21:19 2017 -0700
Committer: Xiao Li <gatorsm...@gmail.com>
Committed: Sat May 6 22:21:19 2017 -0700

----------------------------------------------------------------------
 .../spark/sql/jdbc/OracleIntegrationSuite.scala |  2 +-
 .../sql/jdbc/PostgresIntegrationSuite.scala     | 15 ++++++++++++
 sql/core/pom.xml                                |  2 +-
 .../execution/datasources/jdbc/JdbcUtils.scala  | 12 +++++++---
 .../apache/spark/sql/internal/CatalogImpl.scala |  1 -
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala   | 24 ++++++++++++++++++--
 6 files changed, 48 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/cafca54c/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
----------------------------------------------------------------------
diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
index 85d4a4a..f7b1ec3 100644
--- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
@@ -192,7 +192,7 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationSuite with SharedSQLCo
     checkRow(sql("SELECT * FROM datetime1 where id = 1").head())
   }
 
-  test("SPARK-20557: column type TIMEZONE with TIME STAMP should be 
recognized") {
+  test("SPARK-20557: column type TIMESTAMP with TIME ZONE should be 
recognized") {
     val dfRead = sqlContext.read.jdbc(jdbcUrl, "ts_with_timezone", new 
Properties)
     val rows = dfRead.collect()
     val types = rows(0).toSeq.map(x => x.getClass.toString)

http://git-wip-us.apache.org/repos/asf/spark/blob/cafca54c/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
----------------------------------------------------------------------
diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
index a1a065a..eb3c458 100644
--- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
@@ -55,6 +55,13 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite {
       + "null, null, null, null, null, "
       + "null, null, null, null, null, null, null)"
     ).executeUpdate()
+
+    conn.prepareStatement("CREATE TABLE ts_with_timezone " +
+      "(id integer, tstz TIMESTAMP WITH TIME ZONE, ttz TIME WITH TIME ZONE)")
+      .executeUpdate()
+    conn.prepareStatement("INSERT INTO ts_with_timezone VALUES " +
+      "(1, TIMESTAMP WITH TIME ZONE '2016-08-12 10:22:31.949271-07', TIME WITH 
TIME ZONE '17:22:31.949271+00')")
+      .executeUpdate()
   }
 
   test("Type mapping for various types") {
@@ -126,4 +133,12 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite {
     assert(schema(0).dataType == FloatType)
     assert(schema(1).dataType == ShortType)
   }
+
+  test("SPARK-20557: column type TIMESTAMP with TIME ZONE and TIME with TIME 
ZONE should be recognized") {
+    val dfRead = sqlContext.read.jdbc(jdbcUrl, "ts_with_timezone", new 
Properties)
+    val rows = dfRead.collect()
+    val types = rows(0).toSeq.map(x => x.getClass.toString)
+    assert(types(1).equals("class java.sql.Timestamp"))
+    assert(types(2).equals("class java.sql.Timestamp"))
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/cafca54c/sql/core/pom.xml
----------------------------------------------------------------------
diff --git a/sql/core/pom.xml b/sql/core/pom.xml
index e170133..fe4be96 100644
--- a/sql/core/pom.xml
+++ b/sql/core/pom.xml
@@ -115,7 +115,7 @@
     <dependency>
       <groupId>com.h2database</groupId>
       <artifactId>h2</artifactId>
-      <version>1.4.183</version>
+      <version>1.4.195</version>
       <scope>test</scope>
     </dependency>
     <dependency>

http://git-wip-us.apache.org/repos/asf/spark/blob/cafca54c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index fb877d1..71eaab1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution.datasources.jdbc
 
-import java.sql.{Connection, Driver, DriverManager, PreparedStatement, ResultSet, ResultSetMetaData, SQLException}
+import java.sql.{Connection, Driver, DriverManager, JDBCType, PreparedStatement, ResultSet, ResultSetMetaData, SQLException}
 import java.util.Locale
 
 import scala.collection.JavaConverters._
@@ -217,11 +217,14 @@ object JdbcUtils extends Logging {
       case java.sql.Types.OTHER         => null
       case java.sql.Types.REAL          => DoubleType
       case java.sql.Types.REF           => StringType
+      case java.sql.Types.REF_CURSOR    => null
       case java.sql.Types.ROWID         => LongType
       case java.sql.Types.SMALLINT      => IntegerType
       case java.sql.Types.SQLXML        => StringType
       case java.sql.Types.STRUCT        => StringType
       case java.sql.Types.TIME          => TimestampType
+      case java.sql.Types.TIME_WITH_TIMEZONE
+                                        => TimestampType
       case java.sql.Types.TIMESTAMP     => TimestampType
       case java.sql.Types.TIMESTAMP_WITH_TIMEZONE
                                         => TimestampType
@@ -229,11 +232,14 @@ object JdbcUtils extends Logging {
       case java.sql.Types.TINYINT       => IntegerType
       case java.sql.Types.VARBINARY     => BinaryType
       case java.sql.Types.VARCHAR       => StringType
-      case _                            => null
+      case _                            =>
+        throw new SQLException("Unrecognized SQL type " + sqlType)
       // scalastyle:on
     }
 
-    if (answer == null) throw new SQLException("Unsupported type " + sqlType)
+    if (answer == null) {
+      throw new SQLException("Unsupported type " + JDBCType.valueOf(sqlType).getName)
+    }
     answer
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/cafca54c/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index e1049c6..142b005 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -33,7 +33,6 @@ import org.apache.spark.sql.types.StructType
 import org.apache.spark.storage.StorageLevel
 
 
-
 /**
  * Internal implementation of the user-facing `Catalog`.
  */

http://git-wip-us.apache.org/repos/asf/spark/blob/cafca54c/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 5bd36ec..d9f3689 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -18,13 +18,13 @@
 package org.apache.spark.sql.jdbc
 
 import java.math.BigDecimal
-import java.sql.{Date, DriverManager, Timestamp}
+import java.sql.{Date, DriverManager, SQLException, Timestamp}
 import java.util.{Calendar, GregorianCalendar, Properties}
 
 import org.h2.jdbc.JdbcSQLException
 import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
 
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkException, SparkFunSuite}
 import org.apache.spark.sql.{AnalysisException, DataFrame, Row}
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.execution.DataSourceScanExec
@@ -141,6 +141,15 @@ class JDBCSuite extends SparkFunSuite
         |OPTIONS (url '$url', dbtable 'TEST.TIMETYPES', user 'testUser', password 'testPass')
        """.stripMargin.replaceAll("\n", " "))
 
+    conn.prepareStatement("CREATE TABLE test.timezone (tz TIMESTAMP WITH TIME 
ZONE) " +
+      "AS SELECT '1999-01-08 04:05:06.543543543 GMT-08:00'")
+      .executeUpdate()
+    conn.commit()
+
+    conn.prepareStatement("CREATE TABLE test.array (ar ARRAY) " +
+      "AS SELECT '(1, 2, 3)'")
+      .executeUpdate()
+    conn.commit()
 
     conn.prepareStatement("create table test.flttypes (a DOUBLE, b REAL, c 
DECIMAL(38, 18))"
         ).executeUpdate()
@@ -919,6 +928,17 @@ class JDBCSuite extends SparkFunSuite
     assert(res === (foobarCnt, 0L, foobarCnt) :: Nil)
   }
 
+  test("unsupported types") {
+    var e = intercept[SparkException] {
+      spark.read.jdbc(urlWithUserAndPass, "TEST.TIMEZONE", new Properties()).collect()
+    }.getMessage
+    assert(e.contains("java.lang.UnsupportedOperationException: unimplemented"))
+    e = intercept[SQLException] {
+      spark.read.jdbc(urlWithUserAndPass, "TEST.ARRAY", new Properties()).collect()
+    }.getMessage
+    assert(e.contains("Unsupported type ARRAY"))
+  }
+
   test("SPARK-19318: Connection properties keys should be case-sensitive.") {
     def testJdbcOptions(options: JDBCOptions): Unit = {
       // Spark JDBC data source options are case-insensitive

