Hongshun Wang created KAFKA-17025:
-------------------------------------

Summary: KafkaThread and KafkaProducer expose method to set setUncaughtExceptionHandler
Key: KAFKA-17025
URL: https://issues.apache.org/jira/browse/KAFKA-17025
Project: Kafka
Issue Type: Improvement
Components: clients
Affects Versions: 3.6.2
Reporter: Hongshun Wang
Fix For: 3.6.3
When I use KafkaProducer and an OutOfMemoryError occurs, the producer only logs it and does nothing else:

```
ERROR org.apache.kafka.common.utils.KafkaThread Uncaught exception in thread 'kafka-producer-network-thread | producer-1': java.lang.OutOfMemoryError: Direct buffer memory
```

I tried to find out what happens:

1. `OutOfMemoryError` is an `Error`, not an `Exception`, so it is not caught by the `catch (Exception e)` blocks in `org.apache.kafka.clients.producer.internals.Sender#run`:

```java
@Override
public void run() {
    log.debug("Starting Kafka producer I/O thread.");

    // main loop, runs until close is called
    while (running) {
        try {
            runOnce();
        } catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }

    log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");

    // okay we stopped accepting requests but there may still be
    // requests in the transaction manager, accumulator or waiting for acknowledgment,
    // wait until these are completed.
    while (!forceClose && ((this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0) || hasPendingTransactionalRequests())) {
        try {
            runOnce();
        } catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }

    // Abort the transaction if any commit or abort didn't go through the transaction manager's queue
    while (!forceClose && transactionManager != null && transactionManager.hasOngoingTransaction()) {
        if (!transactionManager.isCompleting()) {
            log.info("Aborting incomplete transaction due to shutdown");
            transactionManager.beginAbort();
        }
        try {
            runOnce();
        } catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }

    if (forceClose) {
        // We need to fail all the incomplete transactional requests and batches and wake up the threads waiting on
        // the futures.
        if (transactionManager != null) {
            log.debug("Aborting incomplete transactional requests due to forced shutdown");
            transactionManager.close();
        }
        log.debug("Aborting incomplete batches due to forced shutdown");
        this.accumulator.abortIncompleteBatches();
    }
    try {
        this.client.close();
    } catch (Exception e) {
        log.error("Failed to close network client", e);
    }

    log.debug("Shutdown of Kafka producer I/O thread has completed.");
}
```

2. The error therefore propagates out of `run()` and terminates the producer I/O thread, and the uncaught exception handler installed by `KafkaThread` just logs it:

```java
public KafkaThread(final String name, Runnable runnable, boolean daemon) {
    super(runnable, name);
    configureThread(name, daemon);
}

private void configureThread(final String name, boolean daemon) {
    setDaemon(daemon);
    setUncaughtExceptionHandler((t, e) -> log.error("Uncaught exception in thread '{}':", name, e));
}
```

To be honest, I don't understand why `KafkaThread` does nothing but log when an uncaught exception occurs. Why not expose a method in `KafkaThread` or `KafkaProducer` to set the `UncaughtExceptionHandler`, so that users can decide what to do with an uncaught exception, whether that is throwing it or just ignoring it?

--
This message was sent by Atlassian Jira (v8.20.10#820010)
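The mechanism in steps 1 and 2 can be reproduced with plain `java.lang.Thread`, without any Kafka classes. This is a minimal sketch: `ErrorEscapesCatchDemo` and its `Outcome` type are illustrative names, not Kafka code, and the `OutOfMemoryError` is thrown by hand to simulate a failed direct-buffer allocation. The loop catches only `Exception`, as `Sender#run` does; the error escapes, the thread terminates, and only the per-thread handler (standing in for the one `KafkaThread` installs) sees it. The sketch also installs a JVM-wide default handler to show that `Thread.setDefaultUncaughtExceptionHandler` is not consulted for a thread that has its own handler, so it is not a workaround for threads configured like `KafkaThread`.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

public class ErrorEscapesCatchDemo {

    /** What happened to the simulated I/O thread (illustrative type, not Kafka code). */
    public static final class Outcome {
        public final int iterations;                // loop iterations that ran
        public final Throwable uncaught;            // what the per-thread handler received
        public final boolean defaultHandlerCalled;  // whether the JVM-wide default handler ran
        public final boolean threadAlive;           // whether the thread survived

        Outcome(int iterations, Throwable uncaught, boolean defaultHandlerCalled, boolean threadAlive) {
            this.iterations = iterations;
            this.uncaught = uncaught;
            this.defaultHandlerCalled = defaultHandlerCalled;
            this.threadAlive = threadAlive;
        }
    }

    public static Outcome run() throws InterruptedException {
        AtomicInteger iterations = new AtomicInteger();
        AtomicReference<Throwable> seenByThreadHandler = new AtomicReference<>();
        AtomicBoolean defaultHandlerCalled = new AtomicBoolean(false);

        Thread.UncaughtExceptionHandler previousDefault = Thread.getDefaultUncaughtExceptionHandler();
        Thread.setDefaultUncaughtExceptionHandler((t, e) -> defaultHandlerCalled.set(true));
        try {
            // Loop shaped like the catch blocks in Sender#run: only Exception is caught.
            Thread worker = new Thread(() -> {
                while (true) {
                    try {
                        if (iterations.incrementAndGet() == 1) {
                            throw new IllegalStateException("ordinary failure"); // caught, loop continues
                        }
                        // Simulated OOM: an Error, so the catch below does not match it.
                        throw new OutOfMemoryError("Direct buffer memory");
                    } catch (Exception e) {
                        System.out.println("caught and logged: " + e);
                    }
                }
            }, "simulated-network-thread");

            // Per-thread handler, like KafkaThread#configureThread: it only logs (and records for the demo).
            worker.setUncaughtExceptionHandler((t, e) -> {
                seenByThreadHandler.set(e);
                System.out.println("Uncaught exception in thread '" + t.getName() + "': " + e);
            });

            worker.start();
            worker.join(); // the handler runs on the dying thread before join() returns
            return new Outcome(iterations.get(), seenByThreadHandler.get(),
                    defaultHandlerCalled.get(), worker.isAlive());
        } finally {
            Thread.setDefaultUncaughtExceptionHandler(previousDefault); // restore global state
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Outcome o = run();
        System.out.println("iterations=" + o.iterations + ", uncaught=" + o.uncaught
                + ", defaultHandlerCalled=" + o.defaultHandlerCalled + ", threadAlive=" + o.threadAlive);
    }
}
```

Running `main` should report `iterations=2` (the `IllegalStateException` is caught and the loop continues, then the `OutOfMemoryError` escapes), `defaultHandlerCalled=false` and `threadAlive=false`: the loop is gone and nothing outside the thread is told.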
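One possible shape for the requested change, sketched under assumptions: `ConfigurableKafkaThread`, `LOG_ONLY` and `runWithUserHandler` are hypothetical names, not existing Kafka API, and how such a handler would be passed through `KafkaProducer` (for example a constructor argument or a config entry) is left open. The default keeps today's log-only behaviour, while an extra constructor lets the caller install its own `Thread.UncaughtExceptionHandler`.

```java
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;

public class ConfigurableThreadSketch {

    /** Hypothetical KafkaThread variant (not Kafka API) with a pluggable uncaught exception handler. */
    public static class ConfigurableKafkaThread extends Thread {
        // Default mirrors KafkaThread's current behaviour: log the throwable and do nothing else.
        public static final Thread.UncaughtExceptionHandler LOG_ONLY =
                (t, e) -> System.err.println("Uncaught exception in thread '" + t.getName() + "': " + e);

        public ConfigurableKafkaThread(String name, Runnable runnable, boolean daemon) {
            this(name, runnable, daemon, LOG_ONLY);
        }

        public ConfigurableKafkaThread(String name, Runnable runnable, boolean daemon,
                                       Thread.UncaughtExceptionHandler handler) {
            super(runnable, name);
            setDaemon(daemon);
            setUncaughtExceptionHandler(Objects.requireNonNull(handler, "handler"));
        }
    }

    /** Starts a thread that dies with a simulated OutOfMemoryError; returns what the user's handler saw. */
    public static Throwable runWithUserHandler() throws InterruptedException {
        AtomicReference<Throwable> fatal = new AtomicReference<>();
        // User-chosen policy: record the error so the application can react to it
        // (e.g. close the producer, fail a health check, or shut down).
        Thread.UncaughtExceptionHandler userHandler = (t, e) -> fatal.set(e);

        Thread thread = new ConfigurableKafkaThread(
                "kafka-producer-network-thread | producer-1",
                () -> { throw new OutOfMemoryError("Direct buffer memory"); }, // simulated failure
                true,
                userHandler);
        thread.start();
        thread.join();
        return fatal.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("user handler saw: " + runWithUserHandler());
    }
}
```

Keeping the log-only handler as the default would preserve current behaviour for existing users; only callers that opt in would get a different policy. Note that the handler runs on the terminating thread and any exception it throws is ignored by the JVM, so it cannot rethrow into application code; it can only signal the application, for example by recording the error as above.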