http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/tools/LoaderOptions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/LoaderOptions.java 
b/src/java/org/apache/cassandra/tools/LoaderOptions.java
index c821e6a..4646ba4 100644
--- a/src/java/org/apache/cassandra/tools/LoaderOptions.java
+++ b/src/java/org/apache/cassandra/tools/LoaderOptions.java
@@ -27,8 +27,12 @@ import java.net.*;
 import java.util.HashSet;
 import java.util.Set;
 
+import com.google.common.base.Throwables;
+import com.google.common.net.HostAndPort;
+
 import org.apache.cassandra.config.*;
 import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.tools.BulkLoader.CmdLineOptions;
 
 import com.datastax.driver.core.AuthProvider;
@@ -54,6 +58,7 @@ public class LoaderOptions
     public static final String THROTTLE_MBITS = "throttle";
     public static final String INTER_DC_THROTTLE_MBITS = "inter-dc-throttle";
     public static final String TOOL_NAME = "sstableloader";
+    public static final String ALLOW_SERVER_PORT_DISCOVERY_OPTION = 
"server-port-discovery";
 
     /* client encryption options */
     public static final String SSL_TRUSTSTORE = "truststore";
@@ -80,8 +85,9 @@ public class LoaderOptions
     public final EncryptionOptions clientEncOptions;
     public final int connectionsPerHost;
     public final EncryptionOptions.ServerEncryptionOptions serverEncOptions;
-    public final Set<InetAddress> hosts;
-    public final Set<InetAddress> ignores;
+    public final Set<InetSocketAddress> hosts;
+    public final Set<InetAddressAndPort> ignores;
+    public final boolean allowServerPortDiscovery;
 
     LoaderOptions(Builder builder)
     {
@@ -101,6 +107,7 @@ public class LoaderOptions
         connectionsPerHost = builder.connectionsPerHost;
         serverEncOptions = builder.serverEncOptions;
         hosts = builder.hosts;
+        allowServerPortDiscovery = builder.allowServerPortDiscovery;
         ignores = builder.ignores;
     }
 
@@ -122,8 +129,11 @@ public class LoaderOptions
         EncryptionOptions clientEncOptions = new EncryptionOptions();
         int connectionsPerHost = 1;
         EncryptionOptions.ServerEncryptionOptions serverEncOptions = new 
EncryptionOptions.ServerEncryptionOptions();
-        Set<InetAddress> hosts = new HashSet<>();
-        Set<InetAddress> ignores = new HashSet<>();
+        Set<InetAddress> hostsArg = new HashSet<>();
+        Set<InetAddress> ignoresArg = new HashSet<>();
+        Set<InetSocketAddress> hosts = new HashSet<>();
+        Set<InetAddressAndPort> ignores = new HashSet<>();
+        boolean allowServerPortDiscovery;
 
         Builder()
         {
@@ -133,6 +143,23 @@ public class LoaderOptions
         public LoaderOptions build()
         {
             constructAuthProvider();
+
+            try
+            {
+                for (InetAddress host : hostsArg)
+                {
+                    hosts.add(new InetSocketAddress(host, nativePort));
+                }
+                for (InetAddress host : ignoresArg)
+                {
+                    
ignores.add(InetAddressAndPort.getByNameOverrideDefaults(host.getHostAddress(), 
storagePort));
+                }
+            }
+            catch (UnknownHostException e)
+            {
+                Throwables.propagate(e);
+            }
+
             return new LoaderOptions(this);
         }
 
@@ -226,30 +253,61 @@ public class LoaderOptions
             return this;
         }
 
+        @Deprecated
         public Builder hosts(Set<InetAddress> hosts)
         {
-            this.hosts = hosts;
+            this.hostsArg.addAll(hosts);
+            return this;
+        }
+
+        public Builder hostsAndNativePort(Set<InetSocketAddress> hosts)
+        {
+            this.hosts.addAll(hosts);
             return this;
         }
 
         public Builder host(InetAddress host)
         {
+            hostsArg.add(host);
+            return this;
+        }
+
+        public Builder hostAndNativePort(InetSocketAddress host)
+        {
             hosts.add(host);
             return this;
         }
 
         public Builder ignore(Set<InetAddress> ignores)
         {
-            this.ignores = ignores;
+            this.ignoresArg.addAll(ignores);
+            return this;
+        }
+
+        public Builder ignoresAndInternalPorts(Set<InetAddressAndPort> ignores)
+        {
+            this.ignores.addAll(ignores);
             return this;
         }
 
         public Builder ignore(InetAddress ignore)
         {
+            ignoresArg.add(ignore);
+            return this;
+        }
+
+        public Builder ignoreAndInternalPorts(InetAddressAndPort ignore)
+        {
             ignores.add(ignore);
             return this;
         }
 
+        public Builder allowServerPortDiscovery(boolean 
allowServerPortDiscovery)
+        {
+            this.allowServerPortDiscovery = allowServerPortDiscovery;
+            return this;
+        }
+
         public Builder parseArgs(String cmdArgs[])
         {
             CommandLineParser parser = new GnuParser();
@@ -296,6 +354,7 @@ public class LoaderOptions
 
                 verbose = cmd.hasOption(VERBOSE_OPTION);
                 noProgress = cmd.hasOption(NOPROGRESS_OPTION);
+                allowServerPortDiscovery = 
cmd.hasOption(ALLOW_SERVER_PORT_DISCOVERY_OPTION);
 
                 if (cmd.hasOption(USER_OPTION))
                 {
@@ -319,7 +378,8 @@ public class LoaderOptions
                     {
                         for (String node : nodes)
                         {
-                            hosts.add(InetAddress.getByName(node.trim()));
+                            HostAndPort hap = HostAndPort.fromString(node);
+                            hosts.add(new 
InetSocketAddress(InetAddress.getByName(hap.getHost()), 
hap.getPortOrDefault(nativePort)));
                         }
                     } catch (UnknownHostException e)
                     {
@@ -340,7 +400,7 @@ public class LoaderOptions
                     {
                         for (String node : nodes)
                         {
-                            ignores.add(InetAddress.getByName(node.trim()));
+                            
ignores.add(InetAddressAndPort.getByNameOverrideDefaults(node.trim(), 
storagePort));
                         }
                     } catch (UnknownHostException e)
                     {
@@ -554,6 +614,7 @@ public class LoaderOptions
         options.addOption("st", SSL_STORE_TYPE, "STORE-TYPE", "Client SSL: 
type of store");
         options.addOption("ciphers", SSL_CIPHER_SUITES, "CIPHER-SUITES", 
"Client SSL: comma-separated list of encryption suites to use");
         options.addOption("f", CONFIG_PATH, "path to config file", 
"cassandra.yaml file path for streaming throughput and client/server SSL.");
+        options.addOption("spd", ALLOW_SERVER_PORT_DISCOVERY_OPTION, "allow 
server port discovery", "Use ports published by server to decide how to 
connect. With SSL requires StartTLS to be used.");
         return options;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java 
b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 8acb3c1..d330ed4 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -450,39 +450,39 @@ public class NodeProbe implements AutoCloseable
         ssProxy.drain();
     }
 
-    public Map<String, String> getTokenToEndpointMap()
+    public Map<String, String> getTokenToEndpointMap(boolean withPort)
     {
-        return ssProxy.getTokenToEndpointMap();
+        return withPort ? ssProxy.getTokenToEndpointWithPortMap() : 
ssProxy.getTokenToEndpointMap();
     }
 
-    public List<String> getLiveNodes()
+    public List<String> getLiveNodes(boolean withPort)
     {
-        return ssProxy.getLiveNodes();
+        return withPort ? ssProxy.getLiveNodesWithPort() : 
ssProxy.getLiveNodes();
     }
 
-    public List<String> getJoiningNodes()
+    public List<String> getJoiningNodes(boolean withPort)
     {
-        return ssProxy.getJoiningNodes();
+        return withPort ? ssProxy.getJoiningNodesWithPort() : 
ssProxy.getJoiningNodes();
     }
 
-    public List<String> getLeavingNodes()
+    public List<String> getLeavingNodes(boolean withPort)
     {
-        return ssProxy.getLeavingNodes();
+        return withPort ? ssProxy.getLeavingNodesWithPort() : 
ssProxy.getLeavingNodes();
     }
 
-    public List<String> getMovingNodes()
+    public List<String> getMovingNodes(boolean withPort)
     {
-        return ssProxy.getMovingNodes();
+        return withPort ? ssProxy.getMovingNodesWithPort() : 
ssProxy.getMovingNodes();
     }
 
-    public List<String> getUnreachableNodes()
+    public List<String> getUnreachableNodes(boolean withPort)
     {
-        return ssProxy.getUnreachableNodes();
+        return withPort ? ssProxy.getUnreachableNodesWithPort() : 
ssProxy.getUnreachableNodes();
     }
 
-    public Map<String, String> getLoadMap()
+    public Map<String, String> getLoadMap(boolean withPort)
     {
-        return ssProxy.getLoadMap();
+        return withPort ? ssProxy.getLoadMapWithPort() : ssProxy.getLoadMap();
     }
 
     public Map<InetAddress, Float> getOwnership()
@@ -490,11 +490,21 @@ public class NodeProbe implements AutoCloseable
         return ssProxy.getOwnership();
     }
 
+    public Map<String, Float> getOwnershipWithPort()
+    {
+        return ssProxy.getOwnershipWithPort();
+    }
+
     public Map<InetAddress, Float> effectiveOwnership(String keyspace) throws 
IllegalStateException
     {
         return ssProxy.effectiveOwnership(keyspace);
     }
 
+    public Map<String, Float> effectiveOwnershipWithPort(String keyspace) 
throws IllegalStateException
+    {
+        return ssProxy.effectiveOwnershipWithPort(keyspace);
+    }
+
     public CacheServiceMBean getCacheServiceMBean()
     {
         String cachePath = "org.apache.cassandra.db:type=Caches";
@@ -557,9 +567,9 @@ public class NodeProbe implements AutoCloseable
         return ssProxy.getLocalHostId();
     }
 
-    public Map<String, String> getHostIdMap()
+    public Map<String, String> getHostIdMap(boolean withPort)
     {
-        return ssProxy.getEndpointToHostId();
+        return withPort ? ssProxy.getEndpointWithPortToHostId() : 
ssProxy.getEndpointToHostId();
     }
 
     public String getLoadString()
@@ -686,9 +696,9 @@ public class NodeProbe implements AutoCloseable
         ssProxy.removeNode(token);
     }
 
-    public String getRemovalStatus()
+    public String getRemovalStatus(boolean withPort)
     {
-        return ssProxy.getRemovalStatus();
+        return withPort ? ssProxy.getRemovalStatusWithPort() : 
ssProxy.getRemovalStatus();
     }
 
     public void forceRemoveCompletion()
@@ -775,6 +785,11 @@ public class NodeProbe implements AutoCloseable
         ssProxy.setHintedHandoffThrottleInKB(throttleInKB);
     }
 
+    public List<String> getEndpointsWithPort(String keyspace, String cf, 
String key)
+    {
+        return ssProxy.getNaturalEndpointsWithPort(keyspace, cf, key);
+    }
+
     public List<InetAddress> getEndpoints(String keyspace, String cf, String 
key)
     {
         return ssProxy.getNaturalEndpoints(keyspace, cf, key);
@@ -1144,9 +1159,9 @@ public class NodeProbe implements AutoCloseable
         ssProxy.rebuildSecondaryIndex(ksName, cfName, idxNames);
     }
 
-    public String getGossipInfo()
+    public String getGossipInfo(boolean withPort)
     {
-        return fdProxy.getAllEndpointStates();
+        return withPort ? fdProxy.getAllEndpointStatesWithPort() : 
fdProxy.getAllEndpointStates();
     }
 
     public void stop(String string)
@@ -1212,9 +1227,9 @@ public class NodeProbe implements AutoCloseable
         return ssProxy.getSchemaVersion();
     }
 
-    public List<String> describeRing(String keyspaceName) throws IOException
+    public List<String> describeRing(String keyspaceName, boolean withPort) 
throws IOException
     {
-        return ssProxy.describeRingJMX(keyspaceName);
+        return withPort ? ssProxy.describeRingWithPortJMX(keyspaceName) : 
ssProxy.describeRingJMX(keyspaceName);
     }
 
     public void rebuild(String sourceDc, String keyspace, String tokens, 
String specificSources)
@@ -1580,11 +1595,11 @@ public class NodeProbe implements AutoCloseable
         }
     }
 
