Author: jbellis Date: Fri Aug 26 19:39:04 2011 New Revision: 1162220 URL: http://svn.apache.org/viewvc?rev=1162220&view=rev Log: Revert "add LeveledCompactionStrategy"
Removed: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java cassandra/trunk/src/java/org/apache/cassandra/notifications/INotification.java cassandra/trunk/src/java/org/apache/cassandra/notifications/INotificationConsumer.java cassandra/trunk/src/java/org/apache/cassandra/notifications/SSTableAddedNotification.java cassandra/trunk/src/java/org/apache/cassandra/notifications/SSTableListChangedNotification.java cassandra/trunk/src/java/org/apache/cassandra/utils/IntervalTree/Interval.java cassandra/trunk/src/java/org/apache/cassandra/utils/IntervalTree/IntervalNode.java cassandra/trunk/src/java/org/apache/cassandra/utils/IntervalTree/IntervalTree.java cassandra/trunk/test/unit/org/apache/cassandra/utils/IntervalTest.java cassandra/trunk/test/unit/org/apache/cassandra/utils/IntervalTreeTest.java Modified: cassandra/trunk/CHANGES.txt cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java cassandra/trunk/src/java/org/apache/cassandra/db/CollationController.java cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java cassandra/trunk/src/java/org/apache/cassandra/db/RowIteratorFactory.java cassandra/trunk/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java Modified: cassandra/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1162220&r1=1162219&r2=1162220&view=diff ============================================================================== --- cassandra/trunk/CHANGES.txt (original) +++ cassandra/trunk/CHANGES.txt Fri Aug 26 19:39:04 2011 @@ -44,8 +44,6 @@ Thrift<->Avro conversion methods (CASSANDRA-3032) * Add timeouts to client request schedulers (CASSANDRA-3079) * Cli to use hashes rather than array of hashes for strategy options (CASSANDRA-3081) - * LeveledCompactionStrategy (CASSANDRA-1608) - 0.8.5 * fix NPE when encryption_options is unspecified (CASSANDRA-3007) Modified: cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java?rev=1162220&r1=1162219&r2=1162220&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java Fri Aug 26 19:39:04 2011 @@ -67,7 +67,7 @@ public final class CFMetaData public final static double DEFAULT_MEMTABLE_OPERATIONS_IN_MILLIONS = sizeMemtableOperations(DEFAULT_MEMTABLE_THROUGHPUT_IN_MB); public final static double DEFAULT_MERGE_SHARDS_CHANCE = 0.1; public final static String DEFAULT_ROW_CACHE_PROVIDER = "org.apache.cassandra.cache.ConcurrentLinkedHashCacheProvider"; - public final static String DEFAULT_COMPACTION_STRATEGY_CLASS = "SizeTieredCompactionStrategy"; + public final static String DEFAULT_COMPACTION_STRATEGY_CLASS = "org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy"; public final static ByteBuffer DEFAULT_KEY_NAME = ByteBufferUtil.bytes("KEY"); public final static boolean DEFAULT_COMPRESSION = false; @@ -214,11 +214,11 @@ public final class CFMetaData try { - compactionStrategyClass = createCompactionSrategy(DEFAULT_COMPACTION_STRATEGY_CLASS); + compactionStrategyClass = (Class<? extends AbstractCompactionStrategy>)Class.forName(DEFAULT_COMPACTION_STRATEGY_CLASS); } - catch (ConfigurationException e) + catch (Exception e) { - throw new AssertionError(e); + throw new RuntimeException("Could not create Compaction Strategy of type " + DEFAULT_COMPACTION_STRATEGY_CLASS, e); } compactionStrategyOptions = new HashMap<String, String>(); } @@ -409,11 +409,11 @@ public final class CFMetaData { try { - newCFMD.compactionStrategyClass = createCompactionSrategy(cf.compaction_strategy.toString()); + newCFMD.compactionStrategyClass((Class<? extends AbstractCompactionStrategy>)Class.forName(cf.compaction_strategy.toString())); } - catch (ConfigurationException e) + catch (Exception e) { - throw new RuntimeException(e); + throw new RuntimeException("Could not create Compaction Strategy of type " + cf.compaction_strategy.toString(), e); } } if (cf.compaction_strategy_options != null) @@ -695,7 +695,16 @@ public final class CFMetaData if (cf_def.isSetKey_alias()) { newCFMD.keyAlias(cf_def.key_alias); } if (cf_def.isSetKey_validation_class()) { newCFMD.keyValidator(TypeParser.parse(cf_def.key_validation_class)); } if (cf_def.isSetCompaction_strategy()) - newCFMD.compactionStrategyClass = createCompactionSrategy(cf_def.compaction_strategy); + { + try + { + newCFMD.compactionStrategyClass((Class<? extends AbstractCompactionStrategy>)Class.forName(cf_def.compaction_strategy)); + } + catch (Exception e) + { + throw new ConfigurationException("Unable to set Compaction Strategy Class of " + cf_def.compaction_strategy, e); + } + } if (cf_def.isSetCompaction_strategy_options()) newCFMD.compactionStrategyOptions(new HashMap<String, String>(cf_def.compaction_strategy_options)); @@ -803,7 +812,16 @@ public final class CFMetaData } if (cf_def.compaction_strategy != null) - compactionStrategyClass = createCompactionSrategy(cf_def.compaction_strategy.toString()); + { + try + { + compactionStrategyClass = (Class<? extends AbstractCompactionStrategy>)Class.forName(cf_def.compaction_strategy.toString()); + } + catch (Exception e) + { + throw new RuntimeException("Could not create Compaction Strategy of type " + cf_def.compaction_strategy.toString(), e); + } + } if (null != cf_def.compaction_strategy_options) { @@ -814,20 +832,7 @@ public final class CFMetaData logger.debug("application result is {}", this); } - - private static Class<? extends AbstractCompactionStrategy> createCompactionSrategy(String className) throws ConfigurationException - { - className = className.contains(".") ? className : "org.apache.cassandra.db.compaction." + className; - try - { - return (Class<? extends AbstractCompactionStrategy>) Class.forName(className); - } - catch (Exception e) - { - throw new ConfigurationException("Could not create Compaction Strategy of type " + className, e); - } - } - + public AbstractCompactionStrategy createCompactionStrategyInstance(ColumnFamilyStore cfs) { try Modified: cassandra/trunk/src/java/org/apache/cassandra/db/CollationController.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CollationController.java?rev=1162220&r1=1162219&r2=1162220&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/CollationController.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/CollationController.java Fri Aug 26 19:39:04 2011 @@ -19,14 +19,6 @@ */ package org.apache.cassandra.db; -import java.nio.ByteBuffer; -import java.util.*; - -import com.google.common.collect.Iterables; -import org.apache.cassandra.io.sstable.SSTable; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.db.columniterator.IColumnIterator; import org.apache.cassandra.db.columniterator.SimpleAbstractColumnIterator; @@ -36,7 +28,14 @@ import org.apache.cassandra.db.marshal.C import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.utils.CloseableIterator; -import org.apache.cassandra.utils.IntervalTree.Interval; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; +import java.util.*; + +import com.google.common.collect.Iterables; public class CollationController { @@ -76,7 +75,7 @@ public class CollationController logger.debug("collectTimeOrderedData"); List<IColumnIterator> iterators = new ArrayList<IColumnIterator>(); final ColumnFamily container = ColumnFamily.create(metadata, factory, filter.filter.isReversed()); - List<SSTableReader> sstables = null; + try { for (Memtable memtable : Iterables.concat(dataview.memtablesPendingFlush, Collections.singleton(dataview.memtable))) @@ -97,12 +96,8 @@ public class CollationController filterColumns.addAll(((NamesQueryFilter) filter.filter).columns); QueryFilter reducedFilter = new QueryFilter(filter.key, filter.path, new NamesQueryFilter(filterColumns)); - /* add the SSTables on disk */ - sstables = dataview.intervalTree.search(new Interval(filter.key, filter.key)); - Collections.sort(sstables, SSTable.maxTimestampComparator); - SSTableReader.acquireReferences(sstables); // read sorted sstables - for (SSTableReader sstable : sstables) + for (SSTableReader sstable : dataview.sstables) { long currentMaxTs = sstable.getMaxTimestamp(); reduceNameFilter(reducedFilter, container, currentMaxTs); @@ -122,7 +117,6 @@ public class CollationController } finally { - SSTableReader.releaseReferences(sstables); for (IColumnIterator iter : iterators) FileUtils.closeQuietly(iter); } @@ -188,7 +182,6 @@ public class CollationController logger.debug("collectAllData"); List<IColumnIterator> iterators = new ArrayList<IColumnIterator>(); ColumnFamily returnCF = ColumnFamily.create(metadata, factory, filter.filter.isReversed()); - List<SSTableReader> sstables = null; try { @@ -203,9 +196,7 @@ public class CollationController } /* add the SSTables on disk */ - sstables = dataview.intervalTree.search(new Interval(filter.key, filter.key)); - SSTableReader.acquireReferences(sstables); - for (SSTableReader sstable : sstables) + for (SSTableReader sstable : dataview.sstables) { IColumnIterator iter = filter.getSSTableColumnIterator(sstable); iterators.add(iter); @@ -218,7 +209,6 @@ public class CollationController } finally { - SSTableReader.releaseReferences(sstables); for (IColumnIterator iter : iterators) FileUtils.closeQuietly(iter); } Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1162220&r1=1162219&r2=1162220&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Fri Aug 26 19:39:04 2011 @@ -35,7 +35,11 @@ import com.google.common.collect.Iterabl import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.cache.*; +import org.apache.cassandra.cache.AutoSavingCache; +import org.apache.cassandra.cache.AutoSavingKeyCache; +import org.apache.cassandra.cache.AutoSavingRowCache; +import org.apache.cassandra.cache.ConcurrentLinkedHashCache; +import org.apache.cassandra.cache.ICache; import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor; import org.apache.cassandra.concurrent.NamedThreadFactory; import org.apache.cassandra.concurrent.StageManager; @@ -45,20 +49,17 @@ import org.apache.cassandra.db.commitlog import org.apache.cassandra.db.commitlog.ReplayPosition; import org.apache.cassandra.db.compaction.AbstractCompactionStrategy; import org.apache.cassandra.db.compaction.CompactionManager; -import org.apache.cassandra.db.compaction.LeveledCompactionStrategy; -import org.apache.cassandra.db.filter.IFilter; -import org.apache.cassandra.db.filter.QueryFilter; -import org.apache.cassandra.db.filter.QueryPath; -import org.apache.cassandra.db.filter.SliceQueryFilter; +import org.apache.cassandra.db.filter.*; import org.apache.cassandra.db.index.SecondaryIndexManager; import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.db.marshal.BytesType; +import org.apache.cassandra.db.marshal.LocalByPartionerType; import org.apache.cassandra.dht.*; import org.apache.cassandra.io.sstable.*; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.thrift.IndexClause; import org.apache.cassandra.utils.*; -import org.apache.cassandra.utils.IntervalTree.Interval; import org.cliffc.high_scale_lib.NonBlockingHashMap; public class ColumnFamilyStore implements ColumnFamilyStoreMBean @@ -136,7 +137,7 @@ public class ColumnFamilyStore implement private volatile DefaultDouble memops; private volatile DefaultInteger rowCacheSaveInSeconds; private volatile DefaultInteger keyCacheSaveInSeconds; - private volatile DefaultInteger rowCacheKeysToSave; + private volatile DefaultInteger rowCacheKeysToSave; /** Lock to allow migrations to block all flushing, so we can be sure not to write orphaned data files */ public final Lock flushLock = new ReentrantLock(); @@ -172,7 +173,7 @@ public class ColumnFamilyStore implement public void reload() { // metadata object has been mutated directly. make all the members jibe with new settings. - + // only update these runtime-modifiable settings if they have not been modified. if (!minCompactionThreshold.isModified()) for (ColumnFamilyStore cfs : concatWithIndexes()) @@ -193,12 +194,11 @@ public class ColumnFamilyStore implement if (!rowCacheKeysToSave.isModified()) rowCacheKeysToSave = new DefaultInteger(metadata.getRowCacheKeysToSave()); - compactionStrategy.shutdown(); compactionStrategy = metadata.createCompactionStrategyInstance(this); updateCacheSizes(); scheduleCacheSaving(rowCacheSaveInSeconds.value(), keyCacheSaveInSeconds.value(), rowCacheKeysToSave.value()); - + indexManager.reload(); } @@ -206,10 +206,11 @@ public class ColumnFamilyStore implement { assert metadata != null : "null metadata for " + table + ":" + columnFamilyName; this.table = table; - columnFamily = columnFamilyName; + columnFamily = columnFamilyName; this.metadata = metadata; this.minCompactionThreshold = new DefaultInteger(metadata.getMinCompactionThreshold()); this.maxCompactionThreshold = new DefaultInteger(metadata.getMaxCompactionThreshold()); + this.compactionStrategy = metadata.createCompactionStrategyInstance(this); this.memsize = new DefaultInteger(metadata.getMemtableThroughputInMb()); this.memops = new DefaultDouble(metadata.getMemtableOperationsInMillions()); this.rowCacheSaveInSeconds = new DefaultInteger(metadata.getRowCacheSavePeriodInSeconds()); @@ -240,9 +241,6 @@ public class ColumnFamilyStore implement } data.addSSTables(sstables); - // compaction strategy should be created after the CFS has been prepared - this.compactionStrategy = metadata.createCompactionStrategyInstance(this); - // create the private ColumnFamilyStores for the secondary column indexes for (ColumnDefinition info : metadata.getColumn_metadata().values()) { @@ -339,7 +337,7 @@ public class ColumnFamilyStore implement return new ColumnFamilyStore(table, columnFamily, partitioner, value, metadata); } - + /** * Removes unnecessary files from the cf directory at startup: these include temp files, orphans, zero-length files * and compacted sstables. Files that cannot be recognized will be ignored. @@ -397,7 +395,7 @@ public class ColumnFamilyStore implement if (!file.delete()) logger.warn("could not delete " + file.getAbsolutePath()); } - + // also clean out any index leftovers. CFMetaData cfm = Schema.instance.getCFMetaData(table, columnFamily); if (cfm != null) // secondary indexes aren't stored in DD. @@ -888,21 +886,12 @@ public class ColumnFamilyStore implement */ public boolean isKeyInRemainingSSTables(DecoratedKey key, Set<? extends SSTable> sstablesToIgnore) { - DataTracker.View currentView = markCurrentViewReferenced(); - try + for (SSTableReader sstable : data.getSSTables()) { - List<SSTableReader> filteredSSTables = currentView.intervalTree.search(new Interval(key, key)); - for (SSTableReader sstable : filteredSSTables) - { - if (!sstablesToIgnore.contains(sstable) && sstable.getBloomFilter().isPresent(key.key)) - return true; - } - return false; - } - finally - { - SSTableReader.releaseReferences(currentView.sstables); + if (!sstablesToIgnore.contains(sstable) && sstable.getBloomFilter().isPresent(key.key)) + return true; } + return false; } /* @@ -994,7 +983,7 @@ public class ColumnFamilyStore implement public void removeAllSSTables() { data.removeAllSSTables(); - indexManager.removeAllIndexes(); + indexManager.removeAllIndexes(); } public long getMemtableColumnsCount() @@ -1188,7 +1177,7 @@ public class ColumnFamilyStore implement ColumnFamily cached = cacheRow(filter.key); if (cached == null) return null; - + return filterColumnFamily(cached, filter, gcBefore); } finally @@ -1230,7 +1219,7 @@ public class ColumnFamilyStore implement // top-level columns if (sliceFilter.count >= cached.getColumnCount()) { - removeDeletedColumnsOnly(cached, gcBefore); + removeDeletedColumnsOnly(cached, gcBefore); return removeDeletedCF(cached, gcBefore); } } @@ -1290,12 +1279,12 @@ public class ColumnFamilyStore implement DataTracker.View currentView = markCurrentViewReferenced(); try { - CollationController controller = new CollationController(currentView, factory, filter, metadata, gcBefore); - ColumnFamily columns = controller.getTopLevelColumns(); - recentSSTablesPerRead.add(controller.getSstablesIterated()); - sstablesPerRead.add(controller.getSstablesIterated()); - return columns; - } + CollationController controller = new CollationController(currentView, factory, filter, metadata, gcBefore); + ColumnFamily columns = controller.getTopLevelColumns(); + recentSSTablesPerRead.add(controller.getSstablesIterated()); + sstablesPerRead.add(controller.getSstablesIterated()); + return columns; + } finally { SSTableReader.releaseReferences(currentView.sstables); @@ -1304,7 +1293,7 @@ public class ColumnFamilyStore implement /** * Fetch a range of rows and columns from memtables/sstables. - * + * * @param superColumn optional SuperColumn to slice subcolumns of; null to slice top-level columns * @param range Either a Bounds, which includes start key, or a Range, which does not. * @param maxResults Maximum rows to return @@ -1333,14 +1322,6 @@ public class ColumnFamilyStore implement // It is fine to aliases the View.sstables since it's an unmodifiable collection Collection<SSTableReader> sstables = currentView.sstables; - Comparable startWithComp = startWith; - Comparable stopAtComp = stopAt; - if (startWith.token.equals(partitioner.getMinimumToken())) - startWithComp = currentView.intervalTree.min; - if (stopAt.token.equals(partitioner.getMinimumToken())) - stopAtComp = currentView.intervalTree.max; - sstables = currentView.intervalTree.search(new Interval(startWithComp, stopAtComp)); - CloseableIterator<Row> iterator = RowIteratorFactory.getIterator(memtables, sstables, startWith, stopAt, filter, getComparator(), this); List<Row> rows = new ArrayList<Row>(); @@ -1392,12 +1373,12 @@ public class ColumnFamilyStore implement SSTableReader.releaseReferences(currentView.sstables); } } - + public List<Row> search(IndexClause clause, AbstractBounds range, IFilter dataFilter) { return indexManager.search(clause, range, dataFilter); } - + public AbstractType getComparator() { return metadata.comparator; @@ -1440,8 +1421,8 @@ public class ColumnFamilyStore implement /** * Take a snap shot of this columnfamily store. - * - * @param snapshotName the name of the associated with the snapshot + * + * @param snapshotName the name of the associated with the snapshot */ public void snapshot(String snapshotName) { @@ -1696,7 +1677,7 @@ public class ColumnFamilyStore implement return data.getRecentBloomFilterFalseRatio(); } - + @Override public String toString() @@ -1734,7 +1715,7 @@ public class ColumnFamilyStore implement { return minCompactionThreshold.value(); } - + public void setMinimumCompactionThreshold(int minCompactionThreshold) { if ((minCompactionThreshold > this.maxCompactionThreshold.value()) && this.maxCompactionThreshold.value() != 0) @@ -1957,11 +1938,4 @@ public class ColumnFamilyStore implement return reader; } - - public int getUnleveledSSTables() - { - return this.compactionStrategy instanceof LeveledCompactionStrategy - ? ((LeveledCompactionStrategy) this.compactionStrategy).getLevelSize(0) - : 0; - } } Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java?rev=1162220&r1=1162219&r2=1162220&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java Fri Aug 26 19:39:04 2011 @@ -240,9 +240,4 @@ public interface ColumnFamilyStoreMBean * determine which SSTables should be loaded and load them */ public void loadNewSSTables(); - - /** - * @return the number of SSTables in L0. Always return 0 if Leveled compaction is not enabled. - */ - public int getUnleveledSSTables(); } Modified: cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java?rev=1162220&r1=1162219&r2=1162220&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java Fri Aug 26 19:39:04 2011 @@ -20,38 +20,30 @@ package org.apache.cassandra.db; import java.io.File; +import java.io.IOError; import java.io.IOException; import java.util.*; -import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; -import com.google.common.collect.*; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.ImmutableSortedSet; +import com.google.common.collect.Iterables; +import com.google.common.collect.Sets; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.cache.AutoSavingCache; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.db.compaction.LeveledCompactionStrategy; import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.SSTable; import org.apache.cassandra.io.sstable.SSTableReader; -import org.apache.cassandra.notifications.INotification; -import org.apache.cassandra.notifications.INotificationConsumer; -import org.apache.cassandra.notifications.SSTableAddedNotification; -import org.apache.cassandra.notifications.SSTableListChangedNotification; -import org.apache.cassandra.service.StorageService; -import org.apache.cassandra.utils.IntervalTree.Interval; -import org.apache.cassandra.utils.IntervalTree.IntervalTree; import org.apache.cassandra.utils.Pair; -import org.apache.cassandra.utils.WrappedRunnable; public class DataTracker { private static final Logger logger = LoggerFactory.getLogger(DataTracker.class); - public static Collection<INotificationConsumer> subscribers = new CopyOnWriteArrayList<INotificationConsumer>(); - public final ColumnFamilyStore cfstore; private final AtomicReference<View> view; @@ -142,27 +134,26 @@ public class DataTracker addNewSSTablesSize(Arrays.asList(sstable)); cfstore.updateCacheSizes(); - notifyAdded(sstable); incrementallyBackup(sstable); } - public void incrementallyBackup(final SSTableReader sstable) + public void incrementallyBackup(SSTableReader sstable) { - if (!DatabaseDescriptor.incrementalBackupsEnabled()) - return; - - Runnable runnable = new WrappedRunnable() + if (DatabaseDescriptor.incrementalBackupsEnabled()) { - protected void runMayThrow() throws Exception + File keyspaceDir = new File(sstable.getFilename()).getParentFile(); + File backupsDir = new File(keyspaceDir, "backups"); + try { - File keyspaceDir = new File(sstable.getFilename()).getParentFile(); - File backupsDir = new File(keyspaceDir, "backups"); if (!backupsDir.exists() && !backupsDir.mkdirs()) throw new IOException("Unable to create " + backupsDir); sstable.createLinks(backupsDir.getCanonicalPath()); } - }; - StorageService.tasks.execute(runnable); + catch (IOException e) + { + throw new IOError(e); + } + } } /** @@ -242,7 +233,6 @@ public class DataTracker { addSSTables(Arrays.asList(sstable)); incrementallyBackup(sstable); - notifyAdded(sstable); } public void removeAllSSTables() @@ -256,8 +246,7 @@ public class DataTracker view.set(new View(new Memtable(cfstore), Collections.<Memtable>emptySet(), Collections.<SSTableReader>emptyList(), - Collections.<SSTableReader>emptySet(), - new IntervalTree())); + Collections.<SSTableReader>emptySet())); } private void replace(Collection<SSTableReader> oldSSTables, Iterable<SSTableReader> replacements) @@ -273,7 +262,6 @@ public class DataTracker addNewSSTablesSize(replacements); removeOldSSTablesSize(oldSSTables); - notifySSTablesChanged(replacements, oldSSTables); cfstore.updateCacheSizes(); } @@ -467,35 +455,6 @@ public class DataTracker return (double) falseCount / (trueCount + falseCount); } - public void notifySSTablesChanged(Iterable<SSTableReader> added, Iterable<SSTableReader> removed) - { - for (INotificationConsumer subscriber : subscribers) - { - INotification notification = new SSTableListChangedNotification(added, removed); - subscriber.handleNotification(notification, this); - } - } - - public void notifyAdded(SSTableReader added) - { - for (INotificationConsumer subscriber : subscribers) - { - INotification notification = new SSTableAddedNotification(added); - subscriber.handleNotification(notification, this); - } - } - - public static void subscribe(INotificationConsumer consumer) - { - subscribers.add(consumer); - } - - public static void unsubscribe(INotificationConsumer consumer) - { - boolean found = subscribers.remove(consumer); - assert found : consumer + " not subscribed"; - } - /** * An immutable structure holding the current memtable, the memtables pending * flush, the sstables for a column family, and the sstables that are active @@ -512,63 +471,49 @@ public class DataTracker // Obviously, dropping sstables whose max column timestamp happens to be equal to another's // is not acceptable for us. So, we use a List instead. public final List<SSTableReader> sstables; - public final IntervalTree intervalTree; - View(Memtable memtable, Set<Memtable> pendingFlush, List<SSTableReader> sstables, Set<SSTableReader> compacting, IntervalTree intervalTree) + View(Memtable memtable, Set<Memtable> pendingFlush, List<SSTableReader> sstables, Set<SSTableReader> compacting) { this.memtable = memtable; this.memtablesPendingFlush = pendingFlush; this.sstables = sstables; this.compacting = compacting; - this.intervalTree = intervalTree; - } - - private IntervalTree buildIntervalTree(List<SSTableReader> sstables) - { - List<SSTableReader> itsstList = ImmutableList.copyOf(Ordering.from(SSTable.sstableComparator).sortedCopy(sstables)); - List<Interval> intervals = new ArrayList<Interval>(itsstList.size()); - for (SSTableReader sstable : itsstList) - intervals.add(new Interval<SSTableReader>(sstable.first, sstable.last, sstable)); - assert intervals.size() == sstables.size(); - return new IntervalTree<SSTableReader>(intervals); } public View switchMemtable(Memtable newMemtable) { Set<Memtable> newPending = ImmutableSet.<Memtable>builder().addAll(memtablesPendingFlush).add(memtable).build(); - return new View(newMemtable, newPending, sstables, compacting, intervalTree); + return new View(newMemtable, newPending, sstables, compacting); } public View renewMemtable(Memtable newMemtable) { - return new View(newMemtable, memtablesPendingFlush, sstables, compacting, intervalTree); + return new View(newMemtable, memtablesPendingFlush, sstables, compacting); } public View replaceFlushed(Memtable flushedMemtable, SSTableReader newSSTable) { Set<Memtable> newPending = ImmutableSet.copyOf(Sets.difference(memtablesPendingFlush, Collections.singleton(flushedMemtable))); List<SSTableReader> newSSTables = newSSTables(newSSTable); - IntervalTree intervalTree = buildIntervalTree(newSSTables); - return new View(memtable, newPending, Collections.unmodifiableList(newSSTables), compacting, intervalTree); + return new View(memtable, newPending, Collections.unmodifiableList(newSSTables), compacting); } public View replace(Collection<SSTableReader> oldSSTables, Iterable<SSTableReader> replacements) { List<SSTableReader> newSSTables = newSSTables(oldSSTables, replacements); - IntervalTree intervalTree = buildIntervalTree(newSSTables); - return new View(memtable, memtablesPendingFlush, Collections.unmodifiableList(newSSTables), compacting, intervalTree); + return new View(memtable, memtablesPendingFlush, Collections.unmodifiableList(newSSTables), compacting); } public View markCompacting(Collection<SSTableReader> tomark) { Set<SSTableReader> compactingNew = ImmutableSet.<SSTableReader>builder().addAll(compacting).addAll(tomark).build(); - return new View(memtable, memtablesPendingFlush, sstables, compactingNew, intervalTree); + return new View(memtable, memtablesPendingFlush, sstables, compactingNew); } public View unmarkCompacting(Collection<SSTableReader> tounmark) { Set<SSTableReader> compactingNew = ImmutableSet.copyOf(Sets.difference(compacting, ImmutableSet.copyOf(tounmark))); - return new View(memtable, memtablesPendingFlush, sstables, compactingNew, intervalTree); + return new View(memtable, memtablesPendingFlush, sstables, compactingNew); } private List<SSTableReader> newSSTables(SSTableReader newSSTable) @@ -589,6 +534,7 @@ public class DataTracker } Iterables.addAll(newSSTables, replacements); assert newSSTables.size() == newSSTablesSize; + Collections.sort(newSSTables, SSTable.maxTimestampComparator); return newSSTables; } } Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RowIteratorFactory.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RowIteratorFactory.java?rev=1162220&r1=1162219&r2=1162220&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/RowIteratorFactory.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/RowIteratorFactory.java Fri Aug 26 19:39:04 2011 @@ -89,6 +89,7 @@ public class RowIteratorFactory iterators.add(new ConvertToColumnIterator(filter, comparator, p, memtable.getEntryIterator(startWith))); } + // sstables for (SSTableReader sstable : sstables) { final SSTableScanner scanner = sstable.getScanner(RANGE_FILE_BUFFER_SIZE, filter); Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java?rev=1162220&r1=1162219&r2=1162220&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java Fri Aug 26 19:39:04 2011 @@ -50,18 +50,11 @@ public abstract class AbstractCompaction protected AbstractCompactionStrategy(ColumnFamilyStore cfs, Map<String, String> options) { - assert cfs != null; this.cfs = cfs; this.options = options; } /** - * Releases any resources if this strategy is shutdown (when the CFS is reloaded after a schema change). - * Default is to do nothing. - */ - public void shutdown() { } - - /** * @return a list of compaction tasks that should run in the background to get the sstable * count down to desired parameters. Will not be null, but may be empty. * @param gcBefore throw away tombstones older than this Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java?rev=1162220&r1=1162219&r2=1162220&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java Fri Aug 26 19:39:04 2011 @@ -19,8 +19,14 @@ package org.apache.cassandra.db.compaction; import java.io.IOException; -import java.util.*; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; import java.util.Map.Entry; +import java.util.Set; import com.google.common.base.Predicates; import com.google.common.collect.Iterators; @@ -43,7 +49,6 @@ public class CompactionTask extends Abst protected String compactionFileLocation; protected final int gcBefore; protected boolean isUserDefined; - protected static long totalBytesCompacted = 0; public CompactionTask(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, final int gcBefore) { @@ -53,11 +58,6 @@ public class CompactionTask extends Abst this.isUserDefined = false; } - public static synchronized long addToTotalBytesCompacted(long bytesCompacted) - { - return totalBytesCompacted += bytesCompacted; - } - /** * For internal use and testing only. The rest of the system should go through the submit* methods, * which are properly serialized. @@ -72,7 +72,7 @@ public class CompactionTask extends Abst Set<SSTableReader> toCompact = new HashSet<SSTableReader>(sstables); if (!isUserDefined) { - if ( !allowSingletonCompaction() && toCompact.size() < 2) + if (toCompact.size() < 2) { logger.info("Nothing to compact in " + cfs.getColumnFamilyName() + "." + "Use forceUserDefinedCompaction if you wish to force compaction of single sstables " + @@ -128,18 +128,13 @@ public class CompactionTask extends Abst if (logger.isDebugEnabled()) logger.debug("Expected bloom filter size : " + expectedBloomFilterSize); + SSTableWriter writer = null; + final SSTableReader ssTable; CompactionIterable ci = new CompactionIterable(type, toCompact, controller); // retain a handle so we can call close() CloseableIterator<AbstractCompactedRow> iter = ci.iterator(); Iterator<AbstractCompactedRow> nni = Iterators.filter(iter, Predicates.notNull()); Map<DecoratedKey, Long> cachedKeys = new HashMap<DecoratedKey, Long>(); - // we can't preheat until the tracker has been set. This doesn't happen until we tell the cfs to - // replace the old entries. Track entries to preheat here until then. - Map<SSTableReader, Map<DecoratedKey, Long>> cachedKeyMap = new HashMap<SSTableReader, Map<DecoratedKey, Long>>(); - - Collection<SSTableReader> sstables = new ArrayList<SSTableReader>(); - Collection<SSTableWriter> writers = new ArrayList<SSTableWriter>(); - if (collector != null) collector.beginCompaction(ci); try @@ -153,14 +148,13 @@ public class CompactionTask extends Abst return 0; } - SSTableWriter writer = cfs.createCompactionWriter(expectedBloomFilterSize, compactionFileLocation, toCompact); - writers.add(writer); + writer = cfs.createCompactionWriter(expectedBloomFilterSize, compactionFileLocation, toCompact); while (nni.hasNext()) { AbstractCompactedRow row = nni.next(); if (row.isEmpty()) continue; - + long position = writer.append(row); totalkeysWritten++; @@ -175,70 +169,32 @@ public class CompactionTask extends Abst } } } - if (!nni.hasNext() || newSSTableSegmentThresholdReached(writer, position)) - { - SSTableReader toIndex = writer.closeAndOpenReader(getMaxDataAge(toCompact)); - cachedKeyMap.put(toIndex, cachedKeys); - sstables.add(toIndex); - writer = cfs.createCompactionWriter(expectedBloomFilterSize, compactionFileLocation, toCompact); - writers.add(writer); - cachedKeys = new HashMap<DecoratedKey, Long>(); - } } + ssTable = writer.closeAndOpenReader(getMaxDataAge(toCompact)); } finally { iter.close(); if (collector != null) collector.finishCompaction(ci); - for (SSTableWriter writer : writers) + if (writer != null) writer.cleanupIfNecessary(); } - cfs.replaceCompactedSSTables(toCompact, sstables); - // TODO: this doesn't belong here, it should be part of the reader to load when the tracker is wired up - for (Entry<SSTableReader, Map<DecoratedKey, Long>> ssTableReaderMapEntry : cachedKeyMap.entrySet()) - { - SSTableReader key = ssTableReaderMapEntry.getKey(); - for (Entry<DecoratedKey, Long> entry : ssTableReaderMapEntry.getValue().entrySet()) - key.cacheKey(entry.getKey(), entry.getValue()); - } - + cfs.replaceCompactedSSTables(toCompact, Arrays.asList(ssTable)); + for (Entry<DecoratedKey, Long> entry : cachedKeys.entrySet()) // empty if preheat is off + ssTable.cacheKey(entry.getKey(), entry.getValue()); CompactionManager.instance.submitBackground(cfs); long dTime = System.currentTimeMillis() - startTime; long startsize = SSTable.getTotalBytes(toCompact); - long endsize = SSTable.getTotalBytes(sstables); + long endsize = ssTable.length(); double ratio = (double)endsize / (double)startsize; - - StringBuilder builder = new StringBuilder(); - builder.append("["); - for (SSTableReader reader : sstables) - builder.append(reader.getFilename()).append(","); - builder.append("]"); - - double mbps = dTime > 0 ? (double)endsize/(1024*1024)/((double)dTime/1000) : 0; - logger.info(String.format("Compacted to %s. %,d to %,d (~%d%% of original) bytes for %,d keys at %fMBPS. Time: %,dms.", - builder.toString(), startsize, endsize, (int) (ratio * 100), totalkeysWritten, mbps, dTime)); - logger.info(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize))); + logger.info(String.format("Compacted to %s. %,d to %,d (~%d%% of original) bytes for %,d keys. Time: %,dms.", + ssTable.getFilename(), startsize, endsize, (int) (ratio * 100), totalkeysWritten, dTime)); return toCompact.size(); } - //extensibility point for other strategies that may want to limit the upper bounds of the sstable segment size - protected boolean newSSTableSegmentThresholdReached(SSTableWriter writer, long position) - { - return false; - } - - /** - * extend this if the overridden compaction strategy requires single files to be compacted to function properly - * @return boolean - */ - protected boolean allowSingletonCompaction() - { - return false; - } - public static long getMaxDataAge(Collection<SSTableReader> sstables) { long max = 0; Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java?rev=1162220&r1=1162219&r2=1162220&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java Fri Aug 26 19:39:04 2011 @@ -24,8 +24,6 @@ import java.io.FileFilter; import java.io.IOException; import java.util.*; -import com.google.common.collect.Ordering; -import org.apache.cassandra.db.DecoratedKey; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -76,9 +74,6 @@ public abstract class SSTable public final IPartitioner partitioner; public final boolean compression; - public DecoratedKey first; - public DecoratedKey last; - protected SSTable(Descriptor descriptor, CFMetaData metadata, IPartitioner partitioner) { this(descriptor, new HashSet<Component>(), metadata, partitioner); @@ -103,16 +98,6 @@ public abstract class SSTable this.partitioner = partitioner; } - public static final Comparator<SSTableReader> sstableComparator = new Comparator<SSTableReader>() - { - public int compare(SSTableReader o1, SSTableReader o2) - { - return o1.first.compareTo(o2.first); - } - }; - - public static final Ordering<SSTableReader> sstableOrdering = Ordering.from(sstableComparator); - /** * We use a ReferenceQueue to manage deleting files that have been compacted * and for which no more SSTable references exist. But this is not guaranteed @@ -169,8 +154,7 @@ public abstract class SSTable } catch (Exception e) { - if (!"snapshots".equals(name) && !"backups".equals(name) - && !name.contains(".json")) + if (!"snapshots".equals(name) && !"backups".equals(name)) logger.warn("Invalid file '{}' in data directory {}.", name, dir); return null; } Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java?rev=1162220&r1=1162219&r2=1162220&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java Fri Aug 26 19:39:04 2011 @@ -261,7 +261,6 @@ public class SSTableReader extends SSTab // we read the positions in a BRAF so we don't have to worry about an entry spanning a mmap boundary. RandomAccessReader input = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)), true); - DecoratedKey left = null, right = null; try { if (keyCache != null && keyCache.getCapacity() - keyCache.size() < keysToLoadInCache.size()) @@ -279,19 +278,10 @@ public class SSTableReader extends SSTab if (indexPosition == indexSize) break; - ByteBuffer key = null, skippedKey; - skippedKey = ByteBufferUtil.readWithShortLength(input); - boolean shouldAddEntry = indexSummary.shouldAddEntry(); - if (shouldAddEntry || cacheLoading || recreatebloom) - { - key = skippedKey; - } - - if(null == left) - left = decodeKey(partitioner, descriptor, skippedKey); - right = decodeKey(partitioner, descriptor, skippedKey); - + ByteBuffer key = (shouldAddEntry || cacheLoading || recreatebloom) + ? ByteBufferUtil.readWithShortLength(input) + : ByteBufferUtil.skipShortLength(input); long dataPosition = input.readLong(); if (key != null) { @@ -314,8 +304,6 @@ public class SSTableReader extends SSTab { FileUtils.closeQuietly(input); } - this.first = left; - this.last = right; // finalize the state of the reader ifile = ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX)); Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java?rev=1162220&r1=1162219&r2=1162220&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java Fri Aug 26 19:39:04 2011 @@ -136,9 +136,6 @@ public class SSTableWriter extends SSTab private void afterAppend(DecoratedKey decoratedKey, long dataPosition) throws IOException { lastWrittenKey = decoratedKey; - this.last = lastWrittenKey; - if(null == this.first) - this.first = lastWrittenKey; if (logger.isTraceEnabled()) logger.trace("wrote " + decoratedKey + " at " + dataPosition); @@ -251,8 +248,6 @@ public class SSTableWriter extends SSTab iwriter.bf, maxDataAge, sstableMetadata); - sstable.first = this.first; - sstable.last = this.last; iwriter = null; dbuilder = null; return sstable;