[ https://issues.apache.org/jira/browse/KAFKA-4090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17117609#comment-17117609 ]

Marouane RAJI edited comment on KAFKA-4090 at 5/27/20, 10:11 AM:
-----------------------------------------------------------------

Hey, 
 We ran into this issue with Samza jobs using a Kafka client with TLS enabled. 
When producing data, Samza uses two clients: a producer and an admin client 
(set up as a consumer to fetch metadata, etc.). We hadn't set up the consumer 
side properly; it was silently failing and causing the job to use a lot of 
memory (and in some cases die of an OOM). 
 We tested PR [https://github.com/apache/kafka/pull/8066] and it seems to work 
properly. We successfully tested the TLS use case, and we saw no OOM when the 
consumer client was not set up properly (instead we got exceptions reporting 
the misconfiguration, as per PR#8066).
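
For reference, the missing piece on our side was the security configuration on 
the consumer/admin client. A minimal sketch of the kind of settings involved 
(these are the standard Kafka client config keys; paths and passwords are 
placeholders, and the Samza-side property prefixes will differ):
{code}
security.protocol=SSL
ssl.truststore.location=<location-of-truststore>
ssl.truststore.password=pass
{code}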

Please let us know if we can help in any way to move this forward.


> JVM runs into OOM if (Java) client uses an SSL port without setting the 
> security protocol
> ----------------------------------------------------------------------------------------
>
>                 Key: KAFKA-4090
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4090
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 0.9.0.1, 0.10.0.1, 2.1.0
>            Reporter: Jaikiran Pai
>            Assignee: Alexandre Dupriez
>            Priority: Major
>
> Quoting from the mail thread that was sent to the Kafka mailing list:
> {quote}
> We have been using Kafka 0.9.0.1 (server and Java client libraries). So far 
> we had been using it with the plaintext transport, but recently we have been 
> considering an upgrade to SSL. It mostly works, except that a misconfigured 
> producer (and even consumer) causes a hard-to-diagnose OutOfMemoryError, 
> putting the JVM in which the client is running into a bad state. We can 
> consistently reproduce that OOM very easily. We decided to check whether this 
> is fixed in 0.10.0.1, so we upgraded one of our test systems to that version 
> (both server and client libraries), but we still see the same issue. Here's 
> how it can be easily reproduced:
> 1. Enable an SSL listener on the broker via server.properties, as per the 
> Kafka documentation:
> {code}
> listeners=PLAINTEXT://:9092,SSL://:9093
> ssl.keystore.location=<location-of-keystore>
> ssl.keystore.password=pass
> ssl.key.password=pass
> ssl.truststore.location=<location-of-truststore>
> ssl.truststore.password=pass
> {code}
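> As a quick sanity check (not strictly part of the reproduction), the SSL 
> listener can be probed with OpenSSL's s_client, assuming the broker is 
> running locally on port 9093:
> {code}
> openssl s_client -connect localhost:9093 </dev/null
> {code}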
> 2. Start ZooKeeper and the Kafka server.
> 3. Create an "oom-test" topic (which will be used for these tests):
> {code}
> kafka-topics.sh --zookeeper localhost:2181 --create --topic oom-test --partitions 1 --replication-factor 1
> {code}
> 4. Create a simple producer which sends a single message to the topic via the 
> Java (new producer) API:
> {code}
> import java.util.Properties;
> import java.util.concurrent.Future;
> 
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.ProducerConfig;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import org.apache.kafka.clients.producer.RecordMetadata;
> import org.apache.kafka.common.serialization.StringSerializer;
> 
> public class OOMTest {
>     public static void main(final String[] args) throws Exception {
>         final Properties kafkaProducerConfigs = new Properties();
>         // NOTE: Intentionally use an SSL port without specifying security.protocol as SSL
>         kafkaProducerConfigs.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9093");
>         kafkaProducerConfigs.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
>         kafkaProducerConfigs.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
>         try (KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaProducerConfigs)) {
>             System.out.println("Created Kafka producer");
>             final String topicName = "oom-test";
>             final String message = "Hello OOM!";
>             // send a message to the topic and wait for the acknowledgement
>             final Future<RecordMetadata> recordMetadataFuture = producer.send(new ProducerRecord<>(topicName, message));
>             final RecordMetadata sentRecordMetadata = recordMetadataFuture.get();
>             System.out.println("Sent message '" + message + "' to topic '" + topicName + "'");
>         }
>         System.out.println("Tests complete");
>     }
> }
> {code}
> Notice that the server URL is using an SSL endpoint, localhost:9093, but 
> isn't specifying any of the other necessary SSL configs like 
> security.protocol.
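> For contrast, a correctly configured client would also set the security 
> protocol and trust store, along these lines (a minimal sketch using the 
> standard config constants from org.apache.kafka.clients.CommonClientConfigs 
> and org.apache.kafka.common.config.SslConfigs; paths and passwords are 
> placeholders):
> {code}
> kafkaProducerConfigs.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
> kafkaProducerConfigs.setProperty(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "<location-of-truststore>");
> kafkaProducerConfigs.setProperty(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "pass");
> {code}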
> 5. For the sake of easily reproducing this issue, run this class with a max 
> heap size of 256MB (-Xmx256M). Running this code throws up the following 
> OutOfMemoryError in one of the Sender threads:
> {code}
> 18:33:25,770 ERROR [KafkaThread] - Uncaught exception in kafka-producer-network-thread | producer-1:
> java.lang.OutOfMemoryError: Java heap space
>     at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
>     at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
>     at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:93)
>     at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
>     at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:153)
>     at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:134)
>     at org.apache.kafka.common.network.Selector.poll(Selector.java:286)
>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:256)
>     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
>     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
>     at java.lang.Thread.run(Thread.java:745)
> {code}
> Note that I set the heap size to 256MB to reproduce the issue easily, but it 
> isn't specific to that size; we have been able to reproduce it even at 516MB 
> and higher.
> This even happens with the consumer, and in fact can be reproduced out of the 
> box with the kafka-consumer-groups.sh script. All you have to do is run that 
> tool as follows:
> {code}
> ./kafka-consumer-groups.sh --list --bootstrap-server localhost:9093 --new-consumer
> {code}
> {code}
> Error while executing consumer group command Java heap space
> java.lang.OutOfMemoryError: Java heap space
>     at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
>     at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
>     at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:93)
>     at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
>     at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:154)
>     at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:135)
>     at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:323)
>     at org.apache.kafka.common.network.Selector.poll(Selector.java:283)
>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
>     at kafka.admin.AdminClient.kafka$admin$AdminClient$$send(AdminClient.scala:49)
>     at kafka.admin.AdminClient$$anonfun$sendAnyNode$1.apply(AdminClient.scala:61)
>     at kafka.admin.AdminClient$$anonfun$sendAnyNode$1.apply(AdminClient.scala:58)
>     at scala.collection.immutable.List.foreach(List.scala:381)
>     at kafka.admin.AdminClient.sendAnyNode(AdminClient.scala:58)
>     at kafka.admin.AdminClient.findAllBrokers(AdminClient.scala:87)
>     at kafka.admin.AdminClient.listAllGroups(AdminClient.scala:96)
>     at kafka.admin.AdminClient.listAllGroupsFlattened(AdminClient.scala:117)
>     at kafka.admin.AdminClient.listAllConsumerGroupsFlattened(AdminClient.scala:121)
>     at kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService.list(ConsumerGroupCommand.scala:311)
>     at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:63)
>     at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala)
> {code}
> Notice that here again I'm using the new consumer and the SSL port without 
> any additional SSL configs.
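> (For completeness: in recent client versions the tool accepts a 
> --command-config option that passes client security settings from a 
> properties file, so, assuming that option is available in the version at 
> hand, a working invocation would look like the following, where 
> client-ssl.properties is a placeholder file containing security.protocol=SSL 
> and the trust store settings shown earlier.)
> {code}
> ./kafka-consumer-groups.sh --list --bootstrap-server localhost:9093 --command-config client-ssl.properties
> {code}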
> Once this OOM occurs, the producer is useless since its (background) sender 
> thread is dead. Worse, since we run these producers/consumers from within our 
> application, this OOM puts our whole JVM into an unstable state.
> Debugging shows that the NetworkReceive class, in its readFromReadableChannel 
> method, receives a size value of 352518912 and then goes ahead and allocates 
> a ByteBuffer of that size. 352518912 bytes is roughly 336 MB, which obviously 
> causes allocation issues. I suspect the value being read off the channel is 
> not really a message size.
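> That suspicion checks out arithmetically: 352518912 is 0x15030300 in hex, 
> which matches the first four bytes of a TLS alert record (content type 0x15 
> = alert, record version 0x0303 = TLS 1.2, followed by the high byte of the 
> record length). In other words, the client appears to misread the broker's 
> TLS response bytes as a 4-byte plaintext size prefix. A quick check:
> {code}
> public class MisreadSizeCheck {
>     public static void main(String[] args) {
>         // First four bytes of a TLS alert record, read as a big-endian int
>         final int misreadSize = (0x15 << 24) | (0x03 << 16) | (0x03 << 8) | 0x00;
>         System.out.println(misreadSize); // prints 352518912, the "size" from the OOM
>     }
> }
> {code}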
> Of course this exception is triggered by a user configuration error, but 
> given that it ends up in an (almost inscrutable) OOM and leaves the JVM in a 
> bad state, is there a way the Kafka Java library can handle this better?
> {quote}


