Stream to private IP when available. Patch by yukim; reviewed by Josh McKenzie for CASSANDRA-8084
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c6867c2c Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c6867c2c Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c6867c2c Branch: refs/heads/cassandra-2.1 Commit: c6867c2c25e1a220abef24e54a86eeb64dab28c5 Parents: 29a8b88 Author: Yuki Morishita <yu...@apache.org> Authored: Mon Oct 20 09:25:42 2014 -0500 Committer: Yuki Morishita <yu...@apache.org> Committed: Mon Oct 20 09:25:42 2014 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/db/SystemKeyspace.java | 8 ++++- .../org/apache/cassandra/dht/RangeStreamer.java | 4 ++- .../net/OutboundTcpConnectionPool.java | 12 +++---- .../cassandra/repair/StreamingRepairTask.java | 9 +++-- .../cassandra/service/StorageService.java | 23 +++++++++---- .../apache/cassandra/streaming/SessionInfo.java | 3 ++ .../apache/cassandra/streaming/StreamPlan.java | 36 ++++++++++++++------ .../cassandra/streaming/StreamResultFuture.java | 2 +- .../cassandra/streaming/StreamSession.java | 23 ++++++++++--- .../management/SessionInfoCompositeData.java | 26 ++++++++------ .../org/apache/cassandra/tools/NodeCmd.java | 8 ++++- .../cassandra/streaming/SessionInfoTest.java | 2 +- .../streaming/StreamTransferTaskTest.java | 4 ++- .../streaming/StreamingTransferTest.java | 2 +- 15 files changed, 115 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6867c2c/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 544cf9a..4ed7bed 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -56,6 +56,7 @@ * Fix possible infinite loop in creating repair range (CASSANDRA-7983) * Fix unit in nodetool for streaming throughput (CASSANDRA-7375) * Do not exit nodetool repair when 
receiving JMX NOTIF_LOST (CASSANDRA-7909) + * Stream to private IP when available (CASSANDRA-8084) Merged from 1.2: * Don't index tombstones (CASSANDRA-7828) http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6867c2c/src/java/org/apache/cassandra/db/SystemKeyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java index 5b77f63..30e6d47 100644 --- a/src/java/org/apache/cassandra/db/SystemKeyspace.java +++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java @@ -497,13 +497,19 @@ public class SystemKeyspace return hostIdMap; } + /** + * Get preferred IP for given endpoint if it is known. Otherwise this returns given endpoint itself. + * + * @param ep endpoint address to check + * @return Preferred IP for given endpoint if present, otherwise returns given ep + */ public static InetAddress getPreferredIP(InetAddress ep) { String req = "SELECT preferred_ip FROM system.%s WHERE peer='%s'"; UntypedResultSet result = processInternal(String.format(req, PEERS_CF, ep.getHostAddress())); if (!result.isEmpty() && result.one().has("preferred_ip")) return result.one().getInetAddress("preferred_ip"); - return null; + return ep; } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6867c2c/src/java/org/apache/cassandra/dht/RangeStreamer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/RangeStreamer.java b/src/java/org/apache/cassandra/dht/RangeStreamer.java index 1e6d9b8..4e925d3 100644 --- a/src/java/org/apache/cassandra/dht/RangeStreamer.java +++ b/src/java/org/apache/cassandra/dht/RangeStreamer.java @@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.gms.FailureDetector; import 
org.apache.cassandra.gms.IFailureDetector; import org.apache.cassandra.locator.AbstractReplicationStrategy; @@ -221,11 +222,12 @@ public class RangeStreamer { String keyspace = entry.getKey(); InetAddress source = entry.getValue().getKey(); + InetAddress preferred = SystemKeyspace.getPreferredIP(source); Collection<Range<Token>> ranges = entry.getValue().getValue(); /* Send messages to respective folks to stream data over to me */ if (logger.isDebugEnabled()) logger.debug("" + description + "ing from " + source + " ranges " + StringUtils.join(ranges, ", ")); - streamPlan.requestRanges(source, keyspace, ranges); + streamPlan.requestRanges(source, preferred, keyspace, ranges); } return streamPlan.execute(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6867c2c/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java b/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java index c45fc53..66a0362 100644 --- a/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java +++ b/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java @@ -41,14 +41,14 @@ public class OutboundTcpConnectionPool private final CountDownLatch started; public final OutboundTcpConnection cmdCon; public final OutboundTcpConnection ackCon; - // pointer to the reseted Address. - private InetAddress resetedEndpoint; + // pointer to the reset Address. 
+ private InetAddress resetEndpoint; private ConnectionMetrics metrics; OutboundTcpConnectionPool(InetAddress remoteEp) { id = remoteEp; - resetedEndpoint = SystemKeyspace.getPreferredIP(remoteEp); + resetEndpoint = SystemKeyspace.getPreferredIP(remoteEp); started = new CountDownLatch(1); cmdCon = new OutboundTcpConnection(this); @@ -90,13 +90,13 @@ public class OutboundTcpConnectionPool public void reset(InetAddress remoteEP) { SystemKeyspace.updatePreferredIP(id, remoteEP); - resetedEndpoint = remoteEP; + resetEndpoint = remoteEP; for (OutboundTcpConnection conn : new OutboundTcpConnection[] { cmdCon, ackCon }) conn.softCloseSocket(); // release previous metrics and create new one with reset address metrics.release(); - metrics = new ConnectionMetrics(resetedEndpoint, this); + metrics = new ConnectionMetrics(resetEndpoint, this); } public long getTimeouts() @@ -142,7 +142,7 @@ public class OutboundTcpConnectionPool { if (id.equals(FBUtilities.getBroadcastAddress())) return FBUtilities.getLocalAddress(); - return resetedEndpoint == null ? 
id : resetedEndpoint; + return resetEndpoint; } public static boolean isEncryptedChannel(InetAddress address) http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6867c2c/src/java/org/apache/cassandra/repair/StreamingRepairTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java index f7203a4..4226184 100644 --- a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java +++ b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java @@ -17,9 +17,12 @@ */ package org.apache.cassandra.repair; +import java.net.InetAddress; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.repair.messages.SyncComplete; import org.apache.cassandra.repair.messages.SyncRequest; @@ -56,13 +59,15 @@ public class StreamingRepairTask implements Runnable, StreamEventHandler private void initiateStreaming() { + InetAddress dest = request.dst; + InetAddress preferred = SystemKeyspace.getPreferredIP(dest); logger.info(String.format("[streaming task #%s] Performing streaming repair of %d ranges with %s", desc.sessionId, request.ranges.size(), request.dst)); StreamResultFuture op = new StreamPlan("Repair") .flushBeforeTransfer(true) // request ranges from the remote node - .requestRanges(request.dst, desc.keyspace, request.ranges, desc.columnFamily) + .requestRanges(dest, preferred, desc.keyspace, request.ranges, desc.columnFamily) // send ranges to the remote node - .transferRanges(request.dst, desc.keyspace, request.ranges, desc.columnFamily) + .transferRanges(dest, preferred, desc.keyspace, request.ranges, desc.columnFamily) .execute(); op.addEventListener(this); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6867c2c/src/java/org/apache/cassandra/service/StorageService.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 56056ab..4973e40 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -1890,11 +1890,12 @@ public class StorageService extends NotificationBroadcasterSupport implements IE { for (Map.Entry<InetAddress, Collection<Range<Token>>> entry : rangesToFetch.get(keyspaceName)) { - final InetAddress source = entry.getKey(); + InetAddress source = entry.getKey(); + InetAddress preferred = SystemKeyspace.getPreferredIP(source); Collection<Range<Token>> ranges = entry.getValue(); if (logger.isDebugEnabled()) logger.debug("Requesting from " + source + " ranges " + StringUtils.join(ranges, ", ")); - stream.requestRanges(source, keyspaceName, ranges); + stream.requestRanges(source, preferred, keyspaceName, ranges); } } StreamResultFuture future = stream.execute(); @@ -3022,12 +3023,14 @@ public class StorageService extends NotificationBroadcasterSupport implements IE // stream to the closest peer as chosen by the snitch DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getBroadcastAddress(), candidates); InetAddress hintsDestinationHost = candidates.get(0); + InetAddress preferred = SystemKeyspace.getPreferredIP(hintsDestinationHost); // stream all hints -- range list will be a singleton of "the entire ring" Token token = StorageService.getPartitioner().getMinimumToken(); List<Range<Token>> ranges = Collections.singletonList(new Range<Token>(token, token)); return new StreamPlan("Hints").transferRanges(hintsDestinationHost, + preferred, Keyspace.SYSTEM_KS, ranges, SystemKeyspace.HINTS_CF) @@ -3182,15 +3185,20 @@ public class StorageService extends NotificationBroadcasterSupport implements IE // stream ranges for (InetAddress address : endpointRanges.keySet()) - 
streamPlan.transferRanges(address, keyspace, endpointRanges.get(address)); + { + InetAddress preferred = SystemKeyspace.getPreferredIP(address); + streamPlan.transferRanges(address, preferred, keyspace, endpointRanges.get(address)); + } // stream requests Multimap<InetAddress, Range<Token>> workMap = RangeStreamer.getWorkMap(rangesToFetchWithPreferredEndpoints); for (InetAddress address : workMap.keySet()) - streamPlan.requestRanges(address, keyspace, workMap.get(address)); + { + InetAddress preferred = SystemKeyspace.getPreferredIP(address); + streamPlan.requestRanges(address, preferred, keyspace, workMap.get(address)); + } - if (logger.isDebugEnabled()) - logger.debug("Keyspace {}: work map {}.", keyspace, workMap); + logger.debug("Keyspace {}: work map {}.", keyspace, workMap); } } } @@ -3648,9 +3656,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE { final List<Range<Token>> ranges = rangesEntry.getValue(); final InetAddress newEndpoint = rangesEntry.getKey(); + final InetAddress preferred = SystemKeyspace.getPreferredIP(newEndpoint); // TODO each call to transferRanges re-flushes, this is potentially a lot of waste - streamPlan.transferRanges(newEndpoint, keyspaceName, ranges); + streamPlan.transferRanges(newEndpoint, preferred, keyspaceName, ranges); } } return streamPlan.execute(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6867c2c/src/java/org/apache/cassandra/streaming/SessionInfo.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/SessionInfo.java b/src/java/org/apache/cassandra/streaming/SessionInfo.java index b722ecf..4f80461 100644 --- a/src/java/org/apache/cassandra/streaming/SessionInfo.java +++ b/src/java/org/apache/cassandra/streaming/SessionInfo.java @@ -33,6 +33,7 @@ import com.google.common.collect.Iterables; public final class SessionInfo implements Serializable { public final InetAddress peer; + public final InetAddress 
connecting; /** Immutable collection of receiving summaries */ public final Collection<StreamSummary> receivingSummaries; /** Immutable collection of sending summaries*/ @@ -44,11 +45,13 @@ public final class SessionInfo implements Serializable private final Map<String, ProgressInfo> sendingFiles; public SessionInfo(InetAddress peer, + InetAddress connecting, Collection<StreamSummary> receivingSummaries, Collection<StreamSummary> sendingSummaries, StreamSession.State state) { this.peer = peer; + this.connecting = connecting; this.receivingSummaries = ImmutableSet.copyOf(receivingSummaries); this.sendingSummaries = ImmutableSet.copyOf(sendingSummaries); this.receivingFiles = new ConcurrentHashMap<>(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6867c2c/src/java/org/apache/cassandra/streaming/StreamPlan.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamPlan.java b/src/java/org/apache/cassandra/streaming/StreamPlan.java index e582c79..326bf48 100644 --- a/src/java/org/apache/cassandra/streaming/StreamPlan.java +++ b/src/java/org/apache/cassandra/streaming/StreamPlan.java @@ -56,56 +56,70 @@ public class StreamPlan * Request data in {@code keyspace} and {@code ranges} from specific node. * * @param from endpoint address to fetch data from. + * @param connecting Actual connecting address for the endpoint * @param keyspace name of keyspace * @param ranges ranges to fetch * @return this object for chaining */ - public StreamPlan requestRanges(InetAddress from, String keyspace, Collection<Range<Token>> ranges) + public StreamPlan requestRanges(InetAddress from, InetAddress connecting, String keyspace, Collection<Range<Token>> ranges) { - return requestRanges(from, keyspace, ranges, new String[0]); + return requestRanges(from, connecting, keyspace, ranges, new String[0]); } /** * Request data in {@code columnFamilies} under {@code keyspace} and {@code ranges} from specific node. 
* * @param from endpoint address to fetch data from. + * @param connecting Actual connecting address for the endpoint * @param keyspace name of keyspace * @param ranges ranges to fetch * @param columnFamilies specific column families * @return this object for chaining */ - public StreamPlan requestRanges(InetAddress from, String keyspace, Collection<Range<Token>> ranges, String... columnFamilies) + public StreamPlan requestRanges(InetAddress from, InetAddress connecting, String keyspace, Collection<Range<Token>> ranges, String... columnFamilies) { - StreamSession session = getOrCreateSession(from); + StreamSession session = getOrCreateSession(from, connecting); session.addStreamRequest(keyspace, ranges, Arrays.asList(columnFamilies)); return this; } /** + * Add transfer task to send data of specific {@code columnFamilies} under {@code keyspace} and {@code ranges}. + * + * @see #transferRanges(java.net.InetAddress, java.net.InetAddress, String, java.util.Collection, String...) + */ + public StreamPlan transferRanges(InetAddress to, String keyspace, Collection<Range<Token>> ranges, String... columnFamilies) + { + return transferRanges(to, to, keyspace, ranges, columnFamilies); + } + + /** * Add transfer task to send data of specific keyspace and ranges. * * @param to endpoint address of receiver + * @param connecting Actual connecting address of the endpoint * @param keyspace name of keyspace * @param ranges ranges to send * @return this object for chaining */ - public StreamPlan transferRanges(InetAddress to, String keyspace, Collection<Range<Token>> ranges) + public StreamPlan transferRanges(InetAddress to, InetAddress connecting, String keyspace, Collection<Range<Token>> ranges) { - return transferRanges(to, keyspace, ranges, new String[0]); + return transferRanges(to, connecting, keyspace, ranges, new String[0]); } /** * Add transfer task to send data of specific {@code columnFamilies} under {@code keyspace} and {@code ranges}. 
* * @param to endpoint address of receiver + * @param connecting Actual connecting address of the endpoint * @param keyspace name of keyspace * @param ranges ranges to send * @param columnFamilies specific column families * @return this object for chaining */ - public StreamPlan transferRanges(InetAddress to, String keyspace, Collection<Range<Token>> ranges, String... columnFamilies) + public StreamPlan transferRanges(InetAddress to, InetAddress connecting, String keyspace, Collection<Range<Token>> ranges, String... columnFamilies) { - StreamSession session = getOrCreateSession(to); + StreamSession session = getOrCreateSession(to, connecting); session.addTransferRanges(keyspace, ranges, Arrays.asList(columnFamilies), flushBeforeTransfer); return this; } @@ -120,7 +134,7 @@ public class StreamPlan */ public StreamPlan transferFiles(InetAddress to, Collection<StreamSession.SSTableStreamingSections> sstableDetails) { - StreamSession session = getOrCreateSession(to); + StreamSession session = getOrCreateSession(to, to); session.addTransferFiles(sstableDetails); return this; } @@ -176,12 +190,12 @@ public class StreamPlan return this; } - private StreamSession getOrCreateSession(InetAddress peer) + private StreamSession getOrCreateSession(InetAddress peer, InetAddress preferred) { StreamSession session = sessions.get(peer); if (session == null) { - session = new StreamSession(peer, connectionFactory); + session = new StreamSession(peer, preferred, connectionFactory); sessions.put(peer, session); } return session; http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6867c2c/src/java/org/apache/cassandra/streaming/StreamResultFuture.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java index add14f7..bde5934 100644 --- a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java +++ 
b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java @@ -106,7 +106,7 @@ public final class StreamResultFuture extends AbstractFuture<StreamState> StreamResultFuture future = StreamManager.instance.getReceivingStream(planId); if (future == null) { - final StreamSession session = new StreamSession(from, null); + final StreamSession session = new StreamSession(from, socket.getInetAddress(), null); // The main reason we create a StreamResultFuture on the receiving side is for JMX exposure. future = new StreamResultFuture(planId, description, Collections.singleton(session)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6867c2c/src/java/org/apache/cassandra/streaming/StreamSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java index 4fcbe36..db0c484 100644 --- a/src/java/org/apache/cassandra/streaming/StreamSession.java +++ b/src/java/org/apache/cassandra/streaming/StreamSession.java @@ -70,7 +70,7 @@ import org.apache.cassandra.utils.Pair; * * (a) This phase is started when the initiator onInitializationComplete() method is called. This method sends a * PrepareMessage that includes what files/sections this node will stream to the follower - * (stored in a StreamTranferTask, each column family has it's own transfer task) and what + * (stored in a StreamTransferTask, each column family has it's own transfer task) and what * the follower needs to stream back (StreamReceiveTask, same as above). If the initiator has * nothing to receive from the follower, it goes directly to its Streaming phase. Otherwise, * it waits for the follower PrepareMessage. @@ -117,7 +117,15 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe // is directly handled by the ConnectionHandler incoming and outgoing threads. 
private static final DebuggableThreadPoolExecutor streamExecutor = DebuggableThreadPoolExecutor.createWithFixedPoolSize("StreamConnectionEstablisher", FBUtilities.getAvailableProcessors()); + + /** + * Streaming endpoint. + * + * Each {@code StreamSession} is identified by this InetAddress which is broadcast address of the node streaming. + */ public final InetAddress peer; + /** Actual connecting address. Can be the same as {@linkplain #peer}. */ + public final InetAddress connecting; // should not be null when session is started private StreamResultFuture streamResult; @@ -155,14 +163,16 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe * Create new streaming session with the peer. * * @param peer Address of streaming peer + * @param connecting Actual connecting address * @param factory is used for establishing connection */ - public StreamSession(InetAddress peer, StreamConnectionFactory factory) + public StreamSession(InetAddress peer, InetAddress connecting, StreamConnectionFactory factory) { this.peer = peer; + this.connecting = connecting; this.factory = factory; this.handler = new ConnectionHandler(this); - this.metrics = StreamingMetrics.get(peer); + this.metrics = StreamingMetrics.get(connecting); } public UUID planId() @@ -205,6 +215,9 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe { try { + logger.info("[Stream #{}] Starting streaming to {}{}", planId(), + peer, + peer.equals(connecting) ? 
"" : " through " + connecting); handler.initiate(); onInitializationComplete(); } @@ -219,7 +232,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe public Socket createConnection() throws IOException { assert factory != null; - return factory.createConnection(peer); + return factory.createConnection(connecting); } /** @@ -591,7 +604,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe List<StreamSummary> transferSummaries = Lists.newArrayList(); for (StreamTask transfer : transfers.values()) transferSummaries.add(transfer.getSummary()); - return new SessionInfo(peer, receivingSummaries, transferSummaries, state); + return new SessionInfo(peer, connecting, receivingSummaries, transferSummaries, state); } public synchronized void taskCompleted(StreamReceiveTask completedTask) http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6867c2c/src/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java b/src/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java index 658facf..809bc0d 100644 --- a/src/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java +++ b/src/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java @@ -36,6 +36,7 @@ public class SessionInfoCompositeData { private static final String[] ITEM_NAMES = new String[]{"planId", "peer", + "connecting", "receivingSummaries", "sendingSummaries", "state", @@ -43,6 +44,7 @@ public class SessionInfoCompositeData "sendingFiles"}; private static final String[] ITEM_DESCS = new String[]{"Plan ID", "Session peer", + "Connecting address", "Summaries of receiving data", "Summaries of sending data", "Current session state", @@ -56,6 +58,7 @@ public class SessionInfoCompositeData { ITEM_TYPES = new 
OpenType[]{SimpleType.STRING, SimpleType.STRING, + SimpleType.STRING, ArrayType.getArrayType(StreamSummaryCompositeData.COMPOSITE_TYPE), ArrayType.getArrayType(StreamSummaryCompositeData.COMPOSITE_TYPE), SimpleType.STRING, @@ -78,6 +81,7 @@ public class SessionInfoCompositeData Map<String, Object> valueMap = new HashMap<>(); valueMap.put(ITEM_NAMES[0], planId.toString()); valueMap.put(ITEM_NAMES[1], sessionInfo.peer.getHostAddress()); + valueMap.put(ITEM_NAMES[2], sessionInfo.connecting.getHostAddress()); Function<StreamSummary, CompositeData> fromStreamSummary = new Function<StreamSummary, CompositeData>() { public CompositeData apply(StreamSummary input) @@ -85,9 +89,9 @@ public class SessionInfoCompositeData return StreamSummaryCompositeData.toCompositeData(input); } }; - valueMap.put(ITEM_NAMES[2], toArrayOfCompositeData(sessionInfo.receivingSummaries, fromStreamSummary)); - valueMap.put(ITEM_NAMES[3], toArrayOfCompositeData(sessionInfo.sendingSummaries, fromStreamSummary)); - valueMap.put(ITEM_NAMES[4], sessionInfo.state.name()); + valueMap.put(ITEM_NAMES[3], toArrayOfCompositeData(sessionInfo.receivingSummaries, fromStreamSummary)); + valueMap.put(ITEM_NAMES[4], toArrayOfCompositeData(sessionInfo.sendingSummaries, fromStreamSummary)); + valueMap.put(ITEM_NAMES[5], sessionInfo.state.name()); Function<ProgressInfo, CompositeData> fromProgressInfo = new Function<ProgressInfo, CompositeData>() { public CompositeData apply(ProgressInfo input) @@ -95,8 +99,8 @@ public class SessionInfoCompositeData return ProgressInfoCompositeData.toCompositeData(planId, input); } }; - valueMap.put(ITEM_NAMES[5], toArrayOfCompositeData(sessionInfo.getReceivingFiles(), fromProgressInfo)); - valueMap.put(ITEM_NAMES[6], toArrayOfCompositeData(sessionInfo.getSendingFiles(), fromProgressInfo)); + valueMap.put(ITEM_NAMES[6], toArrayOfCompositeData(sessionInfo.getReceivingFiles(), fromProgressInfo)); + valueMap.put(ITEM_NAMES[7], toArrayOfCompositeData(sessionInfo.getSendingFiles(), 
fromProgressInfo)); try { return new CompositeDataSupport(COMPOSITE_TYPE, valueMap); @@ -112,10 +116,11 @@ public class SessionInfoCompositeData assert cd.getCompositeType().equals(COMPOSITE_TYPE); Object[] values = cd.getAll(ITEM_NAMES); - InetAddress peer; + InetAddress peer, connecting; try { peer = InetAddress.getByName((String) values[1]); + connecting = InetAddress.getByName((String) values[2]); } catch (UnknownHostException e) { @@ -129,9 +134,10 @@ public class SessionInfoCompositeData } }; SessionInfo info = new SessionInfo(peer, - fromArrayOfCompositeData((CompositeData[]) values[2], toStreamSummary), + connecting, fromArrayOfCompositeData((CompositeData[]) values[3], toStreamSummary), - StreamSession.State.valueOf((String) values[4])); + fromArrayOfCompositeData((CompositeData[]) values[4], toStreamSummary), + StreamSession.State.valueOf((String) values[5])); Function<CompositeData, ProgressInfo> toProgressInfo = new Function<CompositeData, ProgressInfo>() { public ProgressInfo apply(CompositeData input) @@ -139,11 +145,11 @@ public class SessionInfoCompositeData return ProgressInfoCompositeData.fromCompositeData(input); } }; - for (ProgressInfo progress : fromArrayOfCompositeData((CompositeData[]) values[5], toProgressInfo)) + for (ProgressInfo progress : fromArrayOfCompositeData((CompositeData[]) values[6], toProgressInfo)) { info.updateProgress(progress); } - for (ProgressInfo progress : fromArrayOfCompositeData((CompositeData[]) values[6], toProgressInfo)) + for (ProgressInfo progress : fromArrayOfCompositeData((CompositeData[]) values[7], toProgressInfo)) { info.updateProgress(progress); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6867c2c/src/java/org/apache/cassandra/tools/NodeCmd.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/NodeCmd.java b/src/java/org/apache/cassandra/tools/NodeCmd.java index 27b50a7..d9f3607 100644 --- 
a/src/java/org/apache/cassandra/tools/NodeCmd.java +++ b/src/java/org/apache/cassandra/tools/NodeCmd.java @@ -706,7 +706,13 @@ public class NodeCmd outs.printf("%s %s%n", status.description, status.planId.toString()); for (SessionInfo info : status.sessions) { - outs.printf(" %s%n", info.peer.toString()); + outs.printf(" %s", info.peer.toString()); + // print private IP when it is used + if (!info.peer.equals(info.connecting)) + { + outs.printf(" (using %s)", info.connecting.toString()); + } + outs.printf("%n"); if (!info.receivingSummaries.isEmpty()) { outs.printf(" Receiving %d files, %d bytes total%n", info.getTotalFilesToReceive(), info.getTotalSizeToReceive()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6867c2c/test/unit/org/apache/cassandra/streaming/SessionInfoTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/streaming/SessionInfoTest.java b/test/unit/org/apache/cassandra/streaming/SessionInfoTest.java index 60fbf40..c8b6254 100644 --- a/test/unit/org/apache/cassandra/streaming/SessionInfoTest.java +++ b/test/unit/org/apache/cassandra/streaming/SessionInfoTest.java @@ -46,7 +46,7 @@ public class SessionInfoTest } StreamSummary sending = new StreamSummary(cfId, 10, 100); - SessionInfo info = new SessionInfo(local, summaries, Collections.singleton(sending), StreamSession.State.PREPARING); + SessionInfo info = new SessionInfo(local, local, summaries, Collections.singleton(sending), StreamSession.State.PREPARING); assert info.getTotalFilesToReceive() == 45; assert info.getTotalFilesToSend() == 10; http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6867c2c/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java index ce0f9d0..b51f75b 100644 
--- a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java +++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java @@ -17,6 +17,7 @@ */ package org.apache.cassandra.streaming; +import java.net.InetAddress; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ScheduledFuture; @@ -43,7 +44,8 @@ public class StreamTransferTaskTest extends SchemaLoader String ks = "Keyspace1"; String cf = "Standard1"; - StreamSession session = new StreamSession(FBUtilities.getBroadcastAddress(), null); + InetAddress peer = FBUtilities.getBroadcastAddress(); + StreamSession session = new StreamSession(peer, peer, null); ColumnFamilyStore cfs = Keyspace.open(ks).getColumnFamilyStore(cf); // create two sstables http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6867c2c/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java index 4cd578d..d2047fc 100644 --- a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java +++ b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java @@ -107,7 +107,7 @@ public class StreamingTransferTest extends SchemaLoader ranges.add(new Range<>(p.getToken(ByteBufferUtil.bytes("key2")), p.getMinimumToken())); StreamResultFuture futureResult = new StreamPlan("StreamingTransferTest") - .requestRanges(LOCAL, "Keyspace2", ranges) + .requestRanges(LOCAL, LOCAL, "Keyspace2", ranges) .execute(); UUID planId = futureResult.planId;