Author: jbellis
Date: Wed Sep 22 03:42:00 2010
New Revision: 999743

URL: http://svn.apache.org/viewvc?rev=999743&view=rev
Log:
remove IKeyIterator and move ICompactionInfo implementation into Table.IndexBuilder
patch by jbellis; reviewed by gdusbabek for CASSANDRA-1415


Removed:
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IKeyIterator.java
Modified:
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
    cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=999743&r1=999742&r2=999743&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Wed 
Sep 22 03:42:00 2010
@@ -221,7 +221,8 @@ public class ColumnFamilyStore implement
     public void buildSecondaryIndexes(Collection<SSTableReader> sstables, 
SortedSet<byte[]> columns)
     {
         logger.debug("Submitting index build to compactionmanager");
-        Future future = CompactionManager.instance.submitIndexBuild(this, 
columns, new ReducingKeyIterator(sstables));
+        Table.IndexBuilder builder = 
Table.open(table).createIndexBuilder(this, columns, new 
ReducingKeyIterator(sstables));
+        Future future = CompactionManager.instance.submitIndexBuild(this, 
builder);
         try
         {
             future.get();

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java?rev=999743&r1=999742&r2=999743&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java Wed 
Sep 22 03:42:00 2010
@@ -495,14 +495,14 @@ public class CompactionManager implement
         return tablePairs;
     }
 
-    public Future submitIndexBuild(final ColumnFamilyStore cfs, final 
SortedSet<byte[]> columns, final IKeyIterator iter)
+    public Future submitIndexBuild(final ColumnFamilyStore cfs, final 
Table.IndexBuilder builder)
     {
         Runnable runnable = new Runnable()
         {
             public void run()
             {
-                executor.beginCompaction(cfs, iter);
-                Table.open(cfs.table).rebuildIndex(cfs, columns, iter);
+                executor.beginCompaction(cfs, builder);
+                builder.build();
             }
         };
         return executor.submit(runnable);

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=999743&r1=999742&r2=999743&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Wed Sep 22 
03:42:00 2010
@@ -33,7 +33,8 @@ import org.apache.cassandra.config.*;
 import org.apache.cassandra.db.clock.AbstractReconciler;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.dht.LocalToken;
-import org.apache.cassandra.io.sstable.IKeyIterator;
+import org.apache.cassandra.io.ICompactionInfo;
+import org.apache.cassandra.io.sstable.ReducingKeyIterator;
 import org.apache.cassandra.io.sstable.SSTableDeletingReference;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.util.FileUtils;
@@ -443,39 +444,73 @@ public class Table
         }
     }
 
-    public void rebuildIndex(ColumnFamilyStore cfs, SortedSet<byte[]> columns, 
IKeyIterator iter)
+    public IndexBuilder createIndexBuilder(ColumnFamilyStore cfs, 
SortedSet<byte[]> columns, ReducingKeyIterator iter)
     {
-        while (iter.hasNext())
+        return new IndexBuilder(cfs, columns, iter);
+    }
+
+    public class IndexBuilder implements ICompactionInfo
+    {
+        private final ColumnFamilyStore cfs;
+        private final SortedSet<byte[]> columns;
+        private final ReducingKeyIterator iter;
+
+        public IndexBuilder(ColumnFamilyStore cfs, SortedSet<byte[]> columns, 
ReducingKeyIterator iter)
         {
-            DecoratedKey key = iter.next();
-            logger.debug("Indexing row {} ", key);
-            HashMap<ColumnFamilyStore,Memtable> memtablesToFlush = new 
HashMap<ColumnFamilyStore, Memtable>(2);
-            flusherLock.readLock().lock();
-            try
+            this.cfs = cfs;
+            this.columns = columns;
+            this.iter = iter;
+        }
+
+        public void build()
+        {
+            while (iter.hasNext())
             {
-                synchronized (indexLockFor(key.key))
+                DecoratedKey key = iter.next();
+                logger.debug("Indexing row {} ", key);
+                HashMap<ColumnFamilyStore,Memtable> memtablesToFlush = new 
HashMap<ColumnFamilyStore, Memtable>(2);
+                flusherLock.readLock().lock();
+                try
                 {
-                    ColumnFamily cf = readCurrentIndexedColumns(key, cfs, 
columns);
-                    if (cf != null)
-                        applyIndexUpdates(key.key, memtablesToFlush, cf, cfs, 
cf.getColumnNames(), null);
+                    synchronized (indexLockFor(key.key))
+                    {
+                        ColumnFamily cf = readCurrentIndexedColumns(key, cfs, 
columns);
+                        if (cf != null)
+                            applyIndexUpdates(key.key, memtablesToFlush, cf, 
cfs, cf.getColumnNames(), null);
+                    }
                 }
+                finally
+                {
+                    flusherLock.readLock().unlock();
+                }
+
+                for (Map.Entry<ColumnFamilyStore, Memtable> entry : 
memtablesToFlush.entrySet())
+                    entry.getKey().maybeSwitchMemtable(entry.getValue(), 
false, null);
+            }
+
+            try
+            {
+                iter.close();
             }
-            finally
+            catch (IOException e)
             {
-                flusherLock.readLock().unlock();
+                throw new RuntimeException(e);
             }
+        }
 
-            for (Map.Entry<ColumnFamilyStore, Memtable> entry : 
memtablesToFlush.entrySet())
-                entry.getKey().maybeSwitchMemtable(entry.getValue(), false, 
null);
+        public long getTotalBytes()
+        {
+            return iter.getTotalBytes();
         }
 
-        try
+        public long getBytesRead()
         {
-            iter.close();
+            return iter.getBytesRead();
         }
-        catch (IOException e)
+
+        public String getTaskType()
         {
-            throw new RuntimeException(e);
+            return "Secondary index build";
         }
     }
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java?rev=999743&r1=999742&r2=999743&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java
 (original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java
 Wed Sep 22 03:42:00 2010
@@ -1,7 +1,9 @@
 package org.apache.cassandra.io.sstable;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.util.Collection;
+import java.util.Iterator;
 
 import org.apache.commons.collections.iterators.CollatingIterator;
 
@@ -9,7 +11,7 @@ import org.apache.cassandra.db.Decorated
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.ReducingIterator;
 
-public class ReducingKeyIterator implements IKeyIterator
+public class ReducingKeyIterator implements Iterator<DecoratedKey>, Closeable
 {
     private final CollatingIterator ci;
     private final ReducingIterator<DecoratedKey, DecoratedKey> iter;


Reply via email to