This is an automated email from the ASF dual-hosted git repository.

bschuchardt pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 8ae5808  GEODE-5505 Cache listener not invoked on a retried destroy() operation
8ae5808 is described below

commit 8ae5808dd7a48eab3be9ec02e61fab0e75002320
Author: Bruce Schuchardt <[email protected]>
AuthorDate: Wed Aug 22 08:46:29 2018 -0700

    GEODE-5505 Cache listener not invoked on a retried destroy() operation
    
    Delay region synchronization operations to allow clients to retry their
    operations.  This keeps a server from obtaining the change made by a
    client in another server that was only partially distributed.  Without
    the delay it is possible for the sync to complete before a client retries
    its operation, so that the client's replay encounters the state change
    it already made in its previous attempt.
    
    With this change a client's replay will either be on a server that didn't
    see the previous attempt, or it will be on a server that did see the
    previous attempt & has the operation recorded in its EventTracker.  In
    the first case the operation will be performed again and transmitted
    to other servers.  In the second case the server will not apply the
    operation to its cache but will forward it to other servers.
    
    The delay is based on the server's "maximum time between pings interval",
    which defaults to 1 minute.
    
    This closes #2340
---
 .../cache30/PRBucketSynchronizationDUnitTest.java  | 53 ++++++----------------
 .../geode/cache30/RRSynchronizationDUnitTest.java  | 53 ++++++----------------
 .../distributed/internal/DistributionAdvisor.java  | 24 ++++++++--
 .../geode/internal/cache/map/RegionMapDestroy.java |  3 --
 .../internal/cache/map/RegionMapDestroyTest.java   | 35 --------------
 5 files changed, 49 insertions(+), 119 deletions(-)

diff --git 
a/geode-core/src/distributedTest/java/org/apache/geode/cache30/PRBucketSynchronizationDUnitTest.java
 
b/geode-core/src/distributedTest/java/org/apache/geode/cache30/PRBucketSynchronizationDUnitTest.java
index c029f5f..4391bff 100644
--- 
a/geode-core/src/distributedTest/java/org/apache/geode/cache30/PRBucketSynchronizationDUnitTest.java
+++ 
b/geode-core/src/distributedTest/java/org/apache/geode/cache30/PRBucketSynchronizationDUnitTest.java
@@ -20,6 +20,7 @@ import java.util.HashSet;
 import java.util.Properties;
 import java.util.Set;
 
+import org.awaitility.Awaitility;
 import org.junit.After;
 import org.junit.Test;
 
@@ -45,8 +46,6 @@ import org.apache.geode.test.dunit.DistributedTestUtils;
 import org.apache.geode.test.dunit.Host;
 import org.apache.geode.test.dunit.IgnoredException;
 import org.apache.geode.test.dunit.VM;
