GEODE-3380: There are 2 problems here
1) When removeAll is retried, it gets an EntryNotFound exception. It should
still put the remove event into the AEQ.
2) An old bug fix in 8.2 was not merged into develop: when removeAll encounters
an EntryNotFound exception, it should return the version tag of the tombstone.

This closes #674


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/71dc0f86
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/71dc0f86
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/71dc0f86

Branch: refs/heads/feature/GEODE-3299
Commit: 71dc0f865668974668077fe5d3289587944e0b34
Parents: d91096c
Author: zhouxh <gz...@pivotal.io>
Authored: Mon Jul 31 17:55:09 2017 -0700
Committer: zhouxh <gz...@pivotal.io>
Committed: Wed Aug 2 10:34:09 2017 -0700

----------------------------------------------------------------------
 .../geode/internal/cache/AbstractRegionMap.java | 13 +++
 .../cache/DistributedPutAllOperation.java       |  2 +-
 .../cache/DistributedRemoveAllOperation.java    |  2 +-
 .../cache/partitioned/RemoveAllPRMessage.java   |  8 ++
 .../geode/internal/cache/PutAllCSDUnitTest.java | 90 +++++++++++++++++++-
 5 files changed, 110 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/71dc0f86/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
index fd5a430..40c8b07 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
@@ -1359,6 +1359,10 @@ public abstract class AbstractRegionMap implements 
RegionMap {
                           owner.basicDestroyPart2(tombstone, event, 
inTokenMode,
                               true /* conflict with clear */, duringRI, true);
                           opCompleted = true;
+                        } else {
+                          Assert.assertTrue(event.getVersionTag() == null);
+                          Assert.assertTrue(newRe == tombstone);
+                          
event.setVersionTag(getVersionTagFromStamp(tombstone.getVersionStamp()));
                         }
                       } catch (ConcurrentCacheModificationException ccme) {
                         VersionTag tag = event.getVersionTag();
@@ -1564,6 +1568,15 @@ public abstract class AbstractRegionMap implements 
RegionMap {
     return false;
   }
 
+  private VersionTag getVersionTagFromStamp(VersionStamp stamp) {
+    VersionTag tag = VersionTag.create(stamp.getMemberID());
+    tag.setEntryVersion(stamp.getEntryVersion());
+    tag.setRegionVersion(stamp.getRegionVersion());
+    tag.setVersionTimeStamp(stamp.getVersionTimeStamp());
+    tag.setDistributedSystemId(stamp.getDistributedSystemId());
+    return tag;
+  }
+
   public void txApplyDestroy(Object key, TransactionId txId, TXRmtEvent 
txEvent,
       boolean inTokenMode, boolean inRI, Operation op, EventID eventId, Object 
aCallbackArgument,
       List<EntryEventImpl> pendingCallbacks, FilterRoutingInfo 
filterRoutingInfo,

http://git-wip-us.apache.org/repos/asf/geode/blob/71dc0f86/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedPutAllOperation.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedPutAllOperation.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedPutAllOperation.java
index 4dcb0b7..37a6703 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedPutAllOperation.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedPutAllOperation.java
@@ -1098,7 +1098,7 @@ public class DistributedPutAllOperation extends 
AbstractUpdateOperation {
       try {
         super.basicOperateOnRegion(ev, rgn);
       } finally {
-        if (ev.getVersionTag() != null && !ev.getVersionTag().isRecorded()) {
+        if (ev.hasValidVersionTag() && !ev.getVersionTag().isRecorded()) {
           if (rgn.getVersionVector() != null) {
             rgn.getVersionVector().recordVersion(getSender(), 
ev.getVersionTag());
           }

http://git-wip-us.apache.org/repos/asf/geode/blob/71dc0f86/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRemoveAllOperation.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRemoveAllOperation.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRemoveAllOperation.java
index a4661b6..8ea4f97 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRemoveAllOperation.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRemoveAllOperation.java
@@ -891,7 +891,7 @@ public class DistributedRemoveAllOperation extends 
AbstractUpdateOperation {
         dispatchElidedEvent(rgn, ev);
         this.appliedOperation = false;
       } finally {
-        if (ev.getVersionTag() != null && !ev.getVersionTag().isRecorded()) {
+        if (ev.hasValidVersionTag() && !ev.getVersionTag().isRecorded()) {
           if (rgn.getVersionVector() != null) {
             rgn.getVersionVector().recordVersion(getSender(), 
ev.getVersionTag());
           }

http://git-wip-us.apache.org/repos/asf/geode/blob/71dc0f86/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessage.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessage.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessage.java
index 1898461..ebfc5ed 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessage.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessage.java
@@ -469,6 +469,14 @@ public class RemoveAllPRMessage extends 
PartitionMessageWithDirectReply {
                   }
                 } catch (EntryNotFoundException ignore) {
                   didRemove = true;
+                  if (ev.isPossibleDuplicate() && ev.hasValidVersionTag()) {
+                    op.addEntry(ev);
+                    if (logger.isDebugEnabled()) {
+                      logger.debug(
+                          "RemoveAllPRMessage.doLocalRemoveAll:notify client 
and gateway for not-found-entry:"
+                              + ev);
+                    }
+                  }
                   if (ev.getVersionTag() == null) {
                     if (logger.isDebugEnabled()) {
                       logger.debug(

http://git-wip-us.apache.org/repos/asf/geode/blob/71dc0f86/geode-cq/src/test/java/org/apache/geode/internal/cache/PutAllCSDUnitTest.java
----------------------------------------------------------------------
diff --git 
a/geode-cq/src/test/java/org/apache/geode/internal/cache/PutAllCSDUnitTest.java 
b/geode-cq/src/test/java/org/apache/geode/internal/cache/PutAllCSDUnitTest.java
index 2c6252b..d1c0004 100755
--- 
a/geode-cq/src/test/java/org/apache/geode/internal/cache/PutAllCSDUnitTest.java
+++ 
b/geode-cq/src/test/java/org/apache/geode/internal/cache/PutAllCSDUnitTest.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
@@ -77,6 +78,7 @@ import org.apache.geode.cache.util.CacheListenerAdapter;
 import org.apache.geode.cache.util.CacheWriterAdapter;
 import org.apache.geode.cache30.CacheSerializableRunnable;
 import org.apache.geode.cache30.ClientServerTestCase;
+import org.apache.geode.internal.cache.tier.sockets.VersionedObjectList;
 import org.apache.geode.internal.cache.versions.VersionTag;
 import org.apache.geode.internal.i18n.LocalizedStrings;
 import org.apache.geode.test.dunit.Assert;
@@ -500,6 +502,81 @@ public class PutAllCSDUnitTest extends 
ClientServerTestCase {
   }
 
   /**
+   * Create PR without redundancy on 2 servers with lucene index. Feed some 
key s. From a client, do
+   * removeAll on keys in server1. During the removeAll, restart server1 and 
trigger the removeAll
+   * to retry. The retried removeAll should return the version tag of 
tombstones. Do removeAll again
+   * on the same key, it should get the version tag again.
+   */
+  @Test
+  public void shouldReturnVersionTagOfTombstoneVersionWhenRemoveAllRetried() 
throws CacheException, InterruptedException {
+    final String title = "test51871:";
+
+    final Host host = Host.getHost(0);
+    VM server1 = host.getVM(0);
+    VM server2 = host.getVM(1);
+    VM client1 = host.getVM(2);
+    VM client2 = host.getVM(3);
+    final String regionName = getUniqueName();
+
+    final String serverHost = 
NetworkUtils.getServerHostName(server1.getHost());
+
+    // set notifyBySubscription=false to test local-invalidates
+    final int serverPort1 = createBridgeServer(server1, regionName, 0, true, 
0, "ds1");
+    createBridgeClient(client1, regionName, serverHost, new int[] 
{serverPort1}, -1, -1, true);
+
+    client1.invoke(new CacheSerializableRunnable(title + "client1 add listener 
and putAll") {
+      @Override
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        region.getAttributesMutator().addCacheListener(new MyListener(false));
+        doPutAll(regionName, "key-", numberOfEntries);
+        assertEquals(numberOfEntries, region.size());
+      }
+    });
+
+    // verify bridge server 1, its data are from client
+    server1.invoke(new CacheSerializableRunnable(title + "verify Bridge Server 
1") {
+      @Override
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        assertEquals(numberOfEntries, region.size());
+      }
+    });
+
+    @SuppressWarnings("unchecked")
+    VersionedObjectList versions =
+        (VersionedObjectList) client1.invoke(new SerializableCallable(title + 
"client1 removeAll") {
+          @Override
+          public Object call() throws CacheException {
+            Region region = getRootRegion().getSubregion(regionName);
+            VersionedObjectList versions = doRemoveAll(regionName, "key-", 
numberOfEntries);
+            assertEquals(0, region.size());
+            return versions;
+          }
+        });
+
+    @SuppressWarnings("unchecked")
+    VersionedObjectList versionsAfterRetry = (VersionedObjectList) client1
+        .invoke(new SerializableCallable(title + "client1 removeAll again") {
+          @Override
+          public Object call() throws CacheException {
+            Region region = getRootRegion().getSubregion(regionName);
+            VersionedObjectList versions = doRemoveAll(regionName, "key-", 
numberOfEntries);
+            assertEquals(0, region.size());
+            return versions;
+          }
+        });
+
+    LogWriterUtils.getLogWriter().info("Version tags are:" + 
versions.getVersionTags() + ":"
+        + versionsAfterRetry.getVersionTags());
+    assertEquals(versionsAfterRetry.getVersionTags(), 
versions.getVersionTags());
+
+    // clean up
+    // Stop server
+    stopBridgeServers(getCache());
+  }
+
+  /**
    * Tests putAll and removeAll to 2 servers. Use Case: 1) putAll from a 
single-threaded client to a
    * replicated region 2) putAll from a multi-threaded client to a replicated 
region 3)
    */
@@ -4092,14 +4169,21 @@ public class PutAllCSDUnitTest extends 
ClientServerTestCase {
     return region;
   }
 
-  protected Region doRemoveAll(String regionName, String keyStub, int 
numEntries) {
+  protected VersionedObjectList doRemoveAll(String regionName, String keyStub, 
int numEntries) {
     Region region = getRootRegion().getSubregion(regionName);
     ArrayList<String> keys = new ArrayList<String>();
     for (int i = 0; i < numEntries; i++) {
       keys.add(keyStub + i);
     }
-    region.removeAll(keys, "removeAllCallback");
-    return region;
+    // region.removeAll(keys, "removeAllCallback");
+    LocalRegion lr = (LocalRegion) region;
+    final EntryEventImpl event = EntryEventImpl.create(lr, 
Operation.REMOVEALL_DESTROY, null, null,
+        "removeAllCallback", false, lr.getMyId());
+    event.disallowOffHeapValues();
+    DistributedRemoveAllOperation removeAllOp =
+        new DistributedRemoveAllOperation(event, keys.size(), false);
+    VersionedObjectList versions = lr.basicRemoveAll((Collection) keys, 
removeAllOp, null);
+    return versions;
   }
 
   public static void waitTillNotify(Object lock_object, int waitTime, boolean 
ready) {

Reply via email to