[GitHub] spark pull request #22364: [SPARK-25379][SQL] Improve AttributeSet and ColumnPruning performance
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/22364
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/22364#discussion_r216572034

```
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeSet.scala ---
@@ -39,10 +41,15 @@ object AttributeSet {
   /** Constructs a new [[AttributeSet]] given a sequence of [[Expression Expressions]]. */
   def apply(baseSet: Iterable[Expression]): AttributeSet = {
-    new AttributeSet(
-      baseSet
-        .flatMap(_.references)
-        .map(new AttributeEquals(_)).toSet)
--- End diff --
```

Thank you for your comment and help here!
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/22364#discussion_r216525223

Thanks!
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/22364#discussion_r216369638

I have to use a `LinkedHashSet` because without preserving the insertion order I had some UT failures. But I can use the more compact syntax you suggested here, thanks.
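The ordering behaviour behind this comment can be seen with plain Scala collections (a standalone sketch with toy data, not Spark code): `mutable.LinkedHashSet` deduplicates while preserving first-insertion order, whereas the default immutable `Set` switches to hash order once it grows past four elements.

```scala
import scala.collection.mutable

// LinkedHashSet keeps first-insertion order while deduplicating.
val ordered = mutable.LinkedHashSet.empty[Int]
Seq(5, 3, 9, 3, 1, 5).foreach(ordered += _)
println(ordered.mkString(", ")) // 5, 3, 9, 1
```

Iterating the set therefore yields elements in the order they were first added, which is what order-sensitive tests depend on.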
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/22364#discussion_r216329962

The name is ok. Do we still need to use the mutable set instead of `new AttributeSet(sets.foldLeft(Set.empty[AttributeEquals])(_ ++ _))`?
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/22364#discussion_r216315044

I cannot rename it to `apply` because there is already an `apply` taking an `Iterable`: we cannot have two `apply` overloads, one with `Iterable[Expression]` and one with `Iterable[AttributeSet]`, because they would conflict.
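The conflict comes from JVM type erasure: after erasure both overloads would have the same signature, `apply(Iterable)`, so the compiler rejects the pair. A standalone sketch with hypothetical names illustrates the usual workaround, which is to give the second factory a distinct name (as `fromAttributeSets` does):

```scala
object SetFactory {
  // These two could NOT both be overloads named `apply`: after erasure
  // the JVM sees the identical signature apply(Iterable) for both.
  def fromElems(xs: Iterable[String]): Set[String] = xs.toSet
  def fromSets(xs: Iterable[Set[String]]): Set[String] =
    xs.foldLeft(Set.empty[String])(_ ++ _)
}

println(SetFactory.fromElems(Seq("a", "b", "a")))          // Set(a, b)
println(SetFactory.fromSets(Seq(Set("a"), Set("b", "c")))) // Set(a, b, c)
```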
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/22364#discussion_r216303304

Ah, ok. Can you rename `fromAttributeSets` -> `apply`, and then change the mutable set?
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/22364#discussion_r216298504

Ok, so there are 2 reasons why the current one is better than the proposed one:
- in your proposal we call `_.references` even in cases when it is not needed, which causes unneeded operations (basically https://github.com/apache/spark/pull/22364/files#diff-75576f0ec7f9d8b5032000245217d233R40 is called for each attribute; see also `Attribute.references`);
- the current one uses a mutable Set, but this is not a big deal, it can be changed.
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/22364#discussion_r216294749

Is the simpler one better? https://github.com/maropu/spark/commit/cba767e3721e78b934ef02631378435eb3aef730
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/22364#discussion_r216224645

I think this is just duplicating the code in `fromAttributeSets`, right? What would be the benefit of doing that?
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/22364#discussion_r216199910

Hi, good catch. One question here: is it bad to just simply write `new AttributeSet(baseSet.map(_.references.baseSet).foldLeft(Set.empty[AttributeEquals])(_ ++ _))`?
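The trade-off between the two styles discussed here can be sketched with plain Scala sets (toy data, not the Spark types): folding `++` over immutable sets allocates a fresh intermediate set at every step, while a mutable builder grows a single set in place; both yield the same union.

```scala
import scala.collection.mutable

val parts = Seq(Set(1, 2), Set(2, 3), Set(4))

// Immutable fold: each `++` builds a new intermediate set.
val folded = parts.foldLeft(Set.empty[Int])(_ ++ _)

// Mutable builder: one growable set, frozen at the end.
val buf = mutable.Set.empty[Int]
parts.foreach(buf ++= _)
val built = buf.toSet

println(folded == built) // true
```

The fold is more concise; the mutable builder avoids the per-step allocations, which is why it tends to win when many sets are merged.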
GitHub user mgaido91 opened a pull request: https://github.com/apache/spark/pull/22364

[SPARK-25379][SQL] Improve AttributeSet and ColumnPruning performance

## What changes were proposed in this pull request?

This PR contains 3 optimizations:

1) It significantly improves the `--` operation on `AttributeSet`. As a benchmark for the `--` operation, the following code has been run:

```
test("AttributeSet -- benchmark") {
  val attrSetA = AttributeSet((1 to 100).map { i => AttributeReference(s"c$i", IntegerType)() })
  val attrSetB = AttributeSet(attrSetA.take(80).toSeq)
  val attrSetC = AttributeSet((1 to 100).map { i => AttributeReference(s"c2_$i", IntegerType)() })
  val attrSetD = AttributeSet((attrSetA.take(50) ++ attrSetC.take(50)).toSeq)
  val attrSetE = AttributeSet((attrSetC.take(50) ++ attrSetA.take(50)).toSeq)
  val n_iter = 100
  val t0 = System.nanoTime()
  (1 to n_iter) foreach { _ =>
    val r1 = attrSetA -- attrSetB
    val r2 = attrSetA -- attrSetC
    val r3 = attrSetA -- attrSetD
    val r4 = attrSetA -- attrSetE
  }
  val t1 = System.nanoTime()
  val totalTime = t1 - t0
  println(s"Average time: ${totalTime / n_iter} us")
}
```

The results are:

```
Before PR - Average time: 67674 us (100 %)
After PR  - Average time: 28827 us (42.6 %)
```

2) In `ColumnPruning`, it replaces the occurrences of `(attributeSet1 -- attributeSet2).nonEmpty` with `!attributeSet1.subsetOf(attributeSet2)`, which is orders of magnitude more efficient (especially when there are many attributes). Running the previous benchmark with `--` replaced by `subsetOf` returns:

```
Average time: 67 us (0.1 %)
```

3) It provides a more efficient way of building `AttributeSet`s, which can greatly improve the performance of the `references` and `outputSet` methods of `Expression` and `QueryPlan`. This basically avoids unneeded operations (eg. creating many `AttributeEquals` wrapper instances which could be avoided).

The overall effect of these optimizations has been tested on `ColumnPruning` with the following benchmark:

```
test("ColumnPruning benchmark") {
  val attrSetA = (1 to 100).map { i => AttributeReference(s"c$i", IntegerType)() }
  val attrSetB = attrSetA.take(80)
  val attrSetC = attrSetA.take(20).map(a => Alias(Add(a, Literal(1)), s"${a.name}_1")())
  val input = LocalRelation(attrSetA)
  val query1 = Project(attrSetB, Project(attrSetA, input)).analyze
  val query2 = Project(attrSetC, Project(attrSetA, input)).analyze
  val query3 = Project(attrSetA, Project(attrSetA, input)).analyze
  val nIter = 10
  val t0 = System.nanoTime()
  (1 to nIter).foreach { _ =>
    ColumnPruning(query1)
    ColumnPruning(query2)
    ColumnPruning(query3)
  }
  val t1 = System.nanoTime()
  val totalTime = t1 - t0
  println(s"Average time: ${totalTime / nIter} us")
}
```

The output of the test is:

```
Before PR - Average time: 733471 us (100 %)
After PR  - Average time: 362455 us (49.4 %)
```

The performance improvement has also been evaluated on the `SQLQueryTestSuite`'s queries:

```
(before) org.apache.spark.sql.catalyst.optimizer.ColumnPruning  518413198 / 1377707172  2756 / 15717
(after)  org.apache.spark.sql.catalyst.optimizer.ColumnPruning  415432579 / 1121147950  2756 / 15717
% Running time  80.1% / 81.3%
```

Other rules also benefit, especially from (3), although the impact is lower, eg:

```
(before) org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences  307341442 / 623436806  2154 / 16480
(after)  org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences  290511312 / 560962495  2154 / 16480
% Running time  94.5% / 90.0%
```

The reason why the impact on the `SQLQueryTestSuite`'s queries is lower than in the other benchmark is that the optimizations matter most when many attributes are involved; since the tests often have very few attributes, the effect there is smaller.

## How was this patch tested?

run benchmarks + existing UTs
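The rewrite in optimization (2) rests on a simple set identity, shown here with plain Scala sets (toy data, not the Spark `AttributeSet` type): `(a -- b).isEmpty` is equivalent to `a.subsetOf(b)`, but `subsetOf` can stop at the first element of `a` missing from `b` instead of materializing the whole difference set.

```scala
val a = Set("c1", "c2", "c3")
val b = Set("c1", "c2", "c3", "c4")

// Equivalent answers; subsetOf avoids building the difference set.
println((a -- b).isEmpty == a.subsetOf(b))   // true
println((b -- a).nonEmpty == !b.subsetOf(a)) // true
```

The gap grows with set size: `--` does a hash lookup per element of `a` and allocates the result, while `subsetOf` does only the lookups and short-circuits on the first miss.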