Repository: spark
Updated Branches:
  refs/heads/master 673c67046 -> 11b60af73


[SPARK-17074][SQL] Generate equi-height histogram in column statistics

## What changes were proposed in this pull request?

An equi-height histogram is effective for cardinality estimation, and is more
accurate than basic column stats (min, max, ndv, etc.), especially for skewed
distributions. So we need to support it.

In an equi-height histogram, all buckets (intervals) have the same height
(frequency).
In this PR, we use a two-step method to generate an equi-height histogram:
1. use `ApproximatePercentile` to get the percentiles `p(0), p(1/n), p(2/n) ...
p((n-1)/n), p(1)`, where `n` is the number of buckets;
2. construct the value ranges of the buckets, e.g. `[p(0), p(1/n)], [p(1/n), p(2/n)]
... [p((n-1)/n), p(1)]`, and use `ApproxCountDistinctForIntervals` to count the ndv
(number of distinct values) in each bucket. Each bucket is of the form:
`(lowerBound, higherBound, ndv)`.

## How was this patch tested?

Added new test cases and modified some existing test cases.

Author: Zhenhua Wang <wangzhen...@huawei.com>
Author: Zhenhua Wang <wzh_...@163.com>

Closes #19479 from wzhfy/generate_histogram.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/11b60af7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/11b60af7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/11b60af7

Branch: refs/heads/master
Commit: 11b60af737a04d931356aa74ebf3c6cf4a6b08d6
Parents: 673c670
Author: Zhenhua Wang <wangzhen...@huawei.com>
Authored: Tue Nov 14 16:41:43 2017 +0100
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Tue Nov 14 16:41:43 2017 +0100

----------------------------------------------------------------------
 .../sql/catalyst/plans/logical/Statistics.scala | 203 +++++++++++++--
 .../org/apache/spark/sql/internal/SQLConf.scala |  34 ++-
 .../command/AnalyzeColumnCommand.scala          |  57 ++++-
 .../spark/sql/StatisticsCollectionSuite.scala   |  15 +-
 .../sql/StatisticsCollectionTestBase.scala      |  41 ++-
 .../spark/sql/hive/HiveExternalCatalog.scala    |  11 +-
 .../apache/spark/sql/hive/StatisticsSuite.scala | 255 ++++++++++++-------
 7 files changed, 484 insertions(+), 132 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/11b60af7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
index 5ae1a55..96b199d 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
@@ -17,16 +17,20 @@
 
 package org.apache.spark.sql.catalyst.plans.logical
 
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, 
DataOutputStream}
 import java.math.{MathContext, RoundingMode}
 
 import scala.util.control.NonFatal
 
