Hello folks,

We have a use case with a few stream-stream joins that require us to join a
very large table with a much smaller one. Given the nature of the dataset,
if we use a regular join that relies on hash distribution to co-locate the
records for each join key, the join ends up heavily skewed: a few task slots
get almost all of the work instead of the load being spread evenly.
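
To illustrate the skew, here is a minimal Python simulation (not Flink code).
It assumes a hypothetical input where 90% of the records share one join key,
and it uses `zlib.crc32` as a deterministic stand-in for Flink's key-group
based hash routing. Every record with the same key lands on the same slot, so
one slot ends up with at least the whole hot key.

```python
import zlib
from collections import Counter


def route_by_hash(join_key, parallelism):
    # Simplified stand-in for Flink's hash routing: equal keys always map to
    # the same subtask index (Flink itself uses murmur hash + key groups).
    return zlib.crc32(repr(join_key).encode()) % parallelism


def slot_loads(keys, parallelism):
    # Count how many records each subtask receives under hash distribution.
    loads = Counter(route_by_hash(k, parallelism) for k in keys)
    return [loads.get(slot, 0) for slot in range(parallelism)]


if __name__ == "__main__":
    # Hypothetical skewed input: 9,000 records share the key "hot_shop".
    keys = ["hot_shop"] * 9_000 + [f"shop_{i}" for i in range(1_000)]
    loads = slot_loads(keys, parallelism=8)
    print(loads)
    print("max/avg load:", max(loads) / (sum(loads) / len(loads)))
```

Adding more parallelism doesn't help here, because the hot key can never be
split across slots under pure hash distribution.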

We’ve internally implemented a salting-based solution, where we salt the
smaller table and join it with the larger table. While this works at the
POC stage, we’d like to rely on Flink’s built-in capabilities as much as
possible for this kind of join.
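
For readers unfamiliar with salting, here is a minimal Python sketch of one
common variant (it may differ from our internal implementation in details).
Each row of the smaller table is replicated once per salt value `0..N-1`, each
row of the larger table gets a random salt in `[0, N)`, and the join runs on
`(key, salt)`. A hot key is then spread over up to `N` distinct join keys,
while every large-side row still matches each small-side row exactly once. The
function names and the in-memory "join" are purely illustrative.

```python
import random
from collections import defaultdict


def explode_small(small_rows, num_salts):
    # Replicate every small-table row once per salt value.
    for key, value in small_rows:
        for salt in range(num_salts):
            yield (key, salt), value


def salt_large(large_rows, num_salts, rng):
    # Attach one random salt to every large-table row.
    for key, value in large_rows:
        yield (key, rng.randrange(num_salts)), value


def salted_join(large_rows, small_rows, num_salts, seed=0):
    rng = random.Random(seed)
    # Build side: index the exploded small table by (key, salt).
    index = defaultdict(list)
    for salted_key, value in explode_small(small_rows, num_salts):
        index[salted_key].append(value)
    # Probe side: each large row only looks at its own (key, salt) bucket,
    # so it matches each small row with the same key exactly once.
    results = []
    for (key, salt), value in salt_large(large_rows, num_salts, rng):
        for small_value in index.get((key, salt), []):
            results.append((key, value, small_value))
    return results


if __name__ == "__main__":
    large = [("hot", i) for i in range(1000)] + [("cold", i) for i in range(10)]
    small = [("hot", "H"), ("cold", "C")]
    print(len(salted_join(large, small, num_salts=8)))
```

The cost is that the small table is duplicated `N` times, which is why this
only pays off when one side is much smaller than the other.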

By the nature of the problem, a broadcast join seems theoretically helpful.
We’ve explored the query hints supported in Flink, starting with this
FLIP
<https://cwiki.apache.org/confluence/display/FLINK/FLIP-229%3A+Introduces+Join+Hint+for+Flink+SQL+Batch+Job>
and this FLIP
<https://cwiki.apache.org/confluence/display/FLINK/FLIP-204%3A+Introduce+Hash+Lookup+Join>.

Currently, the optimizer doesn't take the BROADCAST hint into account in the
join's `Exchange` step when creating the physical plan. Notice that the
query AST (Abstract Syntax Tree) does contain the broadcast hint parsed from
the query:

```text
...
joinType=[inner], joinHints=[[[BROADCAST inheritPath:[0] options:[gpla]]]])
...
```

However, the Flink optimizer ignores the hint and still represents the join
as a regular `hash` join in the `Exchange` step:

```text
...
:- Exchange(distribution=[hash[shop_id, publication_id, price_list_id]])
...
```

In Flink’s `StreamExecExchange`, the translation happens only via the `HASH`
distribution type
<https://github.com/apache/flink/blob/release-1.18.0/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecExchange.java#L106-L127>,
whereas in `BatchExecExchange` the translation can use several options
<https://github.com/apache/flink/blob/release-1.18.0/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecExchange.java#L145-L194>,
including `HASH` and `BROADCAST`.
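
To make the difference between the two distributions concrete, here is a
minimal Python simulation of a broadcast hash join (again not Flink code, and
it ignores state, retractions, and updates to the small side, which are the
hard parts in streaming). The small side is replicated to every subtask, and
the large side is spread round-robin, so each subtask's share no longer
depends on key skew. The function name and inputs are hypothetical.

```python
from collections import defaultdict


def broadcast_join(large_rows, small_rows, parallelism):
    """Simulate a broadcast hash join across `parallelism` subtasks.

    Returns (joined_rows, large_rows_per_subtask).
    """
    # In a real job every subtask holds its own full copy of the small side;
    # one shared index stands in for those identical copies here.
    small_index = defaultdict(list)
    for key, value in small_rows:
        small_index[key].append(value)
    # Large side: round-robin distribution, independent of the join key.
    subtask_inputs = [[] for _ in range(parallelism)]
    for i, row in enumerate(large_rows):
        subtask_inputs[i % parallelism].append(row)
    # Each subtask probes the small-side index with its own share only.
    joined = []
    for rows in subtask_inputs:
        for key, value in rows:
            for small_value in small_index.get(key, []):
                joined.append((key, value, small_value))
    return joined, [len(rows) for rows in subtask_inputs]


if __name__ == "__main__":
    large = [("hot_shop", i) for i in range(9_000)] + [
        (f"shop_{i}", i) for i in range(1_000)
    ]
    small = [("hot_shop", "H")] + [(f"shop_{i}", "S") for i in range(1_000)]
    joined, loads = broadcast_join(large, small, parallelism=8)
    print(len(joined), loads)
```

With the same skewed input as a hash-distributed join, every subtask receives
the same number of large-side rows, at the cost of shipping the small side to
all of them.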

Quoting this Flink mailing list discussion
<https://lists.apache.org/thread/ovyltrhztw7locn301f0wqfvlykw6l9z> about the
FLIP that implemented the BROADCAST join hint for batch SQL:

> But currently, only in batch the optimizer has different Join strategies
> for Join and there is no choice of join strategies in the stream. The join
> hints listed in the current flip should be ignored (maybe can be warned) in
> streaming mode. When in the future the stream mode has the choice of join
> strategies, I think that's a good time to discuss that the join hint can
> affect the streaming SQL.

What do you folks think about supporting a BROADCAST hint for streaming SQL,
letting users choose the kind of distribution they want for a given input?

Happy to learn more about this and hopefully implement it, if it doesn’t
sound like a terrible idea.

Thanks,
Prabhjot
