Author: jbellis Date: Tue Jul 12 23:33:37 2011 New Revision: 1145818 URL: http://svn.apache.org/viewvc?rev=1145818&view=rev Log: optimize away seek when compacting wide rows patch by Pavel Yaskevich and jbellis for CASSANDRA-2879
Modified: cassandra/trunk/CHANGES.txt cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java cassandra/trunk/src/java/org/apache/cassandra/db/ColumnIndexer.java cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java cassandra/trunk/src/java/org/apache/cassandra/utils/BloomFilter.java cassandra/trunk/src/java/org/apache/cassandra/utils/BloomFilterSerializer.java Modified: cassandra/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1145818&r1=1145817&r2=1145818&view=diff ============================================================================== --- cassandra/trunk/CHANGES.txt (original) +++ cassandra/trunk/CHANGES.txt Tue Jul 12 23:33:37 2011 @@ -11,6 +11,7 @@ * restrict repair streaming to specific columnfamilies (CASSANDRA-2280) * don't bother persisting columns shadowed by a row tombstone (CASSANDRA-2589) * reset CF and SC deletion times after gc_grace (CASSANDRA-2317) + * optimize away seek when compacting wide rows (CASSANDRA-2879) 0.8.2 Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java?rev=1145818&r1=1145817&r2=1145818&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java Tue Jul 12 23:33:37 2011 @@ -306,10 +306,15 @@ public class ColumnFamily extends Abstra public long serializedSize() { - int size = boolSize_ // bool - + intSize_ // id - + intSize_ // local deletion time - + longSize_ // client deltion time + return boolSize_ // nullness bool + + intSize_ // id + + serializedSizeForSSTable(); + } + + public long serializedSizeForSSTable() + { + int size = intSize_ // local deletion time + + longSize_ // 
client deletion time + intSize_; // column count for (IColumn column : columns.values()) size += column.serializedSize(); Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java?rev=1145818&r1=1145817&r2=1145818&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java Tue Jul 12 23:33:37 2011 @@ -102,9 +102,9 @@ public class ColumnFamilySerializer impl dos.writeLong(columnFamily.getMarkedForDeleteAt()); } - public int serializeWithIndexes(ColumnFamily columnFamily, DataOutput dos) + public int serializeWithIndexes(ColumnFamily columnFamily, ColumnIndexer.RowHeader index, DataOutput dos) { - ColumnIndexer.serialize(columnFamily, dos); + ColumnIndexer.serialize(index, dos); return serializeForSSTable(columnFamily, dos); } Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnIndexer.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnIndexer.java?rev=1145818&r1=1145817&r2=1145818&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnIndexer.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnIndexer.java Tue Jul 12 23:33:37 2011 @@ -22,6 +22,7 @@ import java.io.DataOutput; import java.io.IOError; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import org.apache.cassandra.config.DatabaseDescriptor; @@ -39,15 +40,15 @@ public class ColumnIndexer /** * Given a column family this, function creates an in-memory structure that represents the * column index for the column family, and subsequently writes it 
to disk. + * * @param columns Column family to create index for * @param dos data output stream - * @throws IOException */ public static void serialize(IIterableColumns columns, DataOutput dos) { try { - serializeInternal(columns, dos); + writeIndex(serialize(columns), dos); } catch (IOException e) { @@ -55,24 +56,41 @@ public class ColumnIndexer } } - public static void serializeInternal(IIterableColumns columns, DataOutput dos) throws IOException + public static void serialize(RowHeader indexInfo, DataOutput dos) + { + try + { + writeIndex(indexInfo, dos); + } + catch (IOException e) + { + throw new IOError(e); + } + } + + /** + * Builds the row index as an in-memory structure with all required components, + * such as the Bloom filter, the index block size and the IndexInfo list + * + * @param columns Column family to create index for + * + * @return information about the index - its Bloom filter, block size and IndexInfo list + */ + public static RowHeader serialize(IIterableColumns columns) { int columnCount = columns.getEstimatedColumnCount(); BloomFilter bf = BloomFilter.getFilter(columnCount, 4); if (columnCount == 0) - { - writeEmptyHeader(dos, bf); - return; - } + return new RowHeader(bf, Collections.<IndexHelper.IndexInfo>emptyList()); // update bloom filter and create a list of IndexInfo objects marking the first and last column // in each block of ColumnIndexSize List<IndexHelper.IndexInfo> indexList = new ArrayList<IndexHelper.IndexInfo>(); int endPosition = 0, startPosition = -1; - int indexSizeInBytes = 0; IColumn lastColumn = null, firstColumn = null; + for (IColumn column : columns) { bf.add(column.name()); @@ -82,13 +100,14 @@ public class ColumnIndexer firstColumn = column; startPosition = endPosition; } + endPosition += column.serializedSize(); - /* if we hit the column index size that we have to index after, go ahead and index it. */ + + // if we hit the column index size that we have to index after, go ahead and index it.
if (endPosition - startPosition >= DatabaseDescriptor.getColumnIndexSize()) { IndexHelper.IndexInfo cIndexInfo = new IndexHelper.IndexInfo(firstColumn.name(), column.name(), startPosition, endPosition - startPosition); indexList.add(cIndexInfo); - indexSizeInBytes += cIndexInfo.serializedSize(); firstColumn = null; } @@ -97,45 +116,43 @@ public class ColumnIndexer // all columns were GC'd after all if (lastColumn == null) - { - writeEmptyHeader(dos, bf); - return; - } + return new RowHeader(bf, Collections.<IndexHelper.IndexInfo>emptyList()); // the last column may have fallen on an index boundary already. if not, index it explicitly. if (indexList.isEmpty() || columns.getComparator().compare(indexList.get(indexList.size() - 1).lastName, lastColumn.name()) != 0) { IndexHelper.IndexInfo cIndexInfo = new IndexHelper.IndexInfo(firstColumn.name(), lastColumn.name(), startPosition, endPosition - startPosition); indexList.add(cIndexInfo); - indexSizeInBytes += cIndexInfo.serializedSize(); } + // we should always have at least one computed index block, but we only write it out if there is more than that. + assert indexList.size() > 0; + return new RowHeader(bf, indexList); + } + + private static void writeIndex(RowHeader indexInfo, DataOutput dos) throws IOException + { + assert indexInfo != null; + /* Write out the bloom filter. */ - writeBloomFilter(dos, bf); + writeBloomFilter(dos, indexInfo.bloomFilter); - // write the index. we should always have at least one computed index block, but we only write it out if there is more than that. 
- assert indexSizeInBytes > 0; - if (indexList.size() > 1) + dos.writeInt(indexInfo.entriesSize); + if (indexInfo.indexEntries.size() > 1) { - dos.writeInt(indexSizeInBytes); - for (IndexHelper.IndexInfo cIndexInfo : indexList) - { + for (IndexHelper.IndexInfo cIndexInfo : indexInfo.indexEntries) cIndexInfo.serialize(dos); - } } - else - { - dos.writeInt(0); - } - } - - private static void writeEmptyHeader(DataOutput dos, BloomFilter bf) - throws IOException - { - writeBloomFilter(dos, bf); - dos.writeInt(0); } + /** + * Writes a Bloom filter to the given output + * + * @param dos output to serialize the Bloom filter to + * @param bf Bloom filter to write + * + * @throws IOException on any I/O error. + */ private static void writeBloomFilter(DataOutput dos, BloomFilter bf) throws IOException { DataOutputBuffer bufOut = new DataOutputBuffer(); @@ -145,4 +162,36 @@ public class ColumnIndexer bufOut.flush(); } + /** + * Holds a row's Bloom filter and column index entries, plus the serialized size of those entries + */ + public static class RowHeader + { + public final BloomFilter bloomFilter; + public final List<IndexHelper.IndexInfo> indexEntries; + public final int entriesSize; + + public RowHeader(BloomFilter bf, List<IndexHelper.IndexInfo> indexes) + { + assert bf != null; + assert indexes != null; + bloomFilter = bf; + indexEntries = indexes; + int entriesSize = 0; + if (indexEntries.size() > 1) + { + for (IndexHelper.IndexInfo info : indexEntries) + entriesSize += info.serializedSize(); + } + this.entriesSize = entriesSize; + } + + public long serializedSize() + { + return DBConstants.intSize_ // length of Bloom Filter + + bloomFilter.serializedSize() // BF data + + DBConstants.intSize_ // length of index block + + entriesSize; // index block + } + } } Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java?rev=1145818&r1=1145817&r2=1145818&view=diff
============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java Tue Jul 12 23:33:37 2011 @@ -27,19 +27,15 @@ import java.util.HashSet; import java.util.Set; import com.google.common.collect.Sets; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.*; import org.apache.cassandra.db.compaction.*; -import org.apache.cassandra.db.ColumnFamily; -import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.db.DecoratedKey; -import org.apache.cassandra.db.Table; import org.apache.cassandra.dht.IPartitioner; -import org.apache.cassandra.io.*; -import org.apache.cassandra.io.sstable.SSTableMetadata; import org.apache.cassandra.io.util.BufferedRandomAccessFile; import org.apache.cassandra.io.util.FileMark; import org.apache.cassandra.io.util.FileUtils; @@ -148,23 +144,20 @@ public class SSTableWriter extends SSTab { long startPosition = beforeAppend(decoratedKey); ByteBufferUtil.writeWithShortLength(decoratedKey.key, dataFile); - // write placeholder for the row size, since we don't know it yet - long sizePosition = dataFile.getFilePointer(); - dataFile.writeLong(-1); - // write out row data - int columnCount = ColumnFamily.serializer().serializeWithIndexes(cf, dataFile); - // seek back and write the row size (not including the size Long itself) - long endPosition = dataFile.getFilePointer(); - dataFile.seek(sizePosition); - long dataSize = endPosition - (sizePosition + 8); - assert dataSize > 0; - dataFile.writeLong(dataSize); - // finally, reset for next row - dataFile.seek(endPosition); + + // serialize index and bloom filter into in-memory structure + ColumnIndexer.RowHeader header = ColumnIndexer.serialize(cf); + + // write out row 
size + dataFile.writeLong(header.serializedSize() + cf.serializedSizeForSSTable()); + + // write out row header and data + int columnCount = ColumnFamily.serializer().serializeWithIndexes(cf, header, dataFile); afterAppend(decoratedKey, startPosition); + // track max column timestamp sstableMetadataCollector.updateMaxTimestamp(cf.maxTimestamp()); - sstableMetadataCollector.addRowSize(endPosition - startPosition); + sstableMetadataCollector.addRowSize(dataFile.getFilePointer() - startPosition); sstableMetadataCollector.addColumnCount(columnCount); } Modified: cassandra/trunk/src/java/org/apache/cassandra/utils/BloomFilter.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/BloomFilter.java?rev=1145818&r1=1145817&r2=1145818&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/utils/BloomFilter.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/utils/BloomFilter.java Tue Jul 12 23:33:37 2011 @@ -138,4 +138,9 @@ public class BloomFilter extends Filter { bitset.clear(0, bitset.size()); } + + public int serializedSize() + { + return BloomFilterSerializer.serializedSize(this); + } } Modified: cassandra/trunk/src/java/org/apache/cassandra/utils/BloomFilterSerializer.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/BloomFilterSerializer.java?rev=1145818&r1=1145817&r2=1145818&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/utils/BloomFilterSerializer.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/utils/BloomFilterSerializer.java Tue Jul 12 23:33:37 2011 @@ -25,6 +25,7 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import org.apache.cassandra.db.DBConstants; import org.apache.cassandra.io.ICompactSerializer2; import 
org.apache.cassandra.utils.obs.OpenBitSet; @@ -52,6 +53,19 @@ class BloomFilterSerializer implements I OpenBitSet bs = new OpenBitSet(bits, bitLength); return new BloomFilter(hashes, bs); } -} - + /** + * Calculates the serialized size of the given Bloom filter + * @see #serialize(BloomFilter, DataOutput) + * + * @param bf Bloom filter whose serialized size is calculated + * + * @return serialized size of the given Bloom filter + */ + public static int serializedSize(BloomFilter bf) + { + return DBConstants.intSize_ // hash count + + DBConstants.intSize_ // length + + bf.bitset.getBits().length * DBConstants.longSize_; // buckets + } +}