C0urante commented on a change in pull request #11323:
URL: https://github.com/apache/kafka/pull/11323#discussion_r728086626



##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -378,7 +370,7 @@ private boolean sendRecords() {
                             log.trace("{} Failed record: {}", 
WorkerSourceTask.this, preTransformRecord);
                             producerSendException.compareAndSet(null, e);

Review comment:
       > So any subsequent records that were sent to a different topic 
partition could still have the same source partition, and thus they should be 
enqueued into the same deque. Those offsets would not be committed, since their 
SubmittedRecord instances are after the SubmittedRecord for the record that 
failed to send, and the latter would never be acked (as its send failed).
   
   I think this is the "vital" section and it provides a good rationale for why 
we intentionally keep the failed record in the queue.
   
   > If the committed offsets were moved as suggested in a separate thread 
above, we'd actually get a chance to commit offsets for acked source records 
before failing the task. It's not super essential, but it'd be good to commit 
the offsets for as many of those submitted-and-acked records as possible.
   
   We call `commitOffsets` in a `finally` block for `execute` right now. I 
think we can address this case by adding another call to 
`updateCommittableOffsets` right before this end-of-life call to 
`commitOffsets`. I've done this; LMKWYT.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to