sudeshwasnik commented on PR #13646: URL: https://github.com/apache/kafka/pull/13646#issuecomment-1590627003
hi @sambhav-jain-16 @C0urante, here's what I think may be happening. The test is written in a way that:

1. Assumes that when `recordsToCommitLatch` has been decremented X times (in `awaitCommits`), there should be X records in the topic.
2. Never stops the source connector: it keeps running in the background, producing more records into the topic. The test doesn't `awaitCommit` and assert on the consumed records at the same time, so if `awaitCommit` returned at time t1 and the records were consumed at time t2, more records may have been produced into the topic in the (t1, t2) gap. The assertion is therefore not strong enough; we should try to stop the source connector at or before t1 (see the sketch at the end of this comment).

IMO the (t1, t2) gap is the reason for the flakiness, because assumption 1 is wrong in itself but may still have passed *some* of the time.

Here is why assumption 1 is wrong, i.e. why we can't expect X records to be present in the topic after `recordsToCommitLatch` has been decremented X times. Say records r1, ..., rY were sent successfully by the producer in a transaction `txn1` (not yet completed). All of r1, ..., rY are then stored in [`commitableRecords`](https://github.com/apache/kafka/blob/9e74f91e56dbc06f17c95fe80dd3923f7b713457/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java#L194) by this [line](https://github.com/apache/kafka/blob/3ddb62316f287d74b9649e1821c62fe8f68bc6e6/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L422). Now, if `rY` says `txn1` [has to be aborted](https://github.com/apache/kafka/blob/9e74f91e56dbc06f17c95fe80dd3923f7b713457/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java#L461), `txn1` is aborted and all of r1, ..., rY are dropped (essentially unreadable). But `shouldCommitTransactionForRecord` still returns `true` ([link](https://github.com/apache/kafka/blob/9e74f91e56dbc06f17c95fe80dd3923f7b713457/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java#L467)), which means we still try to commit the transaction. In `commitTransaction` we end up calling `commitTaskRecord` for each record in `commitableRecords` (r1, ..., rY), and for each such `commitRecord` call, `recordsToCommitLatch` is eventually [decremented](https://github.com/apache/kafka/blob/3ae1afa43838066e44ea78918050c6780c208042/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorHandle.java#L166-L170). So in this case r1, ..., rY were never actually produced (no consumer can read them, since they belong to an aborted transaction), yet [this assertion still holds](https://github.com/apache/kafka/blob/9e74f91e56dbc06f17c95fe80dd3923f7b713457/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java#L405).

Since assumption 1 is incorrect, we should change the test so that it doesn't expect every record that decrements `recordsRemainingLatch` to also have been produced. IMO the flakiness itself is just due to the source connector continuously producing records in the background.
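To make the aborted-transaction path concrete, here's a minimal, self-contained toy sketch (stand-in classes only, not the real Connect runtime) of the flow described above: records buffered for an aborted transaction still go through `commitTaskRecord`, so the latch counts down for records that no `read_committed` consumer will ever see.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class AbortedTxnLatchSketch {

    public static void main(String[] args) throws InterruptedException {
        // Stand-in for the test's recordsToCommitLatch, expecting 5 "commits".
        CountDownLatch recordsToCommitLatch = new CountDownLatch(5);

        // Stand-in for ExactlyOnceWorkerSourceTask#commitableRecords.
        List<String> commitableRecords = new ArrayList<>();

        boolean abortRequested = false;
        for (String record : new String[] {"r1", "r2", "r3", "r4", "rY"}) {
            commitableRecords.add(record);  // buffered after a successful send
            if (record.equals("rY"))        // rY requests that txn1 be aborted
                abortRequested = true;
        }

        // rY triggered an abort: producer.abortTransaction() in the real task,
        // making r1..rY unreadable to any read_committed consumer.
        System.out.println("txn1 aborted: " + abortRequested);

        // shouldCommitTransactionForRecord still returns true on the abort
        // path, so the task walks commitableRecords and calls commitTaskRecord
        // for every buffered record, counting the latch down each time.
        commitableRecords.forEach(r -> recordsToCommitLatch.countDown());

        // The latch reports all 5 records as committed...
        System.out.println("awaitCommits would pass: "
                + recordsToCommitLatch.await(0, TimeUnit.SECONDS));
        // ...but the topic contains none of r1..rY, so an exact latch-based
        // record count in the test cannot be relied on.
    }
}
```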
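And here's a rough sketch of the "stop the connector at or before t1" reordering, not a drop-in patch; the names (`connect`, `connectorHandle`, `CONNECTOR_NAME`, the timeout value) are assumed to mirror `ExactlyOnceSourceIntegrationTest` and may not match exactly:

```java
// t1: wait for the expected number of commitTaskRecord calls
connectorHandle.awaitCommits(TimeUnit.MINUTES.toMillis(1));

// stop the source before reading the topic, so record production ends here
connect.deleteConnector(CONNECTOR_NAME);

// t2: only now consume and assert; the topic's record count can no longer grow.
// Per the aborted-transaction issue above, the assertion would also need to
// tolerate buffered records that were dropped with an aborted transaction,
// rather than expecting an exact latch-based count.
```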