[jira] [Commented] (FLINK-14091) Job can not trigger checkpoint forever after zookeeper change leader

2020-01-02 Thread Zili Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17007206#comment-17007206
 ] 

Zili Chen commented on FLINK-14091:
---

Here is the [pull request|https://github.com/apache/flink/pull/10754]. You can 
review and comment. Also I'm wondering which version(s) will have this fix.

> Job can not trigger checkpoint forever after zookeeper change leader 
> -
>
> Key: FLINK-14091
> URL: https://issues.apache.org/jira/browse/FLINK-14091
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.9.0
>Reporter: Peng Wang
>Assignee: Zili Chen
>Priority: Critical
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> When the ZooKeeper leader changes, the Curator connection state becomes 
> SUSPENDED and the JobManager cannot trigger checkpoints. However, it still 
> does not trigger checkpoints after ZooKeeper recovers.
> We found that lastState in the class ZooKeeperCheckpointIDCounter never 
> changes back to normal once it falls into SUSPENDED or LOST.
> /**
>  * Connection state listener. In case of {@link ConnectionState#SUSPENDED} or {@link
>  * ConnectionState#LOST} we are not guaranteed to read a current count from ZooKeeper.
>  */
> private static class SharedCountConnectionStateListener implements ConnectionStateListener {
>     private volatile ConnectionState lastState;
>
>     @Override
>     public void stateChanged(CuratorFramework client, ConnectionState newState) {
>         if (newState == ConnectionState.SUSPENDED || newState == ConnectionState.LOST) {
>             lastState = newState;
>         }
>     }
>
>     private ConnectionState getLastState() {
>         return lastState;
>     }
> }
>  
> We changed the listener to reset the state, as shown below. In our tests 
> this solved the problem.
>  
> /**
>  * Connection state listener. In case of {@link ConnectionState#SUSPENDED} or {@link
>  * ConnectionState#LOST} we are not guaranteed to read a current count from ZooKeeper.
>  */
> private static class SharedCountConnectionStateListener implements ConnectionStateListener {
>     private volatile ConnectionState lastState;
>
>     @Override
>     public void stateChanged(CuratorFramework client, ConnectionState newState) {
>         if (newState == ConnectionState.SUSPENDED || newState == ConnectionState.LOST) {
>             lastState = newState;
>         } else {
>             /* If the connection state is neither SUSPENDED nor LOST, reset lastState. */
>             lastState = null;
>         }
>     }
>
>     private ConnectionState getLastState() {
>         return lastState;
>     }
> }
>  
> log:
> 2019-09-16 13:38:38,020 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Unable to read additional data from server sessionid 0x26cff6487c2000e, likely server has closed socket, closing socket connection and attempting reconnect
> 2019-09-16 13:38:38,122 INFO  org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager  - State change: SUSPENDED
> 2019-09-16 13:38:38,123 WARN  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.
> 2019-09-16 13:38:38,126 WARN  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.
> 2019-09-16 13:38:38,126 WARN  org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore  - ZooKeeper connection SUSPENDING. Changes to the submitted job graphs are not monitored (temporarily).
> 2019-09-16 13:38:38,128 WARN  org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService  - Connection to ZooKeeper suspended. The contender akka.tcp://flink@node007224:19115/user/dispatcher no longer participates in the leader election.
> 2019-09-16 13:38:38,128 WARN  org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService  - Connection to ZooKeeper suspended. The contender akka.tcp://flink@node007224:19115/user/resourcemanager no longer participates in the leader election.
> 2019-09-16 13:38:38,128 WARN  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.
> 2019-09-16 13:38:38,128 WARN  org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService  - Connection to ZooKeeper suspended. The contender http://node007224:8081 no longer participates in the leader election.
> 2019-09-16 13:38:38,128 WARN  org.apac

[jira] [Commented] (FLINK-14091) Job can not trigger checkpoint forever after zookeeper change leader

2020-01-02 Thread Zili Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17007197#comment-17007197
 ] 

Zili Chen commented on FLINK-14091:
---

It's a known issue we also faced internally. I have a fix and will push a pull 
request later today.

cc [~trohrmann]

> Job can not trigger checkpoint forever after zookeeper change leader 
> -
>
> Key: FLINK-14091
> URL: https://issues.apache.org/jira/browse/FLINK-14091
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.9.0
>Reporter: Peng Wang
>Assignee: Zili Chen
>Priority: Critical

[jira] [Commented] (FLINK-14091) Job can not trigger checkpoint forever after zookeeper change leader

2019-09-16 Thread Peng Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16931044#comment-16931044
 ] 

Peng Wang commented on FLINK-14091:
---

{{SharedCountConnectionStateListener}} already handles it (as shown below), but 
{{ZooKeeperCheckpointIDCounter}} does not.

@Override
public void stateChanged(CuratorFramework client, ConnectionState newState) {
    if (newState == ConnectionState.SUSPENDED || newState == ConnectionState.LOST) {
        lastState = newState;
    } else {
        /* If the connection state is neither SUSPENDED nor LOST, reset lastState. */
        lastState = null;
    }
}
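
To illustrate why the reset matters, here is a minimal, self-contained sketch 
(not the actual Flink code). It assumes a counter that refuses to hand out a 
checkpoint ID while the listener still reports a bad state. The names 
StickyStateSketch, Listener, GuardedCounter, canCheckpointAfterReconnect and 
the local ConnectionState enum are hypothetical stand-ins, so the example runs 
without Curator on the classpath.

```java
import java.util.concurrent.atomic.AtomicLong;

public class StickyStateSketch {

    /** Hypothetical stand-in for Curator's ConnectionState enum. */
    enum ConnectionState { CONNECTED, SUSPENDED, RECONNECTED, LOST, READ_ONLY }

    /** Simplified listener; resetOnRecovery toggles the proposed else branch. */
    static class Listener {
        private final boolean resetOnRecovery;
        private volatile ConnectionState lastState;

        Listener(boolean resetOnRecovery) {
            this.resetOnRecovery = resetOnRecovery;
        }

        void stateChanged(ConnectionState newState) {
            if (newState == ConnectionState.SUSPENDED || newState == ConnectionState.LOST) {
                lastState = newState;
            } else if (resetOnRecovery) {
                // Proposed change: forget the bad state once the connection recovers.
                lastState = null;
            }
        }

        ConnectionState getLastState() {
            return lastState;
        }
    }

    /** Hypothetical counter that rejects requests while a bad state is recorded. */
    static class GuardedCounter {
        private final AtomicLong next = new AtomicLong(1);
        private final Listener listener;

        GuardedCounter(Listener listener) {
            this.listener = listener;
        }

        long getAndIncrement() {
            ConnectionState state = listener.getLastState();
            if (state != null) {
                throw new IllegalStateException("Connection state: " + state);
            }
            return next.getAndIncrement();
        }
    }

    /** Replays SUSPENDED then RECONNECTED and reports whether an ID can be drawn. */
    static boolean canCheckpointAfterReconnect(boolean resetOnRecovery) {
        Listener listener = new Listener(resetOnRecovery);
        GuardedCounter counter = new GuardedCounter(listener);
        listener.stateChanged(ConnectionState.SUSPENDED);
        listener.stateChanged(ConnectionState.RECONNECTED);
        try {
            counter.getAndIncrement();
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("without reset: " + canCheckpointAfterReconnect(false));
        System.out.println("with reset:    " + canCheckpointAfterReconnect(true));
    }
}
```

Without the reset, the recorded SUSPENDED state is never cleared, so every 
later request fails even though the connection is back.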

> Job can not trigger checkpoint forever after zookeeper change leader 
> -
>
> Key: FLINK-14091
> URL: https://issues.apache.org/jira/browse/FLINK-14091
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.9.0
>Reporter: Peng Wang
>Priority: Minor

[jira] [Commented] (FLINK-14091) Job can not trigger checkpoint forever after zookeeper change leader

2019-09-16 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16931030#comment-16931030
 ] 

Yun Tang commented on FLINK-14091:
--

From the error description, I think {{SharedCountConnectionStateListener}} 
should also handle the connection state after {{RECONNECTED}}. CC [~uce]
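
One way to read this suggestion is a listener that clears the recorded state 
only on an explicit recovery event. The sketch below is an assumption about 
what that could look like, not the change that was merged. 
ReconnectAwareListenerSketch, ReconnectAwareListener and the local 
ConnectionState enum are hypothetical names that only mirror Curator's states, 
and leaving READ_ONLY untouched is a design choice made for illustration.

```java
public class ReconnectAwareListenerSketch {

    /** Hypothetical stand-in for Curator's ConnectionState enum. */
    enum ConnectionState { CONNECTED, SUSPENDED, RECONNECTED, LOST, READ_ONLY }

    /** Records SUSPENDED/LOST and clears the record on CONNECTED/RECONNECTED. */
    static class ReconnectAwareListener {
        private volatile ConnectionState lastBadState;

        void stateChanged(ConnectionState newState) {
            switch (newState) {
                case SUSPENDED:
                case LOST:
                    // Reads from ZooKeeper are not guaranteed to be current.
                    lastBadState = newState;
                    break;
                case CONNECTED:
                case RECONNECTED:
                    // The connection is usable again, so drop the bad state.
                    lastBadState = null;
                    break;
                default:
                    // READ_ONLY: keep whatever was recorded (assumption).
                    break;
            }
        }

        ConnectionState getLastBadState() {
            return lastBadState;
        }
    }

    public static void main(String[] args) {
        ReconnectAwareListener listener = new ReconnectAwareListener();
        listener.stateChanged(ConnectionState.SUSPENDED);
        System.out.println("after SUSPENDED:   " + listener.getLastBadState());
        listener.stateChanged(ConnectionState.RECONNECTED);
        System.out.println("after RECONNECTED: " + listener.getLastBadState());
    }
}
```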

> Job can not trigger checkpoint forever after zookeeper change leader 
> -
>
> Key: FLINK-14091
> URL: https://issues.apache.org/jira/browse/FLINK-14091
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.9.0
>Reporter: Peng Wang
>Priority: Minor

[jira] [Commented] (FLINK-14091) Job can not trigger checkpoint forever after zookeeper change leader

2019-09-16 Thread Yang Shen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16931022#comment-16931022
 ] 

Yang Shen commented on FLINK-14091:
---

Same problem!

> Job can not trigger checkpoint forever after zookeeper change leader 
> -
>
> Key: FLINK-14091
> URL: https://issues.apache.org/jira/browse/FLINK-14091
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.9.0
>Reporter: Peng Wang
>Priority: Minor