[ https://issues.apache.org/jira/browse/KAFKA-16398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17830576#comment-17830576 ]
Greg Harris commented on KAFKA-16398:
-------------------------------------

Hi [~srivignesh]

Thank you for providing your configuration. Please note that this part of the configuration:
{noformat}
"transforms": "Filter",
"transforms.Filter.type": "org.apache.kafka.connect.transforms.Filter",
{noformat}
will drop every record unconditionally, so you won't get any of the load/memory pressure that the producers provide. This would explain why adding the predicate (which then allows some records through) would cause OOMs.

On that note, you have a lot of tuning parameters in your connect-distributed config, some of which don't appear to have any effect. One that does appear to have an effect is `producer.buffer.memory=8388608`. For 1500 tasks with that config, I would expect the producer buffers alone to consume roughly 12GiB, without taking into account any other buffers/memory overhead. If all of the tasks were started on a single node, this could very easily cause OOMs.

What size cluster are you using, and does the cluster ever shrink to a single node, such as with a full reboot/cold start? If so, you may want to reduce the number of tasks, or the size of each task's producer buffer.

> mirror-maker2 running into OOM while filtering (dropping) high number of messages
> ---------------------------------------------------------------------------------
>
>                 Key: KAFKA-16398
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16398
>             Project: Kafka
>          Issue Type: Bug
>          Components: connect, mirrormaker
>    Affects Versions: 3.6.1
>            Reporter: Srivignesh
>            Priority: Critical
>         Attachments: connect-distributed.properties.template, mm2.config.template
>
>
> Based on a custom predicate, our application filters messages during mirroring.
> When the HasHeader:test method of the predicate returns true (i.e. when it has to drop messages from mirroring), we encounter the exceptions below.
> However, when it returns false (the messages are forwarded for mirroring), it works fine without OOM.
> Note: This issue doesn't occur with the same load in version 2.8.0.
> We increased the JVM heap size up to 15G, but the OOM still occurs.
> Exception stacktraces:
> {code:java}
> java.lang.OutOfMemoryError: Java heap space
>     at org.apache.kafka.common.utils.Utils.toArray(Utils.java:289)
>     at org.apache.kafka.common.utils.Utils.toArray(Utils.java:252)
>     at org.apache.kafka.common.utils.Utils.toNullableArray(Utils.java:270)
>     at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:73)
>     at org.apache.kafka.clients.consumer.internals.CompletedFetch.parseRecord(CompletedFetch.java:300)
>     at org.apache.kafka.clients.consumer.internals.CompletedFetch.fetchRecords(CompletedFetch.java:263)
>     at org.apache.kafka.clients.consumer.internals.AbstractFetch.fetchRecords(AbstractFetch.java:340)
>     at org.apache.kafka.clients.consumer.internals.AbstractFetch.collectFetch(AbstractFetch.java:306)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1262)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1186)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1159)
>     at org.apache.kafka.connect.mirror.MirrorSourceTask.poll(MirrorSourceTask.java:153)
>     at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.poll(AbstractWorkerSourceTask.java:469)
>     at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:357)
>     at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
>     at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
>     at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:77)
>     at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:236)
>     at org.apache.kafka.connect.runtime.isolation.Plugins$$Lambda$841/0x00007f55cc4c3d78.run(Unknown Source)
>     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
>     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
>     at java.base/java.lang.Thread.run(Thread.java:840)
> {code}
> {code:java}
> java.lang.OutOfMemoryError: Java heap space
>     at java.base/java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:64)
> {code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
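Greg's first point is that a `Filter` transform with no predicate drops every record. In Kafka Connect, a transformation configured with a predicate (`transforms.<name>.predicate`, optionally inverted with `transforms.<name>.negate`) is applied only to records where the predicate matches, and `Filter` drops each record it is applied to. The following is a minimal, self-contained sketch of that gating logic under simplifying assumptions: `FilterSemanticsSketch`, `SimpleRecord` and `applyFilter` are hypothetical names, not Kafka APIs, and the header-based predicate only stands in for the reporter's custom one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

public class FilterSemanticsSketch {
    // Hypothetical stand-in for a Connect record: only the headers matter here.
    record SimpleRecord(String value, Map<String, String> headers) {}

    // Models a Filter transform optionally gated by a predicate.
    // predicate == null means the transform applies to every record.
    static List<SimpleRecord> applyFilter(List<SimpleRecord> input,
                                          Predicate<SimpleRecord> predicate,
                                          boolean negate) {
        List<SimpleRecord> output = new ArrayList<>();
        for (SimpleRecord r : input) {
            boolean applies = predicate == null || (predicate.test(r) != negate);
            if (!applies) {
                output.add(r); // transform skipped: record passes through unchanged
            }
            // When the transform applies, Filter drops the record (nothing is added).
        }
        return output;
    }

    public static void main(String[] args) {
        List<SimpleRecord> records = List.of(
                new SimpleRecord("a", Map.of("skip", "true")),
                new SimpleRecord("b", Map.of()),
                new SimpleRecord("c", Map.of()));
        Predicate<SimpleRecord> hasSkipHeader = r -> r.headers().containsKey("skip");

        System.out.println(applyFilter(records, null, false).size());          // prints 0: no predicate, all dropped
        System.out.println(applyFilter(records, hasSkipHeader, false).size()); // prints 2: only "a" dropped
    }
}
```

This is why the two configurations in the report behave so differently: without a predicate nothing reaches the producers, while with one, the surviving records are buffered and sent, which is where the memory pressure appears.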
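The producer-buffer estimate in the comment is simple arithmetic: `producer.buffer.memory` caps the memory each task's producer may use for buffering, so if every task lands on one worker, the worst case is tasks × buffer size. A small sketch, assuming all 1500 tasks run on a single worker; the class and method names and the 4 GiB budget are hypothetical illustrations, not values from the report.

```java
import java.util.Locale;

public class ProducerBufferEstimate {
    static final long BYTES_PER_GIB = 1024L * 1024 * 1024;

    // Worst-case memory reserved for producer buffers if all tasks share one worker.
    static long totalBufferBytes(int tasks, long bufferMemoryBytes) {
        return tasks * bufferMemoryBytes;
    }

    // Largest per-task buffer.memory that keeps the total within a given budget.
    static long maxBufferPerTask(long budgetBytes, int tasks) {
        return budgetBytes / tasks;
    }

    public static void main(String[] args) {
        int tasks = 1500;
        long bufferMemory = 8_388_608L; // producer.buffer.memory from the reported config (8 MiB)
        long total = totalBufferBytes(tasks, bufferMemory);
        // prints "total = 12582912000 bytes = 11.72 GiB"
        System.out.printf(Locale.ROOT, "total = %d bytes = %.2f GiB%n",
                total, (double) total / BYTES_PER_GIB);
        // Hypothetical budget: keep producer buffers within 4 GiB of heap.
        System.out.println(maxBufferPerTask(4 * BYTES_PER_GIB, tasks)); // prints 2863311
    }
}
```

With the reported values this gives about 11.7 GiB (1500 × 8 MiB = 12,000 MiB), which is where the ~12GiB figure in the comment comes from. It ignores consumer fetch buffers and all other heap usage, so the real footprint on a single node would be higher.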