This is an automated email from the ASF dual-hosted git repository. sk0x50 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new 557830a IGNITE-13217 Fixed partition loss detection on client nodes. Fixes #7994 557830a is described below commit 557830a63b4ce7a7a9a50101c0424ca2a1a58ee7 Author: sergeyuttsel <utt...@gmail.com> AuthorDate: Mon Nov 2 22:14:49 2020 +0300 IGNITE-13217 Fixed partition loss detection on client nodes. Fixes #7994 Signed-off-by: Slava Koptilin <slava.kopti...@gmail.com> --- .../cache/CacheAffinitySharedManager.java | 24 +-- .../dht/ClientCacheDhtTopologyFuture.java | 95 ----------- .../preloader/GridDhtPartitionsExchangeFuture.java | 13 ++ .../dht/topology/GridClientPartitionTopology.java | 3 +- .../cache/IgniteClientCacheStartFailoverTest.java | 4 - .../distributed/CacheDetectLostPartitionsTest.java | 180 +++++++++++++++++++++ .../ignite/testsuites/IgniteCacheTestSuite2.java | 2 + ...acheMvccAbstractSqlCoordinatorFailoverTest.java | 3 - 8 files changed, 209 insertions(+), 115 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index 05fa72e..66aa98d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@ -46,7 +46,6 @@ import org.apache.ignite.configuration.NearCacheConfiguration; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.events.Event; import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException; import org.apache.ignite.internal.events.DiscoveryCustomEvent; import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; @@ -54,7 +53,6 @@ import 
org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.processors.affinity.AffinityAssignment; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache; -import org.apache.ignite.internal.processors.cache.distributed.dht.ClientCacheDhtTopologyFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAssignmentFetchFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CacheGroupAffinityMessage; @@ -581,30 +579,32 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap fetchFut); GridDhtPartitionFullMap partMap; - ClientCacheDhtTopologyFuture topFut; if (res != null) { partMap = res.partitionMap(); assert partMap != null : res; - - topFut = new ClientCacheDhtTopologyFuture(topVer); } - else { + else partMap = new GridDhtPartitionFullMap(cctx.localNodeId(), cctx.localNode().order(), 1); - topFut = new ClientCacheDhtTopologyFuture(topVer, - new ClusterTopologyServerNotFoundException("All server nodes left grid.")); - } + GridDhtPartitionsExchangeFuture exchFut = context().exchange().lastFinishedFuture(); - grp.topology().updateTopologyVersion(topFut, + grp.topology().updateTopologyVersion(exchFut, discoCache, -1, false); - grp.topology().update(topVer, partMap, null, Collections.emptySet(), null, null, null, null); + GridClientPartitionTopology clientTop = cctx.exchange().clearClientTopology(grp.groupId()); + + Set<Integer> lostParts = clientTop == null ? 
null : clientTop.lostPartitions(); + + grp.topology().update(topVer, partMap, null, Collections.emptySet(), null, null, null, lostParts); + + if (clientTop == null) + grp.topology().detectLostPartitions(topVer, exchFut); - topFut.validate(grp, discoCache.allNodes()); + exchFut.validate(grp); } catch (IgniteCheckedException e) { cctx.cache().closeCaches(startedCaches, false); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/ClientCacheDhtTopologyFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/ClientCacheDhtTopologyFuture.java deleted file mode 100644 index 8fae639..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/ClientCacheDhtTopologyFuture.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.cache.distributed.dht; - -import java.util.Collection; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.cache.CacheGroupContext; -import org.apache.ignite.internal.util.typedef.internal.U; - -/** - * Topology future created for client cache start. - */ -public class ClientCacheDhtTopologyFuture extends GridDhtTopologyFutureAdapter { - /** */ - final AffinityTopologyVersion topVer; - - /** - * @param topVer Topology version. - */ - public ClientCacheDhtTopologyFuture(AffinityTopologyVersion topVer) { - assert topVer != null; - - this.topVer = topVer; - - onDone(topVer); - } - - /** - * @param topVer Topology version. - * @param e Error. - */ - public ClientCacheDhtTopologyFuture(AffinityTopologyVersion topVer, IgniteCheckedException e) { - assert e != null; - assert topVer != null; - - this.topVer = topVer; - - onDone(e); - } - - /** - * @param grp Cache group. - * @param topNodes Topology nodes. 
- */ - public void validate(CacheGroupContext grp, Collection<ClusterNode> topNodes) { - grpValidRes = U.newHashMap(1); - - CacheGroupValidation valRes = validateCacheGroup(grp, topNodes); - - if (!valRes.isValid() || valRes.hasLostPartitions()) - grpValidRes.put(grp.groupId(), valRes); - } - - /** {@inheritDoc} */ - @Override public AffinityTopologyVersion initialVersion() { - return topVer; - } - - /** {@inheritDoc} */ - @Override public boolean exchangeDone() { - throw new UnsupportedOperationException(); - } - - /** {@inheritDoc} */ - @Override public AffinityTopologyVersion topologyVersion() { - return topVer; - } - - /** {@inheritDoc} */ - @Override public boolean changedAffinity() { - return true; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return "ClientCacheDhtTopologyFuture [topVer=" + topVer + ']'; - } -} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index e9f4d05..4a9435c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -2673,6 +2673,19 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } /** + * @param grp Cache group. 
+ */ + public void validate(CacheGroupContext grp) { + if (grpValidRes == null) + grpValidRes = new ConcurrentHashMap<>(); + + CacheGroupValidation valRes = validateCacheGroup(grp, events().lastEvent().topologyNodes()); + + if (!valRes.isValid() || valRes.hasLostPartitions()) + grpValidRes.put(grp.groupId(), valRes); + } + + /** * Updates the {@link GridMetricManager#PME_OPS_BLOCKED_DURATION_HISTOGRAM} and {@link * GridMetricManager#PME_DURATION_HISTOGRAM} metrics if needed. * diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java index ad0a52b..4df8273 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java @@ -849,7 +849,8 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { consistencyCheck(); - this.lostParts = lostParts == null ? null : new TreeSet<>(lostParts); + if (exchangeVer != null) + this.lostParts = lostParts == null ? 
null : new TreeSet<>(lostParts); if (log.isDebugEnabled()) log.debug("Partition map after full update: " + fullMapString()); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheStartFailoverTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheStartFailoverTest.java index e475cbc..2626459 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheStartFailoverTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheStartFailoverTest.java @@ -131,8 +131,6 @@ public class IgniteClientCacheStartFailoverTest extends GridCommonAbstractTest { } }, "start-cache"); - U.sleep(1000); - assertFalse(fut.isDone()); stopGrid(0); @@ -201,8 +199,6 @@ public class IgniteClientCacheStartFailoverTest extends GridCommonAbstractTest { } }, "start-cache"); - U.sleep(1000); - assertFalse(fut.isDone()); stopGrid(1); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheDetectLostPartitionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheDetectLostPartitionsTest.java new file mode 100644 index 0000000..d7143d0 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheDetectLostPartitionsTest.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.PartitionLossPolicy; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Test; + +/** */ +public class CacheDetectLostPartitionsTest extends GridCommonAbstractTest { + /** */ + private static final String TEST_CACHE_NAME = "testcache"; + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + super.afterTest(); + } + + /** + * Tests that lost partitions are detected on a client node when the cache is initialized there after the partitions were lost.
+ * @throws Exception + */ + @Test + public void testDetectLostPartitionsOnClient() throws Exception { + IgniteEx ig = startGrids(2); + + awaitPartitionMapExchange(); + + IgniteCache<Object, Object> cache1 = ig.createCache(getCacheConfig(TEST_CACHE_NAME + 1)); + + IgniteCache<Object, Object> cache2 = ig.createCache(getCacheConfig(TEST_CACHE_NAME + 2)); + + for (int i = 0; i < 1000; i++) { + cache1.put(i, i); + + cache2.put(i, i); + } + + IgniteEx client = startClientGrid(2); + + stopGrid(1); + + cache1 = client.cache(TEST_CACHE_NAME + 1); + checkCache(cache1); + + cache2 = client.cache(TEST_CACHE_NAME + 2); + checkCache(cache2); + + cache1.close(); + cache2.close(); + + checkCache(client.cache(TEST_CACHE_NAME + 1)); + checkCache(client.cache(TEST_CACHE_NAME + 2)); + } + + /** + * Tests that lost partitions are detected on a client node when the cache was closed before the partitions were lost. + * @throws Exception + */ + @Test + public void testDetectLostPartitionsOnClientWithClosedCache() throws Exception { + IgniteEx ig = startGrids(2); + + awaitPartitionMapExchange(); + + IgniteCache<Object, Object> cacheSrv = ig.createCache(getCacheConfig(TEST_CACHE_NAME)); + + for (int i = 0; i < 1000; i++) + cacheSrv.put(i, i); + + IgniteEx client = startClientGrid(2); + + IgniteCache<Object, Object> cacheCl = client.cache(TEST_CACHE_NAME); + + cacheCl.close(); + + stopGrid(1); + + cacheCl = client.cache(TEST_CACHE_NAME); + + checkCache(cacheCl); + } + + /** + * Tests that lost partitions are detected on a server node that does not own any partitions when the cache was closed + * before the partitions were lost.
+ * @throws Exception + */ + @Test + public void testDetectLostPartitionsOnServerWithClosedCache() throws Exception { + startGrids(3); + + awaitPartitionMapExchange(); + + IgniteCache<Object, Object> cacheSrv1 = grid(1).createCache( + getCacheConfig(TEST_CACHE_NAME) + .setNodeFilter(new NodeConsistentIdFilter(grid(2).localNode().consistentId())) + ); + + for (int i = 0; i < 1000; i++) + cacheSrv1.put(i, i); + + IgniteEx ig2 = grid(2); + + IgniteCache<Object, Object> cacheSrv2 = ig2.cache(TEST_CACHE_NAME); + + cacheSrv2.close(); + + stopGrid(1); + + cacheSrv2 = ig2.cache(TEST_CACHE_NAME); + + checkCache(cacheSrv2); + } + + /** */ + private CacheConfiguration<Object, Object> getCacheConfig(String cacheName) { + return new CacheConfiguration<>(cacheName) + .setPartitionLossPolicy(PartitionLossPolicy.READ_WRITE_SAFE); + } + + /** */ + private void checkCache(IgniteCache<Object, Object> cache) { + assertFalse(cache.lostPartitions().isEmpty()); + + GridTestUtils.assertThrows(null, () -> { + for (int i = 0; i < 1000; i++) + cache.get(i); + }, + IgniteException.class, "partition data has been lost"); + + GridTestUtils.assertThrows(null, () -> { + for (int i = 0; i < 1000; i++) + cache.put(i, i); + }, + IgniteException.class, "partition data has been lost"); + } + + /** Node filter that excludes the node with the given consistent id. */ + private static class NodeConsistentIdFilter implements IgnitePredicate<ClusterNode> { + /** */ + private final Object consistentId; + + /** + * @param consistentId Consistent id of the node where the cache should not be started.
+ */ + NodeConsistentIdFilter(Object consistentId) { + this.consistentId = consistentId; + } + + /** {@inheritDoc} */ + @Override public boolean apply(ClusterNode node) { + return !node.consistentId().equals(consistentId); + } + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java index 4935d49..63516f8 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java @@ -58,6 +58,7 @@ import org.apache.ignite.internal.processors.cache.IgniteOnePhaseCommitNearReade import org.apache.ignite.internal.processors.cache.MemoryPolicyConfigValidationTest; import org.apache.ignite.internal.processors.cache.NoPresentCacheInterceptorOnClientTest; import org.apache.ignite.internal.processors.cache.NonAffinityCoordinatorDynamicStartStopTest; +import org.apache.ignite.internal.processors.cache.distributed.CacheDetectLostPartitionsTest; import org.apache.ignite.internal.processors.cache.distributed.CacheLoadingConcurrentGridStartSelfTest; import org.apache.ignite.internal.processors.cache.distributed.CacheLoadingConcurrentGridStartSelfTestAllowOverwrite; import org.apache.ignite.internal.processors.cache.distributed.CacheLockReleaseNodeLeaveTest; @@ -381,6 +382,7 @@ public class IgniteCacheTestSuite2 { GridTestUtils.addTestIfNeeded(suite, CachePartitionPartialCountersMapSelfTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, IgniteReflectionFactorySelfTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, NoPresentCacheInterceptorOnClientTest.class, ignoredTests); + GridTestUtils.addTestIfNeeded(suite, CacheDetectLostPartitionsTest.class, ignoredTests); return suite; } diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractSqlCoordinatorFailoverTest.java 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractSqlCoordinatorFailoverTest.java index 87f2c6a..fb62417 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractSqlCoordinatorFailoverTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractSqlCoordinatorFailoverTest.java @@ -31,7 +31,6 @@ import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteNodeAttributes; import org.apache.ignite.internal.TestRecordingCommunicationSpi; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentResponse; -import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.transactions.Transaction; @@ -283,8 +282,6 @@ public abstract class CacheMvccAbstractSqlCoordinatorFailoverTest extends CacheM } }, "start-cache"); - U.sleep(1000); - assertFalse(fut.isDone()); stopGrid(1);