Author: jbellis Date: Fri Aug 26 21:31:35 2011 New Revision: 1162266 URL: http://svn.apache.org/viewvc?rev=1162266&view=rev Log: wip
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/CollationController.java cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java Modified: cassandra/trunk/src/java/org/apache/cassandra/db/CollationController.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CollationController.java?rev=1162266&r1=1162265&r2=1162266&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/CollationController.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/CollationController.java Fri Aug 26 21:31:35 2011 @@ -23,16 +23,15 @@ import java.nio.ByteBuffer; import java.util.*; import com.google.common.collect.Iterables; -import org.apache.cassandra.io.sstable.SSTable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.db.columniterator.IColumnIterator; import org.apache.cassandra.db.columniterator.SimpleAbstractColumnIterator; import org.apache.cassandra.db.filter.NamesQueryFilter; import org.apache.cassandra.db.filter.QueryFilter; import org.apache.cassandra.db.marshal.CounterColumnType; +import org.apache.cassandra.io.sstable.SSTable; import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.utils.CloseableIterator; @@ -42,26 +41,24 @@ public class CollationController { private static Logger logger = LoggerFactory.getLogger(CollationController.class); - private final DataTracker.View dataview; + private final ColumnFamilyStore cfs; private final ISortedColumns.Factory factory; private final QueryFilter filter; private final int gcBefore; - private final CFMetaData metadata; private int sstablesIterated = 0; - public CollationController(DataTracker.View dataview, ISortedColumns.Factory factory, QueryFilter filter, CFMetaData metadata, int gcBefore) + public CollationController(ColumnFamilyStore cfs, ISortedColumns.Factory factory, QueryFilter filter, int gcBefore) { - this.dataview = dataview; + this.cfs = cfs; this.factory = factory; this.filter = filter; this.gcBefore = gcBefore; - this.metadata = metadata; } public ColumnFamily getTopLevelColumns() { - return filter.filter instanceof NamesQueryFilter && metadata.getDefaultValidator() != CounterColumnType.instance + return filter.filter instanceof NamesQueryFilter && cfs.metadata.getDefaultValidator() != CounterColumnType.instance ? collectTimeOrderedData() : collectAllData(); } @@ -74,14 +71,18 @@ public class CollationController private ColumnFamily collectTimeOrderedData() { logger.debug("collectTimeOrderedData"); - List<IColumnIterator> iterators = new ArrayList<IColumnIterator>(); - final ColumnFamily container = ColumnFamily.create(metadata, factory, filter.filter.isReversed()); - List<SSTableReader> sstables = null; - try - { + + List<IColumnIterator> iterators; + ColumnFamily container; + while (true) + { + DataTracker.View dataview = cfs.getDataTracker().getView(); + iterators = new ArrayList<IColumnIterator>(); + container = ColumnFamily.create(cfs.metadata, factory, filter.filter.isReversed()); + List<SSTableReader> sstables; for (Memtable memtable : Iterables.concat(dataview.memtablesPendingFlush, Collections.singleton(dataview.memtable))) { - IColumnIterator iter = filter.getMemtableColumnIterator(memtable, metadata.comparator); + IColumnIterator iter = filter.getMemtableColumnIterator(memtable, cfs.metadata.comparator); if (iter != null) { iterators.add(iter); @@ -93,49 +94,58 @@ public class CollationController // avoid changing the filter columns of the original filter // (reduceNameFilter removes columns that are known to be irrelevant) - TreeSet<ByteBuffer> filterColumns = new TreeSet<ByteBuffer>(metadata.comparator); + TreeSet<ByteBuffer> filterColumns = new TreeSet<ByteBuffer>(cfs.metadata.comparator); filterColumns.addAll(((NamesQueryFilter) filter.filter).columns); QueryFilter reducedFilter = new QueryFilter(filter.key, filter.path, new NamesQueryFilter(filterColumns)); /* add the SSTables on disk */ sstables = dataview.intervalTree.search(new Interval(filter.key, filter.key)); Collections.sort(sstables, SSTable.maxTimestampComparator); - SSTableReader.acquireReferences(sstables); - // read sorted sstables - for (SSTableReader sstable : sstables) - { - long currentMaxTs = sstable.getMaxTimestamp(); - reduceNameFilter(reducedFilter, container, currentMaxTs); - if (((NamesQueryFilter) reducedFilter.filter).columns.isEmpty()) - break; - - IColumnIterator iter = reducedFilter.getSSTableColumnIterator(sstable); - iterators.add(iter); - if (iter.getColumnFamily() != null) + if (!SSTableReader.acquireReferences(sstables)) + continue; // retry w/ new view + + try + { + // read sorted sstables + for (SSTableReader sstable : sstables) { - container.delete(iter.getColumnFamily()); - sstablesIterated++; - while (iter.hasNext()) - container.addColumn(iter.next()); + long currentMaxTs = sstable.getMaxTimestamp(); + reduceNameFilter(reducedFilter, container, currentMaxTs); + if (((NamesQueryFilter) reducedFilter.filter).columns.isEmpty()) + break; + + IColumnIterator iter = reducedFilter.getSSTableColumnIterator(sstable); + iterators.add(iter); + if (iter.getColumnFamily() != null) + { + container.delete(iter.getColumnFamily()); + sstablesIterated++; + while (iter.hasNext()) + container.addColumn(iter.next()); + } } } - } - finally - { - SSTableReader.releaseReferences(sstables); - for (IColumnIterator iter : iterators) - FileUtils.closeQuietly(iter); + finally + { + SSTableReader.releaseReferences(sstables); + for (IColumnIterator iter : iterators) + FileUtils.closeQuietly(iter); + } + + break; // sstable reference acquisition was successful } + // we need to distinguish between "there is no data at all for this row" (BF will let us rebuild that efficiently) // and "there used to be data, but it's gone now" (we should cache the empty CF so we don't need to rebuild that slower) if (iterators.isEmpty()) return null; // do a final collate. toCollate is boilerplate required to provide a CloseableIterator + final ColumnFamily c2 = container; CloseableIterator<IColumn> toCollate = new SimpleAbstractColumnIterator() { - final Iterator<IColumn> iter = container.iterator(); + final Iterator<IColumn> iter = c2.iterator(); protected IColumn computeNext() { @@ -144,7 +154,7 @@ public class CollationController public ColumnFamily getColumnFamily() { - return container; + return c2; } public DecoratedKey getKey() @@ -153,7 +163,7 @@ public class CollationController } }; ColumnFamily returnCF = container.cloneMeShallow(); - filter.collateColumns(returnCF, Collections.singletonList(toCollate), metadata.comparator, gcBefore); + filter.collateColumns(returnCF, Collections.singletonList(toCollate), cfs.metadata.comparator, gcBefore); // Caller is responsible for final removeDeletedCF. This is important for cacheRow to work correctly: return returnCF; @@ -187,14 +197,15 @@ public class CollationController { logger.debug("collectAllData"); List<IColumnIterator> iterators = new ArrayList<IColumnIterator>(); - ColumnFamily returnCF = ColumnFamily.create(metadata, factory, filter.filter.isReversed()); - List<SSTableReader> sstables = null; + ColumnFamily returnCF = ColumnFamily.create(cfs.metadata, factory, filter.filter.isReversed()); + List<SSTableReader> sstables; - try + while (true) { + DataTracker.View dataview = cfs.getDataTracker().getView(); for (Memtable memtable : Iterables.concat(dataview.memtablesPendingFlush, Collections.singleton(dataview.memtable))) { - IColumnIterator iter = filter.getMemtableColumnIterator(memtable, metadata.comparator); + IColumnIterator iter = filter.getMemtableColumnIterator(memtable, cfs.metadata.comparator); if (iter != null) { returnCF.delete(iter.getColumnFamily()); @@ -204,23 +215,30 @@ public class CollationController /* add the SSTables on disk */ sstables = dataview.intervalTree.search(new Interval(filter.key, filter.key)); - SSTableReader.acquireReferences(sstables); - for (SSTableReader sstable : sstables) + if (!SSTableReader.acquireReferences(sstables)) + continue; // retry w/ new view + + try { - IColumnIterator iter = filter.getSSTableColumnIterator(sstable); - iterators.add(iter); - if (iter.getColumnFamily() != null) + for (SSTableReader sstable : sstables) { - returnCF.delete(iter.getColumnFamily()); - sstablesIterated++; + IColumnIterator iter = filter.getSSTableColumnIterator(sstable); + iterators.add(iter); + if (iter.getColumnFamily() != null) + { + returnCF.delete(iter.getColumnFamily()); + sstablesIterated++; + } } } - } - finally - { - SSTableReader.releaseReferences(sstables); - for (IColumnIterator iter : iterators) - FileUtils.closeQuietly(iter); + finally + { + SSTableReader.releaseReferences(sstables); + for (IColumnIterator iter : iterators) + FileUtils.closeQuietly(iter); + } + + break; // sstable reference acquisition was successful } // we need to distinguish between "there is no data at all for this row" (BF will let us rebuild that efficiently) @@ -228,7 +246,7 @@ public class CollationController if (iterators.isEmpty()) return null; - filter.collateColumns(returnCF, iterators, metadata.comparator, gcBefore); + filter.collateColumns(returnCF, iterators, cfs.metadata.comparator, gcBefore); // Caller is responsible for final removeDeletedCF. This is important for cacheRow to work correctly: return returnCF; Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1162266&r1=1162265&r2=1162266&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Fri Aug 26 21:31:35 2011 @@ -888,21 +888,14 @@ public class ColumnFamilyStore implement */ public boolean isKeyInRemainingSSTables(DecoratedKey key, Set<? extends SSTable> sstablesToIgnore) { - DataTracker.View currentView = markCurrentViewReferenced(); - try + // we don't need to acquire references here, since the bloom filter is safe to use even post-compaction + List<SSTableReader> filteredSSTables = data.getView().intervalTree.search(new Interval(key, key)); + for (SSTableReader sstable : filteredSSTables) { - List<SSTableReader> filteredSSTables = currentView.intervalTree.search(new Interval(key, key)); - for (SSTableReader sstable : filteredSSTables) - { - if (!sstablesToIgnore.contains(sstable) && sstable.getBloomFilter().isPresent(key.key)) - return true; - } - return false; - } - finally - { - SSTableReader.releaseReferences(currentView.sstables); + if (!sstablesToIgnore.contains(sstable) && sstable.getBloomFilter().isPresent(key.key)) + return true; } + return false; } /* @@ -1261,16 +1254,8 @@ public class ColumnFamilyStore implement while (true) { DataTracker.View currentView = data.getView(); - SSTableReader.acquireReferences(currentView.sstables); - if (currentView.sstables == data.getView().sstables) // reference equality is fine - { + if (SSTableReader.acquireReferences(currentView.sstables)) return currentView; - } - else - { - // the set of sstables has changed, let's release the acquired references and try again - SSTableReader.releaseReferences(currentView.sstables); - } } } @@ -1287,20 +1272,12 @@ public class ColumnFamilyStore implement private ColumnFamily getTopLevelColumns(QueryFilter filter, int gcBefore, ISortedColumns.Factory factory) { - DataTracker.View currentView = markCurrentViewReferenced(); - try - { - CollationController controller = new CollationController(currentView, factory, filter, metadata, gcBefore); + CollationController controller = new CollationController(this, factory, filter, gcBefore); ColumnFamily columns = controller.getTopLevelColumns(); recentSSTablesPerRead.add(controller.getSstablesIterated()); sstablesPerRead.add(controller.getSstablesIterated()); return columns; } - finally - { - SSTableReader.releaseReferences(currentView.sstables); - } - } /** * Fetch a range of rows and columns from memtables/sstables. Modified: cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java?rev=1162266&r1=1162265&r2=1162266&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java Fri Aug 26 21:31:35 2011 @@ -298,16 +298,8 @@ public class DataTracker if (logger.isDebugEnabled()) logger.debug(String.format("removing %s from list of files tracked for %s.%s", sstable.descriptor, cfstore.table.name, cfstore.getColumnFamilyName())); - // A reference must be acquire before any call to markCompacted, see SSTableReader for details - sstable.acquireReference(); - try - { - sstable.markCompacted(); - } - finally - { - sstable.releaseReference(); - } + sstable.markCompacted(); + sstable.releaseReference(); liveSize.addAndGet(-sstable.bytesOnDisk()); } } @@ -511,9 +503,9 @@ public class DataTracker // Obviously, dropping sstables whose max column timestamp happens to be equal to another's // is not acceptable for us. So, we use a List instead. public final List<SSTableReader> sstables; - public final IntervalTree intervalTree; + public final IntervalTree<SSTableReader> intervalTree; - View(Memtable memtable, Set<Memtable> pendingFlush, List<SSTableReader> sstables, Set<SSTableReader> compacting, IntervalTree intervalTree) + View(Memtable memtable, Set<Memtable> pendingFlush, List<SSTableReader> sstables, Set<SSTableReader> compacting, IntervalTree<SSTableReader> intervalTree) { this.memtable = memtable; this.memtablesPendingFlush = pendingFlush; Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java?rev=1162266&r1=1162265&r2=1162266&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java Fri Aug 26 21:31:35 2011 @@ -798,6 +798,8 @@ public class CompactionManager implement throw new AssertionError(e); } + // we don't mark validating sstables as compacting in DataTracker, so we have to mark them referenced + // instead so they won't be cleaned up if they do get compacted during the validation Collection<SSTableReader> sstables = cfs.markCurrentSSTablesReferenced(); CompactionIterable ci = new ValidationCompactionIterable(cfs, sstables, validator.request.range); CloseableIterator<AbstractCompactedRow> iter = ci.iterator(); Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java?rev=1162266&r1=1162265&r2=1162266&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java Fri Aug 26 21:31:35 2011 @@ -81,7 +81,9 @@ public class SSTableReader extends SSTab private BloomFilterTracker bloomFilterTracker = new BloomFilterTracker(); - private final AtomicInteger references = new AtomicInteger(0); + private final AtomicInteger references = new AtomicInteger(1); + // technically isCompacted is not necessary since it should never be unreferenced unless it is also compacted, + // but it seems like a good extra layer of protection against reference counting bugs to not delete data based on that alone private final AtomicBoolean isCompacted = new AtomicBoolean(false); private final SSTableDeletingTask deletingTask; @@ -618,9 +620,16 @@ public class SSTableReader extends SSTab return dfile.length; } - public void acquireReference() + public boolean acquireReference() { - references.incrementAndGet(); + while (true) + { + int n = references.get(); + if (n <= 0) + return false; + if (references.compareAndSet(n, n + 1)) + return true; + } } public void releaseReference() @@ -831,13 +840,32 @@ public class SSTableReader extends SSTab : RandomAccessReader.open(new File(getFilename()), bufferSize, skipIOCache); } - public static void acquireReferences(Iterable<SSTableReader> sstables) + /** + * @param sstables + * @return true if all desired references were acquired. Otherwise, it will unreference any partial acquisition, and return false. + */ + public static boolean acquireReferences(Iterable<SSTableReader> sstables) { + SSTableReader failed = null; + for (SSTableReader sstable : sstables) + { + if (!sstable.acquireReference()) + { + failed = sstable; + break; + } + } + + if (failed == null) + return true; + for (SSTableReader sstable : sstables) { - if (sstable != null) - sstable.acquireReference(); + if (sstable == failed) + break; + sstable.releaseReference(); } + return false; } public static void releaseReferences(Iterable<SSTableReader> sstables) @@ -846,10 +874,9 @@ public class SSTableReader extends SSTab { try { - if (sstable != null) - sstable.releaseReference(); + sstable.releaseReference(); } - catch (Throwable ex) + catch (Exception ex) { logger.error("Failed releasing reference on " + sstable, ex); } Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java?rev=1162266&r1=1162265&r2=1162266&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java Fri Aug 26 21:31:35 2011 @@ -125,7 +125,6 @@ public class StreamInSession { if (files.isEmpty()) { - // wait for bloom filters and row indexes to finish building HashMap <ColumnFamilyStore, List<SSTableReader>> cfstores = new HashMap<ColumnFamilyStore, List<SSTableReader>>(); List<SSTableReader> referenced = new LinkedList<SSTableReader>(); try Modified: cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java?rev=1162266&r1=1162265&r2=1162266&view=diff ============================================================================== --- cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java (original) +++ cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java Fri Aug 26 21:31:35 2011 @@ -234,7 +234,6 @@ public class SSTableUtils long start = System.currentTimeMillis(); while (appender.append(writer)) { /* pass */ } SSTableReader reader = writer.closeAndOpenReader(); - reader.acquireReference(); // mark all components for removal if (cleanup) for (Component component : reader.components) Modified: cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java?rev=1162266&r1=1162265&r2=1162266&view=diff ============================================================================== --- cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java (original) +++ cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java Fri Aug 26 21:31:35 2011 @@ -296,7 +296,8 @@ public class StreamingTransferTest exten ranges.add(new Range(secondtolast.getKey().token, p.getMinimumToken())); // Acquiring references, transferSSTables needs it - SSTableReader.acquireReferences(ssTableReaders); + if (!SSTableReader.acquireReferences(ssTableReaders)) + throw new AssertionError(); StreamOutSession session = StreamOutSession.create(keyspace, LOCAL, null); StreamOut.transferSSTables(session, ssTableReaders, ranges, OperationType.BOOTSTRAP);