http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java 
b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
index 8c0ed1e..f01197d 100644
--- a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
+++ b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
@@ -52,11 +52,13 @@ public class ConfigHelper
     private static final int DEFAULT_RANGE_BATCH_SIZE = 4096;
     private static final String INPUT_INITIAL_ADDRESS = 
"cassandra.input.address";
     private static final String OUTPUT_INITIAL_ADDRESS = 
"cassandra.output.address";
+    private static final String OUTPUT_INITIAL_PORT = "cassandra.output.port";
     private static final String READ_CONSISTENCY_LEVEL = 
"cassandra.consistencylevel.read";
     private static final String WRITE_CONSISTENCY_LEVEL = 
"cassandra.consistencylevel.write";
     private static final String OUTPUT_COMPRESSION_CLASS = 
"cassandra.output.compression.class";
     private static final String OUTPUT_COMPRESSION_CHUNK_LENGTH = 
"cassandra.output.compression.length";
     private static final String OUTPUT_LOCAL_DC_ONLY = 
"cassandra.output.local.dc.only";
+    private static final String DEFAULT_CASSANDRA_NATIVE_PORT = "7000";
 
     private static final Logger logger = 
LoggerFactory.getLogger(ConfigHelper.class);
 
@@ -349,6 +351,16 @@ public class ConfigHelper
         return conf.get(OUTPUT_INITIAL_ADDRESS);
     }
 
