Repository: cassandra
Updated Branches:
  refs/heads/trunk e473769fb -> 0de0b8c03


Keep sstable level when bootstrapping

Patch by marcuse; reviewed by iamaleksey for CASSANDRA-7460


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0de0b8c0
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0de0b8c0
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0de0b8c0

Branch: refs/heads/trunk
Commit: 0de0b8c0372e825e834b1ffd9685d3db87d21378
Parents: e473769
Author: Marcus Eriksson <marc...@apache.org>
Authored: Tue Oct 7 07:35:53 2014 +0200
Committer: Marcus Eriksson <marc...@apache.org>
Committed: Mon Oct 13 11:24:00 2014 +0200

----------------------------------------------------------------------
 CHANGES.txt                                         |  1 +
 .../cassandra/db/compaction/LeveledManifest.java    | 14 ++++++++++++++
 .../org/apache/cassandra/dht/RangeStreamer.java     |  8 ++------
 .../apache/cassandra/io/sstable/SSTableLoader.java  |  2 +-
 .../apache/cassandra/io/sstable/SSTableWriter.java  | 10 ++++++++--
 .../cassandra/net/IncomingStreamingConnection.java  |  2 +-
 .../org/apache/cassandra/repair/LocalSyncTask.java  |  2 +-
 .../cassandra/repair/StreamingRepairTask.java       |  2 +-
 .../cassandra/streaming/ConnectionHandler.java      |  3 ++-
 .../cassandra/streaming/StreamCoordinator.java      |  8 +++++---
 .../org/apache/cassandra/streaming/StreamPlan.java  | 11 ++++++++---
 .../apache/cassandra/streaming/StreamReader.java    |  7 ++++---
 .../cassandra/streaming/StreamResultFuture.java     |  9 +++++----
 .../apache/cassandra/streaming/StreamSession.java   |  9 ++++++++-
 .../cassandra/streaming/StreamTransferTask.java     |  2 +-
 .../streaming/messages/FileMessageHeader.java       | 11 +++++++++--
 .../streaming/messages/OutgoingFileMessage.java     |  5 +++--
 .../streaming/messages/StreamInitMessage.java       |  9 +++++++--
 .../cassandra/streaming/messages/StreamMessage.java |  2 +-
 .../org/apache/cassandra/tools/SSTableImport.java   |  4 ++--
 .../apache/cassandra/io/sstable/SSTableUtils.java   |  2 +-
 .../cassandra/streaming/StreamTransferTaskTest.java |  2 +-
 .../apache/cassandra/tools/SSTableExportTest.java   | 16 ++++++++--------
 23 files changed, 94 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de0b8c0/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f602c0e..b6a3766 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0
+ * Keep sstable levels when bootstrapping (CASSANDRA-7460)
  * Add Sigar library and perform basic OS settings check on startup (CASSANDRA-7838)
  * Support for scripting languages in user-defined functions (CASSANDRA-7526)
  * Support for aggregation functions (CASSANDRA-4914)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de0b8c0/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
index a0836a8..6d3bf69 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
@@ -38,6 +38,7 @@ import org.apache.cassandra.dht.Bounds;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.Pair;
 
 public class LeveledManifest
@@ -330,6 +331,19 @@ public class LeveledManifest
                 return new CompactionCandidate(unrepairedMostInterresting, 0, Long.MAX_VALUE);
             }
         }
+
+        // during bootstrap we only do size tiering in L0 to make sure
+        // the streamed files can be placed in their original levels
+        if (StorageService.instance.isBootstrapMode())
+        {
+            List<SSTableReader> mostInteresting = getSSTablesForSTCS(getLevel(0));
+            if (!mostInteresting.isEmpty())
+            {
+                logger.info("Bootstrapping - doing STCS in L0");
+                return new CompactionCandidate(mostInteresting, 0, Long.MAX_VALUE);
+            }
+            return null;
+        }
         // LevelDB gives each level a score of how much data it contains vs its ideal amount, and
         // compacts the level with the highest score. But this falls apart spectacularly once you
         // get behind.  Consider this set of levels:

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de0b8c0/src/java/org/apache/cassandra/dht/RangeStreamer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/RangeStreamer.java 
b/src/java/org/apache/cassandra/dht/RangeStreamer.java
index be58d77..388834f 100644
--- a/src/java/org/apache/cassandra/dht/RangeStreamer.java
+++ b/src/java/org/apache/cassandra/dht/RangeStreamer.java
@@ -109,16 +109,12 @@ public class RangeStreamer
         this.tokens = tokens;
         this.address = address;
         this.description = description;
