[ https://issues.apache.org/jira/browse/SPARK-42168?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Enrico Minack updated SPARK-42168:
----------------------------------
Description:
The following example returns an incorrect result:
{code:java}
import pandas as pd

from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import col, lit, sum

spark = SparkSession \
    .builder \
    .getOrCreate()

ids = 1000
days = 1000
parts = 10

id_df = spark.range(ids)
day_df = spark.range(days).withColumnRenamed("id", "day")
id_day_df = id_df.join(day_df)
left_df = id_day_df.select(col("id").alias("id"), col("day").alias("day"), lit("left").alias("side")).repartition(parts).cache()
right_df = id_day_df.select(col("id").alias("id"), col("day").alias("day"), lit("right").alias("side")).repartition(parts).cache()
#.withColumnRenamed("id", "id2")

# note the column order is different to the groupBy("id", "day") column order below
window = Window.partitionBy("day", "id")

left_grouped_df = left_df.groupBy("id", "day")
right_grouped_df = right_df.withColumn("day_sum", sum(col("day")).over(window)).groupBy("id", "day")

def cogroup(left: pd.DataFrame, right: pd.DataFrame) -> pd.DataFrame:
    return pd.DataFrame([{
        "id": left["id"][0] if not left.empty else (right["id"][0] if not right.empty else None),
        "day": left["day"][0] if not left.empty else (right["day"][0] if not right.empty else None),
        "lefts": len(left.index),
        "rights": len(right.index)
    }])

df = left_grouped_df.cogroup(right_grouped_df) \
    .applyInPandas(cogroup, schema="id long, day long, lefts integer, rights integer")

df.explain()
df.show(5)
{code}
Output is
{code}
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- FlatMapCoGroupsInPandas [id#8L, day#9L], [id#29L, day#30L], cogroup(id#8L, day#9L, side#10, id#29L, day#30L, side#31, day_sum#54L), [id#64L, day#65L, lefts#66, rights#67]
   :- Sort [id#8L ASC NULLS FIRST, day#9L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#8L, day#9L, 200), ENSURE_REQUIREMENTS, [plan_id=117]
   :     +- ...
   +- Sort [id#29L ASC NULLS FIRST, day#30L ASC NULLS FIRST], false, 0
      +- Project [id#29L, day#30L, id#29L, day#30L, side#31, day_sum#54L]
         +- Window [sum(day#30L) windowspecdefinition(day#30L, id#29L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS day_sum#54L], [day#30L, id#29L]
            +- Sort [day#30L ASC NULLS FIRST, id#29L ASC NULLS FIRST], false, 0
               +- Exchange hashpartitioning(day#30L, id#29L, 200), ENSURE_REQUIREMENTS, [plan_id=112]
                  +- ...

+---+---+-----+------+
| id|day|lefts|rights|
+---+---+-----+------+
|  0|  3|    0|     1|
|  0|  4|    0|     1|
|  0| 13|    1|     0|
|  0| 27|    0|     1|
|  0| 31|    0|     1|
+---+---+-----+------+
only showing top 5 rows
{code}
The first child is hash-partitioned by {{id}} and {{day}}, while the second child is hash-partitioned by {{day}} and {{id}} (as required by the window function). Therefore, rows that belong to the same group end up in different partitions on the two sides.
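The order sensitivity is easy to demonstrate outside Spark. The toy sketch below is an illustration only: Spark's {{hashpartitioning}} really uses Murmur3 over the partitioning expressions, and {{partition_of}} and {{num_partitions}} are made-up names, not Spark APIs. It shows that hashing the same key pair in two different column orders assigns most keys to different partitions:
{code:python}
# Illustration only: partition_of() is a made-up stand-in for Spark's
# hashpartitioning (which actually uses Murmur3); any order-sensitive
# hash function shows the same effect.
num_partitions = 200  # the shuffle partition count seen in the plans above

def partition_of(*keys) -> int:
    return hash(keys) % num_partitions  # the order of keys matters

mismatched = sum(
    1
    for i in range(10)   # id values
    for d in range(10)   # day values
    if partition_of(i, d) != partition_of(d, i)
)
print(f"{mismatched} of 100 keys land in different partitions")
{code}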
This has been fixed in Spark 3.3 by [#32875|https://github.com/apache/spark/pull/32875/files#diff-e938569a4ca4eba8f7e10fe473d4f9c306ea253df151405bcaba880a601f075fR75-R76]:
{code}
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- FlatMapCoGroupsInPandas [id#8L, day#9L], [id#29L, day#30L], cogroup(id#8L, day#9L, side#10, id#29L, day#30L, side#31, day_sum#54L)#63, [id#64L, day#65L, lefts#66, rights#67]
   :- Sort [id#8L ASC NULLS FIRST, day#9L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#8L, day#9L, 200), ENSURE_REQUIREMENTS, [plan_id=117]
   :     +- ...
   +- Sort [id#29L ASC NULLS FIRST, day#30L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(id#29L, day#30L, 200), ENSURE_REQUIREMENTS, [plan_id=118]
         +- Project [id#29L, day#30L, id#29L, day#30L, side#31, day_sum#54L]
            +- Window [sum(day#30L) windowspecdefinition(day#30L, id#29L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS day_sum#54L], [day#30L, id#29L]
               +- Sort [day#30L ASC NULLS FIRST, id#29L ASC NULLS FIRST], false, 0
                  +- Exchange hashpartitioning(day#30L, id#29L, 200), ENSURE_REQUIREMENTS, [plan_id=112]
                     +- ...

+---+---+-----+------+
| id|day|lefts|rights|
+---+---+-----+------+
|  0| 13|    1|     1|
|  0| 63|    1|     1|
|  0| 89|    1|     1|
|  0| 95|    1|     1|
|  0| 96|    1|     1|
+---+---+-----+------+
only showing top 5 rows
{code}
Only PySpark seems to be affected.
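Until an upgrade to 3.3 is possible, one conceivable workaround, sketched below from the reproduction above (not a verified fix; please validate on an affected version), is to list the window partition columns in the same order as the {{groupBy}} columns, so that both cogroup children end up requiring the identical {{hashpartitioning(id, day)}}:
{code:python}
# Hypothetical workaround sketch for affected versions (3.0.x-3.2.x):
# partitionBy("id", "day") denotes the same *set* of keys as
# partitionBy("day", "id"), so day_sum is unchanged, but the physical
# partitioning now matches the groupBy("id", "day") side, and rows with
# equal keys meet in the same partition.
window = Window.partitionBy("id", "day")  # instead of partitionBy("day", "id")

right_grouped_df = right_df \
    .withColumn("day_sum", sum(col("day")).over(window)) \
    .groupBy("id", "day")
{code}
Where the window key order cannot be changed, an explicit {{repartition("id", "day")}} between the window and the {{groupBy}} should achieve the same partitioning.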
was:
The following example returns an incorrect result:
{code:java}
import pandas as pd

from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import col, lit, sum

spark = SparkSession \
    .builder \
    .getOrCreate()

ids = 1000
days = 1000
parts = 10

id_df = spark.range(ids)
day_df = spark.range(days).withColumnRenamed("id", "day")
id_day_df = id_df.join(day_df)
left_df = id_day_df.select(col("id").alias("id"), col("day").alias("day"), lit("left").alias("side")).repartition(parts).cache()
right_df = id_day_df.select(col("id").alias("id"), col("day").alias("day"), lit("right").alias("side")).repartition(parts).cache()
#.withColumnRenamed("id", "id2")

# note the column order is different to the groupBy("id", "day") column order below
window = Window.partitionBy("day", "id")

left_grouped_df = left_df.groupBy("id", "day")
right_grouped_df = right_df.withColumn("day_sum", sum(col("day")).over(window)).groupBy("id", "day")

def cogroup(left: pd.DataFrame, right: pd.DataFrame) -> pd.DataFrame:
    return pd.DataFrame([{
        "id": left["id"][0] if not left.empty else (right["id"][0] if not right.empty else None),
        "day": left["day"][0] if not left.empty else (right["day"][0] if not right.empty else None),
        "lefts": len(left.index),
        "rights": len(right.index)
    }])

df = left_grouped_df.cogroup(right_grouped_df) \
    .applyInPandas(cogroup, schema="id long, day long, lefts integer, rights integer")

df.explain()
df.show(5)
{code}
Output is
{code}
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- FlatMapCoGroupsInPandas [id#8L, day#9L], [id#29L, day#30L], cogroup(id#8L, day#9L, side#10, id#29L, day#30L, side#31, day_sum#54L), [id#64L, day#65L, lefts#66, rights#67]
   :- Sort [id#8L ASC NULLS FIRST, day#9L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#8L, day#9L, 200), ENSURE_REQUIREMENTS, [plan_id=117]
   :     +- InMemoryTableScan [id#8L, day#9L, id#8L, day#9L, side#10]
   :           +- InMemoryRelation [id#8L, day#9L, side#10], StorageLevel(disk, memory, deserialized, 1 replicas)
   :                 +- Exchange RoundRobinPartitioning(10), REPARTITION_BY_NUM, [plan_id=33]
   :                    +- *(2) Project [id#0L, day#4L, left AS side#10]
   :                       +- *(2) BroadcastNestedLoopJoin BuildRight, Inner
   :                          :- *(2) Range (0, 1000, step=1, splits=16)
   :                          +- BroadcastExchange IdentityBroadcastMode, [plan_id=28]
   :                             +- *(1) Project [id#2L AS day#4L]
   :                                +- *(1) Range (0, 1000, step=1, splits=16)
   +- Sort [id#29L ASC NULLS FIRST, day#30L ASC NULLS FIRST], false, 0
      +- Project [id#29L, day#30L, id#29L, day#30L, side#31, day_sum#54L]
         +- Window [sum(day#30L) windowspecdefinition(day#30L, id#29L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS day_sum#54L], [day#30L, id#29L]
            +- Sort [day#30L ASC NULLS FIRST, id#29L ASC NULLS FIRST], false, 0
               +- Exchange hashpartitioning(day#30L, id#29L, 200), ENSURE_REQUIREMENTS, [plan_id=112]
                  +- InMemoryTableScan [id#29L, day#30L, side#31]
                        +- InMemoryRelation [id#29L, day#30L, side#31], StorageLevel(disk, memory, deserialized, 1 replicas)
                              +- Exchange RoundRobinPartitioning(10), REPARTITION_BY_NUM, [plan_id=79]
                                 +- *(2) Project [id#0L, day#4L, right AS side#31]
                                    +- *(2) BroadcastNestedLoopJoin BuildRight, Inner
                                       :- *(2) Range (0, 1000, step=1, splits=16)
                                       +- BroadcastExchange IdentityBroadcastMode, [plan_id=74]
                                          +- *(1) Project [id#2L AS day#4L]
                                             +- *(1) Range (0, 1000, step=1, splits=16)

+---+---+-----+------+
| id|day|lefts|rights|
+---+---+-----+------+
|  0|  3|    0|     1|
|  0|  4|    0|     1|
|  0| 13|    1|     0|
|  0| 27|    0|     1|
|  0| 31|    0|     1|
+---+---+-----+------+
only showing top 5 rows
{code}
The first child is hash-partitioned by {{id}} and {{day}}, while the second child is hash-partitioned by {{day}} and {{id}} (required by the window function). Therefore, rows end up in different partitions.
This has been fixed in Spark 3.3 by [#32875|https://github.com/apache/spark/pull/32875/files#diff-e938569a4ca4eba8f7e10fe473d4f9c306ea253df151405bcaba880a601f075fR75-R76]:
{code}
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- FlatMapCoGroupsInPandas [id#8L, day#9L], [id#29L, day#30L], cogroup(id#8L, day#9L, side#10, id#29L, day#30L, side#31, day_sum#54L)#63, [id#64L, day#65L, lefts#66, rights#67]
   :- Sort [id#8L ASC NULLS FIRST, day#9L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#8L, day#9L, 200), ENSURE_REQUIREMENTS, [plan_id=117]
   :     +- InMemoryTableScan [id#8L, day#9L, id#8L, day#9L, side#10]
   :           +- InMemoryRelation [id#8L, day#9L, side#10], StorageLevel(disk, memory, deserialized, 1 replicas)
   :                 +- Exchange RoundRobinPartitioning(10), REPARTITION_BY_NUM, [plan_id=33]
   :                    +- *(2) Project [id#0L, day#4L, left AS side#10]
   :                       +- *(2) BroadcastNestedLoopJoin BuildRight, Inner
   :                          :- *(2) Range (0, 1000, step=1, splits=16)
   :                          +- BroadcastExchange IdentityBroadcastMode, [plan_id=28]
   :                             +- *(1) Project [id#2L AS day#4L]
   :                                +- *(1) Range (0, 1000, step=1, splits=16)
   +- Sort [id#29L ASC NULLS FIRST, day#30L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(id#29L, day#30L, 200), ENSURE_REQUIREMENTS, [plan_id=118]
         +- Project [id#29L, day#30L, id#29L, day#30L, side#31, day_sum#54L]
            +- Window [sum(day#30L) windowspecdefinition(day#30L, id#29L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS day_sum#54L], [day#30L, id#29L]
               +- Sort [day#30L ASC NULLS FIRST, id#29L ASC NULLS FIRST], false, 0
                  +- Exchange hashpartitioning(day#30L, id#29L, 200), ENSURE_REQUIREMENTS, [plan_id=112]
                     +- InMemoryTableScan [id#29L, day#30L, side#31]
                           +- InMemoryRelation [id#29L, day#30L, side#31], StorageLevel(disk, memory, deserialized, 1 replicas)
                                 +- Exchange RoundRobinPartitioning(10), REPARTITION_BY_NUM, [plan_id=79]
                                    +- *(2) Project [id#0L, day#4L, right AS side#31]
                                       +- *(2) BroadcastNestedLoopJoin BuildRight, Inner
                                          :- *(2) Range (0, 1000, step=1, splits=16)
                                          +- BroadcastExchange IdentityBroadcastMode, [plan_id=74]
                                             +- *(1) Project [id#2L AS day#4L]
                                                +- *(1) Range (0, 1000, step=1, splits=16)

+---+---+-----+------+
| id|day|lefts|rights|
+---+---+-----+------+
|  0| 13|    1|     1|
|  0| 63|    1|     1|
|  0| 89|    1|     1|
|  0| 95|    1|     1|
|  0| 96|    1|     1|
+---+---+-----+------+
only showing top 5 rows
{code}
Only PySpark seems to be affected.
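Because the cross join emits every {{(id, day)}} combination exactly once per side, a correct run must report {{lefts == 1}} and {{rights == 1}} for every group. A quick check along these lines (a sketch reusing {{df}}, {{ids}} and {{days}} from the reproduction above) separates affected from fixed versions:
{code:python}
# Sanity check sketch: any group with lefts != 1 or rights != 1 means the two
# cogroup children were partitioned differently (SPARK-42168).
bad = df.where("lefts <> 1 OR rights <> 1").count()
print(f"{bad} groups with a wrong row count (expected 0 of {ids * days} keys)")
assert bad == 0, "this Spark version is affected by SPARK-42168"
{code}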
> CoGroup with window function returns incorrect result when partition keys differ in order
> ------------------------------------------------------------------------------------------
>
>                 Key: SPARK-42168
>                 URL: https://issues.apache.org/jira/browse/SPARK-42168
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark, SQL
>    Affects Versions: 3.0.3, 3.1.3, 3.2.3
>            Reporter: Enrico Minack
>            Priority: Major
>              Labels: correctness
>