[ https://issues.apache.org/jira/browse/KAFKA-12468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17392914#comment-17392914 ]

Alexis Josephides commented on KAFKA-12468:
-------------------------------------------

Thanks for the suggestions, and apologies for the delay in updating this with 
how we handled the issue in the end.
I should say from the outset that we did not completely eliminate the issue; 
rather, we minimised the occurrences, fixed some, and lived with the remainder.
The first step was minimisation, which we achieved by phasing in our 
connectors. The first connector we applied was the `Source` connector. For 
our setup we had a number of source connectors, some set to replicate from 
`latest` and others from `earliest`. We let these run and replicate until we 
hit a steady state and all replication was confirmed to be at the head of the 
relevant topics. This soak could take a few days depending on your data 
volumes, throughputs (client limits) and so on.
Once the soak was complete we then turned on the Checkpoint connector.
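For anyone scripting this phasing, a rough sketch against the Kafka Connect 
REST API is below (the Connect URL and config file path are illustrative; the 
connector names match the configs quoted further down): confirm the source 
connector is healthy, and only once the soak is done submit the checkpoint 
connector.

{code:java}
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Files;
import java.nio.file.Path;

public class PhaseConnectors {
    public static void main(String[] args) throws Exception {
        HttpClient http = HttpClient.newHttpClient();
        String connectUrl = "http://localhost:8083"; // illustrative Connect REST endpoint

        // 1. Check the source connector is healthy before doing anything else.
        HttpRequest status = HttpRequest.newBuilder()
                .uri(URI.create(connectUrl + "/connectors/mm2-mirror-source/status"))
                .GET()
                .build();
        System.out.println(http.send(status, HttpResponse.BodyHandlers.ofString()).body());

        // 2. After the soak, submit the checkpoint connector (a JSON file in the
        //    same name/config shape as the configs quoted below).
        String checkpointJson = Files.readString(Path.of("mm2-mirror-checkpoint.json"));
        HttpRequest create = HttpRequest.newBuilder()
                .uri(URI.create(connectUrl + "/connectors"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(checkpointJson))
                .build();
        System.out.println(http.send(create, HttpResponse.BodyHandlers.ofString()).statusCode());
    }
}
{code}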

If there were negative offsets after this first step, we then took steps to 
manage them. There are two categories here: partitions that have data on them 
and partitions that have no data on them.
In the first case (data on the partitions) the first thing we tried was to 
`delete` the affected consumer group on the target cluster. This is absolutely 
fine to do as a) there are no consumers on the target cluster yet, and b) the 
group is replicated again by MM2.
In 90% of instances this corrected the negative offset.
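A minimal sketch of that deletion with the Kafka `Admin` client is below (the 
bootstrap servers and group name are placeholders; the equivalent 
`kafka-consumer-groups.sh --bootstrap-server <target> --delete --group <group>` 
CLI does the same thing):

{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class DeleteTargetGroup {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "<TARGET_CLUSTER>");
        try (Admin admin = Admin.create(props)) {
            // Delete the group on the *target* cluster; MM2's checkpoint
            // connector will re-create it with freshly translated offsets.
            admin.deleteConsumerGroups(Collections.singletonList("<CONSUMER_GROUP>"))
                 .all()
                 .get(); // fails if the group still has active members
        }
    }
}
{code}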

In the second case (no data on the partitions) the first thing we examined was 
whether we could publish data on the source cluster to put records onto the 
affected partition. This was then followed by a refresh (delete) of the 
affected consumer group. This was only possible if the downstream consumer 
either handled dummy garbage messages or was fine with a small number of 
duplicate messages.
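A sketch of seeding an empty partition on the source cluster follows; the 
topic name, partition number and payload are placeholders, and the key point 
is addressing the partition explicitly in the `ProducerRecord`:

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class SeedEmptyPartition {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "<SOURCE_CLUSTER>");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Send one dummy record straight to the empty partition (here: 3)
            // so MM2 has something to replicate and checkpoint against.
            producer.send(new ProducerRecord<String, String>("<TOPIC>", 3, null, "dummy")).get();
        }
    }
}
{code}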

What if, following the above, a negative offset remained?
Where there was zero data on a partition and no new data could be published to 
it, we let the consumer migrate onto the target cluster without much worry. At 
this point the Kafka consumer would see the negative offset, log a warning 
that it was out of range, and then reset its offset on the cluster to its 
default setting, i.e. consume from either `latest` or `earliest`. Since there 
is no data on that partition, the two are one and the same thing.
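For illustration, this behaviour is driven by the consumer's 
`auto.offset.reset` setting; a minimal consumer pointed at the target cluster 
(group id, topic and cluster address are placeholders) would look like this:

{code:java}
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class MigratedConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<TARGET_CLUSTER>");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "<CONSUMER_GROUP>");
        // Where to restart when the committed offset is out of range (e.g.
        // negative); on an empty partition "earliest" and "latest" coincide.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // or "latest"
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("<TOPIC>"));
            // poll() as usual; the out-of-range reset happens transparently.
        }
    }
}
{code}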

For the instances (rare, but they did occur) where a negative offset remained 
and there was data on the partition, we still migrated and relied on the 
consumer behaviour to reset its offset to either `earliest` or `latest`. 
Depending on the consumer and its use case we picked whichever best suited the 
scenario.

Hope this is helpful in some way to others who might be experiencing these 
issues.

> Initial offsets are copied from source to target cluster
> --------------------------------------------------------
>
>                 Key: KAFKA-12468
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12468
>             Project: Kafka
>          Issue Type: Bug
>          Components: mirrormaker
>    Affects Versions: 2.7.0
>            Reporter: Bart De Neuter
>            Priority: Major
>
> We have an active-passive setup where the 3 connectors from MirrorMaker 2 
> (heartbeat, checkpoint and source) are running on a dedicated Kafka Connect 
> cluster on the target cluster.
> Offset syncing is enabled as specified by KIP-545. But when activated, it 
> seems the offsets from the source cluster are initially copied to the target 
> cluster without translation. This causes a negative lag for all synced 
> consumer groups. Only when we reset the offsets for each topic/partition on 
> the target cluster and produce a record on the topic/partition in the source 
> does the sync start working correctly.
> I would expect that the consumer groups are synced but that the current 
> offsets of the source cluster are not copied to the target cluster.
> This is the configuration we are currently using:
> Heartbeat connector
>  
> {code:xml}
> {
>   "name": "mm2-mirror-heartbeat",
>   "config": {
>     "name": "mm2-mirror-heartbeat",
>     "connector.class": 
> "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
>     "source.cluster.alias": "eventador",
>     "target.cluster.alias": "msk",
>     "source.cluster.bootstrap.servers": "<SOURCE_CLUSTER>",
>     "target.cluster.bootstrap.servers": "<TARGET_CLUSTER>",
>     "topics": ".*",
>     "groups": ".*",
>     "tasks.max": "1",
>     "replication.policy.class": "CustomReplicationPolicy",
>     "sync.group.offsets.enabled": "true",
>     "sync.group.offsets.interval.seconds": "5",
>     "emit.checkpoints.enabled": "true",
>     "emit.checkpoints.interval.seconds": "30",
>     "emit.heartbeats.interval.seconds": "30",
>     "key.converter": " 
> org.apache.kafka.connect.converters.ByteArrayConverter",
>     "value.converter": 
> "org.apache.kafka.connect.converters.ByteArrayConverter"
>   }
> }
> {code}
> Checkpoint connector:
> {code:xml}
> {
>   "name": "mm2-mirror-checkpoint",
>   "config": {
>     "name": "mm2-mirror-checkpoint",
>     "connector.class": 
> "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
>     "source.cluster.alias": "eventador",
>     "target.cluster.alias": "msk",
>     "source.cluster.bootstrap.servers": "<SOURCE_CLUSTER>",
>     "target.cluster.bootstrap.servers": "<TARGET_CLUSTER>",
>     "topics": ".*",
>     "groups": ".*",
>     "tasks.max": "40",
>     "replication.policy.class": "CustomReplicationPolicy",
>     "sync.group.offsets.enabled": "true",
>     "sync.group.offsets.interval.seconds": "5",
>     "emit.checkpoints.enabled": "true",
>     "emit.checkpoints.interval.seconds": "30",
>     "emit.heartbeats.interval.seconds": "30",
>     "key.converter": " 
> org.apache.kafka.connect.converters.ByteArrayConverter",
>     "value.converter": 
> "org.apache.kafka.connect.converters.ByteArrayConverter"
>   }
> }
> {code}
>  Source connector:
> {code:xml}
> {
>   "name": "mm2-mirror-source",
>   "config": {
>     "name": "mm2-mirror-source",
>     "connector.class": 
> "org.apache.kafka.connect.mirror.MirrorSourceConnector",
>     "source.cluster.alias": "eventador",
>     "target.cluster.alias": "msk",
>     "source.cluster.bootstrap.servers": "<SOURCE_CLUSTER>",
>     "target.cluster.bootstrap.servers": "<TARGET_CLUSTER>",
>     "topics": ".*",
>     "groups": ".*",
>     "tasks.max": "40",
>     "replication.policy.class": "CustomReplicationPolicy",
>     "sync.group.offsets.enabled": "true",
>     "sync.group.offsets.interval.seconds": "5",
>     "emit.checkpoints.enabled": "true",
>     "emit.checkpoints.interval.seconds": "30",
>     "emit.heartbeats.interval.seconds": "30",
>     "key.converter": " 
> org.apache.kafka.connect.converters.ByteArrayConverter",
>     "value.converter": 
> "org.apache.kafka.connect.converters.ByteArrayConverter"
>   }
> }
> {code}
>  


