Repository: spark
Updated Branches:
  refs/heads/master 6a9a85b84 -> 865b2fd84

[SPARK-18937][SQL] Timezone support in CSV/JSON parsing

## What changes were proposed in this pull request?

This is a follow-up PR to #16308.

This PR enables timezone support in CSV/JSON parsing.

We should introduce a `timeZone` option for the CSV/JSON datasources (the default value of the option is the session local timezone).

The datasources should use the `timeZone` option to format timestamp values when writing and to parse them when reading.
Notice that while reading, if `timestampFormat` includes timezone info, the `timeZone` option is not used, because the timezone carried by the values themselves must be respected.

For example, if you have the timestamp `"2016-01-01 00:00:00"` in `GMT`, the values written with the default timezone option, which is `"GMT"` because the session local timezone is `"GMT"` here, are:

```scala
scala> spark.conf.set("spark.sql.session.timeZone", "GMT")

scala> val df = Seq(new java.sql.Timestamp(1451606400000L)).toDF("ts")
df: org.apache.spark.sql.DataFrame = [ts: timestamp]

scala> df.show()
+-------------------+
|ts                 |
+-------------------+
|2016-01-01 00:00:00|
+-------------------+

scala> df.write.json("/path/to/gmtjson")
```

```sh
$ cat /path/to/gmtjson/part-*
{"ts":"2016-01-01T00:00:00.000Z"}
```

whereas with the option set to `"PST"`, they are:

```scala
scala> df.write.option("timeZone", "PST").json("/path/to/pstjson")
```

```sh
$ cat /path/to/pstjson/part-*
{"ts":"2015-12-31T16:00:00.000-08:00"}
```

We can read these files correctly even if the timezone option is wrong, because the timestamp values carry timezone info:

```scala
scala> val schema = new StructType().add("ts", TimestampType)
schema: org.apache.spark.sql.types.StructType = StructType(StructField(ts,TimestampType,true))

scala> spark.read.schema(schema).json("/path/to/gmtjson").show()
+-------------------+
|ts                 |
+-------------------+
|2016-01-01 00:00:00|
+-------------------+

scala> spark.read.schema(schema).option("timeZone", "PST").json("/path/to/gmtjson").show()
+-------------------+
|ts                 |
+-------------------+
|2016-01-01 00:00:00|
+-------------------+
```

And even if `timestampFormat` doesn't contain timezone info, we can read the values correctly by setting the correct timezone option:

```scala
scala> df.write.option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss").option("timeZone", "JST").json("/path/to/jstjson")
```

```sh
$ cat /path/to/jstjson/part-*
{"ts":"2016-01-01T09:00:00"}
```

```scala
// wrong result
scala> spark.read.schema(schema).option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss").json("/path/to/jstjson").show()
+-------------------+
|ts                 |
+-------------------+
|2016-01-01 09:00:00|
+-------------------+

// correct result
scala> spark.read.schema(schema).option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss").option("timeZone", "JST").json("/path/to/jstjson").show()
+-------------------+
|ts                 |
+-------------------+
|2016-01-01 00:00:00|
+-------------------+
```

This PR also makes `JsonToStruct` and `StructToJson` implement `TimeZoneAwareExpression` so that they can evaluate values with the timezone option.

## How was this patch tested?

Existing tests and some added tests.

Author: Takuya UESHIN <ues...@happy-camper.st>

Closes #16750 from ueshin/issues/SPARK-18937.
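The read/write rules described above can be sketched in plain Python using only the standard library. This is not PySpark code. `resolve_time_zone`, `format_timestamp`, `parse_timestamp` and `to_epoch_micros` are hypothetical helpers. The zone table maps `GMT`, `PST` and `JST` to fixed UTC offsets, which is an assumption that only holds for dates without daylight saving time, such as the January 2016 values used here.

The sketch mirrors three points from the PR:

- The `timeZone` option falls back to the session time zone. The key is looked up case-insensitively, as `JSONOptions` does through `CaseInsensitiveMap`.
- A zone is applied only to strings that carry no offset of their own.
- Timestamps are compared as microseconds since the epoch, like `c.getTimeInMillis * 1000L` in the new tests.

```python
from datetime import datetime, timedelta, timezone

# Hypothetical zone table: fixed UTC offsets standing in for the zone IDs in the
# examples. Only valid for dates without DST (e.g. January 2016); Java resolves
# "PST" to a DST-aware zone instead.
ZONES = {
    "GMT": timezone.utc,
    "PST": timezone(timedelta(hours=-8)),
    "JST": timezone(timedelta(hours=9)),
}

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def resolve_time_zone(options, session_time_zone):
    """Use the `timeZone` option (case-insensitive key), else the session default."""
    lowered = {key.lower(): value for key, value in options.items()}
    return lowered.get("timezone", session_time_zone)


def format_timestamp(instant, tz_name):
    """Imitate the default yyyy-MM-dd'T'HH:mm:ss.SSSZZ output in the given zone."""
    local = instant.astimezone(ZONES[tz_name])
    offset_minutes = int(local.utcoffset().total_seconds() // 60)
    if offset_minutes == 0:
        suffix = "Z"  # a zero offset is written as "Z", as in the GMT example
    else:
        sign = "+" if offset_minutes > 0 else "-"
        hours, minutes = divmod(abs(offset_minutes), 60)
        suffix = f"{sign}{hours:02d}:{minutes:02d}"
    millis = local.microsecond // 1000
    return f"{local:%Y-%m-%dT%H:%M:%S}.{millis:03d}{suffix}"


def parse_timestamp(text, tz_name):
    """Parse an ISO-8601 string; tz_name is used only if the text has no offset."""
    if text.endswith("Z"):
        text = text[:-1] + "+00:00"  # fromisoformat() before Python 3.11 rejects "Z"
    parsed = datetime.fromisoformat(text)
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=ZONES[tz_name])
    return parsed.astimezone(timezone.utc)


def to_epoch_micros(instant):
    """Microseconds since 1970-01-01T00:00:00Z, computed with integer arithmetic."""
    delta = instant - EPOCH
    return (delta.days * 86_400 + delta.seconds) * 1_000_000 + delta.microseconds


if __name__ == "__main__":
    instant = datetime(2016, 1, 1, tzinfo=timezone.utc)
    print(format_timestamp(instant, resolve_time_zone({}, "GMT")))
    print(format_timestamp(instant, resolve_time_zone({"timeZone": "PST"}, "GMT")))
    print(parse_timestamp("2016-01-01T09:00:00", "GMT"))  # no option: read as GMT
    print(parse_timestamp("2016-01-01T09:00:00", "JST"))  # option set to JST
```

When run as a script, it prints four lines:

- the GMT string from the JSON file above;
- the PST string from the JSON file above;
- the "wrong" reading of the zone-less JST value (`09:00` UTC);
- the "correct" reading of the same value (`00:00` UTC).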
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/865b2fd8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/865b2fd8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/865b2fd8

Branch: refs/heads/master
Commit: 865b2fd84c6f82de147540c8f17bbe0f0d9fb69c
Parents: 6a9a85b
Author: Takuya UESHIN <ues...@happy-camper.st>
Authored: Wed Feb 15 13:26:34 2017 -0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Wed Feb 15 13:26:34 2017 -0800

----------------------------------------------------------------------
 python/pyspark/sql/readwriter.py               |  43 ++++---
 python/pyspark/sql/streaming.py                |  20 ++--
 .../catalyst/expressions/jsonExpressions.scala |  30 ++++-
 .../spark/sql/catalyst/json/JSONOptions.scala  |  11 +-
 .../sql/catalyst/json/JacksonGenerator.scala   |   2 +-
 .../expressions/JsonExpressionsSuite.scala     | 113 ++++++++++++++++---
 .../org/apache/spark/sql/DataFrameReader.scala |   8 +-
 .../org/apache/spark/sql/DataFrameWriter.scala |   4 +
 .../scala/org/apache/spark/sql/Dataset.scala   |   6 +-
 .../datasources/csv/CSVFileFormat.scala        |   8 +-
 .../execution/datasources/csv/CSVOptions.scala |  21 ++--
 .../datasources/csv/UnivocityGenerator.scala   |   2 +-
 .../datasources/csv/UnivocityParser.scala      |   2 +-
 .../datasources/json/JsonFileFormat.scala      |   9 +-
 .../spark/sql/streaming/DataStreamReader.scala |   4 +
 .../datasources/csv/CSVInferSchemaSuite.scala  |  22 ++--
 .../execution/datasources/csv/CSVSuite.scala   |  44 +++++++-
 .../datasources/csv/UnivocityParserSuite.scala |  73 +++++++-----
 .../execution/datasources/json/JsonSuite.scala |  46 +++++++-
 .../sql/sources/ResolvedDataSourceSuite.scala  |   6 +-
 20 files changed, 351 insertions(+), 123 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/865b2fd8/python/pyspark/sql/readwriter.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index d31f3fb..1678334 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -158,7 +158,8 @@ class DataFrameReader(OptionUtils):
     def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
              allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None,
              allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None,
-             mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None):
+             mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None,
+             timeZone=None):
         """
         Loads a JSON file (`JSON Lines text format or newline-delimited JSON
         <http://jsonlines.org/>`_) or an RDD of Strings storing JSON objects (one object per
@@ -204,11 +205,13 @@ class DataFrameReader(OptionUtils):
         :param dateFormat: sets the string that indicates a date format. Custom date formats
                            follow the formats at ``java.text.SimpleDateFormat``. This
                            applies to date type. If None is set, it uses the
-                           default value value, ``yyyy-MM-dd``.
+                           default value, ``yyyy-MM-dd``.
         :param timestampFormat: sets the string that indicates a timestamp format. Custom date
                                 formats follow the formats at ``java.text.SimpleDateFormat``.
                                 This applies to timestamp type. If None is set, it uses the
-                                default value value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
+                                default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
+        :param timeZone: sets the string that indicates a timezone to be used to parse timestamps.
+                         If None is set, it uses the default value, session local timezone.

         >>> df1 = spark.read.json('python/test_support/sql/people.json')
         >>> df1.dtypes
@@ -225,7 +228,7 @@ class DataFrameReader(OptionUtils):
             allowSingleQuotes=allowSingleQuotes, allowNumericLeadingZero=allowNumericLeadingZero,
             allowBackslashEscapingAnyCharacter=allowBackslashEscapingAnyCharacter,
             mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, dateFormat=dateFormat,
-            timestampFormat=timestampFormat)
+            timestampFormat=timestampFormat, timeZone=timeZone)
         if isinstance(path, basestring):
             path = [path]
         if type(path) == list:
@@ -298,7 +301,7 @@ class DataFrameReader(OptionUtils):
             comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None,
             ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None,
             negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None,
-            maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None):
+            maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None, timeZone=None):
         """Loads a CSV file and returns the result as a :class:`DataFrame`.

         This function will go through the input once to determine the input schema if
@@ -341,11 +344,11 @@ class DataFrameReader(OptionUtils):
         :param dateFormat: sets the string that indicates a date format. Custom date formats
                            follow the formats at ``java.text.SimpleDateFormat``. This
                            applies to date type. If None is set, it uses the
-                           default value value, ``yyyy-MM-dd``.
+                           default value, ``yyyy-MM-dd``.
         :param timestampFormat: sets the string that indicates a timestamp format. Custom date
                                 formats follow the formats at ``java.text.SimpleDateFormat``.
                                 This applies to timestamp type. If None is set, it uses the
-                                default value value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
+                                default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
         :param maxColumns: defines a hard limit of how many columns a record can have. If None is
                            set, it uses the default value, ``20480``.
         :param maxCharsPerColumn: defines the maximum number of characters allowed for any given
@@ -357,6 +360,8 @@ class DataFrameReader(OptionUtils):
                                             uses the default value, ``10``.
         :param mode: allows a mode for dealing with corrupt records during parsing. If None is
                      set, it uses the default value, ``PERMISSIVE``.
+        :param timeZone: sets the string that indicates a timezone to be used to parse timestamps.
+                         If None is set, it uses the default value, session local timezone.

                 * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted
                   record. When a schema is set by user, it sets ``null`` for extra fields.
@@ -374,7 +379,7 @@ class DataFrameReader(OptionUtils):
             nanValue=nanValue, positiveInf=positiveInf, negativeInf=negativeInf,
             dateFormat=dateFormat, timestampFormat=timestampFormat, maxColumns=maxColumns,
             maxCharsPerColumn=maxCharsPerColumn,
-            maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode)
+            maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode, timeZone=timeZone)
         if isinstance(path, basestring):
             path = [path]
         return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
@@ -591,7 +596,8 @@ class DataFrameWriter(OptionUtils):
         self._jwrite.saveAsTable(name)

     @since(1.4)
-    def json(self, path, mode=None, compression=None, dateFormat=None, timestampFormat=None):
+    def json(self, path, mode=None, compression=None, dateFormat=None, timestampFormat=None,
+             timeZone=None):
         """Saves the content of the :class:`DataFrame` in JSON format at the specified path.

         :param path: the path in any Hadoop supported file system
@@ -607,17 +613,20 @@ class DataFrameWriter(OptionUtils):
         :param dateFormat: sets the string that indicates a date format. Custom date formats
                            follow the formats at ``java.text.SimpleDateFormat``. This
                            applies to date type. If None is set, it uses the
-                           default value value, ``yyyy-MM-dd``.
+                           default value, ``yyyy-MM-dd``.
         :param timestampFormat: sets the string that indicates a timestamp format. Custom date
                                 formats follow the formats at ``java.text.SimpleDateFormat``.
                                 This applies to timestamp type. If None is set, it uses the
-                                default value value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
+                                default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
+        :param timeZone: sets the string that indicates a timezone to be used to format timestamps.
+                         If None is set, it uses the default value, session local timezone.
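
         >>> df.write.json(os.path.join(tempfile.mkdtemp(), 'data'))
         """
         self.mode(mode)
         self._set_opts(
-            compression=compression, dateFormat=dateFormat, timestampFormat=timestampFormat)
+            compression=compression, dateFormat=dateFormat, timestampFormat=timestampFormat,
+            timeZone=timeZone)
         self._jwrite.json(path)

     @since(1.4)
@@ -664,7 +673,7 @@ class DataFrameWriter(OptionUtils):
     @since(2.0)
     def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=None,
             header=None, nullValue=None, escapeQuotes=None, quoteAll=None, dateFormat=None,
-            timestampFormat=None):
+            timestampFormat=None, timeZone=None):
         """Saves the content of the :class:`DataFrame` in CSV format at the specified path.

         :param path: the path in any Hadoop supported file system
@@ -699,18 +708,20 @@ class DataFrameWriter(OptionUtils):
         :param dateFormat: sets the string that indicates a date format. Custom date formats
                            follow the formats at ``java.text.SimpleDateFormat``. This
                            applies to date type. If None is set, it uses the
-                           default value value, ``yyyy-MM-dd``.
+                           default value, ``yyyy-MM-dd``.
         :param timestampFormat: sets the string that indicates a timestamp format. Custom date
                                 formats follow the formats at ``java.text.SimpleDateFormat``.
                                 This applies to timestamp type. If None is set, it uses the
-                                default value value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
+                                default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
+        :param timeZone: sets the string that indicates a timezone to be used to parse timestamps.
+                         If None is set, it uses the default value, session local timezone.

         >>> df.write.csv(os.path.join(tempfile.mkdtemp(), 'data'))
         """
         self.mode(mode)
         self._set_opts(compression=compression, sep=sep, quote=quote, escape=escape, header=header,
                        nullValue=nullValue, escapeQuotes=escapeQuotes, quoteAll=quoteAll,
-                       dateFormat=dateFormat, timestampFormat=timestampFormat)
+                       dateFormat=dateFormat, timestampFormat=timestampFormat, timeZone=timeZone)
         self._jwrite.csv(path)

     @since(1.5)

http://git-wip-us.apache.org/repos/asf/spark/blob/865b2fd8/python/pyspark/sql/streaming.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index a10b185..d988e59 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -429,7 +429,7 @@ class DataStreamReader(OptionUtils):
              allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None,
              allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None,
              mode=None, columnNameOfCorruptRecord=None, dateFormat=None,
-             timestampFormat=None):
+             timestampFormat=None, timeZone=None):
         """
         Loads a JSON file stream (`JSON Lines text format or newline-delimited JSON
         <http://jsonlines.org/>`_) and returns a :class`DataFrame`.
@@ -476,11 +476,13 @@ class DataStreamReader(OptionUtils):
         :param dateFormat: sets the string that indicates a date format. Custom date formats
                            follow the formats at ``java.text.SimpleDateFormat``. This
                            applies to date type. If None is set, it uses the
-                           default value value, ``yyyy-MM-dd``.
+                           default value, ``yyyy-MM-dd``.
         :param timestampFormat: sets the string that indicates a timestamp format. Custom date
                                 formats follow the formats at ``java.text.SimpleDateFormat``.
                                 This applies to timestamp type. If None is set, it uses the
-                                default value value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
+                                default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
+        :param timeZone: sets the string that indicates a timezone to be used to parse timestamps.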
+                         If None is set, it uses the default value, session local timezone.

         >>> json_sdf = spark.readStream.json(tempfile.mkdtemp(), schema = sdf_schema)
         >>> json_sdf.isStreaming
@@ -494,7 +496,7 @@ class DataStreamReader(OptionUtils):
             allowSingleQuotes=allowSingleQuotes, allowNumericLeadingZero=allowNumericLeadingZero,
             allowBackslashEscapingAnyCharacter=allowBackslashEscapingAnyCharacter,
             mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, dateFormat=dateFormat,
-            timestampFormat=timestampFormat)
+            timestampFormat=timestampFormat, timeZone=timeZone)
         if isinstance(path, basestring):
             return self._df(self._jreader.json(path))
         else:
@@ -552,7 +554,7 @@ class DataStreamReader(OptionUtils):
             comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None,
             ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None,
             negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None,
-            maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None):
+            maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None, timeZone=None):
         """Loads a CSV file stream and returns the result as a :class:`DataFrame`.

         This function will go through the input once to determine the input schema if
@@ -597,11 +599,11 @@ class DataStreamReader(OptionUtils):
         :param dateFormat: sets the string that indicates a date format. Custom date formats
                            follow the formats at ``java.text.SimpleDateFormat``. This
                            applies to date type. If None is set, it uses the
-                           default value value, ``yyyy-MM-dd``.
+                           default value, ``yyyy-MM-dd``.
         :param timestampFormat: sets the string that indicates a timestamp format. Custom date
                                 formats follow the formats at ``java.text.SimpleDateFormat``.
                                 This applies to timestamp type. If None is set, it uses the
-                                default value value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
+                                default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
         :param maxColumns: defines a hard limit of how many columns a record can have. If None is
                            set, it uses the default value, ``20480``.
         :param maxCharsPerColumn: defines the maximum number of characters allowed for any given
@@ -609,6 +611,8 @@ class DataStreamReader(OptionUtils):
                                   ``-1`` meaning unlimited length.
         :param mode: allows a mode for dealing with corrupt records during parsing. If None is
                      set, it uses the default value, ``PERMISSIVE``.
+        :param timeZone: sets the string that indicates a timezone to be used to parse timestamps.
+                         If None is set, it uses the default value, session local timezone.

                 * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted
                   record. When a schema is set by user, it sets ``null`` for extra fields.
@@ -628,7 +632,7 @@ class DataStreamReader(OptionUtils):
             nanValue=nanValue, positiveInf=positiveInf, negativeInf=negativeInf,
             dateFormat=dateFormat, timestampFormat=timestampFormat, maxColumns=maxColumns,
             maxCharsPerColumn=maxCharsPerColumn,
-            maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode)
+            maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode, timeZone=timeZone)
         if isinstance(path, basestring):
             return self._df(self._jreader.csv(path))
         else:

http://git-wip-us.apache.org/repos/asf/spark/blob/865b2fd8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index c410e79..bd852a5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -482,19 +482,29 @@ case class JsonTuple(children: Seq[Expression])
 /**
  * Converts an json input string to a [[StructType]] with the specified schema.
  */
-case class JsonToStruct(schema: StructType, options: Map[String, String], child: Expression)
-  extends UnaryExpression with CodegenFallback with ExpectsInputTypes {
+case class JsonToStruct(
+    schema: StructType,
+    options: Map[String, String],
+    child: Expression,
+    timeZoneId: Option[String] = None)
+  extends UnaryExpression with TimeZoneAwareExpression with CodegenFallback with ExpectsInputTypes {
   override def nullable: Boolean = true

+  def this(schema: StructType, options: Map[String, String], child: Expression) =
+    this(schema, options, child, None)
+
   @transient
   lazy val parser =
     new JacksonParser(
       schema,
       "invalid", // Not used since we force fail fast. Invalid rows will be set to `null`.
-      new JSONOptions(options ++ Map("mode" -> ParseModes.FAIL_FAST_MODE)))
+      new JSONOptions(options ++ Map("mode" -> ParseModes.FAIL_FAST_MODE), timeZoneId.get))

   override def dataType: DataType = schema

+  override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
+    copy(timeZoneId = Option(timeZoneId))
+
   override def nullSafeEval(json: Any): Any = {
     try parser.parse(json.toString).headOption.orNull catch {
       case _: SparkSQLJsonProcessingException => null
@@ -507,10 +517,15 @@ case class JsonToStruct(schema: StructType, options: Map[String, String], child:
 /**
  * Converts a [[StructType]] to a json output string.
  */
-case class StructToJson(options: Map[String, String], child: Expression)
-  extends UnaryExpression with CodegenFallback with ExpectsInputTypes {
+case class StructToJson(
+    options: Map[String, String],
+    child: Expression,
+    timeZoneId: Option[String] = None)
+  extends UnaryExpression with TimeZoneAwareExpression with CodegenFallback with ExpectsInputTypes {
   override def nullable: Boolean = true

+  def this(options: Map[String, String], child: Expression) = this(options, child, None)
+
   @transient
   lazy val writer = new CharArrayWriter()

@@ -519,7 +534,7 @@ case class StructToJson(options: Map[String, String], child: Expression)
     new JacksonGenerator(
       child.dataType.asInstanceOf[StructType],
       writer,
-      new JSONOptions(options))
+      new JSONOptions(options, timeZoneId.get))

   override def dataType: DataType = StringType

@@ -538,6 +553,9 @@ case class StructToJson(options: Map[String, String], child: Expression)
     }
   }

+  override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
+    copy(timeZoneId = Option(timeZoneId))
+
   override def nullSafeEval(row: Any): Any = {
     gen.write(row.asInstanceOf[InternalRow])
     gen.flush()

http://git-wip-us.apache.org/repos/asf/spark/blob/865b2fd8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
index 02bd8de..5307ce1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
@@ -17,7 +17,7 @@

 package org.apache.spark.sql.catalyst.json

-import java.util.Locale
+import java.util.{Locale, TimeZone}

 import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
 import org.apache.commons.lang3.time.FastDateFormat
@@ -31,10 +31,11 @@ import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CompressionCodecs
  * Most of these map directly to Jackson's internal options, specified in [[JsonParser.Feature]].
  */
 private[sql] class JSONOptions(
-    @transient private val parameters: CaseInsensitiveMap[String])
+    @transient private val parameters: CaseInsensitiveMap[String], defaultTimeZoneId: String)
   extends Logging with Serializable {

-  def this(parameters: Map[String, String]) = this(CaseInsensitiveMap(parameters))
+  def this(parameters: Map[String, String], defaultTimeZoneId: String) =
+    this(CaseInsensitiveMap(parameters), defaultTimeZoneId)

   val samplingRatio =
     parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0)
@@ -58,13 +59,15 @@ private[sql] class JSONOptions(
   private val parseMode = parameters.getOrElse("mode", "PERMISSIVE")
   val columnNameOfCorruptRecord = parameters.get("columnNameOfCorruptRecord")

+  val timeZone: TimeZone = TimeZone.getTimeZone(parameters.getOrElse("timeZone", defaultTimeZoneId))
+
   // Uses `FastDateFormat` which can be direct replacement for `SimpleDateFormat` and thread-safe.
   val dateFormat: FastDateFormat =
     FastDateFormat.getInstance(parameters.getOrElse("dateFormat", "yyyy-MM-dd"), Locale.US)

   val timestampFormat: FastDateFormat =
     FastDateFormat.getInstance(
-      parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSZZ"), Locale.US)
+      parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSZZ"), timeZone, Locale.US)

   // Parse mode flags
   if (!ParseModes.isValidMode(parseMode)) {

http://git-wip-us.apache.org/repos/asf/spark/blob/865b2fd8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
index bf8e3c8..dec5527 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.types._
 private[sql] class JacksonGenerator(
     schema: StructType,
     writer: Writer,
-    options: JSONOptions = new JSONOptions(Map.empty[String, String])) {
+    options: JSONOptions) {
   // A `ValueWriter` is responsible for writing a field of an `InternalRow` to appropriate
   // JSON data. Here we are using `SpecializedGetters` rather than `InternalRow` so that
   // we can directly access data in `ArrayData` without the help of `SpecificMutableRow`.

http://git-wip-us.apache.org/repos/asf/spark/blob/865b2fd8/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
index 8e20bd1..0c46819 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
@@ -17,10 +17,12 @@

 package org.apache.spark.sql.catalyst.expressions

+import java.util.Calendar
+
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.util.ParseModes
-import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
+import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils, ParseModes}
+import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType, TimestampType}
 import org.apache.spark.unsafe.types.UTF8String

 class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
@@ -305,51 +307,53 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
   test("json_tuple - hive key 4 - null json") {
     checkJsonTuple(
       JsonTuple(Literal(null) :: jsonTupleQuery),
-      InternalRow.fromSeq(Seq(null, null, null, null, null)))
+      InternalRow(null, null, null, null, null))
   }

   test("json_tuple - hive key 5 - null and empty fields") {
     checkJsonTuple(
       JsonTuple(Literal("""{"f1": "", "f5": null}""") :: jsonTupleQuery),
-      InternalRow.fromSeq(Seq(UTF8String.fromString(""), null, null, null, null)))
+      InternalRow(UTF8String.fromString(""), null, null, null, null))
   }

   test("json_tuple - hive key 6 - invalid json (array)") {
     checkJsonTuple(
       JsonTuple(Literal("[invalid JSON string]") :: jsonTupleQuery),
-      InternalRow.fromSeq(Seq(null, null, null, null, null)))
+      InternalRow(null, null, null, null, null))
   }

   test("json_tuple - invalid json (object start only)") {
     checkJsonTuple(
       JsonTuple(Literal("{") :: jsonTupleQuery),
-      InternalRow.fromSeq(Seq(null, null, null, null, null)))
+      InternalRow(null, null, null, null, null))
   }

   test("json_tuple - invalid json (no object end)") {
     checkJsonTuple(
       JsonTuple(Literal("""{"foo": "bar"""") :: jsonTupleQuery),
-      InternalRow.fromSeq(Seq(null, null, null, null, null)))
+      InternalRow(null, null, null, null, null))
   }

   test("json_tuple - invalid json (invalid json)") {
     checkJsonTuple(
       JsonTuple(Literal("\\") :: jsonTupleQuery),
-      InternalRow.fromSeq(Seq(null, null, null, null, null)))
+      InternalRow(null, null, null, null, null))
   }

   test("json_tuple - preserve newlines") {
     checkJsonTuple(
       JsonTuple(Literal("{\"a\":\"b\nc\"}") :: Literal("a") :: Nil),
-      InternalRow.fromSeq(Seq(UTF8String.fromString("b\nc"))))
+      InternalRow(UTF8String.fromString("b\nc")))
   }

+  val gmtId = Option(DateTimeUtils.TimeZoneGMT.getID)
+
   test("from_json") {
     val jsonData = """{"a": 1}"""
     val schema = StructType(StructField("a", IntegerType) :: Nil)
     checkEvaluation(
-      JsonToStruct(schema, Map.empty, Literal(jsonData)),
-      InternalRow.fromSeq(1 :: Nil)
+      JsonToStruct(schema, Map.empty, Literal(jsonData), gmtId),
+      InternalRow(1)
     )
   }

@@ -357,13 +361,13 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
     val jsonData = """{"a" 1}"""
     val schema = StructType(StructField("a", IntegerType) :: Nil)
     checkEvaluation(
-      JsonToStruct(schema, Map.empty, Literal(jsonData)),
+      JsonToStruct(schema, Map.empty, Literal(jsonData), gmtId),
       null
     )

     // Other modes should still return `null`.
     checkEvaluation(
-      JsonToStruct(schema, Map("mode" -> ParseModes.PERMISSIVE_MODE), Literal(jsonData)),
+      JsonToStruct(schema, Map("mode" -> ParseModes.PERMISSIVE_MODE), Literal(jsonData), gmtId),
       null
     )
   }
@@ -371,15 +375,58 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
   test("from_json null input column") {
     val schema = StructType(StructField("a", IntegerType) :: Nil)
     checkEvaluation(
-      JsonToStruct(schema, Map.empty, Literal.create(null, StringType)),
+      JsonToStruct(schema, Map.empty, Literal.create(null, StringType), gmtId),
       null
     )
   }

+  test("from_json with timestamp") {
+    val schema = StructType(StructField("t", TimestampType) :: Nil)
+
+    val jsonData1 = """{"t": "2016-01-01T00:00:00.123Z"}"""
+    var c = Calendar.getInstance(DateTimeUtils.TimeZoneGMT)
+    c.set(2016, 0, 1, 0, 0, 0)
+    c.set(Calendar.MILLISECOND, 123)
+    checkEvaluation(
+      JsonToStruct(schema, Map.empty, Literal(jsonData1), gmtId),
+      InternalRow(c.getTimeInMillis * 1000L)
+    )
+    // The result doesn't change because the json string includes timezone string ("Z" here),
+    // which means the string represents the timestamp string in the timezone regardless of
+    // the timeZoneId parameter.
+    checkEvaluation(
+      JsonToStruct(schema, Map.empty, Literal(jsonData1), Option("PST")),
+      InternalRow(c.getTimeInMillis * 1000L)
+    )
+
+    val jsonData2 = """{"t": "2016-01-01T00:00:00"}"""
+    for (tz <- DateTimeTestUtils.ALL_TIMEZONES) {
+      c = Calendar.getInstance(tz)
+      c.set(2016, 0, 1, 0, 0, 0)
+      c.set(Calendar.MILLISECOND, 0)
+      checkEvaluation(
+        JsonToStruct(
+          schema,
+          Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss"),
+          Literal(jsonData2),
+          Option(tz.getID)),
+        InternalRow(c.getTimeInMillis * 1000L)
+      )
+      checkEvaluation(
+        JsonToStruct(
+          schema,
+          Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss", "timeZone" -> tz.getID),
+          Literal(jsonData2),
+          gmtId),
+        InternalRow(c.getTimeInMillis * 1000L)
+      )
+    }
+  }
+
   test("SPARK-19543: from_json empty input column") {
     val schema = StructType(StructField("a", IntegerType) :: Nil)
     checkEvaluation(
-      JsonToStruct(schema, Map.empty, Literal.create(" ", StringType)),
+      JsonToStruct(schema, Map.empty, Literal.create(" ", StringType), gmtId),
       null
     )
   }
@@ -388,7 +435,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
     val schema = StructType(StructField("a", IntegerType) :: Nil)
     val struct = Literal.create(create_row(1), schema)
     checkEvaluation(
-      StructToJson(Map.empty, struct),
+      StructToJson(Map.empty, struct, gmtId),
       """{"a":1}"""
     )
   }
@@ -397,8 +444,40 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
     val schema = StructType(StructField("a", IntegerType) :: Nil)
     val struct = Literal.create(null, schema)
     checkEvaluation(
-      StructToJson(Map.empty, struct),
+      StructToJson(Map.empty, struct, gmtId),
       null
     )
   }
+
+  test("to_json with timestamp") {
+    val schema = StructType(StructField("t", TimestampType) :: Nil)
+    val c = Calendar.getInstance(DateTimeUtils.TimeZoneGMT)
+    c.set(2016, 0, 1, 0, 0, 0)
+    c.set(Calendar.MILLISECOND, 0)
+    val struct = Literal.create(create_row(c.getTimeInMillis * 1000L), schema)
+
+    checkEvaluation(
+      StructToJson(Map.empty, struct, gmtId),
+      """{"t":"2016-01-01T00:00:00.000Z"}"""
+    )
+    checkEvaluation(
+      StructToJson(Map.empty, struct, Option("PST")),
+      """{"t":"2015-12-31T16:00:00.000-08:00"}"""
+    )
+
+    checkEvaluation(
+      StructToJson(
+        Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss", "timeZone" -> gmtId.get),
+        struct,
+        gmtId),
+      """{"t":"2016-01-01T00:00:00"}"""
+    )
+    checkEvaluation(
+      StructToJson(
+        Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss", "timeZone" -> "PST"),
+        struct,
+        gmtId),
+      """{"t":"2015-12-31T16:00:00"}"""
+    )
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/865b2fd8/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 1830839..780fe51 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -27,6 +27,7 @@ import org.apache.spark.Partition
 import org.apache.spark.annotation.InterfaceStability
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.json.{JacksonParser, JSONOptions}
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.execution.LogicalRDD
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.DataSource
@@ -298,6 +299,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
    * indicates a timestamp format. Custom date formats follow the formats at
    * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
+   * <li>`timeZone` (default session local timezone): sets the string that indicates a timezone
+   * to be used to parse timestamps.</li>
    * </ul>
    *
    * @since 2.0.0
@@ -329,7 +332,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * @since 1.4.0
    */
   def json(jsonRDD: RDD[String]): DataFrame = {
-    val parsedOptions: JSONOptions = new JSONOptions(extraOptions.toMap)
+    val parsedOptions: JSONOptions =
+      new JSONOptions(extraOptions.toMap, sparkSession.sessionState.conf.sessionLocalTimeZone)
     val columnNameOfCorruptRecord =
       parsedOptions.columnNameOfCorruptRecord
         .getOrElse(sparkSession.sessionState.conf.columnNameOfCorruptRecord)
@@ -401,6 +405,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
    * indicates a timestamp format. Custom date formats follow the formats at
    * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
+   * <li>`timeZone` (default session local timezone): sets the string that indicates a timezone
+   * to be used to parse timestamps.</li>
    * <li>`maxColumns` (default `20480`): defines a hard limit of how many columns
    * a record can have.</li>
    * <li>`maxCharsPerColumn` (default `-1`): defines the maximum number of characters allowed

http://git-wip-us.apache.org/repos/asf/spark/blob/865b2fd8/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 748ebba..1d834b1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -456,6 +456,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    * <li>`timestampFormat` (default
`yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that * indicates a timestamp format. Custom date formats follow the formats at * `java.text.SimpleDateFormat`. This applies to timestamp type.</li> + * <li>`timeZone` (default session local timezone): sets the string that indicates a timezone + * to be used to format timestamps.</li> * </ul> * * @since 1.4.0 @@ -562,6 +564,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { * <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that * indicates a timestamp format. Custom date formats follow the formats at * `java.text.SimpleDateFormat`. This applies to timestamp type.</li> + * <li>`timeZone` (default session local timezone): sets the string that indicates a timezone + * to be used to format timestamps.</li> * </ul> * * @since 2.0.0 http://git-wip-us.apache.org/repos/asf/spark/blob/865b2fd8/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 6b80ff4..e62cd9f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -39,7 +39,7 @@ import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.encoders._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ -import org.apache.spark.sql.catalyst.json.JacksonGenerator +import org.apache.spark.sql.catalyst.json.{JacksonGenerator, JSONOptions} import org.apache.spark.sql.catalyst.optimizer.CombineUnions import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.catalyst.plans._ @@ -2678,10 +2678,12 @@ class Dataset[T] private[sql]( */ def toJSON: Dataset[String] = { val rowSchema = this.schema + val sessionLocalTimeZone = 
sparkSession.sessionState.conf.sessionLocalTimeZone val rdd: RDD[String] = queryExecution.toRdd.mapPartitions { iter => val writer = new CharArrayWriter() // create the Generator without separator inserted between 2 records - val gen = new JacksonGenerator(rowSchema, writer) + val gen = new JacksonGenerator(rowSchema, writer, + new JSONOptions(Map.empty[String, String], sessionLocalTimeZone)) new Iterator[String] { override def hasNext: Boolean = iter.hasNext http://git-wip-us.apache.org/repos/asf/spark/blob/865b2fd8/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala index 1d2bf07..566f40f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala @@ -29,7 +29,7 @@ import org.apache.spark.TaskContext import org.apache.spark.internal.Logging import org.apache.spark.sql.{Dataset, Encoders, SparkSession} import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.util.CompressionCodecs +import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CompressionCodecs} import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.text.TextFileFormat import org.apache.spark.sql.sources._ @@ -55,7 +55,7 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister { files: Seq[FileStatus]): Option[StructType] = { require(files.nonEmpty, "Cannot infer schema from an empty set of files") - val csvOptions = new CSVOptions(options) + val csvOptions = new CSVOptions(options, sparkSession.sessionState.conf.sessionLocalTimeZone) val paths = 
files.map(_.getPath.toString) val lines: Dataset[String] = createBaseDataset(sparkSession, csvOptions, paths) val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis @@ -69,7 +69,7 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister { dataSchema: StructType): OutputWriterFactory = { CSVUtils.verifySchema(dataSchema) val conf = job.getConfiguration - val csvOptions = new CSVOptions(options) + val csvOptions = new CSVOptions(options, sparkSession.sessionState.conf.sessionLocalTimeZone) csvOptions.compressionCodec.foreach { codec => CompressionCodecs.setCodecConfiguration(conf, codec) } @@ -96,7 +96,7 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister { filters: Seq[Filter], options: Map[String, String], hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = { - val csvOptions = new CSVOptions(options) + val csvOptions = new CSVOptions(options, sparkSession.sessionState.conf.sessionLocalTimeZone) val broadcastedHadoopConf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) http://git-wip-us.apache.org/repos/asf/spark/blob/865b2fd8/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala index 9d79ea6..b7fbaa4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.execution.datasources.csv import java.nio.charset.StandardCharsets -import java.util.Locale +import java.util.{Locale, TimeZone} import com.univocity.parsers.csv.{CsvParserSettings, CsvWriterSettings, UnescapedQuoteHandling} 
 import org.apache.commons.lang3.time.FastDateFormat
@@ -26,10 +26,12 @@ import org.apache.commons.lang3.time.FastDateFormat
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CompressionCodecs, ParseModes}
-private[csv] class CSVOptions(@transient private val parameters: CaseInsensitiveMap[String])
+private[csv] class CSVOptions(
+    @transient private val parameters: CaseInsensitiveMap[String], defaultTimeZoneId: String)
   extends Logging with Serializable {
-  def this(parameters: Map[String, String]) = this(CaseInsensitiveMap(parameters))
+  def this(parameters: Map[String, String], defaultTimeZoneId: String) =
+    this(CaseInsensitiveMap(parameters), defaultTimeZoneId)
   private def getChar(paramName: String, default: Char): Char = {
     val paramValue = parameters.get(paramName)
@@ -106,13 +108,15 @@ private[csv] class CSVOptions(@transient private val parameters: CaseInsensitive
     name.map(CompressionCodecs.getCodecClassName)
   }
+  val timeZone: TimeZone = TimeZone.getTimeZone(parameters.getOrElse("timeZone", defaultTimeZoneId))
+
   // Uses `FastDateFormat` which can be direct replacement for `SimpleDateFormat` and thread-safe.
   val dateFormat: FastDateFormat =
     FastDateFormat.getInstance(parameters.getOrElse("dateFormat", "yyyy-MM-dd"), Locale.US)
   val timestampFormat: FastDateFormat =
     FastDateFormat.getInstance(
-      parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSZZ"), Locale.US)
+      parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSZZ"), timeZone, Locale.US)
   val maxColumns = getInt("maxColumns", 20480)
@@ -161,12 +165,3 @@ private[csv] class CSVOptions(@transient private val parameters: CaseInsensitive
     settings
   }
 }
-
-object CSVOptions {
-
-  def apply(): CSVOptions = new CSVOptions(CaseInsensitiveMap(Map.empty))
-
-  def apply(paramName: String, paramValue: String): CSVOptions = {
-    new CSVOptions(Map(paramName -> paramValue))
-  }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/865b2fd8/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityGenerator.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityGenerator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityGenerator.scala
index ee79138..4082a0d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityGenerator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityGenerator.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.types._
 private[csv] class UnivocityGenerator(
     schema: StructType,
     writer: Writer,
-    options: CSVOptions = new CSVOptions(Map.empty[String, String])) {
+    options: CSVOptions) {
   private val writerSettings = options.asWriterSettings
   writerSettings.setHeaders(schema.fieldNames: _*)
   private val gen = new CsvWriter(writer, writerSettings)
http://git-wip-us.apache.org/repos/asf/spark/blob/865b2fd8/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
index 3b42aa6..2e409b3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
@@ -76,7 +76,7 @@ private[csv] class UnivocityParser(
       name: String,
       dataType: DataType,
       nullable: Boolean = true,
-      options: CSVOptions = CSVOptions()): ValueConverter = dataType match {
+      options: CSVOptions): ValueConverter = dataType match {
     case _: ByteType => (d: String) =>
       nullSafeDatum(d, name, nullable, options)(_.toByte)
http://git-wip-us.apache.org/repos/asf/spark/blob/865b2fd8/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
index 98ab9d2..b4a8ff2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
@@ -47,7 +47,8 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
     if (files.isEmpty) {
       None
     } else {
-      val parsedOptions: JSONOptions = new JSONOptions(options)
+      val parsedOptions: JSONOptions =
+        new JSONOptions(options, sparkSession.sessionState.conf.sessionLocalTimeZone)
       val columnNameOfCorruptRecord =
         parsedOptions.columnNameOfCorruptRecord
           .getOrElse(sparkSession.sessionState.conf.columnNameOfCorruptRecord)
@@ -67,7 +68,8 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
       options: Map[String, String],
       dataSchema: StructType): OutputWriterFactory = {
     val conf = job.getConfiguration
-    val parsedOptions: JSONOptions = new JSONOptions(options)
+    val parsedOptions: JSONOptions =
+      new JSONOptions(options, sparkSession.sessionState.conf.sessionLocalTimeZone)
     parsedOptions.compressionCodec.foreach { codec =>
       CompressionCodecs.setCodecConfiguration(conf, codec)
     }
@@ -97,7 +99,8 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
     val broadcastedHadoopConf =
       sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
-    val parsedOptions: JSONOptions = new JSONOptions(options)
+    val parsedOptions: JSONOptions =
+      new JSONOptions(options, sparkSession.sessionState.conf.sessionLocalTimeZone)
     val columnNameOfCorruptRecord =
       parsedOptions.columnNameOfCorruptRecord
         .getOrElse(sparkSession.sessionState.conf.columnNameOfCorruptRecord)
http://git-wip-us.apache.org/repos/asf/spark/blob/865b2fd8/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index b7ffb3c..4e706da 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -181,6 +181,8 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
    * <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
    * indicates a timestamp format. Custom date formats follow the formats at
    * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
+   * <li>`timeZone` (default session local timezone): sets the string that indicates a timezone
+   * to be used to parse timestamps.</li>
    * </ul>
    *
    * @since 2.0.0
@@ -230,6 +232,8 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
    * <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
    * indicates a timestamp format. Custom date formats follow the formats at
    * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
+   * <li>`timeZone` (default session local timezone): sets the string that indicates a timezone
+   * to be used to parse timestamps.</li>
    * <li>`maxColumns` (default `20480`): defines a hard limit of how many columns
    * a record can have.</li>
    * <li>`maxCharsPerColumn` (default `-1`): defines the maximum number of characters allowed
http://git-wip-us.apache.org/repos/asf/spark/blob/865b2fd8/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala
index d8c6c25..6617420 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.types._
 class CSVInferSchemaSuite extends SparkFunSuite {
   test("String fields types are inferred correctly from null types") {
-    val options = new CSVOptions(Map.empty[String, String])
+    val options = new CSVOptions(Map.empty[String, String], "GMT")
     assert(CSVInferSchema.inferField(NullType, "", options) == NullType)
     assert(CSVInferSchema.inferField(NullType, null, options) == NullType)
     assert(CSVInferSchema.inferField(NullType, "100000000000", options) == LongType)
@@ -41,7 +41,7 @@ class CSVInferSchemaSuite extends SparkFunSuite {
   }
   test("String fields types are inferred correctly from other types") {
-    val options = new CSVOptions(Map.empty[String, String])
+    val options = new CSVOptions(Map.empty[String, String], "GMT")
     assert(CSVInferSchema.inferField(LongType, "1.0", options) == DoubleType)
     assert(CSVInferSchema.inferField(LongType, "test", options) == StringType)
     assert(CSVInferSchema.inferField(IntegerType, "1.0", options) == DoubleType)
@@ -60,21 +60,21 @@ class CSVInferSchemaSuite extends SparkFunSuite {
   }
   test("Timestamp field types are inferred correctly via custom data format") {
-    var options = new CSVOptions(Map("timestampFormat" -> "yyyy-mm"))
+    var options = new CSVOptions(Map("timestampFormat" -> "yyyy-mm"), "GMT")
     assert(CSVInferSchema.inferField(TimestampType, "2015-08", options) == TimestampType)
-    options = new CSVOptions(Map("timestampFormat" -> "yyyy"))
+    options = new CSVOptions(Map("timestampFormat" -> "yyyy"), "GMT")
     assert(CSVInferSchema.inferField(TimestampType, "2015", options) == TimestampType)
   }
   test("Timestamp field types are inferred correctly from other types") {
-    val options = new CSVOptions(Map.empty[String, String])
+    val options = new CSVOptions(Map.empty[String, String], "GMT")
     assert(CSVInferSchema.inferField(IntegerType, "2015-08-20 14", options) == StringType)
     assert(CSVInferSchema.inferField(DoubleType, "2015-08-20 14:10", options) == StringType)
     assert(CSVInferSchema.inferField(LongType, "2015-08 14:49:00", options) == StringType)
   }
   test("Boolean fields types are inferred correctly from other types") {
-    val options = new CSVOptions(Map.empty[String, String])
+    val options = new CSVOptions(Map.empty[String, String], "GMT")
     assert(CSVInferSchema.inferField(LongType, "Fale", options) == StringType)
     assert(CSVInferSchema.inferField(DoubleType, "TRUEe", options) == StringType)
   }
@@ -92,12 +92,12 @@ class CSVInferSchemaSuite extends SparkFunSuite {
   }
   test("Null fields are handled properly when a nullValue is specified") {
-    var options = new CSVOptions(Map("nullValue" -> "null"))
+    var options = new CSVOptions(Map("nullValue" -> "null"), "GMT")
     assert(CSVInferSchema.inferField(NullType, "null", options) == NullType)
     assert(CSVInferSchema.inferField(StringType, "null", options) == StringType)
     assert(CSVInferSchema.inferField(LongType, "null", options) == LongType)
-    options = new CSVOptions(Map("nullValue" -> "\\N"))
+    options = new CSVOptions(Map("nullValue" -> "\\N"), "GMT")
     assert(CSVInferSchema.inferField(IntegerType, "\\N", options) == IntegerType)
     assert(CSVInferSchema.inferField(DoubleType, "\\N", options) == DoubleType)
     assert(CSVInferSchema.inferField(TimestampType, "\\N", options) == TimestampType)
@@ -111,12 +111,12 @@ class CSVInferSchemaSuite extends SparkFunSuite {
   }
   test("SPARK-18433: Improve DataSource option keys to be more case-insensitive") {
-    val options = new CSVOptions(Map("TiMeStampFormat" -> "yyyy-mm"))
+    val options = new CSVOptions(Map("TiMeStampFormat" -> "yyyy-mm"), "GMT")
     assert(CSVInferSchema.inferField(TimestampType, "2015-08", options) == TimestampType)
   }
   test("SPARK-18877: `inferField` on DecimalType should find a common type with `typeSoFar`") {
-    val options = new CSVOptions(Map.empty[String, String])
+    val options = new CSVOptions(Map.empty[String, String], "GMT")
     // 9.03E+12 is Decimal(3, -10) and 1.19E+11 is Decimal(3, -9).
     assert(CSVInferSchema.inferField(DecimalType(3, -10), "1.19E+11", options) ==
@@ -134,7 +134,7 @@ class CSVInferSchemaSuite extends SparkFunSuite {
   test("DoubleType should be infered when user defined nan/inf are provided") {
     val options = new CSVOptions(Map("nanValue" -> "nan", "negativeInf" -> "-inf",
-      "positiveInf" -> "inf"))
+      "positiveInf" -> "inf"), "GMT")
     assert(CSVInferSchema.inferField(NullType, "nan", options) == DoubleType)
     assert(CSVInferSchema.inferField(NullType, "inf", options) == DoubleType)
     assert(CSVInferSchema.inferField(NullType, "-inf", options) == DoubleType)
http://git-wip-us.apache.org/repos/asf/spark/blob/865b2fd8/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index df9cebb..0c9a729 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -839,7 +839,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
     }
   }
-  test("Write timestamps correctly with dateFormat option") {
+  test("Write timestamps correctly with timestampFormat option") {
     withTempDir { dir =>
       // With dateFormat option.
       val timestampsWithFormatPath = s"${dir.getCanonicalPath}/timestampsWithFormat.csv"
@@ -870,6 +870,48 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
     }
   }
+  test("Write timestamps correctly with timestampFormat option and timeZone option") {
+    withTempDir { dir =>
+      // With dateFormat option and timeZone option.
+      val timestampsWithFormatPath = s"${dir.getCanonicalPath}/timestampsWithFormat.csv"
+      val timestampsWithFormat = spark.read
+        .format("csv")
+        .option("header", "true")
+        .option("inferSchema", "true")
+        .option("timestampFormat", "dd/MM/yyyy HH:mm")
+        .load(testFile(datesFile))
+      timestampsWithFormat.write
+        .format("csv")
+        .option("header", "true")
+        .option("timestampFormat", "yyyy/MM/dd HH:mm")
+        .option("timeZone", "GMT")
+        .save(timestampsWithFormatPath)
+
+      // This will load back the timestamps as string.
+      val stringTimestampsWithFormat = spark.read
+        .format("csv")
+        .option("header", "true")
+        .option("inferSchema", "false")
+        .load(timestampsWithFormatPath)
+      val expectedStringTimestampsWithFormat = Seq(
+        Row("2015/08/27 01:00"),
+        Row("2014/10/28 01:30"),
+        Row("2016/01/29 04:00"))
+
+      checkAnswer(stringTimestampsWithFormat, expectedStringTimestampsWithFormat)
+
+      val readBack = spark.read
+        .format("csv")
+        .option("header", "true")
+        .option("inferSchema", "true")
+        .option("timestampFormat", "yyyy/MM/dd HH:mm")
+        .option("timeZone", "GMT")
+        .load(timestampsWithFormatPath)
+
+      checkAnswer(readBack, timestampsWithFormat)
+    }
+  }
+
   test("load duplicated field names consistently with null or empty strings - case sensitive") {
     withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
       withTempPath { path =>
http://git-wip-us.apache.org/repos/asf/spark/blob/865b2fd8/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParserSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParserSuite.scala
index 62dae08..a74b22a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParserSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParserSuite.scala
@@ -27,7 +27,7 @@ import org.apache.spark.unsafe.types.UTF8String
 class UnivocityParserSuite extends SparkFunSuite {
   private val parser =
-    new UnivocityParser(StructType(Seq.empty), new CSVOptions(Map.empty[String, String]))
+    new UnivocityParser(StructType(Seq.empty), new CSVOptions(Map.empty[String, String], "GMT"))
   private def assertNull(v: Any) = assert(v == null)
@@ -38,7 +38,8 @@ class UnivocityParserSuite extends SparkFunSuite {
     stringValues.zip(decimalValues).foreach { case (strVal, decimalVal) =>
       val decimalValue = new BigDecimal(decimalVal.toString)
-      assert(parser.makeConverter("_1", decimalType).apply(strVal) ===
+      val options = new CSVOptions(Map.empty[String, String], "GMT")
+      assert(parser.makeConverter("_1", decimalType, options = options).apply(strVal) ===
         Decimal(decimalValue, decimalType.precision, decimalType.scale))
     }
   }
@@ -50,20 +51,23 @@ class UnivocityParserSuite extends SparkFunSuite {
     // Nullable field with nullValue option.
     types.foreach { t =>
       // Tests that a custom nullValue.
+      val nullValueOptions = new CSVOptions(Map("nullValue" -> "-"), "GMT")
       val converter =
-        parser.makeConverter("_1", t, nullable = true, CSVOptions("nullValue", "-"))
+        parser.makeConverter("_1", t, nullable = true, options = nullValueOptions)
       assertNull(converter.apply("-"))
       assertNull(converter.apply(null))
       // Tests that the default nullValue is empty string.
-      assertNull(parser.makeConverter("_1", t, nullable = true).apply(""))
+      val options = new CSVOptions(Map.empty[String, String], "GMT")
+      assertNull(parser.makeConverter("_1", t, nullable = true, options = options).apply(""))
     }
     // Not nullable field with nullValue option.
     types.foreach { t =>
       // Casts a null to not nullable field should throw an exception.
+      val options = new CSVOptions(Map("nullValue" -> "-"), "GMT")
       val converter =
-        parser.makeConverter("_1", t, nullable = false, CSVOptions("nullValue", "-"))
+        parser.makeConverter("_1", t, nullable = false, options = options)
       var message = intercept[RuntimeException] {
         converter.apply("-")
       }.getMessage
@@ -77,48 +81,52 @@ class UnivocityParserSuite extends SparkFunSuite {
     // If nullValue is different with empty string, then, empty string should not be casted into
     // null.
     Seq(true, false).foreach { b =>
+      val options = new CSVOptions(Map("nullValue" -> "null"), "GMT")
       val converter =
-        parser.makeConverter("_1", StringType, nullable = b, CSVOptions("nullValue", "null"))
+        parser.makeConverter("_1", StringType, nullable = b, options = options)
       assert(converter.apply("") == UTF8String.fromString(""))
     }
   }
   test("Throws exception for empty string with non null type") {
+    val options = new CSVOptions(Map.empty[String, String], "GMT")
     val exception = intercept[RuntimeException]{
-      parser.makeConverter("_1", IntegerType, nullable = false, CSVOptions()).apply("")
+      parser.makeConverter("_1", IntegerType, nullable = false, options = options).apply("")
     }
     assert(exception.getMessage.contains("null value found but field _1 is not nullable."))
   }
   test("Types are cast correctly") {
-    assert(parser.makeConverter("_1", ByteType).apply("10") == 10)
-    assert(parser.makeConverter("_1", ShortType).apply("10") == 10)
-    assert(parser.makeConverter("_1", IntegerType).apply("10") == 10)
-    assert(parser.makeConverter("_1", LongType).apply("10") == 10)
-    assert(parser.makeConverter("_1", FloatType).apply("1.00") == 1.0)
-    assert(parser.makeConverter("_1", DoubleType).apply("1.00") == 1.0)
-    assert(parser.makeConverter("_1", BooleanType).apply("true") == true)
-
-    val timestampsOptions = CSVOptions("timestampFormat", "dd/MM/yyyy hh:mm")
+    val options = new CSVOptions(Map.empty[String, String], "GMT")
+    assert(parser.makeConverter("_1", ByteType, options = options).apply("10") == 10)
+    assert(parser.makeConverter("_1", ShortType, options = options).apply("10") == 10)
+    assert(parser.makeConverter("_1", IntegerType, options = options).apply("10") == 10)
+    assert(parser.makeConverter("_1", LongType, options = options).apply("10") == 10)
+    assert(parser.makeConverter("_1", FloatType, options = options).apply("1.00") == 1.0)
+    assert(parser.makeConverter("_1", DoubleType, options = options).apply("1.00") == 1.0)
+    assert(parser.makeConverter("_1", BooleanType, options = options).apply("true") == true)
+
+    val timestampsOptions =
+      new CSVOptions(Map("timestampFormat" -> "dd/MM/yyyy hh:mm"), "GMT")
     val customTimestamp = "31/01/2015 00:00"
     val expectedTime = timestampsOptions.timestampFormat.parse(customTimestamp).getTime
     val castedTimestamp =
-      parser.makeConverter("_1", TimestampType, nullable = true, timestampsOptions)
+      parser.makeConverter("_1", TimestampType, nullable = true, options = timestampsOptions)
         .apply(customTimestamp)
     assert(castedTimestamp == expectedTime * 1000L)
     val customDate = "31/01/2015"
-    val dateOptions = CSVOptions("dateFormat", "dd/MM/yyyy")
+    val dateOptions = new CSVOptions(Map("dateFormat" -> "dd/MM/yyyy"), "GMT")
     val expectedDate = dateOptions.dateFormat.parse(customDate).getTime
     val castedDate =
-      parser.makeConverter("_1", DateType, nullable = true, dateOptions)
+      parser.makeConverter("_1", DateType, nullable = true, options = dateOptions)
         .apply(customTimestamp)
     assert(castedDate == DateTimeUtils.millisToDays(expectedDate))
     val timestamp = "2015-01-01 00:00:00"
-    assert(parser.makeConverter("_1", TimestampType).apply(timestamp) ==
+    assert(parser.makeConverter("_1", TimestampType, options = options).apply(timestamp) ==
       DateTimeUtils.stringToTime(timestamp).getTime * 1000L)
-    assert(parser.makeConverter("_1", DateType).apply("2015-01-01") ==
+    assert(parser.makeConverter("_1", DateType, options = options).apply("2015-01-01") ==
       DateTimeUtils.millisToDays(DateTimeUtils.stringToTime("2015-01-01").getTime))
   }
@@ -127,16 +135,18 @@ class UnivocityParserSuite extends SparkFunSuite {
     try {
       Locale.setDefault(new Locale("fr", "FR"))
       // Would parse as 1.0 in fr-FR
-      assert(parser.makeConverter("_1", FloatType).apply("1,00") == 100.0)
-      assert(parser.makeConverter("_1", DoubleType).apply("1,00") == 100.0)
+      val options = new CSVOptions(Map.empty[String, String], "GMT")
+      assert(parser.makeConverter("_1", FloatType, options = options).apply("1,00") == 100.0)
+      assert(parser.makeConverter("_1", DoubleType, options = options).apply("1,00") == 100.0)
     } finally {
       Locale.setDefault(originalLocale)
     }
   }
   test("Float NaN values are parsed correctly") {
+    val options = new CSVOptions(Map("nanValue" -> "nn"), "GMT")
     val floatVal: Float = parser.makeConverter(
-      "_1", FloatType, nullable = true, CSVOptions("nanValue", "nn")
+      "_1", FloatType, nullable = true, options = options
     ).apply("nn").asInstanceOf[Float]
     // Java implements the IEEE-754 floating point standard which guarantees that any comparison
@@ -145,36 +155,41 @@ class UnivocityParserSuite extends SparkFunSuite {
   }
   test("Double NaN values are parsed correctly") {
+    val options = new CSVOptions(Map("nanValue" -> "-"), "GMT")
     val doubleVal: Double = parser.makeConverter(
-      "_1", DoubleType, nullable = true, CSVOptions("nanValue", "-")
+      "_1", DoubleType, nullable = true, options = options
     ).apply("-").asInstanceOf[Double]
     assert(doubleVal.isNaN)
   }
   test("Float infinite values can be parsed") {
+    val negativeInfOptions = new CSVOptions(Map("negativeInf" -> "max"), "GMT")
     val floatVal1 = parser.makeConverter(
-      "_1", FloatType, nullable = true, CSVOptions("negativeInf", "max")
+      "_1", FloatType, nullable = true, options = negativeInfOptions
     ).apply("max").asInstanceOf[Float]
     assert(floatVal1 == Float.NegativeInfinity)
+    val positiveInfOptions = new CSVOptions(Map("positiveInf" -> "max"), "GMT")
     val floatVal2 = parser.makeConverter(
-      "_1", FloatType, nullable = true, CSVOptions("positiveInf", "max")
+      "_1", FloatType, nullable = true, options = positiveInfOptions
     ).apply("max").asInstanceOf[Float]
     assert(floatVal2 == Float.PositiveInfinity)
   }
   test("Double infinite values can be parsed") {
+    val negativeInfOptions = new CSVOptions(Map("negativeInf" -> "max"), "GMT")
     val doubleVal1 = parser.makeConverter(
-      "_1", DoubleType, nullable = true, CSVOptions("negativeInf", "max")
+      "_1", DoubleType, nullable = true, options = negativeInfOptions
     ).apply("max").asInstanceOf[Double]
     assert(doubleVal1 == Double.NegativeInfinity)
+    val positiveInfOptions = new CSVOptions(Map("positiveInf" -> "max"), "GMT")
     val doubleVal2 = parser.makeConverter(
-      "_1", DoubleType, nullable = true, CSVOptions("positiveInf", "max")
+      "_1", DoubleType, nullable = true, options = positiveInfOptions
     ).apply("max").asInstanceOf[Double]
     assert(doubleVal2 == Double.PositiveInfinity)
http://git-wip-us.apache.org/repos/asf/spark/blob/865b2fd8/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 156fd96..9344aed 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -62,7 +62,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     generator.flush()
   }
-  val dummyOption = new JSONOptions(Map.empty[String, String])
+  val dummyOption = new JSONOptions(Map.empty[String, String], "GMT")
   val dummySchema = StructType(Seq.empty)
   val parser = new JacksonParser(dummySchema, "", dummyOption)
@@ -1366,7 +1366,8 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
   test("SPARK-6245 JsonRDD.inferSchema on empty RDD") {
     // This is really a test that it doesn't throw an exception
-    val emptySchema = JsonInferSchema.infer(empty, "", new JSONOptions(Map.empty[String, String]))
+    val emptySchema = JsonInferSchema.infer(
+      empty, "", new JSONOptions(Map.empty[String, String], "GMT"))
     assert(StructType(Seq()) === emptySchema)
   }
@@ -1391,7 +1392,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
   test("SPARK-8093 Erase empty structs") {
     val emptySchema = JsonInferSchema.infer(
-      emptyRecords, "", new JSONOptions(Map.empty[String, String]))
+      emptyRecords, "", new JSONOptions(Map.empty[String, String], "GMT"))
     assert(StructType(Seq()) === emptySchema)
   }
@@ -1723,7 +1724,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     }
   }
-  test("Write timestamps correctly with dateFormat option") {
+  test("Write timestamps correctly with timestampFormat option") {
     val customSchema = new StructType(Array(StructField("date", TimestampType, true)))
     withTempDir { dir =>
       // With dateFormat option.
@@ -1751,6 +1752,43 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     }
   }
+  test("Write timestamps correctly with timestampFormat option and timeZone option") {
+    val customSchema = new StructType(Array(StructField("date", TimestampType, true)))
+    withTempDir { dir =>
+      // With dateFormat option and timeZone option.
+      val timestampsWithFormatPath = s"${dir.getCanonicalPath}/timestampsWithFormat.json"
+      val timestampsWithFormat = spark.read
+        .schema(customSchema)
+        .option("timestampFormat", "dd/MM/yyyy HH:mm")
+        .json(datesRecords)
+      timestampsWithFormat.write
+        .format("json")
+        .option("timestampFormat", "yyyy/MM/dd HH:mm")
+        .option("timeZone", "GMT")
+        .save(timestampsWithFormatPath)
+
+      // This will load back the timestamps as string.
+      val stringSchema = StructType(StructField("date", StringType, true) :: Nil)
+      val stringTimestampsWithFormat = spark.read
+        .schema(stringSchema)
+        .json(timestampsWithFormatPath)
+      val expectedStringDatesWithFormat = Seq(
+        Row("2015/08/27 01:00"),
+        Row("2014/10/28 01:30"),
+        Row("2016/01/29 04:00"))
+
+      checkAnswer(stringTimestampsWithFormat, expectedStringDatesWithFormat)
+
+      val readBack = spark.read
+        .schema(customSchema)
+        .option("timestampFormat", "yyyy/MM/dd HH:mm")
+        .option("timeZone", "GMT")
+        .json(timestampsWithFormatPath)
+
+      checkAnswer(readBack, timestampsWithFormat)
+    }
+  }
+
   test("SPARK-18433: Improve DataSource option keys to be more case-insensitive") {
     val records = sparkContext
       .parallelize("""{"a": 3, "b": 1.1}""" :: """{"a": 3.1, "b": 0.000001}""" :: Nil)

http://git-wip-us.apache.org/repos/asf/spark/blob/865b2fd8/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
index 76ffb94..9b5e364e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
@@ -19,11 +19,15 @@ package org.apache.spark.sql.sources

 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.datasources.DataSource

 class ResolvedDataSourceSuite extends SparkFunSuite {
   private def getProvidingClass(name: String): Class[_] =
-    DataSource(sparkSession = null, className = name).providingClass
+    DataSource(
+      sparkSession = null,
+      className = name,
+      options = Map("timeZone" -> DateTimeUtils.defaultTimeZone().getID)).providingClass

   test("jdbc") {
     assert(
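The new JsonSuite test hinges on one distinction: a `timestampFormat` that includes an offset field produces values that parse back to the same instant whatever `timeZone` is set, while a zoneless pattern such as `yyyy/MM/dd HH:mm` only round-trips when the writer and the reader use the same `timeZone`. The sketch below illustrates this with plain `java.text.SimpleDateFormat`, used here as a stand-in for the datasources' own formatter (an assumption; the formatting code itself is not part of this excerpt). `TimeZoneOptionSketch` and `formatter` are hypothetical names, and the offset pattern `yyyy-MM-dd'T'HH:mm:ss.SSSXXX` is assumed because it reproduces the JSON output quoted in the PR description.

```scala
import java.text.SimpleDateFormat
import java.util.{Date, TimeZone}

// Hypothetical helper object; SimpleDateFormat stands in for whatever formatter
// the CSV/JSON datasources use internally.
object TimeZoneOptionSketch {
  // Pattern with an ISO 8601 offset (XXX); assumed to match the JSON output in the PR description.
  val WithOffset = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"
  // Zoneless pattern used by the new "timestampFormat option and timeZone option" test.
  val WithoutOffset = "yyyy/MM/dd HH:mm"

  // Formats and parses wall-clock fields in `zoneId`, playing the role of the `timeZone` option.
  def formatter(pattern: String, zoneId: String): SimpleDateFormat = {
    val f = new SimpleDateFormat(pattern)
    f.setTimeZone(TimeZone.getTimeZone(zoneId))
    f
  }

  def main(args: Array[String]): Unit = {
    val instant = new Date(1451606400000L) // 2016-01-01 00:00:00 GMT

    // Writing: the zone only changes how the same instant is rendered.
    // "PST" is a legacy three-letter ID that java.util.TimeZone resolves to Los Angeles rules.
    val gmtText = formatter(WithOffset, "GMT").format(instant)
    val pstText = formatter(WithOffset, "PST").format(instant)
    println(gmtText) // 2016-01-01T00:00:00.000Z
    println(pstText) // 2015-12-31T16:00:00.000-08:00

    // Reading offset-bearing text: the offset in the value wins over the formatter's zone.
    val fromGmtText = formatter(WithOffset, "PST").parse(gmtText).getTime
    println(fromGmtText == instant.getTime) // true

    // Reading zoneless text: the formatter's zone decides which instant is produced,
    // so the writer and the reader must agree on the zone to round-trip.
    val text = "2015/08/27 01:00"
    val asGmt = formatter(WithoutOffset, "GMT").parse(text).getTime
    val asLa = formatter(WithoutOffset, "America/Los_Angeles").parse(text).getTime
    println((asLa - asGmt) / 3600000L) // 7 (Los Angeles is on daylight time, UTC-7, in August)
    println(formatter(WithoutOffset, "GMT").format(new Date(asGmt)) == text) // true
  }
}
```

Running `TimeZoneOptionSketch.main` prints both renderings of one instant and then shows that only the zoneless pattern is sensitive to the zone used for reading, which is consistent with the test passing `.option("timeZone", "GMT")` to both the writer and the reader.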