Repository: spark
Updated Branches:
  refs/heads/master a4002651a -> ecc179eca


[SPARK-21322][SQL] support histogram in filter cardinality estimation

## What changes were proposed in this pull request?

Histogram is effective in dealing with skewed distribution. After we generate 
histogram information for column statistics, we need to adjust filter 
estimation based on histogram data structure.

## How was this patch tested?

We revised all the unit test cases by including histogram data structure.

Please review http://spark.apache.org/contributing.html before opening a pull 
request.

Author: Ron Hu <ron...@huawei.com>

Closes #19783 from ron8hu/supportHistogram.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ecc179ec
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ecc179ec
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ecc179ec

Branch: refs/heads/master
Commit: ecc179ecaa9b4381d2f6d3356d88a4dff3e19c0f
Parents: a400265
Author: Ron Hu <ron...@huawei.com>
Authored: Tue Dec 12 15:04:49 2017 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Tue Dec 12 15:04:49 2017 +0800

----------------------------------------------------------------------
 .../statsEstimation/EstimationUtils.scala       |  97 ++++++++-
 .../statsEstimation/FilterEstimation.scala      | 181 ++++++++++++----
 .../statsEstimation/FilterEstimationSuite.scala | 217 ++++++++++++++++++-
 3 files changed, 448 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ecc179ec/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala
index 9c34a9b..2f416f2 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala
@@ -20,7 +20,7 @@ package 
org.apache.spark.sql.catalyst.plans.logical.statsEstimation
 import scala.math.BigDecimal.RoundingMode
 
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
-import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan, 
Statistics}
+import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.types.{DecimalType, _}
 
 
@@ -114,4 +114,99 @@ object EstimationUtils {
     }
   }
 
+  /**
+   * Returns the number of the first bin into which a column value falls for a 
specified
+   * numeric equi-height histogram.
+   *
+   * @param value a literal value of a column
+   * @param bins an array of bins for a given numeric equi-height histogram
+   * @return the id of the first bin into which a column value falls.
+   */
+  def findFirstBinForValue(value: Double, bins: Array[HistogramBin]): Int = {
+    var i = 0
+    while ((i < bins.length) && (value > bins(i).hi)) {
+      i += 1
+    }
+    i
+  }
+
+  /**
+   * Returns the number of the last bin into which a column value falls for a 
specified
+   * numeric equi-height histogram.
+   *
+   * @param value a literal value of a column
+   * @param bins an array of bins for a given numeric equi-height histogram
+   * @return the id of the last bin into which a column value falls.
+   */
+  def findLastBinForValue(value: Double, bins: Array[HistogramBin]): Int = {
+    var i = bins.length - 1
+    while ((i >= 0) && (value < bins(i).lo)) {
+      i -= 1
+    }
+    i
+  }
+
+  /**
+   * Returns a percentage of a bin holding values for column value in the 
range of
+   * [lowerValue, higherValue]
+   *
+   * @param higherValue a given upper bound value of a specified column value 
range
+   * @param lowerValue a given lower bound value of a specified column value 
range
+   * @param bin a single histogram bin
+   * @return the percentage of a single bin holding values in [lowerValue, 
higherValue].
+   */
+  private def getOccupation(
+      higherValue: Double,
+      lowerValue: Double,
+      bin: HistogramBin): Double = {
+    assert(bin.lo <= lowerValue && lowerValue <= higherValue && higherValue <= 
bin.hi)
+    if (bin.hi == bin.lo) {
+      // the entire bin is covered in the range
+      1.0
+    } else if (higherValue == lowerValue) {
+      // set percentage to 1/NDV
+      1.0 / bin.ndv.toDouble
+    } else {
+      // Use proration since the range falls inside this bin.
+      math.min((higherValue - lowerValue) / (bin.hi - bin.lo), 1.0)
+    }
+  }
+
+  /**
+   * Returns the number of bins for column values in [lowerValue, higherValue].
+   * The column value distribution is saved in an equi-height histogram.  The 
return values is a
+   * double value is because we may return a portion of a bin. For example, a 
predicate
+   * "column = 8" may return the number of bins 0.2 if the holding bin has 5 
distinct values.
+   *
+   * @param higherId id of the high end bin holding the high end value of a 
column range
+   * @param lowerId id of the low end bin holding the low end value of a 
column range
+   * @param higherEnd a given upper bound value of a specified column value 
range
+   * @param lowerEnd a given lower bound value of a specified column value 
range
+   * @param histogram a numeric equi-height histogram
+   * @return the number of bins for column values in [lowerEnd, higherEnd].
+   */
+  def getOccupationBins(
+      higherId: Int,
+      lowerId: Int,
+      higherEnd: Double,
+      lowerEnd: Double,
+      histogram: Histogram): Double = {
+    assert(lowerId <= higherId)
+
+    if (lowerId == higherId) {
+      val curBin = histogram.bins(lowerId)
+      getOccupation(higherEnd, lowerEnd, curBin)
+    } else {
+      // compute how much lowerEnd/higherEnd occupies its bin
+      val lowerCurBin = histogram.bins(lowerId)
+      val lowerPart = getOccupation(lowerCurBin.hi, lowerEnd, lowerCurBin)
+
+      val higherCurBin = histogram.bins(higherId)
+      val higherPart = getOccupation(higherEnd, higherCurBin.lo, higherCurBin)
+
+      // the total length is lowerPart + higherPart + bins between them
+      lowerPart + higherPart + higherId - lowerId - 1
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/ecc179ec/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
index 74820eb..f52a15e 100755
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
@@ -23,7 +23,7 @@ import scala.collection.mutable
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, 
TrueLiteral}
-import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, Filter, 
LeafNode, Statistics}
+import org.apache.spark.sql.catalyst.plans.logical._
 import 
