I am using Flink version 1.1.4 (the latest stable release).

> On Jan 23, 2017, at 12:41 AM, Abhishek R. Singh 
> <abhis...@tetrationanalytics.com> wrote:
> 
> I am trying to construct a topology like this (shown for parallelism of 4) - 
> basically n parallel windowed processing sub-pipelines with single source and 
> single sink:
> 
> <PastedGraphic-1.png>
> 
> I am getting the following failure if I go beyond 28 parallel sub-pipelines
> (a limit found empirically using binary search). There is nothing in the
> JobManager logs to troubleshoot this further.
> 
> Cluster configuration: Standalone cluster with JobManager at /127.0.0.1:6123
> Using address 127.0.0.1:6123 to connect to JobManager.
> JobManager web interface address http://127.0.0.1:10620
> Starting execution of program
> Submitting job with JobID: 27ae3db2946aac3336941bdfa184e537. Waiting for job completion.
> Connected to JobManager at Actor[akka.tcp://flink@127.0.0.1:6123/user/jobmanager#2043695445]
> 
> ------------------------------------------------------------
>  The program finished with the following exception:
> 
> org.apache.flink.client.program.ProgramInvocationException: The program
> execution failed: Communication with JobManager failed: Job submission to the
> JobManager timed out. You may increase 'akka.client.timeout' in case the
> JobManager needs more time to configure and confirm the job submission.
>         at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:410)
>         at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:95)
>         at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:383)
>         at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:68)
>         at com.tetration.pipeline.IngestionPipelineMain.main(IngestionPipelineMain.java:116)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:483)
>         at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:510)
>         at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:404)
>         at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:321)
>         at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:777)
>         at org.apache.flink.client.CliFrontend.run(CliFrontend.java:253)
>         at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1005)
>         at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1048)
> Caused by: org.apache.flink.runtime.client.JobExecutionException:
> Communication with JobManager failed: Job submission to the JobManager timed
> out. You may increase 'akka.client.timeout' in case the JobManager needs more
> time to configure and confirm the job submission.
>         at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:137)
>         at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:406)
>         ... 15 more
> Caused by: org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException:
> Job submission to the JobManager timed out. You may increase
> 'akka.client.timeout' in case the JobManager needs more time to configure and
> confirm the job submission.
>         at org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:264)
>         at org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:88)
>         at org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:68)
>         at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
>         at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>         at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>         at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>         at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>         at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
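
The exception above names 'akka.client.timeout' as the setting to raise. A minimal sketch of what that could look like in the flink-conf.yaml read by the submitting client follows. The value 600 s is an arbitrary illustration, not a recommendation, and nothing in this thread confirms that a longer timeout resolves the failure or explains why submission slows down beyond 28 sub-pipelines.

```yaml
# conf/flink-conf.yaml on the machine running the Flink client
# Assumption: 600 s is an illustrative value only.
akka.client.timeout: 600 s
```
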
> 
> The code to reproduce this problem is shown below. The Flink job submission
> itself fails; the code has been reduced to focus on the topology I am
> trying to build.
> 
> int nParts = cfg.getInt("dummyPartitions", 4);
> 
> SingleOutputStreamOperator<String> in = env.socketTextStream("localhost",
>     cfg.getInt("dummyPort", 16408)).setParallelism(1).name("src");
> 
> SingleOutputStreamOperator<String> fanout =
>     in.flatMap(new FlatMapFunction<String, String>() {
>       @Override public void flatMap(String input, Collector<String> out)
>           throws Exception {
>         for (int i = 0; i < nParts; i++) {
>           out.collect(Integer.toString(i));
>         }
>       }
>     }).setParallelism(1).name("flatmap");
> 
> 
> SplitStream<String> afterSplit =
>     fanout.split(value -> Collections.singletonList(value));
> 
> ArrayList<DataStream<String>> splitUp = new ArrayList<>(nParts);
> for (int i = 0; i < nParts; i++) {
>   splitUp.add(
>       afterSplit.select(Integer.toString(i))
>           .map(a -> a).startNewChain().setParallelism(1)
>           .keyBy(s -> s)
>           .window(TumblingEventTimeWindows.of(Time.seconds(10)))
>           .max(0).setParallelism(1)
>   );
> }
> 
> DataStream<String> combined = splitUp.get(0);
> for (int i = 1; i < nParts; i++) {
>   combined = combined.union(splitUp.get(i));
> }
> 
> combined.print().setParallelism(1);