sudeshwasnik commented on PR #13646:
URL: https://github.com/apache/kafka/pull/13646#issuecomment-1590627003

   Hi @sambhav-jain-16 @C0urante, here's what I think may be happening. The test is written in a way that:
   1. Assumes/expects that when `recordsToCommitLatch` is decremented X times (in `awaitCommits`), there should be X records in the topic.
   2. Never stops the source connector. It keeps running in the background, continuously producing more records into the topic.
   
   The test doesn't do `awaitCommits` and assert consumed records at the same time. If `awaitCommits` returned at timestamp t1 and the records were consumed at t2, more records may have been produced into the topic in the (t1, t2) gap, so the assertion is not strong enough (we should try to stop the source connector at or before t1). IMO the (t1, t2) gap is the reason for the flakiness; assumption `1` is wrong in itself, but the test may still have passed some of the time. A condensed sketch of the flow is below.
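   
   To make the race concrete, here's a rough condensation of the pattern the test follows. This is a sketch, not the literal test code: `consumeFromTopic`, `COMMIT_TIMEOUT_MS`, and `CONSUME_TIMEOUT_MS` are illustrative stand-ins.
   
   ```java
   // Condensed shape of the test flow (illustrative names, not the real test):
   connectorHandle.expectedCommits(MINIMUM_MESSAGES); // arms recordsToCommitLatch
   connectorHandle.awaitCommits(COMMIT_TIMEOUT_MS);   // t1: X commit callbacks observed
   
   // ... the connector is still running and producing records here ...
   
   ConsumerRecords<byte[], byte[]> records =
           consumeFromTopic(topic, CONSUME_TIMEOUT_MS); // t2: snapshot of the topic
   assertEquals(MINIMUM_MESSAGES, records.count());     // breaks if anything landed in (t1, t2)
   ```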
   
   Here's why assumption `1` is wrong and why we can't expect X records to be present in the topic when `recordsToCommitLatch` has been decremented X times:
   
   Say r1, ..., rY were sent successfully by the producer in a transaction `txn1` (not yet completed). All of r1, ..., rY are then stored in [`commitableRecords`](https://github.com/apache/kafka/blob/9e74f91e56dbc06f17c95fe80dd3923f7b713457/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java#L194) at this [line](https://github.com/apache/kafka/blob/3ddb62316f287d74b9649e1821c62fe8f68bc6e6/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L422).
 
   Now, if `rY` says `txn1` [has to be aborted](https://github.com/apache/kafka/blob/9e74f91e56dbc06f17c95fe80dd3923f7b713457/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java#L461), `txn1` is aborted and all of r1, ..., rY are dropped (essentially unreadable).
   But `shouldCommitTransactionForRecord` still returns `true` ([link](https://github.com/apache/kafka/blob/9e74f91e56dbc06f17c95fe80dd3923f7b713457/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java#L467)), which means we still attempt `commitTransaction`. In `commitTransaction` we end up calling `commitTaskRecord` for each record in `commitableRecords` (r1, ..., rY), and each such `commitRecord` call eventually [decrements](https://github.com/apache/kafka/blob/3ae1afa43838066e44ea78918050c6780c208042/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorHandle.java#L166-L170) `recordsToCommitLatch`.
   So in this case, r1, ..., rY were never produced (no consumer can read them, since they belong to an aborted transaction), but this [assertion will still be true](https://github.com/apache/kafka/blob/9e74f91e56dbc06f17c95fe80dd3923f7b713457/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java#L405).
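   
   Here's a toy, self-contained illustration of that mismatch: the latch counts `commitRecord` callbacks, not records that are actually readable in the topic. (Standalone sketch; it doesn't use any Connect classes.)
   
   ```java
   import java.util.concurrent.CountDownLatch;
   
   // Toy demo: latch decrements track commit callbacks, not readable records.
   public class LatchVsReadableDemo {
       public static void main(String[] args) throws InterruptedException {
           int recordsSent = 5;                               // r1..r5 in txn1
           CountDownLatch recordsToCommitLatch = new CountDownLatch(recordsSent);
   
           boolean transactionAborted = true;                 // r5 requested an abort
           int readableRecords = 0;
   
           // commitTransaction-equivalent: commitTaskRecord runs for every
           // buffered record, aborted or not, decrementing the latch each time.
           for (int i = 0; i < recordsSent; i++) {
               recordsToCommitLatch.countDown();
               if (!transactionAborted) {
                   readableRecords++;                         // only committed txns are readable
               }
           }
   
           recordsToCommitLatch.await();                      // "awaitCommits" succeeds...
           System.out.println("latch decrements = " + recordsSent
                   + ", readable records = " + readableRecords); // ...yet 5 vs 0
       }
   }
   ```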
 
   
   Since assumption `1` is incorrect, we should change the test so that it doesn't expect every record that decrements `recordsToCommitLatch` to also have been produced. One possible shape of such a check is sketched below.
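   
   For example (a hypothetical shape, not a concrete patch: `consumeAllCommitted` and `taskHandle.committedValues()` are stand-ins, not existing API), the test could verify that everything readable was committed, instead of expecting an exact count:
   
   ```java
   // Hypothetical relaxed check; helper names below are illustrative stand-ins.
   connect.deleteConnector(CONNECTOR_NAME);  // stop producing before asserting
   
   // Consume with isolation.level=read_committed so aborted records are skipped.
   ConsumerRecords<byte[], byte[]> consumed = consumeAllCommitted(topic);
   
   // Every readable record must have gone through commitRecord, but not every
   // commitRecord callback corresponds to a readable record (aborted txns):
   Set<String> committedValues = taskHandle.committedValues();
   for (ConsumerRecord<byte[], byte[]> rec : consumed) {
       assertTrue(committedValues.contains(new String(rec.value())));
   }
   ```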
   
   IMO the flakiness itself is just due to the source connector running in the background, continuously producing records.