[ 
https://issues.apache.org/jira/browse/KAFKA-14545?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Solidum updated KAFKA-14545:
----------------------------------
    Description: 
MirrorCheckpointTask appears to throw a NullPointerException when a 
consumer group hasn't consumed from all partitions of a topic. This blocks 
the syncing of consumer group offsets to the target cluster. The stack trace and 
error message are as follows:
{code:java}
WARN Failure polling consumer state for checkpoints. (org.apache.kafka.connect.mirror.MirrorCheckpointTask)
at java.base/java.lang.Thread.run(Thread.java:829)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:72)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:346)
at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.poll(AbstractWorkerSourceTask.java:452)
at org.apache.kafka.connect.mirror.MirrorCheckpointTask.poll(MirrorCheckpointTask.java:142)
at org.apache.kafka.connect.mirror.MirrorCheckpointTask.sourceRecordsForGroup(MirrorCheckpointTask.java:160)
at org.apache.kafka.connect.mirror.MirrorCheckpointTask.checkpointsForGroup(MirrorCheckpointTask.java:177)
at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
at java.base/java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1764)
at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177)
at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
at org.apache.kafka.connect.mirror.MirrorCheckpointTask.lambda$checkpointsForGroup$2(MirrorCheckpointTask.java:174)
at org.apache.kafka.connect.mirror.MirrorCheckpointTask.checkpoint(MirrorCheckpointTask.java:191)
java.lang.NullPointerException
 {code}
This seems to happen if the OffsetFetch call returns an 
OffsetFetchResponsePartition with a negative committedOffset (see 
partitionIndex=11 with committedOffset=-1 in the response below). 
MirrorMaker should handle this case more gracefully and still sync 
consumer offsets for the partitions whose committed offsets are non-negative.
{code:java}
TRACE [AdminClient clientId=adminclient-55] 
Call(callName=offsetFetch(api=OFFSET_FETCH), deadlineMs=1671657869539, tries=0, 
nextAllowedTryMs=0) got response OffsetFetchResponseData(throttleTimeMs=0, 
topics=[OffsetFetchResponseTopic(name='XXX', 
partitions=[OffsetFetchResponsePartition(partitionIndex=1, committedOffset=866, 
committedLeaderEpoch=-1, metadata='', errorCode=0), 
OffsetFetchResponsePartition(partitionIndex=0, committedOffset=865, 
committedLeaderEpoch=-1, metadata='', errorCode=0), 
OffsetFetchResponsePartition(partitionIndex=9, committedOffset=868, 
committedLeaderEpoch=-1, metadata='', errorCode=0), 
OffsetFetchResponsePartition(partitionIndex=14, committedOffset=870, 
committedLeaderEpoch=-1, metadata='', errorCode=0), 
OffsetFetchResponsePartition(partitionIndex=5, committedOffset=803, 
committedLeaderEpoch=-1, metadata='', errorCode=0), 
OffsetFetchResponsePartition(partitionIndex=8, committedOffset=881, 
committedLeaderEpoch=-1, metadata='', errorCode=0), 
OffsetFetchResponsePartition(partitionIndex=11, committedOffset=-1, 
committedLeaderEpoch=-1, metadata='', errorCode=0), 
OffsetFetchResponsePartition(partitionIndex=4, committedOffset=872, 
committedLeaderEpoch=-1, metadata='', errorCode=0), 
OffsetFetchResponsePartition(partitionIndex=7, committedOffset=863, 
committedLeaderEpoch=-1, metadata='', errorCode=0), 
OffsetFetchResponsePartition(partitionIndex=10, committedOffset=835, 
committedLeaderEpoch=-1, metadata='', errorCode=0), 
OffsetFetchResponsePartition(partitionIndex=13, committedOffset=860, 
committedLeaderEpoch=-1, metadata='', errorCode=0), 
OffsetFetchResponsePartition(partitionIndex=12, committedOffset=885, 
committedLeaderEpoch=-1, metadata='', errorCode=0), 
OffsetFetchResponsePartition(partitionIndex=3, committedOffset=771, 
committedLeaderEpoch=-1, metadata='', errorCode=0), 
OffsetFetchResponsePartition(partitionIndex=6, committedOffset=859, 
committedLeaderEpoch=-1, metadata='', errorCode=0), 
OffsetFetchResponsePartition(partitionIndex=2, committedOffset=820, 
committedLeaderEpoch=-1, metadata='', errorCode=0), 
OffsetFetchResponsePartition(partitionIndex=15, committedOffset=826, 
committedLeaderEpoch=-1, metadata='', errorCode=0)])], errorCode=0, groups=[]) 
(org.apache.kafka.clients.admin.KafkaAdminClient) {code}
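
A minimal sketch of the more graceful handling suggested above, under the assumption that a partition with committedOffset=-1 reaches the checkpoint logic as a null or negative value in the fetched offsets map. The class CheckpointFilterSketch, the method syncable, and the "topic-partition" string keys are hypothetical stand-ins for illustration; this is not the MirrorCheckpointTask code, and it does not use any Kafka classes.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class CheckpointFilterSketch {

    /**
     * Keeps only the partitions that have a usable committed offset.
     * Keys are hypothetical "topic-partition" strings; a null or negative
     * value stands for "this group has no committed offset here".
     */
    static Map<String, Long> syncable(Map<String, Long> fetched) {
        return fetched.entrySet().stream()
                // Drop entries that would otherwise be dereferenced and fail.
                .filter(e -> e.getValue() != null && e.getValue() >= 0)
                // LinkedHashMap preserves the input order of the survivors.
                .collect(Collectors.toMap(
                        Map.Entry::getKey,
                        Map.Entry::getValue,
                        (a, b) -> a,
                        LinkedHashMap::new));
    }

    public static void main(String[] args) {
        // Values taken from the OffsetFetch response above for partitions 10-12.
        Map<String, Long> fetched = new LinkedHashMap<>();
        fetched.put("XXX-10", 835L);
        fetched.put("XXX-11", null); // committedOffset=-1 in the response
        fetched.put("XXX-12", 885L);

        System.out.println(syncable(fetched).keySet()); // prints [XXX-10, XXX-12]
    }
}
```

With a filter like this ahead of the per-partition checkpoint step, the group's offsets for partitions 10 and 12 would still be synced while partition 11 is skipped until it has a committed offset.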

  was:
MirrorCheckpointTask appears to throw a NullPointerException when a 
consumer group hasn't consumed from all partitions of a topic. This blocks 
the syncing of consumer group offsets to the target cluster. The stack trace and 
error message are as follows:
{code:java}
WARN Failure polling consumer state for checkpoints. (org.apache.kafka.connect.mirror.MirrorCheckpointTask)
at java.base/java.lang.Thread.run(Thread.java:829)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:72)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:346)
at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.poll(AbstractWorkerSourceTask.java:452)
at org.apache.kafka.connect.mirror.MirrorCheckpointTask.poll(MirrorCheckpointTask.java:142)
at org.apache.kafka.connect.mirror.MirrorCheckpointTask.sourceRecordsForGroup(MirrorCheckpointTask.java:160)
at org.apache.kafka.connect.mirror.MirrorCheckpointTask.checkpointsForGroup(MirrorCheckpointTask.java:177)
at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
at java.base/java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1764)
at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177)
at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
at org.apache.kafka.connect.mirror.MirrorCheckpointTask.lambda$checkpointsForGroup$2(MirrorCheckpointTask.java:174)
at org.apache.kafka.connect.mirror.MirrorCheckpointTask.checkpoint(MirrorCheckpointTask.java:191)
java.lang.NullPointerException
 {code}


> MirrorCheckpointTask throws NullPointerException when group hasn't consumed 
> from some partitions
> ------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-14545
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14545
>             Project: Kafka
>          Issue Type: Bug
>          Components: mirrormaker
>    Affects Versions: 3.3.0
>            Reporter: Chris Solidum
>            Assignee: Chris Solidum
>            Priority: Major
>
> MirrorCheckpointTask appears to throw a NullPointerException when a 
> consumer group hasn't consumed from all partitions of a topic. This blocks 
> the syncing of consumer group offsets to the target cluster. The stack trace 
> and error message are as follows:
> {code:java}
> WARN Failure polling consumer state for checkpoints. (org.apache.kafka.connect.mirror.MirrorCheckpointTask)
> at java.base/java.lang.Thread.run(Thread.java:829)
> at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
> at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
> at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:72)
> at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)
> at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
> at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:346)
> at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.poll(AbstractWorkerSourceTask.java:452)
> at org.apache.kafka.connect.mirror.MirrorCheckpointTask.poll(MirrorCheckpointTask.java:142)
> at org.apache.kafka.connect.mirror.MirrorCheckpointTask.sourceRecordsForGroup(MirrorCheckpointTask.java:160)
> at org.apache.kafka.connect.mirror.MirrorCheckpointTask.checkpointsForGroup(MirrorCheckpointTask.java:177)
> at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
> at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
> at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
> at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
> at java.base/java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1764)
> at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177)
> at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
> at org.apache.kafka.connect.mirror.MirrorCheckpointTask.lambda$checkpointsForGroup$2(MirrorCheckpointTask.java:174)
> at org.apache.kafka.connect.mirror.MirrorCheckpointTask.checkpoint(MirrorCheckpointTask.java:191)
> java.lang.NullPointerException
>  {code}
> This seems to happen if the OffsetFetch call returns an 
> OffsetFetchResponsePartition with a negative committedOffset (see 
> partitionIndex=11 with committedOffset=-1 in the response below). 
> MirrorMaker should handle this case more gracefully and still sync 
> consumer offsets for the partitions whose committed offsets are non-negative.
> {code:java}
> TRACE [AdminClient clientId=adminclient-55] 
> Call(callName=offsetFetch(api=OFFSET_FETCH), deadlineMs=1671657869539, 
> tries=0, nextAllowedTryMs=0) got response 
> OffsetFetchResponseData(throttleTimeMs=0, 
> topics=[OffsetFetchResponseTopic(name='XXX', 
> partitions=[OffsetFetchResponsePartition(partitionIndex=1, 
> committedOffset=866, committedLeaderEpoch=-1, metadata='', errorCode=0), 
> OffsetFetchResponsePartition(partitionIndex=0, committedOffset=865, 
> committedLeaderEpoch=-1, metadata='', errorCode=0), 
> OffsetFetchResponsePartition(partitionIndex=9, committedOffset=868, 
> committedLeaderEpoch=-1, metadata='', errorCode=0), 
> OffsetFetchResponsePartition(partitionIndex=14, committedOffset=870, 
> committedLeaderEpoch=-1, metadata='', errorCode=0), 
> OffsetFetchResponsePartition(partitionIndex=5, committedOffset=803, 
> committedLeaderEpoch=-1, metadata='', errorCode=0), 
> OffsetFetchResponsePartition(partitionIndex=8, committedOffset=881, 
> committedLeaderEpoch=-1, metadata='', errorCode=0), 
> OffsetFetchResponsePartition(partitionIndex=11, committedOffset=-1, 
> committedLeaderEpoch=-1, metadata='', errorCode=0), 
> OffsetFetchResponsePartition(partitionIndex=4, committedOffset=872, 
> committedLeaderEpoch=-1, metadata='', errorCode=0), 
> OffsetFetchResponsePartition(partitionIndex=7, committedOffset=863, 
> committedLeaderEpoch=-1, metadata='', errorCode=0), 
> OffsetFetchResponsePartition(partitionIndex=10, committedOffset=835, 
> committedLeaderEpoch=-1, metadata='', errorCode=0), 
> OffsetFetchResponsePartition(partitionIndex=13, committedOffset=860, 
> committedLeaderEpoch=-1, metadata='', errorCode=0), 
> OffsetFetchResponsePartition(partitionIndex=12, committedOffset=885, 
> committedLeaderEpoch=-1, metadata='', errorCode=0), 
> OffsetFetchResponsePartition(partitionIndex=3, committedOffset=771, 
> committedLeaderEpoch=-1, metadata='', errorCode=0), 
> OffsetFetchResponsePartition(partitionIndex=6, committedOffset=859, 
> committedLeaderEpoch=-1, metadata='', errorCode=0), 
> OffsetFetchResponsePartition(partitionIndex=2, committedOffset=820, 
> committedLeaderEpoch=-1, metadata='', errorCode=0), 
> OffsetFetchResponsePartition(partitionIndex=15, committedOffset=826, 
> committedLeaderEpoch=-1, metadata='', errorCode=0)])], errorCode=0, 
> groups=[]) (org.apache.kafka.clients.admin.KafkaAdminClient) {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
