This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push: new 5f383f0 [SPARK-35999][SQL] Make from_csv/to_csv to handle day-time intervals properly 5f383f0 is described below commit 5f383f0102af14f5cc4cc43744fc95437ccbf291 Author: Kousuke Saruta <saru...@oss.nttdata.com> AuthorDate: Tue Jul 6 17:37:38 2021 +0800 [SPARK-35999][SQL] Make from_csv/to_csv to handle day-time intervals properly ### What changes were proposed in this pull request? This PR fixes an issue where `from_csv/to_csv` doesn't handle day-time intervals properly. `from_csv` throws an exception if day-time interval types are given. ``` spark-sql> select from_csv("interval '1 2:3:4' day to second", "a interval day to second"); 21/07/03 04:39:13 ERROR SparkSQLDriver: Failed in [select from_csv("interval '1 2:3:4' day to second", "a interval day to second")] java.lang.Exception: Unsupported type: interval day to second at org.apache.spark.sql.errors.QueryExecutionErrors$.unsupportedTypeError(QueryExecutionErrors.scala:775) at org.apache.spark.sql.catalyst.csv.UnivocityParser.makeConverter(UnivocityParser.scala:224) at org.apache.spark.sql.catalyst.csv.UnivocityParser.$anonfun$valueConverters$1(UnivocityParser.scala:134) ``` Also, `to_csv` doesn't handle day-time interval types properly, though no exception is thrown. The result of `to_csv` for day-time interval types is not in ANSI-compliant interval form. ``` spark-sql> select to_csv(named_struct("a", interval '1 2:3:4' day to second)); 93784000000 ``` The result above should be `INTERVAL '1 02:03:04' DAY TO SECOND`. ### Why are the changes needed? Bug fix. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? New tests. Closes #33226 from sarutak/csv-dtinterval. 
Authored-by: Kousuke Saruta <saru...@oss.nttdata.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> (cherry picked from commit def8bc5c9631be0bd3172522006c23634e6f2c31) Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../sql/catalyst/csv/UnivocityGenerator.scala | 5 +++ .../spark/sql/catalyst/csv/UnivocityParser.scala | 5 +++ .../org/apache/spark/sql/CsvFunctionsSuite.scala | 47 +++++++++++++++++++++- 3 files changed, 56 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala index 5d70ccb..2abf7bf 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala @@ -66,6 +66,11 @@ class UnivocityGenerator( IntervalUtils.toYearMonthIntervalString( row.getInt(ordinal), IntervalStringStyles.ANSI_STYLE, start, end) + case DayTimeIntervalType(start, end) => + (row: InternalRow, ordinal: Int) => + IntervalUtils.toDayTimeIntervalString( + row.getLong(ordinal), IntervalStringStyles.ANSI_STYLE, start, end) + case udt: UserDefinedType[_] => makeConverter(udt.sqlType) case dt: DataType => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala index 3ec1ea0..16d1251 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala @@ -222,6 +222,11 @@ class UnivocityParser( Cast(Literal(datum), ym).eval(EmptyRow) } + case dt: DayTimeIntervalType => (d: String) => + nullSafeDatum(d, name, nullable, options) { datum => + Cast(Literal(datum), dt).eval(EmptyRow) + } + case udt: UserDefinedType[_] => makeConverter(name, udt.sqlType, 
nullable) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala index c6afb25..711a4bc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql import java.text.SimpleDateFormat -import java.time.Period +import java.time.{Duration, Period} import java.util.Locale import scala.collection.JavaConverters._ @@ -28,6 +28,7 @@ import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types._ +import org.apache.spark.sql.types.DayTimeIntervalType.{DAY, HOUR, MINUTE, SECOND} import org.apache.spark.sql.types.YearMonthIntervalType.{MONTH, YEAR} class CsvFunctionsSuite extends QueryTest with SharedSparkSession { @@ -308,4 +309,48 @@ class CsvFunctionsSuite extends QueryTest with SharedSparkSession { } } } + + test("SPARK-35999: Make from_csv/to_csv to handle day-time intervals properly") { + val dtDF = Seq(Duration.ofDays(1).plusHours(2).plusMinutes(3).plusSeconds(4)).toDF + Seq( + (DayTimeIntervalType(), "INTERVAL '1 02:03:04' DAY TO SECOND", + Duration.ofDays(1).plusHours(2).plusMinutes(3).plusSeconds(4)), + (DayTimeIntervalType(DAY, MINUTE), "INTERVAL '1 02:03' DAY TO MINUTE", + Duration.ofDays(1).plusHours(2).plusMinutes(3)), + (DayTimeIntervalType(DAY, HOUR), "INTERVAL '1 02' DAY TO HOUR", + Duration.ofDays(1).plusHours(2)), + (DayTimeIntervalType(DAY), "INTERVAL '1' DAY", + Duration.ofDays(1)), + (DayTimeIntervalType(HOUR, SECOND), "INTERVAL '26:03:04' HOUR TO SECOND", + Duration.ofHours(26).plusMinutes(3).plusSeconds(4)), + (DayTimeIntervalType(HOUR, MINUTE), "INTERVAL '26:03' HOUR TO MINUTE", + Duration.ofHours(26).plusMinutes(3)), + (DayTimeIntervalType(HOUR), "INTERVAL '26' HOUR", + Duration.ofHours(26)), + 
(DayTimeIntervalType(MINUTE, SECOND), "INTERVAL '1563:04' MINUTE TO SECOND", + Duration.ofMinutes(1563).plusSeconds(4)), + (DayTimeIntervalType(MINUTE), "INTERVAL '1563' MINUTE", + Duration.ofMinutes(1563)), + (DayTimeIntervalType(SECOND), "INTERVAL '93784' SECOND", + Duration.ofSeconds(93784)) + ).foreach { case (toCsvDtype, toCsvExpected, fromCsvExpected) => + val toCsvDF = dtDF.select(to_csv(struct($"value" cast toCsvDtype)) as "csv") + checkAnswer(toCsvDF, Row(toCsvExpected)) + + DataTypeTestUtils.dayTimeIntervalTypes.foreach { fromCsvDtype => + val fromCsvDF = toCsvDF + .select( + from_csv( + $"csv", + StructType(StructField("a", fromCsvDtype) :: Nil), + Map.empty[String, String]) as "value") + .selectExpr("value.a") + if (toCsvDtype == fromCsvDtype) { + checkAnswer(fromCsvDF, Row(fromCsvExpected)) + } else { + checkAnswer(fromCsvDF, Row(null)) + } + } + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org