[GitHub] spark pull request #19080: [SPARK-21865][SQL] simplify the distribution sema...

2018-01-08 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/19080


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19080: [SPARK-21865][SQL] simplify the distribution sema...

2018-01-07 Thread sameeragarwal
Github user sameeragarwal commented on a diff in the pull request:

https://github.com/apache/spark/pull/19080#discussion_r160080116
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala ---
@@ -51,12 +76,41 @@ case object AllTuples extends Distribution
  */
 case class ClusteredDistribution(
     clustering: Seq[Expression],
-    numPartitions: Option[Int] = None) extends Distribution {
+    requiredNumPartitions: Option[Int] = None) extends Distribution {
   require(
     clustering != Nil,
     "The clustering expressions of a ClusteredDistribution should not be Nil. " +
       "An AllTuples should be used to represent a distribution that only has " +
       "a single partition.")
+
+  override def createPartitioning(numPartitions: Int): Partitioning = {
+    assert(requiredNumPartitions.isEmpty || requiredNumPartitions.get == numPartitions,
+      s"This ClusteredDistribution requires ${requiredNumPartitions.get} partitions, but " +
+        s"the actual number of partitions is $numPartitions.")
+    HashPartitioning(clustering, numPartitions)
+  }
+}
+
+/**
+ * Represents data where tuples have been partitioned according to the hash of the given
+ * `expressions`. The hash function is defined as `HashPartitioning.partitionIdExpression`, so only
+ * [[HashPartitioning]] can satisfy this distribution.
+ *
+ * This is a strictly stronger guarantee than [[ClusteredDistribution]]. Given a tuple and the
+ * number of partitions, this distribution strictly requires which partition the tuple should be in.
+ */
+case class HashPartitionedDistribution(expressions: Seq[Expression]) extends Distribution {
--- End diff --

sounds good


---




[GitHub] spark pull request #19080: [SPARK-21865][SQL] simplify the distribution sema...

2018-01-07 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19080#discussion_r160079396
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala ---
@@ -73,46 +127,31 @@ case class OrderedDistribution(ordering: Seq[SortOrder]) extends Distribution {
       "An AllTuples should be used to represent a distribution that only has " +
       "a single partition.")
 
-  // TODO: This is not really valid...
-  def clustering: Set[Expression] = ordering.map(_.child).toSet
+  override def requiredNumPartitions: Option[Int] = None
+
+  override def createPartitioning(numPartitions: Int): Partitioning = {
+    RangePartitioning(ordering, numPartitions)
+  }
 }
 
 /**
  * Represents data where tuples are broadcasted to every node. It is quite common that the
  * entire set of tuples is transformed into different data structure.
  */
-case class BroadcastDistribution(mode: BroadcastMode) extends Distribution
+case class BroadcastDistribution(mode: BroadcastMode) extends Distribution {
--- End diff --

Yeah, good idea. But again, this is an existing problem, so let's fix it in 
another PR.


---




[GitHub] spark pull request #19080: [SPARK-21865][SQL] simplify the distribution sema...

2018-01-07 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19080#discussion_r160079216
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala ---
@@ -73,46 +127,31 @@ case class OrderedDistribution(ordering: Seq[SortOrder]) extends Distribution {
       "An AllTuples should be used to represent a distribution that only has " +
       "a single partition.")
 
-  // TODO: This is not really valid...
-  def clustering: Set[Expression] = ordering.map(_.child).toSet
+  override def requiredNumPartitions: Option[Int] = None
--- End diff --

According to the comment `This is a strictly stronger guarantee than 
[[ClusteredDistribution]]`, we do want to guarantee it. However, we don't 
actually respect that guarantee; see 
https://github.com/apache/spark/pull/19080/files#r136419947

Since it is an existing problem, I'd like to fix it in another PR.


---




[GitHub] spark pull request #19080: [SPARK-21865][SQL] simplify the distribution sema...

2018-01-07 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19080#discussion_r160079142
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala ---
@@ -51,12 +76,41 @@ case object AllTuples extends Distribution
  */
 case class ClusteredDistribution(
     clustering: Seq[Expression],
-    numPartitions: Option[Int] = None) extends Distribution {
+    requiredNumPartitions: Option[Int] = None) extends Distribution {
   require(
     clustering != Nil,
     "The clustering expressions of a ClusteredDistribution should not be Nil. " +
       "An AllTuples should be used to represent a distribution that only has " +
       "a single partition.")
+
+  override def createPartitioning(numPartitions: Int): Partitioning = {
+    assert(requiredNumPartitions.isEmpty || requiredNumPartitions.get == numPartitions,
+      s"This ClusteredDistribution requires ${requiredNumPartitions.get} partitions, but " +
+        s"the actual number of partitions is $numPartitions.")
+    HashPartitioning(clustering, numPartitions)
+  }
+}
+
+/**
+ * Represents data where tuples have been partitioned according to the hash of the given
+ * `expressions`. The hash function is defined as `HashPartitioning.partitionIdExpression`, so only
+ * [[HashPartitioning]] can satisfy this distribution.
+ *
+ * This is a strictly stronger guarantee than [[ClusteredDistribution]]. Given a tuple and the
+ * number of partitions, this distribution strictly requires which partition the tuple should be in.
+ */
+case class HashPartitionedDistribution(expressions: Seq[Expression]) extends Distribution {
--- End diff --

Good idea, I'll rename it to `HashClusteredDistribution`. But I'd rather not 
extend `ClusteredDistribution`: a partitioning that satisfies 
`ClusteredDistribution` may not satisfy `HashClusteredDistribution`. The child 
therefore could not be substituted wherever the parent is expected, which would 
violate the Liskov Substitution Principle.
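
The substitution concern can be sketched with toy types. This is only an 
illustration under simplifying assumptions: `HashPart`, `RangePart`, 
`Clustered`, `HashClustered` and `satisfies` are hypothetical stand-ins that 
use column names instead of expressions, not Spark's actual catalyst classes.

```scala
object LspSketch {
  // Toy stand-ins for partitionings; keys are plain column names.
  sealed trait Partitioning
  final case class HashPart(keys: Seq[String], numPartitions: Int) extends Partitioning
  final case class RangePart(keys: Seq[String], numPartitions: Int) extends Partitioning

  sealed trait Distribution
  // Satisfied by any partitioning whose keys are all among the clustering keys.
  final case class Clustered(keys: Seq[String]) extends Distribution
  // Satisfied only by hash partitioning on exactly these keys.
  final case class HashClustered(keys: Seq[String]) extends Distribution

  def satisfies(p: Partitioning, d: Distribution): Boolean = (p, d) match {
    case (HashPart(pk, _), Clustered(dk))     => pk.nonEmpty && pk.forall(dk.contains)
    case (RangePart(pk, _), Clustered(dk))    => pk.nonEmpty && pk.forall(dk.contains)
    case (HashPart(pk, _), HashClustered(dk)) => pk == dk
    case (RangePart(_, _), HashClustered(_))  => false
  }
}
```

In this model `RangePart(Seq("a"), 4)` satisfies `Clustered(Seq("a"))` but not 
`HashClustered(Seq("a"))`, so code written against the weaker requirement 
cannot assume the stronger one holds. Keeping the two as sibling types makes 
that explicit.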


---




[GitHub] spark pull request #19080: [SPARK-21865][SQL] simplify the distribution sema...

2018-01-05 Thread sameeragarwal
Github user sameeragarwal commented on a diff in the pull request:

https://github.com/apache/spark/pull/19080#discussion_r160017940
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala ---
@@ -73,46 +127,31 @@ case class OrderedDistribution(ordering: Seq[SortOrder]) extends Distribution {
       "An AllTuples should be used to represent a distribution that only has " +
       "a single partition.")
 
-  // TODO: This is not really valid...
-  def clustering: Set[Expression] = ordering.map(_.child).toSet
+  override def requiredNumPartitions: Option[Int] = None
+
+  override def createPartitioning(numPartitions: Int): Partitioning = {
+    RangePartitioning(ordering, numPartitions)
+  }
 }
 
 /**
  * Represents data where tuples are broadcasted to every node. It is quite common that the
  * entire set of tuples is transformed into different data structure.
  */
-case class BroadcastDistribution(mode: BroadcastMode) extends Distribution
+case class BroadcastDistribution(mode: BroadcastMode) extends Distribution {
--- End diff --

Similarly, how about `BroadcastPartitioning` just satisfying the 
`AllTuples` distribution?


---




[GitHub] spark pull request #19080: [SPARK-21865][SQL] simplify the distribution sema...

2018-01-05 Thread sameeragarwal
Github user sameeragarwal commented on a diff in the pull request:

https://github.com/apache/spark/pull/19080#discussion_r160017879
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala ---
@@ -51,12 +76,41 @@ case object AllTuples extends Distribution
  */
 case class ClusteredDistribution(
     clustering: Seq[Expression],
-    numPartitions: Option[Int] = None) extends Distribution {
+    requiredNumPartitions: Option[Int] = None) extends Distribution {
   require(
     clustering != Nil,
     "The clustering expressions of a ClusteredDistribution should not be Nil. " +
       "An AllTuples should be used to represent a distribution that only has " +
       "a single partition.")
+
+  override def createPartitioning(numPartitions: Int): Partitioning = {
+    assert(requiredNumPartitions.isEmpty || requiredNumPartitions.get == numPartitions,
+      s"This ClusteredDistribution requires ${requiredNumPartitions.get} partitions, but " +
+        s"the actual number of partitions is $numPartitions.")
+    HashPartitioning(clustering, numPartitions)
+  }
+}
+
+/**
+ * Represents data where tuples have been partitioned according to the hash of the given
+ * `expressions`. The hash function is defined as `HashPartitioning.partitionIdExpression`, so only
+ * [[HashPartitioning]] can satisfy this distribution.
+ *
+ * This is a strictly stronger guarantee than [[ClusteredDistribution]]. Given a tuple and the
+ * number of partitions, this distribution strictly requires which partition the tuple should be in.
+ */
+case class HashPartitionedDistribution(expressions: Seq[Expression]) extends Distribution {
--- End diff --

Semantically, a `Partitioning` satisfies a `Distribution`, so it'd be better 
not to call this `HashPartitioned`. How about we call this 
`DeterministicClusteredDistribution` or `HashClusteredDistribution`? Also, 
perhaps this can just extend `ClusteredDistribution`?


---




[GitHub] spark pull request #19080: [SPARK-21865][SQL] simplify the distribution sema...

2018-01-05 Thread sameeragarwal
Github user sameeragarwal commented on a diff in the pull request:

https://github.com/apache/spark/pull/19080#discussion_r160018028
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala ---
@@ -73,46 +127,31 @@ case class OrderedDistribution(ordering: Seq[SortOrder]) extends Distribution {
       "An AllTuples should be used to represent a distribution that only has " +
       "a single partition.")
 
-  // TODO: This is not really valid...
-  def clustering: Set[Expression] = ordering.map(_.child).toSet
+  override def requiredNumPartitions: Option[Int] = None
--- End diff --

Out of curiosity, should an `OrderedDistribution` make any guarantees 
around clustering? Do we care if "tuples that share the same value for the 
ordering expressions will never be split across partitions"?
  


---




[GitHub] spark pull request #19080: [SPARK-21865][SQL] simplify the distribution sema...

2017-08-31 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19080#discussion_r136477108
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala ---
@@ -284,24 +241,17 @@ case class RangePartitioning(ordering: Seq[SortOrder], numPartitions: Int)
   override def nullable: Boolean = false
   override def dataType: DataType = IntegerType
 
-  override def satisfies(required: Distribution): Boolean = required match {
-    case UnspecifiedDistribution => true
-    case OrderedDistribution(requiredOrdering) =>
-      val minSize = Seq(requiredOrdering.size, ordering.size).min
-      requiredOrdering.take(minSize) == ordering.take(minSize)
-    case ClusteredDistribution(requiredClustering) =>
-      ordering.map(_.child).forall(x => requiredClustering.exists(_.semanticEquals(x)))
-    case _ => false
-  }
-
-  override def compatibleWith(other: Partitioning): Boolean = other match {
-    case o: RangePartitioning => this.semanticEquals(o)
-    case _ => false
-  }
-
-  override def guarantees(other: Partitioning): Boolean = other match {
-    case o: RangePartitioning => this.semanticEquals(o)
-    case _ => false
+  override def satisfies(required: Distribution): Boolean = {
+    super.satisfies(required) || {
+      required match {
+        case OrderedDistribution(requiredOrdering) =>
+          val minSize = Seq(requiredOrdering.size, ordering.size).min
+          requiredOrdering.take(minSize) == ordering.take(minSize)
--- End diff --

BTW, this doesn't cause any problems, because `OrderedDistribution` is only 
used for the sort operator.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---




[GitHub] spark pull request #19080: [SPARK-21865][SQL] simplify the distribution sema...

2017-08-31 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19080#discussion_r136476025
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala ---
@@ -284,24 +241,17 @@ case class RangePartitioning(ordering: Seq[SortOrder], numPartitions: Int)
   override def nullable: Boolean = false
   override def dataType: DataType = IntegerType
 
-  override def satisfies(required: Distribution): Boolean = required match {
-    case UnspecifiedDistribution => true
-    case OrderedDistribution(requiredOrdering) =>
-      val minSize = Seq(requiredOrdering.size, ordering.size).min
-      requiredOrdering.take(minSize) == ordering.take(minSize)
-    case ClusteredDistribution(requiredClustering) =>
-      ordering.map(_.child).forall(x => requiredClustering.exists(_.semanticEquals(x)))
-    case _ => false
-  }
-
-  override def compatibleWith(other: Partitioning): Boolean = other match {
-    case o: RangePartitioning => this.semanticEquals(o)
-    case _ => false
-  }
-
-  override def guarantees(other: Partitioning): Boolean = other match {
-    case o: RangePartitioning => this.semanticEquals(o)
-    case _ => false
+  override def satisfies(required: Distribution): Boolean = {
+    super.satisfies(required) || {
+      required match {
+        case OrderedDistribution(requiredOrdering) =>
+          val minSize = Seq(requiredOrdering.size, ordering.size).min
+          requiredOrdering.take(minSize) == ordering.take(minSize)
--- End diff --

I noticed this too: the current logic only guarantees ordering, not 
clustering. But this is orthogonal to this PR, and we can fix it in another PR.
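
To make the gap concrete, here is a minimal sketch that mirrors the shape of 
the prefix check quoted in the diff. `Order` and `rangeSatisfiesOrdered` are 
hypothetical stand-ins (column names instead of `SortOrder` expressions), not 
Spark's API.

```scala
object PrefixCheckSketch {
  // Toy stand-in for a SortOrder: a column name plus a direction.
  final case class Order(column: String, ascending: Boolean = true)

  // Same shape as the quoted check: compare only the common prefix of the
  // required ordering and the partitioning's ordering.
  def rangeSatisfiesOrdered(
      partitionOrdering: Seq[Order],
      requiredOrdering: Seq[Order]): Boolean = {
    val minSize = Seq(requiredOrdering.size, partitionOrdering.size).min
    requiredOrdering.take(minSize) == partitionOrdering.take(minSize)
  }
}
```

With these definitions, a partitioning ordered by `(a, b)` is accepted for a 
requirement ordered by `a` alone, because only the one-element prefix is 
compared. The check says nothing about whether rows with equal `a` end up in 
the same partition.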


---



[GitHub] spark pull request #19080: [SPARK-21865][SQL] simplify the distribution sema...

2017-08-31 Thread aray
Github user aray commented on a diff in the pull request:

https://github.com/apache/spark/pull/19080#discussion_r136419947
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala ---
@@ -284,24 +241,17 @@ case class RangePartitioning(ordering: Seq[SortOrder], numPartitions: Int)
   override def nullable: Boolean = false
   override def dataType: DataType = IntegerType
 
-  override def satisfies(required: Distribution): Boolean = required match {
-    case UnspecifiedDistribution => true
-    case OrderedDistribution(requiredOrdering) =>
-      val minSize = Seq(requiredOrdering.size, ordering.size).min
-      requiredOrdering.take(minSize) == ordering.take(minSize)
-    case ClusteredDistribution(requiredClustering) =>
-      ordering.map(_.child).forall(x => requiredClustering.exists(_.semanticEquals(x)))
-    case _ => false
-  }
-
-  override def compatibleWith(other: Partitioning): Boolean = other match {
-    case o: RangePartitioning => this.semanticEquals(o)
-    case _ => false
-  }
-
-  override def guarantees(other: Partitioning): Boolean = other match {
-    case o: RangePartitioning => this.semanticEquals(o)
-    case _ => false
+  override def satisfies(required: Distribution): Boolean = {
+    super.satisfies(required) || {
+      required match {
+        case OrderedDistribution(requiredOrdering) =>
+          val minSize = Seq(requiredOrdering.size, ordering.size).min
+          requiredOrdering.take(minSize) == ordering.take(minSize)
--- End diff --

While we are cleaning things up, this needs to be fixed. 
`RangePartitioning(a+,b+)` does not satisfy `OrderedDistribution(a+)`. It 
violates the requirement that all rows with the same value of `a` need to be 
in the same partition.
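
A small data-level sketch of the problem, assuming a two-partition range 
partitioning on `(a, b)` with a single made-up boundary. `boundary`, 
`partitionOf` and the sample rows are hypothetical and only model the idea; 
they are not how Spark computes range bounds.

```scala
object RangeSplitSketch {
  type Row = (Int, Int) // (a, b)

  // Hypothetical boundary between the two partitions, compared on (a, b).
  val boundary: Row = (1, 5)

  // Rows that sort at or below the boundary go to partition 0, the rest to 1.
  def partitionOf(row: Row): Int =
    if (Ordering[(Int, Int)].lteq(row, boundary)) 0 else 1

  val rows: Seq[Row] = Seq((0, 9), (1, 3), (1, 7), (2, 0))

  // For each value of `a`, the set of partitions that contain it.
  val partitionsPerA: Map[Int, Set[Int]] =
    rows.groupBy(_._1).map { case (a, rs) => a -> rs.map(partitionOf).toSet }
}
```

Here the rows `(1, 3)` and `(1, 7)` share `a = 1` but land in partitions 0 and 
1, so `partitionsPerA(1)` contains both partitions. A boundary that falls 
inside a run of equal `a` values is exactly the case the comment describes.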


---



[GitHub] spark pull request #19080: [SPARK-21865][SQL] simplify the distribution sema...

2017-08-30 Thread liancheng
Github user liancheng commented on a diff in the pull request:

https://github.com/apache/spark/pull/19080#discussion_r136243745
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala ---
@@ -30,18 +30,43 @@ import org.apache.spark.sql.types.{DataType, IntegerType}
  *  - Intra-partition ordering of data: In this case the distribution describes guarantees made
  *    about how tuples are distributed within a single partition.
  */
-sealed trait Distribution
+sealed trait Distribution {
+  /**
+   * The required number of partitions for this distribution. If it's None, then any number of
+   * partitions is allowed for this distribution.
+   */
+  def requiredNumPartitions: Option[Int]
+
+  /**
+   * Creates a default partitioning for this distribution, which can satisfy this distribution while
+   * matching the given number of partitions.
+   */
+  def createPartitioning(numPartitions: Int): Partitioning
+}
 
 /**
  * Represents a distribution where no promises are made about co-location of data.
  */
-case object UnspecifiedDistribution extends Distribution
+case object UnspecifiedDistribution extends Distribution {
+  override def requiredNumPartitions: Option[Int] = None
+
+  override def createPartitioning(numPartitions: Int): Partitioning = {
+    throw new IllegalStateException("UnspecifiedDistribution does not have default partitioning.")
+  }
+}
 
 /**
  * Represents a distribution that only has a single partition and all tuples of the dataset
  * are co-located.
  */
-case object AllTuples extends Distribution
+case object AllTuples extends Distribution {
--- End diff --

Yea, I did miss the point that `ClusteredDistribution` covers both 
`RangePartitioning` and `HashPartitioning`.


---



[GitHub] spark pull request #19080: [SPARK-21865][SQL] simplify the distribution sema...

2017-08-30 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19080#discussion_r136226265
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala ---
@@ -30,18 +30,43 @@ import org.apache.spark.sql.types.{DataType, IntegerType}
  *  - Intra-partition ordering of data: In this case the distribution describes guarantees made
  *    about how tuples are distributed within a single partition.
  */
-sealed trait Distribution
+sealed trait Distribution {
+  /**
+   * The required number of partitions for this distribution. If it's None, then any number of
+   * partitions is allowed for this distribution.
+   */
+  def requiredNumPartitions: Option[Int]
+
+  /**
+   * Creates a default partitioning for this distribution, which can satisfy this distribution while
+   * matching the given number of partitions.
+   */
+  def createPartitioning(numPartitions: Int): Partitioning
+}
 
 /**
  * Represents a distribution where no promises are made about co-location of data.
  */
-case object UnspecifiedDistribution extends Distribution
+case object UnspecifiedDistribution extends Distribution {
+  override def requiredNumPartitions: Option[Int] = None
+
+  override def createPartitioning(numPartitions: Int): Partitioning = {
+    throw new IllegalStateException("UnspecifiedDistribution does not have default partitioning.")
+  }
+}
 
 /**
  * Represents a distribution that only has a single partition and all tuples of the dataset
  * are co-located.
  */
-case object AllTuples extends Distribution
+case object AllTuples extends Distribution {
--- End diff --

I'd like to keep the `Distribution` concept; otherwise it's very weird to 
say a `RangePartitioning` satisfies `HashPartitioning`, while it looks 
reasonable to say a `RangePartitioning` satisfies `ClusteredDistribution`.


---



[GitHub] spark pull request #19080: [SPARK-21865][SQL] simplify the distribution sema...

2017-08-30 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/19080#discussion_r136214689
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala ---
@@ -30,18 +30,43 @@ import org.apache.spark.sql.types.{DataType, IntegerType}
  *  - Intra-partition ordering of data: In this case the distribution describes guarantees made
  *    about how tuples are distributed within a single partition.
  */
-sealed trait Distribution
+sealed trait Distribution {
+  /**
+   * The required number of partitions for this distribution. If it's None, then any number of
+   * partitions is allowed for this distribution.
+   */
+  def requiredNumPartitions: Option[Int]
+
+  /**
+   * Creates a default partitioning for this distribution, which can satisfy this distribution while
+   * matching the given number of partitions.
+   */
+  def createPartitioning(numPartitions: Int): Partitioning
+}
 
 /**
  * Represents a distribution where no promises are made about co-location of data.
  */
-case object UnspecifiedDistribution extends Distribution
+case object UnspecifiedDistribution extends Distribution {
+  override def requiredNumPartitions: Option[Int] = None
+
+  override def createPartitioning(numPartitions: Int): Partitioning = {
+    throw new IllegalStateException("UnspecifiedDistribution does not have default partitioning.")
+  }
+}
 
 /**
  * Represents a distribution that only has a single partition and all tuples of the dataset
  * are co-located.
  */
-case object AllTuples extends Distribution
+case object AllTuples extends Distribution {
--- End diff --

I'd like to keep `AllTuples`. `SingleNodeDistribution` is a special case of 
`AllTuples`, and it seems we do not really need the extra information 
introduced by `SingleNode`.


---



[GitHub] spark pull request #19080: [SPARK-21865][SQL] simplify the distribution sema...

2017-08-30 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/19080#discussion_r136213945
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala ---
@@ -30,18 +30,43 @@ import org.apache.spark.sql.types.{DataType, IntegerType}
  *  - Intra-partition ordering of data: In this case the distribution describes guarantees made
  *    about how tuples are distributed within a single partition.
  */
-sealed trait Distribution
+sealed trait Distribution {
+  /**
+   * The required number of partitions for this distribution. If it's None, then any number of
+   * partitions is allowed for this distribution.
+   */
+  def requiredNumPartitions: Option[Int]
+
+  /**
+   * Creates a default partitioning for this distribution, which can satisfy this distribution while
+   * matching the given number of partitions.
+   */
+  def createPartitioning(numPartitions: Int): Partitioning
+}
 
 /**
  * Represents a distribution where no promises are made about co-location of data.
  */
-case object UnspecifiedDistribution extends Distribution
+case object UnspecifiedDistribution extends Distribution {
+  override def requiredNumPartitions: Option[Int] = None
+
+  override def createPartitioning(numPartitions: Int): Partitioning = {
+    throw new IllegalStateException("UnspecifiedDistribution does not have default partitioning.")
+  }
+}
 
 /**
  * Represents a distribution that only has a single partition and all tuples of the dataset
  * are co-located.
  */
-case object AllTuples extends Distribution
+case object AllTuples extends Distribution {
--- End diff --

It seems we should be moving towards CBO, not away from it.


---



[GitHub] spark pull request #19080: [SPARK-21865][SQL] simplify the distribution sema...

2017-08-30 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/19080#discussion_r136213879
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala ---
@@ -30,18 +30,43 @@ import org.apache.spark.sql.types.{DataType, IntegerType}
  *  - Intra-partition ordering of data: In this case the distribution describes guarantees made
  *    about how tuples are distributed within a single partition.
  */
-sealed trait Distribution
+sealed trait Distribution {
+  /**
+   * The required number of partitions for this distribution. If it's None, then any number of
+   * partitions is allowed for this distribution.
+   */
+  def requiredNumPartitions: Option[Int]
+
+  /**
+   * Creates a default partitioning for this distribution, which can satisfy this distribution while
+   * matching the given number of partitions.
+   */
+  def createPartitioning(numPartitions: Int): Partitioning
+}
 
 /**
  * Represents a distribution where no promises are made about co-location of data.
  */
-case object UnspecifiedDistribution extends Distribution
+case object UnspecifiedDistribution extends Distribution {
+  override def requiredNumPartitions: Option[Int] = None
+
+  override def createPartitioning(numPartitions: Int): Partitioning = {
+    throw new IllegalStateException("UnspecifiedDistribution does not have default partitioning.")
+  }
+}
 
 /**
  * Represents a distribution that only has a single partition and all tuples of the dataset
  * are co-located.
  */
-case object AllTuples extends Distribution
+case object AllTuples extends Distribution {
--- End diff --

Sure you can make that argument (I'm not sure I buy it), but then why does 
this PR still have two different concepts?


---



[GitHub] spark pull request #19080: [SPARK-21865][SQL] simplify the distribution sema...

2017-08-30 Thread liancheng
Github user liancheng commented on a diff in the pull request:

https://github.com/apache/spark/pull/19080#discussion_r136211685
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala ---
@@ -30,18 +30,43 @@ import org.apache.spark.sql.types.{DataType, IntegerType}
  *  - Intra-partition ordering of data: In this case the distribution describes guarantees made
  *    about how tuples are distributed within a single partition.
  */
-sealed trait Distribution
+sealed trait Distribution {
+  /**
+   * The required number of partitions for this distribution. If it's None, then any number of
+   * partitions is allowed for this distribution.
+   */
+  def requiredNumPartitions: Option[Int]
+
+  /**
+   * Creates a default partitioning for this distribution, which can satisfy this distribution while
+   * matching the given number of partitions.
+   */
+  def createPartitioning(numPartitions: Int): Partitioning
+}
 
 /**
  * Represents a distribution where no promises are made about co-location of data.
  */
-case object UnspecifiedDistribution extends Distribution
+case object UnspecifiedDistribution extends Distribution {
+  override def requiredNumPartitions: Option[Int] = None
+
+  override def createPartitioning(numPartitions: Int): Partitioning = {
+    throw new IllegalStateException("UnspecifiedDistribution does not have default partitioning.")
+  }
+}
 
 /**
  * Represents a distribution that only has a single partition and all tuples of the dataset
  * are co-located.
  */
-case object AllTuples extends Distribution
+case object AllTuples extends Distribution {
--- End diff --

I think the concern about logical/physical separation only matters if we
take CBO into consideration. E.g., for an `AllTuples` distribution requirement,
the planner may produce two plans, one using `SinglePartition` and one using
`BroadcastPartitioning`, and pick the cheaper one. Within our current planner
framework, though, this separation doesn't seem very useful.
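
To make the CBO point concrete, here is a minimal, self-contained sketch. It
does not use Spark's real classes: `Candidate`, `cheapest`, and the cost
numbers are hypothetical names made up for illustration, and the toy
`Partitioning` objects only borrow the names mentioned above.

```scala
// Toy model, not Spark's real API: every type and cost here is hypothetical.
object CboSketch {
  sealed trait Partitioning
  case object SinglePartition extends Partitioning
  case object BroadcastPartitioning extends Partitioning

  // One physical alternative that satisfies an AllTuples requirement,
  // paired with an estimated cost in an arbitrary, made-up unit.
  final case class Candidate(partitioning: Partitioning, estimatedCost: Long)

  // A cost-based planner keeps every alternative and picks the cheapest one.
  // Returns None when there are no candidates at all.
  def cheapest(candidates: Seq[Candidate]): Option[Candidate] =
    if (candidates.isEmpty) None else Some(candidates.minBy(_.estimatedCost))

  def main(args: Array[String]): Unit = {
    val plans = Seq(
      Candidate(SinglePartition, estimatedCost = 10L),
      Candidate(BroadcastPartitioning, estimatedCost = 4L))
    println(cheapest(plans))
  }
}
```

Without a cost model there is only ever one candidate per requirement, which
is why the separation buys little in a planner that does not compare plans.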





[GitHub] spark pull request #19080: [SPARK-21865][SQL] simplify the distribution sema...

2017-08-30 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/19080#discussion_r136209980
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala ---
@@ -30,18 +30,43 @@ import org.apache.spark.sql.types.{DataType, IntegerType}
  *  - Intra-partition ordering of data: In this case the distribution describes guarantees made
  *    about how tuples are distributed within a single partition.
  */
-sealed trait Distribution
+sealed trait Distribution {
+  /**
+   * The required number of partitions for this distribution. If it's None, then any number of
+   * partitions is allowed for this distribution.
+   */
+  def requiredNumPartitions: Option[Int]
+
+  /**
+   * Creates a default partitioning for this distribution, which can satisfy this distribution while
+   * matching the given number of partitions.
+   */
+  def createPartitioning(numPartitions: Int): Partitioning
+}
 
 /**
  * Represents a distribution where no promises are made about co-location of data.
  */
-case object UnspecifiedDistribution extends Distribution
+case object UnspecifiedDistribution extends Distribution {
+  override def requiredNumPartitions: Option[Int] = None
+
+  override def createPartitioning(numPartitions: Int): Partitioning = {
+    throw new IllegalStateException("UnspecifiedDistribution does not have default partitioning.")
+  }
+}
 
 /**
  * Represents a distribution that only has a single partition and all tuples of the dataset
  * are co-located.
  */
-case object AllTuples extends Distribution
+case object AllTuples extends Distribution {
--- End diff --

So I spoke with @sameeragarwal about this a little. The whole point here
was to have a logical/physical separation. `AllTuples` could be `SingleNode`
or it could be `Broadcast`. All the operation wants to know is that it's seeing
all of the tuples; it shouldn't care how that is accomplished.

Now, since the first version, we have started to deviate from that. I'm
not sure if this is still the right thing to do, but I wanted to give a little
context.
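
A rough sketch of that separation, as a toy model rather than the actual
Catalyst code. `SingleNode` and `Broadcast` are the informal names used above,
`Spread` and `needsExchange` are hypothetical, and the `satisfies` rules are
assumptions chosen only to illustrate the idea.

```scala
// Toy model of the logical/physical split; hypothetical types, not Spark's classes.
object SeparationSketch {
  // Logical side: what an operator requires of its input.
  sealed trait Distribution
  case object UnspecifiedDistribution extends Distribution
  case object AllTuples extends Distribution

  // Physical side: how the data is actually laid out.
  sealed trait Partitioning {
    def satisfies(required: Distribution): Boolean
  }
  // Everything lives in one partition, so any requirement in this toy model holds.
  case object SingleNode extends Partitioning {
    def satisfies(required: Distribution): Boolean = true
  }
  // Every node holds a full copy, so each task still sees all tuples.
  case object Broadcast extends Partitioning {
    def satisfies(required: Distribution): Boolean = true
  }
  // Data spread over several partitions with no co-location guarantee.
  final case class Spread(numPartitions: Int) extends Partitioning {
    def satisfies(required: Distribution): Boolean = required match {
      case UnspecifiedDistribution => true
      case AllTuples               => numPartitions == 1
    }
  }

  // The operator only asks whether its requirement is met, never how.
  def needsExchange(child: Partitioning, required: Distribution): Boolean =
    !child.satisfies(required)
}
```

In this model an operator that requires `AllTuples` accepts either `SingleNode`
or `Broadcast` input unchanged, and only a `Spread` child with more than one
partition would need to be reshuffled.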





[GitHub] spark pull request #19080: [SPARK-21865][SQL] simplify the distribution sema...

2017-08-30 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19080#discussion_r136206624
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala ---
@@ -30,18 +30,43 @@ import org.apache.spark.sql.types.{DataType, IntegerType}
  *  - Intra-partition ordering of data: In this case the distribution describes guarantees made
  *    about how tuples are distributed within a single partition.
  */
-sealed trait Distribution
+sealed trait Distribution {
+  /**
+   * The required number of partitions for this distribution. If it's None, then any number of
+   * partitions is allowed for this distribution.
+   */
+  def requiredNumPartitions: Option[Int]
+
+  /**
+   * Creates a default partitioning for this distribution, which can satisfy this distribution while
+   * matching the given number of partitions.
+   */
+  def createPartitioning(numPartitions: Int): Partitioning
+}
 
 /**
  * Represents a distribution where no promises are made about co-location of data.
  */
-case object UnspecifiedDistribution extends Distribution
+case object UnspecifiedDistribution extends Distribution {
+  override def requiredNumPartitions: Option[Int] = None
+
+  override def createPartitioning(numPartitions: Int): Partitioning = {
+    throw new IllegalStateException("UnspecifiedDistribution does not have default partitioning.")
+  }
+}
 
 /**
  * Represents a distribution that only has a single partition and all tuples of the dataset
  * are co-located.
  */
-case object AllTuples extends Distribution
+case object AllTuples extends Distribution {
--- End diff --

`SingleNodeDistribution`?

