mxm commented on code in PR #13608:
URL: https://github.com/apache/iceberg/pull/13608#discussion_r2230924210


##########
docs/docs/flink-writes.md:
##########
@@ -396,3 +396,148 @@ To use SinkV2 based implementation, replace `FlinkSink` with `IcebergSink` in th
 
      - The `RANGE` distribution mode is not yet available for the `IcebergSink`
      - When using `IcebergSink` use `uidSuffix` instead of the `uidPrefix`
+
+
+## Dynamic Iceberg Flink Sink
+
+Dynamic Flink Iceberg Sink allows:
+
+1. **Writing to any number of tables**  
+   A single sink can dynamically route records to multiple Iceberg tables.
+
+2. **Dynamic table creation and updates**  
+   Tables are created and updated based on user-defined routing logic.
+
+3. **Dynamic schema and partition evolution**  
+   Table schemas and partition specs update during streaming execution.
+
+All configurations are controlled through the `DynamicRecord` class, eliminating the need for Flink job restarts when requirements change.
+
+```java
+
+    DynamicIcebergSink.forInput(dataStream)
+        .generator((inputRecord, out) -> out.collect(
+                new DynamicRecord(
+                        TableIdentifier.of("db", "table"),
+                        "branch",
+                        SCHEMA,
+                        (RowData) inputRecord,
+                        PartitionSpec.unpartitioned(),
+                        DistributionMode.HASH,
+                        2)))
+        .catalogLoader(CatalogLoader.hive("hive", new Configuration(), Map.of()))
+        .writeParallelism(10)
+        .immediateTableUpdate(true)
+        .append();
+```
+
+### Configuration Example
+
+```java
+DynamicIcebergSink.Builder<RowData> builder = DynamicIcebergSink.forInput(inputStream);
+
+// Set common properties
+builder
+    .set("write.parquet.compression-codec", "gzip");
+
+// Set Dynamic Sink specific options
+builder
+    .writeParallelism(4)
+    .uidPrefix("dynamic-sink")
+    .cacheMaxSize(500)
+    .cacheRefreshMs(5000);
+
+// Add generator and append sink
+builder.generator(new CustomRecordGenerator());
+builder.append();
+```
+
+### Dynamic Routing Configuration
+
+Dynamic table routing can be customized by implementing the `DynamicRecordGenerator` interface:
+
+```java
+public class CustomRecordGenerator implements DynamicRecordGenerator<RowData> {
+    @Override
+    public DynamicRecord generate(RowData row) {
+        DynamicRecord record = new DynamicRecord();
+        // Set table name based on business logic
+        TableIdentifier tableIdentifier = TableIdentifier.of(database, tableName);
+        record.setTableIdentifier(tableIdentifier);
+        record.setData(row);
+        // Set the maximum number of parallel writers for a given table/branch/schema/spec
+        record.writeParallelism(2);
+        return record;
+    }
+}
+
+// Set custom record generator when building the sink
+DynamicIcebergSink.Builder<RowData> builder = DynamicIcebergSink.forInput(inputStream);
+builder.generator(new CustomRecordGenerator());
+// ... other config ...
+builder.append();
+```
+The user should provide a converter which converts the input record to a DynamicRecord.
+We need the following information (DynamicRecord) for every record:
+
+| Property           | Description |
+|--------------------|-------------|
+| `TableIdentifier`  | The target table to which the record will be written. |
+| `Branch`           | The target branch for writing the record (optional). |
+| `Schema`           | The schema of the record. |
+| `Spec`             | The expected partitioning specification for the record. |
+| `RowData`          | The actual row data to be written. |
+| `DistributionMode` | The distribution mode for writing the record (currently supports NONE or HASH). |
+| `Parallelism`      | The maximum number of parallel writers for a given table/branch/schema/spec (WriteTarget). |
+| `UpsertMode`       | Overrides this table's write.upsert.enabled (optional). |
+| `EqualityFields`   | The equality fields for the table(optional). |
+
+### Schema Update
+
+The dynamic sink maintains an LRU cache for both table metadata and incoming schemas, with eviction based on size and time constraints. When a DynamicRecord contains a schema that is incompatible with the current table schema, a schema update is triggered. This update can occur either immediately or via a centralized executor, depending on the immediateTableUpdate configuration. While centralized updates reduce load on the Catalog, they may introduce backpressure on the sink.
+
+Supported schema updates:
+
+- Adding new columns
+- Widening existing column types (e.g., Integer → Long, Float → Double)

Review Comment:
   ```suggestion
   - Widening existing column types (e.g., Integer → Long, Float → Double)
   - Making required columns optional
   ```
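
To make the three supported changes in this list concrete (adding columns, widening types, making required columns optional), here is a minimal, self-contained sketch. It is not the sink's implementation: `SchemaEvolutionSketch`, `Column`, `canWiden`, and `planChanges` are hypothetical names, columns are reduced to a type name plus a required flag, and only the two widening examples from the docs are modeled. The sketch also assumes that a required input column written into an optional table column needs no table change.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model for illustration only; none of these names are part of the Iceberg API.
final class SchemaEvolutionSketch {

    /** A column reduced to a primitive type name and a required flag. */
    static final class Column {
        final String type;
        final boolean required;

        Column(String type, boolean required) {
            this.type = type;
            this.required = required;
        }
    }

    /** Covers only the two widening examples named in the docs. */
    static boolean canWiden(String from, String to) {
        return (from.equals("int") && to.equals("long"))
            || (from.equals("float") && to.equals("double"));
    }

    /** Lists the table changes needed to accept the input schema, or throws if one is unsupported. */
    static List<String> planChanges(Map<String, Column> table, Map<String, Column> input) {
        List<String> changes = new ArrayList<>();
        for (Map.Entry<String, Column> entry : input.entrySet()) {
            String name = entry.getKey();
            Column incoming = entry.getValue();
            Column existing = table.get(name);
            if (existing == null) {
                changes.add("add column " + name);
                continue;
            }
            if (!existing.type.equals(incoming.type)) {
                if (!canWiden(existing.type, incoming.type)) {
                    throw new IllegalArgumentException(
                        "unsupported type change for " + name + ": "
                            + existing.type + " -> " + incoming.type);
                }
                changes.add("widen " + name + " from " + existing.type + " to " + incoming.type);
            }
            // Assumption: only required -> optional needs a table change.
            if (existing.required && !incoming.required) {
                changes.add("make " + name + " optional");
            }
        }
        return changes;
    }

    public static void main(String[] args) {
        Map<String, Column> table = new LinkedHashMap<>();
        table.put("id", new Column("int", true));
        table.put("name", new Column("string", true));

        Map<String, Column> input = new LinkedHashMap<>();
        input.put("id", new Column("long", true));        // widened int -> long
        input.put("name", new Column("string", false));   // required -> optional
        input.put("email", new Column("string", false));  // new column

        System.out.println(planChanges(table, input));
    }
}
```

Running `main` prints `[widen id from int to long, make name optional, add column email]`. A narrowing change such as `double` to `float` would throw instead, which mirrors the idea that anything outside the supported list cannot be applied automatically.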



##########
docs/docs/flink-writes.md:
##########
@@ -396,3 +396,149 @@ To use SinkV2 based implementation, replace `FlinkSink` with `IcebergSink` in th
 
      - The `RANGE` distribution mode is not yet available for the `IcebergSink`
      - When using `IcebergSink` use `uidSuffix` instead of the `uidPrefix`
+
+
+## Dynamic Iceberg Flink Sink
+
+Dynamic Flink Iceberg Sink allows:
+
+1. **Writing to any number of tables**  
+   A single sink can dynamically route records to multiple Iceberg tables.
+
+2. **Dynamic table creation and updates**  
+   Tables are created and updated based on user-defined routing logic.
+
+3. **Dynamic schema and partition evolution**  
+   Table schemas and partition specs update during streaming execution.
+
+All configurations are controlled through the `DynamicRecord` class, eliminating the need for Flink job restarts when requirements change.
+
+```java
+
+    DynamicIcebergSink.forInput(dataStream)
+        .generator((inputRecord, out) -> out.collect(
+                new DynamicRecord(
+                        TableIdentifier.of("db", "table"),
+                        "branch",
+                        SCHEMA,
+                        (RowData) inputRecord,
+                        PartitionSpec.unpartitioned(),
+                        DistributionMode.HASH,
+                        2)))
+        .catalogLoader(CatalogLoader.hive("hive", new Configuration(), Map.of()))
+        .writeParallelism(10)
+        .immediateTableUpdate(true)
+        .append();
+```
+
+### Configuration Example
+
+```java
+DynamicIcebergSink.Builder<RowData> builder = DynamicIcebergSink.forInput(inputStream);
+
+// Set common properties
+builder
+    .set("write.parquet.compression-codec", "gzip");
+
+// Set Dynamic Sink specific options
+builder
+    .writeParallelism(4)
+    .uidPrefix("dynamic-sink")
+    .cacheMaxSize(500)
+    .cacheRefreshMs(5000);
+
+// Add generator and append sink
+builder.generator(new CustomRecordGenerator());
+builder.append();
+```
+
+### Dynamic Routing Configuration
+
+Dynamic table routing can be customized by implementing the `DynamicRecordGenerator` interface:
+
+```java
+public class CustomRecordGenerator implements DynamicRecordGenerator<RowData> {
+    @Override
+    public DynamicRecord generate(RowData row) {
+        DynamicRecord record = new DynamicRecord();
+        // Set table name based on business logic
+        TableIdentifier tableIdentifier = TableIdentifier.of(database, tableName);
+        record.setTableIdentifier(tableIdentifier);
+        record.setData(row);
+        // Set the maximum number of parallel writers for a given table/branch/schema/spec
+        record.writeParallelism(2);
+        return record;
+    }
+}
+
+// Set custom record generator when building the sink
+DynamicIcebergSink.Builder<RowData> builder = DynamicIcebergSink.forInput(inputStream);
+builder.generator(new CustomRecordGenerator());
+// ... other config ...
+builder.append();
+```
+The user should provide a converter which converts the input record to a DynamicRecord.
+We need the following information (DynamicRecord) for every record:
+
+| Property            | Description |
+|---------------------|-------------|
+| `TableIdentifier`   | The target table to which the record will be written. |
+| `Branch`            | The target branch for writing the record (optional). |
+| `Schema`            | The schema of the record. |
+| `Spec`              | The expected partitioning specification for the record. |
+| `RowData`           | The actual row data to be written. |
+| `DistributionMode`  | The distribution mode for writing the record (currently supports NONE or HASH). |
+| `Parallelism`       | The maximum number of parallel writers for a given table/branch/schema/spec (WriteTarget). |
+| `upsertMode`        | Overrides this table's write.upsert.enabled (optional). |
+| `equalityFields`    | The equality fields for the table(optional). |
+
+### Schema Update

Review Comment:
   ```suggestion
   ### Schema Evolution
   ```



##########
docs/docs/flink-writes.md:
##########
@@ -396,3 +396,148 @@ To use SinkV2 based implementation, replace `FlinkSink` with `IcebergSink` in th
 
      - The `RANGE` distribution mode is not yet available for the `IcebergSink`
      - When using `IcebergSink` use `uidSuffix` instead of the `uidPrefix`
+
+
+## Dynamic Iceberg Flink Sink
+
+Dynamic Flink Iceberg Sink allows:
+
+1. **Writing to any number of tables**  
+   A single sink can dynamically route records to multiple Iceberg tables.
+
+2. **Dynamic table creation and updates**  
+   Tables are created and updated based on user-defined routing logic.
+
+3. **Dynamic schema and partition evolution**  
+   Table schemas and partition specs update during streaming execution.
+
+All configurations are controlled through the `DynamicRecord` class, eliminating the need for Flink job restarts when requirements change.
+
+```java
+
+    DynamicIcebergSink.forInput(dataStream)
+        .generator((inputRecord, out) -> out.collect(
+                new DynamicRecord(
+                        TableIdentifier.of("db", "table"),
+                        "branch",
+                        SCHEMA,
+                        (RowData) inputRecord,
+                        PartitionSpec.unpartitioned(),
+                        DistributionMode.HASH,
+                        2)))
+        .catalogLoader(CatalogLoader.hive("hive", new Configuration(), Map.of()))
+        .writeParallelism(10)
+        .immediateTableUpdate(true)
+        .append();
+```
+
+### Configuration Example
+
+```java
+DynamicIcebergSink.Builder<RowData> builder = DynamicIcebergSink.forInput(inputStream);
+
+// Set common properties
+builder
+    .set("write.parquet.compression-codec", "gzip");
+
+// Set Dynamic Sink specific options
+builder
+    .writeParallelism(4)
+    .uidPrefix("dynamic-sink")
+    .cacheMaxSize(500)
+    .cacheRefreshMs(5000);
+
+// Add generator and append sink
+builder.generator(new CustomRecordGenerator());
+builder.append();
+```
+
+### Dynamic Routing Configuration
+
+Dynamic table routing can be customized by implementing the `DynamicRecordGenerator` interface:
+
+```java
+public class CustomRecordGenerator implements DynamicRecordGenerator<RowData> {
+    @Override
+    public DynamicRecord generate(RowData row) {
+        DynamicRecord record = new DynamicRecord();
+        // Set table name based on business logic
+        TableIdentifier tableIdentifier = TableIdentifier.of(database, tableName);
+        record.setTableIdentifier(tableIdentifier);
+        record.setData(row);
+        // Set the maximum number of parallel writers for a given table/branch/schema/spec
+        record.writeParallelism(2);
+        return record;
+    }
+}
+
+// Set custom record generator when building the sink
+DynamicIcebergSink.Builder<RowData> builder = DynamicIcebergSink.forInput(inputStream);
+builder.generator(new CustomRecordGenerator());
+// ... other config ...
+builder.append();
+```
+The user should provide a converter which converts the input record to a DynamicRecord.
+We need the following information (DynamicRecord) for every record:
+
+| Property           | Description |
+|--------------------|-------------|
+| `TableIdentifier`  | The target table to which the record will be written. |
+| `Branch`           | The target branch for writing the record (optional). |
+| `Schema`           | The schema of the record. |
+| `Spec`             | The expected partitioning specification for the record. |
+| `RowData`          | The actual row data to be written. |
+| `DistributionMode` | The distribution mode for writing the record (currently supports NONE or HASH). |
+| `Parallelism`      | The maximum number of parallel writers for a given table/branch/schema/spec (WriteTarget). |
+| `UpsertMode`       | Overrides this table's write.upsert.enabled (optional). |
+| `EqualityFields`   | The equality fields for the table(optional). |
+
+### Schema Update
+

Review Comment:
   ```suggestion
   
   The DynamicSink tries to match the schema provided in `DynamicRecord` with the existing table schemas.
   
   - If there is a direct match with one of the existing table schemas, that table schema will be used for writing to the table.
   - If there is no direct match, DynamicSink tries to adapt the provided schema so that it matches one of the table schemas. For example, if there is an additional optional column in the table schema, a null value will be added to the RowData provided via DynamicRecord.
   - Otherwise, we evolve the table schema to match the input schema, within the constraints described below.
   ```
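
The three-step order in the suggestion above (direct match, then adapt the input, then evolve the table) can be sketched as follows. This is a simplified model under stated assumptions, not the DynamicSink code: `SchemaMatchSketch`, `Resolution`, `schema`, `canAdapt`, `resolve`, and `adaptRow` are hypothetical names, a schema is reduced to a map from column name to a required flag, and column types are ignored entirely.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model for illustration only; this is not the DynamicIcebergSink implementation.
final class SchemaMatchSketch {

    enum Resolution { DIRECT_MATCH, ADAPT_INPUT, EVOLVE_TABLE }

    /** Builds a schema as column name -> required; a trailing '?' marks an optional column. */
    static Map<String, Boolean> schema(String... columns) {
        Map<String, Boolean> result = new LinkedHashMap<>();
        for (String column : columns) {
            boolean optional = column.endsWith("?");
            String name = optional ? column.substring(0, column.length() - 1) : column;
            result.put(name, !optional);
        }
        return result;
    }

    /** The input can be adapted if the table schema only adds optional columns on top of it. */
    static boolean canAdapt(Map<String, Boolean> input, Map<String, Boolean> table) {
        if (!table.keySet().containsAll(input.keySet())) {
            return false; // the input has a column the table does not know
        }
        for (Map.Entry<String, Boolean> column : table.entrySet()) {
            boolean required = column.getValue();
            Boolean inputRequired = input.get(column.getKey());
            if (required && (inputRequired == null || !inputRequired)) {
                return false; // a required table column could end up null
            }
        }
        return true;
    }

    /** Applies the three steps in order: direct match, adapt the input, evolve the table. */
    static Resolution resolve(Map<String, Boolean> input, List<Map<String, Boolean>> tableSchemas) {
        for (Map<String, Boolean> tableSchema : tableSchemas) {
            if (tableSchema.equals(input)) {
                return Resolution.DIRECT_MATCH;
            }
        }
        for (Map<String, Boolean> tableSchema : tableSchemas) {
            if (canAdapt(input, tableSchema)) {
                return Resolution.ADAPT_INPUT;
            }
        }
        return Resolution.EVOLVE_TABLE;
    }

    /** Projects the input values onto the table schema, filling missing columns with null. */
    static List<Object> adaptRow(Map<String, Object> values, Map<String, Boolean> table) {
        List<Object> row = new ArrayList<>();
        for (String name : table.keySet()) {
            row.add(values.get(name)); // null when the input has no such column
        }
        return row;
    }

    public static void main(String[] args) {
        List<Map<String, Boolean>> tableSchemas =
            Collections.singletonList(schema("id", "name?"));

        System.out.println(resolve(schema("id", "name?"), tableSchemas));
        System.out.println(resolve(schema("id"), tableSchemas));
        System.out.println(resolve(schema("id", "email?"), tableSchemas));

        Map<String, Object> values = new LinkedHashMap<>();
        values.put("id", 1L);
        System.out.println(adaptRow(values, tableSchemas.get(0)));
    }
}
```

With a single table schema of `id` (required) and `name` (optional), the three inputs resolve to `DIRECT_MATCH`, `ADAPT_INPUT`, and `EVOLVE_TABLE` respectively, and the adapted row prints as `[1, null]`. Trying the cheaper steps first means a catalog update is only considered when neither an exact nor an adapted match exists.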



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

