Srivignesh created KAFKA-16398:
----------------------------------

             Summary: mirror-maker2 running into OOM while filtering high 
number of messages
                 Key: KAFKA-16398
                 URL: https://issues.apache.org/jira/browse/KAFKA-16398
             Project: Kafka
          Issue Type: Bug
          Components: connect, mirrormaker
    Affects Versions: 3.6.1
            Reporter: Srivignesh


Based on custom predicate, our application is filtering messages during 
mirroring. 

When the HasHeader:test method of the predicate returns false (when it has to 
drop messages from mirroring), it encounters below exceptions. 

However when it returns true (the messages are forwarded for mirroring), it 
works fine without OOM. 

Note: This issue doesn't occur in version 2.8.0.

Exception stacktraces:
{code:java}
line java.lang.OutOfMemoryError: Java heap space
line     at org.apache.kafka.common.utils.Utils.toArray(Utils.java:289)
line     at org.apache.kafka.common.utils.Utils.toArray(Utils.java:252)
line     at org.apache.kafka.common.utils.Utils.toNullableArray(Utils.java:270)
line     at 
org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:73)
line     at 
org.apache.kafka.clients.consumer.internals.CompletedFetch.parseRecord(CompletedFetch.java:300)
line     at 
org.apache.kafka.clients.consumer.internals.CompletedFetch.fetchRecords(CompletedFetch.java:263)
line     at 
org.apache.kafka.clients.consumer.internals.AbstractFetch.fetchRecords(AbstractFetch.java:340)
line     at 
org.apache.kafka.clients.consumer.internals.AbstractFetch.collectFetch(AbstractFetch.java:306)
line     at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1262)
line     at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1186)
line     at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1159)
line     at 
org.apache.kafka.connect.mirror.MirrorSourceTask.poll(MirrorSourceTask.java:153)
line     at 
org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.poll(AbstractWorkerSourceTask.java:469)
line     at 
org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:357)
line     at 
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
line     at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
line     at 
org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:77)
line     at 
org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:236)
line     at 
org.apache.kafka.connect.runtime.isolation.Plugins$$Lambda$841/0x00007f55cc4c3d78.run(Unknown
 Source)
line     at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
line     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
line     at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
line     at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
line     at java.base/java.lang.Thread.run(Thread.java:840) {code}
{code:java}
line java.lang.OutOfMemoryError: Java heap space line     at 
java.base/java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:64)
{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to