+    public static void setOutputInitialPort(Configuration conf, Integer port)
+    {
+        conf.set(OUTPUT_INITIAL_PORT, port.toString());
+    }
+
+    public static Integer getOutputInitialPort(Configuration conf)
+    {
+        return Integer.valueOf(conf.get(OUTPUT_INITIAL_PORT, 
DEFAULT_CASSANDRA_NATIVE_PORT));
+    }
+
     public static void setOutputInitialAddress(Configuration conf, String 
address)
     {
         conf.set(OUTPUT_INITIAL_ADDRESS, address);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java 
b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
index 0f44e0c..204d9ee 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
@@ -21,11 +21,13 @@ import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
 import java.net.InetAddress;
+import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.*;
 
+import com.google.common.net.HostAndPort;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,6 +41,7 @@ import org.apache.cassandra.hadoop.HadoopCompat;
 import org.apache.cassandra.io.sstable.CQLSSTableWriter;
 import org.apache.cassandra.io.sstable.SSTableLoader;
 import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.schema.TableMetadataRef;
 import org.apache.cassandra.streaming.StreamState;
 import org.apache.cassandra.utils.NativeSSTableLoaderClient;
@@ -80,7 +83,7 @@ public class CqlBulkRecordWriter extends RecordWriter<Object, 
List<ByteBuffer>>
     protected SSTableLoader loader;
     protected Progressable progress;
     protected TaskAttemptContext context;
-    protected final Set<InetAddress> ignores = new HashSet<>();
+    protected final Set<InetAddressAndPort> ignores = new HashSet<>();
 
     private String keyspace;
     private String table;
@@ -139,7 +142,7 @@ public class CqlBulkRecordWriter extends 
RecordWriter<Object, List<ByteBuffer>>
         try
         {
             for (String hostToIgnore : 
CqlBulkOutputFormat.getIgnoreHosts(conf))
-                ignores.add(InetAddress.getByName(hostToIgnore));
+                ignores.add(InetAddressAndPort.getByName(hostToIgnore));
         }
         catch (UnknownHostException e)
         {
@@ -285,20 +288,23 @@ public class CqlBulkRecordWriter extends 
RecordWriter<Object, List<ByteBuffer>>
         {
             super(resolveHostAddresses(conf),
                   CqlConfigHelper.getOutputNativePort(conf),
+                  ConfigHelper.getOutputInitialPort(conf),
                   ConfigHelper.getOutputKeyspaceUserName(conf),
                   ConfigHelper.getOutputKeyspacePassword(conf),
-                  CqlConfigHelper.getSSLOptions(conf).orNull());
+                  CqlConfigHelper.getSSLOptions(conf).orNull(),
+                  CqlConfigHelper.getAllowServerPortDiscovery(conf));
         }
 
-        private static Collection<InetAddress> 
resolveHostAddresses(Configuration conf)
+        private static Collection<InetSocketAddress> 
resolveHostAddresses(Configuration conf)
         {
-            Set<InetAddress> addresses = new HashSet<>();
-
+            Set<InetSocketAddress> addresses = new HashSet<>();
+            int port = CqlConfigHelper.getOutputNativePort(conf);
             for (String host : 
ConfigHelper.getOutputInitialAddress(conf).split(","))
             {
                 try
                 {
-                    addresses.add(InetAddress.getByName(host));
+                    HostAndPort hap = HostAndPort.fromString(host);
+                    addresses.add(new 
InetSocketAddress(InetAddress.getByName(hap.getHost()), 
hap.getPortOrDefault(port)));
                 }
                 catch (UnknownHostException e)
                 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java 
b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
index f9a6f3a..3a47a72 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
@@ -88,6 +88,7 @@ public class CqlConfigHelper
 
     private static final String OUTPUT_CQL = "cassandra.output.cql";
     private static final String OUTPUT_NATIVE_PORT = 
"cassandra.output.native.port";
+    private static final String ALLOW_SERVER_PORT_DISCOVERY = 
"cassandra.allowserverportdiscovery";
 
     /**
      * Set the CQL columns for the input of this job.
@@ -651,4 +652,15 @@ public class CqlConfigHelper
                  new SecureRandom());
         return ctx;
     }
+
+    public static void setAllowServerPortDiscovery(Configuration conf, boolean 
allowServerPortDiscovery)
+    {
+        conf.set(ALLOW_SERVER_PORT_DISCOVERY, 
Boolean.toString(allowServerPortDiscovery));
+    }
+
+    public static boolean getAllowServerPortDiscovery(Configuration conf)
+    {
+        return Boolean.parseBoolean(conf.get(ALLOW_SERVER_PORT_DISCOVERY, 
"false"));
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/hints/HintVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintVerbHandler.java 
b/src/java/org/apache/cassandra/hints/HintVerbHandler.java
index 2b92a42..cec6f0b 100644
--- a/src/java/org/apache/cassandra/hints/HintVerbHandler.java
+++ b/src/java/org/apache/cassandra/hints/HintVerbHandler.java
@@ -18,13 +18,13 @@
  */
 package org.apache.cassandra.hints;
 
-import java.net.InetAddress;
 import java.util.UUID;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessagingService;
@@ -47,7 +47,7 @@ public final class HintVerbHandler implements 
IVerbHandler<HintMessage>
     {
         UUID hostId = message.payload.hostId;
         Hint hint = message.payload.hint;
-        InetAddress address = 
StorageService.instance.getEndpointForHostId(hostId);
+        InetAddressAndPort address = 
StorageService.instance.getEndpointForHostId(hostId);
 
         // If we see an unknown table id, it means the table, or one of the 
tables in the mutation, had been dropped.
         // In that case there is nothing we can really do, or should do, other 
than log it go on.
@@ -96,7 +96,7 @@ public final class HintVerbHandler implements 
IVerbHandler<HintMessage>
         }
     }
 
-    private static void reply(int id, InetAddress to)
+    private static void reply(int id, InetAddressAndPort to)
     {
         MessagingService.instance().sendReply(HintResponse.message, id, to);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java 
b/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java
index 58a3e6f..cbbb212 100644
--- a/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java
+++ b/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java
@@ -18,7 +18,6 @@
 package org.apache.cassandra.hints;
 
 import java.io.File;
-import java.net.InetAddress;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.*;
@@ -36,6 +35,7 @@ import 
org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
 import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.FSReadError;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.service.StorageService;
 
 /**
@@ -50,10 +50,10 @@ final class HintsDispatchExecutor
     private final File hintsDirectory;
     private final ExecutorService executor;
     private final AtomicBoolean isPaused;
-    private final Predicate<InetAddress> isAlive;
+    private final Predicate<InetAddressAndPort> isAlive;
     private final Map<UUID, Future> scheduledDispatches;
 
-    HintsDispatchExecutor(File hintsDirectory, int maxThreads, AtomicBoolean 
isPaused, Predicate<InetAddress> isAlive)
+    HintsDispatchExecutor(File hintsDirectory, int maxThreads, AtomicBoolean 
isPaused, Predicate<InetAddressAndPort> isAlive)
     {
         this.hintsDirectory = hintsDirectory;
         this.isPaused = isPaused;
@@ -154,7 +154,7 @@ final class HintsDispatchExecutor
         public void run()
         {
             UUID hostId = hostIdSupplier.get();
-            InetAddress address = 
StorageService.instance.getEndpointForHostId(hostId);
+            InetAddressAndPort address = 
StorageService.instance.getEndpointForHostId(hostId);
             logger.info("Transferring all hints to {}: {}", address, hostId);
             if (transfer(hostId))
                 return;
@@ -257,7 +257,7 @@ final class HintsDispatchExecutor
         {
             logger.trace("Dispatching hints file {}", descriptor.fileName());
 
-            InetAddress address = 
StorageService.instance.getEndpointForHostId(hostId);
+            InetAddressAndPort address = 
StorageService.instance.getEndpointForHostId(hostId);
             if (address != null)
                 return deliver(descriptor, address);
 
@@ -266,7 +266,7 @@ final class HintsDispatchExecutor
             return true;
         }
 
-        private boolean deliver(HintsDescriptor descriptor, InetAddress 
address)
+        private boolean deliver(HintsDescriptor descriptor, InetAddressAndPort 
address)
         {
             File file = new File(hintsDirectory, descriptor.fileName());
             InputPosition offset = store.getDispatchOffset(descriptor);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/hints/HintsDispatchTrigger.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsDispatchTrigger.java 
b/src/java/org/apache/cassandra/hints/HintsDispatchTrigger.java
index 34d1eb2..ca38c0c 100644
--- a/src/java/org/apache/cassandra/hints/HintsDispatchTrigger.java
+++ b/src/java/org/apache/cassandra/hints/HintsDispatchTrigger.java
@@ -23,7 +23,7 @@ import org.apache.cassandra.gms.ApplicationState;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.schema.Schema;
 
-import static org.apache.cassandra.utils.FBUtilities.getBroadcastAddress;
+import static 
org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort;
 
 /**
  * A simple dispatch trigger that's being run every 10 seconds.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/hints/HintsDispatcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsDispatcher.java 
b/src/java/org/apache/cassandra/hints/HintsDispatcher.java
index 323eeb1..d0d9aac 100644
--- a/src/java/org/apache/cassandra/hints/HintsDispatcher.java
+++ b/src/java/org/apache/cassandra/hints/HintsDispatcher.java
@@ -18,7 +18,6 @@
 package org.apache.cassandra.hints;
 
 import java.io.File;
-import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.TimeUnit;
@@ -31,6 +30,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.monitoring.ApproximateTime;
 import org.apache.cassandra.exceptions.RequestFailureReason;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.metrics.HintsServiceMetrics;
 import org.apache.cassandra.net.IAsyncCallbackWithFailure;
 import org.apache.cassandra.net.MessageIn;
@@ -51,13 +51,13 @@ final class HintsDispatcher implements AutoCloseable
 
     private final HintsReader reader;
     private final UUID hostId;
-    private final InetAddress address;
+    private final InetAddressAndPort address;
     private final int messagingVersion;
     private final BooleanSupplier abortRequested;
 
     private InputPosition currentPagePosition;
 
-    private HintsDispatcher(HintsReader reader, UUID hostId, InetAddress 
address, int messagingVersion, BooleanSupplier abortRequested)
+    private HintsDispatcher(HintsReader reader, UUID hostId, 
InetAddressAndPort address, int messagingVersion, BooleanSupplier 
abortRequested)
     {
         currentPagePosition = null;
 
@@ -68,7 +68,7 @@ final class HintsDispatcher implements AutoCloseable
         this.abortRequested = abortRequested;
     }
 
-    static HintsDispatcher create(File file, RateLimiter rateLimiter, 
InetAddress address, UUID hostId, BooleanSupplier abortRequested)
+    static HintsDispatcher create(File file, RateLimiter rateLimiter, 
InetAddressAndPort address, UUID hostId, BooleanSupplier abortRequested)
     {
         int messagingVersion = MessagingService.instance().getVersion(address);
         return new HintsDispatcher(HintsReader.open(file, rateLimiter), 
hostId, address, messagingVersion, abortRequested);
@@ -228,7 +228,7 @@ final class HintsDispatcher implements AutoCloseable
             return timedOut ? Outcome.TIMEOUT : outcome;
         }
 
-        public void onFailure(InetAddress from, RequestFailureReason 
failureReason)
+        public void onFailure(InetAddressAndPort from, RequestFailureReason 
failureReason)
         {
             outcome = Outcome.FAILURE;
             condition.signalAll();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/hints/HintsService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsService.java 
b/src/java/org/apache/cassandra/hints/HintsService.java
index 3d82c02..5c331d0 100644
--- a/src/java/org/apache/cassandra/hints/HintsService.java
+++ b/src/java/org/apache/cassandra/hints/HintsService.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.hints;
 
 import java.io.File;
 import java.lang.management.ManagementFactory;
-import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.Collections;
 import java.util.UUID;
@@ -40,6 +39,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.ParameterizedClass;
 import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.gms.IFailureDetector;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.metrics.HintedHandoffMetrics;
 import org.apache.cassandra.metrics.StorageMetrics;
 import org.apache.cassandra.dht.Token;
@@ -267,10 +267,10 @@ public final class HintsService implements 
HintsServiceMBean
      */
     public void deleteAllHintsForEndpoint(String address)
     {
-        InetAddress target;
+        InetAddressAndPort target;
         try
         {
-            target = InetAddress.getByName(address);
+            target = InetAddressAndPort.getByName(address);
         }
         catch (UnknownHostException e)
         {
@@ -284,7 +284,7 @@ public final class HintsService implements HintsServiceMBean
      *
      * @param target inet address of the target node
      */
-    public void deleteAllHintsForEndpoint(InetAddress target)
+    public void deleteAllHintsForEndpoint(InetAddressAndPort target)
     {
         UUID hostId = StorageService.instance.getHostIdForEndpoint(target);
         if (hostId == null)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/hints/HintsStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsStore.java 
b/src/java/org/apache/cassandra/hints/HintsStore.java
index 3572172..bbf57f5 100644
--- a/src/java/org/apache/cassandra/hints/HintsStore.java
+++ b/src/java/org/apache/cassandra/hints/HintsStore.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.hints;
 
 import java.io.File;
 import java.io.IOException;
-import java.net.InetAddress;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedDeque;
@@ -31,6 +30,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.SyncUtil;
 
@@ -77,14 +77,14 @@ final class HintsStore
         return new HintsStore(hostId, hintsDirectory, writerParams, 
descriptors);
     }
 
-    InetAddress address()
+    InetAddressAndPort address()
     {
         return StorageService.instance.getEndpointForHostId(hostId);
     }
 
     boolean isLive()
     {
-        InetAddress address = address();
+        InetAddressAndPort address = address();
         return address != null && FailureDetector.instance.isAlive(address);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/io/DummyByteVersionedSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/DummyByteVersionedSerializer.java 
b/src/java/org/apache/cassandra/io/DummyByteVersionedSerializer.java
new file mode 100644
index 0000000..d82ff7d
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/DummyByteVersionedSerializer.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io;
+
+import java.io.IOException;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.net.MessagingService;
+
+/**
+ * Serializes a dummy byte that can't be set. Will always write 0 and return 0 
in a correctly formed message.
+ */
+public class DummyByteVersionedSerializer implements 
IVersionedSerializer<byte[]>
+{
+    public static final DummyByteVersionedSerializer instance = new 
DummyByteVersionedSerializer();
+
+    private DummyByteVersionedSerializer() {}
+
+    public void serialize(byte[] bytes, DataOutputPlus out, int version) 
throws IOException
+    {
+        Preconditions.checkArgument(bytes == MessagingService.ONE_BYTE);
+        out.write(0);
+    }
+
+    public byte[] deserialize(DataInputPlus in, int version) throws IOException
+    {
+        assert(0 == in.readByte());
+        return MessagingService.ONE_BYTE;
+    }
+
+    public long serializedSize(byte[] bytes, int version)
+    {
+        //Payload
+        return 1;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/io/ShortVersionedSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/ShortVersionedSerializer.java 
b/src/java/org/apache/cassandra/io/ShortVersionedSerializer.java
new file mode 100644
index 0000000..8731f4c
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/ShortVersionedSerializer.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io;
+
+import java.io.IOException;
+
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+
+public class ShortVersionedSerializer implements IVersionedSerializer<Short>
+{
+
+    public static final ShortVersionedSerializer instance = new 
ShortVersionedSerializer();
+
+    private ShortVersionedSerializer() {}
+
+    public void serialize(Short aShort, DataOutputPlus out, int version) 
throws IOException
+    {
+        out.writeShort(aShort);
+    }
+
+    public Short deserialize(DataInputPlus in, int version) throws IOException
+    {
+        return in.readShort();
+    }
+
+    public long serializedSize(Short aShort, int version)
+    {
+        return 2;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java 
b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index 9fb3059..7d77ad5 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -19,12 +19,12 @@ package org.apache.cassandra.io.sstable;
 
 import java.io.File;
 import java.io.IOException;
-import java.net.InetAddress;
 import java.util.*;
 
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
 
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.io.FSError;
 import org.apache.cassandra.schema.TableMetadataRef;
 import org.apache.cassandra.db.Directories;
@@ -32,7 +32,6 @@ import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.streaming.*;
 import org.apache.cassandra.utils.OutputHandler;
 import org.apache.cassandra.utils.Pair;
@@ -50,10 +49,10 @@ public class SSTableLoader implements StreamEventHandler
     private final Client client;
     private final int connectionsPerHost;
     private final OutputHandler outputHandler;
-    private final Set<InetAddress> failedHosts = new HashSet<>();
+    private final Set<InetAddressAndPort> failedHosts = new HashSet<>();
 
     private final List<SSTableReader> sstables = new ArrayList<>();
-    private final Multimap<InetAddress, 
StreamSession.SSTableStreamingSections> streamingDetails = 
HashMultimap.create();
+    private final Multimap<InetAddressAndPort, 
StreamSession.SSTableStreamingSections> streamingDetails = 
HashMultimap.create();
 
     public SSTableLoader(File directory, Client client, OutputHandler 
outputHandler)
     {
@@ -70,7 +69,7 @@ public class SSTableLoader implements StreamEventHandler
     }
 
     @SuppressWarnings("resource")
-    protected Collection<SSTableReader> openSSTables(final Map<InetAddress, 
Collection<Range<Token>>> ranges)
+    protected Collection<SSTableReader> openSSTables(final 
Map<InetAddressAndPort, Collection<Range<Token>>> ranges)
     {
         outputHandler.output("Opening sstables and calculating sections to 
stream");
 
@@ -124,9 +123,9 @@ public class SSTableLoader implements StreamEventHandler
 
                                               // calculate the sstable 
sections to stream as well as the estimated number of
                                               // keys per host
-                                              for (Map.Entry<InetAddress, 
Collection<Range<Token>>> entry : ranges.entrySet())
+                                              for 
(Map.Entry<InetAddressAndPort, Collection<Range<Token>>> entry : 
ranges.entrySet())
                                               {
-                                                  InetAddress endpoint = 
entry.getKey();
+                                                  InetAddressAndPort endpoint 
= entry.getKey();
                                                   Collection<Range<Token>> 
tokenRanges = entry.getValue();
 
                                                   List<Pair<Long, Long>> 
sstableSections = sstable.getPositionsForRanges(tokenRanges);
@@ -153,17 +152,17 @@ public class SSTableLoader implements StreamEventHandler
 
     public StreamResultFuture stream()
     {
-        return stream(Collections.<InetAddress>emptySet());
+        return stream(Collections.<InetAddressAndPort>emptySet());
     }
 
-    public StreamResultFuture stream(Set<InetAddress> toIgnore, 
StreamEventHandler... listeners)
+    public StreamResultFuture stream(Set<InetAddressAndPort> toIgnore, 
StreamEventHandler... listeners)
     {
         client.init(keyspace);
         outputHandler.output("Established connection to initial hosts");
 
         StreamPlan plan = new StreamPlan(StreamOperation.BULK_LOAD, 
connectionsPerHost, false, false, null, 
PreviewKind.NONE).connectionFactory(client.getConnectionFactory());
 
-        Map<InetAddress, Collection<Range<Token>>> endpointToRanges = 
client.getEndpointToRangesMap();
+        Map<InetAddressAndPort, Collection<Range<Token>>> endpointToRanges = 
client.getEndpointToRangesMap();
         openSSTables(endpointToRanges);
         if (sstables.isEmpty())
         {
@@ -173,9 +172,9 @@ public class SSTableLoader implements StreamEventHandler
 
         outputHandler.output(String.format("Streaming relevant part of %s to 
%s", names(sstables), endpointToRanges.keySet()));
 
-        for (Map.Entry<InetAddress, Collection<Range<Token>>> entry : 
endpointToRanges.entrySet())
+        for (Map.Entry<InetAddressAndPort, Collection<Range<Token>>> entry : 
endpointToRanges.entrySet())
         {
-            InetAddress remote = entry.getKey();
+            InetAddressAndPort remote = entry.getKey();
             if (toIgnore.contains(remote))
                 continue;
 
@@ -232,14 +231,14 @@ public class SSTableLoader implements StreamEventHandler
         return builder.toString();
     }
 
-    public Set<InetAddress> getFailedHosts()
+    public Set<InetAddressAndPort> getFailedHosts()
     {
         return failedHosts;
     }
 
     public static abstract class Client
     {
-        private final Map<InetAddress, Collection<Range<Token>>> 
endpointToRanges = new HashMap<>();
+        private final Map<InetAddressAndPort, Collection<Range<Token>>> 
endpointToRanges = new HashMap<>();
 
         /**
          * Initialize the client.
@@ -281,12 +280,12 @@ public class SSTableLoader implements StreamEventHandler
             throw new RuntimeException();
         }
 
-        public Map<InetAddress, Collection<Range<Token>>> 
getEndpointToRangesMap()
+        public Map<InetAddressAndPort, Collection<Range<Token>>> 
getEndpointToRangesMap()
         {
             return endpointToRanges;
         }
 
-        protected void addRangeForEndpoint(Range<Token> range, InetAddress 
endpoint)
+        protected void addRangeForEndpoint(Range<Token> range, 
InetAddressAndPort endpoint)
         {
             Collection<Range<Token>> ranges = endpointToRanges.get(endpoint);
             if (ranges == null)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java 
b/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java
index 546d15e..2ee8eea 100644
--- a/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java
+++ b/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java
@@ -17,14 +17,13 @@
  */
 package org.apache.cassandra.locator;
 
-import java.net.InetAddress;
 import java.util.*;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 
 public abstract class AbstractEndpointSnitch implements IEndpointSnitch
 {
-    public abstract int compareEndpoints(InetAddress target, InetAddress a1, 
InetAddress a2);
+    public abstract int compareEndpoints(InetAddressAndPort target, 
InetAddressAndPort a1, InetAddressAndPort a2);
 
     /**
      * Sorts the <tt>Collection</tt> of node addresses by proximity to the 
given address
@@ -32,9 +31,9 @@ public abstract class AbstractEndpointSnitch implements 
IEndpointSnitch
      * @param unsortedAddress the nodes to sort
      * @return a new sorted <tt>List</tt>
      */
-    public List<InetAddress> getSortedListByProximity(InetAddress address, 
Collection<InetAddress> unsortedAddress)
+    public List<InetAddressAndPort> 
getSortedListByProximity(InetAddressAndPort address, 
Collection<InetAddressAndPort> unsortedAddress)
     {
-        List<InetAddress> preferred = new 
ArrayList<InetAddress>(unsortedAddress);
+        List<InetAddressAndPort> preferred = new ArrayList<>(unsortedAddress);
         sortByProximity(address, preferred);
         return preferred;
     }
@@ -44,11 +43,11 @@ public abstract class AbstractEndpointSnitch implements 
IEndpointSnitch
      * @param address the address to sort the proximity by
      * @param addresses the nodes to sort
      */
-    public void sortByProximity(final InetAddress address, List<InetAddress> 
addresses)
+    public void sortByProximity(final InetAddressAndPort address, 
List<InetAddressAndPort> addresses)
     {
-        Collections.sort(addresses, new Comparator<InetAddress>()
+        Collections.sort(addresses, new Comparator<InetAddressAndPort>()
         {
-            public int compare(InetAddress a1, InetAddress a2)
+            public int compare(InetAddressAndPort a1, InetAddressAndPort a2)
             {
                 return compareEndpoints(address, a1, a2);
             }
@@ -60,7 +59,7 @@ public abstract class AbstractEndpointSnitch implements 
IEndpointSnitch
         // noop by default
     }
 
-    public boolean isWorthMergingForRangeQuery(List<InetAddress> merged, 
List<InetAddress> l1, List<InetAddress> l2)
+    public boolean isWorthMergingForRangeQuery(List<InetAddressAndPort> 
merged, List<InetAddressAndPort> l1, List<InetAddressAndPort> l2)
     {
         // Querying remote DC is likely to be an order of magnitude slower than
         // querying locally, so 2 queries to local nodes is likely to still be
@@ -71,10 +70,10 @@ public abstract class AbstractEndpointSnitch implements 
IEndpointSnitch
              : true;
     }
 
-    private boolean hasRemoteNode(List<InetAddress> l)
+    private boolean hasRemoteNode(List<InetAddressAndPort> l)
     {
         String localDc = DatabaseDescriptor.getLocalDataCenter();
-        for (InetAddress ep : l)
+        for (InetAddressAndPort ep : l)
         {
             if (!localDc.equals(getDatacenter(ep)))
                 return true;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/locator/AbstractNetworkTopologySnitch.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/locator/AbstractNetworkTopologySnitch.java 
b/src/java/org/apache/cassandra/locator/AbstractNetworkTopologySnitch.java
index b5606d6..e91f6ac 100644
--- a/src/java/org/apache/cassandra/locator/AbstractNetworkTopologySnitch.java
+++ b/src/java/org/apache/cassandra/locator/AbstractNetworkTopologySnitch.java
@@ -17,8 +17,6 @@
  */
 package org.apache.cassandra.locator;
 
-import java.net.InetAddress;
-
 /**
  * An endpoint snitch tells Cassandra information about network topology that 
it can use to route
  * requests more efficiently.
@@ -30,16 +28,16 @@ public abstract class AbstractNetworkTopologySnitch extends 
AbstractEndpointSnit
      * @param endpoint a specified endpoint
      * @return string of rack
      */
-    abstract public String getRack(InetAddress endpoint);
+    abstract public String getRack(InetAddressAndPort endpoint);
 
     /**
      * Return the data center for which an endpoint resides in
      * @param endpoint a specified endpoint
      * @return string of data center
      */
-    abstract public String getDatacenter(InetAddress endpoint);
+    abstract public String getDatacenter(InetAddressAndPort endpoint);
 
-    public int compareEndpoints(InetAddress address, InetAddress a1, 
InetAddress a2)
+    public int compareEndpoints(InetAddressAndPort address, InetAddressAndPort 
a1, InetAddressAndPort a2)
     {
         if (address.equals(a1) && !address.equals(a2))
             return -1;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java 
b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
index c3498d9..3e9b5bb 100644
--- a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
+++ b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.locator;
 
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
-import java.net.InetAddress;
 import java.util.*;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -74,9 +73,9 @@ public abstract class AbstractReplicationStrategy
         // lazy-initialize keyspace itself since we don't create them until 
after the replication strategies
     }
 
-    private final Map<Token, ArrayList<InetAddress>> cachedEndpoints = new 
NonBlockingHashMap<Token, ArrayList<InetAddress>>();
+    private final Map<Token, ArrayList<InetAddressAndPort>> cachedEndpoints = 
new NonBlockingHashMap<Token, ArrayList<InetAddressAndPort>>();
 
-    public ArrayList<InetAddress> getCachedEndpoints(Token t)
+    public ArrayList<InetAddressAndPort> getCachedEndpoints(Token t)
     {
         long lastVersion = tokenMetadata.getRingVersion();
 
@@ -103,21 +102,21 @@ public abstract class AbstractReplicationStrategy
      * @param searchPosition the position the natural endpoints are requested 
for
      * @return a copy of the natural endpoints for the given token
      */
-    public ArrayList<InetAddress> getNaturalEndpoints(RingPosition 
searchPosition)
+    public ArrayList<InetAddressAndPort> getNaturalEndpoints(RingPosition 
searchPosition)
     {
         Token searchToken = searchPosition.getToken();
         Token keyToken = 
TokenMetadata.firstToken(tokenMetadata.sortedTokens(), searchToken);
-        ArrayList<InetAddress> endpoints = getCachedEndpoints(keyToken);
+        ArrayList<InetAddressAndPort> endpoints = getCachedEndpoints(keyToken);
         if (endpoints == null)
         {
             TokenMetadata tm = tokenMetadata.cachedOnlyTokenMap();
             // if our cache got invalidated, it's possible there is a new 
token to account for too
             keyToken = TokenMetadata.firstToken(tm.sortedTokens(), 
searchToken);
-            endpoints = new 
ArrayList<InetAddress>(calculateNaturalEndpoints(searchToken, tm));
+            endpoints = new 
ArrayList<InetAddressAndPort>(calculateNaturalEndpoints(searchToken, tm));
             cachedEndpoints.put(keyToken, endpoints);
         }
 
-        return new ArrayList<InetAddress>(endpoints);
+        return new ArrayList<InetAddressAndPort>(endpoints);
     }
 
     /**
@@ -128,10 +127,10 @@ public abstract class AbstractReplicationStrategy
      * @param searchToken the token the natural endpoints are requested for
      * @return a copy of the natural endpoints for the given token
      */
-    public abstract List<InetAddress> calculateNaturalEndpoints(Token 
searchToken, TokenMetadata tokenMetadata);
+    public abstract List<InetAddressAndPort> calculateNaturalEndpoints(Token 
searchToken, TokenMetadata tokenMetadata);
 
-    public <T> AbstractWriteResponseHandler<T> 
getWriteResponseHandler(Collection<InetAddress> naturalEndpoints,
-                                                                       
Collection<InetAddress> pendingEndpoints,
+    public <T> AbstractWriteResponseHandler<T> 
getWriteResponseHandler(Collection<InetAddressAndPort> naturalEndpoints,
+                                                                       
Collection<InetAddressAndPort> pendingEndpoints,
                                                                        
ConsistencyLevel consistency_level,
                                                                        
Runnable callback,
                                                                        
WriteType writeType,
@@ -140,8 +139,8 @@ public abstract class AbstractReplicationStrategy
         return getWriteResponseHandler(naturalEndpoints, pendingEndpoints, 
consistency_level, callback, writeType, queryStartNanoTime, 
DatabaseDescriptor.getIdealConsistencyLevel());
     }
 
-    public <T> AbstractWriteResponseHandler<T> 
getWriteResponseHandler(Collection<InetAddress> naturalEndpoints,
-                                                                       
Collection<InetAddress> pendingEndpoints,
+    public <T> AbstractWriteResponseHandler<T> 
getWriteResponseHandler(Collection<InetAddressAndPort> naturalEndpoints,
+                                                                       
Collection<InetAddressAndPort> pendingEndpoints,
                                                                        
ConsistencyLevel consistency_level,
                                                                        
Runnable callback,
                                                                        
WriteType writeType,
@@ -211,14 +210,14 @@ public abstract class AbstractReplicationStrategy
      * (fixing this would probably require merging tokenmetadata into 
replicationstrategy,
      * so we could cache/invalidate cleanly.)
      */
-    public Multimap<InetAddress, Range<Token>> getAddressRanges(TokenMetadata 
metadata)
+    public Multimap<InetAddressAndPort, Range<Token>> 
getAddressRanges(TokenMetadata metadata)
     {
-        Multimap<InetAddress, Range<Token>> map = HashMultimap.create();
+        Multimap<InetAddressAndPort, Range<Token>> map = HashMultimap.create();
 
         for (Token token : metadata.sortedTokens())
         {
             Range<Token> range = metadata.getPrimaryRangeFor(token);
-            for (InetAddress ep : calculateNaturalEndpoints(token, metadata))
+            for (InetAddressAndPort ep : calculateNaturalEndpoints(token, 
metadata))
             {
                 map.put(ep, range);
             }
@@ -227,14 +226,14 @@ public abstract class AbstractReplicationStrategy
         return map;
     }
 
-    public Multimap<Range<Token>, InetAddress> getRangeAddresses(TokenMetadata 
metadata)
+    public Multimap<Range<Token>, InetAddressAndPort> 
getRangeAddresses(TokenMetadata metadata)
     {
-        Multimap<Range<Token>, InetAddress> map = HashMultimap.create();
+        Multimap<Range<Token>, InetAddressAndPort> map = HashMultimap.create();
 
         for (Token token : metadata.sortedTokens())
         {
             Range<Token> range = metadata.getPrimaryRangeFor(token);
-            for (InetAddress ep : calculateNaturalEndpoints(token, metadata))
+            for (InetAddressAndPort ep : calculateNaturalEndpoints(token, 
metadata))
             {
                 map.put(range, ep);
             }
@@ -243,17 +242,17 @@ public abstract class AbstractReplicationStrategy
         return map;
     }
 
-    public Multimap<InetAddress, Range<Token>> getAddressRanges()
+    public Multimap<InetAddressAndPort, Range<Token>> getAddressRanges()
     {
         return getAddressRanges(tokenMetadata.cloneOnlyTokenMap());
     }
 
-    public Collection<Range<Token>> getPendingAddressRanges(TokenMetadata 
metadata, Token pendingToken, InetAddress pendingAddress)
+    public Collection<Range<Token>> getPendingAddressRanges(TokenMetadata 
metadata, Token pendingToken, InetAddressAndPort pendingAddress)
     {
         return getPendingAddressRanges(metadata, Arrays.asList(pendingToken), 
pendingAddress);
     }
 
-    public Collection<Range<Token>> getPendingAddressRanges(TokenMetadata 
metadata, Collection<Token> pendingTokens, InetAddress pendingAddress)
+    public Collection<Range<Token>> getPendingAddressRanges(TokenMetadata 
metadata, Collection<Token> pendingTokens, InetAddressAndPort pendingAddress)
     {
         TokenMetadata temp = metadata.cloneOnlyTokenMap();
         temp.updateNormalTokens(pendingTokens, pendingAddress);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/locator/CloudstackSnitch.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/CloudstackSnitch.java 
b/src/java/org/apache/cassandra/locator/CloudstackSnitch.java
index ec2e87e..be6d3c4 100644
--- a/src/java/org/apache/cassandra/locator/CloudstackSnitch.java
+++ b/src/java/org/apache/cassandra/locator/CloudstackSnitch.java
@@ -24,7 +24,6 @@ import java.io.FileReader;
 import java.io.IOException;
 import java.io.File;
 import java.net.HttpURLConnection;
-import java.net.InetAddress;
 import java.net.URL;
 import java.net.URI;
 import java.nio.charset.StandardCharsets;
@@ -56,7 +55,7 @@ public class CloudstackSnitch extends 
AbstractNetworkTopologySnitch
     protected static final Logger logger = 
LoggerFactory.getLogger(CloudstackSnitch.class);
     protected static final String ZONE_NAME_QUERY_URI = 
"/latest/meta-data/availability-zone";
 
-    private Map<InetAddress, Map<String, String>> savedEndpoints;
+    private Map<InetAddressAndPort, Map<String, String>> savedEndpoints;
 
     private static final String DEFAULT_DC = "UNKNOWN-DC";
     private static final String DEFAULT_RACK = "UNKNOWN-RACK";
@@ -83,9 +82,9 @@ public class CloudstackSnitch extends 
AbstractNetworkTopologySnitch
         csZoneRack = zone_parts[2];
     }
 
-    public String getRack(InetAddress endpoint)
+    public String getRack(InetAddressAndPort endpoint)
     {
-        if (endpoint.equals(FBUtilities.getBroadcastAddress()))
+        if (endpoint.equals(FBUtilities.getBroadcastAddressAndPort()))
             return csZoneRack;
         EndpointState state = 
Gossiper.instance.getEndpointStateForEndpoint(endpoint);
         if (state == null || state.getApplicationState(ApplicationState.RACK) 
== null)
@@ -99,9 +98,9 @@ public class CloudstackSnitch extends 
AbstractNetworkTopologySnitch
         return state.getApplicationState(ApplicationState.RACK).value;
     }
 
-    public String getDatacenter(InetAddress endpoint)
+    public String getDatacenter(InetAddressAndPort endpoint)
     {
-        if (endpoint.equals(FBUtilities.getBroadcastAddress()))
+        if (endpoint.equals(FBUtilities.getBroadcastAddressAndPort()))
             return csZoneDc;
         EndpointState state = 
Gossiper.instance.getEndpointStateForEndpoint(endpoint);
         if (state == null || state.getApplicationState(ApplicationState.DC) == 
null)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java 
b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
index 42fc26c..b9c9ba0 100644
--- a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
+++ b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
@@ -25,6 +25,7 @@ import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import com.codahale.metrics.ExponentiallyDecayingReservoir;
 import javax.management.MBeanServer;
@@ -63,8 +64,8 @@ public class DynamicEndpointSnitch extends 
AbstractEndpointSnitch implements ILa
     private String mbeanName;
     private boolean registered = false;
 
-    private volatile HashMap<InetAddress, Double> scores = new HashMap<>();
-    private final ConcurrentHashMap<InetAddress, 
ExponentiallyDecayingReservoir> samples = new ConcurrentHashMap<>();
+    private volatile HashMap<InetAddressAndPort, Double> scores = new 
HashMap<>();
+    private final ConcurrentHashMap<InetAddressAndPort, 
ExponentiallyDecayingReservoir> samples = new ConcurrentHashMap<>();
 
     public final IEndpointSnitch subsnitch;
 
@@ -174,27 +175,27 @@ public class DynamicEndpointSnitch extends 
AbstractEndpointSnitch implements ILa
         subsnitch.gossiperStarting();
     }
 
-    public String getRack(InetAddress endpoint)
+    public String getRack(InetAddressAndPort endpoint)
     {
         return subsnitch.getRack(endpoint);
     }
 
-    public String getDatacenter(InetAddress endpoint)
+    public String getDatacenter(InetAddressAndPort endpoint)
     {
         return subsnitch.getDatacenter(endpoint);
     }
 
-    public List<InetAddress> getSortedListByProximity(final InetAddress 
address, Collection<InetAddress> addresses)
+    public List<InetAddressAndPort> getSortedListByProximity(final 
InetAddressAndPort address, Collection<InetAddressAndPort> addresses)
     {
-        List<InetAddress> list = new ArrayList<InetAddress>(addresses);
+        List<InetAddressAndPort> list = new ArrayList<>(addresses);
         sortByProximity(address, list);
         return list;
     }
 
     @Override
-    public void sortByProximity(final InetAddress address, List<InetAddress> 
addresses)
+    public void sortByProximity(final InetAddressAndPort address, 
List<InetAddressAndPort> addresses)
     {
-        assert address.equals(FBUtilities.getBroadcastAddress()); // we only 
know about ourself
+        assert address.equals(FBUtilities.getBroadcastAddressAndPort()); // we 
only know about ourself
         if (dynamicBadnessThreshold == 0)
         {
             sortByProximityWithScore(address, addresses);
@@ -205,32 +206,32 @@ public class DynamicEndpointSnitch extends 
AbstractEndpointSnitch implements ILa
         }
     }
 
-    private void sortByProximityWithScore(final InetAddress address, 
List<InetAddress> addresses)
+    private void sortByProximityWithScore(final InetAddressAndPort address, 
List<InetAddressAndPort> addresses)
     {
         // Scores can change concurrently from a call to this method. But 
Collections.sort() expects
         // its comparator to be "stable", that is 2 endpoint should compare 
the same way for the duration
         // of the sort() call. As we copy the scores map on write, it is thus 
enough to alias the current
         // version of it during this call.
-        final HashMap<InetAddress, Double> scores = this.scores;
-        Collections.sort(addresses, new Comparator<InetAddress>()
+        final HashMap<InetAddressAndPort, Double> scores = this.scores;
+        Collections.sort(addresses, new Comparator<InetAddressAndPort>()
         {
-            public int compare(InetAddress a1, InetAddress a2)
+            public int compare(InetAddressAndPort a1, InetAddressAndPort a2)
             {
                 return compareEndpoints(address, a1, a2, scores);
             }
         });
     }
 
-    private void sortByProximityWithBadness(final InetAddress address, 
List<InetAddress> addresses)
+    private void sortByProximityWithBadness(final InetAddressAndPort address, 
List<InetAddressAndPort> addresses)
     {
         if (addresses.size() < 2)
             return;
 
         subsnitch.sortByProximity(address, addresses);
-        HashMap<InetAddress, Double> scores = this.scores; // Make sure the 
score don't change in the middle of the loop below
+        HashMap<InetAddressAndPort, Double> scores = this.scores; // Make sure 
the score don't change in the middle of the loop below
                                                            // (which wouldn't 
really matter here but its cleaner that way).
         ArrayList<Double> subsnitchOrderedScores = new 
ArrayList<>(addresses.size());
-        for (InetAddress inet : addresses)
+        for (InetAddressAndPort inet : addresses)
         {
             Double score = scores.get(inet);
             if (score == null)
@@ -256,7 +257,7 @@ public class DynamicEndpointSnitch extends 
AbstractEndpointSnitch implements ILa
     }
 
     // Compare endpoints given an immutable snapshot of the scores
-    private int compareEndpoints(InetAddress target, InetAddress a1, 
InetAddress a2, Map<InetAddress, Double> scores)
+    private int compareEndpoints(InetAddressAndPort target, InetAddressAndPort 
a1, InetAddressAndPort a2, Map<InetAddressAndPort, Double> scores)
     {
         Double scored1 = scores.get(a1);
         Double scored2 = scores.get(a2);
@@ -279,7 +280,7 @@ public class DynamicEndpointSnitch extends 
AbstractEndpointSnitch implements ILa
             return 1;
     }
 
-    public int compareEndpoints(InetAddress target, InetAddress a1, 
InetAddress a2)
+    public int compareEndpoints(InetAddressAndPort target, InetAddressAndPort 
a1, InetAddressAndPort a2)
     {
         // That function is fundamentally unsafe because the scores can change 
at any time and so the result of that
         // method is not stable for identical arguments. This is why we don't 
rely on super.sortByProximity() in
@@ -287,7 +288,7 @@ public class DynamicEndpointSnitch extends 
AbstractEndpointSnitch implements ILa
         throw new UnsupportedOperationException("You shouldn't wrap the 
DynamicEndpointSnitch (within itself or otherwise)");
     }
 
-    public void receiveTiming(InetAddress host, long latency) // this is cheap
+    public void receiveTiming(InetAddressAndPort host, long latency) // this 
is cheap
     {
         ExponentiallyDecayingReservoir sample = samples.get(host);
         if (sample == null)
@@ -315,23 +316,23 @@ public class DynamicEndpointSnitch extends 
AbstractEndpointSnitch implements ILa
         }
         double maxLatency = 1;
 
-        Map<InetAddress, Snapshot> snapshots = new HashMap<>(samples.size());
-        for (Map.Entry<InetAddress, ExponentiallyDecayingReservoir> entry : 
samples.entrySet())
+        Map<InetAddressAndPort, Snapshot> snapshots = new 
HashMap<>(samples.size());
+        for (Map.Entry<InetAddressAndPort, ExponentiallyDecayingReservoir> 
entry : samples.entrySet())
         {
             snapshots.put(entry.getKey(), entry.getValue().getSnapshot());
         }
 
         // We're going to weight the latency for each host against the worst 
one we see, to
         // arrive at sort of a 'badness percentage' for them. First, find the 
worst for each:
-        HashMap<InetAddress, Double> newScores = new HashMap<>();
-        for (Map.Entry<InetAddress, Snapshot> entry : snapshots.entrySet())
+        HashMap<InetAddressAndPort, Double> newScores = new HashMap<>();
+        for (Map.Entry<InetAddressAndPort, Snapshot> entry : 
snapshots.entrySet())
         {
             double mean = entry.getValue().getMedian();
             if (mean > maxLatency)
                 maxLatency = mean;
         }
         // now make another pass to do the weighting based on the maximums we 
found before
-        for (Map.Entry<InetAddress, Snapshot> entry : snapshots.entrySet())
+        for (Map.Entry<InetAddressAndPort, Snapshot> entry : 
snapshots.entrySet())
         {
             double score = entry.getValue().getMedian() / maxLatency;
             // finally, add the severity without any weighting, since hosts 
scale this relative to their own load and the size of the task causing the 
severity.
@@ -351,6 +352,11 @@ public class DynamicEndpointSnitch extends 
AbstractEndpointSnitch implements ILa
 
     public Map<InetAddress, Double> getScores()
     {
+        return scores.entrySet().stream().collect(Collectors.toMap(address -> 
address.getKey().address, Map.Entry::getValue));
+    }
+
+    public Map<InetAddressAndPort, Double> getScoresWithPort()
+    {
         return scores;
     }
 
@@ -374,7 +380,7 @@ public class DynamicEndpointSnitch extends 
AbstractEndpointSnitch implements ILa
 
     public List<Double> dumpTimings(String hostname) throws 
UnknownHostException
     {
-        InetAddress host = InetAddress.getByName(hostname);
+        InetAddressAndPort host = InetAddressAndPort.getByName(hostname);
         ArrayList<Double> timings = new ArrayList<Double>();
         ExponentiallyDecayingReservoir sample = samples.get(host);
         if (sample != null)
@@ -390,7 +396,7 @@ public class DynamicEndpointSnitch extends 
AbstractEndpointSnitch implements ILa
         Gossiper.instance.addLocalApplicationState(ApplicationState.SEVERITY, 
StorageService.instance.valueFactory.severity(severity));
     }
 
-    private double getSeverity(InetAddress endpoint)
+    private double getSeverity(InetAddressAndPort endpoint)
     {
         EndpointState state = 
Gossiper.instance.getEndpointStateForEndpoint(endpoint);
         if (state == null)
@@ -405,10 +411,10 @@ public class DynamicEndpointSnitch extends 
AbstractEndpointSnitch implements ILa
 
     public double getSeverity()
     {
-        return getSeverity(FBUtilities.getBroadcastAddress());
+        return getSeverity(FBUtilities.getBroadcastAddressAndPort());
     }
 
-    public boolean isWorthMergingForRangeQuery(List<InetAddress> merged, 
List<InetAddress> l1, List<InetAddress> l2)
+    public boolean isWorthMergingForRangeQuery(List<InetAddressAndPort> 
merged, List<InetAddressAndPort> l1, List<InetAddressAndPort> l2)
     {
         if (!subsnitch.isWorthMergingForRangeQuery(merged, l1, l2))
             return false;
@@ -428,10 +434,10 @@ public class DynamicEndpointSnitch extends 
AbstractEndpointSnitch implements ILa
     }
 
     // Return the max score for the endpoint in the provided list, or -1.0 if 
no node have a score.
-    private double maxScore(List<InetAddress> endpoints)
+    private double maxScore(List<InetAddressAndPort> endpoints)
     {
         double maxScore = -1.0;
-        for (InetAddress endpoint : endpoints)
+        for (InetAddressAndPort endpoint : endpoints)
         {
             Double score = scores.get(endpoint);
             if (score == null)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/locator/DynamicEndpointSnitchMBean.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitchMBean.java 
b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitchMBean.java
index bfafa75..61f0d97 100644
--- a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitchMBean.java
+++ b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitchMBean.java
@@ -24,6 +24,8 @@ import java.util.List;
 
 public interface DynamicEndpointSnitchMBean 
 {
+    public Map<InetAddressAndPort, Double> getScoresWithPort();
+    @Deprecated
     public Map<InetAddress, Double> getScores();
     public int getUpdateInterval();
     public int getResetInterval();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/locator/Ec2MultiRegionSnitch.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/Ec2MultiRegionSnitch.java 
b/src/java/org/apache/cassandra/locator/Ec2MultiRegionSnitch.java
index b32ca84..2a6c7e9 100644
--- a/src/java/org/apache/cassandra/locator/Ec2MultiRegionSnitch.java
+++ b/src/java/org/apache/cassandra/locator/Ec2MultiRegionSnitch.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.locator;
 
 import java.io.IOException;
 import java.net.InetAddress;
+import java.net.UnknownHostException;
 
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -62,6 +63,16 @@ public class Ec2MultiRegionSnitch extends Ec2Snitch
     public void gossiperStarting()
     {
         super.gossiperStarting();
+        InetAddressAndPort address;
+        try
+        {
+            address = InetAddressAndPort.getByName(localPrivateAddress);
+        }
+        catch (UnknownHostException e)
+        {
+            throw new RuntimeException(e);
+        }
+        
Gossiper.instance.addLocalApplicationState(ApplicationState.INTERNAL_ADDRESS_AND_PORT,
 StorageService.instance.valueFactory.internalAddressAndPort(address));
         
Gossiper.instance.addLocalApplicationState(ApplicationState.INTERNAL_IP, 
StorageService.instance.valueFactory.internalIP(localPrivateAddress));
         Gossiper.instance.register(new ReconnectableSnitchHelper(this, 
ec2region, true));
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/locator/Ec2Snitch.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/Ec2Snitch.java 
b/src/java/org/apache/cassandra/locator/Ec2Snitch.java
index 59eb27b..c7324c8 100644
--- a/src/java/org/apache/cassandra/locator/Ec2Snitch.java
+++ b/src/java/org/apache/cassandra/locator/Ec2Snitch.java
@@ -21,7 +21,6 @@ import java.io.DataInputStream;
 import java.io.FilterInputStream;
 import java.io.IOException;
 import java.net.HttpURLConnection;
-import java.net.InetAddress;
 import java.net.URL;
 import java.nio.charset.StandardCharsets;
 import java.util.Map;
@@ -46,7 +45,7 @@ public class Ec2Snitch extends AbstractNetworkTopologySnitch
     protected static final String ZONE_NAME_QUERY_URL = 
"http://169.254.169.254/latest/meta-data/placement/availability-zone";;
     private static final String DEFAULT_DC = "UNKNOWN-DC";
     private static final String DEFAULT_RACK = "UNKNOWN-RACK";
-    private Map<InetAddress, Map<String, String>> savedEndpoints;
+    private Map<InetAddressAndPort, Map<String, String>> savedEndpoints;
     protected String ec2zone;
     protected String ec2region;
 
@@ -92,9 +91,9 @@ public class Ec2Snitch extends AbstractNetworkTopologySnitch
         }
     }
 
-    public String getRack(InetAddress endpoint)
+    public String getRack(InetAddressAndPort endpoint)
     {
-        if (endpoint.equals(FBUtilities.getBroadcastAddress()))
+        if (endpoint.equals(FBUtilities.getBroadcastAddressAndPort()))
             return ec2zone;
         EndpointState state = 
Gossiper.instance.getEndpointStateForEndpoint(endpoint);
         if (state == null || state.getApplicationState(ApplicationState.RACK) 
== null)
@@ -108,9 +107,9 @@ public class Ec2Snitch extends AbstractNetworkTopologySnitch
         return state.getApplicationState(ApplicationState.RACK).value;
     }
 
-    public String getDatacenter(InetAddress endpoint)
+    public String getDatacenter(InetAddressAndPort endpoint)
     {
-        if (endpoint.equals(FBUtilities.getBroadcastAddress()))
+        if (endpoint.equals(FBUtilities.getBroadcastAddressAndPort()))
             return ec2region;
         EndpointState state = 
Gossiper.instance.getEndpointStateForEndpoint(endpoint);
         if (state == null || state.getApplicationState(ApplicationState.DC) == 
null)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/locator/EndpointSnitchInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/EndpointSnitchInfo.java 
b/src/java/org/apache/cassandra/locator/EndpointSnitchInfo.java
index bbfabb6..c06d765 100644
--- a/src/java/org/apache/cassandra/locator/EndpointSnitchInfo.java
+++ b/src/java/org/apache/cassandra/locator/EndpointSnitchInfo.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.locator;
 
 
 import java.lang.management.ManagementFactory;
-import java.net.InetAddress;
 import java.net.UnknownHostException;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
@@ -44,22 +43,22 @@ public class EndpointSnitchInfo implements 
EndpointSnitchInfoMBean
 
     public String getDatacenter(String host) throws UnknownHostException
     {
-        return 
DatabaseDescriptor.getEndpointSnitch().getDatacenter(InetAddress.getByName(host));
+        return 
DatabaseDescriptor.getEndpointSnitch().getDatacenter(InetAddressAndPort.getByName(host));
     }
 
     public String getRack(String host) throws UnknownHostException
     {
-        return 
DatabaseDescriptor.getEndpointSnitch().getRack(InetAddress.getByName(host));
+        return 
DatabaseDescriptor.getEndpointSnitch().getRack(InetAddressAndPort.getByName(host));
     }
 
     public String getDatacenter()
     {
-        return 
DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress());
+        return 
DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort());
     }
 
     public String getRack()
     {
-        return 
DatabaseDescriptor.getEndpointSnitch().getRack(FBUtilities.getBroadcastAddress());
+        return 
DatabaseDescriptor.getEndpointSnitch().getRack(FBUtilities.getBroadcastAddressAndPort());
     }
 
     public String getSnitchName()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/locator/GoogleCloudSnitch.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/GoogleCloudSnitch.java 
b/src/java/org/apache/cassandra/locator/GoogleCloudSnitch.java
index b4d3b19..1e1c500 100644
--- a/src/java/org/apache/cassandra/locator/GoogleCloudSnitch.java
+++ b/src/java/org/apache/cassandra/locator/GoogleCloudSnitch.java
@@ -21,7 +21,6 @@ import java.io.DataInputStream;
 import java.io.FilterInputStream;
 import java.io.IOException;
 import java.net.HttpURLConnection;
-import java.net.InetAddress;
 import java.net.URL;
 import java.nio.charset.StandardCharsets;
 import java.util.Map;
@@ -46,7 +45,7 @@ public class GoogleCloudSnitch extends 
AbstractNetworkTopologySnitch
     protected static final String ZONE_NAME_QUERY_URL = 
"http://metadata.google.internal/computeMetadata/v1/instance/zone";;
     private static final String DEFAULT_DC = "UNKNOWN-DC";
     private static final String DEFAULT_RACK = "UNKNOWN-RACK";
-    private Map<InetAddress, Map<String, String>> savedEndpoints;
+    private Map<InetAddressAndPort, Map<String, String>> savedEndpoints;
     protected String gceZone;
     protected String gceRegion;
 
@@ -94,9 +93,9 @@ public class GoogleCloudSnitch extends 
AbstractNetworkTopologySnitch
         }
     }
 
-    public String getRack(InetAddress endpoint)
+    public String getRack(InetAddressAndPort endpoint)
     {
-        if (endpoint.equals(FBUtilities.getBroadcastAddress()))
+        if (endpoint.equals(FBUtilities.getBroadcastAddressAndPort()))
             return gceZone;
         EndpointState state = 
Gossiper.instance.getEndpointStateForEndpoint(endpoint);
         if (state == null || state.getApplicationState(ApplicationState.RACK) 
== null)
@@ -110,9 +109,9 @@ public class GoogleCloudSnitch extends 
AbstractNetworkTopologySnitch
         return state.getApplicationState(ApplicationState.RACK).value;
     }
 
-    public String getDatacenter(InetAddress endpoint)
+    public String getDatacenter(InetAddressAndPort endpoint)
     {
-        if (endpoint.equals(FBUtilities.getBroadcastAddress()))
+        if (endpoint.equals(FBUtilities.getBroadcastAddressAndPort()))
             return gceRegion;
         EndpointState state = 
Gossiper.instance.getEndpointStateForEndpoint(endpoint);
         if (state == null || state.getApplicationState(ApplicationState.DC) == 
null)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/locator/GossipingPropertyFileSnitch.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/locator/GossipingPropertyFileSnitch.java 
b/src/java/org/apache/cassandra/locator/GossipingPropertyFileSnitch.java
index e2449ae..75b5685 100644
--- a/src/java/org/apache/cassandra/locator/GossipingPropertyFileSnitch.java
+++ b/src/java/org/apache/cassandra/locator/GossipingPropertyFileSnitch.java
@@ -18,7 +18,6 @@
 
 package org.apache.cassandra.locator;
 
-import java.net.InetAddress;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.Map;
 
@@ -45,7 +44,7 @@ public class GossipingPropertyFileSnitch extends 
AbstractNetworkTopologySnitch//
     private final boolean preferLocal;
     private final AtomicReference<ReconnectableSnitchHelper> 
snitchHelperReference;
 
-    private Map<InetAddress, Map<String, String>> savedEndpoints;
+    private Map<InetAddressAndPort, Map<String, String>> savedEndpoints;
     private static final String DEFAULT_DC = "UNKNOWN_DC";
     private static final String DEFAULT_RACK = "UNKNOWN_RACK";
 
@@ -84,9 +83,9 @@ public class GossipingPropertyFileSnitch extends 
AbstractNetworkTopologySnitch//
      * @param endpoint the endpoint to process
      * @return string of data center
      */
-    public String getDatacenter(InetAddress endpoint)
+    public String getDatacenter(InetAddressAndPort endpoint)
     {
-        if (endpoint.equals(FBUtilities.getBroadcastAddress()))
+        if (endpoint.equals(FBUtilities.getBroadcastAddressAndPort()))
             return myDC;
 
         EndpointState epState = 
Gossiper.instance.getEndpointStateForEndpoint(endpoint);
@@ -112,9 +111,9 @@ public class GossipingPropertyFileSnitch extends 
AbstractNetworkTopologySnitch//
      * @param endpoint the endpoint to process
      * @return string of rack
      */
-    public String getRack(InetAddress endpoint)
+    public String getRack(InetAddressAndPort endpoint)
     {
-        if (endpoint.equals(FBUtilities.getBroadcastAddress()))
+        if (endpoint.equals(FBUtilities.getBroadcastAddressAndPort()))
             return myRack;
 
         EndpointState epState = 
Gossiper.instance.getEndpointStateForEndpoint(endpoint);
@@ -138,8 +137,10 @@ public class GossipingPropertyFileSnitch extends 
AbstractNetworkTopologySnitch//
     {
         super.gossiperStarting();
 
+        
Gossiper.instance.addLocalApplicationState(ApplicationState.INTERNAL_ADDRESS_AND_PORT,
+                                                   
StorageService.instance.valueFactory.internalAddressAndPort(FBUtilities.getLocalAddressAndPort()));
         
Gossiper.instance.addLocalApplicationState(ApplicationState.INTERNAL_IP,
-                
StorageService.instance.valueFactory.internalIP(FBUtilities.getLocalAddress().getHostAddress()));
+                
StorageService.instance.valueFactory.internalIP(FBUtilities.getJustLocalAddress().getHostAddress()));
 
         loadGossiperState();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/locator/IEndpointSnitch.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/IEndpointSnitch.java 
b/src/java/org/apache/cassandra/locator/IEndpointSnitch.java
index 71b441c..00a1543 100644
--- a/src/java/org/apache/cassandra/locator/IEndpointSnitch.java
+++ b/src/java/org/apache/cassandra/locator/IEndpointSnitch.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.locator;
 
-import java.net.InetAddress;
 import java.util.Collection;
 import java.util.List;
 
@@ -32,27 +31,27 @@ public interface IEndpointSnitch
     /**
      * returns a String representing the rack this endpoint belongs to
      */
-    public String getRack(InetAddress endpoint);
+    public String getRack(InetAddressAndPort endpoint);
 
     /**
      * returns a String representing the datacenter this endpoint belongs to
      */
-    public String getDatacenter(InetAddress endpoint);
+    public String getDatacenter(InetAddressAndPort endpoint);
 
     /**
      * returns a new <tt>List</tt> sorted by proximity to the given endpoint
      */
-    public List<InetAddress> getSortedListByProximity(InetAddress address, 
Collection<InetAddress> unsortedAddress);
+    public List<InetAddressAndPort> 
getSortedListByProximity(InetAddressAndPort address, 
Collection<InetAddressAndPort> unsortedAddress);
 
     /**
      * This method will sort the <tt>List</tt> by proximity to the given 
address.
      */
-    public void sortByProximity(InetAddress address, List<InetAddress> 
addresses);
+    public void sortByProximity(InetAddressAndPort address, 
List<InetAddressAndPort> addresses);
 
     /**
      * compares two endpoints in relation to the target endpoint, returning as 
Comparator.compare would
      */
-    public int compareEndpoints(InetAddress target, InetAddress a1, 
InetAddress a2);
+    public int compareEndpoints(InetAddressAndPort target, InetAddressAndPort 
a1, InetAddressAndPort a2);
 
     /**
      * called after Gossiper instance exists immediately before it starts 
gossiping
@@ -63,5 +62,5 @@ public interface IEndpointSnitch
      * Returns whether for a range query doing a query against merged is likely
      * to be faster than 2 sequential queries, one against l1 followed by one 
against l2.
      */
-    public boolean isWorthMergingForRangeQuery(List<InetAddress> merged, 
List<InetAddress> l1, List<InetAddress> l2);
+    public boolean isWorthMergingForRangeQuery(List<InetAddressAndPort> 
merged, List<InetAddressAndPort> l1, List<InetAddressAndPort> l2);
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/locator/ILatencySubscriber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/ILatencySubscriber.java 
b/src/java/org/apache/cassandra/locator/ILatencySubscriber.java
index d2ae6db..f6c1c7f 100644
--- a/src/java/org/apache/cassandra/locator/ILatencySubscriber.java
+++ b/src/java/org/apache/cassandra/locator/ILatencySubscriber.java
@@ -17,9 +17,7 @@
  */
 package org.apache.cassandra.locator;
 
-import java.net.InetAddress;
-
 public interface ILatencySubscriber
 {
-    public void receiveTiming(InetAddress address, long latency);
+    public void receiveTiming(InetAddressAndPort address, long latency);
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/locator/InetAddressAndPort.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/InetAddressAndPort.java 
b/src/java/org/apache/cassandra/locator/InetAddressAndPort.java
new file mode 100644
index 0000000..38a1a49
--- /dev/null
+++ b/src/java/org/apache/cassandra/locator/InetAddressAndPort.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import java.io.Serializable;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+import com.google.common.base.Preconditions;
+import com.google.common.net.HostAndPort;
+
+import org.apache.cassandra.utils.FastByteOperations;
+
+/**
+ * A class to replace the usage of InetAddress to identify hosts in the 
cluster.
+ * Opting for a full replacement class so that in the future if we change the 
nature
+ * of the identifier the refactor will be easier in that we don't have to 
change the type
+ * just the methods.
+ *
+ * Because an IP might contain multiple C* instances the identification must 
be done
+ * using the IP + port. InetSocketAddress is undesirable for a couple of 
reasons. It's not comparable,
+ * it's toString() method doesn't correctly bracket IPv6, it doesn't handle 
optional default values,
+ * and a couple of other minor behaviors that are slightly less troublesome 
like handling the
+ * need to sometimes return a port and sometimes not.
+ *
+ */
+public final class InetAddressAndPort implements 
Comparable<InetAddressAndPort>, Serializable
+{
+    private static final long serialVersionUID = 0;
+
+    //Store these here to avoid requiring DatabaseDescriptor to be loaded. 
DatabaseDescriptor will set
+    //these when it loads the config. A lot of unit tests won't end up loading 
DatabaseDescriptor.
+    //Tools that might use this class also might not load database descriptor. 
Those tools are expected
+    //to always override the defaults.
+    static volatile int defaultPort = 7000;
+
+    public final InetAddress address;
+    public final byte[] addressBytes;
+    public final int port;
+
+    private InetAddressAndPort(InetAddress address, byte[] addressBytes, int 
port)
+    {
+        Preconditions.checkNotNull(address);
+        Preconditions.checkNotNull(addressBytes);
+        validatePortRange(port);
+        this.address = address;
+        this.port = port;
+        this.addressBytes = addressBytes;
+    }
+
+    private static void validatePortRange(int port)
+    {
+        if (port < 0 | port > 65535)
+        {
+            throw new IllegalArgumentException("Port " + port + " is not a 
valid port number in the range 0-65535");
+        }
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        InetAddressAndPort that = (InetAddressAndPort) o;
+
+        if (port != that.port) return false;
+        return address.equals(that.address);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        int result = address.hashCode();
+        result = 31 * result + port;
+        return result;
+    }
+
+    @Override
+    public int compareTo(InetAddressAndPort o)
+    {
+        int retval = FastByteOperations.compareUnsigned(addressBytes, 0, 
addressBytes.length, o.addressBytes, 0, o.addressBytes.length);
+        if (retval != 0)
+        {
+            return retval;
+        }
+
+        return Integer.compare(port, o.port);
+    }
+
+    public String getHostAddress(boolean withPort)
+    {
+        if (withPort)
+        {
+            return toString();
+        }
+        else
+        {
+            return address.getHostAddress();
+        }
+    }
+
+    @Override
+    public String toString()
+    {
+        return toString(true);
+    }
+
+    public String toString(boolean withPort)
+    {
+        if (withPort)
+        {
+            return HostAndPort.fromParts(address.getHostAddress(), 
port).toString();
+        }
+        else
+        {
+            return address.toString();
+        }
+    }
+
+    public static InetAddressAndPort getByName(String name) throws 
UnknownHostException
+    {
+        return getByNameOverrideDefaults(name, null);
+    }
+
+    /**
+     *
+     * @param name Hostname + optional ports string
+     * @param port Port to connect on, overridden by values in hostname 
string, defaults to DatabaseDescriptor default if not specified anywhere.
+     * @return
+     * @throws UnknownHostException
+     */
+    public static InetAddressAndPort getByNameOverrideDefaults(String name, 
Integer port) throws UnknownHostException
+    {
+        HostAndPort hap = HostAndPort.fromString(name);
+        if (hap.hasPort())
+        {
+            port = hap.getPort();
+        }
+        return 
getByAddressOverrideDefaults(InetAddress.getByName(hap.getHost()), port);
+    }
+
+    public static InetAddressAndPort getByAddress(byte[] address) throws 
UnknownHostException
+    {
+        return getByAddressOverrideDefaults(InetAddress.getByAddress(address), 
address, null);
+    }
+
+    public static InetAddressAndPort getByAddress(InetAddress address)
+    {
+        return getByAddressOverrideDefaults(address, null);
+    }
+
+    public static InetAddressAndPort getByAddressOverrideDefaults(InetAddress 
address, Integer port)
+    {
+        if (port == null)
+        {
+            port = defaultPort;
+        }
+
+        return new InetAddressAndPort(address, address.getAddress(), port);
+    }
+
+    public static InetAddressAndPort getByAddressOverrideDefaults(InetAddress 
address, byte[] addressBytes, Integer port)
+    {
+        if (port == null)
+        {
+            port = defaultPort;
+        }
+
+        return new InetAddressAndPort(address, addressBytes, port);
+    }
+
+    public static InetAddressAndPort getLoopbackAddress()
+    {
+        return 
InetAddressAndPort.getByAddress(InetAddress.getLoopbackAddress());
+    }
+
+    public static InetAddressAndPort getLocalHost() throws UnknownHostException
+    {
+        return InetAddressAndPort.getByAddress(InetAddress.getLocalHost());
+    }
+
+    public static void initializeDefaultPort(int port)
+    {
+        defaultPort = port;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/locator/LocalStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/LocalStrategy.java 
b/src/java/org/apache/cassandra/locator/LocalStrategy.java
index ae58203..a76fe96 100644
--- a/src/java/org/apache/cassandra/locator/LocalStrategy.java
+++ b/src/java/org/apache/cassandra/locator/LocalStrategy.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.locator;
 
-import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Collection;
@@ -42,16 +41,16 @@ public class LocalStrategy extends 
AbstractReplicationStrategy
      * LocalStrategy may be used before tokens are set up.
      */
     @Override
-    public ArrayList<InetAddress> getNaturalEndpoints(RingPosition 
searchPosition)
+    public ArrayList<InetAddressAndPort> getNaturalEndpoints(RingPosition 
searchPosition)
     {
-        ArrayList<InetAddress> l = new ArrayList<InetAddress>(1);
-        l.add(FBUtilities.getBroadcastAddress());
+        ArrayList<InetAddressAndPort> l = new ArrayList<InetAddressAndPort>(1);
+        l.add(FBUtilities.getBroadcastAddressAndPort());
         return l;
     }
 
-    public List<InetAddress> calculateNaturalEndpoints(Token token, 
TokenMetadata metadata)
+    public List<InetAddressAndPort> calculateNaturalEndpoints(Token token, 
TokenMetadata metadata)
     {
-        return Collections.singletonList(FBUtilities.getBroadcastAddress());
+        return 
Collections.singletonList(FBUtilities.getBroadcastAddressAndPort());
     }
 
     public int getReplicationFactor()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java 
b/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
index 442e6cf..673c018 100644
--- a/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
+++ b/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.locator;
 
-import java.net.InetAddress;
 import java.util.*;
 import java.util.Map.Entry;
 
@@ -72,7 +71,7 @@ public class NetworkTopologyStrategy extends 
AbstractReplicationStrategy
         }
 
         datacenters = Collections.unmodifiableMap(newDatacenters);
-        logger.trace("Configured datacenter replicas are {}", 
FBUtilities.toString(datacenters));
+        logger.info("Configured datacenter replicas are {}", 
FBUtilities.toString(datacenters));
     }
 
     /**
@@ -81,7 +80,7 @@ public class NetworkTopologyStrategy extends 
AbstractReplicationStrategy
     private static final class DatacenterEndpoints
     {
         /** List accepted endpoints get pushed into. */
-        Set<InetAddress> endpoints;
+        Set<InetAddressAndPort> endpoints;
         /**
          * Racks encountered so far. Replicas are put into separate racks 
while possible.
          * For efficiency the set is shared between the instances, using the 
location pair (dc, rack) to make sure
@@ -93,7 +92,7 @@ public class NetworkTopologyStrategy extends 
AbstractReplicationStrategy
         int rfLeft;
         int acceptableRackRepeats;
 
-        DatacenterEndpoints(int rf, int rackCount, int nodeCount, 
Set<InetAddress> endpoints, Set<Pair<String, String>> racks)
+        DatacenterEndpoints(int rf, int rackCount, int nodeCount, 
Set<InetAddressAndPort> endpoints, Set<Pair<String, String>> racks)
         {
             this.endpoints = endpoints;
             this.racks = racks;
@@ -108,7 +107,7 @@ public class NetworkTopologyStrategy extends 
AbstractReplicationStrategy
          * Attempts to add an endpoint to the replicas for this datacenter, 
adding to the endpoints set if successful.
          * Returns true if the endpoint was added, and this datacenter does 
not require further replicas.
          */
-        boolean addEndpointAndCheckIfDone(InetAddress ep, Pair<String,String> 
location)
+        boolean addEndpointAndCheckIfDone(InetAddressAndPort ep, 
Pair<String,String> location)
         {
             if (done())
                 return false;
@@ -143,17 +142,17 @@ public class NetworkTopologyStrategy extends 
AbstractReplicationStrategy
     /**
      * calculate endpoints in one pass through the tokens by tracking our 
progress in each DC.
      */
-    public List<InetAddress> calculateNaturalEndpoints(Token searchToken, 
TokenMetadata tokenMetadata)
+    public List<InetAddressAndPort> calculateNaturalEndpoints(Token 
searchToken, TokenMetadata tokenMetadata)
     {
         // we want to preserve insertion order so that the first added 
endpoint becomes primary
-        Set<InetAddress> replicas = new LinkedHashSet<>();
+        Set<InetAddressAndPort> replicas = new LinkedHashSet<>();
         Set<Pair<String, String>> seenRacks = new HashSet<>();
 
         Topology topology = tokenMetadata.getTopology();
         // all endpoints in each DC, so we can check when we have exhausted 
all the members of a DC
-        Multimap<String, InetAddress> allEndpoints = 
topology.getDatacenterEndpoints();
+        Multimap<String, InetAddressAndPort> allEndpoints = 
topology.getDatacenterEndpoints();
         // all racks in a DC so we can check when we have exhausted all racks 
in a DC
-        Map<String, Multimap<String, InetAddress>> racks = 
topology.getDatacenterRacks();
+        Map<String, Multimap<String, InetAddressAndPort>> racks = 
topology.getDatacenterRacks();
         assert !allEndpoints.isEmpty() && !racks.isEmpty() : "not aware of any 
cluster members";
 
         int dcsToFill = 0;
@@ -178,7 +177,7 @@ public class NetworkTopologyStrategy extends 
AbstractReplicationStrategy
         while (dcsToFill > 0 && tokenIter.hasNext())
         {
             Token next = tokenIter.next();
-            InetAddress ep = tokenMetadata.getEndpoint(next);
+            InetAddressAndPort ep = tokenMetadata.getEndpoint(next);
             Pair<String, String> location = topology.getLocation(ep);
             DatacenterEndpoints dcEndpoints = dcs.get(location.left);
             if (dcEndpoints != null && 
dcEndpoints.addEndpointAndCheckIfDone(ep, location))
@@ -227,9 +226,9 @@ public class NetworkTopologyStrategy extends 
AbstractReplicationStrategy
         final IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
 
         // Add data center of localhost.
-        
validDataCenters.add(snitch.getDatacenter(FBUtilities.getBroadcastAddress()));
+        
validDataCenters.add(snitch.getDatacenter(FBUtilities.getBroadcastAddressAndPort()));
         // Fetch and add DCs of all peers.
-        for (final InetAddress peer : 
StorageService.instance.getTokenMetadata().getAllEndpoints())
+        for (final InetAddressAndPort peer : 
StorageService.instance.getTokenMetadata().getAllEndpoints())
         {
             validDataCenters.add(snitch.getDatacenter(peer));
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/locator/OldNetworkTopologyStrategy.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/locator/OldNetworkTopologyStrategy.java 
b/src/java/org/apache/cassandra/locator/OldNetworkTopologyStrategy.java
index b9bd767..93e629e 100644
--- a/src/java/org/apache/cassandra/locator/OldNetworkTopologyStrategy.java
+++ b/src/java/org/apache/cassandra/locator/OldNetworkTopologyStrategy.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.locator;
 
-import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Collection;
@@ -42,10 +41,10 @@ public class OldNetworkTopologyStrategy extends 
AbstractReplicationStrategy
         super(keyspaceName, tokenMetadata, snitch, configOptions);
     }
 
-    public List<InetAddress> calculateNaturalEndpoints(Token token, 
TokenMetadata metadata)
+    public List<InetAddressAndPort> calculateNaturalEndpoints(Token token, 
TokenMetadata metadata)
     {
         int replicas = getReplicationFactor();
-        List<InetAddress> endpoints = new ArrayList<InetAddress>(replicas);
+        List<InetAddressAndPort> endpoints = new ArrayList<>(replicas);
         ArrayList<Token> tokens = metadata.sortedTokens();
 
         if (tokens.isEmpty())


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to