Hi there,
I’m running into an issue with MirrorMaker2 (MM2), specifically the 
MirrorCheckpointConnector (CPC). In short, CPC doesn’t replicate the final 
batch of offsets to the target Kafka cluster during a migration. I believe I 
can point to the sections of the MM2 code where this is happening, but I can’t 
figure out why it works that way. Can someone help me understand if this is a 
bug or a feature?

Issue Summary:

During a migration scenario with MirrorMaker2, it is desirable to shut down a 
consumer on the source cluster and start it again in the target cluster. 
However, it seems that during this process the final consumer group offsets for 
the source cluster do not get replicated to the target cluster.

  *   Consumer is running against source cluster, committing offsets over time
  *   Right before the cutover, consumer commits offset 10
  *   One emit.checkpoints.interval.seconds/sync.group.offsets.interval.seconds interval later, CPC replicates offset 10 to the target
  *   Consumer commits offset 20, then gets shut down for migration
  *   CPC never replicates offset 20, causing a gap in committed offsets
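
For reference, the consumer group listings further down are standard kafka-consumer-groups.sh --describe output taken against each cluster. The same comparison can be scripted with the Kafka AdminClient; here is a minimal sketch (the bootstrap address is a placeholder; mm2TestConsumer1 is the group from the examples below):

import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Minimal sketch: dump the committed offsets of one consumer group on one cluster.
// Run it once against the source and once against the target to compare what the
// group committed upstream with what CPC has written downstream.
public class DumpCommittedOffsets {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Placeholder bootstrap address -- point this at the source or the target cluster.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "b-1.example:9092");
        try (Admin admin = Admin.create(props)) {
            Map<TopicPartition, OffsetAndMetadata> committed =
                admin.listConsumerGroupOffsets("mm2TestConsumer1")
                     .partitionsToOffsetAndMetadata()
                     .get();
            committed.forEach((tp, oam) ->
                System.out.printf("%s -> committed offset %d%n", tp, oam.offset()));
        }
    }
}
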
MM2 Version:
Kafka 3.7.0
CPC Config:

{
    "name": "mm2-cpc",
    "connector.class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
    "clusters": "msksource,mskdest",
    "source.cluster.alias": "msksource",
    "target.cluster.alias": "mskdest",
    "target.cluster.bootstrap.servers": "b-1...",
    "source.cluster.bootstrap.servers": "b-1...",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "replication.policy.class": "com.amazonaws.kafka.samples.CustomMM2ReplicationPolicy",
    "replication.factor": "3",
    "checkpoints.topic.replication.factor": "3",
    "refresh.groups.interval.seconds": "60",
    "emit.checkpoints.interval.seconds": "20",
    "sync.group.offsets.interval.seconds": "20",
    "sync.group.offsets.enabled": "true"
}

Possible Explanation

From my read of the code, this appears to be an issue in the OffsetSyncStore.
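
To make the two skip conditions in the logs below concrete, here is a much-simplified paraphrase of what I understand the translation and rewind checks to do. The class, the method names, and the numbers in main are mine, purely for illustration; the real logic lives in OffsetSyncStore.translateDownstream and MirrorCheckpointTask:

import java.util.OptionalLong;

// Simplified paraphrase of the two ways a group offset can be dropped; these are
// not the real MM2 classes.
public class OffsetTranslationSketch {

    // Stand-in for an OffsetSync record read back from the offset-syncs topic.
    record Sync(long upstreamOffset, long downstreamOffset) {}

    // (1) Translation: map an upstream (source) committed offset to a downstream
    // offset relative to the nearest retained sync at or below it. If the only
    // retained sync is already ahead of the group, there is nothing to translate
    // against -> "Skipped (... is ahead of upstream consumer group ...)".
    static OptionalLong translateDownstream(Sync nearestSyncAtOrBelow, long upstreamGroupOffset) {
        if (nearestSyncAtOrBelow == null
                || nearestSyncAtOrBelow.upstreamOffset() > upstreamGroupOffset) {
            return OptionalLong.empty();
        }
        // An exact match maps 1:1; anything past the sync maps to downstream + 1,
        // which is why the logs below show upstream 936898 translated to 795763
        // relative to a sync at 795762/795762.
        long step = (upstreamGroupOffset == nearestSyncAtOrBelow.upstreamOffset()) ? 0 : 1;
        return OptionalLong.of(nearestSyncAtOrBelow.downstreamOffset() + step);
    }

    // (2) Rewind guard: a translated checkpoint is only applied if it moves the
    // downstream committed offset forward compared with what is already recorded
    // (earlier checkpoints / the target group's own commit) ->
    // "Skipping Checkpoint{...} (preventing downstream rewind)".
    static boolean shouldApply(long translatedDownstream, long alreadyRecordedDownstream) {
        return translatedDownstream > alreadyRecordedDownstream;
    }

    public static void main(String[] args) {
        // Hypothetical numbers purely for illustration.
        Sync sync = new Sync(1_000L, 1_000L);
        OptionalLong translated = translateDownstream(sync, 1_500L);                 // 1001
        System.out.println("translated = " + translated);
        System.out.println("apply? " + shouldApply(translated.getAsLong(), 2_000L)); // false (rewind)
    }
}
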

Example during migration when lag is small
Time 1:

Target cluster consumer group is at offset 381561, source is at offset 381561:

Target:

GROUP            TOPIC                   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
mm2TestConsumer1 ExampleTopicClickStream 12         381561          381561          0               -               -               -

Source:

GROUP            TOPIC                   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
mm2TestConsumer1 ExampleTopicClickStream 12         381561          381561          0               clickstream-consumer-1278c34c-6ced-49cb-9804-29fc2e8d6df2 /10.0.4.143     clickstream-consumer

60 seconds later:

Target cluster consumer group has advanced to offset 614064, while the source is at offset 651218. The lag is higher on the target than on the source because emit.checkpoints.interval.seconds/sync.group.offsets.interval.seconds are set to 20 seconds, so the target's committed offset can trail the source's by up to one interval's worth of commits. If these were lower (e.g. 1 second) we'd still see some lag, albeit less.

Target:

GROUP            TOPIC                   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
mm2TestConsumer1 ExampleTopicClickStream 12         614064          646293          32229           -               -               -



Source:

GROUP            TOPIC                   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
mm2TestConsumer1 ExampleTopicClickStream 12         651218          651334          116             clickstream-consumer-1278c34c-6ced-49cb-9804-29fc2e8d6df2 /10.0.4.143     clickstream-consumer

Once the consumer is shut down:

Ideally, we would see the offsets converge between source and target once the 
consumer shuts down and the final intervals occur.

However, the target cluster consumer group has advanced only to offset 933426, while the source is at offset 936898. The lag on both sides continues to grow because the producer continues to send messages.

Target:

GROUP            TOPIC                   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
mm2TestConsumer1 ExampleTopicClickStream 12         933426          1008788         75362           -               -               -



Source:

GROUP            TOPIC                   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
mm2TestConsumer1 ExampleTopicClickStream 12         936898          1013744         76846           -               -               -

Logs once the consumer is shut down to start the cutover:

In this case it appears to be an offset translation issue: the CPC logs translated downstream offsets (e.g. 795763) that don't match the actual downstream position (933426 committed / 1008788 log end).

[2024-05-24 19:30:41,931] TRACE [mm2-cpc|task-0] skip syncing the offset for 
consumer group: mm2TestConsumer1 
(org.apache.kafka.connect.mirror.MirrorCheckpointTask:352)

[2024-05-24 19:30:41,931] INFO [mm2-cpc|task-0] sync idle consumer group offset 
from source to target took 1 ms (org.apache.kafka.connect.mirror.Scheduler:95)

[2024-05-24 19:30:41,930] INFO [mm2-cpc|task-0] refreshing idle consumers group 
offsets at target cluster took 3 ms 
(org.apache.kafka.connect.mirror.Scheduler:95)

[2024-05-24 19:30:41,930] TRACE [mm2-cpc|task-0] latestDownstreamOffset 931828 
is larger than or equal to convertedUpstreamOffset 931828 for TopicPartition 
ExampleTopicClickStream-21 
(org.apache.kafka.connect.mirror.MirrorCheckpointTask:337)

[2024-05-24 19:30:41,930] TRACE [mm2-cpc|task-0] latestDownstreamOffset 936948 
is larger than or equal to convertedUpstreamOffset 936948 for TopicPartition 
ExampleTopicClickStream-23 
(org.apache.kafka.connect.mirror.MirrorCheckpointTask:337)

[2024-05-24 19:30:33,839] TRACE [mm2-cpc|task-0] Skipping 
Checkpoint{consumerGroupId=mm2TestConsumer1, 
topicPartition=ExampleTopicClickStream-16, upstreamOffset=929667, 
downstreamOffset=791619, metadata=No Metadata} (preventing downstream rewind) 
(org.apache.kafka.connect.mirror.MirrorCheckpointTask:218)

[2024-05-24 19:30:33,840] DEBUG [mm2-cpc|task-0] 
translateDownstream(mm2TestConsumer1,ExampleTopicClickStream-18,921852): 
Translated 791455 (relative to 
OffsetSync{topicPartition=ExampleTopicClickStream-18, upstreamOffset=791454, 
downstreamOffset=791454}) (org.apache.kafka.connect.mirror.OffsetSyncStore:160)

[2024-05-24 19:30:33,840] TRACE [mm2-cpc|task-0] Skipping 
Checkpoint{consumerGroupId=mm2TestConsumer1, 
topicPartition=ExampleTopicClickStream-18, upstreamOffset=921852, 
downstreamOffset=791455, metadata=No Metadata} (preventing downstream rewind) 
(org.apache.kafka.connect.mirror.MirrorCheckpointTask:218)

[2024-05-24 19:30:33,839] DEBUG [mm2-cpc|task-0] 
translateDownstream(mm2TestConsumer1,ExampleTopicClickStream-12,936898): 
Translated 795763 (relative to 
OffsetSync{topicPartition=ExampleTopicClickStream-12, upstreamOffset=795762, 
downstreamOffset=795762}) (org.apache.kafka.connect.mirror.OffsetSyncStore:160)

[2024-05-24 19:30:33,839] TRACE [mm2-cpc|task-0] Skipping 
Checkpoint{consumerGroupId=mm2TestConsumer1, 
topicPartition=ExampleTopicClickStream-12, upstreamOffset=936898, 
downstreamOffset=795763, metadata=No Metadata} (preventing downstream rewind) 
(org.apache.kafka.connect.mirror.MirrorCheckpointTask:218)

...

[2024-05-24 19:30:32,776] INFO [mm2-cpc|worker] refreshing consumer groups took 
15 ms (org.apache.kafka.connect.mirror.Scheduler:95)
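
Working through the partition 12 entries above: the most recent usable OffsetSync is {upstreamOffset=795762, downstreamOffset=795762}, so the source group's commit at 936898 translates to 795762 + 1 = 795763. The target group is already at 933426, so 795763 is far behind what has already been recorded downstream and the checkpoint is dropped with "(preventing downstream rewind)". The net effect at cutover is that the final upstream commit is never reflected on the target, even though the target group is actually far ahead of what the translation produces.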


Example during catch-up where lag is large
Offsets are out of sync between source and target cluster
Time 1:

Target cluster consumer group is at offset 2162916, source is at offset 8106079:

Target:

GROUP            TOPIC                   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
mm2TestConsumer1 ExampleTopicClickStream 12         2162916         100833554       98670638        -               -               -

Source:

GROUP            TOPIC                   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
mm2TestConsumer1 ExampleTopicClickStream 12         8106079         100833520       92727441        clickstream-consumer-47a4b99a-1196-4d3b-831c-eee58828d5dc /10.0.3.252     clickstream-consumer

60 seconds later:

Target cluster consumer group is still at offset 2162916, source is at offset 8736048:

Target:

GROUP            TOPIC                   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
mm2TestConsumer1 ExampleTopicClickStream 12         2162916         100833554       98670638        -               -               -

Source:

GROUP            TOPIC                   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
mm2TestConsumer1 ExampleTopicClickStream 12         8736048         100833520       92097472        clickstream-consumer-47a4b99a-1196-4d3b-831c-eee58828d5dc /10.0.3.252     clickstream-consumer

Logs during catch-up:

Logs show that OffsetSyncStore skips the offset translation in translateDownstream<https://github.com/apache/kafka/blob/trunk/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java#L250> during checkpointing, which in turn causes checkpointStore<https://github.com/apache/kafka/blob/trunk/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java#L324-L370> to provide no offsets to replicate for the consumer group:

[2024-05-24 18:32:49,938] DEBUG [mm2-cpc|task-0] 
translateDownstream(mm2TestConsumer1,ExampleTopicClickStream-16,9871846): 
Skipped (OffsetSync{topicPartition=ExampleTopicClickStream-16, 
upstreamOffset=100256587, downstreamOffset=100256746} is ahead of upstream 
consumer group 9871846) (org.apache.kafka.connect.mirror.OffsetSyncStore:142)

...

[2024-05-24 18:32:49,933] DEBUG [mm2-cpc|task-0] 
translateDownstream(mm2TestConsumer1,ExampleTopicClickStream-29,9831304): 
Skipped (OffsetSync{topicPartition=ExampleTopicClickStream-29, 
upstreamOffset=100165303, downstreamOffset=100165463} is ahead of upstream 
consumer group 9831304) (org.apache.kafka.connect.mirror.OffsetSyncStore:142)

[2024-05-24 18:32:48,297] INFO [mm2-cpc|task-0] refreshing idle consumers group 
offsets at target cluster took 99 ms 
(org.apache.kafka.connect.mirror.Scheduler:95)

[2024-05-24 18:32:48,297] TRACE [mm2-cpc|task-0] skip syncing the offset for 
consumer group: mm2TestConsumer1 
(org.apache.kafka.connect.mirror.MirrorCheckpointTask:352)

[2024-05-24 18:32:48,297] INFO [mm2-cpc|task-0] sync idle consumer group offset 
from source to target took 0 ms (org.apache.kafka.connect.mirror.Scheduler:95)

[2024-05-24 18:32:29,906] DEBUG [mm2-cpc|task-0] 
translateDownstream(mm2TestConsumer1,ExampleTopicClickStream-20,9702235): 
Skipped (OffsetSync{topicPartition=ExampleTopicClickStream-20, 
upstreamOffset=100253696, downstreamOffset=100253854} is ahead of upstream 
consumer group 9702235) (org.apache.kafka.connect.mirror.OffsetSyncStore:142)

...

[2024-05-24 18:32:29,903] DEBUG [mm2-cpc|task-0] 
translateDownstream(mm2TestConsumer1,ExampleTopicClickStream-21,9723670): 
Skipped (OffsetSync{topicPartition=ExampleTopicClickStream-21, 
upstreamOffset=100118386, downstreamOffset=100118547} is ahead of upstream 
consumer group 9723670) (org.apache.kafka.connect.mirror.OffsetSyncStore:142)

[2024-05-24 18:32:28,297] INFO [mm2-cpc|task-0] refreshing idle consumers group 
offsets at target cluster took 99 ms 
(org.apache.kafka.connect.mirror.Scheduler:95)

[2024-05-24 18:32:28,298] TRACE [mm2-cpc|task-0] skip syncing the offset for 
consumer group: mm2TestConsumer1 
(org.apache.kafka.connect.mirror.MirrorCheckpointTask:352)

[2024-05-24 18:32:28,298] INFO [mm2-cpc|task-0] sync idle consumer group offset 
from source to target took 0 ms (org.apache.kafka.connect.mirror.Scheduler:95)

[2024-05-24 18:32:27,873] DEBUG [mm2-cpc|worker] Ignoring the following groups 
which do not have any offsets for topics that are accepted by the topic filter: 
[amazon.msk.canary.group.broker-1, amazon.msk.canary.group.broker-3, 
amazon.msk.canary.group.broker-2] 
(org.apache.kafka.connect.mirror.MirrorCheckpointConnector:212)

[2024-05-24 18:32:27,873] INFO [mm2-cpc|worker] refreshing consumer groups took 
15 ms (org.apache.kafka.connect.mirror.Scheduler:95)

[2024-05-24 18:32:09,893] DEBUG [mm2-cpc|task-0] 
translateDownstream(mm2TestConsumer1,ExampleTopicClickStream-20,9599651): 
Skipped (OffsetSync{topicPartition=ExampleTopicClickStream-20, 
upstreamOffset=100253696, downstreamOffset=100253854} is ahead of upstream 
consumer group 9599651) (org.apache.kafka.connect.mirror.OffsetSyncStore:142)

...

[2024-05-24 18:32:09,891] DEBUG [mm2-cpc|task-0] 
translateDownstream(mm2TestConsumer1,ExampleTopicClickStream-21,9620968): 
Skipped (OffsetSync{topicPartition=ExampleTopicClickStream-21, 
upstreamOffset=100118386, downstreamOffset=100118547} is ahead of upstream 
consumer group 9620968) (org.apache.kafka.connect.mirror.OffsetSyncStore:142)
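
Reading the partition 16 entry above the same way: the group's upstream commit is 9871846, but the only OffsetSync available for that partition is at upstreamOffset=100256587, which is far ahead of the group. With no sync at or below the group's offset there is nothing to translate relative to, so translateDownstream gives up, no checkpoint is produced, and the target group stays pinned at 2162916. As far as I can tell this is the "is ahead of upstream consumer group" branch sketched earlier, and translation presumably only resumes once the source group advances past a sync that the OffsetSyncStore still retains.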
