Merge branch 'cassandra-2.0' into cassandra-2.1

Conflicts:
        CHANGES.txt
        src/java/org/apache/cassandra/dht/RangeStreamer.java
        src/java/org/apache/cassandra/repair/StreamingRepairTask.java
        src/java/org/apache/cassandra/service/StorageService.java
        src/java/org/apache/cassandra/streaming/SessionInfo.java
        src/java/org/apache/cassandra/streaming/StreamPlan.java
        src/java/org/apache/cassandra/streaming/StreamResultFuture.java
        src/java/org/apache/cassandra/streaming/StreamSession.java
        
src/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java
        src/java/org/apache/cassandra/tools/NodeCmd.java
        test/unit/org/apache/cassandra/streaming/SessionInfoTest.java
        test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6cca24f4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6cca24f4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6cca24f4

Branch: refs/heads/trunk
Commit: 6cca24f442a17d2abedfffd4a5322cb61caeef74
Parents: 42f8590 c6867c2
Author: Yuki Morishita <yu...@apache.org>
Authored: Mon Oct 20 10:21:55 2014 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Mon Oct 20 10:21:55 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../org/apache/cassandra/db/SystemKeyspace.java |  8 ++++-
 .../org/apache/cassandra/dht/RangeStreamer.java |  4 ++-
 .../net/OutboundTcpConnectionPool.java          | 12 ++++----
 .../cassandra/repair/StreamingRepairTask.java   | 10 +++++--
 .../cassandra/service/StorageService.java       | 17 +++++++----
 .../apache/cassandra/streaming/SessionInfo.java |  3 ++
 .../cassandra/streaming/StreamCoordinator.java  | 20 ++++++-------
 .../apache/cassandra/streaming/StreamPlan.java  | 31 ++++++++++++++------
 .../cassandra/streaming/StreamResultFuture.java |  2 +-
 .../cassandra/streaming/StreamSession.java      | 24 +++++++++++----
 .../management/SessionInfoCompositeData.java    | 30 +++++++++++--------
 .../org/apache/cassandra/tools/NodeTool.java    |  8 ++++-
 .../cassandra/streaming/SessionInfoTest.java    |  2 +-
 .../streaming/StreamTransferTaskTest.java       |  4 ++-
 .../streaming/StreamingTransferTest.java        |  2 +-
 16 files changed, 119 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/6cca24f4/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 7cd5154,4ed7bed..4e5cd24
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,89 -1,4 +1,90 @@@
 -2.0.11:
 +2.1.1
 + * Fix IllegalArgumentException when a list of IN values containing tuples
 +   is passed as a single arg to a prepared statement with the v1 or v2
 +   protocol (CASSANDRA-8062)
 + * Fix ClassCastException in DISTINCT query on static columns with
 +   query paging (CASSANDRA-8108)
 + * Fix NPE on null nested UDT inside a set (CASSANDRA-8105)
 + * Fix exception when querying secondary index on set items or map keys
 +   when some clustering columns are specified (CASSANDRA-8073)
 + * Send proper error response when there is an error during native
 +   protocol message decode (CASSANDRA-8118)
 + * Gossip should ignore generation numbers too far in the future 
(CASSANDRA-8113)
 + * Fix NPE when creating a table with frozen sets, lists (CASSANDRA-8104)
 + * Fix high memory use due to tracking reads on incrementally opened sstable
 +   readers (CASSANDRA-8066)
 + * Fix EXECUTE request with skipMetadata=false returning no metadata
 +   (CASSANDRA-8054)
 + * Allow concurrent use of CQLBulkOutputFormat (CASSANDRA-7776)
 + * Shutdown JVM on OOM (CASSANDRA-7507)
 + * Upgrade netty version and enable epoll event loop (CASSANDRA-7761)
 + * Don't duplicate sstables smaller than split size when using
 +   the sstablesplitter tool (CASSANDRA-7616)
 + * Avoid re-parsing already prepared statements (CASSANDRA-7923)
 + * Fix some Thrift slice deletions and updates of COMPACT STORAGE
 +   tables with some clustering columns omitted (CASSANDRA-7990)
 + * Fix filtering for CONTAINS on sets (CASSANDRA-8033)
 + * Properly track added size (CASSANDRA-7239)
 + * Allow compilation in java 8 (CASSANDRA-7208)
 + * Fix Assertion error on RangeTombstoneList diff (CASSANDRA-8013)
 + * Release references to overlapping sstables during compaction 
