Author: jbellis
Date: Sun Aug  7 01:51:44 2011
New Revision: 1154635

URL: http://svn.apache.org/viewvc?rev=1154635&view=rev
Log:
refactor CompactionIterator -> CompactionIterable
patch by jbellis; reviewed by slebresne for CASSANDRA-2901

Added:
    
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
      - copied, changed from r1154426, 
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
Removed:
    
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
Modified:
    
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
    
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
    
cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java

Copied: 
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
 (from r1154426, 
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java)
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java?p2=cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java&p1=cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java&r1=1154426&r2=1154635&rev=1154635&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
 (original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
 Sun Aug  7 01:51:44 2011
@@ -38,15 +38,16 @@ import org.apache.cassandra.service.Stor
 import org.apache.cassandra.utils.CloseableIterator;
 import org.apache.cassandra.utils.MergeIterator;
 
-public class CompactionIterator
-implements CloseableIterator<AbstractCompactedRow>, CompactionInfo.Holder
+public class CompactionIterable
+implements Iterable<AbstractCompactedRow>, CompactionInfo.Holder
 {
-    private static Logger logger = 
LoggerFactory.getLogger(CompactionIterator.class);
+    private static Logger logger = 
LoggerFactory.getLogger(CompactionIterable.class);
 
     public static final int FILE_BUFFER_SIZE = 1024 * 1024;
 
-    private final MergeIterator<IColumnIterator, AbstractCompactedRow> source;
+    private MergeIterator<IColumnIterator, AbstractCompactedRow> source;
     protected final CompactionType type;
+    private final List<SSTableScanner> scanners;
     protected final CompactionController controller;
 
     private long totalBytes;
@@ -61,16 +62,16 @@ implements CloseableIterator<AbstractCom
     // current target bytes to compact per millisecond
     private int targetBytesPerMS = -1;
 
-    public CompactionIterator(CompactionType type, Iterable<SSTableReader> 
sstables, CompactionController controller) throws IOException
+    public CompactionIterable(CompactionType type, Iterable<SSTableReader> 
sstables, CompactionController controller) throws IOException
     {
         this(type, getScanners(sstables), controller);
     }
 
-    protected CompactionIterator(CompactionType type, List<SSTableScanner> 
scanners, CompactionController controller)
+    protected CompactionIterable(CompactionType type, List<SSTableScanner> 
scanners, CompactionController controller)
     {
         this.type = type;
+        this.scanners = scanners;
         this.controller = controller;
-        this.source = MergeIterator.get(scanners, ICOMP, new Reducer());
         row = 0;
         totalBytes = bytesRead = 0;
         for (SSTableScanner scanner : scanners)
@@ -94,20 +95,9 @@ implements CloseableIterator<AbstractCom
                                   totalBytes);
     }
 
-
-    public boolean hasNext()
-    {
-        return source.hasNext();
-    }
-
-    public AbstractCompactedRow next()
-    {
-        return source.next();
-    }
-
-    public void remove()
+    public CloseableIterator<AbstractCompactedRow> iterator()
     {
-        throw new UnsupportedOperationException();
+        return MergeIterator.get(scanners, ICOMP, new Reducer());
     }
 
     private void throttle()
@@ -151,16 +141,6 @@ implements CloseableIterator<AbstractCom
         timeAtLastDelay = System.currentTimeMillis();
     }
 
