Merge branch 'cassandra-1.2' into cassandra-2.0 Conflicts: src/java/org/apache/cassandra/streaming/StreamOut.java test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9495eb59 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9495eb59 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9495eb59 Branch: refs/heads/trunk Commit: 9495eb59c403cd97ba41e68e68903abb4f8ff113 Parents: 8d4b51d 18be7fa Author: Yuki Morishita <yu...@apache.org> Authored: Thu Aug 29 10:18:32 2013 -0500 Committer: Yuki Morishita <yu...@apache.org> Committed: Thu Aug 29 10:18:32 2013 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/streaming/StreamSession.java | 5 +- .../streaming/StreamingTransferTest.java | 119 ++++++++++++------- 3 files changed, 79 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/9495eb59/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index 5621436,5cb1522..8a1004f --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,61 -1,16 +1,62 @@@ -1.2.10 - * Add snitch, schema version, cluster, partitioner to JMX (CASSANDRA-5881) +2.0.1 + * Notify indexer of columns shadowed by range tombstones (CASSANDRA-5614) + * Log Merkle tree stats (CASSANDRA-2698) + * Switch from crc32 to adler32 for compressed sstable checksums (CASSANDRA-5862) + * Improve offheap memcpy performance (CASSANDRA-5884) + * Use a range aware scanner for cleanup (CASSANDRA-2524) + * Cleanup doesn't need to inspect sstables that contain only local data + (CASSANDRA-5722) + * Add ability for CQL3 to list partition keys (CASSANDRA-4536) + * Improve native protocol serialization (CASSANDRA-5664) +Merged from 1.2: * Fix CqlRecordWriter with composite keys (CASSANDRA-5949) + * Add snitch, schema version, cluster, partitioner to JMX (CASSANDRA-5881) * Allow disabling SlabAllocator (CASSANDRA-5935) * Make user-defined compaction JMX 
blocking (CASSANDRA-4952) + * Fix streaming does not transfer wrapped range (CASSANDRA-5948) -1.2.9 +2.0.0 + * Fix thrift validation when inserting into CQL3 tables (CASSANDRA-5138) + * Fix periodic memtable flushing behavior with clean memtables (CASSANDRA-5931) + * Fix dateOf() function for pre-2.0 timestamp columns (CASSANDRA-5928) + * Fix SSTable unintentionally loads BF when opened for batch (CASSANDRA-5938) + * Add stream session progress to JMX (CASSANDRA-4757) + * Fix NPE during CAS operation (CASSANDRA-5925) +Merged from 1.2: * Fix getBloomFilterDiskSpaceUsed for AlwaysPresentFilter (CASSANDRA-5900) - * migrate 1.1 schema_columnfamilies.key_alias column to key_aliases - (CASSANDRA-5800) - * add --migrate option to sstableupgrade and sstablescrub (CASSANDRA-5831) + * Don't announce schema version until we've loaded the changes locally + (CASSANDRA-5904) + * Fix to support off heap bloom filters size greater than 2 GB (CASSANDRA-5903) + * Properly handle parsing huge map and set literals (CASSANDRA-5893) + + +2.0.0-rc2 + * enable vnodes by default (CASSANDRA-5869) + * fix CAS contention timeout (CASSANDRA-5830) + * fix HsHa to respect max frame size (CASSANDRA-4573) + * Fix (some) 2i on composite components omissions (CASSANDRA-5851) + * cqlsh: add DESCRIBE FULL SCHEMA variant (CASSANDRA-5880) +Merged from 1.2: + * Correctly validate sparse composite cells in scrub (CASSANDRA-5855) + * Add KeyCacheHitRate metric to CF metrics (CASSANDRA-5868) + * cqlsh: add support for multiline comments (CASSANDRA-5798) + * Handle CQL3 SELECT duplicate IN restrictions on clustering columns + (CASSANDRA-5856) + + +2.0.0-rc1 + * improve DecimalSerializer performance (CASSANDRA-5837) + * fix potential spurious wakeup in AsyncOneResponse (CASSANDRA-5690) + * fix schema-related trigger issues (CASSANDRA-5774) + * Better validation when accessing CQL3 table from thrift (CASSANDRA-5138) + * Fix assertion error during repair (CASSANDRA-5801) + * Fix range tombstone bug 
(CASSANDRA-5805) + * DC-local CAS (CASSANDRA-5797) + * Add a native_protocol_version column to the system.local table (CASSANRDA-5819) + * Use index_interval from cassandra.yaml when upgraded (CASSANDRA-5822) + * Fix buffer underflow on socket close (CASSANDRA-5792) +Merged from 1.2: * fix bulk-loading compressed sstables (CASSANDRA-5820) * (Hadoop) fix quoting in CqlPagingRecordReader and CqlRecordWriter (CASSANDRA-5824) http://git-wip-us.apache.org/repos/asf/cassandra/blob/9495eb59/src/java/org/apache/cassandra/streaming/StreamSession.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/streaming/StreamSession.java index c87958b,0000000..5a16d81 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/streaming/StreamSession.java +++ b/src/java/org/apache/cassandra/streaming/StreamSession.java @@@ -1,631 -1,0 +1,632 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.streaming; + +import java.io.IOException; +import java.net.InetAddress; +import java.util.*; +import java.util.concurrent.Future; + +import com.google.common.collect.Lists; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.RowPosition; +import org.apache.cassandra.dht.AbstractBounds; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.gms.*; +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.SSTableReader; +import org.apache.cassandra.metrics.StreamingMetrics; +import org.apache.cassandra.streaming.messages.*; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.Pair; + +/** + * Handles the streaming of one or more sections of one or more sstables to and from a specific + * remote node. + * + * Both this node and the remote one will create a similar symmetrical StreamSession. A streaming + * session has the following life-cycle: + * + * 1. Connections Initialization + * + * (a) A node (the initiator in the following) creates a new StreamSession, initializes it (init()) + * and then starts it (start()). Start will create a {@link ConnectionHandler} that will create + * two connections to the remote node (the follower in the following) with whom to stream and send + * a StreamInit message. The first connection will be the incoming connection for the + * initiator, and the second connection will be the outgoing.
+ * (b) Upon reception of that StreamInit message, the follower creates its own StreamSession, + * initializes it if it still does not exist, and attaches the connecting socket to its ConnectionHandler + * according to StreamInit message's isForOutgoing flag. + * (c) When both the incoming and outgoing connections are established, StreamSession calls + * StreamSession#onInitializationComplete method to start the streaming prepare phase + * (StreamResultFuture.startStreaming()). + * + * 2. Streaming preparation phase + * + * (a) This phase is started when the initiator onInitializationComplete() method is called. This method sends a + * PrepareMessage that includes what files/sections this node will stream to the follower + * (stored in a StreamTransferTask, each column family has its own transfer task) and what + * the follower needs to stream back (StreamReceiveTask, same as above). If the initiator has + * nothing to receive from the follower, it goes directly to its Streaming phase. Otherwise, + * it waits for the follower PrepareMessage. + * (b) Upon reception of the PrepareMessage, the follower records which files/sections it will receive + * and sends back its own PrepareMessage with a summary of the files/sections that will be sent to + * the initiator (prepare()). After having sent that message, the follower goes to its Streaming + * phase. + * (c) When the initiator receives the follower PrepareMessage, it records which files/sections it will + * receive and then goes to its own Streaming phase. + * + * 3. Streaming phase + * + * (a) The streaming phase is started by each node (the sender in the follower, but note that each side + * of the StreamSession may be sender for some of the files) involved by calling startStreamingFiles(). + * This will sequentially send a FileMessage for each file of each StreamTransferTask.
Each FileMessage + * consists of a FileMessageHeader that indicates which file is coming and then starts streaming the + * content for that file (StreamWriter in FileMessage.serialize()). When a file is fully sent, the + * fileSent() method is called for that file. If all the files for a StreamTransferTask are sent + * (StreamTransferTask.complete()), the task is marked complete (taskCompleted()). + * (b) On the receiving side, an SSTable will be written for the incoming file (StreamReader in + * FileMessage.deserialize()) and once the FileMessage is fully received, the file will be marked as + * complete (received()). When all files for the StreamReceiveTask have been received, the sstables + * are added to the CFS (and 2ndary indexes are built, StreamReceiveTask.complete()) and the task + * is marked complete (taskCompleted()). + * (c) If during the streaming of a particular file an I/O error occurs on the receiving end of a stream + * (FileMessage.deserialize), the node will retry the file (up to DatabaseDescriptor.getMaxStreamingRetries()) + * by sending a RetryMessage to the sender. On receiving a RetryMessage, the sender simply issues a new + * FileMessage for that file. + * (d) When all transfer and receive tasks for a session are complete, they move to the Completion phase + * (maybeCompleted()). + * + * 4. Completion phase + * + * (a) When a node has finished all transfer and receive tasks, it enters the completion phase (maybeCompleted()). + * If it had already received a CompleteMessage from the other side (it is in the WAIT_COMPLETE state), that + * session is done and is closed (closeSession()). Otherwise, the node switches to the WAIT_COMPLETE state and + * sends a CompleteMessage to the other side. + */ +public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDetectionEventListener +{ + private static final Logger logger = LoggerFactory.getLogger(StreamSession.class); + + // Executor that establishes the streaming connection.
Once we're connected to the other end, the rest of the streaming + // is directly handled by the ConnectionHandler incoming and outgoing threads. + private static final DebuggableThreadPoolExecutor streamExecutor = DebuggableThreadPoolExecutor.createWithFixedPoolSize("StreamConnectionEstablisher", + FBUtilities.getAvailableProcessors()); + public final InetAddress peer; + + // should not be null when session is started + private StreamResultFuture streamResult; + + // stream requests to send to the peer + private final List<StreamRequest> requests = new ArrayList<>(); + // streaming tasks are created and managed per ColumnFamily ID + private final Map<UUID, StreamTransferTask> transfers = new HashMap<>(); + // data receivers, filled after receiving prepare message + private final Map<UUID, StreamReceiveTask> receivers = new HashMap<>(); + private final StreamingMetrics metrics; + + public final ConnectionHandler handler; + + private int retries; + + public static enum State + { + INITIALIZED, + PREPARING, + STREAMING, + WAIT_COMPLETE, + COMPLETE, + FAILED, + } + + private volatile State state = State.INITIALIZED; + + /** + * Create new streaming session with the peer. + * + * @param peer Address of streaming peer + */ + public StreamSession(InetAddress peer) + { + this.peer = peer; + this.handler = new ConnectionHandler(this); + this.metrics = StreamingMetrics.get(peer); + } + + public UUID planId() + { + return streamResult == null ? null : streamResult.planId; + } + + public String description() + { + return streamResult == null ? null : streamResult.description; + } + + /** + * Bind this session to report to specific {@link StreamResultFuture} and + * perform pre-streaming initialization. 
+ * + * @param streamResult result to report to + */ + public void init(StreamResultFuture streamResult) + { + this.streamResult = streamResult; + + // register to gossiper/FD to fail on node failure + Gossiper.instance.register(this); + FailureDetector.instance.registerFailureDetectionEventListener(this); + } + + public void start() + { + if (requests.isEmpty() && transfers.isEmpty()) + { + logger.info("[Stream #{}] Session does not have any tasks.", planId()); + closeSession(State.COMPLETE); + return; + } + + streamExecutor.execute(new Runnable() + { + public void run() + { + try + { + handler.initiate(); + onInitializationComplete(); + } + catch (IOException e) + { + onError(e); + } + } + }); + } + + /** + * Request data fetch task to this session. + * + * @param keyspace Requesting keyspace + * @param ranges Ranges to retrieve data + * @param columnFamilies ColumnFamily names. Can be empty if requesting all CF under the keyspace. + */ + public void addStreamRequest(String keyspace, Collection<Range<Token>> ranges, Collection<String> columnFamilies) + { + requests.add(new StreamRequest(keyspace, ranges, columnFamilies)); + } + + /** + * Set up transfer for specific keyspace/ranges/CFs + * + * @param keyspace Transfer keyspace + * @param ranges Transfer ranges + * @param columnFamilies Transfer ColumnFamilies + */ + public void addTransferRanges(String keyspace, Collection<Range<Token>> ranges, Collection<String> columnFamilies, boolean flushTables) + { + Collection<ColumnFamilyStore> stores = new HashSet<>(); + // if columnfamilies are not specified, we add all cf under the keyspace + if (columnFamilies.isEmpty()) + { + stores.addAll(Keyspace.open(keyspace).getColumnFamilyStores()); + } + else + { + for (String cf : columnFamilies) + stores.add(Keyspace.open(keyspace).getColumnFamilyStore(cf)); + } + + if (flushTables) + flushSSTables(stores); + ++ List<Range<Token>> normalizedRanges = Range.normalize(ranges); + List<SSTableReader> sstables = 
Lists.newLinkedList(); + for (ColumnFamilyStore cfStore : stores) + { + List<AbstractBounds<RowPosition>> rowBoundsList = Lists.newLinkedList(); - for (Range<Token> range : ranges) ++ for (Range<Token> range : normalizedRanges) + rowBoundsList.add(range.toRowBounds()); + ColumnFamilyStore.ViewFragment view = cfStore.markReferenced(rowBoundsList); + sstables.addAll(view.sstables); + } - addTransferFiles(ranges, sstables); ++ addTransferFiles(normalizedRanges, sstables); + } + + /** + * Set up transfer of the specific SSTables. + * {@code sstables} must be marked as referenced so that not get deleted until transfer completes. + * + * @param ranges Transfer ranges + * @param sstables Transfer files + */ + public void addTransferFiles(Collection<Range<Token>> ranges, Collection<SSTableReader> sstables) + { + List<SSTableStreamingSections> sstableDetails = new ArrayList<>(sstables.size()); + for (SSTableReader sstable : sstables) + sstableDetails.add(new SSTableStreamingSections(sstable, sstable.getPositionsForRanges(ranges), sstable.estimatedKeysForRanges(ranges))); + + addTransferFiles(sstableDetails); + } + + public void addTransferFiles(Collection<SSTableStreamingSections> sstableDetails) + { + for (SSTableStreamingSections details : sstableDetails) + { + if (details.sections.isEmpty()) + { + // A reference was acquired on the sstable and we won't stream it + details.sstable.releaseReference(); + continue; + } + + UUID cfId = details.sstable.metadata.cfId; + StreamTransferTask task = transfers.get(cfId); + if (task == null) + { + task = new StreamTransferTask(this, cfId); + transfers.put(cfId, task); + } + task.addTransferFile(details.sstable, details.estimatedKeys, details.sections); + } + } + + public static class SSTableStreamingSections + { + public final SSTableReader sstable; + public final List<Pair<Long, Long>> sections; + public final long estimatedKeys; + + public SSTableStreamingSections(SSTableReader sstable, List<Pair<Long, Long>> sections, long 
estimatedKeys) + { + this.sstable = sstable; + this.sections = sections; + this.estimatedKeys = estimatedKeys; + } + } + + private void closeSession(State finalState) + { + state(finalState); + + // Note that we shouldn't block on this close because this method is called on the handler + // incoming thread (so we would deadlock). + handler.close(); + + Gossiper.instance.unregister(this); + FailureDetector.instance.unregisterFailureDetectionEventListener(this); + streamResult.handleSessionComplete(this); + } + + /** + * Set current state to {@code newState}. + * + * @param newState new state to set + */ + public void state(State newState) + { + state = newState; + } + + /** + * @return current state + */ + public State state() + { + return state; + } + + /** + * Return if this session completed successfully. + * + * @return true if session completed successfully. + */ + public boolean isSuccess() + { + return state == State.COMPLETE; + } + + public void messageReceived(StreamMessage message) + { + switch (message.type) + { + case PREPARE: + PrepareMessage msg = (PrepareMessage) message; + prepare(msg.requests, msg.summaries); + break; + + case FILE: + receive((FileMessage) message); + break; + + case RECEIVED: + ReceivedMessage received = (ReceivedMessage) message; + received(received.cfId, received.sequenceNumber); + break; + + case RETRY: + RetryMessage retry = (RetryMessage) message; + retry(retry.cfId, retry.sequenceNumber); + break; + + case COMPLETE: + complete(); + break; + + case SESSION_FAILED: + sessionFailed(); + break; + } + } + + /** + * Call back when connection initialization is complete to start the prepare phase. 
+ */ + public void onInitializationComplete() + { + // send prepare message + state(State.PREPARING); + PrepareMessage prepare = new PrepareMessage(); + prepare.requests.addAll(requests); + for (StreamTransferTask task : transfers.values()) + prepare.summaries.add(task.getSummary()); + handler.sendMessage(prepare); + + // if we don't need to prepare for receiving stream, start sending files immediately + if (requests.isEmpty()) + startStreamingFiles(); + } + + /** + * Call back for handling exception during streaming. + * + * @param e thrown exception + */ + public void onError(Throwable e) + { + logger.error("[Stream #" + planId() + "] Streaming error occurred", e); + // send session failure message + if (handler.isOutgoingConnected()) + handler.sendMessage(new SessionFailedMessage()); + // fail session + closeSession(State.FAILED); + } + + /** + * Prepare this session for sending/receiving files. + */ + public void prepare(Collection<StreamRequest> requests, Collection<StreamSummary> summaries) + { + // prepare tasks + state(State.PREPARING); + for (StreamRequest request : requests) + addTransferRanges(request.keyspace, request.ranges, request.columnFamilies, true); // always flush on stream request + for (StreamSummary summary : summaries) + prepareReceiving(summary); + + // send back prepare message if prepare message contains stream request + if (!requests.isEmpty()) + { + PrepareMessage prepare = new PrepareMessage(); + for (StreamTransferTask task : transfers.values()) + prepare.summaries.add(task.getSummary()); + handler.sendMessage(prepare); + } + + // if there are files to stream + if (!maybeCompleted()) + startStreamingFiles(); + } + + /** + * Call back after sending FileMessageHeader. 
+ * + * @param header sent header + */ + public void fileSent(FileMessageHeader header) + { + long headerSize = header.size(); + StreamingMetrics.totalOutgoingBytes.inc(headerSize); + metrics.outgoingBytes.inc(headerSize); + } + + /** + * Call back after receiving FileMessageHeader. + * + * @param message received file + */ + public void receive(FileMessage message) + { + long headerSize = message.header.size(); + StreamingMetrics.totalIncomingBytes.inc(headerSize); + metrics.incomingBytes.inc(headerSize); + // send back file received message + handler.sendMessage(new ReceivedMessage(message.header.cfId, message.header.sequenceNumber)); + receivers.get(message.header.cfId).received(message.sstable); + } + + public void progress(Descriptor desc, ProgressInfo.Direction direction, long bytes, long total) + { + ProgressInfo progress = new ProgressInfo(peer, desc.filenameFor(Component.DATA), direction, bytes, total); + streamResult.handleProgress(progress); + } + + public void received(UUID cfId, int sequenceNumber) + { + transfers.get(cfId).complete(sequenceNumber); + } + + /** + * Call back on receiving {@code StreamMessage.Type.RETRY} message. + * + * @param cfId ColumnFamily ID + * @param sequenceNumber Sequence number to indicate which file to stream again + */ + public void retry(UUID cfId, int sequenceNumber) + { + FileMessage message = transfers.get(cfId).createMessageForRetry(sequenceNumber); + handler.sendMessage(message); + } + + /** + * Check if session is completed on receiving {@code StreamMessage.Type.COMPLETE} message. + */ + public synchronized void complete() + { + if (state == State.WAIT_COMPLETE) + { + closeSession(State.COMPLETE); + } + else + { + state(State.WAIT_COMPLETE); + } + } + + /** + * Call back on receiving {@code StreamMessage.Type.SESSION_FAILED} message. 
+ */ + public synchronized void sessionFailed() + { + closeSession(State.FAILED); + } + + public void doRetry(FileMessageHeader header, Throwable e) + { + logger.warn("[Stream #" + planId() + "] Retrying for following error", e); + // retry + retries++; + if (retries > DatabaseDescriptor.getMaxStreamingRetries()) + onError(new IOException("Too many retries for " + header, e)); + else + handler.sendMessage(new RetryMessage(header.cfId, header.sequenceNumber)); + } + + /** + * @return Current snapshot of this session info. + */ + public SessionInfo getSessionInfo() + { + List<StreamSummary> receivingSummaries = Lists.newArrayList(); + for (StreamTask receiver : receivers.values()) + receivingSummaries.add(receiver.getSummary()); + List<StreamSummary> transferSummaries = Lists.newArrayList(); + for (StreamTask transfer : transfers.values()) + transferSummaries.add(transfer.getSummary()); + return new SessionInfo(peer, receivingSummaries, transferSummaries, state); + } + + public synchronized void taskCompleted(StreamReceiveTask completedTask) + { + receivers.remove(completedTask.cfId); + maybeCompleted(); + } + + public synchronized void taskCompleted(StreamTransferTask completedTask) + { + transfers.remove(completedTask.cfId); + maybeCompleted(); + } + + public void onJoin(InetAddress endpoint, EndpointState epState) {} + public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value) {} + public void onAlive(InetAddress endpoint, EndpointState state) {} + public void onDead(InetAddress endpoint, EndpointState state) {} + + public void onRemove(InetAddress endpoint) + { + convict(endpoint, Double.MAX_VALUE); + } + + public void onRestart(InetAddress endpoint, EndpointState epState) + { + convict(endpoint, Double.MAX_VALUE); + } + + public void convict(InetAddress endpoint, double phi) + { + if (!endpoint.equals(peer)) + return; + + // We want a higher confidence in the failure detection than usual because failing a streaming wrongly has a 
high cost. + if (phi < 2 * DatabaseDescriptor.getPhiConvictThreshold()) + return; + + closeSession(State.FAILED); + } + + private boolean maybeCompleted() + { + boolean completed = receivers.isEmpty() && transfers.isEmpty(); + if (completed) + { + if (state == State.WAIT_COMPLETE) + { + closeSession(State.COMPLETE); + } + else + { + // notify peer that this session is completed + handler.sendMessage(new CompleteMessage()); + state(State.WAIT_COMPLETE); + } + } + return completed; + } + + /** + * Flushes matching column families from the given keyspace, or all columnFamilies + * if the cf list is empty. + */ + private void flushSSTables(Iterable<ColumnFamilyStore> stores) + { + List<Future<?>> flushes = new ArrayList<>(); + for (ColumnFamilyStore cfs : stores) + flushes.add(cfs.forceFlush()); + FBUtilities.waitOnFutures(flushes); + } + + private void prepareReceiving(StreamSummary summary) + { + if (summary.files > 0) + receivers.put(summary.cfId, new StreamReceiveTask(this, summary.cfId, summary.files, summary.totalSize)); + } + + private void startStreamingFiles() + { + streamResult.handleSessionPrepared(this); + + state(State.STREAMING); + for (StreamTransferTask task : transfers.values()) + { + Collection<FileMessage> messages = task.getFileMessages(); + if (messages.size() > 0) + handler.sendMessages(messages); + else + taskCompleted(task); // there is no file to send + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/9495eb59/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java index 2c389da,82c6b1c..0fceb17 --- a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java +++ b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java @@@ -128,22 -71,34 +128,34 @@@ public class StreamingTransferTest exte * Create and transfer a single sstable, and return the keys 
that should have been transferred. * The Mutator must create the given column, but it may also create any other columns it pleases. */ - private List<String> createAndTransfer(ColumnFamilyStore cfs, Mutator mutator) throws Exception - private List<String> createAndTransfer(Table table, ColumnFamilyStore cfs, Mutator mutator, boolean transferSSTables) throws Exception ++ private List<String> createAndTransfer(ColumnFamilyStore cfs, Mutator mutator, boolean transferSSTables) throws Exception { // write a temporary SSTable, and unregister it - logger.debug("Mutating " + cfs.columnFamily); + logger.debug("Mutating " + cfs.name); long timestamp = 1234; for (int i = 1; i <= 3; i++) mutator.mutate("key" + i, "col" + i, timestamp); cfs.forceBlockingFlush(); Util.compactAll(cfs).get(); assertEquals(1, cfs.getSSTables().size()); - SSTableReader sstable = cfs.getSSTables().iterator().next(); - cfs.clearUnsafe(); // transfer the first and last key - logger.debug("Transferring " + cfs.columnFamily); + logger.debug("Transferring " + cfs.name); - transfer(sstable); + int[] offs; + if (transferSSTables) + { + SSTableReader sstable = cfs.getSSTables().iterator().next(); + cfs.clearUnsafe(); - transferSSTables(table, sstable); ++ transferSSTables(sstable); + offs = new int[]{1, 3}; + } + else + { + long beforeStreaming = System.currentTimeMillis(); - transferRanges(table, cfs); ++ transferRanges(cfs); + cfs.discardSSTables(beforeStreaming); + offs = new int[]{2, 3}; + } // confirm that a single SSTable was transferred and registered assertEquals(1, cfs.getSSTables().size()); @@@ -172,32 -125,64 +183,76 @@@ return keys; } - private void transfer(SSTableReader sstable) throws Exception - private void transferRanges(Table table, ColumnFamilyStore cfs) throws Exception ++ private void transferSSTables(SSTableReader sstable) throws Exception { IPartitioner p = StorageService.getPartitioner(); - List<Range<Token>> ranges = new ArrayList<Range<Token>>(); - ranges.add(new 
Range<Token>(p.getToken(ByteBufferUtil.bytes("key1")), p.getToken(ByteBufferUtil.bytes("key0")))); - StreamOutSession session = StreamOutSession.create(table.name, LOCAL, (IStreamCallback) null); - StreamOut.transferRanges(session, Arrays.asList(cfs), ranges, OperationType.BOOTSTRAP); - session.await(); + List<Range<Token>> ranges = new ArrayList<>(); + ranges.add(new Range<>(p.getMinimumToken(), p.getToken(ByteBufferUtil.bytes("key1")))); + ranges.add(new Range<>(p.getToken(ByteBufferUtil.bytes("key2")), p.getMinimumToken())); + transfer(sstable, ranges); } - private void transferSSTables(Table table, SSTableReader sstable) throws Exception ++ private void transferRanges(ColumnFamilyStore cfs) throws Exception + { + IPartitioner p = StorageService.getPartitioner(); - List<Range<Token>> ranges = new ArrayList<Range<Token>>(); - ranges.add(new Range<Token>(p.getMinimumToken(), p.getToken(ByteBufferUtil.bytes("key1")))); - ranges.add(new Range<Token>(p.getToken(ByteBufferUtil.bytes("key2")), p.getMinimumToken())); - StreamOutSession session = StreamOutSession.create(table.name, LOCAL, (IStreamCallback) null); - StreamOut.transferSSTables(session, Arrays.asList(sstable), ranges, OperationType.BOOTSTRAP); - session.await(); ++ List<Range<Token>> ranges = new ArrayList<>(); ++ // wrapped range ++ ranges.add(new Range<Token>(p.getToken(ByteBufferUtil.bytes("key1")), p.getToken(ByteBufferUtil.bytes("key0")))); ++ new StreamPlan("StreamingTransferTest").transferRanges(LOCAL, cfs.keyspace.getName(), ranges, cfs.getColumnFamilyName()).execute().get(); ++ } ++ + private void transfer(SSTableReader sstable, List<Range<Token>> ranges) throws Exception + { + new StreamPlan("StreamingTransferTest").transferFiles(LOCAL, makeStreamingDetails(ranges, Arrays.asList(sstable))).execute().get(); + } + + private Collection<StreamSession.SSTableStreamingSections> makeStreamingDetails(List<Range<Token>> ranges, Collection<SSTableReader> sstables) + { + 
ArrayList<StreamSession.SSTableStreamingSections> details = new ArrayList<>(); + for (SSTableReader sstable : sstables) + { + details.add(new StreamSession.SSTableStreamingSections(sstable, + sstable.getPositionsForRanges(ranges), + sstable.estimatedKeysForRanges(ranges))); + } + return details; } + private void doTransferTable(boolean transferSSTables) throws Exception + { - final Table table = Table.open("Keyspace1"); - final ColumnFamilyStore cfs = table.getColumnFamilyStore("Indexed1"); ++ final Keyspace keyspace = Keyspace.open("Keyspace1"); ++ final ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Indexed1"); + - List<String> keys = createAndTransfer(table, cfs, new Mutator() ++ List<String> keys = createAndTransfer(cfs, new Mutator() + { + public void mutate(String key, String col, long timestamp) throws Exception + { + long val = key.hashCode(); - RowMutation rm = new RowMutation("Keyspace1", ByteBufferUtil.bytes(key)); - ColumnFamily cf = ColumnFamily.create(table.name, cfs.columnFamily); ++ ColumnFamily cf = TreeMapBackedSortedColumns.factory.create(keyspace.getName(), cfs.name); + cf.addColumn(column(col, "v", timestamp)); + cf.addColumn(new Column(ByteBufferUtil.bytes("birthdate"), ByteBufferUtil.bytes(val), timestamp)); - rm.add(cf); ++ RowMutation rm = new RowMutation("Keyspace1", ByteBufferUtil.bytes(key), cf); + logger.debug("Applying row to transfer " + rm); + rm.apply(); + } + }, transferSSTables); + + // confirm that the secondary index was recovered + for (String key : keys) + { + long val = key.hashCode(); - IPartitioner p = StorageService.getPartitioner(); + IndexExpression expr = new IndexExpression(ByteBufferUtil.bytes("birthdate"), - IndexOperator.EQ, - ByteBufferUtil.bytes(val)); ++ IndexOperator.EQ, ++ ByteBufferUtil.bytes(val)); + List<IndexExpression> clause = Arrays.asList(expr); + IDiskAtomFilter filter = new IdentityQueryFilter(); + Range<RowPosition> range = Util.range("", ""); - List<Row> rows = cfs.search(clause, range, 100, 
filter); ++ List<Row> rows = cfs.search(range, clause, filter, 100); + assertEquals(1, rows.size()); + assert rows.get(0).key.key.equals(ByteBufferUtil.bytes(key)); + } + } + /** * Test to make sure RangeTombstones at column index boundary transferred correctly. */ @@@ -223,7 -208,7 +278,7 @@@ SSTableReader sstable = cfs.getSSTables().iterator().next(); cfs.clearUnsafe(); - transfer(sstable); - transferSSTables(table, sstable); ++ transferSSTables(sstable); // confirm that a single SSTable was transferred and registered assertEquals(1, cfs.getSSTables().size()); @@@ -300,12 -278,12 +331,12 @@@ entries.put(key, cf); cleanedEntries.put(key, cfCleaned); cfs.addSSTable(SSTableUtils.prepare() - .ks(table.name) - .cf(cfs.columnFamily) - .generation(0) - .write(entries)); + .ks(keyspace.getName()) + .cf(cfs.name) + .generation(0) + .write(entries)); } - }); + }, true); // filter pre-cleaned entries locally, and ensure that the end result is equal cleanedEntries.keySet().retainAll(keys); @@@ -319,7 -297,7 +350,7 @@@ // Retransfer the file, making sure it is now idempotent (see CASSANDRA-3481) cfs.clearUnsafe(); - transfer(streamed); - transferSSTables(table, streamed); ++ transferSSTables(streamed); SSTableReader restreamed = cfs.getSSTables().iterator().next(); SSTableUtils.assertContentEquals(streamed, restreamed); }