(CASSANDRA-7819)
 + * Send notification when opening compaction results early (CASSANDRA-8034)
 + * Make native server start block until properly bound (CASSANDRA-7885)
 + * (cqlsh) Fix IPv6 support (CASSANDRA-7988)
 + * Ignore fat clients when checking for endpoint collision (CASSANDRA-7939)
 + * Make sstablerepairedset take a list of files (CASSANDRA-7995)
 + * (cqlsh) Tab completeion for indexes on map keys (CASSANDRA-7972)
 + * (cqlsh) Fix UDT field selection in select clause (CASSANDRA-7891)
 + * Fix resource leak in event of corrupt sstable
 + * (cqlsh) Add command line option for cqlshrc file path (CASSANDRA-7131)
 + * Provide visibility into prepared statements churn (CASSANDRA-7921, 
CASSANDRA-7930)
 + * Invalidate prepared statements when their keyspace or table is
 +   dropped (CASSANDRA-7566)
 + * cassandra-stress: fix support for NetworkTopologyStrategy (CASSANDRA-7945)
 + * Fix saving caches when a table is dropped (CASSANDRA-7784)
 + * Add better error checking of new stress profile (CASSANDRA-7716)
 + * Use ThreadLocalRandom and remove FBUtilities.threadLocalRandom 
(CASSANDRA-7934)
 + * Prevent operator mistakes due to simultaneous bootstrap (CASSANDRA-7069)
 + * cassandra-stress supports whitelist mode for node config (CASSANDRA-7658)
 + * GCInspector more closely tracks GC; cassandra-stress and nodetool report 
it (CASSANDRA-7916)
 + * nodetool won't output bogus ownership info without a keyspace 
(CASSANDRA-7173)
 + * Add human readable option to nodetool commands (CASSANDRA-5433)
 + * Don't try to set repairedAt on old sstables (CASSANDRA-7913)
 + * Add metrics for tracking PreparedStatement use (CASSANDRA-7719)
 + * (cqlsh) tab-completion for triggers (CASSANDRA-7824)
 + * (cqlsh) Support for query paging (CASSANDRA-7514)
 + * (cqlsh) Show progress of COPY operations (CASSANDRA-7789)
 + * Add syntax to remove multiple elements from a map (CASSANDRA-6599)
 + * Support non-equals conditions in lightweight transactions (CASSANDRA-6839)
 + * Add IF [NOT] EXISTS to create/drop triggers (CASSANDRA-7606)
 + * (cqlsh) Display the current logged-in user (CASSANDRA-7785)
 + * (cqlsh) Don't ignore CTRL-C during COPY FROM execution (CASSANDRA-7815)
 + * (cqlsh) Order UDTs according to cross-type dependencies in DESCRIBE
 +   output (CASSANDRA-7659)
 + * (cqlsh) Fix handling of CAS statement results (CASSANDRA-7671)
 + * (cqlsh) COPY TO/FROM improvements (CASSANDRA-7405)
 + * Support list index operations with conditions (CASSANDRA-7499)
 + * Add max live/tombstoned cells to nodetool cfstats output (CASSANDRA-7731)
 + * Validate IPv6 wildcard addresses properly (CASSANDRA-7680)
 + * (cqlsh) Error when tracing query (CASSANDRA-7613)
 + * Avoid IOOBE when building SyntaxError message snippet (CASSANDRA-7569)
 + * SSTableExport uses correct validator to create string representation of 
partition
 +   keys (CASSANDRA-7498)
 + * Avoid NPEs when receiving type changes for an unknown keyspace 
(CASSANDRA-7689)
 + * Add support for custom 2i validation (CASSANDRA-7575)
 + * Pig support for hadoop CqlInputFormat (CASSANDRA-6454)
 + * Add listen_interface and rpc_interface options (CASSANDRA-7417)
 + * Improve schema merge performance (CASSANDRA-7444)
 + * Adjust MT depth based on # of partition validating (CASSANDRA-5263)
 + * Optimise NativeCell comparisons (CASSANDRA-6755)
 + * Configurable client timeout for cqlsh (CASSANDRA-7516)
 + * Include snippet of CQL query near syntax error in messages (CASSANDRA-7111)
 + * Make repair -pr work with -local (CASSANDRA-7450)
 + * Fix error in sstableloader with -cph > 1 (CASSANDRA-8007)
 + * Fix snapshot repair error on indexed tables (CASSANDRA-8020)
 + * Do not exit nodetool repair when receiving JMX NOTIF_LOST (CASSANDRA-7909)
++ * Stream to private IP when available (CASSANDRA-8084)
 +Merged from 2.0:
   * Reject conditions on DELETE unless full PK is given (CASSANDRA-6430)
   * Properly reject the token function DELETE (CASSANDRA-7747)
   * Force batchlog replay before decommissioning a node (CASSANDRA-7446)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6cca24f4/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/SystemKeyspace.java
index 3c647b6,30e6d47..8b16564
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@@ -508,13 -497,19 +508,19 @@@ public class SystemKeyspac
          return hostIdMap;
      }
  
+     /**
+      * Get preferred IP for given endpoint if it is known. Otherwise this 
returns given endpoint itself.
+      *
+      * @param ep endpoint address to check
+      * @return Preferred IP for given endpoint if present, otherwise returns 
given ep
+      */
      public static InetAddress getPreferredIP(InetAddress ep)
      {
 -        String req = "SELECT preferred_ip FROM system.%s WHERE peer='%s'";
 -        UntypedResultSet result = processInternal(String.format(req, 
PEERS_CF, ep.getHostAddress()));
 +        String req = "SELECT preferred_ip FROM system.%s WHERE peer=?";
 +        UntypedResultSet result = executeInternal(String.format(req, 
PEERS_CF), ep);
          if (!result.isEmpty() && result.one().has("preferred_ip"))
              return result.one().getInetAddress("preferred_ip");
-         return null;
+         return ep;
      }
  
      /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6cca24f4/src/java/org/apache/cassandra/dht/RangeStreamer.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/dht/RangeStreamer.java
