C0urante commented on PR #13646:
URL: https://github.com/apache/kafka/pull/13646#issuecomment-1602941862

   Thanks for the detailed analysis, everyone!
   
   ## Root cause
   
   I believe @sudeshwasnik's latest theory is correct: the Connect runtime 
invokes `SourceTask::commitRecord` even for records in aborted transactions, 
which causes `ConnectorHandle::awaitCommits` to return before the expected 
number of (non-aborted) records has been produced to Kafka.
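
   To make the counting mismatch concrete, here's a minimal, broker-free sketch (all names hypothetical; this is not the actual `ConnectorHandle` implementation, just the arithmetic behind the race) of how a commit counter that ticks on every `SourceTask::commitRecord` invocation, including records from aborted transactions, can satisfy an await before the expected number of committed records ever becomes visible:

    ```java
    import java.util.List;

    // Hypothetical model of the counting mismatch; not the real ConnectorHandle.
    public class CommitCountSketch {

        // The runtime currently invokes SourceTask::commitRecord once per emitted
        // record, whether or not the record's transaction is later aborted.
        static int commitRecordInvocations(List<Boolean> recordWasAborted) {
            return recordWasAborted.size();
        }

        // Only records outside aborted transactions ever become visible to a
        // read_committed consumer.
        static int visibleRecords(List<Boolean> recordWasAborted) {
            return (int) recordWasAborted.stream().filter(aborted -> !aborted).count();
        }

        public static void main(String[] args) {
            // Ten records emitted; the middle four belong to an aborted transaction.
            List<Boolean> aborted = List.of(false, false, false,
                    true, true, true, true,
                    false, false, false);
            // awaitCommits(10) is satisfied by commitRecord callbacks alone...
            System.out.println("commitRecord invocations: " + commitRecordInvocations(aborted));
            // ...but only six records are ever readable, so consuming 10 hangs.
            System.out.println("visible records: " + visibleRecords(aborted));
        }
    }
    ```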
   
   ## Consume-all implementation
   
   I haven't been able to find any issues with 
`EmbeddedKafkaCluster::consumeAll`. The use of the `read_uncommitted` isolation 
level for fetching end offsets and the `read_committed` isolation level for 
consuming is intentional, and mirrors logic we use elsewhere in the Connect 
runtime (see 
[TopicAdmin::endOffsets](https://github.com/apache/kafka/blob/9c8aaa2c35aabb09bd2d5c3d28d1b4587818b419/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java#L711),
 which is used by the `KafkaBasedLog` class when reading to the end of a topic, 
even if the log's consumer is configured with the `read_committed` isolation 
level). This ensures that, when reading to the end of a topic, we reach the end 
of any in-progress transactions on the topic.
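
   In case the interplay between the two isolation levels is unclear, here's a broker-free model of the two end-offset notions (names hypothetical; on a real broker these correspond to the high watermark vs. the last stable offset):

    ```java
    import java.util.List;

    // Hypothetical model of the two end-offset notions; no broker involved.
    public class EndOffsetSketch {

        enum State { COMMITTED, ABORTED, OPEN } // transaction state of each record

        // read_uncommitted end offset: the high watermark, past every record.
        static long readUncommittedEndOffset(List<State> log) {
            return log.size();
        }

        // read_committed end offset: the last stable offset, which stops at the
        // first record of a still-open transaction.
        static long readCommittedEndOffset(List<State> log) {
            for (int i = 0; i < log.size(); i++) {
                if (log.get(i) == State.OPEN) {
                    return i;
                }
            }
            return log.size();
        }

        public static void main(String[] args) {
            // Three committed records, then an in-progress transaction of two records.
            List<State> log = List.of(State.COMMITTED, State.COMMITTED, State.COMMITTED,
                    State.OPEN, State.OPEN);
            System.out.println("read_uncommitted end offset: " + readUncommittedEndOffset(log));
            System.out.println("read_committed end offset:   " + readCommittedEndOffset(log));
            // Targeting the read_uncommitted end offset forces the read_committed
            // consumer to wait out the open transaction; the other target would not.
        }
    }
    ```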
   
   ## Consume vs. consume all
   
   With regard to the change in this PR: the concern with consuming a fixed 
number of records from the topic is that we could see a gap in sequence numbers 
if the topic has multiple partitions, since we aren't guaranteed to consume 
records in the same order they were produced (which is why I implemented and 
used `EmbeddedKafkaCluster::consumeAll` when writing these tests initially; you 
can find the discussion 
[here](https://github.com/apache/kafka/pull/11782#discussion_r912235126)).
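
   The ordering concern can be illustrated without a real consumer (all names hypothetical; this just models the partition interleaving, which Kafka consumers make no cross-partition guarantees about):

    ```java
    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical illustration of the ordering concern; no Kafka consumer involved.
    public class FixedCountConsumeSketch {

        // Sequence numbers 0..(count-1) produced round-robin to two partitions,
        // then observed by a consumer that happens to drain partition 1 first.
        static List<Integer> observedOrder(int count) {
            List<Integer> p0 = new ArrayList<>();
            List<Integer> p1 = new ArrayList<>();
            for (int seqno = 0; seqno < count; seqno++) {
                (seqno % 2 == 0 ? p0 : p1).add(seqno);
            }
            List<Integer> observed = new ArrayList<>(p1);
            observed.addAll(p0);
            return observed;
        }

        public static void main(String[] args) {
            // Consuming a fixed count of 5 records sees seqnos 1, 3, 5, 7, 9:
            // apparent "gaps" even though nothing was lost. Consuming everything,
            // as consumeAll does, sidesteps the problem.
            System.out.println(observedOrder(10).subList(0, 5));
        }
    }
    ```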
   
   Could we stick with using `consumeAll` and instead bump the number of 
expected records/commits? I drafted this change locally and it seemed to work 
well:
   
    ```java
    // the connector aborts some transactions, which causes records that it has
    // emitted (and for which SourceTask::commitRecord has been invoked) to be
    // invisible to consumers; we expect the task to emit at most 233 records in
    // total before 100 records have been emitted as part of one or more
    // committed transactions
    connectorHandle.expectedRecords(233);
    connectorHandle.expectedCommits(233);
    ```
   
   (This would replace the existing code 
[here](https://github.com/apache/kafka/blob/9c8aaa2c35aabb09bd2d5c3d28d1b4587818b419/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java#L397-L399).)
   
   ## Transaction size logging
   
   Also, as an aside--it was really helpful to know how many records were in 
each aborted/committed transaction while investigating this test. I tweaked 
`MonitorableSourceConnector.MonitorableSourceTask::maybeDefineTransactionBoundary`
 to provide this info; if you agree that it'd be useful, feel free to add it to 
this PR (regardless of which fix we use):
   
    ```java
    private void maybeDefineTransactionBoundary(SourceRecord record) {
        if (context.transactionContext() == null || seqno != nextTransactionBoundary) {
            return;
        }
        long transactionSize = nextTransactionBoundary - priorTransactionBoundary;
        // If the transaction boundary ends on an even-numbered offset, abort it;
        // otherwise, commit
        boolean abort = nextTransactionBoundary % 2 == 0;
        calculateNextBoundary();
        if (abort) {
            log.info("Aborting transaction of {} records", transactionSize);
            context.transactionContext().abortTransaction(record);
        } else {
            log.info("Committing transaction of {} records", transactionSize);
            context.transactionContext().commitTransaction(record);
        }
    }
    ```
   
   And in fact, if we believe this would be useful for all connectors, we could 
even add this kind of logging to the `ExactlyOnceWorkerSourceTask` class. But 
that should be done in a separate PR.

