rhauch commented on pull request #10112: URL: https://github.com/apache/kafka/pull/10112#issuecomment-910540773
My previous suggestion simply dequeues all completed records until an unacked record is found. This is really straightforward, but we could try to do better: we could dequeue every acked record unless an earlier record from the same source partition is still unacked. For example, let's say we have enqueued 10 records when `commitOffsets()` is called:

```
1. SubmittedRecord{ SourceRecord{...partition=P1,offset=O1...}, acked=true}
2. SubmittedRecord{ SourceRecord{...partition=P1,offset=O2...}, acked=true}
3. SubmittedRecord{ SourceRecord{...partition=P1,offset=O3...}, acked=false}
4. SubmittedRecord{ SourceRecord{...partition=P2,offset=O1...}, acked=true}
5. SubmittedRecord{ SourceRecord{...partition=P2,offset=O2...}, acked=true}
6. SubmittedRecord{ SourceRecord{...partition=P1,offset=O4...}, acked=false}
7. SubmittedRecord{ SourceRecord{...partition=P2,offset=O3...}, acked=true}
8. SubmittedRecord{ SourceRecord{...partition=P2,offset=O4...}, acked=true}
9. SubmittedRecord{ SourceRecord{...partition=P2,offset=O5...}, acked=true}
10. SubmittedRecord{ SourceRecord{...partition=P2,offset=O6...}, acked=false}
```

This might happen if records 4, 5, and 7-10 were written to different topic partitions than records 1, 2, 3, and 6, and the producer is stuck on the latter partitions.

With the simplistic logic, we'd only dequeue records 1 and 2, we'd add the offsets for these two records to the offset writer, and we'd flush offset `partition=P1,offset=O2`. But the connector has made significantly more progress on source partition `partition=P2`, and it would be good to *also* flush offset `partition=P2,offset=O5` (the offset of record 9, the last acked record for that source partition).

We could do that by dequeuing acked records only if their source partition map does not match that of a previously seen unacked record. This way, we'd end up with the following remaining in the queue (using the same record numbers as before):

```
3. SubmittedRecord{ SourceRecord{...partition=P1,offset=O3...}, acked=false}
6. SubmittedRecord{ SourceRecord{...partition=P1,offset=O4...}, acked=false}
10. SubmittedRecord{ SourceRecord{...partition=P2,offset=O6...}, acked=false}
```

This minor change will dramatically improve the ability to commit offsets closer to what has actually been acked.
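
To make the selective dequeuing concrete, here is a minimal sketch of how it might look. It is only an illustration under simplifying assumptions: `SelectiveDequeueSketch`, `Submitted`, `dequeueCommittable`, and `exampleQueue` are hypothetical names rather than anything in the Connect runtime, and source partitions and offsets are reduced to plain strings instead of maps.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch; none of these names come from Kafka Connect itself.
class SelectiveDequeueSketch {

    // Simplified stand-in for a submitted record: source partition, source offset, ack flag.
    static final class Submitted {
        final String partition;
        final String offset;
        final boolean acked;

        Submitted(String partition, String offset, boolean acked) {
            this.partition = partition;
            this.offset = offset;
            this.acked = acked;
        }
    }

    // Walks the queue in submission order. An unacked record blocks its source partition;
    // an acked record is removed only if its source partition is not yet blocked.
    // Returns the latest committable offset per source partition.
    static Map<String, String> dequeueCommittable(Deque<Submitted> queue) {
        Map<String, String> committable = new LinkedHashMap<>();
        Set<String> blocked = new HashSet<>();
        Iterator<Submitted> it = queue.iterator();
        while (it.hasNext()) {
            Submitted record = it.next();
            if (!record.acked) {
                blocked.add(record.partition);
            } else if (!blocked.contains(record.partition)) {
                committable.put(record.partition, record.offset); // later offsets overwrite earlier ones
                it.remove();
            }
        }
        return committable;
    }

    // The ten records from the example in this comment, in the same order.
    static Deque<Submitted> exampleQueue() {
        Deque<Submitted> queue = new ArrayDeque<>();
        queue.add(new Submitted("P1", "O1", true));
        queue.add(new Submitted("P1", "O2", true));
        queue.add(new Submitted("P1", "O3", false));
        queue.add(new Submitted("P2", "O1", true));
        queue.add(new Submitted("P2", "O2", true));
        queue.add(new Submitted("P1", "O4", false));
        queue.add(new Submitted("P2", "O3", true));
        queue.add(new Submitted("P2", "O4", true));
        queue.add(new Submitted("P2", "O5", true));
        queue.add(new Submitted("P2", "O6", false));
        return queue;
    }

    public static void main(String[] args) {
        Deque<Submitted> queue = exampleQueue();
        System.out.println(dequeueCommittable(queue)); // prints {P1=O2, P2=O5}
        System.out.println(queue.size());              // prints 3 (records 3, 6, and 10 remain)
    }
}
```

The sketch scans the whole queue on every commit, which keeps the logic simple; whether that cost is acceptable for large queues is a separate question that it does not try to answer.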