http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java 
b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index 0276238..c600789 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.service;
 
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
-import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.*;
 import java.util.concurrent.*;
@@ -61,6 +60,7 @@ import 
org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
 import org.apache.cassandra.gms.IFailureDetectionEventListener;
 import org.apache.cassandra.gms.VersionedValue;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.net.IAsyncCallbackWithFailure;
 import org.apache.cassandra.net.MessageIn;
@@ -208,7 +208,7 @@ public class ActiveRepairService implements 
IEndpointStateChangeSubscriber, IFai
                                              Collection<Range<Token>> range,
                                              String keyspace,
                                              RepairParallelism 
parallelismDegree,
-                                             Set<InetAddress> endpoints,
+                                             Set<InetAddressAndPort> endpoints,
                                              boolean isIncremental,
                                              boolean pullRepair,
                                              boolean force,
@@ -297,12 +297,12 @@ public class ActiveRepairService implements 
IEndpointStateChangeSubscriber, IFai
      *
      * @return neighbors with whom we share the provided range
      */
-    public static Set<InetAddress> getNeighbors(String keyspaceName, 
Collection<Range<Token>> keyspaceLocalRanges,
-                                                Range<Token> toRepair, 
Collection<String> dataCenters,
-                                                Collection<String> hosts)
+    public static Set<InetAddressAndPort> getNeighbors(String keyspaceName, 
Collection<Range<Token>> keyspaceLocalRanges,
+                                                       Range<Token> toRepair, 
Collection<String> dataCenters,
+                                                       Collection<String> 
hosts)
     {
         StorageService ss = StorageService.instance;
-        Map<Range<Token>, List<InetAddress>> replicaSets = 
ss.getRangeToAddressMap(keyspaceName);
+        Map<Range<Token>, List<InetAddressAndPort>> replicaSets = 
ss.getRangeToAddressMap(keyspaceName);
         Range<Token> rangeSuperSet = null;
         for (Range<Token> range : keyspaceLocalRanges)
         {
@@ -322,17 +322,17 @@ public class ActiveRepairService implements 
IEndpointStateChangeSubscriber, IFai
         if (rangeSuperSet == null || !replicaSets.containsKey(rangeSuperSet))
             return Collections.emptySet();
 
-        Set<InetAddress> neighbors = new 
HashSet<>(replicaSets.get(rangeSuperSet));
-        neighbors.remove(FBUtilities.getBroadcastAddress());
+        Set<InetAddressAndPort> neighbors = new 
HashSet<>(replicaSets.get(rangeSuperSet));
+        neighbors.remove(FBUtilities.getBroadcastAddressAndPort());
 
         if (dataCenters != null && !dataCenters.isEmpty())
         {
             TokenMetadata.Topology topology = 
ss.getTokenMetadata().cloneOnlyTokenMap().getTopology();
-            Set<InetAddress> dcEndpoints = Sets.newHashSet();
-            Multimap<String,InetAddress> dcEndpointsMap = 
topology.getDatacenterEndpoints();
+            Set<InetAddressAndPort> dcEndpoints = Sets.newHashSet();
+            Multimap<String,InetAddressAndPort> dcEndpointsMap = 
topology.getDatacenterEndpoints();
             for (String dc : dataCenters)
             {
-                Collection<InetAddress> c = dcEndpointsMap.get(dc);
+                Collection<InetAddressAndPort> c = dcEndpointsMap.get(dc);
                 if (c != null)
                    dcEndpoints.addAll(c);
             }
@@ -340,13 +340,13 @@ public class ActiveRepairService implements 
IEndpointStateChangeSubscriber, IFai
         }
         else if (hosts != null && !hosts.isEmpty())
         {
-            Set<InetAddress> specifiedHost = new HashSet<>();
+            Set<InetAddressAndPort> specifiedHost = new HashSet<>();
             for (final String host : hosts)
             {
                 try
                 {
-                    final InetAddress endpoint = 
InetAddress.getByName(host.trim());
-                    if (endpoint.equals(FBUtilities.getBroadcastAddress()) || 
neighbors.contains(endpoint))
+                    final InetAddressAndPort endpoint = 
InetAddressAndPort.getByName(host.trim());
+                    if 
(endpoint.equals(FBUtilities.getBroadcastAddressAndPort()) || 
neighbors.contains(endpoint))
                         specifiedHost.add(endpoint);
                 }
                 catch (UnknownHostException e)
@@ -355,7 +355,7 @@ public class ActiveRepairService implements 
IEndpointStateChangeSubscriber, IFai
                 }
             }
 
-            if (!specifiedHost.contains(FBUtilities.getBroadcastAddress()))
+            if 
(!specifiedHost.contains(FBUtilities.getBroadcastAddressAndPort()))
                 throw new IllegalArgumentException("The current host must be 
part of the repair");
 
             if (specifiedHost.size() <= 1)
@@ -366,7 +366,7 @@ public class ActiveRepairService implements 
IEndpointStateChangeSubscriber, IFai
                 throw new IllegalArgumentException(String.format(msg, hosts, 
toRepair, neighbors));
             }
 
-            specifiedHost.remove(FBUtilities.getBroadcastAddress());
+            specifiedHost.remove(FBUtilities.getBroadcastAddressAndPort());
             return specifiedHost;
 
         }
@@ -393,7 +393,7 @@ public class ActiveRepairService implements 
IEndpointStateChangeSubscriber, IFai
         }
     }
 
