http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 89d5358..cda575a 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -158,23 +158,23 @@ public class StorageService extends NotificationBroadcasterSupport implements IE public Collection<Range<Token>> getLocalRanges(String keyspaceName) { - return getRangesForEndpoint(keyspaceName, FBUtilities.getBroadcastAddress()); + return getRangesForEndpoint(keyspaceName, FBUtilities.getBroadcastAddressAndPort()); } public Collection<Range<Token>> getPrimaryRanges(String keyspace) { - return getPrimaryRangesForEndpoint(keyspace, FBUtilities.getBroadcastAddress()); + return getPrimaryRangesForEndpoint(keyspace, FBUtilities.getBroadcastAddressAndPort()); } public Collection<Range<Token>> getPrimaryRangesWithinDC(String keyspace) { - return getPrimaryRangeForEndpointWithinDC(keyspace, FBUtilities.getBroadcastAddress()); + return getPrimaryRangeForEndpointWithinDC(keyspace, FBUtilities.getBroadcastAddressAndPort()); } - private final Set<InetAddress> replicatingNodes = Collections.synchronizedSet(new HashSet<InetAddress>()); + private final Set<InetAddressAndPort> replicatingNodes = Collections.synchronizedSet(new HashSet<InetAddressAndPort>()); private CassandraDaemon daemon; - private InetAddress removingNode; + private InetAddressAndPort removingNode; /* Are we starting this node in bootstrap mode? 
*/ private volatile boolean isBootstrapMode; @@ -225,7 +225,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE SystemKeyspace.updateTokens(tokens); Collection<Token> localTokens = getLocalTokens(); setGossipTokens(localTokens); - tokenMetadata.updateNormalTokens(tokens, FBUtilities.getBroadcastAddress()); + tokenMetadata.updateNormalTokens(tokens, FBUtilities.getBroadcastAddressAndPort()); setMode(Mode.NORMAL, false); } @@ -233,6 +233,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE { List<Pair<ApplicationState, VersionedValue>> states = new ArrayList<Pair<ApplicationState, VersionedValue>>(); states.add(Pair.create(ApplicationState.TOKENS, valueFactory.tokens(tokens))); + states.add(Pair.create(ApplicationState.STATUS_WITH_PORT, valueFactory.normal(tokens))); states.add(Pair.create(ApplicationState.STATUS, valueFactory.normal(tokens))); Gossiper.instance.addLocalApplicationStates(states); } @@ -407,7 +408,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE * they get the Gossip shutdown message, so even if * we don't get time to broadcast this, it is not a problem. 
* - * See {@link Gossiper#markAsShutdown(InetAddress)} + * See {@link Gossiper#markAsShutdown(InetAddressAndPort)} */ private void shutdownClientServers() { @@ -463,9 +464,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE "To perform this operation, please restart with " + "-Dcassandra.allow_unsafe_replace=true"); - InetAddress replaceAddress = DatabaseDescriptor.getReplaceAddress(); + InetAddressAndPort replaceAddress = DatabaseDescriptor.getReplaceAddress(); logger.info("Gathering node replacement information for {}", replaceAddress); - Map<InetAddress, EndpointState> epStates = Gossiper.instance.doShadowRound(); + Map<InetAddressAndPort, EndpointState> epStates = Gossiper.instance.doShadowRound(); // as we've completed the shadow round of gossip, we should be able to find the node we're replacing if (epStates.get(replaceAddress) == null) throw new RuntimeException(String.format("Cannot replace_address %s because it doesn't exist in gossip", replaceAddress)); @@ -503,25 +504,32 @@ public class StorageService extends NotificationBroadcasterSupport implements IE } logger.debug("Starting shadow gossip round to check for endpoint collision"); - Map<InetAddress, EndpointState> epStates = Gossiper.instance.doShadowRound(); + Map<InetAddressAndPort, EndpointState> epStates = Gossiper.instance.doShadowRound(); // If bootstrapping, check whether any previously known status for the endpoint makes it unsafe to do so. // If not bootstrapping, compare the host id for this endpoint learned from gossip (if any) with the local // one, which was either read from system.local or generated at startup. 
If a learned id is present & // doesn't match the local, then the node needs replacing - if (!Gossiper.instance.isSafeForStartup(FBUtilities.getBroadcastAddress(), localHostId, shouldBootstrap(), epStates)) + if (!Gossiper.instance.isSafeForStartup(FBUtilities.getBroadcastAddressAndPort(), localHostId, shouldBootstrap(), epStates)) { throw new RuntimeException(String.format("A node with address %s already exists, cancelling join. " + "Use cassandra.replace_address if you want to replace this node.", - FBUtilities.getBroadcastAddress())); + FBUtilities.getBroadcastAddressAndPort())); } if (shouldBootstrap() && useStrictConsistency && !allowSimultaneousMoves()) { - for (Map.Entry<InetAddress, EndpointState> entry : epStates.entrySet()) + for (Map.Entry<InetAddressAndPort, EndpointState> entry : epStates.entrySet()) { // ignore local node or empty status - if (entry.getKey().equals(FBUtilities.getBroadcastAddress()) || entry.getValue().getApplicationState(ApplicationState.STATUS) == null) + if (entry.getKey().equals(FBUtilities.getBroadcastAddressAndPort()) || (entry.getValue().getApplicationState(ApplicationState.STATUS_WITH_PORT) == null & entry.getValue().getApplicationState(ApplicationState.STATUS) == null)) continue; + + VersionedValue value = entry.getValue().getApplicationState(ApplicationState.STATUS_WITH_PORT); + if (value == null) + { + value = entry.getValue().getApplicationState(ApplicationState.STATUS); + } + String[] pieces = splitValue(entry.getValue().getApplicationState(ApplicationState.STATUS)); assert (pieces.length > 0); String state = pieces[0]; @@ -553,10 +561,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE if (Boolean.parseBoolean(System.getProperty("cassandra.load_ring_state", "true"))) { logger.info("Populating token metadata from system tables"); - Multimap<InetAddress, Token> loadedTokens = SystemKeyspace.loadTokens(); + Multimap<InetAddressAndPort, Token> loadedTokens = SystemKeyspace.loadTokens(); if 
(!shouldBootstrap()) // if we have not completed bootstrapping, we should not add ourselves as a normal token - loadedTokens.putAll(FBUtilities.getBroadcastAddress(), SystemKeyspace.getSavedTokens()); - for (InetAddress ep : loadedTokens.keySet()) + loadedTokens.putAll(FBUtilities.getBroadcastAddressAndPort(), SystemKeyspace.getSavedTokens()); + for (InetAddressAndPort ep : loadedTokens.keySet()) tokenMetadata.updateNormalTokens(loadedTokens.get(ep), ep); logger.info("Token metadata: {}", tokenMetadata); @@ -640,10 +648,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE Collection<Token> tokens = SystemKeyspace.getSavedTokens(); if (!tokens.isEmpty()) { - tokenMetadata.updateNormalTokens(tokens, FBUtilities.getBroadcastAddress()); + tokenMetadata.updateNormalTokens(tokens, FBUtilities.getBroadcastAddressAndPort()); // order is important here, the gossiper can fire in between adding these two states. It's ok to send TOKENS without STATUS, but *not* vice versa. 
List<Pair<ApplicationState, VersionedValue>> states = new ArrayList<Pair<ApplicationState, VersionedValue>>(); states.add(Pair.create(ApplicationState.TOKENS, valueFactory.tokens(tokens))); + states.add(Pair.create(ApplicationState.STATUS_WITH_PORT, valueFactory.hibernate(true))); states.add(Pair.create(ApplicationState.STATUS, valueFactory.hibernate(true))); Gossiper.instance.addLocalApplicationStates(states); } @@ -659,11 +668,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE if (Boolean.parseBoolean(System.getProperty("cassandra.load_ring_state", "true"))) { logger.info("Loading persisted ring state"); - Multimap<InetAddress, Token> loadedTokens = SystemKeyspace.loadTokens(); - Map<InetAddress, UUID> loadedHostIds = SystemKeyspace.loadHostIds(); - for (InetAddress ep : loadedTokens.keySet()) + Multimap<InetAddressAndPort, Token> loadedTokens = SystemKeyspace.loadTokens(); + Map<InetAddressAndPort, UUID> loadedHostIds = SystemKeyspace.loadHostIds(); + for (InetAddressAndPort ep : loadedTokens.keySet()) { - if (ep.equals(FBUtilities.getBroadcastAddress())) + if (ep.equals(FBUtilities.getBroadcastAddressAndPort())) { // entry has been mistakenly added, delete it SystemKeyspace.removeEndpoint(ep); @@ -707,7 +716,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE public static boolean isSeed() { - return DatabaseDescriptor.getSeeds().contains(FBUtilities.getBroadcastAddress()); + return DatabaseDescriptor.getSeeds().contains(FBUtilities.getBroadcastAddressAndPort()); } private void prepareToJoin() throws ConfigurationException @@ -753,6 +762,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE "the node to be replaced ({}). 
If the previous node has been down for longer than max_hint_window_in_ms, " + "repair must be run after the replacement process in order to make this node consistent.", DatabaseDescriptor.getReplaceAddress()); + appStates.put(ApplicationState.STATUS_WITH_PORT, valueFactory.hibernate(true)); appStates.put(ApplicationState.STATUS, valueFactory.hibernate(true)); } } @@ -765,10 +775,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE // for bootstrap to get the load info it needs. // (we won't be part of the storage ring though until we add a counterId to our state, below.) // Seed the host ID-to-endpoint map with our own ID. - getTokenMetadata().updateHostId(localHostId, FBUtilities.getBroadcastAddress()); + getTokenMetadata().updateHostId(localHostId, FBUtilities.getBroadcastAddressAndPort()); appStates.put(ApplicationState.NET_VERSION, valueFactory.networkVersion()); appStates.put(ApplicationState.HOST_ID, valueFactory.hostId(localHostId)); - appStates.put(ApplicationState.RPC_ADDRESS, valueFactory.rpcaddress(FBUtilities.getBroadcastRpcAddress())); + appStates.put(ApplicationState.NATIVE_ADDRESS_AND_PORT, valueFactory.nativeaddressAndPort(FBUtilities.getBroadcastNativeAddressAndPort())); + appStates.put(ApplicationState.RPC_ADDRESS, valueFactory.rpcaddress(FBUtilities.getJustBroadcastNativeAddress())); appStates.put(ApplicationState.RELEASE_VERSION, valueFactory.releaseVersion()); // load the persisted ring state. This used to be done earlier in the init process, @@ -826,16 +837,16 @@ public class StorageService extends NotificationBroadcasterSupport implements IE // // We attempted to replace this with a schema-presence check, but you need a meaningful sleep // to get schema info from gossip which defeats the purpose. See CASSANDRA-4427 for the gory details. 
- Set<InetAddress> current = new HashSet<>(); + Set<InetAddressAndPort> current = new HashSet<>(); if (logger.isDebugEnabled()) { logger.debug("Bootstrap variables: {} {} {} {}", DatabaseDescriptor.isAutoBootstrap(), SystemKeyspace.bootstrapInProgress(), SystemKeyspace.bootstrapComplete(), - DatabaseDescriptor.getSeeds().contains(FBUtilities.getBroadcastAddress())); + DatabaseDescriptor.getSeeds().contains(FBUtilities.getBroadcastAddressAndPort())); } - if (DatabaseDescriptor.isAutoBootstrap() && !SystemKeyspace.bootstrapComplete() && DatabaseDescriptor.getSeeds().contains(FBUtilities.getBroadcastAddress())) + if (DatabaseDescriptor.isAutoBootstrap() && !SystemKeyspace.bootstrapComplete() && DatabaseDescriptor.getSeeds().contains(FBUtilities.getBroadcastAddressAndPort())) { logger.info("This node will not auto bootstrap because it is configured to be a seed node."); } @@ -873,13 +884,13 @@ public class StorageService extends NotificationBroadcasterSupport implements IE // get bootstrap tokens if (!replacing) { - if (tokenMetadata.isMember(FBUtilities.getBroadcastAddress())) + if (tokenMetadata.isMember(FBUtilities.getBroadcastAddressAndPort())) { String s = "This node is already a member of the token ring; bootstrap aborted. (If replacing a dead node, remove the old one from the ring first.)"; throw new UnsupportedOperationException(s); } setMode(Mode.JOINING, "getting bootstrap token", true); - bootstrapTokens = BootStrapper.getBootstrapTokens(tokenMetadata, FBUtilities.getBroadcastAddress(), delay); + bootstrapTokens = BootStrapper.getBootstrapTokens(tokenMetadata, FBUtilities.getBroadcastAddressAndPort(), delay); } else { @@ -899,7 +910,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE // check for operator errors... 
for (Token token : bootstrapTokens) { - InetAddress existing = tokenMetadata.getEndpoint(token); + InetAddressAndPort existing = tokenMetadata.getEndpoint(token); if (existing != null) { long nanoDelay = delay * 1000000L; @@ -935,7 +946,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE bootstrapTokens = SystemKeyspace.getSavedTokens(); if (bootstrapTokens.isEmpty()) { - bootstrapTokens = BootStrapper.getBootstrapTokens(tokenMetadata, FBUtilities.getBroadcastAddress(), delay); + bootstrapTokens = BootStrapper.getBootstrapTokens(tokenMetadata, FBUtilities.getBroadcastAddressAndPort(), delay); } else { @@ -958,7 +969,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE // remove the existing info about the replaced node. if (!current.isEmpty()) { - for (InetAddress existing : current) + for (InetAddressAndPort existing : current) Gossiper.instance.replacedEndpoint(existing); } } @@ -975,15 +986,15 @@ public class StorageService extends NotificationBroadcasterSupport implements IE public static boolean isReplacingSameAddress() { - InetAddress replaceAddress = DatabaseDescriptor.getReplaceAddress(); - return replaceAddress != null && replaceAddress.equals(FBUtilities.getBroadcastAddress()); + InetAddressAndPort replaceAddress = DatabaseDescriptor.getReplaceAddress(); + return replaceAddress != null && replaceAddress.equals(FBUtilities.getBroadcastAddressAndPort()); } public void gossipSnitchInfo() { IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch(); - String dc = snitch.getDatacenter(FBUtilities.getBroadcastAddress()); - String rack = snitch.getRack(FBUtilities.getBroadcastAddress()); + String dc = snitch.getDatacenter(FBUtilities.getBroadcastAddressAndPort()); + String rack = snitch.getRack(FBUtilities.getBroadcastAddressAndPort()); Gossiper.instance.addLocalApplicationState(ApplicationState.DC, StorageService.instance.valueFactory.datacenter(dc)); 
Gossiper.instance.addLocalApplicationState(ApplicationState.RACK, StorageService.instance.valueFactory.rack(rack)); } @@ -1111,7 +1122,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE public boolean isJoined() { - return tokenMetadata.isMember(FBUtilities.getBroadcastAddress()) && !isSurveyMode; + return tokenMetadata.isMember(FBUtilities.getBroadcastAddressAndPort()) && !isSurveyMode; } public void rebuild(String sourceDc) @@ -1141,7 +1152,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE { RangeStreamer streamer = new RangeStreamer(tokenMetadata, null, - FBUtilities.getBroadcastAddress(), + FBUtilities.getBroadcastAddressAndPort(), StreamOperation.REBUILD, useStrictConsistency && !replacing, DatabaseDescriptor.getEndpointSnitch(), @@ -1202,13 +1213,13 @@ public class StorageService extends NotificationBroadcasterSupport implements IE if (specificSources != null) { String[] stringHosts = specificSources.split(","); - Set<InetAddress> sources = new HashSet<>(stringHosts.length); + Set<InetAddressAndPort> sources = new HashSet<>(stringHosts.length); for (String stringHost : stringHosts) { try { - InetAddress endpoint = InetAddress.getByName(stringHost); - if (FBUtilities.getBroadcastAddress().equals(endpoint)) + InetAddressAndPort endpoint = InetAddressAndPort.getByName(stringHost); + if (FBUtilities.getBroadcastAddressAndPort().equals(endpoint)) { throw new IllegalArgumentException("This host was specified as a source for rebuilding. Sources for a rebuild can only be other nodes in the cluster."); } @@ -1449,8 +1460,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE // if not an existing token then bootstrap List<Pair<ApplicationState, VersionedValue>> states = new ArrayList<>(); states.add(Pair.create(ApplicationState.TOKENS, valueFactory.tokens(tokens))); + states.add(Pair.create(ApplicationState.STATUS_WITH_PORT, replacing? 
+ valueFactory.bootReplacingWithPort(DatabaseDescriptor.getReplaceAddress()) : + valueFactory.bootstrapping(tokens))); states.add(Pair.create(ApplicationState.STATUS, replacing? - valueFactory.bootReplacing(DatabaseDescriptor.getReplaceAddress()) : + valueFactory.bootReplacing(DatabaseDescriptor.getReplaceAddress().address) : valueFactory.bootstrapping(tokens))); Gossiper.instance.addLocalApplicationStates(states); setMode(Mode.JOINING, "sleeping " + RING_DELAY + " ms for pending range setup", true); @@ -1459,7 +1473,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE else { // Dont set any state for the node which is bootstrapping the existing token... - tokenMetadata.updateNormalTokens(tokens, FBUtilities.getBroadcastAddress()); + tokenMetadata.updateNormalTokens(tokens, FBUtilities.getBroadcastAddressAndPort()); SystemKeyspace.removeEndpoint(DatabaseDescriptor.getReplaceAddress()); } if (!Gossiper.instance.seenAnySeed()) @@ -1475,7 +1489,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE invalidateDiskBoundaries(); setMode(Mode.JOINING, "Starting to bootstrap...", true); - BootStrapper bootstrapper = new BootStrapper(FBUtilities.getBroadcastAddress(), tokens, tokenMetadata); + BootStrapper bootstrapper = new BootStrapper(FBUtilities.getBroadcastAddressAndPort(), tokens, tokenMetadata); bootstrapper.addProgressListener(progressSupport); ListenableFuture<StreamState> bootstrapStream = bootstrapper.bootstrap(streamStateStore, useStrictConsistency && !replacing); // handles token update Futures.addCallback(bootstrapStream, new FutureCallback<StreamState>() @@ -1547,7 +1561,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE // get bootstrap tokens saved in system keyspace final Collection<Token> tokens = SystemKeyspace.getSavedTokens(); // already bootstrapped ranges are filtered during bootstrap - BootStrapper bootstrapper = new 
BootStrapper(FBUtilities.getBroadcastAddress(), tokens, tokenMetadata); + BootStrapper bootstrapper = new BootStrapper(FBUtilities.getBroadcastAddressAndPort(), tokens, tokenMetadata); bootstrapper.addProgressListener(progressSupport); ListenableFuture<StreamState> bootstrapStream = bootstrapper.bootstrap(streamStateStore, useStrictConsistency && !replacing); // handles token update Futures.addCallback(bootstrapStream, new FutureCallback<StreamState>() @@ -1608,35 +1622,67 @@ public class StorageService extends NotificationBroadcasterSupport implements IE return tokenMetadata; } + public Map<List<String>, List<String>> getRangeToEndpointMap(String keyspace) + { + return getRangeToEndpointMap(keyspace, false); + } + + public Map<List<String>, List<String>> getRangeToEndpointWithPortMap(String keyspace) + { + return getRangeToEndpointMap(keyspace, true); + } + /** * for a keyspace, return the ranges and corresponding listen addresses. * @param keyspace * @return the endpoint map */ - public Map<List<String>, List<String>> getRangeToEndpointMap(String keyspace) + public Map<List<String>, List<String>> getRangeToEndpointMap(String keyspace, boolean withPort) { /* All the ranges for the tokens */ Map<List<String>, List<String>> map = new HashMap<>(); - for (Map.Entry<Range<Token>,List<InetAddress>> entry : getRangeToAddressMap(keyspace).entrySet()) + for (Map.Entry<Range<Token>,List<InetAddressAndPort>> entry : getRangeToAddressMap(keyspace).entrySet()) { - map.put(entry.getKey().asList(), stringify(entry.getValue())); + map.put(entry.getKey().asList(), stringify(entry.getValue(), withPort)); } return map; } /** - * Return the rpc address associated with an endpoint as a string. + * Return the native address associated with an endpoint as a string. 
* @param endpoint The endpoint to get rpc address for - * @return the rpc address + * @return the native address */ - public String getRpcaddress(InetAddress endpoint) + public String getNativeaddress(InetAddressAndPort endpoint, boolean withPort) { - if (endpoint.equals(FBUtilities.getBroadcastAddress())) - return FBUtilities.getBroadcastRpcAddress().getHostAddress(); + if (endpoint.equals(FBUtilities.getBroadcastAddressAndPort())) + return FBUtilities.getBroadcastNativeAddressAndPort().toString(withPort); + else if (Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.NATIVE_ADDRESS_AND_PORT) != null) + { + try + { + InetAddressAndPort address = InetAddressAndPort.getByName(Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.RPC_ADDRESS).value); + return address.getHostAddress(withPort); + } + catch (UnknownHostException e) + { + throw new RuntimeException(e); + } + } else if (Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.RPC_ADDRESS) == null) - return endpoint.getHostAddress(); + return endpoint.address.getHostAddress() + ":" + DatabaseDescriptor.getNativeTransportPort(); else - return Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.RPC_ADDRESS).value; + return Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.RPC_ADDRESS).value + ":" + DatabaseDescriptor.getNativeTransportPort(); + } + + public Map<List<String>, List<String>> getRangeToRpcaddressMap(String keyspace) + { + return getRangeToNativeaddressMap(keyspace, false); + } + + public Map<List<String>, List<String>> getRangeToNativeaddressWithPortMap(String keyspace) + { + return getRangeToNativeaddressMap(keyspace, true); } /** @@ -1644,16 +1690,16 @@ public class StorageService extends NotificationBroadcasterSupport implements IE * @param keyspace * @return the endpoint map */ - public 
Map<List<String>, List<String>> getRangeToRpcaddressMap(String keyspace) + private Map<List<String>, List<String>> getRangeToNativeaddressMap(String keyspace, boolean withPort) { /* All the ranges for the tokens */ Map<List<String>, List<String>> map = new HashMap<>(); - for (Map.Entry<Range<Token>, List<InetAddress>> entry : getRangeToAddressMap(keyspace).entrySet()) + for (Map.Entry<Range<Token>, List<InetAddressAndPort>> entry : getRangeToAddressMap(keyspace).entrySet()) { List<String> rpcaddrs = new ArrayList<>(entry.getValue().size()); - for (InetAddress endpoint: entry.getValue()) + for (InetAddressAndPort endpoint: entry.getValue()) { - rpcaddrs.add(getRpcaddress(endpoint)); + rpcaddrs.add(getNativeaddress(endpoint, withPort)); } map.put(entry.getKey().asList(), rpcaddrs); } @@ -1662,40 +1708,50 @@ public class StorageService extends NotificationBroadcasterSupport implements IE public Map<List<String>, List<String>> getPendingRangeToEndpointMap(String keyspace) { + return getPendingRangeToEndpointMap(keyspace, false); + } + + public Map<List<String>, List<String>> getPendingRangeToEndpointWithPortMap(String keyspace) + { + return getPendingRangeToEndpointMap(keyspace, true); + } + + private Map<List<String>, List<String>> getPendingRangeToEndpointMap(String keyspace, boolean withPort) + { // some people just want to get a visual representation of things. Allow null and set it to the first // non-system keyspace. 
if (keyspace == null) keyspace = Schema.instance.getNonLocalStrategyKeyspaces().get(0); Map<List<String>, List<String>> map = new HashMap<>(); - for (Map.Entry<Range<Token>, Collection<InetAddress>> entry : tokenMetadata.getPendingRangesMM(keyspace).asMap().entrySet()) + for (Map.Entry<Range<Token>, Collection<InetAddressAndPort>> entry : tokenMetadata.getPendingRangesMM(keyspace).asMap().entrySet()) { - List<InetAddress> l = new ArrayList<>(entry.getValue()); - map.put(entry.getKey().asList(), stringify(l)); + List<InetAddressAndPort> l = new ArrayList<>(entry.getValue()); + map.put(entry.getKey().asList(), stringify(l, withPort)); } return map; } - public Map<Range<Token>, List<InetAddress>> getRangeToAddressMap(String keyspace) + public Map<Range<Token>, List<InetAddressAndPort>> getRangeToAddressMap(String keyspace) { return getRangeToAddressMap(keyspace, tokenMetadata.sortedTokens()); } - public Map<Range<Token>, List<InetAddress>> getRangeToAddressMapInLocalDC(String keyspace) + public Map<Range<Token>, List<InetAddressAndPort>> getRangeToAddressMapInLocalDC(String keyspace) { - Predicate<InetAddress> isLocalDC = new Predicate<InetAddress>() + Predicate<InetAddressAndPort> isLocalDC = new Predicate<InetAddressAndPort>() { - public boolean apply(InetAddress address) + public boolean apply(InetAddressAndPort address) { return isLocalDC(address); } }; - Map<Range<Token>, List<InetAddress>> origMap = getRangeToAddressMap(keyspace, getTokensInLocalDC()); - Map<Range<Token>, List<InetAddress>> filteredMap = Maps.newHashMap(); - for (Map.Entry<Range<Token>, List<InetAddress>> entry : origMap.entrySet()) + Map<Range<Token>, List<InetAddressAndPort>> origMap = getRangeToAddressMap(keyspace, getTokensInLocalDC()); + Map<Range<Token>, List<InetAddressAndPort>> filteredMap = Maps.newHashMap(); + for (Map.Entry<Range<Token>, List<InetAddressAndPort>> entry : origMap.entrySet()) { - List<InetAddress> endpointsInLocalDC = 
Lists.newArrayList(Collections2.filter(entry.getValue(), isLocalDC)); + List<InetAddressAndPort> endpointsInLocalDC = Lists.newArrayList(Collections2.filter(entry.getValue(), isLocalDC)); filteredMap.put(entry.getKey(), endpointsInLocalDC); } @@ -1707,21 +1763,21 @@ public class StorageService extends NotificationBroadcasterSupport implements IE List<Token> filteredTokens = Lists.newArrayList(); for (Token token : tokenMetadata.sortedTokens()) { - InetAddress endpoint = tokenMetadata.getEndpoint(token); + InetAddressAndPort endpoint = tokenMetadata.getEndpoint(token); if (isLocalDC(endpoint)) filteredTokens.add(token); } return filteredTokens; } - private boolean isLocalDC(InetAddress targetHost) + private boolean isLocalDC(InetAddressAndPort targetHost) { String remoteDC = DatabaseDescriptor.getEndpointSnitch().getDatacenter(targetHost); - String localDC = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress()); + String localDC = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort()); return remoteDC.equals(localDC); } - private Map<Range<Token>, List<InetAddress>> getRangeToAddressMap(String keyspace, List<Token> sortedTokens) + private Map<Range<Token>, List<InetAddressAndPort>> getRangeToAddressMap(String keyspace, List<Token> sortedTokens) { // some people just want to get a visual representation of things. Allow null and set it to the first // non-system keyspace. 
@@ -1733,6 +1789,16 @@ public class StorageService extends NotificationBroadcasterSupport implements IE } + public List<String> describeRingJMX(String keyspace) throws IOException + { + return describeRingJMX(keyspace, false); + } + + public List<String> describeRingWithPortJMX(String keyspace) throws IOException + { + return describeRingJMX(keyspace,true); + } + /** * The same as {@code describeRing(String)} but converts TokenRange to the String for JMX compatibility * @@ -1740,12 +1806,12 @@ public class StorageService extends NotificationBroadcasterSupport implements IE * * @return a List of TokenRange(s) converted to String for the given keyspace */ - public List<String> describeRingJMX(String keyspace) throws IOException + private List<String> describeRingJMX(String keyspace, boolean withPort) throws IOException { List<TokenRange> tokenRanges; try { - tokenRanges = describeRing(keyspace); + tokenRanges = describeRing(keyspace, false, withPort); } catch (InvalidRequestException e) { @@ -1754,7 +1820,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE List<String> result = new ArrayList<>(tokenRanges.size()); for (TokenRange tokenRange : tokenRanges) - result.add(tokenRange.toString()); + result.add(tokenRange.toString(withPort)); return result; } @@ -1770,7 +1836,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE */ public List<TokenRange> describeRing(String keyspace) throws InvalidRequestException { - return describeRing(keyspace, false); + return describeRing(keyspace, false, false); } /** @@ -1778,10 +1844,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE */ public List<TokenRange> describeLocalRing(String keyspace) throws InvalidRequestException { - return describeRing(keyspace, true); + return describeRing(keyspace, true, false); } - private List<TokenRange> describeRing(String keyspace, boolean includeOnlyLocalDC) throws InvalidRequestException + 
private List<TokenRange> describeRing(String keyspace, boolean includeOnlyLocalDC, boolean withPort) throws InvalidRequestException { if (!Schema.instance.getKeyspaces().contains(keyspace)) throw new InvalidRequestException("No such keyspace: " + keyspace); @@ -1792,39 +1858,49 @@ public class StorageService extends NotificationBroadcasterSupport implements IE List<TokenRange> ranges = new ArrayList<>(); Token.TokenFactory tf = getTokenFactory(); - Map<Range<Token>, List<InetAddress>> rangeToAddressMap = + Map<Range<Token>, List<InetAddressAndPort>> rangeToAddressMap = includeOnlyLocalDC ? getRangeToAddressMapInLocalDC(keyspace) : getRangeToAddressMap(keyspace); - for (Map.Entry<Range<Token>, List<InetAddress>> entry : rangeToAddressMap.entrySet()) - ranges.add(TokenRange.create(tf, entry.getKey(), entry.getValue())); + for (Map.Entry<Range<Token>, List<InetAddressAndPort>> entry : rangeToAddressMap.entrySet()) + ranges.add(TokenRange.create(tf, entry.getKey(), entry.getValue(), withPort)); return ranges; } public Map<String, String> getTokenToEndpointMap() { - Map<Token, InetAddress> mapInetAddress = tokenMetadata.getNormalAndBootstrappingTokenToEndpointMap(); + return getTokenToEndpointMap(false); + } + + public Map<String, String> getTokenToEndpointWithPortMap() + { + return getTokenToEndpointMap(true); + } + + private Map<String, String> getTokenToEndpointMap(boolean withPort) + { + Map<Token, InetAddressAndPort> mapInetAddress = tokenMetadata.getNormalAndBootstrappingTokenToEndpointMap(); // in order to preserve tokens in ascending order, we use LinkedHashMap here Map<String, String> mapString = new LinkedHashMap<>(mapInetAddress.size()); List<Token> tokens = new ArrayList<>(mapInetAddress.keySet()); Collections.sort(tokens); for (Token token : tokens) { - mapString.put(token.toString(), mapInetAddress.get(token).getHostAddress()); + mapString.put(token.toString(), mapInetAddress.get(token).getHostAddress(withPort)); } return mapString; } public String 
getLocalHostId() { - return getTokenMetadata().getHostId(FBUtilities.getBroadcastAddress()).toString(); + return getTokenMetadata().getHostId(FBUtilities.getBroadcastAddressAndPort()).toString(); } public UUID getLocalHostUUID() { - return getTokenMetadata().getHostId(FBUtilities.getBroadcastAddress()); + return getTokenMetadata().getHostId(FBUtilities.getBroadcastAddressAndPort()); } public Map<String, String> getHostIdMap() @@ -1832,19 +1908,40 @@ public class StorageService extends NotificationBroadcasterSupport implements IE return getEndpointToHostId(); } + public Map<String, String> getEndpointToHostId() { + return getEndpointToHostId(false); + } + + public Map<String, String> getEndpointWithPortToHostId() + { + return getEndpointToHostId(true); + } + + private Map<String, String> getEndpointToHostId(boolean withPort) + { Map<String, String> mapOut = new HashMap<>(); - for (Map.Entry<InetAddress, UUID> entry : getTokenMetadata().getEndpointToHostIdMapForReading().entrySet()) - mapOut.put(entry.getKey().getHostAddress(), entry.getValue().toString()); + for (Map.Entry<InetAddressAndPort, UUID> entry : getTokenMetadata().getEndpointToHostIdMapForReading().entrySet()) + mapOut.put(entry.getKey().getHostAddress(withPort), entry.getValue().toString()); return mapOut; } public Map<String, String> getHostIdToEndpoint() { + return getHostIdToEndpoint(false); + } + + public Map<String, String> getHostIdToEndpointWithPort() + { + return getHostIdToEndpoint(true); + } + + private Map<String, String> getHostIdToEndpoint(boolean withPort) + { Map<String, String> mapOut = new HashMap<>(); - for (Map.Entry<InetAddress, UUID> entry : getTokenMetadata().getEndpointToHostIdMapForReading().entrySet()) - mapOut.put(entry.getValue().toString(), entry.getKey().getHostAddress()); + for (Map.Entry<InetAddressAndPort, UUID> entry : getTokenMetadata().getEndpointToHostIdMapForReading().entrySet()) + mapOut.put(entry.getValue().toString(), entry.getKey().getHostAddress(withPort)); 
return mapOut; } @@ -1854,9 +1951,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE * @param ranges * @return mapping of ranges to the replicas responsible for them. */ - private Map<Range<Token>, List<InetAddress>> constructRangeToEndpointMap(String keyspace, List<Range<Token>> ranges) + private Map<Range<Token>, List<InetAddressAndPort>> constructRangeToEndpointMap(String keyspace, List<Range<Token>> ranges) { - Map<Range<Token>, List<InetAddress>> rangeToEndpointMap = new HashMap<>(ranges.size()); + Map<Range<Token>, List<InetAddressAndPort>> rangeToEndpointMap = new HashMap<>(ranges.size()); for (Range<Token> range : ranges) { rangeToEndpointMap.put(range, Keyspace.open(keyspace).getReplicationStrategy().getNaturalEndpoints(range.right)); @@ -1864,7 +1961,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE return rangeToEndpointMap; } - public void beforeChange(InetAddress endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) + public void beforeChange(InetAddressAndPort endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) { // no-op } @@ -1901,9 +1998,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE * Note: Any time a node state changes from STATUS_NORMAL, it will not be visible to new nodes. So it follows that * you should never bootstrap a new node during a removenode, decommission or move. 
*/ - public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value) + public void onChange(InetAddressAndPort endpoint, ApplicationState state, VersionedValue value) { - if (state == ApplicationState.STATUS) + if (state == ApplicationState.STATUS || state == ApplicationState.STATUS_WITH_PORT) { String[] pieces = splitValue(value); assert (pieces.length > 0); @@ -1973,6 +2070,17 @@ public class StorageService extends NotificationBroadcasterSupport implements IE throw new RuntimeException(e); } break; + case NATIVE_ADDRESS_AND_PORT: + try + { + InetAddressAndPort address = InetAddressAndPort.getByName(value.value); + SystemKeyspace.updatePeerNativeAddress(endpoint, address); + } + catch (UnknownHostException e) + { + throw new RuntimeException(e); + } + break; case SCHEMA: SystemKeyspace.updatePeerInfo(endpoint, "schema_version", UUID.fromString(value.value)); MigrationManager.instance.scheduleSchemaPull(endpoint, epState); @@ -1996,7 +2104,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE return value.value.split(VersionedValue.DELIMITER_STR, -1); } - private void updateNetVersion(InetAddress endpoint, VersionedValue value) + private void updateNetVersion(InetAddressAndPort endpoint, VersionedValue value) { try { @@ -2008,7 +2116,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE } } - public void updateTopology(InetAddress endpoint) + public void updateTopology(InetAddressAndPort endpoint) { if (getTokenMetadata().isMember(endpoint)) { @@ -2021,9 +2129,12 @@ public class StorageService extends NotificationBroadcasterSupport implements IE getTokenMetadata().updateTopology(); } - private void updatePeerInfo(InetAddress endpoint) + private void updatePeerInfo(InetAddressAndPort endpoint) { EndpointState epState = Gossiper.instance.getEndpointStateForEndpoint(endpoint); + InetAddress native_address = null; + int native_port = DatabaseDescriptor.getNativeTransportPort(); + 
for (Map.Entry<ApplicationState, VersionedValue> entry : epState.states()) { switch (entry.getKey()) @@ -2040,7 +2151,19 @@ public class StorageService extends NotificationBroadcasterSupport implements IE case RPC_ADDRESS: try { - SystemKeyspace.updatePeerInfo(endpoint, "rpc_address", InetAddress.getByName(entry.getValue().value)); + native_address = InetAddress.getByName(entry.getValue().value); + } + catch (UnknownHostException e) + { + throw new RuntimeException(e); + } + break; + case NATIVE_ADDRESS_AND_PORT: + try + { + InetAddressAndPort address = InetAddressAndPort.getByName(entry.getValue().value); + native_address = address.address; + native_port = address.port; } catch (UnknownHostException e) { @@ -2055,9 +2178,17 @@ public class StorageService extends NotificationBroadcasterSupport implements IE break; } } + + //Some tests won't set all the states + if (native_address != null) + { + SystemKeyspace.updatePeerNativeAddress(endpoint, + InetAddressAndPort.getByAddressOverrideDefaults(native_address, + native_port)); + } } - private void notifyRpcChange(InetAddress endpoint, boolean ready) + private void notifyRpcChange(InetAddressAndPort endpoint, boolean ready) { if (ready) notifyUp(endpoint); @@ -2065,7 +2196,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE notifyDown(endpoint); } - private void notifyUp(InetAddress endpoint) + private void notifyUp(InetAddressAndPort endpoint) { if (!isRpcReady(endpoint) || !Gossiper.instance.isAlive(endpoint)) return; @@ -2074,13 +2205,13 @@ public class StorageService extends NotificationBroadcasterSupport implements IE subscriber.onUp(endpoint); } - private void notifyDown(InetAddress endpoint) + private void notifyDown(InetAddressAndPort endpoint) { for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers) subscriber.onDown(endpoint); } - private void notifyJoined(InetAddress endpoint) + private void notifyJoined(InetAddressAndPort endpoint) { if (!isStatus(endpoint, 
VersionedValue.STATUS_NORMAL)) return; @@ -2089,25 +2220,25 @@ public class StorageService extends NotificationBroadcasterSupport implements IE subscriber.onJoinCluster(endpoint); } - private void notifyMoved(InetAddress endpoint) + private void notifyMoved(InetAddressAndPort endpoint) { for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers) subscriber.onMove(endpoint); } - private void notifyLeft(InetAddress endpoint) + private void notifyLeft(InetAddressAndPort endpoint) { for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers) subscriber.onLeaveCluster(endpoint); } - private boolean isStatus(InetAddress endpoint, String status) + private boolean isStatus(InetAddressAndPort endpoint, String status) { EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint); return state != null && state.getStatus().equals(status); } - public boolean isRpcReady(InetAddress endpoint) + public boolean isRpcReady(InetAddressAndPort endpoint) { EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint); return state != null && state.isRpcReady(); @@ -2123,7 +2254,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE */ public void setRpcReady(boolean value) { - EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(FBUtilities.getBroadcastAddress()); + EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(FBUtilities.getBroadcastAddressAndPort()); // if value is false we're OK with a null state, if it is true we are not. 
assert !value || state != null; @@ -2131,7 +2262,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE Gossiper.instance.addLocalApplicationState(ApplicationState.RPC_READY, valueFactory.rpcReady(value)); } - private Collection<Token> getTokensFor(InetAddress endpoint) + private Collection<Token> getTokensFor(InetAddressAndPort endpoint) { try { @@ -2156,7 +2287,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE * * @param endpoint bootstrapping node */ - private void handleStateBootstrap(InetAddress endpoint) + private void handleStateBootstrap(InetAddressAndPort endpoint) { Collection<Token> tokens; // explicitly check for TOKENS, because a bootstrapping node might be bootstrapping in legacy mode; that is, not using vnodes and no token specified @@ -2186,12 +2317,12 @@ public class StorageService extends NotificationBroadcasterSupport implements IE tokenMetadata.updateHostId(Gossiper.instance.getHostId(endpoint), endpoint); } - private void handleStateBootreplacing(InetAddress newNode, String[] pieces) + private void handleStateBootreplacing(InetAddressAndPort newNode, String[] pieces) { - InetAddress oldNode; + InetAddressAndPort oldNode; try { - oldNode = InetAddress.getByName(pieces[1]); + oldNode = InetAddressAndPort.getByName(pieces[1]); } catch (Exception e) { @@ -2204,7 +2335,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE throw new RuntimeException(String.format("Node %s is trying to replace alive node %s.", newNode, oldNode)); } - Optional<InetAddress> replacingNode = tokenMetadata.getReplacingNode(newNode); + Optional<InetAddressAndPort> replacingNode = tokenMetadata.getReplacingNode(newNode); if (replacingNode.isPresent() && !replacingNode.get().equals(oldNode)) { throw new RuntimeException(String.format("Node %s is already replacing %s but is trying to replace %s.", @@ -2228,12 +2359,12 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE * * @param endpoint node */ - private void handleStateNormal(final InetAddress endpoint, final String status) + private void handleStateNormal(final InetAddressAndPort endpoint, final String status) { Collection<Token> tokens = getTokensFor(endpoint); Set<Token> tokensToUpdateInMetadata = new HashSet<>(); Set<Token> tokensToUpdateInSystemKeyspace = new HashSet<>(); - Set<InetAddress> endpointsToRemove = new HashSet<>(); + Set<InetAddressAndPort> endpointsToRemove = new HashSet<>(); if (logger.isDebugEnabled()) logger.debug("Node {} state {}, token {}", endpoint, status, tokens); @@ -2246,7 +2377,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE endpoint, Gossiper.instance.getEndpointStateForEndpoint(endpoint)); - Optional<InetAddress> replacingNode = tokenMetadata.getReplacingNode(endpoint); + Optional<InetAddressAndPort> replacingNode = tokenMetadata.getReplacingNode(endpoint); if (replacingNode.isPresent()) { assert !endpoint.equals(replacingNode.get()) : "Pending replacement endpoint with same address is not supported"; @@ -2259,7 +2390,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE endpointsToRemove.add(replacingNode.get()); } - Optional<InetAddress> replacementNode = tokenMetadata.getReplacementNode(endpoint); + Optional<InetAddressAndPort> replacementNode = tokenMetadata.getReplacementNode(endpoint); if (replacementNode.isPresent()) { logger.warn("Node {} is currently being replaced by node {}.", endpoint, replacementNode.get()); @@ -2268,7 +2399,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE updatePeerInfo(endpoint); // Order Matters, TM.updateHostID() should be called before TM.updateNormalToken(), (see CASSANDRA-4300). 
UUID hostId = Gossiper.instance.getHostId(endpoint); - InetAddress existing = tokenMetadata.getEndpointForHostId(hostId); + InetAddressAndPort existing = tokenMetadata.getEndpointForHostId(hostId); if (replacing && isReplacingSameAddress() && Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress()) != null && (hostId.equals(Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress())))) logger.warn("Not updating token metadata for {} because I am replacing it", endpoint); @@ -2276,7 +2407,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE { if (existing != null && !existing.equals(endpoint)) { - if (existing.equals(FBUtilities.getBroadcastAddress())) + if (existing.equals(FBUtilities.getBroadcastAddressAndPort())) { logger.warn("Not updating host ID {} for {} because it's mine", hostId, endpoint); tokenMetadata.removeEndpoint(endpoint); @@ -2303,7 +2434,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE for (final Token token : tokens) { // we don't want to update if this node is responsible for the token and it has a later startup time than endpoint. - InetAddress currentOwner = tokenMetadata.getEndpoint(token); + InetAddressAndPort currentOwner = tokenMetadata.getEndpoint(token); if (currentOwner == null) { logger.debug("New node {} at token {}", endpoint, token); @@ -2323,7 +2454,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE // currentOwner is no longer current, endpoint is. Keep track of these moves, because when // a host no longer has any tokens, we'll want to remove it. 
- Multimap<InetAddress, Token> epToTokenCopy = getTokenMetadata().getEndpointToTokenMapForReading(); + Multimap<InetAddressAndPort, Token> epToTokenCopy = getTokenMetadata().getEndpointToTokenMapForReading(); epToTokenCopy.get(currentOwner).remove(token); if (epToTokenCopy.get(currentOwner).size() < 1) endpointsToRemove.add(currentOwner); @@ -2348,7 +2479,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE boolean isMember = tokenMetadata.isMember(endpoint); boolean isMoving = tokenMetadata.isMoving(endpoint); tokenMetadata.updateNormalTokens(tokensToUpdateInMetadata, endpoint); - for (InetAddress ep : endpointsToRemove) + for (InetAddressAndPort ep : endpointsToRemove) { removeEndpoint(ep); if (replacing && DatabaseDescriptor.getReplaceAddress().equals(ep)) @@ -2375,7 +2506,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE * * @param endpoint node */ - private void handleStateLeaving(InetAddress endpoint) + private void handleStateLeaving(InetAddressAndPort endpoint) { Collection<Token> tokens = getTokensFor(endpoint); @@ -2408,7 +2539,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE * @param endpoint If reason for leaving is decommission, endpoint is the leaving node. 
* @param pieces STATE_LEFT,token */ - private void handleStateLeft(InetAddress endpoint, String[] pieces) + private void handleStateLeft(InetAddressAndPort endpoint, String[] pieces) { assert pieces.length >= 2; Collection<Token> tokens = getTokensFor(endpoint); @@ -2425,7 +2556,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE * @param endpoint moving endpoint address * @param pieces STATE_MOVING, token */ - private void handleStateMoving(InetAddress endpoint, String[] pieces) + private void handleStateMoving(InetAddressAndPort endpoint, String[] pieces) { assert pieces.length >= 2; Token token = getTokenFactory().fromString(pieces[1]); @@ -2444,11 +2575,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE * @param endpoint node * @param pieces either REMOVED_TOKEN (node is gone) or REMOVING_TOKEN (replicas need to be restored) */ - private void handleStateRemoving(InetAddress endpoint, String[] pieces) + private void handleStateRemoving(InetAddressAndPort endpoint, String[] pieces) { assert (pieces.length > 0); - if (endpoint.equals(FBUtilities.getBroadcastAddress())) + if (endpoint.equals(FBUtilities.getBroadcastAddressAndPort())) { logger.info("Received removenode gossip about myself. 
Is this node rejoining after an explicit removenode?"); try @@ -2494,7 +2625,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE } } - private void excise(Collection<Token> tokens, InetAddress endpoint) + private void excise(Collection<Token> tokens, InetAddressAndPort endpoint) { logger.info("Removing tokens {} for {}", tokens, endpoint); @@ -2510,20 +2641,20 @@ public class StorageService extends NotificationBroadcasterSupport implements IE PendingRangeCalculatorService.instance.update(); } - private void excise(Collection<Token> tokens, InetAddress endpoint, long expireTime) + private void excise(Collection<Token> tokens, InetAddressAndPort endpoint, long expireTime) { addExpireTimeIfFound(endpoint, expireTime); excise(tokens, endpoint); } /** unlike excise we just need this endpoint gone without going through any notifications **/ - private void removeEndpoint(InetAddress endpoint) + private void removeEndpoint(InetAddressAndPort endpoint) { Gossiper.instance.removeEndpoint(endpoint); SystemKeyspace.removeEndpoint(endpoint); } - protected void addExpireTimeIfFound(InetAddress endpoint, long expireTime) + protected void addExpireTimeIfFound(InetAddressAndPort endpoint, long expireTime) { if (expireTime != 0L) { @@ -2543,23 +2674,23 @@ public class StorageService extends NotificationBroadcasterSupport implements IE * @param ranges the ranges to find sources for * @return multimap of addresses to ranges the address is responsible for */ - private Multimap<InetAddress, Range<Token>> getNewSourceRanges(String keyspaceName, Set<Range<Token>> ranges) + private Multimap<InetAddressAndPort, Range<Token>> getNewSourceRanges(String keyspaceName, Set<Range<Token>> ranges) { - InetAddress myAddress = FBUtilities.getBroadcastAddress(); - Multimap<Range<Token>, InetAddress> rangeAddresses = Keyspace.open(keyspaceName).getReplicationStrategy().getRangeAddresses(tokenMetadata.cloneOnlyTokenMap()); - Multimap<InetAddress, Range<Token>> 
sourceRanges = HashMultimap.create(); + InetAddressAndPort myAddress = FBUtilities.getBroadcastAddressAndPort(); + Multimap<Range<Token>, InetAddressAndPort> rangeAddresses = Keyspace.open(keyspaceName).getReplicationStrategy().getRangeAddresses(tokenMetadata.cloneOnlyTokenMap()); + Multimap<InetAddressAndPort, Range<Token>> sourceRanges = HashMultimap.create(); IFailureDetector failureDetector = FailureDetector.instance; // find alive sources for our new ranges for (Range<Token> range : ranges) { - Collection<InetAddress> possibleRanges = rangeAddresses.get(range); + Collection<InetAddressAndPort> possibleRanges = rangeAddresses.get(range); IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch(); - List<InetAddress> sources = snitch.getSortedListByProximity(myAddress, possibleRanges); + List<InetAddressAndPort> sources = snitch.getSortedListByProximity(myAddress, possibleRanges); assert (!sources.contains(myAddress)); - for (InetAddress source : sources) + for (InetAddressAndPort source : sources) { if (failureDetector.isAlive(source)) { @@ -2576,7 +2707,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE * * @param remote node to send notification to */ - private void sendReplicationNotification(InetAddress remote) + private void sendReplicationNotification(InetAddressAndPort remote) { // notify the remote token MessageOut msg = new MessageOut(MessagingService.Verb.REPLICATION_FINISHED); @@ -2608,23 +2739,23 @@ public class StorageService extends NotificationBroadcasterSupport implements IE * * @param endpoint the node that left */ - private void restoreReplicaCount(InetAddress endpoint, final InetAddress notifyEndpoint) + private void restoreReplicaCount(InetAddressAndPort endpoint, final InetAddressAndPort notifyEndpoint) { - Multimap<String, Map.Entry<InetAddress, Collection<Range<Token>>>> rangesToFetch = HashMultimap.create(); + Multimap<String, Map.Entry<InetAddressAndPort, Collection<Range<Token>>>> rangesToFetch = 
HashMultimap.create(); - InetAddress myAddress = FBUtilities.getBroadcastAddress(); + InetAddressAndPort myAddress = FBUtilities.getBroadcastAddressAndPort(); for (String keyspaceName : Schema.instance.getNonLocalStrategyKeyspaces()) { - Multimap<Range<Token>, InetAddress> changedRanges = getChangedRangesForLeaving(keyspaceName, endpoint); + Multimap<Range<Token>, InetAddressAndPort> changedRanges = getChangedRangesForLeaving(keyspaceName, endpoint); Set<Range<Token>> myNewRanges = new HashSet<>(); - for (Map.Entry<Range<Token>, InetAddress> entry : changedRanges.entries()) + for (Map.Entry<Range<Token>, InetAddressAndPort> entry : changedRanges.entries()) { if (entry.getValue().equals(myAddress)) myNewRanges.add(entry.getKey()); } - Multimap<InetAddress, Range<Token>> sourceRanges = getNewSourceRanges(keyspaceName, myNewRanges); - for (Map.Entry<InetAddress, Collection<Range<Token>>> entry : sourceRanges.asMap().entrySet()) + Multimap<InetAddressAndPort, Range<Token>> sourceRanges = getNewSourceRanges(keyspaceName, myNewRanges); + for (Map.Entry<InetAddressAndPort, Collection<Range<Token>>> entry : sourceRanges.asMap().entrySet()) { rangesToFetch.put(keyspaceName, entry); } @@ -2633,10 +2764,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE StreamPlan stream = new StreamPlan(StreamOperation.RESTORE_REPLICA_COUNT); for (String keyspaceName : rangesToFetch.keySet()) { - for (Map.Entry<InetAddress, Collection<Range<Token>>> entry : rangesToFetch.get(keyspaceName)) + for (Map.Entry<InetAddressAndPort, Collection<Range<Token>>> entry : rangesToFetch.get(keyspaceName)) { - InetAddress source = entry.getKey(); - InetAddress preferred = SystemKeyspace.getPreferredIP(source); + InetAddressAndPort source = entry.getKey(); + InetAddressAndPort preferred = SystemKeyspace.getPreferredIP(source); Collection<Range<Token>> ranges = entry.getValue(); if (logger.isDebugEnabled()) logger.debug("Requesting from {} ranges {}", source, 
StringUtils.join(ranges, ", ")); @@ -2661,7 +2792,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE } // needs to be modified to accept either a keyspace or ARS. - private Multimap<Range<Token>, InetAddress> getChangedRangesForLeaving(String keyspaceName, InetAddress endpoint) + private Multimap<Range<Token>, InetAddressAndPort> getChangedRangesForLeaving(String keyspaceName, InetAddressAndPort endpoint) { // First get all ranges the leaving endpoint is responsible for Collection<Range<Token>> ranges = getRangesForEndpoint(keyspaceName, endpoint); @@ -2669,7 +2800,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE if (logger.isDebugEnabled()) logger.debug("Node {} ranges [{}]", endpoint, StringUtils.join(ranges, ", ")); - Map<Range<Token>, List<InetAddress>> currentReplicaEndpoints = new HashMap<>(ranges.size()); + Map<Range<Token>, List<InetAddressAndPort>> currentReplicaEndpoints = new HashMap<>(ranges.size()); // Find (for each range) all nodes that store replicas for these ranges as well TokenMetadata metadata = tokenMetadata.cloneOnlyTokenMap(); // don't do this in the loop! #7758 @@ -2683,7 +2814,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE if (temp.isMember(endpoint)) temp.removeEndpoint(endpoint); - Multimap<Range<Token>, InetAddress> changedRanges = HashMultimap.create(); + Multimap<Range<Token>, InetAddressAndPort> changedRanges = HashMultimap.create(); // Go through the ranges and for each range check who will be // storing replicas for these ranges when the leaving endpoint @@ -2692,7 +2823,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE // range. 
for (Range<Token> range : ranges) { - Collection<InetAddress> newReplicaEndpoints = Keyspace.open(keyspaceName).getReplicationStrategy().calculateNaturalEndpoints(range.right, temp); + Collection<InetAddressAndPort> newReplicaEndpoints = Keyspace.open(keyspaceName).getReplicationStrategy().calculateNaturalEndpoints(range.right, temp); newReplicaEndpoints.removeAll(currentReplicaEndpoints.get(range)); if (logger.isDebugEnabled()) if (newReplicaEndpoints.isEmpty()) @@ -2705,7 +2836,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE return changedRanges; } - public void onJoin(InetAddress endpoint, EndpointState epState) + public void onJoin(InetAddressAndPort endpoint, EndpointState epState) { for (Map.Entry<ApplicationState, VersionedValue> entry : epState.states()) { @@ -2714,7 +2845,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE MigrationManager.instance.scheduleSchemaPull(endpoint, epState); } - public void onAlive(InetAddress endpoint, EndpointState state) + public void onAlive(InetAddressAndPort endpoint, EndpointState state) { MigrationManager.instance.scheduleSchemaPull(endpoint, state); @@ -2722,19 +2853,19 @@ public class StorageService extends NotificationBroadcasterSupport implements IE notifyUp(endpoint); } - public void onRemove(InetAddress endpoint) + public void onRemove(InetAddressAndPort endpoint) { tokenMetadata.removeEndpoint(endpoint); PendingRangeCalculatorService.instance.update(); } - public void onDead(InetAddress endpoint, EndpointState state) + public void onDead(InetAddressAndPort endpoint, EndpointState state) { MessagingService.instance().convict(endpoint); notifyDown(endpoint); } - public void onRestart(InetAddress endpoint, EndpointState state) + public void onRestart(InetAddressAndPort endpoint, EndpointState state) { // If we have restarted before the node was even marked down, we need to reset the connection pool if (state.isAlive()) @@ -2753,15 +2884,25 @@ 
public class StorageService extends NotificationBroadcasterSupport implements IE return FileUtils.stringifyFileSize(StorageMetrics.load.getCount()); } + public Map<String, String> getLoadMapWithPort() + { + return getLoadMap(true); + } + public Map<String, String> getLoadMap() { + return getLoadMap(false); + } + + private Map<String, String> getLoadMap(boolean withPort) + { Map<String, String> map = new HashMap<>(); - for (Map.Entry<InetAddress,Double> entry : LoadBroadcaster.instance.getLoadInfo().entrySet()) + for (Map.Entry<InetAddressAndPort,Double> entry : LoadBroadcaster.instance.getLoadInfo().entrySet()) { - map.put(entry.getKey().getHostAddress(), FileUtils.stringifyFileSize(entry.getValue())); + map.put(entry.getKey().getHostAddress(withPort), FileUtils.stringifyFileSize(entry.getValue())); } // gossiper doesn't see its own updates, so we need to special-case the local node - map.put(FBUtilities.getBroadcastAddress().getHostAddress(), getLoadString()); + map.put(withPort ? FBUtilities.getJustBroadcastAddress().getHostAddress() : FBUtilities.getBroadcastAddressAndPort().toString(), getLoadString()); return map; } @@ -2779,13 +2920,13 @@ public class StorageService extends NotificationBroadcasterSupport implements IE } @Nullable - public InetAddress getEndpointForHostId(UUID hostId) + public InetAddressAndPort getEndpointForHostId(UUID hostId) { return tokenMetadata.getEndpointForHostId(hostId); } @Nullable - public UUID getHostIdForEndpoint(InetAddress address) + public UUID getHostIdForEndpoint(InetAddressAndPort address) { return tokenMetadata.getHostId(address); } @@ -2794,15 +2935,15 @@ public class StorageService extends NotificationBroadcasterSupport implements IE public List<String> getTokens() { - return getTokens(FBUtilities.getBroadcastAddress()); + return getTokens(FBUtilities.getBroadcastAddressAndPort()); } public List<String> getTokens(String endpoint) throws UnknownHostException { - return getTokens(InetAddress.getByName(endpoint)); + return 
getTokens(InetAddressAndPort.getByName(endpoint)); } - private List<String> getTokens(InetAddress endpoint) + private List<String> getTokens(InetAddressAndPort endpoint) { List<String> strTokens = new ArrayList<>(); for (Token tok : getTokenMetadata().getTokens(endpoint)) @@ -2820,42 +2961,74 @@ public class StorageService extends NotificationBroadcasterSupport implements IE return Schema.instance.getVersion().toString(); } + @Deprecated public List<String> getLeavingNodes() { - return stringify(tokenMetadata.getLeavingEndpoints()); + return stringify(tokenMetadata.getLeavingEndpoints(), false); + } + + public List<String> getLeavingNodesWithPort() + { + return stringify(tokenMetadata.getLeavingEndpoints(), true); } + @Deprecated public List<String> getMovingNodes() { List<String> endpoints = new ArrayList<>(); - for (Pair<Token, InetAddress> node : tokenMetadata.getMovingEndpoints()) + for (Pair<Token, InetAddressAndPort> node : tokenMetadata.getMovingEndpoints()) + { + endpoints.add(node.right.address.getHostAddress()); + } + + return endpoints; + } + + public List<String> getMovingNodesWithPort() + { + List<String> endpoints = new ArrayList<>(); + + for (Pair<Token, InetAddressAndPort> node : tokenMetadata.getMovingEndpoints()) { - endpoints.add(node.right.getHostAddress()); + endpoints.add(node.right.toString()); } return endpoints; } + public List<String> getJoiningNodes() { - return stringify(tokenMetadata.getBootstrapTokens().valueSet()); + return stringify(tokenMetadata.getBootstrapTokens().valueSet(), false); + } + + @Deprecated + public List<String> getJoiningNodesWithPort() + { + return stringify(tokenMetadata.getBootstrapTokens().valueSet(), true); } public List<String> getLiveNodes() { - return stringify(Gossiper.instance.getLiveMembers()); + return stringify(Gossiper.instance.getLiveMembers(), false); + } + + @Deprecated + public List<String> getLiveNodesWithPort() + { + return stringify(Gossiper.instance.getLiveMembers(), true); } - public 
Set<InetAddress> getLiveRingMembers() + public Set<InetAddressAndPort> getLiveRingMembers() { return getLiveRingMembers(false); } - public Set<InetAddress> getLiveRingMembers(boolean excludeDeadStates) + public Set<InetAddressAndPort> getLiveRingMembers(boolean excludeDeadStates) { - Set<InetAddress> ret = new HashSet<>(); - for (InetAddress ep : Gossiper.instance.getLiveMembers()) + Set<InetAddressAndPort> ret = new HashSet<>(); + for (InetAddressAndPort ep : Gossiper.instance.getLiveMembers()) { if (excludeDeadStates) { @@ -2871,9 +3044,15 @@ public class StorageService extends NotificationBroadcasterSupport implements IE } + @Deprecated public List<String> getUnreachableNodes() { - return stringify(Gossiper.instance.getUnreachableMembers()); + return stringify(Gossiper.instance.getUnreachableMembers(), false); + } + + public List<String> getUnreachableNodesWithPort() + { + return stringify(Gossiper.instance.getUnreachableMembers(), true); } public String[] getAllDataFileLocations() @@ -2894,19 +3073,19 @@ public class StorageService extends NotificationBroadcasterSupport implements IE return FileUtils.getCanonicalPath(DatabaseDescriptor.getSavedCachesLocation()); } - private List<String> stringify(Iterable<InetAddress> endpoints) + private List<String> stringify(Iterable<InetAddressAndPort> endpoints, boolean withPort) { List<String> stringEndpoints = new ArrayList<>(); - for (InetAddress ep : endpoints) + for (InetAddressAndPort ep : endpoints) { - stringEndpoints.add(ep.getHostAddress()); + stringEndpoints.add(ep.getHostAddress(withPort)); } return stringEndpoints; } public int getCurrentGenerationNumber() { - return Gossiper.instance.getCurrentGenerationNumber(FBUtilities.getBroadcastAddress()); + return Gossiper.instance.getCurrentGenerationNumber(FBUtilities.getBroadcastAddressAndPort()); } public int forceKeyspaceCleanup(String keyspaceName, String... 
tables) throws IOException, ExecutionException, InterruptedException @@ -3436,14 +3615,14 @@ public class StorageService extends NotificationBroadcasterSupport implements IE * @param ep endpoint we are interested in. * @return primary ranges for the specified endpoint. */ - public Collection<Range<Token>> getPrimaryRangesForEndpoint(String keyspace, InetAddress ep) + public Collection<Range<Token>> getPrimaryRangesForEndpoint(String keyspace, InetAddressAndPort ep) { AbstractReplicationStrategy strategy = Keyspace.open(keyspace).getReplicationStrategy(); Collection<Range<Token>> primaryRanges = new HashSet<>(); TokenMetadata metadata = tokenMetadata.cloneOnlyTokenMap(); for (Token token : metadata.sortedTokens()) { - List<InetAddress> endpoints = strategy.calculateNaturalEndpoints(token, metadata); + List<InetAddressAndPort> endpoints = strategy.calculateNaturalEndpoints(token, metadata); if (endpoints.size() > 0 && endpoints.get(0).equals(ep)) primaryRanges.add(new Range<>(metadata.getPredecessor(token), token)); } @@ -3453,23 +3632,23 @@ public class StorageService extends NotificationBroadcasterSupport implements IE /** * Get the "primary ranges" within local DC for the specified keyspace and endpoint. * - * @see #getPrimaryRangesForEndpoint(String, java.net.InetAddress) + * @see #getPrimaryRangesForEndpoint(String, InetAddressAndPort) * @param keyspace Keyspace name to check primary ranges * @param referenceEndpoint endpoint we are interested in. * @return primary ranges within local DC for the specified endpoint. 
*/ - public Collection<Range<Token>> getPrimaryRangeForEndpointWithinDC(String keyspace, InetAddress referenceEndpoint) + public Collection<Range<Token>> getPrimaryRangeForEndpointWithinDC(String keyspace, InetAddressAndPort referenceEndpoint) { TokenMetadata metadata = tokenMetadata.cloneOnlyTokenMap(); String localDC = DatabaseDescriptor.getEndpointSnitch().getDatacenter(referenceEndpoint); - Collection<InetAddress> localDcNodes = metadata.getTopology().getDatacenterEndpoints().get(localDC); + Collection<InetAddressAndPort> localDcNodes = metadata.getTopology().getDatacenterEndpoints().get(localDC); AbstractReplicationStrategy strategy = Keyspace.open(keyspace).getReplicationStrategy(); Collection<Range<Token>> localDCPrimaryRanges = new HashSet<>(); for (Token token : metadata.sortedTokens()) { - List<InetAddress> endpoints = strategy.calculateNaturalEndpoints(token, metadata); - for (InetAddress endpoint : endpoints) + List<InetAddressAndPort> endpoints = strategy.calculateNaturalEndpoints(token, metadata); + for (InetAddressAndPort endpoint : endpoints) { if (localDcNodes.contains(endpoint)) { @@ -3490,7 +3669,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE * @param ep endpoint we are interested in. * @return ranges for the specified endpoint. 
*/ - Collection<Range<Token>> getRangesForEndpoint(String keyspaceName, InetAddress ep) + Collection<Range<Token>> getRangesForEndpoint(String keyspaceName, InetAddressAndPort ep) { return Keyspace.open(keyspaceName).getReplicationStrategy().getAddressRanges().get(ep); } @@ -3530,6 +3709,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE * @param key key for which we need to find the endpoint * @return the endpoint responsible for this key */ + @Deprecated public List<InetAddress> getNaturalEndpoints(String keyspaceName, String cf, String key) { KeyspaceMetadata ksMetaData = Schema.instance.getKeyspaceMetadata(keyspaceName); @@ -3540,12 +3720,32 @@ public class StorageService extends NotificationBroadcasterSupport implements IE if (metadata == null) throw new IllegalArgumentException("Unknown table '" + cf + "' in keyspace '" + keyspaceName + "'"); - return getNaturalEndpoints(keyspaceName, tokenMetadata.partitioner.getToken(metadata.partitionKeyType.fromString(key))); + return getNaturalEndpoints(keyspaceName, tokenMetadata.partitioner.getToken(metadata.partitionKeyType.fromString(key))).stream().map(i -> i.address).collect(toList()); + } + + public List<String> getNaturalEndpointsWithPort(String keyspaceName, String cf, String key) + { + KeyspaceMetadata ksMetaData = Schema.instance.getKeyspaceMetadata(keyspaceName); + if (ksMetaData == null) + throw new IllegalArgumentException("Unknown keyspace '" + keyspaceName + "'"); + + TableMetadata metadata = ksMetaData.getTableOrViewNullable(cf); + if (metadata == null) + throw new IllegalArgumentException("Unknown table '" + cf + "' in keyspace '" + keyspaceName + "'"); + + return stringify(getNaturalEndpoints(keyspaceName, tokenMetadata.partitioner.getToken(metadata.partitionKeyType.fromString(key))), true); } + + @Deprecated public List<InetAddress> getNaturalEndpoints(String keyspaceName, ByteBuffer key) { - return getNaturalEndpoints(keyspaceName, 
tokenMetadata.partitioner.getToken(key)); + return getNaturalEndpoints(keyspaceName, tokenMetadata.partitioner.getToken(key)).stream().map(i -> i.address).collect(toList()); + } + + public List<String> getNaturalEndpointsWithPort(String keyspaceName, ByteBuffer key) + { + return stringify(getNaturalEndpoints(keyspaceName, tokenMetadata.partitioner.getToken(key)), true); } /** @@ -3556,7 +3756,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE * @param pos position for which we need to find the endpoint * @return the endpoint responsible for this token */ - public List<InetAddress> getNaturalEndpoints(String keyspaceName, RingPosition pos) + public List<InetAddressAndPort> getNaturalEndpoints(String keyspaceName, RingPosition pos) { return Keyspace.open(keyspaceName).getReplicationStrategy().getNaturalEndpoints(pos); } @@ -3564,7 +3764,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE /** * Returns the endpoints currently responsible for storing the token plus pending ones */ - public Iterable<InetAddress> getNaturalAndPendingEndpoints(String keyspaceName, Token token) + public Iterable<InetAddressAndPort> getNaturalAndPendingEndpoints(String keyspaceName, Token token) { return Iterables.concat(getNaturalEndpoints(keyspaceName, token), tokenMetadata.pendingEndpointsFor(token, keyspaceName)); } @@ -3577,14 +3777,14 @@ public class StorageService extends NotificationBroadcasterSupport implements IE * @param key key for which we need to find the endpoint * @return the endpoint responsible for this key */ - public List<InetAddress> getLiveNaturalEndpoints(Keyspace keyspace, ByteBuffer key) + public List<InetAddressAndPort> getLiveNaturalEndpoints(Keyspace keyspace, ByteBuffer key) { return getLiveNaturalEndpoints(keyspace, tokenMetadata.decorateKey(key)); } - public List<InetAddress> getLiveNaturalEndpoints(Keyspace keyspace, RingPosition pos) + public List<InetAddressAndPort> 
getLiveNaturalEndpoints(Keyspace keyspace, RingPosition pos) { - List<InetAddress> liveEps = new ArrayList<>(); + List<InetAddressAndPort> liveEps = new ArrayList<>(); getLiveNaturalEndpoints(keyspace, pos, liveEps); return liveEps; } @@ -3597,11 +3797,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE * @param pos position for which we need to find the endpoint * @param liveEps the list of endpoints to mutate */ - public void getLiveNaturalEndpoints(Keyspace keyspace, RingPosition pos, List<InetAddress> liveEps) + public void getLiveNaturalEndpoints(Keyspace keyspace, RingPosition pos, List<InetAddressAndPort> liveEps) { - List<InetAddress> endpoints = keyspace.getReplicationStrategy().getNaturalEndpoints(pos); + List<InetAddressAndPort> endpoints = keyspace.getReplicationStrategy().getNaturalEndpoints(pos); - for (InetAddress endpoint : endpoints) + for (InetAddressAndPort endpoint : endpoints) { if (FailureDetector.instance.isAlive(endpoint)) liveEps.add(endpoint); @@ -3718,8 +3918,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE */ private void startLeaving() { + Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS_WITH_PORT, valueFactory.leaving(getLocalTokens())); Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, valueFactory.leaving(getLocalTokens())); - tokenMetadata.addLeavingEndpoint(FBUtilities.getBroadcastAddress()); + tokenMetadata.addLeavingEndpoint(FBUtilities.getBroadcastAddressAndPort()); PendingRangeCalculatorService.instance.update(); } @@ -3728,7 +3929,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE TokenMetadata metadata = tokenMetadata.cloneAfterAllLeft(); if (operationMode != Mode.LEAVING) { - if (!tokenMetadata.isMember(FBUtilities.getBroadcastAddress())) + if (!tokenMetadata.isMember(FBUtilities.getBroadcastAddressAndPort())) throw new UnsupportedOperationException("local node is not a member of the 
token ring yet"); if (metadata.getAllEndpoints().size() < 2) throw new UnsupportedOperationException("no other normal nodes in the ring; decommission would be pointless"); @@ -3745,7 +3946,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE { PendingRangeCalculatorService.instance.blockUntilFinished(); - String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress()); + String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort()); if (operationMode != Mode.LEAVING) // If we're already decommissioning there is no point checking RF/pending ranges { @@ -3772,7 +3973,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE + keyspaceName + " (RF = " + rf + ", N = " + numNodes + ")." + " Perform a forceful decommission to ignore."); } - if (tokenMetadata.getPendingRanges(keyspaceName, FBUtilities.getBroadcastAddress()).size() > 0) + if (tokenMetadata.getPendingRanges(keyspaceName, FBUtilities.getBroadcastAddressAndPort()).size() > 0) throw new UnsupportedOperationException("data is currently moving to this node; unable to leave the ring"); } } @@ -3822,9 +4023,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE private void leaveRing() { SystemKeyspace.setBootstrapState(SystemKeyspace.BootstrapState.NEEDS_BOOTSTRAP); - tokenMetadata.removeEndpoint(FBUtilities.getBroadcastAddress()); + tokenMetadata.removeEndpoint(FBUtilities.getBroadcastAddressAndPort()); PendingRangeCalculatorService.instance.update(); + Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS_WITH_PORT, valueFactory.left(getLocalTokens(),Gossiper.computeExpireTime())); Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, valueFactory.left(getLocalTokens(),Gossiper.computeExpireTime())); int delay = Math.max(RING_DELAY, Gossiper.intervalInMillis * 2); logger.info("Announcing that I have left the ring 
for {}ms", delay); @@ -3833,11 +4035,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE private void unbootstrap(Runnable onFinish) throws ExecutionException, InterruptedException { - Map<String, Multimap<Range<Token>, InetAddress>> rangesToStream = new HashMap<>(); + Map<String, Multimap<Range<Token>, InetAddressAndPort>> rangesToStream = new HashMap<>(); for (String keyspaceName : Schema.instance.getNonLocalStrategyKeyspaces()) { - Multimap<Range<Token>, InetAddress> rangesMM = getChangedRangesForLeaving(keyspaceName, FBUtilities.getBroadcastAddress()); + Multimap<Range<Token>, InetAddressAndPort> rangesMM = getChangedRangesForLeaving(keyspaceName, FBUtilities.getBroadcastAddressAndPort()); if (logger.isDebugEnabled()) logger.debug("Ranges needing transfer are [{}]", StringUtils.join(rangesMM.keySet(), ",")); @@ -3878,11 +4080,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE */ private UUID getPreferredHintsStreamTarget() { - List<InetAddress> candidates = new ArrayList<>(StorageService.instance.getTokenMetadata().cloneAfterAllLeft().getAllEndpoints()); -
<TRUNCATED> --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org