ignite-1334 Fixed concurrent destroyCache/node stop. Check initFut result in GridDhtPartitionsExchangeFuture.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/afce2699 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/afce2699 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/afce2699 Branch: refs/heads/ignite-1093-2 Commit: afce2699644c8af1e50eb5ef595ed299734c68e5 Parents: 2814d0e Author: sboikov <sboi...@gridgain.com> Authored: Tue Sep 1 08:59:01 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Tue Sep 1 08:59:01 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheProcessor.java | 27 ++++-- .../GridDhtPartitionsExchangeFuture.java | 94 +++++++++++-------- .../cache/IgniteDynamicCacheAndNodeStop.java | 95 ++++++++++++++++++++ .../testsuites/IgniteCacheTestSuite2.java | 3 +- 4 files changed, 175 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/afce2699/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index bf203b8..c5f8168 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -156,6 +156,9 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** */ private final Map<String, GridCacheAdapter<?, ?>> caches; + /** Caches stopped from onKernalStop callback. */ + private final Map<String, GridCacheAdapter> stoppedCaches = new ConcurrentHashMap<>(); + /** Map of proxies. */ private final Map<String, IgniteCacheProxy<?, ?>> jCacheProxies; @@ -893,14 +896,16 @@ public class GridCacheProcessor extends GridProcessorAdapter { @SuppressWarnings("unchecked") @Override public void stop(boolean cancel) throws IgniteCheckedException { for (String cacheName : stopSeq) { - GridCacheAdapter<?, ?> cache = caches.remove(maskNull(cacheName)); + GridCacheAdapter<?, ?> cache = stoppedCaches.remove(maskNull(cacheName)); if (cache != null) stopCache(cache, cancel); } - for (GridCacheAdapter<?, ?> cache : caches.values()) - stopCache(cache, cancel); + for (GridCacheAdapter<?, ?> cache : stoppedCaches.values()) { + if (cache == stoppedCaches.remove(maskNull(cache.name()))) + stopCache(cache, cancel); + } List<? extends GridCacheSharedManager<?, ?>> mgrs = sharedCtx.managers(); @@ -932,15 +937,23 @@ public class GridCacheProcessor extends GridProcessorAdapter { cacheStartedLatch.countDown(); for (String cacheName : stopSeq) { - GridCacheAdapter<?, ?> cache = caches.get(maskNull(cacheName)); + GridCacheAdapter<?, ?> cache = caches.remove(maskNull(cacheName)); + + if (cache != null) { + stoppedCaches.put(maskNull(cacheName), cache); - if (cache != null) onKernalStop(cache, cancel); + } } for (Map.Entry<String, GridCacheAdapter<?, ?>> entry : caches.entrySet()) { - if (!stopSeq.contains(entry.getKey())) + GridCacheAdapter<?, ?> cache = entry.getValue(); + + if (cache == caches.remove(entry.getKey())) { + stoppedCaches.put(entry.getKey(), cache); + onKernalStop(entry.getValue(), cancel); + } } List<? extends GridCacheSharedManager<?, ?>> sharedMgrs = sharedCtx.managers(); @@ -3457,4 +3470,4 @@ public class GridCacheProcessor extends GridProcessorAdapter { // No-op. } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/afce2699/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 414a152..865bbdc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -1146,52 +1146,54 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT } else { initFut.listen(new CI1<IgniteInternalFuture<Boolean>>() { - @Override public void apply(IgniteInternalFuture<Boolean> t) { + @Override public void apply(IgniteInternalFuture<Boolean> f) { try { - if (!t.get()) // Just to check if there was an error. + if (!f.get()) return; + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to initialize exchange future: " + this, e); - ClusterNode loc = cctx.localNode(); + return; + } - singleMsgs.put(nodeId, msg); + ClusterNode loc = cctx.localNode(); - boolean match = true; + singleMsgs.put(nodeId, msg); - // Check if oldest node has changed. - if (!oldestNode.get().equals(loc)) { - match = false; + boolean match = true; - synchronized (mux) { - // Double check. - if (oldestNode.get().equals(loc)) - match = true; - } + // Check if oldest node has changed. + if (!oldestNode.get().equals(loc)) { + match = false; + + synchronized (mux) { + // Double check. + if (oldestNode.get().equals(loc)) + match = true; } + } - if (match) { - boolean allReceived; + if (match) { + boolean allReceived; - synchronized (rcvdIds) { - if (rcvdIds.add(nodeId)) - updatePartitionSingleMap(msg); + synchronized (rcvdIds) { + if (rcvdIds.add(nodeId)) + updatePartitionSingleMap(msg); - allReceived = allReceived(); - } + allReceived = allReceived(); + } - // If got all replies, and initialization finished, and reply has not been sent yet. - if (allReceived && ready.get() && replied.compareAndSet(false, true)) { - spreadPartitions(); + // If got all replies, and initialization finished, and reply has not been sent yet. + if (allReceived && ready.get() && replied.compareAndSet(false, true)) { + spreadPartitions(); - onDone(exchId.topologyVersion()); - } - else if (log.isDebugEnabled()) - log.debug("Exchange future full map is not sent [allReceived=" + allReceived() + - ", ready=" + ready + ", replied=" + replied.get() + ", init=" + init.get() + - ", fut=" + this + ']'); + onDone(exchId.topologyVersion()); } - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to initialize exchange future: " + this, e); + else if (log.isDebugEnabled()) + log.debug("Exchange future full map is not sent [allReceived=" + allReceived() + + ", ready=" + ready + ", replied=" + replied.get() + ", init=" + init.get() + + ", fut=" + this + ']'); } } }); @@ -1254,7 +1256,17 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT assert exchId.topologyVersion().equals(msg.topologyVersion()); initFut.listen(new CI1<IgniteInternalFuture<Boolean>>() { - @Override public void apply(IgniteInternalFuture<Boolean> t) { + @Override public void apply(IgniteInternalFuture<Boolean> f) { + try { + if (!f.get()) + return; + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to initialize exchange future: " + this, e); + + return; + } + ClusterNode curOldest = oldestNode.get(); if (!nodeId.equals(curOldest.id())) { @@ -1343,8 +1355,18 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT try { // Wait for initialization part of this future to complete. - initFut.listen(new CI1<IgniteInternalFuture<?>>() { - @Override public void apply(IgniteInternalFuture<?> f) { + initFut.listen(new CI1<IgniteInternalFuture<Boolean>>() { + @Override public void apply(IgniteInternalFuture<Boolean> f) { + try { + if (!f.get()) + return; + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to initialize exchange future: " + this, e); + + return; + } + if (isDone()) return; @@ -1571,4 +1593,4 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT "remaining", remaining(), "super", super.toString()); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/afce2699/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheAndNodeStop.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheAndNodeStop.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheAndNodeStop.java new file mode 100644 index 0000000..a389e1f --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheAndNodeStop.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.util.concurrent.*; + +/** + * + */ +public class IgniteDynamicCacheAndNodeStop extends GridCommonAbstractTest { + /** */ + private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testCacheAndNodeStop() throws Exception { + final Ignite ignite = startGrid(0); + + for (int i = 0; i < 3; i++) { + log.info("Iteration: " + i); + + startGrid(1); + + final CacheConfiguration ccfg = new CacheConfiguration(); + + ignite.createCache(ccfg); + + final CyclicBarrier barrier = new CyclicBarrier(2); + + IgniteInternalFuture<?> fut1 = GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + barrier.await(); + + ignite.destroyCache(null); + + return null; + } + }); + + IgniteInternalFuture<?> fut2 = GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + barrier.await(); + + stopGrid(1); + + return null; + } + }); + + fut1.get(); + fut2.get(); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/afce2699/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java index e903115..9b9bbba 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java @@ -238,7 +238,8 @@ public class IgniteCacheTestSuite2 extends TestSuite { suite.addTest(new TestSuite(IgniteAtomicCacheEntryProcessorNodeJoinTest.class)); suite.addTest(new TestSuite(GridCacheNearTxForceKeyTest.class)); suite.addTest(new TestSuite(CrossCacheTxRandomOperationsTest.class)); + suite.addTest(new TestSuite(IgniteDynamicCacheAndNodeStop.class)); return suite; } -} \ No newline at end of file +}