org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils._
 import org.apache.spark.sql.types._
 
@@ -265,7 +265,7 @@ case class FilterEstimation(plan: Filter) extends Logging {
    * @param update a boolean flag to specify if we need to update ColumnStat 
of a given column
    *               for subsequent conditions
    * @return an optional double value to show the percentage of rows meeting a 
given condition
-    *         It returns None if no statistics exists for a given column or 
wrong value.
+   *         It returns None if no statistics exists for a given column or 
wrong value.
    */
   def evaluateBinary(
       op: BinaryComparison,
@@ -332,8 +332,44 @@ case class FilterEstimation(plan: Filter) extends Logging {
         colStatsMap.update(attr, newStats)
       }
 
-      Some(1.0 / BigDecimal(ndv))
-    } else {
+      if (colStat.histogram.isEmpty) {
+        // returns 1/ndv if there is no histogram
+        Some(1.0 / BigDecimal(ndv))
+      } else {
+        // We compute filter selectivity using Histogram information.
+        val datum = EstimationUtils.toDecimal(literal.value, 
literal.dataType).toDouble
+        val histogram = colStat.histogram.get
+        val hgmBins = histogram.bins
+
+        // find bins where column's current min and max locate.  Note that a 
column's [min, max]
+        // range may change due to another condition applied earlier.
+        val min = EstimationUtils.toDecimal(colStat.min.get, 
literal.dataType).toDouble
+        val max = EstimationUtils.toDecimal(colStat.max.get, 
literal.dataType).toDouble
+        val minBinId = EstimationUtils.findFirstBinForValue(min, hgmBins)
+        val maxBinId = EstimationUtils.findLastBinForValue(max, hgmBins)
+
+        // compute how many bins the column's current valid range [min, max] 
occupies.
+        // Note that a column's [min, max] range may vary after we apply some 
filter conditions.
+        val validRangeBins = EstimationUtils.getOccupationBins(maxBinId, 
minBinId, max,
+          min, histogram)
+
+        val lowerBinId = EstimationUtils.findFirstBinForValue(datum, hgmBins)
+        val higherBinId = EstimationUtils.findLastBinForValue(datum, hgmBins)
+        assert(lowerBinId <= higherBinId)
+        val lowerBinNdv = hgmBins(lowerBinId).ndv
+        val higherBinNdv = hgmBins(higherBinId).ndv
+        // assume uniform distribution in each bin
+        val occupiedBins = if (lowerBinId == higherBinId) {
+          1.0 / lowerBinNdv
+        } else {
+          (1.0 / lowerBinNdv) +   // lowest bin
+            (higherBinId - lowerBinId - 1) + // middle bins
+            (1.0 / higherBinNdv)  // highest bin
+        }
+        Some(occupiedBins / validRangeBins)
+      }
+
+    } else {  // not in interval
       Some(0.0)
     }
 
@@ -471,37 +507,46 @@ case class FilterEstimation(plan: Filter) extends Logging 
{
       percent = 1.0
     } else {
       // This is the partial overlap case:
-      // Without advanced statistics like histogram, we assume uniform data 
distribution.
-      // We just prorate the adjusted range over the initial range to compute 
filter selectivity.
-      assert(max > min)
-      percent = op match {
-        case _: LessThan =>
-          if (numericLiteral == max) {
-            // If the literal value is right on the boundary, we can minus the 
part of the
-            // boundary value (1/ndv).
-            1.0 - 1.0 / ndv
-          } else {
-            (numericLiteral - min) / (max - min)
-          }
-        case _: LessThanOrEqual =>
-          if (numericLiteral == min) {
-            // The boundary value is the only satisfying value.
-            1.0 / ndv
-          } else {
-            (numericLiteral - min) / (max - min)
-          }
-        case _: GreaterThan =>
-          if (numericLiteral == min) {
-            1.0 - 1.0 / ndv
-          } else {
-            (max - numericLiteral) / (max - min)
-          }
-        case _: GreaterThanOrEqual =>
-          if (numericLiteral == max) {
-            1.0 / ndv
-          } else {
-            (max - numericLiteral) / (max - min)
-          }
+
+      if (colStat.histogram.isEmpty) {
+        // Without advanced statistics like histogram, we assume uniform data 
distribution.
+        // We just prorate the adjusted range over the initial range to 
compute filter selectivity.
+        assert(max > min)
+        percent = op match {
+          case _: LessThan =>
+            if (numericLiteral == max) {
+              // If the literal value is right on the boundary, we can minus 
the part of the
+              // boundary value (1/ndv).
+              1.0 - 1.0 / ndv
+            } else {
+              (numericLiteral - min) / (max - min)
+            }
+          case _: LessThanOrEqual =>
+            if (numericLiteral == min) {
+              // The boundary value is the only satisfying value.
+              1.0 / ndv
+            } else {
+              (numericLiteral - min) / (max - min)
+            }
+          case _: GreaterThan =>
+            if (numericLiteral == min) {
+              1.0 - 1.0 / ndv
+            } else {
+              (max - numericLiteral) / (max - min)
+            }
+          case _: GreaterThanOrEqual =>
+            if (numericLiteral == max) {
+              1.0 / ndv
+            } else {
+              (max - numericLiteral) / (max - min)
+            }
+        }
+      } else {
+        val numericHistogram = colStat.histogram.get
+        val datum = EstimationUtils.toDecimal(literal.value, 
literal.dataType).toDouble
+        val max = EstimationUtils.toDecimal(colStat.max.get, 
literal.dataType).toDouble
+        val min = EstimationUtils.toDecimal(colStat.min.get, 
literal.dataType).toDouble
+        percent = computePercentByEquiHeightHgm(op, numericHistogram, max, 
min, datum)
       }
 
       if (update) {
@@ -513,10 +558,9 @@ case class FilterEstimation(plan: Filter) extends Logging {
 
         op match {
           case _: GreaterThan | _: GreaterThanOrEqual =>
-            // If new ndv is 1, then new max must be equal to new min.
-            newMin = if (newNdv == 1) newMax else newValue
+            newMin = newValue
           case _: LessThan | _: LessThanOrEqual =>
-            newMax = if (newNdv == 1) newMin else newValue
+            newMax = newValue
         }
 
         val newStats =
@@ -530,6 +574,54 @@ case class FilterEstimation(plan: Filter) extends Logging {
   }
 
   /**
+   * Returns the selectivity percentage for binary condition in the column's
+   * current valid range [min, max]
+   *
+   * @param op a binary comparison operator
+   * @param histogram a numeric equi-height histogram
+   * @param max the upper bound of the current valid range for a given column
+   * @param min the lower bound of the current valid range for a given column
+   * @param datumNumber the numeric value of a literal
+   * @return the selectivity percentage for a condition in the current range.
+   */
+
+  def computePercentByEquiHeightHgm(
+      op: BinaryComparison,
+      histogram: Histogram,
+      max: Double,
+      min: Double,
+      datumNumber: Double): Double = {
+    // find bins where column's current min and max locate.  Note that a 
column's [min, max]
+    // range may change due to another condition applied earlier.
+    val minBinId = EstimationUtils.findFirstBinForValue(min, histogram.bins)
+    val maxBinId = EstimationUtils.findLastBinForValue(max, histogram.bins)
+
+    // compute how many bins the column's current valid range [min, max] 
occupies.
+    // Note that a column's [min, max] range may vary after we apply some 
filter conditions.
+    val minToMaxLength = EstimationUtils.getOccupationBins(maxBinId, minBinId, 
max, min, histogram)
+
+    val datumInBinId = op match {
+      case LessThan(_, _) | GreaterThanOrEqual(_, _) =>
+        EstimationUtils.findFirstBinForValue(datumNumber, histogram.bins)
+      case LessThanOrEqual(_, _) | GreaterThan(_, _) =>
+        EstimationUtils.findLastBinForValue(datumNumber, histogram.bins)
+    }
+
+    op match {
+      // LessThan and LessThanOrEqual share the same logic,
+      // but their datumInBinId may be different
+      case LessThan(_, _) | LessThanOrEqual(_, _) =>
+        EstimationUtils.getOccupationBins(datumInBinId, minBinId, datumNumber, 
min,
+          histogram) / minToMaxLength
+      // GreaterThan and GreaterThanOrEqual share the same logic,
+      // but their datumInBinId may be different
+      case GreaterThan(_, _) | GreaterThanOrEqual(_, _) =>
+        EstimationUtils.getOccupationBins(maxBinId, datumInBinId, max, 
datumNumber,
+          histogram) / minToMaxLength
+    }
+  }
+
+  /**
    * Returns a percentage of rows meeting a binary comparison expression 
containing two columns.
    * In SQL queries, we also see predicate expressions involving two columns
    * such as "column-1 (op) column-2" where column-1 and column-2 belong to 
same table.
@@ -784,11 +876,16 @@ case class ColumnStatsMap(originalMap: 
AttributeMap[ColumnStat]) {
   def outputColumnStats(rowsBeforeFilter: BigInt, rowsAfterFilter: BigInt)
     : AttributeMap[ColumnStat] = {
     val newColumnStats = originalMap.map { case (attr, oriColStat) =>
-      // Update ndv based on the overall filter selectivity: scale down ndv if 
the number of rows
-      // decreases; otherwise keep it unchanged.
-      val newNdv = EstimationUtils.updateNdv(oldNumRows = rowsBeforeFilter,
-        newNumRows = rowsAfterFilter, oldNdv = oriColStat.distinctCount)
       val colStat = updatedMap.get(attr.exprId).map(_._2).getOrElse(oriColStat)
+      val newNdv = if (colStat.distinctCount > 1) {
+        // Update ndv based on the overall filter selectivity: scale down ndv 
if the number of rows
+        // decreases; otherwise keep it unchanged.
+        EstimationUtils.updateNdv(oldNumRows = rowsBeforeFilter,
+          newNumRows = rowsAfterFilter, oldNdv = oriColStat.distinctCount)
+      } else {
+        // no need to scale down since it is already down to 1 (for skewed 
distribution case)
+        colStat.distinctCount
+      }
       attr -> colStat.copy(distinctCount = newNdv)
     }
     AttributeMap(newColumnStats.toSeq)

http://git-wip-us.apache.org/repos/asf/spark/blob/ecc179ec/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala
index 455037e..2b1fe98 100755
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala
@@ -22,7 +22,7 @@ import java.sql.Date
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, 
TrueLiteral}
 import org.apache.spark.sql.catalyst.plans.LeftOuter
-import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, Filter, Join, 
Statistics}
+import org.apache.spark.sql.catalyst.plans.logical._
 import 
org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils._
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.types._
@@ -33,7 +33,7 @@ import org.apache.spark.sql.types._
  */
 class FilterEstimationSuite extends StatsEstimationTestBase {
 
-  // Suppose our test table has 10 rows and 6 columns.
+  // Suppose our test table has 10 rows and 10 columns.
   // column cint has values: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10
   // Hence, distinctCount:10, min:1, max:10, nullCount:0, avgLen:4, maxLen:4
   val attrInt = AttributeReference("cint", IntegerType)()
@@ -91,6 +91,26 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
   val colStatInt4 = ColumnStat(distinctCount = 10, min = Some(1), max = 
Some(10),
     nullCount = 0, avgLen = 4, maxLen = 4)
 
+  // column cintHgm has values: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 with histogram.
+  // Note that cintHgm has an even distribution with histogram information 
built.
+  // Hence, distinctCount:10, min:1, max:10, nullCount:0, avgLen:4, maxLen:4
+  val attrIntHgm = AttributeReference("cintHgm", IntegerType)()
+  val hgmInt = Histogram(2.0, Array(HistogramBin(1.0, 2.0, 2),
+    HistogramBin(2.0, 4.0, 2), HistogramBin(4.0, 6.0, 2),
+    HistogramBin(6.0, 8.0, 2), HistogramBin(8.0, 10.0, 2)))
+  val colStatIntHgm = ColumnStat(distinctCount = 10, min = Some(1), max = 
Some(10),
+    nullCount = 0, avgLen = 4, maxLen = 4, histogram = Some(hgmInt))
+
+  // column cintSkewHgm has values: 1, 4, 4, 5, 5, 5, 5, 6, 6, 10 with 
histogram.
+  // Note that cintSkewHgm has a skewed distribution with histogram 
information built.
+  // distinctCount:5, min:1, max:10, nullCount:0, avgLen:4, maxLen:4
+  val attrIntSkewHgm = AttributeReference("cintSkewHgm", IntegerType)()
+  val hgmIntSkew = Histogram(2.0, Array(HistogramBin(1.0, 4.0, 2),
+    HistogramBin(4.0, 5.0, 2), HistogramBin(5.0, 5.0, 1),
+    HistogramBin(5.0, 6.0, 2), HistogramBin(6.0, 10.0, 2)))
+  val colStatIntSkewHgm = ColumnStat(distinctCount = 5, min = Some(1), max = 
Some(10),
+    nullCount = 0, avgLen = 4, maxLen = 4, histogram = Some(hgmIntSkew))
+
   val attributeMap = AttributeMap(Seq(
     attrInt -> colStatInt,
     attrBool -> colStatBool,
@@ -100,7 +120,9 @@ class FilterEstimationSuite extends StatsEstimationTestBase 
{
     attrString -> colStatString,
     attrInt2 -> colStatInt2,
     attrInt3 -> colStatInt3,
-    attrInt4 -> colStatInt4
+    attrInt4 -> colStatInt4,
+    attrIntHgm -> colStatIntHgm,
+    attrIntSkewHgm -> colStatIntSkewHgm
   ))
 
   test("true") {
@@ -359,7 +381,7 @@ class FilterEstimationSuite extends StatsEstimationTestBase 
{
   test("cbool > false") {
     validateEstimatedStats(
       Filter(GreaterThan(attrBool, Literal(false)), 
childStatsTestPlan(Seq(attrBool), 10L)),
-      Seq(attrBool -> ColumnStat(distinctCount = 1, min = Some(true), max = 
Some(true),
+      Seq(attrBool -> ColumnStat(distinctCount = 1, min = Some(false), max = 
Some(true),
         nullCount = 0, avgLen = 1, maxLen = 1)),
       expectedRowCount = 5)
   }
@@ -578,6 +600,193 @@ class FilterEstimationSuite extends 
StatsEstimationTestBase {
       expectedRowCount = 5)
   }
 
+  // The following test cases have histogram information collected for the 
test column with
+  // an even distribution
+  test("Not(cintHgm < 3 AND null)") {
+    val condition = Not(And(LessThan(attrIntHgm, Literal(3)), Literal(null, 
IntegerType)))
+    validateEstimatedStats(
+      Filter(condition, childStatsTestPlan(Seq(attrIntHgm), 10L)),
+      Seq(attrIntHgm -> colStatIntHgm.copy(distinctCount = 7)),
+      expectedRowCount = 7)
+  }
+
+  test("cintHgm = 5") {
+    validateEstimatedStats(
+      Filter(EqualTo(attrIntHgm, Literal(5)), 
childStatsTestPlan(Seq(attrIntHgm), 10L)),
+      Seq(attrIntHgm -> ColumnStat(distinctCount = 1, min = Some(5), max = 
Some(5),
+        nullCount = 0, avgLen = 4, maxLen = 4, histogram = Some(hgmInt))),
+      expectedRowCount = 1)
+  }
+
+  test("cintHgm = 0") {
+    // This is an out-of-range case since 0 is outside the range [min, max]
+    validateEstimatedStats(
+      Filter(EqualTo(attrIntHgm, Literal(0)), 
childStatsTestPlan(Seq(attrIntHgm), 10L)),
+      Nil,
+      expectedRowCount = 0)
+  }
+
+  test("cintHgm < 3") {
+    validateEstimatedStats(
+      Filter(LessThan(attrIntHgm, Literal(3)), 
childStatsTestPlan(Seq(attrIntHgm), 10L)),
+      Seq(attrIntHgm -> ColumnStat(distinctCount = 3, min = Some(1), max = 
Some(3),
+        nullCount = 0, avgLen = 4, maxLen = 4, histogram = Some(hgmInt))),
+      expectedRowCount = 3)
+  }
+
+  test("cintHgm < 0") {
+    // This is a corner case since literal 0 is smaller than min.
+    validateEstimatedStats(
+      Filter(LessThan(attrIntHgm, Literal(0)), 
childStatsTestPlan(Seq(attrIntHgm), 10L)),
+      Nil,
+      expectedRowCount = 0)
+  }
+
+  test("cintHgm <= 3") {
+    validateEstimatedStats(
+      Filter(LessThanOrEqual(attrIntHgm, Literal(3)), 
childStatsTestPlan(Seq(attrIntHgm), 10L)),
+      Seq(attrIntHgm -> ColumnStat(distinctCount = 3, min = Some(1), max = 
Some(3),
+        nullCount = 0, avgLen = 4, maxLen = 4, histogram = Some(hgmInt))),
+      expectedRowCount = 3)
+  }
+
+  test("cintHgm > 6") {
+    validateEstimatedStats(
+      Filter(GreaterThan(attrIntHgm, Literal(6)), 
childStatsTestPlan(Seq(attrIntHgm), 10L)),
+      Seq(attrIntHgm -> ColumnStat(distinctCount = 4, min = Some(6), max = 
Some(10),
+        nullCount = 0, avgLen = 4, maxLen = 4, histogram = Some(hgmInt))),
+      expectedRowCount = 4)
+  }
+
+  test("cintHgm > 10") {
+    // This is a corner case since max value is 10.
+    validateEstimatedStats(
+      Filter(GreaterThan(attrIntHgm, Literal(10)), 
childStatsTestPlan(Seq(attrIntHgm), 10L)),
+      Nil,
+      expectedRowCount = 0)
+  }
+
+  test("cintHgm >= 6") {
+    validateEstimatedStats(
+      Filter(GreaterThanOrEqual(attrIntHgm, Literal(6)), 
childStatsTestPlan(Seq(attrIntHgm), 10L)),
+      Seq(attrIntHgm -> ColumnStat(distinctCount = 5, min = Some(6), max = 
Some(10),
+        nullCount = 0, avgLen = 4, maxLen = 4, histogram = Some(hgmInt))),
+      expectedRowCount = 5)
+  }
+
+  test("cintHgm > 3 AND cintHgm <= 6") {
+    val condition = And(GreaterThan(attrIntHgm,
+      Literal(3)), LessThanOrEqual(attrIntHgm, Literal(6)))
+    validateEstimatedStats(
+      Filter(condition, childStatsTestPlan(Seq(attrIntHgm), 10L)),
+      Seq(attrIntHgm -> ColumnStat(distinctCount = 4, min = Some(3), max = 
Some(6),
+        nullCount = 0, avgLen = 4, maxLen = 4, histogram = Some(hgmInt))),
+      expectedRowCount = 4)
+  }
+
+  test("cintHgm = 3 OR cintHgm = 6") {
+    val condition = Or(EqualTo(attrIntHgm, Literal(3)), EqualTo(attrIntHgm, 
Literal(6)))
+    validateEstimatedStats(
+      Filter(condition, childStatsTestPlan(Seq(attrIntHgm), 10L)),
+      Seq(attrIntHgm -> colStatIntHgm.copy(distinctCount = 3)),
+      expectedRowCount = 3)
+  }
+
+  // The following test cases have histogram information collected for the 
test column with
+  // a skewed distribution.
+  test("Not(cintSkewHgm < 3 AND null)") {
+    val condition = Not(And(LessThan(attrIntSkewHgm, Literal(3)), 
Literal(null, IntegerType)))
+    validateEstimatedStats(
+      Filter(condition, childStatsTestPlan(Seq(attrIntSkewHgm), 10L)),
+      Seq(attrIntSkewHgm -> colStatIntSkewHgm.copy(distinctCount = 5)),
+      expectedRowCount = 9)
+  }
+
+  test("cintSkewHgm = 5") {
+    validateEstimatedStats(
+      Filter(EqualTo(attrIntSkewHgm, Literal(5)), 
childStatsTestPlan(Seq(attrIntSkewHgm), 10L)),
+      Seq(attrIntSkewHgm -> ColumnStat(distinctCount = 1, min = Some(5), max = 
Some(5),
+        nullCount = 0, avgLen = 4, maxLen = 4, histogram = Some(hgmIntSkew))),
+      expectedRowCount = 4)
+  }
+
+  test("cintSkewHgm = 0") {
+    // This is an out-of-range case since 0 is outside the range [min, max]
+    validateEstimatedStats(
+      Filter(EqualTo(attrIntSkewHgm, Literal(0)), 
childStatsTestPlan(Seq(attrIntSkewHgm), 10L)),
+      Nil,
+      expectedRowCount = 0)
+  }
+
+  test("cintSkewHgm < 3") {
+    validateEstimatedStats(
+      Filter(LessThan(attrIntSkewHgm, Literal(3)), 
childStatsTestPlan(Seq(attrIntSkewHgm), 10L)),
+      Seq(attrIntSkewHgm -> ColumnStat(distinctCount = 1, min = Some(1), max = 
Some(3),
+        nullCount = 0, avgLen = 4, maxLen = 4, histogram = Some(hgmIntSkew))),
+      expectedRowCount = 2)
+  }
+
+  test("cintSkewHgm < 0") {
+    // This is a corner case since literal 0 is smaller than min.
+    validateEstimatedStats(
+      Filter(LessThan(attrIntSkewHgm, Literal(0)), 
childStatsTestPlan(Seq(attrIntSkewHgm), 10L)),
+      Nil,
+      expectedRowCount = 0)
+  }
+
+  test("cintSkewHgm <= 3") {
+    validateEstimatedStats(
+      Filter(LessThanOrEqual(attrIntSkewHgm, Literal(3)),
+        childStatsTestPlan(Seq(attrIntSkewHgm), 10L)),
+      Seq(attrIntSkewHgm -> ColumnStat(distinctCount = 1, min = Some(1), max = 
Some(3),
+        nullCount = 0, avgLen = 4, maxLen = 4, histogram = Some(hgmIntSkew))),
+      expectedRowCount = 2)
+  }
+
+  test("cintSkewHgm > 6") {
+    validateEstimatedStats(
+      Filter(GreaterThan(attrIntSkewHgm, Literal(6)), 
childStatsTestPlan(Seq(attrIntSkewHgm), 10L)),
+      Seq(attrIntSkewHgm -> ColumnStat(distinctCount = 1, min = Some(6), max = 
Some(10),
+        nullCount = 0, avgLen = 4, maxLen = 4, histogram = Some(hgmIntSkew))),
+      expectedRowCount = 2)
+  }
+
+  test("cintSkewHgm > 10") {
+    // This is a corner case since max value is 10.
+    validateEstimatedStats(
+      Filter(GreaterThan(attrIntSkewHgm, Literal(10)),
+        childStatsTestPlan(Seq(attrIntSkewHgm), 10L)),
+      Nil,
+      expectedRowCount = 0)
+  }
+
+  test("cintSkewHgm >= 6") {
+    validateEstimatedStats(
+      Filter(GreaterThanOrEqual(attrIntSkewHgm, Literal(6)),
+        childStatsTestPlan(Seq(attrIntSkewHgm), 10L)),
+      Seq(attrIntSkewHgm -> ColumnStat(distinctCount = 2, min = Some(6), max = 
Some(10),
+        nullCount = 0, avgLen = 4, maxLen = 4, histogram = Some(hgmIntSkew))),
+      expectedRowCount = 3)
+  }
+
+  test("cintSkewHgm > 3 AND cintSkewHgm <= 6") {
+    val condition = And(GreaterThan(attrIntSkewHgm,
+      Literal(3)), LessThanOrEqual(attrIntSkewHgm, Literal(6)))
+    validateEstimatedStats(
+      Filter(condition, childStatsTestPlan(Seq(attrIntSkewHgm), 10L)),
+      Seq(attrIntSkewHgm -> ColumnStat(distinctCount = 4, min = Some(3), max = 
Some(6),
+        nullCount = 0, avgLen = 4, maxLen = 4, histogram = Some(hgmIntSkew))),
+      expectedRowCount = 8)
+  }
+
+  test("cintSkewHgm = 3 OR cintSkewHgm = 6") {
+    val condition = Or(EqualTo(attrIntSkewHgm, Literal(3)), 
EqualTo(attrIntSkewHgm, Literal(6)))
+    validateEstimatedStats(
+      Filter(condition, childStatsTestPlan(Seq(attrIntSkewHgm), 10L)),
+      Seq(attrIntSkewHgm -> colStatIntSkewHgm.copy(distinctCount = 2)),
+      expectedRowCount = 3)
+  }
+
   private def childStatsTestPlan(outList: Seq[Attribute], tableRowCount: 
BigInt): StatsTestPlan = {
     StatsTestPlan(
       outputList = outList,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to