Author: jbellis
Date: Wed Sep 21 14:13:04 2011
New Revision: 1173658

URL: http://svn.apache.org/viewvc?rev=1173658&view=rev
Log:
merge #2988 from 1.0

Modified:
    cassandra/trunk/   (props changed)
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/contrib/   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java   (props changed)
    cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java

Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Sep 21 14:13:04 2011
@@ -4,8 +4,8 @@
 
/cassandra/branches/cassandra-0.8:1090934-1125013,1125019-1171737,1172026,1172591
 /cassandra/branches/cassandra-0.8.0:1125021-1130369
 /cassandra/branches/cassandra-0.8.1:1101014-1125018
-/cassandra/branches/cassandra-1.0:1167085-1173134
-/cassandra/branches/cassandra-1.0.0:1167104-1167229,1167232-1173133
+/cassandra/branches/cassandra-1.0:1167085-1173657
+/cassandra/branches/cassandra-1.0.0:1167104-1167229,1167232-1173617
 /cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689
 /cassandra/tags/cassandra-0.8.0-rc1:1102511-1125020
 /incubator/cassandra/branches/cassandra-0.3:774578-796573

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1173658&r1=1173657&r2=1173658&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Wed Sep 21 14:13:04 2011
@@ -16,6 +16,7 @@
    schema definitions were found (CASSANDRA-3219)
  * Fixes for LeveledCompactionStrategy score computation, prioritization,
    and scheduling (CASSANDRA-3224)
+ * parallelize sstable open at server startup (CASSANDRA-2988)
 
 
 1.0.0-beta1

Propchange: cassandra/trunk/contrib/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Sep 21 14:13:04 2011
@@ -4,8 +4,8 @@
 
/cassandra/branches/cassandra-0.8/contrib:1090934-1125013,1125019-1171737,1172026,1172591
 /cassandra/branches/cassandra-0.8.0/contrib:1125021-1130369
 /cassandra/branches/cassandra-0.8.1/contrib:1101014-1125018
-/cassandra/branches/cassandra-1.0/contrib:1167085-1173134
-/cassandra/branches/cassandra-1.0.0/contrib:1167104-1167229,1167232-1173133
+/cassandra/branches/cassandra-1.0/contrib:1167085-1173657
+/cassandra/branches/cassandra-1.0.0/contrib:1167104-1167229,1167232-1173617
 /cassandra/tags/cassandra-0.7.0-rc3/contrib:1051699-1053689
 /cassandra/tags/cassandra-0.8.0-rc1/contrib:1102511-1125020
 /incubator/cassandra/branches/cassandra-0.3/contrib:774578-796573

Propchange: 
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Sep 21 14:13:04 2011
@@ -4,8 +4,8 @@
 
/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1090934-1125013,1125019-1171737,1172026,1172591
 
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1125021-1130369
 
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1101014-1125018
-/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1167085-1173134
-/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1167104-1167229,1167232-1173133
+/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1167085-1173657
+/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1167104-1167229,1167232-1173617
 
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689
 
/cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1102511-1125020
 
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573

Propchange: 
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Sep 21 14:13:04 2011
@@ -4,8 +4,8 @@
 
/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1090934-1125013,1125019-1171737,1172026,1172591
 
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1125021-1130369
 
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1101014-1125018
-/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1167085-1173134
-/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1167104-1167229,1167232-1173133
+/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1167085-1173657
+/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1167104-1167229,1167232-1173617
 
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689
 
/cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1102511-1125020
 
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198

Propchange: 
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Sep 21 14:13:04 2011
@@ -4,8 +4,8 @@
 
/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1090934-1125013,1125019-1171737,1172026,1172591
 
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1125021-1130369
 
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1101014-1125018
-/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1167085-1173134
-/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1167104-1167229,1167232-1173133
+/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1167085-1173657
+/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1167104-1167229,1167232-1173617
 
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689
 
/cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1102511-1125020
 
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573

Propchange: 
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Sep 21 14:13:04 2011
@@ -4,8 +4,8 @@
 
/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1090934-1125013,1125019-1171737,1172026,1172591
 
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1125021-1130369
 
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1101014-1125018
-/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1167085-1173134
-/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1167104-1167229,1167232-1173133
+/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1167085-1173657
+/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1167104-1167229,1167232-1173617
 
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689
 
/cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1102511-1125020
 
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573

Propchange: 
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Sep 21 14:13:04 2011
@@ -4,8 +4,8 @@
 
/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1090934-1125013,1125019-1171737,1172026,1172591
 
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1125021-1130369
 
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1101014-1125018
-/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1167085-1173134
-/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1167104-1167229,1167232-1173133
+/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1167085-1173657
+/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1167104-1167229,1167232-1173617
 
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689
 
/cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1102511-1125020
 
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198

Modified: cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java?rev=1173658&r1=1173657&r2=1173658&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java Wed Sep 21 14:13:04 2011
@@ -98,6 +98,11 @@ public class DebuggableThreadPoolExecuto
         this.setRejectedExecutionHandler(blockingExecutionHandler);
     }
 
+    public static DebuggableThreadPoolExecutor createWithPoolSize(String 
threadPoolName, int size)
+    {
+        return new DebuggableThreadPoolExecutor(size, Integer.MAX_VALUE, 
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new 
NamedThreadFactory(threadPoolName));
+    }
+
     protected void onInitialRejection(Runnable task) {}
     protected void onFinalAccept(Runnable task) {}
     protected void onFinalRejection(Runnable task) {}

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1173658&r1=1173657&r2=1173658&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Wed Sep 21 14:13:04 2011
@@ -212,15 +212,8 @@ public class ColumnFamilyStore implement
         // scan for sstables corresponding to this cf and load them
         data = new DataTracker(this);
         Set<DecoratedKey> savedKeys = keyCache.readSaved();
