ignite-5196 Need use concurrent collection for GridDiscoveryManager.registeredCaches
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/56768a2b Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/56768a2b Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/56768a2b Branch: refs/heads/ignite-5414 Commit: 56768a2b2f50892537c91fa46db6b887660c9932 Parents: 3ab4b9a Author: Igor Seliverstov <gvvinbl...@gmail.com> Authored: Wed Jun 7 11:10:47 2017 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Wed Jun 7 11:10:47 2017 +0300 ---------------------------------------------------------------------- .../discovery/GridDiscoveryManager.java | 32 ++-- .../handlers/cache/GridCacheCommandHandler.java | 4 +- .../top/GridTopologyCommandHandler.java | 10 +- .../ignite/internal/ClusterGroupSelfTest.java | 100 ++++++++++++ .../top/CacheTopologyCommandHandlerTest.java | 152 +++++++++++++++++++ .../testsuites/IgniteRestHandlerTestSuite.java | 2 + 6 files changed, 279 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/56768a2b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index 8c2bd8c..c91ff74 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -54,6 +54,7 @@ import org.apache.ignite.cache.CacheMetrics; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cluster.ClusterMetrics; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.events.Event; import org.apache.ignite.internal.ClusterMetricsSnapshot; @@ -70,6 +71,7 @@ import org.apache.ignite.internal.managers.communication.GridIoManager; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor; +import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.jobmetrics.GridJobMetrics; import org.apache.ignite.internal.processors.security.SecurityContext; @@ -254,7 +256,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { private final Collection<IgniteInClosure<ClusterNode>> locNodeInitLsnrs = new ArrayList<>(); /** Map of dynamic cache filters. */ - private Map<String, CachePredicate> registeredCaches = new HashMap<>(); + private ConcurrentMap<String, CachePredicate> registeredCaches = new ConcurrentHashMap<>(); /** */ private Map<Integer, CacheGroupAffinity> registeredCacheGrps = new HashMap<>(); @@ -353,7 +355,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } /** - * Adds dynamic cache filter. + * Called from discovery thread. Adds dynamic cache filter. * * @param grpId Cache group ID. * @param cacheName Cache name. @@ -377,7 +379,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } /** - * Removes dynamic cache filter. + * Called from discovery thread. Removes dynamic cache filter. * * @param cacheName Cache name. */ @@ -404,7 +406,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } /** - * Removes near node ID from cache filter. + * Called from discovery thread. Removes near node ID from cache filter. * * @param cacheName Cache name. * @param clientNodeId Near node ID. @@ -419,6 +421,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } /** + * Called from discovery thread. + * * @return Client nodes map. */ public Map<String, Map<UUID, Boolean>> clientNodesMap() { @@ -439,6 +443,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } /** + * Called from discovery thread. + * * @param leftNodeId Left node ID. */ private void updateClientNodes(UUID leftNodeId) { @@ -1837,17 +1843,17 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * @param node Node to check. * @return Public cache names accessible on the given node. */ - public Map<String, CacheMode> nodeCaches(ClusterNode node) { - Map<String, CacheMode> caches = U.newHashMap(registeredCaches.size()); + public Map<String, CacheConfiguration> nodePublicCaches(ClusterNode node) { + Map<String, CacheConfiguration> caches = U.newHashMap(registeredCaches.size()); - for (Map.Entry<String, CachePredicate> entry : registeredCaches.entrySet()) { - String cacheName = entry.getKey(); + for (DynamicCacheDescriptor cacheDesc : ctx.cache().cacheDescriptors().values()) { + if (!cacheDesc.cacheType().userCache()) + continue; - CachePredicate pred = entry.getValue(); + CachePredicate p = registeredCaches.get(cacheDesc.cacheName()); - if (!CU.isSystemCache(cacheName) && !CU.isIgfsCache(ctx.config(), cacheName) && - pred != null && pred.cacheNode(node)) - caches.put(cacheName, pred.aff.cacheMode); + if (p != null && p.cacheNode(node)) + caches.put(cacheDesc.cacheName(), cacheDesc.cacheConfiguration()); } return caches; @@ -2059,6 +2065,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } /** + * Called from discovery thread. + * * @param loc Local node. * @param topSnapshot Topology snapshot. * @return Newly created discovery cache. http://git-wip-us.apache.org/repos/asf/ignite/blob/56768a2b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java index da260b1..bfc5282 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java @@ -943,10 +943,10 @@ public class GridCacheCommandHandler extends GridRestCommandHandlerAdapter { boolean sameCaches = true; - int hash = discovery.nodeCaches(F.first(subgrid)).hashCode(); + Set<String> caches = discovery.nodePublicCaches(F.first(subgrid)).keySet(); for (int i = 1; i < subgrid.size(); i++) { - if (hash != discovery.nodeCaches(subgrid.get(i)).hashCode()) { + if (!caches.equals(discovery.nodePublicCaches(subgrid.get(i)).keySet())) { sameCaches = false; break; http://git-wip-us.apache.org/repos/asf/ignite/blob/56768a2b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/top/GridTopologyCommandHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/top/GridTopologyCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/top/GridTopologyCommandHandler.java index d9e023d..fa677c7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/top/GridTopologyCommandHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/top/GridTopologyCommandHandler.java @@ -27,14 +27,12 @@ import java.util.Iterator; import java.util.Map; import java.util.UUID; import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cluster.ClusterMetrics; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.client.GridClientCacheMode; -import org.apache.ignite.internal.processors.cache.GridCacheProcessor; import org.apache.ignite.internal.processors.port.GridPortRecord; import org.apache.ignite.internal.processors.rest.GridRestCommand; import org.apache.ignite.internal.processors.rest.GridRestProtocol; @@ -217,14 +215,12 @@ public class GridTopologyCommandHandler extends GridRestCommandHandlerAdapter { nodeBean.setTcpAddresses(nonEmptyList(node.<Collection<String>>attribute(ATTR_REST_TCP_ADDRS))); nodeBean.setTcpHostNames(nonEmptyList(node.<Collection<String>>attribute(ATTR_REST_TCP_HOST_NAMES))); - GridCacheProcessor cacheProc = ctx.cache(); - - Map<String, CacheMode> nodeCaches = ctx.discovery().nodeCaches(node); + Map<String, CacheConfiguration> nodeCaches = ctx.discovery().nodePublicCaches(node); Collection<GridClientCacheBean> caches = new ArrayList<>(nodeCaches.size()); - for (String cacheName : nodeCaches.keySet()) - caches.add(createCacheBean(cacheProc.cacheConfiguration(cacheName))); + for (CacheConfiguration ccfg : nodeCaches.values()) + caches.add(createCacheBean(ccfg)); nodeBean.setCaches(caches); http://git-wip-us.apache.org/repos/asf/ignite/blob/56768a2b/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupSelfTest.java index 142626b..9df561a 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupSelfTest.java @@ -19,14 +19,21 @@ package org.apache.ignite.internal; import java.util.Collection; import java.util.LinkedList; +import java.util.Random; +import java.util.Set; import java.util.UUID; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.Ignite; import org.apache.ignite.Ignition; import org.apache.ignite.cluster.ClusterGroup; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.marshaller.Marshaller; +import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonTest; /** @@ -274,6 +281,99 @@ public class ClusterGroupSelfTest extends ClusterGroupAbstractTest { /** * @throws Exception If failed. */ + public void testForCacheNodesOnDynamicCacheCreateDestroy() throws Exception { + Random rnd = ThreadLocalRandom.current(); + + final AtomicReference<Exception> ex = new AtomicReference<>(); + + IgniteInternalFuture fut = runCacheCreateDestroyTask(ex); + + while (!fut.isDone()) + ignite.cluster().forCacheNodes("cache" + rnd.nextInt(16)).nodes(); + + if (ex.get() != null) + throw ex.get(); + } + + /** + * @throws Exception If failed. + */ + public void testForClientNodesOnDynamicCacheCreateDestroy() throws Exception { + Random rnd = ThreadLocalRandom.current(); + + final AtomicReference<Exception> ex = new AtomicReference<>(); + + IgniteInternalFuture fut = runCacheCreateDestroyTask(ex); + + while (!fut.isDone()) + ignite.cluster().forClientNodes("cache" + rnd.nextInt(16)).nodes(); + + if (ex.get() != null) + throw ex.get(); + } + + /** + * @param exHldr Exception holder. + * @return Task future. + */ + private IgniteInternalFuture runCacheCreateDestroyTask(final AtomicReference<Exception> exHldr) { + final long deadline = System.currentTimeMillis() + 5000; + + final AtomicInteger cntr = new AtomicInteger(); + + return GridTestUtils.runMultiThreadedAsync(new Runnable() { + @Override public void run() { + int startIdx = cntr.getAndAdd(4); + int idx = 0; + boolean start = true; + + Set<String> caches = U.newHashSet(4); + + while (System.currentTimeMillis() < deadline) { + try { + if (start) { + caches.add("cache" + (startIdx + idx)); + ignite.createCache("cache" + (startIdx + idx)); + } + else { + ignite.destroyCache("cache" + (startIdx + idx)); + caches.remove("cache" + (startIdx + idx)); + } + + if ((idx = (idx + 1) % 4) == 0) + start = !start; + } + catch (Exception e) { + addException(exHldr, e); + + break; + } + } + + for (String cache : caches) { + try { + ignite.destroyCache(cache); + } + catch (Exception e) { + addException(exHldr, e); + } + } + } + }, 4, "cache-start-destroy"); + } + + /** + * @param exHldr Exception holder. + * @param ex Exception. + */ + private void addException(AtomicReference<Exception> exHldr, Exception ex) { + if (exHldr.get() != null || !exHldr.compareAndSet(null, ex)) + exHldr.get().addSuppressed(ex); + } + + /** + * @throws Exception If failed. + */ public void testEmptyGroup() throws Exception { ClusterGroup emptyGrp = ignite.cluster().forAttribute("nonExistent", "val"); http://git-wip-us.apache.org/repos/asf/ignite/blob/56768a2b/modules/core/src/test/java/org/apache/ignite/internal/processors/rest/handlers/top/CacheTopologyCommandHandlerTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/rest/handlers/top/CacheTopologyCommandHandlerTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/rest/handlers/top/CacheTopologyCommandHandlerTest.java new file mode 100644 index 0000000..5d7dfd7 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/rest/handlers/top/CacheTopologyCommandHandlerTest.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.rest.handlers.top; + +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.ignite.Ignite; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.ConnectorConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.IgniteKernal; +import org.apache.ignite.internal.processors.rest.GridRestCommand; +import org.apache.ignite.internal.processors.rest.request.GridRestTopologyRequest; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** + * + */ +public class CacheTopologyCommandHandlerTest extends GridCommonAbstractTest { + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration() throws Exception { + // Discovery config. + TcpDiscoverySpi disco = new TcpDiscoverySpi() + .setIpFinder(new TcpDiscoveryVmIpFinder(true)) + .setJoinTimeout(5000); + + // Cache config. + CacheConfiguration ccfg = new CacheConfiguration("cache*") + .setCacheMode(CacheMode.LOCAL) + .setAtomicityMode(CacheAtomicityMode.ATOMIC); + + ConnectorConfiguration clnCfg = new ConnectorConfiguration() + .setHost("127.0.0.1"); + + return super.getConfiguration() + .setLocalHost("127.0.0.1") + .setConnectorConfiguration(clnCfg) + .setDiscoverySpi(disco) + .setCacheConfiguration(ccfg); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testTopologyCommandOnDynamicCacheCreateDestroy() throws Exception { + GridRestTopologyRequest req = new GridRestTopologyRequest(); + req.command(GridRestCommand.TOPOLOGY); + + topologyCommandOnDynamicCacheCreateDestroy(startGrid(), req); + } + + /** + * @throws Exception If failed. + */ + public void testNodeCommandOnDynamicCacheCreateDestroy1() throws Exception { + Ignite node = startGrid(); + + GridRestTopologyRequest req = new GridRestTopologyRequest(); + req.command(GridRestCommand.NODE); + req.nodeId(node.cluster().localNode().id()); + + topologyCommandOnDynamicCacheCreateDestroy(node, req); + } + + /** + * @throws Exception If failed. + */ + public void testNodeCommandOnDynamicCacheCreateDestroy2() throws Exception { + Ignite node = startGrid(); + + GridRestTopologyRequest req = new GridRestTopologyRequest(); + req.command(GridRestCommand.NODE); + req.nodeIp("127.0.0.1"); + + topologyCommandOnDynamicCacheCreateDestroy(node, req); + } + + /** + * @param node Ignite node. + * @param req Rest request. + * @throws Exception If failed. + */ + private void topologyCommandOnDynamicCacheCreateDestroy(final Ignite node, GridRestTopologyRequest req) throws Exception { + GridTopologyCommandHandler hnd = new GridTopologyCommandHandler(((IgniteKernal)node).context()); + + final AtomicReference<Exception> ex = new AtomicReference<>(); + + final long deadline = System.currentTimeMillis() + 5000; + + final AtomicInteger cntr = new AtomicInteger(); + + IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(new Runnable() { + @Override public void run() { + int startIdx = cntr.getAndAdd(4); + int idx = 0; + boolean start = true; + + while (System.currentTimeMillis() < deadline) { + try { + if (start) + node.createCache("cache" + (startIdx + idx)); + else + node.destroyCache("cache" + (startIdx + idx)); + + if ((idx = (idx + 1) % 4) == 0) + start = !start; + } + catch (Exception e) { + if (ex.get() != null || !ex.compareAndSet(null, e)) + ex.get().addSuppressed(e); + + break; + } + } + } + }, 4, "cache-start-destroy"); + + while (!fut.isDone()) + hnd.handleAsync(req).get(); + + if (ex.get() != null) + throw ex.get(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/56768a2b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteRestHandlerTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteRestHandlerTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteRestHandlerTestSuite.java index 6263e8b..f3e5828 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteRestHandlerTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteRestHandlerTestSuite.java @@ -22,6 +22,7 @@ import org.apache.ignite.internal.processors.rest.handlers.cache.GridCacheAtomic import org.apache.ignite.internal.processors.rest.handlers.cache.GridCacheCommandHandlerSelfTest; import org.apache.ignite.internal.processors.rest.handlers.log.GridLogCommandHandlerTest; import org.apache.ignite.internal.processors.rest.handlers.query.GridQueryCommandHandlerTest; +import org.apache.ignite.internal.processors.rest.handlers.top.CacheTopologyCommandHandlerTest; /** * REST support tests. @@ -38,6 +39,7 @@ public class IgniteRestHandlerTestSuite extends TestSuite { suite.addTestSuite(GridCacheAtomicCommandHandlerSelfTest.class); suite.addTestSuite(GridLogCommandHandlerTest.class); suite.addTestSuite(GridQueryCommandHandlerTest.class); + suite.addTestSuite(CacheTopologyCommandHandlerTest.class); return suite; }