[GitHub] spark pull request #19828: [SPARK-22614] Dataset API: repartitionByRange(......
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/19828

---
- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
Github user adrian-ionescu commented on a diff in the pull request: https://github.com/apache/spark/pull/19828#discussion_r154159546

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala ---
@@ -839,8 +839,6 @@ case class RepartitionByExpression(

   require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")

-  require(partitionExpressions.nonEmpty, "At least one partition-by expression must be specified.")
--- End diff --

That would change the current behavior of `.repartition(numPartitions, Seq.empty: _*)` and I'd like to avoid that. In fact, I've just raised a separate ticket about the latter.
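For readers outside the thread: the concern is that `.repartition(numPartitions, Seq.empty: _*)` expands an empty sequence into the varargs overload, so the call currently succeeds with zero expressions, and the removed `nonEmpty` require would have turned it into an error. A minimal sketch of that varargs behavior (a toy Python function, not Spark code):

```python
def repartition(num_partitions, *partition_exprs):
    """Toy stand-in for Dataset.repartition(numPartitions, partitionExprs: _*)."""
    if num_partitions <= 0:
        raise ValueError(f"Number of partitions ({num_partitions}) must be positive.")
    # If an "at least one partition-by expression" check lived here, the
    # empty-expansion call below would start throwing -- a behavior change.
    return len(partition_exprs)

# Expanding an empty sequence (Scala's `Seq.empty: _*`, Python's `*[]`)
# still selects the varargs overload, just with zero expressions:
print(repartition(10, *[]))  # 0
```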
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19828#discussion_r154158225

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala ---
@@ -839,8 +839,6 @@ case class RepartitionByExpression(

   require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")

-  require(partitionExpressions.nonEmpty, "At least one partition-by expression must be specified.")
--- End diff --

Just for safety, also keep this check?
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19828#discussion_r154149190

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala ---
@@ -838,6 +839,30 @@ case class RepartitionByExpression(

   require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")

+  val partitioning: Partitioning = {
+    val (sortOrder, nonSortOrder) = partitionExpressions.partition(_.isInstanceOf[SortOrder])
+
+    require(sortOrder.isEmpty || nonSortOrder.isEmpty,
+      s"${getClass.getSimpleName} expects that either all its `partitionExpressions` are of type " +
+        "`SortOrder`, which means `RangePartitioning`, or none of them are `SortOrder`, which " +
+        "means `HashPartitioning`. In this case we have:" +
+        s
--- End diff --

This still exists after we revert the previous changes.
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19828#discussion_r153857985

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -2733,23 +2733,63 @@ class Dataset[T] private[sql](
    */
   @scala.annotation.varargs
   def repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T] = withTypedPlan {
+    // The underlying `LogicalPlan` operator special-cases all-`SortOrder` arguments.
+    // However, we don't want to complicate the semantics of this API method. Instead, let's
+    // give users a friendly error message, pointing them to the new method.
+    val sortOrders = partitionExprs.filter(_.expr.isInstanceOf[SortOrder])
+    if (sortOrders.nonEmpty) throw new IllegalArgumentException(
+      s"""Invalid partitionExprs specified: $sortOrders
+         |For range partitioning use repartitionByRange(...) instead.
+       """.stripMargin)
     RepartitionByExpression(partitionExprs.map(_.expr), logicalPlan, numPartitions)
   }

   /**
-   * Returns a new Dataset partitioned by the given partitioning expressions, using
-   * `spark.sql.shuffle.partitions` as number of partitions.
-   * The resulting Dataset is hash partitioned.
+   * Returns a new Dataset that is hash partitioned by the given expressions, using
+   * `spark.sql.shuffle.partitions` as the number of partitions. If no expressions are specified,
+   * round robin partitioning is used.
    *
    * This is the same operation as "DISTRIBUTE BY" in SQL (Hive QL).
    *
    * @group typedrel
    * @since 2.0.0
    */
   @scala.annotation.varargs
-  def repartition(partitionExprs: Column*): Dataset[T] = withTypedPlan {
-    RepartitionByExpression(
-      partitionExprs.map(_.expr), logicalPlan, sparkSession.sessionState.conf.numShufflePartitions)
+  def repartition(partitionExprs: Column*): Dataset[T] = {
+    repartition(sparkSession.sessionState.conf.numShufflePartitions, partitionExprs: _*)
+  }
+
+  /**
+   * Returns a new Dataset that is hash partitioned by the given expressions into `numPartitions`.
--- End diff --

`hash` -> `range`
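The new guard in `repartition` above fails fast when any argument is a `SortOrder`, steering users to `repartitionByRange`. A sketch of that control flow in Python, with toy `Expression`/`SortOrder` stand-ins (illustrative names, not Spark's API):

```python
class Expression:
    pass

class SortOrder(Expression):
    pass

def repartition(num_partitions, *partition_exprs):
    """Hash repartition: reject SortOrder arguments with a pointer to the new API."""
    sort_orders = [e for e in partition_exprs if isinstance(e, SortOrder)]
    if sort_orders:
        raise ValueError(
            f"Invalid partitionExprs specified: {sort_orders}\n"
            "For range partitioning use repartitionByRange(...) instead.")
    return ("hash", num_partitions, partition_exprs)
```

The point of collecting all offending arguments first (rather than failing on the first one) is that the error message can list every column the caller needs to change.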
Github user adrian-ionescu commented on a diff in the pull request: https://github.com/apache/spark/pull/19828#discussion_r153755261

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala ---
@@ -838,6 +839,30 @@ case class RepartitionByExpression(

   require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")

+  val partitioning: Partitioning = {
+    val (sortOrder, nonSortOrder) = partitionExpressions.partition(_.isInstanceOf[SortOrder])
+
+    require(sortOrder.isEmpty || nonSortOrder.isEmpty,
+      s"${getClass.getSimpleName} expects that either all its `partitionExpressions` are of type " +
+        "`SortOrder`, which means `RangePartitioning`, or none of them are `SortOrder`, which " +
+        "means `HashPartitioning`. In this case we have:" +
+        s
--- End diff --

nice catch :)
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19828#discussion_r153619969

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -2747,9 +2755,41 @@ class Dataset[T] private[sql](
    * @since 2.0.0
    */
   @scala.annotation.varargs
-  def repartition(partitionExprs: Column*): Dataset[T] = withTypedPlan {
-    RepartitionByExpression(
-      partitionExprs.map(_.expr), logicalPlan, sparkSession.sessionState.conf.numShufflePartitions)
+  def repartition(partitionExprs: Column*): Dataset[T] = {
+    repartition(sparkSession.sessionState.conf.numShufflePartitions, partitionExprs: _*)
+  }
+
+  /**
+   * Returns a new Dataset partitioned by the given partitioning expressions into
+   * `numPartitions`. The resulting Dataset is range partitioned.
--- End diff --

Could you update this to describe the latest change?
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19828#discussion_r153606380

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala ---
@@ -838,6 +839,30 @@ case class RepartitionByExpression(

   require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")

+  val partitioning: Partitioning = {
+    val (sortOrder, nonSortOrder) = partitionExpressions.partition(_.isInstanceOf[SortOrder])
+
+    require(sortOrder.isEmpty || nonSortOrder.isEmpty,
+      s"${getClass.getSimpleName} expects that either all its `partitionExpressions` are of type " +
+        "`SortOrder`, which means `RangePartitioning`, or none of them are `SortOrder`, which " +
+        "means `HashPartitioning`. In this case we have:" +
+        s
--- End diff --

`"` * 4?
Github user adrian-ionescu commented on a diff in the pull request: https://github.com/apache/spark/pull/19828#discussion_r153410340

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -2747,9 +2755,41 @@ class Dataset[T] private[sql](
    * @since 2.0.0
    */
   @scala.annotation.varargs
-  def repartition(partitionExprs: Column*): Dataset[T] = withTypedPlan {
-    RepartitionByExpression(
-      partitionExprs.map(_.expr), logicalPlan, sparkSession.sessionState.conf.numShufflePartitions)
+  def repartition(partitionExprs: Column*): Dataset[T] = {
+    repartition(sparkSession.sessionState.conf.numShufflePartitions, partitionExprs: _*)
+  }
+
+  /**
+   * Returns a new Dataset partitioned by the given partitioning expressions into
+   * `numPartitions`. The resulting Dataset is range partitioned.
+   *
+   * @group typedrel
+   * @since 2.3.0
+   */
+  @scala.annotation.varargs
+  def repartitionByRange(numPartitions: Int, partitionExprs: Column*): Dataset[T] = withTypedPlan {
--- End diff --

Good call! Raised [SPARK-22624](https://issues.apache.org/jira/browse/SPARK-22624).
Github user adrian-ionescu commented on a diff in the pull request: https://github.com/apache/spark/pull/19828#discussion_r153409247

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala ---
@@ -838,6 +839,27 @@ case class RepartitionByExpression(

   require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")

+  require(partitionExpressions.nonEmpty, "At least one partition-by expression must be specified.")
+
+  val partitioning: Partitioning = {
+    val (sortOrder, nonSortOrder) = partitionExpressions.partition(_.isInstanceOf[SortOrder])
--- End diff --

It's going to follow the `HashPartitioning` path and eventually lead to a "Cannot evaluate expression" exception, just like it would presently do if you tried running `df.repartition($"col".asc + 1)` or `df.sort($"col".asc + 1)`.
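The subtlety behind this exchange is that the `isInstanceOf[SortOrder]` test only looks at the root of each expression tree, so `$"col".asc + 1` (an `Add` whose child is a `SortOrder`) is classified as a non-SortOrder expression, sent down the `HashPartitioning` path, and only later fails because the embedded `SortOrder` is unevaluable. A toy expression tree makes the distinction concrete (illustrative classes, not Catalyst's):

```python
class Expr:
    pass

class SortOrder(Expr):
    def __init__(self, child):
        self.child = child

class Add(Expr):
    def __init__(self, left, right):
        self.left, self.right = left, right

class Column(Expr):
    def __init__(self, name):
        self.name = name

def is_root_sort_order(e):
    # Mirrors `partitionExpressions.partition(_.isInstanceOf[SortOrder])`:
    # only the ROOT node is inspected; a SortOrder buried in a child is invisible.
    return isinstance(e, SortOrder)

asc = SortOrder(Column("col"))   # $"col".asc      -> range-partitioning path
asc_plus_one = Add(asc, 1)       # $"col".asc + 1  -> hash path, fails later
```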
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/19828#discussion_r153337049

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -2747,9 +2755,41 @@ class Dataset[T] private[sql](
    * @since 2.0.0
    */
   @scala.annotation.varargs
-  def repartition(partitionExprs: Column*): Dataset[T] = withTypedPlan {
-    RepartitionByExpression(
-      partitionExprs.map(_.expr), logicalPlan, sparkSession.sessionState.conf.numShufflePartitions)
+  def repartition(partitionExprs: Column*): Dataset[T] = {
+    repartition(sparkSession.sessionState.conf.numShufflePartitions, partitionExprs: _*)
+  }
+
+  /**
+   * Returns a new Dataset partitioned by the given partitioning expressions into
+   * `numPartitions`. The resulting Dataset is range partitioned.
+   *
+   * @group typedrel
+   * @since 2.3.0
+   */
+  @scala.annotation.varargs
+  def repartitionByRange(numPartitions: Int, partitionExprs: Column*): Dataset[T] = withTypedPlan {
+    val sortOrder: Seq[SortOrder] = partitionExprs.map { col =>
+      col.expr match {
+        case expr: SortOrder =>
+          expr
+        case expr: Expression =>
--- End diff --

The error is slightly different because the project is whole-stage code generated.
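The `map` in the quoted diff normalizes every column to a `SortOrder`: explicit orders are kept, and bare expressions default to ascending (the test in this thread notes that `.repartitionByRange()` assumes `.asc` when no sort order is given). A Python sketch of that normalization step (toy classes, not Catalyst's `SortOrder(expr, Ascending)` nodes):

```python
class Expr:
    pass

class SortOrder(Expr):
    def __init__(self, child, ascending=True):
        self.child, self.ascending = child, ascending

def to_sort_orders(exprs):
    """repartitionByRange: keep explicit SortOrders, default the rest to ASC."""
    return [e if isinstance(e, SortOrder) else SortOrder(e, ascending=True)
            for e in exprs]

cols = [SortOrder(Expr(), ascending=False), Expr()]  # like $"a".desc, $"b"
orders = to_sort_orders(cols)
```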
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19828#discussion_r153335289

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -2747,9 +2755,41 @@ class Dataset[T] private[sql](
    * @since 2.0.0
    */
   @scala.annotation.varargs
-  def repartition(partitionExprs: Column*): Dataset[T] = withTypedPlan {
-    RepartitionByExpression(
-      partitionExprs.map(_.expr), logicalPlan, sparkSession.sessionState.conf.numShufflePartitions)
+  def repartition(partitionExprs: Column*): Dataset[T] = {
+    repartition(sparkSession.sessionState.conf.numShufflePartitions, partitionExprs: _*)
+  }
+
+  /**
+   * Returns a new Dataset partitioned by the given partitioning expressions into
+   * `numPartitions`. The resulting Dataset is range partitioned.
+   *
+   * @group typedrel
+   * @since 2.3.0
+   */
+  @scala.annotation.varargs
+  def repartitionByRange(numPartitions: Int, partitionExprs: Column*): Dataset[T] = withTypedPlan {
+    val sortOrder: Seq[SortOrder] = partitionExprs.map { col =>
+      col.expr match {
+        case expr: SortOrder =>
+          expr
+        case expr: Expression =>
--- End diff --

```
Cannot evaluate expression: input[0, bigint, false] ASC NULLS FIRST
java.lang.UnsupportedOperationException: Cannot evaluate expression: input[0, bigint, false] ASC NULLS FIRST
  at org.apache.spark.sql.catalyst.expressions.Unevaluable$class.doGenCode(Expression.scala:259)
  at org.apache.spark.sql.catalyst.expressions.SortOrder.doGenCode(SortOrder.scala:60)
```

Yeah. It also does not work.
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/19828#discussion_r153292668

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -2747,9 +2755,41 @@ class Dataset[T] private[sql](
    * @since 2.0.0
    */
   @scala.annotation.varargs
-  def repartition(partitionExprs: Column*): Dataset[T] = withTypedPlan {
-    RepartitionByExpression(
-      partitionExprs.map(_.expr), logicalPlan, sparkSession.sessionState.conf.numShufflePartitions)
+  def repartition(partitionExprs: Column*): Dataset[T] = {
+    repartition(sparkSession.sessionState.conf.numShufflePartitions, partitionExprs: _*)
+  }
+
+  /**
+   * Returns a new Dataset partitioned by the given partitioning expressions into
+   * `numPartitions`. The resulting Dataset is range partitioned.
+   *
+   * @group typedrel
+   * @since 2.3.0
+   */
+  @scala.annotation.varargs
+  def repartitionByRange(numPartitions: Int, partitionExprs: Column*): Dataset[T] = withTypedPlan {
+    val sortOrder: Seq[SortOrder] = partitionExprs.map { col =>
+      col.expr match {
+        case expr: SortOrder =>
+          expr
+        case expr: Expression =>
--- End diff --

This is a more generic problem, right? I think a similar error gets thrown if you do something like this: `spark.range(10).select($"id".asc + 1).show()`. Let's fix that in a different ticket.
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19828#discussion_r153269248

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala ---
@@ -2757,4 +2759,43 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
       }
     }
   }
+
+  test("repartitionByRange") {
--- End diff --

Move it to `DataFrameSuite`?
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19828#discussion_r153275651

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala ---
@@ -838,6 +839,27 @@ case class RepartitionByExpression(

   require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")

+  require(partitionExpressions.nonEmpty, "At least one partition-by expression must be specified.")
+
+  val partitioning: Partitioning = {
+    val (sortOrder, nonSortOrder) = partitionExpressions.partition(_.isInstanceOf[SortOrder])
--- End diff --

Still the same question: what happens when the SortOrder is not at the root node?
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19828#discussion_r153274811

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -2747,9 +2755,41 @@ class Dataset[T] private[sql](
    * @since 2.0.0
    */
   @scala.annotation.varargs
-  def repartition(partitionExprs: Column*): Dataset[T] = withTypedPlan {
-    RepartitionByExpression(
-      partitionExprs.map(_.expr), logicalPlan, sparkSession.sessionState.conf.numShufflePartitions)
+  def repartition(partitionExprs: Column*): Dataset[T] = {
+    repartition(sparkSession.sessionState.conf.numShufflePartitions, partitionExprs: _*)
+  }
+
+  /**
+   * Returns a new Dataset partitioned by the given partitioning expressions into
+   * `numPartitions`. The resulting Dataset is range partitioned.
+   *
+   * @group typedrel
+   * @since 2.3.0
+   */
+  @scala.annotation.varargs
+  def repartitionByRange(numPartitions: Int, partitionExprs: Column*): Dataset[T] = withTypedPlan {
+    val sortOrder: Seq[SortOrder] = partitionExprs.map { col =>
+      col.expr match {
+        case expr: SortOrder =>
+          expr
+        case expr: Expression =>
--- End diff --

What happens if we have a `SortOrder` that is not at the root node of `expr`?

```Scala
data1d.toDF("val").repartitionByRange(data1d.size, $"val".desc + 1)
  .select(spark_partition_id().as("id"), $"val").show()
```
```
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange rangepartitioning((val#236 DESC NULLS LAST + 1) ASC NULLS FIRST, 10)
+- LocalTableScan [val#236]
  at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
  at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:116)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:113)
```
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19828#discussion_r153273279

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -2747,9 +2755,41 @@ class Dataset[T] private[sql](
    * @since 2.0.0
    */
   @scala.annotation.varargs
-  def repartition(partitionExprs: Column*): Dataset[T] = withTypedPlan {
-    RepartitionByExpression(
-      partitionExprs.map(_.expr), logicalPlan, sparkSession.sessionState.conf.numShufflePartitions)
+  def repartition(partitionExprs: Column*): Dataset[T] = {
+    repartition(sparkSession.sessionState.conf.numShufflePartitions, partitionExprs: _*)
+  }
+
+  /**
+   * Returns a new Dataset partitioned by the given partitioning expressions into
+   * `numPartitions`. The resulting Dataset is range partitioned.
+   *
+   * @group typedrel
+   * @since 2.3.0
+   */
+  @scala.annotation.varargs
+  def repartitionByRange(numPartitions: Int, partitionExprs: Column*): Dataset[T] = withTypedPlan {
--- End diff --

Open a JIRA for adding the corresponding API in PySpark?
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/19828#discussion_r153244046

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
@@ -448,8 +448,15 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       case r: logical.Range =>
         execution.RangeExec(r) :: Nil
       case logical.RepartitionByExpression(expressions, child, numPartitions) =>
-        exchange.ShuffleExchangeExec(HashPartitioning(
-          expressions, numPartitions), planLater(child)) :: Nil
+        // RepartitionByExpression's constructor verifies that either all expressions are
+        // of type SortOrder, in which case we're doing RangePartitioning, or none of them are,
+        // in which case we're doing HashPartitioning.
+        val partitioning = if (expressions.forall(_.isInstanceOf[SortOrder])) {
--- End diff --

Oh, and it also makes it easier to unit test this code :)
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/19828#discussion_r153243250

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala ---
@@ -2757,4 +2759,43 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
     }
   }
 }
+
+  test("repartitionByRange") {
+    val data1d = Random.shuffle(0.to(9))
+    val data2d = data1d.map(i => (i, data1d.size - i))
+
+    checkAnswer(
+      data1d.toDF("val").repartitionByRange(data1d.size, $"val".asc)
+        .select(spark_partition_id().as("id"), $"val"),
+      data1d.map(i => Row(i, i)))
+
+    checkAnswer(
+      data1d.toDF("val").repartitionByRange(data1d.size, $"val".desc)
+        .select(spark_partition_id().as("id"), $"val"),
+      data1d.map(i => Row(i, data1d.size - 1 - i)))
+
+    // .repartitionByRange() assumes .asc by default if no explicit sort order is specified
+    checkAnswer(
+      data2d.toDF("a", "b").repartitionByRange(data1d.size, $"a".desc, $"b")
--- End diff --

`data1d.size`?
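The assertions in this test hinge on a property of range partitioning ten distinct values into ten partitions: ascending order sends value `i` to partition `i`, while descending order sends it to partition `size - 1 - i`. A toy model of that mapping (one value per partition, unlike Spark's sampling-based `RangePartitioner`):

```python
def range_partition_id(value, values, ascending=True):
    """Partition id of `value` when `values` is range-partitioned into
    len(values) partitions, one distinct value per partition (toy model)."""
    ordered = sorted(values, reverse=not ascending)
    return ordered.index(value)

data1d = list(range(10))
# Ascending: value i lands in partition i, so the (id, val) pairs are (i, i).
asc_ids = [range_partition_id(i, data1d) for i in data1d]
# Descending: value i lands in partition size - 1 - i.
desc_ids = [range_partition_id(i, data1d, ascending=False) for i in data1d]
```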
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/19828#discussion_r153242566

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
@@ -448,8 +448,15 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       case r: logical.Range =>
         execution.RangeExec(r) :: Nil
       case logical.RepartitionByExpression(expressions, child, numPartitions) =>
-        exchange.ShuffleExchangeExec(HashPartitioning(
-          expressions, numPartitions), planLater(child)) :: Nil
+        // RepartitionByExpression's constructor verifies that either all expressions are
+        // of type SortOrder, in which case we're doing RangePartitioning, or none of them are,
+        // in which case we're doing HashPartitioning.
+        val partitioning = if (expressions.forall(_.isInstanceOf[SortOrder])) {
--- End diff --

We have discussed this before, but to me it makes slightly more sense to add this logic to the `RepartitionByExpression` plan.
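The rule being moved around here is small: if every partition expression is a `SortOrder`, plan `RangePartitioning`; otherwise plan `HashPartitioning`, with the constructor rejecting mixed lists up front. A Python sketch of both checks together (toy classes; in Spark this logic is split between `RepartitionByExpression` and `SparkStrategies`):

```python
class Expr:
    pass

class SortOrder(Expr):
    pass

def choose_partitioning(exprs):
    """Reject mixed lists; a non-empty all-SortOrder list takes the range path."""
    sort = [e for e in exprs if isinstance(e, SortOrder)]
    non_sort = [e for e in exprs if not isinstance(e, SortOrder)]
    if sort and non_sort:
        raise ValueError("either all partitionExpressions must be SortOrder "
                         "(RangePartitioning) or none of them (HashPartitioning)")
    return "range" if sort else "hash"
```

Keeping the homogeneity check in the logical operator's constructor (rather than the planner) means it fires at plan-construction time, which is also what makes it easy to unit test, as noted above.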
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/19828#discussion_r153242129

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala ---
@@ -836,6 +836,16 @@ case class RepartitionByExpression(
     child: LogicalPlan,
     numPartitions: Int) extends RepartitionOperation {

+  val (sortOrder, nonSortOrder) = partitionExpressions.partition(_.isInstanceOf[SortOrder])
+
+  require(sortOrder.isEmpty || nonSortOrder.isEmpty,
+    s"""${getClass.getSimpleName} expects that either all its `partitionExpressions` are of type
--- End diff --

Do you want this to be a multiline message? It makes sense to put the sort order and non sort order on new lines.
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/19828#discussion_r153241835

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -2733,6 +2733,12 @@ class Dataset[T] private[sql](
    */
   @scala.annotation.varargs
   def repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T] = withTypedPlan {
+    partitionExprs.find(_.expr.isInstanceOf[SortOrder]).foreach { sortOrder =>
--- End diff --

Use `collect` or `filter`? That way we can show all offending columns.
GitHub user adrian-ionescu opened a pull request:

https://github.com/apache/spark/pull/19828

[SPARK-22614] Dataset API: repartitionByRange(...)

## What changes were proposed in this pull request?

This PR introduces a way to explicitly range-partition a Dataset. So far, only round-robin and hash partitioning were possible via `df.repartition(...)`, but sometimes range partitioning might be desirable: e.g. when writing to disk, it can give better compression without the cost of a global sort.

The current implementation piggybacks on the existing `RepartitionByExpression` `LogicalPlan` and simply adds the following logic: if its expressions are of type `SortOrder`, then it will do `RangePartitioning`; otherwise `HashPartitioning`. This is by far the least intrusive approach.

I also considered:
- adding a new `RepartitionByRange` node, but that resulted in a lot of code duplication, touching 10+ files
- `RepartitionByExpression(child: LogicalPlan, partitioning: Partitioning)`, but that:
  - also involved touching a lot of the existing code
  - involved pulling a `catalyst.plans.physical` thing into `catalyst.plans.logical`
  - would have required special code for performing `Analysis` on the expressions under `Partitioning`, as that wouldn't happen by default anymore, since `Partitioning` isn't an `Expression`
- `RepartitionByExpression(child: LogicalPlan, partitioning: LogicalPartitioning)`, with `trait LogicalPartitioning extends Expression with Unevaluable` and corresponding subclasses for Range/Hash partitioning, but that:
  - required a lot of new code
  - basically turned `RepartitionByExpression` into a useless wrapper over `LogicalPartitioning`

## How was this patch tested?

Simple end-to-end test in `SQLQuerySuite`.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/adrian-ionescu/apache-spark repartitionByRange

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/19828.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #19828

commit 08527b0c5da6e7ea3dc80392b92ea88f212e5761
Author: Adrian Ionescu
Date: 2017-11-27T11:54:38Z

    repartitionByRange() + repartition() with SortOrders

commit 950b3dc6429dcade98a9483d90c4f9773a6fc48e
Author: Adrian Ionescu
Date: 2017-11-27T15:17:36Z

    avoid changing semantics for .repartition()