http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index be0cf0f..dcf0cab 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -17,9 +17,7 @@ */ package org.apache.cassandra.service; -import java.io.IOException; import java.lang.management.ManagementFactory; -import java.net.InetAddress; import java.nio.ByteBuffer; import java.nio.file.Paths; import java.util.*; @@ -64,7 +62,6 @@ import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.hints.Hint; import org.apache.cassandra.hints.HintsService; import org.apache.cassandra.index.Index; -import org.apache.cassandra.io.util.DataOutputBuffer; import org.apache.cassandra.locator.*; import org.apache.cassandra.metrics.*; import org.apache.cassandra.net.*; @@ -95,9 +92,9 @@ public class StorageProxy implements StorageProxyMBean public static final StorageProxy instance = new StorageProxy(); private static volatile int maxHintsInProgress = 128 * FBUtilities.getAvailableProcessors(); - private static final CacheLoader<InetAddress, AtomicInteger> hintsInProgress = new CacheLoader<InetAddress, AtomicInteger>() + private static final CacheLoader<InetAddressAndPort, AtomicInteger> hintsInProgress = new CacheLoader<InetAddressAndPort, AtomicInteger>() { - public AtomicInteger load(InetAddress inetAddress) + public AtomicInteger load(InetAddressAndPort inetAddress) { return new AtomicInteger(0); } @@ -135,7 +132,7 @@ public class StorageProxy implements StorageProxyMBean standardWritePerformer = new WritePerformer() { public void apply(IMutation mutation, - Iterable<InetAddress> targets, + Iterable<InetAddressAndPort> targets, AbstractWriteResponseHandler<IMutation> responseHandler, String localDataCenter, ConsistencyLevel consistency_level) @@ -155,7 +152,7 @@ public class StorageProxy implements StorageProxyMBean counterWritePerformer = new WritePerformer() { public void apply(IMutation mutation, - Iterable<InetAddress> targets, + Iterable<InetAddressAndPort> targets, AbstractWriteResponseHandler<IMutation> responseHandler, String localDataCenter, ConsistencyLevel consistencyLevel) @@ -167,7 +164,7 @@ public class StorageProxy implements StorageProxyMBean counterWriteOnCoordinatorPerformer = new WritePerformer() { public void apply(IMutation mutation, - Iterable<InetAddress> targets, + Iterable<InetAddressAndPort> targets, AbstractWriteResponseHandler<IMutation> responseHandler, String localDataCenter, ConsistencyLevel consistencyLevel) @@ -248,8 +245,8 @@ public class StorageProxy implements StorageProxyMBean while (System.nanoTime() - queryStartNanoTime < timeout) { // for simplicity, we'll do a single liveness check at the start of each attempt - Pair<List<InetAddress>, Integer> p = getPaxosParticipants(metadata, key, consistencyForPaxos); - List<InetAddress> liveEndpoints = p.left; + Pair<List<InetAddressAndPort>, Integer> p = getPaxosParticipants(metadata, key, consistencyForPaxos); + List<InetAddressAndPort> liveEndpoints = p.left; int requiredParticipants = p.right; final Pair<UUID, Integer> pair = beginAndRepairPaxos(queryStartNanoTime, key, metadata, liveEndpoints, requiredParticipants, consistencyForPaxos, consistencyForCommit, true, state); @@ -342,34 +339,34 @@ public class StorageProxy implements StorageProxyMBean casWriteMetrics.contention.update(contentions); } - private static Predicate<InetAddress> sameDCPredicateFor(final String dc) + private static Predicate<InetAddressAndPort> sameDCPredicateFor(final String dc) { final IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch(); - return new Predicate<InetAddress>() + return new Predicate<InetAddressAndPort>() { - public boolean apply(InetAddress host) + public boolean apply(InetAddressAndPort host) { return dc.equals(snitch.getDatacenter(host)); } }; } - private static Pair<List<InetAddress>, Integer> getPaxosParticipants(TableMetadata metadata, DecoratedKey key, ConsistencyLevel consistencyForPaxos) throws UnavailableException + private static Pair<List<InetAddressAndPort>, Integer> getPaxosParticipants(TableMetadata metadata, DecoratedKey key, ConsistencyLevel consistencyForPaxos) throws UnavailableException { Token tk = key.getToken(); - List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(metadata.keyspace, tk); - Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, metadata.keyspace); + List<InetAddressAndPort> naturalEndpoints = StorageService.instance.getNaturalEndpoints(metadata.keyspace, tk); + Collection<InetAddressAndPort> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, metadata.keyspace); if (consistencyForPaxos == ConsistencyLevel.LOCAL_SERIAL) { // Restrict naturalEndpoints and pendingEndpoints to node in the local DC only - String localDc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress()); - Predicate<InetAddress> isLocalDc = sameDCPredicateFor(localDc); + String localDc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort()); + Predicate<InetAddressAndPort> isLocalDc = sameDCPredicateFor(localDc); naturalEndpoints = ImmutableList.copyOf(Iterables.filter(naturalEndpoints, isLocalDc)); pendingEndpoints = ImmutableList.copyOf(Iterables.filter(pendingEndpoints, isLocalDc)); } int participants = pendingEndpoints.size() + naturalEndpoints.size(); int requiredParticipants = participants / 2 + 1; // See CASSANDRA-8346, CASSANDRA-833 - List<InetAddress> liveEndpoints = ImmutableList.copyOf(Iterables.filter(Iterables.concat(naturalEndpoints, pendingEndpoints), IAsyncCallback.isAlive)); + List<InetAddressAndPort> liveEndpoints = ImmutableList.copyOf(Iterables.filter(Iterables.concat(naturalEndpoints, pendingEndpoints), IAsyncCallback.isAlive)); if (liveEndpoints.size() < requiredParticipants) throw new UnavailableException(consistencyForPaxos, requiredParticipants, liveEndpoints.size()); @@ -394,7 +391,7 @@ public class StorageProxy implements StorageProxyMBean private static Pair<UUID, Integer> beginAndRepairPaxos(long queryStartNanoTime, DecoratedKey key, TableMetadata metadata, - List<InetAddress> liveEndpoints, + List<InetAddressAndPort> liveEndpoints, int requiredParticipants, ConsistencyLevel consistencyForPaxos, ConsistencyLevel consistencyForCommit, @@ -472,7 +469,7 @@ public class StorageProxy implements StorageProxyMBean // Since we waited for quorum nodes, if some of them haven't seen the last commit (which may just be a timing issue, but may also // mean we lost messages), we pro-actively "repair" those nodes, and retry. int nowInSec = Ints.checkedCast(TimeUnit.MICROSECONDS.toSeconds(ballotMicros)); - Iterable<InetAddress> missingMRC = summary.replicasMissingMostRecentCommit(metadata, nowInSec); + Iterable<InetAddressAndPort> missingMRC = summary.replicasMissingMostRecentCommit(metadata, nowInSec); if (Iterables.size(missingMRC) > 0) { Tracing.trace("Repairing replicas that missed the most recent commit"); @@ -494,19 +491,19 @@ public class StorageProxy implements StorageProxyMBean /** * Unlike commitPaxos, this does not wait for replies */ - private static void sendCommit(Commit commit, Iterable<InetAddress> replicas) + private static void sendCommit(Commit commit, Iterable<InetAddressAndPort> replicas) { MessageOut<Commit> message = new MessageOut<Commit>(MessagingService.Verb.PAXOS_COMMIT, commit, Commit.serializer); - for (InetAddress target : replicas) + for (InetAddressAndPort target : replicas) MessagingService.instance().sendOneWay(message, target); } - private static PrepareCallback preparePaxos(Commit toPrepare, List<InetAddress> endpoints, int requiredParticipants, ConsistencyLevel consistencyForPaxos, long queryStartNanoTime) + private static PrepareCallback preparePaxos(Commit toPrepare, List<InetAddressAndPort> endpoints, int requiredParticipants, ConsistencyLevel consistencyForPaxos, long queryStartNanoTime) throws WriteTimeoutException { PrepareCallback callback = new PrepareCallback(toPrepare.update.partitionKey(), toPrepare.update.metadata(), requiredParticipants, consistencyForPaxos, queryStartNanoTime); MessageOut<Commit> message = new MessageOut<Commit>(MessagingService.Verb.PAXOS_PREPARE, toPrepare, Commit.serializer); - for (InetAddress target : endpoints) + for (InetAddressAndPort target : endpoints) { if (canDoLocalRequest(target)) { @@ -516,9 +513,9 @@ public class StorageProxy implements StorageProxyMBean { try { - MessageIn<PrepareResponse> message = MessageIn.create(FBUtilities.getBroadcastAddress(), + MessageIn<PrepareResponse> message = MessageIn.create(FBUtilities.getBroadcastAddressAndPort(), PrepareVerbHandler.doPrepare(toPrepare), - Collections.<String, byte[]>emptyMap(), + Collections.emptyMap(), MessagingService.Verb.INTERNAL_RESPONSE, MessagingService.current_version); callback.response(message); @@ -539,12 +536,12 @@ public class StorageProxy implements StorageProxyMBean return callback; } - private static boolean proposePaxos(Commit proposal, List<InetAddress> endpoints, int requiredParticipants, boolean timeoutIfPartial, ConsistencyLevel consistencyLevel, long queryStartNanoTime) + private static boolean proposePaxos(Commit proposal, List<InetAddressAndPort> endpoints, int requiredParticipants, boolean timeoutIfPartial, ConsistencyLevel consistencyLevel, long queryStartNanoTime) throws WriteTimeoutException { ProposeCallback callback = new ProposeCallback(endpoints.size(), requiredParticipants, !timeoutIfPartial, consistencyLevel, queryStartNanoTime); MessageOut<Commit> message = new MessageOut<Commit>(MessagingService.Verb.PAXOS_PROPOSE, proposal, Commit.serializer); - for (InetAddress target : endpoints) + for (InetAddressAndPort target : endpoints) { if (canDoLocalRequest(target)) { @@ -554,9 +551,9 @@ public class StorageProxy implements StorageProxyMBean { try { - MessageIn<Boolean> message = MessageIn.create(FBUtilities.getBroadcastAddress(), + MessageIn<Boolean> message = MessageIn.create(FBUtilities.getBroadcastAddressAndPort(), ProposeVerbHandler.doPropose(proposal), - Collections.<String, byte[]>emptyMap(), + Collections.emptyMap(), MessagingService.Verb.INTERNAL_RESPONSE, MessagingService.current_version); callback.response(message); @@ -590,8 +587,8 @@ public class StorageProxy implements StorageProxyMBean Keyspace keyspace = Keyspace.open(proposal.update.metadata().keyspace); Token tk = proposal.update.partitionKey().getToken(); - List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(keyspace.getName(), tk); - Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, keyspace.getName()); + List<InetAddressAndPort> naturalEndpoints = StorageService.instance.getNaturalEndpoints(keyspace.getName(), tk); + Collection<InetAddressAndPort> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, keyspace.getName()); AbstractWriteResponseHandler<Commit> responseHandler = null; if (shouldBlock) @@ -602,7 +599,7 @@ public class StorageProxy implements StorageProxyMBean } MessageOut<Commit> message = new MessageOut<Commit>(MessagingService.Verb.PAXOS_COMMIT, proposal, Commit.serializer); - for (InetAddress destination : Iterables.concat(naturalEndpoints, pendingEndpoints)) + for (InetAddressAndPort destination : Iterables.concat(naturalEndpoints, pendingEndpoints)) { checkHintOverload(destination); @@ -658,7 +655,7 @@ public class StorageProxy implements StorageProxyMBean { if (!(ex instanceof WriteTimeoutException)) logger.error("Failed to apply paxos commit locally : ", ex); - responseHandler.onFailure(FBUtilities.getBroadcastAddress(), RequestFailureReason.UNKNOWN); + responseHandler.onFailure(FBUtilities.getBroadcastAddressAndPort(), RequestFailureReason.UNKNOWN); } } @@ -684,7 +681,7 @@ public class StorageProxy implements StorageProxyMBean throws UnavailableException, OverloadedException, WriteTimeoutException, WriteFailureException { Tracing.trace("Determining replicas for mutation"); - final String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress()); + final String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort()); long startTime = System.nanoTime(); List<AbstractWriteResponseHandler<IMutation>> responseHandlers = new ArrayList<>(mutations.size()); @@ -781,13 +778,13 @@ public class StorageProxy implements StorageProxyMBean String keyspaceName = mutation.getKeyspaceName(); Token token = mutation.key().getToken(); - Iterable<InetAddress> endpoints = StorageService.instance.getNaturalAndPendingEndpoints(keyspaceName, token); - ArrayList<InetAddress> endpointsToHint = new ArrayList<>(Iterables.size(endpoints)); + Iterable<InetAddressAndPort> endpoints = StorageService.instance.getNaturalAndPendingEndpoints(keyspaceName, token); + ArrayList<InetAddressAndPort> endpointsToHint = new ArrayList<>(Iterables.size(endpoints)); // local writes can timeout, but cannot be dropped (see LocalMutationRunnable and CASSANDRA-6510), // so there is no need to hint or retry. - for (InetAddress target : endpoints) - if (!target.equals(FBUtilities.getBroadcastAddress()) && shouldHint(target)) + for (InetAddressAndPort target : endpoints) + if (!target.equals(FBUtilities.getBroadcastAddressAndPort()) && shouldHint(target)) endpointsToHint.add(target); submitHint(mutation, endpointsToHint, null); @@ -797,7 +794,7 @@ public class StorageProxy implements StorageProxyMBean { String keyspaceName = mutation.getKeyspaceName(); Token token = mutation.key().getToken(); - InetAddress local = FBUtilities.getBroadcastAddress(); + InetAddressAndPort local = FBUtilities.getBroadcastAddressAndPort(); return StorageService.instance.getNaturalEndpoints(keyspaceName, token).contains(local) || StorageService.instance.getTokenMetadata().pendingEndpointsFor(token, keyspaceName).contains(local); @@ -816,7 +813,7 @@ public class StorageProxy implements StorageProxyMBean throws UnavailableException, OverloadedException, WriteTimeoutException { Tracing.trace("Determining replicas for mutation"); - final String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress()); + final String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort()); long startTime = System.nanoTime(); @@ -841,7 +838,7 @@ public class StorageProxy implements StorageProxyMBean ConsistencyLevel consistencyLevel = ConsistencyLevel.ONE; //Since the base -> view replication is 1:1 we only need to store the BL locally - final Collection<InetAddress> batchlogEndpoints = Collections.singleton(FBUtilities.getBroadcastAddress()); + final Collection<InetAddressAndPort> batchlogEndpoints = Collections.singleton(FBUtilities.getBroadcastAddressAndPort()); BatchlogResponseHandler.BatchlogCleanup cleanup = new BatchlogResponseHandler.BatchlogCleanup(mutations.size(), () -> asyncRemoveFromBatchlog(batchlogEndpoints, batchUUID)); // add a handler for each mutation - includes checking availability, but doesn't initiate any writes, yet @@ -849,8 +846,8 @@ public class StorageProxy implements StorageProxyMBean { String keyspaceName = mutation.getKeyspaceName(); Token tk = mutation.key().getToken(); - Optional<InetAddress> pairedEndpoint = ViewUtils.getViewNaturalEndpoint(keyspaceName, baseToken, tk); - Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, keyspaceName); + Optional<InetAddressAndPort> pairedEndpoint = ViewUtils.getViewNaturalEndpoint(keyspaceName, baseToken, tk); + Collection<InetAddressAndPort> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, keyspaceName); // if there are no paired endpoints there are probably range movements going on, so we write to the local batchlog to replay later if (!pairedEndpoint.isPresent()) @@ -865,7 +862,7 @@ public class StorageProxy implements StorageProxyMBean } // When local node is the paired endpoint just apply the mutation locally. - if (pairedEndpoint.get().equals(FBUtilities.getBroadcastAddress()) && StorageService.instance.isJoined()) + if (pairedEndpoint.get().equals(FBUtilities.getBroadcastAddressAndPort()) && StorageService.instance.isJoined()) { try { @@ -956,7 +953,7 @@ public class StorageProxy implements StorageProxyMBean long startTime = System.nanoTime(); List<WriteResponseHandlerWrapper> wrappers = new ArrayList<WriteResponseHandlerWrapper>(mutations.size()); - String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress()); + String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort()); try { @@ -974,7 +971,7 @@ public class StorageProxy implements StorageProxyMBean batchConsistencyLevel = consistency_level; } - final Collection<InetAddress> batchlogEndpoints = getBatchlogEndpoints(localDataCenter, batchConsistencyLevel); + final Collection<InetAddressAndPort> batchlogEndpoints = getBatchlogEndpoints(localDataCenter, batchConsistencyLevel); final UUID batchUUID = UUIDGen.getTimeUUID(); BatchlogResponseHandler.BatchlogCleanup cleanup = new BatchlogResponseHandler.BatchlogCleanup(mutations.size(), () -> asyncRemoveFromBatchlog(batchlogEndpoints, batchUUID)); @@ -1029,16 +1026,16 @@ public class StorageProxy implements StorageProxyMBean } } - public static boolean canDoLocalRequest(InetAddress replica) + public static boolean canDoLocalRequest(InetAddressAndPort replica) { - return replica.equals(FBUtilities.getBroadcastAddress()); + return replica.equals(FBUtilities.getBroadcastAddressAndPort()); } - private static void syncWriteToBatchlog(Collection<Mutation> mutations, Collection<InetAddress> endpoints, UUID uuid, long queryStartNanoTime) + private static void syncWriteToBatchlog(Collection<Mutation> mutations, Collection<InetAddressAndPort> endpoints, UUID uuid, long queryStartNanoTime) throws WriteTimeoutException, WriteFailureException { WriteResponseHandler<?> handler = new WriteResponseHandler<>(endpoints, - Collections.<InetAddress>emptyList(), + Collections.emptyList(), endpoints.size() == 1 ? ConsistencyLevel.ONE : ConsistencyLevel.TWO, Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME), null, @@ -1047,7 +1044,7 @@ public class StorageProxy implements StorageProxyMBean Batch batch = Batch.createLocal(uuid, FBUtilities.timestampMicros(), mutations); MessageOut<Batch> message = new MessageOut<>(MessagingService.Verb.BATCH_STORE, batch, Batch.serializer); - for (InetAddress target : endpoints) + for (InetAddressAndPort target : endpoints) { logger.trace("Sending batchlog store request {} to {} for {} mutations", batch.id, target, batch.size()); @@ -1059,10 +1056,10 @@ public class StorageProxy implements StorageProxyMBean handler.get(); } - private static void asyncRemoveFromBatchlog(Collection<InetAddress> endpoints, UUID uuid) + private static void asyncRemoveFromBatchlog(Collection<InetAddressAndPort> endpoints, UUID uuid) { MessageOut<UUID> message = new MessageOut<>(MessagingService.Verb.BATCH_REMOVE, uuid, UUIDSerializer.serializer); - for (InetAddress target : endpoints) + for (InetAddressAndPort target : endpoints) { if (logger.isTraceEnabled()) logger.trace("Sending batchlog remove request {} to {}", uuid, target); @@ -1078,7 +1075,7 @@ public class StorageProxy implements StorageProxyMBean { for (WriteResponseHandlerWrapper wrapper : wrappers) { - Iterable<InetAddress> endpoints = Iterables.concat(wrapper.handler.naturalEndpoints, wrapper.handler.pendingEndpoints); + Iterable<InetAddressAndPort> endpoints = Iterables.concat(wrapper.handler.naturalEndpoints, wrapper.handler.pendingEndpoints); try { @@ -1086,7 +1083,7 @@ public class StorageProxy implements StorageProxyMBean } catch (OverloadedException | WriteTimeoutException e) { - wrapper.handler.onFailure(FBUtilities.getBroadcastAddress(), RequestFailureReason.UNKNOWN); + wrapper.handler.onFailure(FBUtilities.getBroadcastAddressAndPort(), RequestFailureReason.UNKNOWN); } } } @@ -1096,7 +1093,7 @@ public class StorageProxy implements StorageProxyMBean { for (WriteResponseHandlerWrapper wrapper : wrappers) { - Iterable<InetAddress> endpoints = Iterables.concat(wrapper.handler.naturalEndpoints, wrapper.handler.pendingEndpoints); + Iterable<InetAddressAndPort> endpoints = Iterables.concat(wrapper.handler.naturalEndpoints, wrapper.handler.pendingEndpoints); sendToHintedEndpoints(wrapper.mutation, endpoints, wrapper.handler, localDataCenter, stage); } @@ -1132,8 +1129,8 @@ public class StorageProxy implements StorageProxyMBean AbstractReplicationStrategy rs = Keyspace.open(keyspaceName).getReplicationStrategy(); Token tk = mutation.key().getToken(); - List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(keyspaceName, tk); - Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, keyspaceName); + List<InetAddressAndPort> naturalEndpoints = StorageService.instance.getNaturalEndpoints(keyspaceName, tk); + Collection<InetAddressAndPort> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, keyspaceName); AbstractWriteResponseHandler<IMutation> responseHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, callback, writeType, queryStartNanoTime); @@ -1156,8 +1153,8 @@ public class StorageProxy implements StorageProxyMBean AbstractReplicationStrategy rs = keyspace.getReplicationStrategy(); String keyspaceName = mutation.getKeyspaceName(); Token tk = mutation.key().getToken(); - List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(keyspaceName, tk); - Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, keyspaceName); + List<InetAddressAndPort> naturalEndpoints = StorageService.instance.getNaturalEndpoints(keyspaceName, tk); + Collection<InetAddressAndPort> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, keyspaceName); AbstractWriteResponseHandler<IMutation> writeHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, null, writeType, queryStartNanoTime); BatchlogResponseHandler<IMutation> batchHandler = new BatchlogResponseHandler<>(writeHandler, batchConsistencyLevel.blockFor(keyspace), cleanup, queryStartNanoTime); return new WriteResponseHandlerWrapper(batchHandler, mutation); @@ -1170,7 +1167,7 @@ public class StorageProxy implements StorageProxyMBean private static WriteResponseHandlerWrapper wrapViewBatchResponseHandler(Mutation mutation, ConsistencyLevel consistency_level, ConsistencyLevel batchConsistencyLevel, - List<InetAddress> naturalEndpoints, + List<InetAddressAndPort> naturalEndpoints, AtomicLong baseComplete, WriteType writeType, BatchlogResponseHandler.BatchlogCleanup cleanup, @@ -1180,7 +1177,7 @@ public class StorageProxy implements StorageProxyMBean AbstractReplicationStrategy rs = keyspace.getReplicationStrategy(); String keyspaceName = mutation.getKeyspaceName(); Token tk = mutation.key().getToken(); - Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, keyspaceName); + Collection<InetAddressAndPort> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, keyspaceName); AbstractWriteResponseHandler<IMutation> writeHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, () -> { long delay = Math.max(0, System.currentTimeMillis() - baseComplete.get()); viewWriteMetrics.viewWriteLatency.update(delay, TimeUnit.MILLISECONDS); @@ -1209,18 +1206,18 @@ public class StorageProxy implements StorageProxyMBean * - choose min(2, number of qualifying candiates above) * - allow the local node to be the only replica only if it's a single-node DC */ - private static Collection<InetAddress> getBatchlogEndpoints(String localDataCenter, ConsistencyLevel consistencyLevel) + private static Collection<InetAddressAndPort> getBatchlogEndpoints(String localDataCenter, ConsistencyLevel consistencyLevel) throws UnavailableException { TokenMetadata.Topology topology = StorageService.instance.getTokenMetadata().cachedOnlyTokenMap().getTopology(); - Multimap<String, InetAddress> localEndpoints = HashMultimap.create(topology.getDatacenterRacks().get(localDataCenter)); - String localRack = DatabaseDescriptor.getEndpointSnitch().getRack(FBUtilities.getBroadcastAddress()); + Multimap<String, InetAddressAndPort> localEndpoints = HashMultimap.create(topology.getDatacenterRacks().get(localDataCenter)); + String localRack = DatabaseDescriptor.getEndpointSnitch().getRack(FBUtilities.getBroadcastAddressAndPort()); - Collection<InetAddress> chosenEndpoints = new BatchlogManager.EndpointFilter(localRack, localEndpoints).filter(); + Collection<InetAddressAndPort> chosenEndpoints = new BatchlogManager.EndpointFilter(localRack, localEndpoints).filter(); if (chosenEndpoints.isEmpty()) { if (consistencyLevel == ConsistencyLevel.ANY) - return Collections.singleton(FBUtilities.getBroadcastAddress()); + return Collections.singleton(FBUtilities.getBroadcastAddressAndPort()); throw new UnavailableException(ConsistencyLevel.ONE, 1, 0); } @@ -1246,7 +1243,7 @@ public class StorageProxy implements StorageProxyMBean * @throws OverloadedException if the hints cannot be written/enqueued */ public static void sendToHintedEndpoints(final Mutation mutation, - Iterable<InetAddress> targets, + Iterable<InetAddressAndPort> targets, AbstractWriteResponseHandler<IMutation> responseHandler, String localDataCenter, Stage stage) @@ -1255,18 +1252,18 @@ public class StorageProxy implements StorageProxyMBean int targetsSize = Iterables.size(targets); // this dc replicas: - Collection<InetAddress> localDc = null; + Collection<InetAddressAndPort> localDc = null; // extra-datacenter replicas, grouped by dc - Map<String, Collection<InetAddress>> dcGroups = null; + Map<String, Collection<InetAddressAndPort>> dcGroups = null; // only need to create a Message for non-local writes MessageOut<Mutation> message = null; boolean insertLocal = false; - ArrayList<InetAddress> endpointsToHint = null; + ArrayList<InetAddressAndPort> endpointsToHint = null; - List<InetAddress> backPressureHosts = null; + List<InetAddressAndPort> backPressureHosts = null; - for (InetAddress destination : targets) + for (InetAddressAndPort destination : targets) { checkHintOverload(destination); @@ -1295,7 +1292,7 @@ public class StorageProxy implements StorageProxyMBean } else { - Collection<InetAddress> messages = (dcGroups != null) ? dcGroups.get(dc) : null; + Collection<InetAddressAndPort> messages = (dcGroups != null) ? dcGroups.get(dc) : null; if (messages == null) { messages = new ArrayList<>(3); // most DCs will have <= 3 replicas @@ -1338,18 +1335,18 @@ public class StorageProxy implements StorageProxyMBean if (localDc != null) { - for (InetAddress destination : localDc) + for (InetAddressAndPort destination : localDc) MessagingService.instance().sendRR(message, destination, responseHandler, true); } if (dcGroups != null) { // for each datacenter, send the message to one node to relay the write to other replicas - for (Collection<InetAddress> dcTargets : dcGroups.values()) + for (Collection<InetAddressAndPort> dcTargets : dcGroups.values()) sendMessagesToNonlocalDC(message, dcTargets, responseHandler); } } - private static void checkHintOverload(InetAddress destination) + private static void checkHintOverload(InetAddressAndPort destination) { // avoid OOMing due to excess hints. we need to do this check even for "live" nodes, since we can // still generate hints for those if it's overloaded or simply dead but not yet known-to-be-dead. @@ -1366,39 +1363,31 @@ public class StorageProxy implements StorageProxyMBean } private static void sendMessagesToNonlocalDC(MessageOut<? extends IMutation> message, - Collection<InetAddress> targets, + Collection<InetAddressAndPort> targets, AbstractWriteResponseHandler<IMutation> handler) { - Iterator<InetAddress> iter = targets.iterator(); - InetAddress target = iter.next(); + Iterator<InetAddressAndPort> iter = targets.iterator(); + int[] messageIds = new int[targets.size()]; + InetAddressAndPort target = iter.next(); + int idIdx = 0; // Add the other destinations of the same message as a FORWARD_HEADER entry - try(DataOutputBuffer out = new DataOutputBuffer()) - { - out.writeInt(targets.size() - 1); - while (iter.hasNext()) - { - InetAddress destination = iter.next(); - CompactEndpointSerializationHelper.serialize(destination, out); - int id = MessagingService.instance().addCallback(handler, - message, - destination, - message.getTimeout(), - handler.consistencyLevel, - true); - out.writeInt(id); - logger.trace("Adding FWD message to {}@{}", id, destination); - } - message = message.withParameter(Mutation.FORWARD_TO, out.getData()); - // send the combined message + forward headers - int id = MessagingService.instance().sendRR(message, target, handler, true); - logger.trace("Sending message to {}@{}", id, target); - } - catch (IOException e) - { - // DataOutputBuffer is in-memory, doesn't throw IOException - throw new AssertionError(e); + while (iter.hasNext()) + { + InetAddressAndPort destination = iter.next(); + int id = MessagingService.instance().addCallback(handler, + message, + destination, + message.getTimeout(), + handler.consistencyLevel, + true); + messageIds[idIdx++] = id; + logger.trace("Adding FWD message to {}@{}", id, destination); } + message = message.withParameter(ParameterType.FORWARD_TO.FORWARD_TO, new ForwardToContainer(targets, messageIds)); + // send the combined message + forward headers + int id = MessagingService.instance().sendRR(message, target, handler, true); + logger.trace("Sending message to {}@{}", id, target); } private static void performLocally(Stage stage, final Runnable runnable) @@ -1440,7 +1429,7 @@ public class StorageProxy implements StorageProxyMBean { if (!(ex instanceof WriteTimeoutException)) logger.error("Failed to apply mutation locally : ", ex); - handler.onFailure(FBUtilities.getBroadcastAddress(), RequestFailureReason.UNKNOWN); + handler.onFailure(FBUtilities.getBroadcastAddressAndPort(), RequestFailureReason.UNKNOWN); } } @@ -1468,9 +1457,9 @@ public class StorageProxy implements StorageProxyMBean */ public static AbstractWriteResponseHandler<IMutation> mutateCounter(CounterMutation cm, String localDataCenter, long queryStartNanoTime) throws UnavailableException, OverloadedException { - InetAddress endpoint = findSuitableEndpoint(cm.getKeyspaceName(), cm.key(), localDataCenter, cm.consistency()); + InetAddressAndPort endpoint = findSuitableEndpoint(cm.getKeyspaceName(), cm.key(), localDataCenter, cm.consistency()); - if (endpoint.equals(FBUtilities.getBroadcastAddress())) + if (endpoint.equals(FBUtilities.getBroadcastAddressAndPort())) { return applyCounterMutationOnCoordinator(cm, localDataCenter, queryStartNanoTime); } @@ -1480,8 +1469,8 @@ public class StorageProxy implements StorageProxyMBean String keyspaceName = cm.getKeyspaceName(); AbstractReplicationStrategy rs = Keyspace.open(keyspaceName).getReplicationStrategy(); Token tk = cm.key().getToken(); - List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(keyspaceName, tk); - Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, keyspaceName); + List<InetAddressAndPort> naturalEndpoints = StorageService.instance.getNaturalEndpoints(keyspaceName, tk); + Collection<InetAddressAndPort> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, keyspaceName); rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, cm.consistency(), null, WriteType.COUNTER, queryStartNanoTime).assureSufficientLiveNodes(); @@ -1504,11 +1493,11 @@ public class StorageProxy implements StorageProxyMBean * is unclear we want to mix those latencies with read latencies, so this * may be a bit involved. */ - private static InetAddress findSuitableEndpoint(String keyspaceName, DecoratedKey key, String localDataCenter, ConsistencyLevel cl) throws UnavailableException + private static InetAddressAndPort findSuitableEndpoint(String keyspaceName, DecoratedKey key, String localDataCenter, ConsistencyLevel cl) throws UnavailableException { Keyspace keyspace = Keyspace.open(keyspaceName); IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch(); - List<InetAddress> endpoints = new ArrayList<>(); + List<InetAddressAndPort> endpoints = new ArrayList<>(); StorageService.instance.getLiveNaturalEndpoints(keyspace, key, endpoints); // CASSANDRA-13043: filter out those endpoints not accepting clients yet, maybe because still bootstrapping @@ -1518,9 +1507,9 @@ public class StorageProxy implements StorageProxyMBean if (endpoints.isEmpty()) throw new UnavailableException(cl, cl.blockFor(keyspace), 0); - List<InetAddress> localEndpoints = new ArrayList<>(endpoints.size()); + List<InetAddressAndPort> localEndpoints = new ArrayList<>(endpoints.size()); - for (InetAddress endpoint : endpoints) + for (InetAddressAndPort endpoint : endpoints) if (snitch.getDatacenter(endpoint).equals(localDataCenter)) localEndpoints.add(endpoint); @@ -1531,7 +1520,7 @@ public class StorageProxy implements StorageProxyMBean throw new UnavailableException(cl, cl.blockFor(keyspace), 0); // No endpoint in local DC, pick the closest endpoint according to the snitch - snitch.sortByProximity(FBUtilities.getBroadcastAddress(), endpoints); + snitch.sortByProximity(FBUtilities.getBroadcastAddressAndPort(), endpoints); return endpoints.get(0); } @@ -1555,7 +1544,7 @@ public class StorageProxy implements StorageProxyMBean } private static Runnable counterWriteTask(final IMutation mutation, - final Iterable<InetAddress> targets, + final Iterable<InetAddressAndPort> targets, final AbstractWriteResponseHandler<IMutation> responseHandler, final String localDataCenter) { @@ -1569,8 +1558,8 @@ public class StorageProxy implements StorageProxyMBean Mutation result = ((CounterMutation) mutation).applyCounterMutation(); responseHandler.response(null); - Set<InetAddress> remotes = Sets.difference(ImmutableSet.copyOf(targets), - ImmutableSet.of(FBUtilities.getBroadcastAddress())); + Set<InetAddressAndPort> remotes = Sets.difference(ImmutableSet.copyOf(targets), + ImmutableSet.of(FBUtilities.getBroadcastAddressAndPort())); if (!remotes.isEmpty()) sendToHintedEndpoints(result, remotes, responseHandler, localDataCenter, Stage.COUNTER_MUTATION); } @@ -1640,8 +1629,8 @@ public class StorageProxy implements StorageProxyMBean try { // make sure any in-progress paxos writes are done (i.e., committed to a majority of replicas), before performing a quorum read - Pair<List<InetAddress>, Integer> p = getPaxosParticipants(metadata, key, consistencyLevel); - List<InetAddress> liveEndpoints = p.left; + Pair<List<InetAddressAndPort>, Integer> p = getPaxosParticipants(metadata, key, consistencyLevel); + List<InetAddressAndPort> liveEndpoints = p.left; int requiredParticipants = p.right; // does the work of applying in-progress writes; throws UAE or timeout if it can't @@ -1844,7 +1833,7 @@ public class StorageProxy implements StorageProxyMBean executor.handler.endpoints, queryStartNanoTime); - for (InetAddress endpoint : executor.getContactedReplicas()) + for (InetAddressAndPort endpoint : executor.getContactedReplicas()) { Tracing.trace("Enqueuing full data read to {}", endpoint); MessagingService.instance().sendRRWithFailure(command.createMessage(), endpoint, repairHandler); @@ -1920,47 +1909,47 @@ public class StorageProxy implements StorageProxyMBean else { MessagingService.instance().incrementDroppedMessages(verb, System.currentTimeMillis() - constructionTime); - handler.onFailure(FBUtilities.getBroadcastAddress(), RequestFailureReason.UNKNOWN); + handler.onFailure(FBUtilities.getBroadcastAddressAndPort(), RequestFailureReason.UNKNOWN); } - MessagingService.instance().addLatency(FBUtilities.getBroadcastAddress(), TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)); + MessagingService.instance().addLatency(FBUtilities.getBroadcastAddressAndPort(), TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)); } catch (Throwable t) { if (t instanceof TombstoneOverwhelmingException) { - handler.onFailure(FBUtilities.getBroadcastAddress(), RequestFailureReason.READ_TOO_MANY_TOMBSTONES); + handler.onFailure(FBUtilities.getBroadcastAddressAndPort(), RequestFailureReason.READ_TOO_MANY_TOMBSTONES); logger.error(t.getMessage()); } else { - handler.onFailure(FBUtilities.getBroadcastAddress(), RequestFailureReason.UNKNOWN); + handler.onFailure(FBUtilities.getBroadcastAddressAndPort(), RequestFailureReason.UNKNOWN); throw t; } } } } - public static List<InetAddress> getLiveSortedEndpoints(Keyspace keyspace, ByteBuffer key) + public static List<InetAddressAndPort> getLiveSortedEndpoints(Keyspace keyspace, ByteBuffer key) { return getLiveSortedEndpoints(keyspace, StorageService.instance.getTokenMetadata().decorateKey(key)); } - public static List<InetAddress> getLiveSortedEndpoints(Keyspace keyspace, RingPosition pos) + public static List<InetAddressAndPort> getLiveSortedEndpoints(Keyspace keyspace, RingPosition pos) { - List<InetAddress> liveEndpoints = StorageService.instance.getLiveNaturalEndpoints(keyspace, pos); - DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getBroadcastAddress(), liveEndpoints); + List<InetAddressAndPort> liveEndpoints = StorageService.instance.getLiveNaturalEndpoints(keyspace, pos); + DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getBroadcastAddressAndPort(), liveEndpoints); return liveEndpoints; } - private static List<InetAddress> intersection(List<InetAddress> l1, List<InetAddress> l2) + private static List<InetAddressAndPort> intersection(List<InetAddressAndPort> l1, List<InetAddressAndPort> l2) { // Note: we don't use Guava Sets.intersection() for 3 reasons: // 1) retainAll would be inefficient if l1 and l2 are large but in practice both are the replicas for a range and // so will be very small (< RF). In that case, retainAll is in fact more efficient. // 2) we do ultimately need a list so converting everything to sets don't make sense // 3) l1 and l2 are sorted by proximity. The use of retainAll maintain that sorting in the result, while using sets wouldn't. - List<InetAddress> inter = new ArrayList<InetAddress>(l1); + List<InetAddressAndPort> inter = new ArrayList<>(l1); inter.retainAll(l2); return inter; } @@ -1986,10 +1975,10 @@ public class StorageProxy implements StorageProxyMBean private static class RangeForQuery { public final AbstractBounds<PartitionPosition> range; - public final List<InetAddress> liveEndpoints; - public final List<InetAddress> filteredEndpoints; + public final List<InetAddressAndPort> liveEndpoints; + public final List<InetAddressAndPort> filteredEndpoints; - public RangeForQuery(AbstractBounds<PartitionPosition> range, List<InetAddress> liveEndpoints, List<InetAddress> filteredEndpoints) + public RangeForQuery(AbstractBounds<PartitionPosition> range, List<InetAddressAndPort> liveEndpoints, List<InetAddressAndPort> filteredEndpoints) { this.range = range; this.liveEndpoints = liveEndpoints; @@ -2027,7 +2016,7 @@ public class StorageProxy implements StorageProxyMBean return endOfData(); AbstractBounds<PartitionPosition> range = ranges.next(); - List<InetAddress> liveEndpoints = getLiveSortedEndpoints(keyspace, range.right); + List<InetAddressAndPort> liveEndpoints = getLiveSortedEndpoints(keyspace, range.right); return new RangeForQuery(range, liveEndpoints, consistency.filterForQuery(keyspace, liveEndpoints)); @@ -2069,13 +2058,13 @@ public class StorageProxy implements StorageProxyMBean RangeForQuery next = ranges.peek(); - List<InetAddress> merged = intersection(current.liveEndpoints, next.liveEndpoints); + List<InetAddressAndPort> merged = intersection(current.liveEndpoints, next.liveEndpoints); // Check if there is enough endpoint for the merge to be possible. if (!consistency.isSufficientLiveNodes(keyspace, merged)) break; - List<InetAddress> filteredMerged = consistency.filterForQuery(keyspace, merged); + List<InetAddressAndPort> filteredMerged = consistency.filterForQuery(keyspace, merged); // Estimate whether merging will be a win or not if (!DatabaseDescriptor.getEndpointSnitch().isWorthMergingForRangeQuery(filteredMerged, current.filteredEndpoints, next.filteredEndpoints)) @@ -2237,7 +2226,7 @@ public class StorageProxy implements StorageProxyMBean int blockFor = consistency.blockFor(keyspace); int minResponses = Math.min(toQuery.filteredEndpoints.size(), blockFor); - List<InetAddress> minimalEndpoints = toQuery.filteredEndpoints.subList(0, minResponses); + List<InetAddressAndPort> minimalEndpoints = toQuery.filteredEndpoints.subList(0, minResponses); ReadCallback handler = new ReadCallback(resolver, consistency, rangeCommand, minimalEndpoints, queryStartNanoTime); handler.assureSufficientLiveNodes(); @@ -2248,7 +2237,7 @@ public class StorageProxy implements StorageProxyMBean } else { - for (InetAddress endpoint : toQuery.filteredEndpoints) + for (InetAddressAndPort endpoint : toQuery.filteredEndpoints) { Tracing.trace("Enqueuing request to {}", endpoint); MessagingService.instance().sendRRWithFailure(rangeCommand.createMessage(), endpoint, handler); @@ -2320,7 +2309,12 @@ public class StorageProxy implements StorageProxyMBean public Map<String, List<String>> getSchemaVersions() { - return describeSchemaVersions(); + return describeSchemaVersions(false); + } + + public Map<String, List<String>> getSchemaVersionsWithPort() + { + return describeSchemaVersions(true); } /** @@ -2328,11 +2322,11 @@ public class StorageProxy implements StorageProxyMBean * migration id. This is useful for determining if a schema change has propagated through the cluster. Disagreement * is assumed if any node fails to respond. */ - public static Map<String, List<String>> describeSchemaVersions() + public static Map<String, List<String>> describeSchemaVersions(boolean withPort) { final String myVersion = Schema.instance.getVersion().toString(); - final Map<InetAddress, UUID> versions = new ConcurrentHashMap<InetAddress, UUID>(); - final Set<InetAddress> liveHosts = Gossiper.instance.getLiveMembers(); + final Map<InetAddressAndPort, UUID> versions = new ConcurrentHashMap<>(); + final Set<InetAddressAndPort> liveHosts = Gossiper.instance.getLiveMembers(); final CountDownLatch latch = new CountDownLatch(liveHosts.size()); IAsyncCallback<UUID> cb = new IAsyncCallback<UUID>() @@ -2351,7 +2345,7 @@ public class StorageProxy implements StorageProxyMBean }; // an empty message acts as a request to the SchemaVersionVerbHandler. MessageOut message = new MessageOut(MessagingService.Verb.SCHEMA_CHECK); - for (InetAddress endpoint : liveHosts) + for (InetAddressAndPort endpoint : liveHosts) MessagingService.instance().sendRR(message, endpoint, cb); try @@ -2366,8 +2360,8 @@ public class StorageProxy implements StorageProxyMBean // maps versions to hosts that are on that version. Map<String, List<String>> results = new HashMap<String, List<String>>(); - Iterable<InetAddress> allHosts = Iterables.concat(Gossiper.instance.getLiveMembers(), Gossiper.instance.getUnreachableMembers()); - for (InetAddress host : allHosts) + Iterable<InetAddressAndPort> allHosts = Iterables.concat(Gossiper.instance.getLiveMembers(), Gossiper.instance.getUnreachableMembers()); + for (InetAddressAndPort host : allHosts) { UUID version = versions.get(host); String stringVersion = version == null ? UNREACHABLE : version.toString(); @@ -2377,7 +2371,7 @@ public class StorageProxy implements StorageProxyMBean hosts = new ArrayList<String>(); results.put(stringVersion, hosts); } - hosts.add(host.getHostAddress()); + hosts.add(host.getHostAddress(withPort)); } // we're done: the results map is ready to return to the client. the rest is just debug logging: @@ -2485,7 +2479,7 @@ public class StorageProxy implements StorageProxyMBean DatabaseDescriptor.setMaxHintWindow(ms); } - public static boolean shouldHint(InetAddress ep) + public static boolean shouldHint(InetAddressAndPort ep) { if (DatabaseDescriptor.hintedHandoffEnabled()) { @@ -2534,7 +2528,7 @@ public class StorageProxy implements StorageProxyMBean throw new UnavailableException(ConsistencyLevel.ALL, liveMembers + Gossiper.instance.getUnreachableMembers().size(), liveMembers); } - Set<InetAddress> allEndpoints = StorageService.instance.getLiveRingMembers(true); + Set<InetAddressAndPort> allEndpoints = StorageService.instance.getLiveRingMembers(true); int blockFor = allEndpoints.size(); final TruncateResponseHandler responseHandler = new TruncateResponseHandler(blockFor); @@ -2543,7 +2537,7 @@ public class StorageProxy implements StorageProxyMBean Tracing.trace("Enqueuing truncate messages to hosts {}", allEndpoints); final Truncation truncation = new Truncation(keyspace, cfname); MessageOut<Truncation> message = truncation.createMessage(); - for (InetAddress endpoint : allEndpoints) + for (InetAddressAndPort endpoint : allEndpoints) MessagingService.instance().sendRR(message, endpoint, responseHandler); // Wait for all @@ -2570,7 +2564,7 @@ public class StorageProxy implements StorageProxyMBean public interface WritePerformer { public void apply(IMutation mutation, - Iterable<InetAddress> targets, + Iterable<InetAddressAndPort> targets, AbstractWriteResponseHandler<IMutation> responseHandler, String localDataCenter, ConsistencyLevel consistencyLevel) throws OverloadedException; @@ -2658,7 +2652,7 @@ public class StorageProxy implements StorageProxyMBean { if (MessagingService.DROPPABLE_VERBS.contains(verb)) MessagingService.instance().incrementDroppedMutations(mutationOpt, timeTaken); - HintRunnable runnable = new HintRunnable(Collections.singleton(FBUtilities.getBroadcastAddress())) + HintRunnable runnable = new HintRunnable(Collections.singleton(FBUtilities.getBroadcastAddressAndPort())) { protected void runMayThrow() throws Exception { @@ -2689,9 +2683,9 @@ public class StorageProxy implements StorageProxyMBean */ private abstract static class HintRunnable implements Runnable { - public final Collection<InetAddress> targets; + public final Collection<InetAddressAndPort> targets; - protected HintRunnable(Collection<InetAddress> targets) + protected HintRunnable(Collection<InetAddressAndPort> targets) { this.targets = targets; } @@ -2709,7 +2703,7 @@ public class StorageProxy implements StorageProxyMBean finally { StorageMetrics.totalHintsInProgress.dec(targets.size()); - for (InetAddress target : targets) + for (InetAddressAndPort target : targets) getHintsInProgressFor(target).decrementAndGet(); } } @@ -2743,7 +2737,7 @@ public class StorageProxy implements StorageProxyMBean logger.warn("Some hints were not written before shutdown. This is not supposed to happen. You should (a) run repair, and (b) file a bug report"); } - private static AtomicInteger getHintsInProgressFor(InetAddress destination) + private static AtomicInteger getHintsInProgressFor(InetAddressAndPort destination) { try { @@ -2755,22 +2749,22 @@ public class StorageProxy implements StorageProxyMBean } } - public static Future<Void> submitHint(Mutation mutation, InetAddress target, AbstractWriteResponseHandler<IMutation> responseHandler) + public static Future<Void> submitHint(Mutation mutation, InetAddressAndPort target, AbstractWriteResponseHandler<IMutation> responseHandler) { return submitHint(mutation, Collections.singleton(target), responseHandler); } public static Future<Void> submitHint(Mutation mutation, - Collection<InetAddress> targets, + Collection<InetAddressAndPort> targets, AbstractWriteResponseHandler<IMutation> responseHandler) { HintRunnable runnable = new HintRunnable(targets) { public void runMayThrow() { - Set<InetAddress> validTargets = new HashSet<>(targets.size()); + Set<InetAddressAndPort> validTargets = new HashSet<>(targets.size()); Set<UUID> hostIds = new HashSet<>(targets.size()); - for (InetAddress target : targets) + for (InetAddressAndPort target : targets) { UUID hostId = StorageService.instance.getHostIdForEndpoint(target); if (hostId != null) @@ -2796,7 +2790,7 @@ public class StorageProxy implements StorageProxyMBean private static Future<Void> submitHint(HintRunnable runnable) { StorageMetrics.totalHintsInProgress.inc(runnable.targets.size()); - for (InetAddress target : runnable.targets) + for (InetAddressAndPort target : runnable.targets) getHintsInProgressFor(target).incrementAndGet(); return (Future<Void>) StageManager.getStage(Stage.MUTATION).submit(runnable); }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/service/StorageProxyMBean.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxyMBean.java b/src/java/org/apache/cassandra/service/StorageProxyMBean.java index 173d43f..76a6617 100644 --- a/src/java/org/apache/cassandra/service/StorageProxyMBean.java +++ b/src/java/org/apache/cassandra/service/StorageProxyMBean.java @@ -65,7 +65,8 @@ public interface StorageProxyMBean public void setOtcBacklogExpirationInterval(int intervalInMillis); /** Returns each live node's schema version */ - public Map<String, List<String>> getSchemaVersions(); + @Deprecated public Map<String, List<String>> getSchemaVersions(); + public Map<String, List<String>> getSchemaVersionsWithPort(); public int getNumberOfTables(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org