Jacopo created KAFKA-13985:
------------------------------

             Summary: MirrorSourceTask commitRecord throws NPE if SMT is 
filtering out source record
                 Key: KAFKA-13985
                 URL: https://issues.apache.org/jira/browse/KAFKA-13985
             Project: Kafka
          Issue Type: Bug
          Components: mirrormaker
    Affects Versions: 3.2.0, 3.1.0
            Reporter: Jacopo


Applying a SMT that filter message it can laverage to enter in this if:

 

>From WorkerSourceTask.java

 
{code:java}
final SourceRecord record = transformationChain.apply(preTransformRecord);
final ProducerRecord<byte[], byte[]> producerRecord = 
convertTransformedRecord(record);
if (producerRecord == null || retryWithToleranceOperator.failed()) {
    counter.skipRecord();
    commitTaskRecord(preTransformRecord, null);
    continue;
} {code}
 

That bring to:

>From MirrorSourceTask.java

 
{code:java}
    @Override
    public void commitRecord(SourceRecord record, RecordMetadata metadata) {
        try {
            if (stopping) {
                return;
            }
            if (!metadata.hasOffset()) {
                log.error("RecordMetadata has no offset -- can't sync offsets 
for {}.", record.topic());
                return;
            }

...{code}
 

Causing a NPE because metadata is null, this the exception.
{code:java}
java.lang.NullPointerException
    at 
org.apache.kafka.connect.mirror.MirrorSourceTask.commitRecord(MirrorSourceTask.java:177)
    at 
org.apache.kafka.connect.runtime.WorkerSourceTask.commitTaskRecord(WorkerSourceTask.java:463)
    at 
org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:358)
    at 
org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:257)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown 
Source)
    at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown 
Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown 
Source)
    at java.base/java.lang.Thread.run(Unknown Source) {code}
In my understanding this is well handled and it does not have negative impacts 
because it's handled by MirrorSourceTask.commitRecord, without leaving the 
exception be forwarded outside of it. 

 

But probably is preferred handle it checking if metadata != null.

So skipping commit but safely and silently

 

 



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

Reply via email to