This is an automated email from the ASF dual-hosted git repository. zhouxj pushed a commit to branch feature/GEODE-7665 in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/feature/GEODE-7665 by this push: new 04e2081 GEODE-7912: cacheWriter should be triggered when PR.clear (#4882) 04e2081 is described below commit 04e20810c24a179fb8b7b306ca58657a3e3ed011 Author: Xiaojian Zhou <gesterz...@users.noreply.github.com> AuthorDate: Mon Mar 30 19:34:35 2020 -0700 GEODE-7912: cacheWriter should be triggered when PR.clear (#4882) Co-authored-by: Anil <aging...@pivotal.io> Co-authored-by: Xiaojian Zhou <gz...@pivotal.io> --- .../cache/PartitionedRegionClearDUnitTest.java | 228 +++++++++++++++++---- .../apache/geode/internal/cache/LocalRegion.java | 4 +- .../geode/internal/cache/PartitionedRegion.java | 56 +++-- 3 files changed, 223 insertions(+), 65 deletions(-) diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearDUnitTest.java index fb2a81b..a5a22b9 100644 --- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearDUnitTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearDUnitTest.java @@ -20,6 +20,7 @@ import static org.apache.geode.test.dunit.rules.ClusterStartupRule.getClientCach import static org.assertj.core.api.Assertions.assertThat; import java.io.Serializable; +import java.util.HashMap; import java.util.Properties; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.IntStream; @@ -30,13 +31,15 @@ import org.junit.Before; import org.junit.Rule; import org.junit.Test; +import org.apache.geode.cache.CacheWriterException; import org.apache.geode.cache.InterestResultPolicy; import org.apache.geode.cache.PartitionAttributesFactory; import org.apache.geode.cache.Region; import org.apache.geode.cache.RegionEvent; +import org.apache.geode.cache.RegionFactory; import org.apache.geode.cache.RegionShortcut; import org.apache.geode.cache.client.ClientRegionShortcut; -import org.apache.geode.cache.util.CacheListenerAdapter; +import org.apache.geode.cache.util.CacheWriterAdapter; import org.apache.geode.test.dunit.SerializableCallableIF; import org.apache.geode.test.dunit.rules.ClientVM; import org.apache.geode.test.dunit.rules.ClusterStartupRule; @@ -68,12 +71,6 @@ public class PartitionedRegionClearDUnitTest implements Serializable { c -> c.withPoolSubscription(true).withLocatorConnection((locatorPort))); client2 = cluster.startClientVM(6, c -> c.withPoolSubscription(true).withLocatorConnection((locatorPort))); - dataStore1.invoke(this::initDataStore); - dataStore2.invoke(this::initDataStore); - dataStore3.invoke(this::initDataStore); - accessor.invoke(this::initAccessor); - client1.invoke(this::initClientCache); - client2.invoke(this::initClientCache); } protected RegionShortcut getRegionShortCut() { @@ -104,14 +101,18 @@ public class PartitionedRegionClearDUnitTest implements Serializable { region.registerInterestForAllKeys(InterestResultPolicy.KEYS); } - private void initDataStore() { - getCache().createRegionFactory(getRegionShortCut()) - .setPartitionAttributes(new PartitionAttributesFactory().setTotalNumBuckets(10).create()) - .addCacheListener(new CountingCacheListener()) - .create(REGION_NAME); + private void initDataStore(boolean withWriter) { + RegionFactory factory = getCache().createRegionFactory(getRegionShortCut()) + .setPartitionAttributes(new PartitionAttributesFactory().setTotalNumBuckets(10).create()); + if (withWriter) { + factory.setCacheWriter(new CountingCacheWriter()); + } + factory.create(REGION_NAME); + clearsByRegion = new HashMap<>(); + destroysByRegion = new HashMap<>(); } - private void initAccessor() { + private void initAccessor(boolean withWriter) { RegionShortcut shortcut = getRegionShortCut(); if (shortcut.isPersistent()) { if (shortcut == RegionShortcut.PARTITION_PERSISTENT) { @@ -126,12 +127,16 @@ public class PartitionedRegionClearDUnitTest implements Serializable { fail("Wrong region type:" + shortcut); } } - getCache().createRegionFactory(shortcut) + RegionFactory factory = getCache().createRegionFactory(shortcut) .setPartitionAttributes( new PartitionAttributesFactory().setTotalNumBuckets(10).setLocalMaxMemory(0).create()) - .setPartitionAttributes(new PartitionAttributesFactory().setTotalNumBuckets(10).create()) - .addCacheListener(new CountingCacheListener()) - .create(REGION_NAME); + .setPartitionAttributes(new PartitionAttributesFactory().setTotalNumBuckets(10).create()); + if (withWriter) { + factory.setCacheWriter(new CountingCacheWriter()); + } + factory.create(REGION_NAME); + clearsByRegion = new HashMap<>(); + destroysByRegion = new HashMap<>(); } private void feed(boolean isClient) { @@ -152,45 +157,148 @@ public class PartitionedRegionClearDUnitTest implements Serializable { // client2.invoke(()->verifyRegionSize(true, expectedNum)); } - private void verifyCacheListenerTriggerCount(MemberVM serverVM) { - SerializableCallableIF<Integer> getListenerTriggerCount = () -> { - CountingCacheListener countingCacheListener = - (CountingCacheListener) getRegion(false).getAttributes() - .getCacheListeners()[0]; - return countingCacheListener.getClears(); - }; + SerializableCallableIF<Integer> getWriterClears = () -> { + int clears = + clearsByRegion.get(REGION_NAME) == null ? 0 : clearsByRegion.get(REGION_NAME).get(); + return clears; + }; - int count = accessor.invoke(getListenerTriggerCount) - + dataStore1.invoke(getListenerTriggerCount) - + dataStore2.invoke(getListenerTriggerCount) - + dataStore3.invoke(getListenerTriggerCount); - assertThat(count).isEqualTo(1); + SerializableCallableIF<Integer> getWriterDestroys = () -> { + int destroys = + destroysByRegion.get(REGION_NAME) == null ? 0 : destroysByRegion.get(REGION_NAME).get(); + return destroys; + }; - if (serverVM != null) { - assertThat(serverVM.invoke(getListenerTriggerCount)).isEqualTo(1); - } + void configureServers(boolean dataStoreWithWriter, boolean accessorWithWriter) { + dataStore1.invoke(() -> initDataStore(dataStoreWithWriter)); + dataStore2.invoke(() -> initDataStore(dataStoreWithWriter)); + dataStore3.invoke(() -> initDataStore(dataStoreWithWriter)); + accessor.invoke(() -> initAccessor(accessorWithWriter)); + // make sure only datastore3 has cacheWriter + dataStore1.invoke(() -> { + Region region = getRegion(false); + region.getAttributesMutator().setCacheWriter(null); + }); + dataStore2.invoke(() -> { + Region region = getRegion(false); + region.getAttributesMutator().setCacheWriter(null); + }); + } + + @Test + public void normalClearFromDataStoreWithWriterOnDataStore() { + configureServers(true, true); + client1.invoke(this::initClientCache); + client2.invoke(this::initClientCache); + + accessor.invoke(() -> feed(false)); + verifyServerRegionSize(NUM_ENTRIES); + dataStore3.invoke(() -> getRegion(false).clear()); + verifyServerRegionSize(0); + + // do the region destroy to compare that the same callbacks will be triggered + dataStore3.invoke(() -> { + Region region = getRegion(false); + region.destroyRegion(); + }); + + assertThat(dataStore1.invoke(getWriterDestroys)).isEqualTo(dataStore1.invoke(getWriterClears)) + .isEqualTo(0); + assertThat(dataStore2.invoke(getWriterDestroys)).isEqualTo(dataStore2.invoke(getWriterClears)) + .isEqualTo(0); + assertThat(dataStore3.invoke(getWriterDestroys)).isEqualTo(dataStore3.invoke(getWriterClears)) + .isEqualTo(1); + assertThat(accessor.invoke(getWriterDestroys)).isEqualTo(accessor.invoke(getWriterClears)) + .isEqualTo(0); } @Test - public void normalClearFromDataStore() { + public void normalClearFromDataStoreWithoutWriterOnDataStore() { + configureServers(false, true); + client1.invoke(this::initClientCache); + client2.invoke(this::initClientCache); + accessor.invoke(() -> feed(false)); verifyServerRegionSize(NUM_ENTRIES); dataStore1.invoke(() -> getRegion(false).clear()); verifyServerRegionSize(0); - verifyCacheListenerTriggerCount(dataStore1); + + // do the region destroy to compare that the same callbacks will be triggered + dataStore1.invoke(() -> { + Region region = getRegion(false); + region.destroyRegion(); + }); + + assertThat(dataStore1.invoke(getWriterDestroys)).isEqualTo(dataStore1.invoke(getWriterClears)) + .isEqualTo(0); + assertThat(dataStore2.invoke(getWriterDestroys)).isEqualTo(dataStore2.invoke(getWriterClears)) + .isEqualTo(0); + assertThat(dataStore3.invoke(getWriterDestroys)).isEqualTo(dataStore3.invoke(getWriterClears)) + .isEqualTo(0); + assertThat(accessor.invoke(getWriterDestroys)).isEqualTo(accessor.invoke(getWriterClears)) + .isEqualTo(1); } @Test - public void normalClearFromAccessor() { + public void normalClearFromAccessorWithWriterOnDataStore() { + configureServers(true, true); + client1.invoke(this::initClientCache); + client2.invoke(this::initClientCache); + accessor.invoke(() -> feed(false)); verifyServerRegionSize(NUM_ENTRIES); accessor.invoke(() -> getRegion(false).clear()); verifyServerRegionSize(0); - verifyCacheListenerTriggerCount(accessor); + + // do the region destroy to compare that the same callbacks will be triggered + accessor.invoke(() -> { + Region region = getRegion(false); + region.destroyRegion(); + }); + + assertThat(dataStore1.invoke(getWriterDestroys)).isEqualTo(dataStore1.invoke(getWriterClears)) + .isEqualTo(0); + assertThat(dataStore2.invoke(getWriterDestroys)).isEqualTo(dataStore2.invoke(getWriterClears)) + .isEqualTo(0); + assertThat(dataStore3.invoke(getWriterDestroys)).isEqualTo(dataStore3.invoke(getWriterClears)) + .isEqualTo(0); + assertThat(accessor.invoke(getWriterDestroys)).isEqualTo(accessor.invoke(getWriterClears)) + .isEqualTo(1); + } + + @Test + public void normalClearFromAccessorWithoutWriterButWithWriterOnDataStore() { + configureServers(true, false); + client1.invoke(this::initClientCache); + client2.invoke(this::initClientCache); + + accessor.invoke(() -> feed(false)); + verifyServerRegionSize(NUM_ENTRIES); + accessor.invoke(() -> getRegion(false).clear()); + verifyServerRegionSize(0); + + // do the region destroy to compare that the same callbacks will be triggered + accessor.invoke(() -> { + Region region = getRegion(false); + region.destroyRegion(); + }); + + assertThat(dataStore1.invoke(getWriterDestroys)).isEqualTo(dataStore1.invoke(getWriterClears)) + .isEqualTo(0); + assertThat(dataStore2.invoke(getWriterDestroys)).isEqualTo(dataStore2.invoke(getWriterClears)) + .isEqualTo(0); + assertThat(dataStore3.invoke(getWriterDestroys)).isEqualTo(dataStore3.invoke(getWriterClears)) + .isEqualTo(1); + assertThat(accessor.invoke(getWriterDestroys)).isEqualTo(accessor.invoke(getWriterClears)) + .isEqualTo(0); } @Test public void normalClearFromClient() { + configureServers(true, false); + client1.invoke(this::initClientCache); + client2.invoke(this::initClientCache); + client1.invoke(() -> feed(true)); verifyClientRegionSize(NUM_ENTRIES); verifyServerRegionSize(NUM_ENTRIES); @@ -198,21 +306,53 @@ public class PartitionedRegionClearDUnitTest implements Serializable { client1.invoke(() -> getRegion(true).clear()); verifyServerRegionSize(0); verifyClientRegionSize(0); - verifyCacheListenerTriggerCount(null); + + // do the region destroy to compare that the same callbacks will be triggered + client1.invoke(() -> { + Region region = getRegion(true); + region.destroyRegion(); + }); + + assertThat(dataStore1.invoke(getWriterDestroys)).isEqualTo(dataStore1.invoke(getWriterClears)) + .isEqualTo(0); + assertThat(dataStore2.invoke(getWriterDestroys)).isEqualTo(dataStore2.invoke(getWriterClears)) + .isEqualTo(0); + assertThat(dataStore3.invoke(getWriterDestroys)).isEqualTo(dataStore3.invoke(getWriterClears)) + .isEqualTo(1); + assertThat(accessor.invoke(getWriterDestroys)).isEqualTo(accessor.invoke(getWriterClears)) + .isEqualTo(0); } - private static class CountingCacheListener extends CacheListenerAdapter { - private final AtomicInteger clears = new AtomicInteger(); + public static HashMap<String, AtomicInteger> clearsByRegion = new HashMap<>(); + public static HashMap<String, AtomicInteger> destroysByRegion = new HashMap<>(); + private static class CountingCacheWriter extends CacheWriterAdapter { @Override - public void afterRegionClear(RegionEvent event) { + public void beforeRegionClear(RegionEvent event) throws CacheWriterException { Region region = event.getRegion(); - logger.info("Region " + region.getFullPath() + " is cleared."); - clears.incrementAndGet(); + AtomicInteger clears = clearsByRegion.get(region.getName()); + if (clears == null) { + clears = new AtomicInteger(1); + clearsByRegion.put(region.getName(), clears); + } else { + clears.incrementAndGet(); + } + logger + .info("Region " + region.getName() + " will be cleared, clear count is:" + clears.get()); } - int getClears() { - return clears.get(); + @Override + public void beforeRegionDestroy(RegionEvent event) throws CacheWriterException { + Region region = event.getRegion(); + AtomicInteger destroys = destroysByRegion.get(region.getName()); + if (destroys == null) { + destroys = new AtomicInteger(1); + destroysByRegion.put(region.getName(), destroys); + } else { + destroys.incrementAndGet(); + } + logger.info( + "Region " + region.getName() + " will be destroyed, destroy count is:" + destroys.get()); } } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java index d5f9156..159d5c2 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java @@ -3002,7 +3002,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, /** * @since GemFire 5.7 */ - private void serverRegionClear(RegionEventImpl regionEvent) { + protected void serverRegionClear(RegionEventImpl regionEvent) { if (regionEvent.getOperation().isDistributed()) { ServerRegionProxy mySRP = getServerProxy(); if (mySRP != null) { @@ -3123,7 +3123,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, return result; } - private void cacheWriteBeforeRegionClear(RegionEventImpl event) + void cacheWriteBeforeRegionClear(RegionEventImpl event) throws CacheWriterException, TimeoutException { // copy into local var to prevent race condition CacheWriter writer = basicGetWriter(); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java index a26f720..9f52824 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java @@ -2161,6 +2161,9 @@ public class PartitionedRegion extends LocalRegion throw cache.getCacheClosedException("Cache is shutting down"); } + // do cacheWrite + cacheWriteBeforeRegionClear(regionEvent); + // create ClearPRMessage per bucket List<ClearPRMessage> clearMsgList = createClearPRMessages(regionEvent.getEventId()); for (ClearPRMessage clearPRMessage : clearMsgList) { @@ -4428,6 +4431,26 @@ public class PartitionedRegion extends LocalRegion return null; } + boolean triggerWriter(RegionEventImpl event, SearchLoadAndWriteProcessor processor, int paction, + String theKey) { + CacheWriter localWriter = basicGetWriter(); + Set netWriteRecipients = localWriter == null ? this.distAdvisor.adviseNetWrite() : null; + + if (localWriter == null && (netWriteRecipients == null || netWriteRecipients.isEmpty())) { + return false; + } + + final long start = getCachePerfStats().startCacheWriterCall(); + try { + processor.initialize(this, theKey, null); + processor.doNetWrite(event, netWriteRecipients, localWriter, paction); + processor.release(); + } finally { + getCachePerfStats().endCacheWriterCall(start); + } + return true; + } + /** * This invokes a cache writer before a destroy operation. Although it has the same method * signature as the method in LocalRegion, it is invoked in a different code path. LocalRegion @@ -4437,31 +4460,26 @@ public class PartitionedRegion extends LocalRegion @Override boolean cacheWriteBeforeRegionDestroy(RegionEventImpl event) throws CacheWriterException, TimeoutException { - if (event.getOperation().isDistributed()) { serverRegionDestroy(event); - CacheWriter localWriter = basicGetWriter(); - Set netWriteRecipients = localWriter == null ? this.distAdvisor.adviseNetWrite() : null; - - if (localWriter == null && (netWriteRecipients == null || netWriteRecipients.isEmpty())) { - return false; - } - - final long start = getCachePerfStats().startCacheWriterCall(); - try { - SearchLoadAndWriteProcessor processor = SearchLoadAndWriteProcessor.getProcessor(); - processor.initialize(this, "preDestroyRegion", null); - processor.doNetWrite(event, netWriteRecipients, localWriter, - SearchLoadAndWriteProcessor.BEFOREREGIONDESTROY); - processor.release(); - } finally { - getCachePerfStats().endCacheWriterCall(start); - } - return true; + SearchLoadAndWriteProcessor processor = SearchLoadAndWriteProcessor.getProcessor(); + return triggerWriter(event, processor, SearchLoadAndWriteProcessor.BEFOREREGIONDESTROY, + "preDestroyRegion"); } return false; } + @Override + void cacheWriteBeforeRegionClear(RegionEventImpl event) + throws CacheWriterException, TimeoutException { + if (event.getOperation().isDistributed()) { + serverRegionClear(event); + SearchLoadAndWriteProcessor processor = SearchLoadAndWriteProcessor.getProcessor(); + triggerWriter(event, processor, SearchLoadAndWriteProcessor.BEFOREREGIONCLEAR, + "preClearRegion"); + } + } + /** * Test Method: Get the DistributedMember identifier for the vm containing a key *