-        this.streamPlan = new StreamPlan(description);
+        this.streamPlan = new StreamPlan(description, true);
     }
 
     public RangeStreamer(TokenMetadata metadata, InetAddress address, String 
description)
     {
-        this.metadata = metadata;
-        this.tokens = null;
-        this.address = address;
-        this.description = description;
-        this.streamPlan = new StreamPlan(description);
+        this(metadata, null, address, description);
     }
 
     public void addSourceFilter(ISourceFilter filter)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de0b8c0/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java 
b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index fbd583c..991fa1d 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -157,7 +157,7 @@ public class SSTableLoader implements StreamEventHandler
         client.init(keyspace);
         outputHandler.output("Established connection to initial hosts");
 
-        StreamPlan plan = new StreamPlan("Bulk Load", 0, connectionsPerHost).connectionFactory(client.getConnectionFactory());
+        StreamPlan plan = new StreamPlan("Bulk Load", 0, connectionsPerHost, false).connectionFactory(client.getConnectionFactory());
 
         Map<InetAddress, Collection<Range<Token>>> endpointToRanges = 
client.getEndpointToRangesMap();
         openSSTables(endpointToRanges);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de0b8c0/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java 
b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index b5e7d02..ef8cd51 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -62,6 +62,7 @@ import org.apache.cassandra.io.util.FileMark;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.util.SegmentedFile;
 import org.apache.cassandra.io.util.SequentialWriter;
+import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FilterFactory;
@@ -84,14 +85,19 @@ public class SSTableWriter extends SSTable
     private final MetadataCollector sstableMetadataCollector;
     private final long repairedAt;
 
-    public SSTableWriter(String filename, long keyCount, long repairedAt)
+    public SSTableWriter(String filename, long keyCount, long repairedAt, int sstableLevel)
     {
         this(filename,
              keyCount,
              repairedAt,
              Schema.instance.getCFMetaData(Descriptor.fromFilename(filename)),
              StorageService.getPartitioner(),
-             new MetadataCollector(Schema.instance.getCFMetaData(Descriptor.fromFilename(filename)).comparator));
+             new MetadataCollector(Schema.instance.getCFMetaData(Descriptor.fromFilename(filename)).comparator).sstableLevel(sstableLevel));
+    }
+
+    public SSTableWriter(String filename, long keyCount)
+    {
+        this(filename, keyCount, ActiveRepairService.UNREPAIRED_SSTABLE, 0);
     }
 
     private static Set<Component> components(CFMetaData metadata)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de0b8c0/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java 
b/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
index 003bbf9..de18d50 100644
--- a/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
+++ b/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
@@ -62,7 +62,7 @@ public class IncomingStreamingConnection extends Thread
             // The receiving side distinguish two connections by looking at 
StreamInitMessage#isForOutgoing.
             // Note: we cannot use the same socket for incoming and outgoing 
streams because we want to
             // parallelize said streams and the socket is blocking, so we 
