This is an automated email from the ASF dual-hosted git repository. yao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new bebd78e13160 [SPARK-47761][SQL] Oracle: Support reading AnsiIntervalTypes bebd78e13160 is described below commit bebd78e13160b37de5b3363e6b3c4c63365af601 Author: Kent Yao <y...@apache.org> AuthorDate: Wed Apr 10 11:37:57 2024 +0800 [SPARK-47761][SQL] Oracle: Support reading AnsiIntervalTypes ### What changes were proposed in this pull request? In this PR, I proposed to add support for reading well-defined ANSI interval types from Oracle databases. - INTERVAL YEAR [(year_precision)] TO MONTH - Stores a period of time in years and months, where year_precision is the number of digits in the YEAR datetime field. Accepted values are 0 to 9. The default is 2. The size is fixed at 5 bytes. - INTERVAL DAY [(day_precision)] TO SECOND [(fractional_seconds_precision)] - Stores a period of time in days, hours, minutes, and seconds, where - day_precision is the maximum number of digits in the DAY datetime field. Accepted values are 0 to 9. The default is 2. - fractional_seconds_precision is the number of digits in the fractional part of the SECOND field. Accepted values are 0 to 9. The default is 6. The size is fixed at 11 bytes. Both of them are mapped to the defaults of AnsiIntervalTypes. We also add two developer APIs that convert interval strings to underlying representations of YearMonthIntervalType and DayTimeIntervalType. The default implementations assume that the inputs are compatible with the ANSI style; incompatible values will fail to be parsed. However, it shall fail much earlier at the data type mapping step because of undefined mapping rules ### Why are the changes needed? Improve the Oracle accessibility, as it's safe to read Oracle external intervals ### Does this PR introduce _any_ user-facing change? Yes, reading Oracle intervals is available now ### How was this patch tested? new tests ### Was this patch authored or co-authored using generative AI tooling? 
no Closes #45925 from yaooqinn/SPARK-47761. Authored-by: Kent Yao <y...@apache.org> Signed-off-by: Kent Yao <y...@apache.org> --- .../spark/sql/jdbc/OracleIntegrationSuite.scala | 29 +++++++++++++++++++++- .../sql/execution/datasources/jdbc/JdbcUtils.scala | 10 ++++++++ .../org/apache/spark/sql/jdbc/JdbcDialects.scala | 26 +++++++++++++++++++ .../org/apache/spark/sql/jdbc/OracleDialect.scala | 14 +++++++++++ 4 files changed, 78 insertions(+), 1 deletion(-) diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala index 3b6e7faa164e..7728ee774bda 100644 --- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala +++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala @@ -19,11 +19,12 @@ package org.apache.spark.sql.jdbc import java.math.BigDecimal import java.sql.{Connection, Date, Timestamp} +import java.time.{Duration, Period} import java.util.{Properties, TimeZone} import org.scalatest.time.SpanSugar._ -import org.apache.spark.sql.{Row, SaveMode} +import org.apache.spark.sql.{DataFrame, Row, SaveMode} import org.apache.spark.sql.catalyst.util.DateTimeTestUtils._ import org.apache.spark.sql.execution.{RowDataSourceScanExec, WholeStageCodegenExec} import org.apache.spark.sql.execution.datasources.LogicalRelation @@ -540,4 +541,30 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationSuite with SharedSpark assert(df.count() === 2) assert(df2.collect().forall(_.getTimestamp(0) === row1)) } + + test("SPARK-47761: Reading ANSI INTERVAL Types") { + val df: String => DataFrame = query => spark.read.format("jdbc") + .option("url", jdbcUrl) + .option("query", query) + .load() + checkAnswer(df("SELECT INTERVAL '1-2' YEAR(1) TO MONTH as i0 FROM dual"), + Row(Period.of(1, 2, 0))) + 
checkAnswer(df("SELECT INTERVAL '1-2' YEAR(2) TO MONTH as i1 FROM dual"), + Row(Period.of(1, 2, 0))) + checkAnswer(df("SELECT INTERVAL '12345-2' YEAR(9) TO MONTH as i2 FROM dual"), + Row(Period.of(12345, 2, 0))) + checkAnswer(df("SELECT INTERVAL '1 12:23:56' DAY(1) TO SECOND(0) as i3 FROM dual"), + Row(Duration.ofDays(1).plusHours(12).plusMinutes(23).plusSeconds(56))) + checkAnswer(df("SELECT INTERVAL '1 12:23:56.12' DAY TO SECOND(2) as i4 FROM dual"), + Row(Duration.ofDays(1).plusHours(12).plusMinutes(23).plusSeconds(56).plusMillis(120))) + checkAnswer(df("SELECT INTERVAL '1 12:23:56.1234' DAY TO SECOND(4) as i5 FROM dual"), + Row(Duration.ofDays(1).plusHours(12).plusMinutes(23).plusSeconds(56).plusMillis(123) + .plusNanos(400000))) + checkAnswer(df("SELECT INTERVAL '1 12:23:56.123456' DAY TO SECOND(6) as i6 FROM dual"), + Row(Duration.ofDays(1).plusHours(12).plusMinutes(23).plusSeconds(56).plusMillis(123) + .plusNanos(456000))) + checkAnswer(df("SELECT INTERVAL '1 12:23:56.12345678' DAY TO SECOND(8) as i7 FROM dual"), + Row(Duration.ofDays(1).plusHours(12).plusMinutes(23).plusSeconds(56).plusMillis(123) + .plusNanos(456000))) + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index 08313f26a877..13cad43f9734 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala @@ -532,6 +532,16 @@ object JdbcUtils extends Logging with SQLConfHelper { (rs: ResultSet, row: InternalRow, pos: Int) => row.update(pos, rs.getBytes(pos + 1)) + case _: YearMonthIntervalType => + (rs: ResultSet, row: InternalRow, pos: Int) => + row.update(pos, + nullSafeConvert(rs.getString(pos + 1), dialect.getYearMonthIntervalAsMonths)) + + case _: DayTimeIntervalType => + (rs: ResultSet, row: InternalRow, pos: Int) => + 
row.update(pos, + nullSafeConvert(rs.getString(pos + 1), dialect.getDayTimeIntervalAsMicros)) + case _: ArrayType if metadata.contains("pg_bit_array_type") => // SPARK-47628: Handle PostgreSQL bit(n>1) array type ahead. As in the pgjdbc driver, // bit(n>1)[] is not distinguishable from bit(1)[], and they are all recognized as boolean[]. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala index d800cc6a8617..5e98bcbce489 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala @@ -21,6 +21,7 @@ import java.sql.{Connection, Date, Driver, Statement, Timestamp} import java.time.{Instant, LocalDate, LocalDateTime} import java.util import java.util.ServiceLoader +import java.util.concurrent.TimeUnit import scala.collection.mutable.ArrayBuilder import scala.util.control.NonFatal @@ -34,6 +35,7 @@ import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.CatalystTypeConverters import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeUtils, TimestampFormatter} import org.apache.spark.sql.catalyst.util.DateTimeUtils.{localDateTimeToMicros, toJavaTimestampNoRebase} +import org.apache.spark.sql.catalyst.util.IntervalUtils.{fromDayTimeString, fromYearMonthString, getDuration} import org.apache.spark.sql.connector.catalog.{Identifier, TableChange} import org.apache.spark.sql.connector.catalog.TableChange._ import org.apache.spark.sql.connector.catalog.functions.UnboundFunction @@ -164,6 +166,30 @@ abstract class JdbcDialect extends Serializable with Logging { @Since("4.0.0") def convertJavaDateToDate(d: Date): Date = d + /** + * Converts a year-month interval string to an int value `months`. 
+ * + * @param yearmonthStr the year-month interval string + * @return the number of total months in the interval + * @throws IllegalArgumentException if the input string is invalid + */ + @Since("4.0.0") + def getYearMonthIntervalAsMonths(yearmonthStr: String): Int = { + fromYearMonthString(yearmonthStr).months + } + + /** + * Converts a day-time interval string to a long value `micros`. + * + * @param daytimeStr the day-time interval string + * @return the number of total microseconds in the interval + * @throws IllegalArgumentException if the input string is invalid + */ + @Since("4.0.0") + def getDayTimeIntervalAsMicros(daytimeStr: String): Long = { + getDuration(fromDayTimeString(daytimeStr), TimeUnit.MICROSECONDS) + } + /** * Convert java.sql.Timestamp to a LocalDateTime representing the same wall-clock time as the * value stored in a remote database. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala index a9c246c93879..001d47f13b21 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala @@ -103,6 +103,8 @@ private case class OracleDialect() extends JdbcDialect { Some(TimestampType) case BINARY_FLOAT => Some(FloatType) // Value for OracleTypes.BINARY_FLOAT case BINARY_DOUBLE => Some(DoubleType) // Value for OracleTypes.BINARY_DOUBLE + case INTERVAL_YM => Some(YearMonthIntervalType()) + case INTERVAL_DS => Some(DayTimeIntervalType()) case _ => None } } @@ -231,4 +233,16 @@ private[jdbc] object OracleDialect { final val TIMESTAMP_TZ = -101 // oracle.jdbc.OracleType.TIMESTAMP_WITH_LOCAL_TIME_ZONE final val TIMESTAMP_LTZ = -102 + // INTERVAL YEAR [(year_precision)] TO MONTH + // Stores a period of time in years and months, where year_precision is the number of digits in + // the YEAR datetime field. Accepted values are 0 to 9. The default is 2. 
+ // The size is fixed at 5 bytes. + final val INTERVAL_YM = -103 + // INTERVAL DAY [(day_precision)] TO SECOND [(fractional_seconds_precision)] + // Stores a period of time in days, hours, minutes, and seconds, where + // - day_precision is the maximum number of digits in the DAY datetime field. + // Accepted values are 0 to 9. The default is 2. + // - fractional_seconds_precision is the number of digits in the fractional part + // of the SECOND field. Accepted values are 0 to 9. The default is 6. + final val INTERVAL_DS = -104 } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org