[GitHub] spark pull request #21745: [SPARK-24781][SQL] Using a reference from Dataset...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/21745

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21745#discussion_r202242420

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala ---
@@ -2387,4 +2387,25 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
     val mapWithBinaryKey = map(lit(Array[Byte](1.toByte)), lit(1))
     checkAnswer(spark.range(1).select(mapWithBinaryKey.getItem(Array[Byte](1.toByte))), Row(1))
   }
+
+  test("SPARK-24781: Using a reference from Dataset in Filter/Sort might not work") {
+    val df = Seq(("test1", 0), ("test2", 1)).toDF("name", "id")
+    val filter1 = df.select(df("name")).filter(df("id") === 0)
+    val filter2 = df.select(col("name")).filter(col("id") === 0)
+    checkAnswer(filter1, filter2.collect())
+
+    val sort1 = df.select(df("name")).orderBy(df("id"))
+    val sort2 = df.select(col("name")).orderBy(col("id"))
+    checkAnswer(sort1, sort2.collect())
+
+    withSQLConf(SQLConf.DATAFRAME_RETAIN_GROUP_COLUMNS.key -> "false") {
--- End diff ---

Will update it in the next commit.
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21745#discussion_r202236189

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala ---
@@ -2387,4 +2387,25 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
     val mapWithBinaryKey = map(lit(Array[Byte](1.toByte)), lit(1))
     checkAnswer(spark.range(1).select(mapWithBinaryKey.getItem(Array[Byte](1.toByte))), Row(1))
   }
+
+  test("SPARK-24781: Using a reference from Dataset in Filter/Sort might not work") {
+    val df = Seq(("test1", 0), ("test2", 1)).toDF("name", "id")
+    val filter1 = df.select(df("name")).filter(df("id") === 0)
+    val filter2 = df.select(col("name")).filter(col("id") === 0)
+    checkAnswer(filter1, filter2.collect())
+
+    val sort1 = df.select(df("name")).orderBy(df("id"))
+    val sort2 = df.select(col("name")).orderBy(col("id"))
+    checkAnswer(sort1, sort2.collect())
+
+    withSQLConf(SQLConf.DATAFRAME_RETAIN_GROUP_COLUMNS.key -> "false") {
--- End diff ---

This test case should be split into two.
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21745#discussion_r202217276

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -1165,15 +1173,19 @@ class Analyzer(
         (newExprs, AnalysisBarrier(newChild))

       case p: Project =>
+        // Resolving expressions against current plan.
         val maybeResolvedExprs = exprs.map(resolveExpression(_, p))
+        // Recursively resolving expressions on the child of current plan.
        val (newExprs, newChild) = resolveExprsAndAddMissingAttrs(maybeResolvedExprs, p.child)
-        val missingAttrs = AttributeSet(newExprs) -- AttributeSet(maybeResolvedExprs)
+        // If some attributes used by expressions are resolvable only on the rewritten child
+        // plan, we need to add them into original projection.
+        val missingAttrs = (AttributeSet(newExprs) -- p.outputSet).intersect(newChild.outputSet)
--- End diff ---

Without this `intersect`, some tests fail, e.g., `group-analytics.sql` in `SQLQueryTestSuite`. Some attributes are resolved on parent plans, not on child plans. We can't add them as missing attributes here.
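To make the role of the `intersect` concrete, here is a toy model of the computation discussed above, using a plain `Set[String]` as a stand-in for Spark's `AttributeSet`. The object and parameter names are illustrative only, not Spark APIs.

```scala
// Toy model of the missing-attribute computation in the Project case.
// A plain Set[String] stands in for Spark's AttributeSet; all names here
// are illustrative, not part of the Spark API.
object MissingAttrsSketch {
  def missingAttrs(newExprRefs: Set[String],
                   projectOutput: Set[String],
                   newChildOutput: Set[String]): Set[String] =
    // Attributes the rewritten expressions reference, minus what the
    // Project already outputs, restricted to what the rewritten child
    // can actually supply.
    (newExprRefs -- projectOutput).intersect(newChildOutput)
}
```

For `df.select(df("name")).filter(df("id") === 0)`, `missingAttrs(Set("name", "id"), Set("name"), Set("name", "id"))` yields `Set("id")`, so `id` gets added to the projection. An attribute resolvable only on a parent plan is absent from the child's output, and the `intersect` correctly drops it instead of treating it as missing.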
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21745#discussion_r202055806

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -1165,15 +1173,19 @@ class Analyzer(
         (newExprs, AnalysisBarrier(newChild))

       case p: Project =>
+        // Resolving expressions against current plan.
         val maybeResolvedExprs = exprs.map(resolveExpression(_, p))
+        // Recursively resolving expressions on the child of current plan.
        val (newExprs, newChild) = resolveExprsAndAddMissingAttrs(maybeResolvedExprs, p.child)
-        val missingAttrs = AttributeSet(newExprs) -- AttributeSet(maybeResolvedExprs)
+        // If some attributes used by expressions are resolvable only on the rewritten child
+        // plan, we need to add them into original projection.
+        val missingAttrs = (AttributeSet(newExprs) -- p.outputSet).intersect(newChild.outputSet)
--- End diff ---

What happens if we do not do the `.intersect(newChild.outputSet)`?
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21745#discussion_r202054901

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -1151,10 +1152,17 @@ class Analyzer(
       }
     }

+    /**
+     * This method tries to resolve expressions and find missing attributes recursively. Specially,
+     * when the expressions used in `Sort` or `Filter` contain unresolved attributes or resolved
+     * attributes which are missed from SELECT clause. This method tries to find the missing
--- End diff ---

`which are missed from child output`
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21745#discussion_r201971954

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -1167,7 +1169,8 @@ class Analyzer(
       case p: Project =>
         val maybeResolvedExprs = exprs.map(resolveExpression(_, p))
         val (newExprs, newChild) = resolveExprsAndAddMissingAttrs(maybeResolvedExprs, p.child)
-        val missingAttrs = AttributeSet(newExprs) -- AttributeSet(maybeResolvedExprs)
+        // Only add missing attributes coming from `newChild`.
+        val missingAttrs = (AttributeSet(newExprs) -- p.outputSet).intersect(newChild.outputSet)
--- End diff ---

The logic gets convoluted here and we need to add comments. Basically, we need to explain when we should expand the project list.
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21745#discussion_r201945591

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -1167,7 +1169,8 @@ class Analyzer(
       case p: Project =>
         val maybeResolvedExprs = exprs.map(resolveExpression(_, p))
         val (newExprs, newChild) = resolveExprsAndAddMissingAttrs(maybeResolvedExprs, p.child)
-        val missingAttrs = AttributeSet(newExprs) -- AttributeSet(maybeResolvedExprs)
+        // Only add missing attributes coming from `newChild`.
+        val missingAttrs = (AttributeSet(newExprs) -- p.outputSet).intersect(newChild.outputSet)
--- End diff ---

Thanks. I think it's better to have a reproducible test case before changing the `Aggregate` case, so I'm trying to create one; then we can be more confident about that change. Actually, I found another place that needs fixing. It seems we don't have enough test coverage for similar features.
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21745#discussion_r201930407

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -1167,7 +1169,8 @@ class Analyzer(
       case p: Project =>
         val maybeResolvedExprs = exprs.map(resolveExpression(_, p))
         val (newExprs, newChild) = resolveExprsAndAddMissingAttrs(maybeResolvedExprs, p.child)
-        val missingAttrs = AttributeSet(newExprs) -- AttributeSet(maybeResolvedExprs)
+        // Only add missing attributes coming from `newChild`.
+        val missingAttrs = (AttributeSet(newExprs) -- p.outputSet).intersect(newChild.outputSet)
--- End diff ---

This is the second time this has come up, but do we need a fix in the `Aggregate` case too? The logic seems completely different. Or can we remove the `Aggregate` case if `ResolveAggregateFunctions` can handle this? I don't think we have any reason to keep wrong logic.
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/21745#discussion_r201914482

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -1163,7 +1165,8 @@ class Analyzer(
       case p: Project =>
         val maybeResolvedExprs = exprs.map(resolveExpression(_, p))
         val (newExprs, newChild) = resolveExprsAndAddMissingAttrs(maybeResolvedExprs, p.child)
-        val missingAttrs = AttributeSet(newExprs) -- AttributeSet(maybeResolvedExprs)
+        // The resolved attributes might not come from `p.child`. Need to filter it.
--- End diff ---

At least, this case was [resolved](https://github.com/apache/spark/pull/21745#issuecomment-404380823) in `ResolveMissingReferences` in Spark v2.2.
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21745#discussion_r201901412

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -1163,7 +1165,8 @@ class Analyzer(
      case p: Project =>
        val maybeResolvedExprs = exprs.map(resolveExpression(_, p))
        val (newExprs, newChild) = resolveExprsAndAddMissingAttrs(maybeResolvedExprs, p.child)
-        val missingAttrs = AttributeSet(newExprs) -- AttributeSet(maybeResolvedExprs)
+        // The resolved attributes might not come from `p.child`. Need to filter it.
--- End diff ---

How can this happen? If the resolved attributes do not exist in the child, then the plan is invalid, isn't it?
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21745#discussion_r201718356

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala ---
@@ -60,7 +60,8 @@ abstract class LogicalPlan
    * [[org.apache.spark.sql.catalyst.analysis.UnresolvedRelation UnresolvedRelation]]
    * should return `false`).
    */
-  lazy val resolved: Boolean = expressions.forall(_.resolved) && childrenResolved
+  lazy val resolved: Boolean = expressions.forall(_.resolved) && childrenResolved &&
+    missingInput.isEmpty
--- End diff ---

Yeah, I found that this change causes one test failure.
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21745#discussion_r201716700

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala ---
@@ -60,7 +60,8 @@ abstract class LogicalPlan
    * [[org.apache.spark.sql.catalyst.analysis.UnresolvedRelation UnresolvedRelation]]
    * should return `false`).
    */
-  lazy val resolved: Boolean = expressions.forall(_.resolved) && childrenResolved
+  lazy val resolved: Boolean = expressions.forall(_.resolved) && childrenResolved &&
+    missingInput.isEmpty
--- End diff ---

`missingInput` is special; mostly we can't resolve it. I think that's why we didn't consider it in `resolved` in the first place. We can update the if condition in `ResolveMissingReferences` to take `missingInput` into consideration.
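As a rough illustration of what adding `missingInput.isEmpty` to `resolved` changes, here is a toy plan model. The case classes are illustrative stand-ins for this discussion only, not Spark's `LogicalPlan` hierarchy.

```scala
// Toy plan model sketching the `resolved` change under discussion.
// These classes are illustrative stand-ins, not Spark internals.
sealed trait ToyPlan {
  def output: Set[String]
  def references: Set[String]
  def children: Seq[ToyPlan]
  // Attributes referenced by this node but not produced by any child.
  def missingInput: Set[String] = references -- children.flatMap(_.output).toSet
  // With the proposed extra condition, a node with missing input
  // is no longer considered resolved.
  def resolved: Boolean = children.forall(_.resolved) && missingInput.isEmpty
}

case class ToyRelation(output: Set[String]) extends ToyPlan {
  def references: Set[String] = Set.empty
  def children: Seq[ToyPlan] = Nil
}

case class ToyFilter(references: Set[String], child: ToyPlan) extends ToyPlan {
  def output: Set[String] = child.output
  def children: Seq[ToyPlan] = Seq(child)
}
```

In this model, `ToyFilter(Set("id"), ToyRelation(Set("name")))` mirrors the `!Filter (id#6 = 0)` over `Project [name#5]` situation from the PR description: it reports `resolved == false`, which is what gives a rule like `ResolveMissingReferences` a chance to run.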
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21745#discussion_r201660016

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -1163,7 +1163,8 @@ class Analyzer(
       case p: Project =>
         val maybeResolvedExprs = exprs.map(resolveExpression(_, p))
         val (newExprs, newChild) = resolveExprsAndAddMissingAttrs(maybeResolvedExprs, p.child)
-        val missingAttrs = AttributeSet(newExprs) -- AttributeSet(maybeResolvedExprs)
+        val missingAttrs = AttributeSet(newExprs) --
--- End diff ---

Yeah, I think using `p.outputSet` is correct. Will update later.
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21745#discussion_r201658654

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -1163,7 +1163,8 @@ class Analyzer(
       case p: Project =>
         val maybeResolvedExprs = exprs.map(resolveExpression(_, p))
         val (newExprs, newChild) = resolveExprsAndAddMissingAttrs(maybeResolvedExprs, p.child)
-        val missingAttrs = AttributeSet(newExprs) -- AttributeSet(maybeResolvedExprs)
+        val missingAttrs = AttributeSet(newExprs) --
--- End diff ---

For `Aggregate`, I've tested it. It seems `ResolveAggregateFunctions` already covers it.
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21745#discussion_r201658301

--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala ---
@@ -119,4 +119,16 @@ class LogicalPlanSuite extends SparkFunSuite {
       OneRowRelation())
     assert(result.sameResult(expected))
   }
+
+  test("Logical plan with missing inputs should be unresolved") {
+    // Normally we won't add a missing resolved reference into a logical plan,
+    // but a valid query like `df.select(df("name")).filter(df("id") === 0)` can make a query
+    // like this.
+    val relation = LocalRelation(AttributeReference("a", IntegerType, nullable = true)())
+    val plan = Project(Stream(AttributeReference("b", IntegerType, nullable = true)()), relation)
--- End diff ---

No special reason. Just following the test case above.
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21745#discussion_r201621224

--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala ---
@@ -119,4 +119,16 @@ class LogicalPlanSuite extends SparkFunSuite {
       OneRowRelation())
     assert(result.sameResult(expected))
   }
+
+  test("Logical plan with missing inputs should be unresolved") {
+    // Normally we won't add a missing resolved reference into a logical plan,
+    // but a valid query like `df.select(df("name")).filter(df("id") === 0)` can make a query
+    // like this.
+    val relation = LocalRelation(AttributeReference("a", IntegerType, nullable = true)())
+    val plan = Project(Stream(AttributeReference("b", IntegerType, nullable = true)()), relation)
--- End diff ---

Why `Stream`?
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21745#discussion_r201619815

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -1163,7 +1163,8 @@ class Analyzer(
       case p: Project =>
         val maybeResolvedExprs = exprs.map(resolveExpression(_, p))
         val (newExprs, newChild) = resolveExprsAndAddMissingAttrs(maybeResolvedExprs, p.child)
-        val missingAttrs = AttributeSet(newExprs) -- AttributeSet(maybeResolvedExprs)
+        val missingAttrs = AttributeSet(newExprs) --
--- End diff ---

Should we also fix the `Aggregate` case?
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21745#discussion_r201622249

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -1163,7 +1163,8 @@ class Analyzer(
       case p: Project =>
         val maybeResolvedExprs = exprs.map(resolveExpression(_, p))
         val (newExprs, newChild) = resolveExprsAndAddMissingAttrs(maybeResolvedExprs, p.child)
-        val missingAttrs = AttributeSet(newExprs) -- AttributeSet(maybeResolvedExprs)
+        val missingAttrs = AttributeSet(newExprs) --
--- End diff ---

I might be missing something, but how about `val missingAttrs = AttributeSet(newExprs) -- p.outputSet`?
GitHub user viirya opened a pull request: https://github.com/apache/spark/pull/21745

[SPARK-24781][SQL] Using a reference from Dataset in Filter/Sort might not work

## What changes were proposed in this pull request?

When we use a reference from a Dataset in `filter` or `sort` that was not used in the prior `select`, an `AnalysisException` occurs, e.g.,

```scala
val df = Seq(("test1", 0), ("test2", 1)).toDF("name", "id")
df.select(df("name")).filter(df("id") === 0).show()
```

```
org.apache.spark.sql.AnalysisException: Resolved attribute(s) id#6 missing from name#5 in operator !Filter (id#6 = 0).;;
!Filter (id#6 = 0)
+- AnalysisBarrier
   +- Project [name#5]
      +- Project [_1#2 AS name#5, _2#3 AS id#6]
         +- LocalRelation [_1#2, _2#3]
```

This change adds a condition `missingInput.isEmpty` to `resolved` in `LogicalPlan`. Previously, a logical plan was considered resolved if all of its expressions were resolved and its children were resolved. However, since we can add a resolved reference like `df("name")` into a query plan, it is possible for all expressions in a query plan to be resolved while the plan still has missing inputs.

## How was this patch tested?

Added tests.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/viirya/spark-1 SPARK-24781

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21745.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #21745

commit 97837a46b790ceb1f0df38cc7a3094b1cb4eb556
Author: Liang-Chi Hsieh
Date: 2018-07-11T07:44:43Z

    Resolved references from Dataset should be checked if it is missed from plan.
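The end-to-end behavior the PR restores can be pictured as a three-step rewrite: temporarily widen the projection with the attribute the filter needs, apply the filter, then restore the original projection on top. The sketch below models this over plain Scala collections; the object and method names are illustrative only and nothing here is Spark API.

```scala
// Sketch of the rewrite that makes df.select(df("name")).filter(df("id") === 0)
// analyzable. Rows are plain Maps; this is an illustration, not Spark internals.
object ExpandProjectSketch {
  type Row = Map[String, Any]

  def selectThenFilter(rows: Seq[Row],
                       selected: Seq[String],
                       filterCol: String,
                       pred: Any => Boolean): Seq[Row] = {
    // Step 1: expand the projection with the attribute the filter needs.
    val expanded = (selected :+ filterCol).distinct
    val projected = rows.map(_.filter { case (k, _) => expanded.contains(k) })
    // Step 2: apply the filter on the expanded projection.
    val filtered = projected.filter(r => pred(r(filterCol)))
    // Step 3: re-project to the originally selected columns, so the final
    // output matches the user's SELECT list.
    filtered.map(_.filter { case (k, _) => selected.contains(k) })
  }
}
```

With the PR description's data, `selectThenFilter(rows, Seq("name"), "id", _ == 0)` keeps only the `name` column of the row whose `id` is 0, which matches what `df.select(col("name")).filter(col("id") === 0)` returns in the test case above.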