http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/locator/PendingRangeMaps.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/PendingRangeMaps.java b/src/java/org/apache/cassandra/locator/PendingRangeMaps.java index cfeccc4..92307a3 100644 --- a/src/java/org/apache/cassandra/locator/PendingRangeMaps.java +++ b/src/java/org/apache/cassandra/locator/PendingRangeMaps.java @@ -26,10 +26,9 @@ import org.apache.cassandra.dht.Token; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.net.InetAddress; import java.util.*; -public class PendingRangeMaps implements Iterable<Map.Entry<Range<Token>, List<InetAddress>>> +public class PendingRangeMaps implements Iterable<Map.Entry<Range<Token>, List<InetAddressAndPort>>> { private static final Logger logger = LoggerFactory.getLogger(PendingRangeMaps.class); @@ -39,7 +38,7 @@ public class PendingRangeMaps implements Iterable<Map.Entry<Range<Token>, List<I * First two are for non-wrap-around ranges, and the last two are for wrap-around ranges. */ // ascendingMap will sort the ranges by the ascending order of right token - final NavigableMap<Range<Token>, List<InetAddress>> ascendingMap; + final NavigableMap<Range<Token>, List<InetAddressAndPort>> ascendingMap; /** * sorting end ascending, if ends are same, sorting begin descending, so that token (end, end) will * come before (begin, end] with the same end, and (begin, end) will be selected in the tailMap. @@ -58,7 +57,7 @@ public class PendingRangeMaps implements Iterable<Map.Entry<Range<Token>, List<I }; // ascendingMap will sort the ranges by the descending order of left token - final NavigableMap<Range<Token>, List<InetAddress>> descendingMap; + final NavigableMap<Range<Token>, List<InetAddressAndPort>> descendingMap; /** * sorting begin descending, if begins are same, sorting end descending, so that token (begin, begin) will * come after (begin, end] with the same begin, and (begin, end) won't be selected in the tailMap. @@ -78,7 +77,7 @@ public class PendingRangeMaps implements Iterable<Map.Entry<Range<Token>, List<I }; // these two maps are for warp around ranges. - final NavigableMap<Range<Token>, List<InetAddress>> ascendingMapForWrapAround; + final NavigableMap<Range<Token>, List<InetAddressAndPort>> ascendingMapForWrapAround; /** * for wrap around range (begin, end], which begin > end. * Sorting end ascending, if ends are same, sorting begin ascending, @@ -98,7 +97,7 @@ public class PendingRangeMaps implements Iterable<Map.Entry<Range<Token>, List<I } }; - final NavigableMap<Range<Token>, List<InetAddress>> descendingMapForWrapAround; + final NavigableMap<Range<Token>, List<InetAddressAndPort>> descendingMapForWrapAround; /** * for wrap around ranges, which begin > end. * Sorting end ascending, so that token (begin, begin) will come after (begin, end] with the same begin, @@ -118,28 +117,28 @@ public class PendingRangeMaps implements Iterable<Map.Entry<Range<Token>, List<I public PendingRangeMaps() { - this.ascendingMap = new TreeMap<Range<Token>, List<InetAddress>>(ascendingComparator); - this.descendingMap = new TreeMap<Range<Token>, List<InetAddress>>(descendingComparator); - this.ascendingMapForWrapAround = new TreeMap<Range<Token>, List<InetAddress>>(ascendingComparatorForWrapAround); - this.descendingMapForWrapAround = new TreeMap<Range<Token>, List<InetAddress>>(descendingComparatorForWrapAround); + this.ascendingMap = new TreeMap<Range<Token>, List<InetAddressAndPort>>(ascendingComparator); + this.descendingMap = new TreeMap<Range<Token>, List<InetAddressAndPort>>(descendingComparator); + this.ascendingMapForWrapAround = new TreeMap<Range<Token>, List<InetAddressAndPort>>(ascendingComparatorForWrapAround); + this.descendingMapForWrapAround = new TreeMap<Range<Token>, List<InetAddressAndPort>>(descendingComparatorForWrapAround); } static final void addToMap(Range<Token> range, - InetAddress address, - NavigableMap<Range<Token>, List<InetAddress>> ascendingMap, - NavigableMap<Range<Token>, List<InetAddress>> descendingMap) + InetAddressAndPort address, + NavigableMap<Range<Token>, List<InetAddressAndPort>> ascendingMap, + NavigableMap<Range<Token>, List<InetAddressAndPort>> descendingMap) { - List<InetAddress> addresses = ascendingMap.get(range); + List<InetAddressAndPort> addresses = ascendingMap.get(range); if (addresses == null) { - addresses = new ArrayList<InetAddress>(1); + addresses = new ArrayList<>(1); ascendingMap.put(range, addresses); descendingMap.put(range, addresses); } addresses.add(address); } - public void addPendingRange(Range<Token> range, InetAddress address) + public void addPendingRange(Range<Token> range, InetAddressAndPort address) { if (Range.isWrapAround(range.left, range.right)) { @@ -151,14 +150,14 @@ public class PendingRangeMaps implements Iterable<Map.Entry<Range<Token>, List<I } } - static final void addIntersections(Set<InetAddress> endpointsToAdd, - NavigableMap<Range<Token>, List<InetAddress>> smallerMap, - NavigableMap<Range<Token>, List<InetAddress>> biggerMap) + static final void addIntersections(Set<InetAddressAndPort> endpointsToAdd, + NavigableMap<Range<Token>, List<InetAddressAndPort>> smallerMap, + NavigableMap<Range<Token>, List<InetAddressAndPort>> biggerMap) { // find the intersection of two sets for (Range<Token> range : smallerMap.keySet()) { - List<InetAddress> addresses = biggerMap.get(range); + List<InetAddressAndPort> addresses = biggerMap.get(range); if (addresses != null) { endpointsToAdd.addAll(addresses); @@ -166,15 +165,15 @@ public class PendingRangeMaps implements Iterable<Map.Entry<Range<Token>, List<I } } - public Collection<InetAddress> pendingEndpointsFor(Token token) + public Collection<InetAddressAndPort> pendingEndpointsFor(Token token) { - Set<InetAddress> endpoints = new HashSet<>(); + Set<InetAddressAndPort> endpoints = new HashSet<>(); Range searchRange = new Range(token, token); // search for non-wrap-around maps - NavigableMap<Range<Token>, List<InetAddress>> ascendingTailMap = ascendingMap.tailMap(searchRange, true); - NavigableMap<Range<Token>, List<InetAddress>> descendingTailMap = descendingMap.tailMap(searchRange, false); + NavigableMap<Range<Token>, List<InetAddressAndPort>> ascendingTailMap = ascendingMap.tailMap(searchRange, true); + NavigableMap<Range<Token>, List<InetAddressAndPort>> descendingTailMap = descendingMap.tailMap(searchRange, false); // add intersections of two maps if (ascendingTailMap.size() < descendingTailMap.size()) @@ -191,11 +190,11 @@ public class PendingRangeMaps implements Iterable<Map.Entry<Range<Token>, List<I descendingTailMap = descendingMapForWrapAround.tailMap(searchRange, false); // add them since they are all necessary. - for (Map.Entry<Range<Token>, List<InetAddress>> entry : ascendingTailMap.entrySet()) + for (Map.Entry<Range<Token>, List<InetAddressAndPort>> entry : ascendingTailMap.entrySet()) { endpoints.addAll(entry.getValue()); } - for (Map.Entry<Range<Token>, List<InetAddress>> entry : descendingTailMap.entrySet()) + for (Map.Entry<Range<Token>, List<InetAddressAndPort>> entry : descendingTailMap.entrySet()) { endpoints.addAll(entry.getValue()); } @@ -207,11 +206,11 @@ public class PendingRangeMaps implements Iterable<Map.Entry<Range<Token>, List<I { StringBuilder sb = new StringBuilder(); - for (Map.Entry<Range<Token>, List<InetAddress>> entry : this) + for (Map.Entry<Range<Token>, List<InetAddressAndPort>> entry : this) { Range<Token> range = entry.getKey(); - for (InetAddress address : entry.getValue()) + for (InetAddressAndPort address : entry.getValue()) { sb.append(address).append(':').append(range); sb.append(System.getProperty("line.separator")); @@ -222,7 +221,7 @@ public class PendingRangeMaps implements Iterable<Map.Entry<Range<Token>, List<I } @Override - public Iterator<Map.Entry<Range<Token>, List<InetAddress>>> iterator() + public Iterator<Map.Entry<Range<Token>, List<InetAddressAndPort>>> iterator() { return Iterators.concat(ascendingMap.entrySet().iterator(), ascendingMapForWrapAround.entrySet().iterator()); }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java b/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java index 2908976..3a9b161 100644 --- a/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java +++ b/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java @@ -18,7 +18,6 @@ package org.apache.cassandra.locator; import java.io.InputStream; -import java.net.InetAddress; import java.net.UnknownHostException; import java.util.Arrays; import java.util.HashMap; @@ -55,7 +54,7 @@ public class PropertyFileSnitch extends AbstractNetworkTopologySnitch public static final String SNITCH_PROPERTIES_FILENAME = "cassandra-topology.properties"; private static final int DEFAULT_REFRESH_PERIOD_IN_SECONDS = 5; - private static volatile Map<InetAddress, String[]> endpointMap; + private static volatile Map<InetAddressAndPort, String[]> endpointMap; private static volatile String[] defaultDCRack; private volatile boolean gossipStarted; @@ -93,7 +92,7 @@ public class PropertyFileSnitch extends AbstractNetworkTopologySnitch * @param endpoint endpoint to process * @return a array of string with the first index being the data center and the second being the rack */ - public static String[] getEndpointInfo(InetAddress endpoint) + public static String[] getEndpointInfo(InetAddressAndPort endpoint) { String[] rawEndpointInfo = getRawEndpointInfo(endpoint); if (rawEndpointInfo == null) @@ -101,7 +100,7 @@ public class PropertyFileSnitch extends AbstractNetworkTopologySnitch return rawEndpointInfo; } - private static String[] getRawEndpointInfo(InetAddress endpoint) + private static String[] getRawEndpointInfo(InetAddressAndPort endpoint) { String[] value = endpointMap.get(endpoint); if (value == null) @@ -118,7 +117,7 @@ public class PropertyFileSnitch extends AbstractNetworkTopologySnitch * @param endpoint the endpoint to process * @return string of data center */ - public String getDatacenter(InetAddress endpoint) + public String getDatacenter(InetAddressAndPort endpoint) { String[] info = getEndpointInfo(endpoint); assert info != null : "No location defined for endpoint " + endpoint; @@ -131,7 +130,7 @@ public class PropertyFileSnitch extends AbstractNetworkTopologySnitch * @param endpoint the endpoint to process * @return string of rack */ - public String getRack(InetAddress endpoint) + public String getRack(InetAddressAndPort endpoint) { String[] info = getEndpointInfo(endpoint); assert info != null : "No location defined for endpoint " + endpoint; @@ -140,7 +139,7 @@ public class PropertyFileSnitch extends AbstractNetworkTopologySnitch public void reloadConfiguration(boolean isUpdate) throws ConfigurationException { - HashMap<InetAddress, String[]> reloadedMap = new HashMap<>(); + HashMap<InetAddressAndPort, String[]> reloadedMap = new HashMap<>(); String[] reloadedDefaultDCRack = null; Properties properties = new Properties(); @@ -168,11 +167,11 @@ public class PropertyFileSnitch extends AbstractNetworkTopologySnitch } else { - InetAddress host; + InetAddressAndPort host; String hostString = StringUtils.remove(key, '/'); try { - host = InetAddress.getByName(hostString); + host = InetAddressAndPort.getByName(hostString); } catch (UnknownHostException e) { @@ -186,7 +185,7 @@ public class PropertyFileSnitch extends AbstractNetworkTopologySnitch reloadedMap.put(host, token); } } - InetAddress broadcastAddress = FBUtilities.getBroadcastAddress(); + InetAddressAndPort broadcastAddress = FBUtilities.getBroadcastAddressAndPort(); String[] localInfo = reloadedMap.get(broadcastAddress); if (reloadedDefaultDCRack == null && localInfo == null) throw new ConfigurationException(String.format("Snitch definitions at %s do not define a location for " + @@ -194,7 +193,7 @@ public class PropertyFileSnitch extends AbstractNetworkTopologySnitch SNITCH_PROPERTIES_FILENAME, broadcastAddress)); // internode messaging code converts our broadcast address to local, // make sure we can be found at that as well. - InetAddress localAddress = FBUtilities.getLocalAddress(); + InetAddressAndPort localAddress = FBUtilities.getLocalAddressAndPort(); if (!localAddress.equals(broadcastAddress) && !reloadedMap.containsKey(localAddress)) reloadedMap.put(localAddress, localInfo); @@ -204,7 +203,7 @@ public class PropertyFileSnitch extends AbstractNetworkTopologySnitch if (logger.isTraceEnabled()) { StringBuilder sb = new StringBuilder(); - for (Map.Entry<InetAddress, String[]> entry : reloadedMap.entrySet()) + for (Map.Entry<InetAddressAndPort, String[]> entry : reloadedMap.entrySet()) sb.append(entry.getKey()).append(':').append(Arrays.toString(entry.getValue())).append(", "); logger.trace("Loaded network topology from property file: {}", StringUtils.removeEnd(sb.toString(), ", ")); } @@ -231,17 +230,17 @@ public class PropertyFileSnitch extends AbstractNetworkTopologySnitch * @param reloadedDefaultDCRack - the default dc:rack or null if no default * @return true if we can continue updating (no live host had dc or rack updated) */ - private static boolean livenessCheck(HashMap<InetAddress, String[]> reloadedMap, String[] reloadedDefaultDCRack) + private static boolean livenessCheck(HashMap<InetAddressAndPort, String[]> reloadedMap, String[] reloadedDefaultDCRack) { // If the default has changed we must check all live hosts but hopefully we will find a live // host quickly and interrupt the loop. Otherwise we only check the live hosts that were either // in the old set or in the new set - Set<InetAddress> hosts = Arrays.equals(defaultDCRack, reloadedDefaultDCRack) + Set<InetAddressAndPort> hosts = Arrays.equals(defaultDCRack, reloadedDefaultDCRack) ? Sets.intersection(StorageService.instance.getLiveRingMembers(), // same default Sets.union(endpointMap.keySet(), reloadedMap.keySet())) : StorageService.instance.getLiveRingMembers(); // default updated - for (InetAddress host : hosts) + for (InetAddressAndPort host : hosts) { String[] origValue = endpointMap.containsKey(host) ? endpointMap.get(host) : defaultDCRack; String[] updateValue = reloadedMap.containsKey(host) ? reloadedMap.get(host) : reloadedDefaultDCRack; http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/locator/RackInferringSnitch.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/RackInferringSnitch.java b/src/java/org/apache/cassandra/locator/RackInferringSnitch.java index a6ea1ab..6ae10cc 100644 --- a/src/java/org/apache/cassandra/locator/RackInferringSnitch.java +++ b/src/java/org/apache/cassandra/locator/RackInferringSnitch.java @@ -17,21 +17,19 @@ */ package org.apache.cassandra.locator; -import java.net.InetAddress; - /** * A simple endpoint snitch implementation that assumes datacenter and rack information is encoded * in the 2nd and 3rd octets of the ip address, respectively. */ public class RackInferringSnitch extends AbstractNetworkTopologySnitch { - public String getRack(InetAddress endpoint) + public String getRack(InetAddressAndPort endpoint) { - return Integer.toString(endpoint.getAddress()[2] & 0xFF, 10); + return Integer.toString(endpoint.address.getAddress()[2] & 0xFF, 10); } - public String getDatacenter(InetAddress endpoint) + public String getDatacenter(InetAddressAndPort endpoint) { - return Integer.toString(endpoint.getAddress()[1] & 0xFF, 10); + return Integer.toString(endpoint.address.getAddress()[1] & 0xFF, 10); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/locator/ReconnectableSnitchHelper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/ReconnectableSnitchHelper.java b/src/java/org/apache/cassandra/locator/ReconnectableSnitchHelper.java index 0b344c9..5479010 100644 --- a/src/java/org/apache/cassandra/locator/ReconnectableSnitchHelper.java +++ b/src/java/org/apache/cassandra/locator/ReconnectableSnitchHelper.java @@ -18,7 +18,6 @@ package org.apache.cassandra.locator; -import java.net.InetAddress; import java.net.UnknownHostException; import com.google.common.annotations.VisibleForTesting; @@ -49,11 +48,11 @@ public class ReconnectableSnitchHelper implements IEndpointStateChangeSubscriber this.preferLocal = preferLocal; } - private void reconnect(InetAddress publicAddress, VersionedValue localAddressValue) + private void reconnect(InetAddressAndPort publicAddress, VersionedValue localAddressValue) { try { - reconnect(publicAddress, InetAddress.getByName(localAddressValue.value), snitch, localDc); + reconnect(publicAddress, InetAddressAndPort.getByName(localAddressValue.value), snitch, localDc); } catch (UnknownHostException e) { @@ -62,9 +61,9 @@ public class ReconnectableSnitchHelper implements IEndpointStateChangeSubscriber } @VisibleForTesting - static void reconnect(InetAddress publicAddress, InetAddress localAddress, IEndpointSnitch snitch, String localDc) + static void reconnect(InetAddressAndPort publicAddress, InetAddressAndPort localAddress, IEndpointSnitch snitch, String localDc) { - if (!DatabaseDescriptor.getInternodeAuthenticator().authenticate(publicAddress, MessagingService.instance().portFor(publicAddress))) + if (!DatabaseDescriptor.getInternodeAuthenticator().authenticate(publicAddress.address, MessagingService.instance().portFor(publicAddress))) { logger.debug("InternodeAuthenticator said don't reconnect to {} on {}", publicAddress, localAddress); return; @@ -78,40 +77,65 @@ public class ReconnectableSnitchHelper implements IEndpointStateChangeSubscriber } } - public void beforeChange(InetAddress endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) + public void beforeChange(InetAddressAndPort endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) { // no-op } - public void onJoin(InetAddress endpoint, EndpointState epState) + public void onJoin(InetAddressAndPort endpoint, EndpointState epState) { - if (preferLocal && !Gossiper.instance.isDeadState(epState) && epState.getApplicationState(ApplicationState.INTERNAL_IP) != null) - reconnect(endpoint, epState.getApplicationState(ApplicationState.INTERNAL_IP)); + if (preferLocal && !Gossiper.instance.isDeadState(epState)) + { + VersionedValue address = epState.getApplicationState(ApplicationState.INTERNAL_ADDRESS_AND_PORT); + if (address == null) + { + address = epState.getApplicationState(ApplicationState.INTERNAL_ADDRESS_AND_PORT); + } + if (address != null) + { + reconnect(endpoint, address); + } + } } - public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value) + //Skeptical this will always do the right thing all the time port wise. It will converge on the right thing + //eventually once INTERNAL_ADDRESS_AND_PORT is populated + public void onChange(InetAddressAndPort endpoint, ApplicationState state, VersionedValue value) { - if (preferLocal && state == ApplicationState.INTERNAL_IP && !Gossiper.instance.isDeadState(Gossiper.instance.getEndpointStateForEndpoint(endpoint))) - reconnect(endpoint, value); + if (preferLocal && !Gossiper.instance.isDeadState(Gossiper.instance.getEndpointStateForEndpoint(endpoint))) + { + if (state == ApplicationState.INTERNAL_ADDRESS_AND_PORT) + { + reconnect(endpoint, value); + } + else if (state == ApplicationState.INTERNAL_IP && + null == Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.INTERNAL_ADDRESS_AND_PORT)) + { + //Only use INTERNAL_IP if INTERNAL_ADDRESS_AND_PORT is unavailable + reconnect(endpoint, value); + } + } } - public void onAlive(InetAddress endpoint, EndpointState state) + public void onAlive(InetAddressAndPort endpoint, EndpointState state) { - if (preferLocal && state.getApplicationState(ApplicationState.INTERNAL_IP) != null) - reconnect(endpoint, state.getApplicationState(ApplicationState.INTERNAL_IP)); + VersionedValue internalIP = state.getApplicationState(ApplicationState.INTERNAL_IP); + VersionedValue internalIPAndPorts = state.getApplicationState(ApplicationState.INTERNAL_ADDRESS_AND_PORT); + if (preferLocal && internalIP != null) + reconnect(endpoint, internalIPAndPorts != null ? internalIPAndPorts : internalIP); } - public void onDead(InetAddress endpoint, EndpointState state) + public void onDead(InetAddressAndPort endpoint, EndpointState state) { // do nothing. } - public void onRemove(InetAddress endpoint) + public void onRemove(InetAddressAndPort endpoint) { // do nothing. } - public void onRestart(InetAddress endpoint, EndpointState state) + public void onRestart(InetAddressAndPort endpoint, EndpointState state) { // do nothing. } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/locator/SeedProvider.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/SeedProvider.java b/src/java/org/apache/cassandra/locator/SeedProvider.java index a013fbb..7efa9e0 100644 --- a/src/java/org/apache/cassandra/locator/SeedProvider.java +++ b/src/java/org/apache/cassandra/locator/SeedProvider.java @@ -17,10 +17,9 @@ */ package org.apache.cassandra.locator; -import java.net.InetAddress; import java.util.List; public interface SeedProvider { - List<InetAddress> getSeeds(); + List<InetAddressAndPort> getSeeds(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/locator/SimpleSeedProvider.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/SimpleSeedProvider.java b/src/java/org/apache/cassandra/locator/SimpleSeedProvider.java index 665261d..47401a0 100644 --- a/src/java/org/apache/cassandra/locator/SimpleSeedProvider.java +++ b/src/java/org/apache/cassandra/locator/SimpleSeedProvider.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.locator; -import java.net.InetAddress; import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Collections; @@ -26,6 +25,7 @@ import java.util.Map; import org.apache.cassandra.config.Config; import org.apache.cassandra.config.DatabaseDescriptor; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,7 +35,7 @@ public class SimpleSeedProvider implements SeedProvider public SimpleSeedProvider(Map<String, String> args) {} - public List<InetAddress> getSeeds() + public List<InetAddressAndPort> getSeeds() { Config conf; try @@ -47,12 +47,12 @@ public class SimpleSeedProvider implements SeedProvider throw new AssertionError(e); } String[] hosts = conf.seed_provider.parameters.get("seeds").split(",", -1); - List<InetAddress> seeds = new ArrayList<InetAddress>(hosts.length); + List<InetAddressAndPort> seeds = new ArrayList<>(hosts.length); for (String host : hosts) { try { - seeds.add(InetAddress.getByName(host.trim())); + seeds.add(InetAddressAndPort.getByName(host.trim())); } catch (UnknownHostException ex) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/locator/SimpleSnitch.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/SimpleSnitch.java b/src/java/org/apache/cassandra/locator/SimpleSnitch.java index 27648c8..e31fc6b 100644 --- a/src/java/org/apache/cassandra/locator/SimpleSnitch.java +++ b/src/java/org/apache/cassandra/locator/SimpleSnitch.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.locator; -import java.net.InetAddress; import java.util.List; /** @@ -27,23 +26,23 @@ import java.util.List; */ public class SimpleSnitch extends AbstractEndpointSnitch { - public String getRack(InetAddress endpoint) + public String getRack(InetAddressAndPort endpoint) { return "rack1"; } - public String getDatacenter(InetAddress endpoint) + public String getDatacenter(InetAddressAndPort endpoint) { return "datacenter1"; } @Override - public void sortByProximity(final InetAddress address, List<InetAddress> addresses) + public void sortByProximity(final InetAddressAndPort address, List<InetAddressAndPort> addresses) { // Optimization to avoid walking the list } - public int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2) + public int compareEndpoints(InetAddressAndPort target, InetAddressAndPort a1, InetAddressAndPort a2) { // Making all endpoints equal ensures we won't change the original ordering (since // Collections.sort is guaranteed to be stable) http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/locator/SimpleStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/SimpleStrategy.java b/src/java/org/apache/cassandra/locator/SimpleStrategy.java index 9a5062b..545ad28 100644 --- a/src/java/org/apache/cassandra/locator/SimpleStrategy.java +++ b/src/java/org/apache/cassandra/locator/SimpleStrategy.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.locator; -import java.net.InetAddress; import java.util.ArrayList; import java.util.Collections; import java.util.Collection; @@ -42,11 +41,11 @@ public class SimpleStrategy extends AbstractReplicationStrategy super(keyspaceName, tokenMetadata, snitch, configOptions); } - public List<InetAddress> calculateNaturalEndpoints(Token token, TokenMetadata metadata) + public List<InetAddressAndPort> calculateNaturalEndpoints(Token token, TokenMetadata metadata) { int replicas = getReplicationFactor(); ArrayList<Token> tokens = metadata.sortedTokens(); - List<InetAddress> endpoints = new ArrayList<InetAddress>(replicas); + List<InetAddressAndPort> endpoints = new ArrayList<InetAddressAndPort>(replicas); if (tokens.isEmpty()) return endpoints; @@ -55,7 +54,7 @@ public class SimpleStrategy extends AbstractReplicationStrategy Iterator<Token> iter = TokenMetadata.ringIterator(tokens, token, false); while (endpoints.size() < replicas && iter.hasNext()) { - InetAddress ep = metadata.getEndpoint(iter.next()); + InetAddressAndPort ep = metadata.getEndpoint(iter.next()); if (!endpoints.contains(ep)) endpoints.add(ep); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/locator/TokenMetadata.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java index 00f9536..e2c4628 100644 --- a/src/java/org/apache/cassandra/locator/TokenMetadata.java +++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.locator; -import java.net.InetAddress; import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.ConcurrentHashMap; @@ -52,12 +51,12 @@ public class TokenMetadata * Each Token is associated with exactly one Address, but each Address may have * multiple tokens. Hence, the BiMultiValMap collection. */ - private final BiMultiValMap<Token, InetAddress> tokenToEndpointMap; + private final BiMultiValMap<Token, InetAddressAndPort> tokenToEndpointMap; /** Maintains endpoint to host ID map of every node in the cluster */ - private final BiMap<InetAddress, UUID> endpointToHostIdMap; + private final BiMap<InetAddressAndPort, UUID> endpointToHostIdMap; - // Prior to CASSANDRA-603, we just had <tt>Map<Range, InetAddress> pendingRanges<tt>, + // Prior to CASSANDRA-603, we just had <tt>Map<Range, InetAddressAndPort> pendingRanges<tt>, // which was added to when a node began bootstrap and removed from when it finished. // // This is inadequate when multiple changes are allowed simultaneously. For example, @@ -70,8 +69,8 @@ public class TokenMetadata // // So, we made two changes: // - // First, we changed pendingRanges to a <tt>Multimap<Range, InetAddress></tt> (now - // <tt>Map<String, Multimap<Range, InetAddress>></tt>, because replication strategy + // First, we changed pendingRanges to a <tt>Multimap<Range, InetAddressAndPort></tt> (now + // <tt>Map<String, Multimap<Range, InetAddressAndPort>></tt>, because replication strategy // and options are per-KeySpace). // // Second, we added the bootstrapTokens and leavingEndpoints collections, so we can @@ -81,17 +80,17 @@ public class TokenMetadata // Finally, note that recording the tokens of joining nodes in bootstrapTokens also // means we can detect and reject the addition of multiple nodes at the same token // before one becomes part of the ring. - private final BiMultiValMap<Token, InetAddress> bootstrapTokens = new BiMultiValMap<>(); + private final BiMultiValMap<Token, InetAddressAndPort> bootstrapTokens = new BiMultiValMap<>(); - private final BiMap<InetAddress, InetAddress> replacementToOriginal = HashBiMap.create(); + private final BiMap<InetAddressAndPort, InetAddressAndPort> replacementToOriginal = HashBiMap.create(); // (don't need to record Token here since it's still part of tokenToEndpointMap until it's done leaving) - private final Set<InetAddress> leavingEndpoints = new HashSet<>(); + private final Set<InetAddressAndPort> leavingEndpoints = new HashSet<>(); // this is a cache of the calculation from {tokenToEndpointMap, bootstrapTokens, leavingEndpoints} private final ConcurrentMap<String, PendingRangeMaps> pendingRanges = new ConcurrentHashMap<String, PendingRangeMaps>(); // nodes which are migrating to the new tokens in the ring - private final Set<Pair<Token, InetAddress>> movingEndpoints = new HashSet<>(); + private final Set<Pair<Token, InetAddressAndPort>> movingEndpoints = new HashSet<>(); /* Use this lock for manipulating the token map */ private final ReadWriteLock lock = new ReentrantReadWriteLock(true); @@ -100,26 +99,18 @@ public class TokenMetadata private final Topology topology; public final IPartitioner partitioner; - private static final Comparator<InetAddress> inetaddressCmp = new Comparator<InetAddress>() - { - public int compare(InetAddress o1, InetAddress o2) - { - return ByteBuffer.wrap(o1.getAddress()).compareTo(ByteBuffer.wrap(o2.getAddress())); - } - }; - // signals replication strategies that nodes have joined or left the ring and they need to recompute ownership private volatile long ringVersion = 0; public TokenMetadata() { - this(SortedBiMultiValMap.<Token, InetAddress>create(null, inetaddressCmp), - HashBiMap.<InetAddress, UUID>create(), + this(SortedBiMultiValMap.<Token, InetAddressAndPort>create(), + HashBiMap.create(), new Topology(), DatabaseDescriptor.getPartitioner()); } - private TokenMetadata(BiMultiValMap<Token, InetAddress> tokenToEndpointMap, BiMap<InetAddress, UUID> endpointsMap, Topology topology, IPartitioner partitioner) + private TokenMetadata(BiMultiValMap<Token, InetAddressAndPort> tokenToEndpointMap, BiMap<InetAddressAndPort, UUID> endpointsMap, Topology topology, IPartitioner partitioner) { this.tokenToEndpointMap = tokenToEndpointMap; this.topology = topology; @@ -143,7 +134,7 @@ public class TokenMetadata } /** @return the number of nodes bootstrapping into source's primary range */ - public int pendingRangeChanges(InetAddress source) + public int pendingRangeChanges(InetAddressAndPort source) { int n = 0; Collection<Range<Token>> sourceRanges = getPrimaryRangesFor(getTokens(source)); @@ -165,14 +156,14 @@ public class TokenMetadata /** * Update token map with a single token/endpoint pair in normal state. */ - public void updateNormalToken(Token token, InetAddress endpoint) + public void updateNormalToken(Token token, InetAddressAndPort endpoint) { updateNormalTokens(Collections.singleton(token), endpoint); } - public void updateNormalTokens(Collection<Token> tokens, InetAddress endpoint) + public void updateNormalTokens(Collection<Token> tokens, InetAddressAndPort endpoint) { - Multimap<InetAddress, Token> endpointTokens = HashMultimap.create(); + Multimap<InetAddressAndPort, Token> endpointTokens = HashMultimap.create(); for (Token token : tokens) endpointTokens.put(endpoint, token); updateNormalTokens(endpointTokens); @@ -184,7 +175,7 @@ public class TokenMetadata * Prefer this whenever there are multiple pairs to update, as each update (whether a single or multiple) * is expensive (CASSANDRA-3831). */ - public void updateNormalTokens(Multimap<InetAddress, Token> endpointTokens) + public void updateNormalTokens(Multimap<InetAddressAndPort, Token> endpointTokens) { if (endpointTokens.isEmpty()) return; @@ -193,7 +184,7 @@ public class TokenMetadata try { boolean shouldSortTokens = false; - for (InetAddress endpoint : endpointTokens.keySet()) + for (InetAddressAndPort endpoint : endpointTokens.keySet()) { Collection<Token> tokens = endpointTokens.get(endpoint); @@ -208,7 +199,7 @@ public class TokenMetadata for (Token token : tokens) { - InetAddress prev = tokenToEndpointMap.put(token, endpoint); + InetAddressAndPort prev = tokenToEndpointMap.put(token, endpoint); if (!endpoint.equals(prev)) { if (prev != null) @@ -231,7 +222,7 @@ public class TokenMetadata * Store an end-point to host ID mapping. Each ID must be unique, and * cannot be changed after the fact. */ - public void updateHostId(UUID hostId, InetAddress endpoint) + public void updateHostId(UUID hostId, InetAddressAndPort endpoint) { assert hostId != null; assert endpoint != null; @@ -239,7 +230,7 @@ public class TokenMetadata lock.writeLock().lock(); try { - InetAddress storedEp = endpointToHostIdMap.inverse().get(hostId); + InetAddressAndPort storedEp = endpointToHostIdMap.inverse().get(hostId); if (storedEp != null) { if (!storedEp.equals(endpoint) && (FailureDetector.instance.isAlive(storedEp))) @@ -265,7 +256,7 @@ public class TokenMetadata } /** Return the unique host ID for an end-point. */ - public UUID getHostId(InetAddress endpoint) + public UUID getHostId(InetAddressAndPort endpoint) { lock.readLock().lock(); try @@ -279,7 +270,7 @@ public class TokenMetadata } /** Return the end-point for a unique host ID */ - public InetAddress getEndpointForHostId(UUID hostId) + public InetAddressAndPort getEndpointForHostId(UUID hostId) { lock.readLock().lock(); try @@ -293,12 +284,12 @@ public class TokenMetadata } /** @return a copy of the endpoint-to-id map for read-only operations */ - public Map<InetAddress, UUID> getEndpointToHostIdMapForReading() + public Map<InetAddressAndPort, UUID> getEndpointToHostIdMapForReading() { lock.readLock().lock(); try { - Map<InetAddress, UUID> readMap = new HashMap<>(); + Map<InetAddressAndPort, UUID> readMap = new HashMap<>(); readMap.putAll(endpointToHostIdMap); return readMap; } @@ -309,17 +300,17 @@ public class TokenMetadata } @Deprecated - public void addBootstrapToken(Token token, InetAddress endpoint) + public void addBootstrapToken(Token token, InetAddressAndPort endpoint) { addBootstrapTokens(Collections.singleton(token), endpoint); } - public void addBootstrapTokens(Collection<Token> tokens, InetAddress endpoint) + public void addBootstrapTokens(Collection<Token> tokens, InetAddressAndPort endpoint) { addBootstrapTokens(tokens, endpoint, null); } - private void addBootstrapTokens(Collection<Token> tokens, InetAddress endpoint, InetAddress original) + private void addBootstrapTokens(Collection<Token> tokens, InetAddressAndPort endpoint, InetAddressAndPort original) { assert tokens != null && !tokens.isEmpty(); assert endpoint != null; @@ -328,7 +319,7 @@ public class TokenMetadata try { - InetAddress oldEndpoint; + InetAddressAndPort oldEndpoint; for (Token token : tokens) { @@ -352,7 +343,7 @@ public class TokenMetadata } } - public void addReplaceTokens(Collection<Token> replacingTokens, InetAddress newNode, InetAddress oldNode) + public void addReplaceTokens(Collection<Token> replacingTokens, InetAddressAndPort newNode, InetAddressAndPort oldNode) { assert replacingTokens != null && !replacingTokens.isEmpty(); assert newNode != null && oldNode != null; @@ -379,12 +370,12 @@ public class TokenMetadata } } - public Optional<InetAddress> getReplacementNode(InetAddress endpoint) + public Optional<InetAddressAndPort> getReplacementNode(InetAddressAndPort endpoint) { return Optional.ofNullable(replacementToOriginal.inverse().get(endpoint)); } - public Optional<InetAddress> getReplacingNode(InetAddress endpoint) + public Optional<InetAddressAndPort> getReplacingNode(InetAddressAndPort endpoint) { return Optional.ofNullable((replacementToOriginal.get(endpoint))); } @@ -405,7 +396,7 @@ public class TokenMetadata } } - public void addLeavingEndpoint(InetAddress endpoint) + public void addLeavingEndpoint(InetAddressAndPort endpoint) { assert endpoint != null; @@ -425,7 +416,7 @@ public class TokenMetadata * @param token token which is node moving to * @param endpoint address of the moving node */ - public void addMovingEndpoint(Token token, InetAddress endpoint) + public void addMovingEndpoint(Token token, InetAddressAndPort endpoint) { assert endpoint != null; @@ -441,7 +432,7 @@ public class TokenMetadata } } - public void removeEndpoint(InetAddress endpoint) + public void removeEndpoint(InetAddressAndPort endpoint) { assert endpoint != null; @@ -469,7 +460,7 @@ public class TokenMetadata /** * This is called when the snitch properties for this endpoint are updated, see CASSANDRA-10238. */ - public void updateTopology(InetAddress endpoint) + public void updateTopology(InetAddressAndPort endpoint) { assert endpoint != null; @@ -509,14 +500,14 @@ public class TokenMetadata * Remove pair of token/address from moving endpoints * @param endpoint address of the moving node */ - public void removeFromMoving(InetAddress endpoint) + public void removeFromMoving(InetAddressAndPort endpoint) { assert endpoint != null; lock.writeLock().lock(); try { - for (Pair<Token, InetAddress> pair : movingEndpoints) + for (Pair<Token, InetAddressAndPort> pair : movingEndpoints) { if (pair.right.equals(endpoint)) { @@ -533,7 +524,7 @@ public class TokenMetadata } } - public Collection<Token> getTokens(InetAddress endpoint) + public Collection<Token> getTokens(InetAddressAndPort endpoint) { assert endpoint != null; assert isMember(endpoint); // don't want to return nulls @@ -550,12 +541,12 @@ public class TokenMetadata } @Deprecated - public Token getToken(InetAddress endpoint) + public Token getToken(InetAddressAndPort endpoint) { return getTokens(endpoint).iterator().next(); } - public boolean isMember(InetAddress endpoint) + public boolean isMember(InetAddressAndPort endpoint) { assert endpoint != null; @@ -570,7 +561,7 @@ public class TokenMetadata } } - public boolean isLeaving(InetAddress endpoint) + public boolean isLeaving(InetAddressAndPort endpoint) { assert endpoint != null; @@ -585,7 +576,7 @@ public class TokenMetadata } } - public boolean isMoving(InetAddress endpoint) + public boolean isMoving(InetAddressAndPort endpoint) { assert endpoint != null; @@ -593,7 +584,7 @@ public class TokenMetadata try { - for (Pair<Token, InetAddress> pair : movingEndpoints) + for (Pair<Token, InetAddressAndPort> pair : movingEndpoints) { if (pair.right.equals(endpoint)) return true; @@ -618,7 +609,7 @@ public class TokenMetadata lock.readLock().lock(); try { - return new TokenMetadata(SortedBiMultiValMap.create(tokenToEndpointMap, null, inetaddressCmp), + return new TokenMetadata(SortedBiMultiValMap.create(tokenToEndpointMap), HashBiMap.create(endpointToHostIdMap), new Topology(topology), partitioner); @@ -673,9 +664,9 @@ public class TokenMetadata } } - private static TokenMetadata removeEndpoints(TokenMetadata allLeftMetadata, Set<InetAddress> leavingEndpoints) + private static TokenMetadata removeEndpoints(TokenMetadata allLeftMetadata, Set<InetAddressAndPort> leavingEndpoints) { - for (InetAddress endpoint : leavingEndpoints) + for (InetAddressAndPort endpoint : leavingEndpoints) allLeftMetadata.removeEndpoint(endpoint); return allLeftMetadata; @@ -695,11 +686,11 @@ public class TokenMetadata { TokenMetadata metadata = cloneOnlyTokenMap(); - for (InetAddress endpoint : leavingEndpoints) + for (InetAddressAndPort endpoint : leavingEndpoints) metadata.removeEndpoint(endpoint); - for (Pair<Token, InetAddress> pair : movingEndpoints) + for (Pair<Token, InetAddressAndPort> pair : movingEndpoints) metadata.updateNormalToken(pair.left, pair.right); return metadata; @@ -710,7 +701,7 @@ public class TokenMetadata } } - public InetAddress getEndpoint(Token token) + public InetAddressAndPort getEndpoint(Token token) { lock.readLock().lock(); try @@ -742,17 +733,17 @@ public class TokenMetadata return sortedTokens; } - public Multimap<Range<Token>, InetAddress> getPendingRangesMM(String keyspaceName) + public Multimap<Range<Token>, InetAddressAndPort> getPendingRangesMM(String keyspaceName) { - Multimap<Range<Token>, InetAddress> map = HashMultimap.create(); + Multimap<Range<Token>, InetAddressAndPort> map = HashMultimap.create(); PendingRangeMaps pendingRangeMaps = this.pendingRanges.get(keyspaceName); if (pendingRangeMaps != null) { - for (Map.Entry<Range<Token>, List<InetAddress>> entry : pendingRangeMaps) + for (Map.Entry<Range<Token>, List<InetAddressAndPort>> entry : pendingRangeMaps) { Range<Token> range = entry.getKey(); - for (InetAddress address : entry.getValue()) + for (InetAddressAndPort address : entry.getValue()) { map.put(range, address); } @@ -768,10 +759,10 @@ public class TokenMetadata return this.pendingRanges.get(keyspaceName); } - public List<Range<Token>> getPendingRanges(String keyspaceName, InetAddress endpoint) + public List<Range<Token>> getPendingRanges(String keyspaceName, InetAddressAndPort endpoint) { List<Range<Token>> ranges = new ArrayList<>(); - for (Map.Entry<Range<Token>, InetAddress> entry : getPendingRangesMM(keyspaceName).entries()) + for (Map.Entry<Range<Token>, InetAddressAndPort> entry : getPendingRangesMM(keyspaceName).entries()) { if (entry.getValue().equals(endpoint)) { @@ -824,9 +815,9 @@ public class TokenMetadata long startedAt = System.currentTimeMillis(); // create clone of current state - BiMultiValMap<Token, InetAddress> bootstrapTokensClone = new BiMultiValMap<>(); - Set<InetAddress> leavingEndpointsClone = new HashSet<>(); - Set<Pair<Token, InetAddress>> movingEndpointsClone = new HashSet<>(); + BiMultiValMap<Token, InetAddressAndPort> bootstrapTokensClone = new BiMultiValMap<>(); + Set<InetAddressAndPort> leavingEndpointsClone = new HashSet<>(); + Set<Pair<Token, InetAddressAndPort>> movingEndpointsClone = new HashSet<>(); TokenMetadata metadata; lock.readLock().lock(); @@ -859,29 +850,29 @@ public class TokenMetadata */ private static PendingRangeMaps calculatePendingRanges(AbstractReplicationStrategy strategy, TokenMetadata metadata, - BiMultiValMap<Token, InetAddress> bootstrapTokens, - Set<InetAddress> leavingEndpoints, - Set<Pair<Token, InetAddress>> movingEndpoints) + BiMultiValMap<Token, InetAddressAndPort> bootstrapTokens, + Set<InetAddressAndPort> leavingEndpoints, + Set<Pair<Token, InetAddressAndPort>> movingEndpoints) { PendingRangeMaps newPendingRanges = new PendingRangeMaps(); - Multimap<InetAddress, Range<Token>> addressRanges = strategy.getAddressRanges(metadata); + Multimap<InetAddressAndPort, Range<Token>> addressRanges = strategy.getAddressRanges(metadata); // Copy of metadata reflecting the situation after all leave operations are finished. TokenMetadata allLeftMetadata = removeEndpoints(metadata.cloneOnlyTokenMap(), leavingEndpoints); // get all ranges that will be affected by leaving nodes Set<Range<Token>> affectedRanges = new HashSet<Range<Token>>(); - for (InetAddress endpoint : leavingEndpoints) + for (InetAddressAndPort endpoint : leavingEndpoints) affectedRanges.addAll(addressRanges.get(endpoint)); // for each of those ranges, find what new nodes will be responsible for the range when // all leaving nodes are gone. for (Range<Token> range : affectedRanges) { - Set<InetAddress> currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, metadata)); - Set<InetAddress> newEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, allLeftMetadata)); - for (InetAddress address : Sets.difference(newEndpoints, currentEndpoints)) + Set<InetAddressAndPort> currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, metadata)); + Set<InetAddressAndPort> newEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, allLeftMetadata)); + for (InetAddressAndPort address : Sets.difference(newEndpoints, currentEndpoints)) { newPendingRanges.addPendingRange(range, address); } @@ -892,8 +883,8 @@ public class TokenMetadata // For each of the bootstrapping nodes, simply add and remove them one by one to // allLeftMetadata and check in between what their ranges would be. - Multimap<InetAddress, Token> bootstrapAddresses = bootstrapTokens.inverse(); - for (InetAddress endpoint : bootstrapAddresses.keySet()) + Multimap<InetAddressAndPort, Token> bootstrapAddresses = bootstrapTokens.inverse(); + for (InetAddressAndPort endpoint : bootstrapAddresses.keySet()) { Collection<Token> tokens = bootstrapAddresses.get(endpoint); @@ -910,11 +901,11 @@ public class TokenMetadata // For each of the moving nodes, we do the same thing we did for bootstrapping: // simply add and remove them one by one to allLeftMetadata and check in between what their ranges would be. - for (Pair<Token, InetAddress> moving : movingEndpoints) + for (Pair<Token, InetAddressAndPort> moving : movingEndpoints) { //Calculate all the ranges which will could be affected. This will include the ranges before and after the move. Set<Range<Token>> moveAffectedRanges = new HashSet<>(); - InetAddress endpoint = moving.right; // address of the moving node + InetAddressAndPort endpoint = moving.right; // address of the moving node //Add ranges before the move for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint)) { @@ -930,10 +921,10 @@ public class TokenMetadata for(Range<Token> range : moveAffectedRanges) { - Set<InetAddress> currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, metadata)); - Set<InetAddress> newEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, allLeftMetadata)); - Set<InetAddress> difference = Sets.difference(newEndpoints, currentEndpoints); - for(final InetAddress address : difference) + Set<InetAddressAndPort> currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, metadata)); + Set<InetAddressAndPort> newEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, allLeftMetadata)); + Set<InetAddressAndPort> difference = Sets.difference(newEndpoints, currentEndpoints); + for(final InetAddressAndPort address : difference) { Collection<Range<Token>> newRanges = strategy.getAddressRanges(allLeftMetadata).get(address); Collection<Range<Token>> oldRanges = strategy.getAddressRanges(metadata).get(address); @@ -973,12 +964,12 @@ public class TokenMetadata } /** @return a copy of the bootstrapping tokens map */ - public BiMultiValMap<Token, InetAddress> getBootstrapTokens() + public BiMultiValMap<Token, InetAddressAndPort> getBootstrapTokens() { lock.readLock().lock(); try { - return new BiMultiValMap<Token, InetAddress>(bootstrapTokens); + return new BiMultiValMap<>(bootstrapTokens); } finally { @@ -986,7 +977,7 @@ public class TokenMetadata } } - public Set<InetAddress> getAllEndpoints() + public Set<InetAddressAndPort> getAllEndpoints() { lock.readLock().lock(); try @@ -1010,7 +1001,7 @@ public class TokenMetadata } /** caller should not modify leavingEndpoints */ - public Set<InetAddress> getLeavingEndpoints() + public Set<InetAddressAndPort> getLeavingEndpoints() { lock.readLock().lock(); try @@ -1037,7 +1028,7 @@ public class TokenMetadata * Endpoints which are migrating to the new tokens * @return set of addresses of moving endpoints */ - public Set<Pair<Token, InetAddress>> getMovingEndpoints() + public Set<Pair<Token, InetAddressAndPort>> getMovingEndpoints() { lock.readLock().lock(); try @@ -1148,14 +1139,14 @@ public class TokenMetadata lock.readLock().lock(); try { - Multimap<InetAddress, Token> endpointToTokenMap = tokenToEndpointMap.inverse(); - Set<InetAddress> eps = endpointToTokenMap.keySet(); + Multimap<InetAddressAndPort, Token> endpointToTokenMap = tokenToEndpointMap.inverse(); + Set<InetAddressAndPort> eps = endpointToTokenMap.keySet(); if (!eps.isEmpty()) { sb.append("Normal Tokens:"); sb.append(System.getProperty("line.separator")); - for (InetAddress ep : eps) + for (InetAddressAndPort ep : eps) { sb.append(ep); sb.append(':'); @@ -1168,7 +1159,7 @@ public class TokenMetadata { sb.append("Bootstrapping Tokens:" ); sb.append(System.getProperty("line.separator")); - for (Map.Entry<Token, InetAddress> entry : bootstrapTokens.entrySet()) + for (Map.Entry<Token, InetAddressAndPort> entry : bootstrapTokens.entrySet()) { sb.append(entry.getValue()).append(':').append(entry.getKey()); sb.append(System.getProperty("line.separator")); @@ -1179,7 +1170,7 @@ public class TokenMetadata { sb.append("Leaving Endpoints:"); sb.append(System.getProperty("line.separator")); - for (InetAddress ep : leavingEndpoints) + for (InetAddressAndPort ep : leavingEndpoints) { sb.append(ep); sb.append(System.getProperty("line.separator")); @@ -1213,7 +1204,7 @@ public class TokenMetadata return sb.toString(); } - public Collection<InetAddress> pendingEndpointsFor(Token token, String keyspaceName) + public Collection<InetAddressAndPort> pendingEndpointsFor(Token token, String keyspaceName) { PendingRangeMaps pendingRangeMaps = this.pendingRanges.get(keyspaceName); if (pendingRangeMaps == null) @@ -1225,19 +1216,19 @@ public class TokenMetadata /** * @deprecated retained for benefit of old tests */ - public Collection<InetAddress> getWriteEndpoints(Token token, String keyspaceName, Collection<InetAddress> naturalEndpoints) + public Collection<InetAddressAndPort> getWriteEndpoints(Token token, String keyspaceName, Collection<InetAddressAndPort> naturalEndpoints) { return ImmutableList.copyOf(Iterables.concat(naturalEndpoints, pendingEndpointsFor(token, keyspaceName))); } /** @return an endpoint to token multimap representation of tokenToEndpointMap (a copy) */ - public Multimap<InetAddress, Token> getEndpointToTokenMapForReading() + public Multimap<InetAddressAndPort, Token> getEndpointToTokenMapForReading() { lock.readLock().lock(); try { - Multimap<InetAddress, Token> cloned = HashMultimap.create(); - for (Map.Entry<Token, InetAddress> entry : tokenToEndpointMap.entrySet()) + Multimap<InetAddressAndPort, Token> cloned = HashMultimap.create(); + for (Map.Entry<Token, InetAddressAndPort> entry : tokenToEndpointMap.entrySet()) cloned.put(entry.getValue(), entry.getKey()); return cloned; } @@ -1251,12 +1242,12 @@ public class TokenMetadata * @return a (stable copy, won't be modified) Token to Endpoint map for all the normal and bootstrapping nodes * in the cluster. */ - public Map<Token, InetAddress> getNormalAndBootstrappingTokenToEndpointMap() + public Map<Token, InetAddressAndPort> getNormalAndBootstrappingTokenToEndpointMap() { lock.readLock().lock(); try { - Map<Token, InetAddress> map = new HashMap<>(tokenToEndpointMap.size() + bootstrapTokens.size()); + Map<Token, InetAddressAndPort> map = new HashMap<>(tokenToEndpointMap.size() + bootstrapTokens.size()); map.putAll(tokenToEndpointMap); map.putAll(bootstrapTokens); return map; @@ -1302,11 +1293,11 @@ public class TokenMetadata public static class Topology { /** multi-map of DC to endpoints in that DC */ - private final Multimap<String, InetAddress> dcEndpoints; + private final Multimap<String, InetAddressAndPort> dcEndpoints; /** map of DC to multi-map of rack to endpoints in that rack */ - private final Map<String, Multimap<String, InetAddress>> dcRacks; + private final Map<String, Multimap<String, InetAddressAndPort>> dcRacks; /** reverse-lookup map for endpoint to current known dc/rack assignment */ - private final Map<InetAddress, Pair<String, String>> currentLocations; + private final Map<InetAddressAndPort, Pair<String, String>> currentLocations; Topology() { @@ -1337,7 +1328,7 @@ public class TokenMetadata /** * Stores current DC/rack assignment for ep */ - void addEndpoint(InetAddress ep) + void addEndpoint(InetAddressAndPort ep) { IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch(); String dc = snitch.getDatacenter(ep); @@ -1353,12 +1344,12 @@ public class TokenMetadata doAddEndpoint(ep, dc, rack); } - private void doAddEndpoint(InetAddress ep, String dc, String rack) + private void doAddEndpoint(InetAddressAndPort ep, String dc, String rack) { dcEndpoints.put(dc, ep); if (!dcRacks.containsKey(dc)) - dcRacks.put(dc, HashMultimap.<String, InetAddress>create()); + dcRacks.put(dc, HashMultimap.create()); dcRacks.get(dc).put(rack, ep); currentLocations.put(ep, Pair.create(dc, rack)); @@ -1367,7 +1358,7 @@ public class TokenMetadata /** * Removes current DC/rack assignment for ep */ - void removeEndpoint(InetAddress ep) + void removeEndpoint(InetAddressAndPort ep) { if (!currentLocations.containsKey(ep)) return; @@ -1375,13 +1366,13 @@ public class TokenMetadata doRemoveEndpoint(ep, currentLocations.remove(ep)); } - private void doRemoveEndpoint(InetAddress ep, Pair<String, String> current) + private void doRemoveEndpoint(InetAddressAndPort ep, Pair<String, String> current) { dcRacks.get(current.left).remove(current.right, ep); dcEndpoints.remove(current.left, ep); } - void updateEndpoint(InetAddress ep) + void updateEndpoint(InetAddressAndPort ep) { IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch(); if (snitch == null || !currentLocations.containsKey(ep)) @@ -1396,11 +1387,11 @@ public class TokenMetadata if (snitch == null) return; - for (InetAddress ep : currentLocations.keySet()) + for (InetAddressAndPort ep : currentLocations.keySet()) updateEndpoint(ep, snitch); } - private void updateEndpoint(InetAddress ep, IEndpointSnitch snitch) + private void updateEndpoint(InetAddressAndPort ep, IEndpointSnitch snitch) { Pair<String, String> current = currentLocations.get(ep); String dc = snitch.getDatacenter(ep); @@ -1415,7 +1406,7 @@ public class TokenMetadata /** * @return multi-map of DC to endpoints in that DC */ - public Multimap<String, InetAddress> getDatacenterEndpoints() + public Multimap<String, InetAddressAndPort> getDatacenterEndpoints() { return dcEndpoints; } @@ -1423,7 +1414,7 @@ public class TokenMetadata /** * @return map of DC to multi-map of rack to endpoints in that rack */ - public Map<String, Multimap<String, InetAddress>> getDatacenterRacks() + public Map<String, Multimap<String, InetAddressAndPort>> getDatacenterRacks() { return dcRacks; } @@ -1431,7 +1422,7 @@ public class TokenMetadata /** * @return The DC and rack of the given endpoint. */ - public Pair<String, String> getLocation(InetAddress addr) + public Pair<String, String> getLocation(InetAddressAndPort addr) { return currentLocations.get(addr); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/metrics/ConnectionMetrics.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/metrics/ConnectionMetrics.java b/src/java/org/apache/cassandra/metrics/ConnectionMetrics.java index 7815784..3655a40 100644 --- a/src/java/org/apache/cassandra/metrics/ConnectionMetrics.java +++ b/src/java/org/apache/cassandra/metrics/ConnectionMetrics.java @@ -17,14 +17,14 @@ */ package org.apache.cassandra.metrics; -import java.net.InetAddress; - import com.codahale.metrics.Gauge; import com.codahale.metrics.Meter; import org.apache.cassandra.net.async.OutboundMessagingPool; import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics; +import org.apache.cassandra.locator.InetAddressAndPort; + /** * Metrics for internode connections. */ @@ -65,10 +65,10 @@ public class ConnectionMetrics * * @param ip IP address to use for metrics label */ - public ConnectionMetrics(InetAddress ip, final OutboundMessagingPool messagingPool) + public ConnectionMetrics(InetAddressAndPort ip, final OutboundMessagingPool messagingPool) { // ipv6 addresses will contain colons, which are invalid in a JMX ObjectName - address = ip.getHostAddress().replace(':', '.'); + address = ip.toString().replace(':', '.'); factory = new DefaultNameFactory("Connection", address); http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/metrics/HintedHandoffMetrics.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/metrics/HintedHandoffMetrics.java b/src/java/org/apache/cassandra/metrics/HintedHandoffMetrics.java index 052830a..56888da 100644 --- a/src/java/org/apache/cassandra/metrics/HintedHandoffMetrics.java +++ b/src/java/org/apache/cassandra/metrics/HintedHandoffMetrics.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.metrics; -import java.net.InetAddress; import java.util.Map.Entry; import com.google.common.util.concurrent.MoreExecutors; @@ -27,6 +26,7 @@ import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.LoadingCache; import org.apache.cassandra.db.SystemKeyspace; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.utils.UUIDGen; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,28 +43,28 @@ public class HintedHandoffMetrics private static final MetricNameFactory factory = new DefaultNameFactory("HintedHandOffManager"); /** Total number of hints which are not stored, This is not a cache. */ - private final LoadingCache<InetAddress, DifferencingCounter> notStored = Caffeine.newBuilder() - .executor(MoreExecutors.directExecutor()) - .build(DifferencingCounter::new); + private final LoadingCache<InetAddressAndPort, DifferencingCounter> notStored = Caffeine.newBuilder() + .executor(MoreExecutors.directExecutor()) + .build(DifferencingCounter::new); /** Total number of hints that have been created, This is not a cache. */ - private final LoadingCache<InetAddress, Counter> createdHintCounts = Caffeine.newBuilder() - .executor(MoreExecutors.directExecutor()) - .build(address -> Metrics.counter(factory.createMetricName("Hints_created-" + address.getHostAddress().replace(':', '.')))); + private final LoadingCache<InetAddressAndPort, Counter> createdHintCounts = Caffeine.newBuilder() + .executor(MoreExecutors.directExecutor()) + .build(address -> Metrics.counter(factory.createMetricName("Hints_created-" + address.toString().replace(':', '.')))); - public void incrCreatedHints(InetAddress address) + public void incrCreatedHints(InetAddressAndPort address) { createdHintCounts.get(address).inc(); } - public void incrPastWindow(InetAddress address) + public void incrPastWindow(InetAddressAndPort address) { notStored.get(address).mark(); } public void log() { - for (Entry<InetAddress, DifferencingCounter> entry : notStored.asMap().entrySet()) + for (Entry<InetAddressAndPort, DifferencingCounter> entry : notStored.asMap().entrySet()) { long difference = entry.getValue().difference(); if (difference == 0) @@ -79,9 +79,10 @@ public class HintedHandoffMetrics private final Counter meter; private long reported = 0; - public DifferencingCounter(InetAddress address) + public DifferencingCounter(InetAddressAndPort address) { - this.meter = Metrics.counter(factory.createMetricName("Hints_not_stored-" + address.getHostAddress().replace(':', '.'))); + //This changes the name of the metric, people can update their monitoring when upgrading? + this.meter = Metrics.counter(factory.createMetricName("Hints_not_stored-" + address.toString().replace(':', '.'))); } public long difference() http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/metrics/HintsServiceMetrics.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/metrics/HintsServiceMetrics.java b/src/java/org/apache/cassandra/metrics/HintsServiceMetrics.java index d6a75f7..424f502 100644 --- a/src/java/org/apache/cassandra/metrics/HintsServiceMetrics.java +++ b/src/java/org/apache/cassandra/metrics/HintsServiceMetrics.java @@ -17,8 +17,6 @@ */ package org.apache.cassandra.metrics; -import java.net.InetAddress; - import com.google.common.util.concurrent.MoreExecutors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -27,6 +25,7 @@ import com.codahale.metrics.Histogram; import com.codahale.metrics.Meter; import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.LoadingCache; +import org.apache.cassandra.locator.InetAddressAndPort; import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics; @@ -47,11 +46,11 @@ public final class HintsServiceMetrics private static final Histogram globalDelayHistogram = Metrics.histogram(factory.createMetricName("Hint_delays"), false); /** Histograms per-endpoint of hint delivery delays, This is not a cache. */ - private static final LoadingCache<InetAddress, Histogram> delayByEndpoint = Caffeine.newBuilder() - .executor(MoreExecutors.directExecutor()) - .build(address -> Metrics.histogram(factory.createMetricName("Hint_delays-"+address.getHostAddress().replace(':', '.')), false)); + private static final LoadingCache<InetAddressAndPort, Histogram> delayByEndpoint = Caffeine.newBuilder() + .executor(MoreExecutors.directExecutor()) + .build(address -> Metrics.histogram(factory.createMetricName("Hint_delays-"+address.toString().replace(':', '.')), false)); - public static void updateDelayMetrics(InetAddress endpoint, long delay) + public static void updateDelayMetrics(InetAddressAndPort endpoint, long delay) { if (delay <= 0) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/metrics/MessagingMetrics.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/metrics/MessagingMetrics.java b/src/java/org/apache/cassandra/metrics/MessagingMetrics.java index 5f640b9..2f096f6 100644 --- a/src/java/org/apache/cassandra/metrics/MessagingMetrics.java +++ b/src/java/org/apache/cassandra/metrics/MessagingMetrics.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.metrics; -import java.net.InetAddress; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; @@ -26,6 +25,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.codahale.metrics.Timer; +import org.apache.cassandra.locator.InetAddressAndPort; import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics; @@ -47,7 +47,7 @@ public class MessagingMetrics queueWaitLatency = new ConcurrentHashMap<>(); } - public void addTimeTaken(InetAddress from, long timeTaken) + public void addTimeTaken(InetAddressAndPort from, long timeTaken) { String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(from); Timer timer = dcLatency.get(dc); http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/metrics/StreamingMetrics.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/metrics/StreamingMetrics.java b/src/java/org/apache/cassandra/metrics/StreamingMetrics.java index 72e9b23..d220ca5 100644 --- a/src/java/org/apache/cassandra/metrics/StreamingMetrics.java +++ b/src/java/org/apache/cassandra/metrics/StreamingMetrics.java @@ -17,11 +17,12 @@ */ package org.apache.cassandra.metrics; -import java.net.InetAddress; import java.util.concurrent.ConcurrentMap; import com.codahale.metrics.Counter; +import org.apache.cassandra.locator.InetAddressAndPort; + import org.cliffc.high_scale_lib.NonBlockingHashMap; import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics; @@ -33,7 +34,7 @@ public class StreamingMetrics { public static final String TYPE_NAME = "Streaming"; - private static final ConcurrentMap<InetAddress, StreamingMetrics> instances = new NonBlockingHashMap<InetAddress, StreamingMetrics>(); + private static final ConcurrentMap<InetAddressAndPort, StreamingMetrics> instances = new NonBlockingHashMap<>(); public static final Counter activeStreamsOutbound = Metrics.counter(DefaultNameFactory.createMetricName(TYPE_NAME, "ActiveOutboundStreams", null)); public static final Counter totalIncomingBytes = Metrics.counter(DefaultNameFactory.createMetricName(TYPE_NAME, "TotalIncomingBytes", null)); @@ -41,7 +42,7 @@ public class StreamingMetrics public final Counter incomingBytes; public final Counter outgoingBytes; - public static StreamingMetrics get(InetAddress ip) + public static StreamingMetrics get(InetAddressAndPort ip) { StreamingMetrics metrics = instances.get(ip); if (metrics == null) @@ -52,9 +53,9 @@ public class StreamingMetrics return metrics; } - public StreamingMetrics(final InetAddress peer) + public StreamingMetrics(final InetAddressAndPort peer) { - MetricNameFactory factory = new DefaultNameFactory("Streaming", peer.getHostAddress().replace(':', '.')); + MetricNameFactory factory = new DefaultNameFactory("Streaming", peer.toString().replace(':', '.')); incomingBytes = Metrics.counter(factory.createMetricName("IncomingBytes")); outgoingBytes= Metrics.counter(factory.createMetricName("OutgoingBytes")); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/net/BackPressureState.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/BackPressureState.java b/src/java/org/apache/cassandra/net/BackPressureState.java index 34fd0dd..886c075 100644 --- a/src/java/org/apache/cassandra/net/BackPressureState.java +++ b/src/java/org/apache/cassandra/net/BackPressureState.java @@ -17,7 +17,7 @@ */ package org.apache.cassandra.net; -import java.net.InetAddress; +import org.apache.cassandra.locator.InetAddressAndPort; /** * Interface meant to track the back-pressure state per replica host. @@ -47,5 +47,5 @@ public interface BackPressureState /** * Returns the host this state refers to. */ - InetAddress getHost(); + InetAddressAndPort getHost(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/net/BackPressureStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/BackPressureStrategy.java b/src/java/org/apache/cassandra/net/BackPressureStrategy.java index 78f748b..6b49495 100644 --- a/src/java/org/apache/cassandra/net/BackPressureStrategy.java +++ b/src/java/org/apache/cassandra/net/BackPressureStrategy.java @@ -17,10 +17,11 @@ */ package org.apache.cassandra.net; -import java.net.InetAddress; import java.util.Set; import java.util.concurrent.TimeUnit; +import org.apache.cassandra.locator.InetAddressAndPort; + /** * Back-pressure algorithm interface. * <p> @@ -39,5 +40,5 @@ public interface BackPressureStrategy<S extends BackPressureState> /** * Creates a new {@link BackPressureState} initialized as needed by the specific implementation. */ - S newState(InetAddress host); + S newState(InetAddressAndPort host); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/net/CallbackInfo.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/CallbackInfo.java b/src/java/org/apache/cassandra/net/CallbackInfo.java index ea000ae..f2ed8a1 100644 --- a/src/java/org/apache/cassandra/net/CallbackInfo.java +++ b/src/java/org/apache/cassandra/net/CallbackInfo.java @@ -17,9 +17,8 @@ */ package org.apache.cassandra.net; -import java.net.InetAddress; - import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.locator.InetAddressAndPort; /** * Encapsulates the callback information. @@ -28,7 +27,7 @@ import org.apache.cassandra.io.IVersionedSerializer; */ public class CallbackInfo { - protected final InetAddress target; + protected final InetAddressAndPort target; protected final IAsyncCallback callback; protected final IVersionedSerializer<?> serializer; private final boolean failureCallback; @@ -41,7 +40,7 @@ public class CallbackInfo * @param serializer serializer to deserialize response message * @param failureCallback True when we have a callback to handle failures */ - public CallbackInfo(InetAddress target, IAsyncCallback callback, IVersionedSerializer<?> serializer, boolean failureCallback) + public CallbackInfo(InetAddressAndPort target, IAsyncCallback callback, IVersionedSerializer<?> serializer, boolean failureCallback) { this.target = target; this.callback = callback; http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/net/CompactEndpointSerializationHelper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/CompactEndpointSerializationHelper.java b/src/java/org/apache/cassandra/net/CompactEndpointSerializationHelper.java index 83bbbf3..b58ca47 100644 --- a/src/java/org/apache/cassandra/net/CompactEndpointSerializationHelper.java +++ b/src/java/org/apache/cassandra/net/CompactEndpointSerializationHelper.java @@ -21,28 +21,108 @@ import java.io.*; import java.net.Inet4Address; import java.net.Inet6Address; import java.net.InetAddress; +import java.nio.ByteBuffer; -public class CompactEndpointSerializationHelper +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputBuffer; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.streaming.messages.StreamMessage; + +/* + * As of version 4.0 the endpoint description includes a port number as an unsigned short + */ +public class CompactEndpointSerializationHelper implements IVersionedSerializer<InetAddressAndPort> { - public static void serialize(InetAddress endpoint, DataOutput out) throws IOException + public static final IVersionedSerializer<InetAddressAndPort> instance = new CompactEndpointSerializationHelper(); + + /** + * Streaming uses its own version numbering so we need to ignore it and always use currrent version. + * There is no cross version streaming so it will always use the latest address serialization. + **/ + public static final IVersionedSerializer<InetAddressAndPort> streamingInstance = new IVersionedSerializer<InetAddressAndPort>() { - byte[] buf = endpoint.getAddress(); - out.writeByte(buf.length); - out.write(buf); + public void serialize(InetAddressAndPort inetAddressAndPort, DataOutputPlus out, int version) throws IOException + { + instance.serialize(inetAddressAndPort, out, MessagingService.current_version); + } + + public InetAddressAndPort deserialize(DataInputPlus in, int version) throws IOException + { + return instance.deserialize(in, MessagingService.current_version); + } + + public long serializedSize(InetAddressAndPort inetAddressAndPort, int version) + { + return instance.serializedSize(inetAddressAndPort, MessagingService.current_version); + } + }; + + private CompactEndpointSerializationHelper() {} + + public void serialize(InetAddressAndPort endpoint, DataOutputPlus out, int version) throws IOException + { + if (version >= MessagingService.VERSION_40) + { + byte[] buf = endpoint.addressBytes; + out.writeByte(buf.length + 2); + out.write(buf); + out.writeShort(endpoint.port); + } + else + { + byte[] buf = endpoint.addressBytes; + out.writeByte(buf.length); + out.write(buf); + } } - public static InetAddress deserialize(DataInput in) throws IOException + public InetAddressAndPort deserialize(DataInputPlus in, int version) throws IOException { - byte[] bytes = new byte[in.readByte()]; - in.readFully(bytes, 0, bytes.length); - return InetAddress.getByAddress(bytes); + int size = in.readByte() & 0xFF; + switch(size) + { + //The original pre-4.0 serialiation of just an address + case 4: + case 16: + { + byte[] bytes = new byte[size]; + in.readFully(bytes, 0, bytes.length); + return InetAddressAndPort.getByAddress(bytes); + } + //Address and one port + case 6: + case 18: + { + byte[] bytes = new byte[size - 2]; + in.readFully(bytes); + + int port = in.readShort() & 0xFFFF; + return InetAddressAndPort.getByAddressOverrideDefaults(InetAddress.getByAddress(bytes), bytes, port); + } + default: + throw new AssertionError("Unexpected size " + size); + + } } - public static int serializedSize(InetAddress from) + public long serializedSize(InetAddressAndPort from, int version) { - if (from instanceof Inet4Address) - return 1 + 4; - assert from instanceof Inet6Address; - return 1 + 16; + //4.0 includes a port number + if (version >= MessagingService.VERSION_40) + { + if (from.address instanceof Inet4Address) + return 1 + 4 + 2; + assert from.address instanceof Inet6Address; + return 1 + 16 + 2; + } + else + { + if (from.address instanceof Inet4Address) + return 1 + 4; + assert from.address instanceof Inet6Address; + return 1 + 16; + } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/net/ForwardToContainer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/ForwardToContainer.java b/src/java/org/apache/cassandra/net/ForwardToContainer.java new file mode 100644 index 0000000..ac9e725 --- /dev/null +++ b/src/java/org/apache/cassandra/net/ForwardToContainer.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.net; + +import java.util.Collection; + +import com.google.common.base.Preconditions; + +import org.apache.cassandra.locator.InetAddressAndPort; + +/** + * Contains forward to information until it can be serialized as part of a message using a version + * specific serialization + */ +public class ForwardToContainer +{ + public final Collection<InetAddressAndPort> targets; + public final int[] messageIds; + + public ForwardToContainer(Collection<InetAddressAndPort> targets, + int[] messageIds) + { + Preconditions.checkArgument(targets.size() == messageIds.length); + this.targets = targets; + this.messageIds = messageIds; + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org