IGNITE-8998 Send reconnect exception without partitions - Fixes #4358.

Signed-off-by: Alexey Goncharuk <alexey.goncha...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/78e0bb7e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/78e0bb7e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/78e0bb7e

Branch: refs/heads/ignite-8446
Commit: 78e0bb7efbc53e969c4c4918b6c6272c7b98dc36
Parents: 1cd5845
Author: Anton Kalashnikov <kaa....@yandex.ru>
Authored: Mon Jul 23 16:48:18 2018 +0300
Committer: Alexey Goncharuk <alexey.goncha...@gmail.com>
Committed: Mon Jul 23 17:57:43 2018 +0300

----------------------------------------------------------------------
 .../GridCachePartitionExchangeManager.java      | 56 +++++++++++++-------
 .../GridDhtPartitionsExchangeFuture.java        | 18 ++++---
 .../Authentication1kUsersNodeRestartTest.java   |  1 +
 .../AuthenticationConfigurationClusterTest.java |  1 +
 .../AuthenticationOnNotActiveClusterTest.java   |  1 +
 .../AuthenticationProcessorNPEOnStartTest.java  |  1 +
 .../AuthenticationProcessorNodeRestartTest.java |  1 +
 .../AuthenticationProcessorSelfTest.java        |  1 +
 .../IgniteCacheClientReconnectTest.java         |  2 +
 9 files changed, 55 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/78e0bb7e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 1f28da2..05eeee3 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -30,6 +30,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
+import java.util.Objects;
 import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
@@ -179,10 +180,6 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
         new ConcurrentSkipListMap<>();
 
     /** */
-    private final AtomicReference<AffinityTopologyVersion> readyTopVer =
-        new AtomicReference<>(AffinityTopologyVersion.NONE);
-
-    /** */
     private GridFutureAdapter<?> reconnectExchangeFut;
 
     /** */
@@ -759,7 +756,7 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
         // Do not allow any activity in exchange manager after stop.
         busyLock.writeLock().lock();
 
-        exchFuts = null;
+        exchFuts.clear();
     }
 
     /**
@@ -826,7 +823,7 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
      * @return Topology version of latest completed partition exchange.
      */
     public AffinityTopologyVersion readyAffinityVersion() {
-        return readyTopVer.get();
+        return exchFuts.readyTopVer();
     }
 
     /**
@@ -876,7 +873,7 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
             return lastInitializedFut0;
         }
 
-        AffinityTopologyVersion topVer = readyTopVer.get();
+        AffinityTopologyVersion topVer = exchFuts.readyTopVer();
 
         if (topVer.compareTo(ver) >= 0) {
             if (log.isDebugEnabled())
@@ -891,7 +888,7 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
         if (log.isDebugEnabled())
             log.debug("Created topology ready future [ver=" + ver + ", fut=" + 
fut + ']');
 
-        topVer = readyTopVer.get();
+        topVer = exchFuts.readyTopVer();
 
         if (topVer.compareTo(ver) >= 0) {
             if (log.isDebugEnabled())
@@ -1400,15 +1397,7 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
             log.debug("Exchange done [topVer=" + topVer + ", err=" + err + 
']');
 
         if (err == null) {
-            while (true) {
-                AffinityTopologyVersion readyVer = readyTopVer.get();
-
-                if (readyVer.compareTo(topVer) >= 0)
-                    break;
-
-                if (readyTopVer.compareAndSet(readyVer, topVer))
-                    break;
-            }
+            exchFuts.readyTopVer(topVer);
 
             for (Map.Entry<AffinityTopologyVersion, AffinityReadyFuture> entry 
: readyFuts.entrySet()) {
                 if (entry.getKey().compareTo(topVer) <= 0) {
@@ -1576,7 +1565,7 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
                     AffinityTopologyVersion initVer = exchFut.initialVersion();
                     AffinityTopologyVersion readyVer = readyAffinityVersion();
 
-                    if (initVer.compareTo(readyVer) <= 0 && 
!exchFut.exchangeDone()) {
+                    if (initVer.compareTo(readyVer) < 0 && !exchFut.isDone()) {
                         U.warn(log, "Client node tries to connect but its 
exchange " +
                             "info is cleaned up from exchange history. " +
                             "Consider increasing 
'IGNITE_EXCHANGE_HISTORY_SIZE' property " +
@@ -1637,7 +1626,7 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
     public void dumpDebugInfo(@Nullable GridDhtPartitionsExchangeFuture 
exchFut) throws Exception {
         AffinityTopologyVersion exchTopVer = exchFut != null ? 
exchFut.initialVersion() : null;
 
-        U.warn(diagnosticLog, "Ready affinity version: " + readyTopVer.get());
+        U.warn(diagnosticLog, "Ready affinity version: " + 
exchFuts.readyTopVer());
 
         U.warn(diagnosticLog, "Last exchange future: " + lastInitializedFut);
 
@@ -2781,6 +2770,10 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
         /** */
         private final int histSize;
 
