[ 
https://issues.apache.org/jira/browse/KAFKA-17025?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hongshun Wang updated KAFKA-17025:
----------------------------------
    Description: 
When I use KafkaProducer, an OOM occurs, but the KafkaProducer only logs it and 
does nothing:

 
{code:java}
ERROR org.apache.kafka.common.utils.KafkaThread Uncaught exception in thread 
'kafka-producer-network-thread | producer-l': java.lang.OutOfMemoryError: 
Direct buffer memory .....
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549) 
at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:324) 
at org.apache.kafka.clients.producer.internals.Sender.run 
at java.lang.Thread.run
{code}
 

 

I tried to find out what happens:

1. It seems that OutOfMemoryError, being an Error, is not captured when 
org.apache.kafka.clients.producer.internals.Sender#run catches only 
Exception: 
{code:java}
@Override
public void run() {
    log.debug("Starting Kafka producer I/O thread.");

    // main loop, runs until close is called
    while (running) {
        try {
            runOnce();
        } catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }

    log.debug("Beginning shutdown of Kafka producer I/O thread, sending 
remaining records.");

    // okay we stopped accepting requests but there may still be
    // requests in the transaction manager, accumulator or waiting for acknowledgment,
    // wait until these are completed.
    while (!forceClose && ((this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0) || hasPendingTransactionalRequests())) {
        try {
            runOnce();
        } catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }

    // Abort the transaction if any commit or abort didn't go through the transaction manager's queue
    while (!forceClose && transactionManager != null && transactionManager.hasOngoingTransaction()) {
        if (!transactionManager.isCompleting()) {
            log.info("Aborting incomplete transaction due to shutdown");
            transactionManager.beginAbort();
        }
        try {
            runOnce();
        } catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }

    if (forceClose) {
        // We need to fail all the incomplete transactional requests and batches and wake up the threads waiting on
        // the futures.
        if (transactionManager != null) {
            log.debug("Aborting incomplete transactional requests due to forced shutdown");
            transactionManager.close();
        }
        log.debug("Aborting incomplete batches due to forced shutdown");
        this.accumulator.abortIncompleteBatches();
    }
    try {
        this.client.close();
    } catch (Exception e) {
        log.error("Failed to close network client", e);
    }

    log.debug("Shutdown of Kafka producer I/O thread has completed.");
}


{code}
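
To illustrate the gap, here is a minimal standalone sketch (not Kafka code; the class and thread names are made up) showing that an Error such as OutOfMemoryError passes straight through a catch (Exception e) block and ends up at the thread's uncaught exception handler, which is exactly the path taken in Sender#run:
{code:java}
// Minimal standalone sketch (not Kafka code): an Error thrown inside the
// loop body escapes catch (Exception e) and reaches the thread's
// uncaught-exception handler, mirroring what happens in Sender#run.
public class ErrorEscapesCatchDemo {
    public static void main(String[] args) throws InterruptedException {
        Thread t = new Thread(() -> {
            try {
                // Simulate the failure seen in NetworkClient.poll(...)
                throw new OutOfMemoryError("Direct buffer memory (simulated)");
            } catch (Exception e) {
                // Never reached: OutOfMemoryError extends Error, not Exception.
                System.out.println("caught: " + e);
            }
        }, "demo-io-thread");
        t.setUncaughtExceptionHandler((thread, e) ->
                System.err.println("Uncaught exception in thread '" + thread.getName() + "': " + e));
        t.start();
        t.join();
    }
}
{code}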
 

2. Then KafkaThread catches the uncaught exception and just logs it:
{code:java}
public KafkaThread(final String name, Runnable runnable, boolean daemon) {
    super(runnable, name);
    configureThread(name, daemon);
}

private void configureThread(final String name, boolean daemon) {
    setDaemon(daemon);
    setUncaughtExceptionHandler((t, e) -> log.error("Uncaught exception in thread '{}':", name, e));
}{code}
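
For context, here is a minimal sketch (again not Kafka code) of why an application cannot intercept these failures from the outside today: because KafkaThread installs a per-thread handler, a handler registered via Thread.setDefaultUncaughtExceptionHandler is never consulted for that thread:
{code:java}
// Minimal sketch (not Kafka code): a per-thread handler, like the one
// KafkaThread installs, takes precedence over the JVM-wide default handler,
// so Thread.setDefaultUncaughtExceptionHandler offers no hook here.
public class HandlerPrecedenceDemo {
    public static void main(String[] args) throws InterruptedException {
        Thread.setDefaultUncaughtExceptionHandler((t, e) ->
                System.err.println("default handler (never invoked below): " + e));

        Thread t = new Thread(() -> {
            throw new OutOfMemoryError("simulated");
        }, "io-thread");
        // The per-thread handler wins, just like KafkaThread's log-only handler.
        t.setUncaughtExceptionHandler((thread, e) ->
                System.err.println("per-thread handler (log only): " + e));
        t.start();
        t.join();
    }
}
{code}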
 

To be honest, I don't understand why KafkaThread does nothing but log when an 
uncaught exception occurs. Why not expose a method to set an 
UncaughtExceptionHandler on KafkaThread or KafkaProducer, so that users can 
decide what to do with an uncaught exception, whether to rethrow it, fail the 
producer, or simply ignore it?
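
As a rough illustration of the improvement I am asking for (this is only a hypothetical sketch, not existing Kafka API; the extra constructor is invented), KafkaThread could accept a caller-supplied handler and fall back to the current log-only behaviour when none is given:
{code:java}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical sketch only; this overload does not exist in Kafka today.
public class KafkaThread extends Thread {
    private static final Logger log = LoggerFactory.getLogger(KafkaThread.class);

    public KafkaThread(final String name, Runnable runnable, boolean daemon) {
        this(name, runnable, daemon, null);
    }

    // Hypothetical overload: let the caller decide what an uncaught exception means.
    public KafkaThread(final String name, Runnable runnable, boolean daemon,
                       Thread.UncaughtExceptionHandler userHandler) {
        super(runnable, name);
        setDaemon(daemon);
        setUncaughtExceptionHandler(userHandler != null
                ? userHandler
                : (t, e) -> log.error("Uncaught exception in thread '{}':", name, e));
    }
}
{code}
KafkaProducer could then forward such a handler to the Sender's KafkaThread, letting applications fail fast, trigger a restart, or keep the current behaviour.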


> KafkaThread and KafkaProducer expose method to set setUncaughtExceptionHandler
> ------------------------------------------------------------------------------
>
>                 Key: KAFKA-17025
>                 URL: https://issues.apache.org/jira/browse/KAFKA-17025
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients
>    Affects Versions: 3.6.2
>            Reporter: Hongshun Wang
>            Priority: Major
>             Fix For: 3.6.3
>
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
