Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/733f6b0c Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/733f6b0c Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/733f6b0c Branch: refs/heads/cassandra-3.11 Commit: 733f6b0cf8c5f8d89b9a9bf102e9e37548bba601 Parents: e16f0ed 3a71382 Author: Jay Zhuang <jay.zhu...@yahoo.com> Authored: Tue May 1 15:08:51 2018 -0700 Committer: Jay Zhuang <jay.zhu...@yahoo.com> Committed: Tue May 1 15:10:13 2018 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../io/compress/CompressedSequentialWriter.java | 17 ++- .../cassandra/io/util/SequentialWriter.java | 2 + .../CompressedSequentialWriterReopenTest.java | 148 +++++++++++++++++++ .../CompressedSequentialWriterTest.java | 53 +++++++ .../cassandra/io/util/SequentialWriterTest.java | 41 +++++ 6 files changed, 260 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/733f6b0c/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index 857cf96,22ee346..9992802 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,25 -1,5 +1,26 @@@ -2.2.13 +3.0.17 + * Delay hints store excise by write timeout to avoid race with decommission (CASSANDRA-13740) + * Deprecate background repair and probablistic read_repair_chance table options + (CASSANDRA-13910) + * Add missed CQL keywords to documentation (CASSANDRA-14359) + * Fix unbounded validation compactions on repair / revert CASSANDRA-13797 (CASSANDRA-14332) + * Avoid deadlock when running nodetool refresh before node is fully up (CASSANDRA-14310) + * Handle all exceptions when opening sstables (CASSANDRA-14202) + * Handle incompletely written hint descriptors during startup (CASSANDRA-14080) + * Handle repeat open bound from SRP in read repair (CASSANDRA-14330) + * Use zero as default score in DynamicEndpointSnitch (CASSANDRA-14252) + * Respect max hint window when hinting for LWT (CASSANDRA-14215) + * Adding missing WriteType enum values to v3, v4, and v5 spec (CASSANDRA-13697) + * Don't regenerate bloomfilter and summaries on startup (CASSANDRA-11163) + * Fix NPE when performing comparison against a null frozen in LWT (CASSANDRA-14087) + * Log when SSTables are deleted (CASSANDRA-14302) + * Fix batch commitlog sync regression (CASSANDRA-14292) + * Write to pending endpoint when view replica is also base replica (CASSANDRA-14251) + * Chain commit log marker potential performance regression in batch commit mode (CASSANDRA-14194) + * Fully utilise specified compaction threads (CASSANDRA-14210) + * Pre-create deletion log records to finish compactions quicker (CASSANDRA-12763) +Merged from 2.2: + * Fix compaction failure caused by reading un-flushed data (CASSANDRA-12743) * Use Bounds instead of Range for sstables in anticompaction (CASSANDRA-14411) * Fix JSON queries with IN restrictions and ORDER BY clause (CASSANDRA-14286) * CQL fromJson(null) throws NullPointerException (CASSANDRA-13891) http://git-wip-us.apache.org/repos/asf/cassandra/blob/733f6b0c/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java index 74258cf,a7f9bb4..43f1fd0 --- a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java +++ b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java @@@ -132,7 -129,10 +132,7 @@@ public class CompressedSequentialWrite // write corresponding checksum compressed.rewind(); crcMetadata.appendDirect(compressed, true); - lastFlushOffset += compressedLength + 4; + lastFlushOffset = uncompressedSize; - - // adjust our bufferOffset to account for the new uncompressed data we've now written out - resetBuffer(); } catch (IOException e) { @@@ -240,6 -239,19 +240,19 @@@ metadataWriter.resetAndTruncate(realMark.nextChunkIndex - 1); } + private void truncate(long toFileSize, long toBufferOffset) + { + try + { - channel.truncate(toFileSize); ++ fchannel.truncate(toFileSize); + lastFlushOffset = toBufferOffset; + } + catch (IOException e) + { + throw new FSWriteError(e, getPath()); + } + } + /** * Seek to the offset where next compressed data chunk should be stored. */ http://git-wip-us.apache.org/repos/asf/cassandra/blob/733f6b0c/src/java/org/apache/cassandra/io/util/SequentialWriter.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/io/util/SequentialWriter.java index 26316a2,452318e..d17ac34 --- a/src/java/org/apache/cassandra/io/util/SequentialWriter.java +++ b/src/java/org/apache/cassandra/io/util/SequentialWriter.java @@@ -348,7 -443,8 +349,8 @@@ public class SequentialWriter extends B { try { - channel.truncate(toSize); + fchannel.truncate(toSize); + lastFlushOffset = toSize; } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/733f6b0c/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterReopenTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterReopenTest.java index 0000000,33b4957..1bc3454 mode 000000,100644..100644 --- a/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterReopenTest.java +++ b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterReopenTest.java @@@ -1,0 -1,153 +1,148 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.cassandra.io.compress; + + import java.io.IOException; + import java.nio.ByteBuffer; -import java.util.Arrays; -import java.util.HashSet; + import java.util.Map; + import java.util.Random; + import java.util.Set; + + import org.junit.Test; + + import org.apache.cassandra.config.DatabaseDescriptor; + import org.apache.cassandra.cql3.CQLTester; -import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.db.Keyspace; ++import org.apache.cassandra.io.sstable.format.SSTableReader; + import org.apache.cassandra.utils.ByteBufferUtil; + + import static org.junit.Assert.assertEquals; + + public class CompressedSequentialWriterReopenTest extends CQLTester + { + @Test + public void badCompressor1() throws IOException + { + BadCompressor bad = new BadCompressor(); + byte [] test = new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19}; + byte [] out = new byte[10]; + bad.uncompress(test, 0, 20, out, 0); + for (int i = 0; i < 10; i++) + assertEquals(out[i], (byte)i); + } + + @Test + public void badCompressor2() throws IOException + { + BadCompressor bad = new BadCompressor(); + ByteBuffer input = ByteBuffer.wrap(new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19}); + ByteBuffer output = ByteBuffer.allocate(40); + bad.compress(input, output); + for (int i = 0; i < 40; i++) + assertEquals(i % 20, output.get(i)); + } + + @Test + public void badCompressor3() throws IOException + { + BadCompressor bad = new BadCompressor(); + ByteBuffer input = ByteBuffer.wrap(new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19}); + ByteBuffer output = ByteBuffer.allocate(10); + bad.uncompress(input, output); + for (int i = 0; i < 10; i++) + assertEquals(i, output.get(i)); + } + + @Test + public void compressionEnabled() throws Throwable + { - createTable("create table %s (id int primary key, t blob) with compression = {'sstable_compression':'org.apache.cassandra.io.compress.CompressedSequentialWriterReopenTest$BadCompressor'}"); ++ createTable("create table %s (id int primary key, t blob) with compression = {'class':'org.apache.cassandra.io.compress.CompressedSequentialWriterReopenTest$BadCompressor'}"); + byte [] blob = new byte[1000]; + (new Random()).nextBytes(blob); - Keyspace keyspace = Keyspace.open(keyspace()); - ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(currentTable()); - cfs.disableAutoCompaction(); ++ getCurrentColumnFamilyStore().disableAutoCompaction(); + for (int i = 0; i < 10000; i++) + { + execute("insert into %s (id, t) values (?, ?)", i, ByteBuffer.wrap(blob)); + } - cfs.forceBlockingFlush(); ++ getCurrentColumnFamilyStore().forceBlockingFlush(); + for (int i = 0; i < 10000; i++) + { + execute("insert into %s (id, t) values (?, ?)", i, ByteBuffer.wrap(blob)); + } - cfs.forceBlockingFlush(); ++ getCurrentColumnFamilyStore().forceBlockingFlush(); + DatabaseDescriptor.setSSTablePreempiveOpenIntervalInMB(1); - cfs.forceMajorCompaction(); ++ getCurrentColumnFamilyStore().forceMajorCompaction(); + } + + public static class BadCompressor implements ICompressor + { + public static ICompressor create(Map<String, String> options) + { + return new BadCompressor(); + } + + @Override + public int initialCompressedBufferLength(int chunkLength) + { + return chunkLength * 2; + } + + @Override + public int uncompress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset) throws IOException + { + System.arraycopy(input, inputOffset, output, outputOffset, inputLength / 2); + return inputLength / 2; + } + + @Override + public void compress(ByteBuffer input, ByteBuffer output) throws IOException + { + int len = input.remaining(); + byte [] arr = ByteBufferUtil.getArray(input); + output.put(arr); + output.put(arr); + input.position(len); + } + + @Override + public void uncompress(ByteBuffer input, ByteBuffer output) throws IOException + { + byte [] arr = ByteBufferUtil.getArray(input); + output.put(arr, 0, arr.length / 2); + input.position(arr.length); + } + + @Override + public BufferType preferredBufferType() + { + return BufferType.ON_HEAP; + } + + @Override + public boolean supports(BufferType bufferType) + { + return true; + } + + @Override + public Set<String> supportedOptions() + { - return new HashSet<>(Arrays.asList(CompressionParameters.CRC_CHECK_CHANCE)); ++ return null; + } + } + } http://git-wip-us.apache.org/repos/asf/cassandra/blob/733f6b0c/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java index e045aad,bca0354..f04439a --- a/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java +++ b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java @@@ -38,11 -39,11 +40,13 @@@ import org.apache.cassandra.db.marshal. import org.apache.cassandra.db.marshal.UTF8Type; import org.apache.cassandra.io.sstable.metadata.MetadataCollector; import org.apache.cassandra.io.util.ChannelProxy; -import org.apache.cassandra.io.util.FileMark; +import org.apache.cassandra.io.util.DataPosition; + import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.io.util.RandomAccessReader; + import org.apache.cassandra.io.util.SequentialWriter; import org.apache.cassandra.io.util.SequentialWriterTest; +import org.apache.cassandra.schema.CompressionParams; +import org.apache.cassandra.utils.ChecksumType; public class CompressedSequentialWriterTest extends SequentialWriterTest { @@@ -111,6 -111,12 +115,12 @@@ { writer.write((byte)i); } + - if (bytesToTest <= CompressionParameters.DEFAULT_CHUNK_LENGTH) - assertEquals(writer.getLastFlushOffset(), CompressionParameters.DEFAULT_CHUNK_LENGTH); ++ if (bytesToTest <= CompressionParams.DEFAULT_CHUNK_LENGTH) ++ assertEquals(writer.getLastFlushOffset(), CompressionParams.DEFAULT_CHUNK_LENGTH); + else - assertTrue(writer.getLastFlushOffset() % CompressionParameters.DEFAULT_CHUNK_LENGTH == 0); ++ assertTrue(writer.getLastFlushOffset() % CompressionParams.DEFAULT_CHUNK_LENGTH == 0); + writer.resetAndTruncate(mark); writer.write(dataPost); writer.finish(); @@@ -159,6 -165,48 +169,49 @@@ writers.clear(); } + @Test + @Override + public void resetAndTruncateTest() + { + File tempFile = new File(Files.createTempDir(), "reset.txt"); + File offsetsFile = FileUtils.createTempFile("compressedsequentialwriter.offset", "test"); + final int bufferSize = 48; + final int writeSize = 64; + byte[] toWrite = new byte[writeSize]; ++ MetadataCollector sstableMetadataCollector = new MetadataCollector(new ClusteringComparator(Arrays.<AbstractType<?>>asList(BytesType.instance))); + + try (SequentialWriter writer = new CompressedSequentialWriter(tempFile, offsetsFile.getPath(), - new CompressionParameters(LZ4Compressor.instance),new MetadataCollector(CellNames.fromAbstractType(UTF8Type.instance, false)))) ++ CompressionParams.lz4(), sstableMetadataCollector)) + { + // write bytes greather than buffer + writer.write(toWrite); + long flushedOffset = writer.getLastFlushOffset(); - assertEquals(writeSize, writer.getFilePointer()); ++ assertEquals(writeSize, writer.position()); + // mark thi position - FileMark pos = writer.mark(); ++ DataPosition pos = writer.mark(); + // write another + writer.write(toWrite); + // another buffer should be flushed + assertEquals(flushedOffset * 2, writer.getLastFlushOffset()); - assertEquals(writeSize * 2, writer.getFilePointer()); ++ assertEquals(writeSize * 2, writer.position()); + // reset writer + writer.resetAndTruncate(pos); + // current position and flushed size should be changed - assertEquals(writeSize, writer.getFilePointer()); ++ assertEquals(writeSize, writer.position()); + assertEquals(flushedOffset, writer.getLastFlushOffset()); + // write another byte less than buffer + writer.write(new byte[]{0}); - assertEquals(writeSize + 1, writer.getFilePointer()); ++ assertEquals(writeSize + 1, writer.position()); + // flush off set should not be increase + assertEquals(flushedOffset, writer.getLastFlushOffset()); + writer.finish(); + } + catch (IOException e) + { + Assert.fail(); + } + } + protected TestableTransaction newTest() throws IOException { TestableCSW sw = new TestableCSW(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/733f6b0c/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java index f5a366e,15d6160..4d75103 --- a/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java +++ b/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java @@@ -118,6 -120,46 +119,46 @@@ public class SequentialWriterTest exten } } + @Test + public void resetAndTruncateTest() + { + File tempFile = new File(Files.createTempDir(), "reset.txt"); + final int bufferSize = 48; + final int writeSize = 64; + byte[] toWrite = new byte[writeSize]; + try (SequentialWriter writer = new SequentialWriter(tempFile, bufferSize, BufferType.OFF_HEAP)) + { + // write bytes greather than buffer + writer.write(toWrite); + assertEquals(bufferSize, writer.getLastFlushOffset()); - assertEquals(writeSize, writer.getFilePointer()); ++ assertEquals(writeSize, writer.position()); + // mark thi position - FileMark pos = writer.mark(); ++ DataPosition pos = writer.mark(); + // write another + writer.write(toWrite); + // another buffer should be flushed + assertEquals(bufferSize * 2, writer.getLastFlushOffset()); - assertEquals(writeSize * 2, writer.getFilePointer()); ++ assertEquals(writeSize * 2, writer.position()); + // reset writer + writer.resetAndTruncate(pos); + // current position and flushed size should be changed - assertEquals(writeSize, writer.getFilePointer()); ++ assertEquals(writeSize, writer.position()); + assertEquals(writeSize, writer.getLastFlushOffset()); + // write another byte less than buffer + writer.write(new byte[]{0}); - assertEquals(writeSize + 1, writer.getFilePointer()); ++ assertEquals(writeSize + 1, writer.position()); + // flush off set should not be increase + assertEquals(writeSize, writer.getLastFlushOffset()); + writer.finish(); + } + catch (IOException e) + { + Assert.fail(); + } + // final file size check + assertEquals(writeSize + 1, tempFile.length()); + } + /** * Tests that the output stream exposed by SequentialWriter behaves as expected */ --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org