[
https://issues.apache.org/jira/browse/KAFKA-15247?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17893807#comment-17893807
]
Dave Crighton commented on KAFKA-15247:
---------------------------------------
Updating to major as the cost of maintaining a fork is starting to become
burdensome for us.
> OutOfMemoryError in SaslClientAuthenticator during server restart
> ------------------------------------------------------------------
>
> Key: KAFKA-15247
> URL: https://issues.apache.org/jira/browse/KAFKA-15247
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Affects Versions: 3.5.1
> Reporter: Dave Crighton
> Priority: Major
> Labels: patch
> Attachments:
> 0001-defensive-code-for-rogue-packets-during-server-resta.patch
>
> Original Estimate: 24h
> Remaining Estimate: 24h
>
> We embed the Kafka client in IBM App Connect Enterprise to provide Kafka
> consume and produce functionality. The product is a little like an app server
> in that it may host multiple workloads, including some which may not use the
> Kafka functionality.
>
> When the Kafka server is installed in an OpenShift environment, we are seeing
> cases where the clients receive OutOfMemoryError due to single large
> (1.2 GB) byte buffers being allocated by the client.
>
> From research this appears to be a known issue when a plaintext client is
> configured to connect to a TLS-secured endpoint. In this instance, however, we
> see successful communication via TLS, and then, when the Kafka server is
> restarted (or connectivity is broken), both the consumers and producers can
> throw OutOfMemoryError with the following stacks:
>
> Producer
> ------------
>
> {noformat}
> 4XESTACKTRACE at java/nio/HeapByteBuffer.<init>(HeapByteBuffer.java:57(Compiled Code))
> 4XESTACKTRACE at java/nio/ByteBuffer.allocate(ByteBuffer.java:335(Compiled Code))
> 4XESTACKTRACE at org/apache/kafka/common/memory/MemoryPool$1.tryAllocate(MemoryPool.java:30(Compiled Code))
> 4XESTACKTRACE at org/apache/kafka/common/network/NetworkReceive.readFrom(NetworkReceive.java:102(Compiled Code))
> 4XESTACKTRACE at org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.receiveResponseOrToken(SaslClientAuthenticator.java:475(Compiled Code))
> 4XESTACKTRACE at org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.receiveKafkaResponse(SaslClientAuthenticator.java:572(Compiled Code))
> 4XESTACKTRACE at org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:250(Compiled Code))
> 4XESTACKTRACE at org/apache/kafka/common/network/KafkaChannel.prepare(KafkaChannel.java:181(Compiled Code))
> 4XESTACKTRACE at org/apache/kafka/common/network/Selector.pollSelectionKeys(Selector.java:543(Compiled Code))
> 4XESTACKTRACE at org/apache/kafka/common/network/Selector.poll(Selector.java:481(Compiled Code))
> 4XESTACKTRACE at org/apache/kafka/clients/NetworkClient.poll(NetworkClient.java:571(Compiled Code))
> 4XESTACKTRACE at org/apache/kafka/clients/producer/internals/Sender.runOnce(Sender.java:328(Compiled Code))
> 4XESTACKTRACE at org/apache/kafka/clients/producer/internals/Sender.run(Sender.java:243(Compiled Code))
> 4XESTACKTRACE at java/lang/Thread.run(Thread.java:830)
> {noformat}
>
> Consumer
> -------------
> {noformat}
> 3XMTHREADINFO3 Java callstack:
> 4XESTACKTRACE at java/nio/HeapByteBuffer.<init>(HeapByteBuffer.java:57)
> 4XESTACKTRACE at java/nio/ByteBuffer.allocate(ByteBuffer.java:335)
> 4XESTACKTRACE at org/apache/kafka/common/memory/MemoryPool$1.tryAllocate(MemoryPool.java:30)
> 4XESTACKTRACE at org/apache/kafka/common/network/NetworkReceive.readFrom(NetworkReceive.java:113)
> 4XESTACKTRACE at org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.receiveResponseOrToken(SaslClientAuthenticator.java:475)
> 4XESTACKTRACE at org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.receiveKafkaResponse(SaslClientAuthenticator.java:572)
> 4XESTACKTRACE at org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:250)
> 4XESTACKTRACE at org/apache/kafka/common/network/KafkaChannel.prepare(KafkaChannel.java:181)
> 4XESTACKTRACE at org/apache/kafka/common/network/Selector.pollSelectionKeys(Selector.java:543)
> 4XESTACKTRACE at org/apache/kafka/common/network/Selector.poll(Selector.java:481)
> 4XESTACKTRACE at org/apache/kafka/clients/NetworkClient.poll(NetworkClient.java:551)
> 4XESTACKTRACE at org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
> 4XESTACKTRACE at org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
> 4XESTACKTRACE at org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
> 4XESTACKTRACE at org/apache/kafka/clients/consumer/internals/Fetcher.getTopicMetadata(Fetcher.java:374)
> 4XESTACKTRACE at org/apache/kafka/clients/consumer/KafkaConsumer.partitionsFor(KafkaConsumer.java:1949)
> 4XESTACKTRACE at org/apache/kafka/clients/consumer/KafkaConsumer.partitionsFor(KafkaConsumer.java:1917)
> 4XESTACKTRACE at com/ibm/broker/connector/kafka/KafkaIIBConsumer.initialise(KafkaIIBConsumer.java:177)
> 4XESTACKTRACE at com/ibm/broker/connector/kafka/KafkaIIBConsumer.start(KafkaIIBConsumer.java:512)
> 5XESTACKTRACE (entered lock: com/ibm/broker/connector/kafka/KafkaIIBConsumer@0x00000000C0A94038, entry count: 1)
> 4XESTACKTRACE at com/ibm/broker/connector/kafka/KafkaInputConnector.start(KafkaInputConnector.java:250)
> {noformat}
>
> We believe that what is happening is that when the Kafka server goes down, in
> the RHOS environment the route remains available for a short period of time,
> and the SaslClientAuthenticator receives rogue packets whose size prefix it
> interprets as the length of a frame to read off the stream.
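>
> As a minimal illustration of the failure mode (a simplified sketch, not the
> actual Kafka code; the class and method names here are ours), the receive path
> reads a size prefix and then allocates a heap buffer of that size, so a few
> bytes of garbage can translate directly into a gigabyte-scale allocation:
> {code:java}
> import java.io.DataInputStream;
> import java.io.IOException;
> import java.io.InputStream;
> import java.nio.ByteBuffer;
> 
> // Simplified illustration (not Kafka source): a size-prefixed receive that
> // trusts the 4-byte prefix. A rogue packet beginning 0x4D 0x00 0x00 0x00 asks
> // for roughly 1.2 GB in a single allocation, surfacing as the OutOfMemoryError
> // in the stacks above.
> public class LengthPrefixedReceive {
>     public static ByteBuffer readFrame(InputStream in) throws IOException {
>         DataInputStream data = new DataInputStream(in);
>         int size = data.readInt();                     // rogue bytes interpreted as a length
>         ByteBuffer buffer = ByteBuffer.allocate(size); // may throw OutOfMemoryError
>         data.readFully(buffer.array());
>         return buffer;
>     }
> }
> {code}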
>
> For the consumer code, since there is application code on the stack, we were
> able to implement a workaround by catching the OOM, but on the producer side
> the entire stack is Kafka client code.
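>
> For illustration, the shape of that consumer-side workaround looks roughly
> like this (a sketch only; the class and method names are illustrative, not the
> actual product code):
> {code:java}
> import java.util.List;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.PartitionInfo;
> 
> // Sketch of the application-level workaround: because our own code
> // (KafkaIIBConsumer.initialise) is on the stack, we can catch the error around
> // partitionsFor() and fail the connector cleanly instead of letting the rogue
> // allocation take down the whole process.
> public final class GuardedPartitionLookup {
>     public static List<PartitionInfo> partitionsForOrNull(KafkaConsumer<?, ?> consumer, String topic) {
>         try {
>             return consumer.partitionsFor(topic);
>         } catch (OutOfMemoryError oom) {
>             // Treat the failed allocation as a broken connection; the caller retries later.
>             return null;
>         }
>     }
> }
> {code}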
>
> I looked at the SaslClientAuthenticator code and I can see that its use of
> the network buffer is unbounded, so I applied two patches to this code. The
> first limits the buffer size for authentication to 10 MB; the second catches
> the OOM and fails authentication instead.
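>
> For discussion, a rough sketch of the first change (illustrative only; the
> attached patch is the authoritative version, and the constant name and its
> placement here are assumptions):
> {code:java}
> import org.apache.kafka.common.errors.IllegalSaslStateException;
> 
> // Illustrative sketch, not the attached patch verbatim: bound the declared size
> // of any receive performed during SASL authentication, instead of allocating a
> // buffer of whatever the size prefix happens to say.
> public final class SaslReceiveSizeCheck {
>     // 10 MB cap, matching the limit used in our patch; the name is illustrative.
>     public static final int MAX_AUTH_RECEIVE_BYTES = 10 * 1024 * 1024;
> 
>     public static void check(int requestedBytes) {
>         if (requestedBytes < 0 || requestedBytes > MAX_AUTH_RECEIVE_BYTES) {
>             throw new IllegalSaslStateException(
>                 "Invalid SASL receive size: " + requestedBytes + " (limit " + MAX_AUTH_RECEIVE_BYTES + ")");
>         }
>     }
> }
> {code}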
>
> Using the patched client, the customer has gone from being able to recreate
> this on at least one app server for every Kafka server restart to not being
> able to reproduce the issue at all.
>
> I am happy to submit a PR but I wanted to get feedback before doing so. For
> instance, is 10 MB a suitable maximum buffer size for authentication? Should
> the maximum perhaps be configurable instead, and if so, what is best practice
> for providing this configuration?
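>
> If a configurable maximum is the preferred direction, one possibility (the
> property name below is purely hypothetical, not an existing Kafka setting)
> would be to expose it like any other client property:
> {code:java}
> import java.util.Properties;
> 
> // Purely hypothetical illustration of how a configurable cap could be supplied
> // by applications; "sasl.client.max.receive.size" is an invented name, not an
> // existing Kafka configuration.
> public final class HypotheticalConfigExample {
>     public static Properties clientProps() {
>         Properties props = new Properties();
>         props.put("security.protocol", "SASL_SSL");
>         props.put("sasl.mechanism", "PLAIN");
>         props.put("sasl.client.max.receive.size", "10485760"); // hypothetical 10 MB limit
>         return props;
>     }
> }
> {code}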
>
> Secondly, catching the OOM doesn't feel like best practice to me; however,
> without doing this the entire application fails due to the aggressive
> allocation of byte buffers in the SaslClientAuthenticator. Is there any
> alternative I should be considering?
>
> I realise that this issue has been raised before and, in the case of a
> misconfiguration, it looks like it was not considered a bug. However, in this
> instance, at least in the customer's environment, the configuration is actually
> correct and the error is leaving clients unable to tolerate a server failure.
>
> I appreciate any guidance you might give me on how I can get a change
> committed to prevent the problem.
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)