Hongshun Wang created KAFKA-17025:
-------------------------------------

             Summary: KafkaThread and KafkaProducer should expose a method to set an
UncaughtExceptionHandler
                 Key: KAFKA-17025
                 URL: https://issues.apache.org/jira/browse/KAFKA-17025
             Project: Kafka
          Issue Type: Improvement
          Components: clients
    Affects Versions: 3.6.2
            Reporter: Hongshun Wang
             Fix For: 3.6.3


When I use KafkaProducer and an OOM occurs, the KafkaProducer only logs it and
does nothing else:

```
ERROR org.apache.kafka.common.utils.KafkaThread Uncaught exception in thread 'kafka-producer-network-thread | producer-1': java.lang.OutOfMemoryError: Direct buffer memory
```

 

I tried to find out what happens:

1. It seems that OutOfMemoryError, being an Error, is not caught, because 
org.apache.kafka.clients.producer.internals.Sender#run only catches 
Exception: 

```java
@Override
public void run() {
    log.debug("Starting Kafka producer I/O thread.");

    // main loop, runs until close is called
    while (running) {
        try {
            runOnce();
        } catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }

    log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");

    // okay we stopped accepting requests but there may still be
    // requests in the transaction manager, accumulator or waiting for acknowledgment,
    // wait until these are completed.
    while (!forceClose && ((this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0) || hasPendingTransactionalRequests())) {
        try {
            runOnce();
        } catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }

    // Abort the transaction if any commit or abort didn't go through the transaction manager's queue
    while (!forceClose && transactionManager != null && transactionManager.hasOngoingTransaction()) {
        if (!transactionManager.isCompleting()) {
            log.info("Aborting incomplete transaction due to shutdown");
            transactionManager.beginAbort();
        }
        try {
            runOnce();
        } catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }

    if (forceClose) {
        // We need to fail all the incomplete transactional requests and batches and wake up the threads waiting on
        // the futures.
        if (transactionManager != null) {
            log.debug("Aborting incomplete transactional requests due to forced shutdown");
            transactionManager.close();
        }
        log.debug("Aborting incomplete batches due to forced shutdown");
        this.accumulator.abortIncompleteBatches();
    }
    try {
        this.client.close();
    } catch (Exception e) {
        log.error("Failed to close network client", e);
    }

    log.debug("Shutdown of Kafka producer I/O thread has completed.");
}
```

2. Then KafkaThread's uncaught exception handler receives the error and just logs it:

```java
public KafkaThread(final String name, Runnable runnable, boolean daemon) {
    super(runnable, name);
    configureThread(name, daemon);
}

private void configureThread(final String name, boolean daemon) {
    setDaemon(daemon);
    setUncaughtExceptionHandler((t, e) -> log.error("Uncaught exception in thread '{}':", name, e));
}
```
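
The two steps above can be reproduced outside Kafka with a minimal sketch. It uses only plain `java.lang.Thread`; the class name `ErrorEscapesCatchException`, the thread name, and the simulated error are made up for illustration and are not Kafka code. The `throw` stands in for `runOnce()` failing with an `Error`.

```java
import java.util.concurrent.atomic.AtomicReference;

public class ErrorEscapesCatchException {

    /**
     * Runs a task that throws an Error inside try/catch (Exception) and
     * returns whatever the thread's uncaught-exception handler received.
     */
    static Throwable runAndCapture() {
        AtomicReference<Throwable> seen = new AtomicReference<>();
        Thread t = new Thread(() -> {
            try {
                // Simulated failure; stands in for runOnce() hitting an OOM.
                throw new OutOfMemoryError("Direct buffer memory (simulated)");
            } catch (Exception e) {
                // Not reached: OutOfMemoryError extends Error, not Exception.
                seen.set(new IllegalStateException("unexpectedly caught", e));
            }
        }, "demo-network-thread");
        // The Error leaves run() and is handed to this handler instead.
        t.setUncaughtExceptionHandler((thread, e) -> seen.set(e));
        t.start();
        try {
            t.join(); // the handler runs on t before t terminates
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println("Handler received: " + runAndCapture());
    }
}
```

In this sketch the thread dies after the handler returns, which matches the situation described here: once the `Error` escapes the loop, nothing restarts the I/O thread, and the only trace is whatever the handler chose to do.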

 

To be honest, I don't understand why KafkaThread does nothing but log when 
an uncaught exception occurs. Why not expose a method to set the 
UncaughtExceptionHandler in KafkaThread or KafkaProducer, so that users can 
decide what to do with an uncaught exception, whether to rethrow it or just 
ignore it?
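
To make the request concrete, here is a rough sketch of what such a hook might look like. Everything in it is hypothetical: `ConfigurableHandlerThread`, its extra constructor parameter `userHandler`, and the `demo()` helper do not exist in Kafka. It only illustrates the idea of keeping the existing log line while also delegating to a user-supplied `Thread.UncaughtExceptionHandler`.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical class, not part of Kafka: a thread that logs uncaught
// throwables (as KafkaThread does) and then calls a user-supplied handler.
public class ConfigurableHandlerThread extends Thread {

    public ConfigurableHandlerThread(String name, Runnable runnable, boolean daemon,
                                     Thread.UncaughtExceptionHandler userHandler) {
        super(runnable, name);
        setDaemon(daemon);
        setUncaughtExceptionHandler((t, e) -> {
            // Keep the current behaviour: always log first.
            System.err.println("Uncaught exception in thread '" + name + "': " + e);
            // Then let the user decide what to do (alert, shut down, ignore).
            if (userHandler != null) {
                userHandler.uncaughtException(t, e);
            }
        });
    }

    /** Illustration helper: returns the throwable seen by the user handler. */
    static Throwable demo() {
        AtomicReference<Throwable> seen = new AtomicReference<>();
        Thread t = new ConfigurableHandlerThread(
                "demo-producer-network-thread",
                () -> { throw new OutOfMemoryError("simulated"); },
                true,
                (thread, e) -> seen.set(e));
        t.start();
        try {
            t.join();
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println("User handler received: " + demo());
    }
}
```

How a producer would pass such a handler down to its I/O thread (constructor argument, config property, or setter) is left open here; that is the design question this issue raises.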



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