-    public TabularData getFailureDetectorPhilValues()
+    public TabularData getFailureDetectorPhilValues(boolean withPort)
     {
         try
         {
-            return fdProxy.getPhiValues();
+            return withPort ? fdProxy.getPhiValuesWithPort() : 
fdProxy.getPhiValues();
         }
         catch (OpenDataException e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/tools/NodeTool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeTool.java 
b/src/java/org/apache/cassandra/tools/NodeTool.java
index 59d4ead..d707499 100644
--- a/src/java/org/apache/cassandra/tools/NodeTool.java
+++ b/src/java/org/apache/cassandra/tools/NodeTool.java
@@ -248,6 +248,9 @@ public class NodeTool
         @Option(type = OptionType.GLOBAL, name = {"-pwf", "--password-file"}, 
description = "Path to the JMX password file")
         private String passwordFilePath = EMPTY;
 
+        @Option(type = OptionType.GLOBAL, name = { "-wp", "--with-port"}, 
description = "Operate in 4.0 mode with hosts disambiguated by port number", 
arity = 0)
+        protected boolean withPort = false;
+
         @Override
         public void run()
         {
@@ -398,4 +401,27 @@ public class NodeTool
         }
         return ownershipByDc;
     }
+
+    public static SortedMap<String, SetHostStatWithPort> 
getOwnershipByDcWithPort(NodeProbe probe, boolean resolveIp,
+                                                                  Map<String, 
String> tokenToEndpoint,
+                                                                  Map<String, 
Float> ownerships)
+    {
+        SortedMap<String, SetHostStatWithPort> ownershipByDc = 
Maps.newTreeMap();
+        EndpointSnitchInfoMBean epSnitchInfo = 
probe.getEndpointSnitchInfoProxy();
+        try
+        {
+            for (Entry<String, String> tokenAndEndPoint : 
tokenToEndpoint.entrySet())
+            {
+                String dc = 
epSnitchInfo.getDatacenter(tokenAndEndPoint.getValue());
+                if (!ownershipByDc.containsKey(dc))
+                    ownershipByDc.put(dc, new SetHostStatWithPort(resolveIp));
+                ownershipByDc.get(dc).add(tokenAndEndPoint.getKey(), 
tokenAndEndPoint.getValue(), ownerships);
+            }
+        }
+        catch (UnknownHostException e)
+        {
+            throw new RuntimeException(e);
+        }
+        return ownershipByDc;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/tools/nodetool/DescribeCluster.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/nodetool/DescribeCluster.java 
b/src/java/org/apache/cassandra/tools/nodetool/DescribeCluster.java
index 5228468..ed91b8b 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/DescribeCluster.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/DescribeCluster.java
@@ -49,10 +49,10 @@ public class DescribeCluster extends NodeToolCmd
 
         // display schema version for each node
         System.out.println("\tSchema versions:");
-        Map<String, List<String>> schemaVersions = 
probe.getSpProxy().getSchemaVersions();
+        Map<String, List<String>> schemaVersions = withPort ? 
probe.getSpProxy().getSchemaVersionsWithPort() : 
probe.getSpProxy().getSchemaVersions();
         for (String version : schemaVersions.keySet())
         {
             System.out.println(format("\t\t%s: %s%n", version, 
schemaVersions.get(version)));
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/tools/nodetool/DescribeRing.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/nodetool/DescribeRing.java 
b/src/java/org/apache/cassandra/tools/nodetool/DescribeRing.java
index 2a73c2a..c57e54d 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/DescribeRing.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/DescribeRing.java
@@ -39,7 +39,7 @@ public class DescribeRing extends NodeToolCmd
         System.out.println("TokenRange: ");
         try
         {
-            for (String tokenRangeString : probe.describeRing(keyspace))
+            for (String tokenRangeString : probe.describeRing(keyspace, 
withPort))
             {
                 System.out.println("\t" + tokenRangeString);
             }
@@ -48,4 +48,4 @@ public class DescribeRing extends NodeToolCmd
             throw new RuntimeException(e);
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/tools/nodetool/FailureDetectorInfo.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/tools/nodetool/FailureDetectorInfo.java 
b/src/java/org/apache/cassandra/tools/nodetool/FailureDetectorInfo.java
index b3ffb6d..896663d 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/FailureDetectorInfo.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/FailureDetectorInfo.java
@@ -33,7 +33,7 @@ public class FailureDetectorInfo extends NodeToolCmd
     @Override
     public void execute(NodeProbe probe)
     {
-        TabularData data = probe.getFailureDetectorPhilValues();
+        TabularData data = probe.getFailureDetectorPhilValues(withPort);
         System.out.printf("%10s,%16s%n", "Endpoint", "Phi");
         for (Object o : data.keySet())
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/tools/nodetool/GetEndpoints.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/nodetool/GetEndpoints.java 
b/src/java/org/apache/cassandra/tools/nodetool/GetEndpoints.java
index 922ae26..c0adb2a 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/GetEndpoints.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/GetEndpoints.java
@@ -42,10 +42,20 @@ public class GetEndpoints extends NodeToolCmd
         String table = args.get(1);
         String key = args.get(2);
 
-        List<InetAddress> endpoints = probe.getEndpoints(ks, table, key);
-        for (InetAddress endpoint : endpoints)
+        if (withPort)
         {
-            System.out.println(endpoint.getHostAddress());
+            for (String endpoint : probe.getEndpointsWithPort(ks, table, key))
+            {
+                System.out.println(endpoint);
+            }
+        }
+        else
+        {
+            List<InetAddress> endpoints = probe.getEndpoints(ks, table, key);
+            for (InetAddress endpoint : endpoints)
+            {
+                System.out.println(endpoint.getHostAddress());
+            }
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/tools/nodetool/GossipInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/nodetool/GossipInfo.java 
b/src/java/org/apache/cassandra/tools/nodetool/GossipInfo.java
index 1b4b979..182c395 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/GossipInfo.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/GossipInfo.java
@@ -28,6 +28,6 @@ public class GossipInfo extends NodeToolCmd
     @Override
     public void execute(NodeProbe probe)
     {
-        System.out.println(probe.getGossipInfo());
+        System.out.println(probe.getGossipInfo(withPort));
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/tools/nodetool/HostStatWithPort.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/nodetool/HostStatWithPort.java 
b/src/java/org/apache/cassandra/tools/nodetool/HostStatWithPort.java
new file mode 100644
index 0000000..54cda17
--- /dev/null
+++ b/src/java/org/apache/cassandra/tools/nodetool/HostStatWithPort.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools.nodetool;
+
+import org.apache.cassandra.locator.InetAddressAndPort;
+
+public class HostStatWithPort
+{
+    public final InetAddressAndPort endpoint;
+    public final boolean resolveIp;
+    public final Float owns;
+    public final String token;
+
+    public HostStatWithPort(String token, InetAddressAndPort endpoint, boolean 
resolveIp, Float owns)
+    {
+        this.token = token;
+        this.endpoint = endpoint;
+        this.resolveIp = resolveIp;
+        this.owns = owns;
+    }
+
+    public String ipOrDns()
+    {
+        return resolveIp ?
+               endpoint.address.getHostName() + ":" + endpoint.port :
+               endpoint.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/tools/nodetool/NetStats.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/nodetool/NetStats.java 
b/src/java/org/apache/cassandra/tools/nodetool/NetStats.java
index 2312097..2702d9e 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/NetStats.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/NetStats.java
@@ -50,11 +50,11 @@ public class NetStats extends NodeToolCmd
             System.out.printf("%s %s%n", 
status.streamOperation.getDescription(), status.planId.toString());
             for (SessionInfo info : status.sessions)
             {
-                System.out.printf("    %s", info.peer.toString());
+                System.out.printf("    %s", info.peer.toString(withPort));
                 // print private IP when it is used
                 if (!info.peer.equals(info.connecting))
                 {
-                    System.out.printf(" (using %s)", 
info.connecting.toString());
+                    System.out.printf(" (using %s)", 
info.connecting.toString(withPort));
                 }
                 System.out.printf("%n");
                 if (!info.receivingSummaries.isEmpty())
@@ -65,7 +65,7 @@ public class NetStats extends NodeToolCmd
                         System.out.printf("        Receiving %d files, %d 
bytes total. Already received %d files, %d bytes total%n", 
info.getTotalFilesToReceive(), info.getTotalSizeToReceive(), 
info.getTotalFilesReceived(), info.getTotalSizeReceived());
                     for (ProgressInfo progress : info.getReceivingFiles())
                     {
-                        System.out.printf("            %s%n", 
progress.toString());
+                        System.out.printf("            %s%n", 
progress.toString(withPort));
                     }
                 }
                 if (!info.sendingSummaries.isEmpty())
@@ -76,7 +76,7 @@ public class NetStats extends NodeToolCmd
                         System.out.printf("        Sending %d files, %d bytes 
total. Already sent %d files, %d bytes total%n", info.getTotalFilesToSend(), 
info.getTotalSizeToSend(), info.getTotalFilesSent(), info.getTotalSizeSent());
                     for (ProgressInfo progress : info.getSendingFiles())
                     {
-                        System.out.printf("            %s%n", 
progress.toString());
+                        System.out.printf("            %s%n", 
progress.toString(withPort));
                     }
                 }
             }
@@ -131,4 +131,4 @@ public class NetStats extends NodeToolCmd
             System.out.printf("%-25s%10s%10s%15s%10s%n", "Gossip messages", 
"n/a", pending, completed, dropped);
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/tools/nodetool/RemoveNode.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/nodetool/RemoveNode.java 
b/src/java/org/apache/cassandra/tools/nodetool/RemoveNode.java
index 7312597..bd40aba 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/RemoveNode.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/RemoveNode.java
@@ -36,10 +36,10 @@ public class RemoveNode extends NodeToolCmd
         switch (removeOperation)
         {
             case "status":
-                System.out.println("RemovalStatus: " + 
probe.getRemovalStatus());
+                System.out.println("RemovalStatus: " + 
probe.getRemovalStatus(withPort));
                 break;
             case "force":
-                System.out.println("RemovalStatus: " + 
probe.getRemovalStatus());
+                System.out.println("RemovalStatus: " + 
probe.getRemovalStatus(withPort));
                 probe.forceRemoveCompletion();
                 break;
             default:
@@ -47,4 +47,4 @@ public class RemoveNode extends NodeToolCmd
                 break;
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/tools/nodetool/RepairAdmin.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/nodetool/RepairAdmin.java 
b/src/java/org/apache/cassandra/tools/nodetool/RepairAdmin.java
index 0b4f767..ba3cf62 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/RepairAdmin.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/RepairAdmin.java
@@ -57,7 +57,8 @@ public class RepairAdmin extends NodeTool.NodeToolCmd
                                                                   "state",
                                                                   "last 
activity",
                                                                   
"coordinator",
-                                                                  
"participants");
+                                                                  
"participants",
+                                                                  
"participants_wp");
 
 
     private List<String> sessionValues(Map<String, String> session, int now)
@@ -67,7 +68,8 @@ public class RepairAdmin extends NodeTool.NodeToolCmd
                                   session.get(LocalSessionInfo.STATE),
                                   Integer.toString(now - updated) + " (s)",
                                   session.get(LocalSessionInfo.COORDINATOR),
-                                  session.get(LocalSessionInfo.PARTICIPANTS));
+                                  session.get(LocalSessionInfo.PARTICIPANTS),
+                                  
session.get(LocalSessionInfo.PARTICIPANTS_WP));
     }
 
     private void listSessions(ActiveRepairServiceMBean repairServiceProxy)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/tools/nodetool/Ring.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/nodetool/Ring.java 
b/src/java/org/apache/cassandra/tools/nodetool/Ring.java
index 9c389c2..105726c 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/Ring.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/Ring.java
@@ -51,59 +51,103 @@ public class Ring extends NodeToolCmd
     @Override
     public void execute(NodeProbe probe)
     {
-        Map<String, String> tokensToEndpoints = probe.getTokenToEndpointMap();
-        LinkedHashMultimap<String, String> endpointsToTokens = 
LinkedHashMultimap.create();
-        boolean haveVnodes = false;
-        for (Map.Entry<String, String> entry : tokensToEndpoints.entrySet())
-        {
-            haveVnodes |= endpointsToTokens.containsKey(entry.getValue());
-            endpointsToTokens.put(entry.getValue(), entry.getKey());
-        }
-
-        int maxAddressLength = Collections.max(endpointsToTokens.keys(), new 
Comparator<String>()
+        try
         {
-            @Override
-            public int compare(String first, String second)
+            Map<String, String> tokensToEndpoints = 
probe.getTokenToEndpointMap(withPort);
+            LinkedHashMultimap<String, String> endpointsToTokens = 
LinkedHashMultimap.create();
+            boolean haveVnodes = false;
+            for (Map.Entry<String, String> entry : 
tokensToEndpoints.entrySet())
             {
-               return Integer.compare(first.length(), second.length());
+                haveVnodes |= endpointsToTokens.containsKey(entry.getValue());
+                endpointsToTokens.put(entry.getValue(), entry.getKey());
             }
-        }).length();
 
-        String formatPlaceholder = "%%-%ds  
%%-12s%%-7s%%-8s%%-16s%%-20s%%-44s%%n";
-        String format = format(formatPlaceholder, maxAddressLength);
+            int maxAddressLength = Collections.max(endpointsToTokens.keys(), 
new Comparator<String>()
+            {
+                @Override
+                public int compare(String first, String second)
+                {
+                    return Integer.compare(first.length(), second.length());
+                }
+            }).length();
 
-        StringBuilder errors = new StringBuilder();
-        boolean showEffectiveOwnership = true;
-        // Calculate per-token ownership of the ring
-        Map<InetAddress, Float> ownerships;
-        try
-        {
-            ownerships = probe.effectiveOwnership(keyspace);
-        }
-        catch (IllegalStateException ex)
-        {
-            ownerships = probe.getOwnership();
-            errors.append("Note: ").append(ex.getMessage()).append("%n");
-            showEffectiveOwnership = false;
-        }
-        catch (IllegalArgumentException ex)
-        {
-            System.out.printf("%nError: %s%n", ex.getMessage());
-            return;
-        }
+            String formatPlaceholder = "%%-%ds  
%%-12s%%-7s%%-8s%%-16s%%-20s%%-44s%%n";
+            String format = format(formatPlaceholder, maxAddressLength);
 
+            StringBuilder errors = new StringBuilder();
+            boolean showEffectiveOwnership = true;
 
-        System.out.println();
-        for (Entry<String, SetHostStat> entry : 
NodeTool.getOwnershipByDc(probe, resolveIp, tokensToEndpoints, 
ownerships).entrySet())
-            printDc(probe, format, entry.getKey(), endpointsToTokens, 
entry.getValue(),showEffectiveOwnership);
+            if (withPort)
+            {
+                // Calculate per-token ownership of the ring
+                Map<String, Float> ownerships;
+                try
+                {
+                    ownerships = probe.effectiveOwnershipWithPort(keyspace);
+                }
+                catch (IllegalStateException ex)
+                {
+                    ownerships = probe.getOwnershipWithPort();
+                    errors.append("Note: 
").append(ex.getMessage()).append("%n");
+                    showEffectiveOwnership = false;
+                }
+                catch (IllegalArgumentException ex)
+                {
+                    System.out.printf("%nError: %s%n", ex.getMessage());
+                    return;
+                }
+
+
+                System.out.println();
+                for (Entry<String, SetHostStatWithPort> entry : 
NodeTool.getOwnershipByDcWithPort(probe, resolveIp, tokensToEndpoints, 
ownerships).entrySet())
+                    printDc(probe, format, entry.getKey(), endpointsToTokens, 
entry.getValue(), showEffectiveOwnership);
+
+                if (haveVnodes)
+                {
+                    System.out.println("  Warning: \"nodetool ring\" is used 
to output all the tokens of a node.");
+                    System.out.println("  To view status related info of a 
node use \"nodetool status\" instead.\n");
+                }
+
+                System.out.printf("%n  " + errors.toString());
+            }
+            else
+            {
+                // Calculate per-token ownership of the ring
+                Map<InetAddress, Float> ownerships;
+                try
+                {
+                    ownerships = probe.effectiveOwnership(keyspace);
+                }
+                catch (IllegalStateException ex)
+                {
+                    ownerships = probe.getOwnership();
+                    errors.append("Note: 
").append(ex.getMessage()).append("%n");
+                    showEffectiveOwnership = false;
+                }
+                catch (IllegalArgumentException ex)
+                {
+                    System.out.printf("%nError: %s%n", ex.getMessage());
+                    return;
+                }
 
-        if (haveVnodes)
+
+                System.out.println();
+                for (Entry<String, SetHostStat> entry : 
NodeTool.getOwnershipByDc(probe, resolveIp, tokensToEndpoints, 
ownerships).entrySet())
+                    printDc(probe, format, entry.getKey(), endpointsToTokens, 
entry.getValue(), showEffectiveOwnership);
+
+                if (haveVnodes)
+                {
+                    System.out.println("  Warning: \"nodetool ring\" is used 
to output all the tokens of a node.");
+                    System.out.println("  To view status related info of a 
node use \"nodetool status\" instead.\n");
+                }
+
+                System.out.printf("%n  " + errors.toString());
+            }
+        } catch (Exception e)
         {
-            System.out.println("  Warning: \"nodetool ring\" is used to output 
all the tokens of a node.");
-            System.out.println("  To view status related info of a node use 
\"nodetool status\" instead.\n");
+            e.printStackTrace();
+            throw e;
         }
-
-        System.out.printf("%n  " + errors.toString());
     }
 
     private void printDc(NodeProbe probe, String format,
@@ -111,12 +155,12 @@ public class Ring extends NodeToolCmd
                          LinkedHashMultimap<String, String> endpointsToTokens,
                          SetHostStat hoststats,boolean showEffectiveOwnership)
     {
-        Collection<String> liveNodes = probe.getLiveNodes();
-        Collection<String> deadNodes = probe.getUnreachableNodes();
-        Collection<String> joiningNodes = probe.getJoiningNodes();
-        Collection<String> leavingNodes = probe.getLeavingNodes();
-        Collection<String> movingNodes = probe.getMovingNodes();
-        Map<String, String> loadMap = probe.getLoadMap();
+        Collection<String> liveNodes = probe.getLiveNodes(false);
+        Collection<String> deadNodes = probe.getUnreachableNodes(false);
+        Collection<String> joiningNodes = probe.getJoiningNodes(false);
+        Collection<String> leavingNodes = probe.getLeavingNodes(false);
+        Collection<String> movingNodes = probe.getMovingNodes(false);
+        Map<String, String> loadMap = probe.getLoadMap(false);
 
         System.out.println("Datacenter: " + dc);
         System.out.println("==========");
@@ -174,4 +218,73 @@ public class Ring extends NodeToolCmd
         }
         System.out.println();
     }
+
+    private void printDc(NodeProbe probe, String format,
+                         String dc,
+                         LinkedHashMultimap<String, String> endpointsToTokens,
+                         SetHostStatWithPort hoststats,boolean 
showEffectiveOwnership)
+    {
+        Collection<String> liveNodes = probe.getLiveNodes(true);
+        Collection<String> deadNodes = probe.getUnreachableNodes(true);
+        Collection<String> joiningNodes = probe.getJoiningNodes(true);
+        Collection<String> leavingNodes = probe.getLeavingNodes(true);
+        Collection<String> movingNodes = probe.getMovingNodes(true);
+        Map<String, String> loadMap = probe.getLoadMap(true);
+
+        System.out.println("Datacenter: " + dc);
+        System.out.println("==========");
+
+        // get the total amount of replicas for this dc and the last token in 
this dc's ring
+        List<String> tokens = new ArrayList<>();
+        String lastToken = "";
+
+        for (HostStatWithPort stat : hoststats)
+        {
+            tokens.addAll(endpointsToTokens.get(stat.endpoint.toString()));
+            lastToken = tokens.get(tokens.size() - 1);
+        }
+
+        System.out.printf(format, "Address", "Rack", "Status", "State", 
"Load", "Owns", "Token");
+
+        if (hoststats.size() > 1)
+            System.out.printf(format, "", "", "", "", "", "", lastToken);
+        else
+            System.out.println();
+
+        for (HostStatWithPort stat : hoststats)
+        {
+            String endpoint = stat.endpoint.toString();
+            String rack;
+            try
+            {
+                rack = probe.getEndpointSnitchInfoProxy().getRack(endpoint);
+            }
+            catch (UnknownHostException e)
+            {
+                rack = "Unknown";
+            }
+
+            String status = liveNodes.contains(endpoint)
+                            ? "Up"
+                            : deadNodes.contains(endpoint)
+                              ? "Down"
+                              : "?";
+
+            String state = "Normal";
+
+            if (joiningNodes.contains(endpoint))
+                state = "Joining";
+            else if (leavingNodes.contains(endpoint))
+                state = "Leaving";
+            else if (movingNodes.contains(endpoint))
+                state = "Moving";
+
+            String load = loadMap.containsKey(endpoint)
+                          ? loadMap.get(endpoint)
+                          : "?";
+            String owns = stat.owns != null && showEffectiveOwnership? new 
DecimalFormat("##0.00%").format(stat.owns) : "?";
+            System.out.printf(format, stat.ipOrDns(), rack, status, state, 
load, owns, stat.token);
+        }
+        System.out.println();
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/tools/nodetool/SetHostStatWithPort.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/tools/nodetool/SetHostStatWithPort.java 
b/src/java/org/apache/cassandra/tools/nodetool/SetHostStatWithPort.java
new file mode 100644
index 0000000..67cd464
--- /dev/null
+++ b/src/java/org/apache/cassandra/tools/nodetool/SetHostStatWithPort.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools.nodetool;
+
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.cassandra.locator.InetAddressAndPort;
+
+public class SetHostStatWithPort implements Iterable<HostStatWithPort>
+{
+    final List<HostStatWithPort> hostStats = new ArrayList<>();
+    final boolean resolveIp;
+
+    public SetHostStatWithPort(boolean resolveIp)
+    {
+        this.resolveIp = resolveIp;
+    }
+
+    public int size()
+    {
+        return hostStats.size();
+    }
+
+    @Override
+    public Iterator<HostStatWithPort> iterator()
+    {
+        return hostStats.iterator();
+    }
+
+    public void add(String token, String host, Map<String, Float> ownerships) 
throws UnknownHostException
+    {
+        InetAddressAndPort endpoint = InetAddressAndPort.getByName(host);
+        Float owns = ownerships.get(endpoint);
+        hostStats.add(new HostStatWithPort(token, endpoint, resolveIp, owns));
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/tools/nodetool/Status.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/nodetool/Status.java 
b/src/java/org/apache/cassandra/tools/nodetool/Status.java
index f3ef606..49724a5 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/Status.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/Status.java
@@ -30,6 +30,7 @@ import java.util.Map;
 import java.util.SortedMap;
 
 import org.apache.cassandra.locator.EndpointSnitchInfoMBean;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.tools.NodeProbe;
 import org.apache.cassandra.tools.NodeTool;
 import org.apache.cassandra.tools.NodeTool.NodeToolCmd;
@@ -55,72 +56,130 @@ public class Status extends NodeToolCmd
     @Override
     public void execute(NodeProbe probe)
     {
-        joiningNodes = probe.getJoiningNodes();
-        leavingNodes = probe.getLeavingNodes();
-        movingNodes = probe.getMovingNodes();
-        loadMap = probe.getLoadMap();
-        Map<String, String> tokensToEndpoints = probe.getTokenToEndpointMap();
-        liveNodes = probe.getLiveNodes();
-        unreachableNodes = probe.getUnreachableNodes();
-        hostIDMap = probe.getHostIdMap();
+        joiningNodes = probe.getJoiningNodes(withPort);
+        leavingNodes = probe.getLeavingNodes(withPort);
+        movingNodes = probe.getMovingNodes(withPort);
+        loadMap = probe.getLoadMap(withPort);
+        Map<String, String> tokensToEndpoints = 
probe.getTokenToEndpointMap(withPort);
+        liveNodes = probe.getLiveNodes(withPort);
+        unreachableNodes = probe.getUnreachableNodes(withPort);
+        hostIDMap = probe.getHostIdMap(withPort);
         epSnitchInfo = probe.getEndpointSnitchInfoProxy();
 
         StringBuilder errors = new StringBuilder();
 
-        Map<InetAddress, Float> ownerships = null;
-        boolean hasEffectiveOwns = false;
-        try
-        {
-            ownerships = probe.effectiveOwnership(keyspace);
-            hasEffectiveOwns = true;
-        }
-        catch (IllegalStateException e)
-        {
-            ownerships = probe.getOwnership();
-            errors.append("Note: ").append(e.getMessage()).append("%n");
-        }
-        catch (IllegalArgumentException ex)
+        if (withPort)
         {
-            System.out.printf("%nError: %s%n", ex.getMessage());
-            System.exit(1);
-        }
+            Map<String, Float> ownerships = null;
+            boolean hasEffectiveOwns = false;
+            try
+            {
+                ownerships = probe.effectiveOwnershipWithPort(keyspace);
+                hasEffectiveOwns = true;
+            }
+            catch (IllegalStateException e)
+            {
+                ownerships = probe.getOwnershipWithPort();
+                errors.append("Note: ").append(e.getMessage()).append("%n");
+            }
+            catch (IllegalArgumentException ex)
+            {
+                System.out.printf("%nError: %s%n", ex.getMessage());
+                System.exit(1);
+            }
 
-        SortedMap<String, SetHostStat> dcs = NodeTool.getOwnershipByDc(probe, 
resolveIp, tokensToEndpoints, ownerships);
+            SortedMap<String, SetHostStatWithPort> dcs = 
NodeTool.getOwnershipByDcWithPort(probe, resolveIp, tokensToEndpoints, 
ownerships);
 
-        // More tokens than nodes (aka vnodes)?
-        if (dcs.values().size() < tokensToEndpoints.keySet().size())
-            isTokenPerNode = false;
+            // More tokens than nodes (aka vnodes)?
+            if (dcs.values().size() < tokensToEndpoints.keySet().size())
+                isTokenPerNode = false;
 
-        findMaxAddressLength(dcs);
+            findMaxAddressLengthWithPort(dcs);
 
-        // Datacenters
-        for (Map.Entry<String, SetHostStat> dc : dcs.entrySet())
-        {
-            String dcHeader = String.format("Datacenter: %s%n", dc.getKey());
-            System.out.printf(dcHeader);
-            for (int i = 0; i < (dcHeader.length() - 1); i++) 
System.out.print('=');
-            System.out.println();
+            // Datacenters
+            for (Map.Entry<String, SetHostStatWithPort> dc : dcs.entrySet())
+            {
+                String dcHeader = String.format("Datacenter: %s%n", 
dc.getKey());
+                System.out.printf(dcHeader);
+                for (int i = 0; i < (dcHeader.length() - 1); i++) 
System.out.print('=');
+                System.out.println();
 
-            // Legend
-            System.out.println("Status=Up/Down");
-            System.out.println("|/ State=Normal/Leaving/Joining/Moving");
+                // Legend
+                System.out.println("Status=Up/Down");
+                System.out.println("|/ State=Normal/Leaving/Joining/Moving");
 
-            printNodesHeader(hasEffectiveOwns, isTokenPerNode);
+                printNodesHeader(hasEffectiveOwns, isTokenPerNode);
 
-            ArrayListMultimap<InetAddress, HostStat> hostToTokens = 
ArrayListMultimap.create();
-            for (HostStat stat : dc.getValue())
-                hostToTokens.put(stat.endpoint, stat);
+                ArrayListMultimap<InetAddressAndPort, HostStatWithPort> 
hostToTokens = ArrayListMultimap.create();
+                for (HostStatWithPort stat : dc.getValue())
+                    hostToTokens.put(stat.endpoint, stat);
 
-            for (InetAddress endpoint : hostToTokens.keySet())
-            {
-                Float owns = ownerships.get(endpoint);
-                List<HostStat> tokens = hostToTokens.get(endpoint);
-                printNode(endpoint.getHostAddress(), owns, tokens, 
hasEffectiveOwns, isTokenPerNode);
+                for (InetAddressAndPort endpoint : hostToTokens.keySet())
+                {
+                    Float owns = ownerships.get(endpoint);
+                    List<HostStatWithPort> tokens = hostToTokens.get(endpoint);
+                    printNodeWithPort(endpoint.toString(), owns, tokens, 
hasEffectiveOwns, isTokenPerNode);
+                }
             }
+
+            System.out.printf("%n" + errors.toString());
         }
+        else
+        {
+            Map<InetAddress, Float> ownerships = null;
+            boolean hasEffectiveOwns = false;
+            try
+            {
+                ownerships = probe.effectiveOwnership(keyspace);
+                hasEffectiveOwns = true;
+            }
+            catch (IllegalStateException e)
+            {
+                ownerships = probe.getOwnership();
+                errors.append("Note: ").append(e.getMessage()).append("%n");
+            }
+            catch (IllegalArgumentException ex)
+            {
+                System.out.printf("%nError: %s%n", ex.getMessage());
+                System.exit(1);
+            }
+
+            SortedMap<String, SetHostStat> dcs = 
NodeTool.getOwnershipByDc(probe, resolveIp, tokensToEndpoints, ownerships);
 
-        System.out.printf("%n" + errors.toString());
+            // More tokens than nodes (aka vnodes)?
+            if (dcs.values().size() < tokensToEndpoints.keySet().size())
+                isTokenPerNode = false;
 
+            findMaxAddressLength(dcs);
+
+            // Datacenters
+            for (Map.Entry<String, SetHostStat> dc : dcs.entrySet())
+            {
+                String dcHeader = String.format("Datacenter: %s%n", 
dc.getKey());
+                System.out.printf(dcHeader);
+                for (int i = 0; i < (dcHeader.length() - 1); i++) 
System.out.print('=');
+                System.out.println();
+
+                // Legend
+                System.out.println("Status=Up/Down");
+                System.out.println("|/ State=Normal/Leaving/Joining/Moving");
+
+                printNodesHeader(hasEffectiveOwns, isTokenPerNode);
+
+                ArrayListMultimap<InetAddress, HostStat> hostToTokens = 
ArrayListMultimap.create();
+                for (HostStat stat : dc.getValue())
+                    hostToTokens.put(stat.endpoint, stat);
+
+                for (InetAddress endpoint : hostToTokens.keySet())
+                {
+                    Float owns = ownerships.get(endpoint);
+                    List<HostStat> tokens = hostToTokens.get(endpoint);
+                    printNode(endpoint.getHostAddress(), owns, tokens, 
hasEffectiveOwns, isTokenPerNode);
+                }
+            }
+
+            System.out.printf("%n" + errors.toString());
+        }
     }
 
     private void findMaxAddressLength(Map<String, SetHostStat> dcs)
@@ -135,6 +194,18 @@ public class Status extends NodeToolCmd
         }
     }
 
+    private void findMaxAddressLengthWithPort(Map<String, SetHostStatWithPort> 
dcs)
+    {
+        maxAddressLength = 0;
+        for (Map.Entry<String, SetHostStatWithPort> dc : dcs.entrySet())
+        {
+            for (HostStatWithPort stat : dc.getValue())
+            {
+                maxAddressLength = Math.max(maxAddressLength, 
stat.ipOrDns().length());
+            }
+        }
+    }
+
     private void printNodesHeader(boolean hasEffectiveOwns, boolean 
isTokenPerNode)
     {
         String fmt = getFormat(hasEffectiveOwns, isTokenPerNode);
@@ -177,6 +248,37 @@ public class Status extends NodeToolCmd
             System.out.printf(fmt, status, state, endpointDns, load, 
tokens.size(), strOwns, hostID, rack);
     }
 
+    private void printNodeWithPort(String endpoint, Float owns, 
List<HostStatWithPort> tokens, boolean hasEffectiveOwns, boolean isTokenPerNode)
+    {
+        String status, state, load, strOwns, hostID, rack, fmt;
+        fmt = getFormat(hasEffectiveOwns, isTokenPerNode);
+        if (liveNodes.contains(endpoint)) status = "U";
+        else if (unreachableNodes.contains(endpoint)) status = "D";
+        else status = "?";
+        if (joiningNodes.contains(endpoint)) state = "J";
+        else if (leavingNodes.contains(endpoint)) state = "L";
+        else if (movingNodes.contains(endpoint)) state = "M";
+        else state = "N";
+
+        load = loadMap.containsKey(endpoint) ? loadMap.get(endpoint) : "?";
+        strOwns = owns != null && hasEffectiveOwns ? new 
DecimalFormat("##0.0%").format(owns) : "?";
+        hostID = hostIDMap.get(endpoint);
+
+        try
+        {
+            rack = epSnitchInfo.getRack(endpoint);
+        } catch (UnknownHostException e)
+        {
+            throw new RuntimeException(e);
+        }
+
+        String endpointDns = tokens.get(0).ipOrDns();
+        if (isTokenPerNode)
+            System.out.printf(fmt, status, state, endpointDns, load, strOwns, 
hostID, tokens.get(0).token, rack);
+        else
+            System.out.printf(fmt, status, state, endpointDns, load, 
tokens.size(), strOwns, hostID, rack);
+    }
+
     private String getFormat(
             boolean hasEffectiveOwns,
             boolean isTokenPerNode)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java 
b/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java
index 9230d38..bf95080 100644
--- a/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java
+++ b/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java
@@ -29,7 +29,7 @@ class ExpiredTraceState extends TraceState
 
     ExpiredTraceState(TraceState delegate)
     {
-        super(FBUtilities.getBroadcastAddress(), delegate.sessionId, 
delegate.traceType);
+        super(FBUtilities.getBroadcastAddressAndPort(), delegate.sessionId, 
delegate.traceType);
         this.delegate = delegate;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/tracing/TraceKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tracing/TraceKeyspace.java 
b/src/java/org/apache/cassandra/tracing/TraceKeyspace.java
index 20c992c..487ed65 100644
--- a/src/java/org/apache/cassandra/tracing/TraceKeyspace.java
+++ b/src/java/org/apache/cassandra/tracing/TraceKeyspace.java
@@ -48,29 +48,31 @@ public final class TraceKeyspace
 
     private static final TableMetadata Sessions =
         parse(SESSIONS,
-              "tracing sessions",
-              "CREATE TABLE %s ("
-              + "session_id uuid,"
-              + "command text,"
-              + "client inet,"
-              + "coordinator inet,"
-              + "duration int,"
-              + "parameters map<text, text>,"
-              + "request text,"
-              + "started_at timestamp,"
-              + "PRIMARY KEY ((session_id)))");
+                "tracing sessions",
+                "CREATE TABLE %s ("
+                + "session_id uuid,"
+                + "command text,"
+                + "client inet,"
+                + "coordinator inet,"
+                + "coordinator_port int,"
+                + "duration int,"
+                + "parameters map<text, text>,"
+                + "request text,"
+                + "started_at timestamp,"
+                + "PRIMARY KEY ((session_id)))");
 
     private static final TableMetadata Events =
         parse(EVENTS,
-              "tracing events",
-              "CREATE TABLE %s ("
-              + "session_id uuid,"
-              + "event_id timeuuid,"
-              + "activity text,"
-              + "source inet,"
-              + "source_elapsed int,"
-              + "thread text,"
-              + "PRIMARY KEY ((session_id), event_id))");
+                "tracing events",
+                "CREATE TABLE %s ("
+                + "session_id uuid,"
+                + "event_id timeuuid,"
+                + "activity text,"
+                + "source inet,"
+                + "source_port int,"
+                + "source_elapsed int,"
+                + "thread text,"
+                + "PRIMARY KEY ((session_id), event_id))");
 
     private static TableMetadata parse(String table, String description, 
String cql)
     {
@@ -100,7 +102,8 @@ public final class TraceKeyspace
         builder.row()
                .ttl(ttl)
                .add("client", client)
-               .add("coordinator", FBUtilities.getBroadcastAddress())
+               .add("coordinator", 
FBUtilities.getBroadcastAddressAndPort().address)
+               .add("coordinator_port", 
FBUtilities.getBroadcastAddressAndPort().port)
                .add("request", request)
                .add("started_at", new Date(startedAt))
                .add("command", command)
@@ -125,7 +128,8 @@ public final class TraceKeyspace
                                               .ttl(ttl);
 
         rowBuilder.add("activity", message)
-                  .add("source", FBUtilities.getBroadcastAddress())
+                  .add("source", 
FBUtilities.getBroadcastAddressAndPort().address)
+                  .add("source_port", 
FBUtilities.getBroadcastAddressAndPort().port)
                   .add("thread", threadName);
 
         if (elapsed >= 0)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/tracing/TraceState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tracing/TraceState.java 
b/src/java/org/apache/cassandra/tracing/TraceState.java
index b4eff6b..a53846c 100644
--- a/src/java/org/apache/cassandra/tracing/TraceState.java
+++ b/src/java/org/apache/cassandra/tracing/TraceState.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.tracing;
 
-import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.UUID;
@@ -28,6 +27,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import com.google.common.base.Stopwatch;
 import org.slf4j.helpers.MessageFormatter;
 
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.progress.ProgressEvent;
 import org.apache.cassandra.utils.progress.ProgressEventNotifier;
@@ -40,7 +40,7 @@ import org.apache.cassandra.utils.progress.ProgressListener;
 public abstract class TraceState implements ProgressEventNotifier
 {
     public final UUID sessionId;
-    public final InetAddress coordinator;
+    public final InetAddressAndPort coordinator;
     public final Stopwatch watch;
     public final ByteBuffer sessionIdBytes;
     public final Tracing.TraceType traceType;
@@ -63,7 +63,7 @@ public abstract class TraceState implements 
ProgressEventNotifier
     // See CASSANDRA-7626 for more details.
     private final AtomicInteger references = new AtomicInteger(1);
 
-    protected TraceState(InetAddress coordinator, UUID sessionId, 
Tracing.TraceType traceType)
+    protected TraceState(InetAddressAndPort coordinator, UUID sessionId, 
Tracing.TraceType traceType)
     {
         assert coordinator != null;
         assert sessionId != null;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/tracing/TraceStateImpl.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tracing/TraceStateImpl.java 
b/src/java/org/apache/cassandra/tracing/TraceStateImpl.java
index 349000a..2722406 100644
--- a/src/java/org/apache/cassandra/tracing/TraceStateImpl.java
+++ b/src/java/org/apache/cassandra/tracing/TraceStateImpl.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.tracing;
 
-import java.net.InetAddress;
 import java.util.Collections;
 import java.util.Set;
 import java.util.UUID;
@@ -36,6 +35,7 @@ import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.exceptions.OverloadedException;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 import org.apache.cassandra.utils.WrappedRunnable;
@@ -54,7 +54,7 @@ public class TraceStateImpl extends TraceState
 
     private final Set<Future<?>> pendingFutures = 
ConcurrentHashMap.newKeySet();
 
-    public TraceStateImpl(InetAddress coordinator, UUID sessionId, 
Tracing.TraceType traceType)
+    public TraceStateImpl(InetAddressAndPort coordinator, UUID sessionId, 
Tracing.TraceType traceType)
     {
         super(coordinator, sessionId, traceType);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/tracing/Tracing.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tracing/Tracing.java 
b/src/java/org/apache/cassandra/tracing/Tracing.java
index 4cdddba..55e36a4 100644
--- a/src/java/org/apache/cassandra/tracing/Tracing.java
+++ b/src/java/org/apache/cassandra/tracing/Tracing.java
@@ -19,15 +19,17 @@
  */
 package org.apache.cassandra.tracing;
 
+import java.io.IOException;
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
-import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableList;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,8 +37,13 @@ import io.netty.util.concurrent.FastThreadLocal;
 import org.apache.cassandra.concurrent.ExecutorLocal;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.marshal.TimeUUIDType;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.ParameterType;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 import org.apache.cassandra.utils.UUIDGen;
@@ -48,8 +55,23 @@ import org.apache.cassandra.utils.UUIDGen;
  */
 public abstract class Tracing implements ExecutorLocal<TraceState>
 {
-    public static final String TRACE_HEADER = "TraceSession";
-    public static final String TRACE_TYPE = "TraceType";
+    public static final IVersionedSerializer<TraceType> traceTypeSerializer = 
new IVersionedSerializer<TraceType>()
+    {
+        public void serialize(TraceType traceType, DataOutputPlus out, int 
version) throws IOException
+        {
+            out.write((byte)traceType.ordinal());
+        }
+
+        public TraceType deserialize(DataInputPlus in, int version) throws 
IOException
+        {
+            return TraceType.deserialize(in.readByte());
+        }
+
+        public long serializedSize(TraceType traceType, int version)
+        {
+            return 1;
+        }
+    };
 
     public enum TraceType
     {
@@ -83,7 +105,7 @@ public abstract class Tracing implements 
ExecutorLocal<TraceState>
 
     protected static final Logger logger = 
LoggerFactory.getLogger(Tracing.class);
 
-    private final InetAddress localAddress = FBUtilities.getLocalAddress();
+    private final InetAddressAndPort localAddress = 
FBUtilities.getLocalAddressAndPort();
 
     private final FastThreadLocal<TraceState> state = new FastThreadLocal<>();
 
@@ -228,21 +250,19 @@ public abstract class Tracing implements 
ExecutorLocal<TraceState>
      */
     public TraceState initializeFromMessage(final MessageIn<?> message)
     {
-        final byte[] sessionBytes = message.parameters.get(TRACE_HEADER);
+        final UUID sessionId = 
(UUID)message.parameters.get(ParameterType.TRACE_SESSION);
 
-        if (sessionBytes == null)
+        if (sessionId == null)
             return null;
 
-        assert sessionBytes.length == 16;
-        UUID sessionId = UUIDGen.getUUID(ByteBuffer.wrap(sessionBytes));
         TraceState ts = get(sessionId);
         if (ts != null && ts.acquireReference())
             return ts;
 
-        byte[] tmpBytes;
+        TraceType tmpType;
         TraceType traceType = TraceType.QUERY;
-        if ((tmpBytes = message.parameters.get(TRACE_TYPE)) != null)
-            traceType = TraceType.deserialize(tmpBytes[0]);
+        if ((tmpType = 
(TraceType)message.parameters.get(ParameterType.TRACE_TYPE)) != null)
+            traceType = tmpType;
 
         if (message.verb == MessagingService.Verb.REQUEST_RESPONSE)
         {
@@ -257,16 +277,16 @@ public abstract class Tracing implements 
ExecutorLocal<TraceState>
         }
     }
 
-    public Map<String, byte[]> getTraceHeaders()
+    public List<Object> getTraceHeaders()
     {
         assert isTracing();
 
-        return ImmutableMap.of(
-                TRACE_HEADER, 
UUIDGen.decompose(Tracing.instance.getSessionId()),
-                TRACE_TYPE, new byte[] { 
Tracing.TraceType.serialize(Tracing.instance.getTraceType()) });
+        return ImmutableList.of(
+        ParameterType.TRACE_SESSION, Tracing.instance.getSessionId(),
+        ParameterType.TRACE_TYPE, Tracing.instance.getTraceType());
     }
 
-    protected abstract TraceState newTraceState(InetAddress coordinator, UUID 
sessionId, Tracing.TraceType traceType);
+    protected abstract TraceState newTraceState(InetAddressAndPort 
coordinator, UUID sessionId, Tracing.TraceType traceType);
 
     // repair just gets a varargs method since it's so heavyweight anyway
     public static void traceRepair(String format, Object... args)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/tracing/TracingImpl.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tracing/TracingImpl.java 
b/src/java/org/apache/cassandra/tracing/TracingImpl.java
index 789216e..1e32f10 100644
--- a/src/java/org/apache/cassandra/tracing/TracingImpl.java
+++ b/src/java/org/apache/cassandra/tracing/TracingImpl.java
@@ -26,6 +26,7 @@ import java.util.UUID;
 
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.utils.WrappedRunnable;
 
 
@@ -93,7 +94,7 @@ class TracingImpl extends Tracing
     }
 
     @Override
-    protected TraceState newTraceState(InetAddress coordinator, UUID 
sessionId, TraceType traceType)
+    protected TraceState newTraceState(InetAddressAndPort coordinator, UUID 
sessionId, TraceType traceType)
     {
         return new TraceStateImpl(coordinator, sessionId, traceType);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/transport/Event.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Event.java 
b/src/java/org/apache/cassandra/transport/Event.java
index ed77e59..77edf8a 100644
--- a/src/java/org/apache/cassandra/transport/Event.java
+++ b/src/java/org/apache/cassandra/transport/Event.java
@@ -24,6 +24,7 @@ import java.util.List;
 
 import com.google.common.base.Objects;
 import io.netty.buffer.ByteBuf;
+import org.apache.cassandra.locator.InetAddressAndPort;
 
 public abstract class Event
 {
@@ -110,19 +111,19 @@ public abstract class Event
             this.change = change;
         }
 
-        public static TopologyChange newNode(InetAddress host, int port)
+        public static TopologyChange newNode(InetAddressAndPort address)
         {
-            return new TopologyChange(Change.NEW_NODE, new 
InetSocketAddress(host, port));
+            return new TopologyChange(Change.NEW_NODE, new 
InetSocketAddress(address.address, address.port));
         }
 
-        public static TopologyChange removedNode(InetAddress host, int port)
+        public static TopologyChange removedNode(InetAddressAndPort address)
         {
-            return new TopologyChange(Change.REMOVED_NODE, new 
InetSocketAddress(host, port));
+            return new TopologyChange(Change.REMOVED_NODE, new 
InetSocketAddress(address.address, address.port));
         }
 
-        public static TopologyChange movedNode(InetAddress host, int port)
+        public static TopologyChange movedNode(InetAddressAndPort address)
         {
-            return new TopologyChange(Change.MOVED_NODE, new 
InetSocketAddress(host, port));
+            return new TopologyChange(Change.MOVED_NODE, new 
InetSocketAddress(address.address, address.port));
         }
 
         // Assumes the type has already been deserialized
@@ -181,14 +182,14 @@ public abstract class Event
             this.status = status;
         }
 
-        public static StatusChange nodeUp(InetAddress host, int port)
+        public static StatusChange nodeUp(InetAddressAndPort address)
         {
-            return new StatusChange(Status.UP, new InetSocketAddress(host, 
port));
+            return new StatusChange(Status.UP, new 
InetSocketAddress(address.address, address.port));
         }
 
-        public static StatusChange nodeDown(InetAddress host, int port)
+        public static StatusChange nodeDown(InetAddressAndPort address)
         {
-            return new StatusChange(Status.DOWN, new InetSocketAddress(host, 
port));
+            return new StatusChange(Status.DOWN, new 
InetSocketAddress(address.address, address.port));
         }
 
         // Assumes the type has already been deserialized

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/transport/ProtocolVersion.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/ProtocolVersion.java 
b/src/java/org/apache/cassandra/transport/ProtocolVersion.java
index cd73c86..838176a 100644
--- a/src/java/org/apache/cassandra/transport/ProtocolVersion.java
+++ b/src/java/org/apache/cassandra/transport/ProtocolVersion.java
@@ -43,7 +43,8 @@ public enum ProtocolVersion implements 
Comparable<ProtocolVersion>
     V2(2, "v2", false), // no longer supported
     V3(3, "v3", false),
     V4(4, "v4", false),
-    V5(5, "v5-beta", true);
+    V5(5, "v5", false),
+    V6(6, "v6-beta", true);
 
     /** The version number */
     private final int num;
@@ -62,7 +63,7 @@ public enum ProtocolVersion implements 
Comparable<ProtocolVersion>
     }
 
     /** The supported versions stored as an array, these should be private and 
are required for fast decoding*/
-    private final static ProtocolVersion[] SUPPORTED_VERSIONS = new 
ProtocolVersion[] { V3, V4, V5 };
+    private final static ProtocolVersion[] SUPPORTED_VERSIONS = new 
ProtocolVersion[] { V3, V4, V5, V6 };
     final static ProtocolVersion MIN_SUPPORTED_VERSION = SUPPORTED_VERSIONS[0];
     final static ProtocolVersion MAX_SUPPORTED_VERSION = 
SUPPORTED_VERSIONS[SUPPORTED_VERSIONS.length - 1];
 
@@ -73,8 +74,8 @@ public enum ProtocolVersion implements 
Comparable<ProtocolVersion>
     public final static EnumSet<ProtocolVersion> UNSUPPORTED = 
EnumSet.complementOf(SUPPORTED);
 
     /** The preferred versions */
-    public final static ProtocolVersion CURRENT = V4;
-    public final static Optional<ProtocolVersion> BETA = Optional.of(V5);
+    public final static ProtocolVersion CURRENT = V5;
+    public final static Optional<ProtocolVersion> BETA = Optional.of(V6);
 
     public static List<String> supportedVersions()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/transport/Server.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Server.java 
b/src/java/org/apache/cassandra/transport/Server.java
index d3f1c2c..cd04edc 100644
--- a/src/java/org/apache/cassandra/transport/Server.java
+++ b/src/java/org/apache/cassandra/transport/Server.java
@@ -51,6 +51,7 @@ import io.netty.util.internal.logging.Slf4JLoggerFactory;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.EncryptionOptions;
 import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.SchemaChangeListener;
 import org.apache.cassandra.security.SSLFactory;
@@ -454,51 +455,32 @@ public class Server implements CassandraDaemon.Server
 
         // We keep track of the latest status change events we have sent to 
avoid sending duplicates
         // since StorageService may send duplicate notifications 
(CASSANDRA-7816, CASSANDRA-8236, CASSANDRA-9156)
-        private final Map<InetAddress, LatestEvent> latestEvents = new 
ConcurrentHashMap<>();
+        private final Map<InetAddressAndPort, LatestEvent> latestEvents = new 
ConcurrentHashMap<>();
         // We also want to delay delivering a NEW_NODE notification until the 
new node has set its RPC ready
         // state. This tracks the endpoints which have joined, but not yet 
signalled they're ready for clients
-        private final Set<InetAddress> endpointsPendingJoinedNotification = 
ConcurrentHashMap.newKeySet();
-
-
-        private static final InetAddress bindAll;
-        static
-        {
-            try
-            {
-                bindAll = InetAddress.getByAddress(new byte[4]);
-            }
-            catch (UnknownHostException e)
-            {
-                throw new AssertionError(e);
-            }
-        }
+        private final Set<InetAddressAndPort> 
endpointsPendingJoinedNotification = ConcurrentHashMap.newKeySet();
 
         private EventNotifier(Server server)
         {
             this.server = server;
         }
 
-        private InetAddress getRpcAddress(InetAddress endpoint)
+        private InetAddressAndPort getNativeAddress(InetAddressAndPort 
endpoint)
         {
             try
             {
-                InetAddress rpcAddress = 
InetAddress.getByName(StorageService.instance.getRpcaddress(endpoint));
-                // If rpcAddress == 0.0.0.0 (i.e. bound on all addresses), 
returning that is not very helpful,
-                // so return the internal address (which is ok since "we're 
bound on all addresses").
-                // Note that after all nodes are running a version that 
includes CASSANDRA-5899, rpcAddress should
-                // never be 0.0.0.0, so this can eventually be removed.
-                return rpcAddress.equals(bindAll) ? endpoint : rpcAddress;
+                return 
InetAddressAndPort.getByName(StorageService.instance.getNativeaddress(endpoint, 
true));
             }
             catch (UnknownHostException e)
             {
                 // That should not happen, so log an error, but return the
                 // endpoint address since there's a good change this is right
                 logger.error("Problem retrieving RPC address for {}", 
endpoint, e);
-                return endpoint;
+                return 
InetAddressAndPort.getByAddressOverrideDefaults(endpoint.address, 
DatabaseDescriptor.getNativeTransportPort());
             }
         }
 
-        private void send(InetAddress endpoint, Event.NodeEvent event)
+        private void send(InetAddressAndPort endpoint, Event.NodeEvent event)
         {
             if (logger.isTraceEnabled())
                 logger.trace("Sending event for endpoint {}, rpc address {}", 
endpoint, event.nodeAddress());
@@ -508,8 +490,8 @@ public class Server implements CassandraDaemon.Server
             // then don't send the notification. This covers the case of 
rpc_address set to "localhost",
             // which is not useful to any driver and in fact may cauase 
serious problems to some drivers,
             // see CASSANDRA-10052
-            if (!endpoint.equals(FBUtilities.getBroadcastAddress()) &&
-                
event.nodeAddress().equals(FBUtilities.getBroadcastRpcAddress()))
+            if (!endpoint.equals(FBUtilities.getBroadcastAddressAndPort()) &&
+                
event.nodeAddress().equals(FBUtilities.getJustBroadcastNativeAddress()))
                 return;
 
             send(event);
@@ -520,38 +502,38 @@ public class Server implements CassandraDaemon.Server
             server.connectionTracker.send(event);
         }
 
-        public void onJoinCluster(InetAddress endpoint)
+        public void onJoinCluster(InetAddressAndPort endpoint)
         {
             if (!StorageService.instance.isRpcReady(endpoint))
                 endpointsPendingJoinedNotification.add(endpoint);
             else
-                onTopologyChange(endpoint, 
Event.TopologyChange.newNode(getRpcAddress(endpoint), server.socket.getPort()));
+                onTopologyChange(endpoint, 
Event.TopologyChange.newNode(getNativeAddress(endpoint)));
         }
 
-        public void onLeaveCluster(InetAddress endpoint)
+        public void onLeaveCluster(InetAddressAndPort endpoint)
         {
-            onTopologyChange(endpoint, 
Event.TopologyChange.removedNode(getRpcAddress(endpoint), 
server.socket.getPort()));
+            onTopologyChange(endpoint, 
Event.TopologyChange.removedNode(getNativeAddress(endpoint)));
         }
 
-        public void onMove(InetAddress endpoint)
+        public void onMove(InetAddressAndPort endpoint)
         {
-            onTopologyChange(endpoint, 
Event.TopologyChange.movedNode(getRpcAddress(endpoint), 
server.socket.getPort()));
+            onTopologyChange(endpoint, 
Event.TopologyChange.movedNode(getNativeAddress(endpoint)));
         }
 
-        public void onUp(InetAddress endpoint)
+        public void onUp(InetAddressAndPort endpoint)
         {
             if (endpointsPendingJoinedNotification.remove(endpoint))
                 onJoinCluster(endpoint);
 
-            onStatusChange(endpoint, 
Event.StatusChange.nodeUp(getRpcAddress(endpoint), server.socket.getPort()));
+            onStatusChange(endpoint, 
Event.StatusChange.nodeUp(getNativeAddress(endpoint)));
         }
 
-        public void onDown(InetAddress endpoint)
+        public void onDown(InetAddressAndPort endpoint)
         {
-            onStatusChange(endpoint, 
Event.StatusChange.nodeDown(getRpcAddress(endpoint), server.socket.getPort()));
+            onStatusChange(endpoint, 
Event.StatusChange.nodeDown(getNativeAddress(endpoint)));
         }
 
-        private void onTopologyChange(InetAddress endpoint, 
Event.TopologyChange event)
+        private void onTopologyChange(InetAddressAndPort endpoint, 
Event.TopologyChange event)
         {
             if (logger.isTraceEnabled())
                 logger.trace("Topology changed event : {}, {}", endpoint, 
event.change);
@@ -565,7 +547,7 @@ public class Server implements CassandraDaemon.Server
             }
         }
 
-        private void onStatusChange(InetAddress endpoint, Event.StatusChange 
event)
+        private void onStatusChange(InetAddressAndPort endpoint, 
Event.StatusChange event)
         {
             if (logger.isTraceEnabled())
                 logger.trace("Status changed event : {}, {}", endpoint, 
event.status);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java 
b/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
index 9163d56..84af41c 100644
--- a/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
@@ -33,6 +33,7 @@ import org.apache.cassandra.cql3.functions.FunctionName;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.WriteType;
 import org.apache.cassandra.exceptions.*;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.transport.*;
 import org.apache.cassandra.utils.MD5Digest;
 
@@ -88,14 +89,14 @@ public class ErrorMessage extends Message.Response
                         // The number of failures is also present in protocol 
v5, but used instead to specify the size of the failure map
                         int failure = body.readInt();
 
-                        Map<InetAddress, RequestFailureReason> 
failureReasonByEndpoint = new ConcurrentHashMap<>();
+                        Map<InetAddressAndPort, RequestFailureReason> 
failureReasonByEndpoint = new ConcurrentHashMap<>();
                         if (version.isGreaterOrEqualTo(ProtocolVersion.V5))
                         {
                             for (int i = 0; i < failure; i++)
                             {
                                 InetAddress endpoint = 
CBUtil.readInetAddr(body);
                                 RequestFailureReason failureReason = 
RequestFailureReason.fromCode(body.readUnsignedShort());
-                                failureReasonByEndpoint.put(endpoint, 
failureReason);
+                                
failureReasonByEndpoint.put(InetAddressAndPort.getByAddress(endpoint), 
failureReason);
                             }
                         }
 
@@ -195,9 +196,9 @@ public class ErrorMessage extends Message.Response
 
                         if (version.isGreaterOrEqualTo(ProtocolVersion.V5))
                         {
-                            for (Map.Entry<InetAddress, RequestFailureReason> 
entry : rfe.failureReasonByEndpoint.entrySet())
+                            for (Map.Entry<InetAddressAndPort, 
RequestFailureReason> entry : rfe.failureReasonByEndpoint.entrySet())
                             {
-                                CBUtil.writeInetAddr(entry.getKey(), dest);
+                                CBUtil.writeInetAddr(entry.getKey().address, 
dest);
                                 dest.writeShort(entry.getValue().code);
                             }
                         }
@@ -260,9 +261,9 @@ public class ErrorMessage extends Message.Response
 
                         if (version.isGreaterOrEqualTo(ProtocolVersion.V5))
                         {
-                            for (Map.Entry<InetAddress, RequestFailureReason> 
entry : rfe.failureReasonByEndpoint.entrySet())
+                            for (Map.Entry<InetAddressAndPort, 
RequestFailureReason> entry : rfe.failureReasonByEndpoint.entrySet())
                             {
-                                size += CBUtil.sizeOfInetAddr(entry.getKey());
+                                size += 
CBUtil.sizeOfInetAddr(entry.getKey().address);
                                 size += 2; // RequestFailureReason code
                             }
                         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/utils/FBUtilities.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java 
b/src/java/org/apache/cassandra/utils/FBUtilities.java
index 1cb59d4..3ca8b89 100644
--- a/src/java/org/apache/cassandra/utils/FBUtilities.java
+++ b/src/java/org/apache/cassandra/utils/FBUtilities.java
@@ -58,6 +58,7 @@ import 
org.apache.cassandra.io.sstable.metadata.ValidationMetadata;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.io.util.DataOutputBufferFixed;
 import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.AsyncOneResponse;
 
 import org.codehaus.jackson.JsonFactory;
@@ -78,7 +79,10 @@ public class FBUtilities
 
     private static volatile InetAddress localInetAddress;
     private static volatile InetAddress broadcastInetAddress;
-    private static volatile InetAddress broadcastRpcAddress;
+    private static volatile InetAddress broadcastNativeAddress;
+    private static volatile InetAddressAndPort broadcastNativeAddressAndPort;
+    private static volatile InetAddressAndPort broadcastInetAddressAndPort;
+    private static volatile InetAddressAndPort localInetAddressAndPort;
 
     public static int getAvailableProcessors()
     {
@@ -92,9 +96,9 @@ public class FBUtilities
     public static final int MAX_UNSIGNED_SHORT = 0xFFFF;
 
     /**
-     * Please use getBroadcastAddress instead. You need this only when you 
have to listen/connect.
+     * Please use getJustBroadcastAddress instead. You need this only when you 
have to listen/connect.
      */
-    public static InetAddress getLocalAddress()
+    public static InetAddress getJustLocalAddress()
     {
         if (localInetAddress == null)
             try
@@ -110,30 +114,57 @@ public class FBUtilities
         return localInetAddress;
     }
 
-    public static InetAddress getBroadcastAddress()
+    public static InetAddressAndPort getLocalAddressAndPort()
+    {
+        if (localInetAddressAndPort == null)
+        {
+            localInetAddressAndPort = 
InetAddressAndPort.getByAddress(getJustLocalAddress());
+        }
+        return localInetAddressAndPort;
+    }
+
+    public static InetAddress getJustBroadcastAddress()
     {
         if (broadcastInetAddress == null)
             broadcastInetAddress = DatabaseDescriptor.getBroadcastAddress() == 
null
-                                 ? getLocalAddress()
+                                 ? getJustLocalAddress()
                                  : DatabaseDescriptor.getBroadcastAddress();
         return broadcastInetAddress;
     }
 
+    public static InetAddressAndPort getBroadcastAddressAndPort()
+    {
+        if (broadcastInetAddressAndPort == null)
+        {
+            broadcastInetAddressAndPort = 
InetAddressAndPort.getByAddress(getJustBroadcastAddress());
+        }
+        return broadcastInetAddressAndPort;
+    }
+
     /**
      * <b>THIS IS FOR TESTING ONLY!!</b>
      */
     public static void setBroadcastInetAddress(InetAddress addr)
     {
         broadcastInetAddress = addr;
+        broadcastInetAddressAndPort = 
InetAddressAndPort.getByAddress(broadcastInetAddress);
     }
 
-    public static InetAddress getBroadcastRpcAddress()
+    public static InetAddress getJustBroadcastNativeAddress()
     {
-        if (broadcastRpcAddress == null)
-            broadcastRpcAddress = DatabaseDescriptor.getBroadcastRpcAddress() 
== null
+        if (broadcastNativeAddress == null)
+            broadcastNativeAddress = 
DatabaseDescriptor.getBroadcastRpcAddress() == null
                                    ? DatabaseDescriptor.getRpcAddress()
                                    : 
DatabaseDescriptor.getBroadcastRpcAddress();
-        return broadcastRpcAddress;
+        return broadcastNativeAddress;
+    }
+
+    public static InetAddressAndPort getBroadcastNativeAddressAndPort()
+    {
+        if (broadcastNativeAddressAndPort == null)
+            broadcastNativeAddressAndPort = 
InetAddressAndPort.getByAddressOverrideDefaults(getJustBroadcastNativeAddress(),
+                                                                               
              DatabaseDescriptor.getNativeTransportPort());
+        return broadcastNativeAddressAndPort;
     }
 
     public static Collection<InetAddress> getAllLocalAddresses()
@@ -838,7 +869,9 @@ public class FBUtilities
     public static void reset()
     {
         localInetAddress = null;
+        localInetAddressAndPort = null;
         broadcastInetAddress = null;
-        broadcastRpcAddress = null;
+        broadcastInetAddressAndPort = null;
+        broadcastNativeAddress = null;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/utils/JMXServerUtils.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/JMXServerUtils.java 
b/src/java/org/apache/cassandra/utils/JMXServerUtils.java
index e78ed01..bb5c3ac 100644
--- a/src/java/org/apache/cassandra/utils/JMXServerUtils.java
+++ b/src/java/org/apache/cassandra/utils/JMXServerUtils.java
@@ -236,7 +236,7 @@ public class JMXServerUtils
         String hostName;
         if (serverAddress == null)
         {
-            hostName = FBUtilities.getBroadcastAddress() instanceof 
Inet6Address ? "[::]" : "0.0.0.0";
+            hostName = FBUtilities.getJustBroadcastAddress() instanceof 
Inet6Address ? "[::]" : "0.0.0.0";
         }
         else
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/utils/Mx4jTool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/Mx4jTool.java 
b/src/java/org/apache/cassandra/utils/Mx4jTool.java
index cd42aca..5baaea2 100644
--- a/src/java/org/apache/cassandra/utils/Mx4jTool.java
+++ b/src/java/org/apache/cassandra/utils/Mx4jTool.java
@@ -79,7 +79,7 @@ public class Mx4jTool
     {
         String sAddress = System.getProperty("mx4jaddress");
         if (StringUtils.isEmpty(sAddress))
-            sAddress = FBUtilities.getBroadcastAddress().getHostAddress();
+            sAddress = 
FBUtilities.getBroadcastAddressAndPort().address.getHostAddress();
         return sAddress;
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to