C0urante edited a comment on pull request #10112:
URL: https://github.com/apache/kafka/pull/10112#issuecomment-782174511


   Ah, gotcha! Yeah, `SourceTask::stop` isn't really a key part of the 
functional changes here or of the tests added. The idea is just to prevent 
high-throughput source tasks whose producers are unable to keep up with them 
from entering a death spiral where they become completely unable to commit 
offsets.
   
   Right now, when an offset commit attempt fails because the current batch of 
source records wasn't flushed to Kafka in time, all of the backlogged records 
that were read from `SourceTask::poll` in the meantime get added to the next 
batch, which just compounds the problem.
   
   For example:
   1. The source task generates 10000 records and the worker starts sending 
those to Kafka.
   1. An offset commit is triggered on a separate thread and the worker waits 
for up to `offset.flush.timeout.ms` milliseconds for all of those records to be 
ack'd by the broker. In the meantime, on the main thread, the worker continues 
polling the source task for data.
   1. The worker times out while waiting for the original batch of 10000 
records to be flushed; let's say that 5000 were written successfully but 5000 
remain unacknowledged. Additionally, during this time, the worker managed to 
poll an additional 1000 records from the task.
   
   At this point, the current behavior is:
   1. Abort the offset commit, and start a new batch consisting of the 5000 
unacknowledged records from the previous batch and the 1000 records polled from 
the task during the failed offset commit attempt.
   1. Continue adding records to that batch until the next offset commit 
attempt is triggered.
   
   If the task is generating a steady throughput of 10000 records per offset 
commit attempt, and the worker's producer is only able to write 5000 of those 
before the offset commit attempt times out, the worker will never be able to 
successfully commit offsets for the task, even though there are plenty of 
records that have been sent to and ack'd by the broker.
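   A toy simulation makes the death spiral concrete. This is a hypothetical 
model, not the actual worker code: it just uses the numbers from the example 
above (10000 records polled per commit interval, at most 5000 acked within 
each flush timeout) and the current rule that every polled record joins the 
active batch.

   ```java
   // Hypothetical model of the *current* behavior described above.
   public class CurrentCommitSim {
       static final int POLLED_PER_INTERVAL = 10_000;
       static final int ACKED_PER_TIMEOUT = 5_000;

       // Number of successful offset commits after the given number of attempts.
       static int commitsAfter(int attempts) {
           int batch = POLLED_PER_INTERVAL; // records sent before the first attempt
           int commits = 0;
           for (int i = 0; i < attempts; i++) {
               if (batch <= ACKED_PER_TIMEOUT) {
                   commits++; // whole batch flushed in time: offsets committed
                   batch = 0;
               } else {
                   batch -= ACKED_PER_TIMEOUT; // timed out: unacked records stay in the batch
               }
               batch += POLLED_PER_INTERVAL; // everything polled joins the same batch
           }
           return commits;
       }

       public static void main(String[] args) {
           // The batch grows by 5000 per attempt, so no commit ever succeeds.
           System.out.println(commitsAfter(10)); // 0
       }
   }
   ```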
   
   The proposed behavior in the PR is:
   1. Abort the offset commit, and keep the old batch of 5000 unacknowledged 
records. Add the 1000 records polled during the failed offset commit attempt to 
a backlog.
   1. Continue adding newly-polled records from the task to that backlog.
   1. On the next offset commit attempt, only wait to flush out the records 
from the active batch (i.e., the 5000 unacknowledged records), and only write 
offsets for that batch.
   1. If successful, use the backlog of records as the new batch of records. 
Otherwise, keep the same batch of records and continue adding newly-polled 
records to the backlog.
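   The same toy model (hypothetical names and numbers, not the PR's actual 
code) with the proposed batch-plus-backlog rule shows commits succeeding again:

   ```java
   // Hypothetical model of the *proposed* behavior: newly-polled records go to
   // a backlog, and a commit only waits on the frozen active batch.
   public class ProposedCommitSim {
       static final int POLLED_PER_INTERVAL = 10_000;
       static final int ACKED_PER_TIMEOUT = 5_000;

       static int commitsAfter(int attempts) {
           int batch = POLLED_PER_INTERVAL; // first batch, as in the example above
           int backlog = 0;                 // records polled while a batch is frozen
           int commits = 0;
           for (int i = 0; i < attempts; i++) {
               if (batch <= ACKED_PER_TIMEOUT) {
                   commits++;       // whole active batch flushed: offsets committed
                   batch = backlog; // the backlog becomes the new batch
                   backlog = 0;
               } else {
                   batch -= ACKED_PER_TIMEOUT; // keep the unacked remainder as the batch
               }
               backlog += POLLED_PER_INTERVAL; // new records accumulate in the backlog
           }
           return commits;
       }

       public static void main(String[] args) {
           // Commits now succeed periodically instead of never.
           System.out.println(commitsAfter(10)); // 3
       }
   }
   ```

   Note that if the task's throughput permanently exceeds what the producer can 
write, the backlog still grows without bound; the change doesn't fix the 
throughput mismatch, it just ensures offsets for already-acked records can 
still be committed.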
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org