fix compressed stream sending extra chunk; patch by yukim reviewed by mkjellman for CASSANDRA-5105


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ee0be062
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ee0be062
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ee0be062

Branch: refs/heads/cassandra-1.2
Commit: ee0be0624b0ee1b2151b9be92a5d4e6334801cde
Parents: bc42835
Author: Yuki Morishita <yu...@apache.org>
Authored: Thu Feb 14 13:43:01 2013 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Thu Feb 14 13:43:01 2013 -0600

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 .../cassandra/io/compress/CompressionMetadata.java |    1 +
 .../compress/CompressedInputStreamTest.java        |  106 +++++++++++++++
 3 files changed, 108 insertions(+), 0 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee0be062/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index aa25b63..39b1948 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -19,6 +19,7 @@
  * Fix missing columns in wide rows queries (CASSANDRA-5225)
  * Simplify auth setup and make system_auth ks alterable (CASSANDRA-5112)
  * Stop compactions from hanging during bootstrap (CASSANDRA-5244)
+ * fix compressed streaming sending extra chunk (CASSANDRA-5105)
 
 
 1.2.1

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee0be062/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
index 0a3f8de..93b0091 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
@@ -199,6 +199,7 @@ public class CompressionMetadata
         {
             int startIndex = (int) (section.left / parameters.chunkLength());
             int endIndex = (int) (section.right / parameters.chunkLength());
+            endIndex = section.right % parameters.chunkLength() == 0 ? endIndex - 1 : endIndex;
             for (int i = startIndex; i <= endIndex; i++)
             {
                 long offset = i * 8;
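
The added line guards the boundary case in getChunksForSections(): when a requested section ends exactly on a chunk boundary (section.right % chunkLength == 0), the old endIndex landed one chunk past the data actually needed, so an extra compressed chunk was streamed. A minimal sketch of the index arithmetic, using hypothetical numbers (chunk length 32, a section spanning uncompressed offsets 0 to 64) and assuming section.right marks the exclusive end of the requested range:

    int chunkLength = 32;
    long left = 0, right = 64;                    // hypothetical section (0, 64)
    int startIndex = (int) (left / chunkLength);  // 0
    int endIndex = (int) (right / chunkLength);   // 2 -> pre-patch, chunk 2 is streamed too
    if (right % chunkLength == 0)
        endIndex = endIndex - 1;                  // 1 -> patched: only chunks 0 and 1 are sent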

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee0be062/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
new file mode 100644
index 0000000..ecb6e14
--- /dev/null
+++ b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming.compress;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.RandomAccessFile;
+import java.util.*;
+
+import org.junit.Test;
+
+import org.apache.cassandra.io.compress.*;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableMetadata;
+import org.apache.cassandra.utils.Pair;
+
+/**
+ */
+public class CompressedInputStreamTest
+{
+    @Test
+    public void testCompressedRead() throws Exception
+    {
+        testCompressedReadWith(new long[]{0L});
+        testCompressedReadWith(new long[]{1L});
+        testCompressedReadWith(new long[]{100L});
+
+        testCompressedReadWith(new long[]{1L, 122L, 123L, 124L, 456L});
+    }
+
+    /**
+     * @param valuesToCheck array of longs of range(0-999)
+     * @throws Exception
+     */
+    private void testCompressedReadWith(long[] valuesToCheck) throws Exception
+    {
+        assert valuesToCheck != null && valuesToCheck.length > 0;
+
+        // write compressed data file of longs
+        File tmp = new File(File.createTempFile("cassandra", "unittest").getParent(), "ks-cf-ib-1-Data.db");
+        Descriptor desc = Descriptor.fromFilename(tmp.getAbsolutePath());
+        SSTableMetadata.Collector collector = SSTableMetadata.createCollector();
+        CompressionParameters param = new CompressionParameters(SnappyCompressor.instance, 32, Collections.EMPTY_MAP);
+        CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), false, param, collector);
+        Map<Long, Long> index = new HashMap<Long, Long>();
+        for (long l = 0L; l < 1000; l++)
+        {
+            index.put(l, writer.getFilePointer());
+            writer.stream.writeLong(l);
+        }
+        writer.close();
+
+        CompressionMetadata comp = CompressionMetadata.create(tmp.getAbsolutePath());
+        List<Pair<Long, Long>> sections = new ArrayList<Pair<Long, Long>>();
+        for (long l : valuesToCheck)
+        {
+            long position = index.get(l);
+            sections.add(Pair.create(position, position + 8));
+        }
+        CompressionMetadata.Chunk[] chunks = comp.getChunksForSections(sections);
+
+        // buffer up only relevant parts of file
+        int size = 0;
+        for (CompressionMetadata.Chunk c : chunks)
+            size += (c.length + 4); // 4bytes CRC
+        byte[] toRead = new byte[size];
+
+        RandomAccessFile f = new RandomAccessFile(tmp, "r");
+        int pos = 0;
+        for (CompressionMetadata.Chunk c : chunks)
+        {
+            f.seek(c.offset);
+            pos += f.read(toRead, pos, c.length + 4);
+        }
+        f.close();
+
+        // read buffer using CompressedInputStream
+        CompressionInfo info = new CompressionInfo(chunks, param);
+        CompressedInputStream input = new CompressedInputStream(new ByteArrayInputStream(toRead), info);
+        DataInputStream dis = new DataInputStream(input);
+
+        for (int i = 0; i < sections.size(); i++)
+        {
+            input.position(sections.get(i).left);
+            long exp = dis.readLong();
+            assert exp == valuesToCheck[i] : "expected " + valuesToCheck[i] + " but was " + exp;
+        }
+    }
+}
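
The test writes 1000 longs with a 32-byte chunk length, so the section requested for value l spans uncompressed offsets (8 * l, 8 * l + 8), and the patched branch in getChunksForSections() is taken only when 8 * (l + 1) is a multiple of 32. A rough standalone check of which tested values hit that boundary, assuming getFilePointer() returns the uncompressed offset 8 * l:

    for (long l : new long[]{ 0L, 1L, 100L, 122L, 123L, 124L, 456L })
        if ((8 * (l + 1)) % 32 == 0)
            System.out.println(l);   // prints only 123

So the single-value cases cover ordinary reads, while 123 in the multi-value case is the one that would have pulled in the extra chunk before this fix.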
