Merge branch 'cassandra-2.1' into cassandra-2.2
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ae64cc0d Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ae64cc0d Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ae64cc0d Branch: refs/heads/cassandra-2.2 Commit: ae64cc0da4b83289a88478c48ae73f608990a0ec Parents: afe3fe3 8385bb6 Author: Yuki Morishita <yu...@apache.org> Authored: Tue Nov 17 18:07:42 2015 -0600 Committer: Yuki Morishita <yu...@apache.org> Committed: Tue Nov 17 18:07:42 2015 -0600 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../io/compress/CompressionMetadata.java | 30 +++++++++++ .../streaming/messages/FileMessageHeader.java | 53 ++++++++++++++++++-- .../streaming/messages/IncomingFileMessage.java | 2 +- .../streaming/messages/OutgoingFileMessage.java | 14 ++---- .../compress/CompressedInputStreamTest.java | 7 +++ 6 files changed, 91 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/ae64cc0d/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index 4cb9275,008d4d4..572afc2 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,17 -1,5 +1,18 @@@ -2.1.12 +2.2.4 + * Don't do anticompaction after subrange repair (CASSANDRA-10422) + * Fix SimpleDateType type compatibility (CASSANDRA-10027) + * (Hadoop) fix splits calculation (CASSANDRA-10640) + * (Hadoop) ensure that Cluster instances are always closed (CASSANDRA-10058) + * (cqlsh) show partial trace if incomplete after max_trace_wait (CASSANDRA-7645) + * Use most up-to-date version of schema for system tables (CASSANDRA-10652) + * Deprecate memory_allocator in cassandra.yaml (CASSANDRA-10581,10628) + * Expose phi values from failure detector via JMX and tweak debug + and trace logging (CASSANDRA-9526) + * Fix RangeNamesQueryPager (CASSANDRA-10509) + * Deprecate Pig support (CASSANDRA-10542) + * Reduce contention getting instances of CompositeType (CASSANDRA-10433) +Merged from 2.1: + * Create compression chunk for sending file only (CASSANDRA-10680) * Make buffered read size configurable (CASSANDRA-10249) * Forbid compact clustering column type changes in ALTER TABLE (CASSANDRA-8879) * Reject incremental repair with subrange repair (CASSANDRA-10422) http://git-wip-us.apache.org/repos/asf/cassandra/blob/ae64cc0d/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/ae64cc0d/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java index e9c99fe,34d9a01..e9a727f --- a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java +++ b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java @@@ -44,14 -43,16 +44,20 @@@ public class FileMessageHeade public final int sequenceNumber; /** SSTable version */ public final String version; + + /** SSTable format **/ + public final SSTableFormat.Type format; public final long estimatedKeys; public final List<Pair<Long, Long>> sections; + /** + * Compression info for SSTable to send. Can be null if SSTable is not compressed. + * On sender, this field is always null to avoid holding large number of Chunks. + * Use compressionMetadata instead. + */ public final CompressionInfo compressionInfo; + private final CompressionMetadata compressionMetadata; public final long repairedAt; + public final int sstableLevel; public FileMessageHeader(UUID cfId, int sequenceNumber, @@@ -70,10 -68,33 +76,38 @@@ this.estimatedKeys = estimatedKeys; this.sections = sections; this.compressionInfo = compressionInfo; + this.compressionMetadata = null; + this.repairedAt = repairedAt; ++ this.sstableLevel = sstableLevel; + } + + public FileMessageHeader(UUID cfId, + int sequenceNumber, + String version, ++ SSTableFormat.Type format, + long estimatedKeys, + List<Pair<Long, Long>> sections, + CompressionMetadata compressionMetadata, - long repairedAt) ++ long repairedAt, ++ int sstableLevel) + { + this.cfId = cfId; + this.sequenceNumber = sequenceNumber; + this.version = version; ++ this.format = format; + this.estimatedKeys = estimatedKeys; + this.sections = sections; + this.compressionInfo = null; + this.compressionMetadata = compressionMetadata; this.repairedAt = repairedAt; + this.sstableLevel = sstableLevel; } + public boolean isCompressed() + { + return compressionInfo != null || compressionMetadata != null; + } + /** * @return total file size to transfer in bytes */ @@@ -101,12 -126,10 +139,12 @@@ sb.append("cfId: ").append(cfId); sb.append(", #").append(sequenceNumber); sb.append(", version: ").append(version); + sb.append(", format: ").append(format); sb.append(", estimated keys: ").append(estimatedKeys); sb.append(", transfer size: ").append(size()); - sb.append(", compressed?: ").append(compressionInfo != null); + sb.append(", compressed?: ").append(isCompressed()); sb.append(", repairedAt: ").append(repairedAt); + sb.append(", level: ").append(sstableLevel); sb.append(')'); return sb.toString(); } @@@ -150,9 -166,13 +188,14 @@@ out.writeLong(section.left); out.writeLong(section.right); } - CompressionInfo.serializer.serialize(header.compressionInfo, out, version); + // construct CompressionInfo here to avoid holding large number of Chunks on heap. + CompressionInfo compressionInfo = null; + if (header.compressionMetadata != null) + compressionInfo = new CompressionInfo(header.compressionMetadata.getChunksForSections(header.sections), header.compressionMetadata.parameters); + CompressionInfo.serializer.serialize(compressionInfo, out, version); out.writeLong(header.repairedAt); + out.writeInt(header.sstableLevel); + return compressionInfo; } public FileMessageHeader deserialize(DataInput in, int version) throws IOException http://git-wip-us.apache.org/repos/asf/cassandra/blob/ae64cc0d/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/ae64cc0d/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java index 5b34bd8,71902e1..c8175ea --- a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java +++ b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java @@@ -65,35 -66,26 +65,29 @@@ public class OutgoingFileMessage extend this.header = new FileMessageHeader(sstable.metadata.cfId, sequenceNumber, sstable.descriptor.version.toString(), + sstable.descriptor.formatType, estimatedKeys, sections, - compressionInfo, + sstable.compression ? sstable.getCompressionMetadata() : null, - repairedAt); + repairedAt, + keepSSTableLevel ? sstable.getSSTableLevel() : 0); } - public synchronized void serialize(DataOutputStreamAndChannel out, int version, StreamSession session) throws IOException + public synchronized void serialize(DataOutputStreamPlus out, int version, StreamSession session) throws IOException { if (completed) { return; } - FileMessageHeader.serializer.serialize(header, out, version); + CompressionInfo compressionInfo = FileMessageHeader.serializer.serialize(header, out, version); final SSTableReader reader = ref.get(); - StreamWriter writer = header.compressionInfo == null ? + StreamWriter writer = compressionInfo == null ? new StreamWriter(reader, header.sections, session) : - new CompressedStreamWriter(reader, header.sections, compressionInfo, session); - writer.write(out.getChannel()); + new CompressedStreamWriter(reader, header.sections, - header.compressionInfo, session); ++ compressionInfo, session); + writer.write(out); } public synchronized void complete() http://git-wip-us.apache.org/repos/asf/cassandra/blob/ae64cc0d/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java ----------------------------------------------------------------------