-        List<SSTableReader> sstables = new ArrayList<SSTableReader>();
-        for (Map.Entry<Descriptor,Set<Component>> sstableFiles : 
files(table.name, columnFamilyName, false, false).entrySet())
-        {
-            SSTableReader reader = openSSTableReader(sstableFiles, savedKeys, 
data, metadata, partitioner);
-
-            if (reader != null) // if == null, logger errors where already 
fired
-                sstables.add(reader);
-        }
-        data.addSSTables(sstables);
+        Set<Map.Entry<Descriptor, Set<Component>>> entries = files(table.name, 
columnFamilyName, false, false).entrySet();
+        data.addSSTables(SSTableReader.batchOpen(entries, savedKeys, data, 
metadata, this.partitioner));
 
         // compaction strategy should be created after the CFS has been 
prepared
         this.compactionStrategy = 
metadata.createCompactionStrategyInstance(this);
@@ -541,10 +534,15 @@ public class ColumnFamilyStore implement
                                                          descriptor));
 
             logger.info("Initializing new SSTable {}", rawSSTable);
-            reader = openSSTableReader(rawSSTable, savedKeys, data, metadata, 
partitioner);
-
-            if (reader == null)
-                continue; // something wrong with SSTable, skipping
+            try
+            {
+                reader = SSTableReader.open(rawSSTable.getKey(), 
rawSSTable.getValue(), savedKeys, data, metadata, partitioner);
+            }
+            catch (IOException e)
+            {
+                SSTableReader.logOpenException(rawSSTable.getKey(), e);
+                continue;
+            }
 
             sstables.add(reader);
 
@@ -1892,30 +1890,6 @@ public class ColumnFamilyStore implement
        return indexManager.getBuiltIndexes();
     }
 
-    private static SSTableReader openSSTableReader(Map.Entry<Descriptor, 
Set<Component>> rawSSTable,
-                                                   Set<DecoratedKey> savedKeys,
-                                                   DataTracker tracker,
-                                                   CFMetaData metadata,
-                                                   IPartitioner partitioner)
-    {
-        SSTableReader reader = null;
-
-        try
-        {
-            reader = SSTableReader.open(rawSSTable.getKey(), 
rawSSTable.getValue(), savedKeys, tracker, metadata, partitioner);
-        }
-        catch (FileNotFoundException ex)
-        {
-            logger.error("Missing sstable component in " + rawSSTable + "; 
skipped because of " + ex.getMessage());
-        }
-        catch (IOException ex)
-        {
-            logger.error("Corrupt sstable " + rawSSTable + "; skipped", ex);
-        }
-
-        return reader;
-    }
-
     public int getUnleveledSSTables()
     {
         return this.compactionStrategy instanceof LeveledCompactionStrategy

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java?rev=1173658&r1=1173657&r2=1173658&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java Wed Sep 21 14:13:04 2011
@@ -24,9 +24,12 @@ import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.*;
 
 import com.google.common.base.Function;
 import com.google.common.collect.Collections2;
+
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.io.compress.CompressedRandomAccessReader;
 import org.slf4j.Logger;
@@ -164,6 +167,59 @@ public class SSTableReader extends SSTab
         return sstable;
     }
 
+    public static void logOpenException(Descriptor descriptor, IOException e)
+    {
+        if (e instanceof FileNotFoundException)
+            logger.error("Missing sstable component in " + descriptor + "; 
skipped because of " + e.getMessage());
+        else
+            logger.error("Corrupt sstable " + descriptor + "; skipped", e);
+    }
+
+    public static Collection<SSTableReader> 
batchOpen(Set<Map.Entry<Descriptor, Set<Component>>> entries,
+                                                      final Set<DecoratedKey> 
savedKeys,
+                                                      final DataTracker 
tracker,
+                                                      final CFMetaData 
metadata,
+                                                      final IPartitioner 
partitioner)
+    {
+        final Collection<SSTableReader> sstables = new 
LinkedBlockingQueue<SSTableReader>();
+
+        ExecutorService executor = 
DebuggableThreadPoolExecutor.createWithPoolSize("SSTableBatchOpen", 
Runtime.getRuntime().availableProcessors());
+        for (final Map.Entry<Descriptor, Set<Component>> entry : entries)
+        {
+            Runnable runnable = new Runnable()
+            {
+                public void run()
+                {
+                    SSTableReader sstable;
+                    try
+                    {
+                        sstable = open(entry.getKey(), entry.getValue(), 
savedKeys, tracker, metadata, partitioner);
+                    }
+                    catch (IOException ex)
+                    {
+                        logger.error("Corrupt sstable " + entry + "; skipped", 
ex);
+                        return;
+                    }
+                    sstables.add(sstable);
+                }
+            };
+            executor.submit(runnable);
+        }
+
+        executor.shutdown();
+        try
+        {
+            executor.awaitTermination(7, TimeUnit.DAYS);
+        }
+        catch (InterruptedException e)
+        {
+            throw new AssertionError(e);
+        }
+
+        return sstables;
+
+    }
+
     /**
      * Open a RowIndexedReader which already has its state initialized (by 
SSTableWriter).
      */


Reply via email to