[ https://issues.apache.org/jira/browse/KAFKA-7157?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16589428#comment-16589428 ]
Ian Axelrod commented on KAFKA-7157:
------------------------------------

FWIW, I have encountered this as well, and it is blocking production work. The fix is simple, but I wonder whether there is an opportunity here to introduce the idea of conditional transformations. I can think of myriad cases where you would want to execute a transformer only when a record or record field has a certain value (or matches a regex, ...). One could fix the NPE by wrapping TimestampConverter in a conditional transformer executor class, with the condition being that the field value is not null.

> Connect TimestampConverter SMT doesn't handle null values
> ---------------------------------------------------------
>
>                 Key: KAFKA-7157
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7157
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 0.10.2.0
>            Reporter: Randall Hauch
>            Priority: Major
>
> TimestampConverter SMT is not able to handle null values (in any versions),
> so it's always trying to apply the transformation to the value. Instead, it
> needs to check for null and use the default value for the new schema's field.
> {noformat}
> [2018-07-03 02:31:52,490] ERROR Task MySourceConnector-2 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
> java.lang.NullPointerException
>     at org.apache.kafka.connect.transforms.TimestampConverter$2.toRaw(TimestampConverter.java:137)
>     at org.apache.kafka.connect.transforms.TimestampConverter.convertTimestamp(TimestampConverter.java:440)
>     at org.apache.kafka.connect.transforms.TimestampConverter.applyValueWithSchema(TimestampConverter.java:368)
>     at org.apache.kafka.connect.transforms.TimestampConverter.applyWithSchema(TimestampConverter.java:358)
>     at org.apache.kafka.connect.transforms.TimestampConverter.apply(TimestampConverter.java:275)
>     at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:38)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:435)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:264)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:182)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:150)
>     at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
>     at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> [2018-07-03 02:31:52,491] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
> {noformat}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
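
For illustration, the conditional-transformation idea from the comment could look roughly like the sketch below. This is not Kafka Connect code: `ConditionalTransform`, `ConditionalTransformDemo`, and `millisToSeconds` are hypothetical names, and a record is modelled as a plain `Map` so the example runs without Kafka on the classpath. The stand-in converter fails on a null field in the same way the stack trace shows, and the wrapper skips it when the condition is not met.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;

/**
 * Hypothetical wrapper (not part of Kafka Connect): applies the delegate
 * transformation only when the condition holds, otherwise returns the
 * record unchanged.
 */
final class ConditionalTransform<R> implements UnaryOperator<R> {
    private final Predicate<R> condition;
    private final UnaryOperator<R> delegate;

    ConditionalTransform(Predicate<R> condition, UnaryOperator<R> delegate) {
        this.condition = condition;
        this.delegate = delegate;
    }

    @Override
    public R apply(R record) {
        // Pass the record through untouched when the condition is false.
        return condition.test(record) ? delegate.apply(record) : record;
    }
}

final class ConditionalTransformDemo {
    /**
     * Stand-in for a converter that does not tolerate null: unboxing a null
     * "ts" value throws NullPointerException.
     */
    static Map<String, Object> millisToSeconds(Map<String, Object> record) {
        Map<String, Object> out = new HashMap<>(record);
        long millis = (Long) record.get("ts"); // NPE if the field is null
        out.put("ts", millis / 1000L);
        return out;
    }

    public static void main(String[] args) {
        // Condition assumed for this sketch: the "ts" field is not null.
        ConditionalTransform<Map<String, Object>> guarded = new ConditionalTransform<>(
                r -> r.get("ts") != null,
                ConditionalTransformDemo::millisToSeconds);

        Map<String, Object> withNull = new HashMap<>();
        withNull.put("ts", null);

        Map<String, Object> withValue = new HashMap<>();
        withValue.put("ts", 1530585112490L);

        System.out.println(guarded.apply(withNull));  // converter is skipped
        System.out.println(guarded.apply(withValue)); // converter is applied
    }
}
```

The same wrapper shape would accept other conditions (a specific field value, a regex match) without changing the wrapped transformation, which is the appeal of the approach over patching each transformation separately.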