This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new c2536a7eabd [SPARK-39469][SQL] Infer date type for CSV schema inference
c2536a7eabd is described below

commit c2536a7eabd8764cbbaaff22935e19685b92f22b
Author: Jonathan Cui <jonathan....@databricks.com>
AuthorDate: Thu Jul 21 17:04:44 2022 +0800

[SPARK-39469][SQL] Infer date type for CSV schema inference

### What changes were proposed in this pull request?

1. Add a new `inferDate` option to CSV Options. The description is:

   > Whether or not to infer columns that satisfy the `dateFormat` option as `Date`. Requires `inferSchema` to be true. When `false`, columns with dates will be inferred as `String` (or as `Timestamp` if it fits the `timestampFormat`). Legacy date formats in `Timestamp` columns cannot be parsed with this option.

   An error will be thrown if `inferDate` is true when the SQL configuration LegacyTimeParserPolicy is `LEGACY`. This avoids incorrect schema inference caused by legacy time parsers not doing strict parsing. Because `inferDate` is opt-in, users who do not enable it should see no performance degradation.

2. Modify `inferField` in CSVInferSchema.scala to include Date type. If `typeSoFar` in `inferField` is Date, Timestamp or TimestampNTZ, we will first attempt to parse Date and then parse Timestamp/TimestampNTZ. We attempt to parse date for `typeSoFar`=Timestamp/TimestampNTZ because a column may contain a timestamp entry followed by a date entry; in that case we should detect both data types and infer the column as a timestamp type.

   Example:
   ```
   Seq("2010|10|10", "2010_10_10")
     .toDF.repartition(1).write.mode("overwrite").text("/tmp/foo")
   spark.read
     .option("inferSchema", "true")
     .option("header", "false")
     .option("dateFormat", "yyyy|MM|dd")
     .option("timestampFormat", "yyyy_MM_dd").csv("/tmp/foo").printSchema()
   ```
   Result:
   ```
   root
    |-- _c0: timestamp (nullable = true)
   ```

3. Also modified `makeConverter` in `UnivocityParser` to handle Date type entries in a timestamp type column to properly parse the above example.

### Does this PR introduce _any_ user-facing change?

The new behavior of schema inference when `inferDate = true`:

1. If a column contains only dates, it should be of “date” type in the inferred schema
   - If the date format and the timestamp format are identical (e.g. both are yyyy/mm/dd), entries will default to being interpreted as Date
3. If a column contains dates and timestamps, it should be of “timestamp” type in the inferred schema

### How was this patch tested?

Unit tests were added to `CSVInferSchemaSuite` and `UnivocityParserSuite`. An end-to-end test was added to `CSVSuite`.

### Benchmarks:

`inferDate` increases parsing/inference time in general. The impact scales with the number of rows (and not the number of columns). For columns of date type (which would be inferred as timestamp when `inferDate=false`), inference and parsing take 30% longer. The performance impact is much greater on columns of timestamp type (taking 30x longer than `inferDate=false`), due to trying each timestamp as a date (and throwing an error) during the inference step.

#### Number of seconds taken to parse each CSV file with `inferDate true` and `inferDate false`

|                                             | inferDate=False | inferDate=True | master branch |
|---------------------------------------------|-----------------|----------------|---------------|
| Small file (<100 row/col). Mixed data types | 0.32            | 0.33           |               |
| 100K rows. 4 columns. Mixed data types.     | 0.70            | 2.80           | 0.70          |
| 20k columns. 4 rows. Mixed Data types.      | 16.32           | 15.90          | 13.5          |
| Large file. Only date type.                 | 2.15            | 3.70           | 2.10          |
| Large file. Only timestamp type.            | 2.60            | 77.00          | 2.30          |

Results are the average of 3 trials on the same machine. Over multiple runs, master branch benchmark times have also shown results that are slower than `inferDate=false` (although the average is slightly faster).
Given the +/- 20% variance in results between trials, master branch benchmark results are roughly similar to `inferDate=False` results.

Closes #36871 from Jonathancui123/SPARK-39469-date-infer.

Lead-authored-by: Jonathan Cui <jonathan....@databricks.com>
Co-authored-by: Jonathan Cui <jonathancui...@gmail.com>
Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 core/src/main/resources/error/error-classes.json   |  6 +++
 docs/sql-data-sources-csv.md                       |  6 +++
 .../spark/sql/catalyst/csv/CSVInferSchema.scala    | 21 ++++++++-
 .../apache/spark/sql/catalyst/csv/CSVOptions.scala | 24 +++++++++-
 .../spark/sql/catalyst/csv/UnivocityParser.scala   | 35 +++++++++-----
 .../spark/sql/errors/QueryExecutionErrors.scala    |  8 +++-
 .../sql/catalyst/csv/CSVInferSchemaSuite.scala     | 55 ++++++++++++++++++++++
 .../sql/catalyst/csv/UnivocityParserSuite.scala    | 23 +++++++++
 .../test/resources/test-data/date-infer-schema.csv |  4 ++
 .../sql/execution/datasources/csv/CSVSuite.scala   | 52 ++++++++++++++++++++
 10 files changed, 219 insertions(+), 15 deletions(-)

diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json
index d49239c29f5..e2a99c1a62e 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -23,6 +23,12 @@
     ],
     "sqlState" : "22005"
   },
+  "CANNOT_INFER_DATE" : {
+    "message" : [
+      "Cannot infer date in schema inference when LegacyTimeParserPolicy is \"LEGACY\". Legacy Date formatter does not support strict date format matching which is required to avoid inferring timestamps and other non-date entries to date."
+    ],
+    "sqlState" : "22007"
+  },
   "CANNOT_PARSE_DECIMAL" : {
     "message" : [
       "Cannot parse decimal"
diff --git a/docs/sql-data-sources-csv.md b/docs/sql-data-sources-csv.md
index 1be1d7446e8..8384f8332a6 100644
--- a/docs/sql-data-sources-csv.md
+++ b/docs/sql-data-sources-csv.md
@@ -108,6 +108,12 @@ Data source options of CSV can be set via:
     <td>Infers the input schema automatically from data. It requires one extra pass over the data. CSV built-in functions ignore this option.</td>
     <td>read</td>
   </tr>
+  <tr>
+    <td><code>inferDate</code></td>
+    <td>false</td>
+    <td>Whether or not to infer columns that satisfy the <code>dateFormat</code> option as <code>Date</code>. Requires <code>inferSchema</code> to be <code>true</code>. When <code>false</code>, columns with dates will be inferred as <code>String</code> (or as <code>Timestamp</code> if it fits the <code>timestampFormat</code>).</td>
+    <td>read</td>
+  </tr>
   <tr>
     <td><code>enforceSchema</code></td>
     <td>true</td>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
index 8b0c6c49b85..3132fea8700 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
@@ -24,8 +24,8 @@ import scala.util.control.Exception.allCatch
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.analysis.TypeCoercion
 import org.apache.spark.sql.catalyst.expressions.ExprUtils
+import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter}
 import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
-import org.apache.spark.sql.catalyst.util.TimestampFormatter
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
@@ -46,6 +46,12 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable {
     isParsing = true,
     forTimestampNTZ = true)
 
+  private lazy val dateFormatter = DateFormatter(
+    options.dateFormatInRead,
+    options.locale,
+    legacyFormat = FAST_DATE_FORMAT,
+    isParsing = true)
+
   private val decimalParser = if (options.locale == Locale.US) {
     // Special handling the default locale for backward compatibility
     s: String => new java.math.BigDecimal(s)
@@ -117,7 +123,10 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable {
         case LongType => tryParseLong(field)
         case _: DecimalType => tryParseDecimal(field)
         case DoubleType => tryParseDouble(field)
+        case DateType => tryParseDateTime(field)
+        case TimestampNTZType if options.inferDate => tryParseDateTime(field)
         case TimestampNTZType => tryParseTimestampNTZ(field)
+        case TimestampType if options.inferDate => tryParseDateTime(field)
         case TimestampType => tryParseTimestamp(field)
         case BooleanType => tryParseBoolean(field)
         case StringType => StringType
@@ -169,6 +178,16 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable {
   private def tryParseDouble(field: String): DataType = {
     if ((allCatch opt field.toDouble).isDefined || isInfOrNan(field)) {
       DoubleType
+    } else if (options.inferDate) {
+      tryParseDateTime(field)
+    } else {
+      tryParseTimestampNTZ(field)
+    }
+  }
+
+  private def tryParseDateTime(field: String): DataType = {
+    if ((allCatch opt dateFormatter.parse(field)).isDefined) {
+      DateType
     } else {
       tryParseTimestampNTZ(field)
     }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
index 3e92c3d25eb..a033e3a3a8d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
@@ -148,7 +148,28 @@ class CSVOptions(
   // A language tag in IETF BCP 47 format
   val locale: Locale = parameters.get("locale").map(Locale.forLanguageTag).getOrElse(Locale.US)
 
-  val dateFormatInRead: Option[String] = parameters.get("dateFormat")
+  /**
+   * Infer columns with all valid date entries as date type (otherwise inferred as timestamp type).
+   * Disabled by default for backwards compatibility and performance. When enabled, date entries in
+   * timestamp columns will be cast to timestamp upon parsing. Not compatible with
+   * legacyTimeParserPolicy == LEGACY since legacy date parser will accept extra trailing characters
+   */
+  val inferDate = {
+    val inferDateFlag = getBool("inferDate")
+    if (SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY && inferDateFlag) {
+      throw QueryExecutionErrors.inferDateWithLegacyTimeParserError()
+    }
+    inferDateFlag
+  }
+
+  // Provide a default value for dateFormatInRead when inferDate. This ensures that the
+  // Iso8601DateFormatter (with strict date parsing) is used for date inference
+  val dateFormatInRead: Option[String] =
+    if (inferDate) {
+      Option(parameters.getOrElse("dateFormat", DateFormatter.defaultPattern))
+    } else {
+      parameters.get("dateFormat")
+    }
   val dateFormatInWrite: String = parameters.getOrElse("dateFormat", DateFormatter.defaultPattern)
 
   val timestampFormatInRead: Option[String] =
@@ -195,7 +216,6 @@ class CSVOptions(
    */
   val enforceSchema = getBool("enforceSchema", default = true)
 
-
   /**
    * String representation of an empty value in read and in write.
    */
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
index 56ebfcc26c6..0237b6c454d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
@@ -28,6 +28,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.{InternalRow, NoopFilters, OrderedFilters}
 import org.apache.spark.sql.catalyst.expressions.{Cast, EmptyRow, ExprUtils, GenericInternalRow, Literal}
 import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.{daysToMicros, TimeZoneUTC}
 import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
 import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
 import org.apache.spark.sql.errors.QueryExecutionErrors
@@ -197,34 +198,46 @@ class UnivocityParser(
         Decimal(decimalParser(datum), dt.precision, dt.scale)
       }
 
-    case _: TimestampType => (d: String) =>
+    case _: DateType => (d: String) =>
       nullSafeDatum(d, name, nullable, options) { datum =>
         try {
-          timestampFormatter.parse(datum)
+          dateFormatter.parse(datum)
         } catch {
           case NonFatal(e) =>
             // If fails to parse, then tries the way used in 2.0 and 1.x for backwards
             // compatibility.
             val str = DateTimeUtils.cleanLegacyTimestampStr(UTF8String.fromString(datum))
-            DateTimeUtils.stringToTimestamp(str, options.zoneId).getOrElse(throw e)
+            DateTimeUtils.stringToDate(str).getOrElse(throw e)
         }
       }
 
-    case _: TimestampNTZType => (d: String) =>
-      nullSafeDatum(d, name, nullable, options) { datum =>
-        timestampNTZFormatter.parseWithoutTimeZone(datum, false)
-      }
-
-    case _: DateType => (d: String) =>
+    case _: TimestampType => (d: String) =>
       nullSafeDatum(d, name, nullable, options) { datum =>
         try {
-          dateFormatter.parse(datum)
+          timestampFormatter.parse(datum)
         } catch {
           case NonFatal(e) =>
             // If fails to parse, then tries the way used in 2.0 and 1.x for backwards
             // compatibility.
             val str = DateTimeUtils.cleanLegacyTimestampStr(UTF8String.fromString(datum))
-            DateTimeUtils.stringToDate(str).getOrElse(throw e)
+            DateTimeUtils.stringToTimestamp(str, options.zoneId).getOrElse {
+              // There may be date type entries in timestamp column due to schema inference
+              if (options.inferDate) {
+                daysToMicros(dateFormatter.parse(datum), options.zoneId)
+              } else {
+                throw(e)
+              }
+            }
+        }
+      }
+
+    case _: TimestampNTZType => (d: String) =>
+      nullSafeDatum(d, name, nullable, options) { datum =>
+        try {
+          timestampNTZFormatter.parseWithoutTimeZone(datum, false)
+        } catch {
+          case NonFatal(e) if (options.inferDate) =>
+            daysToMicros(dateFormatter.parse(datum), TimeZoneUTC.toZoneId)
         }
       }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index 64e6283c0e3..1ef31673d6a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -32,7 +32,7 @@ import org.apache.hadoop.fs.{FileAlreadyExistsException, FileStatus, Path}
 import org.apache.hadoop.fs.permission.FsPermission
 import org.codehaus.commons.compiler.{CompileException, InternalCompilerException}
 
-import org.apache.spark.{Partition, SparkArithmeticException, SparkArrayIndexOutOfBoundsException, SparkClassNotFoundException, SparkConcurrentModificationException, SparkDateTimeException, SparkException, SparkFileAlreadyExistsException, SparkFileNotFoundException, SparkIllegalArgumentException, SparkIndexOutOfBoundsException, SparkNoSuchElementException, SparkNoSuchMethodException, SparkNumberFormatException, SparkRuntimeException, SparkSecurityException, SparkSQLException, SparkSQLFea [...]
+import org.apache.spark.{Partition, SparkArithmeticException, SparkArrayIndexOutOfBoundsException, SparkClassNotFoundException, SparkConcurrentModificationException, SparkDateTimeException, SparkException, SparkFileAlreadyExistsException, SparkFileNotFoundException, SparkIllegalArgumentException, SparkIndexOutOfBoundsException, SparkNoSuchElementException, SparkNoSuchMethodException, SparkNumberFormatException, SparkRuntimeException, SparkSecurityException, SparkSQLException, SparkSQLFea [...]
 import org.apache.spark.executor.CommitDeniedException
 import org.apache.spark.launcher.SparkLauncher
 import org.apache.spark.memory.SparkOutOfMemoryError
@@ -528,6 +528,12 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase {
        """.stripMargin)
   }
 
+  def inferDateWithLegacyTimeParserError(): Throwable with SparkThrowable = {
+    new SparkIllegalArgumentException(errorClass = "CANNOT_INFER_DATE",
+      messageParameters = Array()
+    )
+  }
+
   def streamedOperatorUnsupportedByDataSourceError(
       className: String, operator: String): Throwable = {
     new UnsupportedOperationException(
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala
index d268f8c2e72..8790223a680 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala
@@ -109,6 +109,12 @@ class CSVInferSchemaSuite extends SparkFunSuite with SQLHelper {
     assert(
       inferSchema.mergeRowTypes(Array(DoubleType),
         Array(LongType)).sameElements(Array(DoubleType)))
+    assert(
+      inferSchema.mergeRowTypes(Array(DateType),
+        Array(TimestampNTZType)).sameElements(Array(TimestampNTZType)))
+    assert(
+      inferSchema.mergeRowTypes(Array(DateType),
+        Array(TimestampType)).sameElements(Array(TimestampType)))
   }
 
   test("Null fields are handled properly when a nullValue is specified") {
@@ -192,4 +198,53 @@ class CSVInferSchemaSuite extends SparkFunSuite with SQLHelper {
     Seq("en-US").foreach(checkDecimalInfer(_, StringType))
     Seq("ko-KR", "ru-RU", "de-DE").foreach(checkDecimalInfer(_, DecimalType(7, 0)))
   }
+
+  test("SPARK-39469: inferring date type") {
+    // "yyyy/MM/dd" format
+    var options = new CSVOptions(Map("dateFormat" -> "yyyy/MM/dd", "inferDate" -> "true"),
+      false, "UTC")
+    var inferSchema = new CSVInferSchema(options)
+    assert(inferSchema.inferField(NullType, "2018/12/02") == DateType)
+    // "MMM yyyy" format
+    options = new CSVOptions(Map("dateFormat" -> "MMM yyyy", "inferDate" -> "true"),
+      false, "GMT")
+    inferSchema = new CSVInferSchema(options)
+    assert(inferSchema.inferField(NullType, "Dec 2018") == DateType)
+    // Field should strictly match date format to infer as date
+    options = new CSVOptions(
+      Map("dateFormat" -> "yyyy-MM-dd", "timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss",
+        "inferDate" -> "true"),
+      columnPruning = false,
+      defaultTimeZoneId = "GMT")
+    inferSchema = new CSVInferSchema(options)
+    assert(inferSchema.inferField(NullType, "2018-12-03T11:00:00") == TimestampType)
+    assert(inferSchema.inferField(NullType, "2018-12-03") == DateType)
+  }
+
+  test("SPARK-39469: inferring date and timestamp types in a mixed column with inferDate=true") {
+    var options = new CSVOptions(
+      Map("dateFormat" -> "yyyy_MM_dd", "timestampFormat" -> "yyyy|MM|dd",
+        "timestampNTZFormat" -> "yyyy/MM/dd", "inferDate" -> "true"),
+      columnPruning = false,
+      defaultTimeZoneId = "UTC")
+    var inferSchema = new CSVInferSchema(options)
+    assert(inferSchema.inferField(DateType, "2012_12_12") == DateType)
+    assert(inferSchema.inferField(DateType, "2003|01|01") == TimestampType)
+    // SQL configuration must be set to default to TimestampNTZ
+    withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> "TIMESTAMP_NTZ") {
+      assert(inferSchema.inferField(DateType, "2003/02/05") == TimestampNTZType)
+    }
+
+    // inferField should upgrade a date field to timestamp if the typeSoFar is a timestamp
+    assert(inferSchema.inferField(TimestampNTZType, "2012_12_12") == TimestampNTZType)
+    assert(inferSchema.inferField(TimestampType, "2018_12_03") == TimestampType)
+
+    // No errors when Date and Timestamp have the same format. Inference defaults to date
+    options = new CSVOptions(
+      Map("dateFormat" -> "yyyy_MM_dd", "timestampFormat" -> "yyyy_MM_dd"),
+      columnPruning = false,
+      defaultTimeZoneId = "UTC")
+    inferSchema = new CSVInferSchema(options)
+    assert(inferSchema.inferField(DateType, "2012_12_12") == DateType)
+  }
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala
index 4166401d040..2589376bc3d 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.csv
 
 import java.math.BigDecimal
 import java.text.{DecimalFormat, DecimalFormatSymbols}
+import java.time.{ZoneOffset}
 import java.util.{Locale, TimeZone}
 
 import org.apache.commons.lang3.time.FastDateFormat
@@ -358,4 +359,26 @@ class UnivocityParserSuite extends SparkFunSuite with SQLHelper {
       Map("timestampFormat" -> "invalid", "dateFormat" -> "invalid"), false, "UTC")
     check(new UnivocityParser(StructType(Seq.empty), optionsWithPattern))
   }
+
+  test("SPARK-39469: dates should be parsed correctly in a timestamp column when inferDate=true") {
+    def checkDate(dataType: DataType): Unit = {
+      val timestampsOptions =
+        new CSVOptions(Map("inferDate" -> "true", "timestampFormat" -> "dd/MM/yyyy HH:mm",
+          "timestampNTZFormat" -> "dd-MM-yyyy HH:mm", "dateFormat" -> "dd_MM_yyyy"),
+          false, DateTimeUtils.getZoneId("-08:00").toString)
+      // Use CSVOption ZoneId="-08:00" (PST) to test that Dates in TimestampNTZ column are always
+      // converted to their equivalent UTC timestamp
+      val dateString = "08_09_2001"
+      val expected = dataType match {
+        case TimestampType => date(2001, 9, 8, 0, 0, 0, 0, ZoneOffset.of("-08:00"))
+        case TimestampNTZType => date(2001, 9, 8, 0, 0, 0, 0, ZoneOffset.UTC)
+        case DateType => days(2001, 9, 8)
+      }
+      val parser = new UnivocityParser(new StructType(), timestampsOptions)
+      assert(parser.makeConverter("d", dataType).apply(dateString) == expected)
+    }
+    checkDate(TimestampType)
+    checkDate(TimestampNTZType)
+    checkDate(DateType)
+  }
 }
diff --git a/sql/core/src/test/resources/test-data/date-infer-schema.csv b/sql/core/src/test/resources/test-data/date-infer-schema.csv
new file mode 100644
index 00000000000..ca16ec81e6d
--- /dev/null
+++ b/sql/core/src/test/resources/test-data/date-infer-schema.csv
@@ -0,0 +1,4 @@
+date,timestamp-date,date-timestamp
+2001-09-08,2014-10-27T18:30:00,1765-03-28
+1941-01-02,2000-09-14T01:01:00,1423-11-12T23:41:00
+0293-11-07,1995-06-25,2016-01-28T20:00:00
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index bf92ffcf465..758f5430608 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -41,6 +41,7 @@ import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Encoders, Que
 import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils}
 import org.apache.spark.sql.execution.datasources.CommonFileDataSourceSuite
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
 
@@ -74,6 +75,7 @@ abstract class CSVSuite
   private val simpleSparseFile = "test-data/simple_sparse.csv"
   private val numbersFile = "test-data/numbers.csv"
   private val datesFile = "test-data/dates.csv"
+  private val dateInferSchemaFile = "test-data/date-infer-schema.csv"
   private val unescapedQuotesFile = "test-data/unescaped-quotes.csv"
   private val valueMalformedFile = "test-data/value-malformed.csv"
   private val badAfterGoodFile = "test-data/bad_after_good.csv"
@@ -2788,6 +2790,56 @@ abstract class CSVSuite
       }
     }
   }
+
+  test("SPARK-39469: Infer schema for date type") {
+    val options1 = Map(
+      "header" -> "true",
+      "inferSchema" -> "true",
+      "timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss",
+      "dateFormat" -> "yyyy-MM-dd",
+      "inferDate" -> "true")
+    val options2 = Map(
+      "header" -> "true",
+      "inferSchema" -> "true",
+      "inferDate" -> "true")
+
+    // Error should be thrown when attempting to inferDate with Legacy parser
+    if (SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY) {
+      val msg = intercept[IllegalArgumentException] {
+        spark.read
+          .format("csv")
+          .options(options1)
+          .load(testFile(dateInferSchemaFile))
+      }.getMessage
+      assert(msg.contains("CANNOT_INFER_DATE"))
+    } else {
+      // 1. Specify date format and timestamp format
+      // 2. Date inference should work with default date format when dateFormat is not provided
+      Seq(options1, options2).foreach {options =>
+        val results = spark.read
+          .format("csv")
+          .options(options)
+          .load(testFile(dateInferSchemaFile))
+
+        val expectedSchema = StructType(List(StructField("date", DateType),
+          StructField("timestamp-date", TimestampType),
+          StructField("date-timestamp", TimestampType)))
+        assert(results.schema == expectedSchema)
+
+        val expected =
+          Seq(
+            Seq(Date.valueOf("2001-9-8"), Timestamp.valueOf("2014-10-27 18:30:0.0"),
+              Timestamp.valueOf("1765-03-28 00:00:0.0")),
+            Seq(Date.valueOf("1941-1-2"), Timestamp.valueOf("2000-09-14 01:01:0.0"),
+              Timestamp.valueOf("1423-11-12 23:41:0.0")),
+            Seq(Date.valueOf("0293-11-7"), Timestamp.valueOf("1995-06-25 00:00:00.0"),
+              Timestamp.valueOf("2016-01-28 20:00:00.0"))
+          )
+        assert(results.collect().toSeq.map(_.toSeq) == expected)
+      }
+
+    }
+  }
 }
 
 class CSVv1Suite extends CSVSuite {
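
The inference order described in the commit message (try the strict date pattern first, fall back to the timestamp pattern, and widen a column that mixes dates and timestamps to timestamp) can be illustrated with a small standalone sketch. This is not Spark's implementation: `InferDateSketch`, `ColType`, `classifyField`, `widen`, and `inferColumn` are hypothetical names introduced here, `java.time` stands in for Spark's `DateFormatter` and `TimestampFormatter`, and numeric, boolean, and TimestampNTZ handling is left out. The two patterns are the ones used by `options1` in the `CSVSuite` test.

```scala
import java.time.{LocalDate, LocalDateTime}
import java.time.format.DateTimeFormatter
import scala.util.Try

// Hypothetical, simplified model of date-aware inference; NOT Spark's code.
object InferDateSketch {
  sealed trait ColType
  case object DateT extends ColType
  case object TimestampT extends ColType
  case object StringT extends ColType

  // Patterns taken from the CSVSuite test in this commit (assumed here as fixed values).
  private val dateFmt = DateTimeFormatter.ofPattern("yyyy-MM-dd")
  private val tsFmt = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss")

  // Try the date pattern first; LocalDate.parse rejects trailing text,
  // so a timestamp string is not mistaken for a date.
  def classifyField(field: String): ColType =
    if (Try(LocalDate.parse(field, dateFmt)).isSuccess) DateT
    else if (Try(LocalDateTime.parse(field, tsFmt)).isSuccess) TimestampT
    else StringT

  // Widen two per-field results: date + timestamp becomes timestamp,
  // anything else that disagrees falls back to string.
  def widen(a: ColType, b: ColType): ColType = (a, b) match {
    case (x, y) if x == y => x
    case (DateT, TimestampT) | (TimestampT, DateT) => TimestampT
    case _ => StringT
  }

  // Infer one column type from a non-empty sequence of raw field values.
  def inferColumn(values: Seq[String]): ColType =
    values.map(classifyField).reduce(widen)

  def main(args: Array[String]): Unit = {
    // The three columns of date-infer-schema.csv from this commit.
    println(inferColumn(Seq("2001-09-08", "1941-01-02", "0293-11-07")))
    println(inferColumn(Seq("2014-10-27T18:30:00", "2000-09-14T01:01:00", "1995-06-25")))
    println(inferColumn(Seq("1765-03-28", "1423-11-12T23:41:00", "2016-01-28T20:00:00")))
  }
}
```

Under these assumptions the first column comes out as a date and the two mixed columns as timestamps, which mirrors the `expectedSchema` asserted in the end-to-end test. The strictness of the date parse is the point of the sketch: per the `CSVOptions` comment, the legacy parser accepts extra trailing characters, which is why the commit rejects `inferDate` under the `LEGACY` policy.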