Repository: cassandra Updated Branches: refs/heads/trunk e473769fb -> 0de0b8c03
Keep sstable level when bootstrapping Patch by marcuse; reviewed by iamaleksey for CASSANDRA-7460 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0de0b8c0 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0de0b8c0 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0de0b8c0 Branch: refs/heads/trunk Commit: 0de0b8c0372e825e834b1ffd9685d3db87d21378 Parents: e473769 Author: Marcus Eriksson <marc...@apache.org> Authored: Tue Oct 7 07:35:53 2014 +0200 Committer: Marcus Eriksson <marc...@apache.org> Committed: Mon Oct 13 11:24:00 2014 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/db/compaction/LeveledManifest.java | 14 ++++++++++++++ .../org/apache/cassandra/dht/RangeStreamer.java | 8 ++------ .../apache/cassandra/io/sstable/SSTableLoader.java | 2 +- .../apache/cassandra/io/sstable/SSTableWriter.java | 10 ++++++++-- .../cassandra/net/IncomingStreamingConnection.java | 2 +- .../org/apache/cassandra/repair/LocalSyncTask.java | 2 +- .../cassandra/repair/StreamingRepairTask.java | 2 +- .../cassandra/streaming/ConnectionHandler.java | 3 ++- .../cassandra/streaming/StreamCoordinator.java | 8 +++++--- .../org/apache/cassandra/streaming/StreamPlan.java | 11 ++++++++--- .../apache/cassandra/streaming/StreamReader.java | 7 ++++--- .../cassandra/streaming/StreamResultFuture.java | 9 +++++---- .../apache/cassandra/streaming/StreamSession.java | 9 ++++++++- .../cassandra/streaming/StreamTransferTask.java | 2 +- .../streaming/messages/FileMessageHeader.java | 11 +++++++++-- .../streaming/messages/OutgoingFileMessage.java | 5 +++-- .../streaming/messages/StreamInitMessage.java | 9 +++++++-- .../cassandra/streaming/messages/StreamMessage.java | 2 +- .../org/apache/cassandra/tools/SSTableImport.java | 4 ++-- .../apache/cassandra/io/sstable/SSTableUtils.java | 2 +- .../cassandra/streaming/StreamTransferTaskTest.java | 2 +- .../apache/cassandra/tools/SSTableExportTest.java | 16 ++++++++-------- 23 files changed, 94 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de0b8c0/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index f602c0e..b6a3766 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0 + * Keep sstable levels when bootstrapping (CASSANDRA-7460) * Add Sigar library and perform basic OS settings check on startup (CASSANDRA-7838) * Support for scripting languages in user-defined functions (CASSANDRA-7526) * Support for aggregation functions (CASSANDRA-4914) http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de0b8c0/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java index a0836a8..6d3bf69 100644 --- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java +++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java @@ -38,6 +38,7 @@ import org.apache.cassandra.dht.Bounds; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.io.sstable.*; +import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.Pair; public class LeveledManifest @@ -330,6 +331,19 @@ public class LeveledManifest return new CompactionCandidate(unrepairedMostInterresting, 0, Long.MAX_VALUE); } } + + // during bootstrap we only do size tiering in L0 to make sure + // the streamed files can be placed in their original levels + if (StorageService.instance.isBootstrapMode()) + { + List<SSTableReader> mostInteresting = getSSTablesForSTCS(getLevel(0)); + if (!mostInteresting.isEmpty()) + { + logger.info("Bootstrapping - doing STCS in L0"); + return new CompactionCandidate(mostInteresting, 0, Long.MAX_VALUE); + } + return null; + } // LevelDB gives each level a score of how much data it contains vs its ideal amount, and // compacts the level with the highest score. But this falls apart spectacularly once you // get behind. Consider this set of levels: http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de0b8c0/src/java/org/apache/cassandra/dht/RangeStreamer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/RangeStreamer.java b/src/java/org/apache/cassandra/dht/RangeStreamer.java index be58d77..388834f 100644 --- a/src/java/org/apache/cassandra/dht/RangeStreamer.java +++ b/src/java/org/apache/cassandra/dht/RangeStreamer.java @@ -109,16 +109,12 @@ public class RangeStreamer this.tokens = tokens; this.address = address; this.description = description; - this.streamPlan = new StreamPlan(description); + this.streamPlan = new StreamPlan(description, true); } public RangeStreamer(TokenMetadata metadata, InetAddress address, String description) { - this.metadata = metadata; - this.tokens = null; - this.address = address; - this.description = description; - this.streamPlan = new StreamPlan(description); + this(metadata, null, address, description); } public void addSourceFilter(ISourceFilter filter) http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de0b8c0/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java index fbd583c..991fa1d 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java @@ -157,7 +157,7 @@ public class SSTableLoader implements StreamEventHandler client.init(keyspace); outputHandler.output("Established connection to initial hosts"); - StreamPlan plan = new StreamPlan("Bulk Load", 0, connectionsPerHost).connectionFactory(client.getConnectionFactory()); + StreamPlan plan = new StreamPlan("Bulk Load", 0, connectionsPerHost, false).connectionFactory(client.getConnectionFactory()); Map<InetAddress, Collection<Range<Token>>> endpointToRanges = client.getEndpointToRangesMap(); openSSTables(endpointToRanges); http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de0b8c0/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java index b5e7d02..ef8cd51 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java @@ -62,6 +62,7 @@ import org.apache.cassandra.io.util.FileMark; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.io.util.SegmentedFile; import org.apache.cassandra.io.util.SequentialWriter; +import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FilterFactory; @@ -84,14 +85,19 @@ public class SSTableWriter extends SSTable private final MetadataCollector sstableMetadataCollector; private final long repairedAt; - public SSTableWriter(String filename, long keyCount, long repairedAt) + public SSTableWriter(String filename, long keyCount, long repairedAt, int sstableLevel) { this(filename, keyCount, repairedAt, Schema.instance.getCFMetaData(Descriptor.fromFilename(filename)), StorageService.getPartitioner(), - new MetadataCollector(Schema.instance.getCFMetaData(Descriptor.fromFilename(filename)).comparator)); + new MetadataCollector(Schema.instance.getCFMetaData(Descriptor.fromFilename(filename)).comparator).sstableLevel(sstableLevel)); + } + + public SSTableWriter(String filename, long keyCount) + { + this(filename, keyCount, ActiveRepairService.UNREPAIRED_SSTABLE, 0); } private static Set<Component> components(CFMetaData metadata) http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de0b8c0/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java b/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java index 003bbf9..de18d50 100644 --- a/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java +++ b/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java @@ -62,7 +62,7 @@ public class IncomingStreamingConnection extends Thread // The receiving side distinguish two connections by looking at StreamInitMessage#isForOutgoing. // Note: we cannot use the same socket for incoming and outgoing streams because we want to // parallelize said streams and the socket is blocking, so we might deadlock. - StreamResultFuture.initReceivingSide(init.sessionIndex, init.planId, init.description, init.from, socket, init.isForOutgoing, version); + StreamResultFuture.initReceivingSide(init.sessionIndex, init.planId, init.description, init.from, socket, init.isForOutgoing, version, init.keepSSTableLevel); } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de0b8c0/src/java/org/apache/cassandra/repair/LocalSyncTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/LocalSyncTask.java b/src/java/org/apache/cassandra/repair/LocalSyncTask.java index 38f63ce..b34c508 100644 --- a/src/java/org/apache/cassandra/repair/LocalSyncTask.java +++ b/src/java/org/apache/cassandra/repair/LocalSyncTask.java @@ -57,7 +57,7 @@ public class LocalSyncTask extends SyncTask implements StreamEventHandler InetAddress dst = r2.endpoint.equals(local) ? r1.endpoint : r2.endpoint; logger.info(String.format("[repair #%s] Performing streaming repair of %d ranges with %s", desc.sessionId, differences.size(), dst)); - new StreamPlan("Repair", repairedAt, 1).listeners(this) + new StreamPlan("Repair", repairedAt, 1, false).listeners(this) .flushBeforeTransfer(true) // request ranges from the remote node .requestRanges(dst, desc.keyspace, differences, desc.columnFamily) http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de0b8c0/src/java/org/apache/cassandra/repair/StreamingRepairTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java index f30eb6f..1472720 100644 --- a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java +++ b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java @@ -50,7 +50,7 @@ public class StreamingRepairTask implements Runnable, StreamEventHandler public void run() { logger.info(String.format("[streaming task #%s] Performing streaming repair of %d ranges with %s", desc.sessionId, request.ranges.size(), request.dst)); - new StreamPlan("Repair", repairedAt, 1).listeners(this) + new StreamPlan("Repair", repairedAt, 1, false).listeners(this) .flushBeforeTransfer(true) // request ranges from the remote node .requestRanges(request.dst, desc.keyspace, request.ranges, desc.columnFamily) http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de0b8c0/src/java/org/apache/cassandra/streaming/ConnectionHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java index 6092046..7a7ccbf 100644 --- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java +++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java @@ -179,7 +179,8 @@ public class ConnectionHandler session.sessionIndex(), session.planId(), session.description(), - isForOutgoing); + isForOutgoing, + session.keepSSTableLevel()); ByteBuffer messageBuf = message.createMessage(false, protocolVersion); getWriteChannel(socket).write(messageBuf); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de0b8c0/src/java/org/apache/cassandra/streaming/StreamCoordinator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java index 71a853c..130bd45 100644 --- a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java +++ b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java @@ -45,11 +45,13 @@ public class StreamCoordinator private Map<InetAddress, HostStreamingData> peerSessions = new HashMap<>(); private final int connectionsPerHost; private StreamConnectionFactory factory; + private final boolean keepSSTableLevel; - public StreamCoordinator(int connectionsPerHost, StreamConnectionFactory factory) + public StreamCoordinator(int connectionsPerHost, boolean keepSSTableLevel, StreamConnectionFactory factory) { this.connectionsPerHost = connectionsPerHost; this.factory = factory; + this.keepSSTableLevel = keepSSTableLevel; } public void setConnectionFactory(StreamConnectionFactory factory) @@ -233,7 +235,7 @@ public class StreamCoordinator // create if (streamSessions.size() < connectionsPerHost) { - StreamSession session = new StreamSession(peer, factory, streamSessions.size()); + StreamSession session = new StreamSession(peer, factory, streamSessions.size(), keepSSTableLevel); streamSessions.put(++lastReturned, session); return session; } @@ -265,7 +267,7 @@ public class StreamCoordinator StreamSession session = streamSessions.get(id); if (session == null) { - session = new StreamSession(peer, factory, id); + session = new StreamSession(peer, factory, id, keepSSTableLevel); streamSessions.put(id, session); } return session; http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de0b8c0/src/java/org/apache/cassandra/streaming/StreamPlan.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamPlan.java b/src/java/org/apache/cassandra/streaming/StreamPlan.java index ca448a3..5aa1bc6 100644 --- a/src/java/org/apache/cassandra/streaming/StreamPlan.java +++ b/src/java/org/apache/cassandra/streaming/StreamPlan.java @@ -47,14 +47,19 @@ public class StreamPlan */ public StreamPlan(String description) { - this(description, ActiveRepairService.UNREPAIRED_SSTABLE, 1); + this(description, ActiveRepairService.UNREPAIRED_SSTABLE, 1, false); } - public StreamPlan(String description, long repairedAt, int connectionsPerHost) + public StreamPlan(String description, boolean keepSSTableLevels) + { + this(description, ActiveRepairService.UNREPAIRED_SSTABLE, 1, keepSSTableLevels); + } + + public StreamPlan(String description, long repairedAt, int connectionsPerHost, boolean keepSSTableLevels) { this.description = description; this.repairedAt = repairedAt; - this.coordinator = new StreamCoordinator(connectionsPerHost, new DefaultConnectionFactory()); + this.coordinator = new StreamCoordinator(connectionsPerHost, keepSSTableLevels, new DefaultConnectionFactory()); } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de0b8c0/src/java/org/apache/cassandra/streaming/StreamReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java b/src/java/org/apache/cassandra/streaming/StreamReader.java index b6e1aaf..34cbf02 100644 --- a/src/java/org/apache/cassandra/streaming/StreamReader.java +++ b/src/java/org/apache/cassandra/streaming/StreamReader.java @@ -58,6 +58,7 @@ public class StreamReader protected final StreamSession session; protected final Descriptor.Version inputVersion; protected final long repairedAt; + protected final int sstableLevel; protected Descriptor desc; @@ -69,6 +70,7 @@ public class StreamReader this.sections = header.sections; this.inputVersion = new Descriptor.Version(header.version); this.repairedAt = header.repairedAt; + this.sstableLevel = header.sstableLevel; } /** @@ -78,7 +80,7 @@ public class StreamReader */ public SSTableWriter read(ReadableByteChannel channel) throws IOException { - logger.debug("reading file from {}, repairedAt = {}", session.peer, repairedAt); + logger.debug("reading file from {}, repairedAt = {}, level = {}", session.peer, repairedAt, sstableLevel); long totalSize = totalSize(); Pair<String, String> kscf = Schema.instance.getCF(cfId); @@ -119,8 +121,7 @@ public class StreamReader if (localDir == null) throw new IOException("Insufficient disk space to store " + totalSize + " bytes"); desc = Descriptor.fromFilename(cfs.getTempSSTablePath(cfs.directories.getLocationForDisk(localDir))); - - return new SSTableWriter(desc.filenameFor(Component.DATA), estimatedKeys, repairedAt); + return new SSTableWriter(desc.filenameFor(Component.DATA), estimatedKeys, repairedAt, sstableLevel); } protected void drain(InputStream dis, long bytesRead) throws IOException http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de0b8c0/src/java/org/apache/cassandra/streaming/StreamResultFuture.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java index f28a937..b8a5234 100644 --- a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java +++ b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java @@ -69,9 +69,9 @@ public final class StreamResultFuture extends AbstractFuture<StreamState> set(getCurrentState()); } - private StreamResultFuture(UUID planId, String description) + private StreamResultFuture(UUID planId, String description, boolean keepSSTableLevels) { - this(planId, description, new StreamCoordinator(0, new DefaultConnectionFactory())); + this(planId, description, new StreamCoordinator(0, keepSSTableLevels, new DefaultConnectionFactory())); } static StreamResultFuture init(UUID planId, String description, Collection<StreamEventHandler> listeners, StreamCoordinator coordinator) @@ -101,7 +101,8 @@ public final class StreamResultFuture extends AbstractFuture<StreamState> InetAddress from, Socket socket, boolean isForOutgoing, - int version) throws IOException + int version, + boolean keepSSTableLevel) throws IOException { StreamResultFuture future = StreamManager.instance.getReceivingStream(planId); if (future == null) @@ -109,7 +110,7 @@ public final class StreamResultFuture extends AbstractFuture<StreamState> logger.info("[Stream #{} ID#{}] Creating new streaming plan for {}", planId, sessionIndex, description); // The main reason we create a StreamResultFuture on the receiving side is for JMX exposure. - future = new StreamResultFuture(planId, description); + future = new StreamResultFuture(planId, description, keepSSTableLevel); StreamManager.instance.registerReceiving(future); } future.attachSocket(from, sessionIndex, socket, isForOutgoing, version); http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de0b8c0/src/java/org/apache/cassandra/streaming/StreamSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java index 2efa00d..560a9fa 100644 --- a/src/java/org/apache/cassandra/streaming/StreamSession.java +++ b/src/java/org/apache/cassandra/streaming/StreamSession.java @@ -133,6 +133,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber private int retries; private AtomicBoolean isAborted = new AtomicBoolean(false); + private final boolean keepSSTableLevel; public static enum State { @@ -153,13 +154,14 @@ public class StreamSession implements IEndpointStateChangeSubscriber * @param peer Address of streaming peer * @param factory is used for establishing connection */ - public StreamSession(InetAddress peer, StreamConnectionFactory factory, int index) + public StreamSession(InetAddress peer, StreamConnectionFactory factory, int index, boolean keepSSTableLevel) { this.peer = peer; this.index = index; this.factory = factory; this.handler = new ConnectionHandler(this); this.metrics = StreamingMetrics.get(peer); + this.keepSSTableLevel = keepSSTableLevel; } public UUID planId() @@ -177,6 +179,11 @@ public class StreamSession implements IEndpointStateChangeSubscriber return streamResult == null ? null : streamResult.description; } + public boolean keepSSTableLevel() + { + return keepSSTableLevel; + } + /** * Bind this session to report to specific {@link StreamResultFuture} and * perform pre-streaming initialization. http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de0b8c0/src/java/org/apache/cassandra/streaming/StreamTransferTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java index b840ee5..18058c1 100644 --- a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java +++ b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java @@ -50,7 +50,7 @@ public class StreamTransferTask extends StreamTask public synchronized void addTransferFile(SSTableReader sstable, long estimatedKeys, List<Pair<Long, Long>> sections, long repairedAt) { assert sstable != null && cfId.equals(sstable.metadata.cfId); - OutgoingFileMessage message = new OutgoingFileMessage(sstable, sequenceNumber.getAndIncrement(), estimatedKeys, sections, repairedAt); + OutgoingFileMessage message = new OutgoingFileMessage(sstable, sequenceNumber.getAndIncrement(), estimatedKeys, sections, repairedAt, session.keepSSTableLevel()); files.put(message.header.sequenceNumber, message); totalSize += message.header.size(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de0b8c0/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java index 284820e..5e378bc 100644 --- a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java +++ b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java @@ -47,6 +47,7 @@ public class FileMessageHeader public final List<Pair<Long, Long>> sections; public final CompressionInfo compressionInfo; public final long repairedAt; + public final int sstableLevel; public FileMessageHeader(UUID cfId, int sequenceNumber, @@ -54,7 +55,8 @@ public class FileMessageHeader long estimatedKeys, List<Pair<Long, Long>> sections, CompressionInfo compressionInfo, - long repairedAt) + long repairedAt, + int sstableLevel) { this.cfId = cfId; this.sequenceNumber = sequenceNumber; @@ -63,6 +65,7 @@ public class FileMessageHeader this.sections = sections; this.compressionInfo = compressionInfo; this.repairedAt = repairedAt; + this.sstableLevel = sstableLevel; } /** @@ -96,6 +99,7 @@ public class FileMessageHeader sb.append(", transfer size: ").append(size()); sb.append(", compressed?: ").append(compressionInfo != null); sb.append(", repairedAt: ").append(repairedAt); + sb.append(", level: ").append(sstableLevel); sb.append(')'); return sb.toString(); } @@ -134,6 +138,7 @@ public class FileMessageHeader } CompressionInfo.serializer.serialize(header.compressionInfo, out, version); out.writeLong(header.repairedAt); + out.writeInt(header.sstableLevel); } public FileMessageHeader deserialize(DataInput in, int version) throws IOException @@ -148,7 +153,8 @@ public class FileMessageHeader sections.add(Pair.create(in.readLong(), in.readLong())); CompressionInfo compressionInfo = CompressionInfo.serializer.deserialize(in, MessagingService.current_version); long repairedAt = in.readLong(); - return new FileMessageHeader(cfId, sequenceNumber, sstableVersion, estimatedKeys, sections, compressionInfo, repairedAt); + int sstableLevel = in.readInt(); + return new FileMessageHeader(cfId, sequenceNumber, sstableVersion, estimatedKeys, sections, compressionInfo, repairedAt, sstableLevel); } public long serializedSize(FileMessageHeader header, int version) @@ -165,6 +171,7 @@ public class FileMessageHeader size += TypeSizes.NATIVE.sizeof(section.right); } size += CompressionInfo.serializer.serializedSize(header.compressionInfo, version); + size += TypeSizes.NATIVE.sizeof(header.sstableLevel); return size; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de0b8c0/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java index 466e2cb..13af987 100644 --- a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java +++ b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java @@ -60,7 +60,7 @@ public class OutgoingFileMessage extends StreamMessage public FileMessageHeader header; public SSTableReader sstable; - public OutgoingFileMessage(SSTableReader sstable, int sequenceNumber, long estimatedKeys, List<Pair<Long, Long>> sections, long repairedAt) + public OutgoingFileMessage(SSTableReader sstable, int sequenceNumber, long estimatedKeys, List<Pair<Long, Long>> sections, long repairedAt, boolean keepSSTableLevel) { super(Type.FILE); this.sstable = sstable; @@ -77,7 +77,8 @@ public class OutgoingFileMessage extends StreamMessage estimatedKeys, sections, compressionInfo, - repairedAt); + repairedAt, + keepSSTableLevel ? sstable.getSSTableLevel() : 0); } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de0b8c0/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java index a9ec4ae..0937f71 100644 --- a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java +++ b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java @@ -46,14 +46,16 @@ public class StreamInitMessage // true if this init message is to connect for outgoing message on receiving side public final boolean isForOutgoing; + public final boolean keepSSTableLevel; - public StreamInitMessage(InetAddress from, int sessionIndex, UUID planId, String description, boolean isForOutgoing) + public StreamInitMessage(InetAddress from, int sessionIndex, UUID planId, String description, boolean isForOutgoing, boolean keepSSTableLevel) { this.from = from; this.sessionIndex = sessionIndex; this.planId = planId; this.description = description; this.isForOutgoing = isForOutgoing; + this.keepSSTableLevel = keepSSTableLevel; } /** @@ -105,6 +107,7 @@ public class StreamInitMessage UUIDSerializer.serializer.serialize(message.planId, out, MessagingService.current_version); out.writeUTF(message.description); out.writeBoolean(message.isForOutgoing); + out.writeBoolean(message.keepSSTableLevel); } public StreamInitMessage deserialize(DataInput in, int version) throws IOException @@ -114,7 +117,8 @@ public class StreamInitMessage UUID planId = UUIDSerializer.serializer.deserialize(in, MessagingService.current_version); String description = in.readUTF(); boolean sentByInitiator = in.readBoolean(); - return new StreamInitMessage(from, sessionIndex, planId, description, sentByInitiator); + boolean keepSSTableLevel = in.readBoolean(); + return new StreamInitMessage(from, sessionIndex, planId, description, sentByInitiator, keepSSTableLevel); } public long serializedSize(StreamInitMessage message, int version) @@ -124,6 +128,7 @@ public class StreamInitMessage size += UUIDSerializer.serializer.serializedSize(message.planId, MessagingService.current_version); size += TypeSizes.NATIVE.sizeof(message.description); size += TypeSizes.NATIVE.sizeof(message.isForOutgoing); + size += TypeSizes.NATIVE.sizeof(message.keepSSTableLevel); return size; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de0b8c0/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java index e527db1..372fdd3 100644 --- a/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java +++ b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java @@ -32,7 +32,7 @@ import org.apache.cassandra.streaming.StreamSession; public abstract class StreamMessage { /** Streaming protocol version */ - public static final int CURRENT_VERSION = 2; + public static final int CURRENT_VERSION = 3; public static void serialize(StreamMessage message, DataOutputStreamAndChannel out, int version, StreamSession session) throws IOException { http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de0b8c0/src/java/org/apache/cassandra/tools/SSTableImport.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/SSTableImport.java b/src/java/org/apache/cassandra/tools/SSTableImport.java index e678aaa..05b9dcb 100644 --- a/src/java/org/apache/cassandra/tools/SSTableImport.java +++ b/src/java/org/apache/cassandra/tools/SSTableImport.java @@ -302,7 +302,7 @@ public class SSTableImport Object[] data = parser.readValueAs(new TypeReference<Object[]>(){}); keyCountToImport = (keyCountToImport == null) ? data.length : keyCountToImport; - SSTableWriter writer = new SSTableWriter(ssTablePath, keyCountToImport, ActiveRepairService.UNREPAIRED_SSTABLE); + SSTableWriter writer = new SSTableWriter(ssTablePath, keyCountToImport); System.out.printf("Importing %s keys...%n", keyCountToImport); @@ -375,7 +375,7 @@ public class SSTableImport System.out.printf("Importing %s keys...%n", keyCountToImport); parser = getParser(jsonFile); // renewing parser - SSTableWriter writer = new SSTableWriter(ssTablePath, keyCountToImport, ActiveRepairService.UNREPAIRED_SSTABLE); + SSTableWriter writer = new SSTableWriter(ssTablePath, keyCountToImport); int lineNumber = 1; DecoratedKey prevStoredKey = null; http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de0b8c0/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java index 157f89b..57c9477 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java @@ -211,7 +211,7 @@ public class SSTableUtils public SSTableReader write(int expectedSize, Appender appender) throws IOException { File datafile = (dest == null) ? tempSSTableFile(ksname, cfname, generation) : new File(dest.filenameFor(Component.DATA)); - SSTableWriter writer = new SSTableWriter(datafile.getAbsolutePath(), expectedSize, ActiveRepairService.UNREPAIRED_SSTABLE); + SSTableWriter writer = new SSTableWriter(datafile.getAbsolutePath(), expectedSize); while (appender.append(writer)) { /* pass */ } SSTableReader reader = writer.closeAndOpenReader(); // mark all components for removal http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de0b8c0/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java index 16fa77b..d84f9b7 100644 --- a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java +++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java @@ -63,7 +63,7 @@ public class StreamTransferTaskTest String ks = KEYSPACE1; String cf = "Standard1"; - StreamSession session = new StreamSession(FBUtilities.getBroadcastAddress(), null, 0); + StreamSession session = new StreamSession(FBUtilities.getBroadcastAddress(), null, 0, true); ColumnFamilyStore cfs = Keyspace.open(ks).getColumnFamilyStore(cf); // create two sstables http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de0b8c0/test/unit/org/apache/cassandra/tools/SSTableExportTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/tools/SSTableExportTest.java b/test/unit/org/apache/cassandra/tools/SSTableExportTest.java index 2009c0c..f93e168 100644 --- a/test/unit/org/apache/cassandra/tools/SSTableExportTest.java +++ b/test/unit/org/apache/cassandra/tools/SSTableExportTest.java @@ -97,7 +97,7 @@ public class SSTableExportTest { File tempSS = tempSSTableFile(KEYSPACE1, "Standard1"); ColumnFamily cfamily = ArrayBackedSortedColumns.factory.create(KEYSPACE1, "Standard1"); - SSTableWriter writer = new SSTableWriter(tempSS.getPath(), 2, ActiveRepairService.UNREPAIRED_SSTABLE); + SSTableWriter writer = new SSTableWriter(tempSS.getPath(), 2); // Add rowA cfamily.addColumn(Util.cellname("colA"), ByteBufferUtil.bytes("valA"), System.currentTimeMillis()); @@ -134,7 +134,7 @@ public class SSTableExportTest { File tempSS = tempSSTableFile(KEYSPACE1, "Standard1"); ColumnFamily cfamily = ArrayBackedSortedColumns.factory.create(KEYSPACE1, "Standard1"); - SSTableWriter writer = new SSTableWriter(tempSS.getPath(), 2, ActiveRepairService.UNREPAIRED_SSTABLE); + SSTableWriter writer = new SSTableWriter(tempSS.getPath(), 2); int nowInSec = (int)(System.currentTimeMillis() / 1000) + 42; //live for 42 seconds // Add rowA @@ -191,7 +191,7 @@ public class SSTableExportTest ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore("Standard1"); File tempSS = tempSSTableFile(KEYSPACE1, "Standard1"); ColumnFamily cfamily = ArrayBackedSortedColumns.factory.create(KEYSPACE1, "Standard1"); - SSTableWriter writer = new SSTableWriter(tempSS.getPath(), 2, ActiveRepairService.UNREPAIRED_SSTABLE); + SSTableWriter writer = new SSTableWriter(tempSS.getPath(), 2); // Add rowA cfamily.addColumn(Util.cellname("name"), ByteBufferUtil.bytes("val"), System.currentTimeMillis()); @@ -231,7 +231,7 @@ public class SSTableExportTest { File tempSS = tempSSTableFile(KEYSPACE1, "Counter1"); ColumnFamily cfamily = ArrayBackedSortedColumns.factory.create(KEYSPACE1, "Counter1"); - SSTableWriter writer = new SSTableWriter(tempSS.getPath(), 2, ActiveRepairService.UNREPAIRED_SSTABLE); + SSTableWriter writer = new SSTableWriter(tempSS.getPath(), 2); // Add rowA cfamily.addColumn(BufferCounterCell.createLocal(Util.cellname("colA"), 42, System.currentTimeMillis(), Long.MIN_VALUE)); @@ -263,7 +263,7 @@ public class SSTableExportTest { File tempSS = tempSSTableFile(KEYSPACE1, "ValuesWithQuotes"); ColumnFamily cfamily = ArrayBackedSortedColumns.factory.create(KEYSPACE1, "ValuesWithQuotes"); - SSTableWriter writer = new SSTableWriter(tempSS.getPath(), 2, ActiveRepairService.UNREPAIRED_SSTABLE); + SSTableWriter writer = new SSTableWriter(tempSS.getPath(), 2); // Add rowA cfamily.addColumn(new BufferCell(Util.cellname("data"), UTF8Type.instance.fromString("{\"foo\":\"bar\"}"))); @@ -295,7 +295,7 @@ public class SSTableExportTest { File tempSS = tempSSTableFile(KEYSPACE1, "Standard1"); ColumnFamily cfamily = ArrayBackedSortedColumns.factory.create(KEYSPACE1, "Standard1"); - SSTableWriter writer = new SSTableWriter(tempSS.getPath(), 2, ActiveRepairService.UNREPAIRED_SSTABLE); + SSTableWriter writer = new SSTableWriter(tempSS.getPath(), 2); // Add rowA cfamily.addColumn(Util.cellname("colName"), ByteBufferUtil.bytes("val"), System.currentTimeMillis()); @@ -357,7 +357,7 @@ public class SSTableExportTest { File tempSS = tempSSTableFile(KEYSPACE1, "UUIDKeys"); ColumnFamily cfamily = ArrayBackedSortedColumns.factory.create(KEYSPACE1, "UUIDKeys"); - SSTableWriter writer = new SSTableWriter(tempSS.getPath(), 2, ActiveRepairService.UNREPAIRED_SSTABLE); + SSTableWriter writer = new SSTableWriter(tempSS.getPath(), 2); // Add a row cfamily.addColumn(column(CFMetaData.DEFAULT_KEY_ALIAS, "not a uuid", 1L)); @@ -387,7 +387,7 @@ public class SSTableExportTest { File tempSS = tempSSTableFile(KEYSPACE1, "AsciiKeys"); ColumnFamily cfamily = ArrayBackedSortedColumns.factory.create(KEYSPACE1, "AsciiKeys"); - SSTableWriter writer = new SSTableWriter(tempSS.getPath(), 2, ActiveRepairService.UNREPAIRED_SSTABLE); + SSTableWriter writer = new SSTableWriter(tempSS.getPath(), 2, ActiveRepairService.UNREPAIRED_SSTABLE, 0); // Add a row cfamily.addColumn(column("column", "value", 1L));