This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new c2536a7eabd [SPARK-39469][SQL] Infer date type for CSV schema inference
c2536a7eabd is described below

commit c2536a7eabd8764cbbaaff22935e19685b92f22b
Author: Jonathan Cui <jonathan....@databricks.com>
AuthorDate: Thu Jul 21 17:04:44 2022 +0800

    [SPARK-39469][SQL] Infer date type for CSV schema inference
    
    ### What changes were proposed in this pull request?
    
    1. Add a new `inferDate` option to CSV options. The description is:
    > Whether or not to infer columns that satisfy the `dateFormat` option
    as `Date`. Requires `inferSchema` to be true. When `false`, columns
    with dates will be inferred as `String` (or as `Timestamp` if they fit
    the `timestampFormat`). Legacy date formats in `Timestamp` columns
    cannot be parsed with this option.
    
    An error will be thrown if `inferDate` is true while the SQL
    configuration `legacyTimeParserPolicy` is `LEGACY`. This avoids
    incorrect schema inference from the legacy time parsers, which do not
    do strict parsing.
    
    The `inferDate` option should prevent performance degradation for
    users who don't opt in.
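
    A minimal opt-in example (a sketch; the input path is hypothetical):
    ```
    spark.read
      .option("inferSchema", "true")
      .option("inferDate", "true")
      .csv("/tmp/dates")
      .printSchema()
    ```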
    
    2. Modify `inferField` in `CSVInferSchema.scala` to include the Date type.
    
    If `typeSoFar` in `inferField` is Date, Timestamp or TimestampNTZ, we
    first attempt to parse the entry as a Date and then as a
    Timestamp/TimestampNTZ. We attempt date parsing even when `typeSoFar`
    is Timestamp/TimestampNTZ to handle the case where a column contains a
    timestamp entry followed by a date entry: both data types should be
    detected, and the column inferred as timestamp type.
    
    Example:
    ```
    Seq("2010|10|10", "2010_10_10")
      .toDF.repartition(1).write.mode("overwrite").text("/tmp/foo")
    spark.read
      .option("inferSchema", "true")
      .option("header", "false")
      .option("dateFormat", "yyyy|MM|dd")
      .option("timestampFormat", "yyyy_MM_dd").csv("/tmp/foo").printSchema()
    ```
    Result:
    ```
    root
     |-- _c0: timestamp (nullable = true)
    ```
    
    3. Also modify `makeConverter` in `UnivocityParser` to handle Date
    type entries in a timestamp type column, so that the above example is
    parsed properly.
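
    A sketch of the fallback conversion (illustrative values; `daysToMicros`
    is the existing helper from `DateTimeUtils` used by the new code):
    ```
    import java.time.ZoneId
    import org.apache.spark.sql.catalyst.util.DateTimeUtils.daysToMicros

    // A date entry found in a timestamp column is parsed by the date
    // formatter into days since the epoch, then widened to microseconds
    // at midnight in the target time zone.
    val days = 14892 // 2010-10-10 as days since 1970-01-01
    val micros = daysToMicros(days, ZoneId.of("UTC"))
    // micros == 1286668800000000L, i.e. 2010-10-10 00:00:00 UTC
    ```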
    
    ### Does this PR introduce _any_ user-facing change?
    
    The new behavior of schema inference when `inferDate = true`:
    1. If a column contains only dates, it should be of “date” type in the
    inferred schema.
       - If the date format and the timestamp format are identical (e.g.
       both are yyyy/MM/dd), entries will default to being interpreted as
       Date.
    2. If a column contains both dates and timestamps, it should be of
    “timestamp” type in the inferred schema.
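
    For example (a sketch using the default `dateFormat`; the path is
    hypothetical), a date-only column is inferred as date type:
    ```
    Seq("2010-10-05", "2011-11-06")
      .toDF.repartition(1).write.mode("overwrite").text("/tmp/bar")
    spark.read
      .option("inferSchema", "true")
      .option("inferDate", "true")
      .csv("/tmp/bar")
      .printSchema()
    ```
    Expected result:
    ```
    root
     |-- _c0: date (nullable = true)
    ```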
    
    ### How was this patch tested?
    
    Unit tests were added to `CSVInferSchemaSuite` and
    `UnivocityParserSuite`, and an end-to-end test was added to
    `CSVSuite`.
    
    ### Benchmarks:
    
    `inferDate` increases parsing/inference time in general. The impact
    scales with the number of rows (not the number of columns). For
    columns of date type (which would be inferred as timestamp when
    `inferDate=false`), inference and parsing take about 30% longer. The
    impact is much greater on columns of timestamp type (about 30x slower
    than `inferDate=false`), because the inference step first tries to
    parse each timestamp as a date, which throws an error.
    
    #### Number of seconds taken to parse each CSV file with `inferDate=true` and `inferDate=false`
    
    |                                             | inferDate=false | inferDate=true | master branch |
    |---------------------------------------------|-----------------|----------------|---------------|
    | Small file (<100 row/col). Mixed data types | 0.32            | 0.33           |               |
    | 100K rows. 4 columns. Mixed data types.     | 0.70            | 2.80           | 0.70          |
    | 20k columns. 4 rows. Mixed data types.      | 16.32           | 15.90          | 13.5          |
    | Large file. Only date type.                 | 2.15            | 3.70           | 2.10          |
    | Large file. Only timestamp type.            | 2.60            | 77.00          | 2.30          |
    
    Results are the average of 3 trials on the same machine.
    
    Over multiple runs, the master branch benchmarks have also produced
    times slower than `inferDate=false` (although the average is slightly
    faster). Given the +/- 20% variance between trials, the master branch
    results are roughly on par with the `inferDate=false` results.
    
    Closes #36871 from Jonathancui123/SPARK-39469-date-infer.
    
    Lead-authored-by: Jonathan Cui <jonathan....@databricks.com>
    Co-authored-by: Jonathan Cui <jonathancui...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 core/src/main/resources/error/error-classes.json   |  6 +++
 docs/sql-data-sources-csv.md                       |  6 +++
 .../spark/sql/catalyst/csv/CSVInferSchema.scala    | 21 ++++++++-
 .../apache/spark/sql/catalyst/csv/CSVOptions.scala | 24 +++++++++-
 .../spark/sql/catalyst/csv/UnivocityParser.scala   | 35 +++++++++-----
 .../spark/sql/errors/QueryExecutionErrors.scala    |  8 +++-
 .../sql/catalyst/csv/CSVInferSchemaSuite.scala     | 55 ++++++++++++++++++++++
 .../sql/catalyst/csv/UnivocityParserSuite.scala    | 23 +++++++++
 .../test/resources/test-data/date-infer-schema.csv |  4 ++
 .../sql/execution/datasources/csv/CSVSuite.scala   | 52 ++++++++++++++++++++
 10 files changed, 219 insertions(+), 15 deletions(-)

diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json
index d49239c29f5..e2a99c1a62e 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -23,6 +23,12 @@
     ],
     "sqlState" : "22005"
   },
+  "CANNOT_INFER_DATE" : {
+    "message" : [
+      "Cannot infer date in schema inference when LegacyTimeParserPolicy is 
\"LEGACY\". Legacy Date formatter does not support strict date format matching 
which is required to avoid inferring timestamps and other non-date entries to 
date."
+    ],
+    "sqlState" : "22007"
+  },
   "CANNOT_PARSE_DECIMAL" : {
     "message" : [
       "Cannot parse decimal"
diff --git a/docs/sql-data-sources-csv.md b/docs/sql-data-sources-csv.md
index 1be1d7446e8..8384f8332a6 100644
--- a/docs/sql-data-sources-csv.md
+++ b/docs/sql-data-sources-csv.md
@@ -108,6 +108,12 @@ Data source options of CSV can be set via:
    <td>Infers the input schema automatically from data. It requires one extra pass over the data. CSV built-in functions ignore this option.</td>
     <td>read</td>
   </tr>
+  <tr>
+    <td><code>inferDate</code></td>
+    <td>false</td>
+    <td>Whether or not to infer columns that satisfy the <code>dateFormat</code> option as <code>Date</code>. Requires <code>inferSchema</code> to be <code>true</code>. When <code>false</code>, columns with dates will be inferred as <code>String</code> (or as <code>Timestamp</code> if it fits the <code>timestampFormat</code>).</td>
+    <td>read</td>
+  </tr>
   <tr>
     <td><code>enforceSchema</code></td>
     <td>true</td>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
index 8b0c6c49b85..3132fea8700 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
@@ -24,8 +24,8 @@ import scala.util.control.Exception.allCatch
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.analysis.TypeCoercion
 import org.apache.spark.sql.catalyst.expressions.ExprUtils
+import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter}
 import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
-import org.apache.spark.sql.catalyst.util.TimestampFormatter
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
@@ -46,6 +46,12 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable {
     isParsing = true,
     forTimestampNTZ = true)
 
+  private lazy val dateFormatter = DateFormatter(
+    options.dateFormatInRead,
+    options.locale,
+    legacyFormat = FAST_DATE_FORMAT,
+    isParsing = true)
+
   private val decimalParser = if (options.locale == Locale.US) {
     // Special handling the default locale for backward compatibility
     s: String => new java.math.BigDecimal(s)
@@ -117,7 +123,10 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable {
         case LongType => tryParseLong(field)
         case _: DecimalType => tryParseDecimal(field)
         case DoubleType => tryParseDouble(field)
+        case DateType => tryParseDateTime(field)
+        case TimestampNTZType if options.inferDate => tryParseDateTime(field)
         case TimestampNTZType => tryParseTimestampNTZ(field)
+        case TimestampType if options.inferDate => tryParseDateTime(field)
         case TimestampType => tryParseTimestamp(field)
         case BooleanType => tryParseBoolean(field)
         case StringType => StringType
@@ -169,6 +178,16 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable {
   private def tryParseDouble(field: String): DataType = {
     if ((allCatch opt field.toDouble).isDefined || isInfOrNan(field)) {
       DoubleType
+    } else if (options.inferDate) {
+      tryParseDateTime(field)
+    } else {
+      tryParseTimestampNTZ(field)
+    }
+  }
+
+  private def tryParseDateTime(field: String): DataType = {
+    if ((allCatch opt dateFormatter.parse(field)).isDefined) {
+      DateType
     } else {
       tryParseTimestampNTZ(field)
     }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
index 3e92c3d25eb..a033e3a3a8d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
@@ -148,7 +148,28 @@ class CSVOptions(
   // A language tag in IETF BCP 47 format
  val locale: Locale = parameters.get("locale").map(Locale.forLanguageTag).getOrElse(Locale.US)
 
-  val dateFormatInRead: Option[String] = parameters.get("dateFormat")
+  /**
+   * Infer columns with all valid date entries as date type (otherwise inferred as timestamp type).
+   * Disabled by default for backwards compatibility and performance. When enabled, date entries in
+   * timestamp columns will be cast to timestamp upon parsing. Not compatible with
+   * legacyTimeParserPolicy == LEGACY since legacy date parser will accept extra trailing characters
+   */
+  val inferDate = {
+    val inferDateFlag = getBool("inferDate")
+    if (SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY && inferDateFlag) {
+      throw QueryExecutionErrors.inferDateWithLegacyTimeParserError()
+    }
+    inferDateFlag
+  }
+
+  // Provide a default value for dateFormatInRead when inferDate. This ensures that the
+  // Iso8601DateFormatter (with strict date parsing) is used for date inference
+  val dateFormatInRead: Option[String] =
+    if (inferDate) {
+      Option(parameters.getOrElse("dateFormat", DateFormatter.defaultPattern))
+    } else {
+      parameters.get("dateFormat")
+    }
  val dateFormatInWrite: String = parameters.getOrElse("dateFormat", DateFormatter.defaultPattern)
 
   val timestampFormatInRead: Option[String] =
@@ -195,7 +216,6 @@ class CSVOptions(
    */
   val enforceSchema = getBool("enforceSchema", default = true)
 
-
   /**
    * String representation of an empty value in read and in write.
    */
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
index 56ebfcc26c6..0237b6c454d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
@@ -28,6 +28,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.{InternalRow, NoopFilters, OrderedFilters}
 import org.apache.spark.sql.catalyst.expressions.{Cast, EmptyRow, ExprUtils, GenericInternalRow, Literal}
 import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.{daysToMicros, TimeZoneUTC}
 import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
 import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
 import org.apache.spark.sql.errors.QueryExecutionErrors
@@ -197,34 +198,46 @@ class UnivocityParser(
         Decimal(decimalParser(datum), dt.precision, dt.scale)
       }
 
-    case _: TimestampType => (d: String) =>
+    case _: DateType => (d: String) =>
       nullSafeDatum(d, name, nullable, options) { datum =>
         try {
-          timestampFormatter.parse(datum)
+          dateFormatter.parse(datum)
         } catch {
           case NonFatal(e) =>
            // If fails to parse, then tries the way used in 2.0 and 1.x for backwards
            // compatibility.
            val str = DateTimeUtils.cleanLegacyTimestampStr(UTF8String.fromString(datum))
-            DateTimeUtils.stringToTimestamp(str, options.zoneId).getOrElse(throw e)
+            DateTimeUtils.stringToDate(str).getOrElse(throw e)
         }
       }
 
-    case _: TimestampNTZType => (d: String) =>
-      nullSafeDatum(d, name, nullable, options) { datum =>
-        timestampNTZFormatter.parseWithoutTimeZone(datum, false)
-      }
-
-    case _: DateType => (d: String) =>
+    case _: TimestampType => (d: String) =>
       nullSafeDatum(d, name, nullable, options) { datum =>
         try {
-          dateFormatter.parse(datum)
+          timestampFormatter.parse(datum)
         } catch {
           case NonFatal(e) =>
            // If fails to parse, then tries the way used in 2.0 and 1.x for backwards
            // compatibility.
            val str = DateTimeUtils.cleanLegacyTimestampStr(UTF8String.fromString(datum))
-            DateTimeUtils.stringToDate(str).getOrElse(throw e)
+            DateTimeUtils.stringToTimestamp(str, options.zoneId).getOrElse {
+              // There may be date type entries in timestamp column due to schema inference
+              if (options.inferDate) {
+                daysToMicros(dateFormatter.parse(datum), options.zoneId)
+              } else {
+                throw(e)
+              }
+            }
+        }
+      }
+
+    case _: TimestampNTZType => (d: String) =>
+      nullSafeDatum(d, name, nullable, options) { datum =>
+        try {
+          timestampNTZFormatter.parseWithoutTimeZone(datum, false)
+        } catch {
+          case NonFatal(e) if (options.inferDate) =>
+            daysToMicros(dateFormatter.parse(datum), TimeZoneUTC.toZoneId)
         }
       }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index 64e6283c0e3..1ef31673d6a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -32,7 +32,7 @@ import org.apache.hadoop.fs.{FileAlreadyExistsException, FileStatus, Path}
 import org.apache.hadoop.fs.permission.FsPermission
 import org.codehaus.commons.compiler.{CompileException, InternalCompilerException}
 
-import org.apache.spark.{Partition, SparkArithmeticException, SparkArrayIndexOutOfBoundsException, SparkClassNotFoundException, SparkConcurrentModificationException, SparkDateTimeException, SparkException, SparkFileAlreadyExistsException, SparkFileNotFoundException, SparkIllegalArgumentException, SparkIndexOutOfBoundsException, SparkNoSuchElementException, SparkNoSuchMethodException, SparkNumberFormatException, SparkRuntimeException, SparkSecurityException, SparkSQLException, SparkSQLFea [...]
+import org.apache.spark.{Partition, SparkArithmeticException, SparkArrayIndexOutOfBoundsException, SparkClassNotFoundException, SparkConcurrentModificationException, SparkDateTimeException, SparkException, SparkFileAlreadyExistsException, SparkFileNotFoundException, SparkIllegalArgumentException, SparkIndexOutOfBoundsException, SparkNoSuchElementException, SparkNoSuchMethodException, SparkNumberFormatException, SparkRuntimeException, SparkSecurityException, SparkSQLException, SparkSQLFea [...]
 import org.apache.spark.executor.CommitDeniedException
 import org.apache.spark.launcher.SparkLauncher
 import org.apache.spark.memory.SparkOutOfMemoryError
@@ -528,6 +528,12 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase {
        """.stripMargin)
   }
 
+  def inferDateWithLegacyTimeParserError(): Throwable with SparkThrowable = {
+    new SparkIllegalArgumentException(errorClass = "CANNOT_INFER_DATE",
+      messageParameters = Array()
+    )
+  }
+
   def streamedOperatorUnsupportedByDataSourceError(
       className: String, operator: String): Throwable = {
     new UnsupportedOperationException(
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala
index d268f8c2e72..8790223a680 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala
@@ -109,6 +109,12 @@ class CSVInferSchemaSuite extends SparkFunSuite with SQLHelper {
     assert(
       inferSchema.mergeRowTypes(Array(DoubleType),
         Array(LongType)).sameElements(Array(DoubleType)))
+    assert(
+      inferSchema.mergeRowTypes(Array(DateType),
+        Array(TimestampNTZType)).sameElements(Array(TimestampNTZType)))
+    assert(
+      inferSchema.mergeRowTypes(Array(DateType),
+        Array(TimestampType)).sameElements(Array(TimestampType)))
   }
 
   test("Null fields are handled properly when a nullValue is specified") {
@@ -192,4 +198,53 @@ class CSVInferSchemaSuite extends SparkFunSuite with SQLHelper {
     Seq("en-US").foreach(checkDecimalInfer(_, StringType))
     Seq("ko-KR", "ru-RU", "de-DE").foreach(checkDecimalInfer(_, DecimalType(7, 0)))
   }
+
+  test("SPARK-39469: inferring date type") {
+    // "yyyy/MM/dd" format
+    var options = new CSVOptions(Map("dateFormat" -> "yyyy/MM/dd", "inferDate" 
-> "true"),
+      false, "UTC")
+    var inferSchema = new CSVInferSchema(options)
+    assert(inferSchema.inferField(NullType, "2018/12/02") == DateType)
+    // "MMM yyyy" format
+    options = new CSVOptions(Map("dateFormat" -> "MMM yyyy", "inferDate" -> 
"true"),
+      false, "GMT")
+    inferSchema = new CSVInferSchema(options)
+    assert(inferSchema.inferField(NullType, "Dec 2018") == DateType)
+    // Field should strictly match date format to infer as date
+    options = new CSVOptions(
+      Map("dateFormat" -> "yyyy-MM-dd", "timestampFormat" -> 
"yyyy-MM-dd'T'HH:mm:ss",
+        "inferDate" -> "true"),
+      columnPruning = false,
+      defaultTimeZoneId = "GMT")
+    inferSchema = new CSVInferSchema(options)
+    assert(inferSchema.inferField(NullType, "2018-12-03T11:00:00") == 
TimestampType)
+    assert(inferSchema.inferField(NullType, "2018-12-03") == DateType)
+  }
+
+  test("SPARK-39469: inferring date and timestamp types in a mixed column with 
inferDate=true") {
+    var options = new CSVOptions(
+      Map("dateFormat" -> "yyyy_MM_dd", "timestampFormat" -> "yyyy|MM|dd",
+        "timestampNTZFormat" -> "yyyy/MM/dd", "inferDate" -> "true"),
+      columnPruning = false,
+      defaultTimeZoneId = "UTC")
+    var inferSchema = new CSVInferSchema(options)
+    assert(inferSchema.inferField(DateType, "2012_12_12") == DateType)
+    assert(inferSchema.inferField(DateType, "2003|01|01") == TimestampType)
+    // SQL configuration must be set to default to TimestampNTZ
+    withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> "TIMESTAMP_NTZ") {
+      assert(inferSchema.inferField(DateType, "2003/02/05") == 
TimestampNTZType)
+    }
+
+    // inferField should upgrade a date field to timestamp if the typeSoFar is 
a timestamp
+    assert(inferSchema.inferField(TimestampNTZType, "2012_12_12") == 
TimestampNTZType)
+    assert(inferSchema.inferField(TimestampType, "2018_12_03") == 
TimestampType)
+
+    // No errors when Date and Timestamp have the same format. Inference 
defaults to date
+    options = new CSVOptions(
+      Map("dateFormat" -> "yyyy_MM_dd", "timestampFormat" -> "yyyy_MM_dd"),
+      columnPruning = false,
+      defaultTimeZoneId = "UTC")
+    inferSchema = new CSVInferSchema(options)
+    assert(inferSchema.inferField(DateType, "2012_12_12") == DateType)
+  }
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala
index 4166401d040..2589376bc3d 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.csv
 
 import java.math.BigDecimal
 import java.text.{DecimalFormat, DecimalFormatSymbols}
+import java.time.{ZoneOffset}
 import java.util.{Locale, TimeZone}
 
 import org.apache.commons.lang3.time.FastDateFormat
@@ -358,4 +359,26 @@ class UnivocityParserSuite extends SparkFunSuite with SQLHelper {
       Map("timestampFormat" -> "invalid", "dateFormat" -> "invalid"), false, "UTC")
     check(new UnivocityParser(StructType(Seq.empty), optionsWithPattern))
   }
+
+  test("SPARK-39469: dates should be parsed correctly in a timestamp column 
when inferDate=true") {
+    def checkDate(dataType: DataType): Unit = {
+      val timestampsOptions =
+        new CSVOptions(Map("inferDate" -> "true", "timestampFormat" -> 
"dd/MM/yyyy HH:mm",
+          "timestampNTZFormat" -> "dd-MM-yyyy HH:mm", "dateFormat" -> 
"dd_MM_yyyy"),
+          false, DateTimeUtils.getZoneId("-08:00").toString)
+      // Use CSVOption ZoneId="-08:00" (PST) to test that Dates in 
TimestampNTZ column are always
+      // converted to their equivalent UTC timestamp
+      val dateString = "08_09_2001"
+      val expected = dataType match {
+        case TimestampType => date(2001, 9, 8, 0, 0, 0, 0, 
ZoneOffset.of("-08:00"))
+        case TimestampNTZType => date(2001, 9, 8, 0, 0, 0, 0, ZoneOffset.UTC)
+        case DateType => days(2001, 9, 8)
+      }
+      val parser = new UnivocityParser(new StructType(), timestampsOptions)
+      assert(parser.makeConverter("d", dataType).apply(dateString) == expected)
+    }
+    checkDate(TimestampType)
+    checkDate(TimestampNTZType)
+    checkDate(DateType)
+  }
 }
diff --git a/sql/core/src/test/resources/test-data/date-infer-schema.csv b/sql/core/src/test/resources/test-data/date-infer-schema.csv
new file mode 100644
index 00000000000..ca16ec81e6d
--- /dev/null
+++ b/sql/core/src/test/resources/test-data/date-infer-schema.csv
@@ -0,0 +1,4 @@
+date,timestamp-date,date-timestamp
+2001-09-08,2014-10-27T18:30:00,1765-03-28
+1941-01-02,2000-09-14T01:01:00,1423-11-12T23:41:00
+0293-11-07,1995-06-25,2016-01-28T20:00:00
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index bf92ffcf465..758f5430608 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -41,6 +41,7 @@ import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Encoders, Que
 import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils}
 import org.apache.spark.sql.execution.datasources.CommonFileDataSourceSuite
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
 
@@ -74,6 +75,7 @@ abstract class CSVSuite
   private val simpleSparseFile = "test-data/simple_sparse.csv"
   private val numbersFile = "test-data/numbers.csv"
   private val datesFile = "test-data/dates.csv"
+  private val dateInferSchemaFile = "test-data/date-infer-schema.csv"
   private val unescapedQuotesFile = "test-data/unescaped-quotes.csv"
   private val valueMalformedFile = "test-data/value-malformed.csv"
   private val badAfterGoodFile = "test-data/bad_after_good.csv"
@@ -2788,6 +2790,56 @@ abstract class CSVSuite
       }
     }
   }
+
+  test("SPARK-39469: Infer schema for date type") {
+    val options1 = Map(
+      "header" -> "true",
+      "inferSchema" -> "true",
+      "timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss",
+      "dateFormat" -> "yyyy-MM-dd",
+      "inferDate" -> "true")
+    val options2 = Map(
+      "header" -> "true",
+      "inferSchema" -> "true",
+      "inferDate" -> "true")
+
+    // Error should be thrown when attempting to inferDate with Legacy parser
+    if (SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY) {
+      val msg = intercept[IllegalArgumentException] {
+        spark.read
+          .format("csv")
+          .options(options1)
+          .load(testFile(dateInferSchemaFile))
+      }.getMessage
+      assert(msg.contains("CANNOT_INFER_DATE"))
+    } else {
+      // 1. Specify date format and timestamp format
+      // 2. Date inference should work with default date format when dateFormat is not provided
+      Seq(options1, options2).foreach {options =>
+        val results = spark.read
+          .format("csv")
+          .options(options)
+          .load(testFile(dateInferSchemaFile))
+
+        val expectedSchema = StructType(List(StructField("date", DateType),
+          StructField("timestamp-date", TimestampType),
+          StructField("date-timestamp", TimestampType)))
+        assert(results.schema == expectedSchema)
+
+        val expected =
+          Seq(
+            Seq(Date.valueOf("2001-9-8"), Timestamp.valueOf("2014-10-27 
18:30:0.0"),
+              Timestamp.valueOf("1765-03-28 00:00:0.0")),
+            Seq(Date.valueOf("1941-1-2"), Timestamp.valueOf("2000-09-14 
01:01:0.0"),
+              Timestamp.valueOf("1423-11-12 23:41:0.0")),
+            Seq(Date.valueOf("0293-11-7"), Timestamp.valueOf("1995-06-25 
00:00:00.0"),
+              Timestamp.valueOf("2016-01-28 20:00:00.0"))
+          )
+        assert(results.collect().toSeq.map(_.toSeq) == expected)
+      }
+
+    }
+  }
 }
 
 class CSVv1Suite extends CSVSuite {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
