This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 0431be8f5 [docs] Add Insert If Not Exists section to Lookup Joins documentation (#2991)
0431be8f5 is described below

commit 0431be8f56c919439fca575bb181a5c68e121e37
Author: Yang Wang <[email protected]>
AuthorDate: Wed Apr 8 16:07:39 2026 +0800

    [docs] Add Insert If Not Exists section to Lookup Joins documentation (#2991)
---
 website/docs/engine-flink/lookups.md              | 79 +++++++++++++++++++++++
 website/docs/table-design/table-types/pk-table.md |  4 ++
 2 files changed, 83 insertions(+)

diff --git a/website/docs/engine-flink/lookups.md b/website/docs/engine-flink/lookups.md
index 835ee9492..dd7532808 100644
--- a/website/docs/engine-flink/lookups.md
+++ b/website/docs/engine-flink/lookups.md
@@ -268,6 +268,85 @@ ON `o`.`o_custkey` = `c`.`c_custkey` AND  `o`.`o_dt` = `c`.`dt`;
 
 For more details about Fluss partitioned table, see [Partitioned Tables](table-design/data-distribution/partitioning.md).
 
+## Insert If Not Exists
+
+### Overview
+
+When a lookup key does not match any row in the dimension table, a lookup join by default produces no match: an inner join emits nothing for that input row, and a `LEFT JOIN` returns `NULL` for the dimension side. With the `lookup.insert-if-not-exists` option enabled, Fluss instead inserts a new row containing the lookup key values and returns the newly inserted row as the join result.
+
+This feature is particularly useful when combined with [Auto-Increment Columns](table-design/table-types/pk-table.md#auto-increment-column) to build dictionary tables on the fly during stream processing. A typical use case is mapping high-cardinality string identifiers (e.g., user IDs, device IDs) to compact integer IDs for efficient downstream aggregation, such as [RoaringBitmap-based count-distinct](table-design/merge-engines/aggregation.md).
+
+### Instructions
+
+- Supported only for primary key lookups; prefix lookups cannot use `lookup.insert-if-not-exists`.
+- Apart from the primary key columns and auto-increment columns, every column in the dimension table must be nullable, because Fluss cannot fill values for any other column when it auto-inserts a row.
+- Enable via SQL Hint: `/*+ OPTIONS('lookup.insert-if-not-exists' = 'true') */`.
+
+### Example
+
+The following example demonstrates how to automatically build a UID dictionary table during a lookup join.
+
+1. Create a dictionary table with an auto-increment column.
+
+```sql title="Flink SQL"
+CREATE TABLE uid_mapping (
+  uid VARCHAR NOT NULL,
+  uid_int32 INT,
+  PRIMARY KEY (uid) NOT ENFORCED
+) WITH (
+  'auto-increment.fields' = 'uid_int32',
+  'bucket.num' = '1'
+);
+```
+
+2. Perform a lookup join with `insert-if-not-exists` enabled. When a `uid` is encountered for the first time, Fluss automatically inserts it into `uid_mapping` and assigns an auto-incremented `uid_int32` value.
+
+```sql title="Flink SQL"
+-- UIDs from the streaming table ods_events are automatically registered
+-- into the dictionary table uid_mapping, and the corresponding integer
+-- ID uid_int32 is returned for each lookup
+SELECT
+  ods.country,
+  ods.prov,
+  ods.city,
+  ods.ymd,
+  ods.uid,
+  dim.uid_int32
+FROM ods_events AS ods
+JOIN uid_mapping /*+ OPTIONS('lookup.insert-if-not-exists' = 'true') */
+  FOR SYSTEM_TIME AS OF ods.proctime AS dim
+  ON dim.uid = ods.uid;
+```
+
+Suppose `ods_events` contains the following data:
+
+| country | prov       | city    | ymd        | uid    |
+|---------|------------|---------|------------|--------|
+| CN      | Beijing    | Haidian | 2025-01-01 | user_a |
+| CN      | Shanghai   | Pudong  | 2025-01-02 | user_b |
+| US      | California | LA      | 2025-01-03 | user_a |
+| JP      | Tokyo      | Shibuya | 2025-01-04 | user_c |
+
+The join result will be:
+
+| country | prov       | city    | ymd        | uid    | uid_int32 |
+|---------|------------|---------|------------|--------|-----------|
+| CN      | Beijing    | Haidian | 2025-01-01 | user_a | 1         |
+| CN      | Shanghai   | Pudong  | 2025-01-02 | user_b | 2         |
+| US      | California | LA      | 2025-01-03 | user_a | 1         |
+| JP      | Tokyo      | Shibuya | 2025-01-04 | user_c | 3         |
+
+- `user_a` is assigned `uid_int32 = 1` on its first appearance; its second occurrence reuses that value.
+- `user_b` and `user_c` each get a new auto-incremented ID.
+
+After the job runs, the `uid_mapping` dictionary table contains:
+
+| uid    | uid_int32 |
+|--------|-----------|
+| user_a | 1         |
+| user_b | 2         |
+| user_c | 3         |
+
 ## Lookup Options
 
 Fluss lookup join supports various configuration options. For more details, please refer to the [Connector Options](engine-flink/options.md#lookup-options) page.
diff --git a/website/docs/table-design/table-types/pk-table.md b/website/docs/table-design/table-types/pk-table.md
index e71a443a0..ff9224845 100644
--- a/website/docs/table-design/table-types/pk-table.md
+++ b/website/docs/table-design/table-types/pk-table.md
@@ -214,6 +214,10 @@ The result may look like this:
 - The auto-increment column must be of type `INT` or `BIGINT`.
 - Fluss does not support specifying the starting value and step size for the auto-increment column.
 
+### Building Dictionary Tables with Lookup Join
+
+By combining auto-increment columns with the `lookup.insert-if-not-exists` option in Flink Lookup Join, you can build dictionary tables automatically during stream processing: when a lookup key is not found, Fluss inserts a new row and assigns it an auto-incremented ID. This is especially useful for mapping high-cardinality string identifiers to compact integer IDs for efficient aggregation. For details and examples, see [Lookup Join - Insert If Not Exists](engine-flink/looku [...]
+
 
 ## Data Queries
 

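For readers skimming the patch, the insert-if-not-exists behavior described in the lookups.md hunk can be modelled in a few lines of plain Python. This is only a sketch of the semantics, not Fluss or Flink code: `DictionaryTable`, `lookup`, and `rows` are hypothetical names, the table is an in-memory dict, and IDs are assumed to start at 1 and grow by 1 purely to mirror the example tables in the patch.

```python
from itertools import count


class DictionaryTable:
    """Hypothetical in-memory stand-in for a primary-key table
    with one auto-increment column (not a Fluss API)."""

    def __init__(self):
        self._rows = {}            # uid -> uid_int32
        self._next_id = count(1)   # assumption: IDs start at 1, step 1

    def lookup(self, key, insert_if_not_exists=False):
        # Existing key: return the stored ID unchanged.
        if key in self._rows:
            return self._rows[key]
        # Missing key, option disabled: no match (None models NULL).
        if not insert_if_not_exists:
            return None
        # Missing key, option enabled: insert a new row and return it.
        self._rows[key] = next(self._next_id)
        return self._rows[key]

    def rows(self):
        # Copy of the current table contents.
        return dict(self._rows)


# The four sample events from the patch, reduced to (country, uid).
events = [("CN", "user_a"), ("CN", "user_b"), ("US", "user_a"), ("JP", "user_c")]

table = DictionaryTable()
joined = [
    (country, uid, table.lookup(uid, insert_if_not_exists=True))
    for country, uid in events
]
```

With the option enabled, the repeated `user_a` event reuses the ID created by the first one, which is the property the example tables in the patch illustrate. With the option left off, `lookup` returns `None` and leaves the table untouched.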