Repository: cassandra
Updated Branches:
  refs/heads/trunk c5dee08df -> 9a7db292c


Pad uncompressed chunks when they would be interpreted as compressed

patch by Branimir Lambov; reviewed by Robert Stupp for CASSANDRA-14892


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9a7db292
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9a7db292
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9a7db292

Branch: refs/heads/trunk
Commit: 9a7db292cc4e470cd913f5c850982a7d7300d6c8
Parents: c5dee08
Author: Branimir Lambov <branimir.lam...@datastax.com>
Authored: Fri Nov 16 12:32:31 2018 +0200
Committer: Branimir Lambov <branimir.lam...@datastax.com>
Committed: Thu Nov 29 11:29:05 2018 +0200

----------------------------------------------------------------------
 .../io/compress/CompressedSequentialWriter.java |  21 +++-
 .../apache/cassandra/utils/ByteBufferUtil.java  |  25 +++++
 .../CompressedSequentialWriterTest.java         |  94 +++++++++++++++--
 .../cassandra/io/compress/MockCompressor.java   | 103 +++++++++++++++++++
 .../cassandra/utils/ByteBufferUtilTest.java     |  64 +++++++++++-
 5 files changed, 291 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a7db292/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
index c35ecc8..5be96d1 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
@@ -32,6 +32,7 @@ import org.apache.cassandra.io.sstable.CorruptSSTableException;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.io.util.*;
 import org.apache.cassandra.schema.CompressionParams;
+import org.apache.cassandra.utils.ByteBufferUtil;
 
 import static org.apache.cassandra.utils.Throwables.merge;
 
@@ -148,13 +149,27 @@ public class CompressedSequentialWriter extends SequentialWriter
             throw new RuntimeException("Compression exception", e); // shouldn't happen
         }
 
+        int uncompressedLength = buffer.position();
         int compressedLength = compressed.position();
-        uncompressedSize += buffer.position();
+        uncompressedSize += uncompressedLength;
         ByteBuffer toWrite = compressed;
         if (compressedLength >= maxCompressedLength)
         {
             toWrite = buffer;
-            compressedLength = buffer.position();
+            if (uncompressedLength >= maxCompressedLength)
+            {
+                compressedLength = uncompressedLength;
+            }
+            else
+            {
+                // Pad the uncompressed data so that it reaches the max compressed length.
+                // This could make the chunk appear longer, but this path is only reached at the end of the file, where
+                // we use the file size to limit the buffer on reading.
+                assert maxCompressedLength <= buffer.capacity();   // verified by CompressionParams.validate
+                buffer.limit(maxCompressedLength);
+                ByteBufferUtil.writeZeroes(buffer, maxCompressedLength - uncompressedLength);
+                compressedLength = maxCompressedLength;
+            }
         }
         compressedSize += compressedLength;
 
@@ -178,7 +193,7 @@ public class CompressedSequentialWriter extends SequentialWriter
             throw new FSWriteError(e, getPath());
         }
         if (toWrite == buffer)
-            buffer.position(compressedLength);
+            buffer.position(uncompressedLength);
 
         // next chunk should be written right after current + length of the checksum (int)
         chunkOffset += compressedLength + 4;

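For context, a minimal standalone sketch of the invariant this hunk restores. It assumes the reader-side convention implied by the patch, namely that a chunk whose stored length is below maxCompressedLength is treated as compressed; ChunkPaddingSketch and its method names are illustrative, not part of the patch.

import java.nio.ByteBuffer;

public class ChunkPaddingSketch
{
    // Assumed reader-side rule: a stored chunk length strictly below
    // maxCompressedLength marks the chunk as compressed data.
    static boolean looksCompressed(int storedLength, int maxCompressedLength)
    {
        return storedLength < maxCompressedLength;
    }

    // Mirrors the fix: zero-pad an uncompressed chunk up to maxCompressedLength
    // so its stored length can no longer be mistaken for a compressed one.
    // Returns the length to record on disk.
    static int padUncompressed(ByteBuffer chunk, int maxCompressedLength)
    {
        int uncompressedLength = chunk.position();
        if (uncompressedLength >= maxCompressedLength)
            return uncompressedLength;               // already unambiguous
        chunk.limit(maxCompressedLength);
        while (chunk.position() < maxCompressedLength)
            chunk.put((byte) 0);                     // the patch uses ByteBufferUtil.writeZeroes here
        return maxCompressedLength;
    }
}

As the new comment in the hunk notes, this path is only reached on the final chunk of a file, where the reader clamps the returned buffer to the file's logical size, so the padding zeros are never surfaced.
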
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a7db292/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
index 50d35bc..d6c9e52 100644
--- a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
+++ b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
@@ -284,6 +284,31 @@ public class ByteBufferUtil
         return length;
     }
 
+    public static void writeZeroes(ByteBuffer dest, int count)
+    {
+        if (count >= 8)
+        {
+            // align
+            while ((dest.position() & 0x7) != 0)
+            {
+                dest.put((byte) 0);
+                --count;
+            }
+        }
+        // write aligned longs
+        while (count >= 8)
+        {
+            dest.putLong(0L);
+            count -= 8;
+        }
+        // finish up
+        while (count > 0)
+        {
+            dest.put((byte) 0);
+            --count;
+        }
+    }
+
     public static void writeWithLength(ByteBuffer bytes, DataOutputPlus out) throws IOException
     {
         out.writeInt(bytes.remaining());

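A quick usage sketch for the new helper; it assumes only the writeZeroes signature added above, and the buffer size and positions are arbitrary:

import java.nio.ByteBuffer;

import org.apache.cassandra.utils.ByteBufferUtil;

public class WriteZeroesExample
{
    public static void main(String[] args)
    {
        ByteBuffer buf = ByteBuffer.allocateDirect(64);
        buf.position(3);                       // deliberately misaligned start
        ByteBufferUtil.writeZeroes(buf, 21);   // zeroes indices 3..23
        assert buf.position() == 24;
    }
}

For counts of eight or more, the helper writes single bytes until the position is 8-byte aligned, then zeroes in putLong-sized strides, and finishes byte by byte, so large fills stay on the bulk path.
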
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a7db292/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
index 0c0a33f..557bc32 100644
--- a/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
 
+import static org.apache.cassandra.schema.CompressionParams.DEFAULT_CHUNK_LENGTH;
 import static org.apache.commons.io.FileUtils.readFileToByteArray;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -42,6 +43,7 @@ import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.io.util.*;
 import org.apache.cassandra.schema.CompressionParams;
+import org.apache.cassandra.utils.ByteBufferUtil;
 
 public class CompressedSequentialWriterTest extends SequentialWriterTest
 {
@@ -59,20 +61,19 @@ public class CompressedSequentialWriterTest extends SequentialWriterTest
         testWrite(FileUtils.createTempFile(testName + "_small", "1"), 25, false);
 
         // Test to confirm pipeline w/chunk-aligned data writes works
-        testWrite(FileUtils.createTempFile(testName + "_chunkAligned", "1"), CompressionParams.DEFAULT_CHUNK_LENGTH, false);
+        testWrite(FileUtils.createTempFile(testName + "_chunkAligned", "1"), DEFAULT_CHUNK_LENGTH, false);
 
         // Test to confirm pipeline on non-chunk boundaries works
-        testWrite(FileUtils.createTempFile(testName + "_large", "1"), CompressionParams.DEFAULT_CHUNK_LENGTH * 3 + 100, false);
+        testWrite(FileUtils.createTempFile(testName + "_large", "1"), DEFAULT_CHUNK_LENGTH * 3 + 100, false);
 
         // Test small < 1 chunk data set
         testWrite(FileUtils.createTempFile(testName + "_small", "2"), 25, true);
 
         // Test to confirm pipeline w/chunk-aligned data writes works
-        testWrite(FileUtils.createTempFile(testName + "_chunkAligned", "2"), CompressionParams.DEFAULT_CHUNK_LENGTH, true);
+        testWrite(FileUtils.createTempFile(testName + "_chunkAligned", "2"), DEFAULT_CHUNK_LENGTH, true);
 
         // Test to confirm pipeline on non-chunk boundaries works
-        testWrite(FileUtils.createTempFile(testName + "_large", "2"), CompressionParams.DEFAULT_CHUNK_LENGTH * 3 + 100, true);
-
+        testWrite(FileUtils.createTempFile(testName + "_large", "2"), DEFAULT_CHUNK_LENGTH * 3 + 100, true);
     }
 
     @Test
@@ -121,14 +122,14 @@ public class CompressedSequentialWriterTest extends SequentialWriterTest
             DataPosition mark = writer.mark();
 
             // Write enough garbage to transition chunk
-            for (int i = 0; i < CompressionParams.DEFAULT_CHUNK_LENGTH; i++)
+            for (int i = 0; i < DEFAULT_CHUNK_LENGTH; i++)
             {
                 writer.write((byte)i);
             }
-            if (bytesToTest <= CompressionParams.DEFAULT_CHUNK_LENGTH)
-                assertEquals(writer.getLastFlushOffset(), CompressionParams.DEFAULT_CHUNK_LENGTH);
+            if (bytesToTest <= DEFAULT_CHUNK_LENGTH)
+                assertEquals(writer.getLastFlushOffset(), DEFAULT_CHUNK_LENGTH);
             else
-                assertTrue(writer.getLastFlushOffset() % CompressionParams.DEFAULT_CHUNK_LENGTH == 0);
+                assertTrue(writer.getLastFlushOffset() % DEFAULT_CHUNK_LENGTH == 0);
 
             writer.resetAndTruncate(mark);
             writer.write(dataPost);
@@ -163,6 +164,81 @@ public class CompressedSequentialWriterTest extends SequentialWriterTest
         }
     }
 
+    @Test
+    public void testShortUncompressedChunk() throws IOException
+    {
+        // Test uncompressed chunk below threshold (CASSANDRA-14892)
+        compressionParameters = CompressionParams.lz4(DEFAULT_CHUNK_LENGTH, DEFAULT_CHUNK_LENGTH);
+        testWrite(FileUtils.createTempFile("14892", "1"), compressionParameters.maxCompressedLength() - 1, false);
+    }
+
+    @Test
+    public void testUncompressedChunks() throws IOException
+    {
+        for (double ratio = 1.25; ratio >= 1; ratio -= 1.0/16)
+            testUncompressedChunks(ratio);
+    }
+
+    private void testUncompressedChunks(double ratio) throws IOException
+    {
+        for (int compressedSizeExtra : new int[] {-3, 0, 1, 3, 15, 1051})
+            testUncompressedChunks(ratio, compressedSizeExtra);
+    }
+
+    private void testUncompressedChunks(double ratio, int compressedSizeExtra) throws IOException
+    {
+        for (int size = (int) (DEFAULT_CHUNK_LENGTH / ratio - 5); size <= DEFAULT_CHUNK_LENGTH / ratio + 5; ++size)
+            testUncompressedChunks(size, ratio, compressedSizeExtra);
+    }
+
+    private void testUncompressedChunks(int size, double ratio, int extra) throws IOException
+    {
+        // System.out.format("size %d ratio %f extra %d\n", size, ratio, extra);
+        ByteBuffer b = ByteBuffer.allocate(size);
+        ByteBufferUtil.writeZeroes(b, size);
+        b.flip();
+
+        File f = FileUtils.createTempFile("testUncompressedChunks", "1");
+        String filename = f.getPath();
+        MetadataCollector sstableMetadataCollector = new MetadataCollector(new ClusteringComparator(Collections.singletonList(BytesType.instance)));
+        compressionParameters = new CompressionParams(MockCompressor.class.getTypeName(),
+                                                      MockCompressor.paramsFor(ratio, extra),
+                                                      DEFAULT_CHUNK_LENGTH, ratio);
+        try (CompressedSequentialWriter writer = new CompressedSequentialWriter(f, f.getPath() + ".metadata",
+                                                                                null, SequentialWriterOption.DEFAULT,
+                                                                                compressionParameters,
+                                                                                sstableMetadataCollector))
+        {
+            writer.write(b);
+            writer.finish();
+            b.flip();
+        }
+
+        assert f.exists();
+        try (FileHandle.Builder builder = new FileHandle.Builder(filename).withCompressionMetadata(new CompressionMetadata(filename + ".metadata", f.length(), true));
+             FileHandle fh = builder.complete();
+             RandomAccessReader reader = fh.createReader())
+        {
+            assertEquals(size, reader.length());
+            byte[] result = new byte[(int)reader.length()];
+
+            reader.readFully(result);
+            assert(reader.isEOF());
+
+            assert Arrays.equals(b.array(), result);
+        }
+        finally
+        {
+            if (f.exists())
+                f.delete();
+            File metadata = new File(f + ".metadata");
+            if (metadata.exists())
+                metadata.delete();
+        }
+
+    }
+
+
     private ByteBuffer makeBB(int size)
     {
         return compressionParameters.getSstableCompressor().preferredBufferType().allocate(size);

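To gauge what the sweep above exercises, here is a standalone sketch of the boundary each ratio probes. It assumes the 64 KiB CompressionParams.DEFAULT_CHUNK_LENGTH and that maxCompressedLength works out to roughly chunkLength / ratio for these parameters; SweepSketch is an illustrative name, not part of the patch.

public class SweepSketch
{
    public static void main(String[] args)
    {
        int chunkLength = 64 * 1024;   // assumed value of DEFAULT_CHUNK_LENGTH
        for (double ratio = 1.25; ratio >= 1; ratio -= 1.0 / 16)
        {
            // The test probes sizes within 5 bytes of chunkLength / ratio,
            // the region around the assumed maxCompressedLength where the
            // padding path engages.
            int boundary = (int) (chunkLength / ratio);
            System.out.printf("ratio %.4f -> sizes %d..%d%n", ratio, boundary - 5, boundary + 5);
        }
    }
}
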
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a7db292/test/unit/org/apache/cassandra/io/compress/MockCompressor.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/compress/MockCompressor.java b/test/unit/org/apache/cassandra/io/compress/MockCompressor.java
new file mode 100644
index 0000000..d57f4b7
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/compress/MockCompressor.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.compress;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+/**
+ * Mock compressor used to test the effect of different output sizes in compression.
+ * Writes only the size of the given buffer, and on decompression expands to a sequence of that many 0s.
+ */
+public class MockCompressor implements ICompressor
+{
+    final int extra;
+    final double ratio;
+
+    public static Map<String, String> paramsFor(double ratio, int extra)
+    {
+        return ImmutableMap.of("extra", "" + extra, "ratio", "" + ratio);
+    }
+
+    public static MockCompressor create(Map<String, String> opts)
+    {
+        return new MockCompressor(Integer.parseInt(opts.get("extra")),
+                                  Double.parseDouble(opts.get("ratio")));
+    }
+
+    private MockCompressor(int extra, double ratio)
+    {
+        this.extra = extra;
+        this.ratio = ratio;
+    }
+
+    public int initialCompressedBufferLength(int chunkLength)
+    {
+        return (int) Math.ceil(chunkLength / ratio + extra);
+    }
+
+    public int uncompress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset)
+    throws IOException
+    {
+        final ByteBuffer outputBuffer = ByteBuffer.wrap(output, outputOffset, output.length - outputOffset);
+        uncompress(ByteBuffer.wrap(input, inputOffset, inputLength),
+                   outputBuffer);
+        return outputBuffer.position();
+    }
+
+    public void compress(ByteBuffer input, ByteBuffer output) throws IOException
+    {
+        int inputLength = input.remaining();
+        int outputLength = initialCompressedBufferLength(inputLength);
+        // assume the input is all zeros, write its length and pad until the required size
+        output.putInt(inputLength);
+        for (int i = 4; i < outputLength; ++i)
+            output.put((byte) i);
+        input.position(input.limit());
+    }
+
+    public void uncompress(ByteBuffer input, ByteBuffer output) throws IOException
+    {
+        int outputLength = input.getInt();
+        ByteBufferUtil.writeZeroes(output, outputLength);
+    }
+
+    public BufferType preferredBufferType()
+    {
+        return BufferType.OFF_HEAP;
+    }
+
+    public boolean supports(BufferType bufferType)
+    {
+        return true;
+    }
+
+    public Set<String> supportedOptions()
+    {
+        return ImmutableSet.of("extra", "ratio");
+    }
+
+}

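A hedged example of wiring the mock into CompressionParams, following the pattern already used in CompressedSequentialWriterTest above; the ratio and extra values and the MockCompressorExample class are illustrative:

import org.apache.cassandra.io.compress.MockCompressor;
import org.apache.cassandra.schema.CompressionParams;

public class MockCompressorExample
{
    public static void main(String[] args)
    {
        double ratio = 1.1;   // pretend 1.1:1 compression
        int extra = 3;        // fixed per-chunk overhead in bytes
        CompressionParams params = new CompressionParams(MockCompressor.class.getTypeName(),
                                                         MockCompressor.paramsFor(ratio, extra),
                                                         64 * 1024, ratio);
        // For a 4096-byte input, the mock's "compressed" size is
        // ceil(4096 / 1.1 + 3) = 3727 bytes: an int length header plus padding.
        System.out.println(params);
    }
}

Because the output size is a pure function of ratio and extra, the tests can dial in exact compressed lengths on either side of maxCompressedLength, which is what the padding fix needs to be exercised against.
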
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a7db292/test/unit/org/apache/cassandra/utils/ByteBufferUtilTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/ByteBufferUtilTest.java b/test/unit/org/apache/cassandra/utils/ByteBufferUtilTest.java
index f2746f6..c5de60b 100644
--- a/test/unit/org/apache/cassandra/utils/ByteBufferUtilTest.java
+++ b/test/unit/org/apache/cassandra/utils/ByteBufferUtilTest.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.utils;
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.IOException;
+import java.nio.BufferOverflowException;
 import java.nio.ByteBuffer;
 import java.nio.charset.CharacterCodingException;
 import java.util.Arrays;
@@ -33,6 +34,8 @@ import org.apache.cassandra.io.util.DataOutputBuffer;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
 
 public class ByteBufferUtilTest
 {
@@ -288,8 +291,8 @@ public class ByteBufferUtilTest
         a.limit(bytes.length - 1).position(0);
         b.limit(bytes.length - 1).position(1);
 
-        Assert.assertFalse(ByteBufferUtil.startsWith(a, b));
-        Assert.assertFalse(ByteBufferUtil.startsWith(a, b.slice()));
+        assertFalse(ByteBufferUtil.startsWith(a, b));
+        assertFalse(ByteBufferUtil.startsWith(a, b.slice()));
 
         Assert.assertTrue(ByteBufferUtil.endsWith(a, b));
         Assert.assertTrue(ByteBufferUtil.endsWith(a, b.slice()));
@@ -297,7 +300,60 @@ public class ByteBufferUtilTest
 
         a.position(5);
 
-        Assert.assertFalse(ByteBufferUtil.startsWith(a, b));
-        Assert.assertFalse(ByteBufferUtil.endsWith(a, b));
+        assertFalse(ByteBufferUtil.startsWith(a, b));
+        assertFalse(ByteBufferUtil.endsWith(a, b));
+    }
+
+    @Test
+    public void testWriteZeroes()
+    {
+        byte[] initial = new byte[1024];
+        Arrays.fill(initial, (byte) 1);
+        for (ByteBuffer b : new ByteBuffer[] { ByteBuffer.allocate(1024), ByteBuffer.allocateDirect(1024) })
+        {
+            for (int i = 0; i <= 32; ++i)
+                for (int j = 1024; j >= 1024 - 32; --j)
+                {
+                    b.clear();
+                    b.put(initial);
+                    b.flip();
+                    b.position(i);
+                    ByteBufferUtil.writeZeroes(b, j-i);
+                    assertEquals(j, b.position());
+                    int ii = 0;
+                    for (; ii < i; ++ii)
+                        assertEquals(initial[ii], b.get(ii));
+                    for (; ii < j; ++ii)
+                        assertEquals(0, b.get(ii));
+                    for (; ii < 1024; ++ii)
+                        assertEquals(initial[ii], b.get(ii));
+
+                    b.clear();
+                    b.put(initial);
+                    b.limit(j).position(i);
+                    ByteBuffer slice = b.slice();
+                    ByteBufferUtil.writeZeroes(slice, slice.capacity());
+                    assertFalse(slice.hasRemaining());
+                    b.clear();  // reset position and limit for check
+                    ii = 0;
+                    for (; ii < i; ++ii)
+                        assertEquals(initial[ii], b.get(ii));
+                    for (; ii < j; ++ii)
+                        assertEquals(0, b.get(ii));
+                    for (; ii < 1024; ++ii)
+                        assertEquals(initial[ii], b.get(ii));
+
+                    slice.clear();
+                    try
+                    {
+                        ByteBufferUtil.writeZeroes(slice, slice.capacity() + 1);
+                        fail("Line above should throw.");
+                    }
+                    catch (BufferOverflowException | IndexOutOfBoundsException e)
+                    {
+                        // correct path
+                    }
+                }
+        }
     }
 }

