I’ve found that Kafka Connect never respects the “target.cluster.bootstrap.servers” setting in the MirrorMaker2 (MM2) connector config. It always uses the Kafka Connect worker’s own broker connection (its bootstrap.servers) instead. With an IdentityReplicationPolicy, running Kafka Connect against the source cluster causes an infinite loop: messages are read from the source cluster and then written back to the same topic on the source cluster. Running Kafka Connect against a third cluster causes the messages to be written to the Kafka Connect cluster rather than the configured target cluster. Below are the scenarios I tested and an example of the Kafka Connect settings used. The only scenario that produced the correct result was running Kafka Connect against the target cluster.

Is this a hard requirement? Am I misunderstanding how the MM2 configs are used in Kafka Connect?

We generally recommend that MirrorMaker2 users run Kafka Connect against the “target” Kafka cluster to help minimize network latency for the producers. However, in some scenarios it makes sense to run Kafka Connect against the “source” Kafka cluster, or even a third, unrelated Kafka cluster, because we don’t always have control over topic creation in the source/target clusters and want MirrorMaker2 to replicate data/offsets only to/from existing topics.
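To illustrate why an identity-style policy loops when the producer is bound to the source cluster, here is a deliberately simplified Python model. It is not MirrorMaker2’s code: a “cluster” is just a dict of topic name to record list, and the helpers identity_target_topic, default_target_topic and replicate are hypothetical stand-ins for the replication policy and the connector’s poll/produce cycle. The default-style variant assumes the standard “alias.topic” naming used by DefaultReplicationPolicy.

```python
# Simplified model, not MirrorMaker2's implementation: a "cluster" is a dict
# mapping topic name -> list of records, and the producer writes into the same
# dict the consumer reads from (i.e. Kafka Connect bound to the source cluster).


def identity_target_topic(source_alias: str, topic: str) -> str:
    # Identity-style policy (hypothetical stand-in): the replicated topic keeps its name.
    return topic


def default_target_topic(source_alias: str, topic: str) -> str:
    # Default-style policy (hypothetical stand-in): prefix the source alias,
    # e.g. "msksource.example-topic".
    return f"{source_alias}.{topic}"


def replicate(cluster: dict, topic: str, policy, source_alias: str, rounds: int) -> dict:
    """Run `rounds` poll/produce cycles that copy new records from `topic`
    into policy(source_alias, topic) on the SAME cluster."""
    offset = 0  # consumer position in `topic`
    for _ in range(rounds):
        records = cluster[topic][offset:]  # poll everything not yet consumed
        offset = len(cluster[topic])  # advance the position before producing
        target = policy(source_alias, topic)
        cluster.setdefault(target, []).extend(records)  # produce
    return cluster


if __name__ == "__main__":
    looped = replicate({"example-topic": ["m1"]}, "example-topic",
                       identity_target_topic, "msksource", rounds=3)
    print(len(looped["example-topic"]))  # 4: each cycle re-reads its own output
    prefixed = replicate({"example-topic": ["m1"]}, "example-topic",
                         default_target_topic, "msksource", rounds=3)
    print(prefixed)  # {'example-topic': ['m1'], 'msksource.example-topic': ['m1']}
```

In this model the prefixed topic stops the loop, but the copy still lands on the wrong cluster; only binding the producer to the target cluster fixes both.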
connect-distributed.properties:
bootstrap.servers=source.broker.address:9092
group.id=demo-loop
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.topic=connect-offsets-demo-loop
offset.storage.replication.factor=3
config.storage.topic=connect-configs-demo-loop
config.storage.replication.factor=3
status.storage.topic=connect-status-demo-loop
status.storage.replication.factor=3
offset.flush.interval.ms=10000
connector.client.config.override.policy=All
Kafka Connect MM2 connector config:
{
"name": "mm2-msc",
"connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
"replication.policy.class":"com.amazonaws.kafka.samples.CustomMM2ReplicationPolicy",
"clusters": "msksource,mskdest",
"source.cluster.alias": "msksource",
"target.cluster.alias": "mskdest",
"target.cluster.bootstrap.servers": "target.broker.address:9092",
"source.cluster.bootstrap.servers": "source.broker.address:9092",
"topics": "example-topic",
"tasks.max": "1",
"key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"replication.factor": "3",
"offset-syncs.topic.replication.factor": "3",
"sync.topic.acls.interval.seconds": "600",
"sync.topic.configs.interval.seconds": "600",
"refresh.topics.interval.seconds": "300",
"refresh.groups.interval.seconds": "20",
"producer.enable.idempotence":"true",
"consumer.group.id": "mm2-msc",
"source.cluster.max.poll.records" : "50000",
"source.cluster.receive.buffer.bytes" : "33554432",
"source.cluster.send.buffer.bytes" : "33554432",
"source.cluster.max.partition.fetch.bytes" : "33554432",
"source.cluster.message.max.bytes" : "37755000",
"source.cluster.compression.type" : "gzip",
"source.cluster.max.request.size" : "26214400",
"source.cluster.buffer.memory" : "524288000",
"source.cluster.batch.size" : "524288",
"target.cluster.max.poll.records" : "20000",
"target.cluster.receive.buffer.bytes" : "33554432",
"target.cluster.send.buffer.bytes" : "33554432",
"target.cluster.max.partition.fetch.bytes" : "33554432",
"target.cluster.message.max.bytes" : "37755000",
"target.cluster.compression.type" : "gzip",
"target.cluster.max.request.size" : "26214400",
"target.cluster.buffer.memory" : "524288000",
"target.cluster.batch.size" : "52428"
}
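Since the results suggest that records land on whichever cluster the worker’s bootstrap.servers points at, a pre-flight check comparing the worker config with the connector’s target.cluster.bootstrap.servers could catch the mismatch before deploying. This is a minimal sketch under that assumption; parse_properties, server_set and check_connect_placement are hypothetical helpers, and the properties parser only handles simple key=value lines.

```python
import json


def parse_properties(text: str) -> dict:
    """Minimal .properties reader: key=value lines and '#'/'!' comments.
    Assumption: no line continuations, escapes, or ':' separators."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def server_set(value: str) -> set:
    """Split a comma-separated bootstrap.servers value into a set of host:port."""
    return {s.strip() for s in value.split(",") if s.strip()}


def check_connect_placement(worker_props: str, connector_json: str) -> list:
    """Return warnings if the worker is not bound to the MM2 target cluster
    (hypothetical check based on the behavior reported in this post)."""
    worker = server_set(parse_properties(worker_props).get("bootstrap.servers", ""))
    connector = json.loads(connector_json)
    target = server_set(connector.get("target.cluster.bootstrap.servers", ""))
    source = server_set(connector.get("source.cluster.bootstrap.servers", ""))
    warnings = []
    if worker != target:
        warnings.append(
            f"worker bootstrap.servers {sorted(worker)} differs from "
            f"target.cluster.bootstrap.servers {sorted(target)}"
        )
    if worker & source:
        warnings.append("worker shares brokers with the source cluster: "
                        "possible replication loop with an identity policy")
    return warnings


if __name__ == "__main__":
    worker_props = "bootstrap.servers=source.broker.address:9092\ngroup.id=demo-loop\n"
    connector_json = json.dumps({
        "source.cluster.bootstrap.servers": "source.broker.address:9092",
        "target.cluster.bootstrap.servers": "target.broker.address:9092",
    })
    for warning in check_connect_placement(worker_props, connector_json):
        print("WARN:", warning)
```

With the worker and connector values from this post, the check returns two warnings, because the worker is bound to the source cluster rather than the target.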
| Test | Kafka Connect server | Kafka Connect/MM2 version | Offset sync location | Source cluster version | Target cluster version | Result |
|---|---|---|---|---|---|---|
| Control (BNSF config) | Source | 3.8.1 | Source | 3.5.1 | 2.7.0 | Infinite loop |
| 1 | Source | 3.8.1 | Target | 3.5.1 | 2.7.0 | Infinite loop |
| 2 | Source | 3.8.1 | Source | 3.5.1 | 3.5.1 | Infinite loop |
| 3 | Source | 3.9.0 | Source | 3.5.1 | 3.5.1 | Infinite loop |
| 4 | Source | 3.7.1 | Source | 3.5.1 | 3.5.1 | Infinite loop |
| 5 | Source | 3.6.0 | Source | 3.5.1 | 3.5.1 | Infinite loop |
| 6 | Source | 2.7.1 | Source | 3.5.1 | 3.5.1 | Infinite loop |
| 7 | Source | 3.8.1 | Source (also tested with various other config changes, e.g. varying how the target.cluster.bootstrap.servers setting is provided) | 3.5.1 | 3.5.1 | Infinite loop |
| 8 | A third MSK server | 3.8.1 | Source | 3.5.1 | 3.5.1 | Data replicated to the third server, not the target or source |
| 9 | Target | 3.8.1 | Source | 3.5.1 | 3.5.1 | Correct behavior (source replicated to target) |
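The pattern in these results can be checked mechanically. This Python sketch transcribes the runs (result labels shortened, the extra notes on test 7 omitted) and groups outcomes by each variable. Only the Kafka Connect server column maps one-to-one to an outcome; the same Connect version (3.8.1) and the same offset sync location both appear with different results. The names Run and outcomes_by are illustrative only.

```python
from collections import defaultdict, namedtuple

# One row per test run, transcribed from the results table (labels shortened).
Run = namedtuple("Run", "test connect_server connect_version offset_sync "
                        "source_version target_version result")

RUNS = [
    Run("Control", "Source", "3.8.1", "Source", "3.5.1", "2.7.0", "Infinite loop"),
    Run("1", "Source", "3.8.1", "Target", "3.5.1", "2.7.0", "Infinite loop"),
    Run("2", "Source", "3.8.1", "Source", "3.5.1", "3.5.1", "Infinite loop"),
    Run("3", "Source", "3.9.0", "Source", "3.5.1", "3.5.1", "Infinite loop"),
    Run("4", "Source", "3.7.1", "Source", "3.5.1", "3.5.1", "Infinite loop"),
    Run("5", "Source", "3.6.0", "Source", "3.5.1", "3.5.1", "Infinite loop"),
    Run("6", "Source", "2.7.1", "Source", "3.5.1", "3.5.1", "Infinite loop"),
    Run("7", "Source", "3.8.1", "Source", "3.5.1", "3.5.1", "Infinite loop"),
    Run("8", "Third", "3.8.1", "Source", "3.5.1", "3.5.1", "Written to third cluster"),
    Run("9", "Target", "3.8.1", "Source", "3.5.1", "3.5.1", "Correct"),
]


def outcomes_by(field: str) -> dict:
    """Map each value of `field` to the set of results observed with it."""
    grouped = defaultdict(set)
    for run in RUNS:
        grouped[getattr(run, field)].add(run.result)
    return dict(grouped)


if __name__ == "__main__":
    for field in ("connect_server", "connect_version", "offset_sync"):
        print(field, outcomes_by(field))
```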