[ https://issues.apache.org/jira/browse/KAFKA-16398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17830576#comment-17830576 ]

Greg Harris commented on KAFKA-16398:
-------------------------------------

Hi [~srivignesh] Thank you for providing your configuration.

Please note that this part of the configuration:
{noformat}
  "transforms": "Filter",
  "transforms.Filter.type": "org.apache.kafka.connect.transforms.Filter",{noformat}
will drop every record unconditionally, so the producers never receive any records and you won't see the load/memory pressure they would otherwise create. This would explain why adding the predicate (which then allows some records through) would cause OOMs.
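
For reference, the Filter SMT only drops records conditionally once it is tied to a predicate via the standard predicate configuration, roughly like this (the predicate alias and header key below are placeholders, and the built-in HasHeaderKey predicate stands in for your custom predicate class):
{noformat}
  "transforms": "Filter",
  "transforms.Filter.type": "org.apache.kafka.connect.transforms.Filter",
  "transforms.Filter.predicate": "DropMarked",
  "predicates": "DropMarked",
  "predicates.DropMarked.type": "org.apache.kafka.connect.transforms.predicates.HasHeaderKey",
  "predicates.DropMarked.name": "drop-this-record",{noformat}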

On that note, you have a lot of tuning parameters in your connect-distributed 
config, some of which don't appear to have any effect. One that does appear to 
have an effect is `producer.buffer.memory=8388608`.
For 1500 tasks with that config, I would expect just the producer buffers to 
consume 12GiB, without taking into account any other buffers/memory overhead. 
If all of the tasks were started on a single node, this could very easily cause 
OOMs.
What size cluster are you using, and does the cluster ever shrink to a single 
node, such as with a full reboot/cold start? If so, you may want to reduce the 
number of tasks or the size of each task's producer buffer.
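
To make the arithmetic behind the 12GiB estimate above explicit:
{noformat}
1500 tasks x 8,388,608 bytes (producer.buffer.memory) per task
  = 12,582,912,000 bytes
  ≈ 11.7 GiB of producer buffers alone{noformat}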

> mirror-maker2 running into OOM while filtering (dropping) high number of 
> messages
> ---------------------------------------------------------------------------------
>
>                 Key: KAFKA-16398
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16398
>             Project: Kafka
>          Issue Type: Bug
>          Components: connect, mirrormaker
>    Affects Versions: 3.6.1
>            Reporter: Srivignesh
>            Priority: Critical
>         Attachments: connect-distributed.properties.template, 
> mm2.config.template
>
>
> Based on a custom predicate, our application filters messages during 
> mirroring.
> When the HasHeader:test method of the predicate returns true (i.e. when 
> messages have to be dropped from mirroring), the exceptions below are thrown.
> However, when it returns false (the messages are forwarded for mirroring), it 
> works fine without OOM.
> Note: This issue doesn't occur with the same load in version 2.8.0.
> The JVM heap size was increased to 15G, but OOMs still occur.
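> For context, a minimal sketch of the general shape of such a predicate 
> (class name, header key, and lack of configuration are illustrative 
> placeholders, not our actual implementation):
> {code:java}
> import java.util.Map;
>
> import org.apache.kafka.common.config.ConfigDef;
> import org.apache.kafka.connect.connector.ConnectRecord;
> import org.apache.kafka.connect.transforms.predicates.Predicate;
>
> // Illustrative only: when paired with the Filter SMT, returning true here
> // causes the record to be dropped from mirroring.
> public class HasHeader<R extends ConnectRecord<R>> implements Predicate<R> {
>     private static final String HEADER_KEY = "drop-this-record"; // hypothetical header key
>
>     @Override
>     public ConfigDef config() {
>         return new ConfigDef();
>     }
>
>     @Override
>     public boolean test(R record) {
>         // Drop the record if it carries the marker header.
>         return record.headers().lastWithName(HEADER_KEY) != null;
>     }
>
>     @Override
>     public void close() {
>         // nothing to clean up
>     }
>
>     @Override
>     public void configure(Map<String, ?> configs) {
>         // no configuration needed for this sketch
>     }
> }
> {code}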
> Exception stacktraces:
> {code:java}
> java.lang.OutOfMemoryError: Java heap space
>     at org.apache.kafka.common.utils.Utils.toArray(Utils.java:289)
>     at org.apache.kafka.common.utils.Utils.toArray(Utils.java:252)
>     at org.apache.kafka.common.utils.Utils.toNullableArray(Utils.java:270)
>     at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:73)
>     at org.apache.kafka.clients.consumer.internals.CompletedFetch.parseRecord(CompletedFetch.java:300)
>     at org.apache.kafka.clients.consumer.internals.CompletedFetch.fetchRecords(CompletedFetch.java:263)
>     at org.apache.kafka.clients.consumer.internals.AbstractFetch.fetchRecords(AbstractFetch.java:340)
>     at org.apache.kafka.clients.consumer.internals.AbstractFetch.collectFetch(AbstractFetch.java:306)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1262)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1186)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1159)
>     at org.apache.kafka.connect.mirror.MirrorSourceTask.poll(MirrorSourceTask.java:153)
>     at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.poll(AbstractWorkerSourceTask.java:469)
>     at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:357)
>     at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
>     at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
>     at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:77)
>     at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:236)
>     at org.apache.kafka.connect.runtime.isolation.Plugins$$Lambda$841/0x00007f55cc4c3d78.run(Unknown Source)
>     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
>     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
>     at java.base/java.lang.Thread.run(Thread.java:840)
> {code}
> {code:java}
> java.lang.OutOfMemoryError: Java heap space
>     at java.base/java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:64)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
