[ 
https://issues.apache.org/jira/browse/FLINK-10721?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhaoshijie updated FLINK-10721:
-------------------------------
    Description: 
In FlinkKafkaConsumerBase run method on line 721(master branch), if 
kafkaFetcher.runFetchLoop() throw exception(by discoveryLoopThread throw 
exception then finally execute cancel method, cancel method will execute 
kafkaFetcher.cancel, this implemented Kafka09Fetcher will execute 
handover.close, then result in handover.pollNext throw ClosedException),then 
next code will not execute,especially discoveryLoopError not be throwed,so, 
real culprit exception will be Swallowed.
failed log like this:

{code:java}
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.

  at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$10.apply$mcV$sp(JobManager.scala:1229)
  at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$10.apply(JobManager.scala:1172)
  at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$10.apply(JobManager.scala:1172)
  at 
scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
  at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
  at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
  at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
  at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
  at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
  at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: 
org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException
  at 
org.apache.flink.streaming.connectors.kafka.internal.Handover.close(Handover.java:180)
  at 
org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.cancel(Kafka09Fetcher.java:174)
  at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:753)
  at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase$2.run(FlinkKafkaConsumerBase.java:695)
  at java.lang.Thread.run(Thread.java:745)
{code}

Shoud we modify it as follows?
{code:java}
try {
                                kafkaFetcher.runFetchLoop();
                        } catch (Exception e) {
                                // if discoveryLoopErrorRef not null ,we should 
throw real culprit exception
                                if (discoveryLoopErrorRef.get() != null){
                                        throw new 
RuntimeException(discoveryLoopErrorRef.get());
                                } else {
                                        throw e;
                                }
                        }
{code}


  was:
In FlinkKafkaConsumerBase run method on line 721(master branch), if 
kafkaFetcher.runFetchLoop() throw exception(by discoveryLoopThread throw 
exception then finally execute cancel method, cancel method will execute 
kafkaFetcher.cancel, this implemented Kafka09Fetcher will execute 
handover.close, then result in handover.pollNext throw ClosedException),then 
next code will not execute,especially discoveryLoopError not be throwed,so, 
real culprit exception will be Swallowed.
failed log like this:

{code:java}
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.

  at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$10.apply$mcV$sp(JobManager.scala:1229)
  at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$10.apply(JobManager.scala:1172)
  at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$10.apply(JobManager.scala:1172)
  at 
scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
  at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
  at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
  at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
  at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
  at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
  at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: 
org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException
  at 
org.apache.flink.streaming.connectors.kafka.internal.Handover.close(Handover.java:180)
  at 
org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.cancel(Kafka09Fetcher.java:174)
  at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:753)
  at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase$2.run(FlinkKafkaConsumerBase.java:695)
  at java.lang.Thread.run(Thread.java:745)
{code}

Shoud we modify it as follows?
{code:java}
try {
                                kafkaFetcher.runFetchLoop();
                        } catch (Exception e) {
                                // if discoveryLoopErrorRef not null ,we should 
throw real culprit exception
                                if (discoveryLoopErrorRef.get() != null){
                                        throw new 
RuntimeException(discoveryLoopErrorRef.get());
                                } else {
                                        throw e;
                                }
                        }
{code:java}



> kafkaFetcher runFetchLoop throw exception will cause follow-up code not 
> execute in FlinkKafkaConsumerBase run method 
> ---------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-10721
>                 URL: https://issues.apache.org/jira/browse/FLINK-10721
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.6.2
>            Reporter: zhaoshijie
>            Priority: Major
>
> In FlinkKafkaConsumerBase run method on line 721(master branch), if 
> kafkaFetcher.runFetchLoop() throw exception(by discoveryLoopThread throw 
> exception then finally execute cancel method, cancel method will execute 
> kafkaFetcher.cancel, this implemented Kafka09Fetcher will execute 
> handover.close, then result in handover.pollNext throw ClosedException),then 
> next code will not execute,especially discoveryLoopError not be throwed,so, 
> real culprit exception will be Swallowed.
> failed log like this:
> {code:java}
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$10.apply$mcV$sp(JobManager.scala:1229)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$10.apply(JobManager.scala:1172)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$10.apply(JobManager.scala:1172)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: 
> org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.Handover.close(Handover.java:180)
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.cancel(Kafka09Fetcher.java:174)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:753)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase$2.run(FlinkKafkaConsumerBase.java:695)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> Shoud we modify it as follows?
> {code:java}
> try {
>                               kafkaFetcher.runFetchLoop();
>                       } catch (Exception e) {
>                               // if discoveryLoopErrorRef not null ,we should 
> throw real culprit exception
>                               if (discoveryLoopErrorRef.get() != null){
>                                       throw new 
> RuntimeException(discoveryLoopErrorRef.get());
>                               } else {
>                                       throw e;
>                               }
>                       }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to