[
https://issues.apache.org/jira/browse/KAFKA-4767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15870055#comment-15870055
]
huxi commented on KAFKA-4767:
-----------------------------
What do you mean by "leaking the IO thread"? Do you mean that it cannot be shut
down successfully after the user thread in which KafkaProducer.close was
invoked is interrupted? That should not happen, since
this.sender.initiateClose() always runs, even when you interrupt the user
thread.
In my opinion, interrupting the user thread is no different from invoking
ioThread.join with a relatively small timeout, because there is still a chance
to force close the IO thread and wait for it again. That's also why we swallow
the InterruptedException during the first join.
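To make the flow I am describing concrete, here is a minimal sketch. It is not the actual KafkaProducer code: Worker is a hypothetical stand-in for the sender, its pending counter only simulates in-flight requests, and the close method is a simplified assumption of the graceful-then-forced sequence. It shows an interrupt during the first join being recorded and swallowed, followed by a force close and a second join.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Simplified sketch; Worker is a hypothetical stand-in for the producer's sender.
class TwoPhaseCloseSketch {

    static final class Worker implements Runnable {
        private final AtomicInteger pending;      // simulated in-flight requests
        private volatile boolean closing = false; // set by initiateClose()
        private volatile boolean forced = false;  // set by forceClose()

        Worker(int pending) { this.pending = new AtomicInteger(pending); }

        void initiateClose() { closing = true; }
        void forceClose() { forced = true; }

        @Override
        public void run() {
            // Graceful exit: close requested and nothing pending. Forced exit: at once.
            while (!forced && !(closing && pending.get() == 0)) {
                try {
                    Thread.sleep(5); // pretend to send one request
                } catch (InterruptedException e) {
                    continue; // this sketch stops only via the two flags
                }
                if (pending.get() > 0) pending.decrementAndGet();
            }
        }
    }

    /** Returns the first InterruptedException seen while joining, or null. */
    static Throwable close(Worker sender, Thread ioThread, long timeout, TimeUnit unit) {
        AtomicReference<Throwable> firstException = new AtomicReference<>();
        sender.initiateClose(); // runs before any join, so an interrupt cannot skip it
        try {
            ioThread.join(unit.toMillis(timeout)); // first, bounded wait
        } catch (InterruptedException e) {
            firstException.compareAndSet(null, e); // record it, then carry on
        }
        if (ioThread.isAlive()) {
            sender.forceClose(); // give up on pending work
            try {
                ioThread.join(); // second, unbounded wait
            } catch (InterruptedException e) {
                firstException.compareAndSet(null, e);
            }
        }
        return firstException.get();
    }

    public static void main(String[] args) {
        Worker sender = new Worker(1000);
        Thread ioThread = new Thread(sender, "io-thread");
        ioThread.start();
        Thread.currentThread().interrupt(); // simulate an interrupt arriving during close
        Throwable first = close(sender, ioThread, 50, TimeUnit.MILLISECONDS);
        System.out.println("first exception: " + first + ", ioThread alive: " + ioThread.isAlive());
    }
}
```

In this sketch the interrupt only shortens the first wait; the IO thread is still stopped by the force close. Note that the sketch does not restore the caller's interrupted status, which is a separate question from whether the thread leaks.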
Does that look good to you? And out of curiosity, did you encounter any cases
where the IO thread failed to shut down?
> KafkaProducer is not joining its IO thread properly
> ---------------------------------------------------
>
> Key: KAFKA-4767
> URL: https://issues.apache.org/jira/browse/KAFKA-4767
> Project: Kafka
> Issue Type: Bug
> Components: producer
> Affects Versions: 0.11.0.0
> Reporter: Buğra Gedik
> Priority: Minor
>
> The {{KafkaProducer}} is not properly joining the thread it creates. The code
> is like this:
> {code}
> try {
>     this.ioThread.join(timeUnit.toMillis(timeout));
> } catch (InterruptedException t) {
>     firstException.compareAndSet(null, t);
>     log.error("Interrupted while joining ioThread", t);
> }
> {code}
> If the code is interrupted while performing the join, it will end up leaving
> the IO thread running. The correct way of handling this is as follows:
> {code}
> try {
>     this.ioThread.join(timeUnit.toMillis(timeout));
> } catch (InterruptedException t) {
>     // propagate the interrupt to the IO thread
>     this.ioThread.interrupt();
>     try {
>         this.ioThread.join();
>     } catch (InterruptedException e) {
>         // named e because t is already in scope here
>         firstException.compareAndSet(null, e);
>         log.error("Interrupted while joining ioThread", e);
>     } finally {
>         // make sure we maintain the interrupted status
>         Thread.currentThread().interrupt();
>     }
> }
> {code}