Repository: cassandra Updated Branches: refs/heads/cassandra-2.2 06316df54 -> bf0906b92 refs/heads/cassandra-3.0 f4ba9083e -> 2836a644a refs/heads/cassandra-3.11 5484bd1ac -> ec9ce3dfb refs/heads/trunk f5e0a7cdb -> 8b74ae4b6
Discard in-flight shadow round responses patch by Stefan Podkowinski; reviewed by Joel Knighton and Jason Brown for CASSANDRA-12653 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bf0906b9 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bf0906b9 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bf0906b9 Branch: refs/heads/cassandra-2.2 Commit: bf0906b92cf65161d828e31bc46436d427bbb4b8 Parents: 06316df Author: Stefan Podkowinski <s.podkowin...@gmail.com> Authored: Mon Sep 19 13:56:54 2016 +0200 Committer: Joel Knighton <j...@apache.org> Committed: Wed Mar 22 13:08:28 2017 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../gms/GossipDigestAckVerbHandler.java | 26 +++++--- src/java/org/apache/cassandra/gms/Gossiper.java | 62 +++++++++++++++----- .../apache/cassandra/service/MigrationTask.java | 12 ++-- .../cassandra/service/StorageService.java | 16 +++-- 5 files changed, 79 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf0906b9/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 27dd343..df2421d 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.2.10 + * Discard in-flight shadow round responses (CASSANDRA-12653) * Don't anti-compact repaired data to avoid inconsistencies (CASSANDRA-13153) * Wrong logger name in AnticompactionTask (CASSANDRA-13343) * Fix queries updating multiple time the same list (CASSANDRA-13130) http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf0906b9/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java b/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java index 9f69a94..59060f8 100644 --- a/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java +++ b/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java @@ -51,21 +51,31 @@ public class GossipDigestAckVerbHandler implements IVerbHandler<GossipDigestAck> Map<InetAddress, EndpointState> epStateMap = gDigestAckMessage.getEndpointStateMap(); logger.trace("Received ack with {} digests and {} states", gDigestList.size(), epStateMap.size()); - if (epStateMap.size() > 0) - { - /* Notify the Failure Detector */ - Gossiper.instance.notifyFailureDetector(epStateMap); - Gossiper.instance.applyStateLocally(epStateMap); - } - if (Gossiper.instance.isInShadowRound()) { if (logger.isDebugEnabled()) logger.debug("Finishing shadow round with {}", from); - Gossiper.instance.finishShadowRound(); + Gossiper.instance.finishShadowRound(epStateMap); return; // don't bother doing anything else, we have what we came for } + if (epStateMap.size() > 0) + { + // Ignore any GossipDigestAck messages that we handle before a regular GossipDigestSyn has been send. + // This will prevent Acks from leaking over from the shadow round that are not actual part of + // the regular gossip conversation. + if ((System.nanoTime() - Gossiper.instance.firstSynSendAt) < 0 || Gossiper.instance.firstSynSendAt == 0) + { + if (logger.isTraceEnabled()) + logger.trace("Ignoring unrequested GossipDigestAck from {}", from); + return; + } + + /* Notify the Failure Detector */ + Gossiper.instance.notifyFailureDetector(epStateMap); + Gossiper.instance.applyStateLocally(epStateMap); + } + /* Get the state required to send to this gossipee - construct GossipDigestAck2Message */ Map<InetAddress, EndpointState> deltaEpStateMap = new HashMap<InetAddress, EndpointState>(); for (GossipDigest gDigest : gDigestList) http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf0906b9/src/java/org/apache/cassandra/gms/Gossiper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java index 06b14c4..c2eccba 100644 --- a/src/java/org/apache/cassandra/gms/Gossiper.java +++ b/src/java/org/apache/cassandra/gms/Gossiper.java @@ -30,6 +30,7 @@ import javax.management.ObjectName; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.Uninterruptibles; import org.apache.cassandra.utils.Pair; @@ -86,6 +87,9 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean private static final Logger logger = LoggerFactory.getLogger(Gossiper.class); public static final Gossiper instance = new Gossiper(); + // Timestamp to prevent processing any in-flight messages for we've not send any SYN yet, see CASSANDRA-12653. + volatile long firstSynSendAt = 0L; + public static final long aVeryLongTime = 259200 * 1000; // 3 days // Maximimum difference between generation value and local time we are willing to accept about a peer @@ -125,6 +129,9 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean private volatile boolean inShadowRound = false; + // endpoint states as gathered during shadow round + private final Map<InetAddress, EndpointState> endpointShadowStateMap = new ConcurrentHashMap<>(); + private volatile long lastProcessedMessageAt = System.currentTimeMillis(); private class GossipTask implements Runnable @@ -645,6 +652,8 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean InetAddress to = liveEndpoints.get(index); if (logger.isTraceEnabled()) logger.trace("Sending a GossipDigestSyn to {} ...", to); + if (firstSynSendAt == 0) + firstSynSendAt = System.nanoTime(); MessagingService.instance().sendOneWay(message, to); return seeds.contains(to); } @@ -713,11 +722,12 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean * Check if this endpoint can safely bootstrap into the cluster. * * @param endpoint - the endpoint to check + * @param epStates - endpoint states in the cluster * @return true if the endpoint can join the cluster */ - public boolean isSafeForBootstrap(InetAddress endpoint) + public boolean isSafeForBootstrap(InetAddress endpoint, Map<InetAddress, EndpointState> epStates) { - EndpointState epState = endpointStateMap.get(endpoint); + EndpointState epState = epStates.get(endpoint); // if there's no previous state, or the node was previously removed from the cluster, we're good if (epState == null || isDeadState(epState)) @@ -816,14 +826,6 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean return endpointStateMap.get(ep); } - // removes ALL endpoint states; should only be called after shadow gossip - public void resetEndpointStateMap() - { - endpointStateMap.clear(); - unreachableEndpoints.clear(); - liveEndpoints.clear(); - } - public Set<Entry<InetAddress, EndpointState>> getEndpointStates() { return endpointStateMap.entrySet(); @@ -831,7 +833,12 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean public UUID getHostId(InetAddress endpoint) { - return UUID.fromString(getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.HOST_ID).value); + return getHostId(endpoint, endpointStateMap); + } + + public UUID getHostId(InetAddress endpoint, Map<InetAddress, EndpointState> epStates) + { + return UUID.fromString(epStates.get(endpoint).getApplicationState(ApplicationState.HOST_ID).value); } EndpointState getStateForVersionBiggerThan(InetAddress forEndpoint, int version) @@ -1305,12 +1312,32 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean } /** - * Do a single 'shadow' round of gossip, where we do not modify any state - * Only used when replacing a node, to get and assume its states + * Do a single 'shadow' round of gossip by retrieving endpoint states that will be stored exclusively in the + * map return value, instead of endpointStateMap. + * + * Used when preparing to join the ring: + * <ul> + * <li>when replacing a node, to get and assume its tokens</li> + * <li>when joining, to check that the local host id matches any previous id for the endpoint address</li> + * </ul> + * + * Method is synchronized, as we use an in-progress flag to indicate that shadow round must be cleared + * again by calling {@link Gossiper#finishShadowRound(Map)}. This will update + * {@link Gossiper#endpointShadowStateMap} with received values, in order to return an immutable copy to the + * caller of {@link Gossiper#doShadowRound()}. Therefor only a single shadow round execution is permitted at + * the same time. + * + * @return endpoint states gathered during shadow round or empty map */ - public void doShadowRound() + public synchronized Map<InetAddress, EndpointState> doShadowRound() { buildSeedsList(); + // it may be that the local address is the only entry in the seed + // list in which case, attempting a shadow round is pointless + if (seeds.isEmpty()) + return endpointShadowStateMap; + + endpointShadowStateMap.clear(); // send a completely empty syn List<GossipDigest> gDigests = new ArrayList<GossipDigest>(); GossipDigestSyn digestSynMessage = new GossipDigestSyn(DatabaseDescriptor.getClusterName(), @@ -1346,6 +1373,8 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean { throw new RuntimeException(wtf); } + + return ImmutableMap.copyOf(endpointShadowStateMap); } private void buildSeedsList() @@ -1466,10 +1495,13 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean return (scheduledGossipTask != null) && (!scheduledGossipTask.isCancelled()); } - protected void finishShadowRound() + protected void finishShadowRound(Map<InetAddress, EndpointState> epStateMap) { if (inShadowRound) + { + endpointShadowStateMap.putAll(epStateMap); inShadowRound = false; + } } protected boolean isInShadowRound() http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf0906b9/src/java/org/apache/cassandra/service/MigrationTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/MigrationTask.java b/src/java/org/apache/cassandra/service/MigrationTask.java index df0b767..b065d90 100644 --- a/src/java/org/apache/cassandra/service/MigrationTask.java +++ b/src/java/org/apache/cassandra/service/MigrationTask.java @@ -48,6 +48,12 @@ class MigrationTask extends WrappedRunnable public void runMayThrow() throws Exception { + if (!FailureDetector.instance.isAlive(endpoint)) + { + logger.warn("Can't send schema pull request: node {} is down.", endpoint); + return; + } + // There is a chance that quite some time could have passed between now and the MM#maybeScheduleSchemaPull(), // potentially enough for the endpoint node to restart - which is an issue if it does restart upgraded, with // a higher major. @@ -57,12 +63,6 @@ class MigrationTask extends WrappedRunnable return; } - if (!FailureDetector.instance.isAlive(endpoint)) - { - logger.debug("Can't send schema pull request: node {} is down.", endpoint); - return; - } - MessageOut message = new MessageOut<>(MessagingService.Verb.MIGRATION_REQUEST, null, MigrationManager.MigrationsSerializer.instance); IAsyncCallback<Collection<Mutation>> cb = new IAsyncCallback<Collection<Mutation>>() http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf0906b9/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index c2996d7..65f536b 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -443,15 +443,15 @@ public class StorageService extends NotificationBroadcasterSupport implements IE MessagingService.instance().listen(); // make magic happen - Gossiper.instance.doShadowRound(); + Map<InetAddress, EndpointState> epStates = Gossiper.instance.doShadowRound(); // now that we've gossiped at least once, we should be able to find the node we're replacing - if (Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress())== null) + if (epStates.get(DatabaseDescriptor.getReplaceAddress())== null) throw new RuntimeException("Cannot replace_address " + DatabaseDescriptor.getReplaceAddress() + " because it doesn't exist in gossip"); - replacingId = Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress()); + replacingId = Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress(), epStates); try { - VersionedValue tokensVersionedValue = Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress()).getApplicationState(ApplicationState.TOKENS); + VersionedValue tokensVersionedValue = epStates.get(DatabaseDescriptor.getReplaceAddress()).getApplicationState(ApplicationState.TOKENS); if (tokensVersionedValue == null) throw new RuntimeException("Could not find tokens for " + DatabaseDescriptor.getReplaceAddress() + " to replace"); Collection<Token> tokens = TokenSerializer.deserialize(getPartitioner(), new DataInputStream(new ByteArrayInputStream(tokensVersionedValue.toBytes()))); @@ -460,7 +460,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE { SystemKeyspace.setLocalHostId(replacingId); // use the replacee's host Id as our own so we receive hints, etc } - Gossiper.instance.resetEndpointStateMap(); // clean up since we have what we need return tokens; } catch (IOException e) @@ -474,8 +473,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE logger.debug("Starting shadow gossip round to check for endpoint collision"); if (!MessagingService.instance().isListening()) MessagingService.instance().listen(); - Gossiper.instance.doShadowRound(); - if (!Gossiper.instance.isSafeForBootstrap(FBUtilities.getBroadcastAddress())) + Map<InetAddress, EndpointState> epStates = Gossiper.instance.doShadowRound(); + if (!Gossiper.instance.isSafeForBootstrap(FBUtilities.getBroadcastAddress(), epStates)) { throw new RuntimeException(String.format("A node with address %s already exists, cancelling join. " + "Use cassandra.replace_address if you want to replace this node.", @@ -483,7 +482,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE } if (useStrictConsistency && !allowSimultaneousMoves()) { - for (Map.Entry<InetAddress, EndpointState> entry : Gossiper.instance.getEndpointStates()) + for (Map.Entry<InetAddress, EndpointState> entry : epStates.entrySet()) { // ignore local node or empty status if (entry.getKey().equals(FBUtilities.getBroadcastAddress()) || entry.getValue().getApplicationState(ApplicationState.STATUS) == null) @@ -495,7 +494,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE throw new UnsupportedOperationException("Other bootstrapping/leaving/moving nodes detected, cannot bootstrap while cassandra.consistent.rangemovement is true"); } } - Gossiper.instance.resetEndpointStateMap(); } private boolean allowSimultaneousMoves()