[jira] [Commented] (KAFKA-13623) Memory leak when multiple poll

2022-02-15 Thread Emanuel Velzi (Jira)


Emanuel Velzi commented on KAFKA-13623:

Hi, thanks for your reply.
 * _Since you're not calling poll in the second example, it's expected that 
direct memory wouldn't be allocated._

Yes, I was just trying to prove that closing and creating a new consumer every 
time was not the problem here, and it's not :)
 * _It may be the case that a full GC is needed for the direct memory to get 
cleaned up._

Can the GC clean up direct memory? I thought the GC had nothing to do with 
non-heap memory.
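
On the question above: direct buffers live outside the Java heap, but in HotSpot the native memory behind a buffer created with ByteBuffer.allocateDirect is released only after the owning buffer object becomes unreachable and is collected, which is why a GC can matter here. The following is a minimal sketch for watching the "direct" buffer pool from inside the process with the standard BufferPoolMXBean. The class name DirectMemoryProbe and the 8 MiB size are illustrative assumptions, not something taken from the report.

```java
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;

public class DirectMemoryProbe {

    /** Bytes currently held by the JVM's "direct" buffer pool, or -1 if that pool is not exposed. */
    public static long directMemoryUsed() {
        for (BufferPoolMXBean pool : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            if ("direct".equals(pool.getName())) {
                return pool.getMemoryUsed();
            }
        }
        return -1L;
    }

    public static void main(String[] args) throws InterruptedException {
        long before = directMemoryUsed();

        // Allocate off-heap memory; the pool should grow by at least the buffer capacity.
        ByteBuffer buffer = ByteBuffer.allocateDirect(8 * 1024 * 1024);
        long afterAllocate = directMemoryUsed();
        System.out.println("allocated " + buffer.capacity()
                + " bytes, pool grew by " + (afterAllocate - before));

        // Drop the only reference and hint a GC. System.gc() is only a request,
        // so the pool may or may not have shrunk by the time we look again.
        buffer = null;
        System.gc();
        Thread.sleep(500);
        System.out.println("after GC hint, pool is "
                + (directMemoryUsed() - before) + " bytes above baseline");
    }
}
```

Logging directMemoryUsed() periodically from the consumer app would give the same kind of curve as an external non-heap graph, but limited to NIO direct buffers.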


*Some extra details*

I'm using +Ubuntu 18.04.5 LTS+

This is the output of "free -m" before I run my app:


These are my +JVM args+:

This is the config for the consumer:
[pool-2-thread-45] INFO  o.a.k.c.consumer.ConsumerConfig - ConsumerConfig 
    allow.auto.create.topics = true
    auto.commit.interval.ms = 5000
    auto.offset.reset = earliest
    bootstrap.servers = [my-bootstrap-servers]
    check.crcs = true
    client.dns.lookup = use_all_dns_ips
    client.id = consumer-test-random-700675988
    client.rack =
    connections.max.idle.ms = 54
    default.api.timeout.ms = 6
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = consumer-test-random-700675988
    group.instance.id = null
    heartbeat.interval.ms = 300
    interceptor.classes = []
    internal.leave.group.on.close = true
    internal.throw.on.fetch.stable.offset.unsupported = false
    isolation.level = read_uncommitted
    key.deserializer = class 
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 30
    max.poll.records = 500
    metadata.max.age.ms = 30
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 3
    partition.assignment.strategy = [class 
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 3
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 6
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    session.timeout.ms = 1
    socket.connection.setup.timeout.max.ms = 3
    socket.connection.setup.timeout.ms = 1
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.certificate.chain = null
    ssl.keystore.key = null
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.3
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.certificates = null
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class 


This is the New Relic graph about non-heap memory:


And the last log before the app is killed is something like:
Uncaught exception in kafka-coordinator-heartbeat-thread | topic1:
java.lang.OutOfMemoryError: Direct buffer memory
    at java.nio.Bits.reserveMemory(Bits.java:693)
    at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
    at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
    at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:241)
    at sun.nio.ch.IOUtil.read(IOUtil.java:195)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
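
The frames above show the allocation coming from sun.nio.ch.Util.getTemporaryDirectBuffer, which the JDK uses when a channel reads into a heap ByteBuffer: the bytes are first read into a temporary direct buffer that is then cached per thread. Below is a minimal sketch of that behaviour, using a FileChannel on a temporary file instead of a Kafka socket. The class name TempDirectBufferDemo, the method names, and the buffer size are illustrative assumptions, and whether this cache explains the growth reported here is not established in this thread.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class TempDirectBufferDemo {

    /** Bytes currently held by the JVM's "direct" buffer pool, or -1 if that pool is not exposed. */
    public static long directMemoryUsed() {
        for (BufferPoolMXBean pool : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            if ("direct".equals(pool.getName())) {
                return pool.getMemoryUsed();
            }
        }
        return -1L;
    }

    /**
     * Reads a tiny file into a HEAP buffer of the given size and returns how much
     * the direct pool grew. The caller never allocates a direct buffer itself.
     */
    public static long directGrowthAfterHeapRead(int heapBufferSize) {
        try {
            Path file = Files.createTempFile("temp-direct-buffer-demo", ".bin");
            try {
                Files.write(file, new byte[] {1, 2, 3, 4});
                long before = directMemoryUsed();
                try (FileChannel channel = FileChannel.open(file, StandardOpenOption.READ)) {
                    ByteBuffer heapBuffer = ByteBuffer.allocate(heapBufferSize); // heap, not direct
                    // The JDK sizes its temporary direct buffer by heapBuffer.remaining(),
                    // not by the number of bytes actually available in the file.
                    channel.read(heapBuffer);
                }
                return directMemoryUsed() - before;
            } finally {
                Files.deleteIfExists(file);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        long growth = directGrowthAfterHeapRead(4 * 1024 * 1024);
        System.out.println("direct pool grew by " + growth + " bytes after a heap-buffer read");
    }
}
```

Because the cache is per thread, an application with many threads doing channel reads can hold one such set of buffers per thread. The JDK also has a jdk.nio.maxCachedBufferSize system property that limits which buffers are kept in this cache; its effect on this particular issue is untested here.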

[jira] [Commented] (KAFKA-13623) Memory leak when multiple poll

2022-01-30 Thread David Mao (Jira)


David Mao commented on KAFKA-13623:

Can you add


to the consumer app java args and upload the heap dump?

> Memory leak when multiple poll
> --
> Key: KAFKA-13623
> URL: https://issues.apache.org/jira/browse/KAFKA-13623
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Affects Versions: 2.4.1, 2.8.1
> Reporter: Emanuel Velzi
> Priority: Major
> Hi, I'm experiencing a kind of memory leak with this simple consumer.
> Some info before the code:
>     - kafka-clients.version: I tried with 2.4.1 and 2.8.1
> I only set these properties:
>     - bootstrap.servers: my-servers
>     - group.id: my-group-id
>     - auto.offset.reset: earliest
>     - enable.auto.commit: false
>     - heartbeat.interval.ms: 300
> My topic has NUM_PARTITIONS=48 partitions:
> {code:java}
> public class Test {
>     /* ... */
>     public void start() {
>         for (int i = 0; i < NUM_PARTITIONS; i++) {
>             startOne();
>         }
>     }
>     public void startOne() {
>         LOGGER.info("startOne");
>         this.pool.submit(this::startConsumer);
>     }
>     public void startConsumer() {
>         var consumer = new KafkaConsumer<>(this.kafkaConfiguration,
>                 this.stringDeserializer, this.stringDeserializer);
>         try {
>             consumer.subscribe(Collections.singletonList(this.topic));
>             consumer.poll(Duration.ofSeconds(30));
>             throw new RuntimeException("Some kind of error");
>         } catch (Exception e) {
>             LOGGER.error("Error!");
>         } finally {
>             consumer.close();
>         }
>         scheduleNewConsumer();
>     }
>     private void scheduleNewConsumer() {
>         scheduledExecutorService.schedule(() -> startOne(),
>                 Duration.ofSeconds(2).toMillis(), TimeUnit.MILLISECONDS);
>     }
> }
> {code}
> In summary, when I have some error processing a record, I close the 
> consumer and retry, starting a new one. 
> At that point the direct memory used by the Java process starts to grow 
> indefinitely, until the process is killed.
> I tested some other strategies. For example:
>  - not closing the consumer, and reusing it with a seek(..)
>  - not closing the consumer, and reusing it by calling consumer.unsubscribe(); 
> and consumer.subscribe(..);
> In both cases the memory leak was slower, but it happened anyway.
> Also I tried this:
> {code:java}
> public void startConsumer(Consumer consumer) {
>         /* always using the same consumer */
>         try {
>             consumer.subscribe(Collections.singletonList(this.topic));
>             // NO POLL HERE: consumer.poll(Duration.ofSeconds(30));
>             throw new RuntimeException("Some kind of error");
>         } catch (Exception e) {
>             LOGGER.error("Error!");
>         } finally {
>             consumer.unsubscribe();
>             consumer.subscribe(Collections.singletonList(this.topic));
>         }
>         scheduleNewConsumer();
>     }
> {code}
> I mean, I'm subscribing and unsubscribing the consumer multiple times, 
> without polling anything. In those cases I don't experience the memory leak. 
> So I imagine that the problem is the poll itself.
> Can someone help me with this, please?
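
For anyone trying to reproduce this, the five consumer settings listed in the description could be assembled roughly as follows. This is a sketch only: the class name ConsumerSettings and the method reportedSettings are hypothetical, the server and group values are the placeholders from the report, and every other setting is assumed to stay at the client default.

```java
import java.util.Properties;

public class ConsumerSettings {

    /** Builds the explicitly set consumer properties named in the issue description. */
    public static Properties reportedSettings() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "my-servers");   // placeholder from the report
        props.setProperty("group.id", "my-group-id");           // placeholder from the report
        props.setProperty("auto.offset.reset", "earliest");
        props.setProperty("enable.auto.commit", "false");
        props.setProperty("heartbeat.interval.ms", "300");
        return props;
    }

    public static void main(String[] args) {
        // Print the settings so they can be compared with the ConsumerConfig log output.
        reportedSettings().forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

A Properties object like this can be passed to the KafkaConsumer constructor together with the two deserializers, as the kafkaConfiguration field is in the code above.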
