Guosmilesmile commented on code in PR #13608:
URL: https://github.com/apache/iceberg/pull/13608#discussion_r2218736282
##########
docs/docs/flink-writes.md:
##########
@@ -396,3 +396,115 @@ To use SinkV2 based implementation, replace `FlinkSink` with `IcebergSink` in th
- The `RANGE` distribution mode is not yet available for the `IcebergSink`
- When using `IcebergSink` use `uidSuffix` instead of the `uidPrefix`
+
+
+# Dynamic Iceberg Flink Sink
+
+The Dynamic Iceberg Flink Sink allows:
+
+1. **Writing to any number of tables**
+ A single sink can dynamically route records to multiple Iceberg tables.
+
+2. **Dynamic table creation and updates**
+ Tables are created and updated based on user-defined routing logic.
+
+3. **Dynamic schema and partition evolution**
+ Table schemas and partition specs update during streaming execution.
+
+All configurations are controlled through the `DynamicRecord` class, eliminating the need for Flink job restarts when requirements change.
+
+```java
+
+ DynamicIcebergSink.forInput(dataStream)
+ .generator(new Generator())
+ .catalogLoader(catalogLoader)
+ .writeParallelism(parallelism)
+ .immediateTableUpdate(immediateUpdate)
+ .append();
+
+```
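+
+The `Generator` passed to `generator(...)` converts each incoming element into a `DynamicRecord`, which carries the target table, branch, schema, partition spec, and per-table write settings for that record. The following is only a minimal sketch of such a generator: the `DynamicRecordGenerator` interface, the `DynamicRecord` constructor arguments and their order, and the package locations are assumptions here, so check the Javadoc of the Iceberg release you are using.
+
+```java
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.Collector;
+import org.apache.iceberg.DistributionMode;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.flink.sink.dynamic.DynamicRecord;
+import org.apache.iceberg.flink.sink.dynamic.DynamicRecordGenerator;
+import org.apache.iceberg.types.Types;
+
+// Illustrative sketch only: the interface and constructor signatures below are
+// assumptions, not verified API. Consult the Javadoc of your Iceberg release.
+class Generator implements DynamicRecordGenerator<RowData> {
+
+  private static final Schema SCHEMA =
+      new Schema(
+          Types.NestedField.required(1, "id", Types.LongType.get()),
+          Types.NestedField.optional(2, "data", Types.StringType.get()));
+
+  @Override
+  public void generate(RowData input, Collector<DynamicRecord> out) {
+    out.collect(
+        new DynamicRecord(
+            TableIdentifier.of("db", "events"),  // target table, created if missing
+            "main",                              // branch to write to
+            SCHEMA,                              // schema this record conforms to
+            PartitionSpec.unpartitioned(),       // desired partition spec
+            input,                               // the RowData payload
+            DistributionMode.NONE,               // distribution mode for this table
+            1));                                 // write parallelism for this table
+  }
+}
+```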
+
+### Dynamic Sink Configuration
+
+The Dynamic Iceberg Flink Sink is configured using the builder pattern. Here are the key configuration methods:
+
+| Method | Description |
+|--------|-------------|
+| `set(String property, String value)` | Set any Iceberg write property (e.g., `"write.format"`, `"write.upsert.enabled"`). Check out all the options here: [write-options](flink-configuration.md#write-options) |
+| `setAll(Map<String, String> properties)` | Set multiple properties at once |
+| `overwrite(boolean enabled)` | Enable overwrite mode |
+| `writeParallelism(int parallelism)` | Set writer parallelism |
+| `uidPrefix(String prefix)` | Set operator UID prefix |
+| `snapshotProperties(Map<String, String> properties)` | Set snapshot metadata properties |
+| `toBranch(String branch)` | Write to a specific branch |
+| `cacheMaxSize(int maxSize)` | Set cache size for table metadata |
+| `cacheRefreshMs(long refreshMs)` | Set cache refresh interval |
+| `inputSchemasPerTableCacheMaxSize(int size)` | Set max input schemas to cache per table |
+| `immediateTableUpdate(boolean enabled)` | Controls whether table metadata (schema/partition spec) updates immediately (default: `false`) |
+
+
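+The snippet below strings the methods from the table above into a single builder chain, as a sketch of how they are typically combined. It assumes `dataStream`, `catalogLoader`, and `Generator` are defined as in the earlier examples; the property keys and values passed to `set`/`setAll` follow the examples given in the table and are placeholders, not recommendations.
+
+```java
+Map<String, String> snapshotProps = Map.of("app.id", "my-flink-job");
+
+DynamicIcebergSink.forInput(dataStream)
+    .generator(new Generator())                        // converts input records to DynamicRecord
+    .catalogLoader(catalogLoader)                      // catalog used to resolve/create target tables
+    .set("write.format", "parquet")                    // single write property
+    .setAll(Map.of("write.upsert.enabled", "false"))   // several properties at once
+    .overwrite(false)                                  // append rather than overwrite
+    .writeParallelism(4)                               // writer parallelism
+    .uidPrefix("dynamic-iceberg-sink")                 // stable operator UID prefix
+    .snapshotProperties(snapshotProps)                 // extra snapshot metadata
+    .toBranch("main")                                  // default branch to write to
+    .cacheMaxSize(100)                                 // table metadata cache size
+    .cacheRefreshMs(1_000L)                            // metadata cache refresh interval
+    .inputSchemasPerTableCacheMaxSize(10)              // input schemas cached per table
+    .immediateTableUpdate(true)                        // update table metadata immediately
+    .append();
+```
+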
+### Notes
+
+- **Range distribution mode**: The dynamic sink does not currently support the `RANGE` distribution mode.
+- **Property precedence**: When a table property and a sink property conflict, the table property overrides the sink configuration.
Review Comment:
I think we should adjust the configuration precedence here (https://github.com/apache/iceberg/pull/13609).
The documentation is currently written based on table properties taking precedence; I’ll update it once the corresponding PR changes are merged.