Author: slebresne Date: Mon Apr 18 10:46:26 2011 New Revision: 1094481 URL: http://svn.apache.org/viewvc?rev=1094481&view=rev Log: Update row cache post streaming patch by slebresne; reviewed by jbellis for CASSANDRA-2420
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/ColumnFamilyStore.java cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/PrecompactedRow.java cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/OperationType.java Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/ColumnFamilyStore.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1094481&r1=1094480&r2=1094481&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original) +++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Mon Apr 18 10:46:26 2011 @@ -735,6 +735,20 @@ public class ColumnFamilyStore implement submitFlush(binaryMemtable.get(), new CountDownLatch(1)); } + public void updateRowCache(DecoratedKey key, ColumnFamily columnFamily) + { + if (rowCache.isPutCopying()) + { + invalidateCachedRow(key); + } + else + { + ColumnFamily cachedRow = getRawCachedRow(key); + if (cachedRow != null) + cachedRow.addAll(columnFamily); + } + } + /** * Insert/Update the column family for this key. * Caller is responsible for acquiring Table.flusherLock! 
@@ -749,17 +763,8 @@ public class ColumnFamilyStore implement Memtable mt = getMemtableThreadSafe(); boolean flushRequested = mt.isThresholdViolated(); mt.put(key, columnFamily); - if (rowCache.isPutCopying()) - { - invalidateCachedRow(key); - } - else - { - ColumnFamily cachedRow = getRawCachedRow(key); - if (cachedRow != null) - cachedRow.addAll(columnFamily); - writeStats.addNano(System.nanoTime() - start); - } + updateRowCache(key, columnFamily); + writeStats.addNano(System.nanoTime() - start); if (DatabaseDescriptor.estimatesRealMemtableSize()) { Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/PrecompactedRow.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/PrecompactedRow.java?rev=1094481&r1=1094480&r2=1094481&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/PrecompactedRow.java (original) +++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/PrecompactedRow.java Mon Apr 18 10:46:26 2011 @@ -131,4 +131,15 @@ public class PrecompactedRow extends Abs { return compactedCf == null ? 0 : compactedCf.getColumnCount(); } + + /** + * @return the full column family represented by this compacted row. + * + * We do not provide this method for other AbstractCompactedRow implementations, because this fits the whole row into + * memory and doesn't make sense for those other implementations. 
+ */ + public ColumnFamily getFullColumnFamily() throws IOException + { + return compactedCf; + } } Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java?rev=1094481&r1=1094480&r2=1094481&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java (original) +++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java Mon Apr 18 10:46:26 2011 @@ -285,9 +285,9 @@ public class SSTableWriter extends SSTab try { if (cfs.metadata.getDefaultValidator().isCommutative()) - indexer = new CommutativeRowIndexer(desc, cfs.metadata); + indexer = new CommutativeRowIndexer(desc, cfs, type); else - indexer = new RowIndexer(desc, cfs.metadata); + indexer = new RowIndexer(desc, cfs, type); } catch (IOException e) { @@ -320,20 +320,22 @@ public class SSTableWriter extends SSTab { protected final Descriptor desc; public final BufferedRandomAccessFile dfile; + private final OperationType type; protected IndexWriter iwriter; - protected CFMetaData metadata; + protected ColumnFamilyStore cfs; - RowIndexer(Descriptor desc, CFMetaData metadata) throws IOException + RowIndexer(Descriptor desc, ColumnFamilyStore cfs, OperationType type) throws IOException { - this(desc, new BufferedRandomAccessFile(new File(desc.filenameFor(SSTable.COMPONENT_DATA)), "r", 8 * 1024 * 1024, true), metadata); + this(desc, new BufferedRandomAccessFile(new File(desc.filenameFor(SSTable.COMPONENT_DATA)), "r", 8 * 1024 * 1024, true), cfs, type); } - protected RowIndexer(Descriptor desc, BufferedRandomAccessFile dfile, CFMetaData metadata) throws IOException + protected RowIndexer(Descriptor desc, BufferedRandomAccessFile dfile, ColumnFamilyStore cfs, OperationType type) throws 
IOException { this.desc = desc; this.dfile = dfile; - this.metadata = metadata; + this.type = type; + this.cfs = cfs; } long prepareIndexing() throws IOException @@ -377,6 +379,53 @@ public class SSTableWriter extends SSTab iwriter.close(); } + /* + * If the key is cached, we should: + * - For AES: run the newly received row by the cache + * - For other: invalidate the cache (even if very unlikely, a key could be in cache in theory if a neighbor was bootstrapped and + * then removed quickly afterward (a key that we had lost but became responsible for again could have stayed in cache). That key + * would be obsolete and so we must invalidate the cache). + */ + protected void updateCache(DecoratedKey key, long dataSize, AbstractCompactedRow row) throws IOException + { + ColumnFamily cached = cfs.getRawCachedRow(key); + if (cached != null) + { + switch (type) + { + case AES: + if (dataSize > DatabaseDescriptor.getInMemoryCompactionLimit()) + { + // We have a key in cache for a very big row, that is fishy. We don't fail here however because that would prevent the sstable + // from being built (and there is no real point anyway), so we just invalidate the row for correction and log a warning. + logger.warn("Found a cached row over the in memory compaction limit during post-streaming rebuilt; it is highly recommended to avoid huge row on column family with row cache enabled."); + cfs.invalidateCachedRow(key); + } + else + { + ColumnFamily cf; + if (row == null) + { + // If not provided, read from disk. 
+ cf = ColumnFamily.create(cfs.metadata); + ColumnFamily.serializer().deserializeColumns(dfile, cf, true, true); + } + else + { + assert row instanceof PrecompactedRow; + // we do not purge so we should not get a null here + cf = ((PrecompactedRow)row).getFullColumnFamily(); + } + cfs.updateRowCache(key, cf); + } + break; + default: + cfs.invalidateCachedRow(key); + break; + } + } + } + protected long doIndexing() throws IOException { EstimatedHistogram rowSizes = SSTable.defaultRowHistogram(); @@ -393,10 +442,14 @@ public class SSTableWriter extends SSTab // seek to next key long dataSize = SSTableReader.readRowSize(dfile, desc); rowPosition = dfile.getFilePointer() + dataSize; - + IndexHelper.skipBloomFilter(dfile); IndexHelper.skipIndex(dfile); - ColumnFamily.serializer().deserializeFromSSTableNoColumns(ColumnFamily.create(metadata), dfile); + ColumnFamily.serializer().deserializeFromSSTableNoColumns(ColumnFamily.create(cfs.metadata), dfile); + + // don't move that statement around, it expects the dfile to be before the columns + updateCache(key, dataSize, null); + rowSizes.add(dataSize); columnCounts.add(dfile.readInt()); @@ -424,9 +477,9 @@ public class SSTableWriter extends SSTab { protected BufferedRandomAccessFile writerDfile; - CommutativeRowIndexer(Descriptor desc, CFMetaData metadata) throws IOException + CommutativeRowIndexer(Descriptor desc, ColumnFamilyStore cfs, OperationType type) throws IOException { - super(desc, new BufferedRandomAccessFile(new File(desc.filenameFor(SSTable.COMPONENT_DATA)), "r", 8 * 1024 * 1024, true), metadata); + super(desc, new BufferedRandomAccessFile(new File(desc.filenameFor(SSTable.COMPONENT_DATA)), "r", 8 * 1024 * 1024, true), cfs, type); writerDfile = new BufferedRandomAccessFile(new File(desc.filenameFor(SSTable.COMPONENT_DATA)), "rw", 8 * 1024 * 1024, true); } @@ -448,7 +501,7 @@ public class SSTableWriter extends SSTab // skip data size, bloom filter, column index long dataSize = SSTableReader.readRowSize(dfile, 
desc); - SSTableIdentityIterator iter = new SSTableIdentityIterator(metadata, dfile, key, dfile.getFilePointer(), dataSize, true); + SSTableIdentityIterator iter = new SSTableIdentityIterator(cfs.metadata, dfile, key, dfile.getFilePointer(), dataSize, true); AbstractCompactedRow row; if (dataSize > DatabaseDescriptor.getInMemoryCompactionLimit()) @@ -461,6 +514,8 @@ public class SSTableWriter extends SSTab row = new PrecompactedRow(controller, Collections.singletonList(iter)); } + updateCache(key, dataSize, row); + rowSizes.add(dataSize); columnCounts.add(row.columnCount()); Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/OperationType.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/OperationType.java?rev=1094481&r1=1094480&r2=1094481&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/OperationType.java (original) +++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/OperationType.java Mon Apr 18 10:46:26 2011 @@ -23,8 +23,6 @@ package org.apache.cassandra.streaming; */ public enum OperationType { - // TODO: the only types of operation that are currently distinguised are AES and everything else. There is no - // sense in having the other types (yet). AES, BOOTSTRAP, UNBOOTSTRAP,