move protocol versions into MessagingService

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6a858a30
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6a858a30
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6a858a30

Branch: refs/heads/trunk
Commit: 6a858a30d61fb6a575521bce9afa9448b5c309c1
Parents: c82a9d9
Author: Jonathan Ellis <jbel...@apache.org>
Authored: Wed Jun 6 13:15:53 2012 -0500
Committer: Jonathan Ellis <jbel...@apache.org>
Committed: Thu Jun 14 18:03:47 2012 -0500

----------------------------------------------------------------------
 src/java/org/apache/cassandra/gms/Gossiper.java    |   36 +--------------
 .../org/apache/cassandra/gms/GossiperMBean.java    |    2 -
 .../cassandra/net/IncomingTcpConnection.java       |    6 +-
 .../org/apache/cassandra/net/MessagingService.java |   33 +++++++++++++
 .../cassandra/net/MessagingServiceMBean.java       |    3 +
 .../cassandra/net/OutboundTcpConnection.java       |    6 +-
 .../cassandra/service/AntiEntropyService.java      |    4 +-
 .../apache/cassandra/service/MigrationManager.java |    6 +-
 .../org/apache/cassandra/service/StorageProxy.java |    4 +-
 .../apache/cassandra/service/StorageService.java   |    8 ++--
 .../apache/cassandra/streaming/FileStreamTask.java |    2 +-
 .../cassandra/streaming/StreamInSession.java       |    4 +-
 12 files changed, 57 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/6a858a30/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java 
b/src/java/org/apache/cassandra/gms/Gossiper.java
index 06dafe7..57c3c49 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -101,9 +101,6 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
 
     private final Map<InetAddress, Long> expireTimeEndpointMap = new 
ConcurrentHashMap<InetAddress, Long>();
 
-    // protocol versions of the other nodes in the cluster
-    private final ConcurrentMap<InetAddress, Integer> versions = new 
NonBlockingHashMap<InetAddress, Integer>();
-
     private class GossipTask implements Runnable
     {
         public void run()
@@ -200,32 +197,6 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
         subscribers.remove(subscriber);
     }
 
-    public Integer setVersion(InetAddress address, int version)
-    {
-        logger.debug("Setting version {} for {}", version, address);
-        return versions.put(address, version);
-    }
-
-    public void resetVersion(InetAddress endpoint)
-    {
-        logger.debug("Reseting version for {}", endpoint);
-        versions.remove(endpoint);
-    }
-
-    public Integer getVersion(InetAddress address)
-    {
-        Integer v = versions.get(address);
-        if (v == null)
-        {
-            // we don't know the version. assume current. we'll know soon 
enough if that was incorrect.
-            logger.trace("Assuming current protocol version for {}", address);
-            return MessagingService.current_version;
-        }
-        else
-            return v;
-    }
-
-
     public Set<InetAddress> getLiveMembers()
     {
         Set<InetAddress> liveMbrs = new HashSet<InetAddress>(liveEndpoints);
@@ -304,7 +275,7 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
         unreachableEndpoints.remove(endpoint);
         // do not remove endpointState until the quarantine expires
         FailureDetector.instance.remove(endpoint);
-        versions.remove(endpoint);
+        MessagingService.instance().resetVersion(endpoint);
         quarantineEndpoint(endpoint);
         if (logger.isDebugEnabled())
             logger.debug("removing endpoint " + endpoint);
@@ -1115,11 +1086,6 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
         }
     }
 
