Author: jbellis
Date: Wed Sep 21 13:02:16 2011
New Revision: 1173611

URL: http://svn.apache.org/viewvc?rev=1173611&view=rev
Log:
Revert "parallelize sstable open at server startup"

Modified:
    cassandra/trunk/CHANGES.txt
    
cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java

Modified: cassandra/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1173611&r1=1173610&r2=1173611&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Wed Sep 21 13:02:16 2011
@@ -16,7 +16,6 @@
    schema definitions were found (CASSANDRA-3219)
  * Fixes for LeveledCompactionStrategy score computation, prioritization,
    and scheduling (CASSANDRA-3224)
- * parallelize sstable open at server startup (CASSANDRA-2988)
 
 
 1.0.0-beta1

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java?rev=1173611&r1=1173610&r2=1173611&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
 (original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
 Wed Sep 21 13:02:16 2011
@@ -98,11 +98,6 @@ public class DebuggableThreadPoolExecuto
         this.setRejectedExecutionHandler(blockingExecutionHandler);
     }
 
-    public static DebuggableThreadPoolExecutor createWithPoolSize(String 
threadPoolName, int size)
-    {
-        return new DebuggableThreadPoolExecutor(size, Integer.MAX_VALUE, 
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new 
NamedThreadFactory(threadPoolName));
-    }
-
     protected void onInitialRejection(Runnable task) {}
     protected void onFinalAccept(Runnable task) {}
     protected void onFinalRejection(Runnable task) {}

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1173611&r1=1173610&r2=1173611&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Wed 
Sep 21 13:02:16 2011
@@ -212,8 +212,15 @@ public class ColumnFamilyStore implement
         // scan for sstables corresponding to this cf and load them
         data = new DataTracker(this);
         Set<DecoratedKey> savedKeys = keyCache.readSaved();
-        Set<Map.Entry<Descriptor, Set<Component>>> entries = files(table.name, 
columnFamilyName, false, false).entrySet();
-        data.addSSTables(SSTableReader.batchOpen(entries, savedKeys, data, 
metadata, this.partitioner));
+        List<SSTableReader> sstables = new ArrayList<SSTableReader>();
+        for (Map.Entry<Descriptor,Set<Component>> sstableFiles : 
files(table.name, columnFamilyName, false, false).entrySet())
+        {
+            SSTableReader reader = openSSTableReader(sstableFiles, savedKeys, 
data, metadata, partitioner);
+
+            if (reader != null) // if == null, logger errors where already 
fired
+                sstables.add(reader);
+        }
+        data.addSSTables(sstables);
 
         // compaction strategy should be created after the CFS has been 
prepared
         this.compactionStrategy = 
metadata.createCompactionStrategyInstance(this);
@@ -534,15 +541,10 @@ public class ColumnFamilyStore implement
                                                          descriptor));
 
             logger.info("Initializing new SSTable {}", rawSSTable);
-            try
-            {
-                reader = SSTableReader.open(rawSSTable.getKey(), 
rawSSTable.getValue(), savedKeys, data, metadata, partitioner);
-            }
-            catch (IOException e)
-            {
-                SSTableReader.logOpenException(rawSSTable.getKey(), e);
-                continue;
-            }
+            reader = openSSTableReader(rawSSTable, savedKeys, data, metadata, 
partitioner);
+
+            if (reader == null)
+                continue; // something wrong with SSTable, skipping
 
             sstables.add(reader);
 
@@ -1890,6 +1892,30 @@ public class ColumnFamilyStore implement
        return indexManager.getBuiltIndexes();
     }
 
+    private static SSTableReader openSSTableReader(Map.Entry<Descriptor, 
Set<Component>> rawSSTable,
+                                                   Set<DecoratedKey> savedKeys,
+                                                   DataTracker tracker,
+                                                   CFMetaData metadata,
+                                                   IPartitioner partitioner)
+    {
+        SSTableReader reader = null;
+
+        try
+        {
+            reader = SSTableReader.open(rawSSTable.getKey(), 
rawSSTable.getValue(), savedKeys, tracker, metadata, partitioner);
+        }
+        catch (FileNotFoundException ex)
+        {
+            logger.error("Missing sstable component in " + rawSSTable + "; 
skipped because of " + ex.getMessage());
+        }
+        catch (IOException ex)
+        {
+            logger.error("Corrupt sstable " + rawSSTable + "; skipped", ex);
+        }
+
+        return reader;
+    }
+
     public int getUnleveledSSTables()
     {
         return this.compactionStrategy instanceof LeveledCompactionStrategy

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java?rev=1173611&r1=1173610&r2=1173611&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java 
Wed Sep 21 13:02:16 2011
@@ -24,12 +24,9 @@ import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.*;
 
 import com.google.common.base.Function;
 import com.google.common.collect.Collections2;
-
-import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.io.compress.CompressedRandomAccessReader;
 import org.slf4j.Logger;
@@ -167,59 +164,6 @@ public class SSTableReader extends SSTab
         return sstable;
     }
 
-    public static void logOpenException(Descriptor descriptor, IOException e)
-    {
-        if (e instanceof FileNotFoundException)
-            logger.error("Missing sstable component in " + descriptor + "; 
skipped because of " + e.getMessage());
-        else
-            logger.error("Corrupt sstable " + descriptor + "; skipped", e);
-    }
-
-    public static Collection<SSTableReader> 
batchOpen(Set<Map.Entry<Descriptor, Set<Component>>> entries,
-                                                      final Set<DecoratedKey> 
savedKeys,
-                                                      final DataTracker 
tracker,
-                                                      final CFMetaData 
metadata,
-                                                      final IPartitioner 
partitioner)
-    {
-        final Collection<SSTableReader> sstables = new 
LinkedBlockingQueue<SSTableReader>();
-
-        ExecutorService executor = 
DebuggableThreadPoolExecutor.createWithPoolSize("SSTableBatchOpen", 
Runtime.getRuntime().availableProcessors());
-        for (final Map.Entry<Descriptor, Set<Component>> entry : entries)
-        {
-            Runnable runnable = new Runnable()
-            {
-                public void run()
-                {
-                    SSTableReader sstable;
-                    try
-                    {
-                        sstable = open(entry.getKey(), entry.getValue(), 
savedKeys, tracker, metadata, partitioner);
-                    }
-                    catch (IOException ex)
-                    {
-                        logger.error("Corrupt sstable " + entry + "; skipped", 
ex);
-                        return;
-                    }
-                    sstables.add(sstable);
-                }
-            };
-            executor.submit(runnable);
-        }
-
-        executor.shutdown();
-        try
-        {
-            executor.awaitTermination(7, TimeUnit.DAYS);
-        }
-        catch (InterruptedException e)
-        {
-            throw new AssertionError(e);
-        }
-
-        return sstables;
-
-    }
-
     /**
      * Open a RowIndexedReader which already has its state initialized (by 
SSTableWriter).
      */


Reply via email to