Hi there, I’m running into an issue with MirrorMaker2 (MM2), specifically the MirrorCheckpointConnector (CPC). In short, the CPC doesn’t replicate the final batch of consumer group offsets to the target Kafka cluster during a migration. I believe I can point to the sections of the MM2 code where this happens, but I can’t figure out why it works that way. Can someone help me understand whether this is a bug or a feature?
Issue Summary: During a migration with MirrorMaker2, it is desirable to shut down a consumer on the source cluster and start it again against the target cluster. However, it seems that during this process the final consumer group offsets from the source cluster do not get replicated to the target cluster:

* The consumer is running against the source cluster, committing offsets over time
* Right before the cutover, the consumer commits offset 10
* emit.checkpoints.interval.seconds/sync.group.offsets.interval.seconds later, the CPC replicates offset 10
* The consumer commits offset 20, then gets shut down for the migration
* The CPC never replicates offset 20, leaving a gap in the committed offsets

MM2 Version: Kafka 3.7.0

CPC Config:

    {
      "name": "mm2-cpc",
      "connector.class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
      "clusters": "msksource,mskdest",
      "source.cluster.alias": "msksource",
      "target.cluster.alias": "mskdest",
      "target.cluster.bootstrap.servers": "b-1...",
      "source.cluster.bootstrap.servers": "b-1...",
      "tasks.max": "1",
      "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
      "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
      "replication.policy.class": "com.amazonaws.kafka.samples.CustomMM2ReplicationPolicy",
      "replication.factor": "3",
      "checkpoints.topic.replication.factor": "3",
      "refresh.groups.interval.seconds": "60",
      "emit.checkpoints.interval.seconds": "20",
      "sync.group.offsets.interval.seconds": "20",
      "sync.group.offsets.enabled": "true"
    }

Possible Explanation

From my read of the code, this appears to be an issue in the OffsetSyncStore.
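To make the translation step concrete, here is a minimal Java sketch of the rule that the translateDownstream DEBUG lines in the logs below appear to follow. Two examples from those logs show it: "Translated 795763 (relative to ... upstreamOffset=795762, downstreamOffset=795762)" and "Skipped (... is ahead of upstream consumer group ...)". This is an approximation, not the actual MM2 code. The class name OffsetTranslationSketch is hypothetical, and a TreeMap of upstream offset to downstream offset stands in for the real sync store, which keeps a bounded, sparse set of syncs.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.OptionalLong;
import java.util.TreeMap;

// Hypothetical model of per-partition offset translation; not MM2's API.
public class OffsetTranslationSketch {

    // syncs: upstream offset -> downstream offset for one partition (assumed TreeMap stand-in).
    // Returns the translated offset, -1 if every known sync is ahead of the group,
    // or empty if there are no syncs at all.
    public static OptionalLong translateDownstream(NavigableMap<Long, Long> syncs, long groupUpstreamOffset) {
        if (syncs.isEmpty()) {
            return OptionalLong.empty(); // nothing to translate against
        }
        // Latest sync at or before the group's upstream offset.
        Map.Entry<Long, Long> sync = syncs.floorEntry(groupUpstreamOffset);
        if (sync == null) {
            // Oldest known sync is already past the group: cannot translate accurately.
            return OptionalLong.of(-1L);
        }
        // Move at most one offset past the sync: records between the sync and the group
        // offset may or may not have been replicated, so they cannot be counted.
        long step = groupUpstreamOffset == sync.getKey() ? 0 : 1;
        return OptionalLong.of(sync.getValue() + step);
    }

    public static void main(String[] args) {
        // Partition 12 at cutover: latest sync 795762 -> 795762, group at 936898.
        NavigableMap<Long, Long> p12 = new TreeMap<>(Map.of(795762L, 795762L));
        System.out.println(translateDownstream(p12, 936898L)); // OptionalLong[795763]

        // Partition 16 during catch-up: only sync 100256587 -> 100256746, group at 9871846.
        NavigableMap<Long, Long> p16 = new TreeMap<>(Map.of(100256587L, 100256746L));
        System.out.println(translateDownstream(p16, 9871846L)); // OptionalLong[-1]
    }
}
```

Under this model, the translated offset for partition 12 stays at 795763 no matter how far past the sync the group has moved. That would explain how a stale offset sync caps the translation well below the 933426 the target group already holds.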
Example during migration when lag is small

Time 1: The target cluster consumer group is at offset 381561, and the source is at offset 381561:

Target:
    GROUP             TOPIC                    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
    mm2TestConsumer1  ExampleTopicClickStream  12         381561          381561          0    -            -     -

Source:
    GROUP             TOPIC                    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                                HOST         CLIENT-ID
    mm2TestConsumer1  ExampleTopicClickStream  12         381561          381561          0    clickstream-consumer-1278c34c-6ced-49cb-9804-29fc2e8d6df2  /10.0.4.143  clickstream-consumer

60 seconds later: The target cluster consumer group has advanced to offset 614064, while the source is at offset 651218. The target's lag is higher than the source's because emit.checkpoints.interval.seconds and sync.group.offsets.interval.seconds are set to 20 seconds. If these were lower (e.g. 1 second), we'd still see some lag, albeit less.

Target:
    GROUP             TOPIC                    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG    CONSUMER-ID  HOST  CLIENT-ID
    mm2TestConsumer1  ExampleTopicClickStream  12         614064          646293          32229  -            -     -

Source:
    GROUP             TOPIC                    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                                HOST         CLIENT-ID
    mm2TestConsumer1  ExampleTopicClickStream  12         651218          651334          116  clickstream-consumer-1278c34c-6ced-49cb-9804-29fc2e8d6df2  /10.0.4.143  clickstream-consumer

Once the consumer is shut down: Ideally, the offsets would converge between source and target once the consumer shuts down and the final intervals elapse. However, the target cluster consumer group has only advanced to offset 933426, while the source is at offset 936898. The lag continues to grow because the producer keeps sending messages.
Target:
    GROUP             TOPIC                    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG    CONSUMER-ID  HOST  CLIENT-ID
    mm2TestConsumer1  ExampleTopicClickStream  12         933426          1008788         75362  -            -     -

Source:
    GROUP             TOPIC                    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG    CONSUMER-ID  HOST  CLIENT-ID
    mm2TestConsumer1  ExampleTopicClickStream  12         936898          1013744         76846  -            -     -

Logs once the consumer is shut down to start the cutover: In this case, the problem appears to be in offset translation. For partition 12, the CPC logs a translated downstream offset (795763) that is far behind the actual downstream offsets (933426 current / 1008788 log end), so the checkpoint is skipped as a downstream rewind:

    [2024-05-24 19:30:41,931] TRACE [mm2-cpc|task-0] skip syncing the offset for consumer group: mm2TestConsumer1 (org.apache.kafka.connect.mirror.MirrorCheckpointTask:352)
    [2024-05-24 19:30:41,931] INFO [mm2-cpc|task-0] sync idle consumer group offset from source to target took 1 ms (org.apache.kafka.connect.mirror.Scheduler:95)
    [2024-05-24 19:30:41,930] INFO [mm2-cpc|task-0] refreshing idle consumers group offsets at target cluster took 3 ms (org.apache.kafka.connect.mirror.Scheduler:95)
    [2024-05-24 19:30:41,930] TRACE [mm2-cpc|task-0] latestDownstreamOffset 931828 is larger than or equal to convertedUpstreamOffset 931828 for TopicPartition ExampleTopicClickStream-21 (org.apache.kafka.connect.mirror.MirrorCheckpointTask:337)
    [2024-05-24 19:30:41,930] TRACE [mm2-cpc|task-0] latestDownstreamOffset 936948 is larger than or equal to convertedUpstreamOffset 936948 for TopicPartition ExampleTopicClickStream-23 (org.apache.kafka.connect.mirror.MirrorCheckpointTask:337)
    [2024-05-24 19:30:33,839] TRACE [mm2-cpc|task-0] Skipping Checkpoint{consumerGroupId=mm2TestConsumer1, topicPartition=ExampleTopicClickStream-16, upstreamOffset=929667, downstreamOffset=791619, metadata=No Metadata} (preventing downstream rewind) (org.apache.kafka.connect.mirror.MirrorCheckpointTask:218)
    [2024-05-24 19:30:33,840] DEBUG [mm2-cpc|task-0] translateDownstream(mm2TestConsumer1,ExampleTopicClickStream-18,921852): Translated 791455 (relative to OffsetSync{topicPartition=ExampleTopicClickStream-18, upstreamOffset=791454, downstreamOffset=791454}) (org.apache.kafka.connect.mirror.OffsetSyncStore:160)
    [2024-05-24 19:30:33,840] TRACE [mm2-cpc|task-0] Skipping Checkpoint{consumerGroupId=mm2TestConsumer1, topicPartition=ExampleTopicClickStream-18, upstreamOffset=921852, downstreamOffset=791455, metadata=No Metadata} (preventing downstream rewind) (org.apache.kafka.connect.mirror.MirrorCheckpointTask:218)
    [2024-05-24 19:30:33,839] DEBUG [mm2-cpc|task-0] translateDownstream(mm2TestConsumer1,ExampleTopicClickStream-12,936898): Translated 795763 (relative to OffsetSync{topicPartition=ExampleTopicClickStream-12, upstreamOffset=795762, downstreamOffset=795762}) (org.apache.kafka.connect.mirror.OffsetSyncStore:160)
    [2024-05-24 19:30:33,839] TRACE [mm2-cpc|task-0] Skipping Checkpoint{consumerGroupId=mm2TestConsumer1, topicPartition=ExampleTopicClickStream-12, upstreamOffset=936898, downstreamOffset=795763, metadata=No Metadata} (preventing downstream rewind) (org.apache.kafka.connect.mirror.MirrorCheckpointTask:218)
    ...
    [2024-05-24 19:30:32,776] INFO [mm2-cpc|worker] refreshing consumer groups took 15 ms (org.apache.kafka.connect.mirror.Scheduler:95)

Example during catch-up where lag is large

Here the offsets are out of sync between the source and target clusters.

Time 1: The target cluster consumer group is at offset 2162916, while the source is at offset 8106079:

Target:
    GROUP             TOPIC                    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG       CONSUMER-ID  HOST  CLIENT-ID
    mm2TestConsumer1  ExampleTopicClickStream  12         2162916         100833554       98670638  -            -     -

Source:
    GROUP             TOPIC                    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG       CONSUMER-ID                                                HOST         CLIENT-ID
    mm2TestConsumer1  ExampleTopicClickStream  12         8106079         100833520       92727441  clickstream-consumer-47a4b99a-1196-4d3b-831c-eee58828d5dc  /10.0.3.252  clickstream-consumer

60 seconds later: The target cluster consumer group is still at offset 2162916, while the source has moved to offset 8736048:

Target:
    GROUP             TOPIC                    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG       CONSUMER-ID  HOST  CLIENT-ID
    mm2TestConsumer1  ExampleTopicClickStream  12         2162916         100833554       98670638  -            -     -

Source:
    GROUP             TOPIC                    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG       CONSUMER-ID                                                HOST         CLIENT-ID
    mm2TestConsumer1  ExampleTopicClickStream  12         8736048         100833520       92097472  clickstream-consumer-47a4b99a-1196-4d3b-831c-eee58828d5dc  /10.0.3.252  clickstream-consumer

Logs during catch-up: The logs show the OffsetSyncStore skipping these offsets in translateDownstream (https://github.com/apache/kafka/blob/trunk/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java#L250) during checkpointing. This in turn leaves checkpointStore (https://github.com/apache/kafka/blob/trunk/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java#L324-L370) with no offsets to replicate for the consumer group:

    [2024-05-24 18:32:49,938] DEBUG [mm2-cpc|task-0] translateDownstream(mm2TestConsumer1,ExampleTopicClickStream-16,9871846): Skipped (OffsetSync{topicPartition=ExampleTopicClickStream-16, upstreamOffset=100256587, downstreamOffset=100256746} is ahead of upstream consumer group 9871846) (org.apache.kafka.connect.mirror.OffsetSyncStore:142)
    ...
    [2024-05-24 18:32:49,933] DEBUG [mm2-cpc|task-0] translateDownstream(mm2TestConsumer1,ExampleTopicClickStream-29,9831304): Skipped (OffsetSync{topicPartition=ExampleTopicClickStream-29, upstreamOffset=100165303, downstreamOffset=100165463} is ahead of upstream consumer group 9831304) (org.apache.kafka.connect.mirror.OffsetSyncStore:142)
    [2024-05-24 18:32:48,297] INFO [mm2-cpc|task-0] refreshing idle consumers group offsets at target cluster took 99 ms (org.apache.kafka.connect.mirror.Scheduler:95)
    [2024-05-24 18:32:48,297] TRACE [mm2-cpc|task-0] skip syncing the offset for consumer group: mm2TestConsumer1 (org.apache.kafka.connect.mirror.MirrorCheckpointTask:352)
    [2024-05-24 18:32:48,297] INFO [mm2-cpc|task-0] sync idle consumer group offset from source to target took 0 ms (org.apache.kafka.connect.mirror.Scheduler:95)
    [2024-05-24 18:32:29,906] DEBUG [mm2-cpc|task-0] translateDownstream(mm2TestConsumer1,ExampleTopicClickStream-20,9702235): Skipped (OffsetSync{topicPartition=ExampleTopicClickStream-20, upstreamOffset=100253696, downstreamOffset=100253854} is ahead of upstream consumer group 9702235) (org.apache.kafka.connect.mirror.OffsetSyncStore:142)
    ...
    [2024-05-24 18:32:29,903] DEBUG [mm2-cpc|task-0] translateDownstream(mm2TestConsumer1,ExampleTopicClickStream-21,9723670): Skipped (OffsetSync{topicPartition=ExampleTopicClickStream-21, upstreamOffset=100118386, downstreamOffset=100118547} is ahead of upstream consumer group 9723670) (org.apache.kafka.connect.mirror.OffsetSyncStore:142)
    [2024-05-24 18:32:28,297] INFO [mm2-cpc|task-0] refreshing idle consumers group offsets at target cluster took 99 ms (org.apache.kafka.connect.mirror.Scheduler:95)
    [2024-05-24 18:32:28,298] TRACE [mm2-cpc|task-0] skip syncing the offset for consumer group: mm2TestConsumer1 (org.apache.kafka.connect.mirror.MirrorCheckpointTask:352)
    [2024-05-24 18:32:28,298] INFO [mm2-cpc|task-0] sync idle consumer group offset from source to target took 0 ms (org.apache.kafka.connect.mirror.Scheduler:95)
    [2024-05-24 18:32:27,873] DEBUG [mm2-cpc|worker] Ignoring the following groups which do not have any offsets for topics that are accepted by the topic filter: [amazon.msk.canary.group.broker-1, amazon.msk.canary.group.broker-3, amazon.msk.canary.group.broker-2] (org.apache.kafka.connect.mirror.MirrorCheckpointConnector:212)
    [2024-05-24 18:32:27,873] INFO [mm2-cpc|worker] refreshing consumer groups took 15 ms (org.apache.kafka.connect.mirror.Scheduler:95)
    [2024-05-24 18:32:09,893] DEBUG [mm2-cpc|task-0] translateDownstream(mm2TestConsumer1,ExampleTopicClickStream-20,9599651): Skipped (OffsetSync{topicPartition=ExampleTopicClickStream-20, upstreamOffset=100253696, downstreamOffset=100253854} is ahead of upstream consumer group 9599651) (org.apache.kafka.connect.mirror.OffsetSyncStore:142)
    ...
    [2024-05-24 18:32:09,891] DEBUG [mm2-cpc|task-0] translateDownstream(mm2TestConsumer1,ExampleTopicClickStream-21,9620968): Skipped (OffsetSync{topicPartition=ExampleTopicClickStream-21, upstreamOffset=100118386, downstreamOffset=100118547} is ahead of upstream consumer group 9620968) (org.apache.kafka.connect.mirror.OffsetSyncStore:142)
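The TRACE lines in the logs above ("Skipping Checkpoint{...} (preventing downstream rewind)" and "latestDownstreamOffset ... is larger than or equal to convertedUpstreamOffset ...") point at two filters applied after translation. Below is a minimal Java sketch of those filters as inferred from the log messages. CheckpointFilterSketch, its Checkpoint type, and both method names are hypothetical, not MM2's API. Only the "cannot translate" case (the -1 result) and the "preventing downstream rewind" case are visible in these logs; the emit branches are an assumption about how the remaining cases are handled. The previous checkpoint for partition 12 is also hypothetical, since it does not appear in the logs.

```java
// Hypothetical model of the post-translation checkpoint filters; not MM2's API.
public class CheckpointFilterSketch {

    // Minimal stand-in for a checkpoint: upstream and translated downstream offset.
    public static final class Checkpoint {
        final long upstreamOffset;
        final long downstreamOffset;

        public Checkpoint(long upstreamOffset, long downstreamOffset) {
            this.upstreamOffset = upstreamOffset;
            this.downstreamOffset = downstreamOffset;
        }
    }

    // Decide whether a candidate checkpoint would be emitted, given the last one
    // emitted for the same group and partition (null if there is none).
    public static String decide(Checkpoint candidate, Checkpoint last) {
        if (candidate.downstreamOffset < 0) {
            return "skip (cannot translate)"; // translation returned -1
        }
        if (last == null) {
            return "emit (first for this partition)";
        }
        if (candidate.upstreamOffset < last.upstreamOffset) {
            return "emit (upstream offset rewind)"; // the source group itself was rewound
        }
        if (candidate.downstreamOffset > last.downstreamOffset) {
            return "emit (downstream offset advanced)";
        }
        return candidate.downstreamOffset == last.downstreamOffset
                ? "skip (repeated checkpoint)"
                : "skip (preventing downstream rewind)";
    }

    // The group offset sync only writes a partition when it moves the target group forward.
    public static boolean shouldSyncGroupOffset(long latestDownstreamOffset, long convertedUpstreamOffset) {
        return latestDownstreamOffset < convertedUpstreamOffset;
    }

    public static void main(String[] args) {
        // Hypothetical previous checkpoint for partition 12 (not in the logs):
        // any upstream offset <= 936898 with downstream 933426 gives the same result.
        Checkpoint last = new Checkpoint(933426L, 933426L);
        System.out.println(decide(new Checkpoint(936898L, 795763L), last)); // skip (preventing downstream rewind)
        // Partition 16 during catch-up: translation returned -1.
        System.out.println(decide(new Checkpoint(9871846L, -1L), null)); // skip (cannot translate)
        // Partition 21 at cutover: target group already at the converted offset.
        System.out.println(shouldSyncGroupOffset(931828L, 931828L)); // false
    }
}
```

Under these rules, once the target group reaches the last emitted checkpoint, a later translation that falls behind it (795763 vs. 933426) would be discarded. The group sync step would then find nothing to advance, which is consistent with the "skip syncing the offset for consumer group" lines in both sets of logs.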