[jira] [Commented] (KAFKA-16291) Mirrormaker2 wrong checkpoints
[ https://issues.apache.org/jira/browse/KAFKA-16291?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17825467#comment-17825467 ] Greg Harris commented on KAFKA-16291: - [~claudio.benfatto] That's a good idea; I agree that the default behavior isn't good enough in all scenarios and a configuration is needed. Since it includes a user configuration and needs significant design work, this will need a KIP. I've opened KAFKA-16364 to track the work there, and you're welcome to assign yourself and draft a KIP. But just to temper your expectations here: > Offset translation guarantees zero-redelivery This is not possible given the asynchronous pattern used for offset translation. I think it can be true in an eventual-consistency sense: if the upstream consumer group is inactive for sufficiently long (and its lag is < N), then translation could be exact. We can also use this as an opportunity to design an alternative to offset.lag.max=0 and the mirror source sync send semaphore, because even with a 100% retention solution on the MirrorCheckpointTask side, the MirrorSourceTask still drops syncs occasionally.
> Mirrormaker2 wrong checkpoints > -- > > Key: KAFKA-16291 > URL: https://issues.apache.org/jira/browse/KAFKA-16291 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 3.6.1 > Environment: Mirrormaker2 version 3.6.1 running on docker containers >Reporter: Claudio Benfatto >Priority: Major > > I am running Mirrormaker2 with the following configuration: > {noformat} > clusters = fallingwaterfall, weatheredbase > sync.group.offsets.interval.seconds=30 > emit.checkpoints.interval.seconds=30 > offset.lag.max=0 > fallingwaterfall->weatheredbase.enabled = true > weatheredbase->fallingwaterfall.enabled = false > sync.group.offsets.enabled=true > emit.heartbeats.enabled=true > emit.checkpoints.enabled=true > emit.checkpoints.interval.seconds=30 > refresh.groups.enabled=true > refresh.groups.interval.seconds=30 > refresh.topics.enabled=true > sync.topic.configs.enabled=true > refresh.topics.interval.seconds=30 > sync.topic.acls.enabled = false > fallingwaterfall->weatheredbase.topics = storage-demo-.* > fallingwaterfall->weatheredbase.groups = storage-demo-.* > group.id=mirror-maker-fallingwaterfall-weatheredbase > consumer.group.id=mirror-maker-fallingwaterfall-weatheredbase > fallingwaterfall.consumer.isolation.level = read_committed > weatheredbase.producer.enable.idempotence = true > weatheredbase.producer.acks=all > weatheredbase.exactly.once.source.support = enabled > replication.policy.class=org.apache.kafka.connect.mirror.IdentityReplicationPolicy > {noformat} > I am experiencing issues with the consumer group offset synchronisation. > I have a setup with a 12-partition topic, named *storage-demo-test,* a single > transactional producer to this topic and a consumer group, named > *storage-demo-test-cg,* consuming from it. 
> The consumer configuration is: > {code:java} > 'auto.offset.reset': 'earliest', > 'isolation.level': 'read_committed', > 'enable.auto.commit': False, {code} > and I'm committing the offsets explicitly and synchronously after each poll. > What I observed is that the synchronised offsets between the upstream and > downstream cluster for the *storage-demo-test-cg* are often wrong. > For example in the case of this checkpoint: > {code:java} > (1, 1708505669764) - 6252 - > CheckpointKey(consumer_group='storage-demo-test-cg', > topic='storage-demo-test', partition=5) - > CheckpointValue(upstream_offset=197532, downstream_offset=196300) {code} > We have a mismatch in the replicated messages: > {code:java} > [fallingwaterfall]# kcat -C -b0 -t storage-demo-test -p 5 -o 197532 -c 1 > Test message 1027-0 {code} > {code:java} > [weatheredbase]# kcat -C -b0 -t storage-demo-test -p 5 -o 196300 -c 1 > Test message 1015-9 {code} > In the Mirrormaker2 logs I see many of these messages: > {code:java} > mirrormaker2-fallingwaterfall-weatheredbase-1 - mirrormaker2-server - > [2024-02-21 09:02:18,534] TRACE [MirrorCheckpointConnector|task-0] > latestDownstreamOffset 196300 is larger than or equal to > convertedUpstreamOffset 196300 for TopicPartition storage-demo-test-5 > (org.apache.kafka.connect.mirror.MirrorCheckpointTask:337) > mirrormaker2-fallingwaterfall-weatheredbase-1 - mirrormaker2-server - > [2024-02-21 09:02:01,557] DEBUG [MirrorCheckpointConnector|task-0] > translateDownstream(storage-demo-test-cg,storage-demo-test-5,197532): > Translated 195684 (relative to OffsetSync{topicPartition=storage-demo-test-5, > upstreamOffset=196913, downstreamOffset=195683}) > (org.apache.kafka.connect.mirror.OffsetSyncStore:160) > mirrormaker2-fallingwaterfall-weatheredbase-1 - mirrormaker2-server - > [2024-02-21 09:02:01,557] TRACE [MirrorCheckpointCo
[ https://issues.apache.org/jira/browse/KAFKA-16291?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17819522#comment-17819522 ] Claudio Benfatto commented on KAFKA-16291: -- In terms of Mirrormaker features, what would be great to have is a more customisable behaviour for the *OffsetSyncStore*. What I'm thinking of is the possibility to keep, in particular when *offset.lag.max=0*, the last *N* OffsetSync records (with *N* configurable) in a FIFO data structure, and to emit a metric with the last offset translated. This way we could always guarantee exact offset translation within certain boundaries. The contract with the consumers in this case would be:
* Offset translation guarantees zero redelivery when *offset.lag.max=0* and the consumer lag is less than *N*
* No offset translation happens when we cannot guarantee that it is exact (maps 1:1 with an upstream offset)
* We can check the last offset for which we had an exact translation via the value of a metric
* Memory is still bounded, and its size depends on the value of *N* and the number of topics being replicated
* I'm not sure about the performance of the index lookup, to be honest
This way we could have more predictable behaviour when performing topic migration from an upstream to a downstream cluster, which I believe is a very common use case.
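A rough Python sketch of the contract I have in mind (the class name and structure are hypothetical, purely to illustrate; this is not MirrorMaker code):

```python
from collections import deque

class ExactOffsetSyncStore:
    """Hypothetical store keeping only the last N OffsetSync records.

    With offset.lag.max=0 every replicated record gets a sync, so translation
    is exact whenever the consumer lags by fewer than N records; otherwise we
    refuse to translate rather than emit an inexact checkpoint."""

    def __init__(self, max_syncs):
        self.syncs = deque(maxlen=max_syncs)  # FIFO of (upstream, downstream)
        self.last_exact = None                # value a metric could expose

    def record(self, upstream, downstream):
        self.syncs.append((upstream, downstream))  # oldest entry drops off

    def translate(self, upstream_offset):
        for upstream, downstream in reversed(self.syncs):
            if upstream == upstream_offset:   # exact 1:1 mapping only
                self.last_exact = upstream_offset
                return downstream
        return None  # cannot guarantee exactness: emit no checkpoint

store = ExactOffsetSyncStore(max_syncs=4)
for u, d in [(100, 90), (101, 91), (102, 92), (103, 93), (104, 94)]:
    store.record(u, d)
print(store.translate(103))  # still retained -> 93
print(store.translate(100))  # evicted from the FIFO -> None
```

A real implementation would keep one such FIFO per TopicPartition, so memory stays O(N × partitions), matching the boundedness point above.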
[ https://issues.apache.org/jira/browse/KAFKA-16291?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17819397#comment-17819397 ] Claudio Benfatto commented on KAFKA-16291: -- Hi [~gharris1727], thanks a lot for your clear explanation of Mirrormaker's behaviour. Now I understand much better how everything ties together. My use case is a cluster migration, planning to move the consumers ahead of the producers. And yes, you are right, my original idea was to have a zero-redelivery migration for the consumers without stopping the producers. Now I see better how hard this is to achieve, especially when the consumers are lagging a bit behind. I appreciate how Mirrormaker, being a general-purpose tool, needs to accommodate different use cases and prioritise some guarantees over others (in an ideal world, perhaps, that behaviour could be tuned and customised, and some of the guarantees swapped for others). That said, I think I will be testing a different strategy based on the *OffsetSync* records (as you observed, with {*}offset.lag.max=0{*}, they offer complete and accurate data for the offset translation). What I plan on doing is:
# Stop the consumer in the upstream cluster
# Find its current offsets for each partition
# Consume the offset-syncs topic, from a recent enough timestamp, to find the records whose upstream offsets are closest to the upstream consumer's current offsets (usually 1 or 2 offsets behind, depending on the transaction markers)
# Use the downstream offset (+1) of the record found in 3. to alter the offsets for the downstream consumer group
My main concern is the efficiency of consuming the offset-syncs topic. Even though I would process it only for, let's say, the last 10 minutes, that could still be a lot of records to go through. Let me know if this plan sounds reasonable to you; I would really appreciate your advice ;) Many thanks!
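Steps 3 and 4 would look roughly like this in Python (a sketch only: the function names are made up, reading the offset-syncs topic itself is elided, and the per-partition sync list is assumed to be sorted by upstream offset, which also makes the lookup a cheap binary search):

```python
import bisect

def closest_sync(sorted_syncs, upstream_committed):
    """Step 3: among (upstreamOffset, downstreamOffset) pairs read from the
    offset-syncs topic for one partition, find the sync whose upstream offset
    is closest to (at or below) the group's committed offset."""
    i = bisect.bisect_right(sorted_syncs, (upstream_committed, float("inf")))
    return sorted_syncs[i - 1] if i > 0 else None

def downstream_commit(sorted_syncs, upstream_committed):
    """Step 4: an exact sync maps 1:1; otherwise the matched sync is typically
    1-2 offsets behind (transaction markers), so resume one past its record."""
    sync = closest_sync(sorted_syncs, upstream_committed)
    if sync is None:
        return None
    upstream, downstream = sync
    return downstream if upstream == upstream_committed else downstream + 1

# Example syncs taken from the TRACE log state in this ticket:
syncs = [(194098, 192868), (196913, 195683), (197530, 196300), (197532, 196302)]
print(downstream_commit(syncs, 197532))  # exact sync still present -> 196302
```

With the syncs sorted, each lookup is O(log n), so even ten minutes of offset-sync records per partition should be cheap to search once loaded.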
[ https://issues.apache.org/jira/browse/KAFKA-16291?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17819368#comment-17819368 ] Greg Harris commented on KAFKA-16291: - Hi [~claudio.benfatto] Thanks for the bug report and the very relevant logs! I'm sorry that offset translation isn't working how you expect. These two lines, pasted newest first, show what is happening:
{noformat}
mirrormaker2-fallingwaterfall-weatheredbase-1 - mirrormaker2-server - [2024-02-21 08:58:40,812] TRACE [MirrorCheckpointConnector|task-0] New sync OffsetSync{topicPartition=storage-demo-test-5, upstreamOffset=198765, downstreamOffset=197535} applied, new state is [198765:197535,198764:197534,198762:197532,198761:197531,198753:197523,198739:197509,198717:197487,198673:197443,198585:197355,198497:197267,198321:197091,197617:196387,196913:195683,194098:192868] (org.apache.kafka.connect.mirror.OffsetSyncStore:193)
mirrormaker2-fallingwaterfall-weatheredbase-1 - mirrormaker2-server - [2024-02-21 08:54:05,030] TRACE [MirrorCheckpointConnector|task-0] New sync OffsetSync{topicPartition=storage-demo-test-5, upstreamOffset=197532, downstreamOffset=196302} applied, new state is [197532:196302,197530:196300,197529:196299,197521:196291,197507:196277,197485:196255,197441:196211,197353:196123,197265:196035,196913:195683,196209:194979,195505:194275,194098:192868] (org.apache.kafka.connect.mirror.OffsetSyncStore:193)
{noformat}
These log lines print out the whole in-memory state of the offset translation cache. If an offset isn't in this list, then it isn't available for translation after that point in time. You can see the critical sync 197532:196302 get added to the front of the cache in the earlier (second) line, and by the time the later (first) line is printed 4 minutes afterwards, that sync is no longer present. It has syncs on either side (196913:195683 and 197617:196387), each separated from it by some gap, which is expected.
The earlier/lower of these two syncs is the one that is used for translation later:
{noformat}
mirrormaker2-fallingwaterfall-weatheredbase-1 - mirrormaker2-server - [2024-02-21 09:02:01,557] DEBUG [MirrorCheckpointConnector|task-0] translateDownstream(storage-demo-test-cg,storage-demo-test-5,197532): Translated 195684 (relative to OffsetSync{topicPartition=storage-demo-test-5, upstreamOffset=196913, downstreamOffset=195683})
{noformat}
The idea here is that MirrorMaker2 keeps only a finite number of offset syncs in-memory, and expires syncs as they become older. After a sync is expired, it is still available in the offset-syncs topic (as you've shown), but won't be in-memory, and instead the closest earlier sync will be used for translation. At the cost of increased re-delivery/re-processing on the downstream side and changed offset commit boundaries, this prioritizes the following guarantees:
# If the downstream commits offsets for some record, then the upstream must have committed offsets including that record (no data loss when resuming from downstream offsets)
# If you watch the progress of the downstream offsets, they increase monotonically (no rewinding downstream offsets)
# The in-memory cache is bounded in size and won't cause an OOM
The translation cache is optimized for offsets near the end of the topic: you can see how the later offsets are closer together and the earlier offsets are farther apart (198765 and 198764 are 1 apart, 196913 and 194098 are 2815 apart). The closer your offsets are to the end of the topic, the less redelivery that consumer group will experience, and at the very end of the topic you can get a "perfect sync". So if you're looking for a zero-redelivery failover, I would make sure that you stop the producers, let the consumers commit at the end of the topic, wait for MM2 to translate the offsets, and then perform the failover.
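The translation step in the DEBUG line above can be modelled in a few lines of Python (a simplified sketch of the described behaviour, not the actual OffsetSyncStore code):

```python
def translate_downstream(cache, upstream_commit):
    """Pick the closest in-memory sync at or below the upstream commit.
    An exact hit maps 1:1; otherwise resume one past the synced downstream
    record, re-delivering everything between that sync and the real commit."""
    candidates = [(u, d) for u, d in cache if u <= upstream_commit]
    if not candidates:
        return None  # nothing translatable at or below this offset
    upstream, downstream = max(candidates)
    return downstream if upstream == upstream_commit else downstream + 1

# Cache state after the 197532:196302 sync expired (from the TRACE log):
cache = [(194098, 192868), (196913, 195683), (197617, 196387)]
print(translate_downstream(cache, 197532))  # -> 195684, as in the DEBUG line
```

While the exact sync 197532:196302 was still cached, the same call would have returned 196302; once it expires, translation falls back to 196913:195683 and the downstream group re-consumes the gap.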
Unplanned failovers will nearly always include data re-delivery because MM2 does asynchronous replication, so your application should be tolerant of that. This has been changing since KAFKA-12468, so if you're only seeing this after an upgrade, that would explain why.