index d84a951,4e925d3..11d82d6
--- a/src/java/org/apache/cassandra/dht/RangeStreamer.java
+++ b/src/java/org/apache/cassandra/dht/RangeStreamer.java
@@@ -31,8 -29,8 +31,9 @@@ import org.slf4j.LoggerFactory
  
  import org.apache.cassandra.config.DatabaseDescriptor;
  import org.apache.cassandra.db.Keyspace;
+ import org.apache.cassandra.db.SystemKeyspace;
  import org.apache.cassandra.gms.FailureDetector;
 +import org.apache.cassandra.gms.Gossiper;
  import org.apache.cassandra.gms.IFailureDetector;
  import org.apache.cassandra.locator.AbstractReplicationStrategy;
  import org.apache.cassandra.locator.IEndpointSnitch;
@@@ -308,8 -226,8 +310,8 @@@ public class RangeStreame
              Collection<Range<Token>> ranges = entry.getValue().getValue();
              /* Send messages to respective folks to stream data over to me */
              if (logger.isDebugEnabled())
 -                logger.debug("" + description + "ing from " + source + " 
ranges " + StringUtils.join(ranges, ", "));
 +                logger.debug("{}ing from {} ranges {}", description, source, 
StringUtils.join(ranges, ", "));
-             streamPlan.requestRanges(source, keyspace, ranges);
+             streamPlan.requestRanges(source, preferred, keyspace, ranges);
          }
  
          return streamPlan.execute();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6cca24f4/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/repair/StreamingRepairTask.java
index 9af949d,4226184..25ec698
--- a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
+++ b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
@@@ -57,17 -59,15 +60,18 @@@ public class StreamingRepairTask implem
  
      private void initiateStreaming()
      {
 +        long repairedAt = ActiveRepairService.UNREPAIRED_SSTABLE;
+         InetAddress dest = request.dst;
+         InetAddress preferred = SystemKeyspace.getPreferredIP(dest);
 +        if (desc.parentSessionId != null && 
ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId) != 
null)
 +            repairedAt = 
ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId).repairedAt;
- 
          logger.info(String.format("[streaming task #%s] Performing streaming 
repair of %d ranges with %s", desc.sessionId, request.ranges.size(), 
request.dst));
 -        StreamResultFuture op = new StreamPlan("Repair")
 +        StreamResultFuture op = new StreamPlan("Repair", repairedAt, 1)
                                      .flushBeforeTransfer(true)
                                      // request ranges from the remote node
-                                     .requestRanges(request.dst, 
desc.keyspace, request.ranges, desc.columnFamily)
+                                     .requestRanges(dest, preferred, 
desc.keyspace, request.ranges, desc.columnFamily)
                                      // send ranges to the remote node
-                                     .transferRanges(request.dst, 
desc.keyspace, request.ranges, desc.columnFamily)
+                                     .transferRanges(dest, preferred, 
desc.keyspace, request.ranges, desc.columnFamily)
                                      .execute();
          op.addEventListener(this);
      }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6cca24f4/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageService.java
index 4fb0435,4973e40..849bc2c
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@@ -1939,10 -1891,11 +1939,11 @@@ public class StorageService extends Not
              for (Map.Entry<InetAddress, Collection<Range<Token>>> entry : 
rangesToFetch.get(keyspaceName))
              {
                  InetAddress source = entry.getKey();
+                 InetAddress preferred = SystemKeyspace.getPreferredIP(source);
                  Collection<Range<Token>> ranges = entry.getValue();
                  if (logger.isDebugEnabled())
 -                    logger.debug("Requesting from " + source + " ranges " + 
StringUtils.join(ranges, ", "));
 +                    logger.debug("Requesting from {} ranges {}", source, 
StringUtils.join(ranges, ", "));
-                 stream.requestRanges(source, keyspaceName, ranges);
+                 stream.requestRanges(source, preferred, keyspaceName, ranges);
              }
          }
          StreamResultFuture future = stream.execute();
@@@ -3173,9 -3027,10 +3175,10 @@@
  
              // stream all hints -- range list will be a singleton of "the 
entire ring"
              Token token = StorageService.getPartitioner().getMinimumToken();
 -            List<Range<Token>> ranges = Collections.singletonList(new 
Range<Token>(token, token));
 +            List<Range<Token>> ranges = Collections.singletonList(new 
Range<>(token, token));
  
              return new 
