[ https://issues.apache.org/jira/browse/KAFKA-4090?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Alexandre Dupriez reassigned KAFKA-4090:
----------------------------------------

    Assignee: Alexandre Dupriez

> JVM runs into OOM if (Java) client uses an SSL port without setting the security protocol
> ------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-4090
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4090
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 0.9.0.1, 0.10.0.1, 2.1.0
>            Reporter: jaikiran pai
>            Assignee: Alexandre Dupriez
>            Priority: Major
>
> Quoting from the mail thread that was sent to Kafka mailing list:
> {quote}
> We have been using Kafka 0.9.0.1 (server and Java client libraries). So far
> we had been using it with the plaintext transport, but recently we have been
> considering upgrading to SSL. It mostly works, except that a mis-configured
> producer (and even consumer) causes a hard-to-diagnose OutOfMemoryError that
> puts the JVM in which the client is running into a bad state. We can
> consistently reproduce that OOM very easily. We decided to check whether this
> is fixed in 0.10.0.1, so we upgraded one of our test systems to that version
> (both server and client libraries), but we still see the same issue. Here's
> how it can be easily reproduced:
> 1. Enable an SSL listener on the broker via server.properties, as per the
> Kafka documentation:
> {code}
> listeners=PLAINTEXT://:9092,SSL://:9093
> ssl.keystore.location=<location-of-keystore>
> ssl.keystore.password=pass
> ssl.key.password=pass
> ssl.truststore.location=<location-of-truststore>
> ssl.truststore.password=pass
> {code}
> 2. Start ZooKeeper and the Kafka server
> 3. Create an "oom-test" topic (which will be used for these tests):
> {code}
> kafka-topics.sh --zookeeper localhost:2181 --create --topic oom-test --partitions 1 --replication-factor 1
> {code}
> 4. Create a simple producer that sends a single message to the topic via the
> Java (new producer) API:
> {code}
> import java.util.Properties;
> import java.util.concurrent.Future;
>
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.ProducerConfig;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import org.apache.kafka.clients.producer.RecordMetadata;
> import org.apache.kafka.common.serialization.StringSerializer;
>
> public class OOMTest {
>     public static void main(final String[] args) throws Exception {
>         final Properties kafkaProducerConfigs = new Properties();
>         // NOTE: Intentionally use an SSL port without specifying security.protocol as SSL
>         kafkaProducerConfigs.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9093");
>         kafkaProducerConfigs.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
>         kafkaProducerConfigs.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
>         try (KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaProducerConfigs)) {
>             System.out.println("Created Kafka producer");
>             final String topicName = "oom-test";
>             final String message = "Hello OOM!";
>             // send a message to the topic and wait for the acknowledgement
>             final Future<RecordMetadata> recordMetadataFuture = producer.send(new ProducerRecord<>(topicName, message));
>             final RecordMetadata sentRecordMetadata = recordMetadataFuture.get();
>             System.out.println("Sent message '" + message + "' to topic '" + topicName + "'");
>         }
>         System.out.println("Tests complete");
>     }
> }
> {code}
> Notice that the bootstrap server URL points at the SSL endpoint localhost:9093
> but doesn't specify any of the other necessary SSL configs like security.protocol.
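> For contrast, here is a minimal sketch of the client-side SSL settings that
> are missing above, assuming a truststore that matches the broker's keystore;
> the class name, truststore path and password are placeholders, not from the
> original setup:
> {code}
> import java.util.Properties;
>
> public class SslClientConfigSketch {
>     // Client-side settings that would let the producer above speak SSL on port 9093.
>     public static Properties sslProducerConfigs() {
>         final Properties configs = new Properties();
>         configs.setProperty("bootstrap.servers", "localhost:9093");
>         // Without this the client defaults to PLAINTEXT and misreads the broker's TLS response.
>         configs.setProperty("security.protocol", "SSL");
>         configs.setProperty("ssl.truststore.location", "<location-of-truststore>");
>         configs.setProperty("ssl.truststore.password", "pass");
>         return configs;
>     }
> }
> {code}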
> 5. To reproduce this issue easily, run this class with a max heap size of
> 256MB (-Xmx256M). Running this code throws the following OutOfMemoryError in
> one of the Sender threads:
> {code}
> 18:33:25,770 ERROR [KafkaThread] - Uncaught exception in kafka-producer-network-thread | producer-1:
> java.lang.OutOfMemoryError: Java heap space
>     at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
>     at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
>     at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:93)
>     at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
>     at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:153)
>     at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:134)
>     at org.apache.kafka.common.network.Selector.poll(Selector.java:286)
>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:256)
>     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
>     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
>     at java.lang.Thread.run(Thread.java:745)
> {code}
> Note that I set the heap size to 256MB to reproduce this easily, but the issue
> isn't specific to that size; we have been able to reproduce it at 516MB and
> higher too.
> This also happens with the consumer, and it can in fact be reproduced out of
> the box with the kafka-consumer-groups.sh script. All you have to do is run
> that tool as follows:
> {code}
> ./kafka-consumer-groups.sh --list --bootstrap-server localhost:9093 --new-consumer
> {code}
> {code}
> Error while executing consumer group command Java heap space
> java.lang.OutOfMemoryError: Java heap space
>     at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
>     at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
>     at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:93)
>     at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
>     at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:154)
>     at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:135)
>     at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:323)
>     at org.apache.kafka.common.network.Selector.poll(Selector.java:283)
>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
>     at kafka.admin.AdminClient.kafka$admin$AdminClient$$send(AdminClient.scala:49)
>     at kafka.admin.AdminClient$$anonfun$sendAnyNode$1.apply(AdminClient.scala:61)
>     at kafka.admin.AdminClient$$anonfun$sendAnyNode$1.apply(AdminClient.scala:58)
>     at scala.collection.immutable.List.foreach(List.scala:381)
>     at kafka.admin.AdminClient.sendAnyNode(AdminClient.scala:58)
>     at kafka.admin.AdminClient.findAllBrokers(AdminClient.scala:87)
>     at kafka.admin.AdminClient.listAllGroups(AdminClient.scala:96)
>     at kafka.admin.AdminClient.listAllGroupsFlattened(AdminClient.scala:117)
>     at kafka.admin.AdminClient.listAllConsumerGroupsFlattened(AdminClient.scala:121)
>     at kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService.list(ConsumerGroupCommand.scala:311)
>     at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:63)
>     at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala)
> {code}
> Notice that here again I'm using the new consumer and the SSL port without
> any additional SSL configs.
> Once this OOM occurs, the producer is useless since its (background) sender
> thread is dead. Worse, since we run these producers/consumers from within our
> application, this OOM leaves our whole JVM in an unstable state.
> Debugging shows that the NetworkReceive class, in its readFromReadableChannel
> method, receives a size value of 352518912 and then goes ahead and allocates a
> ByteBuffer of that size. 352518912 bytes is roughly 336 MB, which obviously
> causes allocation issues. I suspect the value being read off the channel is
> not a real message size.
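> One plausible reading of that number (not confirmed here, but consistent with
> the stack trace): the broker's SSL listener answers the plaintext request with
> a TLS alert record, and the plaintext client treats the first four bytes of
> that record as a big-endian size prefix. A minimal sketch of that arithmetic:
> {code}
> import java.nio.ByteBuffer;
>
> public class SizePrefixDecode {
>     public static void main(final String[] args) {
>         // First four bytes of a TLS 1.2 alert record: content type 0x15 (alert),
>         // protocol version 0x0303, then the high byte of the record length (0x00).
>         final byte[] tlsAlertHeader = {0x15, 0x03, 0x03, 0x00};
>         // A plaintext client reads these bytes as a 4-byte message-size prefix.
>         final int misreadSize = ByteBuffer.wrap(tlsAlertHeader).getInt();
>         System.out.println(misreadSize); // 352518912, i.e. roughly 336 MB to allocate
>     }
> }
> {code}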
> Of course this exception is triggered by a user configuration error, but given
> that it ends up in an OOM that is hard to relate to its cause and leaves the
> JVM in a bad state, is there a way the Kafka Java library can handle this
> better?
> {quote}



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
