[GitHub] spark issue #22232: [SPARK-25237][SQL]remove updateBytesReadWithFileSize bec...
Github user wzhfy commented on the issue: https://github.com/apache/spark/pull/22232 ok to test --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22232: [SPARK-25237][SQL]remove updateBytesReadWithFileSize bec...
Github user wzhfy commented on the issue: https://github.com/apache/spark/pull/22232 test this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22232: [SPARK-25237][SQL]remove updateBytesReadWithFileSize bec...
Github user wzhfy commented on the issue: https://github.com/apache/spark/pull/22232 this seems to be caused by removing support for Hadoop 2.5 and earlier? cc original authors @cloud-fan @srowen to make sure --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22232: [SPARK-25237][SQL]remove updateBytesReadWithFileSize bec...
Github user wzhfy commented on the issue: https://github.com/apache/spark/pull/22232 ok to test --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21147: [SPARK-23799][SQL][FOLLOW-UP] FilterEstimation.evaluateI...
Github user wzhfy commented on the issue: https://github.com/apache/spark/pull/21147 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21147: [SPARK-23799][SQL][FOLLOW-UP] FilterEstimation.evaluateI...
Github user wzhfy commented on the issue: https://github.com/apache/spark/pull/21147 LGTM --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21147: [SPARK-23799][SQL][FOLLOW-UP] FilterEstimation.ev...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/21147#discussion_r184308017 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala --- @@ -392,13 +392,13 @@ case class FilterEstimation(plan: Filter) extends Logging { val dataType = attr.dataType var newNdv = ndv -if (ndv.toDouble == 0 || colStat.min.isEmpty || colStat.max.isEmpty) { - return Some(0.0) -} - // use [min, max] to filter the original hSet dataType match { case _: NumericType | BooleanType | DateType | TimestampType => +if (ndv.toDouble == 0 || colStat.min.isEmpty || colStat.max.isEmpty) { --- End diff -- min/max could be None when the table is empty --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20611: [SPARK-23425][SQL]Support wildcard in HDFS path f...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/20611#discussion_r181538566 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala --- @@ -304,45 +304,14 @@ case class LoadDataCommand( } } -val loadPath = +val loadPath = { if (isLocal) { val uri = Utils.resolveURI(path) -val file = new File(uri.getPath) -val exists = if (file.getAbsolutePath.contains("*")) { - val fileSystem = FileSystems.getDefault - val dir = file.getParentFile.getAbsolutePath - if (dir.contains("*")) { -throw new AnalysisException( - s"LOAD DATA input path allows only filename wildcard: $path") - } - - // Note that special characters such as "*" on Windows are not allowed as a path. --- End diff -- yeah, this is what I was worried about. We need to be careful to change this. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21052: [SPARK-23799] FilterEstimation.evaluateInSet prod...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/21052#discussion_r181378148 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala --- @@ -357,6 +357,17 @@ class FilterEstimationSuite extends StatsEstimationTestBase { expectedRowCount = 3) } + test("evaluateInSet with all zeros") { +validateEstimatedStats( + Filter(InSet(attrString, Set(3, 4, 5)), +StatsTestPlan(Seq(attrString), 10, + AttributeMap(Seq(attrString -> +ColumnStat(distinctCount = Some(0), min = Some(0), max = Some(0), --- End diff -- `min` and `max` should be `None`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21052: [SPARK-23799] FilterEstimation.evaluateInSet prod...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/21052#discussion_r181380894 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/CBOSuite.scala --- @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution + +import org.apache.spark.sql.{QueryTest, SaveMode} +import org.apache.spark.sql.test.SharedSparkSession + +class CBOSuite extends QueryTest with SharedSparkSession { + + import testImplicits._ + + test("Simple queries must be working, if CBO is turned on") { --- End diff -- Shall we move it to `StatisticsCollectionSuite`? And I think a simple EXPLAIN command on an empty table can just cover the case? We can check the plan's stats (e.g. rowCount == 0) after explain. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
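A rough sketch of the suggested check, assuming a local `SparkSession`, the `spark.sql.cbo.enabled` flag, and the usual `queryExecution.optimizedPlan.stats` shape; the table name and query are made up for illustration, not taken from the PR:
```scala
import org.apache.spark.sql.SparkSession

// Not the actual StatisticsCollectionSuite test, just the idea: run a query over an
// empty, analyzed table with CBO on and check the estimated rowCount instead of failing.
object EmptyTableStatsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .config("spark.sql.cbo.enabled", "true")
      .getOrCreate()

    spark.sql("CREATE TABLE empty_tbl (a INT) USING parquet")
    spark.sql("ANALYZE TABLE empty_tbl COMPUTE STATISTICS FOR COLUMNS a")

    val plan = spark.sql("SELECT * FROM empty_tbl WHERE a IN (3, 4, 5)")
      .queryExecution.optimizedPlan
    // Explaining/optimizing should yield rowCount == 0 rather than throwing.
    assert(plan.stats.rowCount.contains(BigInt(0)))

    spark.stop()
  }
}
```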
[GitHub] spark pull request #21052: [SPARK-23799] FilterEstimation.evaluateInSet prod...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/21052#discussion_r181381874 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala --- @@ -395,27 +395,28 @@ case class FilterEstimation(plan: Filter) extends Logging { // use [min, max] to filter the original hSet dataType match { case _: NumericType | BooleanType | DateType | TimestampType => -val statsInterval = - ValueInterval(colStat.min, colStat.max, dataType).asInstanceOf[NumericValueInterval] -val validQuerySet = hSet.filter { v => - v != null && statsInterval.contains(Literal(v, dataType)) -} +if (colStat.min.isDefined && colStat.max.isDefined) { --- End diff -- Check `ndv == 0` at the beginning and return `Some(0.0)`? Then we don't have to make all these changes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
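A standalone sketch (simplified types, not the real `FilterEstimation`) of why the early return keeps the rest of the logic untouched: an empty table is analyzed into a zero distinct count with `min` and `max` set to `None`, so the guard both short-circuits the estimate and avoids dereferencing the missing bounds.
```scala
// Toy column stats: an empty table yields distinctCount = 0 and min = max = None.
final case class ToyColStat(distinctCount: BigInt, min: Option[Double], max: Option[Double])

def inSetSelectivity(colStat: ToyColStat, hSet: Set[Double]): Option[Double] = {
  // Early return: nothing can match an IN set on an empty column, and this also
  // avoids calling .get on min/max, which are None in that case.
  if (colStat.distinctCount == 0 || colStat.min.isEmpty || colStat.max.isEmpty) {
    return Some(0.0)
  }
  val validSet = hSet.filter(v => v >= colStat.min.get && v <= colStat.max.get)
  Some(math.min(validSet.size.toDouble / colStat.distinctCount.toDouble, 1.0))
}

// inSetSelectivity(ToyColStat(0, None, None), Set(3.0, 4.0, 5.0))  // Some(0.0)
```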
[GitHub] spark pull request #21052: [SPARK-23799] FilterEstimation.evaluateInSet prod...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/21052#discussion_r181378031 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala --- @@ -357,6 +357,17 @@ class FilterEstimationSuite extends StatsEstimationTestBase { expectedRowCount = 3) } + test("evaluateInSet with all zeros") { +validateEstimatedStats( + Filter(InSet(attrString, Set(3, 4, 5)), +StatsTestPlan(Seq(attrString), 10, --- End diff -- change rowCount from `10` to `0`? this is more reasonable for an empty table. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21052: [SPARK-23799] FilterEstimation.evaluateInSet produces de...
Github user wzhfy commented on the issue: https://github.com/apache/spark/pull/21052 @mshtelma Usually we describe a PR using two sections: `What changes were proposed in this pull request?` and `How was this patch tested?`. I think they should be in the template when we open a PR. Could you please update the PR description based on the template? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21052: [SPARK-23799] FilterEstimation.evaluateInSet produces de...
Github user wzhfy commented on the issue: https://github.com/apache/spark/pull/21052 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20560: [SPARK-23375][SQL] Eliminate unneeded Sort in Opt...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/20560#discussion_r180389105 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantSortsSuite.scala --- @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.analysis.{Analyzer, EmptyFunctionRegistry} +import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog} +import org.apache.spark.sql.catalyst.dsl.expressions._ +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.rules._ +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf.{CASE_SENSITIVE, ORDER_BY_ORDINAL} + +class RemoveRedundantSortsSuite extends PlanTest { + override val conf = new SQLConf().copy(CASE_SENSITIVE -> true, ORDER_BY_ORDINAL -> false) + val catalog = new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry, conf) + val analyzer = new Analyzer(catalog, conf) --- End diff -- If we don't use ordinal number, we can remove these. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20560: [SPARK-23375][SQL] Eliminate unneeded Sort in Opt...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/20560#discussion_r180390907 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala --- @@ -197,6 +198,19 @@ class PlannerSuite extends SharedSQLContext { assert(planned.child.isInstanceOf[CollectLimitExec]) } + test("SPARK-23375: Cached sorted data doesn't need to be re-sorted") { +val query = testData.select('key, 'value).sort('key.desc).cache() + assert(query.queryExecution.optimizedPlan.isInstanceOf[InMemoryRelation]) +val resorted = query.sort('key.desc) +assert(resorted.queryExecution.optimizedPlan.collect { case s: Sort => s}.isEmpty) +assert(resorted.select('key).collect().map(_.getInt(0)).toSeq == + (1 to 100).sorted(Ordering[Int].reverse)) +// with a different order, the sort is needed +val sortedAsc = query.sort('key) +assert(sortedAsc.queryExecution.optimizedPlan.collect { case s: Sort => s}.nonEmpty) --- End diff -- `.nonEmpty` -> `.size == 1` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20560: [SPARK-23375][SQL] Eliminate unneeded Sort in Opt...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/20560#discussion_r180389667 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -733,6 +735,17 @@ object EliminateSorts extends Rule[LogicalPlan] { } } +/** + * Removes Sort operations on already sorted data --- End diff -- how about `Removes Sort operation if the child is already sorted`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20560: [SPARK-23375][SQL] Eliminate unneeded Sort in Opt...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/20560#discussion_r180390730 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala --- @@ -197,6 +198,19 @@ class PlannerSuite extends SharedSQLContext { assert(planned.child.isInstanceOf[CollectLimitExec]) } + test("SPARK-23375: Cached sorted data doesn't need to be re-sorted") { +val query = testData.select('key, 'value).sort('key.desc).cache() + assert(query.queryExecution.optimizedPlan.isInstanceOf[InMemoryRelation]) +val resorted = query.sort('key.desc) +assert(resorted.queryExecution.optimizedPlan.collect { case s: Sort => s}.isEmpty) +assert(resorted.select('key).collect().map(_.getInt(0)).toSeq == + (1 to 100).sorted(Ordering[Int].reverse)) --- End diff -- `(1 to 100).reverse`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20560: [SPARK-23375][SQL] Eliminate unneeded Sort in Opt...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/20560#discussion_r180390139 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala --- @@ -522,6 +524,8 @@ case class Range( override def computeStats(): Statistics = { Statistics(sizeInBytes = LongType.defaultSize * numElements) } + + override def outputOrdering: Seq[SortOrder] = output.map(a => SortOrder(a, Descending)) --- End diff -- ordering is the same when `step` in `Range` is positive or negative? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
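A toy illustration of the concern (simplified types, not the catalyst `Range` operator or `SortOrder` class): the advertised ordering should follow the sign of `step` rather than being hard-coded.
```scala
sealed trait SortDirection
case object Ascending extends SortDirection
case object Descending extends SortDirection

final case class ToyRange(start: Long, end: Long, step: Long) {
  require(step != 0, "step must be non-zero")
  // A positive step yields ascending values, a negative step descending ones,
  // so hard-coding one direction would be wrong for the other case.
  def outputOrdering: SortDirection = if (step > 0) Ascending else Descending
}

// ToyRange(1, 10, 2).outputOrdering    // Ascending
// ToyRange(10, 1, -2).outputOrdering   // Descending
```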
[GitHub] spark pull request #20560: [SPARK-23375][SQL] Eliminate unneeded Sort in Opt...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/20560#discussion_r180389285 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantSortsSuite.scala --- @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.analysis.{Analyzer, EmptyFunctionRegistry} +import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog} +import org.apache.spark.sql.catalyst.dsl.expressions._ +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.rules._ +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf.{CASE_SENSITIVE, ORDER_BY_ORDINAL} + +class RemoveRedundantSortsSuite extends PlanTest { + override val conf = new SQLConf().copy(CASE_SENSITIVE -> true, ORDER_BY_ORDINAL -> false) + val catalog = new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry, conf) + val analyzer = new Analyzer(catalog, conf) + + object Optimize extends RuleExecutor[LogicalPlan] { +val batches = + Batch("Remove Redundant Sorts", Once, +RemoveRedundantSorts) :: + Batch("Collapse Project", Once, +CollapseProject) :: Nil + } + + val testRelation = LocalRelation('a.int, 'b.int, 'c.int) + + test("remove redundant order by") { +val orderedPlan = testRelation.select('a, 'b).orderBy('a.asc, 'b.desc_nullsFirst) +val unnecessaryReordered = orderedPlan.select('a).orderBy('a.asc, 'b.desc_nullsFirst) +val optimized = Optimize.execute(analyzer.execute(unnecessaryReordered)) --- End diff -- just use `unnecessaryReordered.analyze`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/20345#discussion_r180345994 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala --- @@ -84,19 +84,50 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper { } } + // Extract a list of logical plans to be joined for join-order comparisons. + // Since `ExtractFiltersAndInnerJoins` handles left-deep trees only, this function have + // the same strategy to extract the plan list. + private[optimizer] def extractLeftDeepInnerJoins(plan: LogicalPlan) +: Seq[LogicalPlan] = plan match { +case j @ Join(left, right, _: InnerLike, _) => right +: extractLeftDeepInnerJoins(left) +case p @ Project(_, j @ Join(_, _, _: InnerLike, _)) => extractLeftDeepInnerJoins(j) +case _ => Seq(plan) + } + + private def checkSameJoinOrder(plan1: LogicalPlan, plan2: LogicalPlan): Boolean = { --- End diff -- how about `sameJoinOrder`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/20345#discussion_r180346211 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala --- @@ -84,19 +84,49 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper { } } + // Extract a list of logical plans to be joined for join-order comparisons. + // Since `ExtractFiltersAndInnerJoins` handles left-deep trees only, this function have + // the same strategy to extract the plan list. + private def extractLeftDeepInnerJoins(plan: LogicalPlan): Seq[LogicalPlan] = plan match { +case j @ Join(left, right, _: InnerLike, _) => right +: extractLeftDeepInnerJoins(left) +case p @ Project(_, j @ Join(_, _, _: InnerLike, _)) => extractLeftDeepInnerJoins(j) +case _ => Seq(plan) + } + + private def checkSameJoinOrder(plan1: LogicalPlan, plan2: LogicalPlan): Boolean = { +extractLeftDeepInnerJoins(plan1) == extractLeftDeepInnerJoins(plan2) + } + + private def mayCreateOrderedJoin( + originalPlan: LogicalPlan, + input: Seq[(LogicalPlan, InnerLike)], + conditions: Seq[Expression]): LogicalPlan = { +val orderedJoins = createOrderedJoin(input, conditions) +if (!checkSameJoinOrder(orderedJoins, originalPlan)) { --- End diff -- ah, right, thanks! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/20345#discussion_r180327289 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala --- @@ -172,17 +174,20 @@ object ExtractFiltersAndInnerJoins extends PredicateHelper { case Filter(filterCondition, j @ Join(left, right, _: InnerLike, joinCondition)) => val (plans, conditions) = flattenJoin(j) (plans, conditions ++ splitConjunctivePredicates(filterCondition)) - +case p @ Project(_, j @ Join(_, _, _: InnerLike, _)) +// Keep flattening joins when the project has attributes only +if p.projectList.forall(_.isInstanceOf[Attribute]) => + flattenJoin(j) case _ => (Seq((plan, parentJoinType)), Seq.empty) } - def unapply(plan: LogicalPlan): Option[(Seq[(LogicalPlan, InnerLike)], Seq[Expression])] - = plan match { -case f @ Filter(filterCondition, j @ Join(_, _, joinType: InnerLike, _)) => - Some(flattenJoin(f)) -case j @ Join(_, _, joinType, _) => - Some(flattenJoin(j)) -case _ => None + def unapply(plan: LogicalPlan): Option[(Seq[(LogicalPlan, InnerLike)], Seq[Expression])] = { +val (plans, conditions) = flattenJoin(plan) +if (plans.size > 1) { --- End diff -- how about `plans.size > 2 && conditions.nonEmpty`? then we can remove the `if` condition [here](https://github.com/apache/spark/pull/20345/files#diff-17d31b198ff391188311550fcabd1198R120) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
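A minimal sketch of the suggested guard in `unapply` (toy types, not the real `ExtractFiltersAndInnerJoins` extractor): only report a match when there is actually something to reorder, so the caller's extra `if` can go away.
```scala
final case class Flattened[P, C](plans: Seq[P], conditions: Seq[C])

object ExtractInnerJoinsSketch {
  // Match only when at least three plans and at least one join condition were collected.
  def unapply[P, C](flattened: Flattened[P, C]): Option[(Seq[P], Seq[C])] =
    if (flattened.plans.size > 2 && flattened.conditions.nonEmpty) {
      Some((flattened.plans, flattened.conditions))
    } else {
      None
    }
}
```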
[GitHub] spark pull request #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/20345#discussion_r180344569 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala --- @@ -59,12 +75,7 @@ class JoinOptimizationSuite extends PlanTest { (noCartesian, seq_pair._2) } } - testExtractCheckCross(plan, expectedNoCross) -} - -def testExtractCheckCross -(plan: LogicalPlan, expected: Option[(Seq[(LogicalPlan, InnerLike)], Seq[Expression])]) { - assert(ExtractFiltersAndInnerJoins.unapply(plan) === expected) --- End diff -- Since you rewrote this function, was the previous comparison logic wrong? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/20345#discussion_r180328615 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala --- @@ -145,4 +161,55 @@ class JoinOptimizationSuite extends PlanTest { } assert(broadcastChildren.size == 1) } + + test("SPARK-23172 skip projections when flattening joins") { +val x = testRelation.subquery('x) +val y = testRelation1.subquery('y) +val z = testRelation.subquery('z) +val joined = x.join(z, Inner, Some($"x.b" === $"z.b")).select($"x.a", $"z.a", $"z.c") + .join(y, Inner, Some($"y.d" === $"z.a")).analyze +val expectedTables = joined.collectLeaves().map { case p => (p, Inner) } +val expectedConditions = joined.collect { case Join(_, _, _, Some(conditions)) => conditions } +testExtractInnerJoins(joined, Some((expectedTables, expectedConditions))) + } + + test("SPARK-23172 reorder joins with projections") { --- End diff -- The case can also happen without star schema enabled, right? Is it possible to use a simpler case like the one in pr description? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/20345#discussion_r180340667 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala --- @@ -46,6 +48,20 @@ class JoinOptimizationSuite extends PlanTest { val testRelation = LocalRelation('a.int, 'b.int, 'c.int) val testRelation1 = LocalRelation('d.int) + def testExtractInnerJoins( --- End diff -- private --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/20345#discussion_r180344118 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala --- @@ -116,7 +127,12 @@ class JoinOptimizationSuite extends PlanTest { ) queryAnswers foreach { queryAnswerPair => - val optimized = Optimize.execute(queryAnswerPair._1.analyze) + val optimized = Optimize.execute(queryAnswerPair._1.analyze) match { +// `ReorderJoin` may add `Project` to keep the same order of output attributes. +// So, we drop a top `Project` for tests. +case project: Project => project.child --- End diff -- I'm a little hesitant to change this, because if we really forget to add a Project node after join reordering, the test can pass, but that's wrong. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20611: [SPARK-23425][SQL]Support wildcard in HDFS path for load...
Github user wzhfy commented on the issue: https://github.com/apache/spark/pull/20611 @sujith71955 any updates? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20913: [SPARK-23799] FilterEstimation.evaluateInSet produces de...
Github user wzhfy commented on the issue: https://github.com/apache/spark/pull/20913 @mshtelma Would you please update the branch? It seems there's something wrong with the commits. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20611: [SPARK-23425][SQL]Support wildcard in HDFS path f...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/20611#discussion_r178736198 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala --- @@ -385,7 +385,9 @@ case class LoadDataCommand( val hadoopConf = sparkSession.sessionState.newHadoopConf() val srcPath = new Path(hdfsUri) val fs = srcPath.getFileSystem(hadoopConf) -if (!fs.exists(srcPath)) { +// Check if the path exists or there are matched paths if it's a path with wildcard. +// For HDFS path, we support wildcard in directory name and file name. +if (null == fs.globStatus(srcPath) || fs.globStatus(srcPath).isEmpty) { --- End diff -- @HyukjinKwon @dongjoon-hyun Is it possible to use `fs.globStatus` for both local path and hdfs path? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
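A sketch of the single check being discussed, using only the standard Hadoop `FileSystem` API from the diff (`globStatus` returns null or an empty array when nothing matches); the helper name and example paths are made up:
```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Works for both local ("file:///...") and HDFS URIs, and handles wildcards,
// because the FileSystem implementation is resolved from the path's scheme.
def anyPathMatches(pathString: String, hadoopConf: Configuration): Boolean = {
  val srcPath = new Path(pathString)
  val fs = srcPath.getFileSystem(hadoopConf)
  val matched = fs.globStatus(srcPath)
  matched != null && matched.nonEmpty
}

// anyPathMatches("hdfs://ns1/tmp/hive/dat*/type*", new Configuration())
// anyPathMatches("file:///tmp/hive/dat1/*.txt", new Configuration())
```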
[GitHub] spark issue #20611: [SPARK-23425][SQL]Support wildcard in HDFS path for load...
Github user wzhfy commented on the issue: https://github.com/apache/spark/pull/20611 I'm ok with the change. Since it's a behavior change of Spark, let's double check with @gatorsmile and @jiangxb1987 . @sujith71955 Please improve PR's description, there are some wrong letter cases and broken lines. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/20345#discussion_r175725372 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala --- @@ -84,19 +84,49 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper { } } + // Extract a list of logical plans to be joined for join-order comparisons. + // Since `ExtractFiltersAndInnerJoins` handles left-deep trees only, this function have + // the same strategy to extract the plan list. + private def extractLeftDeepInnerJoins(plan: LogicalPlan): Seq[LogicalPlan] = plan match { +case j @ Join(left, right, _: InnerLike, _) => right +: extractLeftDeepInnerJoins(left) +case p @ Project(_, j @ Join(_, _, _: InnerLike, _)) => extractLeftDeepInnerJoins(j) +case _ => Seq(plan) + } + + private def checkSameJoinOrder(plan1: LogicalPlan, plan2: LogicalPlan): Boolean = { +extractLeftDeepInnerJoins(plan1) == extractLeftDeepInnerJoins(plan2) + } + + private def mayCreateOrderedJoin( + originalPlan: LogicalPlan, + input: Seq[(LogicalPlan, InnerLike)], + conditions: Seq[Expression]): LogicalPlan = { +val orderedJoins = createOrderedJoin(input, conditions) +if (!checkSameJoinOrder(orderedJoins, originalPlan)) { --- End diff -- Is this check necessary? I think check `originalPlan.output != orderedJoins.output` is enough, and faster. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
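A toy model of the cheaper check (simplified types, not the real `LogicalPlan`/`Project` API): compare the output attribute order of the two plans instead of re-extracting the whole left-deep join order.
```scala
final case class Attr(name: String)

sealed trait ToyPlan { def output: Seq[Attr] }
final case class ToyJoin(output: Seq[Attr]) extends ToyPlan
final case class ToyProject(output: Seq[Attr], child: ToyPlan) extends ToyPlan

// After reordering, add a projection only if the externally visible column order changed.
def restoreOutput(original: ToyPlan, reordered: ToyPlan): ToyPlan =
  if (reordered.output == original.output) reordered
  else ToyProject(original.output, reordered)
```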
[GitHub] spark pull request #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/20345#discussion_r175696570 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala --- @@ -141,14 +141,16 @@ object ExtractEquiJoinKeys extends Logging with PredicateHelper { } /** - * A pattern that collects the filter and inner joins. + * A pattern that collects the filter and inner joins (and skip projections in plan sub-trees). --- End diff -- skip projections with attributes only --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/20345#discussion_r175727668 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala --- @@ -145,4 +159,15 @@ class JoinOptimizationSuite extends PlanTest { } assert(broadcastChildren.size == 1) } + + test("SPARK-23172 skip projections when flattening joins") { --- End diff -- Could you add a test case which would fail before the fix? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/20345#discussion_r175696187 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala --- @@ -172,17 +174,23 @@ object ExtractFiltersAndInnerJoins extends PredicateHelper { case Filter(filterCondition, j @ Join(left, right, _: InnerLike, joinCondition)) => val (plans, conditions) = flattenJoin(j) (plans, conditions ++ splitConjunctivePredicates(filterCondition)) - +case p @ Project(_, j @ Join(left, right, _: InnerLike, joinCondition)) => + // Keep flattening joins when projects having attributes only + if (p.outputSet.subsetOf(j.outputSet)) { --- End diff -- If we want to make sure the project has attributes only, should it be `p.projectList.forall(_.isInstanceOf[Attribute])`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
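A toy illustration of the distinction (simplified, not the catalyst expression classes): an alias such as `a + 1 AS a2` should not count as an attribute-only projection, which is what the `forall(_.isInstanceOf[Attribute])` check captures directly.
```scala
sealed trait ToyNamedExpr
final case class ToyAttribute(name: String) extends ToyNamedExpr
final case class ToyAlias(expr: String, name: String) extends ToyNamedExpr

def attributesOnly(projectList: Seq[ToyNamedExpr]): Boolean =
  projectList.forall(_.isInstanceOf[ToyAttribute])

// attributesOnly(Seq(ToyAttribute("a"), ToyAttribute("b")))        // true
// attributesOnly(Seq(ToyAttribute("a"), ToyAlias("a + 1", "a2")))  // false
```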
[GitHub] spark pull request #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/20345#discussion_r175696302 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala --- @@ -172,17 +174,23 @@ object ExtractFiltersAndInnerJoins extends PredicateHelper { case Filter(filterCondition, j @ Join(left, right, _: InnerLike, joinCondition)) => val (plans, conditions) = flattenJoin(j) (plans, conditions ++ splitConjunctivePredicates(filterCondition)) - +case p @ Project(_, j @ Join(left, right, _: InnerLike, joinCondition)) => + // Keep flattening joins when projects having attributes only --- End diff -- nit: when projects having attributes only => when the project has attributes only --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20611: [SPARK-23425][SQL]Support wildcard in HDFS path f...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/20611#discussion_r174771588 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala --- @@ -385,8 +385,12 @@ case class LoadDataCommand( val hadoopConf = sparkSession.sessionState.newHadoopConf() val srcPath = new Path(hdfsUri) val fs = srcPath.getFileSystem(hadoopConf) -if (!fs.exists(srcPath)) { - throw new AnalysisException(s"LOAD DATA input path does not exist: $path") +// A validaton logic is been added for non local files, Error will be thrown +// If hdfs path doest not exist or if no files matches the wild card defined +// in load path +if (null == fs.globStatus(srcPath) || fs.globStatus(srcPath).isEmpty) { + throw new AnalysisException(s"LOAD DATA input path does not exist " + +s"or no files are matching the wildcard string: $path") --- End diff -- I think the previous message ("LOAD DATA input path does not exist: $path") is fine; it covers the case where no path matches the wildcard, like the local path case above. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20611: [SPARK-23425][SQL]Support wildcard in HDFS path f...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/20611#discussion_r174773206 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala --- @@ -385,8 +385,12 @@ case class LoadDataCommand( val hadoopConf = sparkSession.sessionState.newHadoopConf() val srcPath = new Path(hdfsUri) val fs = srcPath.getFileSystem(hadoopConf) -if (!fs.exists(srcPath)) { - throw new AnalysisException(s"LOAD DATA input path does not exist: $path") +// A validaton logic is been added for non local files, Error will be thrown +// If hdfs path doest not exist or if no files matches the wild card defined +// in load path --- End diff -- Check if the path exists or there is matched path. For HDFS path, we support wildcard in directory name and file name. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20611: [SPARK-23425][SQL]Support wildcard in HDFS path for load...
Github user wzhfy commented on the issue: https://github.com/apache/spark/pull/20611 @sujith71955 In the tests, why does case2 have less data than case1? '/tmp/hive/dat*/*' has more files than '/tmp/hive/dat1/type*', right? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20611: [SPARK-23425][SQL]When wild card is been used in load co...
Github user wzhfy commented on the issue: https://github.com/apache/spark/pull/20611 @sujith71955
1. It's a little difficult to read as the pictures have different resolutions. Maybe you can use ``` to include test results? I think this is more readable. For example:
```
91-206:~ # hadoop fs -ls /wzh/t1
Found 1 items
-rwxr-xr-x 3 root supergroup 20 2017-01-13 09:53 /wzh/t1/part-0
```
2. Can you also show the table data by a select command, to make sure the files are really loaded?
3. Please add test cases for the '?' pattern.
4. Please move the test results for your fix to the section **How was this patch tested?**.
5. Update the PR title as "Support wildcard in HDFS path for load table command"? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20611: [SPARK-23425][SQL]When wild card is been used in load co...
Github user wzhfy commented on the issue: https://github.com/apache/spark/pull/20611 Could you provide the test result in Hive here? Also, does hive allow wildcard in dir level, or just file level? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20611: [SPARK-23425][SQL]When wild card is been used in ...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/20611#discussion_r172716633 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala --- @@ -385,8 +385,12 @@ case class LoadDataCommand( val hadoopConf = sparkSession.sessionState.newHadoopConf() val srcPath = new Path(hdfsUri) val fs = srcPath.getFileSystem(hadoopConf) -if (!fs.exists(srcPath)) { - throw new AnalysisException(s"LOAD DATA input path does not exist: $path") +// A validaton logic is been added for non local files, Error will be thrown +// If hdfs path doest not exist or if no files matches the wild card defined +// in load path +if (null == fs.globStatus(srcPath) || fs.globStatus(srcPath).isEmpty) { --- End diff -- please add test cases for this change, e.g. path containing '*', '?', etc. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
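A rough sketch of the kind of tests being requested; the table name and input paths are made up, `LOAD DATA` assumes a Hive-enabled session, and the wildcard directory support is what this PR is adding, so this is illustrative rather than a test from the change itself:
```scala
import org.apache.spark.sql.SparkSession

object LoadDataWildcardSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .enableHiveSupport()
      .getOrCreate()

    spark.sql("CREATE TABLE IF NOT EXISTS load_wildcard_t (line STRING)")

    // '*' matches any number of characters in a directory or file name.
    spark.sql("LOAD DATA INPATH '/tmp/load_test/dat*/type*' INTO TABLE load_wildcard_t")
    // '?' matches exactly one character.
    spark.sql("LOAD DATA INPATH '/tmp/load_test/dat1/type?.txt' INTO TABLE load_wildcard_t")

    // Verify the files were actually loaded, not just accepted.
    spark.sql("SELECT count(*) FROM load_wildcard_t").show()
    spark.stop()
  }
}
```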
[GitHub] spark issue #20430: [SPARK-23263][SQL] Create table stored as parquet should...
Github user wzhfy commented on the issue: https://github.com/apache/spark/pull/20430 Can we specialize this CTAS case? For data-changing commands like INSERT, I think we should remove the stats if auto update is disabled, because the previous stats are inaccurate after the insertion. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20430: [SPARK-23263][SQL] Create table stored as parquet...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/20430#discussion_r165349231 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala --- @@ -34,16 +34,12 @@ object CommandUtils extends Logging { /** Change statistics after changing data by commands. */ def updateTableStats(sparkSession: SparkSession, table: CatalogTable): Unit = { -if (table.stats.nonEmpty) { +if (sparkSession.sessionState.conf.autoSizeUpdateEnabled) { val catalog = sparkSession.sessionState.catalog - if (sparkSession.sessionState.conf.autoSizeUpdateEnabled) { -val newTable = catalog.getTableMetadata(table.identifier) -val newSize = CommandUtils.calculateTotalSize(sparkSession.sessionState, newTable) -val newStats = CatalogStatistics(sizeInBytes = newSize) -catalog.alterTableStats(table.identifier, Some(newStats)) - } else { -catalog.alterTableStats(table.identifier, None) --- End diff -- @felixcheung if the data of a table has been changed and auto size update is disabled, the stats become inaccurate, so we should remove them. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
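A toy sketch of the intended behavior for data-changing commands (simplified types, not `CommandUtils.updateTableStats` itself): after the data changes, either recompute the size or drop the now-stale stats, but never keep the old numbers.
```scala
final case class ToyStats(sizeInBytes: BigInt)

// autoSizeUpdateEnabled on  -> recompute and store the new size
// autoSizeUpdateEnabled off -> remove the stats, since they no longer describe the data
def statsAfterDataChange(autoSizeUpdateEnabled: Boolean,
                         recomputeSize: () => BigInt): Option[ToyStats] =
  if (autoSizeUpdateEnabled) Some(ToyStats(recomputeSize()))
  else None
```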
[GitHub] spark issue #14129: [SPARK-16280][SQL] Implement histogram_numeric SQL funct...
Github user wzhfy commented on the issue: https://github.com/apache/spark/pull/14129 @cloud-fan I think this PR is to implement Hive's `histogram_numeric` function. It produces a histogram to approximate the data distribution. It's different from the standard equi-width or equi-height histograms used in estimation. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20072: [SPARK-22790][SQL] add a configurable factor to d...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/20072#discussion_r159135028 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -261,6 +261,17 @@ object SQLConf { .booleanConf .createWithDefault(false) + val HADOOPFSRELATION_SIZE_FACTOR = buildConf( +"org.apache.spark.sql.execution.datasources.sizeFactor") --- End diff -- Is this config for all data sources or only hadoopFS-related data sources? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20072: [SPARK-22790][SQL] add a configurable factor to d...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/20072#discussion_r159134987 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -261,6 +261,17 @@ object SQLConf { .booleanConf .createWithDefault(false) + val HADOOPFSRELATION_SIZE_FACTOR = buildConf( --- End diff -- How about `DISK_TO_MEMORY_SIZE_FACTOR`? IMHO the current name doesn't describe the purpose clearly. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20072: [SPARK-22790][SQL] add a configurable factor to d...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/20072#discussion_r159135036 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala --- @@ -60,6 +60,8 @@ case class HadoopFsRelation( } } + private val hadoopFSSizeFactor = sqlContext.conf.hadoopFSSizeFactor --- End diff -- shall we move it into the method `sizeInBytes` since it's only used there? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20072: [SPARK-22790][SQL] add a configurable factor to d...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/20072#discussion_r159135272 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala --- @@ -82,7 +84,15 @@ case class HadoopFsRelation( } } - override def sizeInBytes: Long = location.sizeInBytes + override def sizeInBytes: Long = { +val size = location.sizeInBytes * hadoopFSSizeFactor +if (size > Long.MaxValue) { --- End diff -- I think this branch can be removed? `Long.MaxValue` is returned when converting a double value larger than `Long.MaxValue`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
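A quick standalone check of that claim, relying only on the standard Scala/JVM double-to-long narrowing conversion:
```scala
object DoubleToLongSaturation extends App {
  // A Double larger than Long.MaxValue already narrows to Long.MaxValue,
  // so an explicit `if (size > Long.MaxValue)` branch adds nothing.
  val oversized: Double = Long.MaxValue.toDouble * 10
  println(oversized.toLong == Long.MaxValue)  // true
}
```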
[GitHub] spark issue #20122: [TEST][MINOR] remove redundant `EliminateSubqueryAliases...
Github user wzhfy commented on the issue: https://github.com/apache/spark/pull/20122 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20122: [Minor][TEST] remove redundant `EliminateSubquery...
GitHub user wzhfy opened a pull request: https://github.com/apache/spark/pull/20122 [Minor][TEST] remove redundant `EliminateSubqueryAliases` in test code ## What changes were proposed in this pull request? The `analyze` method in `implicit class DslLogicalPlan` already includes `EliminateSubqueryAliases`. So there's no need to call `EliminateSubqueryAliases` again after calling `analyze` in some test code. ## How was this patch tested? Existing tests. You can merge this pull request into a Git repository by running: $ git pull https://github.com/wzhfy/spark redundant_code Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/20122.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #20122 commit 79daa553640283823c9538a9ab5d08a1209d3b0a Author: Zhenhua Wang Date: 2017-12-30T07:32:17Z remove redundant code --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20062: [SPARK-22892] [SQL] Simplify some estimation logi...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/20062#discussion_r159016484 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala --- @@ -225,17 +224,17 @@ case class FilterEstimation(plan: Filter) extends Logging { def evaluateNullCheck( attr: Attribute, isNull: Boolean, - update: Boolean): Option[BigDecimal] = { + update: Boolean): Option[Double] = { if (!colStatsMap.contains(attr)) { logDebug("[CBO] No statistics for " + attr) return None } val colStat = colStatsMap(attr) val rowCountValue = childStats.rowCount.get -val nullPercent: BigDecimal = if (rowCountValue == 0) { +val nullPercent: Double = if (rowCountValue == 0) { 0 } else { - BigDecimal(colStat.nullCount) / BigDecimal(rowCountValue) + (BigDecimal(colStat.nullCount) / BigDecimal(rowCountValue)).toDouble --- End diff -- Theoretically, the value range of BigInt is larger than that of Double, so it's better to convert to BigDecimal. But after the division, the result is between [0, 1], so it's safe to convert to Double. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
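A small standalone example of the conversion being discussed (values are arbitrary):
```scala
object NullPercentExample extends App {
  // nullCount and rowCount are BigInt, so divide as BigDecimal to avoid losing precision,
  // then narrow: the ratio is always within [0, 1], which Double represents safely.
  val nullCount = BigInt("123456789012345678901234567890")
  val rowCount  = nullCount * 4
  val nullPercent: Double = (BigDecimal(nullCount) / BigDecimal(rowCount)).toDouble
  println(nullPercent)  // 0.25
}
```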
[GitHub] spark issue #20102: [SPARK-22917][SQL] Should not try to generate histogram ...
Github user wzhfy commented on the issue: https://github.com/apache/spark/pull/20102 cc @cloud-fan --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20062: [SPARK-22892] [SQL] Simplify some estimation logic by us...
Github user wzhfy commented on the issue: https://github.com/apache/spark/pull/20062 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20102: [SPARK-22917][SQL] Should not try to generate his...
GitHub user wzhfy opened a pull request: https://github.com/apache/spark/pull/20102 [SPARK-22917][SQL] Should not try to generate histogram for empty/null columns ## What changes were proposed in this pull request? For an empty/null column, the result of `ApproximatePercentile` is null. Then in `ApproxCountDistinctForIntervals`, a `MatchError` (for `endpoints`) will be thrown if we try to generate a histogram for that column. Besides, there is no need to generate a histogram for such a column. In this patch, we exclude such columns when generating histograms. ## How was this patch tested? Enhanced test cases for empty/null columns. You can merge this pull request into a Git repository by running: $ git pull https://github.com/wzhfy/spark no_record_hgm_bug Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/20102.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #20102 commit 9617c2d982ed799580957a1467d47f42e8124636 Author: Zhenhua Wang Date: 2017-12-28T08:36:23Z fix --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20062: [SPARK-22892] [SQL] Simplify some estimation logi...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/20062#discussion_r158806400 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala --- @@ -253,7 +252,7 @@ case class FilterEstimation(plan: Filter) extends Logging { 1.0 - nullPercent } -Some(percent) +Some(percent.toDouble) --- End diff -- This is because `nullPercent` is calculated by `colStat.nullCount / rowCount`; both `nullCount` and `rowCount` are BigInt and need to be converted to BigDecimal. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20062: [SPARK-22892] [SQL] Simplify some estimation logic by us...
Github user wzhfy commented on the issue: https://github.com/apache/spark/pull/20062 cc @cloud-fan --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20062: [SPARK-22892] [SQL] Simplify some estimation logi...
GitHub user wzhfy opened a pull request: https://github.com/apache/spark/pull/20062 [SPARK-22892] [SQL] Simplify some estimation logic by using double instead of decimal ## What changes were proposed in this pull request? Simplify some estimation logic by using double instead of decimal. ## How was this patch tested? Existing tests. You can merge this pull request into a Git repository by running: $ git pull https://github.com/wzhfy/spark simplify_by_double Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/20062.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #20062 commit 11058769c75f2a69cde2ead1d49a0d1517be86e9 Author: Zhenhua Wang Date: 2017-12-23T08:15:02Z simplify --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/19594#discussion_r157699245 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala --- @@ -191,8 +191,19 @@ case class JoinEstimation(join: Join) extends Logging { val rInterval = ValueInterval(rightKeyStat.min, rightKeyStat.max, rightKey.dataType) if (ValueInterval.isIntersected(lInterval, rInterval)) { val (newMin, newMax) = ValueInterval.intersect(lInterval, rInterval, leftKey.dataType) -val (card, joinStat) = computeByNdv(leftKey, rightKey, newMin, newMax) -keyStatsAfterJoin += (leftKey -> joinStat, rightKey -> joinStat) +val (card, joinStat) = (leftKeyStat.histogram, rightKeyStat.histogram) match { + case (Some(l: Histogram), Some(r: Histogram)) => +computeByEquiHeightHistogram(leftKey, rightKey, l, r, newMin, newMax) + case _ => +computeByNdv(leftKey, rightKey, newMin, newMax) +} +keyStatsAfterJoin += ( + // Histograms are propagated as unchanged. During future estimation, they should be + // truncated by the updated max/min. In this way, only pointers of the histograms are + // propagated and thus reduce memory consumption. + leftKey -> joinStat.copy(histogram = leftKeyStat.histogram), + rightKey -> joinStat.copy(histogram = rightKeyStat.histogram) --- End diff -- I put it here because `computeByEquiHeightHistogram` returns a single stats, here we keep the histogram for leftKey and rightKey respectively. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/19594#discussion_r157698793 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala --- @@ -225,6 +236,43 @@ case class JoinEstimation(join: Join) extends Logging { (ceil(card), newStats) } + /** Compute join cardinality using equi-height histograms. */ + private def computeByEquiHeightHistogram( + leftKey: AttributeReference, + rightKey: AttributeReference, + leftHistogram: Histogram, + rightHistogram: Histogram, + newMin: Option[Any], + newMax: Option[Any]): (BigInt, ColumnStat) = { +val overlappedRanges = getOverlappedRanges( + leftHistogram = leftHistogram, + rightHistogram = rightHistogram, + // Only numeric values have equi-height histograms. + lowerBound = newMin.get.toString.toDouble, + upperBound = newMax.get.toString.toDouble) + +var card: BigDecimal = 0 +var totalNdv: Double = 0 +for (i <- overlappedRanges.indices) { + val range = overlappedRanges(i) + if (i == 0 || range.hi != overlappedRanges(i - 1).hi) { +// If range.hi == overlappedRanges(i - 1).hi, that means the current range has only one +// value, and this value is already counted in the previous range. So there is no need to +// count it in this range. +totalNdv += math.min(range.leftNdv, range.rightNdv) + } + // Apply the formula in this overlapped range. + card += range.leftNumRows * range.rightNumRows / math.max(range.leftNdv, range.rightNdv) +} + +val leftKeyStat = leftStats.attributeStats(leftKey) +val rightKeyStat = rightStats.attributeStats(rightKey) +val newMaxLen = math.min(leftKeyStat.maxLen, rightKeyStat.maxLen) +val newAvgLen = (leftKeyStat.avgLen + rightKeyStat.avgLen) / 2 --- End diff -- how do we use left/right numRows to calculate this? Ideally avgLen is calculated by total length of keys / numRowsAfterJoin. For string type, we don't know the exact length of the matched keys (we don't support string histograms yet); for numeric types, their avgLen should be the same. So the equation is a fair approximation. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/19594#discussion_r157696227 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala --- @@ -225,6 +236,43 @@ case class JoinEstimation(join: Join) extends Logging { (ceil(card), newStats) } + /** Compute join cardinality using equi-height histograms. */ + private def computeByEquiHeightHistogram( + leftKey: AttributeReference, + rightKey: AttributeReference, + leftHistogram: Histogram, + rightHistogram: Histogram, + newMin: Option[Any], + newMax: Option[Any]): (BigInt, ColumnStat) = { +val overlappedRanges = getOverlappedRanges( + leftHistogram = leftHistogram, + rightHistogram = rightHistogram, + // Only numeric values have equi-height histograms. + lowerBound = newMin.get.toString.toDouble, + upperBound = newMax.get.toString.toDouble) --- End diff -- that's because we need to update the column stats' min and max at the end of the method. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19594: [SPARK-21984] [SQL] Join estimation based on equi-height...
Github user wzhfy commented on the issue: https://github.com/apache/spark/pull/19594 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19594: [SPARK-21984] [SQL] Join estimation based on equi-height...
Github user wzhfy commented on the issue: https://github.com/apache/spark/pull/19594 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/19594#discussion_r157331840 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala --- @@ -114,4 +115,183 @@ object EstimationUtils { } } + /** + * Returns overlapped ranges between two histograms, in the given value range [newMin, newMax]. + */ + def getOverlappedRanges( + leftHistogram: Histogram, + rightHistogram: Histogram, + newMin: Double, + newMax: Double): Seq[OverlappedRange] = { +val overlappedRanges = new ArrayBuffer[OverlappedRange]() +// Only bins whose range intersect [newMin, newMax] have join possibility. +val leftBins = leftHistogram.bins + .filter(b => b.lo <= newMax && b.hi >= newMin) +val rightBins = rightHistogram.bins + .filter(b => b.lo <= newMax && b.hi >= newMin) + +leftBins.foreach { lb => + rightBins.foreach { rb => --- End diff -- We only collect `OverlappedRange` when [left part and right part intersect](https://github.com/apache/spark/pull/19594/files#diff-56eed9f23127c954d9add0f6c5c93820R237), and that decision is based on some computation, so it's not very convenient to use as a guard. It seems the `yield` form is not very suitable for this case. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/19594#discussion_r157331711 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala --- @@ -191,8 +191,16 @@ case class JoinEstimation(join: Join) extends Logging { val rInterval = ValueInterval(rightKeyStat.min, rightKeyStat.max, rightKey.dataType) if (ValueInterval.isIntersected(lInterval, rInterval)) { val (newMin, newMax) = ValueInterval.intersect(lInterval, rInterval, leftKey.dataType) -val (card, joinStat) = computeByNdv(leftKey, rightKey, newMin, newMax) -keyStatsAfterJoin += (leftKey -> joinStat, rightKey -> joinStat) +val (card, joinStat) = (leftKeyStat.histogram, rightKeyStat.histogram) match { + case (Some(l: Histogram), Some(r: Histogram)) => +computeByEquiHeightHistogram(leftKey, rightKey, l, r, newMin, newMax) + case _ => +computeByNdv(leftKey, rightKey, newMin, newMax) +} +keyStatsAfterJoin += ( + leftKey -> joinStat.copy(histogram = leftKeyStat.histogram), + rightKey -> joinStat.copy(histogram = rightKeyStat.histogram) --- End diff -- ah right, we can keep it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/19594#discussion_r156847785 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala --- @@ -191,8 +191,16 @@ case class JoinEstimation(join: Join) extends Logging { val rInterval = ValueInterval(rightKeyStat.min, rightKeyStat.max, rightKey.dataType) if (ValueInterval.isIntersected(lInterval, rInterval)) { val (newMin, newMax) = ValueInterval.intersect(lInterval, rInterval, leftKey.dataType) -val (card, joinStat) = computeByNdv(leftKey, rightKey, newMin, newMax) -keyStatsAfterJoin += (leftKey -> joinStat, rightKey -> joinStat) +val (card, joinStat) = (leftKeyStat.histogram, rightKeyStat.histogram) match { + case (Some(l: Histogram), Some(r: Histogram)) => +computeByEquiHeightHistogram(leftKey, rightKey, l, r, newMin, newMax) + case _ => +computeByNdv(leftKey, rightKey, newMin, newMax) +} +keyStatsAfterJoin += ( + leftKey -> joinStat.copy(histogram = leftKeyStat.histogram), + rightKey -> joinStat.copy(histogram = rightKeyStat.histogram) --- End diff -- Currently we don't update histogram since min/max can help us to know which bins are valid. It doesn't affect correctness. But updating histograms helps to reduce memory usage for histogram propagation. We can do this in both filter and join estimation in following PRs. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
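[Editor's note] As a rough illustration of the "update histograms to reduce memory" idea mentioned above (assumed, simplified types, not Spark's internals): pruning could drop bins entirely outside the new [min, max] and clamp the boundary bins; rescaling ndv and height is omitted here for brevity.
```
case class Bin(lo: Double, hi: Double, ndv: Long)
case class Hist(height: Double, bins: Seq[Bin])

def pruneHistogram(h: Hist, newMin: Double, newMax: Double): Hist = {
  val kept = h.bins
    .filter(b => b.lo <= newMax && b.hi >= newMin)                              // keep overlapping bins
    .map(b => b.copy(lo = math.max(b.lo, newMin), hi = math.min(b.hi, newMax))) // clamp boundaries
  h.copy(bins = kept)
}
```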
[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/19594#discussion_r156847046 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala --- @@ -114,4 +115,183 @@ object EstimationUtils { } } + /** + * Returns overlapped ranges between two histograms, in the given value range [newMin, newMax]. + */ + def getOverlappedRanges( + leftHistogram: Histogram, + rightHistogram: Histogram, + newMin: Double, + newMax: Double): Seq[OverlappedRange] = { +val overlappedRanges = new ArrayBuffer[OverlappedRange]() +// Only bins whose range intersect [newMin, newMax] have join possibility. +val leftBins = leftHistogram.bins + .filter(b => b.lo <= newMax && b.hi >= newMin) +val rightBins = rightHistogram.bins + .filter(b => b.lo <= newMax && b.hi >= newMin) + +leftBins.foreach { lb => + rightBins.foreach { rb => +val (left, leftHeight) = trimBin(lb, leftHistogram.height, newMin, newMax) +val (right, rightHeight) = trimBin(rb, rightHistogram.height, newMin, newMax) +// Only collect overlapped ranges. +if (left.lo <= right.hi && left.hi >= right.lo) { + // Collect overlapped ranges. + val range = if (left.lo == left.hi) { +// Case1: the left bin has only one value +OverlappedRange( + lo = left.lo, + hi = left.lo, + leftNdv = 1, + rightNdv = 1, + leftNumRows = leftHeight, + rightNumRows = rightHeight / right.ndv +) + } else if (right.lo == right.hi) { +// Case2: the right bin has only one value +OverlappedRange( + lo = right.lo, + hi = right.lo, + leftNdv = 1, + rightNdv = 1, + leftNumRows = leftHeight / left.ndv, + rightNumRows = rightHeight +) + } else if (right.lo >= left.lo && right.hi >= left.hi) { +// Case3: the left bin is "smaller" than the right bin +// left.loright.lo left.hi right.hi +// +--+++---> +val leftRatio = (left.hi - right.lo) / (left.hi - left.lo) +val rightRatio = (left.hi - right.lo) / (right.hi - right.lo) +if (leftRatio == 0) { + // The overlapped range has only one value. + OverlappedRange( +lo = right.lo, +hi = right.lo, +leftNdv = 1, +rightNdv = 1, +leftNumRows = leftHeight / left.ndv, +rightNumRows = rightHeight / right.ndv + ) +} else { + OverlappedRange( +lo = right.lo, +hi = left.hi, +leftNdv = left.ndv * leftRatio, +rightNdv = right.ndv * rightRatio, +leftNumRows = leftHeight * leftRatio, +rightNumRows = rightHeight * rightRatio + ) +} + } else if (right.lo <= left.lo && right.hi <= left.hi) { +// Case4: the left bin is "larger" than the right bin +// right.lo left.lo right.hi left.hi +// +--+++---> +val leftRatio = (right.hi - left.lo) / (left.hi - left.lo) +val rightRatio = (right.hi - left.lo) / (right.hi - right.lo) +if (leftRatio == 0) { + // The overlapped range has only one value. + OverlappedRange( +lo = right.hi, +hi = right.hi, +leftNdv = 1, +rightNdv = 1, +leftNumRows = leftHeight / left.ndv, +rightNumRows = rightHeight / right.ndv + ) +} else { + OverlappedRange( +lo = left.lo, +hi = right.hi, +leftNdv = left.ndv * leftRatio, +rightNdv = right.ndv * rightRatio, +leftNumRows = leftHeight * leftRatio, +rightNumRows = rightHeight * rightRatio + ) +} + } else if (right.lo >= left.lo && right.hi <= left
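[Editor's note] The (truncated) diff above prorates each bin by how much of it falls inside the overlapped range, scaling both its ndv and its row count. A hedged, self-contained sketch of that proration with illustrative names (not the real code):
```
case class Bin(lo: Double, hi: Double, ndv: Double)

// fraction of a bin covered by [overlapLo, overlapHi]; a single-value bin counts fully
def overlapFraction(b: Bin, overlapLo: Double, overlapHi: Double): Double =
  if (b.hi == b.lo) 1.0 else (overlapHi - overlapLo) / (b.hi - b.lo)

// e.g. a left bin [30, 60] with ndv 30 and 300 rows, overlapped on [30, 50]:
val ratio      = overlapFraction(Bin(30, 60, 30), 30, 50) // 2/3
val scaledNdv  = 30 * ratio                               // ~20 distinct values assumed in the overlap
val scaledRows = 300 * ratio                              // ~200 rows
```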
[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/19594#discussion_r156846872 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala --- @@ -114,4 +115,183 @@ object EstimationUtils { } } + /** + * Returns overlapped ranges between two histograms, in the given value range [newMin, newMax]. + */ + def getOverlappedRanges( + leftHistogram: Histogram, + rightHistogram: Histogram, + newMin: Double, + newMax: Double): Seq[OverlappedRange] = { --- End diff -- yea I think `upperBound/lowerBound` is better. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19855: [SPARK-22662] [SQL] Failed to prune columns after rewrit...
Github user wzhfy commented on the issue: https://github.com/apache/spark/pull/19855 @maropu Good to know, thanks! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19952: [SPARK-21322][SQL][followup] support histogram in...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/19952#discussion_r156551478 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala --- @@ -147,65 +139,78 @@ object EstimationUtils { } /** - * Returns a percentage of a bin holding values for column value in the range of - * [lowerValue, higherValue] - * - * @param higherValue a given upper bound value of a specified column value range - * @param lowerValue a given lower bound value of a specified column value range - * @param bin a single histogram bin - * @return the percentage of a single bin holding values in [lowerValue, higherValue]. + * Returns the possibility of the given histogram bin holding values within the given range + * [lowerBound, upperBound]. */ - private def getOccupation( - higherValue: Double, - lowerValue: Double, + private def binHoldingRangePossibility( + upperBound: Double, + lowerBound: Double, bin: HistogramBin): Double = { -assert(bin.lo <= lowerValue && lowerValue <= higherValue && higherValue <= bin.hi) +assert(bin.lo <= lowerBound && lowerBound <= upperBound && upperBound <= bin.hi) if (bin.hi == bin.lo) { // the entire bin is covered in the range 1.0 -} else if (higherValue == lowerValue) { +} else if (upperBound == lowerBound) { // set percentage to 1/NDV 1.0 / bin.ndv.toDouble } else { // Use proration since the range falls inside this bin. - math.min((higherValue - lowerValue) / (bin.hi - bin.lo), 1.0) + math.min((upperBound - lowerBound) / (bin.hi - bin.lo), 1.0) } } /** - * Returns the number of bins for column values in [lowerValue, higherValue]. - * The column value distribution is saved in an equi-height histogram. The return values is a - * double value is because we may return a portion of a bin. For example, a predicate - * "column = 8" may return the number of bins 0.2 if the holding bin has 5 distinct values. + * Returns the number of histogram bins holding values within the given range + * [lowerBound, upperBound]. * - * @param higherId id of the high end bin holding the high end value of a column range - * @param lowerId id of the low end bin holding the low end value of a column range - * @param higherEnd a given upper bound value of a specified column value range - * @param lowerEnd a given lower bound value of a specified column value range + * Note that the return value is double type, because the range boundaries usually occupy a + * portion of a bin. An extrema case is [value, value] which is generated by equal predicate + * `col = value`, we can get more accuracy by allowing returning portion of histogram bins. + * + * @param upperBound the highest value of the given range + * @param upperBoundInclusive whether the upperBound is included in the range + * @param lowerBound the lowest value of the given range + * @param lowerBoundInclusive whether the lowerBound is included in the range * @param histogram a numeric equi-height histogram - * @return the number of bins for column values in [lowerEnd, higherEnd]. */ - def getOccupationBins( - higherId: Int, - lowerId: Int, - higherEnd: Double, - lowerEnd: Double, + def numBinsHoldingRange( + upperBound: Double, + upperBoundInclusive: Boolean, + lowerBound: Double, + lowerBoundInclusive: Boolean, histogram: Histogram): Double = { --- End diff -- Is it better to pass the bin array instead of histogram? we can simplify many `histogram.bins` here. 
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
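[Editor's note] A sketch of the signature change suggested above (hypothetical and deliberately simplified: the body here only counts whole overlapping bins, unlike the real fractional logic), just to show how passing the bin array avoids repeating `histogram.bins`:
```
case class HistogramBin(lo: Double, hi: Double, ndv: Long)

def numBinsHoldingRange(
    upperBound: Double,
    lowerBound: Double,
    bins: Array[HistogramBin]): Double =
  bins.count(b => b.lo <= upperBound && b.hi >= lowerBound).toDouble

val histBins = Array(HistogramBin(0, 10, 5), HistogramBin(10, 20, 5), HistogramBin(20, 30, 5))
numBinsHoldingRange(upperBound = 15, lowerBound = 5, bins = histBins) // 2.0
```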
[GitHub] spark pull request #19952: [SPARK-21322][SQL][followup] support histogram in...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/19952#discussion_r156552208 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala --- @@ -574,51 +539,90 @@ case class FilterEstimation(plan: Filter) extends Logging { } /** - * Returns the selectivity percentage for binary condition in the column's - * current valid range [min, max] - * - * @param op a binary comparison operator - * @param histogram a numeric equi-height histogram - * @param max the upper bound of the current valid range for a given column - * @param min the lower bound of the current valid range for a given column - * @param datumNumber the numeric value of a literal - * @return the selectivity percentage for a condition in the current range. + * Computes the possibility of a equal predicate using histogram. */ + private def computeEqualityPossibilityByHistogram( + literal: Literal, colStat: ColumnStat): Double = { +val datum = EstimationUtils.toDecimal(literal.value, literal.dataType).toDouble +val histogram = colStat.histogram.get - def computePercentByEquiHeightHgm( - op: BinaryComparison, - histogram: Histogram, - max: Double, - min: Double, - datumNumber: Double): Double = { // find bins where column's current min and max locate. Note that a column's [min, max] // range may change due to another condition applied earlier. -val minBinId = EstimationUtils.findFirstBinForValue(min, histogram.bins) -val maxBinId = EstimationUtils.findLastBinForValue(max, histogram.bins) +val min = EstimationUtils.toDecimal(colStat.min.get, literal.dataType).toDouble +val max = EstimationUtils.toDecimal(colStat.max.get, literal.dataType).toDouble // compute how many bins the column's current valid range [min, max] occupies. -// Note that a column's [min, max] range may vary after we apply some filter conditions. -val minToMaxLength = EstimationUtils.getOccupationBins(maxBinId, minBinId, max, min, histogram) - -val datumInBinId = op match { - case LessThan(_, _) | GreaterThanOrEqual(_, _) => -EstimationUtils.findFirstBinForValue(datumNumber, histogram.bins) - case LessThanOrEqual(_, _) | GreaterThan(_, _) => -EstimationUtils.findLastBinForValue(datumNumber, histogram.bins) -} +val numBinsHoldingEntireRange = EstimationUtils.numBinsHoldingRange( + upperBound = max, + upperBoundInclusive = true, + lowerBound = min, + lowerBoundInclusive = true, + histogram) + +val numBinsHoldingDatum = EstimationUtils.numBinsHoldingRange( + upperBound = datum, + upperBoundInclusive = true, + lowerBound = datum, + lowerBoundInclusive = true, + histogram) + +numBinsHoldingDatum / numBinsHoldingEntireRange + } -op match { - // LessThan and LessThanOrEqual share the same logic, - // but their datumInBinId may be different - case LessThan(_, _) | LessThanOrEqual(_, _) => -EstimationUtils.getOccupationBins(datumInBinId, minBinId, datumNumber, min, - histogram) / minToMaxLength - // GreaterThan and GreaterThanOrEqual share the same logic, - // but their datumInBinId may be different - case GreaterThan(_, _) | GreaterThanOrEqual(_, _) => -EstimationUtils.getOccupationBins(maxBinId, datumInBinId, max, datumNumber, - histogram) / minToMaxLength + /** + * Computes the possibility of a comparison predicate using histogram. 
+ */ + private def computeComparisonPossibilityByHistogram( + op: BinaryComparison, literal: Literal, colStat: ColumnStat): Double = { +val datum = EstimationUtils.toDecimal(literal.value, literal.dataType).toDouble +val histogram = colStat.histogram.get + +// find bins where column's current min and max locate. Note that a column's [min, max] +// range may change due to another condition applied earlier. +val min = EstimationUtils.toDecimal(colStat.min.get, literal.dataType).toDouble +val max = EstimationUtils.toDecimal(colStat.max.get, literal.dataType).toDouble + +// compute how many bins the column's current valid range [min, max] occupies. +val numBinsHoldingEntireRange = EstimationUtils.numBinsHoldingRange( + max, upperBoundInclusive = true, min, lowerBoundInclusive = true, histogram) + +val numBinsHoldingDatum = op match
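[Editor's note] The (truncated) diff above computes equality selectivity as the ratio of "bins occupied by the literal" to "bins occupied by the current [min, max]". A hedged, self-contained sketch of that idea with simplified helpers (not Spark's EstimationUtils):
```
case class Bin(lo: Double, hi: Double, ndv: Double)

// a point value occupies 1/ndv of every bin that contains it
def numBinsHoldingPoint(v: Double, bins: Seq[Bin]): Double =
  bins.collect { case b if b.lo <= v && v <= b.hi =>
    if (b.hi == b.lo) 1.0 else 1.0 / b.ndv
  }.sum

// a range occupies the covered fraction of each bin it intersects
def numBinsHoldingRange(lo: Double, hi: Double, bins: Seq[Bin]): Double =
  bins.map { b =>
    val l = math.max(b.lo, lo)
    val h = math.min(b.hi, hi)
    if (h < l) 0.0
    else if (b.hi == b.lo) 1.0
    else math.min((h - l) / (b.hi - b.lo), 1.0)
  }.sum

val bins = Seq(Bin(0, 10, 10), Bin(10, 20, 10))
// selectivity of `col = 8` within the current [min, max] = [0, 20]
val selectivity = numBinsHoldingPoint(8.0, bins) / numBinsHoldingRange(0, 20, bins) // 0.1 / 2 = 0.05
```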
[GitHub] spark pull request #19952: [SPARK-21322][SQL][followup] support histogram in...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/19952#discussion_r156549416 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala --- @@ -147,65 +139,78 @@ object EstimationUtils { } /** - * Returns a percentage of a bin holding values for column value in the range of - * [lowerValue, higherValue] - * - * @param higherValue a given upper bound value of a specified column value range - * @param lowerValue a given lower bound value of a specified column value range - * @param bin a single histogram bin - * @return the percentage of a single bin holding values in [lowerValue, higherValue]. + * Returns the possibility of the given histogram bin holding values within the given range + * [lowerBound, upperBound]. */ - private def getOccupation( - higherValue: Double, - lowerValue: Double, + private def binHoldingRangePossibility( + upperBound: Double, + lowerBound: Double, bin: HistogramBin): Double = { -assert(bin.lo <= lowerValue && lowerValue <= higherValue && higherValue <= bin.hi) +assert(bin.lo <= lowerBound && lowerBound <= upperBound && upperBound <= bin.hi) if (bin.hi == bin.lo) { // the entire bin is covered in the range 1.0 -} else if (higherValue == lowerValue) { +} else if (upperBound == lowerBound) { // set percentage to 1/NDV 1.0 / bin.ndv.toDouble } else { // Use proration since the range falls inside this bin. - math.min((higherValue - lowerValue) / (bin.hi - bin.lo), 1.0) + math.min((upperBound - lowerBound) / (bin.hi - bin.lo), 1.0) } } /** - * Returns the number of bins for column values in [lowerValue, higherValue]. - * The column value distribution is saved in an equi-height histogram. The return values is a - * double value is because we may return a portion of a bin. For example, a predicate - * "column = 8" may return the number of bins 0.2 if the holding bin has 5 distinct values. + * Returns the number of histogram bins holding values within the given range + * [lowerBound, upperBound]. * - * @param higherId id of the high end bin holding the high end value of a column range - * @param lowerId id of the low end bin holding the low end value of a column range - * @param higherEnd a given upper bound value of a specified column value range - * @param lowerEnd a given lower bound value of a specified column value range + * Note that the return value is double type, because the range boundaries usually occupy a --- End diff -- nit: returned value --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19952: [SPARK-21322][SQL][followup] support histogram in...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/19952#discussion_r156549603 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala --- @@ -147,65 +139,78 @@ object EstimationUtils { } /** - * Returns a percentage of a bin holding values for column value in the range of - * [lowerValue, higherValue] - * - * @param higherValue a given upper bound value of a specified column value range - * @param lowerValue a given lower bound value of a specified column value range - * @param bin a single histogram bin - * @return the percentage of a single bin holding values in [lowerValue, higherValue]. + * Returns the possibility of the given histogram bin holding values within the given range + * [lowerBound, upperBound]. */ - private def getOccupation( - higherValue: Double, - lowerValue: Double, + private def binHoldingRangePossibility( + upperBound: Double, + lowerBound: Double, bin: HistogramBin): Double = { -assert(bin.lo <= lowerValue && lowerValue <= higherValue && higherValue <= bin.hi) +assert(bin.lo <= lowerBound && lowerBound <= upperBound && upperBound <= bin.hi) if (bin.hi == bin.lo) { // the entire bin is covered in the range 1.0 -} else if (higherValue == lowerValue) { +} else if (upperBound == lowerBound) { // set percentage to 1/NDV 1.0 / bin.ndv.toDouble } else { // Use proration since the range falls inside this bin. - math.min((higherValue - lowerValue) / (bin.hi - bin.lo), 1.0) + math.min((upperBound - lowerBound) / (bin.hi - bin.lo), 1.0) } } /** - * Returns the number of bins for column values in [lowerValue, higherValue]. - * The column value distribution is saved in an equi-height histogram. The return values is a - * double value is because we may return a portion of a bin. For example, a predicate - * "column = 8" may return the number of bins 0.2 if the holding bin has 5 distinct values. + * Returns the number of histogram bins holding values within the given range + * [lowerBound, upperBound]. * - * @param higherId id of the high end bin holding the high end value of a column range - * @param lowerId id of the low end bin holding the low end value of a column range - * @param higherEnd a given upper bound value of a specified column value range - * @param lowerEnd a given lower bound value of a specified column value range + * Note that the return value is double type, because the range boundaries usually occupy a + * portion of a bin. An extrema case is [value, value] which is generated by equal predicate + * `col = value`, we can get more accuracy by allowing returning portion of histogram bins. --- End diff -- nit: get higher accuracy --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19952: [SPARK-21322][SQL][followup] support histogram in...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/19952#discussion_r156550872 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala --- @@ -574,51 +539,90 @@ case class FilterEstimation(plan: Filter) extends Logging { } /** - * Returns the selectivity percentage for binary condition in the column's - * current valid range [min, max] - * - * @param op a binary comparison operator - * @param histogram a numeric equi-height histogram - * @param max the upper bound of the current valid range for a given column - * @param min the lower bound of the current valid range for a given column - * @param datumNumber the numeric value of a literal - * @return the selectivity percentage for a condition in the current range. + * Computes the possibility of a equal predicate using histogram. --- End diff -- nit: an equality predicate --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19932: [SPARK-22745][SQL] read partition stats from Hive
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/19932#discussion_r156546885 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala --- @@ -1021,8 +998,38 @@ private[hive] object HiveClientImpl { compressed = apiPartition.getSd.isCompressed, properties = Option(apiPartition.getSd.getSerdeInfo.getParameters) .map(_.asScala.toMap).orNull), -parameters = - if (hp.getParameters() != null) hp.getParameters().asScala.toMap else Map.empty) + parameters = properties, + stats = readHiveStats(properties)) + } + + /** + * Reads statistics from Hive. + * Note that this statistics could be overridden by Spark's statistics if that's available. + */ + private def readHiveStats(properties: Map[String, String]): Option[CatalogStatistics] = { +val totalSize = properties.get(StatsSetupConst.TOTAL_SIZE).map(BigInt(_)) +val rawDataSize = properties.get(StatsSetupConst.RAW_DATA_SIZE).map(BigInt(_)) +val rowCount = properties.get(StatsSetupConst.ROW_COUNT).map(BigInt(_)) +// TODO: check if this estimate is valid for tables after partition pruning. --- End diff -- good catch, we can remove this --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
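[Editor's note] For reference, a condensed, self-contained approximation of the `readHiveStats` logic quoted above, using the raw Hive property keys instead of the `StatsSetupConst` constants. This is a sketch, not the exact Spark implementation; the `> 0` filters reflect the zero-rowCount handling discussed elsewhere in this thread.
```
def readHiveStats(properties: Map[String, String]): Option[(BigInt, Option[BigInt])] = {
  val totalSize   = properties.get("totalSize").map(BigInt(_)).filter(_ > 0)
  val rawDataSize = properties.get("rawDataSize").map(BigInt(_)).filter(_ > 0)
  val rowCount    = properties.get("numRows").map(BigInt(_)).filter(_ > 0)
  // prefer totalSize, fall back to rawDataSize; attach rowCount only when positive
  totalSize.orElse(rawDataSize).map(sizeInBytes => (sizeInBytes, rowCount))
}

// a partition right after INSERT (no ANALYZE yet): size is known, row count is not
readHiveStats(Map("totalSize" -> "5812", "numRows" -> "0")) // Some((5812, None))
```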
[GitHub] spark pull request #19932: [SPARK-22745][SQL] read partition stats from Hive
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/19932#discussion_r156546859 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala --- @@ -213,6 +213,27 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto } } + test("SPARK-22745 - read Hive's statistics for partition") { +val tableName = "hive_stats_part_table" +withTable(tableName) { + sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED BY (ds STRING)") + sql(s"INSERT INTO TABLE $tableName PARTITION (ds='2017-01-01') SELECT * FROM src") + var partition = spark.sessionState.catalog +.getPartition(TableIdentifier(tableName), Map("ds" -> "2017-01-01")) + + assert(partition.stats.get.sizeInBytes == 5812) + assert(partition.stats.get.rowCount.isEmpty) + + hiveClient +.runSqlHive(s"ANALYZE TABLE $tableName PARTITION (ds='2017-01-01') COMPUTE STATISTICS") + partition = spark.sessionState.catalog +.getPartition(TableIdentifier(tableName), Map("ds" -> "2017-01-01")) + + assert(partition.stats.get.sizeInBytes == 5812) --- End diff -- `totalSize` exists after the INSERT INTO command, so here `sizeInBytes` doesn't change after ANALYZE command, only rowCount is added. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19594: [SPARK-21984] [SQL] Join estimation based on equi-height...
Github user wzhfy commented on the issue: https://github.com/apache/spark/pull/19594 ping @cloud-fan --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19932: [SPARK-22745][SQL] read partition stats from Hive
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/19932#discussion_r155936167 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala --- @@ -353,15 +374,6 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto createPartition("2010-01-02", 11, "SELECT '1', 'A' from src UNION ALL SELECT '1', 'A' from src") - sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE STATISTICS NOSCAN") - - assertPartitionStats("2010-01-01", "10", rowCount = None, sizeInBytes = 2000) - assertPartitionStats("2010-01-01", "11", rowCount = None, sizeInBytes = 2000) - assert(queryStats("2010-01-02", "10") === None) - assert(queryStats("2010-01-02", "11") === None) --- End diff -- After the change, these checks are no longer correct since we now read Hive stats, so I removed them. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19932: [SPARK-22745][SQL] read partition stats from Hive
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/19932#discussion_r155936087 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala --- @@ -213,6 +213,29 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto } } + test("SPARK- - read Hive's statistics for partition") { --- End diff -- oh, I forgot it, thanks! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19932: [SPARK-22745][SQL] read partition stats from Hive
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/19932#discussion_r155921370 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala --- @@ -413,32 +413,7 @@ private[hive] class HiveClientImpl( case (key, _) => excludedTableProperties.contains(key) } val comment = properties.get("comment") - - // Here we are reading statistics from Hive. - // Note that this statistics could be overridden by Spark's statistics if that's available. - val totalSize = properties.get(StatsSetupConst.TOTAL_SIZE).map(BigInt(_)) --- End diff -- The code path is moved to the method `readHiveStats` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19932: [SPARK-22745][SQL] read partition stats from Hive
Github user wzhfy commented on the issue: https://github.com/apache/spark/pull/19932 cc @cloud-fan @gatorsmile --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19594: [SPARK-21984] [SQL] Join estimation based on equi-height...
Github user wzhfy commented on the issue: https://github.com/apache/spark/pull/19594 retest this please.. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19594: [SPARK-21984] [SQL] Join estimation based on equi-height...
Github user wzhfy commented on the issue: https://github.com/apache/spark/pull/19594 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19932: [SPARK-22745][SQL] read partition stats from Hive
GitHub user wzhfy opened a pull request: https://github.com/apache/spark/pull/19932 [SPARK-22745][SQL] read partition stats from Hive
## What changes were proposed in this pull request?
Currently Spark can read table stats (e.g. `totalSize`, `numRows`) from Hive; we can also support reading partition stats from Hive using the same logic.
## How was this patch tested?
Added a new test case and modified an existing test case.
You can merge this pull request into a Git repository by running: $ git pull https://github.com/wzhfy/spark read_hive_partition_stats Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19932.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #19932 commit 48b81b5065808ffeff99142a03cd59bf54a9ea5d Author: Zhenhua Wang Date: 2017-12-09T08:32:48Z read partition stats --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19594: [SPARK-21984] [SQL] Join estimation based on equi-height...
Github user wzhfy commented on the issue: https://github.com/apache/spark/pull/19594 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/19594#discussion_r155910267 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/JoinEstimationSuite.scala --- @@ -67,6 +68,205 @@ class JoinEstimationSuite extends StatsEstimationTestBase { rowCount = 2, attributeStats = AttributeMap(Seq("key-1-2", "key-2-3").map(nameToColInfo))) + private def estimateByHistogram( + histogram1: Histogram, + histogram2: Histogram, + expectedMin: Double, + expectedMax: Double, + expectedNdv: Long, + expectedRows: Long): Unit = { +val col1 = attr("key1") +val col2 = attr("key2") +val c1 = generateJoinChild(col1, histogram1, expectedMin, expectedMax) +val c2 = generateJoinChild(col2, histogram2, expectedMin, expectedMax) + +val c1JoinC2 = Join(c1, c2, Inner, Some(EqualTo(col1, col2))) +val c2JoinC1 = Join(c2, c1, Inner, Some(EqualTo(col2, col1))) +val expectedStatsAfterJoin = Statistics( + sizeInBytes = expectedRows * (8 + 2 * 4), + rowCount = Some(expectedRows), + attributeStats = AttributeMap(Seq( +col1 -> c1.stats.attributeStats(col1).copy( + distinctCount = expectedNdv, min = Some(expectedMin), max = Some(expectedMax)), +col2 -> c2.stats.attributeStats(col2).copy( + distinctCount = expectedNdv, min = Some(expectedMin), max = Some(expectedMax +) + +// Join order should not affect estimation result. +Seq(c1JoinC2, c2JoinC1).foreach { join => + assert(join.stats == expectedStatsAfterJoin) +} + } + + private def generateJoinChild( + col: Attribute, + histogram: Histogram, + expectedMin: Double, + expectedMax: Double): LogicalPlan = { +val colStat = inferColumnStat(histogram) +val t = StatsTestPlan( + outputList = Seq(col), + rowCount = (histogram.height * histogram.bins.length).toLong, + attributeStats = AttributeMap(Seq(col -> colStat))) + +val filterCondition = new ArrayBuffer[Expression]() +if (expectedMin > colStat.min.get.toString.toDouble) { + filterCondition += GreaterThanOrEqual(col, Literal(expectedMin)) +} +if (expectedMax < colStat.max.get.toString.toDouble) { + filterCondition += LessThanOrEqual(col, Literal(expectedMax)) +} +if (filterCondition.isEmpty) t else Filter(filterCondition.reduce(And), t) + } + + private def inferColumnStat(histogram: Histogram): ColumnStat = { +var ndv = 0L +for (i <- histogram.bins.indices) { + val bin = histogram.bins(i) + if (i == 0 || bin.hi != histogram.bins(i - 1).hi) { +ndv += bin.ndv + } +} +ColumnStat(distinctCount = ndv, min = Some(histogram.bins.head.lo), + max = Some(histogram.bins.last.hi), nullCount = 0, avgLen = 4, maxLen = 4, + histogram = Some(histogram)) + } + + test("equi-height histograms: a bin is contained by another one") { +val histogram1 = Histogram(height = 300, Array( + HistogramBin(lo = 10, hi = 30, ndv = 10), HistogramBin(lo = 30, hi = 60, ndv = 30))) +val histogram2 = Histogram(height = 100, Array( + HistogramBin(lo = 0, hi = 50, ndv = 50), HistogramBin(lo = 50, hi = 100, ndv = 40))) +// test bin trimming +val (t1, h1) = trimBin(histogram2.bins(0), height = 100, min = 10, max = 60) +assert(t1 == HistogramBin(lo = 10, hi = 50, ndv = 40) && h1 == 80) +val (t2, h2) = trimBin(histogram2.bins(1), height = 100, min = 10, max = 60) +assert(t2 == HistogramBin(lo = 50, hi = 60, ndv = 8) && h2 == 20) + +val expectedRanges = Seq( + OverlappedRange(10, 30, math.min(10, 40*1/2), math.max(10, 40*1/2), 300, 80*1/2), + OverlappedRange(30, 50, math.min(30*2/3, 40*1/2), math.max(30*2/3, 40*1/2), 300*2/3, 80*1/2), + OverlappedRange(50, 60, math.min(30*1/3, 8), math.max(30*1/3, 8), 
300*1/3, 20) +) +assert(expectedRanges.equals( + getOverlappedRanges(histogram1, histogram2, newMin = 10D, newMax = 60D))) + +estimateByHistogram( + histogram1 = histogram1, + histogram2 = histogram2, + expectedMin = 10D, + expectedMax = 60D, + // 10 + 20 + 8 + expectedNdv = 38L, + // 300*40/20 + 200*40/20 + 100*20/10 + expectedRows = 1200L) + } + + test("equi-height histograms: a bin has only one value") { +val histogram1 = Histogram(height = 300, Array( + HistogramBin(lo = 30, hi = 30, ndv = 1), Hi
[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/19594#discussion_r155910232 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/JoinEstimationSuite.scala --- @@ -67,6 +68,205 @@ class JoinEstimationSuite extends StatsEstimationTestBase { rowCount = 2, attributeStats = AttributeMap(Seq("key-1-2", "key-2-3").map(nameToColInfo))) + private def estimateByHistogram( + histogram1: Histogram, + histogram2: Histogram, + expectedMin: Double, + expectedMax: Double, + expectedNdv: Long, + expectedRows: Long): Unit = { +val col1 = attr("key1") +val col2 = attr("key2") +val c1 = generateJoinChild(col1, histogram1, expectedMin, expectedMax) +val c2 = generateJoinChild(col2, histogram2, expectedMin, expectedMax) + +val c1JoinC2 = Join(c1, c2, Inner, Some(EqualTo(col1, col2))) +val c2JoinC1 = Join(c2, c1, Inner, Some(EqualTo(col2, col1))) +val expectedStatsAfterJoin = Statistics( + sizeInBytes = expectedRows * (8 + 2 * 4), + rowCount = Some(expectedRows), + attributeStats = AttributeMap(Seq( +col1 -> c1.stats.attributeStats(col1).copy( + distinctCount = expectedNdv, min = Some(expectedMin), max = Some(expectedMax)), +col2 -> c2.stats.attributeStats(col2).copy( + distinctCount = expectedNdv, min = Some(expectedMin), max = Some(expectedMax +) + +// Join order should not affect estimation result. +Seq(c1JoinC2, c2JoinC1).foreach { join => + assert(join.stats == expectedStatsAfterJoin) +} + } + + private def generateJoinChild( + col: Attribute, + histogram: Histogram, + expectedMin: Double, + expectedMax: Double): LogicalPlan = { +val colStat = inferColumnStat(histogram) +val t = StatsTestPlan( + outputList = Seq(col), + rowCount = (histogram.height * histogram.bins.length).toLong, + attributeStats = AttributeMap(Seq(col -> colStat))) + +val filterCondition = new ArrayBuffer[Expression]() +if (expectedMin > colStat.min.get.toString.toDouble) { + filterCondition += GreaterThanOrEqual(col, Literal(expectedMin)) +} +if (expectedMax < colStat.max.get.toString.toDouble) { + filterCondition += LessThanOrEqual(col, Literal(expectedMax)) +} +if (filterCondition.isEmpty) t else Filter(filterCondition.reduce(And), t) + } + + private def inferColumnStat(histogram: Histogram): ColumnStat = { +var ndv = 0L +for (i <- histogram.bins.indices) { + val bin = histogram.bins(i) + if (i == 0 || bin.hi != histogram.bins(i - 1).hi) { +ndv += bin.ndv + } +} +ColumnStat(distinctCount = ndv, min = Some(histogram.bins.head.lo), + max = Some(histogram.bins.last.hi), nullCount = 0, avgLen = 4, maxLen = 4, + histogram = Some(histogram)) + } + + test("equi-height histograms: a bin is contained by another one") { +val histogram1 = Histogram(height = 300, Array( + HistogramBin(lo = 10, hi = 30, ndv = 10), HistogramBin(lo = 30, hi = 60, ndv = 30))) +val histogram2 = Histogram(height = 100, Array( + HistogramBin(lo = 0, hi = 50, ndv = 50), HistogramBin(lo = 50, hi = 100, ndv = 40))) +// test bin trimming +val (t1, h1) = trimBin(histogram2.bins(0), height = 100, min = 10, max = 60) +assert(t1 == HistogramBin(lo = 10, hi = 50, ndv = 40) && h1 == 80) +val (t2, h2) = trimBin(histogram2.bins(1), height = 100, min = 10, max = 60) +assert(t2 == HistogramBin(lo = 50, hi = 60, ndv = 8) && h2 == 20) + +val expectedRanges = Seq( + OverlappedRange(10, 30, math.min(10, 40*1/2), math.max(10, 40*1/2), 300, 80*1/2), + OverlappedRange(30, 50, math.min(30*2/3, 40*1/2), math.max(30*2/3, 40*1/2), 300*2/3, 80*1/2), + OverlappedRange(50, 60, math.min(30*1/3, 8), math.max(30*1/3, 8), 
300*1/3, 20) +) +assert(expectedRanges.equals( + getOverlappedRanges(histogram1, histogram2, newMin = 10D, newMax = 60D))) + +estimateByHistogram( + histogram1 = histogram1, + histogram2 = histogram2, + expectedMin = 10D, + expectedMax = 60D, + // 10 + 20 + 8 + expectedNdv = 38L, + // 300*40/20 + 200*40/20 + 100*20/10 + expectedRows = 1200L) + } + + test("equi-height histograms: a bin has only one value") { +val histogram1 = Histogram(height = 300, Array( + HistogramBin(lo = 30, hi = 30, ndv = 1), Hi
[GitHub] spark pull request #19783: [SPARK-21322][SQL] support histogram in filter ca...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/19783#discussion_r155692778 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala --- @@ -359,7 +371,7 @@ class FilterEstimationSuite extends StatsEstimationTestBase { test("cbool > false") { validateEstimatedStats( Filter(GreaterThan(attrBool, Literal(false)), childStatsTestPlan(Seq(attrBool), 10L)), - Seq(attrBool -> ColumnStat(distinctCount = 1, min = Some(true), max = Some(true), + Seq(attrBool -> ColumnStat(distinctCount = 1, min = Some(false), max = Some(true), --- End diff -- That may need a special code path for the boolean type, but IMHO it doesn't deserve the complexity. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19783: [SPARK-21322][SQL] support histogram in filter ca...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/19783#discussion_r155691788 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala --- @@ -529,6 +570,56 @@ case class FilterEstimation(plan: Filter) extends Logging { Some(percent) } + /** + * Returns the selectivity percentage for binary condition in the column's + * current valid range [min, max] + * + * @param op a binary comparison operator + * @param histogram a numeric equi-height histogram + * @param max the upper bound of the current valid range for a given column + * @param min the lower bound of the current valid range for a given column + * @param datumNumber the numeric value of a literal + * @return the selectivity percentage for a condition in the current range. + */ + + def computePercentByEquiHeightHgm( + op: BinaryComparison, + histogram: Histogram, + max: Double, + min: Double, + datumNumber: Double): Double = { +// find bins where column's current min and max locate. Note that a column's [min, max] +// range may change due to another condition applied earlier. +val minBinId = EstimationUtils.findFirstBinForValue(min, histogram.bins) +val maxBinId = EstimationUtils.findLastBinForValue(max, histogram.bins) +assert(minBinId <= maxBinId) + +// compute how many bins the column's current valid range [min, max] occupies. +// Note that a column's [min, max] range may vary after we apply some filter conditions. +val minToMaxLength = EstimationUtils.getOccupationBins(maxBinId, minBinId, max, --- End diff -- Personally I prefer to have this method unit-tested, because it's the core part of filter estimation. We can do this in follow-up anyway. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19783: [SPARK-21322][SQL] support histogram in filter ca...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/19783#discussion_r155690722 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala --- @@ -332,8 +332,41 @@ case class FilterEstimation(plan: Filter) extends Logging { colStatsMap.update(attr, newStats) } - Some(1.0 / BigDecimal(ndv)) -} else { + if (colStat.histogram.isEmpty) { +// returns 1/ndv if there is no histogram +Some(1.0 / BigDecimal(ndv)) + } else { +// We compute filter selectivity using Histogram information. +// Here we traverse histogram bins to locate the range of bins the literal values falls +// into. For skewed distribution, a literal value can occupy multiple bins. +val hgmBins = colStat.histogram.get.bins +val datum = EstimationUtils.toDecimal(literal.value, literal.dataType).toDouble --- End diff -- yes, I'll refactor this part. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19880: [SPARK-22626][SQL][FOLLOWUP] improve documentation and s...
Github user wzhfy commented on the issue: https://github.com/apache/spark/pull/19880 cc @cloud-fan @wangyum --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19880: [SPARK-22626][SQL][FOLLOWUP] improve documentatio...
GitHub user wzhfy opened a pull request: https://github.com/apache/spark/pull/19880 [SPARK-22626][SQL][FOLLOWUP] improve documentation and simplify test case
## What changes were proposed in this pull request?
The reason why some Hive tables have a zero `numRows` statistic is that, in Hive, when stats gathering is disabled, `numRows` stays zero after an INSERT command:
```
hive> create table src (key int, value string) stored as orc;
hive> desc formatted src;
Table Parameters:
  COLUMN_STATS_ACCURATE  {\"BASIC_STATS\":\"true\"}
  numFiles               0
  numRows                0
  rawDataSize            0
  totalSize              0
  transient_lastDdlTime  1512399590
hive> set hive.stats.autogather=false;
hive> insert into src select 1, 'a';
hive> desc formatted src;
Table Parameters:
  numFiles               1
  numRows                0
  rawDataSize            0
  totalSize              275
  transient_lastDdlTime  1512399647
hive> insert into src select 1, 'b';
hive> desc formatted src;
Table Parameters:
  numFiles               2
  numRows                0
  rawDataSize            0
  totalSize              550
  transient_lastDdlTime  1512399687
```
## How was this patch tested?
Modified an existing test.
You can merge this pull request into a Git repository by running: $ git pull https://github.com/wzhfy/spark doc_zero_rowCount Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19880.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #19880 commit 9be829d208f7e2d6a88b9d2008fc04eec4a4ad8e Author: Zhenhua Wang Date: 2017-12-04T15:53:49Z improve doc --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19831: [SPARK-22626][SQL] Wrong Hive table statistics ma...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/19831#discussion_r154499581 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala --- @@ -1187,6 +1187,22 @@ class HiveQuerySuite extends HiveComparisonTest with SQLTestUtils with BeforeAnd } } } + + test("Wrong Hive table statistics may trigger OOM if enables join reorder in CBO") { --- End diff -- IMHO you can just test the read logic for Hive's stats properties in `StatisticsSuite` instead of an end-to-end test case; developers may not understand what's going on from this test case. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19831: [SPARK-22626][SQL] Wrong Hive table statistics may trigg...
Github user wzhfy commented on the issue: https://github.com/apache/spark/pull/19831 @cloud-fan Yes, Spark doesn't allow users to set (Spark's) statistics manually. This PR handles a zero row count in **Hive's stats**; it doesn't affect the logic for Spark's stats. Besides, Spark currently only uses Hive's `totalSize` and `rawDataSize` when they are > 0. This PR changes the behavior for `rowCount` to be consistent with that, so I think it's fine. But the title of the PR should be more specific, i.e. it deals with wrong Hive statistics (zero rowCount). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
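[Editor's note] A minimal illustration of the point above (illustrative only, not the PR's diff): a zero `numRows` from Hive is treated as "unknown", the same way `totalSize` and `rawDataSize` are only trusted when positive, because an estimated row count of 0 can collapse a plan's `sizeInBytes` to the floor value and wrongly qualify it for broadcast.
```
val properties = Map("numRows" -> "0", "totalSize" -> "1024")

val rowCountOld = properties.get("numRows").map(BigInt(_)).filter(_ >= 0) // Some(0): trusted, misleading
val rowCountNew = properties.get("numRows").map(BigInt(_)).filter(_ > 0)  // None: treated as unknown

// totalSize/rawDataSize already follow the "only when positive" convention
val sizeInBytes = properties.get("totalSize").map(BigInt(_)).filter(_ > 0) // Some(1024)
```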
[GitHub] spark issue #19855: [SPARK-22662] [SQL] Failed to prune columns after rewrit...
Github user wzhfy commented on the issue: https://github.com/apache/spark/pull/19855 @gengliangwang @cloud-fan Previously this rule was in the batch `Operator Optimizations`, but after [SPARK-14781](https://github.com/apache/spark/pull/12820), it was moved into a separate batch [by this code change](https://github.com/apache/spark/pull/12820#discussion_r61688622). I think the reason for this change is in the PR description: >This PR also fix a bug in predicate subquery push down through join (they should not). I reverted that change in the Optimizer and ran all the test suites in that PR, hoping to find some clue, but found no test failures. So it seems something has changed since that PR. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19783: [SPARK-21322][SQL] support histogram in filter ca...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/19783#discussion_r154270654 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala --- @@ -114,4 +114,197 @@ object EstimationUtils { } } + /** + * Returns the number of the first bin into which a column values falls for a specified + * numeric equi-height histogram. + * + * @param value a literal value of a column + * @param histogram a numeric equi-height histogram + * @return the number of the first bin into which a column values falls. + */ + + def findFirstBinForValue(value: Double, histogram: Histogram): Int = { +var binId = 0 +histogram.bins.foreach { bin => + if (value > bin.hi) binId += 1 +} +binId + } + + /** + * Returns the number of the last bin into which a column values falls for a specified + * numeric equi-height histogram. + * + * @param value a literal value of a column + * @param histogram a numeric equi-height histogram + * @return the number of the last bin into which a column values falls. + */ + + def findLastBinForValue(value: Double, histogram: Histogram): Int = { +var binId = 0 +for (i <- 0 until histogram.bins.length) { + if (value > histogram.bins(i).hi) { +// increment binId to point to next bin +binId += 1 + } + if ((value == histogram.bins(i).hi) && (i < histogram.bins.length - 1)) { +if (value == histogram.bins(i + 1).lo) { --- End diff -- just move this condition after the length check: ``` if ((value == histogram.bins(i).hi) && (i < histogram.bins.length - 1) && (value == histogram.bins(i + 1).lo)) ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
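[Editor's note] For clarity, a sketch of how `findLastBinForValue` could read with the merged condition suggested above (simplified bin type; the body of the boundary branch, incrementing `binId`, is assumed from context because the quoted diff is truncated):
```
case class HistogramBin(lo: Double, hi: Double, ndv: Long)

def findLastBinForValue(value: Double, bins: Array[HistogramBin]): Int = {
  var binId = 0
  for (i <- bins.indices) {
    if (value > bins(i).hi) {
      // value lies entirely past this bin
      binId += 1
    }
    if (value == bins(i).hi && i < bins.length - 1 && value == bins(i + 1).lo) {
      // value sits on a boundary shared with the next bin, so the last bin
      // holding it is the next one (assumed behavior of the truncated branch)
      binId += 1
    }
  }
  binId
}
```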
[GitHub] spark issue #19831: [SPARK-22626][SQL] Wrong Hive table statistics may trigg...
Github user wzhfy commented on the issue: https://github.com/apache/spark/pull/19831 Since Hive can't prevent users from setting wrong stats properties, I think this solution can alleviate the problem. Besides, it's consistent with what we do for `totalSize` and `rawDataSize` (only use the stats when > 0). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19831: [SPARK-22626][SQL] Wrong Hive table statistics ma...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/19831#discussion_r154250160 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala --- @@ -418,7 +418,7 @@ private[hive] class HiveClientImpl( // Note that this statistics could be overridden by Spark's statistics if that's available. val totalSize = properties.get(StatsSetupConst.TOTAL_SIZE).map(BigInt(_)) val rawDataSize = properties.get(StatsSetupConst.RAW_DATA_SIZE).map(BigInt(_)) - val rowCount = properties.get(StatsSetupConst.ROW_COUNT).map(BigInt(_)).filter(_ >= 0) + val rowCount = properties.get(StatsSetupConst.ROW_COUNT).map(BigInt(_)).filter(_ > 0) --- End diff -- Thanks for the investigation. Seems hive can't protect its stats properties. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19831: [SPARK-22626][SQL] Wrong Hive table statistics may trigg...
Github user wzhfy commented on the issue: https://github.com/apache/spark/pull/19831 > Besides, if the size stats totalSize or rawDataSize is wrong, the problem exists whether CBO is enabled or not. > If CBO enabled, the outputRowCount == 0, the getOutputSize is 1, sizeInBytes is 1 and this side can broadcast: If CBO disabled, the sizeInBytes = (p.child.stats.sizeInBytes * outputRowSize) / childRowSize and this side cann't broadcast: @wangyum `totalSize or rawDataSize` can also be wrong, right? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19783: [SPARK-21322][SQL] support histogram in filter ca...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/19783#discussion_r154248775 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala --- @@ -114,4 +114,197 @@ object EstimationUtils { } } + /** + * Returns the number of the first bin into which a column values falls for a specified + * numeric equi-height histogram. + * + * @param value a literal value of a column + * @param histogram a numeric equi-height histogram + * @return the number of the first bin into which a column values falls. + */ + + def findFirstBinForValue(value: Double, histogram: Histogram): Int = { +var binId = 0 +histogram.bins.foreach { bin => + if (value > bin.hi) binId += 1 +} +binId + } + + /** + * Returns the number of the last bin into which a column values falls for a specified + * numeric equi-height histogram. + * + * @param value a literal value of a column + * @param histogram a numeric equi-height histogram + * @return the number of the last bin into which a column values falls. + */ + + def findLastBinForValue(value: Double, histogram: Histogram): Int = { +var binId = 0 +for (i <- 0 until histogram.bins.length) { + if (value > histogram.bins(i).hi) { +// increment binId to point to next bin +binId += 1 + } + if ((value == histogram.bins(i).hi) && (i < histogram.bins.length - 1)) { +if (value == histogram.bins(i + 1).lo) { --- End diff -- By "out of bound", do you mean it exceeds 100 length limit? You can just switch new line after `&&` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org