[ 
https://issues.apache.org/jira/browse/KAFKA-13985?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jacopo Riciputi updated KAFKA-13985:
------------------------------------
    Description: 
Applying an SMT that filters out messages can lead execution down this path:

From WorkerSourceTask.java:
{code:java}
final SourceRecord record = transformationChain.apply(preTransformRecord);
final ProducerRecord<byte[], byte[]> producerRecord = convertTransformedRecord(record);
if (producerRecord == null || retryWithToleranceOperator.failed()) {
    counter.skipRecord();
    commitTaskRecord(preTransformRecord, null);
    continue;
} {code}
 

Then to:
{code:java}
private void commitTaskRecord(SourceRecord record, RecordMetadata metadata) {
    try {
        task.commitRecord(record, metadata);
    } catch (Throwable t) {
        log.error("{} Exception thrown while calling task.commitRecord()", this, t);
    }
}{code}

Finally, from MirrorSourceTask.java:
{code:java}
    @Override
    public void commitRecord(SourceRecord record, RecordMetadata metadata) {
        try {
            if (stopping) {
                return;
            }
            if (!metadata.hasOffset()) {
                log.error("RecordMetadata has no offset -- can't sync offsets for {}.", record.topic());
                return;
            }

...{code}
 
This causes an NPE because metadata is null.
This is the exception:

{code:java}
[2022-06-13 12:31:33,094] WARN Failure committing record. (org.apache.kafka.connect.mirror.MirrorSourceTask:190)
java.lang.NullPointerException
    at org.apache.kafka.connect.mirror.MirrorSourceTask.commitRecord(MirrorSourceTask.java:177)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.commitTaskRecord(WorkerSourceTask.java:463)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:358)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:257)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
    at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source) {code}

As I understand it, this is handled and has no negative impact, because the 
exception is caught inside MirrorSourceTask.commitRecord and is not propagated 
outside of it.

However, it would probably be preferable to check whether metadata != null, 
so that the commit is skipped safely and silently.
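
The null check suggested above could look roughly like the following. This is only a sketch: StubMetadata and GuardedCommitSketch are hypothetical stand-ins for RecordMetadata and MirrorSourceTask so that it runs without Kafka on the classpath, the offset test in hasOffset() is a simplification, and the real offset-sync logic is reduced to appending to a list.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for org.apache.kafka.clients.producer.RecordMetadata,
// reduced to the two methods this sketch needs.
final class StubMetadata {
    private final long offset;

    StubMetadata(long offset) {
        this.offset = offset;
    }

    // Simplification: any negative offset is treated as "no offset".
    boolean hasOffset() {
        return offset >= 0;
    }

    long offset() {
        return offset;
    }
}

// Hypothetical stand-in for MirrorSourceTask; only the guard is of interest.
final class GuardedCommitSketch {
    final List<Long> syncedOffsets = new ArrayList<>();
    int skipped = 0;

    void commitRecord(String topic, StubMetadata metadata) {
        if (metadata == null) {
            // The record was dropped before being produced (for example by a
            // filtering SMT), so there is no downstream offset to sync.
            skipped++;
            return;
        }
        if (!metadata.hasOffset()) {
            System.err.println("RecordMetadata has no offset -- can't sync offsets for " + topic + ".");
            return;
        }
        // Placeholder for the real offset-sync logic.
        syncedOffsets.add(metadata.offset());
    }

    public static void main(String[] args) {
        GuardedCommitSketch task = new GuardedCommitSketch();
        task.commitRecord("topic-a", null);                  // filtered record
        task.commitRecord("topic-a", new StubMetadata(42L)); // produced record
        System.out.println("skipped=" + task.skipped + " synced=" + task.syncedOffsets);
    }
}
```

With a guard like this, a filtered record returns early instead of reaching metadata.hasOffset(), so no NullPointerException is raised and nothing is logged at WARN level.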

[EDIT]
Actually, digging a bit deeper, there is a small side effect.

If the last message processed was filtered out (and so not committed by 
MirrorSourceTask) and the MM2 instance is restarted, this message will be 
re-read by the consumer because its offset was not committed (and it will 
probably be filtered out again if the configuration has not changed).
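
A toy model of the scenario described above, not MM2 code. It assumes, as stated in this report, that an offset is only committed for records that were actually produced, that offsets are passed in ascending order, and that reading resumes right after the last committed offset. ReplaySketch and rereadAfterRestart are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the restart side effect; it does not use any Kafka API.
final class ReplaySketch {

    // Returns the source offsets that would be read again after a restart.
    // offsets must be ascending; filtered[i] marks records dropped by the SMT.
    static List<Long> rereadAfterRestart(long[] offsets, boolean[] filtered) {
        long lastCommitted = -1L;
        for (int i = 0; i < offsets.length; i++) {
            if (!filtered[i]) {
                // Assumption: only records that were produced get committed.
                lastCommitted = offsets[i];
            }
        }
        List<Long> reread = new ArrayList<>();
        for (long offset : offsets) {
            if (offset > lastCommitted) {
                reread.add(offset);
            }
        }
        return reread;
    }

    public static void main(String[] args) {
        long[] offsets = {10L, 11L, 12L};
        boolean[] filtered = {false, false, true}; // last record is filtered out
        System.out.println("re-read after restart: " + rereadAfterRestart(offsets, filtered));
    }
}
```

In this model only the trailing filtered records are read again, and with an unchanged configuration they would simply be filtered out a second time.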

But this behavior is probably fine considering MM2's nature.

 


> MirrorSourceTask commitRecord throws NPE if SMT is filtering out source record
> ------------------------------------------------------------------------------
>
>                 Key: KAFKA-13985
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13985
>             Project: Kafka
>          Issue Type: Bug
>          Components: mirrormaker
>    Affects Versions: 3.1.0, 3.2.0
>            Reporter: Jacopo Riciputi
>            Priority: Minor



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
