IGNITE-8998 Send reconnect exception without partitions - Fixes #4358. Signed-off-by: Alexey Goncharuk <alexey.goncha...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/78e0bb7e Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/78e0bb7e Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/78e0bb7e Branch: refs/heads/ignite-8446 Commit: 78e0bb7efbc53e969c4c4918b6c6272c7b98dc36 Parents: 1cd5845 Author: Anton Kalashnikov <kaa....@yandex.ru> Authored: Mon Jul 23 16:48:18 2018 +0300 Committer: Alexey Goncharuk <alexey.goncha...@gmail.com> Committed: Mon Jul 23 17:57:43 2018 +0300 ---------------------------------------------------------------------- .../GridCachePartitionExchangeManager.java | 56 +++++++++++++------- .../GridDhtPartitionsExchangeFuture.java | 18 ++++--- .../Authentication1kUsersNodeRestartTest.java | 1 + .../AuthenticationConfigurationClusterTest.java | 1 + .../AuthenticationOnNotActiveClusterTest.java | 1 + .../AuthenticationProcessorNPEOnStartTest.java | 1 + .../AuthenticationProcessorNodeRestartTest.java | 1 + .../AuthenticationProcessorSelfTest.java | 1 + .../IgniteCacheClientReconnectTest.java | 2 + 9 files changed, 55 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/78e0bb7e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 1f28da2..05eeee3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -30,6 +30,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.NavigableMap; +import java.util.Objects; import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; @@ -179,10 +180,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana new ConcurrentSkipListMap<>(); /** */ - private final AtomicReference<AffinityTopologyVersion> readyTopVer = - new AtomicReference<>(AffinityTopologyVersion.NONE); - - /** */ private GridFutureAdapter<?> reconnectExchangeFut; /** */ @@ -759,7 +756,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana // Do not allow any activity in exchange manager after stop. busyLock.writeLock().lock(); - exchFuts = null; + exchFuts.clear(); } /** @@ -826,7 +823,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana * @return Topology version of latest completed partition exchange. */ public AffinityTopologyVersion readyAffinityVersion() { - return readyTopVer.get(); + return exchFuts.readyTopVer(); } /** @@ -876,7 +873,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana return lastInitializedFut0; } - AffinityTopologyVersion topVer = readyTopVer.get(); + AffinityTopologyVersion topVer = exchFuts.readyTopVer(); if (topVer.compareTo(ver) >= 0) { if (log.isDebugEnabled()) @@ -891,7 +888,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana if (log.isDebugEnabled()) log.debug("Created topology ready future [ver=" + ver + ", fut=" + fut + ']'); - topVer = readyTopVer.get(); + topVer = exchFuts.readyTopVer(); if (topVer.compareTo(ver) >= 0) { if (log.isDebugEnabled()) @@ -1400,15 +1397,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana log.debug("Exchange done [topVer=" + topVer + ", err=" + err + ']'); if (err == null) { - while (true) { - AffinityTopologyVersion readyVer = readyTopVer.get(); - - if (readyVer.compareTo(topVer) >= 0) - break; - - if (readyTopVer.compareAndSet(readyVer, topVer)) - break; - } + exchFuts.readyTopVer(topVer); for (Map.Entry<AffinityTopologyVersion, AffinityReadyFuture> entry : readyFuts.entrySet()) { if (entry.getKey().compareTo(topVer) <= 0) { @@ -1576,7 +1565,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana AffinityTopologyVersion initVer = exchFut.initialVersion(); AffinityTopologyVersion readyVer = readyAffinityVersion(); - if (initVer.compareTo(readyVer) <= 0 && !exchFut.exchangeDone()) { + if (initVer.compareTo(readyVer) < 0 && !exchFut.isDone()) { U.warn(log, "Client node tries to connect but its exchange " + "info is cleaned up from exchange history. " + "Consider increasing 'IGNITE_EXCHANGE_HISTORY_SIZE' property " + @@ -1637,7 +1626,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana public void dumpDebugInfo(@Nullable GridDhtPartitionsExchangeFuture exchFut) throws Exception { AffinityTopologyVersion exchTopVer = exchFut != null ? exchFut.initialVersion() : null; - U.warn(diagnosticLog, "Ready affinity version: " + readyTopVer.get()); + U.warn(diagnosticLog, "Ready affinity version: " + exchFuts.readyTopVer()); U.warn(diagnosticLog, "Last exchange future: " + lastInitializedFut); @@ -2781,6 +2770,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana /** */ private final int histSize; + /** */ + private final AtomicReference<AffinityTopologyVersion> readyTopVer = + new AtomicReference<>(AffinityTopologyVersion.NONE); + /** * Creates ordered, not strict list set. * @@ -2817,7 +2810,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana while (size() > histSize) { GridDhtPartitionsExchangeFuture last = last(); - if (!last.isDone()) + if (!last.isDone() || Objects.equals(last.initialVersion(), readyTopVer())) break; removeLast(); @@ -2827,6 +2820,29 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana return cur == null ? fut : cur; } + /** + * @return Ready top version. + */ + public AffinityTopologyVersion readyTopVer() { + return readyTopVer.get(); + } + + /** + * @param readyTopVersion Ready top version. + * @return {@code true} if version was set and {@code false} otherwise. + */ + public boolean readyTopVer(AffinityTopologyVersion readyTopVersion) { + while (true) { + AffinityTopologyVersion readyVer = readyTopVer.get(); + + if (readyVer.compareTo(readyTopVersion) >= 0) + return false; + + if (readyTopVer.compareAndSet(readyVer, readyTopVersion)) + return true; + } + } + /** {@inheritDoc} */ @Nullable @Override public synchronized GridDhtPartitionsExchangeFuture removex( GridDhtPartitionsExchangeFuture val) { http://git-wip-us.apache.org/repos/asf/ignite/blob/78e0bb7e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 75cd491..4d0b583 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -2116,21 +2116,25 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte * @param msg Single message received from the client which didn't find original ExchangeFuture. */ public void forceClientReconnect(ClusterNode node, GridDhtPartitionsSingleMessage msg) { - Exception e = new IgniteNeedReconnectException(node, null); + Exception reconnectException = new IgniteNeedReconnectException(node, null); - exchangeGlobalExceptions.put(node.id(), e); + exchangeGlobalExceptions.put(node.id(), reconnectException); - onDone(null, e); + onDone(null, reconnectException); GridDhtPartitionsFullMessage fullMsg = createPartitionsMessage(true, false); fullMsg.setErrorsMap(exchangeGlobalExceptions); - FinishState finishState0 = new FinishState(cctx.localNodeId(), - initialVersion(), - fullMsg); + try { + cctx.io().send(node, fullMsg, SYSTEM_POOL); - sendAllPartitionsToNode(finishState0, msg, node.id()); + if (log.isDebugEnabled()) + log.debug("Full message for reconnect client was sent to node: " + node + ", fullMsg: " + fullMsg); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send reconnect client message [node=" + node + ']', e); + } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/78e0bb7e/modules/core/src/test/java/org/apache/ignite/internal/processors/authentication/Authentication1kUsersNodeRestartTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/authentication/Authentication1kUsersNodeRestartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/authentication/Authentication1kUsersNodeRestartTest.java index 384cbe8..0ca621e 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/authentication/Authentication1kUsersNodeRestartTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/authentication/Authentication1kUsersNodeRestartTest.java @@ -49,6 +49,7 @@ public class Authentication1kUsersNodeRestartTest extends GridCommonAbstractTest cfg.setDataStorageConfiguration(new DataStorageConfiguration() .setDefaultDataRegionConfiguration(new DataRegionConfiguration() + .setMaxSize(200L * 1024 * 1024) .setPersistenceEnabled(true))); return cfg; http://git-wip-us.apache.org/repos/asf/ignite/blob/78e0bb7e/modules/core/src/test/java/org/apache/ignite/internal/processors/authentication/AuthenticationConfigurationClusterTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/authentication/AuthenticationConfigurationClusterTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/authentication/AuthenticationConfigurationClusterTest.java index 7d946dc..fb61d60 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/authentication/AuthenticationConfigurationClusterTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/authentication/AuthenticationConfigurationClusterTest.java @@ -59,6 +59,7 @@ public class AuthenticationConfigurationClusterTest extends GridCommonAbstractTe cfg.setDataStorageConfiguration(new DataStorageConfiguration() .setDefaultDataRegionConfiguration(new DataRegionConfiguration() + .setMaxSize(200L * 1024 * 1024) .setPersistenceEnabled(true))); return cfg; http://git-wip-us.apache.org/repos/asf/ignite/blob/78e0bb7e/modules/core/src/test/java/org/apache/ignite/internal/processors/authentication/AuthenticationOnNotActiveClusterTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/authentication/AuthenticationOnNotActiveClusterTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/authentication/AuthenticationOnNotActiveClusterTest.java index 7405d7a..638c378 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/authentication/AuthenticationOnNotActiveClusterTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/authentication/AuthenticationOnNotActiveClusterTest.java @@ -56,6 +56,7 @@ public class AuthenticationOnNotActiveClusterTest extends GridCommonAbstractTest cfg.setDataStorageConfiguration(new DataStorageConfiguration() .setDefaultDataRegionConfiguration(new DataRegionConfiguration() + .setMaxSize(200L * 1024 * 1024) .setPersistenceEnabled(true))); return cfg; http://git-wip-us.apache.org/repos/asf/ignite/blob/78e0bb7e/modules/core/src/test/java/org/apache/ignite/internal/processors/authentication/AuthenticationProcessorNPEOnStartTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/authentication/AuthenticationProcessorNPEOnStartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/authentication/AuthenticationProcessorNPEOnStartTest.java index 9748db7..661c875 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/authentication/AuthenticationProcessorNPEOnStartTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/authentication/AuthenticationProcessorNPEOnStartTest.java @@ -49,6 +49,7 @@ public class AuthenticationProcessorNPEOnStartTest extends GridCommonAbstractTes cfg.setDataStorageConfiguration(new DataStorageConfiguration() .setDefaultDataRegionConfiguration(new DataRegionConfiguration() + .setMaxSize(200L * 1024 * 1024) .setPersistenceEnabled(true))); return cfg; http://git-wip-us.apache.org/repos/asf/ignite/blob/78e0bb7e/modules/core/src/test/java/org/apache/ignite/internal/processors/authentication/AuthenticationProcessorNodeRestartTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/authentication/AuthenticationProcessorNodeRestartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/authentication/AuthenticationProcessorNodeRestartTest.java index 632a8b9..7496cfe 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/authentication/AuthenticationProcessorNodeRestartTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/authentication/AuthenticationProcessorNodeRestartTest.java @@ -71,6 +71,7 @@ public class AuthenticationProcessorNodeRestartTest extends GridCommonAbstractTe cfg.setDataStorageConfiguration(new DataStorageConfiguration() .setDefaultDataRegionConfiguration(new DataRegionConfiguration() + .setMaxSize(200L * 1024 * 1024) .setPersistenceEnabled(true))); return cfg; http://git-wip-us.apache.org/repos/asf/ignite/blob/78e0bb7e/modules/core/src/test/java/org/apache/ignite/internal/processors/authentication/AuthenticationProcessorSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/authentication/AuthenticationProcessorSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/authentication/AuthenticationProcessorSelfTest.java index 71d86f2..6c79c7f 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/authentication/AuthenticationProcessorSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/authentication/AuthenticationProcessorSelfTest.java @@ -85,6 +85,7 @@ public class AuthenticationProcessorSelfTest extends GridCommonAbstractTest { cfg.setDataStorageConfiguration(new DataStorageConfiguration() .setDefaultDataRegionConfiguration(new DataRegionConfiguration() + .setMaxSize(200L * 1024 * 1024) .setPersistenceEnabled(true))); return cfg; http://git-wip-us.apache.org/repos/asf/ignite/blob/78e0bb7e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientReconnectTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientReconnectTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientReconnectTest.java index 4beb31a..a0796a3 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientReconnectTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientReconnectTest.java @@ -141,6 +141,8 @@ public class IgniteCacheClientReconnectTest extends GridCommonAbstractTest { waitForTopology(SRV_CNT + CLIENTS_CNT); + awaitPartitionMapExchange(); + verifyPartitionToNodeMappings(); verifyAffinityTopologyVersions();