[GitHub] spark issue #23171: [SPARK-26205][SQL] Optimize In for bytes, shorts, ints
Github user aokolnychyi commented on the issue: https://github.com/apache/spark/pull/23171 As @rxin said, if we introduce a separate expression for the switch-based approach, then we will need to modify other places. For example, `DataSourceStrategy$translateFilter`. So, integrating into `In` or `InSet` seems safer. I think we can move the switch-based logic to `InSet` and make `InSet` responsible for picking the most optimal execution path. We might need to modify the condition when we convert `In` to `InSet` as this will most likely depend on the underlying data type. I saw noticeable improvements starting from 5 elements when you compare the if-else approach to the switch-based one. Right now, the conversion happens for more than 10 elements. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23171: [SPARK-26205][SQL] Optimize In for bytes, shorts, ints
Github user aokolnychyi commented on the issue: https://github.com/apache/spark/pull/23171 To sum up, I would set the goal of this PR to be making `In` expressions as efficient as possible for bytes/shorts/ints. Then we can do benchmarks for `In` vs `InSet` in [SPARK-26203](https://issues.apache.org/jira/browse/SPARK-26203) and try to come up with a solution for `InSet` in [SPARK-26204](https://issues.apache.org/jira/browse/SPARK-26204). By the time we solve [SPARK-26204](https://issues.apache.org/jira/browse/SPARK-26204), we will have a clear understanding of the pros and cons of `In` and `InSet` and will be able to determine the right thresholds.
[GitHub] spark issue #23171: [SPARK-26205][SQL] Optimize In for bytes, shorts, ints
Github user aokolnychyi commented on the issue: https://github.com/apache/spark/pull/23171 @dbtsai @mgaido91 I think we can come back to this question once [SPARK-26203](https://issues.apache.org/jira/browse/SPARK-26203) is resolved. That JIRA will give us enough information about each data type.
[GitHub] spark issue #23171: [SPARK-26205][SQL] Optimize In for bytes, shorts, ints
Github user aokolnychyi commented on the issue: https://github.com/apache/spark/pull/23171 @cloud-fan, yeah, let's see if this PR is useful. The original idea wasn't to avoid fixing autoboxing in `InSet`. `In` was tested on 250 numbers to prove O(1) time complexity on compact values and outline problems with `InSet`. After this change, `In` will be faster than `InSet` but this is not the goal. Overall, the intent was to have a tiny and straightforward change that would optimize `In` expressions even if the number of elements is less than "spark.sql.optimizer.inSetConversionThreshold" and Spark does not use `InSet`. Once we solve autoboxing issues in `InSet`, we would need to benchmark against this approach.
[GitHub] spark issue #23171: [SPARK-26205][SQL] Optimize In for bytes, shorts, ints
Github user aokolnychyi commented on the issue: https://github.com/apache/spark/pull/23171 @gatorsmile @cloud-fan @dongjoon-hyun @viirya
[GitHub] spark pull request #23171: [SPARK-26205][SQL] Optimize In for bytes, shorts,...
GitHub user aokolnychyi opened a pull request: https://github.com/apache/spark/pull/23171

[SPARK-26205][SQL] Optimize In for bytes, shorts, ints

## What changes were proposed in this pull request?

This PR optimizes `In` expressions for byte, short, and integer types. It is a follow-up on PR #21442 from @dbtsai.

Currently, `In` expressions are compiled into a sequence of if-else statements, which results in O(n) time complexity. `InSet` is an optimized version of `In`, which is supposed to improve the performance if the number of elements is big enough. However, `InSet` actually degrades the performance in many cases due to various reasons (`InSet` will be handled in [SPARK-26203](https://issues.apache.org/jira/browse/SPARK-26203) and [SPARK-26204](https://issues.apache.org/jira/browse/SPARK-26204)).

The main idea of this PR is to make use of `tableswitch` and `lookupswitch` bytecode instructions to speed up `In` expressions. In short, we can improve our time complexity from O(n) to O(1) or at least O(log n) by using Java `switch` statements. We will have O(1) time complexity if our case values are compact and `tableswitch` can be used. Otherwise, `lookupswitch` will give us O(log n). See [here](https://docs.oracle.com/javase/specs/jvms/se7/html/jvms-3.html#jvms-3.10) and [here](https://stackoverflow.com/questions/10287700/difference-between-jvms-lookupswitch-and-tableswitch) for more information.

An important benefit of the proposed approach is that we do not have to pay an extra cost for autoboxing as in the case of `InSet`. As a consequence, we can substantially outperform `InSet` even on 250+ elements.

This PR does not cover long values as Java `switch` statements cannot be used on them. However, we can have a follow-up PR with an approach similar to binary search.

## How was this patch tested?

### Correctness

There is a new test that verifies the logic of the proposed optimization.
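The switch-based idea described in this PR can be illustrated with a minimal sketch. It is written in Scala rather than the Java source that Spark's code generation emits, and the object and method names are hypothetical. `inIfElse` mirrors an if-else chain, while the two `match` expressions are annotated with `@switch`, which asks scalac to warn when a match cannot be compiled to a `tableswitch` or `lookupswitch`. Which of the two instructions is chosen is up to the compiler, so the comments only state the expected outcome.

```scala
import scala.annotation.switch

// Hypothetical illustration; this is not Spark's generated code.
object InSwitchSketch {
  // If-else chain: candidates are compared one by one, O(n) in the list size.
  def inIfElse(x: Int): Boolean =
    if (x == 1) true
    else if (x == 2) true
    else if (x == 3) true
    else if (x == 4) true
    else if (x == 5) true
    else false

  // Compact case values (1..5): expected to become a tableswitch (O(1) jump).
  def inCompact(x: Int): Boolean = (x: @switch) match {
    case 1 => true
    case 2 => true
    case 3 => true
    case 4 => true
    case 5 => true
    case _ => false
  }

  // Sparse case values: expected to become a lookupswitch (O(log n) search).
  def inSparse(x: Int): Boolean = (x: @switch) match {
    case -1000  => true
    case 7      => true
    case 500    => true
    case 100000 => true
    case _      => false
  }
}
```

Both switch variants operate on a primitive `Int`, so no boxing is involved, which is the benefit the description contrasts with `InSet`.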
### Performance Benchmarks

The performance impact was measured by the benchmarks below (an appropriate part of them will be included in [SPARK-26203](https://issues.apache.org/jira/browse/SPARK-26203)).

```
def compactBytesBenchmark(numItems: Int, numRows: Long, minNumIters: Int): Benchmark = {
  val name = s"$numItems compact bytes"
  val values = (1 to numItems).map(v => s"CAST($v AS tinyint)")
  val df = spark.range(1, numRows).select($"id".cast(ByteType))
  benchmark(name, df, values, numRows, minNumIters)
}

def nonCompactBytesBenchmark(numItems: Int, numRows: Long, minNumIters: Int): Benchmark = {
  val step = (Byte.MaxValue.toInt - Byte.MinValue.toInt) / numItems
  val maxValue = Byte.MinValue + numItems * step
  val rangeSize = maxValue - Byte.MinValue
  require(isLookupSwitch(rangeSize, numItems))
  val name = s"$numItems non-compact bytes"
  val values = (Byte.MinValue until maxValue by step).map(v => s"CAST($v AS tinyint)")
  val df = spark.range(1, numRows).select($"id".cast(ByteType))
  benchmark(name, df, values, numRows, minNumIters)
}

def compactShortsBenchmark(numItems: Int, numRows: Long, minNumIters: Int): Benchmark = {
  val name = s"$numItems compact shorts"
  val values = (1 to numItems).map(v => s"CAST($v AS smallint)")
  val df = spark.range(1, numRows).select($"id".cast(ShortType))
  benchmark(name, df, values, numRows, minNumIters)
}

def nonCompactShortsBenchmark(numItems: Int, numRows: Long, minNumIters: Int): Benchmark = {
  val step = (Short.MaxValue.toInt - Short.MinValue.toInt) / numItems
  val maxValue = Short.MinValue + numItems * step
  val rangeSize = maxValue - Short.MinValue
  require(isLookupSwitch(rangeSize, numItems))
  val name = s"$numItems non-compact shorts"
  val values = (Short.MinValue until maxValue by step).map(v => s"CAST($v AS smallint)")
  val df = spark.range(1, numRows).select($"id".cast(ShortType))
  benchmark(name, df, values, numRows, minNumIters)
}

def compactIntsBenchmark(numItems: Int, numRows: Long, minNumIters: Int): Benchmark = {
  val name = s"$numItems compact ints"
  val values = (1 to numItems).map(v => 1000 + v)
  val df = spark.range(1, numRows).select($"id".cast(IntegerType))
  benchmark(name, df, values, numRows, minNumIters)
}

def nonCompactIntsBenchmark(numItems: Int, numRows: Long, minNumIters: Int): Benchmark = {
  val step = (Int.MaxValue.toLong - Int.MinValue.toLong) / numItems
  val maxValue = Int.MinValue + numItems * step
  val rangeSize = maxValue - Int.MinValue
  require
```
[GitHub] spark pull request #23139: [SPARK-25860][SPARK-26107] [FOLLOW-UP] Rule Repla...
Github user aokolnychyi commented on a diff in the pull request: https://github.com/apache/spark/pull/23139#discussion_r236466962 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceNullWithFalseInPredicate.scala --- @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.expressions.{And, ArrayExists, ArrayFilter, CaseWhen, Expression, If} +import org.apache.spark.sql.catalyst.expressions.{LambdaFunction, Literal, MapFilter, Or} +import org.apache.spark.sql.catalyst.expressions.Literal.FalseLiteral +import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join, LogicalPlan} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.types.BooleanType +import org.apache.spark.util.Utils + + +/** + * A rule that replaces `Literal(null, BooleanType)` with `FalseLiteral`, if possible, in the search + * condition of the WHERE/HAVING/ON(JOIN) clauses, which contain an implicit Boolean operator --- End diff -- I think the scope of this rule is a bit bigger. For example, higher-order functions, all `If` and `CaseWhen` expressions. 
Would it make sense to replace `"in the search condition of the WHERE/HAVING/ON(JOIN) clauses"` with `"in predicates"`?
[GitHub] spark pull request #23139: [SPARK-25860][SPARK-26107] [FOLLOW-UP] Rule Repla...
Github user aokolnychyi commented on a diff in the pull request: https://github.com/apache/spark/pull/23139#discussion_r236467423 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceNullWithFalseInPredicate.scala --- @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.expressions.{And, ArrayExists, ArrayFilter, CaseWhen, Expression, If} +import org.apache.spark.sql.catalyst.expressions.{LambdaFunction, Literal, MapFilter, Or} +import org.apache.spark.sql.catalyst.expressions.Literal.FalseLiteral +import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join, LogicalPlan} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.types.BooleanType +import org.apache.spark.util.Utils + + +/** + * A rule that replaces `Literal(null, BooleanType)` with `FalseLiteral`, if possible, in the search + * condition of the WHERE/HAVING/ON(JOIN) clauses, which contain an implicit Boolean operator + * "(search condition) = TRUE". 
The replacement is only valid when `Literal(null, BooleanType)` is --- End diff -- I am not sure I understand `"which contain an implicit Boolean operator "(search condition) = TRUE""`. Could you please elaborate a bit?
[GitHub] spark issue #23079: [SPARK-26107][SQL] Extend ReplaceNullWithFalseInPredicat...
Github user aokolnychyi commented on the issue: https://github.com/apache/spark/pull/23079 LGTM as well.
[GitHub] spark pull request #23079: [SPARK-26107][SQL] Extend ReplaceNullWithFalseInP...
Github user aokolnychyi commented on a diff in the pull request: https://github.com/apache/spark/pull/23079#discussion_r234467085 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceNullWithFalseInPredicateSuite.scala --- @@ -298,6 +299,45 @@ class ReplaceNullWithFalseSuite extends PlanTest { testProjection(originalExpr = column, expectedExpr = column) } + test("replace nulls in lambda function of ArrayFilter") { +val cond = GreaterThan(UnresolvedAttribute("e"), Literal(0)) --- End diff -- Test cases for `ArrayFilter` and `ArrayExists` seem to be identical. As we have those tests anyway, would it make sense to cover different lambda functions?
[GitHub] spark issue #23079: [SPARK-26107][SQL] Extend ReplaceNullWithFalseInPredicat...
Github user aokolnychyi commented on the issue: https://github.com/apache/spark/pull/23079 @rednaxelafx I am glad the rule gets more adoption. Renaming also makes sense to me. Shall we extend `ReplaceNullWithFalseEndToEndSuite` as well?
[GitHub] spark issue #22966: [SPARK-25965][SQL][TEST] Add avro read benchmark
Github user aokolnychyi commented on the issue: https://github.com/apache/spark/pull/22966 I also think having a performance trend would be useful. I'll be glad to help with this effort.
[GitHub] spark pull request #22857: [SPARK-25860][SQL] Replace Literal(null, _) with ...
Github user aokolnychyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22857#discussion_r229705741 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala --- @@ -2585,4 +2585,45 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { checkAnswer(swappedDf.filter($"key"($"map") > "a"), Row(2, Map(2 -> "b"))) } + + test("SPARK-25860: Replace Literal(null, _) with FalseLiteral whenever possible") { + +def checkPlanIsEmptyLocalScan(df: DataFrame): Unit = df.queryExecution.executedPlan match { + case s: LocalTableScanExec => assert(s.rows.isEmpty) + case p => fail(s"$p is not LocalTableScanExec") +} + +val df1 = Seq((1, true), (2, false)).toDF("l", "b") +val df2 = Seq(2, 3).toDF("l") + +val q1 = df1.where("IF(l > 10, false, b AND null)") +checkAnswer(q1, Seq.empty) +checkPlanIsEmptyLocalScan(q1) + +val q2 = df1.where("CASE WHEN l < 10 THEN null WHEN l > 40 THEN false ELSE null END") +checkAnswer(q2, Seq.empty) +checkPlanIsEmptyLocalScan(q2) + +val q3 = df1.join(df2, when(df1("l") > df2("l"), lit(null)).otherwise(df1("b") && lit(null))) +checkAnswer(q3, Seq.empty) +checkPlanIsEmptyLocalScan(q3) + +val q4 = df1.where("IF(IF(b, null, false), true, null)") +checkAnswer(q4, Seq.empty) +checkPlanIsEmptyLocalScan(q4) + +val q5 = df1.selectExpr("IF(l > 1 AND null, 5, 1) AS out") +checkAnswer(q5, Row(1) :: Row(1) :: Nil) +q5.queryExecution.executedPlan.foreach { p => + assert(p.expressions.forall(e => e.find(_.isInstanceOf[If]).isEmpty)) --- End diff -- You are right, this can pass if `ConvertToLocalRelation` is enabled. When I tested this check, I did not take into account that `SharedSparkSession` disables `ConvertToLocalRelation`. So, the check worked correctly but only because `ConvertToLocalRelation` was disabled in `SharedSparkSession`. Let's switch to tables. Thanks!
[GitHub] spark pull request #22857: [SPARK-25860][SQL] Replace Literal(null, _) with ...
Github user aokolnychyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22857#discussion_r229449496 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala --- @@ -736,3 +736,60 @@ object CombineConcats extends Rule[LogicalPlan] { flattenConcats(concat) } } + +/** + * A rule that replaces `Literal(null, _)` with `FalseLiteral` for further optimizations. + * + * This rule applies to conditions in [[Filter]] and [[Join]]. Moreover, it transforms predicates + * in all [[If]] expressions as well as branch conditions in all [[CaseWhen]] expressions. + * + * For example, `Filter(Literal(null, _))` is equal to `Filter(FalseLiteral)`. + * + * Another example containing branches is `Filter(If(cond, FalseLiteral, Literal(null, _)))`; + * this can be optimized to `Filter(If(cond, FalseLiteral, FalseLiteral))`, and eventually + * `Filter(FalseLiteral)`. + * + * As this rule is not limited to conditions in [[Filter]] and [[Join]], arbitrary plans can + * benefit from it. For example, `Project(If(And(cond, Literal(null)), Literal(1), Literal(2)))` + * can be simplified into `Project(Literal(2))`. + * + * As a result, many unnecessary computations can be removed in the query optimization phase. + */ +object ReplaceNullWithFalse extends Rule[LogicalPlan] { + + def apply(plan: LogicalPlan): LogicalPlan = plan transform { +case f @ Filter(cond, _) => f.copy(condition = replaceNullWithFalse(cond)) +case j @ Join(_, _, _, Some(cond)) => j.copy(condition = Some(replaceNullWithFalse(cond))) +case p: LogicalPlan => p transformExpressions { + case i @ If(pred, _, _) => i.copy(predicate = replaceNullWithFalse(pred)) + case cw @ CaseWhen(branches, _) => +val newBranches = branches.map { case (cond, value) => + replaceNullWithFalse(cond) -> value +} +cw.copy(branches = newBranches) +} + } + + /** + * Recursively replaces `Literal(null, _)` with `FalseLiteral`. 
+ * + * Note that `transformExpressionsDown` can not be used here as we must stop as soon as we hit + * an expression that is not [[CaseWhen]], [[If]], [[And]], [[Or]] or `Literal(null, _)`. + */ + private def replaceNullWithFalse(e: Expression): Expression = e match { +case cw: CaseWhen if cw.dataType == BooleanType => + val newBranches = cw.branches.map { case (cond, value) => +replaceNullWithFalse(cond) -> replaceNullWithFalse(value) + } + val newElseValue = cw.elseValue.map(replaceNullWithFalse) + CaseWhen(newBranches, newElseValue) +case i @ If(pred, trueVal, falseVal) if i.dataType == BooleanType => + If(replaceNullWithFalse(pred), replaceNullWithFalse(trueVal), replaceNullWithFalse(falseVal)) --- End diff -- Let me know if I understood you correctly here.
[GitHub] spark pull request #22857: [SPARK-25860][SQL] Replace Literal(null, _) with ...
Github user aokolnychyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22857#discussion_r229449194 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala --- @@ -2578,4 +2578,45 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { Row ("abc", 1)) } } + + test("SPARK-25860: Replace Literal(null, _) with FalseLiteral whenever possible") { + +def checkPlanIsEmptyLocalScan(df: DataFrame): Unit = df.queryExecution.executedPlan match { --- End diff -- I see, thanks. So, you mean to use `withSQLConf(SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> "") {...}` to ensure that `ConvertToLocalRelation` is not excluded?
[GitHub] spark pull request #22857: [SPARK-25860][SQL] Replace Literal(null, _) with ...
Github user aokolnychyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22857#discussion_r229445682 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala --- @@ -736,3 +736,60 @@ object CombineConcats extends Rule[LogicalPlan] { flattenConcats(concat) } } + +/** + * A rule that replaces `Literal(null, _)` with `FalseLiteral` for further optimizations. + * + * This rule applies to conditions in [[Filter]] and [[Join]]. Moreover, it transforms predicates + * in all [[If]] expressions as well as branch conditions in all [[CaseWhen]] expressions. + * + * For example, `Filter(Literal(null, _))` is equal to `Filter(FalseLiteral)`. + * + * Another example containing branches is `Filter(If(cond, FalseLiteral, Literal(null, _)))`; + * this can be optimized to `Filter(If(cond, FalseLiteral, FalseLiteral))`, and eventually + * `Filter(FalseLiteral)`. + * + * As this rule is not limited to conditions in [[Filter]] and [[Join]], arbitrary plans can + * benefit from it. For example, `Project(If(And(cond, Literal(null)), Literal(1), Literal(2)))` + * can be simplified into `Project(Literal(2))`. + * + * As a result, many unnecessary computations can be removed in the query optimization phase. + */ +object ReplaceNullWithFalse extends Rule[LogicalPlan] { + + def apply(plan: LogicalPlan): LogicalPlan = plan transform { +case f @ Filter(cond, _) => f.copy(condition = replaceNullWithFalse(cond)) +case j @ Join(_, _, _, Some(cond)) => j.copy(condition = Some(replaceNullWithFalse(cond))) +case p: LogicalPlan => p transformExpressions { + case i @ If(pred, _, _) => i.copy(predicate = replaceNullWithFalse(pred)) + case cw @ CaseWhen(branches, _) => +val newBranches = branches.map { case (cond, value) => + replaceNullWithFalse(cond) -> value +} +cw.copy(branches = newBranches) +} + } + + /** + * Recursively replaces `Literal(null, _)` with `FalseLiteral`. 
+ * + * Note that `transformExpressionsDown` can not be used here as we must stop as soon as we hit + * an expression that is not [[CaseWhen]], [[If]], [[And]], [[Or]] or `Literal(null, _)`. + */ + private def replaceNullWithFalse(e: Expression): Expression = e match { +case cw: CaseWhen if cw.dataType == BooleanType => --- End diff -- This case is also covered and tested in `"replace null in conditions of CaseWhen"`, `"replace null in conditions of CaseWhen inside another CaseWhen"`.
[GitHub] spark pull request #22857: [SPARK-25860][SQL] Replace Literal(null, _) with ...
Github user aokolnychyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22857#discussion_r229445313 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala --- @@ -736,3 +736,60 @@ object CombineConcats extends Rule[LogicalPlan] { flattenConcats(concat) } } + +/** + * A rule that replaces `Literal(null, _)` with `FalseLiteral` for further optimizations. + * + * This rule applies to conditions in [[Filter]] and [[Join]]. Moreover, it transforms predicates + * in all [[If]] expressions as well as branch conditions in all [[CaseWhen]] expressions. + * + * For example, `Filter(Literal(null, _))` is equal to `Filter(FalseLiteral)`. + * + * Another example containing branches is `Filter(If(cond, FalseLiteral, Literal(null, _)))`; + * this can be optimized to `Filter(If(cond, FalseLiteral, FalseLiteral))`, and eventually + * `Filter(FalseLiteral)`. + * + * As this rule is not limited to conditions in [[Filter]] and [[Join]], arbitrary plans can + * benefit from it. For example, `Project(If(And(cond, Literal(null)), Literal(1), Literal(2)))` + * can be simplified into `Project(Literal(2))`. + * + * As a result, many unnecessary computations can be removed in the query optimization phase. + */ +object ReplaceNullWithFalse extends Rule[LogicalPlan] { + + def apply(plan: LogicalPlan): LogicalPlan = plan transform { +case f @ Filter(cond, _) => f.copy(condition = replaceNullWithFalse(cond)) +case j @ Join(_, _, _, Some(cond)) => j.copy(condition = Some(replaceNullWithFalse(cond))) +case p: LogicalPlan => p transformExpressions { + case i @ If(pred, _, _) => i.copy(predicate = replaceNullWithFalse(pred)) + case cw @ CaseWhen(branches, _) => +val newBranches = branches.map { case (cond, value) => + replaceNullWithFalse(cond) -> value +} +cw.copy(branches = newBranches) +} + } + + /** + * Recursively replaces `Literal(null, _)` with `FalseLiteral`. 
+ * + * Note that `transformExpressionsDown` can not be used here as we must stop as soon as we hit + * an expression that is not [[CaseWhen]], [[If]], [[And]], [[Or]] or `Literal(null, _)`. + */ + private def replaceNullWithFalse(e: Expression): Expression = e match { +case cw: CaseWhen if cw.dataType == BooleanType => + val newBranches = cw.branches.map { case (cond, value) => +replaceNullWithFalse(cond) -> replaceNullWithFalse(value) + } + val newElseValue = cw.elseValue.map(replaceNullWithFalse) + CaseWhen(newBranches, newElseValue) +case i @ If(pred, trueVal, falseVal) if i.dataType == BooleanType => + If(replaceNullWithFalse(pred), replaceNullWithFalse(trueVal), replaceNullWithFalse(falseVal)) --- End diff -- This case is handled in `apply` and tested in `"replace null in predicates of If"`, `"replace null in predicates of If inside another If"`.
[GitHub] spark pull request #22857: [SPARK-25860][SQL] Replace Literal(null, _) with ...
Github user aokolnychyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22857#discussion_r229442843 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala --- @@ -736,3 +736,60 @@ object CombineConcats extends Rule[LogicalPlan] { flattenConcats(concat) } } + +/** + * A rule that replaces `Literal(null, _)` with `FalseLiteral` for further optimizations. + * + * This rule applies to conditions in [[Filter]] and [[Join]]. Moreover, it transforms predicates + * in all [[If]] expressions as well as branch conditions in all [[CaseWhen]] expressions. + * + * For example, `Filter(Literal(null, _))` is equal to `Filter(FalseLiteral)`. + * + * Another example containing branches is `Filter(If(cond, FalseLiteral, Literal(null, _)))`; + * this can be optimized to `Filter(If(cond, FalseLiteral, FalseLiteral))`, and eventually + * `Filter(FalseLiteral)`. + * + * As this rule is not limited to conditions in [[Filter]] and [[Join]], arbitrary plans can + * benefit from it. For example, `Project(If(And(cond, Literal(null)), Literal(1), Literal(2)))` + * can be simplified into `Project(Literal(2))`. + * + * As a result, many unnecessary computations can be removed in the query optimization phase. + */ +object ReplaceNullWithFalse extends Rule[LogicalPlan] { + + def apply(plan: LogicalPlan): LogicalPlan = plan transform { +case f @ Filter(cond, _) => f.copy(condition = replaceNullWithFalse(cond)) +case j @ Join(_, _, _, Some(cond)) => j.copy(condition = Some(replaceNullWithFalse(cond))) +case p: LogicalPlan => p transformExpressions { + case i @ If(pred, _, _) => i.copy(predicate = replaceNullWithFalse(pred)) + case cw @ CaseWhen(branches, _) => +val newBranches = branches.map { case (cond, value) => + replaceNullWithFalse(cond) -> value +} +cw.copy(branches = newBranches) +} + } + + /** + * Recursively replaces `Literal(null, _)` with `FalseLiteral`. 
+ * + * Note that `transformExpressionsDown` can not be used here as we must stop as soon as we hit + * an expression that is not [[CaseWhen]], [[If]], [[And]], [[Or]] or `Literal(null, _)`. --- End diff -- I like your snippet because it is clean. We also considered a similar approach.
1. Unfortunately, it does not handle nested `If`/`CaseWhen` expressions as they are not `NullIntolerant`. For example, cases like `If(If(a > 1, FalseLiteral, Literal(null, _)), 1, 2)` will not be optimized if we remove branches for `If`/`CaseWhen`.
2. If we just add one more branch to handle all `NullIntolerant` expressions, I am not sure it will give a lot of benefits as those expressions are transformed into `Literal(null, _)` by `NullPropagation` and we operate in the same batch.
3. As @gatorsmile said, we should be really careful with generalization. For example, `Not` is `NullIntolerant`. `Not(null)` is transformed into `null` by `NullPropagation`. We need to ensure that we do not replace `null` inside `Not` and do not convert `Not(null)` into `Not(FalseLiteral)`.
Therefore, the intention was to keep things simple to be safe.
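The "stop as soon as we hit anything else" behaviour discussed in this comment can be sketched on a toy expression tree. Everything below is a hypothetical stand-in: the case classes only borrow Catalyst's names, every expression is assumed to be Boolean-typed (the real rule checks `dataType == BooleanType` for `If` and `CaseWhen`), and `CaseWhen` is left out for brevity.

```scala
// Hypothetical mini-AST; the names mirror Catalyst but this is not Spark's API.
object ReplaceNullSketch {
  sealed trait Expr
  case object NullLit extends Expr
  case object TrueLit extends Expr
  case object FalseLit extends Expr
  final case class Attr(name: String) extends Expr // opaque Boolean column
  final case class Not(child: Expr) extends Expr
  final case class And(left: Expr, right: Expr) extends Expr
  final case class Or(left: Expr, right: Expr) extends Expr
  final case class If(pred: Expr, trueVal: Expr, falseVal: Expr) extends Expr

  // Descends only through And/Or/If. Any other node, including Not, is
  // returned untouched, so a null below it is never rewritten.
  def replaceNullWithFalse(e: Expr): Expr = e match {
    case NullLit     => FalseLit
    case And(l, r)   => And(replaceNullWithFalse(l), replaceNullWithFalse(r))
    case Or(l, r)    => Or(replaceNullWithFalse(l), replaceNullWithFalse(r))
    case If(p, t, f) =>
      If(replaceNullWithFalse(p), replaceNullWithFalse(t), replaceNullWithFalse(f))
    case other       => other
  }
}
```

With this shape, `If(Attr("a"), FalseLit, NullLit)` becomes `If(Attr("a"), FalseLit, FalseLit)`, while `Not(NullLit)` is left as is, which is the safety property item 3 asks for.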
[GitHub] spark pull request #22857: [SPARK-25860][SQL] Replace Literal(null, _) with ...
Github user aokolnychyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22857#discussion_r229133793 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala --- @@ -2578,4 +2578,45 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { Row ("abc", 1)) } } + + test("SPARK-25860: Replace Literal(null, _) with FalseLiteral whenever possible") { + +def checkPlanIsEmptyLocalScan(df: DataFrame): Unit = df.queryExecution.executedPlan match { --- End diff -- Do we actually have a way to enable/disable `ConvertToLocalRelation`?
[GitHub] spark pull request #22857: [SPARK-25860][SQL] Replace Literal(null, _) with ...
Github user aokolnychyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22857#discussion_r229133550 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala --- @@ -736,3 +736,65 @@ object CombineConcats extends Rule[LogicalPlan] { flattenConcats(concat) } } + +/** + * A rule that replaces `Literal(null, _)` with `FalseLiteral` for further optimizations. + * + * For example, `Filter(Literal(null, _))` is equal to `Filter(FalseLiteral)`. + * + * Another example containing branches is `Filter(If(cond, FalseLiteral, Literal(null, _)))`; + * this can be optimized to `Filter(If(cond, FalseLiteral, FalseLiteral))`, and eventually + * `Filter(FalseLiteral)`. + * + * As a result, many unnecessary computations can be removed in the query optimization phase. + * + * Similarly, the same logic can be applied to conditions in [[Join]], predicates in [[If]], + * conditions in [[CaseWhen]]. + */ +object ReplaceNullWithFalse extends Rule[LogicalPlan] { + + def apply(plan: LogicalPlan): LogicalPlan = plan transform { +case f @ Filter(cond, _) => f.copy(condition = replaceNullWithFalse(cond)) +case j @ Join(_, _, _, Some(cond)) => j.copy(condition = Some(replaceNullWithFalse(cond))) +case p: LogicalPlan => p transformExpressions { + case i @ If(pred, _, _) => i.copy(predicate = replaceNullWithFalse(pred)) + case CaseWhen(branches, elseValue) => +val newBranches = branches.map { case (cond, value) => + replaceNullWithFalse(cond) -> value +} +CaseWhen(newBranches, elseValue) +} + } + + /** + * Recursively replaces `Literal(null, _)` with `FalseLiteral`. + * + * Note that `transformExpressionsDown` can not be used here as we must stop as soon as we hit + * an expression that is not [[CaseWhen]], [[If]], [[And]], [[Or]] or `Literal(null, _)`. 
+ */ + private def replaceNullWithFalse(e: Expression): Expression = e match { +case cw: CaseWhen if getValues(cw).forall(isNullOrBoolean) => + val newBranches = cw.branches.map { case (cond, value) => +replaceNullWithFalse(cond) -> replaceNullWithFalse(value) + } + val newElseValue = cw.elseValue.map(replaceNullWithFalse) + CaseWhen(newBranches, newElseValue) +case If(pred, trueVal, falseVal) if Seq(trueVal, falseVal).forall(isNullOrBoolean) => + If(replaceNullWithFalse(pred), replaceNullWithFalse(trueVal), replaceNullWithFalse(falseVal)) +case And(left, right) => --- End diff -- Could you elaborate a bit more on `null && false`? I had in mind `AND(true, null)` and `OR(false, null)`, which are tricky. For example, if we use `AND(true, null)` in SELECT, we will get `null`. However, if we use it inside `Filter` or the predicate of `If`, it will be semantically equivalent to `false` (e.g., `If$eval`). Therefore, the proposed rule has a limited scope. I explored the source code and comments in `And/Or` to come up with an edge case that wouldn't work. I could not find such a case. To me, it seems safe because the rule is applied only to expressions that evaluate to `false` if the underlying expression is `null` (i.e., conditions in `Filter`/`Join`, predicates in `If`, conditions in `CaseWhen`). Please let me know if you have a particular case to test.
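The distinction drawn above between `AND(true, null)` as a selected value and as a filter condition can be illustrated with a small model of SQL three-valued logic. This is only a sketch: `ThreeValuedLogic`, `Tri`, `and`, `or` and `passesFilter` are hypothetical names introduced here, `None` stands in for SQL `NULL`, and none of it is Spark's actual implementation.

```scala
// Toy model of SQL three-valued logic; None stands for SQL NULL.
// Illustration only: these names are hypothetical and this is not Spark code.
object ThreeValuedLogic {
  type Tri = Option[Boolean]

  // SQL AND: false dominates; otherwise a NULL operand makes the result NULL.
  def and(l: Tri, r: Tri): Tri = (l, r) match {
    case (Some(false), _) | (_, Some(false)) => Some(false)
    case (Some(true), Some(true))            => Some(true)
    case _                                   => None
  }

  // SQL OR: true dominates; otherwise a NULL operand makes the result NULL.
  def or(l: Tri, r: Tri): Tri = (l, r) match {
    case (Some(true), _) | (_, Some(true)) => Some(true)
    case (Some(false), Some(false))        => Some(false)
    case _                                 => None
  }

  // A filter keeps a row only when the condition is true,
  // so NULL and false behave the same in that position.
  def passesFilter(cond: Tri): Boolean = cond.contains(true)

  def main(args: Array[String]): Unit = {
    val asValue = and(Some(true), None) // what a SELECT list would see
    println(s"AND(true, null) as a value:  $asValue")
    println(s"AND(true, null) as a filter: ${passesFilter(asValue)}")
    println(s"AND(true, false) as a filter: ${passesFilter(and(Some(true), Some(false)))}")
  }
}
```

In this model, replacing a `NULL` operand of `and` or `or` with `false` never changes what `passesFilter` returns, because neither operator can turn a non-true operand into a true result. That matches the limited scope described in the comment, under the assumption that only `And` and `Or` sit between the condition and the literal.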
[GitHub] spark pull request #22857: [SPARK-25860][SQL] Replace Literal(null, _) with ...
Github user aokolnychyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22857#discussion_r228741884 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala --- @@ -736,3 +736,65 @@ object CombineConcats extends Rule[LogicalPlan] { flattenConcats(concat) } } + +/** + * A rule that replaces `Literal(null, _)` with `FalseLiteral` for further optimizations. + * + * For example, `Filter(Literal(null, _))` is equal to `Filter(FalseLiteral)`. + * + * Another example containing branches is `Filter(If(cond, FalseLiteral, Literal(null, _)))`; + * this can be optimized to `Filter(If(cond, FalseLiteral, FalseLiteral))`, and eventually + * `Filter(FalseLiteral)`. + * + * As a result, many unnecessary computations can be removed in the query optimization phase. + * + * Similarly, the same logic can be applied to conditions in [[Join]], predicates in [[If]], + * conditions in [[CaseWhen]]. + */ +object ReplaceNullWithFalse extends Rule[LogicalPlan] { + + def apply(plan: LogicalPlan): LogicalPlan = plan transform { +case f @ Filter(cond, _) => f.copy(condition = replaceNullWithFalse(cond)) +case j @ Join(_, _, _, Some(cond)) => j.copy(condition = Some(replaceNullWithFalse(cond))) +case p: LogicalPlan => p transformExpressions { + case i @ If(pred, _, _) => i.copy(predicate = replaceNullWithFalse(pred)) + case CaseWhen(branches, elseValue) => +val newBranches = branches.map { case (cond, value) => + replaceNullWithFalse(cond) -> value +} +CaseWhen(newBranches, elseValue) +} + } + + /** + * Recursively replaces `Literal(null, _)` with `FalseLiteral`. + * + * Note that `transformExpressionsDown` can not be used here as we must stop as soon as we hit + * an expression that is not [[CaseWhen]], [[If]], [[And]], [[Or]] or `Literal(null, _)`. 
+ */ + private def replaceNullWithFalse(e: Expression): Expression = e match { +case cw: CaseWhen if getValues(cw).forall(isNullOrBoolean) => + val newBranches = cw.branches.map { case (cond, value) => +replaceNullWithFalse(cond) -> replaceNullWithFalse(value) + } + val newElseValue = cw.elseValue.map(replaceNullWithFalse) + CaseWhen(newBranches, newElseValue) +case If(pred, trueVal, falseVal) if Seq(trueVal, falseVal).forall(isNullOrBoolean) => --- End diff -- Yep, I shortened this to stay in one line below. I can either rename `pred` to `p`or split line 783 into multiple. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22857: [SPARK-25860][SQL] Replace Literal(null, _) with ...
Github user aokolnychyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22857#discussion_r228741800 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala --- @@ -736,3 +736,65 @@ object CombineConcats extends Rule[LogicalPlan] { flattenConcats(concat) } } + +/** + * A rule that replaces `Literal(null, _)` with `FalseLiteral` for further optimizations. + * + * For example, `Filter(Literal(null, _))` is equal to `Filter(FalseLiteral)`. + * + * Another example containing branches is `Filter(If(cond, FalseLiteral, Literal(null, _)))`; + * this can be optimized to `Filter(If(cond, FalseLiteral, FalseLiteral))`, and eventually + * `Filter(FalseLiteral)`. + * + * As a result, many unnecessary computations can be removed in the query optimization phase. + * + * Similarly, the same logic can be applied to conditions in [[Join]], predicates in [[If]], + * conditions in [[CaseWhen]]. + */ +object ReplaceNullWithFalse extends Rule[LogicalPlan] { + + def apply(plan: LogicalPlan): LogicalPlan = plan transform { +case f @ Filter(cond, _) => f.copy(condition = replaceNullWithFalse(cond)) +case j @ Join(_, _, _, Some(cond)) => j.copy(condition = Some(replaceNullWithFalse(cond))) +case p: LogicalPlan => p transformExpressions { + case i @ If(pred, _, _) => i.copy(predicate = replaceNullWithFalse(pred)) + case CaseWhen(branches, elseValue) => +val newBranches = branches.map { case (cond, value) => + replaceNullWithFalse(cond) -> value +} +CaseWhen(newBranches, elseValue) +} + } + + /** + * Recursively replaces `Literal(null, _)` with `FalseLiteral`. + * + * Note that `transformExpressionsDown` can not be used here as we must stop as soon as we hit + * an expression that is not [[CaseWhen]], [[If]], [[And]], [[Or]] or `Literal(null, _)`. + */ + private def replaceNullWithFalse(e: Expression): Expression = e match { --- End diff -- Also, that's the reason why we don't use `transformExpressionsDown`. 
We will stop the replacement as soon as we hit an expression that is not `CaseWhen`, `If`, `And`, `Or` or `Literal(null, _)`. Therefore, `If(IsNull(Literal(null, _)))` won't be transformed.
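The stopping behaviour described above can be sketched on a toy expression tree. The case classes below only borrow their names from the Catalyst classes in the diff; they are simplified stand-ins written for this illustration (with `CaseWhen` left out for brevity), not Spark's API.

```scala
// Toy expression tree; the node names mimic Catalyst but are hypothetical stand-ins.
object NullToFalseSketch {
  sealed trait Expr
  case object NullLit extends Expr
  case class BoolLit(value: Boolean) extends Expr
  case class And(left: Expr, right: Expr) extends Expr
  case class Or(left: Expr, right: Expr) extends Expr
  case class If(pred: Expr, trueVal: Expr, falseVal: Expr) extends Expr
  case class IsNull(child: Expr) extends Expr
  case class Not(child: Expr) extends Expr

  private def isNullOrBoolean(e: Expr): Boolean = e match {
    case NullLit | BoolLit(_) => true
    case _                    => false
  }

  // Recurse only through And, Or and If. Any other node is returned untouched,
  // together with everything beneath it.
  def replaceNullWithFalse(e: Expr): Expr = e match {
    case NullLit   => BoolLit(false)
    case And(l, r) => And(replaceNullWithFalse(l), replaceNullWithFalse(r))
    case Or(l, r)  => Or(replaceNullWithFalse(l), replaceNullWithFalse(r))
    case If(p, t, f) if Seq(t, f).forall(isNullOrBoolean) =>
      If(replaceNullWithFalse(p), replaceNullWithFalse(t), replaceNullWithFalse(f))
    case other => other
  }

  def main(args: Array[String]): Unit = {
    println(replaceNullWithFalse(And(BoolLit(true), NullLit))) // the null is replaced
    println(replaceNullWithFalse(IsNull(NullLit)))             // left as-is
    println(replaceNullWithFalse(Not(NullLit)))                // left as-is
  }
}
```

A blanket top-down transform would also rewrite the literal under `IsNull` or `Not`, which changes the result: `IsNull(null)` is true while `IsNull(false)` is false. Returning `other` unchanged is what keeps the rewrite safe in this sketch.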
[GitHub] spark issue #22857: [SPARK-25860][SQL] Replace Literal(null, _) with FalseLi...
Github user aokolnychyi commented on the issue: https://github.com/apache/spark/pull/22857 @dbtsai @gatorsmile @cloud-fan could you guys, please, take a look?
[GitHub] spark pull request #22857: [SPARK-25860][SQL] Replace Literal(null, _) with ...
GitHub user aokolnychyi opened a pull request: https://github.com/apache/spark/pull/22857

[SPARK-25860][SQL] Replace Literal(null, _) with FalseLiteral whenever possible

## What changes were proposed in this pull request?

This PR proposes a new optimization rule that replaces `Literal(null, _)` with `FalseLiteral` in conditions in `Join` and `Filter`, predicates in `If`, conditions in `CaseWhen`.

The idea is that some expressions evaluate to `false` if the underlying expression is `null` (as an example see `GeneratePredicate$create` or `doGenCode` and `eval` methods in `If` and `CaseWhen`). Therefore, we can replace `Literal(null, _)` with `FalseLiteral`, which can lead to more optimizations later on.

Let's consider a few examples.

```
val df = spark.range(1, 100).select($"id".as("l"), ($"id" > 50).as("b"))
df.createOrReplaceTempView("t")
df.createOrReplaceTempView("p")
```

**Case 1**

```
spark.sql("SELECT * FROM t WHERE if(l > 10, false, NULL)").explain(true)

// without the new rule
...
== Optimized Logical Plan ==
Project [id#0L AS l#2L, cast(id#0L as string) AS s#3]
+- Filter if ((id#0L > 10)) false else null
   +- Range (1, 100, step=1, splits=Some(12))

== Physical Plan ==
*(1) Project [id#0L AS l#2L, cast(id#0L as string) AS s#3]
+- *(1) Filter if ((id#0L > 10)) false else null
   +- *(1) Range (1, 100, step=1, splits=12)

// with the new rule
...
== Optimized Logical Plan ==
LocalRelation <empty>, [l#2L, s#3]

== Physical Plan ==
LocalTableScan <empty>, [l#2L, s#3]
```

**Case 2**

```
spark.sql("SELECT * FROM t WHERE CASE WHEN l < 10 THEN null WHEN l > 40 THEN false ELSE null END").explain(true)

// without the new rule
...
== Optimized Logical Plan ==
Project [id#0L AS l#2L, cast(id#0L as string) AS s#3]
+- Filter CASE WHEN (id#0L < 10) THEN null WHEN (id#0L > 40) THEN false ELSE null END
   +- Range (1, 100, step=1, splits=Some(12))

== Physical Plan ==
*(1) Project [id#0L AS l#2L, cast(id#0L as string) AS s#3]
+- *(1) Filter CASE WHEN (id#0L < 10) THEN null WHEN (id#0L > 40) THEN false ELSE null END
   +- *(1) Range (1, 100, step=1, splits=12)

// with the new rule
...
== Optimized Logical Plan ==
LocalRelation <empty>, [l#2L, s#3]

== Physical Plan ==
LocalTableScan <empty>, [l#2L, s#3]
```

**Case 3**

```
spark.sql("SELECT * FROM t JOIN p ON IF(t.l > p.l, null, false)").explain(true)

// without the new rule
...
== Optimized Logical Plan ==
Join Inner, if ((l#2L > l#37L)) null else false
:- Project [id#0L AS l#2L, cast(id#0L as string) AS s#3]
:  +- Range (1, 100, step=1, splits=Some(12))
+- Project [id#0L AS l#37L, cast(id#0L as string) AS s#38]
   +- Range (1, 100, step=1, splits=Some(12))

== Physical Plan ==
BroadcastNestedLoopJoin BuildRight, Inner, if ((l#2L > l#37L)) null else false
:- *(1) Project [id#0L AS l#2L, cast(id#0L as string) AS s#3]
:  +- *(1) Range (1, 100, step=1, splits=12)
+- BroadcastExchange IdentityBroadcastMode
   +- *(2) Project [id#0L AS l#37L, cast(id#0L as string) AS s#38]
      +- *(2) Range (1, 100, step=1, splits=12)

// with the new rule
...
== Optimized Logical Plan ==
LocalRelation <empty>, [l#2L, s#3, l#37L, s#38]
```

## How was this patch tested?

This PR comes with a set of dedicated tests.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/aokolnychyi/spark spark-25860

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/22857.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #22857

commit 1d8fefd9b227c6aba50b7e012726ec292c75b5a1
Author: Anton Okolnychyi
Date: 2018-10-23T09:09:23Z

    [SPARK-25860][SQL] Replace Literal(null, _) with FalseLiteral whenever possible
[GitHub] spark issue #19193: [WIP][SPARK-21896][SQL] Fix Stack Overflow when window f...
Github user aokolnychyi commented on the issue: https://github.com/apache/spark/pull/19193 Hi @dongjoon-hyun. Yep, I'll close this one.
[GitHub] spark pull request #19193: [WIP][SPARK-21896][SQL] Fix Stack Overflow when w...
Github user aokolnychyi closed the pull request at: https://github.com/apache/spark/pull/19193
[GitHub] spark pull request #21580: [SPARK-24575][SQL] Prohibit window expressions in...
GitHub user aokolnychyi opened a pull request: https://github.com/apache/spark/pull/21580 [SPARK-24575][SQL] Prohibit window expressions inside WHERE and HAVING clauses ## What changes were proposed in this pull request? As discussed [before](https://github.com/apache/spark/pull/19193#issuecomment-393726964), this PR prohibits window expressions inside WHERE and HAVING clauses. ## How was this patch tested? This PR comes with a dedicated unit test. You can merge this pull request into a Git repository by running: $ git pull https://github.com/aokolnychyi/spark spark-24575 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21580.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21580 commit 9a07ea361eccfefe348db8a9b50acf3c68ec7a56 Author: aokolnychyi Date: 2018-06-17T16:56:40Z [SPARK-24575][SQL] Prohibit window expressions inside WHERE and HAVING clauses --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19193: [WIP][SPARK-21896][SQL] Fix Stack Overflow when window f...
Github user aokolnychyi commented on the issue: https://github.com/apache/spark/pull/19193 @cloud-fan @hvanhovell I created PR #21473 that fixes StackOverflow. Apart from that, I think we might have other potential problems. **1. Window functions inside WHERE and HAVING** Why such cases should be prohibited is described [here](https://stackoverflow.com/questions/13997177/why-no-windowed-functions-in-where-clauses). Spark, on the other hand, does not handle this explicitly and will fail with non-descriptive exceptions. ``` val df = Seq((1, 2), (1, 3), (2, 4), (5, 5)).toDF("a", "b") df.createTempView("t1") spark.sql("SELECT t1.a FROM t1 WHERE RANK() OVER(ORDER BY t1.b) = 1").explain(true) spark.sql("SELECT t1.a FROM t1 WHERE RANK() OVER(ORDER BY t1.b) = 1").show(false) Exception in thread "main" java.lang.UnsupportedOperationException: Cannot evaluate expression: rank(input[1, int, false]) windowspecdefinition(input[1, int, false] ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) at org.apache.spark.sql.catalyst.expressions.Unevaluable$class.doGenCode(Expression.scala:261) at org.apache.spark.sql.catalyst.expressions.WindowExpression.doGenCode(windowExpressions.scala:278) at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:108) at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:105) at scala.Option.getOrElse(Option.scala:121) ... 
``` ``` val df = Seq((1, 2), (1, 3), (2, 4), (5, 5)).toDF("a", "b") df.createTempView("t1") spark.sql("SELECT t1.a, MAX(t1.b) FROM t1 GROUP BY t1.a HAVING RANK() OVER(ORDER BY t1.a) = 1").explain(true) spark.sql("SELECT t1.a, MAX(t1.b) FROM t1 GROUP BY t1.a HAVING RANK() OVER(ORDER BY t1.a) = 1").show(false) Exception in thread "main" java.lang.UnsupportedOperationException: Cannot evaluate expression: rank(input[1, int, false]) windowspecdefinition(input[1, int, false] ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) at org.apache.spark.sql.catalyst.expressions.Unevaluable$class.doGenCode(Expression.scala:261) at org.apache.spark.sql.catalyst.expressions.WindowExpression.doGenCode(windowExpressions.scala:278) at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:108) at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:105) at scala.Option.getOrElse(Option.scala:121) ``` Shall this be explicitly validated? **2. HAVING clause using the Dataset API** It seems we cannot use HAVING with aggregate functions in the Dataset API if you have a window function in the same query. The following query works correctly as ``ResolveAggregateFunctions`` will apply once you have the complete plan. I.e., ``ResolveAggregateFunctions`` will apply when you call ``where``. ``` val df = Seq((1, 2), (1, 3), (2, 4), (5, 5)).toDF("a", "b") df.groupBy('a).agg(max('b)).where(sum('b) === 5).show(false) +---+--+ |a |max(b)| +---+--+ |1 |3 | |5 |5 | +---+--+ ``` The query below, however, will fail even though it is a valid one (notice window functions). 
``` val df = Seq((1, 2), (1, 3), (2, 4), (5, 5)).toDF("a", "b") df.groupBy('a).agg(max('b), rank().over(window)).where(sum('b) === 5).show(false) Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve '`b`' given input columns: [a, max(b), RANK() OVER (ORDER BY a ASC NULLS FIRST unspecifiedframe$())];; 'Filter (sum('b) = 5) +- AnalysisBarrier +- Project [a#5, max(b)#14, RANK() OVER (ORDER BY a ASC NULLS FIRST unspecifiedframe$())#15] +- Project [a#5, max(b)#14, RANK() OVER (ORDER BY a ASC NULLS FIRST unspecifiedframe$())#15, RANK() OVER (ORDER BY a ASC NULLS FIRST unspecifiedframe$())#15] +- Window [rank(a#5) windowspecdefinition(a#5 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS RANK() OVER (ORDER BY a ASC NULLS FIRST unspecifiedframe$())#15], [a#5 ASC NULLS FIRST] +- Aggregate [a#5], [a#5, max(b#6) AS max(b)#14] +- Project [_1#2 AS a#5, _2#3 AS b#6] +- LocalRelation [_1#2, _2#3] ``` It fails because ``ExtractWindowExpressions`` will apply when you call ``agg`` and not ``where``. At that point of time, you do not have the full plan and ``ExtractWindowExpressions`` will no
[GitHub] spark pull request #21473: [SPARK-21896][SQL] Fix StackOverflow caused by wi...
Github user aokolnychyi commented on a diff in the pull request: https://github.com/apache/spark/pull/21473#discussion_r192234621 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -1744,11 +1744,14 @@ class Analyzer( *it into the plan tree. */ object ExtractWindowExpressions extends Rule[LogicalPlan] { -private def hasWindowFunction(projectList: Seq[NamedExpression]): Boolean = - projectList.exists(hasWindowFunction) +private def hasWindowFunction(exprs: Seq[Expression]): Boolean = + exprs.exists(hasWindowFunction) -private def hasWindowFunction(expr: NamedExpression): Boolean = { +private def hasWindowFunction(expr: Expression): Boolean = { expr.find { +case AggregateExpression(aggFunc, _, _, _) if hasWindowFunction(aggFunc.children) => --- End diff -- I have some doubts that this is the best place for this check. StackOverflow happens in ``extract``. We can also define a separate method and call it inside ``extract``. However, that method will share the same structure as ``hasWindowFunction``. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
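The check under discussion, finding an aggregate whose arguments contain a window expression, can be sketched on a toy tree. All types and helpers below (`Expr`, `WindowExpr`, `AggregateExpr`, `findWindowInsideAggregate`, and so on) are hypothetical and written for this illustration; the real `hasWindowFunction` in `Analyzer.scala` works on Catalyst expressions and is not reproduced here.

```scala
// Toy expression tree for illustration; none of these types are Spark's.
object WindowInAggregateSketch {
  sealed trait Expr { def children: Seq[Expr] }
  case class Column(name: String) extends Expr {
    def children: Seq[Expr] = Nil
  }
  case class Func(name: String, args: Seq[Expr]) extends Expr {
    def children: Seq[Expr] = args
  }
  case class WindowExpr(function: Expr) extends Expr {
    def children: Seq[Expr] = Seq(function)
  }
  case class AggregateExpr(name: String, args: Seq[Expr]) extends Expr {
    def children: Seq[Expr] = args
  }

  // True if a window expression appears anywhere in the tree.
  def containsWindow(e: Expr): Boolean = e match {
    case _: WindowExpr => true
    case other         => other.children.exists(containsWindow)
  }

  // Returns the first aggregate that has a window expression among its arguments.
  def findWindowInsideAggregate(e: Expr): Option[AggregateExpr] = e match {
    case a: AggregateExpr if a.args.exists(containsWindow) => Some(a)
    case other => other.children.flatMap(findWindowInsideAggregate).headOption
  }

  def main(args: Array[String]): Unit = {
    // max(rank() OVER w): a window function inside an aggregate, to be rejected.
    val nested = AggregateExpr("max", Seq(WindowExpr(Func("rank", Nil))))
    // sum(sum(b)) OVER w: a window over an aggregate, which stays allowed.
    val windowOverAgg =
      WindowExpr(AggregateExpr("sum", Seq(AggregateExpr("sum", Seq(Column("b"))))))
    println(findWindowInsideAggregate(nested).map(_.name))
    println(findWindowInsideAggregate(windowOverAgg).map(_.name))
  }
}
```

Keeping the detection in a standalone traversal like this mirrors the alternative mentioned above of a separate method called from ``extract``; whether that or the ``hasWindowFunction`` placement is preferable is the open question in the comment.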
[GitHub] spark pull request #21473: [SPARK-21896][SQL] Fix StackOverflow caused by wi...
GitHub user aokolnychyi opened a pull request: https://github.com/apache/spark/pull/21473 [SPARK-21896][SQL] Fix StackOverflow caused by window functions inside aggregate functions ## What changes were proposed in this pull request? This PR explicitly prohibits window functions inside aggregates. Currently, this will cause StackOverflow during analysis. See PR #19193 for previous discussion. ## How was this patch tested? This PR comes with a dedicated unit test. You can merge this pull request into a Git repository by running: $ git pull https://github.com/aokolnychyi/spark fix-stackoverflow-window-funcs Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21473.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21473 commit 646a96d75c12b2e3c6886bc0cc9743e7ba838c8a Author: aokolnychyi Date: 2018-05-31T10:58:29Z [SPARK-21896][SQL] Fix StackOverflow caused by window functions inside aggregate functions --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19193: [WIP][SPARK-21896][SQL] Fix Stack Overflow when window f...
Github user aokolnychyi commented on the issue: https://github.com/apache/spark/pull/19193 @hvanhovell @cloud-fan I think it would be safer to be consistent with other databases and with what Spark does for nested aggregate functions. It is really simple to write a subquery to work around any problems. As it is right now, Scenario 2 mentioned above gives a wrong result, and Scenario 4 should be double-checked if we want to support nested window functions. Let me know your opinion.
[GitHub] spark issue #19193: [WIP][SPARK-21896][SQL] Fix Stack Overflow when window f...
Github user aokolnychyi commented on the issue: https://github.com/apache/spark/pull/19193

I checked PostgreSQL(10.3), MySQL(8.0), Hive(2.1.0).

**1. PostgreSQL**

```
postgres=# CREATE TABLE t1 (c1 integer, c2 integer);
postgres=# INSERT INTO t1 VALUES (1, 2), (1, 3), (2,4), (5,5);
postgres=# SELECT c1, c2, ROW_NUMBER() OVER() as c3 FROM t1;
 c1 | c2 | c3
----+----+----
  1 |  2 |  1
  1 |  3 |  2
  2 |  4 |  3
  5 |  5 |  4
(4 rows)

postgres=# SELECT c1, MAX(ROW_NUMBER() OVER()) as c3 FROM t1;
ERROR: aggregate function calls cannot contain window function calls
LINE 1: SELECT c1, MAX(ROW_NUMBER() OVER()) as c3 FROM t1;
```

**2. MySQL**

```
mysql> CREATE TABLE t1 (c1 integer, c2 integer);
mysql> INSERT INTO t1 VALUES (1, 2), (1, 3), (2,4), (5,5);
mysql> SELECT c1, c2, ROW_NUMBER() OVER() FROM t1;
+------+------+---------------------+
| c1   | c2   | ROW_NUMBER() OVER() |
+------+------+---------------------+
|    1 |    2 |                   1 |
|    1 |    3 |                   2 |
|    2 |    4 |                   3 |
|    5 |    5 |                   4 |
+------+------+---------------------+
4 rows in set (0.00 sec)

mysql> SELECT c1, MAX(ROW_NUMBER() OVER()) as c3 FROM t1;
ERROR 3593 (HY000): You cannot use the window function 'row_number' in this context.'
```

**3. Hive**

```
hive> CREATE TABLE t1(c1 INT, c2 INT);
hive> INSERT INTO t1 VALUES (1, 2), (1, 3), (2,4), (5,5);
hive> SELECT c1, c2, ROW_NUMBER() OVER() as c3 FROM t1;
OK
5  5  1
2  4  2
1  3  3
1  2  4
hive> SELECT c1, MAX(ROW_NUMBER() OVER()) as c3 FROM t1;
FAILED: SemanticException [Error 10002]: Line 1:15 Invalid column reference 'ROW_NUMBER': (possible column names are: c1, c2)
```

I will adapt the PR to prohibit window functions inside aggregates.
[GitHub] spark issue #19193: [WIP][SPARK-21896][SQL] Fix Stack Overflow when window f...
Github user aokolnychyi commented on the issue: https://github.com/apache/spark/pull/19193 Let me check other databases and come up with a summary.
[GitHub] spark issue #19193: [WIP][SPARK-21896][SQL] Fix Stack Overflow when window f...
Github user aokolnychyi commented on the issue: https://github.com/apache/spark/pull/19193 @hvanhovell here is a summary of tried scenarios: ``` val df = Seq((1, 2), (1, 3), (2, 4), (5, 5)).toDF("a", "b") val window1 = Window.orderBy('a) val window2 = Window.orderBy('a.desc) ``` **Scenario 1: An aggregate on top of a window expression (did not work before, looks OK now)** ``` df.groupBy().agg(max(rank().over(window1))).explain(true) df.groupBy().agg(max(rank().over(window1))).show(false) == Analyzed Logical Plan == max(RANK() OVER (ORDER BY a ASC NULLS FIRST unspecifiedframe$())): int Aggregate [max(_we0#27) AS max(RANK() OVER (ORDER BY a ASC NULLS FIRST unspecifiedframe$()))#16] +- Project [a#5, b#6, _we0#27, _we0#27] +- Window [rank(a#5) windowspecdefinition(a#5 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _we0#27], [a#5 ASC NULLS FIRST] +- Project [_1#2 AS a#5, _2#3 AS b#6] +- LocalRelation [_1#2, _2#3] == Optimized Logical Plan == Aggregate [max(_we0#27) AS max(RANK() OVER (ORDER BY a ASC NULLS FIRST unspecifiedframe$()))#16] +- Project [_we0#27, _we0#27] +- Window [rank(a#5) windowspecdefinition(a#5 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _we0#27], [a#5 ASC NULLS FIRST] +- LocalRelation [a#5] +-+ |max(RANK() OVER (ORDER BY a ASC NULLS FIRST unspecifiedframe$()))| +-+ |4| +-+ ``` **Scenario 2: An aggregate with grouping expressions on top of a window expression** TODO: Is the result wrong? What is expected? 
``` df.groupBy('a).agg(max(rank().over(window1))).explain(true) df.groupBy('a).agg(max(rank().over(window1))).show(false) == Analyzed Logical Plan == a: int, max(RANK() OVER (ORDER BY a ASC NULLS FIRST unspecifiedframe$())): int Aggregate [a#5], [a#5, max(_we0#75) AS max(RANK() OVER (ORDER BY a ASC NULLS FIRST unspecifiedframe$()))#63] +- Project [a#5, b#6, _we0#75, _we0#75] +- Window [rank(a#5) windowspecdefinition(a#5 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _we0#75], [a#5 ASC NULLS FIRST] +- Project [_1#2 AS a#5, _2#3 AS b#6] +- LocalRelation [_1#2, _2#3] == Optimized Logical Plan == Aggregate [a#5], [a#5, max(_we0#75) AS max(RANK() OVER (ORDER BY a ASC NULLS FIRST unspecifiedframe$()))#63] +- Project [a#5, _we0#75, _we0#75] +- Window [rank(a#5) windowspecdefinition(a#5 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _we0#75], [a#5 ASC NULLS FIRST] +- LocalRelation [a#5] +---+-+ |a |max(RANK() OVER (ORDER BY a ASC NULLS FIRST unspecifiedframe$()))| +---+-+ |1 |1| |2 |3| |5 |4| +---+-+ ``` **Scenario 3: A normal aggregate, an aggregate on top of a window expression, a window expression on top of an aggregate in one query** This is resolved in two steps. 
``` df.groupBy('a).agg(max(rank().over(window1)), sum('b), sum(sum('b)).over(window2)).explain(true) df.groupBy('a).agg(max(rank().over(window1)), sum('b), sum(sum('b)).over(window2)).show(false) == Analyzed Logical Plan == a: int, max(RANK() OVER (ORDER BY a ASC NULLS FIRST unspecifiedframe$())): int, sum(b): bigint, sum(sum(b)) OVER (ORDER BY a DESC NULLS LAST unspecifiedframe$()): bigint Project [a#5, max(RANK() OVER (ORDER BY a ASC NULLS FIRST unspecifiedframe$()))#116, sum(b)#117L, sum(sum(b)) OVER (ORDER BY a DESC NULLS LAST unspecifiedframe$())#118L] +- Project [a#5, max(RANK() OVER (ORDER BY a ASC NULLS FIRST unspecifiedframe$()))#116, sum(b)#117L, _w0#137L, sum(sum(b)) OVER (ORDER BY a DESC NULLS LAST unspecifiedframe$())#118L, sum(sum(b)) OVER (ORDER BY a DESC NULLS LAST unspecifiedframe$())#118L] +- Window [sum(_w0#137L) windowspecdefinition(a#5 DESC NULLS LAST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS sum(sum(b)) OVER (ORDER BY a DESC NULLS LAST unspecifiedframe$())#118L], [a#5 DESC NULLS LAST] +- Aggregate [a#5], [a#5, max(_we0#133) AS max(RANK() OVER (ORDER B
[GitHub] spark issue #18692: [SPARK-21417][SQL] Infer join conditions using propagate...
Github user aokolnychyi commented on the issue: https://github.com/apache/spark/pull/18692

I am not sure we can infer ``a == b`` if ``a in (0, 2, 3, 4)`` and ``b in (0, 2, 3, 4)``.

table 'a'
```
a1 a2
1  2
3  3
4  5
```
table 'b'
```
b1 b2
1  -1
2  -2
3  -4
```
```
SELECT * FROM a, b WHERE a1 in (1, 2) AND b1 in (1, 2)
// 1 2 1 -1
// 1 2 2 -2
```
```
SELECT * FROM a JOIN b ON a.a1 = b.b1 WHERE a1 in (1, 2) AND b1 in (1, 2)
// 1 2 1 -1
```
Am I missing anything?
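The counterexample above can be replayed with plain Scala collections, without Spark. This is a sketch only: the object and value names are made up for the illustration, and the tuples simply encode the rows of tables 'a' and 'b' from the comment.

```scala
// Replays the counterexample with in-memory collections (no Spark involved).
object JoinInferenceCounterexample {
  val a: Seq[(Int, Int)] = Seq((1, 2), (3, 3), (4, 5))    // (a1, a2)
  val b: Seq[(Int, Int)] = Seq((1, -1), (2, -2), (3, -4)) // (b1, b2)
  val allowed: Set[Int] = Set(1, 2)

  // SELECT * FROM a, b WHERE a1 IN (1, 2) AND b1 IN (1, 2)
  val crossJoin: Seq[(Int, Int, Int, Int)] = for {
    (a1, a2) <- a
    (b1, b2) <- b
    if allowed(a1) && allowed(b1)
  } yield (a1, a2, b1, b2)

  // The same query with the inferred condition a1 = b1 added.
  val equiJoin: Seq[(Int, Int, Int, Int)] =
    crossJoin.filter { case (a1, _, b1, _) => a1 == b1 }

  def main(args: Array[String]): Unit = {
    println(s"cross join rows: $crossJoin")
    println(s"equi join rows:  $equiJoin")
  }
}
```

The two results differ by the row `(1, 2, 2, -2)`, so adding `a1 = b1` changes the answer whenever the shared value set has more than one element.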
[GitHub] spark issue #18692: [SPARK-21417][SQL] Infer join conditions using propagate...
Github user aokolnychyi commented on the issue: https://github.com/apache/spark/pull/18692 Yeah, correct. So, we should revert then.
[GitHub] spark issue #18692: [SPARK-21417][SQL] Infer join conditions using propagate...
Github user aokolnychyi commented on the issue: https://github.com/apache/spark/pull/18692 I took a look at ``JoinSelection``. It seems we will not get ``BroadcastHashJoin`` or ``ShuffledHashJoin`` if we revert this rule.
[GitHub] spark issue #18692: [SPARK-21417][SQL] Infer join conditions using propagate...
Github user aokolnychyi commented on the issue: https://github.com/apache/spark/pull/18692 Sure, if you guys think it does not give any performance benefits, then let's revert it. I also had similar concerns, but my understanding was that having an inner join with some equality condition can be beneficial during the generation of a physical plan. In other words, Spark should be able to select a more efficient join implementation. I am not sure how it is right now, but previously you could get only ``BroadcastNestedLoopJoin`` or ``CartesianProduct`` when there was no equality condition. Again, that was my assumption based on what I remember.
[GitHub] spark pull request #19193: [WIP][SPARK-21896][SQL] Fix Stack Overflow when w...
Github user aokolnychyi commented on a diff in the pull request: https://github.com/apache/spark/pull/19193#discussion_r156495899 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -1920,7 +1927,34 @@ class Analyzer( case p: LogicalPlan if !p.childrenResolved => p - // Aggregate without Having clause. + // Extract window expressions from aggregate functions. There might be an aggregate whose + // aggregate function contains a window expression as a child, which we need to extract. + // e.g., df.groupBy().agg(max(rank().over(window)) + case a @ Aggregate(groupingExprs, aggregateExprs, child) +if containsAggregateFunctionWithWindowExpression(aggregateExprs) && + a.expressions.forall(_.resolved) => + +val windowExprAliases = new ArrayBuffer[NamedExpression]() +val newAggregateExprs = aggregateExprs.map { expr => + expr.transform { --- End diff -- Hmm, do you actually mean smth like this? ``` val df = Seq((1, 2), (1, 3), (2, 4), (5, 5)).toDF("a", "b") val window1 = Window.orderBy('a) df.groupBy('a).agg(max(sum(sum('b)).over(window1))).explain(true) df.groupBy('a).agg(max(sum(sum('b)).over(window1))).show(false) ``` ``` == Analyzed Logical Plan == a: int, max(sum(sum(b)) OVER (ORDER BY a ASC NULLS FIRST unspecifiedframe$())): bigint Aggregate [a#5], [a#5, max(_we0#22L) AS max(sum(sum(b)) OVER (ORDER BY a ASC NULLS FIRST unspecifiedframe$()))#16L] +- Project [a#5, b#6, _we0#22L, _we0#22L] +- Window [sum(sum(cast(b#6 as bigint))) windowspecdefinition(a#5 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS _we0#22L], [a#5 ASC NULLS FIRST] +- Project [_1#2 AS a#5, _2#3 AS b#6] +- LocalRelation [_1#2, _2#3] == Optimized Logical Plan == Aggregate [a#5], [a#5, max(_we0#22L) AS max(sum(sum(b)) OVER (ORDER BY a ASC NULLS FIRST unspecifiedframe$()))#16L] +- Project [a#5, _we0#22L, _we0#22L] +- Window [sum(sum(cast(b#6 as bigint))) windowspecdefinition(a#5 ASC NULLS FIRST, 
specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS _we0#22L], [a#5 ASC NULLS FIRST] +- LocalRelation [a#5, b#6] ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19193: [WIP][SPARK-21896][SQL] Fix Stack Overflow when w...
Github user aokolnychyi commented on a diff in the pull request: https://github.com/apache/spark/pull/19193#discussion_r156493072 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -1920,7 +1927,34 @@ class Analyzer( case p: LogicalPlan if !p.childrenResolved => p - // Aggregate without Having clause. + // Extract window expressions from aggregate functions. There might be an aggregate whose + // aggregate function contains a window expression as a child, which we need to extract. + // e.g., df.groupBy().agg(max(rank().over(window)) + case a @ Aggregate(groupingExprs, aggregateExprs, child) +if containsAggregateFunctionWithWindowExpression(aggregateExprs) && + a.expressions.forall(_.resolved) => + +val windowExprAliases = new ArrayBuffer[NamedExpression]() +val newAggregateExprs = aggregateExprs.map { expr => + expr.transform { --- End diff -- Thanks for looking into this. I am not sure I fully understood "it will push the regular aggregate into the underlying window". Could you, please, elaborate? 
This is what I tried: ``` val df = Seq((1, 2), (1, 3), (2, 4), (5, 5)).toDF("a", "b") val window1 = Window.orderBy('a) val window2 = Window.orderBy('a.desc) df.groupBy('a).agg(max(rank().over(window1)), sum('b), sum(sum('b)).over(window2)).explain(true) df.groupBy('a).agg(max(rank().over(window1)), sum('b), sum(sum('b)).over(window2)).show(false) ``` It produced the following plans: ``` == Analyzed Logical Plan == a: int, max(RANK() OVER (ORDER BY a ASC NULLS FIRST unspecifiedframe$())): int, sum(b): bigint, sum(sum(b)) OVER (ORDER BY a DESC NULLS LAST unspecifiedframe$()): bigint Project [a#5, max(RANK() OVER (ORDER BY a ASC NULLS FIRST unspecifiedframe$()))#19, sum(b)#20L, sum(sum(b)) OVER (ORDER BY a DESC NULLS LAST unspecifiedframe$())#21L] +- Project [a#5, max(RANK() OVER (ORDER BY a ASC NULLS FIRST unspecifiedframe$()))#19, sum(b)#20L, _w0#40L, sum(sum(b)) OVER (ORDER BY a DESC NULLS LAST unspecifiedframe$())#21L, sum(sum(b)) OVER (ORDER BY a DESC NULLS LAST unspecifiedframe$())#21L] +- Window [sum(_w0#40L) windowspecdefinition(a#5 DESC NULLS LAST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS sum(sum(b)) OVER (ORDER BY a DESC NULLS LAST unspecifiedframe$())#21L], [a#5 DESC NULLS LAST] +- Aggregate [a#5], [a#5, max(_we0#36) AS max(RANK() OVER (ORDER BY a ASC NULLS FIRST unspecifiedframe$()))#19, sum(cast(b#6 as bigint)) AS sum(b)#20L, sum(cast(b#6 as bigint)) AS _w0#40L] +- Project [a#5, b#6, _we0#36, _we0#36] +- Window [rank(a#5) windowspecdefinition(a#5 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _we0#36], [a#5 ASC NULLS FIRST] +- Project [_1#2 AS a#5, _2#3 AS b#6] +- LocalRelation [_1#2, _2#3] == Optimized Logical Plan == Project [a#5, max(RANK() OVER (ORDER BY a ASC NULLS FIRST unspecifiedframe$()))#19, sum(b)#20L, sum(sum(b)) OVER (ORDER BY a DESC NULLS LAST unspecifiedframe$())#21L] +- Window [sum(_w0#40L) windowspecdefinition(a#5 DESC NULLS LAST, specifiedwindowframe(RangeFrame, 
unboundedpreceding$(), currentrow$())) AS sum(sum(b)) OVER (ORDER BY a DESC NULLS LAST unspecifiedframe$())#21L], [a#5 DESC NULLS LAST] +- Aggregate [a#5], [a#5, max(_we0#36) AS max(RANK() OVER (ORDER BY a ASC NULLS FIRST unspecifiedframe$()))#19, sum(cast(b#6 as bigint)) AS sum(b)#20L, sum(cast(b#6 as bigint)) AS _w0#40L] +- Project [a#5, b#6, _we0#36, _we0#36] +- Window [rank(a#5) windowspecdefinition(a#5 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _we0#36], [a#5 ASC NULLS FIRST] +- LocalRelation [a#5, b#6] ``` The result was: ``` +---+-+--+-+ |a |max(RANK() OVER (ORDER BY a ASC NULLS FIRST unspecifiedframe$()))|sum(b)|sum(sum(b)) OVER (ORDER BY a DESC NULLS LAST unspecifiedframe$())| +---+-+--+-+ |5 |4|5 |5| |2 |3|4 |9| |1 |1
[GitHub] spark issue #19193: [WIP][SPARK-21896][SQL] Fix Stack Overflow when window f...
Github user aokolnychyi commented on the issue: https://github.com/apache/spark/pull/19193

@gatorsmile @cloud-fan could you provide any input?
[GitHub] spark pull request #18692: [SPARK-21417][SQL] Infer join conditions using pr...
Github user aokolnychyi commented on a diff in the pull request: https://github.com/apache/spark/pull/18692#discussion_r154164912

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSet.scala ---
@@ -27,6 +27,8 @@ object ExpressionSet {
     expressions.foreach(set.add)
     set
   }
+
+  val empty: ExpressionSet = ExpressionSet(Nil)
--- End diff --

I thought that writing ``ExpressionSet.empty`` would be more readable than ``ExpressionSet(Nil)``. Usually, mutable collections have ``def empty()`` and immutable ones have separate objects that represent empty collections (e.g., ``Nil``, ``Stream.Empty``). I defined ``val empty`` since ``ExpressionSet`` is immutable.
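The convention mentioned above can be sketched roughly as follows. This is only an illustration with a hypothetical immutable ``IntBag`` class (it is not Spark's ``ExpressionSet``): since no operation can mutate an instance, a single shared ``val empty`` can be reused safely.

```scala
// Hypothetical immutable collection, used only for illustration (not Spark's ExpressionSet).
final class IntBag private (private val items: Vector[Int]) {
  // Returns a new bag; the receiver is never modified.
  def add(x: Int): IntBag = new IntBag(items :+ x)
  def size: Int = items.size
}

object IntBag {
  def apply(xs: Seq[Int]): IntBag = new IntBag(xs.toVector)

  // One shared instance is safe because no operation can mutate it.
  val empty: IntBag = IntBag(Nil)
}

object EmptyDemo extends App {
  val a = IntBag.empty.add(1)
  val b = IntBag.empty.add(2).add(3)
  println(a.size)            // 1
  println(b.size)            // 2
  println(IntBag.empty.size) // 0, the shared empty value is untouched
}
```

A mutable collection would need ``def empty`` instead, so that every caller gets a fresh instance.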
[GitHub] spark pull request #18692: [SPARK-21417][SQL] Infer join conditions using pr...
Github user aokolnychyi commented on a diff in the pull request: https://github.com/apache/spark/pull/18692#discussion_r153420088 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala --- @@ -152,3 +152,99 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper { if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType)) } } + +/** + * A rule that eliminates CROSS joins by inferring join conditions from propagated constraints. + * + * The optimization is applicable only to CROSS joins. For other join types, adding inferred join + * conditions would potentially shuffle children as child node's partitioning won't satisfy the JOIN + * node's requirements which otherwise could have. + * + * For instance, if there is a CROSS join, where the left relation has 'a = 1' and the right + * relation has 'b = 1', the rule infers 'a = b' as a join predicate. + */ +object EliminateCrossJoin extends Rule[LogicalPlan] with PredicateHelper { + + def apply(plan: LogicalPlan): LogicalPlan = { +if (SQLConf.get.constraintPropagationEnabled) { + eliminateCrossJoin(plan) +} else { + plan +} + } + + private def eliminateCrossJoin(plan: LogicalPlan): LogicalPlan = plan transform { +case join@Join(leftPlan, rightPlan, Cross, None) => + val leftConstraints = join.constraints.filter(_.references.subsetOf(leftPlan.outputSet)) + val rightConstraints = join.constraints.filter(_.references.subsetOf(rightPlan.outputSet)) + val inferredJoinPredicates = inferJoinPredicates(leftConstraints, rightConstraints) + val joinConditionOpt = inferredJoinPredicates.reduceOption(And) + if (joinConditionOpt.isDefined) Join(leftPlan, rightPlan, Inner, joinConditionOpt) else join + } + + private def inferJoinPredicates( + leftConstraints: Set[Expression], + rightConstraints: Set[Expression]): Set[EqualTo] = { + +// iterate through the left constraints and build a hash map that points semantically +// equivalent expressions into 
attributes +val emptyEquivalenceMap = Map.empty[SemanticExpression, Set[Attribute]] +val equivalenceMap = leftConstraints.foldLeft(emptyEquivalenceMap) { case (map, constraint) => + constraint match { +case EqualTo(attr: Attribute, expr: Expression) => + updateEquivalenceMap(map, attr, expr) +case EqualTo(expr: Expression, attr: Attribute) => + updateEquivalenceMap(map, attr, expr) +case _ => map + } +} + +// iterate through the right constraints and infer join conditions using the equivalence map +rightConstraints.foldLeft(Set.empty[EqualTo]) { case (joinConditions, constraint) => + constraint match { +case EqualTo(attr: Attribute, expr: Expression) => + appendJoinConditions(attr, expr, equivalenceMap, joinConditions) +case EqualTo(expr: Expression, attr: Attribute) => + appendJoinConditions(attr, expr, equivalenceMap, joinConditions) +case _ => joinConditions + } +} + } + + private def updateEquivalenceMap( + equivalenceMap: Map[SemanticExpression, Set[Attribute]], + attr: Attribute, + expr: Expression): Map[SemanticExpression, Set[Attribute]] = { + +val equivalentAttrs = equivalenceMap.getOrElse(expr, Set.empty[Attribute]) +if (equivalentAttrs.contains(attr)) { + equivalenceMap +} else { + equivalenceMap.updated(expr, equivalentAttrs + attr) +} + } + + private def appendJoinConditions( + attr: Attribute, + expr: Expression, + equivalenceMap: Map[SemanticExpression, Set[Attribute]], + joinConditions: Set[EqualTo]): Set[EqualTo] = { + +equivalenceMap.get(expr) match { + case Some(equivalentAttrs) => joinConditions ++ equivalentAttrs.map(EqualTo(attr, _)) + case None => joinConditions +} + } + + // the purpose of this class is to treat 'a === 1 and 1 === 'a as the same expressions + implicit class SemanticExpression(private val expr: Expression) { --- End diff -- I am afraid ``ExpressionSet`` will not help here since we need to map a semantically equivalent expression into a set of attributes that correspond to it. 
It is not enough to check if there is an equivalent expression. Therefore, ``EquivalentExpressions`` and ``ExpressionSet`` are not applicable (as far as I see). ``EquivalentExpressionMap`` from the previous comment assumes the following workflow: ``` val equivalentExression
[GitHub] spark pull request #18692: [SPARK-21417][SQL] Infer join conditions using pr...
Github user aokolnychyi commented on a diff in the pull request: https://github.com/apache/spark/pull/18692#discussion_r153329031 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala --- @@ -152,3 +152,99 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper { if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType)) } } + +/** + * A rule that eliminates CROSS joins by inferring join conditions from propagated constraints. + * + * The optimization is applicable only to CROSS joins. For other join types, adding inferred join + * conditions would potentially shuffle children as child node's partitioning won't satisfy the JOIN + * node's requirements which otherwise could have. + * + * For instance, if there is a CROSS join, where the left relation has 'a = 1' and the right + * relation has 'b = 1', the rule infers 'a = b' as a join predicate. + */ +object EliminateCrossJoin extends Rule[LogicalPlan] with PredicateHelper { + + def apply(plan: LogicalPlan): LogicalPlan = { +if (SQLConf.get.constraintPropagationEnabled) { + eliminateCrossJoin(plan) +} else { + plan +} + } + + private def eliminateCrossJoin(plan: LogicalPlan): LogicalPlan = plan transform { +case join@Join(leftPlan, rightPlan, Cross, None) => + val leftConstraints = join.constraints.filter(_.references.subsetOf(leftPlan.outputSet)) + val rightConstraints = join.constraints.filter(_.references.subsetOf(rightPlan.outputSet)) + val inferredJoinPredicates = inferJoinPredicates(leftConstraints, rightConstraints) + val joinConditionOpt = inferredJoinPredicates.reduceOption(And) + if (joinConditionOpt.isDefined) Join(leftPlan, rightPlan, Inner, joinConditionOpt) else join + } + + private def inferJoinPredicates( + leftConstraints: Set[Expression], + rightConstraints: Set[Expression]): Set[EqualTo] = { + +// iterate through the left constraints and build a hash map that points semantically +// equivalent expressions into 
attributes +val emptyEquivalenceMap = Map.empty[SemanticExpression, Set[Attribute]] +val equivalenceMap = leftConstraints.foldLeft(emptyEquivalenceMap) { case (map, constraint) => + constraint match { +case EqualTo(attr: Attribute, expr: Expression) => + updateEquivalenceMap(map, attr, expr) +case EqualTo(expr: Expression, attr: Attribute) => + updateEquivalenceMap(map, attr, expr) +case _ => map + } +} + +// iterate through the right constraints and infer join conditions using the equivalence map +rightConstraints.foldLeft(Set.empty[EqualTo]) { case (joinConditions, constraint) => + constraint match { +case EqualTo(attr: Attribute, expr: Expression) => + appendJoinConditions(attr, expr, equivalenceMap, joinConditions) +case EqualTo(expr: Expression, attr: Attribute) => + appendJoinConditions(attr, expr, equivalenceMap, joinConditions) +case _ => joinConditions + } +} + } + + private def updateEquivalenceMap( + equivalenceMap: Map[SemanticExpression, Set[Attribute]], + attr: Attribute, + expr: Expression): Map[SemanticExpression, Set[Attribute]] = { + +val equivalentAttrs = equivalenceMap.getOrElse(expr, Set.empty[Attribute]) +if (equivalentAttrs.contains(attr)) { + equivalenceMap +} else { + equivalenceMap.updated(expr, equivalentAttrs + attr) +} + } + + private def appendJoinConditions( + attr: Attribute, + expr: Expression, + equivalenceMap: Map[SemanticExpression, Set[Attribute]], + joinConditions: Set[EqualTo]): Set[EqualTo] = { + +equivalenceMap.get(expr) match { + case Some(equivalentAttrs) => joinConditions ++ equivalentAttrs.map(EqualTo(attr, _)) + case None => joinConditions +} + } + + // the purpose of this class is to treat 'a === 1 and 1 === 'a as the same expressions + implicit class SemanticExpression(private val expr: Expression) { --- End diff -- Using a Set instead of a List might be beneficial in the proposed rule. What about the following? 
``` class EquivalentExpressionMap { private val equivalenceMap = mutable.HashMap.empty[SemanticallyEqualExpr, mutable.Set[Expression]] def put(expression: Expression, equivalentExpression: Expression): Unit = { val equivalentExpressions = equivalenceMap.getOrElse(expression, mutable.Set.empty) if (!equivalentExpressi
[GitHub] spark pull request #18692: [SPARK-21417][SQL] Infer join conditions using pr...
Github user aokolnychyi commented on a diff in the pull request: https://github.com/apache/spark/pull/18692#discussion_r153066992 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala --- @@ -152,3 +152,99 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper { if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType)) } } + +/** + * A rule that eliminates CROSS joins by inferring join conditions from propagated constraints. + * + * The optimization is applicable only to CROSS joins. For other join types, adding inferred join + * conditions would potentially shuffle children as child node's partitioning won't satisfy the JOIN + * node's requirements which otherwise could have. + * + * For instance, if there is a CROSS join, where the left relation has 'a = 1' and the right + * relation has 'b = 1', the rule infers 'a = b' as a join predicate. + */ +object EliminateCrossJoin extends Rule[LogicalPlan] with PredicateHelper { + + def apply(plan: LogicalPlan): LogicalPlan = { +if (SQLConf.get.constraintPropagationEnabled) { + eliminateCrossJoin(plan) +} else { + plan +} + } + + private def eliminateCrossJoin(plan: LogicalPlan): LogicalPlan = plan transform { +case join@Join(leftPlan, rightPlan, Cross, None) => + val leftConstraints = join.constraints.filter(_.references.subsetOf(leftPlan.outputSet)) + val rightConstraints = join.constraints.filter(_.references.subsetOf(rightPlan.outputSet)) + val inferredJoinPredicates = inferJoinPredicates(leftConstraints, rightConstraints) + val joinConditionOpt = inferredJoinPredicates.reduceOption(And) + if (joinConditionOpt.isDefined) Join(leftPlan, rightPlan, Inner, joinConditionOpt) else join + } + + private def inferJoinPredicates( + leftConstraints: Set[Expression], + rightConstraints: Set[Expression]): Set[EqualTo] = { + +// iterate through the left constraints and build a hash map that points semantically +// equivalent expressions into 
attributes +val emptyEquivalenceMap = Map.empty[SemanticExpression, Set[Attribute]] +val equivalenceMap = leftConstraints.foldLeft(emptyEquivalenceMap) { case (map, constraint) => + constraint match { +case EqualTo(attr: Attribute, expr: Expression) => + updateEquivalenceMap(map, attr, expr) +case EqualTo(expr: Expression, attr: Attribute) => + updateEquivalenceMap(map, attr, expr) +case _ => map + } +} + +// iterate through the right constraints and infer join conditions using the equivalence map +rightConstraints.foldLeft(Set.empty[EqualTo]) { case (joinConditions, constraint) => + constraint match { +case EqualTo(attr: Attribute, expr: Expression) => + appendJoinConditions(attr, expr, equivalenceMap, joinConditions) +case EqualTo(expr: Expression, attr: Attribute) => + appendJoinConditions(attr, expr, equivalenceMap, joinConditions) +case _ => joinConditions + } +} + } + + private def updateEquivalenceMap( + equivalenceMap: Map[SemanticExpression, Set[Attribute]], + attr: Attribute, + expr: Expression): Map[SemanticExpression, Set[Attribute]] = { + +val equivalentAttrs = equivalenceMap.getOrElse(expr, Set.empty[Attribute]) +if (equivalentAttrs.contains(attr)) { + equivalenceMap +} else { + equivalenceMap.updated(expr, equivalentAttrs + attr) +} + } + + private def appendJoinConditions( + attr: Attribute, + expr: Expression, + equivalenceMap: Map[SemanticExpression, Set[Attribute]], + joinConditions: Set[EqualTo]): Set[EqualTo] = { + +equivalenceMap.get(expr) match { + case Some(equivalentAttrs) => joinConditions ++ equivalentAttrs.map(EqualTo(attr, _)) + case None => joinConditions +} + } + + // the purpose of this class is to treat 'a === 1 and 1 === 'a as the same expressions + implicit class SemanticExpression(private val expr: Expression) { --- End diff -- @gatorsmile I think we just need the case class inside ``EquivalentExpressions`` since we have to map all semantically equivalent expressions into a set of attributes (as opposed to mapping an expression 
into a set of equivalent expressions). I see two ways to go: 1. Expose the case class inside ``EquivalentExpressions`` with minimum changes in the code base (e.g., using a companion object):
[GitHub] spark pull request #18692: [SPARK-21417][SQL] Infer join conditions using pr...
Github user aokolnychyi commented on a diff in the pull request: https://github.com/apache/spark/pull/18692#discussion_r152660385 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala --- @@ -152,3 +152,71 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper { if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType)) } } + +/** + * A rule that uses propagated constraints to infer join conditions. The optimization is applicable + * only to CROSS joins. + * + * For instance, if there is a CROSS join, where the left relation has 'a = 1' and the right + * relation has 'b = 1', then the rule infers 'a = b' as a join predicate. + */ +object InferJoinConditionsFromConstraints extends Rule[LogicalPlan] with PredicateHelper { + + def apply(plan: LogicalPlan): LogicalPlan = { +if (SQLConf.get.constraintPropagationEnabled) { + inferJoinConditions(plan) +} else { + plan +} + } + + private def inferJoinConditions(plan: LogicalPlan): LogicalPlan = plan transform { +case join @ Join(left, right, Cross, conditionOpt) => + val leftConstraints = join.constraints.filter(_.references.subsetOf(left.outputSet)) + val rightConstraints = join.constraints.filter(_.references.subsetOf(right.outputSet)) + val inferredJoinPredicates = inferJoinPredicates(leftConstraints, rightConstraints) + + val newConditionOpt = conditionOpt match { +case Some(condition) => + val existingPredicates = splitConjunctivePredicates(condition) + val newPredicates = findNewPredicates(inferredJoinPredicates, existingPredicates) + if (newPredicates.nonEmpty) Some(And(newPredicates.reduce(And), condition)) else None +case None => + inferredJoinPredicates.reduceOption(And) + } + if (newConditionOpt.isDefined) Join(left, right, Inner, newConditionOpt) else join --- End diff -- @gatorsmile Thanks for getting back. 
``CheckCartesianProducts`` identifies a join of type ``Inner | LeftOuter | RightOuter | FullOuter`` as a cartesian product if there is no join predicate that has references to both relations.

If we agree to ignore joins of type Cross that have a condition (in this PR), then the use case in this [discussion](https://github.com/apache/spark/pull/18692#discussion_r144466472) is no longer possible (even if you remove ``t1.col1 >= t2.col1``). Correct? ``PushPredicateThroughJoin`` will push ``t1.col1 = t1.col2 + t2.col2 and t2.col1 = t1.col2 + t2.col2`` into the join condition, the proposed rule will not infer anything, and the final join will be of type Cross with a condition that covers both relations. According to the logic of ``CheckCartesianProducts``, it is not considered to be a cartesian product (since there exists a join predicate that covers both relations, e.g. ``t1.col1 = t1.col2 + t2.col2``).

So, if I get confirmation that we need to consider only joins of type Cross without any join conditions, I can update the PR accordingly.
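The check described in this comment can be modelled in a few lines of plain Scala. This is a simplified sketch under stated assumptions, not Spark's implementation: ``Predicate``, ``CartesianCheck`` and the attribute names are hypothetical, and a predicate is represented only by the set of attributes it references.

```scala
// Toy model: a predicate is described only by the attributes it references.
final case class Predicate(description: String, references: Set[String])

object CartesianCheck {
  // True when no predicate references attributes from BOTH sides of the join.
  def isCartesianProduct(
      leftOutput: Set[String],
      rightOutput: Set[String],
      predicates: Seq[Predicate]): Boolean = {
    !predicates.exists { p =>
      p.references.exists(leftOutput.contains) && p.references.exists(rightOutput.contains)
    }
  }
}

object CartesianCheckDemo extends App {
  val t1 = Set("t1.col1", "t1.col2")
  val t2 = Set("t2.col1", "t2.col2")

  // Each predicate touches one side only, so the join is still a cartesian product.
  val oneSided = Seq(
    Predicate("t1.col1 = 1", Set("t1.col1")),
    Predicate("t2.col1 = 1", Set("t2.col1")))

  // This predicate references both relations, so the check passes.
  val twoSided = Seq(
    Predicate("t1.col1 = t1.col2 + t2.col2", Set("t1.col1", "t1.col2", "t2.col2")))

  println(CartesianCheck.isCartesianProduct(t1, t2, oneSided)) // true
  println(CartesianCheck.isCartesianProduct(t1, t2, twoSided)) // false
}
```

Note that the model only looks at references, which is why a predicate such as ``t1.col1 = t1.col2 + t2.col2`` is enough to pass the check regardless of the join type.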
[GitHub] spark issue #18692: [SPARK-21417][SQL] Infer join conditions using propagate...
Github user aokolnychyi commented on the issue: https://github.com/apache/spark/pull/18692

@SimonBin The initial solution handled your case but then there was a decision to restrict the proposed rule to cross joins only. You can find the reason in this [comment](https://github.com/apache/spark/pull/18692#issuecomment-326694822) or in the code.
[GitHub] spark pull request #18692: [SPARK-21417][SQL] Infer join conditions using pr...
Github user aokolnychyi commented on a diff in the pull request: https://github.com/apache/spark/pull/18692#discussion_r145498671

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---
@@ -152,3 +152,79 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
       if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
   }
 }
+
+/**
+ * A rule that uses propagated constraints to infer join conditions. The optimization is applicable
+ * only to CROSS joins. For other join types, adding inferred join conditions would potentially
+ * shuffle children as child node's partitioning won't satisfy the JOIN node's requirements
+ * which otherwise could have.
+ *
+ * For instance, if there is a CROSS join, where the left relation has 'a = 1' and the right
+ * relation has 'b = 1', then the rule infers 'a = b' as a join predicate.
+ */
+object InferJoinConditionsFromConstraints extends Rule[LogicalPlan] with PredicateHelper {
+
+  def apply(plan: LogicalPlan): LogicalPlan = {
+    if (SQLConf.get.constraintPropagationEnabled) {
+      inferJoinConditions(plan)
+    } else {
+      plan
+    }
+  }
+
+  private def inferJoinConditions(plan: LogicalPlan): LogicalPlan = plan transform {
+    case join @ Join(left, right, Cross, conditionOpt) =>
+
+      val rightEqualToPredicates = join.constraints.collect {
--- End diff --

I thought about improving the time complexity here via a hash map keyed on semantic equals/hashCode. However, this idea would require a wrapper class, so I kept it as it is.
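To illustrate what such a wrapper might look like, here is a minimal sketch on a toy expression tree. Everything in it (``Expr``, ``Attr``, ``Lit``, ``Add``, ``SemanticKey``) is hypothetical and stands in for Catalyst expressions; the canonical form is an assumption made for the example (operands of the commutative ``Add`` are put in a fixed order).

```scala
// Toy expression tree, standing in for Catalyst expressions.
sealed trait Expr
final case class Attr(name: String) extends Expr
final case class Lit(value: Int) extends Expr
final case class Add(left: Expr, right: Expr) extends Expr

object Expr {
  // Orders the operands of the commutative Add so that a + 1 and 1 + a coincide.
  def canonical(e: Expr): Expr = e match {
    case Add(l, r) =>
      val cl = canonical(l)
      val cr = canonical(r)
      if (cl.toString.compareTo(cr.toString) <= 0) Add(cl, cr) else Add(cr, cl)
    case other => other
  }
}

// Wrapper whose equals/hashCode use the canonical form, so it can serve as a hash-map key.
final class SemanticKey(val expr: Expr) {
  private val canon = Expr.canonical(expr)
  override def hashCode: Int = canon.hashCode
  override def equals(other: Any): Boolean = other match {
    case that: SemanticKey => canon == that.canon
    case _ => false
  }
}

object SemanticKeyDemo extends App {
  val index = Map(new SemanticKey(Add(Attr("a"), Lit(1))) -> Set("x"))
  println(index.contains(new SemanticKey(Add(Lit(1), Attr("a"))))) // true
  println(index.contains(new SemanticKey(Add(Attr("b"), Lit(1))))) // false
}
```

With a key like this, each lookup is a hash probe instead of a scan over all predicates with a pairwise semantic comparison.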
[GitHub] spark pull request #18692: [SPARK-21417][SQL] Infer join conditions using pr...
Github user aokolnychyi commented on a diff in the pull request: https://github.com/apache/spark/pull/18692#discussion_r144722742

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---
@@ -152,3 +152,71 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
       if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
   }
 }
+
+/**
+ * A rule that uses propagated constraints to infer join conditions. The optimization is applicable
+ * only to CROSS joins.
+ *
+ * For instance, if there is a CROSS join, where the left relation has 'a = 1' and the right
+ * relation has 'b = 1', then the rule infers 'a = b' as a join predicate.
+ */
+object InferJoinConditionsFromConstraints extends Rule[LogicalPlan] with PredicateHelper {
+
+  def apply(plan: LogicalPlan): LogicalPlan = {
+    if (SQLConf.get.constraintPropagationEnabled) {
+      inferJoinConditions(plan)
+    } else {
+      plan
+    }
+  }
+
+  private def inferJoinConditions(plan: LogicalPlan): LogicalPlan = plan transform {
+    case join @ Join(left, right, Cross, conditionOpt) =>
+      val leftConstraints = join.constraints.filter(_.references.subsetOf(left.outputSet))
+      val rightConstraints = join.constraints.filter(_.references.subsetOf(right.outputSet))
--- End diff --

@gengliangwang Yeah, makes sense. So, ``PushPredicateThroughJoin`` would push the where clause into the join and the proposed rule will infer ``t1.col1 = t2.col1`` and change the join type to INNER. As a result, the final join condition will be ``t1.col1 = t2.col1 and t1.col1 >= t2.col1 and (t1.col1 = t1.col2 + t2.col2 and t2.col1 = t1.col2 + t2.col2)``. Am I right?
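The inference step itself (left side has 'a = 1', right side has 'b = 1', hence 'a = b') can be sketched in plain Scala as below. This is a simplified model, not the code in the PR: ``InferJoinPredicates`` and its ``Constraint`` alias are hypothetical, and constraints are restricted to the form "attribute = integer constant".

```scala
object InferJoinPredicates {
  // A constraint "attribute = constant", e.g. ("t1.col1", 1).
  type Constraint = (String, Int)

  // Returns the inferred equalities as (leftAttribute, rightAttribute) pairs.
  def infer(left: Seq[Constraint], right: Seq[Constraint]): Set[(String, String)] = {
    // Group the left attributes by the constant they are equal to.
    val byConstant: Map[Int, Set[String]] =
      left.groupBy(_._2).map { case (const, cs) => const -> cs.map(_._1).toSet }

    // Every right attribute bound to the same constant yields an equality.
    right.flatMap { case (rightAttr, const) =>
      byConstant.getOrElse(const, Set.empty[String]).map(leftAttr => (leftAttr, rightAttr))
    }.toSet
  }
}

object InferJoinPredicatesDemo extends App {
  // t1.col1 = 1 and t2.col1 = 1 imply t1.col1 = t2.col1.
  println(InferJoinPredicates.infer(Seq(("t1.col1", 1)), Seq(("t2.col1", 1))))
  // Different constants: nothing can be inferred.
  println(InferJoinPredicates.infer(Seq(("t1.col1", 1)), Seq(("t2.col1", 2))))
}
```

When the result is non-empty, the rule discussed here would AND the inferred equalities into the join condition and switch the join type from Cross to Inner.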
[GitHub] spark issue #19252: [SPARK-21969][SQL] CommandUtils.updateTableStats should ...
Github user aokolnychyi commented on the issue: https://github.com/apache/spark/pull/19252

@gatorsmile thanks for the feedback. I also covered ``TruncateTableCommand`` with additional tests. However, I see somewhat strange behavior while writing a test for ``AlterTableAddPartitionCommand``.

```
sql(s"CREATE TABLE t1 (col1 int, col2 int) USING PARQUET")
sql(s"INSERT INTO TABLE t1 SELECT 1, 2")
sql(s"INSERT INTO TABLE t1 SELECT 2, 4")
sql("SELECT * FROM t1").show()

+----+----+
|col1|col2|
+----+----+
|   1|   2|
|   2|   4|
+----+----+

sql(s"CREATE TABLE t2 (col1 int, col2 int) USING PARQUET PARTITIONED BY (col1)")
sql(s"INSERT INTO TABLE t2 SELECT 1, 2")
sql(s"INSERT INTO TABLE t2 SELECT 2, 4")
sql("SELECT * FROM t2").show()

+----+----+
|col2|col1|
+----+----+
|   2|   4|
|   1|   2|
+----+----+
```

Why are the results different? Is it a bug?
[GitHub] spark pull request #19252: [SPARK-21969][SQL] CommandUtils.updateTableStats ...
GitHub user aokolnychyi opened a pull request: https://github.com/apache/spark/pull/19252

[SPARK-21969][SQL] CommandUtils.updateTableStats should call refreshTable

## What changes were proposed in this pull request?

Tables in the catalog cache are not invalidated once their statistics are updated. As a consequence, existing sessions will use the cached information even though it is not valid anymore. Consider the example below.

```
// step 1
spark.range(100).write.saveAsTable("tab1")
// step 2
spark.sql("analyze table tab1 compute statistics")
// step 3
spark.sql("explain cost select distinct * from tab1").show(false)
// step 4
spark.range(100).write.mode("append").saveAsTable("tab1")
// step 5
spark.sql("explain cost select distinct * from tab1").show(false)
```

After step 3, the table will be present in the catalog relation cache. Step 4 will correctly update the metadata inside the catalog but will NOT invalidate the cache.

By the way, ``spark.sql("analyze table tab1 compute statistics")`` between step 3 and step 4 would also solve the problem.

## How was this patch tested?

Current and additional unit tests.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/aokolnychyi/spark spark-21969

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/19252.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #19252

----
commit ba963b46cd2917315bc2bd0cf237c7d9f79e9d65
Author: aokolnychyi <anton.okolnyc...@sap.com>
Date: 2017-09-16T11:57:52Z

    [SPARK-21969][SQL] CommandUtils.updateTableStats should call refreshTable
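The stale-cache problem described in this pull request can be reproduced with a toy model. The sketch below is only an illustration under stated assumptions: ``ToyCatalog``, its fields and the ``refresh`` flag are hypothetical and do not correspond to Spark's catalog API; the flag merely plays the role of calling ``refreshTable`` after a statistics update.

```scala
import scala.collection.mutable

// Hypothetical toy catalog; names are illustrative and not Spark's API.
final class ToyCatalog {
  private val metastore = mutable.Map.empty[String, Long]     // table -> row count
  private val relationCache = mutable.Map.empty[String, Long] // cached lookups

  // Updates the stored statistics and optionally invalidates the cached entry.
  def updateStats(table: String, rowCount: Long, refresh: Boolean): Unit = {
    metastore(table) = rowCount
    if (refresh) relationCache.remove(table) // analogous to calling refreshTable
  }

  // Serves from the cache when possible, otherwise loads from the metastore and caches the result.
  def lookupRowCount(table: String): Long =
    relationCache.getOrElseUpdate(table, metastore(table))
}

object StaleCacheDemo extends App {
  val catalog = new ToyCatalog
  catalog.updateStats("tab1", 100L, refresh = true)
  println(catalog.lookupRowCount("tab1")) // 100, now cached
  catalog.updateStats("tab1", 200L, refresh = false)
  println(catalog.lookupRowCount("tab1")) // still 100: stale cache entry
  catalog.updateStats("tab1", 200L, refresh = true)
  println(catalog.lookupRowCount("tab1")) // 200 after invalidation
}
```

The second lookup corresponds to step 5 in the example above: the metadata is already correct, but the session keeps reading the cached entry until it is invalidated.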
[GitHub] spark pull request #19193: [WIP][SPARK-21896][SQL] Fix Stack Overflow when w...
GitHub user aokolnychyi opened a pull request: https://github.com/apache/spark/pull/19193

[WIP][SPARK-21896][SQL] Fix Stack Overflow when window function is nested inside an aggregate function

## What changes were proposed in this pull request?

This WIP PR contains a prototype that fixes a StackOverflowError in ``Analyzer``. In short, Spark cannot handle window expressions inside aggregate functions. The root cause of the bug is the inability of ``ExtractWindowExpressions`` to extract window expressions from aggregate functions.

```
val df = Seq((1, 2), (1, 3), (2, 4)).toDF("a", "b")
val window = Window.orderBy("a")
df.groupBy().agg(max(rank().over(window))) // does not work
df.select(rank().over(window).alias("rank")).agg(max("rank")) // works
```

It would be nice to get some initial feedback since there are alternative ways of solving this problem.

## How was this patch tested?

This PR represents only an idea and was tested manually in several scenarios.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/aokolnychyi/spark spark-21896

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/19193.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #19193

commit c14aa2ff6161de7d45869d91e53b0b25b18ad2dd
Author: aokolnychyi <anton.okolnyc...@sap.com>
Date: 2017-09-10T19:04:38Z

    [SPARK-21896][SQL] Fix Stack Overflow when window function is nested inside an aggregate function
[GitHub] spark pull request #18692: [SPARK-21417][SQL] Infer join conditions using pr...
Github user aokolnychyi commented on a diff in the pull request: https://github.com/apache/spark/pull/18692#discussion_r137343500 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala --- @@ -152,3 +152,71 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper { if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType)) } } + +/** + * A rule that uses propagated constraints to infer join conditions. The optimization is applicable + * only to CROSS joins. + * + * For instance, if there is a CROSS join, where the left relation has 'a = 1' and the right + * relation has 'b = 1', then the rule infers 'a = b' as a join predicate. + */ +object InferJoinConditionsFromConstraints extends Rule[LogicalPlan] with PredicateHelper { + + def apply(plan: LogicalPlan): LogicalPlan = { +if (SQLConf.get.constraintPropagationEnabled) { + inferJoinConditions(plan) +} else { + plan +} + } + + private def inferJoinConditions(plan: LogicalPlan): LogicalPlan = plan transform { +case join @ Join(left, right, Cross, conditionOpt) => + val leftConstraints = join.constraints.filter(_.references.subsetOf(left.outputSet)) + val rightConstraints = join.constraints.filter(_.references.subsetOf(right.outputSet)) + val inferredJoinPredicates = inferJoinPredicates(leftConstraints, rightConstraints) + + val newConditionOpt = conditionOpt match { +case Some(condition) => + val existingPredicates = splitConjunctivePredicates(condition) + val newPredicates = findNewPredicates(inferredJoinPredicates, existingPredicates) + if (newPredicates.nonEmpty) Some(And(newPredicates.reduce(And), condition)) else None +case None => + inferredJoinPredicates.reduceOption(And) + } + if (newConditionOpt.isDefined) Join(left, right, Inner, newConditionOpt) else join --- End diff -- And what about CROSS joins with join conditions? Not sure if they will benefit from the proposed rule, but it is better to ask. 
```
Seq((1, 2)).toDF("col1", "col2").write.saveAsTable("t1")
Seq((1, 2)).toDF("col1", "col2").write.saveAsTable("t2")
val df = spark.sql("SELECT * FROM t1 CROSS JOIN t2 ON t1.col1 >= t2.col1 WHERE t1.col1 = 1 AND t2.col1 = 1")
df.explain(true)

== Optimized Logical Plan ==
Join Cross, (col1#40 >= col1#42)
:- Filter (isnotnull(col1#40) && (col1#40 = 1))
:  +- Relation[col1#40,col2#41] parquet
+- Filter (isnotnull(col1#42) && (col1#42 = 1))
   +- Relation[col1#42,col2#43] parquet
```
[GitHub] spark pull request #18692: [SPARK-21417][SQL] Infer join conditions using pr...
Github user aokolnychyi commented on a diff in the pull request: https://github.com/apache/spark/pull/18692#discussion_r137343433

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---
@@ -152,3 +152,71 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
       if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
   }
 }
+
+/**
+ * A rule that uses propagated constraints to infer join conditions. The optimization is applicable
+ * only to CROSS joins.
+ *
+ * For instance, if there is a CROSS join, where the left relation has 'a = 1' and the right
+ * relation has 'b = 1', then the rule infers 'a = b' as a join predicate.
+ */
+object InferJoinConditionsFromConstraints extends Rule[LogicalPlan] with PredicateHelper {
--- End diff --

I also thought about this, but `InferFiltersFromConstraints` does not change the type of the joins it considers. Therefore, I kept the two rules separate. In addition, I thought about renaming this one to `EliminateCrossJoin`.
[GitHub] spark issue #18692: [SPARK-21417][SQL] Infer join conditions using propagate...
Github user aokolnychyi commented on the issue: https://github.com/apache/spark/pull/18692 @gatorsmile what is our decision here? Shall we wait until SPARK-21652 is resolved? In the meantime, I can add some tests and see how the proposed rule works together with all others. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18909: [MINOR][SQL] Additional test case for CheckCartesianProd...
Github user aokolnychyi commented on the issue: https://github.com/apache/spark/pull/18909 @gatorsmile sure, this PR is only about tests, I was just wondering what is planned regarding cross joins with inequality conditions. I borrowed several tests from PR #16762 and added additional ones. As I mentioned, there is a small overlap between the existing tests and proposed ones but they are defined at different levels. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18909: [MINOR][SQL] Additional test case for CheckCartesianProd...
Github user aokolnychyi commented on the issue: https://github.com/apache/spark/pull/18909 @gatorsmile I took a look at both PRs. I quickly scanned PR #14866 and did not find tests for existence joins. Also, `SQLConf.CROSS_JOINS_ENABLED = true` is checked only for `left_outer`. So, the proposed tests slightly improve the coverage. PR #16762 checks everything from a different perspective than the proposed rules and has some unique scenarios compared to PR #14866. The main question that PR #16762 raises is about, for instance, inner joins with inequality conditions. As far as I understood, the ability to detect such cartesian products was the motivation to move the check away from the Optimizer. Is it still planned? Can this not also be done by modifying the existing rule in the Optimizer? Currently, it only checks that there are conditions which reference both sides. Instead, it can rely on equality predicates, right?
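To make the last point concrete, here is a minimal plain-Scala sketch of the check as it is described in the comment above: a join counts as a cartesian product when no condition references attributes from both sides. This is not Spark's `CheckCartesianProducts` code; the `Predicate` type, the attribute names, and `isCartesianProduct` are hypothetical and exist only for illustration.

```scala
object CartesianCheckSketch {
  // Hypothetical stand-in for a join predicate: its SQL text plus the
  // names of the attributes it references.
  final case class Predicate(sql: String, references: Set[String])

  // A join is treated as a cartesian product unless at least one
  // predicate references attributes from both the left and right side.
  def isCartesianProduct(
      leftOutput: Set[String],
      rightOutput: Set[String],
      conditions: Seq[Predicate]): Boolean =
    !conditions.exists { p =>
      p.references.exists(leftOutput) && p.references.exists(rightOutput)
    }
}
```

Under this model an inequality such as `t1.a >= t2.b` passes the check exactly like an equality does, which is the gap the comment asks about; relying on equality predicates instead would need the predicate type to record its operator as well.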
[GitHub] spark pull request #18909: [MINOR][SQL] Additional test case for CheckCartes...
GitHub user aokolnychyi opened a pull request: https://github.com/apache/spark/pull/18909 [MINOR][SQL] Additional test case for CheckCartesianProducts rule ## What changes were proposed in this pull request? While discovering optimization rules and their test coverage, I did not find any tests for `CheckCartesianProducts` in the Catalyst folder. So, I decided to create a new test suite. Once I finished, I found a test in `JoinSuite` for this functionality so feel free to discard this change if it does not make much sense. The proposed test suite covers a few additional use cases. You can merge this pull request into a Git repository by running: $ git pull https://github.com/aokolnychyi/spark check-cartesian-join-tests Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/18909.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #18909 commit 98b54cad45b638515d36966a1e64955f4f1531d1 Author: aokolnychyi <anton.okolnyc...@sap.com> Date: 2017-08-09T21:13:02Z [MINOR][SQL] Additional test case for CheckCartesianProducts rule --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18692: [SPARK-21417][SQL] Infer join conditions using propagate...
Github user aokolnychyi commented on the issue: https://github.com/apache/spark/pull/18692 @gatorsmile I updated the rule to cover cross join cases. Regarding the case with the redundant condition mentioned by you, I opened [SPARK-21652](https://issues.apache.org/jira/browse/SPARK-21652). It is an existing issue and is not caused by the proposed rule. BTW, I can try to fix it once we agree on a solution. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18692: [SPARK-21417][SQL] Infer join conditions using pr...
Github user aokolnychyi commented on a diff in the pull request: https://github.com/apache/spark/pull/18692#discussion_r130662925 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala --- @@ -152,3 +152,72 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper { if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType)) } } + +/** + * A rule that uses propagated constraints to infer join conditions. The optimization is applicable + * only to inner joins. + * + * For instance, if there is a join, where the left relation has 'a = 1' and the right relation + * has 'b = 1', then the rule infers 'a = b' as a join predicate. Only semantically new predicates + * are appended to the existing join condition. + */ +object InferJoinConditionsFromConstraints extends Rule[LogicalPlan] with PredicateHelper { + + def apply(plan: LogicalPlan): LogicalPlan = { +if (SQLConf.get.constraintPropagationEnabled) { + inferJoinConditions(plan) +} else { + plan +} + } + + private def inferJoinConditions(plan: LogicalPlan): LogicalPlan = plan transform { +case join @ Join(left, right, Inner, conditionOpt) => --- End diff -- I also thought about this but decided to start with a smaller scope. The motivation was that `"SELECT * FROM t1, t2"` is resolved into an Inner Join and one has to explicitly use the Cross Join syntax to allow cartesian products. I was not sure if it was OK to replace an explicit Cross Join with a join of a different type. Semantically, we can have `InnerLike` here. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. 
[GitHub] spark issue #18692: [SPARK-21417][SQL] Detect joind conditions via filter ex...
Github user aokolnychyi commented on the issue: https://github.com/apache/spark/pull/18692

@gatorsmile I took a look at the case above. Indeed, the proposed rule triggers this issue but only indirectly. In the example above, the optimizer will never reach a fixed point. Please find my investigation below.

```
...
// The new rule infers correct join predicates
Join Inner, ((col2#33 = col#34) && (col1#32 = col#34))
:- Filter ((col1#32 = col2#33) && (col1#32 = 1))
:  +- Relation[col1#32,col2#33] parquet
+- Filter (col#34 = 1)
   +- Relation[col#34] parquet

// InferFiltersFromConstraints adds more filters
Join Inner, ((col2#33 = col#34) && (col1#32 = col#34))
:- Filter ((((col2#33 = 1) && isnotnull(col1#32)) && isnotnull(col2#33)) && ((col1#32 = col2#33) && (col1#32 = 1)))
:  +- Relation[col1#32,col2#33] parquet
+- Filter (isnotnull(col#34) && (col#34 = 1))
   +- Relation[col#34] parquet

// ConstantPropagation is applied
Join Inner, ((col2#33 = col#34) && (col1#32 = col#34))
!:- Filter ((((col2#33 = 1) && isnotnull(col2#33)) && isnotnull(col1#32)) && ((1 = col2#33) && (col1#32 = 1)))
:  +- Relation[col1#32,col2#33] parquet
+- Filter (isnotnull(col#34) && (col#34 = 1))
   +- Relation[col#34] parquet

// (Important) InferFiltersFromConstraints infers (col1#32 = col2#33), which is added to the join condition.
Join Inner, ((col1#32 = col2#33) && ((col2#33 = col#34) && (col1#32 = col#34)))
!:- Filter ((((col2#33 = 1) && isnotnull(col2#33)) && isnotnull(col1#32)) && ((1 = col2#33) && (col1#32 = 1)))
:  +- Relation[col1#32,col2#33] parquet
+- Filter (isnotnull(col#34) && (col#34 = 1))
   +- Relation[col#34] parquet

// PushPredicateThroughJoin pushes down (col1#32 = col2#33) and then CombineFilters produces
Join Inner, ((col1#32 = col#34) && (col2#33 = col#34))
!:- Filter ((((isnotnull(col1#32) && (col2#33 = 1)) && isnotnull(col2#33)) && ((1 = col2#33) && (col1#32 = 1))) && (col2#33 = col1#32))
:  +- Relation[col1#32,col2#33] parquet
+- Filter (isnotnull(col#34) && (col#34 = 1))
   +- Relation[col#34] parquet
```

After that, `ConstantPropagation` replaces `(col2#33 = col1#32)` with `(1 = 1)`, `BooleanSimplification` removes `(1 = 1)`, `InferFiltersFromConstraints` infers `(col2#33 = col1#32)` again, and the procedure repeats forever. Since `InferFiltersFromConstraints` is the last optimization rule, we have the redundant condition mentioned by you. The Optimizer without the new rule will also not converge on the following query:

```
Seq((1, 2)).toDF("col1", "col2").write.saveAsTable("t1")
Seq(1, 2).toDF("col").write.saveAsTable("t2")
spark.sql("SELECT * FROM t1, t2 WHERE t1.col1 = 1 AND 1 = t1.col2 AND t1.col1 = t2.col AND t1.col2 = t2.col").explain(true)
```

Correct me if I am wrong, but it seems like an issue with the existing rules.
[GitHub] spark pull request #18740: [SPARK-21538][SQL] Attribute resolution inconsist...
Github user aokolnychyi commented on a diff in the pull request: https://github.com/apache/spark/pull/18740#discussion_r129911780 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala --- @@ -1304,6 +1304,15 @@ class DatasetSuite extends QueryTest with SharedSQLContext { assert(rlike3.count() == 0) } } + + test("SPARK-21538: Attribute resolution inconsistency in Dataset API") { +val df = spark.range(1).withColumnRenamed("id", "x") +checkAnswer(df.sort(col("id")), df.sort("id")) +checkAnswer(df.sort($"id"), df.sort("id")) +checkAnswer(df.sort('id), df.sort("id")) +checkAnswer(df.orderBy('id), df.sort("id")) +checkAnswer(df.orderBy("id"), df.sort("id")) --- End diff -- Indeed, looks much better. I appreciate the explanation and will take this into account in the future. I will update the test in a minute, thanks --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18740: [SPARK-21538][SQL] Attribute resolution inconsist...
GitHub user aokolnychyi opened a pull request: https://github.com/apache/spark/pull/18740 [SPARK-21538][SQL] Attribute resolution inconsistency in the Dataset API ## What changes were proposed in this pull request? This PR contains a tiny update that removes an attribute resolution inconsistency in the Dataset API. The following example is taken from the ticket description: ``` spark.range(1).withColumnRenamed("id", "x").sort(col("id")) // works spark.range(1).withColumnRenamed("id", "x").sort($"id") // works spark.range(1).withColumnRenamed("id", "x").sort('id) // works spark.range(1).withColumnRenamed("id", "x").sort("id") // fails with: org.apache.spark.sql.AnalysisException: Cannot resolve column name "id" among (x); ``` The above `AnalysisException` happens because the last case calls `Dataset.apply()` to convert strings into columns, which triggers attribute resolution. To make the API consistent between overloaded methods, this PR delays the resolution and constructs columns directly. You can merge this pull request into a Git repository by running: $ git pull https://github.com/aokolnychyi/spark spark-21538 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/18740.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #18740 commit b0fbd5ee98de0ed725d96e63943da934150f6c3e Author: aokolnychyi <anton.okolnyc...@sap.com> Date: 2017-07-26T19:56:44Z [SPARK-21538][SQL] Fix attribute resolution inconsistency in the Dataset API --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. 
[GitHub] spark issue #18692: [SPARK-21417][SQL] Detect joind conditions via filter ex...
Github user aokolnychyi commented on the issue: https://github.com/apache/spark/pull/18692 @gatorsmile thanks for the input. Let me check that I understood everything correctly. So, I keep it as a separate rule that is applied only if constraint propagation is enabled. Inside the rule, I rely on `join.constraints` to infer the join conditions. The remaining logic stays the same. Correct? I guess that `InferFiltersFromConstraints` can be used as a guideline.
[GitHub] spark issue #18692: [SPARK-21417][SQL] Detect joind conditions via filter ex...
Github user aokolnychyi commented on the issue: https://github.com/apache/spark/pull/18692 @cloud-fan which rule do you mean? `PushPredicateThroughJoin` seems to be the closest by logic but it has a slightly different purpose and does not cover this use case. In fact, I used the proposed rule in conjunction with `PushPredicateThroughJoin` in the tests. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18692: [SPARK-21417][SQL] Detect joind conditions via fi...
GitHub user aokolnychyi opened a pull request: https://github.com/apache/spark/pull/18692 [SPARK-21417][SQL] Detect joind conditions via filter expressions ## What changes were proposed in this pull request? This PR adds an optimization rule that infers join conditions based on filter expressions that are specified. For example, `SELECT * FROM t1, t2 WHERE t1.col1 = 1 AND t2.col2 = 1` can be transformed into `SELECT * FROM t1 JOIN t2 ON t1.col1 = t2.col2 WHERE t1.col1 = 1 AND t2.col2 = 1`. Refer to the corresponding ticket and tests for more details. ## How was this patch tested? This patch comes with a new test suite to cover the implemented logic. You can merge this pull request into a Git repository by running: $ git pull https://github.com/aokolnychyi/spark spark-21417 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/18692.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #18692 commit e67d4d3c0bdf5cac4c6b17b50314984a2a6378d2 Author: aokolnychyi <anton.okolnyc...@sap.com> Date: 2017-07-18T18:49:16Z [SPARK-21417][SQL] Detect joind conditions via filter expressions --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
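The inference described in this pull request can be sketched in plain Scala without any Spark dependency. This is only an illustration of the idea (two attributes that are both constrained to the same literal must be equal to each other), not the rule's actual implementation; `EqualsLiteral`, `JoinPredicate`, and `inferJoinPredicates` are hypothetical names chosen for this sketch.

```scala
object InferJoinSketch {
  // Hypothetical constraint of the form `attribute = literal`,
  // for example t1.col1 = 1.
  final case class EqualsLiteral(attribute: String, literal: Int)

  // Hypothetical inferred join predicate of the form `left = right`.
  final case class JoinPredicate(left: String, right: String)

  // Pair every left-side constraint with every right-side constraint
  // and keep the pairs that are bound to the same literal.
  def inferJoinPredicates(
      left: Seq[EqualsLiteral],
      right: Seq[EqualsLiteral]): Seq[JoinPredicate] =
    for {
      l <- left
      r <- right
      if l.literal == r.literal
    } yield JoinPredicate(l.attribute, r.attribute)
}
```

For the query in the description, the left side contributes `t1.col1 = 1` and the right side `t2.col2 = 1`, so the sketch yields the single predicate `t1.col1 = t2.col2`. Constraints bound to different literals produce nothing, which leaves the join untouched.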
[GitHub] spark issue #18583: [SPARK-21332][SQL] Incorrect result type inferred for so...
Github user aokolnychyi commented on the issue: https://github.com/apache/spark/pull/18583 Can we, please, trigger this one more time? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18583: [SPARK-21332][SQL] Incorrect result type inferred...
GitHub user aokolnychyi opened a pull request: https://github.com/apache/spark/pull/18583

[SPARK-21332][SQL] Incorrect result type inferred for some decimal expressions

## What changes were proposed in this pull request?

This PR changes the direction of expression transformation in the DecimalPrecision rule. Previously, the expressions were transformed down, which led to incorrect result types when decimal expressions had other decimal expressions as their operands. The root cause of this issue was in visiting outer nodes before their children. Consider the example below:

```
val inputSchema = StructType(StructField("col", DecimalType(26, 6)) :: Nil)
val sc = spark.sparkContext
val rdd = sc.parallelize(1 to 2).map(_ => Row(BigDecimal(12)))
val df = spark.createDataFrame(rdd, inputSchema)

// Works correctly since no nested decimal expression is involved
// Expected result type: (26, 6) * (26, 6) = (38, 12)
df.select($"col" * $"col").explain(true)
df.select($"col" * $"col").printSchema()

// Gives a wrong result since there is a nested decimal expression that should be visited first
// Expected result type: ((26, 6) * (26, 6)) * (26, 6) = (38, 12) * (26, 6) = (38, 18)
df.select($"col" * $"col" * $"col").explain(true)
df.select($"col" * $"col" * $"col").printSchema()
```

The example above gives the following output:

```
// Correct result without sub-expressions
== Parsed Logical Plan ==
'Project [('col * 'col) AS (col * col)#4]
+- LogicalRDD [col#1]

== Analyzed Logical Plan ==
(col * col): decimal(38,12)
Project [CheckOverflow((promote_precision(cast(col#1 as decimal(26,6))) * promote_precision(cast(col#1 as decimal(26,6)))), DecimalType(38,12)) AS (col * col)#4]
+- LogicalRDD [col#1]

== Optimized Logical Plan ==
Project [CheckOverflow((col#1 * col#1), DecimalType(38,12)) AS (col * col)#4]
+- LogicalRDD [col#1]

== Physical Plan ==
*Project [CheckOverflow((col#1 * col#1), DecimalType(38,12)) AS (col * col)#4]
+- Scan ExistingRDD[col#1]

// Schema
root
 |-- (col * col): decimal(38,12) (nullable = true)

// Incorrect result with sub-expressions
== Parsed Logical Plan ==
'Project [(('col * 'col) * 'col) AS ((col * col) * col)#11]
+- LogicalRDD [col#1]

== Analyzed Logical Plan ==
((col * col) * col): decimal(38,12)
Project [CheckOverflow((promote_precision(cast(CheckOverflow((promote_precision(cast(col#1 as decimal(26,6))) * promote_precision(cast(col#1 as decimal(26,6)))), DecimalType(38,12)) as decimal(26,6))) * promote_precision(cast(col#1 as decimal(26,6)))), DecimalType(38,12)) AS ((col * col) * col)#11]
+- LogicalRDD [col#1]

== Optimized Logical Plan ==
Project [CheckOverflow((cast(CheckOverflow((col#1 * col#1), DecimalType(38,12)) as decimal(26,6)) * col#1), DecimalType(38,12)) AS ((col * col) * col)#11]
+- LogicalRDD [col#1]

== Physical Plan ==
*Project [CheckOverflow((cast(CheckOverflow((col#1 * col#1), DecimalType(38,12)) as decimal(26,6)) * col#1), DecimalType(38,12)) AS ((col * col) * col)#11]
+- Scan ExistingRDD[col#1]

// Schema
root
 |-- ((col * col) * col): decimal(38,12) (nullable = true)
```

## How was this patch tested?

This PR was tested with available unit tests. Moreover, there are tests to cover previously failing scenarios.

You can merge this pull request into a Git repository by running: $ git pull https://github.com/aokolnychyi/spark spark-21332 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/18583.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #18583

commit 7b35ed2e497f3660e100ad103244beea767f2a73 Author: aokolnychyi <anton.okolnyc...@sap.com> Date: 2017-07-09T12:54:48Z [SPARK-21332][SQL] Fix the incorrect result type inferred for some decimal expressions
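The following plain-Scala sketch illustrates why visiting children first matters for the expected types listed in the description. It is not the `DecimalPrecision` rule itself. The multiplication formula used here (precision `p1 + p2 + 1`, scale `s1 + s2`, both capped at 38) is an assumption; it is consistent with the expected types in the description but should be checked against the Spark version in use. `Dec`, `Col`, `Multiply`, and `resultType` are hypothetical names.

```scala
object DecimalTypeSketch {
  // Assumed upper bound for both precision and scale.
  val MaxPrecision = 38

  // Hypothetical stand-in for a decimal type with precision and scale.
  final case class Dec(precision: Int, scale: Int)

  sealed trait Expr
  final case class Col(dataType: Dec) extends Expr
  final case class Multiply(left: Expr, right: Expr) extends Expr

  // Assumed result type of a decimal multiplication.
  def multiplyType(a: Dec, b: Dec): Dec =
    Dec(
      math.min(a.precision + b.precision + 1, MaxPrecision),
      math.min(a.scale + b.scale, MaxPrecision))

  // Bottom-up evaluation: the types of the children are computed before
  // the type of the parent, so a nested multiplication already carries
  // its widened type when the outer one is processed.
  def resultType(expr: Expr): Dec = expr match {
    case Col(dataType) => dataType
    case Multiply(left, right) =>
      multiplyType(resultType(left), resultType(right))
  }
}
```

With a `(26, 6)` column, `resultType` gives `(38, 12)` for `col * col` and `(38, 18)` for `(col * col) * col`, matching the expected types stated in the example comments.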
[GitHub] spark issue #18368: [SPARK-21102][SQL] Refresh command is too aggressive in ...
Github user aokolnychyi commented on the issue: https://github.com/apache/spark/pull/18368 @gatorsmile should be fixed now. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18368: [SPARK-21102][SQL] Make refresh resource command less ag...
Github user aokolnychyi commented on the issue: https://github.com/apache/spark/pull/18368 @shaneknapp It seems that the build fails with an exception unrelated to the PR. Therefore, I will just close this one and open a new one.
[GitHub] spark issue #18368: [SPARK-21102][SQL] Make refresh resource command less ag...
Github user aokolnychyi commented on the issue: https://github.com/apache/spark/pull/18368 @shaneknapp can we trigger this one more time, please? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18368: [SPARK-21102][SQL] Make refresh resource command ...
GitHub user aokolnychyi opened a pull request: https://github.com/apache/spark/pull/18368

[SPARK-21102][SQL] Make refresh resource command less aggressive in p…

### Idea

This PR adds validation to REFRESH SQL statements. Currently, users can specify whatever they want as the resource path. For example, `spark.sql("REFRESH ! $ !")` will be executed without any exceptions.

### Implementation

I am not sure that my current implementation is optimal, so any feedback is appreciated. My first idea was to make the grammar as strict as possible. Unfortunately, there were some problems. I tried the approach below:

SqlBase.g4
```
...
    | REFRESH TABLE tableIdentifier    #refreshTable
    | REFRESH resourcePath             #refreshResource
...
resourcePath
    : STRING
    | (IDENTIFIER | number | nonReserved | '/' | '-')+ // other symbols can be added if needed
    ;
```

It is not flexible enough and requires explicitly listing all possible symbols. Therefore, I came up with the current approach that is implemented in the code. Let me know your opinion on which one is better.

You can merge this pull request into a Git repository by running: $ git pull https://github.com/aokolnychyi/spark spark-21102 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/18368.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #18368

commit fc2b7c02fab7f570ae3ca080ae1c2c9502300de7 Author: aokolnychyi <anton.okolnyc...@sap.com> Date: 2017-06-19T18:17:18Z [SPARK-21102][SQL] Make refresh resource command less aggressive in parsing
[GitHub] spark issue #18252: [SPARK-17914][SQL] Fix parsing of timestamp strings with...
Github user aokolnychyi commented on the issue: https://github.com/apache/spark/pull/18252 @wzhfy @rxin @ueshin can someone, please, merge this? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18252: [SPARK-17914][SQL] Fix parsing of timestamp strin...
Github user aokolnychyi commented on a diff in the pull request: https://github.com/apache/spark/pull/18252#discussion_r121252397 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala --- @@ -399,13 +399,13 @@ object DateTimeUtils { digitsMilli += 1 } -if (!justTime && isInvalidDate(segments(0), segments(1), segments(2))) { - return None +while (digitsMilli > 6) { --- End diff -- @wzhfy done --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18252: [SPARK-17914][SQL] Fix parsing of timestamp strin...
Github user aokolnychyi commented on a diff in the pull request: https://github.com/apache/spark/pull/18252#discussion_r121251811 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala --- @@ -32,7 +32,7 @@ import org.apache.spark.unsafe.types.UTF8String * Helper functions for converting between internal and external date and time representations. * Dates are exposed externally as java.sql.Date and are represented internally as the number of * dates since the Unix epoch (1970-01-01). Timestamps are exposed externally as java.sql.Timestamp - * and are stored internally as longs, which are capable of storing timestamps with 100 nanosecond + * and are stored internally as longs, which are capable of storing timestamps with microsecond --- End diff -- Sure, but the previous comment, which was introduced in [this](https://github.com/apache/spark/commit/6b7f2ceafdcbb014791909747c2210b527305df9) commit, was no longer correct. The logic was changed in [this](https://github.com/apache/spark/commit/a290814877308c6fa9b0f78b1a81145db7651ca4) commit and now it is up to microseconds. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18252: [SPARK-17914][SQL] Fix parsing of timestamp strings with...
Github user aokolnychyi commented on the issue: https://github.com/apache/spark/pull/18252 @ueshin good point, thanks. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18252: [SPARK-17914][SQL] Fix parsing of timestamp strin...
GitHub user aokolnychyi opened a pull request: https://github.com/apache/spark/pull/18252

[SPARK-17914][SQL] Fix parsing of timestamp strings with nanoseconds

The PR contains a tiny change to fix the way Spark parses string literals into timestamps. Currently, some timestamps that contain nanoseconds are corrupted during the conversion from internal UTF8Strings into the internal representation of timestamps. Consider the following example:

```
spark.sql("SELECT cast('2015-01-02 00:00:00.000000001' as TIMESTAMP)").show(false)
+------------------------------------------------+
|CAST(2015-01-02 00:00:00.000000001 AS TIMESTAMP)|
+------------------------------------------------+
|2015-01-02 00:00:00.000001                      |
+------------------------------------------------+
```

The fix was tested with existing tests. Also, there is a new test to cover cases that did not work previously.

You can merge this pull request into a Git repository by running: $ git pull https://github.com/aokolnychyi/spark spark-17914 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/18252.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #18252

commit 2f232a7bda28fb42759ee35923044f886a1ff19e Author: aokolnychyi <anton.okolnyc...@sap.com> Date: 2017-06-08T18:52:14Z [SPARK-17914][SQL] Fix parsing of timestamp strings with nanoseconds
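As a rough illustration of the intended behaviour, the plain-Scala sketch below converts the fractional-second digits of a timestamp string to microseconds. Digits beyond the sixth are dropped and shorter fractions are right-padded with zeros. This is an assumption about the desired result given microsecond-precision storage, not the code of the patch; `fractionToMicros` is a hypothetical helper and is not part of `DateTimeUtils`.

```scala
object FractionSketch {
  // Converts the digits that follow the decimal point of the seconds
  // field into a number of microseconds.
  def fractionToMicros(fraction: String): Long = {
    require(
      fraction.nonEmpty && fraction.forall(_.isDigit),
      "the fraction must consist of decimal digits only")
    // Keep at most six digits: anything finer than a microsecond is dropped.
    val kept = fraction.take(6)
    // Right-pad shorter fractions so that "1" means 100000 microseconds.
    (kept + "0" * (6 - kept.length)).toLong
  }
}
```

Under these assumptions a nine-digit fraction whose only non-zero digit is in the nanosecond position maps to zero microseconds rather than to one.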
[GitHub] spark pull request #16329: [SPARK-16046][DOCS] Aggregations in the Spark SQL...
Github user aokolnychyi commented on a diff in the pull request: https://github.com/apache/spark/pull/16329#discussion_r97589440 --- Diff: examples/src/main/java/org/apache/spark/examples/sql/JavaUserDefinedTypedAggregation.java --- @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.examples.sql; + +// $example on:typed_custom_aggregation$ +import java.io.Serializable; + +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoder; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.TypedColumn; +import org.apache.spark.sql.expressions.Aggregator; +// $example off:typed_custom_aggregation$ + +public class JavaUserDefinedTypedAggregation { + + // $example on:typed_custom_aggregation$ + public static class Employee implements Serializable { +private String name; +private long salary; + +// Constructors, getters, setters... 
+// $example off:typed_custom_aggregation$ +public String getName() { + return name; +} + +public void setName(String name) { + this.name = name; +} + +public long getSalary() { + return salary; +} + +public void setSalary(long salary) { + this.salary = salary; +} +// $example on:typed_custom_aggregation$ + } + + public static class Average implements Serializable { +private long sum; +private long count; + +// Constructors, getters, setters... +// $example off:typed_custom_aggregation$ +public Average() { +} + +public Average(long sum, long count) { + this.sum = sum; + this.count = count; +} + +public long getSum() { + return sum; +} + +public void setSum(long sum) { + this.sum = sum; +} + +public long getCount() { + return count; +} + +public void setCount(long count) { + this.count = count; +} +// $example on:typed_custom_aggregation$ + } + + public static class MyAverage extends Aggregator<Employee, Average, Double> { +// A zero value for this aggregation. Should satisfy the property that any b + zero = b +public Average zero() { --- End diff -- @srowen `Average` is a Java bean that holds the current sum and count. It is defined earlier. Here it represents a zero value. `MyAverage`, in turn, is the actual aggregator that accepts instances of the `Employee` class, stores intermediate results using an instance of `Average`, and produces a `Double` as a result. I can rename `MyAverage` to `MyAverageAggregator` if this makes things clearer.
[GitHub] spark pull request #16329: [SPARK-16046][DOCS] Aggregations in the Spark SQL...
Github user aokolnychyi commented on a diff in the pull request: https://github.com/apache/spark/pull/16329#discussion_r93606051 --- Diff: examples/src/main/scala/org/apache/spark/examples/sql/UserDefinedTypedAggregation.scala --- @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.examples.sql + +// $example on:typed_custom_aggregation$ +import org.apache.spark.sql.expressions.Aggregator +import org.apache.spark.sql.Encoder +import org.apache.spark.sql.Encoders +import org.apache.spark.sql.SparkSession +// $example off:typed_custom_aggregation$ + +object UserDefinedTypedAggregation { + + // $example on:typed_custom_aggregation$ + case class Employee(name: String, salary: Long) + case class Average(var sum: Long, var count: Long) + + object MyAverage extends Aggregator[Employee, Average, Double] { +// A zero value for this aggregation. Should satisfy the property that any b + zero = b +def zero: Average = Average(0L, 0L) +// Combine two values to produce a new value. 
For performance, the function may modify `buffer` +// and return it instead of constructing a new object +def reduce(buffer: Average, employee: Employee): Average = { + buffer.sum += employee.salary + buffer.count += 1 + buffer +} +// Merge two intermediate values +def merge(b1: Average, b2: Average): Average = Average(b1.sum + b2.sum, b1.count + b2.count) --- End diff -- @michalsenkyr It is not required to create a new object in the `merge` method. One can modify the vars and return the existing object just like in the `reduce` method. However, it is less critical here since this method is called on pre-aggregated data and not for every element. On the one hand, I can apply the same approach here as in the `reduce` method to make the example consistent. On the other hand, the current code shows that it is not mandatory to modify vars. A comment might help. I am not sure which approach is better, so I am open to suggestions.
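The two `merge` styles discussed in this comment can be compared side by side. The following is a minimal plain-Java sketch that does not depend on Spark: `AverageBuffer` and `AverageSketch` are hypothetical names that only mirror the `zero`/`reduce`/`merge`/`finish` roles of the example under review, and the salaries are made-up input.

```java
// Hypothetical mutable buffer holding a running sum and count.
final class AverageBuffer {
    long sum;
    long count;

    AverageBuffer(long sum, long count) {
        this.sum = sum;
        this.count = count;
    }
}

// Hypothetical stand-in for the aggregator logic; it does not extend Spark's Aggregator.
final class AverageSketch {
    private AverageSketch() {}

    // Zero value: merging it into any buffer leaves that buffer unchanged.
    static AverageBuffer zero() {
        return new AverageBuffer(0L, 0L);
    }

    // Called once per input element, so it mutates the buffer instead of allocating.
    static AverageBuffer reduce(AverageBuffer buffer, long salary) {
        buffer.sum += salary;
        buffer.count += 1;
        return buffer;
    }

    // Style 1: allocate a new buffer and leave both inputs untouched.
    static AverageBuffer mergeNew(AverageBuffer b1, AverageBuffer b2) {
        return new AverageBuffer(b1.sum + b2.sum, b1.count + b2.count);
    }

    // Style 2: fold b2 into b1 and return b1, mirroring reduce.
    static AverageBuffer mergeInPlace(AverageBuffer b1, AverageBuffer b2) {
        b1.sum += b2.sum;
        b1.count += b2.count;
        return b1;
    }

    // Final result; an empty buffer yields NaN because 0.0 / 0 is NaN.
    static double finish(AverageBuffer buffer) {
        return (double) buffer.sum / buffer.count;
    }

    public static void main(String[] args) {
        // Two "partitions" reduced independently, then merged.
        AverageBuffer p1 = reduce(reduce(zero(), 3000L), 4500L);
        AverageBuffer p2 = reduce(zero(), 3500L);
        AverageBuffer merged = mergeNew(p1, p2);
        System.out.println(merged.sum + " / " + merged.count); // 11000 / 3
    }
}
```

Both styles produce the same sum and count. The difference is only in allocation and in whether the first argument may be reused afterwards, which matters less for `merge` because it runs once per pre-aggregated buffer rather than once per element.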
[GitHub] spark pull request #16329: [SPARK-16046][DOCS] Aggregations in the Spark SQL...
Github user aokolnychyi commented on a diff in the pull request: https://github.com/apache/spark/pull/16329#discussion_r93395745 --- Diff: docs/sql-programming-guide.md --- @@ -382,6 +382,52 @@ For example: +## Aggregations + +The [built-in DataFrames functions](api/scala/index.html#org.apache.spark.sql.functions$) mentioned +before provide such common aggregations as `count()`, `countDistinct()`, `avg()`, `max()`, `min()`, etc. +While those functions are designed for DataFrames, Spark SQL also has type-safe versions for some of them in +[Scala](api/scala/index.html#org.apache.spark.sql.expressions.scalalang.typed$) and +[Java](api/java/org/apache/spark/sql/expressions/javalang/typed.html) to work with strongly typed Datasets. +Moreover, users are not limited to the predefined aggregate functions and can create their own. --- End diff -- I also thought about this. In my view, it would be appropriate to have a separate subsection before Aggregations to show how to apply predefined SQL functions, including writing your own UDFs. That would be worth another pull request. Alternatively, I can also try to extend this one to add an example of `max()` or `min()`. @marmbrus what's your opinion?
[GitHub] spark issue #16329: [SPARK-16046][DOCS] Aggregations in the Spark SQL progra...
Github user aokolnychyi commented on the issue: https://github.com/apache/spark/pull/16329 @marmbrus I have updated the pull request. The compiled docs can be found [here](https://aokolnychyi.github.io/spark-docs/sql-programming-guide.html). I did not manage to build the Java API docs; I believe the problem is in my local installation. Therefore, I checked each URL manually, and they should work once the API docs are compiled. I will verify everything one more time in the nightly build.
[GitHub] spark pull request #16329: [SPARK-16046][DOCS] Aggregations in the Spark SQL...
Github user aokolnychyi commented on a diff in the pull request: https://github.com/apache/spark/pull/16329#discussion_r93019316 --- Diff: examples/src/main/scala/org/apache/spark/examples/sql/UserDefinedUntypedAggregation.scala --- @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.examples.sql + +// $example on:untyped_custom_aggregation$ +import org.apache.spark.sql.expressions.MutableAggregationBuffer +import org.apache.spark.sql.expressions.UserDefinedAggregateFunction +import org.apache.spark.sql.types._ +import org.apache.spark.sql.Row +import org.apache.spark.sql.SparkSession +// $example off:untyped_custom_aggregation$ + +object UserDefinedUntypedAggregation { + + // $example on:untyped_custom_aggregation$ + object MyAverage extends UserDefinedAggregateFunction { +// Data types of input arguments +def inputSchema: StructType = StructType(StructField("salary", LongType) :: Nil) +// Data types of values in the aggregation buffer +def bufferSchema: StructType = { + StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil) +} +// The data type of the returned value +def dataType: DataType = DoubleType +// Whether this function always returns the same output on the identical input +def deterministic: Boolean = true +// Initializes the given aggregation buffer +def initialize(buffer: MutableAggregationBuffer): Unit = { --- End diff -- Agree, I will try to add a small but meaningful explanation here. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16329: [SPARK-16046][DOCS] Aggregations in the Spark SQL...
Github user aokolnychyi commented on a diff in the pull request: https://github.com/apache/spark/pull/16329#discussion_r93019035 --- Diff: examples/src/main/scala/org/apache/spark/examples/sql/UserDefinedUntypedAggregation.scala --- @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.examples.sql + +// $example on:untyped_custom_aggregation$ +import org.apache.spark.sql.expressions.MutableAggregationBuffer +import org.apache.spark.sql.expressions.UserDefinedAggregateFunction +import org.apache.spark.sql.types._ +import org.apache.spark.sql.Row +import org.apache.spark.sql.SparkSession +// $example off:untyped_custom_aggregation$ + +object UserDefinedUntypedAggregation { + + // $example on:untyped_custom_aggregation$ + object MyAverage extends UserDefinedAggregateFunction { +// Data types of input arguments +def inputSchema: StructType = StructType(StructField("salary", LongType) :: Nil) --- End diff -- Yes, your point is definitely reasonable. Now I am wondering whether I should keep "salary" here. As an option, I can replace "salary" with "inputColumn" or something similar to make `MyAverage` more generic. There is no reason to bind it to salary. What's your opinion?
[GitHub] spark pull request #16329: [SPARK-16046][DOCS] Aggregations in the Spark SQL...
GitHub user aokolnychyi opened a pull request: https://github.com/apache/spark/pull/16329

[SPARK-16046][DOCS] Aggregations in the Spark SQL programming guide

## What changes were proposed in this pull request?

- A separate subsection for Aggregations under “Getting Started” in the Spark SQL programming guide. It mentions which aggregations are predefined and how users can create their own.
- Examples of using the `UserDefinedAggregateFunction` abstract class for untyped aggregations in Java and Scala.
- Examples of using the `Aggregator` abstract class for type-safe aggregations in Java and Scala.
- Python is not covered.
- The PR might not resolve the ticket since I do not know what exactly was planned by the author.

In total, there are four new standalone examples that can be executed via `spark-submit` or `run-example`. The updated Spark SQL programming guide references these examples and does not contain hard-coded snippets.

## How was this patch tested?

The patch was tested locally by building the docs. The examples were run as well.

![image](https://cloud.githubusercontent.com/assets/6235869/21292915/04d9d084-c515-11e6-811a-999d598dffba.png)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aokolnychyi/spark SPARK-16046

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/16329.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #16329

commit 8c18b2a980ddfe220c380d0a60e379d9fdeac488
Author: aokolnychyi <okolnychyyan...@gmail.com>
Date: 2016-12-18T10:08:03Z

[SPARK-16046][DOCS] Aggregations in the Spark SQL programming guide
[GitHub] spark pull request #16024: [MINOR][DOCS] Updates to the Accumulator example ...
Github user aokolnychyi commented on a diff in the pull request: https://github.com/apache/spark/pull/16024#discussion_r89791975 --- Diff: docs/programming-guide.md --- @@ -1378,29 +1378,36 @@ res2: Long = 10 While this code used the built-in support for accumulators of type Long, programmers can also create their own types by subclassing [AccumulatorV2](api/scala/index.html#org.apache.spark.util.AccumulatorV2). -The AccumulatorV2 abstract class has several methods which need to override: -`reset` for resetting the accumulator to zero, and `add` for add anothor value into the accumulator, `merge` for merging another same-type accumulator into this one. Other methods need to override can refer to scala API document. For example, supposing we had a `MyVector` class -representing mathematical vectors, we could write: +The AccumulatorV2 abstract class has several methods which one has to override: +`reset` for resetting the accumulator to zero, `add` for adding another value into the accumulator, +and `merge` for merging another same-type accumulator into this one. Other methods that must be overridden +are contained in the [API documentation](api/scala/index.html#org.apache.spark.util.AccumulatorV2). +For example, supposing we had a `MathVector` class representing mathematical vectors, we could write: --- End diff -- Your point makes sense to me. The renaming was done to improve the consistency between the Java and Scala examples. For instance, the Scala example used MyVector, while the Java one used Vector. Would it be better to keep MyVector in the Scala example and reuse this name in Java?
[GitHub] spark pull request #16024: [MINOR][DOCS] Updates to the Accumulator example ...
Github user aokolnychyi commented on a diff in the pull request: https://github.com/apache/spark/pull/16024#discussion_r89788527 --- Diff: docs/programming-guide.md --- @@ -1424,29 +1431,38 @@ accum.value(); // returns 10 {% endhighlight %} -Programmers can also create their own types by subclassing -[AccumulatorParam](api/java/index.html?org/apache/spark/AccumulatorParam.html). -The AccumulatorParam interface has two methods: `zero` for providing a "zero value" for your data -type, and `addInPlace` for adding two values together. For example, supposing we had a `Vector` class -representing mathematical vectors, we could write: +While this code used the built-in support for accumulators of type Long, programmers can also +create their own types by subclassing [AccumulatorV2](api/scala/index.html#org.apache.spark.util.AccumulatorV2). +The AccumulatorV2 abstract class has several methods which one has to override: +`reset` for resetting the accumulator to zero, `add` for adding another value into the accumulator, +and `merge` for merging another same-type accumulator into this one. Other methods that must be overridden +are contained in the [API documentation](api/scala/index.html#org.apache.spark.util.AccumulatorV2). +For example, supposing we had a `MathVector` class representing mathematical vectors, we could write: {% highlight java %} -class VectorAccumulatorParam implements AccumulatorParam { - public Vector zero(Vector initialValue) { -return Vector.zeros(initialValue.size()); +class MathVectorAccumulatorV2 implements AccumulatorV2<MathVector, MathVector> { + + private MathVector mathVector = MathVector.newZeroVector(); + + public void reset() { +mathVector.reset(); } - public Vector addInPlace(Vector v1, Vector v2) { -v1.addInPlace(v2); return v1; + + public void add(MathVector v) { +mathVector.add(v); } + + ... 
} -// Then, create an Accumulator of this type: -Accumulator vecAccum = sc.accumulator(new Vector(...), new VectorAccumulatorParam()); +// Create an instance +MathVectorAccumulatorV2 mathVectorAcc = new MathVectorAccumulatorV2(); +// Register it +jsc.sc().register(mathVectorAcc, "MathVectorAcc1"); {% endhighlight %} -In Java, Spark also supports the more general [Accumulable](api/java/index.html?org/apache/spark/Accumulable.html) -interface to accumulate data where the resulting type is not the same as the elements added (e.g. build -a list by collecting together elements). +Note that, when programmers define their own type of AccumulatorV2,the resulting type can be different --- End diff -- Sure, I'll update that. Thanks --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
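The `reset`, `add`, and `merge` contract described in the quoted documentation can be illustrated without Spark. The sketch below is plain Java under stated assumptions: `VectorAccumulatorSketch` is a hypothetical stand-in that does not extend Spark's `AccumulatorV2`, and it stores a `double[]` instead of the `MathVector` class from the diff. Since the quoted text calls `AccumulatorV2` an abstract class, a real subclass would use `extends` rather than `implements`, and it would also need the other abstract methods the text defers to the API documentation.

```java
import java.util.Arrays;

// Hypothetical stand-in for a custom accumulator; it does not extend Spark's AccumulatorV2.
final class VectorAccumulatorSketch {
    private final double[] values;

    VectorAccumulatorSketch(int size) {
        this.values = new double[size];
    }

    // True when every component is 0.0.
    boolean isZero() {
        for (double v : values) {
            if (v != 0.0) {
                return false;
            }
        }
        return true;
    }

    // Resets the accumulator to the zero vector.
    void reset() {
        Arrays.fill(values, 0.0);
    }

    // Takes a single argument (the value to add), as in the one-parameter add() described above.
    void add(double[] v) {
        if (v.length != values.length) {
            throw new IllegalArgumentException("size mismatch");
        }
        for (int i = 0; i < values.length; i++) {
            values[i] += v[i];
        }
    }

    // Merges another accumulator of the same type into this one.
    void merge(VectorAccumulatorSketch other) {
        add(other.values);
    }

    // Returns a defensive copy of the accumulated vector.
    double[] value() {
        return values.clone();
    }

    public static void main(String[] args) {
        VectorAccumulatorSketch a = new VectorAccumulatorSketch(2);
        VectorAccumulatorSketch b = new VectorAccumulatorSketch(2);
        a.add(new double[] {1.0, 2.0});
        b.add(new double[] {3.0, 4.0});
        a.merge(b);
        System.out.println(Arrays.toString(a.value())); // [4.0, 6.0]
    }
}
```

Keeping `merge` as a thin wrapper over `add` means both paths share the same size check, which is a design choice of this sketch rather than something the quoted documentation prescribes.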
[GitHub] spark pull request #16024: [MINOR][DOCS] Updates to the Accumulator example ...
GitHub user aokolnychyi opened a pull request: https://github.com/apache/spark/pull/16024 [MINOR][DOCS] Updates to the Accumulator example in the programming guide. Fixed typos, AccumulatorV2 in Java ## What changes were proposed in this pull request? This pull request contains updates to Scala and Java Accumulator code snippets in the programming guide. - For Scala, the pull request fixes the signature of the 'add()' method in the custom Accumulator, which contained two params (as the old AccumulatorParam) instead of one (as in AccumulatorV2). - The Java example was updated to use the AccumulatorV2 class since AccumulatorParam is marked as deprecated. - Scala and Java examples are more consistent now. ## How was this patch tested? This patch was tested manually by building the docs locally. ![image](https://cloud.githubusercontent.com/assets/6235869/20652099/77d98d18-b4f3-11e6-8565-a995fe8cf8e5.png) You can merge this pull request into a Git repository by running: $ git pull https://github.com/aokolnychyi/spark fixed_accumulator_example Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/16024.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #16024 commit f4388a84181e7c40003db0b1574602350160721a Author: aokolnychyi <okolnychyyan...@gmail.com> Date: 2016-11-27T21:30:26Z [MINOR][DOCS] Updates to the Accumulator example in the programming guide. Fixed typos, AccumulatorV2 in Java --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14050: [MINOR][EXAMPLES] Window function examples
Github user aokolnychyi closed the pull request at: https://github.com/apache/spark/pull/14050
[GitHub] spark issue #14119: [SPARK-16303][DOCS][EXAMPLES][WIP] Updated SQL programmi...
Github user aokolnychyi commented on the issue: https://github.com/apache/spark/pull/14119

**Summary of the updates**

- The `JavaSparkSQL.java` file was removed. I kept it initially since the file itself was quite old (2+ years) and it was present in your original WIP branch alongside the new file. But I can confirm that the new file covers the same functionality and more. I agree with you that there is no need to keep the old one.
- The Apache header in `JavaSqlDataSourceExample.java` was added.
- `$`-notation instead of `df.col("...")` in Scala examples.
- `col("...")` instead of `df.col("...")` in Java examples.
- Blank lines before `{% include_example programmatic_schema ... }` were added. However, everything was rendered fine locally even without them.
- 2-space indentation for chained method calls. My fault, sorry.
- Actual outputs for all `show()` calls were added.
- Tested manually and via `./dev/run-tests`.

**Open questions**

- Shall I add blank lines before each `{% include_example ... }` or only before those two examples?
- I pointed to the wrong location for the lines that exceed the length limit. It is exactly the same functionality, but in Java: lines 113 and 117 of the `JavaSqlDataSourceExample.java` file. In my view, it would make sense to keep them as they are now for better-looking documentation.
[GitHub] spark pull request #14119: [SPARK-16303][DOCS][EXAMPLES][WIP] Updated SQL pr...
Github user aokolnychyi commented on a diff in the pull request: https://github.com/apache/spark/pull/14119#discussion_r70173180 --- Diff: examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSqlExample.java --- @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.examples.sql; + +// $example on:programmatic_schema$ +import java.util.ArrayList; +import java.util.List; +// $example off:programmatic_schema$ +// $example on:create_ds$ +import java.util.Arrays; --- End diff -- Here the imports do not follow alphabetical order, to avoid too many import groups in the documentation (there would be a blank line between each "example on/off" block).
[GitHub] spark pull request #14119: [SPARK-16303][DOCS][EXAMPLES][WIP] Updated SQL pr...
Github user aokolnychyi commented on a diff in the pull request: https://github.com/apache/spark/pull/14119#discussion_r70173131 --- Diff: examples/src/main/scala/org/apache/spark/examples/sql/SqlDataSourceExample.scala --- @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.examples.sql + +import org.apache.spark.sql.SparkSession + +object SqlDataSourceExample { + + case class Person(name: String, age: Long) + + def main(args: Array[String]) { +val spark = SparkSession +.builder() +.appName("Spark SQL Data Soures Example") +.config("spark.some.config.option", "some-value") +.getOrCreate() + +runBasicDataSourceExample(spark) +runBasicParquetExample(spark) +runParquetSchemaMergingExample(spark) +runJsonDatasetExample(spark) + +spark.stop() + } + + private def runBasicDataSourceExample(spark: SparkSession): Unit = { +// $example on:generic_load_save_functions$ +val usersDF = spark.read.load("examples/src/main/resources/users.parquet") +usersDF.select("name", "favorite_color").write.save("namesAndFavColors.parquet") +// $example off:generic_load_save_functions$ +// $example on:manual_load_options$ +val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json") +peopleDF.select("name", "age").write.format("parquet").save("namesAndAges.parquet") +// $example off:manual_load_options$ +// $example on:direct_sql$ +val sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`") --- End diff -- Here the line length slightly exceeds the limit to make the look of the documentation better. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14119: [SPARK-16303][DOCS][EXAMPLES][WIP] Updated SQL pr...
Github user aokolnychyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/14119#discussion_r70173058

--- Diff: examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala ---
@@ -41,43 +35,47 @@ object HiveFromSpark {
     // in the current directory and creates a directory configured by `spark.sql.warehouse.dir`,
     // which defaults to the directory `spark-warehouse` in the current directory that the spark
     // application is started.
-    val spark = SparkSession.builder
-      .appName("HiveFromSpark")
-      .enableHiveSupport()
-      .getOrCreate()
+
+    // $example on:spark_hive$
+    // warehouseLocation points to the default location for managed databases and tables
+    val warehouseLocation = "file:${system:user.dir}/spark-warehouse"
+
+    val spark = SparkSession
+      .builder()
+      .appName("Spark Hive Example")
+      .config("spark.sql.warehouse.dir", warehouseLocation)
+      .enableHiveSupport()
+      .getOrCreate()
     import spark.implicits._
     import spark.sql
     sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
-    sql(s"LOAD DATA LOCAL INPATH '${kv1File.getAbsolutePath}' INTO TABLE src")
+    sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
     // Queries are expressed in HiveQL
-    println("Result of 'SELECT *': ")
-    sql("SELECT * FROM src").collect().foreach(println)
+    sql("SELECT * FROM src").show()
--- End diff --

I replaced `collect().foreach(println)` with `show()` in all examples. Is that OK?
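For context on the `show()` question in the comment above, the following is a minimal sketch of how the two styles differ. It assumes a local Spark 2.x session and uses a small in-memory DataFrame as a hypothetical stand-in for the `src` table; the object name `ShowVsCollectSketch` and the helper `sample` are made up for illustration and are not part of the pull request.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object ShowVsCollectSketch {
  // Hypothetical stand-in for the `src` table used in the diff.
  def sample(spark: SparkSession): DataFrame = {
    import spark.implicits._
    Seq((238, "val_238"), (86, "val_86")).toDF("key", "value")
  }

  def main(args: Array[String]): Unit = {
    // Assumption: a local session, for illustration only.
    val spark = SparkSession
      .builder()
      .appName("ShowVsCollectSketch")
      .master("local[*]")
      .getOrCreate()

    val df = sample(spark)

    // Old style: brings every row to the driver and prints each Row,
    // for example [238,val_238].
    df.collect().foreach(println)

    // New style: prints a formatted table, limited to the first 20 rows
    // by default, with long string values truncated.
    df.show()

    spark.stop()
  }
}
```

The practical difference is that `collect()` materializes the whole result on the driver, while `show()` only fetches enough rows to display, which is usually the safer default in documentation examples.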
[GitHub] spark pull request #14119: [SPARK-16303][DOCS][EXAMPLES][WIP] Updated SQL pr...
Github user aokolnychyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/14119#discussion_r70173035

--- Diff: docs/sql-programming-guide.md ---
@@ -1380,17 +949,17 @@ metadata.
 {% highlight scala %}
-// spark is an existing HiveContext
-spark.refreshTable("my_table")
+// spark is an existing SparkSession
+spark.catalog.refreshTable("my_table")
--- End diff --

Is this the correct way to refresh the table?
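As a rough illustration of the call being asked about, the sketch below invokes `refreshTable` through the session's catalog and also shows the SQL form. It assumes a local Spark 2.x session; the temporary view `my_table` is a hypothetical stand-in created only so that the call has something to refresh, and the object name `RefreshTableSketch` is made up.

```scala
import org.apache.spark.sql.SparkSession

object RefreshTableSketch {
  def main(args: Array[String]): Unit = {
    // Assumption: a local session, for illustration only.
    val spark = SparkSession
      .builder()
      .appName("RefreshTableSketch")
      .master("local[*]")
      .getOrCreate()

    // Hypothetical stand-in for a real table whose files may change
    // outside of Spark.
    spark.range(3).createOrReplaceTempView("my_table")

    // With SparkSession, the refresh goes through the catalog rather
    // than being called on the context object directly.
    spark.catalog.refreshTable("my_table")

    // The SQL form of the same operation.
    spark.sql("REFRESH TABLE my_table")

    // The view is still queryable after both refreshes.
    println(spark.table("my_table").count())

    spark.stop()
  }
}
```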
[GitHub] spark issue #14119: [SPARK-16303][DOCS][EXAMPLES][WIP] Updated SQL programmi...
Github user aokolnychyi commented on the issue:

https://github.com/apache/spark/pull/14119

@liancheng, could you please review this PR?
[GitHub] spark pull request #14119: [SPARK-16303][DOCS][EXAMPLES][WIP] Updated SQL pr...
GitHub user aokolnychyi opened a pull request:

https://github.com/apache/spark/pull/14119

[SPARK-16303][DOCS][EXAMPLES][WIP] Updated SQL programming guide and examples

## What changes were proposed in this pull request?

- Hard-coded Spark SQL sample snippets were moved into source files under the examples sub-project.
- The inconsistencies between the Scala and Java Spark SQL examples were removed.
- The Scala and Java Spark SQL examples were updated.

## How was this patch tested?

The work is still in progress. All affected examples were tested manually. An additional round of testing will be done after the code review.

![image](https://cloud.githubusercontent.com/assets/6235869/16710314/51851606-462a-11e6-9fbe-0818daef65e4.png)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aokolnychyi/spark spark_16303

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/14119.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #14119

commit 95f0f41fa12e1c6f0fb8ce6cd4222fb63842b495
Author: aokolnychyi <okolnychyyan...@gmail.com>
Date: 2016-07-09T20:56:47Z

[SPARK-16303][DOCS][EXAMPLES] Updated SQL programming guide and examples
[GitHub] spark pull request #14050: [MINOR][EXAMPLES] Window function examples
GitHub user aokolnychyi opened a pull request:

https://github.com/apache/spark/pull/14050

[MINOR][EXAMPLES] Window function examples

## What changes were proposed in this pull request?

This PR adds an example that explains how to use window functions. It shows the difference between omitted, unbounded, and bounded window frames and how each of them is resolved. The example also covers the two ways to define window frames: by physical offsets (`rowsBetween`) and by logical offsets (`rangeBetween`). The example should be useful for people who do not have much experience with window functions, since it explains how Spark deals with window frames internally.

## How was this patch tested?

The existing tests were run with no failures. No additional test cases are needed.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aokolnychyi/spark window_function_examples

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/14050.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #14050

commit fed17613e23c1634ae47542a00960dac77bc95fc
Author: aokolnychyi <okolnychyyan...@gmail.com>
Date: 2016-07-04T22:25:39Z

[MINOR][EXAMPLES] Window function examples
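To make the distinction between physical and logical offsets in the description of #14050 concrete, here is a minimal plain-Scala model of the two frame types. It is a sketch, not Spark's implementation and not the example from the pull request: `sumRowsBetween` and `sumRangeBetween` are hypothetical helper names, the input is assumed to be already sorted by the ordering column, and that same column is the one being summed. Offsets follow the usual convention that negative values mean "preceding" and `0` means the current row.

```scala
object WindowFrameSketch {
  // Physical frame (rowsBetween): offsets count rows relative to the
  // position of the current row, regardless of the values in those rows.
  def sumRowsBetween(sorted: Vector[Int], start: Int, end: Int): Vector[Int] =
    sorted.indices.toVector.map { i =>
      val from = math.max(0, i + start)
      val until = math.min(sorted.length, i + end + 1)
      sorted.slice(from, until).sum
    }

  // Logical frame (rangeBetween): offsets are added to the current row's
  // ordering value, and every row whose value falls in that range belongs
  // to the frame, so rows with equal values always share a frame.
  def sumRangeBetween(sorted: Vector[Int], start: Int, end: Int): Vector[Int] =
    sorted.map { v =>
      sorted.filter(x => x >= v + start && x <= v + end).sum
    }

  def main(args: Array[String]): Unit = {
    val values = Vector(1, 2, 2, 4)

    // Previous row plus current row, by position.
    println(sumRowsBetween(values, -1, 0))  // Vector(1, 3, 4, 6)

    // All rows whose value lies in [current - 1, current].
    println(sumRangeBetween(values, -1, 0)) // Vector(1, 5, 5, 4)
  }
}
```

The two results diverge exactly where the input has ties (the two `2`s) or gaps (the jump from `2` to `4`), which is the case in which the choice between `rowsBetween` and `rangeBetween` matters.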