Hi Alex,

I’m not very familiar with JsonLinesInputFormat; is that your own 
implementation? Have a look at its `createInputSplits()` method, which should 
do the listing work. You could rewrite it to list files concurrently.
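If your format extends Flink's FileInputFormat, the core change is to fan the per-directory listing out to a thread pool inside `createInputSplits()`. The sketch below shows only that concurrent-listing pattern, using plain `java.util.concurrent` with local `java.nio` directories standing in for the S3 file system calls; the class and method names are illustrative, not Flink API:

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

public class ConcurrentListing {

    // List the files under each directory in parallel and merge the results.
    // Inside createInputSplits() the same pattern would wrap the per-directory
    // fs.listStatus()/S3 list calls instead of Files.list().
    static List<Path> listConcurrently(List<Path> dirs, int threads)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<List<Path>>> futures = new ArrayList<>();
            for (Path dir : dirs) {
                futures.add(pool.submit(() -> {
                    try (var stream = Files.list(dir)) {
                        return stream.filter(Files::isRegularFile)
                                     .collect(Collectors.toList());
                    }
                }));
            }
            List<Path> all = new ArrayList<>();
            for (Future<List<Path>> f : futures) {
                all.addAll(f.get());   // rethrows any listing failure
            }
            return all;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        // Tiny local demo: two directories, three files total.
        Path root = Files.createTempDirectory("listing-demo");
        Path a = Files.createDirectory(root.resolve("a"));
        Path b = Files.createDirectory(root.resolve("b"));
        Files.createFile(a.resolve("part-0"));
        Files.createFile(a.resolve("part-1"));
        Files.createFile(b.resolve("part-2"));

        List<Path> files = listConcurrently(List.of(a, b), 4);
        System.out.println("listed " + files.size() + " files");
    }
}
```

Listing directories in parallel mainly helps when the input path fans out into many prefixes, since each S3 LIST call is high-latency; a single flat prefix won't benefit much.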

> On Dec 13, 2018, at 11:56 PM, Alex Vinnik <alvinni...@gmail.com> wrote:
> 
> Qi,
> 
> Thanks for the references! How do I enable concurrent S3 file listing? Here 
> is the code.
> 
> // Consume the JSON files
> Configuration configuration =
>     new Configuration(GlobalConfiguration.loadConfiguration());
> configuration.setBoolean(JsonLinesInputFormat.ENUMERATE_NESTED_FILES_FLAG, true);
> 
> JsonLinesInputFormat jsonInputFormat =
>     new JsonLinesInputFormat(new Path(inputPath), configuration);
> jsonInputFormat.setFilesFilter(new BucketingSinkFilter());
> 
> DataSet<ObjectNode> input =
>     env.readFile(jsonInputFormat, inputPath).withParameters(configuration);
> 
> On Wed, Dec 12, 2018 at 8:53 PM qi luo <luoqi...@gmail.com> wrote:
> Hi Alex,
> 
> The hard code I’ve found is [1] and [2].
> 
> We encountered a similar issue to yours (listing a lot of HDFS files). We 
> ended up with a newer version of HDFSFileInput that lists files concurrently. 
> Another hack we did was to list the files on the client side and pass them to 
> the JobManager via serialization (not recommended, though, as it doesn’t 
> follow the Flink framework mechanism). 
> 
> You can also try listing S3 files concurrently, or paste your sample code 
> here.
> 
> [1] 
> https://github.com/apache/flink/blob/967b31b333e6f4b014ea3041f420bfaff2484618/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java#L187
> [2] 
> https://github.com/apache/flink/blob/b0496f21d70cc1af15569f3632d7a58fd53b8f95/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java#L117
> 
>> On Dec 13, 2018, at 1:09 AM, Alex Vinnik <alvinni...@gmail.com> wrote:
>> 
>> Qi,
>> 
>> The job submission timeout is caused by listing too many files in S3 during 
>> the env.readFile call that creates the input DataSet. Is there a way NOT to 
>> list S3 files during job submission? It seems like that would help mitigate 
>> the timeout problem.
>> 
>> Which hardcoded value were you referring to? 
>> 
>> Best,
>> -Alex
>> 
>> On Wed, Dec 12, 2018 at 7:47 AM Alex Vinnik <alvinni...@gmail.com> wrote:
>> Hi Qi,
>> 
>> Thanks for looking into this. Here is the ticket: 
>> https://issues.apache.org/jira/browse/FLINK-11143
>> 
>> Best,
>> -Alex
>> 
>> On Tue, Dec 11, 2018 at 8:47 PM qi luo <luoqi...@gmail.com> wrote:
>> Hi Alex and Lukas,
>> 
>> This error is controlled by another RPC timeout, which is hard-coded and not 
>> affected by “akka.ask.timeout”. Could you open a JIRA issue so I can propose 
>> a fix for that?
>> 
>> Cheers,
>> Qi
>> 
>>> On Dec 12, 2018, at 7:07 AM, Alex Vinnik <alvinni...@gmail.com> wrote:
>>> 
>>> Hi there,
>>> 
>>> I ran into the same problem running a batch job with Flink 1.6.1/1.6.2.
>>> 
>>> akka.pattern.AskTimeoutException: Ask timed out on 
>>> [Actor[akka://flink/user/dispatcher#202546747]] after [10000 ms]. 
>>> Sender[null] sent message of type 
>>> "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
>>> 
>>> akka.ask.timeout: 600s
>>> 
>>> But it looks like it is not honored. Any suggestions on what can be done?
>>> 
>>> Thanks
>>> 
>>> On 2018/07/13 10:24:16, Lukas Kircher <l...@gmail.com> wrote: 
>>> > Hello,
>>> > 
>>> > I have problems setting configuration parameters for Akka in Flink 1.5.0. 
>>> > When I run a job I get the exception listed below, which states that Akka 
>>> > timed out after 10000 ms. I tried to increase the timeout by following the 
>>> > Flink configuration documentation. Specifically, I did the following:
>>> > 
>>> > 1) Passed a configuration to the Flink execution environment with 
>>> > `akka.ask.timeout` set to a higher value. I started this in IntelliJ.
>>> > 2) Passed program arguments via the run configuration in IntelliJ, e.g. 
>>> > `-Dakka.ask.timeout:100s`
>>> > 3) Added `akka.ask.timeout: 100 s` to flink-conf.yaml and started a local 
>>> > standalone cluster via start-cluster.sh. The setting is reflected in 
>>> > Flink's web interface.
>>> > 
>>> > However, despite the explicit configuration, the default setting seems to 
>>> > be used: the exception below states in each case that the Akka ask timed 
>>> > out after 10000 ms.
>>> > 
>>> > As my problem seems very basic I do not include an SSCCE for now, but I 
>>> > can try to build one if that helps in figuring out the issue.
>>> > 
>>> > ------
>>> > [...]
>>> > Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Could not retrieve JobResult.
>>> > [...]
>>> > at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:619)
>>> > at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:234)
>>> > at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
>>> > at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:816)
>>> > at org.apache.flink.api.java.DataSet.collect(DataSet.java:413)
>>> > [...]
>>> > Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/dispatcher8df05371-effc-468b-8a22-e2f364f65d6a#582308583]] after [10000 ms]. Sender[null] sent message of type "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
>>> > at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)
>>> > at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
>>> > at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
>>> > at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
>>> > at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
>>> > at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
>>> > at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
>>> > at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
>>> > at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
>>> > at java.lang.Thread.run(Thread.java:745)
>>> > [...]
>>> > ------
>>> > 
>>> > Best regards and thanks for your help,
>>> > Lukas
>> 
> 