[ https://issues.apache.org/jira/browse/SPARK-24588?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Xiao Li updated SPARK-24588:
----------------------------
Description:

In [https://github.com/apache/spark/pull/19080], we simplified the distribution/partitioning framework and made all the join-like operators require HashClusteredDistribution from their children. Unfortunately, the streaming join operator was missed. This can produce wrong results.

Consider:

{code:java}
val input1 = MemoryStream[Int]
val input2 = MemoryStream[Int]
val df1 = input1.toDF.select('value as 'a, 'value * 2 as 'b)
val df2 = input2.toDF.select('value as 'a, 'value * 2 as 'b).repartition('b)
val joined = df1.join(df2, Seq("a", "b")).select('a)
{code}

The physical plan is:

{code:java}
*(3) Project [a#5, b#6, c#7, c#14]
+- StreamingSymmetricHashJoin [a#5, b#6], [a#12, b#13], Inner, condition = [ leftOnly = null, rightOnly = null, both = null, full = null ], state info [ checkpoint = <unknown>, runId = 5a1ab77a-ed5c-4f0b-8bcb-fc5637152b97, opId = 0, ver = 0, numPartitions = 5], 0, state cleanup [ left = null, right = null ]
:- Exchange hashpartitioning(a#5, b#6, 5)
:  +- *(1) Project [value#1 AS a#5, (value#1 * 2) AS b#6, (value#1 * 3) AS c#7]
:     +- StreamingRelation MemoryStream[value#1], [value#1]
+- Exchange hashpartitioning(b#13, 5)
   +- *(2) Project [value#3 AS a#12, (value#3 * 3) AS b#13, (value#3 * 4) AS c#14]
      +- StreamingRelation MemoryStream[value#3], [value#3]
{code}

The left table is hash partitioned by (a, b), while the right table is hash partitioned by b alone. As a result, two matching records can land in different partitions, so a row pair that should appear in the join output is silently dropped.

was:
In [https://github.com/apache/spark/pull/19080], we simplified the distribution/partitioning framework, and make all the join-like operators require HashClusteredPartitioning from children. Unfortunately streaming join operator was missed. This can cause wrong result.
> StreamingSymmetricHashJoinExec should require HashClusteredPartitioning from children
> -------------------------------------------------------------------------------------
>
>                 Key: SPARK-24588
>                 URL: https://issues.apache.org/jira/browse/SPARK-24588
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.3.0, 2.3.1
>            Reporter: Wenchen Fan
>            Assignee: Wenchen Fan
>            Priority: Blocker
>              Labels: correctness
>             Fix For: 2.3.2, 2.4.0
--
This message was sent by Atlassian JIRA (v7.6.3#76005)
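The root cause above can be sketched without Spark at all: hash the same row by (a, b) on one side and by b alone on the other, and the two copies can land in different partitions. Below is a minimal, self-contained sketch; the class name, the `31 * a + b` combining hash, and the hard-coded 5 partitions are illustrative assumptions, not Spark's actual Murmur3-based hashpartitioning.

```java
// Minimal illustration of the partitioning mismatch (no Spark needed).
// NOTE: the combining hash and partition count are illustrative stand-ins,
// not Spark's real Murmur3-based hashpartitioning.
public class PartitionMismatch {
    static final int NUM_PARTITIONS = 5; // mirrors numPartitions = 5 in the plan

    // Left side: Exchange hashpartitioning(a#5, b#6, 5) -- keyed on BOTH join columns.
    static int partitionByAB(int a, int b) {
        return Math.floorMod(31 * a + b, NUM_PARTITIONS);
    }

    // Right side as actually planned: Exchange hashpartitioning(b#13, 5) --
    // keyed on b only, because the user's repartition('b) was accepted as
    // already satisfying the join's (too weak) distribution requirement.
    static int partitionByB(int b) {
        return Math.floorMod(b, NUM_PARTITIONS);
    }

    public static void main(String[] args) {
        // A row (a = 1, b = 2) produced on both sides, as in the example query.
        int a = 1, b = 2;
        int left = partitionByAB(a, b);  // floorMod(33, 5) = 3
        int right = partitionByB(b);     // floorMod(2, 5)  = 2
        // prints "left partition = 3, right partition = 2"
        System.out.println("left partition = " + left + ", right partition = " + right);
        // left != right: the two matching rows are handled by different tasks,
        // so the streaming join never pairs them and the match is silently lost.
    }
}
```

Requiring HashClusteredDistribution on both children forces an Exchange on the right side keyed on (a, b) as well, which would make both calls route the row to the same partition.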