Broadcast a shutdown message to all live endpoints when gossip is stopped cleanly, so that peers mark this node as down immediately. Patch by brandonwilliams; reviewed by vijay for CASSANDRA-3936
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/190e27bb Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/190e27bb Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/190e27bb Branch: refs/heads/trunk Commit: 190e27bb5cc910d020b3828c1fbe83f009534b89 Parents: 27c999a Author: Brandon Williams <brandonwilli...@apache.org> Authored: Thu Mar 29 13:03:41 2012 -0500 Committer: Brandon Williams <brandonwilli...@apache.org> Committed: Thu Mar 29 13:03:41 2012 -0500 ---------------------------------------------------------------------- .../org/apache/cassandra/gms/FailureDetector.java | 9 ++ .../cassandra/gms/GossipShutdownMessage.java | 63 +++++++++++++++ .../cassandra/gms/GossipShutdownVerbHandler.java | 42 ++++++++++ src/java/org/apache/cassandra/gms/Gossiper.java | 50 +++++++++++- .../org/apache/cassandra/gms/IFailureDetector.java | 5 + .../apache/cassandra/service/StorageService.java | 3 + .../org/apache/cassandra/dht/BootStrapperTest.java | 1 + 7 files changed, 171 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/190e27bb/src/java/org/apache/cassandra/gms/FailureDetector.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/FailureDetector.java b/src/java/org/apache/cassandra/gms/FailureDetector.java index c8ba43a..0a66cb6 100644 --- a/src/java/org/apache/cassandra/gms/FailureDetector.java +++ b/src/java/org/apache/cassandra/gms/FailureDetector.java @@ -195,6 +195,15 @@ public class FailureDetector implements IFailureDetector, FailureDetectorMBean } } + public void forceConviction(InetAddress ep) + { + logger.debug("Forcing conviction of {}", ep); + for ( IFailureDetectionEventListener listener : fdEvntListeners ) + { + listener.convict(ep, phiConvictThreshold); + } + } + public void 
remove(InetAddress ep) { arrivalSamples_.remove(ep); http://git-wip-us.apache.org/repos/asf/cassandra/blob/190e27bb/src/java/org/apache/cassandra/gms/GossipShutdownMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/GossipShutdownMessage.java b/src/java/org/apache/cassandra/gms/GossipShutdownMessage.java new file mode 100644 index 0000000..3122986 --- /dev/null +++ b/src/java/org/apache/cassandra/gms/GossipShutdownMessage.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.gms; + +import org.apache.cassandra.io.IVersionedSerializer; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +/** + * This message indicates the gossiper is shutting down + */ + +class GossipShutdownMessage +{ + private static final IVersionedSerializer<GossipShutdownMessage> serializer; + static + { + serializer = new GossipShutdownMessageSerializer(); + } + + static IVersionedSerializer<GossipShutdownMessage> serializer() + { + return serializer; + } + + GossipShutdownMessage() + { + } +} + +class GossipShutdownMessageSerializer implements IVersionedSerializer<GossipShutdownMessage> +{ + public void serialize(GossipShutdownMessage gShutdownMessage, DataOutput dos, int version) throws IOException + { + } + + public GossipShutdownMessage deserialize(DataInput dis, int version) throws IOException + { + return new GossipShutdownMessage(); + } + + public long serializedSize(GossipShutdownMessage gossipShutdownMessage, int version) + { + throw new UnsupportedOperationException(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cassandra/blob/190e27bb/src/java/org/apache/cassandra/gms/GossipShutdownVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/GossipShutdownVerbHandler.java b/src/java/org/apache/cassandra/gms/GossipShutdownVerbHandler.java new file mode 100644 index 0000000..352e229 --- /dev/null +++ b/src/java/org/apache/cassandra/gms/GossipShutdownVerbHandler.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.gms; + +import org.apache.cassandra.net.IVerbHandler; +import org.apache.cassandra.net.Message; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; + +public class GossipShutdownVerbHandler implements IVerbHandler +{ + private static final Logger logger = LoggerFactory.getLogger(GossipShutdownVerbHandler.class); + + public void doVerb(Message message, String id) + { + InetAddress from = message.getFrom(); + if (!Gossiper.instance.isEnabled()) + { + logger.debug("Ignoring shutdown message from {} because gossip is disabled", from); + return; + } + FailureDetector.instance.forceConviction(from); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cassandra/blob/190e27bb/src/java/org/apache/cassandra/gms/Gossiper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java index d6237a2..e8cf5a8 100644 --- a/src/java/org/apache/cassandra/gms/Gossiper.java +++ b/src/java/org/apache/cassandra/gms/Gossiper.java @@ -54,6 +54,8 @@ import org.apache.cassandra.service.StorageService; * sends node A a GossipDigestAckMessage. On receipt of this message node A sends node B a * GossipDigestAck2Message which completes a round of Gossip. This module as and when it hears one * of the three above mentioned messages updates the Failure Detector with the liveness information. 
+ * Upon hearing a GossipShutdownMessage, this module will instantly mark the remote node as down in + * the Failure Detector. */ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean @@ -294,7 +296,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean unreachableEndpoints.remove(endpoint); endpointStateMap.remove(endpoint); expireTimeEndpointMap.remove(endpoint); - justRemovedEndpoints.put(endpoint, System.currentTimeMillis()); + quarantineEndpoint(endpoint); if (logger.isDebugEnabled()) logger.debug("evicting " + endpoint + " from gossip"); } @@ -313,12 +315,21 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean // do not remove endpointState until the quarantine expires FailureDetector.instance.remove(endpoint); versions.remove(endpoint); - justRemovedEndpoints.put(endpoint, System.currentTimeMillis()); + quarantineEndpoint(endpoint); if (logger.isDebugEnabled()) logger.debug("removing endpoint " + endpoint); } /** + * Quarantines the endpoint for QUARANTINE_DELAY + * @param endpoint + */ + private void quarantineEndpoint(InetAddress endpoint) + { + justRemovedEndpoints.put(endpoint, System.currentTimeMillis()); + } + + /** * Remove the Endpoint and evict immediately, to avoid gossiping about this node. * This should only be called when a token is taken over by a new IP address. 
* @param endpoint The endpoint that has been replaced @@ -522,6 +533,14 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean GossipDigestAck2Message.serializer().serialize(gDigestAck2Message, dos, version); return new Message(FBUtilities.getBroadcastAddress(), StorageService.Verb.GOSSIP_DIGEST_ACK2, bos.toByteArray(), version); } + + Message makeGossipShutdownMessage(int version) throws IOException + { + FastByteArrayOutputStream bos = new FastByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(bos); + GossipShutdownMessage.serializer().serialize(new GossipShutdownMessage(), dos, version); + return new Message(FBUtilities.getBroadcastAddress(), StorageService.Verb.GOSSIP_SHUTDOWN, bos.toByteArray(), version); + } /** * Returns true if the chosen target was also a seed. False otherwise @@ -1111,6 +1130,33 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean public void stop() { scheduledGossipTask.cancel(false); + logger.info("Announcing shutdown"); + try + { + Thread.sleep(intervalInMillis); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + MessageProducer prod = new MessageProducer() + { + public Message getMessage(Integer version) throws IOException + { + return makeGossipShutdownMessage(version); + } + }; + for (InetAddress ep : liveEndpoints) + { + try + { + MessagingService.instance().sendOneWay(prod.getMessage(getVersion(ep)), ep); + } + catch (IOException ex) + { + // keep going + } + } } public boolean isEnabled() http://git-wip-us.apache.org/repos/asf/cassandra/blob/190e27bb/src/java/org/apache/cassandra/gms/IFailureDetector.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/IFailureDetector.java b/src/java/org/apache/cassandra/gms/IFailureDetector.java index d5bf9ec..734047b 100644 --- a/src/java/org/apache/cassandra/gms/IFailureDetector.java +++ 
b/src/java/org/apache/cassandra/gms/IFailureDetector.java @@ -68,6 +68,11 @@ public interface IFailureDetector public void remove(InetAddress ep); /** + * force conviction of endpoint in the failure detector + */ + public void forceConviction(InetAddress ep); + + /** * Register interest for Failure Detector events. * @param listener implementation of an application provided IFailureDetectionEventListener */ http://git-wip-us.apache.org/repos/asf/cassandra/blob/190e27bb/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index d3105a9..3e1034a 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -114,6 +114,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe STREAMING_REPAIR_RESPONSE, SNAPSHOT, // Similar to nt snapshot MIGRATION_REQUEST, + GOSSIP_SHUTDOWN, // use as padding for backwards compatability where a previous version needs to validate a verb from the future. 
UNUSED_1, UNUSED_2, @@ -142,6 +143,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe put(Verb.GOSSIP_DIGEST_ACK, Stage.GOSSIP); put(Verb.GOSSIP_DIGEST_ACK2, Stage.GOSSIP); put(Verb.GOSSIP_DIGEST_SYN, Stage.GOSSIP); + put(Verb.GOSSIP_SHUTDOWN, Stage.GOSSIP); put(Verb.DEFINITIONS_UPDATE, Stage.MIGRATION); put(Verb.SCHEMA_CHECK, Stage.MIGRATION); put(Verb.MIGRATION_REQUEST, Stage.MIGRATION); @@ -286,6 +288,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe MessagingService.instance().registerVerbHandlers(Verb.GOSSIP_DIGEST_SYN, new GossipDigestSynVerbHandler()); MessagingService.instance().registerVerbHandlers(Verb.GOSSIP_DIGEST_ACK, new GossipDigestAckVerbHandler()); MessagingService.instance().registerVerbHandlers(Verb.GOSSIP_DIGEST_ACK2, new GossipDigestAck2VerbHandler()); + MessagingService.instance().registerVerbHandlers(Verb.GOSSIP_SHUTDOWN, new GossipShutdownVerbHandler()); MessagingService.instance().registerVerbHandlers(Verb.DEFINITIONS_UPDATE, new DefinitionsUpdateVerbHandler()); MessagingService.instance().registerVerbHandlers(Verb.SCHEMA_CHECK, new SchemaCheckVerbHandler()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/190e27bb/test/unit/org/apache/cassandra/dht/BootStrapperTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/dht/BootStrapperTest.java b/test/unit/org/apache/cassandra/dht/BootStrapperTest.java index 4b19505..30c0254 100644 --- a/test/unit/org/apache/cassandra/dht/BootStrapperTest.java +++ b/test/unit/org/apache/cassandra/dht/BootStrapperTest.java @@ -178,6 +178,7 @@ public class BootStrapperTest extends SchemaLoader public void registerFailureDetectionEventListener(IFailureDetectionEventListener listener) { throw new UnsupportedOperationException(); } public void unregisterFailureDetectionEventListener(IFailureDetectionEventListener listener) { throw new UnsupportedOperationException(); } 
public void remove(InetAddress ep) { throw new UnsupportedOperationException(); } + public void forceConviction(InetAddress ep) { throw new UnsupportedOperationException(); } public void clear(InetAddress ep) { throw new UnsupportedOperationException(); } }; s.addSourceFilter(new RangeStreamer.FailureDetectorSourceFilter(mockFailureDetector));