[ https://issues.apache.org/jira/browse/KAFKA-10602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Lev Zemlyanov updated KAFKA-10602:
----------------------------------
Description:
The Elasticsearch connector uses separate threads to report errant records
using the ErrantRecordReporter from KIP-610. It sometimes hits a race
condition and throws an NPE when reporting both serialization errors from a
converter and errors from a connector on separate threads, because both use the
same
[RetryWithToleranceOperator|https://github.com/apache/kafka/blob/24290de82821d16f7d163d086f5cfa88cec2b976/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java#L92-L94]
in the
[reporters|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L635],
which results in the NPE because the error can get reset to `null`.
The `WorkerErrantRecordReporter::report` method needs to be made thread-safe;
otherwise it can conflict with the DLQ reporter.
{code:java}
java.lang.NullPointerException
    at org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.stacktrace(DeadLetterQueueReporter.java:187)
    at org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.populateContextHeaders(DeadLetterQueueReporter.java:177)
    at org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.report(DeadLetterQueueReporter.java:149)
    at org.apache.kafka.connect.runtime.errors.ProcessingContext.lambda$report$0(ProcessingContext.java:151)
    at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
    at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1624)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
    at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
    at org.apache.kafka.connect.runtime.errors.ProcessingContext.report(ProcessingContext.java:153)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:126)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:498)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:475)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:325)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:229)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
    at java.base/java.lang.Thread.run(Thread.java:832)
{code}
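To illustrate the kind of race described above, here is a minimal sketch of one way to guard a shared, mutable error field. It is not the Kafka Connect code and not necessarily the fix applied for this issue: SharedErrorContext and RaceDemo are hypothetical names invented for the example. The assumption is simply that one lock is held across setting the error, reading it for the report, and resetting it, so that no other thread can null the field in between.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in for state shared between reporters; not a Kafka class.
class SharedErrorContext {
    private Throwable error;   // shared, mutable field that gets reset to null
    private int reported = 0;

    // The lock covers set, read, and reset, so another thread cannot
    // clear `error` between the write and the read.
    public synchronized String report(Throwable t) {
        this.error = t;
        try {
            // Without synchronization, `error` could already be null here.
            String header = error.getClass().getName() + ": " + error.getMessage();
            reported++;
            return header;
        } finally {
            this.error = null; // reset for the next record
        }
    }

    public synchronized int reportedCount() {
        return reported;
    }
}

// Hypothetical driver: several threads report through one shared context.
class RaceDemo {
    static int run(int threads, int perThread) {
        SharedErrorContext ctx = new SharedErrorContext();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (int i = 0; i < threads; i++) {
                futures.add(pool.submit(() -> {
                    for (int j = 0; j < perThread; j++) {
                        ctx.report(new RuntimeException("bad record " + j));
                    }
                }));
            }
            for (Future<?> f : futures) {
                try {
                    f.get(); // surfaces any exception (e.g. an NPE) from a worker
                } catch (InterruptedException | ExecutionException e) {
                    throw new IllegalStateException(e);
                }
            }
            return ctx.reportedCount();
        } finally {
            pool.shutdown();
        }
    }
}
```

Calling RaceDemo.run(4, 1000) should complete without an exception and count every report. Removing the synchronized keyword from report would reintroduce the window in which one thread's reset can race with another thread's read.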
was:
The Elasticsearch connector uses separate threads to report errant records
using the ErrantRecordReporter from KIP-610. It sometimes hit this race
condition and throws an NPE when reporting both serialization errors from a
converter and errors from a connector on separate thread because both use the
same
[RetryWithToleranceOperator|https://github.com/apache/kafka/blob/24290de82821d16f7d163d086f5cfa88cec2b976/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java#L92-L94]
in the
[reporters|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L635],
which results in the NPE because the error can get reset to `null`.
The `WorkerErrantRecordReporter::report` needs to be made threadsafe otherwise
it can conflict with the DLQ reporter.
java.lang.NullPointerException
    at org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.stacktrace(DeadLetterQueueReporter.java:187)
    at org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.populateContextHeaders(DeadLetterQueueReporter.java:177)
    at org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.report(DeadLetterQueueReporter.java:149)
    at org.apache.kafka.connect.runtime.errors.ProcessingContext.lambda$report$0(ProcessingContext.java:151)
    at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
    at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1624)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
    at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
    at org.apache.kafka.connect.runtime.errors.ProcessingContext.report(ProcessingContext.java:153)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:126)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:498)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:475)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:325)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:229)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
    at java.base/java.lang.Thread.run(Thread.java:832)
> DLQ Reporter throws NPE when reporting from different thread
> ------------------------------------------------------------
>
> Key: KAFKA-10602
> URL: https://issues.apache.org/jira/browse/KAFKA-10602
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 2.6.0
> Reporter: Lev Zemlyanov
> Priority: Major
>
> The Elasticsearch connector uses separate threads to report errant records
> using the ErrantRecordReporter from KIP-610. It sometimes hits a race
> condition and throws an NPE when reporting both serialization errors from a
> converter and errors from a connector on separate threads, because both use the
> same
> [RetryWithToleranceOperator|https://github.com/apache/kafka/blob/24290de82821d16f7d163d086f5cfa88cec2b976/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java#L92-L94]
> in the
> [reporters|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L635],
> which results in the NPE because the error can get reset to `null`.
> The `WorkerErrantRecordReporter::report` method needs to be made thread-safe;
> otherwise it can conflict with the DLQ reporter.
> {code:java}
> java.lang.NullPointerException
>     at org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.stacktrace(DeadLetterQueueReporter.java:187)
>     at org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.populateContextHeaders(DeadLetterQueueReporter.java:177)
>     at org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.report(DeadLetterQueueReporter.java:149)
>     at org.apache.kafka.connect.runtime.errors.ProcessingContext.lambda$report$0(ProcessingContext.java:151)
>     at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
>     at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1624)
>     at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
>     at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
>     at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
>     at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>     at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
>     at org.apache.kafka.connect.runtime.errors.ProcessingContext.report(ProcessingContext.java:153)
>     at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:126)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:498)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:475)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:325)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:229)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198)
>     at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
>     at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
>     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
>     at java.base/java.lang.Thread.run(Thread.java:832)
> {code}
>