Anil Dasari created KAFKA-13601:
-----------------------------------

             Summary: Add option to support sync offset commit in Kafka Connect Sink
                 Key: KAFKA-13601
                 URL: https://issues.apache.org/jira/browse/KAFKA-13601
             Project: Kafka
          Issue Type: New Feature
          Components: KafkaConnect
            Reporter: Anil Dasari


Exactly-once delivery in the S3 connector with scheduled rotation and a field 
partitioner can be achieved by committing consumer offsets synchronously.

Currently, WorkerSinkTask commits consumer offsets asynchronously unless the 
task is closing:
private void doCommit(Map<TopicPartition, OffsetAndMetadata> offsets, boolean closing, final int seqno) {
    log.info("{} Committing offsets", this);
    if (closing) {
        doCommitSync(offsets, seqno);
    } else {
        OffsetCommitCallback cb = new OffsetCommitCallback() {
            @Override
            public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception error) {
                lastCommittedOffsets = offsets;
                onCommitCompleted(error, seqno);
            }
        };
        consumer.commitAsync(offsets, cb);
    }
}
 

Add a sink config option to choose synchronous offset commits.
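
A minimal sketch of how such an option might gate the commit path, written 
without Kafka dependencies so it runs on its own. Everything here is an 
assumption for illustration: the config key "offset.commit.sync" is 
hypothetical and not an existing Kafka Connect setting, OffsetCommitter is a 
stand-in for the consumer's commitSync/commitAsync calls, and offsets are 
simplified to a Map<String, Long>.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SyncCommitSketch {

    /** Stand-in for the two consumer commit calls used by the sink task. */
    interface OffsetCommitter {
        void commitSync(Map<String, Long> offsets);
        void commitAsync(Map<String, Long> offsets);
    }

    /** Hypothetical config key; NOT an existing Kafka Connect setting. */
    static final String SYNC_COMMIT_CONFIG = "offset.commit.sync";

    private final OffsetCommitter consumer;
    private final boolean syncCommit;

    SyncCommitSketch(OffsetCommitter consumer, Map<String, String> props) {
        this.consumer = consumer;
        // Default to false so the existing asynchronous behavior is preserved.
        this.syncCommit = Boolean.parseBoolean(
                props.getOrDefault(SYNC_COMMIT_CONFIG, "false"));
    }

    /** Commits synchronously when closing or when the option is enabled. */
    void doCommit(Map<String, Long> offsets, boolean closing) {
        if (closing || syncCommit) {
            consumer.commitSync(offsets);
        } else {
            consumer.commitAsync(offsets);
        }
    }

    /** Test double that records which commit path was taken. */
    static class RecordingCommitter implements OffsetCommitter {
        final List<String> calls = new ArrayList<>();

        @Override
        public void commitSync(Map<String, Long> offsets) {
            calls.add("sync");
        }

        @Override
        public void commitAsync(Map<String, Long> offsets) {
            calls.add("async");
        }
    }

    public static void main(String[] args) {
        Map<String, Long> offsets = new HashMap<>();
        offsets.put("topic-0", 42L);

        Map<String, String> props = new HashMap<>();
        props.put(SYNC_COMMIT_CONFIG, "true");

        RecordingCommitter recorder = new RecordingCommitter();
        new SyncCommitSketch(recorder, props).doCommit(offsets, false);
        System.out.println("commit path taken: " + recorder.calls);
    }
}
```

Defaulting the flag to false keeps the current behavior for existing 
connectors; only sinks that opt in would pay the latency cost of blocking on 
each commit.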



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
