Hi there,
I’m running into an issue with MirrorMaker2 (MM2), specifically the
MirrorCheckpointConnector (CPC). In short, CPC doesn’t replicate the final
batch of offsets to the target Kafka cluster during a migration. I believe I
can point to the sections of the MM2 code where this is happening, but I can’t
figure out why it works that way. Can someone help me understand if this is a
bug or a feature?
Issue Summary:
During a migration scenario with MirrorMaker2, it is desirable to shut down a
consumer on the source cluster and start it again in the target cluster.
However, it seems that during this process the final consumer group offsets for
the source cluster do not get replicated to the target cluster.
* Consumer is running against source cluster, committing offsets over time
* Right before the cutover, consumer commits offset 10
* emit.checkpoints.interval.seconds/sync.group.offsets.interval.seconds
later, CPC replicates offset 10
* Consumer commits offset 20, then gets shut down for migration
* CPC never replicates offset 20, causing a gap in committed offsets
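To make the expectation concrete, here is a minimal sketch of the timeline above. It is an idealized model, not MM2 code: the commit and poll times are made up, and `expected_replicated` is a hypothetical helper that assumes every CPC poll sees (and can translate) the latest commit.

```python
import bisect


def expected_replicated(commits, poll_times):
    """Return the latest committed offset visible at each poll time.

    commits is a list of (time_seconds, offset) tuples sorted by time.
    """
    times = [t for t, _ in commits]
    visible = []
    for poll in poll_times:
        idx = bisect.bisect_right(times, poll)
        visible.append(commits[idx - 1][1] if idx else None)
    return visible


# Hypothetical times: commit 10 at t=0, final commit 20 at t=5,
# CPC polls at t=3 and again one 20-second interval later.
commits = [(0, 10), (5, 20)]
polls = [3, 23]
print(expected_replicated(commits, polls))  # [10, 20]
```

In this model the second poll picks up offset 20. What I observe instead is that the target stays at 10.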
MM2 Version:
Kafka 3.7.0
CPC Config:
{
"name": "mm2-cpc",
"connector.class":
"org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
"clusters": "msksource,mskdest",
"source.cluster.alias": "msksource",
"target.cluster.alias": "mskdest",
"target.cluster.bootstrap.servers": "b-1...",
"source.cluster.bootstrap.servers": "b-1...",
"tasks.max": "1",
"key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"replication.policy.class":
"com.amazonaws.kafka.samples.CustomMM2ReplicationPolicy",
"replication.factor": "3",
"checkpoints.topic.replication.factor": "3",
"refresh.groups.interval.seconds": "60",
"emit.checkpoints.interval.seconds": "20",
"sync.group.offsets.interval.seconds": "20",
"sync.group.offsets.enabled": "true"
}
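Since a config like this is typically submitted to Connect as JSON, a quick local check can catch small slips such as a trailing comma or stray whitespace in a class name. This is only a sketch: the helper names are hypothetical, the sample string is made up for the demo, and I have not verified whether Connect itself tolerates such whitespace.

```python
import json


def load_connector_config(text: str) -> dict:
    """Parse the config strictly; json.loads rejects trailing commas."""
    return json.loads(text)


def stray_whitespace_keys(config: dict) -> list:
    """Return keys whose string values have leading or trailing whitespace."""
    return [
        key
        for key, value in config.items()
        if isinstance(value, str) and value != value.strip()
    ]


# Made-up sample with a leading space in the converter class name.
sample = (
    '{"name": "mm2-cpc", '
    '"key.converter": " org.apache.kafka.connect.converters.ByteArrayConverter"}'
)
config = load_connector_config(sample)
print(stray_whitespace_keys(config))  # ['key.converter']
```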
Possible Explanation
From my read of the code, this appears to be an issue in the OffsetSyncStore.
Example during migration when lag is small
Time 1:
Target cluster consumer group is at offset 381561, source is at offset 381561:
Target:
GROUP             TOPIC                    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG       CONSUMER-ID  HOST  CLIENT-ID
mm2TestConsumer1  ExampleTopicClickStream  12         381561          381561          0         -            -     -
Source:
GROUP             TOPIC                    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG       CONSUMER-ID                                                HOST         CLIENT-ID
mm2TestConsumer1  ExampleTopicClickStream  12         381561          381561          0         clickstream-consumer-1278c34c-6ced-49cb-9804-29fc2e8d6df2  /10.0.4.143  clickstream-consumer
60 seconds later:
Target cluster consumer group has advanced to offset 614064, source is at
offset 651218. The target trails the source because checkpoints are only
emitted and synced every 20 seconds (emit.checkpoints.interval.seconds and
sync.group.offsets.interval.seconds). With a lower interval (e.g. 1 second)
we'd still see some lag, just less.
Target:
GROUP             TOPIC                    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG       CONSUMER-ID  HOST  CLIENT-ID
mm2TestConsumer1  ExampleTopicClickStream  12         614064          646293          32229     -            -     -
Source:
GROUP             TOPIC                    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG       CONSUMER-ID                                                HOST         CLIENT-ID
mm2TestConsumer1  ExampleTopicClickStream  12         651218          651334          116       clickstream-consumer-1278c34c-6ced-49cb-9804-29fc2e8d6df2  /10.0.4.143  clickstream-consumer
Once consumer is shut down:
Ideally, we would see the offsets converge between source and target once the
consumer shuts down and the final intervals occur.
However, we see the target cluster consumer group has advanced to offset
933426, while the source is at offset 936898. The lag continues to grow because
the producer continues to send messages.
Target:
GROUP             TOPIC                    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG       CONSUMER-ID  HOST  CLIENT-ID
mm2TestConsumer1  ExampleTopicClickStream  12         933426          1008788         75362     -            -     -
Source:
GROUP             TOPIC                    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG       CONSUMER-ID  HOST  CLIENT-ID
mm2TestConsumer1  ExampleTopicClickStream  12         936898          1013744         76846     -            -     -
Logs once consumer is shut down to start cutover:
In this case, it appears to be an issue in offset translation: the CPC logs a
translated downstream offset (795763) that is far behind the actual downstream
offsets for the group (933426 current / 1008788 log end).
[2024-05-24 19:30:41,931] TRACE [mm2-cpc|task-0] skip syncing the offset for
consumer group: mm2TestConsumer1
(org.apache.kafka.connect.mirror.MirrorCheckpointTask:352)
[2024-05-24 19:30:41,931] INFO [mm2-cpc|task-0] sync idle consumer group offset
from source to target took 1 ms (org.apache.kafka.connect.mirror.Scheduler:95)
[2024-05-24 19:30:41,930] INFO [mm2-cpc|task-0] refreshing idle consumers group
offsets at target cluster took 3 ms
(org.apache.kafka.connect.mirror.Scheduler:95)
[2024-05-24 19:30:41,930] TRACE [mm2-cpc|task-0] latestDownstreamOffset 931828
is larger than or equal to convertedUpstreamOffset 931828 for TopicPartition
ExampleTopicClickStream-21
(org.apache.kafka.connect.mirror.MirrorCheckpointTask:337)
[2024-05-24 19:30:41,930] TRACE [mm2-cpc|task-0] latestDownstreamOffset 936948
is larger than or equal to convertedUpstreamOffset 936948 for TopicPartition
ExampleTopicClickStream-23
(org.apache.kafka.connect.mirror.MirrorCheckpointTask:337)
[2024-05-24 19:30:33,839] TRACE [mm2-cpc|task-0] Skipping
Checkpoint{consumerGroupId=mm2TestConsumer1,
topicPartition=ExampleTopicClickStream-16, upstreamOffset=929667,
downstreamOffset=791619, metadata=No Metadata} (preventing downstream rewind)
(org.apache.kafka.connect.mirror.MirrorCheckpointTask:218)
[2024-05-24 19:30:33,840] DEBUG [mm2-cpc|task-0]
translateDownstream(mm2TestConsumer1,ExampleTopicClickStream-18,921852):
Translated 791455 (relative to
OffsetSync{topicPartition=ExampleTopicClickStream-18, upstreamOffset=791454,
downstreamOffset=791454}) (org.apache.kafka.connect.mirror.OffsetSyncStore:160)
[2024-05-24 19:30:33,840] TRACE [mm2-cpc|task-0] Skipping
Checkpoint{consumerGroupId=mm2TestConsumer1,
topicPartition=ExampleTopicClickStream-18, upstreamOffset=921852,
downstreamOffset=791455, metadata=No Metadata} (preventing downstream rewind)
(org.apache.kafka.connect.mirror.MirrorCheckpointTask:218)
[2024-05-24 19:30:33,839] DEBUG [mm2-cpc|task-0]
translateDownstream(mm2TestConsumer1,ExampleTopicClickStream-12,936898):
Translated 795763 (relative to
OffsetSync{topicPartition=ExampleTopicClickStream-12, upstreamOffset=795762,
downstreamOffset=795762}) (org.apache.kafka.connect.mirror.OffsetSyncStore:160)
[2024-05-24 19:30:33,839] TRACE [mm2-cpc|task-0] Skipping
Checkpoint{consumerGroupId=mm2TestConsumer1,
topicPartition=ExampleTopicClickStream-12, upstreamOffset=936898,
downstreamOffset=795763, metadata=No Metadata} (preventing downstream rewind)
(org.apache.kafka.connect.mirror.MirrorCheckpointTask:218)
...
[2024-05-24 19:30:32,776] INFO [mm2-cpc|worker] refreshing consumer groups took
15 ms (org.apache.kafka.connect.mirror.Scheduler:95)
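To make the log lines above easier to follow, here is a minimal sketch of how I understand the translation and the rewind check. It is a simplified model reconstructed from the log output, not the actual MM2 code: `translate_downstream` and `should_emit` are names I made up, and I am assuming the last emitted checkpoint for partition 12 matches the target's current offset (933426).

```python
from typing import NamedTuple, Optional


class OffsetSync(NamedTuple):
    upstream_offset: int
    downstream_offset: int


def translate_downstream(sync: OffsetSync, group_offset: int) -> Optional[int]:
    """Translate an upstream group offset using a single offset sync."""
    if sync.upstream_offset > group_offset:
        # The sync is ahead of the group, so there is nothing to translate to.
        return None
    # An exact match maps directly; anything later can only be placed
    # one past the sync's downstream offset.
    step = 0 if group_offset == sync.upstream_offset else 1
    return sync.downstream_offset + step


def should_emit(candidate_downstream: int, last_downstream: int) -> bool:
    """Assumed rule: only emit a checkpoint that moves downstream forward."""
    return candidate_downstream > last_downstream


# Values from the ExampleTopicClickStream-12 log lines.
sync = OffsetSync(upstream_offset=795762, downstream_offset=795762)
translated = translate_downstream(sync, group_offset=936898)
print(translated)                       # 795763
print(should_emit(translated, 933426))  # False
```

If this model is right, the sync used for partition 12 is far behind the group, so the translated offset (795763) is lower than what was already checkpointed and gets skipped as a downstream rewind.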
Example during catch-up where lag is large
Offsets are out of sync between source and target cluster
Time 1:
Target cluster consumer group is at offset 2162916, source is at offset 8106079:
Target:
GROUP             TOPIC                    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG       CONSUMER-ID  HOST  CLIENT-ID
mm2TestConsumer1  ExampleTopicClickStream  12         2162916         100833554       98670638  -            -     -
Source:
GROUP             TOPIC                    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG       CONSUMER-ID                                                HOST         CLIENT-ID
mm2TestConsumer1  ExampleTopicClickStream  12         8106079         100833520       92727441  clickstream-consumer-47a4b99a-1196-4d3b-831c-eee58828d5dc  /10.0.3.252  clickstream-consumer
60 seconds later:
Target cluster consumer group is still at offset 2162916, source is at offset
8736048
Target:
GROUP             TOPIC                    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG       CONSUMER-ID  HOST  CLIENT-ID
mm2TestConsumer1  ExampleTopicClickStream  12         2162916         100833554       98670638  -            -     -
Source:
GROUP             TOPIC                    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG       CONSUMER-ID                                                HOST         CLIENT-ID
mm2TestConsumer1  ExampleTopicClickStream  12         8736048         100833520       92097472  clickstream-consumer-47a4b99a-1196-4d3b-831c-eee58828d5dc  /10.0.3.252  clickstream-consumer
Logs during catch-up:
Logs show that OffsetSyncStore is skipping the offsets during
translateDownstream
<https://github.com/apache/kafka/blob/trunk/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java#L250>
for checkpointing, which in turn causes checkpointStore
<https://github.com/apache/kafka/blob/trunk/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java#L324-L370>
to not provide any offsets to replicate for the consumer group:
[2024-05-24 18:32:49,938] DEBUG [mm2-cpc|task-0]
translateDownstream(mm2TestConsumer1,ExampleTopicClickStream-16,9871846):
Skipped (OffsetSync{topicPartition=ExampleTopicClickStream-16,
upstreamOffset=100256587, downstreamOffset=100256746} is ahead of upstream
consumer group 9871846) (org.apache.kafka.connect.mirror.OffsetSyncStore:142)
...
[2024-05-24 18:32:49,933] DEBUG [mm2-cpc|task-0]
translateDownstream(mm2TestConsumer1,ExampleTopicClickStream-29,9831304):
Skipped (OffsetSync{topicPartition=ExampleTopicClickStream-29,
upstreamOffset=100165303, downstreamOffset=100165463} is ahead of upstream
consumer group 9831304) (org.apache.kafka.connect.mirror.OffsetSyncStore:142)
[2024-05-24 18:32:48,297] INFO [mm2-cpc|task-0] refreshing idle consumers group
offsets at target cluster took 99 ms
(org.apache.kafka.connect.mirror.Scheduler:95)
[2024-05-24 18:32:48,297] TRACE [mm2-cpc|task-0] skip syncing the offset for
consumer group: mm2TestConsumer1
(org.apache.kafka.connect.mirror.MirrorCheckpointTask:352)
[2024-05-24 18:32:48,297] INFO [mm2-cpc|task-0] sync idle consumer group offset
from source to target took 0 ms (org.apache.kafka.connect.mirror.Scheduler:95)
[2024-05-24 18:32:29,906] DEBUG [mm2-cpc|task-0]
translateDownstream(mm2TestConsumer1,ExampleTopicClickStream-20,9702235):
Skipped (OffsetSync{topicPartition=ExampleTopicClickStream-20,
upstreamOffset=100253696, downstreamOffset=100253854} is ahead of upstream
consumer group 9702235) (org.apache.kafka.connect.mirror.OffsetSyncStore:142)
...
[2024-05-24 18:32:29,903] DEBUG [mm2-cpc|task-0]
translateDownstream(mm2TestConsumer1,ExampleTopicClickStream-21,9723670):
Skipped (OffsetSync{topicPartition=ExampleTopicClickStream-21,
upstreamOffset=100118386, downstreamOffset=100118547} is ahead of upstream
consumer group 9723670) (org.apache.kafka.connect.mirror.OffsetSyncStore:142)
[2024-05-24 18:32:28,297] INFO [mm2-cpc|task-0] refreshing idle consumers group
offsets at target cluster took 99 ms
(org.apache.kafka.connect.mirror.Scheduler:95)
[2024-05-24 18:32:28,298] TRACE [mm2-cpc|task-0] skip syncing the offset for
consumer group: mm2TestConsumer1
(org.apache.kafka.connect.mirror.MirrorCheckpointTask:352)
[2024-05-24 18:32:28,298] INFO [mm2-cpc|task-0] sync idle consumer group offset
from source to target took 0 ms (org.apache.kafka.connect.mirror.Scheduler:95)
[2024-05-24 18:32:27,873] DEBUG [mm2-cpc|worker] Ignoring the following groups
which do not have any offsets for topics that are accepted by the topic filter:
[amazon.msk.canary.group.broker-1, amazon.msk.canary.group.broker-3,
amazon.msk.canary.group.broker-2]
(org.apache.kafka.connect.mirror.MirrorCheckpointConnector:212)
[2024-05-24 18:32:27,873] INFO [mm2-cpc|worker] refreshing consumer groups took
15 ms (org.apache.kafka.connect.mirror.Scheduler:95)
[2024-05-24 18:32:09,893] DEBUG [mm2-cpc|task-0]
translateDownstream(mm2TestConsumer1,ExampleTopicClickStream-20,9599651):
Skipped (OffsetSync{topicPartition=ExampleTopicClickStream-20,
upstreamOffset=100253696, downstreamOffset=100253854} is ahead of upstream
consumer group 9599651) (org.apache.kafka.connect.mirror.OffsetSyncStore:142)
...
[2024-05-24 18:32:09,891] DEBUG [mm2-cpc|task-0]
translateDownstream(mm2TestConsumer1,ExampleTopicClickStream-21,9620968):
Skipped (OffsetSync{topicPartition=ExampleTopicClickStream-21,
upstreamOffset=100118386, downstreamOffset=100118547} is ahead of upstream
consumer group 9620968) (org.apache.kafka.connect.mirror.OffsetSyncStore:142)
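The "Skipped ... is ahead of upstream consumer group" lines can be read with a small sketch like the one below. It is my interpretation of the log message, not MM2 code: `records_until_translatable` is a hypothetical helper, and it assumes the sync printed in the log is the oldest one the store can use for that partition.

```python
def is_sync_ahead(sync_upstream_offset: int, group_offset: int) -> bool:
    """True when the offset sync sits past the group's committed offset."""
    return sync_upstream_offset > group_offset


def records_until_translatable(sync_upstream_offset: int, group_offset: int) -> int:
    """How far the group must advance before it reaches the sync (assumed model)."""
    return max(0, sync_upstream_offset - group_offset)


# (sync upstreamOffset, group offset) pairs taken from the log lines above.
cases = {
    "ExampleTopicClickStream-16": (100256587, 9871846),
    "ExampleTopicClickStream-29": (100165303, 9831304),
}
for partition, (sync_upstream, group_offset) in cases.items():
    print(
        partition,
        is_sync_ahead(sync_upstream, group_offset),
        records_until_translatable(sync_upstream, group_offset),
    )
```

Under that assumption, every partition shown is roughly 90 million records short of the sync, which would explain why no checkpoint is produced for the group during catch-up.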