Repository: spark
Updated Branches:
  refs/heads/master ba186a841 -> 2a7921a81
[SPARK-18939][SQL] Timezone support in partition values.

## What changes were proposed in this pull request?

This is a follow-up PR of #16308 and #16750. This PR enables timezone support in partition values.

We should use the `timeZone` option introduced in #16750 to parse/format partition values of the `TimestampType`.

For example, if you have a timestamp `"2016-01-01 00:00:00"` in `GMT` to be used as a partition value, the value written under the default timezone option (here `"GMT"`, since the option defaults to the session local timezone) is:

```scala
scala> spark.conf.set("spark.sql.session.timeZone", "GMT")

scala> val df = Seq((1, new java.sql.Timestamp(1451606400000L))).toDF("i", "ts")
df: org.apache.spark.sql.DataFrame = [i: int, ts: timestamp]

scala> df.show()
+---+-------------------+
|  i|                 ts|
+---+-------------------+
|  1|2016-01-01 00:00:00|
+---+-------------------+

scala> df.write.partitionBy("ts").save("/path/to/gmtpartition")
```

```sh
$ ls /path/to/gmtpartition/
_SUCCESS  ts=2016-01-01 00%3A00%3A00
```

whereas with the option set to `"PST"` it is:

```scala
scala> df.write.option("timeZone", "PST").partitionBy("ts").save("/path/to/pstpartition")
```

```sh
$ ls /path/to/pstpartition/
_SUCCESS  ts=2015-12-31 16%3A00%3A00
```

We can properly read the partition values back if the session local timezone and the timezone of the partition values are the same:

```scala
scala> spark.read.load("/path/to/gmtpartition").show()
+---+-------------------+
|  i|                 ts|
+---+-------------------+
|  1|2016-01-01 00:00:00|
+---+-------------------+
```

And even if the timezones are different, we can properly read the values by setting the correct timezone option:

```scala
// wrong result
scala> spark.read.load("/path/to/pstpartition").show()
+---+-------------------+
|  i|                 ts|
+---+-------------------+
|  1|2015-12-31 16:00:00|
+---+-------------------+

// correct result
scala> spark.read.option("timeZone", "PST").load("/path/to/pstpartition").show()
+---+-------------------+
|  i|                 ts|
+---+-------------------+
|  1|2016-01-01 00:00:00|
+---+-------------------+
```

## How was this patch tested?

Existing tests and some newly added tests.

Author: Takuya UESHIN <ues...@happy-camper.st>

Closes #17053 from ueshin/issues/SPARK-18939.
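As a side note on the example above (not part of the patch): `1451606400000L` is an absolute instant in epoch milliseconds, and only its string rendering depends on the timezone, which is why the same DataFrame yields `ts=2016-01-01 00%3A00%3A00` under GMT but `ts=2015-12-31 16%3A00%3A00` under PST. A minimal plain-JDK sketch of that arithmetic:

```scala
import java.text.SimpleDateFormat
import java.util.{Date, TimeZone}

// The epoch-millis literal used to build the example DataFrame.
val instant = new Date(1451606400000L)

val fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
fmt.setTimeZone(TimeZone.getTimeZone("GMT"))
println(fmt.format(instant)) // 2016-01-01 00:00:00

fmt.setTimeZone(TimeZone.getTimeZone("PST")) // PST = GMT-8 in winter
println(fmt.format(instant)) // 2015-12-31 16:00:00
```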
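On the read side, the mechanism the patch uses is to thread a timezone ID into the `Cast` that converts the partition-directory string back to the column type (see `CatalogTablePartition.toRow` in the diff below). A rough standalone sketch of that conversion, assuming Spark's catalyst module is on the classpath; this is an illustration, not an excerpt of the patch:

```scala
import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
import org.apache.spark.sql.types.TimestampType

// Partition values are stored as strings; casting with an explicit timezone
// decides which instant the wall-clock string denotes. The result is Spark's
// internal timestamp representation: microseconds since the epoch (Long).
val raw = "2016-01-01 00:00:00"
val inGmt = Cast(Literal(raw), TimestampType, Option("GMT")).eval() // 1451606400000000L
val inPst = Cast(Literal(raw), TimestampType, Option("PST")).eval() // 1451635200000000L (8h later)
```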
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2a7921a8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2a7921a8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2a7921a8

Branch: refs/heads/master
Commit: 2a7921a813ecd847fd933ffef10edc64684e9df7
Parents: ba186a8
Author: Takuya UESHIN <ues...@happy-camper.st>
Authored: Fri Mar 3 16:35:54 2017 -0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Fri Mar 3 16:35:54 2017 -0800

----------------------------------------------------------------------
 .../sql/catalyst/catalog/ExternalCatalog.scala  |  4 +-
 .../sql/catalyst/catalog/InMemoryCatalog.scala  |  3 +-
 .../sql/catalyst/catalog/SessionCatalog.scala   |  2 +-
 .../spark/sql/catalyst/catalog/interface.scala  | 10 ++--
 .../execution/OptimizeMetadataOnlyQuery.scala   | 10 ++--
 .../datasources/CatalogFileIndex.scala          |  3 +-
 .../datasources/FileFormatWriter.scala          | 18 +++---
 .../PartitioningAwareFileIndex.scala            | 16 +++--
 .../datasources/PartitioningUtils.scala         | 42 +++++++++----
 .../execution/datasources/csv/CSVSuite.scala    | 15 +++--
 .../ParquetPartitionDiscoverySuite.scala        | 62 +++++++++++++++-----
 .../sql/sources/PartitionedWriteSuite.scala     | 35 +++++++++++
 .../spark/sql/hive/HiveExternalCatalog.scala    |  9 ++-
 .../sql/hive/execution/HiveTableScanExec.scala  |  3 +-
 .../sql/hive/HiveExternalCatalogSuite.scala     |  2 +-
 15 files changed, 175 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2a7921a8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
index a3a4ab3..31eded4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
@@ -244,11 +244,13 @@ abstract class ExternalCatalog {
    * @param db database name
    * @param table table name
    * @param predicates partition-pruning predicates
+   * @param defaultTimeZoneId default timezone id to parse partition values of TimestampType
    */
   def listPartitionsByFilter(
       db: String,
       table: String,
-      predicates: Seq[Expression]): Seq[CatalogTablePartition]
+      predicates: Seq[Expression],
+      defaultTimeZoneId: String): Seq[CatalogTablePartition]

   // --------------------------------------------------------------------------
   // Functions

http://git-wip-us.apache.org/repos/asf/spark/blob/2a7921a8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index 6bb2b2d..340e845 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -544,7 +544,8 @@ class InMemoryCatalog(
   override def listPartitionsByFilter(
       db: String,
       table: String,
-      predicates: Seq[Expression]): Seq[CatalogTablePartition] = {
+      predicates: Seq[Expression],
+      defaultTimeZoneId: String): Seq[CatalogTablePartition] = {
     // TODO: Provide an implementation
     throw new UnsupportedOperationException(
       "listPartitionsByFilter is not implemented for InMemoryCatalog")

http://git-wip-us.apache.org/repos/asf/spark/blob/2a7921a8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 0673489..f6412e4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -841,7 +841,7 @@ class SessionCatalog(
     val table = formatTableName(tableName.table)
     requireDbExists(db)
     requireTableExists(TableIdentifier(table, Option(db)))
-    externalCatalog.listPartitionsByFilter(db, table, predicates)
+    externalCatalog.listPartitionsByFilter(db, table, predicates, conf.sessionLocalTimeZone)
   }

   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/2a7921a8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index cb93902..887caf0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -26,8 +26,8 @@ import org.apache.spark.sql.catalyst.{CatalystConf, FunctionIdentifier, Internal
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, Cast, Literal}
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
 import org.apache.spark.sql.catalyst.util.quoteIdentifier
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.types.StructType

@@ -113,11 +113,11 @@ case class CatalogTablePartition(
   /**
    * Given the partition schema, returns a row with that schema holding the partition values.
    */
-  def toRow(partitionSchema: StructType): InternalRow = {
+  def toRow(partitionSchema: StructType, defaultTimeZondId: String): InternalRow = {
+    val caseInsensitiveProperties = CaseInsensitiveMap(storage.properties)
+    val timeZoneId = caseInsensitiveProperties.getOrElse("timeZone", defaultTimeZondId)
     InternalRow.fromSeq(partitionSchema.map { field =>
-      // TODO: use correct timezone for partition values.
-      Cast(Literal(spec(field.name)), field.dataType,
-        Option(DateTimeUtils.defaultTimeZone().getID)).eval()
+      Cast(Literal(spec(field.name)), field.dataType, Option(timeZoneId)).eval()
     })
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/2a7921a8/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
index b02edd4..aa578f4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
 import org.apache.spark.sql.internal.SQLConf

@@ -103,11 +103,13 @@ case class OptimizeMetadataOnlyQuery(
       case relation: CatalogRelation =>
         val partAttrs = getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation)
+        val caseInsensitiveProperties =
+          CaseInsensitiveMap(relation.tableMeta.storage.properties)
+        val timeZoneId = caseInsensitiveProperties.get("timeZone")
+          .getOrElse(conf.sessionLocalTimeZone)
         val partitionData = catalog.listPartitions(relation.tableMeta.identifier).map { p =>
           InternalRow.fromSeq(partAttrs.map { attr =>
-            // TODO: use correct timezone for partition values.
-            Cast(Literal(p.spec(attr.name)), attr.dataType,
-              Option(DateTimeUtils.defaultTimeZone().getID)).eval()
+            Cast(Literal(p.spec(attr.name)), attr.dataType, Option(timeZoneId)).eval()
           })
         }
         LocalRelation(partAttrs, partitionData)

http://git-wip-us.apache.org/repos/asf/spark/blob/2a7921a8/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
index 1235a4b..2068811 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
@@ -72,7 +72,8 @@ class CatalogFileIndex(
         val path = new Path(p.location)
         val fs = path.getFileSystem(hadoopConf)
         PartitionPath(
-          p.toRow(partitionSchema), path.makeQualified(fs.getUri, fs.getWorkingDirectory))
+          p.toRow(partitionSchema, sparkSession.sessionState.conf.sessionLocalTimeZone),
+          path.makeQualified(fs.getUri, fs.getWorkingDirectory))
       }
       val partitionSpec = PartitionSpec(partitionSchema, partitions)
       new PrunedInMemoryFileIndex(

http://git-wip-us.apache.org/repos/asf/spark/blob/2a7921a8/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index c177968..950e5ca 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -37,7 +37,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
 import org.apache.spark.sql.execution.{QueryExecution, SortExec, SQLExecution}
 import org.apache.spark.sql.types.{StringType, StructType}
 import org.apache.spark.util.{SerializableConfiguration, Utils}
@@ -68,7 +68,8 @@ object FileFormatWriter extends Logging {
       val bucketIdExpression: Option[Expression],
       val path: String,
       val customPartitionLocations: Map[TablePartitionSpec, String],
-      val maxRecordsPerFile: Long)
+      val maxRecordsPerFile: Long,
+      val timeZoneId: String)
     extends Serializable {

     assert(AttributeSet(allColumns) == AttributeSet(partitionColumns ++ dataColumns),
@@ -122,9 +123,11 @@ object FileFormatWriter extends Logging {
       spec => spec.sortColumnNames.map(c => dataColumns.find(_.name == c).get)
     }

+    val caseInsensitiveOptions = CaseInsensitiveMap(options)
+
     // Note: prepareWrite has side effect. It sets "job".
     val outputWriterFactory =
-      fileFormat.prepareWrite(sparkSession, job, options, dataColumns.toStructType)
+      fileFormat.prepareWrite(sparkSession, job, caseInsensitiveOptions, dataColumns.toStructType)

     val description = new WriteJobDescription(
       uuid = UUID.randomUUID().toString,
@@ -136,8 +139,10 @@ object FileFormatWriter extends Logging {
       bucketIdExpression = bucketIdExpression,
       path = outputSpec.outputPath,
       customPartitionLocations = outputSpec.customPartitionLocations,
-      maxRecordsPerFile = options.get("maxRecordsPerFile").map(_.toLong)
-        .getOrElse(sparkSession.sessionState.conf.maxRecordsPerFile)
+      maxRecordsPerFile = caseInsensitiveOptions.get("maxRecordsPerFile").map(_.toLong)
+        .getOrElse(sparkSession.sessionState.conf.maxRecordsPerFile),
+      timeZoneId = caseInsensitiveOptions.get("timeZone")
+        .getOrElse(sparkSession.sessionState.conf.sessionLocalTimeZone)
     )

     // We should first sort by partition columns, then bucket id, and finally sorting columns.
@@ -330,11 +335,10 @@ object FileFormatWriter extends Logging {
     /** Expressions that given partition columns build a path string like: col1=val/col2=val/... */
     private def partitionPathExpression: Seq[Expression] = {
       desc.partitionColumns.zipWithIndex.flatMap { case (c, i) =>
-        // TODO: use correct timezone for partition values.
         val escaped = ScalaUDF(
           ExternalCatalogUtils.escapePathName _,
           StringType,
-          Seq(Cast(c, StringType, Option(DateTimeUtils.defaultTimeZone().getID))),
+          Seq(Cast(c, StringType, Option(desc.timeZoneId))),
           Seq(StringType))
         val str = If(IsNull(c), Literal(ExternalCatalogUtils.DEFAULT_PARTITION_NAME), escaped)
         val partitionName = Literal(c.name + "=") :: str :: Nil

http://git-wip-us.apache.org/repos/asf/spark/blob/2a7921a8/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
index 549257c..c8097a7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
@@ -30,7 +30,7 @@ import org.apache.spark.metrics.source.HiveCatalogMetrics
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.{expressions, InternalRow}
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
 import org.apache.spark.sql.types.{StringType, StructType}
 import org.apache.spark.util.SerializableConfiguration

@@ -125,22 +125,27 @@ abstract class PartitioningAwareFileIndex(
     val leafDirs = leafDirToChildrenFiles.filter { case (_, files) =>
       files.exists(f => isDataPath(f.getPath))
     }.keys.toSeq
+
+    val caseInsensitiveOptions = CaseInsensitiveMap(parameters)
+    val timeZoneId = caseInsensitiveOptions.get("timeZone")
+      .getOrElse(sparkSession.sessionState.conf.sessionLocalTimeZone)
+
     userPartitionSchema match {
       case Some(userProvidedSchema) if userProvidedSchema.nonEmpty =>
         val spec = PartitioningUtils.parsePartitions(
           leafDirs,
           typeInference = false,
-          basePaths = basePaths)
+          basePaths = basePaths,
+          timeZoneId = timeZoneId)

         // Without auto inference, all of value in the `row` should be null or in StringType,
         // we need to cast into the data type that user specified.
         def castPartitionValuesToUserSchema(row: InternalRow) = {
           InternalRow((0 until row.numFields).map { i =>
-            // TODO: use correct timezone for partition values.
             Cast(
               Literal.create(row.getUTF8String(i), StringType),
               userProvidedSchema.fields(i).dataType,
-              Option(DateTimeUtils.defaultTimeZone().getID)).eval()
+              Option(timeZoneId)).eval()
           }: _*)
         }

@@ -151,7 +156,8 @@ abstract class PartitioningAwareFileIndex(
         PartitioningUtils.parsePartitions(
           leafDirs,
           typeInference = sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled,
-          basePaths = basePaths)
+          basePaths = basePaths,
+          timeZoneId = timeZoneId)
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/2a7921a8/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index bad5996..09876bb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources

 import java.lang.{Double => JDouble, Long => JLong}
 import java.math.{BigDecimal => JBigDecimal}
-import java.sql.{Date => JDate, Timestamp => JTimestamp}
+import java.util.TimeZone

 import scala.collection.mutable.ArrayBuffer
 import scala.util.Try

@@ -31,7 +31,9 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.Resolver
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String

 // TODO: We should tighten up visibility of the classes here once we clean up Hive coupling.
@@ -91,10 +93,19 @@ object PartitioningUtils {
   private[datasources] def parsePartitions(
       paths: Seq[Path],
       typeInference: Boolean,
-      basePaths: Set[Path]): PartitionSpec = {
+      basePaths: Set[Path],
+      timeZoneId: String): PartitionSpec = {
+    parsePartitions(paths, typeInference, basePaths, TimeZone.getTimeZone(timeZoneId))
+  }
+
+  private[datasources] def parsePartitions(
+      paths: Seq[Path],
+      typeInference: Boolean,
+      basePaths: Set[Path],
+      timeZone: TimeZone): PartitionSpec = {
     // First, we need to parse every partition's path and see if we can find partition values.
     val (partitionValues, optDiscoveredBasePaths) = paths.map { path =>
-      parsePartition(path, typeInference, basePaths)
+      parsePartition(path, typeInference, basePaths, timeZone)
     }.unzip

     // We create pairs of (path -> path's partition value) here
@@ -173,7 +184,8 @@ object PartitioningUtils {
   private[datasources] def parsePartition(
       path: Path,
       typeInference: Boolean,
-      basePaths: Set[Path]): (Option[PartitionValues], Option[Path]) = {
+      basePaths: Set[Path],
+      timeZone: TimeZone): (Option[PartitionValues], Option[Path]) = {
     val columns = ArrayBuffer.empty[(String, Literal)]
     // Old Hadoop versions don't have `Path.isRoot`
     var finished = path.getParent == null
@@ -194,7 +206,7 @@ object PartitioningUtils {
       // Let's say currentPath is a path of "/table/a=1/", currentPath.getName will give us a=1.
       // Once we get the string, we try to parse it and find the partition column and value.
       val maybeColumn =
-        parsePartitionColumn(currentPath.getName, typeInference)
+        parsePartitionColumn(currentPath.getName, typeInference, timeZone)
       maybeColumn.foreach(columns += _)

       // Now, we determine if we should stop.
@@ -226,7 +238,8 @@ object PartitioningUtils {

   private def parsePartitionColumn(
       columnSpec: String,
-      typeInference: Boolean): Option[(String, Literal)] = {
+      typeInference: Boolean,
+      timeZone: TimeZone): Option[(String, Literal)] = {
     val equalSignIndex = columnSpec.indexOf('=')
     if (equalSignIndex == -1) {
       None
@@ -237,7 +250,7 @@ object PartitioningUtils {
       val rawColumnValue = columnSpec.drop(equalSignIndex + 1)
       assert(rawColumnValue.nonEmpty, s"Empty partition column value in '$columnSpec'")

-      val literal = inferPartitionColumnValue(rawColumnValue, typeInference)
+      val literal = inferPartitionColumnValue(rawColumnValue, typeInference, timeZone)
       Some(columnName -> literal)
     }
   }
@@ -370,7 +383,8 @@ object PartitioningUtils {
    */
   private[datasources] def inferPartitionColumnValue(
       raw: String,
-      typeInference: Boolean): Literal = {
+      typeInference: Boolean,
+      timeZone: TimeZone): Literal = {
     val decimalTry = Try {
       // `BigDecimal` conversion can fail when the `field` is not a form of number.
       val bigDecimal = new JBigDecimal(raw)
@@ -390,8 +404,16 @@
       // Then falls back to fractional types
       .orElse(Try(Literal.create(JDouble.parseDouble(raw), DoubleType)))
       // Then falls back to date/timestamp types
-      .orElse(Try(Literal(JDate.valueOf(raw))))
-      .orElse(Try(Literal(JTimestamp.valueOf(unescapePathName(raw)))))
+      .orElse(Try(
+        Literal.create(
+          DateTimeUtils.getThreadLocalTimestampFormat(timeZone)
+            .parse(unescapePathName(raw)).getTime * 1000L,
+          TimestampType)))
+      .orElse(Try(
+        Literal.create(
+          DateTimeUtils.millisToDays(
+            DateTimeUtils.getThreadLocalDateFormat.parse(raw).getTime),
+          DateType)))
       // Then falls back to string
       .getOrElse {
         if (raw == DEFAULT_PARTITION_NAME) {

http://git-wip-us.apache.org/repos/asf/spark/blob/2a7921a8/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index d94eb66..5607180 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -742,10 +742,11 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
         .save(iso8601timestampsPath)

       // This will load back the timestamps as string.
+      val stringSchema = StructType(StructField("date", StringType, true) :: Nil)
       val iso8601Timestamps = spark.read
         .format("csv")
+        .schema(stringSchema)
         .option("header", "true")
-        .option("inferSchema", "false")
         .load(iso8601timestampsPath)

       val iso8501 = FastDateFormat.getInstance("yyyy-MM-dd'T'HH:mm:ss.SSSZZ", Locale.US)
@@ -775,10 +776,11 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
         .save(iso8601datesPath)

       // This will load back the dates as string.
+      val stringSchema = StructType(StructField("date", StringType, true) :: Nil)
       val iso8601dates = spark.read
         .format("csv")
+        .schema(stringSchema)
         .option("header", "true")
-        .option("inferSchema", "false")
         .load(iso8601datesPath)

       val iso8501 = FastDateFormat.getInstance("yyyy-MM-dd", Locale.US)
@@ -833,10 +835,11 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
         .save(datesWithFormatPath)

       // This will load back the dates as string.
+      val stringSchema = StructType(StructField("date", StringType, true) :: Nil)
       val stringDatesWithFormat = spark.read
         .format("csv")
+        .schema(stringSchema)
         .option("header", "true")
-        .option("inferSchema", "false")
         .load(datesWithFormatPath)
       val expectedStringDatesWithFormat = Seq(
         Row("2015/08/26"),
@@ -864,10 +867,11 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
         .save(timestampsWithFormatPath)

       // This will load back the timestamps as string.
+      val stringSchema = StructType(StructField("date", StringType, true) :: Nil)
       val stringTimestampsWithFormat = spark.read
         .format("csv")
+        .schema(stringSchema)
         .option("header", "true")
-        .option("inferSchema", "false")
         .load(timestampsWithFormatPath)
       val expectedStringTimestampsWithFormat = Seq(
         Row("2015/08/26 18:00"),
@@ -896,10 +900,11 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
         .save(timestampsWithFormatPath)

       // This will load back the timestamps as string.
+      val stringSchema = StructType(StructField("date", StringType, true) :: Nil)
       val stringTimestampsWithFormat = spark.read
         .format("csv")
+        .schema(stringSchema)
         .option("header", "true")
-        .option("inferSchema", "false")
         .load(timestampsWithFormatPath)
       val expectedStringTimestampsWithFormat = Seq(
         Row("2015/08/27 01:00"),

http://git-wip-us.apache.org/repos/asf/spark/blob/2a7921a8/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index 420cff8..88cb8a0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources.parquet
 import java.io.File
 import java.math.BigInteger
 import java.sql.{Date, Timestamp}
+import java.util.{Calendar, TimeZone}

 import scala.collection.mutable.ArrayBuffer

@@ -51,9 +52,12 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha

   val defaultPartitionName = ExternalCatalogUtils.DEFAULT_PARTITION_NAME

+  val timeZone = TimeZone.getDefault()
+  val timeZoneId = timeZone.getID
+
   test("column type inference") {
-    def check(raw: String, literal: Literal): Unit = {
-      assert(inferPartitionColumnValue(raw, true) === literal)
+    def check(raw: String, literal: Literal, timeZone: TimeZone = timeZone): Unit = {
+      assert(inferPartitionColumnValue(raw, true, timeZone) === literal)
     }

     check("10", Literal.create(10, IntegerType))
@@ -66,6 +70,14 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
     check("1990-02-24", Literal.create(Date.valueOf("1990-02-24"), DateType))
     check("1990-02-24 12:00:30",
       Literal.create(Timestamp.valueOf("1990-02-24 12:00:30"), TimestampType))
+
+    val c = Calendar.getInstance(TimeZone.getTimeZone("GMT"))
+    c.set(1990, 1, 24, 12, 0, 30)
+    c.set(Calendar.MILLISECOND, 0)
+    check("1990-02-24 12:00:30",
+      Literal.create(new Timestamp(c.getTimeInMillis), TimestampType),
+      TimeZone.getTimeZone("GMT"))
+
     check(defaultPartitionName, Literal.create(null, NullType))
   }

@@ -77,7 +89,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
       "hdfs://host:9000/path/a=10.5/b=hello")

     var exception = intercept[AssertionError] {
-      parsePartitions(paths.map(new Path(_)), true, Set.empty[Path])
+      parsePartitions(paths.map(new Path(_)), true, Set.empty[Path], timeZoneId)
     }
     assert(exception.getMessage().contains("Conflicting directory structures detected"))

@@ -90,7 +102,8 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
     parsePartitions(
       paths.map(new Path(_)),
       true,
-      Set(new Path("hdfs://host:9000/path/")))
+      Set(new Path("hdfs://host:9000/path/")),
+      timeZoneId)

     // Valid
     paths = Seq(
@@ -102,7 +115,8 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
     parsePartitions(
       paths.map(new Path(_)),
       true,
-      Set(new Path("hdfs://host:9000/path/something=true/table")))
+      Set(new Path("hdfs://host:9000/path/something=true/table")),
+      timeZoneId)

     // Valid
     paths = Seq(
@@ -114,7 +128,8 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
     parsePartitions(
       paths.map(new Path(_)),
       true,
-      Set(new Path("hdfs://host:9000/path/table=true")))
+      Set(new Path("hdfs://host:9000/path/table=true")),
+      timeZoneId)

     // Invalid
     paths = Seq(
@@ -126,7 +141,8 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
       parsePartitions(
         paths.map(new Path(_)),
         true,
-        Set(new Path("hdfs://host:9000/path/")))
+        Set(new Path("hdfs://host:9000/path/")),
+        timeZoneId)
     }
     assert(exception.getMessage().contains("Conflicting directory structures detected"))

@@ -145,20 +161,21 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
       parsePartitions(
         paths.map(new Path(_)),
         true,
-        Set(new Path("hdfs://host:9000/tmp/tables/")))
+        Set(new Path("hdfs://host:9000/tmp/tables/")),
+        timeZoneId)
     }
     assert(exception.getMessage().contains("Conflicting directory structures detected"))
   }

   test("parse partition") {
     def check(path: String, expected: Option[PartitionValues]): Unit = {
-      val actual = parsePartition(new Path(path), true, Set.empty[Path])._1
+      val actual = parsePartition(new Path(path), true, Set.empty[Path], timeZone)._1
       assert(expected === actual)
     }

     def checkThrows[T <: Throwable: Manifest](path: String, expected: String): Unit = {
       val message = intercept[T] {
-        parsePartition(new Path(path), true, Set.empty[Path])
+        parsePartition(new Path(path), true, Set.empty[Path], timeZone)
       }.getMessage

       assert(message.contains(expected))
@@ -201,7 +218,8 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
     val partitionSpec1: Option[PartitionValues] = parsePartition(
       path = new Path("file://path/a=10"),
       typeInference = true,
-      basePaths = Set(new Path("file://path/a=10")))._1
+      basePaths = Set(new Path("file://path/a=10")),
+      timeZone = timeZone)._1

     assert(partitionSpec1.isEmpty)

@@ -209,7 +227,8 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
     val partitionSpec2: Option[PartitionValues] = parsePartition(
       path = new Path("file://path/a=10"),
       typeInference = true,
Path("file://path")))._1 + basePaths = Set(new Path("file://path")), + timeZone = timeZone)._1 assert(partitionSpec2 == Option(PartitionValues( @@ -226,7 +245,8 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha parsePartitions( paths.map(new Path(_)), true, - rootPaths) + rootPaths, + timeZoneId) assert(actualSpec === spec) } @@ -307,7 +327,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha test("parse partitions with type inference disabled") { def check(paths: Seq[String], spec: PartitionSpec): Unit = { val actualSpec = - parsePartitions(paths.map(new Path(_)), false, Set.empty[Path]) + parsePartitions(paths.map(new Path(_)), false, Set.empty[Path], timeZoneId) assert(actualSpec === spec) } @@ -686,6 +706,13 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha val fields = schema.map(f => Column(f.name).cast(f.dataType)) checkAnswer(spark.read.load(dir.toString).select(fields: _*), row) } + + withTempPath { dir => + df.write.option("timeZone", "GMT") + .format("parquet").partitionBy(partitionColumns.map(_.name): _*).save(dir.toString) + val fields = schema.map(f => Column(f.name).cast(f.dataType)) + checkAnswer(spark.read.option("timeZone", "GMT").load(dir.toString).select(fields: _*), row) + } } test("Various inferred partition value types") { @@ -720,6 +747,13 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha val fields = schema.map(f => Column(f.name)) checkAnswer(spark.read.load(dir.toString).select(fields: _*), row) } + + withTempPath { dir => + df.write.option("timeZone", "GMT") + .format("parquet").partitionBy(partitionColumns.map(_.name): _*).save(dir.toString) + val fields = schema.map(f => Column(f.name)) + checkAnswer(spark.read.option("timeZone", "GMT").load(dir.toString).select(fields: _*), row) + } } test("SPARK-8037: Ignores files whose name starts with dot") { http://git-wip-us.apache.org/repos/asf/spark/blob/2a7921a8/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala index bf7fabe..f251290 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala @@ -18,11 +18,13 @@ package org.apache.spark.sql.sources import java.io.File +import java.sql.Timestamp import org.apache.hadoop.mapreduce.TaskAttemptContext import org.apache.spark.internal.Logging import org.apache.spark.sql.{QueryTest, Row} +import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils import org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf @@ -124,6 +126,39 @@ class PartitionedWriteSuite extends QueryTest with SharedSQLContext { } } + test("timeZone setting in dynamic partition writes") { + def checkPartitionValues(file: File, expected: String): Unit = { + val dir = file.getParentFile() + val value = ExternalCatalogUtils.unescapePathName( + dir.getName.substring(dir.getName.indexOf("=") + 1)) + assert(value == expected) + } + val ts = Timestamp.valueOf("2016-12-01 00:00:00") + val df = Seq((1, ts)).toDF("i", "ts") + withTempPath { f => + 
+      df.write.partitionBy("ts").parquet(f.getAbsolutePath)
+      val files = recursiveList(f).filter(_.getAbsolutePath.endsWith("parquet"))
+      assert(files.length == 1)
+      checkPartitionValues(files.head, "2016-12-01 00:00:00")
+    }
+    withTempPath { f =>
+      df.write.option("timeZone", "GMT").partitionBy("ts").parquet(f.getAbsolutePath)
+      val files = recursiveList(f).filter(_.getAbsolutePath.endsWith("parquet"))
+      assert(files.length == 1)
+      // use timeZone option "GMT" to format partition value.
+      checkPartitionValues(files.head, "2016-12-01 08:00:00")
+    }
+    withTempPath { f =>
+      withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> "GMT") {
+        df.write.partitionBy("ts").parquet(f.getAbsolutePath)
+        val files = recursiveList(f).filter(_.getAbsolutePath.endsWith("parquet"))
+        assert(files.length == 1)
+        // if there isn't timeZone option, then use session local timezone.
+        checkPartitionValues(files.head, "2016-12-01 08:00:00")
+      }
+    }
+  }
+
   /** Lists files recursively. */
   private def recursiveList(f: File): Array[File] = {
     require(f.isDirectory)

http://git-wip-us.apache.org/repos/asf/spark/blob/2a7921a8/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 50bb44f..43d9c2b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.escapePathName
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.ColumnStat
-import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.PartitioningUtils
 import org.apache.spark.sql.hive.client.HiveClient
@@ -1008,7 +1008,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
   override def listPartitionsByFilter(
       db: String,
       table: String,
-      predicates: Seq[Expression]): Seq[CatalogTablePartition] = withClient {
+      predicates: Seq[Expression],
+      defaultTimeZoneId: String): Seq[CatalogTablePartition] = withClient {
     val rawTable = getRawTable(db, table)
     val catalogTable = restoreTableMetadata(rawTable)

     val partitionColumnNames = catalogTable.partitionColumnNames.toSet
@@ -1034,7 +1035,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
         val index = partitionSchema.indexWhere(_.name == att.name)
         BoundReference(index, partitionSchema(index).dataType, nullable = true)
       })
-      clientPrunedPartitions.filter { p => boundPredicate(p.toRow(partitionSchema)) }
+      clientPrunedPartitions.filter { p =>
+        boundPredicate(p.toRow(partitionSchema, defaultTimeZoneId))
+      }
     } else {
       client.getPartitions(catalogTable).map { part =>
         part.copy(spec = restorePartitionSpec(part.spec, partColNameMap))

http://git-wip-us.apache.org/repos/asf/spark/blob/2a7921a8/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
index 14b9565..28f0748 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
@@ -163,7 +163,8 @@ case class HiveTableScanExec(
       sparkSession.sharedState.externalCatalog.listPartitionsByFilter(
         relation.tableMeta.database,
         relation.tableMeta.identifier.table,
-        normalizedFilters)
+        normalizedFilters,
+        sparkSession.sessionState.conf.sessionLocalTimeZone)
     } else {
       sparkSession.sharedState.externalCatalog.listPartitions(
         relation.tableMeta.database,

http://git-wip-us.apache.org/repos/asf/spark/blob/2a7921a8/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
index a60c210..4349f1a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
@@ -52,7 +52,7 @@ class HiveExternalCatalogSuite extends ExternalCatalogSuite {

   test("list partitions by filter") {
     val catalog = newBasicCatalog()
-    val selectedPartitions = catalog.listPartitionsByFilter("db2", "tbl2", Seq('a.int === 1))
+    val selectedPartitions = catalog.listPartitionsByFilter("db2", "tbl2", Seq('a.int === 1), "GMT")
     assert(selectedPartitions.length == 1)
     assert(selectedPartitions.head.spec == part1.spec)
   }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org