vamossagar12 commented on PR #14158:
URL: https://github.com/apache/kafka/pull/14158#issuecomment-1669417547

   Thanks Chris for your comments and Yash for reviewing. 
   
   > I'm just not certain that the decision I made to commit offsets for 
dropped records when working on exactly-once source connectors was the correct 
one
   
   It's interesting you say this Chris. As you pointed out, during KIP-910 you 
had brought this point up which gave me insights into working on this fix. 
Anything that you have found since then that you feel this shouldn't have been 
allowed? Would be interested to know.
   
   Actually staying on this topic, I am slightly confused when we say we commit 
offsets when EOS is supported. I say this because, when a record is dropped, 
within the `recordDropped` override of it in ExactlyOnceWorkerSourceTask, I see 
offset being added only to `committableOffsets` 
   
https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java#L153-L155
   
   but when I look at the `commitTransaction` method, the offsets it uses for 
flushing are the ones that are picked from 
[OffsetWriter#data](https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java#L143-L145)
 
   
   over here: 
   
https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java#L269-L302
   
   Only when the transaction succeeds, do we invoke `commitTaskRecord` and 
eventually SourceRecord#commit 
https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java#L336-L339
   
   I couldn't find any mechanism when `committableOffsets` are flushed to 
OffsetStorageWriter#data. In my opinion, even for EOS-enabled source connectors 
offsets aren't being committed when records are dropped. Have I understood this 
correctly or am I missing something totally in this case? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to