[jira] [Comment Edited] (KAFKA-13623) Memory leak when multiple poll

2022-02-15 Thread Emanuel Velzi (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17492748#comment-17492748
 ] 

Emanuel Velzi edited comment on KAFKA-13623 at 2/15/22, 5:30 PM:
-

Hi, thanks for your reply.
 * _Since you're not calling poll in the second example, it's expected that 
direct memory wouldn't be allocated._

Yes, I was just trying to prove that closing and creating a new consumer each 
time wasn't the problem here, and it isn't :)
 * _It may be the case that a full GC is needed for the direct memory to get 
cleaned up._

Can the GC clean up direct memory? I thought the GC had nothing to do with 
non-heap memory.
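If it helps frame the question: a minimal sketch (class name is mine) of the behavior that would have to hold for a full GC to matter here, i.e. the DirectByteBuffer object lives on the heap and its native memory is freed by a Cleaner once that heap object is collected:
{code:java}
import java.nio.ByteBuffer;

public class DirectGcSketch {
    public static void main(String[] args) throws InterruptedException {
        // The 256 MB backing storage is native (off-heap), but the
        // DirectByteBuffer object that owns it is an ordinary heap object.
        ByteBuffer buf = ByteBuffer.allocateDirect(256 * 1024 * 1024);

        buf = null;          // drop the only reference to the heap object
        System.gc();         // a full GC lets the buffer's Cleaner run...
        Thread.sleep(1_000); // ...and the Cleaner frees the native memory
    }
}{code}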

 

*Some extra details*

I'm using +Ubuntu 18.04.5 LTS+

This is the output of "free -m" before I run my app:

!image-2022-01-31-09-06-27-762.png|width=504,height=49!

And this is after my app is running:

!image-2022-02-15-14-29-59-371.png|width=499,height=48!

These are my {+}JVM args{+}:
{code:java}
   -javaagent:newrelic/newrelic-agent-7.0.1.jar
   -Xmx2000m
   -Xms2000m
   -XX:+UseG1GC
   -XX:+AlwaysPreTouch{code}
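Note that I'm not setting -XX:MaxDirectMemorySize, so (as far as I know) the direct-memory limit defaults to roughly the -Xmx value, ~2000m here. A variant I could run to fail faster and capture a dump on the OOM (the 512m cap is just an example value):
{code:java}
   -Xmx2000m
   -Xms2000m
   -XX:+UseG1GC
   -XX:+AlwaysPreTouch
   -XX:MaxDirectMemorySize=512m
   -XX:+HeapDumpOnOutOfMemoryError{code}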
 

This is the config for the consumer:
{code:java}
[pool-2-thread-45] INFO  o.a.k.c.consumer.ConsumerConfig - ConsumerConfig 
values:
    allow.auto.create.topics = true
    auto.commit.interval.ms = 5000
    auto.offset.reset = earliest
    bootstrap.servers = [my-bootstrap-servers]
    check.crcs = true
    client.dns.lookup = use_all_dns_ips
    client.id = consumer-test-random-700675988
    client.rack =
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = consumer-test-random-700675988
    group.instance.id = null
    heartbeat.interval.ms = 300
    interceptor.classes = []
    internal.leave.group.on.close = true
    internal.throw.on.fetch.stable.offset.unsupported = false
    isolation.level = read_uncommitted
    key.deserializer = class 
org.apache.kafka.common.serialization.StringDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class 
org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    socket.connection.setup.timeout.max.ms = 30000
    socket.connection.setup.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.certificate.chain = null
    ssl.keystore.key = null
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.3
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.certificates = null
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class 
org.apache.kafka.common.serialization.StringDeserializer

{code}
 

This is the New Relic graph of non-heap memory:

!image-2022-02-15-14-21-38-467.png|width=728,height=252!
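To confirm that it's really the "direct" buffer pool growing, and not some other non-heap area, here is a small probe I can run inside the same JVM (class name is mine; it uses only the standard java.lang.management API):
{code:java}
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;

public class BufferPoolProbe {
    public static void main(String[] args) {
        // The JVM exposes its "direct" and "mapped" buffer pools via JMX.
        for (BufferPoolMXBean pool :
                ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            System.out.printf("%s: count=%d, used=%d bytes, capacity=%d bytes%n",
                    pool.getName(), pool.getCount(),
                    pool.getMemoryUsed(), pool.getTotalCapacity());
        }
    }
}{code}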

And the last log before the app is killed looks something like this:
{code:java}
Uncaught exception in kafka-coordinator-heartbeat-thread | topic1:
java.lang.OutOfMemoryError: Direct buffer memory
    at java.nio.Bits.reserveMemory(Bits.java:693)
    at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
    at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
    at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:241)
    at sun.nio.ch.IOUtil.read(IOUtil.java:195)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
    at org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.
{code}

[jira] [Comment Edited] (KAFKA-13623) Memory leak when multiple poll

2022-01-30 Thread David Mao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17484366#comment-17484366
 ] 

David Mao edited comment on KAFKA-13623 at 1/30/22, 2:40 PM:
-

Direct memory is allocated when Java applications perform I/O with heap byte 
buffers: NIO copies those reads and writes through temporary direct buffers, 
which is the getTemporaryDirectBuffer frame in your stack trace. Since you're 
not calling poll in the second example, it's expected that direct memory 
wouldn't be allocated.

It's a little weird that the direct memory allocation keeps growing, since the 
direct memory is tied to the lifetime of a thread. It may be the case that a 
full GC is needed for the direct memory to get cleaned up.
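If the per-thread cache of temporary direct buffers turns out to be the problem, one knob worth knowing about is the jdk.nio.maxCachedBufferSize system property (available since 8u102, if I remember right): I/O buffers above that size are allocated and freed per operation instead of being cached on the thread. For example, with an illustrative 256 KB bound:
{code:java}
   -Djdk.nio.maxCachedBufferSize=262144{code}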

What is killing the process here? Can you describe the application environment?

Likewise, what JVM args are you running?
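One more note on tooling: the leaked native memory itself won't show up in a heap dump, although the DirectByteBuffer objects that own it will. If you can restart the app, Native Memory Tracking measures it directly (at a small runtime cost). Start the JVM with:
{code:java}
   -XX:NativeMemoryTracking=summary{code}
and then sample at runtime with:
{code:java}
jcmd <pid> VM.native_memory summary{code}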


was (Author: david.mao):
Can you add

*-XX:+HeapDumpOnOutOfMemoryError*

to the consumer app java args and upload the heap dump?

> Memory leak when multiple poll
> --
>
> Key: KAFKA-13623
> URL: https://issues.apache.org/jira/browse/KAFKA-13623
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.4.1, 2.8.1
>Reporter: Emanuel Velzi
>Priority: Major
>
> Hi, I'm experiencing a kind of memory leak with this simple consumer.
> Some info before the code:
>     - kafka-clients.version: I tried with 2.4.1 and 2.8.1
> I only set these properties:
>     - bootstrap.servers: my-servers
>     - group.id: my-group-id
>     - auto.offset.reset: earliest
>     - enable.auto.commit: false
>     - heartbeat.interval.ms: 300
> My topic has NUM_PARTITIONS=48 partitions:
> {code:java}
> public class Test {
>     /* ... */
>     public void start() {
>         for (int i = 0; i < NUM_PARTITIONS; i++) {
>             startOne();
>         }
>     }
>     public void startOne() {
>         LOGGER.info("startOne");
>             this.pool.submit(this::startConsumer);
>     }
>     public void startConsumer() {
>         var consumer = new KafkaConsumer<>(this.kafkaConfiguration, 
> this.stringDeserializer, this.stringDeserializer);
>         try {
>             consumer.subscribe(Collections.singletonList(this.topic));
>             consumer.poll(Duration.ofSeconds(30));
>             throw new RuntimeException("Some kind of error");
>         } catch (Exception e) {
>             LOGGER.error("Error!");
>         } finally {
>             consumer.close();
>         }
>         scheduleNewConsumer();
>     }
>     private void scheduleNewConsumer() {
>         scheduledExecutorService.schedule(() -> startOne(), 
> Duration.ofSeconds(2).toMillis(), TimeUnit.MILLISECONDS);
>     }
> }
> {code}
>  
> In summary, when I hit an error processing a record, I close the consumer 
> and retry, starting a new one. 
> At that moment the direct memory used by the Java process starts to grow 
> indefinitely, until the process is killed.
> I tested some other strategies. For example:
>  - not closing the consumer, and reusing it with a seek(..)
>  - not closing the consumer, and reusing it by calling consumer.unsubscribe(); 
> and then consumer.subscribe(..);
> In both cases the memory leak was slower, but it happened anyway.
> I also tried this:
> {code:java}
> public void startConsumer(Consumer<String, String> consumer) {
>         /* always using the same consumer */
>         try {
>             consumer.subscribe(Collections.singletonList(this.topic));
>             // NO POLL HERE: consumer.poll(Duration.ofSeconds(30));
>             throw new RuntimeException("Some kind of error");
>         } catch (Exception e) {
>             LOGGER.error("Error!");
>         } finally {
>             consumer.unsubscribe();
>             consumer.subscribe(Collections.singletonList(this.topic));
>         }
>         scheduleNewConsumer();
>     }{code}
>  
> I mean, I'm subscribing and unsubscribing the consumer multiple times, 
> without polling anything. In those cases I don't experience the memory leak, 
> so I imagine that the problem is the poll itself.
> Can someone help me with this, please?



--
This message was sent by Atlassian Jira
(v8.20.1#820001)