http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/tools/LoaderOptions.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/LoaderOptions.java b/src/java/org/apache/cassandra/tools/LoaderOptions.java index c821e6a..4646ba4 100644 --- a/src/java/org/apache/cassandra/tools/LoaderOptions.java +++ b/src/java/org/apache/cassandra/tools/LoaderOptions.java @@ -27,8 +27,12 @@ import java.net.*; import java.util.HashSet; import java.util.Set; +import com.google.common.base.Throwables; +import com.google.common.net.HostAndPort; + import org.apache.cassandra.config.*; import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.tools.BulkLoader.CmdLineOptions; import com.datastax.driver.core.AuthProvider; @@ -54,6 +58,7 @@ public class LoaderOptions public static final String THROTTLE_MBITS = "throttle"; public static final String INTER_DC_THROTTLE_MBITS = "inter-dc-throttle"; public static final String TOOL_NAME = "sstableloader"; + public static final String ALLOW_SERVER_PORT_DISCOVERY_OPTION = "server-port-discovery"; /* client encryption options */ public static final String SSL_TRUSTSTORE = "truststore"; @@ -80,8 +85,9 @@ public class LoaderOptions public final EncryptionOptions clientEncOptions; public final int connectionsPerHost; public final EncryptionOptions.ServerEncryptionOptions serverEncOptions; - public final Set<InetAddress> hosts; - public final Set<InetAddress> ignores; + public final Set<InetSocketAddress> hosts; + public final Set<InetAddressAndPort> ignores; + public final boolean allowServerPortDiscovery; LoaderOptions(Builder builder) { @@ -101,6 +107,7 @@ public class LoaderOptions connectionsPerHost = builder.connectionsPerHost; serverEncOptions = builder.serverEncOptions; hosts = builder.hosts; + allowServerPortDiscovery = builder.allowServerPortDiscovery; ignores = builder.ignores; } @@ -122,8 +129,11 @@ public class LoaderOptions EncryptionOptions clientEncOptions = new EncryptionOptions(); int connectionsPerHost = 1; EncryptionOptions.ServerEncryptionOptions serverEncOptions = new EncryptionOptions.ServerEncryptionOptions(); - Set<InetAddress> hosts = new HashSet<>(); - Set<InetAddress> ignores = new HashSet<>(); + Set<InetAddress> hostsArg = new HashSet<>(); + Set<InetAddress> ignoresArg = new HashSet<>(); + Set<InetSocketAddress> hosts = new HashSet<>(); + Set<InetAddressAndPort> ignores = new HashSet<>(); + boolean allowServerPortDiscovery; Builder() { @@ -133,6 +143,23 @@ public class LoaderOptions public LoaderOptions build() { constructAuthProvider(); + + try + { + for (InetAddress host : hostsArg) + { + hosts.add(new InetSocketAddress(host, nativePort)); + } + for (InetAddress host : ignoresArg) + { + ignores.add(InetAddressAndPort.getByNameOverrideDefaults(host.getHostAddress(), storagePort)); + } + } + catch (UnknownHostException e) + { + Throwables.propagate(e); + } + return new LoaderOptions(this); } @@ -226,30 +253,61 @@ public class LoaderOptions return this; } + @Deprecated public Builder hosts(Set<InetAddress> hosts) { - this.hosts = hosts; + this.hostsArg.addAll(hosts); + return this; + } + + public Builder hostsAndNativePort(Set<InetSocketAddress> hosts) + { + this.hosts.addAll(hosts); return this; } public Builder host(InetAddress host) { + hostsArg.add(host); + return this; + } + + public Builder hostAndNativePort(InetSocketAddress host) + { hosts.add(host); return this; } public Builder ignore(Set<InetAddress> ignores) { - this.ignores = ignores; + this.ignoresArg.addAll(ignores); + return this; + } + + public Builder ignoresAndInternalPorts(Set<InetAddressAndPort> ignores) + { + this.ignores.addAll(ignores); return this; } public Builder ignore(InetAddress ignore) { + ignoresArg.add(ignore); + return this; + } + + public Builder ignoreAndInternalPorts(InetAddressAndPort ignore) + { ignores.add(ignore); return this; } + public Builder allowServerPortDiscovery(boolean allowServerPortDiscovery) + { + this.allowServerPortDiscovery = allowServerPortDiscovery; + return this; + } + public Builder parseArgs(String cmdArgs[]) { CommandLineParser parser = new GnuParser(); @@ -296,6 +354,7 @@ public class LoaderOptions verbose = cmd.hasOption(VERBOSE_OPTION); noProgress = cmd.hasOption(NOPROGRESS_OPTION); + allowServerPortDiscovery = cmd.hasOption(ALLOW_SERVER_PORT_DISCOVERY_OPTION); if (cmd.hasOption(USER_OPTION)) { @@ -319,7 +378,8 @@ public class LoaderOptions { for (String node : nodes) { - hosts.add(InetAddress.getByName(node.trim())); + HostAndPort hap = HostAndPort.fromString(node); + hosts.add(new InetSocketAddress(InetAddress.getByName(hap.getHost()), hap.getPortOrDefault(nativePort))); } } catch (UnknownHostException e) { @@ -340,7 +400,7 @@ public class LoaderOptions { for (String node : nodes) { - ignores.add(InetAddress.getByName(node.trim())); + ignores.add(InetAddressAndPort.getByNameOverrideDefaults(node.trim(), storagePort)); } } catch (UnknownHostException e) { @@ -554,6 +614,7 @@ public class LoaderOptions options.addOption("st", SSL_STORE_TYPE, "STORE-TYPE", "Client SSL: type of store"); options.addOption("ciphers", SSL_CIPHER_SUITES, "CIPHER-SUITES", "Client SSL: comma-separated list of encryption suites to use"); options.addOption("f", CONFIG_PATH, "path to config file", "cassandra.yaml file path for streaming throughput and client/server SSL."); + options.addOption("spd", ALLOW_SERVER_PORT_DISCOVERY_OPTION, "allow server port discovery", "Use ports published by server to decide how to connect. With SSL requires StartTLS to be used."); return options; }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/tools/NodeProbe.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java index 8acb3c1..d330ed4 100644 --- a/src/java/org/apache/cassandra/tools/NodeProbe.java +++ b/src/java/org/apache/cassandra/tools/NodeProbe.java @@ -450,39 +450,39 @@ public class NodeProbe implements AutoCloseable ssProxy.drain(); } - public Map<String, String> getTokenToEndpointMap() + public Map<String, String> getTokenToEndpointMap(boolean withPort) { - return ssProxy.getTokenToEndpointMap(); + return withPort ? ssProxy.getTokenToEndpointWithPortMap() : ssProxy.getTokenToEndpointMap(); } - public List<String> getLiveNodes() + public List<String> getLiveNodes(boolean withPort) { - return ssProxy.getLiveNodes(); + return withPort ? ssProxy.getLiveNodesWithPort() : ssProxy.getLiveNodes(); } - public List<String> getJoiningNodes() + public List<String> getJoiningNodes(boolean withPort) { - return ssProxy.getJoiningNodes(); + return withPort ? ssProxy.getJoiningNodesWithPort() : ssProxy.getJoiningNodes(); } - public List<String> getLeavingNodes() + public List<String> getLeavingNodes(boolean withPort) { - return ssProxy.getLeavingNodes(); + return withPort ? ssProxy.getLeavingNodesWithPort() : ssProxy.getLeavingNodes(); } - public List<String> getMovingNodes() + public List<String> getMovingNodes(boolean withPort) { - return ssProxy.getMovingNodes(); + return withPort ? ssProxy.getMovingNodesWithPort() : ssProxy.getMovingNodes(); } - public List<String> getUnreachableNodes() + public List<String> getUnreachableNodes(boolean withPort) { - return ssProxy.getUnreachableNodes(); + return withPort ? ssProxy.getUnreachableNodesWithPort() : ssProxy.getUnreachableNodes(); } - public Map<String, String> getLoadMap() + public Map<String, String> getLoadMap(boolean withPort) { - return ssProxy.getLoadMap(); + return withPort ? ssProxy.getLoadMapWithPort() : ssProxy.getLoadMap(); } public Map<InetAddress, Float> getOwnership() @@ -490,11 +490,21 @@ public class NodeProbe implements AutoCloseable return ssProxy.getOwnership(); } + public Map<String, Float> getOwnershipWithPort() + { + return ssProxy.getOwnershipWithPort(); + } + public Map<InetAddress, Float> effectiveOwnership(String keyspace) throws IllegalStateException { return ssProxy.effectiveOwnership(keyspace); } + public Map<String, Float> effectiveOwnershipWithPort(String keyspace) throws IllegalStateException + { + return ssProxy.effectiveOwnershipWithPort(keyspace); + } + public CacheServiceMBean getCacheServiceMBean() { String cachePath = "org.apache.cassandra.db:type=Caches"; @@ -557,9 +567,9 @@ public class NodeProbe implements AutoCloseable return ssProxy.getLocalHostId(); } - public Map<String, String> getHostIdMap() + public Map<String, String> getHostIdMap(boolean withPort) { - return ssProxy.getEndpointToHostId(); + return withPort ? ssProxy.getEndpointWithPortToHostId() : ssProxy.getEndpointToHostId(); } public String getLoadString() @@ -686,9 +696,9 @@ public class NodeProbe implements AutoCloseable ssProxy.removeNode(token); } - public String getRemovalStatus() + public String getRemovalStatus(boolean withPort) { - return ssProxy.getRemovalStatus(); + return withPort ? ssProxy.getRemovalStatusWithPort() : ssProxy.getRemovalStatus(); } public void forceRemoveCompletion() @@ -775,6 +785,11 @@ public class NodeProbe implements AutoCloseable ssProxy.setHintedHandoffThrottleInKB(throttleInKB); } + public List<String> getEndpointsWithPort(String keyspace, String cf, String key) + { + return ssProxy.getNaturalEndpointsWithPort(keyspace, cf, key); + } + public List<InetAddress> getEndpoints(String keyspace, String cf, String key) { return ssProxy.getNaturalEndpoints(keyspace, cf, key); @@ -1144,9 +1159,9 @@ public class NodeProbe implements AutoCloseable ssProxy.rebuildSecondaryIndex(ksName, cfName, idxNames); } - public String getGossipInfo() + public String getGossipInfo(boolean withPort) { - return fdProxy.getAllEndpointStates(); + return withPort ? fdProxy.getAllEndpointStatesWithPort() : fdProxy.getAllEndpointStates(); } public void stop(String string) @@ -1212,9 +1227,9 @@ public class NodeProbe implements AutoCloseable return ssProxy.getSchemaVersion(); } - public List<String> describeRing(String keyspaceName) throws IOException + public List<String> describeRing(String keyspaceName, boolean withPort) throws IOException { - return ssProxy.describeRingJMX(keyspaceName); + return withPort ? ssProxy.describeRingWithPortJMX(keyspaceName) : ssProxy.describeRingJMX(keyspaceName); } public void rebuild(String sourceDc, String keyspace, String tokens, String specificSources) @@ -1580,11 +1595,11 @@ public class NodeProbe implements AutoCloseable } } - public TabularData getFailureDetectorPhilValues() + public TabularData getFailureDetectorPhilValues(boolean withPort) { try { - return fdProxy.getPhiValues(); + return withPort ? fdProxy.getPhiValuesWithPort() : fdProxy.getPhiValues(); } catch (OpenDataException e) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/tools/NodeTool.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/NodeTool.java b/src/java/org/apache/cassandra/tools/NodeTool.java index 59d4ead..d707499 100644 --- a/src/java/org/apache/cassandra/tools/NodeTool.java +++ b/src/java/org/apache/cassandra/tools/NodeTool.java @@ -248,6 +248,9 @@ public class NodeTool @Option(type = OptionType.GLOBAL, name = {"-pwf", "--password-file"}, description = "Path to the JMX password file") private String passwordFilePath = EMPTY; + @Option(type = OptionType.GLOBAL, name = { "-wp", "--with-port"}, description = "Operate in 4.0 mode with hosts disambiguated by port number", arity = 0) + protected boolean withPort = false; + @Override public void run() { @@ -398,4 +401,27 @@ public class NodeTool } return ownershipByDc; } + + public static SortedMap<String, SetHostStatWithPort> getOwnershipByDcWithPort(NodeProbe probe, boolean resolveIp, + Map<String, String> tokenToEndpoint, + Map<String, Float> ownerships) + { + SortedMap<String, SetHostStatWithPort> ownershipByDc = Maps.newTreeMap(); + EndpointSnitchInfoMBean epSnitchInfo = probe.getEndpointSnitchInfoProxy(); + try + { + for (Entry<String, String> tokenAndEndPoint : tokenToEndpoint.entrySet()) + { + String dc = epSnitchInfo.getDatacenter(tokenAndEndPoint.getValue()); + if (!ownershipByDc.containsKey(dc)) + ownershipByDc.put(dc, new SetHostStatWithPort(resolveIp)); + ownershipByDc.get(dc).add(tokenAndEndPoint.getKey(), tokenAndEndPoint.getValue(), ownerships); + } + } + catch (UnknownHostException e) + { + throw new RuntimeException(e); + } + return ownershipByDc; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/tools/nodetool/DescribeCluster.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/nodetool/DescribeCluster.java b/src/java/org/apache/cassandra/tools/nodetool/DescribeCluster.java index 5228468..ed91b8b 100644 --- a/src/java/org/apache/cassandra/tools/nodetool/DescribeCluster.java +++ b/src/java/org/apache/cassandra/tools/nodetool/DescribeCluster.java @@ -49,10 +49,10 @@ public class DescribeCluster extends NodeToolCmd // display schema version for each node System.out.println("\tSchema versions:"); - Map<String, List<String>> schemaVersions = probe.getSpProxy().getSchemaVersions(); + Map<String, List<String>> schemaVersions = withPort ? probe.getSpProxy().getSchemaVersionsWithPort() : probe.getSpProxy().getSchemaVersions(); for (String version : schemaVersions.keySet()) { System.out.println(format("\t\t%s: %s%n", version, schemaVersions.get(version))); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/tools/nodetool/DescribeRing.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/nodetool/DescribeRing.java b/src/java/org/apache/cassandra/tools/nodetool/DescribeRing.java index 2a73c2a..c57e54d 100644 --- a/src/java/org/apache/cassandra/tools/nodetool/DescribeRing.java +++ b/src/java/org/apache/cassandra/tools/nodetool/DescribeRing.java @@ -39,7 +39,7 @@ public class DescribeRing extends NodeToolCmd System.out.println("TokenRange: "); try { - for (String tokenRangeString : probe.describeRing(keyspace)) + for (String tokenRangeString : probe.describeRing(keyspace, withPort)) { System.out.println("\t" + tokenRangeString); } @@ -48,4 +48,4 @@ public class DescribeRing extends NodeToolCmd throw new RuntimeException(e); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/tools/nodetool/FailureDetectorInfo.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/nodetool/FailureDetectorInfo.java b/src/java/org/apache/cassandra/tools/nodetool/FailureDetectorInfo.java index b3ffb6d..896663d 100644 --- a/src/java/org/apache/cassandra/tools/nodetool/FailureDetectorInfo.java +++ b/src/java/org/apache/cassandra/tools/nodetool/FailureDetectorInfo.java @@ -33,7 +33,7 @@ public class FailureDetectorInfo extends NodeToolCmd @Override public void execute(NodeProbe probe) { - TabularData data = probe.getFailureDetectorPhilValues(); + TabularData data = probe.getFailureDetectorPhilValues(withPort); System.out.printf("%10s,%16s%n", "Endpoint", "Phi"); for (Object o : data.keySet()) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/tools/nodetool/GetEndpoints.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/nodetool/GetEndpoints.java b/src/java/org/apache/cassandra/tools/nodetool/GetEndpoints.java index 922ae26..c0adb2a 100644 --- a/src/java/org/apache/cassandra/tools/nodetool/GetEndpoints.java +++ b/src/java/org/apache/cassandra/tools/nodetool/GetEndpoints.java @@ -42,10 +42,20 @@ public class GetEndpoints extends NodeToolCmd String table = args.get(1); String key = args.get(2); - List<InetAddress> endpoints = probe.getEndpoints(ks, table, key); - for (InetAddress endpoint : endpoints) + if (withPort) { - System.out.println(endpoint.getHostAddress()); + for (String endpoint : probe.getEndpointsWithPort(ks, table, key)) + { + System.out.println(endpoint); + } + } + else + { + List<InetAddress> endpoints = probe.getEndpoints(ks, table, key); + for (InetAddress endpoint : endpoints) + { + System.out.println(endpoint.getHostAddress()); + } } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/tools/nodetool/GossipInfo.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/nodetool/GossipInfo.java b/src/java/org/apache/cassandra/tools/nodetool/GossipInfo.java index 1b4b979..182c395 100644 --- a/src/java/org/apache/cassandra/tools/nodetool/GossipInfo.java +++ b/src/java/org/apache/cassandra/tools/nodetool/GossipInfo.java @@ -28,6 +28,6 @@ public class GossipInfo extends NodeToolCmd @Override public void execute(NodeProbe probe) { - System.out.println(probe.getGossipInfo()); + System.out.println(probe.getGossipInfo(withPort)); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/tools/nodetool/HostStatWithPort.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/nodetool/HostStatWithPort.java b/src/java/org/apache/cassandra/tools/nodetool/HostStatWithPort.java new file mode 100644 index 0000000..54cda17 --- /dev/null +++ b/src/java/org/apache/cassandra/tools/nodetool/HostStatWithPort.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.tools.nodetool; + +import org.apache.cassandra.locator.InetAddressAndPort; + +public class HostStatWithPort +{ + public final InetAddressAndPort endpoint; + public final boolean resolveIp; + public final Float owns; + public final String token; + + public HostStatWithPort(String token, InetAddressAndPort endpoint, boolean resolveIp, Float owns) + { + this.token = token; + this.endpoint = endpoint; + this.resolveIp = resolveIp; + this.owns = owns; + } + + public String ipOrDns() + { + return resolveIp ? + endpoint.address.getHostName() + ":" + endpoint.port : + endpoint.toString(); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/tools/nodetool/NetStats.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/nodetool/NetStats.java b/src/java/org/apache/cassandra/tools/nodetool/NetStats.java index 2312097..2702d9e 100644 --- a/src/java/org/apache/cassandra/tools/nodetool/NetStats.java +++ b/src/java/org/apache/cassandra/tools/nodetool/NetStats.java @@ -50,11 +50,11 @@ public class NetStats extends NodeToolCmd System.out.printf("%s %s%n", status.streamOperation.getDescription(), status.planId.toString()); for (SessionInfo info : status.sessions) { - System.out.printf(" %s", info.peer.toString()); + System.out.printf(" %s", info.peer.toString(withPort)); // print private IP when it is used if (!info.peer.equals(info.connecting)) { - System.out.printf(" (using %s)", info.connecting.toString()); + System.out.printf(" (using %s)", info.connecting.toString(withPort)); } System.out.printf("%n"); if (!info.receivingSummaries.isEmpty()) @@ -65,7 +65,7 @@ public class NetStats extends NodeToolCmd System.out.printf(" Receiving %d files, %d bytes total. Already received %d files, %d bytes total%n", info.getTotalFilesToReceive(), info.getTotalSizeToReceive(), info.getTotalFilesReceived(), info.getTotalSizeReceived()); for (ProgressInfo progress : info.getReceivingFiles()) { - System.out.printf(" %s%n", progress.toString()); + System.out.printf(" %s%n", progress.toString(withPort)); } } if (!info.sendingSummaries.isEmpty()) @@ -76,7 +76,7 @@ public class NetStats extends NodeToolCmd System.out.printf(" Sending %d files, %d bytes total. Already sent %d files, %d bytes total%n", info.getTotalFilesToSend(), info.getTotalSizeToSend(), info.getTotalFilesSent(), info.getTotalSizeSent()); for (ProgressInfo progress : info.getSendingFiles()) { - System.out.printf(" %s%n", progress.toString()); + System.out.printf(" %s%n", progress.toString(withPort)); } } } @@ -131,4 +131,4 @@ public class NetStats extends NodeToolCmd System.out.printf("%-25s%10s%10s%15s%10s%n", "Gossip messages", "n/a", pending, completed, dropped); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/tools/nodetool/RemoveNode.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/nodetool/RemoveNode.java b/src/java/org/apache/cassandra/tools/nodetool/RemoveNode.java index 7312597..bd40aba 100644 --- a/src/java/org/apache/cassandra/tools/nodetool/RemoveNode.java +++ b/src/java/org/apache/cassandra/tools/nodetool/RemoveNode.java @@ -36,10 +36,10 @@ public class RemoveNode extends NodeToolCmd switch (removeOperation) { case "status": - System.out.println("RemovalStatus: " + probe.getRemovalStatus()); + System.out.println("RemovalStatus: " + probe.getRemovalStatus(withPort)); break; case "force": - System.out.println("RemovalStatus: " + probe.getRemovalStatus()); + System.out.println("RemovalStatus: " + probe.getRemovalStatus(withPort)); probe.forceRemoveCompletion(); break; default: @@ -47,4 +47,4 @@ public class RemoveNode extends NodeToolCmd break; } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/tools/nodetool/RepairAdmin.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/nodetool/RepairAdmin.java b/src/java/org/apache/cassandra/tools/nodetool/RepairAdmin.java index 0b4f767..ba3cf62 100644 --- a/src/java/org/apache/cassandra/tools/nodetool/RepairAdmin.java +++ b/src/java/org/apache/cassandra/tools/nodetool/RepairAdmin.java @@ -57,7 +57,8 @@ public class RepairAdmin extends NodeTool.NodeToolCmd "state", "last activity", "coordinator", - "participants"); + "participants", + "participants_wp"); private List<String> sessionValues(Map<String, String> session, int now) @@ -67,7 +68,8 @@ public class RepairAdmin extends NodeTool.NodeToolCmd session.get(LocalSessionInfo.STATE), Integer.toString(now - updated) + " (s)", session.get(LocalSessionInfo.COORDINATOR), - session.get(LocalSessionInfo.PARTICIPANTS)); + session.get(LocalSessionInfo.PARTICIPANTS), + session.get(LocalSessionInfo.PARTICIPANTS_WP)); } private void listSessions(ActiveRepairServiceMBean repairServiceProxy) http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/tools/nodetool/Ring.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/nodetool/Ring.java b/src/java/org/apache/cassandra/tools/nodetool/Ring.java index 9c389c2..105726c 100644 --- a/src/java/org/apache/cassandra/tools/nodetool/Ring.java +++ b/src/java/org/apache/cassandra/tools/nodetool/Ring.java @@ -51,59 +51,103 @@ public class Ring extends NodeToolCmd @Override public void execute(NodeProbe probe) { - Map<String, String> tokensToEndpoints = probe.getTokenToEndpointMap(); - LinkedHashMultimap<String, String> endpointsToTokens = LinkedHashMultimap.create(); - boolean haveVnodes = false; - for (Map.Entry<String, String> entry : tokensToEndpoints.entrySet()) - { - haveVnodes |= endpointsToTokens.containsKey(entry.getValue()); - endpointsToTokens.put(entry.getValue(), entry.getKey()); - } - - int maxAddressLength = Collections.max(endpointsToTokens.keys(), new Comparator<String>() + try { - @Override - public int compare(String first, String second) + Map<String, String> tokensToEndpoints = probe.getTokenToEndpointMap(withPort); + LinkedHashMultimap<String, String> endpointsToTokens = LinkedHashMultimap.create(); + boolean haveVnodes = false; + for (Map.Entry<String, String> entry : tokensToEndpoints.entrySet()) { - return Integer.compare(first.length(), second.length()); + haveVnodes |= endpointsToTokens.containsKey(entry.getValue()); + endpointsToTokens.put(entry.getValue(), entry.getKey()); } - }).length(); - String formatPlaceholder = "%%-%ds %%-12s%%-7s%%-8s%%-16s%%-20s%%-44s%%n"; - String format = format(formatPlaceholder, maxAddressLength); + int maxAddressLength = Collections.max(endpointsToTokens.keys(), new Comparator<String>() + { + @Override + public int compare(String first, String second) + { + return Integer.compare(first.length(), second.length()); + } + }).length(); - StringBuilder errors = new StringBuilder(); - boolean showEffectiveOwnership = true; - // Calculate per-token ownership of the ring - Map<InetAddress, Float> ownerships; - try - { - ownerships = probe.effectiveOwnership(keyspace); - } - catch (IllegalStateException ex) - { - ownerships = probe.getOwnership(); - errors.append("Note: ").append(ex.getMessage()).append("%n"); - showEffectiveOwnership = false; - } - catch (IllegalArgumentException ex) - { - System.out.printf("%nError: %s%n", ex.getMessage()); - return; - } + String formatPlaceholder = "%%-%ds %%-12s%%-7s%%-8s%%-16s%%-20s%%-44s%%n"; + String format = format(formatPlaceholder, maxAddressLength); + StringBuilder errors = new StringBuilder(); + boolean showEffectiveOwnership = true; - System.out.println(); - for (Entry<String, SetHostStat> entry : NodeTool.getOwnershipByDc(probe, resolveIp, tokensToEndpoints, ownerships).entrySet()) - printDc(probe, format, entry.getKey(), endpointsToTokens, entry.getValue(),showEffectiveOwnership); + if (withPort) + { + // Calculate per-token ownership of the ring + Map<String, Float> ownerships; + try + { + ownerships = probe.effectiveOwnershipWithPort(keyspace); + } + catch (IllegalStateException ex) + { + ownerships = probe.getOwnershipWithPort(); + errors.append("Note: ").append(ex.getMessage()).append("%n"); + showEffectiveOwnership = false; + } + catch (IllegalArgumentException ex) + { + System.out.printf("%nError: %s%n", ex.getMessage()); + return; + } + + + System.out.println(); + for (Entry<String, SetHostStatWithPort> entry : NodeTool.getOwnershipByDcWithPort(probe, resolveIp, tokensToEndpoints, ownerships).entrySet()) + printDc(probe, format, entry.getKey(), endpointsToTokens, entry.getValue(), showEffectiveOwnership); + + if (haveVnodes) + { + System.out.println(" Warning: \"nodetool ring\" is used to output all the tokens of a node."); + System.out.println(" To view status related info of a node use \"nodetool status\" instead.\n"); + } + + System.out.printf("%n " + errors.toString()); + } + else + { + // Calculate per-token ownership of the ring + Map<InetAddress, Float> ownerships; + try + { + ownerships = probe.effectiveOwnership(keyspace); + } + catch (IllegalStateException ex) + { + ownerships = probe.getOwnership(); + errors.append("Note: ").append(ex.getMessage()).append("%n"); + showEffectiveOwnership = false; + } + catch (IllegalArgumentException ex) + { + System.out.printf("%nError: %s%n", ex.getMessage()); + return; + } - if (haveVnodes) + + System.out.println(); + for (Entry<String, SetHostStat> entry : NodeTool.getOwnershipByDc(probe, resolveIp, tokensToEndpoints, ownerships).entrySet()) + printDc(probe, format, entry.getKey(), endpointsToTokens, entry.getValue(), showEffectiveOwnership); + + if (haveVnodes) + { + System.out.println(" Warning: \"nodetool ring\" is used to output all the tokens of a node."); + System.out.println(" To view status related info of a node use \"nodetool status\" instead.\n"); + } + + System.out.printf("%n " + errors.toString()); + } + } catch (Exception e) { - System.out.println(" Warning: \"nodetool ring\" is used to output all the tokens of a node."); - System.out.println(" To view status related info of a node use \"nodetool status\" instead.\n"); + e.printStackTrace(); + throw e; } - - System.out.printf("%n " + errors.toString()); } private void printDc(NodeProbe probe, String format, @@ -111,12 +155,12 @@ public class Ring extends NodeToolCmd LinkedHashMultimap<String, String> endpointsToTokens, SetHostStat hoststats,boolean showEffectiveOwnership) { - Collection<String> liveNodes = probe.getLiveNodes(); - Collection<String> deadNodes = probe.getUnreachableNodes(); - Collection<String> joiningNodes = probe.getJoiningNodes(); - Collection<String> leavingNodes = probe.getLeavingNodes(); - Collection<String> movingNodes = probe.getMovingNodes(); - Map<String, String> loadMap = probe.getLoadMap(); + Collection<String> liveNodes = probe.getLiveNodes(false); + Collection<String> deadNodes = probe.getUnreachableNodes(false); + Collection<String> joiningNodes = probe.getJoiningNodes(false); + Collection<String> leavingNodes = probe.getLeavingNodes(false); + Collection<String> movingNodes = probe.getMovingNodes(false); + Map<String, String> loadMap = probe.getLoadMap(false); System.out.println("Datacenter: " + dc); System.out.println("=========="); @@ -174,4 +218,73 @@ public class Ring extends NodeToolCmd } System.out.println(); } + + private void printDc(NodeProbe probe, String format, + String dc, + LinkedHashMultimap<String, String> endpointsToTokens, + SetHostStatWithPort hoststats,boolean showEffectiveOwnership) + { + Collection<String> liveNodes = probe.getLiveNodes(true); + Collection<String> deadNodes = probe.getUnreachableNodes(true); + Collection<String> joiningNodes = probe.getJoiningNodes(true); + Collection<String> leavingNodes = probe.getLeavingNodes(true); + Collection<String> movingNodes = probe.getMovingNodes(true); + Map<String, String> loadMap = probe.getLoadMap(true); + + System.out.println("Datacenter: " + dc); + System.out.println("=========="); + + // get the total amount of replicas for this dc and the last token in this dc's ring + List<String> tokens = new ArrayList<>(); + String lastToken = ""; + + for (HostStatWithPort stat : hoststats) + { + tokens.addAll(endpointsToTokens.get(stat.endpoint.toString())); + lastToken = tokens.get(tokens.size() - 1); + } + + System.out.printf(format, "Address", "Rack", "Status", "State", "Load", "Owns", "Token"); + + if (hoststats.size() > 1) + System.out.printf(format, "", "", "", "", "", "", lastToken); + else + System.out.println(); + + for (HostStatWithPort stat : hoststats) + { + String endpoint = stat.endpoint.toString(); + String rack; + try + { + rack = probe.getEndpointSnitchInfoProxy().getRack(endpoint); + } + catch (UnknownHostException e) + { + rack = "Unknown"; + } + + String status = liveNodes.contains(endpoint) + ? "Up" + : deadNodes.contains(endpoint) + ? "Down" + : "?"; + + String state = "Normal"; + + if (joiningNodes.contains(endpoint)) + state = "Joining"; + else if (leavingNodes.contains(endpoint)) + state = "Leaving"; + else if (movingNodes.contains(endpoint)) + state = "Moving"; + + String load = loadMap.containsKey(endpoint) + ? loadMap.get(endpoint) + : "?"; + String owns = stat.owns != null && showEffectiveOwnership? new DecimalFormat("##0.00%").format(stat.owns) : "?"; + System.out.printf(format, stat.ipOrDns(), rack, status, state, load, owns, stat.token); + } + System.out.println(); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/tools/nodetool/SetHostStatWithPort.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/nodetool/SetHostStatWithPort.java b/src/java/org/apache/cassandra/tools/nodetool/SetHostStatWithPort.java new file mode 100644 index 0000000..67cd464 --- /dev/null +++ b/src/java/org/apache/cassandra/tools/nodetool/SetHostStatWithPort.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.tools.nodetool; + +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import org.apache.cassandra.locator.InetAddressAndPort; + +public class SetHostStatWithPort implements Iterable<HostStatWithPort> +{ + final List<HostStatWithPort> hostStats = new ArrayList<>(); + final boolean resolveIp; + + public SetHostStatWithPort(boolean resolveIp) + { + this.resolveIp = resolveIp; + } + + public int size() + { + return hostStats.size(); + } + + @Override + public Iterator<HostStatWithPort> iterator() + { + return hostStats.iterator(); + } + + public void add(String token, String host, Map<String, Float> ownerships) throws UnknownHostException + { + InetAddressAndPort endpoint = InetAddressAndPort.getByName(host); + Float owns = ownerships.get(endpoint); + hostStats.add(new HostStatWithPort(token, endpoint, resolveIp, owns)); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/tools/nodetool/Status.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/nodetool/Status.java b/src/java/org/apache/cassandra/tools/nodetool/Status.java index f3ef606..49724a5 100644 --- a/src/java/org/apache/cassandra/tools/nodetool/Status.java +++ b/src/java/org/apache/cassandra/tools/nodetool/Status.java @@ -30,6 +30,7 @@ import java.util.Map; import java.util.SortedMap; import org.apache.cassandra.locator.EndpointSnitchInfoMBean; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.tools.NodeProbe; import org.apache.cassandra.tools.NodeTool; import org.apache.cassandra.tools.NodeTool.NodeToolCmd; @@ -55,72 +56,130 @@ public class Status extends NodeToolCmd @Override public void execute(NodeProbe probe) { - joiningNodes = probe.getJoiningNodes(); - leavingNodes = probe.getLeavingNodes(); - movingNodes = probe.getMovingNodes(); - loadMap = probe.getLoadMap(); - Map<String, String> tokensToEndpoints = probe.getTokenToEndpointMap(); - liveNodes = probe.getLiveNodes(); - unreachableNodes = probe.getUnreachableNodes(); - hostIDMap = probe.getHostIdMap(); + joiningNodes = probe.getJoiningNodes(withPort); + leavingNodes = probe.getLeavingNodes(withPort); + movingNodes = probe.getMovingNodes(withPort); + loadMap = probe.getLoadMap(withPort); + Map<String, String> tokensToEndpoints = probe.getTokenToEndpointMap(withPort); + liveNodes = probe.getLiveNodes(withPort); + unreachableNodes = probe.getUnreachableNodes(withPort); + hostIDMap = probe.getHostIdMap(withPort); epSnitchInfo = probe.getEndpointSnitchInfoProxy(); StringBuilder errors = new StringBuilder(); - Map<InetAddress, Float> ownerships = null; - boolean hasEffectiveOwns = false; - try - { - ownerships = probe.effectiveOwnership(keyspace); - hasEffectiveOwns = true; - } - catch (IllegalStateException e) - { - ownerships = probe.getOwnership(); - errors.append("Note: ").append(e.getMessage()).append("%n"); - } - catch (IllegalArgumentException ex) + if (withPort) { - System.out.printf("%nError: %s%n", ex.getMessage()); - System.exit(1); - } + Map<String, Float> ownerships = null; + boolean hasEffectiveOwns = false; + try + { + ownerships = probe.effectiveOwnershipWithPort(keyspace); + hasEffectiveOwns = true; + } + catch (IllegalStateException e) + { + ownerships = probe.getOwnershipWithPort(); + errors.append("Note: ").append(e.getMessage()).append("%n"); + } + catch (IllegalArgumentException ex) + { + System.out.printf("%nError: %s%n", ex.getMessage()); + System.exit(1); + } - SortedMap<String, SetHostStat> dcs = NodeTool.getOwnershipByDc(probe, resolveIp, tokensToEndpoints, ownerships); + SortedMap<String, SetHostStatWithPort> dcs = NodeTool.getOwnershipByDcWithPort(probe, resolveIp, tokensToEndpoints, ownerships); - // More tokens than nodes (aka vnodes)? - if (dcs.values().size() < tokensToEndpoints.keySet().size()) - isTokenPerNode = false; + // More tokens than nodes (aka vnodes)? + if (dcs.values().size() < tokensToEndpoints.keySet().size()) + isTokenPerNode = false; - findMaxAddressLength(dcs); + findMaxAddressLengthWithPort(dcs); - // Datacenters - for (Map.Entry<String, SetHostStat> dc : dcs.entrySet()) - { - String dcHeader = String.format("Datacenter: %s%n", dc.getKey()); - System.out.printf(dcHeader); - for (int i = 0; i < (dcHeader.length() - 1); i++) System.out.print('='); - System.out.println(); + // Datacenters + for (Map.Entry<String, SetHostStatWithPort> dc : dcs.entrySet()) + { + String dcHeader = String.format("Datacenter: %s%n", dc.getKey()); + System.out.printf(dcHeader); + for (int i = 0; i < (dcHeader.length() - 1); i++) System.out.print('='); + System.out.println(); - // Legend - System.out.println("Status=Up/Down"); - System.out.println("|/ State=Normal/Leaving/Joining/Moving"); + // Legend + System.out.println("Status=Up/Down"); + System.out.println("|/ State=Normal/Leaving/Joining/Moving"); - printNodesHeader(hasEffectiveOwns, isTokenPerNode); + printNodesHeader(hasEffectiveOwns, isTokenPerNode); - ArrayListMultimap<InetAddress, HostStat> hostToTokens = ArrayListMultimap.create(); - for (HostStat stat : dc.getValue()) - hostToTokens.put(stat.endpoint, stat); + ArrayListMultimap<InetAddressAndPort, HostStatWithPort> hostToTokens = ArrayListMultimap.create(); + for (HostStatWithPort stat : dc.getValue()) + hostToTokens.put(stat.endpoint, stat); - for (InetAddress endpoint : hostToTokens.keySet()) - { - Float owns = ownerships.get(endpoint); - List<HostStat> tokens = hostToTokens.get(endpoint); - printNode(endpoint.getHostAddress(), owns, tokens, hasEffectiveOwns, isTokenPerNode); + for (InetAddressAndPort endpoint : hostToTokens.keySet()) + { + Float owns = ownerships.get(endpoint); + List<HostStatWithPort> tokens = hostToTokens.get(endpoint); + printNodeWithPort(endpoint.toString(), owns, tokens, hasEffectiveOwns, isTokenPerNode); + } } + + System.out.printf("%n" + errors.toString()); } + else + { + Map<InetAddress, Float> ownerships = null; + boolean hasEffectiveOwns = false; + try + { + ownerships = probe.effectiveOwnership(keyspace); + hasEffectiveOwns = true; + } + catch (IllegalStateException e) + { + ownerships = probe.getOwnership(); + errors.append("Note: ").append(e.getMessage()).append("%n"); + } + catch (IllegalArgumentException ex) + { + System.out.printf("%nError: %s%n", ex.getMessage()); + System.exit(1); + } + + SortedMap<String, SetHostStat> dcs = NodeTool.getOwnershipByDc(probe, resolveIp, tokensToEndpoints, ownerships); - System.out.printf("%n" + errors.toString()); + // More tokens than nodes (aka vnodes)? + if (dcs.values().size() < tokensToEndpoints.keySet().size()) + isTokenPerNode = false; + findMaxAddressLength(dcs); + + // Datacenters + for (Map.Entry<String, SetHostStat> dc : dcs.entrySet()) + { + String dcHeader = String.format("Datacenter: %s%n", dc.getKey()); + System.out.printf(dcHeader); + for (int i = 0; i < (dcHeader.length() - 1); i++) System.out.print('='); + System.out.println(); + + // Legend + System.out.println("Status=Up/Down"); + System.out.println("|/ State=Normal/Leaving/Joining/Moving"); + + printNodesHeader(hasEffectiveOwns, isTokenPerNode); + + ArrayListMultimap<InetAddress, HostStat> hostToTokens = ArrayListMultimap.create(); + for (HostStat stat : dc.getValue()) + hostToTokens.put(stat.endpoint, stat); + + for (InetAddress endpoint : hostToTokens.keySet()) + { + Float owns = ownerships.get(endpoint); + List<HostStat> tokens = hostToTokens.get(endpoint); + printNode(endpoint.getHostAddress(), owns, tokens, hasEffectiveOwns, isTokenPerNode); + } + } + + System.out.printf("%n" + errors.toString()); + } } private void findMaxAddressLength(Map<String, SetHostStat> dcs) @@ -135,6 +194,18 @@ public class Status extends NodeToolCmd } } + private void findMaxAddressLengthWithPort(Map<String, SetHostStatWithPort> dcs) + { + maxAddressLength = 0; + for (Map.Entry<String, SetHostStatWithPort> dc : dcs.entrySet()) + { + for (HostStatWithPort stat : dc.getValue()) + { + maxAddressLength = Math.max(maxAddressLength, stat.ipOrDns().length()); + } + } + } + private void printNodesHeader(boolean hasEffectiveOwns, boolean isTokenPerNode) { String fmt = getFormat(hasEffectiveOwns, isTokenPerNode); @@ -177,6 +248,37 @@ public class Status extends NodeToolCmd System.out.printf(fmt, status, state, endpointDns, load, tokens.size(), strOwns, hostID, rack); } + private void printNodeWithPort(String endpoint, Float owns, List<HostStatWithPort> tokens, boolean hasEffectiveOwns, boolean isTokenPerNode) + { + String status, state, load, strOwns, hostID, rack, fmt; + fmt = getFormat(hasEffectiveOwns, isTokenPerNode); + if (liveNodes.contains(endpoint)) status = "U"; + else if (unreachableNodes.contains(endpoint)) status = "D"; + else status = "?"; + if (joiningNodes.contains(endpoint)) state = "J"; + else if (leavingNodes.contains(endpoint)) state = "L"; + else if (movingNodes.contains(endpoint)) state = "M"; + else state = "N"; + + load = loadMap.containsKey(endpoint) ? loadMap.get(endpoint) : "?"; + strOwns = owns != null && hasEffectiveOwns ? new DecimalFormat("##0.0%").format(owns) : "?"; + hostID = hostIDMap.get(endpoint); + + try + { + rack = epSnitchInfo.getRack(endpoint); + } catch (UnknownHostException e) + { + throw new RuntimeException(e); + } + + String endpointDns = tokens.get(0).ipOrDns(); + if (isTokenPerNode) + System.out.printf(fmt, status, state, endpointDns, load, strOwns, hostID, tokens.get(0).token, rack); + else + System.out.printf(fmt, status, state, endpointDns, load, tokens.size(), strOwns, hostID, rack); + } + private String getFormat( boolean hasEffectiveOwns, boolean isTokenPerNode) http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java b/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java index 9230d38..bf95080 100644 --- a/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java +++ b/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java @@ -29,7 +29,7 @@ class ExpiredTraceState extends TraceState ExpiredTraceState(TraceState delegate) { - super(FBUtilities.getBroadcastAddress(), delegate.sessionId, delegate.traceType); + super(FBUtilities.getBroadcastAddressAndPort(), delegate.sessionId, delegate.traceType); this.delegate = delegate; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/tracing/TraceKeyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tracing/TraceKeyspace.java b/src/java/org/apache/cassandra/tracing/TraceKeyspace.java index 20c992c..487ed65 100644 --- a/src/java/org/apache/cassandra/tracing/TraceKeyspace.java +++ b/src/java/org/apache/cassandra/tracing/TraceKeyspace.java @@ -48,29 +48,31 @@ public final class TraceKeyspace private static final TableMetadata Sessions = parse(SESSIONS, - "tracing sessions", - "CREATE TABLE %s (" - + "session_id uuid," - + "command text," - + "client inet," - + "coordinator inet," - + "duration int," - + "parameters map<text, text>," - + "request text," - + "started_at timestamp," - + "PRIMARY KEY ((session_id)))"); + "tracing sessions", + "CREATE TABLE %s (" + + "session_id uuid," + + "command text," + + "client inet," + + "coordinator inet," + + "coordinator_port int," + + "duration int," + + "parameters map<text, text>," + + "request text," + + "started_at timestamp," + + "PRIMARY KEY ((session_id)))"); private static final TableMetadata Events = parse(EVENTS, - "tracing events", - "CREATE TABLE %s (" - + "session_id uuid," - + "event_id timeuuid," - + "activity text," - + "source inet," - + "source_elapsed int," - + "thread text," - + "PRIMARY KEY ((session_id), event_id))"); + "tracing events", + "CREATE TABLE %s (" + + "session_id uuid," + + "event_id timeuuid," + + "activity text," + + "source inet," + + "source_port int," + + "source_elapsed int," + + "thread text," + + "PRIMARY KEY ((session_id), event_id))"); private static TableMetadata parse(String table, String description, String cql) { @@ -100,7 +102,8 @@ public final class TraceKeyspace builder.row() .ttl(ttl) .add("client", client) - .add("coordinator", FBUtilities.getBroadcastAddress()) + .add("coordinator", FBUtilities.getBroadcastAddressAndPort().address) + .add("coordinator_port", FBUtilities.getBroadcastAddressAndPort().port) .add("request", request) .add("started_at", new Date(startedAt)) .add("command", command) @@ -125,7 +128,8 @@ public final class TraceKeyspace .ttl(ttl); rowBuilder.add("activity", message) - .add("source", FBUtilities.getBroadcastAddress()) + .add("source", FBUtilities.getBroadcastAddressAndPort().address) + .add("source_port", FBUtilities.getBroadcastAddressAndPort().port) .add("thread", threadName); if (elapsed >= 0) http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/tracing/TraceState.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tracing/TraceState.java b/src/java/org/apache/cassandra/tracing/TraceState.java index b4eff6b..a53846c 100644 --- a/src/java/org/apache/cassandra/tracing/TraceState.java +++ b/src/java/org/apache/cassandra/tracing/TraceState.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.tracing; -import java.net.InetAddress; import java.nio.ByteBuffer; import java.util.List; import java.util.UUID; @@ -28,6 +27,7 @@ import java.util.concurrent.atomic.AtomicInteger; import com.google.common.base.Stopwatch; import org.slf4j.helpers.MessageFormatter; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.progress.ProgressEvent; import org.apache.cassandra.utils.progress.ProgressEventNotifier; @@ -40,7 +40,7 @@ import org.apache.cassandra.utils.progress.ProgressListener; public abstract class TraceState implements ProgressEventNotifier { public final UUID sessionId; - public final InetAddress coordinator; + public final InetAddressAndPort coordinator; public final Stopwatch watch; public final ByteBuffer sessionIdBytes; public final Tracing.TraceType traceType; @@ -63,7 +63,7 @@ public abstract class TraceState implements ProgressEventNotifier // See CASSANDRA-7626 for more details. private final AtomicInteger references = new AtomicInteger(1); - protected TraceState(InetAddress coordinator, UUID sessionId, Tracing.TraceType traceType) + protected TraceState(InetAddressAndPort coordinator, UUID sessionId, Tracing.TraceType traceType) { assert coordinator != null; assert sessionId != null; http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/tracing/TraceStateImpl.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tracing/TraceStateImpl.java b/src/java/org/apache/cassandra/tracing/TraceStateImpl.java index 349000a..2722406 100644 --- a/src/java/org/apache/cassandra/tracing/TraceStateImpl.java +++ b/src/java/org/apache/cassandra/tracing/TraceStateImpl.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.tracing; -import java.net.InetAddress; import java.util.Collections; import java.util.Set; import java.util.UUID; @@ -36,6 +35,7 @@ import org.apache.cassandra.concurrent.StageManager; import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.db.Mutation; import org.apache.cassandra.exceptions.OverloadedException; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.service.StorageProxy; import org.apache.cassandra.utils.JVMStabilityInspector; import org.apache.cassandra.utils.WrappedRunnable; @@ -54,7 +54,7 @@ public class TraceStateImpl extends TraceState private final Set<Future<?>> pendingFutures = ConcurrentHashMap.newKeySet(); - public TraceStateImpl(InetAddress coordinator, UUID sessionId, Tracing.TraceType traceType) + public TraceStateImpl(InetAddressAndPort coordinator, UUID sessionId, Tracing.TraceType traceType) { super(coordinator, sessionId, traceType); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/tracing/Tracing.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tracing/Tracing.java b/src/java/org/apache/cassandra/tracing/Tracing.java index 4cdddba..55e36a4 100644 --- a/src/java/org/apache/cassandra/tracing/Tracing.java +++ b/src/java/org/apache/cassandra/tracing/Tracing.java @@ -19,15 +19,17 @@ */ package org.apache.cassandra.tracing; +import java.io.IOException; import java.net.InetAddress; import java.nio.ByteBuffer; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableList; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,8 +37,13 @@ import io.netty.util.concurrent.FastThreadLocal; import org.apache.cassandra.concurrent.ExecutorLocal; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.marshal.TimeUUIDType; +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.net.ParameterType; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.JVMStabilityInspector; import org.apache.cassandra.utils.UUIDGen; @@ -48,8 +55,23 @@ import org.apache.cassandra.utils.UUIDGen; */ public abstract class Tracing implements ExecutorLocal<TraceState> { - public static final String TRACE_HEADER = "TraceSession"; - public static final String TRACE_TYPE = "TraceType"; + public static final IVersionedSerializer<TraceType> traceTypeSerializer = new IVersionedSerializer<TraceType>() + { + public void serialize(TraceType traceType, DataOutputPlus out, int version) throws IOException + { + out.write((byte)traceType.ordinal()); + } + + public TraceType deserialize(DataInputPlus in, int version) throws IOException + { + return TraceType.deserialize(in.readByte()); + } + + public long serializedSize(TraceType traceType, int version) + { + return 1; + } + }; public enum TraceType { @@ -83,7 +105,7 @@ public abstract class Tracing implements ExecutorLocal<TraceState> protected static final Logger logger = LoggerFactory.getLogger(Tracing.class); - private final InetAddress localAddress = FBUtilities.getLocalAddress(); + private final InetAddressAndPort localAddress = FBUtilities.getLocalAddressAndPort(); private final FastThreadLocal<TraceState> state = new FastThreadLocal<>(); @@ -228,21 +250,19 @@ public abstract class Tracing implements ExecutorLocal<TraceState> */ public TraceState initializeFromMessage(final MessageIn<?> message) { - final byte[] sessionBytes = message.parameters.get(TRACE_HEADER); + final UUID sessionId = (UUID)message.parameters.get(ParameterType.TRACE_SESSION); - if (sessionBytes == null) + if (sessionId == null) return null; - assert sessionBytes.length == 16; - UUID sessionId = UUIDGen.getUUID(ByteBuffer.wrap(sessionBytes)); TraceState ts = get(sessionId); if (ts != null && ts.acquireReference()) return ts; - byte[] tmpBytes; + TraceType tmpType; TraceType traceType = TraceType.QUERY; - if ((tmpBytes = message.parameters.get(TRACE_TYPE)) != null) - traceType = TraceType.deserialize(tmpBytes[0]); + if ((tmpType = (TraceType)message.parameters.get(ParameterType.TRACE_TYPE)) != null) + traceType = tmpType; if (message.verb == MessagingService.Verb.REQUEST_RESPONSE) { @@ -257,16 +277,16 @@ public abstract class Tracing implements ExecutorLocal<TraceState> } } - public Map<String, byte[]> getTraceHeaders() + public List<Object> getTraceHeaders() { assert isTracing(); - return ImmutableMap.of( - TRACE_HEADER, UUIDGen.decompose(Tracing.instance.getSessionId()), - TRACE_TYPE, new byte[] { Tracing.TraceType.serialize(Tracing.instance.getTraceType()) }); + return ImmutableList.of( + ParameterType.TRACE_SESSION, Tracing.instance.getSessionId(), + ParameterType.TRACE_TYPE, Tracing.instance.getTraceType()); } - protected abstract TraceState newTraceState(InetAddress coordinator, UUID sessionId, Tracing.TraceType traceType); + protected abstract TraceState newTraceState(InetAddressAndPort coordinator, UUID sessionId, Tracing.TraceType traceType); // repair just gets a varargs method since it's so heavyweight anyway public static void traceRepair(String format, Object... args) http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/tracing/TracingImpl.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tracing/TracingImpl.java b/src/java/org/apache/cassandra/tracing/TracingImpl.java index 789216e..1e32f10 100644 --- a/src/java/org/apache/cassandra/tracing/TracingImpl.java +++ b/src/java/org/apache/cassandra/tracing/TracingImpl.java @@ -26,6 +26,7 @@ import java.util.UUID; import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.concurrent.StageManager; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.utils.WrappedRunnable; @@ -93,7 +94,7 @@ class TracingImpl extends Tracing } @Override - protected TraceState newTraceState(InetAddress coordinator, UUID sessionId, TraceType traceType) + protected TraceState newTraceState(InetAddressAndPort coordinator, UUID sessionId, TraceType traceType) { return new TraceStateImpl(coordinator, sessionId, traceType); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/transport/Event.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/Event.java b/src/java/org/apache/cassandra/transport/Event.java index ed77e59..77edf8a 100644 --- a/src/java/org/apache/cassandra/transport/Event.java +++ b/src/java/org/apache/cassandra/transport/Event.java @@ -24,6 +24,7 @@ import java.util.List; import com.google.common.base.Objects; import io.netty.buffer.ByteBuf; +import org.apache.cassandra.locator.InetAddressAndPort; public abstract class Event { @@ -110,19 +111,19 @@ public abstract class Event this.change = change; } - public static TopologyChange newNode(InetAddress host, int port) + public static TopologyChange newNode(InetAddressAndPort address) { - return new TopologyChange(Change.NEW_NODE, new InetSocketAddress(host, port)); + return new TopologyChange(Change.NEW_NODE, new InetSocketAddress(address.address, address.port)); } - public static TopologyChange removedNode(InetAddress host, int port) + public static TopologyChange removedNode(InetAddressAndPort address) { - return new TopologyChange(Change.REMOVED_NODE, new InetSocketAddress(host, port)); + return new TopologyChange(Change.REMOVED_NODE, new InetSocketAddress(address.address, address.port)); } - public static TopologyChange movedNode(InetAddress host, int port) + public static TopologyChange movedNode(InetAddressAndPort address) { - return new TopologyChange(Change.MOVED_NODE, new InetSocketAddress(host, port)); + return new TopologyChange(Change.MOVED_NODE, new InetSocketAddress(address.address, address.port)); } // Assumes the type has already been deserialized @@ -181,14 +182,14 @@ public abstract class Event this.status = status; } - public static StatusChange nodeUp(InetAddress host, int port) + public static StatusChange nodeUp(InetAddressAndPort address) { - return new StatusChange(Status.UP, new InetSocketAddress(host, port)); + return new StatusChange(Status.UP, new InetSocketAddress(address.address, address.port)); } - public static StatusChange nodeDown(InetAddress host, int port) + public static StatusChange nodeDown(InetAddressAndPort address) { - return new StatusChange(Status.DOWN, new InetSocketAddress(host, port)); + return new StatusChange(Status.DOWN, new InetSocketAddress(address.address, address.port)); } // Assumes the type has already been deserialized http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/transport/ProtocolVersion.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/ProtocolVersion.java b/src/java/org/apache/cassandra/transport/ProtocolVersion.java index cd73c86..838176a 100644 --- a/src/java/org/apache/cassandra/transport/ProtocolVersion.java +++ b/src/java/org/apache/cassandra/transport/ProtocolVersion.java @@ -43,7 +43,8 @@ public enum ProtocolVersion implements Comparable<ProtocolVersion> V2(2, "v2", false), // no longer supported V3(3, "v3", false), V4(4, "v4", false), - V5(5, "v5-beta", true); + V5(5, "v5", false), + V6(6, "v6-beta", true); /** The version number */ private final int num; @@ -62,7 +63,7 @@ public enum ProtocolVersion implements Comparable<ProtocolVersion> } /** The supported versions stored as an array, these should be private and are required for fast decoding*/ - private final static ProtocolVersion[] SUPPORTED_VERSIONS = new ProtocolVersion[] { V3, V4, V5 }; + private final static ProtocolVersion[] SUPPORTED_VERSIONS = new ProtocolVersion[] { V3, V4, V5, V6 }; final static ProtocolVersion MIN_SUPPORTED_VERSION = SUPPORTED_VERSIONS[0]; final static ProtocolVersion MAX_SUPPORTED_VERSION = SUPPORTED_VERSIONS[SUPPORTED_VERSIONS.length - 1]; @@ -73,8 +74,8 @@ public enum ProtocolVersion implements Comparable<ProtocolVersion> public final static EnumSet<ProtocolVersion> UNSUPPORTED = EnumSet.complementOf(SUPPORTED); /** The preferred versions */ - public final static ProtocolVersion CURRENT = V4; - public final static Optional<ProtocolVersion> BETA = Optional.of(V5); + public final static ProtocolVersion CURRENT = V5; + public final static Optional<ProtocolVersion> BETA = Optional.of(V6); public static List<String> supportedVersions() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/transport/Server.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java index d3f1c2c..cd04edc 100644 --- a/src/java/org/apache/cassandra/transport/Server.java +++ b/src/java/org/apache/cassandra/transport/Server.java @@ -51,6 +51,7 @@ import io.netty.util.internal.logging.Slf4JLoggerFactory; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.EncryptionOptions; import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.schema.Schema; import org.apache.cassandra.schema.SchemaChangeListener; import org.apache.cassandra.security.SSLFactory; @@ -454,51 +455,32 @@ public class Server implements CassandraDaemon.Server // We keep track of the latest status change events we have sent to avoid sending duplicates // since StorageService may send duplicate notifications (CASSANDRA-7816, CASSANDRA-8236, CASSANDRA-9156) - private final Map<InetAddress, LatestEvent> latestEvents = new ConcurrentHashMap<>(); + private final Map<InetAddressAndPort, LatestEvent> latestEvents = new ConcurrentHashMap<>(); // We also want to delay delivering a NEW_NODE notification until the new node has set its RPC ready // state. This tracks the endpoints which have joined, but not yet signalled they're ready for clients - private final Set<InetAddress> endpointsPendingJoinedNotification = ConcurrentHashMap.newKeySet(); - - - private static final InetAddress bindAll; - static - { - try - { - bindAll = InetAddress.getByAddress(new byte[4]); - } - catch (UnknownHostException e) - { - throw new AssertionError(e); - } - } + private final Set<InetAddressAndPort> endpointsPendingJoinedNotification = ConcurrentHashMap.newKeySet(); private EventNotifier(Server server) { this.server = server; } - private InetAddress getRpcAddress(InetAddress endpoint) + private InetAddressAndPort getNativeAddress(InetAddressAndPort endpoint) { try { - InetAddress rpcAddress = InetAddress.getByName(StorageService.instance.getRpcaddress(endpoint)); - // If rpcAddress == 0.0.0.0 (i.e. bound on all addresses), returning that is not very helpful, - // so return the internal address (which is ok since "we're bound on all addresses"). - // Note that after all nodes are running a version that includes CASSANDRA-5899, rpcAddress should - // never be 0.0.0.0, so this can eventually be removed. - return rpcAddress.equals(bindAll) ? endpoint : rpcAddress; + return InetAddressAndPort.getByName(StorageService.instance.getNativeaddress(endpoint, true)); } catch (UnknownHostException e) { // That should not happen, so log an error, but return the // endpoint address since there's a good change this is right logger.error("Problem retrieving RPC address for {}", endpoint, e); - return endpoint; + return InetAddressAndPort.getByAddressOverrideDefaults(endpoint.address, DatabaseDescriptor.getNativeTransportPort()); } } - private void send(InetAddress endpoint, Event.NodeEvent event) + private void send(InetAddressAndPort endpoint, Event.NodeEvent event) { if (logger.isTraceEnabled()) logger.trace("Sending event for endpoint {}, rpc address {}", endpoint, event.nodeAddress()); @@ -508,8 +490,8 @@ public class Server implements CassandraDaemon.Server // then don't send the notification. This covers the case of rpc_address set to "localhost", // which is not useful to any driver and in fact may cauase serious problems to some drivers, // see CASSANDRA-10052 - if (!endpoint.equals(FBUtilities.getBroadcastAddress()) && - event.nodeAddress().equals(FBUtilities.getBroadcastRpcAddress())) + if (!endpoint.equals(FBUtilities.getBroadcastAddressAndPort()) && + event.nodeAddress().equals(FBUtilities.getJustBroadcastNativeAddress())) return; send(event); @@ -520,38 +502,38 @@ public class Server implements CassandraDaemon.Server server.connectionTracker.send(event); } - public void onJoinCluster(InetAddress endpoint) + public void onJoinCluster(InetAddressAndPort endpoint) { if (!StorageService.instance.isRpcReady(endpoint)) endpointsPendingJoinedNotification.add(endpoint); else - onTopologyChange(endpoint, Event.TopologyChange.newNode(getRpcAddress(endpoint), server.socket.getPort())); + onTopologyChange(endpoint, Event.TopologyChange.newNode(getNativeAddress(endpoint))); } - public void onLeaveCluster(InetAddress endpoint) + public void onLeaveCluster(InetAddressAndPort endpoint) { - onTopologyChange(endpoint, Event.TopologyChange.removedNode(getRpcAddress(endpoint), server.socket.getPort())); + onTopologyChange(endpoint, Event.TopologyChange.removedNode(getNativeAddress(endpoint))); } - public void onMove(InetAddress endpoint) + public void onMove(InetAddressAndPort endpoint) { - onTopologyChange(endpoint, Event.TopologyChange.movedNode(getRpcAddress(endpoint), server.socket.getPort())); + onTopologyChange(endpoint, Event.TopologyChange.movedNode(getNativeAddress(endpoint))); } - public void onUp(InetAddress endpoint) + public void onUp(InetAddressAndPort endpoint) { if (endpointsPendingJoinedNotification.remove(endpoint)) onJoinCluster(endpoint); - onStatusChange(endpoint, Event.StatusChange.nodeUp(getRpcAddress(endpoint), server.socket.getPort())); + onStatusChange(endpoint, Event.StatusChange.nodeUp(getNativeAddress(endpoint))); } - public void onDown(InetAddress endpoint) + public void onDown(InetAddressAndPort endpoint) { - onStatusChange(endpoint, Event.StatusChange.nodeDown(getRpcAddress(endpoint), server.socket.getPort())); + onStatusChange(endpoint, Event.StatusChange.nodeDown(getNativeAddress(endpoint))); } - private void onTopologyChange(InetAddress endpoint, Event.TopologyChange event) + private void onTopologyChange(InetAddressAndPort endpoint, Event.TopologyChange event) { if (logger.isTraceEnabled()) logger.trace("Topology changed event : {}, {}", endpoint, event.change); @@ -565,7 +547,7 @@ public class Server implements CassandraDaemon.Server } } - private void onStatusChange(InetAddress endpoint, Event.StatusChange event) + private void onStatusChange(InetAddressAndPort endpoint, Event.StatusChange event) { if (logger.isTraceEnabled()) logger.trace("Status changed event : {}, {}", endpoint, event.status); http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java b/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java index 9163d56..84af41c 100644 --- a/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java @@ -33,6 +33,7 @@ import org.apache.cassandra.cql3.functions.FunctionName; import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.db.WriteType; import org.apache.cassandra.exceptions.*; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.transport.*; import org.apache.cassandra.utils.MD5Digest; @@ -88,14 +89,14 @@ public class ErrorMessage extends Message.Response // The number of failures is also present in protocol v5, but used instead to specify the size of the failure map int failure = body.readInt(); - Map<InetAddress, RequestFailureReason> failureReasonByEndpoint = new ConcurrentHashMap<>(); + Map<InetAddressAndPort, RequestFailureReason> failureReasonByEndpoint = new ConcurrentHashMap<>(); if (version.isGreaterOrEqualTo(ProtocolVersion.V5)) { for (int i = 0; i < failure; i++) { InetAddress endpoint = CBUtil.readInetAddr(body); RequestFailureReason failureReason = RequestFailureReason.fromCode(body.readUnsignedShort()); - failureReasonByEndpoint.put(endpoint, failureReason); + failureReasonByEndpoint.put(InetAddressAndPort.getByAddress(endpoint), failureReason); } } @@ -195,9 +196,9 @@ public class ErrorMessage extends Message.Response if (version.isGreaterOrEqualTo(ProtocolVersion.V5)) { - for (Map.Entry<InetAddress, RequestFailureReason> entry : rfe.failureReasonByEndpoint.entrySet()) + for (Map.Entry<InetAddressAndPort, RequestFailureReason> entry : rfe.failureReasonByEndpoint.entrySet()) { - CBUtil.writeInetAddr(entry.getKey(), dest); + CBUtil.writeInetAddr(entry.getKey().address, dest); dest.writeShort(entry.getValue().code); } } @@ -260,9 +261,9 @@ public class ErrorMessage extends Message.Response if (version.isGreaterOrEqualTo(ProtocolVersion.V5)) { - for (Map.Entry<InetAddress, RequestFailureReason> entry : rfe.failureReasonByEndpoint.entrySet()) + for (Map.Entry<InetAddressAndPort, RequestFailureReason> entry : rfe.failureReasonByEndpoint.entrySet()) { - size += CBUtil.sizeOfInetAddr(entry.getKey()); + size += CBUtil.sizeOfInetAddr(entry.getKey().address); size += 2; // RequestFailureReason code } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/utils/FBUtilities.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java index 1cb59d4..3ca8b89 100644 --- a/src/java/org/apache/cassandra/utils/FBUtilities.java +++ b/src/java/org/apache/cassandra/utils/FBUtilities.java @@ -58,6 +58,7 @@ import org.apache.cassandra.io.sstable.metadata.ValidationMetadata; import org.apache.cassandra.io.util.DataOutputBuffer; import org.apache.cassandra.io.util.DataOutputBufferFixed; import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.AsyncOneResponse; import org.codehaus.jackson.JsonFactory; @@ -78,7 +79,10 @@ public class FBUtilities private static volatile InetAddress localInetAddress; private static volatile InetAddress broadcastInetAddress; - private static volatile InetAddress broadcastRpcAddress; + private static volatile InetAddress broadcastNativeAddress; + private static volatile InetAddressAndPort broadcastNativeAddressAndPort; + private static volatile InetAddressAndPort broadcastInetAddressAndPort; + private static volatile InetAddressAndPort localInetAddressAndPort; public static int getAvailableProcessors() { @@ -92,9 +96,9 @@ public class FBUtilities public static final int MAX_UNSIGNED_SHORT = 0xFFFF; /** - * Please use getBroadcastAddress instead. You need this only when you have to listen/connect. + * Please use getJustBroadcastAddress instead. You need this only when you have to listen/connect. */ - public static InetAddress getLocalAddress() + public static InetAddress getJustLocalAddress() { if (localInetAddress == null) try @@ -110,30 +114,57 @@ public class FBUtilities return localInetAddress; } - public static InetAddress getBroadcastAddress() + public static InetAddressAndPort getLocalAddressAndPort() + { + if (localInetAddressAndPort == null) + { + localInetAddressAndPort = InetAddressAndPort.getByAddress(getJustLocalAddress()); + } + return localInetAddressAndPort; + } + + public static InetAddress getJustBroadcastAddress() { if (broadcastInetAddress == null) broadcastInetAddress = DatabaseDescriptor.getBroadcastAddress() == null - ? getLocalAddress() + ? getJustLocalAddress() : DatabaseDescriptor.getBroadcastAddress(); return broadcastInetAddress; } + public static InetAddressAndPort getBroadcastAddressAndPort() + { + if (broadcastInetAddressAndPort == null) + { + broadcastInetAddressAndPort = InetAddressAndPort.getByAddress(getJustBroadcastAddress()); + } + return broadcastInetAddressAndPort; + } + /** * <b>THIS IS FOR TESTING ONLY!!</b> */ public static void setBroadcastInetAddress(InetAddress addr) { broadcastInetAddress = addr; + broadcastInetAddressAndPort = InetAddressAndPort.getByAddress(broadcastInetAddress); } - public static InetAddress getBroadcastRpcAddress() + public static InetAddress getJustBroadcastNativeAddress() { - if (broadcastRpcAddress == null) - broadcastRpcAddress = DatabaseDescriptor.getBroadcastRpcAddress() == null + if (broadcastNativeAddress == null) + broadcastNativeAddress = DatabaseDescriptor.getBroadcastRpcAddress() == null ? DatabaseDescriptor.getRpcAddress() : DatabaseDescriptor.getBroadcastRpcAddress(); - return broadcastRpcAddress; + return broadcastNativeAddress; + } + + public static InetAddressAndPort getBroadcastNativeAddressAndPort() + { + if (broadcastNativeAddressAndPort == null) + broadcastNativeAddressAndPort = InetAddressAndPort.getByAddressOverrideDefaults(getJustBroadcastNativeAddress(), + DatabaseDescriptor.getNativeTransportPort()); + return broadcastNativeAddressAndPort; } public static Collection<InetAddress> getAllLocalAddresses() @@ -838,7 +869,9 @@ public class FBUtilities public static void reset() { localInetAddress = null; + localInetAddressAndPort = null; broadcastInetAddress = null; - broadcastRpcAddress = null; + broadcastInetAddressAndPort = null; + broadcastNativeAddress = null; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/utils/JMXServerUtils.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/JMXServerUtils.java b/src/java/org/apache/cassandra/utils/JMXServerUtils.java index e78ed01..bb5c3ac 100644 --- a/src/java/org/apache/cassandra/utils/JMXServerUtils.java +++ b/src/java/org/apache/cassandra/utils/JMXServerUtils.java @@ -236,7 +236,7 @@ public class JMXServerUtils String hostName; if (serverAddress == null) { - hostName = FBUtilities.getBroadcastAddress() instanceof Inet6Address ? "[::]" : "0.0.0.0"; + hostName = FBUtilities.getJustBroadcastAddress() instanceof Inet6Address ? "[::]" : "0.0.0.0"; } else { http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/utils/Mx4jTool.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/Mx4jTool.java b/src/java/org/apache/cassandra/utils/Mx4jTool.java index cd42aca..5baaea2 100644 --- a/src/java/org/apache/cassandra/utils/Mx4jTool.java +++ b/src/java/org/apache/cassandra/utils/Mx4jTool.java @@ -79,7 +79,7 @@ public class Mx4jTool { String sAddress = System.getProperty("mx4jaddress"); if (StringUtils.isEmpty(sAddress)) - sAddress = FBUtilities.getBroadcastAddress().getHostAddress(); + sAddress = FBUtilities.getBroadcastAddressAndPort().address.getHostAddress(); return sAddress; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org