Merge branch 'cassandra-2.2' into cassandra-3.0

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/733f6b0c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/733f6b0c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/733f6b0c

Branch: refs/heads/cassandra-3.0
Commit: 733f6b0cf8c5f8d89b9a9bf102e9e37548bba601
Parents: e16f0ed 3a71382
Author: Jay Zhuang <jay.zhu...@yahoo.com>
Authored: Tue May 1 15:08:51 2018 -0700
Committer: Jay Zhuang <jay.zhu...@yahoo.com>
Committed: Tue May 1 15:10:13 2018 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../io/compress/CompressedSequentialWriter.java |  17 ++-
 .../cassandra/io/util/SequentialWriter.java     |   2 +
 .../CompressedSequentialWriterReopenTest.java   | 148 +++++++++++++++++++
 .../CompressedSequentialWriterTest.java         |  53 +++++++
 .../cassandra/io/util/SequentialWriterTest.java |  41 +++++
 6 files changed, 260 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
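
The substance of this merge is the 2.2 fix for CASSANDRA-12743: truncating a SequentialWriter moved the file back but left lastFlushOffset pointing past the new end of file, so a reader opened against the partially written sstable (preemptive open during compaction) could be handed offsets for bytes that were never flushed. Both writers below now restore the same invariant; a minimal sketch, using the patch's own names (illustrative, not the committed code verbatim):

    // Sketch of the invariant this merge restores: after any truncation,
    // the flushed-bytes bookkeeping must match the file actually on disk.
    private void truncate(long toFileSize, long toBufferOffset)
    {
        fchannel.truncate(toFileSize);     // shrink the file on disk
        lastFlushOffset = toBufferOffset;  // ...and record it (the missing piece)
    }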


http://git-wip-us.apache.org/repos/asf/cassandra/blob/733f6b0c/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 857cf96,22ee346..9992802
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,25 -1,5 +1,26 @@@
 -2.2.13
 +3.0.17
 + * Delay hints store excise by write timeout to avoid race with decommission (CASSANDRA-13740)
 + * Deprecate background repair and probabilistic read_repair_chance table options
 +   (CASSANDRA-13910)
 + * Add missed CQL keywords to documentation (CASSANDRA-14359)
 + * Fix unbounded validation compactions on repair / revert CASSANDRA-13797 (CASSANDRA-14332)
 + * Avoid deadlock when running nodetool refresh before node is fully up (CASSANDRA-14310)
 + * Handle all exceptions when opening sstables (CASSANDRA-14202)
 + * Handle incompletely written hint descriptors during startup (CASSANDRA-14080)
 + * Handle repeat open bound from SRP in read repair (CASSANDRA-14330)
 + * Use zero as default score in DynamicEndpointSnitch (CASSANDRA-14252)
 + * Respect max hint window when hinting for LWT (CASSANDRA-14215)
 + * Adding missing WriteType enum values to v3, v4, and v5 spec (CASSANDRA-13697)
 + * Don't regenerate bloomfilter and summaries on startup (CASSANDRA-11163)
 + * Fix NPE when performing comparison against a null frozen in LWT (CASSANDRA-14087)
 + * Log when SSTables are deleted (CASSANDRA-14302)
 + * Fix batch commitlog sync regression (CASSANDRA-14292)
 + * Write to pending endpoint when view replica is also base replica (CASSANDRA-14251)
 + * Chain commit log marker potential performance regression in batch commit mode (CASSANDRA-14194)
 + * Fully utilise specified compaction threads (CASSANDRA-14210)
 + * Pre-create deletion log records to finish compactions quicker (CASSANDRA-12763)
 +Merged from 2.2:
+  * Fix compaction failure caused by reading un-flushed data (CASSANDRA-12743)
   * Use Bounds instead of Range for sstables in anticompaction (CASSANDRA-14411)
   * Fix JSON queries with IN restrictions and ORDER BY clause (CASSANDRA-14286)
   * CQL fromJson(null) throws NullPointerException (CASSANDRA-13891)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/733f6b0c/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
index 74258cf,a7f9bb4..43f1fd0
--- a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
@@@ -132,7 -129,10 +132,7 @@@ public class CompressedSequentialWrite
              // write corresponding checksum
              compressed.rewind();
              crcMetadata.appendDirect(compressed, true);
-             lastFlushOffset += compressedLength + 4;
+             lastFlushOffset = uncompressedSize;
 -
 -            // adjust our bufferOffset to account for the new uncompressed data we've now written out
 -            resetBuffer();
          }
          catch (IOException e)
          {
@@@ -240,6 -239,19 +240,19 @@@
          metadataWriter.resetAndTruncate(realMark.nextChunkIndex - 1);
      }
  
+     private void truncate(long toFileSize, long toBufferOffset)
+     {
+         try
+         {
 -            channel.truncate(toFileSize);
++            fchannel.truncate(toFileSize);
+             lastFlushOffset = toBufferOffset;
+         }
+         catch (IOException e)
+         {
+             throw new FSWriteError(e, getPath());
+         }
+     }
+ 
      /**
       * Seek to the offset where next compressed data chunk should be stored.
       */
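
The compressed writer needs both arguments because it tracks two coordinate systems at once: the file on disk is measured in compressed bytes, while lastFlushOffset counts uncompressed bytes, and once chunks compress to varying sizes neither value can be derived from the other. A hedged sketch of how resetAndTruncate can drive the new helper, assuming the mark records both coordinates (chunkOffset on disk, uncDataOffset uncompressed, as in the surrounding class):

    // Illustrative only -- the shape of the call, not the committed code.
    public synchronized void resetAndTruncate(CompressedFileWriterMark realMark)
    {
        // ... reload the marked chunk into the in-memory buffer ...
        truncate(realMark.chunkOffset, realMark.uncDataOffset); // both coordinates
        metadataWriter.resetAndTruncate(realMark.nextChunkIndex - 1);
    }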

http://git-wip-us.apache.org/repos/asf/cassandra/blob/733f6b0c/src/java/org/apache/cassandra/io/util/SequentialWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/util/SequentialWriter.java
index 26316a2,452318e..d17ac34
--- a/src/java/org/apache/cassandra/io/util/SequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
@@@ -348,7 -443,8 +349,8 @@@ public class SequentialWriter extends B
      {
          try
          {
 -            channel.truncate(toSize);
 +            fchannel.truncate(toSize);
+             lastFlushOffset = toSize;
          }
          catch (IOException e)
          {
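
The new unit tests below pin the behaviour down with concrete numbers: a 48-byte buffer and 64-byte writes. Written out linearly, the sequence they assert looks like this (a sketch of the test flow, not additional committed code):

    // bufferSize = 48, writeSize = 64 (as in resetAndTruncateTest below)
    writer.write(new byte[64]);     // one full buffer flushed: lastFlushOffset == 48
    DataPosition mark = writer.mark();
    writer.write(new byte[64]);     // 128 bytes written, 96 flushed
    writer.resetAndTruncate(mark);  // position back to 64; with this fix
                                    // lastFlushOffset == 64 as well, instead of
                                    // being left past the truncated length
    writer.write(new byte[]{0});    // 65 bytes written, still 64 flushed
    writer.finish();                // final file length: 65 bytes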

http://git-wip-us.apache.org/repos/asf/cassandra/blob/733f6b0c/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterReopenTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterReopenTest.java
index 0000000,33b4957..1bc3454
mode 000000,100644..100644
--- a/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterReopenTest.java
+++ b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterReopenTest.java
@@@ -1,0 -1,153 +1,148 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.cassandra.io.compress;
+ 
+ import java.io.IOException;
+ import java.nio.ByteBuffer;
 -import java.util.Arrays;
 -import java.util.HashSet;
+ import java.util.Map;
+ import java.util.Random;
+ import java.util.Set;
+ 
+ import org.junit.Test;
+ 
+ import org.apache.cassandra.config.DatabaseDescriptor;
+ import org.apache.cassandra.cql3.CQLTester;
 -import org.apache.cassandra.db.ColumnFamilyStore;
 -import org.apache.cassandra.db.Keyspace;
++import org.apache.cassandra.io.sstable.format.SSTableReader;
+ import org.apache.cassandra.utils.ByteBufferUtil;
+ 
+ import static org.junit.Assert.assertEquals;
+ 
+ public class CompressedSequentialWriterReopenTest extends CQLTester
+ {
+     @Test
+     public void badCompressor1() throws IOException
+     {
+         BadCompressor bad = new BadCompressor();
+         byte [] test = new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19};
+         byte [] out = new byte[10];
+         bad.uncompress(test, 0, 20, out, 0);
+         for (int i = 0; i < 10; i++)
+             assertEquals(out[i], (byte)i);
+     }
+ 
+     @Test
+     public void badCompressor2() throws IOException
+     {
+         BadCompressor bad = new BadCompressor();
+         ByteBuffer input = ByteBuffer.wrap(new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19});
+         ByteBuffer output = ByteBuffer.allocate(40);
+         bad.compress(input, output);
+         for (int i = 0; i < 40; i++)
+             assertEquals(i % 20, output.get(i));
+     }
+ 
+     @Test
+     public void badCompressor3() throws IOException
+     {
+         BadCompressor bad = new BadCompressor();
+         ByteBuffer input = ByteBuffer.wrap(new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19});
+         ByteBuffer output = ByteBuffer.allocate(10);
+         bad.uncompress(input, output);
+         for (int i = 0; i < 10; i++)
+             assertEquals(i, output.get(i));
+     }
+ 
+     @Test
+     public void compressionEnabled() throws Throwable
+     {
 -        createTable("create table %s (id int primary key, t blob) with compression = {'sstable_compression':'org.apache.cassandra.io.compress.CompressedSequentialWriterReopenTest$BadCompressor'}");
++        createTable("create table %s (id int primary key, t blob) with compression = {'class':'org.apache.cassandra.io.compress.CompressedSequentialWriterReopenTest$BadCompressor'}");
+         byte [] blob = new byte[1000];
+         (new Random()).nextBytes(blob);
 -        Keyspace keyspace = Keyspace.open(keyspace());
 -        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(currentTable());
 -        cfs.disableAutoCompaction();
++        getCurrentColumnFamilyStore().disableAutoCompaction();
+         for (int i = 0; i < 10000; i++)
+         {
+             execute("insert into %s (id, t) values (?, ?)", i, ByteBuffer.wrap(blob));
+         }
 -        cfs.forceBlockingFlush();
++        getCurrentColumnFamilyStore().forceBlockingFlush();
+         for (int i = 0; i < 10000; i++)
+         {
+             execute("insert into %s (id, t) values (?, ?)", i, ByteBuffer.wrap(blob));
+         }
 -        cfs.forceBlockingFlush();
++        getCurrentColumnFamilyStore().forceBlockingFlush();
+         DatabaseDescriptor.setSSTablePreempiveOpenIntervalInMB(1);
 -        cfs.forceMajorCompaction();
++        getCurrentColumnFamilyStore().forceMajorCompaction();
+     }
+ 
+     public static class BadCompressor implements ICompressor
+     {
+         public static ICompressor create(Map<String, String> options)
+         {
+             return new BadCompressor();
+         }
+ 
+         @Override
+         public int initialCompressedBufferLength(int chunkLength)
+         {
+             return chunkLength * 2;
+         }
+ 
+         @Override
+         public int uncompress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset) throws IOException
+         {
+             System.arraycopy(input, inputOffset, output, outputOffset, inputLength / 2);
+             return inputLength / 2;
+         }
+ 
+         @Override
+         public void compress(ByteBuffer input, ByteBuffer output) throws IOException
+         {
+             int len = input.remaining();
+             byte [] arr = ByteBufferUtil.getArray(input);
+             output.put(arr);
+             output.put(arr);
+             input.position(len);
+         }
+ 
+         @Override
+         public void uncompress(ByteBuffer input, ByteBuffer output) throws IOException
+         {
+             byte [] arr = ByteBufferUtil.getArray(input);
+             output.put(arr, 0, arr.length / 2);
+             input.position(arr.length);
+         }
+ 
+         @Override
+         public BufferType preferredBufferType()
+         {
+             return BufferType.ON_HEAP;
+         }
+ 
+         @Override
+         public boolean supports(BufferType bufferType)
+         {
+             return true;
+         }
+ 
+         @Override
+         public Set<String> supportedOptions()
+         {
 -            return new HashSet<>(Arrays.asList(CompressionParameters.CRC_CHECK_CHANCE));
++            return null;
+         }
+     }
+ }
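
The BadCompressor is the interesting piece of this test: compress() writes its input twice and uncompress() keeps only the first half, so data survives a round trip while every "compressed" chunk is strictly larger than its source. With setSSTablePreempiveOpenIntervalInMB(1), the major compaction of the 10,000 inserted rows then repeatedly exercises the early-open / truncate path that used to surface un-flushed data. A short round-trip check under those semantics (illustrative, in the spirit of badCompressor2/3 above):

    // Round trip through BadCompressor: 2x expansion, then halving.
    BadCompressor bad = new BadCompressor();
    ByteBuffer in = ByteBuffer.wrap(new byte[] {1, 2, 3, 4});
    ByteBuffer mid = ByteBuffer.allocate(8);
    bad.compress(in, mid);      // mid now holds 1,2,3,4,1,2,3,4
    mid.flip();                 // make the "compressed" bytes readable
    ByteBuffer out = ByteBuffer.allocate(4);
    bad.uncompress(mid, out);   // out holds 1,2,3,4 -- original restored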

http://git-wip-us.apache.org/repos/asf/cassandra/blob/733f6b0c/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
index e045aad,bca0354..f04439a
--- a/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
@@@ -38,11 -39,11 +40,13 @@@ import org.apache.cassandra.db.marshal.
  import org.apache.cassandra.db.marshal.UTF8Type;
  import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
  import org.apache.cassandra.io.util.ChannelProxy;
 -import org.apache.cassandra.io.util.FileMark;
 +import org.apache.cassandra.io.util.DataPosition;
+ import org.apache.cassandra.io.util.FileUtils;
  import org.apache.cassandra.io.util.RandomAccessReader;
+ import org.apache.cassandra.io.util.SequentialWriter;
  import org.apache.cassandra.io.util.SequentialWriterTest;
 +import org.apache.cassandra.schema.CompressionParams;
 +import org.apache.cassandra.utils.ChecksumType;
  
  public class CompressedSequentialWriterTest extends SequentialWriterTest
  {
@@@ -111,6 -111,12 +115,12 @@@
                  {
                      writer.write((byte)i);
                  }
+ 
 -                if (bytesToTest <= CompressionParameters.DEFAULT_CHUNK_LENGTH)
 -                    assertEquals(writer.getLastFlushOffset(), CompressionParameters.DEFAULT_CHUNK_LENGTH);
++                if (bytesToTest <= CompressionParams.DEFAULT_CHUNK_LENGTH)
++                    assertEquals(writer.getLastFlushOffset(), CompressionParams.DEFAULT_CHUNK_LENGTH);
+                 else
 -                    assertTrue(writer.getLastFlushOffset() % CompressionParameters.DEFAULT_CHUNK_LENGTH == 0);
++                    assertTrue(writer.getLastFlushOffset() % CompressionParams.DEFAULT_CHUNK_LENGTH == 0);
+ 
                  writer.resetAndTruncate(mark);
                  writer.write(dataPost);
                  writer.finish();
@@@ -159,6 -165,48 +169,49 @@@
          writers.clear();
      }
  
+     @Test
+     @Override
+     public void resetAndTruncateTest()
+     {
+         File tempFile = new File(Files.createTempDir(), "reset.txt");
+         File offsetsFile = FileUtils.createTempFile("compressedsequentialwriter.offset", "test");
+         final int bufferSize = 48;
+         final int writeSize = 64;
+         byte[] toWrite = new byte[writeSize];
++        MetadataCollector sstableMetadataCollector = new MetadataCollector(new ClusteringComparator(Arrays.<AbstractType<?>>asList(BytesType.instance)));
+ 
+         try (SequentialWriter writer = new CompressedSequentialWriter(tempFile, offsetsFile.getPath(),
 -                                                                                new CompressionParameters(LZ4Compressor.instance),new MetadataCollector(CellNames.fromAbstractType(UTF8Type.instance, false))))
++                                                                      CompressionParams.lz4(), sstableMetadataCollector))
+         {
+             // write more bytes than the buffer size
+             writer.write(toWrite);
+             long flushedOffset = writer.getLastFlushOffset();
 -            assertEquals(writeSize, writer.getFilePointer());
++            assertEquals(writeSize, writer.position());
+             // mark this position
 -            FileMark pos = writer.mark();
++            DataPosition pos = writer.mark();
+             // write another chunk
+             writer.write(toWrite);
+             // another buffer should be flushed
+             assertEquals(flushedOffset * 2, writer.getLastFlushOffset());
 -            assertEquals(writeSize * 2, writer.getFilePointer());
++            assertEquals(writeSize * 2, writer.position());
+             // reset writer
+             writer.resetAndTruncate(pos);
+             // current position and flushed size should be rolled back
 -            assertEquals(writeSize, writer.getFilePointer());
++            assertEquals(writeSize, writer.position());
+             assertEquals(flushedOffset, writer.getLastFlushOffset());
+             // write a single byte, less than the buffer size
+             writer.write(new byte[]{0});
 -            assertEquals(writeSize + 1, writer.getFilePointer());
++            assertEquals(writeSize + 1, writer.position());
+             // flush offset should not increase
+             assertEquals(flushedOffset, writer.getLastFlushOffset());
+             writer.finish();
+         }
+         catch (IOException e)
+         {
+             Assert.fail();
+         }
+     }
+ 
      protected TestableTransaction newTest() throws IOException
      {
          TestableCSW sw = new TestableCSW();
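
One difference between the two resetAndTruncate tests is flush granularity: the plain writer flushes in bufferSize steps, while the compressed writer flushes whole chunks, so the compressed test captures flushedOffset after the first write and asserts multiples of it rather than of bufferSize (likewise the earlier hunk asserts multiples of CompressionParams.DEFAULT_CHUNK_LENGTH). A comment-level sketch of the distinction:

    // Plain SequentialWriter:     lastFlushOffset grows in bufferSize steps (48 here).
    // CompressedSequentialWriter: lastFlushOffset grows one chunk at a time, so the
    //                             test compares against the observed flushedOffset
    //                             instead of assuming a fixed buffer size.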

http://git-wip-us.apache.org/repos/asf/cassandra/blob/733f6b0c/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java
index f5a366e,15d6160..4d75103
--- a/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java
@@@ -118,6 -120,46 +119,46 @@@ public class SequentialWriterTest exten
          }
      }
  
+     @Test
+     public void resetAndTruncateTest()
+     {
+         File tempFile = new File(Files.createTempDir(), "reset.txt");
+         final int bufferSize = 48;
+         final int writeSize = 64;
+         byte[] toWrite = new byte[writeSize];
+         try (SequentialWriter writer = new SequentialWriter(tempFile, bufferSize, BufferType.OFF_HEAP))
+         {
+             // write more bytes than the buffer size
+             writer.write(toWrite);
+             assertEquals(bufferSize, writer.getLastFlushOffset());
 -            assertEquals(writeSize, writer.getFilePointer());
++            assertEquals(writeSize, writer.position());
+             // mark this position
 -            FileMark pos = writer.mark();
++            DataPosition pos = writer.mark();
+             // write another chunk
+             writer.write(toWrite);
+             // another buffer should be flushed
+             assertEquals(bufferSize * 2, writer.getLastFlushOffset());
 -            assertEquals(writeSize * 2, writer.getFilePointer());
++            assertEquals(writeSize * 2, writer.position());
+             // reset writer
+             writer.resetAndTruncate(pos);
+             // current position and flushed size should be rolled back
 -            assertEquals(writeSize, writer.getFilePointer());
++            assertEquals(writeSize, writer.position());
+             assertEquals(writeSize, writer.getLastFlushOffset());
+             // write a single byte, less than the buffer size
+             writer.write(new byte[]{0});
 -            assertEquals(writeSize + 1, writer.getFilePointer());
++            assertEquals(writeSize + 1, writer.position());
+             // flush offset should not increase
+             assertEquals(writeSize, writer.getLastFlushOffset());
+             writer.finish();
+         }
+         catch (IOException e)
+         {
+             Assert.fail();
+         }
+         // final file size check
+         assertEquals(writeSize + 1, tempFile.length());
+     }
+ 
      /**
       * Tests that the output stream exposed by SequentialWriter behaves as expected
       */

