I have a streaming job that works in standalone cluster. Flink version is 
1.4.1. Everything was working so far. But since I added new treatments, I can 
not start my job anymore. I have this exception : 

org.apache.flink.client.program.ProgramInvocationException: The program 
execution failed: JobManager did not respond within 60000 ms
        at 
org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:524)
        at 
org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:103)
        at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:456)
        at 
org.apache.flink.client.program.DetachedEnvironment.finalizeExecute(DetachedEnvironment.java:77)
        at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:402)
        at 
org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:802)
        at org.apache.flink.client.CliFrontend.run(CliFrontend.java:282)
        at 
org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1054)
        at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1101)
        at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1098)
        at 
org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
        at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1098)
Caused by: org.apache.flink.runtime.client.JobTimeoutException: JobManager did 
not respond within 60000 ms
        at 
org.apache.flink.runtime.client.JobClient.submitJobDetached(JobClient.java:437)
        at 
org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:516)
        ... 11 more
Caused by: java.util.concurrent.TimeoutException
        at 
java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
        at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
        at 
org.apache.flink.runtime.client.JobClient.submitJobDetached(JobClient.java:435)
        ... 12 more

I see a very strange behavior. When I comment on a function (any one, for 
example a FilterFunction, which was present before or after my modification). 
I tried to change the configuration (akka.client.timeout and akka.framesize) 
without success.

This is my flink-conf.yaml
  jobmanager.rpc.address: myhost
  jobmanager.rpc.port: 6123
  jobmanager.heap.mb: 128
  taskmanager.heap.mb: 1024
  taskmanager.numberOfTaskSlots: 100
  taskmanager.memory.preallocate: false
  taskmanager.data.port: 6121
  parallelism.default: 1
  taskmanager.tmp.dirs: /dohdev/flink/tmp/tskmgr
  blob.storage.directory: /dohdev/flink/tmp/blob
  jobmanager.web.port: -1
  high-availability: zookeeper
  high-availability.zookeeper.quorum: localhost:2181
  high-availability.zookeeper.path.root: /dohdev/flink
  high-availability.cluster-id: dev
  high-availability.storageDir: file:////mnt/metaflink
  high-availability.zookeeper.storageDir: /mnt/metaflink/inh/agregateur/recovery
  restart-strategy: fixed-delay
  restart-strategy.fixed-delay.attempts: 1000
  restart-strategy.fixed-delay.delay: 5 s
  zookeeper.sasl.disable: true
  blob.service.cleanup.interval: 60

And I launch a job with this command : bin/flink run -d myjar.jar

I added as an attachment a graph of my job when it works (Graph.PNG).

Do you have an idea of the problem ?

Thanks.
Julien


Reply via email to