Hi Lukas

>From your first two steps' description ("started this in Intellij") and the 
>exception log, I think you run your program locally within Intellij with 
>LocalStreamEnvironment. You can view the configuration related code from 
>org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java below:


Configuration configuration = new Configuration();
configuration.addAll(jobGraph.getJobConfiguration());
configuration.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, -1L);

// add (and override) the settings with what the user defined
configuration.addAll(this.configuration);

if (!configuration.contains(RestOptions.PORT)) {
   configuration.setInteger(RestOptions.PORT, 0);
}

int numSlotsPerTaskManager = 
configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, 
jobGraph.getMaximumParallelism());

MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
   .setConfiguration(configuration)
   .setNumSlotsPerTaskManager(numSlotsPerTaskManager)
   .build();

Unluckily, from the above code, I don't think you might be able to set specific 
akka-timout if you don't change this class (if I'm wrong, please correct me), 
the easiest way is just to change the ASK_TIMEOUT's default value within 
org/apache/flink/configuration/AkkaOptions.java from "10 s" to "100 s".

Best
Yun

________________________________
From: Lukas Kircher <lukaskirch...@gmail.com>
Sent: Wednesday, July 18, 2018 14:47
To: user
Subject: Re: Cannot configure akka.ask.timeout

Hello,

does anybody have an idea what is going on? I have not yet found a solution.

Am I doing something wrong? Or is the 'akka.ask.timeout' parameter not related 
to the exception stated below?

Could somebody please take a look at this? More details can be found in the 
message prior to this.

akka.pattern.AskTimeoutException: Ask timed out on 
[Actor[akka://flink/user/dispatcher8df05371-effc-468b-8a22-e2f364f65d6a#582308583]]
 after [10000 ms]

Best regards,
Lukas


On 13. Jul 2018, at 12:24, Lukas Kircher 
<lukaskirch...@gmail.com<mailto:lukaskirch...@gmail.com>> wrote:

Hello,

I have problems setting configuration parameters for Akka in Flink 1.5.0. When 
I run a job I get the exception listed below which states that Akka timed out 
after 10000ms. I tried to increase the timeout by following the Flink 
configuration documentation. Specifically I did the following:

1) Passed a configuration to the Flink execution environment with 
`akka.ask.timeout` set to a higher value. I started this in Intellij.
2) Passed program arguments via the run configuration in Intellij, e.g. 
`-Dakka.ask.timeout:100s`
3) Added `akka.ask.timeout: 100 s` to flink-conf.yaml and started a local 
standalone cluster via start-cluster.sh. The setting is reflected in Flink's 
web interface.

However - despite explicit configuration the default setting seems to be used. 
The exception below states in each case that akka ask timed out after 10000ms.

As my problem seems very basic I do not include an SSCCE for now but I can try 
to build one if this helps figuring out the issue.

------
[...]
Exception in thread "main" 
org.apache.flink.runtime.client.JobExecutionException: Could not retrieve 
JobResult.
[...]
at 
org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:619)
at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:234)
at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
at 
org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:816)
at org.apache.flink.api.java.DataSet.collect(DataSet.java:413)
[...]
Caused by: akka.pattern.AskTimeoutException: Ask timed out on 
[Actor[akka://flink/user/dispatcher8df05371-effc-468b-8a22-e2f364f65d6a#582308583]]
 after [10000 ms]. Sender[null] sent message of type 
"org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)
at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
at 
scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
at 
akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
at 
akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
at 
akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
at 
akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
at java.lang.Thread.run(Thread.java:745)
[...]
------


Best regards and thanks for your help,
Lukas




Reply via email to