Hi, Prabhjot.
IIUC, the main reasons why the community has so far supported join hints only
in batch mode are as follows:
1. In batch mode, multiple join algorithms were implemented quite early on, and
2. Stream processing is a long-running scenario, and it is quite difficult to
determine whether a small table will grow into a large table after a long
period of operation.
However, as you mentioned, join hints do indeed have their significance in
streaming. If you want to support the implementation of "join hints + broadcast
join" in streaming, the changes I can currently think of include:
1. In the optimizer, changing the exchange on the small-table side to broadcast
instead of hash (InputProperty#BROADCAST).
2. Further changes, which I cannot yet specify, at the table runtime level.
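To illustrate the first change, here is a minimal sketch in plain Python (not Flink code) of what a broadcast exchange on the small-table side would mean for data distribution. All names (`broadcast_exchange`, `round_robin_exchange`, `local_join`, the sample rows) are hypothetical, and the sketch assumes both inputs are bounded lists; it ignores state, updates, and retractions, which is where the runtime-level changes would come in.

```python
def broadcast_exchange(rows, parallelism):
    """Send a full copy of the small side to every subtask."""
    return [list(rows) for _ in range(parallelism)]


def round_robin_exchange(rows, parallelism):
    """Spread the large side evenly, independent of the join key."""
    parts = [[] for _ in range(parallelism)]
    for i, row in enumerate(rows):
        parts[i % parallelism].append(row)
    return parts


def local_join(large_part, small_part):
    """Inner join on the key, done locally inside one subtask."""
    lookup = {}
    for key, value in small_part:
        lookup.setdefault(key, []).append(value)
    return [(k, lv, sv) for k, lv in large_part for sv in lookup.get(k, [])]


PARALLELISM = 4
# Hypothetical data: every large-side row shares the same join key.
large = [("shop-1", i) for i in range(800)]
small = [("shop-1", "north"), ("shop-2", "south")]

large_parts = round_robin_exchange(large, PARALLELISM)
small_parts = broadcast_exchange(small, PARALLELISM)
sizes = [len(part) for part in large_parts]
joined = [
    row
    for large_part, small_part in zip(large_parts, small_parts)
    for row in local_join(large_part, small_part)
]
print(sizes, len(joined))
```

Because every subtask holds the whole small side, the large side no longer has to be partitioned by the join key, so a single hot key does not pin all the work to one subtask.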
You can also discuss it within the community through JIRA, FLIP, or the dev
mailing list.
--
Best!
Xuyang
At 2024-02-02 00:46:01, "Prabhjot Bharaj via user" <[email protected]>
wrote:
Hi Feng,
Thanks for your prompt response.
If we were to solve this in Flink, my higher level viewpoint is:
1. First, implement a Broadcast join in Flink Streaming SQL that also works
across the Table API (e.g. via `left.join(right, <predicate>, join_type="broadcast")`)
2. Then, support a Broadcast hint that would utilize this new join based on the
hint type
What do you think about this ?
Would you have some pointers on how/where to start on the first part ? (I'm
thinking we'd have to extend the Broadcast state pattern for this purpose)
Thanks,
Prabhjot
On Thu, Feb 1, 2024 at 11:40 AM Feng Jin <[email protected]> wrote:
Hi Prabhjot
I think this is a reasonable scenario. If a regular join involves a large table
and a very small table, then without broadcasting the small table it can easily
cause data skew.
We have encountered similar problems too. Currently, the only way we can
alleviate the skew is to make multiple copies of the small table using UNION ALL
and append random values to the join key.
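For readers unfamiliar with this workaround, here is a minimal sketch of the idea in plain Python (not Flink SQL). It assumes the usual form of salting: the small table is replicated once per salt value, and each large-table row gets one random salt, so the join key becomes `(key, salt)`. The names `SALTS`, `salt_small`, `salt_large`, and `salted_join` are hypothetical.

```python
import random

SALTS = 8  # hypothetical number of copies of the small table


def salt_small(small_rows, salts=SALTS):
    """Replicate each small-table row once per salt (the UNION ALL copies)."""
    return [(key, salt, value) for key, value in small_rows for salt in range(salts)]


def salt_large(large_rows, salts=SALTS, seed=0):
    """Attach one random salt to each large-table row."""
    rng = random.Random(seed)
    return [(key, rng.randrange(salts), value) for key, value in large_rows]


def salted_join(large_salted, small_salted):
    """Inner join on the composite key (key, salt)."""
    index = {}
    for key, salt, value in small_salted:
        index.setdefault((key, salt), []).append(value)
    return [
        (key, lv, sv)
        for key, salt, lv in large_salted
        for sv in index.get((key, salt), [])
    ]


small = [("loc-a", "region-1"), ("loc-b", "region-2")]
large = [("loc-a", i) for i in range(1000)] + [("loc-b", 1000)]

large_salted = salt_large(large)
joined = salted_join(large_salted, salt_small(small))
distinct_hot_keys = {(k, s) for k, s, _ in large_salted if k == "loc-a"}
print(len(joined), len(distinct_hot_keys))
```

The result matches an unsalted join row for row, but the hot key `loc-a` is now spread over several composite keys, at the cost of storing `SALTS` copies of the small table.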
Best,
Feng
On Fri, Feb 2, 2024 at 12:24 AM Prabhjot Bharaj via user
<[email protected]> wrote:
Hello folks,
We have a use case where we have a few stream-stream joins, requiring us to
join a very large table with a much smaller table, essentially enriching the
large table with a permutation on the smaller table (Consider deriving all
orders/sessions for a new location). Given the nature of the dataset, if we use
a typical join that uses Hash distribution to co-locate the records for each
join key, we end up with a very skewed join (a few task slots getting all of
the work, as opposed to an even distribution).
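A small simulation in plain Python (not Flink code) shows why hash distribution skews with few, uneven join keys. The key names, record counts, and parallelism are made up for illustration, and `zlib.crc32` stands in for whatever hash function the real exchange uses.

```python
import zlib
from collections import Counter


def subtask_for(key, parallelism):
    """Pick a subtask by hashing the join key (stand-in for a hash exchange)."""
    return zlib.crc32(key.encode("utf-8")) % parallelism


def load_per_subtask(keys, parallelism):
    """Count how many records land on each subtask."""
    counts = Counter(subtask_for(key, parallelism) for key in keys)
    return [counts.get(i, 0) for i in range(parallelism)]


# Hypothetical workload: 10,000 records spread over only 3 join keys.
keys = ["loc-a"] * 9000 + ["loc-b"] * 900 + ["loc-c"] * 100
load = load_per_subtask(keys, parallelism=8)
print(load)
```

With three distinct keys, at most three of the eight subtasks receive any records, and whichever one gets `loc-a` handles at least 90% of the input.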
We’ve internally implemented a salting-based solution where we salt the smaller
table and join it with the larger table. While this works at the POC stage,
we’d like to leverage Flink as much as possible to do such a join.
By the nature of the problem, a broadcast join seems theoretically helpful.
We’ve done an exploration on query hints supported in Flink, starting with this
FLIP and this FLIP.
Currently, the Optimizer doesn't consider the Broadcast hint in the `Exchange`
step of the join, when creating the physical plan (Possibly because the hint
would require the stream-stream join to also support Broadcast join with SQL)
Notice that the Query AST (Abstract Syntax Tree) has the broadcast hint parsed
from the query:
```sql
...
...
joinType=[inner], joinHints=[[[BROADCAST inheritPath:[0] options:[gpla]]]])
...
```
However, the Flink optimizer ignores the hint and still represents the join as
a regular `hash` join in the `Exchange` step:
```sql
...
...
:- Exchange(distribution=[hash[shop_id, join_key]])
...
```
In Flink's `StreamExecExchange`, the translation happens only via the `HASH`
distribution type, unlike in Flink's `BatchExecExchange`, where the translation
can happen via a multitude of options (`HASH/BROADCAST`).
Quoting this Flink mailing list discussion for the FLIP that implemented the
Broadcast join hint for batch SQL:
> But currently, only in batch the optimizer has different Join strategies for
> Join and there is no choice of join strategies in the stream. The join hints
> listed in the current flip should be ignored (maybe can be warned) in
> streaming mode. When in the future the stream mode has the choice of join
> strategies, I think that's a good time to discuss that the join hint can
> affect the streaming SQL.
What do you folks think about the possibility of a Broadcast join for Streaming
SQL along with its corresponding Broadcast hint, which would let the user choose
the kind of distribution they’d want for the dataset?
Happy to learn more about this and hopefully implement it, if it doesn’t sound
like a terrible idea.
Thanks,
Prabhjot