C0urante opened a new pull request #8910:
URL: https://github.com/apache/kafka/pull/8910


   [Jira](https://issues.apache.org/jira/browse/KAFKA-10188)
   
   The general lifecycle for a sink task is:
   
   1. Instantiate the `SinkTask` object
   2. Invoke `SinkTask::initialize`
   3. Invoke `SinkTask::start`
   4. While the task is still running:
      - Poll Kafka for records
      - Give those records to the task via `SinkTask::put`
      - Periodically commit offsets, which involves calling `SinkTask::preCommit` and committing the resulting `TopicPartition`-to-offset map to Kafka
   5. Commit offsets a penultimate time (including the call to `SinkTask::preCommit`)
   6. Invoke `SinkTask::stop`
   7. Close the consumer for the task
   8. Commit offsets a final time (also including the call to `SinkTask::preCommit`)
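   The ordering above can be modeled with a small, self-contained Java sketch. Everything in it is hypothetical: `RecordingTask` stands in for `SinkTask`, `LifecycleDriver.runLifecycle` stands in for the runtime, and periodic commits are simplified to one commit per batch. It only records call order, so the position of the last `preCommit` relative to `stop` is easy to see.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for SinkTask: it only records which lifecycle method ran.
class RecordingTask {
    final List<String> calls = new ArrayList<>();

    void initialize() { calls.add("initialize"); }
    void start() { calls.add("start"); }
    void put(List<String> records) { calls.add("put"); }
    void preCommit() { calls.add("preCommit"); }
    void stop() { calls.add("stop"); }
}

// Hypothetical driver that follows steps 1-8 in order.
class LifecycleDriver {
    static List<String> runLifecycle(List<List<String>> batches) {
        RecordingTask task = new RecordingTask();   // 1. instantiate
        task.initialize();                          // 2. initialize
        task.start();                               // 3. start
        for (List<String> batch : batches) {        // 4. poll/put loop
            task.put(batch);
            commitOffsets(task);                    //    periodic commit (simplified: once per batch)
        }
        commitOffsets(task);                        // 5. penultimate commit
        task.stop();                                // 6. stop
        task.calls.add("consumer.close");           // 7. close the consumer
        commitOffsets(task);                        // 8. final commit (in the runtime, triggered indirectly by the close)
        return task.calls;
    }

    // Each offset commit first asks the task for offsets via preCommit.
    static void commitOffsets(RecordingTask task) {
        task.preCommit();
    }
}
```

   With a single batch, `runLifecycle` returns `[initialize, start, put, preCommit, preCommit, stop, consumer.close, preCommit]`; the trailing `preCommit` is the call that lands after `stop`.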
   
   This final offset commit happens indirectly: closing the consumer for a sink 
task causes the rebalance listener for that consumer to be triggered, and the 
rebalance listener the framework uses for its consumers performs an offset 
commit for the task when partitions are revoked.
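   A minimal sketch of how closing the consumer can indirectly trigger a commit. In the real consumer client the hook is `ConsumerRebalanceListener.onPartitionsRevoked`; here `RevocationCallback`, `ClosingConsumer`, and `CommittingListener` are hypothetical simplifications that use plain `String` partition names instead of `TopicPartition`.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical, simplified counterpart of a rebalance listener's revocation callback.
interface RevocationCallback {
    void onPartitionsRevoked(Collection<String> partitions);
}

// Hypothetical consumer: close() revokes whatever it still owns, which fires the callback.
class ClosingConsumer {
    private final List<String> assigned = new ArrayList<>();
    private final RevocationCallback callback;

    ClosingConsumer(Collection<String> partitions, RevocationCallback callback) {
        this.assigned.addAll(partitions);
        this.callback = callback;
    }

    void close() {
        if (!assigned.isEmpty()) {
            callback.onPartitionsRevoked(new ArrayList<>(assigned));
            assigned.clear();
        }
    }
}

// Listener that, like the framework's, commits offsets for the task on revocation.
class CommittingListener implements RevocationCallback {
    final List<String> log = new ArrayList<>();

    @Override
    public void onPartitionsRevoked(Collection<String> partitions) {
        log.add("preCommit" + partitions);   // the commit path asks the task for offsets first
        log.add("commit" + partitions);      // then commits them to Kafka
    }
}
```

   Closing a `ClosingConsumer` that owns `topic-0` leaves the listener's log as `[preCommit[topic-0], commit[topic-0]]`, even though nothing but `close()` was called.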
   
   This is a bit of a problem because the framework calls `SinkTask::stop` 
before closing the consumer for the task. It's possible and even likely that 
tasks will have de-allocated resources necessary for their `preCommit` method 
and will fail unexpectedly at this point.
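   A hypothetical task that releases its client in `stop` illustrates the failure mode. `ResourceHoldingTask` and its `HashMap`-based "client" are illustrative stand-ins, not Connect APIs.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sink task that needs a live client to report which offsets are flushed.
class ResourceHoldingTask {
    // Stand-in for an external-system client; null once stop() has released it.
    private Map<String, Long> client;

    void start() {
        client = new HashMap<>();
    }

    void put(String partition, long offset) {
        client.put(partition, offset + 1);   // pretend the write is flushed immediately
    }

    Map<String, Long> preCommit() {
        if (client == null) {
            throw new IllegalStateException("preCommit called after stop(): client already closed");
        }
        return new HashMap<>(client);
    }

    void stop() {
        client = null;   // release resources, as tasks commonly do in stop()
    }
}
```

   Calling `preCommit` after `put("topic-0", 42L)` returns `{topic-0=43}`, but the same call after `stop()` throws `IllegalStateException`, which is what the post-stop commit runs into.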
   
   Since the framework already [ensures that offsets are committed](https://github.com/apache/kafka/blob/199f375b546c201289d2b15084e0a95598093fe0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L194-L196) after the last call to `SinkTask::put`, it should be safe to remove this extra offset commit. Some data may still be dropped if a task performs fully asynchronous writes to its sink system and has written data between the pre-stop call to `SinkTask::preCommit` and the post-stop one. However, the delivery guarantees provided by the framework are unaffected, and with this change the framework will adhere to the publicly stated API for sink tasks.
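   One way to picture the change, assuming a hypothetical `stopping` flag consulted by the revocation callback; the actual patch to `WorkerSinkTask` may be structured differently. The final commit (and its `preCommit`) happens before `stop`, the commit that closing the consumer used to trigger is skipped, and ordinary rebalances still commit.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the shutdown path after the change.
class ShutdownSequence {
    final List<String> calls = new ArrayList<>();
    private boolean stopping = false;   // hypothetical flag: set once the task is shutting down

    // Offset commit: ask the task for offsets (preCommit), then commit them to Kafka.
    void commitOffsets() {
        calls.add("preCommit");
        calls.add("commitToKafka");
    }

    // Revocation callback fired by a rebalance or by closing the consumer.
    void onPartitionsRevoked() {
        if (!stopping) {
            commitOffsets();   // normal rebalance: still commit before giving up partitions
        }
        // When stopping, offsets were already committed after the last put(), so skip.
    }

    void shutdown() {
        commitOffsets();              // final commit while the task is still running
        stopping = true;
        calls.add("stop");            // SinkTask::stop
        calls.add("consumer.close");  // closing the consumer revokes partitions...
        onPartitionsRevoked();        // ...which no longer calls preCommit
    }
}
```

   `new ShutdownSequence().shutdown()` leaves `calls` as `[preCommit, commitToKafka, stop, consumer.close]`, so no `preCommit` follows `stop`; that is the same property the unit test checks for `WorkerSinkTask::close`.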
   
   A unit test is added that covers the internal `WorkerSinkTask::close` method 
and ensures that `SinkTask::preCommit` is not called during that method.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   

