[ https://issues.apache.org/jira/browse/KAFKA-10602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Konstantine Karantasis resolved KAFKA-10602. -------------------------------------------- Resolution: Fixed > DLQ Reporter throws NPE when reporting from different thread > ------------------------------------------------------------ > > Key: KAFKA-10602 > URL: https://issues.apache.org/jira/browse/KAFKA-10602 > Project: Kafka > Issue Type: Bug > Affects Versions: 2.6.0 > Reporter: Lev Zemlyanov > Assignee: Tom Bentley > Priority: Major > Fix For: 2.7.0, 2.6.1 > > > If a connector uses separate threads to report errant records using the > ErrantRecordReporter from KIP-610, it can sometimes hit this race condition > and throw an NPE when reporting both serialization errors from a converter > and errors from a connector on separate thread because both use the same > [RetryWithToleranceOperator|https://github.com/apache/kafka/blob/24290de82821d16f7d163d086f5cfa88cec2b976/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java#L92-L94] > in the > [reporters|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L635], > which results in the NPE because the error can get reset to `null`. > The `WorkerErrantRecordReporter::report` needs to be made threadsafe > otherwise it can conflict with the DLQ reporter. > {code:java} > java.lang.NullPointerException > at > org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.stacktrace(DeadLetterQueueReporter.java:187) > at > org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.populateContextHeaders(DeadLetterQueueReporter.java:177) > at > org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.report(DeadLetterQueueReporter.java:149) > at > org.apache.kafka.connect.runtime.errors.ProcessingContext.lambda$report$0(ProcessingContext.java:151) > at > java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195) > at > java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1624) > at > java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484) > at > java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474) > at > java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913) > at > java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > at > java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578) > at > org.apache.kafka.connect.runtime.errors.ProcessingContext.report(ProcessingContext.java:153) > at > org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:126) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:498) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:475) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:325) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:229) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198) > at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185) > at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235) > at > java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) > at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) > at > java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130) > at > java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630) > at java.base/java.lang.Thread.run(Thread.java:832) {code} > -- This message was sent by Atlassian Jira (v8.3.4#803005)