gharris1727 opened a new pull request, #13178: URL: https://github.com/apache/kafka/pull/13178
This PR addresses three distinct but closely related issues:

1. [KAFKA-12468](https://issues.apache.org/jira/browse/KAFKA-12468) "Initial offsets are copied from source to target cluster" "Mirror Maker 2 Negative Offsets"
2. [KAFKA-14663](https://issues.apache.org/jira/browse/KAFKA-14663) "High throughput topics can starve low-throughput MM2 offset syncs"
3. [KAFKA-12566](https://issues.apache.org/jira/browse/KAFKA-12566) "Flaky Test MirrorConnectorsIntegrationSSLTest#testReplication"

The primary issue being addressed here is the incorrect translation of offsets, the title issue.

The [MM2 KIP](https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0) does not discuss the offset translation mechanism in detail, so I'll summarize the mechanism as it currently exists on trunk:

1. Records are mirrored from source topic-partition to target topic-partition by the MirrorSourceTask.
2. MirrorSourceTask will (occasionally) emit OffsetSync messages to an Offset Syncs topic. Offset syncs contain the upstream and downstream offset of an emitted data record.
3. The MirrorCheckpointTask will consume from the offset syncs topic, and maintain an in-memory copy of the latest offset sync seen for each topic-partition (in OffsetSyncStore).
4. Periodically the MirrorCheckpointTask will poll consumer group offsets for the source topic, and use its in-memory copy of the latest offset sync to translate upstream offsets to downstream offsets.
5. This is done by measuring the 'distance' between the MM2 offset sync and the upstream consumer group, and then assuming that the same distance applies in the downstream topic.

Step (5) is correct only if every *offset* from the source topic has already been reproduced in the downstream topic. However, this assumption is violated when offsets are not present, which can happen for a variety of reasons, including:

1. Transaction markers take an offset but will never be emitted as records from the consumer.
2. Records are dropped by SMTs and will never be emitted to the target topic.
3. The source topic has been compacted and some offsets will never be emitted by the consumer.
4. MM2 replication is lagging behind an upstream consumer group and some records have not been replicated yet.

In any of these conditions, an upstream offset may be translated to a downstream offset which is beyond the corresponding record in the downstream topic.

Consider the following concrete example of situation (4) **resulting in negative lag**:

1. Source topic `A` has 1000 records, all with contiguous offsets.
2. An upstream consumer group `cg` is at the end of the log, offset 1000.
3. MM2 begins replicating the topic, and writes 500 upstream records to the target topic `target.A`, and writes offset-syncs correlating (`A`, 500) with (`target.A`, 500).
4. MM2 checkpoint reads `cg` offset 1000, translates the offset to 500 + (1000 - 500) = 1000, and writes to `target.cg`.
5. Someone checks the `target.cg` offset for `target.A` and observes that the group offset is 1000, the topic end offset is 500, and the lag is -500.

And the following concrete example of situation (1) **resulting in undelivered data**:

1. Source topic `A` has 1000 records, all emitted with a transactional producer.
2. The 1000 records are interleaved with 1000 commit markers at every other offset.
3. An upstream consumer group `cg` is in the middle of the topic, at offset 1000.
4. MM2 begins replicating the topic, and writes 1000 records to the target topic `target.A`, and writes offset-syncs correlating (`A`, 500) with (`target.A`, 250), in addition to other offset-syncs.
5. MM2 checkpoint reads the `cg` offset 1000, translates the offset to 250 + (1000 - 500) = 750, and writes to `target.cg`.
6. A system fails over from `cg` to `target.cg` and someone notices that the `cg` application read records 0-500, the `target.cg` application read 750-1000, but no consumer ever received offsets 500-750.
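The arithmetic in the two examples above can be reproduced with a minimal sketch. This is not the MM2 implementation: `naive_translate` and the variable names are hypothetical, chosen only to illustrate the "same distance" assumption from step (5), and the record counts are taken directly from the examples.

```python
def naive_translate(sync_upstream, sync_downstream, group_upstream):
    """Hypothetical helper: apply the upstream distance between the
    offset sync and the consumer group unchanged to the downstream topic."""
    return sync_downstream + (group_upstream - sync_upstream)


# Situation (4): replication is lagging behind the upstream consumer group.
# Offset sync correlates (A, 500) with (target.A, 500); cg is at offset 1000.
translated_lagging = naive_translate(500, 500, 1000)
target_end_offset = 500  # only 500 records have been replicated so far
lag = target_end_offset - translated_lagging

# Situation (1): every other upstream offset is a commit marker.
# Offset sync correlates (A, 500) with (target.A, 250); cg is at offset 1000.
translated_txn = naive_translate(500, 250, 1000)
# Upstream offsets 0..999 contain 500 data records, so cg has really
# consumed the records at downstream offsets 0..499.
records_consumed_by_cg = 1000 // 2
skipped_records = translated_txn - records_consumed_by_cg

print(translated_lagging, lag)          # 1000 -500
print(translated_txn, skipped_records)  # 750 250
```

In both cases the translated offset lands beyond the downstream record that corresponds to the upstream group position: in the first it passes the end of `target.A`, in the second it skips 250 records that no consumer ever reads.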
This PR adds a test that replicates transactional data, as in situation (1). It asserts that whenever an offset is translated, it does not pass the end of the downstream topic, and cannot cause negative lag. In addition, the tests are strengthened to require the offset syncs to be emitted up to the end of the topic, requiring a fix for the offset-syncs topic starvation issue. This also exposed a number of mistakes and flakiness in the existing tests, so this PR also stabilizes the tests to make them useful for validating the negative offsets fix.

### Committer Checklist (excluded from commit message)

- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)