fix compressed streaming sending extra chunk; patch by yukim reviewed by mkjellman for CASSANDRA-5105
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ee0be062
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ee0be062
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ee0be062

Branch: refs/heads/trunk
Commit: ee0be0624b0ee1b2151b9be92a5d4e6334801cde
Parents: bc42835
Author: Yuki Morishita <yu...@apache.org>
Authored: Thu Feb 14 13:43:01 2013 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Thu Feb 14 13:43:01 2013 -0600

----------------------------------------------------------------------
 CHANGES.txt                                         |    1 +
 .../cassandra/io/compress/CompressionMetadata.java  |    1 +
 .../compress/CompressedInputStreamTest.java         |  106 +++++++++++++++
 3 files changed, 108 insertions(+), 0 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee0be062/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index aa25b63..39b1948 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -19,6 +19,7 @@
  * Fix missing columns in wide rows queries (CASSANDRA-5225)
  * Simplify auth setup and make system_auth ks alterable (CASSANDRA-5112)
  * Stop compactions from hanging during bootstrap (CASSANDRA-5244)
+ * fix compressed streaming sending extra chunk (CASSANDRA-5105)
 
 
 1.2.1


http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee0be062/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
index 0a3f8de..93b0091 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
@@ -199,6 +199,7 @@ public class CompressionMetadata
         {
             int startIndex = (int) (section.left / parameters.chunkLength());
             int endIndex = (int) (section.right / parameters.chunkLength());
+            endIndex = section.right % parameters.chunkLength() == 0 ? endIndex - 1 : endIndex;
             for (int i = startIndex; i <= endIndex; i++)
             {
                 long offset = i * 8;
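
The one-line change above fixes an off-by-one: section.right is an exclusive end offset, so when it lands exactly on a chunk boundary, plain integer division yields the index of the chunk just past the last byte of data, and getChunksForSections selected (and streaming then sent) one extra chunk. A minimal standalone sketch of the arithmetic, assuming the 32-byte chunk length the new test below configures (class and variable names here are illustrative, not Cassandra API):

    // Illustrative sketch only: an exclusive right offset that lands
    // exactly on a chunk boundary makes integer division pick one chunk
    // too many; the fix backs the end index off by one in that case.
    public class ChunkIndexDemo
    {
        public static void main(String[] args)
        {
            int chunkLength = 32;   // chunk size the test below configures
            long left = 984;        // first byte of the 8-byte long value 123
            long right = 992;       // exclusive end: 992 == 31 * 32, a chunk boundary

            int startIndex = (int) (left / chunkLength);   // 30, chunk holding the first byte
            int endIndex = (int) (right / chunkLength);    // 31, one past the data
            endIndex = right % chunkLength == 0 ? endIndex - 1 : endIndex; // the fix: back to 30

            // Before the fix: chunks 30..31 would be selected (31 is the extra chunk).
            // After the fix:  chunks 30..30.
            System.out.println("chunks " + startIndex + ".." + endIndex);
        }
    }
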
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee0be062/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
new file mode 100644
index 0000000..ecb6e14
--- /dev/null
+++ b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming.compress;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.RandomAccessFile;
+import java.util.*;
+
+import org.junit.Test;
+
+import org.apache.cassandra.io.compress.*;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableMetadata;
+import org.apache.cassandra.utils.Pair;
+
+/**
+ */
+public class CompressedInputStreamTest
+{
+    @Test
+    public void testCompressedRead() throws Exception
+    {
+        testCompressedReadWith(new long[]{0L});
+        testCompressedReadWith(new long[]{1L});
+        testCompressedReadWith(new long[]{100L});
+
+        testCompressedReadWith(new long[]{1L, 122L, 123L, 124L, 456L});
+    }
+
+    /**
+     * @param valuesToCheck array of longs of range(0-999)
+     * @throws Exception
+     */
+    private void testCompressedReadWith(long[] valuesToCheck) throws Exception
+    {
+        assert valuesToCheck != null && valuesToCheck.length > 0;
+
+        // write compressed data file of longs
+        File tmp = new File(File.createTempFile("cassandra", "unittest").getParent(), "ks-cf-ib-1-Data.db");
+        Descriptor desc = Descriptor.fromFilename(tmp.getAbsolutePath());
+        SSTableMetadata.Collector collector = SSTableMetadata.createCollector();
+        CompressionParameters param = new CompressionParameters(SnappyCompressor.instance, 32, Collections.EMPTY_MAP);
+        CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), false, param, collector);
+        Map<Long, Long> index = new HashMap<Long, Long>();
+        for (long l = 0L; l < 1000; l++)
+        {
+            index.put(l, writer.getFilePointer());
+            writer.stream.writeLong(l);
+        }
+        writer.close();
+
+        CompressionMetadata comp = CompressionMetadata.create(tmp.getAbsolutePath());
+        List<Pair<Long, Long>> sections = new ArrayList<Pair<Long, Long>>();
+        for (long l : valuesToCheck)
+        {
+            long position = index.get(l);
+            sections.add(Pair.create(position, position + 8));
+        }
+        CompressionMetadata.Chunk[] chunks = comp.getChunksForSections(sections);
+
+        // buffer up only relevant parts of file
+        int size = 0;
+        for (CompressionMetadata.Chunk c : chunks)
+            size += (c.length + 4); // 4bytes CRC
+        byte[] toRead = new byte[size];
+
+        RandomAccessFile f = new RandomAccessFile(tmp, "r");
+        int pos = 0;
+        for (CompressionMetadata.Chunk c : chunks)
+        {
+            f.seek(c.offset);
+            pos += f.read(toRead, pos, c.length + 4);
+        }
+        f.close();
+
+        // read buffer using CompressedInputStream
+        CompressionInfo info = new CompressionInfo(chunks, param);
+        CompressedInputStream input = new CompressedInputStream(new ByteArrayInputStream(toRead), info);
+        DataInputStream dis = new DataInputStream(input);
+
+        for (int i = 0; i < sections.size(); i++)
+        {
+            input.position(sections.get(i).left);
+            long exp = dis.readLong();
+            assert exp == valuesToCheck[i] : "expected " + valuesToCheck[i] + " but was " + exp;
+        }
+    }
+}
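
The new test drives the fixed code path end to end: it writes 1000 longs through CompressedSequentialWriter with 32-byte chunks, buffers only the chunks getChunksForSections selects, and reads the values back through CompressedInputStream. With 8-byte values, a section's exclusive right edge is chunk-aligned exactly when (8 * value + 8) % 32 == 0, i.e. value % 4 == 3; of the values checked, 123 (bytes 984..991, right edge 992 = 31 * 32) is the one that exercises the boundary case the fix addresses. A hypothetical extra case, not part of this commit, that would sweep every boundary-aligned value in the 0-999 range:

    // Hypothetical addition to testCompressedRead(), not in the commit:
    // exercise every section whose exclusive right edge is chunk-aligned.
    long[] boundaryValues = new long[250];
    for (int i = 0; i < 250; i++)
        boundaryValues[i] = 4L * i + 3;   // values 3, 7, ..., 999: (8*v + 8) % 32 == 0
    testCompressedReadWith(boundaryValues);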