[GitHub] spark pull request #18697: [SPARK-16683][SQL] Repeated joins to same table c...

2017-08-31 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/18697#discussion_r136378843
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala ---
@@ -171,6 +171,16 @@ sealed trait Partitioning {
   * produced by `A` could have also been produced by `B`.
   */
  def guarantees(other: Partitioning): Boolean = this == other
+
+  /**
+   * Returns the partitioning scheme that is valid under restriction to a given set of output
+   * attributes. If the partitioning is an [[Expression]], then the attributes it depends on
+   * must be in the outputSet; otherwise the attribute leaks.
+   */
+  def restrict(outputSet: AttributeSet): Partitioning = this match {
--- End diff --

We are refactoring this concept in PR 
https://github.com/apache/spark/pull/19080

Could you review that PR first? Thanks!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18697: [SPARK-16683][SQL] Repeated joins to same table c...

2017-07-20 Thread aray
GitHub user aray opened a pull request:

https://github.com/apache/spark/pull/18697

[SPARK-16683][SQL] Repeated joins to same table can leak attributes via partitioning

## What changes were proposed in this pull request?

In some complex queries where the same table is joined multiple times, interleaved 
with aggregation, we can get conflicting attributes that leak via partitionings, 
leading to wrong results because shuffles are not inserted. See the `JoinSuite` diff 
for an example. This patch adds a method to `Partitioning` that restricts it to a 
given set of output attributes. That method is then called by operators that 
generally maintain their input distribution but output only a subset of their inputs.
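The idea behind the new method can be sketched outside Spark with a simplified, self-contained model. The names and signatures below are illustrative stand-ins, not Spark's actual `Partitioning` API (attribute sets are modeled as `Set[String]` rather than `AttributeSet`):

```scala
// Simplified stand-in for the Partitioning hierarchy, illustrating the idea of
// restricting a partitioning claim to an operator's output attributes.
sealed trait Partitioning {
  def numPartitions: Int
  // Keep the partitioning only if every attribute it depends on is still
  // visible in the output; otherwise degrade to UnknownPartitioning so the
  // planner inserts a shuffle instead of trusting a stale claim.
  def restrict(outputSet: Set[String]): Partitioning = this match {
    case HashPartitioning(attrs, n) if !attrs.forall(outputSet.contains) =>
      UnknownPartitioning(n)
    case other => other
  }
}
case class HashPartitioning(attributes: Seq[String], numPartitions: Int) extends Partitioning
case class UnknownPartitioning(numPartitions: Int) extends Partitioning

object RestrictDemo {
  def main(args: Array[String]): Unit = {
    val p = HashPartitioning(Seq("parent"), 200)
    // An operator that still outputs "parent" may keep the claim:
    println(p.restrict(Set("parent", "level1"))) // HashPartitioning(List(parent),200)
    // One that drops "parent" must not keep it, or the attribute "leaks":
    println(p.restrict(Set("level1")))           // UnknownPartitioning(200)
  }
}
```

The key design point is that restriction is conservative: when in doubt, report `UnknownPartitioning`, which can only cost an extra shuffle, never a wrong result.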

## How was this patch tested?

A unit test based on the example code from the JIRA ticket, plus additional unit 
tests for the new method.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aray/spark SPARK-16683

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/18697.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #18697


commit b05e6303a19926327525a9c2ffa399d68fca3911
Author: Andrew Ray 
Date:   2017-06-29T15:43:58Z

fix via partitioning restriction







[GitHub] spark pull request #18697: [SPARK-16683][SQL] Repeated joins to same table c...

2017-07-30 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18697#discussion_r130278378
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala ---
@@ -80,7 +80,7 @@ case class ProjectExec(projectList: Seq[NamedExpression], child: SparkPlan)
 
   override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 
-  override def outputPartitioning: Partitioning = child.outputPartitioning
+  override def outputPartitioning: Partitioning = child.outputPartitioning.restrict(outputSet)
--- End diff --

Project won't change output partitioning, right?
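Physically it doesn't: a projection never moves rows between partitions. The subtlety is in what the operator *claims*: if the projection drops the columns the claimed partitioning hashes on, that claim references attributes no longer in the output. A minimal sketch of that failure mode (names hypothetical, not Spark's API):

```scala
object ProjectDemo {
  // Hypothetical stand-in for a hash-partitioning claim.
  case class HashedBy(attrs: Seq[String], numPartitions: Int)

  // What a Project-like operator should report, given its child's claim and
  // the set of columns it actually outputs. The data stays physically
  // partitioned by "son" either way; but once "son" is projected away, no
  // downstream operator can match on it, so the claim must be withdrawn.
  def reportedPartitioning(child: HashedBy, output: Set[String]): Option[HashedBy] =
    if (child.attrs.forall(output.contains)) Some(child) else None

  def main(args: Array[String]): Unit = {
    val childClaim = HashedBy(Seq("son"), 8)
    println(reportedPartitioning(childClaim, Set("son", "parent"))) // Some(HashedBy(List(son),8))
    println(reportedPartitioning(childClaim, Set("parent")))        // None: claim withdrawn
  }
}
```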





[GitHub] spark pull request #18697: [SPARK-16683][SQL] Repeated joins to same table c...

2017-07-31 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18697#discussion_r130286066
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala ---
@@ -65,6 +65,10 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
     false
   }
 
+  override def verboseStringWithSuffix: String = {
+    s"$verboseString $outputPartitioning"
+  }
--- End diff --

Apart from debugging this issue, do we really need to always print the output 
partitioning?





[GitHub] spark pull request #18697: [SPARK-16683][SQL] Repeated joins to same table c...

2017-07-31 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18697#discussion_r130286222
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala ---
@@ -36,6 +40,70 @@ class JoinSuite extends QueryTest with SharedSQLContext {
     df.queryExecution.optimizedPlan.stats.sizeInBytes
   }
 
+  test("SPARK-16683 Repeated joins to same table can leak attributes via partitioning") {
+    val hier = sqlContext.sparkSession.sparkContext.parallelize(Seq(
+      ("A10", "A1"),
+      ("A11", "A1"),
+      ("A20", "A2"),
+      ("A21", "A2"),
+      ("B10", "B1"),
+      ("B11", "B1"),
+      ("B20", "B2"),
+      ("B21", "B2"),
+      ("A1", "A"),
+      ("A2", "A"),
+      ("B1", "B"),
+      ("B2", "B")
+    )).toDF("son", "parent").cache() // passes if cache is removed but with count on dist1
+    hier.createOrReplaceTempView("hier")
+    hier.count() // if this is removed it passes
+
+    val base = sqlContext.sparkSession.sparkContext.parallelize(Seq(
+      Tuple1("A10"),
+      Tuple1("A11"),
+      Tuple1("A20"),
+      Tuple1("A21"),
+      Tuple1("B10"),
+      Tuple1("B11"),
+      Tuple1("B20"),
+      Tuple1("B21")
+    )).toDF("id")
+    base.createOrReplaceTempView("base")
+
+    val dist1 = spark.sql("""
+        SELECT parent level1
--- End diff --

Please fix the code indent.





[GitHub] spark pull request #18697: [SPARK-16683][SQL] Repeated joins to same table c...

2017-07-31 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18697#discussion_r130286295
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala ---
@@ -36,6 +40,70 @@ class JoinSuite extends QueryTest with SharedSQLContext {
     df.queryExecution.optimizedPlan.stats.sizeInBytes
   }
 
+  test("SPARK-16683 Repeated joins to same table can leak attributes via partitioning") {
+    val hier = sqlContext.sparkSession.sparkContext.parallelize(Seq(
+      ("A10", "A1"),
+      ("A11", "A1"),
+      ("A20", "A2"),
+      ("A21", "A2"),
+      ("B10", "B1"),
+      ("B11", "B1"),
+      ("B20", "B2"),
+      ("B21", "B2"),
+      ("A1", "A"),
+      ("A2", "A"),
+      ("B1", "B"),
+      ("B2", "B")
+    )).toDF("son", "parent").cache() // passes if cache is removed but with count on dist1
+    hier.createOrReplaceTempView("hier")
+    hier.count() // if this is removed it passes
+
+    val base = sqlContext.sparkSession.sparkContext.parallelize(Seq(
+      Tuple1("A10"),
+      Tuple1("A11"),
+      Tuple1("A20"),
+      Tuple1("A21"),
+      Tuple1("B10"),
+      Tuple1("B11"),
+      Tuple1("B20"),
+      Tuple1("B21")
+    )).toDF("id")
+    base.createOrReplaceTempView("base")
+
+    val dist1 = spark.sql("""
+        SELECT parent level1
+        FROM base INNER JOIN hier h1 ON base.id = h1.son
+        GROUP BY parent""")
+
+    dist1.createOrReplaceTempView("dist1")
+    // dist1.count() // or put a count here
+
+    val dist2 = spark.sql("""
+        SELECT parent level2
--- End diff --

ditto.





[GitHub] spark pull request #18697: [SPARK-16683][SQL] Repeated joins to same table c...

2017-07-31 Thread aray
Github user aray commented on a diff in the pull request:

https://github.com/apache/spark/pull/18697#discussion_r130396904
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala ---
@@ -65,6 +65,10 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
     false
   }
 
+  override def verboseStringWithSuffix: String = {
+    s"$verboseString $outputPartitioning"
+  }
--- End diff --

This doesn't change anything in common use; one has to call 
`plan.treeString(verbose = true, addSuffix = true)` to see it. I would argue 
for keeping it for any future debugging.





[GitHub] spark pull request #18697: [SPARK-16683][SQL] Repeated joins to same table c...

2018-11-10 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/18697

