[ https://issues.apache.org/jira/browse/SPARK-40181?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Hyukjin Kwon updated SPARK-40181:
---------------------------------
    Component/s: SQL

> DataFrame.intersect and .intersectAll are inconsistently dropping rows
> ----------------------------------------------------------------------
>
>                 Key: SPARK-40181
>                 URL: https://issues.apache.org/jira/browse/SPARK-40181
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark, SQL
>    Affects Versions: 3.0.1
>            Reporter: Luke
>            Priority: Major
>
> I don't have a minimal reproducible example for this, but the place where it
> shows up in our workflow is very simple.
> The data in "COLUMN" are a few hundred million distinct strings (also
> deduplicated in the plan), and the column is compared against itself using
> intersect.
> The code that fails is essentially:
> {quote}values = [...]  # python list containing many unique strings, none of which are None
> df = spark.createDataFrame(
>     spark.sparkContext.parallelize(
>         [(value,) for value in values], numSlices=2 + len(values) // 10000
>     ),
>     schema=StructType([StructField("COLUMN", StringType())]),
> )
> df = df.distinct()
> assert df.count() == df.intersect(df).count()
> assert df.count() == df.intersectAll(df).count()
> {quote}
> The issue is that both of the above asserts sometimes pass and sometimes fail
> (we have not yet seen the intersectAll assert pass, but we have only tried a
> few times). One striking thing: if you call df.intersect(df).count() multiple
> times, the returned count is not always the same. Sometimes it is exactly
> df.count(), sometimes it is ~1% lower, and how much lower seems random.
> In particular, we have called df.intersect(df).count() twice in a row and got
> two different counts, which is very surprising given that df should be
> deterministic, and suggests some kind of concurrency/inconsistent hashing
> issue.
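For reference, the invariant those asserts rely on can be sketched in plain Python (an illustrative sketch of the expected intersect/intersectAll multiset semantics, not PySpark code): `intersect` keeps each common value once, `intersectAll` keeps each value min(left count, right count) times, so self-intersection of a deduplicated input must preserve the count.

```python
from collections import Counter

def intersect_distinct(left, right):
    # DataFrame.intersect semantics: each value present on both sides
    # appears exactly once in the result.
    return set(left) & set(right)

def intersect_all(left, right):
    # DataFrame.intersectAll semantics: each common value appears
    # min(count in left, count in right) times.
    return list((Counter(left) & Counter(right)).elements())

# For an already-deduplicated input, self-intersection must preserve
# the row count -- the property the asserts in the report check.
distinct_values = {"a", "b", "c"}
assert len(intersect_distinct(distinct_values, distinct_values)) == len(distinct_values)
assert len(intersect_all(distinct_values, distinct_values)) == len(distinct_values)
```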
> One other thing which is possibly noteworthy is that
> df.join(df, df.columns, how="inner") does seem to reliably have the desired
> behavior (it drops no rows).
> Here is the resulting plan from df.intersect(df):
> {quote}== Parsed Logical Plan ==
> 'Intersect false
> :- Deduplicate [COLUMN#144487]
> :  +- LogicalRDD [COLUMN#144487], false
> +- Deduplicate [COLUMN#144487]
>    +- LogicalRDD [COLUMN#144487], false
>
> == Analyzed Logical Plan ==
> COLUMN: string
> Intersect false
> :- Deduplicate [COLUMN#144487]
> :  +- LogicalRDD [COLUMN#144487], false
> +- Deduplicate [COLUMN#144523]
>    +- LogicalRDD [COLUMN#144523], false
>
> == Optimized Logical Plan ==
> Aggregate [COLUMN#144487], [COLUMN#144487]
> +- Join LeftSemi, (COLUMN#144487 <=> COLUMN#144523)
>    :- LogicalRDD [COLUMN#144487], false
>    +- Aggregate [COLUMN#144523], [COLUMN#144523]
>       +- LogicalRDD [COLUMN#144523], false
>
> == Physical Plan ==
> *(7) HashAggregate(keys=[COLUMN#144487], functions=[], output=[COLUMN#144487])
> +- Exchange hashpartitioning(COLUMN#144487, 200), true, [id=#22790]
>    +- *(6) HashAggregate(keys=[COLUMN#144487], functions=[], output=[COLUMN#144487])
>       +- *(6) SortMergeJoin [coalesce(COLUMN#144487, ), isnull(COLUMN#144487)], [coalesce(COLUMN#144523, ), isnull(COLUMN#144523)], LeftSemi
>          :- *(2) Sort [coalesce(COLUMN#144487, ) ASC NULLS FIRST, isnull(COLUMN#144487) ASC NULLS FIRST], false, 0
>          :  +- Exchange hashpartitioning(coalesce(COLUMN#144487, ), isnull(COLUMN#144487), 200), true, [id=#22772]
>          :     +- *(1) Scan ExistingRDD[COLUMN#144487]
>          +- *(5) Sort [coalesce(COLUMN#144523, ) ASC NULLS FIRST, isnull(COLUMN#144523) ASC NULLS FIRST], false, 0
>             +- Exchange hashpartitioning(coalesce(COLUMN#144523, ), isnull(COLUMN#144523), 200), true, [id=#22782]
>                +- *(4) HashAggregate(keys=[COLUMN#144523], functions=[], output=[COLUMN#144523])
>                   +- Exchange hashpartitioning(COLUMN#144523, 200), true, [id=#22778]
>                      +- *(3) HashAggregate(keys=[COLUMN#144523], functions=[],
> output=[COLUMN#144523])
>                         +- *(3) Scan ExistingRDD[COLUMN#144523]
> {quote}
> and for df.intersectAll(df):
> {quote}== Parsed Logical Plan ==
> 'IntersectAll true
> :- Deduplicate [COLUMN#144487]
> :  +- LogicalRDD [COLUMN#144487], false
> +- Deduplicate [COLUMN#144487]
>    +- LogicalRDD [COLUMN#144487], false
>
> == Analyzed Logical Plan ==
> COLUMN: string
> IntersectAll true
> :- Deduplicate [COLUMN#144487]
> :  +- LogicalRDD [COLUMN#144487], false
> +- Deduplicate [COLUMN#144533]
>    +- LogicalRDD [COLUMN#144533], false
>
> == Optimized Logical Plan ==
> Project [COLUMN#144487]
> +- Generate replicaterows(min_count#144566L, COLUMN#144487), [1], false, [COLUMN#144487]
>    +- Project [COLUMN#144487, if ((vcol1_count#144563L > vcol2_count#144565L)) vcol2_count#144565L else vcol1_count#144563L AS min_count#144566L]
>       +- Filter ((vcol1_count#144563L >= 1) AND (vcol2_count#144565L >= 1))
>          +- Aggregate [COLUMN#144487], [count(vcol1#144558) AS vcol1_count#144563L, count(vcol2#144561) AS vcol2_count#144565L, COLUMN#144487]
>             +- Union
>                :- Aggregate [COLUMN#144487], [true AS vcol1#144558, null AS vcol2#144561, COLUMN#144487]
>                :  +- LogicalRDD [COLUMN#144487], false
>                +- Aggregate [COLUMN#144533], [null AS vcol1#144559, true AS vcol2#144560, COLUMN#144533]
>                   +- LogicalRDD [COLUMN#144533], false
>
> == Physical Plan ==
> *(7) Project [COLUMN#144487]
> +- Generate replicaterows(min_count#144566L, COLUMN#144487), [COLUMN#144487], false, [COLUMN#144487]
>    +- *(6) Project [COLUMN#144487, if ((vcol1_count#144563L > vcol2_count#144565L)) vcol2_count#144565L else vcol1_count#144563L AS min_count#144566L]
>       +- *(6) Filter ((vcol1_count#144563L >= 1) AND (vcol2_count#144565L >= 1))
>          +- *(6) HashAggregate(keys=[COLUMN#144487], functions=[count(vcol1#144558), count(vcol2#144561)], output=[vcol1_count#144563L, vcol2_count#144565L, COLUMN#144487])
>             +- Exchange hashpartitioning(COLUMN#144487, 200), true, [id=#23310]
>                +- *(5) HashAggregate(keys=[COLUMN#144487],
> functions=[partial_count(vcol1#144558), partial_count(vcol2#144561)], output=[COLUMN#144487, count#144569L, count#144570L])
>                   +- Union
>                      :- *(2) HashAggregate(keys=[COLUMN#144487], functions=[], output=[vcol1#144558, vcol2#144561, COLUMN#144487])
>                      :  +- Exchange hashpartitioning(COLUMN#144487, 200), true, [id=#23267]
>                      :     +- *(1) HashAggregate(keys=[COLUMN#144487], functions=[], output=[COLUMN#144487])
>                      :        +- *(1) Scan ExistingRDD[COLUMN#144487]
>                      +- *(4) HashAggregate(keys=[COLUMN#144533], functions=[], output=[vcol1#144559, vcol2#144560, COLUMN#144533])
>                         +- ReusedExchange [COLUMN#144533], Exchange hashpartitioning(COLUMN#144487, 200), true, [id=#23267]
> {quote}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
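A note on the intersect plan above: the SortMergeJoin does not join on COLUMN directly but on the pair (coalesce(COLUMN, ''), isnull(COLUMN)), which is how the optimizer implements the null-safe equality (<=>) it produced. The key construction can be sketched in plain Python (illustrative only; the `null_safe_key` helper is hypothetical, not a Spark API):

```python
def null_safe_key(value):
    # Mirrors the plan's join key: (coalesce(value, ""), isnull(value)).
    # The isnull flag keeps a real NULL distinct from the empty string,
    # even though both coalesce to "".
    return (value if value is not None else "", value is None)

assert null_safe_key(None) == null_safe_key(None)  # NULL <=> NULL holds
assert null_safe_key(None) != null_safe_key("")    # NULL and "" stay distinct
assert null_safe_key("x") == null_safe_key("x")
```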
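The optimized IntersectAll plan above unions the two sides with marker columns (vcol1/vcol2), counts each marker per key, keeps keys where both counts are at least 1, and then replicates each key min_count times via replicaterows. The arithmetic of that rewrite can be sketched in plain Python (a sketch of the rewrite's logic, not Spark code):

```python
from collections import Counter

def intersect_all_via_counts(left, right):
    # Count occurrences per side (the per-side marker counts), keep keys
    # with vcol1_count >= 1 AND vcol2_count >= 1, then emit each key
    # min(count_left, count_right) times (the replicaterows step).
    left_counts, right_counts = Counter(left), Counter(right)
    result = []
    for key, n_left in left_counts.items():
        n_right = right_counts.get(key, 0)
        if n_right >= 1:
            result.extend([key] * min(n_left, n_right))
    return result

assert sorted(intersect_all_via_counts(["a", "a", "b"], ["a", "b", "b"])) == ["a", "b"]
```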