[jira] [Commented] (FLINK-10011) Old job resurrected during HA failover

2018-09-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612265#comment-16612265
 ] 

ASF GitHub Bot commented on FLINK-10011:


tillrohrmann closed pull request #6590: [Backport 1.4][FLINK-10011] Release 
JobGraph from SubmittedJobGraphStore
URL: https://github.com/apache/flink/pull/6590
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/ZooKeeperMesosServices.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/ZooKeeperMesosServices.java
index 069cb833a3a..45d11412c50 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/ZooKeeperMesosServices.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/ZooKeeperMesosServices.java
@@ -56,8 +56,7 @@ public MesosWorkerStore createMesosWorkerStore(Configuration configuration, Exec
 
 		ZooKeeperStateHandleStore zooKeeperStateHandleStore = zooKeeperUtilityFactory.createZooKeeperStateHandleStore(
 			"/workers",
-			stateStorageHelper,
-			executor);
+			stateStorageHelper);
 
 		ZooKeeperSharedValue frameworkId = zooKeeperUtilityFactory.createSharedValue("/frameworkId", new byte[0]);
 		ZooKeeperSharedCount totalTaskCount = zooKeeperUtilityFactory.createSharedCount("/taskCount", 0);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
index f22127041d3..533026041f8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -31,8 +31,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nullable;
-
 import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
@@ -86,6 +84,8 @@
 	 */
 	private final ArrayDeque completedCheckpoints;
 
+	private final Executor executor;
+
 	/**
 	 * Creates a {@link ZooKeeperCompletedCheckpointStore} instance.
 	 *
@@ -98,7 +98,7 @@
 	 *                       start with a '/')
 	 * @param stateStorage   State storage to be used to persist the completed
 	 *                       checkpoint
-	 * @param executor to give to the ZooKeeperStateHandleStore to run ZooKeeper callbacks
+	 * @param executor to execute blocking calls
	 * @throws Exception
 	 */
 	public ZooKeeperCompletedCheckpointStore(
@@ -123,10 +123,12 @@ public ZooKeeperCompletedCheckpointStore(
 		// All operations will have the path as root
 		this.client = client.usingNamespace(client.getNamespace() + checkpointsPath);
 
-		this.checkpointsInZooKeeper = new ZooKeeperStateHandleStore<>(this.client, stateStorage, executor);
+		this.checkpointsInZooKeeper = new ZooKeeperStateHandleStore<>(this.client, stateStorage);
 
 		this.completedCheckpoints = new ArrayDeque<>(maxNumberOfCheckpointsToRetain + 1);
 
+		this.executor = checkNotNull(executor);
+
 		LOG.info("Initialized in '{}'.", checkpointsPath);
 	}
 
@@ -237,7 +239,18 @@ public void addCheckpoint(final CompletedCheckpoint checkpoint) throws Exception
 		// Everything worked, let's remove a previous checkpoint if necessary.
 		while (completedCheckpoints.size() > maxNumberOfCheckpointsToRetain) {
 			try {
-				removeSubsumed(completedCheckpoints.removeFirst());
+				final CompletedCheckpoint completedCheckpoint = completedCheckpoints.removeFirst();
+
+				if (tryRemove(completedCheckpoint.getCheckpointID())) {
+					executor.execute(() -> {
+						try {
+							completedCheckpoint.discardOnSubsume();
+						} catch (Exception e) {
+							LOG.warn("Could not discard subsumed completed checkpoint {}.", completedCheckpoint.getCheckpointID(), e);
+
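The hunk above (cut off by the archive) changes how subsumed checkpoints are cleaned up: the checkpoint is first removed from ZooKeeper via tryRemove, and only if that succeeds is its state discarded, asynchronously on the store's executor. A self-contained sketch of that control flow, with toy types standing in for Flink's classes (only the names taken from the hunk are real):

```
import java.util.ArrayDeque;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

// Standalone sketch of the pattern in the hunk above: the ZooKeeper removal is
// synchronous, the (potentially slow) state cleanup is handed to an executor.
public class SubsumeSketch {

	interface Checkpoint {
		long id();
		void discardOnSubsume() throws Exception;
	}

	private final ArrayDeque<Checkpoint> completedCheckpoints = new ArrayDeque<>();
	private final Executor executor = Executors.newSingleThreadExecutor();
	private final int maxNumberOfCheckpointsToRetain = 1;

	void subsumeOldCheckpoints() {
		while (completedCheckpoints.size() > maxNumberOfCheckpointsToRetain) {
			final Checkpoint completedCheckpoint = completedCheckpoints.removeFirst();

			// tryRemove(...) stands for the synchronous removal of the checkpoint's
			// znode; only if that succeeds is the state discarded, asynchronously.
			if (tryRemove(completedCheckpoint.id())) {
				executor.execute(() -> {
					try {
						completedCheckpoint.discardOnSubsume();
					} catch (Exception e) {
						System.err.printf(
							"Could not discard subsumed completed checkpoint %d.%n",
							completedCheckpoint.id());
					}
				});
			}
		}
	}

	private boolean tryRemove(long checkpointId) {
		// Placeholder for the ZooKeeper removal; assumed to return true on success.
		return true;
	}
}
```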

[jira] [Commented] (FLINK-10011) Old job resurrected during HA failover

2018-09-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612264#comment-16612264
 ] 

ASF GitHub Bot commented on FLINK-10011:


tillrohrmann commented on issue #6590: [Backport 1.4][FLINK-10011] Release 
JobGraph from SubmittedJobGraphStore
URL: https://github.com/apache/flink/pull/6590#issuecomment-420678214
 
 
   Merged via
   d2a828c714188902f2e29da68bf137c3f82de014
   07ab3d9bc567cf67b0d5cb7ae55a185f898cc766


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Old job resurrected during HA failover
> --
>
> Key: FLINK-10011
> URL: https://issues.apache.org/jira/browse/FLINK-10011
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 1.4.2, 1.5.2, 1.6.0
>Reporter: Elias Levy
>Assignee: Till Rohrmann
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.4.3, 1.6.1, 1.7.0, 1.5.4
>
>
> For the second time we've observed Flink resurrect an old job during 
> JobManager high-availability failover.
> h4. Configuration
>  * AWS environment
>  * Flink 1.4.2 standalone cluster in HA mode
>  * 2 JMs, 3 TMs
>  * 3 node ZK ensemble
>  * 1 job consuming to/from Kafka
>  * Checkpoints in S3 using the Presto file system adaptor
> h4. Timeline 
>  * 15:18:10 JM 2 completes checkpoint 69256.
>  * 15:19:10 JM 2 completes checkpoint 69257.
>  * 15:19:57 ZK 1 (follower) loses connectivity to the leader as a result of a 
> SocketTimeoutException
>  * 15:19:57 ZK 1 closes connection to JM 2 (leader)
>  * 15:19:57 ZK 2 (leader) reports a network error and closes connection to ZK 
> 1
>  * 15:19:57 JM 2 reports it can't read data from ZK
>  ** {{Unable to read additional data from server sessionid 0x3003f4a0003, 
> likely server has closed socket, closing socket connection and attempting 
> reconnect)}}
>  ** {{org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn}}
>  * 15:19:57 JM 2 ZK Curator changes connection state to SUSPENDED
>  ** {{Connection to ZooKeeper suspended. Can no longer retrieve the leader 
> from ZooKeeper.}}
>  ** {{ZooKeeper connection SUSPENDED. }}{{Changes to the submitted job graphs 
> are not monitored (temporarily).}}
>  ** {{Connection to ZooKeeper suspended. The contender 
> akka.tcp://flink@flink-jm-2:6123/user/jobmanager no longer participates in 
> the leader election}}{{ }}
>  ** {{Connection to ZooKeeper suspended. Can no longer retrieve the leader 
> from ZooKeeper.}}
>  * 15:19:57 JM 2 gives up leadership
>  ** {{JobManager akka://flink/user/jobmanager#33755521 was revoked 
> leadership.}}
>  * 15:19:57 JM 2 changes job 
> {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color} status to SUSPENDED
>  ** {{Stopping checkpoint coordinator for job 
> {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}}}
>  * 15:19:57 TMs start disassociating from JM 2, but JM 2 discards the messages 
> because there is no leader
>  ** {{Discard message 
> LeaderSessionMessage(d29e9f38-a16d-4c87-b34f-5212caab0473,Disconnect(b97363d53ad22aedfebdc8e5ba3c672f,java.lang.Exception:
>  TaskManager akka://flink/user/taskmanager is disassociating)) because there 
> is currently no valid leader id known.}}
>  * 15:19:57 JM 2 connects to ZK 2 and renews its session
>  ** {{Opening socket connection to server 
> ip-10-210-43-221.ec2.internal/10.210.43.221:2181}}
>  ** {{Socket connection established to 
> ip-10-210-43-221.ec2.internal/10.210.43.221:2181, initiating session}}
>  ** {{Connection to ZooKeeper was reconnected. Leader retrieval can be 
> restarted.}}
>  ** {{Session establishment complete on server 
> ip-10-210-43-221.ec2.internal/10.210.43.221:2181, sessionid = 
> 0x3003f4a0003, negotiated timeout = 4}}
>  ** {{Connection to ZooKeeper was reconnected. Leader election can be 
> restarted.}}
>  ** {{ZooKeeper connection RECONNECTED. Changes to the submitted job graphs 
> are monitored again.}}
>  ** {{State change: RECONNECTED}}
>  * 15:19:57 JM 1 reports that it has been granted leadership:
>  ** {{JobManager akka.tcp://flink@flink-jm-1:6123/user/jobmanager was granted 
> leadership with leader session ID 
> Some(ae0a1a17-eccc-40b4-985d-93bc59f5b936).}}
>  * 15:19:57 JM 2 reports the job has been suspended
>  ** {{org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter 
> Shutting down.}}
>  ** {{Job 2a4eff355aef849c5ca37dbac04f2ff1 has been suspended.}}
>  * 15:19:57 JM 2 reports it has lost leadership:
>  ** {{Associated JobManager 
> Actor[akka://flink/user/jobmanager#33755521|#33755521] lost leader status}}
>  ** {{Received leader address but not running in leader ActorSystem. 
> Cancelling registration.}}

[jira] [Commented] (FLINK-10011) Old job resurrected during HA failover

2018-09-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612257#comment-16612257
 ] 

ASF GitHub Bot commented on FLINK-10011:


tillrohrmann closed pull request #6589: [Backport 1.5][FLINK-10011] Release 
JobGraph from SubmittedJobGraphStore
URL: https://github.com/apache/flink/pull/6589
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/ZooKeeperMesosServices.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/ZooKeeperMesosServices.java
index 069cb833a3a..45d11412c50 100644
--- 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/ZooKeeperMesosServices.java
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/ZooKeeperMesosServices.java
@@ -56,8 +56,7 @@ public MesosWorkerStore createMesosWorkerStore(Configuration 
configuration, Exec
 
ZooKeeperStateHandleStore 
zooKeeperStateHandleStore = 
zooKeeperUtilityFactory.createZooKeeperStateHandleStore(
"/workers",
-   stateStorageHelper,
-   executor);
+   stateStorageHelper);
 
ZooKeeperSharedValue frameworkId = 
zooKeeperUtilityFactory.createSharedValue("/frameworkId", new byte[0]);
ZooKeeperSharedCount totalTaskCount = 
zooKeeperUtilityFactory.createSharedCount("/taskCount", 0);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/ActorUtils.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/ActorUtils.java
index f2f905971c3..9a992814235 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/ActorUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/ActorUtils.java
@@ -19,9 +19,11 @@
 package org.apache.flink.runtime.akka;
 
 import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
 
 import akka.actor.ActorRef;
 import akka.actor.Kill;
+import akka.actor.PoisonPill;
 import akka.pattern.Patterns;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -85,5 +87,13 @@
return FutureUtils.completeAll(terminationFutures);
}
 
+   public static void stopActor(AkkaActorGateway akkaActorGateway) {
+   stopActor(akkaActorGateway.actor());
+   }
+
+   public static void stopActor(ActorRef actorRef) {
+   actorRef.tell(PoisonPill.getInstance(), ActorRef.noSender());
+   }
+
private ActorUtils() {}
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
index f22127041d3..131733924ae 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -25,14 +25,13 @@
 import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
 import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
 import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.function.ConsumerWithException;
 
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.utils.ZKPaths;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nullable;
-
 import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
@@ -86,6 +85,8 @@
 */
private final ArrayDeque completedCheckpoints;
 
+   private final Executor executor;
+
/**
 * Creates a {@link ZooKeeperCompletedCheckpointStore} instance.
 *
@@ -98,7 +99,7 @@
 *   start with a '/')
 * @param stateStorage   State storage to be used to 
persist the completed
 *   checkpoint
-* @param executor to give to the ZooKeeperStateHandleStore to run 
ZooKeeper callbacks
+* @param executor to execute blocking calls
 * @throws Exception
 */
public ZooKeeperCompletedCheckpointStore(
@@ -123,10 +124,12 @@ public ZooKeeperCompletedCheckpointStore(
// All operations will have the path as root
this.client = client.usingNamespace(client.getNamespace() + 
checkpointsPath);
 
-   this.checkpointsInZooKeeper = new 

[jira] [Commented] (FLINK-10011) Old job resurrected during HA failover

2018-09-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612256#comment-16612256
 ] 

ASF GitHub Bot commented on FLINK-10011:


tillrohrmann commented on issue #6589: [Backport 1.5][FLINK-10011] Release 
JobGraph from SubmittedJobGraphStore
URL: https://github.com/apache/flink/pull/6589#issuecomment-420677376
 
 
   Merged via
   7c56d15b911d4fb87c073a25eb93d61502995c5a
   5c8630cf34b3d3cce5a821972840f0a7c2fa1bb7
   dd096eb44d8401d22f7e3cfaa51345f59abe04d4





[jira] [Commented] (FLINK-10011) Old job resurrected during HA failover

2018-09-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612252#comment-16612252
 ] 

ASF GitHub Bot commented on FLINK-10011:


tillrohrmann commented on issue #6588: [Backport 1.6][FLINK-10011] Release 
JobGraph from SubmittedJobGraphStore
URL: https://github.com/apache/flink/pull/6588#issuecomment-420677005
 
 
   Merged via
   fabd246e5342001d64be55c820ba50b3cf75d2a6
   5e62da0f95d9abe35997e45dc9b0df3a9c7495cd
   4d7204784270b5ad8d5455d82f7a965efdb5ae45





[jira] [Commented] (FLINK-10011) Old job resurrected during HA failover

2018-09-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612253#comment-16612253
 ] 

ASF GitHub Bot commented on FLINK-10011:


tillrohrmann closed pull request #6588: [Backport 1.6][FLINK-10011] Release 
JobGraph from SubmittedJobGraphStore
URL: https://github.com/apache/flink/pull/6588
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/ZooKeeperMesosServices.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/ZooKeeperMesosServices.java
index 069cb833a3a..45d11412c50 100644
--- 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/ZooKeeperMesosServices.java
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/ZooKeeperMesosServices.java
@@ -56,8 +56,7 @@ public MesosWorkerStore createMesosWorkerStore(Configuration 
configuration, Exec
 
ZooKeeperStateHandleStore 
zooKeeperStateHandleStore = 
zooKeeperUtilityFactory.createZooKeeperStateHandleStore(
"/workers",
-   stateStorageHelper,
-   executor);
+   stateStorageHelper);
 
ZooKeeperSharedValue frameworkId = 
zooKeeperUtilityFactory.createSharedValue("/frameworkId", new byte[0]);
ZooKeeperSharedCount totalTaskCount = 
zooKeeperUtilityFactory.createSharedCount("/taskCount", 0);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/ActorUtils.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/ActorUtils.java
index f2f905971c3..9a992814235 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/ActorUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/ActorUtils.java
@@ -19,9 +19,11 @@
 package org.apache.flink.runtime.akka;
 
 import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
 
 import akka.actor.ActorRef;
 import akka.actor.Kill;
+import akka.actor.PoisonPill;
 import akka.pattern.Patterns;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -85,5 +87,13 @@
return FutureUtils.completeAll(terminationFutures);
}
 
+   public static void stopActor(AkkaActorGateway akkaActorGateway) {
+   stopActor(akkaActorGateway.actor());
+   }
+
+   public static void stopActor(ActorRef actorRef) {
+   actorRef.tell(PoisonPill.getInstance(), ActorRef.noSender());
+   }
+
private ActorUtils() {}
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
index f22127041d3..131733924ae 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -25,14 +25,13 @@
 import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
 import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
 import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.function.ConsumerWithException;
 
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.utils.ZKPaths;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nullable;
-
 import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
@@ -86,6 +85,8 @@
 */
private final ArrayDeque completedCheckpoints;
 
+   private final Executor executor;
+
/**
 * Creates a {@link ZooKeeperCompletedCheckpointStore} instance.
 *
@@ -98,7 +99,7 @@
 *   start with a '/')
 * @param stateStorage   State storage to be used to 
persist the completed
 *   checkpoint
-* @param executor to give to the ZooKeeperStateHandleStore to run 
ZooKeeper callbacks
+* @param executor to execute blocking calls
 * @throws Exception
 */
public ZooKeeperCompletedCheckpointStore(
@@ -123,10 +124,12 @@ public ZooKeeperCompletedCheckpointStore(
// All operations will have the path as root
this.client = client.usingNamespace(client.getNamespace() + 
checkpointsPath);
 
-   this.checkpointsInZooKeeper = new 

[jira] [Commented] (FLINK-10011) Old job resurrected during HA failover

2018-09-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612245#comment-16612245
 ] 

ASF GitHub Bot commented on FLINK-10011:


asfgit closed pull request #6587: [FLINK-10011] Release JobGraph from 
SubmittedJobGraphStore
URL: https://github.com/apache/flink/pull/6587
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/ZooKeeperMesosServices.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/ZooKeeperMesosServices.java
index 069cb833a3a..45d11412c50 100644
--- 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/ZooKeeperMesosServices.java
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/ZooKeeperMesosServices.java
@@ -56,8 +56,7 @@ public MesosWorkerStore createMesosWorkerStore(Configuration 
configuration, Exec
 
ZooKeeperStateHandleStore 
zooKeeperStateHandleStore = 
zooKeeperUtilityFactory.createZooKeeperStateHandleStore(
"/workers",
-   stateStorageHelper,
-   executor);
+   stateStorageHelper);
 
ZooKeeperSharedValue frameworkId = 
zooKeeperUtilityFactory.createSharedValue("/frameworkId", new byte[0]);
ZooKeeperSharedCount totalTaskCount = 
zooKeeperUtilityFactory.createSharedCount("/taskCount", 0);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/ActorUtils.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/ActorUtils.java
index f2f905971c3..9a992814235 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/ActorUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/ActorUtils.java
@@ -19,9 +19,11 @@
 package org.apache.flink.runtime.akka;
 
 import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
 
 import akka.actor.ActorRef;
 import akka.actor.Kill;
+import akka.actor.PoisonPill;
 import akka.pattern.Patterns;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -85,5 +87,13 @@
return FutureUtils.completeAll(terminationFutures);
}
 
+   public static void stopActor(AkkaActorGateway akkaActorGateway) {
+   stopActor(akkaActorGateway.actor());
+   }
+
+   public static void stopActor(ActorRef actorRef) {
+   actorRef.tell(PoisonPill.getInstance(), ActorRef.noSender());
+   }
+
private ActorUtils() {}
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
index f22127041d3..131733924ae 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -25,14 +25,13 @@
 import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
 import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
 import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.function.ConsumerWithException;
 
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.utils.ZKPaths;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nullable;
-
 import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
@@ -86,6 +85,8 @@
 */
private final ArrayDeque completedCheckpoints;
 
+   private final Executor executor;
+
/**
 * Creates a {@link ZooKeeperCompletedCheckpointStore} instance.
 *
@@ -98,7 +99,7 @@
 *   start with a '/')
 * @param stateStorage   State storage to be used to 
persist the completed
 *   checkpoint
-* @param executor to give to the ZooKeeperStateHandleStore to run 
ZooKeeper callbacks
+* @param executor to execute blocking calls
 * @throws Exception
 */
public ZooKeeperCompletedCheckpointStore(
@@ -123,10 +124,12 @@ public ZooKeeperCompletedCheckpointStore(
// All operations will have the path as root
this.client = client.usingNamespace(client.getNamespace() + 
checkpointsPath);
 
-   this.checkpointsInZooKeeper = new 

[jira] [Commented] (FLINK-10011) Old job resurrected during HA failover

2018-09-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612230#comment-16612230
 ] 

ASF GitHub Bot commented on FLINK-10011:


tillrohrmann commented on issue #6587: [FLINK-10011] Release JobGraph from 
SubmittedJobGraphStore
URL: https://github.com/apache/flink/pull/6587#issuecomment-420672486
 
 
   Thanks for the review @azagrebin. Merging this PR.





[jira] [Commented] (FLINK-10011) Old job resurrected during HA failover

2018-09-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16608017#comment-16608017
 ] 

ASF GitHub Bot commented on FLINK-10011:


tillrohrmann commented on a change in pull request #6587: [FLINK-10011] Release 
JobGraph from SubmittedJobGraphStore
URL: https://github.com/apache/flink/pull/6587#discussion_r216127860
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
 ##
 @@ -237,7 +239,18 @@ public void addCheckpoint(final CompletedCheckpoint checkpoint) throws Exception
 		// Everything worked, let's remove a previous checkpoint if necessary.
 		while (completedCheckpoints.size() > maxNumberOfCheckpointsToRetain) {
 			try {
-				removeSubsumed(completedCheckpoints.removeFirst());
+				final CompletedCheckpoint completedCheckpoint = completedCheckpoints.removeFirst();
 
 Review comment:
   Good idea. I've changed it according to your suggestion.





[jira] [Commented] (FLINK-10011) Old job resurrected during HA failover

2018-09-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16608014#comment-16608014
 ] 

ASF GitHub Bot commented on FLINK-10011:


tillrohrmann commented on a change in pull request #6587: [FLINK-10011] Release 
JobGraph from SubmittedJobGraphStore
URL: https://github.com/apache/flink/pull/6587#discussion_r216127558
 
 

 ##
 File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperHAJobManagerTest.java
 ##
 @@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.akka.ActorUtils;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.akka.ListeningBehaviour;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.dispatcher.DispatcherHATest;
+import org.apache.flink.runtime.dispatcher.NoOpSubmittedJobGraphListener;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
+import org.apache.flink.runtime.testingUtils.TestingJobManager;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
+import org.apache.flink.runtime.zookeeper.ZooKeeperResource;
+import org.apache.flink.util.TestLogger;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.ExtendedActorSystem;
+import akka.actor.Identify;
+import akka.actor.Terminated;
+import akka.pattern.Patterns;
+import org.apache.curator.framework.CuratorFramework;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+
+import scala.Option;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for the ZooKeeper HA service and {@link JobManager} interaction.
+ */
+public class ZooKeeperHAJobManagerTest extends TestLogger {
+
+	@ClassRule
+	public static final ZooKeeperResource ZOO_KEEPER_RESOURCE = new ZooKeeperResource();
+
+	@ClassRule
+	public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+	private static final FiniteDuration TIMEOUT = FiniteDuration.apply(10L, TimeUnit.SECONDS);
+
+	private static ActorSystem system;
+
+	@BeforeClass
+	public static void setup() {
+		system = AkkaUtils.createLocalActorSystem(new Configuration());
+	}
+
+	@AfterClass
+	public static void teardown() throws Exception {
+		final Future terminationFuture = system.terminate();
+		Await.ready(terminationFuture, TIMEOUT);
+	}
+
+	/**
+	 * Tests that the {@link JobManager} releases all locked {@link JobGraph} if it loses
+	 * leadership.
+	 */
+	@Test
+	public void testJobGraphReleaseWhenLosingLeadership() throws Exception {
+		final Configuration configuration = new Configuration();
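The excerpt is truncated here, but the test's Javadoc states the contract under test: on loss of leadership the JobManager must release its locks on the submitted JobGraphs without deleting them, so the next leader can recover them. A toy sketch of that release-vs-remove distinction (all types below are invented for illustration; this is not Flink's SubmittedJobGraphStore API):

```
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Toy model of the contract the truncated test exercises. Only the
// release-vs-remove distinction mirrors the PR; everything else is made up.
final class JobGraphStoreSketch {

	// Job graphs known to the HA store, and the subset currently locked by this JM.
	private final Map<String, String> jobGraphs = new HashMap<>();
	private final Set<String> locallyLocked = new HashSet<>();

	void recoverJobGraph(String jobId) {
		// Recovering locks the graph for this JobManager.
		locallyLocked.add(jobId);
	}

	void releaseJobGraph(String jobId) {
		// On loss of leadership: drop only the local lock. The graph itself stays
		// in the store so the new leader can recover it.
		locallyLocked.remove(jobId);
	}

	void removeJobGraph(String jobId) {
		// On job termination: remove the graph entirely.
		locallyLocked.remove(jobId);
		jobGraphs.remove(jobId);
	}
}
```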
+   

[jira] [Commented] (FLINK-10011) Old job resurrected during HA failover

2018-08-29 Thread Till Rohrmann (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16596459#comment-16596459
 ] 

Till Rohrmann commented on FLINK-10011:
---

I might have misdiagnosed the underlying problem a little bit. Initially I 
thought that {{JM2}} did not release the lock it created when the job was 
initially submitted. However, in order for {{JM1}} to become leader, {{JM2}} 
needs to lose its ZooKeeper session (otherwise the ephemeral leader Znode would 
not be deleted). In that case, the {{JobGraph}} lock Znodes should also be 
removed.

So where do the lock nodes come from? The answer could come from FLINK-10255. 
In Flip-6 we always recover jobs, and in pre-Flip-6 we sometimes recover jobs 
even if we are not the leader. If we are not the leader, we won't start 
executing the job, but recovering the job will also lock it. Therefore, my 
suspicion is that the old leader {{JM2}} actually recovered the job after it 
reconnected to ZooKeeper. If this is the case, then you should see the 
following log line in the log file of {{JM2}}: 
{{org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore  - 
Recovered SubmittedJobGraph(, null)}}. Do you still have the logs to check 
whether this is true or not [~elevy]?

A non-volatile variable could also explain why it only occurs sometimes with 
pre-Flip-6 and always with Flip-6.
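For context on "recovering the job will also lock it": in the ZooKeeper-backed stores, a lock is an ephemeral child znode tied to the locking JobManager's session, and a job graph node is not removed while such lock children exist. A minimal Curator sketch of that idea, assuming a local ZooKeeper ensemble and an existing job node; the paths and node names are illustrative, not Flink's actual layout:

```
import java.util.UUID;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;

// Illustrative only: paths and node names are made up for this sketch.
public class JobGraphLockSketch {

	public static void main(String[] args) throws Exception {
		CuratorFramework client = CuratorFrameworkFactory.newClient(
			"localhost:2181", new ExponentialBackoffRetry(1000, 3));
		client.start();

		String jobNode = "/flink/jobgraphs/2a4eff355aef849c5ca37dbac04f2ff1";

		// "Recovering" the submitted job graph reads the persistent job node ...
		byte[] stateHandlePointer = client.getData().forPath(jobNode);
		System.out.println("Recovered " + stateHandlePointer.length + " bytes");

		// ... and, as a side effect, locks it: an ephemeral child znode bound to
		// this ZooKeeper session. While any such lock child exists, the job node
		// is considered in use and must not be removed by another JobManager.
		String lock = client.create()
			.withMode(CreateMode.EPHEMERAL)
			.forPath(jobNode + "/lock-" + UUID.randomUUID());

		// Releasing the job graph (as opposed to removing it) deletes only the
		// lock child; the job node itself stays for the next leader to recover.
		client.delete().forPath(lock);

		client.close();
	}
}
```

Under this reading, {{JM2}} re-recovering the job after RECONNECTED would re-create such a lock under its new session, which is what the PRs above address by explicitly releasing the JobGraph from the SubmittedJobGraphStore when leadership is lost.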


[jira] [Commented] (FLINK-10011) Old job resurrected during HA failover

2018-08-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16596355#comment-16596355
 ] 

ASF GitHub Bot commented on FLINK-10011:


tillrohrmann commented on issue #6587: [FLINK-10011] Release JobGraph from 
SubmittedJobGraphStore
URL: https://github.com/apache/flink/pull/6587#issuecomment-416961912
 
 
   Unfortunately, this only fixes half of the problem. There is still another 
problem with standby Dispatchers/JobMasters. See 
https://issues.apache.org/jira/browse/FLINK-10255 for more information. 
Nevertheless, I think this PR fixes a valid problem and thus should be merged 
after addressing @azagrebin's comments.





[jira] [Commented] (FLINK-10011) Old job resurrected during HA failover

2018-08-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16590520#comment-16590520
 ] 

ASF GitHub Bot commented on FLINK-10011:


azagrebin commented on a change in pull request #6587: [FLINK-10011] Release 
JobGraph from SubmittedJobGraphStore
URL: https://github.com/apache/flink/pull/6587#discussion_r212321052
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
 ##
 @@ -237,7 +239,18 @@ public void addCheckpoint(final CompletedCheckpoint 
checkpoint) throws Exception
// Everything worked, let's remove a previous checkpoint if 
necessary.
while (completedCheckpoints.size() > 
maxNumberOfCheckpointsToRetain) {
try {
-   
removeSubsumed(completedCheckpoints.removeFirst());
+   final CompletedCheckpoint completedCheckpoint = 
completedCheckpoints.removeFirst();
 
 Review comment:
   I would try to move the whole try/catch into one method to deduplicate code 
with `shutdown()`, e.g.:
   ```
   void tryRemove(Runnable doRemove) {
       try {
           // ..
           doRemove.run(); // completedCheckpoint.discardOnSubsume(); or OnShutdown
           // ..
       } catch (Exception e) {
           // ...
       }
   }
   ```
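
   A self-contained version of that sketch could look as follows; the `ThrowingRunnable` interface and the error-handling callback are illustrative assumptions for the example, not the actual Flink code:

```java
import java.util.function.Consumer;

class CheckpointRemovalSketch {

    /** Removal callback that may throw, e.g. discardOnSubsume() or the shutdown variant. */
    interface ThrowingRunnable {
        void run() throws Exception;
    }

    /** Shared try/catch so addCheckpoint() and shutdown() do not duplicate error handling. */
    static void tryRemove(ThrowingRunnable doRemove, Consumer<Exception> onFailure) {
        try {
            doRemove.run();
        } catch (Exception e) {
            onFailure.accept(e);
        }
    }

    public static void main(String[] args) {
        // Example usage: both call sites would funnel through the same helper.
        tryRemove(
            () -> { throw new Exception("simulated discard failure"); },
            e -> System.err.println("Could not discard checkpoint: " + e.getMessage()));
    }
}
```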



[jira] [Commented] (FLINK-10011) Old job resurrected during HA failover

2018-08-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16590519#comment-16590519
 ] 

ASF GitHub Bot commented on FLINK-10011:


azagrebin commented on a change in pull request #6587: [FLINK-10011] Release 
JobGraph from SubmittedJobGraphStore
URL: https://github.com/apache/flink/pull/6587#discussion_r212379451
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperHAJobManagerTest.java
 ##
 @@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.akka.ActorUtils;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.akka.ListeningBehaviour;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.dispatcher.DispatcherHATest;
+import org.apache.flink.runtime.dispatcher.NoOpSubmittedJobGraphListener;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
+import org.apache.flink.runtime.testingUtils.TestingJobManager;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
+import org.apache.flink.runtime.zookeeper.ZooKeeperResource;
+import org.apache.flink.util.TestLogger;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.ExtendedActorSystem;
+import akka.actor.Identify;
+import akka.actor.Terminated;
+import akka.pattern.Patterns;
+import org.apache.curator.framework.CuratorFramework;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+
+import scala.Option;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for the ZooKeeper HA service and {@link JobManager} interaction.
+ */
+public class ZooKeeperHAJobManagerTest extends TestLogger {
+
+   @ClassRule
+   public static final ZooKeeperResource ZOO_KEEPER_RESOURCE = new 
ZooKeeperResource();
+
+   @ClassRule
+   public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+
+   private static final FiniteDuration TIMEOUT = FiniteDuration.apply(10L, 
TimeUnit.SECONDS);
+
+   private static ActorSystem system;
+
+   @BeforeClass
+   public static void setup() {
+   system = AkkaUtils.createLocalActorSystem(new Configuration());
+   }
+
+   @AfterClass
+   public static void teardown() throws Exception {
+   final Future terminationFuture = system.terminate();
+   Await.ready(terminationFuture, TIMEOUT);
+   }
+
+   /**
+* Tests that the {@link JobManager} releases all locked {@link 
JobGraph} if it loses
+* leadership.
+*/
+   @Test
+   public void testJobGraphReleaseWhenLosingLeadership() throws Exception {
+   final Configuration configuration = new Configuration();
+   
configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, 

[jira] [Commented] (FLINK-10011) Old job resurrected during HA failover

2018-08-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16586652#comment-16586652
 ] 

ASF GitHub Bot commented on FLINK-10011:


tillrohrmann opened a new pull request #6590: [Backport 1.4][FLINK-10011] 
Release JobGraph from SubmittedJobGraphStore
URL: https://github.com/apache/flink/pull/6590
 
 
   ## What is the purpose of the change
   
   Backport of #6587 for `release-1.4`.



[jira] [Commented] (FLINK-10011) Old job resurrected during HA failover

2018-08-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16586627#comment-16586627
 ] 

ASF GitHub Bot commented on FLINK-10011:


tillrohrmann opened a new pull request #6589: [Backport 1.5][FLINK-10011] 
Release JobGraph from SubmittedJobGraphStore
URL: https://github.com/apache/flink/pull/6589
 
 
   ## What is the purpose of the change
   
   Backport of #6587 for `release-1.5`.



[jira] [Commented] (FLINK-10011) Old job resurrected during HA failover

2018-08-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16586624#comment-16586624
 ] 

ASF GitHub Bot commented on FLINK-10011:


tillrohrmann opened a new pull request #6588: [Backport 1.6][FLINK-10011] 
Release JobGraph from SubmittedJobGraphStore
URL: https://github.com/apache/flink/pull/6588
 
 
   ## What is the purpose of the change
   
   Backport of #6587 for `release-1.6`.



[jira] [Commented] (FLINK-10011) Old job resurrected during HA failover

2018-08-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16586618#comment-16586618
 ] 

ASF GitHub Bot commented on FLINK-10011:


tillrohrmann opened a new pull request #6587: [FLINK-10011] Release JobGraph 
from SubmittedJobGraphStore
URL: https://github.com/apache/flink/pull/6587
 
 
   ## What is the purpose of the change
   
   This PR fixes the problem that sometimes `JobGraphs` cannot be removed from 
the `ZooKeeperSubmittedJobGraphStore` because a former leader might still keep 
a lock on the `JobGraph`. This usually happens in multi stand-by 
JobManager/Dispatcher scenarios, where a leader loses leadership due to a 
temporary network glitch but can restore its connection to ZooKeeper. The lock 
nodes, which are ephemeral and are created to protect against concurrent 
deletions, won't be deleted in this case and, thus, the `JobGraph` won't be 
removable by the new leader.
   
   The problem will be solved by explicitly removing all locks a 
`JobManager`/`Dispatcher` keeps on the stored `JobGraphs` if it loses 
leadership.
   
   This PR is based on #6586 
   
   ## Brief change log
   
   SubmittedJobGraphStore#releaseJobGraph removes a potentially existing lock
   from the specified JobGraph. This allows other SubmittedJobGraphStores to
   remove the JobGraph given that it is no longer locked.
   
   The JobManager now releases its lock on all JobGraphs it has stored in
   the SubmittedJobGraphStore if the JobManager loses leadership. This ensures
   that a different JobManager can delete these jobs after it has recovered
   them and they have reached a globally terminal state. This is especially important
   when using stand-by JobManagers where a former leader might still be
   connected to ZooKeeper and, thus, keeping all ephemeral nodes/locks.
   
   The Dispatcher now releases all JobGraphs it has stored in the 
SubmittedJobGraphStore
   if it loses leadership. This ensures that the newly elected leader after 
recovering
   the jobs can remove them from the SubmittedJobGraphStore. Before, the 
problem was
   that a former leader might still be connected to ZooKeeper which keeps its 
ephemeral
   lock nodes alive. This could prevent the deletion of the JobGraph from 
ZooKeeper.
   The problem occurs in particular in multi stand-by Dispatcher scenarios.
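
   To make the described release step concrete, a minimal sketch of the flow follows. The `getJobIds()` and `releaseJobGraph(JobID)` names mirror the change log above, but the trimmed-down interface and exact signatures are assumptions for illustration, not a quote of the Flink interface:

```java
import java.util.Collection;

import org.apache.flink.api.common.JobID;

// Hypothetical trimmed-down view of the job graph store; only the calls used below.
interface JobGraphStoreSketch {
    Collection<JobID> getJobIds() throws Exception;
    void releaseJobGraph(JobID jobId) throws Exception;
}

class LeadershipRevocationSketch {

    private final JobGraphStoreSketch jobGraphStore;

    LeadershipRevocationSketch(JobGraphStoreSketch jobGraphStore) {
        this.jobGraphStore = jobGraphStore;
    }

    /** Called when this JobManager/Dispatcher is revoked leadership. */
    void onRevokedLeadership() throws Exception {
        // Drop this instance's lock on every stored JobGraph so the new leader can
        // delete the graphs once the jobs reach a globally terminal state, even if
        // this process keeps its ZooKeeper session (and ephemeral nodes) alive.
        for (JobID jobId : jobGraphStore.getJobIds()) {
            jobGraphStore.releaseJobGraph(jobId);
        }
    }
}
```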
   
   
   ## Verifying this change
   
   - Added `ZooKeeperHAJobManagerTest#testSubmittedJobGraphRelease` and 
`ZooKeeperHADispatcherTest#testSubmittedJobGraphRelease`
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   



[jira] [Commented] (FLINK-10011) Old job resurrected during HA failover

2018-08-03 Thread Till Rohrmann (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16568511#comment-16568511
 ] 

Till Rohrmann commented on FLINK-10011:
---

FLINK-10052 is the dedicated issue for allowing Flink to tolerate temporarily 
suspended ZooKeeper connections.


[jira] [Commented] (FLINK-10011) Old job resurrected during HA failover

2018-08-03 Thread Till Rohrmann (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16568489#comment-16568489
 ] 

Till Rohrmann commented on FLINK-10011:
---

I think your analysis is correct [~elevy] and the problem is that in a multi 
standby JobManager scenario, it can happen that old leaders can keep locks on 
{{JobGraphs}}. As Elias proposed, we should explicitly free locked resources in 
case of a lost leadership in order to solve this problem. This issue not only 
affects the legacy mode but also the new mode.

A somewhat related problem is to harden Flink's behaviour in case of a 
suspended ZooKeeper connection. I think it is a good idea to wait for the 
ZooKeeper session timeout before giving up leadership instead of doing it 
right away. That way, Flink could tolerate ZooKeeper hiccups/network issues 
without failing the job it is executing. I would like to create a new JIRA issue 
for this improvement since the underlying problem of this issue is the 
non-released lock on the {{JobGraph}}.






[jira] [Commented] (FLINK-10011) Old job resurrected during HA failover

2018-08-01 Thread Elias Levy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16566032#comment-16566032
 ] 

Elias Levy commented on FLINK-10011:


[~azagrebin] I don't think they are the same issue.  The issue I am observing 
is that the new JM leader after a failover can't delete a job graph in ZK when 
it is canceled because the previous JM leader still has the job graph locked in 
ZK via the child ephemeral node.

This is the state in ZK:

[zk: localhost:2181(CONNECTED) 5] ls /flink/cluster_1/jobgraphs
[d77948df92813a68ea6dfd6783f40e7e, 2a4eff355aef849c5ca37dbac04f2ff1]

Job 2a4eff355aef849c5ca37dbac04f2ff1 was running before fail over and we 
canceled it after fail over.  The job is no longer running, but it is still in ZK.

In the logs we see that JM 1 (10.210.22.167), the one that became leader 
after failover, thinks it deleted the 2a4eff355aef849c5ca37dbac04f2ff1 job from 
ZK when it was canceled:

July 30th 2018, 15:32:27.231    Trying to cancel job with ID 2a4eff355aef849c5ca37dbac04f2ff1.
July 30th 2018, 15:32:27.232    Job Some Job (2a4eff355aef849c5ca37dbac04f2ff1) switched from state RESTARTING to CANCELED.
July 30th 2018, 15:32:27.232    Stopping checkpoint coordinator for job 2a4eff355aef849c5ca37dbac04f2ff1
July 30th 2018, 15:32:27.239    Removed job graph 2a4eff355aef849c5ca37dbac04f2ff1 from ZooKeeper.
July 30th 2018, 15:32:27.245    Removing /flink/cluster_1/checkpoints/2a4eff355aef849c5ca37dbac04f2ff1 from ZooKeeper
July 30th 2018, 15:32:27.251    Removing /checkpoint-counter/2a4eff355aef849c5ca37dbac04f2ff1 from ZooKeeper

Looking at the ZK logs I find the problem:

July 30th 2018, 15:32:27.241    Got user-level KeeperException when processing sessionid:0x201d2330001 type:delete cxid:0x434c zxid:0x60009dd94 txntype:-1 reqpath:n/a Error Path:/flink/cluster_1/jobgraphs/2a4eff355aef849c5ca37dbac04f2ff1 Error:KeeperErrorCode = Directory not empty for /flink/cluster_1/jobgraphs/2a4eff355aef849c5ca37dbac04f2ff1

Looking in ZK, we see:

[zk: localhost:2181(CONNECTED) 0] ls 
/flink/cluster_1/jobgraphs/2a4eff355aef849c5ca37dbac04f2ff1
[d833418c-891a-4b5e-b983-080be803275c]

From the comments in ZooKeeperStateHandleStore.java I gather that this child 
node is used as a deletion lock.  Looking at the contents of this ephemeral 
lock node:

[zk: localhost:2181(CONNECTED) 16] get 
/flink/cluster_1/jobgraphs/2a4eff355aef849c5ca37dbac04f2ff1/d833418c-891a-4b5e-b983-080be803275c
10.210.42.62
cZxid = 0x60002ffa7
ctime = Tue Jun 12 20:01:26 UTC 2018
mZxid = 0x60002ffa7
mtime = Tue Jun 12 20:01:26 UTC 2018
pZxid = 0x60002ffa7
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x3003f4a0003
dataLength = 12
numChildren = 0

and compared to the ephemeral node lock of the currently running job:

[zk: localhost:2181(CONNECTED) 17] get 
/flink/cluster_1/jobgraphs/d77948df92813a68ea6dfd6783f40e7e/596a4add-9f5c-4113-99ec-9c942fe91172
10.210.22.167
cZxid = 0x60009df4b
ctime = Mon Jul 30 23:01:04 UTC 2018
mZxid = 0x60009df4b
mtime = Mon Jul 30 23:01:04 UTC 2018
pZxid = 0x60009df4b
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x201d2330001
dataLength = 13
numChildren = 0

Assuming the content of the nodes represents the owner, it seems the job graph 
for the old canceled job, 2a4eff355aef849c5ca37dbac04f2ff1, is locked by the 
previous JM leader, JM 2 (10.210.42.62), while the running job is locked by the 
current JM leader, JM 1 (10.210.22.167).

Somehow the previous leader, JM 2, did not give up the lock when leadership 
failed over to JM 1.  Note that JM 2 never lost its ZK session, as it 
recovered it when it connected to another ZK node.  So some code in the 
JobManager needed to explicitly release the lock on the job graph during 
failover and failed to do so.

[~till.rohrmann] and [~uce] I think you wrote the ZK HA code.  Any thoughts?
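
At the ZooKeeper level, releasing such a lock simply means deleting the ephemeral child the old leader created under the job graph node. A minimal Curator sketch of that mechanism, reusing the paths from the output above, is shown below; it illustrates the idea only and is not the actual Flink code:

```java
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.KeeperException;

public class ReleaseJobGraphLockSketch {

    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "localhost:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();

        String jobGraphPath = "/flink/cluster_1/jobgraphs/2a4eff355aef849c5ca37dbac04f2ff1";
        String lockNode = jobGraphPath + "/d833418c-891a-4b5e-b983-080be803275c";

        try {
            // Deleting the ephemeral lock child is what makes the parent job graph node
            // removable by the new leader; normally the node's owner does this itself.
            client.delete().forPath(lockNode);
        } catch (KeeperException.NoNodeException ignored) {
            // Lock already gone (released or session expired); nothing to do.
        }

        client.close();
    }
}
```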



[jira] [Commented] (FLINK-10011) Old job resurrected during HA failover

2018-08-01 Thread Andrey Zagrebin (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16565583#comment-16565583
 ] 

Andrey Zagrebin commented on FLINK-10011:
-

This problem might be related to the recently fixed race condition in 
FLINK-9575. That issue concerns the asynchronous deletion of the job graph 
versus the synchronous deletion of the canceled job's blob data: the async 
deletion can fail, leaving the job graph behind.


[jira] [Commented] (FLINK-10011) Old job resurrected during HA failover

2018-07-31 Thread Elias Levy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16564573#comment-16564573
 ] 

Elias Levy commented on FLINK-10011:


[~trohrm...@apache.org] what do you think?


[jira] [Commented] (FLINK-10011) Old job resurrected during HA failover

2018-07-31 Thread Elias Levy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16564569#comment-16564569
 ] 

Elias Levy commented on FLINK-10011:


It appears that it may not be necessary to replace the {{LeaderLatch}} Curator 
recipe to avoid losing leadership during temporary connection failures.

The Curator error handling 
[documentation|https://curator.apache.org/errors.html] talks about a 
{{SessionConnectionStateErrorPolicy}} that treats {{SUSPENDED}} and {{LOST}} 
connection states differently.  And this 
[test|https://github.com/apache/curator/blob/d502dde1c4601b2abc6d831d764561a73316bf00/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java#L72-L146]
 shows how leadership is not lost with a {{LeaderLatch}} and that policy.  
The 
[code|https://github.com/apache/curator/blob/ed3082ecfebc332ba96da7a5bda4508a1985db6e/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java#L625-L631]
 implements the policy.  And [this 
shows|https://github.com/apache/curator/blob/5920c744508afd678a20309313e1b8d78baac0c4/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java#L298-L314]
 that Curator will inject a session expiration even while it is in 
{{SUSPENDED}} state, so that a disconnected client won't continue to think it 
is leader past its session expiration.
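
A minimal sketch of what using that policy could look like is shown below; it assumes Curator 3.x+, where the builder exposes {{connectionStateErrorPolicy()}}, and the connection string and latch path are illustrative:

```java
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.state.SessionConnectionStateErrorPolicy;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class SessionAwareLeaderLatchSketch {

    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("zk-1:2181,zk-2:2181,zk-3:2181")
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                // Only a LOST session (expiry) is treated as an error; a SUSPENDED
                // connection no longer revokes leadership immediately.
                .connectionStateErrorPolicy(new SessionConnectionStateErrorPolicy())
                .build();
        client.start();

        LeaderLatch latch = new LeaderLatch(client, "/leader-latch-demo");
        latch.start();
        latch.await(); // blocks until this participant becomes leader

        // ... leader work here; short ZK hiccups keep leadership as long as the session survives ...

        latch.close();
        client.close();
    }
}
```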