+        /** */
+        private final AtomicReference<AffinityTopologyVersion> readyTopVer =
+            new AtomicReference<>(AffinityTopologyVersion.NONE);
+
         /**
          * Creates ordered, not strict list set.
          *
@@ -2817,7 +2810,7 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
             while (size() > histSize) {
                 GridDhtPartitionsExchangeFuture last = last();
 
-                if (!last.isDone())
+                if (!last.isDone() || Objects.equals(last.initialVersion(), 
readyTopVer()))
                     break;
 
                 removeLast();
@@ -2827,6 +2820,29 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
             return cur == null ? fut : cur;
         }
 
+        /**
+         * @return Ready top version.
+         */
+        public AffinityTopologyVersion readyTopVer() {
+            return readyTopVer.get();
+        }
+
+        /**
+         * @param readyTopVersion Ready top version.
+         * @return {@code true} if version was set and {@code false} otherwise.
+         */
+        public boolean readyTopVer(AffinityTopologyVersion readyTopVersion) {
+            while (true) {
+                AffinityTopologyVersion readyVer = readyTopVer.get();
+
+                if (readyVer.compareTo(readyTopVersion) >= 0)
+                    return false;
+
+                if (readyTopVer.compareAndSet(readyVer, readyTopVersion))
+                    return true;
+            }
+        }
+
         /** {@inheritDoc} */
         @Nullable @Override public synchronized 
GridDhtPartitionsExchangeFuture removex(
             GridDhtPartitionsExchangeFuture val) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/78e0bb7e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 75cd491..4d0b583 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -2116,21 +2116,25 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
      * @param msg Single message received from the client which didn't find 
original ExchangeFuture.
      */
     public void forceClientReconnect(ClusterNode node, 
GridDhtPartitionsSingleMessage msg) {
-        Exception e = new IgniteNeedReconnectException(node, null);
+        Exception reconnectException = new IgniteNeedReconnectException(node, 
null);
 
-        exchangeGlobalExceptions.put(node.id(), e);
+        exchangeGlobalExceptions.put(node.id(), reconnectException);
 
-        onDone(null, e);
+        onDone(null, reconnectException);
 
         GridDhtPartitionsFullMessage fullMsg = createPartitionsMessage(true, 
false);
 
         fullMsg.setErrorsMap(exchangeGlobalExceptions);
 
-        FinishState finishState0 = new FinishState(cctx.localNodeId(),
-            initialVersion(),
-            fullMsg);
+        try {
+            cctx.io().send(node, fullMsg, SYSTEM_POOL);
 
-        sendAllPartitionsToNode(finishState0, msg, node.id());
+            if (log.isDebugEnabled())
+                log.debug("Full message for reconnect client was sent to node: 
" + node + ", fullMsg: " + fullMsg);
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to send reconnect client message [node=" + 
node + ']', e);
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/78e0bb7e/modules/core/src/test/java/org/apache/ignite/internal/processors/authentication/Authentication1kUsersNodeRestartTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/authentication/Authentication1kUsersNodeRestartTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/authentication/Authentication1kUsersNodeRestartTest.java
index 384cbe8..0ca621e 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/authentication/Authentication1kUsersNodeRestartTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/authentication/Authentication1kUsersNodeRestartTest.java
@@ -49,6 +49,7 @@ public class Authentication1kUsersNodeRestartTest extends 
GridCommonAbstractTest
 
         cfg.setDataStorageConfiguration(new DataStorageConfiguration()
             .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
+                .setMaxSize(200L * 1024 * 1024)
                 .setPersistenceEnabled(true)));
 
         return cfg;

http://git-wip-us.apache.org/repos/asf/ignite/blob/78e0bb7e/modules/core/src/test/java/org/apache/ignite/internal/processors/authentication/AuthenticationConfigurationClusterTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/authentication/AuthenticationConfigurationClusterTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/authentication/AuthenticationConfigurationClusterTest.java
index 7d946dc..fb61d60 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/authentication/AuthenticationConfigurationClusterTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/authentication/AuthenticationConfigurationClusterTest.java
@@ -59,6 +59,7 @@ public class AuthenticationConfigurationClusterTest extends 
GridCommonAbstractTe
 
         cfg.setDataStorageConfiguration(new DataStorageConfiguration()
             .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
+                .setMaxSize(200L * 1024 * 1024)
                 .setPersistenceEnabled(true)));
 
         return cfg;

http://git-wip-us.apache.org/repos/asf/ignite/blob/78e0bb7e/modules/core/src/test/java/org/apache/ignite/internal/processors/authentication/AuthenticationOnNotActiveClusterTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/authentication/AuthenticationOnNotActiveClusterTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/authentication/AuthenticationOnNotActiveClusterTest.java
index 7405d7a..638c378 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/authentication/AuthenticationOnNotActiveClusterTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/authentication/AuthenticationOnNotActiveClusterTest.java
@@ -56,6 +56,7 @@ public class AuthenticationOnNotActiveClusterTest extends 
GridCommonAbstractTest
 
         cfg.setDataStorageConfiguration(new DataStorageConfiguration()
             .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
+                .setMaxSize(200L * 1024 * 1024)
                 .setPersistenceEnabled(true)));
 
         return cfg;

