[GitHub] spark pull request #22957: [SPARK-25951][SQL] Ignore aliases for distributio...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/22957#discussion_r238696879 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala --- @@ -145,9 +145,14 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] { assert(requiredChildDistributions.length == children.length) assert(requiredChildOrderings.length == children.length) +val aliasMap = AttributeMap[Expression](children.flatMap(_.expressions.collect { + case a: Alias => (a.toAttribute, a) +})) + // Ensure that the operator's children satisfy their output distribution requirements. children = children.zip(requiredChildDistributions).map { - case (child, distribution) if child.outputPartitioning.satisfies(distribution) => + case (child, distribution) if child.outputPartitioning.satisfies( + distribution.mapExpressions(replaceAlias(_, aliasMap))) => --- End diff -- Ok, I think I got it now, sorry, I didn't understand :) yes I think this is doable then. Let me try and do that, thanks. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22957#discussion_r238650207 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala --- @@ -145,9 +145,14 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] { assert(requiredChildDistributions.length == children.length) assert(requiredChildOrderings.length == children.length) +val aliasMap = AttributeMap[Expression](children.flatMap(_.expressions.collect { + case a: Alias => (a.toAttribute, a) +})) + // Ensure that the operator's children satisfy their output distribution requirements. children = children.zip(requiredChildDistributions).map { - case (child, distribution) if child.outputPartitioning.satisfies(distribution) => + case (child, distribution) if child.outputPartitioning.satisfies( + distribution.mapExpressions(replaceAlias(_, aliasMap))) => --- End diff -- seems we are not on the same page... Let's make the example clearer. Assuming a `relation[a ,b]`'s partitioning is `hash(a, b)`, then `Project(a as c, a as d, b, relation)`'s partitioning should be `[hash(c, b), hash(d, b)]`. It's like a flatMap. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
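The "flatMap" behaviour described in this comment can be illustrated with a toy model. This is only a sketch under stated assumptions: `Alias`, `HashPartitioning`, `outputNames` and `project` below are hypothetical stand-ins that use plain strings for attributes; they are not Spark's classes and not the code of this PR.

```scala
object AliasedPartitioningDemo {
  // Toy stand-ins (hypothetical, not Spark's classes): attributes are plain strings.
  final case class Alias(child: String, name: String)
  final case class HashPartitioning(keys: Seq[String])

  // All names under which a child attribute is visible in the projection's output.
  def outputNames(key: String, passThrough: Set[String], aliases: Seq[Alias]): Seq[String] = {
    val kept = if (passThrough.contains(key)) Seq(key) else Seq.empty[String]
    kept ++ aliases.filter(_.child == key).map(_.name)
  }

  // Expand the child's partitioning into every combination of visible output names.
  // If some key is not visible at all, the result is empty: nothing can be reported.
  def project(
      child: HashPartitioning,
      passThrough: Set[String],
      aliases: Seq[Alias]): Seq[HashPartitioning] = {
    val combinations = child.keys.foldLeft(Seq(Seq.empty[String])) { (prefixes, key) =>
      for {
        prefix <- prefixes
        name <- outputNames(key, passThrough, aliases)
      } yield prefix :+ name
    }
    combinations.map(keys => HashPartitioning(keys))
  }

  // relation[a, b] partitioned by hash(a, b), under Project(a as c, a as d, b, relation)
  val result: Seq[HashPartitioning] =
    project(HashPartitioning(Seq("a", "b")), Set("b"), Seq(Alias("a", "c"), Alias("a", "d")))
}
```

In this toy model `result` holds the two partitionings `hash(c, b)` and `hash(d, b)`, and `a` never appears because the projection does not output it.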
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/22957#discussion_r238642801 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala --- @@ -145,9 +145,14 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] { assert(requiredChildDistributions.length == children.length) assert(requiredChildOrderings.length == children.length) +val aliasMap = AttributeMap[Expression](children.flatMap(_.expressions.collect { + case a: Alias => (a.toAttribute, a) +})) + // Ensure that the operator's children satisfy their output distribution requirements. children = children.zip(requiredChildDistributions).map { - case (child, distribution) if child.outputPartitioning.satisfies(distribution) => + case (child, distribution) if child.outputPartitioning.satisfies( + distribution.mapExpressions(replaceAlias(_, aliasMap))) => --- End diff -- I don't think that is right: that would cause the shuffle to happen for every plan that is hashed by both `[hash part c, hash part b]` and `[hash part d, hash part b]` (and also `[hash part a, hash part b]`). I think that if we want to go that way, we'd need a set of equivalent `outputPartitioning`s. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22957#discussion_r238634730 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala --- @@ -145,9 +145,14 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] { assert(requiredChildDistributions.length == children.length) assert(requiredChildOrderings.length == children.length) +val aliasMap = AttributeMap[Expression](children.flatMap(_.expressions.collect { + case a: Alias => (a.toAttribute, a) +})) + // Ensure that the operator's children satisfy their output distribution requirements. children = children.zip(requiredChildDistributions).map { - case (child, distribution) if child.outputPartitioning.satisfies(distribution) => + case (child, distribution) if child.outputPartitioning.satisfies( + distribution.mapExpressions(replaceAlias(_, aliasMap))) => --- End diff -- For `Project(a as c, a as d, b, relation)`, I think the `outputPartitioning` should be `[hash part c, hash part d, hash part b]`. The point is, we should not report an output partitioning whose attribute is not even in the current plan's output. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/22957#discussion_r238630487 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala --- @@ -145,9 +145,14 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] { assert(requiredChildDistributions.length == children.length) assert(requiredChildOrderings.length == children.length) +val aliasMap = AttributeMap[Expression](children.flatMap(_.expressions.collect { + case a: Alias => (a.toAttribute, a) +})) + // Ensure that the operator's children satisfy their output distribution requirements. children = children.zip(requiredChildDistributions).map { - case (child, distribution) if child.outputPartitioning.satisfies(distribution) => + case (child, distribution) if child.outputPartitioning.satisfies( + distribution.mapExpressions(replaceAlias(_, aliasMap))) => --- End diff -- ah I see now what you mean. I am not sure what you are suggesting is feasible. Imagine that in your example the Project is: `Project(a as c, a as d, b, relation)`. What should the output partitioning be? > What do you mean by ... I meant that when we collect the alias for `a as c`, we are mapping all the attr references of `c` with `a as c` here. In the `outputPartitioning`, there will never be an occurrence of a reference to `c`, but only references to `a`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
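The direction taken by the diff above (rewriting the required distribution in terms of the child's own attributes before calling `satisfies`) can be sketched on a toy model. Everything here is an assumption made for illustration: attributes are plain strings, `aliasMap` maps an output name directly to the attribute it renames, and `satisfies` is a simplified subset check rather than Spark's implementation.

```scala
object AliasMapDemo {
  // Hypothetical alias map for Project(a as c, b): output attribute -> attribute it renames.
  val aliasMap: Map[String, String] = Map("c" -> "a")

  // Rewrite the required clustering keys in terms of the child's own attributes.
  def replaceAliases(required: Seq[String]): Seq[String] =
    required.map(key => aliasMap.getOrElse(key, key))

  // Simplified stand-in for the real check: the partitioning satisfies the requirement
  // when every partitioning key is among the required clustering keys.
  def satisfies(partitioningKeys: Seq[String], required: Seq[String]): Boolean =
    partitioningKeys.nonEmpty && partitioningKeys.forall(k => required.contains(k))

  // The child's partitioning only ever refers to `a`, never to the alias `c`.
  val childPartitioning: Seq[String] = Seq("a", "b")
  // The parent operator expresses its requirement with the aliased name.
  val required: Seq[String] = Seq("c", "b")

  val withoutRewrite: Boolean = satisfies(childPartitioning, required)
  val withRewrite: Boolean = satisfies(childPartitioning, replaceAliases(required))
}
```

Without the rewrite the check fails, which in the real planner would mean an extra shuffle; with the rewrite the requirement on `c` is recognised as a requirement on `a`.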
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22957#discussion_r238626367 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala --- @@ -145,9 +145,14 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] { assert(requiredChildDistributions.length == children.length) assert(requiredChildOrderings.length == children.length) +val aliasMap = AttributeMap[Expression](children.flatMap(_.expressions.collect { + case a: Alias => (a.toAttribute, a) +})) + // Ensure that the operator's children satisfy their output distribution requirements. children = children.zip(requiredChildDistributions).map { - case (child, distribution) if child.outputPartitioning.satisfies(distribution) => + case (child, distribution) if child.outputPartitioning.satisfies( + distribution.mapExpressions(replaceAlias(_, aliasMap))) => --- End diff -- for example, `relation[a, b]`'s output partitioning is `[hash partition a, hash partition b]`, and `Project(a as c, b, relation)`'s output partitioning should be `[hash partition c, hash partition b]`. What do you mean by `But ProjectExec.outputPartitioning cannot contain a reference to the aliases`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/22957#discussion_r238583330 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala --- @@ -145,9 +145,14 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] { assert(requiredChildDistributions.length == children.length) assert(requiredChildOrderings.length == children.length) +val aliasMap = AttributeMap[Expression](children.flatMap(_.expressions.collect { + case a: Alias => (a.toAttribute, a) +})) + // Ensure that the operator's children satisfy their output distribution requirements. children = children.zip(requiredChildDistributions).map { - case (child, distribution) if child.outputPartitioning.satisfies(distribution) => + case (child, distribution) if child.outputPartitioning.satisfies( + distribution.mapExpressions(replaceAlias(_, aliasMap))) => --- End diff -- But `ProjectExec.outputPartitioning` cannot contain a reference to the aliases in its project list, as its output partitioning is the one of the child, where that alias doesn't exist. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22957#discussion_r238524763 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala --- @@ -145,9 +145,14 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] { assert(requiredChildDistributions.length == children.length) assert(requiredChildOrderings.length == children.length) +val aliasMap = AttributeMap[Expression](children.flatMap(_.expressions.collect { + case a: Alias => (a.toAttribute, a) +})) + // Ensure that the operator's children satisfy their output distribution requirements. children = children.zip(requiredChildDistributions).map { - case (child, distribution) if child.outputPartitioning.satisfies(distribution) => + case (child, distribution) if child.outputPartitioning.satisfies( + distribution.mapExpressions(replaceAlias(_, aliasMap))) => --- End diff -- As an example, `ProjectExec.outputPartitioning` can be wrong, as it doesn't consider the aliases in the project list. I think it's clearer to adjust the `outputPartitioning` there, instead of dealing with it in a rule. What if more rules need to check `outputPartitioning` and `requiredChildDistribution`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/22957#discussion_r238460901 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala --- @@ -145,9 +145,14 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] { assert(requiredChildDistributions.length == children.length) assert(requiredChildOrderings.length == children.length) +val aliasMap = AttributeMap[Expression](children.flatMap(_.expressions.collect { + case a: Alias => (a.toAttribute, a) +})) + // Ensure that the operator's children satisfy their output distribution requirements. children = children.zip(requiredChildDistributions).map { - case (child, distribution) if child.outputPartitioning.satisfies(distribution) => + case (child, distribution) if child.outputPartitioning.satisfies( + distribution.mapExpressions(replaceAlias(_, aliasMap))) => --- End diff -- This is not dealing with the aliases in the `outputPartitioning` but with the ones in the `requiredChildDistribution`. Anyway, I wouldn't do it there, because this would also mean moving the logic for collecting the aliases from the children there, which seems to me an operation that belongs to a rule/transforming operator rather than to the plan operator itself (e.g. these methods are now in `PredicateHelper`...). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/22957#discussion_r238459238 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala --- @@ -145,9 +145,14 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] { assert(requiredChildDistributions.length == children.length) assert(requiredChildOrderings.length == children.length) +val aliasMap = AttributeMap[Expression](children.flatMap(_.expressions.collect { --- End diff -- I think it is. We are only checking the presence of aliases. In particular, we are collecting all the aliases which are defined in the previous operator. The solution you are suggesting works too IMHO and restricts the scope, but I am not sure it is a good thing, because I see no harm in doing it for other operators: simply they won't contain aliases; while I do see some issues in the maintenance of the "whitelist" of operators you are suggesting (we may miss some now or forget to update later...) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22957#discussion_r238321460 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala --- @@ -145,9 +145,14 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] { assert(requiredChildDistributions.length == children.length) assert(requiredChildOrderings.length == children.length) +val aliasMap = AttributeMap[Expression](children.flatMap(_.expressions.collect { + case a: Alias => (a.toAttribute, a) +})) + // Ensure that the operator's children satisfy their output distribution requirements. children = children.zip(requiredChildDistributions).map { - case (child, distribution) if child.outputPartitioning.satisfies(distribution) => + case (child, distribution) if child.outputPartitioning.satisfies( + distribution.mapExpressions(replaceAlias(_, aliasMap))) => --- End diff -- instead of doing it here, shall we deal with alias in `SparkPlan.outputPartitioning` for operators with a project list? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22957#discussion_r238318256 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala --- @@ -145,9 +145,14 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] { assert(requiredChildDistributions.length == children.length) assert(requiredChildOrderings.length == children.length) +val aliasMap = AttributeMap[Expression](children.flatMap(_.expressions.collect { --- End diff -- is it safe to do so? I think we should only collect aliases from operators with a project list, i.e. `Project`, `Aggregate`, `Window`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/22957#discussion_r238292468 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala --- @@ -780,6 +780,23 @@ class PlannerSuite extends SharedSQLContext { classOf[PartitioningCollection]) } } + + test("SPARK-25951: avoid redundant shuffle on rename") { --- End diff -- thanks @gatorsmile, I added also a negative case, but I don't think it is enough. Do you have some hints on cases to test? Thanks. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/22957#discussion_r238291824 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala --- @@ -223,14 +223,35 @@ abstract class Expression extends TreeNode[Expression] { } /** - * Returns true when two expressions will always compute the same result, even if they differ + * Returns true when two expressions will always compute the same output, even if they differ * cosmetically (i.e. capitalization of names in attributes may be different). * * See [[Canonicalize]] for more details. + * + * This method should be used (instead of `sameResult`) when comparing if 2 expressions are the + * same and one can replace the other (eg. in Optimizer/Analyzer rules where we want to replace + * equivalent expressions). It should not be used (and `sameResult` should be used instead) when + * comparing if 2 expressions produce the same results (in this case `semanticEquals` can be too + * strict). */ def semanticEquals(other: Expression): Boolean = deterministic && other.deterministic && canonicalized == other.canonicalized + /** + * Returns true when two expressions will always compute the same result, even if the output from + * plan perspective may be different, because of different names or similar differences. + * Usually this means that their canonicalized form equals, but it may also not be the case, as + * different output expressions can evaluate to the same result as well (eg. when an expression + * is aliased). + * + * This method should be used (instead of `semanticEquals`) when checking if 2 expressions + * produce the same results (eg. as in the case we are interested to check if the ordering is the + * same). It should not be used (and `semanticEquals` should be used instead) when comparing if 2 + * expressions are the same and one can replace the other. 
+ */ + final def sameResult(other: Expression): Boolean = --- End diff -- thanks, added --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/22957#discussion_r238028577 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala --- @@ -223,14 +223,35 @@ abstract class Expression extends TreeNode[Expression] { } /** - * Returns true when two expressions will always compute the same result, even if they differ + * Returns true when two expressions will always compute the same output, even if they differ * cosmetically (i.e. capitalization of names in attributes may be different). * * See [[Canonicalize]] for more details. + * + * This method should be used (instead of `sameResult`) when comparing if 2 expressions are the + * same and one can replace the other (eg. in Optimizer/Analyzer rules where we want to replace + * equivalent expressions). It should not be used (and `sameResult` should be used instead) when + * comparing if 2 expressions produce the same results (in this case `semanticEquals` can be too + * strict). */ def semanticEquals(other: Expression): Boolean = deterministic && other.deterministic && canonicalized == other.canonicalized + /** + * Returns true when two expressions will always compute the same result, even if the output from + * plan perspective may be different, because of different names or similar differences. + * Usually this means that their canonicalized form equals, but it may also not be the case, as + * different output expressions can evaluate to the same result as well (eg. when an expression + * is aliased). + * + * This method should be used (instead of `semanticEquals`) when checking if 2 expressions + * produce the same results (eg. as in the case we are interested to check if the ordering is the + * same). It should not be used (and `semanticEquals` should be used instead) when comparing if 2 + * expressions are the same and one can replace the other. 
+ */ + final def sameResult(other: Expression): Boolean = --- End diff -- Add unit test cases to SubexpressionEliminationSuite? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/22957#discussion_r238027941 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala --- @@ -780,6 +780,23 @@ class PlannerSuite extends SharedSQLContext { classOf[PartitioningCollection]) } } + + test("SPARK-25951: avoid redundant shuffle on rename") { --- End diff -- I think the test case coverage is not enough for such massive changes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/22957#discussion_r237905754 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala --- @@ -780,6 +780,23 @@ class PlannerSuite extends SharedSQLContext { classOf[PartitioningCollection]) } } + + test("SPARK-25951: avoid redundant shuffle on rename") { --- End diff -- @cloud-fan @viirya I added the test, but as I mentioned I had to make another change to make it work. Sorry for the mistake. I'd really appreciate it if you could review it again. Thanks. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/22957#discussion_r237887275 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala --- @@ -780,6 +780,23 @@ class PlannerSuite extends SharedSQLContext { classOf[PartitioningCollection]) } } + + test("SPARK-25951: avoid redundant shuffle on rename") { --- End diff -- Ah, good point, and indeed very useful. In my previous tests I always used a very simple query to verify this, never the one reported in the JIRA. Now I tried that one and realized that this fix is not very useful as of now, because with a rename like that the `HashPartitioning` contains the `AttributeReference` to the `Alias` rather than the `Alias` itself. Since that is the common case, the PR as it stands is not very useful. If I can't figure out a good way to handle that, I am going to close this. Thanks and sorry for the trouble. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/22957#discussion_r237818279 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala --- @@ -195,14 +195,35 @@ abstract class Expression extends TreeNode[Expression] { } /** - * Returns true when two expressions will always compute the same result, even if they differ + * Returns true when two expressions will always compute the same output, even if they differ * cosmetically (i.e. capitalization of names in attributes may be different). * * See [[Canonicalize]] for more details. + * + * This method should be used (instead of `sameResult`) when comparing if 2 expressions are the + * same and one can replace the other (eg. in Optimizer/Analyzer rules where we want to replace + * equivalent expressions). It should not be used (and `sameResult` should be used instead) when + * comparing if 2 expressions produce the same results (in this case `semanticEquals` can be too + * strict). */ def semanticEquals(other: Expression): Boolean = deterministic && other.deterministic && canonicalized == other.canonicalized + /** + * Returns true when two expressions will always compute the same result, even if the output may + * be different, because of different names or similar differences. --- End diff -- How about replacing `output` with `output from plan perspective`? ``` Returns true when two expressions will always compute the same result, even if the output from plan perspective may be different, because of different names or similar differences. ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/22957#discussion_r237787920 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala --- @@ -195,14 +195,35 @@ abstract class Expression extends TreeNode[Expression] { } /** - * Returns true when two expressions will always compute the same result, even if they differ + * Returns true when two expressions will always compute the same output, even if they differ * cosmetically (i.e. capitalization of names in attributes may be different). * * See [[Canonicalize]] for more details. + * + * This method should be used (instead of `sameResult`) when comparing if 2 expressions are the + * same and one can replace the other (eg. in Optimizer/Analyzer rules where we want to replace + * equivalent expressions). It should not be used (and `sameResult` should be used instead) when + * comparing if 2 expressions produce the same results (in this case `semanticEquals` can be too + * strict). */ def semanticEquals(other: Expression): Boolean = deterministic && other.deterministic && canonicalized == other.canonicalized + /** + * Returns true when two expressions will always compute the same result, even if the output may + * be different, because of different names or similar differences. --- End diff -- Yes, I mean: `sameResult` returns true if 2 expressions return the same data even though they are not the same from the plan perspective (e.g. the output names/exprIds are different, as in this case), while `semanticEquals` ensures they are the same from the plan perspective too. If you have better suggestions on how to rephrase this, I am happy to improve it. Thanks. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
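The distinction drawn in this comment can be made concrete with a toy model. This is a sketch under loud assumptions: the `Expr` types, the `canonicalized` function and, in particular, the `sameResult` definition below (strip aliases, then compare canonical forms) are hypothetical and are not taken from the PR, whose `sameResult` body is not shown in this thread. The determinism check from the real `semanticEquals` is also left out.

```scala
object EqualityDemo {
  // Toy expression model (hypothetical, not Catalyst's classes).
  sealed trait Expr
  final case class Attr(name: String, exprId: Long) extends Expr
  final case class Alias(child: Expr, name: String, exprId: Long) extends Expr

  // Toy canonical form: drop the cosmetic names, keep the ids.
  def canonicalized(e: Expr): Expr = e match {
    case Attr(_, id)     => Attr("none", id)
    case Alias(c, _, id) => Alias(canonicalized(c), "none", id)
  }

  // Same from the plan perspective: one expression could replace the other.
  def semanticEquals(a: Expr, b: Expr): Boolean =
    canonicalized(a) == canonicalized(b)

  // Remove every alias wrapped around the top of an expression.
  def stripAliases(e: Expr): Expr = e match {
    case Alias(c, _, _) => stripAliases(c)
    case other          => other
  }

  // Assumed definition: same data once aliases are ignored.
  def sameResult(a: Expr, b: Expr): Boolean =
    semanticEquals(stripAliases(a), stripAliases(b))

  val a: Expr = Attr("a", 1L)
  val aUpper: Expr = Attr("A", 1L)   // differs only in capitalization
  val aAsC: Expr = Alias(a, "c", 2L) // same data, new name and exprId
}
```

In this model `a` and `aUpper` are semantically equal, while `a` and `aAsC` are not semantically equal but do produce the same result.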
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/22957#discussion_r237749287 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala --- @@ -780,6 +780,23 @@ class PlannerSuite extends SharedSQLContext { classOf[PartitioningCollection]) } } + + test("SPARK-25951: avoid redundant shuffle on rename") { --- End diff -- +1 if possible. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/22957#discussion_r237747550 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala --- @@ -195,14 +195,35 @@ abstract class Expression extends TreeNode[Expression] { } /** - * Returns true when two expressions will always compute the same result, even if they differ + * Returns true when two expressions will always compute the same output, even if they differ * cosmetically (i.e. capitalization of names in attributes may be different). * * See [[Canonicalize]] for more details. + * + * This method should be used (instead of `sameResult`) when comparing if 2 expressions are the + * same and one can replace the other (eg. in Optimizer/Analyzer rules where we want to replace + * equivalent expressions). It should not be used (and `sameResult` should be used instead) when + * comparing if 2 expressions produce the same results (in this case `semanticEquals` can be too + * strict). */ def semanticEquals(other: Expression): Boolean = deterministic && other.deterministic && canonicalized == other.canonicalized + /** + * Returns true when two expressions will always compute the same result, even if the output may + * be different, because of different names or similar differences. --- End diff -- I think here `output` is a bit confusing. Do we mean the output names? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/22957#discussion_r237747770 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala --- @@ -195,14 +195,35 @@ abstract class Expression extends TreeNode[Expression] { } /** - * Returns true when two expressions will always compute the same result, even if they differ + * Returns true when two expressions will always compute the same output, even if they differ * cosmetically (i.e. capitalization of names in attributes may be different). * * See [[Canonicalize]] for more details. + * + * This method should be used (instead of `sameResult`) when comparing if 2 expressions are the + * same and one can replace the other (eg. in Optimizer/Analyzer rules where we want to replace + * equivalent expressions). It should not be used (and `sameResult` should be used instead) when + * comparing if 2 expressions produce the same results (in this case `semanticEquals` can be too + * strict). */ def semanticEquals(other: Expression): Boolean = deterministic && other.deterministic && canonicalized == other.canonicalized + /** + * Returns true when two expressions will always compute the same result, even if the output may + * be different, because of different names or similar differences. --- End diff -- So `sameResult` returns true if the evaluated results of two expressions are exactly the same? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22957: [SPARK-25951][SQL] Ignore aliases for distributio...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22957#discussion_r237745005 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala --- @@ -780,6 +780,23 @@ class PlannerSuite extends SharedSQLContext { classOf[PartitioningCollection]) } } + + test("SPARK-25951: avoid redundant shuffle on rename") { --- End diff -- can we have an end-to-end test as well?
[GitHub] spark pull request #22957: [SPARK-25951][SQL] Ignore aliases for distributio...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/22957#discussion_r237539036 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -2542,10 +2542,10 @@ object EliminateUnions extends Rule[LogicalPlan] { * rule can't work for those parameters. */ object CleanupAliases extends Rule[LogicalPlan] { - private def trimAliases(e: Expression): Expression = { + private[catalyst] def trimAliases(e: Expression): Expression = { e.transformDown { - case Alias(child, _) => child - case MultiAlias(child, _) => child + case Alias(child, _) => trimAliases(child) --- End diff -- just using `transformUp` solves the issue
[GitHub] spark pull request #22957: [SPARK-25951][SQL] Ignore aliases for distributio...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/22957#discussion_r237536540 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -2542,10 +2542,10 @@ object EliminateUnions extends Rule[LogicalPlan] { * rule can't work for those parameters. */ object CleanupAliases extends Rule[LogicalPlan] { - private def trimAliases(e: Expression): Expression = { + private[catalyst] def trimAliases(e: Expression): Expression = { e.transformDown { - case Alias(child, _) => child - case MultiAlias(child, _) => child + case Alias(child, _) => trimAliases(child) --- End diff -- ah, I did a stupid thing here. The problem is that, since the rule returns `child` in place of `this`, `transformDown` then applies the rule to the children of `child` instead of to `child` itself. So the problem shows up with 2 consecutive `Alias`es. Let me find a better fix.
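To make the `transformDown` point above concrete, here is a minimal sketch using a toy expression tree. The names `Expr`, `Attr`, `stripAlias`, `trimDown` and `trimUp` are hypothetical stand-ins, and the two traversals are simplified models of Catalyst's `TreeNode.transformDown` and `TreeNode.transformUp`, not the real implementations.

```scala
object AliasTrimSketch {
  // Hypothetical, simplified stand-in for a Catalyst expression tree.
  sealed trait Expr {
    def children: Seq[Expr]
    def withChildren(newChildren: Seq[Expr]): Expr

    // Pre-order: apply the rule to this node, then recurse into the
    // children of the node the rule returned (not into that node itself).
    def transformDown(rule: PartialFunction[Expr, Expr]): Expr = {
      val afterRule = rule.applyOrElse(this, identity[Expr])
      afterRule.withChildren(afterRule.children.map(_.transformDown(rule)))
    }

    // Post-order: rewrite the children first, then apply the rule to the
    // rebuilt node.
    def transformUp(rule: PartialFunction[Expr, Expr]): Expr = {
      val afterChildren = withChildren(children.map(_.transformUp(rule)))
      rule.applyOrElse(afterChildren, identity[Expr])
    }
  }

  case class Attr(name: String) extends Expr {
    def children: Seq[Expr] = Nil
    def withChildren(newChildren: Seq[Expr]): Expr = this
  }

  case class Alias(child: Expr, name: String) extends Expr {
    def children: Seq[Expr] = Seq(child)
    def withChildren(newChildren: Seq[Expr]): Expr = copy(child = newChildren.head)
  }

  // Same shape as the original rule in the diff: replace an Alias by its child.
  val stripAlias: PartialFunction[Expr, Expr] = { case Alias(child, _) => child }

  // Two consecutive aliases: (a AS x) AS y
  val nested: Expr = Alias(Alias(Attr("a"), "x"), "y")

  def trimDown(e: Expr): Expr = e.transformDown(stripAlias)
  def trimUp(e: Expr): Expr = e.transformUp(stripAlias)
}
```

In this model, `trimDown(nested)` removes only the outer alias and leaves `Alias(Attr("a"), "x")`, because after the rule fires the traversal continues below the returned node. `trimUp(nested)` yields `Attr("a")`, since the inner alias is already gone by the time the rule reaches the outer one.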
[GitHub] spark pull request #22957: [SPARK-25951][SQL] Ignore aliases for distributio...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22957#discussion_r237532156 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -2542,10 +2542,10 @@ object EliminateUnions extends Rule[LogicalPlan] { * rule can't work for those parameters. */ object CleanupAliases extends Rule[LogicalPlan] { - private def trimAliases(e: Expression): Expression = { + private[catalyst] def trimAliases(e: Expression): Expression = { e.transformDown { - case Alias(child, _) => child - case MultiAlias(child, _) => child + case Alias(child, _) => trimAliases(child) --- End diff -- it's `transformDown`, why doesn't it work?
[GitHub] spark pull request #22957: [SPARK-25951][SQL] Ignore aliases for distributio...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/22957#discussion_r237526646 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -2542,10 +2542,10 @@ object EliminateUnions extends Rule[LogicalPlan] { * rule can't work for those parameters. */ object CleanupAliases extends Rule[LogicalPlan] { - private def trimAliases(e: Expression): Expression = { + private[catalyst] def trimAliases(e: Expression): Expression = { e.transformDown { - case Alias(child, _) => child - case MultiAlias(child, _) => child + case Alias(child, _) => trimAliases(child) --- End diff -- the point is that this method currently removes only the first `Alias` it finds (it doesn't go on recursively), which is the reason for the UT failure. Also, judging by the comment on the method, this does not seem to be its expected behavior.
[GitHub] spark pull request #22957: [SPARK-25951][SQL] Ignore aliases for distributio...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22957#discussion_r237519971 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -2542,10 +2542,10 @@ object EliminateUnions extends Rule[LogicalPlan] { * rule can't work for those parameters. */ object CleanupAliases extends Rule[LogicalPlan] { - private def trimAliases(e: Expression): Expression = { + private[catalyst] def trimAliases(e: Expression): Expression = { e.transformDown { - case Alias(child, _) => child - case MultiAlias(child, _) => child + case Alias(child, _) => trimAliases(child) --- End diff -- what's going on here?
[GitHub] spark pull request #22957: [SPARK-25951][SQL] Ignore aliases for distributio...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22957#discussion_r237076213 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala --- @@ -195,14 +195,26 @@ abstract class Expression extends TreeNode[Expression] { } /** - * Returns true when two expressions will always compute the same result, even if they differ + * Returns true when two expressions will always compute the same output, even if they differ * cosmetically (i.e. capitalization of names in attributes may be different). * * See [[Canonicalize]] for more details. */ def semanticEquals(other: Expression): Boolean = deterministic && other.deterministic && canonicalized == other.canonicalized + /** + * Returns true when two expressions will always compute the same result, even if the output may + * be different, because of different names or similar differences. + * Usually this means they their canonicalized form equals, but it may also not be the case, as + * different output expressions can evaluate to the same result as well (eg. when an expression + * is aliased). + */ + def sameResult(other: Expression): Boolean = other match { +case a: Alias => sameResult(a.child) +case _ => this.semanticEquals(other) --- End diff -- we can do ``` CleanupAliases.trimAliases(this) semanticEquals CleanupAliases.trimAliases(other) ```
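The suggestion above (trim aliases on both sides, then compare with `semanticEquals`) can be sketched on a toy model. Everything here is an assumption made for illustration: `Expr`, `Attr`, `Add`, and the toy `canonicalized`, which only ignores capitalization of attribute names. The determinism check from the real `semanticEquals` is left out.

```scala
object SameResultSketch {
  // Hypothetical, simplified stand-ins for Catalyst expressions.
  sealed trait Expr
  case class Attr(name: String) extends Expr
  case class Alias(child: Expr, name: String) extends Expr
  case class Add(left: Expr, right: Expr) extends Expr

  // Toy canonicalization (assumption): only capitalization of attribute
  // names is treated as a cosmetic difference. Alias nodes are kept.
  def canonicalized(e: Expr): Expr = e match {
    case Attr(n)     => Attr(n.toLowerCase)
    case Alias(c, n) => Alias(canonicalized(c), n)
    case Add(l, r)   => Add(canonicalized(l), canonicalized(r))
  }

  // Remove every Alias node, including nested ones.
  def trimAliases(e: Expr): Expr = e match {
    case Alias(c, _) => trimAliases(c)
    case Add(l, r)   => Add(trimAliases(l), trimAliases(r))
    case a: Attr     => a
  }

  // Strict comparison: the two expressions are interchangeable.
  def semanticEquals(a: Expr, b: Expr): Boolean =
    canonicalized(a) == canonicalized(b)

  // Looser comparison: strip aliases on both sides first, as suggested.
  def sameResult(a: Expr, b: Expr): Boolean =
    semanticEquals(trimAliases(a), trimAliases(b))
}
```

Under these assumptions, `a AS b` and `a` are not `semanticEquals` but are `sameResult`. Because both sides are trimmed symmetrically, the check does not depend on which operand carries the alias.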
[GitHub] spark pull request #22957: [SPARK-25951][SQL] Ignore aliases for distributio...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/22957#discussion_r237075431 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala --- @@ -195,14 +195,26 @@ abstract class Expression extends TreeNode[Expression] { } /** - * Returns true when two expressions will always compute the same result, even if they differ + * Returns true when two expressions will always compute the same output, even if they differ * cosmetically (i.e. capitalization of names in attributes may be different). * * See [[Canonicalize]] for more details. */ def semanticEquals(other: Expression): Boolean = deterministic && other.deterministic && canonicalized == other.canonicalized + /** + * Returns true when two expressions will always compute the same result, even if the output may + * be different, because of different names or similar differences. + * Usually this means they their canonicalized form equals, but it may also not be the case, as + * different output expressions can evaluate to the same result as well (eg. when an expression + * is aliased). + */ + def sameResult(other: Expression): Boolean = other match { +case a: Alias => sameResult(a.child) +case _ => this.semanticEquals(other) --- End diff -- well, it needs to be overridden by `HashPartitioning` too, so since I am not able to make it final anyway, I don't think it is a good idea. I could add a match on `HashPartitioning` too, but it doesn't seem a clean solution to me.
[GitHub] spark pull request #22957: [SPARK-25951][SQL] Ignore aliases for distributio...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/22957#discussion_r237073129 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala --- @@ -195,14 +195,26 @@ abstract class Expression extends TreeNode[Expression] { } /** - * Returns true when two expressions will always compute the same result, even if they differ + * Returns true when two expressions will always compute the same output, even if they differ * cosmetically (i.e. capitalization of names in attributes may be different). * * See [[Canonicalize]] for more details. */ def semanticEquals(other: Expression): Boolean = deterministic && other.deterministic && canonicalized == other.canonicalized + /** + * Returns true when two expressions will always compute the same result, even if the output may + * be different, because of different names or similar differences. + * Usually this means they their canonicalized form equals, but it may also not be the case, as + * different output expressions can evaluate to the same result as well (eg. when an expression + * is aliased). + */ + def sameResult(other: Expression): Boolean = other match { +case a: Alias => sameResult(a.child) +case _ => this.semanticEquals(other) --- End diff -- I think it is doable, but I didn't want to put too many `match` where it was not needed. But if you prefer that way I can try and do that.
[GitHub] spark pull request #22957: [SPARK-25951][SQL] Ignore aliases for distributio...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22957#discussion_r237070739 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala --- @@ -195,14 +195,26 @@ abstract class Expression extends TreeNode[Expression] { } /** - * Returns true when two expressions will always compute the same result, even if they differ + * Returns true when two expressions will always compute the same output, even if they differ * cosmetically (i.e. capitalization of names in attributes may be different). * * See [[Canonicalize]] for more details. */ def semanticEquals(other: Expression): Boolean = deterministic && other.deterministic && canonicalized == other.canonicalized + /** + * Returns true when two expressions will always compute the same result, even if the output may + * be different, because of different names or similar differences. + * Usually this means they their canonicalized form equals, but it may also not be the case, as + * different output expressions can evaluate to the same result as well (eg. when an expression + * is aliased). + */ + def sameResult(other: Expression): Boolean = other match { +case a: Alias => sameResult(a.child) +case _ => this.semanticEquals(other) --- End diff -- can we also strip the alias of `this` here? so that we can mark `sameResult` as final.
[GitHub] spark pull request #22957: [SPARK-25951][SQL] Ignore aliases for distributio...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/22957#discussion_r237070496 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala --- @@ -195,14 +195,26 @@ abstract class Expression extends TreeNode[Expression] { } /** - * Returns true when two expressions will always compute the same result, even if they differ + * Returns true when two expressions will always compute the same output, even if they differ * cosmetically (i.e. capitalization of names in attributes may be different). * * See [[Canonicalize]] for more details. */ def semanticEquals(other: Expression): Boolean = deterministic && other.deterministic && canonicalized == other.canonicalized + /** + * Returns true when two expressions will always compute the same result, even if the output may + * be different, because of different names or similar differences. + * Usually this means they their canonicalized form equals, but it may also not be the case, as + * different output expressions can evaluate to the same result as well (eg. when an expression + * is aliased). + */ + def sameResult(other: Expression): Boolean = other match { --- End diff -- Sure, thanks.
[GitHub] spark pull request #22957: [SPARK-25951][SQL] Ignore aliases for distributio...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22957#discussion_r237069486 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala --- @@ -195,14 +195,26 @@ abstract class Expression extends TreeNode[Expression] { } /** - * Returns true when two expressions will always compute the same result, even if they differ + * Returns true when two expressions will always compute the same output, even if they differ * cosmetically (i.e. capitalization of names in attributes may be different). * * See [[Canonicalize]] for more details. */ def semanticEquals(other: Expression): Boolean = deterministic && other.deterministic && canonicalized == other.canonicalized + /** + * Returns true when two expressions will always compute the same result, even if the output may + * be different, because of different names or similar differences. + * Usually this means they their canonicalized form equals, but it may also not be the case, as + * different output expressions can evaluate to the same result as well (eg. when an expression + * is aliased). + */ + def sameResult(other: Expression): Boolean = other match { --- End diff -- can you put it in the method doc (both `semanticEquals` and `sameResult`)? This makes sense to me.
[GitHub] spark pull request #22957: [SPARK-25951][SQL] Ignore aliases for distributio...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/22957#discussion_r237068718 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala --- @@ -195,14 +195,26 @@ abstract class Expression extends TreeNode[Expression] { } /** - * Returns true when two expressions will always compute the same result, even if they differ + * Returns true when two expressions will always compute the same output, even if they differ * cosmetically (i.e. capitalization of names in attributes may be different). * * See [[Canonicalize]] for more details. */ def semanticEquals(other: Expression): Boolean = deterministic && other.deterministic && canonicalized == other.canonicalized + /** + * Returns true when two expressions will always compute the same result, even if the output may + * be different, because of different names or similar differences. + * Usually this means they their canonicalized form equals, but it may also not be the case, as + * different output expressions can evaluate to the same result as well (eg. when an expression + * is aliased). + */ + def sameResult(other: Expression): Boolean = other match { --- End diff -- removing `Alias` is not possible for the reason explained in https://github.com/apache/spark/pull/22957#issuecomment-436992955. In general, `semanticEquals` should be used when we want to replace an expression with another, while `sameResult` should be used to check that 2 expressions return the same output.
[GitHub] spark pull request #22957: [SPARK-25951][SQL] Ignore aliases for distributio...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22957#discussion_r237065506 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala --- @@ -195,14 +195,26 @@ abstract class Expression extends TreeNode[Expression] { } /** - * Returns true when two expressions will always compute the same result, even if they differ + * Returns true when two expressions will always compute the same output, even if they differ * cosmetically (i.e. capitalization of names in attributes may be different). * * See [[Canonicalize]] for more details. */ def semanticEquals(other: Expression): Boolean = deterministic && other.deterministic && canonicalized == other.canonicalized + /** + * Returns true when two expressions will always compute the same result, even if the output may + * be different, because of different names or similar differences. + * Usually this means they their canonicalized form equals, but it may also not be the case, as + * different output expressions can evaluate to the same result as well (eg. when an expression + * is aliased). + */ + def sameResult(other: Expression): Boolean = other match { --- End diff -- "erase the name" can also mean remove `Alias`. If we can't clearly tell the difference between `semanticEquals` and `sameResult`, and give a guideline about which one to use in which case, I think we should just update `semanticEquals` (i.e. `Canonicalize`).
[GitHub] spark pull request #22957: [SPARK-25951][SQL] Ignore aliases for distributio...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/22957#discussion_r237062190 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala --- @@ -195,14 +195,26 @@ abstract class Expression extends TreeNode[Expression] { } /** - * Returns true when two expressions will always compute the same result, even if they differ + * Returns true when two expressions will always compute the same output, even if they differ * cosmetically (i.e. capitalization of names in attributes may be different). * * See [[Canonicalize]] for more details. */ def semanticEquals(other: Expression): Boolean = deterministic && other.deterministic && canonicalized == other.canonicalized + /** + * Returns true when two expressions will always compute the same result, even if the output may + * be different, because of different names or similar differences. + * Usually this means they their canonicalized form equals, but it may also not be the case, as + * different output expressions can evaluate to the same result as well (eg. when an expression + * is aliased). + */ + def sameResult(other: Expression): Boolean = other match { --- End diff -- that is reasonable but it doesn't solve the problem stated in the JIRA. The goal here is to avoid that something like `a as b` is considered different from `a` in terms of ordering/distribution. If we just erase the name of the alias, the 2 expressions would still be different, because the `Alias` node itself is still present in one of them.
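The distinction drawn above, between erasing an alias's name and removing the `Alias` node, can be illustrated with a small sketch. The types and the helpers `eraseAliasNames` and `stripAliases` are hypothetical and exist only for this example; they are not part of Catalyst.

```scala
object EraseNameSketch {
  // Hypothetical, simplified stand-ins for Catalyst expressions.
  sealed trait Expr
  case class Attr(name: String) extends Expr
  case class Alias(child: Expr, name: String) extends Expr

  // "Erase the name": keep the Alias node but blank out its name.
  def eraseAliasNames(e: Expr): Expr = e match {
    case Alias(c, _) => Alias(eraseAliasNames(c), "")
    case other       => other
  }

  // "Remove the Alias": drop the node and keep only what it wraps.
  def stripAliases(e: Expr): Expr = e match {
    case Alias(c, _) => stripAliases(c)
    case other       => other
  }
}
```

With names erased, `a as b` and `a as c` become equal to each other, but neither equals plain `a`, because the wrapper node is still there. Only stripping the node makes `a as b` compare equal to `a`, which is the case the JIRA is about.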
[GitHub] spark pull request #22957: [SPARK-25951][SQL] Ignore aliases for distributio...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22957#discussion_r237049166 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala --- @@ -195,14 +195,26 @@ abstract class Expression extends TreeNode[Expression] { } /** - * Returns true when two expressions will always compute the same result, even if they differ + * Returns true when two expressions will always compute the same output, even if they differ * cosmetically (i.e. capitalization of names in attributes may be different). * * See [[Canonicalize]] for more details. */ def semanticEquals(other: Expression): Boolean = deterministic && other.deterministic && canonicalized == other.canonicalized + /** + * Returns true when two expressions will always compute the same result, even if the output may + * be different, because of different names or similar differences. + * Usually this means they their canonicalized form equals, but it may also not be the case, as + * different output expressions can evaluate to the same result as well (eg. when an expression + * is aliased). + */ + def sameResult(other: Expression): Boolean = other match { --- End diff -- I know it's always safer to introduce a new API, but is it really necessary? In `Canonicalize`, we erase the name for attributes, I think it's reasonable to erase the name of `Alias`, as it doesn't affect the output.