StreamPlan("Hints").transferRanges(hintsDestinationHost,
+                                                           preferred,
                                                                        
Keyspace.SYSTEM_KS,
                                                                        ranges,
                                                                        
SystemKeyspace.HINTS_CF)
@@@ -3375,20 -3186,19 +3378,21 @@@
                      // stream ranges
                      for (InetAddress address : endpointRanges.keySet())
                      {
 +                        logger.debug("Will stream range {} of keyspace {} to 
endpoint {}", endpointRanges.get(address), keyspace, address);
-                         streamPlan.transferRanges(address, keyspace, 
endpointRanges.get(address));
+                         InetAddress preferred = 
SystemKeyspace.getPreferredIP(address);
+                         streamPlan.transferRanges(address, preferred, 
keyspace, endpointRanges.get(address));
                      }
  
                      // stream requests
                      Multimap<InetAddress, Range<Token>> workMap = 
RangeStreamer.getWorkMap(rangesToFetchWithPreferredEndpoints);
                      for (InetAddress address : workMap.keySet())
                      {
 +                        logger.debug("Will request range {} of keyspace {} 
from endpoint {}", workMap.get(address), keyspace, address);
-                         streamPlan.requestRanges(address, keyspace, 
workMap.get(address));
+                         InetAddress preferred = 
SystemKeyspace.getPreferredIP(address);
+                         streamPlan.requestRanges(address, preferred, 
keyspace, workMap.get(address));
                      }
  
-                     if (logger.isDebugEnabled())
-                         logger.debug("Keyspace {}: work map {}.", keyspace, 
workMap);
+                     logger.debug("Keyspace {}: work map {}.", keyspace, 
workMap);
                  }
              }
          }
@@@ -3863,16 -3649,17 +3867,17 @@@
          StreamPlan streamPlan = new StreamPlan("Unbootstrap");
          for (Map.Entry<String, Map<InetAddress, List<Range<Token>>>> entry : 
sessionsToStreamByKeyspace.entrySet())
          {
 -            final String keyspaceName = entry.getKey();
 -            final Map<InetAddress, List<Range<Token>>> rangesPerEndpoint = 
entry.getValue();
 +            String keyspaceName = entry.getKey();
 +            Map<InetAddress, List<Range<Token>>> rangesPerEndpoint = 
entry.getValue();
  
 -            for (final Map.Entry<InetAddress, List<Range<Token>>> rangesEntry 
: rangesPerEndpoint.entrySet())
 +            for (Map.Entry<InetAddress, List<Range<Token>>> rangesEntry : 
rangesPerEndpoint.entrySet())
              {
 -                final List<Range<Token>> ranges = rangesEntry.getValue();
 -                final InetAddress newEndpoint = rangesEntry.getKey();
 -                final InetAddress preferred = 
SystemKeyspace.getPreferredIP(newEndpoint);
 +                List<Range<Token>> ranges = rangesEntry.getValue();
 +                InetAddress newEndpoint = rangesEntry.getKey();
++                InetAddress preferred = 
SystemKeyspace.getPreferredIP(newEndpoint);
  
                  // TODO each call to transferRanges re-flushes, this is 
potentially a lot of waste
-                 streamPlan.transferRanges(newEndpoint, keyspaceName, ranges);
+                 streamPlan.transferRanges(newEndpoint, preferred, 
keyspaceName, ranges);
              }
          }
          return streamPlan.execute();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6cca24f4/src/java/org/apache/cassandra/streaming/SessionInfo.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/SessionInfo.java
