Merge branch 'cassandra-2.0' into cassandra-2.1

Conflicts:
        CHANGES.txt
        src/java/org/apache/cassandra/db/TreeMapBackedSortedColumns.java
        src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
        src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
        src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9b9bf45b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9b9bf45b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9b9bf45b

Branch: refs/heads/trunk
Commit: 9b9bf45b676916dc9c81a6328fafef6496d62fb9
Parents: 3ba392f 2170ac4
Author: Sylvain Lebresne <sylv...@datastax.com>
Authored: Tue Sep 9 18:39:29 2014 -0700
Committer: Sylvain Lebresne <sylv...@datastax.com>
Committed: Tue Sep 9 18:39:29 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/db/ArrayBackedSortedColumns.java  |  5 ++
 .../io/sstable/AbstractSSTableSimpleWriter.java | 11 ++-
 .../cassandra/io/sstable/CQLSSTableWriter.java  | 81 ++++++++++++++++----
 .../io/sstable/SSTableSimpleUnsortedWriter.java | 38 ++++++---
 .../io/sstable/CQLSSTableWriterTest.java        | 39 ++++++++++
 6 files changed, 143 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/9b9bf45b/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index d4a17de,3ee938a..60fd4c9
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,80 -1,9 +1,81 @@@
 -2.0.11:
 +2.1.1
 + * (cqlsh) tab-completion for triggers (CASSANDRA-7824)
 + * (cqlsh): Support for query paging (CASSANDRA-7514)
 + * (cqlsh): Show progress of COPY operations (CASSANDRA-7789)
 + * Add syntax to remove multiple elements from a map (CASSANDRA-6599)
 + * Support non-equals conditions in lightweight transactions (CASSANDRA-6839)
 + * Add IF [NOT] EXISTS to create/drop triggers (CASSANDRA-7606)
 + * (cqlsh) Display the current logged-in user (CASSANDRA-7785)
 + * (cqlsh) Don't ignore CTRL-C during COPY FROM execution (CASSANDRA-7815)
 + * (cqlsh) Order UDTs according to cross-type dependencies in DESCRIBE
 +   output (CASSANDRA-7659)
 + * (cqlsh) Fix handling of CAS statement results (CASSANDRA-7671)
 + * (cqlsh) COPY TO/FROM improvements (CASSANDRA-7405)
 + * Support list index operations with conditions (CASSANDRA-7499)
 + * Add max live/tombstoned cells to nodetool cfstats output (CASSANDRA-7731)
 + * Validate IPv6 wildcard addresses properly (CASSANDRA-7680)
 + * (cqlsh) Error when tracing query (CASSANDRA-7613)
 + * Avoid IOOBE when building SyntaxError message snippet (CASSANDRA-7569)
 + * SSTableExport uses correct validator to create string representation of partition
 +   keys (CASSANDRA-7498)
 + * Avoid NPEs when receiving type changes for an unknown keyspace (CASSANDRA-7689)
 + * Add support for custom 2i validation (CASSANDRA-7575)
 + * Pig support for hadoop CqlInputFormat (CASSANDRA-6454)
 + * Add listen_interface and rpc_interface options (CASSANDRA-7417)
 + * Improve schema merge performance (CASSANDRA-7444)
 + * Adjust MT depth based on # of partition validating (CASSANDRA-5263)
 + * Optimise NativeCell comparisons (CASSANDRA-6755)
 + * Configurable client timeout for cqlsh (CASSANDRA-7516)
 + * Include snippet of CQL query near syntax error in messages (CASSANDRA-7111)
 +Merged from 2.0:
+  * Make CQLSSTableWriter sync within partitions (CASSANDRA-7360)
   * Potentially use non-local replicas in CqlConfigHelper (CASSANDRA-7906)
 - * Explicitly disallowing mixing multi-column and single-column
 + * Explicitly disallow mixing multi-column and single-column
     relations on clustering columns (CASSANDRA-7711)
   * Better error message when condition is set on PK column (CASSANDRA-7804)
 + * Don't send schema change responses and events for no-op DDL
 +   statements (CASSANDRA-7600)
 + * (Hadoop) fix cluster initialisation for a split fetching (CASSANDRA-7774)
 + * Throw InvalidRequestException when queries contain relations on entire
 +   collection columns (CASSANDRA-7506)
 + * (cqlsh) enable CTRL-R history search with libedit (CASSANDRA-7577)
 + * (Hadoop) allow ACFRW to limit nodes to local DC (CASSANDRA-7252)
 + * (cqlsh) cqlsh should automatically disable tracing when selecting
 +   from system_traces (CASSANDRA-7641)
 + * (Hadoop) Add CqlOutputFormat (CASSANDRA-6927)
 + * Don't depend on cassandra config for nodetool ring (CASSANDRA-7508)
 + * (cqlsh) Fix failing cqlsh formatting tests (CASSANDRA-7703)
 + * Fix IncompatibleClassChangeError from hadoop2 (CASSANDRA-7229)
 + * Add 'nodetool sethintedhandoffthrottlekb' (CASSANDRA-7635)
 + * (cqlsh) Add tab-completion for CREATE/DROP USER IF [NOT] EXISTS (CASSANDRA-7611)
 + * Catch errors when the JVM pulls the rug out from GCInspector (CASSANDRA-5345)
 + * cqlsh fails when version number parts are not int (CASSANDRA-7524)
 +Merged from 1.2:
 + * Don't index tombstones (CASSANDRA-7828)
 + * Improve PasswordAuthenticator default super user setup (CASSANDRA-7788)
 +
 +
 +2.1.0
 + * (cqlsh) Removed "ALTER TYPE <name> RENAME TO <name>" from tab-completion
 +   (CASSANDRA-7895)
 + * Fixed IllegalStateException in anticompaction (CASSANDRA-7892)
 + * cqlsh: DESCRIBE support for frozen UDTs, tuples (CASSANDRA-7863)
 + * Avoid exposing internal classes over JMX (CASSANDRA-7879)
 + * Add null check for keys when freezing collection (CASSANDRA-7869)
 + * Improve stress workload realism (CASSANDRA-7519)
 +
 +2.1.0-rc7
 + * Add frozen keyword and require UDT to be frozen (CASSANDRA-7857)
 + * Track added sstable size correctly (CASSANDRA-7239)
 + * (cqlsh) Fix case insensitivity (CASSANDRA-7834)
 + * Fix failure to stream ranges when moving (CASSANDRA-7836)
 + * Correctly remove tmplink files (CASSANDRA-7803)
 + * (cqlsh) Fix column name formatting for functions, CAS operations,
 +   and UDT field selections (CASSANDRA-7806)
 + * (cqlsh) Fix COPY FROM handling of null/empty primary key
 +   values (CASSANDRA-7792)
 + * Fix ordering of static cells (CASSANDRA-7763)
 +Merged from 2.0:
   * Forbid re-adding dropped counter columns (CASSANDRA-7831)
   * Fix CFMetaData#isThriftCompatible() for PK-only tables (CASSANDRA-7832)
   * Always reject inequality on the partition key without token()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9b9bf45b/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java
index c0fae24,389e0f8..b5ed8d2
--- a/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java
+++ b/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java
@@@ -66,32 -54,14 +66,37 @@@ public class ArrayBackedSortedColumns e
      {
          super(metadata);
          this.reversed = reversed;
 -        this.columns = new ArrayList<Column>();
 +        this.deletionInfo = DeletionInfo.live();
 +        this.cells = cells;
 +        this.size = size;
 +        this.sortedSize = sortedSize;
 +        this.isSorted = size == sortedSize;
      }
+ 
 -    private ArrayBackedSortedColumns(Collection<Column> columns, CFMetaData metadata, boolean reversed)
++    protected ArrayBackedSortedColumns(CFMetaData metadata, boolean reversed)
+     {
 -        super(metadata);
 -        this.reversed = reversed;
 -        this.columns = new ArrayList<Column>(columns);
++        this(metadata, reversed, EMPTY_ARRAY, 0, 0);
++    }
 +
 +    private ArrayBackedSortedColumns(ArrayBackedSortedColumns original)
 +    {
 +        super(original.metadata);
 +        this.reversed = original.reversed;
 +        this.deletionInfo = DeletionInfo.live(); // this is INTENTIONALLY not set to original.deletionInfo.
 +        this.cells = Arrays.copyOf(original.cells, original.size);
 +        this.size = original.size;
 +        this.sortedSize = original.sortedSize;
 +        this.isSorted = original.isSorted;
 +    }
 +
 +    public static ArrayBackedSortedColumns localCopy(ColumnFamily original, AbstractAllocator allocator)
 +    {
 +        ArrayBackedSortedColumns copy = new ArrayBackedSortedColumns(original.metadata, false, new Cell[original.getColumnCount()], 0, 0);
 +        for (Cell cell : original)
 +            copy.internalAdd(cell.localCopy(original.metadata, allocator));
 +        copy.sortedSize = copy.size; // internalAdd doesn't update sortedSize.
 +        copy.delete(original);
 +        return copy;
      }
  
      public ColumnFamily.Factory getFactory()
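The constructors above keep a `cells` array together with `size` and `sortedSize` (with `isSorted = size == sortedSize`), and `localCopy()` appends cells through `internalAdd()` before setting `sortedSize = size`. A minimal sketch of that sorted-prefix idea, assuming a hypothetical `SortedPrefixArray` over plain ints instead of `Cell` objects and a comparator, and leaving out any handling of cells with equal names, shows why the copy can be marked fully sorted: its source already iterates in order.

```java
import java.util.Arrays;

/**
 * Hypothetical illustration, not Cassandra's ArrayBackedSortedColumns: an int array whose
 * first sortedSize entries are known to be in order. Appends go to the end and the
 * unsorted tail is sorted lazily on read (duplicates are simply kept).
 */
public class SortedPrefixArray {
    private int[] items;
    private int size;
    private int sortedSize;

    SortedPrefixArray(int capacity) {
        this.items = new int[Math.max(capacity, 1)];
    }

    /** Appends without sorting and without updating sortedSize, like internalAdd(). */
    void internalAdd(int value) {
        if (size == items.length)
            items = Arrays.copyOf(items, items.length * 2);
        items[size++] = value;
    }

    /** Same rule as isSorted = size == sortedSize in the constructor above. */
    boolean isSorted() {
        return size == sortedSize;
    }

    /** Like localCopy(): the source is already sorted, so every appended entry is in order. */
    static SortedPrefixArray copyOfSorted(int[] sortedSource) {
        SortedPrefixArray copy = new SortedPrefixArray(sortedSource.length); // pre-sized, no regrowth
        for (int value : sortedSource)
            copy.internalAdd(value);
        copy.sortedSize = copy.size; // internalAdd doesn't update sortedSize
        return copy;
    }

    /** Sorts the unsorted tail (if any) before handing out the contents. */
    int[] toSortedArray() {
        if (!isSorted()) {
            Arrays.sort(items, 0, size);
            sortedSize = size;
        }
        return Arrays.copyOf(items, size);
    }

    public static void main(String[] args) {
        SortedPrefixArray a = copyOfSorted(new int[] {1, 3, 5});
        System.out.println(a.isSorted());                       // true
        a.internalAdd(2);
        System.out.println(a.isSorted());                       // false
        System.out.println(Arrays.toString(a.toSortedArray())); // [1, 2, 3, 5]
    }
}
```

Pre-sizing the array from the source's count, as `localCopy()` does with `new Cell[original.getColumnCount()]`, means the copy never has to grow while it is filled.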

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9b9bf45b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
index ae8300c,2c6f82a..f8999bf
--- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
@@@ -112,7 -111,7 +112,7 @@@ public abstract class AbstractSSTableSi
          currentSuperColumn = name;
      }
  
-     private void addColumn(Cell cell)
 -    protected void addColumn(Column column) throws IOException
++    protected void addColumn(Cell cell) throws IOException
      {
          if (columnFamily.metadata().isSuper())
          {
@@@ -130,9 -129,9 +130,9 @@@
       * @param value the column value
       * @param timestamp the column timestamp
       */
-     public void addColumn(ByteBuffer name, ByteBuffer value, long timestamp)
+     public void addColumn(ByteBuffer name, ByteBuffer value, long timestamp) throws IOException
      {
 -        addColumn(new Column(name, value, timestamp));
 +        addColumn(new BufferCell(metadata.comparator.cellFromByteBuffer(name), value, timestamp));
      }
  
      /**
@@@ -145,9 -144,9 +145,9 @@@
       * expiring the column, and as a consequence should be synchronized with the cassandra servers time. If {@code timestamp} represents
       * the insertion time in microseconds (which is not required), this should be {@code (timestamp / 1000) + (ttl * 1000)}.
       */
-     public void addExpiringColumn(ByteBuffer name, ByteBuffer value, long timestamp, int ttl, long expirationTimestampMS)
+     public void addExpiringColumn(ByteBuffer name, ByteBuffer value, long timestamp, int ttl, long expirationTimestampMS) throws IOException
      {
 -        addColumn(new ExpiringColumn(name, value, timestamp, ttl, (int)(expirationTimestampMS / 1000)));
 +        addColumn(new BufferExpiringCell(metadata.comparator.cellFromByteBuffer(name), value, timestamp, ttl, (int)(expirationTimestampMS / 1000)));
      }
  
      /**
@@@ -155,11 -154,11 +155,11 @@@
       * @param name the column name
       * @param value the value of the counter
       */
-     public void addCounterColumn(ByteBuffer name, long value)
+     public void addCounterColumn(ByteBuffer name, long value) throws IOException
      {
 -        addColumn(new CounterColumn(name,
 -                                    CounterContext.instance().createRemote(counterid, 1L, value, HeapAllocator.instance),
 -                                    System.currentTimeMillis()));
 +        addColumn(new BufferCounterCell(metadata.comparator.cellFromByteBuffer(name),
 +                                        CounterContext.instance().createGlobal(counterid, 1L, value),
 +                                        System.currentTimeMillis()));
      }
  
      /**
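The `addExpiringColumn()` Javadoc gives the expected value of `expirationTimestampMS` when `timestamp` is in microseconds, and the new code passes `(int)(expirationTimestampMS / 1000)`, i.e. whole seconds, to `BufferExpiringCell`. A small worked sketch of that arithmetic, using a hypothetical `ExpirationMath` helper that is not part of Cassandra:

```java
/** Hypothetical helper mirroring the arithmetic described in the addExpiringColumn() Javadoc. */
public class ExpirationMath {
    /**
     * timestampMicros: insertion time in microseconds; ttlSeconds: time-to-live in seconds.
     * Returns (timestamp / 1000) + (ttl * 1000), i.e. the expiration time in milliseconds.
     */
    static long expirationTimestampMS(long timestampMicros, int ttlSeconds) {
        // micros -> millis, then add the TTL converted from seconds to millis (long math, no overflow)
        return (timestampMicros / 1000) + (ttlSeconds * 1000L);
    }

    /** The writer stores the expiration in whole seconds: (int)(expirationTimestampMS / 1000). */
    static int localExpirationSeconds(long expirationTimestampMS) {
        return (int) (expirationTimestampMS / 1000);
    }

    public static void main(String[] args) {
        long insertedAtMicros = 1_410_313_169_000_000L; // example insertion time in microseconds
        long expMs = expirationTimestampMS(insertedAtMicros, 3600); // one-hour TTL
        System.out.println(expMs);                         // 1410316769000
        System.out.println(localExpirationSeconds(expMs)); // 1410316769
    }
}
```

Writing `ttlSeconds * 1000L` keeps the multiplication in `long` arithmetic; the final cast to `int` seconds stays in range for dates before January 2038.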

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9b9bf45b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
index 427d2d4,49a1259..bf4da24
--- a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
@@@ -32,7 -32,7 +32,8 @@@ import com.google.common.collect.Immuta
  import org.apache.cassandra.cql3.statements.*;
  import org.apache.cassandra.cql3.*;
  import org.apache.cassandra.config.*;
+ import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.composites.Composite;
  import org.apache.cassandra.db.marshal.AbstractType;
  import org.apache.cassandra.dht.IPartitioner;
  import org.apache.cassandra.dht.Murmur3Partitioner;
@@@ -202,18 -203,27 +203,27 @@@ public class CQLSSTableWriter implement
  
          long now = System.currentTimeMillis() * 1000;
          UpdateParameters params = new UpdateParameters(insert.cfm,
 -                                                       values,
 -                                                       insert.getTimestamp(now, values),
 -                                                       insert.getTimeToLive(values),
 -                                                       Collections.<ByteBuffer, ColumnGroupMap>emptyMap());
 +                                                       options,
 +                                                       insert.getTimestamp(now, options),
 +                                                       insert.getTimeToLive(options),
 +                                                       Collections.<ByteBuffer, CQL3Row>emptyMap());
  
-         for (ByteBuffer key: keys)
+         try
          {
-             if (writer.currentKey() == null || !key.equals(writer.currentKey().getKey()))
-                 writer.newRow(key);
-             insert.addUpdateForKey(writer.currentColumnFamily(), key, clusteringPrefix, params);
 -            for (ByteBuffer key: keys)
++            for (ByteBuffer key : keys)
+             {
 -                if (writer.currentKey() == null || !key.equals(writer.currentKey().key))
++                if (writer.currentKey() == null || !key.equals(writer.currentKey().getKey()))
+                     writer.newRow(key);
+                 insert.addUpdateForKey(writer.currentColumnFamily(), key, clusteringPrefix, params);
+             }
+             return this;
+         }
+         catch (BufferedWriter.SyncException e)
+         {
+             // If we use a BufferedWriter and had a problem writing to disk, the IOException has been
+             // wrapped in a SyncException (see BufferedWriter below). We want to extract that IOE.
+             throw (IOException)e.getCause();
          }
-         return this;
      }
  
      /**
@@@ -471,21 -474,58 +481,58 @@@
              if (insert == null)
                  throw new IllegalStateException("No insert statement specified, you should provide an insert statement through using()");
  
-             AbstractSSTableSimpleWriter writer;
-             if (sorted)
+             AbstractSSTableSimpleWriter writer = sorted
+                                                ? new SSTableSimpleWriter(directory, schema, partitioner)
+                                                : new BufferedWriter(directory, schema, partitioner, bufferSizeInMB);
+             return new CQLSSTableWriter(writer, insert, boundNames);
+         }
+     }
+ 
+     /**
+      * CQLSSTableWriter doesn't use the method addColumn() from AbstractSSTableSimpleWriter.
+      * Instead, it adds cells directly to the ColumnFamily the latter exposes. But this means
+      * that the sync() method of SSTableSimpleUnsortedWriter is not called (at least not for
+      * each CQL row, so adding many rows to the same partition can buffer too much data in
+      * memory - #7360). So we create a slightly modified SSTableSimpleUnsortedWriter that uses
+      * a tweaked ColumnFamily object that calls back the proper method after each added cell
+      * so we sync when we should.
+      */
+     private static class BufferedWriter extends SSTableSimpleUnsortedWriter
+     {
+         public BufferedWriter(File directory, CFMetaData metadata, IPartitioner partitioner, long bufferSizeInMB)
+         {
+             super(directory, metadata, partitioner, bufferSizeInMB);
+         }
+ 
+         @Override
+         protected ColumnFamily createColumnFamily()
+         {
 -            return new TreeMapBackedSortedColumns(metadata)
++            return new ArrayBackedSortedColumns(metadata, false)
              {
-                 writer = new SSTableSimpleWriter(directory,
-                                                  schema,
-                                                  partitioner);
-             }
-             else
+                 @Override
 -                public void addColumn(Column column, Allocator allocator)
++                public void addColumn(Cell cell)
+                 {
 -                    super.addColumn(column, allocator);
++                    super.addColumn(cell);
+                     try
+                     {
 -                        countColumn(column);
++                        countColumn(cell);
+                     }
+                     catch (IOException e)
+                     {
+                         // addColumn does not throw IOException but we want to report this to the user,
+                         // so wrap it in a temporary RuntimeException that we'll catch in rawAddRow above.
+                         throw new SyncException(e);
+                     }
+                 }
+             };
+         }
+ 
+         static class SyncException extends RuntimeException
+         {
+             SyncException(IOException ioe)
              {
-                 writer = new SSTableSimpleUnsortedWriter(directory,
-                                                          schema,
-                                                          partitioner,
-                                                          bufferSizeInMB);
+                 super(ioe);
              }
-             return new CQLSSTableWriter(writer, insert, boundNames);
          }
      }
  }
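The `BufferedWriter` Javadoc and the `catch` block in `rawAddRow()` describe a wrap-and-unwrap pattern: the overridden `addColumn(Cell)` cannot throw the checked `IOException` that `countColumn()` may raise, so it wraps it in the unchecked `SyncException`, and the caller that does declare `IOException` unwraps it again. A self-contained sketch of the same pattern, where `CellSink`, `flushIfNeeded()` and `addRow()` are hypothetical stand-ins and a lambda plays the role of the anonymous `ArrayBackedSortedColumns` subclass:

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch of smuggling a checked IOException through a non-throwing callback. */
public class SyncExceptionDemo {
    /** Unchecked carrier; its only constructor takes an IOException. */
    static class SyncException extends RuntimeException {
        SyncException(IOException ioe) {
            super(ioe);
        }
    }

    /** Stand-in for a method like addColumn(Cell) that is not declared to throw IOException. */
    interface CellSink {
        void add(String cell);
    }

    /** Stand-in for countColumn(): may fail with a checked IOException while syncing. */
    static void flushIfNeeded(List<String> buffer, int limit) throws IOException {
        if (buffer.size() > limit)
            throw new IOException("disk full while syncing " + buffer.size() + " cells");
    }

    /** Stand-in for rawAddRow(): declares IOException and unwraps SyncException. */
    static void addRow(List<String> cells, int limit) throws IOException {
        List<String> buffer = new ArrayList<>();
        CellSink sink = cell -> {
            buffer.add(cell);
            try {
                flushIfNeeded(buffer, limit);
            } catch (IOException e) {
                throw new SyncException(e); // wrap: the callback cannot throw IOException
            }
        };
        try {
            for (String cell : cells)
                sink.add(cell);
        } catch (SyncException e) {
            throw (IOException) e.getCause(); // unwrap the original IOException for the user
        }
    }

    public static void main(String[] args) {
        try {
            addRow(Arrays.asList("a", "b", "c"), 2);
        } catch (IOException e) {
            System.out.println("caught: " + e.getMessage()); // caught: disk full while syncing 3 cells
        }
    }
}
```

Because `SyncException` can only be built from an `IOException`, the cast of `getCause()` in the `catch` block cannot fail.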

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9b9bf45b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
index c871a35,39ec71d..ad3c451
--- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
@@@ -27,9 -27,12 +27,11 @@@ import java.util.concurrent.Synchronous
  import com.google.common.base.Throwables;
  
  import org.apache.cassandra.config.CFMetaData;
 -import org.apache.cassandra.db.Column;
 +import org.apache.cassandra.db.ArrayBackedSortedColumns;
++import org.apache.cassandra.db.Cell;
  import org.apache.cassandra.db.ColumnFamily;
 -import org.apache.cassandra.db.ColumnFamilyType;
  import org.apache.cassandra.db.DecoratedKey;
 -import org.apache.cassandra.db.TreeMapBackedSortedColumns;
+ import org.apache.cassandra.db.TypeSizes;
  import org.apache.cassandra.db.marshal.AbstractType;
  import org.apache.cassandra.dht.IPartitioner;
  import org.apache.cassandra.io.compress.CompressionParameters;
@@@ -99,8 -102,21 +101,21 @@@ public class SSTableSimpleUnsortedWrite
  
      protected void writeRow(DecoratedKey key, ColumnFamily columnFamily) throws IOException
      {
-         currentSize += key.getKey().remaining() + ColumnFamily.serializer.serializedSize(columnFamily, MessagingService.current_version) * 1.2;
+         // Nothing to do since we'll sync if needed in addColumn.
+     }
+ 
+     @Override
 -    protected void addColumn(Column column) throws IOException
++    protected void addColumn(Cell cell) throws IOException
+     {
 -        super.addColumn(column);
 -        countColumn(column);
++        super.addColumn(cell);
++        countColumn(cell);
+     }
+ 
 -    protected void countColumn(Column column) throws IOException
++    protected void countColumn(Cell cell) throws IOException
+     {
 -        currentSize += column.serializedSize(TypeSizes.NATIVE);
++        currentSize += cell.serializedSize(metadata.comparator, TypeSizes.NATIVE);
  
+         // We don't want to sync in writeRow() only as this might blow up the bufferSize for wide rows.
          if (currentSize > bufferSize)
              sync();
      }
@@@ -111,18 -127,23 +126,23 @@@
          // If the CF already exist in memory, we'll just continue adding to it
          if (previous == null)
          {
-             previous = ArrayBackedSortedColumns.factory.create(metadata);
+             previous = createColumnFamily();
              buffer.put(currentKey, previous);
-         }
-         else
-         {
-             // We will reuse a CF that we have counted already. But because it will be easier to add the full size
-             // of the CF in the next writeRow call than to find out the delta, we just remove the size until that next call
-             currentSize -= currentKey.getKey().remaining() + ColumnFamily.serializer.serializedSize(previous, MessagingService.current_version) * 1.2;
+ 
+             // Since this new CF will be written by the next sync(), count its header. And a CF header
+             // on disk is:
+             //   - the row key: 2 bytes size + key size bytes
+             //   - the row level deletion infos: 4 + 8 bytes
 -            currentSize += 14 + currentKey.key.remaining();
++            currentSize += 14 + currentKey.getKey().remaining();
          }
          return previous;
      }
  
+     protected ColumnFamily createColumnFamily() throws IOException
+     {
 -        return TreeMapBackedSortedColumns.factory.create(metadata);
++        return ArrayBackedSortedColumns.factory.create(metadata);
+     }
+ 
      public void close() throws IOException
      {
          sync();
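After this change the unsorted writer no longer estimates a whole row's size in `writeRow()`. Instead, `getColumnFamily()` counts `14 + key length` bytes for each new partition header (2 bytes of key length, the key itself, and 4 + 8 bytes of row-level deletion info), and `countColumn()` adds each cell's serialized size and calls `sync()` as soon as the total exceeds `bufferSize`. A minimal model of that accounting, assuming a hypothetical `BufferAccounting` class, a fixed per-cell size in place of `Cell.serializedSize()`, and a `sync()` that simply resets the running total:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical model of the per-cell size accounting; not Cassandra's actual writer. */
public class BufferAccounting {
    final long bufferSize;
    long currentSize;
    int syncCount;

    BufferAccounting(long bufferSize) {
        this.bufferSize = bufferSize;
    }

    /** Header estimate: 2-byte key length + key bytes + 4 + 8 bytes of deletion info. */
    static long partitionHeaderSize(ByteBuffer key) {
        return 2 + key.remaining() + 4 + 8; // == 14 + key.remaining()
    }

    /** Called when a partition gets a fresh ColumnFamily in the buffer. */
    void startPartition(ByteBuffer key) {
        currentSize += partitionHeaderSize(key);
    }

    /** cellSize stands in for the cell's serialized size. */
    void countCell(long cellSize) {
        currentSize += cellSize;
        if (currentSize > bufferSize)
            sync();
    }

    void sync() {
        // A real writer would flush the buffered partitions to a new sstable here.
        syncCount++;
        currentSize = 0;
    }

    public static void main(String[] args) {
        BufferAccounting acc = new BufferAccounting(100);
        acc.startPartition(ByteBuffer.wrap("k1".getBytes(StandardCharsets.UTF_8))); // 14 + 2 = 16 bytes
        for (int i = 0; i < 10; i++)
            acc.countCell(10); // ten 10-byte cells in the same (wide) partition
        System.out.println(acc.syncCount + " " + acc.currentSize); // 1 10
    }
}
```

Because the check runs after every cell, a single wide partition (the CASSANDRA-7360 case) can trigger a sync in the middle of the partition instead of waiting for the next row boundary.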

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9b9bf45b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
----------------------------------------------------------------------
