http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/service/StorageServiceMBean.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java index 48e1b2f..29b45b4 100644 --- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java +++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java @@ -37,7 +37,8 @@ public interface StorageServiceMBean extends NotificationEmitter * * @return set of IP addresses, as Strings */ - public List<String> getLiveNodes(); + @Deprecated public List<String> getLiveNodes(); + public List<String> getLiveNodesWithPort(); /** * Retrieve the list of unreachable nodes in the cluster, as determined @@ -45,28 +46,32 @@ public interface StorageServiceMBean extends NotificationEmitter * * @return set of IP addresses, as Strings */ - public List<String> getUnreachableNodes(); + @Deprecated public List<String> getUnreachableNodes(); + public List<String> getUnreachableNodesWithPort(); /** * Retrieve the list of nodes currently bootstrapping into the ring. * * @return set of IP addresses, as Strings */ - public List<String> getJoiningNodes(); + @Deprecated public List<String> getJoiningNodes(); + public List<String> getJoiningNodesWithPort(); /** * Retrieve the list of nodes currently leaving the ring. * * @return set of IP addresses, as Strings */ - public List<String> getLeavingNodes(); + @Deprecated public List<String> getLeavingNodes(); + public List<String> getLeavingNodesWithPort(); /** * Retrieve the list of nodes currently moving in the ring. * * @return set of IP addresses, as Strings */ - public List<String> getMovingNodes(); + @Deprecated public List<String> getMovingNodes(); + public List<String> getMovingNodesWithPort(); /** * Fetch string representations of the tokens for this node. @@ -120,7 +125,8 @@ public interface StorageServiceMBean extends NotificationEmitter * * @return mapping of ranges to end points */ - public Map<List<String>, List<String>> getRangeToEndpointMap(String keyspace); + @Deprecated public Map<List<String>, List<String>> getRangeToEndpointMap(String keyspace); + public Map<List<String>, List<String>> getRangeToEndpointWithPortMap(String keyspace); /** * Retrieve a map of range to rpc addresses that describe the ring topology @@ -128,7 +134,8 @@ public interface StorageServiceMBean extends NotificationEmitter * * @return mapping of ranges to rpc addresses */ - public Map<List<String>, List<String>> getRangeToRpcaddressMap(String keyspace); + @Deprecated public Map<List<String>, List<String>> getRangeToRpcaddressMap(String keyspace); + public Map<List<String>, List<String>> getRangeToNativeaddressWithPortMap(String keyspace); /** * The same as {@code describeRing(String)} but converts TokenRange to the String for JMX compatibility @@ -137,14 +144,16 @@ public interface StorageServiceMBean extends NotificationEmitter * * @return a List of TokenRange(s) converted to String for the given keyspace */ - public List <String> describeRingJMX(String keyspace) throws IOException; + @Deprecated public List <String> describeRingJMX(String keyspace) throws IOException; + public List<String> describeRingWithPortJMX(String keyspace) throws IOException; /** * Retrieve a map of pending ranges to endpoints that describe the ring topology * @param keyspace the keyspace to get the pending range map for. * @return a map of pending ranges to endpoints */ - public Map<List<String>, List<String>> getPendingRangeToEndpointMap(String keyspace); + @Deprecated public Map<List<String>, List<String>> getPendingRangeToEndpointMap(String keyspace); + public Map<List<String>, List<String>> getPendingRangeToEndpointWithPortMap(String keyspace); /** * Retrieve a map of tokens to endpoints, including the bootstrapping @@ -152,7 +161,8 @@ public interface StorageServiceMBean extends NotificationEmitter * * @return a map of tokens to endpoints in ascending order */ - public Map<String, String> getTokenToEndpointMap(); + @Deprecated public Map<String, String> getTokenToEndpointMap(); + public Map<String, String> getTokenToEndpointWithPortMap(); /** Retrieve this hosts unique ID */ public String getLocalHostId(); @@ -162,16 +172,19 @@ public interface StorageServiceMBean extends NotificationEmitter public Map<String, String> getHostIdMap(); /** Retrieve the mapping of endpoint to host ID */ - public Map<String, String> getEndpointToHostId(); + @Deprecated public Map<String, String> getEndpointToHostId(); + public Map<String, String> getEndpointWithPortToHostId(); /** Retrieve the mapping of host ID to endpoint */ - public Map<String, String> getHostIdToEndpoint(); + @Deprecated public Map<String, String> getHostIdToEndpoint(); + public Map<String, String> getHostIdToEndpointWithPort(); /** Human-readable load value */ public String getLoadString(); /** Human-readable load value. Keys are IP addresses. */ - public Map<String, String> getLoadMap(); + @Deprecated public Map<String, String> getLoadMap(); + public Map<String, String> getLoadMapWithPort(); /** * Return the generation value for this node. @@ -189,8 +202,10 @@ public interface StorageServiceMBean extends NotificationEmitter * @param key - key for which we need to find the endpoint return value - * the endpoint responsible for this key */ - public List<InetAddress> getNaturalEndpoints(String keyspaceName, String cf, String key); - public List<InetAddress> getNaturalEndpoints(String keyspaceName, ByteBuffer key); + @Deprecated public List<InetAddress> getNaturalEndpoints(String keyspaceName, String cf, String key); + public List<String> getNaturalEndpointsWithPort(String keyspaceName, String cf, String key); + @Deprecated public List<InetAddress> getNaturalEndpoints(String keyspaceName, ByteBuffer key); + public List<String> getNaturalEndpointsWithPort(String keysapceName, ByteBuffer key); /** * @deprecated use {@link #takeSnapshot(String tag, Map options, String... entities)} instead. @@ -353,7 +368,8 @@ public interface StorageServiceMBean extends NotificationEmitter /** * Get the status of a token removal. */ - public String getRemovalStatus(); + @Deprecated public String getRemovalStatus(); + public String getRemovalStatusWithPort(); /** * Force a remove operation to finish. @@ -408,7 +424,8 @@ public interface StorageServiceMBean extends NotificationEmitter * given a list of tokens (representing the nodes in the cluster), returns * a mapping from {@code "token -> %age of cluster owned by that token"} */ - public Map<InetAddress, Float> getOwnership(); + @Deprecated public Map<InetAddress, Float> getOwnership(); + public Map<String, Float> getOwnershipWithPort(); /** * Effective ownership is % of the data each node owns given the keyspace @@ -417,7 +434,8 @@ public interface StorageServiceMBean extends NotificationEmitter * in the cluster have the same replication strategies and if yes then we will * use the first else a empty Map is returned. */ - public Map<InetAddress, Float> effectiveOwnership(String keyspace) throws IllegalStateException; + @Deprecated public Map<InetAddress, Float> effectiveOwnership(String keyspace) throws IllegalStateException; + public Map<String, Float> effectiveOwnershipWithPort(String keyspace) throws IllegalStateException; public List<String> getKeyspaces(); @@ -425,7 +443,8 @@ public interface StorageServiceMBean extends NotificationEmitter public List<String> getNonLocalStrategyKeyspaces(); - public Map<String, String> getViewBuildStatuses(String keyspace, String view); + @Deprecated public Map<String, String> getViewBuildStatuses(String keyspace, String view); + public Map<String, String> getViewBuildStatusesWithPort(String keyspace, String view); /** * Change endpointsnitch class and dynamic-ness (and dynamic attributes) at runtime.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/service/TokenRange.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/TokenRange.java b/src/java/org/apache/cassandra/service/TokenRange.java index 0e46910..a1f9aee 100644 --- a/src/java/org/apache/cassandra/service/TokenRange.java +++ b/src/java/org/apache/cassandra/service/TokenRange.java @@ -17,13 +17,13 @@ */ package org.apache.cassandra.service; -import java.net.InetAddress; import java.util.*; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.locator.IEndpointSnitch; +import org.apache.cassandra.locator.InetAddressAndPort; /** * Holds token range informations for the sake of {@link StorageService#describeRing}. @@ -54,13 +54,13 @@ public class TokenRange return tokenFactory.toString(tk); } - public static TokenRange create(Token.TokenFactory tokenFactory, Range<Token> range, List<InetAddress> endpoints) + public static TokenRange create(Token.TokenFactory tokenFactory, Range<Token> range, List<InetAddressAndPort> endpoints, boolean withPorts) { List<EndpointDetails> details = new ArrayList<>(endpoints.size()); IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch(); - for (InetAddress ep : endpoints) + for (InetAddressAndPort ep : endpoints) details.add(new EndpointDetails(ep, - StorageService.instance.getRpcaddress(ep), + StorageService.instance.getNativeaddress(ep, withPorts), snitch.getDatacenter(ep), snitch.getRack(ep))); return new TokenRange(tokenFactory, range, details); @@ -69,6 +69,11 @@ public class TokenRange @Override public String toString() { + return toString(false); + } + + public String toString(boolean withPorts) + { StringBuilder sb = new StringBuilder("TokenRange("); sb.append("start_token:").append(toStr(range.left)); @@ -76,33 +81,43 @@ public class TokenRange List<String> hosts = new ArrayList<>(endpoints.size()); List<String> rpcs = new ArrayList<>(endpoints.size()); + List<String> endpointDetails = new ArrayList<>(endpoints.size()); for (EndpointDetails ep : endpoints) { - hosts.add(ep.host.getHostAddress()); - rpcs.add(ep.rpcAddress); + hosts.add(ep.host.getHostAddress(withPorts)); + rpcs.add(ep.nativeAddress); + endpointDetails.add(ep.toString(withPorts)); } - sb.append("endpoints:").append(hosts); - sb.append("rpc_endpoints:").append(rpcs); - sb.append("endpoint_details:").append(endpoints); - + if (withPorts) + { + sb.append(", endpoints:").append(hosts); + sb.append(", rpc_endpoints:").append(rpcs); + sb.append(", endpoint_details:").append(endpointDetails); + } + else + { + sb.append("endpoints:").append(hosts); + sb.append("rpc_endpoints:").append(rpcs); + sb.append("endpoint_details:").append(endpointDetails); + } sb.append(")"); return sb.toString(); } public static class EndpointDetails { - public final InetAddress host; - public final String rpcAddress; + public final InetAddressAndPort host; + public final String nativeAddress; public final String datacenter; public final String rack; - private EndpointDetails(InetAddress host, String rpcAddress, String datacenter, String rack) + private EndpointDetails(InetAddressAndPort host, String nativeAddress, String datacenter, String rack) { // dc and rack can be null, but host shouldn't assert host != null; this.host = host; - this.rpcAddress = rpcAddress; + this.nativeAddress = nativeAddress; this.datacenter = datacenter; this.rack = rack; } @@ -110,10 +125,15 @@ public class TokenRange @Override public String toString() { + return toString(false); + } + + public String toString(boolean withPorts) + { // Format matters for backward compatibility with describeRing() String dcStr = datacenter == null ? "" : String.format(", datacenter:%s", datacenter); String rackStr = rack == null ? "" : String.format(", rack:%s", rack); - return String.format("EndpointDetails(host:%s%s%s)", host.getHostAddress(), dcStr, rackStr); + return String.format("EndpointDetails(host:%s%s%s)", host.getHostAddress(withPorts), dcStr, rackStr); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/service/WriteResponseHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/WriteResponseHandler.java b/src/java/org/apache/cassandra/service/WriteResponseHandler.java index 55ca5aa..65efeff 100644 --- a/src/java/org/apache/cassandra/service/WriteResponseHandler.java +++ b/src/java/org/apache/cassandra/service/WriteResponseHandler.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.service; -import java.net.InetAddress; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -27,6 +26,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.db.WriteType; @@ -42,8 +42,8 @@ public class WriteResponseHandler<T> extends AbstractWriteResponseHandler<T> private static final AtomicIntegerFieldUpdater<WriteResponseHandler> responsesUpdater = AtomicIntegerFieldUpdater.newUpdater(WriteResponseHandler.class, "responses"); - public WriteResponseHandler(Collection<InetAddress> writeEndpoints, - Collection<InetAddress> pendingEndpoints, + public WriteResponseHandler(Collection<InetAddressAndPort> writeEndpoints, + Collection<InetAddressAndPort> pendingEndpoints, ConsistencyLevel consistencyLevel, Keyspace keyspace, Runnable callback, @@ -54,12 +54,12 @@ public class WriteResponseHandler<T> extends AbstractWriteResponseHandler<T> responses = totalBlockFor(); } - public WriteResponseHandler(InetAddress endpoint, WriteType writeType, Runnable callback, long queryStartNanoTime) + public WriteResponseHandler(InetAddressAndPort endpoint, WriteType writeType, Runnable callback, long queryStartNanoTime) { - this(Arrays.asList(endpoint), Collections.<InetAddress>emptyList(), ConsistencyLevel.ONE, null, callback, writeType, queryStartNanoTime); + this(Arrays.asList(endpoint), Collections.<InetAddressAndPort>emptyList(), ConsistencyLevel.ONE, null, callback, writeType, queryStartNanoTime); } - public WriteResponseHandler(InetAddress endpoint, WriteType writeType, long queryStartNanoTime) + public WriteResponseHandler(InetAddressAndPort endpoint, WriteType writeType, long queryStartNanoTime) { this(endpoint, writeType, null, queryStartNanoTime); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java b/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java index 381c498..ed70e96 100644 --- a/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java +++ b/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java @@ -21,7 +21,6 @@ package org.apache.cassandra.service.paxos; */ -import java.net.InetAddress; import java.util.Collections; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -29,6 +28,7 @@ import java.util.concurrent.ConcurrentHashMap; import com.google.common.base.Predicate; import com.google.common.collect.Iterables; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.db.DecoratedKey; @@ -48,7 +48,7 @@ public class PrepareCallback extends AbstractPaxosCallback<PrepareResponse> public Commit mostRecentInProgressCommit; public Commit mostRecentInProgressCommitWithUpdate; - private final Map<InetAddress, Commit> commitsByReplica = new ConcurrentHashMap<InetAddress, Commit>(); + private final Map<InetAddressAndPort, Commit> commitsByReplica = new ConcurrentHashMap<>(); public PrepareCallback(DecoratedKey key, TableMetadata metadata, int targets, ConsistencyLevel consistency, long queryStartNanoTime) { @@ -90,7 +90,7 @@ public class PrepareCallback extends AbstractPaxosCallback<PrepareResponse> latch.countDown(); } - public Iterable<InetAddress> replicasMissingMostRecentCommit(TableMetadata metadata, int nowInSec) + public Iterable<InetAddressAndPort> replicasMissingMostRecentCommit(TableMetadata metadata, int nowInSec) { // In general, we need every replicas that have answered to the prepare (a quorum) to agree on the MRC (see // coment in StorageProxy.beginAndRepairPaxos(), but basically we need to make sure at least a quorum of nodes @@ -105,9 +105,9 @@ public class PrepareCallback extends AbstractPaxosCallback<PrepareResponse> if (UUIDGen.unixTimestampInSec(mostRecentCommit.ballot) + paxosTtlSec < nowInSec) return Collections.emptySet(); - return Iterables.filter(commitsByReplica.keySet(), new Predicate<InetAddress>() + return Iterables.filter(commitsByReplica.keySet(), new Predicate<InetAddressAndPort>() { - public boolean apply(InetAddress inetAddress) + public boolean apply(InetAddressAndPort inetAddress) { return (!commitsByReplica.get(inetAddress).ballot.equals(mostRecentCommit.ballot)); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/streaming/ProgressInfo.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/ProgressInfo.java b/src/java/org/apache/cassandra/streaming/ProgressInfo.java index fdd3e97..2334599 100644 --- a/src/java/org/apache/cassandra/streaming/ProgressInfo.java +++ b/src/java/org/apache/cassandra/streaming/ProgressInfo.java @@ -18,10 +18,11 @@ package org.apache.cassandra.streaming; import java.io.Serializable; -import java.net.InetAddress; import com.google.common.base.Objects; +import org.apache.cassandra.locator.InetAddressAndPort; + /** * ProgressInfo contains file transfer progress. */ @@ -48,14 +49,14 @@ public class ProgressInfo implements Serializable } } - public final InetAddress peer; + public final InetAddressAndPort peer; public final int sessionIndex; public final String fileName; public final Direction direction; public final long currentBytes; public final long totalBytes; - public ProgressInfo(InetAddress peer, int sessionIndex, String fileName, Direction direction, long currentBytes, long totalBytes) + public ProgressInfo(InetAddressAndPort peer, int sessionIndex, String fileName, Direction direction, long currentBytes, long totalBytes) { assert totalBytes > 0; @@ -102,13 +103,18 @@ public class ProgressInfo implements Serializable @Override public String toString() { + return toString(false); + } + + public String toString(boolean withPorts) + { StringBuilder sb = new StringBuilder(fileName); sb.append(" ").append(currentBytes); sb.append("/").append(totalBytes).append(" bytes"); sb.append("(").append(currentBytes*100/totalBytes).append("%) "); sb.append(direction == Direction.OUT ? "sent to " : "received from "); sb.append("idx:").append(sessionIndex); - sb.append(peer); + sb.append(peer.toString(withPorts)); return sb.toString(); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/streaming/SessionInfo.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/SessionInfo.java b/src/java/org/apache/cassandra/streaming/SessionInfo.java index 1521614..bbca753 100644 --- a/src/java/org/apache/cassandra/streaming/SessionInfo.java +++ b/src/java/org/apache/cassandra/streaming/SessionInfo.java @@ -18,7 +18,6 @@ package org.apache.cassandra.streaming; import java.io.Serializable; -import java.net.InetAddress; import java.util.Collection; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -27,6 +26,7 @@ import com.google.common.base.Predicate; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.utils.FBUtilities; /** @@ -34,9 +34,9 @@ import org.apache.cassandra.utils.FBUtilities; */ public final class SessionInfo implements Serializable { - public final InetAddress peer; + public final InetAddressAndPort peer; public final int sessionIndex; - public final InetAddress connecting; + public final InetAddressAndPort connecting; /** Immutable collection of receiving summaries */ public final Collection<StreamSummary> receivingSummaries; /** Immutable collection of sending summaries*/ @@ -47,9 +47,9 @@ public final class SessionInfo implements Serializable private final Map<String, ProgressInfo> receivingFiles; private final Map<String, ProgressInfo> sendingFiles; - public SessionInfo(InetAddress peer, + public SessionInfo(InetAddressAndPort peer, int sessionIndex, - InetAddress connecting, + InetAddressAndPort connecting, Collection<StreamSummary> receivingSummaries, Collection<StreamSummary> sendingSummaries, StreamSession.State state) @@ -195,6 +195,6 @@ public final class SessionInfo implements Serializable public SessionSummary createSummary() { - return new SessionSummary(FBUtilities.getBroadcastAddress(), peer, receivingSummaries, sendingSummaries); + return new SessionSummary(FBUtilities.getBroadcastAddressAndPort(), peer, receivingSummaries, sendingSummaries); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/streaming/SessionSummary.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/SessionSummary.java b/src/java/org/apache/cassandra/streaming/SessionSummary.java index d52c2ca..cf63a57 100644 --- a/src/java/org/apache/cassandra/streaming/SessionSummary.java +++ b/src/java/org/apache/cassandra/streaming/SessionSummary.java @@ -19,7 +19,6 @@ package org.apache.cassandra.streaming; import java.io.IOException; -import java.net.InetAddress; import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -28,19 +27,19 @@ import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; -import org.apache.cassandra.serializers.InetAddressSerializer; -import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.net.CompactEndpointSerializationHelper; public class SessionSummary { - public final InetAddress coordinator; - public final InetAddress peer; + public final InetAddressAndPort coordinator; + public final InetAddressAndPort peer; /** Immutable collection of receiving summaries */ public final Collection<StreamSummary> receivingSummaries; /** Immutable collection of sending summaries*/ public final Collection<StreamSummary> sendingSummaries; - public SessionSummary(InetAddress coordinator, InetAddress peer, + public SessionSummary(InetAddressAndPort coordinator, InetAddressAndPort peer, Collection<StreamSummary> receivingSummaries, Collection<StreamSummary> sendingSummaries) { @@ -81,8 +80,8 @@ public class SessionSummary { public void serialize(SessionSummary summary, DataOutputPlus out, int version) throws IOException { - ByteBufferUtil.writeWithLength(InetAddressSerializer.instance.serialize(summary.coordinator), out); - ByteBufferUtil.writeWithLength(InetAddressSerializer.instance.serialize(summary.peer), out); + CompactEndpointSerializationHelper.instance.serialize(summary.coordinator, out, version); + CompactEndpointSerializationHelper.instance.serialize(summary.peer, out, version); out.writeInt(summary.receivingSummaries.size()); for (StreamSummary streamSummary: summary.receivingSummaries) @@ -99,8 +98,8 @@ public class SessionSummary public SessionSummary deserialize(DataInputPlus in, int version) throws IOException { - InetAddress coordinator = InetAddressSerializer.instance.deserialize(ByteBufferUtil.readWithLength(in)); - InetAddress peer = InetAddressSerializer.instance.deserialize(ByteBufferUtil.readWithLength(in)); + InetAddressAndPort coordinator = CompactEndpointSerializationHelper.instance.deserialize(in, version); + InetAddressAndPort peer = CompactEndpointSerializationHelper.instance.deserialize(in, version); int numRcvd = in.readInt(); List<StreamSummary> receivingSummaries = new ArrayList<>(numRcvd); @@ -122,8 +121,8 @@ public class SessionSummary public long serializedSize(SessionSummary summary, int version) { long size = 0; - size += ByteBufferUtil.serializedSizeWithLength(InetAddressSerializer.instance.serialize(summary.coordinator)); - size += ByteBufferUtil.serializedSizeWithLength(InetAddressSerializer.instance.serialize(summary.peer)); + size += CompactEndpointSerializationHelper.instance.serializedSize(summary.coordinator, version); + size += CompactEndpointSerializationHelper.instance.serializedSize(summary.peer, version); size += TypeSizes.sizeof(summary.receivingSummaries.size()); for (StreamSummary streamSummary: summary.receivingSummaries) http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/streaming/StreamCoordinator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java index bb8c702..a22e07d 100644 --- a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java +++ b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java @@ -17,13 +17,13 @@ */ package org.apache.cassandra.streaming; -import java.net.InetAddress; import java.util.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.utils.FBUtilities; /** @@ -45,7 +45,7 @@ public class StreamCoordinator FBUtilities.getAvailableProcessors()); private final boolean connectSequentially; - private Map<InetAddress, HostStreamingData> peerSessions = new HashMap<>(); + private Map<InetAddressAndPort, HostStreamingData> peerSessions = new HashMap<>(); private final int connectionsPerHost; private StreamConnectionFactory factory; private final boolean keepSSTableLevel; @@ -143,29 +143,29 @@ public class StreamCoordinator if (sessionsToConnect.hasNext()) { StreamSession next = sessionsToConnect.next(); - logger.debug("Connecting next session {} with {}.", next.planId(), next.peer.getHostAddress()); + logger.debug("Connecting next session {} with {}.", next.planId(), next.peer.toString()); streamExecutor.execute(new StreamSessionConnector(next)); } else logger.debug("Finished connecting all sessions"); } - public synchronized Set<InetAddress> getPeers() + public synchronized Set<InetAddressAndPort> getPeers() { return new HashSet<>(peerSessions.keySet()); } - public synchronized StreamSession getOrCreateNextSession(InetAddress peer, InetAddress connecting) + public synchronized StreamSession getOrCreateNextSession(InetAddressAndPort peer, InetAddressAndPort connecting) { return getOrCreateHostData(peer).getOrCreateNextSession(peer, connecting); } - public synchronized StreamSession getOrCreateSessionById(InetAddress peer, int id, InetAddress connecting) + public synchronized StreamSession getOrCreateSessionById(InetAddressAndPort peer, int id, InetAddressAndPort connecting) { return getOrCreateHostData(peer).getOrCreateSessionById(peer, id, connecting); } - public StreamSession getSessionById(InetAddress peer, int id) + public StreamSession getSessionById(InetAddressAndPort peer, int id) { return getHostData(peer).getSessionById(id); } @@ -191,7 +191,7 @@ public class StreamCoordinator return result; } - public synchronized void transferFiles(InetAddress to, Collection<StreamSession.SSTableStreamingSections> sstableDetails) + public synchronized void transferFiles(InetAddressAndPort to, Collection<StreamSession.SSTableStreamingSections> sstableDetails) { HostStreamingData sessionList = getOrCreateHostData(to); @@ -239,7 +239,7 @@ public class StreamCoordinator return result; } - private HostStreamingData getHostData(InetAddress peer) + private HostStreamingData getHostData(InetAddressAndPort peer) { HostStreamingData data = peerSessions.get(peer); if (data == null) @@ -247,7 +247,7 @@ public class StreamCoordinator return data; } - private HostStreamingData getOrCreateHostData(InetAddress peer) + private HostStreamingData getOrCreateHostData(InetAddressAndPort peer) { HostStreamingData data = peerSessions.get(peer); if (data == null) @@ -297,7 +297,7 @@ public class StreamCoordinator return false; } - public StreamSession getOrCreateNextSession(InetAddress peer, InetAddress connecting) + public StreamSession getOrCreateNextSession(InetAddressAndPort peer, InetAddressAndPort connecting) { // create if (streamSessions.size() < connectionsPerHost) @@ -329,7 +329,7 @@ public class StreamCoordinator return Collections.unmodifiableCollection(streamSessions.values()); } - public StreamSession getOrCreateSessionById(InetAddress peer, int id, InetAddress connecting) + public StreamSession getOrCreateSessionById(InetAddressAndPort peer, int id, InetAddressAndPort connecting) { StreamSession session = streamSessions.get(id); if (session == null) http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/streaming/StreamEvent.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamEvent.java b/src/java/org/apache/cassandra/streaming/StreamEvent.java index 6ea2814..7ecd081 100644 --- a/src/java/org/apache/cassandra/streaming/StreamEvent.java +++ b/src/java/org/apache/cassandra/streaming/StreamEvent.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.streaming; -import java.net.InetAddress; import java.util.Collections; import java.util.Map; import java.util.Set; @@ -27,6 +26,7 @@ import com.google.common.collect.ImmutableSet; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; +import org.apache.cassandra.locator.InetAddressAndPort; public abstract class StreamEvent { @@ -48,7 +48,7 @@ public abstract class StreamEvent public static class SessionCompleteEvent extends StreamEvent { - public final InetAddress peer; + public final InetAddressAndPort peer; public final boolean success; public final int sessionIndex; public final Set<StreamRequest> requests; http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/streaming/StreamManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamManager.java b/src/java/org/apache/cassandra/streaming/StreamManager.java index a44f02e..81c65c5 100644 --- a/src/java/org/apache/cassandra/streaming/StreamManager.java +++ b/src/java/org/apache/cassandra/streaming/StreamManager.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.streaming; -import java.net.InetAddress; import java.util.Map; import java.util.Set; import java.util.UUID; @@ -35,6 +34,7 @@ import com.google.common.util.concurrent.RateLimiter; import org.cliffc.high_scale_lib.NonBlockingHashMap; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.streaming.management.StreamEventJMXNotifier; import org.apache.cassandra.streaming.management.StreamStateCompositeData; @@ -55,7 +55,7 @@ public class StreamManager implements StreamManagerMBean * * @return StreamRateLimiter with rate limit set based on peer location. */ - public static StreamRateLimiter getRateLimiter(InetAddress peer) + public static StreamRateLimiter getRateLimiter(InetAddressAndPort peer) { return new StreamRateLimiter(peer); } @@ -67,7 +67,7 @@ public class StreamManager implements StreamManagerMBean private static final RateLimiter interDCLimiter = RateLimiter.create(Double.MAX_VALUE); private final boolean isLocalDC; - public StreamRateLimiter(InetAddress peer) + public StreamRateLimiter(InetAddressAndPort peer) { double throughput = DatabaseDescriptor.getStreamThroughputOutboundMegabitsPerSec() * BYTES_PER_MEGABIT; mayUpdateThroughput(throughput, limiter); @@ -176,7 +176,7 @@ public class StreamManager implements StreamManagerMBean return notifier.getNotificationInfo(); } - public StreamSession findSession(InetAddress peer, UUID planId, int sessionIndex) + public StreamSession findSession(InetAddressAndPort peer, UUID planId, int sessionIndex) { StreamSession session = findSession(initiatedStreams, peer, planId, sessionIndex); if (session != null) @@ -185,7 +185,7 @@ public class StreamManager implements StreamManagerMBean return findSession(receivingStreams, peer, planId, sessionIndex); } - private StreamSession findSession(Map<UUID, StreamResultFuture> streams, InetAddress peer, UUID planId, int sessionIndex) + private StreamSession findSession(Map<UUID, StreamResultFuture> streams, InetAddressAndPort peer, UUID planId, int sessionIndex) { StreamResultFuture streamResultFuture = streams.get(planId); if (streamResultFuture == null) http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/streaming/StreamPlan.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamPlan.java b/src/java/org/apache/cassandra/streaming/StreamPlan.java index 213f74b..43e9068 100644 --- a/src/java/org/apache/cassandra/streaming/StreamPlan.java +++ b/src/java/org/apache/cassandra/streaming/StreamPlan.java @@ -17,11 +17,11 @@ */ package org.apache.cassandra.streaming; -import java.net.InetAddress; import java.util.*; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.utils.UUIDGen; import static org.apache.cassandra.service.ActiveRepairService.NO_PENDING_REPAIR; @@ -73,7 +73,7 @@ public class StreamPlan * @param ranges ranges to fetch * @return this object for chaining */ - public StreamPlan requestRanges(InetAddress from, InetAddress connecting, String keyspace, Collection<Range<Token>> ranges) + public StreamPlan requestRanges(InetAddressAndPort from, InetAddressAndPort connecting, String keyspace, Collection<Range<Token>> ranges) { return requestRanges(from, connecting, keyspace, ranges, EMPTY_COLUMN_FAMILIES); } @@ -88,7 +88,7 @@ public class StreamPlan * @param columnFamilies specific column families * @return this object for chaining */ - public StreamPlan requestRanges(InetAddress from, InetAddress connecting, String keyspace, Collection<Range<Token>> ranges, String... columnFamilies) + public StreamPlan requestRanges(InetAddressAndPort from, InetAddressAndPort connecting, String keyspace, Collection<Range<Token>> ranges, String... columnFamilies) { StreamSession session = coordinator.getOrCreateNextSession(from, connecting); session.addStreamRequest(keyspace, ranges, Arrays.asList(columnFamilies)); @@ -98,9 +98,9 @@ public class StreamPlan /** * Add transfer task to send data of specific {@code columnFamilies} under {@code keyspace} and {@code ranges}. * - * @see #transferRanges(java.net.InetAddress, java.net.InetAddress, String, java.util.Collection, String...) + * @see #transferRanges(InetAddressAndPort, InetAddressAndPort, String, java.util.Collection, String...) */ - public StreamPlan transferRanges(InetAddress to, String keyspace, Collection<Range<Token>> ranges, String... columnFamilies) + public StreamPlan transferRanges(InetAddressAndPort to, String keyspace, Collection<Range<Token>> ranges, String... columnFamilies) { return transferRanges(to, to, keyspace, ranges, columnFamilies); } @@ -114,7 +114,7 @@ public class StreamPlan * @param ranges ranges to send * @return this object for chaining */ - public StreamPlan transferRanges(InetAddress to, InetAddress connecting, String keyspace, Collection<Range<Token>> ranges) + public StreamPlan transferRanges(InetAddressAndPort to, InetAddressAndPort connecting, String keyspace, Collection<Range<Token>> ranges) { return transferRanges(to, connecting, keyspace, ranges, EMPTY_COLUMN_FAMILIES); } @@ -129,7 +129,7 @@ public class StreamPlan * @param columnFamilies specific column families * @return this object for chaining */ - public StreamPlan transferRanges(InetAddress to, InetAddress connecting, String keyspace, Collection<Range<Token>> ranges, String... columnFamilies) + public StreamPlan transferRanges(InetAddressAndPort to, InetAddressAndPort connecting, String keyspace, Collection<Range<Token>> ranges, String... columnFamilies) { StreamSession session = coordinator.getOrCreateNextSession(to, connecting); session.addTransferRanges(keyspace, ranges, Arrays.asList(columnFamilies), flushBeforeTransfer); @@ -144,7 +144,7 @@ public class StreamPlan * this collection will be modified to remove those files that are successfully handed off * @return this object for chaining */ - public StreamPlan transferFiles(InetAddress to, Collection<StreamSession.SSTableStreamingSections> sstableDetails) + public StreamPlan transferFiles(InetAddressAndPort to, Collection<StreamSession.SSTableStreamingSections> sstableDetails) { coordinator.transferFiles(to, sstableDetails); return this; http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/streaming/StreamResultFuture.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java index 0f74c7f..544f37f 100644 --- a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java +++ b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java @@ -29,6 +29,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import io.netty.channel.Channel; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.utils.FBUtilities; /** @@ -103,7 +104,7 @@ public final class StreamResultFuture extends AbstractFuture<StreamState> public static synchronized StreamResultFuture initReceivingSide(int sessionIndex, UUID planId, StreamOperation streamOperation, - InetAddress from, + InetAddressAndPort from, Channel channel, boolean keepSSTableLevel, UUID pendingRepair, @@ -135,11 +136,15 @@ public final class StreamResultFuture extends AbstractFuture<StreamState> return coordinator; } - private void attachConnection(InetAddress from, int sessionIndex, Channel channel) + private void attachConnection(InetAddressAndPort from, int sessionIndex, Channel channel) { SocketAddress addr = channel.remoteAddress(); - InetAddress connecting = (addr instanceof InetSocketAddress ? ((InetSocketAddress) addr).getAddress() : from); - StreamSession session = coordinator.getOrCreateSessionById(from, sessionIndex, connecting); + //In the case of unit tests, if you use the EmbeddedChannel, channel.remoteAddress() + //does not return an InetSocketAddress, but an EmbeddedSocketAddress. Hence why we need the type check here + InetAddress connecting = (addr instanceof InetSocketAddress ? ((InetSocketAddress) addr).getAddress() : from.address); + //Need to turn connecting into a InetAddressAndPort with the correct port. I think getting the port from "from" + //Will work since we don't actually have ports diverge across network interfaces + StreamSession session = coordinator.getOrCreateSessionById(from, sessionIndex, InetAddressAndPort.getByAddressOverrideDefaults(connecting, from.port)); session.init(this); session.attach(channel); } @@ -228,7 +233,7 @@ public final class StreamResultFuture extends AbstractFuture<StreamState> } } - StreamSession getSession(InetAddress peer, int sessionIndex) + StreamSession getSession(InetAddressAndPort peer, int sessionIndex) { return coordinator.getSessionById(peer, sessionIndex); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/streaming/StreamSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java index b6351f9..4085c43 100644 --- a/src/java/org/apache/cassandra/streaming/StreamSession.java +++ b/src/java/org/apache/cassandra/streaming/StreamSession.java @@ -49,6 +49,7 @@ import org.apache.cassandra.db.PartitionPosition; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.gms.*; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.metrics.StreamingMetrics; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.net.async.OutboundConnectionIdentifier; @@ -142,14 +143,14 @@ public class StreamSession implements IEndpointStateChangeSubscriber /** * Streaming endpoint. * - * Each {@code StreamSession} is identified by this InetAddress which is broadcast address of the node streaming. + * Each {@code StreamSession} is identified by this InetAddressAndPort which is broadcast address of the node streaming. */ - public final InetAddress peer; + public final InetAddressAndPort peer; private final int index; /** Actual connecting address. Can be the same as {@linkplain #peer}. */ - public final InetAddress connecting; + public final InetAddressAndPort connecting; // should not be null when session is started private StreamResultFuture streamResult; @@ -191,14 +192,14 @@ public class StreamSession implements IEndpointStateChangeSubscriber * @param peer Address of streaming peer * @param connecting Actual connecting address */ - public StreamSession(InetAddress peer, InetAddress connecting, StreamConnectionFactory factory, int index, boolean keepSSTableLevel, UUID pendingRepair, PreviewKind previewKind) + public StreamSession(InetAddressAndPort peer, InetAddressAndPort connecting, StreamConnectionFactory factory, int index, boolean keepSSTableLevel, UUID pendingRepair, PreviewKind previewKind) { this.peer = peer; this.connecting = connecting; this.index = index; - OutboundConnectionIdentifier id = OutboundConnectionIdentifier.stream(new InetSocketAddress(FBUtilities.getBroadcastAddress(), 0), - new InetSocketAddress(connecting, MessagingService.instance().portFor(connecting))); + OutboundConnectionIdentifier id = OutboundConnectionIdentifier.stream(InetAddressAndPort.getByAddressOverrideDefaults(FBUtilities.getBroadcastAddressAndPort().address, 0), + InetAddressAndPort.getByAddressOverrideDefaults(connecting.address, MessagingService.instance().portFor(connecting))); this.messageSender = new NettyStreamingMessageSender(this, id, factory, StreamMessage.CURRENT_VERSION, previewKind.isPreview()); this.metrics = StreamingMetrics.get(connecting); this.keepSSTableLevel = keepSSTableLevel; @@ -607,16 +608,16 @@ public class StreamSession implements IEndpointStateChangeSubscriber { logger.error("[Stream #{}] Did not receive response from peer {}{} for {} secs. Is peer down? " + "If not, maybe try increasing streaming_keep_alive_period_in_secs.", planId(), - peer.getHostAddress(), - peer.equals(connecting) ? "" : " through " + connecting.getHostAddress(), + peer.getHostAddress(true), + peer.equals(connecting) ? "" : " through " + connecting.getHostAddress(true), 2 * DatabaseDescriptor.getStreamingKeepAlivePeriod(), e); } else { logger.error("[Stream #{}] Streaming error occurred on session with peer {}{}", planId(), - peer.getHostAddress(), - peer.equals(connecting) ? "" : " through " + connecting.getHostAddress(), + peer.getHostAddress(true), + peer.equals(connecting) ? "" : " through " + connecting.getHostAddress(true), e); } } @@ -644,7 +645,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber prepareReceiving(summary); PrepareSynAckMessage prepareSynAck = new PrepareSynAckMessage(); - if (!peer.equals(FBUtilities.getBroadcastAddress())) + if (!peer.equals(FBUtilities.getBroadcastAddressAndPort())) for (StreamTransferTask task : transfers.values()) prepareSynAck.summaries.add(task.getSummary()); messageSender.sendMessage(prepareSynAck); @@ -754,7 +755,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber */ public synchronized void sessionFailed() { - logger.error("[Stream #{}] Remote peer {} failed stream session.", planId(), peer.getHostAddress()); + logger.error("[Stream #{}] Remote peer {} failed stream session.", planId(), peer.toString()); closeSession(State.FAILED); } @@ -784,21 +785,21 @@ public class StreamSession implements IEndpointStateChangeSubscriber maybeCompleted(); } - public void onJoin(InetAddress endpoint, EndpointState epState) {} - public void beforeChange(InetAddress endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) {} - public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value) {} - public void onAlive(InetAddress endpoint, EndpointState state) {} - public void onDead(InetAddress endpoint, EndpointState state) {} + public void onJoin(InetAddressAndPort endpoint, EndpointState epState) {} + public void beforeChange(InetAddressAndPort endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) {} + public void onChange(InetAddressAndPort endpoint, ApplicationState state, VersionedValue value) {} + public void onAlive(InetAddressAndPort endpoint, EndpointState state) {} + public void onDead(InetAddressAndPort endpoint, EndpointState state) {} - public void onRemove(InetAddress endpoint) + public void onRemove(InetAddressAndPort endpoint) { - logger.error("[Stream #{}] Session failed because remote peer {} has left.", planId(), peer.getHostAddress()); + logger.error("[Stream #{}] Session failed because remote peer {} has left.", planId(), peer.toString()); closeSession(State.FAILED); } - public void onRestart(InetAddress endpoint, EndpointState epState) + public void onRestart(InetAddressAndPort endpoint, EndpointState epState) { - logger.error("[Stream #{}] Session failed because remote peer {} was restarted.", planId(), peer.getHostAddress()); + logger.error("[Stream #{}] Session failed because remote peer {} was restarted.", planId(), peer.toString()); closeSession(State.FAILED); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java b/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java index 0b38760..20b7c87 100644 --- a/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java +++ b/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java @@ -137,7 +137,7 @@ public class NettyStreamingMessageSender implements StreamingMessageSender @Override public void initialize() throws IOException { - StreamInitMessage message = new StreamInitMessage(FBUtilities.getBroadcastAddress(), + StreamInitMessage message = new StreamInitMessage(FBUtilities.getBroadcastAddressAndPort(), session.sessionIndex(), session.planId(), session.streamOperation(), @@ -183,7 +183,7 @@ public class NettyStreamingMessageSender implements StreamingMessageSender { Channel channel = factory.createConnection(connectionId, protocolVersion); ChannelPipeline pipeline = channel.pipeline(); - pipeline.addLast(NettyFactory.instance.streamingGroup, NettyFactory.INBOUND_STREAM_HANDLER_NAME, new StreamingInboundHandler(connectionId.remoteAddress(), protocolVersion, session)); + pipeline.addLast(NettyFactory.instance.streamingGroup, NettyFactory.INBOUND_STREAM_HANDLER_NAME, new StreamingInboundHandler(connectionId.remote(), protocolVersion, session)); channel.attr(TRANSFERRING_FILE_ATTR).set(Boolean.FALSE); return channel; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/streaming/async/StreamingInboundHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/async/StreamingInboundHandler.java b/src/java/org/apache/cassandra/streaming/async/StreamingInboundHandler.java index cc6f9e0..907572b 100644 --- a/src/java/org/apache/cassandra/streaming/async/StreamingInboundHandler.java +++ b/src/java/org/apache/cassandra/streaming/async/StreamingInboundHandler.java @@ -21,7 +21,6 @@ package org.apache.cassandra.streaming.async; import java.io.EOFException; import java.io.IOException; import java.net.InetAddress; -import java.net.InetSocketAddress; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.function.Function; @@ -38,6 +37,7 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.ReferenceCountUtil; import io.netty.util.concurrent.FastThreadLocalThread; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.async.RebufferingByteBufDataInputPlus; import org.apache.cassandra.streaming.StreamManager; import org.apache.cassandra.streaming.StreamReceiveException; @@ -65,7 +65,7 @@ public class StreamingInboundHandler extends ChannelInboundHandlerAdapter private static final int AUTO_READ_LOW_WATER_MARK = 1 << 15; private static final int AUTO_READ_HIGH_WATER_MARK = 1 << 16; - private final InetSocketAddress remoteAddress; + private final InetAddressAndPort remoteAddress; private final int protocolVersion; private final StreamSession session; @@ -82,7 +82,7 @@ public class StreamingInboundHandler extends ChannelInboundHandlerAdapter private volatile boolean closed; - public StreamingInboundHandler(InetSocketAddress remoteAddress, int protocolVersion, @Nullable StreamSession session) + public StreamingInboundHandler(InetAddressAndPort remoteAddress, int protocolVersion, @Nullable StreamSession session) { this.remoteAddress = remoteAddress; this.protocolVersion = protocolVersion; @@ -254,11 +254,11 @@ public class StreamingInboundHandler extends ChannelInboundHandlerAdapter */ static class SessionIdentifier { - final InetAddress from; + final InetAddressAndPort from; final UUID planId; final int sessionIndex; - SessionIdentifier(InetAddress from, UUID planId, int sessionIndex) + SessionIdentifier(InetAddressAndPort from, UUID planId, int sessionIndex) { this.from = from; this.planId = planId; http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/streaming/management/ProgressInfoCompositeData.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/management/ProgressInfoCompositeData.java b/src/java/org/apache/cassandra/streaming/management/ProgressInfoCompositeData.java index b9e6951..964fe10 100644 --- a/src/java/org/apache/cassandra/streaming/management/ProgressInfoCompositeData.java +++ b/src/java/org/apache/cassandra/streaming/management/ProgressInfoCompositeData.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.streaming.management; -import java.net.InetAddress; import java.net.UnknownHostException; import java.util.HashMap; import java.util.Map; @@ -26,12 +25,14 @@ import javax.management.openmbean.*; import com.google.common.base.Throwables; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.streaming.ProgressInfo; public class ProgressInfoCompositeData { private static final String[] ITEM_NAMES = new String[]{"planId", "peer", + "peer storage port", "sessionIndex", "fileName", "direction", @@ -39,6 +40,7 @@ public class ProgressInfoCompositeData "totalBytes"}; private static final String[] ITEM_DESCS = new String[]{"String representation of Plan ID", "Session peer", + "Session peer storage port", "Index of session", "Name of the file", "Direction('IN' or 'OUT')", @@ -47,6 +49,7 @@ public class ProgressInfoCompositeData private static final OpenType<?>[] ITEM_TYPES = new OpenType[]{SimpleType.STRING, SimpleType.STRING, SimpleType.INTEGER, + SimpleType.INTEGER, SimpleType.STRING, SimpleType.STRING, SimpleType.LONG, @@ -73,12 +76,13 @@ public class ProgressInfoCompositeData { Map<String, Object> valueMap = new HashMap<>(); valueMap.put(ITEM_NAMES[0], planId.toString()); - valueMap.put(ITEM_NAMES[1], progressInfo.peer.getHostAddress()); - valueMap.put(ITEM_NAMES[2], progressInfo.sessionIndex); - valueMap.put(ITEM_NAMES[3], progressInfo.fileName); - valueMap.put(ITEM_NAMES[4], progressInfo.direction.name()); - valueMap.put(ITEM_NAMES[5], progressInfo.currentBytes); - valueMap.put(ITEM_NAMES[6], progressInfo.totalBytes); + valueMap.put(ITEM_NAMES[1], progressInfo.peer.address.getHostAddress()); + valueMap.put(ITEM_NAMES[2], progressInfo.peer.port); + valueMap.put(ITEM_NAMES[3], progressInfo.sessionIndex); + valueMap.put(ITEM_NAMES[4], progressInfo.fileName); + valueMap.put(ITEM_NAMES[5], progressInfo.direction.name()); + valueMap.put(ITEM_NAMES[6], progressInfo.currentBytes); + valueMap.put(ITEM_NAMES[7], progressInfo.totalBytes); try { return new CompositeDataSupport(COMPOSITE_TYPE, valueMap); @@ -94,12 +98,12 @@ public class ProgressInfoCompositeData Object[] values = cd.getAll(ITEM_NAMES); try { - return new ProgressInfo(InetAddress.getByName((String) values[1]), - (int) values[2], - (String) values[3], - ProgressInfo.Direction.valueOf((String)values[4]), - (long) values[5], - (long) values[6]); + return new ProgressInfo(InetAddressAndPort.getByNameOverrideDefaults((String) values[1], (Integer)values[2]), + (int) values[3], + (String) values[4], + ProgressInfo.Direction.valueOf((String)values[5]), + (long) values[6], + (long) values[7]); } catch (UnknownHostException e) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/streaming/management/SessionCompleteEventCompositeData.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/management/SessionCompleteEventCompositeData.java b/src/java/org/apache/cassandra/streaming/management/SessionCompleteEventCompositeData.java index 516582a..1c0d8c5 100644 --- a/src/java/org/apache/cassandra/streaming/management/SessionCompleteEventCompositeData.java +++ b/src/java/org/apache/cassandra/streaming/management/SessionCompleteEventCompositeData.java @@ -29,12 +29,15 @@ public class SessionCompleteEventCompositeData { private static final String[] ITEM_NAMES = new String[]{"planId", "peer", + "peer storage port", "success"}; private static final String[] ITEM_DESCS = new String[]{"Plan ID", "Session peer", + "Session peer storage port", "Indicates whether session was successful"}; private static final OpenType<?>[] ITEM_TYPES = new OpenType[]{SimpleType.STRING, SimpleType.STRING, + SimpleType.INTEGER, SimpleType.BOOLEAN}; public static final CompositeType COMPOSITE_TYPE; @@ -58,8 +61,9 @@ public class SessionCompleteEventCompositeData { Map<String, Object> valueMap = new HashMap<>(); valueMap.put(ITEM_NAMES[0], event.planId.toString()); - valueMap.put(ITEM_NAMES[1], event.peer.getHostAddress()); - valueMap.put(ITEM_NAMES[2], event.success); + valueMap.put(ITEM_NAMES[1], event.peer.address.getHostAddress()); + valueMap.put(ITEM_NAMES[2], event.peer.port); + valueMap.put(ITEM_NAMES[3], event.success); try { return new CompositeDataSupport(COMPOSITE_TYPE, valueMap); http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java b/src/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java index a6762a8..d20eaf5 100644 --- a/src/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java +++ b/src/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.streaming.management; -import java.net.InetAddress; import java.net.UnknownHostException; import java.util.*; import javax.management.openmbean.*; @@ -27,6 +26,7 @@ import com.google.common.base.Throwables; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.streaming.ProgressInfo; import org.apache.cassandra.streaming.SessionInfo; import org.apache.cassandra.streaming.StreamSession; @@ -36,7 +36,9 @@ public class SessionInfoCompositeData { private static final String[] ITEM_NAMES = new String[]{"planId", "peer", + "peer_port", "connecting", + "connecting_port", "receivingSummaries", "sendingSummaries", "state", @@ -45,7 +47,9 @@ public class SessionInfoCompositeData "sessionIndex"}; private static final String[] ITEM_DESCS = new String[]{"Plan ID", "Session peer", + "Session peer storage port", "Connecting address", + "Connecting storage port", "Summaries of receiving data", "Summaries of sending data", "Current session state", @@ -61,7 +65,9 @@ public class SessionInfoCompositeData { ITEM_TYPES = new OpenType[]{SimpleType.STRING, SimpleType.STRING, + SimpleType.INTEGER, SimpleType.STRING, + SimpleType.INTEGER, ArrayType.getArrayType(StreamSummaryCompositeData.COMPOSITE_TYPE), ArrayType.getArrayType(StreamSummaryCompositeData.COMPOSITE_TYPE), SimpleType.STRING, @@ -84,8 +90,10 @@ public class SessionInfoCompositeData { Map<String, Object> valueMap = new HashMap<>(); valueMap.put(ITEM_NAMES[0], planId.toString()); - valueMap.put(ITEM_NAMES[1], sessionInfo.peer.getHostAddress()); - valueMap.put(ITEM_NAMES[2], sessionInfo.connecting.getHostAddress()); + valueMap.put(ITEM_NAMES[1], sessionInfo.peer.address.getHostAddress()); + valueMap.put(ITEM_NAMES[2], sessionInfo.peer.port); + valueMap.put(ITEM_NAMES[3], sessionInfo.connecting.address.getHostAddress()); + valueMap.put(ITEM_NAMES[4], sessionInfo.connecting.port); Function<StreamSummary, CompositeData> fromStreamSummary = new Function<StreamSummary, CompositeData>() { public CompositeData apply(StreamSummary input) @@ -93,9 +101,9 @@ public class SessionInfoCompositeData return StreamSummaryCompositeData.toCompositeData(input); } }; - valueMap.put(ITEM_NAMES[3], toArrayOfCompositeData(sessionInfo.receivingSummaries, fromStreamSummary)); - valueMap.put(ITEM_NAMES[4], toArrayOfCompositeData(sessionInfo.sendingSummaries, fromStreamSummary)); - valueMap.put(ITEM_NAMES[5], sessionInfo.state.name()); + valueMap.put(ITEM_NAMES[5], toArrayOfCompositeData(sessionInfo.receivingSummaries, fromStreamSummary)); + valueMap.put(ITEM_NAMES[6], toArrayOfCompositeData(sessionInfo.sendingSummaries, fromStreamSummary)); + valueMap.put(ITEM_NAMES[7], sessionInfo.state.name()); Function<ProgressInfo, CompositeData> fromProgressInfo = new Function<ProgressInfo, CompositeData>() { public CompositeData apply(ProgressInfo input) @@ -103,9 +111,9 @@ public class SessionInfoCompositeData return ProgressInfoCompositeData.toCompositeData(planId, input); } }; - valueMap.put(ITEM_NAMES[6], toArrayOfCompositeData(sessionInfo.getReceivingFiles(), fromProgressInfo)); - valueMap.put(ITEM_NAMES[7], toArrayOfCompositeData(sessionInfo.getSendingFiles(), fromProgressInfo)); - valueMap.put(ITEM_NAMES[8], sessionInfo.sessionIndex); + valueMap.put(ITEM_NAMES[8], toArrayOfCompositeData(sessionInfo.getReceivingFiles(), fromProgressInfo)); + valueMap.put(ITEM_NAMES[9], toArrayOfCompositeData(sessionInfo.getSendingFiles(), fromProgressInfo)); + valueMap.put(ITEM_NAMES[10], sessionInfo.sessionIndex); try { return new CompositeDataSupport(COMPOSITE_TYPE, valueMap); @@ -121,11 +129,11 @@ public class SessionInfoCompositeData assert cd.getCompositeType().equals(COMPOSITE_TYPE); Object[] values = cd.getAll(ITEM_NAMES); - InetAddress peer, connecting; + InetAddressAndPort peer, connecting; try { - peer = InetAddress.getByName((String) values[1]); - connecting = InetAddress.getByName((String) values[2]); + peer = InetAddressAndPort.getByNameOverrideDefaults((String) values[1], (Integer)values[2]); + connecting = InetAddressAndPort.getByNameOverrideDefaults((String) values[3], (Integer)values[4]); } catch (UnknownHostException e) { @@ -139,11 +147,11 @@ public class SessionInfoCompositeData } }; SessionInfo info = new SessionInfo(peer, - (int)values[8], + (int)values[10], connecting, - fromArrayOfCompositeData((CompositeData[]) values[3], toStreamSummary), - fromArrayOfCompositeData((CompositeData[]) values[4], toStreamSummary), - StreamSession.State.valueOf((String) values[5])); + fromArrayOfCompositeData((CompositeData[]) values[5], toStreamSummary), + fromArrayOfCompositeData((CompositeData[]) values[6], toStreamSummary), + StreamSession.State.valueOf((String) values[7])); Function<CompositeData, ProgressInfo> toProgressInfo = new Function<CompositeData, ProgressInfo>() { public ProgressInfo apply(CompositeData input) @@ -151,11 +159,11 @@ public class SessionInfoCompositeData return ProgressInfoCompositeData.fromCompositeData(input); } }; - for (ProgressInfo progress : fromArrayOfCompositeData((CompositeData[]) values[6], toProgressInfo)) + for (ProgressInfo progress : fromArrayOfCompositeData((CompositeData[]) values[8], toProgressInfo)) { info.updateProgress(progress); } - for (ProgressInfo progress : fromArrayOfCompositeData((CompositeData[]) values[7], toProgressInfo)) + for (ProgressInfo progress : fromArrayOfCompositeData((CompositeData[]) values[9], toProgressInfo)) { info.updateProgress(progress); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java index fedb971..13a3358 100644 --- a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java +++ b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java @@ -18,7 +18,6 @@ package org.apache.cassandra.streaming.messages; import java.io.IOException; -import java.net.InetAddress; import java.util.ArrayList; import java.util.List; import java.util.UUID; @@ -30,6 +29,7 @@ import org.apache.cassandra.io.sstable.format.SSTableFormat; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.io.sstable.format.Version; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.CompactEndpointSerializationHelper; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.schema.TableId; @@ -67,13 +67,13 @@ public class FileMessageHeader public final UUID pendingRepair; public final int sstableLevel; public final SerializationHeader.Component header; - public final InetAddress sender; + public final InetAddressAndPort sender; /* cached size value */ private transient final long size; private FileMessageHeader(TableId tableId, - InetAddress sender, + InetAddressAndPort sender, UUID planId, int sessionIndex, int sequenceNumber, @@ -106,7 +106,7 @@ public class FileMessageHeader } public FileMessageHeader(TableId tableId, - InetAddress sender, + InetAddressAndPort sender, UUID planId, int sessionIndex, int sequenceNumber, @@ -218,7 +218,7 @@ public class FileMessageHeader public CompressionInfo serialize(FileMessageHeader header, DataOutputPlus out, int version) throws IOException { header.tableId.serialize(out); - CompactEndpointSerializationHelper.serialize(header.sender, out); + CompactEndpointSerializationHelper.streamingInstance.serialize(header.sender, out, version); UUIDSerializer.serializer.serialize(header.planId, out, version); out.writeInt(header.sessionIndex); out.writeInt(header.sequenceNumber); @@ -252,7 +252,7 @@ public class FileMessageHeader public FileMessageHeader deserialize(DataInputPlus in, int version) throws IOException { TableId tableId = TableId.deserialize(in); - InetAddress sender = CompactEndpointSerializationHelper.deserialize(in); + InetAddressAndPort sender = CompactEndpointSerializationHelper.streamingInstance.deserialize(in, version); UUID planId = UUIDSerializer.serializer.deserialize(in, MessagingService.current_version); int sessionIndex = in.readInt(); int sequenceNumber = in.readInt(); @@ -276,7 +276,7 @@ public class FileMessageHeader public long serializedSize(FileMessageHeader header, int version) { long size = header.tableId.serializedSize(); - size += CompactEndpointSerializationHelper.serializedSize(header.sender); + size += CompactEndpointSerializationHelper.streamingInstance.serializedSize(header.sender, version); size += UUIDSerializer.serializer.serializedSize(header.planId, version); size += TypeSizes.sizeof(header.sessionIndex); size += TypeSizes.sizeof(header.sequenceNumber); http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java index f44b41c..8bbcc05 100644 --- a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java +++ b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java @@ -79,7 +79,7 @@ public class OutgoingFileMessage extends StreamMessage SSTableReader sstable = ref.get(); filename = sstable.getFilename(); this.header = new FileMessageHeader(sstable.metadata().id, - FBUtilities.getBroadcastAddress(), + FBUtilities.getBroadcastAddressAndPort(), session.planId(), session.sessionIndex(), sequenceNumber, http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java index 68c6034..fced133 100644 --- a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java +++ b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java @@ -18,12 +18,12 @@ package org.apache.cassandra.streaming.messages; import java.io.IOException; -import java.net.InetAddress; import java.util.UUID; import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputStreamPlus; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.CompactEndpointSerializationHelper; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.streaming.StreamOperation; @@ -39,7 +39,7 @@ public class StreamInitMessage extends StreamMessage { public static Serializer<StreamInitMessage> serializer = new StreamInitMessageSerializer(); - public final InetAddress from; + public final InetAddressAndPort from; public final int sessionIndex; public final UUID planId; public final StreamOperation streamOperation; @@ -48,7 +48,7 @@ public class StreamInitMessage extends StreamMessage public final UUID pendingRepair; public final PreviewKind previewKind; - public StreamInitMessage(InetAddress from, int sessionIndex, UUID planId, StreamOperation streamOperation, boolean keepSSTableLevel, UUID pendingRepair, PreviewKind previewKind) + public StreamInitMessage(InetAddressAndPort from, int sessionIndex, UUID planId, StreamOperation streamOperation, boolean keepSSTableLevel, UUID pendingRepair, PreviewKind previewKind) { super(Type.STREAM_INIT); this.from = from; @@ -73,7 +73,7 @@ public class StreamInitMessage extends StreamMessage { public void serialize(StreamInitMessage message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException { - CompactEndpointSerializationHelper.serialize(message.from, out); + CompactEndpointSerializationHelper.streamingInstance.serialize(message.from, out, version); out.writeInt(message.sessionIndex); UUIDSerializer.serializer.serialize(message.planId, out, MessagingService.current_version); out.writeUTF(message.streamOperation.getDescription()); @@ -89,7 +89,7 @@ public class StreamInitMessage extends StreamMessage public StreamInitMessage deserialize(DataInputPlus in, int version, StreamSession session) throws IOException { - InetAddress from = CompactEndpointSerializationHelper.deserialize(in); + InetAddressAndPort from = CompactEndpointSerializationHelper.streamingInstance.deserialize(in, version); int sessionIndex = in.readInt(); UUID planId = UUIDSerializer.serializer.deserialize(in, MessagingService.current_version); String description = in.readUTF(); @@ -102,7 +102,7 @@ public class StreamInitMessage extends StreamMessage public long serializedSize(StreamInitMessage message, int version) { - long size = CompactEndpointSerializationHelper.serializedSize(message.from); + long size = CompactEndpointSerializationHelper.streamingInstance.serializedSize(message.from, version); size += TypeSizes.sizeof(message.sessionIndex); size += UUIDSerializer.serializer.serializedSize(message.planId, MessagingService.current_version); size += TypeSizes.sizeof(message.streamOperation.getDescription()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/tools/BulkLoadConnectionFactory.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/BulkLoadConnectionFactory.java b/src/java/org/apache/cassandra/tools/BulkLoadConnectionFactory.java index b56d292..cce686f 100644 --- a/src/java/org/apache/cassandra/tools/BulkLoadConnectionFactory.java +++ b/src/java/org/apache/cassandra/tools/BulkLoadConnectionFactory.java @@ -22,21 +22,21 @@ import java.io.IOException; import java.net.InetSocketAddress; import io.netty.channel.Channel; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.EncryptionOptions; import org.apache.cassandra.net.async.OutboundConnectionIdentifier; import org.apache.cassandra.streaming.DefaultConnectionFactory; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.streaming.StreamConnectionFactory; public class BulkLoadConnectionFactory extends DefaultConnectionFactory implements StreamConnectionFactory { private final boolean outboundBindAny; - private final int storagePort; private final int secureStoragePort; private final EncryptionOptions.ServerEncryptionOptions encryptionOptions; - public BulkLoadConnectionFactory(int storagePort, int secureStoragePort, EncryptionOptions.ServerEncryptionOptions encryptionOptions, boolean outboundBindAny) + public BulkLoadConnectionFactory(int secureStoragePort, EncryptionOptions.ServerEncryptionOptions encryptionOptions, boolean outboundBindAny) { - this.storagePort = storagePort; this.secureStoragePort = secureStoragePort; this.encryptionOptions = encryptionOptions != null && encryptionOptions.internode_encryption == EncryptionOptions.ServerEncryptionOptions.InternodeEncryption.none ? null @@ -50,9 +50,9 @@ public class BulkLoadConnectionFactory extends DefaultConnectionFactory implemen // When 'all', 'dc' and 'rack', server nodes always have SSL port open, and since thin client like sstableloader // does not know which node is in which dc/rack, connecting to SSL port is always the option. int port = encryptionOptions != null && encryptionOptions.internode_encryption != EncryptionOptions.ServerEncryptionOptions.InternodeEncryption.none ? - secureStoragePort : storagePort; + secureStoragePort : connectionId.remote().port; - connectionId = connectionId.withNewConnectionAddress(new InetSocketAddress(connectionId.remote(), port)); + connectionId = connectionId.withNewConnectionAddress(InetAddressAndPort.getByAddressOverrideDefaults(connectionId.remote().address, port)); return createConnection(connectionId, protocolVersion, encryptionOptions); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/tools/BulkLoader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/BulkLoader.java b/src/java/org/apache/cassandra/tools/BulkLoader.java index 0812e53..545d1f7 100644 --- a/src/java/org/apache/cassandra/tools/BulkLoader.java +++ b/src/java/org/apache/cassandra/tools/BulkLoader.java @@ -18,7 +18,7 @@ package org.apache.cassandra.tools; import java.io.IOException; -import java.net.InetAddress; +import java.net.InetSocketAddress; import java.util.Set; import javax.net.ssl.SSLContext; @@ -33,6 +33,7 @@ import org.apache.commons.cli.Options; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.EncryptionOptions; import org.apache.cassandra.io.sstable.SSTableLoader; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.security.SSLFactory; import org.apache.cassandra.streaming.*; import org.apache.cassandra.utils.FBUtilities; @@ -57,11 +58,12 @@ public class BulkLoader new ExternalClient( options.hosts, options.nativePort, - options.authProvider, options.storagePort, + options.authProvider, options.sslStoragePort, options.serverEncOptions, - buildSSLOptions(options.clientEncOptions)), + buildSSLOptions(options.clientEncOptions), + options.allowServerPortDiscovery), handler, options.connectionsPerHost); DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(options.throttle); @@ -124,7 +126,7 @@ public class BulkLoader private long peak = 0; private int totalFiles = 0; - private final Multimap<InetAddress, SessionInfo> sessionsByHost = HashMultimap.create(); + private final Multimap<InetAddressAndPort, SessionInfo> sessionsByHost = HashMultimap.create(); public ProgressIndicator() { @@ -165,7 +167,7 @@ public class BulkLoader boolean updateTotalFiles = totalFiles == 0; // recalculate progress across all sessions in all hosts and display - for (InetAddress peer : sessionsByHost.keySet()) + for (InetAddressAndPort peer : sessionsByHost.keySet()) { sb.append("[").append(peer).append("]"); @@ -268,20 +270,19 @@ public class BulkLoader static class ExternalClient extends NativeSSTableLoaderClient { - private final int storagePort; private final int sslStoragePort; private final EncryptionOptions.ServerEncryptionOptions serverEncOptions; - public ExternalClient(Set<InetAddress> hosts, - int port, - AuthProvider authProvider, + public ExternalClient(Set<InetSocketAddress> hosts, + int nativePort, int storagePort, + AuthProvider authProvider, int sslStoragePort, EncryptionOptions.ServerEncryptionOptions serverEncryptionOptions, - SSLOptions sslOptions) + SSLOptions sslOptions, + boolean allowServerPortDiscovery) { - super(hosts, port, authProvider, sslOptions); - this.storagePort = storagePort; + super(hosts, nativePort, storagePort, authProvider, sslOptions, allowServerPortDiscovery); this.sslStoragePort = sslStoragePort; serverEncOptions = serverEncryptionOptions; } @@ -289,7 +290,7 @@ public class BulkLoader @Override public StreamConnectionFactory getConnectionFactory() { - return new BulkLoadConnectionFactory(storagePort, sslStoragePort, serverEncOptions, false); + return new BulkLoadConnectionFactory(sslStoragePort, serverEncOptions, false); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org