Author: jbellis
Date: Wed Dec  1 22:41:23 2010
New Revision: 1041200

URL: http://svn.apache.org/viewvc?rev=1041200&view=rev
Log:
avoid opening readers on anticompacted to-be-streamed temporary files
patch by thobbs; reviewed by mdennis and jbellis for CASSANDRA-1752

Modified:
    
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/CompactionManager.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/Table.java
    
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableWriter.java
    
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/AntiEntropyService.java
    
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamOut.java
    
cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
    
cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/io/StreamingTest.java

Modified: 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/CompactionManager.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/CompactionManager.java?rev=1041200&r1=1041199&r2=1041200&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/CompactionManager.java
 (original)
+++ 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/CompactionManager.java
 Wed Dec  1 22:41:23 2010
@@ -133,11 +133,11 @@ public class CompactionManager implement
         return executor.submit(runnable);
     }
 
-    public Future<List<SSTableReader>> submitAnticompaction(final 
ColumnFamilyStore cfStore, final Collection<Range> ranges, final InetAddress 
target)
+    public Future<List<String>> submitAnticompaction(final ColumnFamilyStore 
cfStore, final Collection<Range> ranges, final InetAddress target)
     {
-        Callable<List<SSTableReader>> callable = new 
Callable<List<SSTableReader>>()
+        Callable<List<String>> callable = new Callable<List<String>>()
         {
-            public List<SSTableReader> call() throws IOException
+            public List<String> call() throws IOException
             {
                 return doAntiCompaction(cfStore, cfStore.getSSTables(), 
ranges, target);
             }
@@ -320,18 +320,7 @@ public class CompactionManager implement
         return sstables.size();
     }
 
-    /**
-     * This function is used to do the anti compaction process , it spits out 
the file which has keys that belong to a given range
-     * If the target is not specified it spits out the file as a compacted 
file with the unecessary ranges wiped out.
-     *
-     * @param cfs
-     * @param sstables
-     * @param ranges
-     * @param target
-     * @return
-     * @throws java.io.IOException
-     */
-    private List<SSTableReader> doAntiCompaction(ColumnFamilyStore cfs, 
Collection<SSTableReader> sstables, Collection<Range> ranges, InetAddress 
target)
+    private SSTableWriter antiCompactionHelper(ColumnFamilyStore cfs, 
Collection<SSTableReader> sstables, Collection<Range> ranges, InetAddress 
target)
             throws IOException
     {
         Table table = cfs.getTable();
@@ -348,10 +337,9 @@ public class CompactionManager implement
             // compacting for streaming: send to subdirectory
             compactionFileLocation = compactionFileLocation + File.separator + 
DatabaseDescriptor.STREAMING_SUBDIR;
         }
-        List<SSTableReader> results = new ArrayList<SSTableReader>();
 
+        long totalKeysWritten = 0;
         long startTime = System.currentTimeMillis();
-        long totalkeysWritten = 0;
 
         int expectedBloomFilterSize = 
Math.max(DatabaseDescriptor.getIndexInterval(), 
(int)(SSTableReader.getApproximateKeyCount(sstables) / 2));
         if (logger.isDebugEnabled())
@@ -364,11 +352,6 @@ public class CompactionManager implement
 
         try
         {
-            if (!nni.hasNext())
-            {
-                return results;
-            }
-
             while (nni.hasNext())
             {
                 CompactionIterator.CompactedRow row = nni.next();
@@ -379,22 +362,61 @@ public class CompactionManager implement
                     writer = new SSTableWriter(newFilename, 
expectedBloomFilterSize, StorageService.getPartitioner());
                 }
                 writer.append(row.key, row.buffer);
-                totalkeysWritten++;
+                totalKeysWritten++;
             }
         }
         finally
         {
             ci.close();
         }
+        if (writer != null) {
+            List<String> filenames = writer.getAllFilenames();
+            String format = "AntiCompacted to %s.  %d/%d bytes for %d keys.  
Time: %dms.";
+            long dTime = System.currentTimeMillis() - startTime;
+            long length = new File(filenames.get(filenames.size() 
-1)).length(); // Data file is last in the list
+            logger.info(String.format(format, writer.getFilename(), 
SSTable.getTotalBytes(sstables), length, totalKeysWritten, dTime));
+        }
+        return writer;
+    }
 
