Merge branch 'cassandra-1.2' into cassandra-2.0

Conflicts:
        src/java/org/apache/cassandra/streaming/StreamOut.java
        test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9495eb59
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9495eb59
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9495eb59

Branch: refs/heads/trunk
Commit: 9495eb59c403cd97ba41e68e68903abb4f8ff113
Parents: 8d4b51d 18be7fa
Author: Yuki Morishita <yu...@apache.org>
Authored: Thu Aug 29 10:18:32 2013 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Thu Aug 29 10:18:32 2013 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/streaming/StreamSession.java      |   5 +-
 .../streaming/StreamingTransferTest.java        | 119 ++++++++++++-------
 3 files changed, 79 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/9495eb59/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 5621436,5cb1522..8a1004f
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,61 -1,16 +1,62 @@@
 -1.2.10
 - * Add snitch, schema version, cluster, partitioner to JMX (CASSANDRA-5881)
 +2.0.1
 + * Notify indexer of columns shadowed by range tombstones (CASSANDRA-5614)
 + * Log Merkle tree stats (CASSANDRA-2698)
 + * Switch from crc32 to adler32 for compressed sstable checksums (CASSANDRA-5862)
 + * Improve offheap memcpy performance (CASSANDRA-5884)
 + * Use a range aware scanner for cleanup (CASSANDRA-2524)
 + * Cleanup doesn't need to inspect sstables that contain only local data 
 +   (CASSANDRA-5722)
 + * Add ability for CQL3 to list partition keys (CASSANDRA-4536)
 + * Improve native protocol serialization (CASSANDRA-5664)
 +Merged from 1.2:
   * Fix CqlRecordWriter with composite keys (CASSANDRA-5949)
 + * Add snitch, schema version, cluster, partitioner to JMX (CASSANDRA-5881)
   * Allow disabling SlabAllocator (CASSANDRA-5935)
   * Make user-defined compaction JMX blocking (CASSANDRA-4952)
+  * Fix streaming does not transfer wrapped range (CASSANDRA-5948)
  
  
 -1.2.9
 +2.0.0
 + * Fix thrift validation when inserting into CQL3 tables (CASSANDRA-5138)
 + * Fix periodic memtable flushing behavior with clean memtables (CASSANDRA-5931)
 + * Fix dateOf() function for pre-2.0 timestamp columns (CASSANDRA-5928)
 + * Fix SSTable unintentionally loads BF when opened for batch (CASSANDRA-5938)
 + * Add stream session progress to JMX (CASSANDRA-4757)
 + * Fix NPE during CAS operation (CASSANDRA-5925)
 +Merged from 1.2:
   * Fix getBloomFilterDiskSpaceUsed for AlwaysPresentFilter (CASSANDRA-5900)
 - * migrate 1.1 schema_columnfamilies.key_alias column to key_aliases
 -   (CASSANDRA-5800)
 - * add --migrate option to sstableupgrade and sstablescrub (CASSANDRA-5831)
 + * Don't announce schema version until we've loaded the changes locally
 +   (CASSANDRA-5904)
 + * Fix to support off heap bloom filters size greater than 2 GB (CASSANDRA-5903)
 + * Properly handle parsing huge map and set literals (CASSANDRA-5893)
 +
 +
 +2.0.0-rc2
 + * enable vnodes by default (CASSANDRA-5869)
 + * fix CAS contention timeout (CASSANDRA-5830)
 + * fix HsHa to respect max frame size (CASSANDRA-4573)
 + * Fix (some) 2i on composite components omissions (CASSANDRA-5851)
 + * cqlsh: add DESCRIBE FULL SCHEMA variant (CASSANDRA-5880)
 +Merged from 1.2:
 + * Correctly validate sparse composite cells in scrub (CASSANDRA-5855)
 + * Add KeyCacheHitRate metric to CF metrics (CASSANDRA-5868)
 + * cqlsh: add support for multiline comments (CASSANDRA-5798)
 + * Handle CQL3 SELECT duplicate IN restrictions on clustering columns
 +   (CASSANDRA-5856)
 +
 +
 +2.0.0-rc1
 + * improve DecimalSerializer performance (CASSANDRA-5837)
 + * fix potential spurious wakeup in AsyncOneResponse (CASSANDRA-5690)
 + * fix schema-related trigger issues (CASSANDRA-5774)
 + * Better validation when accessing CQL3 table from thrift (CASSANDRA-5138)
 + * Fix assertion error during repair (CASSANDRA-5801)
 + * Fix range tombstone bug (CASSANDRA-5805)
 + * DC-local CAS (CASSANDRA-5797)
 + * Add a native_protocol_version column to the system.local table (CASSANDRA-5819)
 + * Use index_interval from cassandra.yaml when upgraded (CASSANDRA-5822)
 + * Fix buffer underflow on socket close (CASSANDRA-5792)
 +Merged from 1.2:
   * fix bulk-loading compressed sstables (CASSANDRA-5820)
   * (Hadoop) fix quoting in CqlPagingRecordReader and CqlRecordWriter 
     (CASSANDRA-5824)
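
For context on the CASSANDRA-5948 entry above ("Fix streaming does not transfer
wrapped range"): addTransferRanges() in StreamSession now calls Range.normalize()
on the requested ranges before computing sstable sections, because a range whose
start token is greater than its end token wraps around the ring and would
otherwise match nothing. The following is a minimal, self-contained sketch of
that idea only; it uses plain longs for tokens and invented class names, not
Cassandra's Range/Token code, and the real Range.normalize() additionally
deduplicates and merges overlapping ranges.

import java.util.ArrayList;
import java.util.List;

public class WrappedRangeSketch
{
    /** (left, right] token range on a ring of longs; left > right means it wraps. */
    static final class TokenRange
    {
        final long left, right;
        TokenRange(long left, long right) { this.left = left; this.right = right; }
        boolean isWrapped() { return left > right; }
        public String toString() { return "(" + left + ", " + right + "]"; }
    }

    /** Split every wrapping range into two plain ranges so that code assuming
        left <= right (such as section lookups) sees the whole interval. */
    static List<TokenRange> normalize(List<TokenRange> ranges)
    {
        List<TokenRange> out = new ArrayList<>();
        for (TokenRange r : ranges)
        {
            if (r.isWrapped())
            {
                out.add(new TokenRange(r.left, Long.MAX_VALUE));  // (left, maxToken]
                out.add(new TokenRange(Long.MIN_VALUE, r.right)); // (minToken, right]
            }
            else
            {
                out.add(r);
            }
        }
        return out;
    }

    public static void main(String[] args)
    {
        // (100, -100] wraps around the ring; un-normalized, code expecting
        // left <= right would see an "empty" interval and stream nothing.
        List<TokenRange> wrapped = new ArrayList<>();
        wrapped.add(new TokenRange(100, -100));
        System.out.println(normalize(wrapped));
    }
}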

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9495eb59/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamSession.java
index c87958b,0000000..5a16d81
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@@ -1,631 -1,0 +1,632 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.streaming;
 +
 +import java.io.IOException;
 +import java.net.InetAddress;
 +import java.util.*;
 +import java.util.concurrent.Future;
 +
 +import com.google.common.collect.Lists;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.Keyspace;
 +import org.apache.cassandra.db.RowPosition;
 +import org.apache.cassandra.dht.AbstractBounds;
 +import org.apache.cassandra.dht.Range;
 +import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.gms.*;
 +import org.apache.cassandra.io.sstable.Component;
 +import org.apache.cassandra.io.sstable.Descriptor;
 +import org.apache.cassandra.io.sstable.SSTableReader;
 +import org.apache.cassandra.metrics.StreamingMetrics;
 +import org.apache.cassandra.streaming.messages.*;
 +import org.apache.cassandra.utils.FBUtilities;
 +import org.apache.cassandra.utils.Pair;
 +
 +/**
 + * Handles the streaming of one or more sections of one or more sstables to and from a specific
 + * remote node.
 + *
 + * Both this node and the remote one will create a similar symmetrical StreamSession. A streaming
 + * session has the following life-cycle:
 + *
 + * 1. Connections Initialization
 + *
 + *   (a) A node (the initiator in the following) creates a new StreamSession, initializes it (init())
 + *       and then starts it (start()). Start will create a {@link ConnectionHandler} that will create
 + *       two connections to the remote node (the follower in the following) with whom to stream and send
 + *       a StreamInit message. The first connection will be the incoming connection for the
 + *       initiator, and the second connection will be the outgoing.
 + *   (b) Upon reception of that StreamInit message, the follower creates its own StreamSession,
 + *       initializes it if it does not yet exist, and attaches the connecting socket to its ConnectionHandler
 + *       according to the StreamInit message's isForOutgoing flag.
 + *   (c) When both the incoming and outgoing connections are established, StreamSession calls
 + *       the StreamSession#onInitializationComplete method to start the streaming prepare phase
 + *       (StreamResultFuture.startStreaming()).
 + *
 + * 2. Streaming preparation phase
 + *
 + *   (a) This phase starts when the initiator's onInitializationComplete() method is called. This method sends a
 + *       PrepareMessage that includes what files/sections this node will stream to the follower
 + *       (stored in a StreamTransferTask, each column family has its own transfer task) and what
 + *       the follower needs to stream back (StreamReceiveTask, same as above). If the initiator has
 + *       nothing to receive from the follower, it goes directly to its Streaming phase. Otherwise,
 + *       it waits for the follower's PrepareMessage.
 + *   (b) Upon reception of the PrepareMessage, the follower records which files/sections it will receive
 + *       and sends back its own PrepareMessage with a summary of the files/sections that will be sent to
 + *       the initiator (prepare()). After having sent that message, the follower goes to its Streaming
 + *       phase.
 + *   (c) When the initiator receives the follower's PrepareMessage, it records which files/sections it will
 + *       receive and then goes to its own Streaming phase.
 + *
 + * 3. Streaming phase
 + *
 + *   (a) The streaming phase is started by each node involved (note that each side of the StreamSession
 + *       may be the sender for some of the files) by calling startStreamingFiles(). This will
 + *       sequentially send a FileMessage for each file of each StreamTransferTask. Each FileMessage
 + *       consists of a FileMessageHeader that indicates which file is coming, followed by the streamed
 + *       content for that file (StreamWriter in FileMessage.serialize()). When a file is fully sent, the
 + *       fileSent() method is called for that file. If all the files for a StreamTransferTask are sent
 + *       (StreamTransferTask.complete()), the task is marked complete (taskCompleted()).
 + *   (b) On the receiving side, an SSTable will be written for the incoming file (StreamReader in
 + *       FileMessage.deserialize()) and once the FileMessage is fully received, the file will be marked as
 + *       complete (received()). When all files for the StreamReceiveTask have been received, the sstables
 + *       are added to the CFS (and 2ndary indexes are built, StreamReceiveTask.complete()) and the task
 + *       is marked complete (taskCompleted()).
 + *   (c) If during the streaming of a particular file an I/O error occurs on the receiving end of a stream
 + *       (FileMessage.deserialize), the node will retry the file (up to DatabaseDescriptor.getMaxStreamingRetries())
 + *       by sending a RetryMessage to the sender. On receiving a RetryMessage, the sender simply issues a new
 + *       FileMessage for that file.
 + *   (d) When all transfer and receive tasks for a session are complete, the session moves to the Completion phase
 + *       (maybeCompleted()).
 + *
 + * 4. Completion phase
 + *
 + *   (a) When a node has finished all its transfer and receive tasks, it enters the completion phase (maybeCompleted()).
 + *       If it has already received a CompleteMessage from the other side (it is in the WAIT_COMPLETE state), the
 + *       session is done and is closed (closeSession()). Otherwise, the node switches to the WAIT_COMPLETE state and
 + *       sends a CompleteMessage to the other side.
 + */
 +public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDetectionEventListener
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(StreamSession.class);
 +
 +    // Executor that establishes the streaming connection. Once we're connected to the other end, the rest of the streaming
 +    // is directly handled by the ConnectionHandler incoming and outgoing threads.
 +    private static final DebuggableThreadPoolExecutor streamExecutor = DebuggableThreadPoolExecutor.createWithFixedPoolSize("StreamConnectionEstablisher",
 +                                                                                                                            FBUtilities.getAvailableProcessors());
 +    public final InetAddress peer;
 +
 +    // should not be null when session is started
 +    private StreamResultFuture streamResult;
 +
 +    // stream requests to send to the peer
 +    private final List<StreamRequest> requests = new ArrayList<>();
 +    // streaming tasks are created and managed per ColumnFamily ID
 +    private final Map<UUID, StreamTransferTask> transfers = new HashMap<>();
 +    // data receivers, filled after receiving prepare message
 +    private final Map<UUID, StreamReceiveTask> receivers = new HashMap<>();
 +    private final StreamingMetrics metrics;
 +
 +    public final ConnectionHandler handler;
 +
 +    private int retries;
 +
 +    public static enum State
 +    {
 +        INITIALIZED,
 +        PREPARING,
 +        STREAMING,
 +        WAIT_COMPLETE,
 +        COMPLETE,
 +        FAILED,
 +    }
 +
 +    private volatile State state = State.INITIALIZED;
 +
 +    /**
 +     * Create new streaming session with the peer.
 +     *
 +     * @param peer Address of streaming peer
 +     */
 +    public StreamSession(InetAddress peer)
 +    {
 +        this.peer = peer;
 +        this.handler = new ConnectionHandler(this);
 +        this.metrics = StreamingMetrics.get(peer);
 +    }
 +
 +    public UUID planId()
 +    {
 +        return streamResult == null ? null : streamResult.planId;
 +    }
 +
 +    public String description()
 +    {
 +        return streamResult == null ? null : streamResult.description;
 +    }
 +
 +    /**
 +     * Bind this session to report to specific {@link StreamResultFuture} and
 +     * perform pre-streaming initialization.
 +     *
 +     * @param streamResult result to report to
 +     */
 +    public void init(StreamResultFuture streamResult)
 +    {
 +        this.streamResult = streamResult;
 +
 +        // register to gossiper/FD to fail on node failure
 +        Gossiper.instance.register(this);
 +        FailureDetector.instance.registerFailureDetectionEventListener(this);
 +    }
 +
 +    public void start()
 +    {
 +        if (requests.isEmpty() && transfers.isEmpty())
 +        {
 +            logger.info("[Stream #{}] Session does not have any tasks.", 
planId());
 +            closeSession(State.COMPLETE);
 +            return;
 +        }
 +
 +        streamExecutor.execute(new Runnable()
 +        {
 +            public void run()
 +            {
 +                try
 +                {
 +                    handler.initiate();
 +                    onInitializationComplete();
 +                }
 +                catch (IOException e)
 +                {
 +                    onError(e);
 +                }
 +            }
 +        });
 +    }
 +
 +    /**
 +     * Request a data fetch task for this session.
 +     *
 +     * @param keyspace Requesting keyspace
 +     * @param ranges Ranges to retrieve data for
 +     * @param columnFamilies ColumnFamily names. Can be empty if requesting all CFs under the keyspace.
 +     */
 +    public void addStreamRequest(String keyspace, Collection<Range<Token>> ranges, Collection<String> columnFamilies)
 +    {
 +        requests.add(new StreamRequest(keyspace, ranges, columnFamilies));
 +    }
 +
 +    /**
 +     * Set up transfer for specific keyspace/ranges/CFs
 +     *
 +     * @param keyspace Transfer keyspace
 +     * @param ranges Transfer ranges
 +     * @param columnFamilies Transfer ColumnFamilies
 +     */
 +    public void addTransferRanges(String keyspace, Collection<Range<Token>> ranges, Collection<String> columnFamilies, boolean flushTables)
 +    {
 +        Collection<ColumnFamilyStore> stores = new HashSet<>();
 +        // if column families are not specified, we add all CFs under the keyspace
 +        if (columnFamilies.isEmpty())
 +        {
 +            stores.addAll(Keyspace.open(keyspace).getColumnFamilyStores());
 +        }
 +        else
 +        {
 +            for (String cf : columnFamilies)
 +                stores.add(Keyspace.open(keyspace).getColumnFamilyStore(cf));
 +        }
 +
 +        if (flushTables)
 +            flushSSTables(stores);
 +
++        List<Range<Token>> normalizedRanges = Range.normalize(ranges);
 +        List<SSTableReader> sstables = Lists.newLinkedList();
 +        for (ColumnFamilyStore cfStore : stores)
 +        {
 +            List<AbstractBounds<RowPosition>> rowBoundsList = Lists.newLinkedList();
-             for (Range<Token> range : ranges)
++            for (Range<Token> range : normalizedRanges)
 +                rowBoundsList.add(range.toRowBounds());
 +            ColumnFamilyStore.ViewFragment view = cfStore.markReferenced(rowBoundsList);
 +            sstables.addAll(view.sstables);
 +        }
-         addTransferFiles(ranges, sstables);
++        addTransferFiles(normalizedRanges, sstables);
 +    }
 +
 +    /**
 +     * Set up transfer of the specific SSTables.
 +     * {@code sstables} must be marked as referenced so that they do not get deleted until the transfer completes.
 +     *
 +     * @param ranges Transfer ranges
 +     * @param sstables Transfer files
 +     */
 +    public void addTransferFiles(Collection<Range<Token>> ranges, Collection<SSTableReader> sstables)
 +    {
 +        List<SSTableStreamingSections> sstableDetails = new ArrayList<>(sstables.size());
 +        for (SSTableReader sstable : sstables)
 +            sstableDetails.add(new SSTableStreamingSections(sstable, sstable.getPositionsForRanges(ranges), sstable.estimatedKeysForRanges(ranges)));
 +
 +        addTransferFiles(sstableDetails);
 +    }
 +
 +    public void addTransferFiles(Collection<SSTableStreamingSections> sstableDetails)
 +    {
 +        for (SSTableStreamingSections details : sstableDetails)
 +        {
 +            if (details.sections.isEmpty())
 +            {
 +                // A reference was acquired on the sstable and we won't stream it
 +                details.sstable.releaseReference();
 +                continue;
 +            }
 +
 +            UUID cfId = details.sstable.metadata.cfId;
 +            StreamTransferTask task = transfers.get(cfId);
 +            if (task == null)
 +            {
 +                task = new StreamTransferTask(this, cfId);
 +                transfers.put(cfId, task);
 +            }
 +            task.addTransferFile(details.sstable, details.estimatedKeys, details.sections);
 +        }
 +    }
 +
 +    public static class SSTableStreamingSections
 +    {
 +        public final SSTableReader sstable;
 +        public final List<Pair<Long, Long>> sections;
 +        public final long estimatedKeys;
 +
 +        public SSTableStreamingSections(SSTableReader sstable, List<Pair<Long, Long>> sections, long estimatedKeys)
 +        {
 +            this.sstable = sstable;
 +            this.sections = sections;
 +            this.estimatedKeys = estimatedKeys;
 +        }
 +    }
 +
 +    private void closeSession(State finalState)
 +    {
 +        state(finalState);
 +
 +        // Note that we shouldn't block on this close because this method is called on the handler
 +        // incoming thread (so we would deadlock).
 +        handler.close();
 +
 +        Gossiper.instance.unregister(this);
 +        FailureDetector.instance.unregisterFailureDetectionEventListener(this);
 +        streamResult.handleSessionComplete(this);
 +    }
 +
 +    /**
 +     * Set current state to {@code newState}.
 +     *
 +     * @param newState new state to set
 +     */
 +    public void state(State newState)
 +    {
 +        state = newState;
 +    }
 +
 +    /**
 +     * @return current state
 +     */
 +    public State state()
 +    {
 +        return state;
 +    }
 +
 +    /**
 +     * Return if this session completed successfully.
 +     *
 +     * @return true if session completed successfully.
 +     */
 +    public boolean isSuccess()
 +    {
 +        return state == State.COMPLETE;
 +    }
 +
 +    public void messageReceived(StreamMessage message)
 +    {
 +        switch (message.type)
 +        {
 +            case PREPARE:
 +                PrepareMessage msg = (PrepareMessage) message;
 +                prepare(msg.requests, msg.summaries);
 +                break;
 +
 +            case FILE:
 +                receive((FileMessage) message);
 +                break;
 +
 +            case RECEIVED:
 +                ReceivedMessage received = (ReceivedMessage) message;
 +                received(received.cfId, received.sequenceNumber);
 +                break;
 +
 +            case RETRY:
 +                RetryMessage retry = (RetryMessage) message;
 +                retry(retry.cfId, retry.sequenceNumber);
 +                break;
 +
 +            case COMPLETE:
 +                complete();
 +                break;
 +
 +            case SESSION_FAILED:
 +                sessionFailed();
 +                break;
 +        }
 +    }
 +
 +    /**
 +     * Call back when connection initialization is complete to start the prepare phase.
 +     */
 +    public void onInitializationComplete()
 +    {
 +        // send prepare message
 +        state(State.PREPARING);
 +        PrepareMessage prepare = new PrepareMessage();
 +        prepare.requests.addAll(requests);
 +        for (StreamTransferTask task : transfers.values())
 +            prepare.summaries.add(task.getSummary());
 +        handler.sendMessage(prepare);
 +
 +        // if we don't need to prepare for receiving stream, start sending files immediately
 +        if (requests.isEmpty())
 +            startStreamingFiles();
 +    }
 +
 +    /**
 +     * Call back for handling exception during streaming.
 +     *
 +     * @param e thrown exception
 +     */
 +    public void onError(Throwable e)
 +    {
 +        logger.error("[Stream #" + planId() + "] Streaming error occurred", 
e);
 +        // send session failure message
 +        if (handler.isOutgoingConnected())
 +            handler.sendMessage(new SessionFailedMessage());
 +        // fail session
 +        closeSession(State.FAILED);
 +    }
 +
 +    /**
 +     * Prepare this session for sending/receiving files.
 +     */
 +    public void prepare(Collection<StreamRequest> requests, Collection<StreamSummary> summaries)
 +    {
 +        // prepare tasks
 +        state(State.PREPARING);
 +        for (StreamRequest request : requests)
 +            addTransferRanges(request.keyspace, request.ranges, request.columnFamilies, true); // always flush on stream request
 +        for (StreamSummary summary : summaries)
 +            prepareReceiving(summary);
 +
 +        // send back prepare message if prepare message contains stream request
 +        if (!requests.isEmpty())
 +        {
 +            PrepareMessage prepare = new PrepareMessage();
 +            for (StreamTransferTask task : transfers.values())
 +                prepare.summaries.add(task.getSummary());
 +            handler.sendMessage(prepare);
 +        }
 +
 +        // if there are files to stream
 +        if (!maybeCompleted())
 +            startStreamingFiles();
 +    }
 +
 +    /**
 +     * Call back after sending FileMessageHeader.
 +     *
 +     * @param header sent header
 +     */
 +    public void fileSent(FileMessageHeader header)
 +    {
 +        long headerSize = header.size();
 +        StreamingMetrics.totalOutgoingBytes.inc(headerSize);
 +        metrics.outgoingBytes.inc(headerSize);
 +    }
 +
 +    /**
 +     * Call back after receiving FileMessageHeader.
 +     *
 +     * @param message received file
 +     */
 +    public void receive(FileMessage message)
 +    {
 +        long headerSize = message.header.size();
 +        StreamingMetrics.totalIncomingBytes.inc(headerSize);
 +        metrics.incomingBytes.inc(headerSize);
 +        // send back file received message
 +        handler.sendMessage(new ReceivedMessage(message.header.cfId, message.header.sequenceNumber));
 +        receivers.get(message.header.cfId).received(message.sstable);
 +    }
 +
 +    public void progress(Descriptor desc, ProgressInfo.Direction direction, long bytes, long total)
 +    {
 +        ProgressInfo progress = new ProgressInfo(peer, desc.filenameFor(Component.DATA), direction, bytes, total);
 +        streamResult.handleProgress(progress);
 +    }
 +
 +    public void received(UUID cfId, int sequenceNumber)
 +    {
 +        transfers.get(cfId).complete(sequenceNumber);
 +    }
 +
 +    /**
 +     * Call back on receiving {@code StreamMessage.Type.RETRY} message.
 +     *
 +     * @param cfId ColumnFamily ID
 +     * @param sequenceNumber Sequence number to indicate which file to stream again
 +     */
 +    public void retry(UUID cfId, int sequenceNumber)
 +    {
 +        FileMessage message = transfers.get(cfId).createMessageForRetry(sequenceNumber);
 +        handler.sendMessage(message);
 +    }
 +
 +    /**
 +     * Check if session is completed on receiving {@code StreamMessage.Type.COMPLETE} message.
 +     */
 +    public synchronized void complete()
 +    {
 +        if (state == State.WAIT_COMPLETE)
 +        {
 +            closeSession(State.COMPLETE);
 +        }
 +        else
 +        {
 +            state(State.WAIT_COMPLETE);
 +        }
 +    }
 +
 +    /**
 +     * Call back on receiving {@code StreamMessage.Type.SESSION_FAILED} message.
 +     */
 +    public synchronized void sessionFailed()
 +    {
 +        closeSession(State.FAILED);
 +    }
 +
 +    public void doRetry(FileMessageHeader header, Throwable e)
 +    {
 +        logger.warn("[Stream #" + planId() + "] Retrying for following 
error", e);
 +        // retry
 +        retries++;
 +        if (retries > DatabaseDescriptor.getMaxStreamingRetries())
 +            onError(new IOException("Too many retries for " + header, e));
 +        else
 +            handler.sendMessage(new RetryMessage(header.cfId, header.sequenceNumber));
 +    }
 +
 +    /**
 +     * @return Current snapshot of this session info.
 +     */
 +    public SessionInfo getSessionInfo()
 +    {
 +        List<StreamSummary> receivingSummaries = Lists.newArrayList();
 +        for (StreamTask receiver : receivers.values())
 +            receivingSummaries.add(receiver.getSummary());
 +        List<StreamSummary> transferSummaries = Lists.newArrayList();
 +        for (StreamTask transfer : transfers.values())
 +            transferSummaries.add(transfer.getSummary());
 +        return new SessionInfo(peer, receivingSummaries, transferSummaries, state);
 +    }
 +
 +    public synchronized void taskCompleted(StreamReceiveTask completedTask)
 +    {
 +        receivers.remove(completedTask.cfId);
 +        maybeCompleted();
 +    }
 +
 +    public synchronized void taskCompleted(StreamTransferTask completedTask)
 +    {
 +        transfers.remove(completedTask.cfId);
 +        maybeCompleted();
 +    }
 +
 +    public void onJoin(InetAddress endpoint, EndpointState epState) {}
 +    public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value) {}
 +    public void onAlive(InetAddress endpoint, EndpointState state) {}
 +    public void onDead(InetAddress endpoint, EndpointState state) {}
 +
 +    public void onRemove(InetAddress endpoint)
 +    {
 +        convict(endpoint, Double.MAX_VALUE);
 +    }
 +
 +    public void onRestart(InetAddress endpoint, EndpointState epState)
 +    {
 +        convict(endpoint, Double.MAX_VALUE);
 +    }
 +
 +    public void convict(InetAddress endpoint, double phi)
 +    {
 +        if (!endpoint.equals(peer))
 +            return;
 +
 +        // We want a higher confidence in the failure detection than usual because wrongly failing a stream has a high cost.
 +        if (phi < 2 * DatabaseDescriptor.getPhiConvictThreshold())
 +            return;
 +
 +        closeSession(State.FAILED);
 +    }
 +
 +    private boolean maybeCompleted()
 +    {
 +        boolean completed = receivers.isEmpty() && transfers.isEmpty();
 +        if (completed)
 +        {
 +            if (state == State.WAIT_COMPLETE)
 +            {
 +                closeSession(State.COMPLETE);
 +            }
 +            else
 +            {
 +                // notify peer that this session is completed
 +                handler.sendMessage(new CompleteMessage());
 +                state(State.WAIT_COMPLETE);
 +            }
 +        }
 +        return completed;
 +    }
 +
 +    /**
 +     * Flushes the given column family stores and blocks until all flushes complete.
 +     */
 +    private void flushSSTables(Iterable<ColumnFamilyStore> stores)
 +    {
 +        List<Future<?>> flushes = new ArrayList<>();
 +        for (ColumnFamilyStore cfs : stores)
 +            flushes.add(cfs.forceFlush());
 +        FBUtilities.waitOnFutures(flushes);
 +    }
 +
 +    private void prepareReceiving(StreamSummary summary)
 +    {
 +        if (summary.files > 0)
 +            receivers.put(summary.cfId, new StreamReceiveTask(this, summary.cfId, summary.files, summary.totalSize));
 +    }
 +
 +    private void startStreamingFiles()
 +    {
 +        streamResult.handleSessionPrepared(this);
 +
 +        state(State.STREAMING);
 +        for (StreamTransferTask task : transfers.values())
 +        {
 +            Collection<FileMessage> messages = task.getFileMessages();
 +            if (messages.size() > 0)
 +                handler.sendMessages(messages);
 +            else
 +                taskCompleted(task); // there is no file to send
 +        }
 +    }
 +}
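
The completion handshake implemented by complete() and maybeCompleted() above is
easy to misread, so here is a condensed sketch of just that state logic. It is
illustrative only: the State constants come from the diff, but the class and
method names are invented, and the real code also unregisters from gossip and
reports to the StreamResultFuture when it closes.

public class CompletionHandshakeSketch
{
    enum State { INITIALIZED, PREPARING, STREAMING, WAIT_COMPLETE, COMPLETE, FAILED }

    private State state = State.STREAMING;
    private int pendingTasks = 2; // outstanding transfer + receive tasks

    /** Peer's CompleteMessage arrived (cf. complete() in the diff). */
    public synchronized void onPeerComplete()
    {
        if (state == State.WAIT_COMPLETE)
            state = State.COMPLETE;      // our own tasks already finished: close
        else
            state = State.WAIT_COMPLETE; // remember it; our tasks are still running
    }

    /** One of our tasks finished (cf. taskCompleted() -> maybeCompleted()). */
    public synchronized void onTaskCompleted()
    {
        if (--pendingTasks > 0)
            return;                      // still transferring or receiving
        if (state == State.WAIT_COMPLETE)
            state = State.COMPLETE;      // peer already said it was done: close
        else
        {
            sendCompleteToPeer();        // tell the peer we are done...
            state = State.WAIT_COMPLETE; // ...and wait for its CompleteMessage
        }
    }

    private void sendCompleteToPeer() { /* handler.sendMessage(new CompleteMessage()) in the real class */ }
}

In short, the session closes only once both conditions hold: all local tasks
are complete and the peer's CompleteMessage has been received, whichever
happens second.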

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9495eb59/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
index 2c389da,82c6b1c..0fceb17
--- a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
@@@ -128,22 -71,34 +128,34 @@@ public class StreamingTransferTest exte
      * Create and transfer a single sstable, and return the keys that should have been transferred.
      * The Mutator must create the given column, but it may also create any other columns it pleases.
       */
-     private List<String> createAndTransfer(ColumnFamilyStore cfs, Mutator mutator) throws Exception
 -    private List<String> createAndTransfer(Table table, ColumnFamilyStore cfs, Mutator mutator, boolean transferSSTables) throws Exception
++    private List<String> createAndTransfer(ColumnFamilyStore cfs, Mutator mutator, boolean transferSSTables) throws Exception
      {
          // write a temporary SSTable, and unregister it
 -        logger.debug("Mutating " + cfs.columnFamily);
 +        logger.debug("Mutating " + cfs.name);
          long timestamp = 1234;
          for (int i = 1; i <= 3; i++)
              mutator.mutate("key" + i, "col" + i, timestamp);
          cfs.forceBlockingFlush();
          Util.compactAll(cfs).get();
          assertEquals(1, cfs.getSSTables().size());
-         SSTableReader sstable = cfs.getSSTables().iterator().next();
-         cfs.clearUnsafe();
  
          // transfer the first and last key
 -        logger.debug("Transferring " + cfs.columnFamily);
 +        logger.debug("Transferring " + cfs.name);
-         transfer(sstable);
+         int[] offs;
+         if (transferSSTables)
+         {
+             SSTableReader sstable = cfs.getSSTables().iterator().next();
+             cfs.clearUnsafe();
 -            transferSSTables(table, sstable);
++            transferSSTables(sstable);
+             offs = new int[]{1, 3};
+         }
+         else
+         {
+             long beforeStreaming = System.currentTimeMillis();
 -            transferRanges(table, cfs);
++            transferRanges(cfs);
+             cfs.discardSSTables(beforeStreaming);
+             offs = new int[]{2, 3};
+         }
  
          // confirm that a single SSTable was transferred and registered
          assertEquals(1, cfs.getSSTables().size());
@@@ -172,32 -125,64 +183,76 @@@
          return keys;
      }
  
-     private void transfer(SSTableReader sstable) throws Exception
 -    private void transferRanges(Table table, ColumnFamilyStore cfs) throws Exception
++    private void transferSSTables(SSTableReader sstable) throws Exception
      {
          IPartitioner p = StorageService.getPartitioner();
 -        List<Range<Token>> ranges = new ArrayList<Range<Token>>();
 -        ranges.add(new Range<Token>(p.getToken(ByteBufferUtil.bytes("key1")), 
p.getToken(ByteBufferUtil.bytes("key0"))));
 -        StreamOutSession session = StreamOutSession.create(table.name, LOCAL, 
(IStreamCallback) null);
 -        StreamOut.transferRanges(session, Arrays.asList(cfs), ranges, 
OperationType.BOOTSTRAP);
 -        session.await();
 +        List<Range<Token>> ranges = new ArrayList<>();
 +        ranges.add(new Range<>(p.getMinimumToken(), 
p.getToken(ByteBufferUtil.bytes("key1"))));
 +        ranges.add(new Range<>(p.getToken(ByteBufferUtil.bytes("key2")), 
p.getMinimumToken()));
 +        transfer(sstable, ranges);
      }
  
 -    private void transferSSTables(Table table, SSTableReader sstable) throws Exception
++    private void transferRanges(ColumnFamilyStore cfs) throws Exception
+     {
+         IPartitioner p = StorageService.getPartitioner();
 -        List<Range<Token>> ranges = new ArrayList<Range<Token>>();
 -        ranges.add(new Range<Token>(p.getMinimumToken(), p.getToken(ByteBufferUtil.bytes("key1"))));
 -        ranges.add(new Range<Token>(p.getToken(ByteBufferUtil.bytes("key2")), p.getMinimumToken()));
 -        StreamOutSession session = StreamOutSession.create(table.name, LOCAL, (IStreamCallback) null);
 -        StreamOut.transferSSTables(session, Arrays.asList(sstable), ranges, OperationType.BOOTSTRAP);
 -        session.await();
++        List<Range<Token>> ranges = new ArrayList<>();
++        // wrapped range
++        ranges.add(new Range<Token>(p.getToken(ByteBufferUtil.bytes("key1")), p.getToken(ByteBufferUtil.bytes("key0"))));
++        new StreamPlan("StreamingTransferTest").transferRanges(LOCAL, cfs.keyspace.getName(), ranges, cfs.getColumnFamilyName()).execute().get();
++    }
++
 +    private void transfer(SSTableReader sstable, List<Range<Token>> ranges) throws Exception
 +    {
 +        new StreamPlan("StreamingTransferTest").transferFiles(LOCAL, 
makeStreamingDetails(ranges, Arrays.asList(sstable))).execute().get();
 +    }
 +
 +    private Collection<StreamSession.SSTableStreamingSections> makeStreamingDetails(List<Range<Token>> ranges, Collection<SSTableReader> sstables)
 +    {
 +        ArrayList<StreamSession.SSTableStreamingSections> details = new ArrayList<>();
 +        for (SSTableReader sstable : sstables)
 +        {
 +            details.add(new StreamSession.SSTableStreamingSections(sstable,
 +                                                                   sstable.getPositionsForRanges(ranges),
 +                                                                   sstable.estimatedKeysForRanges(ranges)));
 +        }
 +        return details;
      }
  
+     private void doTransferTable(boolean transferSSTables) throws Exception
+     {
 -        final Table table = Table.open("Keyspace1");
 -        final ColumnFamilyStore cfs = table.getColumnFamilyStore("Indexed1");
++        final Keyspace keyspace = Keyspace.open("Keyspace1");
++        final ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Indexed1");
+ 
 -        List<String> keys = createAndTransfer(table, cfs, new Mutator()
++        List<String> keys = createAndTransfer(cfs, new Mutator()
+         {
+             public void mutate(String key, String col, long timestamp) throws Exception
+             {
+                 long val = key.hashCode();
 -                RowMutation rm = new RowMutation("Keyspace1", 
ByteBufferUtil.bytes(key));
 -                ColumnFamily cf = ColumnFamily.create(table.name, 
cfs.columnFamily);
++                ColumnFamily cf = 
TreeMapBackedSortedColumns.factory.create(keyspace.getName(), cfs.name);
+                 cf.addColumn(column(col, "v", timestamp));
+                 cf.addColumn(new Column(ByteBufferUtil.bytes("birthdate"), 
ByteBufferUtil.bytes(val), timestamp));
 -                rm.add(cf);
++                RowMutation rm = new RowMutation("Keyspace1", 
ByteBufferUtil.bytes(key), cf);
+                 logger.debug("Applying row to transfer " + rm);
+                 rm.apply();
+             }
+         }, transferSSTables);
+ 
+         // confirm that the secondary index was recovered
+         for (String key : keys)
+         {
+             long val = key.hashCode();
 -            IPartitioner p = StorageService.getPartitioner();
+             IndexExpression expr = new IndexExpression(ByteBufferUtil.bytes("birthdate"),
 -                    IndexOperator.EQ,
 -                    ByteBufferUtil.bytes(val));
++                                                        IndexOperator.EQ,
++                                                        ByteBufferUtil.bytes(val));
+             List<IndexExpression> clause = Arrays.asList(expr);
+             IDiskAtomFilter filter = new IdentityQueryFilter();
+             Range<RowPosition> range = Util.range("", "");
 -            List<Row> rows = cfs.search(clause, range, 100, filter);
++            List<Row> rows = cfs.search(range, clause, filter, 100);
+             assertEquals(1, rows.size());
+             assert rows.get(0).key.key.equals(ByteBufferUtil.bytes(key));
+         }
+     }
+ 
      /**
       * Test to make sure RangeTombstones at column index boundary are transferred correctly.
       */
@@@ -223,7 -208,7 +278,7 @@@
  
          SSTableReader sstable = cfs.getSSTables().iterator().next();
          cfs.clearUnsafe();
-         transfer(sstable);
 -        transferSSTables(table, sstable);
++        transferSSTables(sstable);
  
          // confirm that a single SSTable was transferred and registered
          assertEquals(1, cfs.getSSTables().size());
@@@ -300,12 -278,12 +331,12 @@@
                  entries.put(key, cf);
                  cleanedEntries.put(key, cfCleaned);
                  cfs.addSSTable(SSTableUtils.prepare()
 -                        .ks(table.name)
 -                        .cf(cfs.columnFamily)
 -                        .generation(0)
 -                        .write(entries));
 +                    .ks(keyspace.getName())
 +                    .cf(cfs.name)
 +                    .generation(0)
 +                    .write(entries));
              }
-         });
+         }, true);
  
          // filter pre-cleaned entries locally, and ensure that the end result is equal
          cleanedEntries.keySet().retainAll(keys);
@@@ -319,7 -297,7 +350,7 @@@
  
          // Retransfer the file, making sure it is now idempotent (see CASSANDRA-3481)
          cfs.clearUnsafe();
-         transfer(streamed);
 -        transferSSTables(table, streamed);
++        transferSSTables(streamed);
          SSTableReader restreamed = cfs.getSSTables().iterator().next();
          SSTableUtils.assertContentEquals(streamed, restreamed);
      }
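
For readers comparing the two APIs exercised by this test: the 1.2 side drove
transfers through StreamOutSession/StreamOut, while the 2.0 side builds a
StreamPlan and waits on the future returned by execute(). A minimal helper in
the shape the test uses might look like the following; it compiles only against
the 2.0 tree this commit targets, and the helper name is invented.

import java.net.InetAddress;
import java.util.Collection;

import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.streaming.StreamPlan;

final class StreamPlanUsage
{
    /** Stream the given ranges of one column family to a peer and block until
        the plan finishes, mirroring transferRanges() in the test above. */
    static void streamRangesTo(InetAddress peer, String keyspace,
                               Collection<Range<Token>> ranges, String columnFamily) throws Exception
    {
        new StreamPlan("StreamingTransferTest")
            .transferRanges(peer, keyspace, ranges, columnFamily)
            .execute() // returns a StreamResultFuture
            .get();    // blocks until the session completes or fails
    }
}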
