[ https://issues.apache.org/jira/browse/KAFKA-13601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Anil Dasari updated KAFKA-13601: -------------------------------- Description: Exactly once in s3 connector with scheduled rotation and field partitioner can be achieved with consumer offset sync' commit after message batch flushed to sink successfully Currently, WorkerSinkTask committing the consumer offsets asynchronously. [https://github.com/apache/kafka/blob/371f14c3c12d2e341ac96bd52393b43a10acfa84/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L203] [https://github.com/apache/kafka/blob/371f14c3c12d2e341ac96bd52393b43a10acfa84/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L196] [https://github.com/apache/kafka/blob/371f14c3c12d2e341ac96bd52393b43a10acfa84/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L354] Add config to allow user to select synchronous commit over WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG was: Exactly once in s3 connector with scheduled rotation and field partitioner can be achieved with consumer offset sync' commit. Currently, WorkerSinkTask committing the consumer offsets asynchronously. private void doCommit(Map<TopicPartition, OffsetAndMetadata> offsets, boolean closing, final int seqno) \{ log.info("{} Committing offsets", this); if (closing) \{ doCommitSync(offsets, seqno); } else \{ OffsetCommitCallback cb = new OffsetCommitCallback() { @Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception error) { lastCommittedOffsets = offsets; onCommitCompleted(error, seqno); } }; consumer.commitAsync(offsets, cb); } } Add config to sink to chose sync' offset commit > Add option to support sync offset commit in Kafka Connect Sink > -------------------------------------------------------------- > > Key: KAFKA-13601 > URL: https://issues.apache.org/jira/browse/KAFKA-13601 > Project: Kafka > Issue Type: New Feature > Components: KafkaConnect > Reporter: Anil Dasari > Priority: Major > > Exactly once in s3 connector with scheduled rotation and field partitioner > can be achieved with consumer offset sync' commit after message batch flushed > to sink successfully > Currently, WorkerSinkTask committing the consumer offsets asynchronously. > [https://github.com/apache/kafka/blob/371f14c3c12d2e341ac96bd52393b43a10acfa84/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L203] > [https://github.com/apache/kafka/blob/371f14c3c12d2e341ac96bd52393b43a10acfa84/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L196] > [https://github.com/apache/kafka/blob/371f14c3c12d2e341ac96bd52393b43a10acfa84/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L354] > > Add config to allow user to select synchronous commit over > WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG -- This message was sent by Atlassian Jira (v8.20.1#820001)