[ https://issues.apache.org/jira/browse/FLINK-6311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15974207#comment-15974207 ]
mingleizhang commented on FLINK-6311: ------------------------------------- [~tzulitai] Hi, here is the sample code I put here, please check it out and how do you think about it ? BTW, Should we put more error messages instead of just "mainThread is null" ? If it is really, what kinda message should we put it here? Thanks ~ :D {code} public void shutdownFetcher() { running = false; checkNotNull(mainThread, "mainThread is null."); mainThread.interrupt(); // the main thread may be sleeping for the discovery interval if (LOG.isInfoEnabled()) { LOG.info("Shutting down the shard consumer threads of subtask {} ...", indexOfThisConsumerSubtask); } checkNotNull(shardConsumersExecutor, "shardConsumersExecutor is null."); shardConsumersExecutor.shutdownNow(); } {code} > NPE in FlinkKinesisConsumer if source was closed before run > ----------------------------------------------------------- > > Key: FLINK-6311 > URL: https://issues.apache.org/jira/browse/FLINK-6311 > Project: Flink > Issue Type: Bug > Components: Kinesis Connector > Reporter: Tzu-Li (Gordon) Tai > Assignee: mingleizhang > > This was reported by an user: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-errors-out-and-job-fails-IOException-from-CollectSink-open-td12606.html > The {{shutdownFetcher}} method of {{KinesisDataFetcher}} is not protected > against the condition when the source was closed before it started running. > Both {{mainThread}} and {{shardConsumersExecutor}} should have null checks. -- This message was sent by Atlassian JIRA (v6.3.15#6346)