[ https://issues.apache.org/jira/browse/KAFKA-13985?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jacopo updated KAFKA-13985: --------------------------- Description: Applying a SMT that filter message it can laverage to enter in this if: >From WorkerSourceTask.java {code:java} final SourceRecord record = transformationChain.apply(preTransformRecord); final ProducerRecord<byte[], byte[]> producerRecord = convertTransformedRecord(record); if (producerRecord == null || retryWithToleranceOperator.failed()) { counter.skipRecord(); commitTaskRecord(preTransformRecord, null); continue; } {code} That bring to: {color:#cf222e}private{color}{color:#24292f} {color}{color:#cf222e}void{color}{color:#24292f} {color}{color:#8250df}commitTaskRecord{color}{color:#24292f}(SourceRecord {color}{color:#953800}record{color}{color:#24292f}, RecordMetadata {color}{color:#953800}metadata{color}{color:#24292f}) {{color} {color:#24292f} {color}{color:#cf222e}try{color}{color:#24292f} {{color} {color:#24292f} task.{color}{color:#8250df}commitRecord{color}{color:#24292f}(record, metadata);{color} {color:#24292f} } {color}{color:#cf222e}catch{color}{color:#24292f} (Throwable {color}{color:#953800}t{color}{color:#24292f}) {{color} {color:#24292f} log.{color}{color:#8250df}error{color}{color:#24292f}({color}{color:#0a3069}"{} Exception thrown while calling task.commitRecord()"{color}{color:#24292f}, {color}{color:#0550ae}this{color}{color:#24292f}, t);{color} {color:#24292f} }{color} {color:#24292f}}{color} {color:#24292f}And then{color} >From MirrorSourceTask.java {code:java} @Override public void commitRecord(SourceRecord record, RecordMetadata metadata) { try { if (stopping) { return; } if (!metadata.hasOffset()) { log.error("RecordMetadata has no offset -- can't sync offsets for {}.", record.topic()); return; } ...{code} Causing a NPE because metadata is null, this the exception. {code:java} [2022-06-13 12:31:33,094] WARN Failure committing record. (org.apache.kafka.connect.mirror.MirrorSourceTask:190) java.lang.NullPointerException at org.apache.kafka.connect.mirror.MirrorSourceTask.commitRecord(MirrorSourceTask.java:177) at org.apache.kafka.connect.runtime.WorkerSourceTask.commitTaskRecord(WorkerSourceTask.java:463) at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:358) at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:257) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) at java.base/java.util.concurrent.FutureTask.run(Unknown Source) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) at java.base/java.lang.Thread.run(Unknown Source) {code} In my understanding this is well handled and it does not have negative impacts because it's handled by MirrorSourceTask.commitRecord, without leaving the exception be forwarded outside of it. But probably is preferred handle it checking if metadata != null. So skipping commit but safely and silently was: Applying a SMT that filter message it can laverage to enter in this if: >From WorkerSourceTask.java {code:java} final SourceRecord record = transformationChain.apply(preTransformRecord); final ProducerRecord<byte[], byte[]> producerRecord = convertTransformedRecord(record); if (producerRecord == null || retryWithToleranceOperator.failed()) { counter.skipRecord(); commitTaskRecord(preTransformRecord, null); continue; } {code} That bring to: >From MirrorSourceTask.java {code:java} @Override public void commitRecord(SourceRecord record, RecordMetadata metadata) { try { if (stopping) { return; } if (!metadata.hasOffset()) { log.error("RecordMetadata has no offset -- can't sync offsets for {}.", record.topic()); return; } ...{code} Causing a NPE because metadata is null, this the exception. {code:java} [2022-06-13 12:31:33,094] WARN Failure committing record. (org.apache.kafka.connect.mirror.MirrorSourceTask:190) java.lang.NullPointerException at org.apache.kafka.connect.mirror.MirrorSourceTask.commitRecord(MirrorSourceTask.java:177) at org.apache.kafka.connect.runtime.WorkerSourceTask.commitTaskRecord(WorkerSourceTask.java:463) at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:358) at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:257) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) at java.base/java.util.concurrent.FutureTask.run(Unknown Source) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) at java.base/java.lang.Thread.run(Unknown Source) {code} In my understanding this is well handled and it does not have negative impacts because it's handled by MirrorSourceTask.commitRecord, without leaving the exception be forwarded outside of it. But probably is preferred handle it checking if metadata != null. So skipping commit but safely and silently > MirrorSourceTask commitRecord throws NPE if SMT is filtering out source record > ------------------------------------------------------------------------------ > > Key: KAFKA-13985 > URL: https://issues.apache.org/jira/browse/KAFKA-13985 > Project: Kafka > Issue Type: Bug > Components: mirrormaker > Affects Versions: 3.1.0, 3.2.0 > Reporter: Jacopo > Priority: Minor > > Applying a SMT that filter message it can laverage to enter in this if: > > From WorkerSourceTask.java > > {code:java} > final SourceRecord record = transformationChain.apply(preTransformRecord); > final ProducerRecord<byte[], byte[]> producerRecord = > convertTransformedRecord(record); > if (producerRecord == null || retryWithToleranceOperator.failed()) { > counter.skipRecord(); > commitTaskRecord(preTransformRecord, null); > continue; > } {code} > > That bring to: > {color:#cf222e}private{color}{color:#24292f} > {color}{color:#cf222e}void{color}{color:#24292f} > {color}{color:#8250df}commitTaskRecord{color}{color:#24292f}(SourceRecord > {color}{color:#953800}record{color}{color:#24292f}, RecordMetadata > {color}{color:#953800}metadata{color}{color:#24292f}) {{color} > {color:#24292f} {color}{color:#cf222e}try{color}{color:#24292f} > {{color} > {color:#24292f} > task.{color}{color:#8250df}commitRecord{color}{color:#24292f}(record, > metadata);{color} > {color:#24292f} } {color}{color:#cf222e}catch{color}{color:#24292f} > (Throwable {color}{color:#953800}t{color}{color:#24292f}) {{color} > {color:#24292f} > log.{color}{color:#8250df}error{color}{color:#24292f}({color}{color:#0a3069}"{} > Exception thrown while calling task.commitRecord()"{color}{color:#24292f}, > {color}{color:#0550ae}this{color}{color:#24292f}, t);{color} > {color:#24292f} }{color} > {color:#24292f}}{color} > {color:#24292f}And then{color} > From MirrorSourceTask.java > {code:java} > @Override > public void commitRecord(SourceRecord record, RecordMetadata metadata) { > try { > if (stopping) { > return; > } > if (!metadata.hasOffset()) { > log.error("RecordMetadata has no offset -- can't sync offsets > for {}.", record.topic()); > return; > } > ...{code} > > Causing a NPE because metadata is null, this the exception. > {code:java} > [2022-06-13 12:31:33,094] WARN Failure committing record. > (org.apache.kafka.connect.mirror.MirrorSourceTask:190) > java.lang.NullPointerException > at > org.apache.kafka.connect.mirror.MirrorSourceTask.commitRecord(MirrorSourceTask.java:177) > at > org.apache.kafka.connect.runtime.WorkerSourceTask.commitTaskRecord(WorkerSourceTask.java:463) > at > org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:358) > at > org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:257) > at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188) > at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243) > at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown > Source) > at java.base/java.util.concurrent.FutureTask.run(Unknown Source) > at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown > Source) > at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown > Source) > at java.base/java.lang.Thread.run(Unknown Source) {code} > In my understanding this is well handled and it does not have negative > impacts because it's handled by MirrorSourceTask.commitRecord, without > leaving the exception be forwarded outside of it. > > But probably is preferred handle it checking if metadata != null. > So skipping commit but safely and silently > > -- This message was sent by Atlassian Jira (v8.20.7#820007)