Author: jbellis Date: Wed Sep 21 13:02:16 2011 New Revision: 1173611 URL: http://svn.apache.org/viewvc?rev=1173611&view=rev Log: Revert "parallelize sstable open at server startup"
Modified: cassandra/trunk/CHANGES.txt cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java Modified: cassandra/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1173611&r1=1173610&r2=1173611&view=diff ============================================================================== --- cassandra/trunk/CHANGES.txt (original) +++ cassandra/trunk/CHANGES.txt Wed Sep 21 13:02:16 2011 @@ -16,7 +16,6 @@ schema definitions were found (CASSANDRA-3219) * Fixes for LeveledCompactionStrategy score computation, prioritization, and scheduling (CASSANDRA-3224) - * parallelize sstable open at server startup (CASSANDRA-2988) 1.0.0-beta1 Modified: cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java?rev=1173611&r1=1173610&r2=1173611&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java Wed Sep 21 13:02:16 2011 @@ -98,11 +98,6 @@ public class DebuggableThreadPoolExecuto this.setRejectedExecutionHandler(blockingExecutionHandler); } - public static DebuggableThreadPoolExecutor createWithPoolSize(String threadPoolName, int size) - { - return new DebuggableThreadPoolExecutor(size, Integer.MAX_VALUE, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(threadPoolName)); - } - protected void onInitialRejection(Runnable task) {} protected void onFinalAccept(Runnable task) {} protected void onFinalRejection(Runnable task) {} Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1173611&r1=1173610&r2=1173611&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Wed Sep 21 13:02:16 2011 @@ -212,8 +212,15 @@ public class ColumnFamilyStore implement // scan for sstables corresponding to this cf and load them data = new DataTracker(this); Set<DecoratedKey> savedKeys = keyCache.readSaved(); - Set<Map.Entry<Descriptor, Set<Component>>> entries = files(table.name, columnFamilyName, false, false).entrySet(); - data.addSSTables(SSTableReader.batchOpen(entries, savedKeys, data, metadata, this.partitioner)); + List<SSTableReader> sstables = new ArrayList<SSTableReader>(); + for (Map.Entry<Descriptor,Set<Component>> sstableFiles : files(table.name, columnFamilyName, false, false).entrySet()) + { + SSTableReader reader = openSSTableReader(sstableFiles, savedKeys, data, metadata, partitioner); + + if (reader != null) // if == null, logger errors where already fired + sstables.add(reader); + } + data.addSSTables(sstables); // compaction strategy should be created after the CFS has been prepared this.compactionStrategy = metadata.createCompactionStrategyInstance(this); @@ -534,15 +541,10 @@ public class ColumnFamilyStore implement descriptor)); logger.info("Initializing new SSTable {}", rawSSTable); - try - { - reader = SSTableReader.open(rawSSTable.getKey(), rawSSTable.getValue(), savedKeys, data, metadata, partitioner); - } - catch (IOException e) - { - SSTableReader.logOpenException(rawSSTable.getKey(), e); - continue; - } + reader = openSSTableReader(rawSSTable, savedKeys, data, metadata, partitioner); + + if (reader == null) + continue; // something wrong with SSTable, skipping sstables.add(reader); @@ -1890,6 +1892,30 @@ public class ColumnFamilyStore implement return indexManager.getBuiltIndexes(); } + private static SSTableReader openSSTableReader(Map.Entry<Descriptor, Set<Component>> rawSSTable, + Set<DecoratedKey> savedKeys, + DataTracker tracker, + CFMetaData metadata, + IPartitioner partitioner) + { + SSTableReader reader = null; + + try + { + reader = SSTableReader.open(rawSSTable.getKey(), rawSSTable.getValue(), savedKeys, tracker, metadata, partitioner); + } + catch (FileNotFoundException ex) + { + logger.error("Missing sstable component in " + rawSSTable + "; skipped because of " + ex.getMessage()); + } + catch (IOException ex) + { + logger.error("Corrupt sstable " + rawSSTable + "; skipped", ex); + } + + return reader; + } + public int getUnleveledSSTables() { return this.compactionStrategy instanceof LeveledCompactionStrategy Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java?rev=1173611&r1=1173610&r2=1173611&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java Wed Sep 21 13:02:16 2011 @@ -24,12 +24,9 @@ import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.*; import com.google.common.base.Function; import com.google.common.collect.Collections2; - -import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor; import org.apache.cassandra.config.Schema; import org.apache.cassandra.io.compress.CompressedRandomAccessReader; import org.slf4j.Logger; @@ -167,59 +164,6 @@ public class SSTableReader extends SSTab return sstable; } - public static void logOpenException(Descriptor descriptor, IOException e) - { - if (e instanceof FileNotFoundException) - logger.error("Missing sstable component in " + descriptor + "; skipped because of " + e.getMessage()); - else - logger.error("Corrupt sstable " + descriptor + "; skipped", e); - } - - public static Collection<SSTableReader> batchOpen(Set<Map.Entry<Descriptor, Set<Component>>> entries, - final Set<DecoratedKey> savedKeys, - final DataTracker tracker, - final CFMetaData metadata, - final IPartitioner partitioner) - { - final Collection<SSTableReader> sstables = new LinkedBlockingQueue<SSTableReader>(); - - ExecutorService executor = DebuggableThreadPoolExecutor.createWithPoolSize("SSTableBatchOpen", Runtime.getRuntime().availableProcessors()); - for (final Map.Entry<Descriptor, Set<Component>> entry : entries) - { - Runnable runnable = new Runnable() - { - public void run() - { - SSTableReader sstable; - try - { - sstable = open(entry.getKey(), entry.getValue(), savedKeys, tracker, metadata, partitioner); - } - catch (IOException ex) - { - logger.error("Corrupt sstable " + entry + "; skipped", ex); - return; - } - sstables.add(sstable); - } - }; - executor.submit(runnable); - } - - executor.shutdown(); - try - { - executor.awaitTermination(7, TimeUnit.DAYS); - } - catch (InterruptedException e) - { - throw new AssertionError(e); - } - - return sstables; - - } - /** * Open a RowIndexedReader which already has its state initialized (by SSTableWriter). */