GEODE-870: Handling multiple concurrent locator restarts. Elder locator nomination
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/3f00d86d Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/3f00d86d Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/3f00d86d Branch: refs/heads/feature/GEODE-870 Commit: 3f00d86de40e41b9d6fbcce83ea4ffee2beff794 Parents: b47610e Author: Udo Kohlmeyer <ukohlme...@pivotal.io> Authored: Wed Feb 10 09:20:09 2016 +1100 Committer: Udo Kohlmeyer <ukohlme...@pivotal.io> Committed: Wed Feb 17 14:59:11 2016 +1100 ---------------------------------------------------------------------- .../internal/membership/NetView.java | 5 + .../membership/gms/membership/GMSJoinLeave.java | 319 ++++++++++--------- .../gms/messages/InstallViewMessage.java | 18 +- .../gms/messages/ViewRejectMessage.java | 96 ------ .../gemstone/gemfire/internal/DSFIDFactory.java | 11 +- .../internal/DataSerializableFixedID.java | 2 - .../gemfire/distributed/LocatorDUnitTest.java | 199 ++++++------ .../gms/membership/GMSJoinLeaveHelper.java | 60 ++++ gradle/rat.gradle | 2 +- 9 files changed, 338 insertions(+), 374 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3f00d86d/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/NetView.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/NetView.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/NetView.java index 40f5f71..af05f82 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/NetView.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/NetView.java @@ -29,6 +29,7 @@ import java.util.List; import java.util.Random; import java.util.Set; +import com.gemstone.gemfire.internal.logging.LogService; import org.apache.logging.log4j.Logger; import com.gemstone.gemfire.DataSerializer; @@ -47,6 +48,9 @@ import com.gemstone.gemfire.internal.Version; */ public class NetView implements DataSerializableFixedID { + private static final Logger logger = LogService.getLogger(); + + private int viewId; private List<InternalDistributedMember> members; private int[] failureDetectionPorts = new int[10]; @@ -86,6 +90,7 @@ public class NetView implements DataSerializableFixedID { crashedMembers = Collections.emptySet(); this.creator = creator; Arrays.fill(failureDetectionPorts, -1); + } // legacy method for JGMM http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3f00d86d/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java index c7eacfa..b246344 100755 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java @@ -259,14 +259,14 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { // unable to contact any of the locators if (!this.isJoined && state.hasContactedAJoinedLocator) { throw new SystemConnectException("Unable to join the distributed system in " - + (System.currentTimeMillis()-startTime) + "ms"); + + (System.currentTimeMillis() - startTime) + "ms"); } return this.isJoined; } finally { // notify anyone waiting on the address to be completed if (this.isJoined) { - synchronized(this.localAddress) { + synchronized (this.localAddress) { this.localAddress.notifyAll(); } } @@ -277,10 +277,10 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { /** * send a join request and wait for a reply. Process the reply. * This may throw a SystemConnectException or an AuthenticationFailedException - * + * * @return true if the attempt succeeded, false if it timed out */ - @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="WA_NOT_IN_LOOP") + @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "WA_NOT_IN_LOOP") boolean attemptToJoin() { SearchState state = searchState; @@ -302,7 +302,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { Thread.currentThread().interrupt(); return false; } - + if (response == null) { if (!isJoined) { logger.debug("received no join response"); @@ -314,13 +314,13 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { joinResponse[0] = null; String failReason = response.getRejectionMessage(); if (failReason != null) { - if (failReason.contains("Rejecting the attempt of a member using an older version") + if (failReason.contains("Rejecting the attempt of a member using an older version") || failReason.contains("15806")) { throw new SystemConnectException(failReason); } throw new AuthenticationFailedException(failReason); } - + if (response.getCurrentView() == null) { logger.info("received join response with no membership view: {}", response); return isJoined; @@ -328,19 +328,18 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { if (response.getBecomeCoordinator()) { logger.info("I am being told to become the membership coordinator by {}", coord); - synchronized(viewInstallationLock) { + synchronized (viewInstallationLock) { this.currentView = response.getCurrentView(); becomeCoordinator(null); } return true; } - + this.birthViewId = response.getMemberID().getVmViewId(); this.localAddress.setVmViewId(this.birthViewId); GMSMember me = (GMSMember) this.localAddress.getNetMember(); me.setBirthViewId(birthViewId); installView(response.getCurrentView()); - return true; } @@ -384,7 +383,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { * this method will enqueue the request for processing in another thread. * If this is not the coordinator but the coordinator is known, the message * is forwarded to the coordinator. - * + * * @param incomingRequest */ private void processJoinRequest(JoinRequestMessage incomingRequest) { @@ -413,12 +412,13 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { return; } - if (!this.localAddress.getNetMember().preferredForCoordinator() && - incomingRequest.getMemberID().getNetMember().preferredForCoordinator()) { - JoinResponseMessage m = new JoinResponseMessage(incomingRequest.getMemberID(), currentView, true); - services.getMessenger().send(m); - return; - } +// Remove JoinResponseMessage to fix GEODE-870 +// if (!this.localAddress.getNetMember().preferredForCoordinator() && +// incomingRequest.getMemberID().getNetMember().preferredForCoordinator()){ +// JoinResponseMessage joinResponseMessage = new JoinResponseMessage(incomingRequest.getMemberID(), currentView, true); +// services.getMessenger().send(joinResponseMessage); +// return; +// } recordViewRequest(incomingRequest); } @@ -426,7 +426,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { * Process a Leave request from another member. This may cause this member * to become the new membership coordinator. If this is the coordinator * a new view will be triggered. - * + * * @param incomingRequest */ private void processLeaveRequest(LeaveRequestMessage incomingRequest) { @@ -442,7 +442,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { InternalDistributedMember mbr = incomingRequest.getMemberID(); if (logger.isDebugEnabled()) { - logger.debug("JoinLeave.processLeaveRequest invoked. isCoordinator="+isCoordinator+ "; isStopping="+isStopping + logger.debug("JoinLeave.processLeaveRequest invoked. isCoordinator="+isCoordinator+ "; isStopping="+isStopping +"; cancelInProgress="+ services.getCancelCriterion().isCancelInProgress()); } @@ -487,7 +487,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { * Process a Remove request from another member. This may cause this member * to become the new membership coordinator. If this is the coordinator * a new view will be triggered. - * + * * @param incomingRequest */ private void processRemoveRequest(RemoveMemberMessage incomingRequest) { @@ -501,7 +501,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { logger.info("Membership ignoring removal request for " + mbr + " from non-member " + incomingRequest.getSender()); return; } - + if (v == null) { // not yet a member return; @@ -532,11 +532,11 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { check.addCrashedMembers(removedMembers); check.removeAll(removedMembers); } - synchronized(leftMembers) { + synchronized (leftMembers) { check.removeAll(leftMembers); } if (check.getCoordinator().equals(localAddress)) { - synchronized(viewInstallationLock) { + synchronized (viewInstallationLock) { becomeCoordinator(mbr); } } @@ -580,17 +580,18 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { void becomeCoordinator() { // package access for unit testing becomeCoordinator(null); } - + public void becomeCoordinatorForTest() { - synchronized(viewInstallationLock) { + synchronized (viewInstallationLock) { becomeCoordinator(); } } - + /** * Test hook for delaying the creation of new views. * This should be invoked before this member becomes coordinator * and creates its ViewCreator thread. + * * @param millis */ public void delayViewCreationForTest(int millis) { @@ -602,17 +603,17 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { * be invoked under a synch on viewInstallationLock that was held * at the time the decision was made to become coordinator so that * the decision is atomic with actually becoming coordinator. + * * @param oldCoordinator may be null */ private void becomeCoordinator(InternalDistributedMember oldCoordinator) { - boolean testing = unitTesting.contains("noRandomViewChange"); assert Thread.holdsLock(viewInstallationLock); - + if (isCoordinator) { return; } - + logger.info("This member is becoming the membership coordinator with address {}", localAddress); isCoordinator = true; if (currentView == null) { @@ -622,55 +623,59 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { this.localAddress.setVmViewId(0); installView(newView); isJoined = true; - if (viewCreator == null || viewCreator.isShutdown()) { - createViewCreator(); - viewCreator.setDaemon(true); - viewCreator.start(); - startViewBroadcaster(); - } + createAndStartViewCreator(newView); + startViewBroadcaster(); } else { // create and send out a new view - NetView newView; - Set<InternalDistributedMember> leaving = new HashSet<>(); - Set<InternalDistributedMember> removals; - synchronized(viewInstallationLock) { - int rand = testing? 0 : NetView.RANDOM.nextInt(10); - int viewNumber = currentView.getViewId() + 5 + rand; - if (this.localAddress.getVmViewId() < 0) { - this.localAddress.setVmViewId(viewNumber); - } - List<InternalDistributedMember> mbrs = new ArrayList<>(currentView.getMembers()); - if (!mbrs.contains(localAddress)) { - mbrs.add(localAddress); - } - synchronized(this.removedMembers) { - removals = new HashSet<>(this.removedMembers); - } - synchronized(this.leftMembers) { - leaving.addAll(leftMembers); - } - if (oldCoordinator != null && !removals.contains(oldCoordinator)) { - leaving.add(oldCoordinator); - } - mbrs.removeAll(removals); - mbrs.removeAll(leaving); - newView = new NetView(this.localAddress, viewNumber, mbrs, leaving, - removals); - newView.setFailureDetectionPorts(currentView); - newView.setFailureDetectionPort(this.localAddress, services.getHealthMonitor().getFailureDetectionPort()); - } - if (viewCreator == null || viewCreator.isShutdown()) { - createViewCreator(); - viewCreator.setInitialView(newView, newView.getNewMembers(), leaving, removals); - viewCreator.setDaemon(true); - viewCreator.start(); - startViewBroadcaster(); + NetView newView = addMemberToNetView(this.currentView, oldCoordinator); + createAndStartViewCreator(newView); + startViewBroadcaster(); + } + } + + private void createAndStartViewCreator(NetView newView) { + if (viewCreator == null || viewCreator.isShutdown()) { + viewCreator = new ViewCreator("Geode Membership View Creator", Services.getThreadGroup()); + if (newView != null) { + viewCreator.setInitialView(newView, newView.getNewMembers(), newView.getShutdownMembers(), newView.getCrashedMembers()); } + viewCreator.setDaemon(true); + viewCreator.start(); } } - protected void createViewCreator() { - viewCreator = new ViewCreator("Geode Membership View Creator", Services.getThreadGroup()); + private NetView addMemberToNetView(NetView netView, InternalDistributedMember oldCoordinator) { + boolean testing = unitTesting.contains("noRandomViewChange"); + NetView newView = null; + Set<InternalDistributedMember> leaving = new HashSet<>(); + Set<InternalDistributedMember> removals; + synchronized (viewInstallationLock) { + int rand = testing ? 0 : NetView.RANDOM.nextInt(10); + int viewNumber = currentView.getViewId() + 5 + rand; + if (this.localAddress.getVmViewId() < 0) { + this.localAddress.setVmViewId(viewNumber); + } + List<InternalDistributedMember> mbrs = new ArrayList<>(currentView.getMembers()); + if (!mbrs.contains(localAddress)) { + mbrs.add(localAddress); + } + synchronized (this.removedMembers) { + removals = new HashSet<>(this.removedMembers); + } + synchronized (this.leftMembers) { + leaving.addAll(leftMembers); + } + if (oldCoordinator != null && !removals.contains(oldCoordinator)) { + leaving.add(oldCoordinator); + } + mbrs.removeAll(removals); + mbrs.removeAll(leaving); + newView = new NetView(this.localAddress, viewNumber, mbrs, leaving, + removals); + newView.setFailureDetectionPorts(currentView); + newView.setFailureDetectionPort(this.localAddress, services.getHealthMonitor().getFailureDetectionPort()); + } + return newView; } private void sendRemoveMessages(List<InternalDistributedMember> removals, List<String> reasons, NetView newView) { @@ -689,7 +694,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { sendView(view, newMembers, false, this.viewProcessor); } - boolean sendView(NetView view, List<InternalDistributedMember> newMembers, boolean preparing, ViewReplyProcessor rp) { + private boolean sendView(NetView view, List<InternalDistributedMember> newMembers, boolean preparing, ViewReplyProcessor viewReplyProcessor) { int id = view.getViewId(); InstallViewMessage msg = new InstallViewMessage(view, services.getAuthenticator().getCredentials(this.localAddress), preparing); @@ -708,6 +713,10 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { if (preparing) { this.preparedView = view; } else { + if(!localAddress.equals(view.getCoordinator()) && getViewCreator() != null) + { + stopCoordinatorServices(); + } installView(view); } @@ -716,25 +725,25 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { return true; } - StringBuilder s = new StringBuilder(); + StringBuilder stringBuilder = new StringBuilder(); int[] ports = view.getFailureDetectionPorts(); int numMembers = view.size(); - for (int i=0; i<numMembers; i++) { + for (int i = 0; i < numMembers; i++) { if (i > 0) { - s.append(' '); + stringBuilder.append(' '); } - s.append(ports[i]); + stringBuilder.append(ports[i]); } logger.info((preparing ? "preparing" : "sending") + " new view " + view - + "\nfailure detection ports: " + s.toString()); + + "\nfailure detection ports: " + stringBuilder.toString()); msg.setRecipients(recips); Set<InternalDistributedMember> pendingLeaves = getPendingRequestIDs(LEAVE_REQUEST_MESSAGE); Set<InternalDistributedMember> pendingRemovals = getPendingRequestIDs(REMOVE_MEMBER_REQUEST); pendingRemovals.removeAll(view.getCrashedMembers()); - rp.initialize(id, responders); - rp.processPendingRequests(pendingLeaves, pendingRemovals); + viewReplyProcessor.initialize(id, responders); + viewReplyProcessor.processPendingRequests(pendingLeaves, pendingRemovals); services.getMessenger().send(msg); // only wait for responses during preparation @@ -745,10 +754,10 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { logger.info("finished waiting for responses to view preparation"); - InternalDistributedMember conflictingViewSender = rp.getConflictingViewSender(); - NetView conflictingView = rp.getConflictingView(); + InternalDistributedMember conflictingViewSender = viewReplyProcessor.getConflictingViewSender(); + NetView conflictingView = viewReplyProcessor.getConflictingView(); if (conflictingView != null) { - logger.warn("received a conflicting membership view from " + conflictingViewSender + logger.warn("received a conflicting membership view from " + conflictingViewSender + " during preparation: " + conflictingView); return false; } @@ -762,7 +771,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { return true; } - private void processViewMessage(InstallViewMessage m) { + private void processViewMessage(final InstallViewMessage installViewMessage) { NetView view = m.getView(); @@ -829,9 +838,10 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { } private TcpClientWrapper tcpClientWrapper = new TcpClientWrapper(); - + /*** * testing purpose + * * @param tcpClientWrapper */ void setTcpClientWrapper(TcpClientWrapper tcpClientWrapper) { @@ -847,21 +857,21 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { SearchState state = searchState; assert this.localAddress != null; - + // If we've already tried to bootstrap from locators that // haven't joined the system (e.g., a collocated locator) // then jump to using the membership view to try to find // the coordinator - if ( !state.hasContactedAJoinedLocator && state.view != null) { + if (!state.hasContactedAJoinedLocator && state.view != null) { return findCoordinatorFromView(); } FindCoordinatorRequest request = new FindCoordinatorRequest(this.localAddress, state.alreadyTried, state.viewId); Set<InternalDistributedMember> coordinators = new HashSet<InternalDistributedMember>(); - - long giveUpTime = System.currentTimeMillis() + ((long)services.getConfig().getLocatorWaitTime() * 1000L); - - int connectTimeout = (int)services.getConfig().getMemberTimeout() * 2; + + long giveUpTime = System.currentTimeMillis() + ((long) services.getConfig().getLocatorWaitTime() * 1000L); + + int connectTimeout = (int) services.getConfig().getMemberTimeout() * 2; boolean anyResponses = false; boolean flagsSet = false; @@ -869,12 +879,12 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { state.hasContactedAJoinedLocator = false; state.locatorsContacted = 0; - + do { for (InetSocketAddress addr : locators) { try { Object o = tcpClientWrapper.sendCoordinatorFindRequest(addr, request, connectTimeout); - FindCoordinatorResponse response = (o instanceof FindCoordinatorResponse) ? (FindCoordinatorResponse)o : null; + FindCoordinatorResponse response = (o instanceof FindCoordinatorResponse) ? (FindCoordinatorResponse) o : null; if (response != null) { state.locatorsContacted++; if (!state.hasContactedAJoinedLocator && @@ -886,7 +896,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { if (response.getCoordinator() != null) { anyResponses = true; NetView v = response.getView(); - int viewId = v == null? -1 : v.getViewId(); + int viewId = v == null ? -1 : v.getViewId(); if (viewId > state.viewId) { state.viewId = viewId; state.view = v; @@ -946,17 +956,17 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { } return true; } - + protected class TcpClientWrapper { - protected Object sendCoordinatorFindRequest(InetSocketAddress addr, FindCoordinatorRequest request, int connectTimeout) - throws ClassNotFoundException, IOException{ + protected Object sendCoordinatorFindRequest(InetSocketAddress addr, FindCoordinatorRequest request, int connectTimeout) + throws ClassNotFoundException, IOException { return TcpClient.requestToServer( - addr.getAddress(), addr.getPort(), request, connectTimeout, + addr.getAddress(), addr.getPort(), request, connectTimeout, true); } - } + } - @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="WA_NOT_IN_LOOP") + @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "WA_NOT_IN_LOOP") boolean findCoordinatorFromView() { ArrayList<FindCoordinatorResponse> result; SearchState state = searchState; @@ -1028,8 +1038,8 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { private void inheritSettingsFromLocator(InetSocketAddress addr, FindCoordinatorResponse response) { boolean enabled = response.isNetworkPartitionDetectionEnabled(); if (!enabled && services.getConfig().isNetworkPartitionDetectionEnabled()) { - throw new GemFireConfigException("locator at "+addr - +" does not have network-partition-detection enabled but my configuration has it enabled"); + throw new GemFireConfigException("locator at " + addr + + " does not have network-partition-detection enabled but my configuration has it enabled"); } GMSMember mbr = (GMSMember) this.localAddress.getNetMember(); @@ -1040,8 +1050,8 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { if (response.isUsePreferredCoordinators()) { this.quorumRequired = true; logger.debug("The locator indicates that all locators should be preferred as coordinators"); - if (services.getLocator() != null - || Locator.hasLocator() + if (services.getLocator() != null + || Locator.hasLocator() || !services.getConfig().getDistributionConfig().getStartLocator().isEmpty() || localAddress.getVmKind() == DistributionManager.LOCATOR_DM_TYPE) { ((GMSMember) localAddress.getNetMember()).setPreferredForCoordinator(true); @@ -1053,7 +1063,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { /** * receives a JoinResponse holding a membership view or rejection message - * + * * @param rsp */ private void processJoinResponse(JoinResponseMessage rsp) { @@ -1062,7 +1072,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { joinResponse.notifyAll(); } } - + /** * for testing, do not use in any other case as it is not thread safe */ @@ -1142,7 +1152,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { return; } } - + if (isJoined && isNetworkPartition(newView, true)) { if (quorumRequired) { Set<InternalDistributedMember> crashes = newView.getActualCrashedMembers(currentView); @@ -1150,7 +1160,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { return; } } - + previousView = currentView; currentView = newView; preparedView = null; @@ -1160,7 +1170,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { if (!isJoined) { logger.debug("notifying join thread"); isJoined = true; - synchronized(joinResponse) { + synchronized (joinResponse) { joinResponse.notifyAll(); } } @@ -1179,7 +1189,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { // newer than the view just processed - the senders will have to // resend these synchronized (viewRequests) { - for (Iterator<DistributionMessage> it = viewRequests.iterator(); it.hasNext();) { + for (Iterator<DistributionMessage> it = viewRequests.iterator(); it.hasNext(); ) { DistributionMessage m = it.next(); if (m instanceof JoinRequestMessage) { it.remove(); @@ -1199,7 +1209,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { synchronized (removedMembers) { removeMembersFromCollectionIfNotInView(removedMembers, currentView); } - synchronized(leftMembers) { + synchronized (leftMembers) { removeMembersFromCollectionIfNotInView(leftMembers, currentView); } } @@ -1208,7 +1218,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { Iterator<InternalDistributedMember> iterator = members.iterator(); while (iterator.hasNext()) { if (!currentView.contains(iterator.next())) { - iterator.remove(); + iterator.remove(); } } } @@ -1216,7 +1226,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { /** * Sends a message declaring a network partition to the * members of the given view via Messenger - * + * * @param view */ void sendNetworkPartitionMessage(NetView view) { @@ -1279,10 +1289,11 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { private void stopCoordinatorServices() { if (viewCreator != null && !viewCreator.isShutdown()) { + logger.debug("Shutting down ViewCreator"); viewCreator.shutdown(); } } - + private void startViewBroadcaster() { services.getTimer().schedule(new ViewBroadcaster(), VIEW_BROADCAST_INTERVAL, VIEW_BROADCAST_INTERVAL); } @@ -1342,7 +1353,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { stopCoordinatorServices(); if (view != null) { if (view.size() > 1) { - List<InternalDistributedMember> coords = view.getPreferredCoordinators(Collections.<InternalDistributedMember> emptySet(), localAddress, 5); + List<InternalDistributedMember> coords = view.getPreferredCoordinators(Collections.<InternalDistributedMember>emptySet(), localAddress, 5); logger.debug("JoinLeave sending a leave request to {}", coords); LeaveRequestMessage m = new LeaveRequestMessage(coords, this.localAddress, "this member is shutting down"); services.getMessenger().send(m); @@ -1365,7 +1376,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { processRemoveRequest(msg); if (!this.isCoordinator) { msg.resetRecipients(); - msg.setRecipients(v.getPreferredCoordinators(Collections.<InternalDistributedMember> emptySet(), localAddress, 10)); + msg.setRecipients(v.getPreferredCoordinators(Collections.<InternalDistributedMember>emptySet(), localAddress, 10)); services.getMessenger().send(msg); } } @@ -1373,8 +1384,8 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { @Override public void memberShutdown(DistributedMember mbr, String reason) { - LeaveRequestMessage msg = new LeaveRequestMessage(Collections.singleton(this.localAddress), (InternalDistributedMember)mbr, reason); - msg.setSender((InternalDistributedMember)mbr); + LeaveRequestMessage msg = new LeaveRequestMessage(Collections.singleton(this.localAddress), (InternalDistributedMember) mbr, reason); + msg.setSender((InternalDistributedMember) mbr); processLeaveRequest(msg); } @@ -1494,11 +1505,11 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { protected ViewReplyProcessor getPrepareViewReplyProcessor() { return prepareProcessor; } - - protected boolean testPrepareProcessorWaiting(){ + + protected boolean testPrepareProcessorWaiting() { return prepareProcessor.isWaiting(); } - + class ViewReplyProcessor { volatile int viewId = -1; final Set<InternalDistributedMember> notRepliedYet = new HashSet<>(); @@ -1521,7 +1532,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { pendingRemovals.clear(); } - boolean isWaiting(){ + boolean isWaiting() { return waiting; } @@ -1620,7 +1631,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { } } } finally { - synchronized(this) { + synchronized (this) { if (!this.waiting) { // if we've set waiting to false due to incoming messages then // we've discounted receiving any other responses from the @@ -1663,7 +1674,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { sendCurrentView(); } } - + void sendCurrentView() { NetView v = currentView; if (v != null) { @@ -1726,9 +1737,9 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { * All views should be sent by the ViewCreator thread, so * if this member becomes coordinator it may have an initial * view to transmit that announces the removal of the former coordinator to - * + * * @param newView - * @param leaving - members leaving in this view + * @param leaving - members leaving in this view * @param removals - members crashed in this view */ synchronized void setInitialView(NetView newView, List<InternalDistributedMember> newMembers, @@ -1752,7 +1763,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { List<InternalDistributedMember> iJoins; Set<InternalDistributedMember> iLeaves; Set<InternalDistributedMember> iRemoves; - synchronized(this) { + synchronized (this) { iView = initialView; iJoins = initialJoins; iLeaves = initialLeaving; @@ -1770,7 +1781,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { * During initial view processing a prepared view was discovered. * This method will extract its new members and create a new * initial view containing them. - * + * * @param v The prepared view */ private void processPreparedView(NetView v) { @@ -1778,7 +1789,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { if (currentView == null || currentView.getViewId() < v.getViewId()) { // we have a prepared view that is newer than the current view // form a new View ID - int viewId = Math.max(initialView.getViewId(),v.getViewId()); + int viewId = Math.max(initialView.getViewId(), v.getViewId()); viewId += 1; NetView newView = new NetView(initialView, viewId); @@ -1790,13 +1801,13 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { } else { newMembers = v.getMembers(); } - for (InternalDistributedMember newMember: newMembers) { + for (InternalDistributedMember newMember : newMembers) { newView.add(newMember); newView.setFailureDetectionPort(newMember, v.getFailureDetectionPort(newMember)); } // use the new view as the initial view - synchronized(this) { + synchronized (this) { setInitialView(newView, newMembers, initialLeaving, initialRemovals); } } @@ -1899,7 +1910,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { InternalDistributedMember mbr = null; switch (msg.getDSFID()) { case JOIN_REQUEST: - JoinRequestMessage jmsg = (JoinRequestMessage)msg; + JoinRequestMessage jmsg = (JoinRequestMessage) msg; mbr = jmsg.getMemberID(); int port = jmsg.getFailureDetectionPort(); // see if an old member ID is being reused. If @@ -1940,8 +1951,8 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { removalReqs.add(mbr); removalReasons.add(((RemoveMemberMessage) msg).getReason()); } else { - sendRemoveMessages(Collections.<InternalDistributedMember> singletonList(mbr), - Collections.<String> singletonList(((RemoveMemberMessage) msg).getReason()), currentView); + sendRemoveMessages(Collections.<InternalDistributedMember>singletonList(mbr), + Collections.<String>singletonList(((RemoveMemberMessage) msg).getReason()), currentView); } } break; @@ -1977,7 +1988,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { // be reused in an auto-reconnect and get a new vmViewID mbrs.addAll(joinReqs); newView = new NetView(localAddress, viewNumber, mbrs, leaveReqs, new HashSet<InternalDistributedMember>(removalReqs)); - for (InternalDistributedMember mbr: joinReqs) { + for (InternalDistributedMember mbr : joinReqs) { if (mbrs.contains(mbr)) { newView.setFailureDetectionPort(mbr, joinPorts.get(mbr)); } @@ -1998,14 +2009,14 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { mbr.setVmViewId(newView.getViewId()); mbr.getNetMember().setSplitBrainEnabled(services.getConfig().isNetworkPartitionDetectionEnabled()); } - + if (isShutdown()) { return; } // send removal messages before installing the view so we stop // getting messages from members that have been kicked out sendRemoveMessages(removalReqs, removalReasons, newView); - + prepareAndSendView(newView, joinReqs, leaveReqs, newView.getCrashedMembers()); return; @@ -2078,7 +2089,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { List<InternalDistributedMember> newMembers = conflictingView.getNewMembers(); if (!newMembers.isEmpty()) { logger.info("adding these new members from a conflicting view to the new view: {}", newMembers); - for (InternalDistributedMember mbr: newMembers) { + for (InternalDistributedMember mbr : newMembers) { int port = conflictingView.getFailureDetectionPort(mbr); newView.add(mbr); newView.setFailureDetectionPort(mbr, port); @@ -2087,7 +2098,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { } // trump the view ID of the conflicting view so mine will be accepted if (conflictingView.getViewId() >= newView.getViewId()) { - newView = new NetView(newView, conflictingView.getViewId()+1); + newView = new NetView(newView, conflictingView.getViewId() + 1); } } @@ -2105,7 +2116,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { List<InternalDistributedMember> newMembers = new ArrayList<>(newView.getMembers()); newMembers.removeAll(removalReqs); NetView tempView = new NetView(localAddress, newView.getViewId() + 1, newMembers, leaveReqs, removalReqs); - for (InternalDistributedMember mbr: newView.getMembers()) { + for (InternalDistributedMember mbr : newView.getMembers()) { if (tempView.contains(mbr)) { tempView.setFailureDetectionPort(mbr, newView.getFailureDetectionPort(mbr)); } @@ -2113,12 +2124,12 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { newView = tempView; int size = failures.size(); List<String> reasons = new ArrayList<>(size); - for (int i=0; i<size; i++) { + for (int i = 0; i < size; i++) { reasons.add("Failed to acknowledge a new membership view and then failed tcp/ip connection attempt"); } sendRemoveMessages(failures, reasons, newView); } - + // if there is no conflicting view then we can count // the current state as being prepared. All members // who are going to ack have already done so or passed @@ -2126,13 +2137,13 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { if (conflictingView == null) { prepared = true; } - + } while (!prepared); lastConflictingView = null; sendView(newView, joinReqs); - + // after sending a final view we need to stop this thread if // the GMS is shutting down if (isStopping()) { @@ -2143,11 +2154,11 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { /** * performs health checks on the collection of members, removing any that * are found to be healthy - * - * @param suspects + * + * @param mbrs */ - private void removeHealthyMembers(final Set<InternalDistributedMember> suspects) throws InterruptedException { - List<Callable<InternalDistributedMember>> checkers = new ArrayList<Callable<InternalDistributedMember>>(suspects.size()); + private void removeHealthyMembers(final Collection<InternalDistributedMember> mbrs) throws InterruptedException { + List<Callable<InternalDistributedMember>> checkers = new ArrayList<Callable<InternalDistributedMember>>(mbrs.size()); Set<InternalDistributedMember> newRemovals = new HashSet<>(); Set<InternalDistributedMember> newLeaves = new HashSet<>(); @@ -2166,7 +2177,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { @Override public InternalDistributedMember call() throws Exception { boolean available = GMSJoinLeave.this.checkIfAvailable(mbr); - + synchronized (viewRequests) { if (available) { suspects.remove(mbr); @@ -2218,7 +2229,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { if(suspects.isEmpty() || newRemovals.containsAll(suspects)) { break; } - + viewRequests.wait(waitTime); waitTime = giveUpTime - System.currentTimeMillis(); } @@ -2249,22 +2260,22 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { } } } - + private <T> List<Future<T>> submitAll ( ExecutorService executor, Collection<? extends Callable<T> > tasks ) { List<Future<T>> result = new ArrayList<Future<T>>( tasks.size() ); - for ( Callable<T> task : tasks ) { + for (Callable<T> task : tasks) { result.add(executor.submit(task)); } return result; } - + boolean getTestFlageForRemovalRequest() { return testFlagForRemovalRequest; } } - + boolean checkIfAvailable(InternalDistributedMember fmbr) { // return the member id if it fails health checks logger.info("checking state of member " + fmbr); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3f00d86d/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/InstallViewMessage.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/InstallViewMessage.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/InstallViewMessage.java index 91f6918..9e42f1f 100755 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/InstallViewMessage.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/InstallViewMessage.java @@ -30,18 +30,19 @@ import com.gemstone.gemfire.distributed.internal.membership.NetView; import com.gemstone.gemfire.internal.InternalDataSerializer; public class InstallViewMessage extends HighPriorityDistributionMessage { - enum messageType { INSTALL, PREPARE, SYNC } private NetView view; private Object credentials; private messageType kind; + private int previousViewId; public InstallViewMessage(NetView view, Object credentials) { this.view = view; this.kind = messageType.INSTALL; this.credentials = credentials; + this.previousViewId = view.getViewId(); } public InstallViewMessage(NetView view, Object credentials, boolean preparing) { @@ -49,6 +50,13 @@ public class InstallViewMessage extends HighPriorityDistributionMessage { this.kind = preparing? messageType.PREPARE : messageType.INSTALL; this.credentials = credentials; } + + public InstallViewMessage(NetView view, Object credentials, int previousViewId, boolean preparing) { + this.view = view; + this.kind = preparing? messageType.PREPARE : messageType.INSTALL; + this.credentials = credentials; + this.previousViewId = previousViewId; + } public InstallViewMessage() { // no-arg constructor for serialization @@ -62,6 +70,10 @@ public class InstallViewMessage extends HighPriorityDistributionMessage { return view; } + public int getPreviousViewId() { + return previousViewId; + } + public Object getCredentials() { return credentials; } @@ -83,6 +95,7 @@ public class InstallViewMessage extends HighPriorityDistributionMessage { @Override public void toData(DataOutput out) throws IOException { super.toData(out); + out.writeInt(previousViewId); out.writeInt(kind.ordinal()); DataSerializer.writeObject(this.view, out); DataSerializer.writeObject(this.credentials, out); @@ -91,6 +104,7 @@ public class InstallViewMessage extends HighPriorityDistributionMessage { @Override public void fromData(DataInput in) throws IOException, ClassNotFoundException { super.fromData(in); + this.previousViewId = in.readInt(); this.kind = messageType.values()[in.readInt()]; this.view = DataSerializer.readObject(in); this.credentials = DataSerializer.readObject(in); @@ -98,7 +112,7 @@ public class InstallViewMessage extends HighPriorityDistributionMessage { @Override public String toString() { - return "InstallViewMessage(type="+this.kind+"; "+this.view + return "InstallViewMessage(type="+this.kind+"; Current ViewID="+view.getViewId()+"; Previous View ID="+previousViewId+"; "+this.view +"; cred="+(credentials==null?"null": "not null") +")"; } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3f00d86d/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/ViewRejectMessage.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/ViewRejectMessage.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/ViewRejectMessage.java deleted file mode 100755 index e5bf9e2..0000000 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/ViewRejectMessage.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.distributed.internal.membership.gms.messages; - -import com.gemstone.gemfire.DataSerializer; -import com.gemstone.gemfire.distributed.internal.DistributionManager; -import com.gemstone.gemfire.distributed.internal.HighPriorityDistributionMessage; -import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember; -import com.gemstone.gemfire.distributed.internal.membership.NetView; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; - -public class ViewRejectMessage extends HighPriorityDistributionMessage { - - private int viewId; - private NetView rejectedView; - private String reason; - - public ViewRejectMessage(InternalDistributedMember recipient, int viewId, NetView rejectedView, String reason) { - super(); - setRecipient(recipient); - this.viewId = viewId; - this.rejectedView = rejectedView; - this.reason = reason; - } - - public ViewRejectMessage() { - // no-arg constructor for serialization - } - - public int getViewId() { - return viewId; - } - - public NetView getRejectedView() { - return this.rejectedView; - } - - - @Override - public int getDSFID() { - // TODO Auto-generated method stub - return VIEW_REJECT_MESSAGE; - } - - public String getReason() { - return reason; - } - - @Override - public int getProcessorType() { - return 0; - } - - @Override - public void process(DistributionManager dm) { - throw new IllegalStateException("this message is not intended to execute in a thread pool"); - } - - @Override - public void toData(DataOutput out) throws IOException { - super.toData(out); - out.writeInt(this.viewId); - DataSerializer.writeObject(this.rejectedView, out); - } - - @Override - public void fromData(DataInput in) throws IOException, ClassNotFoundException { - super.fromData(in); - this.viewId = in.readInt(); - this.rejectedView = DataSerializer.readObject(in); - } - - @Override - public String toString() { - String s = getSender() == null? getRecipientsDescription() : ""+getSender(); - return "ViewRejectMessage("+s+"; "+this.viewId+"; rejectedView="+this.rejectedView +")"; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3f00d86d/gemfire-core/src/main/java/com/gemstone/gemfire/internal/DSFIDFactory.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/DSFIDFactory.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/DSFIDFactory.java index b77dfdb..67e7c8d 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/DSFIDFactory.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/DSFIDFactory.java @@ -19,6 +19,7 @@ package com.gemstone.gemfire.internal; +import com.gemstone.gemfire.distributed.internal.membership.gms.messages.*; import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; import java.io.DataInput; @@ -104,16 +105,6 @@ import com.gemstone.gemfire.distributed.internal.membership.gms.locator.FindCoor import com.gemstone.gemfire.distributed.internal.membership.gms.locator.FindCoordinatorResponse; import com.gemstone.gemfire.distributed.internal.membership.gms.locator.GetViewRequest; import com.gemstone.gemfire.distributed.internal.membership.gms.locator.GetViewResponse; -import com.gemstone.gemfire.distributed.internal.membership.gms.messages.InstallViewMessage; -import com.gemstone.gemfire.distributed.internal.membership.gms.messages.JoinRequestMessage; -import com.gemstone.gemfire.distributed.internal.membership.gms.messages.JoinResponseMessage; -import com.gemstone.gemfire.distributed.internal.membership.gms.messages.LeaveRequestMessage; -import com.gemstone.gemfire.distributed.internal.membership.gms.messages.HeartbeatRequestMessage; -import com.gemstone.gemfire.distributed.internal.membership.gms.messages.HeartbeatMessage; -import com.gemstone.gemfire.distributed.internal.membership.gms.messages.NetworkPartitionMessage; -import com.gemstone.gemfire.distributed.internal.membership.gms.messages.RemoveMemberMessage; -import com.gemstone.gemfire.distributed.internal.membership.gms.messages.SuspectMembersMessage; -import com.gemstone.gemfire.distributed.internal.membership.gms.messages.ViewAckMessage; import com.gemstone.gemfire.distributed.internal.streaming.StreamingOperation.StreamingReplyMessage; import com.gemstone.gemfire.internal.admin.ClientMembershipMessage; import com.gemstone.gemfire.internal.admin.remote.AddHealthListenerRequest; http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3f00d86d/gemfire-core/src/main/java/com/gemstone/gemfire/internal/DataSerializableFixedID.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/DataSerializableFixedID.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/DataSerializableFixedID.java index 22ac457..035ba56 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/DataSerializableFixedID.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/DataSerializableFixedID.java @@ -82,8 +82,6 @@ public interface DataSerializableFixedID extends SerializationVersions { case FOO: return new FOO(in); */ - public static final short VIEW_REJECT_MESSAGE = -158; - public static final short NETWORK_PARTITION_MESSAGE = -157; public static final short SUSPECT_MEMBERS_MESSAGE = -156; http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3f00d86d/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/LocatorDUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/LocatorDUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/LocatorDUnitTest.java index 94ec089..3786ccb 100644 --- a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/LocatorDUnitTest.java +++ b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/LocatorDUnitTest.java @@ -30,6 +30,7 @@ import com.gemstone.gemfire.distributed.internal.membership.MembershipManager; import com.gemstone.gemfire.distributed.internal.membership.MembershipTestHook; import com.gemstone.gemfire.distributed.internal.membership.NetView; import com.gemstone.gemfire.distributed.internal.membership.gms.MembershipManagerHelper; +import com.gemstone.gemfire.distributed.internal.membership.gms.membership.GMSJoinLeaveHelper; import com.gemstone.gemfire.internal.Assert; import com.gemstone.gemfire.internal.AvailablePort; import com.gemstone.gemfire.internal.AvailablePortHelper; @@ -67,7 +68,7 @@ import com.gemstone.gemfire.test.dunit.WaitCriterion; public class LocatorDUnitTest extends DistributedTestCase { static TestHook hook; - + /** * Creates a new <code>LocatorDUnitTest</code> */ @@ -77,12 +78,12 @@ public class LocatorDUnitTest extends DistributedTestCase { private static final String WAIT2_MS_NAME = "LocatorDUnitTest.WAIT2_MS"; private static final int WAIT2_MS_DEFAULT = 5000; // 2000 -- see bug 36470 - private static final int WAIT2_MS + private static final int WAIT2_MS = Integer.getInteger(WAIT2_MS_NAME, WAIT2_MS_DEFAULT).intValue(); - + private int port1; private int port2; - + @Override public void setUp() throws Exception { super.setUp(); @@ -90,7 +91,7 @@ public class LocatorDUnitTest extends DistributedTestCase { port2 = -1; IgnoredException.addIgnoredException("Removing shunned member"); } - + @Override protected final void preTearDown() throws Exception { if (Locator.hasLocator()) { @@ -105,10 +106,10 @@ public class LocatorDUnitTest extends DistributedTestCase { DistributedTestUtils.deleteLocatorStateFile(port2); } } - + //////// Test Methods - + /** * SQLFire uses a colocated locator in a dm-type=normal VM. This tests that * the locator can resume control as coordinator after all locators have been @@ -123,7 +124,7 @@ public class LocatorDUnitTest extends DistributedTestCase { VM vm1 = host.getVM(1); VM vm2 = host.getVM(2); VM vm3 = host.getVM(3); - + port1 = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET); DistributedTestUtils.deleteLocatorStateFile(port1); @@ -140,7 +141,7 @@ public class LocatorDUnitTest extends DistributedTestCase { InternalDistributedMember mbr = system.getDistributedMember(); assertEquals("expected the VM to have NORMAL vmKind", DistributionManager.NORMAL_DM_TYPE, system.getDistributedMember().getVmKind()); - + properties.remove("start-locator"); properties.put("log-level", LogWriterUtils.getDUnitLogLevel()); properties.put("locators", locators); @@ -156,12 +157,12 @@ public class LocatorDUnitTest extends DistributedTestCase { Cache cache = CacheFactory.create(system); Region r = cache.createRegionFactory(RegionShortcut.REPLICATE).create("test region"); assertNotNull("expected to create a region", r); - + // create a lock service and have every vm get a lock DistributedLockService service = DistributedLockService.create("test service", system); service.becomeLockGrantor(); service.lock("foo0", 0, 0); - + vm1.invoke(new SerializableRunnable("get the lock service and lock something") { public void run() { final DistributedLockService service = DistributedLockService.create("test service", system); @@ -178,7 +179,7 @@ public class LocatorDUnitTest extends DistributedTestCase { // cause elder failover. vm1 will become the lock grantor system.disconnect(); - + try { vm1.invoke(new SerializableRunnable("ensure grantor failover") { public void run() { @@ -194,17 +195,17 @@ public class LocatorDUnitTest extends DistributedTestCase { public String description() { return "waiting to become lock grantor after shutting down locator/grantor"; } - + }, DistributionConfig.DEFAULT_MEMBER_TIMEOUT * 2, 1000, true); assertTrue(service.isLockGrantor()); } }); - + properties.put("start-locator", locators); properties.put("log-level", LogWriterUtils.getDUnitLogLevel()); system = (InternalDistributedSystem)DistributedSystem.connect(properties); System.out.println("done connecting distributed system"); - + assertEquals("should be the coordinator", system.getDistributedMember(), MembershipManagerHelper.getCoordinator(system)); NetView view = MembershipManagerHelper.getMembershipManager(system).getView(); com.gemstone.gemfire.test.dunit.LogWriterUtils.getLogWriter().info("view after becoming coordinator is " + view); @@ -221,7 +222,7 @@ public class LocatorDUnitTest extends DistributedTestCase { }); assertFalse("should not have become lock grantor", service.isLockGrantor()); - + // Now demonstrate that a new member can join and use the lock service properties.remove("start-locator"); vm3.invoke(startSystem); @@ -231,7 +232,7 @@ public class LocatorDUnitTest extends DistributedTestCase { service.lock("foo5", 0, 0); } }); - + } finally { disconnectAllFromDS(); } @@ -242,7 +243,7 @@ public class LocatorDUnitTest extends DistributedTestCase { * split-brain configuration. To work around this we have always told customers that they * need to stagger the starting of locators. This test configures two locators to start up * simultaneously and shows that they find each other and form a single system. - * + * * @throws Exception */ public void testStartTwoLocators() throws Exception { @@ -250,7 +251,7 @@ public class LocatorDUnitTest extends DistributedTestCase { Host host = Host.getHost(0); VM loc1 = host.getVM(1); VM loc2 = host.getVM(2); - + int ports[] = AvailablePortHelper.getRandomAvailableTCPPorts(2); final int port1 = ports[0]; this.port1 = port1; @@ -258,7 +259,7 @@ public class LocatorDUnitTest extends DistributedTestCase { this.port2 = port2; // for cleanup in tearDown2 DistributedTestUtils.deleteLocatorStateFile(port1); DistributedTestUtils.deleteLocatorStateFile(port2); - final String host0 = NetworkUtils.getServerHostName(host); + final String host0 = NetworkUtils.getServerHostName(host); final String locators = host0 + "[" + port1 + "]," + host0 + "[" + port2 + "]"; final Properties properties = new Properties(); @@ -384,16 +385,6 @@ public class LocatorDUnitTest extends DistributedTestCase { Object[] connectArgs = new Object[] { properties }; - SerializableRunnable disconnect = - new SerializableRunnable("Disconnect from " + locators) { - public void run() { - DistributedSystem sys = InternalDistributedSystem.getAnyInstance(); - if (sys != null && sys.isConnected()) { - sys.disconnect(); - } - } - }; - assertTrue(MembershipManagerHelper.getLeadMember(sys) == null); // connect three vms and then watch the lead member selection as they @@ -416,7 +407,7 @@ public class LocatorDUnitTest extends DistributedTestCase { assertLeadMember(mem1, sys, 5000); // after disconnecting the first vm, the second one should become the leader - vm1.invoke(disconnect); + vm1.invoke(getDisconnectRunnable(locators)); MembershipManagerHelper.getMembershipManager(sys).waitForDeparture(mem1); assertLeadMember(mem2, sys, 5000); @@ -425,15 +416,15 @@ public class LocatorDUnitTest extends DistributedTestCase { "getDistributedMember", connectArgs); assertLeadMember(mem2, sys, 5000); - vm2.invoke(disconnect); + vm2.invoke(getDisconnectRunnable(locators)); MembershipManagerHelper.getMembershipManager(sys).waitForDeparture(mem2); assertLeadMember(mem3, sys, 5000); - vm1.invoke(disconnect); + vm1.invoke(getDisconnectRunnable(locators)); MembershipManagerHelper.getMembershipManager(sys).waitForDeparture(mem1); assertLeadMember(mem3, sys, 5000); - vm3.invoke(disconnect); + vm3.invoke(getDisconnectRunnable(locators)); MembershipManagerHelper.getMembershipManager(sys).waitForDeparture(mem3); assertLeadMember(null, sys, 5000); @@ -485,13 +476,13 @@ public class LocatorDUnitTest extends DistributedTestCase { VM vm2 = host.getVM(2); VM locvm = host.getVM(3); Locator locator = null; - + int ports[] = AvailablePortHelper.getRandomAvailableTCPPorts(2); final int port1 = ports[0]; this.port1 = port1; final int port2 = ports[1]; DistributedTestUtils.deleteLocatorStateFile(port1, port2); - final String host0 = NetworkUtils.getServerHostName(host); + final String host0 = NetworkUtils.getServerHostName(host); final String locators = host0 + "[" + port1 + "]," + host0 + "[" + port2 + "]"; final Properties properties = new Properties(); @@ -503,7 +494,7 @@ public class LocatorDUnitTest extends DistributedTestCase { properties.put("log-level", LogWriterUtils.getDUnitLogLevel()); // properties.put("log-level", "fine"); properties.put(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false"); - + try { final String uname = getUniqueName(); File logFile = new File(""); @@ -618,7 +609,7 @@ public class LocatorDUnitTest extends DistributedTestCase { VM vm2 = host.getVM(2); VM locvm = host.getVM(3); Locator locator = null; - + final int[] ports = AvailablePortHelper.getRandomAvailableTCPPorts(2); final int port1 = ports[0]; this.port1 = port1; @@ -637,7 +628,7 @@ public class LocatorDUnitTest extends DistributedTestCase { properties.put(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false"); SerializableRunnable stopLocator = getStopLocatorRunnable(); - + try { final String uname = getUniqueName(); File logFile = new File(""); @@ -716,19 +707,9 @@ public class LocatorDUnitTest extends DistributedTestCase { "'failed to collect all ACKs' it is a false failure.", mem2, vm2.invoke(LocatorDUnitTest.class, "getLeadMember", new Object[] {})); - SerializableRunnable disconnect = - new SerializableRunnable("Disconnect from " + locators) { - public void run() { - DistributedSystem sys = InternalDistributedSystem.getAnyInstance(); - if (sys != null && sys.isConnected()) { - sys.disconnect(); - } - } - }; - // disconnect the first vm and demonstrate that the third vm and the // locator notice the failure and exit - vm2.invoke(disconnect); + vm2.invoke(getDisconnectRunnable(locators)); locvm.invoke(stopLocator); } finally { MembershipManagerHelper.inhibitForcedDisconnectLogging(false); @@ -765,7 +746,7 @@ public class LocatorDUnitTest extends DistributedTestCase { VM vm2 = host.getVM(2); VM locvm = host.getVM(3); Locator locator = null; - + int ports[] = AvailablePortHelper.getRandomAvailableTCPPorts(2); final int port1 = ports[0]; this.port1 = port1; @@ -902,7 +883,7 @@ public class LocatorDUnitTest extends DistributedTestCase { VM vm2 = host.getVM(2); VM locvm = host.getVM(3); Locator locator = null; - + int ports[] = AvailablePortHelper.getRandomAvailableTCPPorts(2); final int port1 = ports[0]; this.port1 = port1; @@ -1144,7 +1125,7 @@ public class LocatorDUnitTest extends DistributedTestCase { } }; Wait.waitForCriterion(ev, 15 * 1000, 200, false); - DistributedMember newCoord = MembershipManagerHelper.getCoordinator(system); + DistributedMember newCoord = MembershipManagerHelper.getCoordinator(system); com.gemstone.gemfire.test.dunit.LogWriterUtils.getLogWriter().info("coordinator after shutdown of locator was " + newCoord); if (coord.equals(newCoord)) { @@ -1154,17 +1135,8 @@ public class LocatorDUnitTest extends DistributedTestCase { system.disconnect(); - SerializableRunnable disconnect = - new SerializableRunnable("Disconnect from " + locators) { - public void run() { - DistributedSystem sys = InternalDistributedSystem.getAnyInstance(); - if (sys != null && sys.isConnected()) { - sys.disconnect(); - } - } - }; - vm1.invoke(disconnect); - vm2.invoke(disconnect); + vm1.invoke(getDisconnectRunnable(locators)); + vm2.invoke(getDisconnectRunnable(locators)); } finally { vm0.invoke(getStopLocatorRunnable()); @@ -1402,17 +1374,8 @@ public class LocatorDUnitTest extends DistributedTestCase { system.disconnect(); - SerializableRunnable disconnect = - new SerializableRunnable("Disconnect from " + locators) { - public void run() { - DistributedSystem sys = InternalDistributedSystem.getAnyInstance(); - if (sys != null && sys.isConnected()) { - sys.disconnect(); - } - } - }; - vm1.invoke(disconnect); - vm2.invoke(disconnect); + vm1.invoke(getDisconnectRunnable(locators)); + vm2.invoke(getDisconnectRunnable(locators)); } finally { vm3.invoke(getStopLocatorRunnable()); @@ -1421,7 +1384,18 @@ public class LocatorDUnitTest extends DistributedTestCase { vm0.invoke(getStopLocatorRunnable()); } } - + + private SerializableRunnable getDisconnectRunnable(final String locators) { + return new SerializableRunnable("Disconnect from " + locators) { + public void run() { + DistributedSystem sys = InternalDistributedSystem.getAnyInstance(); + if (sys != null && sys.isConnected()) { + sys.disconnect(); + } + } + }; + } + /** * Tests starting multiple locators at the same time and ensuring that the locators * end up only have 1 master. @@ -1452,7 +1426,6 @@ public class LocatorDUnitTest extends DistributedTestCase { dsProps.setProperty("log-level", "FINE"); dsProps.setProperty("enable-network-partition-detection", "true"); dsProps.setProperty(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false"); - final String uniqueName = getUniqueName(); startLocatorSync(vm0, new Object[] { port1, dsProps }); startLocatorSync(vm1, new Object[] { port2, dsProps }); @@ -1483,7 +1456,7 @@ public class LocatorDUnitTest extends DistributedTestCase { WaitCriterion waitCriterion = new WaitCriterion() { public boolean done() { try { - return system.getDM().getViewMembers().size() >= 3; + return system.getDM().getViewMembers().size() == 6; } catch (Exception e) { e.printStackTrace(); fail("unexpected exception"); @@ -1504,10 +1477,31 @@ public class LocatorDUnitTest extends DistributedTestCase { vm1.invoke(getStopLocatorRunnable()); vm2.invoke(getStopLocatorRunnable()); + waitCriterion = new WaitCriterion() { + public boolean done() { + try { + return system.getDM().getAllHostedLocators().size() == 0; + } catch (Exception e) { + e.printStackTrace(); + fail("unexpected exception"); + } + return false; // NOTREACHED + } + + public String description() { + return null; + } + }; + DistributedTestCase.waitForCriterion(waitCriterion, 10 * 1000, 200, true); + final String newLocators = host0 + "[" + port2 + "]," + host0 + "[" + port3 + "]"; dsProps.setProperty("locators", newLocators); + assertTrue(vm3.invoke(() -> GMSJoinLeaveHelper.isViewCreator())); + //Given the start up order of servers, this server is the elder server + assertTrue(vm3.invoke(() -> GMSJoinLeaveHelper.isViewCreator())); + startLocatorAsync(vm1, new Object[] { port2, dsProps }); startLocatorAsync(vm2, new Object[] { port3, dsProps }); @@ -1528,19 +1522,23 @@ public class LocatorDUnitTest extends DistributedTestCase { }; DistributedTestCase.waitForCriterion(waitCriterion, 10 * 1000, 200, true); + int netviewId = vm1.invoke(() -> GMSJoinLeaveHelper.getViewId()); + assertEquals(netviewId, (int) vm2.invoke(() -> GMSJoinLeaveHelper.getViewId())); + assertEquals(netviewId, (int) vm3.invoke(() -> GMSJoinLeaveHelper.getViewId())); + assertEquals(netviewId, (int) vm4.invoke(() -> GMSJoinLeaveHelper.getViewId())); + assertFalse(vm4.invoke(() -> GMSJoinLeaveHelper.isViewCreator())); + //Given the start up order of servers, this server is the elder server + assertFalse(vm3.invoke(() -> GMSJoinLeaveHelper.isViewCreator())); + if (vm1.invoke(() -> GMSJoinLeaveHelper.isViewCreator())) { + assertFalse(vm2.invoke(() -> GMSJoinLeaveHelper.isViewCreator())); + } else { + assertTrue(vm2.invoke(() -> GMSJoinLeaveHelper.isViewCreator())); + } + } finally { system.disconnect(); - SerializableRunnable disconnect = - new SerializableRunnable("Disconnect from " + locators) { - public void run() { - DistributedSystem sys = InternalDistributedSystem.getAnyInstance(); - if (sys != null && sys.isConnected()) { - sys.disconnect(); - } - } - }; - vm3.invoke(disconnect); - vm4.invoke(disconnect); + vm3.invoke(getDisconnectRunnable(locators)); + vm4.invoke(getDisconnectRunnable(locators)); vm2.invoke(getStopLocatorRunnable()); vm1.invoke(getStopLocatorRunnable()); } @@ -1678,17 +1676,8 @@ public class LocatorDUnitTest extends DistributedTestCase { Wait.waitForCriterion(ev, WAIT2_MS, 200, true); system.disconnect(); - SerializableRunnable disconnect = - new SerializableRunnable("Disconnect from " + locators) { - public void run() { - DistributedSystem sys = InternalDistributedSystem.getAnyInstance(); - if (sys != null && sys.isConnected()) { - sys.disconnect(); - } - } - }; - vm1.invoke(disconnect); - vm2.invoke(disconnect); + vm1.invoke(getDisconnectRunnable(locators)); + vm2.invoke(getDisconnectRunnable(locators)); } finally { SerializableRunnable stop = getStopLocatorRunnable(); vm0.invoke(stop); @@ -1847,15 +1836,7 @@ public class LocatorDUnitTest extends DistributedTestCase { connect.run(); //vm1.invoke(connect); - SerializableRunnable disconnect = - new SerializableRunnable("Disconnect from " + locators) { - public void run() { - DistributedSystem sys = InternalDistributedSystem.getAnyInstance(); - if (sys != null && sys.isConnected()) { - sys.disconnect(); - } - } - }; + SerializableRunnable disconnect = getDisconnectRunnable(locators); disconnect.run(); //vm1.invoke(disconnect); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3f00d86d/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveHelper.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveHelper.java b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveHelper.java new file mode 100644 index 0000000..b8311bc --- /dev/null +++ b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveHelper.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gemstone.gemfire.distributed.internal.membership.gms.membership; + +import com.gemstone.gemfire.distributed.Locator; +import com.gemstone.gemfire.distributed.internal.DM; +import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem; +import com.gemstone.gemfire.distributed.internal.membership.gms.Services; +import com.gemstone.gemfire.distributed.internal.membership.gms.mgr.GMSMembershipManager; + +public class GMSJoinLeaveHelper { + public static boolean isViewCreator() { + GMSJoinLeave gmsJoinLeave = getGmsJoinLeave(); + if (gmsJoinLeave != null) { + GMSJoinLeave.ViewCreator viewCreator = gmsJoinLeave.getViewCreator(); + if (viewCreator != null && !viewCreator.isShutdown()) { + return true; + } else { + return false; + } + } + throw new RuntimeException("This should not have happened. There should be a JoinLeave for every DS"); + } + + private static GMSJoinLeave getGmsJoinLeave() { + InternalDistributedSystem distributedSystem = getInternalDistributedSystem(); + DM dm = distributedSystem.getDM(); + GMSMembershipManager membershipManager = (GMSMembershipManager) dm.getMembershipManager(); + Services services = membershipManager.getServices(); + return (GMSJoinLeave) services.getJoinLeave(); + } + + public static Integer getViewId() { + return getGmsJoinLeave().getView().getViewId(); + } + + private static InternalDistributedSystem getInternalDistributedSystem() { + InternalDistributedSystem distributedSystem = InternalDistributedSystem.getAnyInstance(); + if (distributedSystem == null) { + Locator locator = Locator.getLocator(); + return (InternalDistributedSystem) locator.getDistributedSystem(); + } else { + return distributedSystem; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3f00d86d/gradle/rat.gradle ---------------------------------------------------------------------- diff --git a/gradle/rat.gradle b/gradle/rat.gradle index f782665..e1f446b 100644 --- a/gradle/rat.gradle +++ b/gradle/rat.gradle @@ -61,7 +61,7 @@ rat { '**/*.log', '**/*.patch', '**/*.diff', - '**/*.MF', + '**/*.MF', // binary files '**/*.cer',