[ https://issues.apache.org/jira/browse/ATLAS-665?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15240494#comment-15240494 ]

jlaky commented on ATLAS-665:
-----------------------------

We are running a Kafka 0.9.0.1 broker and using the new consumer API described at
http://kafka.apache.org/documentation.html#newconsumerapi . Normally it works
well, but we hit an exception after inserting 30 million messages with the
producer: when we stopped the producer and started the consumer, the first poll
failed with the error below.

Exception in thread "Thread-9" java.lang.OutOfMemoryError: Java heap space
    at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
    at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:93)
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:153)
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:134)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:286)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:256)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:134)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorKnown(AbstractCoordinator.java:184)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:886)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)

We debugged into the consumer source code and found that receiveSize comes back
as 1213486160; ByteBuffer.allocate then tries to allocate roughly 1.1 GB of
heap, which triggers the OutOfMemoryError. Can anyone explain how receiveSize
is computed? And is it possible to limit receiveSize by configuration?
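As a side note, 1213486160 is exactly the four ASCII bytes "HTTP" read as a
big-endian int. The consumer reads the first four bytes off the socket and
trusts them as a length prefix, so if the connection receives a non-Kafka
response (for example, bootstrap.servers pointing at an HTTP endpoint, or a
plaintext client talking to the wrong listener), the prefix decodes to exactly
this value. A minimal sketch of the decoding, which may be worth checking
against your setup:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class ReceiveSizeDemo {
    public static void main(String[] args) {
        // The consumer interprets the first four bytes on the wire as a
        // big-endian int length prefix (ByteBuffer's default byte order).
        byte[] firstFour = "HTTP".getBytes(StandardCharsets.US_ASCII);
        int receiveSize = ByteBuffer.wrap(firstFour).getInt();
        System.out.println(receiveSize); // prints 1213486160
    }
}
```

If the value matches, the problem is likely the endpoint the consumer connects
to rather than the 30 million messages themselves.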

public long readFromReadableChannel(ReadableByteChannel channel) throws IOException {
    int read = 0;
    if (size.hasRemaining()) {
        int bytesRead = channel.read(size);
        if (bytesRead < 0)
            throw new EOFException();
        read += bytesRead;
        if (!size.hasRemaining()) {
            size.rewind();
            int receiveSize = size.getInt();
            if (receiveSize < 0)
                throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + ")");
            if (maxSize != UNLIMITED && receiveSize > maxSize)
                throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + " larger than " + maxSize + ")");

            this.buffer = ByteBuffer.allocate(receiveSize);
        }
    }
    if (buffer != null) {
        int bytesRead = channel.read(buffer);
        if (bytesRead < 0)
            throw new EOFException();
        read += bytesRead;
    }
    return read;
}
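Note that the snippet above only rejects the size when maxSize is set, and the
consumer's receives appear to be created with UNLIMITED, so the 4-byte prefix
is allocated as-is. Purely as an illustration (MAX_RECEIVE_BYTES and
allocateChecked are hypothetical names, not Kafka configuration), a client-side
cap on the allocation could look like this:

```java
import java.nio.ByteBuffer;

public class BoundedReceive {
    // Illustrative cap; not an actual Kafka setting. Chosen well below the
    // ~1.1 GB allocation seen in the stack trace.
    static final int MAX_RECEIVE_BYTES = 100 * 1024 * 1024;

    static ByteBuffer allocateChecked(int receiveSize) {
        // Reject sizes that cannot be a legitimate Kafka response before
        // committing heap to them.
        if (receiveSize < 0 || receiveSize > MAX_RECEIVE_BYTES) {
            throw new IllegalStateException("Invalid receive size " + receiveSize
                    + "; the peer may not be speaking the Kafka protocol");
        }
        return ByteBuffer.allocate(receiveSize);
    }

    public static void main(String[] args) {
        System.out.println(allocateChecked(1024).capacity());
        try {
            allocateChecked(1213486160); // the value from the stack trace
        } catch (IllegalStateException e) {
            System.out.println("rejected");
        }
    }
}
```

With such a bound, a bogus length prefix fails fast with a protocol error
instead of an OutOfMemoryError deep inside poll().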


> Kafka Consumer API 0.9.0.1 Out Of Memory
> ----------------------------------------
>
>                 Key: ATLAS-665
>                 URL: https://issues.apache.org/jira/browse/ATLAS-665
>             Project: Atlas
>          Issue Type: Bug
>            Reporter: jlaky
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
