Repository: incubator-geode
Updated Branches:
  refs/heads/develop a20a6ba09 -> 02d962c20


GEODE-1971: fix shutDownAll hang

Changed isShutDownAll to an AtomicBoolean and removed the GemFireCacheImpl.class
synchronization from addPartitionedRegion and requiresNotificationFromPR.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/02d962c2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/02d962c2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/02d962c2

Branch: refs/heads/develop
Commit: 02d962c20830e2d35347af3cc718eb3ccb3a40f4
Parents: a20a6ba
Author: Darrel Schneider <dschnei...@pivotal.io>
Authored: Tue Oct 11 15:04:43 2016 -0700
Committer: Darrel Schneider <dschnei...@pivotal.io>
Committed: Mon Nov 7 13:43:39 2016 -0800

----------------------------------------------------------------------
 .../geode/internal/cache/GemFireCacheImpl.java  | 159 +++++++++----------
 .../cache/partitioned/ShutdownAllDUnitTest.java |  57 +++++++
 2 files changed, 135 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/02d962c2/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
index ba4f1f4..e1b2007 100755
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
@@ -213,6 +213,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
@@ -543,7 +544,8 @@ public class GemFireCacheImpl
 
   private final Object clientMetaDatServiceLock = new Object();
 
-  private volatile boolean isShutDownAll = false;
+  private final AtomicBoolean isShutDownAll = new AtomicBoolean();
+  private final CountDownLatch shutDownAllFinished = new CountDownLatch(1);
 
   private final ResourceAdvisor resourceAdvisor;
   private final JmxManagerAdvisor jmxAdvisor;
@@ -664,7 +666,7 @@ public class GemFireCacheImpl
     sb.append("GemFireCache[");
     sb.append("id = " + System.identityHashCode(this));
     sb.append("; isClosing = " + this.isClosing);
-    sb.append("; isShutDownAll = " + this.isShutDownAll);
+    sb.append("; isShutDownAll = " + isCacheAtShutdownAll());
     sb.append("; created = " + this.creationDate);
     sb.append("; server = " + this.isServer);
     sb.append("; copyOnRead = " + this.copyOnRead);
@@ -874,7 +876,7 @@ public class GemFireCacheImpl
 
       this.cqService = CqServiceProvider.create(this);
 
-      initReliableMessageQueueFactory();
+      this.rmqFactory = new ReliableMessageQueueFactoryImpl();
 
       // Create the CacheStatistics
       this.cachePerfStats = new CachePerfStats(system);
