C0urante edited a comment on pull request #10112: URL: https://github.com/apache/kafka/pull/10112#issuecomment-782174511
Ah, gotcha! Yeah, `SourceTask::stop` isn't really a key part of the functional changes here or of the tests added. The idea is just to prevent high-throughput source tasks whose producers are unable to keep up with them from entering a death spiral where they completely stop being able to commit offsets.

That spiral can happen because, right now, when an offset commit attempt fails because the current batch of source records wasn't flushed to Kafka in time, all of the backlogged records that were read from `SourceTask::poll` during that time get added to the batch, which just compounds the problem. For example:

1. The source task generates 10000 records and the worker starts sending those to Kafka.
1. An offset commit is triggered on a separate thread, and the worker waits for up to `offset.flush.timeout.ms` milliseconds for all of those records to be ack'd by the broker. In the meantime, on the main thread, the worker continues polling the source task for data.
1. The worker times out while waiting for the original batch of 10000 records to be flushed. Let's say that 5000 were written successfully but 5000 remain unacknowledged. Additionally, during this time, the worker polled another 1000 records from the task.

At this point, the current behavior is:

1. Abort the offset commit, and start a new batch consisting of the 5000 unacknowledged records from the previous batch and the 1000 records polled from the task during the failed offset commit attempt.
1. Continue adding records to that batch until the next offset commit attempt is triggered.

If the task is generating a steady throughput of 10000 records per offset commit attempt, and the worker's producer is only able to write 5000 of those before the offset commit attempt times out, the worker will never be able to successfully commit offsets for the task. This is true even though plenty of records have been sent to and ack'd by the broker.

The proposed behavior in the PR is:

1. Abort the offset commit, and keep the old batch of the 5000 unacknowledged records. Add the 1000 records polled during the failed offset commit attempt to a backlog.
1. Continue adding newly-polled records from the task to that backlog.
1. On the next offset commit attempt, only wait for the records in the active batch (i.e., the 5000 unacknowledged records) to be flushed, and only write offsets for that batch.
1. If successful, use the backlog of records as the new batch of records. Otherwise, keep the same batch of records and continue adding newly-polled records to the backlog.
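To make the difference between the two behaviors above concrete, here is a minimal, hypothetical simulation (Java 16+, since it uses a record). It is not Kafka Connect code. `OffsetCommitSimulation`, `simulate`, and `Result` are illustrative names.

The simulation rests on these assumptions:

* The broker acks records in the order they were sent. Real producers only guarantee ordering per partition.
* 9000 records are polled and 5000 are ack'd between commit attempts. These are assumed numbers. They are chosen so that the task polls 10000 records per attempt and the producer acks 10000 per attempt overall. Within each flush timeout window, though, the producer only manages 5000, as in the example.

```java
/**
 * Hypothetical, simplified model of the two offset-commit policies; not Kafka Connect code.
 * Assumes FIFO acks: records [0, acked) have been ack'd by the broker.
 */
final class OffsetCommitSimulation {

    /** How many commit attempts succeeded, and the offset committed by the last success. */
    record Result(int successfulCommits, long committedOffset) {}

    /**
     * @param keepBatchOnFailure false = current behavior (each attempt's batch includes every
     *                           record polled so far); true = proposed behavior (a failed batch
     *                           is kept as-is and newly polled records go to a backlog)
     * @param attempts           number of offset commit attempts to simulate
     */
    static Result simulate(boolean keepBatchOnFailure, int attempts) {
        final long pollDuringFlush = 1_000; // polled while waiting on offset.flush.timeout.ms
        final long ackDuringFlush = 5_000;  // ack'd before the flush timeout expires
        final long pollBetween = 9_000;     // assumed: polled between commit attempts
        final long ackBetween = 5_000;      // assumed: ack'd between commit attempts

        long sent = 10_000;        // records polled from the task and handed to the producer
        long acked = 0;            // records ack'd by the broker
        long committed = 0;        // offsets are committed for records [0, committed)
        long keptBatchEnd = -1;    // end of a batch kept after a failed commit, -1 if none
        int commits = 0;

        for (int i = 0; i < attempts; i++) {
            // This attempt needs records [committed, batchEnd) to be ack'd.
            long batchEnd = (keepBatchOnFailure && keptBatchEnd >= 0) ? keptBatchEnd : sent;

            // While the worker waits for the flush, the task keeps being polled.
            sent += pollDuringFlush;
            acked = Math.min(sent, acked + ackDuringFlush);

            if (acked >= batchEnd) {
                committed = batchEnd; // every record in the batch was ack'd in time
                commits++;
                keptBatchEnd = -1;    // proposed: the backlog becomes the new batch
            } else {
                keptBatchEnd = batchEnd; // only consulted by the proposed policy
            }

            // Between commit attempts, polling and producing continue.
            sent += pollBetween;
            acked = Math.min(sent, acked + ackBetween);
        }
        return new Result(commits, committed);
    }

    public static void main(String[] args) {
        System.out.println("current:  " + simulate(false, 10));
        System.out.println("proposed: " + simulate(true, 10));
    }
}
```

With these assumed numbers and 10 attempts, the two policies diverge completely:

* **Current policy:** it should report `Result[successfulCommits=0, committedOffset=0]`. Every batch is re-extended to cover everything polled so far, so it is always 5000 records short when the timeout expires.
* **Proposed policy:** it should report `Result[successfulCommits=5, committedOffset=90000]`. A failed batch stays fixed and has been fully ack'd by the next attempt, so offsets get committed on every second attempt.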