Merge branch 'cassandra-2.0' into cassandra-2.1

Conflicts:
    CHANGES.txt
    src/java/org/apache/cassandra/dht/RangeStreamer.java
    src/java/org/apache/cassandra/repair/StreamingRepairTask.java
    src/java/org/apache/cassandra/service/StorageService.java
    src/java/org/apache/cassandra/streaming/SessionInfo.java
    src/java/org/apache/cassandra/streaming/StreamPlan.java
    src/java/org/apache/cassandra/streaming/StreamResultFuture.java
    src/java/org/apache/cassandra/streaming/StreamSession.java
    src/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java
    src/java/org/apache/cassandra/tools/NodeCmd.java
    test/unit/org/apache/cassandra/streaming/SessionInfoTest.java
    test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
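The change merged here (CASSANDRA-8084, "Stream to private IP when available") threads a second InetAddress, the preferred/connecting address stored in system.peers, through the streaming call chain. The following is a minimal sketch, not part of the commit, of the calling pattern the diff below introduces: the class name PreferredIpStreamingSketch, the method fetchRanges, and the plan description "Sketch" are invented for illustration, while SystemKeyspace.getPreferredIP, StreamPlan.requestRanges(from, connecting, ...) and StreamPlan.execute() are the APIs added or changed in this diff.

    import java.net.InetAddress;
    import java.util.Collection;

    import org.apache.cassandra.db.SystemKeyspace;
    import org.apache.cassandra.dht.Range;
    import org.apache.cassandra.dht.Token;
    import org.apache.cassandra.streaming.StreamPlan;
    import org.apache.cassandra.streaming.StreamResultFuture;

    // Hypothetical helper, for illustration only; not part of this commit.
    public final class PreferredIpStreamingSketch
    {
        public static StreamResultFuture fetchRanges(InetAddress source, String keyspace, Collection<Range<Token>> ranges)
        {
            // After this merge, getPreferredIP falls back to the given endpoint
            // instead of returning null when no preferred_ip row exists in system.peers.
            InetAddress preferred = SystemKeyspace.getPreferredIP(source);

            // Callers now pass the connecting (preferred/private) address alongside
            // the peer's broadcast address when building the stream plan.
            return new StreamPlan("Sketch")   // description string is arbitrary in this sketch
                   .requestRanges(source, preferred, keyspace, ranges)
                   .execute();
        }
    }

transferRanges(to, connecting, keyspace, ranges, ...) follows the same pattern, and the connecting address is carried into StreamSession and SessionInfo so that the management layer (SessionInfoCompositeData and nodetool) can expose which address a session is actually connected through.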
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6cca24f4 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6cca24f4 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6cca24f4 Branch: refs/heads/trunk Commit: 6cca24f442a17d2abedfffd4a5322cb61caeef74 Parents: 42f8590 c6867c2 Author: Yuki Morishita <yu...@apache.org> Authored: Mon Oct 20 10:21:55 2014 -0500 Committer: Yuki Morishita <yu...@apache.org> Committed: Mon Oct 20 10:21:55 2014 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/db/SystemKeyspace.java | 8 ++++- .../org/apache/cassandra/dht/RangeStreamer.java | 4 ++- .../net/OutboundTcpConnectionPool.java | 12 ++++---- .../cassandra/repair/StreamingRepairTask.java | 10 +++++-- .../cassandra/service/StorageService.java | 17 +++++++---- .../apache/cassandra/streaming/SessionInfo.java | 3 ++ .../cassandra/streaming/StreamCoordinator.java | 20 ++++++------- .../apache/cassandra/streaming/StreamPlan.java | 31 ++++++++++++++------ .../cassandra/streaming/StreamResultFuture.java | 2 +- .../cassandra/streaming/StreamSession.java | 24 +++++++++++---- .../management/SessionInfoCompositeData.java | 30 +++++++++++-------- .../org/apache/cassandra/tools/NodeTool.java | 8 ++++- .../cassandra/streaming/SessionInfoTest.java | 2 +- .../streaming/StreamTransferTaskTest.java | 4 ++- .../streaming/StreamingTransferTest.java | 2 +- 16 files changed, 119 insertions(+), 59 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/6cca24f4/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index 7cd5154,4ed7bed..4e5cd24 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,89 -1,4 +1,90 @@@ -2.0.11: +2.1.1 + * Fix IllegalArgumentException when a list of IN values containing tuples + is passed as a single arg to a prepared statement with the v1 or v2 + protocol (CASSANDRA-8062) + * Fix ClassCastException in DISTINCT query on static columns with + query paging (CASSANDRA-8108) + * Fix NPE on null nested UDT inside a set (CASSANDRA-8105) + * Fix exception when querying secondary index on set items or map keys + when some clustering columns are specified (CASSANDRA-8073) + * Send proper error response when there is an error during native + protocol message decode (CASSANDRA-8118) + * Gossip should ignore generation numbers too far in the future (CASSANDRA-8113) + * Fix NPE when creating a table with frozen sets, lists (CASSANDRA-8104) + * Fix high memory use due to tracking reads on incrementally opened sstable + readers (CASSANDRA-8066) + * Fix EXECUTE request with skipMetadata=false returning no metadata + (CASSANDRA-8054) + * Allow concurrent use of CQLBulkOutputFormat (CASSANDRA-7776) + * Shutdown JVM on OOM (CASSANDRA-7507) + * Upgrade netty version and enable epoll event loop (CASSANDRA-7761) + * Don't duplicate sstables smaller than split size when using + the sstablesplitter tool (CASSANDRA-7616) + * Avoid re-parsing already prepared statements (CASSANDRA-7923) + * Fix some Thrift slice deletions and updates of COMPACT STORAGE + tables with some clustering columns omitted (CASSANDRA-7990) + * Fix filtering for CONTAINS on sets (CASSANDRA-8033) + * Properly track added size (CASSANDRA-7239) + * Allow compilation in java 8 (CASSANDRA-7208) + * Fix Assertion error on RangeTombstoneList diff (CASSANDRA-8013) + * Release 
references to overlapping sstables during compaction (CASSANDRA-7819) + * Send notification when opening compaction results early (CASSANDRA-8034) + * Make native server start block until properly bound (CASSANDRA-7885) + * (cqlsh) Fix IPv6 support (CASSANDRA-7988) + * Ignore fat clients when checking for endpoint collision (CASSANDRA-7939) + * Make sstablerepairedset take a list of files (CASSANDRA-7995) + * (cqlsh) Tab completeion for indexes on map keys (CASSANDRA-7972) + * (cqlsh) Fix UDT field selection in select clause (CASSANDRA-7891) + * Fix resource leak in event of corrupt sstable + * (cqlsh) Add command line option for cqlshrc file path (CASSANDRA-7131) + * Provide visibility into prepared statements churn (CASSANDRA-7921, CASSANDRA-7930) + * Invalidate prepared statements when their keyspace or table is + dropped (CASSANDRA-7566) + * cassandra-stress: fix support for NetworkTopologyStrategy (CASSANDRA-7945) + * Fix saving caches when a table is dropped (CASSANDRA-7784) + * Add better error checking of new stress profile (CASSANDRA-7716) + * Use ThreadLocalRandom and remove FBUtilities.threadLocalRandom (CASSANDRA-7934) + * Prevent operator mistakes due to simultaneous bootstrap (CASSANDRA-7069) + * cassandra-stress supports whitelist mode for node config (CASSANDRA-7658) + * GCInspector more closely tracks GC; cassandra-stress and nodetool report it (CASSANDRA-7916) + * nodetool won't output bogus ownership info without a keyspace (CASSANDRA-7173) + * Add human readable option to nodetool commands (CASSANDRA-5433) + * Don't try to set repairedAt on old sstables (CASSANDRA-7913) + * Add metrics for tracking PreparedStatement use (CASSANDRA-7719) + * (cqlsh) tab-completion for triggers (CASSANDRA-7824) + * (cqlsh) Support for query paging (CASSANDRA-7514) + * (cqlsh) Show progress of COPY operations (CASSANDRA-7789) + * Add syntax to remove multiple elements from a map (CASSANDRA-6599) + * Support non-equals conditions in lightweight transactions (CASSANDRA-6839) + * Add IF [NOT] EXISTS to create/drop triggers (CASSANDRA-7606) + * (cqlsh) Display the current logged-in user (CASSANDRA-7785) + * (cqlsh) Don't ignore CTRL-C during COPY FROM execution (CASSANDRA-7815) + * (cqlsh) Order UDTs according to cross-type dependencies in DESCRIBE + output (CASSANDRA-7659) + * (cqlsh) Fix handling of CAS statement results (CASSANDRA-7671) + * (cqlsh) COPY TO/FROM improvements (CASSANDRA-7405) + * Support list index operations with conditions (CASSANDRA-7499) + * Add max live/tombstoned cells to nodetool cfstats output (CASSANDRA-7731) + * Validate IPv6 wildcard addresses properly (CASSANDRA-7680) + * (cqlsh) Error when tracing query (CASSANDRA-7613) + * Avoid IOOBE when building SyntaxError message snippet (CASSANDRA-7569) + * SSTableExport uses correct validator to create string representation of partition + keys (CASSANDRA-7498) + * Avoid NPEs when receiving type changes for an unknown keyspace (CASSANDRA-7689) + * Add support for custom 2i validation (CASSANDRA-7575) + * Pig support for hadoop CqlInputFormat (CASSANDRA-6454) + * Add listen_interface and rpc_interface options (CASSANDRA-7417) + * Improve schema merge performance (CASSANDRA-7444) + * Adjust MT depth based on # of partition validating (CASSANDRA-5263) + * Optimise NativeCell comparisons (CASSANDRA-6755) + * Configurable client timeout for cqlsh (CASSANDRA-7516) + * Include snippet of CQL query near syntax error in messages (CASSANDRA-7111) + * Make repair -pr work with -local (CASSANDRA-7450) + * Fix error in sstableloader 
with -cph > 1 (CASSANDRA-8007) + * Fix snapshot repair error on indexed tables (CASSANDRA-8020) + * Do not exit nodetool repair when receiving JMX NOTIF_LOST (CASSANDRA-7909) ++ * Stream to private IP when available (CASSANDRA-8084) +Merged from 2.0: * Reject conditions on DELETE unless full PK is given (CASSANDRA-6430) * Properly reject the token function DELETE (CASSANDRA-7747) * Force batchlog replay before decommissioning a node (CASSANDRA-7446) http://git-wip-us.apache.org/repos/asf/cassandra/blob/6cca24f4/src/java/org/apache/cassandra/db/SystemKeyspace.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/SystemKeyspace.java index 3c647b6,30e6d47..8b16564 --- a/src/java/org/apache/cassandra/db/SystemKeyspace.java +++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java @@@ -508,13 -497,19 +508,19 @@@ public class SystemKeyspac return hostIdMap; } + /** + * Get preferred IP for given endpoint if it is known. Otherwise this returns given endpoint itself. + * + * @param ep endpoint address to check + * @return Preferred IP for given endpoint if present, otherwise returns given ep + */ public static InetAddress getPreferredIP(InetAddress ep) { - String req = "SELECT preferred_ip FROM system.%s WHERE peer='%s'"; - UntypedResultSet result = processInternal(String.format(req, PEERS_CF, ep.getHostAddress())); + String req = "SELECT preferred_ip FROM system.%s WHERE peer=?"; + UntypedResultSet result = executeInternal(String.format(req, PEERS_CF), ep); if (!result.isEmpty() && result.one().has("preferred_ip")) return result.one().getInetAddress("preferred_ip"); - return null; + return ep; } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/6cca24f4/src/java/org/apache/cassandra/dht/RangeStreamer.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/dht/RangeStreamer.java index d84a951,4e925d3..11d82d6 --- a/src/java/org/apache/cassandra/dht/RangeStreamer.java +++ b/src/java/org/apache/cassandra/dht/RangeStreamer.java @@@ -31,8 -29,8 +31,9 @@@ import org.slf4j.LoggerFactory import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.Keyspace; + import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.gms.FailureDetector; +import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.gms.IFailureDetector; import org.apache.cassandra.locator.AbstractReplicationStrategy; import org.apache.cassandra.locator.IEndpointSnitch; @@@ -308,8 -226,8 +310,8 @@@ public class RangeStreame Collection<Range<Token>> ranges = entry.getValue().getValue(); /* Send messages to respective folks to stream data over to me */ if (logger.isDebugEnabled()) - logger.debug("" + description + "ing from " + source + " ranges " + StringUtils.join(ranges, ", ")); + logger.debug("{}ing from {} ranges {}", description, source, StringUtils.join(ranges, ", ")); - streamPlan.requestRanges(source, keyspace, ranges); + streamPlan.requestRanges(source, preferred, keyspace, ranges); } return streamPlan.execute(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/6cca24f4/src/java/org/apache/cassandra/repair/StreamingRepairTask.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/repair/StreamingRepairTask.java index 9af949d,4226184..25ec698 --- a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java +++ b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java @@@ -57,17 
-59,15 +60,18 @@@ public class StreamingRepairTask implem private void initiateStreaming() { + long repairedAt = ActiveRepairService.UNREPAIRED_SSTABLE; + InetAddress dest = request.dst; + InetAddress preferred = SystemKeyspace.getPreferredIP(dest); + if (desc.parentSessionId != null && ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId) != null) + repairedAt = ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId).repairedAt; - logger.info(String.format("[streaming task #%s] Performing streaming repair of %d ranges with %s", desc.sessionId, request.ranges.size(), request.dst)); - StreamResultFuture op = new StreamPlan("Repair") + StreamResultFuture op = new StreamPlan("Repair", repairedAt, 1) .flushBeforeTransfer(true) // request ranges from the remote node - .requestRanges(request.dst, desc.keyspace, request.ranges, desc.columnFamily) + .requestRanges(dest, preferred, desc.keyspace, request.ranges, desc.columnFamily) // send ranges to the remote node - .transferRanges(request.dst, desc.keyspace, request.ranges, desc.columnFamily) + .transferRanges(dest, preferred, desc.keyspace, request.ranges, desc.columnFamily) .execute(); op.addEventListener(this); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/6cca24f4/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/StorageService.java index 4fb0435,4973e40..849bc2c --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@@ -1939,10 -1891,11 +1939,11 @@@ public class StorageService extends Not for (Map.Entry<InetAddress, Collection<Range<Token>>> entry : rangesToFetch.get(keyspaceName)) { InetAddress source = entry.getKey(); + InetAddress preferred = SystemKeyspace.getPreferredIP(source); Collection<Range<Token>> ranges = entry.getValue(); if (logger.isDebugEnabled()) - logger.debug("Requesting from " + source + " ranges " + StringUtils.join(ranges, ", ")); + logger.debug("Requesting from {} ranges {}", source, StringUtils.join(ranges, ", ")); - stream.requestRanges(source, keyspaceName, ranges); + stream.requestRanges(source, preferred, keyspaceName, ranges); } } StreamResultFuture future = stream.execute(); @@@ -3173,9 -3027,10 +3175,10 @@@ // stream all hints -- range list will be a singleton of "the entire ring" Token token = StorageService.getPartitioner().getMinimumToken(); - List<Range<Token>> ranges = Collections.singletonList(new Range<Token>(token, token)); + List<Range<Token>> ranges = Collections.singletonList(new Range<>(token, token)); return new StreamPlan("Hints").transferRanges(hintsDestinationHost, + preferred, Keyspace.SYSTEM_KS, ranges, SystemKeyspace.HINTS_CF) @@@ -3375,20 -3186,19 +3378,21 @@@ // stream ranges for (InetAddress address : endpointRanges.keySet()) { + logger.debug("Will stream range {} of keyspace {} to endpoint {}", endpointRanges.get(address), keyspace, address); - streamPlan.transferRanges(address, keyspace, endpointRanges.get(address)); + InetAddress preferred = SystemKeyspace.getPreferredIP(address); + streamPlan.transferRanges(address, preferred, keyspace, endpointRanges.get(address)); } // stream requests Multimap<InetAddress, Range<Token>> workMap = RangeStreamer.getWorkMap(rangesToFetchWithPreferredEndpoints); for (InetAddress address : workMap.keySet()) { + logger.debug("Will request range {} of keyspace {} from endpoint {}", workMap.get(address), keyspace, 
address); - streamPlan.requestRanges(address, keyspace, workMap.get(address)); + InetAddress preferred = SystemKeyspace.getPreferredIP(address); + streamPlan.requestRanges(address, preferred, keyspace, workMap.get(address)); } - if (logger.isDebugEnabled()) - logger.debug("Keyspace {}: work map {}.", keyspace, workMap); + logger.debug("Keyspace {}: work map {}.", keyspace, workMap); } } } @@@ -3863,16 -3649,17 +3867,17 @@@ StreamPlan streamPlan = new StreamPlan("Unbootstrap"); for (Map.Entry<String, Map<InetAddress, List<Range<Token>>>> entry : sessionsToStreamByKeyspace.entrySet()) { - final String keyspaceName = entry.getKey(); - final Map<InetAddress, List<Range<Token>>> rangesPerEndpoint = entry.getValue(); + String keyspaceName = entry.getKey(); + Map<InetAddress, List<Range<Token>>> rangesPerEndpoint = entry.getValue(); - for (final Map.Entry<InetAddress, List<Range<Token>>> rangesEntry : rangesPerEndpoint.entrySet()) + for (Map.Entry<InetAddress, List<Range<Token>>> rangesEntry : rangesPerEndpoint.entrySet()) { - final List<Range<Token>> ranges = rangesEntry.getValue(); - final InetAddress newEndpoint = rangesEntry.getKey(); - final InetAddress preferred = SystemKeyspace.getPreferredIP(newEndpoint); + List<Range<Token>> ranges = rangesEntry.getValue(); + InetAddress newEndpoint = rangesEntry.getKey(); ++ InetAddress preferred = SystemKeyspace.getPreferredIP(newEndpoint); // TODO each call to transferRanges re-flushes, this is potentially a lot of waste - streamPlan.transferRanges(newEndpoint, keyspaceName, ranges); + streamPlan.transferRanges(newEndpoint, preferred, keyspaceName, ranges); } } return streamPlan.execute(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/6cca24f4/src/java/org/apache/cassandra/streaming/SessionInfo.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/streaming/SessionInfo.java index 98e945b,4f80461..3bcb20c --- a/src/java/org/apache/cassandra/streaming/SessionInfo.java +++ b/src/java/org/apache/cassandra/streaming/SessionInfo.java @@@ -33,7 -33,7 +33,8 @@@ import com.google.common.collect.Iterab public final class SessionInfo implements Serializable { public final InetAddress peer; + public final int sessionIndex; + public final InetAddress connecting; /** Immutable collection of receiving summaries */ public final Collection<StreamSummary> receivingSummaries; /** Immutable collection of sending summaries*/ @@@ -45,13 -45,13 +46,15 @@@ private final Map<String, ProgressInfo> sendingFiles; public SessionInfo(InetAddress peer, + int sessionIndex, + InetAddress connecting, Collection<StreamSummary> receivingSummaries, Collection<StreamSummary> sendingSummaries, StreamSession.State state) { this.peer = peer; + this.sessionIndex = sessionIndex; + this.connecting = connecting; this.receivingSummaries = ImmutableSet.copyOf(receivingSummaries); this.sendingSummaries = ImmutableSet.copyOf(sendingSummaries); this.receivingFiles = new ConcurrentHashMap<>(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/6cca24f4/src/java/org/apache/cassandra/streaming/StreamCoordinator.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/streaming/StreamCoordinator.java index 48192b4,0000000..a0c99fe mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java +++ b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java @@@ -1,289 -1,0 +1,289 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) 
under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.streaming; + +import java.net.InetAddress; +import java.util.*; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor; +import org.apache.cassandra.utils.FBUtilities; + +/** + * {@link StreamCoordinator} is a helper class that abstracts away maintaining multiple + * StreamSession and ProgressInfo instances per peer. + * + * This class coordinates multiple SessionStreams per peer in both the outgoing StreamPlan context and on the + * inbound StreamResultFuture context. + */ +public class StreamCoordinator +{ + private static final Logger logger = LoggerFactory.getLogger(StreamCoordinator.class); + + // Executor strictly for establishing the initial connections. Once we're connected to the other end the rest of the + // streaming is handled directly by the ConnectionHandler's incoming and outgoing threads. + private static final DebuggableThreadPoolExecutor streamExecutor = DebuggableThreadPoolExecutor.createWithFixedPoolSize("StreamConnectionEstablisher", + FBUtilities.getAvailableProcessors()); + + private Map<InetAddress, HostStreamingData> peerSessions = new HashMap<>(); + private final int connectionsPerHost; + private StreamConnectionFactory factory; + + public StreamCoordinator(int connectionsPerHost, StreamConnectionFactory factory) + { + this.connectionsPerHost = connectionsPerHost; + this.factory = factory; + } + + public void setConnectionFactory(StreamConnectionFactory factory) + { + this.factory = factory; + } + + /** + * @return true if any stream session is active + */ + public synchronized boolean hasActiveSessions() + { + for (HostStreamingData data : peerSessions.values()) + { + if (data.hasActiveSessions()) + return true; + } + return false; + } + + public synchronized Collection<StreamSession> getAllStreamSessions() + { + Collection<StreamSession> results = new ArrayList<>(); + for (HostStreamingData data : peerSessions.values()) + { + results.addAll(data.getAllStreamSessions()); + } + return results; + } + + public boolean isReceiving() + { + return connectionsPerHost == 0; + } + + public void connectAllStreamSessions() + { + for (HostStreamingData data : peerSessions.values()) + data.connectAllStreamSessions(); + } + + public synchronized Set<InetAddress> getPeers() + { + return new HashSet<>(peerSessions.keySet()); + } + - public synchronized StreamSession getOrCreateNextSession(InetAddress peer) ++ public synchronized StreamSession getOrCreateNextSession(InetAddress peer, InetAddress connecting) + { - return getOrCreateHostData(peer).getOrCreateNextSession(peer); ++ return getOrCreateHostData(peer).getOrCreateNextSession(peer, connecting); + } + - public synchronized StreamSession getOrCreateSessionById(InetAddress 
peer, int id) ++ public synchronized StreamSession getOrCreateSessionById(InetAddress peer, int id, InetAddress connecting) + { - return getOrCreateHostData(peer).getOrCreateSessionById(peer, id); ++ return getOrCreateHostData(peer).getOrCreateSessionById(peer, id, connecting); + } + + public synchronized void updateProgress(ProgressInfo info) + { + getHostData(info.peer).updateProgress(info); + } + + public synchronized void addSessionInfo(SessionInfo session) + { + HostStreamingData data = getOrCreateHostData(session.peer); + data.addSessionInfo(session); + } + + public synchronized Set<SessionInfo> getAllSessionInfo() + { + Set<SessionInfo> result = new HashSet<>(); + for (HostStreamingData data : peerSessions.values()) + { + result.addAll(data.getAllSessionInfo()); + } + return result; + } + + public synchronized void transferFiles(InetAddress to, Collection<StreamSession.SSTableStreamingSections> sstableDetails) + { + HostStreamingData sessionList = getOrCreateHostData(to); + + if (connectionsPerHost > 1) + { + List<List<StreamSession.SSTableStreamingSections>> buckets = sliceSSTableDetails(sstableDetails); + + for (List<StreamSession.SSTableStreamingSections> subList : buckets) + { - StreamSession session = sessionList.getOrCreateNextSession(to); ++ StreamSession session = sessionList.getOrCreateNextSession(to, to); + session.addTransferFiles(subList); + } + } + else + { - StreamSession session = sessionList.getOrCreateNextSession(to); ++ StreamSession session = sessionList.getOrCreateNextSession(to, to); + session.addTransferFiles(sstableDetails); + } + } + + private List<List<StreamSession.SSTableStreamingSections>> sliceSSTableDetails(Collection<StreamSession.SSTableStreamingSections> sstableDetails) + { + // There's no point in divvying things up into more buckets than we have sstableDetails + int targetSlices = Math.min(sstableDetails.size(), connectionsPerHost); + int step = Math.round((float) sstableDetails.size() / (float) targetSlices); + int index = 0; + + List<List<StreamSession.SSTableStreamingSections>> result = new ArrayList<>(); + List<StreamSession.SSTableStreamingSections> slice = null; + Iterator<StreamSession.SSTableStreamingSections> iter = sstableDetails.iterator(); + while (iter.hasNext()) + { + StreamSession.SSTableStreamingSections streamSession = iter.next(); + + if (index % step == 0) + { + slice = new ArrayList<>(); + result.add(slice); + } + slice.add(streamSession); + ++index; + iter.remove(); + } + + return result; + } + + private HostStreamingData getHostData(InetAddress peer) + { + HostStreamingData data = peerSessions.get(peer); + if (data == null) + throw new IllegalArgumentException("Unknown peer requested: " + peer.toString()); + return data; + } + + private HostStreamingData getOrCreateHostData(InetAddress peer) + { + HostStreamingData data = peerSessions.get(peer); + if (data == null) + { + data = new HostStreamingData(); + peerSessions.put(peer, data); + } + return data; + } + + private static class StreamSessionConnector implements Runnable + { + private final StreamSession session; + public StreamSessionConnector(StreamSession session) + { + this.session = session; + } + + @Override + public void run() + { + session.start(); + logger.info("[Stream #{}, ID#{}] Beginning stream session with {}", session.planId(), session.sessionIndex(), session.peer); + } + } + + private class HostStreamingData + { + private Map<Integer, StreamSession> streamSessions = new HashMap<>(); + private Map<Integer, SessionInfo> sessionInfos = new HashMap<>(); + + 
private int lastReturned = -1; + + public boolean hasActiveSessions() + { + for (StreamSession session : streamSessions.values()) + { + StreamSession.State state = session.state(); + if (state != StreamSession.State.COMPLETE && state != StreamSession.State.FAILED) + return true; + } + return false; + } + - public StreamSession getOrCreateNextSession(InetAddress peer) ++ public StreamSession getOrCreateNextSession(InetAddress peer, InetAddress connecting) + { + // create + if (streamSessions.size() < connectionsPerHost) + { - StreamSession session = new StreamSession(peer, factory, streamSessions.size()); ++ StreamSession session = new StreamSession(peer, connecting, factory, streamSessions.size()); + streamSessions.put(++lastReturned, session); + return session; + } + // get + else + { + if (lastReturned >= streamSessions.size() - 1) + lastReturned = 0; + + return streamSessions.get(lastReturned++); + } + } + + public void connectAllStreamSessions() + { + for (StreamSession session : streamSessions.values()) + { + streamExecutor.execute(new StreamSessionConnector(session)); + } + } + + public Collection<StreamSession> getAllStreamSessions() + { + return Collections.unmodifiableCollection(streamSessions.values()); + } + - public StreamSession getOrCreateSessionById(InetAddress peer, int id) ++ public StreamSession getOrCreateSessionById(InetAddress peer, int id, InetAddress connecting) + { + StreamSession session = streamSessions.get(id); + if (session == null) + { - session = new StreamSession(peer, factory, id); ++ session = new StreamSession(peer, connecting, factory, id); + streamSessions.put(id, session); + } + return session; + } + + public void updateProgress(ProgressInfo info) + { + sessionInfos.get(info.sessionIndex).updateProgress(info); + } + + public void addSessionInfo(SessionInfo info) + { + sessionInfos.put(info.sessionIndex, info); + } + + public Collection<SessionInfo> getAllSessionInfo() + { + return sessionInfos.values(); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/6cca24f4/src/java/org/apache/cassandra/streaming/StreamPlan.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/streaming/StreamPlan.java index f7b6203,326bf48..1bb0ce5 --- a/src/java/org/apache/cassandra/streaming/StreamPlan.java +++ b/src/java/org/apache/cassandra/streaming/StreamPlan.java @@@ -81,10 -76,10 +83,10 @@@ public class StreamPla * @param columnFamilies specific column families * @return this object for chaining */ - public StreamPlan requestRanges(InetAddress from, String keyspace, Collection<Range<Token>> ranges, String... columnFamilies) + public StreamPlan requestRanges(InetAddress from, InetAddress connecting, String keyspace, Collection<Range<Token>> ranges, String... columnFamilies) { - StreamSession session = coordinator.getOrCreateNextSession(from); - StreamSession session = getOrCreateSession(from, connecting); - session.addStreamRequest(keyspace, ranges, Arrays.asList(columnFamilies)); ++ StreamSession session = coordinator.getOrCreateNextSession(from, connecting); + session.addStreamRequest(keyspace, ranges, Arrays.asList(columnFamilies), repairedAt); return this; } @@@ -110,10 -117,10 +124,10 @@@ * @param columnFamilies specific column families * @return this object for chaining */ - public StreamPlan transferRanges(InetAddress to, String keyspace, Collection<Range<Token>> ranges, String... 
columnFamilies) + public StreamPlan transferRanges(InetAddress to, InetAddress connecting, String keyspace, Collection<Range<Token>> ranges, String... columnFamilies) { - StreamSession session = coordinator.getOrCreateNextSession(to); - StreamSession session = getOrCreateSession(to, connecting); - session.addTransferRanges(keyspace, ranges, Arrays.asList(columnFamilies), flushBeforeTransfer); ++ StreamSession session = coordinator.getOrCreateNextSession(to, connecting); + session.addTransferRanges(keyspace, ranges, Arrays.asList(columnFamilies), flushBeforeTransfer, repairedAt); return this; } @@@ -182,5 -189,16 +196,4 @@@ this.flushBeforeTransfer = flushBeforeTransfer; return this; } -- - private StreamSession getOrCreateSession(InetAddress peer, InetAddress preferred) - { - StreamSession session = sessions.get(peer); - if (session == null) - { - session = new StreamSession(peer, preferred, connectionFactory); - sessions.put(peer, session); - } - return session; - } - } http://git-wip-us.apache.org/repos/asf/cassandra/blob/6cca24f4/src/java/org/apache/cassandra/streaming/StreamResultFuture.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/streaming/StreamResultFuture.java index f28a937,bde5934..6a6f2b9 --- a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java +++ b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java @@@ -124,10 -130,11 +124,10 @@@ public final class StreamResultFuture e return future; } - public void attachSocket(InetAddress from, Socket socket, boolean isForOutgoing, int version) throws IOException + private void attachSocket(InetAddress from, int sessionIndex, Socket socket, boolean isForOutgoing, int version) throws IOException { - StreamSession session = coordinator.getOrCreateSessionById(from, sessionIndex); - StreamSession session = ongoingSessions.get(from); - if (session == null) - throw new RuntimeException(String.format("Got connection from %s for stream session %s but no such session locally", from, planId)); ++ StreamSession session = coordinator.getOrCreateSessionById(from, sessionIndex, socket.getInetAddress()); + session.init(this); session.handler.initiateOnReceivingSide(socket, isForOutgoing, version); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/6cca24f4/src/java/org/apache/cassandra/streaming/StreamSession.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/streaming/StreamSession.java index 2efa00d,db0c484..3ba296c --- a/src/java/org/apache/cassandra/streaming/StreamSession.java +++ b/src/java/org/apache/cassandra/streaming/StreamSession.java @@@ -109,11 -109,23 +109,19 @@@ import org.apache.cassandra.utils.Pair * session is done is is closed (closeSession()). Otherwise, the node switch to the WAIT_COMPLETE state and * send a CompleteMessage to the other side. */ -public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDetectionEventListener +public class StreamSession implements IEndpointStateChangeSubscriber { private static final Logger logger = LoggerFactory.getLogger(StreamSession.class); + - // Executor that establish the streaming connection. Once we're connected to the other end, the rest of the streaming - // is directly handled by the ConnectionHandler incoming and outgoing threads. 
- private static final DebuggableThreadPoolExecutor streamExecutor = DebuggableThreadPoolExecutor.createWithFixedPoolSize("StreamConnectionEstablisher", - FBUtilities.getAvailableProcessors()); - + /** + * Streaming endpoint. + * + * Each {@code StreamSession} is identified by this InetAddress which is broadcast address of the node streaming. + */ public final InetAddress peer; + private final int index; + /** Actual connecting address. Can be the same as {@linkplain #peer}. */ + public final InetAddress connecting; // should not be null when session is started private StreamResultFuture streamResult; @@@ -151,15 -163,16 +159,17 @@@ * Create new streaming session with the peer. * * @param peer Address of streaming peer + * @param connecting Actual connecting address * @param factory is used for establishing connection */ - public StreamSession(InetAddress peer, StreamConnectionFactory factory, int index) - public StreamSession(InetAddress peer, InetAddress connecting, StreamConnectionFactory factory) ++ public StreamSession(InetAddress peer, InetAddress connecting, StreamConnectionFactory factory, int index) { this.peer = peer; + this.connecting = connecting; + this.index = index; this.factory = factory; this.handler = new ConnectionHandler(this); - this.metrics = StreamingMetrics.get(peer); + this.metrics = StreamingMetrics.get(connecting); } public UUID planId() @@@ -197,16 -209,24 +207,18 @@@ return; } - streamExecutor.execute(new Runnable() + try { - logger.info("[Stream #{}, ID#{}] Beginning stream session with {}", planId(), sessionIndex(), peer); - public void run() - { - try - { - logger.info("[Stream #{}] Starting streaming to {}{}", planId(), - peer, - peer.equals(connecting) ? "" : " through " + connecting); - handler.initiate(); - onInitializationComplete(); - } - catch (IOException e) - { - onError(e); - } - } - }); ++ logger.info("[Stream #{}] Starting streaming to {}{}", planId(), ++ peer, ++ peer.equals(connecting) ? 
"" : " through " + connecting); + handler.initiate(); + onInitializationComplete(); + } + catch (Exception e) + { + onError(e); + } } public Socket createConnection() throws IOException @@@ -595,7 -604,7 +607,7 @@@ List<StreamSummary> transferSummaries = Lists.newArrayList(); for (StreamTask transfer : transfers.values()) transferSummaries.add(transfer.getSummary()); - return new SessionInfo(peer, index, receivingSummaries, transferSummaries, state); - return new SessionInfo(peer, connecting, receivingSummaries, transferSummaries, state); ++ return new SessionInfo(peer, index, connecting, receivingSummaries, transferSummaries, state); } public synchronized void taskCompleted(StreamReceiveTask completedTask) http://git-wip-us.apache.org/repos/asf/cassandra/blob/6cca24f4/src/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java index bef6682,809bc0d..63e4ab7 --- a/src/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java +++ b/src/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java @@@ -40,10 -41,10 +41,11 @@@ public class SessionInfoCompositeDat "sendingSummaries", "state", "receivingFiles", - "sendingFiles"}; + "sendingFiles", + "sessionIndex"}; private static final String[] ITEM_DESCS = new String[]{"Plan ID", "Session peer", + "Connecting address", "Summaries of receiving data", "Summaries of sending data", "Current session state", @@@ -98,9 -99,8 +102,9 @@@ return ProgressInfoCompositeData.toCompositeData(planId, input); } }; - valueMap.put(ITEM_NAMES[5], toArrayOfCompositeData(sessionInfo.getReceivingFiles(), fromProgressInfo)); - valueMap.put(ITEM_NAMES[6], toArrayOfCompositeData(sessionInfo.getSendingFiles(), fromProgressInfo)); - valueMap.put(ITEM_NAMES[7], sessionInfo.sessionIndex); + valueMap.put(ITEM_NAMES[6], toArrayOfCompositeData(sessionInfo.getReceivingFiles(), fromProgressInfo)); + valueMap.put(ITEM_NAMES[7], toArrayOfCompositeData(sessionInfo.getSendingFiles(), fromProgressInfo)); ++ valueMap.put(ITEM_NAMES[8], sessionInfo.sessionIndex); try { return new CompositeDataSupport(COMPOSITE_TYPE, valueMap); @@@ -133,10 -134,10 +138,11 @@@ } }; SessionInfo info = new SessionInfo(peer, - (int)values[7], - fromArrayOfCompositeData((CompositeData[]) values[2], toStreamSummary), ++ (int)values[8], + connecting, fromArrayOfCompositeData((CompositeData[]) values[3], toStreamSummary), - StreamSession.State.valueOf((String) values[4])); + fromArrayOfCompositeData((CompositeData[]) values[4], toStreamSummary), + StreamSession.State.valueOf((String) values[5])); Function<CompositeData, ProgressInfo> toProgressInfo = new Function<CompositeData, ProgressInfo>() { public ProgressInfo apply(CompositeData input)