[
https://issues.apache.org/jira/browse/FLINK-16482?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
likang updated FLINK-16482:
---------------------------
Description:
*Background:*
Today I tried to monitor my Flink job with a timer thread, so that the job
exits automatically if it has not read any data for a long time. But when I
detect the read timeout and call the cancel function of FlinkKafkaConsumer, a
CloseException is thrown, and Flink's recovery mechanism then treats this as an
abnormal exit and restarts the task.

I checked the cancel code of FlinkKafkaConsumer and found that the cancel of
KafkaFetcher is called first, then the close() of Handover, and then the
shutdown() of KafkaConsumerThread. Finally, after detecting the running flag,
the KafkaConsumerThread thread exits its while loop and calls Handover's
close() once more.

There are two problems here:
1. A CloseException is thrown when the Handover is closed in the cancel of
KafkaFetcher; the call to handover.close() needs to be removed there.
2. After the KafkaConsumerThread thread leaves the loop because running = false,
it needs to determine whether it is exiting normally. It should not call
handover.close() on a normal exit; otherwise a CloseException is also thrown.
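
The second point can be illustrated with a minimal, self-contained sketch. It
is not Flink's actual code: SketchHandover, SketchConsumerThread and
handoverClosedAfter are hypothetical stand-ins invented for illustration, and
the sketch only assumes the behavior described above (a loop guarded by a
running flag, and a close() that should be skipped on a normal exit).

```java
import java.util.concurrent.atomic.AtomicBoolean;

class CancelSketch {

    /** Hypothetical stand-in for the Handover described in the issue. */
    static final class SketchHandover {
        private final AtomicBoolean closed = new AtomicBoolean(false);

        void close() { closed.set(true); }

        boolean isClosed() { return closed.get(); }
    }

    /** Hypothetical stand-in for KafkaConsumerThread. */
    static final class SketchConsumerThread extends Thread {
        private final SketchHandover handover;
        private final Runnable fetchStep;
        private volatile boolean running = true;

        SketchConsumerThread(SketchHandover handover, Runnable fetchStep) {
            this.handover = handover;
            this.fetchStep = fetchStep;
        }

        @Override
        public void run() {
            try {
                while (running) {
                    fetchStep.run();
                }
                // Normal exit: shutdown() cleared the flag, so the handover
                // is deliberately left open.
            } catch (RuntimeException e) {
                // Abnormal exit: close the handover so the reading side
                // learns that no more data will arrive.
                handover.close();
            }
        }

        void shutdown() { running = false; }
    }

    /** Runs one consumer thread to completion and reports the handover state. */
    static boolean handoverClosedAfter(boolean failFetch) {
        SketchHandover handover = new SketchHandover();
        Runnable step = failFetch
                ? () -> { throw new IllegalStateException("simulated fetch failure"); }
                : Thread::yield;
        SketchConsumerThread thread = new SketchConsumerThread(handover, step);
        thread.start();
        if (!failFetch) {
            thread.shutdown(); // user-requested cancel
        }
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return handover.isClosed();
    }

    public static void main(String[] args) {
        System.out.println("closed after normal shutdown: " + handoverClosedAfter(false));
        System.out.println("closed after failed fetch: " + handoverClosedAfter(true));
    }
}
```

In this sketch a user-requested shutdown leaves the handover open, while a
failing fetch step closes it. Whether the real fix should take this shape
depends on how the reading side of Handover is expected to be woken up on
cancel, which the sketch does not model.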
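was: Flink-kafka-Consumer
When the user program calls the cancel function to end the task on its own
initiative, KafkaConsumerThread throws a CloseException, which causes the task
to restart instead of exiting normally.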
> Flink job throws CloseException when calling the FlinkKafkaConsumer cancel
> function
> -------------------------------------------------------------------------------
>
> Key: FLINK-16482
> URL: https://issues.apache.org/jira/browse/FLINK-16482
> Project: Flink
> Issue Type: Bug
> Reporter: likang
> Priority: Critical