Repository: spark
Updated Branches:
  refs/heads/master 973f7c01d -> 79551f558


[SPARK-25945][SQL] Support locale while parsing date/timestamp from CSV/JSON

## What changes were proposed in this pull request?

In the PR, I propose to add new option `locale` into CSVOptions/JSONOptions to 
make parsing date/timestamps in local languages possible. Currently the locale 
is hard coded to `Locale.US`.

## How was this patch tested?

Added two tests for parsing a date from CSV/JSON - `ноя 2018`.

Closes #22951 from MaxGekk/locale.

Authored-by: Maxim Gekk <max.g...@gmail.com>
Signed-off-by: hyukjinkwon <gurwls...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/79551f55
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/79551f55
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/79551f55

Branch: refs/heads/master
Commit: 79551f558dafed41177b605b0436e9340edf5712
Parents: 973f7c0
Author: Maxim Gekk <max.g...@gmail.com>
Authored: Fri Nov 9 09:45:06 2018 +0800
Committer: hyukjinkwon <gurwls...@apache.org>
Committed: Fri Nov 9 09:45:06 2018 +0800

----------------------------------------------------------------------
 python/pyspark/sql/readwriter.py                 | 15 +++++++++++----
 python/pyspark/sql/streaming.py                  | 14 ++++++++++----
 .../spark/sql/catalyst/csv/CSVOptions.scala      |  7 +++++--
 .../spark/sql/catalyst/json/JSONOptions.scala    |  7 +++++--
 .../expressions/CsvExpressionsSuite.scala        | 19 ++++++++++++++++++-
 .../expressions/JsonExpressionsSuite.scala       | 19 ++++++++++++++++++-
 .../org/apache/spark/sql/DataFrameReader.scala   |  4 ++++
 .../spark/sql/streaming/DataStreamReader.scala   |  4 ++++
 .../org/apache/spark/sql/CsvFunctionsSuite.scala | 17 +++++++++++++++++
 .../apache/spark/sql/JsonFunctionsSuite.scala    | 17 +++++++++++++++++
 10 files changed, 109 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/79551f55/python/pyspark/sql/readwriter.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 690b130..726de4a 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -177,7 +177,7 @@ class DataFrameReader(OptionUtils):
              allowNumericLeadingZero=None, 
allowBackslashEscapingAnyCharacter=None,
              mode=None, columnNameOfCorruptRecord=None, dateFormat=None, 
timestampFormat=None,
              multiLine=None, allowUnquotedControlChars=None, lineSep=None, 
samplingRatio=None,
-             dropFieldIfAllNull=None, encoding=None):
+             dropFieldIfAllNull=None, encoding=None, locale=None):
         """
         Loads JSON files and returns the results as a :class:`DataFrame`.
 
@@ -249,6 +249,9 @@ class DataFrameReader(OptionUtils):
         :param dropFieldIfAllNull: whether to ignore column of all null values 
or empty
                                    array/struct during schema inference. If 
None is set, it
                                    uses the default value, ``false``.
+        :param locale: sets a locale as language tag in IETF BCP 47 format. If 
None is set,
+                       it uses the default value, ``en-US``. For instance, 
``locale`` is used while
+                       parsing dates and timestamps.
 
         >>> df1 = spark.read.json('python/test_support/sql/people.json')
         >>> df1.dtypes
@@ -267,7 +270,8 @@ class DataFrameReader(OptionUtils):
             mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, 
dateFormat=dateFormat,
             timestampFormat=timestampFormat, multiLine=multiLine,
             allowUnquotedControlChars=allowUnquotedControlChars, 
lineSep=lineSep,
-            samplingRatio=samplingRatio, 
dropFieldIfAllNull=dropFieldIfAllNull, encoding=encoding)
+            samplingRatio=samplingRatio, 
dropFieldIfAllNull=dropFieldIfAllNull, encoding=encoding,
+            locale=locale)
         if isinstance(path, basestring):
             path = [path]
         if type(path) == list:
@@ -349,7 +353,7 @@ class DataFrameReader(OptionUtils):
             negativeInf=None, dateFormat=None, timestampFormat=None, 
maxColumns=None,
             maxCharsPerColumn=None, maxMalformedLogPerPartition=None, 
mode=None,
             columnNameOfCorruptRecord=None, multiLine=None, 
charToEscapeQuoteEscaping=None,
-            samplingRatio=None, enforceSchema=None, emptyValue=None):
+            samplingRatio=None, enforceSchema=None, emptyValue=None, 
locale=None):
         r"""Loads a CSV file and returns the result as a  :class:`DataFrame`.
 
         This function will go through the input once to determine the input 
