Fix compaction failure caused by reading un-flushed data

patch by Jay Zhuang; reviewed by Marcus Eriksson for CASSANDRA-12743


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3a713827
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3a713827
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3a713827

Branch: refs/heads/cassandra-3.0
Commit: 3a713827f48399f389ea851a19b8ec8cd2cc5773
Parents: 334dca9
Author: Jay Zhuang <jay.zhu...@yahoo.com>
Authored: Sat Apr 21 11:15:06 2018 -0700
Committer: Jay Zhuang <jay.zhu...@yahoo.com>
Committed: Tue May 1 15:07:01 2018 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../io/compress/CompressedSequentialWriter.java |  17 ++-
 .../cassandra/io/util/SequentialWriter.java     |   2 +
 .../CompressedSequentialWriterReopenTest.java   | 153 +++++++++++++++++++
 .../CompressedSequentialWriterTest.java         |  52 +++++++
 .../cassandra/io/util/SequentialWriterTest.java |  41 +++++
 6 files changed, 264 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
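
The root cause in brief: CompressedSequentialWriter advanced lastFlushOffset by the
number of compressed bytes written to disk (plus the 4-byte chunk checksum), while the
early-open machinery treats that field as an uncompressed position. With a codec whose
output can be larger than its input, the counter overtakes the data actually flushed,
and compaction reads bytes that do not exist yet. A minimal, self-contained sketch of
the accounting, with hypothetical values (this is an illustration, not the patch itself):

    public class FlushOffsetSketch
    {
        public static void main(String[] args)
        {
            long uncompressedSize = 0; // logical bytes accepted from the caller
            long lastFlushOffset = 0;  // high-water mark that readers trust

            int chunkLength = 65536;       // one uncompressed chunk
            int compressedLength = 131072; // an inflating codec can exceed its input

            uncompressedSize += chunkLength;

            // Before the fix: advance by on-disk bytes, over-reporting flushed data.
            lastFlushOffset += compressedLength + 4;
            System.out.println(lastFlushOffset > uncompressedSize); // true: the bug

            // After the fix: track the uncompressed stream position instead.
            lastFlushOffset = uncompressedSize;
            System.out.println(lastFlushOffset == uncompressedSize); // true
        }
    }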


http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a713827/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5f6189f..22ee346 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.13
+ * Fix compaction failure caused by reading un-flushed data (CASSANDRA-12743)
  * Use Bounds instead of Range for sstables in anticompaction (CASSANDRA-14411)
  * Fix JSON queries with IN restrictions and ORDER BY clause (CASSANDRA-14286)
  * CQL fromJson(null) throws NullPointerException (CASSANDRA-13891)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a713827/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
index 9c7c776..a7f9bb4 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
@@ -129,7 +129,7 @@ public class CompressedSequentialWriter extends SequentialWriter
             // write corresponding checksum
             compressed.rewind();
             crcMetadata.appendDirect(compressed, true);
-            lastFlushOffset += compressedLength + 4;
+            lastFlushOffset = uncompressedSize;
 
             // adjust our bufferOffset to account for the new uncompressed data we've now written out
             resetBuffer();
@@ -235,10 +235,23 @@ public class CompressedSequentialWriter extends SequentialWriter
         chunkCount = realMark.nextChunkIndex - 1;
 
         // truncate data and index file
-        truncate(chunkOffset);
+        truncate(chunkOffset, bufferOffset);
         metadataWriter.resetAndTruncate(realMark.nextChunkIndex - 1);
     }
 
+    private void truncate(long toFileSize, long toBufferOffset)
+    {
+        try
+        {
+            channel.truncate(toFileSize);
+            lastFlushOffset = toBufferOffset;
+        }
+        catch (IOException e)
+        {
+            throw new FSWriteError(e, getPath());
+        }
+    }
+
     /**
      * Seek to the offset where next compressed data chunk should be stored.
      */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a713827/src/java/org/apache/cassandra/io/util/SequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SequentialWriter.java b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
index 0c39469..452318e 100644
--- a/src/java/org/apache/cassandra/io/util/SequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
@@ -430,6 +430,7 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
             throw new FSReadError(e, getPath());
         }
 
+        bufferOffset = truncateTarget;
         resetBuffer();
     }
 
@@ -443,6 +444,7 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
         try
         {
             channel.truncate(toSize);
+            lastFlushOffset = toSize;
         }
         catch (IOException e)
         {

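A note on units (the figures below are invented for illustration): the plain
SequentialWriter deals in a single unit, so its truncate(toSize) can simply set
lastFlushOffset = toSize, as the hunk above does. CompressedSequentialWriter deals in
two units at once, which is why it gains the private two-argument
truncate(toFileSize, toBufferOffset) earlier in this patch:

    channel size    : compressed bytes on disk  (e.g. 143,368 up to the marked chunk)
    lastFlushOffset : uncompressed stream bytes (e.g. 131,072 at the same mark)

After resetAndTruncate(), the file is truncated in the first unit and the flush offset
restored in the second, so the two counters can no longer drift apart.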
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a713827/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterReopenTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterReopenTest.java b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterReopenTest.java
new file mode 100644
index 0000000..33b4957
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterReopenTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.compress;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+import static org.junit.Assert.assertEquals;
+
+public class CompressedSequentialWriterReopenTest extends CQLTester
+{
+    @Test
+    public void badCompressor1() throws IOException
+    {
+        BadCompressor bad = new BadCompressor();
+        byte [] test = new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19};
+        byte [] out = new byte[10];
+        bad.uncompress(test, 0, 20, out, 0);
+        for (int i = 0; i < 10; i++)
+            assertEquals(out[i], (byte)i);
+    }
+
+    @Test
+    public void badCompressor2() throws IOException
+    {
+        BadCompressor bad = new BadCompressor();
+        ByteBuffer input = ByteBuffer.wrap(new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19});
+        ByteBuffer output = ByteBuffer.allocate(40);
+        bad.compress(input, output);
+        for (int i = 0; i < 40; i++)
+            assertEquals(i % 20, output.get(i));
+    }
+
+    @Test
+    public void badCompressor3() throws IOException
+    {
+        BadCompressor bad = new BadCompressor();
+        ByteBuffer input = ByteBuffer.wrap(new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19});
+        ByteBuffer output = ByteBuffer.allocate(10);
+        bad.uncompress(input, output);
+        for (int i = 0; i < 10; i++)
+            assertEquals(i, output.get(i));
+    }
+
+    @Test
+    public void compressionEnabled() throws Throwable
+    {
+        createTable("create table %s (id int primary key, t blob) with compression = {'sstable_compression':'org.apache.cassandra.io.compress.CompressedSequentialWriterReopenTest$BadCompressor'}");
+        byte [] blob = new byte[1000];
+        (new Random()).nextBytes(blob);
+        Keyspace keyspace = Keyspace.open(keyspace());
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(currentTable());
+        cfs.disableAutoCompaction();
+        for (int i = 0; i < 10000; i++)
+        {
+            execute("insert into %s (id, t) values (?, ?)", i, ByteBuffer.wrap(blob));
+        }
+        cfs.forceBlockingFlush();
+        for (int i = 0; i < 10000; i++)
+        {
+            execute("insert into %s (id, t) values (?, ?)", i, ByteBuffer.wrap(blob));
+        }
+        cfs.forceBlockingFlush();
+        DatabaseDescriptor.setSSTablePreempiveOpenIntervalInMB(1);
+        cfs.forceMajorCompaction();
+    }
+
+    public static class BadCompressor implements ICompressor
+    {
+        public static ICompressor create(Map<String, String> options)
+        {
+            return new BadCompressor();
+        }
+
+        @Override
+        public int initialCompressedBufferLength(int chunkLength)
+        {
+            return chunkLength * 2;
+        }
+
+        @Override
+        public int uncompress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset) throws IOException
+        {
+            System.arraycopy(input, inputOffset, output, outputOffset, inputLength / 2);
+            return inputLength / 2;
+        }
+
+        @Override
+        public void compress(ByteBuffer input, ByteBuffer output) throws IOException
+        {
+            int len = input.remaining();
+            byte [] arr = ByteBufferUtil.getArray(input);
+            output.put(arr);
+            output.put(arr);
+            input.position(len);
+        }
+
+        @Override
+        public void uncompress(ByteBuffer input, ByteBuffer output) throws IOException
+        {
+            byte [] arr = ByteBufferUtil.getArray(input);
+            output.put(arr, 0, arr.length / 2);
+            input.position(arr.length);
+        }
+
+        @Override
+        public BufferType preferredBufferType()
+        {
+            return BufferType.ON_HEAP;
+        }
+
+        @Override
+        public boolean supports(BufferType bufferType)
+        {
+            return true;
+        }
+
+        @Override
+        public Set<String> supportedOptions()
+        {
+            return new HashSet<>(Arrays.asList(CompressionParameters.CRC_CHECK_CHANCE));
+        }
+    }
+}

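Why BadCompressor exercises the bug (worked numbers, assuming the 64 KiB default chunk
length): its compress() writes the input twice, so every 65,536-byte uncompressed chunk
lands on disk as 131,072 compressed bytes plus a 4-byte checksum.

    uncompressed bytes flushed per chunk : 65,536
    on-disk bytes per chunk              : 2 * 65,536 + 4 = 131,076
    old accounting                       : lastFlushOffset += 131,076 (over-counts by 65,540)
    fixed accounting                     : lastFlushOffset = uncompressedSize (+65,536)

With the preemptive-open interval lowered to 1 MB, the major compaction at the end of
compressionEnabled() opens the sstable early; under the old accounting it would chase
bytes beyond what had actually been flushed.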
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a713827/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
index 43c44fd..bca0354 100644
--- a/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.io.compress;
 
+import com.google.common.io.Files;
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.File;
@@ -26,6 +27,7 @@ import java.util.*;
 
 import static org.apache.commons.io.FileUtils.readFileToByteArray;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 import org.junit.After;
 import org.junit.Test;
@@ -38,7 +40,9 @@ import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.io.util.ChannelProxy;
 import org.apache.cassandra.io.util.FileMark;
+import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.io.util.SequentialWriter;
 import org.apache.cassandra.io.util.SequentialWriterTest;
 
 public class CompressedSequentialWriterTest extends SequentialWriterTest
@@ -107,6 +111,12 @@ public class CompressedSequentialWriterTest extends SequentialWriterTest
                 {
                     writer.write((byte)i);
                 }
+
+                if (bytesToTest <= CompressionParameters.DEFAULT_CHUNK_LENGTH)
+                    assertEquals(writer.getLastFlushOffset(), CompressionParameters.DEFAULT_CHUNK_LENGTH);
+                else
+                    assertTrue(writer.getLastFlushOffset() % CompressionParameters.DEFAULT_CHUNK_LENGTH == 0);
+
                 writer.resetAndTruncate(mark);
                 writer.write(dataPost);
                 writer.finish();
@@ -155,6 +165,48 @@ public class CompressedSequentialWriterTest extends SequentialWriterTest
         writers.clear();
     }
 
+    @Test
+    @Override
+    public void resetAndTruncateTest()
+    {
+        File tempFile = new File(Files.createTempDir(), "reset.txt");
+        File offsetsFile = FileUtils.createTempFile("compressedsequentialwriter.offset", "test");
+        final int bufferSize = 48;
+        final int writeSize = 64;
+        byte[] toWrite = new byte[writeSize];
+
+        try (SequentialWriter writer = new CompressedSequentialWriter(tempFile, offsetsFile.getPath(),
+                                                                      new CompressionParameters(LZ4Compressor.instance), new MetadataCollector(CellNames.fromAbstractType(UTF8Type.instance, false))))
+        {
+            // write more bytes than the buffer holds
+            writer.write(toWrite);
+            long flushedOffset = writer.getLastFlushOffset();
+            assertEquals(writeSize, writer.getFilePointer());
+            // mark this position
+            FileMark pos = writer.mark();
+            // write the same bytes again
+            writer.write(toWrite);
+            // another buffer should be flushed
+            assertEquals(flushedOffset * 2, writer.getLastFlushOffset());
+            assertEquals(writeSize * 2, writer.getFilePointer());
+            // reset writer
+            writer.resetAndTruncate(pos);
+            // current position and flushed size should move back to the mark
+            assertEquals(writeSize, writer.getFilePointer());
+            assertEquals(flushedOffset, writer.getLastFlushOffset());
+            // write one more byte (less than the buffer size)
+            writer.write(new byte[]{0});
+            assertEquals(writeSize + 1, writer.getFilePointer());
+            // flush offset should not increase
+            assertEquals(flushedOffset, writer.getLastFlushOffset());
+            writer.finish();
+        }
+        catch (IOException e)
+        {
+            Assert.fail();
+        }
+    }
+
     protected TestableTransaction newTest() throws IOException
     {
         TestableCSW sw = new TestableCSW();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a713827/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java b/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java
index fd38427..15d6160 100644
--- a/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java
@@ -36,6 +36,7 @@ import org.apache.cassandra.io.compress.BufferType;
 import org.apache.cassandra.utils.concurrent.AbstractTransactionalTest;
 
 import static org.apache.commons.io.FileUtils.*;
+import static org.junit.Assert.assertEquals;
 
 public class SequentialWriterTest extends AbstractTransactionalTest
 {
@@ -119,6 +120,46 @@ public class SequentialWriterTest extends AbstractTransactionalTest
         }
     }
 
+    @Test
+    public void resetAndTruncateTest()
+    {
+        File tempFile = new File(Files.createTempDir(), "reset.txt");
+        final int bufferSize = 48;
+        final int writeSize = 64;
+        byte[] toWrite = new byte[writeSize];
+        try (SequentialWriter writer = new SequentialWriter(tempFile, bufferSize, BufferType.OFF_HEAP))
+        {
+            // write more bytes than the buffer holds
+            writer.write(toWrite);
+            assertEquals(bufferSize, writer.getLastFlushOffset());
+            assertEquals(writeSize, writer.getFilePointer());
+            // mark this position
+            FileMark pos = writer.mark();
+            // write the same bytes again
+            writer.write(toWrite);
+            // another buffer should be flushed
+            assertEquals(bufferSize * 2, writer.getLastFlushOffset());
+            assertEquals(writeSize * 2, writer.getFilePointer());
+            // reset writer
+            writer.resetAndTruncate(pos);
+            // current position and flushed size should move back to the mark
+            assertEquals(writeSize, writer.getFilePointer());
+            assertEquals(writeSize, writer.getLastFlushOffset());
+            // write one more byte (less than the buffer size)
+            writer.write(new byte[]{0});
+            assertEquals(writeSize + 1, writer.getFilePointer());
+            // flush offset should not increase
+            assertEquals(writeSize, writer.getLastFlushOffset());
+            writer.finish();
+        }
+        catch (IOException e)
+        {
+            Assert.fail();
+        }
+        // final file size check
+        assertEquals(writeSize + 1, tempFile.length());
+    }
+
     /**
      * Tests that the output stream exposed by SequentialWriter behaves as expected
      */

