[
https://issues.apache.org/jira/browse/ATLAS-665?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15240494#comment-15240494
]
jlaky commented on ATLAS-665:
-----------------------------
We are running Kafka 0.9.0.1 broker and using the new Consumer API mentioned in
http://kafka.apache.org/documentation.html#newconsumerapi , normally it works
well. But we hit one exception that when we inserted 30 million messages by
producer. And then we stopped the producer and start the consumer to consume
the messages, and we got the error when the poll started.
Exception in thread "Thread-9" java.lang.OutOfMemoryError: Java heap space
at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
at
org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:93)
at
org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
at
org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:153)
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:134)
at org.apache.kafka.common.network.Selector.poll(Selector.java:286)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:256)
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:134)
at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorKnown(AbstractCoordinator.java:184)
at
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:886)
at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
We debugged into the consumer source code and found the receiveSize returns a
number of 1213486160, and then ByteBuffer try to allocate the heap which is
around 1.1G that hit the out of memory issue, so can anyone help to explain how
is the receiveSize computed? And is it possible to limit the receiveSize by
configuration?
public long readFromReadableChannel(ReadableByteChannel channel) throws
IOException {
int read = 0;
if (size.hasRemaining()) {
int bytesRead = channel.read(size);
if (bytesRead < 0)
throw new EOFException();
read += bytesRead;
if (!size.hasRemaining()) {
size.rewind();
int receiveSize = size.getInt();
if (receiveSize < 0)
throw new InvalidReceiveException("Invalid receive (size = " +
receiveSize + ")");
if (maxSize != UNLIMITED && receiveSize > maxSize)
throw new InvalidReceiveException("Invalid receive (size = " +
receiveSize + " larger than " + maxSize + ")");
this.buffer = ByteBuffer.allocate(receiveSize);
}
}
if (buffer != null) {
int bytesRead = channel.read(buffer);
if (bytesRead < 0)
throw new EOFException();
read += bytesRead;
}
return read;
}
> Kafka Consumer API 0.9.0.1 Out Of Memory
> ----------------------------------------
>
> Key: ATLAS-665
> URL: https://issues.apache.org/jira/browse/ATLAS-665
> Project: Atlas
> Issue Type: Bug
> Reporter: jlaky
>
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)