-    public UUID prepareForRepair(UUID parentRepairSession, InetAddress 
coordinator, Set<InetAddress> endpoints, RepairOption options, 
List<ColumnFamilyStore> columnFamilyStores)
+    public UUID prepareForRepair(UUID parentRepairSession, InetAddressAndPort 
coordinator, Set<InetAddressAndPort> endpoints, RepairOption options, 
List<ColumnFamilyStore> columnFamilyStores)
     {
         long repairedAt = getRepairedAt(options);
         registerParentRepairSession(parentRepairSession, coordinator, 
columnFamilyStores, options.getRanges(), options.isIncremental(), repairedAt, 
options.isGlobal(), options.getPreviewKind());
@@ -412,10 +412,10 @@ public class ActiveRepairService implements 
IEndpointStateChangeSubscriber, IFai
                 return false;
             }
 
-            public void onFailure(InetAddress from, RequestFailureReason 
failureReason)
+            public void onFailure(InetAddressAndPort from, 
RequestFailureReason failureReason)
             {
                 status.set(false);
-                failedNodes.add(from.getHostAddress());
+                failedNodes.add(from.toString());
                 prepareLatch.countDown();
             }
         };
@@ -424,7 +424,7 @@ public class ActiveRepairService implements 
IEndpointStateChangeSubscriber, IFai
         for (ColumnFamilyStore cfs : columnFamilyStores)
             tableIds.add(cfs.metadata.id);
 
