This is an automated email from the ASF dual-hosted git repository.

sk0x50 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 557830a  IGNITE-13217 Fixed partition loss detection on client nodes. Fixes #7994
557830a is described below

commit 557830a63b4ce7a7a9a50101c0424ca2a1a58ee7
Author: sergeyuttsel <utt...@gmail.com>
AuthorDate: Mon Nov 2 22:14:49 2020 +0300

    IGNITE-13217 Fixed partition loss detection on client nodes. Fixes #7994
    
    Signed-off-by: Slava Koptilin <slava.kopti...@gmail.com>
---
 .../cache/CacheAffinitySharedManager.java          |  24 +--
 .../dht/ClientCacheDhtTopologyFuture.java          |  95 -----------
 .../preloader/GridDhtPartitionsExchangeFuture.java |  13 ++
 .../dht/topology/GridClientPartitionTopology.java  |   3 +-
 .../cache/IgniteClientCacheStartFailoverTest.java  |   4 -
 .../distributed/CacheDetectLostPartitionsTest.java | 180 +++++++++++++++++++++
 .../ignite/testsuites/IgniteCacheTestSuite2.java   |   2 +
 ...acheMvccAbstractSqlCoordinatorFailoverTest.java |   3 -
 8 files changed, 209 insertions(+), 115 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 05fa72e..66aa98d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -46,7 +46,6 @@ import org.apache.ignite.configuration.NearCacheConfiguration;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
 import org.apache.ignite.internal.events.DiscoveryCustomEvent;
 import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
@@ -54,7 +53,6 @@ import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
-import org.apache.ignite.internal.processors.cache.distributed.dht.ClientCacheDhtTopologyFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAssignmentFetchFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CacheGroupAffinityMessage;
@@ -581,30 +579,32 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                     fetchFut);
 
                 GridDhtPartitionFullMap partMap;
-                ClientCacheDhtTopologyFuture topFut;
 
                 if (res != null) {
                     partMap = res.partitionMap();
 
                     assert partMap != null : res;
-
-                    topFut = new ClientCacheDhtTopologyFuture(topVer);
                 }
-                else {
+                else
                     partMap = new GridDhtPartitionFullMap(cctx.localNodeId(), cctx.localNode().order(), 1);
 
-                    topFut = new ClientCacheDhtTopologyFuture(topVer,
-                        new ClusterTopologyServerNotFoundException("All server nodes left grid."));
-                }
+                GridDhtPartitionsExchangeFuture exchFut = context().exchange().lastFinishedFuture();
 
