[ https://issues.apache.org/jira/browse/SPARK-40181?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon updated SPARK-40181:
---------------------------------
    Component/s: SQL

> DataFrame.intersect and .intersectAll are inconsistently dropping rows
> ----------------------------------------------------------------------
>
>                 Key: SPARK-40181
>                 URL: https://issues.apache.org/jira/browse/SPARK-40181
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark, SQL
>    Affects Versions: 3.0.1
>            Reporter: Luke
>            Priority: Major
>
> I don't have a minimal reproducible example for this, but the place where it
> shows up in our workflow is very simple.
> The data in "COLUMN" is a few hundred million distinct strings (the plan
> also deduplicates it), and the DataFrame is compared against itself using
> intersect.
> The code that is failing is essentially:
> {quote}from pyspark.sql.types import StringType, StructField, StructType
>
> values = [...]  # Python list containing many unique strings, none of which are None
> df = spark.createDataFrame(
>     spark.sparkContext.parallelize(
>         [(value,) for value in values], numSlices=2 + len(values) // 10000
>     ),
>     schema=StructType([StructField("COLUMN", StringType())]),
> )
> df = df.distinct()
> assert df.count() == df.intersect(df).count()
> assert df.count() == df.intersectAll(df).count()
> {quote}
> The issue is that both of the above asserts sometimes pass and sometimes
> fail (technically, we have not seen the intersectAll assert pass yet, but we
> have only tried a few times). One striking thing is that if you call
> df.intersect(df).count() multiple times, the returned count is not always
> the same: sometimes it is exactly df.count(), sometimes it is ~1% lower, and
> how much lower seems random.
> In particular, we have called df.intersect(df).count() twice in a row and
> got two different counts, which is very surprising given that df should be
> deterministic, and which suggests some kind of concurrency or
> inconsistent-hashing issue.
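> To make this easier to observe, a loop like the following (a minimal sketch
> reusing the df built above; the iteration count is arbitrary) shows the
> varying counts:
> {quote}expected = df.count()
> for i in range(5):
>     # On our data, observed sometimes equals expected and is sometimes
>     # ~1% lower, by a seemingly random amount.
>     observed = df.intersect(df).count()
>     print(i, expected, observed)
> {quote}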
> One other thing which is possibly noteworthy: using df.join(df, df.columns,
> how="inner") does seem to reliably have the desired behavior (it does not
> drop any rows).
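> A minimal sketch of that workaround (assuming the same df as above):
> {quote}# Self-join on all columns; since df is already deduplicated, every
> # row should match exactly one row. Unlike intersect, this has reliably
> # returned every row in our runs.
> joined = df.join(df, on=df.columns, how="inner")
> assert df.count() == joined.count()
> {quote}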
> Here is the resulting plan from df.intersect(df):
> {quote}== Parsed Logical Plan ==
> 'Intersect false
> :- Deduplicate [COLUMN#144487]
> :  +- LogicalRDD [COLUMN#144487], false
> +- Deduplicate [COLUMN#144487]
>    +- LogicalRDD [COLUMN#144487], false
> == Analyzed Logical Plan ==
> COLUMN: string
> Intersect false
> :- Deduplicate [COLUMN#144487]
> :  +- LogicalRDD [COLUMN#144487], false
> +- Deduplicate [COLUMN#144523]
>    +- LogicalRDD [COLUMN#144523], false
> == Optimized Logical Plan ==
> Aggregate [COLUMN#144487], [COLUMN#144487]
> +- Join LeftSemi, (COLUMN#144487 <=> COLUMN#144523)
>    :- LogicalRDD [COLUMN#144487], false
>    +- Aggregate [COLUMN#144523], [COLUMN#144523]
>       +- LogicalRDD [COLUMN#144523], false
> == Physical Plan ==
> *(7) HashAggregate(keys=[COLUMN#144487], functions=[], output=[COLUMN#144487])
> +- Exchange hashpartitioning(COLUMN#144487, 200), true, [id=#22790]
>    +- *(6) HashAggregate(keys=[COLUMN#144487], functions=[], output=[COLUMN#144487])
>       +- *(6) SortMergeJoin [coalesce(COLUMN#144487, ), isnull(COLUMN#144487)], [coalesce(COLUMN#144523, ), isnull(COLUMN#144523)], LeftSemi
>          :- *(2) Sort [coalesce(COLUMN#144487, ) ASC NULLS FIRST, isnull(COLUMN#144487) ASC NULLS FIRST], false, 0
>          :  +- Exchange hashpartitioning(coalesce(COLUMN#144487, ), isnull(COLUMN#144487), 200), true, [id=#22772]
>          :     +- *(1) Scan ExistingRDD[COLUMN#144487]
>          +- *(5) Sort [coalesce(COLUMN#144523, ) ASC NULLS FIRST, isnull(COLUMN#144523) ASC NULLS FIRST], false, 0
>             +- Exchange hashpartitioning(coalesce(COLUMN#144523, ), isnull(COLUMN#144523), 200), true, [id=#22782]
>                +- *(4) HashAggregate(keys=[COLUMN#144523], functions=[], output=[COLUMN#144523])
>                   +- Exchange hashpartitioning(COLUMN#144523, 200), true, [id=#22778]
>                      +- *(3) HashAggregate(keys=[COLUMN#144523], functions=[], output=[COLUMN#144523])
>                         +- *(3) Scan ExistingRDD[COLUMN#144523]
> {quote}
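> For what it's worth, the optimized plan above has rewritten the Intersect
> into a null-safe (<=>) left-semi self-join plus deduplication. A rough
> DataFrame-level equivalent (an illustration only, not the actual optimizer
> rewrite) would be:
> {quote}from pyspark.sql import functions as F
>
> # Null-safe left-semi self-join followed by dedup, mirroring the
> # optimized logical plan for df.intersect(df).
> equivalent = (
>     df.alias("l")
>     .join(df.alias("r"), F.col("l.COLUMN").eqNullSafe(F.col("r.COLUMN")), "left_semi")
>     .distinct()
> )
> {quote}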
> Here is the plan for df.intersectAll(df):
> {quote}== Parsed Logical Plan ==
> 'IntersectAll true
> :- Deduplicate [COLUMN#144487]
> :  +- LogicalRDD [COLUMN#144487], false
> +- Deduplicate [COLUMN#144487]
>    +- LogicalRDD [COLUMN#144487], false
> == Analyzed Logical Plan ==
> COLUMN: string
> IntersectAll true
> :- Deduplicate [COLUMN#144487]
> :  +- LogicalRDD [COLUMN#144487], false
> +- Deduplicate [COLUMN#144533]
>    +- LogicalRDD [COLUMN#144533], false
> == Optimized Logical Plan ==
> Project [COLUMN#144487]
> +- Generate replicaterows(min_count#144566L, COLUMN#144487), [1], false, [COLUMN#144487]
>    +- Project [COLUMN#144487, if ((vcol1_count#144563L > vcol2_count#144565L)) vcol2_count#144565L else vcol1_count#144563L AS min_count#144566L]
>       +- Filter ((vcol1_count#144563L >= 1) AND (vcol2_count#144565L >= 1))
>          +- Aggregate [COLUMN#144487], [count(vcol1#144558) AS vcol1_count#144563L, count(vcol2#144561) AS vcol2_count#144565L, COLUMN#144487]
>             +- Union
>                :- Aggregate [COLUMN#144487], [true AS vcol1#144558, null AS vcol2#144561, COLUMN#144487]
>                :  +- LogicalRDD [COLUMN#144487], false
>                +- Aggregate [COLUMN#144533], [null AS vcol1#144559, true AS vcol2#144560, COLUMN#144533]
>                   +- LogicalRDD [COLUMN#144533], false
> == Physical Plan ==
> *(7) Project [COLUMN#144487]
> +- Generate replicaterows(min_count#144566L, COLUMN#144487), [COLUMN#144487], false, [COLUMN#144487]
>    +- *(6) Project [COLUMN#144487, if ((vcol1_count#144563L > vcol2_count#144565L)) vcol2_count#144565L else vcol1_count#144563L AS min_count#144566L]
>       +- *(6) Filter ((vcol1_count#144563L >= 1) AND (vcol2_count#144565L >= 1))
>          +- *(6) HashAggregate(keys=[COLUMN#144487], functions=[count(vcol1#144558), count(vcol2#144561)], output=[vcol1_count#144563L, vcol2_count#144565L, COLUMN#144487])
>             +- Exchange hashpartitioning(COLUMN#144487, 200), true, [id=#23310]
>                +- *(5) HashAggregate(keys=[COLUMN#144487], functions=[partial_count(vcol1#144558), partial_count(vcol2#144561)], output=[COLUMN#144487, count#144569L, count#144570L])
>                   +- Union
>                      :- *(2) HashAggregate(keys=[COLUMN#144487], functions=[], output=[vcol1#144558, vcol2#144561, COLUMN#144487])
>                      :  +- Exchange hashpartitioning(COLUMN#144487, 200), true, [id=#23267]
>                      :     +- *(1) HashAggregate(keys=[COLUMN#144487], functions=[], output=[COLUMN#144487])
>                      :        +- *(1) Scan ExistingRDD[COLUMN#144487]
>                      +- *(4) HashAggregate(keys=[COLUMN#144533], functions=[], output=[vcol1#144559, vcol2#144560, COLUMN#144533])
>                         +- ReusedExchange [COLUMN#144533], Exchange hashpartitioning(COLUMN#144487, 200), true, [id=#23267]
> {quote}
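> For reference, the IntersectAll rewrite visible in the optimized plan is
> roughly the following DataFrame code (a sketch for illustration only;
> replicaterows has no public DataFrame API, so explode over a generated
> sequence stands in for it here):
> {quote}from pyspark.sql import functions as F
>
> # Tag each side with a marker column, union, count the markers per key,
> # keep keys present on both sides, and replicate each row
> # min(left_count, right_count) times.
> left = df.select("COLUMN", F.lit(True).alias("vcol1"), F.lit(None).cast("boolean").alias("vcol2"))
> right = df.select("COLUMN", F.lit(None).cast("boolean").alias("vcol1"), F.lit(True).alias("vcol2"))
> counted = (
>     left.unionByName(right)
>     .groupBy("COLUMN")
>     .agg(F.count("vcol1").alias("vcol1_count"), F.count("vcol2").alias("vcol2_count"))
>     .where((F.col("vcol1_count") >= 1) & (F.col("vcol2_count") >= 1))
>     .withColumn("min_count", F.least("vcol1_count", "vcol2_count"))
> )
> # explode(sequence(...)) stands in for replicaterows(min_count, COLUMN)
> result = counted.select(
>     "COLUMN", F.explode(F.sequence(F.lit(1), F.col("min_count"))).alias("_n")
> ).select("COLUMN")
> {quote}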



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