-        for (InetAddress neighbour : endpoints)
+        for (InetAddressAndPort neighbour : endpoints)
         {
             if (FailureDetector.instance.isAlive(neighbour))
             {
@@ -471,7 +471,7 @@ public class ActiveRepairService implements 
IEndpointStateChangeSubscriber, IFai
         throw new RuntimeException(errorMsg);
     }
 
-    public synchronized void registerParentRepairSession(UUID 
parentRepairSession, InetAddress coordinator, List<ColumnFamilyStore> 
columnFamilyStores, Collection<Range<Token>> ranges, boolean isIncremental, 
long repairedAt, boolean isGlobal, PreviewKind previewKind)
+    public synchronized void registerParentRepairSession(UUID 
parentRepairSession, InetAddressAndPort coordinator, List<ColumnFamilyStore> 
columnFamilyStores, Collection<Range<Token>> ranges, boolean isIncremental, 
long repairedAt, boolean isGlobal, PreviewKind previewKind)
     {
         assert isIncremental || repairedAt == 
ActiveRepairService.UNREPAIRED_SSTABLE;
         if (!registeredForEndpointChanges)
@@ -517,7 +517,7 @@ public class ActiveRepairService implements 
IEndpointStateChangeSubscriber, IFai
         return parentRepairSessions.remove(parentSessionId);
     }
 
-    public void handleMessage(InetAddress endpoint, RepairMessage message)
+    public void handleMessage(InetAddressAndPort endpoint, RepairMessage 
message)
     {
         RepairJobDesc desc = message.desc;
         RepairSession session = sessions.get(desc.sessionId);
@@ -551,10 +551,10 @@ public class ActiveRepairService implements 
IEndpointStateChangeSubscriber, IFai
         public final boolean isIncremental;
         public final boolean isGlobal;
         public final long repairedAt;
-        public final InetAddress coordinator;
+        public final InetAddressAndPort coordinator;
         public final PreviewKind previewKind;
 
-        public ParentRepairSession(InetAddress coordinator, 
List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, 
boolean isIncremental, long repairedAt, boolean isGlobal, PreviewKind 
previewKind)
+        public ParentRepairSession(InetAddressAndPort coordinator, 
List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, 
boolean isIncremental, long repairedAt, boolean isGlobal, PreviewKind 
previewKind)
         {
             this.coordinator = coordinator;
             for (ColumnFamilyStore cfs : columnFamilyStores)
@@ -636,18 +636,18 @@ public class ActiveRepairService implements 
IEndpointStateChangeSubscriber, IFai
     If the coordinator node dies we should remove the parent repair session 
from the other nodes.
     This uses the same notifications as we get in RepairSession
      */
-    public void onJoin(InetAddress endpoint, EndpointState epState) {}
-    public void beforeChange(InetAddress endpoint, EndpointState currentState, 
ApplicationState newStateKey, VersionedValue newValue) {}
-    public void onChange(InetAddress endpoint, ApplicationState state, 
VersionedValue value) {}
-    public void onAlive(InetAddress endpoint, EndpointState state) {}
-    public void onDead(InetAddress endpoint, EndpointState state) {}
+    public void onJoin(InetAddressAndPort endpoint, EndpointState epState) {}
+    public void beforeChange(InetAddressAndPort endpoint, EndpointState 
currentState, ApplicationState newStateKey, VersionedValue newValue) {}
+    public void onChange(InetAddressAndPort endpoint, ApplicationState state, 
VersionedValue value) {}
+    public void onAlive(InetAddressAndPort endpoint, EndpointState state) {}
+    public void onDead(InetAddressAndPort endpoint, EndpointState state) {}
 
-    public void onRemove(InetAddress endpoint)
+    public void onRemove(InetAddressAndPort endpoint)
     {
         convict(endpoint, Double.MAX_VALUE);
     }
 
-    public void onRestart(InetAddress endpoint, EndpointState state)
+    public void onRestart(InetAddressAndPort endpoint, EndpointState state)
     {
         convict(endpoint, Double.MAX_VALUE);
     }
@@ -661,7 +661,7 @@ public class ActiveRepairService implements 
IEndpointStateChangeSubscriber, IFai
      * @param ep  endpoint to be convicted
      * @param phi the value of phi with with ep was convicted
      */
-    public void convict(InetAddress ep, double phi)
+    public void convict(InetAddressAndPort ep, double phi)
     {
         // We want a higher confidence in the failure detection than usual 
because failing a repair wrongly has a high cost.
         if (phi < 2 * DatabaseDescriptor.getPhiConvictThreshold() || 
parentRepairSessions.isEmpty())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java 
b/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java
index 4e26101..e373fb6 100644
--- a/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java
@@ -18,12 +18,12 @@
 
 package org.apache.cassandra.service;
 
-import java.net.InetAddress;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
 import org.apache.cassandra.exceptions.RequestFailureReason;
 import org.apache.cassandra.exceptions.WriteFailureException;
 import org.apache.cassandra.exceptions.WriteTimeoutException;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.MessageIn;
 
 public class BatchlogResponseHandler<T> extends AbstractWriteResponseHandler<T>
@@ -59,7 +59,7 @@ public class BatchlogResponseHandler<T> extends 
AbstractWriteResponseHandler<T>
         return wrapped.isLatencyForSnitch();
     }
 
-    public void onFailure(InetAddress from, RequestFailureReason failureReason)
+    public void onFailure(InetAddressAndPort from, RequestFailureReason 
failureReason)
     {
         wrapped.onFailure(from, failureReason);
     }
@@ -84,7 +84,7 @@ public class BatchlogResponseHandler<T> extends 
AbstractWriteResponseHandler<T>
         return wrapped.totalEndpoints();
     }
 
-    protected boolean waitingFor(InetAddress from)
+    protected boolean waitingFor(InetAddressAndPort from)
     {
         return wrapped.waitingFor(from);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java 
b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index 51219e6..130f3fd 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -46,6 +46,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.concurrent.ScheduledExecutors;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.schema.Schema;
@@ -231,6 +232,8 @@ public class CassandraDaemon
             }
         });
 
