[GitHub] spark pull request #19828: [SPARK-22614] Dataset API: repartitionByRange(......

2017-11-30 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/19828


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19828: [SPARK-22614] Dataset API: repartitionByRange(......

2017-11-30 Thread adrian-ionescu
Github user adrian-ionescu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19828#discussion_r154159546
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala ---
@@ -839,8 +839,6 @@ case class RepartitionByExpression(
 
   require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
 
-  require(partitionExpressions.nonEmpty, "At least one partition-by expression must be specified.")
--- End diff --

That would change the current behavior of `.repartition(numPartitions, Seq.empty: _*)`, and I'd like to avoid that.

In fact, I've just raised a separate ticket about the latter.


---




[GitHub] spark pull request #19828: [SPARK-22614] Dataset API: repartitionByRange(......

2017-11-30 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19828#discussion_r154158225
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala ---
@@ -839,8 +839,6 @@ case class RepartitionByExpression(
 
   require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
 
-  require(partitionExpressions.nonEmpty, "At least one partition-by expression must be specified.")
--- End diff --

Just for safety, also keep this change?


---




[GitHub] spark pull request #19828: [SPARK-22614] Dataset API: repartitionByRange(......

2017-11-30 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19828#discussion_r154149190
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala ---
@@ -838,6 +839,30 @@ case class RepartitionByExpression(
 
   require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
 
+  val partitioning: Partitioning = {
+    val (sortOrder, nonSortOrder) = partitionExpressions.partition(_.isInstanceOf[SortOrder])
+
+    require(sortOrder.isEmpty || nonSortOrder.isEmpty,
+      s"${getClass.getSimpleName} expects that either all its `partitionExpressions` are of type " +
+        "`SortOrder`, which means `RangePartitioning`, or none of them are `SortOrder`, which " +
+        "means `HashPartitioning`. In this case we have:" +
+      s
--- End diff --

This still exists after we revert the previous changes.


---




[GitHub] spark pull request #19828: [SPARK-22614] Dataset API: repartitionByRange(......

2017-11-29 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19828#discussion_r153857985
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -2733,23 +2733,63 @@ class Dataset[T] private[sql](
    */
   @scala.annotation.varargs
   def repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T] = withTypedPlan {
+    // The underlying `LogicalPlan` operator special-cases all-`SortOrder` arguments.
+    // However, we don't want to complicate the semantics of this API method. Instead, let's
+    // give users a friendly error message, pointing them to the new method.
+    val sortOrders = partitionExprs.filter(_.expr.isInstanceOf[SortOrder])
+    if (sortOrders.nonEmpty) throw new IllegalArgumentException(
+      s"""Invalid partitionExprs specified: $sortOrders
+         |For range partitioning use repartitionByRange(...) instead.
+       """.stripMargin)
     RepartitionByExpression(partitionExprs.map(_.expr), logicalPlan, numPartitions)
   }
 
   /**
-   * Returns a new Dataset partitioned by the given partitioning expressions, using
-   * `spark.sql.shuffle.partitions` as number of partitions.
-   * The resulting Dataset is hash partitioned.
+   * Returns a new Dataset that is hash partitioned by the given expressions, using
+   * `spark.sql.shuffle.partitions` as the number of partitions. If no expressions are specified,
+   * round robin partitioning is used.
    *
    * This is the same operation as "DISTRIBUTE BY" in SQL (Hive QL).
    *
    * @group typedrel
    * @since 2.0.0
    */
   @scala.annotation.varargs
-  def repartition(partitionExprs: Column*): Dataset[T] = withTypedPlan {
-    RepartitionByExpression(
-      partitionExprs.map(_.expr), logicalPlan, sparkSession.sessionState.conf.numShufflePartitions)
+  def repartition(partitionExprs: Column*): Dataset[T] = {
+    repartition(sparkSession.sessionState.conf.numShufflePartitions, partitionExprs: _*)
+  }
+
+  /**
+   * Returns a new Dataset that is hash partitioned by the given expressions into `numPartitions`.
--- End diff --

`hash` -> `range`


---




[GitHub] spark pull request #19828: [SPARK-22614] Dataset API: repartitionByRange(......

2017-11-29 Thread adrian-ionescu
Github user adrian-ionescu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19828#discussion_r153755261
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala ---
@@ -838,6 +839,30 @@ case class RepartitionByExpression(
 
   require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
 
+  val partitioning: Partitioning = {
+    val (sortOrder, nonSortOrder) = partitionExpressions.partition(_.isInstanceOf[SortOrder])
+
+    require(sortOrder.isEmpty || nonSortOrder.isEmpty,
+      s"${getClass.getSimpleName} expects that either all its `partitionExpressions` are of type " +
+        "`SortOrder`, which means `RangePartitioning`, or none of them are `SortOrder`, which " +
+        "means `HashPartitioning`. In this case we have:" +
+      s
--- End diff --

nice catch :)


---




[GitHub] spark pull request #19828: [SPARK-22614] Dataset API: repartitionByRange(......

2017-11-28 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19828#discussion_r153619969
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -2747,9 +2755,41 @@ class Dataset[T] private[sql](
    * @since 2.0.0
    */
   @scala.annotation.varargs
-  def repartition(partitionExprs: Column*): Dataset[T] = withTypedPlan {
-    RepartitionByExpression(
-      partitionExprs.map(_.expr), logicalPlan, sparkSession.sessionState.conf.numShufflePartitions)
+  def repartition(partitionExprs: Column*): Dataset[T] = {
+    repartition(sparkSession.sessionState.conf.numShufflePartitions, partitionExprs: _*)
+  }
+
+  /**
+   * Returns a new Dataset partitioned by the given partitioning expressions into
+   * `numPartitions`. The resulting Dataset is range partitioned.
--- End diff --

Could you update this to describe the latest change?


---




[GitHub] spark pull request #19828: [SPARK-22614] Dataset API: repartitionByRange(......

2017-11-28 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19828#discussion_r153606380
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala ---
@@ -838,6 +839,30 @@ case class RepartitionByExpression(
 
   require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
 
+  val partitioning: Partitioning = {
+    val (sortOrder, nonSortOrder) = partitionExpressions.partition(_.isInstanceOf[SortOrder])
+
+    require(sortOrder.isEmpty || nonSortOrder.isEmpty,
+      s"${getClass.getSimpleName} expects that either all its `partitionExpressions` are of type " +
+        "`SortOrder`, which means `RangePartitioning`, or none of them are `SortOrder`, which " +
+        "means `HashPartitioning`. In this case we have:" +
+      s
--- End diff --

`"` * 4?


---




[GitHub] spark pull request #19828: [SPARK-22614] Dataset API: repartitionByRange(......

2017-11-27 Thread adrian-ionescu
Github user adrian-ionescu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19828#discussion_r153410340
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -2747,9 +2755,41 @@ class Dataset[T] private[sql](
    * @since 2.0.0
    */
   @scala.annotation.varargs
-  def repartition(partitionExprs: Column*): Dataset[T] = withTypedPlan {
-    RepartitionByExpression(
-      partitionExprs.map(_.expr), logicalPlan, sparkSession.sessionState.conf.numShufflePartitions)
+  def repartition(partitionExprs: Column*): Dataset[T] = {
+    repartition(sparkSession.sessionState.conf.numShufflePartitions, partitionExprs: _*)
+  }
+
+  /**
+   * Returns a new Dataset partitioned by the given partitioning expressions into
+   * `numPartitions`. The resulting Dataset is range partitioned.
+   *
+   * @group typedrel
+   * @since 2.3.0
+   */
+  @scala.annotation.varargs
+  def repartitionByRange(numPartitions: Int, partitionExprs: Column*): Dataset[T] = withTypedPlan {
--- End diff --

Good call! Raised [SPARK-22624](https://issues.apache.org/jira/browse/SPARK-22624).


---




[GitHub] spark pull request #19828: [SPARK-22614] Dataset API: repartitionByRange(......

2017-11-27 Thread adrian-ionescu
Github user adrian-ionescu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19828#discussion_r153409247
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala ---
@@ -838,6 +839,27 @@ case class RepartitionByExpression(
 
   require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
 
+  require(partitionExpressions.nonEmpty, "At least one partition-by expression must be specified.")
+
+  val partitioning: Partitioning = {
+    val (sortOrder, nonSortOrder) = partitionExpressions.partition(_.isInstanceOf[SortOrder])
--- End diff --

It's going to follow the `HashPartitioning` path and eventually lead to a "Cannot evaluate expression" exception, just like it presently does if you try running `df.repartition($"col".asc + 1)` or `df.sort($"col".asc + 1)`.
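To make that concrete, here is a minimal plain-Scala sketch (with simplified stand-in types, not the actual Catalyst classes) of why a root-only `isInstanceOf[SortOrder]` check lets a nested `SortOrder` fall through to the hash-partitioning path:

```scala
// Simplified stand-ins for Catalyst expression nodes (hypothetical, illustration only).
sealed trait Expr
case class Col(name: String) extends Expr
case class SortOrder(child: Expr) extends Expr
case class Add(left: Expr, right: Expr) extends Expr // stands in for `$"col".asc + 1`

// The root node of this expression is `Add`, not `SortOrder`, so a root-only
// check classifies it as an ordinary (hash-partitioning) expression.
val exprs: Seq[Expr] = Seq(Add(SortOrder(Col("col")), Col("one")))
val (sortOrders, others) = exprs.partition(_.isInstanceOf[SortOrder])

assert(sortOrders.isEmpty) // the nested SortOrder goes undetected
assert(others.size == 1)   // ...and the expression is routed to HashPartitioning
```

The undetected `SortOrder` then fails at evaluation time, as described above.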


---




[GitHub] spark pull request #19828: [SPARK-22614] Dataset API: repartitionByRange(......

2017-11-27 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/19828#discussion_r153337049
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -2747,9 +2755,41 @@ class Dataset[T] private[sql](
    * @since 2.0.0
    */
   @scala.annotation.varargs
-  def repartition(partitionExprs: Column*): Dataset[T] = withTypedPlan {
-    RepartitionByExpression(
-      partitionExprs.map(_.expr), logicalPlan, sparkSession.sessionState.conf.numShufflePartitions)
+  def repartition(partitionExprs: Column*): Dataset[T] = {
+    repartition(sparkSession.sessionState.conf.numShufflePartitions, partitionExprs: _*)
+  }
+
+  /**
+   * Returns a new Dataset partitioned by the given partitioning expressions into
+   * `numPartitions`. The resulting Dataset is range partitioned.
+   *
+   * @group typedrel
+   * @since 2.3.0
+   */
+  @scala.annotation.varargs
+  def repartitionByRange(numPartitions: Int, partitionExprs: Column*): Dataset[T] = withTypedPlan {
+    val sortOrder: Seq[SortOrder] = partitionExprs.map { col =>
+      col.expr match {
+        case expr: SortOrder =>
+          expr
+        case expr: Expression =>
--- End diff --

The error is slightly different because the projection is whole-stage code generated.


---




[GitHub] spark pull request #19828: [SPARK-22614] Dataset API: repartitionByRange(......

2017-11-27 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19828#discussion_r153335289
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -2747,9 +2755,41 @@ class Dataset[T] private[sql](
    * @since 2.0.0
    */
   @scala.annotation.varargs
-  def repartition(partitionExprs: Column*): Dataset[T] = withTypedPlan {
-    RepartitionByExpression(
-      partitionExprs.map(_.expr), logicalPlan, sparkSession.sessionState.conf.numShufflePartitions)
+  def repartition(partitionExprs: Column*): Dataset[T] = {
+    repartition(sparkSession.sessionState.conf.numShufflePartitions, partitionExprs: _*)
+  }
+
+  /**
+   * Returns a new Dataset partitioned by the given partitioning expressions into
+   * `numPartitions`. The resulting Dataset is range partitioned.
+   *
+   * @group typedrel
+   * @since 2.3.0
+   */
+  @scala.annotation.varargs
+  def repartitionByRange(numPartitions: Int, partitionExprs: Column*): Dataset[T] = withTypedPlan {
+    val sortOrder: Seq[SortOrder] = partitionExprs.map { col =>
+      col.expr match {
+        case expr: SortOrder =>
+          expr
+        case expr: Expression =>
--- End diff --

```
Cannot evaluate expression: input[0, bigint, false] ASC NULLS FIRST
java.lang.UnsupportedOperationException: Cannot evaluate expression: input[0, bigint, false] ASC NULLS FIRST
  at org.apache.spark.sql.catalyst.expressions.Unevaluable$class.doGenCode(Expression.scala:259)
  at org.apache.spark.sql.catalyst.expressions.SortOrder.doGenCode(SortOrder.scala:60)
```

Yeah, it also does not work.


---




[GitHub] spark pull request #19828: [SPARK-22614] Dataset API: repartitionByRange(......

2017-11-27 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/19828#discussion_r153292668
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -2747,9 +2755,41 @@ class Dataset[T] private[sql](
    * @since 2.0.0
    */
   @scala.annotation.varargs
-  def repartition(partitionExprs: Column*): Dataset[T] = withTypedPlan {
-    RepartitionByExpression(
-      partitionExprs.map(_.expr), logicalPlan, sparkSession.sessionState.conf.numShufflePartitions)
+  def repartition(partitionExprs: Column*): Dataset[T] = {
+    repartition(sparkSession.sessionState.conf.numShufflePartitions, partitionExprs: _*)
+  }
+
+  /**
+   * Returns a new Dataset partitioned by the given partitioning expressions into
+   * `numPartitions`. The resulting Dataset is range partitioned.
+   *
+   * @group typedrel
+   * @since 2.3.0
+   */
+  @scala.annotation.varargs
+  def repartitionByRange(numPartitions: Int, partitionExprs: Column*): Dataset[T] = withTypedPlan {
+    val sortOrder: Seq[SortOrder] = partitionExprs.map { col =>
+      col.expr match {
+        case expr: SortOrder =>
+          expr
+        case expr: Expression =>
--- End diff --

This is a more generic problem, right? I think a similar error gets thrown if you do something like this: `spark.range(10).select($"id".asc + 1).show()`

Let's fix that in a different ticket.


---




[GitHub] spark pull request #19828: [SPARK-22614] Dataset API: repartitionByRange(......

2017-11-27 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19828#discussion_r153269248
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala ---
@@ -2757,4 +2759,43 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
       }
     }
   }
+
+  test("repartitionByRange") {
--- End diff --

Move it to `DataFrameSuite`?


---




[GitHub] spark pull request #19828: [SPARK-22614] Dataset API: repartitionByRange(......

2017-11-27 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19828#discussion_r153275651
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala ---
@@ -838,6 +839,27 @@ case class RepartitionByExpression(
 
   require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
 
+  require(partitionExpressions.nonEmpty, "At least one partition-by expression must be specified.")
+
+  val partitioning: Partitioning = {
+    val (sortOrder, nonSortOrder) = partitionExpressions.partition(_.isInstanceOf[SortOrder])
--- End diff --

Still the same question: what happens when the `SortOrder` is not at the root node?


---




[GitHub] spark pull request #19828: [SPARK-22614] Dataset API: repartitionByRange(......

2017-11-27 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19828#discussion_r153274811
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -2747,9 +2755,41 @@ class Dataset[T] private[sql](
    * @since 2.0.0
    */
   @scala.annotation.varargs
-  def repartition(partitionExprs: Column*): Dataset[T] = withTypedPlan {
-    RepartitionByExpression(
-      partitionExprs.map(_.expr), logicalPlan, sparkSession.sessionState.conf.numShufflePartitions)
+  def repartition(partitionExprs: Column*): Dataset[T] = {
+    repartition(sparkSession.sessionState.conf.numShufflePartitions, partitionExprs: _*)
+  }
+
+  /**
+   * Returns a new Dataset partitioned by the given partitioning expressions into
+   * `numPartitions`. The resulting Dataset is range partitioned.
+   *
+   * @group typedrel
+   * @since 2.3.0
+   */
+  @scala.annotation.varargs
+  def repartitionByRange(numPartitions: Int, partitionExprs: Column*): Dataset[T] = withTypedPlan {
+    val sortOrder: Seq[SortOrder] = partitionExprs.map { col =>
+      col.expr match {
+        case expr: SortOrder =>
+          expr
+        case expr: Expression =>
--- End diff --

What happens if we have a `SortOrder` that is not at the root node of `expr`?

```Scala
data1d.toDF("val").repartitionByRange(data1d.size, $"val".desc + 1)
  .select(spark_partition_id().as("id"), $"val").show()
```

```
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange rangepartitioning((val#236 DESC NULLS LAST + 1) ASC NULLS FIRST, 10)
+- LocalTableScan [val#236]

  at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
  at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:116)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:113)
```



---




[GitHub] spark pull request #19828: [SPARK-22614] Dataset API: repartitionByRange(......

2017-11-27 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19828#discussion_r153273279
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -2747,9 +2755,41 @@ class Dataset[T] private[sql](
    * @since 2.0.0
    */
   @scala.annotation.varargs
-  def repartition(partitionExprs: Column*): Dataset[T] = withTypedPlan {
-    RepartitionByExpression(
-      partitionExprs.map(_.expr), logicalPlan, sparkSession.sessionState.conf.numShufflePartitions)
+  def repartition(partitionExprs: Column*): Dataset[T] = {
+    repartition(sparkSession.sessionState.conf.numShufflePartitions, partitionExprs: _*)
+  }
+
+  /**
+   * Returns a new Dataset partitioned by the given partitioning expressions into
+   * `numPartitions`. The resulting Dataset is range partitioned.
+   *
+   * @group typedrel
+   * @since 2.3.0
+   */
+  @scala.annotation.varargs
+  def repartitionByRange(numPartitions: Int, partitionExprs: Column*): Dataset[T] = withTypedPlan {
--- End diff --

Open a JIRA for adding the corresponding API in PySpark?


---




[GitHub] spark pull request #19828: [SPARK-22614] Dataset API: repartitionByRange(......

2017-11-27 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/19828#discussion_r153244046
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
@@ -448,8 +448,15 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       case r: logical.Range =>
         execution.RangeExec(r) :: Nil
       case logical.RepartitionByExpression(expressions, child, numPartitions) =>
-        exchange.ShuffleExchangeExec(HashPartitioning(
-          expressions, numPartitions), planLater(child)) :: Nil
+        // RepartitionByExpression's constructor verifies that either all expressions are
+        // of type SortOrder, in which case we're doing RangePartitioning, or none of them are,
+        // in which case we're doing HashPartitioning.
+        val partitioning = if (expressions.forall(_.isInstanceOf[SortOrder])) {
--- End diff --

Oh, and it also makes it easier to unit test this code :)


---




[GitHub] spark pull request #19828: [SPARK-22614] Dataset API: repartitionByRange(......

2017-11-27 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/19828#discussion_r153243250
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala ---
@@ -2757,4 +2759,43 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
       }
     }
   }
+
+  test("repartitionByRange") {
+    val data1d = Random.shuffle(0.to(9))
+    val data2d = data1d.map(i => (i, data1d.size - i))
+
+    checkAnswer(
+      data1d.toDF("val").repartitionByRange(data1d.size, $"val".asc)
+        .select(spark_partition_id().as("id"), $"val"),
+      data1d.map(i => Row(i, i)))
+
+    checkAnswer(
+      data1d.toDF("val").repartitionByRange(data1d.size, $"val".desc)
+        .select(spark_partition_id().as("id"), $"val"),
+      data1d.map(i => Row(i, data1d.size - 1 - i)))
+
+    // .repartitionByRange() assumes .asc by default if no explicit sort order is specified
+    checkAnswer(
+      data2d.toDF("a", "b").repartitionByRange(data1d.size, $"a".desc, $"b")
--- End diff --

`data1d.size`?


---




[GitHub] spark pull request #19828: [SPARK-22614] Dataset API: repartitionByRange(......

2017-11-27 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/19828#discussion_r153242566
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
@@ -448,8 +448,15 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       case r: logical.Range =>
         execution.RangeExec(r) :: Nil
       case logical.RepartitionByExpression(expressions, child, numPartitions) =>
-        exchange.ShuffleExchangeExec(HashPartitioning(
-          expressions, numPartitions), planLater(child)) :: Nil
+        // RepartitionByExpression's constructor verifies that either all expressions are
+        // of type SortOrder, in which case we're doing RangePartitioning, or none of them are,
+        // in which case we're doing HashPartitioning.
+        val partitioning = if (expressions.forall(_.isInstanceOf[SortOrder])) {
--- End diff --

We have discussed this before, but to me it makes slightly more sense to add this logic to the `RepartitionByExpression` plan.


---




[GitHub] spark pull request #19828: [SPARK-22614] Dataset API: repartitionByRange(......

2017-11-27 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/19828#discussion_r153242129
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala ---
@@ -836,6 +836,16 @@ case class RepartitionByExpression(
     child: LogicalPlan,
     numPartitions: Int) extends RepartitionOperation {
 
+  val (sortOrder, nonSortOrder) = partitionExpressions.partition(_.isInstanceOf[SortOrder])
+
+  require(sortOrder.isEmpty || nonSortOrder.isEmpty,
+    s"""${getClass.getSimpleName} expects that either all its `partitionExpressions` are of type
--- End diff --

Do you want this to be a multiline message? It makes sense to put the sort-order and non-sort-order expressions on new lines.


---




[GitHub] spark pull request #19828: [SPARK-22614] Dataset API: repartitionByRange(......

2017-11-27 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/19828#discussion_r153241835
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -2733,6 +2733,12 @@ class Dataset[T] private[sql](
    */
   @scala.annotation.varargs
   def repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T] = withTypedPlan {
+    partitionExprs.find(_.expr.isInstanceOf[SortOrder]).foreach { sortOrder =>
--- End diff --

Use `collect` or `filter`? That way we can show all offending columns.


---




[GitHub] spark pull request #19828: [SPARK-22614] Dataset API: repartitionByRange(......

2017-11-27 Thread adrian-ionescu
GitHub user adrian-ionescu opened a pull request:

https://github.com/apache/spark/pull/19828

[SPARK-22614] Dataset API: repartitionByRange(...)

## What changes were proposed in this pull request?

This PR introduces a way to explicitly range-partition a Dataset. So far, only round-robin and hash partitioning were possible via `df.repartition(...)`, but sometimes range partitioning is desirable: e.g. when writing to disk, where it can yield better compression without the cost of a global sort.

The current implementation piggybacks on the existing `RepartitionByExpression` `LogicalPlan` and simply adds the following logic: if its expressions are of type `SortOrder`, it performs `RangePartitioning`; otherwise, `HashPartitioning`. This is by far the least intrusive approach.
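The dispatch rule above can be sketched in a few lines of plain Scala (the `Expr`, `Col`, and `SortOrder` types here are simplified stand-ins for illustration, not the actual Catalyst classes):

```scala
// Simplified stand-ins for Catalyst expression nodes (hypothetical, illustration only).
sealed trait Expr
case class Col(name: String) extends Expr
case class SortOrder(child: Expr) extends Expr

// All-SortOrder expressions select range partitioning; no SortOrder expressions
// select hash partitioning; a mix of the two is rejected up front.
def choosePartitioning(exprs: Seq[Expr]): String = {
  val (sortOrders, others) = exprs.partition(_.isInstanceOf[SortOrder])
  require(sortOrders.isEmpty || others.isEmpty,
    s"Expected either all or none of the expressions to be SortOrder, got: $exprs")
  if (sortOrders.nonEmpty) "RangePartitioning" else "HashPartitioning"
}
```

In the actual implementation this check lives in `RepartitionByExpression`, so invalid mixes are rejected when the plan is built rather than at execution time.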

I also considered:
- adding a new `RepartitionByRange` node, but that resulted in a lot of code duplication, touching 10+ files
- `RepartitionByExpression(child: LogicalPlan, partitioning: Partitioning)`, but that:
  - also involved touching a lot of the existing code
  - involved pulling a `catalyst.plans.physical` thing into `catalyst.plans.logical`
  - would have required special code for performing `Analysis` on the expressions under `Partitioning`, as that wouldn't happen by default anymore, since `Partitioning` isn't an `Expression`
- `RepartitionByExpression(child: LogicalPlan, partitioning: LogicalPartitioning)`, with `trait LogicalPartitioning extends Expression with Unevaluable` and corresponding subclasses for Range/Hash partitioning, but that:
  - required a lot of new code
  - basically turned `RepartitionByExpression` into a useless wrapper over `LogicalPartitioning`

## How was this patch tested?
Simple end-to-end test in `SQLQuerySuite`.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/adrian-ionescu/apache-spark repartitionByRange

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19828.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19828


commit 08527b0c5da6e7ea3dc80392b92ea88f212e5761
Author: Adrian Ionescu 
Date:   2017-11-27T11:54:38Z

repartitionByRange() + repartition() with SortOrders

commit 950b3dc6429dcade98a9483d90c4f9773a6fc48e
Author: Adrian Ionescu 
Date:   2017-11-27T15:17:36Z

avoid changing semantics for .repartition()




---
