Merge branch 'cassandra-2.0' into cassandra-2.1

Conflicts:
        CHANGES.txt
        src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
        src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
        src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
        src/java/org/apache/cassandra/io/util/SequentialWriter.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/55750e07
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/55750e07
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/55750e07

Branch: refs/heads/cassandra-2.1
Commit: 55750e07d20b76bf2c9f07575bfcb9193734bf24
Parents: edf48f8 3679b1b
Author: Benedict Elliott Smith <bened...@apache.org>
Authored: Wed Jan 7 13:05:16 2015 +0000
Committer: Benedict Elliott Smith <bened...@apache.org>
Committed: Wed Jan 7 13:05:16 2015 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |  3 ++
 .../io/compress/CompressedSequentialWriter.java |  6 +++
 .../io/compress/CompressionMetadata.java        |  9 +++++
 .../cassandra/io/sstable/SSTableWriter.java     | 21 ++++++-----
 .../io/util/ChecksummedSequentialWriter.java    |  6 +++
 .../cassandra/io/util/SequentialWriter.java     | 39 ++++++++++++++++----
 6 files changed, 67 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
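
For context on the 2.0 entry being merged here ("Ensure SSTableWriter cleans up properly after failure", CASSANDRA-8499): the point of the change is that abort() can be called from any failure path to release file handles and discard partial output without throwing. A minimal caller-side sketch (assuming a hypothetical PartialFileWriter interface, not the actual SSTableWriter API) of how such a writer is expected to be used:

    // Sketch only: 'PartialFileWriter' is a made-up stand-in for an SSTableWriter-like
    // class whose close() publishes the finished file and whose abort() must be safe
    // to call after any failure, releasing file handles and deleting partial output.
    import java.io.IOException;

    public final class WriterAbortExample
    {
        interface PartialFileWriter
        {
            void append(byte[] row) throws IOException;
            void close() throws IOException;   // success path: flush, fsync, publish
            void abort();                      // failure path: best-effort cleanup, never throws
        }

        static void writeAll(PartialFileWriter writer, Iterable<byte[]> rows) throws IOException
        {
            boolean success = false;
            try
            {
                for (byte[] row : rows)
                    writer.append(row);
                writer.close();
                success = true;
            }
            finally
            {
                if (!success)
                    writer.abort();            // must not leak resources or mask the original exception
            }
        }
    }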


http://git-wip-us.apache.org/repos/asf/cassandra/blob/55750e07/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 1c1bfe2,7aad4c0..372972d
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,51 -1,5 +1,54 @@@
 +2.1.3
 + * (cqlsh) Handle a schema mismatch being detected on startup (CASSANDRA-8512)
 + * Properly calculate expected write size during compaction (CASSANDRA-8532)
 + * Invalidate affected prepared statements when a table's columns
 +   are altered (CASSANDRA-7910)
 + * Stress - user defined writes should populate sequentally (CASSANDRA-8524)
 + * Fix regression in SSTableRewriter causing some rows to become unreadable 
 +   during compaction (CASSANDRA-8429)
 + * Run major compactions for repaired/unrepaired in parallel (CASSANDRA-8510)
 + * (cqlsh) Fix compression options in DESCRIBE TABLE output when compression
 +   is disabled (CASSANDRA-8288)
 + * (cqlsh) Fix DESCRIBE output after keyspaces are altered (CASSANDRA-7623)
 + * Make sure we set lastCompactedKey correctly (CASSANDRA-8463)
 + * (cqlsh) Fix output of CONSISTENCY command (CASSANDRA-8507)
 + * (cqlsh) Fixed the handling of LIST statements (CASSANDRA-8370)
 + * Make sstablescrub check leveled manifest again (CASSANDRA-8432)
 + * Check first/last keys in sstable when giving out positions (CASSANDRA-8458)
 + * Disable mmap on Windows (CASSANDRA-6993)
 + * Add missing ConsistencyLevels to cassandra-stress (CASSANDRA-8253)
 + * Add auth support to cassandra-stress (CASSANDRA-7985)
 + * Fix ArrayIndexOutOfBoundsException when generating error message
 +   for some CQL syntax errors (CASSANDRA-8455)
 + * Scale memtable slab allocation logarithmically (CASSANDRA-7882)
 + * cassandra-stress simultaneous inserts over same seed (CASSANDRA-7964)
 + * Reduce cassandra-stress sampling memory requirements (CASSANDRA-7926)
 + * Ensure memtable flush cannot expire commit log entries from its future (CASSANDRA-8383)
 + * Make read "defrag" async to reclaim memtables (CASSANDRA-8459)
 + * Remove tmplink files for offline compactions (CASSANDRA-8321)
 + * Reduce maxHintsInProgress (CASSANDRA-8415)
 + * BTree updates may call provided update function twice (CASSANDRA-8018)
 + * Release sstable references after anticompaction (CASSANDRA-8386)
 + * Handle abort() in SSTableRewriter properly (CASSANDRA-8320)
 + * Fix high size calculations for prepared statements (CASSANDRA-8231)
 + * Centralize shared executors (CASSANDRA-8055)
 + * Fix filtering for CONTAINS (KEY) relations on frozen collection
 +   clustering columns when the query is restricted to a single
 +   partition (CASSANDRA-8203)
 + * Do more aggressive entire-sstable TTL expiry checks (CASSANDRA-8243)
 + * Add more log info if readMeter is null (CASSANDRA-8238)
 + * add check of the system wall clock time at startup (CASSANDRA-8305)
 + * Support for frozen collections (CASSANDRA-7859)
 + * Fix overflow on histogram computation (CASSANDRA-8028)
 + * Have paxos reuse the timestamp generation of normal queries (CASSANDRA-7801)
 + * Fix incremental repair not remove parent session on remote (CASSANDRA-8291)
 + * Improve JBOD disk utilization (CASSANDRA-7386)
 + * Log failed host when preparing incremental repair (CASSANDRA-8228)
 + * Force config client mode in CQLSSTableWriter (CASSANDRA-8281)
 +Merged from 2.0:
++=======
+ 2.0.12:
+  * Ensure SSTableWriter cleans up properly after failure (CASSANDRA-8499)
   * Increase bf true positive count on key cache hit (CASSANDRA-8525)
   * Move MeteredFlusher to its own thread (CASSANDRA-8485)
   * Fix non-distinct results in DISTNCT queries on static columns when

http://git-wip-us.apache.org/repos/asf/cassandra/blob/55750e07/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
index d3c41fa,909d822..81bb3e9
--- a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
@@@ -261,12 -271,12 +261,18 @@@ public class CompressedSequentialWrite
          }
      }
  
+     public void abort()
+     {
+         super.abort();
+         metadataWriter.abort();
+     }
+ 
 +    @Override
 +    public void writeFullChecksum(Descriptor descriptor)
 +    {
 +        crcMetadata.writeFullChecksum(descriptor);
 +    }
 +
      /**
       * Class to hold a mark to the position of the file
       */
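
The hunk above adds abort() to CompressedSequentialWriter by chaining the base-class abort with cleanup of the compression metadata writer. A rough illustration of that chaining pattern, with made-up class names rather than the Cassandra classes themselves:

    // Each layer aborts its own resources after delegating to its parent; names are illustrative.
    class BaseWriter
    {
        public void abort()
        {
            // release the data file handle, delete partially written data, etc.
        }
    }

    class CompressedWriter extends BaseWriter
    {
        private final ChunkMetadataWriter metadataWriter = new ChunkMetadataWriter();

        @Override
        public void abort()
        {
            super.abort();           // parent discards the partially written data file
            metadataWriter.abort();  // then drop the chunk-offset metadata only this layer knows about
        }
    }

    class ChunkMetadataWriter
    {
        public void abort()
        {
            // discard any buffered chunk offsets / partial compression metadata output
        }
    }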

http://git-wip-us.apache.org/repos/asf/cassandra/blob/55750e07/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
index 173722f,5b0154b..f19d502
--- a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
@@@ -356,25 -360,38 +356,34 @@@ public class CompressionMetadat
           */
          public void resetAndTruncate(int chunkIndex)
          {
 +            count = chunkIndex;
 +        }
 +
 +        public void close(long dataLength, int chunks) throws IOException
 +        {
 +            DataOutputStream out = null;
              try
              {
 -                seek(dataLengthOffset
 -                     + 8 // size reserved for uncompressed data length
 -                     + 4 // size reserved for chunk count
 -                     + (chunkIndex * 8L));
 -                getChannel().truncate(getFilePointer());
 +              out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(filePath)));
 +                  assert chunks == count;
 +                  writeHeader(out, dataLength, chunks);
 +                for (int i = 0 ; i < count ; i++)
 +                    out.writeLong(offsets.getLong(i * 8));
              }
 -            catch (IOException e)
 +            finally
              {
 -                throw new FSWriteError(e, filePath);
 +                FileUtils.closeQuietly(out);
              }
          }
+ 
 -        public void close() throws IOException
 -        {
 -            if (getChannel().isOpen()) // if RAF.closed were public we could just use that, but it's not
 -                getChannel().force(true);
 -            super.close();
 -        }
 -
+         public void abort()
+         {
 -            try
 -            {
 -                super.close();
 -            }
 -            catch (Throwable t)
++            if (offsets != null)
+             {
 -                logger.warn("Suppressed exception while closing CompressionMetadata.Writer for {}", filePath, t);
++                offsets.unreference();
++                offsets = null;
+             }
+         }
      }
  
      /**
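
Judging from the hunk above, CompressionMetadata.Writer now keeps chunk offsets in memory and writes the whole metadata file once, in close(dataLength, chunks), so resetAndTruncate() becomes a counter reset and abort() only has to release the in-memory offsets. A self-contained sketch of that deferred-write idea, using a plain heap list instead of Cassandra's off-heap Memory and a hypothetical file layout:

    // Deferred-write sketch: offsets are buffered and flushed once on close(),
    // so abort() only drops the in-memory buffer. Names and layout are illustrative.
    import java.io.BufferedOutputStream;
    import java.io.DataOutputStream;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    final class OffsetsWriter
    {
        private final String filePath;
        private List<Long> offsets = new ArrayList<>();

        OffsetsWriter(String filePath) { this.filePath = filePath; }

        void addOffset(long offset) { offsets.add(offset); }

        // drop any offsets recorded past chunkIndex (e.g. after rewinding the data file)
        void resetAndTruncate(int chunkIndex)
        {
            offsets = new ArrayList<>(offsets.subList(0, chunkIndex));
        }

        void close(long dataLength) throws IOException
        {
            try (DataOutputStream out = new DataOutputStream(
                    new BufferedOutputStream(new FileOutputStream(filePath))))
            {
                out.writeLong(dataLength);       // uncompressed data length
                out.writeInt(offsets.size());    // chunk count
                for (long offset : offsets)
                    out.writeLong(offset);       // one entry per compressed chunk
            }
        }

        void abort()
        {
            offsets = null;                      // nothing on disk yet, just free the buffer
        }
    }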

http://git-wip-us.apache.org/repos/asf/cassandra/blob/55750e07/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index 595012d,08e5527..b0365ad
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@@ -339,23 -313,9 +339,19 @@@ public class SSTableWriter extends SSTa
       */
      public void abort()
      {
 -        assert descriptor.temporary;
 -        iwriter.abort();
 -        dataFile.abort();
 +        abort(true);
 +    }
 +    public void abort(boolean closeBf)
 +    {
 +        assert descriptor.type.isTemporary;
 +        if (iwriter == null && dataFile == null)
 +            return;
++
 +        if (iwriter != null)
-         {
-             FileUtils.closeQuietly(iwriter.indexFile);
-             if (closeBf)
-             {
-                 iwriter.bf.close();
-             }
-         }
++            iwriter.abort(closeBf);
++
 +        if (dataFile != null)
-             FileUtils.closeQuietly(dataFile);
++            dataFile.abort();
  
          Set<Component> components = SSTable.componentsFor(descriptor);
          try
@@@ -589,7 -431,7 +585,7 @@@
      /**
       * Encapsulates writing the index and filter for an SSTable. The state of this object is not valid until it has been closed.
       */
--    class IndexWriter implements Closeable
++    class IndexWriter
      {
          private final SequentialWriter indexFile;
          public final SegmentedFile.Builder builder;
@@@ -633,6 -469,6 +629,13 @@@
              builder.addPotentialBoundary(indexPosition);
          }
  
++        public void abort(boolean closeBf)
++        {
++            indexFile.abort();
++            if (closeBf)
++                bf.close();
++        }
++
          /**
          * Closes the index and bloomfilter, making the public state of this writer valid for consumption.
           */
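
The SSTableWriter.abort() hunk above ends (truncated in this mail) by collecting the components already written for the temporary descriptor and deleting them. A hedged sketch of that delete-what-exists step, with plain java.io.File standing in for Cassandra's Descriptor/Component types:

    // Illustrative only: remove whichever per-sstable component files were already created
    // before the failure, logging (rather than throwing) when a delete does not succeed.
    import java.io.File;
    import java.util.Arrays;
    import java.util.List;

    final class AbortComponentCleanup
    {
        static void deleteComponents(File directory, String baseName, List<String> suffixes)
        {
            for (String suffix : suffixes)
            {
                File component = new File(directory, baseName + "-" + suffix);
                if (component.exists() && !component.delete())
                    System.err.println("Unable to delete " + component + " during abort");
            }
        }

        public static void main(String[] args)
        {
            // Hypothetical temporary sstable base name and component suffixes.
            deleteComponents(new File("/var/lib/cassandra/data/ks/cf"),
                             "ks-cf-tmp-ka-42",
                             Arrays.asList("Data.db", "Index.db", "Filter.db",
                                           "CompressionInfo.db", "Statistics.db"));
        }
    }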

http://git-wip-us.apache.org/repos/asf/cassandra/blob/55750e07/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
index b95bf32,0000000..f4281b2
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
@@@ -1,53 -1,0 +1,59 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.io.util;
 +
 +import java.io.File;
 +
 +import org.apache.cassandra.io.sstable.Descriptor;
 +
 +public class ChecksummedSequentialWriter extends SequentialWriter
 +{
 +    private final SequentialWriter crcWriter;
 +    private final DataIntegrityMetadata.ChecksumWriter crcMetadata;
 +
 +    public ChecksummedSequentialWriter(File file, int bufferSize, File crcPath)
 +    {
 +        super(file, bufferSize);
 +        crcWriter = new SequentialWriter(crcPath, 8 * 1024);
 +        crcMetadata = new DataIntegrityMetadata.ChecksumWriter(crcWriter.stream);
 +        crcMetadata.writeChunkSize(buffer.length);
 +    }
 +
 +    protected void flushData()
 +    {
 +        super.flushData();
 +        crcMetadata.append(buffer, 0, validBufferBytes);
 +    }
 +
 +    public void writeFullChecksum(Descriptor descriptor)
 +    {
 +        crcMetadata.writeFullChecksum(descriptor);
 +    }
 +
 +    public void close()
 +    {
 +        super.close();
 +        crcWriter.close();
 +    }
++
++    public void abort()
++    {
++        super.abort();
++        crcWriter.abort();
++    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/55750e07/src/java/org/apache/cassandra/io/util/SequentialWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/util/SequentialWriter.java
index 7a7eb63,b980cf1..227c79d
--- a/src/java/org/apache/cassandra/io/util/SequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
@@@ -18,10 -18,11 +18,13 @@@
  package org.apache.cassandra.io.util;
  
  import java.io.*;
 +import java.nio.ByteBuffer;
  import java.nio.channels.ClosedChannelException;
 +import java.nio.channels.WritableByteChannel;
  
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
  import org.apache.cassandra.config.DatabaseDescriptor;
  import org.apache.cassandra.io.FSReadError;
  import org.apache.cassandra.io.FSWriteError;
@@@ -36,17 -32,23 +39,19 @@@ import org.apache.cassandra.utils.CLibr
   * Adds buffering, mark, and fsyncing to OutputStream.  We always fsync on close; we may also
   * fsync incrementally if Config.trickle_fsync is enabled.
   */
 -public class SequentialWriter extends OutputStream
 +public class SequentialWriter extends OutputStream implements WritableByteChannel
  {
+     private static final Logger logger = LoggerFactory.getLogger(SequentialWriter.class);
+ 
      // isDirty - true if this.buffer contains any un-synced bytes
      protected boolean isDirty = false, syncNeeded = false;
  
      // absolute path to the given file
      private final String filePath;
  
 -    // so we can use the write(int) path w/o tons of new byte[] allocations
 -    private final byte[] singleByteBuffer = new byte[1];
 -
      protected byte[] buffer;
 -    private final boolean skipIOCache;
      private final int fd;
--    private final int directoryFD;
++    private int directoryFD;
      // directory should be synced only after first file sync, in other words, only once per file
      private boolean directorySynced = false;
  
@@@ -437,21 -387,47 +442,39 @@@
  
          buffer = null;
  
-         try
-         {
-             out.close();
-         }
-         catch (IOException e)
 -        if (skipIOCache && bytesSinceCacheFlush > 0)
 -            CLibrary.trySkipCache(fd, 0, 0);
 -
+         cleanup(true);
+     }
+ 
+     public void abort()
+     {
+         cleanup(false);
+     }
+ 
+     private void cleanup(boolean throwExceptions)
+     {
 -        FileUtils.closeQuietly(metadata);
 -
 -        try { CLibrary.tryCloseFD(directoryFD); }
 -        catch (Throwable t) { handle(t, throwExceptions); }
++        if (directoryFD >= 0)
 +        {
-             throw new FSWriteError(e, getPath());
++            try { CLibrary.tryCloseFD(directoryFD); }
++            catch (Throwable t) { handle(t, throwExceptions); }
++            directoryFD = -1;
 +        }
  
-         CLibrary.tryCloseFD(directoryFD);
++        // close is idempotent
+         try { out.close(); }
+         catch (Throwable t) { handle(t, throwExceptions); }
+     }
+ 
+     private void handle(Throwable t, boolean throwExceptions)
+     {
+         if (!throwExceptions)
+             logger.warn("Suppressing exception thrown while aborting writer", t);
+         else
+             throw new FSWriteError(t, getPath());
      }
  
 -    /**
 -     * Turn on digest computation on this writer.
 -     * This can only be called before any data is written to this write,
 -     * otherwise an IllegalStateException is thrown.
 -     */
 -    public void setDataIntegrityWriter(DataIntegrityMetadata.ChecksumWriter writer)
 +    // hack to make life easier for subclasses
 +    public void writeFullChecksum(Descriptor descriptor)
      {
 -        if (current != 0)
 -            throw new IllegalStateException();
 -        metadata = writer;
 -        metadata.writeChunkSize(buffer.length);
      }
  
      /**
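
The reworked SequentialWriter funnels both close() and abort() through a single cleanup(boolean throwExceptions) path: close() surfaces the first failure, while abort() logs and keeps going so it can safely run inside another error handler. A generic, self-contained sketch of the same suppress-or-throw pattern (assuming slf4j, which the patch itself imports; the wrapping exception type is illustrative, Cassandra uses FSWriteError):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    abstract class AbortableResource implements AutoCloseable
    {
        private static final Logger logger = LoggerFactory.getLogger(AbortableResource.class);

        public void close() { cleanup(true);  }   // propagate failures to the caller
        public void abort() { cleanup(false); }   // best-effort: never throw from an error path

        private void cleanup(boolean throwExceptions)
        {
            try { releaseFileHandles(); }
            catch (Throwable t) { handle(t, throwExceptions); }

            try { deleteTemporaryFiles(); }
            catch (Throwable t) { handle(t, throwExceptions); }
        }

        private void handle(Throwable t, boolean throwExceptions)
        {
            if (!throwExceptions)
                logger.warn("Suppressing exception thrown while aborting writer", t);
            else
                throw new RuntimeException(t);    // stand-in for FSWriteError(t, getPath())
        }

        protected abstract void releaseFileHandles() throws Exception;
        protected abstract void deleteTemporaryFiles() throws Exception;
    }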

Reply via email to