[ 
https://issues.apache.org/jira/browse/KAFKA-12468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17308308#comment-17308308
 ] 

Alan commented on KAFKA-12468:
------------------------------

I am seeing similar issue as well. I am also using a custom replication policy 
to preserve topic name. I would mirror hundreds of topics and CGs. Most of them 
will mirror correctly except for a few CG:partition.

In this example, group topic 1 has a lag of -16201970 and the offset is 
determined to be 16764337. This is wrong. The offset 16764337 is copied the 
original cluster and not translated. 

 
{noformat}
GROUP TOPIC PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             
CONSUMER-ID     HOST            CLIENT-ID
group topic 20         390239          390239          0               -        
       -               -
group topic 22         494366          494366          0               -        
       -               -
group topic 16         147241          147241          0               -        
       -               -
group topic 18         469795          469795          0               -        
       -               -
group topic 24         835689          835689          0               -        
       -               -
group topic 3          391505          391505          0               -        
       -               -
group topic 5          194327          194327          0               -        
       -               -
group topic 1          16764337        562367          -16201970       -        
       -               -
group topic 11         913398          913398          0               -        
       -               -
group topic 13         1245835         1245835         0               -        
       -               -
group topic 7          52007           52007           0               -        
       -               -
group topic 9          1001964         1001964         0               -        
       -               -
group topic 19         1035791         1035791         0               -        
       -               -
group topic 21         456149          456149          0               -        
       -               -
group topic 15         696             696             0               -        
       -               -
group topic 17         225085          225085          0               -        
       -               -
group topic 23         622744          622744          0               -        
       -               -
group topic 4          777787          777787          0               -        
       -               -
group topic 6          286576          286576          0               -        
       -               -
group topic 0          1233042         1233042         0               -        
       -               -
group topic 2          1118624         1118624         0               -        
       -               -
group topic 12         693             693             0               -        
       -               -
group topic 14         283294          283294          0               -        
       -               -
group topic 8          924899          924899          0               -        
       -               -
group topic 10         494636          494636          0               -        
       -               -
{noformat}
 

> Initial offsets are copied from source to target cluster
> --------------------------------------------------------
>
>                 Key: KAFKA-12468
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12468
>             Project: Kafka
>          Issue Type: Bug
>          Components: mirrormaker
>    Affects Versions: 2.7.0
>            Reporter: Bart De Neuter
>            Priority: Major
>
> We have an active-passive setup where  the 3 connectors from mirror maker 2 
> (heartbeat, checkpoint and source) are running on a dedicated Kafka connect 
> cluster on the target cluster.
> Offset syncing is enabled as specified by KIP-545. But when activated, it 
> seems the offsets from the source cluster are initially copied to the target 
> cluster without translation. This causes a negative lag for all synced 
> consumer groups. Only when we reset the offsets for each topic/partition on 
> the target cluster and produce a record on the topic/partition in the source, 
> the sync starts working correctly. 
> I would expect that the consumer groups are synced but that the current 
> offsets of the source cluster are not copied to the target cluster.
> This is the configuration we are currently using:
> Heartbeat connector
>  
> {code:xml}
> {
>   "name": "mm2-mirror-heartbeat",
>   "config": {
>     "name": "mm2-mirror-heartbeat",
>     "connector.class": 
> "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
>     "source.cluster.alias": "eventador",
>     "target.cluster.alias": "msk",
>     "source.cluster.bootstrap.servers": "<SOURCE_CLUSTER>",
>     "target.cluster.bootstrap.servers": "<TARGET_CLUSTER>",
>     "topics": ".*",
>     "groups": ".*",
>     "tasks.max": "1",
>     "replication.policy.class": "CustomReplicationPolicy",
>     "sync.group.offsets.enabled": "true",
>     "sync.group.offsets.interval.seconds": "5",
>     "emit.checkpoints.enabled": "true",
>     "emit.checkpoints.interval.seconds": "30",
>     "emit.heartbeats.interval.seconds": "30",
>     "key.converter": " 
> org.apache.kafka.connect.converters.ByteArrayConverter",
>     "value.converter": 
> "org.apache.kafka.connect.converters.ByteArrayConverter"
>   }
> }
> {code}
> Checkpoint connector:
> {code:xml}
> {
>   "name": "mm2-mirror-checkpoint",
>   "config": {
>     "name": "mm2-mirror-checkpoint",
>     "connector.class": 
> "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
>     "source.cluster.alias": "eventador",
>     "target.cluster.alias": "msk",
>     "source.cluster.bootstrap.servers": "<SOURCE_CLUSTER>",
>     "target.cluster.bootstrap.servers": "<TARGET_CLUSTER>",
>     "topics": ".*",
>     "groups": ".*",
>     "tasks.max": "40",
>     "replication.policy.class": "CustomReplicationPolicy",
>     "sync.group.offsets.enabled": "true",
>     "sync.group.offsets.interval.seconds": "5",
>     "emit.checkpoints.enabled": "true",
>     "emit.checkpoints.interval.seconds": "30",
>     "emit.heartbeats.interval.seconds": "30",
>     "key.converter": " 
> org.apache.kafka.connect.converters.ByteArrayConverter",
>     "value.converter": 
> "org.apache.kafka.connect.converters.ByteArrayConverter"
>   }
> }
> {code}
>  Source connector:
> {code:xml}
> {
>   "name": "mm2-mirror-source",
>   "config": {
>     "name": "mm2-mirror-source",
>     "connector.class": 
> "org.apache.kafka.connect.mirror.MirrorSourceConnector",
>     "source.cluster.alias": "eventador",
>     "target.cluster.alias": "msk",
>     "source.cluster.bootstrap.servers": "<SOURCE_CLUSTER>",
>     "target.cluster.bootstrap.servers": "<TARGET_CLUSTER>",
>     "topics": ".*",
>     "groups": ".*",
>     "tasks.max": "40",
>     "replication.policy.class": "CustomReplicationPolicy",
>     "sync.group.offsets.enabled": "true",
>     "sync.group.offsets.interval.seconds": "5",
>     "emit.checkpoints.enabled": "true",
>     "emit.checkpoints.interval.seconds": "30",
>     "emit.heartbeats.interval.seconds": "30",
>     "key.converter": " 
> org.apache.kafka.connect.converters.ByteArrayConverter",
>     "value.converter": 
> "org.apache.kafka.connect.converters.ByteArrayConverter"
>   }
> }
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to