This is an automated email from the ASF dual-hosted git repository. jinmeiliao pushed a commit to branch feature/GEODE-7665 in repository https://gitbox.apache.org/repos/asf/geode.git
commit 5726d03fd78ced79a67b1526d18cc5033f1dbee4 Author: Xiaojian Zhou <gesterz...@users.noreply.github.com> AuthorDate: Tue Apr 13 14:21:34 2021 -0700 GEODE-7674: Clear on PR with lucene index should throw exception (#6317) --- .../internal/AsyncEventQueueFactoryImpl.java | 6 ++++++ .../asyncqueue/internal/AsyncEventQueueImpl.java | 5 +++++ .../org/apache/geode/cache/wan/GatewaySender.java | 2 ++ .../internal/cache/PartitionedRegionClear.java | 11 +++++++++++ .../internal/cache/wan/AbstractGatewaySender.java | 12 +++++++++++ .../cache/wan/GatewaySenderAttributes.java | 7 +++++++ .../internal/cache/wan/InternalGatewaySender.java | 2 ++ .../internal/cache/PartitionedRegionClearTest.java | 10 ++++++++-- .../cache/lucene/LuceneIndexCreationDUnitTest.java | 23 ++++++++++++++++++++++ .../cache/lucene/internal/LuceneIndexImpl.java | 1 + 10 files changed, 77 insertions(+), 2 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueFactoryImpl.java b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueFactoryImpl.java index 700cc4b..089c058 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueFactoryImpl.java +++ b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueFactoryImpl.java @@ -283,6 +283,12 @@ public class AsyncEventQueueFactoryImpl implements AsyncEventQueueFactory { return this; } + // keep this method internal + public AsyncEventQueueFactory setPartitionedRegionClearUnsupported(boolean status) { + gatewaySenderAttributes.partitionedRegionClearUnsupported = status; + return this; + } + @Override public AsyncEventQueueFactory pauseEventDispatching() { pauseEventsDispatching = true; diff --git a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueImpl.java b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueImpl.java index 2e5cfeb..e9e4f43 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueImpl.java +++ b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueImpl.java @@ -199,6 +199,11 @@ public class AsyncEventQueueImpl implements InternalAsyncEventQueue { return sender.isForwardExpirationDestroy(); } + // keep this method internal + public boolean isPartitionedRegionClearUnsupported() { + return sender.isPartitionedRegionClearUnsupported(); + } + public boolean waitUntilFlushed(long timeout, TimeUnit unit) throws InterruptedException { return sender.waitUntilFlushed(timeout, unit); } diff --git a/geode-core/src/main/java/org/apache/geode/cache/wan/GatewaySender.java b/geode-core/src/main/java/org/apache/geode/cache/wan/GatewaySender.java index 32dcec7..72ef716 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/wan/GatewaySender.java +++ b/geode-core/src/main/java/org/apache/geode/cache/wan/GatewaySender.java @@ -102,6 +102,8 @@ public interface GatewaySender { boolean DEFAULT_FORWARD_EXPIRATION_DESTROY = false; + boolean DEFAULT_PARTITIONED_REGION_CLEAR_UNSUPPORTED = false; + @Immutable OrderPolicy DEFAULT_ORDER_POLICY = OrderPolicy.KEY; /** diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClear.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClear.java index 539f682..569f78c 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClear.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClear.java @@ -27,6 +27,8 @@ import org.apache.geode.cache.CacheWriterException; import org.apache.geode.cache.Operation; import org.apache.geode.cache.OperationAbortedException; import org.apache.geode.cache.PartitionedRegionPartialClearException; +import org.apache.geode.cache.asyncqueue.AsyncEventQueue; +import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl; import org.apache.geode.cache.partition.PartitionRegionHelper; import org.apache.geode.distributed.internal.DistributionManager; import org.apache.geode.distributed.internal.MembershipListener; @@ -396,6 +398,15 @@ public class PartitionedRegionClear { // Force all primary buckets to be created before clear. assignAllPrimaryBuckets(); + for (AsyncEventQueue asyncEventQueue : partitionedRegion.getCache() + .getAsyncEventQueues(false)) { + if (((AsyncEventQueueImpl) asyncEventQueue).isPartitionedRegionClearUnsupported()) { + throw new UnsupportedOperationException( + "Clear is not supported on region " + partitionedRegion.getFullPath() + + " because it has a lucene index"); + } + } + // do cacheWrite if (cacheWrite) { invokeCacheWriter(regionEvent); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java index e5fa44a..1a4930e 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java @@ -143,6 +143,12 @@ public abstract class AbstractGatewaySender implements InternalGatewaySender, Di protected boolean forwardExpirationDestroy; + /** + * An attribute to specify if Partitioned region clear operation is unsupported. + * Default is false. + */ + protected boolean partitionedRegionClearUnsupported; + protected GatewayEventSubstitutionFilter substitutionFilter; protected LocatorDiscoveryCallback locatorDiscoveryCallback; @@ -291,6 +297,7 @@ public abstract class AbstractGatewaySender implements InternalGatewaySender, Di } isBucketSorted = attrs.isBucketSorted(); forwardExpirationDestroy = attrs.isForwardExpirationDestroy(); + partitionedRegionClearUnsupported = attrs.isPartitionedRegionClearUnsupported(); } public GatewaySenderAdvisor getSenderAdvisor() { @@ -388,6 +395,11 @@ public abstract class AbstractGatewaySender implements InternalGatewaySender, Di } @Override + public boolean isPartitionedRegionClearUnsupported() { + return this.partitionedRegionClearUnsupported; + } + + @Override public boolean isManualStart() { return manualStart; } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAttributes.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAttributes.java index 74f9c42..0baa896 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAttributes.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAttributes.java @@ -89,6 +89,9 @@ public class GatewaySenderAttributes { private boolean forwardExpirationDestroy = GatewaySender.DEFAULT_FORWARD_EXPIRATION_DESTROY; + public boolean partitionedRegionClearUnsupported = + GatewaySender.DEFAULT_PARTITIONED_REGION_CLEAR_UNSUPPORTED; + private boolean enforceThreadsConnectSameReceiver = GatewaySender.DEFAULT_ENFORCE_THREADS_CONNECT_SAME_RECEIVER; @@ -320,6 +323,10 @@ public class GatewaySenderAttributes { return forwardExpirationDestroy; } + public boolean isPartitionedRegionClearUnsupported() { + return this.partitionedRegionClearUnsupported; + } + public boolean getEnforceThreadsConnectSameReceiver() { return enforceThreadsConnectSameReceiver; } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/InternalGatewaySender.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/InternalGatewaySender.java index 13e36e7..d71297c 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/InternalGatewaySender.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/InternalGatewaySender.java @@ -40,6 +40,8 @@ public interface InternalGatewaySender extends GatewaySender { boolean isForwardExpirationDestroy(); + boolean isPartitionedRegionClearUnsupported(); + boolean getIsMetaQueue(); InternalCache getCache(); diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionClearTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionClearTest.java index 3b66e67..721d236 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionClearTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionClearTest.java @@ -42,6 +42,7 @@ import org.mockito.ArgumentCaptor; import org.apache.geode.CancelCriterion; import org.apache.geode.cache.PartitionedRegionPartialClearException; import org.apache.geode.cache.Region; +import org.apache.geode.cache.asyncqueue.AsyncEventQueue; import org.apache.geode.distributed.DistributedLockService; import org.apache.geode.distributed.internal.DMStats; import org.apache.geode.distributed.internal.DistributionManager; @@ -56,15 +57,18 @@ import org.apache.geode.internal.serialization.KnownVersion; public class PartitionedRegionClearTest { + private GemFireCacheImpl cache; + private HashSet<AsyncEventQueue> allAEQs = new HashSet<>(); + private PartitionedRegionClear partitionedRegionClear; private DistributionManager distributionManager; private PartitionedRegion partitionedRegion; private RegionAdvisor regionAdvisor; private InternalDistributedMember internalDistributedMember; - private PartitionedRegionClear partitionedRegionClear; - @Before public void setUp() { + + cache = mock(GemFireCacheImpl.class); distributionManager = mock(DistributionManager.class); internalDistributedMember = mock(InternalDistributedMember.class); partitionedRegion = mock(PartitionedRegion.class); @@ -73,6 +77,8 @@ public class PartitionedRegionClearTest { when(distributionManager.getDistributionManagerId()).thenReturn(internalDistributedMember); when(distributionManager.getId()).thenReturn(internalDistributedMember); when(internalDistributedMember.getVersion()).thenReturn(KnownVersion.CURRENT); + when(partitionedRegion.getCache()).thenReturn(cache); + when(cache.getAsyncEventQueues(false)).thenReturn(allAEQs); when(partitionedRegion.getDistributionManager()).thenReturn(distributionManager); when(partitionedRegion.getName()).thenReturn("prRegion"); when(partitionedRegion.getRegionAdvisor()).thenReturn(regionAdvisor); diff --git a/geode-lucene/src/distributedTest/java/org/apache/geode/cache/lucene/LuceneIndexCreationDUnitTest.java b/geode-lucene/src/distributedTest/java/org/apache/geode/cache/lucene/LuceneIndexCreationDUnitTest.java index 45baa11..4666128 100644 --- a/geode-lucene/src/distributedTest/java/org/apache/geode/cache/lucene/LuceneIndexCreationDUnitTest.java +++ b/geode-lucene/src/distributedTest/java/org/apache/geode/cache/lucene/LuceneIndexCreationDUnitTest.java @@ -28,7 +28,9 @@ import static org.apache.geode.cache.lucene.test.LuceneTestUtilities.INDEX_NAME; import static org.apache.geode.cache.lucene.test.LuceneTestUtilities.REGION_NAME; import static org.apache.geode.test.util.ResourceUtils.createTempFileFromResource; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.fail; import java.io.FileInputStream; @@ -107,6 +109,27 @@ public class LuceneIndexCreationDUnitTest extends LuceneDUnitTest { } @Test + public void verifyThrowExceptionWhenClearOnRegionWithLuceneIndex() { + SerializableRunnableIF createIndex = getFieldsIndexWithOneField(); + dataStore1.invoke(() -> { + initDataStore(createIndex, RegionTestableType.PARTITION_REDUNDANT); + Region<Object, Object> region = cache.getRegion(REGION_NAME); + assertNotNull(region); + assertThrows(UnsupportedOperationException.class, () -> region.clear()); + }); + } + + @Test + public void verifyNotThrowExceptionWhenClearOnRegionWithoutLuceneIndex() { + dataStore1.invoke(() -> { + initDataStore(RegionTestableType.PARTITION_REDUNDANT); + Region<Object, Object> region = cache.getRegion(REGION_NAME); + assertNotNull(region); + region.clear(); + }); + } + + @Test public void verifyThatEmptyListIsOutputWhenThereAreNoIndexesInTheSystem() { dataStore1.invoke(() -> verifyIndexList(0)); dataStore2.invoke(() -> verifyIndexList(0)); diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexImpl.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexImpl.java index 3a23282..be2bd58 100644 --- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexImpl.java +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexImpl.java @@ -197,6 +197,7 @@ public abstract class LuceneIndexImpl implements InternalLuceneIndex { factory.setDiskStoreName(attributes.getDiskStoreName()); factory.setDiskSynchronous(true); factory.setForwardExpirationDestroy(true); + factory.setPartitionedRegionClearUnsupported(true); return factory; }