Repository: cassandra

Updated Branches:
  refs/heads/trunk e60a06cc8 -> 0f59629ce
Make sure unfinished compaction files are removed.

Patch by marcuse; reviewed by yukim for CASSANDRA-8124


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9c316e78
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9c316e78
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9c316e78

Branch: refs/heads/trunk
Commit: 9c316e7858f6dbf9df892aff78431044aa104ed9
Parents: d230922
Author: Marcus Eriksson <marc...@apache.org>
Authored: Fri Oct 17 14:15:46 2014 +0200
Committer: Marcus Eriksson <marc...@apache.org>
Committed: Mon Nov 3 16:17:01 2014 +0100

----------------------------------------------------------------------
 CHANGES.txt                                   |   1 +
 .../cassandra/io/sstable/SSTableReader.java   |   6 +
 .../cassandra/io/sstable/SSTableRewriter.java |  90 +++-
 .../io/sstable/SSTableRewriterTest.java       | 473 +++++++++++++++++++
 4 files changed, 555 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c316e78/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 494fb93..681d616 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.2
+ * Make sure unfinished compaction files are removed (CASSANDRA-8124)
 * Fix shutdown when run as Windows service (CASSANDRA-8136)
 * Fix DESCRIBE TABLE with custom indexes (CASSANDRA-8031)
 * Fix race in RecoveryManagerTest (CASSANDRA-8176)


http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c316e78/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index 872f7df..40e708d 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -1599,6 +1599,12 @@ public class SSTableReader extends SSTable
         }
     }
 
+    @VisibleForTesting
+    int referenceCount()
+    {
+        return references.get();
+    }
+
     /**
      * Release reference to this SSTableReader.
      * If there is no one referring to this SSTable, and is marked as compacted,


http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c316e78/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
index 76677ac..2c9fe7e 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
@@ -25,8 +25,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.Functions;
+import com.google.common.collect.Lists;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
@@ -37,6 +39,7 @@ import org.apache.cassandra.db.compaction.AbstractCompactedRow;
 import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.utils.CLibrary;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
 
 /**
  * Wraps one or more writers as output for rewriting one or more readers: every sstable_preemptive_open_interval_in_mb
@@ -55,8 +58,7 @@ import org.apache.cassandra.utils.FBUtilities;
  */
 public class SSTableRewriter
 {
-
-    private static final long preemptiveOpenInterval;
+    private static long preemptiveOpenInterval;
     static
     {
         long interval = DatabaseDescriptor.getSSTablePreempiveOpenIntervalInMB() * (1L << 20);
@@ -65,6 +67,14 @@ public class SSTableRewriter
         preemptiveOpenInterval = interval;
     }
 
+    private boolean isFinished = false;
+
+    @VisibleForTesting
+    static void overrideOpenInterval(long size)
+    {
+        preemptiveOpenInterval = size;
+    }
+
     private final DataTracker dataTracker;
     private final ColumnFamilyStore cfs;
 
@@ -77,6 +87,8 @@
     private long currentlyOpenedEarlyAt; // the position (in MB) in the target file we last (re)opened at
 
     private final List<SSTableReader> finished = new ArrayList<>(); // the resultant sstables
+    private final List<SSTableReader> finishedOpenedEarly = new ArrayList<>(); // the 'finished' tmplink sstables
+    private final List<Pair<SSTableWriter, SSTableReader>> finishedWriters = new ArrayList<>();
     private final OperationType rewriteType; // the type of rewrite/compaction being performed
     private final boolean isOffline; // true for operations that are performed without Cassandra running (prevents updates of DataTracker)
 
@@ -187,20 +199,32 @@
     {
         if (writer == null)
             return;
+
+        switchWriter(null);
+
         moveStarts(null, Functions.forMap(originalStarts), true);
-        List<SSTableReader> close = new ArrayList<>(finished);
+
+        List<SSTableReader> close = Lists.newArrayList(finishedOpenedEarly);
         if (currentlyOpenedEarly != null)
             close.add(currentlyOpenedEarly);
+
+        for (Pair<SSTableWriter, SSTableReader> w : finishedWriters)
+        {
+            // we should close the bloom filter if we have not opened an sstable reader from this
+            // writer (it will get closed when we release the sstable reference below):
+            w.left.abort(w.right == null);
+        }
+
         // also remove already completed SSTables
         for (SSTableReader sstable : close)
            sstable.markObsolete();
+        // releases reference in replaceReaders
        if (!isOffline)
         {
             dataTracker.replaceReaders(close, Collections.<SSTableReader>emptyList(), false);
             dataTracker.unmarkCompacting(close);
         }
-        writer.abort(currentlyOpenedEarly == null);
     }
 
     /**
@@ -208,6 +232,11 @@
      * needed, and transferring any key cache entries over to the new reader, expiring them from the old. if reset
      * is true, we are instead restoring the starts of the readers from before the rewriting began
      *
+     * note that we replace an existing sstable with a new *instance* of the same sstable, the replacement
+     * sstable .equals() the old one, BUT, it is a new instance, so, for example, since we releaseReference() on the old
+     * one, the old *instance* will have reference count == 0 and if we were to start a new compaction with that old
+     * instance, we would get exceptions.
+     *
      * @param newReader the rewritten reader that replaces them for this region
      * @param newStarts a function mapping a reader's descriptor to their new start value
      * @param reset true iff we are restoring earlier starts (increasing the range over which they are valid)
@@ -284,11 +313,15 @@ public class SSTableRewriter
             writer = newWriter;
             return;
         }
-        // tmp = false because later we want to query it with descriptor from SSTableReader
-        SSTableReader reader = writer.closeAndOpenReader(maxAge);
-        finished.add(reader);
-        replaceReader(currentlyOpenedEarly, reader, false);
-        moveStarts(reader, Functions.constant(reader.last), false);
+        // we leave it as a tmp file, but we open it early and add it to the dataTracker
+        SSTableReader reader = writer.openEarly(maxAge);
+        if (reader != null)
+        {
+            finishedOpenedEarly.add(reader);
+            replaceReader(currentlyOpenedEarly, reader, false);
+            moveStarts(reader, Functions.constant(reader.last), false);
+        }
+        finishedWriters.add(Pair.create(writer, reader));
         currentlyOpenedEarly = null;
         currentlyOpenedEarlyAt = 0;
         writer = newWriter;
@@ -306,23 +339,48 @@ public class SSTableRewriter
     {
         finish(cleanupOldReaders, -1);
     }
+
+    /**
+     * Finishes the new file(s)
+     *
+     * Creates final files, adds the new files to the dataTracker (via replaceReader) but only marks the
+     * old files as compacted if cleanupOldReaders is set to true. Otherwise it is up to the caller to do those gymnastics
+     * (ie, call DataTracker#markCompactedSSTablesReplaced(..))
+     *
+     * @param cleanupOldReaders if we should replace the old files with the new ones
+     * @param repairedAt the repair time, -1 if we should use the time we supplied when we created
+     *                   the SSTableWriter (and called rewriter.switchWriter(..)), actual time if we want to override the
+     *                   repair time.
+     */
     public void finish(boolean cleanupOldReaders, long repairedAt)
     {
         if (writer.getFilePointer() > 0)
         {
-            SSTableReader reader = repairedAt < 0 ?
-                                   writer.closeAndOpenReader(maxAge) :
-                                   writer.closeAndOpenReader(maxAge, repairedAt);
+            SSTableReader reader = repairedAt < 0 ? writer.closeAndOpenReader(maxAge) : writer.closeAndOpenReader(maxAge, repairedAt);
             finished.add(reader);
             replaceReader(currentlyOpenedEarly, reader, false);
             moveStarts(reader, Functions.constant(reader.last), false);
         }
         else
         {
-            writer.abort();
-            writer = null;
+            writer.abort(true);
+        }
+        // make real sstables of the written ones:
+        for (Pair<SSTableWriter, SSTableReader> w : finishedWriters)
+        {
+            if (w.left.getFilePointer() > 0)
+            {
+                SSTableReader newReader = repairedAt < 0 ? w.left.closeAndOpenReader(maxAge) : w.left.closeAndOpenReader(maxAge, repairedAt);
+                finished.add(newReader);
+                // w.right is the tmplink-reader we added when switching writer, replace with the real sstable.
+                replaceReader(w.right, newReader, false);
+            }
+            else
+            {
+                assert w.right == null;
+                w.left.abort(true);
+            }
         }
-
         if (!isOffline)
         {
             dataTracker.unmarkCompacting(finished);
@@ -337,10 +395,12 @@ public class SSTableRewriter
                 reader.releaseReference();
             }
         }
+        isFinished = true;
     }
 
     public List<SSTableReader> finished()
     {
+        assert isFinished;
         return finished;
     }
 }


http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c316e78/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
new file mode 100644
index 0000000..8b203ac
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@ -0,0 +1,473 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+import com.google.common.collect.Sets;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.db.ArrayBackedSortedColumns;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.compaction.AbstractCompactedRow;
+import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
+import org.apache.cassandra.db.compaction.CompactionController;
+import org.apache.cassandra.db.compaction.ICompactionScanner;
+import org.apache.cassandra.db.compaction.LazilyCompactedRow;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class SSTableRewriterTest extends SchemaLoader
+{
+    private static final String KEYSPACE = "Keyspace1";
+    private static final String CF = "Standard1";
+
+    @Test
+    public void basicTest() throws InterruptedException
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+        cfs.truncateBlocking();
+        for (int j = 0; j < 100; j ++)
+        {
+            ByteBuffer key = ByteBufferUtil.bytes(String.valueOf(j));
+            Mutation rm = new Mutation(KEYSPACE, key);
+            rm.add(CF, Util.cellname("0"), ByteBufferUtil.EMPTY_BYTE_BUFFER, j);
+            rm.apply();
+        }
+        cfs.forceBlockingFlush();
+        Set<SSTableReader> sstables = new HashSet<>(cfs.getSSTables());
+        assertEquals(1, sstables.size());
+        SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, OperationType.COMPACTION, false);
+        AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables);
+        ICompactionScanner scanner = scanners.scanners.get(0);
+        CompactionController controller = new CompactionController(cfs, sstables, cfs.gcBefore(System.currentTimeMillis()));
+        writer.switchWriter(getWriter(cfs, sstables.iterator().next().descriptor.directory));
+        while(scanner.hasNext())
+        {
+            AbstractCompactedRow row = new LazilyCompactedRow(controller, Arrays.asList(scanner.next()));
+            writer.append(row);
+        }
+        writer.finish();
+
+        validateCFS(cfs);
+    }
+
+    @Test
+    public void testFileRemoval() throws InterruptedException
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+        cfs.truncateBlocking();
+        ArrayBackedSortedColumns cf = ArrayBackedSortedColumns.factory.create(cfs.metadata);
+        for (int i = 0; i < 1000; i++)
+            cf.addColumn(Util.column(String.valueOf(i), "a", 1));
+        File dir = cfs.directories.getDirectoryForNewSSTables();
+        SSTableWriter writer = getWriter(cfs, dir);
+
+        for (int i = 0; i < 500; i++)
+            writer.append(StorageService.getPartitioner().decorateKey(ByteBufferUtil.bytes(i)), cf);
+        SSTableReader s = writer.openEarly(1000);
+        assertFileCounts(dir.list(), 2, 3);
+        for (int i = 500; i < 1000; i++)
+            writer.append(StorageService.getPartitioner().decorateKey(ByteBufferUtil.bytes(i)), cf);
+        SSTableReader s2 = writer.openEarly(1000);
+        assertTrue(s != s2);
+        assertFileCounts(dir.list(), 2, 3);
+        s.markObsolete();
+        s.releaseReference();
+        Thread.sleep(1000);
+        assertFileCounts(dir.list(), 0, 3);
+        writer.abort(false);
+        Thread.sleep(1000);
+        assertFileCounts(dir.list(), 0, 0);
+        validateCFS(cfs);
+    }
+
+    @Test
+    public void testFileRemovalNoAbort() throws InterruptedException
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+        cfs.truncateBlocking();
+        ArrayBackedSortedColumns cf = ArrayBackedSortedColumns.factory.create(cfs.metadata);
+        for (int i = 0; i < 1000; i++)
+            cf.addColumn(Util.column(String.valueOf(i), "a", 1));
+        File dir = cfs.directories.getDirectoryForNewSSTables();
+        SSTableWriter writer = getWriter(cfs, dir);
+
+        for (int i = 0; i < 500; i++)
+            writer.append(StorageService.getPartitioner().decorateKey(ByteBufferUtil.bytes(i)), cf);
+        SSTableReader s = writer.openEarly(1000);
+        //assertFileCounts(dir.list(), 2, 3);
+        for (int i = 500; i < 1000; i++)
+            writer.append(StorageService.getPartitioner().decorateKey(ByteBufferUtil.bytes(i)), cf);
+        writer.closeAndOpenReader();
+        s.markObsolete();
+        s.releaseReference();
+        Thread.sleep(1000);
+        assertFileCounts(dir.list(), 0, 0);
+        validateCFS(cfs);
+    }
+
+    @Test
+    public void testNumberOfFiles() throws InterruptedException
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+        cfs.truncateBlocking();
+
+        SSTableReader s = writeFile(cfs, 1000);
+        cfs.addSSTable(s);
+
+        Set<SSTableReader> compacting = Sets.newHashSet(s);
+        SSTableRewriter.overrideOpenInterval(10000000);
+        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, OperationType.COMPACTION, false);
+        rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+
+        ICompactionScanner scanner = s.getScanner();
+        CompactionController controller = new CompactionController(cfs, compacting, 0);
+        int files = 1;
+        while(scanner.hasNext())
+        {
+            rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
+            if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
+            {
+                rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+                files++;
+                assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
+            }
+        }
+        rewriter.finish();
+        assertEquals(files, rewriter.finished().size());
+        assertEquals(files, cfs.getSSTables().size());
+        Thread.sleep(1000);
+        // tmplink and tmp files should be gone:
+        assertFileCounts(s.descriptor.directory.list(), 0, 0);
+        validateCFS(cfs);
+    }
+
+    @Test
+    public void testNumberOfFiles_dont_clean_readers() throws InterruptedException
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+        cfs.truncateBlocking();
+
+        SSTableReader s = writeFile(cfs, 1000);
+        cfs.addSSTable(s);
+
+        Set<SSTableReader> compacting = Sets.newHashSet(s);
+        SSTableRewriter.overrideOpenInterval(10000000);
+        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, OperationType.COMPACTION, false);
+        rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+
+        ICompactionScanner scanner = s.getScanner();
+        CompactionController controller = new CompactionController(cfs, compacting, 0);
+        int files = 1;
+        while(scanner.hasNext())
+        {
+            rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
+            if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
+            {
+                rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+                files++;
+                assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
+            }
+        }
+        rewriter.finish(false);
+        assertEquals(files, rewriter.finished().size());
+        assertEquals(files + 1, cfs.getSSTables().size());
+        cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, rewriter.finished(), OperationType.COMPACTION);
+        assertEquals(files, cfs.getSSTables().size());
+        Thread.sleep(1000);
+        assertFileCounts(s.descriptor.directory.list(), 0, 0);
+        validateCFS(cfs);
+    }
+
+    @Test
+    public void testNumberOfFiles_abort() throws InterruptedException
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+        cfs.truncateBlocking();
+
+        SSTableReader s = writeFile(cfs, 1000);
+        cfs.addSSTable(s);
+        DecoratedKey origFirst = s.first;
+        DecoratedKey origLast = s.last;
+        Set<SSTableReader> compacting = Sets.newHashSet(s);
+        SSTableRewriter.overrideOpenInterval(10000000);
+        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, OperationType.COMPACTION, false);
+        rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+
+        ICompactionScanner scanner = s.getScanner();
+        CompactionController controller = new CompactionController(cfs, compacting, 0);
+        int files = 1;
+        while(scanner.hasNext())
+        {
+            rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
+            if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
+            {
+                rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+                files++;
+                assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
+            }
+        }
+        rewriter.abort();
+        Thread.sleep(1000);
+        assertEquals(1, cfs.getSSTables().size());
+        assertFileCounts(s.descriptor.directory.list(), 0, 0);
+        assertEquals(cfs.getSSTables().iterator().next().first, origFirst);
+        assertEquals(cfs.getSSTables().iterator().next().last, origLast);
+        validateCFS(cfs);
+    }
+
+    @Test
+    public void testNumberOfFiles_abort2() throws InterruptedException
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+        cfs.truncateBlocking();
+
+        SSTableReader s = writeFile(cfs, 1000);
+        cfs.addSSTable(s);
+
+        DecoratedKey origFirst = s.first;
+        DecoratedKey origLast = s.last;
+        Set<SSTableReader> compacting = Sets.newHashSet(s);
+        SSTableRewriter.overrideOpenInterval(10000000);
+        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, OperationType.COMPACTION, false);
+        rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+
+        ICompactionScanner scanner = s.getScanner();
+        CompactionController controller = new CompactionController(cfs, compacting, 0);
+        int files = 1;
+        while(scanner.hasNext())
+        {
+            rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
+            if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
+            {
+                rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+                files++;
+                assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
+            }
+            if (files == 3)
+            {
+                //testing to abort when we have nothing written in the new file
+                rewriter.abort();
+                break;
+            }
+        }
+        Thread.sleep(1000);
+        assertEquals(1, cfs.getSSTables().size());
+        assertFileCounts(s.descriptor.directory.list(), 0, 0);
+
+        assertEquals(cfs.getSSTables().iterator().next().first, origFirst);
+        assertEquals(cfs.getSSTables().iterator().next().last, origLast);
+        validateCFS(cfs);
+    }
+
+    @Test
+    public void testNumberOfFiles_finish_empty_new_writer() throws InterruptedException
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+        cfs.truncateBlocking();
+
+        SSTableReader s = writeFile(cfs, 1000);
+        cfs.addSSTable(s);
+
+        Set<SSTableReader> compacting = Sets.newHashSet(s);
+        SSTableRewriter.overrideOpenInterval(10000000);
+        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, OperationType.COMPACTION, false);
+        rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+
+        ICompactionScanner scanner = s.getScanner();
+        CompactionController controller = new CompactionController(cfs, compacting, 0);
+        int files = 1;
+        while(scanner.hasNext())
+        {
+            rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
+            if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
+            {
+                rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+                files++;
+                assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
+            }
+            if (files == 3)
+            {
+                //testing to finish when we have nothing written in the new file
+                rewriter.finish();
+                break;
+            }
+        }
+        Thread.sleep(1000);
+        assertEquals(files - 1, cfs.getSSTables().size()); // we never wrote anything to the last file
+        assertFileCounts(s.descriptor.directory.list(), 0, 0);
+        validateCFS(cfs);
+    }
+
+    @Test
+    public void testNumberOfFiles_truncate() throws InterruptedException
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+        cfs.truncateBlocking();
+        cfs.disableAutoCompaction();
+
+        SSTableReader s = writeFile(cfs, 1000);
+        cfs.addSSTable(s);
+        Set<SSTableReader> compacting = Sets.newHashSet(s);
+        SSTableRewriter.overrideOpenInterval(10000000);
+        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, OperationType.COMPACTION, false);
+        rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+
+        ICompactionScanner scanner = s.getScanner();
+        CompactionController controller = new CompactionController(cfs, compacting, 0);
+        int files = 1;
+        while(scanner.hasNext())
+        {
+            rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
+            if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
+            {
+                rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+                files++;
+                assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
+            }
+        }
+        rewriter.finish();
+        Thread.sleep(1000);
+        assertFileCounts(s.descriptor.directory.list(), 0, 0);
+        cfs.truncateBlocking();
+        validateCFS(cfs);
+    }
+
+    @Test
+    public void testSmallFiles() throws InterruptedException
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+        cfs.truncateBlocking();
+        cfs.disableAutoCompaction();
+
+        SSTableReader s = writeFile(cfs, 400);
+        DecoratedKey origFirst = s.first;
+        cfs.addSSTable(s);
+        Set<SSTableReader> compacting = Sets.newHashSet(s);
+        SSTableRewriter.overrideOpenInterval(1000000);
+        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, OperationType.COMPACTION, false);
+        rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+
+        ICompactionScanner scanner = s.getScanner();
+        CompactionController controller = new CompactionController(cfs, compacting, 0);
+        int files = 1;
+        while(scanner.hasNext())
+        {
+            rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
+            if (rewriter.currentWriter().getOnDiskFilePointer() > 2500000)
+            {
+                assertEquals(1, cfs.getSSTables().size()); // we dont open small files early ...
+                assertEquals(origFirst, cfs.getSSTables().iterator().next().first); // ... and the first key should stay the same
+                rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+                files++;
+            }
+        }
+        rewriter.finish();
+        assertEquals(files, rewriter.finished().size());
+        assertEquals(files, cfs.getSSTables().size());
+        Thread.sleep(1000);
+        assertFileCounts(s.descriptor.directory.list(), 0, 0);
+        validateCFS(cfs);
+    }
+
+    private SSTableReader writeFile(ColumnFamilyStore cfs, int count)
+    {
+        ArrayBackedSortedColumns cf = ArrayBackedSortedColumns.factory.create(cfs.metadata);
+        for (int i = 0; i < count / 100; i++)
+            cf.addColumn(Util.cellname(i), ByteBuffer.allocate(1000), 1);
+        File dir = cfs.directories.getDirectoryForNewSSTables();
+        String filename = cfs.getTempSSTablePath(dir);
+
+        SSTableWriter writer = new SSTableWriter(filename,
+                                                 0,
+                                                 0,
+                                                 cfs.metadata,
+                                                 StorageService.getPartitioner(),
+                                                 new MetadataCollector(cfs.metadata.comparator));
+
+        for (int i = 0; i < count * 5; i++)
+            writer.append(StorageService.getPartitioner().decorateKey(ByteBufferUtil.bytes(i)), cf);
+        return writer.closeAndOpenReader();
+    }
+
+    private void validateCFS(ColumnFamilyStore cfs)
+    {
+        for (SSTableReader sstable : cfs.getSSTables())
+        {
+            assertFalse(sstable.isMarkedCompacted());
+            assertEquals(1, sstable.referenceCount());
+        }
+        assertTrue(cfs.getDataTracker().getCompacting().isEmpty());
+    }
+
+    private void assertFileCounts(String [] files, int expectedtmplinkCount, int expectedtmpCount)
+    {
+        int tmplinkcount = 0;
+        int tmpcount = 0;
+        for (String f : files)
+        {
+            if (f.contains("-tmplink-"))
+                tmplinkcount++;
+            if (f.contains("-tmp-"))
+                tmpcount++;
+        }
+        assertEquals(expectedtmplinkCount, tmplinkcount);
+        assertEquals(expectedtmpCount, tmpcount);
+    }
+
+    private SSTableWriter getWriter(ColumnFamilyStore cfs, File directory)
+    {
+        String filename = cfs.getTempSSTablePath(directory);
+        return new SSTableWriter(filename,
+                                 0,
+                                 0,
+                                 cfs.metadata,
+                                 StorageService.getPartitioner(),
+                                 new MetadataCollector(cfs.metadata.comparator));
+    }
+}
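
----------------------------------------------------------------------

The tests above all drive the same rewriter lifecycle, so for readers skimming
the patch, here is a minimal sketch of that flow. It is illustrative only: it
assumes the setup from testNumberOfFiles (cfs, compacting, s, scanner and
controller built exactly as in that test), getWriter(..) is the test helper
defined in SSTableRewriterTest, and the 25000000-byte switch threshold is
copied from the tests rather than being anything the API requires.

    SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, OperationType.COMPACTION, false);
    rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
    try
    {
        while (scanner.hasNext())
        {
            rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
            // each switch leaves the previous output behind as a tmp/tmplink
            // file; only finish() promotes those files to live sstables
            if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
                rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
        }
        // promotes every tmp file written so far and swaps the old readers out
        rewriter.finish();
    }
    catch (Throwable t)
    {
        // the behaviour this patch hardens: abort() now also cleans up the
        // unfinished tmp/tmplink files accumulated in finishedWriters
        rewriter.abort();
        throw com.google.common.base.Throwables.propagate(t);
    }

Deferring all closeAndOpenReader() calls to finish() is what makes the abort
path safe: until finish() runs, everything the rewriter has produced is still
a tmp or tmplink file, so abort() can delete it without touching live data.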