This is an automated email from the ASF dual-hosted git repository. mhanson pushed a commit to branch feature/GEODE-7665 in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/feature/GEODE-7665 by this push: new 55550d3 GEODE-7845 blocking PR region clear if one or more server versions are too old (#5577) 55550d3 is described below commit 55550d31e2fcf63c22eef9153bdc0ac3ec4ba575 Author: mhansonp <hans...@vmware.com> AuthorDate: Tue Oct 13 10:10:25 2020 -0700 GEODE-7845 blocking PR region clear if one or more server versions are too old (#5577) - if a server is running an old version when a PR clear is invoked by the client, the client will receive a ServerOperationException with a cause of ServerVersionMismatchException. --- .../integrationTest/resources/assembly_content.txt | 1 + .../main/java/org/apache/geode/cache/Region.java | 2 + .../cache/ServerVersionMismatchException.java | 34 ++ .../geode/internal/cache/PartitionedRegion.java | 1 + .../internal/cache/PartitionedRegionClear.java | 34 +- .../sanctioned-geode-core-serializables.txt | 1 + .../internal/cache/PartitionedRegionClearTest.java | 109 ++++-- .../RollingUpgrade2DUnitTestBase.java | 4 +- ...ionRegionClearMixedServerPartitionedRegion.java | 412 +++++++++++++++++++++ 9 files changed, 571 insertions(+), 27 deletions(-) diff --git a/geode-assembly/src/integrationTest/resources/assembly_content.txt b/geode-assembly/src/integrationTest/resources/assembly_content.txt index 549150f..553785a 100644 --- a/geode-assembly/src/integrationTest/resources/assembly_content.txt +++ b/geode-assembly/src/integrationTest/resources/assembly_content.txt @@ -245,6 +245,7 @@ javadoc/org/apache/geode/cache/RoleEvent.html javadoc/org/apache/geode/cache/RoleException.html javadoc/org/apache/geode/cache/Scope.html javadoc/org/apache/geode/cache/SerializedCacheValue.html +javadoc/org/apache/geode/cache/ServerVersionMismatchException.html javadoc/org/apache/geode/cache/StatisticsDisabledException.html javadoc/org/apache/geode/cache/SubscriptionAttributes.html javadoc/org/apache/geode/cache/SynchronizationCommitConflictException.html diff --git 
a/geode-core/src/main/java/org/apache/geode/cache/Region.java b/geode-core/src/main/java/org/apache/geode/cache/Region.java index 4707a46..5162bd5 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/Region.java +++ b/geode-core/src/main/java/org/apache/geode/cache/Region.java @@ -1307,6 +1307,8 @@ public interface Region<K, V> extends ConcurrentMap<K, V> { * @throws PartitionedRegionPartialClearException when data is partially cleared on partitioned * region. It is caller responsibility to handle the partial data clear either by retrying * the clear operation or continue working with the partially cleared partitioned region. + * @throws ServerVersionMismatchException when data was not cleared because one or more + * of the member servers' version was too old to understand the clear message. */ @Override void clear(); diff --git a/geode-core/src/main/java/org/apache/geode/cache/ServerVersionMismatchException.java b/geode-core/src/main/java/org/apache/geode/cache/ServerVersionMismatchException.java new file mode 100644 index 0000000..1d4231a --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/cache/ServerVersionMismatchException.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.geode.cache; + +import java.util.List; + +/** + * Indicates a failure to perform an operation on a Partitioned Region due to + * server versions not meeting requirements. + * + * @since GEODE 1.14.0 + */ +public class ServerVersionMismatchException extends CacheRuntimeException { + private static final long serialVersionUID = -3004093739855972548L; + + public ServerVersionMismatchException(List<String> members, String featureName, + String version) { + super( + "A server's " + members + " version was too old (< " + version + ") for : " + featureName); + + } +} diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java index b9572c4..256850b 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java @@ -5328,6 +5328,7 @@ public class PartitionedRegion extends LocalRegion return this.totalNumberOfBuckets; } + @Override public void basicDestroy(final EntryEventImpl event, final boolean cacheWrite, final Object expectedOldValue) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClear.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClear.java index e8b01d8..0e5acfc 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClear.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClear.java @@ -14,8 +14,10 @@ */ package org.apache.geode.internal.cache; +import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; +import java.util.List; import java.util.Set; import org.apache.logging.log4j.Logger; @@ -25,11 +27,13 @@ import org.apache.geode.cache.CacheWriterException; import org.apache.geode.cache.Operation; import 
org.apache.geode.cache.OperationAbortedException; import org.apache.geode.cache.PartitionedRegionPartialClearException; +import org.apache.geode.cache.ServerVersionMismatchException; import org.apache.geode.cache.partition.PartitionRegionHelper; import org.apache.geode.distributed.internal.DistributionManager; import org.apache.geode.distributed.internal.MembershipListener; import org.apache.geode.distributed.internal.ReplyException; import org.apache.geode.distributed.internal.membership.InternalDistributedMember; +import org.apache.geode.internal.serialization.KnownVersion; import org.apache.geode.logging.internal.log4j.api.LogService; public class PartitionedRegionClear { @@ -289,7 +293,8 @@ public class PartitionedRegionClear { } final Set<InternalDistributedMember> configRecipients = - new HashSet<>(partitionedRegion.getRegionAdvisor().adviseAllPRNodes()); + new HashSet<>(partitionedRegion.getRegionAdvisor() + .adviseAllPRNodes()); try { final PartitionRegionConfig prConfig = @@ -310,8 +315,7 @@ public class PartitionedRegionClear { try { PartitionedRegionClearMessage.PartitionedRegionClearResponse resp = new PartitionedRegionClearMessage.PartitionedRegionClearResponse( - partitionedRegion.getSystem(), - configRecipients); + partitionedRegion.getSystem(), configRecipients); PartitionedRegionClearMessage partitionedRegionClearMessage = new PartitionedRegionClearMessage(configRecipients, partitionedRegion, resp, op, event); partitionedRegionClearMessage.send(); @@ -334,10 +338,34 @@ public class PartitionedRegionClear { return bucketsOperated; } + /** + * Throws ServerVersionMismatchException if any bucket primary is older than GEODE 1.14.0 + */ + public void allServerVersionsSupportPartitionRegionClear() { + List<String> memberNames = new ArrayList<>(); + for (int i = 0; i < partitionedRegion.getTotalNumberOfBuckets(); i++) { + InternalDistributedMember internalDistributedMember = partitionedRegion.getBucketPrimary(i); + if ((internalDistributedMember 
!= null) + && (internalDistributedMember.getVersion().isOlderThan(KnownVersion.GEODE_1_14_0))) { + if (!memberNames.contains(internalDistributedMember.getName())) { + memberNames.add(internalDistributedMember.getName()); + logger.info("MLH adding " + internalDistributedMember.getName()); + } + } + } + if (!memberNames.isEmpty()) { + throw new ServerVersionMismatchException(memberNames, "Partitioned Region Clear", + KnownVersion.GEODE_1_14_0.toString()); + } + } + + void doClear(RegionEventImpl regionEvent, boolean cacheWrite) { String lockName = CLEAR_OPERATION + partitionedRegion.getName(); long clearStartTime = 0; + allServerVersionsSupportPartitionRegionClear(); + try { // distributed lock to make sure only one clear op is in progress in the cluster. acquireDistributedClearLock(lockName); diff --git a/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt b/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt index 644fbc2..86e2372 100644 --- a/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt +++ b/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt @@ -92,6 +92,7 @@ org/apache/geode/cache/ResourceException,true,-5559328592343363268 org/apache/geode/cache/ResumptionAction,true,6632254151314915610,ordinal:byte org/apache/geode/cache/RoleException,true,-7521056108445887394 org/apache/geode/cache/Scope,true,5534399159504301602,ordinal:int +org/apache/geode/cache/ServerVersionMismatchException,true,-3004093739855972548 org/apache/geode/cache/StatisticsDisabledException,true,-2987721454129719551 org/apache/geode/cache/SynchronizationCommitConflictException,true,2619806460255259492 org/apache/geode/cache/TimeoutException,true,-6260761691185737442 diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionClearTest.java 
b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionClearTest.java index bd37d9e..bd78fd0 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionClearTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionClearTest.java @@ -37,6 +37,7 @@ import org.mockito.ArgumentCaptor; import org.apache.geode.CancelCriterion; import org.apache.geode.cache.PartitionedRegionPartialClearException; import org.apache.geode.cache.Region; +import org.apache.geode.cache.ServerVersionMismatchException; import org.apache.geode.distributed.DistributedLockService; import org.apache.geode.distributed.internal.DMStats; import org.apache.geode.distributed.internal.DistributionManager; @@ -44,6 +45,7 @@ import org.apache.geode.distributed.internal.InternalDistributedSystem; import org.apache.geode.distributed.internal.MembershipListener; import org.apache.geode.distributed.internal.membership.InternalDistributedMember; import org.apache.geode.internal.cache.partitioned.RegionAdvisor; +import org.apache.geode.internal.serialization.KnownVersion; public class PartitionedRegionClearTest { @@ -51,6 +53,8 @@ public class PartitionedRegionClearTest { private PartitionedRegionClear partitionedRegionClear; private DistributionManager distributionManager; private PartitionedRegion partitionedRegion; + private RegionAdvisor regionAdvisor; + private InternalDistributedMember internalDistributedMember; @Before public void setUp() { @@ -62,6 +66,14 @@ public class PartitionedRegionClearTest { when(partitionedRegion.getName()).thenReturn("prRegion"); partitionedRegionClear = new PartitionedRegionClear(partitionedRegion); + internalDistributedMember = mock(InternalDistributedMember.class); + when(internalDistributedMember.getVersion()).thenReturn(KnownVersion.CURRENT); + regionAdvisor = mock(RegionAdvisor.class); + when(partitionedRegion.getRegionAdvisor()).thenReturn(regionAdvisor); + 
when(regionAdvisor.getDistributionManager()).thenReturn(distributionManager); + when(distributionManager.getDistributionManagerId()).thenReturn(internalDistributedMember); + when(distributionManager.getId()).thenReturn(internalDistributedMember); + } private Set<BucketRegion> setupBucketRegions( @@ -85,7 +97,6 @@ public class PartitionedRegionClearTest { @Test public void isLockedForListenerAndClientNotificationReturnsTrueWhenLocked() { - InternalDistributedMember internalDistributedMember = mock(InternalDistributedMember.class); when(distributionManager.isCurrentMember(internalDistributedMember)).thenReturn(true); partitionedRegionClear.obtainClearLockLocal(internalDistributedMember); @@ -94,7 +105,6 @@ public class PartitionedRegionClearTest { @Test public void isLockedForListenerAndClientNotificationReturnsFalseWhenMemberNotInTheSystemRequestsLock() { - InternalDistributedMember internalDistributedMember = mock(InternalDistributedMember.class); when(distributionManager.isCurrentMember(internalDistributedMember)).thenReturn(false); assertThat(partitionedRegionClear.isLockedForListenerAndClientNotification()).isFalse(); @@ -132,8 +142,6 @@ public class PartitionedRegionClearTest { doReturn(Collections.EMPTY_SET).when(spyPartitionedRegionClear) .attemptToSendPartitionedRegionClearMessage(regionEvent, PartitionedRegionClearMessage.OperationType.OP_LOCK_FOR_PR_CLEAR); - InternalDistributedMember internalDistributedMember = mock(InternalDistributedMember.class); - when(distributionManager.getId()).thenReturn(internalDistributedMember); spyPartitionedRegionClear.obtainLockForClear(regionEvent); @@ -152,8 +160,6 @@ public class PartitionedRegionClearTest { doReturn(Collections.EMPTY_SET).when(spyPartitionedRegionClear) .attemptToSendPartitionedRegionClearMessage(regionEvent, PartitionedRegionClearMessage.OperationType.OP_UNLOCK_FOR_PR_CLEAR); - InternalDistributedMember internalDistributedMember = mock(InternalDistributedMember.class); - 
when(distributionManager.getId()).thenReturn(internalDistributedMember); spyPartitionedRegionClear.releaseLockForClear(regionEvent); @@ -172,8 +178,6 @@ public class PartitionedRegionClearTest { doReturn(Collections.EMPTY_SET).when(spyPartitionedRegionClear) .attemptToSendPartitionedRegionClearMessage(regionEvent, PartitionedRegionClearMessage.OperationType.OP_PR_CLEAR); - InternalDistributedMember internalDistributedMember = mock(InternalDistributedMember.class); - when(distributionManager.getId()).thenReturn(internalDistributedMember); spyPartitionedRegionClear.clearRegion(regionEvent); @@ -330,13 +334,12 @@ public class PartitionedRegionClearTest { PartitionedRegionDataStore partitionedRegionDataStore = mock(PartitionedRegionDataStore.class); Set<BucketRegion> buckets = setupBucketRegions(partitionedRegionDataStore, bucketAdvisor); when(partitionedRegion.getDataStore()).thenReturn(partitionedRegionDataStore); - InternalDistributedMember member = mock(InternalDistributedMember.class); - when(distributionManager.isCurrentMember(member)).thenReturn(true); + when(distributionManager.isCurrentMember(internalDistributedMember)).thenReturn(true); - partitionedRegionClear.obtainClearLockLocal(member); + partitionedRegionClear.obtainClearLockLocal(internalDistributedMember); assertThat(partitionedRegionClear.lockForListenerAndClientNotification.getLockRequester()) - .isSameAs(member); + .isSameAs(internalDistributedMember); for (BucketRegion bucketRegion : buckets) { verify(bucketRegion, times(1)).lockLocallyForClear(partitionedRegion.getDistributionManager(), partitionedRegion.getMyId(), null); @@ -350,10 +353,9 @@ public class PartitionedRegionClearTest { PartitionedRegionDataStore partitionedRegionDataStore = mock(PartitionedRegionDataStore.class); Set<BucketRegion> buckets = setupBucketRegions(partitionedRegionDataStore, bucketAdvisor); when(partitionedRegion.getDataStore()).thenReturn(partitionedRegionDataStore); - InternalDistributedMember member = 
mock(InternalDistributedMember.class); - when(distributionManager.isCurrentMember(member)).thenReturn(false); + when(distributionManager.isCurrentMember(internalDistributedMember)).thenReturn(false); - partitionedRegionClear.obtainClearLockLocal(member); + partitionedRegionClear.obtainClearLockLocal(internalDistributedMember); assertThat(partitionedRegionClear.lockForListenerAndClientNotification.getLockRequester()) .isNull(); @@ -370,9 +372,9 @@ public class PartitionedRegionClearTest { PartitionedRegionDataStore partitionedRegionDataStore = mock(PartitionedRegionDataStore.class); Set<BucketRegion> buckets = setupBucketRegions(partitionedRegionDataStore, bucketAdvisor); when(partitionedRegion.getDataStore()).thenReturn(partitionedRegionDataStore); - InternalDistributedMember member = mock(InternalDistributedMember.class); - when(distributionManager.isCurrentMember(member)).thenReturn(true); - partitionedRegionClear.lockForListenerAndClientNotification.setLocked(member); + when(distributionManager.isCurrentMember(internalDistributedMember)).thenReturn(true); + partitionedRegionClear.lockForListenerAndClientNotification + .setLocked(internalDistributedMember); partitionedRegionClear.releaseClearLockLocal(); @@ -405,13 +407,11 @@ public class PartitionedRegionClearTest { Region<String, PartitionRegionConfig> prRoot = mock(Region.class); when(partitionedRegion.getPRRoot()).thenReturn(prRoot); InternalDistributedMember member = mock(InternalDistributedMember.class); - RegionAdvisor regionAdvisor = mock(RegionAdvisor.class); Set<InternalDistributedMember> prNodes = Collections.singleton(member); Node node = mock(Node.class); when(node.getMemberId()).thenReturn(member); Set<Node> configNodes = Collections.singleton(node); when(regionAdvisor.adviseAllPRNodes()).thenReturn(prNodes); - when(partitionedRegion.getRegionAdvisor()).thenReturn(regionAdvisor); PartitionRegionConfig partitionRegionConfig = mock(PartitionRegionConfig.class); 
when(partitionRegionConfig.getNodes()).thenReturn(configNodes); when(prRoot.get(any())).thenReturn(partitionRegionConfig); @@ -423,7 +423,7 @@ public class PartitionedRegionClearTest { when(txManager.isDistributed()).thenReturn(false); when(internalCache.getTxManager()).thenReturn(txManager); when(partitionedRegion.getCache()).thenReturn(internalCache); - + when(member.getVersion()).thenReturn(KnownVersion.getCurrentVersion()); when(distributionManager.getCancelCriterion()).thenReturn(mock(CancelCriterion.class)); when(distributionManager.getStats()).thenReturn(mock(DMStats.class)); @@ -433,6 +433,8 @@ public class PartitionedRegionClearTest { verify(distributionManager, times(1)).putOutgoing(any()); } + + @Test public void doClearAcquiresAndReleasesDistributedClearLockAndCreatesAllPrimaryBuckets() { RegionEventImpl regionEvent = mock(RegionEventImpl.class); @@ -458,7 +460,6 @@ public class PartitionedRegionClearTest { doReturn(Collections.EMPTY_SET).when(spyPartitionedRegionClear).clearRegion(regionEvent); spyPartitionedRegionClear.doClear(regionEvent, cacheWrite); - verify(spyPartitionedRegionClear, times(1)).invokeCacheWriter(regionEvent); } @@ -558,6 +559,70 @@ public class PartitionedRegionClearTest { } @Test + public void doClearThrowsServerVersionMismatchException() { + boolean cacheWrite = false; + RegionEventImpl regionEvent = mock(RegionEventImpl.class); + when(partitionedRegion.hasListener()).thenReturn(false); + when(partitionedRegion.hasAnyClientsInterested()).thenReturn(false); + when(partitionedRegion.getTotalNumberOfBuckets()).thenReturn(2); + when(partitionedRegion.getName()).thenReturn("prRegion"); + PartitionedRegionClear spyPartitionedRegionClear = spy(partitionedRegionClear); + doNothing().when(spyPartitionedRegionClear).acquireDistributedClearLock(any()); + doNothing().when(spyPartitionedRegionClear).assignAllPrimaryBuckets(); + doNothing().when(spyPartitionedRegionClear).obtainLockForClear(regionEvent); + 
doNothing().when(spyPartitionedRegionClear).releaseLockForClear(regionEvent); + doReturn(Collections.singleton("2")).when(spyPartitionedRegionClear).clearRegion(regionEvent); + + when(regionEvent.clone()).thenReturn(mock(RegionEventImpl.class)); + Region<String, PartitionRegionConfig> prRoot = mock(Region.class); + when(partitionedRegion.getPRRoot()).thenReturn(prRoot); + InternalDistributedMember member = mock(InternalDistributedMember.class); + InternalDistributedMember oldMember = mock(InternalDistributedMember.class); + Set<InternalDistributedMember> prNodes = new HashSet<>(); + prNodes.add(member); + prNodes.add(oldMember); + Node node = mock(Node.class); + Node oldNode = mock(Node.class); + when(member.getName()).thenReturn("member"); + when(oldMember.getName()).thenReturn("oldMember"); + when(node.getMemberId()).thenReturn(member); + when(oldNode.getMemberId()).thenReturn(oldMember); + Set<Node> configNodes = new HashSet<>(); + configNodes.add(node); + configNodes.add(oldNode); + when(partitionedRegion.getBucketPrimary(0)).thenReturn(member); + when(partitionedRegion.getBucketPrimary(1)).thenReturn(oldMember); + + when(regionAdvisor.adviseAllPRNodes()).thenReturn(prNodes); + PartitionRegionConfig partitionRegionConfig = mock(PartitionRegionConfig.class); + when(partitionRegionConfig.getNodes()).thenReturn(configNodes); + when(prRoot.get(any())).thenReturn(partitionRegionConfig); + InternalDistributedSystem internalDistributedSystem = mock(InternalDistributedSystem.class); + when(internalDistributedSystem.getDistributionManager()).thenReturn(distributionManager); + when(partitionedRegion.getSystem()).thenReturn(internalDistributedSystem); + InternalCache internalCache = mock(InternalCache.class); + TXManagerImpl txManager = mock(TXManagerImpl.class); + when(txManager.isDistributed()).thenReturn(false); + when(internalCache.getTxManager()).thenReturn(txManager); + when(partitionedRegion.getCache()).thenReturn(internalCache); + 
when(oldMember.getVersion()).thenReturn(KnownVersion.GEODE_1_11_0); + when(member.getVersion()).thenReturn(KnownVersion.getCurrentVersion()); + when(distributionManager.getCancelCriterion()).thenReturn(mock(CancelCriterion.class)); + when(distributionManager.getStats()).thenReturn(mock(DMStats.class)); + + + Throwable thrown = + catchThrowable(() -> spyPartitionedRegionClear.doClear(regionEvent, cacheWrite)); + + assertThat(thrown) + .isInstanceOf(ServerVersionMismatchException.class) + .hasMessage( + "A server's [oldMember] version was too old (< GEODE 1.14.0) for : Partitioned Region Clear"); + } + + + + @Test public void handleClearFromDepartedMemberReleasesTheLockForRequesterDeparture() { InternalDistributedMember member = mock(InternalDistributedMember.class); partitionedRegionClear.lockForListenerAndClientNotification.setLocked(member); diff --git a/geode-core/src/upgradeTest/java/org/apache/geode/internal/cache/rollingupgrade/RollingUpgrade2DUnitTestBase.java b/geode-core/src/upgradeTest/java/org/apache/geode/internal/cache/rollingupgrade/RollingUpgrade2DUnitTestBase.java index 293bc69..6181e56 100755 --- a/geode-core/src/upgradeTest/java/org/apache/geode/internal/cache/rollingupgrade/RollingUpgrade2DUnitTestBase.java +++ b/geode-core/src/upgradeTest/java/org/apache/geode/internal/cache/rollingupgrade/RollingUpgrade2DUnitTestBase.java @@ -987,7 +987,7 @@ public abstract class RollingUpgrade2DUnitTestBase extends JUnit4DistributedTest return clientCache; } - private static boolean assertRegionExists(GemFireCache cache, String regionName) { + protected static boolean assertRegionExists(GemFireCache cache, String regionName) { Region region = cache.getRegion(regionName); if (region == null) { throw new Error("Region: " + regionName + " does not exist"); @@ -995,7 +995,7 @@ public abstract class RollingUpgrade2DUnitTestBase extends JUnit4DistributedTest return true; } - private static Region getRegion(GemFireCache cache, String regionName) { + protected static 
Region getRegion(GemFireCache cache, String regionName) { return cache.getRegion(regionName); } diff --git a/geode-core/src/upgradeTest/java/org/apache/geode/internal/cache/rollingupgrade/RollingUpgradePartitionRegionClearMixedServerPartitionedRegion.java b/geode-core/src/upgradeTest/java/org/apache/geode/internal/cache/rollingupgrade/RollingUpgradePartitionRegionClearMixedServerPartitionedRegion.java new file mode 100644 index 0000000..bfcd651 --- /dev/null +++ b/geode-core/src/upgradeTest/java/org/apache/geode/internal/cache/rollingupgrade/RollingUpgradePartitionRegionClearMixedServerPartitionedRegion.java @@ -0,0 +1,412 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.geode.internal.cache.rollingupgrade; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.catchThrowable; + +import java.io.File; +import java.lang.reflect.Constructor; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Properties; + +import org.apache.commons.io.FileUtils; +import org.apache.logging.log4j.Logger; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; +import org.junit.runners.Parameterized.UseParametersRunnerFactory; + +import org.apache.geode.cache.Cache; +import org.apache.geode.cache.CacheFactory; +import org.apache.geode.cache.GemFireCache; +import org.apache.geode.cache.Region; +import org.apache.geode.cache.RegionFactory; +import org.apache.geode.cache.RegionShortcut; +import org.apache.geode.cache.ServerVersionMismatchException; +import org.apache.geode.cache.client.ClientCache; +import org.apache.geode.cache.client.ClientCacheFactory; +import org.apache.geode.cache.client.ClientRegionShortcut; +import org.apache.geode.cache.client.ServerOperationException; +import org.apache.geode.cache.server.CacheServer; +import org.apache.geode.distributed.DistributedSystem; +import org.apache.geode.distributed.internal.DistributionConfig; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; +import org.apache.geode.internal.AvailablePortHelper; +import org.apache.geode.internal.cache.GemFireCacheImpl; +import org.apache.geode.internal.cache.PartitionedRegion; +import org.apache.geode.logging.internal.log4j.api.LogService; +import org.apache.geode.test.dunit.DistributedTestUtils; +import org.apache.geode.test.dunit.Host; +import org.apache.geode.test.dunit.IgnoredException; +import org.apache.geode.test.dunit.Invoke; +import 
org.apache.geode.test.dunit.NetworkUtils; +import org.apache.geode.test.dunit.VM; +import org.apache.geode.test.dunit.internal.DUnitLauncher; +import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase; +import org.apache.geode.test.junit.runners.CategoryWithParameterizedRunnerFactory; +import org.apache.geode.test.version.VersionManager; + +@RunWith(Parameterized.class) +@UseParametersRunnerFactory(CategoryWithParameterizedRunnerFactory.class) +public class RollingUpgradePartitionRegionClearMixedServerPartitionedRegion + extends JUnit4DistributedTestCase { + + protected static final Logger logger = LogService.getLogger(); + protected static GemFireCache cache; + protected static ClientCache clientcache; + + @Parameter + public String oldVersion; + + @Parameters(name = "from_v{0}") + public static Collection<String> data() { + List<String> result = VersionManager.getInstance().getVersionsWithoutCurrent(); + if (result.size() < 1) { + throw new RuntimeException("No older versions of Geode were found to test against"); + } else { + System.out.println("running against these versions: " + result); + } + return result; + } + + @Test + public void testPutAndGetMixedServerPartitionedRegion() throws Exception { + doTestPutAndGetMixedServers(oldVersion); + } + + /** + * This test starts up multiple servers from the current code base and multiple servers from the + * old version and executes puts and gets on a new server and old server and verifies that the + * results are present. 
Note that the puts have overlapping region keys just to test new puts and + * replaces + */ + void doTestPutAndGetMixedServers(String oldVersion) + throws Exception { + VM currentServer1 = VM.getVM(VersionManager.CURRENT_VERSION, 0); + VM oldServerAndLocator = VM.getVM(oldVersion, 1); + VM currentServer2 = VM.getVM(VersionManager.CURRENT_VERSION, 2); + VM oldServer2 = VM.getVM(oldVersion, 3); + + String regionName = "aRegion"; + + final String serverHostName = NetworkUtils.getServerHostName(); + final int port = AvailablePortHelper.getRandomAvailableTCPPort(); + oldServerAndLocator.invoke(() -> DistributedTestUtils.deleteLocatorStateFile(port)); + try { + final Properties props = getSystemProperties(); + props.remove(DistributionConfig.LOCATORS_NAME); + + // Fire up the locator and server + oldServerAndLocator.invoke(() -> { + props.put(DistributionConfig.START_LOCATOR_NAME, + "" + serverHostName + "[" + port + "]"); + props.put(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false"); + cache = createCache(props); + Thread.sleep(5000); // bug in 1.0 - cluster config service not immediately available + }); + + props.put(DistributionConfig.LOCATORS_NAME, serverHostName + "[" + port + "]"); + + // create the cache in all the server VMs. + for (VM vm : Arrays.asList(oldServer2, currentServer1, currentServer2)) { + vm.invoke(() -> { + cache = createCache(props); + }); + } + // spin up current version servers + for (VM vm : Arrays.asList(currentServer1, currentServer2)) { + vm.invoke( + () -> assertVersion(cache, VersionManager.getInstance().getCurrentVersionOrdinal())); + } + + // create region + for (VM vm : Arrays.asList(currentServer1, currentServer2, oldServerAndLocator, oldServer2)) { + vm.invoke(() -> createRegion(cache, regionName)); + } + + // put some data in the region to make sure there is something to clear. 
+ putDataSerializableAndVerify(currentServer1, regionName, currentServer2, oldServerAndLocator, + oldServer2); + + // invoke Partition Region Clear and verify we didn't touch the old servers. + + currentServer1.invoke(() -> { + assertRegionExists(cache, regionName); + PartitionedRegion region = (PartitionedRegion) cache.getRegion(regionName); + + Throwable thrown = catchThrowable(region::clear); + assertThat(thrown).isInstanceOf(ServerVersionMismatchException.class); + + }); + } finally { + for (VM vm : Arrays.asList(currentServer1, currentServer2, oldServerAndLocator, oldServer2)) { + vm.invoke( + () -> closeCache(RollingUpgradePartitionRegionClearMixedServerPartitionedRegion.cache)); + } + } + } + + @Test + public void TestClientServerGetsUnsupportedExceptionWhenPRClearInvoked() throws Exception { + doTestClientServerGetsUnsupportedExceptionWhenPRClearInvoked(oldVersion); + } + + void doTestClientServerGetsUnsupportedExceptionWhenPRClearInvoked(String oldVersion) + throws Exception { + + VM client = VM.getVM(VersionManager.CURRENT_VERSION, 0); + VM locator = VM.getVM(VersionManager.CURRENT_VERSION, 1); + VM currentServer = VM.getVM(VersionManager.CURRENT_VERSION, 2); + VM oldServer2 = VM.getVM(oldVersion, 3); + + for (VM vm : Arrays.asList(locator, currentServer, client)) { + vm.invoke(() -> System.setProperty("gemfire.allow_old_members_to_join_for_testing", "true")); + } + + String regionName = "aRegion"; + + final String serverHostName = NetworkUtils.getServerHostName(); + final int port = AvailablePortHelper.getRandomAvailableTCPPort(); + locator.invoke(() -> DistributedTestUtils.deleteLocatorStateFile(port)); + try { + final Properties props = getSystemProperties(); + props.remove(DistributionConfig.LOCATORS_NAME); + + // Fire up the locator and server + locator.invoke(() -> { + props.put(DistributionConfig.START_LOCATOR_NAME, + "" + serverHostName + "[" + port + "]"); + props.put(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false"); + cache = 
createCache(props); + }); + + props.put(DistributionConfig.LOCATORS_NAME, serverHostName + "[" + port + "]"); + + // create the cache in all the server VMs. + for (VM vm : Arrays.asList(oldServer2, currentServer)) { + vm.invoke(() -> { + props.setProperty(DistributionConfig.NAME_NAME, "vm" + VM.getVMId()); + cache = createCache(props); + }); + } + int[] ports = AvailablePortHelper.getRandomAvailableTCPPorts(2); + + oldServer2.invoke(() -> startCacheServer(cache, ports[0])); + currentServer.invoke(() -> startCacheServer(cache, ports[1])); + + // create region + for (VM vm : Arrays.asList(currentServer, locator, oldServer2)) { + vm.invoke(() -> createRegion(cache, regionName)); + } + + // put some data in the region to make sure there is something to clear. + putDataSerializableAndVerify(currentServer, regionName, locator, oldServer2); + + // invoke Partition Region Clear from the client and verify the exception. + client.invoke(() -> { + clientcache = new ClientCacheFactory().addPoolServer(serverHostName, ports[1]).create(); + Region<Object, Object> clientRegion = clientcache.createClientRegionFactory( + ClientRegionShortcut.PROXY).create(regionName); + + clientRegion.put("key", "value"); + + Throwable thrown = catchThrowable(clientRegion::clear); + assertThat(thrown).isInstanceOf(ServerOperationException.class); + assertThat(thrown).hasCauseInstanceOf(ServerVersionMismatchException.class); + ServerVersionMismatchException serverVersionMismatchException = + (ServerVersionMismatchException) thrown.getCause(); + assertThat(serverVersionMismatchException.getMessage()).contains("vm3"); + }); + + } finally { + + for (VM vm : Arrays.asList(currentServer, locator, oldServer2)) { + vm.invoke(() -> closeCache(cache)); + } + + client.invoke(() -> { + if (cache != null && !clientcache.isClosed()) { + clientcache.close(false); + } + }); + } + } + + private String getLocatorString(int locatorPort) { + return getDUnitLocatorAddress() + "[" + locatorPort + "]"; + } + + public 
String getLocatorString(int[] locatorPorts) { + StringBuilder locatorString = new StringBuilder(); + int numLocators = locatorPorts.length; + for (int i = 0; i < numLocators; i++) { + locatorString.append(getLocatorString(locatorPorts[i])); + if (i + 1 < numLocators) { + locatorString.append(","); + } + } + return locatorString.toString(); + } + + private Cache createCache(Properties systemProperties) { + systemProperties.setProperty(DistributionConfig.USE_CLUSTER_CONFIGURATION_NAME, "false"); + if (VersionManager.getInstance().getCurrentVersionOrdinal() < 75) { + systemProperties.remove("validate-serializable-objects"); + systemProperties.remove("serializable-object-filter"); + } + CacheFactory cf = new CacheFactory(systemProperties); + return cf.create(); + } + + private void startCacheServer(GemFireCache cache, int port) throws Exception { + CacheServer cacheServer = ((GemFireCacheImpl) cache).addCacheServer(); + cacheServer.setPort(port); + cacheServer.start(); + } + + protected void assertRegionExists(GemFireCache cache, String regionName) { + Region<Object, Object> region = cache.getRegion(regionName); + if (region == null) { + throw new Error("Region: " + regionName + " does not exist"); + } + } + + private void assertEntryExists(GemFireCache cache, String regionName) { + assertRegionExists(cache, regionName); + Region<Object, Object> region = cache.getRegion(regionName); + for (int i = 0; i < 10; i++) { + String key = "" + i; + Object regionValue = region.get(key); + assertThat(regionValue).describedAs("Entry for key:" + key + " does not exist").isNotNull(); + } + } + + public void put(GemFireCache cache, String regionName, Object key, Object value) { + Region<Object, Object> region = cache.getRegion(regionName); + System.out.println(regionName + ".put(" + key + "," + value + ")"); + Object result = region.put(key, value); + System.out.println("returned " + result); + } + + private void createRegion(GemFireCache cache, String regionName) { + 
RegionFactory<Object, Object> rf = ((GemFireCacheImpl) cache).createRegionFactory( + RegionShortcut.PARTITION); + System.out.println("created region " + rf.create(regionName)); + } + + void assertVersion(GemFireCache cache, short ordinal) { + DistributedSystem system = cache.getDistributedSystem(); + int thisOrdinal = + ((InternalDistributedMember) system.getDistributedMember()).getVersion() + .ordinal(); + if (ordinal != thisOrdinal) { + throw new Error( + "Version ordinal:" + thisOrdinal + " was not the expected ordinal of:" + ordinal); + } + } + + private void closeCache(GemFireCache cache) { + if (cache == null) { + return; + } + boolean cacheClosed = cache.isClosed(); + if (!cacheClosed) { + List<CacheServer> servers = ((Cache) cache).getCacheServers(); + for (CacheServer server : servers) { + server.stop(); + } + cache.close(); + } + } + + /** + * Get the port that the standard dunit locator is listening on. + * + */ + private String getDUnitLocatorAddress() { + return Host.getHost(0).getHostName(); + } + + private void deleteVMFiles() { + System.out.println("deleting files in vm" + VM.getVMId()); + File pwd = new File("."); + for (File entry : pwd.listFiles()) { + try { + if (entry.isDirectory()) { + FileUtils.deleteDirectory(entry); + } else { + if (!entry.delete()) { + System.out.println("Could not delete " + entry); + } + } + } catch (Exception e) { + System.out.println("Could not delete " + entry + ": " + e.getMessage()); + } + } + } + + @Override + public void postSetUp() { + Invoke.invokeInEveryVM("delete files", this::deleteVMFiles); + IgnoredException.addIgnoredException( + "cluster configuration service not available|ConflictingPersistentDataException"); + } + + + void putDataSerializableAndVerify(VM putter, String regionName, + VM... 
vms) throws Exception { + for (int i = 0; i < 10; i++) { + Class aClass = Thread.currentThread().getContextClassLoader() + .loadClass("org.apache.geode.cache.ExpirationAttributes"); + Constructor constructor = aClass.getConstructor(int.class); + Object testDataSerializable = constructor.newInstance(i); + int finalI = i; + putter.invoke(() -> put(cache, regionName, "" + finalI, testDataSerializable)); + } + + // verify present in others + for (VM vm : vms) { + vm.invoke(() -> assertEntryExists(cache, regionName)); + } + } + + public Properties getSystemProperties() { + Properties props = DistributedTestUtils.getAllDistributedSystemProperties(new Properties()); + props.remove("disable-auto-reconnect"); + props.put(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "true"); + props.put(DistributionConfig.USE_CLUSTER_CONFIGURATION_NAME, "false"); + props.remove(DistributionConfig.LOAD_CLUSTER_CONFIG_FROM_DIR_NAME); + props.remove(DistributionConfig.OFF_HEAP_MEMORY_SIZE_NAME); + props.remove(DistributionConfig.LOCK_MEMORY_NAME); + return props; + } + + public Properties getSystemProperties(int[] locatorPorts) { + Properties props = new Properties(); + String locatorString = getLocatorString(locatorPorts); + props.setProperty("locators", locatorString); + props.setProperty("mcast-port", "0"); + props.put(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "true"); + props.put(DistributionConfig.USE_CLUSTER_CONFIGURATION_NAME, "false"); + props.remove(DistributionConfig.LOAD_CLUSTER_CONFIG_FROM_DIR_NAME); + props.setProperty(DistributionConfig.LOG_LEVEL_NAME, DUnitLauncher.logLevel); + return props; + } +}