mxm commented on code in PR #13608:
URL: https://github.com/apache/iceberg/pull/13608#discussion_r2225655660
##########
docs/docs/flink-writes.md:
##########
@@ -396,3 +396,115 @@ To use SinkV2 based implementation, replace `FlinkSink` with `IcebergSink` in th
- The `RANGE` distribution mode is not yet available for the `IcebergSink`
- When using `IcebergSink` use `uidSuffix` instead of the `uidPrefix`
+
+
+# Dynamic Iceberg Flink Sink
Review Comment:
I think this needs to be:
```suggestion
## Dynamic Iceberg Flink Sink
```
##########
docs/docs/flink-writes.md:
##########
@@ -396,3 +396,115 @@ To use SinkV2 based implementation, replace `FlinkSink` with `IcebergSink` in th
- The `RANGE` distribution mode is not yet available for the `IcebergSink`
- When using `IcebergSink` use `uidSuffix` instead of the `uidPrefix`
+
+
+# Dynamic Iceberg Flink Sink
+
+The Dynamic Iceberg Flink Sink allows:
+
+1. **Writing to any number of tables**
+ A single sink can dynamically route records to multiple Iceberg tables.
+
+2. **Dynamic table creation and updates**
+ Tables are created and updated based on user-defined routing logic.
+
+3. **Dynamic schema and partition evolution**
+   Table schemas and partition specs are updated during streaming execution.
+
+All configurations are controlled through the `DynamicRecord` class, eliminating the need for Flink job restarts when requirements change.
+
+```java
+DynamicIcebergSink.forInput(dataStream)
+    .generator(new Generator())
+    .catalogLoader(catalogLoader)
+    .writeParallelism(parallelism)
+    .immediateTableUpdate(immediateUpdate)
+    .append();
+```
+
+#### Configuration Example
Review Comment:
```suggestion
### Configuration Example
```
##########
docs/docs/flink-writes.md:
##########
@@ -396,3 +396,115 @@ To use SinkV2 based implementation, replace `FlinkSink` with `IcebergSink` in th
- The `RANGE` distribution mode is not yet available for the `IcebergSink`
- When using `IcebergSink` use `uidSuffix` instead of the `uidPrefix`
+
+
+# Dynamic Iceberg Flink Sink
+
+The Dynamic Iceberg Flink Sink allows:
+
+1. **Writing to any number of tables**
+ A single sink can dynamically route records to multiple Iceberg tables.
+
+2. **Dynamic table creation and updates**
+ Tables are created and updated based on user-defined routing logic.
+
+3. **Dynamic schema and partition evolution**
+   Table schemas and partition specs are updated during streaming execution.
+
+All configurations are controlled through the `DynamicRecord` class, eliminating the need for Flink job restarts when requirements change.
+
+```java
+DynamicIcebergSink.forInput(dataStream)
+    .generator(new Generator())
+    .catalogLoader(catalogLoader)
+    .writeParallelism(parallelism)
+    .immediateTableUpdate(immediateUpdate)
+    .append();
+```
+
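The generator passed to the builder above decides which table each incoming record is routed to. As a self-contained illustration of that routing idea (plain Java, no Flink or Iceberg dependencies; the class name, the `routeByEventType` helper, and the record shape are hypothetical, not part of the Iceberg API):

```java
import java.util.Map;

// Illustrative routing logic: derive a target table name from a field of the
// incoming record, the way a user-defined generator picks a table per record.
public class RouteExample {
    // Hypothetical helper: events of each type land in their own table.
    static String routeByEventType(Map<String, String> record) {
        String type = record.getOrDefault("event_type", "unknown");
        return "db.events_" + type;
    }

    public static void main(String[] args) {
        String table = routeByEventType(Map.of("event_type", "click"));
        System.out.println(table); // prints "db.events_click"
    }
}
```

In a real job this mapping would run inside the generator, so adding a new event type routes records to a new table without restarting the job.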
+#### Configuration Example
+
+```java
+DynamicIcebergSink.Builder<RowData> builder = DynamicIcebergSink.forInput(inputStream);
+
+// Set common write properties
+builder.set("write-format", "parquet");
+
+// Set specific options
+builder
+ .writeParallelism(4)
+ .uidPrefix("dynamic-sink")
+ .cacheMaxSize(500)
+ .cacheRefreshMs(5000);
+
+// Add generator and append sink
+builder.generator(new CustomRecordGenerator());
+builder.append();
+```
+
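The `cacheMaxSize` and `cacheRefreshMs` options above bound how much table metadata the sink caches and how stale a cached entry may get before it is re-read. A minimal sketch of that refresh-interval idea (illustrative only, not the sink's internal implementation; class and method names are made up):

```java
// Illustrative sketch of the cacheRefreshMs semantics: a cached entry is
// reused until it is older than the configured refresh interval.
public class RefreshCheck {
    // Returns true when the entry's age has reached the refresh interval.
    static boolean needsRefresh(long cachedAtMs, long nowMs, long refreshMs) {
        return nowMs - cachedAtMs >= refreshMs;
    }

    public static void main(String[] args) {
        System.out.println(needsRefresh(0, 4_000, 5_000)); // false: still fresh
        System.out.println(needsRefresh(0, 6_000, 5_000)); // true: stale, re-read metadata
    }
}
```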
+#### Dynamic Routing Configuration
Review Comment:
```suggestion
### Dynamic Routing Configuration
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]