Srivignesh created KAFKA-16398:
----------------------------------

             Summary: mirror-maker2 running into OOM while filtering high number of messages
                 Key: KAFKA-16398
                 URL: https://issues.apache.org/jira/browse/KAFKA-16398
             Project: Kafka
          Issue Type: Bug
          Components: connect, mirrormaker
    Affects Versions: 3.6.1
            Reporter: Srivignesh

Our application uses a custom predicate to filter messages during mirroring. When the predicate's HasHeader:test method returns false (that is, when the message must be dropped from mirroring), MirrorMaker 2 fails with the exceptions below. When it returns true (the message is forwarded for mirroring), it works fine without an OOM.

Note: This issue does not occur in version 2.8.0.

Exception stack traces:

{code:java}
java.lang.OutOfMemoryError: Java heap space
    at org.apache.kafka.common.utils.Utils.toArray(Utils.java:289)
    at org.apache.kafka.common.utils.Utils.toArray(Utils.java:252)
    at org.apache.kafka.common.utils.Utils.toNullableArray(Utils.java:270)
    at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:73)
    at org.apache.kafka.clients.consumer.internals.CompletedFetch.parseRecord(CompletedFetch.java:300)
    at org.apache.kafka.clients.consumer.internals.CompletedFetch.fetchRecords(CompletedFetch.java:263)
    at org.apache.kafka.clients.consumer.internals.AbstractFetch.fetchRecords(AbstractFetch.java:340)
    at org.apache.kafka.clients.consumer.internals.AbstractFetch.collectFetch(AbstractFetch.java:306)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1262)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1186)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1159)
    at org.apache.kafka.connect.mirror.MirrorSourceTask.poll(MirrorSourceTask.java:153)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.poll(AbstractWorkerSourceTask.java:469)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:357)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:77)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:236)
    at org.apache.kafka.connect.runtime.isolation.Plugins$$Lambda$841/0x00007f55cc4c3d78.run(Unknown Source)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:840)
{code}

{code:java}
java.lang.OutOfMemoryError: Java heap space
    at java.base/java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:64)
{code}
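
The report does not include the connector configuration. For readers trying to reproduce a comparable filtering setup, the following is a minimal sketch only. It assumes the built-in HasHeaderKey predicate stands in for the reporter's custom predicate, and the alias names (dropUnmarked, hasHeader) and the header name (mirror-me) are hypothetical. The Filter transform drops every record for which its predicate matches, so negate=true is used here to drop records for which the predicate returns false, matching the behaviour described above.

{code}
# Sketch of a MirrorSourceConnector filtering config (not the reporter's actual config).
connector.class=org.apache.kafka.connect.mirror.MirrorSourceConnector

# Filter drops records when its predicate matches; negate=true inverts that,
# so records WITHOUT the header are dropped and records WITH it are mirrored.
transforms=dropUnmarked
transforms.dropUnmarked.type=org.apache.kafka.connect.transforms.Filter
transforms.dropUnmarked.predicate=hasHeader
transforms.dropUnmarked.negate=true

# Built-in predicate assumed in place of the custom one; header name is hypothetical.
predicates=hasHeader
predicates.hasHeader.type=org.apache.kafka.connect.transforms.predicates.HasHeaderKey
predicates.hasHeader.name=mirror-me
{code}

With a setup like this, a source topic in which most records lack the header would exercise the "drop" path that the report associates with the OOM.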