ignite-5196 Need to use a concurrent collection for 
GridDiscoveryManager.registeredCaches


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/56768a2b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/56768a2b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/56768a2b

Branch: refs/heads/ignite-5414
Commit: 56768a2b2f50892537c91fa46db6b887660c9932
Parents: 3ab4b9a
Author: Igor Seliverstov <gvvinbl...@gmail.com>
Authored: Wed Jun 7 11:10:47 2017 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Wed Jun 7 11:10:47 2017 +0300

----------------------------------------------------------------------
 .../discovery/GridDiscoveryManager.java         |  32 ++--
 .../handlers/cache/GridCacheCommandHandler.java |   4 +-
 .../top/GridTopologyCommandHandler.java         |  10 +-
 .../ignite/internal/ClusterGroupSelfTest.java   | 100 ++++++++++++
 .../top/CacheTopologyCommandHandlerTest.java    | 152 +++++++++++++++++++
 .../testsuites/IgniteRestHandlerTestSuite.java  |   2 +
 6 files changed, 279 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/56768a2b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 8c2bd8c..c91ff74 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -54,6 +54,7 @@ import org.apache.ignite.cache.CacheMetrics;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cluster.ClusterMetrics;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.ClusterMetricsSnapshot;
@@ -70,6 +71,7 @@ import 
org.apache.ignite.internal.managers.communication.GridIoManager;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor;
+import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.jobmetrics.GridJobMetrics;
 import org.apache.ignite.internal.processors.security.SecurityContext;
@@ -254,7 +256,7 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
     private final Collection<IgniteInClosure<ClusterNode>> locNodeInitLsnrs = 
new ArrayList<>();
 
     /** Map of dynamic cache filters. */
-    private Map<String, CachePredicate> registeredCaches = new HashMap<>();
+    private ConcurrentMap<String, CachePredicate> registeredCaches = new 
ConcurrentHashMap<>();
 
     /** */
     private Map<Integer, CacheGroupAffinity> registeredCacheGrps = new 
HashMap<>();
@@ -353,7 +355,7 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
     }
 
     /**
-     * Adds dynamic cache filter.
+     * Called from discovery thread. Adds dynamic cache filter.
      *
      * @param grpId Cache group ID.
      * @param cacheName Cache name.
@@ -377,7 +379,7 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
     }
 
     /**
-     * Removes dynamic cache filter.
+     * Called from discovery thread. Removes dynamic cache filter.
      *
      * @param cacheName Cache name.
      */
