davidradl commented on code in PR #26775:
URL: https://github.com/apache/flink/pull/26775#discussion_r2200106756
##########
docs/content/docs/dev/table/tuning.md:
##########
@@ -302,3 +302,67 @@ The execution of mini-batch join operator are as shown in
the figure below.
MiniBatch optimization is disabled by default for regular join. In order to
enable this optimization, you should set options
`table.exec.mini-batch.enabled`, `table.exec.mini-batch.allow-latency` and
`table.exec.mini-batch.size`. Please see [configuration]({{< ref
"docs/dev/table/config" >}}#execution-options) page for more details.
{{< top >}}
+
+## Multiple Regular Joins
+
+{{< label Streaming >}}
+
+Streaming Flink jobs with multiple non-temporal regular joins often experience
operational instability and performance degradation due to large state sizes.
The usual cause is that the intermediate state created by a chain of joins is
much larger than the input state itself. In Flink 2.1, we introduce a new
multi-join operator, an optimization designed to significantly reduce state
size and improve performance for join pipelines that involve record
amplification and large intermediate state. This new operator eliminates the
need to store intermediate state for joins across multiple tables by processing
joins across various input streams simultaneously. This "zero intermediate
state" approach primarily targets state reduction, offering substantial
benefits in resource consumption and operational stability.
+
+In most joins, a significant portion of processing time is spent fetching
records from the state. How much the MultiJoin operator helps therefore depends
largely on the size of the intermediate state that a chain of binary joins
would create. In a common scenario where a pipeline experiences record
amplification (each join produces more records than the previous one), the
MultiJoin operator is more efficient, because the state the operator interacts
with stays much smaller, which leads to a more stable operator. If a chain of
joins actually produces less state than the original records, the MultiJoin
operator will still use less state overall. However, in this specific case,
binary joins might perform better because the state that the final joins need
to operate on is smaller.
+
+### The MultiJoin Operator
+The main benefits of the MultiJoin operator are:
+
+1) Considerably smaller state size due to zero intermediate state.
+2) Improved performance for chained joins with record amplification.
+3) Improved stability: linear state growth with the number of records
processed, instead of polynomial growth with binary joins.
+
+### When to enable the MultiJoin?
+
+If your job has multiple joins that share at least one common join key, and
you observe that the state of the intermediate joins is larger than the input
sources, consider enabling the MultiJoin operator.
+
+### How to enable the MultiJoin?
+
+To enable this optimization, set the following configuration option:
+
+```sql
+SET 'table.optimizer.multi-join.enabled' = 'true';
+```
+
+Important: This feature is currently experimental. Some optimizations are
still open, and breaking changes might be introduced in this version. We
currently support only streaming INNER/LEFT joins. Support for RIGHT joins will
be added soon. Because records are partitioned by key, you need at least one
key that is shared between the join conditions:
+
+- Supported: A JOIN B ON A.key = B.key JOIN C ON A.key = C.key (Partition by
key)
+- Supported: A JOIN B ON A.key = B.key JOIN C ON B.key = C.key (Partition by
key via transitivity)
+- Not supported: A JOIN B ON A.key1 = B.key1 JOIN C ON B.key2 = C.key2 (No
single key allows partitioning A, B, and C together in a single operator. This
will be split into multiple MultiJoin operators)
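+
+As a rough sketch of the shapes listed above, the two queries below use
hypothetical tables `Orders`, `Payments`, `Shipments`, and `Customers` with
hypothetical columns. None of these names come from Flink. Only the `SET`
statement is taken from this page.
+
+```sql
+-- Hypothetical tables and columns, for illustration only.
+SET 'table.optimizer.multi-join.enabled' = 'true';
+
+-- Matches the "Supported" shape: every join condition shares order_id,
+-- so all three inputs can be partitioned by the same key.
+SELECT o.order_id, p.amount, s.carrier
+FROM Orders AS o
+JOIN Payments AS p ON o.order_id = p.order_id
+LEFT JOIN Shipments AS s ON o.order_id = s.order_id;
+
+-- Matches the "Not supported" shape: the second join uses customer_id,
+-- which Orders does not join on, so no single key partitions all inputs.
+SELECT o.order_id, c.email
+FROM Orders AS o
+JOIN Payments AS p ON o.order_id = p.order_id
+JOIN Customers AS c ON p.customer_id = c.customer_id;
+```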
+
+### MultiJoin Operator Example - Benchmark
+
+Here's a benchmark of a 10-way join, comparing the default binary joins with
the MultiJoin operator. You can observe the amount of intermediate state in the
first section, the number of records processed when the operators reach 100%
busyness in the second section, and the checkpoints in the third.
+
+{{< img src="/fig/table-streaming/multijoin_operator.png" height="100%" >}}
+
+For this 10-way join above, involving record amplification, we've observed
significant improvements. Here are some rough numbers:
+
+- Performance: 2x to over 100x increase in processed records when both
operators are at 100% busyness.
+- State Size: 3x to over 1000x smaller as intermediate state grows.
+
+The total state is always smaller with the MultiJoin operator. In this case,
the performance is initially the same, but as the intermediate state grows, the
performance of binary joins degrades while the MultiJoin operator remains
stable and outperforms them.
Review Comment:
I wonder whether another consideration/benefit might be around recovery.
Previously there was intermediate state that could be recovered, but now
there is none, so there is much less state to recover. Would it be true to say
we could have faster recovery for the cases where MultiJoin is most useful?