Repository: cassandra Updated Branches: refs/heads/trunk e0524c099 -> 63945228f
Use the correct IP/Port for Streaming when localAddress is left unbound patch by Dinesh Joshi; reviewed by jasobrown for CASSANDRA-14389 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/63945228 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/63945228 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/63945228 Branch: refs/heads/trunk Commit: 63945228fc0fabea2cfcf1f1b4d0a29ed3964107 Parents: e0524c0 Author: Dinesh A. Joshi <dinesh.jo...@apple.com> Authored: Thu Apr 19 17:23:19 2018 -0700 Committer: Jason Brown <jasedbr...@gmail.com> Committed: Sun Apr 22 16:35:47 2018 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/dht/RangeStreamer.java | 4 +- .../apache/cassandra/net/MessagingService.java | 10 +++ .../cassandra/net/async/NettyFactory.java | 1 - .../repair/AsymmetricLocalSyncTask.java | 4 +- .../apache/cassandra/repair/LocalSyncTask.java | 9 ++- .../cassandra/repair/StreamingRepairTask.java | 12 ++-- .../cassandra/service/StorageService.java | 12 ++-- .../cassandra/streaming/StreamCoordinator.java | 21 +++--- .../apache/cassandra/streaming/StreamPlan.java | 39 ++---------- .../cassandra/streaming/StreamResultFuture.java | 14 ++-- .../cassandra/streaming/StreamSession.java | 42 ++++++++---- .../streaming/StreamingMessageSender.java | 3 + .../async/NettyStreamingMessageSender.java | 12 +++- .../streaming/CassandraStreamManagerTest.java | 1 - .../cassandra/dht/StreamStateStoreTest.java | 4 +- .../cassandra/net/MessagingServiceTest.java | 51 ++++++++++++++- .../cassandra/repair/LocalSyncTaskTest.java | 6 +- .../repair/StreamingRepairTaskTest.java | 6 +- .../cassandra/streaming/StreamSessionTest.java | 67 ++++++++++++++++++++ .../streaming/StreamTransferTaskTest.java | 4 +- .../streaming/StreamingTransferTest.java | 2 +- .../async/NettyStreamingMessageSenderTest.java | 2 +- 
.../async/StreamingInboundHandlerTest.java | 2 +- 24 files changed, 218 insertions(+), 111 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/63945228/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 0e15a8d..4cdd8ba 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0 + * Use the correct IP/Port for Streaming when localAddress is left unbound (CASSANDRA-14389) * nodetool listsnapshots is missing local system keyspace snapshots (CASSANDRA-14381) * Remove StreamCoordinator.streamExecutor thread pool (CASSANDRA-14402) * Rename nodetool --with-port to --print-port to disambiguate from --port (CASSANDRA-14392) http://git-wip-us.apache.org/repos/asf/cassandra/blob/63945228/src/java/org/apache/cassandra/dht/RangeStreamer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/RangeStreamer.java b/src/java/org/apache/cassandra/dht/RangeStreamer.java index dfabac2..110fed6 100644 --- a/src/java/org/apache/cassandra/dht/RangeStreamer.java +++ b/src/java/org/apache/cassandra/dht/RangeStreamer.java @@ -34,7 +34,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.db.Keyspace; -import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.gms.EndpointState; import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.gms.IFailureDetector; @@ -428,7 +427,6 @@ public class RangeStreamer { String keyspace = entry.getKey(); InetAddressAndPort source = entry.getValue().getKey(); - InetAddressAndPort preferred = SystemKeyspace.getPreferredIP(source); Collection<Range<Token>> ranges = entry.getValue().getValue(); // filter out already streamed ranges @@ -441,7 +439,7 @@ public class RangeStreamer if (logger.isTraceEnabled()) logger.trace("{}ing from {} ranges {}", description, source, 
StringUtils.join(ranges, ", ")); /* Send messages to respective folks to stream data over to me */ - streamPlan.requestRanges(source, preferred, keyspace, ranges); + streamPlan.requestRanges(source, keyspace, ranges); } return streamPlan.execute(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/63945228/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index c6ef986..a590723 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -1594,6 +1594,16 @@ public final class MessagingService implements MessagingServiceMBean bounds.left.getPartitioner().getClass().getName())); } + /** + * This method is used to determine the preferred IP & Port of a peer using the + * {@link OutboundMessagingPool} and SystemKeyspace. + */ + public InetAddressAndPort getPreferredRemoteAddr(InetAddressAndPort to) + { + OutboundMessagingPool pool = channelManagers.get(to); + return pool != null ? 
pool.getPreferredRemoteAddr() : SystemKeyspace.getPreferredIP(to); + } + private OutboundMessagingPool getMessagingConnection(InetAddressAndPort to) { OutboundMessagingPool pool = channelManagers.get(to); http://git-wip-us.apache.org/repos/asf/cassandra/blob/63945228/src/java/org/apache/cassandra/net/async/NettyFactory.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/async/NettyFactory.java b/src/java/org/apache/cassandra/net/async/NettyFactory.java index 86ed4e7..5bbac45 100644 --- a/src/java/org/apache/cassandra/net/async/NettyFactory.java +++ b/src/java/org/apache/cassandra/net/async/NettyFactory.java @@ -334,7 +334,6 @@ public final class NettyFactory .option(ChannelOption.TCP_NODELAY, params.tcpNoDelay) .option(ChannelOption.WRITE_BUFFER_WATER_MARK, params.waterMark) .handler(new OutboundInitializer(params)); - bootstrap.localAddress(params.connectionId.local().address, 0); InetAddressAndPort remoteAddress = params.connectionId.connectionAddress(); bootstrap.remoteAddress(new InetSocketAddress(remoteAddress.address, remoteAddress.port)); return bootstrap; http://git-wip-us.apache.org/repos/asf/cassandra/blob/63945228/src/java/org/apache/cassandra/repair/AsymmetricLocalSyncTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/AsymmetricLocalSyncTask.java b/src/java/org/apache/cassandra/repair/AsymmetricLocalSyncTask.java index 8d58673..2ca524f 100644 --- a/src/java/org/apache/cassandra/repair/AsymmetricLocalSyncTask.java +++ b/src/java/org/apache/cassandra/repair/AsymmetricLocalSyncTask.java @@ -21,7 +21,6 @@ package org.apache.cassandra.repair; import java.util.List; import java.util.UUID; -import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.locator.InetAddressAndPort; @@ -49,7 +48,6 @@ public class 
AsymmetricLocalSyncTask extends AsymmetricSyncTask implements Strea public void startSync(List<Range<Token>> rangesToFetch) { - InetAddressAndPort preferred = SystemKeyspace.getPreferredIP(fetchFrom); StreamPlan plan = new StreamPlan(StreamOperation.REPAIR, 1, false, pendingRepair, @@ -57,7 +55,7 @@ public class AsymmetricLocalSyncTask extends AsymmetricSyncTask implements Strea .listeners(this) .flushBeforeTransfer(pendingRepair == null) // request ranges from the remote node - .requestRanges(fetchFrom, preferred, desc.keyspace, rangesToFetch, desc.columnFamily); + .requestRanges(fetchFrom, desc.keyspace, rangesToFetch, desc.columnFamily); plan.execute(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/63945228/src/java/org/apache/cassandra/repair/LocalSyncTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/LocalSyncTask.java b/src/java/org/apache/cassandra/repair/LocalSyncTask.java index 3901c75..d7e0387 100644 --- a/src/java/org/apache/cassandra/repair/LocalSyncTask.java +++ b/src/java/org/apache/cassandra/repair/LocalSyncTask.java @@ -59,16 +59,16 @@ public class LocalSyncTask extends SyncTask implements StreamEventHandler } @VisibleForTesting - StreamPlan createStreamPlan(InetAddressAndPort dst, InetAddressAndPort preferred, List<Range<Token>> differences) + StreamPlan createStreamPlan(InetAddressAndPort dst, List<Range<Token>> differences) { StreamPlan plan = new StreamPlan(StreamOperation.REPAIR, 1, false, pendingRepair, previewKind) .listeners(this) .flushBeforeTransfer(pendingRepair == null) - .requestRanges(dst, preferred, desc.keyspace, differences, desc.columnFamily); // request ranges from the remote node + .requestRanges(dst, desc.keyspace, differences, desc.columnFamily); // request ranges from the remote node if (!pullRepair) { // send ranges to the remote node if we are not performing a pull repair - plan.transferRanges(dst, preferred, desc.keyspace, differences, 
desc.columnFamily); + plan.transferRanges(dst, desc.keyspace, differences, desc.columnFamily); } return plan; @@ -84,13 +84,12 @@ public class LocalSyncTask extends SyncTask implements StreamEventHandler InetAddressAndPort local = FBUtilities.getBroadcastAddressAndPort(); // We can take anyone of the node as source or destination, however if one is localhost, we put at source to avoid a forwarding InetAddressAndPort dst = r2.endpoint.equals(local) ? r1.endpoint : r2.endpoint; - InetAddressAndPort preferred = SystemKeyspace.getPreferredIP(dst); String message = String.format("Performing streaming repair of %d ranges with %s", differences.size(), dst); logger.info("{} {}", previewKind.logPrefix(desc.sessionId), message); Tracing.traceRepair(message); - createStreamPlan(dst, preferred, differences).execute(); + createStreamPlan(dst, differences).execute(); } public void handleStreamEvent(StreamEvent event) http://git-wip-us.apache.org/repos/asf/cassandra/blob/63945228/src/java/org/apache/cassandra/repair/StreamingRepairTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java index 725e84d..5d2b2ec 100644 --- a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java +++ b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java @@ -25,11 +25,9 @@ import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.locator.InetAddressAndPort; -import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.repair.messages.SyncComplete; import org.apache.cassandra.streaming.PreviewKind; @@ -70,21 +68,19 @@ public class 
StreamingRepairTask implements Runnable, StreamEventHandler public void run() { - InetAddressAndPort dest = dst; - InetAddressAndPort preferred = SystemKeyspace.getPreferredIP(dest); logger.info("[streaming task #{}] Performing streaming repair of {} ranges with {}", desc.sessionId, ranges.size(), dst); - createStreamPlan(dest, preferred).execute(); + createStreamPlan(dst).execute(); } @VisibleForTesting - StreamPlan createStreamPlan(InetAddressAndPort dest, InetAddressAndPort preferred) + StreamPlan createStreamPlan(InetAddressAndPort dest) { StreamPlan sp = new StreamPlan(StreamOperation.REPAIR, 1, false, pendingRepair, previewKind) .listeners(this) .flushBeforeTransfer(pendingRepair == null) // sstables are isolated at the beginning of an incremental repair session, so flushing isn't necessary - .requestRanges(dest, preferred, desc.keyspace, ranges, desc.columnFamily); // request ranges from the remote node + .requestRanges(dest, desc.keyspace, ranges, desc.columnFamily); // request ranges from the remote node if (!asymmetric) - sp.transferRanges(dest, preferred, desc.keyspace, ranges, desc.columnFamily); // send ranges to the remote node + sp.transferRanges(dest, desc.keyspace, ranges, desc.columnFamily); // send ranges to the remote node return sp; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/63945228/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index de5d62b..5ea8c75 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -2805,11 +2805,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE for (Map.Entry<InetAddressAndPort, Collection<Range<Token>>> entry : rangesToFetch.get(keyspaceName)) { InetAddressAndPort source = 
entry.getKey(); - InetAddressAndPort preferred = SystemKeyspace.getPreferredIP(source); Collection<Range<Token>> ranges = entry.getValue(); if (logger.isDebugEnabled()) logger.debug("Requesting from {} ranges {}", source, StringUtils.join(ranges, ", ")); - stream.requestRanges(source, preferred, keyspaceName, ranges); + stream.requestRanges(source, keyspaceName, ranges); } } StreamResultFuture future = stream.execute(); @@ -4327,8 +4326,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE for (InetAddressAndPort address : endpointRanges.keySet()) { logger.debug("Will stream range {} of keyspace {} to endpoint {}", endpointRanges.get(address), keyspace, address); - InetAddressAndPort preferred = SystemKeyspace.getPreferredIP(address); - streamPlan.transferRanges(address, preferred, keyspace, endpointRanges.get(address)); + streamPlan.transferRanges(address, keyspace, endpointRanges.get(address)); } // stream requests @@ -4336,8 +4334,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE for (InetAddressAndPort address : workMap.keySet()) { logger.debug("Will request range {} of keyspace {} from endpoint {}", workMap.get(address), keyspace, address); - InetAddressAndPort preferred = SystemKeyspace.getPreferredIP(address); - streamPlan.requestRanges(address, preferred, keyspace, workMap.get(address)); + streamPlan.requestRanges(address, keyspace, workMap.get(address)); } logger.debug("Keyspace {}: work map {}.", keyspace, workMap); @@ -5107,10 +5104,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE { List<Range<Token>> ranges = rangesEntry.getValue(); InetAddressAndPort newEndpoint = rangesEntry.getKey(); - InetAddressAndPort preferred = SystemKeyspace.getPreferredIP(newEndpoint); // TODO each call to transferRanges re-flushes, this is potentially a lot of waste - streamPlan.transferRanges(newEndpoint, preferred, keyspaceName, ranges); + 
streamPlan.transferRanges(newEndpoint, keyspaceName, ranges); } } return streamPlan.execute(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/63945228/src/java/org/apache/cassandra/streaming/StreamCoordinator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java index 6b92dfe..6ea8e00 100644 --- a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java +++ b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java @@ -147,14 +147,14 @@ public class StreamCoordinator return new HashSet<>(peerSessions.keySet()); } - public synchronized StreamSession getOrCreateNextSession(InetAddressAndPort peer, InetAddressAndPort connecting) + public synchronized StreamSession getOrCreateNextSession(InetAddressAndPort peer) { - return getOrCreateHostData(peer).getOrCreateNextSession(peer, connecting); + return getOrCreateHostData(peer).getOrCreateNextSession(peer); } - public synchronized StreamSession getOrCreateSessionById(InetAddressAndPort peer, int id, InetAddressAndPort connecting) + public synchronized StreamSession getOrCreateSessionById(InetAddressAndPort peer, int id) { - return getOrCreateHostData(peer).getOrCreateSessionById(peer, id, connecting); + return getOrCreateHostData(peer).getOrCreateSessionById(peer, id); } public StreamSession getSessionById(InetAddressAndPort peer, int id) @@ -193,13 +193,13 @@ public class StreamCoordinator for (Collection<OutgoingStream> bucket : buckets) { - StreamSession session = sessionList.getOrCreateNextSession(to, to); + StreamSession session = sessionList.getOrCreateNextSession(to); session.addTransferStreams(bucket); } } else { - StreamSession session = sessionList.getOrCreateNextSession(to, to); + StreamSession session = sessionList.getOrCreateNextSession(to); session.addTransferStreams(streams); } } @@ -230,6 +230,7 @@ public class StreamCoordinator 
private HostStreamingData getHostData(InetAddressAndPort peer) { HostStreamingData data = peerSessions.get(peer); + if (data == null) throw new IllegalArgumentException("Unknown peer requested: " + peer); return data; @@ -275,12 +276,12 @@ public class StreamCoordinator return false; } - public StreamSession getOrCreateNextSession(InetAddressAndPort peer, InetAddressAndPort connecting) + public StreamSession getOrCreateNextSession(InetAddressAndPort peer) { // create if (streamSessions.size() < connectionsPerHost) { - StreamSession session = new StreamSession(streamOperation, peer, connecting, factory, streamSessions.size(), pendingRepair, previewKind); + StreamSession session = new StreamSession(streamOperation, peer, factory, streamSessions.size(), pendingRepair, previewKind); streamSessions.put(++lastReturned, session); return session; } @@ -307,12 +308,12 @@ public class StreamCoordinator return Collections.unmodifiableCollection(streamSessions.values()); } - public StreamSession getOrCreateSessionById(InetAddressAndPort peer, int id, InetAddressAndPort connecting) + public StreamSession getOrCreateSessionById(InetAddressAndPort peer, int id) { StreamSession session = streamSessions.get(id); if (session == null) { - session = new StreamSession(streamOperation, peer, connecting, factory, id, pendingRepair, previewKind); + session = new StreamSession(streamOperation, peer, factory, id, pendingRepair, previewKind); streamSessions.put(id, session); } return session; http://git-wip-us.apache.org/repos/asf/cassandra/blob/63945228/src/java/org/apache/cassandra/streaming/StreamPlan.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamPlan.java b/src/java/org/apache/cassandra/streaming/StreamPlan.java index 98d68ce..b56f165 100644 --- a/src/java/org/apache/cassandra/streaming/StreamPlan.java +++ b/src/java/org/apache/cassandra/streaming/StreamPlan.java @@ -68,29 +68,27 @@ public class 
StreamPlan * Request data in {@code keyspace} and {@code ranges} from specific node. * * @param from endpoint address to fetch data from. - * @param connecting Actual connecting address for the endpoint * @param keyspace name of keyspace * @param ranges ranges to fetch * @return this object for chaining */ - public StreamPlan requestRanges(InetAddressAndPort from, InetAddressAndPort connecting, String keyspace, Collection<Range<Token>> ranges) + public StreamPlan requestRanges(InetAddressAndPort from, String keyspace, Collection<Range<Token>> ranges) { - return requestRanges(from, connecting, keyspace, ranges, EMPTY_COLUMN_FAMILIES); + return requestRanges(from, keyspace, ranges, EMPTY_COLUMN_FAMILIES); } /** * Request data in {@code columnFamilies} under {@code keyspace} and {@code ranges} from specific node. * * @param from endpoint address to fetch data from. - * @param connecting Actual connecting address for the endpoint * @param keyspace name of keyspace * @param ranges ranges to fetch * @param columnFamilies specific column families * @return this object for chaining */ - public StreamPlan requestRanges(InetAddressAndPort from, InetAddressAndPort connecting, String keyspace, Collection<Range<Token>> ranges, String... columnFamilies) + public StreamPlan requestRanges(InetAddressAndPort from, String keyspace, Collection<Range<Token>> ranges, String... columnFamilies) { - StreamSession session = coordinator.getOrCreateNextSession(from, connecting); + StreamSession session = coordinator.getOrCreateNextSession(from); session.addStreamRequest(keyspace, ranges, Arrays.asList(columnFamilies)); return this; } @@ -98,40 +96,15 @@ public class StreamPlan /** * Add transfer task to send data of specific {@code columnFamilies} under {@code keyspace} and {@code ranges}. * - * @see #transferRanges(InetAddressAndPort, InetAddressAndPort, String, java.util.Collection, String...) 
- */ - public StreamPlan transferRanges(InetAddressAndPort to, String keyspace, Collection<Range<Token>> ranges, String... columnFamilies) - { - return transferRanges(to, to, keyspace, ranges, columnFamilies); - } - - /** - * Add transfer task to send data of specific keyspace and ranges. - * * @param to endpoint address of receiver - * @param connecting Actual connecting address of the endpoint - * @param keyspace name of keyspace - * @param ranges ranges to send - * @return this object for chaining - */ - public StreamPlan transferRanges(InetAddressAndPort to, InetAddressAndPort connecting, String keyspace, Collection<Range<Token>> ranges) - { - return transferRanges(to, connecting, keyspace, ranges, EMPTY_COLUMN_FAMILIES); - } - - /** - * Add transfer task to send data of specific {@code columnFamilies} under {@code keyspace} and {@code ranges}. - * - * @param to endpoint address of receiver - * @param connecting Actual connecting address of the endpoint * @param keyspace name of keyspace * @param ranges ranges to send * @param columnFamilies specific column families * @return this object for chaining */ - public StreamPlan transferRanges(InetAddressAndPort to, InetAddressAndPort connecting, String keyspace, Collection<Range<Token>> ranges, String... columnFamilies) + public StreamPlan transferRanges(InetAddressAndPort to, String keyspace, Collection<Range<Token>> ranges, String... 
columnFamilies) { - StreamSession session = coordinator.getOrCreateNextSession(to, connecting); + StreamSession session = coordinator.getOrCreateNextSession(to); session.addTransferRanges(keyspace, ranges, Arrays.asList(columnFamilies), flushBeforeTransfer); return this; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/63945228/src/java/org/apache/cassandra/streaming/StreamResultFuture.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java index 3b11fb6..ef8976d 100644 --- a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java +++ b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java @@ -112,14 +112,16 @@ public final class StreamResultFuture extends AbstractFuture<StreamState> StreamResultFuture future = StreamManager.instance.getReceivingStream(planId); if (future == null) { - logger.info("[Stream #{} ID#{}] Creating new streaming plan for {}", planId, sessionIndex, streamOperation.getDescription()); + logger.info("[Stream #{} ID#{}] Creating new streaming plan for {} from {} channel.remote {} channel.local {} channel.id {}", + planId, sessionIndex, streamOperation.getDescription(), from, channel.remoteAddress(), channel.localAddress(), channel.id()); // The main reason we create a StreamResultFuture on the receiving side is for JMX exposure. 
future = new StreamResultFuture(planId, streamOperation, pendingRepair, previewKind); StreamManager.instance.registerReceiving(future); } future.attachConnection(from, sessionIndex, channel); - logger.info("[Stream #{}, ID#{}] Received streaming plan for {}", planId, sessionIndex, streamOperation.getDescription()); + logger.info("[Stream #{}, ID#{}] Received streaming plan for {} from {} channel.remote {} channel.local {} channel.id {}", + planId, sessionIndex, streamOperation.getDescription(), from, channel.remoteAddress(), channel.localAddress(), channel.id()); return future; } @@ -137,13 +139,7 @@ public final class StreamResultFuture extends AbstractFuture<StreamState> private void attachConnection(InetAddressAndPort from, int sessionIndex, Channel channel) { - SocketAddress addr = channel.remoteAddress(); - //In the case of unit tests, if you use the EmbeddedChannel, channel.remoteAddress() - //does not return an InetSocketAddress, but an EmbeddedSocketAddress. Hence why we need the type check here - InetAddress connecting = (addr instanceof InetSocketAddress ? ((InetSocketAddress) addr).getAddress() : from.address); - //Need to turn connecting into a InetAddressAndPort with the correct port. 
I think getting the port from "from" - //Will work since we don't actually have ports diverge across network interfaces - StreamSession session = coordinator.getOrCreateSessionById(from, sessionIndex, InetAddressAndPort.getByAddressOverrideDefaults(connecting, from.port)); + StreamSession session = coordinator.getOrCreateSessionById(from, sessionIndex); session.init(this); session.attach(channel); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/63945228/src/java/org/apache/cassandra/streaming/StreamSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java index 42d1d97..c56616e 100644 --- a/src/java/org/apache/cassandra/streaming/StreamSession.java +++ b/src/java/org/apache/cassandra/streaming/StreamSession.java @@ -21,6 +21,7 @@ import java.net.SocketTimeoutException; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.*; @@ -124,6 +125,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber private static final Logger logger = LoggerFactory.getLogger(StreamSession.class); private final StreamOperation streamOperation; + /** * Streaming endpoint. * @@ -131,10 +133,12 @@ public class StreamSession implements IEndpointStateChangeSubscriber */ public final InetAddressAndPort peer; - private final int index; + /** + * Preferred IP Address/Port of the peer; this is the address that will be connected to. Can be the same as {@linkplain #peer}. + */ + private final InetAddressAndPort preferredPeerInetAddressAndPort; - /** Actual connecting address. Can be the same as {@linkplain #peer}. 
*/ - public final InetAddressAndPort connecting; + private final int index; // should not be null when session is started private StreamResultFuture streamResult; @@ -172,23 +176,33 @@ public class StreamSession implements IEndpointStateChangeSubscriber /** * Create new streaming session with the peer. - * @param streamOperation - * @param peer Address of streaming peer - * @param connecting Actual connecting address */ - public StreamSession(StreamOperation streamOperation, InetAddressAndPort peer, InetAddressAndPort connecting, StreamConnectionFactory factory, int index, UUID pendingRepair, PreviewKind previewKind) + public StreamSession(StreamOperation streamOperation, InetAddressAndPort peer, StreamConnectionFactory factory, + int index, UUID pendingRepair, PreviewKind previewKind) + { + this(streamOperation, peer, factory, index, pendingRepair, previewKind, MessagingService.instance()::getPreferredRemoteAddr); + } + + @VisibleForTesting + public StreamSession(StreamOperation streamOperation, InetAddressAndPort peer, StreamConnectionFactory factory, + int index, UUID pendingRepair, PreviewKind previewKind, + Function<InetAddressAndPort, InetAddressAndPort> preferredIpMapper) { this.streamOperation = streamOperation; this.peer = peer; - this.connecting = connecting; this.index = index; + InetAddressAndPort preferredPeerEndpoint = preferredIpMapper.apply(peer); + this.preferredPeerInetAddressAndPort = (preferredPeerEndpoint == null) ? 
peer : preferredPeerEndpoint; OutboundConnectionIdentifier id = OutboundConnectionIdentifier.stream(InetAddressAndPort.getByAddressOverrideDefaults(FBUtilities.getJustLocalAddress(), 0), - InetAddressAndPort.getByAddressOverrideDefaults(connecting.address, MessagingService.instance().portFor(connecting))); + preferredPeerInetAddressAndPort); + this.messageSender = new NettyStreamingMessageSender(this, id, factory, StreamMessage.CURRENT_VERSION, previewKind.isPreview()); - this.metrics = StreamingMetrics.get(connecting); + this.metrics = StreamingMetrics.get(preferredPeerInetAddressAndPort); this.pendingRepair = pendingRepair; this.previewKind = previewKind; + + logger.debug("Creating stream session peer={} preferredPeerInetAddressAndPort={}", peer, preferredPeerInetAddressAndPort); } public UUID planId() @@ -267,7 +281,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber { logger.info("[Stream #{}] Starting streaming to {}{}", planId(), peer, - peer.equals(connecting) ? "" : " through " + connecting); + peer.equals(preferredPeerInetAddressAndPort) ? "" : " through " + preferredPeerInetAddressAndPort); messageSender.initialize(); onInitializationComplete(); } @@ -515,7 +529,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber logger.error("[Stream #{}] Did not receive response from peer {}{} for {} secs. Is peer down? " + "If not, maybe try increasing streaming_keep_alive_period_in_secs.", planId(), peer.getHostAddress(true), - peer.equals(connecting) ? "" : " through " + connecting.getHostAddress(true), + peer.equals(preferredPeerInetAddressAndPort) ? "" : " through " + preferredPeerInetAddressAndPort.getHostAddress(true), 2 * DatabaseDescriptor.getStreamingKeepAlivePeriod(), e); } @@ -523,7 +537,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber { logger.error("[Stream #{}] Streaming error occurred on session with peer {}{}", planId(), peer.getHostAddress(true), - peer.equals(connecting) ? 
"" : " through " + connecting.getHostAddress(true), + peer.equals(preferredPeerInetAddressAndPort) ? "" : " through " + preferredPeerInetAddressAndPort.getHostAddress(true), e); } } @@ -677,7 +691,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber List<StreamSummary> transferSummaries = Lists.newArrayList(); for (StreamTask transfer : transfers.values()) transferSummaries.add(transfer.getSummary()); - return new SessionInfo(peer, index, connecting, receivingSummaries, transferSummaries, state); + return new SessionInfo(peer, index, preferredPeerInetAddressAndPort, receivingSummaries, transferSummaries, state); } public synchronized void taskCompleted(StreamReceiveTask completedTask) http://git-wip-us.apache.org/repos/asf/cassandra/blob/63945228/src/java/org/apache/cassandra/streaming/StreamingMessageSender.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamingMessageSender.java b/src/java/org/apache/cassandra/streaming/StreamingMessageSender.java index 9562981..accf554 100644 --- a/src/java/org/apache/cassandra/streaming/StreamingMessageSender.java +++ b/src/java/org/apache/cassandra/streaming/StreamingMessageSender.java @@ -20,6 +20,7 @@ package org.apache.cassandra.streaming; import java.io.IOException; +import org.apache.cassandra.net.async.OutboundConnectionIdentifier; import org.apache.cassandra.streaming.messages.StreamMessage; public interface StreamingMessageSender @@ -28,6 +29,8 @@ public interface StreamingMessageSender void sendMessage(StreamMessage message) throws IOException; + OutboundConnectionIdentifier getConnectionId(); + boolean connected(); void close(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/63945228/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java b/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java index bbc451d..1bcb013 100644 --- a/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java +++ b/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java @@ -135,7 +135,7 @@ public class NettyStreamingMessageSender implements StreamingMessageSender } @Override - public void initialize() throws IOException + public void initialize() { StreamInitMessage message = new StreamInitMessage(FBUtilities.getBroadcastAddressAndPort(), session.sessionIndex(), @@ -184,6 +184,7 @@ public class NettyStreamingMessageSender implements StreamingMessageSender ChannelPipeline pipeline = channel.pipeline(); pipeline.addLast(NettyFactory.instance.streamingGroup, NettyFactory.INBOUND_STREAM_HANDLER_NAME, new StreamingInboundHandler(connectionId.remote(), protocolVersion, session)); channel.attr(TRANSFERRING_FILE_ATTR).set(Boolean.FALSE); + logger.debug("Creating channel id {} local {} remote {}", channel.id(), channel.localAddress(), channel.remoteAddress()); return channel; } @@ -495,7 +496,8 @@ public class NettyStreamingMessageSender implements StreamingMessageSender { closed = true; logger.debug("{} Closing stream connection channels on {}", createLogTag(session, null), connectionId); - channelKeepAlives.stream().map(scheduledFuture -> scheduledFuture.cancel(false)); + for (ScheduledFuture<?> future : channelKeepAlives) + future.cancel(false); channelKeepAlives.clear(); List<Future<Void>> futures = new ArrayList<>(threadToChannelMap.size()); @@ -508,4 +510,10 @@ public class NettyStreamingMessageSender implements StreamingMessageSender if (controlMessageChannel != null) controlMessageChannel.close(); } + + @Override + public OutboundConnectionIdentifier getConnectionId() + { + return connectionId; + } } 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/63945228/test/unit/org/apache/cassandra/db/streaming/CassandraStreamManagerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/streaming/CassandraStreamManagerTest.java b/test/unit/org/apache/cassandra/db/streaming/CassandraStreamManagerTest.java index 8497e71..80e4bfb 100644 --- a/test/unit/org/apache/cassandra/db/streaming/CassandraStreamManagerTest.java +++ b/test/unit/org/apache/cassandra/db/streaming/CassandraStreamManagerTest.java @@ -96,7 +96,6 @@ public class CassandraStreamManagerTest { return new StreamSession(StreamOperation.REPAIR, InetAddressAndPort.getByName("127.0.0.1"), - InetAddressAndPort.getByName("127.0.0.2"), connectionFactory, 0, pendingRepair, http://git-wip-us.apache.org/repos/asf/cassandra/blob/63945228/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java b/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java index 8f0a407..bf71c09 100644 --- a/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java +++ b/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java @@ -52,7 +52,7 @@ public class StreamStateStoreTest Range<Token> range = new Range<>(factory.fromString("0"), factory.fromString("100")); InetAddressAndPort local = FBUtilities.getBroadcastAddressAndPort(); - StreamSession session = new StreamSession(StreamOperation.BOOTSTRAP, local, local, new DefaultConnectionFactory(), 0, null, PreviewKind.NONE); + StreamSession session = new StreamSession(StreamOperation.BOOTSTRAP, local, new DefaultConnectionFactory(), 0, null, PreviewKind.NONE); session.addStreamRequest("keyspace1", Collections.singleton(range), Collections.singleton("cf")); StreamStateStore store = new StreamStateStore(); @@ -73,7 +73,7 @@ public class StreamStateStoreTest // add 
different range within the same keyspace Range<Token> range2 = new Range<>(factory.fromString("100"), factory.fromString("200")); - session = new StreamSession(StreamOperation.BOOTSTRAP, local, local, new DefaultConnectionFactory(), 0, null, PreviewKind.NONE); + session = new StreamSession(StreamOperation.BOOTSTRAP, local, new DefaultConnectionFactory(), 0, null, PreviewKind.NONE); session.addStreamRequest("keyspace1", Collections.singleton(range2), Collections.singleton("cf")); session.state(StreamSession.State.COMPLETE); store.handleStreamEvent(new StreamEvent.SessionCompleteEvent(session)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/63945228/test/unit/org/apache/cassandra/net/MessagingServiceTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/net/MessagingServiceTest.java b/test/unit/org/apache/cassandra/net/MessagingServiceTest.java index c3ebe32..b56cd62 100644 --- a/test/unit/org/apache/cassandra/net/MessagingServiceTest.java +++ b/test/unit/org/apache/cassandra/net/MessagingServiceTest.java @@ -309,7 +309,7 @@ public class MessagingServiceTest private static void addDCLatency(long sentAt, long nowTime) throws IOException { - MessageIn.deriveConstructionTime(InetAddressAndPort.getLocalHost(), (int)sentAt, nowTime); + MessageIn.deriveConstructionTime(InetAddressAndPort.getLocalHost(), (int) sentAt, nowTime); } public static class MockBackPressureStrategy implements BackPressureStrategy<MockBackPressureStrategy.MockBackPressureState> @@ -422,6 +422,7 @@ public class MessagingServiceTest /** * Make sure that if internode authenticatino fails for an outbound connection that all the code that relies * on getting the connection pool handles the null return + * * @throws Exception */ @Test @@ -660,4 +661,52 @@ public class MessagingServiceTest Assert.assertEquals(0, messagingService.serverChannels.size()); } } + + + @Test + public void getPreferredRemoteAddrUsesPrivateIp() throws 
UnknownHostException + { + MessagingService ms = MessagingService.instance(); + InetAddressAndPort local = InetAddressAndPort.getByNameOverrideDefaults("127.0.0.4", 7000); + InetAddressAndPort remote = InetAddressAndPort.getByNameOverrideDefaults("127.0.0.151", 7000); + InetAddressAndPort privateIp = InetAddressAndPort.getByName("127.0.0.6"); + + OutboundMessagingPool pool = new OutboundMessagingPool(privateIp, local, null, + new MockBackPressureStrategy(null).newState(remote), + ALLOW_NOTHING_AUTHENTICATOR); + ms.channelManagers.put(remote, pool); + + Assert.assertEquals(privateIp, ms.getPreferredRemoteAddr(remote)); + } + + @Test + public void getPreferredRemoteAddrUsesPreferredIp() throws UnknownHostException + { + MessagingService ms = MessagingService.instance(); + InetAddressAndPort remote = InetAddressAndPort.getByNameOverrideDefaults("127.0.0.115", 7000); + + InetAddressAndPort preferredIp = InetAddressAndPort.getByName("127.0.0.16"); + SystemKeyspace.updatePreferredIP(remote, preferredIp); + + Assert.assertEquals(preferredIp, ms.getPreferredRemoteAddr(remote)); + } + + @Test + public void getPreferredRemoteAddrUsesPrivateIpOverridesPreferredIp() throws UnknownHostException + { + MessagingService ms = MessagingService.instance(); + InetAddressAndPort local = InetAddressAndPort.getByNameOverrideDefaults("127.0.0.4", 7000); + InetAddressAndPort remote = InetAddressAndPort.getByNameOverrideDefaults("127.0.0.105", 7000); + InetAddressAndPort privateIp = InetAddressAndPort.getByName("127.0.0.6"); + + OutboundMessagingPool pool = new OutboundMessagingPool(privateIp, local, null, + new MockBackPressureStrategy(null).newState(remote), + ALLOW_NOTHING_AUTHENTICATOR); + ms.channelManagers.put(remote, pool); + + InetAddressAndPort preferredIp = InetAddressAndPort.getByName("127.0.0.16"); + SystemKeyspace.updatePreferredIP(remote, preferredIp); + + Assert.assertEquals(privateIp, ms.getPreferredRemoteAddr(remote)); + } } 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/63945228/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java index 95046bd..802a673 100644 --- a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java +++ b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java @@ -61,7 +61,7 @@ public class LocalSyncTaskTest extends AbstractRepairTest public static ColumnFamilyStore cfs; @BeforeClass - public static void defineSchema() throws Exception + public static void defineSchema() { SchemaLoader.prepareServer(); SchemaLoader.createKeyspace(KEYSPACE1, @@ -148,7 +148,7 @@ public class LocalSyncTaskTest extends AbstractRepairTest TreeResponse r2 = new TreeResponse(PARTICIPANT2, createInitialTree(desc, DatabaseDescriptor.getPartitioner())); LocalSyncTask task = new LocalSyncTask(desc, r1, r2, NO_PENDING_REPAIR, false, PreviewKind.NONE); - StreamPlan plan = task.createStreamPlan(PARTICIPANT1, PARTICIPANT2, Lists.newArrayList(RANGE1)); + StreamPlan plan = task.createStreamPlan(PARTICIPANT1, Lists.newArrayList(RANGE1)); assertEquals(NO_PENDING_REPAIR, plan.getPendingRepair()); assertTrue(plan.getFlushBeforeTransfer()); @@ -165,7 +165,7 @@ public class LocalSyncTaskTest extends AbstractRepairTest TreeResponse r2 = new TreeResponse(PARTICIPANT2, createInitialTree(desc, DatabaseDescriptor.getPartitioner())); LocalSyncTask task = new LocalSyncTask(desc, r1, r2, desc.parentSessionId, false, PreviewKind.NONE); - StreamPlan plan = task.createStreamPlan(PARTICIPANT1, PARTICIPANT2, Lists.newArrayList(RANGE1)); + StreamPlan plan = task.createStreamPlan(PARTICIPANT1, Lists.newArrayList(RANGE1)); assertEquals(desc.parentSessionId, plan.getPendingRepair()); assertFalse(plan.getFlushBeforeTransfer()); 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/63945228/test/unit/org/apache/cassandra/repair/StreamingRepairTaskTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/StreamingRepairTaskTest.java b/test/unit/org/apache/cassandra/repair/StreamingRepairTaskTest.java index f0ff0e0..b845e93 100644 --- a/test/unit/org/apache/cassandra/repair/StreamingRepairTaskTest.java +++ b/test/unit/org/apache/cassandra/repair/StreamingRepairTaskTest.java @@ -60,7 +60,7 @@ public class StreamingRepairTaskTest extends AbstractRepairTest } @Test - public void incrementalStreamPlan() throws Exception + public void incrementalStreamPlan() { UUID sessionID = registerSession(cfs, true, true); ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(sessionID); @@ -69,7 +69,7 @@ public class StreamingRepairTaskTest extends AbstractRepairTest SyncRequest request = new SyncRequest(desc, PARTICIPANT1, PARTICIPANT2, PARTICIPANT3, prs.getRanges(), PreviewKind.NONE); StreamingRepairTask task = new StreamingRepairTask(desc, request.initiator, request.src, request.dst, request.ranges, desc.sessionId, PreviewKind.NONE, false); - StreamPlan plan = task.createStreamPlan(request.src, request.dst); + StreamPlan plan = task.createStreamPlan(request.dst); Assert.assertFalse(plan.getFlushBeforeTransfer()); } @@ -82,7 +82,7 @@ public class StreamingRepairTaskTest extends AbstractRepairTest SyncRequest request = new SyncRequest(desc, PARTICIPANT1, PARTICIPANT2, PARTICIPANT3, prs.getRanges(), PreviewKind.NONE); StreamingRepairTask task = new StreamingRepairTask(desc, request.initiator, request.src, request.dst, request.ranges, null, PreviewKind.NONE, false); - StreamPlan plan = task.createStreamPlan(request.src, request.dst); + StreamPlan plan = task.createStreamPlan(request.dst); Assert.assertTrue(plan.getFlushBeforeTransfer()); } } 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/63945228/test/unit/org/apache/cassandra/streaming/StreamSessionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/streaming/StreamSessionTest.java b/test/unit/org/apache/cassandra/streaming/StreamSessionTest.java new file mode 100644 index 0000000..7ea09ea --- /dev/null +++ b/test/unit/org/apache/cassandra/streaming/StreamSessionTest.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.streaming; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.Collections; +import java.util.UUID; + +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.net.MessagingServiceTest; + +import static org.junit.Assert.assertEquals; + +public class StreamSessionTest +{ + @BeforeClass + public static void beforeClass() throws UnknownHostException + { + DatabaseDescriptor.daemonInitialization(); + DatabaseDescriptor.setBackPressureStrategy(new MessagingServiceTest.MockBackPressureStrategy(Collections.emptyMap())); + DatabaseDescriptor.setBroadcastAddress(InetAddress.getByName("127.0.0.3")); + } + + @Test + public void testStreamSessionUsesCorrectRemoteIp_Succeeds() throws UnknownHostException + { + InetAddressAndPort localAddr = InetAddressAndPort.getByName("127.0.0.1:7000"); + InetAddressAndPort preferredAddr = InetAddressAndPort.getByName("127.0.0.2:7000"); + StreamSession streamSession = new StreamSession(StreamOperation.BOOTSTRAP, localAddr, + new DefaultConnectionFactory(), 0, UUID.randomUUID(), PreviewKind.ALL, + inetAddressAndPort -> preferredAddr); + + assertEquals(preferredAddr, streamSession.getMessageSender().getConnectionId().connectionAddress()); + } + + @Test + public void testStreamSessionUsesCorrectRemoteIpNullMapper_Succeeds() throws UnknownHostException + { + InetAddressAndPort localAddr = InetAddressAndPort.getByName("127.0.0.1:7000"); + + StreamSession streamSession = new StreamSession(StreamOperation.BOOTSTRAP, localAddr, + new DefaultConnectionFactory(), 0, UUID.randomUUID(), PreviewKind.ALL, (peer) -> null); + + assertEquals(localAddr, streamSession.getMessageSender().getConnectionId().connectionAddress()); + } +} 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/63945228/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java index 45c917a..63dcfba 100644 --- a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java +++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java @@ -76,7 +76,7 @@ public class StreamTransferTaskTest public void testScheduleTimeout() throws Exception { InetAddressAndPort peer = FBUtilities.getBroadcastAddressAndPort(); - StreamSession session = new StreamSession(StreamOperation.BOOTSTRAP, peer, peer, (connectionId, protocolVersion) -> new EmbeddedChannel(), 0, UUID.randomUUID(), PreviewKind.ALL); + StreamSession session = new StreamSession(StreamOperation.BOOTSTRAP, peer, (connectionId, protocolVersion) -> new EmbeddedChannel(), 0, UUID.randomUUID(), PreviewKind.ALL); ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD); // create two sstables @@ -124,7 +124,7 @@ public class StreamTransferTaskTest InetAddressAndPort peer = FBUtilities.getBroadcastAddressAndPort(); StreamCoordinator streamCoordinator = new StreamCoordinator(StreamOperation.BOOTSTRAP, 1, new DefaultConnectionFactory(), false, null, PreviewKind.NONE); StreamResultFuture future = StreamResultFuture.init(UUID.randomUUID(), StreamOperation.OTHER, Collections.<StreamEventHandler>emptyList(), streamCoordinator); - StreamSession session = new StreamSession(StreamOperation.BOOTSTRAP, peer, peer, null, 0, null, PreviewKind.NONE); + StreamSession session = new StreamSession(StreamOperation.BOOTSTRAP, peer, null, 0, null, PreviewKind.NONE); session.init(future); ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD); 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/63945228/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java index 575200a..2143903 100644 --- a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java +++ b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java @@ -144,7 +144,7 @@ public class StreamingTransferTest ranges.add(new Range<>(p.getToken(ByteBufferUtil.bytes("key2")), p.getMinimumToken())); StreamResultFuture futureResult = new StreamPlan(StreamOperation.OTHER) - .requestRanges(LOCAL, LOCAL, KEYSPACE2, ranges) + .requestRanges(LOCAL, KEYSPACE2, ranges) .execute(); UUID planId = futureResult.planId; http://git-wip-us.apache.org/repos/asf/cassandra/blob/63945228/test/unit/org/apache/cassandra/streaming/async/NettyStreamingMessageSenderTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/streaming/async/NettyStreamingMessageSenderTest.java b/test/unit/org/apache/cassandra/streaming/async/NettyStreamingMessageSenderTest.java index fd22a65..52f097a 100644 --- a/test/unit/org/apache/cassandra/streaming/async/NettyStreamingMessageSenderTest.java +++ b/test/unit/org/apache/cassandra/streaming/async/NettyStreamingMessageSenderTest.java @@ -63,7 +63,7 @@ public class NettyStreamingMessageSenderTest channel = new EmbeddedChannel(); channel.attr(NettyStreamingMessageSender.TRANSFERRING_FILE_ATTR).set(Boolean.FALSE); UUID pendingRepair = UUID.randomUUID(); - session = new StreamSession(StreamOperation.BOOTSTRAP, REMOTE_ADDR, REMOTE_ADDR, (connectionId, protocolVersion) -> null, 0, pendingRepair, PreviewKind.ALL); + session = new StreamSession(StreamOperation.BOOTSTRAP, REMOTE_ADDR, (connectionId, protocolVersion) -> null, 0, 
pendingRepair, PreviewKind.ALL); StreamResultFuture future = StreamResultFuture.initReceivingSide(0, UUID.randomUUID(), StreamOperation.REPAIR, REMOTE_ADDR, channel, pendingRepair, session.getPreviewKind()); session.init(future); sender = session.getMessageSender(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/63945228/test/unit/org/apache/cassandra/streaming/async/StreamingInboundHandlerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/streaming/async/StreamingInboundHandlerTest.java b/test/unit/org/apache/cassandra/streaming/async/StreamingInboundHandlerTest.java index 78a7879..0a13596 100644 --- a/test/unit/org/apache/cassandra/streaming/async/StreamingInboundHandlerTest.java +++ b/test/unit/org/apache/cassandra/streaming/async/StreamingInboundHandlerTest.java @@ -132,7 +132,7 @@ public class StreamingInboundHandlerTest private StreamSession createSession(SessionIdentifier sid) { - return new StreamSession(StreamOperation.BOOTSTRAP, sid.from, sid.from, (connectionId, protocolVersion) -> null, sid.sessionIndex, UUID.randomUUID(), PreviewKind.ALL); + return new StreamSession(StreamOperation.BOOTSTRAP, sid.from, (connectionId, protocolVersion) -> null, sid.sessionIndex, UUID.randomUUID(), PreviewKind.ALL); } @Test (expected = IllegalStateException.class) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org