-import org.apache.geode.test.dunit.Wait;
-import org.apache.geode.test.dunit.WaitCriterion;
 import org.apache.geode.test.dunit.cache.CacheTestCase;
 
 /**
@@ -211,44 +210,22 @@ public class PRBucketSynchronizationDUnitTest extends 
CacheTestCase {
       PartitionedRegion pr = (PartitionedRegion) testRegion;
       final BucketRegion bucket = pr.getDataStore().getLocalBucketById(0);
 
-      Wait.waitForCriterion(new WaitCriterion() {
-        String waitingFor = "primary is still in membership view: " + 
crashedMember;
-        boolean dumped = false;
-
-        @Override
-        public boolean done() {
-          if 
(testRegion.getCache().getDistributionManager().isCurrentMember(crashedMember)) 
{
-            return false;
-          }
-          if (!testRegion.containsKey("Object3")) {
-            waitingFor = "entry for Object3 not found";
-            return false;
-          }
-          RegionEntry re = bucket.getRegionMap().getEntry("Object5");
-          if (re == null) {
-            if (!dumped) {
-              dumped = true;
-              bucket.dumpBackingMap();
-            }
-            waitingFor = "entry for Object5 not found";
-            return false;
-          }
-          if (!re.isTombstone()) {
-            if (!dumped) {
-              dumped = true;
-              bucket.dumpBackingMap();
-            }
-            waitingFor = "Object5 is not a tombstone but should be: " + re;
-            return false;
-          }
-          return true;
+      Awaitility.await().until(() -> {
+        if 
(testRegion.getCache().getDistributionManager().isCurrentMember(crashedMember)) 
{
+          return false;
         }
-
-        @Override
-        public String description() {
-          return waitingFor;
+        if (!testRegion.containsKey("Object3")) {
+          return false;
+        }
+        RegionEntry re = bucket.getRegionMap().getEntry("Object5");
+        if (re == null) {
+          return false;
+        }
+        if (!re.isTombstone()) {
+          return false;
         }
-      }, 30000, 5000, true);
+        return true;
+      });
     });
   }
 
diff --git 
a/geode-core/src/distributedTest/java/org/apache/geode/cache30/RRSynchronizationDUnitTest.java
 
b/geode-core/src/distributedTest/java/org/apache/geode/cache30/RRSynchronizationDUnitTest.java
index 8065faa..fd2c77f 100644
--- 
a/geode-core/src/distributedTest/java/org/apache/geode/cache30/RRSynchronizationDUnitTest.java
+++ 
b/geode-core/src/distributedTest/java/org/apache/geode/cache30/RRSynchronizationDUnitTest.java
@@ -18,6 +18,7 @@ import static org.junit.Assert.assertTrue;
 
 import java.util.Properties;
 
+import org.awaitility.Awaitility;
 import org.junit.After;
 import org.junit.Test;
 
@@ -42,8 +43,6 @@ import org.apache.geode.test.dunit.DistributedTestUtils;
 import org.apache.geode.test.dunit.Host;
 import org.apache.geode.test.dunit.IgnoredException;
 import org.apache.geode.test.dunit.VM;
-import org.apache.geode.test.dunit.Wait;
-import org.apache.geode.test.dunit.WaitCriterion;
 import org.apache.geode.test.dunit.cache.CacheTestCase;
 
 /**
@@ -171,45 +170,23 @@ public class RRSynchronizationDUnitTest extends 
CacheTestCase {
   private void verifySynchronized(VM vm, final InternalDistributedMember 
crashedMember) {
     vm.invoke("check that synchronization happened", () -> {
       final DistributedRegion dr = (DistributedRegion) testRegion;
+      Awaitility.await().until(() -> {
 
-      Wait.waitForCriterion(new WaitCriterion() {
-        String waitingFor = "crashed member is still in membership view: " + 
crashedMember;
-        boolean dumped = false;
-
-        @Override
-        public boolean done() {
-          if 
(testRegion.getCache().getDistributionManager().isCurrentMember(crashedMember)) 
{
-            return false;
-          }
-          if (!testRegion.containsKey("Object3")) {
-            waitingFor = "entry for Object3 not found";
-            return false;
-          }
-          RegionEntry re = dr.getRegionMap().getEntry("Object5");
-          if (re == null) {
-            if (!dumped) {
-              dumped = true;
-              dr.dumpBackingMap();
-            }
-            waitingFor = "entry for Object5 not found";
-            return false;
-          }
-          if (!re.isTombstone()) {
-            if (!dumped) {
-              dumped = true;
-              dr.dumpBackingMap();
-            }
-            waitingFor = "Object5 is not a tombstone but should be: " + re;
-            return false;
-          }
-          return true;
+        if 
(testRegion.getCache().getDistributionManager().isCurrentMember(crashedMember)) 
{
+          return false;
         }
-
-        @Override
-        public String description() {
-          return waitingFor;
+        if (!testRegion.containsKey("Object3")) {
+          return false;
+        }
+        RegionEntry re = dr.getRegionMap().getEntry("Object5");
+        if (re == null) {
+          return false;
+        }
+        if (!re.isTombstone()) {
+          return false;
         }
-      }, 30000, 5000, true);
+        return true;
+      });
     });
   }
 
diff --git 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionAdvisor.java
 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionAdvisor.java
index 71d64a4..f153326 100644
--- 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionAdvisor.java
+++ 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionAdvisor.java
@@ -26,6 +26,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.NoSuchElementException;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -39,6 +40,7 @@ import 
org.apache.geode.distributed.internal.membership.InternalDistributedMembe
 import org.apache.geode.internal.Assert;
 import org.apache.geode.internal.DataSerializableFixedID;
 import org.apache.geode.internal.InternalDataSerializer;
+import org.apache.geode.internal.SystemTimer;
 import org.apache.geode.internal.Version;
 import org.apache.geode.internal.cache.CacheDistributionAdvisor.CacheProfile;
 import org.apache.geode.internal.cache.DistributedRegion;
@@ -256,11 +258,23 @@ public class DistributionAdvisor {
 
     final boolean isDebugEnabled = logger.isDebugEnabled();
     if (isDebugEnabled) {
-      logger.debug("da.syncForCrashedMember will sync region in waiting thread 
pool: {}", dr);
+      logger.debug("da.syncForCrashedMember will sync region in cache's timer 
for region: {}", dr);
     }
-    dr.getDistributionManager().getWaitingThreadPool().execute(new Runnable() {
-      // bug #49601 - don't synchronize until GII has been performed
-      public void run() {
+    // schedule the synchronization for execution in the future based on the 
client health monitor
+    // interval. This allows client caches to retry an operation that might 
otherwise be recovered
+    // through the sync operation. Without associated event information this 
could cause the
+    // retried operation to be mishandled. See GEODE-5505
+    long delay;
+    try {
+      delay = dr.getGemFireCache().getCacheServers().stream().max(
+          (o1, o2) -> o1.getMaximumTimeBetweenPings() - 
o2.getMaximumTimeBetweenPings()).get()
+          .getMaximumTimeBetweenPings();
+    } catch (NoSuchElementException e) {
+      delay = 0;
+    }
+    dr.getGemFireCache().getCCPTimer().schedule(new 
SystemTimer.SystemTimerTask() {
+      @Override
+      public void run2() {
         while (!dr.isInitialized()) {
           if (dr.isDestroyed()) {
             return;
@@ -300,7 +314,7 @@ public class DistributionAdvisor {
         }
         dr.synchronizeForLostMember(id, lostVersionID);
       }
-    });
+    }, delay);
   }
 
   /** find the region for a delta-gii operation (synch) */
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/map/RegionMapDestroy.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/map/RegionMapDestroy.java
index 4d9eed0..6a34c50 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/map/RegionMapDestroy.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/map/RegionMapDestroy.java
@@ -551,9 +551,6 @@ public class RegionMapDestroy {
         if (regionEntry == null) {
           regionEntry = newRegionEntry;
         }
-        // invoke listeners and inform clients
-        internalRegion.basicDestroyPart2(regionEntry, event, inTokenMode,
-            false, duringRI, true);
         doPart3 = true;
       }
     }
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/map/RegionMapDestroyTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/map/RegionMapDestroyTest.java
index b65016a..774f090 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/map/RegionMapDestroyTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/map/RegionMapDestroyTest.java
@@ -20,7 +20,6 @@ import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.ArgumentMatchers.isA;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
@@ -36,7 +35,6 @@ import org.apache.geode.cache.DataPolicy;
 import org.apache.geode.cache.EntryNotFoundException;
 import org.apache.geode.cache.EvictionAttributes;
 import org.apache.geode.cache.Operation;
