Repository: incubator-geode Updated Branches: refs/heads/develop a20a6ba09 -> 02d962c20
GEODE-1971: fix shutDownAll hang. Changed isShutDownAll to an AtomicBoolean, and removed the cache (GemFireCacheImpl.class) sync from addPartitionedRegion and requiresNotificationFromPR. Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/02d962c2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/02d962c2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/02d962c2 Branch: refs/heads/develop Commit: 02d962c20830e2d35347af3cc718eb3ccb3a40f4 Parents: a20a6ba Author: Darrel Schneider <dschnei...@pivotal.io> Authored: Tue Oct 11 15:04:43 2016 -0700 Committer: Darrel Schneider <dschnei...@pivotal.io> Committed: Mon Nov 7 13:43:39 2016 -0800 ---------------------------------------------------------------------- .../geode/internal/cache/GemFireCacheImpl.java | 159 +++++++++---------- .../cache/partitioned/ShutdownAllDUnitTest.java | 57 +++++++ 2 files changed, 135 insertions(+), 81 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/02d962c2/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java index ba4f1f4..e1b2007 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java @@ -213,6 +213,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import 
java.util.concurrent.ExecutorService; @@ -543,7 +544,8 @@ public class GemFireCacheImpl private final Object clientMetaDatServiceLock = new Object(); - private volatile boolean isShutDownAll = false; + private final AtomicBoolean isShutDownAll = new AtomicBoolean(); + private final CountDownLatch shutDownAllFinished = new CountDownLatch(1); private final ResourceAdvisor resourceAdvisor; private final JmxManagerAdvisor jmxAdvisor; @@ -664,7 +666,7 @@ public class GemFireCacheImpl sb.append("GemFireCache["); sb.append("id = " + System.identityHashCode(this)); sb.append("; isClosing = " + this.isClosing); - sb.append("; isShutDownAll = " + this.isShutDownAll); + sb.append("; isShutDownAll = " + isCacheAtShutdownAll()); sb.append("; created = " + this.creationDate); sb.append("; server = " + this.isServer); sb.append("; copyOnRead = " + this.copyOnRead); @@ -874,7 +876,7 @@ public class GemFireCacheImpl this.cqService = CqServiceProvider.create(this); - initReliableMessageQueueFactory(); + this.rmqFactory = new ReliableMessageQueueFactoryImpl(); // Create the CacheStatistics this.cachePerfStats = new CachePerfStats(system); @@ -1735,7 +1737,7 @@ public class GemFireCacheImpl } public boolean isCacheAtShutdownAll() { - return isShutDownAll; + return isShutDownAll.get(); } /** @@ -1751,18 +1753,7 @@ public class GemFireCacheImpl } } - public synchronized void shutDownAll() { - boolean testIGE = Boolean.getBoolean("TestInternalGemFireError"); - - if (testIGE) { - InternalGemFireError assErr = new InternalGemFireError( - LocalizedStrings.GemFireCache_UNEXPECTED_EXCEPTION.toLocalizedString()); - throw assErr; - } - if (isCacheAtShutdownAll()) { - // it's already doing shutdown by another thread - return; - } + public void shutDownAll() { if (LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER) { try { CacheObserverHolder.getInstance().beforeShutdownAll(); @@ -1770,39 +1761,63 @@ public class GemFireCacheImpl LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = false; } } - 
this.isShutDownAll = true; - - // bug 44031 requires multithread shutdownall should be grouped - // by root region. However, shutDownAllDuringRecovery.conf test revealed that - // we have to close colocated child regions first. - // Now check all the PR, if anyone has colocate-with attribute, sort all the - // PRs by colocation relationship and close them sequentially, otherwise still - // group them by root region. - TreeMap<String, Map<String, PartitionedRegion>> prTrees = getPRTrees(); - if (prTrees.size() > 1 && shutdownAllPoolSize != 1) { - ExecutorService es = getShutdownAllExecutorService(prTrees.size()); - for (final Map<String, PartitionedRegion> prSubMap : prTrees.values()) { - es.execute(new Runnable() { - public void run() { - ConnectionTable.threadWantsSharedResources(); - shutdownSubTreeGracefully(prSubMap); - } - }); - } // for each root - es.shutdown(); + if (!this.isShutDownAll.compareAndSet(false, true)) { + // it's already doing shutdown by another thread try { - es.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); + this.shutDownAllFinished.await(); } catch (InterruptedException e) { - logger.debug("Shutdown all interrupted while waiting for PRs to be shutdown gracefully."); + logger.debug( + "Shutdown all interrupted while waiting for another thread to do the shutDownAll"); + Thread.currentThread().interrupt(); } + return; + } + synchronized (GemFireCacheImpl.class) { + try { + boolean testIGE = Boolean.getBoolean("TestInternalGemFireError"); - } else { - for (final Map<String, PartitionedRegion> prSubMap : prTrees.values()) { - shutdownSubTreeGracefully(prSubMap); + if (testIGE) { + InternalGemFireError assErr = new InternalGemFireError( + LocalizedStrings.GemFireCache_UNEXPECTED_EXCEPTION.toLocalizedString()); + throw assErr; + } + + // bug 44031 requires multithread shutdownall should be grouped + // by root region. However, shutDownAllDuringRecovery.conf test revealed that + // we have to close colocated child regions first. 
+ // Now check all the PR, if anyone has colocate-with attribute, sort all the + // PRs by colocation relationship and close them sequentially, otherwise still + // group them by root region. + TreeMap<String, Map<String, PartitionedRegion>> prTrees = getPRTrees(); + if (prTrees.size() > 1 && shutdownAllPoolSize != 1) { + ExecutorService es = getShutdownAllExecutorService(prTrees.size()); + for (final Map<String, PartitionedRegion> prSubMap : prTrees.values()) { + es.execute(new Runnable() { + public void run() { + ConnectionTable.threadWantsSharedResources(); + shutdownSubTreeGracefully(prSubMap); + } + }); + } // for each root + es.shutdown(); + try { + es.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); + } catch (InterruptedException e) { + logger + .debug("Shutdown all interrupted while waiting for PRs to be shutdown gracefully."); + } + + } else { + for (final Map<String, PartitionedRegion> prSubMap : prTrees.values()) { + shutdownSubTreeGracefully(prSubMap); + } + } + + close("Shut down all members", null, false, true); + } finally { + this.shutDownAllFinished.countDown(); } } - - close("Shut down all members", null, false, true); } private ExecutorService getShutdownAllExecutorService(int size) { @@ -4180,17 +4195,15 @@ public class GemFireCacheImpl * regions when this cache requires, or does not require notification of all region/entry events. 
*/ public void addPartitionedRegion(PartitionedRegion r) { - synchronized (GemFireCacheImpl.class) { - synchronized (this.partitionedRegions) { - if (r.isDestroyed()) { - if (logger.isDebugEnabled()) { - logger.debug("GemFireCache#addPartitionedRegion did not add destroyed {}", r); - } - return; - } - if (this.partitionedRegions.add(r)) { - getCachePerfStats().incPartitionedRegions(1); + synchronized (this.partitionedRegions) { + if (r.isDestroyed()) { + if (logger.isDebugEnabled()) { + logger.debug("GemFireCache#addPartitionedRegion did not add destroyed {}", r); } + return; + } + if (this.partitionedRegions.add(r)) { + getCachePerfStats().incPartitionedRegions(1); } } } @@ -4288,22 +4301,20 @@ public class GemFireCacheImpl * @return true if the region should deliver all of its events to this cache */ protected boolean requiresNotificationFromPR(PartitionedRegion r) { - synchronized (GemFireCacheImpl.class) { - boolean hasSerialSenders = hasSerialSenders(r); - boolean result = hasSerialSenders; - if (!result) { - Iterator allCacheServersIterator = allCacheServers.iterator(); - while (allCacheServersIterator.hasNext()) { - CacheServerImpl server = (CacheServerImpl) allCacheServersIterator.next(); - if (!server.getNotifyBySubscription()) { - result = true; - break; - } + boolean hasSerialSenders = hasSerialSenders(r); + boolean result = hasSerialSenders; + if (!result) { + Iterator allCacheServersIterator = allCacheServers.iterator(); + while (allCacheServersIterator.hasNext()) { + CacheServerImpl server = (CacheServerImpl) allCacheServersIterator.next(); + if (!server.getNotifyBySubscription()) { + result = true; + break; } - } - return result; + } + return result; } private boolean hasSerialSenders(PartitionedRegion r) { @@ -4483,25 +4494,11 @@ public class GemFireCacheImpl /** * This cache's reliable message queue factory. Should always have an instance of it. 
*/ - private ReliableMessageQueueFactory rmqFactory; + private final ReliableMessageQueueFactory rmqFactory; private List<File> backupFiles = Collections.emptyList(); /** - * Initializes the reliable message queue. Needs to be called at cache creation - * - * @throws IllegalStateException if the factory is in use - */ - private void initReliableMessageQueueFactory() { - synchronized (GemFireCacheImpl.class) { - if (this.rmqFactory != null) { - this.rmqFactory.close(false); - } - this.rmqFactory = new ReliableMessageQueueFactoryImpl(); - } - } - - /** * Returns this cache's ReliableMessageQueueFactory. * * @since GemFire 5.0 http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/02d962c2/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/ShutdownAllDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/ShutdownAllDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/ShutdownAllDUnitTest.java index 1bb06f1..52d1327 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/ShutdownAllDUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/ShutdownAllDUnitTest.java @@ -21,8 +21,12 @@ import java.io.IOException; import java.util.Set; import java.util.TreeSet; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import com.jayway.awaitility.Awaitility; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -44,7 +48,9 @@ import org.apache.geode.cache.Region; import org.apache.geode.cache.server.CacheServer; import org.apache.geode.cache.util.CacheListenerAdapter; import org.apache.geode.distributed.internal.InternalDistributedSystem; +import 
org.apache.geode.internal.cache.CacheLifecycleListener; import org.apache.geode.internal.cache.DiskRegion; +import org.apache.geode.internal.cache.GemFireCacheImpl; import org.apache.geode.internal.cache.PartitionedRegion; import org.apache.geode.internal.cache.control.InternalResourceManager; import org.apache.geode.internal.cache.control.InternalResourceManager.ResourceObserver; @@ -187,6 +193,57 @@ public class ShutdownAllDUnitTest extends JUnit4CacheTestCase { vm1.invoke(removeExceptionTag1(expectedExceptions)); } + private static final AtomicBoolean calledCreateCache = new AtomicBoolean(); + private static final AtomicBoolean calledCloseCache = new AtomicBoolean(); + private static CacheLifecycleListener cll; + + @Test + public void testShutdownAllInterruptsCacheCreation() + throws ExecutionException, InterruptedException, TimeoutException { + Host host = Host.getHost(0); + VM vm0 = host.getVM(0); + VM vm2 = host.getVM(2); + closeAllCache(); + // in vm0 create the cache in a way that hangs until + // it sees that a shutDownAll is in progress + AsyncInvocation<?> asyncCreate = vm0.invokeAsync(() -> { + cll = new CacheLifecycleListener() { + @Override + public void cacheCreated(GemFireCacheImpl cache) { + calledCreateCache.set(true); + Awaitility.await().atMost(90, TimeUnit.SECONDS).until(() -> cache.isCacheAtShutdownAll()); + } + + @Override + public void cacheClosed(GemFireCacheImpl cache) { + calledCloseCache.set(true); + } + }; + GemFireCacheImpl.addCacheLifecycleListener(cll); + getCache(); + }); + try { + boolean vm0CalledCreateCache = vm0.invoke(() -> { + Awaitility.await().atMost(90, TimeUnit.SECONDS).until(() -> calledCreateCache.get()); + return calledCreateCache.get(); + }); + assertTrue(vm0CalledCreateCache); + shutDownAllMembers(vm2, 1); + asyncCreate.get(60, TimeUnit.SECONDS); + boolean vm0CalledCloseCache = vm0.invoke(() -> { + Awaitility.await().atMost(90, TimeUnit.SECONDS).until(() -> calledCloseCache.get()); + return calledCloseCache.get(); + 
}); + assertTrue(vm0CalledCloseCache); + } finally { + vm0.invoke(() -> { + calledCreateCache.set(false); + calledCloseCache.set(false); + GemFireCacheImpl.removeCacheLifecycleListener(cll); + }); + } + } + @Test public void testShutdownAllOneServerAndRecover() throws Throwable { Host host = Host.getHost(0);