[ https://issues.apache.org/jira/browse/SPARK-24588?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Xiao Li updated SPARK-24588:
----------------------------
    Description: 
In [https://github.com/apache/spark/pull/19080], we simplified the 
distribution/partitioning framework and made all join-like operators 
require HashClusteredPartitioning from their children. Unfortunately, the 
streaming join operator was missed.
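
For context, this requirement is declared on a physical operator via requiredChildDistribution. Below is a schematic sketch only (fields elided, not the operator's real signature; in Spark 2.3 the relevant class is HashClusteredDistribution in org.apache.spark.sql.catalyst.plans.physical):
{code:java}
// Schematic sketch, not the actual operator. Declaring
// HashClusteredDistribution over the full join keys for BOTH children lets
// EnsureRequirements insert an Exchange on any child whose existing
// partitioning (e.g. hashpartitioning(b) alone) does not satisfy it.
case class StreamingSymmetricHashJoinExec(
    leftKeys: Seq[Expression],
    rightKeys: Seq[Expression],
    left: SparkPlan,
    right: SparkPlan /* remaining fields elided */) extends SparkPlan {

  override def requiredChildDistribution: Seq[Distribution] =
    HashClusteredDistribution(leftKeys) ::
      HashClusteredDistribution(rightKeys) :: Nil

  // execution logic elided
}
{code}
Without such an override, the planner accepts any existing partitioning on a child, which is the root cause of the plan shown below.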

This can cause wrong results. Consider:
{code:java}
val input1 = MemoryStream[Int]
val input2 = MemoryStream[Int]

val df1 = input1.toDF.select('value as 'a, 'value * 2 as 'b)
val df2 = input2.toDF.select('value as 'a, 'value * 2 as 'b).repartition('b)
val joined = df1.join(df2, Seq("a", "b")).select('a)
{code}
The physical plan is
{code:java}
*(3) Project [a#5, b#6, c#7, c#14]
+- StreamingSymmetricHashJoin [a#5, b#6], [a#12, b#13], Inner, condition = [ 
leftOnly = null, rightOnly = null, both = null, full = null ], state info [ 
checkpoint = <unknown>, runId = 5a1ab77a-ed5c-4f0b-8bcb-fc5637152b97, opId = 0, 
ver = 0, numPartitions = 5], 0, state cleanup [ left = null, right = null ]
   :- Exchange hashpartitioning(a#5, b#6, 5)
   :  +- *(1) Project [value#1 AS a#5, (value#1 * 2) AS b#6, (value#1 * 3) AS 
c#7]
   :     +- StreamingRelation MemoryStream[value#1], [value#1]
   +- Exchange hashpartitioning(b#13, 5)
      +- *(2) Project [value#3 AS a#12, (value#3 * 3) AS b#13, (value#3 * 4) AS 
c#14]
         +- StreamingRelation MemoryStream[value#3], [value#3]
{code}
The left table is hash partitioned by (a, b), while the right table is hash 
partitioned by b alone. As a result, two matching records can land in 
different partitions, so rows that should appear in the join output are 
silently dropped.
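
The mismatch is easy to see outside Spark. Here is a minimal plain-Scala model of a per-partition hash join (the object and helper names are invented for illustration, and the hash is Scala's built-in hashCode, not Spark's Murmur3-based hashpartitioning): each partition is joined independently, so a pair of matching rows is found only if both sides hash to the same partition id.
{code:java}
// Plain-Scala illustration (no Spark): left side clustered by (a, b),
// right side clustered by b only, as in the plan above.
object CoPartitionDemo {
  val numPartitions = 5

  // Stand-in for hashpartitioning: partition id from the clustering keys.
  def part(keys: Seq[Int]): Int =
    Math.floorMod(keys.hashCode, numPartitions)

  def main(args: Array[String]): Unit = {
    // Rows with b = a * 2, present on both sides of the join.
    val rows = Seq((1, 2), (3, 6), (4, 8))
    val left  = rows.groupBy { case (a, b) => part(Seq(a, b)) }
    val right = rows.groupBy { case (_, b) => part(Seq(b)) }

    // Per-partition join: only co-located rows can match.
    val matched = (0 until numPartitions).flatMap { p =>
      left.getOrElse(p, Nil).intersect(right.getOrElse(p, Nil))
    }
    println(s"matched ${matched.size} of ${rows.size} rows")
  }
}
{code}
Because the right side hashes only b while the left hashes (a, b), a matching row generally lands in different partitions on the two sides, which is exactly the dropped-row symptom described above.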

  was:
In https://github.com/apache/spark/pull/19080, we simplified the 
distribution/partitioning framework, and make all the join-like operators 
require HashClusteredPartitioning from children. Unfortunately streaming join 
operator was missed.

This can cause wrong result. Think about

{code}
val input1 = MemoryStream[Int]
val input2 = MemoryStream[Int]

val df1 = input1.toDF.select('value as 'a, 'value * 2 as 'b)
val df2 = input2.toDF.select('value as 'a, 'value * 2 as 'b).repartition('b)
val joined = df1.join(df2, Seq("a", "b")).select('a)
{code}

The physical plan is

{code}
*(3) Project [a#5, b#6, c#7, c#14]
+- StreamingSymmetricHashJoin [a#5, b#6], [a#12, b#13], Inner, condition = [ 
leftOnly = null, rightOnly = null, both = null, full = null ], state info [ 
checkpoint = <unknown>, runId = 5a1ab77a-ed5c-4f0b-8bcb-fc5637152b97, opId = 0, 
ver = 0, numPartitions = 5], 0, state cleanup [ left = null, right = null ]
   :- Exchange hashpartitioning(a#5, b#6, 5)
   :  +- *(1) Project [value#1 AS a#5, (value#1 * 2) AS b#6, (value#1 * 3) AS 
c#7]
   :     +- StreamingRelation MemoryStream[value#1], [value#1]
   +- Exchange hashpartitioning(a#12, b#13, 5)
      +- Exchange hashpartitioning(b#13, 5)
         +- *(2) Project [value#3 AS a#12, (value#3 * 3) AS b#13, (value#3 * 4) 
AS c#14]
            +- StreamingRelation MemoryStream[value#3], [value#3]
{code}

The left table is hash partitioned by a, b, while the right table is hash 
partitioned by b. This means, we may have a matching record that is in 
different partitions, which should be in the output but not.


> StreamingSymmetricHashJoinExec should require HashClusteredPartitioning from 
> children
> -------------------------------------------------------------------------------------
>
>                 Key: SPARK-24588
>                 URL: https://issues.apache.org/jira/browse/SPARK-24588
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.3.0, 2.3.1
>            Reporter: Wenchen Fan
>            Assignee: Wenchen Fan
>            Priority: Blocker
>              Labels: correctness
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
