kumarpritam863 opened a new pull request, #15959:
URL: https://github.com/apache/iceberg/pull/15959

## Summary

Introduces a pluggable `RecordRouter` interface that abstracts all record-to-table routing logic in the Kafka Connect sink, replacing the hardcoded static/dynamic if-else in `SinkWriter`. This lets users plug in custom routing strategies without modifying connector internals.
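
To make the contract concrete, here is a minimal sketch of what a pluggable router could look like. It is not the PR's code: `SimpleRecord` is a hypothetical stand-in for Kafka Connect's `SinkRecord`, the exact `route()` parameter type is an assumption, and `FixedTableRouter` with its `fixed.table` property is invented purely for illustration.

```java
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for Kafka Connect's SinkRecord (keeps the sketch dependency-free).
record SimpleRecord(String topic, Map<String, Object> value) {}

// Assumed shape of a routing target: a table name plus how to treat a missing table.
record RouteTarget(String tableName, boolean ignoreMissingTable) {}

// Assumed shape of the pluggable contract; the real signatures may differ.
interface RecordRouter {
  // Receives the raw connector properties, not IcebergSinkConfig.
  void configure(Map<String, String> props);

  // Returns zero, one, or many destinations for a single record.
  List<RouteTarget> route(SimpleRecord record);
}

// Hypothetical custom router: sends every record to one configured table.
class FixedTableRouter implements RecordRouter {
  private String table;

  @Override
  public void configure(Map<String, String> props) {
    // "fixed.table" is an invented property name for this example only.
    table = props.getOrDefault("fixed.table", "default.events");
  }

  @Override
  public List<RouteTarget> route(SimpleRecord record) {
    return List.of(new RouteTarget(table, false));
  }
}

class FixedTableRouterDemo {
  public static void main(String[] args) {
    RecordRouter router = new FixedTableRouter();
    router.configure(Map.of("fixed.table", "db.orders"));
    System.out.println(router.route(new SimpleRecord("orders", Map.of())));
    // prints [RouteTarget[tableName=db.orders, ignoreMissingTable=false]]
  }
}
```

Because such a class only sees raw properties, it could be packaged independently of the connector and selected through `iceberg.tables.router-class`.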
                                                                                
                                                                                
                                                       
Ships three built-in implementations:

- **`StaticRouter`**: wraps the existing static routing behavior (broadcast + regex filtering)
- **`DynamicRouter`**: wraps the existing dynamic routing behavior (a field value is used as the table name)
- **`TopicNameRouter`** (new): maps Kafka topic names to Iceberg table identifiers, with support for namespace prefixing, regex transforms, explicit topic-to-table overrides, and case control
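
The two wrapped behaviors can be sketched roughly as follows, assuming the semantics summarized above: static routing either broadcasts to every configured table or keeps only the tables whose per-table regex matches a route-field value, while dynamic routing uses a field value directly as the table name. The class names, constructor parameters, and the map-based record are hypothetical simplifications, not the PR's `StaticRouter`/`DynamicRouter`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical mirror of the PR's RouteTarget.
record RouteTarget(String tableName, boolean ignoreMissingTable) {}

// Static routing sketch: broadcast, or filter tables by a per-table regex on a route field.
final class StaticRoutingSketch {
  private final List<String> tables;
  private final String routeField; // null means broadcast to every table
  private final Map<String, Pattern> routeRegexByTable;

  StaticRoutingSketch(List<String> tables, String routeField, Map<String, Pattern> routeRegexByTable) {
    this.tables = tables;
    this.routeField = routeField;
    this.routeRegexByTable = routeRegexByTable;
  }

  List<RouteTarget> route(Map<String, Object> value) {
    List<RouteTarget> targets = new ArrayList<>();
    if (routeField == null) {
      // Fan-out: one record goes to every configured table; a missing table is an error.
      for (String table : tables) {
        targets.add(new RouteTarget(table, false));
      }
      return targets;
    }
    Object routeValue = value.get(routeField);
    if (routeValue == null) {
      return targets; // nothing to match against
    }
    for (String table : tables) {
      Pattern regex = routeRegexByTable.get(table);
      if (regex != null && regex.matcher(routeValue.toString()).matches()) {
        targets.add(new RouteTarget(table, false));
      }
    }
    return targets;
  }
}

// Dynamic routing sketch: the route field's value is the table name; missing tables are tolerated.
final class DynamicRoutingSketch {
  private final String routeField;

  DynamicRoutingSketch(String routeField) {
    this.routeField = routeField;
  }

  List<RouteTarget> route(Map<String, Object> value) {
    Object routeValue = value.get(routeField);
    if (routeValue == null) {
      return List.of();
    }
    return List.of(new RouteTarget(routeValue.toString(), true));
  }
}

class RoutingSketchDemo {
  public static void main(String[] args) {
    StaticRoutingSketch broadcast = new StaticRoutingSketch(List.of("db.a", "db.b"), null, Map.of());
    System.out.println(broadcast.route(Map.of("type", "click")).size()); // 2
    DynamicRoutingSketch dynamic = new DynamicRoutingSketch("table");
    System.out.println(dynamic.route(Map.of("table", "db.orders")));
  }
}
```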
                                                                                
                                                                                
                                                       
### Motivation

The current routing is single-dimensional (one field determines the destination), and the two modes (static vs. dynamic) are mutually exclusive. Users frequently need topic-based routing, which is not supported directly today. The pluggable interface also allows future routers (header-based, composite-field, expression-based) to be added as independent contributions.
                                                                                
                                                                                
                                                       
### Key design decisions

- `RecordRouter.configure(Map<String, String>)` takes raw props instead of `IcebergSinkConfig` to keep the interface decoupled: external implementations depend only on the router API.
- `route()` returns `List<RouteTarget>` to support fan-out (one record → multiple tables), which static routing already does in broadcast mode.
- `RouteTarget` carries `ignoreMissingTable` to preserve the semantic difference between static routing (throw on a missing table) and dynamic/topic routing (fall back to `NoOpWriter`).
- Built-in routers have a package-private `configure(IcebergSinkConfig)` overload to avoid the cost of re-creating the config and `JsonConverter` from raw props.
- When `iceberg.tables.router-class` is unset, `RecordRouterFactory` falls back to `StaticRouter`/`DynamicRouter`: **zero behavioral change** for existing users.
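
The `ignoreMissingTable` flag is what lets a single writer-lookup path serve both behaviors. A minimal sketch, assuming a set of known table names stands in for the catalog lookup and ignoring table auto-creation; `Writer`, `TableWriter`, `WriterSelectionSketch`, and this `NoOpWriter` are hypothetical stand-ins, not the connector's classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical mirror of the PR's RouteTarget.
record RouteTarget(String tableName, boolean ignoreMissingTable) {}

// Hypothetical minimal writer abstraction.
interface Writer {
  void write(String row);
}

// Stand-in for a real table writer: just buffers rows in memory.
final class TableWriter implements Writer {
  final String table;
  final List<String> rows = new ArrayList<>();

  TableWriter(String table) {
    this.table = table;
  }

  @Override
  public void write(String row) {
    rows.add(row);
  }
}

// Stand-in for the connector's NoOpWriter: silently drops rows.
final class NoOpWriter implements Writer {
  @Override
  public void write(String row) {}
}

final class WriterSelectionSketch {
  private final Set<String> existingTables; // stands in for a catalog "table exists" check

  WriterSelectionSketch(Set<String> existingTables) {
    this.existingTables = existingTables;
  }

  Writer writerFor(RouteTarget target) {
    if (existingTables.contains(target.tableName())) {
      return new TableWriter(target.tableName());
    }
    if (target.ignoreMissingTable()) {
      return new NoOpWriter(); // dynamic/topic routing: drop records for unknown tables
    }
    // Static routing: a configured table that does not exist is treated as an error.
    throw new IllegalStateException("Table does not exist: " + target.tableName());
  }
}
```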
                                                                                
                                                                                
                                                       
### New configuration

| Property | Type | Default | Description |
|----------|------|---------|-------------|
| `iceberg.tables.router-class` | String | null | Fully-qualified `RecordRouter` class name. When set, overrides `iceberg.tables` and `iceberg.tables.dynamic-enabled`. |
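
A rough sketch of the selection logic implied by `iceberg.tables.router-class` and the fallback described in the design decisions, assuming the router class is loaded reflectively through a no-arg constructor. `RecordRouterFactorySketch` and the empty `StaticRouter`/`DynamicRouter` stubs are hypothetical; the real `RecordRouterFactory` may differ, and `route()` is omitted to keep the focus on construction.

```java
import java.util.Map;

// Minimal router contract for this sketch (route() omitted).
interface RecordRouter {
  void configure(Map<String, String> props);
}

// Empty stubs standing in for the built-in routers.
class StaticRouter implements RecordRouter {
  @Override
  public void configure(Map<String, String> props) {}
}

class DynamicRouter implements RecordRouter {
  @Override
  public void configure(Map<String, String> props) {}
}

final class RecordRouterFactorySketch {
  static final String ROUTER_CLASS = "iceberg.tables.router-class";
  static final String DYNAMIC_ENABLED = "iceberg.tables.dynamic-enabled";

  static RecordRouter create(Map<String, String> props) {
    String className = props.get(ROUTER_CLASS);
    RecordRouter router;
    if (className != null && !className.isBlank()) {
      // An explicit router class takes precedence over iceberg.tables / dynamic-enabled.
      try {
        router = Class.forName(className)
            .asSubclass(RecordRouter.class)
            .getDeclaredConstructor()
            .newInstance();
      } catch (ReflectiveOperationException | ClassCastException e) {
        throw new IllegalArgumentException("Cannot create router " + className, e);
      }
    } else if (Boolean.parseBoolean(props.get(DYNAMIC_ENABLED))) {
      router = new DynamicRouter(); // legacy dynamic mode
    } else {
      router = new StaticRouter(); // legacy default: static mode
    }
    router.configure(props);
    return router;
  }
}
```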
                                                                                
                                                                                
                                                       
#### TopicNameRouter properties (prefix `iceberg.tables.route.topic-name.`)

| Property | Default | Description |
|----------|---------|-------------|
| `table-namespace` | null | Namespace prepended to the derived table name |
| `table-map.<topic>` | (none) | Explicit topic→table override (bypasses all transforms) |
| `regex` | null | Regex applied to the topic name |
| `regex-replacement` | `$1` | Replacement string for regex captures |
| `lowercase` | true | Lowercase the derived table name |
| `ignore-missing-table` | true | Use `NoOpWriter` for missing tables instead of throwing |
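
One plausible reading of how these properties combine is: explicit `table-map` override first, then the regex rewrite, then lowercasing, then the namespace prefix. The sketch below encodes that order. The ordering, the whole-topic match requirement for `regex`, and passing non-matching topics through unchanged are assumptions; `TopicNameDerivationSketch` is a hypothetical helper, not the PR's `TopicNameRouter`, and it does not model `ignore-missing-table`.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: derives an Iceberg table name from a Kafka topic name.
final class TopicNameDerivationSketch {
  static final String PREFIX = "iceberg.tables.route.topic-name.";
  private static final String MAP_PREFIX = PREFIX + "table-map.";

  private final String namespace;
  private final Map<String, String> tableMap = new HashMap<>();
  private final Pattern regex;
  private final String replacement;
  private final boolean lowercase;

  TopicNameDerivationSketch(Map<String, String> props) {
    this.namespace = props.get(PREFIX + "table-namespace");
    String rx = props.get(PREFIX + "regex");
    this.regex = rx == null ? null : Pattern.compile(rx);
    this.replacement = props.getOrDefault(PREFIX + "regex-replacement", "$1");
    this.lowercase = Boolean.parseBoolean(props.getOrDefault(PREFIX + "lowercase", "true"));
    // Collect table-map.<topic> overrides.
    for (Map.Entry<String, String> e : props.entrySet()) {
      if (e.getKey().startsWith(MAP_PREFIX)) {
        tableMap.put(e.getKey().substring(MAP_PREFIX.length()), e.getValue());
      }
    }
  }

  String tableFor(String topic) {
    // 1. Explicit override: returned as-is, bypassing every transform.
    String override = tableMap.get(topic);
    if (override != null) {
      return override;
    }
    String name = topic;
    // 2. Regex rewrite (assumed: whole-topic match; non-matching topics pass through).
    if (regex != null) {
      Matcher m = regex.matcher(topic);
      if (m.matches()) {
        StringBuilder sb = new StringBuilder();
        m.appendReplacement(sb, replacement); // expands $1 etc. against the full match
        name = sb.toString();
      }
    }
    // 3. Lowercase the derived name only (the namespace is left as configured).
    if (lowercase) {
      name = name.toLowerCase(Locale.ROOT);
    }
    // 4. Prefix the namespace, if any.
    return namespace == null || namespace.isEmpty() ? name : namespace + "." + name;
  }
}

class TopicNameDerivationDemo {
  public static void main(String[] args) {
    TopicNameDerivationSketch router = new TopicNameDerivationSketch(Map.of(
        TopicNameDerivationSketch.PREFIX + "regex", ".*\\.(.+)",
        TopicNameDerivationSketch.PREFIX + "table-namespace", "warehouse"));
    System.out.println(router.tableFor("prod.events.clicks")); // warehouse.clicks
  }
}
```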
                                                              
                                                                                
                                                                                
                                                       
#### Example usage

Each snippet below is an alternative connector configuration; they are not meant to be combined in one file.

```properties
# Simple: topic "orders" → table "analytics.orders"
iceberg.tables.router-class=org.apache.iceberg.connect.data.TopicNameRouter
iceberg.tables.route.topic-name.table-namespace=analytics

# Regex: topic "prod.events.clicks" → table "warehouse.clicks"
iceberg.tables.router-class=org.apache.iceberg.connect.data.TopicNameRouter
iceberg.tables.route.topic-name.regex=.*\\.(.+)
iceberg.tables.route.topic-name.regex-replacement=$1
iceberg.tables.route.topic-name.table-namespace=warehouse

# Explicit overrides for specific topics
iceberg.tables.router-class=org.apache.iceberg.connect.data.TopicNameRouter
iceberg.tables.route.topic-name.table-namespace=db
iceberg.tables.route.topic-name.table-map.orders-v2=analytics.orders
iceberg.tables.route.topic-name.table-map.users-legacy=analytics.users
```
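
The `\\.` in the regex example is properties-file escaping. Assuming the config is read with `java.util.Properties` (for example, a connector config file passed to a standalone Connect worker), the loaded value is `.*\.(.+)`, which captures everything after the last dot. The hypothetical helper `RegexEscapingCheck` below checks both steps.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class RegexEscapingCheck {
  static final String KEY = "iceberg.tables.route.topic-name.regex";

  // Parses properties text exactly as java.util.Properties would and returns the regex value.
  static String loadRegex(String propertiesText) {
    Properties props = new Properties();
    try {
      props.load(new StringReader(propertiesText));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return props.getProperty(KEY);
  }

  // Returns the first capture group if the regex matches the whole topic, else null.
  static String capture(String regex, String topic) {
    Matcher m = Pattern.compile(regex).matcher(topic);
    return m.matches() ? m.group(1) : null;
  }

  public static void main(String[] args) {
    // This Java literal holds the two characters "\\" exactly as written in the .properties file.
    String fileLine = KEY + "=.*\\\\.(.+)\n";
    String regex = loadRegex(fileLine);
    System.out.println(regex);                                // .*\.(.+)
    System.out.println(capture(regex, "prod.events.clicks")); // clicks
  }
}
```

Writing a single backslash (`.*\.(.+)`) in the properties file would not work as intended: `Properties.load` silently drops a backslash before a character that is not a recognized escape, so the value would become `.*.(.+)`, where `.` matches any character.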


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