-    public void close() throws IOException
-    {
-        source.close();
-    }
-
-    protected Iterable<SSTableScanner> getScanners()
-    {
-        return (Iterable<SSTableScanner>)(source.iterators());
-    }
-
     public String toString()
     {
         return this.getCompactionInfo().toString();
@@ -201,7 +181,7 @@ implements CloseableIterator<AbstractCom
                 if ((row++ % 1000) == 0)
                 {
                     bytesRead = 0;
-                    for (SSTableScanner scanner : getScanners())
+                    for (SSTableScanner scanner : scanners)
                     {
                         bytesRead += scanner.getFilePointer();
                     }

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java?rev=1154635&r1=1154634&r2=1154635&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
 (original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
 Sun Aug  7 01:51:44 2011
@@ -687,7 +687,7 @@ public class CompactionManager implement
             if (compactionFileLocation == null)
                 throw new IOException("disk full");
 
-            SSTableScanner scanner = 
sstable.getDirectScanner(CompactionIterator.FILE_BUFFER_SIZE);
+            SSTableScanner scanner = 
sstable.getDirectScanner(CompactionIterable.FILE_BUFFER_SIZE);
             SortedSet<ByteBuffer> indexedColumns = cfs.getIndexedColumns();
             CleanupInfo ci = new CleanupInfo(sstable, scanner);
             executor.beginCompaction(ci);
@@ -795,11 +795,12 @@ public class CompactionManager implement
         }
 
         Collection<SSTableReader> sstables = 
cfs.markCurrentSSTablesReferenced();
-        CompactionIterator ci = new ValidationCompactionIterator(cfs, 
sstables, validator.request.range);
+        CompactionIterable ci = new ValidationCompactionIterable(cfs, 
sstables, validator.request.range);
+        CloseableIterator<AbstractCompactedRow> iter = ci.iterator();
         validationExecutor.beginCompaction(ci);
         try
         {
-            Iterator<AbstractCompactedRow> nni = Iterators.filter(ci, 
Predicates.notNull());
+            Iterator<AbstractCompactedRow> nni = Iterators.filter(iter, 
Predicates.notNull());
 
             // validate the CF as we iterate over it
             validator.prepare(cfs);
@@ -813,7 +814,7 @@ public class CompactionManager implement
         finally
         {
             SSTableReader.releaseReferences(sstables);
-            ci.close();
+            iter.close();
             validationExecutor.finishCompaction(ci);
         }
     }
@@ -922,9 +923,9 @@ public class CompactionManager implement
                : (int) (System.currentTimeMillis() / 1000) - 
cfs.metadata.getGcGraceSeconds();
     }
 
-    private static class ValidationCompactionIterator extends 
CompactionIterator
+    private static class ValidationCompactionIterable extends 
CompactionIterable
     {
-        public ValidationCompactionIterator(ColumnFamilyStore cfs, 
Collection<SSTableReader> sstables, Range range) throws IOException
+        public ValidationCompactionIterable(ColumnFamilyStore cfs, 
Collection<SSTableReader> sstables, Range range) throws IOException
         {
             super(CompactionType.VALIDATION,
                   getScanners(sstables, range),

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java?rev=1154635&r1=1154634&r2=1154635&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java 
(original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java 
Sun Aug  7 01:51:44 2011
@@ -41,6 +41,7 @@ import org.apache.cassandra.db.compactio
 import org.apache.cassandra.io.sstable.SSTable;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.sstable.SSTableWriter;
+import org.apache.cassandra.utils.CloseableIterator;
 
 public class CompactionTask extends AbstractCompactionTask
 {
@@ -129,8 +130,9 @@ public class CompactionTask extends Abst
 
         SSTableWriter writer = null;
         final SSTableReader ssTable;
-        CompactionIterator ci = new CompactionIterator(type, toCompact, 
controller); // retain a handle so we can call close()
-        Iterator<AbstractCompactedRow> nni = Iterators.filter(ci, 
Predicates.notNull());
+        CompactionIterable ci = new CompactionIterable(type, toCompact, 
controller); // retain a handle so we can call close()
+        CloseableIterator<AbstractCompactedRow> iter = ci.iterator();
+        Iterator<AbstractCompactedRow> nni = Iterators.filter(iter, 
Predicates.notNull());
         Map<DecoratedKey, Long> cachedKeys = new HashMap<DecoratedKey, Long>();
 
         if (collector != null)
@@ -169,7 +171,7 @@ public class CompactionTask extends Abst
         }
         finally
         {
-            ci.close();
+            iter.close();
             if (collector != null)
                 collector.finishCompaction(ci);
             if (writer != null)

Modified: 
cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java?rev=1154635&r1=1154634&r2=1154635&view=diff
==============================================================================
--- 
cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java 
(original)
+++ 
cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java 
Sun Aug  7 01:51:44 2011
@@ -28,6 +28,7 @@ import java.nio.ByteBuffer;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
 
@@ -58,8 +59,8 @@ public class LazilyCompactedRowTest exte
     private void assertBytes(ColumnFamilyStore cfs, int gcBefore) throws 
IOException
     {
         Collection<SSTableReader> sstables = cfs.getSSTables();
-        CompactionIterator ci1 = new 
CompactionIterator(CompactionType.UNKNOWN, sstables, new 
PreCompactingController(cfs, sstables, gcBefore, false));
-        CompactionIterator ci2 = new 
CompactionIterator(CompactionType.UNKNOWN, sstables, new 
LazilyCompactingController(cfs, sstables, gcBefore, false));
+        Iterator<AbstractCompactedRow> ci1 = new 
CompactionIterable(CompactionType.UNKNOWN, sstables, new 
PreCompactingController(cfs, sstables, gcBefore, false)).iterator();
+        Iterator<AbstractCompactedRow> ci2 = new 
CompactionIterable(CompactionType.UNKNOWN, sstables, new 
LazilyCompactingController(cfs, sstables, gcBefore, false)).iterator();
 
         while (true)
         {
@@ -133,8 +134,8 @@ public class LazilyCompactedRowTest exte
     private void assertDigest(ColumnFamilyStore cfs, int gcBefore) throws 
IOException, NoSuchAlgorithmException
     {
         Collection<SSTableReader> sstables = cfs.getSSTables();
-        CompactionIterator ci1 = new 
CompactionIterator(CompactionType.UNKNOWN, sstables, new 
PreCompactingController(cfs, sstables, gcBefore, false));
-        CompactionIterator ci2 = new 
CompactionIterator(CompactionType.UNKNOWN, sstables, new 
LazilyCompactingController(cfs, sstables, gcBefore, false));
+        Iterator<AbstractCompactedRow> ci1 = new 
CompactionIterable(CompactionType.UNKNOWN, sstables, new 
PreCompactingController(cfs, sstables, gcBefore, false)).iterator();
+        Iterator<AbstractCompactedRow> ci2 = new 
CompactionIterable(CompactionType.UNKNOWN, sstables, new 
LazilyCompactingController(cfs, sstables, gcBefore, false)).iterator();
 
         while (true)
         {


Reply via email to