move protocol versions into MessagingService
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6a858a30 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6a858a30 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6a858a30 Branch: refs/heads/trunk Commit: 6a858a30d61fb6a575521bce9afa9448b5c309c1 Parents: c82a9d9 Author: Jonathan Ellis <jbel...@apache.org> Authored: Wed Jun 6 13:15:53 2012 -0500 Committer: Jonathan Ellis <jbel...@apache.org> Committed: Thu Jun 14 18:03:47 2012 -0500 ---------------------------------------------------------------------- src/java/org/apache/cassandra/gms/Gossiper.java | 36 +-------------- .../org/apache/cassandra/gms/GossiperMBean.java | 2 - .../cassandra/net/IncomingTcpConnection.java | 6 +- .../org/apache/cassandra/net/MessagingService.java | 33 +++++++++++++ .../cassandra/net/MessagingServiceMBean.java | 3 + .../cassandra/net/OutboundTcpConnection.java | 6 +- .../cassandra/service/AntiEntropyService.java | 4 +- .../apache/cassandra/service/MigrationManager.java | 6 +- .../org/apache/cassandra/service/StorageProxy.java | 4 +- .../apache/cassandra/service/StorageService.java | 8 ++-- .../apache/cassandra/streaming/FileStreamTask.java | 2 +- .../cassandra/streaming/StreamInSession.java | 4 +- 12 files changed, 57 insertions(+), 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/6a858a30/src/java/org/apache/cassandra/gms/Gossiper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java index 06dafe7..57c3c49 100644 --- a/src/java/org/apache/cassandra/gms/Gossiper.java +++ b/src/java/org/apache/cassandra/gms/Gossiper.java @@ -101,9 +101,6 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean private final Map<InetAddress, Long> expireTimeEndpointMap = new ConcurrentHashMap<InetAddress, Long>(); - // protocol versions of the other nodes in the cluster - private final ConcurrentMap<InetAddress, Integer> versions = new NonBlockingHashMap<InetAddress, Integer>(); - private class GossipTask implements Runnable { public void run() @@ -200,32 +197,6 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean subscribers.remove(subscriber); } - public Integer setVersion(InetAddress address, int version) - { - logger.debug("Setting version {} for {}", version, address); - return versions.put(address, version); - } - - public void resetVersion(InetAddress endpoint) - { - logger.debug("Reseting version for {}", endpoint); - versions.remove(endpoint); - } - - public Integer getVersion(InetAddress address) - { - Integer v = versions.get(address); - if (v == null) - { - // we don't know the version. assume current. we'll know soon enough if that was incorrect. - logger.trace("Assuming current protocol version for {}", address); - return MessagingService.current_version; - } - else - return v; - } - - public Set<InetAddress> getLiveMembers() { Set<InetAddress> liveMbrs = new HashSet<InetAddress>(liveEndpoints); @@ -304,7 +275,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean unreachableEndpoints.remove(endpoint); // do not remove endpointState until the quarantine expires FailureDetector.instance.remove(endpoint); - versions.remove(endpoint); + MessagingService.instance().resetVersion(endpoint); quarantineEndpoint(endpoint); if (logger.isDebugEnabled()) logger.debug("removing endpoint " + endpoint); @@ -1115,11 +1086,6 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean } } - public int getVersion(String address) throws UnknownHostException - { - return getVersion(InetAddress.getByName(address)); - } - public long getEndpointDowntime(String address) throws UnknownHostException { return getEndpointDowntime(InetAddress.getByName(address)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/6a858a30/src/java/org/apache/cassandra/gms/GossiperMBean.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/GossiperMBean.java b/src/java/org/apache/cassandra/gms/GossiperMBean.java index 806e77d..521fd21 100644 --- a/src/java/org/apache/cassandra/gms/GossiperMBean.java +++ b/src/java/org/apache/cassandra/gms/GossiperMBean.java @@ -21,8 +21,6 @@ import java.net.UnknownHostException; public interface GossiperMBean { - public int getVersion(String address) throws UnknownHostException; - public long getEndpointDowntime(String address) throws UnknownHostException; public int getCurrentGenerationNumber(String address) throws UnknownHostException; http://git-wip-us.apache.org/repos/asf/cassandra/blob/6a858a30/src/java/org/apache/cassandra/net/IncomingTcpConnection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java index e808c7e..7966605 100644 --- a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java +++ b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java @@ -113,7 +113,7 @@ public class IncomingTcpConnection extends Thread logger.info("Received messages from newer protocol version {}. Ignoring", version); return; } - Gossiper.instance.setVersion(from, Math.min(MessagingService.current_version, maxVersion)); + MessagingService.instance().setVersion(from, Math.min(MessagingService.current_version, maxVersion)); logger.debug("set version for {} to {}", from, Math.min(MessagingService.current_version, maxVersion)); // outbound side will reconnect if necessary to upgrade version @@ -137,7 +137,7 @@ public class IncomingTcpConnection extends Thread logger.info("Received messages from newer protocol version. Ignoring"); return; } - int lastVersion = Gossiper.instance.setVersion(from, version); + int lastVersion = MessagingService.instance().setVersion(from, version); logger.debug("set version for {} to {}", from, version); if (lastVersion < version) { @@ -200,7 +200,7 @@ public class IncomingTcpConnection extends Thread { // reset version here, since we set when starting an incoming socket if (from != null) - Gossiper.instance.resetVersion(from); + MessagingService.instance().resetVersion(from); try { socket.close(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/6a858a30/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index ed9e5b3..a9cb1a6 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -287,6 +287,9 @@ public final class MessagingService implements MessagingServiceMBean private final List<ILatencySubscriber> subscribers = new ArrayList<ILatencySubscriber>(); private static final long DEFAULT_CALLBACK_TIMEOUT = DatabaseDescriptor.getRpcTimeout(); + // protocol versions of the other nodes in the cluster + private final ConcurrentMap<InetAddress, Integer> versions = new NonBlockingHashMap<InetAddress, Integer>(); + private static class MSHandle { public static final MessagingService instance = new MessagingService(); @@ -761,6 +764,36 @@ public final class MessagingService implements MessagingServiceMBean return buffer; } + public Integer setVersion(InetAddress address, int version) + { + logger.debug("Setting version {} for {}", version, address); + return versions.put(address, version); + } + + public void resetVersion(InetAddress endpoint) + { + logger.debug("Reseting version for {}", endpoint); + versions.remove(endpoint); + } + + public Integer getVersion(InetAddress address) + { + Integer v = versions.get(address); + if (v == null) + { + // we don't know the version. assume current. we'll know soon enough if that was incorrect. + logger.trace("Assuming current protocol version for {}", address); + return MessagingService.current_version; + } + else + return v; + } + + public int getVersion(String address) throws UnknownHostException + { + return getVersion(InetAddress.getByName(address)); + } + public void incrementDroppedMessages(Verb verb) { assert DROPPABLE_VERBS.contains(verb) : "Verb " + verb + " should not legally be dropped"; http://git-wip-us.apache.org/repos/asf/cassandra/blob/6a858a30/src/java/org/apache/cassandra/net/MessagingServiceMBean.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessagingServiceMBean.java b/src/java/org/apache/cassandra/net/MessagingServiceMBean.java index 17621af..ff39e23 100644 --- a/src/java/org/apache/cassandra/net/MessagingServiceMBean.java +++ b/src/java/org/apache/cassandra/net/MessagingServiceMBean.java @@ -19,6 +19,7 @@ package org.apache.cassandra.net; +import java.net.UnknownHostException; import java.util.Map; /** @@ -81,4 +82,6 @@ public interface MessagingServiceMBean * Number of timeouts since last check per host. */ public Map<String, Long> getRecentTimeoutsPerHost(); + + public int getVersion(String address) throws UnknownHostException; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/6a858a30/src/java/org/apache/cassandra/net/OutboundTcpConnection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java index a123072..a5d8181 100644 --- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java +++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java @@ -241,7 +241,7 @@ public class OutboundTcpConnection extends Thread if (logger.isDebugEnabled()) logger.debug("attempting to connect to " + poolReference.endPoint()); - targetVersion = Gossiper.instance.getVersion(poolReference.endPoint()); + targetVersion = MessagingService.instance().getVersion(poolReference.endPoint()); long start = System.currentTimeMillis(); while (System.currentTimeMillis() < start + DatabaseDescriptor.getRpcTimeout()) @@ -264,7 +264,7 @@ public class OutboundTcpConnection extends Thread if (targetVersion > maxTargetVersion) { logger.debug("Target max version is {}; will reconnect with that version", maxTargetVersion); - Gossiper.instance.setVersion(poolReference.endPoint(), maxTargetVersion); + MessagingService.instance().setVersion(poolReference.endPoint(), maxTargetVersion); disconnect(); return false; } @@ -273,7 +273,7 @@ public class OutboundTcpConnection extends Thread { logger.debug("Detected higher max version {} (using {}); will reconnect when queued messages are done", maxTargetVersion, targetVersion); - Gossiper.instance.setVersion(poolReference.endPoint(), Math.min(MessagingService.current_version, maxTargetVersion)); + MessagingService.instance().setVersion(poolReference.endPoint(), Math.min(MessagingService.current_version, maxTargetVersion)); softCloseSocket(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/6a858a30/src/java/org/apache/cassandra/service/AntiEntropyService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/AntiEntropyService.java b/src/java/org/apache/cassandra/service/AntiEntropyService.java index 08d4dd5..b26574e 100644 --- a/src/java/org/apache/cassandra/service/AntiEntropyService.java +++ b/src/java/org/apache/cassandra/service/AntiEntropyService.java @@ -636,7 +636,7 @@ public class AntiEntropyService return; } - if (Gossiper.instance.getVersion(endpoint) < MessagingService.VERSION_11 && isSequential) + if (MessagingService.instance().getVersion(endpoint) < MessagingService.VERSION_11 && isSequential) { logger.info(String.format("[repair #%s] Cannot repair using snapshots as node %s is pre-1.1", getName(), endpoint)); return; @@ -979,7 +979,7 @@ public class AntiEntropyService StreamingRepairTask task = StreamingRepairTask.create(r1.endpoint, r2.endpoint, tablename, cfname, differences, callback); // Pre 1.0, nodes don't know how to handle forwarded streaming task so don't bother - if (task.isLocalTask() || Gossiper.instance.getVersion(task.dst) >= MessagingService.VERSION_10) + if (task.isLocalTask() || MessagingService.instance().getVersion(task.dst) >= MessagingService.VERSION_10) task.run(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/6a858a30/src/java/org/apache/cassandra/service/MigrationManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java index cc1c136..ec2679c 100644 --- a/src/java/org/apache/cassandra/service/MigrationManager.java +++ b/src/java/org/apache/cassandra/service/MigrationManager.java @@ -85,7 +85,7 @@ public class MigrationManager implements IEndpointStateChangeSubscriber private static void rectifySchema(UUID theirVersion, final InetAddress endpoint) { // Can't request migrations from nodes with versions younger than 1.1 - if (Gossiper.instance.getVersion(endpoint) < MessagingService.VERSION_11) + if (MessagingService.instance().getVersion(endpoint) < MessagingService.VERSION_11) return; if (Schema.instance.getVersion().equals(theirVersion)) @@ -204,7 +204,7 @@ public class MigrationManager implements IEndpointStateChangeSubscriber continue; // we've delt with localhost already // don't send migrations to the nodes with the versions older than < 1.1 - if (Gossiper.instance.getVersion(endpoint) < MessagingService.VERSION_11) + if (MessagingService.instance().getVersion(endpoint) < MessagingService.VERSION_11) continue; pushSchemaMutation(endpoint, schema); @@ -261,7 +261,7 @@ public class MigrationManager implements IEndpointStateChangeSubscriber // because migration format of the nodes with versions < 1.1 is incompatible with older versions for (InetAddress node : liveEndpoints) { - if (Gossiper.instance.getVersion(node) >= MessagingService.VERSION_11) + if (MessagingService.instance().getVersion(node) >= MessagingService.VERSION_11) { if (logger.isDebugEnabled()) logger.debug("Requesting schema from " + node); http://git-wip-us.apache.org/repos/asf/cassandra/blob/6a858a30/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index c76c3c4..64aea28 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -362,7 +362,7 @@ public class StorageProxy implements StorageProxyMBean try { UUID hostId = StorageService.instance.getTokenMetadata().getHostId(target); - if ((hostId == null) && (Gossiper.instance.getVersion(target) < MessagingService.VERSION_12)) + if ((hostId == null) && (MessagingService.instance().getVersion(target) < MessagingService.VERSION_12)) { logger.warn("Unable to store hint for host with missing ID, {} (old node?)", target.toString()); return; @@ -409,7 +409,7 @@ public class StorageProxy implements StorageProxyMBean InetAddress target = iter.next(); // direct writes to local DC or old Cassadra versions - if (dataCenter.equals(localDataCenter) || Gossiper.instance.getVersion(target) < MessagingService.VERSION_11) + if (dataCenter.equals(localDataCenter) || MessagingService.instance().getVersion(target) < MessagingService.VERSION_11) { // yes, the loop and non-loop code here are the same; this is clunky but we want to avoid // creating a second iterator since we already have a perfectly good one http://git-wip-us.apache.org/repos/asf/cassandra/blob/6a858a30/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index af34413..0455b1d 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -1018,7 +1018,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe // versions < 1.2 .....: STATUS,TOKEN // versions >= 1.2 .....: STATUS,HOST_ID,TOKEN,TOKEN,... int tokenPos; - if (Gossiper.instance.getVersion(endpoint) >= MessagingService.VERSION_12) + if (MessagingService.instance().getVersion(endpoint) >= MessagingService.VERSION_12) { assert pieces.length >= 3; tokenPos = 2; @@ -1048,7 +1048,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe tokenMetadata.addBootstrapToken(token, endpoint); calculatePendingRanges(); - if (Gossiper.instance.getVersion(endpoint) >= MessagingService.VERSION_12) + if (MessagingService.instance().getVersion(endpoint) >= MessagingService.VERSION_12) tokenMetadata.updateHostId(UUID.fromString(pieces[1]), endpoint); } @@ -1067,7 +1067,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe // versions < 1.2 .....: STATUS,TOKEN // versions >= 1.2 .....: STATUS,HOST_ID,TOKEN,TOKEN,... int tokensPos; - if (Gossiper.instance.getVersion(endpoint) >= MessagingService.VERSION_12) + if (MessagingService.instance().getVersion(endpoint) >= MessagingService.VERSION_12) { assert pieces.length >= 3; tokensPos = 2; @@ -1084,7 +1084,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe logger.info("Node " + endpoint + " state jump to normal"); // Order Matters, TM.updateHostID() should be called before TM.updateNormalToken(), (see CASSANDRA-4300). - if (Gossiper.instance.getVersion(endpoint) >= MessagingService.VERSION_12) + if (MessagingService.instance().getVersion(endpoint) >= MessagingService.VERSION_12) tokenMetadata.updateHostId(UUID.fromString(pieces[1]), endpoint); // we don't want to update if this node is responsible for the token and it has a later startup time than endpoint. http://git-wip-us.apache.org/repos/asf/cassandra/blob/6a858a30/src/java/org/apache/cassandra/streaming/FileStreamTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/FileStreamTask.java b/src/java/org/apache/cassandra/streaming/FileStreamTask.java index b55593f..96a0064 100644 --- a/src/java/org/apache/cassandra/streaming/FileStreamTask.java +++ b/src/java/org/apache/cassandra/streaming/FileStreamTask.java @@ -129,7 +129,7 @@ public class FileStreamTask extends WrappedRunnable */ protected void stream() throws IOException { - ByteBuffer headerBuffer = MessagingService.instance().constructStreamHeader(header, false, Gossiper.instance.getVersion(to)); + ByteBuffer headerBuffer = MessagingService.instance().constructStreamHeader(header, false, MessagingService.instance().getVersion(to)); // write header (this should not be compressed for compatibility with other messages) output.write(ByteBufferUtil.getArray(headerBuffer)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/6a858a30/src/java/org/apache/cassandra/streaming/StreamInSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamInSession.java b/src/java/org/apache/cassandra/streaming/StreamInSession.java index 958924d..e11838c 100644 --- a/src/java/org/apache/cassandra/streaming/StreamInSession.java +++ b/src/java/org/apache/cassandra/streaming/StreamInSession.java @@ -171,7 +171,7 @@ public class StreamInSession extends AbstractStreamSession OutboundTcpConnection.write(message, String.valueOf(getSessionId()), out, - Gossiper.instance.getVersion(getHost()), false); + MessagingService.instance().getVersion(getHost())); out.flush(); } @@ -222,7 +222,7 @@ public class StreamInSession extends AbstractStreamSession OutboundTcpConnection.write(reply.createMessage(), context.right.toString(), new DataOutputStream(socket.getOutputStream()), - Gossiper.instance.getVersion(getHost()), false); + MessagingService.instance().getVersion(getHost())); else logger.debug("No socket to reply to {} with!", getHost()); }