Dave Crighton created KAFKA-15247:
-------------------------------------

             Summary: OutOfMemoryError in SaslClientAuthenticator during server 
restart 
                 Key: KAFKA-15247
                 URL: https://issues.apache.org/jira/browse/KAFKA-15247
             Project: Kafka
          Issue Type: Bug
          Components: clients
    Affects Versions: 3.5.1
            Reporter: Dave Crighton
         Attachments: 
0001-defensive-code-for-rogue-packets-during-server-resta.patch

We embed the Kafka client in IBM App Connect Enterprise to provide Kafka consume 
and produce functionality. The product is a little bit like an app server in 
that it may host multiple workloads, including some which may not use the Kafka 
functionality.

 

When the Kafka server is installed in an OpenShift environment, we are seeing 
cases where the clients hit OutOfMemoryError because the client allocates a 
single large (roughly 1.2 GB) byte buffer.

 

From research this appears to be a known issue when a plaintext client is 
configured to connect to a TLS-secured endpoint. In this instance, however, we 
see successful communication via TLS, and then, when the Kafka server is 
restarted (or connectivity is broken), both the consumers and producers can 
throw OutOfMemoryError with the following stacks:

 

Producer

------------

 

{noformat}
4XESTACKTRACE    at java/nio/HeapByteBuffer.<init>(HeapByteBuffer.java:57(Compiled Code))
4XESTACKTRACE    at java/nio/ByteBuffer.allocate(ByteBuffer.java:335(Compiled Code))
4XESTACKTRACE    at org/apache/kafka/common/memory/MemoryPool$1.tryAllocate(MemoryPool.java:30(Compiled Code))
4XESTACKTRACE    at org/apache/kafka/common/network/NetworkReceive.readFrom(NetworkReceive.java:102(Compiled Code))
4XESTACKTRACE    at org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.receiveResponseOrToken(SaslClientAuthenticator.java:475(Compiled Code))
4XESTACKTRACE    at org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.receiveKafkaResponse(SaslClientAuthenticator.java:572(Compiled Code))
4XESTACKTRACE    at org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:250(Compiled Code))
4XESTACKTRACE    at org/apache/kafka/common/network/KafkaChannel.prepare(KafkaChannel.java:181(Compiled Code))
4XESTACKTRACE    at org/apache/kafka/common/network/Selector.pollSelectionKeys(Selector.java:543(Compiled Code))
4XESTACKTRACE    at org/apache/kafka/common/network/Selector.poll(Selector.java:481(Compiled Code))
4XESTACKTRACE    at org/apache/kafka/clients/NetworkClient.poll(NetworkClient.java:571(Compiled Code))
4XESTACKTRACE    at org/apache/kafka/clients/producer/internals/Sender.runOnce(Sender.java:328(Compiled Code))
4XESTACKTRACE    at org/apache/kafka/clients/producer/internals/Sender.run(Sender.java:243(Compiled Code))
4XESTACKTRACE    at java/lang/Thread.run(Thread.java:830)
{noformat}

 

Consumer

-------------

{noformat}
3XMTHREADINFO3    Java callstack:
4XESTACKTRACE    at java/nio/HeapByteBuffer.<init>(HeapByteBuffer.java:57)
4XESTACKTRACE    at java/nio/ByteBuffer.allocate(ByteBuffer.java:335)
4XESTACKTRACE    at org/apache/kafka/common/memory/MemoryPool$1.tryAllocate(MemoryPool.java:30)
4XESTACKTRACE    at org/apache/kafka/common/network/NetworkReceive.readFrom(NetworkReceive.java:113)
4XESTACKTRACE    at org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.receiveResponseOrToken(SaslClientAuthenticator.java:475)
4XESTACKTRACE    at org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.receiveKafkaResponse(SaslClientAuthenticator.java:572)
4XESTACKTRACE    at org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:250)
4XESTACKTRACE    at org/apache/kafka/common/network/KafkaChannel.prepare(KafkaChannel.java:181)
4XESTACKTRACE    at org/apache/kafka/common/network/Selector.pollSelectionKeys(Selector.java:543)
4XESTACKTRACE    at org/apache/kafka/common/network/Selector.poll(Selector.java:481)
4XESTACKTRACE    at org/apache/kafka/clients/NetworkClient.poll(NetworkClient.java:551)
4XESTACKTRACE    at org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
4XESTACKTRACE    at org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
4XESTACKTRACE    at org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
4XESTACKTRACE    at org/apache/kafka/clients/consumer/internals/Fetcher.getTopicMetadata(Fetcher.java:374)
4XESTACKTRACE    at org/apache/kafka/clients/consumer/KafkaConsumer.partitionsFor(KafkaConsumer.java:1949)
4XESTACKTRACE    at org/apache/kafka/clients/consumer/KafkaConsumer.partitionsFor(KafkaConsumer.java:1917)
4XESTACKTRACE    at com/ibm/broker/connector/kafka/KafkaIIBConsumer.initialise(KafkaIIBConsumer.java:177)
4XESTACKTRACE    at com/ibm/broker/connector/kafka/KafkaIIBConsumer.start(KafkaIIBConsumer.java:512)
5XESTACKTRACE       (entered lock: com/ibm/broker/connector/kafka/KafkaIIBConsumer@0x00000000C0A94038, entry count: 1)
4XESTACKTRACE    at com/ibm/broker/connector/kafka/KafkaInputConnector.start(KafkaInputConnector.java:250)
{noformat}

 

We believe that what is happening is that when the Kafka server goes down, the 
OpenShift route remains available for a short period of time, and the 
SaslClientAuthenticator receives rogue packets which it interprets as a length 
prefix for data to read off the stream.
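
For illustration, the failure mode is roughly the length-prefixed read pattern 
sketched below. This is a simplified sketch and not the actual NetworkReceive 
source; the class and method names are made up for the example. The point is 
that a handful of stray bytes (for example a router error response) can decode 
to an enormous size, which then becomes a single heap allocation.

{code:java}
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

// Simplified sketch of a length-prefixed read, for illustration only
// (not the real org.apache.kafka.common.network.NetworkReceive code).
public class LengthPrefixedRead {
    static ByteBuffer readFrame(InputStream in) throws IOException {
        DataInputStream data = new DataInputStream(in);
        // The first four bytes on the wire are treated as the size of the frame.
        int size = data.readInt();
        // If those bytes are not a real Kafka response (e.g. a proxy/router error),
        // "size" can decode to something huge (we observed ~1.2 GB), and this single
        // allocation is what throws the OutOfMemoryError.
        ByteBuffer buffer = ByteBuffer.allocate(size);
        data.readFully(buffer.array());
        return buffer;
    }
}
{code}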

 

For the consumer, since there is application code on the stack, we were able to 
implement a workaround by catching the OOM (roughly as sketched below), but on 
the producer side the entire stack is Kafka client code, so there is nowhere for 
us to intercept it.
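
The consumer-side workaround has approximately the following shape. This is a 
sketch rather than our actual connector code, and the class and method names 
here are invented for the example:

{code:java}
import java.util.List;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.PartitionInfo;

// Rough shape of the application-level workaround on the consumer side.
// Because our own code is on the stack, we can catch the OutOfMemoryError
// thrown out of partitionsFor() and fail the connection attempt gracefully.
public class ConsumerStartWorkaround {
    static List<PartitionInfo> safePartitionsFor(KafkaConsumer<?, ?> consumer, String topic) {
        try {
            return consumer.partitionsFor(topic);
        } catch (OutOfMemoryError oom) {
            // The OOM comes from one oversized buffer allocation, so the heap is
            // usually still healthy afterwards; treat it as a failed connection.
            consumer.close();
            throw new KafkaException("Oversized buffer requested during authentication", oom);
        }
    }
}
{code}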

 

I looked at the SaslClientAuthenticator code and I can see that its use of the 
network receive buffer is unbounded, so I applied two patches to this code. The 
first limits the buffer size for authentication to 10 MB; the second catches the 
OOM and fails authentication instead (see the sketch below).
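
The attached patch is along the following lines. This is a sketch of the 
approach rather than the literal diff; the class, method, and constant names are 
mine, and the 10 MB limit is the value I chose rather than any existing Kafka 
configuration:

{code:java}
import java.nio.ByteBuffer;
import org.apache.kafka.common.errors.SaslAuthenticationException;

// Sketch of the defensive approach in the attached patch: bound the size of the
// buffer the authenticator will allocate, and turn an allocation failure into an
// authentication failure instead of letting the OutOfMemoryError escape.
public class BoundedAuthReceive {
    // Limit chosen in the patch; generous for any sane SASL/auth exchange.
    private static final int MAX_AUTH_RECEIVE_BYTES = 10 * 1024 * 1024;

    static ByteBuffer allocateAuthBuffer(int requestedSize) {
        if (requestedSize < 0 || requestedSize > MAX_AUTH_RECEIVE_BYTES) {
            throw new SaslAuthenticationException(
                "Authentication response declared size " + requestedSize
                + " which exceeds the limit of " + MAX_AUTH_RECEIVE_BYTES + " bytes");
        }
        try {
            return ByteBuffer.allocate(requestedSize);
        } catch (OutOfMemoryError oom) {
            // Second patch: fail the authentication attempt rather than the client thread.
            throw new SaslAuthenticationException(
                "Unable to allocate " + requestedSize + " bytes for authentication response", oom);
        }
    }
}
{code}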

 


Using the patched client, the customer has gone from being able to reproduce 
this on at least one app server for every Kafka server restart to not being able 
to reproduce the issue at all.

 


I am happy to submit a PR, but I wanted to get feedback before I do so. For 
instance, is 10 MB a suitable maximum buffer size for authentication? Should the 
maximum perhaps be configurable instead, and if so, what is best practice for 
providing this configuration?
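
If a configurable maximum is preferred, I assume it would be exposed via a 
ConfigDef entry, something like the sketch below. The property name and default 
value here are hypothetical and do not exist in Kafka today:

{code:java}
import org.apache.kafka.common.config.ConfigDef;

// Hypothetical sketch of how a configurable limit could be defined. The property
// name and default are invented for illustration only.
public class AuthReceiveConfig {
    public static final String SASL_CLIENT_MAX_RECEIVE_SIZE = "sasl.client.max.receive.size";

    public static ConfigDef addAuthReceiveLimit(ConfigDef configDef) {
        return configDef.define(
            SASL_CLIENT_MAX_RECEIVE_SIZE,
            ConfigDef.Type.INT,
            10 * 1024 * 1024,                 // default 10 MB, matching the patch
            ConfigDef.Importance.LOW,
            "Maximum size of a response the client will buffer during SASL authentication.");
    }
}
{code}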

 


Secondly, catching the OOM doesn't feel like best practice to me; however, 
without doing this the entire application fails due to the aggressive allocation 
of byte buffers in the SaslClientAuthenticator. Is there any alternative I 
should be considering?

 


I realise that this issue has been raised before and that, in the case of a 
misconfiguration, it was not considered a bug. In this instance, however, at 
least in the customer's environment, the configuration is actually correct, and 
the error leaves clients unable to tolerate a server failure.

 


I appreciate any guidance you might give me on how I can get a change committed 
to prevent the problem.

 