schema if
@@ -446,6 +450,9 @@ class DataFrameReader(OptionUtils):
                               If None is set, it uses the default value, 
``1.0``.
         :param emptyValue: sets the string representation of an empty value. 
If None is set, it uses
                            the default value, empty string.
+        :param locale: sets a locale as language tag in IETF BCP 47 format. If 
None is set,
+                       it uses the default value, ``en-US``. For instance, 
``locale`` is used while
+                       parsing dates and timestamps.
 
         >>> df = spark.read.csv('python/test_support/sql/ages.csv')
         >>> df.dtypes
@@ -465,7 +472,7 @@ class DataFrameReader(OptionUtils):
             maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode,
             columnNameOfCorruptRecord=columnNameOfCorruptRecord, 
multiLine=multiLine,
             charToEscapeQuoteEscaping=charToEscapeQuoteEscaping, 
samplingRatio=samplingRatio,
-            enforceSchema=enforceSchema, emptyValue=emptyValue)
+            enforceSchema=enforceSchema, emptyValue=emptyValue, locale=locale)
         if isinstance(path, basestring):
             path = [path]
         if type(path) == list:

http://git-wip-us.apache.org/repos/asf/spark/blob/79551f55/python/pyspark/sql/streaming.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index b18453b..02b14ea 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -404,7 +404,7 @@ class DataStreamReader(OptionUtils):
              allowComments=None, allowUnquotedFieldNames=None, 
allowSingleQuotes=None,
              allowNumericLeadingZero=None, 
allowBackslashEscapingAnyCharacter=None,
              mode=None, columnNameOfCorruptRecord=None, dateFormat=None, 
timestampFormat=None,
-             multiLine=None,  allowUnquotedControlChars=None, lineSep=None):
+             multiLine=None,  allowUnquotedControlChars=None, lineSep=None, 
locale=None):
         """
         Loads a JSON file stream and returns the results as a 
:class:`DataFrame`.
 
@@ -469,6 +469,9 @@ class DataStreamReader(OptionUtils):
                                           including tab and line feed 
characters) or not.
         :param lineSep: defines the line separator that should be used for 
parsing. If None is
                         set, it covers all ``\\r``, ``\\r\\n`` and ``\\n``.
+        :param locale: sets a locale as language tag in IETF BCP 47 format. If 
None is set,
+                       it uses the default value, ``en-US``. For instance, 
``locale`` is used while
+                       parsing dates and timestamps.
 
         >>> json_sdf = spark.readStream.json(tempfile.mkdtemp(), schema = 
sdf_schema)
         >>> json_sdf.isStreaming
@@ -483,7 +486,7 @@ class DataStreamReader(OptionUtils):
             
allowBackslashEscapingAnyCharacter=allowBackslashEscapingAnyCharacter,
             mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, 
dateFormat=dateFormat,
             timestampFormat=timestampFormat, multiLine=multiLine,
-            allowUnquotedControlChars=allowUnquotedControlChars, 
lineSep=lineSep)
+            allowUnquotedControlChars=allowUnquotedControlChars, 
lineSep=lineSep, locale=locale)
         if isinstance(path, basestring):
             return self._df(self._jreader.json(path))
         else:
@@ -564,7 +567,7 @@ class DataStreamReader(OptionUtils):
             negativeInf=None, dateFormat=None, timestampFormat=None, 
maxColumns=None,
             maxCharsPerColumn=None, maxMalformedLogPerPartition=None, 
mode=None,
             columnNameOfCorruptRecord=None, multiLine=None, 
charToEscapeQuoteEscaping=None,
-            enforceSchema=None, emptyValue=None):
+            enforceSchema=None, emptyValue=None, locale=None):
         r"""Loads a CSV file stream and returns the result as a 
