This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 9c6efd0  [SPARK-26740][SPARK-26654][SQL] Make statistics of timestamp/date columns independent from system time zones
9c6efd0 is described below

commit 9c6efd0427b9268dafc901fbdecf8a6dec738654
Author: Maxim Gekk <max.g...@gmail.com>
AuthorDate: Tue Feb 12 10:58:00 2019 +0800

    [SPARK-26740][SPARK-26654][SQL] Make statistics of timestamp/date columns independent from system time zones

    ## What changes were proposed in this pull request?

    In the PR, I propose to convert the underlying values of timestamp/date columns to strings, and store the converted values as column statistics. This makes statistics for timestamp/date columns independent of the system time zone while saving and retrieving such statistics.

    I bumped the version of stored statistics from 1 to 2 since the PR changes the format.

    ## How was this patch tested?

    The changes were tested by `StatisticsCollectionSuite` and by `StatisticsSuite`.

    Closes #23662 from MaxGekk/column-stats-time-date.

    Lead-authored-by: Maxim Gekk <max.g...@gmail.com>
    Co-authored-by: Maxim Gekk <maxim.g...@databricks.com>
    Co-authored-by: Hyukjin Kwon <gurwls...@apache.org>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../spark/sql/catalyst/catalog/interface.scala     | 36 ++++++++-----
 .../sql/catalyst/plans/logical/Statistics.scala    |  7 ++-
 .../spark/sql/StatisticsCollectionSuite.scala      | 43 +++++++++++++++
 .../spark/sql/StatisticsCollectionTestBase.scala   | 62 ++++++++++++----------
 4 files changed, 105 insertions(+), 43 deletions(-)
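The heart of the change, shown in the diff below, is to serialize Catalyst's internal date/timestamp values (days and microseconds since the Unix epoch) as strings rendered at UTC, and to parse them back at UTC, so the persisted min/max statistics no longer depend on the JVM's default time zone. Here is a minimal sketch of that round trip using plain `java.time` rather than Spark's `DateFormatter`/`TimestampFormatter` (which the patch itself uses); the object and method names are illustrative only and not part of the commit:

```scala
import java.time.{Instant, LocalDateTime, ZoneOffset}
import java.time.format.DateTimeFormatter

// Hypothetical helper object; the real logic lives in CatalogColumnStat (see the diff below).
object ZoneIndependentStatsSketch {
  // Same microsecond-precision pattern as the one used in the patch.
  private val pattern = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS")

  // Render a TimestampType internal value (microseconds since the epoch) at UTC,
  // so the resulting string is the same regardless of the system time zone.
  def timestampToExternalString(micros: Long): String = {
    val instant = Instant.EPOCH.plusNanos(micros * 1000L)
    LocalDateTime.ofInstant(instant, ZoneOffset.UTC).format(pattern)
  }

  // Parse the stored string back to microseconds, again interpreting it at UTC.
  def timestampFromExternalString(s: String): Long = {
    val instant = LocalDateTime.parse(s, pattern).toInstant(ZoneOffset.UTC)
    instant.getEpochSecond * 1000000L + instant.getNano / 1000L
  }
}
```

For example, `timestampToExternalString(1000000L)` produces `1970-01-01 00:00:01.000000` on any machine, and `timestampFromExternalString` maps that string back to `1000000L`. Version 1 statistics, which were written with the zone-dependent `java.sql` conversions, remain readable through the `version == 1` branches added to `fromExternalString`.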
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 817abeb..69b5cb4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, Cast, ExprId, Literal}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
-import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateFormatter, DateTimeUtils, TimestampFormatter}
 import org.apache.spark.sql.catalyst.util.quoteIdentifier
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
@@ -415,7 +415,8 @@ case class CatalogColumnStat(
     nullCount: Option[BigInt] = None,
     avgLen: Option[Long] = None,
     maxLen: Option[Long] = None,
-    histogram: Option[Histogram] = None) {
+    histogram: Option[Histogram] = None,
+    version: Int = CatalogColumnStat.VERSION) {
 
   /**
    * Returns a map from string to string that can be used to serialize the column stats.
@@ -429,7 +430,7 @@ case class CatalogColumnStat(
    */
   def toMap(colName: String): Map[String, String] = {
     val map = new scala.collection.mutable.HashMap[String, String]
-    map.put(s"${colName}.${CatalogColumnStat.KEY_VERSION}", "1")
+    map.put(s"${colName}.${CatalogColumnStat.KEY_VERSION}", CatalogColumnStat.VERSION.toString)
     distinctCount.foreach { v =>
       map.put(s"${colName}.${CatalogColumnStat.KEY_DISTINCT_COUNT}", v.toString)
     }
@@ -452,12 +453,13 @@ case class CatalogColumnStat(
       dataType: DataType): ColumnStat =
     ColumnStat(
       distinctCount = distinctCount,
-      min = min.map(CatalogColumnStat.fromExternalString(_, colName, dataType)),
-      max = max.map(CatalogColumnStat.fromExternalString(_, colName, dataType)),
+      min = min.map(CatalogColumnStat.fromExternalString(_, colName, dataType, version)),
+      max = max.map(CatalogColumnStat.fromExternalString(_, colName, dataType, version)),
       nullCount = nullCount,
       avgLen = avgLen,
       maxLen = maxLen,
-      histogram = histogram)
+      histogram = histogram,
+      version = version)
 }
 
 object CatalogColumnStat extends Logging {
@@ -472,14 +474,23 @@ object CatalogColumnStat extends Logging {
   private val KEY_MAX_LEN = "maxLen"
   private val KEY_HISTOGRAM = "histogram"
 
+  val VERSION = 2
+
+  private def getTimestampFormatter(): TimestampFormatter = {
+    TimestampFormatter(format = "yyyy-MM-dd HH:mm:ss.SSSSSS", timeZone = DateTimeUtils.TimeZoneUTC)
+  }
+
   /**
    * Converts from string representation of data type to the corresponding Catalyst data type.
    */
-  def fromExternalString(s: String, name: String, dataType: DataType): Any = {
+  def fromExternalString(s: String, name: String, dataType: DataType, version: Int): Any = {
     dataType match {
       case BooleanType => s.toBoolean
-      case DateType => DateTimeUtils.fromJavaDate(java.sql.Date.valueOf(s))
-      case TimestampType => DateTimeUtils.fromJavaTimestamp(java.sql.Timestamp.valueOf(s))
+      case DateType if version == 1 => DateTimeUtils.fromJavaDate(java.sql.Date.valueOf(s))
+      case DateType => DateFormatter().parse(s)
+      case TimestampType if version == 1 =>
+        DateTimeUtils.fromJavaTimestamp(java.sql.Timestamp.valueOf(s))
+      case TimestampType => getTimestampFormatter().parse(s)
       case ByteType => s.toByte
       case ShortType => s.toShort
       case IntegerType => s.toInt
@@ -501,8 +512,8 @@ object CatalogColumnStat extends Logging {
    */
   def toExternalString(v: Any, colName: String, dataType: DataType): String = {
     val externalValue = dataType match {
-      case DateType => DateTimeUtils.toJavaDate(v.asInstanceOf[Int])
-      case TimestampType => DateTimeUtils.toJavaTimestamp(v.asInstanceOf[Long])
+      case DateType => DateFormatter().format(v.asInstanceOf[Int])
+      case TimestampType => getTimestampFormatter().format(v.asInstanceOf[Long])
       case BooleanType | _: IntegralType | FloatType | DoubleType => v
       case _: DecimalType => v.asInstanceOf[Decimal].toJavaBigDecimal
       // This version of Spark does not use min/max for binary/string types so we ignore it.
@@ -532,7 +543,8 @@ object CatalogColumnStat extends Logging {
         nullCount = map.get(s"${colName}.${KEY_NULL_COUNT}").map(v => BigInt(v.toLong)),
         avgLen = map.get(s"${colName}.${KEY_AVG_LEN}").map(_.toLong),
         maxLen = map.get(s"${colName}.${KEY_MAX_LEN}").map(_.toLong),
-        histogram = map.get(s"${colName}.${KEY_HISTOGRAM}").map(HistogramSerializer.deserialize)
+        histogram = map.get(s"${colName}.${KEY_HISTOGRAM}").map(HistogramSerializer.deserialize),
+        version = map(s"${colName}.${KEY_VERSION}").toInt
       ))
     } catch {
       case NonFatal(e) =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
index 5a38811..c008d77 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
@@ -90,6 +90,7 @@ case class Statistics(
  * @param avgLen average length of the values. For fixed-length types, this should be a constant.
  * @param maxLen maximum length of the values. For fixed-length types, this should be a constant.
  * @param histogram histogram of the values
+ * @param version version of statistics saved to or retrieved from the catalog
  */
 case class ColumnStat(
     distinctCount: Option[BigInt] = None,
@@ -98,7 +99,8 @@ case class ColumnStat(
     nullCount: Option[BigInt] = None,
     avgLen: Option[Long] = None,
     maxLen: Option[Long] = None,
-    histogram: Option[Histogram] = None) {
+    histogram: Option[Histogram] = None,
+    version: Int = CatalogColumnStat.VERSION) {
 
   // Are distinctCount and nullCount statistics defined?
   val hasCountStats = distinctCount.isDefined && nullCount.isDefined
@@ -117,7 +119,8 @@ case class ColumnStat(
       nullCount = nullCount,
       avgLen = avgLen,
       maxLen = maxLen,
-      histogram = histogram)
+      histogram = histogram,
+      version = version)
   }
 
   /**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
index 9984268..7ba9f9f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
@@ -18,12 +18,15 @@
 package org.apache.spark.sql
 
 import java.io.File
+import java.util.TimeZone
+import java.util.concurrent.TimeUnit
 
 import scala.collection.mutable
 
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.CatalogColumnStat
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.test.SQLTestData.ArrayData
@@ -427,4 +430,44 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared
       }
     }
   }
+
+  test("store and retrieve column stats in different time zones") {
+    val (start, end) = (0, TimeUnit.DAYS.toSeconds(2))
+
+    def checkTimestampStats(
+        t: DataType,
+        srcTimeZone: TimeZone,
+        dstTimeZone: TimeZone)(checker: ColumnStat => Unit): Unit = {
+      val table = "time_table"
+      val column = "T"
+      val original = TimeZone.getDefault
+      try {
+        withTable(table) {
+          TimeZone.setDefault(srcTimeZone)
+          spark.range(start, end)
+            .select('id.cast(TimestampType).cast(t).as(column))
+            .write.saveAsTable(table)
+          sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS $column")
+
+          TimeZone.setDefault(dstTimeZone)
+          val stats = getCatalogTable(table)
+            .stats.get.colStats(column).toPlanStat(column, t)
+          checker(stats)
+        }
+      } finally {
+        TimeZone.setDefault(original)
+      }
+    }
+
+    DateTimeTestUtils.outstandingTimezones.foreach { timeZone =>
+      checkTimestampStats(DateType, DateTimeUtils.TimeZoneUTC, timeZone) { stats =>
+        assert(stats.min.get.asInstanceOf[Int] == TimeUnit.SECONDS.toDays(start))
+        assert(stats.max.get.asInstanceOf[Int] == TimeUnit.SECONDS.toDays(end - 1))
+      }
+      checkTimestampStats(TimestampType, DateTimeUtils.TimeZoneUTC, timeZone) { stats =>
+        assert(stats.min.get.asInstanceOf[Long] == TimeUnit.SECONDS.toMicros(start))
+        assert(stats.max.get.asInstanceOf[Long] == TimeUnit.SECONDS.toMicros(end - 1))
+      }
+    }
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala
index bf4abb6..346fb765 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql
 
 import java.{lang => jl}
 import java.sql.{Date, Timestamp}
+import java.util.concurrent.TimeUnit
 
 import scala.collection.mutable
 import scala.util.Random
@@ -26,12 +27,10 @@ import scala.util.Random
 
 import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog.{CatalogColumnStat, CatalogStatistics, CatalogTable, HiveTableRelation}
 import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, Histogram, HistogramBin, HistogramSerializer, LogicalPlan}
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.catalyst.util.DateTimeTestUtils._
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 import org.apache.spark.sql.test.SQLTestUtils
-import org.apache.spark.sql.types.Decimal
-
 /**
  * The base for statistics test cases that we want to include in both the hive module (for
@@ -42,14 +41,18 @@ abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils
 
   private val dec1 = new java.math.BigDecimal("1.000000000000000000")
   private val dec2 = new java.math.BigDecimal("8.000000000000000000")
-  private val d1 = Date.valueOf("2016-05-08")
-  private val d2 = Date.valueOf("2016-05-09")
-  private val t1 = Timestamp.valueOf("2016-05-08 00:00:01")
-  private val t2 = Timestamp.valueOf("2016-05-09 00:00:02")
-  private val d1Internal = DateTimeUtils.fromJavaDate(d1)
-  private val d2Internal = DateTimeUtils.fromJavaDate(d2)
-  private val t1Internal = DateTimeUtils.fromJavaTimestamp(t1)
-  private val t2Internal = DateTimeUtils.fromJavaTimestamp(t2)
+  private val d1Str = "2016-05-08"
+  private val d1Internal = days(2016, 5, 8)
+  private val d1 = Date.valueOf(d1Str)
+  private val d2Str = "2016-05-09"
+  private val d2Internal = days(2016, 5, 9)
+  private val d2 = Date.valueOf(d2Str)
+  private val t1Str = "2016-05-08 00:00:01.000000"
+  private val t1Internal = date(2016, 5, 8, 0, 0, 1)
+  private val t1 = new Timestamp(TimeUnit.MICROSECONDS.toMillis(t1Internal))
+  private val t2Str = "2016-05-09 00:00:02.000000"
+  private val t2Internal = date(2016, 5, 9, 0, 0, 2)
+  private val t2 = new Timestamp(TimeUnit.MICROSECONDS.toMillis(t2Internal))
 
   /**
    * Define a very simple 3 row table used for testing column serialization.
@@ -78,10 +81,10 @@ abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils
       Some(16), Some(16)),
     "cstring" -> CatalogColumnStat(Some(2), None, None, Some(1), Some(3), Some(3)),
     "cbinary" -> CatalogColumnStat(Some(2), None, None, Some(1), Some(3), Some(3)),
-    "cdate" -> CatalogColumnStat(Some(2), Some(d1.toString), Some(d2.toString), Some(1), Some(4),
-      Some(4)),
-    "ctimestamp" -> CatalogColumnStat(Some(2), Some(t1.toString), Some(t2.toString), Some(1),
-      Some(8), Some(8))
+    "cdate" -> CatalogColumnStat(Some(2), Some(d1Str), Some(d2Str),
+      Some(1), Some(4), Some(4)),
+    "ctimestamp" -> CatalogColumnStat(Some(2), Some(t1Str),
+      Some(t2Str), Some(1), Some(8), Some(8))
   )
 
   /**
@@ -113,87 +116,88 @@ abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils
     colStats
   }
 
+  private val strVersion = CatalogColumnStat.VERSION.toString
   val expectedSerializedColStats = Map(
     "spark.sql.statistics.colStats.cbinary.avgLen" -> "3",
     "spark.sql.statistics.colStats.cbinary.distinctCount" -> "2",
     "spark.sql.statistics.colStats.cbinary.maxLen" -> "3",
     "spark.sql.statistics.colStats.cbinary.nullCount" -> "1",
-    "spark.sql.statistics.colStats.cbinary.version" -> "1",
+    "spark.sql.statistics.colStats.cbinary.version" -> strVersion,
     "spark.sql.statistics.colStats.cbool.avgLen" -> "1",
     "spark.sql.statistics.colStats.cbool.distinctCount" -> "2",
     "spark.sql.statistics.colStats.cbool.max" -> "true",
     "spark.sql.statistics.colStats.cbool.maxLen" -> "1",
     "spark.sql.statistics.colStats.cbool.min" -> "false",
     "spark.sql.statistics.colStats.cbool.nullCount" -> "1",
-    "spark.sql.statistics.colStats.cbool.version" -> "1",
+    "spark.sql.statistics.colStats.cbool.version" -> strVersion,
     "spark.sql.statistics.colStats.cbyte.avgLen" -> "1",
     "spark.sql.statistics.colStats.cbyte.distinctCount" -> "2",
     "spark.sql.statistics.colStats.cbyte.max" -> "2",
     "spark.sql.statistics.colStats.cbyte.maxLen" -> "1",
     "spark.sql.statistics.colStats.cbyte.min" -> "1",
     "spark.sql.statistics.colStats.cbyte.nullCount" -> "1",
-    "spark.sql.statistics.colStats.cbyte.version" -> "1",
+    "spark.sql.statistics.colStats.cbyte.version" -> strVersion,
     "spark.sql.statistics.colStats.cdate.avgLen" -> "4",
     "spark.sql.statistics.colStats.cdate.distinctCount" -> "2",
     "spark.sql.statistics.colStats.cdate.max" -> "2016-05-09",
     "spark.sql.statistics.colStats.cdate.maxLen" -> "4",
     "spark.sql.statistics.colStats.cdate.min" -> "2016-05-08",
     "spark.sql.statistics.colStats.cdate.nullCount" -> "1",
-    "spark.sql.statistics.colStats.cdate.version" -> "1",
+    "spark.sql.statistics.colStats.cdate.version" -> strVersion,
     "spark.sql.statistics.colStats.cdecimal.avgLen" -> "16",
     "spark.sql.statistics.colStats.cdecimal.distinctCount" -> "2",
     "spark.sql.statistics.colStats.cdecimal.max" -> "8.000000000000000000",
     "spark.sql.statistics.colStats.cdecimal.maxLen" -> "16",
     "spark.sql.statistics.colStats.cdecimal.min" -> "1.000000000000000000",
     "spark.sql.statistics.colStats.cdecimal.nullCount" -> "1",
-    "spark.sql.statistics.colStats.cdecimal.version" -> "1",
+    "spark.sql.statistics.colStats.cdecimal.version" -> strVersion,
     "spark.sql.statistics.colStats.cdouble.avgLen" -> "8",
     "spark.sql.statistics.colStats.cdouble.distinctCount" -> "2",
     "spark.sql.statistics.colStats.cdouble.max" -> "6.0",
     "spark.sql.statistics.colStats.cdouble.maxLen" -> "8",
     "spark.sql.statistics.colStats.cdouble.min" -> "1.0",
     "spark.sql.statistics.colStats.cdouble.nullCount" -> "1",
-    "spark.sql.statistics.colStats.cdouble.version" -> "1",
"spark.sql.statistics.colStats.cdouble.version" -> strVersion, "spark.sql.statistics.colStats.cfloat.avgLen" -> "4", "spark.sql.statistics.colStats.cfloat.distinctCount" -> "2", "spark.sql.statistics.colStats.cfloat.max" -> "7.0", "spark.sql.statistics.colStats.cfloat.maxLen" -> "4", "spark.sql.statistics.colStats.cfloat.min" -> "1.0", "spark.sql.statistics.colStats.cfloat.nullCount" -> "1", - "spark.sql.statistics.colStats.cfloat.version" -> "1", + "spark.sql.statistics.colStats.cfloat.version" -> strVersion, "spark.sql.statistics.colStats.cint.avgLen" -> "4", "spark.sql.statistics.colStats.cint.distinctCount" -> "2", "spark.sql.statistics.colStats.cint.max" -> "4", "spark.sql.statistics.colStats.cint.maxLen" -> "4", "spark.sql.statistics.colStats.cint.min" -> "1", "spark.sql.statistics.colStats.cint.nullCount" -> "1", - "spark.sql.statistics.colStats.cint.version" -> "1", + "spark.sql.statistics.colStats.cint.version" -> strVersion, "spark.sql.statistics.colStats.clong.avgLen" -> "8", "spark.sql.statistics.colStats.clong.distinctCount" -> "2", "spark.sql.statistics.colStats.clong.max" -> "5", "spark.sql.statistics.colStats.clong.maxLen" -> "8", "spark.sql.statistics.colStats.clong.min" -> "1", "spark.sql.statistics.colStats.clong.nullCount" -> "1", - "spark.sql.statistics.colStats.clong.version" -> "1", + "spark.sql.statistics.colStats.clong.version" -> strVersion, "spark.sql.statistics.colStats.cshort.avgLen" -> "2", "spark.sql.statistics.colStats.cshort.distinctCount" -> "2", "spark.sql.statistics.colStats.cshort.max" -> "3", "spark.sql.statistics.colStats.cshort.maxLen" -> "2", "spark.sql.statistics.colStats.cshort.min" -> "1", "spark.sql.statistics.colStats.cshort.nullCount" -> "1", - "spark.sql.statistics.colStats.cshort.version" -> "1", + "spark.sql.statistics.colStats.cshort.version" -> strVersion, "spark.sql.statistics.colStats.cstring.avgLen" -> "3", "spark.sql.statistics.colStats.cstring.distinctCount" -> "2", "spark.sql.statistics.colStats.cstring.maxLen" -> "3", "spark.sql.statistics.colStats.cstring.nullCount" -> "1", - "spark.sql.statistics.colStats.cstring.version" -> "1", + "spark.sql.statistics.colStats.cstring.version" -> strVersion, "spark.sql.statistics.colStats.ctimestamp.avgLen" -> "8", "spark.sql.statistics.colStats.ctimestamp.distinctCount" -> "2", - "spark.sql.statistics.colStats.ctimestamp.max" -> "2016-05-09 00:00:02.0", + "spark.sql.statistics.colStats.ctimestamp.max" -> "2016-05-09 00:00:02.000000", "spark.sql.statistics.colStats.ctimestamp.maxLen" -> "8", - "spark.sql.statistics.colStats.ctimestamp.min" -> "2016-05-08 00:00:01.0", + "spark.sql.statistics.colStats.ctimestamp.min" -> "2016-05-08 00:00:01.000000", "spark.sql.statistics.colStats.ctimestamp.nullCount" -> "1", - "spark.sql.statistics.colStats.ctimestamp.version" -> "1" + "spark.sql.statistics.colStats.ctimestamp.version" -> strVersion ) val expectedSerializedHistograms = Map( --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org