Bert Baron created KAFKA-13632:
----------------------------------
Summary: MirrorMaker 2.0 NPE and Warning "Failure to commit
records" for filtered records
Key: KAFKA-13632
URL: https://issues.apache.org/jira/browse/KAFKA-13632
Project: Kafka
Issue Type: Bug
Components: mirrormaker
Affects Versions: 3.1.0
Reporter: Bert Baron
We have a setup where we filter records with MirrorMaker 2.0 (see below). For
each filtered record, an NPE in MirrorSourceTask.commitRecord results in the
following warning message:
{code:java}
[2022-01-31 08:01:29,581] WARN [MirrorSourceConnector|task-0] Failure committing record. (org.apache.kafka.connect.mirror.MirrorSourceTask:190)
java.lang.NullPointerException
    at org.apache.kafka.connect.mirror.MirrorSourceTask.commitRecord(MirrorSourceTask.java:177)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.commitTaskRecord(WorkerSourceTask.java:463)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:358)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:257)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
{code}
The cause appears to be that metadata is null for filtered records. Note that
the javadoc of the overridden SourceTask.commitRecord clearly states that
metadata will be null if the record was filtered.
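A null guard along the following lines would presumably avoid the NPE. This is only a self-contained sketch to illustrate the idea: the Metadata class, the fields and the String record key are hypothetical stand-ins, not Kafka classes, and this is not the actual MirrorSourceTask code.

```java
import java.util.ArrayList;
import java.util.List;

// Self-contained sketch; none of these types come from Kafka itself.
class CommitRecordSketch {

    // Hypothetical stand-in for the producer metadata passed to commitRecord.
    static final class Metadata {
        final String topic;
        final long offset;

        Metadata(String topic, long offset) {
            this.topic = topic;
            this.offset = offset;
        }
    }

    // Downstream offsets recorded for records that really reached the target.
    final List<Long> committedOffsets = new ArrayList<>();
    // Number of records skipped because a transformation dropped them.
    int filteredCount = 0;

    // Mirrors the shape of SourceTask.commitRecord(record, metadata):
    // metadata is null when the record was filtered out.
    void commitRecord(String recordKey, Metadata metadata) {
        if (metadata == null) {
            // Nothing was written downstream, so there is no offset to track.
            filteredCount++;
            return;
        }
        committedOffsets.add(metadata.offset);
    }

    public static void main(String[] args) {
        CommitRecordSketch task = new CommitRecordSketch();
        task.commitRecord("kept", new Metadata("source.topic1", 42L));
        task.commitRecord("dropped", null); // filtered record: no NPE
        System.out.println("committed=" + task.committedOffsets
                + " filtered=" + task.filteredCount);
    }
}
```

With such a check in place, a filtered record would simply be skipped instead of triggering the warning above.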
In our case we use a custom predicate, but the issue can be reproduced with the
following configuration:
{code:java}
clusters = source,target
tasks.max = 1
source.bootstrap.servers = <cluster1>
target.bootstrap.servers = <cluster2>
offset.storage.replication.factor=1
status.storage.replication.factor=1
config.storage.replication.factor=1
source->target.enabled = true
source->target.topics = topic1
source->target.transforms=Filter
source->target.transforms.Filter.type=org.apache.kafka.connect.transforms.Filter
source->target.transforms.Filter.predicate=HeaderPredicate
source->target.predicates=HeaderPredicate
source->target.predicates.HeaderPredicate.type=org.apache.kafka.connect.transforms.predicates.HasHeaderKey
source->target.predicates.HeaderPredicate.name=someheader
{code}
Each record with the header key 'someheader' will result in the NPE and warning
message.
On a side note, we couldn't find clear documentation on how to configure
filtering with MirrorMaker 2 or whether this is supported at all, but apart
from the NPEs and warning messages this seems to work functionally for us with
our custom filter.