Author: slebresne Date: Mon Apr 18 10:59:39 2011 New Revision: 1094484 URL: http://svn.apache.org/viewvc?rev=1094484&view=rev Log: Merge CASSANDRA-2420 from 0.8
Modified: cassandra/trunk/ (props changed) cassandra/trunk/contrib/ (props changed) cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java (props changed) cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java (props changed) cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java (props changed) cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java (props changed) cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java (props changed) cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java cassandra/trunk/src/java/org/apache/cassandra/io/PrecompactedRow.java cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java cassandra/trunk/src/java/org/apache/cassandra/streaming/OperationType.java Propchange: cassandra/trunk/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Mon Apr 18 10:59:39 2011 @@ -1,7 +1,7 @@ /cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053452,1053454,1053456-1081914,1083000 /cassandra/branches/cassandra-0.7:1026516-1091087,1091503,1091542,1091654 /cassandra/branches/cassandra-0.7.0:1053690-1055654 -/cassandra/branches/cassandra-0.8:1090935-1094085 +/cassandra/branches/cassandra-0.8:1090935-1094085,1094481 /cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689 /incubator/cassandra/branches/cassandra-0.3:774578-796573 /incubator/cassandra/branches/cassandra-0.4:810145-834239,834349-834350 Propchange: cassandra/trunk/contrib/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Mon Apr 18 10:59:39 2011 @@ -1,7 +1,7 @@ /cassandra/branches/cassandra-0.6/contrib:922689-1052356,1052358-1053452,1053454,1053456-1068009 -/cassandra/branches/cassandra-0.7/contrib:1026516-1091087,1091503,1091542,1091654 +/cassandra/branches/cassandra-0.7/contrib:1026516-1091087,1091503,1091542,1091654,1094481 /cassandra/branches/cassandra-0.7.0/contrib:1053690-1055654 -/cassandra/branches/cassandra-0.8/contrib:1090935-1094085 +/cassandra/branches/cassandra-0.8/contrib:1090935-1094085,1094481 /cassandra/tags/cassandra-0.7.0-rc3/contrib:1051699-1053689 /incubator/cassandra/branches/cassandra-0.3/contrib:774578-796573 /incubator/cassandra/branches/cassandra-0.4/contrib:810145-810987,810994-834239,834349-834350 Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Mon Apr 18 10:59:39 2011 @@ -1,7 +1,7 @@ /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053452,1053454,1053456-1081914,1083000 -/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1091087,1091503,1091542,1091654 +/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1091087,1091503,1091542,1091654,1094481 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1053690-1055654 -/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1090935-1094085 +/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1090935-1094085,1094481 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Cassandra.java:810145-834239,834349-834350 Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Mon Apr 18 10:59:39 2011 @@ -1,7 +1,7 @@ /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053452,1053454,1053456-1081914,1083000 -/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1091087,1091503,1091542,1091654 +/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1091087,1091503,1091542,1091654,1094481 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1053690-1055654 -/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1090935-1094085 +/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1090935-1094085,1094481 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Column.java:810145-834239,834349-834350 Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Mon Apr 18 10:59:39 2011 @@ -1,7 +1,7 @@ /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053452,1053454,1053456-1081914,1083000 -/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1091087,1091503,1091542,1091654 +/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1091087,1091503,1091542,1091654,1094481 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1053690-1055654 -/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1090935-1094085 +/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1090935-1094085,1094481 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:810145-834239,834349-834350 Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Mon Apr 18 10:59:39 2011 @@ -1,7 +1,7 @@ /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053452,1053454,1053456-1081914,1083000 -/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1091087,1091503,1091542,1091654 +/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1091087,1091503,1091542,1091654,1094481 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1053690-1055654 -/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1090935-1094085 +/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1090935-1094085,1094481 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:810145-834239,834349-834350 Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Mon Apr 18 10:59:39 2011 @@ -1,7 +1,7 @@ /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053452,1053454,1053456-1081914,1083000 -/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1091087,1091503,1091542,1091654 +/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1091087,1091503,1091542,1091654,1094481 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1053690-1055654 -/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1090935-1094085 +/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1090935-1094085,1094481 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/SuperColumn.java:810145-834239,834349-834350 Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1094484&r1=1094483&r2=1094484&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Mon Apr 18 10:59:39 2011 @@ -735,6 +735,20 @@ public class ColumnFamilyStore implement submitFlush(binaryMemtable.get(), new CountDownLatch(1)); } + public void updateRowCache(DecoratedKey key, ColumnFamily columnFamily) + { + if (rowCache.isPutCopying()) + { + invalidateCachedRow(key); + } + else + { + ColumnFamily cachedRow = getRawCachedRow(key); + if (cachedRow != null) + cachedRow.addAll(columnFamily); + } + } + /** * Insert/Update the column family for this key. * Caller is responsible for acquiring Table.flusherLock! @@ -749,17 +763,8 @@ public class ColumnFamilyStore implement Memtable mt = getMemtableThreadSafe(); boolean flushRequested = mt.isThresholdViolated(); mt.put(key, columnFamily); - if (rowCache.isPutCopying()) - { - invalidateCachedRow(key); - } - else - { - ColumnFamily cachedRow = getRawCachedRow(key); - if (cachedRow != null) - cachedRow.addAll(columnFamily); - writeStats.addNano(System.nanoTime() - start); - } + updateRowCache(key, columnFamily); + writeStats.addNano(System.nanoTime() - start); if (DatabaseDescriptor.estimatesRealMemtableSize()) { Modified: cassandra/trunk/src/java/org/apache/cassandra/io/PrecompactedRow.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/PrecompactedRow.java?rev=1094484&r1=1094483&r2=1094484&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/io/PrecompactedRow.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/io/PrecompactedRow.java Mon Apr 18 10:59:39 2011 @@ -131,4 +131,15 @@ public class PrecompactedRow extends Abs { return compactedCf == null ? 0 : compactedCf.getColumnCount(); } + + /** + * @return the full column family represented by this compacted row. + * + * We do not provide this method for other AbstractCompactedRow, because this fits the whole row into + * memory and don't make sense for those other implementations. + */ + public ColumnFamily getFullColumnFamily() throws IOException + { + return compactedCf; + } } Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java?rev=1094484&r1=1094483&r2=1094484&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java Mon Apr 18 10:59:39 2011 @@ -285,9 +285,9 @@ public class SSTableWriter extends SSTab try { if (cfs.metadata.getDefaultValidator().isCommutative()) - indexer = new CommutativeRowIndexer(desc, cfs.metadata); + indexer = new CommutativeRowIndexer(desc, cfs, type); else - indexer = new RowIndexer(desc, cfs.metadata); + indexer = new RowIndexer(desc, cfs, type); } catch (IOException e) { @@ -320,20 +320,22 @@ public class SSTableWriter extends SSTab { protected final Descriptor desc; public final BufferedRandomAccessFile dfile; + private final OperationType type; protected IndexWriter iwriter; - protected CFMetaData metadata; + protected ColumnFamilyStore cfs; - RowIndexer(Descriptor desc, CFMetaData metadata) throws IOException + RowIndexer(Descriptor desc, ColumnFamilyStore cfs, OperationType type) throws IOException { - this(desc, new BufferedRandomAccessFile(new File(desc.filenameFor(SSTable.COMPONENT_DATA)), "r", 8 * 1024 * 1024, true), metadata); + this(desc, new BufferedRandomAccessFile(new File(desc.filenameFor(SSTable.COMPONENT_DATA)), "r", 8 * 1024 * 1024, true), cfs, type); } - protected RowIndexer(Descriptor desc, BufferedRandomAccessFile dfile, CFMetaData metadata) throws IOException + protected RowIndexer(Descriptor desc, BufferedRandomAccessFile dfile, ColumnFamilyStore cfs, OperationType type) throws IOException { this.desc = desc; this.dfile = dfile; - this.metadata = metadata; + this.type = type; + this.cfs = cfs; } long prepareIndexing() throws IOException @@ -377,6 +379,53 @@ public class SSTableWriter extends SSTab iwriter.close(); } + /* + * If the key is cached, we should: + * - For AES: run the newly received row by the cache + * - For other: invalidate the cache (even if very unlikely, a key could be in cache in theory if a neighbor was boostrapped and + * then removed quickly afterward (a key that we had lost but become responsible again could have stayed in cache). That key + * would be obsolete and so we must invalidate the cache). + */ + protected void updateCache(DecoratedKey key, long dataSize, AbstractCompactedRow row) throws IOException + { + ColumnFamily cached = cfs.getRawCachedRow(key); + if (cached != null) + { + switch (type) + { + case AES: + if (dataSize > DatabaseDescriptor.getInMemoryCompactionLimit()) + { + // We have a key in cache for a very big row, that is fishy. We don't fail here however because that would prevent the sstable + // from being build (and there is no real point anyway), so we just invalidate the row for correction and log a warning. + logger.warn("Found a cached row over the in memory compaction limit during post-streaming rebuilt; it is highly recommended to avoid huge row on column family with row cache enabled."); + cfs.invalidateCachedRow(key); + } + else + { + ColumnFamily cf; + if (row == null) + { + // If not provided, read from disk. + cf = ColumnFamily.create(cfs.metadata); + ColumnFamily.serializer().deserializeColumns(dfile, cf, true, true); + } + else + { + assert row instanceof PrecompactedRow; + // we do not purge so we should not get a null here + cf = ((PrecompactedRow)row).getFullColumnFamily(); + } + cfs.updateRowCache(key, cf); + } + break; + default: + cfs.invalidateCachedRow(key); + break; + } + } + } + protected long doIndexing() throws IOException { EstimatedHistogram rowSizes = SSTable.defaultRowHistogram(); @@ -393,10 +442,14 @@ public class SSTableWriter extends SSTab // seek to next key long dataSize = SSTableReader.readRowSize(dfile, desc); rowPosition = dfile.getFilePointer() + dataSize; - + IndexHelper.skipBloomFilter(dfile); IndexHelper.skipIndex(dfile); - ColumnFamily.serializer().deserializeFromSSTableNoColumns(ColumnFamily.create(metadata), dfile); + ColumnFamily.serializer().deserializeFromSSTableNoColumns(ColumnFamily.create(cfs.metadata), dfile); + + // don't move that statement around, it expects the dfile to be before the columns + updateCache(key, dataSize, null); + rowSizes.add(dataSize); columnCounts.add(dfile.readInt()); @@ -424,9 +477,9 @@ public class SSTableWriter extends SSTab { protected BufferedRandomAccessFile writerDfile; - CommutativeRowIndexer(Descriptor desc, CFMetaData metadata) throws IOException + CommutativeRowIndexer(Descriptor desc, ColumnFamilyStore cfs, OperationType type) throws IOException { - super(desc, new BufferedRandomAccessFile(new File(desc.filenameFor(SSTable.COMPONENT_DATA)), "r", 8 * 1024 * 1024, true), metadata); + super(desc, new BufferedRandomAccessFile(new File(desc.filenameFor(SSTable.COMPONENT_DATA)), "r", 8 * 1024 * 1024, true), cfs, type); writerDfile = new BufferedRandomAccessFile(new File(desc.filenameFor(SSTable.COMPONENT_DATA)), "rw", 8 * 1024 * 1024, true); } @@ -448,7 +501,7 @@ public class SSTableWriter extends SSTab // skip data size, bloom filter, column index long dataSize = SSTableReader.readRowSize(dfile, desc); - SSTableIdentityIterator iter = new SSTableIdentityIterator(metadata, dfile, key, dfile.getFilePointer(), dataSize, true); + SSTableIdentityIterator iter = new SSTableIdentityIterator(cfs.metadata, dfile, key, dfile.getFilePointer(), dataSize, true); AbstractCompactedRow row; if (dataSize > DatabaseDescriptor.getInMemoryCompactionLimit()) @@ -461,6 +514,8 @@ public class SSTableWriter extends SSTab row = new PrecompactedRow(controller, Collections.singletonList(iter)); } + updateCache(key, dataSize, row); + rowSizes.add(dataSize); columnCounts.add(row.columnCount()); Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/OperationType.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/OperationType.java?rev=1094484&r1=1094483&r2=1094484&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/streaming/OperationType.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/streaming/OperationType.java Mon Apr 18 10:59:39 2011 @@ -23,8 +23,6 @@ package org.apache.cassandra.streaming; */ public enum OperationType { - // TODO: the only types of operation that are currently distinguised are AES and everything else. There is no - // sense in having the other types (yet). AES, BOOTSTRAP, UNBOOTSTRAP,