Re: Change Akka Ask Timeout for Job Submission Only

2020-01-03 Thread Chesnay Schepler

There are 3 communication layers involved here:

1) client <=> server (REST API)

This goes through REST and does not use timeouts AFAIK. We wait until a 
response comes or the connection terminates.


2) server (REST API) <=> processes (JM, Dispatcher)

This goes through akka, with "web.timeout" being used for the timeout.

3) processes <=> processes

Also akka, with "akka.ask.timeout" being used.


The timeout in question occurs on layer 2) due to the JM being
incredibly busy, possibly because of some heavy-weight computation in the
job setup.

In any case, you can try increasing web.timeout to maybe resolve this issue.
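
To make the suggestion concrete, a minimal flink-conf.yaml sketch could look like the one below. The value 60000 is only an illustrative assumption, not a recommendation. As far as I know web.timeout is interpreted in milliseconds. akka.ask.timeout is shown at the 10 s default mentioned in this thread, so that layer 3) stays unchanged.

```yaml
# flink-conf.yaml (sketch; values are illustrative assumptions)

# Layer 2: REST handlers <=> Dispatcher/JM. Assumed to be in milliseconds.
web.timeout: 60000

# Layer 3: process <=> process asks. Kept at the default discussed in this thread.
akka.ask.timeout: 10 s
```

The JobManager process would need a restart to pick up the change.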


On 20/12/2019 06:13, tison wrote:

> Forward to user list.
>
> Best,
> tison.



Re: Change Akka Ask Timeout for Job Submission Only

2019-12-19 Thread tison
Forward to user list.

Best,
tison.


Abdul Qadeer wrote on Fri, Dec 20, 2019 at 12:57 PM:

> Around submission time, logs from jobmanager:
>
> {"timeMillis":1576764854245,"thread":"flink-akka.actor.default-dispatcher-1016","level":"INFO","loggerName":"org.apache.flink.runtime.dispatcher.StandaloneDispatcher","message":"Received
> JobGraph submission 714829e8f6c8cd0daaed335c1b8c588a
> (sample).","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":1126,"threadPriority":5}^M
> {"timeMillis":1576764854247,"thread":"flink-akka.actor.default-dispatcher-1016","level":"INFO","loggerName":"org.apache.flink.runtime.dispatcher.StandaloneDispatcher","message":"Submitting
> job 714829e8f6c8cd0daaed335c1b8c588a
> (sample).","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":1126,"threadPriority":5}^M
> {"timeMillis":1576764856119,"thread":"flink-akka.actor.default-dispatcher-1036","level":"INFO","loggerName":"akka.actor.RepointableActorRef","message":"Message
> [org.apache.flink.runtime.rpc.messages.LocalFencedMessage] from
> Actor[akka://flink/deadLetters] to
> Actor[akka://flink/user/jobmanager_4#-2122695705] was not delivered. [87]
> dead letters encountered. This logging can be turned off or adjusted with
> configuration settings 'akka.log-dead-letters' and
> 'akka.log-dead-letters-during-shutdown'.","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":1150,"threadPriority":5}^M
> {"timeMillis":1576764877732,"thread":"flink-akka.actor.default-dispatcher-1039","level":"INFO","loggerName":"akka.actor.RepointableActorRef","message":"Message
> [org.apache.flink.runtime.rpc.messages.LocalFencedMessage] from
> Actor[akka://flink/deadLetters] to
> Actor[akka://flink/user/jobmanager_4#-2122695705] was not delivered. [88]
> dead letters encountered. This logging can be turned off or adjusted with
> configuration settings 'akka.log-dead-letters' and
> 'akka.log-dead-letters-during-shutdown'.","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":1155,"threadPriority":5}^M
> {"timeMillis":1576764877732,"thread":"flink-scheduler-1","level":"ERROR","loggerName":"org.apache.flink.runtime.rest.handler.job.JobSubmitHandler","message":"Unhandled
> exception.","thrown":{"commonElementCount":0,"localizedMessage":"Ask timed
> out on [Actor[akka://flink/user/dispatcher#1899316777]] after [1 ms].
> Sender[null] sent message of type
> \"org.apache.flink.runtime.rpc.messages.LocalFencedMessage\".","message":"Ask
> timed out on [Actor[akka://flink/user/dispatcher#1899316777]] after [1
> ms]. Sender[null] sent message of type
> \"org.apache.flink.runtime.rpc.messages.LocalFencedMessage\".","name":"akka.pattern.AskTimeoutException","extendedStackTrace":[{"class":"akka.pattern.PromiseActorRef$$anonfun$1","method":"apply$mcV$sp","file":"AskSupport.scala","line":604,"exact":true,"location":"akka-actor_2.11-2.4.20.jar","version":"?"},{"class":"akka.actor.Scheduler$$anon$4","method":"run","file":"Scheduler.scala","line":126,"exact":true,"location":"akka-actor_2.11-2.4.20.jar","version":"?"},{"class":"scala.concurrent.Future$InternalCallbackExecutor$","method":"unbatchedExecute","file":"Future.scala","line":601,"exact":true,"location":"scala-library-2.11.12.jar","version":"?"},{"class":"scala.concurrent.BatchingExecutor$class","method":"execute","file":"BatchingExecutor.scala","line":109,"exact":true,"location":"scala-library-2.11.12.jar","version":"?"},{"class":"scala.concurrent.Future$InternalCallbackExecutor$","method":"execute","file":"Future.scala","line":599,"exact":true,"location":"scala-library-2.11.12.jar","version":"?"},{"class":"akka.actor.LightArrayRevolverScheduler$TaskHolder","method":"executeTask","file":"LightArrayRevolverScheduler.scala","line":329,"exact":true,"location":"akka-actor_2.11-2.4.20.jar","version":"?"},{"class":"akka.actor.LightArrayRevolverScheduler$$anon$4","method":"executeBucket$1","file":"LightArrayRevolverScheduler.scala","line":280,"exact":true,"location":"akka-actor_2.11-2.4.20.jar","version":"?"},{"class":"akka.actor.LightArrayRevolverScheduler$$anon$4","method":"nextTick","file":"LightArrayRevolverScheduler.scala","line":284,"exact":true,"location":"akka-actor_2.11-2.4.20.jar","version":"?"},{"class":"akka.actor.LightArrayRevolverScheduler$$anon$4","method":"run","file":"LightArrayRevolverScheduler.scala","line":236,"exact":true,"location":"akka-actor_2.11-2.4.20.jar","version":"?"},{"class":"java.lang.Thread","method":"run","file":"Thread.java","line":745,"exact":true,"location":"?","version":"1.8.0_66"}]},"endOfBatch":false,"loggerFqcn":"org.apache.logg
ing.slf4j.Log4jLogger","threadId":41,"threadPriority":5}^M
> {"timeMillis":1576764877809,"thread":"flink-akka.actor.default-dispatcher-1016","level":"INFO","loggerName":"org.apache.flink.runtime.rpc.akka.AkkaRpcService","message":"Starting
> RPC endpoint for org.apache.flink.runtime.jobmaster.JobMaster at
> akka://flink/user/jobmanager_5
> .","end
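
As a rough check of how long the submission kept the Dispatcher busy, the timestamps from the log lines quoted above can be subtracted directly. This is only a sketch: the event labels are my own shorthand, and the numbers are the "timeMillis" values copied from the log.

```python
# Epoch-millisecond "timeMillis" values copied from the jobmanager log above.
# The labels are shorthand chosen for this sketch, not Flink identifiers.
events = [
    (1576764854245, "Received JobGraph submission"),
    (1576764854247, "Submitting job"),
    (1576764877732, "AskTimeoutException in JobSubmitHandler"),
    (1576764877809, "Starting RPC endpoint for JobMaster"),
]


def elapsed_ms(events, start_label, end_label):
    """Return the milliseconds between the first events with the given labels."""
    # Iterating in reverse lets the earliest occurrence of a label win.
    times = {label: ts for ts, label in reversed(events)}
    return times[end_label] - times[start_label]


gap_to_error = elapsed_ms(
    events, "Submitting job", "AskTimeoutException in JobSubmitHandler"
)
gap_to_jobmaster = elapsed_ms(
    events, "Submitting job", "Starting RPC endpoint for JobMaster"
)

print(gap_to_error)      # 23485
print(gap_to_jobmaster)  # 23562
```

Both gaps are roughly 23.5 seconds, well above the 10 s default mentioned in the original post.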

Re: Change Akka Ask Timeout for Job Submission Only

2019-12-19 Thread tison
IIRC this issue can be caused by limited resources or some other occasional
factor. I once heard of someone upgrading their Java version and the issue
vanished.

"akka.ask.timeout" is used as the timeout for all akka ask requests. And I
second Yang that the timeout is unrelated to the client-server connection.

Best,
tison.


Yang Wang wrote on Fri, Dec 20, 2019 at 11:02 AM:

> It seems this is not caused by the timeout of the REST client. It is a
> server-side akka timeout exception.
> Could you share the jobmanager logs?
>
> Best,
> Yang

Re: Change Akka Ask Timeout for Job Submission Only

2019-12-19 Thread Yang Wang
It seems this is not caused by the timeout of the REST client. It is a
server-side akka timeout exception.
Could you share the jobmanager logs?

Best,
Yang

Abdul Qadeer wrote on Fri, Dec 20, 2019 at 10:59 AM:

> The relevant config here is "akka.ask.timeout".


Re: Change Akka Ask Timeout for Job Submission Only

2019-12-19 Thread Abdul Qadeer
The relevant config here is "akka.ask.timeout".

On Thu, Dec 19, 2019 at 6:51 PM tison  wrote:

> In previous versions there was an "akka.client.timeout" option, but it was
> only used to time out the future on the client side, so I don't think it
> changes the akka-scope timeout.
>
> Best,
> tison.


Re: Change Akka Ask Timeout for Job Submission Only

2019-12-19 Thread tison
In previous versions there was an "akka.client.timeout" option, but it was
only used to time out the future on the client side, so I don't think it
changes the akka-scope timeout.

Best,
tison.


Abdul Qadeer wrote on Fri, Dec 20, 2019 at 10:44 AM:

> Hi!
>
> I am using Flink 1.8.3 and facing an issue where job submission through
> RestClusterClient times out on Akka (default value 10s). In previous Flink
> versions there was an option to set a different timeout value just for the
> submission client (ClusterClient config), but it looks like it is no longer
> honored, since job submission from the client no longer goes through Akka
> and uses the same value as the Dispatcher. I would like to know how to
> increase this timeout just for job submission without affecting other akka
> threads in TaskManager/JobManager, or whether there is any other solution
> to the problem.
>
> The relevant stack trace is pasted below:
>
> "cause":{"commonElementCount":8,"localizedMessage":"Could not submit job
> (JobID: 26940c17ae3130fb8be1323cce1036e4)","message":"Could not submit job
> (JobID:
> 26940c17ae3130fb8be1323cce1036e4)","name":"org.apache.flink.client.program.ProgramInvocationException","cause":{"commonElementCount":3,"localizedMessage":"Failed
> to submit JobGraph.","message":"Failed to submit
> JobGraph.","name":"org.apache.flink.runtime.client.JobSubmissionException","cause":{"commonElementCount":3,"localizedMessage":"[Internal
> server error., <Exception on server side:\nakka.pattern.AskTimeoutException: Ask timed out on
> [Actor[akka://flink/user/dispatcher#1457923918]] after [1 ms].
> Sender[null] sent message of type
> \"org.apache.flink.runtime.rpc.messages.LocalFencedMessage\".\n\tat
> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)\n\tat
> akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)\n\tat
> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)\n\tat
> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)\n\tat
> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)\n\tat
> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)\n\tat
> akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)\n\tat
> akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)\n\tat
> akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)\n\tat
> java.lang.Thread.run(Thread.java:745)\n\nEnd of exception on server
> side>]","message":"[Internal server error., <Exception on server side:\nakka.pattern.AskTimeoutException: Ask timed out on
> [Actor[akka://flink/user/dispatcher#1457923918]] after [1 ms].
> Sender[null] sent message of type
> \"org.apache.flink.runtime.rpc.messages.LocalFencedMessage\".\n\tat
> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)\n\tat
> akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)\n\tat
> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)\n\tat
> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)\n\tat
> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)\n\tat
> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)\n\tat
> akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)\n\tat
> akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)\n\tat
> akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)\n\tat
> java.lang.Thread.run(Thread.java:745)\n\nEnd of exception on server
> side>]","name":"org.apache.flink.runtime.rest.util.RestClientException","extendedStackTrace":[{"class":"org.apache.flink.runtime.rest.RestClient","method":"parseResponse","file":"RestClient.java","line":389,"exact":false,"location":"flink-runtime_2.11-1.8.2.jar","version":"1.8.2"},{"class":"org.apache.flink.runtime.rest.RestClient","method":"lambda$submitRequest$3","file":"RestClient.java","line":373,"exact":false,"location":"flink-runtime_2.11-1.8.2.jar","version":"1.8.2"}
>


Change Akka Ask Timeout for Job Submission Only

2019-12-19 Thread Abdul Qadeer
Hi!

I am using Flink 1.8.3 and facing an issue where job submission through
RestClusterClient times out on Akka (default value 10s). In previous Flink
versions there was an option to set a different timeout value just for the
submission client (ClusterClient config), but it looks like it is no longer
honored, since job submission from the client no longer goes through Akka
and uses the same value as the Dispatcher. I would like to know how to
increase this timeout just for job submission without affecting other akka
threads in TaskManager/JobManager, or whether there is any other solution
to the problem.

The relevant stack trace is pasted below:

"cause":{"commonElementCount":8,"localizedMessage":"Could not submit job
(JobID: 26940c17ae3130fb8be1323cce1036e4)","message":"Could not submit job
(JobID:
26940c17ae3130fb8be1323cce1036e4)","name":"org.apache.flink.client.program.ProgramInvocationException","cause":{"commonElementCount":3,"localizedMessage":"Failed
to submit JobGraph.","message":"Failed to submit
JobGraph.","name":"org.apache.flink.runtime.client.JobSubmissionException","cause":{"commonElementCount":3,"localizedMessage":"[Internal
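server error., <Exception on server side:\nakka.pattern.AskTimeoutException: Ask timed out on
[Actor[akka://flink/user/dispatcher#1457923918]] after [1 ms].
Sender[null] sent message of type
\"org.apache.flink.runtime.rpc.messages.LocalFencedMessage\".\n\tat
akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)\n\tat
akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)\n\tat
scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)\n\tat
scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)\n\tat
scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)\n\tat
akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)\n\tat
akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)\n\tat
akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)\n\tat
akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)\n\tat
java.lang.Thread.run(Thread.java:745)\n\nEnd of exception on server
side>]","message":"[Internal server error., <Exception on server side:\nakka.pattern.AskTimeoutException: Ask timed out on
[Actor[akka://flink/user/dispatcher#1457923918]] after [1 ms].
Sender[null] sent message of type
\"org.apache.flink.runtime.rpc.messages.LocalFencedMessage\".\n\tat
akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)\n\tat
akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)\n\tat
scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)\n\tat
scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)\n\tat
scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)\n\tat
akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)\n\tat
akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)\n\tat
akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)\n\tat
akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)\n\tat
java.lang.Thread.run(Thread.java:745)\n\nEnd of exception on server
side>]","name":"org.apache.flink.runtime.rest.util.RestClientException","extendedStackTrace":[{"class":"org.apache.flink.runtime.rest.RestClient","method":"parseResponse","file":"RestClient.java","line":389,"exact":false,"location":"flink-runtime_2.11-1.8.2.jar","version":"1.8.2"},{"class":"org.apache.flink.runtime.rest.RestClient","method":"lambda$submitRequest$3","file":"RestClient.java","line":373,"exact":false,"location":"flink-runtime_2.11-1.8.2.jar","version":"1.8.2"}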