@@ -404,7 +406,7 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
     }
 
     /**
-     * Removes near node ID from cache filter.
+     * Called from discovery thread. Removes near node ID from cache filter.
      *
      * @param cacheName Cache name.
      * @param clientNodeId Near node ID.
@@ -419,6 +421,8 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
     }
 
     /**
+     * Called from discovery thread.
+     *
      * @return Client nodes map.
      */
     public Map<String, Map<UUID, Boolean>> clientNodesMap() {
@@ -439,6 +443,8 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
     }
 
     /**
+     * Called from discovery thread.
+     *
      * @param leftNodeId Left node ID.
      */
     private void updateClientNodes(UUID leftNodeId) {
@@ -1837,17 +1843,17 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
      * @param node Node to check.
      * @return Public cache names accessible on the given node.
      */
-    public Map<String, CacheMode> nodeCaches(ClusterNode node) {
-        Map<String, CacheMode> caches = U.newHashMap(registeredCaches.size());
+    public Map<String, CacheConfiguration> nodePublicCaches(ClusterNode node) {
+        Map<String, CacheConfiguration> caches = 
U.newHashMap(registeredCaches.size());
 
-        for (Map.Entry<String, CachePredicate> entry : 
registeredCaches.entrySet()) {
-            String cacheName = entry.getKey();
+        for (DynamicCacheDescriptor cacheDesc : 
ctx.cache().cacheDescriptors().values()) {
+            if (!cacheDesc.cacheType().userCache())
+                continue;
 
-            CachePredicate pred = entry.getValue();
+            CachePredicate p = registeredCaches.get(cacheDesc.cacheName());
 
-            if (!CU.isSystemCache(cacheName) && !CU.isIgfsCache(ctx.config(), 
cacheName) &&
-                pred != null && pred.cacheNode(node))
-                caches.put(cacheName, pred.aff.cacheMode);
+            if (p != null && p.cacheNode(node))
+                caches.put(cacheDesc.cacheName(), 
cacheDesc.cacheConfiguration());
         }
 
         return caches;
@@ -2059,6 +2065,8 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
     }
 
     /**
+     * Called from discovery thread.
+     *
      * @param loc Local node.
      * @param topSnapshot Topology snapshot.
      * @return Newly created discovery cache.

http://git-wip-us.apache.org/repos/asf/ignite/blob/56768a2b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java
index da260b1..bfc5282 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java
@@ -943,10 +943,10 @@ public class GridCacheCommandHandler extends 
GridRestCommandHandlerAdapter {
 
             boolean sameCaches = true;
 
-            int hash = discovery.nodeCaches(F.first(subgrid)).hashCode();
+            Set<String> caches = 
discovery.nodePublicCaches(F.first(subgrid)).keySet();
 
             for (int i = 1; i < subgrid.size(); i++) {
-                if (hash != discovery.nodeCaches(subgrid.get(i)).hashCode()) {
+                if 
(!caches.equals(discovery.nodePublicCaches(subgrid.get(i)).keySet())) {
                     sameCaches = false;
 
                     break;

http://git-wip-us.apache.org/repos/asf/ignite/blob/56768a2b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/top/GridTopologyCommandHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/top/GridTopologyCommandHandler.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/top/GridTopologyCommandHandler.java
index d9e023d..fa677c7 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/top/GridTopologyCommandHandler.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/top/GridTopologyCommandHandler.java
@@ -27,14 +27,12 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cluster.ClusterMetrics;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.client.GridClientCacheMode;
-import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
 import org.apache.ignite.internal.processors.port.GridPortRecord;
 import org.apache.ignite.internal.processors.rest.GridRestCommand;
 import org.apache.ignite.internal.processors.rest.GridRestProtocol;
@@ -217,14 +215,12 @@ public class GridTopologyCommandHandler extends 
GridRestCommandHandlerAdapter {
         
nodeBean.setTcpAddresses(nonEmptyList(node.<Collection<String>>attribute(ATTR_REST_TCP_ADDRS)));
         
nodeBean.setTcpHostNames(nonEmptyList(node.<Collection<String>>attribute(ATTR_REST_TCP_HOST_NAMES)));
 
-        GridCacheProcessor cacheProc = ctx.cache();
-
-        Map<String, CacheMode> nodeCaches = ctx.discovery().nodeCaches(node);
+        Map<String, CacheConfiguration> nodeCaches = 
ctx.discovery().nodePublicCaches(node);
 
         Collection<GridClientCacheBean> caches = new 
ArrayList<>(nodeCaches.size());
 
-        for (String cacheName : nodeCaches.keySet())
-            
caches.add(createCacheBean(cacheProc.cacheConfiguration(cacheName)));
+        for (CacheConfiguration ccfg : nodeCaches.values())
+            caches.add(createCacheBean(ccfg));
 
         nodeBean.setCaches(caches);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/56768a2b/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupSelfTest.java
index 142626b..9df561a 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupSelfTest.java
@@ -19,14 +19,21 @@ package org.apache.ignite.internal;
 
 import java.util.Collection;
 import java.util.LinkedList;
+import java.util.Random;
+import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.Ignition;
 import org.apache.ignite.cluster.ClusterGroup;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonTest;
 
 /**
@@ -274,6 +281,99 @@ public class ClusterGroupSelfTest extends 
ClusterGroupAbstractTest {
     /**
      * @throws Exception If failed.
      */
+    public void testForCacheNodesOnDynamicCacheCreateDestroy() throws 
Exception {
+        Random rnd = ThreadLocalRandom.current();
+
+        final AtomicReference<Exception> ex = new AtomicReference<>();
+
+        IgniteInternalFuture fut = runCacheCreateDestroyTask(ex);
+
+        while (!fut.isDone())
+            ignite.cluster().forCacheNodes("cache" + rnd.nextInt(16)).nodes();
+
+        if (ex.get() != null)
+            throw ex.get();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testForClientNodesOnDynamicCacheCreateDestroy() throws 
Exception {
+        Random rnd = ThreadLocalRandom.current();
+
+        final AtomicReference<Exception> ex = new AtomicReference<>();
+
+        IgniteInternalFuture fut = runCacheCreateDestroyTask(ex);
+
+        while (!fut.isDone())
+            ignite.cluster().forClientNodes("cache" + rnd.nextInt(16)).nodes();
+
+        if (ex.get() != null)
+            throw ex.get();
+    }
+
+    /**
+     * @param exHldr Exception holder.
+     * @return Task future.
+     */
+    private IgniteInternalFuture runCacheCreateDestroyTask(final 
AtomicReference<Exception> exHldr) {
+        final long deadline = System.currentTimeMillis() + 5000;
+
+        final AtomicInteger cntr = new AtomicInteger();
+
+        return GridTestUtils.runMultiThreadedAsync(new Runnable() {
+            @Override public void run() {
+                int startIdx = cntr.getAndAdd(4);
+                int idx = 0;
+                boolean start = true;
+
+                Set<String> caches = U.newHashSet(4);
+
+                while (System.currentTimeMillis() < deadline) {
+                    try {
+                        if (start) {
+                            caches.add("cache" + (startIdx + idx));
+                            ignite.createCache("cache" + (startIdx + idx));
+                        }
+                        else {
+                            ignite.destroyCache("cache" + (startIdx + idx));
+                            caches.remove("cache" + (startIdx + idx));
+                        }
+
+                        if ((idx = (idx + 1) % 4) == 0)
+                            start = !start;
+                    }
+                    catch (Exception e) {
+                        addException(exHldr, e);
+
+                        break;
+                    }
+                }
+
+                for (String cache : caches) {
+                    try {
+                        ignite.destroyCache(cache);
+                    }
+                    catch (Exception e) {
+                        addException(exHldr, e);
+                    }
+                }
+            }
+        }, 4, "cache-start-destroy");
+    }
+
+    /**
+     * @param exHldr Exception holder.
+     * @param ex Exception.
+     */
+    private void addException(AtomicReference<Exception> exHldr, Exception ex) 
{
+        if (exHldr.get() != null || !exHldr.compareAndSet(null, ex))
+            exHldr.get().addSuppressed(ex);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testEmptyGroup() throws Exception {
         ClusterGroup emptyGrp = ignite.cluster().forAttribute("nonExistent", 
"val");
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/56768a2b/modules/core/src/test/java/org/apache/ignite/internal/processors/rest/handlers/top/CacheTopologyCommandHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/rest/handlers/top/CacheTopologyCommandHandlerTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/rest/handlers/top/CacheTopologyCommandHandlerTest.java
new file mode 100644
index 0000000..5d7dfd7
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/rest/handlers/top/CacheTopologyCommandHandlerTest.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.rest.handlers.top;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.ConnectorConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.processors.rest.GridRestCommand;
+import 
org.apache.ignite.internal.processors.rest.request.GridRestTopologyRequest;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ *
+ */
+public class CacheTopologyCommandHandlerTest extends GridCommonAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration() throws 
Exception {
+        // Discovery config.
+        TcpDiscoverySpi disco = new TcpDiscoverySpi()
+            .setIpFinder(new TcpDiscoveryVmIpFinder(true))
+            .setJoinTimeout(5000);
+
+        // Cache config.
+        CacheConfiguration ccfg = new CacheConfiguration("cache*")
+            .setCacheMode(CacheMode.LOCAL)
+            .setAtomicityMode(CacheAtomicityMode.ATOMIC);
+
+        ConnectorConfiguration clnCfg = new ConnectorConfiguration()
+            .setHost("127.0.0.1");
+
+        return super.getConfiguration()
+            .setLocalHost("127.0.0.1")
+            .setConnectorConfiguration(clnCfg)
+            .setDiscoverySpi(disco)
+            .setCacheConfiguration(ccfg);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTopologyCommandOnDynamicCacheCreateDestroy() throws 
Exception {
+        GridRestTopologyRequest req = new GridRestTopologyRequest();
+        req.command(GridRestCommand.TOPOLOGY);
+
+        topologyCommandOnDynamicCacheCreateDestroy(startGrid(), req);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNodeCommandOnDynamicCacheCreateDestroy1() throws Exception 
{
+        Ignite node = startGrid();
+
+        GridRestTopologyRequest req = new GridRestTopologyRequest();
+        req.command(GridRestCommand.NODE);
+        req.nodeId(node.cluster().localNode().id());
+
+        topologyCommandOnDynamicCacheCreateDestroy(node, req);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNodeCommandOnDynamicCacheCreateDestroy2() throws Exception 
{
+        Ignite node = startGrid();
+
+        GridRestTopologyRequest req = new GridRestTopologyRequest();
+        req.command(GridRestCommand.NODE);
+        req.nodeIp("127.0.0.1");
+
+        topologyCommandOnDynamicCacheCreateDestroy(node, req);
+    }
+
+    /**
+     * @param node Ignite node.
+     * @param req Rest request.
+     * @throws Exception If failed.
+     */
+    private void topologyCommandOnDynamicCacheCreateDestroy(final Ignite node, 
GridRestTopologyRequest req) throws Exception {
+        GridTopologyCommandHandler hnd = new 
GridTopologyCommandHandler(((IgniteKernal)node).context());
+
+        final AtomicReference<Exception> ex = new AtomicReference<>();
+
+        final long deadline = System.currentTimeMillis() + 5000;
+
+        final AtomicInteger cntr = new AtomicInteger();
+
+        IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(new 
Runnable() {
+            @Override public void run() {
+                int startIdx = cntr.getAndAdd(4);
+                int idx = 0;
+                boolean start = true;
+
+                while (System.currentTimeMillis() < deadline) {
+                    try {
+                        if (start)
+                            node.createCache("cache" + (startIdx + idx));
+                        else
+                            node.destroyCache("cache" + (startIdx + idx));
+
+                        if ((idx = (idx + 1) % 4) == 0)
+                            start = !start;
+                    }
+                    catch (Exception e) {
+                        if (ex.get() != null || !ex.compareAndSet(null, e))
+                            ex.get().addSuppressed(e);
+
+                        break;
+                    }
+                }
+            }
+        }, 4, "cache-start-destroy");
+
+        while (!fut.isDone())
+            hnd.handleAsync(req).get();
+
+        if (ex.get() != null)
+            throw ex.get();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/56768a2b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteRestHandlerTestSuite.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteRestHandlerTestSuite.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteRestHandlerTestSuite.java
index 6263e8b..f3e5828 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteRestHandlerTestSuite.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteRestHandlerTestSuite.java
@@ -22,6 +22,7 @@ import 
org.apache.ignite.internal.processors.rest.handlers.cache.GridCacheAtomic
 import 
org.apache.ignite.internal.processors.rest.handlers.cache.GridCacheCommandHandlerSelfTest;
 import 
org.apache.ignite.internal.processors.rest.handlers.log.GridLogCommandHandlerTest;
 import 
org.apache.ignite.internal.processors.rest.handlers.query.GridQueryCommandHandlerTest;
+import 
org.apache.ignite.internal.processors.rest.handlers.top.CacheTopologyCommandHandlerTest;
 
 /**
  * REST support tests.
@@ -38,6 +39,7 @@ public class IgniteRestHandlerTestSuite extends TestSuite {
         suite.addTestSuite(GridCacheAtomicCommandHandlerSelfTest.class);
         suite.addTestSuite(GridLogCommandHandlerTest.class);
         suite.addTestSuite(GridQueryCommandHandlerTest.class);
+        suite.addTestSuite(CacheTopologyCommandHandlerTest.class);
 
         return suite;
     }

Reply via email to