Repository: cassandra Updated Branches: refs/heads/trunk c5dee08df -> 9a7db292c
Pad uncompressed chunks when they would be interpreted as compressed patch by Branimir Lambov; reviewed by Robert Stupp for CASSANDRA-14892 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9a7db292 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9a7db292 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9a7db292 Branch: refs/heads/trunk Commit: 9a7db292cc4e470cd913f5c850982a7d7300d6c8 Parents: c5dee08 Author: Branimir Lambov <branimir.lam...@datastax.com> Authored: Fri Nov 16 12:32:31 2018 +0200 Committer: Branimir Lambov <branimir.lam...@datastax.com> Committed: Thu Nov 29 11:29:05 2018 +0200 ---------------------------------------------------------------------- .../io/compress/CompressedSequentialWriter.java | 21 +++- .../apache/cassandra/utils/ByteBufferUtil.java | 25 +++++ .../CompressedSequentialWriterTest.java | 94 +++++++++++++++-- .../cassandra/io/compress/MockCompressor.java | 103 +++++++++++++++++++ .../cassandra/utils/ByteBufferUtilTest.java | 64 +++++++++++- 5 files changed, 291 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a7db292/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java index c35ecc8..5be96d1 100644 --- a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java +++ b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java @@ -32,6 +32,7 @@ import org.apache.cassandra.io.sstable.CorruptSSTableException; import org.apache.cassandra.io.sstable.metadata.MetadataCollector; import org.apache.cassandra.io.util.*; import 
org.apache.cassandra.schema.CompressionParams; +import org.apache.cassandra.utils.ByteBufferUtil; import static org.apache.cassandra.utils.Throwables.merge; @@ -148,13 +149,27 @@ public class CompressedSequentialWriter extends SequentialWriter throw new RuntimeException("Compression exception", e); // shouldn't happen } + int uncompressedLength = buffer.position(); int compressedLength = compressed.position(); - uncompressedSize += buffer.position(); + uncompressedSize += uncompressedLength; ByteBuffer toWrite = compressed; if (compressedLength >= maxCompressedLength) { toWrite = buffer; - compressedLength = buffer.position(); + if (uncompressedLength >= maxCompressedLength) + { + compressedLength = uncompressedLength; + } + else + { + // Pad the uncompressed data so that it reaches the max compressed length. + // This could make the chunk appear longer, but this path is only reached at the end of the file, where + // we use the file size to limit the buffer on reading. + assert maxCompressedLength <= buffer.capacity(); // verified by CompressionParams.validate + buffer.limit(maxCompressedLength); + ByteBufferUtil.writeZeroes(buffer, maxCompressedLength - uncompressedLength); + compressedLength = maxCompressedLength; + } } compressedSize += compressedLength; @@ -178,7 +193,7 @@ public class CompressedSequentialWriter extends SequentialWriter throw new FSWriteError(e, getPath()); } if (toWrite == buffer) - buffer.position(compressedLength); + buffer.position(uncompressedLength); // next chunk should be written right after current + length of the checksum (int) chunkOffset += compressedLength + 4; http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a7db292/src/java/org/apache/cassandra/utils/ByteBufferUtil.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java index 50d35bc..d6c9e52 100644 --- 
a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java +++ b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java @@ -284,6 +284,31 @@ public class ByteBufferUtil return length; } + public static void writeZeroes(ByteBuffer dest, int count) + { + if (count >= 8) + { + // align + while ((dest.position() & 0x7) != 0) + { + dest.put((byte) 0); + --count; + } + } + // write aligned longs + while (count >= 8) + { + dest.putLong(0L); + count -= 8; + } + // finish up + while (count > 0) + { + dest.put((byte) 0); + --count; + } + } + public static void writeWithLength(ByteBuffer bytes, DataOutputPlus out) throws IOException { out.writeInt(bytes.remaining()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a7db292/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java index 0c0a33f..557bc32 100644 --- a/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java +++ b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.*; +import static org.apache.cassandra.schema.CompressionParams.DEFAULT_CHUNK_LENGTH; import static org.apache.commons.io.FileUtils.readFileToByteArray; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -42,6 +43,7 @@ import org.apache.cassandra.db.marshal.UTF8Type; import org.apache.cassandra.io.sstable.metadata.MetadataCollector; import org.apache.cassandra.io.util.*; import org.apache.cassandra.schema.CompressionParams; +import org.apache.cassandra.utils.ByteBufferUtil; public class CompressedSequentialWriterTest extends SequentialWriterTest { @@ -59,20 +61,19 @@ public class CompressedSequentialWriterTest 
extends SequentialWriterTest testWrite(FileUtils.createTempFile(testName + "_small", "1"), 25, false); // Test to confirm pipeline w/chunk-aligned data writes works - testWrite(FileUtils.createTempFile(testName + "_chunkAligned", "1"), CompressionParams.DEFAULT_CHUNK_LENGTH, false); + testWrite(FileUtils.createTempFile(testName + "_chunkAligned", "1"), DEFAULT_CHUNK_LENGTH, false); // Test to confirm pipeline on non-chunk boundaries works - testWrite(FileUtils.createTempFile(testName + "_large", "1"), CompressionParams.DEFAULT_CHUNK_LENGTH * 3 + 100, false); + testWrite(FileUtils.createTempFile(testName + "_large", "1"), DEFAULT_CHUNK_LENGTH * 3 + 100, false); // Test small < 1 chunk data set testWrite(FileUtils.createTempFile(testName + "_small", "2"), 25, true); // Test to confirm pipeline w/chunk-aligned data writes works - testWrite(FileUtils.createTempFile(testName + "_chunkAligned", "2"), CompressionParams.DEFAULT_CHUNK_LENGTH, true); + testWrite(FileUtils.createTempFile(testName + "_chunkAligned", "2"), DEFAULT_CHUNK_LENGTH, true); // Test to confirm pipeline on non-chunk boundaries works - testWrite(FileUtils.createTempFile(testName + "_large", "2"), CompressionParams.DEFAULT_CHUNK_LENGTH * 3 + 100, true); - + testWrite(FileUtils.createTempFile(testName + "_large", "2"), DEFAULT_CHUNK_LENGTH * 3 + 100, true); } @Test @@ -121,14 +122,14 @@ public class CompressedSequentialWriterTest extends SequentialWriterTest DataPosition mark = writer.mark(); // Write enough garbage to transition chunk - for (int i = 0; i < CompressionParams.DEFAULT_CHUNK_LENGTH; i++) + for (int i = 0; i < DEFAULT_CHUNK_LENGTH; i++) { writer.write((byte)i); } - if (bytesToTest <= CompressionParams.DEFAULT_CHUNK_LENGTH) - assertEquals(writer.getLastFlushOffset(), CompressionParams.DEFAULT_CHUNK_LENGTH); + if (bytesToTest <= DEFAULT_CHUNK_LENGTH) + assertEquals(writer.getLastFlushOffset(), DEFAULT_CHUNK_LENGTH); else - assertTrue(writer.getLastFlushOffset() % 
CompressionParams.DEFAULT_CHUNK_LENGTH == 0); + assertTrue(writer.getLastFlushOffset() % DEFAULT_CHUNK_LENGTH == 0); writer.resetAndTruncate(mark); writer.write(dataPost); @@ -163,6 +164,81 @@ public class CompressedSequentialWriterTest extends SequentialWriterTest } } + @Test + public void testShortUncompressedChunk() throws IOException + { + // Test uncompressed chunk below threshold (CASSANDRA-14892) + compressionParameters = CompressionParams.lz4(DEFAULT_CHUNK_LENGTH, DEFAULT_CHUNK_LENGTH); + testWrite(FileUtils.createTempFile("14892", "1"), compressionParameters.maxCompressedLength() - 1, false); + } + + @Test + public void testUncompressedChunks() throws IOException + { + for (double ratio = 1.25; ratio >= 1; ratio -= 1.0/16) + testUncompressedChunks(ratio); + } + + private void testUncompressedChunks(double ratio) throws IOException + { + for (int compressedSizeExtra : new int[] {-3, 0, 1, 3, 15, 1051}) + testUncompressedChunks(ratio, compressedSizeExtra); + } + + private void testUncompressedChunks(double ratio, int compressedSizeExtra) throws IOException + { + for (int size = (int) (DEFAULT_CHUNK_LENGTH / ratio - 5); size <= DEFAULT_CHUNK_LENGTH / ratio + 5; ++size) + testUncompressedChunks(size, ratio, compressedSizeExtra); + } + + private void testUncompressedChunks(int size, double ratio, int extra) throws IOException + { + // System.out.format("size %d ratio %f extra %d\n", size, ratio, extra); + ByteBuffer b = ByteBuffer.allocate(size); + ByteBufferUtil.writeZeroes(b, size); + b.flip(); + + File f = FileUtils.createTempFile("testUncompressedChunks", "1"); + String filename = f.getPath(); + MetadataCollector sstableMetadataCollector = new MetadataCollector(new ClusteringComparator(Collections.singletonList(BytesType.instance))); + compressionParameters = new CompressionParams(MockCompressor.class.getTypeName(), + MockCompressor.paramsFor(ratio, extra), + DEFAULT_CHUNK_LENGTH, ratio); + try (CompressedSequentialWriter writer = new 
CompressedSequentialWriter(f, f.getPath() + ".metadata", + null, SequentialWriterOption.DEFAULT, + compressionParameters, + sstableMetadataCollector)) + { + writer.write(b); + writer.finish(); + b.flip(); + } + + assert f.exists(); + try (FileHandle.Builder builder = new FileHandle.Builder(filename).withCompressionMetadata(new CompressionMetadata(filename + ".metadata", f.length(), true)); + FileHandle fh = builder.complete(); + RandomAccessReader reader = fh.createReader()) + { + assertEquals(size, reader.length()); + byte[] result = new byte[(int)reader.length()]; + + reader.readFully(result); + assert(reader.isEOF()); + + assert Arrays.equals(b.array(), result); + } + finally + { + if (f.exists()) + f.delete(); + File metadata = new File(f + ".metadata"); + if (metadata.exists()) + metadata.delete(); + } + + } + + private ByteBuffer makeBB(int size) { return compressionParameters.getSstableCompressor().preferredBufferType().allocate(size); http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a7db292/test/unit/org/apache/cassandra/io/compress/MockCompressor.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/compress/MockCompressor.java b/test/unit/org/apache/cassandra/io/compress/MockCompressor.java new file mode 100644 index 0000000..d57f4b7 --- /dev/null +++ b/test/unit/org/apache/cassandra/io/compress/MockCompressor.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.io.compress; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.Set; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; + +import org.apache.cassandra.utils.ByteBufferUtil; + +/** + * Mock compressor used to test the effect of different output sizes in compression. + * Writes only the size of the given buffer, and on decompression expands to sequence of that many 0s. + */ +public class MockCompressor implements ICompressor +{ + final int extra; + final double ratio; + + public static Map<String, String> paramsFor(double ratio, int extra) + { + return ImmutableMap.of("extra", "" + extra, "ratio", "" + ratio); + } + + public static MockCompressor create(Map<String, String> opts) + { + return new MockCompressor(Integer.parseInt(opts.get("extra")), + Double.parseDouble(opts.get("ratio"))); + } + + private MockCompressor(int extra, double ratio) + { + this.extra = extra; + this.ratio = ratio; + } + + public int initialCompressedBufferLength(int chunkLength) + { + return (int) Math.ceil(chunkLength / ratio + extra); + } + + public int uncompress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset) + throws IOException + { + final ByteBuffer outputBuffer = ByteBuffer.wrap(output, outputOffset, output.length - outputOffset); + uncompress(ByteBuffer.wrap(input, inputOffset, inputLength), + outputBuffer); + return outputBuffer.position(); + } + + public void compress(ByteBuffer input, ByteBuffer output) 
throws IOException + { + int inputLength = input.remaining(); + int outputLength = initialCompressedBufferLength(inputLength); + // assume the input is all zeros, write its length and pad until the required size + output.putInt(inputLength); + for (int i = 4; i < outputLength; ++i) + output.put((byte) i); + input.position(input.limit()); + } + + public void uncompress(ByteBuffer input, ByteBuffer output) throws IOException + { + int outputLength = input.getInt(); + ByteBufferUtil.writeZeroes(output, outputLength); + } + + public BufferType preferredBufferType() + { + return BufferType.OFF_HEAP; + } + + public boolean supports(BufferType bufferType) + { + return true; + } + + public Set<String> supportedOptions() + { + return ImmutableSet.of("extra", "ratio"); + } + +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a7db292/test/unit/org/apache/cassandra/utils/ByteBufferUtilTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/utils/ByteBufferUtilTest.java b/test/unit/org/apache/cassandra/utils/ByteBufferUtilTest.java index f2746f6..c5de60b 100644 --- a/test/unit/org/apache/cassandra/utils/ByteBufferUtilTest.java +++ b/test/unit/org/apache/cassandra/utils/ByteBufferUtilTest.java @@ -21,6 +21,7 @@ package org.apache.cassandra.utils; import java.io.ByteArrayInputStream; import java.io.DataInputStream; import java.io.IOException; +import java.nio.BufferOverflowException; import java.nio.ByteBuffer; import java.nio.charset.CharacterCodingException; import java.util.Arrays; @@ -33,6 +34,8 @@ import org.apache.cassandra.io.util.DataOutputBuffer; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.fail; public class ByteBufferUtilTest { @@ -288,8 +291,8 @@ public class ByteBufferUtilTest a.limit(bytes.length - 1).position(0); b.limit(bytes.length - 1).position(1); - 
Assert.assertFalse(ByteBufferUtil.startsWith(a, b)); - Assert.assertFalse(ByteBufferUtil.startsWith(a, b.slice())); + assertFalse(ByteBufferUtil.startsWith(a, b)); + assertFalse(ByteBufferUtil.startsWith(a, b.slice())); Assert.assertTrue(ByteBufferUtil.endsWith(a, b)); Assert.assertTrue(ByteBufferUtil.endsWith(a, b.slice())); @@ -297,7 +300,60 @@ public class ByteBufferUtilTest a.position(5); - Assert.assertFalse(ByteBufferUtil.startsWith(a, b)); - Assert.assertFalse(ByteBufferUtil.endsWith(a, b)); + assertFalse(ByteBufferUtil.startsWith(a, b)); + assertFalse(ByteBufferUtil.endsWith(a, b)); + } + + @Test + public void testWriteZeroes() + { + byte[] initial = new byte[1024]; + Arrays.fill(initial, (byte) 1); + for (ByteBuffer b : new ByteBuffer[] { ByteBuffer.allocate(1024), ByteBuffer.allocateDirect(1024) }) + { + for (int i = 0; i <= 32; ++i) + for (int j = 1024; j >= 1024 - 32; --j) + { + b.clear(); + b.put(initial); + b.flip(); + b.position(i); + ByteBufferUtil.writeZeroes(b, j-i); + assertEquals(j, b.position()); + int ii = 0; + for (; ii < i; ++ii) + assertEquals(initial[ii], b.get(ii)); + for (; ii < j; ++ii) + assertEquals(0, b.get(ii)); + for (; ii < 1024; ++ii) + assertEquals(initial[ii], b.get(ii)); + + b.clear(); + b.put(initial); + b.limit(j).position(i); + ByteBuffer slice = b.slice(); + ByteBufferUtil.writeZeroes(slice, slice.capacity()); + assertFalse(slice.hasRemaining()); + b.clear(); // reset position and limit for check + ii = 0; + for (; ii < i; ++ii) + assertEquals(initial[ii], b.get(ii)); + for (; ii < j; ++ii) + assertEquals(0, b.get(ii)); + for (; ii < 1024; ++ii) + assertEquals(initial[ii], b.get(ii)); + + slice.clear(); + try + { + ByteBufferUtil.writeZeroes(slice, slice.capacity() + 1); + fail("Line above should throw."); + } + catch (BufferOverflowException | IndexOutOfBoundsException e) + { + // correct path + } + } + } } } --------------------------------------------------------------------- To unsubscribe, e-mail: 
commits-unsubscribe@cassandra.apache.org For additional commands, e-mail: commits-help@cassandra.apache.org