vamossagar12 commented on PR #14158: URL: https://github.com/apache/kafka/pull/14158#issuecomment-1669417547
Thanks Chris for your comments, and Yash for reviewing.

> I'm just not certain that the decision I made to commit offsets for dropped records when working on exactly-once source connectors was the correct one

It's interesting that you say this, Chris. As you pointed out, you brought this point up during KIP-910, and that gave me the insight I needed to work on this fix. Have you found anything since then that makes you feel this shouldn't have been allowed? I'd be interested to know.

Staying on this topic, I am slightly confused when we say that we commit offsets for dropped records when EOS is supported. When a record is dropped, the `recordDropped` override in ExactlyOnceWorkerSourceTask appears to add its offset only to `committableOffsets`:
https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java#L153-L155

However, the `commitTransaction` method flushes the offsets it picks up from [OffsetWriter#data](https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java#L143-L145), here:
https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java#L269-L302

Only when the transaction succeeds do we invoke `commitTaskRecord` and, eventually, SourceTask#commitRecord:
https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java#L336-L339

I couldn't find any mechanism by which `committableOffsets` are flushed to OffsetStorageWriter#data. As far as I can tell, even for EOS-enabled source connectors, offsets aren't committed when records are dropped. Have I understood this correctly, or am I missing something entirely here?
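To make the question concrete, here is a toy Java model of the two offset stores described above. It is a sketch under stated assumptions, not Kafka's implementation. `DroppedOffsetSketch`, `writerData`, and the `droppedGoesToWriter` flag are hypothetical names, and `writerData` only stands in for OffsetStorageWriter#data. The flag captures the open question. If `recordDropped` only touches `committableOffsets`, then a commit that flushes the writer's data never sees the dropped record's offset.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Toy model (hypothetical names, NOT Kafka's real classes) of the two offset
 * stores discussed above. Whether a dropped record's offset is flushed depends
 * entirely on which map recordDropped writes to; that is the assumption the
 * droppedGoesToWriter flag toggles.
 */
public class DroppedOffsetSketch {

    // Stand-in for the task's committableOffsets.
    private final Map<String, Long> committableOffsets = new HashMap<>();
    // Stand-in for OffsetStorageWriter#data: the commit flushes only this map.
    private final Map<String, Long> writerData = new HashMap<>();
    // Assumption under test: does recordDropped also stage the offset in the writer?
    private final boolean droppedGoesToWriter;

    public DroppedOffsetSketch(boolean droppedGoesToWriter) {
        this.droppedGoesToWriter = droppedGoesToWriter;
    }

    /** A record that was actually produced: its offset is staged in the writer. */
    public void recordSent(String partition, long offset) {
        writerData.put(partition, offset);
    }

    /** A record filtered out by a transformation (e.g. an SMT returning null). */
    public void recordDropped(String partition, long offset) {
        committableOffsets.put(partition, offset);
        if (droppedGoesToWriter) {
            writerData.put(partition, offset);
        }
    }

    /** Latest offset recorded in committableOffsets for a partition (null if none). */
    public Long committableOffset(String partition) {
        return committableOffsets.get(partition);
    }

    /** Models commitTransaction: flush (and clear) only the writer's staged data. */
    public Map<String, Long> commitTransaction() {
        Map<String, Long> flushed = new HashMap<>(writerData);
        writerData.clear();
        return flushed;
    }

    public static void main(String[] args) {
        for (boolean toWriter : new boolean[] {false, true}) {
            DroppedOffsetSketch task = new DroppedOffsetSketch(toWriter);
            task.recordSent("p0", 10L);
            task.recordDropped("p0", 11L); // dropped record arrives after the sent one
            System.out.println("droppedGoesToWriter=" + toWriter
                    + " -> flushed offset for p0: " + task.commitTransaction().get("p0")
                    + ", committable offset for p0: " + task.committableOffset("p0"));
        }
    }
}
```

Running `main` shows the flushed offset for `p0` under both assumptions. It is 10 when dropped offsets stay only in `committableOffsets`, and 11 when they are also staged in the writer. In both cases, `committableOffsets` holds 11.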