This is an automated email from the ASF dual-hosted git repository. nnag pushed a commit to branch feature/GEODE-7665 in repository https://gitbox.apache.org/repos/asf/geode.git
commit 8f1612b1c472deae731a47a07c89226afcfaee0b Author: Xiaojian Zhou <gesterz...@users.noreply.github.com> AuthorDate: Thu Mar 5 23:46:36 2020 -0800 GEODE-7682: add PR.clear API (#4755) * GEODE-7683: introduce BR.cmnClearRegion Co-authored-by: Xiaojian Zhou <gz...@pivotal.io> --- .../cache/PartitionedRegionClearDUnitTest.java | 218 +++++++++++++++++++++ .../PartitionedRegionPersistentClearDUnitTest.java | 26 +++ ...itionedRegionSingleNodeOperationsJUnitTest.java | 66 ------- .../codeAnalysis/sanctionedDataSerializables.txt | 4 +- .../org/apache/geode/internal/DSFIDFactory.java | 3 + .../geode/internal/cache/DistributedRegion.java | 9 - .../apache/geode/internal/cache/LocalRegion.java | 10 + .../geode/internal/cache/PartitionedRegion.java | 214 ++++++++++++++++++-- .../geode/internal/cache/RegionEventImpl.java | 5 + .../internal/cache/partitioned/ClearPRMessage.java | 166 +++++----------- .../cache/partitioned/ClearPRMessageTest.java | 50 ++--- 11 files changed, 522 insertions(+), 249 deletions(-) diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearDUnitTest.java new file mode 100644 index 0000000..fb2a81b --- /dev/null +++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearDUnitTest.java @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache; + +import static org.apache.geode.internal.Assert.fail; +import static org.apache.geode.test.dunit.rules.ClusterStartupRule.getCache; +import static org.apache.geode.test.dunit.rules.ClusterStartupRule.getClientCache; +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.Serializable; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.IntStream; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +import org.apache.geode.cache.InterestResultPolicy; +import org.apache.geode.cache.PartitionAttributesFactory; +import org.apache.geode.cache.Region; +import org.apache.geode.cache.RegionEvent; +import org.apache.geode.cache.RegionShortcut; +import org.apache.geode.cache.client.ClientRegionShortcut; +import org.apache.geode.cache.util.CacheListenerAdapter; +import org.apache.geode.test.dunit.SerializableCallableIF; +import org.apache.geode.test.dunit.rules.ClientVM; +import org.apache.geode.test.dunit.rules.ClusterStartupRule; +import org.apache.geode.test.dunit.rules.MemberVM; + +public class PartitionedRegionClearDUnitTest implements Serializable { + protected static final String REGION_NAME = "testPR"; + protected static final int NUM_ENTRIES = 1000; + + protected int locatorPort; + protected MemberVM locator; + protected MemberVM dataStore1, dataStore2, dataStore3, accessor; + protected ClientVM client1, client2; + + private static final Logger logger = LogManager.getLogger(); + + @Rule + public ClusterStartupRule cluster = new ClusterStartupRule(7); + + @Before + public void setUp() throws Exception { + locator = cluster.startLocatorVM(0); + locatorPort = locator.getPort(); + dataStore1 = cluster.startServerVM(1, getProperties(), locatorPort); + dataStore2 = cluster.startServerVM(2, getProperties(), locatorPort); + dataStore3 = cluster.startServerVM(3, getProperties(), locatorPort); + accessor = cluster.startServerVM(4, getProperties(), locatorPort); + client1 = cluster.startClientVM(5, + c -> c.withPoolSubscription(true).withLocatorConnection((locatorPort))); + client2 = cluster.startClientVM(6, + c -> c.withPoolSubscription(true).withLocatorConnection((locatorPort))); + dataStore1.invoke(this::initDataStore); + dataStore2.invoke(this::initDataStore); + dataStore3.invoke(this::initDataStore); + accessor.invoke(this::initAccessor); + client1.invoke(this::initClientCache); + client2.invoke(this::initClientCache); + } + + protected RegionShortcut getRegionShortCut() { + return RegionShortcut.PARTITION_REDUNDANT; + } + + protected Properties getProperties() { + Properties properties = new Properties(); + properties.setProperty("log-level", "info"); + return properties; + } + + private Region getRegion(boolean isClient) { + if (isClient) { + return getClientCache().getRegion(REGION_NAME); + } else { + return getCache().getRegion(REGION_NAME); + } + } + + private void verifyRegionSize(boolean isClient, int expectedNum) { + assertThat(getRegion(isClient).size()).isEqualTo(expectedNum); + } + + private void initClientCache() { + Region region = getClientCache().createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY) + .create(REGION_NAME); + region.registerInterestForAllKeys(InterestResultPolicy.KEYS); + } + + private void initDataStore() { + getCache().createRegionFactory(getRegionShortCut()) + .setPartitionAttributes(new PartitionAttributesFactory().setTotalNumBuckets(10).create()) + .addCacheListener(new CountingCacheListener()) + .create(REGION_NAME); + } + + private void initAccessor() { + RegionShortcut shortcut = getRegionShortCut(); + if (shortcut.isPersistent()) { + if (shortcut == RegionShortcut.PARTITION_PERSISTENT) { + shortcut = RegionShortcut.PARTITION; + } else if (shortcut == RegionShortcut.PARTITION_PERSISTENT_OVERFLOW) { + shortcut = RegionShortcut.PARTITION_OVERFLOW; + } else if (shortcut == RegionShortcut.PARTITION_REDUNDANT_PERSISTENT) { + shortcut = RegionShortcut.PARTITION_REDUNDANT; + } else if (shortcut == RegionShortcut.PARTITION_REDUNDANT_PERSISTENT_OVERFLOW) { + shortcut = RegionShortcut.PARTITION_REDUNDANT_OVERFLOW; + } else { + fail("Wrong region type:" + shortcut); + } + } + getCache().createRegionFactory(shortcut) + .setPartitionAttributes( + new PartitionAttributesFactory().setTotalNumBuckets(10).setLocalMaxMemory(0).create()) + .setPartitionAttributes(new PartitionAttributesFactory().setTotalNumBuckets(10).create()) + .addCacheListener(new CountingCacheListener()) + .create(REGION_NAME); + } + + private void feed(boolean isClient) { + Region region = getRegion(isClient); + IntStream.range(0, NUM_ENTRIES).forEach(i -> region.put(i, "value" + i)); + } + + private void verifyServerRegionSize(int expectedNum) { + accessor.invoke(() -> verifyRegionSize(false, expectedNum)); + dataStore1.invoke(() -> verifyRegionSize(false, expectedNum)); + dataStore2.invoke(() -> verifyRegionSize(false, expectedNum)); + dataStore3.invoke(() -> verifyRegionSize(false, expectedNum)); + } + + private void verifyClientRegionSize(int expectedNum) { + client1.invoke(() -> verifyRegionSize(true, expectedNum)); + // TODO: notify register clients + // client2.invoke(()->verifyRegionSize(true, expectedNum)); + } + + private void verifyCacheListenerTriggerCount(MemberVM serverVM) { + SerializableCallableIF<Integer> getListenerTriggerCount = () -> { + CountingCacheListener countingCacheListener = + (CountingCacheListener) getRegion(false).getAttributes() + .getCacheListeners()[0]; + return countingCacheListener.getClears(); + }; + + int count = accessor.invoke(getListenerTriggerCount) + + dataStore1.invoke(getListenerTriggerCount) + + dataStore2.invoke(getListenerTriggerCount) + + dataStore3.invoke(getListenerTriggerCount); + assertThat(count).isEqualTo(1); + + if (serverVM != null) { + assertThat(serverVM.invoke(getListenerTriggerCount)).isEqualTo(1); + } + } + + @Test + public void normalClearFromDataStore() { + accessor.invoke(() -> feed(false)); + verifyServerRegionSize(NUM_ENTRIES); + dataStore1.invoke(() -> getRegion(false).clear()); + verifyServerRegionSize(0); + verifyCacheListenerTriggerCount(dataStore1); + } + + @Test + public void normalClearFromAccessor() { + accessor.invoke(() -> feed(false)); + verifyServerRegionSize(NUM_ENTRIES); + accessor.invoke(() -> getRegion(false).clear()); + verifyServerRegionSize(0); + verifyCacheListenerTriggerCount(accessor); + } + + @Test + public void normalClearFromClient() { + client1.invoke(() -> feed(true)); + verifyClientRegionSize(NUM_ENTRIES); + verifyServerRegionSize(NUM_ENTRIES); + + client1.invoke(() -> getRegion(true).clear()); + verifyServerRegionSize(0); + verifyClientRegionSize(0); + verifyCacheListenerTriggerCount(null); + } + + private static class CountingCacheListener extends CacheListenerAdapter { + private final AtomicInteger clears = new AtomicInteger(); + + @Override + public void afterRegionClear(RegionEvent event) { + Region region = event.getRegion(); + logger.info("Region " + region.getFullPath() + " is cleared."); + clears.incrementAndGet(); + } + + int getClears() { + return clears.get(); + } + } +} diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionPersistentClearDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionPersistentClearDUnitTest.java new file mode 100644 index 0000000..847699b --- /dev/null +++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionPersistentClearDUnitTest.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache; + + + +import org.apache.geode.cache.RegionShortcut; + +public class PartitionedRegionPersistentClearDUnitTest extends PartitionedRegionClearDUnitTest { + + protected RegionShortcut getRegionShortCut() { + return RegionShortcut.PARTITION_REDUNDANT_PERSISTENT_OVERFLOW; + } +} diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/PartitionedRegionSingleNodeOperationsJUnitTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/PartitionedRegionSingleNodeOperationsJUnitTest.java index b37945b..4f36060 100644 --- a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/PartitionedRegionSingleNodeOperationsJUnitTest.java +++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/PartitionedRegionSingleNodeOperationsJUnitTest.java @@ -25,7 +25,6 @@ import static org.junit.Assert.fail; import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; import java.util.Iterator; import java.util.NoSuchElementException; import java.util.Set; @@ -1298,71 +1297,6 @@ public class PartitionedRegionSingleNodeOperationsJUnitTest { } } - @Test - public void test023UnsupportedOps() throws Exception { - Region pr = null; - try { - pr = PartitionedRegionTestHelper.createPartitionedRegion("testUnsupportedOps", - String.valueOf(200), 0); - - pr.put(new Integer(1), "one"); - pr.put(new Integer(2), "two"); - pr.put(new Integer(3), "three"); - pr.getEntry("key"); - - try { - pr.clear(); - fail( - "PartitionedRegionSingleNodeOperationTest:testUnSupportedOps() operation failed on a blank PartitionedRegion"); - } catch (UnsupportedOperationException expected) { - } - - // try { - // pr.entries(true); - // fail(); - // } - // catch (UnsupportedOperationException expected) { - // } - - // try { - // pr.entrySet(true); - // fail(); - // } - // catch (UnsupportedOperationException expected) { - // } - - try { - HashMap data = new HashMap(); - data.put("foo", "bar"); - data.put("bing", "bam"); - data.put("supper", "hero"); - pr.putAll(data); - // fail("testPutAll() does NOT throw UnsupportedOperationException"); - } catch (UnsupportedOperationException onse) { - } - - - // try { - // pr.values(); - // fail("testValues() does NOT throw UnsupportedOperationException"); - // } - // catch (UnsupportedOperationException expected) { - // } - - - try { - pr.containsValue("foo"); - } catch (UnsupportedOperationException ex) { - fail("PartitionedRegionSingleNodeOperationTest:testContainsValue() operation failed"); - } - - } finally { - if (pr != null) { - pr.destroyRegion(); - } - } - } - /** * This method validates size operations. It verifies that it returns correct size of the * PartitionedRegion. diff --git a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt index 25c169b..88a8251 100644 --- a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt +++ b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt @@ -1448,8 +1448,8 @@ fromData,27 toData,27 org/apache/geode/internal/cache/partitioned/ClearPRMessage,2 -fromData,30 -toData,44 +fromData,19 +toData,36 org/apache/geode/internal/cache/partitioned/ClearPRMessage$ClearReplyMessage,2 fromData,17 diff --git a/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java b/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java index 7990323..c97f391 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java +++ b/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java @@ -288,6 +288,7 @@ import org.apache.geode.internal.cache.partitioned.BucketCountLoadProbe; import org.apache.geode.internal.cache.partitioned.BucketProfileUpdateMessage; import org.apache.geode.internal.cache.partitioned.BucketSizeMessage; import org.apache.geode.internal.cache.partitioned.BucketSizeMessage.BucketSizeReplyMessage; +import org.apache.geode.internal.cache.partitioned.ClearPRMessage; import org.apache.geode.internal.cache.partitioned.ContainsKeyValueMessage; import org.apache.geode.internal.cache.partitioned.ContainsKeyValueMessage.ContainsKeyValueReplyMessage; import org.apache.geode.internal.cache.partitioned.CreateBucketMessage; @@ -978,6 +979,8 @@ public class DSFIDFactory implements DataSerializableFixedID { serializer.registerDSFID(GATEWAY_SENDER_QUEUE_ENTRY_SYNCHRONIZATION_ENTRY, GatewaySenderQueueEntrySynchronizationOperation.GatewaySenderQueueEntrySynchronizationEntry.class); serializer.registerDSFID(ABORT_BACKUP_REQUEST, AbortBackupRequest.class); + serializer.registerDSFID(PR_CLEAR_MESSAGE, ClearPRMessage.class); + serializer.registerDSFID(PR_CLEAR_REPLY_MESSAGE, ClearPRMessage.ClearReplyMessage.class); serializer.registerDSFID(HOST_AND_PORT, HostAndPort.class); serializer.registerDSFID(DISTRIBUTED_PING_MESSAGE, DistributedPingMessage.class); } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java index 489d85a..84b5a3b 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java @@ -192,10 +192,6 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute @MutableForTesting public static boolean ignoreReconnect = false; - /** - * Lock to prevent multiple threads on this member from performing a clear at the same time. - */ - private final Object clearLock = new Object(); private final ReentrantReadWriteLock failedInitialImageLock = new ReentrantReadWriteLock(true); @MakeNotStatic @@ -933,11 +929,6 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute } } - private void lockCheckReadiness() { - cache.getCancelCriterion().checkCancelInProgress(null); - checkReadiness(); - } - @Override Object validatedDestroy(Object key, EntryEventImpl event) throws TimeoutException, EntryNotFoundException, CacheWriterException { diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java index 73a67bf..a27e058 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java @@ -471,6 +471,11 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, private final Lock clientMetaDataLock = new ReentrantLock(); /** + * Lock to prevent multiple threads on this member from performing a clear at the same time. + */ + protected final Object clearLock = new Object(); + + /** * Lock for updating the cache service profile for the region. */ private final Lock cacheServiceProfileUpdateLock = new ReentrantLock(); @@ -2748,6 +2753,11 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, checkRegionDestroyed(true); } + protected void lockCheckReadiness() { + cache.getCancelCriterion().checkCancelInProgress(null); + checkReadiness(); + } + /** * This method should be called when the caller cannot locate an entry and that condition is * unexpected. This will first double check the cache and region state before throwing an diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java index 78083cf..9ade05f 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java @@ -181,6 +181,7 @@ import org.apache.geode.internal.cache.execute.PartitionedRegionFunctionResultWa import org.apache.geode.internal.cache.execute.RegionFunctionContextImpl; import org.apache.geode.internal.cache.execute.ServerToClientFunctionResultSender; import org.apache.geode.internal.cache.ha.ThreadIdentifier; +import org.apache.geode.internal.cache.partitioned.ClearPRMessage; import org.apache.geode.internal.cache.partitioned.ContainsKeyValueMessage; import org.apache.geode.internal.cache.partitioned.ContainsKeyValueMessage.ContainsKeyValueResponse; import org.apache.geode.internal.cache.partitioned.DestroyMessage; @@ -2171,18 +2172,202 @@ public class PartitionedRegion extends LocalRegion throw new UnsupportedOperationException(); } - /** - * @since GemFire 5.0 - * @throws UnsupportedOperationException OVERRIDES - */ @Override - public void clear() { - throw new UnsupportedOperationException(); + void basicClear(RegionEventImpl regionEvent, boolean cacheWrite) { + final boolean isDebugEnabled = logger.isDebugEnabled(); + synchronized (clearLock) { + final DistributedLockService lockService = getPartitionedRegionLockService(); + try { + lockService.lock("_clearOperation" + this.getFullPath().replace('/', '_'), -1, -1); + } catch (IllegalStateException e) { + lockCheckReadiness(); + throw e; + } + try { + if (cache.isCacheAtShutdownAll()) { + throw cache.getCacheClosedException("Cache is shutting down"); + } + + // create ClearPRMessage per bucket + List<ClearPRMessage> clearMsgList = createClearPRMessages(); + for (ClearPRMessage clearPRMessage : clearMsgList) { + int bucketId = clearPRMessage.getBucketId(); + checkReadiness(); + long sendMessagesStartTime = 0; + if (isDebugEnabled) { + sendMessagesStartTime = System.currentTimeMillis(); + } + try { + sendClearMsgByBucket(bucketId, clearPRMessage); + } catch (PartitionOfflineException poe) { + // TODO add a PartialResultException + logger.info("PR.sendClearMsgByBucket encountered PartitionOfflineException at bucket " + + bucketId, poe); + } catch (Exception e) { + logger.info("PR.sendClearMsgByBucket encountered exception at bucket " + bucketId, e); + } + + if (isDebugEnabled) { + long now = System.currentTimeMillis(); + logger.debug("PR.sendClearMsgByBucket for bucket {} took {} ms", bucketId, + (now - sendMessagesStartTime)); + } + // TODO add psStats + } + } finally { + try { + lockService.unlock("_clearOperation" + this.getFullPath().replace('/', '_')); + } catch (IllegalStateException e) { + lockCheckReadiness(); + } + } + + // notify bridge clients at PR level + regionEvent.setEventType(EnumListenerEvent.AFTER_REGION_CLEAR); + boolean hasListener = hasListener(); + if (hasListener) { + dispatchListenerEvent(EnumListenerEvent.AFTER_REGION_CLEAR, regionEvent); + } + notifyBridgeClients(regionEvent); + logger.info("Partitioned region {} finsihed clear operation.", this.getFullPath()); + } } - @Override - void basicClear(RegionEventImpl regionEvent, boolean cacheWrite) { - throw new UnsupportedOperationException(); + void sendClearMsgByBucket(final Integer bucketId, ClearPRMessage clearPRMessage) { + RetryTimeKeeper retryTime = null; + InternalDistributedMember currentTarget = getNodeForBucketWrite(bucketId, null); + if (logger.isDebugEnabled()) { + logger.debug("PR.sendClearMsgByBucket:bucket {}'s currentTarget is {}", bucketId, + currentTarget); + } + + long timeOut = 0; + int count = 0; + while (true) { + switch (count) { + case 0: + // Note we don't check for DM cancellation in common case. + // First time. Assume success, keep going. + break; + case 1: + this.cache.getCancelCriterion().checkCancelInProgress(null); + // Second time (first failure). Calculate timeout and keep going. + timeOut = System.currentTimeMillis() + this.retryTimeout; + break; + default: + this.cache.getCancelCriterion().checkCancelInProgress(null); + // test for timeout + long timeLeft = timeOut - System.currentTimeMillis(); + if (timeLeft < 0) { + PRHARedundancyProvider.timedOut(this, null, null, "clear a bucket" + bucketId, + this.retryTimeout); + // NOTREACHED + } + + // Didn't time out. Sleep a bit and then continue + boolean interrupted = Thread.interrupted(); + try { + Thread.sleep(PartitionedRegionHelper.DEFAULT_WAIT_PER_RETRY_ITERATION); + } catch (InterruptedException ignore) { + interrupted = true; + } finally { + if (interrupted) { + Thread.currentThread().interrupt(); + } + } + break; + } // switch + count++; + + if (currentTarget == null) { // pick target + checkReadiness(); + if (retryTime == null) { + retryTime = new RetryTimeKeeper(this.retryTimeout); + } + + currentTarget = waitForNodeOrCreateBucket(retryTime, null, bucketId, false); + if (currentTarget == null) { + // the bucket does not exist, no need to clear + logger.info("Bucket " + bucketId + " does not contain data, no need to clear"); + return; + } else { + if (logger.isDebugEnabled()) { + logger.debug("PR.sendClearMsgByBucket: new currentTarget is {}", currentTarget); + } + } + + // It's possible this is a GemFire thread e.g. ServerConnection + // which got to this point because of a distributed system shutdown or + // region closure which uses interrupt to break any sleep() or wait() calls + // e.g. waitForPrimary or waitForBucketRecovery in which case throw exception + checkShutdown(); + continue; + } // pick target + + boolean result = false; + try { + final boolean isLocal = (this.localMaxMemory > 0) && currentTarget.equals(getMyId()); + if (isLocal) { + result = clearPRMessage.doLocalClear(this); + } else { + ClearPRMessage.ClearResponse response = clearPRMessage.send(currentTarget, this); + if (response != null) { + this.prStats.incPartitionMessagesSent(); + result = response.waitForResult(); + } + } + if (result) { + return; + } + } catch (ForceReattemptException fre) { + checkReadiness(); + InternalDistributedMember lastTarget = currentTarget; + if (retryTime == null) { + retryTime = new RetryTimeKeeper(this.retryTimeout); + } + currentTarget = getNodeForBucketWrite(bucketId, retryTime); + if (lastTarget.equals(currentTarget)) { + if (logger.isDebugEnabled()) { + logger.debug("PR.sendClearMsgByBucket: Retrying at the same node:{} due to {}", + currentTarget, fre.getMessage()); + } + if (retryTime.overMaximum()) { + PRHARedundancyProvider.timedOut(this, null, null, "clear a bucket", + this.retryTimeout); + // NOTREACHED + } + retryTime.waitToRetryNode(); + } else { + if (logger.isDebugEnabled()) { + logger.debug("PR.sendClearMsgByBucket: Old target was {}, Retrying {}", lastTarget, + currentTarget); + } + } + } + + // It's possible this is a GemFire thread e.g. ServerConnection + // which got to this point because of a distributed system shutdown or + // region closure which uses interrupt to break any sleep() or wait() + // calls + // e.g. waitForPrimary or waitForBucketRecovery in which case throw + // exception + checkShutdown(); + + // If we get here, the attempt failed... + if (count == 1) { + // TODO prStats add ClearPRMsg retried + this.prStats.incPutAllMsgsRetried(); + } + } + } + + List<ClearPRMessage> createClearPRMessages() { + ArrayList<ClearPRMessage> clearMsgList = new ArrayList<>(); + for (int bucketId = 0; bucketId < this.totalNumberOfBuckets; bucketId++) { + ClearPRMessage clearPRMessage = new ClearPRMessage(bucketId); + clearMsgList.add(clearPRMessage); + } + return clearMsgList; } @Override @@ -2601,7 +2786,7 @@ public class PartitionedRegion extends LocalRegion retryTime = new RetryTimeKeeper(this.retryTimeout); } - currentTarget = waitForNodeOrCreateBucket(retryTime, event, bucketId); + currentTarget = waitForNodeOrCreateBucket(retryTime, event, bucketId, true); if (isDebugEnabled) { logger.debug("PR.sendMsgByBucket: event size is {}, new currentTarget is {}", getEntrySize(event), currentTarget); @@ -2742,7 +2927,7 @@ public class PartitionedRegion extends LocalRegion retryTime = new RetryTimeKeeper(this.retryTimeout); } - currentTarget = waitForNodeOrCreateBucket(retryTime, event, bucketId); + currentTarget = waitForNodeOrCreateBucket(retryTime, event, bucketId, true); if (logger.isDebugEnabled()) { logger.debug("PR.sendMsgByBucket: event size is {}, new currentTarget is {}", getEntrySize(event), currentTarget); @@ -2987,7 +3172,7 @@ public class PartitionedRegion extends LocalRegion if (retryTime == null) { retryTime = new RetryTimeKeeper(this.retryTimeout); } - currentTarget = waitForNodeOrCreateBucket(retryTime, event, bucketId); + currentTarget = waitForNodeOrCreateBucket(retryTime, event, bucketId, true); // It's possible this is a GemFire thread e.g. ServerConnection // which got to this point because of a distributed system shutdown or @@ -3146,10 +3331,11 @@ public class PartitionedRegion extends LocalRegion * @param retryTime the RetryTimeKeeper to track retry times * @param event the event used to get the entry size in the event a new bucket should be created * @param bucketId the identity of the bucket should it be created + * @param createIfNotExist boolean to indicate if to create a bucket if found not exist * @return a Node which contains the bucket, potentially null */ private InternalDistributedMember waitForNodeOrCreateBucket(RetryTimeKeeper retryTime, - EntryEventImpl event, Integer bucketId) { + EntryEventImpl event, Integer bucketId, boolean createIfNotExist) { InternalDistributedMember newNode; if (retryTime.overMaximum()) { PRHARedundancyProvider.timedOut(this, null, null, "allocate a bucket", @@ -3159,7 +3345,7 @@ public class PartitionedRegion extends LocalRegion retryTime.waitForBucketsRecovery(); newNode = getNodeForBucketWrite(bucketId, retryTime); - if (newNode == null) { + if (newNode == null && createIfNotExist) { newNode = createBucket(bucketId, getEntrySize(event), retryTime); } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/RegionEventImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/RegionEventImpl.java index 402b7f2..f155a7e 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/RegionEventImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/RegionEventImpl.java @@ -119,6 +119,11 @@ public class RegionEventImpl return region; } + public void setRegion(LocalRegion region) { + this.region = region; + this.distributedMember = region.getMyId(); + } + @Override public Operation getOperation() { return this.op; diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ClearPRMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ClearPRMessage.java index 1a8aba1..9fa8057 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ClearPRMessage.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ClearPRMessage.java @@ -26,7 +26,8 @@ import org.apache.logging.log4j.Logger; import org.apache.geode.DataSerializer; import org.apache.geode.annotations.VisibleForTesting; import org.apache.geode.cache.CacheException; -import org.apache.geode.distributed.DistributedLockService; +import org.apache.geode.cache.Operation; +import org.apache.geode.cache.persistence.PartitionOfflineException; import org.apache.geode.distributed.DistributedMember; import org.apache.geode.distributed.internal.ClusterDistributionManager; import org.apache.geode.distributed.internal.DirectReplyProcessor; @@ -44,7 +45,6 @@ import org.apache.geode.internal.cache.BucketRegion; import org.apache.geode.internal.cache.EventID; import org.apache.geode.internal.cache.ForceReattemptException; import org.apache.geode.internal.cache.PartitionedRegion; -import org.apache.geode.internal.cache.PartitionedRegionHelper; import org.apache.geode.internal.cache.RegionEventImpl; import org.apache.geode.internal.logging.log4j.LogMarker; import org.apache.geode.internal.serialization.DeserializationContext; @@ -54,16 +54,10 @@ import org.apache.geode.logging.internal.log4j.api.LogService; public class ClearPRMessage extends PartitionMessageWithDirectReply { private static final Logger logger = LogService.getLogger(); - private RegionEventImpl regionEvent; - private Integer bucketId; - /** The time in ms to wait for a lock to be obtained during doLocalClear() */ - public static final int LOCK_WAIT_TIMEOUT_MS = 1000; public static final String BUCKET_NON_PRIMARY_MESSAGE = "The bucket region on target member is no longer primary"; - public static final String BUCKET_REGION_LOCK_UNAVAILABLE_MESSAGE = - "A lock for the bucket region could not be obtained."; public static final String EXCEPTION_THROWN_DURING_CLEAR_OPERATION = "An exception was thrown during the local clear operation: "; @@ -79,14 +73,6 @@ public class ClearPRMessage extends PartitionMessageWithDirectReply { public ClearPRMessage(int bucketId) { this.bucketId = bucketId; - - // These are both used by the parent class, but don't apply to this message type - this.notificationOnly = false; - this.posDup = false; - } - - public void setRegionEvent(RegionEventImpl event) { - regionEvent = event; } public void initMessage(PartitionedRegion region, Set<InternalDistributedMember> recipients, @@ -103,16 +89,6 @@ public class ClearPRMessage extends PartitionMessageWithDirectReply { } } - @Override - public boolean isSevereAlertCompatible() { - // allow forced-disconnect processing for all cache op messages - return true; - } - - public RegionEventImpl getRegionEvent() { - return regionEvent; - } - public ClearResponse send(DistributedMember recipient, PartitionedRegion region) throws ForceReattemptException { Set<InternalDistributedMember> recipients = @@ -125,7 +101,7 @@ public class ClearPRMessage extends PartitionMessageWithDirectReply { Set<InternalDistributedMember> failures = region.getDistributionManager().putOutgoing(this); if (failures != null && failures.size() > 0) { - throw new ForceReattemptException("Failed sending <" + this + ">"); + throw new ForceReattemptException("Failed sending <" + this + "> due to " + failures); } return clearResponse; } @@ -143,7 +119,6 @@ public class ClearPRMessage extends PartitionMessageWithDirectReply { } else { InternalDataSerializer.writeSignedVL(bucketId, out); } - DataSerializer.writeObject(regionEvent, out); } @Override @@ -151,12 +126,11 @@ public class ClearPRMessage extends PartitionMessageWithDirectReply { throws IOException, ClassNotFoundException { super.fromData(in, context); this.bucketId = (int) InternalDataSerializer.readSignedVL(in); - this.regionEvent = DataSerializer.readObject(in); } @Override public EventID getEventID() { - return regionEvent.getEventId(); + return null; } /** @@ -169,60 +143,51 @@ public class ClearPRMessage extends PartitionMessageWithDirectReply { protected boolean operateOnPartitionedRegion(ClusterDistributionManager distributionManager, PartitionedRegion region, long startTime) { try { - result = doLocalClear(region); + this.result = doLocalClear(region); } catch (ForceReattemptException ex) { sendReply(getSender(), getProcessorId(), distributionManager, new ReplyException(ex), region, startTime); return false; } - sendReply(getSender(), getProcessorId(), distributionManager, null, region, startTime); - return false; + return this.result; } - public boolean doLocalClear(PartitionedRegion region) throws ForceReattemptException { + public Integer getBucketId() { + return this.bucketId; + } + + public boolean doLocalClear(PartitionedRegion region) + throws ForceReattemptException { // Retrieve local bucket region which matches target bucketId - BucketRegion bucketRegion = region.getDataStore().getInitializedBucketForId(null, bucketId); + BucketRegion bucketRegion = + region.getDataStore().getInitializedBucketForId(null, this.bucketId); - // Check if we are primary, throw exception if not - if (!bucketRegion.isPrimary()) { + boolean lockedForPrimary = bucketRegion.doLockForPrimary(false); + // Check if we obtained primary lock, throw exception if not + if (!lockedForPrimary) { throw new ForceReattemptException(BUCKET_NON_PRIMARY_MESSAGE); } - - DistributedLockService lockService = getPartitionRegionLockService(); - String lockName = bucketRegion.getFullPath(); try { - boolean locked = lockService.lock(lockName, LOCK_WAIT_TIMEOUT_MS, -1); - - if (!locked) { - throw new ForceReattemptException(BUCKET_REGION_LOCK_UNAVAILABLE_MESSAGE); - } - - // Double check if we are still primary, as this could have changed between our first check - // and obtaining the lock - if (!bucketRegion.isPrimary()) { - throw new ForceReattemptException(BUCKET_NON_PRIMARY_MESSAGE); - } - - try { - bucketRegion.cmnClearRegion(regionEvent, true, true); - } catch (Exception ex) { - throw new ForceReattemptException( - EXCEPTION_THROWN_DURING_CLEAR_OPERATION + ex.getClass().getName(), ex); - } - + RegionEventImpl regionEvent = new RegionEventImpl(); + regionEvent.setOperation(Operation.REGION_CLEAR); + regionEvent.setRegion(bucketRegion); + bucketRegion.cmnClearRegion(regionEvent, true, true); + } catch (PartitionOfflineException poe) { + logger.info( + "All members holding data for bucket {} are offline, no more retries will be attempted", + this.bucketId, + poe); + throw poe; + } catch (Exception ex) { + throw new ForceReattemptException( + EXCEPTION_THROWN_DURING_CLEAR_OPERATION + ex.getClass().getName(), ex); } finally { - lockService.unlock(lockName); + bucketRegion.doUnlockForPrimary(); } return true; } - // Extracted for testing - protected DistributedLockService getPartitionRegionLockService() { - return DistributedLockService - .getServiceNamed(PartitionedRegionHelper.PARTITION_LOCK_SERVICE_NAME); - } - @Override public boolean canStartRemoteTransaction() { return false; @@ -247,39 +212,7 @@ public class ClearPRMessage extends PartitionMessageWithDirectReply { buff.append("; bucketId=").append(this.bucketId); } - @Override - public String toString() { - StringBuilder buff = new StringBuilder(); - String className = getClass().getName(); - buff.append(className.substring(className.indexOf(PN_TOKEN) + PN_TOKEN.length())); // partition.<foo> - buff.append("(prid="); // make sure this is the first one - buff.append(this.regionId); - - // Append name, if we have it - String name = null; - try { - PartitionedRegion region = PartitionedRegion.getPRFromId(this.regionId); - if (region != null) { - name = region.getFullPath(); - } - } catch (Exception ignore) { - /* ignored */ - } - if (name != null) { - buff.append(" (name = \"").append(name).append("\")"); - } - - appendFields(buff); - buff.append(" ,distTx="); - buff.append(this.isTransactionDistributed); - buff.append(")"); - return buff.toString(); - } - public static class ClearReplyMessage extends ReplyMessage { - /** Result of the Clear operation */ - boolean result; - @Override public boolean getInlineProcess() { return true; @@ -293,16 +226,21 @@ public class ClearPRMessage extends PartitionMessageWithDirectReply { private ClearReplyMessage(int processorId, boolean result, ReplyException ex) { super(); - this.result = result; setProcessorId(processorId); - setException(ex); + if (ex != null) { + setException(ex); + } else { + setReturnValue(result); + } } - /** Send an ack */ + /** + * Send an ack + */ public static void send(InternalDistributedMember recipient, int processorId, ReplySender replySender, boolean result, ReplyException ex) { - Assert.assertTrue(recipient != null, "ClearReplyMessage NULL reply message"); + Assert.assertNotNull(recipient, "ClearReplyMessage recipient was NULL."); ClearReplyMessage message = new ClearReplyMessage(processorId, result, ex); message.setRecipient(recipient); replySender.putOutgoing(message); @@ -340,23 +278,11 @@ public class ClearPRMessage extends PartitionMessageWithDirectReply { } @Override - public void fromData(DataInput in, - DeserializationContext context) throws IOException, ClassNotFoundException { - super.fromData(in, context); - this.result = in.readBoolean(); - } - - @Override - public void toData(DataOutput out, - SerializationContext context) throws IOException { - super.toData(out, context); - out.writeBoolean(this.result); - } - - @Override public String toString() { - return "ClearReplyMessage " + "processorid=" + this.processorId + " returning " + this.result - + " exception=" + getException(); + StringBuilder stringBuilder = new StringBuilder(super.toString()); + stringBuilder.append(" returnValue="); + stringBuilder.append(getReturnValue()); + return stringBuilder.toString(); } } @@ -372,7 +298,9 @@ public class ClearPRMessage extends PartitionMessageWithDirectReply { } public void setResponse(ClearReplyMessage response) { - this.returnValue = response.result; + if (response.getException() == null) { + this.returnValue = (boolean) response.getReturnValue(); + } } /** diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/ClearPRMessageTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/ClearPRMessageTest.java index 2cf5231..acdd4fc 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/ClearPRMessageTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/ClearPRMessageTest.java @@ -20,7 +20,6 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.notNull; import static org.mockito.Mockito.RETURNS_DEEP_STUBS; import static org.mockito.Mockito.doNothing; @@ -38,7 +37,6 @@ import java.util.Set; import org.junit.Before; import org.junit.Test; -import org.apache.geode.distributed.DistributedLockService; import org.apache.geode.distributed.internal.ClusterDistributionManager; import org.apache.geode.distributed.internal.DMStats; import org.apache.geode.distributed.internal.DistributionManager; @@ -50,6 +48,7 @@ import org.apache.geode.internal.cache.ForceReattemptException; import org.apache.geode.internal.cache.PartitionedRegion; import org.apache.geode.internal.cache.PartitionedRegionDataStore; import org.apache.geode.internal.cache.PartitionedRegionStats; +import org.apache.geode.internal.cache.RegionEventImpl; public class ClearPRMessageTest { @@ -61,11 +60,14 @@ public class ClearPRMessageTest { @Before public void setup() throws ForceReattemptException { message = spy(new ClearPRMessage()); + InternalDistributedMember member = mock(InternalDistributedMember.class); region = mock(PartitionedRegion.class, RETURNS_DEEP_STUBS); dataStore = mock(PartitionedRegionDataStore.class); when(region.getDataStore()).thenReturn(dataStore); + when(region.getFullPath()).thenReturn("/test"); bucketRegion = mock(BucketRegion.class); when(dataStore.getInitializedBucketForId(any(), any())).thenReturn(bucketRegion); + RegionEventImpl bucketRegionEventImpl = mock(RegionEventImpl.class); } @Test @@ -79,44 +81,19 @@ public class ClearPRMessageTest { @Test public void doLocalClearThrowsExceptionWhenLockCannotBeObtained() { - DistributedLockService mockLockService = mock(DistributedLockService.class); - doReturn(mockLockService).when(message).getPartitionRegionLockService(); - - when(mockLockService.lock(anyString(), anyLong(), anyLong())).thenReturn(false); - when(bucketRegion.isPrimary()).thenReturn(true); - - assertThatThrownBy(() -> message.doLocalClear(region)) - .isInstanceOf(ForceReattemptException.class) - .hasMessageContaining(ClearPRMessage.BUCKET_REGION_LOCK_UNAVAILABLE_MESSAGE); - } - - @Test - public void doLocalClearThrowsExceptionWhenBucketIsNotPrimaryAfterObtainingLock() { - DistributedLockService mockLockService = mock(DistributedLockService.class); - doReturn(mockLockService).when(message).getPartitionRegionLockService(); - - // Be primary on the first check, then be not primary on the second check - when(bucketRegion.isPrimary()).thenReturn(true).thenReturn(false); - when(mockLockService.lock(any(), anyLong(), anyLong())).thenReturn(true); + when(bucketRegion.doLockForPrimary(false)).thenReturn(false); assertThatThrownBy(() -> message.doLocalClear(region)) .isInstanceOf(ForceReattemptException.class) .hasMessageContaining(ClearPRMessage.BUCKET_NON_PRIMARY_MESSAGE); - // Confirm that we actually obtained and released the lock - verify(mockLockService, times(1)).lock(any(), anyLong(), anyLong()); - verify(mockLockService, times(1)).unlock(any()); } @Test public void doLocalClearThrowsForceReattemptExceptionWhenAnExceptionIsThrownDuringClearOperation() { - DistributedLockService mockLockService = mock(DistributedLockService.class); - doReturn(mockLockService).when(message).getPartitionRegionLockService(); NullPointerException exception = new NullPointerException("Error encountered"); doThrow(exception).when(bucketRegion).cmnClearRegion(any(), anyBoolean(), anyBoolean()); - // Be primary on the first check, then be not primary on the second check - when(bucketRegion.isPrimary()).thenReturn(true); - when(mockLockService.lock(any(), anyLong(), anyLong())).thenReturn(true); + when(bucketRegion.doLockForPrimary(false)).thenReturn(true); assertThatThrownBy(() -> message.doLocalClear(region)) .isInstanceOf(ForceReattemptException.class) @@ -129,21 +106,13 @@ public class ClearPRMessageTest { @Test public void doLocalClearInvokesCmnClearRegionWhenBucketIsPrimaryAndLockIsObtained() throws ForceReattemptException { - DistributedLockService mockLockService = mock(DistributedLockService.class); - doReturn(mockLockService).when(message).getPartitionRegionLockService(); - // Be primary on the first check, then be not primary on the second check - when(bucketRegion.isPrimary()).thenReturn(true); - when(mockLockService.lock(any(), anyLong(), anyLong())).thenReturn(true); + when(bucketRegion.doLockForPrimary(false)).thenReturn(true); assertThat(message.doLocalClear(region)).isTrue(); // Confirm that cmnClearRegion was called verify(bucketRegion, times(1)).cmnClearRegion(any(), anyBoolean(), anyBoolean()); - - // Confirm that we actually obtained and released the lock - verify(mockLockService, times(1)).lock(any(), anyLong(), anyLong()); - verify(mockLockService, times(1)).unlock(any()); } @Test @@ -197,6 +166,7 @@ public class ClearPRMessageTest { int processorId = 1000; int startTime = 0; + doReturn(0).when(message).getBucketId(); doReturn(true).when(message).doLocalClear(region); doReturn(sender).when(message).getSender(); doReturn(processorId).when(message).getProcessorId(); @@ -206,8 +176,9 @@ public class ClearPRMessageTest { doNothing().when(message).sendReply(any(), anyInt(), any(), any(), any(), anyLong()); message.operateOnPartitionedRegion(distributionManager, region, startTime); + assertThat(message.result).isTrue(); - verify(message, times(1)).sendReply(sender, processorId, distributionManager, null, region, + verify(message, times(0)).sendReply(sender, processorId, distributionManager, null, region, startTime); } @@ -222,6 +193,7 @@ public class ClearPRMessageTest { ForceReattemptException exception = new ForceReattemptException(ClearPRMessage.BUCKET_NON_PRIMARY_MESSAGE); + doReturn(0).when(message).getBucketId(); doThrow(exception).when(message).doLocalClear(region); doReturn(sender).when(message).getSender(); doReturn(processorId).when(message).getProcessorId();