This is an automated email from the ASF dual-hosted git repository. bschuchardt pushed a commit to branch feature/GEODE-7196 in repository https://gitbox.apache.org/repos/asf/geode.git
commit ca27333daf17fd752cbe052994a0667baad94aa9 Author: Bruce Schuchardt <bschucha...@pivotal.io> AuthorDate: Thu Sep 12 12:49:51 2019 -0700 remove membership collections from ClusterDistributionManager this also makes GMSMembershipManager's latestView immutable and removes the latestViewId instance variable, which is left over from very old GemFire code --- .../geode/management/CacheManagementDUnitTest.java | 10 +- .../geode/management/DLockManagementDUnitTest.java | 6 +- .../geode/management/DiskManagementDUnitTest.java | 3 +- .../geode/management/ManagementTestRule.java | 10 +- .../stats/DistributedSystemStatsDUnitTest.java | 4 +- .../adapter/GMSMembershipManagerJUnitTest.java | 5 +- .../internal/ClusterDistributionManager.java | 293 ++++----------------- .../distributed/internal/DistributionManager.java | 14 +- .../internal/InternalDistributedSystem.java | 2 +- .../internal/LonerDistributionManager.java | 21 +- .../distributed/internal/ReplyProcessor21.java | 12 +- .../distributed/internal/SerialAckedMessage.java | 4 +- .../distributed/internal/direct/DirectChannel.java | 3 +- .../distributed/internal/locks/DLockGrantor.java | 2 +- .../locks/DLockRecoverGrantorProcessor.java | 5 +- .../internal/membership/MembershipView.java | 19 +- .../membership/adapter/GMSMembershipManager.java | 52 ++-- .../internal/membership/gms/api/Membership.java | 3 + .../org/apache/geode/internal/SystemAdmin.java | 2 +- .../admin/remote/RemoteGfManagerAgent.java | 4 +- .../internal/cache/InitialImageFlowControl.java | 4 +- .../internal/cache/PRHARedundancyProvider.java | 6 +- .../cache/PartitionedRegionBucketMgmtHelper.java | 3 +- .../internal/cache/PartitionedRegionHelper.java | 2 +- .../geode/internal/cache/StateFlushOperation.java | 3 +- .../geode/internal/cache/TXFarSideCMTracker.java | 2 +- .../internal/cache/UpdateAttributesProcessor.java | 4 +- .../geode/internal/cache/backup/BackupService.java | 2 +- .../cache/execute/MemberFunctionExecutor.java | 5 +- .../rebalance/ExplicitMoveDirector.java | 4 +- .../cache/persistence/PersistentMemberManager.java | 2 +- .../internal/cache/backup/BackupServiceTest.java | 3 +- 32 files changed, 177 insertions(+), 337 deletions(-) diff --git a/geode-core/src/distributedTest/java/org/apache/geode/management/CacheManagementDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/management/CacheManagementDUnitTest.java index 056768b..5610b64 100644 --- a/geode-core/src/distributedTest/java/org/apache/geode/management/CacheManagementDUnitTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/management/CacheManagementDUnitTest.java @@ -304,7 +304,7 @@ public class CacheManagementDUnitTest implements Serializable { private void verifyQueryMBeans(final VM managerVM) { managerVM.invoke("validateQueryMBeans", () -> { SystemManagementService service = this.managementTestRule.getSystemManagementService(); - Set<DistributedMember> otherMembers = this.managementTestRule.getOtherNormalMembers(); + List<DistributedMember> otherMembers = this.managementTestRule.getOtherNormalMembers(); Set<ObjectName> superSet = new HashSet<>(); for (DistributedMember member : otherMembers) { @@ -329,7 +329,7 @@ public class CacheManagementDUnitTest implements Serializable { private void verifyGetMBeanInstance(final VM managerVM) { managerVM.invoke("verifyGetMBeanInstance", () -> { SystemManagementService service = this.managementTestRule.getSystemManagementService(); - Set<DistributedMember> otherMembers = this.managementTestRule.getOtherNormalMembers(); + List<DistributedMember> otherMembers = this.managementTestRule.getOtherNormalMembers(); for (DistributedMember member : otherMembers) { ObjectName memberMBeanName = service.getMemberMBeanName(member); @@ -381,7 +381,7 @@ public class CacheManagementDUnitTest implements Serializable { assertThat(service.getLocalManager().getFederationSheduler().isShutdown()).isFalse(); // Check for Proxies - Set<DistributedMember> otherMembers = this.managementTestRule.getOtherNormalMembers(); + List<DistributedMember> otherMembers = this.managementTestRule.getOtherNormalMembers(); assertThat(otherMembers).hasSize(otherMembersCount); for (DistributedMember member : otherMembers) { @@ -416,7 +416,7 @@ public class CacheManagementDUnitTest implements Serializable { */ private void verifyConfigDataRemote(final Map<DistributedMember, DistributionConfig> configMap) { SystemManagementService service = this.managementTestRule.getSystemManagementService(); - Set<DistributedMember> otherMembers = this.managementTestRule.getOtherNormalMembers(); + List<DistributedMember> otherMembers = this.managementTestRule.getOtherNormalMembers(); for (DistributedMember member : otherMembers) { MemberMXBean memberMXBean = MXBeanAwaitility.awaitMemberMXBeanProxy(member, service); @@ -582,7 +582,7 @@ public class CacheManagementDUnitTest implements Serializable { private void invokeRemoteMemberMXBeanOps() { SystemManagementService service = this.managementTestRule.getSystemManagementService(); - Set<DistributedMember> otherMembers = this.managementTestRule.getOtherNormalMembers(); + List<DistributedMember> otherMembers = this.managementTestRule.getOtherNormalMembers(); for (DistributedMember member : otherMembers) { MemberMXBean memberMXBean = MXBeanAwaitility.awaitMemberMXBeanProxy(member, service); diff --git a/geode-core/src/distributedTest/java/org/apache/geode/management/DLockManagementDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/management/DLockManagementDUnitTest.java index b41f15f..de25420 100644 --- a/geode-core/src/distributedTest/java/org/apache/geode/management/DLockManagementDUnitTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/management/DLockManagementDUnitTest.java @@ -20,8 +20,8 @@ import static org.apache.geode.management.internal.MBeanJMXAdapter.getLockServic import static org.assertj.core.api.Assertions.assertThat; import java.io.Serializable; +import java.util.List; import java.util.Map; -import java.util.Set; import javax.management.ObjectName; @@ -96,7 +96,7 @@ public class DLockManagementDUnitTest implements Serializable { private void verifyProxyCleanupInManager(final VM managerVM) { managerVM.invoke("verifyProxyCleanupInManager", () -> { - Set<DistributedMember> otherMembers = this.managementTestRule.getOtherNormalMembers(); + List<DistributedMember> otherMembers = this.managementTestRule.getOtherNormalMembers(); SystemManagementService service = this.managementTestRule.getSystemManagementService(); for (final DistributedMember member : otherMembers) { @@ -183,7 +183,7 @@ public class DLockManagementDUnitTest implements Serializable { */ private void verifyLockServiceMXBeanInManager(final VM managerVM) throws Exception { managerVM.invoke("verifyLockServiceMXBeanInManager", () -> { - Set<DistributedMember> otherMembers = this.managementTestRule.getOtherNormalMembers(); + List<DistributedMember> otherMembers = this.managementTestRule.getOtherNormalMembers(); for (DistributedMember member : otherMembers) { LockServiceMXBean lockServiceMXBean = diff --git a/geode-core/src/distributedTest/java/org/apache/geode/management/DiskManagementDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/management/DiskManagementDUnitTest.java index 4b7ea10..9e8371b 100644 --- a/geode-core/src/distributedTest/java/org/apache/geode/management/DiskManagementDUnitTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/management/DiskManagementDUnitTest.java @@ -20,6 +20,7 @@ import static org.assertj.core.api.Assertions.assertThat; import java.io.File; import java.io.Serializable; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutionException; @@ -300,7 +301,7 @@ public class DiskManagementDUnitTest implements Serializable { */ private void compactDiskStoresRemote(final VM managerVM, final int memberCount) { managerVM.invoke("compactDiskStoresRemote", () -> { - Set<DistributedMember> otherMemberSet = this.managementTestRule.getOtherNormalMembers(); + List<DistributedMember> otherMemberSet = this.managementTestRule.getOtherNormalMembers(); assertThat(otherMemberSet.size()).isEqualTo(memberCount); SystemManagementService service = this.managementTestRule.getSystemManagementService(); diff --git a/geode-core/src/distributedTest/java/org/apache/geode/management/ManagementTestRule.java b/geode-core/src/distributedTest/java/org/apache/geode/management/ManagementTestRule.java index e40aac2..3925356 100644 --- a/geode-core/src/distributedTest/java/org/apache/geode/management/ManagementTestRule.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/management/ManagementTestRule.java @@ -27,9 +27,9 @@ import static org.assertj.core.api.Assertions.assertThat; import java.io.Serializable; import java.lang.annotation.Annotation; import java.lang.reflect.Field; -import java.util.HashSet; +import java.util.ArrayList; +import java.util.List; import java.util.Properties; -import java.util.Set; import org.junit.rules.MethodRule; import org.junit.runners.model.FrameworkMethod; @@ -234,13 +234,13 @@ public class ManagementTestRule implements MethodRule, Serializable { managerVM.invoke("stopManager", () -> stopManager()); } - public Set<DistributedMember> getOtherNormalMembers() { - Set<DistributedMember> allMembers = new HashSet<>(getAllNormalMembers()); + public List<DistributedMember> getOtherNormalMembers() { + List<DistributedMember> allMembers = new ArrayList<>(getAllNormalMembers()); allMembers.remove(getDistributedMember()); return allMembers; } - private Set<InternalDistributedMember> getAllNormalMembers() { + private List<InternalDistributedMember> getAllNormalMembers() { return getDistributionManager().getNormalDistributionManagerIds(); // excludes LOCATOR_DM_TYPE } diff --git a/geode-core/src/distributedTest/java/org/apache/geode/management/bean/stats/DistributedSystemStatsDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/management/bean/stats/DistributedSystemStatsDUnitTest.java index 7e2445d..0f9992d 100644 --- a/geode-core/src/distributedTest/java/org/apache/geode/management/bean/stats/DistributedSystemStatsDUnitTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/management/bean/stats/DistributedSystemStatsDUnitTest.java @@ -20,7 +20,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import java.io.Serializable; -import java.util.Set; +import java.util.List; import javax.management.ObjectName; @@ -59,7 +59,7 @@ public class DistributedSystemStatsDUnitTest implements Serializable { // next block awaits all memberMXBeanName to refresh (getLastUpdateTime) SystemManagementService service = this.managementTestRule.getSystemManagementService(); - Set<DistributedMember> otherMemberSet = this.managementTestRule.getOtherNormalMembers(); + List<DistributedMember> otherMemberSet = this.managementTestRule.getOtherNormalMembers(); assertEquals(3, otherMemberSet.size()); for (DistributedMember member : otherMemberSet) { MemberMXBean memberMXBean = awaitMemberMXBeanProxy(member); diff --git a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/adapter/GMSMembershipManagerJUnitTest.java b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/adapter/GMSMembershipManagerJUnitTest.java index 58619fa..97a7dee 100644 --- a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/adapter/GMSMembershipManagerJUnitTest.java +++ b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/adapter/GMSMembershipManagerJUnitTest.java @@ -41,7 +41,6 @@ import static org.mockito.Mockito.when; import java.util.ArrayList; import java.util.Arrays; import java.util.Date; -import java.util.HashSet; import java.util.List; import java.util.Properties; import java.util.Random; @@ -464,10 +463,10 @@ public class GMSMembershipManagerJUnitTest { when(dm.getCancelCriterion()).thenReturn(stopper); when(dm.getMembershipManager()).thenReturn(manager); when(dm.getViewMembers()).thenReturn(members); - when(dm.getDistributionManagerIds()).thenReturn(new HashSet(members)); + when(dm.getDistributionManagerIds()).thenReturn(members); when(dm.addMembershipListenerAndGetDistributionManagerIds(any( org.apache.geode.distributed.internal.MembershipListener.class))) - .thenReturn(new HashSet(members)); + .thenReturn(members); manager.getGMSManager().start(); manager.getGMSManager().started(); diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterDistributionManager.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterDistributionManager.java index a288af5..d41f103 100644 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterDistributionManager.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterDistributionManager.java @@ -25,6 +25,7 @@ import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Objects; @@ -34,6 +35,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.Semaphore; +import java.util.stream.Collectors; import org.apache.logging.log4j.Logger; @@ -199,18 +201,6 @@ public class ClusterDistributionManager implements DistributionManager { private MembershipManager membershipManager; /** - * The (non-admin-only) members of the distributed system. This is a map of memberid->memberid for - * fast access to canonical ID references. All accesses to this field must be synchronized on - * {@link #membersLock}. - */ - private Map<InternalDistributedMember, InternalDistributedMember> members = - Collections.emptyMap(); - /** - * All (admin and non-admin) members of the distributed system. All accesses to this field must be - * synchronized on {@link #membersLock}. - */ - private Set<InternalDistributedMember> membersAndAdmin = Collections.emptySet(); - /** * Map of all locator members of the distributed system. The value is a collection of locator * strings that are hosted in that member. All accesses to this field must be synchronized on * {@link #membersLock}. @@ -226,25 +216,6 @@ public class ClusterDistributionManager implements DistributionManager { private Map<InternalDistributedMember, Collection<String>> hostedLocatorsWithSharedConfiguration = Collections.emptyMap(); - /** - * The lock held while accessing the field references to the following:<br> - * 1) {@link #members}<br> - * 2) {@link #membersAndAdmin}<br> - * 3) {@link #hostedLocatorsAll}<br> - * 4) {@link #hostedLocatorsWithSharedConfiguration}<br> - */ - private final Object membersLock = new Object(); - - /** - * The lock held while writing {@link #adminConsoles}. - */ - private final Object adminConsolesLock = new Object(); - /** - * The ids of all known admin consoles Uses Copy on Write. Writers must sync on adminConsolesLock. - * Readers don't need to sync. - */ - private volatile Set<InternalDistributedMember> adminConsoles = Collections.emptySet(); - /** a map keyed on InternalDistributedMember, to direct channels to other systems */ // protected final Map channelMap = CFactory.createCM(); @@ -305,6 +276,8 @@ public class ClusterDistributionManager implements DistributionManager { private final AlertingService alertingService; + private Object membersLock = new Object(); + ////////////////////// Static Methods ////////////////////// /** @@ -857,7 +830,7 @@ public class ClusterDistributionManager implements DistributionManager { @Override public DistributedMember getMemberWithName(String name) { - for (DistributedMember id : members.values()) { + for (DistributedMember id : getViewMembers()) { if (Objects.equals(id.getName(), name)) { return id; } @@ -881,12 +854,8 @@ public class ClusterDistributionManager implements DistributionManager { * distribution managers. */ @Override - public Set<InternalDistributedMember> getDistributionManagerIds() { - // access to members synchronized under membersLock in order to - // ensure serialization - synchronized (membersLock) { - return members.keySet(); - } + public List<InternalDistributedMember> getDistributionManagerIds() { + return getViewMembers(); } /** @@ -1013,12 +982,8 @@ public class ClusterDistributionManager implements DistributionManager { * distribution managers. */ @Override - public Set<InternalDistributedMember> getDistributionManagerIdsIncludingAdmin() { - // access to members synchronized under membersLock in order to - // ensure serialization - synchronized (membersLock) { - return membersAndAdmin; - } + public List<InternalDistributedMember> getDistributionManagerIdsIncludingAdmin() { + return getViewMembers(); } @@ -1055,18 +1020,15 @@ public class ClusterDistributionManager implements DistributionManager { @Override public InternalDistributedMember getCanonicalId(DistributedMember id) { // the members set is copy-on-write, so it is safe to iterate over it - InternalDistributedMember result = members.get(id); - if (result == null) { - return (InternalDistributedMember) id; - } - return result; + return membershipManager.getView().getCanonicalID( + (InternalDistributedMember) id); } /** * Add a membership listener and return other DistributionManagerIds as an atomic operation */ @Override - public Set<InternalDistributedMember> addMembershipListenerAndGetDistributionManagerIds( + public List<InternalDistributedMember> addMembershipListenerAndGetDistributionManagerIds( MembershipListener l) { // switched sync order to fix bug 30360 synchronized (membersLock) { @@ -1076,7 +1038,7 @@ public class ClusterDistributionManager implements DistributionManager { addMembershipListener(l); // Note it is ok to return the members set // because we will never modify the returned set. - return members.keySet(); + return getViewMembers(); } } @@ -1545,12 +1507,7 @@ public class ClusterDistributionManager implements DistributionManager { @Override public void addAdminConsole(InternalDistributedMember theId) { - logger.info("New administration member detected at {}.", theId); - synchronized (adminConsolesLock) { - HashSet<InternalDistributedMember> tmp = new HashSet<>(adminConsoles); - tmp.add(theId); - adminConsoles = Collections.unmodifiableSet(tmp); - } + // no-op: new members are added to the membership manager's view } @Override @@ -1589,20 +1546,15 @@ public class ClusterDistributionManager implements DistributionManager { } @Override - public Set<InternalDistributedMember> addAllMembershipListenerAndGetAllIds(MembershipListener l) { - MembershipManager mgr = membershipManager; - mgr.getViewLock().writeLock().lock(); - try { - synchronized (membersLock) { - // Don't let the members come and go while we are adding this - // listener. This ensures that the listener (probably a - // ReplyProcessor) gets a consistent view of the members. - addAllMembershipListener(l); - return getDistributionManagerIdsIncludingAdmin(); - } - } finally { - mgr.getViewLock().writeLock().unlock(); - } + public List<InternalDistributedMember> addAllMembershipListenerAndGetAllIds( + MembershipListener l) { + return membershipManager.withViewLock((manager) -> { + // Don't let the members come and go while we are adding this + // listener. This ensures that the listener (probably a + // ReplyProcessor) gets a consistent view of the members. + addAllMembershipListener(l); + return membershipManager.getView().getMembers(); + }); } /** @@ -1829,47 +1781,14 @@ public class ClusterDistributionManager implements DistributionManager { String reason = p_reason; boolean result = false; // initialization shouldn't be required, but... - // Test once before acquiring the lock, fault tolerance for potentially - // recursive (and deadlock) conditions -- bug33626 - // Note that it is always safe to _read_ {@link members} without locking if (isCurrentMember(theId)) { - // Destroy underlying member's resources reason = prettifyReason(reason); - synchronized (membersLock) { - if (logger.isDebugEnabled()) { - logger.debug("DistributionManager: removing member <{}>; crashed {}; reason = {}", theId, - crashed, reason); - } - Map<InternalDistributedMember, InternalDistributedMember> tmp = new HashMap<>(members); - if (tmp.remove(theId) != null) { - // Note we don't modify in place. This allows reader to get snapshots - // without locking. - if (tmp.isEmpty()) { - tmp = Collections.emptyMap(); - } else { - tmp = Collections.unmodifiableMap(tmp); - } - members = tmp; - result = true; - - } else { - result = false; - // Don't get upset since this can happen twice due to - // an explicit remove followed by an implicit one caused - // by a JavaGroup view change - } - Set<InternalDistributedMember> tmp2 = new HashSet<>(membersAndAdmin); - if (tmp2.remove(theId)) { - if (tmp2.isEmpty()) { - tmp2 = Collections.emptySet(); - } else { - tmp2 = Collections.unmodifiableSet(tmp2); - } - membersAndAdmin = tmp2; - } - removeHostedLocators(theId); - } // synchronized - } // if + if (logger.isDebugEnabled()) { + logger.debug("DistributionManager: removing member <{}>; crashed {}; reason = {}", theId, + crashed, reason); + } + removeHostedLocators(theId); + } redundancyZones.remove(theId); @@ -1884,41 +1803,19 @@ public class ClusterDistributionManager implements DistributionManager { * */ private void handleManagerStartup(InternalDistributedMember theId) { - HashMap<InternalDistributedMember, InternalDistributedMember> tmp; - synchronized (membersLock) { - // Note test is under membersLock - if (members.containsKey(theId)) { - return; // already accounted for - } - - // Note we don't modify in place. This allows reader to get snapshots - // without locking. - tmp = new HashMap<>(members); - tmp.put(theId, theId); - members = Collections.unmodifiableMap(tmp); - - Set<InternalDistributedMember> stmp = new HashSet<>(membersAndAdmin); - stmp.add(theId); - membersAndAdmin = Collections.unmodifiableSet(stmp); - } // synchronized - + // Note test is under membersLock + if (membershipManager.getView().contains(theId)) { + return; // already accounted for + } if (theId.getVmKind() != ClusterDistributionManager.LOCATOR_DM_TYPE) { stats.incNodes(1); } - logger.info("Admitting member <{}>. Now there are {} non-admin member(s).", - theId, tmp.size()); addMemberEvent(new MemberJoinedEvent(theId)); } @Override public boolean isCurrentMember(DistributedMember id) { - Set m; - synchronized (membersLock) { - // access to members synchronized under membersLock in order to - // ensure serialization - m = membersAndAdmin; - } - return m.contains(id); + return membershipManager.getView().contains(id); } /** @@ -1926,27 +1823,11 @@ public class ClusterDistributionManager implements DistributionManager { * */ private void handleConsoleStartup(InternalDistributedMember theId) { - // if we have an all listener then notify it NOW. - HashSet<InternalDistributedMember> tmp; - synchronized (membersLock) { - // Note test is under membersLock - if (membersAndAdmin.contains(theId)) - return; // already accounted for - - // Note we don't modify in place. This allows reader to get snapshots - // without locking. - tmp = new HashSet<>(membersAndAdmin); - tmp.add(theId); - membersAndAdmin = Collections.unmodifiableSet(tmp); - } // synchronized - for (MembershipListener listener : allMembershipListeners) { listener.memberJoined(this, theId); } logger.info("DMMembership: Admitting new administration member < {} >.", theId); - // Note that we don't add the member to the list of admin consoles until - // we receive a message from them. } /** @@ -1974,64 +1855,10 @@ public class ClusterDistributionManager implements DistributionManager { */ public void handleConsoleShutdown(InternalDistributedMember theId, boolean crashed, String reason) { - boolean removedConsole = false; - boolean removedMember = false; - synchronized (membersLock) { - // to fix bug 39747 we can only remove this member from - // membersAndAdmin if it is not in members. - // This happens when we have an admin member colocated with a normal DS. - // In this case we need for the normal DS to shutdown or crash. - if (!members.containsKey(theId)) { - if (logger.isDebugEnabled()) - logger.debug("DistributionManager: removing admin member <{}>; crashed = {}; reason = {}", - theId, crashed, reason); - Set<InternalDistributedMember> tmp = new HashSet<>(membersAndAdmin); - if (tmp.remove(theId)) { - // Note we don't modify in place. This allows reader to get snapshots - // without locking. - if (tmp.isEmpty()) { - tmp = Collections.emptySet(); - } else { - tmp = Collections.unmodifiableSet(tmp); - } - membersAndAdmin = tmp; - removedMember = true; - } else { - // Don't get upset since this can happen twice due to - // an explicit remove followed by an implicit one caused - // by a JavaGroup view change - } - } - removeHostedLocators(theId); - } - synchronized (adminConsolesLock) { - if (adminConsoles.contains(theId)) { - removedConsole = true; - Set<InternalDistributedMember> tmp = new HashSet<>(adminConsoles); - tmp.remove(theId); - if (tmp.isEmpty()) { - tmp = Collections.emptySet(); - } else { - tmp = Collections.unmodifiableSet(tmp); - } - adminConsoles = tmp; - } - } - if (removedMember) { - for (MembershipListener listener : allMembershipListeners) { - listener.memberDeparted(this, theId, crashed); - } - } - if (removedConsole) { - String msg; - if (crashed) { - msg = "Administration member at {} crashed: {}"; - } else { - msg = "Administration member at {} closed: {}"; - } - logger.info(msg, new Object[] {theId, reason}); + removeHostedLocators(theId); + for (MembershipListener listener : allMembershipListeners) { + listener.memberDeparted(this, theId, crashed); } - redundancyZones.remove(theId); } @@ -2394,14 +2221,16 @@ public class ClusterDistributionManager implements DistributionManager { @Override public Set<InternalDistributedMember> getAdminMemberSet() { - return adminConsoles; + return membershipManager.getView().getMembers().stream() + .filter((id) -> id.getVmKind() == ADMIN_ONLY_DM_TYPE).collect( + Collectors.toSet()); } /** Returns count of members filling the specified role */ @Override public int getRoleCount(Role role) { int count = 0; - Set<InternalDistributedMember> mbrs = getDistributionManagerIds(); + List<InternalDistributedMember> mbrs = getDistributionManagerIds(); for (InternalDistributedMember mbr : mbrs) { Set<Role> roles = (mbr).getRoles(); for (Role mbrRole : roles) { @@ -2417,7 +2246,7 @@ public class ClusterDistributionManager implements DistributionManager { /** Returns true if at least one member is filling the specified role */ @Override public boolean isRolePresent(Role role) { - Set<InternalDistributedMember> mbrs = getDistributionManagerIds(); + List<InternalDistributedMember> mbrs = getDistributionManagerIds(); for (InternalDistributedMember mbr : mbrs) { Set<Role> roles = mbr.getRoles(); for (Role mbrRole : roles) { @@ -2433,7 +2262,7 @@ public class ClusterDistributionManager implements DistributionManager { @Override public Set<Role> getAllRoles() { Set<Role> allRoles = new HashSet<>(); - Set<InternalDistributedMember> mbrs = getDistributionManagerIds(); + List<InternalDistributedMember> mbrs = getDistributionManagerIds(); for (InternalDistributedMember mbr : mbrs) { allRoles.addAll(mbr.getRoles()); } @@ -2779,14 +2608,14 @@ public class ClusterDistributionManager implements DistributionManager { * @since GemFire 5.9 */ @Override - public Set<InternalDistributedMember> getMembersInThisZone() { + public List<InternalDistributedMember> getMembersInThisZone() { return getMembersInSameZone(getDistributionManagerId()); } @Override - public Set<InternalDistributedMember> getMembersInSameZone( + public List<InternalDistributedMember> getMembersInSameZone( InternalDistributedMember targetMember) { - Set<InternalDistributedMember> buddyMembers = new HashSet<>(); + List<InternalDistributedMember> buddyMembers = new LinkedList<>(); if (!redundancyZones.isEmpty()) { synchronized (redundancyZones) { String targetZone = redundancyZones.get(targetMember); @@ -2896,33 +2725,17 @@ public class ClusterDistributionManager implements DistributionManager { } @Override - public Set<InternalDistributedMember> getNormalDistributionManagerIds() { - // access to members synchronized under membersLock in order to - // ensure serialization - synchronized (membersLock) { - HashSet<InternalDistributedMember> result = new HashSet<>(); - for (InternalDistributedMember m : members.keySet()) { - if (m.getVmKind() != ClusterDistributionManager.LOCATOR_DM_TYPE) { - result.add(m); - } - } - return result; - } + public List<InternalDistributedMember> getNormalDistributionManagerIds() { + return membershipManager.getView().getMembers().stream() + .filter((id) -> id.getVmKind() != LOCATOR_DM_TYPE).collect( + Collectors.toList()); } /** test method to get the member IDs of all locators in the distributed system */ public Set<InternalDistributedMember> getLocatorDistributionManagerIds() { - // access to members synchronized under membersLock in order to - // ensure serialization - synchronized (membersLock) { - HashSet<InternalDistributedMember> result = new HashSet<>(); - for (InternalDistributedMember m : members.keySet()) { - if (m.getVmKind() == ClusterDistributionManager.LOCATOR_DM_TYPE) { - result.add(m); - } - } - return result; - } + return membershipManager.getView().getMembers().stream() + .filter((id) -> id.getVmKind() == LOCATOR_DM_TYPE).collect( + Collectors.toSet()); } @Override diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionManager.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionManager.java index 86d59bb..d69e832 100644 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionManager.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionManager.java @@ -87,13 +87,13 @@ public interface DistributionManager extends ReplySender { * Returns an unmodifiable set containing the identities of all of the known distribution * managers. As of 7.0 this includes locators since they have a cache. */ - Set<InternalDistributedMember> getDistributionManagerIds(); + List<InternalDistributedMember> getDistributionManagerIds(); /** * Returns an unmodifiable set containing the identities of all of the known "normal" distribution * managers. This does not include locators or admin members. */ - Set<InternalDistributedMember> getNormalDistributionManagerIds(); + List<InternalDistributedMember> getNormalDistributionManagerIds(); /** * Returns an unmodifiable set containing the identities of all of the known distribution managers @@ -101,7 +101,7 @@ public interface DistributionManager extends ReplySender { * * @since GemFire 5.7 */ - Set<InternalDistributedMember> getDistributionManagerIdsIncludingAdmin(); + List<InternalDistributedMember> getDistributionManagerIdsIncludingAdmin(); /** * Returns a private-memory list containing getDistributionManagerIds() minus our id. @@ -116,7 +116,7 @@ public interface DistributionManager extends ReplySender { /** * Add a membership listener and return other DistributionManagerIds as an atomic operation */ - Set<InternalDistributedMember> addMembershipListenerAndGetDistributionManagerIds( + List<InternalDistributedMember> addMembershipListenerAndGetDistributionManagerIds( MembershipListener l); /** @@ -125,7 +125,7 @@ public interface DistributionManager extends ReplySender { * * @since GemFire 5.7 */ - Set<InternalDistributedMember> addAllMembershipListenerAndGetAllIds(MembershipListener l); + List<InternalDistributedMember> addAllMembershipListenerAndGetAllIds(MembershipListener l); /** * Returns the identity of this <code>DistributionManager</code> @@ -308,7 +308,7 @@ public interface DistributionManager extends ReplySender { * @return set of {@link InternalDistributedMember} including this VM * @since GemFire 5.9 */ - Set<InternalDistributedMember> getMembersInThisZone(); + List<InternalDistributedMember> getMembersInThisZone(); /** * Acquire a permit to request a GII from another member @@ -324,7 +324,7 @@ public interface DistributionManager extends ReplySender { boolean enforceUniqueZone(); - Set<InternalDistributedMember> getMembersInSameZone(InternalDistributedMember acceptedMember); + List<InternalDistributedMember> getMembersInSameZone(InternalDistributedMember acceptedMember); boolean areInSameZone(InternalDistributedMember member1, InternalDistributedMember member2); diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java index 66c3e2a..a1c6974 100644 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java @@ -1805,7 +1805,7 @@ public class InternalDistributedSystem extends DistributedSystem @Override public Set<DistributedMember> findDistributedMembers(InetAddress address) { - Set<InternalDistributedMember> allMembers = dm.getDistributionManagerIdsIncludingAdmin(); + List<InternalDistributedMember> allMembers = dm.getDistributionManagerIdsIncludingAdmin(); Set<DistributedMember> results = new HashSet<>(2); // Search through the set of all members diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java index c25996b..5754729 100644 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java @@ -89,7 +89,7 @@ public class LonerDistributionManager implements DistributionManager { this.system = system; this.logger = logger; this.localAddress = generateMemberId(); - this.allIds = Collections.singleton(localAddress); + this.allIds = Collections.singletonList(localAddress); this.viewMembers = new ArrayList<>(allIds); DistributionStats.enableClockStats = this.system.getConfig().getEnableTimeStatistics(); @@ -118,7 +118,7 @@ public class LonerDistributionManager implements DistributionManager { private final InternalDistributedMember localAddress; - private final Set<InternalDistributedMember> allIds;// = Collections.singleton(id); + private final List<InternalDistributedMember> allIds; private final List<InternalDistributedMember> viewMembers; private ConcurrentMap<InternalDistributedMember, InternalDistributedMember> canonicalIds = new ConcurrentHashMap<>(); @@ -140,12 +140,12 @@ public class LonerDistributionManager implements DistributionManager { } @Override - public Set<InternalDistributedMember> getDistributionManagerIds() { + public List<InternalDistributedMember> getDistributionManagerIds() { return allIds; } @Override - public Set<InternalDistributedMember> getDistributionManagerIdsIncludingAdmin() { + public List<InternalDistributedMember> getDistributionManagerIdsIncludingAdmin() { return allIds; } @@ -211,14 +211,15 @@ public class LonerDistributionManager implements DistributionManager { @Override - public Set<InternalDistributedMember> addMembershipListenerAndGetDistributionManagerIds( + public List<InternalDistributedMember> addMembershipListenerAndGetDistributionManagerIds( MembershipListener l) { // return getOtherDistributionManagerIds(); return allIds; } @Override - public Set<InternalDistributedMember> addAllMembershipListenerAndGetAllIds(MembershipListener l) { + public List<InternalDistributedMember> addAllMembershipListenerAndGetAllIds( + MembershipListener l) { return allIds; } @@ -1319,7 +1320,7 @@ public class LonerDistributionManager implements DistributionManager { * @since GemFire 5.9 */ @Override - public Set<InternalDistributedMember> getMembersInThisZone() { + public List<InternalDistributedMember> getMembersInThisZone() { return this.allIds; } @@ -1353,9 +1354,9 @@ public class LonerDistributionManager implements DistributionManager { } @Override - public Set<InternalDistributedMember> getMembersInSameZone( + public List<InternalDistributedMember> getMembersInSameZone( InternalDistributedMember acceptedMember) { - return Collections.singleton(acceptedMember); + return Collections.singletonList(acceptedMember); } @Override @@ -1391,7 +1392,7 @@ public class LonerDistributionManager implements DistributionManager { } @Override - public Set<InternalDistributedMember> getNormalDistributionManagerIds() { + public List<InternalDistributedMember> getNormalDistributionManagerIds() { return getDistributionManagerIds(); } diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/ReplyProcessor21.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/ReplyProcessor21.java index 9b4963b..9151fa5 100644 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/ReplyProcessor21.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/ReplyProcessor21.java @@ -427,7 +427,7 @@ public class ReplyProcessor21 implements MembershipListener { if (!removeMember(sender, false) && warn) { // if the member hasn't left the system, something is wrong final DistributionManager dm = getDistributionManager(); // fix for bug 33253 - Set ids = getDistributionManagerIds(); + List ids = getDistributionManagerIds(); if (ids == null || ids.contains(sender)) { List viewMembers = dm.getViewMembers(); if (system.getConfig().getMcastPort() == 0 // could be using multicast & will get responses @@ -543,7 +543,7 @@ public class ReplyProcessor21 implements MembershipListener { * @return a Set of the current members * @since GemFire 5.7 */ - protected Set addListenerAndGetMembers() { + protected List addListenerAndGetMembers() { return getDistributionManager().addMembershipListenerAndGetDistributionManagerIds(this); } @@ -566,7 +566,7 @@ public class ReplyProcessor21 implements MembershipListener { * @return a Set of the current members * @since GemFire 5.7 */ - protected Set getDistributionManagerIds() { + protected List getDistributionManagerIds() { return getDistributionManager().getDistributionManagerIds(); } @@ -575,7 +575,7 @@ public class ReplyProcessor21 implements MembershipListener { DistributionManager mgr = getDistributionManager(); statStart = mgr.getStats().startReplyWait(); synchronized (this.members) { - Set activeMembers = addListenerAndGetMembers(); + List activeMembers = addListenerAndGetMembers(); processActiveMembers(activeMembers); } } @@ -585,7 +585,7 @@ public class ReplyProcessor21 implements MembershipListener { * * @param activeMembers the DM's current membership set */ - protected void processActiveMembers(Set activeMembers) { + protected void processActiveMembers(List activeMembers) { for (int i = 0; i < this.members.length; i++) { if (this.members[i] != null) { if (!activeMembers.contains(this.members[i])) { @@ -1064,7 +1064,7 @@ public class ReplyProcessor21 implements MembershipListener { if (!this.processTimeout()) return; - Set activeMembers = getDistributionManagerIds(); + List activeMembers = getDistributionManagerIds(); // an alert that will show up in the console long timeout = getAckWaitThreshold(); diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/SerialAckedMessage.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/SerialAckedMessage.java index 2233425..c602830 100644 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/SerialAckedMessage.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/SerialAckedMessage.java @@ -17,6 +17,7 @@ package org.apache.geode.distributed.internal; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.util.Collection; import java.util.HashSet; import java.util.Iterator; import java.util.Set; @@ -63,7 +64,8 @@ public class SerialAckedMessage extends SerialDistributionMessage implements Mes * @throws InterruptedException if the operation is interrupted (as by shutdown) * @throws ReplyException if an exception was sent back by another manager */ - public void send(Set recipients, boolean multicast) throws InterruptedException, ReplyException { + public void send(Collection recipients, boolean multicast) + throws InterruptedException, ReplyException { final boolean isDebugEnabled = logger.isDebugEnabled(); if (Thread.interrupted()) diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java index 383ab6f..e74af61 100644 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java @@ -24,7 +24,6 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.Set; import java.util.concurrent.Semaphore; import org.apache.logging.log4j.Logger; @@ -646,7 +645,7 @@ public class DirectChannel { */ private void handleAckTimeout(long ackTimeout, long ackSATimeout, Connection c, DirectReplyProcessor processor) throws ConnectionException { - Set activeMembers = dm.getDistributionManagerIds(); + List activeMembers = dm.getDistributionManagerIds(); // Increment the stat dm.getStats().incReplyTimeouts(); diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/DLockGrantor.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/DLockGrantor.java index 9851322..09ce11b 100644 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/DLockGrantor.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/DLockGrantor.java @@ -873,7 +873,7 @@ public class DLockGrantor { try { synchronized (this.grantTokens) { - Set members = this.dlock.getDistributionManager().getDistributionManagerIds(); + List members = this.dlock.getDistributionManager().getDistributionManagerIds(); final boolean isDebugEnabled_DLS = logger.isTraceEnabled(LogMarker.DLS_VERBOSE); for (Iterator iter = tokens.iterator(); iter.hasNext();) { diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/DLockRecoverGrantorProcessor.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/DLockRecoverGrantorProcessor.java index 487cc8c..6e20a9c 100755 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/DLockRecoverGrantorProcessor.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/DLockRecoverGrantorProcessor.java @@ -20,6 +20,7 @@ import java.io.DataOutput; import java.io.IOException; import java.util.Arrays; import java.util.HashSet; +import java.util.List; import java.util.Set; import org.apache.logging.log4j.Logger; @@ -69,7 +70,7 @@ public class DLockRecoverGrantorProcessor extends ReplyProcessor21 { * <p> * This method should block until transfer of lock grantor has completed. */ - static boolean recoverLockGrantor(Set members, DLockService service, DLockGrantor newGrantor, + static boolean recoverLockGrantor(List members, DLockService service, DLockGrantor newGrantor, DistributionManager dm, InternalDistributedMember elder) { // proc will wait for replies from everyone including THIS member... DLockRecoverGrantorProcessor processor = @@ -114,7 +115,7 @@ public class DLockRecoverGrantorProcessor extends ReplyProcessor21 { // ------------------------------------------------------------------------- /** Creates a new instance of DLockRecoverGrantorProcessor */ - private DLockRecoverGrantorProcessor(DistributionManager dm, Set members, + private DLockRecoverGrantorProcessor(DistributionManager dm, List members, DLockGrantor newGrantor) { super(dm, members); this.dm = dm; diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/MembershipView.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/MembershipView.java index 096b62d..52f0122 100644 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/MembershipView.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/MembershipView.java @@ -21,6 +21,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import org.apache.geode.UnmodifiableException; import org.apache.geode.distributed.DistributedMember; import org.apache.geode.distributed.internal.ClusterDistributionManager; @@ -36,11 +37,12 @@ public class MembershipView { private Set<InternalDistributedMember> crashedMembers; private InternalDistributedMember creator; private Set<InternalDistributedMember> hashedMembers; + private volatile boolean unmodifiable; public MembershipView() { - viewId = 0; - members = new ArrayList<>(4); + viewId = -1; + members = new ArrayList<>(0); this.hashedMembers = new HashSet<>(members); shutdownMembers = Collections.emptySet(); crashedMembers = new HashSet<>(); @@ -80,6 +82,10 @@ public class MembershipView { this.crashedMembers = crashes; } + public void makeUnmodifiable() { + unmodifiable = true; + } + public int getViewId() { return this.viewId; } @@ -116,16 +122,25 @@ public class MembershipView { } public void add(InternalDistributedMember mbr) { + if (unmodifiable) { + throw new UnmodifiableException("this membership view is not modifiable"); + } this.hashedMembers.add(mbr); this.members.add(mbr); } public boolean remove(InternalDistributedMember mbr) { + if (unmodifiable) { + throw new UnmodifiableException("this membership view is not modifiable"); + } this.hashedMembers.remove(mbr); return this.members.remove(mbr); } public void removeAll(Collection<InternalDistributedMember> ids) { + if (unmodifiable) { + throw new UnmodifiableException("this membership view is not modifiable"); + } this.hashedMembers.removeAll(ids); ids.forEach(this::remove); } diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/adapter/GMSMembershipManager.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/adapter/GMSMembershipManager.java index 09d4847..ca9811a 100644 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/adapter/GMSMembershipManager.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/adapter/GMSMembershipManager.java @@ -36,6 +36,7 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Function; import org.apache.logging.log4j.Logger; @@ -257,7 +258,7 @@ public class GMSMembershipManager implements MembershipManager { * * All accesses to this object are protected via {@link #latestViewLock} */ - private MembershipView latestView = new MembershipView(); + private volatile MembershipView latestView = new MembershipView(); /** * This is the lock for protecting access to latestView @@ -462,18 +463,18 @@ public class GMSMembershipManager implements MembershipManager { } disableMulticastForRollingUpgrade = !version.equals(Version.CURRENT); - if (newViewId < latestViewId) { + // Save previous view, for delta analysis + MembershipView priorView = latestView; + + if (newViewId < priorView.getViewId()) { // ignore this view since it is old news return; } - // Save previous view, for delta analysis - MembershipView priorView = latestView; - // update the view to reflect our changes, so that // callbacks will see the new (updated) view. - latestViewId = newViewId; - latestView = new MembershipView(newView, newView.getViewId()); + long newlatestViewId = newViewId; + MembershipView newlatestView = new MembershipView(newView, newView.getViewId()); // look for additions for (int i = 0; i < newView.getMembers().size(); i++) { // additions @@ -598,15 +599,12 @@ public class GMSMembershipManager implements MembershipManager { removeWithViewLock(m, true, "not seen in membership view in " + this.surpriseMemberTimeout + "ms"); } else { - if (!latestView.contains(entry.getKey())) { - latestView.add(entry.getKey()); + if (!newlatestView.contains(entry.getKey())) { + newlatestView.add(entry.getKey()); } } } // expire suspected members - /* - * the timeout interval for suspected members - */ final long suspectMemberTimeout = 180000; oldestAllowed = System.currentTimeMillis() - suspectMemberTimeout; for (Iterator it = suspectedMembers.entrySet().iterator(); it.hasNext();) { @@ -616,6 +614,10 @@ public class GMSMembershipManager implements MembershipManager { it.remove(); } } + + // the view is complete - let's install it + newlatestView.makeUnmodifiable(); + latestView = newlatestView; try { listener.viewInstalled(latestView); } catch (DistributedSystemDisconnectedException se) { @@ -668,6 +670,7 @@ public class GMSMembershipManager implements MembershipManager { MembershipView initialView = createGeodeView(services.getJoinLeave().getView()); latestView = new MembershipView(initialView, initialView.getViewId()); + latestView.makeUnmodifiable(); listener.viewInstalled(latestView); } catch (RuntimeException ex) { @@ -688,9 +691,11 @@ public class GMSMembershipManager implements MembershipManager { } private MembershipView createGeodeView(GMSMembershipView view) { - return createGeodeView(view.getCreator(), view.getViewId(), view.getMembers(), + MembershipView result = createGeodeView(view.getCreator(), view.getViewId(), view.getMembers(), view.getShutdownMembers(), view.getCrashedMembers()); + result.makeUnmodifiable(); + return result; } private MembershipView createGeodeView(GMSMember gmsCreator, int viewId, @@ -903,6 +908,7 @@ public class GMSMembershipManager implements MembershipManager { // view. MembershipView newMembers = new MembershipView(latestView, latestView.getViewId()); newMembers.add(member); + newMembers.makeUnmodifiable(); latestView = newMembers; } } finally { @@ -1285,16 +1291,8 @@ public class GMSMembershipManager implements MembershipManager { @Override public MembershipView getView() { // Grab the latest view under a mutex... - MembershipView v; - - latestViewReadLock.lock(); - v = latestView; - latestViewReadLock.unlock(); - + MembershipView v = latestView; MembershipView result = new MembershipView(v, v.getViewId()); - - v.getMembers().stream().filter(this::isShunned).forEachOrdered(result::remove); - return result; } @@ -1803,6 +1801,7 @@ public class GMSMembershipManager implements MembershipManager { if (latestView.contains(member)) { MembershipView newView = new MembershipView(latestView, latestView.getViewId()); newView.remove(member); + newView.makeUnmodifiable(); latestView = newView; } } finally { @@ -1877,6 +1876,11 @@ public class GMSMembershipManager implements MembershipManager { } } + @Override + public <T> T withViewLock(Function function) { + return (T) function.apply(this); + } + private boolean isShunnedOrNew(final InternalDistributedMember m) { latestViewReadLock.lock(); try { @@ -2504,8 +2508,8 @@ public class GMSMembershipManager implements MembershipManager { /* Service interface */ @Override public void installView(GMSMembershipView v) { - if (latestViewId < 0 && !isConnected()) { - latestViewId = v.getViewId(); + MembershipView currentView = latestView; + if (currentView.getViewId() < 0 && !isConnected()) { latestView = createGeodeView(v); logger.debug("MembershipManager: initial view is {}", latestView); } else { diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/api/Membership.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/api/Membership.java index 7c3de02..0f02758 100644 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/api/Membership.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/api/Membership.java @@ -17,6 +17,7 @@ package org.apache.geode.distributed.internal.membership.gms.api; import java.io.NotSerializableException; import java.util.Map; import java.util.Set; +import java.util.function.Function; import org.apache.geode.distributed.DistributedMember; import org.apache.geode.distributed.internal.DistributionMessage; @@ -95,4 +96,6 @@ public interface Membership { * Returns true if the member is being shunned */ boolean isShunned(DistributedMember m); + + <T> T withViewLock(Function function); } diff --git a/geode-core/src/main/java/org/apache/geode/internal/SystemAdmin.java b/geode-core/src/main/java/org/apache/geode/internal/SystemAdmin.java index 66034b6..b57c292 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/SystemAdmin.java +++ b/geode-core/src/main/java/org/apache/geode/internal/SystemAdmin.java @@ -495,7 +495,7 @@ public class SystemAdmin { } InternalDistributedSystem ds = (InternalDistributedSystem) InternalDistributedSystem.connectForAdmin(props); - Set existingMembers = ds.getDistributionManager().getDistributionManagerIds(); + List existingMembers = ds.getDistributionManager().getDistributionManagerIds(); if (existingMembers.isEmpty()) { throw new RuntimeException("There are no members in the distributed system"); } diff --git a/geode-core/src/main/java/org/apache/geode/internal/admin/remote/RemoteGfManagerAgent.java b/geode-core/src/main/java/org/apache/geode/internal/admin/remote/RemoteGfManagerAgent.java index 401a97f..e95f327 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/admin/remote/RemoteGfManagerAgent.java +++ b/geode-core/src/main/java/org/apache/geode/internal/admin/remote/RemoteGfManagerAgent.java @@ -872,8 +872,8 @@ class RemoteGfManagerAgent implements GfManagerAgent { synchronized (this.myMembershipListenerLock) { this.myMembershipListener = new MyMembershipListener(); dm.addMembershipListener(this.myMembershipListener); - Set initialMembers = dm.getDistributionManagerIds(); - this.myMembershipListener.addMembers(initialMembers); + List initialMembers = dm.getDistributionManagerIds(); + this.myMembershipListener.addMembers(new HashSet(initialMembers)); if (logger.isDebugEnabled()) { StringBuffer sb = new StringBuffer("[RemoteGfManagerAgent] "); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/InitialImageFlowControl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/InitialImageFlowControl.java index 943f423..6cb3839 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/InitialImageFlowControl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/InitialImageFlowControl.java @@ -68,7 +68,7 @@ public class InitialImageFlowControl implements MembershipListener { int id = keeper.put(control); control.id = id; - Set availableIds = dm.addMembershipListenerAndGetDistributionManagerIds(control); + List availableIds = dm.addMembershipListenerAndGetDistributionManagerIds(control); if (!availableIds.contains(target)) { control.abort(); } @@ -125,7 +125,7 @@ public class InitialImageFlowControl implements MembershipListener { if (!aborted.get() && !permits.tryAcquire(timeout - timeSoFar - 1, TimeUnit.MILLISECONDS)) { checkCancellation(); - Set activeMembers = dm.getDistributionManagerIds(); + List activeMembers = dm.getDistributionManagerIds(); logger.warn( "{} seconds have elapsed while waiting for replies: {} on {} whose current membership list is: [{}]", getAckWaitThreshold(), this, dm.getId(), activeMembers); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PRHARedundancyProvider.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PRHARedundancyProvider.java index 5a0086f..76c063c 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/PRHARedundancyProvider.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PRHARedundancyProvider.java @@ -630,7 +630,7 @@ public class PRHARedundancyProvider { // enforceUniqueZone property has no effect for a loner if (!(partitionedRegion .getDistributionManager() instanceof LonerDistributionManager)) { - Set<InternalDistributedMember> exm = getBuddyMembersInZone(candidate, allStores); + List<InternalDistributedMember> exm = getBuddyMembersInZone(candidate, allStores); exm.remove(candidate); exm.removeAll(acceptedMembers); excludedMembers.addAll(exm); @@ -902,10 +902,10 @@ public class PRHARedundancyProvider { * @return set of members on the same host, not including accepted member * @since GemFire 5.9 */ - private Set<InternalDistributedMember> getBuddyMembersInZone( + private List<InternalDistributedMember> getBuddyMembersInZone( InternalDistributedMember acceptedMember, Collection<InternalDistributedMember> allStores) { DistributionManager dm = partitionedRegion.getDistributionManager(); - Set<InternalDistributedMember> buddies = dm.getMembersInSameZone(acceptedMember); + List<InternalDistributedMember> buddies = dm.getMembersInSameZone(acceptedMember); buddies.retainAll(allStores); return buddies; } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionBucketMgmtHelper.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionBucketMgmtHelper.java index 9c9e582..dceb46a 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionBucketMgmtHelper.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionBucketMgmtHelper.java @@ -16,6 +16,7 @@ package org.apache.geode.internal.cache; import java.util.Collections; +import java.util.HashSet; import java.util.Set; import org.apache.geode.distributed.internal.membership.InternalDistributedMember; @@ -39,7 +40,7 @@ class PartitionedRegionBucketMgmtHelper { if (b.getDistributionManager().enforceUniqueZone()) { Set<InternalDistributedMember> hostingMembers = b.getBucketOwners(); Set<InternalDistributedMember> buddyMembers = - b.getDistributionManager().getMembersInThisZone(); + new HashSet(b.getDistributionManager().getMembersInThisZone()); boolean disjoint = Collections.disjoint(hostingMembers, buddyMembers); boolean sourceIsOneThisHost = moveSource != null && buddyMembers.contains(moveSource); return disjoint || sourceIsOneThisHost; diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionHelper.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionHelper.java index 8bfc5ee..4caa9ea 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionHelper.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionHelper.java @@ -846,7 +846,7 @@ public class PartitionedRegionHelper { if (prConfig == null) return; - Set members = partitionedRegion.getDistributionManager().getDistributionManagerIds(); + List members = partitionedRegion.getDistributionManager().getDistributionManagerIds(); logger.warn( "DATALOSS ( {} ) :: Size of nodeList After verifyBucketNodes for bucket ID, {} is 0", callingMethod, bucketId); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/StateFlushOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/StateFlushOperation.java index fba73c6..6c92035 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/StateFlushOperation.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/StateFlushOperation.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.util.Collections; import java.util.HashSet; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Set; @@ -776,7 +777,7 @@ public class StateFlushOperation { } @Override - protected void processActiveMembers(Set activeMembers) { + protected void processActiveMembers(List activeMembers) { super.processActiveMembers(activeMembers); if (!activeMembers.contains(this.targetMember)) { targetMemberHasLeft = true; diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXFarSideCMTracker.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXFarSideCMTracker.java index 0fb2404..efd8ec8 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXFarSideCMTracker.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXFarSideCMTracker.java @@ -228,7 +228,7 @@ public class TXFarSideCMTracker { Set<InternalDistributedMember> failures, List<InternalDistributedMember> remaining) {} }; try { - Set memberSet = dm.addMembershipListenerAndGetDistributionManagerIds(memEar); + List memberSet = dm.addMembershipListenerAndGetDistributionManagerIds(memEar); // Still need to wait synchronized (lock) { diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/UpdateAttributesProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/UpdateAttributesProcessor.java index d7451ef..1b49b5b 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/UpdateAttributesProcessor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/UpdateAttributesProcessor.java @@ -192,7 +192,7 @@ public class UpdateAttributesProcessor { * @since GemFire 5.7 */ @Override - protected Set addListenerAndGetMembers() { + protected List addListenerAndGetMembers() { DistributionAdvisor da = UpdateAttributesProcessor.this.advisee.getDistributionAdvisor(); if (da.useAdminMembersForDefault()) { return getDistributionManager().addAllMembershipListenerAndGetAllIds(this); @@ -223,7 +223,7 @@ public class UpdateAttributesProcessor { * @since GemFire 5.7 */ @Override - protected Set getDistributionManagerIds() { + protected List getDistributionManagerIds() { DistributionAdvisor da = UpdateAttributesProcessor.this.advisee.getDistributionAdvisor(); if (da.useAdminMembersForDefault()) { return getDistributionManager().getDistributionManagerIdsIncludingAdmin(); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupService.java b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupService.java index dbc9890..dd8ee3c 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupService.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupService.java @@ -120,7 +120,7 @@ public class BackupService { void validateRequestingSender(InternalDistributedMember sender) { // We need to watch for pure admin members that depart. this allMembershipListener set // looks like it should receive those events. - Set allIds = + List allIds = cache.getDistributionManager().addAllMembershipListenerAndGetAllIds(membershipListener); if (!allIds.contains(sender)) { cleanup(); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/MemberFunctionExecutor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/MemberFunctionExecutor.java index 573c40c..80fbfbd 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/MemberFunctionExecutor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/MemberFunctionExecutor.java @@ -44,8 +44,9 @@ public class MemberFunctionExecutor extends AbstractExecution { private ServerToClientFunctionResultSender sender; MemberFunctionExecutor(DistributedSystem distributedSystem) { - this(distributedSystem, ((InternalDistributedSystem) distributedSystem).getDistributionManager() - .getNormalDistributionManagerIds()); + this(distributedSystem, + new HashSet(((InternalDistributedSystem) distributedSystem).getDistributionManager() + .getNormalDistributionManagerIds())); } MemberFunctionExecutor(DistributedSystem distributedSystem, DistributedMember distributedMember) { diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/ExplicitMoveDirector.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/ExplicitMoveDirector.java index e7be1e5..069ae98 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/ExplicitMoveDirector.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/ExplicitMoveDirector.java @@ -14,7 +14,7 @@ */ package org.apache.geode.internal.cache.partitioned.rebalance; -import java.util.Set; +import java.util.List; import org.apache.geode.distributed.DistributedMember; import org.apache.geode.distributed.DistributedSystem; @@ -89,7 +89,7 @@ public class ExplicitMoveDirector extends RebalanceDirectorAdapter { if (reason.willAccept()) { if (!model.moveBucket(new Move(sourceMember, targetMember, bucket))) { // Double check to see if the source or destination have left the DS - Set allMembers = ds.getDistributionManager().getDistributionManagerIdsIncludingAdmin(); + List allMembers = ds.getDistributionManager().getDistributionManagerIdsIncludingAdmin(); if (!allMembers.contains(sourceMember)) { throw new IllegalStateException( String.format( diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/persistence/PersistentMemberManager.java b/geode-core/src/main/java/org/apache/geode/internal/cache/persistence/PersistentMemberManager.java index ed0c110..9c16916 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/persistence/PersistentMemberManager.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/persistence/PersistentMemberManager.java @@ -168,7 +168,7 @@ public class PersistentMemberManager { // Add a membership listener to make sure we cancel the pending // revoke if the sender goes away. // DO this outside the synch block to avoid lock ordering issues. - Set members = dm.addMembershipListenerAndGetDistributionManagerIds(membershipListener); + List members = dm.addMembershipListenerAndGetDistributionManagerIds(membershipListener); if (!members.contains(sender) && sender.equals(dm.getId())) { cancelRevoke(pattern); return false; diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/backup/BackupServiceTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/backup/BackupServiceTest.java index 9c4979c..3d3da94 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/backup/BackupServiceTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/backup/BackupServiceTest.java @@ -22,7 +22,6 @@ import static org.mockito.Mockito.when; import java.io.IOException; import java.util.Arrays; -import java.util.HashSet; import org.junit.Before; import org.junit.Test; @@ -54,7 +53,7 @@ public class BackupServiceTest { when(cache.getInternalDistributedSystem()).thenReturn(distributedSystem); when(distributedMember.toString()).thenReturn("member"); when(distributionManager.addAllMembershipListenerAndGetAllIds(any())) - .thenReturn(new HashSet<>(Arrays.asList(sender))); + .thenReturn(Arrays.asList(sender)); backupService = new BackupService(cache); }