Repository: ignite Updated Branches: refs/heads/ignite-6973 [created] 81499e7a5
ignite-6973 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/81499e7a Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/81499e7a Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/81499e7a Branch: refs/heads/ignite-6973 Commit: 81499e7a576e397aba6694d4d508ad98c6e51e89 Parents: 58b5041 Author: sboikov <sboi...@gridgain.com> Authored: Tue Nov 21 15:00:58 2017 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Tue Nov 21 15:00:58 2017 +0300 ---------------------------------------------------------------------- .../processors/cache/ClusterCachesInfo.java | 4 +- ...gnitePdsCacheAssignmentNodeRestartsTest.java | 257 +++++++++++++++++++ .../testsuites/IgniteCacheTestSuite6.java | 3 +- 3 files changed, 261 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/81499e7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java index 69f1a27..2b5ebf9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java @@ -1214,7 +1214,7 @@ class ClusterCachesInfo { if (storedCfgs != null) { List<DynamicCacheChangeRequest> reqs = new ArrayList<>(); - IgniteUuid deplymentId = IgniteUuid.fromUuid(msg.requestId()); + IgniteUuid deploymentId = msg.id(); for (StoredCacheData storedCfg : storedCfgs) { CacheConfiguration ccfg = storedCfg.config(); @@ -1224,7 +1224,7 @@ class ClusterCachesInfo { ccfg.getName(), msg.initiatorNodeId()); - req.deploymentId(deplymentId); + req.deploymentId(deploymentId); req.startCacheConfiguration(ccfg); req.cacheType(ctx.cache().cacheType(ccfg.getName())); req.schema(new QuerySchema(storedCfg.queryEntities())); http://git-wip-us.apache.org/repos/asf/ignite/blob/81499e7a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheAssignmentNodeRestartsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheAssignmentNodeRestartsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheAssignmentNodeRestartsTest.java new file mode 100644 index 0000000..807f532 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheAssignmentNodeRestartsTest.java @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.Ignite; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.MemoryConfiguration; +import org.apache.ignite.configuration.MemoryPolicyConfiguration; +import org.apache.ignite.configuration.PersistentStoreConfiguration; +import org.apache.ignite.configuration.WALMode; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.IgniteKernal; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor; +import org.apache.ignite.internal.processors.cache.IgniteInternalCache; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; +import static org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager.IGNITE_PDS_CHECKPOINT_TEST_SKIP_SYNC; + +/** + * The test validates assignment after nodes restart with enabled persistence. + */ +public class IgnitePdsCacheAssignmentNodeRestartsTest extends GridCommonAbstractTest { + /** */ + private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.setMemoryConfiguration(new MemoryConfiguration().setDefaultMemoryPolicyName("d"). + setPageSize(1024).setMemoryPolicies(new MemoryPolicyConfiguration().setName("d"). + setInitialSize(50 * 1024 * 1024L).setMaxSize(50 * 1024 * 1024))); + + cfg.setPersistentStoreConfiguration(new PersistentStoreConfiguration().setWalMode(WALMode.LOG_ONLY)); + + ((TcpDiscoverySpi) cfg.getDiscoverySpi()).setIpFinder(ipFinder); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), "db", false)); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), "db", false)); + + super.afterTest(); + } + + /** + * @param name Name. + * @param atomicityMode Atomicity mode. + * @param cacheMode Cache mode. + * @param backups Backups. + * @param grp Group. + * @return Cache configuration. + */ + private CacheConfiguration cacheConfiguration(String name, + CacheAtomicityMode atomicityMode, + CacheMode cacheMode, + int backups, + String grp) { + CacheConfiguration ccfg = new CacheConfiguration(name); + + ccfg.setAtomicityMode(atomicityMode); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setCacheMode(cacheMode); + ccfg.setGroupName(grp); + + ccfg.setAffinity(new RendezvousAffinityFunction(false, 128)); + + if (cacheMode == PARTITIONED) + ccfg.setBackups(backups); + + return ccfg; + } + + /** + * @throws Exception If failed. + */ + public void testAssignmentAfterRestarts() throws Exception { + try { + System.setProperty(IGNITE_PDS_CHECKPOINT_TEST_SKIP_SYNC, "true"); + + final int gridsCnt = 5; + + final int groupsCnt = 2; + + final IgniteEx node = (IgniteEx) startGridsMultiThreaded(gridsCnt); + + final List<CacheConfiguration> cfgs = Arrays.asList( + cacheConfiguration("g1c1", TRANSACTIONAL, PARTITIONED, gridsCnt, "testGrp1"), + cacheConfiguration("g1c2", TRANSACTIONAL, PARTITIONED, gridsCnt, "testGrp1"), + cacheConfiguration("g2c1", TRANSACTIONAL, PARTITIONED, gridsCnt, "testGrp2"), + cacheConfiguration("g2c2", TRANSACTIONAL, PARTITIONED, gridsCnt, "testGrp2")); + + node.getOrCreateCaches(cfgs); + + validateDepIds(groupsCnt); + + stopAllGrids(); + + IgniteEx node2 = (IgniteEx) startGridsMultiThreaded(gridsCnt); + + validateDepIds(groupsCnt); // Deployment ids must be the same on all nodes. + + final int restartIdxFrom = 2; + + final AtomicInteger idx = new AtomicInteger(restartIdxFrom); + + IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + int nodeIdx = idx.getAndIncrement(); + + stopGrid(nodeIdx); + + return null; + } + }, gridsCnt - restartIdxFrom, "stop-node"); + + fut.get(); + + awaitPartitionMapExchange(); + + checkAffinity(); + + idx.set(restartIdxFrom); + + fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + int nodeIdx = idx.getAndIncrement(); + + startGrid(nodeIdx); + + return null; + } + }, gridsCnt - restartIdxFrom, "start-node"); + + fut.get(); + + awaitPartitionMapExchange(); + + AffinityTopologyVersion topVer = node2.context().cache().context().exchange().readyAffinityVersion(); + + log.info("Using version: " + topVer); + + checkAffinity(); + } + finally { + System.clearProperty(IGNITE_PDS_CHECKPOINT_TEST_SKIP_SYNC); + } + } + + /** + * @param grpCnt Group count. + */ + private void validateDepIds(int grpCnt) { + Map<Integer, IgniteUuid> depIds = new HashMap<>(); + + for (Ignite ignite : G.allGrids()) { + final Map<Integer, CacheGroupDescriptor> descMap = ((IgniteEx) ignite).context().cache().cacheGroupDescriptors(); + + for (Map.Entry<Integer, CacheGroupDescriptor> entry : descMap.entrySet()) { + final IgniteUuid u = entry.getValue().deploymentId(); + + final IgniteUuid u0 = depIds.get(entry.getKey()); + + if (u0 == null) + depIds.put(entry.getKey(), u); + else + assertEquals("Descriptors do not match", u0, u); + } + } + + assertEquals(grpCnt + 1, depIds.size()); + } + + /** + * @throws Exception If failed. + */ + private void checkAffinity() throws Exception { + List<Ignite> nodes = G.allGrids(); + + ClusterNode crdNode = null; + + for (Ignite node : nodes) { + ClusterNode locNode = node.cluster().localNode(); + + if (crdNode == null || locNode.order() < crdNode.order()) + crdNode = locNode; + } + + AffinityTopologyVersion topVer = ((IgniteKernal) grid(crdNode)). + context().cache().context().exchange().readyAffinityVersion(); + + Map<String, List<List<ClusterNode>>> affMap = new HashMap<>(); + + for (Ignite node : nodes) { + IgniteKernal node0 = (IgniteKernal) node; + + for (IgniteInternalCache cache : node0.context().cache().caches()) { + List<List<ClusterNode>> aff = affMap.get(cache.name()); + List<List<ClusterNode>> aff0 = cache.context().affinity().assignments(topVer); + + if (aff != null) + assertEquals(aff, aff0); + else + affMap.put(cache.name(), aff0); + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/81499e7a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java index 8a2d6a0..8ad2e38 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java @@ -18,7 +18,6 @@ package org.apache.ignite.testsuites; import junit.framework.TestSuite; -import org.apache.ignite.internal.processors.cache.IgniteOutOfMemoryPropagationTest; import org.apache.ignite.internal.processors.cache.distributed.CacheExchangeMergeTest; import org.apache.ignite.internal.processors.cache.distributed.CachePartitionStateTest; import org.apache.ignite.internal.processors.cache.distributed.GridCachePartitionEvictionDuringReadThroughSelfTest; @@ -26,6 +25,7 @@ import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheThread import org.apache.ignite.internal.processors.cache.distributed.IgniteOptimisticTxSuspendResumeMultiServerTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteOptimisticTxSuspendResumeTest; import org.apache.ignite.internal.processors.cache.distributed.IgnitePessimisticTxSuspendResumeTest; +import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsCacheAssignmentNodeRestartsTest; import org.apache.ignite.internal.processors.cache.transactions.TxRollbackOnTimeoutNearCacheTest; import org.apache.ignite.internal.processors.cache.transactions.TxRollbackOnTimeoutNoDeadlockDetectionTest; import org.apache.ignite.internal.processors.cache.transactions.TxRollbackOnTimeoutTest; @@ -55,6 +55,7 @@ public class IgniteCacheTestSuite6 extends TestSuite { suite.addTestSuite(TxRollbackOnTimeoutNearCacheTest.class); suite.addTestSuite(IgniteCacheThreadLocalTxTest.class); + suite.addTestSuite(IgnitePdsCacheAssignmentNodeRestartsTest.class); // TODO enable this test after IGNITE-6753, now it takes too long // suite.addTestSuite(IgniteOutOfMemoryPropagationTest.class);