Hi Sen,

Are you using the default MemoryStateBackend [1]? As far as I know, it does not support JobManager failover. If you are already using FsStateBackend or RocksDBStateBackend, please send the JM logs.
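For reference, switching off the default MemoryStateBackend is a flink-conf.yaml change; the checkpoint directory below is a placeholder, not a recommended path:

```yaml
# flink-conf.yaml -- sketch only, adjust paths to your cluster
state.backend: filesystem                          # FsStateBackend; use "rocksdb" for RocksDBStateBackend
state.checkpoints.dir: hdfs:///flink/checkpoints   # placeholder HDFS path
```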
Best,
Gary

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/state/state_backends.html#available-state-backends

On Mon, Mar 4, 2019 at 10:01 AM 孙森 <senny...@163.com> wrote:

> Hi Gary:
>
> Yes, I enabled checkpoints in my program.
>
> On Mar 4, 2019, at 3:03 AM, Gary Yao <g...@ververica.com> wrote:
>
> Hi Sen,
>
> Did you set a restart strategy [1]? If you enabled checkpoints [2], the
> fixed-delay strategy will be used by default.
>
> Best,
> Gary
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/restart_strategies.html
> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/state/checkpoints.html
>
> On Fri, Mar 1, 2019 at 7:27 AM 孙森 <senny...@163.com> wrote:
>
>> Hi Gary:
>> I checked the znode; the address of the leader was there.
>>
>> <Screenshot 2019-03-01 10.45.45 AM 1.png>
>>
>> When I removed the ZooKeeper configuration in the client's
>> flink-conf.yaml, the job was submitted successfully.
>> Then I tried to test whether HA works. I killed the job manager and it
>> restarted, but the job did not restart when the job manager came back.
>>
>> Best!
>> Sen
>>
>> On Feb 28, 2019, at 6:59 PM, Gary Yao <g...@ververica.com> wrote:
>>
>> Hi Sen,
>>
>> I took a look at the CLI code again, and found out that -m is ignored if
>> high-availability: ZOOKEEPER is configured in your flink-conf.yaml. This
>> does not seem right and should at least be documented [1].
>>
>> Judging from the client logs that you provided, I think the problem is
>> that the client cannot resolve the leading JobManager from ZooKeeper
>> [2][3]. You can try the following things for debugging:
>>
>> * Check the contents of the znode
>>   /flink/[...]/leader/rest_server_lock using the ZK CLI. It should
>>   contain the address of the leader. If not, I would check the
>>   jobmanager logs for related errors.
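On the restart-strategy point above, for completeness: a fixed-delay strategy can also be set explicitly in flink-conf.yaml (the values below are illustrative examples, not recommendations):

```yaml
# flink-conf.yaml -- example values only
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s
```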
>> * Submit the job with the -m parameter but without the ZooKeeper
>>   configuration in the client's flink-conf.yaml.
>>
>> Best,
>> Gary
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-11779
>> [2] https://github.com/apache/flink/blob/release-1.5.1/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalService.java#L170
>> [3] https://github.com/apache/flink/blob/release-1.5.1/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java#L746-L750
>>
>> On Thu, Feb 28, 2019 at 4:34 AM 孙森 <senny...@163.com> wrote:
>>
>>> Hi, Gary
>>>
>>> Actually, I have several Flink clusters on YARN, one for each project.
>>> A project can only submit jobs to its specified cluster.
>>> I've already enabled logging on DEBUG level.
>>>
>>> How did you determine "jmhost" and "port"?
>>>
>>> We do this by requesting the REST API: http://activeRm/proxy/appId/jars
>>>
>>> The full client log is in the mail attachment.
>>>
>>> On Feb 27, 2019, at 9:30 PM, Gary Yao <g...@ververica.com> wrote:
>>>
>>> Hi,
>>>
>>> How did you determine "jmhost" and "port"? Actually you do not need to
>>> specify these manually. If the client is using the same configuration
>>> as your cluster, the client will look up the leading JM from ZooKeeper.
>>>
>>> If you have already tried omitting the "-m" parameter, you can check in
>>> the client logs which host is used for the job submission [1]. Note
>>> that you need to enable logging on DEBUG level.
>>>
>>> The root cause in your stacktrace is a TimeoutException. I would debug
>>> this by checking if you can establish a TCP connection – from the
>>> machine you are submitting the job from – to the target host/port [2].
>>>
>>> Moreover, you are using a quite dated Flink version. The newest version
>>> in the 1.5 major release is 1.5.6, so consider upgrading to that or
>>> even to 1.7.
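A plain sketch of these debugging steps; `jmhost`, the REST port, and the znode path are placeholders taken from this thread, not values to copy verbatim:

```shell
#!/usr/bin/env bash
# Succeeds if a TCP connection to HOST PORT can be opened within 3 seconds.
port_open() {
  timeout 3 bash -c "exec 3<>/dev/tcp/$1/$2" 2>/dev/null
}

# 1) Check basic reachability of the JobManager REST endpoint:
#      port_open jmhost 8081 && echo reachable || echo unreachable
#
# 2) Inspect the leader znode with the ZooKeeper CLI (the exact path
#    depends on high-availability.zookeeper.path.root and the cluster id):
#      zkCli.sh -server hdp1:2181 get /flink/<cluster-id>/leader/rest_server_lock
#
# 3) Re-submit with -m after removing the ZooKeeper entries from the
#    client's flink-conf.yaml:
#      /usr/local/flink/bin/flink run -m jmhost:port my.jar
```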
>>> Best,
>>> Gary
>>>
>>> [1] https://github.com/apache/flink/blob/3488f8b144a2127497c39b8ed5a48a65b551c57d/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java#L185
>>> [2] https://stackoverflow.com/questions/4922943/test-from-shell-script-if-remote-tcp-port-is-open
>>>
>>> On Wed, Feb 27, 2019 at 8:09 AM 孙森 <senny...@163.com> wrote:
>>>
>>>> Hi all:
>>>>
>>>> I run Flink (1.5.1 with Hadoop 2.7) on YARN, and submit jobs with
>>>> "/usr/local/flink/bin/flink run -m jmhost:port my.jar", but the
>>>> submission fails.
>>>> The HA configuration is:
>>>>
>>>> - high-availability: zookeeper
>>>> - high-availability.storageDir: hdfs:///flink/ha/
>>>> - high-availability.zookeeper.quorum: hdp1:2181,hdp2:2181,hdp3:2181
>>>> - yarn.application-attempts: 2
>>>>
>>>> The info shown in the client log:
>>>>
>>>> 2019-02-27 11:48:38,651 INFO org.apache.flink.runtime.rest.RestClient - Shutting down rest endpoint.
>>>> 2019-02-27 11:48:38,659 INFO org.apache.flink.runtime.rest.RestClient - Rest endpoint shutdown complete.
>>>> 2019-02-27 11:48:38,662 INFO org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - Stopping ZooKeeperLeaderRetrievalService /leader/rest_server_lock.
>>>> 2019-02-27 11:48:38,665 INFO org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - Stopping ZooKeeperLeaderRetrievalService /leader/dispatcher_lock.
>>>> 2019-02-27 11:48:38,670 INFO org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl - backgroundOperationsLoop exiting
>>>> 2019-02-27 11:48:38,689 INFO org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper - Session: 0x2679c52880c00ee closed
>>>> 2019-02-27 11:48:38,689 INFO org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - EventThread shut down for session: 0x2679c52880c00ee
>>>> 2019-02-27 11:48:38,690 ERROR org.apache.flink.client.cli.CliFrontend - Error while running the command.
>>>> org.apache.flink.client.program.ProgramInvocationException: Could not retrieve the execution result.
>>>>     at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:257)
>>>>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:464)
>>>>     at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
>>>>     at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
>>>>     at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
>>>>     at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
>>>>     at scala.App$$anonfun$main$1.apply(App.scala:76)
>>>>     at scala.App$$anonfun$main$1.apply(App.scala:76)
>>>>     at scala.collection.immutable.List.foreach(List.scala:381)
>>>>     at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
>>>>     at scala.App$class.main(App.scala:76)
>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
>>>>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:420)
>>>>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:404)
>>>>     at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:785)
>>>>     at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:279)
>>>>     at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:214)
>>>>     at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1025)
>>>>     at org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1101)
>>>>     at java.security.AccessController.doPrivileged(Native Method)
>>>>     at javax.security.auth.Subject.doAs(Subject.java:422)
>>>>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
>>>>     at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>>>>     at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1101)
>>>> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
>>>>     at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:370)
>>>>     at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
>>>>     at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
>>>>     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>>>>     at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>>>>     at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:214)
>>>>     at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>>>>     at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>>>>     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>>>>     at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>>>>     at org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:834)
>>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>>>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>     at java.lang.Thread.run(Thread.java:748)
>>>> Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not complete the operation. Exception is not retryable.
>>>>     at java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)
>>>>     at java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)
>>>>     at java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911)
>>>>     at java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:899)
>>>>     ... 15 more
>>>> Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not complete the operation. Exception is not retryable.
>>>>     ... 13 more
>>>> Caused by: java.util.concurrent.CompletionException: java.util.concurrent.TimeoutException
>>>>     at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
>>>>     at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
>>>>     at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
>>>>     at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
>>>>     ... 10 more
>>>> Caused by: java.util.concurrent.TimeoutException
>>>>     ... 8 more
>>>>
>>>> ------------------------------------------------------------
>>>> The program finished with the following exception:
>>>>
>>>> org.apache.flink.client.program.ProgramInvocationException: Could not retrieve the execution result.
>>>>     at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:257)
>>>>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:464)
>>>>     at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
>>>>     at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
>>>>     at edp.wormhole.flinkx.eventflow.WormholeFlinkMainProcess.process(WormholeFlinkMainProcess.scala:114)
>>>>     at edp.wormhole.flinkx.eventflow.WormholeFlinkxStarter$.delayedEndpoint$edp$wormhole$flinkx$eventflow$WormholeFlinkxStarter$1(WormholeFlinkxStarter.scala:40)
>>>>     at edp.wormhole.flinkx.eventflow.WormholeFlinkxStarter$delayedInit$body.apply(WormholeFlinkxStarter.scala:29)
>>>>     at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
>>>>     at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
>>>>     at scala.App$$anonfun$main$1.apply(App.scala:76)
>>>>     at scala.App$$anonfun$main$1.apply(App.scala:76)
>>>>     at scala.collection.immutable.List.foreach(List.scala:381)
>>>>     at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
>>>>     at scala.App$class.main(App.scala:76)
>>>>     at edp.wormhole.flinkx.eventflow.WormholeFlinkxStarter$.main(WormholeFlinkxStarter.scala:29)
>>>>     at edp.wormhole.flinkx.eventflow.WormholeFlinkxStarter.main(WormholeFlinkxStarter.scala)
>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
>>>>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:420)
>>>>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:404)
>>>>     at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:785)
>>>>     at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:279)
>>>>     at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:214)
>>>>     at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1025)
>>>>     at org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1101)
>>>>     at java.security.AccessController.doPrivileged(Native Method)
>>>>     at javax.security.auth.Subject.doAs(Subject.java:422)
>>>>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
>>>>     at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>>>>     at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1101)
>>>> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
>>>>     at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:370)
>>>>     at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
>>>>     at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
>>>>     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>>>>     at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>>>>     at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:214)
>>>>     at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>>>>     at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>>>>     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>>>>     at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>>>>     at org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:834)
>>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>>>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>     at java.lang.Thread.run(Thread.java:748)
>>>> Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not complete the operation. Exception is not retryable.
>>>>     at java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)
>>>>     at java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)
>>>>     at java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911)
>>>>     at java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:899)
>>>>     ... 15 more
>>>> Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not complete the operation. Exception is not retryable.
>>>>     ... 13 more
>>>> Caused by: java.util.concurrent.CompletionException: java.util.concurrent.TimeoutException
>>>>     at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
>>>>     at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
>>>>     at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
>>>>     at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
>>>>     ... 10 more
>>>> Caused by: java.util.concurrent.TimeoutException
>>>>     ... 8 more