:class:`DataFrame`.
 
         This function will go through the input once to determine the input 
schema if
@@ -660,6 +663,9 @@ class DataStreamReader(OptionUtils):
                                           different, ``\0`` otherwise..
         :param emptyValue: sets the string representation of an empty value. 
If None is set, it uses
                            the default value, empty string.
+        :param locale: sets a locale as language tag in IETF BCP 47 format. If 
None is set,
+                       it uses the default value, ``en-US``. For instance, 
``locale`` is used while
+                       parsing dates and timestamps.
 
         >>> csv_sdf = spark.readStream.csv(tempfile.mkdtemp(), schema = 
sdf_schema)
         >>> csv_sdf.isStreaming
@@ -677,7 +683,7 @@ class DataStreamReader(OptionUtils):
             maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode,
             columnNameOfCorruptRecord=columnNameOfCorruptRecord, 
multiLine=multiLine,
             charToEscapeQuoteEscaping=charToEscapeQuoteEscaping, 
enforceSchema=enforceSchema,
-            emptyValue=emptyValue)
+            emptyValue=emptyValue, locale=locale)
         if isinstance(path, basestring):
             return self._df(self._jreader.csv(path))
         else:

http://git-wip-us.apache.org/repos/asf/spark/blob/79551f55/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
index cdaaa17..6428235 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
@@ -131,13 +131,16 @@ class CSVOptions(
   val timeZone: TimeZone = DateTimeUtils.getTimeZone(
     parameters.getOrElse(DateTimeUtils.TIMEZONE_OPTION, defaultTimeZoneId))
 
+  // A language tag in IETF BCP 47 format
+  val locale: Locale = 
parameters.get("locale").map(Locale.forLanguageTag).getOrElse(Locale.US)
+
   // Uses `FastDateFormat` which can be direct replacement for 
`SimpleDateFormat` and thread-safe.
   val dateFormat: FastDateFormat =
-    FastDateFormat.getInstance(parameters.getOrElse("dateFormat", 
"yyyy-MM-dd"), Locale.US)
+    FastDateFormat.getInstance(parameters.getOrElse("dateFormat", 
"yyyy-MM-dd"), locale)
 
   val timestampFormat: FastDateFormat =
     FastDateFormat.getInstance(
-      parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"), 
timeZone, Locale.US)
+      parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"), 
timeZone, locale)
 
   val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/79551f55/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
index 64152e0..e10b8a3 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
@@ -76,16 +76,19 @@ private[sql] class JSONOptions(
   // Whether to ignore column of all null values or empty array/struct during 
schema inference
   val dropFieldIfAllNull = 
parameters.get("dropFieldIfAllNull").map(_.toBoolean).getOrElse(false)
 
+  // A language tag in IETF BCP 47 format
+  val locale: Locale = 
parameters.get("locale").map(Locale.forLanguageTag).getOrElse(Locale.US)
+
   val timeZone: TimeZone = DateTimeUtils.getTimeZone(
     parameters.getOrElse(DateTimeUtils.TIMEZONE_OPTION, defaultTimeZoneId))
 
   // Uses `FastDateFormat` which can be direct replacement for 
`SimpleDateFormat` and thread-safe.
   val dateFormat: FastDateFormat =
-    FastDateFormat.getInstance(parameters.getOrElse("dateFormat", 
"yyyy-MM-dd"), Locale.US)
+    FastDateFormat.getInstance(parameters.getOrElse("dateFormat", 
"yyyy-MM-dd"), locale)
 
   val timestampFormat: FastDateFormat =
     FastDateFormat.getInstance(
-      parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"), 
timeZone, Locale.US)
+      parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"), 
timeZone, locale)
 
   val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/79551f55/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CsvExpressionsSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CsvExpressionsSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CsvExpressionsSuite.scala
