[ https://issues.apache.org/jira/browse/KAFKA-13404?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Anupam Aggarwal reassigned KAFKA-13404:
---------------------------------------

    Assignee: Anupam Aggarwal

Kafka sink connectors do not commit offset correctly if messages are produced in transaction
---------------------------------------------------------------------------------------------

                Key: KAFKA-13404
                URL: https://issues.apache.org/jira/browse/KAFKA-13404
            Project: Kafka
         Issue Type: Bug
         Components: KafkaConnect
   Affects Versions: 2.6.1
           Reporter: Yu-Jhe Li
           Assignee: Anupam Aggarwal
           Priority: Major
        Attachments: Main.scala

The Kafka sink connectors do not commit the offset to the latest log-end offset if the messages were produced in a transaction.

From the code of [WorkerSinkTask.java|https://github.com/apache/kafka/blob/2.6.1/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L477], we found that the sink connector takes the offset from the consumed messages and commits it to Kafka after the messages have been processed successfully. But for messages produced in a transaction, the log contains additional [control batches|http://kafka.apache.org/documentation/#controlbatch] that indicate whether the transaction was committed or aborted. These control batches occupy offsets but are never delivered to the connector, so the committed offset always stays behind the log-end offset.

You can reproduce it by running `connect-file-sink` with the following properties:

{noformat}
/opt/kafka/bin/connect-standalone.sh /connect-standalone.properties /connect-file-sink.properties{noformat}

{code:java}
# connect-standalone.properties
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true

# for testing
offset.flush.interval.ms=10000
consumer.isolation.level=read_committed
consumer.auto.offset.reset=none
{code}

{code:java}
# connect-file-sink.properties
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=/tmp/test.sink.txt
topics=test{code}

Then use the attached producer ([^Main.scala]) to produce 10 messages to the `test` topic in a single transaction.
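For reference, a minimal Java sketch of what the attached producer does (the attachment itself is Scala; the topic name, payload, and `transactional.id` below are illustrative):

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class TransactionalProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        // Setting any transactional.id enables transactions; this one is made up.
        props.put("transactional.id", "repro-producer");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            producer.beginTransaction();
            for (int i = 0; i < 10; i++) {
                producer.send(new ProducerRecord<>("test",
                        "{\"value\": \"banana\", \"time\": 1634805907}"));
            }
            // Committing appends one extra control batch (the COMMIT marker) to the
            // log, so after 10 records at offsets 0-9 the log-end offset is 11.
            producer.commitTransaction();
        }
    }
}
{code}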
You can see that the topic log-end offset is now 11 and that the last batch in the segment file is a control batch, but the consumer group offset is still at 10. (If that record is deleted by topic retention, you will get an OffsetOutOfRange exception after restarting the connector.)

{code:java}
bash-5.1# /opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server kafka1:9092 --group connect-local-file-sink --describe

GROUP                   TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                                                 HOST         CLIENT-ID
connect-local-file-sink test   0          10              11              1    connector-consumer-local-file-sink-0-10777adb-72c2-4fd3-8773-4f5a0498903d  /172.21.0.3  connector-consumer-local-file-sink-0

bash-5.1# /opt/kafka/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /kafka/test-0/00000000000000000000.log --print-data-log
Dumping /kafka/test-0/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 9 count: 10 baseSequence: 0 lastSequence: 9 producerId: 4000 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true isControl: false position: 0 CreateTime: 1634805907230 size: 208 magic: 2 compresscodec: GZIP crc: 2170304005 isvalid: true
| offset: 0 CreateTime: 1634805907210 keysize: -1 valuesize: 39 sequence: 0 headerKeys: [] payload: {"value": "banana", "time": 1634805907}
| offset: 1 CreateTime: 1634805907230 keysize: -1 valuesize: 39 sequence: 1 headerKeys: [] payload: {"value": "banana", "time": 1634805907}
| offset: 2 CreateTime: 1634805907230 keysize: -1 valuesize: 36 sequence: 2 headerKeys: [] payload: {"value": "ice", "time": 1634805907}
| offset: 3 CreateTime: 1634805907230 keysize: -1 valuesize: 38 sequence: 3 headerKeys: [] payload: {"value": "apple", "time": 1634805907}
| offset: 4 CreateTime: 1634805907230 keysize: -1 valuesize: 37 sequence: 4 headerKeys: [] payload: {"value": "home", "time": 1634805907}
| offset: 5 CreateTime: 1634805907230 keysize: -1 valuesize: 38 sequence: 5 headerKeys: [] payload: {"value": "juice", "time": 1634805907}
| offset: 6 CreateTime: 1634805907230 keysize: -1 valuesize: 36 sequence: 6 headerKeys: [] payload: {"value": "cat", "time": 1634805907}
| offset: 7 CreateTime: 1634805907230 keysize: -1 valuesize: 36 sequence: 7 headerKeys: [] payload: {"value": "cat", "time": 1634805907}
| offset: 8 CreateTime: 1634805907230 keysize: -1 valuesize: 37 sequence: 8 headerKeys: [] payload: {"value": "girl", "time": 1634805907}
| offset: 9 CreateTime: 1634805907230 keysize: -1 valuesize: 36 sequence: 9 headerKeys: [] payload: {"value": "cat", "time": 1634805907}
baseOffset: 10 lastOffset: 10 count: 1 baseSequence: -1 lastSequence: -1 producerId: 4000 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true isControl: true position: 208 CreateTime: 1634805908149 size: 78 magic: 2 compresscodec: NONE crc: 1662003889 isvalid: true
| offset: 10 CreateTime: 1634805908149 keysize: 4 valuesize: 6 sequence: -1 headerKeys: [] endTxnMarker: COMMIT coordinatorEpoch: 0
{code}

I think we should use [KafkaConsumer.position()|https://github.com/apache/kafka/blob/2.6.1/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L1742] to get the correct offset instead of the offsets taken from the messages, since the consumer's position advances past control batches. I will create a PR for that later.
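A minimal sketch of that idea (the class and method here are illustrative stand-ins, not the actual WorkerSinkTask code):

{code:java}
import java.util.Collections;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

class OffsetCommitSketch {
    // In the real WorkerSinkTask the consumer and partition assignments are
    // fields; they are passed in here to keep the sketch self-contained.
    static void commitProcessedOffset(KafkaConsumer<byte[], byte[]> consumer, TopicPartition tp) {
        // position() returns the offset of the next record the consumer will
        // fetch, which is past any transaction control batches it skipped: after
        // reading offsets 0-9 plus the COMMIT marker at 10, it returns 11.
        long position = consumer.position(tp);
        consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(position)));
    }
}
{code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)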