Fix compaction failure caused by reading un-flushed data

patch by Jay Zhuang; reviewed by Marcus Eriksson for CASSANDRA-12743
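The heart of the change is the bookkeeping in CompressedSequentialWriter.flushData(): lastFlushOffset used to advance by the number of bytes that hit the disk (compressed chunk plus 4-byte checksum), while its callers treat it as a position in the uncompressed stream, as the new tests assert. Once the two coordinate systems drift apart, a reader consulting getLastFlushOffset() (for example when a major compaction opens its output early, as the new CompressedSequentialWriterReopenTest forces) can be pointed at data that was never flushed. A minimal standalone sketch of the two accounting schemes, with java.util.zip.Deflater standing in for Cassandra's ICompressor and all names hypothetical:

import java.util.zip.Deflater;

public class FlushOffsetSketch
{
    public static void main(String[] args)
    {
        // One uncompressed chunk, as flushData() would see it.
        byte[] chunk = new byte[4096];

        Deflater deflater = new Deflater();
        deflater.setInput(chunk);
        deflater.finish();
        byte[] compressed = new byte[chunk.length * 2];
        int compressedLength = deflater.deflate(compressed);
        deflater.end();

        long uncompressedSize = chunk.length;

        // Old accounting: advance by what hit the disk (chunk + 4-byte CRC).
        // For compressible data this under-reports the flushed position; for
        // incompressible data it over-reports, letting a reader run past the
        // bytes that were really flushed.
        long lastFlushOffsetOld = compressedLength + 4;

        // Fixed accounting: report the uncompressed position that is on disk,
        // which is the coordinate system getLastFlushOffset() callers use.
        long lastFlushOffsetNew = uncompressedSize;

        System.out.println("bytes written to disk : " + (compressedLength + 4));
        System.out.println("old lastFlushOffset   : " + lastFlushOffsetOld);
        System.out.println("new lastFlushOffset   : " + lastFlushOffsetNew);
    }
}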
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3a713827
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3a713827
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3a713827

Branch: refs/heads/cassandra-3.0
Commit: 3a713827f48399f389ea851a19b8ec8cd2cc5773
Parents: 334dca9
Author: Jay Zhuang <jay.zhu...@yahoo.com>
Authored: Sat Apr 21 11:15:06 2018 -0700
Committer: Jay Zhuang <jay.zhu...@yahoo.com>
Committed: Tue May 1 15:07:01 2018 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../io/compress/CompressedSequentialWriter.java |  17 ++-
 .../cassandra/io/util/SequentialWriter.java     |   2 +
 .../CompressedSequentialWriterReopenTest.java   | 153 +++++++++++++++++++
 .../CompressedSequentialWriterTest.java         |  52 +++++++
 .../cassandra/io/util/SequentialWriterTest.java |  41 +++++
 6 files changed, 264 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a713827/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5f6189f..22ee346 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.13
+ * Fix compaction failure caused by reading un-flushed data (CASSANDRA-12743)
  * Use Bounds instead of Range for sstables in anticompaction (CASSANDRA-14411)
  * Fix JSON queries with IN restrictions and ORDER BY clause (CASSANDRA-14286)
  * CQL fromJson(null) throws NullPointerException (CASSANDRA-13891)


http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a713827/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
index 9c7c776..a7f9bb4 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
@@ -129,7 +129,7 @@ public class CompressedSequentialWriter extends SequentialWriter
         // write corresponding checksum
         compressed.rewind();
         crcMetadata.appendDirect(compressed, true);
-        lastFlushOffset += compressedLength + 4;
+        lastFlushOffset = uncompressedSize;
 
         // adjust our bufferOffset to account for the new uncompressed data we've now written out
         resetBuffer();
@@ -235,10 +235,23 @@ public class CompressedSequentialWriter extends SequentialWriter
             chunkCount = realMark.nextChunkIndex - 1;
 
         // truncate data and index file
-        truncate(chunkOffset);
+        truncate(chunkOffset, bufferOffset);
         metadataWriter.resetAndTruncate(realMark.nextChunkIndex - 1);
     }
 
+    private void truncate(long toFileSize, long toBufferOffset)
+    {
+        try
+        {
+            channel.truncate(toFileSize);
+            lastFlushOffset = toBufferOffset;
+        }
+        catch (IOException e)
+        {
+            throw new FSWriteError(e, getPath());
+        }
+    }
+
     /**
      * Seek to the offset where next compressed data chunk should be stored.
      */


http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a713827/src/java/org/apache/cassandra/io/util/SequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SequentialWriter.java b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
index 0c39469..452318e 100644
--- a/src/java/org/apache/cassandra/io/util/SequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
@@ -430,6 +430,7 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
             throw new FSReadError(e, getPath());
         }
 
+        bufferOffset = truncateTarget;
         resetBuffer();
     }
 
@@ -443,6 +444,7 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
         try
         {
             channel.truncate(toSize);
+            lastFlushOffset = toSize;
         }
         catch (IOException e)
         {


http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a713827/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterReopenTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterReopenTest.java b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterReopenTest.java
new file mode 100644
index 0000000..33b4957
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterReopenTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.compress;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+import static org.junit.Assert.assertEquals;
+
+public class CompressedSequentialWriterReopenTest extends CQLTester
+{
+    @Test
+    public void badCompressor1() throws IOException
+    {
+        BadCompressor bad = new BadCompressor();
+        byte [] test = new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19};
+        byte [] out = new byte[10];
+        bad.uncompress(test, 0, 20, out, 0);
+        for (int i = 0; i < 10; i++)
+            assertEquals(out[i], (byte)i);
+    }
+
+    @Test
+    public void badCompressor2() throws IOException
+    {
+        BadCompressor bad = new BadCompressor();
+        ByteBuffer input = ByteBuffer.wrap(new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19});
+        ByteBuffer output = ByteBuffer.allocate(40);
+        bad.compress(input, output);
+        for (int i = 0; i < 40; i++)
+            assertEquals(i % 20, output.get(i));
+    }
+
+    @Test
+    public void badCompressor3() throws IOException
+    {
+        BadCompressor bad = new BadCompressor();
+        ByteBuffer input = ByteBuffer.wrap(new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19});
+        ByteBuffer output = ByteBuffer.allocate(10);
+        bad.uncompress(input, output);
+        for (int i = 0; i < 10; i++)
+            assertEquals(i, output.get(i));
+    }
+
+    @Test
+    public void compressionEnabled() throws Throwable
+    {
+        createTable("create table %s (id int primary key, t blob) with compression = {'sstable_compression':'org.apache.cassandra.io.compress.CompressedSequentialWriterReopenTest$BadCompressor'}");
+        byte [] blob = new byte[1000];
+        (new Random()).nextBytes(blob);
+        Keyspace keyspace = Keyspace.open(keyspace());
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(currentTable());
+        cfs.disableAutoCompaction();
+        for (int i = 0; i < 10000; i++)
+        {
+            execute("insert into %s (id, t) values (?, ?)", i, ByteBuffer.wrap(blob));
+        }
+        cfs.forceBlockingFlush();
+        for (int i = 0; i < 10000; i++)
+        {
+            execute("insert into %s (id, t) values (?, ?)", i, ByteBuffer.wrap(blob));
+        }
+        cfs.forceBlockingFlush();
+        DatabaseDescriptor.setSSTablePreempiveOpenIntervalInMB(1);
+        cfs.forceMajorCompaction();
+    }
+
+    public static class BadCompressor implements ICompressor
+    {
+        public static ICompressor create(Map<String, String> options)
+        {
+            return new BadCompressor();
+        }
+
+        @Override
+        public int initialCompressedBufferLength(int chunkLength)
+        {
+            return chunkLength * 2;
+        }
+
+        @Override
+        public int uncompress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset) throws IOException
+        {
+            System.arraycopy(input, inputOffset, output, outputOffset, inputLength / 2);
+            return inputLength / 2;
+        }
+
+        @Override
+        public void compress(ByteBuffer input, ByteBuffer output) throws IOException
+        {
+            int len = input.remaining();
+            byte [] arr = ByteBufferUtil.getArray(input);
+            output.put(arr);
+            output.put(arr);
+            input.position(len);
+        }
+
+        @Override
+        public void uncompress(ByteBuffer input, ByteBuffer output) throws IOException
+        {
+            byte [] arr = ByteBufferUtil.getArray(input);
+            output.put(arr, 0, arr.length / 2);
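+            // compress() wrote the payload twice, so only the first half of the
+            // doubled input is real data; the whole input is consumed below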
+            input.position(arr.length);
+        }
+
+        @Override
+        public BufferType preferredBufferType()
+        {
+            return BufferType.ON_HEAP;
+        }
+
+        @Override
+        public boolean supports(BufferType bufferType)
+        {
+            return true;
+        }
+
+        @Override
+        public Set<String> supportedOptions()
+        {
+            return new HashSet<>(Arrays.asList(CompressionParameters.CRC_CHECK_CHANCE));
+        }
+    }
+}


http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a713827/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
index 43c44fd..bca0354 100644
--- a/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.io.compress;
 
+import com.google.common.io.Files;
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.File;
@@ -26,6 +27,7 @@ import java.util.*;
 
 import static org.apache.commons.io.FileUtils.readFileToByteArray;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 import org.junit.After;
 import org.junit.Test;
@@ -38,7 +40,9 @@ import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.io.util.ChannelProxy;
 import org.apache.cassandra.io.util.FileMark;
+import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.io.util.SequentialWriter;
 import org.apache.cassandra.io.util.SequentialWriterTest;
 
 public class CompressedSequentialWriterTest extends SequentialWriterTest
@@ -107,6 +111,12 @@ public class CompressedSequentialWriterTest extends SequentialWriterTest
             {
                 writer.write((byte)i);
             }
+
+            if (bytesToTest <= CompressionParameters.DEFAULT_CHUNK_LENGTH)
+                assertEquals(writer.getLastFlushOffset(), CompressionParameters.DEFAULT_CHUNK_LENGTH);
+            else
+                assertTrue(writer.getLastFlushOffset() % CompressionParameters.DEFAULT_CHUNK_LENGTH == 0);
+
             writer.resetAndTruncate(mark);
             writer.write(dataPost);
             writer.finish();
@@ -155,6 +165,48 @@ public class CompressedSequentialWriterTest extends SequentialWriterTest
         writers.clear();
     }
 
+    @Test
+    @Override
+    public void resetAndTruncateTest()
+    {
+        File tempFile = new File(Files.createTempDir(), "reset.txt");
+        File offsetsFile = FileUtils.createTempFile("compressedsequentialwriter.offset", "test");
+        final int bufferSize = 48;
+        final int writeSize = 64;
+        byte[] toWrite = new byte[writeSize];
+
+        try (SequentialWriter writer = new CompressedSequentialWriter(tempFile, offsetsFile.getPath(),
+                new CompressionParameters(LZ4Compressor.instance), new MetadataCollector(CellNames.fromAbstractType(UTF8Type.instance, false))))
+        {
+            // write more bytes than the buffer size
+            writer.write(toWrite);
+            long flushedOffset = writer.getLastFlushOffset();
+            assertEquals(writeSize, writer.getFilePointer());
+            // mark this position
+            FileMark pos = writer.mark();
+            // write the same bytes again
+            writer.write(toWrite);
+            // another buffer should be flushed
+            assertEquals(flushedOffset * 2, writer.getLastFlushOffset());
+            assertEquals(writeSize * 2, writer.getFilePointer());
+            // reset writer
+            writer.resetAndTruncate(pos);
+            // current position and flushed size should be changed
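+            // (regression check for CASSANDRA-12743: resetAndTruncate previously
+            // left lastFlushOffset pointing past the truncated data, so readers
+            // could be handed un-flushed bytes)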
+            assertEquals(writeSize, writer.getFilePointer());
+            assertEquals(flushedOffset, writer.getLastFlushOffset());
+            // write one more byte, still less than the buffer size
+            writer.write(new byte[]{0});
+            assertEquals(writeSize + 1, writer.getFilePointer());
+            // flush offset should not increase
+            assertEquals(flushedOffset, writer.getLastFlushOffset());
+            writer.finish();
+        }
+        catch (IOException e)
+        {
+            Assert.fail();
+        }
+    }
+
     protected TestableTransaction newTest() throws IOException
     {
         TestableCSW sw = new TestableCSW();


http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a713827/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java b/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java
index fd38427..15d6160 100644
--- a/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java
@@ -36,6 +36,7 @@ import org.apache.cassandra.io.compress.BufferType;
 import org.apache.cassandra.utils.concurrent.AbstractTransactionalTest;
 
 import static org.apache.commons.io.FileUtils.*;
+import static org.junit.Assert.assertEquals;
 
 public class SequentialWriterTest extends AbstractTransactionalTest
 {
@@ -119,6 +120,46 @@ public class SequentialWriterTest extends AbstractTransactionalTest
         }
     }
 
+    @Test
+    public void resetAndTruncateTest()
+    {
+        File tempFile = new File(Files.createTempDir(), "reset.txt");
+        final int bufferSize = 48;
+        final int writeSize = 64;
+        byte[] toWrite = new byte[writeSize];
+        try (SequentialWriter writer = new SequentialWriter(tempFile, bufferSize, BufferType.OFF_HEAP))
+        {
+            // write more bytes than the buffer size
+            writer.write(toWrite);
+            assertEquals(bufferSize, writer.getLastFlushOffset());
+            assertEquals(writeSize, writer.getFilePointer());
+            // mark this position
+            FileMark pos = writer.mark();
+            // write the same bytes again
+            writer.write(toWrite);
+            // another buffer should be flushed
+            assertEquals(bufferSize * 2, writer.getLastFlushOffset());
+            assertEquals(writeSize * 2, writer.getFilePointer());
+            // reset writer
+            writer.resetAndTruncate(pos);
+            // current position and flushed size should be changed
+            assertEquals(writeSize, writer.getFilePointer());
+            assertEquals(writeSize, writer.getLastFlushOffset());
+            // write one more byte, still less than the buffer size
+            writer.write(new byte[]{0});
+            assertEquals(writeSize + 1, writer.getFilePointer());
+            // flush offset should not increase
+            assertEquals(writeSize, writer.getLastFlushOffset());
+            writer.finish();
+        }
+        catch (IOException e)
+        {
+            Assert.fail();
+        }
+        // final file size check
+        assertEquals(writeSize + 1, tempFile.length());
+    }
+
     /**
      * Tests that the output stream exposed by SequentialWriter behaves as expected
      */
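A closing note on the test strategy: BadCompressor in CompressedSequentialWriterReopenTest "compresses" by writing its input twice and "uncompresses" by keeping only the first half, so the on-disk length can never equal the logical length, and any code path that confuses the two coordinate systems corrupts data instead of passing by luck. A standalone sketch of that round-trip, with hypothetical names rather than the test class itself:

import java.nio.ByteBuffer;

public class DoublingCompressorSketch
{
    static ByteBuffer compress(ByteBuffer input)
    {
        byte[] arr = new byte[input.remaining()];
        input.get(arr);
        ByteBuffer out = ByteBuffer.allocate(arr.length * 2);
        out.put(arr).put(arr);            // "compressed" form is the payload written twice
        out.flip();
        return out;
    }

    static ByteBuffer uncompress(ByteBuffer input)
    {
        byte[] arr = new byte[input.remaining()];
        input.get(arr);
        ByteBuffer out = ByteBuffer.allocate(arr.length / 2);
        out.put(arr, 0, arr.length / 2);  // only the first half is real data
        out.flip();
        return out;
    }

    public static void main(String[] args)
    {
        ByteBuffer original = ByteBuffer.wrap(new byte[] {0, 1, 2, 3, 4});
        ByteBuffer roundTrip = uncompress(compress(original.duplicate()));
        // On-disk length (10) never equals logical length (5), so mixed-up
        // offsets fail loudly in the test rather than going unnoticed.
        System.out.println(original.equals(roundTrip)); // true
    }
}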