Roman created KAFKA-9438:
----------------------------

             Summary: Issue with mm2 active/active replication
                 Key: KAFKA-9438
                 URL: https://issues.apache.org/jira/browse/KAFKA-9438
             Project: Kafka
          Issue Type: Bug
          Components: mirrormaker
    Affects Versions: 2.4.0
            Reporter: Roman


Hi,

 

i am trying to configure the the active/active with new kafka 2.4.0 and MM2.

I have 2 kafka clusters of 3 kafkas, one in lets say BO and on in IN.

In each cluster there are 3 kafkas.

Topics are replicated properly so in BO i see
{quote}topics

in.topics
{quote}
 

in IN i see
{quote}topics

bo.topic
{quote}
 

That should be according to documentation.

 

But when I stop the replication process on one data center and start it up, the 
replication replicate the topics with the same prefix twice bo.bo.topics or 
in.in.topics depending on what connector i restart.

I have also blacklisted the topics but they are still replicating.

 

bo.properties file
{quote}name = in-bo
#topics = .*
topics.blacklist = "bo.*"
#groups = .*
connector.class = org.apache.kafka.connect.mirror.MirrorSourceConnector
tasks.max = 10

source.cluster.alias = in
target.cluster.alias = bo
source.cluster.bootstrap.servers = IN-kafka1:9092,IN-kafka2:9092,IN-kafka3:9092
target.cluster.bootstrap.servers = BO-kafka1:9092,BO-kafka2:9092,bo-kafka39092


# use ByteArrayConverter to ensure that records are not re-encoded
key.converter = org.apache.kafka.connect.converters.ByteArrayConverter
value.converter = org.apache.kafka.connect.converters.ByteArrayConverter

 
{quote}
in.properties
{quote}name = bo-in
#topics = .*
topics.blacklist = "in.*"
#groups = .*
connector.class = org.apache.kafka.connect.mirror.MirrorSourceConnector
tasks.max = 10

source.cluster.alias = bo
target.cluster.alias = in
target.cluster.bootstrap.servers = IN-kafka1:9092,IN-kafka2:9092,IN-kafka3:9092
source.cluster.bootstrap.servers = BO-kafka1:9092,BO-kafka2:9092,BO-kafka3:9092


# use ByteArrayConverter to ensure that records are not re-encoded
key.converter = org.apache.kafka.connect.converters.ByteArrayConverter
value.converter = org.apache.kafka.connect.converters.ByteArrayConverter
{quote}
 

 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to