kumarpritam863 opened a new pull request, #15880:
URL: https://github.com/apache/iceberg/pull/15880

   ## Summary

   `SinkWriter.save()` tracked source offsets using `record.topic()` and `record.kafkaPartition()`, which reflect values **after** SMT transformations. When topic-rewriting SMTs like `RegexRouter` are in the chain, these no longer match the original Kafka topic/partition that the framework's `context.assignment()` and consumer offset management use.

   This caused:

   - `Worker.receive()` to fail matching source offsets against `context.assignment()`, falling back to `NULL_OFFSET` for every partition
   - `Channel.send()` to commit offsets via `sendOffsetsToTransaction` for the **transformed** (non-existent) topic instead of the real source topic
   - Source consumer offsets for the original topic to never advance
   - All records to be re-consumed on connector restart, producing duplicate data in Iceberg tables
   
   
   **Kafka Connect `SinkRecord.originalTopic()` JavaDoc**
   ```
   Get the original topic for this sink record, before any transformations were
   applied. In order to be compatible with transformations that mutate topic
   names, this method should be used by sink tasks instead of topic() for any
   internal offset tracking purposes (for instance, reporting offsets to the
   Connect runtime via SinkTask.preCommit(Map)).
   ```
       
                                                                                
                                                                                
                               
   ## Reproduction

   Use dynamic routing with a `RegexRouter` + `InsertField` SMT chain:

   ```json
   {
     "iceberg.tables.dynamic-enabled": "true",
     "iceberg.tables.route-field": "srcTopic",
     "transforms": "addDbPrefix, insertTopic",
     "transforms.addDbPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
     "transforms.addDbPrefix.regex": ".*",
     "transforms.addDbPrefix.replacement": "tmp.dynamic_$0",
     "transforms.insertTopic.type": "org.apache.kafka.connect.transforms.InsertField$Value",
     "transforms.insertTopic.topic.field": "srcTopic"
   }
   ```
   
   `RegexRouter` changes `record.topic()` from `orders` → `tmp.dynamic_orders`. The sink then keys offsets under `TopicPartition("tmp.dynamic_orders", 0)`, which never matches `context.assignment()` containing `TopicPartition("orders", 0)`.
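
The mismatch can be reproduced without a Connect cluster. The following is a minimal sketch, not connector code: `route` approximates the `RegexRouter` rewrite with plain `java.util.regex`, and the `RegexRouterMismatch` class, the `key` helper, and the string keys in `topic-partition` form are assumptions made for illustration.

```java
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class RegexRouterMismatch {

  // Approximates RegexRouter: rewrite the topic only when the regex matches it entirely.
  static String route(String topic, String regex, String replacement) {
    Matcher matcher = Pattern.compile(regex).matcher(topic);
    return matcher.matches() ? matcher.replaceFirst(replacement) : topic;
  }

  // Hypothetical key format "<topic>-<partition>", standing in for a TopicPartition.
  static String key(String topic, int partition) {
    return topic + "-" + partition;
  }

  public static void main(String[] args) {
    // What the consumer is actually assigned (the role played by context.assignment()).
    Set<String> assignment = Set.of(key("orders", 0));

    String routed = route("orders", ".*", "tmp.dynamic_$0");
    System.out.println(routed);                                 // tmp.dynamic_orders
    System.out.println(assignment.contains(key(routed, 0)));    // false: post-SMT key
    System.out.println(assignment.contains(key("orders", 0)));  // true: pre-SMT key
  }
}
```

Only the key built from the pre-SMT topic is found in the assignment, which is why offsets keyed by the rewritten topic are dropped.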
                                                                                
                                                                                
                               
   ## Fix
   
   Changed `SinkWriter.save()` to use `record.originalTopic()`, `record.originalKafkaPartition()`, and `record.originalKafkaOffset()`. These are the pre-SMT values, which stay consistent with the framework's consumer offset tracking.
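
The idea behind the change can be modeled in a few lines. This is a hedged sketch rather than the actual `SinkWriter` code: `Rec` is a hypothetical, simplified stand-in for `SinkRecord`, `track` is an illustrative helper, and storing `offset + 1` follows the usual Kafka convention of committing the next offset to consume, which is assumed here.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class OriginalOffsetTracking {

  // Hypothetical, simplified stand-in for SinkRecord (not the Kafka Connect class).
  static final class Rec {
    final String topic;           // may be rewritten by an SMT
    final String originalTopic;   // as consumed from Kafka
    final int originalPartition;
    final long originalOffset;

    Rec(String topic, String originalTopic, int originalPartition, long originalOffset) {
      this.topic = topic;
      this.originalTopic = originalTopic;
      this.originalPartition = originalPartition;
      this.originalOffset = originalOffset;
    }

    // Simulates a topic-rewriting SMT: the topic changes, the original coordinates do not.
    Rec withTopic(String newTopic) {
      return new Rec(newTopic, originalTopic, originalPartition, originalOffset);
    }
  }

  // Records the next offset to consume per source partition, keyed by pre-SMT coordinates.
  static Map<String, Long> track(List<Rec> records) {
    Map<String, Long> offsets = new HashMap<>();
    for (Rec r : records) {
      offsets.put(r.originalTopic + "-" + r.originalPartition, r.originalOffset + 1);
    }
    return offsets;
  }

  public static void main(String[] args) {
    Rec consumed = new Rec("orders", "orders", 0, 41L);
    Rec routed = consumed.withTopic("tmp.dynamic_orders");
    System.out.println(track(List.of(routed))); // {orders-0=42}
  }
}
```

Because the key is built from the original coordinates, the tracked offset lands under `orders-0` even though the record's current topic is `tmp.dynamic_orders`.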
                                                                                
                                                                                
                               
   ## Test plan
   
   - Added `testOffsetTrackedByOriginalTopicPartition`, which creates a `SinkRecord`, transforms it via `newRecord()` with a different topic (simulating `RegexRouter`), and verifies offsets are keyed by the original topic, not the transformed one
   - Added integration tests that verify the snapshots; they fail if `record.topic()` is used and pass if `originalTopic()` is used
   - All existing `TestSinkWriter` tests pass
   
   **Addresses** -> https://github.com/apache/iceberg/issues/13457


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

