[ https://issues.apache.org/jira/browse/KAFKA-12468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17316380#comment-17316380 ]
Angelos Kaltsikis commented on KAFKA-12468:
-------------------------------------------

Hello again,

We have managed to set up MirrorMaker 2 properly with this bug in mind, which requires the SourceConnector tasks.max to be a number such that (total partition count / tasks.max) < 10. We have also introduced some configurations that offer performance optimizations according to this [blog|https://wmclane.medium.com/how-to-optimize-mirrormaker2-for-high-performance-apache-kafka-replication-697bc5089c].

We were trying to prove that MM2 has successfully copied all the data (every topic message and the correct consumer group offsets) from the source cluster to the target, but without great success. We tried to compare the following metrics between the source and target clusters:
* Log size per partition on the two clusters, but it differs for most partitions.
* Consumer lag for each consumer group / topic / partition combination, but all of them show a negative consumer lag. Only after we reset the offsets (as mentioned in this ticket's description) does the consumer group lag on the target cluster reach a better state (no longer negative).
* LogEndOffset - LogStartOffset. For topics with cleanup.policy=delete, the subtraction gives pretty much the same numbers on the two clusters.

Is there *any better way* to verify that MM2 has caught up and continues mirroring correctly? What worries us most is that for some topics we observed a much smaller log size for some partitions on the target cluster. We compared the messages in some of those partitions, and there were indeed fewer messages in the target cluster's topic partitions. Our main question: does MM2 offer an at-least-once guarantee, i.e. no messages lost?
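For reference, the LogEndOffset - LogStartOffset comparison described in the comment above can be sketched as follows. The per-partition offset dicts are hypothetical sample data; in practice these numbers would be fetched from each cluster (e.g. via the AdminClient's list-offsets API or kafka-get-offsets tooling):

```python
# Compare retained record counts (LogEndOffset - LogStartOffset) per partition
# between a source and a target cluster. The offset dicts below are
# hypothetical sample data, not values from a real cluster.

def record_counts(offsets):
    """Map each (topic, partition) to its retained record count."""
    return {tp: end - start for tp, (start, end) in offsets.items()}

def compare_clusters(source, target):
    """Return partitions whose retained record counts differ between clusters."""
    src_counts = record_counts(source)
    tgt_counts = record_counts(target)
    return {
        tp: (src_counts[tp], tgt_counts.get(tp))
        for tp in src_counts
        if src_counts[tp] != tgt_counts.get(tp)
    }

source = {
    ("orders", 0): (0, 1500),    # (LogStartOffset, LogEndOffset)
    ("orders", 1): (100, 2100),
}
target = {
    ("orders", 0): (0, 1500),    # matches: same retained count
    ("orders", 1): (0, 1900),    # fewer retained records than the source
}

mismatches = compare_clusters(source, target)
print(mismatches)  # {('orders', 1): (2000, 1900)}
```

Note that this comparison is only meaningful where retention has not already removed records differently on the two clusters (e.g. compacted topics or different retention settings can legitimately produce different counts).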
> Initial offsets are copied from source to target cluster
> --------------------------------------------------------
>
>                 Key: KAFKA-12468
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12468
>             Project: Kafka
>          Issue Type: Bug
>          Components: mirrormaker
>    Affects Versions: 2.7.0
>            Reporter: Bart De Neuter
>            Priority: Major
>
> We have an active-passive setup where the 3 connectors from MirrorMaker 2
> (heartbeat, checkpoint and source) are running on a dedicated Kafka Connect
> cluster on the target cluster.
> Offset syncing is enabled as specified by KIP-545. But when activated, it
> seems the offsets from the source cluster are initially copied to the target
> cluster without translation. This causes a negative lag for all synced
> consumer groups. Only when we reset the offsets for each topic/partition on
> the target cluster and produce a record on the topic/partition in the source
> does the sync start working correctly.
> I would expect the consumer groups to be synced, but the current offsets of
> the source cluster should not be copied to the target cluster.
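The negative lag in the description above comes down to simple arithmetic, sketched here with hypothetical numbers: a committed offset copied verbatim from the source can exceed the target partition's log end offset, so lag = logEndOffset - committedOffset goes negative. A checkpoint-style translation (a simplified stand-in for the upstream/downstream offset pairs MM2 emits on its checkpoints topic) keeps the lag non-negative:

```python
# Hypothetical numbers illustrating the bug described above; the translation
# logic is a simplified sketch, not MM2's actual implementation.

def lag(log_end_offset, committed_offset):
    """Consumer lag as reported per partition."""
    return log_end_offset - committed_offset

def translate(committed_upstream, checkpoints):
    """Simplified MM2-style translation: use the downstream offset of the
    latest checkpoint whose upstream offset is <= the committed offset.
    `checkpoints` is a list of (upstream_offset, downstream_offset) pairs."""
    eligible = [(u, d) for u, d in checkpoints if u <= committed_upstream]
    return max(eligible)[1] if eligible else 0  # max by upstream offset

source_committed = 950   # group offset committed on the source cluster
target_log_end = 800     # the target partition's offsets are renumbered/smaller

# Bug: offset copied verbatim -> negative lag on the target cluster.
print(lag(target_log_end, source_committed))   # -150

# With translation via a checkpoint (upstream=900 maps to downstream=760):
checkpoints = [(500, 400), (900, 760)]
translated = translate(source_committed, checkpoints)
print(lag(target_log_end, translated))         # 40
```

In practice, translated offsets are derived from the checkpoints topic, e.g. via `RemoteClusterUtils.translateOffsets` in the Kafka Connect mirror client library.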
> This is the configuration we are currently using:
>
> Heartbeat connector:
> {code:json}
> {
>   "name": "mm2-mirror-heartbeat",
>   "config": {
>     "name": "mm2-mirror-heartbeat",
>     "connector.class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
>     "source.cluster.alias": "eventador",
>     "target.cluster.alias": "msk",
>     "source.cluster.bootstrap.servers": "<SOURCE_CLUSTER>",
>     "target.cluster.bootstrap.servers": "<TARGET_CLUSTER>",
>     "topics": ".*",
>     "groups": ".*",
>     "tasks.max": "1",
>     "replication.policy.class": "CustomReplicationPolicy",
>     "sync.group.offsets.enabled": "true",
>     "sync.group.offsets.interval.seconds": "5",
>     "emit.checkpoints.enabled": "true",
>     "emit.checkpoints.interval.seconds": "30",
>     "emit.heartbeats.interval.seconds": "30",
>     "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
>     "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
>   }
> }
> {code}
>
> Checkpoint connector:
> {code:json}
> {
>   "name": "mm2-mirror-checkpoint",
>   "config": {
>     "name": "mm2-mirror-checkpoint",
>     "connector.class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
>     "source.cluster.alias": "eventador",
>     "target.cluster.alias": "msk",
>     "source.cluster.bootstrap.servers": "<SOURCE_CLUSTER>",
>     "target.cluster.bootstrap.servers": "<TARGET_CLUSTER>",
>     "topics": ".*",
>     "groups": ".*",
>     "tasks.max": "40",
>     "replication.policy.class": "CustomReplicationPolicy",
>     "sync.group.offsets.enabled": "true",
>     "sync.group.offsets.interval.seconds": "5",
>     "emit.checkpoints.enabled": "true",
>     "emit.checkpoints.interval.seconds": "30",
>     "emit.heartbeats.interval.seconds": "30",
>     "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
>     "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
>   }
> }
> {code}
>
> Source connector:
> {code:json}
> {
>   "name": "mm2-mirror-source",
>   "config": {
>     "name": "mm2-mirror-source",
>     "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
>     "source.cluster.alias": "eventador",
>     "target.cluster.alias": "msk",
>     "source.cluster.bootstrap.servers": "<SOURCE_CLUSTER>",
>     "target.cluster.bootstrap.servers": "<TARGET_CLUSTER>",
>     "topics": ".*",
>     "groups": ".*",
>     "tasks.max": "40",
>     "replication.policy.class": "CustomReplicationPolicy",
>     "sync.group.offsets.enabled": "true",
>     "sync.group.offsets.interval.seconds": "5",
>     "emit.checkpoints.enabled": "true",
>     "emit.checkpoints.interval.seconds": "30",
>     "emit.heartbeats.interval.seconds": "30",
>     "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
>     "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
>   }
> }
> {code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)