Repository: cassandra Updated Branches: refs/heads/cassandra-2.2 33d71b825 -> e5a76bdb5
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java index 5dca589..fa91d00 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java @@ -18,14 +18,16 @@ package org.apache.cassandra.io.sstable; import java.io.File; -import java.io.IOException; import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import com.google.common.collect.Iterables; import com.google.common.collect.Sets; import org.junit.After; import org.junit.BeforeClass; +import com.google.common.util.concurrent.Uninterruptibles; import org.junit.Test; import org.apache.cassandra.SchemaLoader; @@ -43,6 +45,7 @@ import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.sstable.format.SSTableWriter; import org.apache.cassandra.locator.SimpleStrategy; import org.apache.cassandra.db.compaction.SSTableSplitter; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.metrics.StorageMetrics; @@ -52,7 +55,6 @@ import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Pair; import static org.junit.Assert.*; -import static org.apache.cassandra.utils.Throwables.maybeFail; public class SSTableRewriterTest extends SchemaLoader { @@ -83,7 +85,9 @@ public class SSTableRewriterTest extends SchemaLoader { Keyspace keyspace = Keyspace.open(KEYSPACE); ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF); - cfs.truncateBlocking(); + truncate(cfs); + assertEquals(0, cfs.metric.liveDiskSpaceUsed.getCount()); + for (int j = 0; j < 100; j ++) { ByteBuffer key = ByteBufferUtil.bytes(String.valueOf(j)); @@ -94,8 +98,10 @@ public class SSTableRewriterTest extends SchemaLoader cfs.forceBlockingFlush(); Set<SSTableReader> sstables = new HashSet<>(cfs.getSSTables()); assertEquals(1, sstables.size()); - try (SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, false); - AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables);) + assertEquals(sstables.iterator().next().bytesOnDisk(), cfs.metric.liveDiskSpaceUsed.getCount()); + try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables); + LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN); + SSTableRewriter writer = new SSTableRewriter(cfs, txn, 1000, false);) { ISSTableScanner scanner = scanners.scanners.get(0); CompactionController controller = new CompactionController(cfs, sstables, cfs.gcBefore(System.currentTimeMillis())); @@ -105,30 +111,29 @@ public class SSTableRewriterTest extends SchemaLoader AbstractCompactedRow row = new LazilyCompactedRow(controller, Arrays.asList(scanner.next())); writer.append(row); } - Collection<SSTableReader> newsstables = writer.finish(); - cfs.getDataTracker().markCompactedSSTablesReplaced(sstables, newsstables , OperationType.COMPACTION); + writer.finish(); } SSTableDeletingTask.waitForDeletions(); - validateCFS(cfs); int filecounts = assertFileCounts(sstables.iterator().next().descriptor.directory.list(), 0, 0); assertEquals(1, filecounts); - + truncate(cfs); } @Test public void basicTest2() throws InterruptedException { Keyspace keyspace = Keyspace.open(KEYSPACE); ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF); - cfs.truncateBlocking(); + truncate(cfs); SSTableReader s = writeFile(cfs, 1000); cfs.addSSTable(s); Set<SSTableReader> sstables = new HashSet<>(cfs.getSSTables()); assertEquals(1, sstables.size()); SSTableRewriter.overrideOpenInterval(10000000); - try (SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, false); - AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables);) + try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables); + LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN); + SSTableRewriter writer = new SSTableRewriter(cfs, txn, 1000, false);) { ISSTableScanner scanner = scanners.scanners.get(0); CompactionController controller = new CompactionController(cfs, sstables, cfs.gcBefore(System.currentTimeMillis())); @@ -138,11 +143,9 @@ public class SSTableRewriterTest extends SchemaLoader AbstractCompactedRow row = new LazilyCompactedRow(controller, Arrays.asList(scanner.next())); writer.append(row); } - Collection<SSTableReader> newsstables = writer.finish(); - cfs.getDataTracker().markCompactedSSTablesReplaced(sstables, newsstables, OperationType.COMPACTION); + writer.finish(); } SSTableDeletingTask.waitForDeletions(); - validateCFS(cfs); int filecounts = assertFileCounts(sstables.iterator().next().descriptor.directory.list(), 0, 0); assertEquals(1, filecounts); @@ -153,7 +156,9 @@ public class SSTableRewriterTest extends SchemaLoader { Keyspace keyspace = Keyspace.open(KEYSPACE); ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF); - cfs.truncateBlocking(); + truncate(cfs); + assertEquals(0, cfs.metric.liveDiskSpaceUsed.getCount()); + assertEquals(0, cfs.metric.totalDiskSpaceUsed.getCount()); SSTableReader s = writeFile(cfs, 1000); cfs.addSSTable(s); @@ -161,8 +166,9 @@ public class SSTableRewriterTest extends SchemaLoader assertEquals(1, sstables.size()); SSTableRewriter.overrideOpenInterval(10000000); boolean checked = false; - try (SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, false); - AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables);) + try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables); + LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN); + SSTableRewriter writer = new SSTableRewriter(cfs, txn, 1000, false);) { ISSTableScanner scanner = scanners.scanners.get(0); CompactionController controller = new CompactionController(cfs, sstables, cfs.gcBefore(System.currentTimeMillis())); @@ -178,7 +184,7 @@ public class SSTableRewriterTest extends SchemaLoader { if (sstable.openReason == SSTableReader.OpenReason.EARLY) { - SSTableReader c = sstables.iterator().next(); + SSTableReader c = txn.current(sstables.iterator().next()); Collection<Range<Token>> r = Arrays.asList(new Range<>(cfs.partitioner.getMinimumToken(), cfs.partitioner.getMinimumToken())); List<Pair<Long, Long>> tmplinkPositions = sstable.getPositionsForRanges(r); List<Pair<Long, Long>> compactingPositions = c.getPositionsForRanges(r); @@ -193,17 +199,14 @@ public class SSTableRewriterTest extends SchemaLoader } } assertTrue(checked); - Collection<SSTableReader> newsstables = writer.finish(); - cfs.getDataTracker().markCompactedSSTablesReplaced(sstables, newsstables, OperationType.COMPACTION); + writer.finish(); } SSTableDeletingTask.waitForDeletions(); - validateCFS(cfs); int filecounts = assertFileCounts(sstables.iterator().next().descriptor.directory.list(), 0, 0); assertEquals(1, filecounts); - cfs.truncateBlocking(); + truncate(cfs); SSTableDeletingTask.waitForDeletions(); - validateCFS(cfs); } @@ -212,7 +215,8 @@ public class SSTableRewriterTest extends SchemaLoader { Keyspace keyspace = Keyspace.open(KEYSPACE); ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF); - cfs.truncateBlocking(); + truncate(cfs); + assertEquals(0, cfs.metric.liveDiskSpaceUsed.getCount()); ArrayBackedSortedColumns cf = ArrayBackedSortedColumns.factory.create(cfs.metadata); for (int i = 0; i < 100; i++) cf.addColumn(Util.cellname(i), ByteBuffer.allocate(1000), 1); @@ -253,7 +257,7 @@ public class SSTableRewriterTest extends SchemaLoader { Keyspace keyspace = Keyspace.open(KEYSPACE); ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF); - cfs.truncateBlocking(); + truncate(cfs); SSTableReader s = writeFile(cfs, 1000); cfs.addSSTable(s); @@ -263,9 +267,10 @@ public class SSTableRewriterTest extends SchemaLoader List<SSTableReader> sstables; int files = 1; - try (SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false); - ISSTableScanner scanner = s.getScanner(); - CompactionController controller = new CompactionController(cfs, compacting, 0)) + try (ISSTableScanner scanner = s.getScanner(); + CompactionController controller = new CompactionController(cfs, compacting, 0); + LifecycleTransaction txn = cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN); + SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, false);) { rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); while(scanner.hasNext()) @@ -282,8 +287,6 @@ public class SSTableRewriterTest extends SchemaLoader } } sstables = rewriter.finish(); - cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION); - assertEquals(files, sstables.size()); } long sum = 0; for (SSTableReader x : cfs.getSSTables()) @@ -305,7 +308,7 @@ public class SSTableRewriterTest extends SchemaLoader { Keyspace keyspace = Keyspace.open(KEYSPACE); ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF); - cfs.truncateBlocking(); + truncate(cfs); SSTableReader s = writeFile(cfs, 1000); cfs.addSSTable(s); @@ -315,9 +318,10 @@ public class SSTableRewriterTest extends SchemaLoader List<SSTableReader> sstables; int files = 1; - try (SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false); - ISSTableScanner scanner = s.getScanner(); - CompactionController controller = new CompactionController(cfs, compacting, 0)) + try (ISSTableScanner scanner = s.getScanner(); + CompactionController controller = new CompactionController(cfs, compacting, 0); + LifecycleTransaction txn = cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN); + SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, false);) { rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); while(scanner.hasNext()) @@ -335,10 +339,6 @@ public class SSTableRewriterTest extends SchemaLoader assertEquals(files, sstables.size()); assertEquals(files, cfs.getSSTables().size()); - assertEquals(1, cfs.getDataTracker().getView().shadowed.size()); - cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION); - assertEquals(files, cfs.getSSTables().size()); - assertEquals(0, cfs.getDataTracker().getView().shadowed.size()); SSTableDeletingTask.waitForDeletions(); assertFileCounts(s.descriptor.directory.list(), 0, 0); @@ -429,7 +429,7 @@ public class SSTableRewriterTest extends SchemaLoader { Keyspace keyspace = Keyspace.open(KEYSPACE); ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF); - cfs.truncateBlocking(); + truncate(cfs); SSTableReader s = writeFile(cfs, 1000); cfs.addSSTable(s); @@ -440,9 +440,10 @@ public class SSTableRewriterTest extends SchemaLoader Set<SSTableReader> compacting = Sets.newHashSet(s); SSTableRewriter.overrideOpenInterval(10000000); - try (SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false); - ISSTableScanner scanner = s.getScanner(); - CompactionController controller = new CompactionController(cfs, compacting, 0)) + try (ISSTableScanner scanner = s.getScanner(); + CompactionController controller = new CompactionController(cfs, compacting, 0); + LifecycleTransaction txn = cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN); + SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, false);) { rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); test.run(scanner, controller, s, cfs, rewriter); @@ -463,7 +464,7 @@ public class SSTableRewriterTest extends SchemaLoader { Keyspace keyspace = Keyspace.open(KEYSPACE); ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF); - cfs.truncateBlocking(); + truncate(cfs); SSTableReader s = writeFile(cfs, 1000); cfs.addSSTable(s); @@ -472,9 +473,10 @@ public class SSTableRewriterTest extends SchemaLoader SSTableRewriter.overrideOpenInterval(10000000); int files = 1; - try (SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false); - ISSTableScanner scanner = s.getScanner(); - CompactionController controller = new CompactionController(cfs, compacting, 0)) + try (ISSTableScanner scanner = s.getScanner(); + CompactionController controller = new CompactionController(cfs, compacting, 0); + LifecycleTransaction txn = cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN); + SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, false);) { rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); while(scanner.hasNext()) @@ -489,8 +491,7 @@ public class SSTableRewriterTest extends SchemaLoader if (files == 3) { //testing to finish when we have nothing written in the new file - List<SSTableReader> sstables = rewriter.finish(); - cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION); + rewriter.finish(); break; } } @@ -508,7 +509,7 @@ public class SSTableRewriterTest extends SchemaLoader { Keyspace keyspace = Keyspace.open(KEYSPACE); ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF); - cfs.truncateBlocking(); + truncate(cfs); cfs.disableAutoCompaction(); SSTableReader s = writeFile(cfs, 1000); @@ -518,9 +519,10 @@ public class SSTableRewriterTest extends SchemaLoader List<SSTableReader> sstables; int files = 1; - try (SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false); - ISSTableScanner scanner = s.getScanner(); - CompactionController controller = new CompactionController(cfs, compacting, 0)) + try (ISSTableScanner scanner = s.getScanner(); + CompactionController controller = new CompactionController(cfs, compacting, 0); + LifecycleTransaction txn = cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN); + SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, false);) { rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); while(scanner.hasNext()) @@ -535,7 +537,6 @@ public class SSTableRewriterTest extends SchemaLoader } sstables = rewriter.finish(); - cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION); } SSTableDeletingTask.waitForDeletions(); @@ -550,7 +551,7 @@ public class SSTableRewriterTest extends SchemaLoader { Keyspace keyspace = Keyspace.open(KEYSPACE); ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF); - cfs.truncateBlocking(); + truncate(cfs); cfs.disableAutoCompaction(); SSTableReader s = writeFile(cfs, 400); @@ -560,9 +561,10 @@ public class SSTableRewriterTest extends SchemaLoader List<SSTableReader> sstables; int files = 1; - try (SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false); - ISSTableScanner scanner = s.getScanner(); - CompactionController controller = new CompactionController(cfs, compacting, 0)) + try (ISSTableScanner scanner = s.getScanner(); + CompactionController controller = new CompactionController(cfs, compacting, 0); + LifecycleTransaction txn = cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN); + SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, false);) { rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); while(scanner.hasNext()) @@ -577,7 +579,6 @@ public class SSTableRewriterTest extends SchemaLoader } sstables = rewriter.finish(); - cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION); } assertEquals(files, sstables.size()); @@ -593,22 +594,24 @@ public class SSTableRewriterTest extends SchemaLoader { Keyspace keyspace = Keyspace.open(KEYSPACE); ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF); - cfs.truncateBlocking(); + truncate(cfs); cfs.disableAutoCompaction(); SSTableReader s = writeFile(cfs, 1000); - cfs.getDataTracker().markCompacting(Arrays.asList(s), true, false); - SSTableSplitter splitter = new SSTableSplitter(cfs, s, 10); - splitter.split(); + try (LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.UNKNOWN, s)) + { + SSTableSplitter splitter = new SSTableSplitter(cfs, txn, 10); + splitter.split(); - assertFileCounts(s.descriptor.directory.list(), 0, 0); + assertFileCounts(s.descriptor.directory.list(), 0, 0); - s.selfRef().release(); - SSTableDeletingTask.waitForDeletions(); + s.selfRef().release(); + SSTableDeletingTask.waitForDeletions(); - for (File f : s.descriptor.directory.listFiles()) - { - // we need to clear out the data dir, otherwise tests running after this breaks - FileUtils.deleteRecursive(f); + for (File f : s.descriptor.directory.listFiles()) + { + // we need to clear out the data dir, otherwise tests running after this breaks + FileUtils.deleteRecursive(f); + } } } @@ -639,17 +642,18 @@ public class SSTableRewriterTest extends SchemaLoader { Keyspace keyspace = Keyspace.open(KEYSPACE); ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF); - cfs.truncateBlocking(); + truncate(cfs); SSTableReader s = writeFile(cfs, 1000); if (!offline) cfs.addSSTable(s); Set<SSTableReader> compacting = Sets.newHashSet(s); - cfs.getDataTracker().markCompacting(compacting); SSTableRewriter.overrideOpenInterval(10000000); - - try (SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, offline); - ISSTableScanner scanner = compacting.iterator().next().getScanner(); - CompactionController controller = new CompactionController(cfs, compacting, 0)) + try (ISSTableScanner scanner = compacting.iterator().next().getScanner(); + CompactionController controller = new CompactionController(cfs, compacting, 0); + LifecycleTransaction txn = offline ? LifecycleTransaction.offline(OperationType.UNKNOWN, compacting) + : cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN); + SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, offline); + ) { rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); while (scanner.hasNext()) @@ -672,7 +676,6 @@ public class SSTableRewriterTest extends SchemaLoader } finally { - cfs.getDataTracker().unmarkCompacting(compacting); if (offline) s.selfRef().release(); } @@ -686,9 +689,8 @@ public class SSTableRewriterTest extends SchemaLoader assertEquals(1, cfs.getSSTables().size()); validateCFS(cfs); } - cfs.truncateBlocking(); + truncate(cfs); SSTableDeletingTask.waitForDeletions(); - filecount = assertFileCounts(s.descriptor.directory.list(), 0, 0); if (offline) { @@ -709,7 +711,7 @@ public class SSTableRewriterTest extends SchemaLoader { Keyspace keyspace = Keyspace.open(KEYSPACE); ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF); - cfs.truncateBlocking(); + truncate(cfs); for (int i = 0; i < 100; i++) { DecoratedKey key = Util.dk(Integer.toString(i)); @@ -726,14 +728,14 @@ public class SSTableRewriterTest extends SchemaLoader SSTableReader s = cfs.getSSTables().iterator().next(); Set<SSTableReader> compacting = new HashSet<>(); compacting.add(s); - cfs.getDataTracker().markCompacting(compacting); - SSTableRewriter.overrideOpenInterval(1); int keyCount = 0; - try (SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false); - ISSTableScanner scanner = compacting.iterator().next().getScanner(); - CompactionController controller = new CompactionController(cfs, compacting, 0)) + try (ISSTableScanner scanner = compacting.iterator().next().getScanner(); + CompactionController controller = new CompactionController(cfs, compacting, 0); + LifecycleTransaction txn = cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN); + SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, false); + ) { rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); while (scanner.hasNext()) @@ -746,15 +748,7 @@ public class SSTableRewriterTest extends SchemaLoader keyCount++; validateKeys(keyspace); } - try - { - cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, rewriter.finish(), OperationType.COMPACTION); - cfs.getDataTracker().unmarkCompacting(compacting); - } - catch (Throwable t) - { - rewriter.abort(); - } + rewriter.finish(); } validateKeys(keyspace); SSTableDeletingTask.waitForDeletions(); @@ -762,7 +756,7 @@ public class SSTableRewriterTest extends SchemaLoader } @Test - public void testCanonicalView() throws IOException + public void testCanonicalView() throws Exception { Keyspace keyspace = Keyspace.open(KEYSPACE); ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF); @@ -770,15 +764,16 @@ public class SSTableRewriterTest extends SchemaLoader SSTableReader s = writeFile(cfs, 1000); cfs.addSSTable(s); - Set<SSTableReader> sstables = Sets.newHashSet(cfs.markAllCompacting()); + Set<SSTableReader> sstables = Sets.newHashSet(s); assertEquals(1, sstables.size()); SSTableRewriter.overrideOpenInterval(10000000); - SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, false); boolean checked = false; - try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables)) + try (ISSTableScanner scanner = sstables.iterator().next().getScanner(); + CompactionController controller = new CompactionController(cfs, sstables, 0); + LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN); + SSTableRewriter writer = new SSTableRewriter(cfs, txn, 1000, false); + ) { - ISSTableScanner scanner = scanners.scanners.get(0); - CompactionController controller = new CompactionController(cfs, sstables, cfs.gcBefore(System.currentTimeMillis())); writer.switchWriter(getWriter(cfs, sstables.iterator().next().descriptor.directory)); while (scanner.hasNext()) { @@ -796,10 +791,7 @@ public class SSTableRewriterTest extends SchemaLoader } } } - writer.abort(); - cfs.getDataTracker().unmarkCompacting(sstables); - cfs.truncateBlocking(); - SSTableDeletingTask.waitForDeletions(); + truncateCF(); validateCFS(cfs); } @@ -813,30 +805,52 @@ public class SSTableRewriterTest extends SchemaLoader } } - private SSTableReader writeFile(ColumnFamilyStore cfs, int count) + public static void truncate(ColumnFamilyStore cfs) { - ArrayBackedSortedColumns cf = ArrayBackedSortedColumns.factory.create(cfs.metadata); - for (int i = 0; i < count / 100; i++) - cf.addColumn(Util.cellname(i), random(0, 1000), 1); - File dir = cfs.directories.getDirectoryForNewSSTables(); - String filename = cfs.getTempSSTablePath(dir); + cfs.truncateBlocking(); + Uninterruptibles.sleepUninterruptibly(10L,TimeUnit.MILLISECONDS); + assertEquals(0, cfs.metric.liveDiskSpaceUsed.getCount()); + assertEquals(0, cfs.metric.totalDiskSpaceUsed.getCount()); + } - try (SSTableWriter writer = SSTableWriter.create(filename, 0, 0);) + public static SSTableReader writeFile(ColumnFamilyStore cfs, int count) + { + return Iterables.getFirst(writeFiles(cfs, 1, count * 5, count / 100, 1000), null); + } + + public static Set<SSTableReader> writeFiles(ColumnFamilyStore cfs, int fileCount, int partitionCount, int cellCount, int cellSize) + { + int i = 0; + Set<SSTableReader> result = new LinkedHashSet<>(); + for (int f = 0 ; f < fileCount ; f++) { - for (int i = 0; i < count * 5; i++) + File dir = cfs.directories.getDirectoryForNewSSTables(); + String filename = cfs.getTempSSTablePath(dir); + + SSTableWriter writer = SSTableWriter.create(filename, 0, 0); + int end = f == fileCount - 1 ? partitionCount : ((f + 1) * partitionCount) / fileCount; + for ( ; i < end ; i++) + { + ArrayBackedSortedColumns cf = ArrayBackedSortedColumns.factory.create(cfs.metadata); + for (int j = 0; j < cellCount ; j++) + cf.addColumn(Util.cellname(j), random(0, cellSize), 1); writer.append(StorageService.getPartitioner().decorateKey(ByteBufferUtil.bytes(i)), cf); - return writer.finish(true); + } + result.add(writer.finish(true)); } + return result; } - private void validateCFS(ColumnFamilyStore cfs) + public static void validateCFS(ColumnFamilyStore cfs) { Set<Integer> liveDescriptors = new HashSet<>(); + long spaceUsed = 0; for (SSTableReader sstable : cfs.getSSTables()) { assertFalse(sstable.isMarkedCompacted()); assertEquals(1, sstable.selfRef().globalCount()); liveDescriptors.add(sstable.descriptor.generation); + spaceUsed += sstable.bytesOnDisk(); } for (File dir : cfs.directories.getCFDirectories()) { @@ -849,11 +863,13 @@ public class SSTableRewriterTest extends SchemaLoader } } } - assertTrue(cfs.getDataTracker().getCompacting().isEmpty()); + assertEquals(spaceUsed, cfs.metric.liveDiskSpaceUsed.getCount()); + assertEquals(spaceUsed, cfs.metric.totalDiskSpaceUsed.getCount()); + assertTrue(cfs.getTracker().getCompacting().isEmpty()); } - private int assertFileCounts(String [] files, int expectedtmplinkCount, int expectedtmpCount) + public static int assertFileCounts(String [] files, int expectedtmplinkCount, int expectedtmpCount) { int tmplinkcount = 0; int tmpcount = 0; @@ -874,7 +890,7 @@ public class SSTableRewriterTest extends SchemaLoader return datacount; } - private SSTableWriter getWriter(ColumnFamilyStore cfs, File directory) + public static SSTableWriter getWriter(ColumnFamilyStore cfs, File directory) { String filename = cfs.getTempSSTablePath(directory); return SSTableWriter.create(filename, 0, 0);