[ https://issues.apache.org/jira/browse/KAFKA-9941?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
jiangchuan updated KAFKA-9941: ------------------------------ Summary: WorkerSinkTask:When a record triggers a RetriableException and the retry is processed successfully, its offset does not commit. (was: When a record triggers a RetriableException and the retry is processed successfully, its offset does not commit.) > WorkerSinkTask:When a record triggers a RetriableException and the retry is > processed successfully, its offset does not commit. > ------------------------------------------------------------------------------------------------------------------------------- > > Key: KAFKA-9941 > URL: https://issues.apache.org/jira/browse/KAFKA-9941 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect > Affects Versions: 2.2.0 > Reporter: jiangchuan > Priority: Major > Fix For: 2.2.0 > > > When a record triggers a RetriableException and the retry is processed > successfully, its offset does not commit. > The processing process of connect is as follows: > 1: commitOffsets > 2: poll (long timeoutMs) > 3: convertMessages > 4: deliverMessages > Offset storage: > 1: lastCommittedOffsets > 2: currentOffsets > 3: origOffsets > Cause of the problem: I need to retry the record > 1.RetriableException from SinkTask: > 2.Pausing partitions > 3.Not returning fetched records for assigned partition since it is no longer > fetchable > 3.1.ConsumerRecords<byte[], byte[]> msgs = pollConsumer(timeoutMs); > 3.2.convertMessages(msgs);// msgs is empty > 3.3.origOffsets.clear(); // Record of the retry operation, it's offset has > been cleared > 4.The retry operation has completed > 4.1.now commit the offset of the record of the retry operation, but > origOffsets has been cleared. > 4.2.Skipping offset commit, no change since last commit > 4.3.Finished offset commit successfully in 0 ms for sequence number 384: > null > -- This message was sent by Atlassian Jira (v8.3.4#803005)