cloud-fan commented on code in PR #52153:
URL: https://github.com/apache/spark/pull/52153#discussion_r2309674562
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala:
##########
@@ -1406,6 +1406,87 @@ class PlannerSuite extends SharedSparkSession with
AdaptiveSparkPlanHelper {
assert(planned.exists(_.isInstanceOf[GlobalLimitExec]))
assert(planned.exists(_.isInstanceOf[LocalLimitExec]))
}
+
+ test("SPARK-53401: repartitionById should throw an exception for negative
partition id") {
+ val df = spark.range(10).toDF("id")
+ val repartitioned = df.repartitionById(10, $"id" - 5)
+
+ val e = intercept[SparkException] {
+ repartitioned.collect()
+ }
+ // The error is caught as ArrayIndexOutOfBoundsException wrapped in
SparkException
+ assert(e.getMessage.contains("Index -5 out of bounds"))
+ }
+
+ test("SPARK-53401: repartitionById should throw an exception for partition
id >= numPartitions") {
+ val numPartitions = 10
+ val df = spark.range(20).toDF("id")
+ val repartitioned = df.repartitionById(numPartitions, $"id")
+
+ val e = intercept[SparkException] {
+ repartitioned.collect()
+ }
+ // ArrayIndexOutOfBoundsException for partition IDs >= numPartitions
+ assert(e.getMessage.contains("out of bounds"))
+ }
+
+ /**
+ * A helper function to check the number of shuffle exchanges in a physical
plan.
+ *
+ * @param df The DataFrame whose physical plan will be examined.
+ * @param expectedShuffles The expected number of shuffle exchanges.
+ */
+ private def checkShuffleCount(df: DataFrame, expectedShuffles: Int): Unit = {
+ val plan = df.queryExecution.executedPlan
+ val shuffles = collect(plan) {
+ case s: ShuffleExchangeLike => s
+ }
+ assert(
+ shuffles.size == expectedShuffles,
+ s"Expected $expectedShuffles shuffle(s), but found ${shuffles.size} in
the plan:\n$plan"
+ )
+ }
+
+ test("SPARK-53401: groupBy on a superset of partition keys should reuse the
shuffle") {
+ val df = spark.range(100).select($"id" % 10 as "key1", $"id" as "value")
+ val grouped = df.repartitionById(10, $"key1").groupBy($"key1",
lit(1)).count()
+ checkShuffleCount(grouped, 1)
+ }
+
+ test("SPARK-53401: shuffle reuse is not affected by
spark.sql.shuffle.partitions") {
+ withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "5") {
+ val df = spark.range(100).select($"id" % 10 as "key", $"id" as "value")
+ val grouped = df.repartitionById(10, $"key").groupBy($"key").count()
+
+ checkShuffleCount(grouped, 1)
+ // The explicit repartition number should be respected, not the config
value.
+ assert(grouped.rdd.getNumPartitions == 10)
+ }
+ }
+
+ test("SPARK-53401: shuffle reuse with intervening narrow operations") {
+ val df = spark.range(100).select($"id" % 10 as "key", $"id" as "value")
+ val grouped =
+ df.repartitionById(10, $"key")
+ .filter($"value" > 50).groupBy($"key").count()
Review Comment:
so what this test proves is that `Filter` can propagate child's output
partitioning, which is already proven by other tests and we don't need to
verify it again here.
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala:
##########
@@ -1406,6 +1406,87 @@ class PlannerSuite extends SharedSparkSession with
AdaptiveSparkPlanHelper {
assert(planned.exists(_.isInstanceOf[GlobalLimitExec]))
assert(planned.exists(_.isInstanceOf[LocalLimitExec]))
}
+
+ test("SPARK-53401: repartitionById should throw an exception for negative
partition id") {
+ val df = spark.range(10).toDF("id")
+ val repartitioned = df.repartitionById(10, $"id" - 5)
+
+ val e = intercept[SparkException] {
+ repartitioned.collect()
+ }
+ // The error is caught as ArrayIndexOutOfBoundsException wrapped in
SparkException
+ assert(e.getMessage.contains("Index -5 out of bounds"))
+ }
+
+ test("SPARK-53401: repartitionById should throw an exception for partition
id >= numPartitions") {
+ val numPartitions = 10
+ val df = spark.range(20).toDF("id")
+ val repartitioned = df.repartitionById(numPartitions, $"id")
+
+ val e = intercept[SparkException] {
+ repartitioned.collect()
+ }
+ // ArrayIndexOutOfBoundsException for partition IDs >= numPartitions
+ assert(e.getMessage.contains("out of bounds"))
+ }
+
+ /**
+ * A helper function to check the number of shuffle exchanges in a physical
plan.
+ *
+ * @param df The DataFrame whose physical plan will be examined.
+ * @param expectedShuffles The expected number of shuffle exchanges.
+ */
+ private def checkShuffleCount(df: DataFrame, expectedShuffles: Int): Unit = {
+ val plan = df.queryExecution.executedPlan
+ val shuffles = collect(plan) {
+ case s: ShuffleExchangeLike => s
+ }
+ assert(
+ shuffles.size == expectedShuffles,
+ s"Expected $expectedShuffles shuffle(s), but found ${shuffles.size} in
the plan:\n$plan"
+ )
+ }
+
+ test("SPARK-53401: groupBy on a superset of partition keys should reuse the
shuffle") {
+ val df = spark.range(100).select($"id" % 10 as "key1", $"id" as "value")
+ val grouped = df.repartitionById(10, $"key1").groupBy($"key1",
lit(1)).count()
+ checkShuffleCount(grouped, 1)
+ }
+
+ test("SPARK-53401: shuffle reuse is not affected by
spark.sql.shuffle.partitions") {
+ withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "5") {
+ val df = spark.range(100).select($"id" % 10 as "key", $"id" as "value")
+ val grouped = df.repartitionById(10, $"key").groupBy($"key").count()
+
+ checkShuffleCount(grouped, 1)
+ // The explicit repartition number should be respected, not the config
value.
+ assert(grouped.rdd.getNumPartitions == 10)
+ }
+ }
+
+ test("SPARK-53401: shuffle reuse with intervening narrow operations") {
+ val df = spark.range(100).select($"id" % 10 as "key", $"id" as "value")
+ val grouped =
+ df.repartitionById(10, $"key")
+ .filter($"value" > 50).groupBy($"key").count()
Review Comment:
so what this test proves is that `Filter` can propagate child's output
partitioning, which is already proven by other tests and we don't need to
verify it again here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]