[ 
https://issues.apache.org/jira/browse/SPARK-22053?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tathagata Das resolved SPARK-22053.
-----------------------------------
       Resolution: Fixed
    Fix Version/s: 3.0.0

Issue resolved by pull request 19271
[https://github.com/apache/spark/pull/19271]

> Implement stream-stream inner join in Append mode
> -------------------------------------------------
>
>                 Key: SPARK-22053
>                 URL: https://issues.apache.org/jira/browse/SPARK-22053
>             Project: Spark
>          Issue Type: Sub-task
>          Components: Structured Streaming
>    Affects Versions: 2.2.0
>            Reporter: Tathagata Das
>            Assignee: Tathagata Das
>             Fix For: 3.0.0
>
>
> Stream-stream inner join is traditionally implemented using a two-way 
> symmetric hash join. At a high level, we want to do the following.
> 1. For each stream, we maintain the past rows as state in the State Store.
>     - For each joining key, there can be multiple rows that have been 
> received. 
>     - So, we have to effectively maintain a key-to-list-of-values multimap as 
> state for each stream.
> 2. In each batch, for each input row in each stream:
>     - Look up the other stream's state to see if there are matching rows, and 
> output them if they satisfy the join condition.
>     - Add the input row to the corresponding stream's state.
>     - If the data has a timestamp/window column with a watermark, then we will 
> use that to calculate the threshold for keys that are required to be buffered 
> for future matches, and drop the rest from the state.
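The two steps above can be modeled in a few lines of plain Python. This is a minimal, single-process sketch, not the code from pull request 19271: the class `SymmetricHashJoin`, its `process_batch` and `evict` methods, and the use of in-memory dicts in place of the State Store are all hypothetical. Processing all left rows of a batch before all right rows means each matching pair is emitted exactly once; a pair whose rows arrive in the same batch is found when the right row probes the left state.

```python
from collections import defaultdict
from typing import Any, Callable, Dict, List, Tuple

Row = Dict[str, Any]
Condition = Callable[[Row, Row], bool]


class SymmetricHashJoin:
    """Hypothetical in-memory model of a two-way symmetric hash inner join.

    Each side keeps a key -> list-of-rows multimap that stands in for the
    State Store. Illustrative only; not Spark's implementation.
    """

    def __init__(self, left_key: str, right_key: str,
                 condition: Condition = lambda left, right: True) -> None:
        self.keys = {"left": left_key, "right": right_key}
        self.state: Dict[str, Dict[Any, List[Row]]] = {
            "left": defaultdict(list),
            "right": defaultdict(list),
        }
        self.condition = condition

    def process_batch(self, left_rows: List[Row],
                      right_rows: List[Row]) -> List[Tuple[Row, Row]]:
        output: List[Tuple[Row, Row]] = []
        # Each left row probes the right state, then is added to the left state.
        for row in left_rows:
            key = row[self.keys["left"]]
            for other in self.state["right"].get(key, []):
                if self.condition(row, other):
                    output.append((row, other))
            self.state["left"][key].append(row)
        # Each right row probes the left state, which already contains this
        # batch's left rows, so same-batch pairs are emitted here, once.
        for row in right_rows:
            key = row[self.keys["right"]]
            for other in self.state["left"].get(key, []):
                if self.condition(other, row):
                    output.append((other, row))
            self.state["right"][key].append(row)
        return output

    def evict(self, side: str, time_col: str, threshold: Any) -> None:
        """Drop rows on `side` whose `time_col` is at or below `threshold`."""
        multimap = self.state[side]
        for key in list(multimap):
            kept = [r for r in multimap[key] if r[time_col] > threshold]
            if kept:
                multimap[key] = kept
            else:
                del multimap[key]
```

With key `id` on both sides, a left row that arrives in one batch is still buffered in the left multimap and is matched by a right row arriving in a later batch. `evict` stands in for watermark-based cleanup; how the threshold is computed depends on the join condition.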
> Cleaning up old, unnecessary state rows depends entirely on whether a 
> watermark has been defined and what the join conditions are. We definitely 
> want to support state cleanup for two types of queries that are likely to 
> be common:
> - Queries with time-range conditions - E.g. {{SELECT * FROM leftTable JOIN 
> rightTable ON leftKey = rightKey AND leftTime > rightTime - INTERVAL 8 
> MINUTES AND leftTime < rightTime + INTERVAL 1 HOUR}}
> - Queries with windows as the matching key - E.g. {{SELECT * FROM leftTable 
> JOIN rightTable ON leftKey = rightKey AND window(leftTime, "1 hour") = 
> window(rightTime, "1 hour")}} (pseudo-SQL)
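For the time-range query above, the cleanup threshold for each side follows from rearranging the condition into leftTime - 1 hour < rightTime < leftTime + 8 minutes. The sketch below assumes that a side's watermark is a lower bound on the event times of rows still to arrive on that side; the function names are hypothetical, and it does not describe how Spark derives or combines watermarks. The window helper assumes fixed-size (tumbling) windows aligned to the Unix epoch.

```python
from datetime import datetime, timedelta
from typing import Tuple

# Condition from the time-range example query:
#   leftTime > rightTime - INTERVAL 8 MINUTES
#   AND leftTime < rightTime + INTERVAL 1 HOUR
# Rearranged: leftTime - 1 hour < rightTime < leftTime + 8 minutes.
RIGHT_MAX_AFTER_LEFT = timedelta(minutes=8)   # rightTime < leftTime + 8 min
RIGHT_MAX_BEFORE_LEFT = timedelta(hours=1)    # rightTime > leftTime - 1 hour


def left_state_threshold(right_watermark: datetime) -> datetime:
    """Left rows with leftTime <= the result can be dropped (hypothetical helper).

    A left row only matches right rows with rightTime < leftTime + 8 min.
    If future right rows have rightTime >= right_watermark, a match is
    possible only while leftTime > right_watermark - 8 min.
    """
    return right_watermark - RIGHT_MAX_AFTER_LEFT


def right_state_threshold(left_watermark: datetime) -> datetime:
    """Right rows with rightTime <= the result can be dropped (hypothetical helper).

    A right row only matches left rows with leftTime < rightTime + 1 hour,
    so it is useful only while rightTime > left_watermark - 1 hour.
    """
    return left_watermark - RIGHT_MAX_BEFORE_LEFT


def tumbling_window(ts: datetime, size: timedelta) -> Tuple[datetime, datetime]:
    """Return [start, end) of the fixed-size, epoch-aligned window containing ts."""
    epoch = datetime(1970, 1, 1)
    start = epoch + ((ts - epoch) // size) * size
    return start, start + size


def window_row_droppable(row_time: datetime, other_watermark: datetime,
                         size: timedelta) -> bool:
    """For window-equality joins: a row can be dropped once its window ends at
    or before the other side's watermark, because later rows on the other
    side fall into later windows."""
    _, end = tumbling_window(row_time, size)
    return end <= other_watermark
```

For example, if the right side's watermark is 12:00, left rows with leftTime at or before 11:52 can be dropped; if the left side's watermark is 12:00, right rows with rightTime at or before 11:00 can be dropped. The asymmetry comes directly from the asymmetric interval in the join condition.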



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
