Hi, apologies for the delay. It took me a while to find time to learn how to
build from source and incorporate that into my test environment. I built from
the 3.7.1-rc2 tag, which includes the KAFKA-15905 fix. Unfortunately, this did
not fix the issue.

With 3.7.1-rc2, offset replication still paused once the source cluster 
consumer group was turned off. 

There is still a gap between the source and target clusters: the last offset
committed by the consumer group on the source cluster is greater than the last
committed offset replicated to the target cluster.
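
To make the gap concrete, here is a minimal sketch that compares committed
offsets per partition. The dictionaries are hypothetical stand-ins for the
CURRENT-OFFSET column of the consumer group describe output on each cluster;
the partition 12 values are the ones from my original report quoted below
(taken on 3.7.0). It assumes source and target offsets are directly comparable,
which only holds when the mirrored topic has the same offsets as the source.

```python
def offset_gaps(source_committed, target_committed):
    """Return {partition: source - target} for partitions where the target is behind."""
    gaps = {}
    for partition, source_offset in source_committed.items():
        target_offset = target_committed.get(partition)
        if target_offset is None:
            continue  # the group has no committed offset on the target yet
        if source_offset > target_offset:
            gaps[partition] = source_offset - target_offset
    return gaps


# Hypothetical inputs: partition -> committed offset, after the consumer shut down.
source = {12: 936898}
target = {12: 933426}
print(offset_gaps(source, target))  # {12: 3472}
```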




On 5/28/24, 2:43 PM, "Greg Harris" <greg.har...@aiven.io.invalid> wrote:


Hi Mehrtens,


I think you are experiencing this problem:
https://issues.apache.org/jira/browse/KAFKA-15905 which just received a
patch which is due to release in 3.8.0/3.7.1. Read there for more context,
and you can consider building and testing with that patch to see if it
resolves your issue.


Hope this helps,
Greg Harris


On Tue, May 28, 2024 at 12:35 PM Mehrtens, M <mmehr...@amazon.com.invalid>
wrote:


> Hi there,
> I’m running into an issue with MirrorMaker2 (MM2), specifically the
> MirrorCheckpointConnector (CPC). In short, CPC doesn’t replicate the final
> batch of offsets to the target Kafka cluster during a migration. I believe
> I can point to the sections of the MM2 code where this is happening, but I
> can’t figure out why it works that way. Can someone help me understand if
> this is a bug or a feature?
>
> Issue Summary:
>
> During a migration scenario with MirrorMaker2, it is desirable to shut
> down a consumer on the source cluster and start it again in the target
> cluster. However, it seems that during this process the final consumer
> group offsets for the source cluster do not get replicated to the target
> cluster.
>
> * Consumer is running against source cluster, committing offsets over
> time
> * Right before the cutover, consumer commits offset 10
> * emit.checkpoints.interval.seconds/sync.group.offsets.interval.seconds
> later, CPC replicates offset 10
> * Consumer commits offset 20, then gets shut down for migration
> * CPC never replicates offset 20, causing a gap in committed offsets
>
> MM2 Version:
> Kafka 3.7.0
>
> CPC Config:
>
> {
>   "name": "mm2-cpc",
>   "connector.class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
>   "clusters": "msksource,mskdest",
>   "source.cluster.alias": "msksource",
>   "target.cluster.alias": "mskdest",
>   "target.cluster.bootstrap.servers": "b-1...",
>   "source.cluster.bootstrap.servers": "b-1...",
>   "tasks.max": "1",
>   "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
>   "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
>   "replication.policy.class": "com.amazonaws.kafka.samples.CustomMM2ReplicationPolicy",
>   "replication.factor": "3",
>   "checkpoints.topic.replication.factor": "3",
>   "refresh.groups.interval.seconds": "60",
>   "emit.checkpoints.interval.seconds": "20",
>   "sync.group.offsets.interval.seconds": "20",
>   "sync.group.offsets.enabled": "true"
> }
>
> Possible Explanation
>
> From my read of the code, this appears to be an issue in the
> OffsetSyncStore.
>
> Example during migration when lag is small
> Time 1:
>
> Target cluster consumer group is at offset 381561, source is at offset
> 381561:
>
> Target:
>
> GROUP             TOPIC                    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG       CONSUMER-ID  HOST  CLIENT-ID
> mm2TestConsumer1  ExampleTopicClickStream  12         381561          381561          0         -            -     -
>
> Source:
>
> GROUP             TOPIC                    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG       CONSUMER-ID                                                HOST         CLIENT-ID
> mm2TestConsumer1  ExampleTopicClickStream  12         381561          381561          0         clickstream-consumer-1278c34c-6ced-49cb-9804-29fc2e8d6df2  /10.0.4.143  clickstream-consumer
>
> 60 seconds later:
>
> Target cluster consumer group has advanced to offset 614064, source is at
> offset 651218. The lag is higher in the target than the source because of
> the emit.checkpoints.interval.seconds/sync.group.offsets.interval.seconds
> being set to 20 seconds. If these were lower (e.g. 1 second) we'd still see
> some lag (albeit lower).
>
> Target:
>
> GROUP             TOPIC                    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG       CONSUMER-ID  HOST  CLIENT-ID
> mm2TestConsumer1  ExampleTopicClickStream  12         614064          646293          32229     -            -     -
>
> Source:
>
> GROUP             TOPIC                    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG       CONSUMER-ID                                                HOST         CLIENT-ID
> mm2TestConsumer1  ExampleTopicClickStream  12         651218          651334          116       clickstream-consumer-1278c34c-6ced-49cb-9804-29fc2e8d6df2  /10.0.4.143  clickstream-consumer
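
As a rough sanity check on the numbers above, the gap between source and
target can be expressed as seconds of consumption. This is a back-of-the-envelope
sketch, assuming the two snapshots are exactly 60 seconds apart and the consumer
commits at a constant rate; it also ignores any difference between source and
target offsets for the same record.

```python
# Partition 12 CURRENT-OFFSET values from the tables above.
source_t1, source_t2 = 381561, 651218
target_t2 = 614064
elapsed_seconds = 60  # assumption: snapshots taken exactly 60 s apart

rate = (source_t2 - source_t1) / elapsed_seconds  # offsets committed per second
gap = source_t2 - target_t2                       # offsets the target is behind
staleness = gap / rate                            # age of the target's offset, in seconds

print(f"rate ~ {rate:.0f}/s, gap = {gap}, staleness ~ {staleness:.1f} s")
```

Under those assumptions the target trails the source by roughly 8 seconds of
consumption, which is within the 20-second checkpoint and sync intervals.
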
>
> Once consumer is shut down:
>
> Ideally, we would see the offsets converge between source and target once
> the consumer shuts down and the final intervals occur.
>
> However, we see the target cluster consumer group has advanced to offset
> 933426, while the source is at offset 936898. The lag continues to grow
> because the producer continues to send messages.
>
> Target:
>
> GROUP             TOPIC                    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG       CONSUMER-ID  HOST  CLIENT-ID
> mm2TestConsumer1  ExampleTopicClickStream  12         933426          1008788         75362     -            -     -
>
> Source:
>
> GROUP             TOPIC                    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG       CONSUMER-ID  HOST  CLIENT-ID
> mm2TestConsumer1  ExampleTopicClickStream  12         936898          1013744         76846     -            -     -
>
> Logs once consumer is shut down to start cutover:
>
> In this case, it appears to be an issue in offset translation as the CPC
> is recording offsets in the log for downstream (795763) that don't match
> the actual downstream offset (933426 current / 1008788 log end).
>
> [2024-05-24 19:30:41,931] TRACE [mm2-cpc|task-0] skip syncing the offset
> for consumer group: mm2TestConsumer1
> (org.apache.kafka.connect.mirror.MirrorCheckpointTask:352)
>
> [2024-05-24 19:30:41,931] INFO [mm2-cpc|task-0] sync idle consumer group
> offset from source to target took 1 ms
> (org.apache.kafka.connect.mirror.Scheduler:95)
>
> [2024-05-24 19:30:41,930] INFO [mm2-cpc|task-0] refreshing idle consumers
> group offsets at target cluster took 3 ms
> (org.apache.kafka.connect.mirror.Scheduler:95)
>
> [2024-05-24 19:30:41,930] TRACE [mm2-cpc|task-0] latestDownstreamOffset
> 931828 is larger than or equal to convertedUpstreamOffset 931828 for
> TopicPartition ExampleTopicClickStream-21
> (org.apache.kafka.connect.mirror.MirrorCheckpointTask:337)
>
> [2024-05-24 19:30:41,930] TRACE [mm2-cpc|task-0] latestDownstreamOffset
> 936948 is larger than or equal to convertedUpstreamOffset 936948 for
> TopicPartition ExampleTopicClickStream-23
> (org.apache.kafka.connect.mirror.MirrorCheckpointTask:337)
>
> [2024-05-24 19:30:33,839] TRACE [mm2-cpc|task-0] Skipping
> Checkpoint{consumerGroupId=mm2TestConsumer1,
> topicPartition=ExampleTopicClickStream-16, upstreamOffset=929667,
> downstreamOffset=791619, metadata=No Metadata} (preventing downstream
> rewind) (org.apache.kafka.connect.mirror.MirrorCheckpointTask:218)
>
> [2024-05-24 19:30:33,840] DEBUG [mm2-cpc|task-0]
> translateDownstream(mm2TestConsumer1,ExampleTopicClickStream-18,921852):
> Translated 791455 (relative to
> OffsetSync{topicPartition=ExampleTopicClickStream-18,
> upstreamOffset=791454, downstreamOffset=791454})
> (org.apache.kafka.connect.mirror.OffsetSyncStore:160)
>
> [2024-05-24 19:30:33,840] TRACE [mm2-cpc|task-0] Skipping
> Checkpoint{consumerGroupId=mm2TestConsumer1,
> topicPartition=ExampleTopicClickStream-18, upstreamOffset=921852,
> downstreamOffset=791455, metadata=No Metadata} (preventing downstream
> rewind) (org.apache.kafka.connect.mirror.MirrorCheckpointTask:218)
>
> [2024-05-24 19:30:33,839] DEBUG [mm2-cpc|task-0]
> translateDownstream(mm2TestConsumer1,ExampleTopicClickStream-12,936898):
> Translated 795763 (relative to
> OffsetSync{topicPartition=ExampleTopicClickStream-12,
> upstreamOffset=795762, downstreamOffset=795762})
> (org.apache.kafka.connect.mirror.OffsetSyncStore:160)
>
> [2024-05-24 19:30:33,839] TRACE [mm2-cpc|task-0] Skipping
> Checkpoint{consumerGroupId=mm2TestConsumer1,
> topicPartition=ExampleTopicClickStream-12, upstreamOffset=936898,
> downstreamOffset=795763, metadata=No Metadata} (preventing downstream
> rewind) (org.apache.kafka.connect.mirror.MirrorCheckpointTask:218)
>
> ...
>
> [2024-05-24 19:30:32,776] INFO [mm2-cpc|worker] refreshing consumer groups
> took 15 ms (org.apache.kafka.connect.mirror.Scheduler:95)
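
The translation and skip decisions in these log lines can be modelled roughly
as follows. This is a simplified sketch inferred from the logged values, not
the actual OffsetSyncStore or MirrorCheckpointTask code: the names
`translate_downstream` and `would_emit` are illustrative, and the rewind guard
assumes the last emitted checkpoint equals the offset already committed on the
target.

```python
from typing import NamedTuple, Optional


class OffsetSync(NamedTuple):
    upstream_offset: int
    downstream_offset: int


def translate_downstream(sync: OffsetSync, group_upstream_offset: int) -> Optional[int]:
    """Translate a source-cluster group offset using one offset sync (simplified model)."""
    if sync.upstream_offset > group_upstream_offset:
        # The sync is ahead of the group: nothing safe to translate ("Skipped").
        return None
    # An exact match maps directly; otherwise only "one past the sync" is known.
    step = 0 if group_upstream_offset == sync.upstream_offset else 1
    return sync.downstream_offset + step


def would_emit(translated: Optional[int], last_downstream: Optional[int]) -> bool:
    """Hypothetical rewind guard: drop checkpoints that would not advance the target."""
    if translated is None:
        return False
    return last_downstream is None or translated > last_downstream


# Partition 12 values from the logs above.
sync = OffsetSync(upstream_offset=795762, downstream_offset=795762)
translated = translate_downstream(sync, 936898)
print(translated)                      # 795763
# Assumption: the last emitted checkpoint equals the target's committed offset.
print(would_emit(translated, 933426))  # False
```

In this model the only sync available for partition 12 sits at 795762, so the
group's offset 936898 can only be translated to 795763. That is behind the
933426 already on the target, so the checkpoint is dropped. The `None` branch
corresponds to the "Skipped ... is ahead of upstream consumer group" lines in
the catch-up logs further down.
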
>
>
> Example during catch-up where lag is large
> Offsets are out of sync between source and target cluster
> Time 1:
>
> Target cluster consumer group is at offset 2162916, source is at offset
> 8106079:
>
> Target:
>
> GROUP             TOPIC                    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG       CONSUMER-ID  HOST  CLIENT-ID
> mm2TestConsumer1  ExampleTopicClickStream  12         2162916         100833554       98670638  -            -     -
>
> Source:
>
> GROUP             TOPIC                    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG       CONSUMER-ID                                                HOST         CLIENT-ID
> mm2TestConsumer1  ExampleTopicClickStream  12         8106079         100833520       92727441  clickstream-consumer-47a4b99a-1196-4d3b-831c-eee58828d5dc  /10.0.3.252  clickstream-consumer
>
> 60 seconds later:
>
> Target cluster consumer group is still at offset 2162916, source is at
> offset 8736048
>
> Target:
>
> GROUP             TOPIC                    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG       CONSUMER-ID  HOST  CLIENT-ID
> mm2TestConsumer1  ExampleTopicClickStream  12         2162916         100833554       98670638  -            -     -
>
> Source:
>
> GROUP             TOPIC                    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG       CONSUMER-ID                                                HOST         CLIENT-ID
> mm2TestConsumer1  ExampleTopicClickStream  12         8736048         100833520       92097472  clickstream-consumer-47a4b99a-1196-4d3b-831c-eee58828d5dc  /10.0.3.252  clickstream-consumer
>
> Logs during catch-up:
>
> Logs show that OffsetSyncStore is skipping the offsets during
> translateDownstream
> <https://github.com/apache/kafka/blob/trunk/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java#L250>
> for checkpointing, which in turn causes checkpointStore
> <https://github.com/apache/kafka/blob/trunk/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java#L324-L370>
> to not provide any offsets to replicate for the consumer group:
>
> [2024-05-24 18:32:49,938] DEBUG [mm2-cpc|task-0]
> translateDownstream(mm2TestConsumer1,ExampleTopicClickStream-16,9871846):
> Skipped (OffsetSync{topicPartition=ExampleTopicClickStream-16,
> upstreamOffset=100256587, downstreamOffset=100256746} is ahead of upstream
> consumer group 9871846)
> (org.apache.kafka.connect.mirror.OffsetSyncStore:142)
>
> ...
>
> [2024-05-24 18:32:49,933] DEBUG [mm2-cpc|task-0]
> translateDownstream(mm2TestConsumer1,ExampleTopicClickStream-29,9831304):
> Skipped (OffsetSync{topicPartition=ExampleTopicClickStream-29,
> upstreamOffset=100165303, downstreamOffset=100165463} is ahead of upstream
> consumer group 9831304)
> (org.apache.kafka.connect.mirror.OffsetSyncStore:142)
>
> [2024-05-24 18:32:48,297] INFO [mm2-cpc|task-0] refreshing idle consumers
> group offsets at target cluster took 99 ms
> (org.apache.kafka.connect.mirror.Scheduler:95)
>
> [2024-05-24 18:32:48,297] TRACE [mm2-cpc|task-0] skip syncing the offset
> for consumer group: mm2TestConsumer1
> (org.apache.kafka.connect.mirror.MirrorCheckpointTask:352)
>
> [2024-05-24 18:32:48,297] INFO [mm2-cpc|task-0] sync idle consumer group
> offset from source to target took 0 ms
> (org.apache.kafka.connect.mirror.Scheduler:95)
>
> [2024-05-24 18:32:29,906] DEBUG [mm2-cpc|task-0]
> translateDownstream(mm2TestConsumer1,ExampleTopicClickStream-20,9702235):
> Skipped (OffsetSync{topicPartition=ExampleTopicClickStream-20,
> upstreamOffset=100253696, downstreamOffset=100253854} is ahead of upstream
> consumer group 9702235)
> (org.apache.kafka.connect.mirror.OffsetSyncStore:142)
>
> ...
>
> [2024-05-24 18:32:29,903] DEBUG [mm2-cpc|task-0]
> translateDownstream(mm2TestConsumer1,ExampleTopicClickStream-21,9723670):
> Skipped (OffsetSync{topicPartition=ExampleTopicClickStream-21,
> upstreamOffset=100118386, downstreamOffset=100118547} is ahead of upstream
> consumer group 9723670)
> (org.apache.kafka.connect.mirror.OffsetSyncStore:142)
>
> [2024-05-24 18:32:28,297] INFO [mm2-cpc|task-0] refreshing idle consumers
> group offsets at target cluster took 99 ms
> (org.apache.kafka.connect.mirror.Scheduler:95)
>
> [2024-05-24 18:32:28,298] TRACE [mm2-cpc|task-0] skip syncing the offset
> for consumer group: mm2TestConsumer1
> (org.apache.kafka.connect.mirror.MirrorCheckpointTask:352)
>
> [2024-05-24 18:32:28,298] INFO [mm2-cpc|task-0] sync idle consumer group
> offset from source to target took 0 ms
> (org.apache.kafka.connect.mirror.Scheduler:95)
>
> [2024-05-24 18:32:27,873] DEBUG [mm2-cpc|worker] Ignoring the following
> groups which do not have any offsets for topics that are accepted by the
> topic filter: [amazon.msk.canary.group.broker-1,
> amazon.msk.canary.group.broker-3, amazon.msk.canary.group.broker-2]
> (org.apache.kafka.connect.mirror.MirrorCheckpointConnector:212)
>
> [2024-05-24 18:32:27,873] INFO [mm2-cpc|worker] refreshing consumer groups
> took 15 ms (org.apache.kafka.connect.mirror.Scheduler:95)
>
> [2024-05-24 18:32:09,893] DEBUG [mm2-cpc|task-0]
> translateDownstream(mm2TestConsumer1,ExampleTopicClickStream-20,9599651):
> Skipped (OffsetSync{topicPartition=ExampleTopicClickStream-20,
> upstreamOffset=100253696, downstreamOffset=100253854} is ahead of upstream
> consumer group 9599651)
> (org.apache.kafka.connect.mirror.OffsetSyncStore:142)
>
> ...
>
> [2024-05-24 18:32:09,891] DEBUG [mm2-cpc|task-0]
> translateDownstream(mm2TestConsumer1,ExampleTopicClickStream-21,9620968):
> Skipped (OffsetSync{topicPartition=ExampleTopicClickStream-21,
> upstreamOffset=100118386, downstreamOffset=100118547} is ahead of upstream
> consumer group 9620968)
> (org.apache.kafka.connect.mirror.OffsetSyncStore:142)
>
>