+    /**
+     * This function is used to do the anti compaction process.  It spits out 
a file which has keys
+     * that belong to a given range. If the target is not specified it spits 
out the file as a compacted file with the
+     * unnecessary ranges wiped out.
+     *
+     * @param cfs
+     * @param sstables
+     * @param ranges
+     * @param target
+     * @return
+     * @throws java.io.IOException
+     */
+    private List<String> doAntiCompaction(ColumnFamilyStore cfs, 
Collection<SSTableReader> sstables, Collection<Range> ranges, InetAddress 
target)
+            throws IOException
+    {
+        List<String> filenames = new ArrayList<String>(SSTable.FILES_ON_DISK);
+        SSTableWriter writer = antiCompactionHelper(cfs, sstables, ranges, 
target);
         if (writer != null)
         {
-            results.add(writer.closeAndOpenReader());
-            String format = "AntiCompacted to %s.  %d/%d bytes for %d keys.  
Time: %dms.";
-            long dTime = System.currentTimeMillis() - startTime;
-            logger.info(String.format(format, writer.getFilename(), 
SSTable.getTotalBytes(sstables), results.get(0).length(), totalkeysWritten, 
dTime));
+            writer.close();
+            filenames = writer.getAllFilenames();
         }
+        return filenames;
+    }
 
+    /**
+     * Like doAntiCompaction(), but returns a List of SSTableReaders instead 
of a list of filenames.
+     * @throws java.io.IOException
+     */
+    private List<SSTableReader> 
doAntiCompactionReturnReaders(ColumnFamilyStore cfs, Collection<SSTableReader> 
sstables, Collection<Range> ranges, InetAddress target)
+            throws IOException
+    {
+        List<SSTableReader> results = new ArrayList<SSTableReader>(1);
+        SSTableWriter writer = antiCompactionHelper(cfs, sstables, ranges, 
target);
+        if (writer != null)
+        {
+            results.add(writer.closeAndOpenReader());
+        }
         return results;
     }
 
