Hi Team, I am using MirrorMaker 2 (MM2) to replicate a source Kafka cluster to a target cluster. I started with kafka_2.13-2.6.0 on the destination cluster, and there it worked as expected, replicating everything. We are now trying kafka_2.13-3.0.0 on the destination cluster, and when I start MM2 there with the same properties file, it gives me the error below. I can also see that a few topics have been created on the destination cluster (__consumer_offsets, heartbeats, mm2-configs.source.internal, source.checkpoints.internal, etc.).
ERROR [MirrorSourceConnector|worker] Scheduler for MirrorSourceConnector caught exception in scheduled task: refreshing topics (org.apache.kafka.connect.mirror.Scheduler:102) java.lang.NullPointerException at org.apache.kafka.connect.mirror.MirrorSourceConnector.isCycle(MirrorSourceConnector.java:496) at org.apache.kafka.connect.mirror.MirrorSourceConnector.shouldReplicateTopic(MirrorSourceConnector.java:475) at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:174) at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556) at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499) at org.apache.kafka.connect.mirror.MirrorSourceConnector.findSourceTopicPartitions(MirrorSourceConnector.java:196) at org.apache.kafka.connect.mirror.MirrorSourceConnector.refreshTopicPartitions(MirrorSourceConnector.java:218) at org.apache.kafka.connect.mirror.Scheduler.run(Scheduler.java:93) at org.apache.kafka.connect.mirror.Scheduler.executeThread(Scheduler.java:112) at org.apache.kafka.connect.mirror.Scheduler.lambda$scheduleRepeatingDelayed$1(Scheduler.java:57) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) 
ERROR [MirrorSourceConnector|worker] Scheduler for MirrorSourceConnector caught exception in scheduled task: refreshing topics (org.apache.kafka.connect.mirror.Scheduler:102) java.lang.NullPointerException at org.apache.kafka.connect.mirror.MirrorSourceConnector.isCycle(MirrorSourceConnector.java:496) at org.apache.kafka.connect.mirror.MirrorSourceConnector.shouldReplicateTopic(MirrorSourceConnector.java:475) at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:174) at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556) at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499) at org.apache.kafka.connect.mirror.MirrorSourceConnector.findSourceTopicPartitions(MirrorSourceConnector.java:196) at org.apache.kafka.connect.mirror.MirrorSourceConnector.refreshTopicPartitions(MirrorSourceConnector.java:218) at org.apache.kafka.connect.mirror.Scheduler.run(Scheduler.java:93) at org.apache.kafka.connect.mirror.Scheduler.executeThread(Scheduler.java:112) at org.apache.kafka.connect.mirror.Scheduler.lambda$scheduleRepeatingDelayed$1(Scheduler.java:57) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) -- 
=================================== *mm2.properties config :-* # Kafka datacenters. # https://dzone.com/articles/mirror-maker-v20 #Clusters have been given explicit names. source and target respectively. clusters = source, target source.bootstrap.servers = 10.*.*.1:9092, 10.*.*.2:9092, 10.*.*.1:9092 target.bootstrap.servers = 10.*.*.01:9092, 10.*.*.02:9092, 10.*.*.03:9092 # Custom Replication Policy for ignoring alias in topic name replication.policy.class=com.games24x7.mirrormaker2.PrefixlessReplicationPolicy source.config.storage.replication.factor = 1 target.config.storage.replication.factor = 1 source.offset.storage.replication.factor = 1 target.offset.storage.replication.factor = 1 source.status.storage.replication.factor = 1 target.status.storage.replication.factor = 1 source->target.enabled = true target->source.enabled = false offset-syncs.topic.replication.factor = 1 heartbeats.topic.replication.factor = 1 checkpoints.topic.replication.factor = 1 topics = .* #topics = test-mm2.* groups = .* tasks.max = 3 replication.factor = 3 refresh.topics.enabled = true sync.topic.configs.enabled = true refresh.topics.interval.seconds = 30 topics.blacklist = .*[\-\.]internal, .*\.replica groups.blacklist = console-consumer-.*, connect-.*, __.* # Enable heartbeats and checkpoints. source->target.emit.heartbeats.enabled = true source->target.emit.checkpoints.enabled = true -- =================================== Regards, Anup Tiwari