Author: jbellis
Date: Mon Aug 29 20:28:46 2011
New Revision: 1162988

URL: http://svn.apache.org/viewvc?rev=1162988&view=rev
Log:
fix race condition in sstable reference counting

patch by jbellis; reviewed by slebresne for CASSANDRA-3085
Modified: cassandra/trunk/CHANGES.txt cassandra/trunk/src/java/org/apache/cassandra/db/CollationController.java cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java cassandra/trunk/src/java/org/apache/cassandra/db/RowIteratorFactory.java Modified: cassandra/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1162988&r1=1162987&r2=1162988&view=diff ============================================================================== --- cassandra/trunk/CHANGES.txt (original) +++ cassandra/trunk/CHANGES.txt Mon Aug 29 20:28:46 2011 @@ -44,7 +44,7 @@ Thrift<->Avro conversion methods (CASSANDRA-3032) * Add timeouts to client request schedulers (CASSANDRA-3079) * Cli to use hashes rather than array of hashes for strategy options (CASSANDRA-3081) - * LeveledCompactionStrategy (CASSANDRA-1608) + * LeveledCompactionStrategy (CASSANDRA-1608, 3085) * Improvements of the CLI `describe` command (CASSANDRA-2630) * reduce window where dropped CF sstables may not be deleted (CASSANDRA-2942) Modified: cassandra/trunk/src/java/org/apache/cassandra/db/CollationController.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CollationController.java?rev=1162988&r1=1162987&r2=1162988&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/CollationController.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/CollationController.java Mon Aug 29 20:28:46 2011 @@ -72,15 +72,12 @@ public class CollationController { logger.debug("collectTimeOrderedData"); - List<IColumnIterator> iterators; - ColumnFamily container; - while (true) - { - DataTracker.View dataview = cfs.getDataTracker().getView(); - iterators = new ArrayList<IColumnIterator>(); - container = ColumnFamily.create(cfs.metadata, factory, filter.filter.isReversed()); - List<SSTableReader> sstables; - for (Memtable memtable : 
Iterables.concat(dataview.memtablesPendingFlush, Collections.singleton(dataview.memtable))) + ColumnFamily container = ColumnFamily.create(cfs.metadata, factory, filter.filter.isReversed()); + List<IColumnIterator> iterators = new ArrayList<IColumnIterator>(); + ColumnFamilyStore.ViewFragment view = cfs.markReferenced(filter.key); + try + { + for (Memtable memtable : view.memtables) { IColumnIterator iter = filter.getMemtableColumnIterator(memtable, cfs.metadata.comparator); if (iter != null) @@ -99,42 +96,33 @@ public class CollationController QueryFilter reducedFilter = new QueryFilter(filter.key, filter.path, new NamesQueryFilter(filterColumns)); /* add the SSTables on disk */ - sstables = dataview.intervalTree.search(new Interval(filter.key, filter.key)); - Collections.sort(sstables, SSTable.maxTimestampComparator); - if (!SSTableReader.acquireReferences(sstables)) - continue; // retry w/ new view + Collections.sort(view.sstables, SSTable.maxTimestampComparator); - try + // read sorted sstables + for (SSTableReader sstable : view.sstables) { - // read sorted sstables - for (SSTableReader sstable : sstables) + long currentMaxTs = sstable.getMaxTimestamp(); + reduceNameFilter(reducedFilter, container, currentMaxTs); + if (((NamesQueryFilter) reducedFilter.filter).columns.isEmpty()) + break; + + IColumnIterator iter = reducedFilter.getSSTableColumnIterator(sstable); + iterators.add(iter); + if (iter.getColumnFamily() != null) { - long currentMaxTs = sstable.getMaxTimestamp(); - reduceNameFilter(reducedFilter, container, currentMaxTs); - if (((NamesQueryFilter) reducedFilter.filter).columns.isEmpty()) - break; - - IColumnIterator iter = reducedFilter.getSSTableColumnIterator(sstable); - iterators.add(iter); - if (iter.getColumnFamily() != null) - { - container.delete(iter.getColumnFamily()); - sstablesIterated++; - while (iter.hasNext()) - container.addColumn(iter.next()); - } + container.delete(iter.getColumnFamily()); + sstablesIterated++; + while 
(iter.hasNext()) + container.addColumn(iter.next()); } } - finally - { - SSTableReader.releaseReferences(sstables); - for (IColumnIterator iter : iterators) - FileUtils.closeQuietly(iter); - } - - break; // sstable reference acquisition was successful } - + finally + { + SSTableReader.releaseReferences(view.sstables); + for (IColumnIterator iter : iterators) + FileUtils.closeQuietly(iter); + } // we need to distinguish between "there is no data at all for this row" (BF will let us rebuild that efficiently) // and "there used to be data, but it's gone now" (we should cache the empty CF so we don't need to rebuild that slower) @@ -198,12 +186,11 @@ public class CollationController logger.debug("collectAllData"); List<IColumnIterator> iterators = new ArrayList<IColumnIterator>(); ColumnFamily returnCF = ColumnFamily.create(cfs.metadata, factory, filter.filter.isReversed()); - List<SSTableReader> sstables; - while (true) + ColumnFamilyStore.ViewFragment view = cfs.markReferenced(filter.key); + try { - DataTracker.View dataview = cfs.getDataTracker().getView(); - for (Memtable memtable : Iterables.concat(dataview.memtablesPendingFlush, Collections.singleton(dataview.memtable))) + for (Memtable memtable : view.memtables) { IColumnIterator iter = filter.getMemtableColumnIterator(memtable, cfs.metadata.comparator); if (iter != null) @@ -213,32 +200,22 @@ public class CollationController } } - /* add the SSTables on disk */ - sstables = dataview.intervalTree.search(new Interval(filter.key, filter.key)); - if (!SSTableReader.acquireReferences(sstables)) - continue; // retry w/ new view - - try + for (SSTableReader sstable : view.sstables) { - for (SSTableReader sstable : sstables) + IColumnIterator iter = filter.getSSTableColumnIterator(sstable); + iterators.add(iter); + if (iter.getColumnFamily() != null) { - IColumnIterator iter = filter.getSSTableColumnIterator(sstable); - iterators.add(iter); - if (iter.getColumnFamily() != null) - { - 
returnCF.delete(iter.getColumnFamily()); - sstablesIterated++; - } + returnCF.delete(iter.getColumnFamily()); + sstablesIterated++; } } - finally - { - SSTableReader.releaseReferences(sstables); - for (IColumnIterator iter : iterators) - FileUtils.closeQuietly(iter); - } - - break; // sstable reference acquisition was successful + } + finally + { + SSTableReader.releaseReferences(view.sstables); + for (IColumnIterator iter : iterators) + FileUtils.closeQuietly(iter); } // we need to distinguish between "there is no data at all for this row" (BF will let us rebuild that efficiently) Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1162988&r1=1162987&r2=1162988&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Mon Aug 29 20:28:46 2011 @@ -1270,6 +1270,48 @@ public class ColumnFamilyStore implement return markCurrentViewReferenced().sstables; } + /** + * @return a ViewFragment containing the sstables and memtables that may need to be merged + * for the given @param key, according to the interval tree + */ + public ViewFragment markReferenced(DecoratedKey key) + { + assert !key.isEmpty(); + DataTracker.View view; + List<SSTableReader> sstables; + while (true) + { + view = data.getView(); + sstables = view.intervalTree.search(new Interval(key, key)); + if (SSTableReader.acquireReferences(sstables)) + break; + // retry w/ new view + } + return new ViewFragment(sstables, Iterables.concat(Collections.singleton(view.memtable), view.memtablesPendingFlush)); + } + + /** + * @return a ViewFragment containing the sstables and memtables that may need to be merged + * for rows between @param startWith and @param stopAt, inclusive, according 
to the interval tree + */ + public ViewFragment markReferenced(DecoratedKey startWith, DecoratedKey stopAt) + { + DataTracker.View view; + List<SSTableReader> sstables; + while (true) + { + view = data.getView(); + // startAt == minimum is ok, but stopAt == minimum is confusing because all IntervalTree deals with + // is Comparable, so it won't know to special-case that. + Comparable stopInTree = stopAt.isEmpty() ? view.intervalTree.max : stopAt; + sstables = view.intervalTree.search(new Interval(startWith, stopInTree)); + if (SSTableReader.acquireReferences(sstables)) + break; + // retry w/ new view + } + return new ViewFragment(sstables, Iterables.concat(Collections.singleton(view.memtable), view.memtablesPendingFlush)); + } + private ColumnFamily getTopLevelColumns(QueryFilter filter, int gcBefore, ISortedColumns.Factory factory) { CollationController controller = new CollationController(this, factory, filter, gcBefore); @@ -1301,25 +1343,12 @@ public class ColumnFamilyStore implement QueryFilter filter = new QueryFilter(null, new QueryPath(columnFamily, superColumn, null), columnFilter); int gcBefore = (int)(System.currentTimeMillis() / 1000) - metadata.getGcGraceSeconds(); - DataTracker.View currentView = markCurrentViewReferenced(); + List<Row> rows; + ViewFragment view = markReferenced(startWith, stopAt); try { - Collection<Memtable> memtables = new ArrayList<Memtable>(); - memtables.add(currentView.memtable); - memtables.addAll(currentView.memtablesPendingFlush); - // It is fine to aliases the View.sstables since it's an unmodifiable collection - Collection<SSTableReader> sstables = currentView.sstables; - - Comparable startWithComp = startWith; - Comparable stopAtComp = stopAt; - if (startWith.token.equals(partitioner.getMinimumToken())) - startWithComp = currentView.intervalTree.min; - if (stopAt.token.equals(partitioner.getMinimumToken())) - stopAtComp = currentView.intervalTree.max; - sstables = currentView.intervalTree.search(new 
Interval(startWithComp, stopAtComp)); - - CloseableIterator<Row> iterator = RowIteratorFactory.getIterator(memtables, sstables, startWith, stopAt, filter, getComparator(), this); - List<Row> rows = new ArrayList<Row>(); + CloseableIterator<Row> iterator = RowIteratorFactory.getIterator(view.memtables, view.sstables, startWith, stopAt, filter, getComparator(), this); + rows = new ArrayList<Row>(); try { @@ -1361,13 +1390,14 @@ public class ColumnFamilyStore implement throw new IOError(e); } } - - return rows; } finally { - SSTableReader.releaseReferences(currentView.sstables); + // separate finally block to release references in case getIterator() throws + SSTableReader.releaseReferences(view.sstables); } + + return rows; } public List<Row> search(IndexClause clause, AbstractBounds range, IFilter dataFilter) @@ -1941,4 +1971,16 @@ public class ColumnFamilyStore implement ? ((LeveledCompactionStrategy) this.compactionStrategy).getLevelSize(0) : 0; } + + public static class ViewFragment + { + public final List<SSTableReader> sstables; + public final Iterable<Memtable> memtables; + + public ViewFragment(List<SSTableReader> sstables, Iterable<Memtable> memtables) + { + this.sstables = sstables; + this.memtables = memtables; + } + } } Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RowIteratorFactory.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RowIteratorFactory.java?rev=1162988&r1=1162987&r2=1162988&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/RowIteratorFactory.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/RowIteratorFactory.java Mon Aug 29 20:28:46 2011 @@ -61,14 +61,13 @@ public class RowIteratorFactory * @param comparator * @return A row iterator following all the given restrictions */ - public static CloseableIterator<Row> getIterator(final Collection<Memtable> memtables, + public static 
CloseableIterator<Row> getIterator(final Iterable<Memtable> memtables, final Collection<SSTableReader> sstables, final DecoratedKey startWith, final DecoratedKey stopAt, final QueryFilter filter, final AbstractType comparator, - final ColumnFamilyStore cfs - ) + final ColumnFamilyStore cfs) { // fetch data from current memtable, historical memtables, and SSTables in the correct order. final List<CloseableIterator<IColumnIterator>> iterators = new ArrayList<CloseableIterator<IColumnIterator>>();