[ https://issues.apache.org/jira/browse/KAFKA-12468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17308308#comment-17308308 ]
Alan commented on KAFKA-12468: ------------------------------ I am seeing a similar issue as well. I am also using a custom replication policy to preserve topic names. I would mirror hundreds of topics and CGs. Most of them will mirror correctly except for a few CG:partition pairs. In this example, group topic 1 has a lag of -16201970 and the offset is determined to be 16764337. This is wrong. The offset 16764337 is copied from the original cluster and not translated. {noformat} GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID group topic 20 390239 390239 0 - - - group topic 22 494366 494366 0 - - - group topic 16 147241 147241 0 - - - group topic 18 469795 469795 0 - - - group topic 24 835689 835689 0 - - - group topic 3 391505 391505 0 - - - group topic 5 194327 194327 0 - - - group topic 1 16764337 562367 -16201970 - - - group topic 11 913398 913398 0 - - - group topic 13 1245835 1245835 0 - - - group topic 7 52007 52007 0 - - - group topic 9 1001964 1001964 0 - - - group topic 19 1035791 1035791 0 - - - group topic 21 456149 456149 0 - - - group topic 15 696 696 0 - - - group topic 17 225085 225085 0 - - - group topic 23 622744 622744 0 - - - group topic 4 777787 777787 0 - - - group topic 6 286576 286576 0 - - - group topic 0 1233042 1233042 0 - - - group topic 2 1118624 1118624 0 - - - group topic 12 693 693 0 - - - group topic 14 283294 283294 0 - - - group topic 8 924899 924899 0 - - - group topic 10 494636 494636 0 - - - {noformat} > Initial offsets are copied from source to target cluster > -------------------------------------------------------- > > Key: KAFKA-12468 > URL: https://issues.apache.org/jira/browse/KAFKA-12468 > Project: Kafka > Issue Type: Bug > Components: mirrormaker > Affects Versions: 2.7.0 > Reporter: Bart De Neuter > Priority: Major > > We have an active-passive setup where the 3 connectors from mirror maker 2 > (heartbeat, checkpoint and source) are running on a dedicated Kafka connect > cluster on the target 
cluster. > Offset syncing is enabled as specified by KIP-545. But when activated, it > seems the offsets from the source cluster are initially copied to the target > cluster without translation. This causes a negative lag for all synced > consumer groups. Only when we reset the offsets for each topic/partition on > the target cluster and produce a record on the topic/partition in the source, > the sync starts working correctly. > I would expect that the consumer groups are synced but that the current > offsets of the source cluster are not copied to the target cluster. > This is the configuration we are currently using: > Heartbeat connector > > {code:xml} > { > "name": "mm2-mirror-heartbeat", > "config": { > "name": "mm2-mirror-heartbeat", > "connector.class": > "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector", > "source.cluster.alias": "eventador", > "target.cluster.alias": "msk", > "source.cluster.bootstrap.servers": "<SOURCE_CLUSTER>", > "target.cluster.bootstrap.servers": "<TARGET_CLUSTER>", > "topics": ".*", > "groups": ".*", > "tasks.max": "1", > "replication.policy.class": "CustomReplicationPolicy", > "sync.group.offsets.enabled": "true", > "sync.group.offsets.interval.seconds": "5", > "emit.checkpoints.enabled": "true", > "emit.checkpoints.interval.seconds": "30", > "emit.heartbeats.interval.seconds": "30", > "key.converter": " > org.apache.kafka.connect.converters.ByteArrayConverter", > "value.converter": > "org.apache.kafka.connect.converters.ByteArrayConverter" > } > } > {code} > Checkpoint connector: > {code:xml} > { > "name": "mm2-mirror-checkpoint", > "config": { > "name": "mm2-mirror-checkpoint", > "connector.class": > "org.apache.kafka.connect.mirror.MirrorCheckpointConnector", > "source.cluster.alias": "eventador", > "target.cluster.alias": "msk", > "source.cluster.bootstrap.servers": "<SOURCE_CLUSTER>", > "target.cluster.bootstrap.servers": "<TARGET_CLUSTER>", > "topics": ".*", > "groups": ".*", > "tasks.max": "40", > 
"replication.policy.class": "CustomReplicationPolicy", > "sync.group.offsets.enabled": "true", > "sync.group.offsets.interval.seconds": "5", > "emit.checkpoints.enabled": "true", > "emit.checkpoints.interval.seconds": "30", > "emit.heartbeats.interval.seconds": "30", > "key.converter": " > org.apache.kafka.connect.converters.ByteArrayConverter", > "value.converter": > "org.apache.kafka.connect.converters.ByteArrayConverter" > } > } > {code} > Source connector: > {code:xml} > { > "name": "mm2-mirror-source", > "config": { > "name": "mm2-mirror-source", > "connector.class": > "org.apache.kafka.connect.mirror.MirrorSourceConnector", > "source.cluster.alias": "eventador", > "target.cluster.alias": "msk", > "source.cluster.bootstrap.servers": "<SOURCE_CLUSTER>", > "target.cluster.bootstrap.servers": "<TARGET_CLUSTER>", > "topics": ".*", > "groups": ".*", > "tasks.max": "40", > "replication.policy.class": "CustomReplicationPolicy", > "sync.group.offsets.enabled": "true", > "sync.group.offsets.interval.seconds": "5", > "emit.checkpoints.enabled": "true", > "emit.checkpoints.interval.seconds": "30", > "emit.heartbeats.interval.seconds": "30", > "key.converter": " > org.apache.kafka.connect.converters.ByteArrayConverter", > "value.converter": > "org.apache.kafka.connect.converters.ByteArrayConverter" > } > } > {code} > -- This message was sent by Atlassian Jira (v8.3.4#803005)