-import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.cache.AbstractRegionMap;
 import org.apache.geode.internal.cache.CachePerfStats;
 import org.apache.geode.internal.cache.EntryEventImpl;
@@ -51,9 +49,7 @@ import org.apache.geode.internal.cache.VMLRURegionMap;
 import org.apache.geode.internal.cache.eviction.EvictableEntry;
 import org.apache.geode.internal.cache.eviction.EvictionController;
 import org.apache.geode.internal.cache.eviction.EvictionCounters;
-import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
 import org.apache.geode.internal.cache.versions.RegionVersionVector;
-import org.apache.geode.internal.cache.versions.VersionStamp;
 import org.apache.geode.internal.cache.versions.VersionTag;
 import org.apache.geode.internal.util.concurrent.CustomEntryConcurrentHashMap;
 
@@ -176,20 +172,6 @@ public class RegionMapDestroyTest {
     arm.getEntryMap().put(KEY, entry);
   }
 
-  private void givenExistingEntryWithValueAndVersion(Object value, VersionTag 
version) {
-    RegionEntry entry = arm.getEntryFactory().createEntry(arm._getOwner(), 
KEY, value);
-    ((VersionStamp) entry).setVersions(version);
-    arm.getEntryMap().put(KEY, entry);
-  }
-
-  private void givenExistingEntryWithValueAndSameVersion(Object value, 
VersionTag version) {
-    givenExistingEntryWithValueAndVersion(value, version);
-
-    RegionVersionVector<?> versionVector = mock(RegionVersionVector.class);
-    when(arm._getOwner().getVersionVector()).thenReturn(versionVector);
-    event.setVersionTag(version);
-  }
-
   private void givenExistingEntry(Object value) {
     RegionEntry entry = arm.getEntryFactory().createEntry(arm._getOwner(), 
KEY, value);
     arm.getEntryMap().put(KEY, entry);
@@ -242,23 +224,6 @@ public class RegionMapDestroyTest {
   }
 
   @Test
-  public void destroyWithDuplicateVersionInvokesListener() {
-    givenEmptyRegionMap();
-    givenConcurrencyChecks(true);
-    VersionTag version = VersionTag.create(new 
InternalDistributedMember("localhost", 123));
-    version.setEntryVersion(1);
-    version.setRegionVersion(1);
-    givenExistingEntryWithValueAndSameVersion(Token.TOMBSTONE, version);
-    // make this a client/server operation
-    event.setContext(new ClientProxyMembershipID());
-    assertThat(arm.destroy(event, inTokenMode, duringRI, cacheWrite, 
isEviction, expectedOldValue,
-        removeRecoveredEntry)).isTrue();
-    assertThat(event.getIsRedestroyedEntry()).isTrue();
-    verify(owner).basicDestroyPart2(isA(RegionEntry.class), 
isA(EntryEventImpl.class),
-        isA(Boolean.class), isA(Boolean.class), isA(Boolean.class), 
isA(Boolean.class));
-  }
-
-  @Test
   public void destroyWithEmptyRegionThrowsException() {
     givenConcurrencyChecks(false);
     givenEmptyRegionMap();

Reply via email to