Repository: spark Updated Branches: refs/heads/master a4002651a -> ecc179eca
[SPARK-21322][SQL] support histogram in filter cardinality estimation ## What changes were proposed in this pull request? Histograms are effective in dealing with skewed distributions. After we generate histogram information for column statistics, we need to adjust filter estimation based on the histogram data structure. ## How was this patch tested? We revised all the unit test cases by including the histogram data structure. Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Ron Hu <ron...@huawei.com> Closes #19783 from ron8hu/supportHistogram. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ecc179ec Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ecc179ec Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ecc179ec Branch: refs/heads/master Commit: ecc179ecaa9b4381d2f6d3356d88a4dff3e19c0f Parents: a400265 Author: Ron Hu <ron...@huawei.com> Authored: Tue Dec 12 15:04:49 2017 +0800 Committer: Wenchen Fan <wenc...@databricks.com> Committed: Tue Dec 12 15:04:49 2017 +0800 ---------------------------------------------------------------------- .../statsEstimation/EstimationUtils.scala | 97 ++++++++- .../statsEstimation/FilterEstimation.scala | 181 ++++++++++++---- .../statsEstimation/FilterEstimationSuite.scala | 217 ++++++++++++++++++- 3 files changed, 448 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/ecc179ec/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala index 9c34a9b..2f416f2 100644 
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.plans.logical.statsEstimation import scala.math.BigDecimal.RoundingMode import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap} -import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan, Statistics} +import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.types.{DecimalType, _} @@ -114,4 +114,99 @@ object EstimationUtils { } } + /** + * Returns the number of the first bin into which a column value falls for a specified + * numeric equi-height histogram. + * + * @param value a literal value of a column + * @param bins an array of bins for a given numeric equi-height histogram + * @return the id of the first bin into which a column value falls. + */ + def findFirstBinForValue(value: Double, bins: Array[HistogramBin]): Int = { + var i = 0 + while ((i < bins.length) && (value > bins(i).hi)) { + i += 1 + } + i + } + + /** + * Returns the number of the last bin into which a column value falls for a specified + * numeric equi-height histogram. + * + * @param value a literal value of a column + * @param bins an array of bins for a given numeric equi-height histogram + * @return the id of the last bin into which a column value falls. 
+ */ + def findLastBinForValue(value: Double, bins: Array[HistogramBin]): Int = { + var i = bins.length - 1 + while ((i >= 0) && (value < bins(i).lo)) { + i -= 1 + } + i + } + + /** + * Returns a percentage of a bin holding values for column value in the range of + * [lowerValue, higherValue] + * + * @param higherValue a given upper bound value of a specified column value range + * @param lowerValue a given lower bound value of a specified column value range + * @param bin a single histogram bin + * @return the percentage of a single bin holding values in [lowerValue, higherValue]. + */ + private def getOccupation( + higherValue: Double, + lowerValue: Double, + bin: HistogramBin): Double = { + assert(bin.lo <= lowerValue && lowerValue <= higherValue && higherValue <= bin.hi) + if (bin.hi == bin.lo) { + // the entire bin is covered in the range + 1.0 + } else if (higherValue == lowerValue) { + // set percentage to 1/NDV + 1.0 / bin.ndv.toDouble + } else { + // Use proration since the range falls inside this bin. + math.min((higherValue - lowerValue) / (bin.hi - bin.lo), 1.0) + } + } + + /** + * Returns the number of bins for column values in [lowerEnd, higherEnd]. + * The column value distribution is saved in an equi-height histogram. The return value is a + * double because we may return a portion of a bin. For example, a predicate + * "column = 8" may return the number of bins 0.2 if the holding bin has 5 distinct values. + * + * @param higherId id of the high end bin holding the high end value of a column range + * @param lowerId id of the low end bin holding the low end value of a column range + * @param higherEnd a given upper bound value of a specified column value range + * @param lowerEnd a given lower bound value of a specified column value range + * @param histogram a numeric equi-height histogram + * @return the number of bins for column values in [lowerEnd, higherEnd]. 
+ */ + def getOccupationBins( + higherId: Int, + lowerId: Int, + higherEnd: Double, + lowerEnd: Double, + histogram: Histogram): Double = { + assert(lowerId <= higherId) + + if (lowerId == higherId) { + val curBin = histogram.bins(lowerId) + getOccupation(higherEnd, lowerEnd, curBin) + } else { + // compute how much lowerEnd/higherEnd occupies its bin + val lowerCurBin = histogram.bins(lowerId) + val lowerPart = getOccupation(lowerCurBin.hi, lowerEnd, lowerCurBin) + + val higherCurBin = histogram.bins(higherId) + val higherPart = getOccupation(higherEnd, higherCurBin.lo, higherCurBin) + + // the total length is lowerPart + higherPart + bins between them + lowerPart + higherPart + higherId - lowerId - 1 + } + } + } http://git-wip-us.apache.org/repos/asf/spark/blob/ecc179ec/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala index 74820eb..f52a15e 100755 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala @@ -23,7 +23,7 @@ import scala.collection.mutable import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, TrueLiteral} -import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, Filter, LeafNode, Statistics} +import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils._ import org.apache.spark.sql.types._ @@ -265,7 +265,7 @@ case class 
FilterEstimation(plan: Filter) extends Logging { * @param update a boolean flag to specify if we need to update ColumnStat of a given column * for subsequent conditions * @return an optional double value to show the percentage of rows meeting a given condition - * It returns None if no statistics exists for a given column or wrong value. + * It returns None if no statistics exists for a given column or wrong value. */ def evaluateBinary( op: BinaryComparison, @@ -332,8 +332,44 @@ case class FilterEstimation(plan: Filter) extends Logging { colStatsMap.update(attr, newStats) } - Some(1.0 / BigDecimal(ndv)) - } else { + if (colStat.histogram.isEmpty) { + // returns 1/ndv if there is no histogram + Some(1.0 / BigDecimal(ndv)) + } else { + // We compute filter selectivity using Histogram information. + val datum = EstimationUtils.toDecimal(literal.value, literal.dataType).toDouble + val histogram = colStat.histogram.get + val hgmBins = histogram.bins + + // find bins where column's current min and max locate. Note that a column's [min, max] + // range may change due to another condition applied earlier. + val min = EstimationUtils.toDecimal(colStat.min.get, literal.dataType).toDouble + val max = EstimationUtils.toDecimal(colStat.max.get, literal.dataType).toDouble + val minBinId = EstimationUtils.findFirstBinForValue(min, hgmBins) + val maxBinId = EstimationUtils.findLastBinForValue(max, hgmBins) + + // compute how many bins the column's current valid range [min, max] occupies. + // Note that a column's [min, max] range may vary after we apply some filter conditions. 
+ val validRangeBins = EstimationUtils.getOccupationBins(maxBinId, minBinId, max, + min, histogram) + + val lowerBinId = EstimationUtils.findFirstBinForValue(datum, hgmBins) + val higherBinId = EstimationUtils.findLastBinForValue(datum, hgmBins) + assert(lowerBinId <= higherBinId) + val lowerBinNdv = hgmBins(lowerBinId).ndv + val higherBinNdv = hgmBins(higherBinId).ndv + // assume uniform distribution in each bin + val occupiedBins = if (lowerBinId == higherBinId) { + 1.0 / lowerBinNdv + } else { + (1.0 / lowerBinNdv) + // lowest bin + (higherBinId - lowerBinId - 1) + // middle bins + (1.0 / higherBinNdv) // highest bin + } + Some(occupiedBins / validRangeBins) + } + + } else { // not in interval Some(0.0) } @@ -471,37 +507,46 @@ case class FilterEstimation(plan: Filter) extends Logging { percent = 1.0 } else { // This is the partial overlap case: - // Without advanced statistics like histogram, we assume uniform data distribution. - // We just prorate the adjusted range over the initial range to compute filter selectivity. - assert(max > min) - percent = op match { - case _: LessThan => - if (numericLiteral == max) { - // If the literal value is right on the boundary, we can minus the part of the - // boundary value (1/ndv). - 1.0 - 1.0 / ndv - } else { - (numericLiteral - min) / (max - min) - } - case _: LessThanOrEqual => - if (numericLiteral == min) { - // The boundary value is the only satisfying value. - 1.0 / ndv - } else { - (numericLiteral - min) / (max - min) - } - case _: GreaterThan => - if (numericLiteral == min) { - 1.0 - 1.0 / ndv - } else { - (max - numericLiteral) / (max - min) - } - case _: GreaterThanOrEqual => - if (numericLiteral == max) { - 1.0 / ndv - } else { - (max - numericLiteral) / (max - min) - } + + if (colStat.histogram.isEmpty) { + // Without advanced statistics like histogram, we assume uniform data distribution. + // We just prorate the adjusted range over the initial range to compute filter selectivity. 
+ assert(max > min) + percent = op match { + case _: LessThan => + if (numericLiteral == max) { + // If the literal value is right on the boundary, we can minus the part of the + // boundary value (1/ndv). + 1.0 - 1.0 / ndv + } else { + (numericLiteral - min) / (max - min) + } + case _: LessThanOrEqual => + if (numericLiteral == min) { + // The boundary value is the only satisfying value. + 1.0 / ndv + } else { + (numericLiteral - min) / (max - min) + } + case _: GreaterThan => + if (numericLiteral == min) { + 1.0 - 1.0 / ndv + } else { + (max - numericLiteral) / (max - min) + } + case _: GreaterThanOrEqual => + if (numericLiteral == max) { + 1.0 / ndv + } else { + (max - numericLiteral) / (max - min) + } + } + } else { + val numericHistogram = colStat.histogram.get + val datum = EstimationUtils.toDecimal(literal.value, literal.dataType).toDouble + val max = EstimationUtils.toDecimal(colStat.max.get, literal.dataType).toDouble + val min = EstimationUtils.toDecimal(colStat.min.get, literal.dataType).toDouble + percent = computePercentByEquiHeightHgm(op, numericHistogram, max, min, datum) } if (update) { @@ -513,10 +558,9 @@ case class FilterEstimation(plan: Filter) extends Logging { op match { case _: GreaterThan | _: GreaterThanOrEqual => - // If new ndv is 1, then new max must be equal to new min. 
- newMin = if (newNdv == 1) newMax else newValue + newMin = newValue case _: LessThan | _: LessThanOrEqual => - newMax = if (newNdv == 1) newMin else newValue + newMax = newValue } val newStats = @@ -530,6 +574,54 @@ case class FilterEstimation(plan: Filter) extends Logging { } /** + * Returns the selectivity percentage for binary condition in the column's + * current valid range [min, max] + * + * @param op a binary comparison operator + * @param histogram a numeric equi-height histogram + * @param max the upper bound of the current valid range for a given column + * @param min the lower bound of the current valid range for a given column + * @param datumNumber the numeric value of a literal + * @return the selectivity percentage for a condition in the current range. + */ + + def computePercentByEquiHeightHgm( + op: BinaryComparison, + histogram: Histogram, + max: Double, + min: Double, + datumNumber: Double): Double = { + // find bins where column's current min and max locate. Note that a column's [min, max] + // range may change due to another condition applied earlier. + val minBinId = EstimationUtils.findFirstBinForValue(min, histogram.bins) + val maxBinId = EstimationUtils.findLastBinForValue(max, histogram.bins) + + // compute how many bins the column's current valid range [min, max] occupies. + // Note that a column's [min, max] range may vary after we apply some filter conditions. 
+ val minToMaxLength = EstimationUtils.getOccupationBins(maxBinId, minBinId, max, min, histogram) + + val datumInBinId = op match { + case LessThan(_, _) | GreaterThanOrEqual(_, _) => + EstimationUtils.findFirstBinForValue(datumNumber, histogram.bins) + case LessThanOrEqual(_, _) | GreaterThan(_, _) => + EstimationUtils.findLastBinForValue(datumNumber, histogram.bins) + } + + op match { + // LessThan and LessThanOrEqual share the same logic, + // but their datumInBinId may be different + case LessThan(_, _) | LessThanOrEqual(_, _) => + EstimationUtils.getOccupationBins(datumInBinId, minBinId, datumNumber, min, + histogram) / minToMaxLength + // GreaterThan and GreaterThanOrEqual share the same logic, + // but their datumInBinId may be different + case GreaterThan(_, _) | GreaterThanOrEqual(_, _) => + EstimationUtils.getOccupationBins(maxBinId, datumInBinId, max, datumNumber, + histogram) / minToMaxLength + } + } + + /** * Returns a percentage of rows meeting a binary comparison expression containing two columns. * In SQL queries, we also see predicate expressions involving two columns * such as "column-1 (op) column-2" where column-1 and column-2 belong to same table. @@ -784,11 +876,16 @@ case class ColumnStatsMap(originalMap: AttributeMap[ColumnStat]) { def outputColumnStats(rowsBeforeFilter: BigInt, rowsAfterFilter: BigInt) : AttributeMap[ColumnStat] = { val newColumnStats = originalMap.map { case (attr, oriColStat) => - // Update ndv based on the overall filter selectivity: scale down ndv if the number of rows - // decreases; otherwise keep it unchanged. - val newNdv = EstimationUtils.updateNdv(oldNumRows = rowsBeforeFilter, - newNumRows = rowsAfterFilter, oldNdv = oriColStat.distinctCount) val colStat = updatedMap.get(attr.exprId).map(_._2).getOrElse(oriColStat) + val newNdv = if (colStat.distinctCount > 1) { + // Update ndv based on the overall filter selectivity: scale down ndv if the number of rows + // decreases; otherwise keep it unchanged. 
+ EstimationUtils.updateNdv(oldNumRows = rowsBeforeFilter, + newNumRows = rowsAfterFilter, oldNdv = oriColStat.distinctCount) + } else { + // no need to scale down since it is already down to 1 (for skewed distribution case) + colStat.distinctCount + } attr -> colStat.copy(distinctCount = newNdv) } AttributeMap(newColumnStats.toSeq) http://git-wip-us.apache.org/repos/asf/spark/blob/ecc179ec/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala index 455037e..2b1fe98 100755 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala @@ -22,7 +22,7 @@ import java.sql.Date import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, TrueLiteral} import org.apache.spark.sql.catalyst.plans.LeftOuter -import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, Filter, Join, Statistics} +import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils._ import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.types._ @@ -33,7 +33,7 @@ import org.apache.spark.sql.types._ */ class FilterEstimationSuite extends StatsEstimationTestBase { - // Suppose our test table has 10 rows and 6 columns. + // Suppose our test table has 10 rows and 10 columns. 
// column cint has values: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 // Hence, distinctCount:10, min:1, max:10, nullCount:0, avgLen:4, maxLen:4 val attrInt = AttributeReference("cint", IntegerType)() @@ -91,6 +91,26 @@ class FilterEstimationSuite extends StatsEstimationTestBase { val colStatInt4 = ColumnStat(distinctCount = 10, min = Some(1), max = Some(10), nullCount = 0, avgLen = 4, maxLen = 4) + // column cintHgm has values: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 with histogram. + // Note that cintHgm has an even distribution with histogram information built. + // Hence, distinctCount:10, min:1, max:10, nullCount:0, avgLen:4, maxLen:4 + val attrIntHgm = AttributeReference("cintHgm", IntegerType)() + val hgmInt = Histogram(2.0, Array(HistogramBin(1.0, 2.0, 2), + HistogramBin(2.0, 4.0, 2), HistogramBin(4.0, 6.0, 2), + HistogramBin(6.0, 8.0, 2), HistogramBin(8.0, 10.0, 2))) + val colStatIntHgm = ColumnStat(distinctCount = 10, min = Some(1), max = Some(10), + nullCount = 0, avgLen = 4, maxLen = 4, histogram = Some(hgmInt)) + + // column cintSkewHgm has values: 1, 4, 4, 5, 5, 5, 5, 6, 6, 10 with histogram. + // Note that cintSkewHgm has a skewed distribution with histogram information built. 
+ // distinctCount:5, min:1, max:10, nullCount:0, avgLen:4, maxLen:4 + val attrIntSkewHgm = AttributeReference("cintSkewHgm", IntegerType)() + val hgmIntSkew = Histogram(2.0, Array(HistogramBin(1.0, 4.0, 2), + HistogramBin(4.0, 5.0, 2), HistogramBin(5.0, 5.0, 1), + HistogramBin(5.0, 6.0, 2), HistogramBin(6.0, 10.0, 2))) + val colStatIntSkewHgm = ColumnStat(distinctCount = 5, min = Some(1), max = Some(10), + nullCount = 0, avgLen = 4, maxLen = 4, histogram = Some(hgmIntSkew)) + val attributeMap = AttributeMap(Seq( attrInt -> colStatInt, attrBool -> colStatBool, @@ -100,7 +120,9 @@ class FilterEstimationSuite extends StatsEstimationTestBase { attrString -> colStatString, attrInt2 -> colStatInt2, attrInt3 -> colStatInt3, - attrInt4 -> colStatInt4 + attrInt4 -> colStatInt4, + attrIntHgm -> colStatIntHgm, + attrIntSkewHgm -> colStatIntSkewHgm )) test("true") { @@ -359,7 +381,7 @@ class FilterEstimationSuite extends StatsEstimationTestBase { test("cbool > false") { validateEstimatedStats( Filter(GreaterThan(attrBool, Literal(false)), childStatsTestPlan(Seq(attrBool), 10L)), - Seq(attrBool -> ColumnStat(distinctCount = 1, min = Some(true), max = Some(true), + Seq(attrBool -> ColumnStat(distinctCount = 1, min = Some(false), max = Some(true), nullCount = 0, avgLen = 1, maxLen = 1)), expectedRowCount = 5) } @@ -578,6 +600,193 @@ class FilterEstimationSuite extends StatsEstimationTestBase { expectedRowCount = 5) } + // The following test cases have histogram information collected for the test column with + // an even distribution + test("Not(cintHgm < 3 AND null)") { + val condition = Not(And(LessThan(attrIntHgm, Literal(3)), Literal(null, IntegerType))) + validateEstimatedStats( + Filter(condition, childStatsTestPlan(Seq(attrIntHgm), 10L)), + Seq(attrIntHgm -> colStatIntHgm.copy(distinctCount = 7)), + expectedRowCount = 7) + } + + test("cintHgm = 5") { + validateEstimatedStats( + Filter(EqualTo(attrIntHgm, Literal(5)), childStatsTestPlan(Seq(attrIntHgm), 10L)), + 
Seq(attrIntHgm -> ColumnStat(distinctCount = 1, min = Some(5), max = Some(5), + nullCount = 0, avgLen = 4, maxLen = 4, histogram = Some(hgmInt))), + expectedRowCount = 1) + } + + test("cintHgm = 0") { + // This is an out-of-range case since 0 is outside the range [min, max] + validateEstimatedStats( + Filter(EqualTo(attrIntHgm, Literal(0)), childStatsTestPlan(Seq(attrIntHgm), 10L)), + Nil, + expectedRowCount = 0) + } + + test("cintHgm < 3") { + validateEstimatedStats( + Filter(LessThan(attrIntHgm, Literal(3)), childStatsTestPlan(Seq(attrIntHgm), 10L)), + Seq(attrIntHgm -> ColumnStat(distinctCount = 3, min = Some(1), max = Some(3), + nullCount = 0, avgLen = 4, maxLen = 4, histogram = Some(hgmInt))), + expectedRowCount = 3) + } + + test("cintHgm < 0") { + // This is a corner case since literal 0 is smaller than min. + validateEstimatedStats( + Filter(LessThan(attrIntHgm, Literal(0)), childStatsTestPlan(Seq(attrIntHgm), 10L)), + Nil, + expectedRowCount = 0) + } + + test("cintHgm <= 3") { + validateEstimatedStats( + Filter(LessThanOrEqual(attrIntHgm, Literal(3)), childStatsTestPlan(Seq(attrIntHgm), 10L)), + Seq(attrIntHgm -> ColumnStat(distinctCount = 3, min = Some(1), max = Some(3), + nullCount = 0, avgLen = 4, maxLen = 4, histogram = Some(hgmInt))), + expectedRowCount = 3) + } + + test("cintHgm > 6") { + validateEstimatedStats( + Filter(GreaterThan(attrIntHgm, Literal(6)), childStatsTestPlan(Seq(attrIntHgm), 10L)), + Seq(attrIntHgm -> ColumnStat(distinctCount = 4, min = Some(6), max = Some(10), + nullCount = 0, avgLen = 4, maxLen = 4, histogram = Some(hgmInt))), + expectedRowCount = 4) + } + + test("cintHgm > 10") { + // This is a corner case since max value is 10. 
+ validateEstimatedStats( + Filter(GreaterThan(attrIntHgm, Literal(10)), childStatsTestPlan(Seq(attrIntHgm), 10L)), + Nil, + expectedRowCount = 0) + } + + test("cintHgm >= 6") { + validateEstimatedStats( + Filter(GreaterThanOrEqual(attrIntHgm, Literal(6)), childStatsTestPlan(Seq(attrIntHgm), 10L)), + Seq(attrIntHgm -> ColumnStat(distinctCount = 5, min = Some(6), max = Some(10), + nullCount = 0, avgLen = 4, maxLen = 4, histogram = Some(hgmInt))), + expectedRowCount = 5) + } + + test("cintHgm > 3 AND cintHgm <= 6") { + val condition = And(GreaterThan(attrIntHgm, + Literal(3)), LessThanOrEqual(attrIntHgm, Literal(6))) + validateEstimatedStats( + Filter(condition, childStatsTestPlan(Seq(attrIntHgm), 10L)), + Seq(attrIntHgm -> ColumnStat(distinctCount = 4, min = Some(3), max = Some(6), + nullCount = 0, avgLen = 4, maxLen = 4, histogram = Some(hgmInt))), + expectedRowCount = 4) + } + + test("cintHgm = 3 OR cintHgm = 6") { + val condition = Or(EqualTo(attrIntHgm, Literal(3)), EqualTo(attrIntHgm, Literal(6))) + validateEstimatedStats( + Filter(condition, childStatsTestPlan(Seq(attrIntHgm), 10L)), + Seq(attrIntHgm -> colStatIntHgm.copy(distinctCount = 3)), + expectedRowCount = 3) + } + + // The following test cases have histogram information collected for the test column with + // a skewed distribution. 
+ test("Not(cintSkewHgm < 3 AND null)") { + val condition = Not(And(LessThan(attrIntSkewHgm, Literal(3)), Literal(null, IntegerType))) + validateEstimatedStats( + Filter(condition, childStatsTestPlan(Seq(attrIntSkewHgm), 10L)), + Seq(attrIntSkewHgm -> colStatIntSkewHgm.copy(distinctCount = 5)), + expectedRowCount = 9) + } + + test("cintSkewHgm = 5") { + validateEstimatedStats( + Filter(EqualTo(attrIntSkewHgm, Literal(5)), childStatsTestPlan(Seq(attrIntSkewHgm), 10L)), + Seq(attrIntSkewHgm -> ColumnStat(distinctCount = 1, min = Some(5), max = Some(5), + nullCount = 0, avgLen = 4, maxLen = 4, histogram = Some(hgmIntSkew))), + expectedRowCount = 4) + } + + test("cintSkewHgm = 0") { + // This is an out-of-range case since 0 is outside the range [min, max] + validateEstimatedStats( + Filter(EqualTo(attrIntSkewHgm, Literal(0)), childStatsTestPlan(Seq(attrIntSkewHgm), 10L)), + Nil, + expectedRowCount = 0) + } + + test("cintSkewHgm < 3") { + validateEstimatedStats( + Filter(LessThan(attrIntSkewHgm, Literal(3)), childStatsTestPlan(Seq(attrIntSkewHgm), 10L)), + Seq(attrIntSkewHgm -> ColumnStat(distinctCount = 1, min = Some(1), max = Some(3), + nullCount = 0, avgLen = 4, maxLen = 4, histogram = Some(hgmIntSkew))), + expectedRowCount = 2) + } + + test("cintSkewHgm < 0") { + // This is a corner case since literal 0 is smaller than min. 
+ validateEstimatedStats( + Filter(LessThan(attrIntSkewHgm, Literal(0)), childStatsTestPlan(Seq(attrIntSkewHgm), 10L)), + Nil, + expectedRowCount = 0) + } + + test("cintSkewHgm <= 3") { + validateEstimatedStats( + Filter(LessThanOrEqual(attrIntSkewHgm, Literal(3)), + childStatsTestPlan(Seq(attrIntSkewHgm), 10L)), + Seq(attrIntSkewHgm -> ColumnStat(distinctCount = 1, min = Some(1), max = Some(3), + nullCount = 0, avgLen = 4, maxLen = 4, histogram = Some(hgmIntSkew))), + expectedRowCount = 2) + } + + test("cintSkewHgm > 6") { + validateEstimatedStats( + Filter(GreaterThan(attrIntSkewHgm, Literal(6)), childStatsTestPlan(Seq(attrIntSkewHgm), 10L)), + Seq(attrIntSkewHgm -> ColumnStat(distinctCount = 1, min = Some(6), max = Some(10), + nullCount = 0, avgLen = 4, maxLen = 4, histogram = Some(hgmIntSkew))), + expectedRowCount = 2) + } + + test("cintSkewHgm > 10") { + // This is a corner case since max value is 10. + validateEstimatedStats( + Filter(GreaterThan(attrIntSkewHgm, Literal(10)), + childStatsTestPlan(Seq(attrIntSkewHgm), 10L)), + Nil, + expectedRowCount = 0) + } + + test("cintSkewHgm >= 6") { + validateEstimatedStats( + Filter(GreaterThanOrEqual(attrIntSkewHgm, Literal(6)), + childStatsTestPlan(Seq(attrIntSkewHgm), 10L)), + Seq(attrIntSkewHgm -> ColumnStat(distinctCount = 2, min = Some(6), max = Some(10), + nullCount = 0, avgLen = 4, maxLen = 4, histogram = Some(hgmIntSkew))), + expectedRowCount = 3) + } + + test("cintSkewHgm > 3 AND cintSkewHgm <= 6") { + val condition = And(GreaterThan(attrIntSkewHgm, + Literal(3)), LessThanOrEqual(attrIntSkewHgm, Literal(6))) + validateEstimatedStats( + Filter(condition, childStatsTestPlan(Seq(attrIntSkewHgm), 10L)), + Seq(attrIntSkewHgm -> ColumnStat(distinctCount = 4, min = Some(3), max = Some(6), + nullCount = 0, avgLen = 4, maxLen = 4, histogram = Some(hgmIntSkew))), + expectedRowCount = 8) + } + + test("cintSkewHgm = 3 OR cintSkewHgm = 6") { + val condition = Or(EqualTo(attrIntSkewHgm, Literal(3)), EqualTo(attrIntSkewHgm, 
Literal(6))) + validateEstimatedStats( + Filter(condition, childStatsTestPlan(Seq(attrIntSkewHgm), 10L)), + Seq(attrIntSkewHgm -> colStatIntSkewHgm.copy(distinctCount = 2)), + expectedRowCount = 3) + } + private def childStatsTestPlan(outList: Seq[Attribute], tableRowCount: BigInt): StatsTestPlan = { StatsTestPlan( outputList = outList, --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org