Author: jbellis
Date: Fri Aug 26 21:31:35 2011
New Revision: 1162266

URL: http://svn.apache.org/viewvc?rev=1162266&view=rev
Log:
wip

Modified:
    cassandra/trunk/src/java/org/apache/cassandra/db/CollationController.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java
    
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java
    cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
    
cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/db/CollationController.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CollationController.java?rev=1162266&r1=1162265&r2=1162266&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/CollationController.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/CollationController.java 
Fri Aug 26 21:31:35 2011
@@ -23,16 +23,15 @@ import java.nio.ByteBuffer;
 import java.util.*;
 
 import com.google.common.collect.Iterables;
-import org.apache.cassandra.io.sstable.SSTable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.columniterator.IColumnIterator;
 import org.apache.cassandra.db.columniterator.SimpleAbstractColumnIterator;
 import org.apache.cassandra.db.filter.NamesQueryFilter;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.db.marshal.CounterColumnType;
+import org.apache.cassandra.io.sstable.SSTable;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.utils.CloseableIterator;
@@ -42,26 +41,24 @@ public class CollationController
 {
     private static Logger logger = 
LoggerFactory.getLogger(CollationController.class);
 
-    private final DataTracker.View dataview;
+    private final ColumnFamilyStore cfs;
     private final ISortedColumns.Factory factory;
     private final QueryFilter filter;
     private final int gcBefore;
-    private final CFMetaData metadata;
 
     private int sstablesIterated = 0;
 
-    public CollationController(DataTracker.View dataview, 
ISortedColumns.Factory factory, QueryFilter filter, CFMetaData metadata, int 
gcBefore)
+    public CollationController(ColumnFamilyStore cfs, ISortedColumns.Factory 
factory, QueryFilter filter, int gcBefore)
     {
-        this.dataview = dataview;
+        this.cfs = cfs;
         this.factory = factory;
         this.filter = filter;
         this.gcBefore = gcBefore;
-        this.metadata = metadata;
     }
 
     public ColumnFamily getTopLevelColumns()
     {
-        return filter.filter instanceof NamesQueryFilter && 
metadata.getDefaultValidator() != CounterColumnType.instance
+        return filter.filter instanceof NamesQueryFilter && 
cfs.metadata.getDefaultValidator() != CounterColumnType.instance
                ? collectTimeOrderedData()
                : collectAllData();
     }
@@ -74,14 +71,18 @@ public class CollationController
     private ColumnFamily collectTimeOrderedData()
     {
         logger.debug("collectTimeOrderedData");
-        List<IColumnIterator> iterators = new ArrayList<IColumnIterator>();
-        final ColumnFamily container = ColumnFamily.create(metadata, factory, 
filter.filter.isReversed());
-        List<SSTableReader> sstables = null;
-        try
-        {
+
+        List<IColumnIterator> iterators;
+        ColumnFamily container;
+        while (true)
+        {
+            DataTracker.View dataview = cfs.getDataTracker().getView();
+            iterators = new ArrayList<IColumnIterator>();
+            container = ColumnFamily.create(cfs.metadata, factory, 
filter.filter.isReversed());
+            List<SSTableReader> sstables;
             for (Memtable memtable : 
Iterables.concat(dataview.memtablesPendingFlush, 
Collections.singleton(dataview.memtable)))
             {
-                IColumnIterator iter = 
filter.getMemtableColumnIterator(memtable, metadata.comparator);
+                IColumnIterator iter = 
filter.getMemtableColumnIterator(memtable, cfs.metadata.comparator);
                 if (iter != null)
                 {
                     iterators.add(iter);
@@ -93,49 +94,58 @@ public class CollationController
 
             // avoid changing the filter columns of the original filter
             // (reduceNameFilter removes columns that are known to be 
irrelevant)
-            TreeSet<ByteBuffer> filterColumns = new 
TreeSet<ByteBuffer>(metadata.comparator);
+            TreeSet<ByteBuffer> filterColumns = new 
TreeSet<ByteBuffer>(cfs.metadata.comparator);
             filterColumns.addAll(((NamesQueryFilter) filter.filter).columns);
             QueryFilter reducedFilter = new QueryFilter(filter.key, 
filter.path, new NamesQueryFilter(filterColumns));
 
             /* add the SSTables on disk */
             sstables = dataview.intervalTree.search(new Interval(filter.key, 
filter.key));
             Collections.sort(sstables, SSTable.maxTimestampComparator);
-            SSTableReader.acquireReferences(sstables);
-            // read sorted sstables
-            for (SSTableReader sstable : sstables)
-            {
-                long currentMaxTs = sstable.getMaxTimestamp();
-                reduceNameFilter(reducedFilter, container, currentMaxTs);
-                if (((NamesQueryFilter) 
reducedFilter.filter).columns.isEmpty())
-                    break;
-
-                IColumnIterator iter = 
reducedFilter.getSSTableColumnIterator(sstable);
-                iterators.add(iter);
-                if (iter.getColumnFamily() != null)
+            if (!SSTableReader.acquireReferences(sstables))
+                continue; // retry w/ new view
+
+            try
+            {
+                // read sorted sstables
+                for (SSTableReader sstable : sstables)
                 {
-                    container.delete(iter.getColumnFamily());
-                    sstablesIterated++;
-                    while (iter.hasNext())
-                        container.addColumn(iter.next());
+                    long currentMaxTs = sstable.getMaxTimestamp();
+                    reduceNameFilter(reducedFilter, container, currentMaxTs);
+                    if (((NamesQueryFilter) 
reducedFilter.filter).columns.isEmpty())
+                        break;
+
+                    IColumnIterator iter = 
reducedFilter.getSSTableColumnIterator(sstable);
+                    iterators.add(iter);
+                    if (iter.getColumnFamily() != null)
+                    {
+                        container.delete(iter.getColumnFamily());
+                        sstablesIterated++;
+                        while (iter.hasNext())
+                            container.addColumn(iter.next());
+                    }
                 }
             }
-        }
-        finally
-        {
-            SSTableReader.releaseReferences(sstables);
-            for (IColumnIterator iter : iterators)
-                FileUtils.closeQuietly(iter);
+            finally
+            {
+                SSTableReader.releaseReferences(sstables);
+                for (IColumnIterator iter : iterators)
+                    FileUtils.closeQuietly(iter);
+            }
+
+            break; // sstable reference acquisition was successful
         }
 
+
         // we need to distinguish between "there is no data at all for this 
row" (BF will let us rebuild that efficiently)
         // and "there used to be data, but it's gone now" (we should cache the 
empty CF so we don't need to rebuild that slower)
         if (iterators.isEmpty())
             return null;
 
         // do a final collate.  toCollate is boilerplate required to provide a 
CloseableIterator
+        final ColumnFamily c2 = container;
         CloseableIterator<IColumn> toCollate = new 
SimpleAbstractColumnIterator()
         {
-            final Iterator<IColumn> iter = container.iterator();
+            final Iterator<IColumn> iter = c2.iterator();
 
             protected IColumn computeNext()
             {
@@ -144,7 +154,7 @@ public class CollationController
 
             public ColumnFamily getColumnFamily()
             {
-                return container;
+                return c2;
             }
 
             public DecoratedKey getKey()
@@ -153,7 +163,7 @@ public class CollationController
             }
         };
         ColumnFamily returnCF = container.cloneMeShallow();
-        filter.collateColumns(returnCF, Collections.singletonList(toCollate), 
metadata.comparator, gcBefore);
+        filter.collateColumns(returnCF, Collections.singletonList(toCollate), 
cfs.metadata.comparator, gcBefore);
 
         // Caller is responsible for final removeDeletedCF.  This is important 
for cacheRow to work correctly:
         return returnCF;
@@ -187,14 +197,15 @@ public class CollationController
     {
         logger.debug("collectAllData");
         List<IColumnIterator> iterators = new ArrayList<IColumnIterator>();
-        ColumnFamily returnCF = ColumnFamily.create(metadata, factory, 
filter.filter.isReversed());
-        List<SSTableReader> sstables = null;
+        ColumnFamily returnCF = ColumnFamily.create(cfs.metadata, factory, 
filter.filter.isReversed());
+        List<SSTableReader> sstables;
 
-        try
+        while (true)
         {
+            DataTracker.View dataview = cfs.getDataTracker().getView();
             for (Memtable memtable : 
Iterables.concat(dataview.memtablesPendingFlush, 
Collections.singleton(dataview.memtable)))
             {
-                IColumnIterator iter = 
filter.getMemtableColumnIterator(memtable, metadata.comparator);
+                IColumnIterator iter = 
filter.getMemtableColumnIterator(memtable, cfs.metadata.comparator);
                 if (iter != null)
                 {
                     returnCF.delete(iter.getColumnFamily());
@@ -204,23 +215,30 @@ public class CollationController
 
             /* add the SSTables on disk */
             sstables = dataview.intervalTree.search(new Interval(filter.key, 
filter.key));
-            SSTableReader.acquireReferences(sstables);
-            for (SSTableReader sstable : sstables)
+            if (!SSTableReader.acquireReferences(sstables))
+                continue; // retry w/ new view
+
+            try
             {
-                IColumnIterator iter = 
filter.getSSTableColumnIterator(sstable);
-                iterators.add(iter);
-                if (iter.getColumnFamily() != null)
+                for (SSTableReader sstable : sstables)
                 {
-                    returnCF.delete(iter.getColumnFamily());
-                    sstablesIterated++;
+                    IColumnIterator iter = 
filter.getSSTableColumnIterator(sstable);
+                    iterators.add(iter);
+                    if (iter.getColumnFamily() != null)
+                    {
+                        returnCF.delete(iter.getColumnFamily());
+                        sstablesIterated++;
+                    }
                 }
             }
-        }
-        finally
-        {
-            SSTableReader.releaseReferences(sstables);
-            for (IColumnIterator iter : iterators)
-                FileUtils.closeQuietly(iter);
+            finally
+            {
+                SSTableReader.releaseReferences(sstables);
+                for (IColumnIterator iter : iterators)
+                    FileUtils.closeQuietly(iter);
+            }
+
+            break; // sstable reference acquisition was successful
         }
 
         // we need to distinguish between "there is no data at all for this 
row" (BF will let us rebuild that efficiently)
@@ -228,7 +246,7 @@ public class CollationController
         if (iterators.isEmpty())
             return null;
 
-        filter.collateColumns(returnCF, iterators, metadata.comparator, 
gcBefore);
+        filter.collateColumns(returnCF, iterators, cfs.metadata.comparator, 
gcBefore);
 
         // Caller is responsible for final removeDeletedCF.  This is important 
for cacheRow to work correctly:
         return returnCF;

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1162266&r1=1162265&r2=1162266&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Fri 
Aug 26 21:31:35 2011
@@ -888,21 +888,14 @@ public class ColumnFamilyStore implement
      */
     public boolean isKeyInRemainingSSTables(DecoratedKey key, Set<? extends 
SSTable> sstablesToIgnore)
     {
-        DataTracker.View currentView = markCurrentViewReferenced();
-        try
+        // we don't need to acquire references here, since the bloom filter is 
safe to use even post-compaction
+        List<SSTableReader> filteredSSTables = 
data.getView().intervalTree.search(new Interval(key, key));
+        for (SSTableReader sstable : filteredSSTables)
         {
-            List<SSTableReader> filteredSSTables = 
currentView.intervalTree.search(new Interval(key, key));
-            for (SSTableReader sstable : filteredSSTables)
-            {
-                if (!sstablesToIgnore.contains(sstable) && 
sstable.getBloomFilter().isPresent(key.key))
-                    return true;
-            }
-            return false;
-        }
-        finally
-        {
-            SSTableReader.releaseReferences(currentView.sstables);
+            if (!sstablesToIgnore.contains(sstable) && 
sstable.getBloomFilter().isPresent(key.key))
+                return true;
         }
+        return false;
     }
 
     /*
@@ -1261,16 +1254,8 @@ public class ColumnFamilyStore implement
         while (true)
         {
             DataTracker.View currentView = data.getView();
-            SSTableReader.acquireReferences(currentView.sstables);
-            if (currentView.sstables == data.getView().sstables) // reference 
equality is fine
-            {
+            if (SSTableReader.acquireReferences(currentView.sstables))
                 return currentView;
-            }
-            else
-            {
-                // the set of sstables has changed, let's release the acquired 
references and try again
-                SSTableReader.releaseReferences(currentView.sstables);
-            }
         }
     }
 
@@ -1287,20 +1272,12 @@ public class ColumnFamilyStore implement
 
     private ColumnFamily getTopLevelColumns(QueryFilter filter, int gcBefore, 
ISortedColumns.Factory factory)
     {
-        DataTracker.View currentView = markCurrentViewReferenced();
-        try
-        {
-        CollationController controller = new CollationController(currentView, 
factory, filter, metadata, gcBefore);
+        CollationController controller = new CollationController(this, 
factory, filter, gcBefore);
         ColumnFamily columns = controller.getTopLevelColumns();
         recentSSTablesPerRead.add(controller.getSstablesIterated());
         sstablesPerRead.add(controller.getSstablesIterated());
         return columns;
     }
-        finally
-        {
-            SSTableReader.releaseReferences(currentView.sstables);
-        }
-    }
 
     /**
       * Fetch a range of rows and columns from memtables/sstables.

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java?rev=1162266&r1=1162265&r2=1162266&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java Fri Aug 
26 21:31:35 2011
@@ -298,16 +298,8 @@ public class DataTracker
             if (logger.isDebugEnabled())
                 logger.debug(String.format("removing %s from list of files 
tracked for %s.%s",
                             sstable.descriptor, cfstore.table.name, 
cfstore.getColumnFamilyName()));
-            // A reference must be acquire before any call to markCompacted, 
see SSTableReader for details
-            sstable.acquireReference();
-            try
-            {
-                sstable.markCompacted();
-            }
-            finally
-            {
-                sstable.releaseReference();
-            }
+            sstable.markCompacted();
+            sstable.releaseReference();
             liveSize.addAndGet(-sstable.bytesOnDisk());
         }
     }
@@ -511,9 +503,9 @@ public class DataTracker
         // Obviously, dropping sstables whose max column timestamp happens to 
be equal to another's
         // is not acceptable for us.  So, we use a List instead.
         public final List<SSTableReader> sstables;
-        public final IntervalTree intervalTree;
+        public final IntervalTree<SSTableReader> intervalTree;
 
-        View(Memtable memtable, Set<Memtable> pendingFlush, 
List<SSTableReader> sstables, Set<SSTableReader> compacting, IntervalTree 
intervalTree)
+        View(Memtable memtable, Set<Memtable> pendingFlush, 
List<SSTableReader> sstables, Set<SSTableReader> compacting, 
IntervalTree<SSTableReader> intervalTree)
         {
             this.memtable = memtable;
             this.memtablesPendingFlush = pendingFlush;

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java?rev=1162266&r1=1162265&r2=1162266&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
 (original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
 Fri Aug 26 21:31:35 2011
@@ -798,6 +798,8 @@ public class CompactionManager implement
             throw new AssertionError(e);
         }
 
+        // we don't mark validating sstables as compacting in DataTracker, so 
we have to mark them referenced
+        // instead so they won't be cleaned up if they do get compacted during 
the validation
         Collection<SSTableReader> sstables = 
cfs.markCurrentSSTablesReferenced();
         CompactionIterable ci = new ValidationCompactionIterable(cfs, 
sstables, validator.request.range);
         CloseableIterator<AbstractCompactedRow> iter = ci.iterator();

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java?rev=1162266&r1=1162265&r2=1162266&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java 
Fri Aug 26 21:31:35 2011
@@ -81,7 +81,9 @@ public class SSTableReader extends SSTab
 
     private BloomFilterTracker bloomFilterTracker = new BloomFilterTracker();
 
-    private final AtomicInteger references = new AtomicInteger(0);
+    private final AtomicInteger references = new AtomicInteger(1);
+    // technically isCompacted is not necessary since it should never be 
unreferenced unless it is also compacted,
+    // but it seems like a good extra layer of protection against reference 
counting bugs to not delete data based on that alone
     private final AtomicBoolean isCompacted = new AtomicBoolean(false);
     private final SSTableDeletingTask deletingTask;
 
@@ -618,9 +620,16 @@ public class SSTableReader extends SSTab
         return dfile.length;
     }
 
-    public void acquireReference()
+    public boolean acquireReference()
     {
-        references.incrementAndGet();
+        while (true)
+        {
+            int n = references.get();
+            if (n <= 0)
+                return false;
+            if (references.compareAndSet(n, n + 1))
+                return true;
+        }
     }
 
     public void releaseReference()
@@ -831,13 +840,32 @@ public class SSTableReader extends SSTab
                 : RandomAccessReader.open(new File(getFilename()), bufferSize, 
skipIOCache);
     }
 
-    public static void acquireReferences(Iterable<SSTableReader> sstables)
+    /**
+     * @param sstables
+     * @return true if all desired references were acquired.  Otherwise, it 
will unreference any partial acquisition, and return false.
+     */
+    public static boolean acquireReferences(Iterable<SSTableReader> sstables)
     {
+        SSTableReader failed = null;
+        for (SSTableReader sstable : sstables)
+        {
+            if (!sstable.acquireReference())
+            {
+                failed = sstable;
+                break;
+            }
+        }
+
+        if (failed == null)
+            return true;
+
         for (SSTableReader sstable : sstables)
         {
-            if (sstable != null)
-                sstable.acquireReference();
+            if (sstable == failed)
+                break;
+            sstable.releaseReference();
         }
+        return false;
     }
 
     public static void releaseReferences(Iterable<SSTableReader> sstables)
@@ -846,10 +874,9 @@ public class SSTableReader extends SSTab
         {
             try
             {
-                if (sstable != null)
-                    sstable.releaseReference();
+                sstable.releaseReference();
             }
-            catch (Throwable ex)
+            catch (Exception ex)
             {
                 logger.error("Failed releasing reference on " + sstable, ex);
             }

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java?rev=1162266&r1=1162265&r2=1162266&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java 
(original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java 
Fri Aug 26 21:31:35 2011
@@ -125,7 +125,6 @@ public class StreamInSession
     {
         if (files.isEmpty())
         {
-            // wait for bloom filters and row indexes to finish building
             HashMap <ColumnFamilyStore, List<SSTableReader>> cfstores = new 
HashMap<ColumnFamilyStore, List<SSTableReader>>();
             List<SSTableReader> referenced = new LinkedList<SSTableReader>();
             try

Modified: 
cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java?rev=1162266&r1=1162265&r2=1162266&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java 
(original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java 
Fri Aug 26 21:31:35 2011
@@ -234,7 +234,6 @@ public class SSTableUtils
             long start = System.currentTimeMillis();
             while (appender.append(writer)) { /* pass */ }
             SSTableReader reader = writer.closeAndOpenReader();
-            reader.acquireReference();
             // mark all components for removal
             if (cleanup)
                 for (Component component : reader.components)

Modified: 
cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java?rev=1162266&r1=1162265&r2=1162266&view=diff
==============================================================================
--- 
cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
 (original)
+++ 
cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
 Fri Aug 26 21:31:35 2011
@@ -296,7 +296,8 @@ public class StreamingTransferTest exten
         ranges.add(new Range(secondtolast.getKey().token, 
p.getMinimumToken()));
 
         // Acquiring references, transferSSTables needs it
-        SSTableReader.acquireReferences(ssTableReaders);
+        if (!SSTableReader.acquireReferences(ssTableReaders))
+            throw new AssertionError();
 
         StreamOutSession session = StreamOutSession.create(keyspace, LOCAL, 
null);
         StreamOut.transferSSTables(session, ssTableReaders, ranges, 
OperationType.BOOTSTRAP);


Reply via email to