Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5cc02dd9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5cc02dd9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5cc02dd9

Branch: refs/heads/trunk
Commit: 5cc02dd9ac4e9f081540586769e82e1544532e1e
Parents: 63a9f9b ae64cc0
Author: Yuki Morishita <yu...@apache.org>
Authored: Tue Nov 17 19:31:38 2015 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Nov 17 19:31:38 2015 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../io/compress/CompressionMetadata.java        | 30 +++++++++++
 .../streaming/messages/FileMessageHeader.java   | 55 ++++++++++++++++++--
 .../streaming/messages/IncomingFileMessage.java |  2 +-
 .../streaming/messages/OutgoingFileMessage.java | 14 ++---
 .../compression/CompressedInputStreamTest.java  |  7 +++
 6 files changed, 93 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/5cc02dd9/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index b68cf0d,572afc2..4510462
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -9,7 -3,17 +9,8 @@@
  Merged from 2.2
   * Fix SimpleDateType type compatibility (CASSANDRA-10027)
   * (Hadoop) fix splits calculation (CASSANDRA-10640)
   * (Hadoop) ensure that Cluster instances are always closed (CASSANDRA-10058)
- * (cqlsh) show partial trace if incomplete after max_trace_wait (CASSANDRA-7645)
- * Use most up-to-date version of schema for system tables (CASSANDRA-10652)
- * Deprecate memory_allocator in cassandra.yaml (CASSANDRA-10581,10628)
- * Expose phi values from failure detector via JMX and tweak debug
-   and trace logging (CASSANDRA-9526)
- * Fix RangeNamesQueryPager (CASSANDRA-10509)
- * Deprecate Pig support (CASSANDRA-10542)
- * Reduce contention getting instances of CompositeType (CASSANDRA-10433)
  Merged from 2.1:
+ * Create compression chunk for sending file only (CASSANDRA-10680)
- * Make buffered read size configurable (CASSANDRA-10249)
   * Forbid compact clustering column type changes in ALTER TABLE (CASSANDRA-8879)
   * Reject incremental repair with subrange repair (CASSANDRA-10422)
   * Add a nodetool command to refresh size_estimates (CASSANDRA-9579)


http://git-wip-us.apache.org/repos/asf/cassandra/blob/5cc02dd9/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5cc02dd9/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
index 123b983,e9a727f..e1e13b7
--- a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
+++ b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
@@@ -52,10 -49,15 +52,16 @@@ public class FileMessageHeade
      public final SSTableFormat.Type format;
      public final long estimatedKeys;
      public final List<Pair<Long, Long>> sections;
+     /**
+      * Compression info for SSTable to send. Can be null if SSTable is not compressed.
+      * On sender, this field is always null to avoid holding large number of Chunks.
+      * Use compressionMetadata instead.
+      */
      public final CompressionInfo compressionInfo;
+     private final CompressionMetadata compressionMetadata;
      public final long repairedAt;
      public final int sstableLevel;
 +    public final SerializationHeader.Component header;

      public FileMessageHeader(UUID cfId,
                               int sequenceNumber,
@@@ -75,11 -76,38 +81,41 @@@
          this.estimatedKeys = estimatedKeys;
          this.sections = sections;
          this.compressionInfo = compressionInfo;
+         this.compressionMetadata = null;
+         this.repairedAt = repairedAt;
+         this.sstableLevel = sstableLevel;
++        this.header = header;
+     }
+
+     public FileMessageHeader(UUID cfId,
+                              int sequenceNumber,
-                              String version,
++                             Version version,
+                              SSTableFormat.Type format,
+                              long estimatedKeys,
+                              List<Pair<Long, Long>> sections,
+                              CompressionMetadata compressionMetadata,
+                              long repairedAt,
-                              int sstableLevel)
++                             int sstableLevel,
++                             SerializationHeader.Component header)
+     {
+         this.cfId = cfId;
+         this.sequenceNumber = sequenceNumber;
+         this.version = version;
+         this.format = format;
+         this.estimatedKeys = estimatedKeys;
+         this.sections = sections;
+         this.compressionInfo = null;
+         this.compressionMetadata = compressionMetadata;
          this.repairedAt = repairedAt;
          this.sstableLevel = sstableLevel;
 +        this.header = header;
      }

+     public boolean isCompressed()
+     {
+         return compressionInfo != null || compressionMetadata != null;
+     }
+
      /**
       * @return total file size to transfer in bytes
       */
@@@ -156,15 -188,17 +196,20 @@@
              out.writeLong(section.left);
              out.writeLong(section.right);
          }
-         CompressionInfo.serializer.serialize(header.compressionInfo, out, version);
+         // construct CompressionInfo here to avoid holding large number of Chunks on heap.
+         CompressionInfo compressionInfo = null;
+         if (header.compressionMetadata != null)
+             compressionInfo = new CompressionInfo(header.compressionMetadata.getChunksForSections(header.sections), header.compressionMetadata.parameters);
+         CompressionInfo.serializer.serialize(compressionInfo, out, version);
          out.writeLong(header.repairedAt);
          out.writeInt(header.sstableLevel);
 +
 +        if (version >= StreamMessage.VERSION_30)
 +            SerializationHeader.serializer.serialize(header.version, header.header, out);
 +
+         return compressionInfo;
      }

-     public FileMessageHeader deserialize(DataInput in, int version) throws IOException
+     public FileMessageHeader deserialize(DataInputPlus in, int version) throws IOException
      {
          UUID cfId = UUIDSerializer.serializer.deserialize(in, MessagingService.current_version);
          int sequenceNumber = in.readInt();
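
The FileMessageHeader change above is the core of CASSANDRA-10680: a sender-side header now keeps only the compact CompressionMetadata handle, and the per-chunk CompressionInfo is built inside serialize(), written out, and immediately eligible for collection, instead of sitting on the heap for the lifetime of every queued outgoing file message. Below is a minimal, self-contained sketch of that lifetime pattern; the types are illustrative stand-ins, not the real Cassandra classes.

    import java.util.ArrayList;
    import java.util.List;

    public class DeferredChunksSketch
    {
        static class Chunk
        {
            final int length;
            Chunk(int length) { this.length = length; }
        }

        // Stand-in for CompressionMetadata: small, and able to enumerate
        // chunks on demand rather than storing them.
        static class Metadata
        {
            List<Chunk> chunksForSections(int sectionCount)
            {
                List<Chunk> chunks = new ArrayList<>();
                for (int i = 0; i < sectionCount; i++)
                    chunks.add(new Chunk(32));
                return chunks;
            }
        }

        // Stand-in for FileMessageHeader: holds the metadata handle for its
        // whole lifetime, but materializes chunks only while serializing.
        static class Header
        {
            private final Metadata metadata;
            private final int sections;
            Header(Metadata metadata, int sections) { this.metadata = metadata; this.sections = sections; }

            int serialize()
            {
                // Chunks exist only for the duration of this call and become
                // garbage as soon as they have been "written".
                List<Chunk> chunks = metadata.chunksForSections(sections);
                int written = 0;
                for (Chunk c : chunks)
                    written += c.length + 4; // 4-byte checksum per chunk
                return written;
            }
        }

        public static void main(String[] args)
        {
            Header header = new Header(new Metadata(), 3);
            System.out.println(header.serialize()); // 3 * (32 + 4) = 108
        }
    }
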
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5cc02dd9/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
index 19f9e12,31ab2a8..d881d43
--- a/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
@@@ -40,9 -39,9 +40,9 @@@ public class IncomingFileMessage extend
          @SuppressWarnings("resource")
          public IncomingFileMessage deserialize(ReadableByteChannel in, int version, StreamSession session) throws IOException
          {
-             DataInputStream input = new DataInputStream(Channels.newInputStream(in));
+             DataInputPlus input = new DataInputStreamPlus(Channels.newInputStream(in));
              FileMessageHeader header = FileMessageHeader.serializer.deserialize(input, version);
-             StreamReader reader = header.compressionInfo == null ? new StreamReader(header, session)
+             StreamReader reader = !header.isCompressed() ? new StreamReader(header, session)
                                                            : new CompressedStreamReader(header, session);

              try
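
The receiver-side switch from checking header.compressionInfo == null to !header.isCompressed() is needed because compression state now lives in one of two nullable fields: compressionMetadata on the sending side, compressionInfo after deserialization. A reduced sketch, again with stand-in types rather than the real classes, of why testing a single field would mis-classify a header:

    public class ReaderDispatchSketch
    {
        static class Header
        {
            Object compressionInfo;     // populated on the receiver, by deserialize()
            Object compressionMetadata; // populated on the sender, at construction
            boolean isCompressed() { return compressionInfo != null || compressionMetadata != null; }
        }

        public static void main(String[] args)
        {
            Header senderSide = new Header();
            senderSide.compressionMetadata = new Object();
            // A compressionInfo == null test would wrongly treat this header
            // as uncompressed; isCompressed() covers both representations.
            System.out.println(senderSide.isCompressed() ? "CompressedStreamReader" : "StreamReader");
        }
    }
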
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5cc02dd9/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
index 82e6620,c8175ea..f10b42e
--- a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
@@@ -62,22 -62,15 +62,16 @@@ public class OutgoingFileMessage extend
          SSTableReader sstable = ref.get();
          filename = sstable.getFilename();

-         CompressionInfo compressionInfo = null;
-         if (sstable.compression)
-         {
-             CompressionMetadata meta = sstable.getCompressionMetadata();
-             compressionInfo = new CompressionInfo(meta.getChunksForSections(sections), meta.parameters);
-         }
          this.header = new FileMessageHeader(sstable.metadata.cfId,
                                              sequenceNumber,
-                                             sstable.descriptor.version.toString(),
+                                             sstable.descriptor.version,
                                              sstable.descriptor.formatType,
                                              estimatedKeys,
                                              sections,
-                                             compressionInfo,
+                                             sstable.compression ? sstable.getCompressionMetadata() : null,
                                              repairedAt,
-                                             keepSSTableLevel ? sstable.getSSTableLevel() : 0);
+                                             keepSSTableLevel ? sstable.getSSTableLevel() : 0,
+                                             sstable.header == null ? null : sstable.header.toComponent());
      }

      public synchronized void serialize(DataOutputStreamPlus out, int version, StreamSession session) throws IOException
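
Two smaller changes ride along in the constructor call above: the descriptor version is passed as a typed Version instead of a String, and the sender forwards the SSTable's SerializationHeader component, which the header serializer writes only when the negotiated stream protocol is 3.0 or newer (version >= StreamMessage.VERSION_30). A sketch of that version gate; the numeric constants below are assumptions for illustration, not the real StreamMessage values:

    public class VersionGateSketch
    {
        static final int VERSION_22 = 3; // illustrative stand-ins for the
        static final int VERSION_30 = 4; // StreamMessage protocol constants

        static String serializeHeader(int version)
        {
            StringBuilder out = new StringBuilder("base fields");
            // Components that only 3.0 peers understand are written only
            // when the negotiated version is new enough.
            if (version >= VERSION_30)
                out.append(" + serialization header");
            return out.toString();
        }

        public static void main(String[] args)
        {
            System.out.println(serializeHeader(VERSION_22)); // base fields
            System.out.println(serializeHeader(VERSION_30)); // base fields + serialization header
        }
    }
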
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5cc02dd9/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
index db05a3e,0000000..5646592
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
+++ b/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
@@@ -1,122 -1,0 +1,129 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming.compression;
+
+import java.io.*;
+import java.util.*;
+
+import org.junit.Test;
+import org.apache.cassandra.db.ClusteringComparator;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.io.compress.CompressedSequentialWriter;
+import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.schema.CompressionParams;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.streaming.compress.CompressedInputStream;
+import org.apache.cassandra.streaming.compress.CompressionInfo;
+import org.apache.cassandra.utils.ChecksumType;
+import org.apache.cassandra.utils.Pair;
+
++import static org.junit.Assert.assertEquals;
++
+/**
+ */
+public class CompressedInputStreamTest
+{
+    @Test
+    public void testCompressedRead() throws Exception
+    {
+        testCompressedReadWith(new long[]{0L}, false);
+        testCompressedReadWith(new long[]{1L}, false);
+        testCompressedReadWith(new long[]{100L}, false);
+
+        testCompressedReadWith(new long[]{1L, 122L, 123L, 124L, 456L}, false);
+    }
+
+    @Test(expected = EOFException.class)
+    public void testTruncatedRead() throws Exception
+    {
+        testCompressedReadWith(new long[]{1L, 122L, 123L, 124L, 456L}, true);
+    }
+
+    /**
+     * @param valuesToCheck array of longs of range(0-999)
+     * @throws Exception
+     */
+    private void testCompressedReadWith(long[] valuesToCheck, boolean testTruncate) throws Exception
+    {
+        assert valuesToCheck != null && valuesToCheck.length > 0;
+
+        // write compressed data file of longs
+        File tmp = new File(File.createTempFile("cassandra", "unittest").getParent(), "ks-cf-ib-1-Data.db");
+        Descriptor desc = Descriptor.fromFilename(tmp.getAbsolutePath());
+        MetadataCollector collector = new MetadataCollector(new ClusteringComparator(BytesType.instance));
+        CompressionParams param = CompressionParams.snappy(32);
+        CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), param, collector);
+        Map<Long, Long> index = new HashMap<Long, Long>();
+        for (long l = 0L; l < 1000; l++)
+        {
+            index.put(l, writer.position());
+            writer.writeLong(l);
+        }
+        writer.finish();
+
+        CompressionMetadata comp = CompressionMetadata.create(tmp.getAbsolutePath());
+        List<Pair<Long, Long>> sections = new ArrayList<Pair<Long, Long>>();
+        for (long l : valuesToCheck)
+        {
+            long position = index.get(l);
+            sections.add(Pair.create(position, position + 8));
+        }
+        CompressionMetadata.Chunk[] chunks = comp.getChunksForSections(sections);
++        long totalSize = comp.getTotalSizeForSections(sections);
++        long expectedSize = 0;
++        for (CompressionMetadata.Chunk c : chunks)
++            expectedSize += c.length + 4;
++        assertEquals(expectedSize, totalSize);
+
+        // buffer up only relevant parts of file
+        int size = 0;
+        for (CompressionMetadata.Chunk c : chunks)
+            size += (c.length + 4); // 4bytes CRC
+        byte[] toRead = new byte[size];
+
+        RandomAccessFile f = new RandomAccessFile(tmp, "r");
+        int pos = 0;
+        for (CompressionMetadata.Chunk c : chunks)
+        {
+            f.seek(c.offset);
+            pos += f.read(toRead, pos, c.length + 4);
+        }
+        f.close();
+
+        if (testTruncate)
+        {
+            byte [] actuallyRead = new byte[50];
+            System.arraycopy(toRead, 0, actuallyRead, 0, 50);
+            toRead = actuallyRead;
+        }
+
+        // read buffer using CompressedInputStream
+        CompressionInfo info = new CompressionInfo(chunks, param);
+        CompressedInputStream input = new CompressedInputStream(new ByteArrayInputStream(toRead), info,
+                                                                ChecksumType.CRC32, () -> 1.0);
+        DataInputStream in = new DataInputStream(input);
+
+        for (int i = 0; i < sections.size(); i++)
+        {
+            input.position(sections.get(i).left);
+            long readValue = in.readLong();
+            assert readValue == valuesToCheck[i] : "expected " + valuesToCheck[i] + " but was " + readValue;
+        }
+    }
+}
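
The new assertions cross-check CompressionMetadata.getTotalSizeForSections against the accounting the test already uses to size its read buffer: each compressed chunk travels with a 4-byte CRC32 checksum, so the total transfer size for n chunks is the sum of their compressed lengths plus 4n bytes. A standalone sketch of that arithmetic, with made-up chunk lengths:

    public class ChunkSizeSketch
    {
        public static void main(String[] args)
        {
            int[] chunkLengths = { 32, 32, 17 }; // hypothetical compressed chunk sizes
            long total = 0;
            for (int length : chunkLengths)
                total += length + 4;             // 4-byte CRC per chunk
            System.out.println(total);           // (32 + 32 + 17) + 3 * 4 = 93
        }
    }
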