might deadlock.
-            StreamResultFuture.initReceivingSide(init.sessionIndex, init.planId, init.description, init.from, socket, init.isForOutgoing, version);
+            StreamResultFuture.initReceivingSide(init.sessionIndex, init.planId, init.description, init.from, socket, init.isForOutgoing, version, init.keepSSTableLevel);
         }
         catch (IOException e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de0b8c0/src/java/org/apache/cassandra/repair/LocalSyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/LocalSyncTask.java 
b/src/java/org/apache/cassandra/repair/LocalSyncTask.java
index 38f63ce..b34c508 100644
--- a/src/java/org/apache/cassandra/repair/LocalSyncTask.java
+++ b/src/java/org/apache/cassandra/repair/LocalSyncTask.java
@@ -57,7 +57,7 @@ public class LocalSyncTask extends SyncTask implements 
StreamEventHandler
         InetAddress dst = r2.endpoint.equals(local) ? r1.endpoint : 
r2.endpoint;
 
         logger.info(String.format("[repair #%s] Performing streaming repair of 
%d ranges with %s", desc.sessionId, differences.size(), dst));
-        new StreamPlan("Repair", repairedAt, 1).listeners(this)
+        new StreamPlan("Repair", repairedAt, 1, false).listeners(this)
                                             .flushBeforeTransfer(true)
                                             // request ranges from the remote 
node
                                             .requestRanges(dst, desc.keyspace, 
differences, desc.columnFamily)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de0b8c0/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java 
b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
index f30eb6f..1472720 100644
--- a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
+++ b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
@@ -50,7 +50,7 @@ public class StreamingRepairTask implements Runnable, 
StreamEventHandler
     public void run()
     {
         logger.info(String.format("[streaming task #%s] Performing streaming 
repair of %d ranges with %s", desc.sessionId, request.ranges.size(), 
request.dst));
-        new StreamPlan("Repair", repairedAt, 1).listeners(this)
+        new StreamPlan("Repair", repairedAt, 1, false).listeners(this)
                                             .flushBeforeTransfer(true)
                                             // request ranges from the remote 
node
                                             .requestRanges(request.dst, 
desc.keyspace, request.ranges, desc.columnFamily)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de0b8c0/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java 
b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
index 6092046..7a7ccbf 100644
--- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
+++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
@@ -179,7 +179,8 @@ public class ConnectionHandler
                     session.sessionIndex(),
                     session.planId(),
                     session.description(),
-                    isForOutgoing);
+                    isForOutgoing,
+                    session.keepSSTableLevel());
             ByteBuffer messageBuf = message.createMessage(false, 
protocolVersion);
             getWriteChannel(socket).write(messageBuf);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de0b8c0/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java 
b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
index 71a853c..130bd45 100644
--- a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
+++ b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
@@ -45,11 +45,13 @@ public class StreamCoordinator
     private Map<InetAddress, HostStreamingData> peerSessions = new HashMap<>();
     private final int connectionsPerHost;
     private StreamConnectionFactory factory;
+    private final boolean keepSSTableLevel;
 
-    public StreamCoordinator(int connectionsPerHost, StreamConnectionFactory 
factory)
+    public StreamCoordinator(int connectionsPerHost, boolean keepSSTableLevel, 
StreamConnectionFactory factory)
     {
         this.connectionsPerHost = connectionsPerHost;
         this.factory = factory;
+        this.keepSSTableLevel = keepSSTableLevel;
     }
 
     public void setConnectionFactory(StreamConnectionFactory factory)
@@ -233,7 +235,7 @@ public class StreamCoordinator
             // create
             if (streamSessions.size() < connectionsPerHost)
             {
-                StreamSession session = new StreamSession(peer, factory, 
streamSessions.size());
+                StreamSession session = new StreamSession(peer, factory, 
streamSessions.size(), keepSSTableLevel);
                 streamSessions.put(++lastReturned, session);
                 return session;
             }
@@ -265,7 +267,7 @@ public class StreamCoordinator
             StreamSession session = streamSessions.get(id);
             if (session == null)
             {
-                session = new StreamSession(peer, factory, id);
+                session = new StreamSession(peer, factory, id, 
keepSSTableLevel);
                 streamSessions.put(id, session);
             }
             return session;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de0b8c0/src/java/org/apache/cassandra/streaming/StreamPlan.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamPlan.java 
b/src/java/org/apache/cassandra/streaming/StreamPlan.java
index ca448a3..5aa1bc6 100644
--- a/src/java/org/apache/cassandra/streaming/StreamPlan.java
+++ b/src/java/org/apache/cassandra/streaming/StreamPlan.java
@@ -47,14 +47,19 @@ public class StreamPlan
      */
     public StreamPlan(String description)
     {
-        this(description, ActiveRepairService.UNREPAIRED_SSTABLE, 1);
+        this(description, ActiveRepairService.UNREPAIRED_SSTABLE, 1, false);
     }
 
-    public StreamPlan(String description, long repairedAt, int 
connectionsPerHost)
+    public StreamPlan(String description, boolean keepSSTableLevels)
+    {
+        this(description, ActiveRepairService.UNREPAIRED_SSTABLE, 1, 
keepSSTableLevels);
+    }
+
+    public StreamPlan(String description, long repairedAt, int 
connectionsPerHost, boolean keepSSTableLevels)
     {
         this.description = description;
         this.repairedAt = repairedAt;
-        this.coordinator = new StreamCoordinator(connectionsPerHost, new 
DefaultConnectionFactory());
+        this.coordinator = new StreamCoordinator(connectionsPerHost, 
keepSSTableLevels, new DefaultConnectionFactory());
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de0b8c0/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java 
b/src/java/org/apache/cassandra/streaming/StreamReader.java
index b6e1aaf..34cbf02 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@ -58,6 +58,7 @@ public class StreamReader
     protected final StreamSession session;
     protected final Descriptor.Version inputVersion;
     protected final long repairedAt;
+    protected final int sstableLevel;
 
     protected Descriptor desc;
 
@@ -69,6 +70,7 @@ public class StreamReader
         this.sections = header.sections;
         this.inputVersion = new Descriptor.Version(header.version);
         this.repairedAt = header.repairedAt;
+        this.sstableLevel = header.sstableLevel;
     }
 
     /**
@@ -78,7 +80,7 @@ public class StreamReader
      */
     public SSTableWriter read(ReadableByteChannel channel) throws IOException
     {
-        logger.debug("reading file from {}, repairedAt = {}", session.peer, 
repairedAt);
+        logger.debug("reading file from {}, repairedAt = {}, level = {}", 
session.peer, repairedAt, sstableLevel);
         long totalSize = totalSize();
 
         Pair<String, String> kscf = Schema.instance.getCF(cfId);
@@ -119,8 +121,7 @@ public class StreamReader
         if (localDir == null)
             throw new IOException("Insufficient disk space to store " + 
totalSize + " bytes");
         desc = 
Descriptor.fromFilename(cfs.getTempSSTablePath(cfs.directories.getLocationForDisk(localDir)));
-
-        return new SSTableWriter(desc.filenameFor(Component.DATA), estimatedKeys, repairedAt);
+        return new SSTableWriter(desc.filenameFor(Component.DATA), estimatedKeys, repairedAt, sstableLevel);
     }
 
     protected void drain(InputStream dis, long bytesRead) throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de0b8c0/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java 
b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
index f28a937..b8a5234 100644
--- a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
+++ b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
@@ -69,9 +69,9 @@ public final class StreamResultFuture extends 
AbstractFuture<StreamState>
             set(getCurrentState());
     }
 
-    private StreamResultFuture(UUID planId, String description)
+    private StreamResultFuture(UUID planId, String description, boolean 
keepSSTableLevels)
     {
-        this(planId, description, new StreamCoordinator(0, new 
DefaultConnectionFactory()));
+        this(planId, description, new StreamCoordinator(0, keepSSTableLevels, 
new DefaultConnectionFactory()));
     }
 
     static StreamResultFuture init(UUID planId, String description, 
Collection<StreamEventHandler> listeners, StreamCoordinator coordinator)
@@ -101,7 +101,8 @@ public final class StreamResultFuture extends 
AbstractFuture<StreamState>
                                                                     
InetAddress from,
                                                                     Socket 
socket,
                                                                     boolean 
isForOutgoing,
-                                                                    int 
version) throws IOException
+                                                                    int 
version,
+                                                                    boolean 
keepSSTableLevel) throws IOException
     {
         StreamResultFuture future = 
StreamManager.instance.getReceivingStream(planId);
         if (future == null)
@@ -109,7 +110,7 @@ public final class StreamResultFuture extends 
AbstractFuture<StreamState>
             logger.info("[Stream #{} ID#{}] Creating new streaming plan for 
{}", planId, sessionIndex, description);
 
             // The main reason we create a StreamResultFuture on the receiving 
side is for JMX exposure.
-            future = new StreamResultFuture(planId, description);
+            future = new StreamResultFuture(planId, description, 
keepSSTableLevel);
             StreamManager.instance.registerReceiving(future);
         }
         future.attachSocket(from, sessionIndex, socket, isForOutgoing, 
version);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de0b8c0/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java 
b/src/java/org/apache/cassandra/streaming/StreamSession.java
index 2efa00d..560a9fa 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -133,6 +133,7 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber
     private int retries;
 
     private AtomicBoolean isAborted = new AtomicBoolean(false);
+    private final boolean keepSSTableLevel;
 
     public static enum State
     {
@@ -153,13 +154,14 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber
      * @param peer Address of streaming peer
      * @param factory is used for establishing connection
      */
-    public StreamSession(InetAddress peer, StreamConnectionFactory factory, 
int index)
+    public StreamSession(InetAddress peer, StreamConnectionFactory factory, 
int index, boolean keepSSTableLevel)
     {
         this.peer = peer;
         this.index = index;
         this.factory = factory;
         this.handler = new ConnectionHandler(this);
         this.metrics = StreamingMetrics.get(peer);
+        this.keepSSTableLevel = keepSSTableLevel;
     }
 
     public UUID planId()
@@ -177,6 +179,11 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber
         return streamResult == null ? null : streamResult.description;
     }
 
+    public boolean keepSSTableLevel()
+    {
+        return keepSSTableLevel;
+    }
+
     /**
      * Bind this session to report to specific {@link StreamResultFuture} and
      * perform pre-streaming initialization.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de0b8c0/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java 
b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
index b840ee5..18058c1 100644
--- a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
@@ -50,7 +50,7 @@ public class StreamTransferTask extends StreamTask
     public synchronized void addTransferFile(SSTableReader sstable, long 
estimatedKeys, List<Pair<Long, Long>> sections, long repairedAt)
     {
         assert sstable != null && cfId.equals(sstable.metadata.cfId);
-        OutgoingFileMessage message = new OutgoingFileMessage(sstable, 
sequenceNumber.getAndIncrement(), estimatedKeys, sections, repairedAt);
+        OutgoingFileMessage message = new OutgoingFileMessage(sstable, 
sequenceNumber.getAndIncrement(), estimatedKeys, sections, repairedAt, 
session.keepSSTableLevel());
         files.put(message.header.sequenceNumber, message);
         totalSize += message.header.size();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de0b8c0/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java 
b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
index 284820e..5e378bc 100644
--- a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
+++ b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
@@ -47,6 +47,7 @@ public class FileMessageHeader
     public final List<Pair<Long, Long>> sections;
     public final CompressionInfo compressionInfo;
     public final long repairedAt;
+    public final int sstableLevel;
 
     public FileMessageHeader(UUID cfId,
                              int sequenceNumber,
@@ -54,7 +55,8 @@ public class FileMessageHeader
                              long estimatedKeys,
                              List<Pair<Long, Long>> sections,
                              CompressionInfo compressionInfo,
-                             long repairedAt)
+                             long repairedAt,
+                             int sstableLevel)
     {
         this.cfId = cfId;
         this.sequenceNumber = sequenceNumber;
@@ -63,6 +65,7 @@ public class FileMessageHeader
         this.sections = sections;
         this.compressionInfo = compressionInfo;
         this.repairedAt = repairedAt;
+        this.sstableLevel = sstableLevel;
     }
 
     /**
@@ -96,6 +99,7 @@ public class FileMessageHeader
         sb.append(", transfer size: ").append(size());
         sb.append(", compressed?: ").append(compressionInfo != null);
         sb.append(", repairedAt: ").append(repairedAt);
+        sb.append(", level: ").append(sstableLevel);
         sb.append(')');
         return sb.toString();
     }
@@ -134,6 +138,7 @@ public class FileMessageHeader
             }
             CompressionInfo.serializer.serialize(header.compressionInfo, out, 
version);
             out.writeLong(header.repairedAt);
+            out.writeInt(header.sstableLevel);
         }
 
         public FileMessageHeader deserialize(DataInput in, int version) throws 
IOException
@@ -148,7 +153,8 @@ public class FileMessageHeader
                 sections.add(Pair.create(in.readLong(), in.readLong()));
             CompressionInfo compressionInfo = 
CompressionInfo.serializer.deserialize(in, MessagingService.current_version);
             long repairedAt = in.readLong();
-            return new FileMessageHeader(cfId, sequenceNumber, sstableVersion, estimatedKeys, sections, compressionInfo, repairedAt);
+            int sstableLevel = in.readInt();
+            return new FileMessageHeader(cfId, sequenceNumber, sstableVersion, estimatedKeys, sections, compressionInfo, repairedAt, sstableLevel);
         }
 
         public long serializedSize(FileMessageHeader header, int version)
@@ -165,6 +171,7 @@ public class FileMessageHeader
                 size += TypeSizes.NATIVE.sizeof(section.right);
             }
             size += 
CompressionInfo.serializer.serializedSize(header.compressionInfo, version);
+            size += TypeSizes.NATIVE.sizeof(header.sstableLevel);
             return size;
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de0b8c0/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java 
b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
index 466e2cb..13af987 100644
--- a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
@@ -60,7 +60,7 @@ public class OutgoingFileMessage extends StreamMessage
     public FileMessageHeader header;
     public SSTableReader sstable;
 
-    public OutgoingFileMessage(SSTableReader sstable, int sequenceNumber, long 
estimatedKeys, List<Pair<Long, Long>> sections, long repairedAt)
+    public OutgoingFileMessage(SSTableReader sstable, int sequenceNumber, long 
estimatedKeys, List<Pair<Long, Long>> sections, long repairedAt, boolean 
keepSSTableLevel)
     {
         super(Type.FILE);
         this.sstable = sstable;
@@ -77,7 +77,8 @@ public class OutgoingFileMessage extends StreamMessage
                                             estimatedKeys,
                                             sections,
                                             compressionInfo,
-                                            repairedAt);
+                                            repairedAt,
+                                            keepSSTableLevel ? sstable.getSSTableLevel() : 0);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de0b8c0/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java 
b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
index a9ec4ae..0937f71 100644
--- a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
@@ -46,14 +46,16 @@ public class StreamInitMessage
 
     // true if this init message is to connect for outgoing message on 
receiving side
     public final boolean isForOutgoing;
+    public final boolean keepSSTableLevel;
 
-    public StreamInitMessage(InetAddress from, int sessionIndex, UUID planId, 
String description, boolean isForOutgoing)
+    public StreamInitMessage(InetAddress from, int sessionIndex, UUID planId, 
String description, boolean isForOutgoing, boolean keepSSTableLevel)
     {
         this.from = from;
         this.sessionIndex = sessionIndex;
         this.planId = planId;
         this.description = description;
         this.isForOutgoing = isForOutgoing;
+        this.keepSSTableLevel = keepSSTableLevel;
     }
 
     /**
@@ -105,6 +107,7 @@ public class StreamInitMessage
             UUIDSerializer.serializer.serialize(message.planId, out, 
MessagingService.current_version);
             out.writeUTF(message.description);
             out.writeBoolean(message.isForOutgoing);
+            out.writeBoolean(message.keepSSTableLevel);
         }
 
         public StreamInitMessage deserialize(DataInput in, int version) throws 
IOException
@@ -114,7 +117,8 @@ public class StreamInitMessage
             UUID planId = UUIDSerializer.serializer.deserialize(in, 
MessagingService.current_version);
             String description = in.readUTF();
             boolean sentByInitiator = in.readBoolean();
-            return new StreamInitMessage(from, sessionIndex, planId, description, sentByInitiator);
+            boolean keepSSTableLevel = in.readBoolean();
+            return new StreamInitMessage(from, sessionIndex, planId, description, sentByInitiator, keepSSTableLevel);
         }
 
         public long serializedSize(StreamInitMessage message, int version)
@@ -124,6 +128,7 @@ public class StreamInitMessage
             size += UUIDSerializer.serializer.serializedSize(message.planId, MessagingService.current_version);
             size += TypeSizes.NATIVE.sizeof(message.description);
             size += TypeSizes.NATIVE.sizeof(message.isForOutgoing);
+            size += TypeSizes.NATIVE.sizeof(message.keepSSTableLevel);
             return size;
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de0b8c0/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
index e527db1..372fdd3 100644
--- a/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
@@ -32,7 +32,7 @@ import org.apache.cassandra.streaming.StreamSession;
 public abstract class StreamMessage
 {
     /** Streaming protocol version */
-    public static final int CURRENT_VERSION = 2;
+    public static final int CURRENT_VERSION = 3;
 
     public static void serialize(StreamMessage message, DataOutputStreamAndChannel out, int version, StreamSession session) throws IOException
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de0b8c0/src/java/org/apache/cassandra/tools/SSTableImport.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/SSTableImport.java b/src/java/org/apache/cassandra/tools/SSTableImport.java
index e678aaa..05b9dcb 100644
--- a/src/java/org/apache/cassandra/tools/SSTableImport.java
+++ b/src/java/org/apache/cassandra/tools/SSTableImport.java
@@ -302,7 +302,7 @@ public class SSTableImport
         Object[] data = parser.readValueAs(new TypeReference<Object[]>(){});
 
         keyCountToImport = (keyCountToImport == null) ? data.length : keyCountToImport;
-        SSTableWriter writer = new SSTableWriter(ssTablePath, keyCountToImport, ActiveRepairService.UNREPAIRED_SSTABLE);
+        SSTableWriter writer = new SSTableWriter(ssTablePath, keyCountToImport);
 
         System.out.printf("Importing %s keys...%n", keyCountToImport);
 
@@ -375,7 +375,7 @@ public class SSTableImport
         System.out.printf("Importing %s keys...%n", keyCountToImport);
 
         parser = getParser(jsonFile); // renewing parser
-        SSTableWriter writer = new SSTableWriter(ssTablePath, keyCountToImport, ActiveRepairService.UNREPAIRED_SSTABLE);
+        SSTableWriter writer = new SSTableWriter(ssTablePath, keyCountToImport);
 
         int lineNumber = 1;
         DecoratedKey prevStoredKey = null;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de0b8c0/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
index 157f89b..57c9477 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
@@ -211,7 +211,7 @@ public class SSTableUtils
         public SSTableReader write(int expectedSize, Appender appender) throws IOException
         {
             File datafile = (dest == null) ? tempSSTableFile(ksname, cfname, generation) : new File(dest.filenameFor(Component.DATA));
-            SSTableWriter writer = new SSTableWriter(datafile.getAbsolutePath(), expectedSize, ActiveRepairService.UNREPAIRED_SSTABLE);
+            SSTableWriter writer = new SSTableWriter(datafile.getAbsolutePath(), expectedSize);
             while (appender.append(writer)) { /* pass */ }
             SSTableReader reader = writer.closeAndOpenReader();
             // mark all components for removal

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de0b8c0/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
index 16fa77b..d84f9b7 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
@@ -63,7 +63,7 @@ public class StreamTransferTaskTest
         String ks = KEYSPACE1;
         String cf = "Standard1";
 
-        StreamSession session = new StreamSession(FBUtilities.getBroadcastAddress(), null, 0);
+        StreamSession session = new StreamSession(FBUtilities.getBroadcastAddress(), null, 0, true);
         ColumnFamilyStore cfs = Keyspace.open(ks).getColumnFamilyStore(cf);
 
         // create two sstables

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de0b8c0/test/unit/org/apache/cassandra/tools/SSTableExportTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/tools/SSTableExportTest.java b/test/unit/org/apache/cassandra/tools/SSTableExportTest.java
index 2009c0c..f93e168 100644
--- a/test/unit/org/apache/cassandra/tools/SSTableExportTest.java
+++ b/test/unit/org/apache/cassandra/tools/SSTableExportTest.java
@@ -97,7 +97,7 @@ public class SSTableExportTest
     {
         File tempSS = tempSSTableFile(KEYSPACE1, "Standard1");
         ColumnFamily cfamily = ArrayBackedSortedColumns.factory.create(KEYSPACE1, "Standard1");
-        SSTableWriter writer = new SSTableWriter(tempSS.getPath(), 2, ActiveRepairService.UNREPAIRED_SSTABLE);
+        SSTableWriter writer = new SSTableWriter(tempSS.getPath(), 2);
 
         // Add rowA
         cfamily.addColumn(Util.cellname("colA"), ByteBufferUtil.bytes("valA"), System.currentTimeMillis());
@@ -134,7 +134,7 @@ public class SSTableExportTest
     {
         File tempSS = tempSSTableFile(KEYSPACE1, "Standard1");
         ColumnFamily cfamily = ArrayBackedSortedColumns.factory.create(KEYSPACE1, "Standard1");
-        SSTableWriter writer = new SSTableWriter(tempSS.getPath(), 2, ActiveRepairService.UNREPAIRED_SSTABLE);
+        SSTableWriter writer = new SSTableWriter(tempSS.getPath(), 2);
 
         int nowInSec = (int)(System.currentTimeMillis() / 1000) + 42; //live for 42 seconds
         // Add rowA
@@ -191,7 +191,7 @@ public class SSTableExportTest
         ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore("Standard1");
         File tempSS = tempSSTableFile(KEYSPACE1, "Standard1");
         ColumnFamily cfamily = ArrayBackedSortedColumns.factory.create(KEYSPACE1, "Standard1");
-        SSTableWriter writer = new SSTableWriter(tempSS.getPath(), 2, ActiveRepairService.UNREPAIRED_SSTABLE);
+        SSTableWriter writer = new SSTableWriter(tempSS.getPath(), 2);
 
         // Add rowA
         cfamily.addColumn(Util.cellname("name"), ByteBufferUtil.bytes("val"), System.currentTimeMillis());
@@ -231,7 +231,7 @@ public class SSTableExportTest
     {
         File tempSS = tempSSTableFile(KEYSPACE1, "Counter1");
         ColumnFamily cfamily = ArrayBackedSortedColumns.factory.create(KEYSPACE1, "Counter1");
-        SSTableWriter writer = new SSTableWriter(tempSS.getPath(), 2, ActiveRepairService.UNREPAIRED_SSTABLE);
+        SSTableWriter writer = new SSTableWriter(tempSS.getPath(), 2);
 
         // Add rowA
         cfamily.addColumn(BufferCounterCell.createLocal(Util.cellname("colA"), 42, System.currentTimeMillis(), Long.MIN_VALUE));
@@ -263,7 +263,7 @@ public class SSTableExportTest
     {
         File tempSS = tempSSTableFile(KEYSPACE1, "ValuesWithQuotes");
         ColumnFamily cfamily = ArrayBackedSortedColumns.factory.create(KEYSPACE1, "ValuesWithQuotes");
-        SSTableWriter writer = new SSTableWriter(tempSS.getPath(), 2, ActiveRepairService.UNREPAIRED_SSTABLE);
+        SSTableWriter writer = new SSTableWriter(tempSS.getPath(), 2);
 
         // Add rowA
         cfamily.addColumn(new BufferCell(Util.cellname("data"), UTF8Type.instance.fromString("{\"foo\":\"bar\"}")));
@@ -295,7 +295,7 @@ public class SSTableExportTest
     {
         File tempSS = tempSSTableFile(KEYSPACE1, "Standard1");
         ColumnFamily cfamily = ArrayBackedSortedColumns.factory.create(KEYSPACE1, "Standard1");
-        SSTableWriter writer = new SSTableWriter(tempSS.getPath(), 2, ActiveRepairService.UNREPAIRED_SSTABLE);
+        SSTableWriter writer = new SSTableWriter(tempSS.getPath(), 2);
 
         // Add rowA
         cfamily.addColumn(Util.cellname("colName"), ByteBufferUtil.bytes("val"), System.currentTimeMillis());
@@ -357,7 +357,7 @@ public class SSTableExportTest
     {
         File tempSS = tempSSTableFile(KEYSPACE1, "UUIDKeys");
         ColumnFamily cfamily = ArrayBackedSortedColumns.factory.create(KEYSPACE1, "UUIDKeys");
-        SSTableWriter writer = new SSTableWriter(tempSS.getPath(), 2, ActiveRepairService.UNREPAIRED_SSTABLE);
+        SSTableWriter writer = new SSTableWriter(tempSS.getPath(), 2);
 
         // Add a row
         cfamily.addColumn(column(CFMetaData.DEFAULT_KEY_ALIAS, "not a uuid", 1L));
@@ -387,7 +387,7 @@ public class SSTableExportTest
     {
         File tempSS = tempSSTableFile(KEYSPACE1, "AsciiKeys");
         ColumnFamily cfamily = ArrayBackedSortedColumns.factory.create(KEYSPACE1, "AsciiKeys");
-        SSTableWriter writer = new SSTableWriter(tempSS.getPath(), 2, ActiveRepairService.UNREPAIRED_SSTABLE);
+        SSTableWriter writer = new SSTableWriter(tempSS.getPath(), 2, ActiveRepairService.UNREPAIRED_SSTABLE, 0);
 
         // Add a row
         cfamily.addColumn(column("column", "value", 1L));

Reply via email to