GEODE-2918 Close cache when ConflictingPersistentDataException is thrown.

During disk recovery, ConflictingPersistentDataException was not handled
properly; it should log an error and close the cache.
When it is handled incorrectly, the cache is left in an inconsistent state,
causing other operations to fail in unexpected ways.
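
A minimal sketch of the intended handling, distilled from the ProxyBucketRegion
and LocalRegion hunks below (this is not the verbatim Geode code; the names
persistenceAdvisor and partitionedRegion are simply the ones used in the diff):

    try {
      persistenceAdvisor.initializeMembershipView();
    } catch (DiskAccessException dae) {
      // ConflictingPersistentDataException now extends DiskAccessException,
      // so it reaches handleDiskAccessException(), which logs the error and
      // closes the cache instead of leaving it partially initialized.
      partitionedRegion.handleDiskAccessException(dae);
      throw dae;
    }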


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/3cbb6fcd
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/3cbb6fcd
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/3cbb6fcd

Branch: refs/heads/feature/GEODE-2632-17
Commit: 3cbb6fcd08272dc6a5219e9092b34ae29eed79f3
Parents: adbdf50
Author: Anil <aging...@pivotal.io>
Authored: Mon May 22 10:42:08 2017 -0700
Committer: Anil <aging...@pivotal.io>
Committed: Tue May 23 10:18:23 2017 -0700

----------------------------------------------------------------------
 .../apache/geode/cache/DiskAccessException.java |   9 +
 .../ConflictingPersistentDataException.java     |   6 +-
 .../geode/internal/cache/LocalRegion.java       |   9 +-
 .../geode/internal/cache/ProxyBucketRegion.java |   5 +-
 .../persistence/PersistenceAdvisorImpl.java     |  16 +-
 .../PersistentPartitionedRegionDUnitTest.java   | 169 ++++++++++++++++---
 .../PersistentPartitionedRegionTestBase.java    |  82 ++++++++-
 7 files changed, 262 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
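
For callers, the conflict now surfaces as a closed cache rather than a region
left in an inconsistent state. A rough sketch of the pattern the updated DUnit
tests use to assert this (PR_REGION_NAME and attributes are placeholders for a
persistent partitioned region definition):

    try {
      cache.createRegion(PR_REGION_NAME, attributes);
    } catch (CacheClosedException cce) {
      if (cce.getCause() instanceof ConflictingPersistentDataException) {
        // Expected after GEODE-2918: the cache was closed because the local
        // persistent data conflicts with the data on another member.
      }
    }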


http://git-wip-us.apache.org/repos/asf/geode/blob/3cbb6fcd/geode-core/src/main/java/org/apache/geode/cache/DiskAccessException.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/cache/DiskAccessException.java b/geode-core/src/main/java/org/apache/geode/cache/DiskAccessException.java
index fb640cd..51018a5 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/DiskAccessException.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/DiskAccessException.java
@@ -110,6 +110,15 @@ public class DiskAccessException extends CacheRuntimeException {
   }
 
   /**
+   * Constructs a new <code>DiskAccessException</code> with a message string.
+   *
+   * @param msg the message string
+   */
+  public DiskAccessException(String msg) {
+    super(msg);
+  }
+
+  /**
    * Returns true if this exception originated from a remote node.
    */
   public boolean isRemote() {

http://git-wip-us.apache.org/repos/asf/geode/blob/3cbb6fcd/geode-core/src/main/java/org/apache/geode/cache/persistence/ConflictingPersistentDataException.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/cache/persistence/ConflictingPersistentDataException.java b/geode-core/src/main/java/org/apache/geode/cache/persistence/ConflictingPersistentDataException.java
index 9bf7234..3ea7c3e 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/persistence/ConflictingPersistentDataException.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/persistence/ConflictingPersistentDataException.java
@@ -15,7 +15,7 @@
 package org.apache.geode.cache.persistence;
 
 import org.apache.geode.GemFireException;
-import org.apache.geode.admin.AdminDistributedSystem;
+import org.apache.geode.cache.DiskAccessException;
 
 /**
 * Thrown when a member with persistence is recovering, and it discovers that the data it has on
@@ -28,7 +28,7 @@ import org.apache.geode.admin.AdminDistributedSystem;
  * 
  * @since GemFire 6.5
  */
-public class ConflictingPersistentDataException extends GemFireException {
+public class ConflictingPersistentDataException extends DiskAccessException {
 
   private static final long serialVersionUID = -2629287782021455875L;
 
@@ -48,6 +48,4 @@ public class ConflictingPersistentDataException extends GemFireException {
     super(cause);
   }
 
-
-
 }

http://git-wip-us.apache.org/repos/asf/geode/blob/3cbb6fcd/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
index 8e7ec68..4446d48 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
@@ -81,6 +81,7 @@ import org.apache.geode.cache.control.ResourceManager;
 import org.apache.geode.cache.execute.Function;
 import org.apache.geode.cache.execute.ResultCollector;
 import org.apache.geode.cache.partition.PartitionRegionHelper;
+import org.apache.geode.cache.persistence.ConflictingPersistentDataException;
 import org.apache.geode.cache.query.FunctionDomainException;
 import org.apache.geode.cache.query.Index;
 import org.apache.geode.cache.query.IndexMaintenanceException;
@@ -6595,8 +6596,12 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
    * @see InitialImageOperation#processChunk
    */
   public void handleDiskAccessException(DiskAccessException dae, boolean duringInitialization) {
-    // these will rethrow the originating exception
-    if (duringInitialization || causedByRDE(dae)) {
+
+    if (duringInitialization && !(dae instanceof ConflictingPersistentDataException)) {
+      return;
+    }
+
+    if (causedByRDE(dae)) {
       return;
     }
 

http://git-wip-us.apache.org/repos/asf/geode/blob/3cbb6fcd/geode-core/src/main/java/org/apache/geode/internal/cache/ProxyBucketRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/ProxyBucketRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/ProxyBucketRegion.java
index ab90a05..7e60e6a 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/ProxyBucketRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/ProxyBucketRegion.java
@@ -23,9 +23,9 @@ import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.logging.log4j.Logger;
-
 import org.apache.geode.CancelCriterion;
 import org.apache.geode.InternalGemFireError;
+import org.apache.geode.cache.DiskAccessException;
 import org.apache.geode.cache.EvictionAttributes;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.RegionAttributes;
@@ -472,6 +472,9 @@ public class ProxyBucketRegion implements Bucket {
       }
 
       persistenceAdvisor.initializeMembershipView();
+    } catch (DiskAccessException dae) {
+      this.partitionedRegion.handleDiskAccessException(dae);
+      throw dae;
     } catch (RuntimeException e) {
       exception = e;
       throw e;

http://git-wip-us.apache.org/repos/asf/geode/blob/3cbb6fcd/geode-core/src/main/java/org/apache/geode/internal/cache/persistence/PersistenceAdvisorImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/persistence/PersistenceAdvisorImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/persistence/PersistenceAdvisorImpl.java
index fc95f0b..f4a0da9 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/persistence/PersistenceAdvisorImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/persistence/PersistenceAdvisorImpl.java
@@ -66,6 +66,7 @@ public class PersistenceAdvisorImpl implements PersistenceAdvisor {
   private volatile Set<PersistentMemberID> allMembersWaitingFor;
   private volatile Set<PersistentMemberID> offlineMembersWaitingFor;
   protected final Object lock;
+  private static PersistenceAdvisorObserver observer = null;
 
   private static final int PERSISTENT_VIEW_RETRY =
       Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "PERSISTENT_VIEW_RETRY", 5);
@@ -716,6 +717,11 @@ public class PersistenceAdvisorImpl implements PersistenceAdvisor {
       throws ReplyException {
     PersistentStateQueryResults remoteStates = getMyStateOnMembers(replicates);
     boolean equal = false;
+
+    if (observer != null) {
+      observer.observe(regionPath);
+    }
+
     for (Map.Entry<InternalDistributedMember, PersistentMemberState> entry : remoteStates.stateOnPeers
         .entrySet()) {
       InternalDistributedMember member = entry.getKey();
@@ -730,12 +736,12 @@ public class PersistenceAdvisorImpl implements PersistenceAdvisor {
                 .toLocalizedString(myId));
       }
 
-
       if (myId != null && stateOnPeer == null) {
         String message = LocalizedStrings.CreatePersistentRegionProcessor_SPLIT_DISTRIBUTED_SYSTEM
             .toLocalizedString(regionPath, member, remoteId, myId);
         throw new ConflictingPersistentDataException(message);
       }
+
       if (myId != null && stateOnPeer == PersistentMemberState.EQUAL) {
         equal = true;
       }
@@ -1338,4 +1344,12 @@ public class PersistenceAdvisorImpl implements PersistenceAdvisor {
   public boolean isOnline() {
     return online;
   }
+
+  public static interface PersistenceAdvisorObserver {
+    default public void observe(String regionPath) {}
+  }
+
+  public static void setPersistenceAdvisorObserver(PersistenceAdvisorObserver o) {
+    observer = o;
+  }
 }

http://git-wip-us.apache.org/repos/asf/geode/blob/3cbb6fcd/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PersistentPartitionedRegionDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PersistentPartitionedRegionDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PersistentPartitionedRegionDUnitTest.java
index 3ee5cf0..edd86e7 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PersistentPartitionedRegionDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PersistentPartitionedRegionDUnitTest.java
@@ -19,7 +19,6 @@ import static org.awaitility.Awaitility.*;
 import static java.util.concurrent.TimeUnit.*;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.*;
-import static org.junit.Assert.fail;
 
 import java.io.DataInput;
 import java.io.DataOutput;
@@ -32,11 +31,12 @@ import java.util.HashSet;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-
 import org.apache.geode.DataSerializable;
 import org.apache.geode.cache.AttributesFactory;
 import org.apache.geode.cache.Cache;
@@ -49,6 +49,7 @@ import org.apache.geode.cache.EvictionAction;
 import org.apache.geode.cache.EvictionAttributes;
 import org.apache.geode.cache.ExpirationAction;
 import org.apache.geode.cache.ExpirationAttributes;
+import org.apache.geode.cache.PartitionAttributes;
 import org.apache.geode.cache.PartitionAttributesFactory;
 import org.apache.geode.cache.PartitionedRegionStorageException;
 import org.apache.geode.cache.Region;
@@ -75,6 +76,8 @@ import org.apache.geode.distributed.internal.DistributionMessageObserver;
 import org.apache.geode.distributed.internal.ReplyException;
 import org.apache.geode.internal.AvailablePort;
 import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.PartitionedRegionTestHelper;
 import org.apache.geode.internal.cache.InitialImageOperation.RequestImageMessage;
 import org.apache.geode.internal.cache.control.InternalResourceManager;
 import org.apache.geode.internal.cache.partitioned.ManageBucketMessage.ManageBucketReplyMessage;
@@ -93,6 +96,7 @@ import org.apache.geode.test.dunit.Wait;
 import org.apache.geode.test.dunit.WaitCriterion;
 import org.apache.geode.test.junit.categories.DistributedTest;
 import org.apache.geode.test.junit.categories.FlakyTest;
+import org.awaitility.Awaitility;
 
 /**
  * Tests the basic use cases for PR persistence.
@@ -2007,50 +2011,59 @@ public class PersistentPartitionedRegionDUnitTest extends PersistentPartitionedR
     Host host = Host.getHost(0);
     VM vm0 = host.getVM(0);
     VM vm1 = host.getVM(1);
+
     createPR(vm0, 0);
     // create some buckets
     createData(vm0, 0, 2, "a");
     closePR(vm0);
+
     createPR(vm1, 0);
     // create an overlapping bucket
-
-
-    // TODO - this test hangs if vm1 has some buckets that vm0
-    // does not have. The problem is that when vm0 starts up and gets a conflict
-    // on some buckets, it updates it's view for other buckets.
-    // createData(vm1, 1, 3, "a");
     createData(vm1, 1, 2, "a");
 
-    // this should throw a conflicting data exception.
-    IgnoredException expect =
-        IgnoredException.addIgnoredException("ConflictingPersistentDataException", vm0);
+    IgnoredException[] expectVm0 =
+        {IgnoredException.addIgnoredException("ConflictingPersistentDataException", vm0),
+            IgnoredException.addIgnoredException("CacheClosedException", vm0)};
+
     try {
+      // This results in ConflictingPersistentDataException. As part of
+      // GEODE-2918, the cache is closed when ConflictingPersistentDataException
+      // is encountered.
       createPR(vm0, 0);
       fail("should have seen a conflicting data exception");
-    } catch (Exception e) {
-      if (!(e.getCause() instanceof ConflictingPersistentDataException)) {
-        throw e;
+    } catch (Exception ex) {
+      boolean expectedException = false;
+      if (ex.getCause() instanceof CacheClosedException) {
+        CacheClosedException cce = (CacheClosedException) ex.getCause();
+        if (cce.getCause() instanceof ConflictingPersistentDataException) {
+          expectedException = true;
+        }
+      }
+      if (!expectedException) {
+        throw ex;
       }
     } finally {
-      expect.remove();
+      for (IgnoredException ie : expectVm0) {
+        ie.remove();
+      }
     }
 
-    // This will hang, if this test fails.
-    // TODO - DAN - I'm not even sure what this means here?
-    // It seems like if anything, vm1 should not have updated it's persistent
-    // view from vm0 because vm0 was in conflict!
-    // In fact, this is a bit of a problem, because now vm1 is dependent
-    // on vm vm0.
-    expect = IgnoredException.addIgnoredException("PartitionOfflineException", vm1);
+    IgnoredException expectVm1 =
+        IgnoredException.addIgnoredException("PartitionOfflineException", vm1);
     try {
       createData(vm1, 0, 1, "a");
-      fail("Should have seen a PartitionOfflineException for bucket 0");
     } catch (Exception e) {
+      // This could happen due to a race in bucket-region creation.
+      // When vm0 was started, it could so happen that it successfully
+      // created bucket0 before it encountered ConflictingPersistentDataException
+      // with bucket1. This is a problem: vm1 is dependent on vm0 for
+      // bucket0. We need to fix that. The workaround is to stop vm1 and then
+      // restart.
       if (!(e.getCause() instanceof PartitionOfflineException)) {
         throw e;
       }
     } finally {
-      expect.remove();
+      expectVm1.remove();
     }
 
     closePR(vm1);
@@ -2062,6 +2075,102 @@ public class PersistentPartitionedRegionDUnitTest extends PersistentPartitionedR
     checkData(vm0, 2, 3, null);
   }
 
+  @Test
+  public void testDiskConflictWithRedundancy() throws Exception {
+    Host host = Host.getHost(0);
+    VM vm0 = host.getVM(0);
+    VM vm1 = host.getVM(1);
+
+    createPR(vm0, 1);
+    // create some buckets
+    createData(vm0, 0, 2, "a");
+    closePR(vm0);
+
+    createPR(vm1, 1);
+    // create an overlapping bucket
+    createData(vm1, 1, 2, "a");
+
+    IgnoredException[] expectVm0 =
+        {IgnoredException.addIgnoredException("ConflictingPersistentDataException", vm0),
+            IgnoredException.addIgnoredException("CacheClosedException", vm0)};
+
+    try {
+      createPR(vm0, 1);
+      fail("should have seen a conflicting data exception");
+    } catch (Exception ex) {
+      boolean expectedException = false;
+      if (ex.getCause() instanceof CacheClosedException) {
+        CacheClosedException cce = (CacheClosedException) ex.getCause();
+        if (cce.getCause() instanceof ConflictingPersistentDataException) {
+          expectedException = true;
+        }
+      }
+      if (!expectedException) {
+        throw ex;
+      }
+    } finally {
+      for (IgnoredException ie : expectVm0) {
+        ie.remove();
+      }
+    }
+
+    closePR(vm1);
+  }
+
+  @Test
+  public void testDiskConflictWithCoLocation() throws Exception {
+    Host host = Host.getHost(0);
+    VM vm0 = host.getVM(0);
+    VM vm1 = host.getVM(1);
+
+    // createPR(vm0, 1);
+    createCoLocatedPR(vm0, 1, false);
+
+    // create some buckets
+    createData(vm0, 0, 2, "a");
+    createData(vm0, 0, 2, "a", PR_CHILD_REGION_NAME);
+    closePR(vm0, PR_CHILD_REGION_NAME);
+    closePR(vm0);
+
+    // createPR(vm1, 1);
+    createCoLocatedPR(vm1, 1, false);
+    // create an overlapping bucket
+    createData(vm1, 2, 4, "a");
+    createData(vm1, 2, 4, "a", PR_CHILD_REGION_NAME);
+
+    IgnoredException[] expectVm0 =
+        {IgnoredException.addIgnoredException("ConflictingPersistentDataException", vm0),
+            IgnoredException.addIgnoredException("CacheClosedException", vm0)};
+
+    try {
+      createCoLocatedPR(vm0, 1, true);
+      // Cache should have closed due to ConflictingPersistentDataException
+      vm0.invoke(() -> {
+        Awaitility.await().atMost(MAX_WAIT, TimeUnit.MILLISECONDS)
+            .until(() -> basicGetCache().isClosed());
+        basicGetCache().getCancelCriterion();
+      });
+    } catch (Exception ex) {
+      boolean expectedException = false;
+      if (ex.getCause() instanceof CacheClosedException) {
+        CacheClosedException cce = (CacheClosedException) ex.getCause();
+        if (cce.getCause() instanceof ConflictingPersistentDataException) {
+          expectedException = true;
+        }
+      }
+      if (!expectedException) {
+        throw ex;
+      }
+    } finally {
+      for (IgnoredException ie : expectVm0) {
+        ie.remove();
+      }
+    }
+
+    closePR(vm1, PR_CHILD_REGION_NAME);
+    closePR(vm1);
+  }
+
   /**
    * Test to make sure that primaries are rebalanced after recovering from disk.
    */
@@ -2238,7 +2347,17 @@ public class PersistentPartitionedRegionDUnitTest extends PersistentPartitionedR
     vm3.invoke(createPersistentReplicate);
   }
 
-  private static class RecoveryObserver extends InternalResourceManager.ResourceObserverAdapter {
+  private void createChildPR(VM vm) {
+    vm.invoke(() -> {
+      PartitionAttributes PRatts =
+          new PartitionAttributesFactory().setColocatedWith(PR_REGION_NAME).create();
+      PartitionedRegion child =
+          (PartitionedRegion) PartitionedRegionTestHelper.createPartionedRegion("CHILD", PRatts);
+    });
+  }
+
+  private static final class RecoveryObserver
+      extends InternalResourceManager.ResourceObserverAdapter {
     final CountDownLatch recoveryDone = new CountDownLatch(1);
 
     @Override

http://git-wip-us.apache.org/repos/asf/geode/blob/3cbb6fcd/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PersistentPartitionedRegionTestBase.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PersistentPartitionedRegionTestBase.java b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PersistentPartitionedRegionTestBase.java
index ccdd38d..ec0ec4c 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PersistentPartitionedRegionTestBase.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PersistentPartitionedRegionTestBase.java
@@ -34,6 +34,7 @@ import org.apache.geode.cache.RegionAttributes;
 import org.apache.geode.cache.control.RebalanceFactory;
 import org.apache.geode.cache.partition.PartitionRegionHelper;
 import org.apache.geode.cache.partition.PartitionRegionInfo;
+import org.apache.geode.cache.persistence.ConflictingPersistentDataException;
 import org.apache.geode.cache.persistence.PersistentID;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.cache.DiskRegion;
@@ -43,6 +44,7 @@ import org.apache.geode.internal.cache.PartitionedRegionDataStore;
 import org.apache.geode.internal.cache.control.InternalResourceManager;
 import org.apache.geode.internal.cache.control.InternalResourceManager.ResourceObserver;
 import org.apache.geode.internal.cache.persistence.PersistenceAdvisor;
+import org.apache.geode.internal.cache.persistence.PersistenceAdvisorImpl;
 import org.apache.geode.internal.cache.persistence.PersistentMemberID;
 import org.apache.geode.test.dunit.Assert;
 import org.apache.geode.test.dunit.AsyncInvocation;
@@ -54,6 +56,7 @@ import org.apache.geode.test.dunit.VM;
 import org.apache.geode.test.dunit.Wait;
 import org.apache.geode.test.dunit.WaitCriterion;
 import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
+import org.awaitility.Awaitility;
 
 import java.io.BufferedReader;
 import java.io.File;
@@ -70,6 +73,7 @@ import java.util.concurrent.TimeUnit;
 public abstract class PersistentPartitionedRegionTestBase extends JUnit4CacheTestCase {
 
   public static String PR_REGION_NAME = "region";
+  public static String PR_CHILD_REGION_NAME = "childRegion";
   // This must be bigger than the dunit ack-wait-threshold for the revoke
   // tests. The command line is setting the ack-wait-threshold to be
   // 60 seconds.
@@ -205,7 +209,7 @@ public abstract class PersistentPartitionedRegionTestBase extends JUnit4CacheTes
 
       public void run() {
         Cache cache = getCache();
-        LogWriterUtils.getLogWriter().info("creating data in " + regionName);
+        cache.getLogger().info("creating data in " + regionName);
         Region region = cache.getRegion(regionName);
 
         for (int i = startKey; i < endKey; i++) {
@@ -380,6 +384,82 @@ public abstract class PersistentPartitionedRegionTestBase extends JUnit4CacheTes
     return createPR;
   }
 
+  protected void createCoLocatedPR(VM vm, int setRedundantCopies,
+      boolean setPersistenceAdvisorObserver) {
+    vm.invoke(() -> {
+      Cache cache = getCache();
+
+      // Wait for both nested PRs to be created
+      final CountDownLatch recoveryDone = new CountDownLatch(2);
+      ResourceObserver observer = new InternalResourceManager.ResourceObserverAdapter() {
+        @Override
+        public void recoveryFinished(Region region) {
+          recoveryDone.countDown();
+        }
+      };
+      InternalResourceManager.setResourceObserver(observer);
+
+      // Wait for parent and child region to be created.
+      // And throw exception while region is getting initialized.
+      final CountDownLatch childRegionCreated = new CountDownLatch(1);
+      if (setPersistenceAdvisorObserver) {
+        PersistenceAdvisorImpl
+            .setPersistenceAdvisorObserver(new PersistenceAdvisorImpl.PersistenceAdvisorObserver() {
+              public void observe(String regionPath) {
+                if (regionPath.contains(PR_CHILD_REGION_NAME)) {
+                  try {
+                    childRegionCreated.await(MAX_WAIT, TimeUnit.MILLISECONDS);
+                  } catch (Exception e) {
+                    Assert.fail("Exception", e);
+                  }
+                  throw new ConflictingPersistentDataException(
+                      "Testing Cache Close with ConflictingPersistentDataException for region."
+                          + regionPath);
+                }
+              }
+            });
+      }
+
+      // Create region.
+      try {
+        DiskStore ds = cache.findDiskStore("disk");
+        if (ds == null) {
+          ds = cache.createDiskStoreFactory().setDiskDirs(getDiskDirs()).create("disk");
+        }
+
+        // Parent Region
+        PartitionAttributesFactory paf =
+            new PartitionAttributesFactory().setRedundantCopies(setRedundantCopies);
+        AttributesFactory af = new AttributesFactory();
+        af.setPartitionAttributes(paf.create());
+        af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
+        af.setDiskStoreName("disk");
+        cache.createRegion(PR_REGION_NAME, af.create());
+
+        // Colocated region
+        paf = (new PartitionAttributesFactory()).setRedundantCopies(setRedundantCopies)
+            .setColocatedWith(PR_REGION_NAME);
+        af = new AttributesFactory();
+        af.setPartitionAttributes(paf.create());
+        af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
+        af.setDiskStoreName("disk");
+        cache.createRegion(PR_CHILD_REGION_NAME, af.create());
+
+        // Count down on region create.
+        childRegionCreated.countDown();
+
+        try {
+          recoveryDone.await(MAX_WAIT, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+          Assert.fail("interrupted", e);
+        }
+
+      } finally {
+        PersistenceAdvisorImpl.setPersistenceAdvisorObserver(null);
+      }
+    });
+  }
+
  private SerializableRunnable getCreatePRRunnable(final int redundancy, final int recoveryDelay) {
     return getCreatePRRunnable(redundancy, recoveryDelay, 113);
   }
