This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 0431be8f5 [docs] Add Insert If Not Exists section to Lookup Joins documentation (#2991)
0431be8f5 is described below
commit 0431be8f56c919439fca575bb181a5c68e121e37
Author: Yang Wang <[email protected]>
AuthorDate: Wed Apr 8 16:07:39 2026 +0800
[docs] Add Insert If Not Exists section to Lookup Joins documentation (#2991)
---
website/docs/engine-flink/lookups.md | 79 +++++++++++++++++++++++
website/docs/table-design/table-types/pk-table.md | 4 ++
2 files changed, 83 insertions(+)
diff --git a/website/docs/engine-flink/lookups.md b/website/docs/engine-flink/lookups.md
index 835ee9492..dd7532808 100644
--- a/website/docs/engine-flink/lookups.md
+++ b/website/docs/engine-flink/lookups.md
@@ -268,6 +268,85 @@ ON `o`.`o_custkey` = `c`.`c_custkey` AND `o`.`o_dt` = `c`.`dt`;
For more details about Fluss partitioned tables, see [Partitioned Tables](table-design/data-distribution/partitioning.md).
+## Insert If Not Exists
+
+### Overview
+
+When a lookup key does not match any row in the dimension table, a lookup join by default produces no match for that input row: an inner `JOIN` drops the row, and a `LEFT JOIN` returns `NULL` for the dimension-side columns. With the `lookup.insert-if-not-exists` option enabled, Fluss instead inserts a new row containing the lookup key values when no match is found and returns the newly inserted row as the join result.
+
+This feature is particularly useful when combined with [Auto-Increment Columns](table-design/table-types/pk-table.md#auto-increment-column) to build dictionary tables on the fly during stream processing. A typical use case is mapping high-cardinality string identifiers (e.g., user IDs, device IDs) to compact integer IDs for efficient downstream aggregation, such as [RoaringBitmap-based count-distinct](table-design/merge-engines/aggregation.md).
+
+### Instructions
+
+- Only primary key lookup is supported. Prefix lookup with `insert-if-not-exists` is not supported.
+- The dimension table must not contain non-nullable columns other than the primary key columns and auto-increment columns, because Fluss cannot fill in values for those columns when auto-inserting.
+- Enable the option via a SQL hint: `/*+ OPTIONS('lookup.insert-if-not-exists' = 'true') */`.
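+
+As a sketch of the second restriction, the hypothetical table below adds one extra column, `remark`, and leaves it nullable so that Fluss is not required to supply a value for it when auto-inserting. The table and column names are illustrative assumptions and do not come from the Fluss documentation; declaring `remark` as `NOT NULL` would violate the restriction.
+
+```sql title="Flink SQL"
+-- Hypothetical dictionary table used only to illustrate the restriction
+CREATE TABLE device_mapping (
+    device_id STRING NOT NULL,  -- primary key: filled from the lookup key
+    device_int BIGINT,          -- auto-increment column: assigned by Fluss
+    remark STRING,              -- extra column: must stay nullable
+    PRIMARY KEY (device_id) NOT ENFORCED
+) WITH (
+    'auto-increment.fields' = 'device_int'
+);
+```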
+
+### Example
+
+The following example demonstrates how to automatically build a UID dictionary table during a lookup join.
+
+1. Create a dictionary table with an auto-increment column.
+
+```sql title="Flink SQL"
+CREATE TABLE uid_mapping (
+    uid VARCHAR NOT NULL,
+    uid_int32 INT,
+    PRIMARY KEY (uid) NOT ENFORCED
+) WITH (
+    'auto-increment.fields' = 'uid_int32',
+    'bucket.num' = '1'
+);
+```
+
+2. Perform a lookup join with `insert-if-not-exists` enabled. When a `uid` is encountered for the first time, Fluss automatically inserts it into `uid_mapping` and assigns an auto-incremented `uid_int32` value.
+
+```sql title="Flink SQL"
+-- UIDs from the streaming table ods_events are automatically registered
+-- into the dictionary table uid_mapping, and the corresponding integer
+-- ID uid_int32 is returned for each lookup
+SELECT
+    ods.country,
+    ods.prov,
+    ods.city,
+    ods.ymd,
+    ods.uid,
+    dim.uid_int32
+FROM ods_events AS ods
+JOIN uid_mapping /*+ OPTIONS('lookup.insert-if-not-exists' = 'true') */
+    FOR SYSTEM_TIME AS OF ods.proctime AS dim
+    ON dim.uid = ods.uid;
+```
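+
+The query above relies on `ods_events` exposing a processing-time attribute named `proctime`, which `FOR SYSTEM_TIME AS OF` needs. The definition of `ods_events` is not shown in the original, so the following is only a minimal sketch: the column types are assumptions, and Flink's built-in `datagen` connector is a stand-in source that emits random values rather than the sample rows used in this example.
+
+```sql title="Flink SQL"
+-- Assumed shape of the probe-side table; not taken from the Fluss docs
+CREATE TEMPORARY TABLE ods_events (
+    country STRING,
+    prov STRING,
+    city STRING,
+    ymd STRING,
+    uid STRING,
+    -- computed column that provides the processing-time attribute
+    proctime AS PROCTIME()
+) WITH (
+    'connector' = 'datagen',    -- stand-in source for experimentation
+    'rows-per-second' = '1'
+);
+```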
+
+Suppose `ods_events` contains the following data:
+
+| country | prov | city | ymd | uid |
+|---------|------------|---------|------------|--------|
+| CN | Beijing | Haidian | 2025-01-01 | user_a |
+| CN | Shanghai | Pudong | 2025-01-02 | user_b |
+| US | California | LA | 2025-01-03 | user_a |
+| JP | Tokyo | Shibuya | 2025-01-04 | user_c |
+
+The join result will be:
+
+| country | prov | city | ymd | uid | uid_int32 |
+|---------|------------|---------|------------|--------|-----------|
+| CN | Beijing | Haidian | 2025-01-01 | user_a | 1 |
+| CN | Shanghai | Pudong | 2025-01-02 | user_b | 2 |
+| US | California | LA | 2025-01-03 | user_a | 1 |
+| JP | Tokyo | Shibuya | 2025-01-04 | user_c | 3 |
+
+- `user_a` gets `uid_int32 = 1` on its first appearance; the second occurrence reuses the same value.
+- `user_b` and `user_c` each get a new auto-incremented ID.
+
+After the job runs, the `uid_mapping` dictionary table contains:
+
+| uid | uid_int32 |
+|--------|-----------|
+| user_a | 1 |
+| user_b | 2 |
+| user_c | 3 |
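+
+To spot-check a single mapping after the job has run, a point query on the primary key is one option. The sketch below assumes the Flink session is switched to batch mode and that the catalog and database holding `uid_mapping` are already selected; neither step is described in this section.
+
+```sql title="Flink SQL"
+-- Assumption: batch mode is used for a one-off primary-key point query
+SET 'execution.runtime-mode' = 'batch';
+
+-- Per the table above, user_a should map to uid_int32 = 1
+SELECT uid, uid_int32 FROM uid_mapping WHERE uid = 'user_a';
+```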
+
## Lookup Options
Fluss lookup join supports various configuration options. For more details, please refer to the [Connector Options](engine-flink/options.md#lookup-options) page.
diff --git a/website/docs/table-design/table-types/pk-table.md b/website/docs/table-design/table-types/pk-table.md
index e71a443a0..ff9224845 100644
--- a/website/docs/table-design/table-types/pk-table.md
+++ b/website/docs/table-design/table-types/pk-table.md
@@ -214,6 +214,10 @@ The result may look like this:
- The auto-increment column must be of type `INT` or `BIGINT`.
- Fluss does not support specifying the starting value and step size for the auto-increment column.
+### Building Dictionary Tables with Lookup Join
+
+By combining auto-increment columns with the `lookup.insert-if-not-exists` option in Flink Lookup Join, you can build dictionary tables automatically during stream processing: when a lookup key is not found, Fluss inserts a new row and assigns it an auto-incremented ID. This is especially useful for mapping high-cardinality string identifiers to compact integer IDs for efficient aggregation. For details and examples, see [Lookup Join - Insert If Not Exists](engine-flink/looku [...]
+
## Data Queries