Re: Change Akka Ask Timeout for Job Submission Only
There are three communication layers involved here:

1) client <=> server (REST API)
   This goes through REST and does not use timeouts AFAIK. We wait until a response arrives or the connection terminates.
2) server (REST API) <=> processes (JM, Dispatcher)
   This goes through akka, with "web.timeout" used as the timeout.
3) processes <=> processes
   Also akka, with "akka.ask.timeout" used as the timeout.

The timeout in question occurs on layer 2), because the JM is very busy, possibly due to some heavy-weight computation during job setup. In any case, you can try increasing web.timeout, which may resolve this issue.

On 20/12/2019 06:13, tison wrote:
Forward to user list.
Best, tison.

Abdul Qadeer <quadeer@gmail.com> wrote on Fri, Dec 20, 2019 at 12:57 PM:
Around submission time, logs from jobmanager: {"timeMillis":1576764854245,"thread":"flink-akka.actor.default-dispatcher-1016","level":"INFO","loggerName":"org.apache.flink.runtime.dispatcher.StandaloneDispatcher","message":"Received JobGraph submission 714829e8f6c8cd0daaed335c1b8c588a (sample).","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":1126,"threadPriority":5}^M {"timeMillis":1576764854247,"thread":"flink-akka.actor.default-dispatcher-1016","level":"INFO","loggerName":"org.apache.flink.runtime.dispatcher.StandaloneDispatcher","message":"Submitting job 714829e8f6c8cd0daaed335c1b8c588a (sample).","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":1126,"threadPriority":5}^M {"timeMillis":1576764856119,"thread":"flink-akka.actor.default-dispatcher-1036","level":"INFO","loggerName":"akka.actor.RepointableActorRef","message":"Message [org.apache.flink.runtime.rpc.messages.LocalFencedMessage] from Actor[akka://flink/deadLetters] to Actor[akka://flink/user/jobmanager_4#-2122695705] was not delivered. [87] dead letters encountered. 
This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":1150,"threadPriority":5}^M {"timeMillis":1576764877732,"thread":"flink-akka.actor.default-dispatcher-1039","level":"INFO","loggerName":"akka.actor.RepointableActorRef","message":"Message [org.apache.flink.runtime.rpc.messages.LocalFencedMessage] from Actor[akka://flink/deadLetters] to Actor[akka://flink/user/jobmanager_4#-2122695705] was not delivered. [88] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":1155,"threadPriority":5}^M {"timeMillis":1576764877732,"thread":"flink-scheduler-1","level":"ERROR","loggerName":"org.apache.flink.runtime.rest.handler.job.JobSubmitHandler","message":"Unhandled exception.","thrown":{"commonElementCount":0,"localizedMessage":"Ask timed out on [Actor[akka://flink/user/dispatcher#1899316777]] after [1 ms]. Sender[null] sent message of type \"org.apache.flink.runtime.rpc.messages.LocalFencedMessage\".","message":"Ask timed out on [Actor[akka://flink/user/dispatcher#1899316777]] after [1 ms]. 
Sender[null] sent message of type \"org.apache.flink.runtime.rpc.messages.LocalFencedMessage\".","name":"akka.pattern.AskTimeoutException","extendedStackTrace":[{"class":"akka.pattern.PromiseActorRef$$anonfun$1","method":"apply$mcV$sp","file":"AskSupport.scala","line":604,"exact":true,"location":"akka-actor_2.11-2.4.20.jar","version":"?"},{"class":"akka.actor.Scheduler$$anon$4","method":"run","file":"Scheduler.scala","line":126,"exact":true,"location":"akka-actor_2.11-2.4.20.jar","version":"?"},{"class":"scala.concurrent.Future$InternalCallbackExecutor$","method":"unbatchedExecute","file":"Future.scala","line":601,"exact":true,"location":"scala-library-2.11.12.jar","version":"?"},{"class":"scala.concurrent.BatchingExecutor$class","method":"execute","file":"BatchingExecutor.scala","line":109,"exact":true,"location":"scala-library-2.11.12.jar","version":"?"},{"class":"scala.concurrent.Future$InternalCallbackExecutor$","method":"execute","file":"Future.scala","line":599,"exact":true,"location":"scala-library-2.11.12.jar","version":"?"},{"class":"akka.actor.LightArrayRevolverScheduler$TaskHolder","method":"executeTask","file":"LightArrayRevolverScheduler.scala","line":329,"exact":true,"location":"akka-actor_2.11-2.4.20.jar","version":"?"},{"class":"akka.actor.LightArrayRevolverScheduler$$anon$4","method":"executeBucket$1","file":"LightArrayRevolverScheduler.scala","line":280,"exact":true,"location":"akka-actor_2.11-2.4.20.jar","version":"?"},{"class":"akka.actor.LightArrayRevolverScheduler$$anon$4","method":"nextTick","file":"LightArrayRevolverScheduler.scala","line":284,"exact":true,
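The suggestion at the top of this reply (raise web.timeout, leave akka.ask.timeout alone) could look like the following flink-conf.yaml fragment. This is only a sketch: the value 60000 is an arbitrary illustration, and it assumes web.timeout is given in milliseconds, as in the Flink 1.8 configuration reference; please check the documentation for the version you run.

```yaml
# flink-conf.yaml (fragment)
# Timeout used by the REST handlers when they call into the Dispatcher/JobManager (layer 2).
# Assumption: the value is in milliseconds. 60000 is illustrative, not a recommendation.
web.timeout: 60000

# akka.ask.timeout (layer 3) is intentionally not set here, so process-to-process
# calls keep whatever value they already use.
```

The JobManager process most likely has to be restarted for the change to take effect.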
Re: Change Akka Ask Timeout for Job Submission Only
Forward to user list.

Best,
tison.

Abdul Qadeer wrote on Fri, Dec 20, 2019 at 12:57 PM:
> Around submission time, logs from jobmanager: > > {"timeMillis":1576764854245,"thread":"flink-akka.actor.default-dispatcher-1016","level":"INFO","loggerName":"org.apache.flink.runtime.dispatcher.StandaloneDispatcher","message":"Received > JobGraph submission 714829e8f6c8cd0daaed335c1b8c588a > (sample).","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":1126,"threadPriority":5}^M > {"timeMillis":1576764854247,"thread":"flink-akka.actor.default-dispatcher-1016","level":"INFO","loggerName":"org.apache.flink.runtime.dispatcher.StandaloneDispatcher","message":"Submitting > job 714829e8f6c8cd0daaed335c1b8c588a > (sample).","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":1126,"threadPriority":5}^M > {"timeMillis":1576764856119,"thread":"flink-akka.actor.default-dispatcher-1036","level":"INFO","loggerName":"akka.actor.RepointableActorRef","message":"Message > [org.apache.flink.runtime.rpc.messages.LocalFencedMessage] from > Actor[akka://flink/deadLetters] to > Actor[akka://flink/user/jobmanager_4#-2122695705] was not delivered. [87] > dead letters encountered. This logging can be turned off or adjusted with > configuration settings 'akka.log-dead-letters' and > 'akka.log-dead-letters-during-shutdown'.","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":1150,"threadPriority":5}^M > {"timeMillis":1576764877732,"thread":"flink-akka.actor.default-dispatcher-1039","level":"INFO","loggerName":"akka.actor.RepointableActorRef","message":"Message > [org.apache.flink.runtime.rpc.messages.LocalFencedMessage] from > Actor[akka://flink/deadLetters] to > Actor[akka://flink/user/jobmanager_4#-2122695705] was not delivered. [88] > dead letters encountered. 
This logging can be turned off or adjusted with > configuration settings 'akka.log-dead-letters' and > 'akka.log-dead-letters-during-shutdown'.","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":1155,"threadPriority":5}^M > {"timeMillis":1576764877732,"thread":"flink-scheduler-1","level":"ERROR","loggerName":"org.apache.flink.runtime.rest.handler.job.JobSubmitHandler","message":"Unhandled > exception.","thrown":{"commonElementCount":0,"localizedMessage":"Ask timed > out on [Actor[akka://flink/user/dispatcher#1899316777]] after [1 ms]. > Sender[null] sent message of type > \"org.apache.flink.runtime.rpc.messages.LocalFencedMessage\".","message":"Ask > timed out on [Actor[akka://flink/user/dispatcher#1899316777]] after [1 > ms]. Sender[null] sent message of type > \"org.apache.flink.runtime.rpc.messages.LocalFencedMessage\".","name":"akka.pattern.AskTimeoutException","extendedStackTrace":[{"class":"akka.pattern.PromiseActorRef$$anonfun$1","method":"apply$mcV$sp","file":"AskSupport.scala","line":604,"exact":true,"location":"akka-actor_2.11-2.4.20.jar","version":"?"},{"class":"akka.actor.Scheduler$$anon$4","method":"run","file":"Scheduler.scala","line":126,"exact":true,"location":"akka-actor_2.11-2.4.20.jar","version":"?"},{"class":"scala.concurrent.Future$InternalCallbackExecutor$","method":"unbatchedExecute","file":"Future.scala","line":601,"exact":true,"location":"scala-library-2.11.12.jar","version":"?"},{"class":"scala.concurrent.BatchingExecutor$class","method":"execute","file":"BatchingExecutor.scala","line":109,"exact":true,"location":"scala-library-2.11.12.jar","version":"?"},{"class":"scala.concurrent.Future$InternalCallbackExecutor$","method":"execute","file":"Future.scala","line":599,"exact":true,"location":"scala-library-2.11.12.jar","version":"?"},{"class":"akka.actor.LightArrayRevolverScheduler$TaskHolder","method":"executeTask","file":"LightArrayRevolverScheduler.scala","line":329,"exact":true,"location":"akka-actor_2.11-2
.4.20.jar","version":"?"},{"class":"akka.actor.LightArrayRevolverScheduler$$anon$4","method":"executeBucket$1","file":"LightArrayRevolverScheduler.scala","line":280,"exact":true,"location":"akka-actor_2.11-2.4.20.jar","version":"?"},{"class":"akka.actor.LightArrayRevolverScheduler$$anon$4","method":"nextTick","file":"LightArrayRevolverScheduler.scala","line":284,"exact":true,"location":"akka-actor_2.11-2.4.20.jar","version":"?"},{"class":"akka.actor.LightArrayRevolverScheduler$$anon$4","method":"run","file":"LightArrayRevolverScheduler.scala","line":236,"exact":true,"location":"akka-actor_2.11-2.4.20.jar","version":"?"},{"class":"java.lang.Thread","method":"run","file":"Thread.java","line":745,"exact":true,"location":"?","version":"1.8.0_66"}]},"endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":41,"threadPriority":5}^M > {"timeMillis":1576764877809,"thread":"flink-akka.actor.default-dispatcher-1016","level":"INFO","loggerName":"org.apache.flink.runtime.rpc.akka.AkkaRpcService","message":"Starting > RPC endpoint for org.apache.flink.runtime.jobmaster.JobMaster at > akka://flink/user/jobmanager_5 > .","end
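A rough way to read the log above is to compare the timeMillis of the "Submitting job" record with that of the JobSubmitHandler error. The sketch below does this in Python on two records abbreviated from the log (only the fields it needs are kept); the helper name elapsed_ms is made up for this illustration and is not part of Flink.

```python
import json

# Two records abbreviated from the JobManager log above: only the fields used
# here are kept, everything else is dropped.
LOG_LINES = [
    '{"timeMillis":1576764854247,"level":"INFO",'
    '"message":"Submitting job 714829e8f6c8cd0daaed335c1b8c588a (sample)."}',
    '{"timeMillis":1576764877732,"level":"ERROR",'
    '"message":"Unhandled exception."}',
]


def elapsed_ms(lines, start_prefix, end_level):
    """Milliseconds between the first record whose message starts with
    start_prefix and the first later record logged at end_level.
    Returns None if either record is missing."""
    start = None
    for line in lines:
        record = json.loads(line)
        if start is None:
            if record["message"].startswith(start_prefix):
                start = record["timeMillis"]
        elif record["level"] == end_level:
            return record["timeMillis"] - start
    return None


gap = elapsed_ms(LOG_LINES, "Submitting job", "ERROR")
print(gap)
```

For these two records the difference is 23485 ms, so roughly 23.5 seconds passed between "Submitting job" and the logged AskTimeoutException, well above the 10 s default mentioned in the original question.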
Re: Change Akka Ask Timeout for Job Submission Only
IIRC this issue can be caused by limited resources or by other occasional factors. I once heard of someone upgrading their Java version, after which the issue vanished.

"akka.ask.timeout" is used as the timeout for all akka ask requests. And I second Yang that the timeout is unrelated to the client-server connection.

Best,
tison.

Yang Wang wrote on Fri, Dec 20, 2019 at 11:02 AM:
> It seems that not because the timeout of rest client. It is a server side > akka timeout exception. > Could you share the jobmanager logs? > > Best, > Yang > > Abdul Qadeer wrote on Fri, Dec 20, 2019 at 10:59 AM: > >> The relevant config here is "akka.ask.timeout". >> >> On Thu, Dec 19, 2019 at 6:51 PM tison wrote: >> >>> In previous version there is an "akka.client.timeout" option but it is >>> only used for timeout the future in client side so I don't think it change >>> akka scope timeout. >>> >>> Best, >>> tison. >>> >>> >>> Abdul Qadeer wrote on Fri, Dec 20, 2019 at 10:44 AM: >>> Hi! I am using Flink 1.8.3 and facing an issue where job submission through RestClusterClient times out on Akka (default value 10s). In previous Flink versions there was an option to set a different timeout value just for the submission client (ClusterClient config), but looks like it is not honored now as job submission from client is no more through Akka and it will use the same value present with Dispatcher. I wanted to know how to increase this timeout just for job submission without affecting other akka threads in TaskManager/JobManager, or any other solution for the problem. 
The relevant stack trace is pasted below: "cause":{"commonElementCount":8,"localizedMessage":"Could not submit job (JobID: 26940c17ae3130fb8be1323cce1036e4)","message":"Could not submit job (JobID: 26940c17ae3130fb8be1323cce1036e4)","name":"org.apache.flink.client.program.ProgramInvocationException","cause":{"commonElementCount":3,"localizedMessage":"Failed to submit JobGraph.","message":"Failed to submit JobGraph.","name":"org.apache.flink.runtime.client.JobSubmissionException","cause":{"commonElementCount":3,"localizedMessage":"[Internal server error., <Exception on server side:\nakka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/dispatcher#1457923918]] after [1 ms]. Sender[null] sent message of type \"org.apache.flink.runtime.rpc.messages.LocalFencedMessage\".\n\tat akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)\n\tat akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)\n\tat scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)\n\tat scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)\n\tat scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)\n\tat akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)\n\tat akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)\n\tat akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)\n\tat akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)\n\tat java.lang.Thread.run(Thread.java:745)\n\nEnd of exception on server side>]","message":"[Internal server error., <Exception on server side:\nakka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/dispatcher#1457923918]] after [1 ms]. 
Sender[null] sent message of type \"org.apache.flink.runtime.rpc.messages.LocalFencedMessage\".\n\tat akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)\n\tat akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)\n\tat scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)\n\tat scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)\n\tat scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)\n\tat akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)\n\tat akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)\n\tat akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)\n\tat akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)\n\tat java.lang.Thread.run(Thread.java:745)\n\nEnd of exception on server side>]","name":"org.apache.flink.runtime.rest.util.RestClientException","extendedStackTrace":[{"class":"org.apache.flink.runtime.rest.RestClient","method":"parseResponse","file":"RestClient.java","line":389,"exact":false,"location":"flink-runtime_2.11-1.8.2.jar","version":"1.8.2"},{"class":"org.apache.flink.runtime.rest.RestClient","method":"lambda$submitRequest$3","file":"RestClient.java","line":373,"exact":false,"locat
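To make the term "ask timeout" in this reply concrete: an ask is a request whose reply is awaited only for a bounded time, after which the caller gives up even if the receiver is still working. The sketch below illustrates that general pattern with Python's asyncio, not with Akka itself; slow_dispatcher and ask are hypothetical names, and the delays are arbitrary.

```python
import asyncio


async def slow_dispatcher(delay_s):
    # Stand-in for a busy receiver that needs delay_s seconds before it replies.
    await asyncio.sleep(delay_s)
    return "ack"


async def ask(delay_s, timeout_s):
    # Wait for the reply, but only up to timeout_s seconds.
    try:
        return await asyncio.wait_for(slow_dispatcher(delay_s), timeout=timeout_s)
    except asyncio.TimeoutError:
        return "ask timed out"


# The receiver answers well within the timeout.
fast = asyncio.run(ask(delay_s=0.01, timeout_s=0.5))
# The receiver is busy for longer than the caller is willing to wait.
slow = asyncio.run(ask(delay_s=0.5, timeout_s=0.05))
print(fast, "|", slow)
```

In the second call the receiver is not broken, only slower than the timeout allows. That is the situation the thread describes for a busy JobManager, and it is why a longer timeout can help.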
Re: Change Akka Ask Timeout for Job Submission Only
It seems this is not caused by the REST client's timeout. It is a server-side akka timeout exception.
Could you share the jobmanager logs?

Best,
Yang

Abdul Qadeer wrote on Fri, Dec 20, 2019 at 10:59 AM:
> The relevant config here is "akka.ask.timeout". > > On Thu, Dec 19, 2019 at 6:51 PM tison wrote: > >> In previous version there is an "akka.client.timeout" option but it is >> only used for timeout the future in client side so I don't think it change >> akka scope timeout. >> >> Best, >> tison. >> >> >> Abdul Qadeer wrote on Fri, Dec 20, 2019 at 10:44 AM: >> >>> Hi! >>> >>> I am using Flink 1.8.3 and facing an issue where job submission through >>> RestClusterClient times out on Akka (default value 10s). In previous Flink >>> versions there was an option to set a different timeout value just for the >>> submission client (ClusterClient config), but looks like it is not honored >>> now as job submission from client is no more through Akka and it will use >>> the same value present with Dispatcher. I wanted to know how to increase >>> this timeout just for job submission without affecting other akka threads >>> in TaskManager/JobManager, or any other solution for the problem. >>> >>> The relevant stack trace is pasted below: >>> >>> "cause":{"commonElementCount":8,"localizedMessage":"Could not submit job >>> (JobID: 26940c17ae3130fb8be1323cce1036e4)","message":"Could not submit job >>> (JobID: >>> 26940c17ae3130fb8be1323cce1036e4)","name":"org.apache.flink.client.program.ProgramInvocationException","cause":{"commonElementCount":3,"localizedMessage":"Failed >>> to submit JobGraph.","message":"Failed to submit >>> JobGraph.","name":"org.apache.flink.runtime.client.JobSubmissionException","cause":{"commonElementCount":3,"localizedMessage":"[Internal >>> server error., <Exception on server >>> side:\nakka.pattern.AskTimeoutException: Ask timed out on >>> [Actor[akka://flink/user/dispatcher#1457923918]] after [1 ms]. 
>>> Sender[null] sent message of type >>> \"org.apache.flink.runtime.rpc.messages.LocalFencedMessage\".\n\tat >>> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)\n\tat >>> akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)\n\tat >>> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)\n\tat >>> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)\n\tat >>> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)\n\tat >>> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)\n\tat >>> akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)\n\tat >>> akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)\n\tat >>> akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)\n\tat >>> java.lang.Thread.run(Thread.java:745)\n\nEnd of exception on server >>> side>]","message":"[Internal server error., >> side:\nakka.pattern.AskTimeoutException: Ask timed out on >>> [Actor[akka://flink/user/dispatcher#1457923918]] after [1 ms]. 
>>> Sender[null] sent message of type >>> \"org.apache.flink.runtime.rpc.messages.LocalFencedMessage\".\n\tat >>> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)\n\tat >>> akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)\n\tat >>> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)\n\tat >>> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)\n\tat >>> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)\n\tat >>> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)\n\tat >>> akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)\n\tat >>> akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)\n\tat >>> akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)\n\tat >>> java.lang.Thread.run(Thread.java:745)\n\nEnd of exception on server >>> side>]","name":"org.apache.flink.runtime.rest.util.RestClientException","extendedStackTrace":[{"class":"org.apache.flink.runtime.rest.RestClient","method":"parseResponse","file":"RestClient.java","line":389,"exact":false,"location":"flink-runtime_2.11-1.8.2.jar","version":"1.8.2"},{"class":"org.apache.flink.runtime.rest.RestClient","method":"lambda$submitRequest$3","file":"RestClient.java","line":373,"exact":false,"location":"flink-runtime_2.11-1.8.2.jar","version":"1.8.2"} >>> >>
Re: Change Akka Ask Timeout for Job Submission Only
The relevant config here is "akka.ask.timeout".

On Thu, Dec 19, 2019 at 6:51 PM tison wrote:
> In previous version there is an "akka.client.timeout" option but it is > only used for timeout the future in client side so I don't think it change > akka scope timeout. > > Best, > tison. > > > Abdul Qadeer wrote on Fri, Dec 20, 2019 at 10:44 AM: > >> Hi! >> >> I am using Flink 1.8.3 and facing an issue where job submission through >> RestClusterClient times out on Akka (default value 10s). In previous Flink >> versions there was an option to set a different timeout value just for the >> submission client (ClusterClient config), but looks like it is not honored >> now as job submission from client is no more through Akka and it will use >> the same value present with Dispatcher. I wanted to know how to increase >> this timeout just for job submission without affecting other akka threads >> in TaskManager/JobManager, or any other solution for the problem. >> >> The relevant stack trace is pasted below: >> >> "cause":{"commonElementCount":8,"localizedMessage":"Could not submit job >> (JobID: 26940c17ae3130fb8be1323cce1036e4)","message":"Could not submit job >> (JobID: >> 26940c17ae3130fb8be1323cce1036e4)","name":"org.apache.flink.client.program.ProgramInvocationException","cause":{"commonElementCount":3,"localizedMessage":"Failed >> to submit JobGraph.","message":"Failed to submit >> JobGraph.","name":"org.apache.flink.runtime.client.JobSubmissionException","cause":{"commonElementCount":3,"localizedMessage":"[Internal >> server error., <Exception on server >> side:\nakka.pattern.AskTimeoutException: Ask timed out on >> [Actor[akka://flink/user/dispatcher#1457923918]] after [1 ms]. 
>> Sender[null] sent message of type >> \"org.apache.flink.runtime.rpc.messages.LocalFencedMessage\".\n\tat >> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)\n\tat >> akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)\n\tat >> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)\n\tat >> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)\n\tat >> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)\n\tat >> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)\n\tat >> akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)\n\tat >> akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)\n\tat >> akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)\n\tat >> java.lang.Thread.run(Thread.java:745)\n\nEnd of exception on server >> side>]","message":"[Internal server error., > side:\nakka.pattern.AskTimeoutException: Ask timed out on >> [Actor[akka://flink/user/dispatcher#1457923918]] after [1 ms]. 
>> Sender[null] sent message of type >> \"org.apache.flink.runtime.rpc.messages.LocalFencedMessage\".\n\tat >> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)\n\tat >> akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)\n\tat >> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)\n\tat >> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)\n\tat >> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)\n\tat >> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)\n\tat >> akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)\n\tat >> akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)\n\tat >> akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)\n\tat >> java.lang.Thread.run(Thread.java:745)\n\nEnd of exception on server >> side>]","name":"org.apache.flink.runtime.rest.util.RestClientException","extendedStackTrace":[{"class":"org.apache.flink.runtime.rest.RestClient","method":"parseResponse","file":"RestClient.java","line":389,"exact":false,"location":"flink-runtime_2.11-1.8.2.jar","version":"1.8.2"},{"class":"org.apache.flink.runtime.rest.RestClient","method":"lambda$submitRequest$3","file":"RestClient.java","line":373,"exact":false,"location":"flink-runtime_2.11-1.8.2.jar","version":"1.8.2"} >> >
Re: Change Akka Ask Timeout for Job Submission Only
Previous versions had an "akka.client.timeout" option, but it was only used to time out the future on the client side, so I don't think it changes any akka-scope timeout.

Best,
tison.

Abdul Qadeer wrote on Fri, Dec 20, 2019 at 10:44 AM:
> Hi! > > I am using Flink 1.8.3 and facing an issue where job submission through > RestClusterClient times out on Akka (default value 10s). In previous Flink > versions there was an option to set a different timeout value just for the > submission client (ClusterClient config), but looks like it is not honored > now as job submission from client is no more through Akka and it will use > the same value present with Dispatcher. I wanted to know how to increase > this timeout just for job submission without affecting other akka threads > in TaskManager/JobManager, or any other solution for the problem. > > The relevant stack trace is pasted below: > > "cause":{"commonElementCount":8,"localizedMessage":"Could not submit job > (JobID: 26940c17ae3130fb8be1323cce1036e4)","message":"Could not submit job > (JobID: > 26940c17ae3130fb8be1323cce1036e4)","name":"org.apache.flink.client.program.ProgramInvocationException","cause":{"commonElementCount":3,"localizedMessage":"Failed > to submit JobGraph.","message":"Failed to submit > JobGraph.","name":"org.apache.flink.runtime.client.JobSubmissionException","cause":{"commonElementCount":3,"localizedMessage":"[Internal > server error., <Exception on server > side:\nakka.pattern.AskTimeoutException: Ask timed out on > [Actor[akka://flink/user/dispatcher#1457923918]] after [1 ms]. 
> Sender[null] sent message of type > \"org.apache.flink.runtime.rpc.messages.LocalFencedMessage\".\n\tat > akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)\n\tat > akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)\n\tat > scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)\n\tat > scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)\n\tat > scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)\n\tat > akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)\n\tat > akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)\n\tat > akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)\n\tat > akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)\n\tat > java.lang.Thread.run(Thread.java:745)\n\nEnd of exception on server > side>]","message":"[Internal server error., side:\nakka.pattern.AskTimeoutException: Ask timed out on > [Actor[akka://flink/user/dispatcher#1457923918]] after [1 ms]. 
> Sender[null] sent message of type > \"org.apache.flink.runtime.rpc.messages.LocalFencedMessage\".\n\tat > akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)\n\tat > akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)\n\tat > scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)\n\tat > scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)\n\tat > scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)\n\tat > akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)\n\tat > akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)\n\tat > akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)\n\tat > akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)\n\tat > java.lang.Thread.run(Thread.java:745)\n\nEnd of exception on server > side>]","name":"org.apache.flink.runtime.rest.util.RestClientException","extendedStackTrace":[{"class":"org.apache.flink.runtime.rest.RestClient","method":"parseResponse","file":"RestClient.java","line":389,"exact":false,"location":"flink-runtime_2.11-1.8.2.jar","version":"1.8.2"},{"class":"org.apache.flink.runtime.rest.RestClient","method":"lambda$submitRequest$3","file":"RestClient.java","line":373,"exact":false,"location":"flink-runtime_2.11-1.8.2.jar","version":"1.8.2"} >
Change Akka Ask Timeout for Job Submission Only
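Hi!

I am using Flink 1.8.3 and facing an issue where job submission through RestClusterClient times out on Akka (default value 10s). In previous Flink versions there was an option to set a different timeout value just for the submission client (ClusterClient config), but it looks like it is no longer honored, since job submission from the client no longer goes through Akka and uses the same value as the Dispatcher. I would like to know how to increase this timeout just for job submission without affecting other akka threads in TaskManager/JobManager, or whether there is any other solution to the problem.

The relevant stack trace is pasted below:

"cause":{"commonElementCount":8,"localizedMessage":"Could not submit job (JobID: 26940c17ae3130fb8be1323cce1036e4)","message":"Could not submit job (JobID: 26940c17ae3130fb8be1323cce1036e4)","name":"org.apache.flink.client.program.ProgramInvocationException","cause":{"commonElementCount":3,"localizedMessage":"Failed to submit JobGraph.","message":"Failed to submit JobGraph.","name":"org.apache.flink.runtime.client.JobSubmissionException","cause":{"commonElementCount":3,"localizedMessage":"[Internal server error., <Exception on server side:\nakka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/dispatcher#1457923918]] after [1 ms]. Sender[null] sent message of type \"org.apache.flink.runtime.rpc.messages.LocalFencedMessage\".\n\tat akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)\n\tat akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)\n\tat scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)\n\tat scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)\n\tat scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)\n\tat akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)\n\tat akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)\n\tat akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)\n\tat akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)\n\tat java.lang.Thread.run(Thread.java:745)\n\nEnd of exception on server side>]","message":"[Internal server error., <Exception on server side:\nakka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/dispatcher#1457923918]] after [1 ms]. Sender[null] sent message of type \"org.apache.flink.runtime.rpc.messages.LocalFencedMessage\".\n\tat akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)\n\tat akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)\n\tat scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)\n\tat scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)\n\tat scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)\n\tat akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)\n\tat akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)\n\tat akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)\n\tat akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)\n\tat java.lang.Thread.run(Thread.java:745)\n\nEnd of exception on server side>]","name":"org.apache.flink.runtime.rest.util.RestClientException","extendedStackTrace":[{"class":"org.apache.flink.runtime.rest.RestClient","method":"parseResponse","file":"RestClient.java","line":389,"exact":false,"location":"flink-runtime_2.11-1.8.2.jar","version":"1.8.2"},{"class":"org.apache.flink.runtime.rest.RestClient","method":"lambda$submitRequest$3","file":"RestClient.java","line":373,"exact":false,"location":"flink-runtime_2.11-1.8.2.jar","version":"1.8.2"}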