cloud-fan commented on code in PR #52153:
URL: https://github.com/apache/spark/pull/52153#discussion_r2309677041
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala:
##########
@@ -1406,6 +1406,87 @@ class PlannerSuite extends SharedSparkSession with
AdaptiveSparkPlanHelper {
assert(planned.exists(_.isInstanceOf[GlobalLimitExec]))
assert(planned.exists(_.isInstanceOf[LocalLimitExec]))
}
+
test("SPARK-53401: repartitionById should throw an exception for negative partition id") {
  val df = spark.range(10).toDF("id")
  // $"id" - 5 maps ids 0..9 to partition ids -5..4, so several rows carry an
  // invalid (negative) partition id.
  val repartitioned = df.repartitionById(10, $"id" - 5)

  val e = intercept[SparkException] {
    repartitioned.collect()
  }
  // The ArrayIndexOutOfBoundsException is wrapped in a SparkException. Do not pin
  // the exact offending index ("Index -5"): which negative id surfaces first depends
  // on row evaluation order. Assert only the generic out-of-bounds message, matching
  // the ">= numPartitions" test.
  assert(e.getMessage.contains("out of bounds"))
}
+
test("SPARK-53401: repartitionById should throw an exception for partition id >= numPartitions") {
  val numPartitions = 10
  // ids 10..19 land outside [0, numPartitions), which must fail at runtime.
  val df = spark.range(20).toDF("id")
  val repartitioned = df.repartitionById(numPartitions, $"id")

  // The ArrayIndexOutOfBoundsException is surfaced wrapped in a SparkException.
  val e = intercept[SparkException](repartitioned.collect())
  assert(e.getMessage.contains("out of bounds"))
}
+
/**
 * Asserts that the executed physical plan of the given DataFrame contains exactly
 * the expected number of shuffle exchanges.
 *
 * @param df The DataFrame whose physical plan will be examined.
 * @param expectedShuffles The expected number of shuffle exchanges.
 */
private def checkShuffleCount(df: DataFrame, expectedShuffles: Int): Unit = {
  val plan = df.queryExecution.executedPlan
  // Gather every shuffle exchange node present in the executed plan.
  val shuffles = collect(plan) { case s: ShuffleExchangeLike => s }
  assert(
    shuffles.size == expectedShuffles,
    s"Expected $expectedShuffles shuffle(s), but found ${shuffles.size} in the plan:\n$plan")
}
+
test("SPARK-53401: groupBy on a superset of partition keys should reuse the shuffle") {
  val df = spark.range(100).select($"id" % 10 as "key1", $"id" as "value")
  // Grouping on (key1, lit(1)) is a superset of the repartition key, so the
  // aggregation should reuse the existing shuffle instead of adding a new one.
  val grouped = df
    .repartitionById(10, $"key1")
    .groupBy($"key1", lit(1))
    .count()
  checkShuffleCount(grouped, 1)
}
+
test("SPARK-53401: shuffle reuse is not affected by spark.sql.shuffle.partitions") {
  // Force a small default partition count so any leak of the config is visible.
  withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "5") {
    val base = spark.range(100).select($"id" % 10 as "key", $"id" as "value")
    val grouped = base.repartitionById(10, $"key").groupBy($"key").count()

    checkShuffleCount(grouped, 1)
    // The explicit repartition number should be respected, not the config value.
    assert(grouped.rdd.getNumPartitions == 10)
  }
}
+
test("SPARK-53401: shuffle reuse with intervening narrow operations") {
  val df = spark.range(100).select($"id" % 10 as "key", $"id" as "value")
  // A narrow operation (filter) between the repartition and the aggregation keeps
  // the partitioning intact, so a single shuffle should suffice.
  val grouped = df.repartitionById(10, $"key")
    .filter($"value" > 50)
    .groupBy($"key")
    .count()
  checkShuffleCount(grouped, 1)
}
+
+ test("SPARK-53401: shuffle reuse after a join that preserves partitioning") {
Review Comment:
I think a more interesting test is to prove that a join with id pass-through
and hash partitioning will still do a shuffle on the id pass-through side.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]