-    public int getVersion(String address) throws UnknownHostException
-    {
-        return getVersion(InetAddress.getByName(address));
-    }
-
     public long getEndpointDowntime(String address) throws UnknownHostException
     {
         return getEndpointDowntime(InetAddress.getByName(address));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6a858a30/src/java/org/apache/cassandra/gms/GossiperMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/GossiperMBean.java 
b/src/java/org/apache/cassandra/gms/GossiperMBean.java
index 806e77d..521fd21 100644
--- a/src/java/org/apache/cassandra/gms/GossiperMBean.java
+++ b/src/java/org/apache/cassandra/gms/GossiperMBean.java
@@ -21,8 +21,6 @@ import java.net.UnknownHostException;
 
 public interface GossiperMBean
 {
-    public int getVersion(String address) throws UnknownHostException;
-
     public long getEndpointDowntime(String address) throws 
UnknownHostException;
 
     public int getCurrentGenerationNumber(String address) throws 
UnknownHostException;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6a858a30/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java 
b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
index e808c7e..7966605 100644
--- a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
@@ -113,7 +113,7 @@ public class IncomingTcpConnection extends Thread
             logger.info("Received messages from newer protocol version {}. 
Ignoring", version);
             return;
         }
-        Gossiper.instance.setVersion(from, 
Math.min(MessagingService.current_version, maxVersion));
+        MessagingService.instance().setVersion(from, 
Math.min(MessagingService.current_version, maxVersion));
         logger.debug("set version for {} to {}", from, 
Math.min(MessagingService.current_version, maxVersion));
         // outbound side will reconnect if necessary to upgrade version
 
@@ -137,7 +137,7 @@ public class IncomingTcpConnection extends Thread
             logger.info("Received messages from newer protocol version. 
Ignoring");
             return;
         }
-        int lastVersion = Gossiper.instance.setVersion(from, version);
+        int lastVersion = MessagingService.instance().setVersion(from, 
version);
         logger.debug("set version for {} to {}", from, version);
         if (lastVersion < version)
         {
@@ -200,7 +200,7 @@ public class IncomingTcpConnection extends Thread
     {
         // reset version here, since we set when starting an incoming socket
         if (from != null)
-            Gossiper.instance.resetVersion(from);
+            MessagingService.instance().resetVersion(from);
         try
         {
             socket.close();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6a858a30/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java 
b/src/java/org/apache/cassandra/net/MessagingService.java
index ed9e5b3..a9cb1a6 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -287,6 +287,9 @@ public final class MessagingService implements 
MessagingServiceMBean
     private final List<ILatencySubscriber> subscribers = new 
ArrayList<ILatencySubscriber>();
     private static final long DEFAULT_CALLBACK_TIMEOUT = 
DatabaseDescriptor.getRpcTimeout();
 
+    // protocol versions of the other nodes in the cluster
+    private final ConcurrentMap<InetAddress, Integer> versions = new 
NonBlockingHashMap<InetAddress, Integer>();
+
     private static class MSHandle
     {
         public static final MessagingService instance = new MessagingService();
@@ -761,6 +764,36 @@ public final class MessagingService implements 
MessagingServiceMBean
         return buffer;
     }
 
+    public Integer setVersion(InetAddress address, int version)
+    {
+        logger.debug("Setting version {} for {}", version, address);
+        return versions.put(address, version);
+    }
+
+    public void resetVersion(InetAddress endpoint)
+    {
+        logger.debug("Reseting version for {}", endpoint);
+        versions.remove(endpoint);
+    }
+
+    public Integer getVersion(InetAddress address)
+    {
+        Integer v = versions.get(address);
+        if (v == null)
+        {
+            // we don't know the version. assume current. we'll know soon 
enough if that was incorrect.
+            logger.trace("Assuming current protocol version for {}", address);
+            return MessagingService.current_version;
+        }
+        else
+            return v;
+    }
+
+    public int getVersion(String address) throws UnknownHostException
+    {
+        return getVersion(InetAddress.getByName(address));
+    }
+
     public void incrementDroppedMessages(Verb verb)
     {
         assert DROPPABLE_VERBS.contains(verb) : "Verb " + verb + " should not 
legally be dropped";

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6a858a30/src/java/org/apache/cassandra/net/MessagingServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingServiceMBean.java 
b/src/java/org/apache/cassandra/net/MessagingServiceMBean.java
index 17621af..ff39e23 100644
--- a/src/java/org/apache/cassandra/net/MessagingServiceMBean.java
+++ b/src/java/org/apache/cassandra/net/MessagingServiceMBean.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.net;
 
 
 
+import java.net.UnknownHostException;
 import java.util.Map;
 
 /**
@@ -81,4 +82,6 @@ public interface MessagingServiceMBean
      * Number of timeouts since last check per host.
      */
     public Map<String, Long> getRecentTimeoutsPerHost();
+
+    public int getVersion(String address) throws UnknownHostException;
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6a858a30/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java 
b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index a123072..a5d8181 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@ -241,7 +241,7 @@ public class OutboundTcpConnection extends Thread
         if (logger.isDebugEnabled())
             logger.debug("attempting to connect to " + 
poolReference.endPoint());
 
-        targetVersion = Gossiper.instance.getVersion(poolReference.endPoint());
+        targetVersion = 
MessagingService.instance().getVersion(poolReference.endPoint());
 
         long start = System.currentTimeMillis();
         while (System.currentTimeMillis() < start + 
DatabaseDescriptor.getRpcTimeout())
@@ -264,7 +264,7 @@ public class OutboundTcpConnection extends Thread
                     if (targetVersion > maxTargetVersion)
                     {
                         logger.debug("Target max version is {}; will reconnect 
with that version", maxTargetVersion);
-                        Gossiper.instance.setVersion(poolReference.endPoint(), 
maxTargetVersion);
+                        
MessagingService.instance().setVersion(poolReference.endPoint(), 
maxTargetVersion);
                         disconnect();
                         return false;
                     }
@@ -273,7 +273,7 @@ public class OutboundTcpConnection extends Thread
                     {
                         logger.debug("Detected higher max version {} (using 
{}); will reconnect when queued messages are done",
                                      maxTargetVersion, targetVersion);
-                        Gossiper.instance.setVersion(poolReference.endPoint(), 
Math.min(MessagingService.current_version, maxTargetVersion));
+                        
MessagingService.instance().setVersion(poolReference.endPoint(), 
Math.min(MessagingService.current_version, maxTargetVersion));
                         softCloseSocket();
                     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6a858a30/src/java/org/apache/cassandra/service/AntiEntropyService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AntiEntropyService.java 
b/src/java/org/apache/cassandra/service/AntiEntropyService.java
index 08d4dd5..b26574e 100644
--- a/src/java/org/apache/cassandra/service/AntiEntropyService.java
+++ b/src/java/org/apache/cassandra/service/AntiEntropyService.java
@@ -636,7 +636,7 @@ public class AntiEntropyService
                     return;
                 }
 
-                if (Gossiper.instance.getVersion(endpoint) < 
MessagingService.VERSION_11 && isSequential)
+                if (MessagingService.instance().getVersion(endpoint) < 
MessagingService.VERSION_11 && isSequential)
                 {
                     logger.info(String.format("[repair #%s] Cannot repair 
using snapshots as node %s is pre-1.1", getName(), endpoint));
                     return;
@@ -979,7 +979,7 @@ public class AntiEntropyService
                 StreamingRepairTask task = 
StreamingRepairTask.create(r1.endpoint, r2.endpoint, tablename, cfname, 
differences, callback);
 
                 // Pre 1.0, nodes don't know how to handle forwarded streaming 
task so don't bother
-                if (task.isLocalTask() || 
Gossiper.instance.getVersion(task.dst) >= MessagingService.VERSION_10)
+                if (task.isLocalTask() || 
MessagingService.instance().getVersion(task.dst) >= MessagingService.VERSION_10)
                     task.run();
             }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6a858a30/src/java/org/apache/cassandra/service/MigrationManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java 
b/src/java/org/apache/cassandra/service/MigrationManager.java
index cc1c136..ec2679c 100644
--- a/src/java/org/apache/cassandra/service/MigrationManager.java
+++ b/src/java/org/apache/cassandra/service/MigrationManager.java
@@ -85,7 +85,7 @@ public class MigrationManager implements 
IEndpointStateChangeSubscriber
     private static void rectifySchema(UUID theirVersion, final InetAddress 
endpoint)
     {
         // Can't request migrations from nodes with versions younger than 1.1
-        if (Gossiper.instance.getVersion(endpoint) < 
MessagingService.VERSION_11)
+        if (MessagingService.instance().getVersion(endpoint) < 
MessagingService.VERSION_11)
             return;
 
         if (Schema.instance.getVersion().equals(theirVersion))
@@ -204,7 +204,7 @@ public class MigrationManager implements 
IEndpointStateChangeSubscriber
                 continue; // we've delt with localhost already
 
             // don't send migrations to the nodes with the versions older than 
< 1.1
-            if (Gossiper.instance.getVersion(endpoint) < 
MessagingService.VERSION_11)
+            if (MessagingService.instance().getVersion(endpoint) < 
MessagingService.VERSION_11)
                 continue;
 
             pushSchemaMutation(endpoint, schema);
@@ -261,7 +261,7 @@ public class MigrationManager implements 
IEndpointStateChangeSubscriber
             // because migration format of the nodes with versions < 1.1 is 
incompatible with older versions
             for (InetAddress node : liveEndpoints)
             {
-                if (Gossiper.instance.getVersion(node) >= 
MessagingService.VERSION_11)
+                if (MessagingService.instance().getVersion(node) >= 
MessagingService.VERSION_11)
                 {
                     if (logger.isDebugEnabled())
                         logger.debug("Requesting schema from " + node);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6a858a30/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java 
b/src/java/org/apache/cassandra/service/StorageProxy.java
index c76c3c4..64aea28 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -362,7 +362,7 @@ public class StorageProxy implements StorageProxyMBean
                 try
                 {
                     UUID hostId = 
StorageService.instance.getTokenMetadata().getHostId(target);
-                    if ((hostId == null) && 
(Gossiper.instance.getVersion(target) < MessagingService.VERSION_12))
+                    if ((hostId == null) && 
(MessagingService.instance().getVersion(target) < MessagingService.VERSION_12))
                     {
                         logger.warn("Unable to store hint for host with 
missing ID, {} (old node?)", target.toString());
                         return;
@@ -409,7 +409,7 @@ public class StorageProxy implements StorageProxyMBean
                 InetAddress target = iter.next();
 
                 // direct writes to local DC or old Cassadra versions
-                if (dataCenter.equals(localDataCenter) || 
Gossiper.instance.getVersion(target) < MessagingService.VERSION_11)
+                if (dataCenter.equals(localDataCenter) || 
MessagingService.instance().getVersion(target) < MessagingService.VERSION_11)
                 {
                     // yes, the loop and non-loop code here are the same; this 
is clunky but we want to avoid
                     // creating a second iterator since we already have a 
perfectly good one

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6a858a30/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java 
b/src/java/org/apache/cassandra/service/StorageService.java
index af34413..0455b1d 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -1018,7 +1018,7 @@ public class StorageService implements 
IEndpointStateChangeSubscriber, StorageSe
         //   versions  < 1.2 .....: STATUS,TOKEN
         //   versions >= 1.2 .....: STATUS,HOST_ID,TOKEN,TOKEN,...
         int tokenPos;
-        if (Gossiper.instance.getVersion(endpoint) >= 
MessagingService.VERSION_12)
+        if (MessagingService.instance().getVersion(endpoint) >= 
MessagingService.VERSION_12)
         {
             assert pieces.length >= 3;
             tokenPos = 2;
@@ -1048,7 +1048,7 @@ public class StorageService implements 
IEndpointStateChangeSubscriber, StorageSe
         tokenMetadata.addBootstrapToken(token, endpoint);
         calculatePendingRanges();
 
-        if (Gossiper.instance.getVersion(endpoint) >= 
MessagingService.VERSION_12)
+        if (MessagingService.instance().getVersion(endpoint) >= 
MessagingService.VERSION_12)
             tokenMetadata.updateHostId(UUID.fromString(pieces[1]), endpoint);
     }
 
@@ -1067,7 +1067,7 @@ public class StorageService implements 
IEndpointStateChangeSubscriber, StorageSe
         //   versions  < 1.2 .....: STATUS,TOKEN
         //   versions >= 1.2 .....: STATUS,HOST_ID,TOKEN,TOKEN,...
         int tokensPos;
-        if (Gossiper.instance.getVersion(endpoint) >= 
MessagingService.VERSION_12)
+        if (MessagingService.instance().getVersion(endpoint) >= 
MessagingService.VERSION_12)
         {
             assert pieces.length >= 3;
             tokensPos = 2;
@@ -1084,7 +1084,7 @@ public class StorageService implements 
IEndpointStateChangeSubscriber, StorageSe
             logger.info("Node " + endpoint + " state jump to normal");
 
         // Order Matters, TM.updateHostID() should be called before 
TM.updateNormalToken(), (see CASSANDRA-4300).
-        if (Gossiper.instance.getVersion(endpoint) >= 
MessagingService.VERSION_12)
+        if (MessagingService.instance().getVersion(endpoint) >= 
MessagingService.VERSION_12)
             tokenMetadata.updateHostId(UUID.fromString(pieces[1]), endpoint);
 
         // we don't want to update if this node is responsible for the token 
and it has a later startup time than endpoint.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6a858a30/src/java/org/apache/cassandra/streaming/FileStreamTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/FileStreamTask.java 
b/src/java/org/apache/cassandra/streaming/FileStreamTask.java
index b55593f..96a0064 100644
--- a/src/java/org/apache/cassandra/streaming/FileStreamTask.java
+++ b/src/java/org/apache/cassandra/streaming/FileStreamTask.java
@@ -129,7 +129,7 @@ public class FileStreamTask extends WrappedRunnable
      */
     protected void stream() throws IOException
     {
-        ByteBuffer headerBuffer = 
MessagingService.instance().constructStreamHeader(header, false, 
Gossiper.instance.getVersion(to));
+        ByteBuffer headerBuffer = 
MessagingService.instance().constructStreamHeader(header, false, 
MessagingService.instance().getVersion(to));
         // write header (this should not be compressed for compatibility with 
other messages)
         output.write(ByteBufferUtil.getArray(headerBuffer));
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6a858a30/src/java/org/apache/cassandra/streaming/StreamInSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamInSession.java 
b/src/java/org/apache/cassandra/streaming/StreamInSession.java
index 958924d..e11838c 100644
--- a/src/java/org/apache/cassandra/streaming/StreamInSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamInSession.java
@@ -171,7 +171,7 @@ public class StreamInSession extends AbstractStreamSession
         OutboundTcpConnection.write(message,
                                     String.valueOf(getSessionId()),
                                     out,
-                                    Gossiper.instance.getVersion(getHost()), 
false);
+                                    
MessagingService.instance().getVersion(getHost()));
         out.flush();
     }
 
@@ -222,7 +222,7 @@ public class StreamInSession extends AbstractStreamSession
                     OutboundTcpConnection.write(reply.createMessage(),
                                                 context.right.toString(),
                                                 new 
DataOutputStream(socket.getOutputStream()),
-                                                
Gossiper.instance.getVersion(getHost()), false);
+                                                
MessagingService.instance().getVersion(getHost()));
                 else
                     logger.debug("No socket to reply to {} with!", getHost());
             }

Reply via email to