Repository: ignite
Updated Branches:
  refs/heads/ignite-zk beada20fa -> 01cd78940


zk


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/01cd7894
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/01cd7894
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/01cd7894

Branch: refs/heads/ignite-zk
Commit: 01cd78940f0b8546b360d58ab0dc5a623f3b6fa1
Parents: beada20
Author: sboikov <sboi...@gridgain.com>
Authored: Mon Nov 27 17:12:07 2017 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Mon Nov 27 17:12:07 2017 +0300

----------------------------------------------------------------------
 .../communication/tcp/TcpCommunicationSpi.java  |  4 +
 .../zk/internal/ZookeeperDiscoveryImpl.java     | 89 +++++++++++++++-----
 2 files changed, 70 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/01cd7894/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 41600d5a..662a2b9 100755
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -3101,11 +3101,15 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
             int lastWaitingTimeout = 1;
 
             while (client == null) { // Reconnection on handshake timeout.
+                if (stopping)
+                    throw new IgniteSpiException("Node is stopping.");
+
                 if (addr.getAddress().isLoopbackAddress() && addr.getPort() == boundTcpPort) {
                     if (log.isDebugEnabled())
                         log.debug("Skipping local address [addr=" + addr +
                             ", locAddrs=" + node.attribute(createSpiAttributeName(ATTR_ADDRS)) +
                             ", node=" + node + ']');
+
                     continue;
                 }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/01cd7894/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index f742ad0..76eb306 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -212,6 +212,8 @@ public class ZookeeperDiscoveryImpl {
      * @return Ping result.
      */
     public boolean pingNode(UUID nodeId) {
+        checkState();
+
         // TODO ZK
         return node(nodeId) != null;
     }
@@ -1378,7 +1380,7 @@ public class ZookeeperDiscoveryImpl {
     private void stop0(Throwable e) throws InterruptedException {
         log.info("Stop ZookeeperDiscovery [nodeId=" + locNode.id() + ", err=" + e + ']');
 
-        connState = ConnectionState.DISCONNECTED;
+        connState = ConnectionState.STOPPED;
 
         ZookeeperClient zkClient = state.zkClient;
 
@@ -1454,6 +1456,9 @@ public class ZookeeperDiscoveryImpl {
      */
     private class ZkEventWorker extends IgniteSpiThread {
         /** */
+        private final Runnable RECONNECT = new Runnable() {@Override public void run() {}};
+
+        /** */
         private final Runnable CONNECTION_LOST = new Runnable() {@Override public void run() {}};
 
         /** */
@@ -1475,6 +1480,8 @@ public class ZookeeperDiscoveryImpl {
             while (!isInterrupted()) {
                 Runnable r = evtsQ.take();
 
+                if (r == RECONNECT)
+                    processReconnect();
                 if (r == CONNECTION_LOST)
                     processConnectionLost();
                 else {
@@ -1501,6 +1508,35 @@ public class ZookeeperDiscoveryImpl {
         /**
          *
          */
+        void processReconnect() {
+            assert locNode.isClient() : locNode;
+
+            if (connState == ConnectionState.DISCONNECTED)
+                return;
+
+            connState = ConnectionState.DISCONNECTED;
+
+            state.zkClient.onCloseStart();
+
+            busyLock.block();
+
+            busyLock.unblock();
+
+            state.zkClient.close();
+
+            UUID newId = UUID.randomUUID();
+
+            U.quietAndWarn(log, "Local node will try to reconnect to cluster with new id due to network problems [" +
+                "newId=" + newId +
+                ", prevId=" + locNode.id() +
+                ", locNode=" + locNode + ']');
+
+            reconnect(newId);
+        }
+
+        /**
+         *
+         */
         void processConnectionLost() {
             if (clientReconnectEnabled) {
                 connState = ConnectionState.DISCONNECTED;
@@ -1516,41 +1552,48 @@ public class ZookeeperDiscoveryImpl {
                     ", prevId=" + locNode.id() +
                     ", locNode=" + locNode + ']');
 
-                locNode.onClientDisconnected(newId);
+                reconnect(newId);
+            }
+            else {
+                U.warn(log, "Connection to Zookeeper server is lost, local node SEGMENTED.");
 
-                if (state.joined) {
-                    assert state.evtsData != null;
+                onSegmented(new IgniteSpiException("Zookeeper connection loss."));
+            }
+        }
 
-                    lsnr.onDiscovery(EVT_CLIENT_NODE_DISCONNECTED,
-                        state.evtsData.topVer,
-                        locNode,
-                        state.top.topologySnapshot(),
-                        Collections.<Long, Collection<ClusterNode>>emptyMap(),
-                        null);
-                }
+        /**
+         * @param newId New ID.
+         */
+        private void reconnect(UUID newId) {
+            locNode.onClientDisconnected(newId);
 
-                state = new ZkRuntimeState(state.joined);
+            if (state.joined) {
+                assert state.evtsData != null;
 
-                try {
-                    joinTopology0(true);
-                }
-                catch (Exception e) {
-                    U.error(log, "Failed to reconnect: " + e, e);
+                lsnr.onDiscovery(EVT_CLIENT_NODE_DISCONNECTED,
+                    state.evtsData.topVer,
+                    locNode,
+                    state.top.topologySnapshot(),
+                    Collections.<Long, Collection<ClusterNode>>emptyMap(),
+                    null);
+            }
 
-                    onSegemented(e);
-                }
+            state = new ZkRuntimeState(state.joined);
+
+            try {
+                joinTopology0(true);
             }
-            else {
-                U.warn(log, "Connection to Zookeeper server is lost, local node SEGMENTED.");
+            catch (Exception e) {
+                U.error(log, "Failed to reconnect: " + e, e);
 
-                onSegemented(new IgniteSpiException("Zookeeper connection loss."));
+                onSegmented(e);
             }
         }
 
         /**
          * @param e Error.
          */
-        private void onSegemented(Exception e) {
+        private void onSegmented(Exception e) {
             if (state.joined) {
                 assert state.evtsData != null;
 

Reply via email to