[ 
https://issues.apache.org/jira/browse/KAFKA-17025?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hongshun Wang updated KAFKA-17025:
----------------------------------
    Summary: KAFKA-17025: Producer throws uncaught exception in the io thread.  
(was: KafkaThread and KafkaProducer expose method to set 
setUncaughtExceptionHandler)

> KAFKA-17025: Producer throws uncaught exception in the io thread.
> -----------------------------------------------------------------
>
>                 Key: KAFKA-17025
>                 URL: https://issues.apache.org/jira/browse/KAFKA-17025
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients
>    Affects Versions: 3.6.2
>            Reporter: Hongshun Wang
>            Assignee: Hongshun Wang
>            Priority: Major
>
> When I use KafkaProducer and an OutOfMemoryError (OOM) occurs, the producer 
> only logs it and does nothing else:
>  
> {code:java}
> ERROR org.apache.kafka.common.utils.KafkaThread Uncaught exception in thread 
> 'kafka-producer-network-thread | producer-1': java.lang.OutOfMemoryError: 
> Direct buffer memory .....
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549) 
> at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:324) 
> at org.apache.kafka.clients.producer.internals.Sender.run 
> at java.lang.Thread.run
> {code}
>  
>  
> I tried to find out what happens:
> 1. OutOfMemoryError is an Error, not an Exception, so it is not caught by 
> the catch (Exception e) blocks in 
> org.apache.kafka.clients.producer.internals.Sender#run: 
> {code:java}
> @Override
> public void run() {
>     log.debug("Starting Kafka producer I/O thread.");
>     // main loop, runs until close is called
>     while (running) {
>         try {
>             runOnce();
>         } catch (Exception e) {
>             log.error("Uncaught error in kafka producer I/O thread: ", e);
>         }
>     }
>     log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");
>     // okay we stopped accepting requests but there may still be
>     // requests in the transaction manager, accumulator or waiting for acknowledgment,
>     // wait until these are completed.
>     while (!forceClose && ((this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0) || hasPendingTransactionalRequests())) {
>         try {
>             runOnce();
>         } catch (Exception e) {
>             log.error("Uncaught error in kafka producer I/O thread: ", e);
>         }
>     }
>     // Abort the transaction if any commit or abort didn't go through the transaction manager's queue
>     while (!forceClose && transactionManager != null && transactionManager.hasOngoingTransaction()) {
>         if (!transactionManager.isCompleting()) {
>             log.info("Aborting incomplete transaction due to shutdown");
>             transactionManager.beginAbort();
>         }
>         try {
>             runOnce();
>         } catch (Exception e) {
>             log.error("Uncaught error in kafka producer I/O thread: ", e);
>         }
>     }
>     if (forceClose) {
>         // We need to fail all the incomplete transactional requests and batches and wake up the threads waiting on
>         // the futures.
>         if (transactionManager != null) {
>             log.debug("Aborting incomplete transactional requests due to forced shutdown");
>             transactionManager.close();
>         }
>         log.debug("Aborting incomplete batches due to forced shutdown");
>         this.accumulator.abortIncompleteBatches();
>     }
>     try {
>         this.client.close();
>     } catch (Exception e) {
>         log.error("Failed to close network client", e);
>     }
>     log.debug("Shutdown of Kafka producer I/O thread has completed.");
> }
> {code}
>  
> 2. KafkaThread then handles the uncaught exception by just logging it:
> {code:java}
> public KafkaThread(final String name, Runnable runnable, boolean daemon) {
>     super(runnable, name);
>     configureThread(name, daemon);
> }
> private void configureThread(final String name, boolean daemon) {
>     setDaemon(daemon);
>     setUncaughtExceptionHandler((t, e) -> log.error("Uncaught exception in thread '{}':", name, e));
> }
> {code}
>  
> To be honest, I don't understand why KafkaThread does nothing but log when 
> an uncaught exception occurs. Why not expose a way to set the 
> UncaughtExceptionHandler in KafkaThread or KafkaProducer, so that the user 
> can decide what to do with an uncaught exception, whether to rethrow it or 
> just ignore it?
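
Point 1 of the description can be reproduced without Kafka. The following is a minimal sketch, not Kafka code: the class ErrorEscapesCatchDemo and its method runLoop are hypothetical names, and the loop only imitates the try/catch (Exception e) shape of Sender#run. It suggests that an Exception is swallowed by the loop, while an Error ends the thread and reaches the thread's uncaught exception handler.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class; it is not part of the Kafka client.
final class ErrorEscapesCatchDemo {

    // Runs a small loop shaped like the one in Sender#run on its own thread and
    // returns whatever reached the uncaught exception handler (null if nothing did).
    static Throwable runLoop(boolean throwError) {
        AtomicReference<Throwable> seenByHandler = new AtomicReference<>();
        Thread t = new Thread(() -> {
            for (int i = 0; i < 3; i++) {
                try {
                    if (throwError) {
                        // Simulated failure; no memory is actually exhausted.
                        throw new OutOfMemoryError("simulated: Direct buffer memory");
                    }
                    throw new IllegalStateException("simulated failure");
                } catch (Exception e) {
                    // Same shape as Sender#run: the Exception is handled here
                    // and the loop continues. An Error does not match this clause.
                }
            }
        }, "demo-io-thread");
        // Record the throwable instead of logging it, so the caller can inspect it.
        t.setUncaughtExceptionHandler((thread, e) -> seenByHandler.set(e));
        t.start();
        try {
            t.join();
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ie);
        }
        return seenByHandler.get();
    }

    public static void main(String[] args) {
        System.out.println("Exception case, handler saw: " + runLoop(false));
        System.out.println("Error case, handler saw: " + runLoop(true));
    }
}
```

In the Error case the loop does not get a chance to run another iteration, which matches the report: the I/O thread is gone and only a log line remains.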

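One possible shape for the request at the end of the description is sketched here, under stated assumptions. ConfigurableHandlerThread and its userHandler parameter are hypothetical and do not exist in the Kafka client; the sketch only reuses the standard java.lang.Thread API. It keeps the current log-only behaviour and additionally calls a handler supplied by the user, if there is one.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch; this class is not part of the Kafka client.
final class ConfigurableHandlerThread extends Thread {

    ConfigurableHandlerThread(String name, Runnable runnable, boolean daemon,
                              Thread.UncaughtExceptionHandler userHandler) {
        super(runnable, name);
        setDaemon(daemon);
        setUncaughtExceptionHandler((t, e) -> {
            // Keep reporting the failure, as KafkaThread does today.
            System.err.println("Uncaught exception in thread '" + name + "': " + e);
            // Then let the caller decide what else to do (assumed extension point).
            if (userHandler != null) {
                userHandler.uncaughtException(t, e);
            }
        });
    }

    // Starts a thread that fails with a simulated Error and returns the events
    // recorded by the user-supplied handler.
    static List<String> demo() {
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        Thread t = new ConfigurableHandlerThread(
            "demo-network-thread",
            () -> { throw new OutOfMemoryError("simulated"); },
            true,
            (thread, e) -> events.add(thread.getName() + ":" + e.getClass().getSimpleName()));
        t.start();
        try {
            t.join();
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ie);
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

A user-supplied handler of this kind could, for example, close the producer or stop the application instead of leaving it running without an I/O thread; which reaction is appropriate is left to the caller.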


--
This message was sent by Atlassian Jira
(v8.20.10#820010)