+import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream}
+
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.catalyst.util.{ArrayData, DateTimeUtils}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
@@ -88,6 +92,7 @@ case class Statistics(
  * @param nullCount number of nulls
  * @param avgLen average length of the values. For fixed-length types, this 
should be a constant.
  * @param maxLen maximum length of the values. For fixed-length types, this 
should be a constant.
+ * @param histogram histogram of the values
  */
 case class ColumnStat(
     distinctCount: BigInt,
@@ -95,7 +100,8 @@ case class ColumnStat(
     max: Option[Any],
     nullCount: BigInt,
     avgLen: Long,
-    maxLen: Long) {
+    maxLen: Long,
+    histogram: Option[Histogram] = None) {
 
   // We currently don't store min/max for binary/string type. This can change 
in the future and
   // then we need to remove this require.
@@ -121,6 +127,7 @@ case class ColumnStat(
     map.put(ColumnStat.KEY_MAX_LEN, maxLen.toString)
     min.foreach { v => map.put(ColumnStat.KEY_MIN_VALUE, toExternalString(v, 
colName, dataType)) }
     max.foreach { v => map.put(ColumnStat.KEY_MAX_VALUE, toExternalString(v, 
colName, dataType)) }
+    histogram.foreach { h => map.put(ColumnStat.KEY_HISTOGRAM, 
HistogramSerializer.serialize(h)) }
     map.toMap
   }
 
@@ -155,6 +162,7 @@ object ColumnStat extends Logging {
   private val KEY_NULL_COUNT = "nullCount"
   private val KEY_AVG_LEN = "avgLen"
   private val KEY_MAX_LEN = "maxLen"
+  private val KEY_HISTOGRAM = "histogram"
 
   /** Returns true iff the we support gathering column statistics on column of 
the given type. */
   def supportsType(dataType: DataType): Boolean = dataType match {
@@ -168,6 +176,16 @@ object ColumnStat extends Logging {
     case _ => false
   }
 
+  /** Returns true iff the we support gathering histogram on column of the 
given type. */
+  def supportsHistogram(dataType: DataType): Boolean = dataType match {
+    case _: IntegralType => true
+    case _: DecimalType => true
+    case DoubleType | FloatType => true
+    case DateType => true
+    case TimestampType => true
+    case _ => false
+  }
+
   /**
    * Creates a [[ColumnStat]] object from the given map. This is used to 
deserialize column stats
    * from some external storage. The serialization side is defined in 
[[ColumnStat.toMap]].
@@ -183,7 +201,8 @@ object ColumnStat extends Logging {
           .map(fromExternalString(_, field.name, 
field.dataType)).flatMap(Option.apply),
         nullCount = BigInt(map(KEY_NULL_COUNT).toLong),
         avgLen = map.getOrElse(KEY_AVG_LEN, 
field.dataType.defaultSize.toString).toLong,
-        maxLen = map.getOrElse(KEY_MAX_LEN, 
field.dataType.defaultSize.toString).toLong
+        maxLen = map.getOrElse(KEY_MAX_LEN, 
field.dataType.defaultSize.toString).toLong,
+        histogram = map.get(KEY_HISTOGRAM).map(HistogramSerializer.deserialize)
       ))
     } catch {
       case NonFatal(e) =>
@@ -220,12 +239,16 @@ object ColumnStat extends Logging {
    * Constructs an expression to compute column statistics for a given column.
    *
    * The expression should create a single struct column with the following 
schema:
-   * distinctCount: Long, min: T, max: T, nullCount: Long, avgLen: Long, 
maxLen: Long
+   * distinctCount: Long, min: T, max: T, nullCount: Long, avgLen: Long, 
maxLen: Long,
+   * distinctCountsForIntervals: Array[Long]
    *
    * Together with [[rowToColumnStat]], this function is used to create 
[[ColumnStat]] and
    * as a result should stay in sync with it.
    */
-  def statExprs(col: Attribute, relativeSD: Double): CreateNamedStruct = {
+  def statExprs(
+      col: Attribute,
+      conf: SQLConf,
+      colPercentiles: AttributeMap[ArrayData]): CreateNamedStruct = {
     def struct(exprs: Expression*): CreateNamedStruct = CreateStruct(exprs.map 
{ expr =>
       expr.transformUp { case af: AggregateFunction => 
af.toAggregateExpression() }
     })
@@ -233,40 +256,55 @@ object ColumnStat extends Logging {
 
     // the approximate ndv (num distinct value) should never be larger than 
the number of rows
     val numNonNulls = if (col.nullable) Count(col) else Count(one)
-    val ndv = Least(Seq(HyperLogLogPlusPlus(col, relativeSD), numNonNulls))
+    val ndv = Least(Seq(HyperLogLogPlusPlus(col, conf.ndvMaxError), 
numNonNulls))
     val numNulls = Subtract(Count(one), numNonNulls)
     val defaultSize = Literal(col.dataType.defaultSize, LongType)
+    val nullArray = Literal(null, ArrayType(LongType))
 
-    def fixedLenTypeStruct(castType: DataType) = {
+    def fixedLenTypeStruct: CreateNamedStruct = {
+      val genHistogram =
+        ColumnStat.supportsHistogram(col.dataType) && 
colPercentiles.contains(col)
+      val intervalNdvsExpr = if (genHistogram) {
+        ApproxCountDistinctForIntervals(col,
+          Literal(colPercentiles(col), ArrayType(col.dataType)), 
conf.ndvMaxError)
+      } else {
+        nullArray
+      }
       // For fixed width types, avg size should be the same as max size.
-      struct(ndv, Cast(Min(col), castType), Cast(Max(col), castType), 
numNulls, defaultSize,
-        defaultSize)
+      struct(ndv, Cast(Min(col), col.dataType), Cast(Max(col), col.dataType), 
numNulls,
+        defaultSize, defaultSize, intervalNdvsExpr)
     }
 
     col.dataType match {
-      case dt: IntegralType => fixedLenTypeStruct(dt)
-      case _: DecimalType => fixedLenTypeStruct(col.dataType)
-      case dt @ (DoubleType | FloatType) => fixedLenTypeStruct(dt)
-      case BooleanType => fixedLenTypeStruct(col.dataType)
-      case DateType => fixedLenTypeStruct(col.dataType)
-      case TimestampType => fixedLenTypeStruct(col.dataType)
+      case _: IntegralType => fixedLenTypeStruct
+      case _: DecimalType => fixedLenTypeStruct
+      case DoubleType | FloatType => fixedLenTypeStruct
+      case BooleanType => fixedLenTypeStruct
+      case DateType => fixedLenTypeStruct
+      case TimestampType => fixedLenTypeStruct
       case BinaryType | StringType =>
-        // For string and binary type, we don't store min/max.
+        // For string and binary type, we don't compute min, max or histogram
         val nullLit = Literal(null, col.dataType)
         struct(
           ndv, nullLit, nullLit, numNulls,
           // Set avg/max size to default size if all the values are null or 
there is no value.
           Coalesce(Seq(Ceil(Average(Length(col))), defaultSize)),
-          Coalesce(Seq(Cast(Max(Length(col)), LongType), defaultSize)))
+          Coalesce(Seq(Cast(Max(Length(col)), LongType), defaultSize)),
+          nullArray)
       case _ =>
         throw new AnalysisException("Analyzing column statistics is not 
supported for column " +
-            s"${col.name} of data type: ${col.dataType}.")
+          s"${col.name} of data type: ${col.dataType}.")
     }
   }
 
-  /** Convert a struct for column stats (defined in statExprs) into 
[[ColumnStat]]. */
-  def rowToColumnStat(row: InternalRow, attr: Attribute): ColumnStat = {
-    ColumnStat(
+  /** Convert a struct for column stats (defined in `statExprs`) into 
[[ColumnStat]]. */
+  def rowToColumnStat(
+      row: InternalRow,
+      attr: Attribute,
+      rowCount: Long,
+      percentiles: Option[ArrayData]): ColumnStat = {
+    // The first 6 fields are basic column stats, the 7th is ndvs for 
histogram bins.
+    val cs = ColumnStat(
       distinctCount = BigInt(row.getLong(0)),
       // for string/binary min/max, get should return null
       min = Option(row.get(1, attr.dataType)),
@@ -275,6 +313,129 @@ object ColumnStat extends Logging {
       avgLen = row.getLong(4),
       maxLen = row.getLong(5)
     )
+    if (row.isNullAt(6)) {
+      cs
+    } else {
+      val ndvs = row.getArray(6).toLongArray()
+      assert(percentiles.get.numElements() == ndvs.length + 1)
+      val endpoints = 
percentiles.get.toArray[Any](attr.dataType).map(_.toString.toDouble)
+      // Construct equi-height histogram
+      val bins = ndvs.zipWithIndex.map { case (ndv, i) =>
+        HistogramBin(endpoints(i), endpoints(i + 1), ndv)
+      }
+      val nonNullRows = rowCount - cs.nullCount
+      val histogram = Histogram(nonNullRows.toDouble / ndvs.length, bins)
+      cs.copy(histogram = Some(histogram))
+    }
+  }
+
+}
+
+/**
+ * This class is an implementation of equi-height histogram.
+ * Equi-height histogram represents the distribution of a column's values by a 
sequence of bins.
+ * Each bin has a value range and contains approximately the same number of 
rows.
+ *
+ * @param height number of rows in each bin
+ * @param bins equi-height histogram bins
+ */
+case class Histogram(height: Double, bins: Array[HistogramBin]) {
+
+  // Only for histogram equality test.
+  override def equals(other: Any): Boolean = other match {
+    case otherHgm: Histogram =>
+      height == otherHgm.height && bins.sameElements(otherHgm.bins)
+    case _ => false
+  }
+
+  override def hashCode(): Int = {
+    val temp = java.lang.Double.doubleToLongBits(height)
+    var result = (temp ^ (temp >>> 32)).toInt
+    result = 31 * result + 
java.util.Arrays.hashCode(bins.asInstanceOf[Array[AnyRef]])
+    result
   }
+}
+
+/**
+ * A bin in an equi-height histogram. We use double type for lower/higher 
bound for simplicity.
+ *
+ * @param lo lower bound of the value range in this bin
+ * @param hi higher bound of the value range in this bin
+ * @param ndv approximate number of distinct values in this bin
+ */
+case class HistogramBin(lo: Double, hi: Double, ndv: Long)
 
+object HistogramSerializer {
+  /**
+   * Serializes a given histogram to a string. For advanced statistics like 
histograms, sketches,
+   * etc, we don't provide readability for their serialized formats in 
metastore
+   * (string-to-string table properties). This is because it's hard or 
unnatural for these
+   * statistics to be human readable. For example, a histogram usually cannot 
fit in a single,
+   * self-described property. And for count-min-sketch, it's essentially 
unnatural to make it
+   * a readable string.
+   */
+  final def serialize(histogram: Histogram): String = {
+    val bos = new ByteArrayOutputStream()
+    val out = new DataOutputStream(new LZ4BlockOutputStream(bos))
+    out.writeDouble(histogram.height)
+    out.writeInt(histogram.bins.length)
+    // Write data with same type together for compression.
+    var i = 0
+    while (i < histogram.bins.length) {
+      out.writeDouble(histogram.bins(i).lo)
+      i += 1
+    }
+    i = 0
+    while (i < histogram.bins.length) {
+      out.writeDouble(histogram.bins(i).hi)
+      i += 1
+    }
+    i = 0
+    while (i < histogram.bins.length) {
+      out.writeLong(histogram.bins(i).ndv)
+      i += 1
+    }
+    out.writeInt(-1)
+    out.flush()
+    out.close()
+
+    org.apache.commons.codec.binary.Base64.encodeBase64String(bos.toByteArray)
+  }
+
+  /** Deserializes a given string to a histogram. */
+  final def deserialize(str: String): Histogram = {
+    val bytes = org.apache.commons.codec.binary.Base64.decodeBase64(str)
+    val bis = new ByteArrayInputStream(bytes)
+    val ins = new DataInputStream(new LZ4BlockInputStream(bis))
+    val height = ins.readDouble()
+    val numBins = ins.readInt()
+
+    val los = new Array[Double](numBins)
+    var i = 0
+    while (i < numBins) {
+      los(i) = ins.readDouble()
+      i += 1
+    }
+    val his = new Array[Double](numBins)
+    i = 0
+    while (i < numBins) {
+      his(i) = ins.readDouble()
+      i += 1
+    }
+    val ndvs = new Array[Long](numBins)
+    i = 0
+    while (i < numBins) {
+      ndvs(i) = ins.readLong()
+      i += 1
+    }
+    ins.close()
+
+    val bins = new Array[HistogramBin](numBins)
+    i = 0
+    while (i < numBins) {
+      bins(i) = HistogramBin(los(i), his(i), ndvs(i))
+      i += 1
+    }
+    Histogram(height, bins)
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/11b60af7/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 0cb58fa..3452a1e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -31,7 +31,6 @@ import org.apache.spark.internal.config._
 import org.apache.spark.network.util.ByteUnit
 import org.apache.spark.sql.catalyst.analysis.Resolver
 import org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator
-import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
 
 
////////////////////////////////////////////////////////////////////////////////////////////////////
 // This file defines the configuration options for Spark SQL.
@@ -835,6 +834,33 @@ object SQLConf {
       .doubleConf
       .createWithDefault(0.05)
 
+  val HISTOGRAM_ENABLED =
+    buildConf("spark.sql.statistics.histogram.enabled")
+      .doc("Generates histograms when computing column statistics if enabled. 
Histograms can " +
+        "provide better estimation accuracy. Currently, Spark only supports 
equi-height " +
+        "histogram. Note that collecting histograms takes extra cost. For 
example, collecting " +
+        "column statistics usually takes only one table scan, but generating 
equi-height " +
+        "histogram will cause an extra table scan.")
+      .booleanConf
+      .createWithDefault(false)
+
+  val HISTOGRAM_NUM_BINS =
+    buildConf("spark.sql.statistics.histogram.numBins")
+      .internal()
+      .doc("The number of bins when generating histograms.")
+      .intConf
+      .checkValue(num => num > 1, "The number of bins must be large than 1.")
+      .createWithDefault(254)
+
+  val PERCENTILE_ACCURACY =
+    buildConf("spark.sql.statistics.percentile.accuracy")
+      .internal()
+      .doc("Accuracy of percentile approximation when generating equi-height 
histograms. " +
+        "Larger value means better accuracy. The relative error can be deduced 
by " +
+        "1.0 / PERCENTILE_ACCURACY.")
+      .intConf
+      .createWithDefault(10000)
+
   val AUTO_SIZE_UPDATE_ENABLED =
     buildConf("spark.sql.statistics.size.autoUpdate.enabled")
       .doc("Enables automatic update for table size once table's data is 
changed. Note that if " +
@@ -1241,6 +1267,12 @@ class SQLConf extends Serializable with Logging {
 
   def ndvMaxError: Double = getConf(NDV_MAX_ERROR)
 
+  def histogramEnabled: Boolean = getConf(HISTOGRAM_ENABLED)
+
+  def histogramNumBins: Int = getConf(HISTOGRAM_NUM_BINS)
+
+  def percentileAccuracy: Int = getConf(PERCENTILE_ACCURACY)
+
   def cboEnabled: Boolean = getConf(SQLConf.CBO_ENABLED)
 
   def autoSizeUpdateEnabled: Boolean = 
getConf(SQLConf.AUTO_SIZE_UPDATE_ENABLED)

http://git-wip-us.apache.org/repos/asf/spark/blob/11b60af7/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
index caf12ad..e3bb4d3 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
@@ -17,12 +17,15 @@
 
 package org.apache.spark.sql.execution.command
 
+import scala.collection.mutable
+
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, 
CatalogTableType}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.util.ArrayData
 import org.apache.spark.sql.execution.QueryExecution
 
 
@@ -68,11 +71,11 @@ case class AnalyzeColumnCommand(
       tableIdent: TableIdentifier,
       columnNames: Seq[String]): (Long, Map[String, ColumnStat]) = {
 
+    val conf = sparkSession.sessionState.conf
     val relation = sparkSession.table(tableIdent).logicalPlan
     // Resolve the column names and dedup using AttributeSet
-    val resolver = sparkSession.sessionState.conf.resolver
     val attributesToAnalyze = columnNames.map { col =>
-      val exprOption = relation.output.find(attr => resolver(attr.name, col))
+      val exprOption = relation.output.find(attr => conf.resolver(attr.name, 
col))
       exprOption.getOrElse(throw new AnalysisException(s"Column $col does not 
exist."))
     }
 
@@ -86,12 +89,21 @@ case class AnalyzeColumnCommand(
     }
 
     // Collect statistics per column.
+    // If no histogram is required, we run a job to compute basic column stats 
such as
+    // min, max, ndv, etc. Otherwise, besides basic column stats, histogram 
will also be
+    // generated. Currently we only support equi-height histogram.
+    // To generate an equi-height histogram, we need two jobs:
+    // 1. compute percentiles p(0), p(1/n) ... p((n-1)/n), p(1).
+    // 2. use the percentiles as value intervals of bins, e.g. [p(0), p(1/n)],
+    // [p(1/n), p(2/n)], ..., [p((n-1)/n), p(1)], and then count ndv in each 
bin.
+    // Basic column stats will be computed together in the second job.
+    val attributePercentiles = computePercentiles(attributesToAnalyze, 
sparkSession, relation)
+
     // The first element in the result will be the overall row count, the 
following elements
     // will be structs containing all column stats.
     // The layout of each struct follows the layout of the ColumnStats.
-    val ndvMaxErr = sparkSession.sessionState.conf.ndvMaxError
     val expressions = Count(Literal(1)).toAggregateExpression() +:
-      attributesToAnalyze.map(ColumnStat.statExprs(_, ndvMaxErr))
+      attributesToAnalyze.map(ColumnStat.statExprs(_, conf, 
attributePercentiles))
 
     val namedExpressions = expressions.map(e => Alias(e, e.toString)())
     val statsRow = new QueryExecution(sparkSession, Aggregate(Nil, 
namedExpressions, relation))
@@ -99,9 +111,42 @@ case class AnalyzeColumnCommand(
 
     val rowCount = statsRow.getLong(0)
     val columnStats = attributesToAnalyze.zipWithIndex.map { case (attr, i) =>
-      // according to `ColumnStat.statExprs`, the stats struct always have 6 
fields.
-      (attr.name, ColumnStat.rowToColumnStat(statsRow.getStruct(i + 1, 6), 
attr))
+      // according to `ColumnStat.statExprs`, the stats struct always have 7 
fields.
+      (attr.name, ColumnStat.rowToColumnStat(statsRow.getStruct(i + 1, 7), 
attr, rowCount,
+        attributePercentiles.get(attr)))
     }.toMap
     (rowCount, columnStats)
   }
+
+  /** Computes percentiles for each attribute. */
+  private def computePercentiles(
+      attributesToAnalyze: Seq[Attribute],
+      sparkSession: SparkSession,
+      relation: LogicalPlan): AttributeMap[ArrayData] = {
+    val attrsToGenHistogram = if (conf.histogramEnabled) {
+      attributesToAnalyze.filter(a => ColumnStat.supportsHistogram(a.dataType))
+    } else {
+      Nil
+    }
+    val attributePercentiles = mutable.HashMap[Attribute, ArrayData]()
+    if (attrsToGenHistogram.nonEmpty) {
+      val percentiles = (0 to conf.histogramNumBins)
+        .map(i => i.toDouble / conf.histogramNumBins).toArray
+
+      val namedExprs = attrsToGenHistogram.map { attr =>
+        val aggFunc =
+          new ApproximatePercentile(attr, Literal(percentiles), 
Literal(conf.percentileAccuracy))
+        val expr = aggFunc.toAggregateExpression()
+        Alias(expr, expr.toString)()
+      }
+
+      val percentilesRow = new QueryExecution(sparkSession, Aggregate(Nil, 
namedExprs, relation))
+        .executedPlan.executeTake(1).head
+      attrsToGenHistogram.zipWithIndex.foreach { case (attr, i) =>
+        attributePercentiles += attr -> percentilesRow.getArray(i)
+      }
+    }
+    AttributeMap(attributePercentiles.toSeq)
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/11b60af7/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
index 7247c3a..fba5d26 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
@@ -142,10 +142,12 @@ class StatisticsCollectionSuite extends 
StatisticsCollectionTestBase with Shared
   test("column stats round trip serialization") {
     // Make sure we serialize and then deserialize and we will get the result 
data
     val df = data.toDF(stats.keys.toSeq :+ "carray" : _*)
-    stats.zip(df.schema).foreach { case ((k, v), field) =>
-      withClue(s"column $k with type ${field.dataType}") {
-        val roundtrip = ColumnStat.fromMap("table_is_foo", field, v.toMap(k, 
field.dataType))
-        assert(roundtrip == Some(v))
+    Seq(stats, statsWithHgms).foreach { s =>
+      s.zip(df.schema).foreach { case ((k, v), field) =>
+        withClue(s"column $k with type ${field.dataType}") {
+          val roundtrip = ColumnStat.fromMap("table_is_foo", field, v.toMap(k, 
field.dataType))
+          assert(roundtrip == Some(v))
+        }
       }
     }
   }
@@ -155,6 +157,11 @@ class StatisticsCollectionSuite extends 
StatisticsCollectionTestBase with Shared
     assert(stats.size == data.head.productArity - 1)
     val df = data.toDF(stats.keys.toSeq :+ "carray" : _*)
     checkColStats(df, stats)
+
+    // test column stats with histograms
+    withSQLConf(SQLConf.HISTOGRAM_ENABLED.key -> "true", 
SQLConf.HISTOGRAM_NUM_BINS.key -> "2") {
+      checkColStats(df, statsWithHgms)
+    }
   }
 
   test("column stats collection for null columns") {

http://git-wip-us.apache.org/repos/asf/spark/blob/11b60af7/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala
index a2f63ed..f6df077 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala
@@ -25,7 +25,7 @@ import scala.util.Random
 
 import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, 
HiveTableRelation}
-import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, Histogram, 
HistogramBin, LogicalPlan}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.internal.StaticSQLConf
@@ -46,6 +46,10 @@ abstract class StatisticsCollectionTestBase extends 
QueryTest with SQLTestUtils
   private val d2 = Date.valueOf("2016-05-09")
   private val t1 = Timestamp.valueOf("2016-05-08 00:00:01")
   private val t2 = Timestamp.valueOf("2016-05-09 00:00:02")
+  private val d1Internal = DateTimeUtils.fromJavaDate(d1)
+  private val d2Internal = DateTimeUtils.fromJavaDate(d2)
+  private val t1Internal = DateTimeUtils.fromJavaTimestamp(t1)
+  private val t2Internal = DateTimeUtils.fromJavaTimestamp(t2)
 
   /**
    * Define a very simple 3 row table used for testing column serialization.
@@ -73,12 +77,39 @@ abstract class StatisticsCollectionTestBase extends 
QueryTest with SQLTestUtils
     "cdecimal" -> ColumnStat(2, Some(Decimal(dec1)), Some(Decimal(dec2)), 1, 
16, 16),
     "cstring" -> ColumnStat(2, None, None, 1, 3, 3),
     "cbinary" -> ColumnStat(2, None, None, 1, 3, 3),
-    "cdate" -> ColumnStat(2, Some(DateTimeUtils.fromJavaDate(d1)),
-      Some(DateTimeUtils.fromJavaDate(d2)), 1, 4, 4),
-    "ctimestamp" -> ColumnStat(2, Some(DateTimeUtils.fromJavaTimestamp(t1)),
-      Some(DateTimeUtils.fromJavaTimestamp(t2)), 1, 8, 8)
+    "cdate" -> ColumnStat(2, Some(d1Internal), Some(d2Internal), 1, 4, 4),
+    "ctimestamp" -> ColumnStat(2, Some(t1Internal), Some(t2Internal), 1, 8, 8)
   )
 
+  /**
+   * A mapping from column to the stats collected including histograms.
+   * The number of bins in the histograms is 2.
+   */
+  protected val statsWithHgms = {
+    val colStats = mutable.LinkedHashMap(stats.toSeq: _*)
+    colStats.update("cbyte", stats("cbyte").copy(histogram =
+      Some(Histogram(1, Array(HistogramBin(1, 1, 1), HistogramBin(1, 2, 1))))))
+    colStats.update("cshort", stats("cshort").copy(histogram =
+      Some(Histogram(1, Array(HistogramBin(1, 1, 1), HistogramBin(1, 3, 1))))))
+    colStats.update("cint", stats("cint").copy(histogram =
+      Some(Histogram(1, Array(HistogramBin(1, 1, 1), HistogramBin(1, 4, 1))))))
+    colStats.update("clong", stats("clong").copy(histogram =
+      Some(Histogram(1, Array(HistogramBin(1, 1, 1), HistogramBin(1, 5, 1))))))
+    colStats.update("cdouble", stats("cdouble").copy(histogram =
+      Some(Histogram(1, Array(HistogramBin(1, 1, 1), HistogramBin(1, 6, 1))))))
+    colStats.update("cfloat", stats("cfloat").copy(histogram =
+      Some(Histogram(1, Array(HistogramBin(1, 1, 1), HistogramBin(1, 7, 1))))))
+    colStats.update("cdecimal", stats("cdecimal").copy(histogram =
+      Some(Histogram(1, Array(HistogramBin(1, 1, 1), HistogramBin(1, 8, 1))))))
+    colStats.update("cdate", stats("cdate").copy(histogram =
+      Some(Histogram(1, Array(HistogramBin(d1Internal, d1Internal, 1),
+        HistogramBin(d1Internal, d2Internal, 1))))))
+    colStats.update("ctimestamp", stats("ctimestamp").copy(histogram =
+      Some(Histogram(1, Array(HistogramBin(t1Internal, t1Internal, 1),
+        HistogramBin(t1Internal, t2Internal, 1))))))
+    colStats
+  }
+
   private val randomName = new Random(31)
 
   def getCatalogTable(tableName: String): CatalogTable = {

http://git-wip-us.apache.org/repos/asf/spark/blob/11b60af7/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 7cd7725..44e680d 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -1032,8 +1032,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, 
hadoopConf: Configurat
       stats: CatalogStatistics,
       schema: StructType): Map[String, String] = {
 
-    var statsProperties: Map[String, String] =
-      Map(STATISTICS_TOTAL_SIZE -> stats.sizeInBytes.toString())
+    val statsProperties = new mutable.HashMap[String, String]()
+    statsProperties += STATISTICS_TOTAL_SIZE -> stats.sizeInBytes.toString()
     if (stats.rowCount.isDefined) {
       statsProperties += STATISTICS_NUM_ROWS -> stats.rowCount.get.toString()
     }
@@ -1046,7 +1046,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, 
hadoopConf: Configurat
       }
     }
 
-    statsProperties
+    statsProperties.toMap
   }
 
   private def statsFromProperties(
@@ -1072,9 +1072,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, 
hadoopConf: Configurat
           val colStatMap = statsProps.filterKeys(_.startsWith(keyPrefix)).map 
{ case (k, v) =>
             (k.drop(keyPrefix.length), v)
           }
-
-          ColumnStat.fromMap(table, field, colStatMap).foreach {
-            colStat => colStats += field.name -> colStat
+          ColumnStat.fromMap(table, field, colStatMap).foreach { cs =>
+            colStats += field.name -> cs
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/11b60af7/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 9e8fc32..7427948 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.hive
 
 import java.io.{File, PrintWriter}
+import java.sql.Timestamp
 
 import scala.reflect.ClassTag
 import scala.util.matching.Regex
@@ -28,8 +29,8 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.NoSuchPartitionException
 import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, HiveTableRelation}
-import org.apache.spark.sql.catalyst.plans.logical.ColumnStat
-import org.apache.spark.sql.catalyst.util.StringUtils
+import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, HistogramBin, HistogramSerializer}
+import org.apache.spark.sql.catalyst.util.{DateTimeUtils, StringUtils}
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.execution.joins._
@@ -963,98 +964,174 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
     assert(stats.size == data.head.productArity - 1)
     val df = data.toDF(stats.keys.toSeq :+ "carray" : _*)
 
+    val expectedSerializedColStats = Map(
+      "spark.sql.statistics.colStats.cbinary.avgLen" -> "3",
+      "spark.sql.statistics.colStats.cbinary.distinctCount" -> "2",
+      "spark.sql.statistics.colStats.cbinary.maxLen" -> "3",
+      "spark.sql.statistics.colStats.cbinary.nullCount" -> "1",
+      "spark.sql.statistics.colStats.cbinary.version" -> "1",
+      "spark.sql.statistics.colStats.cbool.avgLen" -> "1",
+      "spark.sql.statistics.colStats.cbool.distinctCount" -> "2",
+      "spark.sql.statistics.colStats.cbool.max" -> "true",
+      "spark.sql.statistics.colStats.cbool.maxLen" -> "1",
+      "spark.sql.statistics.colStats.cbool.min" -> "false",
+      "spark.sql.statistics.colStats.cbool.nullCount" -> "1",
+      "spark.sql.statistics.colStats.cbool.version" -> "1",
+      "spark.sql.statistics.colStats.cbyte.avgLen" -> "1",
+      "spark.sql.statistics.colStats.cbyte.distinctCount" -> "2",
+      "spark.sql.statistics.colStats.cbyte.max" -> "2",
+      "spark.sql.statistics.colStats.cbyte.maxLen" -> "1",
+      "spark.sql.statistics.colStats.cbyte.min" -> "1",
+      "spark.sql.statistics.colStats.cbyte.nullCount" -> "1",
+      "spark.sql.statistics.colStats.cbyte.version" -> "1",
+      "spark.sql.statistics.colStats.cdate.avgLen" -> "4",
+      "spark.sql.statistics.colStats.cdate.distinctCount" -> "2",
+      "spark.sql.statistics.colStats.cdate.max" -> "2016-05-09",
+      "spark.sql.statistics.colStats.cdate.maxLen" -> "4",
+      "spark.sql.statistics.colStats.cdate.min" -> "2016-05-08",
+      "spark.sql.statistics.colStats.cdate.nullCount" -> "1",
+      "spark.sql.statistics.colStats.cdate.version" -> "1",
+      "spark.sql.statistics.colStats.cdecimal.avgLen" -> "16",
+      "spark.sql.statistics.colStats.cdecimal.distinctCount" -> "2",
+      "spark.sql.statistics.colStats.cdecimal.max" -> "8.000000000000000000",
+      "spark.sql.statistics.colStats.cdecimal.maxLen" -> "16",
+      "spark.sql.statistics.colStats.cdecimal.min" -> "1.000000000000000000",
+      "spark.sql.statistics.colStats.cdecimal.nullCount" -> "1",
+      "spark.sql.statistics.colStats.cdecimal.version" -> "1",
+      "spark.sql.statistics.colStats.cdouble.avgLen" -> "8",
+      "spark.sql.statistics.colStats.cdouble.distinctCount" -> "2",
+      "spark.sql.statistics.colStats.cdouble.max" -> "6.0",
+      "spark.sql.statistics.colStats.cdouble.maxLen" -> "8",
+      "spark.sql.statistics.colStats.cdouble.min" -> "1.0",
+      "spark.sql.statistics.colStats.cdouble.nullCount" -> "1",
+      "spark.sql.statistics.colStats.cdouble.version" -> "1",
+      "spark.sql.statistics.colStats.cfloat.avgLen" -> "4",
+      "spark.sql.statistics.colStats.cfloat.distinctCount" -> "2",
+      "spark.sql.statistics.colStats.cfloat.max" -> "7.0",
+      "spark.sql.statistics.colStats.cfloat.maxLen" -> "4",
+      "spark.sql.statistics.colStats.cfloat.min" -> "1.0",
+      "spark.sql.statistics.colStats.cfloat.nullCount" -> "1",
+      "spark.sql.statistics.colStats.cfloat.version" -> "1",
+      "spark.sql.statistics.colStats.cint.avgLen" -> "4",
+      "spark.sql.statistics.colStats.cint.distinctCount" -> "2",
+      "spark.sql.statistics.colStats.cint.max" -> "4",
+      "spark.sql.statistics.colStats.cint.maxLen" -> "4",
+      "spark.sql.statistics.colStats.cint.min" -> "1",
+      "spark.sql.statistics.colStats.cint.nullCount" -> "1",
+      "spark.sql.statistics.colStats.cint.version" -> "1",
+      "spark.sql.statistics.colStats.clong.avgLen" -> "8",
+      "spark.sql.statistics.colStats.clong.distinctCount" -> "2",
+      "spark.sql.statistics.colStats.clong.max" -> "5",
+      "spark.sql.statistics.colStats.clong.maxLen" -> "8",
+      "spark.sql.statistics.colStats.clong.min" -> "1",
+      "spark.sql.statistics.colStats.clong.nullCount" -> "1",
+      "spark.sql.statistics.colStats.clong.version" -> "1",
+      "spark.sql.statistics.colStats.cshort.avgLen" -> "2",
+      "spark.sql.statistics.colStats.cshort.distinctCount" -> "2",
+      "spark.sql.statistics.colStats.cshort.max" -> "3",
+      "spark.sql.statistics.colStats.cshort.maxLen" -> "2",
+      "spark.sql.statistics.colStats.cshort.min" -> "1",
+      "spark.sql.statistics.colStats.cshort.nullCount" -> "1",
+      "spark.sql.statistics.colStats.cshort.version" -> "1",
+      "spark.sql.statistics.colStats.cstring.avgLen" -> "3",
+      "spark.sql.statistics.colStats.cstring.distinctCount" -> "2",
+      "spark.sql.statistics.colStats.cstring.maxLen" -> "3",
+      "spark.sql.statistics.colStats.cstring.nullCount" -> "1",
+      "spark.sql.statistics.colStats.cstring.version" -> "1",
+      "spark.sql.statistics.colStats.ctimestamp.avgLen" -> "8",
+      "spark.sql.statistics.colStats.ctimestamp.distinctCount" -> "2",
+      "spark.sql.statistics.colStats.ctimestamp.max" -> "2016-05-09 00:00:02.0",
+      "spark.sql.statistics.colStats.ctimestamp.maxLen" -> "8",
+      "spark.sql.statistics.colStats.ctimestamp.min" -> "2016-05-08 00:00:01.0",
+      "spark.sql.statistics.colStats.ctimestamp.nullCount" -> "1",
+      "spark.sql.statistics.colStats.ctimestamp.version" -> "1"
+    )
+
+    val expectedSerializedHistograms = Map(
+      "spark.sql.statistics.colStats.cbyte.histogram" ->
+        HistogramSerializer.serialize(statsWithHgms("cbyte").histogram.get),
+      "spark.sql.statistics.colStats.cshort.histogram" ->
+        HistogramSerializer.serialize(statsWithHgms("cshort").histogram.get),
+      "spark.sql.statistics.colStats.cint.histogram" ->
+        HistogramSerializer.serialize(statsWithHgms("cint").histogram.get),
+      "spark.sql.statistics.colStats.clong.histogram" ->
+        HistogramSerializer.serialize(statsWithHgms("clong").histogram.get),
+      "spark.sql.statistics.colStats.cdouble.histogram" ->
+        HistogramSerializer.serialize(statsWithHgms("cdouble").histogram.get),
+      "spark.sql.statistics.colStats.cfloat.histogram" ->
+        HistogramSerializer.serialize(statsWithHgms("cfloat").histogram.get),
+      "spark.sql.statistics.colStats.cdecimal.histogram" ->
+        HistogramSerializer.serialize(statsWithHgms("cdecimal").histogram.get),
+      "spark.sql.statistics.colStats.cdate.histogram" ->
+        HistogramSerializer.serialize(statsWithHgms("cdate").histogram.get),
+      "spark.sql.statistics.colStats.ctimestamp.histogram" ->
+        HistogramSerializer.serialize(statsWithHgms("ctimestamp").histogram.get)
+    )
+
+    def checkColStatsProps(expected: Map[String, String]): Unit = {
+      sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS FOR COLUMNS " + stats.keys.mkString(", "))
+      val table = hiveClient.getTable("default", tableName)
+      val props = table.properties.filterKeys(_.startsWith("spark.sql.statistics.colStats"))
+      assert(props == expected)
+    }
+
     withTable(tableName) {
       df.write.saveAsTable(tableName)
 
-      // Collect statistics
-      sql(s"analyze table $tableName compute STATISTICS FOR COLUMNS " + stats.keys.mkString(", "))
+      // Collect and validate statistics
+      checkColStatsProps(expectedSerializedColStats)
 
-      // Validate statistics
-      val table = hiveClient.getTable("default", tableName)
+      withSQLConf(
+        SQLConf.HISTOGRAM_ENABLED.key -> "true", SQLConf.HISTOGRAM_NUM_BINS.key -> "2") {
 
-      val props = table.properties.filterKeys(_.startsWith("spark.sql.statistics.colStats"))
-      assert(props == Map(
-        "spark.sql.statistics.colStats.cbinary.avgLen" -> "3",
-        "spark.sql.statistics.colStats.cbinary.distinctCount" -> "2",
-        "spark.sql.statistics.colStats.cbinary.maxLen" -> "3",
-        "spark.sql.statistics.colStats.cbinary.nullCount" -> "1",
-        "spark.sql.statistics.colStats.cbinary.version" -> "1",
-        "spark.sql.statistics.colStats.cbool.avgLen" -> "1",
-        "spark.sql.statistics.colStats.cbool.distinctCount" -> "2",
-        "spark.sql.statistics.colStats.cbool.max" -> "true",
-        "spark.sql.statistics.colStats.cbool.maxLen" -> "1",
-        "spark.sql.statistics.colStats.cbool.min" -> "false",
-        "spark.sql.statistics.colStats.cbool.nullCount" -> "1",
-        "spark.sql.statistics.colStats.cbool.version" -> "1",
-        "spark.sql.statistics.colStats.cbyte.avgLen" -> "1",
-        "spark.sql.statistics.colStats.cbyte.distinctCount" -> "2",
-        "spark.sql.statistics.colStats.cbyte.max" -> "2",
-        "spark.sql.statistics.colStats.cbyte.maxLen" -> "1",
-        "spark.sql.statistics.colStats.cbyte.min" -> "1",
-        "spark.sql.statistics.colStats.cbyte.nullCount" -> "1",
-        "spark.sql.statistics.colStats.cbyte.version" -> "1",
-        "spark.sql.statistics.colStats.cdate.avgLen" -> "4",
-        "spark.sql.statistics.colStats.cdate.distinctCount" -> "2",
-        "spark.sql.statistics.colStats.cdate.max" -> "2016-05-09",
-        "spark.sql.statistics.colStats.cdate.maxLen" -> "4",
-        "spark.sql.statistics.colStats.cdate.min" -> "2016-05-08",
-        "spark.sql.statistics.colStats.cdate.nullCount" -> "1",
-        "spark.sql.statistics.colStats.cdate.version" -> "1",
-        "spark.sql.statistics.colStats.cdecimal.avgLen" -> "16",
-        "spark.sql.statistics.colStats.cdecimal.distinctCount" -> "2",
-        "spark.sql.statistics.colStats.cdecimal.max" -> "8.000000000000000000",
-        "spark.sql.statistics.colStats.cdecimal.maxLen" -> "16",
-        "spark.sql.statistics.colStats.cdecimal.min" -> "1.000000000000000000",
-        "spark.sql.statistics.colStats.cdecimal.nullCount" -> "1",
-        "spark.sql.statistics.colStats.cdecimal.version" -> "1",
-        "spark.sql.statistics.colStats.cdouble.avgLen" -> "8",
-        "spark.sql.statistics.colStats.cdouble.distinctCount" -> "2",
-        "spark.sql.statistics.colStats.cdouble.max" -> "6.0",
-        "spark.sql.statistics.colStats.cdouble.maxLen" -> "8",
-        "spark.sql.statistics.colStats.cdouble.min" -> "1.0",
-        "spark.sql.statistics.colStats.cdouble.nullCount" -> "1",
-        "spark.sql.statistics.colStats.cdouble.version" -> "1",
-        "spark.sql.statistics.colStats.cfloat.avgLen" -> "4",
-        "spark.sql.statistics.colStats.cfloat.distinctCount" -> "2",
-        "spark.sql.statistics.colStats.cfloat.max" -> "7.0",
-        "spark.sql.statistics.colStats.cfloat.maxLen" -> "4",
-        "spark.sql.statistics.colStats.cfloat.min" -> "1.0",
-        "spark.sql.statistics.colStats.cfloat.nullCount" -> "1",
-        "spark.sql.statistics.colStats.cfloat.version" -> "1",
-        "spark.sql.statistics.colStats.cint.avgLen" -> "4",
-        "spark.sql.statistics.colStats.cint.distinctCount" -> "2",
-        "spark.sql.statistics.colStats.cint.max" -> "4",
-        "spark.sql.statistics.colStats.cint.maxLen" -> "4",
-        "spark.sql.statistics.colStats.cint.min" -> "1",
-        "spark.sql.statistics.colStats.cint.nullCount" -> "1",
-        "spark.sql.statistics.colStats.cint.version" -> "1",
-        "spark.sql.statistics.colStats.clong.avgLen" -> "8",
-        "spark.sql.statistics.colStats.clong.distinctCount" -> "2",
-        "spark.sql.statistics.colStats.clong.max" -> "5",
-        "spark.sql.statistics.colStats.clong.maxLen" -> "8",
-        "spark.sql.statistics.colStats.clong.min" -> "1",
-        "spark.sql.statistics.colStats.clong.nullCount" -> "1",
-        "spark.sql.statistics.colStats.clong.version" -> "1",
-        "spark.sql.statistics.colStats.cshort.avgLen" -> "2",
-        "spark.sql.statistics.colStats.cshort.distinctCount" -> "2",
-        "spark.sql.statistics.colStats.cshort.max" -> "3",
-        "spark.sql.statistics.colStats.cshort.maxLen" -> "2",
-        "spark.sql.statistics.colStats.cshort.min" -> "1",
-        "spark.sql.statistics.colStats.cshort.nullCount" -> "1",
-        "spark.sql.statistics.colStats.cshort.version" -> "1",
-        "spark.sql.statistics.colStats.cstring.avgLen" -> "3",
-        "spark.sql.statistics.colStats.cstring.distinctCount" -> "2",
-        "spark.sql.statistics.colStats.cstring.maxLen" -> "3",
-        "spark.sql.statistics.colStats.cstring.nullCount" -> "1",
-        "spark.sql.statistics.colStats.cstring.version" -> "1",
-        "spark.sql.statistics.colStats.ctimestamp.avgLen" -> "8",
-        "spark.sql.statistics.colStats.ctimestamp.distinctCount" -> "2",
-        "spark.sql.statistics.colStats.ctimestamp.max" -> "2016-05-09 00:00:02.0",
-        "spark.sql.statistics.colStats.ctimestamp.maxLen" -> "8",
-        "spark.sql.statistics.colStats.ctimestamp.min" -> "2016-05-08 00:00:01.0",
-        "spark.sql.statistics.colStats.ctimestamp.nullCount" -> "1",
-        "spark.sql.statistics.colStats.ctimestamp.version" -> "1"
-      ))
+        checkColStatsProps(expectedSerializedColStats ++ expectedSerializedHistograms)
+      }
+    }
+  }
+
+  test("serialization and deserialization of histograms to/from hive metastore") {
+    import testImplicits._
+
+    def checkBinsOrder(bins: Array[HistogramBin]): Unit = {
+      for (i <- bins.indices) {
+        val b = bins(i)
+        assert(b.lo <= b.hi)
+        if (i > 0) {
+          val pre = bins(i - 1)
+          assert(pre.hi <= b.lo)
+        }
+      }
+    }
+
+    val startTimestamp = DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf("2016-05-08 00:00:01"))
+    val df = (1 to 5000)
+      .map(i => (i, DateTimeUtils.toJavaTimestamp(startTimestamp + i)))
+      .toDF("cint", "ctimestamp")
+    val tableName = "histogram_serde_test"
+
+    withTable(tableName) {
+      df.write.saveAsTable(tableName)
+
+      withSQLConf(SQLConf.HISTOGRAM_ENABLED.key -> "true") {
+        sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS FOR COLUMNS cint, ctimestamp")
+        val table = hiveClient.getTable("default", tableName)
+        val intHistogramProps = table.properties
+          .filterKeys(_.startsWith("spark.sql.statistics.colStats.cint.histogram"))
+        assert(intHistogramProps.size == 1)
+
+        val tsHistogramProps = table.properties
+          .filterKeys(_.startsWith("spark.sql.statistics.colStats.ctimestamp.histogram"))
+        assert(tsHistogramProps.size == 1)
+
+        // Validate histogram after deserialization.
+        val cs = getCatalogStatistics(tableName).colStats
+        val intHistogram = cs("cint").histogram.get
+        val tsHistogram = cs("ctimestamp").histogram.get
+        assert(intHistogram.bins.length == spark.sessionState.conf.histogramNumBins)
+        checkBinsOrder(intHistogram.bins)
+        assert(tsHistogram.bins.length == spark.sessionState.conf.histogramNumBins)
+        checkBinsOrder(tsHistogram.bins)
+      }
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to