kumarpritam863 opened a new pull request, #15880: URL: https://github.com/apache/iceberg/pull/15880
## Summary
`SinkWriter.save()` tracked source offsets using `record.topic()` and
`record.kafkaPartition()`, which reflect values **after** SMT transformations.
When topic-rewriting SMTs like `RegexRouter` are in the chain, these no longer
match the original Kafka topic/partition that the framework's
`context.assignment()` and consumer offset management use.
This caused:
- `Worker.receive()` to fail matching source offsets against
`context.assignment()`, falling back to `NULL_OFFSET` for every partition
- `Channel.send()` to commit offsets via `sendOffsetsToTransaction` for
the **transformed** (non-existent) topic instead of the real source topic
- Source consumer offsets for the original topic were never advanced
- On connector restart, all records were re-consumed, producing duplicate
data in Iceberg tables
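The fallback in the first bullet can be pictured with a small plain-Java model. This is only a sketch: `NullOffsetFallbackSketch`, `offsetsFor`, the `"<topic>-<partition>"` string keys, and the `-1` sentinel are hypothetical stand-ins, not the connector's actual `Worker` code or its real `NULL_OFFSET` value.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class NullOffsetFallbackSketch {
  // Hypothetical sentinel; the connector's real NULL_OFFSET may be represented differently.
  static final long NULL_OFFSET = -1L;

  // For each assigned partition, report the tracked offset or fall back to the sentinel.
  // Keys are "<topic>-<partition>" strings standing in for TopicPartition.
  static Map<String, Long> offsetsFor(List<String> assignment, Map<String, Long> tracked) {
    Map<String, Long> result = new LinkedHashMap<>();
    for (String tp : assignment) {
      result.put(tp, tracked.getOrDefault(tp, NULL_OFFSET));
    }
    return result;
  }

  public static void main(String[] args) {
    List<String> assignment = List.of("orders-0");
    // Offsets keyed by the transformed topic: the assigned partition is never found.
    System.out.println(offsetsFor(assignment, Map.of("tmp.dynamic_orders-0", 42L)));
    // Offsets keyed by the original topic: the lookup succeeds.
    System.out.println(offsetsFor(assignment, Map.of("orders-0", 42L)));
  }
}
```

In this model the first call maps `orders-0` to the sentinel and the second maps it to `42`, which mirrors the difference between tracking by transformed and by original topic.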
**Kafka Connect `SinkRecord.originalTopic()` JavaDoc**
```
Get the original topic for this sink record, before any transformations were
applied. In order to be compatible with transformations that mutate topic
names, this method should be used by sink tasks instead of topic() for any
internal offset tracking purposes (for instance, reporting offsets to the
Connect runtime via SinkTask.preCommit(Map)).
```
## Reproduction
Use dynamic routing with a `RegexRouter` + `InsertField` SMT chain:
```json
{
"iceberg.tables.dynamic-enabled": "true",
"iceberg.tables.route-field": "srcTopic",
"transforms": "addDbPrefix, insertTopic",
"transforms.addDbPrefix.type":
"org.apache.kafka.connect.transforms.RegexRouter",
"transforms.addDbPrefix.regex": ".*",
"transforms.addDbPrefix.replacement": "tmp.dynamic_$0",
"transforms.insertTopic.type":
"org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.insertTopic.topic.field": "srcTopic"
}
```
`RegexRouter` changes `record.topic()` from `orders` to `tmp.dynamic_orders`. The
sink then keys offsets under `TopicPartition("tmp.dynamic_orders", 0)`, which
never matches the `TopicPartition("orders", 0)` held in `context.assignment()`.
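The rewrite itself can be reproduced without a Connect cluster. The sketch below approximates `RegexRouter` with `java.util.regex` (full-match check, then `replaceFirst`), using the `regex` and `replacement` values from the config above. `RoutingMismatchSketch`, `route`, and `key` are hypothetical names, and the string key is a stand-in for `TopicPartition`.

```java
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class RoutingMismatchSketch {
  // Approximation of RegexRouter: rewrite the topic only if the regex matches all of it.
  static String route(String topic, String regex, String replacement) {
    Matcher m = Pattern.compile(regex).matcher(topic);
    return m.matches() ? m.replaceFirst(replacement) : topic;
  }

  // Stand-in for TopicPartition, rendered as "<topic>-<partition>".
  static String key(String topic, int partition) {
    return topic + "-" + partition;
  }

  public static void main(String[] args) {
    // What the framework assigned to the task: the real source partition.
    Set<String> assignment = Set.of(key("orders", 0));

    String routed = route("orders", ".*", "tmp.dynamic_$0");
    System.out.println(routed);
    // The post-SMT key is not part of the assignment; the pre-SMT key is.
    System.out.println(assignment.contains(key(routed, 0)));
    System.out.println(assignment.contains(key("orders", 0)));
  }
}
```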
## Fix
Changed `SinkWriter.save()` to use `record.originalTopic()`,
`record.originalKafkaPartition()`, and `record.originalKafkaOffset()`, the pre-SMT
values that stay consistent with the framework's consumer offset tracking.
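A minimal model of the corrected tracking, assuming a hypothetical `Rec` class in place of `SinkRecord` and a string key in place of `TopicPartition`. Storing `offset + 1` follows the Kafka convention that a committed offset names the next record to read; whether `SinkWriter` stores exactly this value is an assumption here.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class OriginalOffsetSketch {
  // Hypothetical stand-in for SinkRecord: post-SMT topic plus pre-SMT coordinates.
  static final class Rec {
    final String topic;
    final String originalTopic;
    final int originalPartition;
    final long originalOffset;

    Rec(String topic, String originalTopic, int originalPartition, long originalOffset) {
      this.topic = topic;
      this.originalTopic = originalTopic;
      this.originalPartition = originalPartition;
      this.originalOffset = originalOffset;
    }
  }

  // Key every offset by the pre-SMT topic and partition; rec.topic is deliberately ignored.
  static Map<String, Long> track(List<Rec> records) {
    Map<String, Long> offsets = new HashMap<>();
    for (Rec rec : records) {
      String key = rec.originalTopic + "-" + rec.originalPartition;
      offsets.put(key, rec.originalOffset + 1); // next offset to consume (assumed convention)
    }
    return offsets;
  }

  public static void main(String[] args) {
    Rec routed = new Rec("tmp.dynamic_orders", "orders", 0, 41L);
    System.out.println(track(List.of(routed)));
  }
}
```

With this keying, the tracked entry lines up with the partition the framework actually assigned, regardless of what the SMT chain did to `topic`.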
## Test plan
- Added `testOffsetTrackedByOriginalTopicPartition`: creates a `SinkRecord`,
transforms it via `newRecord()` with a different topic (simulating `RegexRouter`),
and verifies that offsets are keyed by the original topic, not the transformed one
- Added integration tests that verify the snapshots; they fail when
`record.topic()` is used and pass when `originalTopic()` is used
- All existing `TestSinkWriter` tests pass
**Addresses** -> https://github.com/apache/iceberg/issues/13457