index d006197..f5aaaec 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CsvExpressionsSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CsvExpressionsSuite.scala
@@ -17,7 +17,8 @@
 
 package org.apache.spark.sql.catalyst.expressions
 
-import java.util.Calendar
+import java.text.SimpleDateFormat
+import java.util.{Calendar, Locale}
 
 import org.scalatest.exceptions.TestFailedException
 
@@ -209,4 +210,20 @@ class CsvExpressionsSuite extends SparkFunSuite with 
ExpressionEvalHelper with P
       "2015-12-31T16:00:00"
     )
   }
+
+  test("parse date with locale") {
+    Seq("en-US", "ru-RU").foreach { langTag =>
+      val locale = Locale.forLanguageTag(langTag)
+      val date = new SimpleDateFormat("yyyy-MM-dd").parse("2018-11-05")
+      val schema = new StructType().add("d", DateType)
+      val dateFormat = "MMM yyyy"
+      val sdf = new SimpleDateFormat(dateFormat, locale)
+      val dateStr = sdf.format(date)
+      val options = Map("dateFormat" -> dateFormat, "locale" -> langTag)
+
+      checkEvaluation(
+        CsvToStructs(schema, options, Literal.create(dateStr), gmtId),
+        InternalRow(17836)) // number of days from 1970-01-01
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/79551f55/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
index 3046421..6ee8c74 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
@@ -17,7 +17,8 @@
 
 package org.apache.spark.sql.catalyst.expressions
 
-import java.util.Calendar
+import java.text.SimpleDateFormat
+import java.util.{Calendar, Locale}
 
 import org.scalatest.exceptions.TestFailedException
 
@@ -737,4 +738,20 @@ class JsonExpressionsSuite extends SparkFunSuite with 
ExpressionEvalHelper with
         CreateMap(Seq(Literal.create("allowNumericLeadingZeros"), 
Literal.create("true")))),
       "struct<col:bigint>")
   }
