[jira] [Commented] (FLINK-10011) Old job resurrected during HA failover
[ https://issues.apache.org/jira/browse/FLINK-10011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612265#comment-16612265 ]

ASF GitHub Bot commented on FLINK-10011:

tillrohrmann closed pull request #6590: [Backport 1.4][FLINK-10011] Release JobGraph from SubmittedJobGraphStore
URL: https://github.com/apache/flink/pull/6590

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

{code}
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/ZooKeeperMesosServices.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/ZooKeeperMesosServices.java
index 069cb833a3a..45d11412c50 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/ZooKeeperMesosServices.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/ZooKeeperMesosServices.java
@@ -56,8 +56,7 @@ public MesosWorkerStore createMesosWorkerStore(Configuration configuration, Exec
 		ZooKeeperStateHandleStore zooKeeperStateHandleStore = zooKeeperUtilityFactory.createZooKeeperStateHandleStore(
 			"/workers",
-			stateStorageHelper,
-			executor);
+			stateStorageHelper);

 		ZooKeeperSharedValue frameworkId = zooKeeperUtilityFactory.createSharedValue("/frameworkId", new byte[0]);
 		ZooKeeperSharedCount totalTaskCount = zooKeeperUtilityFactory.createSharedCount("/taskCount", 0);

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
index f22127041d3..533026041f8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -31,8 +31,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

-import javax.annotation.Nullable;
-
 import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
@@ -86,6 +84,8 @@
 	 */
 	private final ArrayDeque completedCheckpoints;

+	private final Executor executor;
+
 	/**
 	 * Creates a {@link ZooKeeperCompletedCheckpointStore} instance.
 	 *
@@ -98,7 +98,7 @@
 	 * start with a '/')
 	 * @param stateStorage State storage to be used to persist the completed
 	 * checkpoint
-	 * @param executor to give to the ZooKeeperStateHandleStore to run ZooKeeper callbacks
+	 * @param executor to execute blocking calls
 	 * @throws Exception
 	 */
 	public ZooKeeperCompletedCheckpointStore(
@@ -123,10 +123,12 @@ public ZooKeeperCompletedCheckpointStore(
 		// All operations will have the path as root
 		this.client = client.usingNamespace(client.getNamespace() + checkpointsPath);

-		this.checkpointsInZooKeeper = new ZooKeeperStateHandleStore<>(this.client, stateStorage, executor);
+		this.checkpointsInZooKeeper = new ZooKeeperStateHandleStore<>(this.client, stateStorage);

 		this.completedCheckpoints = new ArrayDeque<>(maxNumberOfCheckpointsToRetain + 1);

+		this.executor = checkNotNull(executor);
+
 		LOG.info("Initialized in '{}'.", checkpointsPath);
@@ -237,7 +239,18 @@ public void addCheckpoint(final CompletedCheckpoint checkpoint) throws Exception
 		// Everything worked, let's remove a previous checkpoint if necessary.
 		while (completedCheckpoints.size() > maxNumberOfCheckpointsToRetain) {
 			try {
-				removeSubsumed(completedCheckpoints.removeFirst());
+				final CompletedCheckpoint completedCheckpoint = completedCheckpoints.removeFirst();
+
+				if (tryRemove(completedCheckpoint.getCheckpointID())) {
+					executor.execute(() -> {
+						try {
+							completedCheckpoint.discardOnSubsume();
+						} catch (Exception e) {
+							LOG.warn("Could not discard subsumed completed checkpoint {}.", completedCheckpoint.getCheckpointID(), e);
+
{code}
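The core of the fix in the last hunk — remove the subsumed checkpoint's ZooKeeper node synchronously, then discard its (potentially slow, e.g. S3-backed) state on a dedicated executor — can be sketched with plain-JDK stand-ins. The class and method names below are illustrative only, not Flink's real API:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SubsumptionSketch {
    static final int MAX_RETAINED = 2;
    // retained checkpoints, oldest first (stands in for completedCheckpoints)
    static final ArrayDeque<Long> completed = new ArrayDeque<>();
    // ids whose backing state was discarded asynchronously
    static final List<Long> discarded = new ArrayList<>();

    // stands in for removing the checkpoint's node from ZooKeeper;
    // only if this succeeds may the state be discarded
    static boolean tryRemove(long checkpointId) {
        return true; // pretend the ZooKeeper node was removed
    }

    static void addCheckpoint(long id, ExecutorService executor) {
        completed.addLast(id);
        while (completed.size() > MAX_RETAINED) {
            final long subsumed = completed.removeFirst();
            if (tryRemove(subsumed)) {
                // the blocking discard runs off the main thread
                executor.execute(() -> discarded.add(subsumed));
            }
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        for (long id = 1; id <= 4; id++) {
            addCheckpoint(id, executor);
        }
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println("retained=" + completed + " discarded=" + discarded);
        // retained=[3, 4] discarded=[1, 2]
    }
}
```

The ordering matters: the ZooKeeper removal is decided on the calling thread, so a failover cannot observe a checkpoint node whose state is already gone, while the expensive deletion no longer blocks the checkpoint path.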
[ https://issues.apache.org/jira/browse/FLINK-10011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612264#comment-16612264 ]

ASF GitHub Bot commented on FLINK-10011:

tillrohrmann commented on issue #6590: [Backport 1.4][FLINK-10011] Release JobGraph from SubmittedJobGraphStore
URL: https://github.com/apache/flink/pull/6590#issuecomment-420678214

Merged via
d2a828c714188902f2e29da68bf137c3f82de014
07ab3d9bc567cf67b0d5cb7ae55a185f898cc766

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Old job resurrected during HA failover
> --------------------------------------
>
>              Key: FLINK-10011
>              URL: https://issues.apache.org/jira/browse/FLINK-10011
>          Project: Flink
>       Issue Type: Bug
>       Components: JobManager
> Affects Versions: 1.4.2, 1.5.2, 1.6.0
>         Reporter: Elias Levy
>         Assignee: Till Rohrmann
>         Priority: Blocker
>           Labels: pull-request-available
>          Fix For: 1.4.3, 1.6.1, 1.7.0, 1.5.4
>
> For the second time we've observed Flink resurrect an old job during JobManager high-availability failover.
> h4. Configuration
> * AWS environment
> * Flink 1.4.2 standalone cluster in HA mode
> * 2 JMs, 3 TMs
> * 3-node ZK ensemble
> * 1 job consuming to/from Kafka
> * Checkpoints in S3 using the Presto file system adaptor
> h4. Timeline
> * 15:18:10 JM 2 completes checkpoint 69256.
> * 15:19:10 JM 2 completes checkpoint 69257.
> * 15:19:57 ZK 1 (follower) loses connectivity to the leader as a result of a SocketTimeoutException
> * 15:19:57 ZK 1 closes connection to JM 2 (leader)
> * 15:19:57 ZK 2 (leader) reports a network error and closes connection to ZK 1
> * 15:19:57 JM 2 reports it can't read data from ZK
> ** {{Unable to read additional data from server sessionid 0x3003f4a0003, likely server has closed socket, closing socket connection and attempting reconnect}}
> ** {{org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn}}
> * 15:19:57 JM 2 ZK Curator changes connection state to SUSPENDED
> ** {{Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.}}
> ** {{ZooKeeper connection SUSPENDED. Changes to the submitted job graphs are not monitored (temporarily).}}
> ** {{Connection to ZooKeeper suspended. The contender akka.tcp://flink@flink-jm-2:6123/user/jobmanager no longer participates in the leader election}}
> ** {{Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.}}
> * 15:19:57 JM 2 gives up leadership
> ** {{JobManager akka://flink/user/jobmanager#33755521 was revoked leadership.}}
> * 15:19:57 JM 2 changes job {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color} status to SUSPENDED
> ** {{Stopping checkpoint coordinator for job {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}}}
> * 15:19:57 TMs start disassociating with JM 2, but JM 2 discards the messages because there is no leader
> ** {{Discard message LeaderSessionMessage(d29e9f38-a16d-4c87-b34f-5212caab0473,Disconnect(b97363d53ad22aedfebdc8e5ba3c672f,java.lang.Exception: TaskManager akka://flink/user/taskmanager is disassociating)) because there is currently no valid leader id known.}}
> * 15:19:57 JM 2 connects to ZK 2 and renews its session
> ** {{Opening socket connection to server ip-10-210-43-221.ec2.internal/10.210.43.221:2181}}
> ** {{Socket connection established to ip-10-210-43-221.ec2.internal/10.210.43.221:2181, initiating session}}
> ** {{Connection to ZooKeeper was reconnected. Leader retrieval can be restarted.}}
> ** {{Session establishment complete on server ip-10-210-43-221.ec2.internal/10.210.43.221:2181, sessionid = 0x3003f4a0003, negotiated timeout = 4}}
> ** {{Connection to ZooKeeper was reconnected. Leader election can be restarted.}}
> ** {{ZooKeeper connection RECONNECTED. Changes to the submitted job graphs are monitored again.}}
> ** {{State change: RECONNECTED}}
> * 15:19:57 JM 1 reports JM 1 has been granted leadership:
> ** {{JobManager akka.tcp://flink@flink-jm-1:6123/user/jobmanager was granted leadership with leader session ID Some(ae0a1a17-eccc-40b4-985d-93bc59f5b936).}}
> * 15:19:57 JM 2 reports the job has been suspended
> ** {{org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter Shutting down.}}
> ** {{Job 2a4eff355aef849c5ca37dbac04f2ff1 has been suspended.}}
> * 15:19:57 JM 2 reports it has lost leadership:
> ** {{Associated JobManager Actor[akka://flink/user/jobmanager#33755521|#33755521] lost leader status}}
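The SUSPENDED/RECONNECTED handling described in this timeline — Curator suspends the connection, the JobManager defensively revokes its own leadership, and election can restart on reconnect — can be sketched with a plain-JDK model. This is illustrative only: the enum mirrors Curator's connection states, but the class, fields, and log strings are invented stand-ins, not Flink's or Curator's actual API:

```java
import java.util.ArrayList;
import java.util.List;

public class LeaderConnectionSketch {
    // mirrors the Curator connection states seen in the logs above
    enum ConnectionState { CONNECTED, SUSPENDED, RECONNECTED, LOST }

    static final List<String> log = new ArrayList<>();
    static boolean isLeader = true;

    // models what the JobManager's leader election service does on state changes
    static void stateChanged(ConnectionState state) {
        switch (state) {
            case SUSPENDED:
                // connection in doubt: give up leadership defensively,
                // since another contender may be granted it meanwhile
                if (isLeader) {
                    isLeader = false;
                    log.add("revoked leadership");
                }
                log.add("job graph changes not monitored");
                break;
            case RECONNECTED:
                // ZooKeeper session survived: election and retrieval restart
                log.add("leader election can be restarted");
                break;
            case LOST:
                // session expired: all ephemeral nodes (locks) are gone
                log.add("session lost");
                break;
            default:
                break;
        }
    }

    public static void main(String[] args) {
        stateChanged(ConnectionState.SUSPENDED);
        stateChanged(ConnectionState.RECONNECTED);
        System.out.println(log);
    }
}
```

This is exactly the race visible in the logs: JM 2 revokes itself on SUSPENDED at 15:19:57, and in the same second JM 1 wins the election while JM 2's session reconnects.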
[ https://issues.apache.org/jira/browse/FLINK-10011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612257#comment-16612257 ]

ASF GitHub Bot commented on FLINK-10011:

tillrohrmann closed pull request #6589: [Backport 1.5][FLINK-10011] Release JobGraph from SubmittedJobGraphStore
URL: https://github.com/apache/flink/pull/6589

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

{code}
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/ZooKeeperMesosServices.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/ZooKeeperMesosServices.java
index 069cb833a3a..45d11412c50 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/ZooKeeperMesosServices.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/ZooKeeperMesosServices.java
@@ -56,8 +56,7 @@ public MesosWorkerStore createMesosWorkerStore(Configuration configuration, Exec
 		ZooKeeperStateHandleStore zooKeeperStateHandleStore = zooKeeperUtilityFactory.createZooKeeperStateHandleStore(
 			"/workers",
-			stateStorageHelper,
-			executor);
+			stateStorageHelper);

 		ZooKeeperSharedValue frameworkId = zooKeeperUtilityFactory.createSharedValue("/frameworkId", new byte[0]);
 		ZooKeeperSharedCount totalTaskCount = zooKeeperUtilityFactory.createSharedCount("/taskCount", 0);

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/ActorUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/ActorUtils.java
index f2f905971c3..9a992814235 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/ActorUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/ActorUtils.java
@@ -19,9 +19,11 @@
 package org.apache.flink.runtime.akka;

 import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.instance.AkkaActorGateway;

 import akka.actor.ActorRef;
 import akka.actor.Kill;
+import akka.actor.PoisonPill;
 import akka.pattern.Patterns;

 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -85,5 +87,13 @@
 		return FutureUtils.completeAll(terminationFutures);
 	}

+	public static void stopActor(AkkaActorGateway akkaActorGateway) {
+		stopActor(akkaActorGateway.actor());
+	}
+
+	public static void stopActor(ActorRef actorRef) {
+		actorRef.tell(PoisonPill.getInstance(), ActorRef.noSender());
+	}
+
 	private ActorUtils() {}
 }

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
index f22127041d3..131733924ae 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -25,14 +25,13 @@
 import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
 import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
 import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.function.ConsumerWithException;

 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.utils.ZKPaths;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

-import javax.annotation.Nullable;
-
 import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
@@ -86,6 +85,8 @@
 	 */
 	private final ArrayDeque completedCheckpoints;

+	private final Executor executor;
+
 	/**
 	 * Creates a {@link ZooKeeperCompletedCheckpointStore} instance.
 	 *
@@ -98,7 +99,7 @@
 	 * start with a '/')
 	 * @param stateStorage State storage to be used to persist the completed
 	 * checkpoint
-	 * @param executor to give to the ZooKeeperStateHandleStore to run ZooKeeper callbacks
+	 * @param executor to execute blocking calls
 	 * @throws Exception
 	 */
 	public ZooKeeperCompletedCheckpointStore(
@@ -123,10 +124,12 @@ public ZooKeeperCompletedCheckpointStore(
 		// All operations will have the path as root
 		this.client = client.usingNamespace(client.getNamespace() + checkpointsPath);

-		this.checkpointsInZooKeeper = new
{code}
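The diff above imports org.apache.flink.util.function.ConsumerWithException, a functional interface that lets cleanup code accept lambdas which may throw checked exceptions. A minimal stand-in (the interface shape and the helper below are illustrative sketches, not Flink's actual definitions) shows the idea:

```java
// Illustrative stand-in for Flink's ConsumerWithException:
// a Consumer whose accept may throw a checked exception E.
@FunctionalInterface
interface ConsumerWithExceptionSketch<T, E extends Throwable> {
    void accept(T value) throws E;
}

public class ConsumerSketchDemo {
    // a hypothetical helper that walks checkpoint ids and applies a
    // possibly-throwing action to each, propagating E to the caller
    static <E extends Throwable> void forEachCheckpointId(
            long[] ids, ConsumerWithExceptionSketch<Long, E> action) throws E {
        for (long id : ids) {
            action.accept(id);
        }
    }

    public static void main(String[] args) throws Exception {
        StringBuilder released = new StringBuilder();
        // the lambda could throw a checked Exception without wrapping it
        ConsumerWithExceptionSketch<Long, Exception> record = id -> released.append(id);
        forEachCheckpointId(new long[] {1L, 2L}, record);
        System.out.println(released);
        // 12
    }
}
```

Compared to java.util.function.Consumer, this avoids wrapping checked exceptions in RuntimeException inside cleanup callbacks such as the job-graph release path.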
[ https://issues.apache.org/jira/browse/FLINK-10011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612256#comment-16612256 ]

ASF GitHub Bot commented on FLINK-10011:

tillrohrmann commented on issue #6589: [Backport 1.5][FLINK-10011] Release JobGraph from SubmittedJobGraphStore
URL: https://github.com/apache/flink/pull/6589#issuecomment-420677376

Merged via
7c56d15b911d4fb87c073a25eb93d61502995c5a
5c8630cf34b3d3cce5a821972840f0a7c2fa1bb7
dd096eb44d8401d22f7e3cfaa51345f59abe04d4
[ https://issues.apache.org/jira/browse/FLINK-10011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612252#comment-16612252 ]

ASF GitHub Bot commented on FLINK-10011:

tillrohrmann commented on issue #6588: [Backport 1.6][FLINK-10011] Release JobGraph from SubmittedJobGraphStore
URL: https://github.com/apache/flink/pull/6588#issuecomment-420677005

Merged via
fabd246e5342001d64be55c820ba50b3cf75d2a6
5e62da0f95d9abe35997e45dc9b0df3a9c7495cd
4d7204784270b5ad8d5455d82f7a965efdb5ae45
[ https://issues.apache.org/jira/browse/FLINK-10011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612253#comment-16612253 ]

ASF GitHub Bot commented on FLINK-10011:

tillrohrmann closed pull request #6588: [Backport 1.6][FLINK-10011] Release JobGraph from SubmittedJobGraphStore
URL: https://github.com/apache/flink/pull/6588

This is a PR merged from a forked repository. The diff attached to this message is identical to the one shown above for PR #6589.
[ https://issues.apache.org/jira/browse/FLINK-10011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612245#comment-16612245 ]

ASF GitHub Bot commented on FLINK-10011:

asfgit closed pull request #6587: [FLINK-10011] Release JobGraph from SubmittedJobGraphStore
URL: https://github.com/apache/flink/pull/6587

This is a PR merged from a forked repository. The diff attached to this message is identical to the one shown above for PR #6589.
[ https://issues.apache.org/jira/browse/FLINK-10011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612230#comment-16612230 ] ASF GitHub Bot commented on FLINK-10011: tillrohrmann commented on issue #6587: [FLINK-10011] Release JobGraph from SubmittedJobGraphStore URL: https://github.com/apache/flink/pull/6587#issuecomment-420672486 Thanks for the review @azagrebin. Merging this PR. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Old job resurrected during HA failover > -- > > Key: FLINK-10011 > URL: https://issues.apache.org/jira/browse/FLINK-10011 > Project: Flink > Issue Type: Bug > Components: JobManager >Affects Versions: 1.4.2, 1.5.2, 1.6.0 >Reporter: Elias Levy >Assignee: Till Rohrmann >Priority: Blocker > Labels: pull-request-available > > For the second time we've observed Flink resurrect an old job during > JobManager high-availability fail over. > h4. Configuration > * AWS environment > * Flink 1.4.2 standalong cluster in HA mode > * 2 JMs, 3 TMs > * 3 node ZK ensemble > * 1 job consuming to/from Kafka > * Checkpoints in S3 using the Presto file system adaptor > h4. Timeline > * 15:18:10 JM 2 completes checkpoint 69256. > * 15:19:10 JM 2 completes checkpoint 69257. 
> * 15:19:57 ZK 1 (follower) loses connectivity to the leader as a result of a SocketTimeoutException
> * 15:19:57 ZK 1 closes connection to JM 2 (leader)
> * 15:19:57 ZK 2 (leader) reports a network error and closes connection to ZK 1
> * 15:19:57 JM 2 reports it can't read data from ZK
> ** {{Unable to read additional data from server sessionid 0x3003f4a0003, likely server has closed socket, closing socket connection and attempting reconnect)}}
> ** {{org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn}}
> * 15:19:57 JM 2 ZK Curator changes connection state to SUSPENDED
> ** {{Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.}}
> ** {{ZooKeeper connection SUSPENDED. Changes to the submitted job graphs are not monitored (temporarily).}}
> ** {{Connection to ZooKeeper suspended. The contender akka.tcp://flink@flink-jm-2:6123/user/jobmanager no longer participates in the leader election}}
> ** {{Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.}}
> * 15:19:57 JM 2 gives up leadership
> ** {{JobManager akka://flink/user/jobmanager#33755521 was revoked leadership.}}
> * 15:19:57 JM 2 changes job {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color} status to SUSPENDED
> ** {{Stopping checkpoint coordinator for job {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}}}
> * 15:19:57 TMs start disassociating with JM 2, but JM 2 discards the messages because there is no leader
> ** {{Discard message LeaderSessionMessage(d29e9f38-a16d-4c87-b34f-5212caab0473,Disconnect(b97363d53ad22aedfebdc8e5ba3c672f,java.lang.Exception: TaskManager akka://flink/user/taskmanager is disassociating)) because there is currently no valid leader id known.}}
> * 15:19:57 JM 2 connects to ZK 2 and renews its session
> ** {{Opening socket connection to server ip-10-210-43-221.ec2.internal/10.210.43.221:2181}}
> ** {{Socket connection established to ip-10-210-43-221.ec2.internal/10.210.43.221:2181, initiating session}}
> ** {{Connection to ZooKeeper was reconnected. Leader retrieval can be restarted.}}
> ** {{Session establishment complete on server ip-10-210-43-221.ec2.internal/10.210.43.221:2181, sessionid = 0x3003f4a0003, negotiated timeout = 4}}
> ** {{Connection to ZooKeeper was reconnected. Leader election can be restarted.}}
> ** {{ZooKeeper connection RECONNECTED. Changes to the submitted job graphs are monitored again.}}
> ** {{State change: RECONNECTED}}
> * 15:19:57: JM 1 reports JM 1 has been granted leadership:
> ** {{JobManager akka.tcp://flink@flink-jm-1:6123/user/jobmanager was granted leadership with leader session ID Some(ae0a1a17-eccc-40b4-985d-93bc59f5b936).}}
> * 15:19:57 JM 2 reports the job has been suspended
> ** {{org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter Shutting down.}}
> ** {{Job 2a4eff355aef849c5ca37dbac04f2ff1 has been suspended.}}
> * 15:19:57 JM 2 reports it has lost leadership:
> ** {{Associated JobManager Actor[akka://flink/user/jobmanager#33755521] lost leader status}}
> ** {{Received leader address but not running in leader ActorSystem. Cancelling registration.}}
> *
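The SUSPENDED/RECONNECTED sequence in the timeline above can be reduced to a toy state machine (plain Java, no Curator or Flink dependencies; all names here are illustrative, not the real API): a suspended connection revokes leadership immediately, while reconnecting only re-enters the election and never restores the old leadership.

```java
// Toy model of the Curator connection-state handling seen in the timeline
// above (illustrative only -- this is not Curator's or Flink's actual API).
enum ZkState { CONNECTED, SUSPENDED, RECONNECTED }

class LeaderLatchModel {
    private boolean leader;
    private boolean participating = true;

    LeaderLatchModel(boolean initiallyLeader) {
        this.leader = initiallyLeader;
    }

    void onStateChange(ZkState state) {
        switch (state) {
            case SUSPENDED:
                // "The contender ... no longer participates in the leader
                // election": leadership is given up immediately because the
                // ZooKeeper session may already have expired.
                leader = false;
                participating = false;
                break;
            case RECONNECTED:
                // "Leader election can be restarted": the contender rejoins
                // the election, but leadership itself must be won anew.
                participating = true;
                break;
            default:
                break;
        }
    }

    boolean isLeader() { return leader; }
    boolean isParticipating() { return participating; }
}
```

In this model, a reconnected contender is participating again but is no longer leader, which matches why JM 1 could take over at 15:19:57 while the reconnected JM 2 stayed a non-leader.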
[jira] [Commented] (FLINK-10011) Old job resurrected during HA failover
[ https://issues.apache.org/jira/browse/FLINK-10011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16608017#comment-16608017 ]

ASF GitHub Bot commented on FLINK-10011:

tillrohrmann commented on a change in pull request #6587: [FLINK-10011] Release JobGraph from SubmittedJobGraphStore
URL: https://github.com/apache/flink/pull/6587#discussion_r216127860

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
## @@ -237,7 +239,18 @@ public void addCheckpoint(final CompletedCheckpoint checkpoint) throws Exception
	// Everything worked, let's remove a previous checkpoint if necessary.
	while (completedCheckpoints.size() > maxNumberOfCheckpointsToRetain) {
		try {
-			removeSubsumed(completedCheckpoints.removeFirst());
+			final CompletedCheckpoint completedCheckpoint = completedCheckpoints.removeFirst();

Review comment: Good idea. I've changed it according to your suggestion.
[jira] [Commented] (FLINK-10011) Old job resurrected during HA failover
[ https://issues.apache.org/jira/browse/FLINK-10011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16608014#comment-16608014 ]

ASF GitHub Bot commented on FLINK-10011:

tillrohrmann commented on a change in pull request #6587: [FLINK-10011] Release JobGraph from SubmittedJobGraphStore
URL: https://github.com/apache/flink/pull/6587#discussion_r216127558

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperHAJobManagerTest.java
## @@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.akka.ActorUtils;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.akka.ListeningBehaviour;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.dispatcher.DispatcherHATest;
+import org.apache.flink.runtime.dispatcher.NoOpSubmittedJobGraphListener;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
+import org.apache.flink.runtime.testingUtils.TestingJobManager;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
+import org.apache.flink.runtime.zookeeper.ZooKeeperResource;
+import org.apache.flink.util.TestLogger;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.ExtendedActorSystem;
+import akka.actor.Identify;
+import akka.actor.Terminated;
+import akka.pattern.Patterns;
+import org.apache.curator.framework.CuratorFramework;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+
+import scala.Option;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for the ZooKeeper HA service and {@link JobManager} interaction.
+ */
+public class ZooKeeperHAJobManagerTest extends TestLogger {
+
+	@ClassRule
+	public static final ZooKeeperResource ZOO_KEEPER_RESOURCE = new ZooKeeperResource();
+
+	@ClassRule
+	public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+	private static final FiniteDuration TIMEOUT = FiniteDuration.apply(10L, TimeUnit.SECONDS);
+
+	private static ActorSystem system;
+
+	@BeforeClass
+	public static void setup() {
+		system = AkkaUtils.createLocalActorSystem(new Configuration());
+	}
+
+	@AfterClass
+	public static void teardown() throws Exception {
+		final Future terminationFuture = system.terminate();
+		Await.ready(terminationFuture, TIMEOUT);
+	}
+
+	/**
+	 * Tests that the {@link JobManager} releases all locked {@link JobGraph} if it loses
+	 * leadership.
+	 */
+	@Test
+	public void testJobGraphReleaseWhenLosingLeadership() throws Exception {
+		final Configuration configuration = new Configuration();
+
[jira] [Commented] (FLINK-10011) Old job resurrected during HA failover
[ https://issues.apache.org/jira/browse/FLINK-10011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16596459#comment-16596459 ]

Till Rohrmann commented on FLINK-10011:
---

I might have misdiagnosed the underlying problem a little bit. Initially I thought that {{JM2}} did not release the lock it created when the job was initially submitted. However, in order for {{JM1}} to become leader, {{JM2}} needs to lose its ZooKeeper session (otherwise the ephemeral leader Znode would not be deleted). In that case, the {{JobGraph}} lock Znodes should also be removed. So where do the lock nodes come from?

The answer could come from FLINK-10255. In Flip-6 we always recover jobs, and in pre-Flip-6 we sometimes recover jobs, even if we are not the leader. If we are not the leader, we won't start execution of the job, but recovering the job will also lock it. Therefore, my suspicion is that the old leader {{JM2}} actually recovered the job after it reconnected to ZooKeeper. If this is the case, then you should see the following log line in the log file of {{JM2}}: {{org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore - Recovered SubmittedJobGraph(, null)}}. Do you still have the logs to check whether this is true or not [~elevy]? A non-volatile variable could also explain why it only occurs sometimes with pre-Flip-6 and always with Flip-6.
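Till's suspicion above can be sketched as a minimal, self-contained model (plain Java; `JobGraphStoreModel` and all of its methods are hypothetical stand-ins, not Flink's actual `ZooKeeperSubmittedJobGraphStore` API): recovering a job graph takes a lock as a side effect, so a non-leader that recovers after reconnecting leaves a stale lock that prevents the new leader from removing the job, and the fix is to release all held locks when leadership is lost.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * Minimal, illustrative model (not Flink code) of the locking behaviour
 * described above: submission and recovery both lock a job graph, and a
 * job can only be removed once no lock holders remain, mirroring a
 * ZooKeeper node with ephemeral lock children.
 */
class JobGraphStoreModel {
    private final Map<String, String> jobs = new HashMap<>();        // jobId -> graph
    private final Map<String, Set<String>> locks = new HashMap<>();  // jobId -> lock holders

    void submit(String jobId, String graph, String owner) {
        jobs.put(jobId, graph);
        lock(jobId, owner);
    }

    /** Recovery reads the graph AND takes a lock, as described above. */
    String recover(String jobId, String owner) {
        lock(jobId, owner);
        return jobs.get(jobId);
    }

    /** The fix: a JobManager releases every lock it holds on leadership loss. */
    void releaseAll(String owner) {
        for (Set<String> holders : locks.values()) {
            holders.remove(owner);
        }
    }

    /** Removal fails while any lock holder remains -> stale locks keep the job alive. */
    boolean remove(String jobId) {
        if (!locks.getOrDefault(jobId, new HashSet<>()).isEmpty()) {
            return false;
        }
        jobs.remove(jobId);
        return true;
    }

    private void lock(String jobId, String owner) {
        locks.computeIfAbsent(jobId, k -> new HashSet<>()).add(owner);
    }

    boolean contains(String jobId) { return jobs.containsKey(jobId); }
}
```

In this model, a JM that recovers a job while not the leader and never calls `releaseAll` blocks removal indefinitely, which is the condition under which an old job can later be resurrected.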
[jira] [Commented] (FLINK-10011) Old job resurrected during HA failover
[ https://issues.apache.org/jira/browse/FLINK-10011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16596355#comment-16596355 ]

ASF GitHub Bot commented on FLINK-10011:

tillrohrmann commented on issue #6587: [FLINK-10011] Release JobGraph from SubmittedJobGraphStore
URL: https://github.com/apache/flink/pull/6587#issuecomment-416961912

Unfortunately, this only fixes half of the problem. There is still another problem with standby Dispatchers/JobMasters. See https://issues.apache.org/jira/browse/FLINK-10255 for more information. Nevertheless, I think this PR fixes a valid problem and thus should be merged after addressing @azagrebin's comments.
[jira] [Commented] (FLINK-10011) Old job resurrected during HA failover
[ https://issues.apache.org/jira/browse/FLINK-10011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16590520#comment-16590520 ]

ASF GitHub Bot commented on FLINK-10011:

azagrebin commented on a change in pull request #6587: [FLINK-10011] Release JobGraph from SubmittedJobGraphStore
URL: https://github.com/apache/flink/pull/6587#discussion_r212321052

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
## @@ -237,7 +239,18 @@ public void addCheckpoint(final CompletedCheckpoint checkpoint) throws Exception
	// Everything worked, let's remove a previous checkpoint if necessary.
	while (completedCheckpoints.size() > maxNumberOfCheckpointsToRetain) {
		try {
-			removeSubsumed(completedCheckpoints.removeFirst());
+			final CompletedCheckpoint completedCheckpoint = completedCheckpoints.removeFirst();

Review comment: I would try to move the whole try/catch into one method to deduplicate the code with `shutdown()`, e.g.:

```
void tryRemove(Runnable doRemove) {
	try {
		// ..
		doRemove.run(); // completedCheckpoint.discardOnSubsume() or discardOnShutdown()
		// ..
	} catch (Exception e) {
		// ...
	}
}
```
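azagrebin's suggestion above, factoring the shared try/catch into a single `tryRemove` helper used by both the retention loop and `shutdown()`, might look like the following self-contained sketch (illustrative names only; this is not the actual `ZooKeeperCompletedCheckpointStore`, and checkpoints are modelled as plain strings):

```java
import java.util.ArrayDeque;

/**
 * Illustrative sketch of the review suggestion: both the retention loop in
 * add() and shutdown() route their removals through one tryRemove helper,
 * so the failure handling exists in exactly one place.
 */
class CheckpointRetention {
    interface Removal { void run() throws Exception; }

    private final ArrayDeque<String> completed = new ArrayDeque<>();
    private final int maxToRetain;
    int failedRemovals = 0;

    CheckpointRetention(int maxToRetain) { this.maxToRetain = maxToRetain; }

    void add(String checkpoint) {
        completed.addLast(checkpoint);
        // Everything worked, remove previous checkpoints if necessary.
        while (completed.size() > maxToRetain) {
            String subsumed = completed.removeFirst();
            tryRemove(() -> discard(subsumed)); // stands in for discardOnSubsume()
        }
    }

    void shutdown() {
        while (!completed.isEmpty()) {
            String checkpoint = completed.removeFirst();
            tryRemove(() -> discard(checkpoint)); // stands in for discardOnShutdown()
        }
    }

    /** The single shared try/catch; failures are counted, not rethrown. */
    private void tryRemove(Removal removal) {
        try {
            removal.run();
        } catch (Exception e) {
            failedRemovals++;
        }
    }

    /** Dummy removal action that can fail, to exercise the catch path. */
    private void discard(String checkpoint) throws Exception {
        if (checkpoint.isEmpty()) {
            throw new Exception("invalid checkpoint handle");
        }
    }

    int size() { return completed.size(); }
}
```

The design point is simply that the subsume and shutdown paths share one error-handling policy, which is what the review asked for.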
[jira] [Commented] (FLINK-10011) Old job resurrected during HA failover
[ https://issues.apache.org/jira/browse/FLINK-10011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16590519#comment-16590519 ]

ASF GitHub Bot commented on FLINK-10011:

azagrebin commented on a change in pull request #6587: [FLINK-10011] Release JobGraph from SubmittedJobGraphStore
URL: https://github.com/apache/flink/pull/6587#discussion_r212379451

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperHAJobManagerTest.java
## @@ -0,0 +1,179 @@
[jira] [Commented] (FLINK-10011) Old job resurrected during HA failover
[ https://issues.apache.org/jira/browse/FLINK-10011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16586652#comment-16586652 ]

ASF GitHub Bot commented on FLINK-10011:

tillrohrmann opened a new pull request #6590: [Backport 1.4][FLINK-10011] Release JobGraph from SubmittedJobGraphStore
URL: https://github.com/apache/flink/pull/6590

## What is the purpose of the change

Backport of #6587 for `release-1.4`.
> * 15:19:57 ZK 1 (follower) loses connectivity to the leader as a result of a > SocketTimeoutException > * 15:19:57 ZK 1 closes connection to JM 2 (leader) > * 15:19:57 ZK 2 (leader) reports a network error and closes connection to ZK > 1 > * 15:19:57 JM 2 reports it can't read data from ZK > ** {{Unable to read additional data from server sessionid 0x3003f4a0003, > likely server has closed socket, closing socket connection and attempting > reconnect)}} > ** {{org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn}} > * 15:19:57 JM 2 ZK Curator changes connection state to SUSPENDED > ** {{Connection to ZooKeeper suspended. Can no longer retrieve the leader > from ZooKeeper.}} > ** {{ZooKeeper connection SUSPENDED. }}{{Changes to the submitted job graphs > are not monitored (temporarily).}} > ** {{Connection to ZooKeeper suspended. The contender > akka.tcp://flink@flink-jm-2:6123/user/jobmanager no longer participates in > the leader election}}{{ }} > ** {{Connection to ZooKeeper suspended. 
Can no longer retrieve the leader > from ZooKeeper.}} > * 15:19:57 JM 2 gives up leadership > ** {{JobManager akka://flink/user/jobmanager#33755521 was revoked > leadership.}} > * 15:19:57 JM 2 changes job > {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color} status to SUSPENDED > ** {{Stopping checkpoint coordinator for job > {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}}} > * 15:19:57 TMs start disasociating with JM 2, but JM 2 discard the messages > because there is no leader > ** {{Discard message > LeaderSessionMessage(d29e9f38-a16d-4c87-b34f-5212caab0473,Disconnect(b97363d53ad22aedfebdc8e5ba3c672f,java.lang.Exception: > TaskManager akka://flink/user/taskmanager is disassociating)) because there > is currently no valid leader id known.}} > * 15:19:57 JM 2 connects to ZK 2 and renews its session > ** {{Opening socket connection to server > ip-10-210-43-221.ec2.internal/10.210.43.221:2181}} > ** {{Socket connection established to > ip-10-210-43-221.ec2.internal/10.210.43.221:2181, initiating session}} > ** {{Connection to ZooKeeper was reconnected. Leader retrieval can be > restarted.}} > ** {{Session establishment complete on server > ip-10-210-43-221.ec2.internal/10.210.43.221:2181, sessionid = > 0x3003f4a0003, negotiated timeout = 4}} > ** {{Connection to ZooKeeper was reconnected. Leader election can be > restarted.}} > ** {{ZooKeeper connection RECONNECTED. 
Changes to the submitted job graphs > are monitored again.}} > ** {{State change: RECONNECTED}} > * 15:19:57: JM 1 reports JM 1 has been granted leadership: > ** {{JobManager akka.tcp://flink@flink-jm-1:6123/user/jobmanager was granted > leadership with leader session ID > Some(ae0a1a17-eccc-40b4-985d-93bc59f5b936).}} > * 15:19:57 JM 2 reports the job has been suspended > ** {{org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter > Shutting down.}} > ** {{Job 2a4eff355aef849c5ca37dbac04f2ff1 has been suspended.}} > * 15:19:57 JM 2 reports it has lost leadership: > ** {{Associated JobManager > Actor[akka://flink/user/jobmanager#33755521|#33755521] lost leader status}} > ** {{Received leader address but not running in leader ActorSystem. >
[jira] [Commented] (FLINK-10011) Old job resurrected during HA failover
[ https://issues.apache.org/jira/browse/FLINK-10011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16586627#comment-16586627 ]

ASF GitHub Bot commented on FLINK-10011:

tillrohrmann opened a new pull request #6589: [Backport 1.5][FLINK-10011] Release JobGraph from SubmittedJobGraphStore
URL: https://github.com/apache/flink/pull/6589

## What is the purpose of the change

Backport of #6587 for `release-1.5`.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-10011) Old job resurrected during HA failover
[ https://issues.apache.org/jira/browse/FLINK-10011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16586624#comment-16586624 ]

ASF GitHub Bot commented on FLINK-10011:

tillrohrmann opened a new pull request #6588: [Backport 1.6][FLINK-10011] Release JobGraph from SubmittedJobGraphStore
URL: https://github.com/apache/flink/pull/6588

## What is the purpose of the change

Backport of #6587 for `release-1.6`.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-10011) Old job resurrected during HA failover
[ https://issues.apache.org/jira/browse/FLINK-10011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16586618#comment-16586618 ]

ASF GitHub Bot commented on FLINK-10011:

tillrohrmann opened a new pull request #6587: [FLINK-10011] Release JobGraph from SubmittedJobGraphStore
URL: https://github.com/apache/flink/pull/6587

## What is the purpose of the change

This PR fixes the problem that sometimes `JobGraphs` cannot be removed from the `ZooKeeperSubmittedJobGraphStore` because a former leader might still keep a lock on the `JobGraph`. This usually happens in multi stand-by JobManager/Dispatcher scenarios, where a leader loses leadership due to a temporary network glitch but can restore its connection to ZooKeeper. The lock nodes, which are ephemeral and are created to protect against concurrent deletions, won't be deleted in this case and, thus, the `JobGraph` won't be removable by the new leader.

The problem is solved by explicitly removing all locks a `JobManager`/`Dispatcher` keeps on the stored `JobGraphs` if it loses leadership.

This PR is based on #6586.

## Brief change log

- `SubmittedJobGraphStore#releaseJobGraph` removes a potentially existing lock from the specified `JobGraph`. This allows other `SubmittedJobGraphStores` to remove the `JobGraph`, given that it is no longer locked.
- The `JobManager` now releases its lock on all `JobGraphs` it has stored in the `SubmittedJobGraphStore` if it loses leadership. This ensures that a different `JobManager` can delete these jobs after it has recovered them and they have reached a globally terminal state. This is especially important when using stand-by JobManagers, where a former leader might still be connected to ZooKeeper and thus keeping all ephemeral nodes/locks alive.
- The `Dispatcher` now releases all `JobGraphs` it has stored in the `SubmittedJobGraphStore` if it loses leadership. This ensures that the newly elected leader can remove the jobs from the `SubmittedJobGraphStore` after recovering them. Before, the problem was that a former leader might still be connected to ZooKeeper, which keeps its ephemeral lock nodes alive and could prevent the deletion of the `JobGraph` from ZooKeeper. The problem occurs in particular in multi stand-by Dispatcher scenarios.

## Verifying this change

- Added `ZooKeeperHAJobManagerTest#testSubmittedJobGraphRelease` and `ZooKeeperHADispatcherTest#testSubmittedJobGraphRelease`

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable)

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
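The release-on-revocation change described in the PR's change log can be illustrated with a small, self-contained Java model. This is a hypothetical sketch, not Flink's actual classes: `Store` stands in for the `ZooKeeperSubmittedJobGraphStore` lock bookkeeping, and `revokeLeadership` shows the new behavior of dropping every held lock.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of the fix: a JobManager/Dispatcher that releases every
// JobGraph lock it holds when its leadership is revoked. Class and method
// names are illustrative; they are not Flink's real API.
public class ReleaseOnRevocationSketch {

    /** In-memory stand-in for the store's lock bookkeeping (one ephemeral znode per holder). */
    public static class Store {
        public final Set<String> locked = new HashSet<>();

        public void recoverJobGraph(String jobId) { locked.add(jobId); }    // recovery takes a lock
        public void releaseJobGraph(String jobId) { locked.remove(jobId); } // the new release method
    }

    private final Store store;
    private final Set<String> heldLocks = new HashSet<>();

    public ReleaseOnRevocationSketch(Store store) {
        this.store = store;
    }

    /** On grant: recover all stored job graphs, taking a lock on each one. */
    public void grantLeadership(Iterable<String> jobIds) {
        for (String id : jobIds) {
            store.recoverJobGraph(id);
            heldLocks.add(id);
        }
    }

    /** The fix: on revocation, release all locks so the next leader can delete the graphs. */
    public void revokeLeadership() {
        for (String id : heldLocks) {
            store.releaseJobGraph(id);
        }
        heldLocks.clear();
    }
}
```

Before the fix, the revocation path kept the locks registered as long as the old leader's ZooKeeper session survived, which is exactly the situation described in the issue's timeline.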
[jira] [Commented] (FLINK-10011) Old job resurrected during HA failover
[ https://issues.apache.org/jira/browse/FLINK-10011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16568511#comment-16568511 ]

Till Rohrmann commented on FLINK-10011:
---

FLINK-10052 is the dedicated issue for allowing Flink to tolerate temporarily suspended ZooKeeper connections.
[jira] [Commented] (FLINK-10011) Old job resurrected during HA failover
[ https://issues.apache.org/jira/browse/FLINK-10011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16568489#comment-16568489 ]

Till Rohrmann commented on FLINK-10011:
---

I think your analysis is correct [~elevy], and the problem is that in a multi standby JobManager scenario, old leaders can keep locks on {{JobGraphs}}. As Elias proposed, we should explicitly free locked resources in case of a lost leadership in order to solve this problem. This issue affects not only the legacy mode but also the new mode.

A somewhat related problem is hardening Flink's behaviour in case of a suspended ZooKeeper connection. I think it is a good idea to wait for the ZooKeeper session timeout before giving up leadership instead of doing it right away. That way, Flink could tolerate ZooKeeper hiccups/network issues without failing a running job. I would like to create a new JIRA issue for this improvement, since the underlying problem of this issue is the non-released lock on the {{JobGraph}}.
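Till's suggestion above — wait for the ZooKeeper session timeout before giving up leadership rather than revoking on the first SUSPENDED event — can be modeled deterministically. This is a hypothetical, dependency-free sketch (not Curator's or Flink's API); timestamps are passed in explicitly so the policy is easy to reason about.

```java
// Hypothetical model of "tolerate SUSPENDED until the session timeout elapses".
// Leadership is revoked only if the connection stays suspended for at least the
// full ZooKeeper session timeout; a timely RECONNECTED cancels the revocation.
public class SuspendedToleranceSketch {

    private final long sessionTimeoutMillis;
    private long suspendedSince = -1; // -1 means "currently connected"
    private boolean leader = true;

    public SuspendedToleranceSketch(long sessionTimeoutMillis) {
        this.sessionTimeoutMillis = sessionTimeoutMillis;
    }

    /** The client reported SUSPENDED at time 'now': start the grace period, keep leadership. */
    public void onSuspended(long now) {
        if (suspendedSince < 0) {
            suspendedSince = now;
        }
    }

    /** The client reported RECONNECTED: the glitch was shorter than the session timeout. */
    public void onReconnected() {
        suspendedSince = -1;
    }

    /** Called periodically: revoke only once the suspension has outlasted the session timeout. */
    public void tick(long now) {
        if (suspendedSince >= 0 && now - suspendedSince >= sessionTimeoutMillis) {
            leader = false;
        }
    }

    public boolean isLeader() {
        return leader;
    }
}
```

Under this policy, the brief network glitch in the timeline (suspend and reconnect within the same second, well inside the session timeout) would not have cost JM 2 its leadership at all.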
[jira] [Commented] (FLINK-10011) Old job resurrected during HA failover
[ https://issues.apache.org/jira/browse/FLINK-10011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16566032#comment-16566032 ]

Elias Levy commented on FLINK-10011:
---

[~azagrebin] I don't think they are the same issue. The issue I am observing is that the new JM leader after a failover can't delete a job graph in ZK when the job is canceled, because the previous JM leader still has the job graph locked in ZK via the child ephemeral node.

This is the state in ZK:

[zk: localhost:2181(CONNECTED) 5] ls /flink/cluster_1/jobgraphs
[d77948df92813a68ea6dfd6783f40e7e, 2a4eff355aef849c5ca37dbac04f2ff1]

Job 2a4eff355aef849c5ca37dbac04f2ff1 was running before failover and we canceled it after failover. The job is no longer running, but it is still in ZK.

In the logs we see that JM 1 (10.210.22.167), the one that became leader after failover, thinks it deleted the 2a4eff355aef849c5ca37dbac04f2ff1 job from ZK when it was canceled:

July 30th 2018, 15:32:27.231  Trying to cancel job with ID 2a4eff355aef849c5ca37dbac04f2ff1.
July 30th 2018, 15:32:27.232  Job Some Job (2a4eff355aef849c5ca37dbac04f2ff1) switched from state RESTARTING to CANCELED.
July 30th 2018, 15:32:27.232  Stopping checkpoint coordinator for job 2a4eff355aef849c5ca37dbac04f2ff1
July 30th 2018, 15:32:27.239  Removed job graph 2a4eff355aef849c5ca37dbac04f2ff1 from ZooKeeper.
July 30th 2018, 15:32:27.245  Removing /flink/cluster_1/checkpoints/2a4eff355aef849c5ca37dbac04f2ff1 from ZooKeeper
July 30th 2018, 15:32:27.251  Removing /checkpoint-counter/2a4eff355aef849c5ca37dbac04f2ff1 from ZooKeeper

Looking at the ZK logs I find the problem:

July 30th 2018, 15:32:27.241  Got user-level KeeperException when processing sessionid:0x201d2330001 type:delete cxid:0x434c zxid:0x60009dd94 txntype:-1 reqpath:n/a Error Path:/flink/cluster_1/jobgraphs/2a4eff355aef849c5ca37dbac04f2ff1 Error:KeeperErrorCode = Directory not empty for /flink/cluster_1/jobgraphs/2a4eff355aef849c5ca37dbac04f2ff1

Looking in ZK, we see:

[zk: localhost:2181(CONNECTED) 0] ls /flink/cluster_1/jobgraphs/2a4eff355aef849c5ca37dbac04f2ff1
[d833418c-891a-4b5e-b983-080be803275c]

From the comments in ZooKeeperStateHandleStore.java I gather that this child node is used as a deletion lock. Looking at the contents of this ephemeral lock node:

[zk: localhost:2181(CONNECTED) 16] get /flink/cluster_1/jobgraphs/2a4eff355aef849c5ca37dbac04f2ff1/d833418c-891a-4b5e-b983-080be803275c
10.210.42.62
cZxid = 0x60002ffa7
ctime = Tue Jun 12 20:01:26 UTC 2018
mZxid = 0x60002ffa7
mtime = Tue Jun 12 20:01:26 UTC 2018
pZxid = 0x60002ffa7
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x3003f4a0003
dataLength = 12
numChildren = 0

and comparing to the ephemeral lock node of the currently running job:

[zk: localhost:2181(CONNECTED) 17] get /flink/cluster_1/jobgraphs/d77948df92813a68ea6dfd6783f40e7e/596a4add-9f5c-4113-99ec-9c942fe91172
10.210.22.167
cZxid = 0x60009df4b
ctime = Mon Jul 30 23:01:04 UTC 2018
mZxid = 0x60009df4b
mtime = Mon Jul 30 23:01:04 UTC 2018
pZxid = 0x60009df4b
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x201d2330001
dataLength = 13
numChildren = 0

Assuming the content of the nodes represents the owner, it seems the job graph for the old canceled job, 2a4eff355aef849c5ca37dbac04f2ff1, is locked by the previous JM leader, JM 2 (10.210.42.62), while the running job is locked by the current JM leader, JM 1 (10.210.22.167). Somehow the previous leader, JM 2, did not give up the lock when leadership failed over to JM 1. Note that JM 2 never lost its ZK session, as it recovered it when it connected to another ZK node. So some code in the JobManager needed to explicitly release the lock on the job graph during failover and failed to do so.

[~till.rohrmann] and [~uce] I think you wrote the ZK HA code. Any thoughts?
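The "Directory not empty" failure described above can be reproduced in miniature. The following is a hypothetical plain-Java model of the znode layout (a job graph node with one ephemeral lock child per lock holder), not real ZooKeeper code: deletion fails while any lock child remains, and only releasing the stale holder's lock makes the delete succeed.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the jobgraphs znode: each job graph carries a set of
// lock owners, standing in for the ephemeral child znodes seen in zkCli above.
public class JobGraphLockModel {

    private final Map<String, Set<String>> locks = new HashMap<>();

    /** Owner takes a lock on the job graph (creates an ephemeral child node in real ZK). */
    public void lock(String jobId, String owner) {
        locks.computeIfAbsent(jobId, k -> new HashSet<>()).add(owner);
    }

    /** Owner drops all of its locks (what the old leader failed to do on failover). */
    public void releaseAll(String owner) {
        for (Set<String> owners : locks.values()) {
            owners.remove(owner);
        }
    }

    /** Models the delete: fails like "KeeperErrorCode = Directory not empty" while locks remain. */
    public boolean remove(String jobId) {
        Set<String> owners = locks.get(jobId);
        if (owners != null && !owners.isEmpty()) {
            return false; // a lock child still exists, so the delete is rejected
        }
        locks.remove(jobId);
        return true;
    }
}
```

With "JM2" holding a stale lock, `remove` fails just as JM 1's delete did in the logs above; once JM2's lock is released, the same delete succeeds.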
[jira] [Commented] (FLINK-10011) Old job resurrected during HA failover
[ https://issues.apache.org/jira/browse/FLINK-10011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612265#comment-16612265 ] ASF GitHub Bot commented on FLINK-10011: tillrohrmann closed pull request #6590: [Backport 1.4][FLINK-10011] Release JobGraph from SubmittedJobGraphStore URL: https://github.com/apache/flink/pull/6590
[ https://issues.apache.org/jira/browse/FLINK-10011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16565583#comment-16565583 ] Andrey Zagrebin commented on FLINK-10011:
This problem might be related to the recently fixed race condition in FLINK-9575. That issue concerns the asynchronous deletion of the job graph combined with the synchronous deletion of the canceled job's blob data: the asynchronous deletion can fail, in which case the graph is left behind.

> Old job resurrected during HA failover
> --
>
> Key: FLINK-10011
> URL: https://issues.apache.org/jira/browse/FLINK-10011
> Project: Flink
> Issue Type: Bug
> Components: JobManager
> Affects Versions: 1.4.2
> Reporter: Elias Levy
> Priority: Blocker
>
> For the second time we've observed Flink resurrect an old job during JobManager high-availability fail over.
> h4. Configuration
> * AWS environment
> * Flink 1.4.2 standalone cluster in HA mode
> * 2 JMs, 3 TMs
> * 3 node ZK ensemble
> * 1 job consuming to/from Kafka
> * Checkpoints in S3 using the Presto file system adaptor
> h4. Timeline
> * 15:18:10 JM 2 completes checkpoint 69256.
> * 15:19:10 JM 2 completes checkpoint 69257.
> * 15:19:57 ZK 1 (follower) loses connectivity to the leader as a result of a SocketTimeoutException
> * 15:19:57 ZK 1 closes connection to JM 2 (leader)
> * 15:19:57 ZK 2 (leader) reports a network error and closes connection to ZK 1
> * 15:19:57 JM 2 reports it can't read data from ZK
> ** {{Unable to read additional data from server sessionid 0x3003f4a0003, likely server has closed socket, closing socket connection and attempting reconnect)}}
> ** {{org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn}}
> * 15:19:57 JM 2 ZK Curator changes connection state to SUSPENDED
> ** {{Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.}}
> ** {{ZooKeeper connection SUSPENDED. Changes to the submitted job graphs are not monitored (temporarily).}}
> ** {{Connection to ZooKeeper suspended. The contender akka.tcp://flink@flink-jm-2:6123/user/jobmanager no longer participates in the leader election}}
> ** {{Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.}}
> * 15:19:57 JM 2 gives up leadership
> ** {{JobManager akka://flink/user/jobmanager#33755521 was revoked leadership.}}
> * 15:19:57 JM 2 changes job {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color} status to SUSPENDED
> ** {{Stopping checkpoint coordinator for job {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}}}
> * 15:19:57 TMs start disassociating with JM 2, but JM 2 discards the messages because there is no leader
> ** {{Discard message LeaderSessionMessage(d29e9f38-a16d-4c87-b34f-5212caab0473,Disconnect(b97363d53ad22aedfebdc8e5ba3c672f,java.lang.Exception: TaskManager akka://flink/user/taskmanager is disassociating)) because there is currently no valid leader id known.}}
> * 15:19:57 JM 2 connects to ZK 2 and renews its session
> ** {{Opening socket connection to server ip-10-210-43-221.ec2.internal/10.210.43.221:2181}}
> ** {{Socket connection established to ip-10-210-43-221.ec2.internal/10.210.43.221:2181, initiating session}}
> ** {{Connection to ZooKeeper was reconnected. Leader retrieval can be restarted.}}
> ** {{Session establishment complete on server ip-10-210-43-221.ec2.internal/10.210.43.221:2181, sessionid = 0x3003f4a0003, negotiated timeout = 4}}
> ** {{Connection to ZooKeeper was reconnected. Leader election can be restarted.}}
> ** {{ZooKeeper connection RECONNECTED. Changes to the submitted job graphs are monitored again.}}
> ** {{State change: RECONNECTED}}
> * 15:19:57 JM 1 reports JM 1 has been granted leadership:
> ** {{JobManager akka.tcp://flink@flink-jm-1:6123/user/jobmanager was granted leadership with leader session ID Some(ae0a1a17-eccc-40b4-985d-93bc59f5b936).}}
> * 15:19:57 JM 2 reports the job has been suspended
> ** {{org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter Shutting down.}}
> ** {{Job 2a4eff355aef849c5ca37dbac04f2ff1 has been suspended.}}
> * 15:19:57 JM 2 reports it has lost leadership:
> ** {{Associated JobManager Actor[akka://flink/user/jobmanager#33755521|#33755521] lost leader status}}
> ** {{Received leader address but not running in leader ActorSystem. Cancelling registration.}}
> * 15:19:57 TMs register with JM 1
> * 15:20:07 JM 1 attempts to recover jobs and finds there are two jobs:
> ** {{Attempting to recover all jobs.}}
> ** {{There are 2 jobs to recover. Starting the job recovery.}}
> ** {{Attempting to recover job {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}.}}
> ** {{Attempting to recover job {color:#d04437}61bca496065cd05e4263070a5e923a05{color}.}}
> * 15:20:08 – 15:32:27 ZK 2 reports a large number of errors of the form:
> ** {{Got user-level KeeperException when processing sessionid:0x201d2330001 type:create cxid:0x4211 zxid:0x60009dc70 >
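The FLINK-9575 race Andrey mentions can be illustrated with a hypothetical simplification (not the actual Flink code): the blob data is removed synchronously on cancel, while the graph entry is removed on an executor, and if nobody checks the async result a failure silently leaves the graph in the store, where the next leader will "recover" it. The class, store, and job-id names below are stand-ins:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class AsyncCleanupRace {
    // Hypothetical stand-ins for the blob store and the submitted-job-graph store.
    static final Set<String> blobs = ConcurrentHashMap.newKeySet();
    static final Set<String> graphs = ConcurrentHashMap.newKeySet();
    static final ExecutorService executor = Executors.newSingleThreadExecutor();

    static void cancelJob(String jobId, boolean asyncRemovalFails) {
        blobs.remove(jobId); // synchronous: blob data is gone immediately
        Future<?> removal = executor.submit(() -> {
            if (asyncRemovalFails) {
                // simulated failure, e.g. ZooKeeper briefly unavailable
                throw new RuntimeException("graph removal failed");
            }
            graphs.remove(jobId); // asynchronous: may never happen
        });
        // Bug being illustrated: 'removal' is never checked, so a failure
        // goes unnoticed and the graph stays in the store.
    }

    public static void main(String[] args) throws Exception {
        blobs.add("job-1");
        graphs.add("job-1");
        cancelJob("job-1", true); // async removal fails
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
        // Blob data is gone, but the graph survives: on the next leader
        // election the job is recovered even though it was canceled.
        System.out.println("blob present: " + blobs.contains("job-1"));
        System.out.println("graph present: " + graphs.contains("job-1"));
    }
}
```

Running this prints that the blob is gone while the graph is still present, which is exactly the inconsistent state a recovering JobManager would then resurrect.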
[jira] [Commented] (FLINK-10011) Old job resurrected during HA failover
[ https://issues.apache.org/jira/browse/FLINK-10011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16564573#comment-16564573 ] Elias Levy commented on FLINK-10011:
[~trohrm...@apache.org] what do you think?
[jira] [Commented] (FLINK-10011) Old job resurrected during HA failover
[ https://issues.apache.org/jira/browse/FLINK-10011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16564569#comment-16564569 ] Elias Levy commented on FLINK-10011:
It appears it may not be necessary to replace the {{LeaderLatch}} Curator recipe to avoid losing leadership during temporary connection failures. The Curator error handling [documentation|https://curator.apache.org/errors.html] describes a {{SessionConnectionStateErrorPolicy}} that treats the {{SUSPENDED}} and {{LOST}} connection states differently, and this [test|https://github.com/apache/curator/blob/d502dde1c4601b2abc6d831d764561a73316bf00/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java#L72-L146] shows that leadership is not lost when a {{LeaderLatch}} is combined with that policy. The [code|https://github.com/apache/curator/blob/ed3082ecfebc332ba96da7a5bda4508a1985db6e/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java#L625-L631] implements the policy, and [this shows|https://github.com/apache/curator/blob/5920c744508afd678a20309313e1b8d78baac0c4/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java#L298-L314] that Curator will inject a session expiration even while in the {{SUSPENDED}} state, so a disconnected client won't continue to think it is leader past its session expiration.
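The policy Elias describes can be wired into a Curator client roughly as follows. This is a sketch against the Curator 3.x/4.x builder API, not Flink's actual HA code; the connect string, retry settings, and latch path are placeholders:

```java
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.state.SessionConnectionStateErrorPolicy;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class SessionPolicyLeaderElection {
    public static void main(String[] args) throws Exception {
        // With SessionConnectionStateErrorPolicy, only LOST (session expiration)
        // is treated as an error state; a SUSPENDED connection alone no longer
        // revokes leadership immediately.
        CuratorFramework client = CuratorFrameworkFactory.builder()
            .connectString("zk-1:2181,zk-2:2181,zk-3:2181")          // placeholder ensemble
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))        // placeholder retry policy
            .connectionStateErrorPolicy(new SessionConnectionStateErrorPolicy())
            .build();
        client.start();

        LeaderLatch latch = new LeaderLatch(client, "/flink/leaderlatch"); // placeholder path
        latch.start();
        latch.await(); // blocks until this contender becomes leader

        // ... act as leader. As noted above, Curator injects a session
        // expiration if the connection stays SUSPENDED past the negotiated
        // session timeout, so a disconnected client cannot consider itself
        // leader indefinitely.
    }
}
```

The design trade-off is that a brief ZooKeeper hiccup no longer forces an immediate leadership revocation (and job suspension), at the cost of a window, bounded by the session timeout, in which a disconnected contender still believes it is leader.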