Thanks for your answers.

In my use case I am reading from a large number of individual files. Jobs are 
issued directly from the Java API, the results are collected (in memory) and 
re-used partially in follow-up jobs.

I feared that with a MiniCluster or local environment I would not be able to 
override the default for `akka.ask.timeout` without tinkering with Flink's 
source code. Thanks for confirming that. For now I'll stick to this solution 
though.
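
To summarize the behaviour described in this thread, here is a rough sketch in 
plain Java. It is only an illustrative model, not Flink code: the class 
`TimeoutModel`, the enum `Origin` and the method `effectiveTimeoutMs` are 
made-up names, timeouts are simplified to milliseconds stored as `Long`, and 
the fallback values for unset keys are assumptions.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative model only; this is NOT Flink's implementation. */
public class TimeoutModel {

    /** Where a request originates, following the cases discussed in this thread. */
    enum Origin { RPC, REST_API, MINI_CLUSTER }

    // Hardcoded MiniCluster timeout reported for Flink 1.5.0 in this thread.
    static final long MINI_CLUSTER_TIMEOUT_MS = 10_000L;

    // Assumed fallback when akka.ask.timeout is unset (hypothetical for this sketch).
    static final long ASSUMED_DEFAULT_MS = 10_000L;

    /** Returns the timeout (in ms) that this simplified model would apply. */
    static long effectiveTimeoutMs(Origin origin, Map<String, Long> conf) {
        long ask = conf.getOrDefault("akka.ask.timeout", ASSUMED_DEFAULT_MS);
        switch (origin) {
            case MINI_CLUSTER:
                // The configured value is ignored; the hardcoded timeout wins.
                return MINI_CLUSTER_TIMEOUT_MS;
            case REST_API:
                // web.timeout overrides the ask timeout for REST requests.
                // Falling back to the ask timeout when web.timeout is unset
                // is a simplification made for this sketch.
                return conf.getOrDefault("web.timeout", ask);
            default:
                // Plain RPC uses akka.ask.timeout.
                return ask;
        }
    }

    public static void main(String[] args) {
        Map<String, Long> conf = new HashMap<>();
        conf.put("akka.ask.timeout", 100_000L);
        conf.put("web.timeout", 100_000L);

        // prints 10000: the MiniCluster case ignores the configuration
        System.out.println(effectiveTimeoutMs(Origin.MINI_CLUSTER, conf));
        // prints 100000: the REST case picks up web.timeout
        System.out.println(effectiveTimeoutMs(Origin.REST_API, conf));
    }
}
```

In this model, raising `akka.ask.timeout` has no effect in the MiniCluster 
case, which would match the 10000 ms in the exception I reported.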

Cheers,
Lukas

> On 19. Jul 2018, at 11:03, Gary Yao <g...@data-artisans.com> wrote:
> 
> Hi Lukas,
> 
> It seems that when using MiniCluster, the config key akka.ask.timeout is not
> respected. Instead, a hardcoded timeout of 10s is used [1]. Since all
> communication is local, it would be interesting to see in detail what your
> job looks like, given that it exceeds the timeout.
> 
> The key akka.ask.timeout specifies the default RPC timeout. However, for
> requests originating from the REST API, web.timeout overrides this value. When
> submitting a job using the CLI to a (standalone) cluster, a request is issued
> against the REST API. Therefore, you can try setting web.timeout=100000 in the
> flink-conf.yaml as already proposed by Vishal Santoshi.
> 
> Best,
> Gary
> 
> [1] https://github.com/apache/flink/blob/749dd29935f319b062051141e150eed7a1a5f298/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java#L185
> 
> On Fri, Jul 13, 2018 at 12:24 PM, Lukas Kircher <lukaskirch...@gmail.com> wrote:
> Hello,
> 
> I have problems setting configuration parameters for Akka in Flink 1.5.0. 
> When I run a job I get the exception listed below which states that Akka 
> timed out after 10000ms. I tried to increase the timeout by following the 
> Flink configuration documentation. Specifically I did the following:
> 
> 1) Passed a configuration to the Flink execution environment with 
> `akka.ask.timeout` set to a higher value. I started this in IntelliJ.
> 2) Passed program arguments via the run configuration in IntelliJ, e.g. 
> `-Dakka.ask.timeout:100s`
> 3) Added `akka.ask.timeout: 100 s` to flink-conf.yaml and started a local 
> standalone cluster via start-cluster.sh. The setting is reflected in Flink's 
> web interface.
> 
> However - despite explicit configuration the default setting seems to be 
> used. The exception below states in each case that akka ask timed out after 
> 10000ms.
> 
> As my problem seems very basic, I do not include an SSCCE for now, but I can 
> try to build one if that helps to figure out the issue.
> 
> ------
> [...]
> Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Could not retrieve JobResult.
> [...]
>       at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:619)
>       at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:234)
>       at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
>       at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:816)
>       at org.apache.flink.api.java.DataSet.collect(DataSet.java:413)
> [...]
> Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/dispatcher8df05371-effc-468b-8a22-e2f364f65d6a#582308583]] after [10000 ms]. Sender[null] sent message of type "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
>       at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)
>       at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
>       at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
>       at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
>       at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
>       at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
>       at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
>       at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
>       at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
>       at java.lang.Thread.run(Thread.java:745)
> [...]
> ------
> 
> 
> Best regards and thanks for your help,
> Lukas
> 