http://git-wip-us.apache.org/repos/asf/ignite/blob/78e0bb7e/modules/core/src/test/java/org/apache/ignite/internal/processors/authentication/AuthenticationProcessorNPEOnStartTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/authentication/AuthenticationProcessorNPEOnStartTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/authentication/AuthenticationProcessorNPEOnStartTest.java
index 9748db7..661c875 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/authentication/AuthenticationProcessorNPEOnStartTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/authentication/AuthenticationProcessorNPEOnStartTest.java
@@ -49,6 +49,7 @@ public class AuthenticationProcessorNPEOnStartTest extends 
GridCommonAbstractTes
 
         cfg.setDataStorageConfiguration(new DataStorageConfiguration()
             .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
+                .setMaxSize(200L * 1024 * 1024)
                 .setPersistenceEnabled(true)));
 
         return cfg;

http://git-wip-us.apache.org/repos/asf/ignite/blob/78e0bb7e/modules/core/src/test/java/org/apache/ignite/internal/processors/authentication/AuthenticationProcessorNodeRestartTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/authentication/AuthenticationProcessorNodeRestartTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/authentication/AuthenticationProcessorNodeRestartTest.java
index 632a8b9..7496cfe 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/authentication/AuthenticationProcessorNodeRestartTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/authentication/AuthenticationProcessorNodeRestartTest.java
@@ -71,6 +71,7 @@ public class AuthenticationProcessorNodeRestartTest extends 
GridCommonAbstractTe
 
         cfg.setDataStorageConfiguration(new DataStorageConfiguration()
             .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
+                .setMaxSize(200L * 1024 * 1024)
                 .setPersistenceEnabled(true)));
 
         return cfg;

http://git-wip-us.apache.org/repos/asf/ignite/blob/78e0bb7e/modules/core/src/test/java/org/apache/ignite/internal/processors/authentication/AuthenticationProcessorSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/authentication/AuthenticationProcessorSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/authentication/AuthenticationProcessorSelfTest.java
index 71d86f2..6c79c7f 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/authentication/AuthenticationProcessorSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/authentication/AuthenticationProcessorSelfTest.java
@@ -85,6 +85,7 @@ public class AuthenticationProcessorSelfTest extends 
GridCommonAbstractTest {
 
         cfg.setDataStorageConfiguration(new DataStorageConfiguration()
             .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
+                .setMaxSize(200L * 1024 * 1024)
                 .setPersistenceEnabled(true)));
 
         return cfg;

http://git-wip-us.apache.org/repos/asf/ignite/blob/78e0bb7e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientReconnectTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientReconnectTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientReconnectTest.java
index 4beb31a..a0796a3 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientReconnectTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientReconnectTest.java
@@ -141,6 +141,8 @@ public class IgniteCacheClientReconnectTest extends 
GridCommonAbstractTest {
 
             waitForTopology(SRV_CNT + CLIENTS_CNT);
 
+            awaitPartitionMapExchange();
+
             verifyPartitionToNodeMappings();
 
             verifyAffinityTopologyVersions();

Reply via email to