Author: jbellis Date: Wed Sep 22 03:42:00 2010 New Revision: 999743 URL: http://svn.apache.org/viewvc?rev=999743&view=rev Log: remove IKeyIterator and move ICompactionInfo implementation into Table.IndexBuilder. Patch by jbellis; reviewed by gdusbabek for CASSANDRA-1415.
Removed: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IKeyIterator.java Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java cassandra/trunk/src/java/org/apache/cassandra/db/Table.java cassandra/trunk/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=999743&r1=999742&r2=999743&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Wed Sep 22 03:42:00 2010 @@ -221,7 +221,8 @@ public class ColumnFamilyStore implement public void buildSecondaryIndexes(Collection<SSTableReader> sstables, SortedSet<byte[]> columns) { logger.debug("Submitting index build to compactionmanager"); - Future future = CompactionManager.instance.submitIndexBuild(this, columns, new ReducingKeyIterator(sstables)); + Table.IndexBuilder builder = Table.open(table).createIndexBuilder(this, columns, new ReducingKeyIterator(sstables)); + Future future = CompactionManager.instance.submitIndexBuild(this, builder); try { future.get(); Modified: cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java?rev=999743&r1=999742&r2=999743&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java Wed Sep 22 03:42:00 2010 @@ -495,14 +495,14 @@ public class CompactionManager implement return 
tablePairs; } - public Future submitIndexBuild(final ColumnFamilyStore cfs, final SortedSet<byte[]> columns, final IKeyIterator iter) + public Future submitIndexBuild(final ColumnFamilyStore cfs, final Table.IndexBuilder builder) { Runnable runnable = new Runnable() { public void run() { - executor.beginCompaction(cfs, iter); - Table.open(cfs.table).rebuildIndex(cfs, columns, iter); + executor.beginCompaction(cfs, builder); + builder.build(); } }; return executor.submit(runnable); Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Table.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=999743&r1=999742&r2=999743&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Wed Sep 22 03:42:00 2010 @@ -33,7 +33,8 @@ import org.apache.cassandra.config.*; import org.apache.cassandra.db.clock.AbstractReconciler; import org.apache.cassandra.db.commitlog.CommitLog; import org.apache.cassandra.dht.LocalToken; -import org.apache.cassandra.io.sstable.IKeyIterator; +import org.apache.cassandra.io.ICompactionInfo; +import org.apache.cassandra.io.sstable.ReducingKeyIterator; import org.apache.cassandra.io.sstable.SSTableDeletingReference; import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.io.util.FileUtils; @@ -443,39 +444,73 @@ public class Table } } - public void rebuildIndex(ColumnFamilyStore cfs, SortedSet<byte[]> columns, IKeyIterator iter) + public IndexBuilder createIndexBuilder(ColumnFamilyStore cfs, SortedSet<byte[]> columns, ReducingKeyIterator iter) { - while (iter.hasNext()) + return new IndexBuilder(cfs, columns, iter); + } + + public class IndexBuilder implements ICompactionInfo + { + private final ColumnFamilyStore cfs; + private final SortedSet<byte[]> columns; + private final ReducingKeyIterator iter; + + 
public IndexBuilder(ColumnFamilyStore cfs, SortedSet<byte[]> columns, ReducingKeyIterator iter) { - DecoratedKey key = iter.next(); - logger.debug("Indexing row {} ", key); - HashMap<ColumnFamilyStore,Memtable> memtablesToFlush = new HashMap<ColumnFamilyStore, Memtable>(2); - flusherLock.readLock().lock(); - try + this.cfs = cfs; + this.columns = columns; + this.iter = iter; + } + + public void build() + { + while (iter.hasNext()) { - synchronized (indexLockFor(key.key)) + DecoratedKey key = iter.next(); + logger.debug("Indexing row {} ", key); + HashMap<ColumnFamilyStore,Memtable> memtablesToFlush = new HashMap<ColumnFamilyStore, Memtable>(2); + flusherLock.readLock().lock(); + try { - ColumnFamily cf = readCurrentIndexedColumns(key, cfs, columns); - if (cf != null) - applyIndexUpdates(key.key, memtablesToFlush, cf, cfs, cf.getColumnNames(), null); + synchronized (indexLockFor(key.key)) + { + ColumnFamily cf = readCurrentIndexedColumns(key, cfs, columns); + if (cf != null) + applyIndexUpdates(key.key, memtablesToFlush, cf, cfs, cf.getColumnNames(), null); + } } + finally + { + flusherLock.readLock().unlock(); + } + + for (Map.Entry<ColumnFamilyStore, Memtable> entry : memtablesToFlush.entrySet()) + entry.getKey().maybeSwitchMemtable(entry.getValue(), false, null); + } + + try + { + iter.close(); } - finally + catch (IOException e) { - flusherLock.readLock().unlock(); + throw new RuntimeException(e); } + } - for (Map.Entry<ColumnFamilyStore, Memtable> entry : memtablesToFlush.entrySet()) - entry.getKey().maybeSwitchMemtable(entry.getValue(), false, null); + public long getTotalBytes() + { + return iter.getTotalBytes(); } - try + public long getBytesRead() { - iter.close(); + return iter.getBytesRead(); } - catch (IOException e) + + public String getTaskType() { - throw new RuntimeException(e); + return "Secondary index build"; } } Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java?rev=999743&r1=999742&r2=999743&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java Wed Sep 22 03:42:00 2010 @@ -1,7 +1,9 @@ package org.apache.cassandra.io.sstable; +import java.io.Closeable; import java.io.IOException; import java.util.Collection; +import java.util.Iterator; import org.apache.commons.collections.iterators.CollatingIterator; @@ -9,7 +11,7 @@ import org.apache.cassandra.db.Decorated import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.ReducingIterator; -public class ReducingKeyIterator implements IKeyIterator +public class ReducingKeyIterator implements Iterator<DecoratedKey>, Closeable { private final CollatingIterator ci; private final ReducingIterator<DecoratedKey, DecoratedKey> iter;