Broadcast streaming join is a very interesting addition to streaming
SQL, and I'm glad to see it's been brought up.

One of the major differences between streaming and batch is state.
A regular join uses "Keyed State" (the key is deduced from the join
condition), so for a regular broadcast streaming join, we should
consider how to represent the state of the large table. At first
thought, we could implement the streaming broadcast join just like the
batch one (no hash shuffle required for the probe side), but that does
not work in streaming mode, since we need a hash shuffle to use "keyed
state". I'd like to see how the state is designed in the FLIP.
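
To make the state question concrete, here is a toy Python simulation (not Flink code). `Subtask`, `run`, `holders`, and the byte-sum `subtask_for_key` are made-up stand-ins, and the round-robin placement of large-side rows is only an assumption about what an un-shuffled probe side would look like. It routes the same events once with a hash shuffle on both sides and once with the small side broadcast:

```python
PARALLELISM = 4  # number of parallel join subtasks (assumed)


def subtask_for_key(key: str) -> int:
    """Deterministic stand-in for a hash shuffle (not Flink's key-group logic)."""
    return sum(key.encode()) % PARALLELISM


class Subtask:
    """One parallel join instance holding its local state for both inputs."""

    def __init__(self):
        self.small = {}  # join key -> small-side rows seen by this subtask
        self.large = {}  # join key -> large-side rows seen by this subtask

    def on_small(self, key, row):
        self.small.setdefault(key, []).append(row)
        return [(l, row) for l in self.large.get(key, [])]

    def on_large(self, key, row):
        self.large.setdefault(key, []).append(row)
        return [(row, s) for s in self.small.get(key, [])]


def run(events, mode):
    """Route events with mode 'hash' or 'broadcast'; return results and subtasks."""
    subtasks = [Subtask() for _ in range(PARALLELISM)]
    out, rr = [], 0
    for side, key, row in events:
        if side == "small":
            # Broadcast: every subtask gets a copy. Hash: only the key's owner.
            targets = subtasks if mode == "broadcast" else [subtasks[subtask_for_key(key)]]
            for t in targets:
                out.extend(t.on_small(key, row))
        else:
            if mode == "broadcast":
                # No shuffle on the probe side: rows stay wherever they arrive.
                target = subtasks[rr % PARALLELISM]
                rr += 1
            else:
                target = subtasks[subtask_for_key(key)]
            out.extend(target.on_large(key, row))
    return sorted(out), subtasks


def holders(subtasks, key):
    """Indexes of the subtasks that hold large-side state for `key`."""
    return [i for i, t in enumerate(subtasks) if key in t.large]


events = [
    ("large", "shop_1", "order_1"),
    ("small", "shop_1", "loc_A"),
    ("large", "shop_1", "order_2"),
    ("large", "shop_1", "order_3"),
    ("small", "shop_1", "loc_B"),
    ("large", "shop_2", "order_4"),
]
hash_out, hash_tasks = run(events, "hash")
bc_out, bc_tasks = run(events, "broadcast")

print("same join result:", hash_out == bc_out)
print("subtasks holding shop_1 rows (hash):", holders(hash_tasks, "shop_1"))
print("subtasks holding shop_1 rows (broadcast):", holders(bc_tasks, "shop_1"))
```

Both runs emit the same join results, but in the broadcast run the large-side rows for `shop_1` end up in several subtasks, so that state is no longer partitioned by the join key. How to store it is the part I'd hope the FLIP spells out.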

On Sun, Feb 4, 2024 at 11:24 AM Shengkai Fang <fskm...@gmail.com> wrote:
>
> +1 a FLIP to clarify the idea.
>
> Please choose the type of state carefully here. The doc [1] says that 
> broadcast state does not support the RocksDB state backend.
>
> Best,
> Shengkai
>
> [1] 
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/broadcast_state/#important-considerations
>
> On Fri, Feb 2, 2024 at 11:57 PM David Anderson <dander...@apache.org> wrote:
>>
>> I've seen enough demand for a streaming broadcast join in the community to 
>> justify a FLIP -- I think it's a good idea, and look forward to the 
>> discussion.
>>
>> David
>>
>> On Fri, Feb 2, 2024 at 6:55 AM Feng Jin <jinfeng1...@gmail.com> wrote:
>>>
>>> +1 a FLIP for this topic.
>>>
>>>
>>> Best,
>>> Feng
>>>
>>> On Fri, Feb 2, 2024 at 10:26 PM Martijn Visser <martijnvis...@apache.org> 
>>> wrote:
>>>>
>>>> Hi,
>>>>
>>>> I would definitely expect a FLIP on this topic before moving to 
>>>> implementation.
>>>>
>>>> Best regards,
>>>>
>>>> Martijn
>>>>
>>>> On Fri, Feb 2, 2024 at 12:47 PM Xuyang <xyzhong...@163.com> wrote:
>>>>>
>>>>> Hi, Prabhjot.
>>>>>
>>>>> IIUC, the main reasons why the community has so far supported join 
>>>>> hints only in batch mode are as follows:
>>>>> 1. In batch mode, multiple join algorithms were implemented quite 
>>>>> early on.
>>>>> 2. Stream processing is a long-running scenario, and it is quite 
>>>>> difficult to determine whether a small table will become a large table 
>>>>> after a long period of operation.
>>>>>
>>>>> However, as you mentioned, join hints do indeed have their significance 
>>>>> in streaming. If you want to support the implementation of "join hints + 
>>>>> broadcast join" in streaming, the changes I can currently think of 
>>>>> include:
>>>>> 1. In the optimizer, changing the exchange on the small table side to 
>>>>> broadcast instead of hash (InputProperty#BROADCAST).
>>>>> 2. As yet unknown changes at the table runtime level.
>>>>>
>>>>> You can also discuss it within the community through JIRA, FLIP, or the 
>>>>> dev mailing list.
>>>>>
>>>>>
>>>>> --
>>>>>
>>>>>     Best!
>>>>>     Xuyang
>>>>>
>>>>>
>>>>> At 2024-02-02 00:46:01, "Prabhjot Bharaj via user" 
>>>>> <user@flink.apache.org> wrote:
>>>>>
>>>>> Hi Feng,
>>>>>
>>>>> Thanks for your prompt response.
>>>>> If we were to solve this in Flink, my higher level viewpoint is:
>>>>>
>>>>> 1. First, implement a broadcast join in Flink Streaming SQL that works 
>>>>> across the Table API (e.g. via `left.join(right, <predicate>, 
>>>>> join_type="broadcast")`).
>>>>> 2. Then, support a broadcast hint that would use this new join based 
>>>>> on the hint type.
>>>>>
>>>>> What do you think about this?
>>>>> Would you have some pointers on how/where to start on the first part? 
>>>>> (I'm thinking we'd have to extend the Broadcast State pattern for this 
>>>>> purpose.)
>>>>>
>>>>> Thanks,
>>>>> Prabhjot
>>>>>
>>>>> On Thu, Feb 1, 2024 at 11:40 AM Feng Jin <jinfeng1...@gmail.com> wrote:
>>>>>>
>>>>>> Hi Prabhjot
>>>>>>
>>>>>> I think this is a reasonable scenario. When a regular join combines a 
>>>>>> large table with a very small table and the small table is not 
>>>>>> broadcast, it can easily cause data skew.
>>>>>> We have encountered similar problems. Currently, we can only 
>>>>>> make multiple copies of the small table using UNION ALL and append 
>>>>>> random values to alleviate the data skew.
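
The salting workaround described above can be sketched as a small Python simulation (not Flink SQL). The table contents, `PARALLELISM`, `NUM_SALTS`, and the byte-sum `subtask_for` are all assumptions made for illustration. It checks that the salted join returns the same rows as the plain join and compares how many large-side rows the busiest subtask receives:

```python
import random
from collections import Counter

PARALLELISM = 4  # number of parallel join subtasks (assumed)
NUM_SALTS = 4    # how many copies of the small table to make (assumed)


def subtask_for(key) -> int:
    """Deterministic stand-in for a hash shuffle on the (possibly salted) key."""
    return sum(repr(key).encode()) % PARALLELISM


small = [("shop_1", "loc_A"), ("shop_2", "loc_B")]
# Skewed large side: 900 rows for the hot key, 100 for the other key.
large = [("shop_1", f"order_{i}") for i in range(900)]
large += [("shop_2", f"order_{i}") for i in range(900, 1000)]

# Small side: one copy per salt value (the UNION ALL copies).
salted_small = [((k, s), v) for k, v in small for s in range(NUM_SALTS)]
# Large side: append a random salt to every row's key.
rng = random.Random(0)
salted_large = [((k, rng.randrange(NUM_SALTS)), row) for k, row in large]


def join(left, right):
    """Plain hash join of (key, value) pairs, used for both variants."""
    lookup = {}
    for k, v in right:
        lookup.setdefault(k, []).append(v)
    return sorted((row, v) for k, row in left for v in lookup.get(k, []))


plain_join = join(large, small)
salted_join = join(salted_large, salted_small)

# Rows of the large side that each subtask receives after the shuffle.
plain_load = Counter(subtask_for(k) for k, _ in large)
salted_load = Counter(subtask_for(k) for k, _ in salted_large)

print("same result:", plain_join == salted_join)
print("max load without salt:", max(plain_load.values()))
print("max load with salt:", max(salted_load.values()))
```

The cost of this approach is that the small table is stored `NUM_SALTS` times and the salt count has to be picked by hand, which is part of why a built-in broadcast join is attractive.
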
>>>>>>
>>>>>>
>>>>>> Best,
>>>>>> Feng
>>>>>>
>>>>>> On Fri, Feb 2, 2024 at 12:24 AM Prabhjot Bharaj via user 
>>>>>> <user@flink.apache.org> wrote:
>>>>>>>
>>>>>>> Hello folks,
>>>>>>>
>>>>>>>
>>>>>>> We have a use case where we have a few stream-stream joins, requiring 
>>>>>>> us to join a very large table with a much smaller table, essentially 
>>>>>>> enriching the large table with a permutation on the smaller table 
>>>>>>> (consider deriving all orders/sessions for a new location). Given the 
>>>>>>> nature of the dataset, if we use a typical join that uses hash 
>>>>>>> distribution to co-locate the records for each join key, we end up with 
>>>>>>> a very skewed join (a few task slots get all of the work, as 
>>>>>>> opposed to an even distribution).
>>>>>>>
>>>>>>>
>>>>>>> We’ve internally implemented a salting-based solution where we salt the 
>>>>>>> smaller table and join it with the larger table. While this works in 
>>>>>>> the POC stage, we’d like to leverage Flink as much as possible to do 
>>>>>>> such a join.
>>>>>>>
>>>>>>>
>>>>>>> By the nature of the problem, a broadcast join seems theoretically 
>>>>>>> helpful. We’ve done an exploration on query hints supported in Flink, 
>>>>>>> starting with this FLIP and this FLIP.
>>>>>>>
>>>>>>>
>>>>>>> Currently, the Optimizer doesn't consider the Broadcast hint in the 
>>>>>>> `Exchange` step of the join, when creating the physical plan (Possibly 
>>>>>>> because the hint would require the stream-stream join to also support 
>>>>>>> Broadcast join with SQL)
>>>>>>>
>>>>>>>
>>>>>>> Notice that the Query AST (Abstract Syntax Tree) has the broadcast hint 
>>>>>>> parsed from the query:
>>>>>>>
>>>>>>>
>>>>>>> ```sql
>>>>>>> ...
>>>>>>> ...
>>>>>>> joinType=[inner], joinHints=[[[BROADCAST inheritPath:[0] options:[gpla]]]])
>>>>>>> ...
>>>>>>> ```
>>>>>>>
>>>>>>>
>>>>>>> However, the Flink optimizer ignores the hint and still represents the 
>>>>>>> join as a regular `hash` join in the `Exchange` step:
>>>>>>>
>>>>>>>
>>>>>>> ```sql
>>>>>>> ...
>>>>>>> ...
>>>>>>> :- Exchange(distribution=[hash[shop_id, join_key]])
>>>>>>> ...
>>>>>>> ```
>>>>>>>
>>>>>>>
>>>>>>> In Flink's `StreamExecExchange`, the translation happens only via the 
>>>>>>> `HASH` distribution type, unlike in `BatchExecExchange`, where the 
>>>>>>> translation can happen via several options (`HASH`/`BROADCAST`).
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Quoting this Flink mailing list discussion for the FLIP that 
>>>>>>> implemented the Broadcast join hint for batch sql:
>>>>>>>
>>>>>>>
>>>>>>> > But currently, only in batch the optimizer has different Join 
>>>>>>> > strategies for Join and there is no choice of join strategies in 
>>>>>>> > the stream. The join hints listed in the current flip should be 
>>>>>>> > ignored (maybe can be warned) in streaming mode. When in the 
>>>>>>> > future the stream mode has the choice of join strategies, I think 
>>>>>>> > that's a good time to discuss that the join hint can affect the 
>>>>>>> > streaming SQL.
>>>>>>>
>>>>>>>
>>>>>>> What do you folks think about the possibility of a broadcast join for 
>>>>>>> Streaming SQL, along with its corresponding broadcast hint, that lets 
>>>>>>> the user choose the kind of distribution they’d want for the dataset?
>>>>>>>
>>>>>>> Happy to learn more about this and hopefully implement it, if it 
>>>>>>> doesn’t sound like a terrible idea.
>>>>>>>
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>> Prabhjot
>>>>>>>
>>>>>>>
>>>>>>>


-- 

Best,
Benchao Li
