http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java 
b/src/java/org/apache/cassandra/service/StorageProxy.java
index be0cf0f..dcf0cab 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -17,9 +17,7 @@
  */
 package org.apache.cassandra.service;
 
-import java.io.IOException;
 import java.lang.management.ManagementFactory;
-import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.nio.file.Paths;
 import java.util.*;
@@ -64,7 +62,6 @@ import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.hints.Hint;
 import org.apache.cassandra.hints.HintsService;
 import org.apache.cassandra.index.Index;
-import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.locator.*;
 import org.apache.cassandra.metrics.*;
 import org.apache.cassandra.net.*;
@@ -95,9 +92,9 @@ public class StorageProxy implements StorageProxyMBean
     public static final StorageProxy instance = new StorageProxy();
 
     private static volatile int maxHintsInProgress = 128 * 
FBUtilities.getAvailableProcessors();
-    private static final CacheLoader<InetAddress, AtomicInteger> 
hintsInProgress = new CacheLoader<InetAddress, AtomicInteger>()
+    private static final CacheLoader<InetAddressAndPort, AtomicInteger> 
hintsInProgress = new CacheLoader<InetAddressAndPort, AtomicInteger>()
     {
-        public AtomicInteger load(InetAddress inetAddress)
+        public AtomicInteger load(InetAddressAndPort inetAddress)
         {
             return new AtomicInteger(0);
         }
@@ -135,7 +132,7 @@ public class StorageProxy implements StorageProxyMBean
         standardWritePerformer = new WritePerformer()
         {
             public void apply(IMutation mutation,
-                              Iterable<InetAddress> targets,
+                              Iterable<InetAddressAndPort> targets,
                               AbstractWriteResponseHandler<IMutation> 
responseHandler,
                               String localDataCenter,
                               ConsistencyLevel consistency_level)
@@ -155,7 +152,7 @@ public class StorageProxy implements StorageProxyMBean
         counterWritePerformer = new WritePerformer()
         {
             public void apply(IMutation mutation,
-                              Iterable<InetAddress> targets,
+                              Iterable<InetAddressAndPort> targets,
                               AbstractWriteResponseHandler<IMutation> 
responseHandler,
                               String localDataCenter,
                               ConsistencyLevel consistencyLevel)
@@ -167,7 +164,7 @@ public class StorageProxy implements StorageProxyMBean
         counterWriteOnCoordinatorPerformer = new WritePerformer()
         {
             public void apply(IMutation mutation,
-                              Iterable<InetAddress> targets,
+                              Iterable<InetAddressAndPort> targets,
                               AbstractWriteResponseHandler<IMutation> 
responseHandler,
                               String localDataCenter,
                               ConsistencyLevel consistencyLevel)
@@ -248,8 +245,8 @@ public class StorageProxy implements StorageProxyMBean
             while (System.nanoTime() - queryStartNanoTime < timeout)
             {
                 // for simplicity, we'll do a single liveness check at the 
start of each attempt
-                Pair<List<InetAddress>, Integer> p = 
getPaxosParticipants(metadata, key, consistencyForPaxos);
-                List<InetAddress> liveEndpoints = p.left;
+                Pair<List<InetAddressAndPort>, Integer> p = 
getPaxosParticipants(metadata, key, consistencyForPaxos);
+                List<InetAddressAndPort> liveEndpoints = p.left;
                 int requiredParticipants = p.right;
 
                 final Pair<UUID, Integer> pair = 
beginAndRepairPaxos(queryStartNanoTime, key, metadata, liveEndpoints, 
requiredParticipants, consistencyForPaxos, consistencyForCommit, true, state);
@@ -342,34 +339,34 @@ public class StorageProxy implements StorageProxyMBean
             casWriteMetrics.contention.update(contentions);
     }
 
-    private static Predicate<InetAddress> sameDCPredicateFor(final String dc)
+    private static Predicate<InetAddressAndPort> sameDCPredicateFor(final 
String dc)
     {
         final IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
-        return new Predicate<InetAddress>()
+        return new Predicate<InetAddressAndPort>()
         {
-            public boolean apply(InetAddress host)
+            public boolean apply(InetAddressAndPort host)
             {
                 return dc.equals(snitch.getDatacenter(host));
             }
         };
     }
 
-    private static Pair<List<InetAddress>, Integer> 
getPaxosParticipants(TableMetadata metadata, DecoratedKey key, ConsistencyLevel 
consistencyForPaxos) throws UnavailableException
+    private static Pair<List<InetAddressAndPort>, Integer> 
getPaxosParticipants(TableMetadata metadata, DecoratedKey key, ConsistencyLevel 
consistencyForPaxos) throws UnavailableException
     {
         Token tk = key.getToken();
-        List<InetAddress> naturalEndpoints = 
StorageService.instance.getNaturalEndpoints(metadata.keyspace, tk);
-        Collection<InetAddress> pendingEndpoints = 
StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, 
metadata.keyspace);
+        List<InetAddressAndPort> naturalEndpoints = 
StorageService.instance.getNaturalEndpoints(metadata.keyspace, tk);
+        Collection<InetAddressAndPort> pendingEndpoints = 
StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, 
metadata.keyspace);
         if (consistencyForPaxos == ConsistencyLevel.LOCAL_SERIAL)
         {
             // Restrict naturalEndpoints and pendingEndpoints to node in the 
local DC only
-            String localDc = 
DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress());
-            Predicate<InetAddress> isLocalDc = sameDCPredicateFor(localDc);
+            String localDc = 
DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort());
+            Predicate<InetAddressAndPort> isLocalDc = 
sameDCPredicateFor(localDc);
             naturalEndpoints = 
ImmutableList.copyOf(Iterables.filter(naturalEndpoints, isLocalDc));
             pendingEndpoints = 
ImmutableList.copyOf(Iterables.filter(pendingEndpoints, isLocalDc));
         }
         int participants = pendingEndpoints.size() + naturalEndpoints.size();
         int requiredParticipants = participants / 2 + 1; // See 
CASSANDRA-8346, CASSANDRA-833
-        List<InetAddress> liveEndpoints = 
ImmutableList.copyOf(Iterables.filter(Iterables.concat(naturalEndpoints, 
pendingEndpoints), IAsyncCallback.isAlive));
+        List<InetAddressAndPort> liveEndpoints = 
ImmutableList.copyOf(Iterables.filter(Iterables.concat(naturalEndpoints, 
pendingEndpoints), IAsyncCallback.isAlive));
         if (liveEndpoints.size() < requiredParticipants)
             throw new UnavailableException(consistencyForPaxos, 
requiredParticipants, liveEndpoints.size());
 
@@ -394,7 +391,7 @@ public class StorageProxy implements StorageProxyMBean
     private static Pair<UUID, Integer> beginAndRepairPaxos(long 
queryStartNanoTime,
                                                            DecoratedKey key,
                                                            TableMetadata 
metadata,
-                                                           List<InetAddress> 
liveEndpoints,
+                                                           
List<InetAddressAndPort> liveEndpoints,
                                                            int 
requiredParticipants,
                                                            ConsistencyLevel 
consistencyForPaxos,
                                                            ConsistencyLevel 
consistencyForCommit,
@@ -472,7 +469,7 @@ public class StorageProxy implements StorageProxyMBean
             // Since we waited for quorum nodes, if some of them haven't seen 
the last commit (which may just be a timing issue, but may also
             // mean we lost messages), we pro-actively "repair" those nodes, 
and retry.
             int nowInSec = 
Ints.checkedCast(TimeUnit.MICROSECONDS.toSeconds(ballotMicros));
-            Iterable<InetAddress> missingMRC = 
summary.replicasMissingMostRecentCommit(metadata, nowInSec);
+            Iterable<InetAddressAndPort> missingMRC = 
summary.replicasMissingMostRecentCommit(metadata, nowInSec);
             if (Iterables.size(missingMRC) > 0)
             {
                 Tracing.trace("Repairing replicas that missed the most recent 
commit");
@@ -494,19 +491,19 @@ public class StorageProxy implements StorageProxyMBean
     /**
      * Unlike commitPaxos, this does not wait for replies
      */
-    private static void sendCommit(Commit commit, Iterable<InetAddress> 
replicas)
+    private static void sendCommit(Commit commit, Iterable<InetAddressAndPort> 
replicas)
     {
         MessageOut<Commit> message = new 
MessageOut<Commit>(MessagingService.Verb.PAXOS_COMMIT, commit, 
Commit.serializer);
-        for (InetAddress target : replicas)
+        for (InetAddressAndPort target : replicas)
             MessagingService.instance().sendOneWay(message, target);
     }
 
-    private static PrepareCallback preparePaxos(Commit toPrepare, 
List<InetAddress> endpoints, int requiredParticipants, ConsistencyLevel 
consistencyForPaxos, long queryStartNanoTime)
+    private static PrepareCallback preparePaxos(Commit toPrepare, 
List<InetAddressAndPort> endpoints, int requiredParticipants, ConsistencyLevel 
consistencyForPaxos, long queryStartNanoTime)
     throws WriteTimeoutException
     {
         PrepareCallback callback = new 
PrepareCallback(toPrepare.update.partitionKey(), toPrepare.update.metadata(), 
requiredParticipants, consistencyForPaxos, queryStartNanoTime);
         MessageOut<Commit> message = new 
MessageOut<Commit>(MessagingService.Verb.PAXOS_PREPARE, toPrepare, 
Commit.serializer);
-        for (InetAddress target : endpoints)
+        for (InetAddressAndPort target : endpoints)
         {
             if (canDoLocalRequest(target))
             {
@@ -516,9 +513,9 @@ public class StorageProxy implements StorageProxyMBean
                     {
                         try
                         {
-                            MessageIn<PrepareResponse> message = 
MessageIn.create(FBUtilities.getBroadcastAddress(),
+                            MessageIn<PrepareResponse> message = 
MessageIn.create(FBUtilities.getBroadcastAddressAndPort(),
                                     PrepareVerbHandler.doPrepare(toPrepare),
-                                    Collections.<String, byte[]>emptyMap(),
+                                    Collections.emptyMap(),
                                     MessagingService.Verb.INTERNAL_RESPONSE,
                                     MessagingService.current_version);
                             callback.response(message);
@@ -539,12 +536,12 @@ public class StorageProxy implements StorageProxyMBean
         return callback;
     }
 
-    private static boolean proposePaxos(Commit proposal, List<InetAddress> 
endpoints, int requiredParticipants, boolean timeoutIfPartial, ConsistencyLevel 
consistencyLevel, long queryStartNanoTime)
+    private static boolean proposePaxos(Commit proposal, 
List<InetAddressAndPort> endpoints, int requiredParticipants, boolean 
timeoutIfPartial, ConsistencyLevel consistencyLevel, long queryStartNanoTime)
     throws WriteTimeoutException
     {
         ProposeCallback callback = new ProposeCallback(endpoints.size(), 
requiredParticipants, !timeoutIfPartial, consistencyLevel, queryStartNanoTime);
         MessageOut<Commit> message = new 
MessageOut<Commit>(MessagingService.Verb.PAXOS_PROPOSE, proposal, 
Commit.serializer);
-        for (InetAddress target : endpoints)
+        for (InetAddressAndPort target : endpoints)
         {
             if (canDoLocalRequest(target))
             {
@@ -554,9 +551,9 @@ public class StorageProxy implements StorageProxyMBean
                     {
                         try
                         {
-                            MessageIn<Boolean> message = 
MessageIn.create(FBUtilities.getBroadcastAddress(),
+                            MessageIn<Boolean> message = 
MessageIn.create(FBUtilities.getBroadcastAddressAndPort(),
                                     ProposeVerbHandler.doPropose(proposal),
-                                    Collections.<String, byte[]>emptyMap(),
+                                    Collections.emptyMap(),
                                     MessagingService.Verb.INTERNAL_RESPONSE,
                                     MessagingService.current_version);
                             callback.response(message);
@@ -590,8 +587,8 @@ public class StorageProxy implements StorageProxyMBean
         Keyspace keyspace = Keyspace.open(proposal.update.metadata().keyspace);
 
         Token tk = proposal.update.partitionKey().getToken();
-        List<InetAddress> naturalEndpoints = 
StorageService.instance.getNaturalEndpoints(keyspace.getName(), tk);
-        Collection<InetAddress> pendingEndpoints = 
StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, 
keyspace.getName());
+        List<InetAddressAndPort> naturalEndpoints = 
StorageService.instance.getNaturalEndpoints(keyspace.getName(), tk);
+        Collection<InetAddressAndPort> pendingEndpoints = 
StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, 
keyspace.getName());
 
         AbstractWriteResponseHandler<Commit> responseHandler = null;
         if (shouldBlock)
@@ -602,7 +599,7 @@ public class StorageProxy implements StorageProxyMBean
         }
 
         MessageOut<Commit> message = new 
MessageOut<Commit>(MessagingService.Verb.PAXOS_COMMIT, proposal, 
Commit.serializer);
-        for (InetAddress destination : Iterables.concat(naturalEndpoints, 
pendingEndpoints))
+        for (InetAddressAndPort destination : 
Iterables.concat(naturalEndpoints, pendingEndpoints))
         {
             checkHintOverload(destination);
 
@@ -658,7 +655,7 @@ public class StorageProxy implements StorageProxyMBean
                 {
                     if (!(ex instanceof WriteTimeoutException))
                         logger.error("Failed to apply paxos commit locally : 
", ex);
-                    
responseHandler.onFailure(FBUtilities.getBroadcastAddress(), 
RequestFailureReason.UNKNOWN);
+                    
responseHandler.onFailure(FBUtilities.getBroadcastAddressAndPort(), 
RequestFailureReason.UNKNOWN);
                 }
             }
 
@@ -684,7 +681,7 @@ public class StorageProxy implements StorageProxyMBean
     throws UnavailableException, OverloadedException, WriteTimeoutException, 
WriteFailureException
     {
         Tracing.trace("Determining replicas for mutation");
-        final String localDataCenter = 
DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress());
+        final String localDataCenter = 
DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort());
 
         long startTime = System.nanoTime();
         List<AbstractWriteResponseHandler<IMutation>> responseHandlers = new 
ArrayList<>(mutations.size());
@@ -781,13 +778,13 @@ public class StorageProxy implements StorageProxyMBean
         String keyspaceName = mutation.getKeyspaceName();
         Token token = mutation.key().getToken();
 
-        Iterable<InetAddress> endpoints = 
StorageService.instance.getNaturalAndPendingEndpoints(keyspaceName, token);
-        ArrayList<InetAddress> endpointsToHint = new 
ArrayList<>(Iterables.size(endpoints));
+        Iterable<InetAddressAndPort> endpoints = 
StorageService.instance.getNaturalAndPendingEndpoints(keyspaceName, token);
+        ArrayList<InetAddressAndPort> endpointsToHint = new 
ArrayList<>(Iterables.size(endpoints));
 
         // local writes can timeout, but cannot be dropped (see 
LocalMutationRunnable and CASSANDRA-6510),
         // so there is no need to hint or retry.
-        for (InetAddress target : endpoints)
-            if (!target.equals(FBUtilities.getBroadcastAddress()) && 
shouldHint(target))
+        for (InetAddressAndPort target : endpoints)
+            if (!target.equals(FBUtilities.getBroadcastAddressAndPort()) && 
shouldHint(target))
                 endpointsToHint.add(target);
 
         submitHint(mutation, endpointsToHint, null);
@@ -797,7 +794,7 @@ public class StorageProxy implements StorageProxyMBean
     {
         String keyspaceName = mutation.getKeyspaceName();
         Token token = mutation.key().getToken();
-        InetAddress local = FBUtilities.getBroadcastAddress();
+        InetAddressAndPort local = FBUtilities.getBroadcastAddressAndPort();
 
         return StorageService.instance.getNaturalEndpoints(keyspaceName, 
token).contains(local)
                || 
StorageService.instance.getTokenMetadata().pendingEndpointsFor(token, 
keyspaceName).contains(local);
@@ -816,7 +813,7 @@ public class StorageProxy implements StorageProxyMBean
     throws UnavailableException, OverloadedException, WriteTimeoutException
     {
         Tracing.trace("Determining replicas for mutation");
-        final String localDataCenter = 
DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress());
+        final String localDataCenter = 
DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort());
 
         long startTime = System.nanoTime();
 
@@ -841,7 +838,7 @@ public class StorageProxy implements StorageProxyMBean
                 ConsistencyLevel consistencyLevel = ConsistencyLevel.ONE;
 
                 //Since the base -> view replication is 1:1 we only need to 
store the BL locally
-                final Collection<InetAddress> batchlogEndpoints = 
Collections.singleton(FBUtilities.getBroadcastAddress());
+                final Collection<InetAddressAndPort> batchlogEndpoints = 
Collections.singleton(FBUtilities.getBroadcastAddressAndPort());
                 BatchlogResponseHandler.BatchlogCleanup cleanup = new 
BatchlogResponseHandler.BatchlogCleanup(mutations.size(), () -> 
asyncRemoveFromBatchlog(batchlogEndpoints, batchUUID));
 
                 // add a handler for each mutation - includes checking 
availability, but doesn't initiate any writes, yet
@@ -849,8 +846,8 @@ public class StorageProxy implements StorageProxyMBean
                 {
                     String keyspaceName = mutation.getKeyspaceName();
                     Token tk = mutation.key().getToken();
-                    Optional<InetAddress> pairedEndpoint = 
ViewUtils.getViewNaturalEndpoint(keyspaceName, baseToken, tk);
-                    Collection<InetAddress> pendingEndpoints = 
StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, 
keyspaceName);
+                    Optional<InetAddressAndPort> pairedEndpoint = 
ViewUtils.getViewNaturalEndpoint(keyspaceName, baseToken, tk);
+                    Collection<InetAddressAndPort> pendingEndpoints = 
StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, 
keyspaceName);
 
                     // if there are no paired endpoints there are probably 
range movements going on, so we write to the local batchlog to replay later
                     if (!pairedEndpoint.isPresent())
@@ -865,7 +862,7 @@ public class StorageProxy implements StorageProxyMBean
                     }
 
                     // When local node is the paired endpoint just apply the 
mutation locally.
-                    if 
(pairedEndpoint.get().equals(FBUtilities.getBroadcastAddress()) && 
StorageService.instance.isJoined())
+                    if 
(pairedEndpoint.get().equals(FBUtilities.getBroadcastAddressAndPort()) && 
StorageService.instance.isJoined())
                     {
                         try
                         {
@@ -956,7 +953,7 @@ public class StorageProxy implements StorageProxyMBean
         long startTime = System.nanoTime();
 
         List<WriteResponseHandlerWrapper> wrappers = new 
ArrayList<WriteResponseHandlerWrapper>(mutations.size());
-        String localDataCenter = 
DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress());
+        String localDataCenter = 
DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort());
 
         try
         {
@@ -974,7 +971,7 @@ public class StorageProxy implements StorageProxyMBean
                     batchConsistencyLevel = consistency_level;
             }
 
-            final Collection<InetAddress> batchlogEndpoints = 
getBatchlogEndpoints(localDataCenter, batchConsistencyLevel);
+            final Collection<InetAddressAndPort> batchlogEndpoints = 
getBatchlogEndpoints(localDataCenter, batchConsistencyLevel);
             final UUID batchUUID = UUIDGen.getTimeUUID();
             BatchlogResponseHandler.BatchlogCleanup cleanup = new 
BatchlogResponseHandler.BatchlogCleanup(mutations.size(),
                                                                                
                           () -> asyncRemoveFromBatchlog(batchlogEndpoints, 
batchUUID));
@@ -1029,16 +1026,16 @@ public class StorageProxy implements StorageProxyMBean
         }
     }
 
-    public static boolean canDoLocalRequest(InetAddress replica)
+    public static boolean canDoLocalRequest(InetAddressAndPort replica)
     {
-        return replica.equals(FBUtilities.getBroadcastAddress());
+        return replica.equals(FBUtilities.getBroadcastAddressAndPort());
     }
 
-    private static void syncWriteToBatchlog(Collection<Mutation> mutations, 
Collection<InetAddress> endpoints, UUID uuid, long queryStartNanoTime)
+    private static void syncWriteToBatchlog(Collection<Mutation> mutations, 
Collection<InetAddressAndPort> endpoints, UUID uuid, long queryStartNanoTime)
     throws WriteTimeoutException, WriteFailureException
     {
         WriteResponseHandler<?> handler = new WriteResponseHandler<>(endpoints,
-                                                                     
Collections.<InetAddress>emptyList(),
+                                                                     
Collections.emptyList(),
                                                                      
endpoints.size() == 1 ? ConsistencyLevel.ONE : ConsistencyLevel.TWO,
                                                                      
Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME),
                                                                      null,
@@ -1047,7 +1044,7 @@ public class StorageProxy implements StorageProxyMBean
 
         Batch batch = Batch.createLocal(uuid, FBUtilities.timestampMicros(), 
mutations);
         MessageOut<Batch> message = new 
MessageOut<>(MessagingService.Verb.BATCH_STORE, batch, Batch.serializer);
-        for (InetAddress target : endpoints)
+        for (InetAddressAndPort target : endpoints)
         {
             logger.trace("Sending batchlog store request {} to {} for {} 
mutations", batch.id, target, batch.size());
 
@@ -1059,10 +1056,10 @@ public class StorageProxy implements StorageProxyMBean
         handler.get();
     }
 
-    private static void asyncRemoveFromBatchlog(Collection<InetAddress> 
endpoints, UUID uuid)
+    private static void asyncRemoveFromBatchlog(Collection<InetAddressAndPort> 
endpoints, UUID uuid)
     {
         MessageOut<UUID> message = new 
MessageOut<>(MessagingService.Verb.BATCH_REMOVE, uuid, 
UUIDSerializer.serializer);
-        for (InetAddress target : endpoints)
+        for (InetAddressAndPort target : endpoints)
         {
             if (logger.isTraceEnabled())
                 logger.trace("Sending batchlog remove request {} to {}", uuid, 
target);
@@ -1078,7 +1075,7 @@ public class StorageProxy implements StorageProxyMBean
     {
         for (WriteResponseHandlerWrapper wrapper : wrappers)
         {
-            Iterable<InetAddress> endpoints = 
Iterables.concat(wrapper.handler.naturalEndpoints, 
wrapper.handler.pendingEndpoints);
+            Iterable<InetAddressAndPort> endpoints = 
Iterables.concat(wrapper.handler.naturalEndpoints, 
wrapper.handler.pendingEndpoints);
 
             try
             {
@@ -1086,7 +1083,7 @@ public class StorageProxy implements StorageProxyMBean
             }
             catch (OverloadedException | WriteTimeoutException e)
             {
-                wrapper.handler.onFailure(FBUtilities.getBroadcastAddress(), 
RequestFailureReason.UNKNOWN);
+                
wrapper.handler.onFailure(FBUtilities.getBroadcastAddressAndPort(), 
RequestFailureReason.UNKNOWN);
             }
         }
     }
@@ -1096,7 +1093,7 @@ public class StorageProxy implements StorageProxyMBean
     {
         for (WriteResponseHandlerWrapper wrapper : wrappers)
         {
-            Iterable<InetAddress> endpoints = 
Iterables.concat(wrapper.handler.naturalEndpoints, 
wrapper.handler.pendingEndpoints);
+            Iterable<InetAddressAndPort> endpoints = 
Iterables.concat(wrapper.handler.naturalEndpoints, 
wrapper.handler.pendingEndpoints);
             sendToHintedEndpoints(wrapper.mutation, endpoints, 
wrapper.handler, localDataCenter, stage);
         }
 
@@ -1132,8 +1129,8 @@ public class StorageProxy implements StorageProxyMBean
         AbstractReplicationStrategy rs = 
Keyspace.open(keyspaceName).getReplicationStrategy();
 
         Token tk = mutation.key().getToken();
-        List<InetAddress> naturalEndpoints = 
StorageService.instance.getNaturalEndpoints(keyspaceName, tk);
-        Collection<InetAddress> pendingEndpoints = 
StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, 
keyspaceName);
+        List<InetAddressAndPort> naturalEndpoints = 
StorageService.instance.getNaturalEndpoints(keyspaceName, tk);
+        Collection<InetAddressAndPort> pendingEndpoints = 
StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, 
keyspaceName);
 
         AbstractWriteResponseHandler<IMutation> responseHandler = 
rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, 
consistency_level, callback, writeType, queryStartNanoTime);
 
@@ -1156,8 +1153,8 @@ public class StorageProxy implements StorageProxyMBean
         AbstractReplicationStrategy rs = keyspace.getReplicationStrategy();
         String keyspaceName = mutation.getKeyspaceName();
         Token tk = mutation.key().getToken();
-        List<InetAddress> naturalEndpoints = 
StorageService.instance.getNaturalEndpoints(keyspaceName, tk);
-        Collection<InetAddress> pendingEndpoints = 
StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, 
keyspaceName);
+        List<InetAddressAndPort> naturalEndpoints = 
StorageService.instance.getNaturalEndpoints(keyspaceName, tk);
+        Collection<InetAddressAndPort> pendingEndpoints = 
StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, 
keyspaceName);
         AbstractWriteResponseHandler<IMutation> writeHandler = 
rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, 
consistency_level, null, writeType, queryStartNanoTime);
         BatchlogResponseHandler<IMutation> batchHandler = new 
BatchlogResponseHandler<>(writeHandler, 
batchConsistencyLevel.blockFor(keyspace), cleanup, queryStartNanoTime);
         return new WriteResponseHandlerWrapper(batchHandler, mutation);
@@ -1170,7 +1167,7 @@ public class StorageProxy implements StorageProxyMBean
     private static WriteResponseHandlerWrapper 
wrapViewBatchResponseHandler(Mutation mutation,
                                                                             
ConsistencyLevel consistency_level,
                                                                             
ConsistencyLevel batchConsistencyLevel,
-                                                                            
List<InetAddress> naturalEndpoints,
+                                                                            
List<InetAddressAndPort> naturalEndpoints,
                                                                             
AtomicLong baseComplete,
                                                                             
WriteType writeType,
                                                                             
BatchlogResponseHandler.BatchlogCleanup cleanup,
@@ -1180,7 +1177,7 @@ public class StorageProxy implements StorageProxyMBean
         AbstractReplicationStrategy rs = keyspace.getReplicationStrategy();
         String keyspaceName = mutation.getKeyspaceName();
         Token tk = mutation.key().getToken();
-        Collection<InetAddress> pendingEndpoints = 
StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, 
keyspaceName);
+        Collection<InetAddressAndPort> pendingEndpoints = 
StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, 
keyspaceName);
         AbstractWriteResponseHandler<IMutation> writeHandler = 
rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, 
consistency_level, () -> {
             long delay = Math.max(0, System.currentTimeMillis() - 
baseComplete.get());
             viewWriteMetrics.viewWriteLatency.update(delay, 
TimeUnit.MILLISECONDS);
@@ -1209,18 +1206,18 @@ public class StorageProxy implements StorageProxyMBean
      * - choose min(2, number of qualifying candiates above)
      * - allow the local node to be the only replica only if it's a 
single-node DC
      */
-    private static Collection<InetAddress> getBatchlogEndpoints(String 
localDataCenter, ConsistencyLevel consistencyLevel)
+    private static Collection<InetAddressAndPort> getBatchlogEndpoints(String 
localDataCenter, ConsistencyLevel consistencyLevel)
     throws UnavailableException
     {
         TokenMetadata.Topology topology = 
StorageService.instance.getTokenMetadata().cachedOnlyTokenMap().getTopology();
-        Multimap<String, InetAddress> localEndpoints = 
HashMultimap.create(topology.getDatacenterRacks().get(localDataCenter));
-        String localRack = 
DatabaseDescriptor.getEndpointSnitch().getRack(FBUtilities.getBroadcastAddress());
+        Multimap<String, InetAddressAndPort> localEndpoints = 
HashMultimap.create(topology.getDatacenterRacks().get(localDataCenter));
+        String localRack = 
DatabaseDescriptor.getEndpointSnitch().getRack(FBUtilities.getBroadcastAddressAndPort());
 
-        Collection<InetAddress> chosenEndpoints = new 
BatchlogManager.EndpointFilter(localRack, localEndpoints).filter();
+        Collection<InetAddressAndPort> chosenEndpoints = new 
BatchlogManager.EndpointFilter(localRack, localEndpoints).filter();
         if (chosenEndpoints.isEmpty())
         {
             if (consistencyLevel == ConsistencyLevel.ANY)
-                return 
Collections.singleton(FBUtilities.getBroadcastAddress());
+                return 
Collections.singleton(FBUtilities.getBroadcastAddressAndPort());
 
             throw new UnavailableException(ConsistencyLevel.ONE, 1, 0);
         }
@@ -1246,7 +1243,7 @@ public class StorageProxy implements StorageProxyMBean
      * @throws OverloadedException if the hints cannot be written/enqueued
      */
     public static void sendToHintedEndpoints(final Mutation mutation,
-                                             Iterable<InetAddress> targets,
+                                             Iterable<InetAddressAndPort> 
targets,
                                              
AbstractWriteResponseHandler<IMutation> responseHandler,
                                              String localDataCenter,
                                              Stage stage)
@@ -1255,18 +1252,18 @@ public class StorageProxy implements StorageProxyMBean
         int targetsSize = Iterables.size(targets);
 
         // this dc replicas:
-        Collection<InetAddress> localDc = null;
+        Collection<InetAddressAndPort> localDc = null;
         // extra-datacenter replicas, grouped by dc
-        Map<String, Collection<InetAddress>> dcGroups = null;
+        Map<String, Collection<InetAddressAndPort>> dcGroups = null;
         // only need to create a Message for non-local writes
         MessageOut<Mutation> message = null;
 
         boolean insertLocal = false;
-        ArrayList<InetAddress> endpointsToHint = null;
+        ArrayList<InetAddressAndPort> endpointsToHint = null;
 
-        List<InetAddress> backPressureHosts = null;
+        List<InetAddressAndPort> backPressureHosts = null;
 
-        for (InetAddress destination : targets)
+        for (InetAddressAndPort destination : targets)
         {
             checkHintOverload(destination);
 
@@ -1295,7 +1292,7 @@ public class StorageProxy implements StorageProxyMBean
                     }
                     else
                     {
-                        Collection<InetAddress> messages = (dcGroups != null) 
? dcGroups.get(dc) : null;
+                        Collection<InetAddressAndPort> messages = (dcGroups != 
null) ? dcGroups.get(dc) : null;
                         if (messages == null)
                         {
                             messages = new ArrayList<>(3); // most DCs will 
have <= 3 replicas
@@ -1338,18 +1335,18 @@ public class StorageProxy implements StorageProxyMBean
 
         if (localDc != null)
         {
-            for (InetAddress destination : localDc)
+            for (InetAddressAndPort destination : localDc)
                 MessagingService.instance().sendRR(message, destination, 
responseHandler, true);
         }
         if (dcGroups != null)
         {
             // for each datacenter, send the message to one node to relay the 
write to other replicas
-            for (Collection<InetAddress> dcTargets : dcGroups.values())
+            for (Collection<InetAddressAndPort> dcTargets : dcGroups.values())
                 sendMessagesToNonlocalDC(message, dcTargets, responseHandler);
         }
     }
 
-    private static void checkHintOverload(InetAddress destination)
+    private static void checkHintOverload(InetAddressAndPort destination)
     {
         // avoid OOMing due to excess hints.  we need to do this check even 
for "live" nodes, since we can
         // still generate hints for those if it's overloaded or simply dead 
but not yet known-to-be-dead.
@@ -1366,39 +1363,31 @@ public class StorageProxy implements StorageProxyMBean
     }
 
     private static void sendMessagesToNonlocalDC(MessageOut<? extends 
IMutation> message,
-                                                 Collection<InetAddress> 
targets,
+                                                 
Collection<InetAddressAndPort> targets,
                                                  
AbstractWriteResponseHandler<IMutation> handler)
     {
-        Iterator<InetAddress> iter = targets.iterator();
-        InetAddress target = iter.next();
+        Iterator<InetAddressAndPort> iter = targets.iterator();
+        int[] messageIds = new int[targets.size()];
+        InetAddressAndPort target = iter.next();
 
+        int idIdx = 0;
         // Add the other destinations of the same message as a FORWARD_HEADER 
entry
-        try(DataOutputBuffer out = new DataOutputBuffer())
-        {
-            out.writeInt(targets.size() - 1);
-            while (iter.hasNext())
-            {
-                InetAddress destination = iter.next();
-                CompactEndpointSerializationHelper.serialize(destination, out);
-                int id = MessagingService.instance().addCallback(handler,
-                                                                 message,
-                                                                 destination,
-                                                                 
message.getTimeout(),
-                                                                 
handler.consistencyLevel,
-                                                                 true);
-                out.writeInt(id);
-                logger.trace("Adding FWD message to {}@{}", id, destination);
-            }
-            message = message.withParameter(Mutation.FORWARD_TO, 
out.getData());
-            // send the combined message + forward headers
-            int id = MessagingService.instance().sendRR(message, target, 
handler, true);
-            logger.trace("Sending message to {}@{}", id, target);
-        }
-        catch (IOException e)
-        {
-            // DataOutputBuffer is in-memory, doesn't throw IOException
-            throw new AssertionError(e);
+        while (iter.hasNext())
+        {
+            InetAddressAndPort destination = iter.next();
+            int id = MessagingService.instance().addCallback(handler,
+                                                             message,
+                                                             destination,
+                                                             
message.getTimeout(),
+                                                             
handler.consistencyLevel,
+                                                             true);
+            messageIds[idIdx++] = id;
+            logger.trace("Adding FWD message to {}@{}", id, destination);
         }
+        message = message.withParameter(ParameterType.FORWARD_TO.FORWARD_TO, 
new ForwardToContainer(targets, messageIds));
+        // send the combined message + forward headers
+        int id = MessagingService.instance().sendRR(message, target, handler, 
true);
+        logger.trace("Sending message to {}@{}", id, target);
     }
 
     private static void performLocally(Stage stage, final Runnable runnable)
@@ -1440,7 +1429,7 @@ public class StorageProxy implements StorageProxyMBean
                 {
                     if (!(ex instanceof WriteTimeoutException))
                         logger.error("Failed to apply mutation locally : ", 
ex);
-                    handler.onFailure(FBUtilities.getBroadcastAddress(), 
RequestFailureReason.UNKNOWN);
+                    
handler.onFailure(FBUtilities.getBroadcastAddressAndPort(), 
RequestFailureReason.UNKNOWN);
                 }
             }
 
@@ -1468,9 +1457,9 @@ public class StorageProxy implements StorageProxyMBean
      */
     public static AbstractWriteResponseHandler<IMutation> 
mutateCounter(CounterMutation cm, String localDataCenter, long 
queryStartNanoTime) throws UnavailableException, OverloadedException
     {
-        InetAddress endpoint = findSuitableEndpoint(cm.getKeyspaceName(), 
cm.key(), localDataCenter, cm.consistency());
+        InetAddressAndPort endpoint = 
findSuitableEndpoint(cm.getKeyspaceName(), cm.key(), localDataCenter, 
cm.consistency());
 
-        if (endpoint.equals(FBUtilities.getBroadcastAddress()))
+        if (endpoint.equals(FBUtilities.getBroadcastAddressAndPort()))
         {
             return applyCounterMutationOnCoordinator(cm, localDataCenter, 
queryStartNanoTime);
         }
@@ -1480,8 +1469,8 @@ public class StorageProxy implements StorageProxyMBean
             String keyspaceName = cm.getKeyspaceName();
             AbstractReplicationStrategy rs = 
Keyspace.open(keyspaceName).getReplicationStrategy();
             Token tk = cm.key().getToken();
-            List<InetAddress> naturalEndpoints = 
StorageService.instance.getNaturalEndpoints(keyspaceName, tk);
-            Collection<InetAddress> pendingEndpoints = 
StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, 
keyspaceName);
+            List<InetAddressAndPort> naturalEndpoints = 
StorageService.instance.getNaturalEndpoints(keyspaceName, tk);
+            Collection<InetAddressAndPort> pendingEndpoints = 
StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, 
keyspaceName);
 
             rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, 
cm.consistency(), null, WriteType.COUNTER, 
queryStartNanoTime).assureSufficientLiveNodes();
 
@@ -1504,11 +1493,11 @@ public class StorageProxy implements StorageProxyMBean
      * is unclear we want to mix those latencies with read latencies, so this
      * may be a bit involved.
      */
-    private static InetAddress findSuitableEndpoint(String keyspaceName, 
DecoratedKey key, String localDataCenter, ConsistencyLevel cl) throws 
UnavailableException
+    private static InetAddressAndPort findSuitableEndpoint(String 
keyspaceName, DecoratedKey key, String localDataCenter, ConsistencyLevel cl) 
throws UnavailableException
     {
         Keyspace keyspace = Keyspace.open(keyspaceName);
         IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
-        List<InetAddress> endpoints = new ArrayList<>();
+        List<InetAddressAndPort> endpoints = new ArrayList<>();
         StorageService.instance.getLiveNaturalEndpoints(keyspace, key, 
endpoints);
 
         // CASSANDRA-13043: filter out those endpoints not accepting clients 
yet, maybe because still bootstrapping
@@ -1518,9 +1507,9 @@ public class StorageProxy implements StorageProxyMBean
         if (endpoints.isEmpty())
             throw new UnavailableException(cl, cl.blockFor(keyspace), 0);
 
-        List<InetAddress> localEndpoints = new ArrayList<>(endpoints.size());
+        List<InetAddressAndPort> localEndpoints = new 
ArrayList<>(endpoints.size());
 
-        for (InetAddress endpoint : endpoints)
+        for (InetAddressAndPort endpoint : endpoints)
             if (snitch.getDatacenter(endpoint).equals(localDataCenter))
                 localEndpoints.add(endpoint);
 
@@ -1531,7 +1520,7 @@ public class StorageProxy implements StorageProxyMBean
                 throw new UnavailableException(cl, cl.blockFor(keyspace), 0);
 
             // No endpoint in local DC, pick the closest endpoint according to 
the snitch
-            snitch.sortByProximity(FBUtilities.getBroadcastAddress(), 
endpoints);
+            snitch.sortByProximity(FBUtilities.getBroadcastAddressAndPort(), 
endpoints);
             return endpoints.get(0);
         }
 
@@ -1555,7 +1544,7 @@ public class StorageProxy implements StorageProxyMBean
     }
 
     private static Runnable counterWriteTask(final IMutation mutation,
-                                             final Iterable<InetAddress> 
targets,
+                                             final 
Iterable<InetAddressAndPort> targets,
                                              final 
AbstractWriteResponseHandler<IMutation> responseHandler,
                                              final String localDataCenter)
     {
@@ -1569,8 +1558,8 @@ public class StorageProxy implements StorageProxyMBean
                 Mutation result = ((CounterMutation) 
mutation).applyCounterMutation();
                 responseHandler.response(null);
 
-                Set<InetAddress> remotes = 
Sets.difference(ImmutableSet.copyOf(targets),
-                                                           
ImmutableSet.of(FBUtilities.getBroadcastAddress()));
+                Set<InetAddressAndPort> remotes = 
Sets.difference(ImmutableSet.copyOf(targets),
+                                                                  
ImmutableSet.of(FBUtilities.getBroadcastAddressAndPort()));
                 if (!remotes.isEmpty())
                     sendToHintedEndpoints(result, remotes, responseHandler, 
localDataCenter, Stage.COUNTER_MUTATION);
             }
@@ -1640,8 +1629,8 @@ public class StorageProxy implements StorageProxyMBean
         try
         {
             // make sure any in-progress paxos writes are done (i.e., 
committed to a majority of replicas), before performing a quorum read
-            Pair<List<InetAddress>, Integer> p = 
getPaxosParticipants(metadata, key, consistencyLevel);
-            List<InetAddress> liveEndpoints = p.left;
+            Pair<List<InetAddressAndPort>, Integer> p = 
getPaxosParticipants(metadata, key, consistencyLevel);
+            List<InetAddressAndPort> liveEndpoints = p.left;
             int requiredParticipants = p.right;
 
             // does the work of applying in-progress writes; throws UAE or 
timeout if it can't
@@ -1844,7 +1833,7 @@ public class StorageProxy implements StorageProxyMBean
                                                  executor.handler.endpoints,
                                                  queryStartNanoTime);
 
-                for (InetAddress endpoint : executor.getContactedReplicas())
+                for (InetAddressAndPort endpoint : 
executor.getContactedReplicas())
                 {
                     Tracing.trace("Enqueuing full data read to {}", endpoint);
                     
MessagingService.instance().sendRRWithFailure(command.createMessage(), 
endpoint, repairHandler);
@@ -1920,47 +1909,47 @@ public class StorageProxy implements StorageProxyMBean
                 else
                 {
                     MessagingService.instance().incrementDroppedMessages(verb, 
System.currentTimeMillis() - constructionTime);
-                    handler.onFailure(FBUtilities.getBroadcastAddress(), 
RequestFailureReason.UNKNOWN);
+                    
handler.onFailure(FBUtilities.getBroadcastAddressAndPort(), 
RequestFailureReason.UNKNOWN);
                 }
 
-                
MessagingService.instance().addLatency(FBUtilities.getBroadcastAddress(), 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
+                
MessagingService.instance().addLatency(FBUtilities.getBroadcastAddressAndPort(),
 TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
             }
             catch (Throwable t)
             {
                 if (t instanceof TombstoneOverwhelmingException)
                 {
-                    handler.onFailure(FBUtilities.getBroadcastAddress(), 
RequestFailureReason.READ_TOO_MANY_TOMBSTONES);
+                    
handler.onFailure(FBUtilities.getBroadcastAddressAndPort(), 
RequestFailureReason.READ_TOO_MANY_TOMBSTONES);
                     logger.error(t.getMessage());
                 }
                 else
                 {
-                    handler.onFailure(FBUtilities.getBroadcastAddress(), 
RequestFailureReason.UNKNOWN);
+                    
handler.onFailure(FBUtilities.getBroadcastAddressAndPort(), 
RequestFailureReason.UNKNOWN);
                     throw t;
                 }
             }
         }
     }
 
-    public static List<InetAddress> getLiveSortedEndpoints(Keyspace keyspace, 
ByteBuffer key)
+    public static List<InetAddressAndPort> getLiveSortedEndpoints(Keyspace 
keyspace, ByteBuffer key)
     {
         return getLiveSortedEndpoints(keyspace, 
StorageService.instance.getTokenMetadata().decorateKey(key));
     }
 
-    public static List<InetAddress> getLiveSortedEndpoints(Keyspace keyspace, 
RingPosition pos)
+    public static List<InetAddressAndPort> getLiveSortedEndpoints(Keyspace 
keyspace, RingPosition pos)
     {
-        List<InetAddress> liveEndpoints = 
StorageService.instance.getLiveNaturalEndpoints(keyspace, pos);
-        
DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getBroadcastAddress(),
 liveEndpoints);
+        List<InetAddressAndPort> liveEndpoints = 
StorageService.instance.getLiveNaturalEndpoints(keyspace, pos);
+        
DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getBroadcastAddressAndPort(),
 liveEndpoints);
         return liveEndpoints;
     }
 
-    private static List<InetAddress> intersection(List<InetAddress> l1, 
List<InetAddress> l2)
+    private static List<InetAddressAndPort> 
intersection(List<InetAddressAndPort> l1, List<InetAddressAndPort> l2)
     {
         // Note: we don't use Guava Sets.intersection() for 3 reasons:
         //   1) retainAll would be inefficient if l1 and l2 are large but in 
practice both are the replicas for a range and
         //   so will be very small (< RF). In that case, retainAll is in fact 
more efficient.
         //   2) we do ultimately need a list so converting everything to sets 
don't make sense
         //   3) l1 and l2 are sorted by proximity. The use of retainAll  
maintain that sorting in the result, while using sets wouldn't.
-        List<InetAddress> inter = new ArrayList<InetAddress>(l1);
+        List<InetAddressAndPort> inter = new ArrayList<>(l1);
         inter.retainAll(l2);
         return inter;
     }
@@ -1986,10 +1975,10 @@ public class StorageProxy implements StorageProxyMBean
     private static class RangeForQuery
     {
         public final AbstractBounds<PartitionPosition> range;
-        public final List<InetAddress> liveEndpoints;
-        public final List<InetAddress> filteredEndpoints;
+        public final List<InetAddressAndPort> liveEndpoints;
+        public final List<InetAddressAndPort> filteredEndpoints;
 
-        public RangeForQuery(AbstractBounds<PartitionPosition> range, 
List<InetAddress> liveEndpoints, List<InetAddress> filteredEndpoints)
+        public RangeForQuery(AbstractBounds<PartitionPosition> range, 
List<InetAddressAndPort> liveEndpoints, List<InetAddressAndPort> 
filteredEndpoints)
         {
             this.range = range;
             this.liveEndpoints = liveEndpoints;
@@ -2027,7 +2016,7 @@ public class StorageProxy implements StorageProxyMBean
                 return endOfData();
 
             AbstractBounds<PartitionPosition> range = ranges.next();
-            List<InetAddress> liveEndpoints = getLiveSortedEndpoints(keyspace, 
range.right);
+            List<InetAddressAndPort> liveEndpoints = 
getLiveSortedEndpoints(keyspace, range.right);
             return new RangeForQuery(range,
                                      liveEndpoints,
                                      consistency.filterForQuery(keyspace, 
liveEndpoints));
@@ -2069,13 +2058,13 @@ public class StorageProxy implements StorageProxyMBean
 
                 RangeForQuery next = ranges.peek();
 
-                List<InetAddress> merged = intersection(current.liveEndpoints, 
next.liveEndpoints);
+                List<InetAddressAndPort> merged = 
intersection(current.liveEndpoints, next.liveEndpoints);
 
                 // Check if there is enough endpoint for the merge to be 
possible.
                 if (!consistency.isSufficientLiveNodes(keyspace, merged))
                     break;
 
-                List<InetAddress> filteredMerged = 
consistency.filterForQuery(keyspace, merged);
+                List<InetAddressAndPort> filteredMerged = 
consistency.filterForQuery(keyspace, merged);
 
                 // Estimate whether merging will be a win or not
                 if 
(!DatabaseDescriptor.getEndpointSnitch().isWorthMergingForRangeQuery(filteredMerged,
 current.filteredEndpoints, next.filteredEndpoints))
@@ -2237,7 +2226,7 @@ public class StorageProxy implements StorageProxyMBean
 
             int blockFor = consistency.blockFor(keyspace);
             int minResponses = Math.min(toQuery.filteredEndpoints.size(), 
blockFor);
-            List<InetAddress> minimalEndpoints = 
toQuery.filteredEndpoints.subList(0, minResponses);
+            List<InetAddressAndPort> minimalEndpoints = 
toQuery.filteredEndpoints.subList(0, minResponses);
             ReadCallback handler = new ReadCallback(resolver, consistency, 
rangeCommand, minimalEndpoints, queryStartNanoTime);
 
             handler.assureSufficientLiveNodes();
@@ -2248,7 +2237,7 @@ public class StorageProxy implements StorageProxyMBean
             }
             else
             {
-                for (InetAddress endpoint : toQuery.filteredEndpoints)
+                for (InetAddressAndPort endpoint : toQuery.filteredEndpoints)
                 {
                     Tracing.trace("Enqueuing request to {}", endpoint);
                     
MessagingService.instance().sendRRWithFailure(rangeCommand.createMessage(), 
endpoint, handler);
@@ -2320,7 +2309,12 @@ public class StorageProxy implements StorageProxyMBean
 
     public Map<String, List<String>> getSchemaVersions()
     {
-        return describeSchemaVersions();
+        return describeSchemaVersions(false);
+    }
+
+    public Map<String, List<String>> getSchemaVersionsWithPort()
+    {
+        return describeSchemaVersions(true);
     }
 
     /**
@@ -2328,11 +2322,11 @@ public class StorageProxy implements StorageProxyMBean
      * migration id. This is useful for determining if a schema change has 
propagated through the cluster. Disagreement
      * is assumed if any node fails to respond.
      */
-    public static Map<String, List<String>> describeSchemaVersions()
+    public static Map<String, List<String>> describeSchemaVersions(boolean 
withPort)
     {
         final String myVersion = Schema.instance.getVersion().toString();
-        final Map<InetAddress, UUID> versions = new 
ConcurrentHashMap<InetAddress, UUID>();
-        final Set<InetAddress> liveHosts = Gossiper.instance.getLiveMembers();
+        final Map<InetAddressAndPort, UUID> versions = new 
ConcurrentHashMap<>();
+        final Set<InetAddressAndPort> liveHosts = 
Gossiper.instance.getLiveMembers();
         final CountDownLatch latch = new CountDownLatch(liveHosts.size());
 
         IAsyncCallback<UUID> cb = new IAsyncCallback<UUID>()
@@ -2351,7 +2345,7 @@ public class StorageProxy implements StorageProxyMBean
         };
         // an empty message acts as a request to the SchemaVersionVerbHandler.
         MessageOut message = new 
MessageOut(MessagingService.Verb.SCHEMA_CHECK);
-        for (InetAddress endpoint : liveHosts)
+        for (InetAddressAndPort endpoint : liveHosts)
             MessagingService.instance().sendRR(message, endpoint, cb);
 
         try
@@ -2366,8 +2360,8 @@ public class StorageProxy implements StorageProxyMBean
 
         // maps versions to hosts that are on that version.
         Map<String, List<String>> results = new HashMap<String, 
List<String>>();
-        Iterable<InetAddress> allHosts = 
Iterables.concat(Gossiper.instance.getLiveMembers(), 
Gossiper.instance.getUnreachableMembers());
-        for (InetAddress host : allHosts)
+        Iterable<InetAddressAndPort> allHosts = 
Iterables.concat(Gossiper.instance.getLiveMembers(), 
Gossiper.instance.getUnreachableMembers());
+        for (InetAddressAndPort host : allHosts)
         {
             UUID version = versions.get(host);
             String stringVersion = version == null ? UNREACHABLE : 
version.toString();
@@ -2377,7 +2371,7 @@ public class StorageProxy implements StorageProxyMBean
                 hosts = new ArrayList<String>();
                 results.put(stringVersion, hosts);
             }
-            hosts.add(host.getHostAddress());
+            hosts.add(host.getHostAddress(withPort));
         }
 
         // we're done: the results map is ready to return to the client.  the 
rest is just debug logging:
@@ -2485,7 +2479,7 @@ public class StorageProxy implements StorageProxyMBean
         DatabaseDescriptor.setMaxHintWindow(ms);
     }
 
-    public static boolean shouldHint(InetAddress ep)
+    public static boolean shouldHint(InetAddressAndPort ep)
     {
         if (DatabaseDescriptor.hintedHandoffEnabled())
         {
@@ -2534,7 +2528,7 @@ public class StorageProxy implements StorageProxyMBean
             throw new UnavailableException(ConsistencyLevel.ALL, liveMembers + 
Gossiper.instance.getUnreachableMembers().size(), liveMembers);
         }
 
-        Set<InetAddress> allEndpoints = 
StorageService.instance.getLiveRingMembers(true);
+        Set<InetAddressAndPort> allEndpoints = 
StorageService.instance.getLiveRingMembers(true);
 
         int blockFor = allEndpoints.size();
         final TruncateResponseHandler responseHandler = new 
TruncateResponseHandler(blockFor);
@@ -2543,7 +2537,7 @@ public class StorageProxy implements StorageProxyMBean
         Tracing.trace("Enqueuing truncate messages to hosts {}", allEndpoints);
         final Truncation truncation = new Truncation(keyspace, cfname);
         MessageOut<Truncation> message = truncation.createMessage();
-        for (InetAddress endpoint : allEndpoints)
+        for (InetAddressAndPort endpoint : allEndpoints)
             MessagingService.instance().sendRR(message, endpoint, 
responseHandler);
 
         // Wait for all
@@ -2570,7 +2564,7 @@ public class StorageProxy implements StorageProxyMBean
     public interface WritePerformer
     {
         public void apply(IMutation mutation,
-                          Iterable<InetAddress> targets,
+                          Iterable<InetAddressAndPort> targets,
                           AbstractWriteResponseHandler<IMutation> 
responseHandler,
                           String localDataCenter,
                           ConsistencyLevel consistencyLevel) throws 
OverloadedException;
@@ -2658,7 +2652,7 @@ public class StorageProxy implements StorageProxyMBean
             {
                 if (MessagingService.DROPPABLE_VERBS.contains(verb))
                     
MessagingService.instance().incrementDroppedMutations(mutationOpt, timeTaken);
-                HintRunnable runnable = new 
HintRunnable(Collections.singleton(FBUtilities.getBroadcastAddress()))
+                HintRunnable runnable = new 
HintRunnable(Collections.singleton(FBUtilities.getBroadcastAddressAndPort()))
                 {
                     protected void runMayThrow() throws Exception
                     {
@@ -2689,9 +2683,9 @@ public class StorageProxy implements StorageProxyMBean
      */
     private abstract static class HintRunnable implements Runnable
     {
-        public final Collection<InetAddress> targets;
+        public final Collection<InetAddressAndPort> targets;
 
-        protected HintRunnable(Collection<InetAddress> targets)
+        protected HintRunnable(Collection<InetAddressAndPort> targets)
         {
             this.targets = targets;
         }
@@ -2709,7 +2703,7 @@ public class StorageProxy implements StorageProxyMBean
             finally
             {
                 StorageMetrics.totalHintsInProgress.dec(targets.size());
-                for (InetAddress target : targets)
+                for (InetAddressAndPort target : targets)
                     getHintsInProgressFor(target).decrementAndGet();
             }
         }
@@ -2743,7 +2737,7 @@ public class StorageProxy implements StorageProxyMBean
             logger.warn("Some hints were not written before shutdown.  This is 
not supposed to happen.  You should (a) run repair, and (b) file a bug report");
     }
 
-    private static AtomicInteger getHintsInProgressFor(InetAddress destination)
+    private static AtomicInteger getHintsInProgressFor(InetAddressAndPort 
destination)
     {
         try
         {
@@ -2755,22 +2749,22 @@ public class StorageProxy implements StorageProxyMBean
         }
     }
 
-    public static Future<Void> submitHint(Mutation mutation, InetAddress 
target, AbstractWriteResponseHandler<IMutation> responseHandler)
+    public static Future<Void> submitHint(Mutation mutation, 
InetAddressAndPort target, AbstractWriteResponseHandler<IMutation> 
responseHandler)
     {
         return submitHint(mutation, Collections.singleton(target), 
responseHandler);
     }
 
     public static Future<Void> submitHint(Mutation mutation,
-                                          Collection<InetAddress> targets,
+                                          Collection<InetAddressAndPort> 
targets,
                                           
AbstractWriteResponseHandler<IMutation> responseHandler)
     {
         HintRunnable runnable = new HintRunnable(targets)
         {
             public void runMayThrow()
             {
-                Set<InetAddress> validTargets = new HashSet<>(targets.size());
+                Set<InetAddressAndPort> validTargets = new 
HashSet<>(targets.size());
                 Set<UUID> hostIds = new HashSet<>(targets.size());
-                for (InetAddress target : targets)
+                for (InetAddressAndPort target : targets)
                 {
                     UUID hostId = 
StorageService.instance.getHostIdForEndpoint(target);
                     if (hostId != null)
@@ -2796,7 +2790,7 @@ public class StorageProxy implements StorageProxyMBean
     private static Future<Void> submitHint(HintRunnable runnable)
     {
         StorageMetrics.totalHintsInProgress.inc(runnable.targets.size());
-        for (InetAddress target : runnable.targets)
+        for (InetAddressAndPort target : runnable.targets)
             getHintsInProgressFor(target).incrementAndGet();
         return (Future<Void>) 
StageManager.getStage(Stage.MUTATION).submit(runnable);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/service/StorageProxyMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxyMBean.java 
b/src/java/org/apache/cassandra/service/StorageProxyMBean.java
index 173d43f..76a6617 100644
--- a/src/java/org/apache/cassandra/service/StorageProxyMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageProxyMBean.java
@@ -65,7 +65,8 @@ public interface StorageProxyMBean
     public void setOtcBacklogExpirationInterval(int intervalInMillis);
 
     /** Returns each live node's schema version */
-    public Map<String, List<String>> getSchemaVersions();
+    @Deprecated public Map<String, List<String>> getSchemaVersions();
+    public Map<String, List<String>> getSchemaVersionsWithPort();
 
     public int getNumberOfTables();
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to