[GitHub] spark pull request #18697: [SPARK-16683][SQL] Repeated joins to same table c...
Github user gatorsmile commented on a diff in the pull request:
https://github.com/apache/spark/pull/18697#discussion_r136378843

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala ---
@@ -171,6 +171,16 @@ sealed trait Partitioning {
    * produced by `A` could have also been produced by `B`.
    */
   def guarantees(other: Partitioning): Boolean = this == other
+
+  /**
+   * Returns the partitioning scheme that is valid under restriction to a given set of output
+   * attributes. If the partitioning is an [[Expression]] then the attributes that it depends on
+   * must be in the outputSet otherwise the attribute leaks.
+   */
+  def restrict(outputSet: AttributeSet): Partitioning = this match {
--- End diff --

We are refactoring this concept in https://github.com/apache/spark/pull/19080. Could you review that PR first? Thanks!

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
GitHub user aray opened a pull request:
https://github.com/apache/spark/pull/18697

[SPARK-16683][SQL] Repeated joins to same table can leak attributes via partitioning

## What changes were proposed in this pull request?

In some complex queries where the same table is joined multiple times, interleaved with aggregation, we can get conflicting attributes that leak via partitionings, leading to wrong results because shuffles are not inserted. See the `JoinSuite` diff for an example. This patch adds a method to `Partitioning` that restricts it to a given set of output attributes. The method is then called by operators that generally maintain their input distribution but output only a subset of their inputs.

## How was this patch tested?

Unit test based on the example code from the JIRA, plus additional unit tests of the new method.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/aray/spark SPARK-16683

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/18697.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #18697

commit b05e6303a19926327525a9c2ffa399d68fca3911
Author: Andrew Ray
Date:   2017-06-29T15:43:58Z

    fix via partitioning restriction
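The idea in the description can be sketched with a small, self-contained model. The names below (`SimplePartitioning`, `HashedBy`, `Unknown`) are illustrative stand-ins, not Spark's actual classes: a partitioning scheme survives `restrict` only if every attribute it depends on is still in the operator's output set; otherwise it must degrade to an "unknown" scheme so the planner inserts a shuffle instead of wrongly reusing the layout.

```scala
// Simplified model of the fix, assuming attributes are plain string names.
sealed trait SimplePartitioning {
  // By default a scheme is unaffected by restriction (e.g. round-robin).
  def restrict(outputSet: Set[String]): SimplePartitioning = this
}

// Hash partitioning that depends on a set of attribute names.
case class HashedBy(attrs: Set[String]) extends SimplePartitioning {
  override def restrict(outputSet: Set[String]): SimplePartitioning =
    // Keep the scheme only if every referenced attribute survives;
    // otherwise the claim is meaningless downstream.
    if (attrs.subsetOf(outputSet)) this else Unknown
}

// Degenerate scheme: the planner can assume nothing and will shuffle.
case object Unknown extends SimplePartitioning

object RestrictDemo {
  def main(args: Array[String]): Unit = {
    val p = HashedBy(Set("parent"))
    // All referenced attributes survive: partitioning still holds.
    println(p.restrict(Set("parent", "son")))
    // Referenced attribute dropped: degrade to Unknown.
    println(p.restrict(Set("level1")))
  }
}
```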
Github user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/18697#discussion_r130278378

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala ---
@@ -80,7 +80,7 @@ case class ProjectExec(projectList: Seq[NamedExpression], child: SparkPlan)
   override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 
-  override def outputPartitioning: Partitioning = child.outputPartitioning
+  override def outputPartitioning: Partitioning = child.outputPartitioning.restrict(outputSet)
--- End diff --

Project won't change output partitioning, right?
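The question above gets at the subtlety the patch addresses. A projection does not physically move rows, but it can drop or alias the very column the child's partitioning refers to, leaving the partitioning expression pointing at an attribute that no longer exists in the output. A hypothetical sketch (our names, not Spark's):

```scala
object ProjectDemo {
  // Child claims its rows are hash-partitioned by this column name.
  final case class Hashed(column: String)

  // A Project keeps the child's partitioning claim only if the
  // referenced column survives into the project's output set.
  def afterProject(p: Hashed, output: Set[String]): Option[Hashed] =
    if (output.contains(p.column)) Some(p) else None

  def main(args: Array[String]): Unit = {
    val child = Hashed("parent")
    // SELECT parent, son: the layout claim is still meaningful.
    println(afterProject(child, Set("parent", "son")))
    // SELECT parent AS level1: output set is {"level1"}, so keeping the
    // claim would let the stale `parent` attribute leak downstream.
    println(afterProject(child, Set("level1")))
  }
}
```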
Github user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/18697#discussion_r130286066

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala ---
@@ -65,6 +65,10 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
     false
   }
 
+  override def verboseStringWithSuffix: String = {
+    s"$verboseString $outputPartitioning"
+  }
--- End diff --

Aside from debugging this issue, do we really need to always print the output partitioning?
Github user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/18697#discussion_r130286222

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala ---
@@ -36,6 +40,70 @@ class JoinSuite extends QueryTest with SharedSQLContext {
     df.queryExecution.optimizedPlan.stats.sizeInBytes
   }
 
+  test("SPARK-16683 Repeated joins to same table can leak attributes via partitioning") {
+    val hier = sqlContext.sparkSession.sparkContext.parallelize(Seq(
+      ("A10", "A1"),
+      ("A11", "A1"),
+      ("A20", "A2"),
+      ("A21", "A2"),
+      ("B10", "B1"),
+      ("B11", "B1"),
+      ("B20", "B2"),
+      ("B21", "B2"),
+      ("A1", "A"),
+      ("A2", "A"),
+      ("B1", "B"),
+      ("B2", "B")
+    )).toDF("son", "parent").cache() // passes if cache is removed but with count on dist1
+    hier.createOrReplaceTempView("hier")
+    hier.count() // if this is removed it passes
+
+    val base = sqlContext.sparkSession.sparkContext.parallelize(Seq(
+      Tuple1("A10"),
+      Tuple1("A11"),
+      Tuple1("A20"),
+      Tuple1("A21"),
+      Tuple1("B10"),
+      Tuple1("B11"),
+      Tuple1("B20"),
+      Tuple1("B21")
+    )).toDF("id")
+    base.createOrReplaceTempView("base")
+
+    val dist1 = spark.sql("""
+SELECT parent level1
--- End diff --

Please fix the code indentation.
Github user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/18697#discussion_r130286295

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala ---
@@ -36,6 +40,70 @@ class JoinSuite extends QueryTest with SharedSQLContext {
     df.queryExecution.optimizedPlan.stats.sizeInBytes
   }
 
+    val dist1 = spark.sql("""
+SELECT parent level1
+FROM base INNER JOIN hier h1 ON base.id = h1.son
+GROUP BY parent""")
+
+    dist1.createOrReplaceTempView("dist1")
+    // dist1.count() // or put a count here
+
+    val dist2 = spark.sql("""
+SELECT parent level2
--- End diff --

ditto.
Github user aray commented on a diff in the pull request:
https://github.com/apache/spark/pull/18697#discussion_r130396904

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala ---
@@ -65,6 +65,10 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
     false
   }
 
+  override def verboseStringWithSuffix: String = {
+    s"$verboseString $outputPartitioning"
+  }
--- End diff --

This doesn't change anything in common use; one has to call `plan.treeString(verbose = true, addSuffix = true)` to see it. I would argue for keeping it for any future debugging.
Github user asfgit closed the pull request at:
https://github.com/apache/spark/pull/18697