[GitHub] spark pull request #13585: [SPARK-15859][SQL] Optimize the partition pruning...
Github user chenghao-intel closed the pull request at: https://github.com/apache/spark/pull/13585 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13585: [SPARK-15859][SQL] Optimize the partition pruning...
Github user chenghao-intel commented on a diff in the pull request: https://github.com/apache/spark/pull/13585#discussion_r72184495 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala --- @@ -92,6 +92,36 @@ object PhysicalOperation extends PredicateHelper { .map(Alias(_, a.name)(a.exprId, a.qualifier, isGenerated = a.isGenerated)).getOrElse(a) } } + + /** + * Drop the non-partition key expression from the given expression, to optimize the + * partition pruning. For instances: (We assume part1 & part2 are the partition keys): + * (part1 == 1 and a > 3) or (part2 == 2 and a < 5) ==> (part1 == 1 or part1 == 2) + * (part1 == 1 and a > 3) or (a < 100) => None + * (a > 100 && b < 100) or (part1 = 10) => None + * (a > 100 && b < 100 and part1 = 10) or (part1 == 2) => (part1 = 10 or part1 == 2) + * @param predicate The given expression + * @param partitionKeyIds partition keys in attribute set + * @return + */ + def extractPartitionKeyExpression( +predicate: Expression, partitionKeyIds: AttributeSet): Option[Expression] = { +// drop the non-partition key expression in conjunction of the expression tree +val additionalPartPredicate = predicate transformUp { --- End diff -- I can keep updating the code if we agree on the approach; otherwise, I think we'd better close this PR for now.
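The extraction documented in the diff can be modeled on a toy expression tree. This is an illustrative sketch, not the Catalyst API: `Expr`, `Cmp`, and the function body below are hypothetical stand-ins. The idea is bottom-up: inside each conjunction, drop the side that touches no partition key, then keep the result only if it references partition keys exclusively.

```scala
// Toy model of the proposed extraction (hypothetical types, not Catalyst).
sealed trait Expr { def refs: Set[String] }
case class Cmp(col: String, op: String, v: Int) extends Expr { def refs: Set[String] = Set(col) }
case class And(l: Expr, r: Expr) extends Expr { def refs: Set[String] = l.refs ++ r.refs }
case class Or(l: Expr, r: Expr) extends Expr { def refs: Set[String] = l.refs ++ r.refs }

def extractPartitionKeyExpression(p: Expr, partKeys: Set[String]): Option[Expr] = {
  def prune(e: Expr): Expr = e match {
    case And(l, r) =>
      val (pl, pr) = (prune(l), prune(r))
      // Inside a conjunction, the side that references no partition key
      // can be dropped: removing a conjunct only widens the predicate.
      if (pl.refs.intersect(partKeys).isEmpty) pr
      else if (pr.refs.intersect(partKeys).isEmpty) pl
      else And(pl, pr)
    case Or(l, r) => Or(prune(l), prune(r))
    case leaf => leaf
  }
  val pruned = prune(p)
  // Usable for pruning only if the survivors mention partition keys alone.
  if (pruned.refs.nonEmpty && pruned.refs.subsetOf(partKeys)) Some(pruned) else None
}
```

On the docstring's first example, `(part1 == 1 and a > 3) or (part2 == 2 and a < 5)`, this yields `part1 == 1 or part2 == 2`; on `(part1 == 1 and a > 3) or (a < 100)` it yields `None`, since the second disjunct carries no partition constraint.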
[GitHub] spark pull request #13585: [SPARK-15859][SQL] Optimize the partition pruning...
Github user chenghao-intel commented on a diff in the pull request: https://github.com/apache/spark/pull/13585#discussion_r72184424 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala --- @@ -92,6 +92,36 @@ object PhysicalOperation extends PredicateHelper { .map(Alias(_, a.name)(a.exprId, a.qualifier, isGenerated = a.isGenerated)).getOrElse(a) } } + + /** + * Drop the non-partition key expression from the given expression, to optimize the + * partition pruning. For instances: (We assume part1 & part2 are the partition keys): + * (part1 == 1 and a > 3) or (part2 == 2 and a < 5) ==> (part1 == 1 or part1 == 2) + * (part1 == 1 and a > 3) or (a < 100) => None + * (a > 100 && b < 100) or (part1 = 10) => None + * (a > 100 && b < 100 and part1 = 10) or (part1 == 2) => (part1 = 10 or part1 == 2) + * @param predicate The given expression + * @param partitionKeyIds partition keys in attribute set + * @return + */ + def extractPartitionKeyExpression( +predicate: Expression, partitionKeyIds: AttributeSet): Option[Expression] = { +// drop the non-partition key expression in conjunction of the expression tree +val additionalPartPredicate = predicate transformUp { --- End diff -- This PR may have critical bugs when a user implements a UDF that logically behaves like the `NOT` operator in the partition filter expression. Probably we need a whitelist of the built-in UDFs. @yhuai @liancheng @yangw1234 @clockfly any comments on this?
[GitHub] spark pull request #13585: [SPARK-15859][SQL] Optimize the partition pruning...
Github user liancheng commented on a diff in the pull request: https://github.com/apache/spark/pull/13585#discussion_r66831261 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala --- @@ -92,6 +92,36 @@ object PhysicalOperation extends PredicateHelper { .map(Alias(_, a.name)(a.exprId, a.qualifier, isGenerated = a.isGenerated)).getOrElse(a) } } + + /** + * Drop the non-partition key expression from the given expression, to optimize the + * partition pruning. For instances: (We assume part1 & part2 are the partition keys): + * (part1 == 1 and a > 3) or (part2 == 2 and a < 5) ==> (part1 == 1 or part1 == 2) + * (part1 == 1 and a > 3) or (a < 100) => None + * (a > 100 && b < 100) or (part1 = 10) => None + * (a > 100 && b < 100 and part1 = 10) or (part1 == 2) => (part1 = 10 or part1 == 2) --- End diff -- Please don't mix up different sets of operators here (`&&`/`and` and `=`/`==`)
[GitHub] spark pull request #13585: [SPARK-15859][SQL] Optimize the partition pruning...
Github user yangw1234 commented on a diff in the pull request: https://github.com/apache/spark/pull/13585#discussion_r66743506 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala --- @@ -65,15 +65,20 @@ private[hive] trait HiveStrategies { // hive table scan operator to be used for partition pruning. val partitionKeyIds = AttributeSet(relation.partitionKeys) val (pruningPredicates, otherPredicates) = predicates.partition { predicate => - !predicate.references.isEmpty && + predicate.references.nonEmpty && predicate.references.subsetOf(partitionKeyIds) } +val additionalPartPredicates = + PhysicalOperation.partitionPrunningFromDisjunction( +otherPredicates.foldLeft[Expression](Literal(true))(And(_, _)), partitionKeyIds) pruneFilterProject( projectList, otherPredicates, identity[Seq[Expression]], - HiveTableScanExec(_, relation, pruningPredicates)(sparkSession)) :: Nil +HiveTableScanExec(_, +relation, +pruningPredicates ++ additionalPartPredicates)(sparkSession)) :: Nil --- End diff -- glad I could help ^_^
[GitHub] spark pull request #13585: [SPARK-15859][SQL] Optimize the partition pruning...
Github user chenghao-intel commented on a diff in the pull request: https://github.com/apache/spark/pull/13585#discussion_r66743318 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala --- @@ -65,15 +65,20 @@ private[hive] trait HiveStrategies { // hive table scan operator to be used for partition pruning. val partitionKeyIds = AttributeSet(relation.partitionKeys) val (pruningPredicates, otherPredicates) = predicates.partition { predicate => - !predicate.references.isEmpty && + predicate.references.nonEmpty && predicate.references.subsetOf(partitionKeyIds) } +val additionalPartPredicates = + PhysicalOperation.partitionPrunningFromDisjunction( +otherPredicates.foldLeft[Expression](Literal(true))(And(_, _)), partitionKeyIds) pruneFilterProject( projectList, otherPredicates, identity[Seq[Expression]], - HiveTableScanExec(_, relation, pruningPredicates)(sparkSession)) :: Nil +HiveTableScanExec(_, +relation, +pruningPredicates ++ additionalPartPredicates)(sparkSession)) :: Nil --- End diff -- Thanks @clockfly for pointing out the exception as well. :)
[GitHub] spark pull request #13585: [SPARK-15859][SQL] Optimize the partition pruning...
Github user chenghao-intel commented on a diff in the pull request: https://github.com/apache/spark/pull/13585#discussion_r66743131 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala --- @@ -65,15 +65,20 @@ private[hive] trait HiveStrategies { // hive table scan operator to be used for partition pruning. val partitionKeyIds = AttributeSet(relation.partitionKeys) val (pruningPredicates, otherPredicates) = predicates.partition { predicate => - !predicate.references.isEmpty && + predicate.references.nonEmpty && predicate.references.subsetOf(partitionKeyIds) } +val additionalPartPredicates = + PhysicalOperation.partitionPrunningFromDisjunction( +otherPredicates.foldLeft[Expression](Literal(true))(And(_, _)), partitionKeyIds) pruneFilterProject( projectList, otherPredicates, identity[Seq[Expression]], - HiveTableScanExec(_, relation, pruningPredicates)(sparkSession)) :: Nil +HiveTableScanExec(_, +relation, +pruningPredicates ++ additionalPartPredicates)(sparkSession)) :: Nil --- End diff -- Thanks @yangw1234, I will update the code to be stricter about the partition pruning filter extraction.
[GitHub] spark pull request #13585: [SPARK-15859][SQL] Optimize the partition pruning...
Github user chenghao-intel commented on a diff in the pull request: https://github.com/apache/spark/pull/13585#discussion_r66742892 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala --- @@ -92,6 +92,36 @@ object PhysicalOperation extends PredicateHelper { .map(Alias(_, a.name)(a.exprId, a.qualifier, isGenerated = a.isGenerated)).getOrElse(a) } } + + /** + * Drop the non-partition key expression in the disjunctions, to optimize the partition pruning. + * For instances: (We assume part1 & part2 are the partition keys) + * (part1 == 1 and a > 3) or (part2 == 2 and a < 5) ==> (part1 == 1 or part1 == 2) + * (part1 == 1 and a > 3) or (a < 100) => None + * (a > 100 && b < 100) or (part1 = 10) => None + * (a > 100 && b < 100 and part1 = 10) or (part1 == 2) => (part1 = 10 or part1 == 2) + * @param predicate disjunctions + * @param partitionKeyIds partition keys in attribute set + * @return + */ + def partitionPrunningFromDisjunction( +predicate: Expression, partitionKeyIds: AttributeSet): Option[Expression] = { +// ignore the pure non-partition key expression in conjunction of the expression tree +val additionalPartPredicate = predicate transformUp { + case a @ And(left, right) if a.deterministic && +left.references.intersect(partitionKeyIds).isEmpty => right + case a @ And(left, right) if a.deterministic && +right.references.intersect(partitionKeyIds).isEmpty => left --- End diff -- OK, I got it, thanks for the explanation.
[GitHub] spark pull request #13585: [SPARK-15859][SQL] Optimize the partition pruning...
Github user yangw1234 commented on a diff in the pull request: https://github.com/apache/spark/pull/13585#discussion_r66742485 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala --- @@ -92,6 +92,36 @@ object PhysicalOperation extends PredicateHelper { .map(Alias(_, a.name)(a.exprId, a.qualifier, isGenerated = a.isGenerated)).getOrElse(a) } } + + /** + * Drop the non-partition key expression in the disjunctions, to optimize the partition pruning. + * For instances: (We assume part1 & part2 are the partition keys) + * (part1 == 1 and a > 3) or (part2 == 2 and a < 5) ==> (part1 == 1 or part1 == 2) + * (part1 == 1 and a > 3) or (a < 100) => None + * (a > 100 && b < 100) or (part1 = 10) => None + * (a > 100 && b < 100 and part1 = 10) or (part1 == 2) => (part1 = 10 or part1 == 2) + * @param predicate disjunctions + * @param partitionKeyIds partition keys in attribute set + * @return + */ + def partitionPrunningFromDisjunction( +predicate: Expression, partitionKeyIds: AttributeSet): Option[Expression] = { +// ignore the pure non-partition key expression in conjunction of the expression tree +val additionalPartPredicate = predicate transformUp { + case a @ And(left, right) if a.deterministic && +left.references.intersect(partitionKeyIds).isEmpty => right + case a @ And(left, right) if a.deterministic && +right.references.intersect(partitionKeyIds).isEmpty => left --- End diff -- The problem is here. Imagine a record `a = 2` in `partition = 1`. Such a record satisfies the above expression (`!(partition = 1 && a > 3)`), but if we simply drop `a > 3` and push `!(partition = 1)` down to the table scan, partition =1 will be discarded and the record won't appear in the result. 
The test case passed because the `BooleanSimplification` optimizer rule will transform `!(partition =1 && a > 3)` to `(!(partition=1) || (a <= 3))`, such an expression will be dropped entirely by your `partitionPrunningFromDisjunction`, in which case "partition = 1" will not be discarded.
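The lost-row scenario described above can be checked with plain boolean evaluation (hypothetical values, not Catalyst code): a row in partition 1 with `a = 2` satisfies `!(partition = 1 && a > 3)`, yet naively dropping the non-partition conjunct under the NOT produces a partition filter that discards the row's partition.

```scala
// The full predicate: !(partition = 1 && a > 3).
def matches(partition: Int, a: Int): Boolean = !(partition == 1 && a > 3)

// The row (partition = 1, a = 2) satisfies the original predicate:
// !(true && false) == true.
val rowQualifies = matches(partition = 1, a = 2)

// Naively dropping `a > 3` under the NOT leaves !(partition = 1),
// which prunes partition 1 away, so the qualifying row is lost.
def naivePartitionFilter(partition: Int): Boolean = !(partition == 1)
val partitionSurvives = naivePartitionFilter(1)
```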
[GitHub] spark pull request #13585: [SPARK-15859][SQL] Optimize the partition pruning...
Github user chenghao-intel commented on a diff in the pull request: https://github.com/apache/spark/pull/13585#discussion_r66741771 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala --- @@ -65,15 +65,20 @@ private[hive] trait HiveStrategies { // hive table scan operator to be used for partition pruning. val partitionKeyIds = AttributeSet(relation.partitionKeys) val (pruningPredicates, otherPredicates) = predicates.partition { predicate => - !predicate.references.isEmpty && + predicate.references.nonEmpty && predicate.references.subsetOf(partitionKeyIds) } +val additionalPartPredicates = + PhysicalOperation.partitionPrunningFromDisjunction( +otherPredicates.foldLeft[Expression](Literal(true))(And(_, _)), partitionKeyIds) pruneFilterProject( projectList, otherPredicates, identity[Seq[Expression]], - HiveTableScanExec(_, relation, pruningPredicates)(sparkSession)) :: Nil +HiveTableScanExec(_, +relation, +pruningPredicates ++ additionalPartPredicates)(sparkSession)) :: Nil --- End diff -- Sorry @clockfly, I am not sure what you mean; this PR is not designed to depend on the Optimizer (CNF). Can you please give a more concrete example if there is a bug?
[GitHub] spark pull request #13585: [SPARK-15859][SQL] Optimize the partition pruning...
Github user clockfly commented on a diff in the pull request: https://github.com/apache/spark/pull/13585#discussion_r66741254 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala --- @@ -65,15 +65,20 @@ private[hive] trait HiveStrategies { // hive table scan operator to be used for partition pruning. val partitionKeyIds = AttributeSet(relation.partitionKeys) val (pruningPredicates, otherPredicates) = predicates.partition { predicate => - !predicate.references.isEmpty && + predicate.references.nonEmpty && predicate.references.subsetOf(partitionKeyIds) } +val additionalPartPredicates = + PhysicalOperation.partitionPrunningFromDisjunction( +otherPredicates.foldLeft[Expression](Literal(true))(And(_, _)), partitionKeyIds) pruneFilterProject( projectList, otherPredicates, identity[Seq[Expression]], - HiveTableScanExec(_, relation, pruningPredicates)(sparkSession)) :: Nil +HiveTableScanExec(_, +relation, +pruningPredicates ++ additionalPartPredicates)(sparkSession)) :: Nil --- End diff -- Sure, we understand that `additionalPartPredicates` is the partition filter. But we may not be able to guarantee that BooleanSimplification will push every NOT operator down to the leaf expressions, because BooleanSimplification is an "optimizer" rule, which can be skipped if the maximum number of iterations is exceeded during optimization.
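One conservative way to avoid depending on BooleanSimplification, in the spirit of the stricter extraction discussed here, would be to refuse extraction whenever a NOT sits above anything but a leaf comparison. This is a sketch on a hypothetical mini-AST, not the PR's code; all names below are made up for illustration.

```scala
// Hypothetical mini-AST: Leaf stands for any simple comparison and
// carries the set of column names it references.
sealed trait BExpr
case class Leaf(refs: Set[String]) extends BExpr
case class NotE(child: BExpr) extends BExpr
case class AndE(l: BExpr, r: BExpr) extends BExpr
case class OrE(l: BExpr, r: BExpr) extends BExpr

// A NOT over a composite subtree may hide a disguised disjunction, so we
// only allow extraction when every NOT sits directly on a leaf, instead of
// assuming the optimizer has already pushed negations down.
def safeToExtract(e: BExpr): Boolean = e match {
  case Leaf(_)       => true
  case NotE(Leaf(_)) => true
  case NotE(_)       => false
  case AndE(l, r)    => safeToExtract(l) && safeToExtract(r)
  case OrE(l, r)     => safeToExtract(l) && safeToExtract(r)
}
```

A UDF that behaves like NOT (the concern raised earlier in the thread) would still slip past a purely structural check like this, which is why a whitelist of built-in expressions was also suggested.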
[GitHub] spark pull request #13585: [SPARK-15859][SQL] Optimize the partition pruning...
Github user chenghao-intel commented on a diff in the pull request: https://github.com/apache/spark/pull/13585#discussion_r66733226 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala --- @@ -65,15 +65,20 @@ private[hive] trait HiveStrategies { // hive table scan operator to be used for partition pruning. val partitionKeyIds = AttributeSet(relation.partitionKeys) val (pruningPredicates, otherPredicates) = predicates.partition { predicate => - !predicate.references.isEmpty && + predicate.references.nonEmpty && predicate.references.subsetOf(partitionKeyIds) } +val additionalPartPredicates = + PhysicalOperation.partitionPrunningFromDisjunction( +otherPredicates.foldLeft[Expression](Literal(true))(And(_, _)), partitionKeyIds) pruneFilterProject( projectList, otherPredicates, identity[Seq[Expression]], - HiveTableScanExec(_, relation, pruningPredicates)(sparkSession)) :: Nil +HiveTableScanExec(_, +relation, +pruningPredicates ++ additionalPartPredicates)(sparkSession)) :: Nil --- End diff -- @yangw1234 @liancheng @clockfly `pruningPredicates ++ additionalPartPredicates` is the partition filter, and the original filter still needs to be applied after the partitions are pruned.
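The fact that the original filter is re-applied after pruning is what makes an over-approximate partition filter safe (provided it never excludes a partition that could hold a matching row). A small self-contained illustration, with hypothetical rows rather than Spark code:

```scala
// A toy "table": each row carries its partition value and a data column.
case class DataRow(part: Int, a: Int)
val rows = Seq(DataRow(1, 2), DataRow(1, 5), DataRow(2, 7), DataRow(3, 9))

// Full predicate: (part == 1 and a > 3) or (part == 2 and a < 5).
val fullPredicate: DataRow => Boolean =
  r => (r.part == 1 && r.a > 3) || (r.part == 2 && r.a < 5)

// Extracted partition filter: part == 1 or part == 2 (an over-approximation:
// it may scan too much, but never skips a partition with matching rows).
val partitionFilter: Int => Boolean = p => p == 1 || p == 2

val scanned = rows.filter(r => partitionFilter(r.part)) // partition 3 skipped
val answer = scanned.filter(fullPredicate)              // original filter re-applied
```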
[GitHub] spark pull request #13585: [SPARK-15859][SQL] Optimize the partition pruning...
Github user chenghao-intel commented on a diff in the pull request: https://github.com/apache/spark/pull/13585#discussion_r66714698 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala --- @@ -92,6 +92,36 @@ object PhysicalOperation extends PredicateHelper { .map(Alias(_, a.name)(a.exprId, a.qualifier, isGenerated = a.isGenerated)).getOrElse(a) } } + + /** + * Drop the non-partition key expression in the disjunctions, to optimize the partition pruning. --- End diff -- Oh, OK. Originally I thought the conjunction cases were already handled in `collectProjectsAndFilters` before being passed into this function, and that here we only handle the `AND`s inside the disjunction. (You can see this in HiveTableScans in HiveStrategies.scala.) Anyway, you convinced me. :)
[GitHub] spark pull request #13585: [SPARK-15859][SQL] Optimize the partition pruning...
Github user yangw1234 commented on a diff in the pull request: https://github.com/apache/spark/pull/13585#discussion_r66714468 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala --- @@ -92,6 +92,36 @@ object PhysicalOperation extends PredicateHelper { .map(Alias(_, a.name)(a.exprId, a.qualifier, isGenerated = a.isGenerated)).getOrElse(a) } } + + /** + * Drop the non-partition key expression in the disjunctions, to optimize the partition pruning. --- End diff -- It is `(part=1 conjunction a=1) disjunction (part=2 conjunction a=4)`, right? But the expressions that get dropped are `a=1`, which is in "conjunction" with `part=1`, and `a=4`, which is in "conjunction" with `part=2`. So I thought it should be "conjunctions". Or maybe we can phrase it another way to avoid the confusion? ^_^
[GitHub] spark pull request #13585: [SPARK-15859][SQL] Optimize the partition pruning...
Github user chenghao-intel commented on a diff in the pull request: https://github.com/apache/spark/pull/13585#discussion_r66714358 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala --- @@ -65,15 +65,20 @@ private[hive] trait HiveStrategies { // hive table scan operator to be used for partition pruning. val partitionKeyIds = AttributeSet(relation.partitionKeys) val (pruningPredicates, otherPredicates) = predicates.partition { predicate => - !predicate.references.isEmpty && + predicate.references.nonEmpty && predicate.references.subsetOf(partitionKeyIds) } +val additionalPartPredicates = + PhysicalOperation.partitionPrunningFromDisjunction( +otherPredicates.foldLeft[Expression](Literal(true))(And(_, _)), partitionKeyIds) pruneFilterProject( projectList, otherPredicates, identity[Seq[Expression]], - HiveTableScanExec(_, relation, pruningPredicates)(sparkSession)) :: Nil +HiveTableScanExec(_, +relation, +pruningPredicates ++ additionalPartPredicates)(sparkSession)) :: Nil --- End diff -- For `HiveTableScan`, the predicate here is only used to minimize the partition scanning, so what we need to do is supply a more specific partition pruning predicate. Sorry if something was confusing.
[GitHub] spark pull request #13585: [SPARK-15859][SQL] Optimize the partition pruning...
Github user chenghao-intel commented on a diff in the pull request: https://github.com/apache/spark/pull/13585#discussion_r66714324 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala --- @@ -92,6 +92,36 @@ object PhysicalOperation extends PredicateHelper { .map(Alias(_, a.name)(a.exprId, a.qualifier, isGenerated = a.isGenerated)).getOrElse(a) } } + + /** + * Drop the non-partition key expression in the disjunctions, to optimize the partition pruning. --- End diff -- I think it should be `disjunction`. For example: `(part=1 and a=1) or (part=2 and a=4)` — this is a disjunction, right?
[GitHub] spark pull request #13585: [SPARK-15859][SQL] Optimize the partition pruning...
Github user chenghao-intel commented on a diff in the pull request: https://github.com/apache/spark/pull/13585#discussion_r66714314 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala --- @@ -65,4 +69,95 @@ class QueryPartitionSuite extends QueryTest with SQLTestUtils with TestHiveSingl sql("DROP TABLE IF EXISTS createAndInsertTest") } } + + test("partition pruning in disjunction") { +withSQLConf((SQLConf.HIVE_VERIFY_PARTITION_PATH.key, "true")) { + val testData = sparkContext.parallelize( +(1 to 10).map(i => TestData(i, i.toString))).toDF() + testData.registerTempTable("testData") + + val testData2 = sparkContext.parallelize( +(11 to 20).map(i => TestData(i, i.toString))).toDF() + testData2.registerTempTable("testData2") + + val testData3 = sparkContext.parallelize( +(21 to 30).map(i => TestData(i, i.toString))).toDF() + testData3.registerTempTable("testData3") + + val testData4 = sparkContext.parallelize( +(31 to 40).map(i => TestData(i, i.toString))).toDF() + testData4.registerTempTable("testData4") + + val tmpDir = Files.createTempDir() + // create the table for test + sql(s"CREATE TABLE table_with_partition(key int,value string) " + +s"PARTITIONED by (ds string, ds2 string) location '${tmpDir.toURI.toString}' ") + sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='1', ds2='d1') " + +"SELECT key,value FROM testData") + sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='2', ds2='d1') " + +"SELECT key,value FROM testData2") + sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='3', ds2='d3') " + +"SELECT key,value FROM testData3") + sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='4', ds2='d4') " + +"SELECT key,value FROM testData4") + + checkAnswer(sql("select key,value from table_with_partition"), +testData.collect ++ testData2.collect ++ testData3.collect ++ testData4.collect) + + checkAnswer( +sql( + """select key,value from table_with_partition +| where (ds='4' and key=38) 
or (ds='3' and key=22)""".stripMargin), + Row(38, "38") :: Row(22, "22") :: Nil) + + checkAnswer( +sql( + """select key,value from table_with_partition +| where (key<40 and key>38) or (ds='3' and key=22)""".stripMargin), +Row(39, "39") :: Row(22, "22") :: Nil) + + sql("DROP TABLE table_with_partition") + sql("DROP TABLE createAndInsertTest") --- End diff -- good catch. :)
[GitHub] spark pull request #13585: [SPARK-15859][SQL] Optimize the partition pruning...
Github user chenghao-intel commented on a diff in the pull request: https://github.com/apache/spark/pull/13585#discussion_r66714297 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala --- @@ -92,6 +92,36 @@ object PhysicalOperation extends PredicateHelper { .map(Alias(_, a.name)(a.exprId, a.qualifier, isGenerated = a.isGenerated)).getOrElse(a) } } + + /** + * Drop the non-partition key expression in the disjunctions, to optimize the partition pruning. + * For instances: (We assume part1 & part2 are the partition keys) + * (part1 == 1 and a > 3) or (part2 == 2 and a < 5) ==> (part1 == 1 or part1 == 2) + * (part1 == 1 and a > 3) or (a < 100) => None + * (a > 100 && b < 100) or (part1 = 10) => None + * (a > 100 && b < 100 and part1 = 10) or (part1 == 2) => (part1 = 10 or part1 == 2) + * @param predicate disjunctions + * @param partitionKeyIds partition keys in attribute set + * @return + */ + def partitionPrunningFromDisjunction( +predicate: Expression, partitionKeyIds: AttributeSet): Option[Expression] = { +// ignore the pure non-partition key expression in conjunction of the expression tree +val additionalPartPredicate = predicate transformUp { + case a @ And(left, right) if a.deterministic && +left.references.intersect(partitionKeyIds).isEmpty => right + case a @ And(left, right) if a.deterministic && +right.references.intersect(partitionKeyIds).isEmpty => left --- End diff -- Actually, the output of `!(partition = 1 && a > 3)` should be `!(partition=1)`; what gets dropped here is the `a>3`.
[GitHub] spark pull request #13585: [SPARK-15859][SQL] Optimize the partition pruning...
Github user lianhuiwang commented on a diff in the pull request: https://github.com/apache/spark/pull/13585#discussion_r66702127 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala --- @@ -92,6 +92,36 @@ object PhysicalOperation extends PredicateHelper { .map(Alias(_, a.name)(a.exprId, a.qualifier, isGenerated = a.isGenerated)).getOrElse(a) } } + + /** + * Drop the non-partition key expression in the disjunctions, to optimize the partition pruning. + * For instances: (We assume part1 & part2 are the partition keys) + * (part1 == 1 and a > 3) or (part2 == 2 and a < 5) ==> (part1 == 1 or part1 == 2) + * (part1 == 1 and a > 3) or (a < 100) => None + * (a > 100 && b < 100) or (part1 = 10) => None + * (a > 100 && b < 100 and part1 = 10) or (part1 == 2) => (part1 = 10 or part1 == 2) + * @param predicate disjunctions + * @param partitionKeyIds partition keys in attribute set + * @return + */ + def partitionPrunningFromDisjunction( +predicate: Expression, partitionKeyIds: AttributeSet): Option[Expression] = { +// ignore the pure non-partition key expression in conjunction of the expression tree +val additionalPartPredicate = predicate transformUp { + case a @ And(left, right) if a.deterministic && +left.references.intersect(partitionKeyIds).isEmpty => right + case a @ And(left, right) if a.deterministic && +right.references.intersect(partitionKeyIds).isEmpty => left --- End diff -- At first I thought we could treat `!(... ...)` as `true`, which results in a full scan, because that is what Hive currently does.
[GitHub] spark pull request #13585: [SPARK-15859][SQL] Optimize the partition pruning...
Github user wangyang1992 commented on a diff in the pull request: https://github.com/apache/spark/pull/13585#discussion_r66575743 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala --- @@ -92,6 +92,36 @@ object PhysicalOperation extends PredicateHelper { .map(Alias(_, a.name)(a.exprId, a.qualifier, isGenerated = a.isGenerated)).getOrElse(a) } } + + /** + * Drop the non-partition key expression in the disjunctions, to optimize the partition pruning. + * For instances: (We assume part1 & part2 are the partition keys) + * (part1 == 1 and a > 3) or (part2 == 2 and a < 5) ==> (part1 == 1 or part1 == 2) + * (part1 == 1 and a > 3) or (a < 100) => None + * (a > 100 && b < 100) or (part1 = 10) => None + * (a > 100 && b < 100 and part1 = 10) or (part1 == 2) => (part1 = 10 or part1 == 2) + * @param predicate disjunctions + * @param partitionKeyIds partition keys in attribute set + * @return + */ + def partitionPrunningFromDisjunction( +predicate: Expression, partitionKeyIds: AttributeSet): Option[Expression] = { +// ignore the pure non-partition key expression in conjunction of the expression tree +val additionalPartPredicate = predicate transformUp { + case a @ And(left, right) if a.deterministic && +left.references.intersect(partitionKeyIds).isEmpty => right + case a @ And(left, right) if a.deterministic && +right.references.intersect(partitionKeyIds).isEmpty => left --- End diff -- Great point @clockfly, but maybe the optimizer will turn this expression into (!(partition = 1) || !(a > 3))? [BooleanSimplification](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala#L907)
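[Editor's note] The rewrite wangyang1992 points to is De Morgan's law, which BooleanSimplification applies. An exhaustive truth-table check (plain Python, independent of Spark) confirms the two forms agree; after the rewrite, the disjunction contains the non-partition branch `!(a > 3)`, so the proposed extraction should return None (full scan) rather than prune incorrectly.

```python
import itertools

# Exhaustively verify De Morgan's rewrite used by BooleanSimplification:
#   !(x && y)  ==  !x || !y    for every boolean combination.
for x, y in itertools.product([True, False], repeat=2):
    assert (not (x and y)) == ((not x) or (not y))
```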
[GitHub] spark pull request #13585: [SPARK-15859][SQL] Optimize the partition pruning...
Github user clockfly commented on a diff in the pull request: https://github.com/apache/spark/pull/13585#discussion_r66569556 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala --- @@ -92,6 +92,36 @@ object PhysicalOperation extends PredicateHelper { .map(Alias(_, a.name)(a.exprId, a.qualifier, isGenerated = a.isGenerated)).getOrElse(a) } } + + /** + * Drop the non-partition key expression in the disjunctions, to optimize the partition pruning. + * For instances: (We assume part1 & part2 are the partition keys) + * (part1 == 1 and a > 3) or (part2 == 2 and a < 5) ==> (part1 == 1 or part1 == 2) + * (part1 == 1 and a > 3) or (a < 100) => None + * (a > 100 && b < 100) or (part1 = 10) => None + * (a > 100 && b < 100 and part1 = 10) or (part1 == 2) => (part1 = 10 or part1 == 2) + * @param predicate disjunctions + * @param partitionKeyIds partition keys in attribute set + * @return + */ + def partitionPrunningFromDisjunction( +predicate: Expression, partitionKeyIds: AttributeSet): Option[Expression] = { +// ignore the pure non-partition key expression in conjunction of the expression tree +val additionalPartPredicate = predicate transformUp { + case a @ And(left, right) if a.deterministic && +left.references.intersect(partitionKeyIds).isEmpty => right + case a @ And(left, right) if a.deterministic && +right.references.intersect(partitionKeyIds).isEmpty => left --- End diff -- How about predicate expression `!(partition = 1 && a > 3)`? Partition `partition = 1` should not be pruned as `partition = 1, a = 2` satisfies the predicate.
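[Editor's note] clockfly's counterexample can be checked mechanically (illustrative Python, not Spark code): dropping the non-partition conjunct inside the negation would leave `!(part1 == 1)`, which skips exactly the partition that holds a matching row.

```python
def original(part1, a):
    # The predicate as written in the query.
    return not (part1 == 1 and a > 3)

def naively_pruned(part1, a):
    # What remains if the non-partition conjunct `a > 3` is dropped
    # inside the negation -- an unsound transformation.
    return not (part1 == 1)

# The row (part1 = 1, a = 2) satisfies the original predicate, so the
# partition part1 = 1 must be scanned ...
assert original(part1=1, a=2) is True
# ... but the naive pruning predicate would discard that partition.
assert naively_pruned(part1=1, a=2) is False
```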
[GitHub] spark pull request #13585: [SPARK-15859][SQL] Optimize the partition pruning...
Github user wangyang1992 commented on a diff in the pull request: https://github.com/apache/spark/pull/13585#discussion_r66564745 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala --- @@ -65,4 +69,95 @@ class QueryPartitionSuite extends QueryTest with SQLTestUtils with TestHiveSingl sql("DROP TABLE IF EXISTS createAndInsertTest") } } + + test("partition pruning in disjunction") { +withSQLConf((SQLConf.HIVE_VERIFY_PARTITION_PATH.key, "true")) { + val testData = sparkContext.parallelize( +(1 to 10).map(i => TestData(i, i.toString))).toDF() + testData.registerTempTable("testData") + + val testData2 = sparkContext.parallelize( +(11 to 20).map(i => TestData(i, i.toString))).toDF() + testData2.registerTempTable("testData2") + + val testData3 = sparkContext.parallelize( +(21 to 30).map(i => TestData(i, i.toString))).toDF() + testData3.registerTempTable("testData3") + + val testData4 = sparkContext.parallelize( +(31 to 40).map(i => TestData(i, i.toString))).toDF() + testData4.registerTempTable("testData4") + + val tmpDir = Files.createTempDir() + // create the table for test + sql(s"CREATE TABLE table_with_partition(key int,value string) " + +s"PARTITIONED by (ds string, ds2 string) location '${tmpDir.toURI.toString}' ") + sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='1', ds2='d1') " + +"SELECT key,value FROM testData") + sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='2', ds2='d1') " + +"SELECT key,value FROM testData2") + sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='3', ds2='d3') " + +"SELECT key,value FROM testData3") + sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='4', ds2='d4') " + +"SELECT key,value FROM testData4") + + checkAnswer(sql("select key,value from table_with_partition"), +testData.collect ++ testData2.collect ++ testData3.collect ++ testData4.collect) + + checkAnswer( +sql( + """select key,value from table_with_partition +| where (ds='4' and key=38) or 
(ds='3' and key=22)""".stripMargin), + Row(38, "38") :: Row(22, "22") :: Nil) + + checkAnswer( +sql( + """select key,value from table_with_partition +| where (key<40 and key>38) or (ds='3' and key=22)""".stripMargin), +Row(39, "39") :: Row(22, "22") :: Nil) + + sql("DROP TABLE table_with_partition") + sql("DROP TABLE createAndInsertTest") --- End diff -- Not really sure why we should drop "createAndInsertTest"; I can't find it anywhere. Maybe those temp tables named "testData*" are the ones that should be dropped. ^_^
[GitHub] spark pull request #13585: [SPARK-15859][SQL] Optimize the partition pruning...
Github user wangyang1992 commented on a diff in the pull request: https://github.com/apache/spark/pull/13585#discussion_r66563744 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala --- @@ -92,6 +92,36 @@ object PhysicalOperation extends PredicateHelper { .map(Alias(_, a.name)(a.exprId, a.qualifier, isGenerated = a.isGenerated)).getOrElse(a) } } + + /** + * Drop the non-partition key expression in the disjunctions, to optimize the partition pruning. --- End diff -- "Drop the non-partition key expression in the disjunctions". Should it be "conjunctions"?
[GitHub] spark pull request #13585: [SPARK-15859][SQL] Optimize the partition pruning...
GitHub user chenghao-intel opened a pull request: https://github.com/apache/spark/pull/13585 [SPARK-15859][SQL] Optimize the partition pruning within the disjunction ## What changes were proposed in this pull request? Within a disjunction, the partition pruning expression can simply drop any non-partition-key expression that appears in a conjunction. For instance: ```scala (part1 == 1 and a > 3) or (part2 == 2 and a < 5) ==> (part1 == 1 or part2 == 2) (part1 == 1 and a > 3) or (a < 100) => None (a > 100 && b < 100) or (part1 = 10) => None (a > 100 && b < 100 and part1 = 10) or (part1 == 2) => (part1 = 10 or part1 == 2) ``` This PR only works for HiveTableScan; another PR will follow to optimize the data source API back-end scan. ## How was this patch tested? A unit test is included in this PR. You can merge this pull request into a Git repository by running: $ git pull https://github.com/chenghao-intel/spark partition Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/13585.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #13585 commit 08519f2e7a3222cb791e6ce1b8af0c132ff16b29 Author: Cheng Hao Date: 2016-06-08T08:48:52Z optimize the partition pruning
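[Editor's note] A minimal sketch of the proposed extraction, using a toy predicate AST in Python in place of Catalyst's Expression tree (all names here — `Leaf`, `extract_partition_predicate`, etc. — are illustrative inventions, not Spark APIs). It drops non-partition conjuncts bottom-up and gives up with None whenever a disjunct still touches a non-partition column, reproducing the four examples from the description:

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class Leaf:
    text: str
    refs: frozenset  # column names this comparison touches

@dataclass(frozen=True)
class And:
    left: object
    right: object

@dataclass(frozen=True)
class Or:
    left: object
    right: object

def refs(e):
    """All column names referenced anywhere in the (sub)tree."""
    return e.refs if isinstance(e, Leaf) else refs(e.left) | refs(e.right)

def extract_partition_predicate(pred, part_keys):
    """Inside each And, drop the side touching no partition key; return
    None if the result still references a non-partition column, because
    then some Or branch cannot constrain the partitions."""
    def transform(e):
        if isinstance(e, And):
            l, r = transform(e.left), transform(e.right)
            if not (refs(l) & part_keys):
                return r
            if not (refs(r) & part_keys):
                return l
            return And(l, r)
        if isinstance(e, Or):
            return Or(transform(e.left), transform(e.right))
        return e
    out = transform(pred)
    return out if refs(out) <= part_keys else None

keys = frozenset({"part1", "part2"})
p1   = Leaf("part1 == 1",  frozenset({"part1"}))
p2   = Leaf("part2 == 2",  frozenset({"part2"}))
p10  = Leaf("part1 == 10", frozenset({"part1"}))
pc2  = Leaf("part1 == 2",  frozenset({"part1"}))
a3   = Leaf("a > 3",   frozenset({"a"}))
a5   = Leaf("a < 5",   frozenset({"a"}))
a100 = Leaf("a > 100", frozenset({"a"}))
b100 = Leaf("b < 100", frozenset({"b"}))

# (part1 == 1 and a > 3) or (part2 == 2 and a < 5)  ==>  part1 == 1 or part2 == 2
assert extract_partition_predicate(Or(And(p1, a3), And(p2, a5)), keys) == Or(p1, p2)
# (part1 == 1 and a > 3) or (a < 100)  ==>  None
assert extract_partition_predicate(
    Or(And(p1, a3), Leaf("a < 100", frozenset({"a"}))), keys) is None
# (a > 100 and b < 100) or (part1 == 10)  ==>  None
assert extract_partition_predicate(Or(And(a100, b100), p10), keys) is None
# (a > 100 and b < 100 and part1 == 10) or (part1 == 2)  ==>  part1 == 10 or part1 == 2
assert extract_partition_predicate(Or(And(And(a100, b100), p10), pc2), keys) == Or(p10, pc2)
```

Note this sketch omits the `deterministic` guard the actual patch carries on each `And` case; non-deterministic conjuncts must never be dropped.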