Repository: spark
Updated Branches:
  refs/heads/master 48e44b24a -> bf66335ac
[SPARK-21323][SQL] Rename plans.logical.statsEstimation.Range to ValueInterval

## What changes were proposed in this pull request?

Rename org.apache.spark.sql.catalyst.plans.logical.statsEstimation.Range to
ValueInterval. The current name clashes with the logical operator "Range";
ValueInterval describes what the class models (the [min, max] value interval
of a column) more accurately.

## How was this patch tested?

Unit tests.

Author: Wang Gengliang <ltn...@gmail.com>

Closes #18549 from gengliangwang/ValueInterval.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bf66335a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bf66335a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bf66335a

Branch: refs/heads/master
Commit: bf66335acab3c0c188f6c378eb8aa6948a259cb2
Parents: 48e44b2
Author: Wang Gengliang <ltn...@gmail.com>
Authored: Thu Jul 6 13:58:27 2017 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Thu Jul 6 13:58:27 2017 -0700

----------------------------------------------------------------------
 .../statsEstimation/FilterEstimation.scala      | 36 ++++----
 .../statsEstimation/JoinEstimation.scala        | 14 +--
 .../plans/logical/statsEstimation/Range.scala   | 88 -------------------
 .../logical/statsEstimation/ValueInterval.scala | 91 ++++++++++++++++++++
 4 files changed, 117 insertions(+), 112 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/bf66335a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
index 5a3bee7..e13db85 100755
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
@@ -316,8 +316,8 @@ case class FilterEstimation(plan: Filter) extends Logging {
     // decide if the value is in [min, max] of the column.
     // We currently don't store min/max for binary/string type.
     // Hence, we assume it is in boundary for binary/string type.
-    val statsRange = Range(colStat.min, colStat.max, attr.dataType)
-    if (statsRange.contains(literal)) {
+    val statsInterval = ValueInterval(colStat.min, colStat.max, attr.dataType)
+    if (statsInterval.contains(literal)) {
       if (update) {
         // We update ColumnStat structure after apply this equality predicate:
         // Set distinctCount to 1, nullCount to 0, and min/max values (if exist) to the literal
@@ -388,9 +388,10 @@ case class FilterEstimation(plan: Filter) extends Logging {
     // use [min, max] to filter the original hSet
     dataType match {
       case _: NumericType | BooleanType | DateType | TimestampType =>
-        val statsRange = Range(colStat.min, colStat.max, dataType).asInstanceOf[NumericRange]
+        val statsInterval =
+          ValueInterval(colStat.min, colStat.max, dataType).asInstanceOf[NumericValueInterval]
         val validQuerySet = hSet.filter { v =>
-          v != null && statsRange.contains(Literal(v, dataType))
+          v != null && statsInterval.contains(Literal(v, dataType))
         }

         if (validQuerySet.isEmpty) {
@@ -440,12 +441,13 @@ case class FilterEstimation(plan: Filter) extends Logging {
       update: Boolean): Option[BigDecimal] = {

     val colStat = colStatsMap(attr)
-    val statsRange = Range(colStat.min, colStat.max, attr.dataType).asInstanceOf[NumericRange]
-    val max = statsRange.max.toBigDecimal
-    val min = statsRange.min.toBigDecimal
+    val statsInterval =
+      ValueInterval(colStat.min, colStat.max, attr.dataType).asInstanceOf[NumericValueInterval]
+    val max = statsInterval.max.toBigDecimal
+    val min = statsInterval.min.toBigDecimal
     val ndv = BigDecimal(colStat.distinctCount)

-    // determine the overlapping degree between predicate range and column's range
+    // determine the overlapping degree between predicate interval and column's interval
     val numericLiteral = if (literal.dataType == BooleanType) {
       if (literal.value.asInstanceOf[Boolean]) BigDecimal(1) else BigDecimal(0)
     } else {
@@ -566,18 +568,18 @@ case class FilterEstimation(plan: Filter) extends Logging {
     }

     val colStatLeft = colStatsMap(attrLeft)
-    val statsRangeLeft = Range(colStatLeft.min, colStatLeft.max, attrLeft.dataType)
-      .asInstanceOf[NumericRange]
-    val maxLeft = statsRangeLeft.max
-    val minLeft = statsRangeLeft.min
+    val statsIntervalLeft = ValueInterval(colStatLeft.min, colStatLeft.max, attrLeft.dataType)
+      .asInstanceOf[NumericValueInterval]
+    val maxLeft = statsIntervalLeft.max
+    val minLeft = statsIntervalLeft.min

     val colStatRight = colStatsMap(attrRight)
-    val statsRangeRight = Range(colStatRight.min, colStatRight.max, attrRight.dataType)
-      .asInstanceOf[NumericRange]
-    val maxRight = statsRangeRight.max
-    val minRight = statsRangeRight.min
+    val statsIntervalRight = ValueInterval(colStatRight.min, colStatRight.max, attrRight.dataType)
+      .asInstanceOf[NumericValueInterval]
+    val maxRight = statsIntervalRight.max
+    val minRight = statsIntervalRight.min

-    // determine the overlapping degree between predicate range and column's range
+    // determine the overlapping degree between predicate interval and column's interval
     val allNotNull = (colStatLeft.nullCount == 0) && (colStatRight.nullCount == 0)
     val (noOverlap: Boolean, completeOverlap: Boolean) = op match {
       // Left < Right or Left <= Right
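
For context, a minimal sketch of the equality-predicate path above, using the
renamed API; the column stats and literal values here are invented purely for
illustration:

    import org.apache.spark.sql.catalyst.expressions.Literal
    import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.ValueInterval
    import org.apache.spark.sql.types.IntegerType

    // Hypothetical column stats: min = 1, max = 100.
    val interval = ValueInterval(Some(1), Some(100), IntegerType)

    // An equality predicate such as `a = 50` can only select rows when the
    // literal lies inside the column's [min, max] interval.
    interval.contains(Literal(50, IntegerType))   // true  -> non-zero estimate
    interval.contains(Literal(500, IntegerType))  // false -> filter selects nothing
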
http://git-wip-us.apache.org/repos/asf/spark/blob/bf66335a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala
index f481969..dcbe36d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala
@@ -175,9 +175,9 @@ case class InnerOuterEstimation(join: Join) extends Logging {
       // Check if the two sides are disjoint
       val leftKeyStats = leftStats.attributeStats(leftKey)
       val rightKeyStats = rightStats.attributeStats(rightKey)
-      val lRange = Range(leftKeyStats.min, leftKeyStats.max, leftKey.dataType)
-      val rRange = Range(rightKeyStats.min, rightKeyStats.max, rightKey.dataType)
-      if (Range.isIntersected(lRange, rRange)) {
+      val lInterval = ValueInterval(leftKeyStats.min, leftKeyStats.max, leftKey.dataType)
+      val rInterval = ValueInterval(rightKeyStats.min, rightKeyStats.max, rightKey.dataType)
+      if (ValueInterval.isIntersected(lInterval, rInterval)) {
         // Get the largest ndv among pairs of join keys
         val maxNdv = leftKeyStats.distinctCount.max(rightKeyStats.distinctCount)
         if (maxNdv > ndvDenom) ndvDenom = maxNdv
@@ -239,16 +239,16 @@ case class InnerOuterEstimation(join: Join) extends Logging {
     joinKeyPairs.foreach { case (leftKey, rightKey) =>
       val leftKeyStats = leftStats.attributeStats(leftKey)
       val rightKeyStats = rightStats.attributeStats(rightKey)
-      val lRange = Range(leftKeyStats.min, leftKeyStats.max, leftKey.dataType)
-      val rRange = Range(rightKeyStats.min, rightKeyStats.max, rightKey.dataType)
+      val lInterval = ValueInterval(leftKeyStats.min, leftKeyStats.max, leftKey.dataType)
+      val rInterval = ValueInterval(rightKeyStats.min, rightKeyStats.max, rightKey.dataType)
       // When we reach here, join selectivity is not zero, so each pair of join keys should be
       // intersected.
-      assert(Range.isIntersected(lRange, rRange))
+      assert(ValueInterval.isIntersected(lInterval, rInterval))

       // Update intersected column stats
       assert(leftKey.dataType.sameType(rightKey.dataType))
       val newNdv = leftKeyStats.distinctCount.min(rightKeyStats.distinctCount)
-      val (newMin, newMax) = Range.intersect(lRange, rRange, leftKey.dataType)
+      val (newMin, newMax) = ValueInterval.intersect(lInterval, rInterval, leftKey.dataType)
       val newMaxLen = math.min(leftKeyStats.maxLen, rightKeyStats.maxLen)
       val newAvgLen = (leftKeyStats.avgLen + rightKeyStats.avgLen) / 2
       val newStats = ColumnStat(newNdv, newMin, newMax, 0, newAvgLen, newMaxLen)
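
Likewise, a hedged sketch of the interval checks the join estimator performs on
each pair of join keys; the key statistics are again invented for illustration:

    import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.ValueInterval
    import org.apache.spark.sql.types.IntegerType

    // Hypothetical join-key stats: left key in [1, 50], right key in [30, 100].
    val lInterval = ValueInterval(Some(1), Some(50), IntegerType)
    val rInterval = ValueInterval(Some(30), Some(100), IntegerType)

    // The intervals overlap, so join selectivity is non-zero ...
    ValueInterval.isIntersected(lInterval, rInterval)           // true

    // ... and matching rows can only carry key values in [30, 50], which
    // becomes the new min/max in the joined column's statistics.
    ValueInterval.intersect(lInterval, rInterval, IntegerType)  // (Some(30), Some(50))
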
http://git-wip-us.apache.org/repos/asf/spark/blob/bf66335a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/Range.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/Range.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/Range.scala
deleted file mode 100644
index 4ac5ba5..0000000
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/Range.scala
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.catalyst.plans.logical.statsEstimation
-
-import org.apache.spark.sql.catalyst.expressions.Literal
-import org.apache.spark.sql.types._
-
-
-/** Value range of a column. */
-trait Range {
-  def contains(l: Literal): Boolean
-}
-
-/** For simplicity we use decimal to unify operations of numeric ranges. */
-case class NumericRange(min: Decimal, max: Decimal) extends Range {
-  override def contains(l: Literal): Boolean = {
-    val lit = EstimationUtils.toDecimal(l.value, l.dataType)
-    min <= lit && max >= lit
-  }
-}
-
-/**
- * This version of Spark does not have min/max for binary/string types, we define their default
- * behaviors by this class.
- */
-class DefaultRange extends Range {
-  override def contains(l: Literal): Boolean = true
-}
-
-/** This is for columns with only null values. */
-class NullRange extends Range {
-  override def contains(l: Literal): Boolean = false
-}
-
-object Range {
-  def apply(min: Option[Any], max: Option[Any], dataType: DataType): Range = dataType match {
-    case StringType | BinaryType => new DefaultRange()
-    case _ if min.isEmpty || max.isEmpty => new NullRange()
-    case _ =>
-      NumericRange(
-        min = EstimationUtils.toDecimal(min.get, dataType),
-        max = EstimationUtils.toDecimal(max.get, dataType))
-  }
-
-  def isIntersected(r1: Range, r2: Range): Boolean = (r1, r2) match {
-    case (_, _: DefaultRange) | (_: DefaultRange, _) =>
-      // The DefaultRange represents string/binary types which do not have max/min stats,
-      // we assume they are intersected to be conservative on estimation
-      true
-    case (_, _: NullRange) | (_: NullRange, _) =>
-      false
-    case (n1: NumericRange, n2: NumericRange) =>
-      n1.min.compareTo(n2.max) <= 0 && n1.max.compareTo(n2.min) >= 0
-  }
-
-  /**
-   * Intersected results of two ranges. This is only for two overlapped ranges.
-   * The outputs are the intersected min/max values.
-   */
-  def intersect(r1: Range, r2: Range, dt: DataType): (Option[Any], Option[Any]) = {
-    (r1, r2) match {
-      case (_, _: DefaultRange) | (_: DefaultRange, _) =>
-        // binary/string types don't support intersecting.
-        (None, None)
-      case (n1: NumericRange, n2: NumericRange) =>
-        // Choose the maximum of two min values, and the minimum of two max values.
-        val newMin = if (n1.min <= n2.min) n2.min else n1.min
-        val newMax = if (n1.max <= n2.max) n1.max else n2.max
-        (Some(EstimationUtils.fromDecimal(newMin, dt)),
-          Some(EstimationUtils.fromDecimal(newMax, dt)))
-    }
-  }
-}


http://git-wip-us.apache.org/repos/asf/spark/blob/bf66335a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/ValueInterval.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/ValueInterval.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/ValueInterval.scala
new file mode 100644
index 0000000..0caaf79
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/ValueInterval.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical.statsEstimation
+
+import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.sql.types._
+
+
+/** Value range of a column. */
+trait ValueInterval {
+  def contains(l: Literal): Boolean
+}
+
+/** For simplicity we use decimal to unify operations of numeric intervals. */
+case class NumericValueInterval(min: Decimal, max: Decimal) extends ValueInterval {
+  override def contains(l: Literal): Boolean = {
+    val lit = EstimationUtils.toDecimal(l.value, l.dataType)
+    min <= lit && max >= lit
+  }
+}
+
+/**
+ * This version of Spark does not have min/max for binary/string types, we define their default
+ * behaviors by this class.
+ */
+class DefaultValueInterval extends ValueInterval {
+  override def contains(l: Literal): Boolean = true
+}
+
+/** This is for columns with only null values. */
+class NullValueInterval extends ValueInterval {
+  override def contains(l: Literal): Boolean = false
+}
+
+object ValueInterval {
+  def apply(
+      min: Option[Any],
+      max: Option[Any],
+      dataType: DataType): ValueInterval = dataType match {
+    case StringType | BinaryType => new DefaultValueInterval()
+    case _ if min.isEmpty || max.isEmpty => new NullValueInterval()
+    case _ =>
+      NumericValueInterval(
+        min = EstimationUtils.toDecimal(min.get, dataType),
+        max = EstimationUtils.toDecimal(max.get, dataType))
+  }
+
+  def isIntersected(r1: ValueInterval, r2: ValueInterval): Boolean = (r1, r2) match {
+    case (_, _: DefaultValueInterval) | (_: DefaultValueInterval, _) =>
+      // The DefaultValueInterval represents string/binary types which do not have max/min stats,
+      // we assume they are intersected to be conservative on estimation
+      true
+    case (_, _: NullValueInterval) | (_: NullValueInterval, _) =>
+      false
+    case (n1: NumericValueInterval, n2: NumericValueInterval) =>
+      n1.min.compareTo(n2.max) <= 0 && n1.max.compareTo(n2.min) >= 0
+  }
+
+  /**
+   * Intersected results of two intervals. This is only for two overlapped intervals.
+   * The outputs are the intersected min/max values.
+   */
+  def intersect(r1: ValueInterval, r2: ValueInterval, dt: DataType): (Option[Any], Option[Any]) = {
+    (r1, r2) match {
+      case (_, _: DefaultValueInterval) | (_: DefaultValueInterval, _) =>
+        // binary/string types don't support intersecting.
+        (None, None)
+      case (n1: NumericValueInterval, n2: NumericValueInterval) =>
+        // Choose the maximum of two min values, and the minimum of two max values.
+        val newMin = if (n1.min <= n2.min) n2.min else n1.min
+        val newMax = if (n1.max <= n2.max) n1.max else n2.max
+        (Some(EstimationUtils.fromDecimal(newMin, dt)),
+          Some(EstimationUtils.fromDecimal(newMax, dt)))
+    }
+  }
+}
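
For readers skimming the new file, a short sketch of how the three interval
variants behave. The factory picks the variant from the data type and the
availability of min/max stats; the values below are invented for illustration:

    import org.apache.spark.sql.catalyst.expressions.Literal
    import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.ValueInterval
    import org.apache.spark.sql.types.{IntegerType, StringType}

    // String/binary columns carry no min/max stats, so the factory returns a
    // DefaultValueInterval, which conservatively contains every literal.
    val stringInterval = ValueInterval(None, None, StringType)
    stringInterval.contains(Literal("anything"))      // true

    // A numeric column with no min/max (i.e. only null values) yields a
    // NullValueInterval, which contains nothing.
    val nullInterval = ValueInterval(None, None, IntegerType)
    nullInterval.contains(Literal(42, IntegerType))   // false

This preserves the conservative estimation behavior of the old Range classes;
only the names change.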