C0urante commented on PR #13646: URL: https://github.com/apache/kafka/pull/13646#issuecomment-1602941862
Thanks for the detailed analysis, everyone!

## Root cause

I believe @sudeshwasnik's latest theory is correct: the Connect runtime invokes `SourceTask::commitRecord` even for records in aborted transactions, which causes `ConnectorHandle::awaitCommits` to return before the expected number of (non-aborted) records has been produced to Kafka.

## Consume-all implementation

I haven't been able to find any issues with `EmbeddedKafkaCluster::consumeAll`. The use of the `read_uncommitted` isolation level for fetching end offsets and the `read_committed` isolation level for consuming is intentional, and mirrors logic we use elsewhere in the Connect runtime (see [TopicAdmin::endOffsets](https://github.com/apache/kafka/blob/9c8aaa2c35aabb09bd2d5c3d28d1b4587818b419/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java#L711), which is used by the `KafkaBasedLog` class when reading to the end of a topic, even if the log's consumer is configured with the `read_committed` isolation level). This ensures that, when reading to the end of a topic, we read past the end of any in-progress transactions on the topic.

## Consume vs. consume all

Regarding the change in this PR: the concern with consuming a fixed number of records from the topic is that we can potentially see a gap in sequence numbers if the topic has multiple partitions, since we wouldn't be guaranteed to consume records in the same order they were produced (which is why I implemented and used `EmbeddedKafkaCluster::consumeAll` when writing these tests initially; you can find the discussion [here](https://github.com/apache/kafka/pull/11782#discussion_r912235126)). Could we stick with using `consumeAll` and instead bump the number of expected records/commits?
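To make the ordering concern concrete, here's a standalone sketch (hypothetical, not Connect code) of how a fixed-count read from a multi-partition topic can surface a gap in the connector's sequence numbers, while draining every partition to its end offset (as `consumeAll` does) cannot:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical illustration: records carry monotonically increasing seqnos,
// produced round-robin across two partitions. Consumers make no ordering
// guarantees across partitions, so a consumer that stops after a fixed
// record count can drain one partition before touching the other.
public class SeqnoGapDemo {

    // Returns true if the sorted view of the consumed seqnos is not contiguous
    static boolean hasGap(List<Integer> seqnos) {
        List<Integer> sorted = new ArrayList<>(seqnos);
        Collections.sort(sorted);
        for (int i = 1; i < sorted.size(); i++) {
            if (sorted.get(i) - sorted.get(i - 1) > 1) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Seqnos 0..9 produced round-robin to two partitions
        List<Integer> p0 = new ArrayList<>();
        List<Integer> p1 = new ArrayList<>();
        for (int seqno = 0; seqno < 10; seqno++) {
            (seqno % 2 == 0 ? p0 : p1).add(seqno);
        }

        // Worst case for a fixed count of 6: all of p0 is returned first
        List<Integer> fixedCount = new ArrayList<>(p0);       // 0, 2, 4, 6, 8
        fixedCount.addAll(p1.subList(0, 6 - p0.size()));      // then 1
        System.out.println("fixed-count read has gap: " + hasGap(fixedCount));  // true

        // Draining both partitions to their end offsets yields a contiguous view
        List<Integer> consumeAll = new ArrayList<>(p0);
        consumeAll.addAll(p1);
        System.out.println("consume-all read has gap: " + hasGap(consumeAll));  // false
    }
}
```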
I drafted this change locally and it seemed to work well:

```java
// The connector aborts some transactions, which causes records that it has emitted (and for which
// SourceTask::commitRecord has been invoked) to be invisible to consumers; we expect the task to
// emit at most 233 records in total before 100 records have been emitted as part of one or more
// committed transactions
connectorHandle.expectedRecords(233);
connectorHandle.expectedCommits(233);
```

(This would replace the existing code [here](https://github.com/apache/kafka/blob/9c8aaa2c35aabb09bd2d5c3d28d1b4587818b419/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java#L397-L399).)

## Transaction size logging

As an aside, it was really helpful to know how many records were in each aborted/committed transaction while investigating this test. I tweaked `MonitorableSourceConnector.MonitorableSourceTask::maybeDefineTransactionBoundary` to provide this info; if you agree that it'd be useful, feel free to add it to this PR (regardless of which fix we use):

```java
private void maybeDefineTransactionBoundary(SourceRecord record) {
    if (context.transactionContext() == null || seqno != nextTransactionBoundary) {
        return;
    }
    long transactionSize = nextTransactionBoundary - priorTransactionBoundary;
    // If the transaction boundary ends on an even-numbered offset, abort it;
    // otherwise, commit
    boolean abort = nextTransactionBoundary % 2 == 0;
    calculateNextBoundary();
    if (abort) {
        log.info("Aborting transaction of {} records", transactionSize);
        context.transactionContext().abortTransaction(record);
    } else {
        log.info("Committing transaction of {} records", transactionSize);
        context.transactionContext().commitTransaction(record);
    }
}
```

And in fact, if we believe this would be useful for all connectors, we could even add this kind of logging to the `ExactlyOnceWorkerSourceTask` class. But that should be done in a separate PR.