[ https://issues.apache.org/jira/browse/KAFKA-16291?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17819368#comment-17819368 ]
Greg Harris commented on KAFKA-16291:
-------------------------------------

Hi [~claudio.benfatto] Thanks for the bug report and the very relevant logs! I'm sorry that offset translation isn't working how you expect.

These two lines show what is happening:
{noformat}
mirrormaker2-fallingwaterfall-weatheredbase-1 - mirrormaker2-server - [2024-02-21 08:58:40,812] TRACE [MirrorCheckpointConnector|task-0] New sync OffsetSync{topicPartition=storage-demo-test-5, upstreamOffset=198765, downstreamOffset=197535} applied, new state is [198765:197535,198764:197534,198762:197532,198761:197531,198753:197523,198739:197509,198717:197487,198673:197443,198585:197355,198497:197267,198321:197091,197617:196387,196913:195683,194098:192868] (org.apache.kafka.connect.mirror.OffsetSyncStore:193)
mirrormaker2-fallingwaterfall-weatheredbase-1 - mirrormaker2-server - [2024-02-21 08:54:05,030] TRACE [MirrorCheckpointConnector|task-0] New sync OffsetSync{topicPartition=storage-demo-test-5, upstreamOffset=197532, downstreamOffset=196302} applied, new state is [197532:196302,197530:196300,197529:196299,197521:196291,197507:196277,197485:196255,197441:196211,197353:196123,197265:196035,196913:195683,196209:194979,195505:194275,194098:192868] (org.apache.kafka.connect.mirror.OffsetSyncStore:193)
{noformat}
These log lines print out the whole in-memory state of the offset translation cache; if an offset isn't in this list, then it isn't available for translation after that point in time. In the earlier line (08:54:05) you can see the critical sync 197532:196302 get added to the front of the cache, and by the time the later line is printed four minutes afterwards (08:58:40), that sync is no longer present. It has syncs on either side (196913:195683 and 197617:196387), each separated by some gap, which is expected.
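Both cache states show the same shape: syncs are dense near the newest offset and sparse farther back. As a plain-Python illustration (not MirrorMaker2 code), computing each retained upstream offset's distance from the newest sync in the 08:58:40 state makes the roughly geometric spacing visible:

```python
# Upstream offsets of the retained syncs, newest first, copied from the
# 08:58:40 TRACE line above (illustrative only; not MirrorMaker2 code).
state = [198765, 198764, 198762, 198761, 198753, 198739, 198717,
         198673, 198585, 198497, 198321, 197617, 196913, 194098]

# Distance of each retained sync from the newest one: dense near the
# head of the cache, growing roughly geometrically toward the tail.
gaps = [state[0] - s for s in state]
print(gaps)
# [0, 1, 3, 4, 12, 26, 48, 92, 180, 268, 444, 1148, 1852, 4667]
```

This is why the sync 197532:196302 can be present at 08:54:05 and gone by 08:58:40: as new syncs arrive at the head, intermediate entries are dropped to keep this spacing bounded.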
The earlier/lower of these two syncs is the one that is being used for translation later:
{noformat}
mirrormaker2-fallingwaterfall-weatheredbase-1 - mirrormaker2-server - [2024-02-21 09:02:01,557] DEBUG [MirrorCheckpointConnector|task-0] translateDownstream(storage-demo-test-cg,storage-demo-test-5,197532): Translated 195684 (relative to OffsetSync{topicPartition=storage-demo-test-5, upstreamOffset=196913, downstreamOffset=195683}) (org.apache.kafka.connect.mirror.OffsetSyncStore:160)
{noformat}
The idea here is that MirrorMaker2 keeps only a finite number of offset syncs in-memory, and expires syncs as they become older. After a sync is expired, it is still available in the offset syncs topic (as you've shown), but won't be in-memory; instead the closest earlier sync will be used for translation. At the cost of increased re-delivery/re-processing on the downstream side and changing the offset commit boundaries, this prioritizes the following guarantees:
# If the downstream commits offsets for some record, then the upstream must have committed offsets including that record (no data loss when resuming from downstream offsets)
# If you watch the progress of the downstream offsets, they increase monotonically (no rewinding downstream offsets)
# The in-memory cache is bounded in size and won't cause an OOM

The translation cache is optimized for offsets near the end of the topic: you can see how the later offsets are closer together and the earlier offsets are farther apart (198765 and 198764 are 1 apart, 196913 and 194098 are 2815 apart). The closer a consumer group's offsets are to the end of the topic, the less redelivery it will experience, and at the very end of the topic you can get a "perfect sync". So if you're looking for a zero-re-delivery failover, I would make sure that you stop the producers, let the consumers commit at the end of the topic, wait for MM2 to translate the offsets, and then perform the failover.
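The translation rule can be sketched in a few lines of plain Python (a simplified illustration, not the actual OffsetSyncStore implementation): find the closest sync at or below the committed upstream offset, and advance the downstream offset by one only if the commit is past the sync point, since MM2 cannot prove that any further records were replicated. The cache contents below are taken from the 08:58:40 log line, after 197532:196302 had expired:

```python
import bisect

# Offset-sync cache as (upstream, downstream) pairs, ascending by
# upstream offset; values copied from the 08:58:40 TRACE line.
SYNCS = [(194098, 192868), (196913, 195683), (197617, 196387),
         (198321, 197091), (198497, 197267), (198585, 197355),
         (198673, 197443), (198717, 197487), (198739, 197509),
         (198753, 197523), (198761, 197531), (198762, 197532),
         (198764, 197534), (198765, 197535)]

def translate_downstream(syncs, upstream_offset):
    """Translate a committed upstream offset using the closest
    earlier-or-equal sync; return None if no such sync exists."""
    i = bisect.bisect_right([u for u, _ in syncs], upstream_offset) - 1
    if i < 0:
        return None  # offset is older than everything in the cache
    u, d = syncs[i]
    # Past the sync point, advance only one record beyond the downstream
    # sync offset: nothing more is provably replicated downstream.
    return d + 1 if upstream_offset > u else d

print(translate_downstream(SYNCS, 197532))  # 195684, as in the DEBUG log
```

With the expired sync absent, the committed offset 197532 falls back to the neighbor 196913:195683 and translates to 195684, reproducing the DEBUG line above; had 197532:196302 still been cached, the same rule would have yielded the exact translation 196302.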
Unplanned failovers will nearly always include data re-delivery because MM2 does asynchronous replication, so your application should be tolerant of that. This behavior has been changing since KAFKA-12468, so if you're only seeing this after an upgrade, that would explain why.

> Mirrormaker2 wrong checkpoints
> ------------------------------
>
> Key: KAFKA-16291
> URL: https://issues.apache.org/jira/browse/KAFKA-16291
> Project: Kafka
> Issue Type: Bug
> Components: mirrormaker
> Affects Versions: 3.6.1
> Environment: Mirrormaker2 version 3.6.1 running on docker containers
> Reporter: Claudio Benfatto
> Priority: Major
>
> I am running Mirrormaker2 with the following configuration:
> {noformat}
> clusters = fallingwaterfall, weatheredbase
> sync.group.offsets.interval.seconds=30
> emit.checkpoints.interval.seconds=30
> offset.lag.max=0
> fallingwaterfall->weatheredbase.enabled = true
> weatheredbase->fallingwaterfall.enabled = false
> sync.group.offsets.enabled=true
> emit.heartbeats.enabled=true
> emit.checkpoints.enabled=true
> emit.checkpoints.interval.seconds=30
> refresh.groups.enabled=true
> refresh.groups.interval.seconds=30
> refresh.topics.enabled=true
> sync.topic.configs.enabled=true
> refresh.topics.interval.seconds=30
> sync.topic.acls.enabled = false
> fallingwaterfall->weatheredbase.topics = storage-demo-.*
> fallingwaterfall->weatheredbase.groups = storage-demo-.*
> group.id=mirror-maker-fallingwaterfall-weatheredbase
> consumer.group.id=mirror-maker-fallingwaterfall-weatheredbase
> fallingwaterfall.consumer.isolation.level = read_committed
> weatheredbase.producer.enable.idempotence = true
> weatheredbase.producer.acks=all
> weatheredbase.exactly.once.source.support = enabled
> replication.policy.class=org.apache.kafka.connect.mirror.IdentityReplicationPolicy
> {noformat}
> I am experiencing issues with the consumer group offset synchronisation.
> I have a setup with a 12-partition topic, named *storage-demo-test,* a single
> transactional producer to this topic and a consumer group, named
> *storage-demo-test-cg,* consuming from it.
> The consumer configuration is:
> {code:java}
> 'auto.offset.reset': 'earliest',
> 'isolation.level': 'read_committed',
> 'enable.auto.commit': False, {code}
> and I'm committing the offsets explicitly and synchronously after each poll.
> What I observed is that the synchronised offsets between the upstream and
> downstream cluster for the *storage-demo-test-cg* are often wrong.
> For example in the case of this checkpoint:
> {code:java}
> (1, 1708505669764) - 6252 -
> CheckpointKey(consumer_group='storage-demo-test-cg',
> topic='storage-demo-test', partition=5) -
> CheckpointValue(upstream_offset=197532, downstream_offset=196300) {code}
> We have a mismatch in the replicated messages:
> {code:java}
> [fallingwaterfall]# kcat -C -b0 -t storage-demo-test -p 5 -o 197532 -c 1
> Test message 1027-0 {code}
> {code:java}
> [weatheredbase]# kcat -C -b0 -t storage-demo-test -p 5 -o 196300 -c 1
> Test message 1015-9 {code}
> In the Mirrormaker2 logs I see many of these messages:
> {code:java}
> mirrormaker2-fallingwaterfall-weatheredbase-1 - mirrormaker2-server -
> [2024-02-21 09:02:18,534] TRACE [MirrorCheckpointConnector|task-0]
> latestDownstreamOffset 196300 is larger than or equal to
> convertedUpstreamOffset 196300 for TopicPartition storage-demo-test-5
> (org.apache.kafka.connect.mirror.MirrorCheckpointTask:337)
> mirrormaker2-fallingwaterfall-weatheredbase-1 - mirrormaker2-server -
> [2024-02-21 09:02:01,557] DEBUG [MirrorCheckpointConnector|task-0]
> translateDownstream(storage-demo-test-cg,storage-demo-test-5,197532):
> Translated 195684 (relative to OffsetSync{topicPartition=storage-demo-test-5,
> upstreamOffset=196913, downstreamOffset=195683})
> (org.apache.kafka.connect.mirror.OffsetSyncStore:160)
> mirrormaker2-fallingwaterfall-weatheredbase-1 - mirrormaker2-server -
> [2024-02-21 09:02:01,557] TRACE [MirrorCheckpointConnector|task-0] Skipping
> Checkpoint{consumerGroupId=storage-demo-test-cg,
> topicPartition=storage-demo-test-5, upstreamOffset=197532,
> downstreamOffset=195684, metadata=} (preventing downstream rewind)
> (org.apache.kafka.connect.mirror.MirrorCheckpointTask:218)
> mirrormaker2-fallingwaterfall-weatheredbase-1 - mirrormaker2-server -
> [2024-02-21 08:59:00,859] TRACE [MirrorCheckpointConnector|task-0] Skipping
> Checkpoint{consumerGroupId=storage-demo-test-cg,
> topicPartition=storage-demo-test-5, upstreamOffset=197532,
> downstreamOffset=195684, metadata=} (preventing downstream rewind)
> (org.apache.kafka.connect.mirror.MirrorCheckpointTask:218)
> mirrormaker2-fallingwaterfall-weatheredbase-1 - mirrormaker2-server -
> [2024-02-21 08:59:00,859] DEBUG [MirrorCheckpointConnector|task-0]
> translateDownstream(storage-demo-test-cg,storage-demo-test-5,197532):
> Translated 195684 (relative to OffsetSync{topicPartition=storage-demo-test-5,
> upstreamOffset=196913, downstreamOffset=195683})
> (org.apache.kafka.connect.mirror.OffsetSyncStore:160)
> mirrormaker2-fallingwaterfall-weatheredbase-1 - mirrormaker2-server -
> [2024-02-21 08:58:40,812] TRACE [MirrorCheckpointConnector|task-0] New sync
> OffsetSync{topicPartition=storage-demo-test-5, upstreamOffset=198765,
> downstreamOffset=197535} applied, new state is
> [198765:197535,198764:197534,198762:197532,198761:197531,198753:197523,198739:197509,198717:197487,198673:197443,198585:197355,198497:197267,198321:197091,197617:196387,196913:195683,194098:192868]
> (org.apache.kafka.connect.mirror.OffsetSyncStore:193)
> mirrormaker2-fallingwaterfall-weatheredbase-1 - mirrormaker2-server -
> [2024-02-21 08:54:05,030] TRACE [MirrorCheckpointConnector|task-0] New sync
> OffsetSync{topicPartition=storage-demo-test-5, upstreamOffset=197532,
> downstreamOffset=196302} applied, new state is
> [197532:196302,197530:196300,197529:196299,197521:196291,197507:196277,197485:196255,197441:196211,197353:196123,197265:196035,196913:195683,196209:194979,195505:194275,194098:192868]
> (org.apache.kafka.connect.mirror.OffsetSyncStore:193)
> mirrormaker2-fallingwaterfall-weatheredbase-0 - mirrormaker2-server -
> [2024-02-21 08:54:05,030] TRACE [MirrorSourceConnector|task-0] Sync'd offsets
> for storage-demo-test-5: 197532==196302
> (org.apache.kafka.connect.mirror.MirrorSourceTask:251){code}
> And looking in the OffsetSync topic, I see the correct value for the offset
> sync:
> {code:java}
> (1, 1708505645010) - 3945070 - OffsetSyncKey(topic='storage-demo-test',
> partition=5) - OffsetSyncValue(upstream_offset=197532,
> downstream_offset=196302)
> {code}
> {code:java}
> [weatheredbase]# kcat -C -b0 -t storage-demo-test -p 5 -o 196302 -c 1
> Test message 1027-0 {code}
> So it seems that the offset conversions and checkpoints produced in the
> *MirrorCheckpointTask* are not matching the information committed to the
> OffsetSync topic by the *MirrorSourceTask.*
> Please let me know if you need additional info about the setup I'm running or
> collecting more logs.
> Thanks!

--
This message was sent by Atlassian Jira
(v8.20.10#820010)