@@ -407,7 +429,7 @@ public class CompactionManager implement
     private void doCleanupCompaction(ColumnFamilyStore cfs) throws IOException
     {
         Collection<SSTableReader> originalSSTables = cfs.getSSTables();
-        List<SSTableReader> sstables = doAntiCompaction(cfs, originalSSTables, 
StorageService.instance.getLocalRanges(cfs.getTable().name), null);
+        List<SSTableReader> sstables = doAntiCompactionReturnReaders(cfs, 
originalSSTables, StorageService.instance.getLocalRanges(cfs.getTable().name), 
null);
         if (!sstables.isEmpty())
         {
             cfs.replaceCompactedSSTables(originalSSTables, sstables);

Modified: 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/Table.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/Table.java?rev=1041200&r1=1041199&r2=1041200&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/Table.java 
(original)
+++ 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/Table.java 
Wed Dec  1 22:41:23 2010
@@ -282,9 +282,9 @@ public class Table 
      * do a complete compaction since we can figure out based on the ranges
      * whether the files need to be split.
     */
-    public List<SSTableReader> forceAntiCompaction(Collection<Range> ranges, 
InetAddress target)
+    public List<String> forceAntiCompaction(Collection<Range> ranges, 
InetAddress target)
     {
-        List<SSTableReader> allResults = new ArrayList<SSTableReader>();
+        List<String> allResults = new ArrayList<String>();
         Set<String> columnFamilies = tableMetadata.getColumnFamilies();
         for ( String columnFamily : columnFamilies )
         {

Modified: 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableWriter.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableWriter.java?rev=1041200&r1=1041199&r2=1041200&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableWriter.java
 (original)
+++ 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableWriter.java
 Wed Dec  1 22:41:23 2010
@@ -114,7 +114,7 @@ public class SSTableWriter extends SSTab
     /**
      * Renames temporary SSTable files to valid data, index, and bloom filter 
files
      */
-    public SSTableReader closeAndOpenReader() throws IOException
+    public void close() throws IOException
     {
         // bloom filter
         FileOutputStream fos = new FileOutputStream(filterFilename());
@@ -136,6 +136,14 @@ public class SSTableWriter extends SSTab
         path = rename(path); // important to do this last since index & filter 
file names are derived from it
 
         indexSummary.complete();
+    }
+    
+    /**
+     * Renames temporary SSTable files to valid data, index, and bloom filter 
files and returns an SSTableReader
+     */
+    public SSTableReader closeAndOpenReader() throws IOException
+    {
+        this.close();
         return new SSTableReader(path, partitioner, indexSummary, bf);
     }
 

Modified: 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=1041200&r1=1041199&r2=1041200&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/AntiEntropyService.java
 (original)
+++ 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/AntiEntropyService.java
 Wed Dec  1 22:41:23 2010
@@ -485,12 +485,12 @@ public class AntiEntropyService
             try
             {
                 List<Range> ranges = new ArrayList<Range>(differences);
-                final List<SSTableReader> sstables = 
CompactionManager.instance.submitAnticompaction(cfstore, ranges, remote).get();
+                final List<String> filenames = 
CompactionManager.instance.submitAnticompaction(cfstore, ranges, remote).get();
                 Future f = 
StageManager.getStage(StageManager.STREAM_STAGE).submit(new WrappedRunnable() 
                 {
                     protected void runMayThrow() throws Exception
                     {
-                        StreamOut.transferSSTables(remote, sstables, cf.left);
+                        StreamOut.transferSSTables(remote, filenames, cf.left);
                         StreamOutManager.remove(remote);
                     }
                 });

Modified: 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamOut.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamOut.java?rev=1041200&r1=1041199&r2=1041200&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamOut.java
 (original)
+++ 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamOut.java
 Wed Dec  1 22:41:23 2010
@@ -112,19 +112,16 @@ public class StreamOut
      * Transfers a group of sstables from a single table to the target endpoint
      * and then marks them as ready for local deletion.
      */
-    public static void transferSSTables(InetAddress target, 
List<SSTableReader> sstables, String table) throws IOException
+    public static void transferSSTables(InetAddress target, List<String> 
filenames, String table) throws IOException
     {
-        PendingFile[] pendingFiles = new PendingFile[SSTable.FILES_ON_DISK * 
sstables.size()];
+        PendingFile[] pendingFiles = new PendingFile[filenames.size()];
         int i = 0;
-        for (SSTableReader sstable : sstables)
+        for (String filename : filenames)
         {
-            for (String filename : sstable.getAllFilenames())
-            {
-                File file = new File(filename);
-                pendingFiles[i++] = new PendingFile(file.getAbsolutePath(), 
file.length(), table);
-            }
+            File file = new File(filename);
+            pendingFiles[i++] = new PendingFile(file.getAbsolutePath(), 
file.length(), table);
         }
-        logger.info("Stream context metadata " + 
StringUtils.join(pendingFiles, ", " + " " + sstables.size() + " sstables."));
+        logger.info("Stream context metadata " + 
StringUtils.join(pendingFiles, ", " + " " + filenames.size() + " sstables."));
         StreamOutManager.get(target).addFilesToStream(pendingFiles);
         StreamInitiateMessage biMessage = new 
StreamInitiateMessage(pendingFiles);
         Message message = 
StreamInitiateMessage.makeStreamInitiateMessage(biMessage);

Modified: 
cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java?rev=1041200&r1=1041199&r2=1041200&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
 (original)
+++ 
cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
 Wed Dec  1 22:41:23 2010
@@ -127,7 +127,7 @@ public class ColumnFamilyStoreTest exten
         Range r = new Range(partitioner.getToken("0"), 
partitioner.getToken("zzzzzzz"));
         ranges.add(r);
 
-        List<SSTableReader> fileList = 
CompactionManager.instance.submitAnticompaction(store, ranges, 
InetAddress.getByName("127.0.0.1")).get();
+        List<String> fileList = 
CompactionManager.instance.submitAnticompaction(store, ranges, 
InetAddress.getByName("127.0.0.1")).get();
         assert fileList.size() >= 1;
     }
 

Modified: 
cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/io/StreamingTest.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/io/StreamingTest.java?rev=1041200&r1=1041199&r2=1041200&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/io/StreamingTest.java
 (original)
+++ 
cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/io/StreamingTest.java
 Wed Dec  1 22:41:23 2010
@@ -49,9 +49,10 @@ public class StreamingTest extends Clean
         SSTableReader sstable = SSTableUtils.writeSSTable(content);
         String tablename = sstable.getTableName();
         String cfname = sstable.getColumnFamilyName();
+        List<String> filenames = sstable.getAllFilenames();
 
         // transfer
-        StreamOut.transferSSTables(LOCAL, Arrays.asList(sstable), tablename);
+        StreamOut.transferSSTables(LOCAL, filenames, tablename);
 
         // confirm that the SSTable was transferred and registered
         ColumnFamilyStore cfstore = 
Table.open(tablename).getColumnFamilyStore(cfname);


Reply via email to