Enrico Minack created SPARK-42168:
-------------------------------------

             Summary: CoGroup with window function returns incorrect result when partition keys differ in order
                 Key: SPARK-42168
                 URL: https://issues.apache.org/jira/browse/SPARK-42168
             Project: Spark
          Issue Type: Bug
          Components: PySpark, SQL
    Affects Versions: 3.2.3, 3.1.3, 3.0.3
            Reporter: Enrico Minack


The following example returns an incorrect result:
{code:python}
import pandas as pd

from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import col, lit, sum

spark = SparkSession \
    .builder \
    .getOrCreate()

ids = 1000
days = 1000
parts = 10

id_df = spark.range(ids)
day_df = spark.range(days).withColumnRenamed("id", "day")
id_day_df = id_df.join(day_df)
left_df = id_day_df.select(col("id").alias("id"), col("day").alias("day"), lit("left").alias("side")).repartition(parts).cache()
right_df = id_day_df.select(col("id").alias("id"), col("day").alias("day"), lit("right").alias("side")).repartition(parts).cache()  # .withColumnRenamed("id", "id2")

# note the column order is different to the groupBy("id", "day") column order below
window = Window.partitionBy("day", "id")

left_grouped_df = left_df.groupBy("id", "day")
right_grouped_df = right_df.withColumn("day_sum", sum(col("day")).over(window)).groupBy("id", "day")

def cogroup(left: pd.DataFrame, right: pd.DataFrame) -> pd.DataFrame:
    return pd.DataFrame([{
        "id": left["id"][0] if not left.empty else (right["id"][0] if not right.empty else None),
        "day": left["day"][0] if not left.empty else (right["day"][0] if not right.empty else None),
        "lefts": len(left.index),
        "rights": len(right.index)
    }])

df = left_grouped_df.cogroup(right_grouped_df) \
         .applyInPandas(cogroup, schema="id long, day long, lefts integer, rights integer")

df.explain()
df.show(5)
{code}
The output is:
{code}
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- FlatMapCoGroupsInPandas [id#8L, day#9L], [id#29L, day#30L], cogroup(id#8L, day#9L, side#10, id#29L, day#30L, side#31, day_sum#54L), [id#64L, day#65L, lefts#66, rights#67]
   :- Sort [id#8L ASC NULLS FIRST, day#9L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#8L, day#9L, 200), ENSURE_REQUIREMENTS, [plan_id=117]
   :     +- InMemoryTableScan [id#8L, day#9L, id#8L, day#9L, side#10]
   :           +- InMemoryRelation [id#8L, day#9L, side#10], StorageLevel(disk, memory, deserialized, 1 replicas)
   :                 +- Exchange RoundRobinPartitioning(10), REPARTITION_BY_NUM, [plan_id=33]
   :                    +- *(2) Project [id#0L, day#4L, left AS side#10]
   :                       +- *(2) BroadcastNestedLoopJoin BuildRight, Inner
   :                          :- *(2) Range (0, 1000, step=1, splits=16)
   :                          +- BroadcastExchange IdentityBroadcastMode, [plan_id=28]
   :                             +- *(1) Project [id#2L AS day#4L]
   :                                +- *(1) Range (0, 1000, step=1, splits=16)
   +- Sort [id#29L ASC NULLS FIRST, day#30L ASC NULLS FIRST], false, 0
      +- Project [id#29L, day#30L, id#29L, day#30L, side#31, day_sum#54L]
         +- Window [sum(day#30L) windowspecdefinition(day#30L, id#29L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS day_sum#54L], [day#30L, id#29L]
            +- Sort [day#30L ASC NULLS FIRST, id#29L ASC NULLS FIRST], false, 0
               +- Exchange hashpartitioning(day#30L, id#29L, 200), ENSURE_REQUIREMENTS, [plan_id=112]
                  +- InMemoryTableScan [id#29L, day#30L, side#31]
                        +- InMemoryRelation [id#29L, day#30L, side#31], StorageLevel(disk, memory, deserialized, 1 replicas)
                              +- Exchange RoundRobinPartitioning(10), REPARTITION_BY_NUM, [plan_id=79]
                                 +- *(2) Project [id#0L, day#4L, right AS side#31]
                                    +- *(2) BroadcastNestedLoopJoin BuildRight, Inner
                                       :- *(2) Range (0, 1000, step=1, splits=16)
                                       +- BroadcastExchange IdentityBroadcastMode, [plan_id=74]
                                          +- *(1) Project [id#2L AS day#4L]
                                             +- *(1) Range (0, 1000, step=1, splits=16)


+---+---+-----+------+
| id|day|lefts|rights|
+---+---+-----+------+
|  0|  3|    0|     1|
|  0|  4|    0|     1|
|  0| 13|    1|     0|
|  0| 27|    0|     1|
|  0| 31|    0|     1|
+---+---+-----+------+
only showing top 5 rows
{code}
The first child is hash-partitioned by {{id}} and {{day}}, while the second child is hash-partitioned by {{day}} and {{id}} (the partitioning required by the window function). Since {{hashpartitioning}} is sensitive to the order of its expressions, the two children are not co-partitioned, and rows belonging to the same group end up in different partitions on the two sides, so the cogroup pairs them incorrectly.
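To illustrate the order sensitivity, here is a minimal sketch (reusing {{left_df}} from the reproduction above, and assuming {{pyspark.sql.functions.hash}} reflects the Murmur3 hash used by {{hashpartitioning}}): the hash of {{(id, day)}} generally differs from the hash of {{(day, id)}} for the same row, so the two sides disagree on the target partition.
{code:python}
from pyspark.sql.functions import hash as hash_

# Murmur3 hash of the grouping columns in both orders; the values generally differ,
# so hashpartitioning(id, day) and hashpartitioning(day, id) send the same logical
# group to different partitions.
left_df.select(
    "id", "day",
    hash_("id", "day").alias("hash_id_day"),
    hash_("day", "id").alias("hash_day_id"),
).show(5)
{code}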

This has been fixed in Spark 3.3 by 
[#32875|https://github.com/apache/spark/pull/32875/files#diff-e938569a4ca4eba8f7e10fe473d4f9c306ea253df151405bcaba880a601f075fR75-R76]:
{code}
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- FlatMapCoGroupsInPandas [id#8L, day#9L], [id#29L, day#30L], cogroup(id#8L, day#9L, side#10, id#29L, day#30L, side#31, day_sum#54L)#63, [id#64L, day#65L, lefts#66, rights#67]
   :- Sort [id#8L ASC NULLS FIRST, day#9L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#8L, day#9L, 200), ENSURE_REQUIREMENTS, [plan_id=117]
   :     +- InMemoryTableScan [id#8L, day#9L, id#8L, day#9L, side#10]
   :           +- InMemoryRelation [id#8L, day#9L, side#10], StorageLevel(disk, memory, deserialized, 1 replicas)
   :                 +- Exchange RoundRobinPartitioning(10), REPARTITION_BY_NUM, [plan_id=33]
   :                    +- *(2) Project [id#0L, day#4L, left AS side#10]
   :                       +- *(2) BroadcastNestedLoopJoin BuildRight, Inner
   :                          :- *(2) Range (0, 1000, step=1, splits=16)
   :                          +- BroadcastExchange IdentityBroadcastMode, [plan_id=28]
   :                             +- *(1) Project [id#2L AS day#4L]
   :                                +- *(1) Range (0, 1000, step=1, splits=16)
   +- Sort [id#29L ASC NULLS FIRST, day#30L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(id#29L, day#30L, 200), ENSURE_REQUIREMENTS, [plan_id=118]
         +- Project [id#29L, day#30L, id#29L, day#30L, side#31, day_sum#54L]
            +- Window [sum(day#30L) windowspecdefinition(day#30L, id#29L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS day_sum#54L], [day#30L, id#29L]
               +- Sort [day#30L ASC NULLS FIRST, id#29L ASC NULLS FIRST], false, 0
                  +- Exchange hashpartitioning(day#30L, id#29L, 200), ENSURE_REQUIREMENTS, [plan_id=112]
                     +- InMemoryTableScan [id#29L, day#30L, side#31]
                           +- InMemoryRelation [id#29L, day#30L, side#31], StorageLevel(disk, memory, deserialized, 1 replicas)
                                 +- Exchange RoundRobinPartitioning(10), REPARTITION_BY_NUM, [plan_id=79]
                                    +- *(2) Project [id#0L, day#4L, right AS side#31]
                                       +- *(2) BroadcastNestedLoopJoin BuildRight, Inner
                                          :- *(2) Range (0, 1000, step=1, splits=16)
                                          +- BroadcastExchange IdentityBroadcastMode, [plan_id=74]
                                             +- *(1) Project [id#2L AS day#4L]
                                                +- *(1) Range (0, 1000, step=1, splits=16)

+---+---+-----+------+
| id|day|lefts|rights|
+---+---+-----+------+
|  0| 13|    1|     1|
|  0| 63|    1|     1|
|  0| 89|    1|     1|
|  0| 95|    1|     1|
|  0| 96|    1|     1|
+---+---+-----+------+
only showing top 5 rows
{code}
Only PySpark seems to be affected.
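On the affected versions, a possible workaround (an untested sketch, reusing the definitions from the reproduction above) is to partition the window by the same keys in the same order as the {{groupBy}}, so that both children end up hash-partitioned as {{(id, day)}} and are therefore co-partitioned:
{code:python}
# Align the window partition key order with the cogroup keys (id, day).
window = Window.partitionBy("id", "day")

right_grouped_df = right_df \
    .withColumn("day_sum", sum(col("day")).over(window)) \
    .groupBy("id", "day")

df = left_grouped_df.cogroup(right_grouped_df) \
    .applyInPandas(cogroup, schema="id long, day long, lefts integer, rights integer")

# In this example every (id, day) combination exists exactly once on each side,
# so a correct cogroup yields lefts == 1 and rights == 1 for every row.
assert df.where("lefts <> 1 or rights <> 1").count() == 0
{code}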


