This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 72a95bcad7f1 [SPARK-47324][SQL] Add missing timestamp conversion for JDBC nested types
72a95bcad7f1 is described below

commit 72a95bcad7f1906c97fb0971ed6338374ec3009d
Author: Kent Yao <y...@apache.org>
AuthorDate: Mon Mar 11 09:34:12 2024 +0900

    [SPARK-47324][SQL] Add missing timestamp conversion for JDBC nested types
    
    ### What changes were proposed in this pull request?
    
    [SPARK-44280](https://issues.apache.org/jira/browse/SPARK-44280) added a new API, convertJavaTimestampToTimestamp, which is called only for plain timestamp columns.
    
    This PR extends that conversion to timestamps nested in arrays; a minimal sketch of the idea follows.
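
    A minimal, self-contained sketch of the approach (the names below are illustrative stand-ins, not the exact Spark internals): array elements now go through the same per-element dialect conversion hook that plain timestamp columns already use.

    ```scala
    import java.sql.Timestamp

    object NestedTimestampSketch {
      // Stand-in for a dialect's convertJavaTimestampToTimestamp: map the PostgreSQL
      // JDBC driver's 'infinity' sentinel (Long.MaxValue millis) to a representable bound.
      def convertTimestamp(t: Timestamp): Timestamp =
        if (t.getTime == Long.MaxValue) Timestamp.valueOf("9999-12-31 23:59:59") else t

      // Generic element-wise converter, analogous to the new arrayConverter helper:
      // apply the element conversion to every non-null entry of a JDBC array.
      def arrayConverter[T](convert: T => Any): Any => Any = (array: Any) =>
        array.asInstanceOf[Array[T]].map(e => if (e == null) null else convert(e))

      def main(args: Array[String]): Unit = {
        val raw: Array[Timestamp] = Array(new Timestamp(Long.MaxValue), null)
        val converted = arrayConverter[Timestamp](convertTimestamp)(raw)
        // Prints the clamped timestamp followed by null
        println(converted.asInstanceOf[Array[Any]].mkString(", "))
      }
    }
    ```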
    
    ### Why are the changes needed?
    
    Data consistency/correctness: timestamps inside arrays should receive the same dialect conversion as plain timestamp columns.
    
    ### Does this PR introduce _any_ user-facing change?
    no
    
    ### How was this patch tested?
    
    New tests in PostgresIntegrationSuite covering a TIMESTAMP[] column that holds 'infinity' and '-infinity' elements; a hedged sketch of the read path follows.
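
    Roughly, the new assertions read the infinity_timestamp table (which now also has a TIMESTAMP[] column, see the diff below) and check that array elements are clamped to the same bounds as the plain column. A hedged sketch of that read path, with an assumed local connection string standing in for the suite's own jdbcUrl:

    ```scala
    import java.util.Properties
    import org.apache.spark.sql.SparkSession

    object ReadInfinityArraySketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[1]").appName("sketch").getOrCreate()
        // Hypothetical connection string; the integration suite supplies its own jdbcUrl.
        val jdbcUrl = "jdbc:postgresql://localhost:5432/postgres?user=postgres"
        val df = spark.read.jdbc(jdbcUrl, "infinity_timestamp", new Properties)
        val first = df.collect().minBy(_.getAs[Int]("id"))
        // With this patch, the 'infinity' element inside timestamp_array is converted
        // through the dialect hook, matching the plain timestamp_column value.
        val arrayElement = first.getAs[scala.collection.Seq[java.sql.Timestamp]]("timestamp_array").head
        val plainColumn = first.getAs[java.sql.Timestamp]("timestamp_column")
        println(s"$arrayElement == $plainColumn")
        spark.stop()
      }
    }
    ```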
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    no
    
    Closes #45435 from yaooqinn/SPARK-47324.
    
    Authored-by: Kent Yao <y...@apache.org>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../spark/sql/jdbc/PostgresIntegrationSuite.scala  | 17 +++++---
 .../sql/execution/datasources/jdbc/JdbcUtils.scala | 46 +++++++++-------------
 2 files changed, 29 insertions(+), 34 deletions(-)

diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
index 2d1c0314f27b..04e31679f386 100644
--- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
+++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
@@ -23,8 +23,7 @@ import java.text.SimpleDateFormat
 import java.time.{LocalDateTime, ZoneOffset}
 import java.util.Properties
 
-import org.apache.spark.sql.Column
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{Column, Row}
 import org.apache.spark.sql.catalyst.expressions.Literal
 import org.apache.spark.sql.types.{ArrayType, DecimalType, FloatType, ShortType}
 import org.apache.spark.tags.DockerTest
@@ -149,9 +148,12 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite {
       |('2013-04-05 18:01:02.123456')""".stripMargin).executeUpdate()
 
     conn.prepareStatement("CREATE TABLE infinity_timestamp" +
-      "(id SERIAL PRIMARY KEY, timestamp_column TIMESTAMP);").executeUpdate()
-    conn.prepareStatement("INSERT INTO infinity_timestamp (timestamp_column)" +
-      " VALUES ('infinity'), ('-infinity');").executeUpdate()
+      "(id SERIAL PRIMARY KEY, timestamp_column TIMESTAMP, timestamp_array 
TIMESTAMP[])")
+      .executeUpdate()
+    conn.prepareStatement("INSERT INTO infinity_timestamp (timestamp_column, 
timestamp_array)" +
+      " VALUES ('infinity', ARRAY[TIMESTAMP 'infinity']), " +
+        "('-infinity', ARRAY[TIMESTAMP '-infinity'])")
+      .executeUpdate()
 
     conn.prepareStatement("CREATE DOMAIN not_null_text AS TEXT DEFAULT 
''").executeUpdate()
     conn.prepareStatement("create table custom_type(type_array 
not_null_text[]," +
@@ -447,10 +449,13 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite {
     assert(row.length == 2)
     val infinity = row(0).getAs[Timestamp]("timestamp_column")
     val negativeInfinity = row(1).getAs[Timestamp]("timestamp_column")
+    val infinitySeq = row(0).getAs[scala.collection.Seq[Timestamp]]("timestamp_array")
+    val negativeInfinitySeq = row(1).getAs[scala.collection.Seq[Timestamp]]("timestamp_array")
     val minTimeStamp = LocalDateTime.of(1, 1, 1, 0, 0, 0).toEpochSecond(ZoneOffset.UTC)
     val maxTimestamp = LocalDateTime.of(9999, 12, 31, 23, 59, 59).toEpochSecond(ZoneOffset.UTC)
-
     assert(infinity.getTime == maxTimestamp)
     assert(negativeInfinity.getTime == minTimeStamp)
+    assert(infinitySeq.head.getTime == maxTimestamp)
+    assert(negativeInfinitySeq.head.getTime == minTimeStamp)
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index b5e78ba32cd5..a7bbb832a839 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -17,7 +17,8 @@
 
 package org.apache.spark.sql.execution.datasources.jdbc
 
-import java.sql.{Connection, JDBCType, PreparedStatement, ResultSet, ResultSetMetaData, SQLException}
+import java.math.{BigDecimal => JBigDecimal}
+import java.sql.{Connection, Date, JDBCType, PreparedStatement, ResultSet, ResultSetMetaData, SQLException, Timestamp}
 import java.time.{Instant, LocalDate}
 import java.util
 import java.util.concurrent.TimeUnit
@@ -414,7 +415,7 @@ object JdbcUtils extends Logging with SQLConfHelper {
     case DecimalType.Fixed(p, s) =>
       (rs: ResultSet, row: InternalRow, pos: Int) =>
         val decimal =
-          nullSafeConvert[java.math.BigDecimal](rs.getBigDecimal(pos + 1), d => Decimal(d, p, s))
+          nullSafeConvert[JBigDecimal](rs.getBigDecimal(pos + 1), d => Decimal(d, p, s))
         row.update(pos, decimal)
 
     case DoubleType =>
@@ -508,37 +509,22 @@ object JdbcUtils extends Logging with SQLConfHelper {
 
     case ArrayType(et, _) =>
       val elementConversion = et match {
-        case TimestampType =>
-          (array: Object) =>
-            array.asInstanceOf[Array[java.sql.Timestamp]].map { timestamp =>
-              nullSafeConvert(timestamp, fromJavaTimestamp)
-            }
+        case TimestampType => arrayConverter[Timestamp] {
+          (t: Timestamp) => fromJavaTimestamp(dialect.convertJavaTimestampToTimestamp(t))
+        }
 
         case TimestampNTZType =>
-          (array: Object) =>
-            array.asInstanceOf[Array[java.sql.Timestamp]].map { timestamp =>
-              nullSafeConvert(timestamp, (t: java.sql.Timestamp) =>
-                localDateTimeToMicros(dialect.convertJavaTimestampToTimestampNTZ(t)))
-            }
+          arrayConverter[Timestamp] {
+            (t: Timestamp) => localDateTimeToMicros(dialect.convertJavaTimestampToTimestampNTZ(t))
+          }
 
         case StringType =>
-          (array: Object) =>
-            // some underling types are not String such as uuid, inet, cidr, etc.
-            array.asInstanceOf[Array[java.lang.Object]]
-              .map(obj => if (obj == null) null else UTF8String.fromString(obj.toString))
-
-        case DateType =>
-          (array: Object) =>
-            array.asInstanceOf[Array[java.sql.Date]].map { date =>
-              nullSafeConvert(date, fromJavaDate)
-            }
+          arrayConverter[Object]((obj: Object) => UTF8String.fromString(obj.toString))
+
+        case DateType => arrayConverter[Date](fromJavaDate)
 
         case dt: DecimalType =>
-          (array: Object) =>
-            array.asInstanceOf[Array[java.math.BigDecimal]].map { decimal =>
-              nullSafeConvert[java.math.BigDecimal](
-                decimal, d => Decimal(d, dt.precision, dt.scale))
-            }
+            arrayConverter[java.math.BigDecimal](d => Decimal(d, dt.precision, dt.scale))
 
         case LongType if metadata.contains("binarylong") =>
           throw QueryExecutionErrors.unsupportedArrayElementTypeBasedOnBinaryError(dt)
@@ -552,7 +538,7 @@ object JdbcUtils extends Logging with SQLConfHelper {
       (rs: ResultSet, row: InternalRow, pos: Int) =>
         val array = nullSafeConvert[java.sql.Array](
           input = rs.getArray(pos + 1),
-          array => new GenericArrayData(elementConversion.apply(array.getArray)))
+          array => new GenericArrayData(elementConversion(array.getArray)))
         row.update(pos, array)
 
     case _ => throw QueryExecutionErrors.unsupportedJdbcTypeError(dt.catalogString)
@@ -566,6 +552,10 @@ object JdbcUtils extends Logging with SQLConfHelper {
     }
   }
 
+  private def arrayConverter[T](elementConvert: T => Any): Any => Any = (array: Any) => {
+    array.asInstanceOf[Array[T]].map(e => nullSafeConvert(e, elementConvert))
+  }
+
   // A `JDBCValueSetter` is responsible for setting a value from `Row` into a field for
   // `PreparedStatement`. The last argument `Int` means the index for the value to be set
   // in the SQL statement and also used for the value in `Row`.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
