[ https://issues.apache.org/jira/browse/KAFKA-13601?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17509327#comment-17509327 ]

Chris Egerton edited comment on KAFKA-13601 at 3/19/22, 7:31 PM:
-----------------------------------------------------------------

Sorry, I don't think this is correct:
{quote}new proposed option provides exactly once guarantees in case of field 
partitioner and size or time rotation based flush strategy considering no 
duplicates in kafka topic.
{quote}
What would be necessary for exactly-once in this situation is either 
deterministic, idempotent writes to the external system (which, as we've noted, 
can already be achieved in some cases with the Confluent S3 sink connector), or 
a combination of resiliency against zombie tasks writing to the external system 
_and_ atomically committing offsets with the data for each write.
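
To make the first option concrete, here's a minimal sketch (all names are 
hypothetical, not the connector's actual code) of how deterministic object 
keys make retried writes idempotent: the key is a pure function of topic, 
partition, and starting offset, so a restarted or zombie task re-writing the 
same batch overwrites the same object instead of creating a duplicate.
{code:java}
import org.apache.kafka.common.TopicPartition;

public class DeterministicKeys {
    // The key depends only on topic, partition, and the first offset in the
    // batch. S3 PUTs to an existing key overwrite it, so replaying the same
    // batch is idempotent as long as batch boundaries are deterministic.
    static String deriveKey(TopicPartition tp, long startOffset) {
        return String.format("topics/%s/partition=%d/%s+%d+%019d.json",
                tp.topic(), tp.partition(), tp.topic(), tp.partition(),
                startOffset);
    }

    public static void main(String[] args) {
        TopicPartition tp = new TopicPartition("orders", 3);
        // Every attempt to write the batch starting at offset 1200 targets
        // the same key:
        System.out.println(deriveKey(tp, 1200L));
        // topics/orders/partition=3/orders+3+0000000000000001200.json
    }
}
{code}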

Adding synchronous offset commits for sink tasks wouldn't accomplish either of 
these. There's no way to both write an S3 object and commit a Kafka offset 
atomically. Synchronous offset commits don't provide that guarantee, as there's 
still the possibility that the worker dies suddenly in between the task writing 
to S3 and the Kafka Connect framework committing offsets.
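
To make that failure window concrete, here's an illustrative sketch (the types 
are hypothetical stand-ins, not WorkerSinkTask's actual code) of the two steps 
that can't be made atomic:
{code:java}
import java.util.Map;

public class CommitWindow {
    interface ObjectStore { void put(String key, byte[] payload); }
    interface OffsetCommitter { void commitSync(Map<Integer, Long> offsets); }

    static void flushAndCommit(ObjectStore s3, OffsetCommitter committer,
                               String key, byte[] payload,
                               Map<Integer, Long> offsets) {
        s3.put(key, payload);          // step 1: object is now visible in S3
        // If the worker dies here, the object exists but the offsets were
        // never committed; on restart the same records are re-delivered and
        // re-written. A synchronous commit narrows this window but can't
        // close it.
        committer.commitSync(offsets); // step 2: offsets are durable in Kafka
    }
}
{code}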

Regarding this:
{quote}Offset commit interval on cluster to 1 is too aggressive and have 
performance impacts
{quote}
I also don't think this is correct. For sink tasks, if the offsets that would 
be committed are exactly the same as the last ones that were committed, then no 
offset commit actually takes place. If your connector manually tracks offsets 
(as I believe the Confluent S3 sink connector does), then an actual commit to 
Kafka will only take place once the connector informs the framework that there 
are new offsets to commit; otherwise, the periodic commit is effectively a 
no-op. I guess there might be implications if the connector is configured to 
use an errant record reporter, though?
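
For reference, the manual offset tracking I'm describing goes through the 
SinkTask#preCommit hook. The sketch below is simplified (a real connector 
would only update the flushed offsets after a write to S3 actually succeeds, 
and the buffering/flush logic is elided), but it shows the pattern: the 
framework's periodic commit is a no-op until the task reports new offsets.
{code:java}
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

public class ManualOffsetSinkTask extends SinkTask {
    // Offsets of records that have actually been flushed to S3.
    private final Map<TopicPartition, OffsetAndMetadata> flushed =
            new HashMap<>();

    @Override
    public Map<TopicPartition, OffsetAndMetadata> preCommit(
            Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
        // Return only the offsets of flushed records. If these haven't
        // changed since the last commit, the framework skips the commit
        // entirely, so a small offset.flush.interval.ms doesn't translate
        // into extra commits to Kafka.
        return new HashMap<>(flushed);
    }

    @Override
    public void put(Collection<SinkRecord> records) {
        // Simplified: treat every record as flushed immediately. A real
        // connector would buffer and update 'flushed' after the S3 write.
        for (SinkRecord r : records) {
            flushed.put(new TopicPartition(r.topic(), r.kafkaPartition()),
                    new OffsetAndMetadata(r.kafkaOffset() + 1));
        }
    }

    @Override public void start(Map<String, String> props) { }
    @Override public void stop() { }
    @Override public String version() { return "sketch"; }
}
{code}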

Finally, I believe the Confluent S3 sink connector _can_ provide exactly-once 
delivery (via idempotent writes) with a time-based rotation policy, as long as 
it's configured to use record timestamps and not wallclock time, as that would 
allow rotations to be deterministic based on topic content.
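
As an illustration of why record timestamps make rotation deterministic 
(a hypothetical sketch, not the connector's actual rotation code): the 
rotation decision below is a pure function of the record's timestamp, so every 
replay of the same topic content produces the same file boundaries, whereas a 
wallclock check would depend on when the task happens to run.
{code:java}
public class RecordTimeRotation {
    private final long rotateIntervalMs;
    private long currentWindowStart = -1;

    RecordTimeRotation(long rotateIntervalMs) {
        this.rotateIntervalMs = rotateIntervalMs;
    }

    // Returns true when the record's timestamp crosses into a new interval
    // (also on the first record). Depends only on topic content, never on
    // System.currentTimeMillis(), so replays rotate at identical points.
    boolean shouldRotate(long recordTimestampMs) {
        long window = (recordTimestampMs / rotateIntervalMs) * rotateIntervalMs;
        if (window != currentWindowStart) {
            currentWindowStart = window;
            return true;
        }
        return false;
    }
}
{code}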



> Add option to support sync offset commit in Kafka Connect Sink
> --------------------------------------------------------------
>
>                 Key: KAFKA-13601
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13601
>             Project: Kafka
>          Issue Type: New Feature
>          Components: KafkaConnect
>            Reporter: Anil Dasari
>            Priority: Major
>
> Exactly-once delivery in the S3 connector with a scheduled rotation and field 
> partitioner can be achieved with a synchronous consumer offset commit after 
> each message batch is flushed to the sink successfully.
> Currently, WorkerSinkTask commits the consumer offsets asynchronously and at 
> regular intervals of WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG:
> [https://github.com/apache/kafka/blob/371f14c3c12d2e341ac96bd52393b43a10acfa84/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L203]
> [https://github.com/apache/kafka/blob/371f14c3c12d2e341ac96bd52393b43a10acfa84/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L196]
> [https://github.com/apache/kafka/blob/371f14c3c12d2e341ac96bd52393b43a10acfa84/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L354]
>  
> Add a config to allow the user to select synchronous commits over the 
> asynchronous commits driven by WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG.


