rhauch commented on pull request #10112:
URL: https://github.com/apache/kafka/pull/10112#issuecomment-910540773


   My previous suggestion simply dequeues all completed records until an 
unacked record is found. This is really straightforward, but we could try to do 
better.
   
   We could dequeue all acked records except those whose source partition has 
an earlier record that has not been acked. For example, let's say we have 
enqueued 10 records when `commitOffsets()` is called:
   ```
   1. SubmittedRecord{ SourceRecord{...partition=P1,offset=O1...}, acked=true}
   2. SubmittedRecord{ SourceRecord{...partition=P1,offset=O2...}, acked=true}
   3. SubmittedRecord{ SourceRecord{...partition=P1,offset=O3...}, acked=false}
   4. SubmittedRecord{ SourceRecord{...partition=P2,offset=O1...}, acked=true}
   5. SubmittedRecord{ SourceRecord{...partition=P2,offset=O2...}, acked=true}
   6. SubmittedRecord{ SourceRecord{...partition=P1,offset=O4...}, acked=false}
   7. SubmittedRecord{ SourceRecord{...partition=P2,offset=O3...}, acked=true}
   8. SubmittedRecord{ SourceRecord{...partition=P2,offset=O4...}, acked=true}
   9. SubmittedRecord{ SourceRecord{...partition=P2,offset=O5...}, acked=true}
   10. SubmittedRecord{ SourceRecord{...partition=P2,offset=O6...}, acked=false}
   ```
   This might happen if records 4, 5, and 7-10 were written to different topic 
partitions than records 1, 2, 3, and 6, and the producer is stuck on the latter 
partitions.
   
   With the simplistic logic, we'd only dequeue records 1 and 2, add the 
offsets for these two records to the offset writer, and flush offset 
`partition=P1,offset=O2`. But the connector has made significantly more 
progress on source partition `partition=P2`, and it would be good to *also* 
flush offset `partition=P2,offset=O5`, the latest acked offset for that 
source partition.
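
   As a rough sketch of the simplistic logic (not the code in this PR), the 
following assumes a hypothetical `Submitted` class standing in for 
`SubmittedRecord`, with plain strings for source partitions and integers 1-6 
standing in for offsets O1-O6:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;

final class SimpleDequeueSketch {
    /** Hypothetical stand-in for SubmittedRecord; not the real Connect class. */
    static final class Submitted {
        final String partition;
        final int offset;
        final boolean acked;

        Submitted(String partition, int offset, boolean acked) {
            this.partition = partition;
            this.offset = offset;
            this.acked = acked;
        }
    }

    /**
     * Dequeues acked records from the head of the queue until the first
     * unacked record, and returns the latest dequeued offset per partition.
     */
    static Map<String, Integer> committableOffsets(Deque<Submitted> queue) {
        Map<String, Integer> offsets = new LinkedHashMap<>();
        while (!queue.isEmpty() && queue.peekFirst().acked) {
            Submitted record = queue.pollFirst();
            offsets.put(record.partition, record.offset);
        }
        return offsets;
    }

    /** The ten records from the example, in submission order. */
    static Deque<Submitted> exampleQueue() {
        Deque<Submitted> queue = new ArrayDeque<>();
        queue.add(new Submitted("P1", 1, true));
        queue.add(new Submitted("P1", 2, true));
        queue.add(new Submitted("P1", 3, false));
        queue.add(new Submitted("P2", 1, true));
        queue.add(new Submitted("P2", 2, true));
        queue.add(new Submitted("P1", 4, false));
        queue.add(new Submitted("P2", 3, true));
        queue.add(new Submitted("P2", 4, true));
        queue.add(new Submitted("P2", 5, true));
        queue.add(new Submitted("P2", 6, false));
        return queue;
    }

    public static void main(String[] args) {
        Deque<Submitted> queue = exampleQueue();
        System.out.println(committableOffsets(queue)); // prints {P1=2}
        System.out.println(queue.size());              // prints 8
    }
}
```

   Everything behind record 3 stays in the queue, including all of the acked 
`P2` records.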
   
   We could do that by dequeuing an acked record only if its source partition 
map does not match that of an earlier unacked record. This way, we'd end up 
with the following remaining in the queue (using the same record numbers as 
before):
   ```
   3. SubmittedRecord{ SourceRecord{...partition=P1,offset=O3...}, acked=false}
   6. SubmittedRecord{ SourceRecord{...partition=P1,offset=O4...}, acked=false}
   10. SubmittedRecord{ SourceRecord{...partition=P2,offset=O6...}, acked=false}
   ```
   This minor change will dramatically improve the ability to commit offsets 
closer to what has actually been acked.
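
   A minimal sketch of that partition-aware pass might look like the 
following. It is an illustration only, not the PR's implementation: `Submitted` 
is a hypothetical stand-in for `SubmittedRecord`, source partitions are plain 
strings, and integers 1-6 stand in for offsets O1-O6.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

final class PartitionAwareDequeueSketch {
    /** Hypothetical stand-in for SubmittedRecord; not the real Connect class. */
    static final class Submitted {
        final String partition;
        final int offset;
        final boolean acked;

        Submitted(String partition, int offset, boolean acked) {
            this.partition = partition;
            this.offset = offset;
            this.acked = acked;
        }
    }

    /**
     * Walks the queue in submission order. An acked record is removed only if
     * no earlier record for the same source partition is still unacked.
     * Returns the latest removed offset per source partition.
     */
    static Map<String, Integer> committableOffsets(Deque<Submitted> queue) {
        Map<String, Integer> offsets = new LinkedHashMap<>();
        Set<String> blockedPartitions = new HashSet<>();
        Iterator<Submitted> it = queue.iterator();
        while (it.hasNext()) {
            Submitted record = it.next();
            if (!record.acked) {
                // Later records for this partition must wait behind this one.
                blockedPartitions.add(record.partition);
            } else if (!blockedPartitions.contains(record.partition)) {
                offsets.put(record.partition, record.offset);
                it.remove();
            }
        }
        return offsets;
    }

    /** The ten records from the example, in submission order. */
    static Deque<Submitted> exampleQueue() {
        Deque<Submitted> queue = new ArrayDeque<>();
        queue.add(new Submitted("P1", 1, true));
        queue.add(new Submitted("P1", 2, true));
        queue.add(new Submitted("P1", 3, false));
        queue.add(new Submitted("P2", 1, true));
        queue.add(new Submitted("P2", 2, true));
        queue.add(new Submitted("P1", 4, false));
        queue.add(new Submitted("P2", 3, true));
        queue.add(new Submitted("P2", 4, true));
        queue.add(new Submitted("P2", 5, true));
        queue.add(new Submitted("P2", 6, false));
        return queue;
    }

    public static void main(String[] args) {
        Deque<Submitted> queue = exampleQueue();
        System.out.println(committableOffsets(queue)); // prints {P1=2, P2=5}
        System.out.println(queue.size());              // prints 3
    }
}
```

   Only records 3, 6, and 10 remain queued after the pass. The cost is a full 
scan of the queue on each commit rather than stopping at the first unacked 
record.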

