[ 
https://issues.apache.org/jira/browse/KAFKA-4767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15870055#comment-15870055
 ] 

huxi commented on KAFKA-4767:
-----------------------------

What do you mean by "leaking the IO thread"? Do you mean it could not be shut 
down successfully after interrupting the user thread in which 
KafkaProducer.close was invoked?  This should be not gonna happen since 
this.sender.initiateClose() would always be run even when you interrupt the 
user thread. 

In my opinion, interrupting the user thread is no different from invoking 
ioThread.join with a relatively small timeout because there is still a chance 
to force close the IO thread and wait it again. That's also why we swallow 
InterruptedException during the first join. 

Does it look good to you though? And for sake of the curiosity, did you 
encounter any cases where IO thread got failed to be shut down?

> KafkaProducer is not joining its IO thread properly
> ---------------------------------------------------
>
>                 Key: KAFKA-4767
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4767
>             Project: Kafka
>          Issue Type: Bug
>          Components: producer 
>    Affects Versions: 0.11.0.0
>            Reporter: Buğra Gedik
>            Priority: Minor
>
> The {{KafkaProducer}} is not properly joining the thread it creates. The code 
> is like this:
> {code}
> try {
>     this.ioThread.join(timeUnit.toMillis(timeout));
> } catch (InterruptedException t) {
>     firstException.compareAndSet(null, t);
>     log.error("Interrupted while joining ioThread", t);
> }
> {code}
> If the code is interrupted while performing the join, it will end up leaving 
> the io thread running. The correct way of handling this is a follows:
> {code}
> try {
>     this.ioThread.join(timeUnit.toMillis(timeout));
> } catch (InterruptedException t) {
>     // propagate the interrupt
>     this.ioThread.interrupt();
>     try { 
>          this.ioThread.join();
>     } catch (InterruptedException t) {
>         firstException.compareAndSet(null, t);
>         log.error("Interrupted while joining ioThread", t);
>     } finally {
>         // make sure we maintain the interrupted status
>         Thread.currentThread.interrupt();
>     }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to