[ https://issues.apache.org/jira/browse/KAFKA-14545?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Chris Solidum updated KAFKA-14545: ---------------------------------- Description: MirrorCheckpointTask looks like it's throwing a NullPointerException when a consumer group hasn't consumed from all partitions of a topic. This blocks the syncing of consumer group offsets to the target cluster. The stack trace and error message are as follows: {code:java} WARN Failure polling consumer state for checkpoints. (org.apache.kafka.connect.mirror.MirrorCheckpointTask) at java.base/java.lang.Thread.run(Thread.java:829)Dec 20 at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:72) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189) at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:346) at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.poll(AbstractWorkerSourceTask.java:452) at org.apache.kafka.connect.mirror.MirrorCheckpointTask.poll(MirrorCheckpointTask.java:142) at org.apache.kafka.connect.mirror.MirrorCheckpointTask.sourceRecordsForGroup(MirrorCheckpointTask.java:160) at org.apache.kafka.connect.mirror.MirrorCheckpointTask.checkpointsForGroup(MirrorCheckpointTask.java:177) at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578) at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913) at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474) at 
java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484) at java.base/java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1764) at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177) at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195) at org.apache.kafka.connect.mirror.MirrorCheckpointTask.lambda$checkpointsForGroup$2(MirrorCheckpointTask.java:174) at org.apache.kafka.connect.mirror.MirrorCheckpointTask.checkpoint(MirrorCheckpointTask.java:191) java.lang.NullPointerException {code} This seems to happen if the OffsetFetch call returns an OffsetFetchResponsePartition with a negative committedOffset. MirrorMaker should handle this case more gracefully and still sync consumer offsets for the partitions whose committed offset is non-negative. {code:java} TRACE [AdminClient clientId=adminclient-55] Call(callName=offsetFetch(api=OFFSET_FETCH), deadlineMs=1671657869539, tries=0, nextAllowedTryMs=0) got response OffsetFetchResponseData(throttleTimeMs=0, topics=[OffsetFetchResponseTopic(name='XXX', partitions=[OffsetFetchResponsePartition(partitionIndex=1, committedOffset=866, committedLeaderEpoch=-1, metadata='', errorCode=0), OffsetFetchResponsePartition(partitionIndex=0, committedOffset=865, committedLeaderEpoch=-1, metadata='', errorCode=0), OffsetFetchResponsePartition(partitionIndex=9, committedOffset=868, committedLeaderEpoch=-1, metadata='', errorCode=0), OffsetFetchResponsePartition(partitionIndex=14, committedOffset=870, committedLeaderEpoch=-1, metadata='', errorCode=0), OffsetFetchResponsePartition(partitionIndex=5, committedOffset=803, committedLeaderEpoch=-1, metadata='', errorCode=0), OffsetFetchResponsePartition(partitionIndex=8, committedOffset=881, committedLeaderEpoch=-1, metadata='', errorCode=0), OffsetFetchResponsePartition(partitionIndex=11, committedOffset=-1, committedLeaderEpoch=-1, metadata='', errorCode=0), OffsetFetchResponsePartition(partitionIndex=4, 
committedOffset=872, committedLeaderEpoch=-1, metadata='', errorCode=0), OffsetFetchResponsePartition(partitionIndex=7, committedOffset=863, committedLeaderEpoch=-1, metadata='', errorCode=0), OffsetFetchResponsePartition(partitionIndex=10, committedOffset=835, committedLeaderEpoch=-1, metadata='', errorCode=0), OffsetFetchResponsePartition(partitionIndex=13, committedOffset=860, committedLeaderEpoch=-1, metadata='', errorCode=0), OffsetFetchResponsePartition(partitionIndex=12, committedOffset=885, committedLeaderEpoch=-1, metadata='', errorCode=0), OffsetFetchResponsePartition(partitionIndex=3, committedOffset=771, committedLeaderEpoch=-1, metadata='', errorCode=0), OffsetFetchResponsePartition(partitionIndex=6, committedOffset=859, committedLeaderEpoch=-1, metadata='', errorCode=0), OffsetFetchResponsePartition(partitionIndex=2, committedOffset=820, committedLeaderEpoch=-1, metadata='', errorCode=0), OffsetFetchResponsePartition(partitionIndex=15, committedOffset=826, committedLeaderEpoch=-1, metadata='', errorCode=0)])], errorCode=0, groups=[]) (org.apache.kafka.clients.admin.KafkaAdminClient) {code} was: MirrorTaskConnector looks like it's throwing a NullPointerException when a consumer group hasn't consumed from all topics from a partition. This blocks the syncing of consumer group offsets to the target cluster. The stacktrace and error message is as follows: {code:java} WARN Failure polling consumer state for checkpoints. 
(org.apache.kafka.connect.mirror.MirrorCheckpointTask) at java.base/java.lang.Thread.run(Thread.java:829)Dec 20 at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:72) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189) at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:346) at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.poll(AbstractWorkerSourceTask.java:452) at org.apache.kafka.connect.mirror.MirrorCheckpointTask.poll(MirrorCheckpointTask.java:142) at org.apache.kafka.connect.mirror.MirrorCheckpointTask.sourceRecordsForGroup(MirrorCheckpointTask.java:160) at org.apache.kafka.connect.mirror.MirrorCheckpointTask.checkpointsForGroup(MirrorCheckpointTask.java:177) at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578) at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913) at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474) at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484) at java.base/java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1764) at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177) at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195) at 
org.apache.kafka.connect.mirror.MirrorCheckpointTask.lambda$checkpointsForGroup$2(MirrorCheckpointTask.java:174) at org.apache.kafka.connect.mirror.MirrorCheckpointTask.checkpoint(MirrorCheckpointTask.java:191) java.lang.NullPointerException {code} > MirrorCheckpointTask throws NullPointerException when group hasn't consumed > from some partitions > ------------------------------------------------------------------------------------------------ > > Key: KAFKA-14545 > URL: https://issues.apache.org/jira/browse/KAFKA-14545 > Project: Kafka > Issue Type: Bug > Components: mirrormaker > Affects Versions: 3.3.0 > Reporter: Chris Solidum > Assignee: Chris Solidum > Priority: Major > > MirrorCheckpointTask looks like it's throwing a NullPointerException when a > consumer group hasn't consumed from all partitions of a topic. This blocks > the syncing of consumer group offsets to the target cluster. The stack trace > and error message are as follows: > {code:java} > WARN Failure polling consumer state for checkpoints. 
> (org.apache.kafka.connect.mirror.MirrorCheckpointTask) > at java.base/java.lang.Thread.run(Thread.java:829)Dec 20 > at > java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) > at > java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) > at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) > at > java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) > at > org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:72) > at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244) > at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189) > at > org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:346) > at > org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.poll(AbstractWorkerSourceTask.java:452) > at > org.apache.kafka.connect.mirror.MirrorCheckpointTask.poll(MirrorCheckpointTask.java:142) > at > org.apache.kafka.connect.mirror.MirrorCheckpointTask.sourceRecordsForGroup(MirrorCheckpointTask.java:160) > at > org.apache.kafka.connect.mirror.MirrorCheckpointTask.checkpointsForGroup(MirrorCheckpointTask.java:177) > at > java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578) > at > java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > at > java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913) > at > java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474) > at > java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484) > at > java.base/java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1764) > at > java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177) > at > java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195) > at > 
org.apache.kafka.connect.mirror.MirrorCheckpointTask.lambda$checkpointsForGroup$2(MirrorCheckpointTask.java:174) > at > org.apache.kafka.connect.mirror.MirrorCheckpointTask.checkpoint(MirrorCheckpointTask.java:191) > java.lang.NullPointerException > {code} > This seems to happen if the OffsetFetch call returns an > OffsetFetchResponsePartition with a negative committedOffset. > MirrorMaker should handle this case more gracefully and still sync > consumer offsets for the partitions whose committed offset is non-negative. > {code:java} > TRACE [AdminClient clientId=adminclient-55] > Call(callName=offsetFetch(api=OFFSET_FETCH), deadlineMs=1671657869539, > tries=0, nextAllowedTryMs=0) got response > OffsetFetchResponseData(throttleTimeMs=0, > topics=[OffsetFetchResponseTopic(name='XXX', > partitions=[OffsetFetchResponsePartition(partitionIndex=1, > committedOffset=866, committedLeaderEpoch=-1, metadata='', errorCode=0), > OffsetFetchResponsePartition(partitionIndex=0, committedOffset=865, > committedLeaderEpoch=-1, metadata='', errorCode=0), > OffsetFetchResponsePartition(partitionIndex=9, committedOffset=868, > committedLeaderEpoch=-1, metadata='', errorCode=0), > OffsetFetchResponsePartition(partitionIndex=14, committedOffset=870, > committedLeaderEpoch=-1, metadata='', errorCode=0), > OffsetFetchResponsePartition(partitionIndex=5, committedOffset=803, > committedLeaderEpoch=-1, metadata='', errorCode=0), > OffsetFetchResponsePartition(partitionIndex=8, committedOffset=881, > committedLeaderEpoch=-1, metadata='', errorCode=0), > OffsetFetchResponsePartition(partitionIndex=11, committedOffset=-1, > committedLeaderEpoch=-1, metadata='', errorCode=0), > OffsetFetchResponsePartition(partitionIndex=4, committedOffset=872, > committedLeaderEpoch=-1, metadata='', errorCode=0), > OffsetFetchResponsePartition(partitionIndex=7, committedOffset=863, > committedLeaderEpoch=-1, metadata='', errorCode=0), > OffsetFetchResponsePartition(partitionIndex=10, committedOffset=835, > 
committedLeaderEpoch=-1, metadata='', errorCode=0), > OffsetFetchResponsePartition(partitionIndex=13, committedOffset=860, > committedLeaderEpoch=-1, metadata='', errorCode=0), > OffsetFetchResponsePartition(partitionIndex=12, committedOffset=885, > committedLeaderEpoch=-1, metadata='', errorCode=0), > OffsetFetchResponsePartition(partitionIndex=3, committedOffset=771, > committedLeaderEpoch=-1, metadata='', errorCode=0), > OffsetFetchResponsePartition(partitionIndex=6, committedOffset=859, > committedLeaderEpoch=-1, metadata='', errorCode=0), > OffsetFetchResponsePartition(partitionIndex=2, committedOffset=820, > committedLeaderEpoch=-1, metadata='', errorCode=0), > OffsetFetchResponsePartition(partitionIndex=15, committedOffset=826, > committedLeaderEpoch=-1, metadata='', errorCode=0)])], errorCode=0, > groups=[]) (org.apache.kafka.clients.admin.KafkaAdminClient) {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)