@@ -1735,7 +1737,7 @@ public class GemFireCacheImpl
   }
 
   public boolean isCacheAtShutdownAll() {
-    return isShutDownAll;
+    return isShutDownAll.get();
   }
 
   /**
@@ -1751,18 +1753,7 @@ public class GemFireCacheImpl
     }
   }
 
-  public synchronized void shutDownAll() {
-    boolean testIGE = Boolean.getBoolean("TestInternalGemFireError");
-
-    if (testIGE) {
-      InternalGemFireError assErr = new InternalGemFireError(
-          
LocalizedStrings.GemFireCache_UNEXPECTED_EXCEPTION.toLocalizedString());
-      throw assErr;
-    }
-    if (isCacheAtShutdownAll()) {
-      // it's already doing shutdown by another thread
-      return;
-    }
+  public void shutDownAll() {
     if (LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER) {
       try {
         CacheObserverHolder.getInstance().beforeShutdownAll();
@@ -1770,39 +1761,63 @@ public class GemFireCacheImpl
         LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = false;
       }
     }
-    this.isShutDownAll = true;
-
-    // bug 44031 requires multithread shutdownall should be grouped
-    // by root region. However, shutDownAllDuringRecovery.conf test revealed 
that
-    // we have to close colocated child regions first.
-    // Now check all the PR, if anyone has colocate-with attribute, sort all 
the
-    // PRs by colocation relationship and close them sequentially, otherwise 
still
-    // group them by root region.
-    TreeMap<String, Map<String, PartitionedRegion>> prTrees = getPRTrees();
-    if (prTrees.size() > 1 && shutdownAllPoolSize != 1) {
-      ExecutorService es = getShutdownAllExecutorService(prTrees.size());
-      for (final Map<String, PartitionedRegion> prSubMap : prTrees.values()) {
-        es.execute(new Runnable() {
-          public void run() {
-            ConnectionTable.threadWantsSharedResources();
-            shutdownSubTreeGracefully(prSubMap);
-          }
-        });
-      } // for each root
-      es.shutdown();
+    if (!this.isShutDownAll.compareAndSet(false, true)) {
+      // it's already doing shutdown by another thread
       try {
-        es.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
+        this.shutDownAllFinished.await();
       } catch (InterruptedException e) {
-        logger.debug("Shutdown all interrupted while waiting for PRs to be 
shutdown gracefully.");
+        logger.debug(
+            "Shutdown all interrupted while waiting for another thread to do 
the shutDownAll");
+        Thread.currentThread().interrupt();
       }
+      return;
+    }
+    synchronized (GemFireCacheImpl.class) {
+      try {
+        boolean testIGE = Boolean.getBoolean("TestInternalGemFireError");
 
-    } else {
-      for (final Map<String, PartitionedRegion> prSubMap : prTrees.values()) {
-        shutdownSubTreeGracefully(prSubMap);
+        if (testIGE) {
+          InternalGemFireError assErr = new InternalGemFireError(
+              
LocalizedStrings.GemFireCache_UNEXPECTED_EXCEPTION.toLocalizedString());
+          throw assErr;
+        }
+
+        // bug 44031 requires multithread shutdownall should be grouped
+        // by root region. However, shutDownAllDuringRecovery.conf test 
revealed that
+        // we have to close colocated child regions first.
+        // Now check all the PR, if anyone has colocate-with attribute, sort 
all the
+        // PRs by colocation relationship and close them sequentially, 
otherwise still
+        // group them by root region.
+        TreeMap<String, Map<String, PartitionedRegion>> prTrees = getPRTrees();
+        if (prTrees.size() > 1 && shutdownAllPoolSize != 1) {
+          ExecutorService es = getShutdownAllExecutorService(prTrees.size());
+          for (final Map<String, PartitionedRegion> prSubMap : 
prTrees.values()) {
+            es.execute(new Runnable() {
+              public void run() {
+                ConnectionTable.threadWantsSharedResources();
+                shutdownSubTreeGracefully(prSubMap);
+              }
+            });
+          } // for each root
+          es.shutdown();
+          try {
+            es.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
+          } catch (InterruptedException e) {
+            logger
+                .debug("Shutdown all interrupted while waiting for PRs to be 
shutdown gracefully.");
+          }
+
+        } else {
+          for (final Map<String, PartitionedRegion> prSubMap : 
prTrees.values()) {
+            shutdownSubTreeGracefully(prSubMap);
+          }
+        }
+
+        close("Shut down all members", null, false, true);
+      } finally {
+        this.shutDownAllFinished.countDown();
       }
     }
-
-    close("Shut down all members", null, false, true);
   }
 
   private ExecutorService getShutdownAllExecutorService(int size) {
@@ -4180,17 +4195,15 @@ public class GemFireCacheImpl
    * regions when this cache requires, or does not require notification of all 
region/entry events.
    */
   public void addPartitionedRegion(PartitionedRegion r) {
-    synchronized (GemFireCacheImpl.class) {
-      synchronized (this.partitionedRegions) {
-        if (r.isDestroyed()) {
-          if (logger.isDebugEnabled()) {
-            logger.debug("GemFireCache#addPartitionedRegion did not add 
destroyed {}", r);
-          }
-          return;
-        }
-        if (this.partitionedRegions.add(r)) {
-          getCachePerfStats().incPartitionedRegions(1);
+    synchronized (this.partitionedRegions) {
+      if (r.isDestroyed()) {
+        if (logger.isDebugEnabled()) {
+          logger.debug("GemFireCache#addPartitionedRegion did not add 
destroyed {}", r);
         }
+        return;
+      }
+      if (this.partitionedRegions.add(r)) {
+        getCachePerfStats().incPartitionedRegions(1);
       }
     }
   }
@@ -4288,22 +4301,20 @@ public class GemFireCacheImpl
    * @return true if the region should deliver all of its events to this cache
    */
   protected boolean requiresNotificationFromPR(PartitionedRegion r) {
-    synchronized (GemFireCacheImpl.class) {
-      boolean hasSerialSenders = hasSerialSenders(r);
-      boolean result = hasSerialSenders;
-      if (!result) {
-        Iterator allCacheServersIterator = allCacheServers.iterator();
-        while (allCacheServersIterator.hasNext()) {
-          CacheServerImpl server = (CacheServerImpl) 
allCacheServersIterator.next();
-          if (!server.getNotifyBySubscription()) {
-            result = true;
-            break;
-          }
+    boolean hasSerialSenders = hasSerialSenders(r);
+    boolean result = hasSerialSenders;
+    if (!result) {
+      Iterator allCacheServersIterator = allCacheServers.iterator();
+      while (allCacheServersIterator.hasNext()) {
+        CacheServerImpl server = (CacheServerImpl) 
allCacheServersIterator.next();
+        if (!server.getNotifyBySubscription()) {
+          result = true;
+          break;
         }
-
       }
-      return result;
+
     }
+    return result;
   }
 
   private boolean hasSerialSenders(PartitionedRegion r) {
@@ -4483,25 +4494,11 @@ public class GemFireCacheImpl
   /**
    * This cache's reliable message queue factory. Should always have an 
instance of it.
    */
-  private ReliableMessageQueueFactory rmqFactory;
+  private final ReliableMessageQueueFactory rmqFactory;
 
   private List<File> backupFiles = Collections.emptyList();
 
   /**
-   * Initializes the reliable message queue. Needs to be called at cache 
creation
-   *
-   * @throws IllegalStateException if the factory is in use
-   */
-  private void initReliableMessageQueueFactory() {
-    synchronized (GemFireCacheImpl.class) {
-      if (this.rmqFactory != null) {
-        this.rmqFactory.close(false);
-      }
-      this.rmqFactory = new ReliableMessageQueueFactoryImpl();
-    }
-  }
-
-  /**
    * Returns this cache's ReliableMessageQueueFactory.
    *
    * @since GemFire 5.0

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/02d962c2/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/ShutdownAllDUnitTest.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/ShutdownAllDUnitTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/ShutdownAllDUnitTest.java
index 1bb06f1..52d1327 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/ShutdownAllDUnitTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/ShutdownAllDUnitTest.java
@@ -21,8 +21,12 @@ import java.io.IOException;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 
+import com.jayway.awaitility.Awaitility;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -44,7 +48,9 @@ import org.apache.geode.cache.Region;
 import org.apache.geode.cache.server.CacheServer;
 import org.apache.geode.cache.util.CacheListenerAdapter;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.internal.cache.CacheLifecycleListener;
 import org.apache.geode.internal.cache.DiskRegion;
+import org.apache.geode.internal.cache.GemFireCacheImpl;
 import org.apache.geode.internal.cache.PartitionedRegion;
 import org.apache.geode.internal.cache.control.InternalResourceManager;
 import 
org.apache.geode.internal.cache.control.InternalResourceManager.ResourceObserver;
@@ -187,6 +193,57 @@ public class ShutdownAllDUnitTest extends 
JUnit4CacheTestCase {
     vm1.invoke(removeExceptionTag1(expectedExceptions));
   }
 
+  private static final AtomicBoolean calledCreateCache = new AtomicBoolean();
+  private static final AtomicBoolean calledCloseCache = new AtomicBoolean();
+  private static CacheLifecycleListener cll;
+
+  @Test
+  public void testShutdownAllInterruptsCacheCreation()
+      throws ExecutionException, InterruptedException, TimeoutException {
+    Host host = Host.getHost(0);
+    VM vm0 = host.getVM(0);
+    VM vm2 = host.getVM(2);
+    closeAllCache();
+    // in vm0 create the cache in a way that hangs until
+    // it sees that a shutDownAll is in progress
+    AsyncInvocation<?> asyncCreate = vm0.invokeAsync(() -> {
+      cll = new CacheLifecycleListener() {
+        @Override
+        public void cacheCreated(GemFireCacheImpl cache) {
+          calledCreateCache.set(true);
+          Awaitility.await().atMost(90, TimeUnit.SECONDS).until(() -> 
cache.isCacheAtShutdownAll());
+        }
+
+        @Override
+        public void cacheClosed(GemFireCacheImpl cache) {
+          calledCloseCache.set(true);
+        }
+      };
+      GemFireCacheImpl.addCacheLifecycleListener(cll);
+      getCache();
+    });
+    try {
+      boolean vm0CalledCreateCache = vm0.invoke(() -> {
+        Awaitility.await().atMost(90, TimeUnit.SECONDS).until(() -> 
calledCreateCache.get());
+        return calledCreateCache.get();
+      });
+      assertTrue(vm0CalledCreateCache);
+      shutDownAllMembers(vm2, 1);
+      asyncCreate.get(60, TimeUnit.SECONDS);
+      boolean vm0CalledCloseCache = vm0.invoke(() -> {
+        Awaitility.await().atMost(90, TimeUnit.SECONDS).until(() -> 
calledCloseCache.get());
+        return calledCloseCache.get();
+      });
+      assertTrue(vm0CalledCloseCache);
+    } finally {
+      vm0.invoke(() -> {
+        calledCreateCache.set(false);
+        calledCloseCache.set(false);
+        GemFireCacheImpl.removeCacheLifecycleListener(cll);
+      });
+    }
+  }
+
   @Test
   public void testShutdownAllOneServerAndRecover() throws Throwable {
     Host host = Host.getHost(0);

Reply via email to