http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/hadoop/ConfigHelper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java index 8c0ed1e..f01197d 100644 --- a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java +++ b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java @@ -52,11 +52,13 @@ public class ConfigHelper private static final int DEFAULT_RANGE_BATCH_SIZE = 4096; private static final String INPUT_INITIAL_ADDRESS = "cassandra.input.address"; private static final String OUTPUT_INITIAL_ADDRESS = "cassandra.output.address"; + private static final String OUTPUT_INITIAL_PORT = "cassandra.output.port"; private static final String READ_CONSISTENCY_LEVEL = "cassandra.consistencylevel.read"; private static final String WRITE_CONSISTENCY_LEVEL = "cassandra.consistencylevel.write"; private static final String OUTPUT_COMPRESSION_CLASS = "cassandra.output.compression.class"; private static final String OUTPUT_COMPRESSION_CHUNK_LENGTH = "cassandra.output.compression.length"; private static final String OUTPUT_LOCAL_DC_ONLY = "cassandra.output.local.dc.only"; + private static final String DEFAULT_CASSANDRA_NATIVE_PORT = "7000"; private static final Logger logger = LoggerFactory.getLogger(ConfigHelper.class); @@ -349,6 +351,16 @@ public class ConfigHelper return conf.get(OUTPUT_INITIAL_ADDRESS); } + public static void setOutputInitialPort(Configuration conf, Integer port) + { + conf.set(OUTPUT_INITIAL_PORT, port.toString()); + } + + public static Integer getOutputInitialPort(Configuration conf) + { + return Integer.valueOf(conf.get(OUTPUT_INITIAL_PORT, DEFAULT_CASSANDRA_NATIVE_PORT)); + } + public static void setOutputInitialAddress(Configuration conf, String address) { conf.set(OUTPUT_INITIAL_ADDRESS, address);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java index 0f44e0c..204d9ee 100644 --- a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java +++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java @@ -21,11 +21,13 @@ import java.io.Closeable; import java.io.File; import java.io.IOException; import java.net.InetAddress; +import java.net.InetSocketAddress; import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.*; +import com.google.common.net.HostAndPort; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,6 +41,7 @@ import org.apache.cassandra.hadoop.HadoopCompat; import org.apache.cassandra.io.sstable.CQLSSTableWriter; import org.apache.cassandra.io.sstable.SSTableLoader; import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.schema.TableMetadataRef; import org.apache.cassandra.streaming.StreamState; import org.apache.cassandra.utils.NativeSSTableLoaderClient; @@ -80,7 +83,7 @@ public class CqlBulkRecordWriter extends RecordWriter<Object, List<ByteBuffer>> protected SSTableLoader loader; protected Progressable progress; protected TaskAttemptContext context; - protected final Set<InetAddress> ignores = new HashSet<>(); + protected final Set<InetAddressAndPort> ignores = new HashSet<>(); private String keyspace; private String table; @@ -139,7 +142,7 @@ public class CqlBulkRecordWriter extends RecordWriter<Object, List<ByteBuffer>> try { for (String hostToIgnore : CqlBulkOutputFormat.getIgnoreHosts(conf)) - ignores.add(InetAddress.getByName(hostToIgnore)); + 
ignores.add(InetAddressAndPort.getByName(hostToIgnore)); } catch (UnknownHostException e) { @@ -285,20 +288,23 @@ public class CqlBulkRecordWriter extends RecordWriter<Object, List<ByteBuffer>> { super(resolveHostAddresses(conf), CqlConfigHelper.getOutputNativePort(conf), + ConfigHelper.getOutputInitialPort(conf), ConfigHelper.getOutputKeyspaceUserName(conf), ConfigHelper.getOutputKeyspacePassword(conf), - CqlConfigHelper.getSSLOptions(conf).orNull()); + CqlConfigHelper.getSSLOptions(conf).orNull(), + CqlConfigHelper.getAllowServerPortDiscovery(conf)); } - private static Collection<InetAddress> resolveHostAddresses(Configuration conf) + private static Collection<InetSocketAddress> resolveHostAddresses(Configuration conf) { - Set<InetAddress> addresses = new HashSet<>(); - + Set<InetSocketAddress> addresses = new HashSet<>(); + int port = CqlConfigHelper.getOutputNativePort(conf); for (String host : ConfigHelper.getOutputInitialAddress(conf).split(",")) { try { - addresses.add(InetAddress.getByName(host)); + HostAndPort hap = HostAndPort.fromString(host); + addresses.add(new InetSocketAddress(InetAddress.getByName(hap.getHost()), hap.getPortOrDefault(port))); } catch (UnknownHostException e) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java index f9a6f3a..3a47a72 100644 --- a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java +++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java @@ -88,6 +88,7 @@ public class CqlConfigHelper private static final String OUTPUT_CQL = "cassandra.output.cql"; private static final String OUTPUT_NATIVE_PORT = "cassandra.output.native.port"; + private static final String ALLOW_SERVER_PORT_DISCOVERY = 
"cassandra.allowserverportdiscovery"; /** * Set the CQL columns for the input of this job. @@ -651,4 +652,15 @@ public class CqlConfigHelper new SecureRandom()); return ctx; } + + public static void setAllowServerPortDiscovery(Configuration conf, boolean allowServerPortDiscovery) + { + conf.set(ALLOW_SERVER_PORT_DISCOVERY, Boolean.toString(allowServerPortDiscovery)); + } + + public static boolean getAllowServerPortDiscovery(Configuration conf) + { + return Boolean.parseBoolean(conf.get(ALLOW_SERVER_PORT_DISCOVERY, "false")); + } + } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/hints/HintVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hints/HintVerbHandler.java b/src/java/org/apache/cassandra/hints/HintVerbHandler.java index 2b92a42..cec6f0b 100644 --- a/src/java/org/apache/cassandra/hints/HintVerbHandler.java +++ b/src/java/org/apache/cassandra/hints/HintVerbHandler.java @@ -18,13 +18,13 @@ */ package org.apache.cassandra.hints; -import java.net.InetAddress; import java.util.UUID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.IVerbHandler; import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.net.MessagingService; @@ -47,7 +47,7 @@ public final class HintVerbHandler implements IVerbHandler<HintMessage> { UUID hostId = message.payload.hostId; Hint hint = message.payload.hint; - InetAddress address = StorageService.instance.getEndpointForHostId(hostId); + InetAddressAndPort address = StorageService.instance.getEndpointForHostId(hostId); // If we see an unknown table id, it means the table, or one of the tables in the mutation, had been dropped. // In that case there is nothing we can really do, or should do, other than log it go on. 
@@ -96,7 +96,7 @@ public final class HintVerbHandler implements IVerbHandler<HintMessage> } } - private static void reply(int id, InetAddress to) + private static void reply(int id, InetAddressAndPort to) { MessagingService.instance().sendReply(HintResponse.message, id, to); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java b/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java index 58a3e6f..cbbb212 100644 --- a/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java +++ b/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java @@ -18,7 +18,6 @@ package org.apache.cassandra.hints; import java.io.File; -import java.net.InetAddress; import java.util.Map; import java.util.UUID; import java.util.concurrent.*; @@ -36,6 +35,7 @@ import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor; import org.apache.cassandra.concurrent.NamedThreadFactory; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.io.FSReadError; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.service.StorageService; /** @@ -50,10 +50,10 @@ final class HintsDispatchExecutor private final File hintsDirectory; private final ExecutorService executor; private final AtomicBoolean isPaused; - private final Predicate<InetAddress> isAlive; + private final Predicate<InetAddressAndPort> isAlive; private final Map<UUID, Future> scheduledDispatches; - HintsDispatchExecutor(File hintsDirectory, int maxThreads, AtomicBoolean isPaused, Predicate<InetAddress> isAlive) + HintsDispatchExecutor(File hintsDirectory, int maxThreads, AtomicBoolean isPaused, Predicate<InetAddressAndPort> isAlive) { this.hintsDirectory = hintsDirectory; this.isPaused = isPaused; @@ -154,7 +154,7 @@ final class 
HintsDispatchExecutor public void run() { UUID hostId = hostIdSupplier.get(); - InetAddress address = StorageService.instance.getEndpointForHostId(hostId); + InetAddressAndPort address = StorageService.instance.getEndpointForHostId(hostId); logger.info("Transferring all hints to {}: {}", address, hostId); if (transfer(hostId)) return; @@ -257,7 +257,7 @@ final class HintsDispatchExecutor { logger.trace("Dispatching hints file {}", descriptor.fileName()); - InetAddress address = StorageService.instance.getEndpointForHostId(hostId); + InetAddressAndPort address = StorageService.instance.getEndpointForHostId(hostId); if (address != null) return deliver(descriptor, address); @@ -266,7 +266,7 @@ final class HintsDispatchExecutor return true; } - private boolean deliver(HintsDescriptor descriptor, InetAddress address) + private boolean deliver(HintsDescriptor descriptor, InetAddressAndPort address) { File file = new File(hintsDirectory, descriptor.fileName()); InputPosition offset = store.getDispatchOffset(descriptor); http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/hints/HintsDispatchTrigger.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hints/HintsDispatchTrigger.java b/src/java/org/apache/cassandra/hints/HintsDispatchTrigger.java index 34d1eb2..ca38c0c 100644 --- a/src/java/org/apache/cassandra/hints/HintsDispatchTrigger.java +++ b/src/java/org/apache/cassandra/hints/HintsDispatchTrigger.java @@ -23,7 +23,7 @@ import org.apache.cassandra.gms.ApplicationState; import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.schema.Schema; -import static org.apache.cassandra.utils.FBUtilities.getBroadcastAddress; +import static org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort; /** * A simple dispatch trigger that's being run every 10 seconds. 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/hints/HintsDispatcher.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hints/HintsDispatcher.java b/src/java/org/apache/cassandra/hints/HintsDispatcher.java index 323eeb1..d0d9aac 100644 --- a/src/java/org/apache/cassandra/hints/HintsDispatcher.java +++ b/src/java/org/apache/cassandra/hints/HintsDispatcher.java @@ -18,7 +18,6 @@ package org.apache.cassandra.hints; import java.io.File; -import java.net.InetAddress; import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.TimeUnit; @@ -31,6 +30,7 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.db.monitoring.ApproximateTime; import org.apache.cassandra.exceptions.RequestFailureReason; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.metrics.HintsServiceMetrics; import org.apache.cassandra.net.IAsyncCallbackWithFailure; import org.apache.cassandra.net.MessageIn; @@ -51,13 +51,13 @@ final class HintsDispatcher implements AutoCloseable private final HintsReader reader; private final UUID hostId; - private final InetAddress address; + private final InetAddressAndPort address; private final int messagingVersion; private final BooleanSupplier abortRequested; private InputPosition currentPagePosition; - private HintsDispatcher(HintsReader reader, UUID hostId, InetAddress address, int messagingVersion, BooleanSupplier abortRequested) + private HintsDispatcher(HintsReader reader, UUID hostId, InetAddressAndPort address, int messagingVersion, BooleanSupplier abortRequested) { currentPagePosition = null; @@ -68,7 +68,7 @@ final class HintsDispatcher implements AutoCloseable this.abortRequested = abortRequested; } - static HintsDispatcher create(File file, RateLimiter rateLimiter, InetAddress address, UUID hostId, BooleanSupplier abortRequested) + static HintsDispatcher create(File file, RateLimiter 
rateLimiter, InetAddressAndPort address, UUID hostId, BooleanSupplier abortRequested) { int messagingVersion = MessagingService.instance().getVersion(address); return new HintsDispatcher(HintsReader.open(file, rateLimiter), hostId, address, messagingVersion, abortRequested); @@ -228,7 +228,7 @@ final class HintsDispatcher implements AutoCloseable return timedOut ? Outcome.TIMEOUT : outcome; } - public void onFailure(InetAddress from, RequestFailureReason failureReason) + public void onFailure(InetAddressAndPort from, RequestFailureReason failureReason) { outcome = Outcome.FAILURE; condition.signalAll(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/hints/HintsService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hints/HintsService.java b/src/java/org/apache/cassandra/hints/HintsService.java index 3d82c02..5c331d0 100644 --- a/src/java/org/apache/cassandra/hints/HintsService.java +++ b/src/java/org/apache/cassandra/hints/HintsService.java @@ -19,7 +19,6 @@ package org.apache.cassandra.hints; import java.io.File; import java.lang.management.ManagementFactory; -import java.net.InetAddress; import java.net.UnknownHostException; import java.util.Collections; import java.util.UUID; @@ -40,6 +39,7 @@ import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.ParameterizedClass; import org.apache.cassandra.gms.FailureDetector; import org.apache.cassandra.gms.IFailureDetector; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.metrics.HintedHandoffMetrics; import org.apache.cassandra.metrics.StorageMetrics; import org.apache.cassandra.dht.Token; @@ -267,10 +267,10 @@ public final class HintsService implements HintsServiceMBean */ public void deleteAllHintsForEndpoint(String address) { - InetAddress target; + InetAddressAndPort target; try { - target = InetAddress.getByName(address); + target 
= InetAddressAndPort.getByName(address); } catch (UnknownHostException e) { @@ -284,7 +284,7 @@ public final class HintsService implements HintsServiceMBean * * @param target inet address of the target node */ - public void deleteAllHintsForEndpoint(InetAddress target) + public void deleteAllHintsForEndpoint(InetAddressAndPort target) { UUID hostId = StorageService.instance.getHostIdForEndpoint(target); if (hostId == null) http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/hints/HintsStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hints/HintsStore.java b/src/java/org/apache/cassandra/hints/HintsStore.java index 3572172..bbf57f5 100644 --- a/src/java/org/apache/cassandra/hints/HintsStore.java +++ b/src/java/org/apache/cassandra/hints/HintsStore.java @@ -19,7 +19,6 @@ package org.apache.cassandra.hints; import java.io.File; import java.io.IOException; -import java.net.InetAddress; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedDeque; @@ -31,6 +30,7 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.gms.FailureDetector; import org.apache.cassandra.io.FSWriteError; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.SyncUtil; @@ -77,14 +77,14 @@ final class HintsStore return new HintsStore(hostId, hintsDirectory, writerParams, descriptors); } - InetAddress address() + InetAddressAndPort address() { return StorageService.instance.getEndpointForHostId(hostId); } boolean isLive() { - InetAddress address = address(); + InetAddressAndPort address = address(); return address != null && FailureDetector.instance.isAlive(address); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/io/DummyByteVersionedSerializer.java 
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/DummyByteVersionedSerializer.java b/src/java/org/apache/cassandra/io/DummyByteVersionedSerializer.java
new file mode 100644
index 0000000..d82ff7d
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/DummyByteVersionedSerializer.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io;
+
+import java.io.IOException;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.net.MessagingService;
+
+/**
+ * Serializes a dummy byte that can't be set. Always writes a single 0 byte; deserializing a correctly
+ * formed message consumes that 0 byte and returns {@link MessagingService#ONE_BYTE}.
+ */
+public class DummyByteVersionedSerializer implements IVersionedSerializer<byte[]>
+{
+    public static final DummyByteVersionedSerializer instance = new DummyByteVersionedSerializer();
+
+    private DummyByteVersionedSerializer() {}
+
+    /**
+     * Writes the single dummy byte (always 0).
+     *
+     * @param bytes must be the {@link MessagingService#ONE_BYTE} instance itself (compared by reference)
+     * @throws IllegalArgumentException if any other array is passed
+     */
+    public void serialize(byte[] bytes, DataOutputPlus out, int version) throws IOException
+    {
+        Preconditions.checkArgument(bytes == MessagingService.ONE_BYTE);
+        out.write(0);
+    }
+
+    /**
+     * Consumes the single dummy byte and returns {@link MessagingService#ONE_BYTE}.
+     */
+    public byte[] deserialize(DataInputPlus in, int version) throws IOException
+    {
+        // Read outside the assert: when assertions are disabled the asserted expression is never evaluated,
+        // which would leave the byte unread and misalign whatever is read from the stream next.
+        byte dummy = in.readByte();
+        assert 0 == dummy;
+        return MessagingService.ONE_BYTE;
+    }
+
+    /**
+     * @return 1: the payload is the single dummy byte
+     */
+    public long serializedSize(byte[] bytes, int version)
+    {
+        //Payload
+        return 1;
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/io/ShortVersionedSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/ShortVersionedSerializer.java b/src/java/org/apache/cassandra/io/ShortVersionedSerializer.java
new file mode 100644
index 0000000..8731f4c
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/ShortVersionedSerializer.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io;
+
+import java.io.IOException;
+
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+
+/**
+ * Serializer for a single {@link Short}; the serialized size is reported as a fixed two bytes.
+ */
+public class ShortVersionedSerializer implements IVersionedSerializer<Short>
+{
+
+    public static final ShortVersionedSerializer instance = new ShortVersionedSerializer();
+
+    private ShortVersionedSerializer() {}
+
+    public void serialize(Short aShort, DataOutputPlus out, int version) throws IOException
+    {
+        out.writeShort(aShort);
+    }
+
+    public Short deserialize(DataInputPlus in, int version) throws IOException
+    {
+        return in.readShort();
+    }
+
+    public long serializedSize(Short aShort, int version)
+    {
+        // fixed width: a short occupies two bytes
+        return 2;
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java index 9fb3059..7d77ad5 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java @@ -19,12 +19,12 @@ package org.apache.cassandra.io.sstable; import java.io.File; import java.io.IOException; -import java.net.InetAddress; import java.util.*; import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.io.FSError; import org.apache.cassandra.schema.TableMetadataRef; import org.apache.cassandra.db.Directories; @@ -32,7 +32,6 @@ import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.io.sstable.format.SSTableReader; -import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.streaming.*; import
org.apache.cassandra.utils.OutputHandler; import org.apache.cassandra.utils.Pair; @@ -50,10 +49,10 @@ public class SSTableLoader implements StreamEventHandler private final Client client; private final int connectionsPerHost; private final OutputHandler outputHandler; - private final Set<InetAddress> failedHosts = new HashSet<>(); + private final Set<InetAddressAndPort> failedHosts = new HashSet<>(); private final List<SSTableReader> sstables = new ArrayList<>(); - private final Multimap<InetAddress, StreamSession.SSTableStreamingSections> streamingDetails = HashMultimap.create(); + private final Multimap<InetAddressAndPort, StreamSession.SSTableStreamingSections> streamingDetails = HashMultimap.create(); public SSTableLoader(File directory, Client client, OutputHandler outputHandler) { @@ -70,7 +69,7 @@ public class SSTableLoader implements StreamEventHandler } @SuppressWarnings("resource") - protected Collection<SSTableReader> openSSTables(final Map<InetAddress, Collection<Range<Token>>> ranges) + protected Collection<SSTableReader> openSSTables(final Map<InetAddressAndPort, Collection<Range<Token>>> ranges) { outputHandler.output("Opening sstables and calculating sections to stream"); @@ -124,9 +123,9 @@ public class SSTableLoader implements StreamEventHandler // calculate the sstable sections to stream as well as the estimated number of // keys per host - for (Map.Entry<InetAddress, Collection<Range<Token>>> entry : ranges.entrySet()) + for (Map.Entry<InetAddressAndPort, Collection<Range<Token>>> entry : ranges.entrySet()) { - InetAddress endpoint = entry.getKey(); + InetAddressAndPort endpoint = entry.getKey(); Collection<Range<Token>> tokenRanges = entry.getValue(); List<Pair<Long, Long>> sstableSections = sstable.getPositionsForRanges(tokenRanges); @@ -153,17 +152,17 @@ public class SSTableLoader implements StreamEventHandler public StreamResultFuture stream() { - return stream(Collections.<InetAddress>emptySet()); + return 
stream(Collections.<InetAddressAndPort>emptySet()); } - public StreamResultFuture stream(Set<InetAddress> toIgnore, StreamEventHandler... listeners) + public StreamResultFuture stream(Set<InetAddressAndPort> toIgnore, StreamEventHandler... listeners) { client.init(keyspace); outputHandler.output("Established connection to initial hosts"); StreamPlan plan = new StreamPlan(StreamOperation.BULK_LOAD, connectionsPerHost, false, false, null, PreviewKind.NONE).connectionFactory(client.getConnectionFactory()); - Map<InetAddress, Collection<Range<Token>>> endpointToRanges = client.getEndpointToRangesMap(); + Map<InetAddressAndPort, Collection<Range<Token>>> endpointToRanges = client.getEndpointToRangesMap(); openSSTables(endpointToRanges); if (sstables.isEmpty()) { @@ -173,9 +172,9 @@ public class SSTableLoader implements StreamEventHandler outputHandler.output(String.format("Streaming relevant part of %s to %s", names(sstables), endpointToRanges.keySet())); - for (Map.Entry<InetAddress, Collection<Range<Token>>> entry : endpointToRanges.entrySet()) + for (Map.Entry<InetAddressAndPort, Collection<Range<Token>>> entry : endpointToRanges.entrySet()) { - InetAddress remote = entry.getKey(); + InetAddressAndPort remote = entry.getKey(); if (toIgnore.contains(remote)) continue; @@ -232,14 +231,14 @@ public class SSTableLoader implements StreamEventHandler return builder.toString(); } - public Set<InetAddress> getFailedHosts() + public Set<InetAddressAndPort> getFailedHosts() { return failedHosts; } public static abstract class Client { - private final Map<InetAddress, Collection<Range<Token>>> endpointToRanges = new HashMap<>(); + private final Map<InetAddressAndPort, Collection<Range<Token>>> endpointToRanges = new HashMap<>(); /** * Initialize the client. 
@@ -281,12 +280,12 @@ public class SSTableLoader implements StreamEventHandler throw new RuntimeException(); } - public Map<InetAddress, Collection<Range<Token>>> getEndpointToRangesMap() + public Map<InetAddressAndPort, Collection<Range<Token>>> getEndpointToRangesMap() { return endpointToRanges; } - protected void addRangeForEndpoint(Range<Token> range, InetAddress endpoint) + protected void addRangeForEndpoint(Range<Token> range, InetAddressAndPort endpoint) { Collection<Range<Token>> ranges = endpointToRanges.get(endpoint); if (ranges == null) http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java b/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java index 546d15e..2ee8eea 100644 --- a/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java +++ b/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java @@ -17,14 +17,13 @@ */ package org.apache.cassandra.locator; -import java.net.InetAddress; import java.util.*; import org.apache.cassandra.config.DatabaseDescriptor; public abstract class AbstractEndpointSnitch implements IEndpointSnitch { - public abstract int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2); + public abstract int compareEndpoints(InetAddressAndPort target, InetAddressAndPort a1, InetAddressAndPort a2); /** * Sorts the <tt>Collection</tt> of node addresses by proximity to the given address @@ -32,9 +31,9 @@ public abstract class AbstractEndpointSnitch implements IEndpointSnitch * @param unsortedAddress the nodes to sort * @return a new sorted <tt>List</tt> */ - public List<InetAddress> getSortedListByProximity(InetAddress address, Collection<InetAddress> unsortedAddress) + public List<InetAddressAndPort> getSortedListByProximity(InetAddressAndPort address, 
Collection<InetAddressAndPort> unsortedAddress) { - List<InetAddress> preferred = new ArrayList<InetAddress>(unsortedAddress); + List<InetAddressAndPort> preferred = new ArrayList<>(unsortedAddress); sortByProximity(address, preferred); return preferred; } @@ -44,11 +43,11 @@ public abstract class AbstractEndpointSnitch implements IEndpointSnitch * @param address the address to sort the proximity by * @param addresses the nodes to sort */ - public void sortByProximity(final InetAddress address, List<InetAddress> addresses) + public void sortByProximity(final InetAddressAndPort address, List<InetAddressAndPort> addresses) { - Collections.sort(addresses, new Comparator<InetAddress>() + Collections.sort(addresses, new Comparator<InetAddressAndPort>() { - public int compare(InetAddress a1, InetAddress a2) + public int compare(InetAddressAndPort a1, InetAddressAndPort a2) { return compareEndpoints(address, a1, a2); } @@ -60,7 +59,7 @@ public abstract class AbstractEndpointSnitch implements IEndpointSnitch // noop by default } - public boolean isWorthMergingForRangeQuery(List<InetAddress> merged, List<InetAddress> l1, List<InetAddress> l2) + public boolean isWorthMergingForRangeQuery(List<InetAddressAndPort> merged, List<InetAddressAndPort> l1, List<InetAddressAndPort> l2) { // Querying remote DC is likely to be an order of magnitude slower than // querying locally, so 2 queries to local nodes is likely to still be @@ -71,10 +70,10 @@ public abstract class AbstractEndpointSnitch implements IEndpointSnitch : true; } - private boolean hasRemoteNode(List<InetAddress> l) + private boolean hasRemoteNode(List<InetAddressAndPort> l) { String localDc = DatabaseDescriptor.getLocalDataCenter(); - for (InetAddress ep : l) + for (InetAddressAndPort ep : l) { if (!localDc.equals(getDatacenter(ep))) return true; http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/locator/AbstractNetworkTopologySnitch.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/AbstractNetworkTopologySnitch.java b/src/java/org/apache/cassandra/locator/AbstractNetworkTopologySnitch.java index b5606d6..e91f6ac 100644 --- a/src/java/org/apache/cassandra/locator/AbstractNetworkTopologySnitch.java +++ b/src/java/org/apache/cassandra/locator/AbstractNetworkTopologySnitch.java @@ -17,8 +17,6 @@ */ package org.apache.cassandra.locator; -import java.net.InetAddress; - /** * An endpoint snitch tells Cassandra information about network topology that it can use to route * requests more efficiently. @@ -30,16 +28,16 @@ public abstract class AbstractNetworkTopologySnitch extends AbstractEndpointSnit * @param endpoint a specified endpoint * @return string of rack */ - abstract public String getRack(InetAddress endpoint); + abstract public String getRack(InetAddressAndPort endpoint); /** * Return the data center for which an endpoint resides in * @param endpoint a specified endpoint * @return string of data center */ - abstract public String getDatacenter(InetAddress endpoint); + abstract public String getDatacenter(InetAddressAndPort endpoint); - public int compareEndpoints(InetAddress address, InetAddress a1, InetAddress a2) + public int compareEndpoints(InetAddressAndPort address, InetAddressAndPort a1, InetAddressAndPort a2) { if (address.equals(a1) && !address.equals(a2)) return -1; http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java index c3498d9..3e9b5bb 100644 --- a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java +++ b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java @@ -19,7 
+19,6 @@ package org.apache.cassandra.locator; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; -import java.net.InetAddress; import java.util.*; import com.google.common.annotations.VisibleForTesting; @@ -74,9 +73,9 @@ public abstract class AbstractReplicationStrategy // lazy-initialize keyspace itself since we don't create them until after the replication strategies } - private final Map<Token, ArrayList<InetAddress>> cachedEndpoints = new NonBlockingHashMap<Token, ArrayList<InetAddress>>(); + private final Map<Token, ArrayList<InetAddressAndPort>> cachedEndpoints = new NonBlockingHashMap<Token, ArrayList<InetAddressAndPort>>(); - public ArrayList<InetAddress> getCachedEndpoints(Token t) + public ArrayList<InetAddressAndPort> getCachedEndpoints(Token t) { long lastVersion = tokenMetadata.getRingVersion(); @@ -103,21 +102,21 @@ public abstract class AbstractReplicationStrategy * @param searchPosition the position the natural endpoints are requested for * @return a copy of the natural endpoints for the given token */ - public ArrayList<InetAddress> getNaturalEndpoints(RingPosition searchPosition) + public ArrayList<InetAddressAndPort> getNaturalEndpoints(RingPosition searchPosition) { Token searchToken = searchPosition.getToken(); Token keyToken = TokenMetadata.firstToken(tokenMetadata.sortedTokens(), searchToken); - ArrayList<InetAddress> endpoints = getCachedEndpoints(keyToken); + ArrayList<InetAddressAndPort> endpoints = getCachedEndpoints(keyToken); if (endpoints == null) { TokenMetadata tm = tokenMetadata.cachedOnlyTokenMap(); // if our cache got invalidated, it's possible there is a new token to account for too keyToken = TokenMetadata.firstToken(tm.sortedTokens(), searchToken); - endpoints = new ArrayList<InetAddress>(calculateNaturalEndpoints(searchToken, tm)); + endpoints = new ArrayList<InetAddressAndPort>(calculateNaturalEndpoints(searchToken, tm)); cachedEndpoints.put(keyToken, endpoints); } - return new 
ArrayList<InetAddress>(endpoints); + return new ArrayList<InetAddressAndPort>(endpoints); } /** @@ -128,10 +127,10 @@ public abstract class AbstractReplicationStrategy * @param searchToken the token the natural endpoints are requested for * @return a copy of the natural endpoints for the given token */ - public abstract List<InetAddress> calculateNaturalEndpoints(Token searchToken, TokenMetadata tokenMetadata); + public abstract List<InetAddressAndPort> calculateNaturalEndpoints(Token searchToken, TokenMetadata tokenMetadata); - public <T> AbstractWriteResponseHandler<T> getWriteResponseHandler(Collection<InetAddress> naturalEndpoints, - Collection<InetAddress> pendingEndpoints, + public <T> AbstractWriteResponseHandler<T> getWriteResponseHandler(Collection<InetAddressAndPort> naturalEndpoints, + Collection<InetAddressAndPort> pendingEndpoints, ConsistencyLevel consistency_level, Runnable callback, WriteType writeType, @@ -140,8 +139,8 @@ public abstract class AbstractReplicationStrategy return getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, callback, writeType, queryStartNanoTime, DatabaseDescriptor.getIdealConsistencyLevel()); } - public <T> AbstractWriteResponseHandler<T> getWriteResponseHandler(Collection<InetAddress> naturalEndpoints, - Collection<InetAddress> pendingEndpoints, + public <T> AbstractWriteResponseHandler<T> getWriteResponseHandler(Collection<InetAddressAndPort> naturalEndpoints, + Collection<InetAddressAndPort> pendingEndpoints, ConsistencyLevel consistency_level, Runnable callback, WriteType writeType, @@ -211,14 +210,14 @@ public abstract class AbstractReplicationStrategy * (fixing this would probably require merging tokenmetadata into replicationstrategy, * so we could cache/invalidate cleanly.) 
*/ - public Multimap<InetAddress, Range<Token>> getAddressRanges(TokenMetadata metadata) + public Multimap<InetAddressAndPort, Range<Token>> getAddressRanges(TokenMetadata metadata) { - Multimap<InetAddress, Range<Token>> map = HashMultimap.create(); + Multimap<InetAddressAndPort, Range<Token>> map = HashMultimap.create(); for (Token token : metadata.sortedTokens()) { Range<Token> range = metadata.getPrimaryRangeFor(token); - for (InetAddress ep : calculateNaturalEndpoints(token, metadata)) + for (InetAddressAndPort ep : calculateNaturalEndpoints(token, metadata)) { map.put(ep, range); } @@ -227,14 +226,14 @@ public abstract class AbstractReplicationStrategy return map; } - public Multimap<Range<Token>, InetAddress> getRangeAddresses(TokenMetadata metadata) + public Multimap<Range<Token>, InetAddressAndPort> getRangeAddresses(TokenMetadata metadata) { - Multimap<Range<Token>, InetAddress> map = HashMultimap.create(); + Multimap<Range<Token>, InetAddressAndPort> map = HashMultimap.create(); for (Token token : metadata.sortedTokens()) { Range<Token> range = metadata.getPrimaryRangeFor(token); - for (InetAddress ep : calculateNaturalEndpoints(token, metadata)) + for (InetAddressAndPort ep : calculateNaturalEndpoints(token, metadata)) { map.put(range, ep); } @@ -243,17 +242,17 @@ public abstract class AbstractReplicationStrategy return map; } - public Multimap<InetAddress, Range<Token>> getAddressRanges() + public Multimap<InetAddressAndPort, Range<Token>> getAddressRanges() { return getAddressRanges(tokenMetadata.cloneOnlyTokenMap()); } - public Collection<Range<Token>> getPendingAddressRanges(TokenMetadata metadata, Token pendingToken, InetAddress pendingAddress) + public Collection<Range<Token>> getPendingAddressRanges(TokenMetadata metadata, Token pendingToken, InetAddressAndPort pendingAddress) { return getPendingAddressRanges(metadata, Arrays.asList(pendingToken), pendingAddress); } - public Collection<Range<Token>> getPendingAddressRanges(TokenMetadata metadata, 
Collection<Token> pendingTokens, InetAddress pendingAddress) + public Collection<Range<Token>> getPendingAddressRanges(TokenMetadata metadata, Collection<Token> pendingTokens, InetAddressAndPort pendingAddress) { TokenMetadata temp = metadata.cloneOnlyTokenMap(); temp.updateNormalTokens(pendingTokens, pendingAddress); http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/locator/CloudstackSnitch.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/CloudstackSnitch.java b/src/java/org/apache/cassandra/locator/CloudstackSnitch.java index ec2e87e..be6d3c4 100644 --- a/src/java/org/apache/cassandra/locator/CloudstackSnitch.java +++ b/src/java/org/apache/cassandra/locator/CloudstackSnitch.java @@ -24,7 +24,6 @@ import java.io.FileReader; import java.io.IOException; import java.io.File; import java.net.HttpURLConnection; -import java.net.InetAddress; import java.net.URL; import java.net.URI; import java.nio.charset.StandardCharsets; @@ -56,7 +55,7 @@ public class CloudstackSnitch extends AbstractNetworkTopologySnitch protected static final Logger logger = LoggerFactory.getLogger(CloudstackSnitch.class); protected static final String ZONE_NAME_QUERY_URI = "/latest/meta-data/availability-zone"; - private Map<InetAddress, Map<String, String>> savedEndpoints; + private Map<InetAddressAndPort, Map<String, String>> savedEndpoints; private static final String DEFAULT_DC = "UNKNOWN-DC"; private static final String DEFAULT_RACK = "UNKNOWN-RACK"; @@ -83,9 +82,9 @@ public class CloudstackSnitch extends AbstractNetworkTopologySnitch csZoneRack = zone_parts[2]; } - public String getRack(InetAddress endpoint) + public String getRack(InetAddressAndPort endpoint) { - if (endpoint.equals(FBUtilities.getBroadcastAddress())) + if (endpoint.equals(FBUtilities.getBroadcastAddressAndPort())) return csZoneRack; EndpointState state = 
Gossiper.instance.getEndpointStateForEndpoint(endpoint); if (state == null || state.getApplicationState(ApplicationState.RACK) == null) @@ -99,9 +98,9 @@ public class CloudstackSnitch extends AbstractNetworkTopologySnitch return state.getApplicationState(ApplicationState.RACK).value; } - public String getDatacenter(InetAddress endpoint) + public String getDatacenter(InetAddressAndPort endpoint) { - if (endpoint.equals(FBUtilities.getBroadcastAddress())) + if (endpoint.equals(FBUtilities.getBroadcastAddressAndPort())) return csZoneDc; EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint); if (state == null || state.getApplicationState(ApplicationState.DC) == null) http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java index 42fc26c..b9c9ba0 100644 --- a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java +++ b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java @@ -25,6 +25,7 @@ import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import com.codahale.metrics.ExponentiallyDecayingReservoir; import javax.management.MBeanServer; @@ -63,8 +64,8 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa private String mbeanName; private boolean registered = false; - private volatile HashMap<InetAddress, Double> scores = new HashMap<>(); - private final ConcurrentHashMap<InetAddress, ExponentiallyDecayingReservoir> samples = new ConcurrentHashMap<>(); + private volatile HashMap<InetAddressAndPort, Double> scores = new HashMap<>(); + private final ConcurrentHashMap<InetAddressAndPort, 
ExponentiallyDecayingReservoir> samples = new ConcurrentHashMap<>(); public final IEndpointSnitch subsnitch; @@ -174,27 +175,27 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa subsnitch.gossiperStarting(); } - public String getRack(InetAddress endpoint) + public String getRack(InetAddressAndPort endpoint) { return subsnitch.getRack(endpoint); } - public String getDatacenter(InetAddress endpoint) + public String getDatacenter(InetAddressAndPort endpoint) { return subsnitch.getDatacenter(endpoint); } - public List<InetAddress> getSortedListByProximity(final InetAddress address, Collection<InetAddress> addresses) + public List<InetAddressAndPort> getSortedListByProximity(final InetAddressAndPort address, Collection<InetAddressAndPort> addresses) { - List<InetAddress> list = new ArrayList<InetAddress>(addresses); + List<InetAddressAndPort> list = new ArrayList<>(addresses); sortByProximity(address, list); return list; } @Override - public void sortByProximity(final InetAddress address, List<InetAddress> addresses) + public void sortByProximity(final InetAddressAndPort address, List<InetAddressAndPort> addresses) { - assert address.equals(FBUtilities.getBroadcastAddress()); // we only know about ourself + assert address.equals(FBUtilities.getBroadcastAddressAndPort()); // we only know about ourself if (dynamicBadnessThreshold == 0) { sortByProximityWithScore(address, addresses); @@ -205,32 +206,32 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa } } - private void sortByProximityWithScore(final InetAddress address, List<InetAddress> addresses) + private void sortByProximityWithScore(final InetAddressAndPort address, List<InetAddressAndPort> addresses) { // Scores can change concurrently from a call to this method. But Collections.sort() expects // its comparator to be "stable", that is 2 endpoint should compare the same way for the duration // of the sort() call. 
As we copy the scores map on write, it is thus enough to alias the current // version of it during this call. - final HashMap<InetAddress, Double> scores = this.scores; - Collections.sort(addresses, new Comparator<InetAddress>() + final HashMap<InetAddressAndPort, Double> scores = this.scores; + Collections.sort(addresses, new Comparator<InetAddressAndPort>() { - public int compare(InetAddress a1, InetAddress a2) + public int compare(InetAddressAndPort a1, InetAddressAndPort a2) { return compareEndpoints(address, a1, a2, scores); } }); } - private void sortByProximityWithBadness(final InetAddress address, List<InetAddress> addresses) + private void sortByProximityWithBadness(final InetAddressAndPort address, List<InetAddressAndPort> addresses) { if (addresses.size() < 2) return; subsnitch.sortByProximity(address, addresses); - HashMap<InetAddress, Double> scores = this.scores; // Make sure the score don't change in the middle of the loop below + HashMap<InetAddressAndPort, Double> scores = this.scores; // Make sure the score don't change in the middle of the loop below // (which wouldn't really matter here but its cleaner that way). 
ArrayList<Double> subsnitchOrderedScores = new ArrayList<>(addresses.size()); - for (InetAddress inet : addresses) + for (InetAddressAndPort inet : addresses) { Double score = scores.get(inet); if (score == null) @@ -256,7 +257,7 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa } // Compare endpoints given an immutable snapshot of the scores - private int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2, Map<InetAddress, Double> scores) + private int compareEndpoints(InetAddressAndPort target, InetAddressAndPort a1, InetAddressAndPort a2, Map<InetAddressAndPort, Double> scores) { Double scored1 = scores.get(a1); Double scored2 = scores.get(a2); @@ -279,7 +280,7 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa return 1; } - public int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2) + public int compareEndpoints(InetAddressAndPort target, InetAddressAndPort a1, InetAddressAndPort a2) { // That function is fundamentally unsafe because the scores can change at any time and so the result of that // method is not stable for identical arguments. 
This is why we don't rely on super.sortByProximity() in @@ -287,7 +288,7 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa throw new UnsupportedOperationException("You shouldn't wrap the DynamicEndpointSnitch (within itself or otherwise)"); } - public void receiveTiming(InetAddress host, long latency) // this is cheap + public void receiveTiming(InetAddressAndPort host, long latency) // this is cheap { ExponentiallyDecayingReservoir sample = samples.get(host); if (sample == null) @@ -315,23 +316,23 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa } double maxLatency = 1; - Map<InetAddress, Snapshot> snapshots = new HashMap<>(samples.size()); - for (Map.Entry<InetAddress, ExponentiallyDecayingReservoir> entry : samples.entrySet()) + Map<InetAddressAndPort, Snapshot> snapshots = new HashMap<>(samples.size()); + for (Map.Entry<InetAddressAndPort, ExponentiallyDecayingReservoir> entry : samples.entrySet()) { snapshots.put(entry.getKey(), entry.getValue().getSnapshot()); } // We're going to weight the latency for each host against the worst one we see, to // arrive at sort of a 'badness percentage' for them. 
First, find the worst for each: - HashMap<InetAddress, Double> newScores = new HashMap<>(); - for (Map.Entry<InetAddress, Snapshot> entry : snapshots.entrySet()) + HashMap<InetAddressAndPort, Double> newScores = new HashMap<>(); + for (Map.Entry<InetAddressAndPort, Snapshot> entry : snapshots.entrySet()) { double mean = entry.getValue().getMedian(); if (mean > maxLatency) maxLatency = mean; } // now make another pass to do the weighting based on the maximums we found before - for (Map.Entry<InetAddress, Snapshot> entry : snapshots.entrySet()) + for (Map.Entry<InetAddressAndPort, Snapshot> entry : snapshots.entrySet()) { double score = entry.getValue().getMedian() / maxLatency; // finally, add the severity without any weighting, since hosts scale this relative to their own load and the size of the task causing the severity. @@ -351,6 +352,11 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa public Map<InetAddress, Double> getScores() { + return scores.entrySet().stream().collect(Collectors.toMap(address -> address.getKey().address, Map.Entry::getValue)); + } + + public Map<InetAddressAndPort, Double> getScoresWithPort() + { return scores; } @@ -374,7 +380,7 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa public List<Double> dumpTimings(String hostname) throws UnknownHostException { - InetAddress host = InetAddress.getByName(hostname); + InetAddressAndPort host = InetAddressAndPort.getByName(hostname); ArrayList<Double> timings = new ArrayList<Double>(); ExponentiallyDecayingReservoir sample = samples.get(host); if (sample != null) @@ -390,7 +396,7 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa Gossiper.instance.addLocalApplicationState(ApplicationState.SEVERITY, StorageService.instance.valueFactory.severity(severity)); } - private double getSeverity(InetAddress endpoint) + private double getSeverity(InetAddressAndPort endpoint) { EndpointState state = 
Gossiper.instance.getEndpointStateForEndpoint(endpoint); if (state == null) @@ -405,10 +411,10 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa public double getSeverity() { - return getSeverity(FBUtilities.getBroadcastAddress()); + return getSeverity(FBUtilities.getBroadcastAddressAndPort()); } - public boolean isWorthMergingForRangeQuery(List<InetAddress> merged, List<InetAddress> l1, List<InetAddress> l2) + public boolean isWorthMergingForRangeQuery(List<InetAddressAndPort> merged, List<InetAddressAndPort> l1, List<InetAddressAndPort> l2) { if (!subsnitch.isWorthMergingForRangeQuery(merged, l1, l2)) return false; @@ -428,10 +434,10 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa } // Return the max score for the endpoint in the provided list, or -1.0 if no node have a score. - private double maxScore(List<InetAddress> endpoints) + private double maxScore(List<InetAddressAndPort> endpoints) { double maxScore = -1.0; - for (InetAddress endpoint : endpoints) + for (InetAddressAndPort endpoint : endpoints) { Double score = scores.get(endpoint); if (score == null) http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/locator/DynamicEndpointSnitchMBean.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitchMBean.java b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitchMBean.java index bfafa75..61f0d97 100644 --- a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitchMBean.java +++ b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitchMBean.java @@ -24,6 +24,8 @@ import java.util.List; public interface DynamicEndpointSnitchMBean { + public Map<InetAddressAndPort, Double> getScoresWithPort(); + @Deprecated public Map<InetAddress, Double> getScores(); public int getUpdateInterval(); public int getResetInterval(); 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/locator/Ec2MultiRegionSnitch.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/Ec2MultiRegionSnitch.java b/src/java/org/apache/cassandra/locator/Ec2MultiRegionSnitch.java index b32ca84..2a6c7e9 100644 --- a/src/java/org/apache/cassandra/locator/Ec2MultiRegionSnitch.java +++ b/src/java/org/apache/cassandra/locator/Ec2MultiRegionSnitch.java @@ -19,6 +19,7 @@ package org.apache.cassandra.locator; import java.io.IOException; import java.net.InetAddress; +import java.net.UnknownHostException; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.config.DatabaseDescriptor; @@ -62,6 +63,16 @@ public class Ec2MultiRegionSnitch extends Ec2Snitch public void gossiperStarting() { super.gossiperStarting(); + InetAddressAndPort address; + try + { + address = InetAddressAndPort.getByName(localPrivateAddress); + } + catch (UnknownHostException e) + { + throw new RuntimeException(e); + } + Gossiper.instance.addLocalApplicationState(ApplicationState.INTERNAL_ADDRESS_AND_PORT, StorageService.instance.valueFactory.internalAddressAndPort(address)); Gossiper.instance.addLocalApplicationState(ApplicationState.INTERNAL_IP, StorageService.instance.valueFactory.internalIP(localPrivateAddress)); Gossiper.instance.register(new ReconnectableSnitchHelper(this, ec2region, true)); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/locator/Ec2Snitch.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/Ec2Snitch.java b/src/java/org/apache/cassandra/locator/Ec2Snitch.java index 59eb27b..c7324c8 100644 --- a/src/java/org/apache/cassandra/locator/Ec2Snitch.java +++ b/src/java/org/apache/cassandra/locator/Ec2Snitch.java @@ -21,7 +21,6 @@ import java.io.DataInputStream; import 
java.io.FilterInputStream; import java.io.IOException; import java.net.HttpURLConnection; -import java.net.InetAddress; import java.net.URL; import java.nio.charset.StandardCharsets; import java.util.Map; @@ -46,7 +45,7 @@ public class Ec2Snitch extends AbstractNetworkTopologySnitch protected static final String ZONE_NAME_QUERY_URL = "http://169.254.169.254/latest/meta-data/placement/availability-zone"; private static final String DEFAULT_DC = "UNKNOWN-DC"; private static final String DEFAULT_RACK = "UNKNOWN-RACK"; - private Map<InetAddress, Map<String, String>> savedEndpoints; + private Map<InetAddressAndPort, Map<String, String>> savedEndpoints; protected String ec2zone; protected String ec2region; @@ -92,9 +91,9 @@ public class Ec2Snitch extends AbstractNetworkTopologySnitch } } - public String getRack(InetAddress endpoint) + public String getRack(InetAddressAndPort endpoint) { - if (endpoint.equals(FBUtilities.getBroadcastAddress())) + if (endpoint.equals(FBUtilities.getBroadcastAddressAndPort())) return ec2zone; EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint); if (state == null || state.getApplicationState(ApplicationState.RACK) == null) @@ -108,9 +107,9 @@ public class Ec2Snitch extends AbstractNetworkTopologySnitch return state.getApplicationState(ApplicationState.RACK).value; } - public String getDatacenter(InetAddress endpoint) + public String getDatacenter(InetAddressAndPort endpoint) { - if (endpoint.equals(FBUtilities.getBroadcastAddress())) + if (endpoint.equals(FBUtilities.getBroadcastAddressAndPort())) return ec2region; EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint); if (state == null || state.getApplicationState(ApplicationState.DC) == null) http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/locator/EndpointSnitchInfo.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/locator/EndpointSnitchInfo.java b/src/java/org/apache/cassandra/locator/EndpointSnitchInfo.java index bbfabb6..c06d765 100644 --- a/src/java/org/apache/cassandra/locator/EndpointSnitchInfo.java +++ b/src/java/org/apache/cassandra/locator/EndpointSnitchInfo.java @@ -19,7 +19,6 @@ package org.apache.cassandra.locator; import java.lang.management.ManagementFactory; -import java.net.InetAddress; import java.net.UnknownHostException; import javax.management.MBeanServer; import javax.management.ObjectName; @@ -44,22 +43,22 @@ public class EndpointSnitchInfo implements EndpointSnitchInfoMBean public String getDatacenter(String host) throws UnknownHostException { - return DatabaseDescriptor.getEndpointSnitch().getDatacenter(InetAddress.getByName(host)); + return DatabaseDescriptor.getEndpointSnitch().getDatacenter(InetAddressAndPort.getByName(host)); } public String getRack(String host) throws UnknownHostException { - return DatabaseDescriptor.getEndpointSnitch().getRack(InetAddress.getByName(host)); + return DatabaseDescriptor.getEndpointSnitch().getRack(InetAddressAndPort.getByName(host)); } public String getDatacenter() { - return DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress()); + return DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort()); } public String getRack() { - return DatabaseDescriptor.getEndpointSnitch().getRack(FBUtilities.getBroadcastAddress()); + return DatabaseDescriptor.getEndpointSnitch().getRack(FBUtilities.getBroadcastAddressAndPort()); } public String getSnitchName() http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/locator/GoogleCloudSnitch.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/GoogleCloudSnitch.java b/src/java/org/apache/cassandra/locator/GoogleCloudSnitch.java index b4d3b19..1e1c500 100644 --- 
a/src/java/org/apache/cassandra/locator/GoogleCloudSnitch.java +++ b/src/java/org/apache/cassandra/locator/GoogleCloudSnitch.java @@ -21,7 +21,6 @@ import java.io.DataInputStream; import java.io.FilterInputStream; import java.io.IOException; import java.net.HttpURLConnection; -import java.net.InetAddress; import java.net.URL; import java.nio.charset.StandardCharsets; import java.util.Map; @@ -46,7 +45,7 @@ public class GoogleCloudSnitch extends AbstractNetworkTopologySnitch protected static final String ZONE_NAME_QUERY_URL = "http://metadata.google.internal/computeMetadata/v1/instance/zone"; private static final String DEFAULT_DC = "UNKNOWN-DC"; private static final String DEFAULT_RACK = "UNKNOWN-RACK"; - private Map<InetAddress, Map<String, String>> savedEndpoints; + private Map<InetAddressAndPort, Map<String, String>> savedEndpoints; protected String gceZone; protected String gceRegion; @@ -94,9 +93,9 @@ public class GoogleCloudSnitch extends AbstractNetworkTopologySnitch } } - public String getRack(InetAddress endpoint) + public String getRack(InetAddressAndPort endpoint) { - if (endpoint.equals(FBUtilities.getBroadcastAddress())) + if (endpoint.equals(FBUtilities.getBroadcastAddressAndPort())) return gceZone; EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint); if (state == null || state.getApplicationState(ApplicationState.RACK) == null) @@ -110,9 +109,9 @@ public class GoogleCloudSnitch extends AbstractNetworkTopologySnitch return state.getApplicationState(ApplicationState.RACK).value; } - public String getDatacenter(InetAddress endpoint) + public String getDatacenter(InetAddressAndPort endpoint) { - if (endpoint.equals(FBUtilities.getBroadcastAddress())) + if (endpoint.equals(FBUtilities.getBroadcastAddressAndPort())) return gceRegion; EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint); if (state == null || state.getApplicationState(ApplicationState.DC) == null) 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/locator/GossipingPropertyFileSnitch.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/GossipingPropertyFileSnitch.java b/src/java/org/apache/cassandra/locator/GossipingPropertyFileSnitch.java index e2449ae..75b5685 100644 --- a/src/java/org/apache/cassandra/locator/GossipingPropertyFileSnitch.java +++ b/src/java/org/apache/cassandra/locator/GossipingPropertyFileSnitch.java @@ -18,7 +18,6 @@ package org.apache.cassandra.locator; -import java.net.InetAddress; import java.util.concurrent.atomic.AtomicReference; import java.util.Map; @@ -45,7 +44,7 @@ public class GossipingPropertyFileSnitch extends AbstractNetworkTopologySnitch// private final boolean preferLocal; private final AtomicReference<ReconnectableSnitchHelper> snitchHelperReference; - private Map<InetAddress, Map<String, String>> savedEndpoints; + private Map<InetAddressAndPort, Map<String, String>> savedEndpoints; private static final String DEFAULT_DC = "UNKNOWN_DC"; private static final String DEFAULT_RACK = "UNKNOWN_RACK"; @@ -84,9 +83,9 @@ public class GossipingPropertyFileSnitch extends AbstractNetworkTopologySnitch// * @param endpoint the endpoint to process * @return string of data center */ - public String getDatacenter(InetAddress endpoint) + public String getDatacenter(InetAddressAndPort endpoint) { - if (endpoint.equals(FBUtilities.getBroadcastAddress())) + if (endpoint.equals(FBUtilities.getBroadcastAddressAndPort())) return myDC; EndpointState epState = Gossiper.instance.getEndpointStateForEndpoint(endpoint); @@ -112,9 +111,9 @@ public class GossipingPropertyFileSnitch extends AbstractNetworkTopologySnitch// * @param endpoint the endpoint to process * @return string of rack */ - public String getRack(InetAddress endpoint) + public String getRack(InetAddressAndPort endpoint) { - if 
(endpoint.equals(FBUtilities.getBroadcastAddress())) + if (endpoint.equals(FBUtilities.getBroadcastAddressAndPort())) return myRack; EndpointState epState = Gossiper.instance.getEndpointStateForEndpoint(endpoint); @@ -138,8 +137,10 @@ public class GossipingPropertyFileSnitch extends AbstractNetworkTopologySnitch// { super.gossiperStarting(); + Gossiper.instance.addLocalApplicationState(ApplicationState.INTERNAL_ADDRESS_AND_PORT, + StorageService.instance.valueFactory.internalAddressAndPort(FBUtilities.getLocalAddressAndPort())); Gossiper.instance.addLocalApplicationState(ApplicationState.INTERNAL_IP, - StorageService.instance.valueFactory.internalIP(FBUtilities.getLocalAddress().getHostAddress())); + StorageService.instance.valueFactory.internalIP(FBUtilities.getJustLocalAddress().getHostAddress())); loadGossiperState(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/locator/IEndpointSnitch.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/IEndpointSnitch.java b/src/java/org/apache/cassandra/locator/IEndpointSnitch.java index 71b441c..00a1543 100644 --- a/src/java/org/apache/cassandra/locator/IEndpointSnitch.java +++ b/src/java/org/apache/cassandra/locator/IEndpointSnitch.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.locator; -import java.net.InetAddress; import java.util.Collection; import java.util.List; @@ -32,27 +31,27 @@ public interface IEndpointSnitch /** * returns a String representing the rack this endpoint belongs to */ - public String getRack(InetAddress endpoint); + public String getRack(InetAddressAndPort endpoint); /** * returns a String representing the datacenter this endpoint belongs to */ - public String getDatacenter(InetAddress endpoint); + public String getDatacenter(InetAddressAndPort endpoint); /** * returns a new <tt>List</tt> sorted by proximity to the given endpoint */ - public List<InetAddress> 
getSortedListByProximity(InetAddress address, Collection<InetAddress> unsortedAddress); + public List<InetAddressAndPort> getSortedListByProximity(InetAddressAndPort address, Collection<InetAddressAndPort> unsortedAddress); /** * This method will sort the <tt>List</tt> by proximity to the given address. */ - public void sortByProximity(InetAddress address, List<InetAddress> addresses); + public void sortByProximity(InetAddressAndPort address, List<InetAddressAndPort> addresses); /** * compares two endpoints in relation to the target endpoint, returning as Comparator.compare would */ - public int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2); + public int compareEndpoints(InetAddressAndPort target, InetAddressAndPort a1, InetAddressAndPort a2); /** * called after Gossiper instance exists immediately before it starts gossiping @@ -63,5 +62,5 @@ public interface IEndpointSnitch * Returns whether for a range query doing a query against merged is likely * to be faster than 2 sequential queries, one against l1 followed by one against l2. 
*/ - public boolean isWorthMergingForRangeQuery(List<InetAddress> merged, List<InetAddress> l1, List<InetAddress> l2); + public boolean isWorthMergingForRangeQuery(List<InetAddressAndPort> merged, List<InetAddressAndPort> l1, List<InetAddressAndPort> l2); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/locator/ILatencySubscriber.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/ILatencySubscriber.java b/src/java/org/apache/cassandra/locator/ILatencySubscriber.java index d2ae6db..f6c1c7f 100644 --- a/src/java/org/apache/cassandra/locator/ILatencySubscriber.java +++ b/src/java/org/apache/cassandra/locator/ILatencySubscriber.java @@ -17,9 +17,7 @@ */ package org.apache.cassandra.locator; -import java.net.InetAddress; - public interface ILatencySubscriber { - public void receiveTiming(InetAddress address, long latency); + public void receiveTiming(InetAddressAndPort address, long latency); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/locator/InetAddressAndPort.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/InetAddressAndPort.java b/src/java/org/apache/cassandra/locator/InetAddressAndPort.java new file mode 100644 index 0000000..38a1a49 --- /dev/null +++ b/src/java/org/apache/cassandra/locator/InetAddressAndPort.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.locator; + +import java.io.Serializable; +import java.net.InetAddress; +import java.net.UnknownHostException; + +import com.google.common.base.Preconditions; +import com.google.common.net.HostAndPort; + +import org.apache.cassandra.utils.FastByteOperations; + +/** + * A class to replace the usage of InetAddress to identify hosts in the cluster. + * Opting for a full replacement class so that in the future if we change the nature + * of the identifier the refactor will be easier in that we don't have to change the type + * just the methods. + * + * Because an IP might contain multiple C* instances the identification must be done + * using the IP + port. InetSocketAddress is undesirable for a couple of reasons. It's not comparable, + * its toString() method doesn't correctly bracket IPv6, it doesn't handle optional default values, + * and a couple of other minor behaviors that are slightly less troublesome like handling the + * need to sometimes return a port and sometimes not. + * + */ +public final class InetAddressAndPort implements Comparable<InetAddressAndPort>, Serializable +{ + private static final long serialVersionUID = 0; + + //Store these here to avoid requiring DatabaseDescriptor to be loaded. DatabaseDescriptor will set + //these when it loads the config. A lot of unit tests won't end up loading DatabaseDescriptor. + //Tools that might use this class also might not load database descriptor. Those tools are expected + //to always override the defaults. 
+ static volatile int defaultPort = 7000; + + public final InetAddress address; + public final byte[] addressBytes; + public final int port; + + private InetAddressAndPort(InetAddress address, byte[] addressBytes, int port) + { + Preconditions.checkNotNull(address); + Preconditions.checkNotNull(addressBytes); + validatePortRange(port); + this.address = address; + this.port = port; + this.addressBytes = addressBytes; + } + + private static void validatePortRange(int port) + { + if (port < 0 | port > 65535) + { + throw new IllegalArgumentException("Port " + port + " is not a valid port number in the range 0-65535"); + } + } + + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + InetAddressAndPort that = (InetAddressAndPort) o; + + if (port != that.port) return false; + return address.equals(that.address); + } + + @Override + public int hashCode() + { + int result = address.hashCode(); + result = 31 * result + port; + return result; + } + + @Override + public int compareTo(InetAddressAndPort o) + { + int retval = FastByteOperations.compareUnsigned(addressBytes, 0, addressBytes.length, o.addressBytes, 0, o.addressBytes.length); + if (retval != 0) + { + return retval; + } + + return Integer.compare(port, o.port); + } + + public String getHostAddress(boolean withPort) + { + if (withPort) + { + return toString(); + } + else + { + return address.getHostAddress(); + } + } + + @Override + public String toString() + { + return toString(true); + } + + public String toString(boolean withPort) + { + if (withPort) + { + return HostAndPort.fromParts(address.getHostAddress(), port).toString(); + } + else + { + return address.toString(); + } + } + + public static InetAddressAndPort getByName(String name) throws UnknownHostException + { + return getByNameOverrideDefaults(name, null); + } + + /** + * + * @param name Hostname + optional ports string + * @param port Port to connect on, overridden 
by values in hostname string, defaults to DatabaseDescriptor default if not specified anywhere. + * @return + * @throws UnknownHostException + */ + public static InetAddressAndPort getByNameOverrideDefaults(String name, Integer port) throws UnknownHostException + { + HostAndPort hap = HostAndPort.fromString(name); + if (hap.hasPort()) + { + port = hap.getPort(); + } + return getByAddressOverrideDefaults(InetAddress.getByName(hap.getHost()), port); + } + + public static InetAddressAndPort getByAddress(byte[] address) throws UnknownHostException + { + return getByAddressOverrideDefaults(InetAddress.getByAddress(address), address, null); + } + + public static InetAddressAndPort getByAddress(InetAddress address) + { + return getByAddressOverrideDefaults(address, null); + } + + public static InetAddressAndPort getByAddressOverrideDefaults(InetAddress address, Integer port) + { + if (port == null) + { + port = defaultPort; + } + + return new InetAddressAndPort(address, address.getAddress(), port); + } + + public static InetAddressAndPort getByAddressOverrideDefaults(InetAddress address, byte[] addressBytes, Integer port) + { + if (port == null) + { + port = defaultPort; + } + + return new InetAddressAndPort(address, addressBytes, port); + } + + public static InetAddressAndPort getLoopbackAddress() + { + return InetAddressAndPort.getByAddress(InetAddress.getLoopbackAddress()); + } + + public static InetAddressAndPort getLocalHost() throws UnknownHostException + { + return InetAddressAndPort.getByAddress(InetAddress.getLocalHost()); + } + + public static void initializeDefaultPort(int port) + { + defaultPort = port; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/locator/LocalStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/LocalStrategy.java b/src/java/org/apache/cassandra/locator/LocalStrategy.java index ae58203..a76fe96 100644 --- 
a/src/java/org/apache/cassandra/locator/LocalStrategy.java +++ b/src/java/org/apache/cassandra/locator/LocalStrategy.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.locator; -import java.net.InetAddress; import java.util.ArrayList; import java.util.Collections; import java.util.Collection; @@ -42,16 +41,16 @@ public class LocalStrategy extends AbstractReplicationStrategy * LocalStrategy may be used before tokens are set up. */ @Override - public ArrayList<InetAddress> getNaturalEndpoints(RingPosition searchPosition) + public ArrayList<InetAddressAndPort> getNaturalEndpoints(RingPosition searchPosition) { - ArrayList<InetAddress> l = new ArrayList<InetAddress>(1); - l.add(FBUtilities.getBroadcastAddress()); + ArrayList<InetAddressAndPort> l = new ArrayList<InetAddressAndPort>(1); + l.add(FBUtilities.getBroadcastAddressAndPort()); return l; } - public List<InetAddress> calculateNaturalEndpoints(Token token, TokenMetadata metadata) + public List<InetAddressAndPort> calculateNaturalEndpoints(Token token, TokenMetadata metadata) { - return Collections.singletonList(FBUtilities.getBroadcastAddress()); + return Collections.singletonList(FBUtilities.getBroadcastAddressAndPort()); } public int getReplicationFactor() http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java b/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java index 442e6cf..673c018 100644 --- a/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java +++ b/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.locator; -import java.net.InetAddress; import java.util.*; import java.util.Map.Entry; @@ -72,7 +71,7 @@ public class NetworkTopologyStrategy extends AbstractReplicationStrategy } 
datacenters = Collections.unmodifiableMap(newDatacenters); - logger.trace("Configured datacenter replicas are {}", FBUtilities.toString(datacenters)); + logger.info("Configured datacenter replicas are {}", FBUtilities.toString(datacenters)); } /** @@ -81,7 +80,7 @@ public class NetworkTopologyStrategy extends AbstractReplicationStrategy private static final class DatacenterEndpoints { /** List accepted endpoints get pushed into. */ - Set<InetAddress> endpoints; + Set<InetAddressAndPort> endpoints; /** * Racks encountered so far. Replicas are put into separate racks while possible. * For efficiency the set is shared between the instances, using the location pair (dc, rack) to make sure @@ -93,7 +92,7 @@ public class NetworkTopologyStrategy extends AbstractReplicationStrategy int rfLeft; int acceptableRackRepeats; - DatacenterEndpoints(int rf, int rackCount, int nodeCount, Set<InetAddress> endpoints, Set<Pair<String, String>> racks) + DatacenterEndpoints(int rf, int rackCount, int nodeCount, Set<InetAddressAndPort> endpoints, Set<Pair<String, String>> racks) { this.endpoints = endpoints; this.racks = racks; @@ -108,7 +107,7 @@ public class NetworkTopologyStrategy extends AbstractReplicationStrategy * Attempts to add an endpoint to the replicas for this datacenter, adding to the endpoints set if successful. * Returns true if the endpoint was added, and this datacenter does not require further replicas. */ - boolean addEndpointAndCheckIfDone(InetAddress ep, Pair<String,String> location) + boolean addEndpointAndCheckIfDone(InetAddressAndPort ep, Pair<String,String> location) { if (done()) return false; @@ -143,17 +142,17 @@ public class NetworkTopologyStrategy extends AbstractReplicationStrategy /** * calculate endpoints in one pass through the tokens by tracking our progress in each DC. 
*/ - public List<InetAddress> calculateNaturalEndpoints(Token searchToken, TokenMetadata tokenMetadata) + public List<InetAddressAndPort> calculateNaturalEndpoints(Token searchToken, TokenMetadata tokenMetadata) { // we want to preserve insertion order so that the first added endpoint becomes primary - Set<InetAddress> replicas = new LinkedHashSet<>(); + Set<InetAddressAndPort> replicas = new LinkedHashSet<>(); Set<Pair<String, String>> seenRacks = new HashSet<>(); Topology topology = tokenMetadata.getTopology(); // all endpoints in each DC, so we can check when we have exhausted all the members of a DC - Multimap<String, InetAddress> allEndpoints = topology.getDatacenterEndpoints(); + Multimap<String, InetAddressAndPort> allEndpoints = topology.getDatacenterEndpoints(); // all racks in a DC so we can check when we have exhausted all racks in a DC - Map<String, Multimap<String, InetAddress>> racks = topology.getDatacenterRacks(); + Map<String, Multimap<String, InetAddressAndPort>> racks = topology.getDatacenterRacks(); assert !allEndpoints.isEmpty() && !racks.isEmpty() : "not aware of any cluster members"; int dcsToFill = 0; @@ -178,7 +177,7 @@ public class NetworkTopologyStrategy extends AbstractReplicationStrategy while (dcsToFill > 0 && tokenIter.hasNext()) { Token next = tokenIter.next(); - InetAddress ep = tokenMetadata.getEndpoint(next); + InetAddressAndPort ep = tokenMetadata.getEndpoint(next); Pair<String, String> location = topology.getLocation(ep); DatacenterEndpoints dcEndpoints = dcs.get(location.left); if (dcEndpoints != null && dcEndpoints.addEndpointAndCheckIfDone(ep, location)) @@ -227,9 +226,9 @@ public class NetworkTopologyStrategy extends AbstractReplicationStrategy final IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch(); // Add data center of localhost. 
- validDataCenters.add(snitch.getDatacenter(FBUtilities.getBroadcastAddress())); + validDataCenters.add(snitch.getDatacenter(FBUtilities.getBroadcastAddressAndPort())); // Fetch and add DCs of all peers. - for (final InetAddress peer : StorageService.instance.getTokenMetadata().getAllEndpoints()) + for (final InetAddressAndPort peer : StorageService.instance.getTokenMetadata().getAllEndpoints()) { validDataCenters.add(snitch.getDatacenter(peer)); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/locator/OldNetworkTopologyStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/OldNetworkTopologyStrategy.java b/src/java/org/apache/cassandra/locator/OldNetworkTopologyStrategy.java index b9bd767..93e629e 100644 --- a/src/java/org/apache/cassandra/locator/OldNetworkTopologyStrategy.java +++ b/src/java/org/apache/cassandra/locator/OldNetworkTopologyStrategy.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.locator; -import java.net.InetAddress; import java.util.ArrayList; import java.util.Collections; import java.util.Collection; @@ -42,10 +41,10 @@ public class OldNetworkTopologyStrategy extends AbstractReplicationStrategy super(keyspaceName, tokenMetadata, snitch, configOptions); } - public List<InetAddress> calculateNaturalEndpoints(Token token, TokenMetadata metadata) + public List<InetAddressAndPort> calculateNaturalEndpoints(Token token, TokenMetadata metadata) { int replicas = getReplicationFactor(); - List<InetAddress> endpoints = new ArrayList<InetAddress>(replicas); + List<InetAddressAndPort> endpoints = new ArrayList<>(replicas); ArrayList<Token> tokens = metadata.sortedTokens(); if (tokens.isEmpty()) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org