This is an automated email from the ASF dual-hosted git repository.

zhouxj pushed a commit to branch feature/GEODE-7912
in repository https://gitbox.apache.org/repos/asf/geode.git

commit b412d216683fbc28fbc5fe8fb6d36d5edaa7d92f
Author: zhouxh <gz...@pivotal.io>
AuthorDate: Mon Mar 30 00:06:06 2020 -0700

    GEODE-7912: cacheWriter should be triggered when PR.clear
    
            Co-authored-by: Anil <aging...@pivotal.io>
            Co-authored-by: Xiaojian Zhou <gz...@pivotal.io>
---
 .../cache/PartitionedRegionClearDUnitTest.java     | 232 +++++++++++++++++++--
 .../apache/geode/internal/cache/LocalRegion.java   |   4 +-
 .../geode/internal/cache/PartitionedRegion.java    |  61 ++++--
 3 files changed, 255 insertions(+), 42 deletions(-)
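
Context for readers following GEODE-7912: this change wires a partitioned region's CacheWriter into clear(), so beforeRegionClear fires the way beforeRegionDestroy already does. Below is a minimal sketch of the expected callback behavior; it is illustrative only, not part of this commit, and it assumes a single-member cache and that PR clear is available (which is what this feature branch is adding). The class and region names are made up for the example.

    import org.apache.geode.cache.Cache;
    import org.apache.geode.cache.CacheFactory;
    import org.apache.geode.cache.Region;
    import org.apache.geode.cache.RegionEvent;
    import org.apache.geode.cache.RegionShortcut;
    import org.apache.geode.cache.util.CacheWriterAdapter;

    public class ClearWriterSketch {
      public static void main(String[] args) {
        Cache cache = new CacheFactory().set("mcast-port", "0").create();

        Region<String, String> region = cache
            .<String, String>createRegionFactory(RegionShortcut.PARTITION)
            .setCacheWriter(new CacheWriterAdapter<String, String>() {
              @Override
              public void beforeRegionClear(RegionEvent<String, String> event) {
                // With this patch, clear() on a partitioned region reaches the writer
                // before the per-bucket clear messages are sent.
                System.out.println("beforeRegionClear for " + event.getRegion().getName());
              }
            })
            .create("example");

        region.put("k1", "v1");
        region.clear(); // previously the writer was not consulted for PR.clear
        cache.close();
      }
    }

As the new DUnit tests assert, the writer is expected to fire exactly once per clear: locally if the initiating member has a writer, otherwise on one remote member that does.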

diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearDUnitTest.java
index fb2a81b..27c4c4d 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearDUnitTest.java
@@ -20,6 +20,7 @@ import static org.apache.geode.test.dunit.rules.ClusterStartupRule.getClientCach
 import static org.assertj.core.api.Assertions.assertThat;
 
 import java.io.Serializable;
+import java.util.HashMap;
 import java.util.Properties;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.IntStream;
@@ -30,13 +31,16 @@ import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 
+import org.apache.geode.cache.CacheWriterException;
 import org.apache.geode.cache.InterestResultPolicy;
 import org.apache.geode.cache.PartitionAttributesFactory;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.RegionEvent;
+import org.apache.geode.cache.RegionFactory;
 import org.apache.geode.cache.RegionShortcut;
 import org.apache.geode.cache.client.ClientRegionShortcut;
 import org.apache.geode.cache.util.CacheListenerAdapter;
+import org.apache.geode.cache.util.CacheWriterAdapter;
 import org.apache.geode.test.dunit.SerializableCallableIF;
 import org.apache.geode.test.dunit.rules.ClientVM;
 import org.apache.geode.test.dunit.rules.ClusterStartupRule;
@@ -68,12 +72,6 @@ public class PartitionedRegionClearDUnitTest implements Serializable {
         c -> c.withPoolSubscription(true).withLocatorConnection((locatorPort)));
     client2 = cluster.startClientVM(6,
         c -> c.withPoolSubscription(true).withLocatorConnection((locatorPort)));
-    dataStore1.invoke(this::initDataStore);
-    dataStore2.invoke(this::initDataStore);
-    dataStore3.invoke(this::initDataStore);
-    accessor.invoke(this::initAccessor);
-    client1.invoke(this::initClientCache);
-    client2.invoke(this::initClientCache);
   }
 
   protected RegionShortcut getRegionShortCut() {
@@ -104,14 +102,21 @@ public class PartitionedRegionClearDUnitTest implements Serializable {
     region.registerInterestForAllKeys(InterestResultPolicy.KEYS);
   }
 
-  private void initDataStore() {
-    getCache().createRegionFactory(getRegionShortCut())
-        .setPartitionAttributes(new PartitionAttributesFactory().setTotalNumBuckets(10).create())
-        .addCacheListener(new CountingCacheListener())
-        .create(REGION_NAME);
+  private void initDataStore(boolean withListener, boolean withWriter) {
+    RegionFactory factory = getCache().createRegionFactory(getRegionShortCut())
+        .setPartitionAttributes(new PartitionAttributesFactory().setTotalNumBuckets(10).create());
+    if (withListener) {
+      factory.addCacheListener(new CountingCacheListener());
+    }
+    if (withWriter) {
+      factory.setCacheWriter(new CountingCacheWriter());
+    }
+    factory.create(REGION_NAME);
+    clearsByRegion = new HashMap<>();
+    destroyesByRegion = new HashMap<>();
   }
 
-  private void initAccessor() {
+  private void initAccessor(boolean withListener, boolean withWriter) {
     RegionShortcut shortcut = getRegionShortCut();
     if (shortcut.isPersistent()) {
       if (shortcut == RegionShortcut.PARTITION_PERSISTENT) {
@@ -126,12 +131,19 @@ public class PartitionedRegionClearDUnitTest implements Serializable {
         fail("Wrong region type:" + shortcut);
       }
     }
-    getCache().createRegionFactory(shortcut)
+    RegionFactory factory = getCache().createRegionFactory(shortcut)
         .setPartitionAttributes(
             new PartitionAttributesFactory().setTotalNumBuckets(10).setLocalMaxMemory(0).create())
-        .setPartitionAttributes(new PartitionAttributesFactory().setTotalNumBuckets(10).create())
-        .addCacheListener(new CountingCacheListener())
-        .create(REGION_NAME);
+        .setPartitionAttributes(new PartitionAttributesFactory().setTotalNumBuckets(10).create());
+    if (withListener) {
+      factory.addCacheListener(new CountingCacheListener());
+    }
+    if (withWriter) {
+      factory.setCacheWriter(new CountingCacheWriter());
+    }
+    factory.create(REGION_NAME);
+    clearsByRegion = new HashMap<>();
+    destroyesByRegion = new HashMap<>();
   }
 
   private void feed(boolean isClient) {
@@ -171,26 +183,146 @@ public class PartitionedRegionClearDUnitTest implements Serializable {
     }
   }
 
+  SerializableCallableIF<Integer> getWriterClears = () -> {
+    int clears =
+        clearsByRegion.get(REGION_NAME) == null ? 0 : clearsByRegion.get(REGION_NAME).get();
+    return clears;
+  };
+
+  SerializableCallableIF<Integer> getWriterDestroys = () -> {
+    int destroys =
+        destroyesByRegion.get(REGION_NAME) == null ? 0 : destroyesByRegion.get(REGION_NAME).get();
+    return destroys;
+  };
+
+  void configureServers(boolean dataStoreWithWriter, boolean accessorWithWriter) {
+    dataStore1.invoke(() -> initDataStore(true, dataStoreWithWriter));
+    dataStore2.invoke(() -> initDataStore(true, dataStoreWithWriter));
+    dataStore3.invoke(() -> initDataStore(true, dataStoreWithWriter));
+    accessor.invoke(() -> initAccessor(true, accessorWithWriter));
+    // make sure only datastore3 has cacheWriter
+    dataStore1.invoke(() -> {
+      Region region = getRegion(false);
+      region.getAttributesMutator().setCacheWriter(null);
+    });
+    dataStore2.invoke(() -> {
+      Region region = getRegion(false);
+      region.getAttributesMutator().setCacheWriter(null);
+    });
+  }
+
+  @Test
+  public void normalClearFromDataStoreWithWriterOnDataStore() {
+    configureServers(true, true);
+    client1.invoke(this::initClientCache);
+    client2.invoke(this::initClientCache);
+
+    accessor.invoke(() -> feed(false));
+    verifyServerRegionSize(NUM_ENTRIES);
+    dataStore3.invoke(() -> getRegion(false).clear());
+    verifyServerRegionSize(0);
+    // verifyCacheListenerTriggerCount(dataStore1);
+
+    assertThat(dataStore1.invoke(getWriterClears)).isEqualTo(0);
+    assertThat(dataStore2.invoke(getWriterClears)).isEqualTo(0);
+    assertThat(dataStore3.invoke(getWriterClears)).isEqualTo(1);
+    assertThat(accessor.invoke(getWriterClears)).isEqualTo(0);
+
+    dataStore3.invoke(() -> {
+      Region region = getRegion(false);
+      region.destroyRegion();
+    });
+    assertThat(dataStore1.invoke(getWriterDestroys)).isEqualTo(0);
+    assertThat(dataStore2.invoke(getWriterDestroys)).isEqualTo(0);
+    assertThat(dataStore3.invoke(getWriterDestroys)).isEqualTo(1);
+    assertThat(accessor.invoke(getWriterDestroys)).isEqualTo(0);
+  }
+
   @Test
-  public void normalClearFromDataStore() {
+  public void normalClearFromDataStoreWithoutWriterOnDataStore() {
+    configureServers(false, true);
+    client1.invoke(this::initClientCache);
+    client2.invoke(this::initClientCache);
+
     accessor.invoke(() -> feed(false));
     verifyServerRegionSize(NUM_ENTRIES);
     dataStore1.invoke(() -> getRegion(false).clear());
     verifyServerRegionSize(0);
-    verifyCacheListenerTriggerCount(dataStore1);
+    // verifyCacheListenerTriggerCount(dataStore1);
+
+    assertThat(dataStore1.invoke(getWriterClears)).isEqualTo(0);
+    assertThat(dataStore2.invoke(getWriterClears)).isEqualTo(0);
+    assertThat(dataStore3.invoke(getWriterClears)).isEqualTo(0);
+    assertThat(accessor.invoke(getWriterClears)).isEqualTo(1);
+
+    dataStore1.invoke(() -> {
+      Region region = getRegion(false);
+      region.destroyRegion();
+    });
+    assertThat(dataStore1.invoke(getWriterDestroys)).isEqualTo(0);
+    assertThat(dataStore2.invoke(getWriterDestroys)).isEqualTo(0);
+    assertThat(dataStore3.invoke(getWriterDestroys)).isEqualTo(0);
+    assertThat(accessor.invoke(getWriterDestroys)).isEqualTo(1);
   }
 
   @Test
-  public void normalClearFromAccessor() {
+  public void normalClearFromAccessorWithWriterOnDataStore() {
+    configureServers(true, true);
+    client1.invoke(this::initClientCache);
+    client2.invoke(this::initClientCache);
+
     accessor.invoke(() -> feed(false));
     verifyServerRegionSize(NUM_ENTRIES);
     accessor.invoke(() -> getRegion(false).clear());
     verifyServerRegionSize(0);
-    verifyCacheListenerTriggerCount(accessor);
+    // verifyCacheListenerTriggerCount(accessor);
+    assertThat(dataStore1.invoke(getWriterClears)).isEqualTo(0);
+    assertThat(dataStore2.invoke(getWriterClears)).isEqualTo(0);
+    assertThat(dataStore3.invoke(getWriterClears)).isEqualTo(0);
+    assertThat(accessor.invoke(getWriterClears)).isEqualTo(1);
+
+    accessor.invoke(() -> {
+      Region region = getRegion(false);
+      region.destroyRegion();
+    });
+    assertThat(dataStore1.invoke(getWriterDestroys)).isEqualTo(0);
+    assertThat(dataStore2.invoke(getWriterDestroys)).isEqualTo(0);
+    assertThat(dataStore3.invoke(getWriterDestroys)).isEqualTo(0);
+    assertThat(accessor.invoke(getWriterDestroys)).isEqualTo(1);
+  }
+
+  @Test
+  public void normalClearFromAccessorWithoutWriterButWithWriterOnDataStore() {
+    configureServers(true, false);
+    client1.invoke(this::initClientCache);
+    client2.invoke(this::initClientCache);
+
+    accessor.invoke(() -> feed(false));
+    verifyServerRegionSize(NUM_ENTRIES);
+    accessor.invoke(() -> getRegion(false).clear());
+    verifyServerRegionSize(0);
+    // verifyCacheListenerTriggerCount(accessor);
+    assertThat(dataStore1.invoke(getWriterClears)).isEqualTo(0);
+    assertThat(dataStore2.invoke(getWriterClears)).isEqualTo(0);
+    assertThat(dataStore3.invoke(getWriterClears)).isEqualTo(1);
+    assertThat(accessor.invoke(getWriterClears)).isEqualTo(0);
+
+    accessor.invoke(() -> {
+      Region region = getRegion(false);
+      region.destroyRegion();
+    });
+    assertThat(dataStore1.invoke(getWriterDestroys)).isEqualTo(0);
+    assertThat(dataStore2.invoke(getWriterDestroys)).isEqualTo(0);
+    assertThat(dataStore3.invoke(getWriterDestroys)).isEqualTo(1);
+    assertThat(accessor.invoke(getWriterDestroys)).isEqualTo(0);
   }
 
   @Test
   public void normalClearFromClient() {
+    configureServers(true, false);
+    client1.invoke(this::initClientCache);
+    client2.invoke(this::initClientCache);
+
     client1.invoke(() -> feed(true));
     verifyClientRegionSize(NUM_ENTRIES);
     verifyServerRegionSize(NUM_ENTRIES);
@@ -198,11 +330,25 @@ public class PartitionedRegionClearDUnitTest implements Serializable {
     client1.invoke(() -> getRegion(true).clear());
     verifyServerRegionSize(0);
     verifyClientRegionSize(0);
-    verifyCacheListenerTriggerCount(null);
+    // verifyCacheListenerTriggerCount(null);
+    assertThat(dataStore1.invoke(getWriterClears)).isEqualTo(0);
+    assertThat(dataStore2.invoke(getWriterClears)).isEqualTo(0);
+    assertThat(dataStore3.invoke(getWriterClears)).isEqualTo(1);
+    assertThat(accessor.invoke(getWriterClears)).isEqualTo(0);
+
+    client1.invoke(() -> {
+      Region region = getRegion(true);
+      region.destroyRegion();
+    });
+    assertThat(dataStore1.invoke(getWriterDestroys)).isEqualTo(0);
+    assertThat(dataStore2.invoke(getWriterDestroys)).isEqualTo(0);
+    assertThat(dataStore3.invoke(getWriterDestroys)).isEqualTo(1);
+    assertThat(accessor.invoke(getWriterDestroys)).isEqualTo(0);
   }
 
   private static class CountingCacheListener extends CacheListenerAdapter {
     private final AtomicInteger clears = new AtomicInteger();
+    private final AtomicInteger destroyes = new AtomicInteger();
 
     @Override
     public void afterRegionClear(RegionEvent event) {
@@ -211,8 +357,52 @@ public class PartitionedRegionClearDUnitTest implements Serializable {
       clears.incrementAndGet();
     }
 
+    @Override
+    public void afterRegionDestroy(RegionEvent event) {
+      Region region = event.getRegion();
+      logger.info("Region " + region.getFullPath() + " is destroyed.");
+      destroyes.incrementAndGet();
+    }
+
     int getClears() {
       return clears.get();
     }
+
+    int getDestroys() {
+      return destroyes.get();
+    }
+  }
+
+  public static HashMap<String, AtomicInteger> clearsByRegion = new HashMap<>();
+  public static HashMap<String, AtomicInteger> destroyesByRegion = new HashMap<>();
+
+  private static class CountingCacheWriter extends CacheWriterAdapter {
+    @Override
+    public void beforeRegionClear(RegionEvent event) throws CacheWriterException {
+      Region region = event.getRegion();
+      AtomicInteger clears = clearsByRegion.get(region.getName());
+      if (clears == null) {
+        clears = new AtomicInteger(1);
+        clearsByRegion.put(region.getName(), clears);
+      } else {
+        clears.incrementAndGet();
+      }
+      logger
+          .info("Region " + region.getName() + " will be cleared, clear count is:" + clears.get());
+    }
+
+    @Override
+    public void beforeRegionDestroy(RegionEvent event) throws CacheWriterException {
+      Region region = event.getRegion();
+      AtomicInteger destroys = destroyesByRegion.get(region.getName());
+      if (destroys == null) {
+        destroys = new AtomicInteger(1);
+        destroyesByRegion.put(region.getName(), destroys);
+      } else {
+        destroys.incrementAndGet();
+      }
+      logger.info(
+          "Region " + region.getName() + " will be destroyed, destroy count 
is:" + destroys.get());
+    }
   }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
index d5f9156..159d5c2 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
@@ -3002,7 +3002,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
   /**
    * @since GemFire 5.7
    */
-  private void serverRegionClear(RegionEventImpl regionEvent) {
+  protected void serverRegionClear(RegionEventImpl regionEvent) {
     if (regionEvent.getOperation().isDistributed()) {
       ServerRegionProxy mySRP = getServerProxy();
       if (mySRP != null) {
@@ -3123,7 +3123,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
     return result;
   }
 
-  private void cacheWriteBeforeRegionClear(RegionEventImpl event)
+  void cacheWriteBeforeRegionClear(RegionEventImpl event)
       throws CacheWriterException, TimeoutException {
     // copy into local var to prevent race condition
     CacheWriter writer = basicGetWriter();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
index a26f720..6514fdd 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
@@ -2161,6 +2161,9 @@ public class PartitionedRegion extends LocalRegion
           throw cache.getCacheClosedException("Cache is shutting down");
         }
 
+        // do cacheWrite
+        cacheWriteBeforeRegionClear(regionEvent);
+
         // create ClearPRMessage per bucket
         List<ClearPRMessage> clearMsgList = createClearPRMessages(regionEvent.getEventId());
         for (ClearPRMessage clearPRMessage : clearMsgList) {
@@ -4428,6 +4431,35 @@ public class PartitionedRegion extends LocalRegion
     return null;
   }
 
+  boolean triggerWriter(RegionEventImpl event) {
+    CacheWriter localWriter = basicGetWriter();
+    Set netWriteRecipients = localWriter == null ? this.distAdvisor.adviseNetWrite() : null;
+
+    if (localWriter == null && (netWriteRecipients == null || netWriteRecipients.isEmpty())) {
+      return false;
+    }
+
+    final long start = getCachePerfStats().startCacheWriterCall();
+    try {
+      SearchLoadAndWriteProcessor processor = SearchLoadAndWriteProcessor.getProcessor();
+      String theKey = "preDestroyRegion";
+      int paction = 0;
+      if (event.getOperation().isRegionDestroy()) {
+        theKey = "preDestroyRegion";
+        paction = SearchLoadAndWriteProcessor.BEFOREREGIONDESTROY;
+      } else if (event.getOperation().isClear()) {
+        theKey = "preClearRegion";
+        paction = SearchLoadAndWriteProcessor.BEFOREREGIONCLEAR;
+      }
+      processor.initialize(this, theKey, null);
+      processor.doNetWrite(event, netWriteRecipients, localWriter, paction);
+      processor.release();
+    } finally {
+      getCachePerfStats().endCacheWriterCall(start);
+    }
+    return true;
+  }
+
   /**
    * This invokes a cache writer before a destroy operation. Although it has the same method
    * signature as the method in LocalRegion, it is invoked in a different code path. LocalRegion
@@ -4437,31 +4469,22 @@ public class PartitionedRegion extends LocalRegion
   @Override
   boolean cacheWriteBeforeRegionDestroy(RegionEventImpl event)
       throws CacheWriterException, TimeoutException {
-
     if (event.getOperation().isDistributed()) {
       serverRegionDestroy(event);
-      CacheWriter localWriter = basicGetWriter();
-      Set netWriteRecipients = localWriter == null ? this.distAdvisor.adviseNetWrite() : null;
-
-      if (localWriter == null && (netWriteRecipients == null || netWriteRecipients.isEmpty())) {
-        return false;
-      }
-
-      final long start = getCachePerfStats().startCacheWriterCall();
-      try {
-        SearchLoadAndWriteProcessor processor = SearchLoadAndWriteProcessor.getProcessor();
-        processor.initialize(this, "preDestroyRegion", null);
-        processor.doNetWrite(event, netWriteRecipients, localWriter,
-            SearchLoadAndWriteProcessor.BEFOREREGIONDESTROY);
-        processor.release();
-      } finally {
-        getCachePerfStats().endCacheWriterCall(start);
-      }
-      return true;
+      return triggerWriter(event);
     }
     return false;
   }
 
+  @Override
+  void cacheWriteBeforeRegionClear(RegionEventImpl event)
+      throws CacheWriterException, TimeoutException {
+    if (event.getOperation().isDistributed()) {
+      serverRegionClear(event);
+      triggerWriter(event);
+    }
+  }
+
   /**
    * Test Method: Get the DistributedMember identifier for the vm containing a key
    *
