Repository: ignite Updated Branches: refs/heads/master 95fc14903 -> 36e8cf54a
IGNITE-8324 Update disco cache for affinity topology together with topology version - Fixes #3880. Signed-off-by: Alexey Goncharuk <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/36e8cf54 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/36e8cf54 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/36e8cf54 Branch: refs/heads/master Commit: 36e8cf54a4acb2e43731d17f22fd6f50dc5d0e5f Parents: 95fc149 Author: Pavel Kovalenko <[email protected]> Authored: Wed Apr 25 17:45:28 2018 +0300 Committer: Alexey Goncharuk <[email protected]> Committed: Wed Apr 25 17:45:28 2018 +0300 ---------------------------------------------------------------------- .../dht/ClientCacheDhtTopologyFuture.java | 3 +- .../dht/GridClientPartitionTopology.java | 6 +--- .../dht/GridDhtPartitionTopologyImpl.java | 15 ++++---- .../GridDhtPartitionsExchangeFuture.java | 21 ----------- .../distributed/CacheExchangeMergeTest.java | 38 ++++++++++++++++++++ ...gnitePdsCacheAssignmentNodeRestartsTest.java | 24 +++++++------ 6 files changed, 63 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/36e8cf54/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/ClientCacheDhtTopologyFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/ClientCacheDhtTopologyFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/ClientCacheDhtTopologyFuture.java index 317037b..4b48f5a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/ClientCacheDhtTopologyFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/ClientCacheDhtTopologyFuture.java @@ -27,8 +27,7 @@ import org.apache.ignite.internal.util.typedef.internal.U; /** * Topology future created for client cache start. */ -public class ClientCacheDhtTopologyFuture extends GridDhtTopologyFutureAdapter - implements GridDhtTopologyFuture { +public class ClientCacheDhtTopologyFuture extends GridDhtTopologyFutureAdapter { /** */ final AffinityTopologyVersion topVer; http://git-wip-us.apache.org/repos/asf/ignite/blob/36e8cf54/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java index dcb8b96..cf8fc34 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java @@ -207,11 +207,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { try { AffinityTopologyVersion exchTopVer = exchFut.initialVersion(); - // Update is correct if topology version is newer or in case of newer discovery caches. - boolean isCorrectUpdate = exchTopVer.compareTo(topVer) > 0 - || (exchTopVer.compareTo(topVer) == 0 && this.discoCache != null && discoCache.version().compareTo(this.discoCache.version()) > 0); - - assert isCorrectUpdate : "Invalid topology version [grp=" + grpId + + assert exchTopVer.compareTo(topVer) > 0 : "Invalid topology version [grp=" + grpId + ", topVer=" + topVer + ", exchVer=" + exchTopVer + ", discoCacheVer=" + (this.discoCache != null ? this.discoCache.version() : "None") + http://git-wip-us.apache.org/repos/asf/ignite/blob/36e8cf54/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index 3d664f3..2c47315 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -241,11 +241,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { try { AffinityTopologyVersion exchTopVer = exchFut.initialVersion(); - // Update is correct if topology version is newer or in case of newer discovery caches. - boolean isCorrectUpdate = exchTopVer.compareTo(readyTopVer) > 0 - || (exchTopVer.compareTo(readyTopVer) == 0 && this.discoCache != null && discoCache.version().compareTo(this.discoCache.version()) > 0); - - assert isCorrectUpdate : "Invalid topology version [grp=" + grp.cacheOrGroupName() + + assert exchTopVer.compareTo(readyTopVer) > 0 : "Invalid topology version [grp=" + grp.cacheOrGroupName() + ", topVer=" + readyTopVer + ", exchTopVer=" + exchTopVer + ", discoCacheVer=" + (this.discoCache != null ? this.discoCache.version() : "None") + @@ -489,6 +485,8 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { ", evtsVer=" + evts.topologyVersion() + ']'; lastTopChangeVer = readyTopVer = evts.topologyVersion(); + + discoCache = evts.discoveryCache(); } if (log.isDebugEnabled()) { @@ -1821,7 +1819,9 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } /** {@inheritDoc} */ - @Override public void onExchangeDone(GridDhtPartitionsExchangeFuture fut, AffinityAssignment assignment, boolean updateRebalanceVer) { + @Override public void onExchangeDone(@Nullable GridDhtPartitionsExchangeFuture fut, + AffinityAssignment assignment, + boolean updateRebalanceVer) { lock.writeLock().lock(); try { @@ -1830,6 +1830,9 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { readyTopVer = lastTopChangeVer = assignment.topologyVersion(); + if (fut != null) + discoCache = fut.events().discoveryCache(); + if (!grp.isReplicated()) { boolean rebuildDiff = fut == null || fut.localJoinExchange() || fut.serverNodeDiscoveryEvent() || fut.firstEvent().type() == EVT_DISCOVERY_CUSTOM_EVT || !diffFromAffinityVer.initialized(); http://git-wip-us.apache.org/repos/asf/ignite/blob/36e8cf54/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 56907b2..33bd989 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -1919,18 +1919,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } /** - * Checks that some futures were merged to the current. - * Future without merges has only one DiscoveryEvent. - * If we merge futures to the current (see {@link GridCachePartitionExchangeManager#mergeExchanges(GridDhtPartitionsExchangeFuture, GridDhtPartitionsFullMessage)}) - * we add new discovery event from merged future. - * - * @return {@code True} If some futures were merged to current, false in other case. - */ - private boolean hasMergedExchanges() { - return context().events().events().size() > 1; - } - - /** * @param fut Current future. * @return Pending join request if any. */ @@ -2475,12 +2463,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte boolean finish = cctx.exchange().mergeExchangesOnCoordinator(this); - // Synchronize in case of changed coordinator (thread switched to sys-*) - synchronized (mux) { - if (hasMergedExchanges()) - updateTopologies(true); - } - if (!finish) return; } @@ -3116,9 +3098,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte return; // Node is stopping, no need to further process exchange. } - if (hasMergedExchanges()) - updateTopologies(false); - assert resTopVer.equals(exchCtx.events().topologyVersion()) : "Unexpected result version [" + "msgVer=" + resTopVer + ", locVer=" + exchCtx.events().topologyVersion() + ']'; http://git-wip-us.apache.org/repos/asf/ignite/blob/36e8cf54/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java index 9660a76..6c714b1 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java @@ -1190,6 +1190,8 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest { awaitPartitionMapExchange(); + checkTopologiesConsistency(); + checkCaches0(); } @@ -1206,6 +1208,42 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest { } /** + * Checks that after exchange all nodes have consistent state about partition owners. + * + * @throws Exception If failed. + */ + private void checkTopologiesConsistency() throws Exception { + List<Ignite> nodes = G.allGrids(); + + IgniteEx crdNode = null; + + for (Ignite node : nodes) { + ClusterNode locNode = node.cluster().localNode(); + + if (crdNode == null || locNode.order() < crdNode.localNode().order()) + crdNode = (IgniteEx) node; + } + + for (Ignite node : nodes) { + IgniteEx node0 = (IgniteEx) node; + + if (node0.localNode().id().equals(crdNode.localNode().id())) + continue; + + for (IgniteInternalCache cache : node0.context().cache().caches()) { + int partitions = cache.context().affinity().partitions(); + for (int p = 0; p < partitions; p++) { + List<ClusterNode> crdOwners = crdNode.cachex(cache.name()).cache().context().topology().owners(p); + + List<ClusterNode> owners = cache.context().topology().owners(p); + + assertEquals(crdOwners, owners); + } + } + } + } + + /** * @throws Exception If failed. */ private void checkAffinity() throws Exception { http://git-wip-us.apache.org/repos/asf/ignite/blob/36e8cf54/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheAssignmentNodeRestartsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheAssignmentNodeRestartsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheAssignmentNodeRestartsTest.java index 0b8f15a..c57165c 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheAssignmentNodeRestartsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheAssignmentNodeRestartsTest.java @@ -29,10 +29,9 @@ import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.configuration.MemoryConfiguration; -import org.apache.ignite.configuration.MemoryPolicyConfiguration; -import org.apache.ignite.configuration.PersistentStoreConfiguration; import org.apache.ignite.configuration.WALMode; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; @@ -41,7 +40,6 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor; import org.apache.ignite.internal.processors.cache.IgniteInternalCache; import org.apache.ignite.internal.util.typedef.G; -import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; @@ -65,11 +63,17 @@ public class IgnitePdsCacheAssignmentNodeRestartsTest extends GridCommonAbstract @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); - cfg.setMemoryConfiguration(new MemoryConfiguration().setDefaultMemoryPolicyName("d"). - setPageSize(1024).setMemoryPolicies(new MemoryPolicyConfiguration().setName("d"). - setInitialSize(50 * 1024 * 1024L).setMaxSize(50 * 1024 * 1024))); + cfg.setConsistentId(igniteInstanceName); - cfg.setPersistentStoreConfiguration(new PersistentStoreConfiguration().setWalMode(WALMode.LOG_ONLY)); + cfg.setDataStorageConfiguration(new DataStorageConfiguration() + .setPageSize(1024) + .setDefaultDataRegionConfiguration(new DataRegionConfiguration() + .setPersistenceEnabled(true) + .setInitialSize(50 * 1024 * 1024L) + .setMaxSize(50 * 1024 * 1024L) + ) + .setWalMode(WALMode.LOG_ONLY) + ); ((TcpDiscoverySpi) cfg.getDiscoverySpi()).setIpFinder(ipFinder); @@ -80,14 +84,14 @@ public class IgnitePdsCacheAssignmentNodeRestartsTest extends GridCommonAbstract @Override protected void beforeTest() throws Exception { super.beforeTest(); - U.delete(U.resolveWorkDirectory(U.defaultWorkDirectory(), "db", false)); + cleanPersistenceDir(); } /** {@inheritDoc} */ @Override protected void afterTest() throws Exception { stopAllGrids(); - U.delete(U.resolveWorkDirectory(U.defaultWorkDirectory(), "db", false)); + cleanPersistenceDir(); super.afterTest(); }
