[ 
https://issues.apache.org/jira/browse/KAFKA-16291?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17819522#comment-17819522
 ] 

Claudio Benfatto commented on KAFKA-16291:
------------------------------------------

In terms of MirrorMaker features, it would be great to have more 
customisable behaviour for the *OffsetSyncStore.*
What I have in mind is the option to keep, in particular when 
*offset.lag.max=0,* the last *N* OffsetSync records (*N* being configurable) 
in a FIFO data structure, and to emit a metric with the last offset translated.

This way we could always guarantee exact offset translation within certain 
boundaries. The contract with the consumers in this case would be:
 * Offset translation guarantees zero-redelivery when *offset.lag.max=0* and 
within a consumer lag of less than *N*
 * No offset translation happens when we cannot guarantee that it is exact 
(maps 1:1 with an upstream offset)
 * We can check the last offset for which we had an exact translation via the 
value of a metric
 * Memory stays bounded: its size depends on the value of *N* and the number 
of topics being replicated
 * I am not sure how lookups in this structure would perform, to be honest
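
To make the contract above concrete, here is a minimal sketch of such a window for a single topic partition. It is not existing MirrorMaker 2 code: the class name ExactOffsetWindow, its methods and the choice of a deque plus a hash map are all assumptions made for illustration. The hash map is one possible answer to the lookup concern, since it gives constant expected-time lookups at the cost of holding each entry twice.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

/**
 * Hypothetical per-partition window over the last N offset syncs.
 * All names are illustrative; nothing here is part of MirrorMaker 2.
 */
class ExactOffsetWindow {
    private final int capacity;                             // N, assumed configurable
    private final Deque<Long> fifo = new ArrayDeque<>();    // upstream offsets, oldest first
    private final Map<Long, Long> exact = new HashMap<>();  // upstream offset -> downstream offset
    private long lastTranslatedUpstream = -1L;              // value a metric could expose

    ExactOffsetWindow(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
    }

    /** Record one sync and evict the oldest entry once more than N are held. */
    void record(long upstreamOffset, long downstreamOffset) {
        if (exact.put(upstreamOffset, downstreamOffset) == null) {
            fifo.addLast(upstreamOffset);
        }
        if (fifo.size() > capacity) {
            exact.remove(fifo.removeFirst());
        }
    }

    /** Translate only on an exact 1:1 match; otherwise return empty. */
    OptionalLong translate(long upstreamOffset) {
        Long downstream = exact.get(upstreamOffset);
        if (downstream == null) {
            return OptionalLong.empty();
        }
        lastTranslatedUpstream = upstreamOffset;
        return OptionalLong.of(downstream);
    }

    /** Last upstream offset that was translated exactly, or -1 if none yet. */
    long lastTranslatedUpstream() {
        return lastTranslatedUpstream;
    }
}
```

With N = 2 and the syncs 197530:196300 and 197532:196302 recorded, translate(197532) would return 196302, translate(197531) would return empty, and recording a third sync would evict 197530. Memory per partition is proportional to N.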

This way we could have more predictable behaviour when performing topic 
migration from an upstream to a downstream cluster, which I believe is a very 
common use case.
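
For context on why an exact match matters: the DEBUG line in the quoted issue (group offset 197532 translated to 195684 relative to the sync 196913:195683) is consistent with a translation that, when the group offset lies past the chosen sync, can only place the group one record after that sync's downstream offset. A minimal sketch of that arithmetic follows, assuming this is how the step is computed; the class and method names are hypothetical.

```java
/** Illustrative helper, not MirrorMaker 2 code: translate relative to a single sync. */
class RelativeTranslation {
    static long translate(long groupOffset, long syncUpstream, long syncDownstream) {
        // An exact match maps 1:1; a later group offset can only be placed
        // one record past the sync, which may be far behind the true position.
        long step = (groupOffset == syncUpstream) ? 0L : 1L;
        return syncDownstream + step;
    }
}
```

Under this assumption, translating 197532 against the sync 196913:195683 gives 195684, while translating it against the exact sync 197532:196302 gives 196302, which is the offset where the matching message was found downstream.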

 

 

> Mirrormaker2 wrong checkpoints
> ------------------------------
>
>                 Key: KAFKA-16291
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16291
>             Project: Kafka
>          Issue Type: Bug
>          Components: mirrormaker
>    Affects Versions: 3.6.1
>         Environment: Mirrormaker2 version 3.6.1 running on docker containers
>            Reporter: Claudio Benfatto
>            Priority: Major
>
> I am running Mirrormaker2 with the following configuration:
> {noformat}
> clusters = fallingwaterfall, weatheredbase
> sync.group.offsets.interval.seconds=30
> emit.checkpoints.interval.seconds=30
> offset.lag.max=0
> fallingwaterfall->weatheredbase.enabled = true
> weatheredbase->fallingwaterfall.enabled = false
> sync.group.offsets.enabled=true
> emit.heartbeats.enabled=true
> emit.checkpoints.enabled=true
> emit.checkpoints.interval.seconds=30
> refresh.groups.enabled=true
> refresh.groups.interval.seconds=30
> refresh.topics.enabled=true
> sync.topic.configs.enabled=true
> refresh.topics.interval.seconds=30
> sync.topic.acls.enabled = false
> fallingwaterfall->weatheredbase.topics = storage-demo-.*
> fallingwaterfall->weatheredbase.groups = storage-demo-.*
> group.id=mirror-maker-fallingwaterfall-weatheredbase
> consumer.group.id=mirror-maker-fallingwaterfall-weatheredbase
> fallingwaterfall.consumer.isolation.level = read_committed
> weatheredbase.producer.enable.idempotence = true
> weatheredbase.producer.acks=all
> weatheredbase.exactly.once.source.support = enabled
> replication.policy.class=org.apache.kafka.connect.mirror.IdentityReplicationPolicy
> {noformat}
> I am experiencing issues with the consumer group offset synchronisation.
> I have a setup with a 12-partition topic, named *storage-demo-test,* a single 
> transactional producer to this topic and a consumer group, named 
> *storage-demo-test-cg,* consuming from it.
> The consumer configuration is:
> {code:java}
> 'auto.offset.reset': 'earliest',
> 'isolation.level': 'read_committed',
> 'enable.auto.commit': False, {code}
> and I'm committing the offsets explicitly and synchronously after each poll.
> What I observed is that the synchronised offsets between the upstream and 
> downstream cluster for the *storage-demo-test-cg* are often wrong.
> For example in the case of this checkpoint:
> {code:java}
> (1, 1708505669764) - 6252 - 
> CheckpointKey(consumer_group='storage-demo-test-cg', 
> topic='storage-demo-test', partition=5) - 
> CheckpointValue(upstream_offset=197532, downstream_offset=196300) {code}
> We have a mismatch in the replicated messages:
> {code:java}
> [fallingwaterfall]# kcat -C -b0 -t storage-demo-test -p 5 -o 197532 -c 1
> Test message 1027-0 {code}
> {code:java}
> [weatheredbase]# kcat -C -b0 -t storage-demo-test -p 5 -o 196300 -c 1
> Test message 1015-9 {code}
> In the Mirrormaker2 logs I see many of these messages:
> {code:java}
> mirrormaker2-fallingwaterfall-weatheredbase-1 - mirrormaker2-server - 
> [2024-02-21 09:02:18,534] TRACE [MirrorCheckpointConnector|task-0] 
> latestDownstreamOffset 196300 is larger than or equal to 
> convertedUpstreamOffset 196300 for TopicPartition storage-demo-test-5 
> (org.apache.kafka.connect.mirror.MirrorCheckpointTask:337)
> mirrormaker2-fallingwaterfall-weatheredbase-1 - mirrormaker2-server - 
> [2024-02-21 09:02:01,557] DEBUG [MirrorCheckpointConnector|task-0] 
> translateDownstream(storage-demo-test-cg,storage-demo-test-5,197532): 
> Translated 195684 (relative to OffsetSync{topicPartition=storage-demo-test-5, 
> upstreamOffset=196913, downstreamOffset=195683}) 
> (org.apache.kafka.connect.mirror.OffsetSyncStore:160)
> mirrormaker2-fallingwaterfall-weatheredbase-1 - mirrormaker2-server - 
> [2024-02-21 09:02:01,557] TRACE [MirrorCheckpointConnector|task-0] Skipping 
> Checkpoint{consumerGroupId=storage-demo-test-cg, 
> topicPartition=storage-demo-test-5, upstreamOffset=197532, 
> downstreamOffset=195684, metadata=} (preventing downstream rewind) 
> (org.apache.kafka.connect.mirror.MirrorCheckpointTask:218)
> mirrormaker2-fallingwaterfall-weatheredbase-1 - mirrormaker2-server - 
> [2024-02-21 08:59:00,859] TRACE [MirrorCheckpointConnector|task-0] Skipping 
> Checkpoint{consumerGroupId=storage-demo-test-cg, 
> topicPartition=storage-demo-test-5, upstreamOffset=197532, 
> downstreamOffset=195684, metadata=} (preventing downstream rewind) 
> (org.apache.kafka.connect.mirror.MirrorCheckpointTask:218)
> mirrormaker2-fallingwaterfall-weatheredbase-1 - mirrormaker2-server - 
> [2024-02-21 08:59:00,859] DEBUG [MirrorCheckpointConnector|task-0] 
> translateDownstream(storage-demo-test-cg,storage-demo-test-5,197532): 
> Translated 195684 (relative to OffsetSync{topicPartition=storage-demo-test-5, 
> upstreamOffset=196913, downstreamOffset=195683}) 
> (org.apache.kafka.connect.mirror.OffsetSyncStore:160)
> mirrormaker2-fallingwaterfall-weatheredbase-1 - mirrormaker2-server - 
> [2024-02-21 08:58:40,812] TRACE [MirrorCheckpointConnector|task-0] New sync 
> OffsetSync{topicPartition=storage-demo-test-5, upstreamOffset=198765, 
> downstreamOffset=197535} applied, new state is 
> [198765:197535,198764:197534,198762:197532,198761:197531,198753:197523,198739:197509,198717:197487,198673:197443,198585:197355,198497:197267,198321:197091,197617:196387,196913:195683,194098:192868]
>  (org.apache.kafka.connect.mirror.OffsetSyncStore:193)
> mirrormaker2-fallingwaterfall-weatheredbase-1 - mirrormaker2-server - 
> [2024-02-21 08:54:05,030] TRACE [MirrorCheckpointConnector|task-0] New sync 
> OffsetSync{topicPartition=storage-demo-test-5, upstreamOffset=197532, 
> downstreamOffset=196302} applied, new state is 
> [197532:196302,197530:196300,197529:196299,197521:196291,197507:196277,197485:196255,197441:196211,197353:196123,197265:196035,196913:195683,196209:194979,195505:194275,194098:192868]
>  (org.apache.kafka.connect.mirror.OffsetSyncStore:193)
> mirrormaker2-fallingwaterfall-weatheredbase-0 - mirrormaker2-server - 
> [2024-02-21 08:54:05,030] TRACE [MirrorSourceConnector|task-0] Sync'd offsets 
> for storage-demo-test-5: 197532==196302 
> (org.apache.kafka.connect.mirror.MirrorSourceTask:251){code}
> And looking in the OffsetSync topic, I see the correct value for the offset 
> sync:
> {code:java}
> (1, 1708505645010) - 3945070 - OffsetSyncKey(topic='storage-demo-test', 
> partition=5) - OffsetSyncValue(upstream_offset=197532, 
> downstream_offset=196302)
>  {code}
> {code:java}
> [weatheredbase]# kcat -C -b0 -t storage-demo-test -p 5 -o 196302 -c 1
> Test message 1027-0 {code}
> So it seems that the offset conversions and checkpoints produced in the 
> *MirrorCheckpointTask* are not matching the information committed to the 
> OffsetSync topic by the *MirrorSourceTask.*
> Please let me know if you need additional info about the setup I'm running or 
> would like me to collect more logs.
> Thanks!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
