Repository: cassandra Updated Branches: refs/heads/cassandra-2.0 c1d2b86a4 -> 2170ac4d3
Make CQLSSTableWriter sync within partitions patch by slebresne; reviewed by blerer for CASSANDRA-7360 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2170ac4d Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2170ac4d Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2170ac4d Branch: refs/heads/cassandra-2.0 Commit: 2170ac4d3922425f008f441d2d92e3233449c765 Parents: c1d2b86 Author: Sylvain Lebresne <sylv...@datastax.com> Authored: Tue Sep 9 18:18:12 2014 -0700 Committer: Sylvain Lebresne <sylv...@datastax.com> Committed: Tue Sep 9 18:18:12 2014 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../db/TreeMapBackedSortedColumns.java | 2 +- .../io/sstable/AbstractSSTableSimpleWriter.java | 11 ++- .../cassandra/io/sstable/CQLSSTableWriter.java | 82 ++++++++++++++++---- .../io/sstable/SSTableSimpleUnsortedWriter.java | 38 ++++++--- .../io/sstable/CQLSSTableWriterTest.java | 39 ++++++++++ 6 files changed, 140 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/2170ac4d/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 2c0cae6..3ee938a 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.0.11: + * Make CQLSSTableWriter sync within partitions (CASSANDRA-7360) * Potentially use non-local replicas in CqlConfigHelper (CASSANDRA-7906) * Explicitly disallowing mixing multi-column and single-column relations on clustering columns (CASSANDRA-7711) http://git-wip-us.apache.org/repos/asf/cassandra/blob/2170ac4d/src/java/org/apache/cassandra/db/TreeMapBackedSortedColumns.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/TreeMapBackedSortedColumns.java b/src/java/org/apache/cassandra/db/TreeMapBackedSortedColumns.java index 466833b..ae6e798 100644 --- a/src/java/org/apache/cassandra/db/TreeMapBackedSortedColumns.java +++ b/src/java/org/apache/cassandra/db/TreeMapBackedSortedColumns.java @@ -49,7 +49,7 @@ public class TreeMapBackedSortedColumns extends AbstractThreadUnsafeSortedColumn return (AbstractType<?>)map.comparator(); } - private TreeMapBackedSortedColumns(CFMetaData metadata) + protected TreeMapBackedSortedColumns(CFMetaData metadata) { super(metadata); this.map = new TreeMap<ByteBuffer, Column>(metadata.comparator); http://git-wip-us.apache.org/repos/asf/cassandra/blob/2170ac4d/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java index db87226..2c6f82a 100644 --- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java @@ -111,7 +111,7 @@ public abstract class AbstractSSTableSimpleWriter implements Closeable currentSuperColumn = name; } - private void addColumn(Column column) + protected void addColumn(Column column) throws IOException { if (columnFamily.metadata().isSuper()) { @@ -129,7 +129,7 @@ public abstract class AbstractSSTableSimpleWriter implements Closeable * @param value the column value * @param timestamp the column timestamp */ - public void addColumn(ByteBuffer name, ByteBuffer value, long timestamp) + public void addColumn(ByteBuffer name, ByteBuffer value, long timestamp) throws IOException { addColumn(new Column(name, value, timestamp)); } @@ -144,7 +144,7 @@ public abstract class AbstractSSTableSimpleWriter implements Closeable * expiring the column, and as a consequence should be synchronized with the cassandra servers time. If {@code timestamp} represents * the insertion time in microseconds (which is not required), this should be {@code (timestamp / 1000) + (ttl * 1000)}. */ - public void addExpiringColumn(ByteBuffer name, ByteBuffer value, long timestamp, int ttl, long expirationTimestampMS) + public void addExpiringColumn(ByteBuffer name, ByteBuffer value, long timestamp, int ttl, long expirationTimestampMS) throws IOException { addColumn(new ExpiringColumn(name, value, timestamp, ttl, (int)(expirationTimestampMS / 1000))); } @@ -154,7 +154,7 @@ public abstract class AbstractSSTableSimpleWriter implements Closeable * @param name the column name * @param value the value of the counter */ - public void addCounterColumn(ByteBuffer name, long value) + public void addCounterColumn(ByteBuffer name, long value) throws IOException { addColumn(new CounterColumn(name, CounterContext.instance().createRemote(counterid, 1L, value, HeapAllocator.instance), @@ -179,8 +179,7 @@ public abstract class AbstractSSTableSimpleWriter implements Closeable return currentKey; } - protected abstract void writeRow(DecoratedKey key, ColumnFamily columnFamily) throws IOException; - protected abstract ColumnFamily getColumnFamily(); + protected abstract ColumnFamily getColumnFamily() throws IOException; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/2170ac4d/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java index 61990ec..49a1259 100644 --- a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java @@ -32,6 +32,7 @@ import com.google.common.collect.ImmutableMap; import org.apache.cassandra.cql3.statements.*; import org.apache.cassandra.cql3.*; import org.apache.cassandra.config.*; +import org.apache.cassandra.db.*; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Murmur3Partitioner; @@ -40,6 +41,7 @@ import org.apache.cassandra.exceptions.RequestValidationException; import org.apache.cassandra.io.compress.CompressionParameters; import org.apache.cassandra.locator.AbstractReplicationStrategy; import org.apache.cassandra.service.ClientState; +import org.apache.cassandra.utils.Allocator; import org.apache.cassandra.utils.Pair; /** @@ -206,13 +208,22 @@ public class CQLSSTableWriter implements Closeable insert.getTimeToLive(values), Collections.<ByteBuffer, ColumnGroupMap>emptyMap()); - for (ByteBuffer key: keys) + try { - if (writer.currentKey() == null || !key.equals(writer.currentKey().key)) - writer.newRow(key); - insert.addUpdateForKey(writer.currentColumnFamily(), key, clusteringPrefix, params); + for (ByteBuffer key: keys) + { + if (writer.currentKey() == null || !key.equals(writer.currentKey().key)) + writer.newRow(key); + insert.addUpdateForKey(writer.currentColumnFamily(), key, clusteringPrefix, params); + } + return this; + } + catch (BufferedWriter.SyncException e) + { + // If we use a BufferedWriter and had a problem writing to disk, the IOException has been + // wrapped in a SyncException (see BufferedWriter below). We want to extract that IOE. + throw (IOException)e.getCause(); } - return this; } /** @@ -463,21 +474,58 @@ public class CQLSSTableWriter implements Closeable if (insert == null) throw new IllegalStateException("No insert statement specified, you should provide an insert statement through using()"); - AbstractSSTableSimpleWriter writer; - if (sorted) + AbstractSSTableSimpleWriter writer = sorted + ? new SSTableSimpleWriter(directory, schema, partitioner) + : new BufferedWriter(directory, schema, partitioner, bufferSizeInMB); + return new CQLSSTableWriter(writer, insert, boundNames); + } + } + + /** + * CQLSSTableWriter doesn't use the method addColumn() from AbstractSSTableSimpleWriter. + * Instead, it adds cells directly to the ColumnFamily the latter exposes. But this means + * that the sync() method of SSTableSimpleUnsortedWriter is not called (at least not for + * each CQL row, so adding many rows to the same partition can buffer too much data in + * memory - #7360). So we create a slightly modified SSTableSimpleUnsortedWriter that uses + * a tweaked ColumnFamily object that calls back the proper method after each added cell + * so we sync when we should. + */ + private static class BufferedWriter extends SSTableSimpleUnsortedWriter + { + public BufferedWriter(File directory, CFMetaData metadata, IPartitioner partitioner, long bufferSizeInMB) + { + super(directory, metadata, partitioner, bufferSizeInMB); + } + + @Override + protected ColumnFamily createColumnFamily() + { + return new TreeMapBackedSortedColumns(metadata) { - writer = new SSTableSimpleWriter(directory, - schema, - partitioner); - } - else + @Override + public void addColumn(Column column, Allocator allocator) + { + super.addColumn(column, allocator); + try + { + countColumn(column); + } + catch (IOException e) + { + // addColumn does not throw IOException but we want to report this to the user, + // so wrap it in a temporary RuntimeException that we'll catch in rawAddRow above. + throw new SyncException(e); + } + } + }; + } + + static class SyncException extends RuntimeException + { + SyncException(IOException ioe) { - writer = new SSTableSimpleUnsortedWriter(directory, - schema, - partitioner, - bufferSizeInMB); + super(ioe); } - return new CQLSSTableWriter(writer, insert, boundNames); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/2170ac4d/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java index 6b39024..39ec71d 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java @@ -27,10 +27,12 @@ import java.util.concurrent.SynchronousQueue; import com.google.common.base.Throwables; import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.Column; import org.apache.cassandra.db.ColumnFamily; import org.apache.cassandra.db.ColumnFamilyType; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.TreeMapBackedSortedColumns; +import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.io.compress.CompressionParameters; @@ -100,30 +102,48 @@ public class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter protected void writeRow(DecoratedKey key, ColumnFamily columnFamily) throws IOException { - currentSize += key.key.remaining() + ColumnFamily.serializer.serializedSize(columnFamily, MessagingService.current_version) * 1.2; + // Nothing to do since we'll sync if needed in addColumn. + } + + @Override + protected void addColumn(Column column) throws IOException + { + super.addColumn(column); + countColumn(column); + } + + protected void countColumn(Column column) throws IOException + { + currentSize += column.serializedSize(TypeSizes.NATIVE); + // We don't want to sync in writeRow() only as this might blow up the bufferSize for wide rows. if (currentSize > bufferSize) sync(); } - protected ColumnFamily getColumnFamily() + protected ColumnFamily getColumnFamily() throws IOException { ColumnFamily previous = buffer.get(currentKey); // If the CF already exist in memory, we'll just continue adding to it if (previous == null) { - previous = TreeMapBackedSortedColumns.factory.create(metadata); + previous = createColumnFamily(); buffer.put(currentKey, previous); - } - else - { - // We will reuse a CF that we have counted already. But because it will be easier to add the full size - // of the CF in the next writeRow call than to find out the delta, we just remove the size until that next call - currentSize -= currentKey.key.remaining() + ColumnFamily.serializer.serializedSize(previous, MessagingService.current_version) * 1.2; + + // Since this new CF will be written by the next sync(), count its header. And a CF header + // on disk is: + // - the row key: 2 bytes size + key size bytes + // - the row level deletion infos: 4 + 8 bytes + currentSize += 14 + currentKey.key.remaining(); } return previous; } + protected ColumnFamily createColumnFamily() throws IOException + { + return TreeMapBackedSortedColumns.factory.create(metadata); + } + public void close() throws IOException { sync(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/2170ac4d/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java index bdc4b94..de814e1 100644 --- a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java @@ -18,6 +18,9 @@ package org.apache.cassandra.io.sstable; import java.io.File; +import java.io.FilenameFilter; +import java.nio.ByteBuffer; +import java.util.Arrays; import java.util.Iterator; import com.google.common.collect.ImmutableMap; @@ -119,4 +122,40 @@ public class CQLSSTableWriterTest assertEquals(null, row.getBytes("v1")); // Using getBytes because we know it won't NPE assertEquals(12, row.getInt("v2")); } + + @Test + public void testSyncWithinPartition() throws Exception + { + // Check that the write respect the buffer size even if we only insert rows withing the same partition (#7360) + // To do that simply, we use a writer with a buffer of 1MB, and write 2 rows in the same partition with a value + // > 1MB and validate that this created more than 1 sstable. + File tempdir = Files.createTempDir(); + String schema = "CREATE TABLE ks.test (" + + " k int PRIMARY KEY," + + " v blob" + + ")"; + String insert = "INSERT INTO ks.test (k, v) VALUES (?, ?)"; + CQLSSTableWriter writer = CQLSSTableWriter.builder() + .inDirectory(tempdir) + .forTable(schema) + .withPartitioner(StorageService.instance.getPartitioner()) + .using(insert) + .withBufferSizeInMB(1) + .build(); + + ByteBuffer val = ByteBuffer.allocate(1024 * 1050); + + writer.addRow(0, val); + writer.addRow(1, val); + writer.close(); + + FilenameFilter filterDataFiles = new FilenameFilter() + { + public boolean accept(File dir, String name) + { + return name.endsWith("-Data.db"); + } + }; + assert tempdir.list(filterDataFiles).length > 1 : Arrays.toString(tempdir.list(filterDataFiles)); + } }