[ 
https://issues.apache.org/jira/browse/KAFKA-13404?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17511487#comment-17511487
 ] 

Chris Egerton commented on KAFKA-13404:
---------------------------------------

Thanks [~yujhe.li]. Agreed that an ideal fix for this would prevent control 
records from distorting committed offsets.

One complication is that the connector API allows sink connectors to explicitly 
specify which offsets get committed for each topic partition via the 
{{SinkTask}} class's 
[preCommit|https://kafka.apache.org/31/javadoc/org/apache/kafka/connect/sink/SinkTask.html#preCommit(java.util.Map)]
 method. These offsets are usually derived by querying the 
[kafkaOffset|https://kafka.apache.org/31/javadoc/org/apache/kafka/connect/sink/SinkRecord.html#kafkaOffset()]
 method for each {{SinkRecord}} that the task receives in 
[put|https://kafka.apache.org/31/javadoc/org/apache/kafka/connect/sink/SinkTask.html#put(java.util.Collection)].
This creates an issue because the contract for the {{kafkaOffset}} method is 
that it returns the offset of the consumer record that the {{SinkRecord}} was 
derived from, not the offset that should be committed to Kafka in order to 
signal that the record has been successfully processed by the connector and 
should not be redelivered to it in the future. Examples of this can be found in 
the Confluent HDFS connector 
[here|https://github.com/confluentinc/kafka-connect-hdfs/pull/425] and the 
WePay/Confluent BigQuery connector 
[here|https://github.com/confluentinc/kafka-connect-bigquery/blob/e7c19571ff94f3aa290a02d490b0049af2c51e0c/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/batch/MergeBatches.java#L331-L334].
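
To illustrate the pattern (a minimal sketch, not code from either connector 
above): a task that manages its own offsets typically tracks 
{{kafkaOffset() + 1}} per partition in {{put}} and returns that map from 
{{preCommit}}.
{code:java}
// Minimal sketch of the usual pattern, not code from either connector above.
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

public abstract class OffsetTrackingSinkTask extends SinkTask {

    // Offsets that are safe to commit: one past the last processed record, per partition
    private final Map<TopicPartition, OffsetAndMetadata> committableOffsets = new HashMap<>();

    @Override
    public void put(Collection<SinkRecord> records) {
        for (SinkRecord record : records) {
            // ... write the record to the external system ...
            TopicPartition tp = new TopicPartition(record.topic(), record.kafkaPartition());
            // kafkaOffset() is the offset of the record itself, so the task adds 1 to get
            // the offset that should actually be committed.
            committableOffsets.put(tp, new OffsetAndMetadata(record.kafkaOffset() + 1));
        }
    }

    @Override
    public Map<TopicPartition, OffsetAndMetadata> preCommit(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
        // Tell the framework exactly what to commit, overriding its own bookkeeping
        return new HashMap<>(committableOffsets);
    }
}
{code}
With that pattern, if the last record in a partition is followed only by a 
transaction marker, the committed offset still points at the marker rather than 
the log-end offset, which is exactly the lag described in this issue.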

Unfortunately, we can't really modify the {{kafkaOffset}} method without 
running into some pretty ugly compatibility issues.

One thing that might be worth noting is KAFKA-13431, a work-in-progress effort 
to address a separate issue in sink task offset tracking logic.

Given that issue, here are two possible approaches we could take, which are not 
mutually exclusive:
 # Adjust the behavior of the Kafka Connect framework to fix this issue but 
only for sink connectors that do not manually manage their own offsets 
(probably using a similar strategy to what Kafka Streams does, but I haven't 
taken a long look at that yet)
 # Account for this bug while working on KAFKA-13431. One possible approach 
would be to add an {{acknowledge}} or {{commit}} method to the {{SinkTask}} 
class (a rough sketch follows this list), so that tasks can notify Kafka 
Connect that a record has been processed successfully without having to 
explicitly manage offsets. This would be a pretty major change to the connector 
API, but given that it would allow us to address this and another long-standing 
bug, and could also potentially be used to satisfy the use case for 
[KIP-767|https://cwiki.apache.org/confluence/display/KAFKA/KIP-767%3A+Connect+Latency+Metrics],
 there might be enough here to warrant it.
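
For the second option, here is a very rough sketch of what the hook might look 
like; the name and shape are placeholders, not a designed API:
{code:java}
// Hypothetical sketch only; the name, shape, and placement of this hook are placeholders.
// The idea: a task reports "this record is done" and the framework derives the offsets to
// commit on its own, which would also let it skip over transaction control records.
import org.apache.kafka.connect.sink.SinkRecord;

public interface RecordAcknowledger {

    /**
     * Called by the task once a record has been durably written to the external system,
     * instead of the task computing commit offsets from SinkRecord.kafkaOffset() itself.
     */
    void acknowledge(SinkRecord record);
}
{code}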

> Kafka sink connectors do not commit offset correctly if messages are produced 
> in transaction
> --------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-13404
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13404
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 2.6.1
>            Reporter: Yu-Jhe Li
>            Priority: Major
>         Attachments: Main.scala
>
>
> Kafka sink connectors don't commit offsets up to the latest log-end offset if 
> the messages are produced in a transaction.
> From the code of 
> [WorkerSinkTask.java|https://github.com/apache/kafka/blob/2.6.1/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L477],
>  we found that the sink connector takes the offsets from the messages and 
> commits them to Kafka after the messages are processed successfully. But for 
> messages produced in a transaction, there are additional [control 
> batches|http://kafka.apache.org/documentation/#controlbatch] that are used to 
> indicate whether the transaction was committed or aborted.
>  
> You can reproduce it by running `connect-file-sink` with the following 
> properties:
> {noformat}
> /opt/kafka/bin/connect-standalone.sh /connect-standalone.properties 
> /connect-file-sink.properties{noformat}
> {code:java}
> # connect-standalone.properties
> bootstrap.servers=localhost:9092
> key.converter=org.apache.kafka.connect.storage.StringConverter
> value.converter=org.apache.kafka.connect.storage.StringConverter
> key.converter.schemas.enable=true
> value.converter.schemas.enable=true
> # for testing
> offset.flush.interval.ms=10000
> consumer.isolation.level=read_committed
> consumer.auto.offset.reset=none
> {code}
> {code:java}
> # connect-file-sink.properties
> name=local-file-sink
> connector.class=FileStreamSink
> tasks.max=1
> file=/tmp/test.sink.txt
> topics=test{code}
> Then use the attached producer ([^Main.scala]) to produce 10 messages to 
> the `test` topic in a transaction.
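> A minimal sketch of what such a transactional producer might look like (the 
> attached [^Main.scala] may differ in details):
> {code:java}
> // Hypothetical sketch only; see the attached Main.scala for the actual code used.
> import java.util.Properties;
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import org.apache.kafka.common.serialization.StringSerializer;
>
> public class TransactionalProducerExample {
>     public static void main(String[] args) {
>         Properties props = new Properties();
>         props.put("bootstrap.servers", "localhost:9092");
>         props.put("key.serializer", StringSerializer.class.getName());
>         props.put("value.serializer", StringSerializer.class.getName());
>         props.put("transactional.id", "test-producer"); // enables transactions
>
>         try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
>             producer.initTransactions();
>             producer.beginTransaction();
>             for (int i = 0; i < 10; i++) {
>                 producer.send(new ProducerRecord<>("test",
>                         "{\"value\": \"banana\", \"time\": " + (System.currentTimeMillis() / 1000) + "}"));
>             }
>             // Committing writes a transaction marker (control record) to the partition,
>             // which is what pushes the log-end offset one past the last data record.
>             producer.commitTransaction();
>         }
>     }
> }
> {code}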
> You can see that the topic's log-end offset is now 11 and the last batch in 
> the segment file is a control batch, but the consumer group offset is still 
> at 10. (If the records are deleted by topic retention, you will get an 
> OffsetOutOfRange exception after restarting the connector.)
> {code:java}
> bash-5.1# /opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server 
> kafka1:9092 --group connect-local-file-sink --describe 
> GROUP                   TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                                                 HOST         CLIENT-ID
> connect-local-file-sink test   0          10              11              1    connector-consumer-local-file-sink-0-10777adb-72c2-4fd3-8773-4f5a0498903d  /172.21.0.3  connector-consumer-local-file-sink-0
> bash-5.1# /opt/kafka/bin/kafka-run-class.sh kafka.tools.DumpLogSegments 
> --files /kafka/test-0/00000000000000000000.log --print-data-log
> Dumping /kafka/test-0/00000000000000000000.log
> Starting offset: 0
> baseOffset: 0 lastOffset: 9 count: 10 baseSequence: 0 lastSequence: 9 
> producerId: 4000 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: 
> true isControl: false position: 0 CreateTime: 1634805907230 size: 208 magic: 
> 2 compresscodec: GZIP crc: 2170304005 isvalid: true
> | offset: 0 CreateTime: 1634805907210 keysize: -1 valuesize: 39 sequence: 0 
> headerKeys: [] payload: {"value": "banana", "time": 1634805907}
> | offset: 1 CreateTime: 1634805907230 keysize: -1 valuesize: 39 sequence: 1 
> headerKeys: [] payload: {"value": "banana", "time": 1634805907}
> | offset: 2 CreateTime: 1634805907230 keysize: -1 valuesize: 36 sequence: 2 
> headerKeys: [] payload: {"value": "ice", "time": 1634805907}
> | offset: 3 CreateTime: 1634805907230 keysize: -1 valuesize: 38 sequence: 3 
> headerKeys: [] payload: {"value": "apple", "time": 1634805907}
> | offset: 4 CreateTime: 1634805907230 keysize: -1 valuesize: 37 sequence: 4 
> headerKeys: [] payload: {"value": "home", "time": 1634805907}
> | offset: 5 CreateTime: 1634805907230 keysize: -1 valuesize: 38 sequence: 5 
> headerKeys: [] payload: {"value": "juice", "time": 1634805907}
> | offset: 6 CreateTime: 1634805907230 keysize: -1 valuesize: 36 sequence: 6 
> headerKeys: [] payload: {"value": "cat", "time": 1634805907}
> | offset: 7 CreateTime: 1634805907230 keysize: -1 valuesize: 36 sequence: 7 
> headerKeys: [] payload: {"value": "cat", "time": 1634805907}
> | offset: 8 CreateTime: 1634805907230 keysize: -1 valuesize: 37 sequence: 8 
> headerKeys: [] payload: {"value": "girl", "time": 1634805907}
> | offset: 9 CreateTime: 1634805907230 keysize: -1 valuesize: 36 sequence: 9 
> headerKeys: [] payload: {"value": "cat", "time": 1634805907}
> baseOffset: 10 lastOffset: 10 count: 1 baseSequence: -1 lastSequence: -1 
> producerId: 4000 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: 
> true isControl: true position: 208 CreateTime: 1634805908149 size: 78 magic: 
> 2 compresscodec: NONE crc: 1662003889 isvalid: true
> | offset: 10 CreateTime: 1634805908149 keysize: 4 valuesize: 6 sequence: -1 
> headerKeys: [] endTxnMarker: COMMIT coordinatorEpoch: 0
> {code}
>  
> I think we should use 
> [KafkaConsumer.position()|https://github.com/apache/kafka/blob/2.6.1/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L1742]
>  to get the correct offset to commit instead of the offsets from the 
> messages. I will create a PR for that later.
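> A minimal sketch of the idea (not the actual patch), assuming the framework 
> commits whatever the consumer's position is once the polled records have been 
> processed:
> {code:java}
> // Hypothetical sketch only, not a patch to WorkerSinkTask. After the polled records
> // have been processed, commit consumer.position(), which already points past any
> // transaction markers the consumer has read under read_committed.
> import java.util.HashMap;
> import java.util.Map;
> import org.apache.kafka.clients.consumer.Consumer;
> import org.apache.kafka.clients.consumer.OffsetAndMetadata;
> import org.apache.kafka.common.TopicPartition;
>
> public class PositionBasedCommit {
>     /** Build the offsets to commit from the consumer's position instead of record offsets. */
>     public static Map<TopicPartition, OffsetAndMetadata> offsetsToCommit(Consumer<?, ?> consumer) {
>         Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
>         for (TopicPartition tp : consumer.assignment()) {
>             // position() is the offset of the next record to fetch, so once the consumer
>             // has read past a transaction marker, that marker's offset is included.
>             offsets.put(tp, new OffsetAndMetadata(consumer.position(tp)));
>         }
>         return offsets;
>     }
> }
> {code}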
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
