Author: jbellis Date: Mon Oct 31 16:06:48 2011 New Revision: 1195542 URL: http://svn.apache.org/viewvc?rev=1195542&view=rev Log: replace compactionlock use in schema migration by checking CFS.isInvalidD patch by jbellis; reviewed by slebresne for CASSANDRA-3116
Modified: cassandra/trunk/CHANGES.txt cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java cassandra/trunk/src/java/org/apache/cassandra/db/Table.java cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java cassandra/trunk/src/java/org/apache/cassandra/db/index/SecondaryIndex.java cassandra/trunk/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java cassandra/trunk/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java cassandra/trunk/src/java/org/apache/cassandra/db/migration/DropColumnFamily.java cassandra/trunk/src/java/org/apache/cassandra/db/migration/DropKeyspace.java cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java cassandra/trunk/test/unit/org/apache/cassandra/db/KeyCacheTest.java cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java Modified: cassandra/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1195542&r1=1195541&r2=1195542&view=diff ============================================================================== --- cassandra/trunk/CHANGES.txt (original) +++ cassandra/trunk/CHANGES.txt Mon Oct 31 16:06:48 2011 @@ -2,6 +2,7 @@ * off-heap cache to use sun.misc.Unsafe instead of JNA (CASSANDRA-3271) * EACH_QUORUM is only supported for writes (CASSANDRA-3272) * cleanup usage of StorageService.setMode() (CASANDRA-3388) + * replace compactionlock use in schema migration by checking CFS.isInvalidD 1.0.1 Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1195542&r1=1195541&r2=1195542&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Mon Oct 31 16:06:48 2011 @@ -25,8 +25,6 @@ import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; import java.util.regex.Pattern; import javax.management.MBeanServer; import javax.management.ObjectName; @@ -99,7 +97,7 @@ public class ColumnFamilyStore implement public final CFMetaData metadata; public final IPartitioner partitioner; private final String mbeanName; - private boolean invalid = false; + private volatile boolean valid = true; /* Memtables and SSTables on disk for this column family */ private final DataTracker data; @@ -129,9 +127,6 @@ public class ColumnFamilyStore implement private volatile DefaultInteger keyCacheSaveInSeconds; private volatile DefaultInteger rowCacheKeysToSave; - /** Lock to allow migrations to block all flushing, so we can be sure not to write orphaned data files */ - public final Lock flushLock = new ReentrantLock(); - public static enum CacheType { KEY_CACHE_TYPE("KeyCache"), @@ -192,6 +187,7 @@ public class ColumnFamilyStore implement if (metadata.compactionStrategyClass.equals(compactionStrategy.getClass()) && metadata.compactionStrategyOptions.equals(compactionStrategy.getOptions())) return; + // TODO is there a way to avoid locking here? CompactionManager.instance.getCompactionLock().lock(); try { @@ -258,12 +254,12 @@ public class ColumnFamilyStore implement } } - // called when dropping or renaming a CF. Performs mbean housekeeping and invalidates CFS to other operations. - public void unregisterMBean() + /** call when dropping or renaming a CF. Performs mbean housekeeping and invalidates CFS to other operations */ + public void invalidate() { try { - invalid = true; + valid = false; MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); ObjectName nameObj = new ObjectName(mbeanName); if (mbs.isRegistered(nameObj)) @@ -719,13 +715,6 @@ public class ColumnFamilyStore implement } } - public boolean isDropped() - { - return isIndex() - ? Schema.instance.getCFMetaData(table.name, getParentColumnfamily()) == null - : Schema.instance.getCFMetaData(metadata.cfId) == null; - } - public Future<?> forceFlush() { // during index build, 2ary index memtables can be dirty even if parent is not. if so, @@ -983,14 +972,14 @@ public class ColumnFamilyStore implement CompactionManager.instance.submitBackground(this); } - public boolean isInvalid() + public boolean isValid() { - return invalid; + return valid; } - public void removeAllSSTables() throws IOException + public void unreferenceSSTables() throws IOException { - data.removeAllSSTables(); + data.unreferenceSSTables(); indexManager.removeAllIndexes(); } Modified: cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java?rev=1195542&r1=1195541&r2=1195542&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java Mon Oct 31 16:06:48 2011 @@ -33,7 +33,6 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.cache.AutoSavingCache; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.io.sstable.Descriptor; -import org.apache.cassandra.io.sstable.SSTable; import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.notifications.INotification; import org.apache.cassandra.notifications.INotificationConsumer; @@ -130,6 +129,18 @@ public class DataTracker public void replaceFlushed(Memtable memtable, SSTableReader sstable) { + if (!cfstore.isValid()) + { + View currentView, newView; + do + { + currentView = view.get(); + newView = currentView.replaceFlushed(memtable, sstable).replace(Arrays.asList(sstable), Collections.<SSTableReader>emptyList()); + } + while (!view.compareAndSet(currentView, newView)); + return; + } + View currentView, newView; do { @@ -213,6 +224,16 @@ public class DataTracker */ public void unmarkCompacting(Collection<SSTableReader> unmark) { + if (!cfstore.isValid()) + { + // We don't know if the original compaction suceeded or failed, which makes it difficult to know + // if the sstable reference has already been released. + // A "good enough" approach is to mark the sstables involved compacted, which if compaction succeeded + // is harmlessly redundant, and if it failed ensures that at least the sstable will get deleted on restart. + for (SSTableReader sstable : unmark) + sstable.markCompacted(); + } + View currentView, newView; do { @@ -244,9 +265,23 @@ public class DataTracker notifyAdded(sstable); } - public void removeAllSSTables() + /** + * removes all sstables that are not busy compacting. + */ + public void unreferenceSSTables() { - replace(getSSTables(), Collections.<SSTableReader>emptyList()); + Set<SSTableReader> notCompacting; + + View currentView, newView; + do + { + currentView = view.get(); + notCompacting = Sets.difference(ImmutableSet.copyOf(currentView.sstables), currentView.compacting); + newView = currentView.replace(notCompacting, Collections.<SSTableReader>emptySet()); + } + while (!view.compareAndSet(currentView, newView)); + + postReplace(notCompacting, Collections.<SSTableReader>emptySet()); } /** (Re)initializes the tracker, purging all references. */ @@ -261,6 +296,17 @@ public class DataTracker private void replace(Collection<SSTableReader> oldSSTables, Iterable<SSTableReader> replacements) { + // removing sstables that are not marked compacting is a bug, since that means we could + // race with a compaction check + for (SSTableReader sstable : oldSSTables) + assert view.get().compacting.contains(sstable); + + if (!cfstore.isValid()) + { + removeOldSSTablesSize(replacements); + replacements = Collections.emptyList(); + } + View currentView, newView; do { @@ -269,6 +315,11 @@ public class DataTracker } while (!view.compareAndSet(currentView, newView)); + postReplace(oldSSTables, replacements); + } + + private void postReplace(Collection<SSTableReader> oldSSTables, Iterable<SSTableReader> replacements) + { addNewSSTablesSize(replacements); removeOldSSTablesSize(oldSSTables); @@ -298,7 +349,9 @@ public class DataTracker if (logger.isDebugEnabled()) logger.debug(String.format("removing %s from list of files tracked for %s.%s", sstable.descriptor, cfstore.table.name, cfstore.getColumnFamilyName())); - sstable.markCompacted(); + boolean firstToCompact = sstable.markCompacted(); + assert firstToCompact : sstable + " was already marked compacted"; + sstable.releaseReference(); liveSize.addAndGet(-sstable.bytesOnDisk()); } Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=1195542&r1=1195541&r2=1195542&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java Mon Oct 31 16:06:48 2011 @@ -281,19 +281,8 @@ public class Memtable { public void runMayThrow() throws IOException { - cfs.flushLock.lock(); - try - { - if (!cfs.isDropped()) - { - SSTableReader sstable = writeSortedContents(context); - cfs.replaceFlushed(Memtable.this, sstable); - } - } - finally - { - cfs.flushLock.unlock(); - } + SSTableReader sstable = writeSortedContents(context); + cfs.replaceFlushed(Memtable.this, sstable); latch.countDown(); } }); Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Table.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=1195542&r1=1195541&r2=1195542&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Mon Oct 31 16:06:48 2011 @@ -29,7 +29,6 @@ import java.util.Comparator; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; @@ -41,7 +40,6 @@ import org.apache.cassandra.config.*; import org.apache.cassandra.db.commitlog.CommitLog; import org.apache.cassandra.db.filter.QueryFilter; import org.apache.cassandra.db.filter.QueryPath; -import org.apache.cassandra.db.index.SecondaryIndex; import org.apache.cassandra.io.sstable.SSTableDeletingTask; import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.io.util.FileUtils; @@ -343,7 +341,7 @@ public class Table return; unloadCf(cfs); - cfs.removeAllSSTables(); + cfs.unreferenceSSTables(); } // disassociate a cfs from this table instance. @@ -361,7 +359,7 @@ public class Table { throw new IOException(e); } - cfs.unregisterMBean(); + cfs.invalidate(); } /** adds a cf to internal structures, ends up creating disk files). */ Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java?rev=1195542&r1=1195541&r2=1195542&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java Mon Oct 31 16:06:48 2011 @@ -68,13 +68,12 @@ public class CompactionManager implement /** * compactionLock has two purposes: - * - Compaction acquires its readLock so that multiple compactions can happen simultaneously, - * but the KS/CF migtations acquire its writeLock, so they can be sure no new SSTables will - * be created for a dropped CF posthumously. (Thus, compaction checks CFS.isValid while the - * lock is acquired.) * - "Special" compactions will acquire writelock instead of readlock to make sure that all - * other compaction activity is quiesced and they can grab ALL the sstables to do something. - * TODO this is too big a hammer -- we should only care about quiescing all for the given CFS. + * other compaction activity is quiesced and they can grab ALL the sstables to do something. + * - Some schema migrations cannot run concurrently with compaction. (Currently, this is + * only when changing compaction strategy -- see CFS.maybeReloadCompactionStrategy.) + * + * TODO this is too big a hammer -- we should only care about quiescing all for the given CFS. */ private final ReentrantReadWriteLock compactionLock = new ReentrantReadWriteLock(); @@ -117,9 +116,6 @@ public class CompactionManager implement compactionLock.readLock().lock(); try { - if (cfs.isInvalid()) - return 0; - AbstractCompactionStrategy strategy = cfs.getCompactionStrategy(); List<AbstractCompactionTask> tasks = strategy.getBackgroundTasks(getDefaultGcBefore(cfs)); for (AbstractCompactionTask task : tasks) @@ -160,8 +156,6 @@ public class CompactionManager implement compactionLock.writeLock().lock(); try { - if (cfStore.isInvalid()) - return this; Collection<SSTableReader> tocleanup = cfStore.getDataTracker().markCompacting(cfStore.getSSTables(), 1, Integer.MAX_VALUE); if (tocleanup == null || tocleanup.isEmpty()) return this; @@ -206,9 +200,6 @@ public class CompactionManager implement compactionLock.writeLock().lock(); try { - if (cfStore.isInvalid()) - return this; - Collection<SSTableReader> toscrub = cfStore.getDataTracker().markCompacting(cfStore.getSSTables(), 1, Integer.MAX_VALUE); if (toscrub == null || toscrub.isEmpty()) return this; @@ -258,8 +249,6 @@ public class CompactionManager implement compactionLock.writeLock().lock(); try { - if (cfStore.isInvalid()) - return this; AbstractCompactionStrategy strategy = cfStore.getCompactionStrategy(); for (AbstractCompactionTask task : strategy.getMaximalTasks(gcBefore)) { @@ -339,9 +328,6 @@ public class CompactionManager implement compactionLock.readLock().lock(); try { - if (cfs.isInvalid()) - return this; - // look up the sstables now that we're on the compaction executor, so we don't try to re-compact // something that was already being compacted earlier. Collection<SSTableReader> sstables = new ArrayList<SSTableReader>(); @@ -433,8 +419,7 @@ public class CompactionManager implement compactionLock.readLock().lock(); try { - if (!cfStore.isInvalid()) - doValidationCompaction(cfStore, validator); + doValidationCompaction(cfStore, validator); return this; } finally @@ -801,6 +786,14 @@ public class CompactionManager implement */ private void doValidationCompaction(ColumnFamilyStore cfs, AntiEntropyService.Validator validator) throws IOException { + // this isn't meant to be race-proof, because it's not -- it won't cause bugs for a CFS to be dropped + // mid-validation, or to attempt to validate a droped CFS. this is just a best effort to avoid useless work, + // particularly in the scenario where a validation is submitted before the drop, and there are compactions + // started prior to the drop keeping some sstables alive. Since validationCompaction can run + // concurrently with other compactions, it would otherwise go ahead and scan those again. + if (!cfs.isValid()) + return; + // flush first so everyone is validating data that is as similar as possible try { Modified: cassandra/trunk/src/java/org/apache/cassandra/db/index/SecondaryIndex.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/index/SecondaryIndex.java?rev=1195542&r1=1195541&r2=1195542&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/index/SecondaryIndex.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/index/SecondaryIndex.java Mon Oct 31 16:06:48 2011 @@ -134,7 +134,7 @@ public abstract class SecondaryIndex /** * Unregisters this index's mbean if one exists */ - public abstract void unregisterMbean(); + public abstract void invalidate(); /** Modified: cassandra/trunk/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java?rev=1195542&r1=1195541&r2=1195542&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java Mon Oct 31 16:06:48 2011 @@ -244,7 +244,7 @@ public class SecondaryIndexManager public void unregisterMBeans() { for (Map.Entry<ByteBuffer, SecondaryIndex> entry : indexesByColumn.entrySet()) - entry.getValue().unregisterMbean(); + entry.getValue().invalidate(); } /** Modified: cassandra/trunk/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java?rev=1195542&r1=1195541&r2=1195542&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java Mon Oct 31 16:06:48 2011 @@ -45,7 +45,7 @@ import org.slf4j.LoggerFactory; public class KeysIndex extends PerColumnSecondaryIndex { private static final Logger logger = LoggerFactory.getLogger(KeysIndex.class); - private ColumnFamilyStore indexCfs; + private ColumnFamilyStore indexedCfs; public KeysIndex() { @@ -57,7 +57,7 @@ public class KeysIndex extends PerColumn ColumnDefinition columnDef = columnDefs.iterator().next(); CFMetaData indexedCfMetadata = CFMetaData.newIndexMetadata(baseCfs.metadata, columnDef,indexComparator()); - indexCfs = ColumnFamilyStore.createColumnFamilyStore(baseCfs.table, + indexedCfs = ColumnFamilyStore.createColumnFamilyStore(baseCfs.table, indexedCfMetadata.cfName, new LocalPartitioner(columnDef.getValidator()), indexedCfMetadata); @@ -78,9 +78,9 @@ public class KeysIndex extends PerColumn return; int localDeletionTime = (int) (System.currentTimeMillis() / 1000); - ColumnFamily cfi = ColumnFamily.create(indexCfs.metadata); + ColumnFamily cfi = ColumnFamily.create(indexedCfs.metadata); cfi.addTombstone(rowKey, localDeletionTime, column.timestamp()); - indexCfs.apply(valueKey, cfi); + indexedCfs.apply(valueKey, cfi); if (logger.isDebugEnabled()) logger.debug("removed index entry for cleaned-up value {}:{}", valueKey, cfi); } @@ -88,7 +88,7 @@ public class KeysIndex extends PerColumn @Override public void insertColumn(DecoratedKey<?> valueKey, ByteBuffer rowKey, IColumn column) { - ColumnFamily cfi = ColumnFamily.create(indexCfs.metadata); + ColumnFamily cfi = ColumnFamily.create(indexedCfs.metadata); if (column instanceof ExpiringColumn) { ExpiringColumn ec = (ExpiringColumn)column; @@ -101,7 +101,7 @@ public class KeysIndex extends PerColumn if (logger.isDebugEnabled()) logger.debug("applying index row {}:{}", valueKey, cfi); - indexCfs.apply(valueKey, cfi); + indexedCfs.apply(valueKey, cfi); } @Override @@ -113,8 +113,8 @@ public class KeysIndex extends PerColumn @Override public void removeIndex(ByteBuffer columnName) throws IOException { - indexCfs.removeAllSSTables(); - indexCfs.unregisterMBean(); + indexedCfs.unreferenceSSTables(); + indexedCfs.invalidate(); } @Override @@ -122,7 +122,7 @@ public class KeysIndex extends PerColumn { try { - indexCfs.forceBlockingFlush(); + indexedCfs.forceBlockingFlush(); } catch (ExecutionException e) { @@ -135,15 +135,15 @@ public class KeysIndex extends PerColumn } @Override - public void unregisterMbean() + public void invalidate() { - indexCfs.unregisterMBean(); + indexedCfs.invalidate(); } @Override public ColumnFamilyStore getUnderlyingCfs() { - return indexCfs; + return indexedCfs; } @Override @@ -155,13 +155,13 @@ public class KeysIndex extends PerColumn @Override public String getIndexName() { - return indexCfs.columnFamily; + return indexedCfs.columnFamily; } @Override public void renameIndex(String newCfName) throws IOException { - indexCfs.renameSSTables(indexCfs.columnFamily.replace(baseCfs.columnFamily, newCfName)); + indexedCfs.renameSSTables(indexedCfs.columnFamily.replace(baseCfs.columnFamily, newCfName)); } @Override Modified: cassandra/trunk/src/java/org/apache/cassandra/db/migration/DropColumnFamily.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/migration/DropColumnFamily.java?rev=1195542&r1=1195541&r2=1195542&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/migration/DropColumnFamily.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/migration/DropColumnFamily.java Mon Oct 31 16:06:48 2011 @@ -79,18 +79,7 @@ public class DropColumnFamily extends Mi if (!StorageService.instance.isClientMode()) { cfs.snapshot(Table.getTimestampedSnapshotName(cfs.columnFamily)); - - CompactionManager.instance.getCompactionLock().lock(); - cfs.flushLock.lock(); - try - { - Table.open(ksm.name, schema).dropCf(cfm.cfId); - } - finally - { - cfs.flushLock.unlock(); - CompactionManager.instance.getCompactionLock().unlock(); - } + Table.open(ksm.name, schema).dropCf(cfm.cfId); } } Modified: cassandra/trunk/src/java/org/apache/cassandra/db/migration/DropKeyspace.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/migration/DropKeyspace.java?rev=1195542&r1=1195541&r2=1195542&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/migration/DropKeyspace.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/migration/DropKeyspace.java Mon Oct 31 16:06:48 2011 @@ -48,41 +48,25 @@ public class DropKeyspace extends Migrat public void applyModels() throws IOException { String snapshotName = Table.getTimestampedSnapshotName(name); - CompactionManager.instance.getCompactionLock().lock(); - try - { - KSMetaData ksm = schema.getTableDefinition(name); + KSMetaData ksm = schema.getTableDefinition(name); - // remove all cfs from the table instance. - for (CFMetaData cfm : ksm.cfMetaData().values()) + // remove all cfs from the table instance. + for (CFMetaData cfm : ksm.cfMetaData().values()) + { + ColumnFamilyStore cfs = Table.open(ksm.name, schema).getColumnFamilyStore(cfm.cfName); + schema.purge(cfm); + if (!StorageService.instance.isClientMode()) { - ColumnFamilyStore cfs = Table.open(ksm.name, schema).getColumnFamilyStore(cfm.cfName); - schema.purge(cfm); - if (!StorageService.instance.isClientMode()) - { - cfs.snapshot(snapshotName); - cfs.flushLock.lock(); - try - { - Table.open(ksm.name, schema).dropCf(cfm.cfId); - } - finally - { - cfs.flushLock.unlock(); - } - } + cfs.snapshot(snapshotName); + Table.open(ksm.name, schema).dropCf(cfm.cfId); } - - // remove the table from the static instances. - Table table = Table.clear(ksm.name, schema); - assert table != null; - // reset defs. - schema.clearTableDefinition(ksm, newVersion); - } - finally - { - CompactionManager.instance.getCompactionLock().unlock(); } + + // remove the table from the static instances. + Table table = Table.clear(ksm.name, schema); + assert table != null; + // reset defs. + schema.clearTableDefinition(ksm, newVersion); } public void subdeflate(org.apache.cassandra.db.migration.avro.Migration mi) Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java?rev=1195542&r1=1195541&r2=1195542&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java Mon Oct 31 16:06:48 2011 @@ -719,23 +719,28 @@ public class SSTableReader extends SSTab * Mark the sstable as compacted. * When calling this function, the caller must ensure that the SSTableReader is not referenced anywhere * except for threads holding a reference. + * + * @return true if the this is the first time the file was marked compacted. With rare exceptions + * (see DataTracker.unmarkCompacted) calling this multiple times would be buggy. */ - public void markCompacted() + public boolean markCompacted() { if (logger.isDebugEnabled()) logger.debug("Marking " + getFilename() + " compacted"); + + if (isCompacted.getAndSet(true)) + return false; + try { if (!new File(descriptor.filenameFor(Component.COMPACTED_MARKER)).createNewFile()) - throw new IOException("Unable to create compaction marker"); + throw new IOException("Compaction marker already exists"); } catch (IOException e) { throw new IOError(e); } - - boolean alreadyCompacted = isCompacted.getAndSet(true); - assert !alreadyCompacted : this + " was already marked compacted"; + return true; } /** Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/KeyCacheTest.java URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/KeyCacheTest.java?rev=1195542&r1=1195541&r2=1195542&view=diff ============================================================================== --- cassandra/trunk/test/unit/org/apache/cassandra/db/KeyCacheTest.java (original) +++ cassandra/trunk/test/unit/org/apache/cassandra/db/KeyCacheTest.java Mon Oct 31 16:06:48 2011 @@ -91,7 +91,7 @@ public class KeyCacheTest extends Cleanu assert store.getKeyCacheSize() == 0; // load the cache from disk - store.unregisterMBean(); // unregistering old MBean to test how key cache will be loaded + store.invalidate(); // unregistering old MBean to test how key cache will be loaded ColumnFamilyStore newStore = ColumnFamilyStore.createColumnFamilyStore(Table.open(TABLE1), COLUMN_FAMILY3); assert newStore.getKeyCacheSize() == 100; Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java?rev=1195542&r1=1195541&r2=1195542&view=diff ============================================================================== --- cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java (original) +++ cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java Mon Oct 31 16:06:48 2011 @@ -39,7 +39,6 @@ import org.apache.cassandra.db.filter.Qu import org.apache.cassandra.db.filter.QueryPath; import org.apache.cassandra.io.sstable.*; import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.cassandra.utils.FBUtilities; public class CompactionsTest extends CleanupHelper { @@ -231,7 +230,7 @@ public class CompactionsTest extends Cle ColumnFamilyStore store = table.getColumnFamilyStore(cfname); // disable compaction while flushing - store.removeAllSSTables(); + store.unreferenceSSTables(); store.disableAutoCompaction(); // Add test row Modified: cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java?rev=1195542&r1=1195541&r2=1195542&view=diff ============================================================================== --- cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java (original) +++ cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java Mon Oct 31 16:06:48 2011 @@ -28,7 +28,6 @@ import java.net.InetAddress; import java.util.*; import org.apache.cassandra.CleanupHelper; -import org.apache.cassandra.Util; import org.apache.cassandra.db.*; import org.apache.cassandra.db.columniterator.IdentityQueryFilter; import org.apache.cassandra.db.context.CounterContext; @@ -76,7 +75,7 @@ public class StreamingTransferTest exten SSTableReader sstable = cfs.getSSTables().iterator().next(); // We acquire a reference now, because removeAllSSTables will mark the sstable compacted, and we have work to do with it sstable.acquireReference(); - cfs.removeAllSSTables(); + cfs.unreferenceSSTables(); // transfer the first and last key int[] offs = new int[]{1, 3};