Repository: ignite
Updated Branches:
  refs/heads/ignite-zk 930a51799 -> 879f99930


zk


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/879f9993
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/879f9993
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/879f9993

Branch: refs/heads/ignite-zk
Commit: 879f999308be3549250a3032f6a5d007d5415464
Parents: 930a517
Author: sboikov <sboi...@gridgain.com>
Authored: Mon Dec 25 15:04:27 2017 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Mon Dec 25 15:53:46 2017 +0300

----------------------------------------------------------------------
 .../discovery/zk/internal/ZkRuntimeState.java   |   8 ++
 .../zk/internal/ZookeeperDiscoveryImpl.java     |  77 ++++++------
 .../ZookeeperDiscoverySpiBasicTest.java         | 119 ++++++++++++++-----
 .../testframework/junits/GridAbstractTest.java  |  23 +++-
 4 files changed, 154 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/879f9993/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java
index 02e75ff..e61e2b2 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java
@@ -98,6 +98,14 @@ class ZkRuntimeState {
     }
 
     /**
+     * @param internalOrder Node internal order.
+     * @return {@code True} if node belongs to previous cluster and should be ignored.
+     */
+    boolean ignoreAliveNode(long internalOrder) {
+        return evtsData != null && internalOrder < evtsData.startInternalOrder;
+    }
+
+    /**
      * @param err Error.
      */
     void onCloseStart(Exception err) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/879f9993/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index 1d3ad01..d7e0a76 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -308,7 +308,7 @@ public class ZookeeperDiscoveryImpl {
 
                     if (log.isInfoEnabled()) {
                         log.info("Created new communication error process 
future [errNode=" + node0.id() +
-                            ", err= " + err + ']');
+                            ", err=" + err + ']');
                     }
 
                     try {
@@ -1225,6 +1225,9 @@ public class ZookeeperDiscoveryImpl {
 
             Long internalId = ZkIgnitePaths.aliveInternalId(aliveNodePath);
 
+            if (rtState.ignoreAliveNode(internalId))
+                continue;
+
             aliveSrvs.put(internalId, aliveNodePath);
         }
 
@@ -1258,7 +1261,7 @@ public class ZookeeperDiscoveryImpl {
      * @throws Exception If failed.
      */
     private void checkClientsStatus(final List<String> aliveNodes) throws 
Exception {
-        assert locNode.isClient();
+        assert locNode.isClient() : locNode;
         assert rtState.joined;
         assert rtState.evtsData != null;
 
@@ -1272,6 +1275,9 @@ public class ZookeeperDiscoveryImpl {
         for (String aliveNodePath : aliveNodes) {
             Long internalId = ZkIgnitePaths.aliveInternalId(aliveNodePath);
 
+            if (rtState.ignoreAliveNode(internalId))
+                continue;
+
             if (ZkIgnitePaths.aliveNodeClientFlag(aliveNodePath))
                 aliveClients.put(internalId, aliveNodePath);
             else {
@@ -1294,23 +1300,19 @@ public class ZookeeperDiscoveryImpl {
 
                 ZkDiscoveryEventsData prevEvts = rtState.evtsData;
 
-                ZkDiscoveryEventsData newEvts;
-
                 byte[] evtsBytes = rtState.zkClient.getData(zkPaths.evtsPath, 
stat);
 
-                if (evtsBytes.length == 0) {
-                    // Possible if new cluster already started and removed old 
events,
-                    // still can try generate {@link ZkNoServersMessage}.
-                    newEvts = rtState.evtsData;
-                }
-                else
-                    newEvts = unmarshalZip(evtsBytes);
+                assert evtsBytes.length > 0;
+
+                ZkDiscoveryEventsData newEvts = unmarshalZip(evtsBytes);
 
                 if (prevEvts.clusterId.equals(newEvts.clusterId)) {
-                    U.warn(log, "All server nodes failed, notify all 
clients.");
+                    U.warn(log, "All server nodes failed, notify all clients 
[locId=" + locNode.id() + ']');
 
                     generateNoServersEvent(newEvts, stat);
                 }
+                else
+                    U.warn(log, "All server nodes failed (received events from 
new cluster).");
             }
         }
         else {
@@ -1445,13 +1447,15 @@ public class ZookeeperDiscoveryImpl {
             handleProcessedEvents("crd");
         }
         else {
-            if (log.isInfoEnabled())
-                log.info("Node is first server node in cluster [locId=" + 
locNode.id() + ']');
-
             DiscoverySpiNodeAuthenticator nodeAuth = spi.getAuthenticator();
 
             if (nodeAuth != null) {
                 try {
+                    if (log.isInfoEnabled()) {
+                        log.info("Node is first server node in cluster, try 
authenticate local node " +
+                            "[locId=" + locNode.id() + ']');
+                    }
+
                     localAuthentication(nodeAuth, 
unmarshalCredentials(locNode));
                 }
                 catch (Exception e) {
@@ -1501,7 +1505,7 @@ public class ZookeeperDiscoveryImpl {
         for (String child : aliveNodes) {
             Long internalId = ZkIgnitePaths.aliveInternalId(child);
 
-            if (internalId < rtState.evtsData.startInternalOrder) {
+            if (rtState.ignoreAliveNode(internalId)) {
                 if (log.isInfoEnabled()) {
                     LT.info(log, "Ignore node from previous cluster 
[startOrder=" + rtState.evtsData.startInternalOrder +
                         ", nodeOrder=" + internalId +
@@ -2037,6 +2041,8 @@ public class ZookeeperDiscoveryImpl {
      */
     @SuppressWarnings("unchecked")
     private void newClusterStarted(@Nullable ZkDiscoveryEventsData prevEvts) 
throws Exception {
+        assert !locNode.isClient() : locNode;
+
         long locInternalId = rtState.internalOrder;
 
         assert prevEvts == null || prevEvts.maxInternalOrder < locInternalId;
@@ -2052,6 +2058,12 @@ public class ZookeeperDiscoveryImpl {
             prevEvts != null ? prevEvts.maxInternalOrder + 1 : -1L,
             rtState.gridStartTime);
 
+        if (log.isInfoEnabled()) {
+            log.info("New cluster started [locId=" + locNode.id() +
+                ", clusterId=" + rtState.evtsData.clusterId +
+                ", startTime=" + rtState.evtsData.gridStartTime + ']');
+        }
+
         locNode.internalId(locInternalId);
         locNode.order(1);
 
@@ -2074,16 +2086,7 @@ public class ZookeeperDiscoveryImpl {
             Collections.<Long, Collection<ClusterNode>>emptyMap(),
             null);
 
-        if (rtState.prevJoined) {
-            lsnr.onDiscovery(EVT_CLIENT_NODE_RECONNECTED,
-                1L,
-                locNode,
-                topSnapshot,
-                Collections.<Long, Collection<ClusterNode>>emptyMap(),
-                null);
-
-            U.quietAndWarn(log, "Client node was reconnected after it was 
already considered failed.");
-        }
+        rtState.zkClient.setData(zkPaths.evtsPath, 
marshalZip(rtState.evtsData), -1);
 
         joinFut.onDone();
     }
@@ -2097,8 +2100,6 @@ public class ZookeeperDiscoveryImpl {
         ZookeeperClient client = rtState.zkClient;
 
         // TODO ZK: use multi, better batching + max-size safe + 
NoNodeException safe.
-        client.setData(zkPaths.evtsPath, null, -1);
-
         List<String> evtChildren = 
rtState.zkClient.getChildren(zkPaths.evtsPath);
 
         for (String evtPath : evtChildren) {
@@ -2332,13 +2333,20 @@ public class ZookeeperDiscoveryImpl {
      * @return Events.
      */
     @Nullable private ZkDiscoveryEventsData processNewEvents(byte[] data) 
throws Exception {
-        if (data.length == 0)
+        ZkDiscoveryEventsData newEvts = data.length > 0 ? 
(ZkDiscoveryEventsData)unmarshalZip(data) : null;
+
+        if (rtState.joined && (newEvts == null || 
!rtState.evtsData.clusterId.equals(newEvts.clusterId))) {
+            assert locNode.isClient() : locNode;
+
+            throw localNodeFail("All server nodes failed, client node 
disconnected (received events from new custer) " +
+                "[locId=" + locNode.id() + ']', true);
+        }
+
+        if (newEvts == null)
             return null;
 
         assert !rtState.crd;
 
-        ZkDiscoveryEventsData newEvts = unmarshalZip(data);
-
         // Need keep processed custom events since they contain message object 
which is needed to create ack.
         if (rtState.evtsData != null) {
             for (Map.Entry<Long, ZkDiscoveryEventData> e : 
rtState.evtsData.evts.entrySet()) {
@@ -2368,13 +2376,6 @@ public class ZookeeperDiscoveryImpl {
      */
     @SuppressWarnings("unchecked")
     private void processNewEvents(final ZkDiscoveryEventsData evtsData) throws 
Exception {
-        if (rtState.joined && rtState.evtsData != null && 
!rtState.evtsData.clusterId.equals(evtsData.clusterId)) {
-            assert locNode.isClient() : locNode;
-
-            throw localNodeFail("All server nodes failed, client node 
disconnected (received events from new custer) " +
-                "[locId=" + locNode.id() + ']', true);
-        }
-
         TreeMap<Long, ZkDiscoveryEventData> evts = evtsData.evts;
 
         ZookeeperClient zkClient = rtState.zkClient;

http://git-wip-us.apache.org/repos/asf/ignite/blob/879f9993/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
index 2dd690d..293eb87 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
@@ -279,9 +279,9 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
                     }
                 }
                 catch (Throwable e) {
-                    err = true;
-
                     error("Unexpected error [evt=" + evt + ", err=" + e + ']', 
e);
+
+                    err = true;
                 }
 
                 return true;
@@ -2625,57 +2625,112 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
 
         startGridsMultiThreaded(srvs, clients);
 
-        final CountDownLatch disconnectLatch = new CountDownLatch(clients);
-        final CountDownLatch reconnectLatch = new CountDownLatch(clients);
+        for (int i = 0; i < 5; i++) {
+            info("Iteration: " + i);
 
-        IgnitePredicate<Event> p = new IgnitePredicate<Event>() {
-            @Override public boolean apply(Event evt) {
-                if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
-                    log.info("Disconnected: " + evt);
+            final CountDownLatch disconnectLatch = new CountDownLatch(clients);
+            final CountDownLatch reconnectLatch = new CountDownLatch(clients);
 
-                    disconnectLatch.countDown();
-                }
-                else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
-                    log.info("Reconnected: " + evt);
+            IgnitePredicate<Event> p = new IgnitePredicate<Event>() {
+                @Override public boolean apply(Event evt) {
+                    if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
+                        log.info("Disconnected: " + evt);
 
-                    reconnectLatch.countDown();
+                        disconnectLatch.countDown();
+                    }
+                    else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+                        log.info("Reconnected: " + evt);
+
+                        reconnectLatch.countDown();
+
+                        return false;
+                    }
+
+                    return true;
                 }
+            };
 
-                return true;
+            for (int c = 0; c < clients; c++) {
+                Ignite client = ignite(srvs + c);
+
+                assertTrue(client.configuration().isClientMode());
+
+                client.events().localListen(p, EVT_CLIENT_NODE_DISCONNECTED, 
EVT_CLIENT_NODE_RECONNECTED);
             }
-        };
 
-        for (int i = 0; i < clients; i++) {
-            Ignite client = ignite(srvs + i);
+            log.info("Stop all servers.");
 
-            assertTrue(client.configuration().isClientMode());
+            GridTestUtils.runMultiThreaded(new IgniteInClosure<Integer>() {
+                @Override public void apply(Integer threadIdx) {
+                    stopGrid(getTestIgniteInstanceName(threadIdx), true, 
false);
+                }
+            }, srvs, "stop-server");
 
-            client.events().localListen(p, EVT_CLIENT_NODE_DISCONNECTED, 
EVT_CLIENT_NODE_RECONNECTED);
+            waitReconnectEvent(log, disconnectLatch);
+
+            evts.clear();
+
+            client = false;
+
+            log.info("Restart servers.");
+
+            startGridsMultiThreaded(0, srvs);
+
+            waitReconnectEvent(log, reconnectLatch);
+
+            waitForTopology(srvs + clients);
+
+            log.info("Reconnect finished.");
         }
+    }
 
-        log.info("Stop all servers.");
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReconnectServersRestart() throws Exception {
+        startGrid(0);
 
-        GridTestUtils.runMultiThreaded(new IgniteInClosure<Integer>() {
-            @Override public void apply(Integer threadIdx) {
-                stopGrid(getTestIgniteInstanceName(threadIdx), true, false);
-            }
-        }, srvs, "stop-server");
+        client = true;
 
-        waitReconnectEvent(log, disconnectLatch);
+        final int CLIENTS = 10;
 
-        evts.clear();
+        startGridsMultiThreaded(1, CLIENTS);
 
         client = false;
 
-        log.info("Restart servers.");
+        long stopTime = System.currentTimeMillis() + 30_000;
 
-        startGridsMultiThreaded(0, srvs);
+        ThreadLocalRandom rnd = ThreadLocalRandom.current();
 
-        waitReconnectEvent(log, reconnectLatch);
+        final int NODES = 1 + CLIENTS;
 
-        waitForTopology(srvs + clients);
+        int iter = 0;
 
-        log.info("Reconnect finished.");
+        while (System.currentTimeMillis() < stopTime) {
+            int restarts = rnd.nextInt(10) + 1;
+
+            info("Test iteration [iter=" + iter++ + ", restarts=" + restarts + 
']');
+
+            for (int i = 0; i < restarts; i++) {
+                stopGrid(getTestIgniteInstanceName(0), true, false);
+
+                startGrid(0);
+            }
+
+            final Ignite srv = ignite(0);
+
+            assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                @Override public boolean apply() {
+                    return srv.cluster().nodes().size() == NODES;
+                }
+            }, 30_000));
+
+            waitForTopology(NODES);
+
+            awaitPartitionMapExchange();
+        }
+
+        evts.clear();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/879f9993/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
index 1084ce1..3f0b3e3 100755
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
@@ -46,6 +46,7 @@ import junit.framework.TestCase;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteClientDisconnectedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteSystemProperties;
@@ -82,6 +83,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteCallable;
 import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.logger.NullLogger;
 import org.apache.ignite.marshaller.Marshaller;
 import org.apache.ignite.marshaller.MarshallerContextTestImpl;
@@ -2179,10 +2181,25 @@ public abstract class GridAbstractTest extends TestCase {
                 }
 
                 for (Ignite node: nodes) {
-                    int sizeOnNode = node.cluster().nodes().size();
+                    try {
+                        IgniteFuture<?> reconnectFut = 
node.cluster().clientReconnectFuture();
 
-                    if (sizeOnNode != expSize) {
-                        info("Wait for size on node [node=" + node.name() + ", 
size=" + sizeOnNode + ", exp=" + expSize + ']');
+                        if (reconnectFut != null && !reconnectFut.isDone()) {
+                            info("Wait for size on node, reconnect is in 
progress [node=" + node.name() + ']');
+
+                            return false;
+                        }
+
+                        int sizeOnNode = node.cluster().nodes().size();
+
+                        if (sizeOnNode != expSize) {
+                            info("Wait for size on node [node=" + node.name() 
+ ", size=" + sizeOnNode + ", exp=" + expSize + ']');
+
+                            return false;
+                        }
+                    }
+                    catch (IgniteClientDisconnectedException e) {
+                        info("Wait for size on node, node disconnected [node=" 
+ node.name() + ']');
 
                         return false;
                     }

Reply via email to