-                grp.topology().updateTopologyVersion(topFut,
+                grp.topology().updateTopologyVersion(exchFut,
                     discoCache,
                     -1,
                     false);
 
-                grp.topology().update(topVer, partMap, null, Collections.emptySet(), null, null, null, null);
+                GridClientPartitionTopology clientTop = cctx.exchange().clearClientTopology(grp.groupId());
+
+                Set<Integer> lostParts = clientTop == null ? null : clientTop.lostPartitions();
+
+                grp.topology().update(topVer, partMap, null, Collections.emptySet(), null, null, null, lostParts);
+
+                if (clientTop == null)
+                    grp.topology().detectLostPartitions(topVer, exchFut);
 
-                topFut.validate(grp, discoCache.allNodes());
+                exchFut.validate(grp);
             }
             catch (IgniteCheckedException e) {
                 cctx.cache().closeCaches(startedCaches, false);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/ClientCacheDhtTopologyFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/ClientCacheDhtTopologyFuture.java
deleted file mode 100644
index 8fae639..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/ClientCacheDhtTopologyFuture.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.dht;
-
-import java.util.Collection;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.CacheGroupContext;
-import org.apache.ignite.internal.util.typedef.internal.U;
-
-/**
- * Topology future created for client cache start.
- */
-public class ClientCacheDhtTopologyFuture extends GridDhtTopologyFutureAdapter {
-    /** */
-    final AffinityTopologyVersion topVer;
-
-    /**
-     * @param topVer Topology version.
-     */
-    public ClientCacheDhtTopologyFuture(AffinityTopologyVersion topVer) {
-        assert topVer != null;
-
-        this.topVer = topVer;
-
-        onDone(topVer);
-    }
-
-    /**
-     * @param topVer Topology version.
-     * @param e Error.
-     */
-    public ClientCacheDhtTopologyFuture(AffinityTopologyVersion topVer, IgniteCheckedException e) {
-        assert e != null;
-        assert topVer != null;
-
-        this.topVer = topVer;
-
-        onDone(e);
-    }
-
-    /**
-     * @param grp Cache group.
-     * @param topNodes Topology nodes.
-     */
-    public void validate(CacheGroupContext grp, Collection<ClusterNode> topNodes) {
-        grpValidRes = U.newHashMap(1);
-
-        CacheGroupValidation valRes = validateCacheGroup(grp, topNodes);
-
-        if (!valRes.isValid() || valRes.hasLostPartitions())
-            grpValidRes.put(grp.groupId(), valRes);
-    }
-
-    /** {@inheritDoc} */
-    @Override public AffinityTopologyVersion initialVersion() {
-        return topVer;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean exchangeDone() {
-        throw new UnsupportedOperationException();
-    }
-
-    /** {@inheritDoc} */
-    @Override public AffinityTopologyVersion topologyVersion() {
-        return topVer;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean changedAffinity() {
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return "ClientCacheDhtTopologyFuture [topVer=" + topVer + ']';
-    }
-}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index e9f4d05..4a9435c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -2673,6 +2673,19 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     }
 
     /**
+     * @param grp Cache group.
+     */
+    public void validate(CacheGroupContext grp) {
+        if (grpValidRes == null)
+            grpValidRes = new ConcurrentHashMap<>();
+
+        CacheGroupValidation valRes = validateCacheGroup(grp, events().lastEvent().topologyNodes());
+
+        if (!valRes.isValid() || valRes.hasLostPartitions())
+            grpValidRes.put(grp.groupId(), valRes);
+    }
+
+    /**
      * Updates the {@link GridMetricManager#PME_OPS_BLOCKED_DURATION_HISTOGRAM} and {@link
      * GridMetricManager#PME_DURATION_HISTOGRAM} metrics if needed.
      *
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
index ad0a52b..4df8273 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
@@ -849,7 +849,8 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
 
             consistencyCheck();
 
-            this.lostParts = lostParts == null ? null : new TreeSet<>(lostParts);
+            if (exchangeVer != null)
+                this.lostParts = lostParts == null ? null : new TreeSet<>(lostParts);
 
             if (log.isDebugEnabled())
                 log.debug("Partition map after full update: " + fullMapString());
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheStartFailoverTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheStartFailoverTest.java
index e475cbc..2626459 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheStartFailoverTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheStartFailoverTest.java
@@ -131,8 +131,6 @@ public class IgniteClientCacheStartFailoverTest extends GridCommonAbstractTest {
             }
         }, "start-cache");
 
-        U.sleep(1000);
-
         assertFalse(fut.isDone());
 
         stopGrid(0);
@@ -201,8 +199,6 @@ public class IgniteClientCacheStartFailoverTest extends GridCommonAbstractTest {
             }
         }, "start-cache");
 
-        U.sleep(1000);
-
         assertFalse(fut.isDone());
 
         stopGrid(1);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheDetectLostPartitionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheDetectLostPartitionsTest.java
new file mode 100644
index 0000000..d7143d0
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheDetectLostPartitionsTest.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.PartitionLossPolicy;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+/** */
+public class CacheDetectLostPartitionsTest extends GridCommonAbstractTest {
+    /** */
+    private static final String TEST_CACHE_NAME = "testcache";
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        super.afterTest();
+    }
+
+    /**
+     * Test detect lost partitions on a client node when the cache init after partitions was lost.
+     * @throws Exception
+     */
+    @Test
+    public void testDetectLostPartitionsOnClient() throws Exception {
+        IgniteEx ig = startGrids(2);
+
+        awaitPartitionMapExchange();
+
+        IgniteCache<Object, Object> cache1 = ig.createCache(getCacheConfig(TEST_CACHE_NAME + 1));
+
+        IgniteCache<Object, Object> cache2 = ig.createCache(getCacheConfig(TEST_CACHE_NAME + 2));
+
+        for (int i = 0; i < 1000; i++) {
+            cache1.put(i, i);
+
+            cache2.put(i, i);
+        }
+
+        IgniteEx client = startClientGrid(2);
+
+        stopGrid(1);
+
+        cache1 = client.cache(TEST_CACHE_NAME + 1);
+        checkCache(cache1);
+
+        cache2 = client.cache(TEST_CACHE_NAME + 2);
+        checkCache(cache2);
+
+        cache1.close();
+        cache2.close();
+
+        checkCache(client.cache(TEST_CACHE_NAME + 1));
+        checkCache(client.cache(TEST_CACHE_NAME + 2));
+    }
+
+    /**
+     * Test detect lost partitions on a client node when the cache was closed before partitions was lost.
+     * @throws Exception
+     */
+    @Test
+    public void testDetectLostPartitionsOnClientWithClosedCache() throws Exception {
+        IgniteEx ig = startGrids(2);
+
+        awaitPartitionMapExchange();
+
+        IgniteCache<Object, Object> cacheSrv = ig.createCache(getCacheConfig(TEST_CACHE_NAME));
+
+        for (int i = 0; i < 1000; i++)
+            cacheSrv.put(i, i);
+
+        IgniteEx client = startClientGrid(2);
+
+        IgniteCache<Object, Object> cacheCl = client.cache(TEST_CACHE_NAME);
+
+        cacheCl.close();
+
+        stopGrid(1);
+
+        cacheCl = client.cache(TEST_CACHE_NAME);
+
+        checkCache(cacheCl);
+    }
+
+    /**
+     * Test detect lost partitions on a server node which doesn't have partitions when the cache was closed
+     * before partitions was lost.
+     * @throws Exception
+     */
+    @Test
+    public void testDetectLostPartitionsOnServerWithClosedCache() throws Exception {
+        startGrids(3);
+
+        awaitPartitionMapExchange();
+
+        IgniteCache<Object, Object> cacheSrv1 = grid(1).createCache(
+            getCacheConfig(TEST_CACHE_NAME)
+                .setNodeFilter(new NodeConsistentIdFilter(grid(2).localNode().consistentId()))
+        );
+
+        for (int i = 0; i < 1000; i++)
+            cacheSrv1.put(i, i);
+
+        IgniteEx ig2 = grid(2);
+
+        IgniteCache<Object, Object> cacheSrv2 = ig2.cache(TEST_CACHE_NAME);
+
+        cacheSrv2.close();
+
+        stopGrid(1);
+
+        cacheSrv2 = ig2.cache(TEST_CACHE_NAME);
+
+        checkCache(cacheSrv2);
+    }
+
+    /** */
+    private CacheConfiguration<Object, Object> getCacheConfig(String cacheName) {
+        return new CacheConfiguration<>(cacheName)
+            .setPartitionLossPolicy(PartitionLossPolicy.READ_WRITE_SAFE);
+    }
+
+    /** */
+    private void checkCache(IgniteCache<Object, Object> cache) {
+        assertFalse(cache.lostPartitions().isEmpty());
+
+        GridTestUtils.assertThrows(null, () -> {
+                for (int i = 0; i < 1000; i++)
+                    cache.get(i);
+            },
+            IgniteException.class, "partition data has been lost");
+
+        GridTestUtils.assertThrows(null, () -> {
+                for (int i = 0; i < 1000; i++)
+                    cache.put(i, i);
+            },
+            IgniteException.class, "partition data has been lost");
+    }
+
+    /** Filter by consistent id. */
+    private static class NodeConsistentIdFilter implements IgnitePredicate<ClusterNode> {
+        /** */
+        private final Object consistentId;
+
+        /**
+         * @param consistentId Consistent id where cache should be started.
+         */
+        NodeConsistentIdFilter(Object consistentId) {
+            this.consistentId = consistentId;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean apply(ClusterNode node) {
+            return !node.consistentId().equals(consistentId);
+        }
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
index 4935d49..63516f8 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
@@ -58,6 +58,7 @@ import org.apache.ignite.internal.processors.cache.IgniteOnePhaseCommitNearReade
 import org.apache.ignite.internal.processors.cache.MemoryPolicyConfigValidationTest;
 import org.apache.ignite.internal.processors.cache.NoPresentCacheInterceptorOnClientTest;
 import org.apache.ignite.internal.processors.cache.NonAffinityCoordinatorDynamicStartStopTest;
+import org.apache.ignite.internal.processors.cache.distributed.CacheDetectLostPartitionsTest;
 import org.apache.ignite.internal.processors.cache.distributed.CacheLoadingConcurrentGridStartSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.CacheLoadingConcurrentGridStartSelfTestAllowOverwrite;
 import org.apache.ignite.internal.processors.cache.distributed.CacheLockReleaseNodeLeaveTest;
@@ -381,6 +382,7 @@ public class IgniteCacheTestSuite2 {
         GridTestUtils.addTestIfNeeded(suite, CachePartitionPartialCountersMapSelfTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, IgniteReflectionFactorySelfTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, NoPresentCacheInterceptorOnClientTest.class, ignoredTests);
+        GridTestUtils.addTestIfNeeded(suite, CacheDetectLostPartitionsTest.class, ignoredTests);
 
         return suite;
     }
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractSqlCoordinatorFailoverTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractSqlCoordinatorFailoverTest.java
index 87f2c6a..fb62417 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractSqlCoordinatorFailoverTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractSqlCoordinatorFailoverTest.java
@@ -31,7 +31,6 @@ import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteNodeAttributes;
 import org.apache.ignite.internal.TestRecordingCommunicationSpi;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentResponse;
-import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.transactions.Transaction;
@@ -283,8 +282,6 @@ public abstract class CacheMvccAbstractSqlCoordinatorFailoverTest extends CacheM
             }
         }, "start-cache");
 
-        U.sleep(1000);
-
         assertFalse(fut.isDone());
 
         stopGrid(1);

Reply via email to