[ https://issues.apache.org/jira/browse/KAFKA-17025?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Greg Harris updated KAFKA-17025:
--------------------------------
    Fix Version/s:     (was: 3.6.3)

> KafkaThread and KafkaProducer expose method to set setUncaughtExceptionHandler
> ------------------------------------------------------------------------------
>
>                 Key: KAFKA-17025
>                 URL: https://issues.apache.org/jira/browse/KAFKA-17025
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients
>    Affects Versions: 3.6.2
>            Reporter: Hongshun Wang
>            Priority: Major
>
> When I use KafkaProducer and an OOM occurs, the KafkaProducer only logs it
> and does nothing else:
>
> {code:java}
> ERROR org.apache.kafka.common.utils.KafkaThread Uncaught exception in thread 'kafka-producer-network-thread | producer-1': java.lang.OutOfMemoryError: Direct buffer memory
> .....
>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549)
>     at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:324)
>     at org.apache.kafka.clients.producer.internals.Sender.run
>     at java.lang.Thread.run
> {code}
>
>
> I tried to find out what happens:
> 1. It seems that OutOfMemoryError, being an Error, is not caught when
> org.apache.kafka.clients.producer.internals.Sender#run tries to catch an
> Exception:
> {code:java}
> @Override
> public void run() {
>     log.debug("Starting Kafka producer I/O thread.");
>     // main loop, runs until close is called
>     while (running) {
>         try {
>             runOnce();
>         } catch (Exception e) {
>             log.error("Uncaught error in kafka producer I/O thread: ", e);
>         }
>     }
>     log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");
>     // okay we stopped accepting requests but there may still be
>     // requests in the transaction manager, accumulator or waiting for acknowledgment,
>     // wait until these are completed.
>     while (!forceClose && ((this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0) || hasPendingTransactionalRequests())) {
>         try {
>             runOnce();
>         } catch (Exception e) {
>             log.error("Uncaught error in kafka producer I/O thread: ", e);
>         }
>     }
>     // Abort the transaction if any commit or abort didn't go through the transaction manager's queue
>     while (!forceClose && transactionManager != null && transactionManager.hasOngoingTransaction()) {
>         if (!transactionManager.isCompleting()) {
>             log.info("Aborting incomplete transaction due to shutdown");
>             transactionManager.beginAbort();
>         }
>         try {
>             runOnce();
>         } catch (Exception e) {
>             log.error("Uncaught error in kafka producer I/O thread: ", e);
>         }
>     }
>     if (forceClose) {
>         // We need to fail all the incomplete transactional requests and batches and wake up the threads waiting on
>         // the futures.
>         if (transactionManager != null) {
>             log.debug("Aborting incomplete transactional requests due to forced shutdown");
>             transactionManager.close();
>         }
>         log.debug("Aborting incomplete batches due to forced shutdown");
>         this.accumulator.abortIncompleteBatches();
>     }
>     try {
>         this.client.close();
>     } catch (Exception e) {
>         log.error("Failed to close network client", e);
>     }
>     log.debug("Shutdown of Kafka producer I/O thread has completed.");
> }
> {code}
>
> 2. Then KafkaThread catches the uncaught exception and just logs it:
> {code:java}
> public KafkaThread(final String name, Runnable runnable, boolean daemon) {
>     super(runnable, name);
>     configureThread(name, daemon);
> }
>
> private void configureThread(final String name, boolean daemon) {
>     setDaemon(daemon);
>     setUncaughtExceptionHandler((t, e) -> log.error("Uncaught exception in thread '{}':", name, e));
> }
> {code}
>
> To be honest, I don't understand why KafkaThread does nothing but log it
> when an uncaught exception occurs.
> Why not expose a method to set the UncaughtExceptionHandler in KafkaThread
> or KafkaProducer, so that the user can decide what to do with an uncaught
> exception, whether to rethrow it or just ignore it?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
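
For illustration only, here is a minimal sketch of the behaviour described in points 1 and 2 of the issue. It uses plain java.lang.Thread and no Kafka classes. The class name UncaughtHandlerSketch, the method runAndCapture, the thread name, and the simulated OutOfMemoryError are all hypothetical and not part of Kafka. The sketch shows that a catch (Exception e) block does not intercept an Error, so the Error reaches whatever UncaughtExceptionHandler is installed on the thread. A user-supplied handler could therefore react to it instead of only logging.

```java
import java.util.concurrent.atomic.AtomicReference;

public class UncaughtHandlerSketch {

    /**
     * Starts a thread whose body throws an Error inside try/catch (Exception),
     * and returns the Throwable seen by the thread's uncaught exception handler.
     */
    static Throwable runAndCapture() {
        AtomicReference<Throwable> seen = new AtomicReference<>();

        Thread t = new Thread(() -> {
            try {
                // Simulated failure; a real OOM would come from the JVM.
                throw new OutOfMemoryError("simulated: Direct buffer memory");
            } catch (Exception e) {
                // Not reached: OutOfMemoryError extends Error, not Exception.
                seen.set(new IllegalStateException("unexpectedly caught", e));
            }
        }, "sketch-io-thread");

        // Hypothetical user-defined policy: record the failure. A real
        // application might instead alert, close resources, or halt.
        t.setUncaughtExceptionHandler((thread, e) -> seen.set(e));

        t.start();
        try {
            t.join(); // the handler runs on the dying thread before join returns
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", ie);
        }
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println("handler received: " + runAndCapture());
    }
}
```

Whether Kafka should expose such a hook on KafkaThread or KafkaProducer is the open question of this issue; the sketch only demonstrates the underlying JVM mechanism.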