[ https://issues.apache.org/jira/browse/KAFKA-16291?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17825467#comment-17825467 ]
Greg Harris commented on KAFKA-16291:
-------------------------------------

[~claudio.benfatto] That's a good idea. I agree that the default behavior isn't good enough in all scenarios and a configuration is needed. Since it involves a user configuration and needs significant design work, this will need a KIP. I've opened KAFKA-16364 to track the work there, and you're welcome to assign yourself and draft a KIP.

But just to temper your expectations here:

> Offset translation guarantees zero-redelivery

This is not possible given the asynchronous pattern used for offset translation. I think this can be true in an eventual-consistency sense: if the upstream consumer group is inactive for long enough (and lag < N), then translation could be exact.

We can also use this as an opportunity to design an alternative to offset.lag.max=0 and the mirror source sync send semaphore[!], because even with a 100% retention solution on the MirrorCheckpointTask side, the MirrorSourceTask still drops syncs occasionally.
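To make the asynchronous-translation point concrete, here is a minimal Python sketch that replays the numbers from the MirrorMaker2 logs quoted in the issue description. MirrorMaker2 itself is Java. The sketch rests on two simplified rules that are inferred from those log lines, not copied from the MirrorMaker2 source:

1. Translation uses the retained sync with the largest upstream offset that is not above the committed offset. It adds one to that sync's downstream offset unless the match is exact.
2. A checkpoint is skipped when it would move the downstream offset backwards ("preventing downstream rewind").

The names `parse_state`, `translate_downstream` and `CheckpointGuard` are hypothetical.

```python
from typing import Dict, List, Optional, Tuple

# A retained offset sync: (upstream_offset, downstream_offset).
Sync = Tuple[int, int]


def parse_state(state: str) -> List[Sync]:
    """Parse the '[up:down,up:down,...]' list printed in the OffsetSyncStore TRACE log."""
    pairs = state.strip("[]").split(",")
    return [(int(up), int(down)) for up, down in (p.split(":") for p in pairs)]


def translate_downstream(syncs: List[Sync], upstream_offset: int) -> Optional[int]:
    """Hypothetical translation rule, inferred from the DEBUG log lines.

    Take the retained sync with the largest upstream offset <= the committed
    offset. An exact match translates exactly. Otherwise return one past that
    sync's downstream offset, a conservative guess assumed never to be ahead
    of the true downstream position.
    """
    older = [s for s in syncs if s[0] <= upstream_offset]
    if not older:
        return None  # no retained sync is old enough to translate from
    sync_up, sync_down = max(older)
    return sync_down if sync_up == upstream_offset else sync_down + 1


class CheckpointGuard:
    """Hypothetical model of the 'preventing downstream rewind' check.

    Assumed rule: for the same or a later upstream offset, a checkpoint whose
    downstream offset is behind the last emitted one is skipped.
    """

    def __init__(self) -> None:
        # (topic, partition) -> last emitted (upstream, downstream) pair
        self.last: Dict[Tuple[str, int], Sync] = {}

    def offer(self, tp: Tuple[str, int], upstream: int, downstream: int) -> bool:
        """Return True and record the checkpoint if it would be emitted."""
        prev = self.last.get(tp)
        if prev is not None and upstream >= prev[0] and downstream < prev[1]:
            return False  # would rewind the downstream group, so skip it
        self.last[tp] = (upstream, downstream)
        return True


# Retained states copied from the 08:54:05 and 08:58:40 TRACE lines.
AFTER_0854 = parse_state(
    "[197532:196302,197530:196300,197529:196299,197521:196291,197507:196277,"
    "197485:196255,197441:196211,197353:196123,197265:196035,196913:195683,"
    "196209:194979,195505:194275,194098:192868]"
)
AFTER_0858 = parse_state(
    "[198765:197535,198764:197534,198762:197532,198761:197531,198753:197523,"
    "198739:197509,198717:197487,198673:197443,198585:197355,198497:197267,"
    "198321:197091,197617:196387,196913:195683,194098:192868]"
)

if __name__ == "__main__":
    print(translate_downstream(AFTER_0854, 197532))  # 196302: exact sync still retained
    print(translate_downstream(AFTER_0858, 197532))  # 195684: 196913:195683 plus one

    tp = ("storage-demo-test", 5)
    guard = CheckpointGuard()
    guard.offer(tp, 197532, 196300)  # the checkpoint the reporter observed
    print(guard.offer(tp, 197532, 195684))  # False: skipped, as in the logs
    print(guard.last[tp])  # (197532, 196300) stays in place
```

Under these assumptions, once 197532:196302 is no longer in the retained state, the best available translation (195684) falls behind the checkpoint already emitted (196300). The guard therefore keeps 196300, which is two offsets before the synced position 196302. A downstream consumer resuming there would re-read records rather than skip any, which is the kind of inexact translation described above.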

> Mirrormaker2 wrong checkpoints
> ------------------------------
>
> Key: KAFKA-16291
> URL: https://issues.apache.org/jira/browse/KAFKA-16291
> Project: Kafka
> Issue Type: Bug
> Components: mirrormaker
> Affects Versions: 3.6.1
> Environment: Mirrormaker2 version 3.6.1 running on docker containers
> Reporter: Claudio Benfatto
> Priority: Major
>
> I am running Mirrormaker2 with the following configuration:
> {noformat}
> clusters = fallingwaterfall, weatheredbase
> sync.group.offsets.interval.seconds=30
> emit.checkpoints.interval.seconds=30
> offset.lag.max=0
> fallingwaterfall->weatheredbase.enabled = true
> weatheredbase->fallingwaterfall.enabled = false
> sync.group.offsets.enabled=true
> emit.heartbeats.enabled=true
> emit.checkpoints.enabled=true
> emit.checkpoints.interval.seconds=30
> refresh.groups.enabled=true
> refresh.groups.interval.seconds=30
> refresh.topics.enabled=true
> sync.topic.configs.enabled=true
> refresh.topics.interval.seconds=30
> sync.topic.acls.enabled = false
> fallingwaterfall->weatheredbase.topics = storage-demo-.*
> fallingwaterfall->weatheredbase.groups = storage-demo-.*
> group.id=mirror-maker-fallingwaterfall-weatheredbase
> consumer.group.id=mirror-maker-fallingwaterfall-weatheredbase
> fallingwaterfall.consumer.isolation.level = read_committed
> weatheredbase.producer.enable.idempotence = true
> weatheredbase.producer.acks=all
> weatheredbase.exactly.once.source.support = enabled
> replication.policy.class=org.apache.kafka.connect.mirror.IdentityReplicationPolicy
> {noformat}
> I am experiencing issues with the consumer group offset synchronisation.
> I have a setup with a 12-partition topic, named *storage-demo-test*, a single
> transactional producer to this topic and a consumer group, named
> *storage-demo-test-cg*, consuming from it.
> The consumer configuration is:
> {code:java}
> 'auto.offset.reset': 'earliest',
> 'isolation.level': 'read_committed',
> 'enable.auto.commit': False, {code}
> and I'm committing the offsets explicitly and synchronously after each poll.
> What I observed is that the synchronised offsets between the upstream and
> downstream clusters for the *storage-demo-test-cg* are often wrong.
> For example, in the case of this checkpoint:
> {code:java}
> (1, 1708505669764) - 6252 -
> CheckpointKey(consumer_group='storage-demo-test-cg',
> topic='storage-demo-test', partition=5) -
> CheckpointValue(upstream_offset=197532, downstream_offset=196300) {code}
> We have a mismatch in the replicated messages:
> {code:java}
> [fallingwaterfall]# kcat -C -b0 -t storage-demo-test -p 5 -o 197532 -c 1
> Test message 1027-0 {code}
> {code:java}
> [weatheredbase]# kcat -C -b0 -t storage-demo-test -p 5 -o 196300 -c 1
> Test message 1015-9 {code}
> In the Mirrormaker2 logs I see many of these messages:
> {code:java}
> mirrormaker2-fallingwaterfall-weatheredbase-1 - mirrormaker2-server -
> [2024-02-21 09:02:18,534] TRACE [MirrorCheckpointConnector|task-0]
> latestDownstreamOffset 196300 is larger than or equal to
> convertedUpstreamOffset 196300 for TopicPartition storage-demo-test-5
> (org.apache.kafka.connect.mirror.MirrorCheckpointTask:337)
> mirrormaker2-fallingwaterfall-weatheredbase-1 - mirrormaker2-server -
> [2024-02-21 09:02:01,557] DEBUG [MirrorCheckpointConnector|task-0]
> translateDownstream(storage-demo-test-cg,storage-demo-test-5,197532):
> Translated 195684 (relative to OffsetSync{topicPartition=storage-demo-test-5,
> upstreamOffset=196913, downstreamOffset=195683})
> (org.apache.kafka.connect.mirror.OffsetSyncStore:160)
> mirrormaker2-fallingwaterfall-weatheredbase-1 - mirrormaker2-server -
> [2024-02-21 09:02:01,557] TRACE [MirrorCheckpointConnector|task-0] Skipping
> Checkpoint{consumerGroupId=storage-demo-test-cg,
> topicPartition=storage-demo-test-5, upstreamOffset=197532,
> downstreamOffset=195684, metadata=} (preventing downstream rewind)
> (org.apache.kafka.connect.mirror.MirrorCheckpointTask:218)
> mirrormaker2-fallingwaterfall-weatheredbase-1 - mirrormaker2-server -
> [2024-02-21 08:59:00,859] TRACE [MirrorCheckpointConnector|task-0] Skipping
> Checkpoint{consumerGroupId=storage-demo-test-cg,
> topicPartition=storage-demo-test-5, upstreamOffset=197532,
> downstreamOffset=195684, metadata=} (preventing downstream rewind)
> (org.apache.kafka.connect.mirror.MirrorCheckpointTask:218)
> mirrormaker2-fallingwaterfall-weatheredbase-1 - mirrormaker2-server -
> [2024-02-21 08:59:00,859] DEBUG [MirrorCheckpointConnector|task-0]
> translateDownstream(storage-demo-test-cg,storage-demo-test-5,197532):
> Translated 195684 (relative to OffsetSync{topicPartition=storage-demo-test-5,
> upstreamOffset=196913, downstreamOffset=195683})
> (org.apache.kafka.connect.mirror.OffsetSyncStore:160)
> mirrormaker2-fallingwaterfall-weatheredbase-1 - mirrormaker2-server -
> [2024-02-21 08:58:40,812] TRACE [MirrorCheckpointConnector|task-0] New sync
> OffsetSync{topicPartition=storage-demo-test-5, upstreamOffset=198765,
> downstreamOffset=197535} applied, new state is
> [198765:197535,198764:197534,198762:197532,198761:197531,198753:197523,198739:197509,198717:197487,198673:197443,198585:197355,198497:197267,198321:197091,197617:196387,196913:195683,194098:192868]
> (org.apache.kafka.connect.mirror.OffsetSyncStore:193)
> mirrormaker2-fallingwaterfall-weatheredbase-1 - mirrormaker2-server -
> [2024-02-21 08:54:05,030] TRACE [MirrorCheckpointConnector|task-0] New sync
> OffsetSync{topicPartition=storage-demo-test-5, upstreamOffset=197532,
> downstreamOffset=196302} applied, new state is
> [197532:196302,197530:196300,197529:196299,197521:196291,197507:196277,197485:196255,197441:196211,197353:196123,197265:196035,196913:195683,196209:194979,195505:194275,194098:192868]
> (org.apache.kafka.connect.mirror.OffsetSyncStore:193)
> mirrormaker2-fallingwaterfall-weatheredbase-0 - mirrormaker2-server -
> [2024-02-21 08:54:05,030] TRACE [MirrorSourceConnector|task-0] Sync'd offsets
> for storage-demo-test-5: 197532==196302
> (org.apache.kafka.connect.mirror.MirrorSourceTask:251){code}
> And looking in the OffsetSync topic, I see the correct value for the offset
> sync:
> {code:java}
> (1, 1708505645010) - 3945070 - OffsetSyncKey(topic='storage-demo-test',
> partition=5) - OffsetSyncValue(upstream_offset=197532,
> downstream_offset=196302)
> {code}
> {code:java}
> [weatheredbase]# kcat -C -b0 -t storage-demo-test -p 5 -o 196302 -c 1
> Test message 1027-0 {code}
> So it seems that the offset conversions and checkpoints produced in the
> *MirrorCheckpointTask* are not matching the information committed to the
> OffsetSync topic by the *MirrorSourceTask*.
> Please let me know if you need additional info about the setup I'm running or
> if I should collect more logs.
> Thanks!

--
This message was sent by Atlassian Jira
(v8.20.10#820010)