[
https://issues.apache.org/jira/browse/FLINK-16482?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
likang updated FLINK-16482:
---------------------------
Description:
*Background:*
Today I tried to monitor my Flink job with a timer thread, so that the
job exits automatically if it has not read any data for a long time. But when
the timer detects the read timeout and calls the cancel function of
FlinkKafkaConsumer, a CloseException is thrown, and Flink's recovery mechanism
then treats the job as having exited abnormally and restarts the task.
*Bug*:
I checked the cancel code of FlinkKafkaConsumer and found that the
cancel of KafkaFetcher is called first, then the close() of Handover, and then
the shutdown() of KafkaConsumerThread. Finally, after detecting that the
running flag is false, the KafkaConsumerThread thread exits its while loop and
calls Handover's close() once.
There are two problems here:
1. A CloseException is thrown because Handover is closed in the cancel of
KafkaFetcher. The call to handover.close() needs to be removed there.
2. When the thread in KafkaConsumerThread leaves the loop because running =
false, it needs to determine whether this is a normal exit. It should not call
handover.close() on a normal exit, otherwise a CloseException is thrown as
well.
Finally, the details and the proposed solution are in the attachment.
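To illustrate the two points above, here is a minimal single-threaded sketch. It is not Flink's actual code: MiniHandover, MiniFetcher, onConsumerThreadExit and the closeHandoverOnCancel switch are hypothetical stand-ins, and the real Handover blocks in pollNext() and would need a wakeup instead of returning null. It only models how closing the handover during cancel makes the fetch loop end with an exception, while the proposed change lets it end normally.

```java
public class HandoverCancelSketch {

    /** Stand-in for the CloseException named in this report. */
    static final class CloseException extends Exception {}

    /** Hypothetical, non-blocking stand-in for Handover. */
    static final class MiniHandover {
        private boolean closed;

        /** Returns null when there is no data; throws once the handover is closed. */
        String pollNext() throws CloseException {
            if (closed) {
                throw new CloseException();
            }
            return null; // the real class would block here until data arrives
        }

        void close() {
            closed = true;
        }

        boolean isClosed() {
            return closed;
        }
    }

    /** Hypothetical stand-in for KafkaFetcher plus the exit path of KafkaConsumerThread. */
    static final class MiniFetcher {
        final MiniHandover handover = new MiniHandover();
        private final boolean closeHandoverOnCancel;
        private boolean running = true;

        MiniFetcher(boolean closeHandoverOnCancel) {
            this.closeHandoverOnCancel = closeHandoverOnCancel;
        }

        void cancel() {
            running = false;
            if (closeHandoverOnCancel) {
                handover.close(); // reported behavior: the reader then sees CloseException
            }
        }

        /** Point 2: close the handover only when the consumer thread exits with an error. */
        void onConsumerThreadExit(Throwable error) {
            if (error != null) {
                handover.close();
            }
        }

        /** Simulates the fetch loop; cancel() fires at the given poll, like the timer thread. */
        String runFetchLoop(int cancelAtPoll) {
            try {
                int polls = 0;
                while (running) {
                    polls++;
                    if (polls == cancelAtPoll) {
                        cancel();
                    }
                    handover.pollNext();
                }
                onConsumerThreadExit(null); // normal exit: handover stays open
                return "FINISHED";
            } catch (CloseException e) {
                return "FAILED"; // what the recovery mechanism treats as an abnormal exit
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(new MiniFetcher(true).runFetchLoop(2));  // reported behavior
        System.out.println(new MiniFetcher(false).runFetchLoop(2)); // proposed behavior
    }
}
```

Running main prints FAILED for the reported behavior and FINISHED for the proposed one. How an abnormal exit is detected in the real KafkaConsumerThread is left to the attachment.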
> Flink Job throw CloseException when call the FlinkKafkaConsumer cancel
> function
> -------------------------------------------------------------------------------
>
> Key: FLINK-16482
> URL: https://issues.apache.org/jira/browse/FLINK-16482
> Project: Flink
> Issue Type: Bug
> Reporter: likang
> Priority: Critical
> Attachments: The bug and solution.docx
>
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)