[
https://issues.apache.org/jira/browse/KAFKA-7434?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Ewen Cheslack-Postava resolved KAFKA-7434.
------------------------------------------
Resolution: Fixed
Fix Version/s: 2.1.0
2.0.1
Issue resolved by pull request 5700
[https://github.com/apache/kafka/pull/5700]
> DeadLetterQueueReporter throws NPE if transform throws NPE
> ----------------------------------------------------------
>
> Key: KAFKA-7434
> URL: https://issues.apache.org/jira/browse/KAFKA-7434
> Project: Kafka
> Issue Type: Bug
> Components: KafkaConnect
> Affects Versions: 2.0.0
> Environment: jdk 8
> Reporter: Michal Borowiecki
> Assignee: Michal Borowiecki
> Priority: Major
> Fix For: 2.0.1, 2.1.0
>
>
> A NPE thrown from a transform in a connector configured with
> errors.deadletterqueue.context.headers.enable=true
> causes DeadLetterQueueReporter to break with a NPE.
> {code}
> Executing stage 'TRANSFORMATION' with class
> 'org.apache.kafka.connect.transforms.Flatten$Value', where consumed record is
> {topic='****', partition=1, offset=0, timestamp=1537370573366,
> timestampType=CreateTime}.
> (org.apache.kafka.connect.runtime.errors.LogReporter)
> java.lang.NullPointerException
> Task threw an uncaught and unrecoverable exception
> (org.apache.kafka.connect.runtime.WorkerTask)
> java.lang.NullPointerException
> at
> org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.toBytes(DeadLetterQueueReporter.java:202)
> at
> org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.populateContextHeaders(DeadLetterQueueReporter.java:172)
> at
> org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.report(DeadLetterQueueReporter.java:146)
> at
> org.apache.kafka.connect.runtime.errors.ProcessingContext.report(ProcessingContext.java:137)
> at
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:108)
> at
> org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:44)
> at
> org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:532)
> at
> org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:490)
> at
> org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
> at
> org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
> at
> org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
> at
> org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
> at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
> at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> {code}
>
> This is caused by populateContextHeaders only checking if the Throwable is
> not null, but not checking that the message in the Throwable is not null
> before trying to serialize the message:
> [https://github.com/apache/kafka/blob/cfd33b313c9856ae2b4b45ed3d4aac41d6ef5a6b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java#L170-L177]
> {code:java}
> if (context.error() != null) {
> headers.add(ERROR_HEADER_EXCEPTION,
> toBytes(context.error().getClass().getName()));
> headers.add(ERROR_HEADER_EXCEPTION_MESSAGE,
> toBytes(context.error().getMessage()));
> {code}
> toBytes throws an NPE if passed null as the parameter.
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)