[ 
https://issues.apache.org/jira/browse/KAFKA-12468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17316380#comment-17316380
 ] 

Angelos Kaltsikis edited comment on KAFKA-12468 at 4/7/21, 2:26 PM:
--------------------------------------------------------------------

Hello again,

We have managed to set up MirrorMaker 2 properly, taking into account the [KAFKA-12558 bug|https://issues.apache.org/jira/browse/KAFKA-12558], which requires the SourceConnector tasks.max to be chosen so that (total partition count) / tasks.max stays below 10.
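
To make that constraint concrete, a quick sizing sketch (the partition count below is a made-up example, not a number taken from our clusters):
{code:java}
public class TasksMaxSizing {
    public static void main(String[] args) {
        // Hypothetical partition count, not taken from this ticket.
        int totalPartitions = 600;

        // Smallest tasks.max satisfying totalPartitions / tasks.max < 10,
        // i.e. tasks.max > totalPartitions / 10.
        int tasksMax = totalPartitions / 10 + 1;

        System.out.println("tasks.max >= " + tasksMax); // prints 61 for 600 partitions
    }
}
{code}
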
We have also applied some configuration settings that offer performance optimizations, following this [blog|https://wmclane.medium.com/how-to-optimize-mirrormaker2-for-high-performance-apache-kafka-replication-697bc5089c].
We have been trying to verify that MM2 has successfully copied all the data (every topic message & correct consumer group offsets) from the source cluster to the target, but without much success. We compared the following metrics between the source & target clusters:
- Log size per partition: on most partitions it differs between the two clusters.
- Consumer lag for each consumer group / topic / partition combination: all of them still show a negative consumer lag. Only after we reset the offsets (as mentioned in this ticket's description) does the consumer group lag on the target cluster get into a better state (no longer negative).
- LogEndOffset - LogStartOffset: for topics with cleanup.policy=delete the difference is pretty much the same on both clusters (a sketch of how that comparison can be made is shown below).
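
For reference, a minimal sketch of how that last comparison can be done with the plain Java consumer API; the bootstrap addresses and topic name are placeholders, and the "eventador." prefix assumes default replication-policy naming on the target cluster (it would differ with the CustomReplicationPolicy from the configs below):
{code:java}
import java.util.*;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class RetainedRecordCount {

    // Returns LogEndOffset - LogStartOffset per partition of the given topic.
    static Map<TopicPartition, Long> retained(String bootstrapServers, String topic) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(
                props, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
            List<TopicPartition> partitions = consumer.partitionsFor(topic).stream()
                    .map(pi -> new TopicPartition(pi.topic(), pi.partition()))
                    .collect(Collectors.toList());
            Map<TopicPartition, Long> start = consumer.beginningOffsets(partitions);
            Map<TopicPartition, Long> end = consumer.endOffsets(partitions);
            Map<TopicPartition, Long> result = new HashMap<>();
            partitions.forEach(tp -> result.put(tp, end.get(tp) - start.get(tp)));
            return result;
        }
    }

    public static void main(String[] args) {
        // Placeholder bootstrap servers and topic name; "eventador.my-topic" assumes
        // the default replication policy naming for the mirrored topic.
        Map<TopicPartition, Long> source = retained("<SOURCE_CLUSTER>", "my-topic");
        Map<TopicPartition, Long> target = retained("<TARGET_CLUSTER>", "eventador.my-topic");
        source.forEach((tp, count) -> System.out.printf(
                "partition %d: source=%d target=%d%n",
                tp.partition(), count,
                target.getOrDefault(new TopicPartition("eventador.my-topic", tp.partition()), -1L)));
    }
}
{code}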

Is there *any better way* to verify that MM2 has caught up & continues the 
mirroring correctly?
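
For example, would a check along these lines be considered reliable: reading the translated group offsets back via MM2's RemoteClusterUtils helper (from connect-mirror-client) and comparing them with the group's committed offsets on the target cluster (e.g. via kafka-consumer-groups.sh --describe)? A rough sketch, with the bootstrap address and consumer group name as placeholders:
{code:java}
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.mirror.RemoteClusterUtils;

public class TranslatedOffsetCheck {
    public static void main(String[] args) throws Exception {
        // Client properties for the *target* cluster, where MM2 maintains the
        // checkpoints topic ("eventador.checkpoints.internal" with default naming).
        Map<String, Object> targetClusterProps = new HashMap<>();
        targetClusterProps.put("bootstrap.servers", "<TARGET_CLUSTER>");

        // "eventador" is the source cluster alias from the connector configs below;
        // "my-consumer-group" is a placeholder group name.
        Map<TopicPartition, OffsetAndMetadata> translated = RemoteClusterUtils.translateOffsets(
                targetClusterProps, "eventador", "my-consumer-group", Duration.ofSeconds(30));

        // Offsets the group should resume from on the target cluster according to
        // MM2's checkpoints; compare these with the group's committed offsets there.
        translated.forEach((tp, offset) ->
                System.out.printf("%s -> %d%n", tp, offset.offset()));
    }
}
{code}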

What worries us the most is that for some topics, on some partitions, we observed a much smaller log size in the target cluster. We compared the messages in some of those partitions and indeed there were fewer messages in the target cluster's topic partition. Our main question: does MM2 offer an at-least-once guarantee, i.e. no messages lost?



> Initial offsets are copied from source to target cluster
> --------------------------------------------------------
>
>                 Key: KAFKA-12468
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12468
>             Project: Kafka
>          Issue Type: Bug
>          Components: mirrormaker
>    Affects Versions: 2.7.0
>            Reporter: Bart De Neuter
>            Priority: Major
>
> We have an active-passive setup where the 3 connectors from MirrorMaker 2
> (heartbeat, checkpoint and source) are running on a dedicated Kafka Connect
> cluster on the target cluster.
> Offset syncing is enabled as specified by KIP-545. But when activated, it
> seems the offsets from the source cluster are initially copied to the target
> cluster without translation. This causes a negative lag for all synced
> consumer groups. Only when we reset the offsets for each topic/partition on
> the target cluster and produce a record on the topic/partition in the source
> does the sync start working correctly.
> I would expect that the consumer groups are synced but that the current 
> offsets of the source cluster are not copied to the target cluster.
> This is the configuration we are currently using:
> Heartbeat connector:
> {code:json}
> {
>   "name": "mm2-mirror-heartbeat",
>   "config": {
>     "name": "mm2-mirror-heartbeat",
>     "connector.class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
>     "source.cluster.alias": "eventador",
>     "target.cluster.alias": "msk",
>     "source.cluster.bootstrap.servers": "<SOURCE_CLUSTER>",
>     "target.cluster.bootstrap.servers": "<TARGET_CLUSTER>",
>     "topics": ".*",
>     "groups": ".*",
>     "tasks.max": "1",
>     "replication.policy.class": "CustomReplicationPolicy",
>     "sync.group.offsets.enabled": "true",
>     "sync.group.offsets.interval.seconds": "5",
>     "emit.checkpoints.enabled": "true",
>     "emit.checkpoints.interval.seconds": "30",
>     "emit.heartbeats.interval.seconds": "30",
>     "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
>     "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
>   }
> }
> {code}
> Checkpoint connector:
> {code:json}
> {
>   "name": "mm2-mirror-checkpoint",
>   "config": {
>     "name": "mm2-mirror-checkpoint",
>     "connector.class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
>     "source.cluster.alias": "eventador",
>     "target.cluster.alias": "msk",
>     "source.cluster.bootstrap.servers": "<SOURCE_CLUSTER>",
>     "target.cluster.bootstrap.servers": "<TARGET_CLUSTER>",
>     "topics": ".*",
>     "groups": ".*",
>     "tasks.max": "40",
>     "replication.policy.class": "CustomReplicationPolicy",
>     "sync.group.offsets.enabled": "true",
>     "sync.group.offsets.interval.seconds": "5",
>     "emit.checkpoints.enabled": "true",
>     "emit.checkpoints.interval.seconds": "30",
>     "emit.heartbeats.interval.seconds": "30",
>     "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
>     "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
>   }
> }
> {code}
> Source connector:
> {code:json}
> {
>   "name": "mm2-mirror-source",
>   "config": {
>     "name": "mm2-mirror-source",
>     "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
>     "source.cluster.alias": "eventador",
>     "target.cluster.alias": "msk",
>     "source.cluster.bootstrap.servers": "<SOURCE_CLUSTER>",
>     "target.cluster.bootstrap.servers": "<TARGET_CLUSTER>",
>     "topics": ".*",
>     "groups": ".*",
>     "tasks.max": "40",
>     "replication.policy.class": "CustomReplicationPolicy",
>     "sync.group.offsets.enabled": "true",
>     "sync.group.offsets.interval.seconds": "5",
>     "emit.checkpoints.enabled": "true",
>     "emit.checkpoints.interval.seconds": "30",
>     "emit.heartbeats.interval.seconds": "30",
>     "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
>     "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
>   }
> }
> {code}


