[ https://issues.apache.org/jira/browse/KAFKA-14088?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17569832#comment-17569832 ]

Gao Fei commented on KAFKA-14088:
---------------------------------

Later, after reading the Kafka source code alongside the logs, we found that 
Kafka may have received a large number of malformed packets. When such packets 
arrive on a long-lived (persistent) connection, Kafka buffers them as if they 
were normal request data until memory runs out. Only during later processing 
does it discover that the data is malformed and cannot be handled.
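The buffering described above follows from how Kafka frames requests. Every request on the wire begins with a 4-byte big-endian size, and the heap-space trace in the issue description shows NetworkReceive.readFrom allocating a buffer for the whole payload (through MemoryPool.tryAllocate) right after that size is read. The sketch below decodes the first four bytes of an HTTP probe, such as the "GET / HTTP/1.0" request that service-detection scans commonly send, as if it were a Kafka frame. FrameSizeDemo, declaredFrameSize and acceptable are hypothetical names, and MAX_REQUEST_BYTES assumes the broker default of 104857600 for socket.request.max.bytes.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class FrameSizeDemo {
    // Assumption: mirrors the broker default for socket.request.max.bytes (100 MiB).
    static final int MAX_REQUEST_BYTES = 104_857_600;

    /** Reads the first 4 bytes (at least 4 are required) as a big-endian frame size. */
    static int declaredFrameSize(byte[] firstBytes) {
        // ByteBuffer is big-endian by default, matching Kafka's size prefix.
        return ByteBuffer.wrap(firstBytes, 0, 4).getInt();
    }

    /** Hypothetical guard: is this a size the broker would agree to allocate? */
    static boolean acceptable(int size) {
        return size >= 0 && size <= MAX_REQUEST_BYTES;
    }

    public static void main(String[] args) {
        // An HTTP probe starts with the ASCII bytes "GET ".
        byte[] probe = "GET / HTTP/1.0\r\n\r\n".getBytes(StandardCharsets.US_ASCII);
        int size = declaredFrameSize(probe);
        System.out.println("declared size = " + size + ", acceptable = " + acceptable(size));
    }
}
```

"GET " decodes to 1195725856 bytes, which the default cap rejects. However, any prefix that decodes to a value at or below the cap is accepted, and the broker then holds a buffer of that size until the rest of the frame arrives, which may never happen on a connection a scanner leaves open.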
In subsequent tests, scanning the broker with nmap -p 9092 -T4 -A -v <ip> 
reproduced the memory overflow described above within a short time: the 
malformed messages generated by the scan quickly drive Kafka to an 
out-of-memory error. When we then asked the customer, they confirmed that they 
had indeed run a vulnerability scanning tool performing similar operations on 
site, and that every run crashed Kafka.
Can this be avoided by enabling SASL? And when Kafka receives such a malformed 
message, could it detect the invalid format and close the connection 
immediately, instead of first buffering a large amount of data?
Below are some of the error logs:
{code:java}
[2022-07-21 14:33:18,664] ERROR Exception while processing request from 177.177.113.129:6667-172.36.28.103:65440-406 (kafka.network.Processor)
org.apache.kafka.common.errors.InvalidRequestException: Error parsing request header. Our best guess of the apiKey is: 27265
Caused by: org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'client_id': Error reading string of length 513, only 103 bytes available
    at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:77)
    at org.apache.kafka.common.requests.RequestHeader.parse(RequestHeader.java:121)
    at kafka.network.Processor.$anonfun$processCompletedReceives$1(SocketServer.scala:844)
    at kafka.network.Processor.$anonfun$processCompletedReceives$1$adapted(SocketServer.scala:840)
    at kafka.network.Processor$$Lambda$1000/0x0000000058005440.apply(Unknown Source)
    at scala.collection.Iterator.foreach(Iterator.scala:941)
    at scala.collection.Iterator.foreach$(Iterator.scala:941)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at kafka.network.Processor.processCompletedReceives(SocketServer.scala:840)
    at kafka.network.Processor.run(SocketServer.scala:731)
    at java.lang.Thread.run(Thread.java:823)
[2022-07-21 14:33:18,727] ERROR Closing socket for 177.177.113.129:6667-172.36.28.103:30646-406 because of error (kafka.network.Processor)
org.apache.kafka.common.errors.InvalidRequestException: Unknown API key -173
[2022-07-21 14:33:18,727] ERROR Exception while processing request from 177.177.113.129:6667-172.36.28.103:30646-406 (kafka.network.Processor)
org.apache.kafka.common.errors.InvalidRequestException: Unknown API key -173
[2022-07-21 14:39:56,995] ERROR Processor got uncaught exception. (kafka.network.Processor)
java.lang.OutOfMemoryError: Direct buffer memory
    at java.nio.Bits.reserveMemory(Bits.java:703)
    at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:128)
    at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
    at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:241)
    at sun.nio.ch.IOUtil.read(IOUtil.java:195)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
    at org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:103)
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:117)
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
    at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
    at kafka.network.Processor.poll(SocketServer.scala:830)
    at kafka.network.Processor.run(SocketServer.scala:730)
    at java.lang.Thread.run(Thread.java:823)
{code}
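The question above asks whether the broker could reject malformed data before buffering it. In the first log entry, RequestHeader.parse is called from Processor.processCompletedReceives, so in this version the header is only validated after the whole frame has been received. A minimal sketch of the proposed alternative, assuming the v1 request header layout (size, api_key, api_version, correlation_id, then a nullable client_id string with a 2-byte length), peeks at the first 14 bytes and flags a frame whose fields are implausible before any payload buffer is allocated. HeaderPrecheck, plausible, prefix and the maxApiKey bound are hypothetical and are not Kafka APIs.

```java
import java.nio.ByteBuffer;

public class HeaderPrecheck {
    // size(4) + api_key(2) + api_version(2) + correlation_id(4) + client_id length(2)
    static final int PREFIX_BYTES = 4 + 2 + 2 + 4 + 2;

    /** Hypothetical check; maxApiKey and maxSize are caller-supplied assumptions. */
    static boolean plausible(ByteBuffer prefix, short maxApiKey, int maxSize) {
        if (prefix.remaining() < PREFIX_BYTES) {
            throw new IllegalArgumentException("need " + PREFIX_BYTES + " bytes");
        }
        ByteBuffer b = prefix.duplicate(); // leave the caller's position untouched
        int size = b.getInt();             // bytes following the size field
        short apiKey = b.getShort();
        short apiVersion = b.getShort();
        b.getInt();                        // correlation_id: any value is legal
        short clientIdLen = b.getShort();
        int headerAfterSize = PREFIX_BYTES - 4;
        if (size < headerAfterSize || size > maxSize) return false;
        if (apiKey < 0 || apiKey > maxApiKey) return false;
        if (apiVersion < 0) return false;
        // -1 encodes a null client_id; otherwise it must fit in the rest of the frame.
        return clientIdLen == -1 || (clientIdLen >= 0 && clientIdLen <= size - headerAfterSize);
    }

    /** Builds a 14-byte prefix for the demo below. */
    static ByteBuffer prefix(int size, short apiKey, short apiVersion, int correlationId, short clientIdLen) {
        ByteBuffer b = ByteBuffer.allocate(PREFIX_BYTES);
        b.putInt(size).putShort(apiKey).putShort(apiVersion).putInt(correlationId).putShort(clientIdLen);
        b.flip();
        return b;
    }

    public static void main(String[] args) {
        short maxApiKey = 43;       // hypothetical bound, for illustration only
        int maxSize = 104_857_600;  // assumed socket.request.max.bytes default
        // Well-formed: api_key 0, version 3, 10-byte client_id fits exactly.
        System.out.println(plausible(prefix(20, (short) 0, (short) 3, 1, (short) 10), maxApiKey, maxSize));
        // Negative api_key, as in the "Unknown API key -173" log line.
        System.out.println(plausible(prefix(20, (short) -173, (short) 0, 1, (short) 10), maxApiKey, maxSize));
        // client_id length 513 with only 103 bytes left, as in the SchemaException above.
        System.out.println(plausible(prefix(113, (short) 0, (short) 0, 1, (short) 513), maxApiKey, maxSize));
    }
}
```

A check like this only helps when the first bytes are garbage. A well-formed header that declares a huge size would still pass the field checks, so the size cap remains the main protection.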

> KafkaChannel memory leak
> ------------------------
>
>                 Key: KAFKA-14088
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14088
>             Project: Kafka
>          Issue Type: Bug
>          Components: network
>    Affects Versions: 2.2.1
>         Environment: Current system environment:
> kafka version: 2.2.1
> openjdk(openj9): jdk1.8
> Heap memory: 6.4GB
> MaxDirectSize: 8GB
> Total number of topics: about 150+, each with about 3 partitions
>            Reporter: Gao Fei
>            Priority: Minor
>
> The Kafka broker reports OutOfMemoryError: Java heap space and
> OutOfMemoryError: Direct buffer memory at the same time. A memory dump shows
> that most of the memory is held by KafkaChannel -> NetworkReceive ->
> HeapByteBuffer: there are about 4 such KafkaChannels, each holding around
> 1.5GB, while the total heap is only 6.4GB.
> It is strange that a single KafkaChannel occupies so much heap memory. Isn't
> each batch request gradually written to disk by the RequestHandler threads?
> Normally the memory in KafkaChannel should be released continuously, but here
> it is not.
> Why is there such a large HeapByteBuffer object in KafkaChannel, and what does
> it store? Shouldn't socket communication here mostly use direct memory? Why
> is so much heap memory used instead, and why is it never released?
> The business data volume is not very large and varies from customer to
> customer: some customers hit this OOM in their environments, while others
> with large data volumes do not.
> java.lang.OutOfMemoryError: Direct buffer memory
>     at java.nio.Bits.reserveMemory(Bits.java:693)
>     at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
>     at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
>     at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:174)
>     at sun.nio.ch.IOUtil.read(IOUtil.java:195)
>     at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
>     at org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:103)
>     at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:117)
>     at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
>     at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
>     at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)
>     at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)
>     at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
>     at kafka.network.Processor.poll(SocketServer.scala:863)
>     at kafka.network.Processor.run(SocketServer.scala:762)
>     at java.lang.Thread.run(Thread.java:745)
> java.lang.OutOfMemoryError: Java heap space
>     at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
>     at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
>     at org.apache.kafka.common.MemoryPool$1.tryAllocate(MemoryPool.java:30)
>     at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:112)
>     at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
>     at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
>     at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)
>     at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)
>     at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
>     at kafka.network.Processor.poll(SocketServer.scala:863)
>     at kafka.network.Processor.run(SocketServer.scala:762)
>     at java.lang.Thread.run(Thread.java:745)
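On the heap-versus-direct-memory question in the description: the two traces above show that NetworkReceive.readFrom allocates the request payload on the heap (MemoryPool$1.tryAllocate calling ByteBuffer.allocate), and that reading from the socket into that heap buffer passes through sun.nio.ch.Util.getTemporaryDirectBuffer. In the OpenJDK 8 class library, IOUtil.read sizes that temporary direct buffer to the heap buffer's remaining() bytes and keeps it in a per-thread cache. One very large receive can therefore also need a comparably large direct allocation, which is consistent with both OOMs appearing together. A common mitigation, sketched here under that assumption, is to read in bounded chunks by temporarily lowering the destination buffer's limit. BoundedRead and readBounded are hypothetical names, not Kafka code, and the demo uses an in-memory channel rather than a real socket.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;

public class BoundedRead {
    /**
     * Hypothetical helper: exposes at most maxChunk bytes of dst to each read,
     * so a JDK that stages heap reads through a temporary direct buffer sized
     * to dst.remaining() needs at most maxChunk bytes of direct memory per call.
     */
    static int readBounded(ReadableByteChannel ch, ByteBuffer dst, int maxChunk) throws IOException {
        int savedLimit = dst.limit();
        int window = Math.min(maxChunk, dst.remaining());
        dst.limit(dst.position() + window);
        try {
            return ch.read(dst);
        } finally {
            dst.limit(savedLimit); // restore the caller's view of the buffer
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] payload = new byte[10_000];
        ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(payload));
        ByteBuffer dst = ByteBuffer.allocate(payload.length); // heap buffer, like NetworkReceive's
        int calls = 0;
        while (dst.hasRemaining()) {
            if (readBounded(ch, dst, 4096) < 0) break; // end of stream
            calls++;
        }
        System.out.println("read " + dst.position() + " bytes in " + calls + " calls");
    }
}
```

The cost is more read calls per large request. It does not reduce the heap allocation itself, which is still bounded only by the broker's request-size limit.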



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
