[ https://issues.apache.org/jira/browse/KAFKA-13623?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Emanuel Velzi updated KAFKA-13623:
----------------------------------
    Description: 
Hi, I'm experiencing what looks like a memory leak with this simple consumer.

Some info before the code:
- kafka-clients.version: I tried 2.4.1 and 2.8.1

I only set these properties:
- bootstrap.servers: my-servers
- group.id: my-group-id
- auto.offset.reset: earliest
- enable.auto.commit: false
- heartbeat.interval.ms: 300

My topic has NUM_PARTITIONS=48 partitions:
{code:java}
public class Test {
    /* ... */
    public void start() {
        for (int i = 0; i < NUM_PARTITIONS; i++) {
            startOne();
        }
    }

    public void startOne() {
        LOGGER.info("startOne");
        this.pool.submit(this::startConsumer);
    }

    public void startConsumer() {
        var consumer = new KafkaConsumer<>(this.kafkaConfiguration,
                this.stringDeserializer, this.stringDeserializer);
        try {
            consumer.subscribe(Collections.singletonList(this.topic));
            consumer.poll(Duration.ofSeconds(30));
            throw new RuntimeException("Some kind of error");
        } catch (Exception e) {
            LOGGER.error("Error!");
        } finally {
            consumer.close();
        }
        scheduleNewConsumer();
    }

    private void scheduleNewConsumer() {
        scheduledExecutorService.schedule(() -> startOne(),
                Duration.ofSeconds(2).toMillis(), TimeUnit.MILLISECONDS);
    }
}
{code}

In summary, when there is an error while processing a record, I close the consumer and retry by starting a new one. At that point the direct memory used by the Java process starts to grow indefinitely, until the process is killed.
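To make the symptom easier to observe while the consumers are restarted, the JVM's "direct" buffer pool can be watched from inside the process. This is only a minimal sketch, not the exact code used in the test above; the class name DirectMemoryProbe is purely illustrative:
{code:java}
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;

public class DirectMemoryProbe {
    // Prints the JVM's "direct" buffer pool usage once per second, so the
    // indefinite growth described above shows up as a steadily rising number.
    public static void main(String[] args) throws InterruptedException {
        BufferPoolMXBean direct = ManagementFactory
                .getPlatformMXBeans(BufferPoolMXBean.class)
                .stream()
                .filter(pool -> "direct".equals(pool.getName()))
                .findFirst()
                .orElseThrow(() -> new IllegalStateException("no direct buffer pool"));
        while (true) {
            System.out.printf("direct buffers: count=%d, used=%d bytes%n",
                    direct.getCount(), direct.getMemoryUsed());
            Thread.sleep(1_000);
        }
    }
}
{code}
Running something like this alongside the consumer loop (or capping the pool with -XX:MaxDirectMemorySize) makes it easy to see whether closed consumers are releasing their buffers.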
I tested some other strategies. For example:
- not closing the consumer, and reusing it with a seek(..) (see the sketch after the description)
- not closing the consumer, and reusing it by calling consumer.unsubscribe(); and then consumer.subscribe(..);

In both cases the memory leak was slower, but it still happened.

I also tried this:
{code:java}
public void startConsumer(Consumer consumer) {
    /* always using the same consumer */
    try {
        consumer.subscribe(Collections.singletonList(this.topic));
        // NO POLL HERE: consumer.poll(Duration.ofSeconds(30));
        throw new RuntimeException("Some kind of error");
    } catch (Exception e) {
        LOGGER.error("Error!");
    } finally {
        consumer.unsubscribe();
        consumer.subscribe(Collections.singletonList(this.topic));
    }
    scheduleNewConsumer();
}
{code}

That is, I subscribe and unsubscribe the consumer many times without ever polling. In that case I don't experience the memory leak, so I suspect the problem is the poll itself.

Can someone help me with this, please?
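For reference, the "reuse it with a seek(..)" variant mentioned above looks roughly like this. It is only a sketch under the assumption that offsets are committed elsewhere; the helper ConsumerRewind.rewind is illustrative and not the exact code from the test:
{code:java}
import java.util.Collections;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public final class ConsumerRewind {
    // Instead of close() + new KafkaConsumer, rewind every assigned partition
    // to its last committed offset (or to the beginning when nothing has been
    // committed yet) and keep polling with the same consumer instance.
    static void rewind(Consumer<String, String> consumer) {
        for (TopicPartition tp : consumer.assignment()) {
            OffsetAndMetadata committed = consumer.committed(tp);
            if (committed != null) {
                consumer.seek(tp, committed.offset());
            } else {
                consumer.seekToBeginning(Collections.singletonList(tp));
            }
        }
    }
}
{code}
Even with this variant (no close(), no new consumer instance) the direct memory still grew, only more slowly, as described above.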
> Memory leak when multiple poll
> ------------------------------
>
>                 Key: KAFKA-13623
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13623
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 2.4.1, 2.8.1
>            Reporter: Emanuel Velzi
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.1#820001)