Bart De Neuter created KAFKA-12468:
--------------------------------------
Summary: Initial offsets are copied from source to target cluster
Key: KAFKA-12468
URL: https://issues.apache.org/jira/browse/KAFKA-12468
Project: Kafka
Issue Type: Bug
Components: mirrormaker
Affects Versions: 2.7.0
Reporter: Bart De Neuter
We have an active-passive setup in which the three MirrorMaker 2 connectors
(heartbeat, checkpoint and source) run on a dedicated Kafka Connect cluster
on the target side.
Offset syncing is enabled as specified by KIP-545. When it is activated,
however, the offsets from the source cluster appear to be copied to the target
cluster initially without translation. This causes a negative lag for all
synced consumer groups.
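To illustrate why an untranslated offset shows up as negative lag, here is a minimal sketch. It assumes lag is reported as the partition's log end offset minus the group's committed offset; the function name and all numbers are hypothetical and are not taken from our clusters.

```python
def consumer_lag(log_end_offset: int, committed_offset: int) -> int:
    """Lag as commonly reported: log end offset minus committed offset."""
    return log_end_offset - committed_offset


# Hypothetical values: the group has committed offset 1,000,000 on the source,
# while the mirrored partition on the target only holds 2,500 records so far.
source_committed_offset = 1_000_000
target_log_end_offset = 2_500

# Copying the source offset to the target as-is puts the committed offset
# beyond the end of the target log, so the computed lag is negative.
untranslated_lag = consumer_lag(target_log_end_offset, source_committed_offset)
print(untranslated_lag)
```

Any committed offset larger than the target partition's log end offset produces this effect, which would match what we see for every synced group.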
Only after we reset the offsets for each topic/partition on the target cluster
and produce a record to that topic/partition on the source cluster does the
sync start working correctly.
I would expect the consumer groups to be synced, but without the current
offsets of the source cluster being copied to the target cluster as-is.
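The expected behaviour could be sketched roughly as follows. This is a simplified model and not MirrorMaker's actual code: it assumes an offset sync is a pair of (upstream offset, downstream offset) for a partition, in the spirit of KIP-545, and that nothing should be written to the target group until such a pair exists. `OffsetSync` and `translate_offset` are hypothetical names used only for illustration.

```python
from typing import NamedTuple, Optional


class OffsetSync(NamedTuple):
    """Hypothetical sync point: the same record's offset on both clusters."""
    upstream_offset: int
    downstream_offset: int


def translate_offset(sync: Optional[OffsetSync],
                     upstream_committed: int) -> Optional[int]:
    """Translate a source-cluster committed offset to the target cluster.

    Returns None when no translation is possible, in which case the caller
    is expected to skip the commit instead of copying the source offset.
    """
    if sync is None:
        # No sync point seen yet for this partition: nothing to translate.
        return None
    if upstream_committed < sync.upstream_offset:
        # Committed offset is older than the sync point: cannot translate.
        return None
    # Advance the downstream offset by the distance past the sync point.
    return sync.downstream_offset + (upstream_committed - sync.upstream_offset)
```

With these assumptions, `translate_offset(None, 1_000_000)` yields `None` rather than `1_000_000`, so a group on the target cluster would never receive a raw source offset.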
This is the configuration we are currently using:
Heartbeat connector:
{code:xml}
{
  "name": "mm2-mirror-heartbeat",
  "config": {
    "name": "mm2-mirror-heartbeat",
    "connector.class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
    "source.cluster.alias": "eventador",
    "target.cluster.alias": "msk",
    "source.cluster.bootstrap.servers": "<SOURCE_CLUSTER>",
    "target.cluster.bootstrap.servers": "<TARGET_CLUSTER>",
    "topics": ".*",
    "groups": ".*",
    "tasks.max": "1",
    "replication.policy.class": "CustomReplicationPolicy",
    "sync.group.offsets.enabled": "true",
    "sync.group.offsets.interval.seconds": "5",
    "emit.checkpoints.enabled": "true",
    "emit.checkpoints.interval.seconds": "30",
    "emit.heartbeats.interval.seconds": "30",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
  }
}
{code}
Checkpoint connector:
{code:xml}
{
  "name": "mm2-mirror-checkpoint",
  "config": {
    "name": "mm2-mirror-checkpoint",
    "connector.class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
    "source.cluster.alias": "eventador",
    "target.cluster.alias": "msk",
    "source.cluster.bootstrap.servers": "<SOURCE_CLUSTER>",
    "target.cluster.bootstrap.servers": "<TARGET_CLUSTER>",
    "topics": ".*",
    "groups": ".*",
    "tasks.max": "40",
    "replication.policy.class": "CustomReplicationPolicy",
    "sync.group.offsets.enabled": "true",
    "sync.group.offsets.interval.seconds": "5",
    "emit.checkpoints.enabled": "true",
    "emit.checkpoints.interval.seconds": "30",
    "emit.heartbeats.interval.seconds": "30",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
  }
}
{code}
Source connector:
{code:xml}
{
  "name": "mm2-mirror-source",
  "config": {
    "name": "mm2-mirror-source",
    "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "source.cluster.alias": "eventador",
    "target.cluster.alias": "msk",
    "source.cluster.bootstrap.servers": "<SOURCE_CLUSTER>",
    "target.cluster.bootstrap.servers": "<TARGET_CLUSTER>",
    "topics": ".*",
    "groups": ".*",
    "tasks.max": "40",
    "replication.policy.class": "CustomReplicationPolicy",
    "sync.group.offsets.enabled": "true",
    "sync.group.offsets.interval.seconds": "5",
    "emit.checkpoints.enabled": "true",
    "emit.checkpoints.interval.seconds": "30",
    "emit.heartbeats.interval.seconds": "30",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
  }
}
{code}