[ https://issues.apache.org/jira/browse/KAFKA-13601?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17509327#comment-17509327 ]
Chris Egerton edited comment on KAFKA-13601 at 3/19/22, 7:31 PM:
-----------------------------------------------------------------

Sorry, I don't think this is correct:
{quote}new proposed option provides exactly once guarantees in case of field partitioner and size or time rotation based flush strategy considering no duplicates in kafka topic.
{quote}
What would be necessary for exactly-once in this situation is either deterministic, idempotent writes to the external system (which, as we've noted, can already be achieved in some cases with the Confluent S3 sink connector), or a combination of resiliency against zombie tasks writing to the external system _and_ atomically committing offsets with the data for each write.

Adding synchronous offset commits for sink tasks wouldn't accomplish either of these. There's no way to both write an S3 object and commit a Kafka offset atomically. Synchronous offset commits don't provide that guarantee, as there's still the possibility that the worker dies suddenly in between the task writing to S3 and the Kafka Connect framework committing offsets.

Regarding this:
{quote}Offset commit interval on cluster to 1 is too aggressive and have performance impacts
{quote}
I also don't think this is correct. For sink tasks, if the offsets that will be committed are exactly the same as the last ones that were committed, then no offset commit actually takes place. If your connector manually tracks offsets (as I believe the Confluent S3 sink connector does), then an actual commit to Kafka only takes place once the connector informs the framework that there are new offsets to commit; otherwise the scheduled commit is effectively a no-op. I guess there might be implications if the connector is configured to use an errant record reporter, though?
Finally, I believe the Confluent S3 sink connector _can_ provide exactly-once delivery (via idempotent writes) with a time-based rotation policy, as long as it's configured to use record timestamps and not wallclock time, as that would allow rotations to be deterministic based on topic content.
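As a sketch of that last point, a rotation policy keyed to record timestamps rather than wallclock time might look like the following Confluent S3 sink connector configuration. The topic, bucket, region, and interval values are placeholders for illustration only:

```properties
connector.class=io.confluent.connect.s3.S3SinkConnector
# Placeholder topic, bucket, and region
topics=orders
s3.bucket.name=my-example-bucket
s3.region=us-east-1
storage.class=io.confluent.connect.s3.storage.S3Storage
format.class=io.confluent.connect.s3.format.json.JsonFormat
partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
path.format='year'=YYYY/'month'=MM/'day'=dd
partition.duration.ms=3600000
locale=en-US
timezone=UTC
# Rotate based on record timestamps, not wallclock time, so that file
# boundaries are a deterministic function of topic content and writes
# after a restart land in the same files (idempotent overwrite).
rotate.interval.ms=600000
timestamp.extractor=Record
```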
> Add option to support sync offset commit in Kafka Connect Sink
> --------------------------------------------------------------
>
>                 Key: KAFKA-13601
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13601
>             Project: Kafka
>          Issue Type: New Feature
>          Components: KafkaConnect
>            Reporter: Anil Dasari
>            Priority: Major
>
> Exactly-once in the S3 connector with scheduled rotation and a field partitioner can be achieved by committing the consumer offsets synchronously after a message batch is flushed to the sink successfully.
>
> Currently, WorkerSinkTask commits the consumer offsets asynchronously, at regular intervals of WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG:
> [https://github.com/apache/kafka/blob/371f14c3c12d2e341ac96bd52393b43a10acfa84/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L203]
> [https://github.com/apache/kafka/blob/371f14c3c12d2e341ac96bd52393b43a10acfa84/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L196]
> [https://github.com/apache/kafka/blob/371f14c3c12d2e341ac96bd52393b43a10acfa84/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L354]
>
> Add a config to allow the user to select synchronous commit over WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG.

-- This message was sent by Atlassian Jira (v8.20.1#820001)