Author: jbellis Date: Sun Aug 7 01:51:44 2011 New Revision: 1154635 URL: http://svn.apache.org/viewvc?rev=1154635&view=rev Log: refactor CompactionIterator -> CompactionIterable patch by jbellis; reviewed by slebresne for CASSANDRA-2901
Added: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java - copied, changed from r1154426, cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java Removed: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java Copied: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java (from r1154426, cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java) URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java?p2=cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java&p1=cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java&r1=1154426&r2=1154635&rev=1154635&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java Sun Aug 7 01:51:44 2011 @@ -38,15 +38,16 @@ import org.apache.cassandra.service.Stor import org.apache.cassandra.utils.CloseableIterator; import org.apache.cassandra.utils.MergeIterator; -public class CompactionIterator -implements CloseableIterator<AbstractCompactedRow>, CompactionInfo.Holder +public class CompactionIterable +implements Iterable<AbstractCompactedRow>, CompactionInfo.Holder { - private static Logger logger = LoggerFactory.getLogger(CompactionIterator.class); + private static Logger logger = LoggerFactory.getLogger(CompactionIterable.class); public static final int FILE_BUFFER_SIZE = 1024 * 1024; - private final MergeIterator<IColumnIterator, AbstractCompactedRow> source; + private MergeIterator<IColumnIterator, AbstractCompactedRow> source; protected final CompactionType type; + private final List<SSTableScanner> scanners; protected final CompactionController controller; private long totalBytes; @@ -61,16 +62,16 @@ implements CloseableIterator<AbstractCom // current target bytes to compact per millisecond private int targetBytesPerMS = -1; - public CompactionIterator(CompactionType type, Iterable<SSTableReader> sstables, CompactionController controller) throws IOException + public CompactionIterable(CompactionType type, Iterable<SSTableReader> sstables, CompactionController controller) throws IOException { this(type, getScanners(sstables), controller); } - protected CompactionIterator(CompactionType type, List<SSTableScanner> scanners, CompactionController controller) + protected CompactionIterable(CompactionType type, List<SSTableScanner> scanners, CompactionController controller) { this.type = type; + this.scanners = scanners; this.controller = controller; - this.source = MergeIterator.get(scanners, ICOMP, new Reducer()); row = 0; totalBytes = bytesRead = 0; for (SSTableScanner scanner : scanners) @@ -94,20 +95,9 @@ implements CloseableIterator<AbstractCom totalBytes); } - - public boolean hasNext() - { - return source.hasNext(); - } - - public AbstractCompactedRow next() - { - return source.next(); - } - - public void remove() + public CloseableIterator<AbstractCompactedRow> iterator() { - throw new UnsupportedOperationException(); + return MergeIterator.get(scanners, ICOMP, new Reducer()); } private void throttle() @@ -151,16 +141,6 @@ implements CloseableIterator<AbstractCom timeAtLastDelay = System.currentTimeMillis(); } - public void close() throws IOException - { - source.close(); - } - - protected Iterable<SSTableScanner> getScanners() - { - return (Iterable<SSTableScanner>)(source.iterators()); - } - public String toString() { return this.getCompactionInfo().toString(); @@ -201,7 +181,7 @@ implements CloseableIterator<AbstractCom if ((row++ % 1000) == 0) { bytesRead = 0; - for (SSTableScanner scanner : getScanners()) + for (SSTableScanner scanner : scanners) { bytesRead += scanner.getFilePointer(); } Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java?rev=1154635&r1=1154634&r2=1154635&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java Sun Aug 7 01:51:44 2011 @@ -687,7 +687,7 @@ public class CompactionManager implement if (compactionFileLocation == null) throw new IOException("disk full"); - SSTableScanner scanner = sstable.getDirectScanner(CompactionIterator.FILE_BUFFER_SIZE); + SSTableScanner scanner = sstable.getDirectScanner(CompactionIterable.FILE_BUFFER_SIZE); SortedSet<ByteBuffer> indexedColumns = cfs.getIndexedColumns(); CleanupInfo ci = new CleanupInfo(sstable, scanner); executor.beginCompaction(ci); @@ -795,11 +795,12 @@ public class CompactionManager implement } Collection<SSTableReader> sstables = cfs.markCurrentSSTablesReferenced(); - CompactionIterator ci = new ValidationCompactionIterator(cfs, sstables, validator.request.range); + CompactionIterable ci = new ValidationCompactionIterable(cfs, sstables, validator.request.range); + CloseableIterator<AbstractCompactedRow> iter = ci.iterator(); validationExecutor.beginCompaction(ci); try { - Iterator<AbstractCompactedRow> nni = Iterators.filter(ci, Predicates.notNull()); + Iterator<AbstractCompactedRow> nni = Iterators.filter(iter, Predicates.notNull()); // validate the CF as we iterate over it validator.prepare(cfs); @@ -813,7 +814,7 @@ public class CompactionManager implement finally { SSTableReader.releaseReferences(sstables); - ci.close(); + iter.close(); validationExecutor.finishCompaction(ci); } } @@ -922,9 +923,9 @@ public class CompactionManager implement : (int) (System.currentTimeMillis() / 1000) - cfs.metadata.getGcGraceSeconds(); } - private static class ValidationCompactionIterator extends CompactionIterator + private static class ValidationCompactionIterable extends CompactionIterable { - public ValidationCompactionIterator(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, Range range) throws IOException + public ValidationCompactionIterable(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, Range range) throws IOException { super(CompactionType.VALIDATION, getScanners(sstables, range), Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java?rev=1154635&r1=1154634&r2=1154635&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java Sun Aug 7 01:51:44 2011 @@ -41,6 +41,7 @@ import org.apache.cassandra.db.compactio import org.apache.cassandra.io.sstable.SSTable; import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.io.sstable.SSTableWriter; +import org.apache.cassandra.utils.CloseableIterator; public class CompactionTask extends AbstractCompactionTask { @@ -129,8 +130,9 @@ public class CompactionTask extends Abst SSTableWriter writer = null; final SSTableReader ssTable; - CompactionIterator ci = new CompactionIterator(type, toCompact, controller); // retain a handle so we can call close() - Iterator<AbstractCompactedRow> nni = Iterators.filter(ci, Predicates.notNull()); + CompactionIterable ci = new CompactionIterable(type, toCompact, controller); // retain a handle so we can call close() + CloseableIterator<AbstractCompactedRow> iter = ci.iterator(); + Iterator<AbstractCompactedRow> nni = Iterators.filter(iter, Predicates.notNull()); Map<DecoratedKey, Long> cachedKeys = new HashMap<DecoratedKey, Long>(); if (collector != null) @@ -169,7 +171,7 @@ public class CompactionTask extends Abst } finally { - ci.close(); + iter.close(); if (collector != null) collector.finishCompaction(ci); if (writer != null) Modified: cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java?rev=1154635&r1=1154634&r2=1154635&view=diff ============================================================================== --- cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java (original) +++ cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java Sun Aug 7 01:51:44 2011 @@ -28,6 +28,7 @@ import java.nio.ByteBuffer; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.Collection; +import java.util.Iterator; import java.util.List; import java.util.concurrent.ExecutionException; @@ -58,8 +59,8 @@ public class LazilyCompactedRowTest exte private void assertBytes(ColumnFamilyStore cfs, int gcBefore) throws IOException { Collection<SSTableReader> sstables = cfs.getSSTables(); - CompactionIterator ci1 = new CompactionIterator(CompactionType.UNKNOWN, sstables, new PreCompactingController(cfs, sstables, gcBefore, false)); - CompactionIterator ci2 = new CompactionIterator(CompactionType.UNKNOWN, sstables, new LazilyCompactingController(cfs, sstables, gcBefore, false)); + Iterator<AbstractCompactedRow> ci1 = new CompactionIterable(CompactionType.UNKNOWN, sstables, new PreCompactingController(cfs, sstables, gcBefore, false)).iterator(); + Iterator<AbstractCompactedRow> ci2 = new CompactionIterable(CompactionType.UNKNOWN, sstables, new LazilyCompactingController(cfs, sstables, gcBefore, false)).iterator(); while (true) { @@ -133,8 +134,8 @@ public class LazilyCompactedRowTest exte private void assertDigest(ColumnFamilyStore cfs, int gcBefore) throws IOException, NoSuchAlgorithmException { Collection<SSTableReader> sstables = cfs.getSSTables(); - CompactionIterator ci1 = new CompactionIterator(CompactionType.UNKNOWN, sstables, new PreCompactingController(cfs, sstables, gcBefore, false)); - CompactionIterator ci2 = new CompactionIterator(CompactionType.UNKNOWN, sstables, new LazilyCompactingController(cfs, sstables, gcBefore, false)); + Iterator<AbstractCompactedRow> ci1 = new CompactionIterable(CompactionType.UNKNOWN, sstables, new PreCompactingController(cfs, sstables, gcBefore, false)).iterator(); + Iterator<AbstractCompactedRow> ci2 = new CompactionIterable(CompactionType.UNKNOWN, sstables, new LazilyCompactingController(cfs, sstables, gcBefore, false)).iterator(); while (true) {