Hi Team,

I am using MirrorMaker 2 (MM2) to replicate a source Kafka cluster to a
target cluster. I started with kafka_2.13-2.6.0 on the destination, and
there it was working as expected, replicating everything. We are now trying
kafka_2.13-3.0.0 on the destination cluster, and when I start MM2 with the
same properties file on the destination cluster, it gives me the error
below. I can see that a few topics are created on the destination
cluster (__consumer_offsets / heartbeats / mm2-configs.source.internal /
source.checkpoints.internal, etc.).


ERROR [MirrorSourceConnector|worker] Scheduler for MirrorSourceConnector
caught exception in scheduled task: refreshing topics
(org.apache.kafka.connect.mirror.Scheduler:102)
java.lang.NullPointerException
        at
org.apache.kafka.connect.mirror.MirrorSourceConnector.isCycle(MirrorSourceConnector.java:496)
        at
org.apache.kafka.connect.mirror.MirrorSourceConnector.shouldReplicateTopic(MirrorSourceConnector.java:475)
        at
java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:174)
        at
java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
        at
java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
        at
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
        at
java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
        at
java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at
java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
        at
org.apache.kafka.connect.mirror.MirrorSourceConnector.findSourceTopicPartitions(MirrorSourceConnector.java:196)
        at
org.apache.kafka.connect.mirror.MirrorSourceConnector.refreshTopicPartitions(MirrorSourceConnector.java:218)
        at org.apache.kafka.connect.mirror.Scheduler.run(Scheduler.java:93)
        at
org.apache.kafka.connect.mirror.Scheduler.executeThread(Scheduler.java:112)
        at
org.apache.kafka.connect.mirror.Scheduler.lambda$scheduleRepeatingDelayed$1(Scheduler.java:57)
        at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
ERROR [MirrorSourceConnector|worker] Scheduler for MirrorSourceConnector
caught exception in scheduled task: refreshing topics
(org.apache.kafka.connect.mirror.Scheduler:102)
java.lang.NullPointerException
        at
org.apache.kafka.connect.mirror.MirrorSourceConnector.isCycle(MirrorSourceConnector.java:496)
        at
org.apache.kafka.connect.mirror.MirrorSourceConnector.shouldReplicateTopic(MirrorSourceConnector.java:475)
        at
java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:174)
        at
java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
        at
java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
        at
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
        at
java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
        at
java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at
java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
        at
org.apache.kafka.connect.mirror.MirrorSourceConnector.findSourceTopicPartitions(MirrorSourceConnector.java:196)
        at
org.apache.kafka.connect.mirror.MirrorSourceConnector.refreshTopicPartitions(MirrorSourceConnector.java:218)
        at org.apache.kafka.connect.mirror.Scheduler.run(Scheduler.java:93)
        at
org.apache.kafka.connect.mirror.Scheduler.executeThread(Scheduler.java:112)
        at
org.apache.kafka.connect.mirror.Scheduler.lambda$scheduleRepeatingDelayed$1(Scheduler.java:57)
        at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)


-- ===================================

*mm2.properties config :-*


# Kafka datacenters.
# https://dzone.com/articles/mirror-maker-v20

# Clusters have been given explicit names: source and target, respectively.
clusters = source, target

source.bootstrap.servers = 10.*.*.1:9092, 10.*.*.2:9092, 10.*.*.1:9092
target.bootstrap.servers = 10.*.*.01:9092, 10.*.*.02:9092, 10.*.*.03:9092

# Custom Replication Policy for ignoring alias in topic name
replication.policy.class=com.games24x7.mirrormaker2.PrefixlessReplicationPolicy

source.config.storage.replication.factor = 1
target.config.storage.replication.factor = 1
source.offset.storage.replication.factor = 1
target.offset.storage.replication.factor = 1
source.status.storage.replication.factor = 1
target.status.storage.replication.factor = 1

source->target.enabled = true
target->source.enabled = false

offset-syncs.topic.replication.factor = 1
heartbeats.topic.replication.factor = 1
checkpoints.topic.replication.factor = 1

topics = .*
#topics = test-mm2.*
groups = .*
tasks.max = 3

replication.factor = 3
refresh.topics.enabled = true
sync.topic.configs.enabled = true

refresh.topics.interval.seconds = 30
topics.blacklist = .*[\-\.]internal, .*\.replica
groups.blacklist = console-consumer-.*, connect-.*, __.*
# Enable heartbeats and checkpoints.
source->target.emit.heartbeats.enabled = true
source->target.emit.checkpoints.enabled = true

-- ===================================

Regards,
Anup Tiwari

Reply via email to