index 98e945b,4f80461..3bcb20c
--- a/src/java/org/apache/cassandra/streaming/SessionInfo.java
+++ b/src/java/org/apache/cassandra/streaming/SessionInfo.java
@@@ -33,7 -33,7 +33,8 @@@ import com.google.common.collect.Iterab
  public final class SessionInfo implements Serializable
  {
      public final InetAddress peer;
 +    public final int sessionIndex;
+     public final InetAddress connecting;
      /** Immutable collection of receiving summaries */
      public final Collection<StreamSummary> receivingSummaries;
      /** Immutable collection of sending summaries*/
@@@ -45,13 -45,13 +46,15 @@@
      private final Map<String, ProgressInfo> sendingFiles;
  
      public SessionInfo(InetAddress peer,
 +                       int sessionIndex,
+                        InetAddress connecting,
                         Collection<StreamSummary> receivingSummaries,
                         Collection<StreamSummary> sendingSummaries,
                         StreamSession.State state)
      {
          this.peer = peer;
 +        this.sessionIndex = sessionIndex;
+         this.connecting = connecting;
          this.receivingSummaries = ImmutableSet.copyOf(receivingSummaries);
          this.sendingSummaries = ImmutableSet.copyOf(sendingSummaries);
          this.receivingFiles = new ConcurrentHashMap<>();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6cca24f4/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamCoordinator.java
index 48192b4,0000000..a0c99fe
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
+++ b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
@@@ -1,289 -1,0 +1,289 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.streaming;
 +
 +import java.net.InetAddress;
 +import java.util.*;
 +
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 +import org.apache.cassandra.utils.FBUtilities;
 +
 +/**
 + * {@link StreamCoordinator} is a helper class that abstracts away 
maintaining multiple
 + * StreamSession and ProgressInfo instances per peer.
 + *
 + * This class coordinates multiple SessionStreams per peer in both the 
outgoing StreamPlan context and on the
 + * inbound StreamResultFuture context.
 + */
 +public class StreamCoordinator
 +{
 +    private static final Logger logger = 
LoggerFactory.getLogger(StreamCoordinator.class);
 +
 +    // Executor strictly for establishing the initial connections. Once we're 
connected to the other end the rest of the
 +    // streaming is handled directly by the ConnectionHandler's incoming and 
outgoing threads.
 +    private static final DebuggableThreadPoolExecutor streamExecutor = 
DebuggableThreadPoolExecutor.createWithFixedPoolSize("StreamConnectionEstablisher",
 +                                                                              
                                              
FBUtilities.getAvailableProcessors());
 +
 +    private Map<InetAddress, HostStreamingData> peerSessions = new 
HashMap<>();
 +    private final int connectionsPerHost;
 +    private StreamConnectionFactory factory;
 +
 +    public StreamCoordinator(int connectionsPerHost, StreamConnectionFactory 
factory)
 +    {
 +        this.connectionsPerHost = connectionsPerHost;
 +        this.factory = factory;
 +    }
 +
 +    public void setConnectionFactory(StreamConnectionFactory factory)
 +    {
 +        this.factory = factory;
 +    }
 +
 +    /**
 +     * @return true if any stream session is active
 +     */
 +    public synchronized boolean hasActiveSessions()
 +    {
 +        for (HostStreamingData data : peerSessions.values())
 +        {
 +            if (data.hasActiveSessions())
 +                return true;
 +        }
 +        return false;
 +    }
 +
 +    public synchronized Collection<StreamSession> getAllStreamSessions()
 +    {
 +        Collection<StreamSession> results = new ArrayList<>();
 +        for (HostStreamingData data : peerSessions.values())
 +        {
 +            results.addAll(data.getAllStreamSessions());
 +        }
 +        return results;
 +    }
 +
 +    public boolean isReceiving()
 +    {
 +        return connectionsPerHost == 0;
 +    }
 +
 +    public void connectAllStreamSessions()
 +    {
 +        for (HostStreamingData data : peerSessions.values())
 +            data.connectAllStreamSessions();
 +    }
 +
 +    public synchronized Set<InetAddress> getPeers()
 +    {
 +        return new HashSet<>(peerSessions.keySet());
 +    }
 +
-     public synchronized StreamSession getOrCreateNextSession(InetAddress peer)
++    public synchronized StreamSession getOrCreateNextSession(InetAddress 
peer, InetAddress connecting)
 +    {
-         return getOrCreateHostData(peer).getOrCreateNextSession(peer);
++        return getOrCreateHostData(peer).getOrCreateNextSession(peer, 
connecting);
 +    }
 +
-     public synchronized StreamSession getOrCreateSessionById(InetAddress 
peer, int id)
++    public synchronized StreamSession getOrCreateSessionById(InetAddress 
peer, int id, InetAddress connecting)
 +    {
-         return getOrCreateHostData(peer).getOrCreateSessionById(peer, id);
++        return getOrCreateHostData(peer).getOrCreateSessionById(peer, id, 
connecting);
 +    }
 +
 +    public synchronized void updateProgress(ProgressInfo info)
 +    {
 +        getHostData(info.peer).updateProgress(info);
 +    }
 +
 +    public synchronized void addSessionInfo(SessionInfo session)
 +    {
 +        HostStreamingData data = getOrCreateHostData(session.peer);
 +        data.addSessionInfo(session);
 +    }
 +
 +    public synchronized Set<SessionInfo> getAllSessionInfo()
 +    {
 +        Set<SessionInfo> result = new HashSet<>();
 +        for (HostStreamingData data : peerSessions.values())
 +        {
 +            result.addAll(data.getAllSessionInfo());
 +        }
 +        return result;
 +    }
 +
 +    public synchronized void transferFiles(InetAddress to, 
Collection<StreamSession.SSTableStreamingSections> sstableDetails)
 +    {
 +        HostStreamingData sessionList = getOrCreateHostData(to);
 +
 +        if (connectionsPerHost > 1)
 +        {
 +            List<List<StreamSession.SSTableStreamingSections>> buckets = 
sliceSSTableDetails(sstableDetails);
 +
 +            for (List<StreamSession.SSTableStreamingSections> subList : 
buckets)
 +            {
-                 StreamSession session = 
sessionList.getOrCreateNextSession(to);
++                StreamSession session = 
sessionList.getOrCreateNextSession(to, to);
 +                session.addTransferFiles(subList);
 +            }
 +        }
 +        else
 +        {
-             StreamSession session = sessionList.getOrCreateNextSession(to);
++            StreamSession session = sessionList.getOrCreateNextSession(to, 
to);
 +            session.addTransferFiles(sstableDetails);
 +        }
 +    }
 +
 +    private List<List<StreamSession.SSTableStreamingSections>> 
sliceSSTableDetails(Collection<StreamSession.SSTableStreamingSections> 
sstableDetails)
 +    {
 +        // There's no point in divvying things up into more buckets than we 
have sstableDetails
 +        int targetSlices = Math.min(sstableDetails.size(), 
connectionsPerHost);
 +        int step = Math.round((float) sstableDetails.size() / (float) 
targetSlices);
 +        int index = 0;
 +
 +        List<List<StreamSession.SSTableStreamingSections>> result = new 
ArrayList<>();
 +        List<StreamSession.SSTableStreamingSections> slice = null;
 +        Iterator<StreamSession.SSTableStreamingSections> iter = 
sstableDetails.iterator();
 +        while (iter.hasNext())
 +        {
 +            StreamSession.SSTableStreamingSections streamSession = 
iter.next();
 +
 +            if (index % step == 0)
 +            {
 +                slice = new ArrayList<>();
 +                result.add(slice);
 +            }
 +            slice.add(streamSession);
 +            ++index;
 +            iter.remove();
 +        }
 +
 +        return result;
 +    }
 +
 +    private HostStreamingData getHostData(InetAddress peer)
 +    {
 +        HostStreamingData data = peerSessions.get(peer);
 +        if (data == null)
 +            throw new IllegalArgumentException("Unknown peer requested: " + 
peer.toString());
 +        return data;
 +    }
 +
 +    private HostStreamingData getOrCreateHostData(InetAddress peer)
 +    {
 +        HostStreamingData data = peerSessions.get(peer);
 +        if (data == null)
 +        {
 +            data = new HostStreamingData();
 +            peerSessions.put(peer, data);
 +        }
 +        return data;
 +    }
 +
 +    private static class StreamSessionConnector implements Runnable
 +    {
 +        private final StreamSession session;
 +        public StreamSessionConnector(StreamSession session)
 +        {
 +            this.session = session;
 +        }
 +
 +        @Override
 +        public void run()
 +        {
 +            session.start();
 +            logger.info("[Stream #{}, ID#{}] Beginning stream session with 
{}", session.planId(), session.sessionIndex(), session.peer);
 +        }
 +    }
 +
 +    private class HostStreamingData
 +    {
 +        private Map<Integer, StreamSession> streamSessions = new HashMap<>();
 +        private Map<Integer, SessionInfo> sessionInfos = new HashMap<>();
 +
 +        private int lastReturned = -1;
 +
 +        public boolean hasActiveSessions()
 +        {
 +            for (StreamSession session : streamSessions.values())
 +            {
 +                StreamSession.State state = session.state();
 +                if (state != StreamSession.State.COMPLETE && state != 
StreamSession.State.FAILED)
 +                    return true;
 +            }
 +            return false;
 +        }
 +
-         public StreamSession getOrCreateNextSession(InetAddress peer)
++        public StreamSession getOrCreateNextSession(InetAddress peer, 
InetAddress connecting)
 +        {
 +            // create
 +            if (streamSessions.size() < connectionsPerHost)
 +            {
-                 StreamSession session = new StreamSession(peer, factory, 
streamSessions.size());
++                StreamSession session = new StreamSession(peer, connecting, 
factory, streamSessions.size());
 +                streamSessions.put(++lastReturned, session);
 +                return session;
 +            }
 +            // get
 +            else
 +            {
 +                if (lastReturned >= streamSessions.size() - 1)
 +                    lastReturned = 0;
 +
 +                return streamSessions.get(lastReturned++);
 +            }
 +        }
 +
 +        public void connectAllStreamSessions()
 +        {
 +            for (StreamSession session : streamSessions.values())
 +            {
 +                streamExecutor.execute(new StreamSessionConnector(session));
 +            }
 +        }
 +
 +        public Collection<StreamSession> getAllStreamSessions()
 +        {
 +            return 
Collections.unmodifiableCollection(streamSessions.values());
 +        }
 +
-         public StreamSession getOrCreateSessionById(InetAddress peer, int id)
++        public StreamSession getOrCreateSessionById(InetAddress peer, int id, 
InetAddress connecting)
 +        {
 +            StreamSession session = streamSessions.get(id);
 +            if (session == null)
 +            {
-                 session = new StreamSession(peer, factory, id);
++                session = new StreamSession(peer, connecting, factory, id);
 +                streamSessions.put(id, session);
 +            }
 +            return session;
 +        }
 +
 +        public void updateProgress(ProgressInfo info)
 +        {
 +            sessionInfos.get(info.sessionIndex).updateProgress(info);
 +        }
 +
 +        public void addSessionInfo(SessionInfo info)
 +        {
 +            sessionInfos.put(info.sessionIndex, info);
 +        }
 +
 +        public Collection<SessionInfo> getAllSessionInfo()
 +        {
 +            return sessionInfos.values();
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6cca24f4/src/java/org/apache/cassandra/streaming/StreamPlan.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamPlan.java
index f7b6203,326bf48..1bb0ce5
--- a/src/java/org/apache/cassandra/streaming/StreamPlan.java
+++ b/src/java/org/apache/cassandra/streaming/StreamPlan.java
@@@ -81,10 -76,10 +83,10 @@@ public class StreamPla
       * @param columnFamilies specific column families
       * @return this object for chaining
       */
-     public StreamPlan requestRanges(InetAddress from, String keyspace, 
Collection<Range<Token>> ranges, String... columnFamilies)
+     public StreamPlan requestRanges(InetAddress from, InetAddress connecting, 
String keyspace, Collection<Range<Token>> ranges, String... columnFamilies)
      {
-         StreamSession session = coordinator.getOrCreateNextSession(from);
 -        StreamSession session = getOrCreateSession(from, connecting);
 -        session.addStreamRequest(keyspace, ranges, 
Arrays.asList(columnFamilies));
++        StreamSession session = coordinator.getOrCreateNextSession(from, 
connecting);
 +        session.addStreamRequest(keyspace, ranges, 
Arrays.asList(columnFamilies), repairedAt);
          return this;
      }
  
@@@ -110,10 -117,10 +124,10 @@@
       * @param columnFamilies specific column families
       * @return this object for chaining
       */
-     public StreamPlan transferRanges(InetAddress to, String keyspace, 
Collection<Range<Token>> ranges, String... columnFamilies)
+     public StreamPlan transferRanges(InetAddress to, InetAddress connecting, 
String keyspace, Collection<Range<Token>> ranges, String... columnFamilies)
      {
-         StreamSession session = coordinator.getOrCreateNextSession(to);
 -        StreamSession session = getOrCreateSession(to, connecting);
 -        session.addTransferRanges(keyspace, ranges, 
Arrays.asList(columnFamilies), flushBeforeTransfer);
++        StreamSession session = coordinator.getOrCreateNextSession(to, 
connecting);
 +        session.addTransferRanges(keyspace, ranges, 
Arrays.asList(columnFamilies), flushBeforeTransfer, repairedAt);
          return this;
      }
  
@@@ -182,5 -189,16 +196,4 @@@
          this.flushBeforeTransfer = flushBeforeTransfer;
          return this;
      }
--
 -    private StreamSession getOrCreateSession(InetAddress peer, InetAddress 
preferred)
 -    {
 -        StreamSession session = sessions.get(peer);
 -        if (session == null)
 -        {
 -            session = new StreamSession(peer, preferred, connectionFactory);
 -            sessions.put(peer, session);
 -        }
 -        return session;
 -    }
 -
  }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6cca24f4/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamResultFuture.java
index f28a937,bde5934..6a6f2b9
--- a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
+++ b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
@@@ -124,10 -130,11 +124,10 @@@ public final class StreamResultFuture e
          return future;
      }
  
 -    public void attachSocket(InetAddress from, Socket socket, boolean 
isForOutgoing, int version) throws IOException
 +    private void attachSocket(InetAddress from, int sessionIndex, Socket 
socket, boolean isForOutgoing, int version) throws IOException
      {
-         StreamSession session = coordinator.getOrCreateSessionById(from, 
sessionIndex);
 -        StreamSession session = ongoingSessions.get(from);
 -        if (session == null)
 -            throw new RuntimeException(String.format("Got connection from %s 
for stream session %s but no such session locally", from, planId));
++        StreamSession session = coordinator.getOrCreateSessionById(from, 
sessionIndex, socket.getInetAddress());
 +        session.init(this);
          session.handler.initiateOnReceivingSide(socket, isForOutgoing, 
version);
      }
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6cca24f4/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamSession.java
index 2efa00d,db0c484..3ba296c
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@@ -109,11 -109,23 +109,19 @@@ import org.apache.cassandra.utils.Pair
   *       session is done is is closed (closeSession()). Otherwise, the node 
switch to the WAIT_COMPLETE state and
   *       send a CompleteMessage to the other side.
   */
 -public class StreamSession implements IEndpointStateChangeSubscriber, 
IFailureDetectionEventListener
 +public class StreamSession implements IEndpointStateChangeSubscriber
  {
      private static final Logger logger = 
LoggerFactory.getLogger(StreamSession.class);
+ 
 -    // Executor that establish the streaming connection. Once we're connected 
to the other end, the rest of the streaming
 -    // is directly handled by the ConnectionHandler incoming and outgoing 
threads.
 -    private static final DebuggableThreadPoolExecutor streamExecutor = 
DebuggableThreadPoolExecutor.createWithFixedPoolSize("StreamConnectionEstablisher",
 -                                                                              
                                              
FBUtilities.getAvailableProcessors());
 -
+     /**
+      * Streaming endpoint.
+      *
+      * Each {@code StreamSession} is identified by this InetAddress which is 
broadcast address of the node streaming.
+      */
      public final InetAddress peer;
 +    private final int index;
+     /** Actual connecting address. Can be the same as {@linkplain #peer}. */
+     public final InetAddress connecting;
  
      // should not be null when session is started
      private StreamResultFuture streamResult;
@@@ -151,15 -163,16 +159,17 @@@
       * Create new streaming session with the peer.
       *
       * @param peer Address of streaming peer
+      * @param connecting Actual connecting address
       * @param factory is used for establishing connection
       */
-     public StreamSession(InetAddress peer, StreamConnectionFactory factory, 
int index)
 -    public StreamSession(InetAddress peer, InetAddress connecting, 
StreamConnectionFactory factory)
++    public StreamSession(InetAddress peer, InetAddress connecting, 
StreamConnectionFactory factory, int index)
      {
          this.peer = peer;
+         this.connecting = connecting;
 +        this.index = index;
          this.factory = factory;
          this.handler = new ConnectionHandler(this);
-         this.metrics = StreamingMetrics.get(peer);
+         this.metrics = StreamingMetrics.get(connecting);
      }
  
      public UUID planId()
@@@ -197,16 -209,24 +207,18 @@@
              return;
          }
  
 -        streamExecutor.execute(new Runnable()
 +        try
          {
-             logger.info("[Stream #{}, ID#{}] Beginning stream session with 
{}", planId(), sessionIndex(), peer);
 -            public void run()
 -            {
 -                try
 -                {
 -                    logger.info("[Stream #{}] Starting streaming to {}{}", 
planId(),
 -                                                                           
peer,
 -                                                                           
peer.equals(connecting) ? "" : " through " + connecting);
 -                    handler.initiate();
 -                    onInitializationComplete();
 -                }
 -                catch (IOException e)
 -                {
 -                    onError(e);
 -                }
 -            }
 -        });
++            logger.info("[Stream #{}] Starting streaming to {}{}", planId(),
++                                                                   peer,
++                                                                   
peer.equals(connecting) ? "" : " through " + connecting);
 +            handler.initiate();
 +            onInitializationComplete();
 +        }
 +        catch (Exception e)
 +        {
 +            onError(e);
 +        }
      }
  
      public Socket createConnection() throws IOException
@@@ -595,7 -604,7 +607,7 @@@
          List<StreamSummary> transferSummaries = Lists.newArrayList();
          for (StreamTask transfer : transfers.values())
              transferSummaries.add(transfer.getSummary());
-         return new SessionInfo(peer, index, receivingSummaries, 
transferSummaries, state);
 -        return new SessionInfo(peer, connecting, receivingSummaries, 
transferSummaries, state);
++        return new SessionInfo(peer, index, connecting, receivingSummaries, 
transferSummaries, state);
      }
  
      public synchronized void taskCompleted(StreamReceiveTask completedTask)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6cca24f4/src/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java
----------------------------------------------------------------------
diff --cc 
src/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java
index bef6682,809bc0d..63e4ab7
--- 
a/src/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java
+++ 
b/src/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java
@@@ -40,10 -41,10 +41,11 @@@ public class SessionInfoCompositeDat
                                                              
"sendingSummaries",
                                                              "state",
                                                              "receivingFiles",
 -                                                            "sendingFiles"};
 +                                                            "sendingFiles",
 +                                                            "sessionIndex"};
      private static final String[] ITEM_DESCS = new String[]{"Plan ID",
                                                              "Session peer",
+                                                             "Connecting 
address",
                                                              "Summaries of 
receiving data",
                                                              "Summaries of 
sending data",
                                                              "Current session 
state",
@@@ -98,9 -99,8 +102,9 @@@
                  return ProgressInfoCompositeData.toCompositeData(planId, 
input);
              }
          };
