Create compression chunk when sending file only (patch by yukim; reviewed by Paulo Motta for CASSANDRA-10680)
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8385bb63 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8385bb63 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8385bb63 Branch: refs/heads/cassandra-3.1 Commit: 8385bb639ad8a6a86393a05813fd9b0b45876a2e Parents: 9b97766 Author: Yuki Morishita <yu...@apache.org> Authored: Mon Nov 9 23:01:31 2015 -0600 Committer: Yuki Morishita <yu...@apache.org> Committed: Tue Nov 17 17:58:37 2015 -0600 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../io/compress/CompressionMetadata.java | 30 ++++++++++++ .../streaming/messages/FileMessageHeader.java | 49 ++++++++++++++++++-- .../streaming/messages/IncomingFileMessage.java | 2 +- .../streaming/messages/OutgoingFileMessage.java | 15 ++---- .../compress/CompressedInputStreamTest.java | 7 +++ 6 files changed, 87 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/8385bb63/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 08db386..008d4d4 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.1.12 + * Create compression chunk for sending file only (CASSANDRA-10680) * Make buffered read size configurable (CASSANDRA-10249) * Forbid compact clustering column type changes in ALTER TABLE (CASSANDRA-8879) * Reject incremental repair with subrange repair (CASSANDRA-10422) http://git-wip-us.apache.org/repos/asf/cassandra/blob/8385bb63/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java index 1dc2df3..0de69a6 100644 --- 
a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java +++ b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java @@ -229,6 +229,36 @@ public class CompressionMetadata } /** + * @param sections Collection of sections in uncompressed file. Should not contain sections that overlap each other. + * @return Total chunk size in bytes for given sections including checksum. + */ + public long getTotalSizeForSections(Collection<Pair<Long, Long>> sections) + { + long size = 0; + long lastOffset = -1; + for (Pair<Long, Long> section : sections) + { + int startIndex = (int) (section.left / parameters.chunkLength()); + int endIndex = (int) (section.right / parameters.chunkLength()); + endIndex = section.right % parameters.chunkLength() == 0 ? endIndex - 1 : endIndex; + for (int i = startIndex; i <= endIndex; i++) + { + long offset = i * 8L; + long chunkOffset = chunkOffsets.getLong(offset); + if (chunkOffset > lastOffset) + { + lastOffset = chunkOffset; + long nextChunkOffset = offset + 8 == chunkOffsetsSize + ? 
compressedFileLength + : chunkOffsets.getLong(offset + 8); + size += (nextChunkOffset - chunkOffset); + } + } + } + return size; + } + + /** * @param sections Collection of sections in uncompressed file * @return Array of chunks which corresponds to given sections of uncompressed file, sorted by chunk offset */ http://git-wip-us.apache.org/repos/asf/cassandra/blob/8385bb63/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java index 284820e..34d9a01 100644 --- a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java +++ b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java @@ -37,7 +37,7 @@ import org.apache.cassandra.utils.UUIDSerializer; */ public class FileMessageHeader { - public static IVersionedSerializer<FileMessageHeader> serializer = new FileMessageHeaderSerializer(); + public static FileMessageHeaderSerializer serializer = new FileMessageHeaderSerializer(); public final UUID cfId; public final int sequenceNumber; @@ -45,7 +45,13 @@ public class FileMessageHeader public final String version; public final long estimatedKeys; public final List<Pair<Long, Long>> sections; + /** + * Compression info for SSTable to send. Can be null if SSTable is not compressed. + * On sender, this field is always null to avoid holding large number of Chunks. + * Use compressionMetadata instead. 
+ */ public final CompressionInfo compressionInfo; + private final CompressionMetadata compressionMetadata; public final long repairedAt; public FileMessageHeader(UUID cfId, @@ -62,9 +68,33 @@ public class FileMessageHeader this.estimatedKeys = estimatedKeys; this.sections = sections; this.compressionInfo = compressionInfo; + this.compressionMetadata = null; + this.repairedAt = repairedAt; + } + + public FileMessageHeader(UUID cfId, + int sequenceNumber, + String version, + long estimatedKeys, + List<Pair<Long, Long>> sections, + CompressionMetadata compressionMetadata, + long repairedAt) + { + this.cfId = cfId; + this.sequenceNumber = sequenceNumber; + this.version = version; + this.estimatedKeys = estimatedKeys; + this.sections = sections; + this.compressionInfo = null; + this.compressionMetadata = compressionMetadata; this.repairedAt = repairedAt; } + public boolean isCompressed() + { + return compressionInfo != null || compressionMetadata != null; + } + /** * @return total file size to transfer in bytes */ @@ -77,6 +107,10 @@ public class FileMessageHeader for (CompressionMetadata.Chunk chunk : compressionInfo.chunks) size += chunk.length + 4; // 4 bytes for CRC } + else if (compressionMetadata != null) + { + size = compressionMetadata.getTotalSizeForSections(sections); + } else { for (Pair<Long, Long> section : sections) @@ -94,7 +128,7 @@ public class FileMessageHeader sb.append(", version: ").append(version); sb.append(", estimated keys: ").append(estimatedKeys); sb.append(", transfer size: ").append(size()); - sb.append(", compressed?: ").append(compressionInfo != null); + sb.append(", compressed?: ").append(isCompressed()); sb.append(", repairedAt: ").append(repairedAt); sb.append(')'); return sb.toString(); @@ -117,9 +151,9 @@ public class FileMessageHeader return result; } - static class FileMessageHeaderSerializer implements IVersionedSerializer<FileMessageHeader> + static class FileMessageHeaderSerializer { - public void serialize(FileMessageHeader 
header, DataOutputPlus out, int version) throws IOException + public CompressionInfo serialize(FileMessageHeader header, DataOutputPlus out, int version) throws IOException { UUIDSerializer.serializer.serialize(header.cfId, out, version); out.writeInt(header.sequenceNumber); @@ -132,8 +166,13 @@ public class FileMessageHeader out.writeLong(section.left); out.writeLong(section.right); } - CompressionInfo.serializer.serialize(header.compressionInfo, out, version); + // construct CompressionInfo here to avoid holding large number of Chunks on heap. + CompressionInfo compressionInfo = null; + if (header.compressionMetadata != null) + compressionInfo = new CompressionInfo(header.compressionMetadata.getChunksForSections(header.sections), header.compressionMetadata.parameters); + CompressionInfo.serializer.serialize(compressionInfo, out, version); out.writeLong(header.repairedAt); + return compressionInfo; } public FileMessageHeader deserialize(DataInput in, int version) throws IOException http://git-wip-us.apache.org/repos/asf/cassandra/blob/8385bb63/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java b/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java index 494af85..cb39275 100644 --- a/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java +++ b/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java @@ -40,7 +40,7 @@ public class IncomingFileMessage extends StreamMessage { DataInputStream input = new DataInputStream(Channels.newInputStream(in)); FileMessageHeader header = FileMessageHeader.serializer.deserialize(input, version); - StreamReader reader = header.compressionInfo == null ? new StreamReader(header, session) + StreamReader reader = !header.isCompressed() ? 
new StreamReader(header, session) : new CompressedStreamReader(header, session); try http://git-wip-us.apache.org/repos/asf/cassandra/blob/8385bb63/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java index 082e306..71902e1 100644 --- a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java +++ b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java @@ -63,18 +63,12 @@ public class OutgoingFileMessage extends StreamMessage SSTableReader sstable = ref.get(); filename = sstable.getFilename(); - CompressionInfo compressionInfo = null; - if (sstable.compression) - { - CompressionMetadata meta = sstable.getCompressionMetadata(); - compressionInfo = new CompressionInfo(meta.getChunksForSections(sections), meta.parameters); - } this.header = new FileMessageHeader(sstable.metadata.cfId, sequenceNumber, sstable.descriptor.version.toString(), estimatedKeys, sections, - compressionInfo, + sstable.compression ? sstable.getCompressionMetadata() : null, repairedAt); } @@ -85,13 +79,12 @@ public class OutgoingFileMessage extends StreamMessage return; } - FileMessageHeader.serializer.serialize(header, out, version); + CompressionInfo compressionInfo = FileMessageHeader.serializer.serialize(header, out, version); final SSTableReader reader = ref.get(); - StreamWriter writer = header.compressionInfo == null ? + StreamWriter writer = compressionInfo == null ? 
new StreamWriter(reader, header.sections, session) : - new CompressedStreamWriter(reader, header.sections, - header.compressionInfo, session); + new CompressedStreamWriter(reader, header.sections, compressionInfo, session); writer.write(out.getChannel()); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/8385bb63/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java index 42a83a0..f3007da 100644 --- a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java +++ b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java @@ -37,6 +37,8 @@ import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.metadata.MetadataCollector; import org.apache.cassandra.utils.Pair; +import static org.junit.Assert.assertEquals; + /** */ public class CompressedInputStreamTest @@ -86,6 +88,11 @@ public class CompressedInputStreamTest sections.add(Pair.create(position, position + 8)); } CompressionMetadata.Chunk[] chunks = comp.getChunksForSections(sections); + long totalSize = comp.getTotalSizeForSections(sections); + long expectedSize = 0; + for (CompressionMetadata.Chunk c : chunks) + expectedSize += c.length + 4; + assertEquals(expectedSize, totalSize); // buffer up only relevant parts of file int size = 0;