Hi Mehrtens,

I think you are experiencing this problem:
https://issues.apache.org/jira/browse/KAFKA-15905
It just received a patch that is due to be released in 3.8.0/3.7.1. Read the
ticket for more context; you could also build and test with that patch to
see whether it resolves your issue.

Hope this helps,
Greg Harris

On Tue, May 28, 2024 at 12:35 PM Mehrtens, M <mmehr...@amazon.com.invalid> wrote:

> Hi there,
> I’m running into an issue with MirrorMaker2 (MM2), specifically the
> MirrorCheckpointConnector (CPC). In short, CPC doesn’t replicate the final
> batch of offsets to the target Kafka cluster during a migration. I believe
> I can point to the sections of the MM2 code where this is happening, but I
> can’t figure out why it works that way. Can someone help me understand if
> this is a bug or a feature?
>
> Issue Summary:
>
> During a migration scenario with MirrorMaker2, it is desirable to shut
> down a consumer on the source cluster and start it again on the target
> cluster. However, during this process the final consumer group offsets
> committed on the source cluster do not seem to get replicated to the
> target cluster.
>
>   *   Consumer is running against source cluster, committing offsets over time
>   *   Right before the cutover, consumer commits offset 10
>   *   emit.checkpoints.interval.seconds/sync.group.offsets.interval.seconds later, CPC replicates offset 10
>   *   Consumer commits offset 20, then gets shut down for migration
>   *   CPC never replicates offset 20, causing a gap in committed offsets
>
> MM2 Version: Kafka 3.7.0
>
> CPC Config:
>
> {
>     "name": "mm2-cpc",
>     "connector.class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
>     "clusters": "msksource,mskdest",
>     "source.cluster.alias": "msksource",
>     "target.cluster.alias": "mskdest",
>     "target.cluster.bootstrap.servers": "b-1...",
>     "source.cluster.bootstrap.servers": "b-1...",
>     "tasks.max": "1",
>     "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
>     "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
>     "replication.policy.class": "com.amazonaws.kafka.samples.CustomMM2ReplicationPolicy",
>     "replication.factor": "3",
>     "checkpoints.topic.replication.factor": "3",
>     "refresh.groups.interval.seconds": "60",
>     "emit.checkpoints.interval.seconds": "20",
>     "sync.group.offsets.interval.seconds": "20",
>     "sync.group.offsets.enabled": "true"
> }
>
> Possible Explanation
>
> From my read of the code, this appears to be an issue in the
> OffsetSyncStore.
>
> Example during migration when lag is small
>
> Time 1:
>
> Target cluster consumer group is at offset 381561, source is at offset
> 381561:
>
> Target:
>
> GROUP            TOPIC                   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
> mm2TestConsumer1 ExampleTopicClickStream 12         381561          381561          0               -               -               -
>
> Source:
>
> GROUP            TOPIC                   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                                HOST         CLIENT-ID
> mm2TestConsumer1 ExampleTopicClickStream 12         381561          381561          0               clickstream-consumer-1278c34c-6ced-49cb-9804-29fc2e8d6df2  /10.0.4.143  clickstream-consumer
>
> 60 seconds later:
>
> Target cluster consumer group has advanced to offset 614064, source is at
> offset 651218. The target's lag is higher than the source's because
> emit.checkpoints.interval.seconds and sync.group.offsets.interval.seconds
> are set to 20 seconds. If these were lower (e.g. 1 second), we'd still see
> some lag, albeit less.
>
> Target:
>
> GROUP            TOPIC                   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
> mm2TestConsumer1 ExampleTopicClickStream 12         614064          646293          32229           -               -               -
>
> Source:
>
> GROUP            TOPIC                   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                                HOST         CLIENT-ID
> mm2TestConsumer1 ExampleTopicClickStream 12         651218          651334          116             clickstream-consumer-1278c34c-6ced-49cb-9804-29fc2e8d6df2  /10.0.4.143  clickstream-consumer
>
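For reference, the LAG column in these kafka-consumer-groups snapshots is LOG-END-OFFSET minus CURRENT-OFFSET. The Java sketch below reproduces the partition 12 numbers from the "60 seconds later" snapshot, along with the gap between the two committed offsets. Comparing committed offsets across clusters this way assumes upstream and downstream offsets coincide for this partition. The OffsetSync{... upstreamOffset=795762, downstreamOffset=795762} entry in the logs suggests this, but it is not guaranteed. ConsumerLagSketch and its methods are illustrative names, not part of Kafka.

```java
/** Lag arithmetic for the kafka-consumer-groups snapshots (illustrative helper, not a Kafka API). */
final class ConsumerLagSketch {

    private ConsumerLagSketch() {}

    /** LAG as printed by kafka-consumer-groups: LOG-END-OFFSET minus CURRENT-OFFSET. */
    static long lag(long logEndOffset, long currentOffset) {
        return logEndOffset - currentOffset;
    }

    /**
     * How far the target group's committed offset trails the source group's.
     * Only meaningful if upstream and downstream offsets coincide (an assumption).
     */
    static long committedGap(long sourceCommitted, long targetCommitted) {
        return sourceCommitted - targetCommitted;
    }

    public static void main(String[] args) {
        // Partition 12, "60 seconds later" snapshot
        System.out.println("target lag    = " + lag(646_293L, 614_064L));          // 32229
        System.out.println("source lag    = " + lag(651_334L, 651_218L));          // 116
        System.out.println("committed gap = " + committedGap(651_218L, 614_064L)); // 37154
    }
}
```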
> Once the consumer is shut down:
>
> Ideally, we would see the offsets converge between source and target once
> the consumer shuts down and the final checkpoint/sync intervals elapse.
>
> However, the target cluster consumer group has advanced to offset 933426,
> while the source is at offset 936898. The lag continues to grow because
> the producer keeps sending messages.
>
> Target:
>
> GROUP            TOPIC                   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
> mm2TestConsumer1 ExampleTopicClickStream 12         933426          1008788         75362           -               -               -
>
> Source:
>
> GROUP            TOPIC                   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
> mm2TestConsumer1 ExampleTopicClickStream 12         936898          1013744         76846           -               -               -
>
> Logs once the consumer is shut down to start the cutover:
>
> In this case, the problem appears to be in offset translation: the CPC
> logs a translated downstream offset (795763) that doesn't match the actual
> downstream offset (933426 current / 1008788 log end).
>
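In the DEBUG lines that follow, the translated value is always one past the offset sync it is relative to. Group offset 936898 against OffsetSync{upstreamOffset=795762, downstreamOffset=795762} gives 795763, and 921852 against 791454 gives 791455. The Java sketch below is a minimal model of the rule those lines imply, assuming a single sync has already been chosen for the partition:

- A sync ahead of the group cannot be used.
- A sync exactly at the group's offset maps directly.
- Anything past the sync translates to only one offset beyond the sync's downstream offset, presumably because later upstream records may or may not have been replicated.

This is a simplified model, not the OffsetSyncStore source. TranslationSketch and its nested OffsetSync record are hypothetical stand-ins.

```java
import java.util.OptionalLong;

/** Simplified model of downstream offset translation (hypothetical; not Kafka's OffsetSyncStore). */
final class TranslationSketch {

    /** Hypothetical stand-in for an offset sync: an upstream offset and the downstream offset it maps to. */
    record OffsetSync(long upstreamOffset, long downstreamOffset) {}

    private TranslationSketch() {}

    /**
     * Translates a consumer group's upstream offset relative to one offset sync.
     * Returns empty when the sync is ahead of the group and therefore cannot be used.
     */
    static OptionalLong translate(OffsetSync sync, long groupUpstreamOffset) {
        if (sync.upstreamOffset() > groupUpstreamOffset) {
            // Corresponds to "Skipped (OffsetSync{...} is ahead of upstream consumer group ...)"
            return OptionalLong.empty();
        }
        // Exactly at the sync: the mapping is known. Past it: step at most one offset beyond
        // the sync's downstream offset, since later records may or may not have been replicated.
        long step = groupUpstreamOffset == sync.upstreamOffset() ? 0 : 1;
        return OptionalLong.of(sync.downstreamOffset() + step);
    }

    public static void main(String[] args) {
        // Partition 12 from the logs: group at 936898, sync 795762 -> 795762
        System.out.println(translate(new OffsetSync(795_762L, 795_762L), 936_898L)); // OptionalLong[795763]
    }
}
```

Under this rule, a group that is far past its most recent usable sync is translated to a point close to that sync, not close to its real position. That would explain why 795763 sits so far behind the target group's 933426.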
> [2024-05-24 19:30:41,931] TRACE [mm2-cpc|task-0] skip syncing the offset for consumer group: mm2TestConsumer1 (org.apache.kafka.connect.mirror.MirrorCheckpointTask:352)
>
> [2024-05-24 19:30:41,931] INFO [mm2-cpc|task-0] sync idle consumer group offset from source to target took 1 ms (org.apache.kafka.connect.mirror.Scheduler:95)
>
> [2024-05-24 19:30:41,930] INFO [mm2-cpc|task-0] refreshing idle consumers group offsets at target cluster took 3 ms (org.apache.kafka.connect.mirror.Scheduler:95)
>
> [2024-05-24 19:30:41,930] TRACE [mm2-cpc|task-0] latestDownstreamOffset 931828 is larger than or equal to convertedUpstreamOffset 931828 for TopicPartition ExampleTopicClickStream-21 (org.apache.kafka.connect.mirror.MirrorCheckpointTask:337)
>
> [2024-05-24 19:30:41,930] TRACE [mm2-cpc|task-0] latestDownstreamOffset 936948 is larger than or equal to convertedUpstreamOffset 936948 for TopicPartition ExampleTopicClickStream-23 (org.apache.kafka.connect.mirror.MirrorCheckpointTask:337)
>
> [2024-05-24 19:30:33,839] TRACE [mm2-cpc|task-0] Skipping Checkpoint{consumerGroupId=mm2TestConsumer1, topicPartition=ExampleTopicClickStream-16, upstreamOffset=929667, downstreamOffset=791619, metadata=No Metadata} (preventing downstream rewind) (org.apache.kafka.connect.mirror.MirrorCheckpointTask:218)
>
> [2024-05-24 19:30:33,840] DEBUG [mm2-cpc|task-0] translateDownstream(mm2TestConsumer1,ExampleTopicClickStream-18,921852): Translated 791455 (relative to OffsetSync{topicPartition=ExampleTopicClickStream-18, upstreamOffset=791454, downstreamOffset=791454}) (org.apache.kafka.connect.mirror.OffsetSyncStore:160)
>
> [2024-05-24 19:30:33,840] TRACE [mm2-cpc|task-0] Skipping Checkpoint{consumerGroupId=mm2TestConsumer1, topicPartition=ExampleTopicClickStream-18, upstreamOffset=921852, downstreamOffset=791455, metadata=No Metadata} (preventing downstream rewind) (org.apache.kafka.connect.mirror.MirrorCheckpointTask:218)
>
> [2024-05-24 19:30:33,839] DEBUG [mm2-cpc|task-0] translateDownstream(mm2TestConsumer1,ExampleTopicClickStream-12,936898): Translated 795763 (relative to OffsetSync{topicPartition=ExampleTopicClickStream-12, upstreamOffset=795762, downstreamOffset=795762}) (org.apache.kafka.connect.mirror.OffsetSyncStore:160)
>
> [2024-05-24 19:30:33,839] TRACE [mm2-cpc|task-0] Skipping Checkpoint{consumerGroupId=mm2TestConsumer1, topicPartition=ExampleTopicClickStream-12, upstreamOffset=936898, downstreamOffset=795763, metadata=No Metadata} (preventing downstream rewind) (org.apache.kafka.connect.mirror.MirrorCheckpointTask:218)
>
> ...
>
> [2024-05-24 19:30:32,776] INFO [mm2-cpc|worker] refreshing consumer groups took 15 ms (org.apache.kafka.connect.mirror.Scheduler:95)
>
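Two filters seem to account for the TRACE lines in the block above:

- A checkpoint is skipped "(preventing downstream rewind)" when its translated downstream offset is not ahead of the checkpoint previously emitted for that group and partition.
- The target group's offset is left untouched when it is already at or past the converted offset ("latestDownstreamOffset ... is larger than or equal to convertedUpstreamOffset ...").

The Java sketch below models both decisions. It assumes the task remembers its last emitted checkpoint per partition and still lets an upstream rewind through. It is not MirrorCheckpointTask's code; the names and the earlier checkpoint value in main are hypothetical.

```java
/** Hypothetical model of the two "skip" decisions seen in the TRACE logs (not MirrorCheckpointTask's code). */
final class CheckpointFilterSketch {

    /** Hypothetical checkpoint: the group's upstream offset and its translated downstream offset. */
    record Checkpoint(long upstreamOffset, long downstreamOffset) {}

    private CheckpointFilterSketch() {}

    /**
     * Whether a new checkpoint would be emitted, given the last one emitted for the same
     * group and partition (null if none). Assumes an upstream rewind is always let through.
     */
    static boolean shouldEmit(Checkpoint last, Checkpoint next) {
        if (last == null) {
            return true;                                  // first checkpoint for this partition
        }
        if (next.upstreamOffset() < last.upstreamOffset()) {
            return true;                                  // upstream group was rewound
        }
        // Otherwise only move forward downstream; anything else is "preventing downstream rewind"
        return next.downstreamOffset() > last.downstreamOffset();
    }

    /** Whether the target group's committed offset would be overwritten with the converted offset. */
    static boolean shouldSyncGroupOffset(long latestDownstreamOffset, long convertedUpstreamOffset) {
        // "latestDownstreamOffset X is larger than or equal to convertedUpstreamOffset X" means skip
        return latestDownstreamOffset < convertedUpstreamOffset;
    }

    public static void main(String[] args) {
        // Hypothetical earlier checkpoint for partition 12, then the logged translation 936898 -> 795763
        Checkpoint earlier = new Checkpoint(933_426L, 933_426L);
        Checkpoint logged = new Checkpoint(936_898L, 795_763L);
        System.out.println(shouldEmit(earlier, logged));               // false
        System.out.println(shouldSyncGroupOffset(931_828L, 931_828L)); // false
    }
}
```

Together the two filters never move the target group backwards. However, once translation falls behind the last emitted checkpoint, they also stop it from moving forward until translation catches up again.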
>
> Example during catch-up where lag is large
>
> Offsets are out of sync between the source and target clusters.
>
> Time 1:
>
> Target cluster consumer group is at offset 2162916, source is at offset
> 8106079:
>
> Target:
>
> GROUP            TOPIC                   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
> mm2TestConsumer1 ExampleTopicClickStream 12         2162916         100833554       98670638        -               -               -
>
> Source:
>
> GROUP            TOPIC                   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                                HOST         CLIENT-ID
> mm2TestConsumer1 ExampleTopicClickStream 12         8106079         100833520       92727441        clickstream-consumer-47a4b99a-1196-4d3b-831c-eee58828d5dc  /10.0.3.252  clickstream-consumer
>
> 60 seconds later:
>
> Target cluster consumer group is still at offset 2162916, source is at
> offset 8736048:
>
> Target:
>
> GROUP            TOPIC                   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
> mm2TestConsumer1 ExampleTopicClickStream 12         2162916         100833554       98670638        -               -               -
>
> Source:
>
> GROUP            TOPIC                   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                                HOST         CLIENT-ID
> mm2TestConsumer1 ExampleTopicClickStream 12         8736048         100833520       92097472        clickstream-consumer-47a4b99a-1196-4d3b-831c-eee58828d5dc  /10.0.3.252  clickstream-consumer
>
> Logs during catch-up:
>
> Logs show that OffsetSyncStore skips these offsets in translateDownstream
> <https://github.com/apache/kafka/blob/trunk/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java#L250>
> when checkpointing, which in turn causes checkpointStore
> <https://github.com/apache/kafka/blob/trunk/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java#L324-L370>
> to provide no offsets to replicate for the consumer group:
>
> [2024-05-24 18:32:49,938] DEBUG [mm2-cpc|task-0] translateDownstream(mm2TestConsumer1,ExampleTopicClickStream-16,9871846): Skipped (OffsetSync{topicPartition=ExampleTopicClickStream-16, upstreamOffset=100256587, downstreamOffset=100256746} is ahead of upstream consumer group 9871846) (org.apache.kafka.connect.mirror.OffsetSyncStore:142)
>
> ...
>
> [2024-05-24 18:32:49,933] DEBUG [mm2-cpc|task-0] translateDownstream(mm2TestConsumer1,ExampleTopicClickStream-29,9831304): Skipped (OffsetSync{topicPartition=ExampleTopicClickStream-29, upstreamOffset=100165303, downstreamOffset=100165463} is ahead of upstream consumer group 9831304) (org.apache.kafka.connect.mirror.OffsetSyncStore:142)
>
> [2024-05-24 18:32:48,297] INFO [mm2-cpc|task-0] refreshing idle consumers group offsets at target cluster took 99 ms (org.apache.kafka.connect.mirror.Scheduler:95)
>
> [2024-05-24 18:32:48,297] TRACE [mm2-cpc|task-0] skip syncing the offset for consumer group: mm2TestConsumer1 (org.apache.kafka.connect.mirror.MirrorCheckpointTask:352)
>
> [2024-05-24 18:32:48,297] INFO [mm2-cpc|task-0] sync idle consumer group offset from source to target took 0 ms (org.apache.kafka.connect.mirror.Scheduler:95)
>
> [2024-05-24 18:32:29,906] DEBUG [mm2-cpc|task-0] translateDownstream(mm2TestConsumer1,ExampleTopicClickStream-20,9702235): Skipped (OffsetSync{topicPartition=ExampleTopicClickStream-20, upstreamOffset=100253696, downstreamOffset=100253854} is ahead of upstream consumer group 9702235) (org.apache.kafka.connect.mirror.OffsetSyncStore:142)
>
> ...
>
> [2024-05-24 18:32:29,903] DEBUG [mm2-cpc|task-0] translateDownstream(mm2TestConsumer1,ExampleTopicClickStream-21,9723670): Skipped (OffsetSync{topicPartition=ExampleTopicClickStream-21, upstreamOffset=100118386, downstreamOffset=100118547} is ahead of upstream consumer group 9723670) (org.apache.kafka.connect.mirror.OffsetSyncStore:142)
>
> [2024-05-24 18:32:28,297] INFO [mm2-cpc|task-0] refreshing idle consumers group offsets at target cluster took 99 ms (org.apache.kafka.connect.mirror.Scheduler:95)
>
> [2024-05-24 18:32:28,298] TRACE [mm2-cpc|task-0] skip syncing the offset for consumer group: mm2TestConsumer1 (org.apache.kafka.connect.mirror.MirrorCheckpointTask:352)
>
> [2024-05-24 18:32:28,298] INFO [mm2-cpc|task-0] sync idle consumer group offset from source to target took 0 ms (org.apache.kafka.connect.mirror.Scheduler:95)
>
> [2024-05-24 18:32:27,873] DEBUG [mm2-cpc|worker] Ignoring the following groups which do not have any offsets for topics that are accepted by the topic filter: [amazon.msk.canary.group.broker-1, amazon.msk.canary.group.broker-3, amazon.msk.canary.group.broker-2] (org.apache.kafka.connect.mirror.MirrorCheckpointConnector:212)
>
> [2024-05-24 18:32:27,873] INFO [mm2-cpc|worker] refreshing consumer groups took 15 ms (org.apache.kafka.connect.mirror.Scheduler:95)
>
> [2024-05-24 18:32:09,893] DEBUG [mm2-cpc|task-0] translateDownstream(mm2TestConsumer1,ExampleTopicClickStream-20,9599651): Skipped (OffsetSync{topicPartition=ExampleTopicClickStream-20, upstreamOffset=100253696, downstreamOffset=100253854} is ahead of upstream consumer group 9599651) (org.apache.kafka.connect.mirror.OffsetSyncStore:142)
>
> ...
>
> [2024-05-24 18:32:09,891] DEBUG [mm2-cpc|task-0] translateDownstream(mm2TestConsumer1,ExampleTopicClickStream-21,9620968): Skipped (OffsetSync{topicPartition=ExampleTopicClickStream-21, upstreamOffset=100118386, downstreamOffset=100118547} is ahead of upstream consumer group 9620968) (org.apache.kafka.connect.mirror.OffsetSyncStore:142)
>
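During catch-up, every translateDownstream line is "Skipped" because the only offset sync the store offers for the partition is far ahead of the group. For example, the sync has upstream 100256587 while the group offset is 9871846.

The Java sketch below models that lookup. It assumes the store keeps a list of retained syncs ordered newest first and translates against the newest one at or below the group's offset. If every retained sync is ahead of the group, nothing can be translated, so no checkpoint is produced and there is nothing to sync to the target.

The sketch does not model how many syncs the real OffsetSyncStore retains, or which ones. SyncLookupSketch, its OffsetSync record, and the older 9800000 sync in main are hypothetical.

```java
import java.util.List;
import java.util.Optional;

/** Hypothetical model of choosing which retained offset sync a group offset is translated against. */
final class SyncLookupSketch {

    /** Hypothetical offset sync: upstream offset and the downstream offset it was replicated to. */
    record OffsetSync(long upstreamOffset, long downstreamOffset) {}

    private SyncLookupSketch() {}

    /**
     * Returns the newest retained sync at or below the group's upstream offset.
     * Empty if no sync is retained or every retained sync is ahead of the group
     * (the "Skipped (... is ahead of upstream consumer group ...)" case).
     */
    static Optional<OffsetSync> usableSync(List<OffsetSync> newestFirst, long groupUpstreamOffset) {
        for (OffsetSync sync : newestFirst) {
            if (sync.upstreamOffset() <= groupUpstreamOffset) {
                return Optional.of(sync);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Partition 16 during catch-up, assuming only the logged sync is retained
        List<OffsetSync> retained = List.of(new OffsetSync(100_256_587L, 100_256_746L));
        System.out.println(usableSync(retained, 9_871_846L)); // Optional.empty

        // The same group offset if an older (hypothetical) sync were still retained
        List<OffsetSync> withOlder = List.of(
                new OffsetSync(100_256_587L, 100_256_746L),
                new OffsetSync(9_800_000L, 9_800_100L));
        System.out.println(usableSync(withOlder, 9_871_846L).isPresent()); // true
    }
}
```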