+
+  test("parse date with locale") {
+    Seq("en-US", "ru-RU").foreach { langTag =>
+      val locale = Locale.forLanguageTag(langTag)
+      val date = new SimpleDateFormat("yyyy-MM-dd").parse("2018-11-05")
+      val schema = new StructType().add("d", DateType)
+      val dateFormat = "MMM yyyy"
+      val sdf = new SimpleDateFormat(dateFormat, locale)
+      val dateStr = s"""{"d":"${sdf.format(date)}"}"""
+      val options = Map("dateFormat" -> dateFormat, "locale" -> langTag)
+
+      checkEvaluation(
+        JsonToStructs(schema, options, Literal.create(dateStr), gmtId),
+        InternalRow(17836)) // number of days from 1970-01-01
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/79551f55/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 95c97e5..02ffc94 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -384,6 +384,8 @@ class DataFrameReader private[sql](sparkSession: 
SparkSession) extends Logging {
    * for schema inferring.</li>
    * <li>`dropFieldIfAllNull` (default `false`): whether to ignore column of 
all null values or
    * empty array/struct during schema inference.</li>
+   * <li>`locale` (default is `en-US`): sets a locale as language tag in IETF 
BCP 47 format.
+   * For instance, this is used while parsing dates and timestamps.</li>
    * </ul>
    *
    * @since 2.0.0
@@ -604,6 +606,8 @@ class DataFrameReader private[sql](sparkSession: 
SparkSession) extends Logging {
    * `spark.sql.columnNameOfCorruptRecord`): allows renaming the new field 
having malformed string
    * created by `PERMISSIVE` mode. This overrides 
`spark.sql.columnNameOfCorruptRecord`.</li>
    * <li>`multiLine` (default `false`): parse one record, which may span 
multiple lines.</li>
+   * <li>`locale` (default is `en-US`): sets a locale as language tag in IETF 
BCP 47 format.
+   * For instance, this is used while parsing dates and timestamps.</li>
    * </ul>
    *
    * @since 2.0.0

http://git-wip-us.apache.org/repos/asf/spark/blob/79551f55/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index 4c7dced..20c8430 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -296,6 +296,8 @@ final class DataStreamReader private[sql](sparkSession: 
SparkSession) extends Lo
    * that should be used for parsing.</li>
    * <li>`dropFieldIfAllNull` (default `false`): whether to ignore column of 
all null values or
    * empty array/struct during schema inference.</li>
+   * <li>`locale` (default is `en-US`): sets a locale as language tag in IETF 
BCP 47 format.
+   * For instance, this is used while parsing dates and timestamps.</li>
    * </ul>
    *
    * @since 2.0.0
@@ -372,6 +374,8 @@ final class DataStreamReader private[sql](sparkSession: 
SparkSession) extends Lo
    * `spark.sql.columnNameOfCorruptRecord`): allows renaming the new field 
having malformed string
    * created by `PERMISSIVE` mode. This overrides 
`spark.sql.columnNameOfCorruptRecord`.</li>
    * <li>`multiLine` (default `false`): parse one record, which may span 
multiple lines.</li>
+   * <li>`locale` (default is `en-US`): sets a locale as language tag in IETF 
BCP 47 format.
+   * For instance, this is used while parsing dates and timestamps.</li>
    * </ul>
    *
    * @since 2.0.0

http://git-wip-us.apache.org/repos/asf/spark/blob/79551f55/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
index b97ac38..1c359ce 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
@@ -17,6 +17,9 @@
 
 package org.apache.spark.sql
 
+import java.text.SimpleDateFormat
+import java.util.Locale
+
 import scala.collection.JavaConverters._
 
 import org.apache.spark.SparkException
@@ -164,4 +167,18 @@ class CsvFunctionsSuite extends QueryTest with 
SharedSQLContext {
     val df1 = Seq(Tuple1(Tuple1(1))).toDF("a")
     checkAnswer(df1.selectExpr("to_csv(a)"), Row("1") :: Nil)
   }
+
+  test("parse timestamps with locale") {
+    Seq("en-US", "ko-KR", "zh-CN", "ru-RU").foreach { langTag =>
+      val locale = Locale.forLanguageTag(langTag)
+      val ts = new SimpleDateFormat("dd/MM/yyyy HH:mm").parse("06/11/2018 
18:00")
+      val timestampFormat = "dd MMM yyyy HH:mm"
+      val sdf = new SimpleDateFormat(timestampFormat, locale)
+      val input = Seq(s"""${sdf.format(ts)}""").toDS()
+      val options = Map("timestampFormat" -> timestampFormat, "locale" -> 
langTag)
+      val df = input.select(from_csv($"value", lit("time timestamp"), 
options.asJava))
+
+      checkAnswer(df, Row(Row(java.sql.Timestamp.valueOf("2018-11-06 
18:00:00.0"))))
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/79551f55/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
index d6b7338..24e7564 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
@@ -17,6 +17,9 @@
 
 package org.apache.spark.sql
 
+import java.text.SimpleDateFormat
+import java.util.Locale
+
 import collection.JavaConverters._
 
 import org.apache.spark.SparkException
@@ -591,4 +594,18 @@ class JsonFunctionsSuite extends QueryTest with 
SharedSQLContext {
       df.select(from_json($"value", schema, Map("columnNameOfCorruptRecord" -> 
"_unparsed"))),
       Row(Row(null, badRec, null)) :: Row(Row(2, null, 12)) :: Nil)
   }
+
+  test("parse timestamps with locale") {
+    Seq("en-US", "ko-KR", "zh-CN", "ru-RU").foreach { langTag =>
+      val locale = Locale.forLanguageTag(langTag)
+      val ts = new SimpleDateFormat("dd/MM/yyyy HH:mm").parse("06/11/2018 
18:00")
+      val timestampFormat = "dd MMM yyyy HH:mm"
+      val sdf = new SimpleDateFormat(timestampFormat, locale)
+      val input = Seq(s"""{"time": "${sdf.format(ts)}"}""").toDS()
+      val options = Map("timestampFormat" -> timestampFormat, "locale" -> 
langTag)
+      val df = input.select(from_json($"value", "time timestamp", options))
+
+      checkAnswer(df, Row(Row(java.sql.Timestamp.valueOf("2018-11-06 
18:00:00.0"))))
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to