[ https://issues.apache.org/jira/browse/KAFKA-9998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17111249#comment-17111249 ]

Chia-Ping Tsai commented on KAFKA-9998:
---------------------------------------

It seems the blocking is caused by your custom callback? If we don't join the thread before completing close(), it causes a resource leak.

> KafkaProducer.close(timeout) still may block indefinitely
> ---------------------------------------------------------
>
>                 Key: KAFKA-9998
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9998
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 2.4.1
>            Reporter: radai rosenblatt
>            Priority: Major
>
> Looking at KafkaProducer.close(timeout), we have this:
> {code:java}
> private void close(Duration timeout, boolean swallowException) {
>     long timeoutMs = timeout.toMillis();
>     if (timeoutMs < 0)
>         throw new IllegalArgumentException("The timeout cannot be negative.");
>     log.info("Closing the Kafka producer with timeoutMillis = {} ms.", timeoutMs);
>     // this will keep track of the first encountered exception
>     AtomicReference<Throwable> firstException = new AtomicReference<>();
>     boolean invokedFromCallback = Thread.currentThread() == this.ioThread;
>     if (timeoutMs > 0) {
>         if (invokedFromCallback) {
>             log.warn("Overriding close timeout {} ms to 0 ms in order to prevent useless blocking due to self-join. " +
>                     "This means you have incorrectly invoked close with a non-zero timeout from the producer call-back.",
>                     timeoutMs);
>         } else {
>             // Try to close gracefully.
>             if (this.sender != null)
>                 this.sender.initiateClose();
>             if (this.ioThread != null) {
>                 try {
>                     this.ioThread.join(timeoutMs);    // <---- GRACEFUL JOIN
>                 } catch (InterruptedException t) {
>                     firstException.compareAndSet(null, new InterruptException(t));
>                     log.error("Interrupted while joining ioThread", t);
>                 }
>             }
>         }
>     }
>     if (this.sender != null && this.ioThread != null && this.ioThread.isAlive()) {
>         log.info("Proceeding to force close the producer since pending requests could not be completed " +
>                 "within timeout {} ms.", timeoutMs);
>         this.sender.forceClose();
>         // Only join the sender thread when not calling from callback.
>         if (!invokedFromCallback) {
>             try {
>                 this.ioThread.join();   // <----- UNBOUNDED JOIN
>             } catch (InterruptedException e) {
>                 firstException.compareAndSet(null, new InterruptException(e));
>             }
>         }
>     }
> ...
> }
> {code}
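Reduced to its control flow, the quoted close() can be modeled in a few lines of plain Java. In this sketch the thread, the slow callback, and the use of Thread.interrupt() as a stand-in for sender.forceClose() are assumptions for illustration, not Kafka code. The point is that once the bounded join(timeoutMs) gives up, the unbounded join() waits for as long as the callback keeps running, because neither stand-in can make a callback that is already executing return early.

```java
import java.util.concurrent.TimeUnit;

// Simplified, hypothetical model of the two-phase close quoted above.
// "ioThread" stands in for the producer's Sender thread; it only runs one
// slow user callback and then exits. This is not Kafka code.
final class TwoPhaseCloseDemo {

    /** Starts a thread that runs a callback taking roughly callbackMs to finish. */
    static Thread startIoThread(long callbackMs) {
        Thread t = new Thread(() -> {
            long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(callbackMs);
            // Keeps running until the deadline and ignores interrupts,
            // like a user callback that never checks them.
            while (System.nanoTime() < deadline) {
                try {
                    Thread.sleep(5);
                } catch (InterruptedException ignored) {
                    // Swallowed on purpose: the callback keeps going.
                }
            }
        }, "simulated-io-thread");
        t.start();
        return t;
    }

    /** Mirrors the quoted logic: bounded join, force close, then unbounded join. Returns elapsed ms. */
    static long close(Thread ioThread, long timeoutMs) throws InterruptedException {
        long start = System.nanoTime();
        ioThread.join(timeoutMs);      // graceful, bounded join
        if (ioThread.isAlive()) {
            ioThread.interrupt();      // stand-in for sender.forceClose()
            ioThread.join();           // unbounded join: waits for the callback to return
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }
}
```

With a 300 ms callback and a 50 ms timeout, close() in this model returns only after roughly 300 ms, so the requested timeout is not honored.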
> Specifically, in our case the ioThread was running a (very) long-running user-provided callback, which prevented the producer from closing within the given timeout.
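Independently of any change to close(), a slow callback can be kept off the I/O thread. The KafkaProducer Javadoc notes that callbacks generally execute on the producer's I/O thread and recommends using your own Executor in the callback body for blocking or expensive work. The sketch below assumes a hypothetical CompletionCallback interface shaped like Callback.onCompletion(RecordMetadata, Exception), with a String in place of RecordMetadata so it runs without the Kafka client; OffloadingCallback is likewise an illustrative name.

```java
import java.util.concurrent.ExecutorService;
import java.util.function.BiConsumer;

// Hypothetical stand-in for Kafka's Callback interface; a String replaces
// RecordMetadata so the sketch compiles without the Kafka client.
interface CompletionCallback {
    void onCompletion(String metadata, Exception exception);
}

/** Wraps a slow handler so the I/O thread only enqueues work and returns immediately. */
final class OffloadingCallback implements CompletionCallback {
    private final ExecutorService worker;
    private final BiConsumer<String, Exception> slowHandler;

    OffloadingCallback(ExecutorService worker, BiConsumer<String, Exception> slowHandler) {
        this.worker = worker;
        this.slowHandler = slowHandler;
    }

    @Override
    public void onCompletion(String metadata, Exception exception) {
        // Runs on the (simulated) I/O thread: hand off the work and return without blocking.
        worker.execute(() -> slowHandler.accept(metadata, exception));
    }
}
```

A single-thread executor keeps handler invocations in completion order. The trade-off is that handlers may still be running after the producer has closed, so the executor needs its own shutdown.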
>  
> I think the 2nd join() call should either be _VERY_ short (since we're already past the timeout at that stage) or should not happen at all.
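One way to express the reporter's proposal is to bound the second join by a short grace period and report whether the thread actually stopped. BoundedClose.closeIoThread and its forceGraceMs parameter are hypothetical names for this sketch, not Kafka's API or its eventual fix. Note that Thread.join(0) waits forever, so a "very short" join has to use a positive value.

```java
// Hypothetical helper sketching the proposal: the graceful join stays bounded
// by the caller's timeout, and the join after force-close is bounded by a short
// grace period instead of waiting forever. Not Kafka's actual code.
final class BoundedClose {

    /**
     * Returns true if ioThread terminated; false means it is still running
     * (for example inside a slow user callback) and its resources may leak.
     */
    static boolean closeIoThread(Thread ioThread, long timeoutMs, long forceGraceMs,
                                 Runnable forceClose) throws InterruptedException {
        if (timeoutMs < 0 || forceGraceMs <= 0)
            throw new IllegalArgumentException("timeoutMs must be >= 0 and forceGraceMs > 0");
        // Thread.join(0) means "wait forever", so skip the graceful join for a zero timeout.
        if (timeoutMs > 0)
            ioThread.join(timeoutMs);
        if (ioThread.isAlive()) {
            forceClose.run();              // e.g. sender.forceClose() in the producer
            ioThread.join(forceGraceMs);   // bounded, unlike the quoted join()
        }
        return !ioThread.isAlive();
    }
}
```

Returning false instead of blocking trades the hang for the resource leak raised in the comment: the caller regains control, but the I/O thread may still be alive, so that outcome should at least be logged.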



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
