This is an automated email from the ASF dual-hosted git repository.
bschuchardt pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 8ae5808 GEODE-5505 Cache listener not invoked on a retried destroy()
operation
8ae5808 is described below
commit 8ae5808dd7a48eab3be9ec02e61fab0e75002320
Author: Bruce Schuchardt <[email protected]>
AuthorDate: Wed Aug 22 08:46:29 2018 -0700
GEODE-5505 Cache listener not invoked on a retried destroy() operation
Delay region synchronization operations to give clients time to retry their
operations. This keeps a server from obtaining, through synchronization, a
change that a client made on another server but that was only partially
distributed. Without the delay, the sync can complete before the client
retries its operation, so that the client's replay encounters the state
change it already made in its previous attempt.
With this change, a client's replay will land either on a server that did
not see the previous attempt, or on a server that did see the previous
attempt and has the operation recorded in its EventTracker. In the first
case the operation will be performed again and transmitted to other
servers. In the second case the server will not apply the operation to its
cache but will forward it to other servers.
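The delay is based on the server's "maximum time between pings" interval,
which defaults to 1 minute. When a member hosts several cache servers, the
largest configured interval is used; when it hosts none, the
synchronization is scheduled with no delay.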
This closes #2340
---
.../cache30/PRBucketSynchronizationDUnitTest.java | 53 ++++++----------------
.../geode/cache30/RRSynchronizationDUnitTest.java | 53 ++++++----------------
.../distributed/internal/DistributionAdvisor.java | 24 ++++++++--
.../geode/internal/cache/map/RegionMapDestroy.java | 3 --
.../internal/cache/map/RegionMapDestroyTest.java | 35 --------------
5 files changed, 49 insertions(+), 119 deletions(-)
diff --git
a/geode-core/src/distributedTest/java/org/apache/geode/cache30/PRBucketSynchronizationDUnitTest.java
b/geode-core/src/distributedTest/java/org/apache/geode/cache30/PRBucketSynchronizationDUnitTest.java
index c029f5f..4391bff 100644
---
a/geode-core/src/distributedTest/java/org/apache/geode/cache30/PRBucketSynchronizationDUnitTest.java
+++
b/geode-core/src/distributedTest/java/org/apache/geode/cache30/PRBucketSynchronizationDUnitTest.java
@@ -20,6 +20,7 @@ import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
+import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Test;
@@ -45,8 +46,6 @@ import org.apache.geode.test.dunit.DistributedTestUtils;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.IgnoredException;
import org.apache.geode.test.dunit.VM;
-import org.apache.geode.test.dunit.Wait;
-import org.apache.geode.test.dunit.WaitCriterion;
import org.apache.geode.test.dunit.cache.CacheTestCase;
/**
@@ -211,44 +210,22 @@ public class PRBucketSynchronizationDUnitTest extends
CacheTestCase {
PartitionedRegion pr = (PartitionedRegion) testRegion;
final BucketRegion bucket = pr.getDataStore().getLocalBucketById(0);
- Wait.waitForCriterion(new WaitCriterion() {
- String waitingFor = "primary is still in membership view: " +
crashedMember;
- boolean dumped = false;
-
- @Override
- public boolean done() {
- if
(testRegion.getCache().getDistributionManager().isCurrentMember(crashedMember))
{
- return false;
- }
- if (!testRegion.containsKey("Object3")) {
- waitingFor = "entry for Object3 not found";
- return false;
- }
- RegionEntry re = bucket.getRegionMap().getEntry("Object5");
- if (re == null) {
- if (!dumped) {
- dumped = true;
- bucket.dumpBackingMap();
- }
- waitingFor = "entry for Object5 not found";
- return false;
- }
- if (!re.isTombstone()) {
- if (!dumped) {
- dumped = true;
- bucket.dumpBackingMap();
- }
- waitingFor = "Object5 is not a tombstone but should be: " + re;
- return false;
- }
- return true;
+ Awaitility.await().until(() -> {
+ if
(testRegion.getCache().getDistributionManager().isCurrentMember(crashedMember))
{
+ return false;
}
-
- @Override
- public String description() {
- return waitingFor;
+ if (!testRegion.containsKey("Object3")) {
+ return false;
+ }
+ RegionEntry re = bucket.getRegionMap().getEntry("Object5");
+ if (re == null) {
+ return false;
+ }
+ if (!re.isTombstone()) {
+ return false;
}
- }, 30000, 5000, true);
+ return true;
+ });
});
}
diff --git
a/geode-core/src/distributedTest/java/org/apache/geode/cache30/RRSynchronizationDUnitTest.java
b/geode-core/src/distributedTest/java/org/apache/geode/cache30/RRSynchronizationDUnitTest.java
index 8065faa..fd2c77f 100644
---
a/geode-core/src/distributedTest/java/org/apache/geode/cache30/RRSynchronizationDUnitTest.java
+++
b/geode-core/src/distributedTest/java/org/apache/geode/cache30/RRSynchronizationDUnitTest.java
@@ -18,6 +18,7 @@ import static org.junit.Assert.assertTrue;
import java.util.Properties;
+import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Test;
@@ -42,8 +43,6 @@ import org.apache.geode.test.dunit.DistributedTestUtils;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.IgnoredException;
import org.apache.geode.test.dunit.VM;
-import org.apache.geode.test.dunit.Wait;
-import org.apache.geode.test.dunit.WaitCriterion;
import org.apache.geode.test.dunit.cache.CacheTestCase;
/**
@@ -171,45 +170,23 @@ public class RRSynchronizationDUnitTest extends
CacheTestCase {
private void verifySynchronized(VM vm, final InternalDistributedMember
crashedMember) {
vm.invoke("check that synchronization happened", () -> {
final DistributedRegion dr = (DistributedRegion) testRegion;
+ Awaitility.await().until(() -> {
- Wait.waitForCriterion(new WaitCriterion() {
- String waitingFor = "crashed member is still in membership view: " +
crashedMember;
- boolean dumped = false;
-
- @Override
- public boolean done() {
- if
(testRegion.getCache().getDistributionManager().isCurrentMember(crashedMember))
{
- return false;
- }
- if (!testRegion.containsKey("Object3")) {
- waitingFor = "entry for Object3 not found";
- return false;
- }
- RegionEntry re = dr.getRegionMap().getEntry("Object5");
- if (re == null) {
- if (!dumped) {
- dumped = true;
- dr.dumpBackingMap();
- }
- waitingFor = "entry for Object5 not found";
- return false;
- }
- if (!re.isTombstone()) {
- if (!dumped) {
- dumped = true;
- dr.dumpBackingMap();
- }
- waitingFor = "Object5 is not a tombstone but should be: " + re;
- return false;
- }
- return true;
+ if
(testRegion.getCache().getDistributionManager().isCurrentMember(crashedMember))
{
+ return false;
}
-
- @Override
- public String description() {
- return waitingFor;
+ if (!testRegion.containsKey("Object3")) {
+ return false;
+ }
+ RegionEntry re = dr.getRegionMap().getEntry("Object5");
+ if (re == null) {
+ return false;
+ }
+ if (!re.isTombstone()) {
+ return false;
}
- }, 30000, 5000, true);
+ return true;
+ });
});
}
diff --git
a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionAdvisor.java
b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionAdvisor.java
index 71d64a4..f153326 100644
---
a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionAdvisor.java
+++
b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionAdvisor.java
@@ -26,6 +26,7 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -39,6 +40,7 @@ import
org.apache.geode.distributed.internal.membership.InternalDistributedMembe
import org.apache.geode.internal.Assert;
import org.apache.geode.internal.DataSerializableFixedID;
import org.apache.geode.internal.InternalDataSerializer;
+import org.apache.geode.internal.SystemTimer;
import org.apache.geode.internal.Version;
import org.apache.geode.internal.cache.CacheDistributionAdvisor.CacheProfile;
import org.apache.geode.internal.cache.DistributedRegion;
@@ -256,11 +258,23 @@ public class DistributionAdvisor {
final boolean isDebugEnabled = logger.isDebugEnabled();
if (isDebugEnabled) {
- logger.debug("da.syncForCrashedMember will sync region in waiting thread
pool: {}", dr);
+ logger.debug("da.syncForCrashedMember will sync region in cache's timer
for region: {}", dr);
}
- dr.getDistributionManager().getWaitingThreadPool().execute(new Runnable() {
- // bug #49601 - don't synchronize until GII has been performed
- public void run() {
+ // schedule the synchronization for execution in the future based on the
client health monitor
+ // interval. This allows client caches to retry an operation that might
otherwise be recovered
+ // through the sync operation. Without associated event information this
could cause the
+ // retried operation to be mishandled. See GEODE-5505
+ long delay;
+ try {
+ delay = dr.getGemFireCache().getCacheServers().stream().max(
+ (o1, o2) -> o1.getMaximumTimeBetweenPings() -
o2.getMaximumTimeBetweenPings()).get()
+ .getMaximumTimeBetweenPings();
+ } catch (NoSuchElementException e) {
+ delay = 0;
+ }
+ dr.getGemFireCache().getCCPTimer().schedule(new
SystemTimer.SystemTimerTask() {
+ @Override
+ public void run2() {
while (!dr.isInitialized()) {
if (dr.isDestroyed()) {
return;
@@ -300,7 +314,7 @@ public class DistributionAdvisor {
}
dr.synchronizeForLostMember(id, lostVersionID);
}
- });
+ }, delay);
}
/** find the region for a delta-gii operation (synch) */
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/map/RegionMapDestroy.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/map/RegionMapDestroy.java
index 4d9eed0..6a34c50 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/map/RegionMapDestroy.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/map/RegionMapDestroy.java
@@ -551,9 +551,6 @@ public class RegionMapDestroy {
if (regionEntry == null) {
regionEntry = newRegionEntry;
}
- // invoke listeners and inform clients
- internalRegion.basicDestroyPart2(regionEntry, event, inTokenMode,
- false, duringRI, true);
doPart3 = true;
}
}
diff --git
a/geode-core/src/test/java/org/apache/geode/internal/cache/map/RegionMapDestroyTest.java
b/geode-core/src/test/java/org/apache/geode/internal/cache/map/RegionMapDestroyTest.java
index b65016a..774f090 100644
---
a/geode-core/src/test/java/org/apache/geode/internal/cache/map/RegionMapDestroyTest.java
+++
b/geode-core/src/test/java/org/apache/geode/internal/cache/map/RegionMapDestroyTest.java
@@ -20,7 +20,6 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
@@ -36,7 +35,6 @@ import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.EntryNotFoundException;
import org.apache.geode.cache.EvictionAttributes;
import org.apache.geode.cache.Operation;
-import
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.AbstractRegionMap;
import org.apache.geode.internal.cache.CachePerfStats;
import org.apache.geode.internal.cache.EntryEventImpl;
@@ -51,9 +49,7 @@ import org.apache.geode.internal.cache.VMLRURegionMap;
import org.apache.geode.internal.cache.eviction.EvictableEntry;
import org.apache.geode.internal.cache.eviction.EvictionController;
import org.apache.geode.internal.cache.eviction.EvictionCounters;
-import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
import org.apache.geode.internal.cache.versions.RegionVersionVector;
-import org.apache.geode.internal.cache.versions.VersionStamp;
import org.apache.geode.internal.cache.versions.VersionTag;
import org.apache.geode.internal.util.concurrent.CustomEntryConcurrentHashMap;
@@ -176,20 +172,6 @@ public class RegionMapDestroyTest {
arm.getEntryMap().put(KEY, entry);
}
- private void givenExistingEntryWithValueAndVersion(Object value, VersionTag
version) {
- RegionEntry entry = arm.getEntryFactory().createEntry(arm._getOwner(),
KEY, value);
- ((VersionStamp) entry).setVersions(version);
- arm.getEntryMap().put(KEY, entry);
- }
-
- private void givenExistingEntryWithValueAndSameVersion(Object value,
VersionTag version) {
- givenExistingEntryWithValueAndVersion(value, version);
-
- RegionVersionVector<?> versionVector = mock(RegionVersionVector.class);
- when(arm._getOwner().getVersionVector()).thenReturn(versionVector);
- event.setVersionTag(version);
- }
-
private void givenExistingEntry(Object value) {
RegionEntry entry = arm.getEntryFactory().createEntry(arm._getOwner(),
KEY, value);
arm.getEntryMap().put(KEY, entry);
@@ -242,23 +224,6 @@ public class RegionMapDestroyTest {
}
@Test
- public void destroyWithDuplicateVersionInvokesListener() {
- givenEmptyRegionMap();
- givenConcurrencyChecks(true);
- VersionTag version = VersionTag.create(new
InternalDistributedMember("localhost", 123));
- version.setEntryVersion(1);
- version.setRegionVersion(1);
- givenExistingEntryWithValueAndSameVersion(Token.TOMBSTONE, version);
- // make this a client/server operation
- event.setContext(new ClientProxyMembershipID());
- assertThat(arm.destroy(event, inTokenMode, duringRI, cacheWrite,
isEviction, expectedOldValue,
- removeRecoveredEntry)).isTrue();
- assertThat(event.getIsRedestroyedEntry()).isTrue();
- verify(owner).basicDestroyPart2(isA(RegionEntry.class),
isA(EntryEventImpl.class),
- isA(Boolean.class), isA(Boolean.class), isA(Boolean.class),
isA(Boolean.class));
- }
-
- @Test
public void destroyWithEmptyRegionThrowsException() {
givenConcurrencyChecks(false);
givenEmptyRegionMap();