This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new bebd78e13160 [SPARK-47761][SQL] Oracle: Support reading AnsiIntervalTypes
bebd78e13160 is described below

commit bebd78e13160b37de5b3363e6b3c4c63365af601
Author: Kent Yao <y...@apache.org>
AuthorDate: Wed Apr 10 11:37:57 2024 +0800

    [SPARK-47761][SQL] Oracle: Support reading AnsiIntervalTypes
    
    ### What changes were proposed in this pull request?
    
    In this PR, I propose adding support for reading well-defined ANSI interval types from Oracle databases.
    
    - INTERVAL YEAR [(year_precision)] TO MONTH
      - Stores a period of time in years and months, where year_precision is the number of digits in the YEAR datetime field. Accepted values are 0 to 9. The default is 2. The size is fixed at 5 bytes.
    - INTERVAL DAY [(day_precision)] TO SECOND [(fractional_seconds_precision)]
      - Stores a period of time in days, hours, minutes, and seconds, where
        - day_precision is the maximum number of digits in the DAY datetime field. Accepted values are 0 to 9. The default is 2.
        - fractional_seconds_precision is the number of digits in the fractional part of the SECOND field. Accepted values are 0 to 9. The default is 6. The size is fixed at 11 bytes.
    
    Both of them are mapped to the default AnsiIntervalTypes, as the sketch below shows.
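
    In Spark terms, a minimal sketch of what those defaults mean (type names are from Spark's public `org.apache.spark.sql.types` API):

    ```scala
    import org.apache.spark.sql.types.{DayTimeIntervalType, YearMonthIntervalType}

    // The no-arg factories produce the widest field ranges, which is why a single
    // mapping works regardless of the precision arguments on the Oracle side.
    val ym = YearMonthIntervalType() // interval year to month
    val dt = DayTimeIntervalType()   // interval day to second
    ```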
    
    We also add two developer APIs that convert interval strings to the underlying representations of YearMonthIntervalType and DayTimeIntervalType. The default implementations assume that the inputs are ANSI-style interval strings; incompatible values will fail to parse. In practice, however, such values fail much earlier, at the data type mapping step, because no mapping rules are defined for them.
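
    For dialects whose databases emit non-ANSI interval strings, these hooks can be overridden. A hypothetical sketch (the `MyIntervalDialect` class and its "1y2m"-style format are illustrative only, not part of this PR):

    ```scala
    import org.apache.spark.sql.jdbc.JdbcDialect

    case class MyIntervalDialect() extends JdbcDialect {
      override def canHandle(url: String): Boolean = url.startsWith("jdbc:mydb")

      // Parse e.g. "1y2m" into 14 months instead of the ANSI "1-2" form.
      override def getYearMonthIntervalAsMonths(yearmonthStr: String): Int = {
        val pattern = """(\d+)y(\d+)m""".r
        yearmonthStr match {
          case pattern(y, m) => y.toInt * 12 + m.toInt
          case _ => throw new IllegalArgumentException(s"Invalid interval: $yearmonthStr")
        }
      }
    }
    ```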
    
    ### Why are the changes needed?
    
    Improve Oracle accessibility, as it's safe to read external Oracle intervals.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, reading Oracle intervals is now supported, as the sketch below shows.
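
    A minimal read sketch, assuming a spark-shell session where `spark` is predefined and `oracleJdbcUrl` is a placeholder for a reachable Oracle instance:

    ```scala
    val df = spark.read.format("jdbc")
      .option("url", oracleJdbcUrl)
      .option("query", "SELECT INTERVAL '1-2' YEAR TO MONTH AS ym FROM dual")
      .load()
    df.printSchema() // ym: interval year to month (nullable = true)
    ```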

    ### How was this patch tested?

    New tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    no
    
    Closes #45925 from yaooqinn/SPARK-47761.
    
    Authored-by: Kent Yao <y...@apache.org>
    Signed-off-by: Kent Yao <y...@apache.org>
---
 .../spark/sql/jdbc/OracleIntegrationSuite.scala    | 29 +++++++++++++++++++++-
 .../sql/execution/datasources/jdbc/JdbcUtils.scala | 10 ++++++++
 .../org/apache/spark/sql/jdbc/JdbcDialects.scala   | 26 +++++++++++++++++++
 .../org/apache/spark/sql/jdbc/OracleDialect.scala  | 14 +++++++++++
 4 files changed, 78 insertions(+), 1 deletion(-)

diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
index 3b6e7faa164e..7728ee774bda 100644
--- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
+++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
@@ -19,11 +19,12 @@ package org.apache.spark.sql.jdbc
 
 import java.math.BigDecimal
 import java.sql.{Connection, Date, Timestamp}
+import java.time.{Duration, Period}
 import java.util.{Properties, TimeZone}
 
 import org.scalatest.time.SpanSugar._
 
-import org.apache.spark.sql.{Row, SaveMode}
+import org.apache.spark.sql.{DataFrame, Row, SaveMode}
 import org.apache.spark.sql.catalyst.util.DateTimeTestUtils._
 import org.apache.spark.sql.execution.{RowDataSourceScanExec, WholeStageCodegenExec}
 import org.apache.spark.sql.execution.datasources.LogicalRelation
@@ -540,4 +541,30 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationSuite with SharedSpark
     assert(df.count() === 2)
     assert(df2.collect().forall(_.getTimestamp(0) === row1))
   }
+
+  test("SPARK-47761: Reading ANSI INTERVAL Types") {
+    val df: String => DataFrame = query => spark.read.format("jdbc")
+      .option("url", jdbcUrl)
+      .option("query", query)
+      .load()
+    checkAnswer(df("SELECT INTERVAL '1-2' YEAR(1) TO MONTH as i0 FROM dual"),
+      Row(Period.of(1, 2, 0)))
+    checkAnswer(df("SELECT INTERVAL '1-2' YEAR(2) TO MONTH as i1 FROM dual"),
+      Row(Period.of(1, 2, 0)))
+    checkAnswer(df("SELECT INTERVAL '12345-2' YEAR(9) TO MONTH as i2 FROM 
dual"),
+      Row(Period.of(12345, 2, 0)))
+    checkAnswer(df("SELECT INTERVAL '1 12:23:56' DAY(1) TO SECOND(0) as i3 
FROM dual"),
+      Row(Duration.ofDays(1).plusHours(12).plusMinutes(23).plusSeconds(56)))
+    checkAnswer(df("SELECT INTERVAL '1 12:23:56.12' DAY TO SECOND(2) as i4 
FROM dual"),
+      
Row(Duration.ofDays(1).plusHours(12).plusMinutes(23).plusSeconds(56).plusMillis(120)))
+    checkAnswer(df("SELECT INTERVAL '1 12:23:56.1234' DAY TO SECOND(4) as i5 
FROM dual"),
+      
Row(Duration.ofDays(1).plusHours(12).plusMinutes(23).plusSeconds(56).plusMillis(123)
+        .plusNanos(400000)))
+    checkAnswer(df("SELECT INTERVAL '1 12:23:56.123456' DAY TO SECOND(6) as i6 
FROM dual"),
+      
Row(Duration.ofDays(1).plusHours(12).plusMinutes(23).plusSeconds(56).plusMillis(123)
+        .plusNanos(456000)))
+    checkAnswer(df("SELECT INTERVAL '1 12:23:56.12345678' DAY TO SECOND(8) as 
i7 FROM dual"),
+      
Row(Duration.ofDays(1).plusHours(12).plusMinutes(23).plusSeconds(56).plusMillis(123)
+        .plusNanos(456000)))
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 08313f26a877..13cad43f9734 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -532,6 +532,16 @@ object JdbcUtils extends Logging with SQLConfHelper {
       (rs: ResultSet, row: InternalRow, pos: Int) =>
         row.update(pos, rs.getBytes(pos + 1))
 
+    case _: YearMonthIntervalType =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
+        row.update(pos,
+          nullSafeConvert(rs.getString(pos + 1), dialect.getYearMonthIntervalAsMonths))
+
+    case _: DayTimeIntervalType =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
+        row.update(pos,
+          nullSafeConvert(rs.getString(pos + 1), dialect.getDayTimeIntervalAsMicros))
+
     case _: ArrayType if metadata.contains("pg_bit_array_type") =>
       // SPARK-47628: Handle PostgreSQL bit(n>1) array type ahead. As in the pgjdbc driver,
       // bit(n>1)[] is not distinguishable from bit(1)[], and they are all recognized as boolen[].
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
index d800cc6a8617..5e98bcbce489 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
@@ -21,6 +21,7 @@ import java.sql.{Connection, Date, Driver, Statement, Timestamp}
 import java.time.{Instant, LocalDate, LocalDateTime}
 import java.util
 import java.util.ServiceLoader
+import java.util.concurrent.TimeUnit
 
 import scala.collection.mutable.ArrayBuilder
 import scala.util.control.NonFatal
@@ -34,6 +35,7 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.CatalystTypeConverters
 import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeUtils, TimestampFormatter}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.{localDateTimeToMicros, toJavaTimestampNoRebase}
+import org.apache.spark.sql.catalyst.util.IntervalUtils.{fromDayTimeString, fromYearMonthString, getDuration}
 import org.apache.spark.sql.connector.catalog.{Identifier, TableChange}
 import org.apache.spark.sql.connector.catalog.TableChange._
 import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
@@ -164,6 +166,30 @@ abstract class JdbcDialect extends Serializable with Logging {
   @Since("4.0.0")
   def convertJavaDateToDate(d: Date): Date = d
 
+  /**
+   * Converts a year-month interval string to an int value `months`.
+   *
+   * @param yearmonthStr the year-month interval string
+   * @return the number of total months in the interval
+   * @throws IllegalArgumentException if the input string is invalid
+   */
+  @Since("4.0.0")
+  def getYearMonthIntervalAsMonths(yearmonthStr: String): Int = {
+    fromYearMonthString(yearmonthStr).months
+  }
+
+  /**
+   * Converts a day-time interval string to a long value `micros`.
+   *
+   * @param daytimeStr the day-time interval string
+   * @return the number of total microseconds in the interval
+   * @throws IllegalArgumentException if the input string is invalid
+   */
+  @Since("4.0.0")
+  def getDayTimeIntervalAsMicros(daytimeStr: String): Long = {
+    getDuration(fromDayTimeString(daytimeStr), TimeUnit.MICROSECONDS)
+  }
+
   /**
   * Convert java.sql.Timestamp to a LocalDateTime representing the same wall-clock time as the
    * value stored in a remote database.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala
index a9c246c93879..001d47f13b21 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala
@@ -103,6 +103,8 @@ private case class OracleDialect() extends JdbcDialect {
         Some(TimestampType)
       case BINARY_FLOAT => Some(FloatType) // Value for OracleTypes.BINARY_FLOAT
       case BINARY_DOUBLE => Some(DoubleType) // Value for OracleTypes.BINARY_DOUBLE
+      case INTERVAL_YM => Some(YearMonthIntervalType())
+      case INTERVAL_DS => Some(DayTimeIntervalType())
       case _ => None
     }
   }
@@ -231,4 +233,16 @@ private[jdbc] object OracleDialect {
   final val TIMESTAMP_TZ = -101
   // oracle.jdbc.OracleType.TIMESTAMP_WITH_LOCAL_TIME_ZONE
   final val TIMESTAMP_LTZ = -102
+  // INTERVAL YEAR [(year_precision)] TO MONTH
+  // Stores a period of time in years and months, where year_precision is the number of digits in
+  // the YEAR datetime field. Accepted values are 0 to 9. The default is 2.
+  // The size is fixed at 5 bytes.
+  final val INTERVAL_YM = -103
+  // INTERVAL DAY [(day_precision)] TO SECOND [(fractional_seconds_precision)]
+  // Stores a period of time in days, hours, minutes, and seconds, where
+  // - day_precision is the maximum number of digits in the DAY datetime field.
+  //   Accepted values are 0 to 9. The default is 2.
+  // - fractional_seconds_precision is the number of digits in the fractional part
+  //   of the SECOND field. Accepted values are 0 to 9. The default is 6.
+  final val INTERVAL_DS = -104
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
