http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f59629c/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index 0000000,4d248bd..58803c3
mode 000000,100644..100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@@ -1,0 -1,490 +1,504 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.cassandra.io.sstable;
+
+ import java.io.File;
+ import java.nio.ByteBuffer;
+ import java.util.Arrays;
-import java.util.Collection;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Set;
+ import com.google.common.collect.Sets;
++import org.junit.After;
++import org.junit.BeforeClass;
+ import org.junit.Test;
+
+ import org.apache.cassandra.SchemaLoader;
+ import org.apache.cassandra.Util;
++import org.apache.cassandra.config.KSMetaData;
+ import org.apache.cassandra.db.ArrayBackedSortedColumns;
+ import org.apache.cassandra.db.ColumnFamilyStore;
+ import org.apache.cassandra.db.DecoratedKey;
+ import org.apache.cassandra.db.Keyspace;
+ import org.apache.cassandra.db.Mutation;
+ import org.apache.cassandra.db.compaction.AbstractCompactedRow;
+ import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
+ import org.apache.cassandra.db.compaction.CompactionController;
+ import org.apache.cassandra.db.compaction.ICompactionScanner;
+ import org.apache.cassandra.db.compaction.LazilyCompactedRow;
+ import org.apache.cassandra.db.compaction.OperationType;
-import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
++import org.apache.cassandra.exceptions.ConfigurationException;
++import org.apache.cassandra.io.sstable.format.SSTableReader;
++import org.apache.cassandra.io.sstable.format.SSTableWriter;
++import org.apache.cassandra.locator.SimpleStrategy;
+ import org.apache.cassandra.metrics.StorageMetrics;
+ import org.apache.cassandra.service.StorageService;
+ import org.apache.cassandra.utils.ByteBufferUtil;
+ import static org.junit.Assert.assertEquals;
+ import static org.junit.Assert.assertFalse;
+ import static org.junit.Assert.assertTrue;
+
+ public class SSTableRewriterTest extends SchemaLoader
+ {
- private static final String KEYSPACE = "Keyspace1";
++ private static final String KEYSPACE = "SSTableRewriterTest";
+ private static final String CF = "Standard1";
++
++ @BeforeClass
++ public static void defineSchema() throws ConfigurationException
++ {
++ SchemaLoader.prepareServer();
++ SchemaLoader.createKeyspace(KEYSPACE,
++ SimpleStrategy.class,
++ KSMetaData.optsWithRF(1),
++ SchemaLoader.standardCFMD(KEYSPACE, CF));
++ }
++
++ @After
++ public void truncateCF()
++ {
++ Keyspace keyspace = Keyspace.open(KEYSPACE);
++ ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF);
++ store.truncateBlocking();
++ }
++
++
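+ // Rewrites a single flushed sstable through SSTableRewriter and swaps the result
+ // into the DataTracker, then checks the store ends up in a clean state.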
+ @Test
+ public void basicTest() throws InterruptedException
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+ cfs.truncateBlocking();
+ for (int j = 0; j < 100; j ++)
+ {
+ ByteBuffer key = ByteBufferUtil.bytes(String.valueOf(j));
+ Mutation rm = new Mutation(KEYSPACE, key);
+ rm.add(CF, Util.cellname("0"), ByteBufferUtil.EMPTY_BYTE_BUFFER, j);
+ rm.apply();
+ }
+ cfs.forceBlockingFlush();
+ Set<SSTableReader> sstables = new HashSet<>(cfs.getSSTables());
+ assertEquals(1, sstables.size());
+ SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, false);
+ AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables);
+ ICompactionScanner scanner = scanners.scanners.get(0);
+ CompactionController controller = new CompactionController(cfs, sstables, cfs.gcBefore(System.currentTimeMillis()));
+ writer.switchWriter(getWriter(cfs, sstables.iterator().next().descriptor.directory));
+ while(scanner.hasNext())
+ {
+ AbstractCompactedRow row = new LazilyCompactedRow(controller, Arrays.asList(scanner.next()));
+ writer.append(row);
+ }
+ cfs.getDataTracker().markCompactedSSTablesReplaced(sstables, writer.finish(), OperationType.COMPACTION);
+
+ validateCFS(cfs);
+
+ }
+
+
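+ // Opens early readers from an in-flight writer via openEarly(); tmplink files should
+ // exist while the early readers are live, and marking them obsolete plus aborting the
+ // writer should leave no tmp or tmplink files behind.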
+ @Test
+ public void testFileRemoval() throws InterruptedException
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+ cfs.truncateBlocking();
+ ArrayBackedSortedColumns cf = ArrayBackedSortedColumns.factory.create(cfs.metadata);
- for (int i = 0; i < 1000; i++)
- cf.addColumn(Util.column(String.valueOf(i), "a", 1));
++ for (int i = 0; i < 100; i++)
++ cf.addColumn(Util.cellname(i), ByteBuffer.allocate(1000), 1);
+ File dir = cfs.directories.getDirectoryForNewSSTables();
+ SSTableWriter writer = getWriter(cfs, dir);
-
+ for (int i = 0; i < 500; i++)
+ writer.append(StorageService.getPartitioner().decorateKey(ByteBufferUtil.bytes(i)), cf);
+ SSTableReader s = writer.openEarly(1000);
+ assertFileCounts(dir.list(), 2, 3);
+ for (int i = 500; i < 1000; i++)
+ writer.append(StorageService.getPartitioner().decorateKey(ByteBufferUtil.bytes(i)), cf);
+ SSTableReader s2 = writer.openEarly(1000);
+ assertTrue(s != s2);
+ assertFileCounts(dir.list(), 2, 3);
+ s.markObsolete();
+ s.releaseReference();
+ Thread.sleep(1000);
+ assertFileCounts(dir.list(), 0, 3);
+ writer.abort(false);
+ Thread.sleep(1000);
+ assertFileCounts(dir.list(), 0, 0);
+ validateCFS(cfs);
+ }
+
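+ // Same early-open scenario as above, but the writer is finished normally with
+ // closeAndOpenReader(); tmp files should still be cleaned up without an abort.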
+ @Test
+ public void testFileRemovalNoAbort() throws InterruptedException
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+ cfs.truncateBlocking();
+ ArrayBackedSortedColumns cf = ArrayBackedSortedColumns.factory.create(cfs.metadata);
+ for (int i = 0; i < 1000; i++)
+ cf.addColumn(Util.column(String.valueOf(i), "a", 1));
+ File dir = cfs.directories.getDirectoryForNewSSTables();
+ SSTableWriter writer = getWriter(cfs, dir);
+
+ for (int i = 0; i < 500; i++)
+ writer.append(StorageService.getPartitioner().decorateKey(ByteBufferUtil.bytes(i)), cf);
+ SSTableReader s = writer.openEarly(1000);
+ //assertFileCounts(dir.list(), 2, 3);
+ for (int i = 500; i < 1000; i++)
+ writer.append(StorageService.getPartitioner().decorateKey(ByteBufferUtil.bytes(i)), cf);
+ writer.closeAndOpenReader();
+ s.markObsolete();
+ s.releaseReference();
+ Thread.sleep(1000);
+ assertFileCounts(dir.list(), 0, 0);
+ validateCFS(cfs);
+ }
+
+
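+ // Switches to a new writer every ~25MB and checks, at each switch and after finish(),
+ // that sstable counts and the liveDiskSpaceUsed / totalDiskSpaceUsed / load metrics
+ // track the bytes actually on disk.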
+ @Test
+ public void testNumberOfFilesAndSizes() throws InterruptedException
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+ cfs.truncateBlocking();
+
+ SSTableReader s = writeFile(cfs, 1000);
+ cfs.addSSTable(s);
+ long startStorageMetricsLoad = StorageMetrics.load.count();
+ Set<SSTableReader> compacting = Sets.newHashSet(s);
+ SSTableRewriter.overrideOpenInterval(10000000);
+ SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
+ rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+
+ ICompactionScanner scanner = s.getScanner();
+ CompactionController controller = new CompactionController(cfs, compacting, 0);
+ int files = 1;
+ while(scanner.hasNext())
+ {
+ rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
+ if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
+ {
+ rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+ files++;
+ assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
+ assertEquals(s.bytesOnDisk(), cfs.metric.liveDiskSpaceUsed.count());
+ assertEquals(s.bytesOnDisk(), cfs.metric.totalDiskSpaceUsed.count());
+
+ }
+ }
+ List<SSTableReader> sstables = rewriter.finish();
+ cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION);
+ long sum = 0;
+ for (SSTableReader x : cfs.getSSTables())
+ sum += x.bytesOnDisk();
+ assertEquals(sum, cfs.metric.liveDiskSpaceUsed.count());
+ assertEquals(startStorageMetricsLoad - s.bytesOnDisk() + sum, StorageMetrics.load.count());
+ assertEquals(files, sstables.size());
+ assertEquals(files, cfs.getSSTables().size());
+ Thread.sleep(1000);
+ // tmplink and tmp files should be gone:
+ assertEquals(sum, cfs.metric.totalDiskSpaceUsed.count());
+ assertFileCounts(s.descriptor.directory.list(), 0, 0);
+ validateCFS(cfs);
+ }
+
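+ // Like the test above, but asserts that before markCompactedSSTablesReplaced() the
+ // compacted original briefly coexists with the rewritten files (files + 1 sstables).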
+ @Test
+ public void testNumberOfFiles_dont_clean_readers() throws InterruptedException
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+ cfs.truncateBlocking();
+
+ SSTableReader s = writeFile(cfs, 1000);
+ cfs.addSSTable(s);
+
+ Set<SSTableReader> compacting = Sets.newHashSet(s);
+ SSTableRewriter.overrideOpenInterval(10000000);
+ SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
+ rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+
+ ICompactionScanner scanner = s.getScanner();
+ CompactionController controller = new CompactionController(cfs, compacting, 0);
+ int files = 1;
+ while(scanner.hasNext())
+ {
+ rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
+ if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
+ {
+ rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+ files++;
+ assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
+ }
+ }
+ List<SSTableReader> sstables = rewriter.finish();
+ assertEquals(files, sstables.size());
+ assertEquals(files + 1, cfs.getSSTables().size());
+ cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION);
+ assertEquals(files, cfs.getSSTables().size());
+ Thread.sleep(1000);
+ assertFileCounts(s.descriptor.directory.list(), 0, 0);
+ validateCFS(cfs);
+ }
+
+
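+ // Aborts the rewrite after consuming the whole scanner; disk usage, the sstable count
+ // and the original first/last keys must all return to their pre-rewrite values.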
+ @Test
+ public void testNumberOfFiles_abort() throws InterruptedException
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+ cfs.truncateBlocking();
+
+ SSTableReader s = writeFile(cfs, 1000);
+ cfs.addSSTable(s);
+ long startSize = cfs.metric.liveDiskSpaceUsed.count();
+ DecoratedKey origFirst = s.first;
+ DecoratedKey origLast = s.last;
+ Set<SSTableReader> compacting = Sets.newHashSet(s);
+ SSTableRewriter.overrideOpenInterval(10000000);
+ SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
+ rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+
+ ICompactionScanner scanner = s.getScanner();
+ CompactionController controller = new CompactionController(cfs, compacting, 0);
+ int files = 1;
+ while(scanner.hasNext())
+ {
+ rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
+ if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
+ {
+ rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+ files++;
+ assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
+ }
+ }
+ rewriter.abort();
+ Thread.sleep(1000);
+ assertEquals(startSize, cfs.metric.liveDiskSpaceUsed.count());
+ assertEquals(1, cfs.getSSTables().size());
+ assertFileCounts(s.descriptor.directory.list(), 0, 0);
+ assertEquals(cfs.getSSTables().iterator().next().first, origFirst);
+ assertEquals(cfs.getSSTables().iterator().next().last, origLast);
+ validateCFS(cfs);
+
+ }
+
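+ // Aborts immediately after a writer switch, i.e. while the newest writer has had
+ // nothing written to it yet.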
+ @Test
+ public void testNumberOfFiles_abort2() throws InterruptedException
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+ cfs.truncateBlocking();
+
+ SSTableReader s = writeFile(cfs, 1000);
+ cfs.addSSTable(s);
+
+ DecoratedKey origFirst = s.first;
+ DecoratedKey origLast = s.last;
+ Set<SSTableReader> compacting = Sets.newHashSet(s);
+ SSTableRewriter.overrideOpenInterval(10000000);
+ SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
+ rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+
+ ICompactionScanner scanner = s.getScanner();
+ CompactionController controller = new CompactionController(cfs, compacting, 0);
+ int files = 1;
+ while(scanner.hasNext())
+ {
+ rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
+ if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
+ {
+ rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+ files++;
+ assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
+ }
+ if (files == 3)
+ {
+ // testing to abort when we have nothing written in the new file
+ rewriter.abort();
+ break;
+ }
+ }
+ Thread.sleep(1000);
+ assertEquals(1, cfs.getSSTables().size());
+ assertFileCounts(s.descriptor.directory.list(), 0, 0);
+
+ assertEquals(cfs.getSSTables().iterator().next().first, origFirst);
+ assertEquals(cfs.getSSTables().iterator().next().last, origLast);
+ validateCFS(cfs);
+ }
+
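+ // Calls finish() right after a writer switch, so the last writer is still empty; the
+ // empty writer must not produce an sstable (hence the files - 1 assertion below).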
+ @Test
+ public void testNumberOfFiles_finish_empty_new_writer() throws InterruptedException
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+ cfs.truncateBlocking();
+
+ SSTableReader s = writeFile(cfs, 1000);
+ cfs.addSSTable(s);
+
+ Set<SSTableReader> compacting = Sets.newHashSet(s);
+ SSTableRewriter.overrideOpenInterval(10000000);
+ SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
+ rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+
+ ICompactionScanner scanner = s.getScanner();
+ CompactionController controller = new CompactionController(cfs, compacting, 0);
+ int files = 1;
+ while(scanner.hasNext())
+ {
+ rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
+ if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
+ {
+ rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+ files++;
+ assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
+ }
+ if (files == 3)
+ {
+ // testing to finish when we have nothing written in the new file
+ List<SSTableReader> sstables = rewriter.finish();
+ cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION);
+ break;
+ }
+ }
+ Thread.sleep(1000);
+ assertEquals(files - 1, cfs.getSSTables().size()); // we never wrote anything to the last file
+ assertFileCounts(s.descriptor.directory.list(), 0, 0);
+ validateCFS(cfs);
+ }
+
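+ // Runs a full rewrite and then truncates the CF, verifying truncation copes with the
+ // replaced sstables and no tmp files remain.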
+ @Test
+ public void testNumberOfFiles_truncate() throws InterruptedException
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+ cfs.truncateBlocking();
+ cfs.disableAutoCompaction();
+
+ SSTableReader s = writeFile(cfs, 1000);
+ cfs.addSSTable(s);
+ Set<SSTableReader> compacting = Sets.newHashSet(s);
+ SSTableRewriter.overrideOpenInterval(10000000);
+ SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
+ rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+
+ ICompactionScanner scanner = s.getScanner();
+ CompactionController controller = new CompactionController(cfs, compacting, 0);
+ int files = 1;
+ while(scanner.hasNext())
+ {
+ rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
+ if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
+ {
+ rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+ files++;
+ assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
+ }
+ }
+ List<SSTableReader> sstables = rewriter.finish();
+ cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION);
+ Thread.sleep(1000);
+ assertFileCounts(s.descriptor.directory.list(), 0, 0);
+ cfs.truncateBlocking();
+ validateCFS(cfs);
+ }
+
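+ // Uses a 1MB open interval with small files: readers must not be opened early, so the
+ // live sstable set and its first key stay unchanged until finish().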
+ @Test
+ public void testSmallFiles() throws InterruptedException
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+ cfs.truncateBlocking();
+ cfs.disableAutoCompaction();
+
+ SSTableReader s = writeFile(cfs, 400);
+ DecoratedKey origFirst = s.first;
+ cfs.addSSTable(s);
+ Set<SSTableReader> compacting = Sets.newHashSet(s);
+ SSTableRewriter.overrideOpenInterval(1000000);
+ SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
+ rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+
+ ICompactionScanner scanner = s.getScanner();
+ CompactionController controller = new CompactionController(cfs, compacting, 0);
+ int files = 1;
+ while(scanner.hasNext())
+ {
+ rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
+ if (rewriter.currentWriter().getOnDiskFilePointer() > 2500000)
+ {
+ assertEquals(1, cfs.getSSTables().size()); // we don't open small files early ...
+ assertEquals(origFirst, cfs.getSSTables().iterator().next().first); // ... and the first key should stay the same
+ rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+ files++;
+ }
+ }
+ List<SSTableReader> sstables = rewriter.finish();
+ cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION);
+ assertEquals(files, sstables.size());
+ assertEquals(files, cfs.getSSTables().size());
+ Thread.sleep(1000);
+ assertFileCounts(s.descriptor.directory.list(), 0, 0);
+ validateCFS(cfs);
+ }
+
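+ // Helper: builds one in-memory row of (count / 100) cells of 1000 bytes each and
+ // appends it under count * 5 distinct keys, yielding a multi-MB sstable to rewrite.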
+ private SSTableReader writeFile(ColumnFamilyStore cfs, int count)
+ {
+ ArrayBackedSortedColumns cf = ArrayBackedSortedColumns.factory.create(cfs.metadata);
+ for (int i = 0; i < count / 100; i++)
+ cf.addColumn(Util.cellname(i), ByteBuffer.allocate(1000), 1);
+ File dir = cfs.directories.getDirectoryForNewSSTables();
+ String filename = cfs.getTempSSTablePath(dir);
+
- SSTableWriter writer = new SSTableWriter(filename,
- 0,
- 0,
- cfs.metadata,
- StorageService.getPartitioner(),
- new MetadataCollector(cfs.metadata.comparator));
++ SSTableWriter writer = SSTableWriter.create(filename, 0, 0);
+
+ for (int i = 0; i < count * 5; i++)
+ writer.append(StorageService.getPartitioner().decorateKey(ByteBufferUtil.bytes(i)), cf);
+ return writer.closeAndOpenReader();
+ }
+
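+ // A healthy store after a test: no sstable still marked compacted, exactly one
+ // reference held per sstable, and nothing left registered as compacting.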
+ private void validateCFS(ColumnFamilyStore cfs)
+ {
+ for (SSTableReader sstable : cfs.getSSTables())
+ {
+ assertFalse(sstable.isMarkedCompacted());
+ assertEquals(1, sstable.referenceCount());
+ }
+ assertTrue(cfs.getDataTracker().getCompacting().isEmpty());
+ }
+
+
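+ // Counts files whose names contain the tmplink-/tmp- markers and asserts both counts.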
+ private void assertFileCounts(String [] files, int expectedtmplinkCount, int expectedtmpCount)
+ {
+ int tmplinkcount = 0;
+ int tmpcount = 0;
+ for (String f : files)
+ {
- if (f.contains("-tmplink-"))
++ if (f.contains("tmplink-"))
+ tmplinkcount++;
- if (f.contains("-tmp-"))
++ if (f.contains("tmp-"))
+ tmpcount++;
+ }
+ assertEquals(expectedtmplinkCount, tmplinkcount);
+ assertEquals(expectedtmpCount, tmpcount);
+ }
+
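+ // Helper: opens a writer on a fresh temp sstable path; the two zero arguments are
+ // presumably the estimated key count and repairedAt.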
+ private SSTableWriter getWriter(ColumnFamilyStore cfs, File directory)
+ {
+ String filename = cfs.getTempSSTablePath(directory);
- return new SSTableWriter(filename,
- 0,
- 0,
- cfs.metadata,
- StorageService.getPartitioner(),
- new MetadataCollector(cfs.metadata.comparator));
++ return SSTableWriter.create(filename, 0, 0);
+ }
+ }