[ https://issues.apache.org/jira/browse/FLINK-22483?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Robert Metzger updated FLINK-22483: ----------------------------------- Description: Recovering checkpoints (from the CompletedCheckpointStore) is a potentially long-lasting/blocking operation, for example if the file system implementation is retrying to connect to an unavailable storage backend. Currently, we are calling the CompletedCheckpointStore.recover() method from the main thread of the JobManager, making it unresponsive to any RPC call while the recover method is blocked: {code} 2021-04-02 20:33:31,384 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job XXX switched from state RUNNING to RESTARTING. com.amazonaws.SdkClientException: Unable to execute HTTP request: Connect to minio.minio.svc:9000 [minio.minio.svc/XXXX] failed: Connection refused (Connection refused) at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleRetryableException(AmazonHttpClient.java:1207) ~[?:?] at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1153) ~[?:?] at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:802) ~[?:?] at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770) ~[?:?] at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744) ~[?:?] at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704) ~[?:?] at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686) ~[?:?] at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550) ~[?:?] at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530) ~[?:?] at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5062) ~[?:?] at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5008) ~[?:?] at com.amazonaws.services.s3.AmazonS3Client.getObject(AmazonS3Client.java:1490) ~[?:?] 
at com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.lambda$openStream$1(PrestoS3FileSystem.java:905) ~[?:?] at com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:138) ~[?:?] at com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.openStream(PrestoS3FileSystem.java:902) ~[?:?] at com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.openStream(PrestoS3FileSystem.java:887) ~[?:?] at com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.seekStream(PrestoS3FileSystem.java:880) ~[?:?] at com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.lambda$read$0(PrestoS3FileSystem.java:819) ~[?:?] at com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:138) ~[?:?] at com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.read(PrestoS3FileSystem.java:818) ~[?:?] at java.io.BufferedInputStream.read1(BufferedInputStream.java:284) ~[?:1.8.0_282] at XXX.recover(KubernetesHaCheckpointStore.java:69) ~[vvp-flink-ha-kubernetes-flink112-1.1.0.jar:?] 
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1511) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1] at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateToAll(CheckpointCoordinator.java:1451) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1] at org.apache.flink.runtime.scheduler.SchedulerBase.restoreState(SchedulerBase.java:421) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1] at org.apache.flink.runtime.scheduler.DefaultScheduler.lambda$restartTasks$2(DefaultScheduler.java:314) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1] at java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:719) ~[?:1.8.0_282] at java.util.concurrent.CompletableFuture$UniRun.tryFire(CompletableFuture.java:701) ~[?:1.8.0_282] at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) ~[?:1.8.0_282] at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1] at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1] at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1] at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1] at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1] at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1] at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1] at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) 
[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1] at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1] at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1] at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1] at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1] at akka.actor.Actor.aroundReceive(Actor.scala:517) [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1] at akka.actor.Actor.aroundReceive$(Actor.scala:515) [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1] at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1] at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1] at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1] at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1] at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1] at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1] at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1] at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1] at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1] at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1] Caused by: org.apache.http.conn.HttpHostConnectException: Connect to 
minio.minio.svc:9000 [minio.minio.svc/10.115.246.236] failed: Connection refused (Connection refused) at org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:156) ~[?:?] at org.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:374) ~[?:?] at sun.reflect.GeneratedMethodAccessor116.invoke(Unknown Source) ~[?:?] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_282] at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_282] at com.amazonaws.http.conn.ClientConnectionManagerFactory$Handler.invoke(ClientConnectionManagerFactory.java:76) ~[?:?] at com.amazonaws.http.conn.$Proxy18.connect(Unknown Source) ~[?:?] at org.apache.http.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:393) ~[?:?] at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:236) ~[?:?] at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186) ~[?:?] at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185) ~[?:?] at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83) ~[?:?] at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56) ~[?:?] at com.amazonaws.http.apache.client.impl.SdkHttpClient.execute(SdkHttpClient.java:72) ~[?:?] at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1330) ~[?:?] at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1145) ~[?:?] ... 
67 more Caused by: java.net.ConnectException: Connection refused (Connection refused) at java.net.PlainSocketImpl.socketConnect(Native Method) ~[?:1.8.0_282] at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350) ~[?:1.8.0_282] at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206) ~[?:1.8.0_282] at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188) ~[?:1.8.0_282] at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) ~[?:1.8.0_282] at java.net.Socket.connect(Socket.java:607) ~[?:1.8.0_282] at org.apache.http.conn.socket.PlainConnectionSocketFactory.connectSocket(PlainConnectionSocketFactory.java:75) ~[?:?] at org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:142) ~[?:?] at org.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:374) ~[?:?] at sun.reflect.GeneratedMethodAccessor116.invoke(Unknown Source) ~[?:?] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_282] at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_282] at com.amazonaws.http.conn.ClientConnectionManagerFactory$Handler.invoke(ClientConnectionManagerFactory.java:76) ~[?:?] at com.amazonaws.http.conn.$Proxy18.connect(Unknown Source) ~[?:?] at org.apache.http.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:393) ~[?:?] at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:236) ~[?:?] at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186) ~[?:?] at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185) ~[?:?] at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83) ~[?:?] at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56) ~[?:?] 
at com.amazonaws.http.apache.client.impl.SdkHttpClient.execute(SdkHttpClient.java:72) ~[?:?] at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1330) ~[?:?] at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1145) ~[?:?] ... 67 more {code} By moving the recovery to the start of the JobManager (which happens asynchronously after the JobMaster has gained leadership), Flink will remain responsive (reporting a job in INITIALIZING state). was: Recovering checkpoints (from the CompletedCheckpointStore) is a potentially blocking operation, for example if the file system implementation is retrying to connect to an unavailable storage backend. Currently, we are calling the CompletedCheckpointStore.recover() method from the main thread of the JobManager, making it unresponsive to any RPC call while the recover method is blocked. By moving the recovery to the start of the JobManager (which happens asynchronously after the JobMaster has gained leadership), Flink will remain responsive (reporting a job in INITIALIZING state). > Recover checkpoints when JobMaster gains leadership > --------------------------------------------------- > > Key: FLINK-22483 > URL: https://issues.apache.org/jira/browse/FLINK-22483 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination > Affects Versions: 1.13.0 > Reporter: Robert Metzger > Priority: Major > Fix For: 1.14.0 > > > Recovering checkpoints (from the CompletedCheckpointStore) is a potentially > long-lasting/blocking operation, for example if the file system > implementation is retrying to connect to an unavailable storage backend. 
> Currently, we are calling the CompletedCheckpointStore.recover() method from > the main thread of the JobManager, making it unresponsive to any RPC call > while the recover method is blocked: > {code} > 2021-04-02 20:33:31,384 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job XXX > switched from state RUNNING to RESTARTING. > com.amazonaws.SdkClientException: Unable to execute HTTP request: Connect to > minio.minio.svc:9000 [minio.minio.svc/XXXX] failed: Connection refused > (Connection refused) > at > com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleRetryableException(AmazonHttpClient.java:1207) > ~[?:?] > at > com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1153) > ~[?:?] > at > com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:802) > ~[?:?] > at > com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770) > ~[?:?] > at > com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744) > ~[?:?] > at > com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704) > ~[?:?] > at > com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686) > ~[?:?] > at > com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550) ~[?:?] > at > com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530) ~[?:?] > at > com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5062) > ~[?:?] > at > com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5008) > ~[?:?] > at > com.amazonaws.services.s3.AmazonS3Client.getObject(AmazonS3Client.java:1490) > ~[?:?] > at > com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.lambda$openStream$1(PrestoS3FileSystem.java:905) > ~[?:?] > at com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:138) ~[?:?] 
> at > com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.openStream(PrestoS3FileSystem.java:902) > ~[?:?] > at > com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.openStream(PrestoS3FileSystem.java:887) > ~[?:?] > at > com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.seekStream(PrestoS3FileSystem.java:880) > ~[?:?] > at > com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.lambda$read$0(PrestoS3FileSystem.java:819) > ~[?:?] > at com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:138) ~[?:?] > at > com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.read(PrestoS3FileSystem.java:818) > ~[?:?] > at java.io.BufferedInputStream.read1(BufferedInputStream.java:284) > ~[?:1.8.0_282] > at XXX.recover(KubernetesHaCheckpointStore.java:69) > ~[vvp-flink-ha-kubernetes-flink112-1.1.0.jar:?] > at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1511) > ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1] > at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateToAll(CheckpointCoordinator.java:1451) > ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1] > at > org.apache.flink.runtime.scheduler.SchedulerBase.restoreState(SchedulerBase.java:421) > ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1] > at > org.apache.flink.runtime.scheduler.DefaultScheduler.lambda$restartTasks$2(DefaultScheduler.java:314) > ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1] > at > java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:719) > ~[?:1.8.0_282] > at > java.util.concurrent.CompletableFuture$UniRun.tryFire(CompletableFuture.java:701) > ~[?:1.8.0_282] > at > java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) > ~[?:1.8.0_282] > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440) > 
~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1] > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208) > ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1] > at > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) > ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1] > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) > ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1] > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) > [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1] > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) > [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1] > at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) > [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1] > at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) > [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1] > at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) > [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1] > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) > [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1] > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) > [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1] > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) > [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1] > at akka.actor.Actor.aroundReceive(Actor.scala:517) > [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1] > at akka.actor.Actor.aroundReceive$(Actor.scala:515) > [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1] > at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) > [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1] > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) > [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1] > at 
akka.actor.ActorCell.invoke(ActorCell.scala:561) > [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1] > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) > [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1] > at akka.dispatch.Mailbox.run(Mailbox.scala:225) > [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1] > at akka.dispatch.Mailbox.exec(Mailbox.scala:235) > [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1] > at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1] > at > akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1] > at > akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1] > at > akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1] > Caused by: org.apache.http.conn.HttpHostConnectException: Connect to > minio.minio.svc:9000 [minio.minio.svc/10.115.246.236] failed: Connection > refused (Connection refused) > at > org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:156) > ~[?:?] > at > org.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:374) > ~[?:?] > at sun.reflect.GeneratedMethodAccessor116.invoke(Unknown Source) ~[?:?] > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > ~[?:1.8.0_282] > at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_282] > at > com.amazonaws.http.conn.ClientConnectionManagerFactory$Handler.invoke(ClientConnectionManagerFactory.java:76) > ~[?:?] > at com.amazonaws.http.conn.$Proxy18.connect(Unknown Source) ~[?:?] > at > org.apache.http.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:393) > ~[?:?] 
> at > org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:236) > ~[?:?] > at > org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186) > ~[?:?] > at > org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185) > ~[?:?] > at > org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83) > ~[?:?] > at > org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56) > ~[?:?] > at > com.amazonaws.http.apache.client.impl.SdkHttpClient.execute(SdkHttpClient.java:72) > ~[?:?] > at > com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1330) > ~[?:?] > at > com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1145) > ~[?:?] > ... 67 more > Caused by: java.net.ConnectException: Connection refused (Connection refused) > at java.net.PlainSocketImpl.socketConnect(Native Method) ~[?:1.8.0_282] > at > java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350) > ~[?:1.8.0_282] > at > java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206) > ~[?:1.8.0_282] > at > java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188) > ~[?:1.8.0_282] > at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) > ~[?:1.8.0_282] > at java.net.Socket.connect(Socket.java:607) ~[?:1.8.0_282] > at > org.apache.http.conn.socket.PlainConnectionSocketFactory.connectSocket(PlainConnectionSocketFactory.java:75) > ~[?:?] > at > org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:142) > ~[?:?] > at > org.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:374) > ~[?:?] > at sun.reflect.GeneratedMethodAccessor116.invoke(Unknown Source) ~[?:?] 
> at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > ~[?:1.8.0_282] > at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_282] > at > com.amazonaws.http.conn.ClientConnectionManagerFactory$Handler.invoke(ClientConnectionManagerFactory.java:76) > ~[?:?] > at com.amazonaws.http.conn.$Proxy18.connect(Unknown Source) ~[?:?] > at > org.apache.http.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:393) > ~[?:?] > at > org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:236) > ~[?:?] > at > org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186) > ~[?:?] > at > org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185) > ~[?:?] > at > org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83) > ~[?:?] > at > org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56) > ~[?:?] > at > com.amazonaws.http.apache.client.impl.SdkHttpClient.execute(SdkHttpClient.java:72) > ~[?:?] > at > com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1330) > ~[?:?] > at > com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1145) > ~[?:?] > ... 67 more > {code} > By moving the recovery to the start of the JobManager (which happens > asynchronously after the JobMaster has gained leadership), Flink will remain > responsive (reporting a job in INITIALIZING state). -- This message was sent by Atlassian Jira (v8.3.4#803005)