Author: jbellis Date: Wed Dec 1 22:41:23 2010 New Revision: 1041200 URL: http://svn.apache.org/viewvc?rev=1041200&view=rev Log: avoid opening readers on anticompacted to-be-streamed temporary files patch by thobbs; reviewed by mdennis and jbellis for CASSANDRA-1752
Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/CompactionManager.java cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/Table.java cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableWriter.java cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/AntiEntropyService.java cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamOut.java cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/io/StreamingTest.java Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/CompactionManager.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/CompactionManager.java?rev=1041200&r1=1041199&r2=1041200&view=diff ============================================================================== --- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/CompactionManager.java (original) +++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/CompactionManager.java Wed Dec 1 22:41:23 2010 @@ -133,11 +133,11 @@ public class CompactionManager implement return executor.submit(runnable); } - public Future<List<SSTableReader>> submitAnticompaction(final ColumnFamilyStore cfStore, final Collection<Range> ranges, final InetAddress target) + public Future<List<String>> submitAnticompaction(final ColumnFamilyStore cfStore, final Collection<Range> ranges, final InetAddress target) { - Callable<List<SSTableReader>> callable = new Callable<List<SSTableReader>>() + Callable<List<String>> callable = new Callable<List<String>>() { - public List<SSTableReader> call() throws IOException + public List<String> call() throws IOException { return doAntiCompaction(cfStore, cfStore.getSSTables(), ranges, target); } @@ -320,18 +320,7 @@ public class CompactionManager implement return sstables.size(); } - /** - * This function is used to do the anti compaction process , it spits out the file which has keys that belong to a given range - * If the target is not specified it spits out the file as a compacted file with the unecessary ranges wiped out. - * - * @param cfs - * @param sstables - * @param ranges - * @param target - * @return - * @throws java.io.IOException - */ - private List<SSTableReader> doAntiCompaction(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, Collection<Range> ranges, InetAddress target) + private SSTableWriter antiCompactionHelper(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, Collection<Range> ranges, InetAddress target) throws IOException { Table table = cfs.getTable(); @@ -348,10 +337,9 @@ public class CompactionManager implement // compacting for streaming: send to subdirectory compactionFileLocation = compactionFileLocation + File.separator + DatabaseDescriptor.STREAMING_SUBDIR; } - List<SSTableReader> results = new ArrayList<SSTableReader>(); + long totalKeysWritten = 0; long startTime = System.currentTimeMillis(); - long totalkeysWritten = 0; int expectedBloomFilterSize = Math.max(DatabaseDescriptor.getIndexInterval(), (int)(SSTableReader.getApproximateKeyCount(sstables) / 2)); if (logger.isDebugEnabled()) @@ -364,11 +352,6 @@ public class CompactionManager implement try { - if (!nni.hasNext()) - { - return results; - } - while (nni.hasNext()) { CompactionIterator.CompactedRow row = nni.next(); @@ -379,22 +362,61 @@ public class CompactionManager implement writer = new SSTableWriter(newFilename, expectedBloomFilterSize, StorageService.getPartitioner()); } writer.append(row.key, row.buffer); - totalkeysWritten++; + totalKeysWritten++; } } finally { ci.close(); } + if (writer != null) { + List<String> filenames = writer.getAllFilenames(); + String format = "AntiCompacted to %s. %d/%d bytes for %d keys. Time: %dms."; + long dTime = System.currentTimeMillis() - startTime; + long length = new File(filenames.get(filenames.size() -1)).length(); // Data file is last in the list + logger.info(String.format(format, writer.getFilename(), SSTable.getTotalBytes(sstables), length, totalKeysWritten, dTime)); + } + return writer; + } + /** + * This function is used to do the anti compaction process. It spits out a file which has keys + * that belong to a given range. If the target is not specified it spits out the file as a compacted file with the + * unnecessary ranges wiped out. + * + * @param cfs + * @param sstables + * @param ranges + * @param target + * @return + * @throws java.io.IOException + */ + private List<String> doAntiCompaction(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, Collection<Range> ranges, InetAddress target) + throws IOException + { + List<String> filenames = new ArrayList<String>(SSTable.FILES_ON_DISK); + SSTableWriter writer = antiCompactionHelper(cfs, sstables, ranges, target); if (writer != null) { - results.add(writer.closeAndOpenReader()); - String format = "AntiCompacted to %s. %d/%d bytes for %d keys. Time: %dms."; - long dTime = System.currentTimeMillis() - startTime; - logger.info(String.format(format, writer.getFilename(), SSTable.getTotalBytes(sstables), results.get(0).length(), totalkeysWritten, dTime)); + writer.close(); + filenames = writer.getAllFilenames(); } + return filenames; + } + /** + * Like doAntiCompaction(), but returns an List of SSTableReaders instead of a list of filenames. + * @throws java.io.IOException + */ + private List<SSTableReader> doAntiCompactionReturnReaders(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, Collection<Range> ranges, InetAddress target) + throws IOException + { + List<SSTableReader> results = new ArrayList<SSTableReader>(1); + SSTableWriter writer = antiCompactionHelper(cfs, sstables, ranges, target); + if (writer != null) + { + results.add(writer.closeAndOpenReader()); + } return results; } @@ -407,7 +429,7 @@ public class CompactionManager implement private void doCleanupCompaction(ColumnFamilyStore cfs) throws IOException { Collection<SSTableReader> originalSSTables = cfs.getSSTables(); - List<SSTableReader> sstables = doAntiCompaction(cfs, originalSSTables, StorageService.instance.getLocalRanges(cfs.getTable().name), null); + List<SSTableReader> sstables = doAntiCompactionReturnReaders(cfs, originalSSTables, StorageService.instance.getLocalRanges(cfs.getTable().name), null); if (!sstables.isEmpty()) { cfs.replaceCompactedSSTables(originalSSTables, sstables); Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/Table.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/Table.java?rev=1041200&r1=1041199&r2=1041200&view=diff ============================================================================== --- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/Table.java (original) +++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/Table.java Wed Dec 1 22:41:23 2010 @@ -282,9 +282,9 @@ public class Table * do a complete compaction since we can figure out based on the ranges * whether the files need to be split. */ - public List<SSTableReader> forceAntiCompaction(Collection<Range> ranges, InetAddress target) + public List<String> forceAntiCompaction(Collection<Range> ranges, InetAddress target) { - List<SSTableReader> allResults = new ArrayList<SSTableReader>(); + List<String> allResults = new ArrayList<String>(); Set<String> columnFamilies = tableMetadata.getColumnFamilies(); for ( String columnFamily : columnFamilies ) { Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableWriter.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableWriter.java?rev=1041200&r1=1041199&r2=1041200&view=diff ============================================================================== --- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableWriter.java (original) +++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableWriter.java Wed Dec 1 22:41:23 2010 @@ -114,7 +114,7 @@ public class SSTableWriter extends SSTab /** * Renames temporary SSTable files to valid data, index, and bloom filter files */ - public SSTableReader closeAndOpenReader() throws IOException + public void close() throws IOException { // bloom filter FileOutputStream fos = new FileOutputStream(filterFilename()); @@ -136,6 +136,14 @@ public class SSTableWriter extends SSTab path = rename(path); // important to do this last since index & filter file names are derived from it indexSummary.complete(); + } + + /** + * Renames temporary SSTable files to valid data, index, and bloom filter files and returns an SSTableReader + */ + public SSTableReader closeAndOpenReader() throws IOException + { + this.close(); return new SSTableReader(path, partitioner, indexSummary, bf); } Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/AntiEntropyService.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=1041200&r1=1041199&r2=1041200&view=diff ============================================================================== --- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/AntiEntropyService.java (original) +++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/AntiEntropyService.java Wed Dec 1 22:41:23 2010 @@ -485,12 +485,12 @@ public class AntiEntropyService try { List<Range> ranges = new ArrayList<Range>(differences); - final List<SSTableReader> sstables = CompactionManager.instance.submitAnticompaction(cfstore, ranges, remote).get(); + final List<String> filenames = CompactionManager.instance.submitAnticompaction(cfstore, ranges, remote).get(); Future f = StageManager.getStage(StageManager.STREAM_STAGE).submit(new WrappedRunnable() { protected void runMayThrow() throws Exception { - StreamOut.transferSSTables(remote, sstables, cf.left); + StreamOut.transferSSTables(remote, filenames, cf.left); StreamOutManager.remove(remote); } }); Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamOut.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamOut.java?rev=1041200&r1=1041199&r2=1041200&view=diff ============================================================================== --- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamOut.java (original) +++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamOut.java Wed Dec 1 22:41:23 2010 @@ -112,19 +112,16 @@ public class StreamOut * Transfers a group of sstables from a single table to the target endpoint * and then marks them as ready for local deletion. */ - public static void transferSSTables(InetAddress target, List<SSTableReader> sstables, String table) throws IOException + public static void transferSSTables(InetAddress target, List<String> filenames, String table) throws IOException { - PendingFile[] pendingFiles = new PendingFile[SSTable.FILES_ON_DISK * sstables.size()]; + PendingFile[] pendingFiles = new PendingFile[filenames.size()]; int i = 0; - for (SSTableReader sstable : sstables) + for (String filename : filenames) { - for (String filename : sstable.getAllFilenames()) - { - File file = new File(filename); - pendingFiles[i++] = new PendingFile(file.getAbsolutePath(), file.length(), table); - } + File file = new File(filename); + pendingFiles[i++] = new PendingFile(file.getAbsolutePath(), file.length(), table); } - logger.info("Stream context metadata " + StringUtils.join(pendingFiles, ", " + " " + sstables.size() + " sstables.")); + logger.info("Stream context metadata " + StringUtils.join(pendingFiles, ", " + " " + filenames.size() + " sstables.")); StreamOutManager.get(target).addFilesToStream(pendingFiles); StreamInitiateMessage biMessage = new StreamInitiateMessage(pendingFiles); Message message = StreamInitiateMessage.makeStreamInitiateMessage(biMessage); Modified: cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java?rev=1041200&r1=1041199&r2=1041200&view=diff ============================================================================== --- cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java (original) +++ cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java Wed Dec 1 22:41:23 2010 @@ -127,7 +127,7 @@ public class ColumnFamilyStoreTest exten Range r = new Range(partitioner.getToken("0"), partitioner.getToken("zzzzzzz")); ranges.add(r); - List<SSTableReader> fileList = CompactionManager.instance.submitAnticompaction(store, ranges, InetAddress.getByName("127.0.0.1")).get(); + List<String> fileList = CompactionManager.instance.submitAnticompaction(store, ranges, InetAddress.getByName("127.0.0.1")).get(); assert fileList.size() >= 1; } Modified: cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/io/StreamingTest.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/io/StreamingTest.java?rev=1041200&r1=1041199&r2=1041200&view=diff ============================================================================== --- cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/io/StreamingTest.java (original) +++ cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/io/StreamingTest.java Wed Dec 1 22:41:23 2010 @@ -49,9 +49,10 @@ public class StreamingTest extends Clean SSTableReader sstable = SSTableUtils.writeSSTable(content); String tablename = sstable.getTableName(); String cfname = sstable.getColumnFamilyName(); + List<String> filenames = sstable.getAllFilenames(); // transfer - StreamOut.transferSSTables(LOCAL, Arrays.asList(sstable), tablename); + StreamOut.transferSSTables(LOCAL, filenames, tablename); // confirm that the SSTable was transferred and registered ColumnFamilyStore cfstore = Table.open(tablename).getColumnFamilyStore(cfname);