[jira] [Commented] (KAFKA-13623) Memory leak when multiple poll
[ https://issues.apache.org/jira/browse/KAFKA-13623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17492748#comment-17492748 ] Emanuel Velzi commented on KAFKA-13623: ---

Hi, thanks for your reply.
* _Since you're not calling poll in the second example, it's expected that direct memory wouldn't be allocated._
Yes, I was just trying to prove that closing and creating a new consumer every time was not the problem here, and it isn't :)
* _It may be the case that a full GC is needed for the direct memory to get cleaned up._
Can the GC clean up direct memory? I thought the GC had nothing to do with non-heap memory (see the sketch at the end of this comment).

*Some extra details*

I'm using +Ubuntu 18.04.5 LTS+.

This is the output of "free -m" before I run my app:
!image-2022-01-31-09-06-27-762.png|width=504,height=49!

These are my {+}jvm args{+}:
{code:java}
-javaagent:newrelic/newrelic-agent-7.0.1.jar -Xmx2000m -Xms2000m -XX:+UseG1GC -XX:+AlwaysPreTouch{code}

This is the config for the consumer:
{code:java}
[pool-2-thread-45] INFO o.a.k.c.consumer.ConsumerConfig - ConsumerConfig values:
    allow.auto.create.topics = true
    auto.commit.interval.ms = 5000
    auto.offset.reset = earliest
    bootstrap.servers = [my-bootstrap-servers]
    check.crcs = true
    client.dns.lookup = use_all_dns_ips
    client.id = consumer-test-random-700675988
    client.rack =
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = consumer-test-random-700675988
    group.instance.id = null
    heartbeat.interval.ms = 300
    interceptor.classes = []
    internal.leave.group.on.close = true
    internal.throw.on.fetch.stable.offset.unsupported = false
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    socket.connection.setup.timeout.max.ms = 30000
    socket.connection.setup.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.certificate.chain = null
    ssl.keystore.key = null
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.3
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.certificates = null
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
{code}

This is the New Relic graph of non-heap memory:
!image-2022-02-15-14-21-38-467.png|width=728,height=252!

And the last log before the app is killed looks like this:
{code:java}
Uncaught exception in kafka-coordinator-heartbeat-thread | topic1:
java.lang.OutOfMemoryError: Direct buffer memory
    at java.nio.Bits.reserveMemory(Bits.java:693)
    at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
    at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
    at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:241)
    at sun.nio.ch.IOUtil.read(IOUtil.java:195)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
    at org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:110)
    at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:97)
    at
{code}
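On the GC question above: the memory behind a DirectByteBuffer is off-heap, but each buffer is tracked by a small on-heap object whose Cleaner frees the native memory when the GC collects it, so a full GC can reclaim direct memory. A minimal standalone sketch illustrating this (the buffer count and sizes are arbitrary, not values from this app):
{code:java}
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;

public class DirectMemoryGcSketch {
    public static void main(String[] args) throws Exception {
        // MXBean that reports the JVM's direct buffer pool usage.
        BufferPoolMXBean direct = ManagementFactory
                .getPlatformMXBeans(BufferPoolMXBean.class).stream()
                .filter(b -> "direct".equals(b.getName()))
                .findFirst().orElseThrow();

        // Allocate off-heap buffers and immediately drop the references.
        // The native memory is NOT released at this point.
        for (int i = 0; i < 50; i++) {
            ByteBuffer.allocateDirect(1024 * 1024);
        }
        System.out.printf("before GC: %,d bytes direct%n", direct.getMemoryUsed());

        // System.gc() is only a hint, but with default settings it usually
        // triggers a full GC, which collects the unreachable DirectByteBuffer
        // objects and runs their Cleaners, releasing the off-heap memory.
        System.gc();
        Thread.sleep(1000);
        System.out.printf("after GC:  %,d bytes direct%n", direct.getMemoryUsed());
    }
}
{code}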
[jira] [Commented] (KAFKA-13623) Memory leak when multiple poll
[ https://issues.apache.org/jira/browse/KAFKA-13623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17484366#comment-17484366 ] David Mao commented on KAFKA-13623: ---

Can you add *-XX:+HeapDumpOnOutOfMemoryError* to the consumer app java args and upload the heap dump?

> Memory leak when multiple poll
> --
>
> Key: KAFKA-13623
> URL: https://issues.apache.org/jira/browse/KAFKA-13623
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Affects Versions: 2.4.1, 2.8.1
> Reporter: Emanuel Velzi
> Priority: Major
>
> Hi, I'm experiencing a kind of memory leak with this simple consumer.
> Some info before the code:
> - kafka-clients.version: I tried with 2.4.1 and 2.8.1
> I only set these properties:
> - bootstrap.servers: my-servers
> - group.id: my-group-id
> - auto.offset.reset: earliest
> - enable.auto.commit: false
> - heartbeat.interval.ms: 300
> My topic has NUM_PARTITIONS=48 partitions:
> {code:java}
> public class Test {
>     /* ... */
>     public void start() {
>         for (int i = 0; i < NUM_PARTITIONS; i++) {
>             startOne();
>         }
>     }
>     public void startOne() {
>         LOGGER.info("startOne");
>         this.pool.submit(this::startConsumer);
>     }
>     public void startConsumer() {
>         var consumer = new KafkaConsumer<>(this.kafkaConfiguration,
>                 this.stringDeserializer, this.stringDeserializer);
>         try {
>             consumer.subscribe(Collections.singletonList(this.topic));
>             consumer.poll(Duration.ofSeconds(30));
>             throw new RuntimeException("Some kind of error");
>         } catch (Exception e) {
>             LOGGER.error("Error!");
>         } finally {
>             consumer.close();
>         }
>         scheduleNewConsumer();
>     }
>     private void scheduleNewConsumer() {
>         scheduledExecutorService.schedule(() -> startOne(),
>                 Duration.ofSeconds(2).toMillis(), TimeUnit.MILLISECONDS);
>     }
> }
> {code}
>
> In summary, when I have some error processing a record, I close the consumer and retry, starting a new one.
> At that moment the direct memory used by the Java process starts to grow indefinitely, until the process is killed.
> I tested some other strategies. For example:
> - not closing the consumer, and reusing it with a seek(..)
> - not closing the consumer, and reusing it by calling consumer.unsubscribe(); and consumer.subscribe(..);
> In both cases the memory leak was slower, but it happened anyway.
> I also tried this:
> {code:java}
> public void startConsumer(Consumer<String, String> consumer) {
>     /* always using the same consumer */
>     try {
>         consumer.subscribe(Collections.singletonList(this.topic));
>         // NO POLL HERE: consumer.poll(Duration.ofSeconds(30));
>         throw new RuntimeException("Some kind of error");
>     } catch (Exception e) {
>         LOGGER.error("Error!");
>     } finally {
>         consumer.unsubscribe();
>         consumer.subscribe(Collections.singletonList(this.topic));
>     }
>     scheduleNewConsumer();
> }
> {code}
>
> I mean, I'm subscribing and unsubscribing the consumer multiple times, without polling anything. In those cases I don't experience the memory leak. So I imagine that the problem is the poll itself.
> Can someone help me with this, please?

--
This message was sent by Atlassian Jira
(v8.20.1#820001)
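In addition to the heap-dump flag requested above, a minimal set of diagnostic JVM flags could help pin down where the direct memory goes (the dump path and the 256m cap are placeholder values, not taken from this ticket):
{code:java}
-XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=/tmp/consumer-oome.hprof
-XX:MaxDirectMemorySize=256m
-XX:NativeMemoryTracking=summary
{code}
With -XX:MaxDirectMemorySize set, the consumer fails fast with the same "Direct buffer memory" error once the cap is reached, instead of growing until the process is killed, and running "jcmd <pid> VM.native_memory summary" then breaks down the JVM's native memory reservations by category.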