I am trying to construct a topology like this (shown for a parallelism of 4):
essentially n parallel windowed processing sub-pipelines that share a single
source and a single sink:



Job submission fails with the error below once I go beyond 28 (a limit I
found empirically using binary search). The JobManager logs contain nothing
that helps troubleshoot this further.

Cluster configuration: Standalone cluster with JobManager at /127.0.0.1:6123
Using address 127.0.0.1:6123 to connect to JobManager.
JobManager web interface address http://127.0.0.1:10620
Starting execution of program
Submitting job with JobID: 27ae3db2946aac3336941bdfa184e537. Waiting for job completion.
Connected to JobManager at Actor[akka.tcp://flink@127.0.0.1:6123/user/jobmanager#2043695445]

------------------------------------------------------------
 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Communication with JobManager failed: Job submission to the JobManager timed out. You may increase 'akka.client.timeout' in case the JobManager needs more time to configure and confirm the job submission.
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:410)
        at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:95)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:383)
        at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:68)
        at com.tetration.pipeline.IngestionPipelineMain.main(IngestionPipelineMain.java:116)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:483)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:510)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:404)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:321)
        at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:777)
        at org.apache.flink.client.CliFrontend.run(CliFrontend.java:253)
        at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1005)
        at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1048)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Communication with JobManager failed: Job submission to the JobManager timed out. You may increase 'akka.client.timeout' in case the JobManager needs more time to configure and confirm the job submission.
        at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:137)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:406)
        ... 15 more
Caused by: org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException: Job submission to the JobManager timed out. You may increase 'akka.client.timeout' in case the JobManager needs more time to configure and confirm the job submission.
        at org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:264)
        at org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:88)
        at org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:68)
        at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
        at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.ActorCell.invoke(ActorCell.scala:487)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
        at akka.dispatch.Mailbox.run(Mailbox.scala:221)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
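
For reference, the 'akka.client.timeout' option named in the message is read
from flink-conf.yaml. A minimal sketch of raising it is below; the value of
600 s is an arbitrary illustration (an assumption, not a tested fix), and a
longer timeout would only help if the JobManager is merely slow to accept the
job rather than stuck:

```yaml
# conf/flink-conf.yaml (on the machine that submits the job)
# Time the client waits for the JobManager to confirm the submission.
# 600 s is an illustrative value chosen for this sketch.
akka.client.timeout: 600 s
```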

The code to reproduce the problem is shown below. The Flink job submission
itself fails; the code has been stripped down to focus on the topology I am
trying to build:

int nParts = cfg.getInt("dummyPartitions", 4);

SingleOutputStreamOperator<String> in = env.socketTextStream("localhost",
    cfg.getInt("dummyPort", 16408)).setParallelism(1).name("src");

SingleOutputStreamOperator<String> fanout =
    in.flatMap(new FlatMapFunction<String, String>() {
      @Override public void flatMap(String input, Collector<String> out)
          throws Exception {
        for (int i = 0; i < nParts; i++) {
          out.collect(Integer.toString(i));
        }
      }
    }).setParallelism(1).name("flatmap");


SplitStream<String> afterSplit =
    fanout.split(value -> Collections.singletonList(value));

ArrayList<DataStream<String>> splitUp = new ArrayList<>(nParts);
for (int i = 0; i < nParts; i++) {
  splitUp.add(
      afterSplit.select(Integer.toString(i))
          .map(a -> a).startNewChain().setParallelism(1)
          .keyBy(s -> s)
          .window(TumblingEventTimeWindows.of(Time.seconds(10)))
          .max(0).setParallelism(1)
  );
}

DataStream<String> combined = splitUp.get(0);
for (int i = 1; i < nParts; i++) {
  combined = combined.union(splitUp.get(i));
}

combined.print().setParallelism(1);
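
To give a sense of how the graph grows with nParts, here is a small
standalone tally of the operators declared by the code above. It is only a
sketch under stated assumptions: it counts operators as written (one source,
one flatMap, one map and one window aggregation per sub-pipeline, one print
sink), treats split/select/union as edges rather than operators, and ignores
operator chaining. The names TopologySize and operatorCount are hypothetical
and not part of Flink, and nothing here establishes that the operator count
is what triggers the timeout.

```java
public class TopologySize {

    // Operators declared for nParts sub-pipelines, as written in the job:
    // 1 source + 1 flatMap + nParts maps + nParts window aggregations + 1 sink.
    // Assumption: split/select/union are counted as edges, not operators.
    static int operatorCount(int nParts) {
        if (nParts < 1) {
            throw new IllegalArgumentException("nParts must be at least 1");
        }
        return 2 + 2 * nParts + 1;
    }

    public static void main(String[] args) {
        // 4 is the default in the job; 28 and 29 bracket the observed limit.
        for (int n : new int[] {4, 28, 29}) {
            System.out.println("nParts=" + n + " -> "
                + operatorCount(n) + " operators");
        }
    }
}
```

Since every operator in the job runs with a parallelism of 1, the same number
is also the subtask count before any chaining is applied.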



