This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push: new 544b7e1 [SPARK-35998][SQL] Make from_csv/to_csv to handle year-month intervals properly 544b7e1 is described below commit 544b7e16acf51b7c2a95588885fb4ebe7b19a00e Author: Kousuke Saruta <saru...@oss.nttdata.com> AuthorDate: Mon Jul 5 13:10:50 2021 +0300 [SPARK-35998][SQL] Make from_csv/to_csv to handle year-month intervals properly ### What changes were proposed in this pull request? This PR fixes an issue where `from_csv/to_csv` doesn't handle year-month intervals properly. `from_csv` throws exception if year-month interval types are given. ``` spark-sql> select from_csv("interval '1-2' year to month", "a interval year to month"); 21/07/03 04:32:24 ERROR SparkSQLDriver: Failed in [select from_csv("interval '1-2' year to month", "a interval year to month")] java.lang.Exception: Unsupported type: interval year to month at org.apache.spark.sql.errors.QueryExecutionErrors$.unsupportedTypeError(QueryExecutionErrors.scala:775) at org.apache.spark.sql.catalyst.csv.UnivocityParser.makeConverter(UnivocityParser.scala:224) at org.apache.spark.sql.catalyst.csv.UnivocityParser.$anonfun$valueConverters$1(UnivocityParser.scala:134) ``` Also, `to_csv` doesn't handle year-month interval types properly though no exception is thrown. The result of `to_csv` for year-month interval types is not in ANSI interval compliant form. ``` spark-sql> select to_csv(named_struct("a", interval '1-2' year to month)); 14 ``` The result above should be `INTERVAL '1-2' YEAR TO MONTH`. ### Why are the changes needed? Bug fix. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? New tests. Closes #33210 from sarutak/csv-yminterval. 
Authored-by: Kousuke Saruta <saru...@oss.nttdata.com> Signed-off-by: Max Gekk <max.g...@gmail.com> (cherry picked from commit f4237aff7ebece0b8d61e680ecbe759850f25af8) Signed-off-by: Max Gekk <max.g...@gmail.com> --- .../sql/catalyst/csv/UnivocityGenerator.scala | 7 +++++- .../spark/sql/catalyst/csv/UnivocityParser.scala | 7 +++++- .../org/apache/spark/sql/CsvFunctionsSuite.scala | 29 ++++++++++++++++++++++ 3 files changed, 41 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala index 11b31ce..5d70ccb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala @@ -22,7 +22,7 @@ import java.io.Writer import com.univocity.parsers.csv.CsvWriter import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter} +import org.apache.spark.sql.catalyst.util.{DateFormatter, IntervalStringStyles, IntervalUtils, TimestampFormatter} import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT import org.apache.spark.sql.types._ @@ -61,6 +61,11 @@ class UnivocityGenerator( case TimestampType => (row: InternalRow, ordinal: Int) => timestampFormatter.format(row.getLong(ordinal)) + case YearMonthIntervalType(start, end) => + (row: InternalRow, ordinal: Int) => + IntervalUtils.toYearMonthIntervalString( + row.getInt(ordinal), IntervalStringStyles.ANSI_STYLE, start, end) + case udt: UserDefinedType[_] => makeConverter(udt.sqlType) case dt: DataType => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala index 672d133..3ec1ea0 100644 --- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala @@ -26,7 +26,7 @@ import com.univocity.parsers.csv.CsvParser import org.apache.spark.SparkUpgradeException import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.{InternalRow, NoopFilters, OrderedFilters} -import org.apache.spark.sql.catalyst.expressions.{ExprUtils, GenericInternalRow} +import org.apache.spark.sql.catalyst.expressions.{Cast, EmptyRow, ExprUtils, GenericInternalRow, Literal} import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT import org.apache.spark.sql.errors.QueryExecutionErrors @@ -217,6 +217,11 @@ class UnivocityParser( IntervalUtils.safeStringToInterval(UTF8String.fromString(datum)) } + case ym: YearMonthIntervalType => (d: String) => + nullSafeDatum(d, name, nullable, options) { datum => + Cast(Literal(datum), ym).eval(EmptyRow) + } + case udt: UserDefinedType[_] => makeConverter(name, udt.sqlType, nullable) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala index 6ae57cb..c6afb25 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql import java.text.SimpleDateFormat +import java.time.Period import java.util.Locale import scala.collection.JavaConverters._ @@ -27,6 +28,7 @@ import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types._ +import org.apache.spark.sql.types.YearMonthIntervalType.{MONTH, YEAR} class CsvFunctionsSuite extends QueryTest with SharedSparkSession { import testImplicits._ @@ -279,4 +281,31 @@ class 
CsvFunctionsSuite extends QueryTest with SharedSparkSession { } } } + + test("SPARK-35998: Make from_csv/to_csv to handle year-month intervals properly") { + val ymDF = Seq(Period.of(1, 2, 0)).toDF + Seq( + (YearMonthIntervalType(), "INTERVAL '1-2' YEAR TO MONTH", Period.of(1, 2, 0)), + (YearMonthIntervalType(YEAR), "INTERVAL '1' YEAR", Period.of(1, 0, 0)), + (YearMonthIntervalType(MONTH), "INTERVAL '14' MONTH", Period.of(1, 2, 0)) + ).foreach { case (toCsvDtype, toCsvExpected, fromCsvExpected) => + val toCsvDF = ymDF.select(to_csv(struct($"value" cast toCsvDtype)) as "csv") + checkAnswer(toCsvDF, Row(toCsvExpected)) + + DataTypeTestUtils.yearMonthIntervalTypes.foreach { fromCsvDtype => + val fromCsvDF = toCsvDF + .select( + from_csv( + $"csv", + StructType(StructField("a", fromCsvDtype) :: Nil), + Map.empty[String, String]) as "value") + .selectExpr("value.a") + if (toCsvDtype == fromCsvDtype) { + checkAnswer(fromCsvDF, Row(fromCsvExpected)) + } else { + checkAnswer(fromCsvDF, Row(null)) + } + } + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org