Merge branch 'cassandra-2.0' into cassandra-2.1 Conflicts: CHANGES.txt src/java/org/apache/cassandra/db/TreeMapBackedSortedColumns.java src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9b9bf45b Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9b9bf45b Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9b9bf45b Branch: refs/heads/cassandra-2.1 Commit: 9b9bf45b676916dc9c81a6328fafef6496d62fb9 Parents: 3ba392f 2170ac4 Author: Sylvain Lebresne <sylv...@datastax.com> Authored: Tue Sep 9 18:39:29 2014 -0700 Committer: Sylvain Lebresne <sylv...@datastax.com> Committed: Tue Sep 9 18:39:29 2014 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/db/ArrayBackedSortedColumns.java | 5 ++ .../io/sstable/AbstractSSTableSimpleWriter.java | 11 ++- .../cassandra/io/sstable/CQLSSTableWriter.java | 81 ++++++++++++++++---- .../io/sstable/SSTableSimpleUnsortedWriter.java | 38 ++++++--- .../io/sstable/CQLSSTableWriterTest.java | 39 ++++++++++ 6 files changed, 143 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/9b9bf45b/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index d4a17de,3ee938a..60fd4c9 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,80 -1,9 +1,81 @@@ -2.0.11: +2.1.1 + * (cqlsh) tab-completion for triggers (CASSANDRA-7824) + * (cqlsh): Support for query paging (CASSANDRA-7514) + * (cqlsh): Show progress of COPY operations (CASSANDRA-7789) + * Add syntax to remove multiple elements from a map (CASSANDRA-6599) + * Support non-equals conditions in lightweight transactions (CASSANDRA-6839) + * Add IF [NOT] EXISTS to create/drop triggers (CASSANDRA-7606) + * (cqlsh) Display the current logged-in user (CASSANDRA-7785) + * (cqlsh) Don't ignore CTRL-C during COPY FROM execution (CASSANDRA-7815) + * (cqlsh) Order UDTs according to cross-type dependencies in DESCRIBE + output (CASSANDRA-7659) + * (cqlsh) Fix handling of CAS statement results (CASSANDRA-7671) + * (cqlsh) COPY TO/FROM improvements (CASSANDRA-7405) + * Support list index operations with conditions (CASSANDRA-7499) + * Add max live/tombstoned cells to nodetool cfstats output (CASSANDRA-7731) + * Validate IPv6 wildcard addresses properly (CASSANDRA-7680) + * (cqlsh) Error when tracing query (CASSANDRA-7613) + * Avoid IOOBE when building SyntaxError message snippet (CASSANDRA-7569) + * SSTableExport uses correct validator to create string representation of partition + keys (CASSANDRA-7498) + * Avoid NPEs when receiving type changes for an unknown keyspace (CASSANDRA-7689) + * Add support for custom 2i validation (CASSANDRA-7575) + * Pig support for hadoop CqlInputFormat (CASSANDRA-6454) + * Add listen_interface and rpc_interface options (CASSANDRA-7417) + * Improve schema merge performance (CASSANDRA-7444) + * Adjust MT depth based on # of partition validating (CASSANDRA-5263) + * Optimise NativeCell comparisons (CASSANDRA-6755) + * Configurable client timeout for cqlsh (CASSANDRA-7516) + * Include snippet of CQL query near syntax error in messages (CASSANDRA-7111) +Merged from 2.0: + * Make CQLSSTableWriter sync within partitions (CASSANDRA-7360) * Potentially use non-local replicas in CqlConfigHelper (CASSANDRA-7906) - * Explicitly disallowing mixing multi-column and single-column + * Explicitly disallow mixing multi-column and single-column relations on clustering columns (CASSANDRA-7711) * Better error message when condition is set on PK column (CASSANDRA-7804) + * Don't send schema change responses and events for no-op DDL + statements (CASSANDRA-7600) + * (Hadoop) fix cluster initialisation for a split fetching (CASSANDRA-7774) + * Throw InvalidRequestException when queries contain relations on entire + collection columns (CASSANDRA-7506) + * (cqlsh) enable CTRL-R history search with libedit (CASSANDRA-7577) + * (Hadoop) allow ACFRW to limit nodes to local DC (CASSANDRA-7252) + * (cqlsh) cqlsh should automatically disable tracing when selecting + from system_traces (CASSANDRA-7641) + * (Hadoop) Add CqlOutputFormat (CASSANDRA-6927) + * Don't depend on cassandra config for nodetool ring (CASSANDRA-7508) + * (cqlsh) Fix failing cqlsh formatting tests (CASSANDRA-7703) + * Fix IncompatibleClassChangeError from hadoop2 (CASSANDRA-7229) + * Add 'nodetool sethintedhandoffthrottlekb' (CASSANDRA-7635) + * (cqlsh) Add tab-completion for CREATE/DROP USER IF [NOT] EXISTS (CASSANDRA-7611) + * Catch errors when the JVM pulls the rug out from GCInspector (CASSANDRA-5345) + * cqlsh fails when version number parts are not int (CASSANDRA-7524) +Merged from 1.2: + * Don't index tombstones (CASSANDRA-7828) + * Improve PasswordAuthenticator default super user setup (CASSANDRA-7788) + + +2.1.0 + * (cqlsh) Removed "ALTER TYPE <name> RENAME TO <name>" from tab-completion + (CASSANDRA-7895) + * Fixed IllegalStateException in anticompaction (CASSANDRA-7892) + * cqlsh: DESCRIBE support for frozen UDTs, tuples (CASSANDRA-7863) + * Avoid exposing internal classes over JMX (CASSANDRA-7879) + * Add null check for keys when freezing collection (CASSANDRA-7869) + * Improve stress workload realism (CASSANDRA-7519) + +2.1.0-rc7 + * Add frozen keyword and require UDT to be frozen (CASSANDRA-7857) + * Track added sstable size correctly (CASSANDRA-7239) + * (cqlsh) Fix case insensitivity (CASSANDRA-7834) + * Fix failure to stream ranges when moving (CASSANDRA-7836) + * Correctly remove tmplink files (CASSANDRA-7803) + * (cqlsh) Fix column name formatting for functions, CAS operations, + and UDT field selections (CASSANDRA-7806) + * (cqlsh) Fix COPY FROM handling of null/empty primary key + values (CASSANDRA-7792) + * Fix ordering of static cells (CASSANDRA-7763) +Merged from 2.0: * Forbid re-adding dropped counter columns (CASSANDRA-7831) * Fix CFMetaData#isThriftCompatible() for PK-only tables (CASSANDRA-7832) * Always reject inequality on the partition key without token() http://git-wip-us.apache.org/repos/asf/cassandra/blob/9b9bf45b/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java index c0fae24,389e0f8..b5ed8d2 --- a/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java +++ b/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java @@@ -66,32 -54,14 +66,37 @@@ public class ArrayBackedSortedColumns e { super(metadata); this.reversed = reversed; - this.columns = new ArrayList<Column>(); + this.deletionInfo = DeletionInfo.live(); + this.cells = cells; + this.size = size; + this.sortedSize = sortedSize; + this.isSorted = size == sortedSize; } + - private ArrayBackedSortedColumns(Collection<Column> columns, CFMetaData metadata, boolean reversed) ++ protected ArrayBackedSortedColumns(CFMetaData metadata, boolean reversed) + { - super(metadata); - this.reversed = reversed; - this.columns = new ArrayList<Column>(columns); ++ this(metadata, reversed, EMPTY_ARRAY, 0, 0); ++ } + + private ArrayBackedSortedColumns(ArrayBackedSortedColumns original) + { + super(original.metadata); + this.reversed = original.reversed; + this.deletionInfo = DeletionInfo.live(); // this is INTENTIONALLY not set to original.deletionInfo. + this.cells = Arrays.copyOf(original.cells, original.size); + this.size = original.size; + this.sortedSize = original.sortedSize; + this.isSorted = original.isSorted; + } + + public static ArrayBackedSortedColumns localCopy(ColumnFamily original, AbstractAllocator allocator) + { + ArrayBackedSortedColumns copy = new ArrayBackedSortedColumns(original.metadata, false, new Cell[original.getColumnCount()], 0, 0); + for (Cell cell : original) + copy.internalAdd(cell.localCopy(original.metadata, allocator)); + copy.sortedSize = copy.size; // internalAdd doesn't update sortedSize. + copy.delete(original); + return copy; } public ColumnFamily.Factory getFactory() http://git-wip-us.apache.org/repos/asf/cassandra/blob/9b9bf45b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java index ae8300c,2c6f82a..f8999bf --- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java @@@ -112,7 -111,7 +112,7 @@@ public abstract class AbstractSSTableSi currentSuperColumn = name; } - private void addColumn(Cell cell) - protected void addColumn(Column column) throws IOException ++ protected void addColumn(Cell cell) throws IOException { if (columnFamily.metadata().isSuper()) { @@@ -130,9 -129,9 +130,9 @@@ * @param value the column value * @param timestamp the column timestamp */ - public void addColumn(ByteBuffer name, ByteBuffer value, long timestamp) + public void addColumn(ByteBuffer name, ByteBuffer value, long timestamp) throws IOException { - addColumn(new Column(name, value, timestamp)); + addColumn(new BufferCell(metadata.comparator.cellFromByteBuffer(name), value, timestamp)); } /** @@@ -145,9 -144,9 +145,9 @@@ * expiring the column, and as a consequence should be synchronized with the cassandra servers time. If {@code timestamp} represents * the insertion time in microseconds (which is not required), this should be {@code (timestamp / 1000) + (ttl * 1000)}. */ - public void addExpiringColumn(ByteBuffer name, ByteBuffer value, long timestamp, int ttl, long expirationTimestampMS) + public void addExpiringColumn(ByteBuffer name, ByteBuffer value, long timestamp, int ttl, long expirationTimestampMS) throws IOException { - addColumn(new ExpiringColumn(name, value, timestamp, ttl, (int)(expirationTimestampMS / 1000))); + addColumn(new BufferExpiringCell(metadata.comparator.cellFromByteBuffer(name), value, timestamp, ttl, (int)(expirationTimestampMS / 1000))); } /** @@@ -155,11 -154,11 +155,11 @@@ * @param name the column name * @param value the value of the counter */ - public void addCounterColumn(ByteBuffer name, long value) + public void addCounterColumn(ByteBuffer name, long value) throws IOException { - addColumn(new CounterColumn(name, - CounterContext.instance().createRemote(counterid, 1L, value, HeapAllocator.instance), - System.currentTimeMillis())); + addColumn(new BufferCounterCell(metadata.comparator.cellFromByteBuffer(name), + CounterContext.instance().createGlobal(counterid, 1L, value), + System.currentTimeMillis())); } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/9b9bf45b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java index 427d2d4,49a1259..bf4da24 --- a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java @@@ -32,7 -32,7 +32,8 @@@ import com.google.common.collect.Immuta import org.apache.cassandra.cql3.statements.*; import org.apache.cassandra.cql3.*; import org.apache.cassandra.config.*; + import org.apache.cassandra.db.*; +import org.apache.cassandra.db.composites.Composite; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Murmur3Partitioner; @@@ -202,18 -203,27 +203,27 @@@ public class CQLSSTableWriter implement long now = System.currentTimeMillis() * 1000; UpdateParameters params = new UpdateParameters(insert.cfm, - values, - insert.getTimestamp(now, values), - insert.getTimeToLive(values), - Collections.<ByteBuffer, ColumnGroupMap>emptyMap()); + options, + insert.getTimestamp(now, options), + insert.getTimeToLive(options), + Collections.<ByteBuffer, CQL3Row>emptyMap()); - for (ByteBuffer key: keys) + try { - if (writer.currentKey() == null || !key.equals(writer.currentKey().getKey())) - writer.newRow(key); - insert.addUpdateForKey(writer.currentColumnFamily(), key, clusteringPrefix, params); - for (ByteBuffer key: keys) ++ for (ByteBuffer key : keys) + { - if (writer.currentKey() == null || !key.equals(writer.currentKey().key)) ++ if (writer.currentKey() == null || !key.equals(writer.currentKey().getKey())) + writer.newRow(key); + insert.addUpdateForKey(writer.currentColumnFamily(), key, clusteringPrefix, params); + } + return this; + } + catch (BufferedWriter.SyncException e) + { + // If we use a BufferedWriter and had a problem writing to disk, the IOException has been + // wrapped in a SyncException (see BufferedWriter below). We want to extract that IOE. + throw (IOException)e.getCause(); } - return this; } /** @@@ -471,21 -474,58 +481,58 @@@ if (insert == null) throw new IllegalStateException("No insert statement specified, you should provide an insert statement through using()"); - AbstractSSTableSimpleWriter writer; - if (sorted) + AbstractSSTableSimpleWriter writer = sorted + ? new SSTableSimpleWriter(directory, schema, partitioner) + : new BufferedWriter(directory, schema, partitioner, bufferSizeInMB); + return new CQLSSTableWriter(writer, insert, boundNames); + } + } + + /** + * CQLSSTableWriter doesn't use the method addColumn() from AbstractSSTableSimpleWriter. + * Instead, it adds cells directly to the ColumnFamily the latter exposes. But this means + * that the sync() method of SSTableSimpleUnsortedWriter is not called (at least not for + * each CQL row, so adding many rows to the same partition can buffer too much data in + * memory - #7360). So we create a slightly modified SSTableSimpleUnsortedWriter that uses + * a tweaked ColumnFamily object that calls back the proper method after each added cell + * so we sync when we should. + */ + private static class BufferedWriter extends SSTableSimpleUnsortedWriter + { + public BufferedWriter(File directory, CFMetaData metadata, IPartitioner partitioner, long bufferSizeInMB) + { + super(directory, metadata, partitioner, bufferSizeInMB); + } + + @Override + protected ColumnFamily createColumnFamily() + { - return new TreeMapBackedSortedColumns(metadata) ++ return new ArrayBackedSortedColumns(metadata, false) { - writer = new SSTableSimpleWriter(directory, - schema, - partitioner); - } - else + @Override - public void addColumn(Column column, Allocator allocator) ++ public void addColumn(Cell cell) + { - super.addColumn(column, allocator); ++ super.addColumn(cell); + try + { - countColumn(column); ++ countColumn(cell); + } + catch (IOException e) + { + // addColumn does not throw IOException but we want to report this to the user, + // so wrap it in a temporary RuntimeException that we'll catch in rawAddRow above. + throw new SyncException(e); + } + } + }; + } + + static class SyncException extends RuntimeException + { + SyncException(IOException ioe) { - writer = new SSTableSimpleUnsortedWriter(directory, - schema, - partitioner, - bufferSizeInMB); + super(ioe); } - return new CQLSSTableWriter(writer, insert, boundNames); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9b9bf45b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java index c871a35,39ec71d..ad3c451 --- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java @@@ -27,9 -27,12 +27,11 @@@ import java.util.concurrent.Synchronous import com.google.common.base.Throwables; import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.db.Column; +import org.apache.cassandra.db.ArrayBackedSortedColumns; ++import org.apache.cassandra.db.Cell; import org.apache.cassandra.db.ColumnFamily; -import org.apache.cassandra.db.ColumnFamilyType; import org.apache.cassandra.db.DecoratedKey; -import org.apache.cassandra.db.TreeMapBackedSortedColumns; + import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.io.compress.CompressionParameters; @@@ -99,8 -102,21 +101,21 @@@ public class SSTableSimpleUnsortedWrite protected void writeRow(DecoratedKey key, ColumnFamily columnFamily) throws IOException { - currentSize += key.getKey().remaining() + ColumnFamily.serializer.serializedSize(columnFamily, MessagingService.current_version) * 1.2; + // Nothing to do since we'll sync if needed in addColumn. + } + + @Override - protected void addColumn(Column column) throws IOException ++ protected void addColumn(Cell cell) throws IOException + { - super.addColumn(column); - countColumn(column); ++ super.addColumn(cell); ++ countColumn(cell); + } + - protected void countColumn(Column column) throws IOException ++ protected void countColumn(Cell cell) throws IOException + { - currentSize += column.serializedSize(TypeSizes.NATIVE); ++ currentSize += cell.serializedSize(metadata.comparator, TypeSizes.NATIVE); + // We don't want to sync in writeRow() only as this might blow up the bufferSize for wide rows. if (currentSize > bufferSize) sync(); } @@@ -111,18 -127,23 +126,23 @@@ // If the CF already exist in memory, we'll just continue adding to it if (previous == null) { - previous = ArrayBackedSortedColumns.factory.create(metadata); + previous = createColumnFamily(); buffer.put(currentKey, previous); - } - else - { - // We will reuse a CF that we have counted already. But because it will be easier to add the full size - // of the CF in the next writeRow call than to find out the delta, we just remove the size until that next call - currentSize -= currentKey.getKey().remaining() + ColumnFamily.serializer.serializedSize(previous, MessagingService.current_version) * 1.2; + + // Since this new CF will be written by the next sync(), count its header. And a CF header + // on disk is: + // - the row key: 2 bytes size + key size bytes + // - the row level deletion infos: 4 + 8 bytes - currentSize += 14 + currentKey.key.remaining(); ++ currentSize += 14 + currentKey.getKey().remaining(); } return previous; } + protected ColumnFamily createColumnFamily() throws IOException + { - return TreeMapBackedSortedColumns.factory.create(metadata); ++ return ArrayBackedSortedColumns.factory.create(metadata); + } + public void close() throws IOException { sync(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/9b9bf45b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java ----------------------------------------------------------------------