+        SystemKeyspaceMigrator40.migrate();
+
         // Populate token metadata before flushing, for token-aware sstable 
partitioning (#6696)
         StorageService.instance.populateTokenMetadata();
 
@@ -377,7 +380,7 @@ public class CassandraDaemon
 
         ScheduledExecutors.optionalTasks.schedule(viewRebuild, 
StorageService.RING_DELAY, TimeUnit.MILLISECONDS);
 
-        if 
(!FBUtilities.getBroadcastAddress().equals(InetAddress.getLoopbackAddress()))
+        if 
(!FBUtilities.getBroadcastAddressAndPort().equals(InetAddressAndPort.getLoopbackAddress()))
             Gossiper.waitToSettle();
 
         // re-enable auto-compaction after gossip is settled, so correct disk 
boundaries are used
@@ -445,7 +448,7 @@ public class CassandraDaemon
        {
                try
                {
-                   logger.info("Hostname: {}", 
InetAddress.getLocalHost().getHostName());
+                   logger.info("Hostname: {}", 
InetAddress.getLocalHost().getHostName() + ":" + 
DatabaseDescriptor.getStoragePort() + ":" + 
DatabaseDescriptor.getSSLStoragePort());
                }
                catch (UnknownHostException e1)
                {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/service/ClientState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ClientState.java 
b/src/java/org/apache/cassandra/service/ClientState.java
index e41cc4f..97b6172 100644
--- a/src/java/org/apache/cassandra/service/ClientState.java
+++ b/src/java/org/apache/cassandra/service/ClientState.java
@@ -60,7 +60,7 @@ public class ClientState
     {
         // We want these system cfs to be always readable to authenticated 
users since many tools rely on them
         // (nodetool, cqlsh, bulkloader, etc.)
-        for (String cf : Arrays.asList(SystemKeyspace.LOCAL, 
SystemKeyspace.PEERS))
+        for (String cf : Arrays.asList(SystemKeyspace.LOCAL, 
SystemKeyspace.LEGACY_PEERS, SystemKeyspace.PEERS_V2))
             
READABLE_SYSTEM_RESOURCES.add(DataResource.table(SchemaConstants.SYSTEM_KEYSPACE_NAME,
 cf));
 
         SchemaKeyspace.ALL.forEach(table -> 
READABLE_SYSTEM_RESOURCES.add(DataResource.table(SchemaConstants.SCHEMA_KEYSPACE_NAME,
 table)));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DataResolver.java 
b/src/java/org/apache/cassandra/service/DataResolver.java
index 54f4b0c..82db754 100644
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.service;
 
-import java.net.InetAddress;
 import java.util.*;
 import java.util.concurrent.TimeoutException;
 
@@ -40,6 +39,7 @@ import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.ExcludingBounds;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.exceptions.ReadTimeoutException;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.*;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.FBUtilities;
@@ -87,7 +87,7 @@ public class DataResolver extends ResponseResolver
         // at the beginning of this method), so grab the response count once 
and use that through the method.
         int count = responses.size();
         List<UnfilteredPartitionIterator> iters = new ArrayList<>(count);
-        InetAddress[] sources = new InetAddress[count];
+        InetAddressAndPort[] sources = new InetAddressAndPort[count];
         for (int i = 0; i < count; i++)
         {
             MessageIn<ReadResponse> msg = responses.get(i);
@@ -120,7 +120,7 @@ public class DataResolver extends ResponseResolver
     }
 
     private UnfilteredPartitionIterator 
mergeWithShortReadProtection(List<UnfilteredPartitionIterator> results,
-                                                                     
InetAddress[] sources,
+                                                                     
InetAddressAndPort[] sources,
                                                                      
DataLimits.Counter mergedResultCounter)
     {
         // If we have only one results, there is no read repair to do and we 
can't get short reads
@@ -140,9 +140,9 @@ public class DataResolver extends ResponseResolver
 
     private class RepairMergeListener implements 
UnfilteredPartitionIterators.MergeListener
     {
-        private final InetAddress[] sources;
+        private final InetAddressAndPort[] sources;
 
-        private RepairMergeListener(InetAddress[] sources)
+        private RepairMergeListener(InetAddressAndPort[] sources)
         {
             this.sources = sources;
         }
@@ -471,7 +471,7 @@ public class DataResolver extends ResponseResolver
                         sendRepairMutation(repairs[i].build(), sources[i]);
             }
 
-            private void sendRepairMutation(PartitionUpdate partition, 
InetAddress destination)
+            private void sendRepairMutation(PartitionUpdate partition, 
InetAddressAndPort destination)
             {
                 Mutation mutation = new Mutation(partition);
                 int messagingVersion = 
MessagingService.instance().getVersion(destination);
@@ -514,7 +514,7 @@ public class DataResolver extends ResponseResolver
     }
 
     private UnfilteredPartitionIterator 
extendWithShortReadProtection(UnfilteredPartitionIterator partitions,
-                                                                      
InetAddress source,
+                                                                      
InetAddressAndPort source,
                                                                       
DataLimits.Counter mergedResultCounter)
     {
         DataLimits.Counter singleResultCounter =
@@ -557,7 +557,7 @@ public class DataResolver extends ResponseResolver
      */
     private class ShortReadPartitionsProtection extends 
Transformation<UnfilteredRowIterator> implements 
MorePartitions<UnfilteredPartitionIterator>
     {
-        private final InetAddress source;
+        private final InetAddressAndPort source;
 
         private final DataLimits.Counter singleResultCounter; // unmerged 
per-source counter
         private final DataLimits.Counter mergedResultCounter; // merged 
end-result counter
@@ -568,7 +568,7 @@ public class DataResolver extends ResponseResolver
 
         private final long queryStartNanoTime;
 
-        private ShortReadPartitionsProtection(InetAddress source,
+        private ShortReadPartitionsProtection(InetAddressAndPort source,
                                               DataLimits.Counter 
singleResultCounter,
                                               DataLimits.Counter 
mergedResultCounter,
                                               long queryStartNanoTime)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java 
b/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
index 4137e3a..dbd3667 100644
--- 
a/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
+++ 
b/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.service;
 
-import java.net.InetAddress;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
@@ -26,6 +25,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.locator.IEndpointSnitch;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.NetworkTopologyStrategy;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.db.ConsistencyLevel;
@@ -41,8 +41,8 @@ public class DatacenterSyncWriteResponseHandler<T> extends 
AbstractWriteResponse
     private final Map<String, AtomicInteger> responses = new HashMap<String, 
AtomicInteger>();
     private final AtomicInteger acks = new AtomicInteger(0);
 
-    public DatacenterSyncWriteResponseHandler(Collection<InetAddress> 
naturalEndpoints,
-                                              Collection<InetAddress> 
pendingEndpoints,
+    public DatacenterSyncWriteResponseHandler(Collection<InetAddressAndPort> 
naturalEndpoints,
+                                              Collection<InetAddressAndPort> 
pendingEndpoints,
                                               ConsistencyLevel 
consistencyLevel,
                                               Keyspace keyspace,
                                               Runnable callback,
@@ -63,7 +63,7 @@ public class DatacenterSyncWriteResponseHandler<T> extends 
AbstractWriteResponse
 
         // During bootstrap, we have to include the pending endpoints or we 
may fail the consistency level
         // guarantees (see #833)
-        for (InetAddress pending : pendingEndpoints)
+        for (InetAddressAndPort pending : pendingEndpoints)
         {
             responses.get(snitch.getDatacenter(pending)).incrementAndGet();
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java 
b/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
index 83dddcf..a8d7b28 100644
--- a/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
@@ -17,10 +17,10 @@
  */
 package org.apache.cassandra.service;
 
-import java.net.InetAddress;
 import java.util.Collection;
 
 import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.WriteType;
@@ -30,8 +30,8 @@ import org.apache.cassandra.db.WriteType;
  */
 public class DatacenterWriteResponseHandler<T> extends WriteResponseHandler<T>
 {
-    public DatacenterWriteResponseHandler(Collection<InetAddress> 
naturalEndpoints,
-                                          Collection<InetAddress> 
pendingEndpoints,
+    public DatacenterWriteResponseHandler(Collection<InetAddressAndPort> 
naturalEndpoints,
+                                          Collection<InetAddressAndPort> 
pendingEndpoints,
                                           ConsistencyLevel consistencyLevel,
                                           Keyspace keyspace,
                                           Runnable callback,
@@ -66,7 +66,7 @@ public class DatacenterWriteResponseHandler<T> extends 
WriteResponseHandler<T>
     }
 
     @Override
-    protected boolean waitingFor(InetAddress from)
+    protected boolean waitingFor(InetAddressAndPort from)
     {
         return consistencyLevel.isLocal(from);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/service/IEndpointLifecycleSubscriber.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/service/IEndpointLifecycleSubscriber.java 
b/src/java/org/apache/cassandra/service/IEndpointLifecycleSubscriber.java
index 24cb3d7..bc49d3b 100644
--- a/src/java/org/apache/cassandra/service/IEndpointLifecycleSubscriber.java
+++ b/src/java/org/apache/cassandra/service/IEndpointLifecycleSubscriber.java
@@ -17,7 +17,7 @@
  */
 package org.apache.cassandra.service;
 
-import java.net.InetAddress;
+import org.apache.cassandra.locator.InetAddressAndPort;
 
 /**
  * Interface on which interested parties can be notified of high level endpoint
@@ -35,33 +35,33 @@ public interface IEndpointLifecycleSubscriber
      *
      * @param endpoint the newly added endpoint.
      */
-    public void onJoinCluster(InetAddress endpoint);
+    public void onJoinCluster(InetAddressAndPort endpoint);
 
     /**
      * Called when a new node leave the cluster (decommission or removeToken).
      *
      * @param endpoint the endpoint that is leaving.
      */
-    public void onLeaveCluster(InetAddress endpoint);
+    public void onLeaveCluster(InetAddressAndPort endpoint);
 
     /**
      * Called when a node is marked UP.
      *
      * @param endpoint the endpoint marked UP.
      */
-    public void onUp(InetAddress endpoint);
+    public void onUp(InetAddressAndPort endpoint);
 
     /**
      * Called when a node is marked DOWN.
      *
      * @param endpoint the endpoint marked DOWN.
      */
-    public void onDown(InetAddress endpoint);
+    public void onDown(InetAddressAndPort endpoint);
 
     /**
      * Called when a node has moved (to a new token).
      *
      * @param endpoint the endpoint that has moved.
      */
-    public void onMove(InetAddress endpoint);
+    public void onMove(InetAddressAndPort endpoint);
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/service/LoadBroadcaster.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/LoadBroadcaster.java 
b/src/java/org/apache/cassandra/service/LoadBroadcaster.java
index 945dd2f..35c0b62 100644
--- a/src/java/org/apache/cassandra/service/LoadBroadcaster.java
+++ b/src/java/org/apache/cassandra/service/LoadBroadcaster.java
@@ -17,12 +17,12 @@
  */
 package org.apache.cassandra.service;
 
-import java.net.InetAddress;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.metrics.StorageMetrics;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,21 +38,21 @@ public class LoadBroadcaster implements 
IEndpointStateChangeSubscriber
 
     private static final Logger logger = 
LoggerFactory.getLogger(LoadBroadcaster.class);
 
-    private ConcurrentMap<InetAddress, Double> loadInfo = new 
ConcurrentHashMap<InetAddress, java.lang.Double>();
+    private ConcurrentMap<InetAddressAndPort, Double> loadInfo = new 
ConcurrentHashMap<>();
 
     private LoadBroadcaster()
     {
         Gossiper.instance.register(this);
     }
 
-    public void onChange(InetAddress endpoint, ApplicationState state, 
VersionedValue value)
+    public void onChange(InetAddressAndPort endpoint, ApplicationState state, 
VersionedValue value)
     {
         if (state != ApplicationState.LOAD)
             return;
         loadInfo.put(endpoint, Double.valueOf(value.value));
     }
 
-    public void onJoin(InetAddress endpoint, EndpointState epState)
+    public void onJoin(InetAddressAndPort endpoint, EndpointState epState)
     {
         VersionedValue localValue = 
epState.getApplicationState(ApplicationState.LOAD);
         if (localValue != null)
@@ -61,20 +61,20 @@ public class LoadBroadcaster implements 
IEndpointStateChangeSubscriber
         }
     }
     
-    public void beforeChange(InetAddress endpoint, EndpointState currentState, 
ApplicationState newStateKey, VersionedValue newValue) {}
+    public void beforeChange(InetAddressAndPort endpoint, EndpointState 
currentState, ApplicationState newStateKey, VersionedValue newValue) {}
 
-    public void onAlive(InetAddress endpoint, EndpointState state) {}
+    public void onAlive(InetAddressAndPort endpoint, EndpointState state) {}
 
-    public void onDead(InetAddress endpoint, EndpointState state) {}
+    public void onDead(InetAddressAndPort endpoint, EndpointState state) {}
 
-    public void onRestart(InetAddress endpoint, EndpointState state) {}
+    public void onRestart(InetAddressAndPort endpoint, EndpointState state) {}
 
-    public void onRemove(InetAddress endpoint)
+    public void onRemove(InetAddressAndPort endpoint)
     {
         loadInfo.remove(endpoint);
     }
 
-    public Map<InetAddress, Double> getLoadInfo()
+    public Map<InetAddressAndPort, Double> getLoadInfo()
     {
         return Collections.unmodifiableMap(loadInfo);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/service/ReadCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java 
b/src/java/org/apache/cassandra/service/ReadCallback.java
index 7ee6386..e7f30b4 100644
--- a/src/java/org/apache/cassandra/service/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/ReadCallback.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.service;
 
-import java.net.InetAddress;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -38,6 +37,7 @@ import org.apache.cassandra.exceptions.RequestFailureReason;
 import org.apache.cassandra.exceptions.ReadFailureException;
 import org.apache.cassandra.exceptions.ReadTimeoutException;
 import org.apache.cassandra.exceptions.UnavailableException;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.metrics.ReadRepairMetrics;
 import org.apache.cassandra.net.IAsyncCallbackWithFailure;
 import org.apache.cassandra.net.MessageIn;
@@ -56,7 +56,7 @@ public class ReadCallback implements 
IAsyncCallbackWithFailure<ReadResponse>
     final SimpleCondition condition = new SimpleCondition();
     private final long queryStartNanoTime;
     final int blockfor;
-    final List<InetAddress> endpoints;
+    final List<InetAddressAndPort> endpoints;
     private final ReadCommand command;
     private final ConsistencyLevel consistencyLevel;
     private static final AtomicIntegerFieldUpdater<ReadCallback> 
recievedUpdater
@@ -65,14 +65,14 @@ public class ReadCallback implements 
IAsyncCallbackWithFailure<ReadResponse>
     private static final AtomicIntegerFieldUpdater<ReadCallback> 
failuresUpdater
             = AtomicIntegerFieldUpdater.newUpdater(ReadCallback.class, 
"failures");
     private volatile int failures = 0;
-    private final Map<InetAddress, RequestFailureReason> 
failureReasonByEndpoint;
+    private final Map<InetAddressAndPort, RequestFailureReason> 
failureReasonByEndpoint;
 
     private final Keyspace keyspace; // TODO push this into ConsistencyLevel?
 
     /**
      * Constructor when response count has to be calculated and blocked for.
      */
-    public ReadCallback(ResponseResolver resolver, ConsistencyLevel 
consistencyLevel, ReadCommand command, List<InetAddress> filteredEndpoints, 
long queryStartNanoTime)
+    public ReadCallback(ResponseResolver resolver, ConsistencyLevel 
consistencyLevel, ReadCommand command, List<InetAddressAndPort> 
filteredEndpoints, long queryStartNanoTime)
     {
         this(resolver,
              consistencyLevel,
@@ -83,7 +83,7 @@ public class ReadCallback implements 
IAsyncCallbackWithFailure<ReadResponse>
              queryStartNanoTime);
     }
 
-    public ReadCallback(ResponseResolver resolver, ConsistencyLevel 
consistencyLevel, int blockfor, ReadCommand command, Keyspace keyspace, 
List<InetAddress> endpoints, long queryStartNanoTime)
+    public ReadCallback(ResponseResolver resolver, ConsistencyLevel 
consistencyLevel, int blockfor, ReadCommand command, Keyspace keyspace, 
List<InetAddressAndPort> endpoints, long queryStartNanoTime)
     {
         this.command = command;
         this.keyspace = keyspace;
@@ -176,7 +176,7 @@ public class ReadCallback implements 
IAsyncCallbackWithFailure<ReadResponse>
     /**
      * @return true if the message counts towards the blockfor threshold
      */
-    private boolean waitingFor(InetAddress from)
+    private boolean waitingFor(InetAddressAndPort from)
     {
         return consistencyLevel.isDatacenterLocal()
              ? 
DatabaseDescriptor.getLocalDataCenter().equals(DatabaseDescriptor.getEndpointSnitch().getDatacenter(from))
@@ -193,9 +193,9 @@ public class ReadCallback implements 
IAsyncCallbackWithFailure<ReadResponse>
 
     public void response(ReadResponse result)
     {
-        MessageIn<ReadResponse> message = 
MessageIn.create(FBUtilities.getBroadcastAddress(),
+        MessageIn<ReadResponse> message = 
MessageIn.create(FBUtilities.getBroadcastAddressAndPort(),
                                                            result,
-                                                           
Collections.<String, byte[]>emptyMap(),
+                                                           
Collections.emptyMap(),
                                                            
MessagingService.Verb.INTERNAL_RESPONSE,
                                                            
MessagingService.current_version);
         response(message);
@@ -245,14 +245,14 @@ public class ReadCallback implements 
IAsyncCallbackWithFailure<ReadResponse>
                 final DataResolver repairResolver = new DataResolver(keyspace, 
command, consistencyLevel, endpoints.size(), queryStartNanoTime);
                 AsyncRepairCallback repairHandler = new 
AsyncRepairCallback(repairResolver, endpoints.size());
 
-                for (InetAddress endpoint : endpoints)
+                for (InetAddressAndPort endpoint : endpoints)
                     
MessagingService.instance().sendRR(command.createMessage(), endpoint, 
repairHandler);
             }
         }
     }
 
     @Override
-    public void onFailure(InetAddress from, RequestFailureReason failureReason)
+    public void onFailure(InetAddressAndPort from, RequestFailureReason 
failureReason)
     {
         int n = waitingFor(from)
               ? failuresUpdater.incrementAndGet(this)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/service/StartupChecks.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StartupChecks.java 
b/src/java/org/apache/cassandra/service/StartupChecks.java
index 55099fc..224fd5e 100644
--- a/src/java/org/apache/cassandra/service/StartupChecks.java
+++ b/src/java/org/apache/cassandra/service/StartupChecks.java
@@ -437,7 +437,7 @@ public class StartupChecks
                 String storedDc = SystemKeyspace.getDatacenter();
                 if (storedDc != null)
                 {
-                    String currentDc = 
DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress());
+                    String currentDc = 
DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort());
                     if (!storedDc.equals(currentDc))
                     {
                         String formatMessage = "Cannot start node if snitch's 
data center (%s) differs from previous data center (%s). " +
@@ -459,7 +459,7 @@ public class StartupChecks
                 String storedRack = SystemKeyspace.getRack();
                 if (storedRack != null)
                 {
-                    String currentRack = 
DatabaseDescriptor.getEndpointSnitch().getRack(FBUtilities.getBroadcastAddress());
+                    String currentRack = 
DatabaseDescriptor.getEndpointSnitch().getRack(FBUtilities.getBroadcastAddressAndPort());
                     if (!storedRack.equals(currentRack))
                     {
                         String formatMessage = "Cannot start node if snitch's 
rack (%s) differs from previous rack (%s). " +


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to