[ 
https://issues.apache.org/jira/browse/KAFKA-13404?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anupam Aggarwal reassigned KAFKA-13404:
---------------------------------------

    Assignee: Anupam Aggarwal

> Kafka sink connectors do not commit offset correctly if messages are produced 
> in transaction
> --------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-13404
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13404
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 2.6.1
>            Reporter: Yu-Jhe Li
>            Assignee: Anupam Aggarwal
>            Priority: Major
>         Attachments: Main.scala
>
>
> The Kafka sink connectors don't commit offsets up to the latest log-end 
> offset if the messages are produced in a transaction.
> From the code of 
> [WorkerSinkTask.java|https://github.com/apache/kafka/blob/2.6.1/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L477],
> we found that the sink connector takes the offset from the messages it 
> consumes and commits that offset to Kafka after the messages are processed 
> successfully. But for messages produced in a transaction, the log contains 
> additional [control 
> batches|http://kafka.apache.org/documentation/#controlbatch] that indicate 
> whether the transaction was committed or aborted, and these occupy offsets 
> that are never delivered to the connector as sink records.
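> The gap is easy to see from the consumer side. Below is a minimal sketch 
> (not the connector code; the group id, topic, and bootstrap address are 
> placeholders) that reads the topic with read_committed isolation and 
> compares the offset derived from the last delivered record with 
> KafkaConsumer.position():
> {code:java}
> import java.time.Duration;
> import java.util.List;
> import java.util.Properties;
> import org.apache.kafka.clients.consumer.ConsumerRecord;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.TopicPartition;
> import org.apache.kafka.common.serialization.StringDeserializer;
>
> public class OffsetGapDemo {
>     public static void main(String[] args) {
>         Properties props = new Properties();
>         props.put("bootstrap.servers", "localhost:9092");          // placeholder
>         props.put("group.id", "offset-gap-demo");                  // placeholder
>         props.put("isolation.level", "read_committed");
>         props.put("auto.offset.reset", "earliest");
>         props.put("key.deserializer", StringDeserializer.class.getName());
>         props.put("value.deserializer", StringDeserializer.class.getName());
>
>         TopicPartition tp = new TopicPartition("test", 0);
>         try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
>             consumer.assign(List.of(tp));
>             long lastRecordOffset = -1L;
>             // Poll a few times so the fetcher also reads past the commit marker.
>             for (int i = 0; i < 5; i++) {
>                 for (ConsumerRecord<String, String> r : consumer.poll(Duration.ofSeconds(1))) {
>                     lastRecordOffset = r.offset();
>                 }
>             }
>             // With the 10-record transaction from this report:
>             //   last record offset + 1 -> 10 (what the connector commits today)
>             //   consumer.position()    -> 11 (log-end offset, past the control batch)
>             System.out.println("last record offset + 1 = " + (lastRecordOffset + 1));
>             System.out.println("consumer.position()    = " + consumer.position(tp));
>         }
>     }
> }
> {code}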
>  
> You can reproduce it by running the file sink connector in standalone mode 
> with the following properties:
> {noformat}
> /opt/kafka/bin/connect-standalone.sh /connect-standalone.properties /connect-file-sink.properties{noformat}
> {code:java}
> # connect-standalone.properties
> bootstrap.servers=localhost:9092
> key.converter=org.apache.kafka.connect.storage.StringConverter
> value.converter=org.apache.kafka.connect.storage.StringConverter
> key.converter.schemas.enable=true
> value.converter.schemas.enable=true
> # for testing
> offset.flush.interval.ms=10000
> consumer.isolation.level=read_committed
> consumer.auto.offset.reset=none
> {code}
> {code:java}
> # connect-file-sink.properties
> name=local-file-sink
> connector.class=FileStreamSink
> tasks.max=1
> file=/tmp/test.sink.txt
> topics=test{code}
> Then use the attached producer ([^Main.scala]) to produce 10 messages to the 
> `test` topic in a single transaction, along the lines of the sketch below.
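> (The attachment is the authoritative version; the following is just a 
> minimal transactional-producer sketch, with a placeholder bootstrap address 
> and transactional.id.)
> {code:java}
> import java.util.Properties;
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import org.apache.kafka.common.serialization.StringSerializer;
>
> public class TransactionalProducerDemo {
>     public static void main(String[] args) {
>         Properties props = new Properties();
>         props.put("bootstrap.servers", "localhost:9092");       // placeholder
>         props.put("transactional.id", "txn-producer-demo");     // placeholder
>         props.put("key.serializer", StringSerializer.class.getName());
>         props.put("value.serializer", StringSerializer.class.getName());
>
>         try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
>             producer.initTransactions();
>             producer.beginTransaction();
>             for (int i = 0; i < 10; i++) {
>                 // 10 data records occupy offsets 0-9; committing the transaction
>                 // writes a control record at offset 10, so log-end offset becomes 11.
>                 producer.send(new ProducerRecord<>("test",
>                         "{\"value\": \"banana\", \"time\": " + (System.currentTimeMillis() / 1000) + "}"));
>             }
>             producer.commitTransaction();
>         }
>     }
> }
> {code}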
> You can see that the topic log-end offset is now 11 and the last batch in 
> the segment file is a control batch, but the consumer group offset is still 
> at 10. (If those records are deleted by topic retention, you will get an 
> OffsetOutOfRange exception after restarting the connector.)
> {code:java}
> bash-5.1# /opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server kafka1:9092 --group connect-local-file-sink --describe
> GROUP                   TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                                                 HOST         CLIENT-ID
> connect-local-file-sink test   0          10              11              1    connector-consumer-local-file-sink-0-10777adb-72c2-4fd3-8773-4f5a0498903d  /172.21.0.3  connector-consumer-local-file-sink-0
> bash-5.1# /opt/kafka/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /kafka/test-0/00000000000000000000.log --print-data-log
> Dumping /kafka/test-0/00000000000000000000.log
> Starting offset: 0
> baseOffset: 0 lastOffset: 9 count: 10 baseSequence: 0 lastSequence: 9 producerId: 4000 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true isControl: false position: 0 CreateTime: 1634805907230 size: 208 magic: 2 compresscodec: GZIP crc: 2170304005 isvalid: true
> | offset: 0 CreateTime: 1634805907210 keysize: -1 valuesize: 39 sequence: 0 
> headerKeys: [] payload: {"value": "banana", "time": 1634805907}
> | offset: 1 CreateTime: 1634805907230 keysize: -1 valuesize: 39 sequence: 1 
> headerKeys: [] payload: {"value": "banana", "time": 1634805907}
> | offset: 2 CreateTime: 1634805907230 keysize: -1 valuesize: 36 sequence: 2 
> headerKeys: [] payload: {"value": "ice", "time": 1634805907}
> | offset: 3 CreateTime: 1634805907230 keysize: -1 valuesize: 38 sequence: 3 
> headerKeys: [] payload: {"value": "apple", "time": 1634805907}
> | offset: 4 CreateTime: 1634805907230 keysize: -1 valuesize: 37 sequence: 4 
> headerKeys: [] payload: {"value": "home", "time": 1634805907}
> | offset: 5 CreateTime: 1634805907230 keysize: -1 valuesize: 38 sequence: 5 
> headerKeys: [] payload: {"value": "juice", "time": 1634805907}
> | offset: 6 CreateTime: 1634805907230 keysize: -1 valuesize: 36 sequence: 6 
> headerKeys: [] payload: {"value": "cat", "time": 1634805907}
> | offset: 7 CreateTime: 1634805907230 keysize: -1 valuesize: 36 sequence: 7 
> headerKeys: [] payload: {"value": "cat", "time": 1634805907}
> | offset: 8 CreateTime: 1634805907230 keysize: -1 valuesize: 37 sequence: 8 
> headerKeys: [] payload: {"value": "girl", "time": 1634805907}
> | offset: 9 CreateTime: 1634805907230 keysize: -1 valuesize: 36 sequence: 9 
> headerKeys: [] payload: {"value": "cat", "time": 1634805907}
> baseOffset: 10 lastOffset: 10 count: 1 baseSequence: -1 lastSequence: -1 producerId: 4000 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true isControl: true position: 208 CreateTime: 1634805908149 size: 78 magic: 2 compresscodec: NONE crc: 1662003889 isvalid: true
> | offset: 10 CreateTime: 1634805908149 keysize: 4 valuesize: 6 sequence: -1 
> headerKeys: [] endTxnMarker: COMMIT coordinatorEpoch: 0
> {code}
>  
> I think we should use 
> [KafkaConsumer.position()|https://github.com/apache/kafka/blob/2.6.1/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L1742]
> to get the correct offset to commit, instead of the offsets taken from the 
> messages. I will create a PR for that later; a rough sketch of the idea is below.
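> As an illustration only (not the actual WorkerSinkTask patch; the internals 
> are simplified and the class and method names here are made up), the 
> committable offset for each partition would come from the consumer's 
> position rather than from the last delivered record:
> {code:java}
> import java.util.HashMap;
> import java.util.Map;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.clients.consumer.OffsetAndMetadata;
> import org.apache.kafka.common.TopicPartition;
>
> class PositionBasedCommit {
>     // Sketch of the proposed idea: commit the consumer's position, which
>     // already points past transaction control batches, instead of
>     // lastRecord.offset() + 1.
>     static void commitCurrentPositions(KafkaConsumer<?, ?> consumer) {
>         Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
>         for (TopicPartition tp : consumer.assignment()) {
>             offsetsToCommit.put(tp, new OffsetAndMetadata(consumer.position(tp)));
>         }
>         consumer.commitSync(offsetsToCommit);
>     }
> }
> {code}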
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