-         valueMap.put(ITEM_NAMES[5], 
toArrayOfCompositeData(sessionInfo.getReceivingFiles(), fromProgressInfo));
-         valueMap.put(ITEM_NAMES[6], 
toArrayOfCompositeData(sessionInfo.getSendingFiles(), fromProgressInfo));
-         valueMap.put(ITEM_NAMES[7], sessionInfo.sessionIndex);
+         valueMap.put(ITEM_NAMES[6], 
toArrayOfCompositeData(sessionInfo.getReceivingFiles(), fromProgressInfo));
+         valueMap.put(ITEM_NAMES[7], 
toArrayOfCompositeData(sessionInfo.getSendingFiles(), fromProgressInfo));
++        valueMap.put(ITEM_NAMES[8], sessionInfo.sessionIndex);
          try
          {
              return new CompositeDataSupport(COMPOSITE_TYPE, valueMap);
@@@ -133,10 -134,10 +138,11 @@@
              }
          };
          SessionInfo info = new SessionInfo(peer,
-                                            (int)values[7],
-                                            
fromArrayOfCompositeData((CompositeData[]) values[2], toStreamSummary),
++                                           (int)values[8],
+                                            connecting,
                                             
fromArrayOfCompositeData((CompositeData[]) values[3], toStreamSummary),
-                                            
StreamSession.State.valueOf((String) values[4]));
+                                            
fromArrayOfCompositeData((CompositeData[]) values[4], toStreamSummary),
+                                            
StreamSession.State.valueOf((String) values[5]));
          Function<CompositeData, ProgressInfo> toProgressInfo = new 
Function<CompositeData, ProgressInfo>()
          {
              public ProgressInfo apply(CompositeData input)

Reply via email to