[ 
https://issues.apache.org/jira/browse/KAFKA-4767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15894818#comment-15894818
 ] 

Colin P. McCabe commented on KAFKA-4767:
----------------------------------------

I agree that the interrupt state of the thread should be preserved.  In other 
words, we should invoke {{Thread.currentThread().interrupt()}} in the catch 
block for {{InterruptedException}}.  However, the current behavior of aborting 
the thread join when an {{InterruptedException}} is received seems correct to 
me.
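
As a rough illustration of that suggestion, here is a minimal, self-contained sketch. It is not the actual {{KafkaProducer}} code; the class name {{JoinInterruptSketch}} and the helper {{joinPreservingInterrupt}} are hypothetical names made up for this example. It abandons the join when interrupted, and restores the caller's interrupt flag so that code further up the stack can still observe it.

```java
public class JoinInterruptSketch {

    /**
     * Hypothetical helper: waits up to timeoutMs for the worker to finish.
     * Returns true if join() returned normally (finished or timed out),
     * false if the wait was cut short by an interrupt.
     */
    public static boolean joinPreservingInterrupt(Thread worker, long timeoutMs) {
        try {
            worker.join(timeoutMs);
            return true;
        } catch (InterruptedException e) {
            // Stop waiting, but restore the interrupt flag that was cleared
            // when InterruptedException was thrown.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        // Stand-in for the IO thread: it just sleeps briefly and exits.
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(200);
            } catch (InterruptedException ignored) {
                // nothing to clean up in this sketch
            }
        });
        worker.start();

        // Simulate an interrupt reaching the caller around the join.
        Thread.currentThread().interrupt();

        boolean completed = joinPreservingInterrupt(worker, 5000);
        System.out.println("join completed: " + completed);
        // Thread.interrupted() reports the flag and clears it again.
        System.out.println("interrupt flag preserved: " + Thread.interrupted());
    }
}
```

In this sketch the worker may still be running when the helper returns {{false}}; the caller learns that from the return value and from the restored interrupt flag.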

bq. I understand that it will eventually go away. But that does not cut it for 
us. We would like the IO thread to be shut down after close() returns. And that 
does not happen if we get an interrupt during close().

Fundamentally, if you interrupt an operation, you are not waiting for it to 
complete.  That's what interruption means.

> KafkaProducer is not joining its IO thread properly
> ---------------------------------------------------
>
>                 Key: KAFKA-4767
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4767
>             Project: Kafka
>          Issue Type: Bug
>          Components: producer 
>    Affects Versions: 0.10.2.0
>            Reporter: Buğra Gedik
>            Assignee: huxi
>            Priority: Minor
>
> The {{KafkaProducer}} is not properly joining the thread it creates. The code 
> is like this:
> {code}
> try {
>     this.ioThread.join(timeUnit.toMillis(timeout));
> } catch (InterruptedException t) {
>     firstException.compareAndSet(null, t);
>     log.error("Interrupted while joining ioThread", t);
> }
> {code}
> If the code is interrupted while performing the join, it will end up leaving 
> the IO thread running. The correct way of handling this is as follows:
> {code}
> try {
>     this.ioThread.join(timeUnit.toMillis(timeout));
> } catch (InterruptedException t) {
>     // propagate the interrupt to the IO thread
>     this.ioThread.interrupt();
>     try {
>         this.ioThread.join();
>     } catch (InterruptedException e) {
>         // interrupted again while waiting for the IO thread to exit
>         firstException.compareAndSet(null, e);
>         log.error("Interrupted while joining ioThread", e);
>     } finally {
>         // make sure we maintain the interrupted status
>         Thread.currentThread().interrupt();
>     }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
