gharris1727 opened a new pull request, #13178:
URL: https://github.com/apache/kafka/pull/13178

   This PR addresses three distinct but closely related issues:
   
   1. [KAFKA-12468](https://issues.apache.org/jira/browse/KAFKA-12468) "Initial 
offsets are copied from source to target cluster" "Mirror Maker 2 Negative 
Offsets"
   2. [KAFKA-14663](https://issues.apache.org/jira/browse/KAFKA-14663) "High 
throughput topics can starve low-throughput MM2 offset syncs"
   3. [KAFKA-12566](https://issues.apache.org/jira/browse/KAFKA-12566) "Flaky 
Test MirrorConnectorsIntegrationSSLTest#testReplication"
   
   The primary issue being addressed here is the incorrect translation of 
offsets, the title issue.
   
   The [MM2 
KIP](https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0)
 does not discuss the offset translation mechanism in detail, so I'll summarize 
the mechanism as it currently exists on trunk:
   1. Records are mirrored from source topic-partition to target 
topic-partition by the MirrorSourceTask
   2. MirrorSourceTask will (occasionally) emit OffsetSync messages to an 
Offset Syncs topic. Offset syncs contain the upstream and downstream offset of 
an emitted data record.
   3. The MirrorCheckpointTask will consume from the offset syncs topic, and 
maintain an in-memory copy of the latest offset sync seen for each 
topic-partition (in OffsetSyncStore)
   4. Periodically the MirrorCheckpointTask will poll consumer group offsets 
for the source topic, and use it's in-memory copy of the latest offset sync to 
translate upstream offsets to downstream offsets.
   5. This is done by measuring the 'distance' between the MM2 offset sync and 
the upstream consumer group, and then assuming that the same distance applies 
in the downstream topic.
   
   Step (5) is correct when assuming that every *offset* from the source topic 
has already been reproduced in the downstream topic. However, this assumption 
is violated when offsets are not present, which can happen for a variety of 
reasons, including:
   1. Transaction markers take an offset but will never be emitted as records 
from the consumer
   2. Records are dropped by SMTs and will never be emitted to the target topic
   3. The source topic has been compacted and some offsets will never be 
emitted by the consumer
   4. MM2 replication is lagging behind an upstream consumer group and some 
records have not been replicated yet
   
   In any of these conditions, an upstream offset may be translated to a 
downstream offset which is beyond the corresponding record in the downstream 
topic. Consider the following concrete example of situation (4) **resulting in 
negative lag**:
   1. Source topic `A` has 1000 records, all with contiguous offsets
   2. An upstream consumer group `cg` is at the end of the log, offset 1000.
   3. MM2 begins replicating the topic, and writes 500 upstream records to the 
target topic `target.A`, and writes offset-syncs correlating (`A`, 500) with 
(`target.A`, 500).
   4. MM2 checkpoint reads `cg` offset 1000, translates the offset to 500 + 
(1000-500) = 1000, and writes to `target.cg`
   5. Someone checks the `target.cg` offset for `target.A` and observes that 
the group offset is 1000, the topic end offset is 500, and the lag is -500.
   
   And the following concrete example of situation (1) **resulting in 
undelivered data**.
   1. Source topic `A` has 1000 records, all emitted with a transactional 
producer.
   2. The 1000 records are interleaved with 1000 commit markers at every other 
offset.
   3. An upstream consumer group `cg` is in the middle of the topic, at offset 
1000.
   4. MM2 begins replicating the topic, and writes 1000 records to the target 
topic `target.A`, and writes offset-syncs correlating (`A`, 500) with 
(`target.A`, 250), in addition to other offset-syncs.
   5. MM2 checkpoint reads the `cg` offset 1000, translates the offset to 250 + 
(1000 - 500) = 750, and writes to `target.cg`
   6. A system fails-over from `cg` to `target.cg` and someone notices that the 
`cg` application read records 0-500, `target.cg` application read 750-1000, but 
no consumer ever received offsets 500-750.
   
   This PR adds a test that replicates transactional data, as in situation (1). 
It asserts that whenever an offset is translated, it does not pass the end of 
the downstream topic, and cannot cause negative lag. In addition the tests are 
strengthened to require the offset syncs to be emitted up to the end of the 
topic, requiring a fix for the offset-syncs topic starvation issue. This also 
exposed a number of mistakes and flakiness in the existing tests, so this PR 
also stabilizes the tests to make them useful for validating the negative 
offsets fix.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to