more robust solution to incomplete compactions + counters patch by yukim; reviewed by jbellis for CASSANDRA-5151
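In outline, the change journals every user-keyspace compaction in a new system table, system.compactions_in_progress: the entry is written (and flushed) before compaction begins, and is removed only once the replacement sstables are live on disk; at startup the journal is replayed so that outputs of half-finished compactions are deleted before sstables are opened. A minimal sketch of the compaction-side sequence, assuming only the SystemTable methods added in this commit (simplified, error handling omitted):

    UUID taskId = SystemTable.startCompaction(cfs, toCompact); // returns null for system-keyspace compactions
    // ... merge the inputs and rename the new sstables out of their "tmp" status ...
    // Point of no return: the new sstables are live on disk, so clear the journal entry
    // before replaceCompactedSSTables() starts deleting the old ones.
    if (taskId != null)
        SystemTable.finishCompaction(taskId);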
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/aa90c88b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/aa90c88b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/aa90c88b

Branch: refs/heads/trunk
Commit: aa90c88be14b337714739cd857c12cad2a9fedeb
Parents: 21f63a9
Author: Jonathan Ellis <jbel...@apache.org>
Authored: Thu Jan 31 16:01:32 2013 -0600
Committer: Jonathan Ellis <jbel...@apache.org>
Committed: Thu Jan 31 22:49:08 2013 -0600

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 .../org/apache/cassandra/config/CFMetaData.java    |    7 +
 .../org/apache/cassandra/config/KSMetaData.java    |    1 +
 .../org/apache/cassandra/db/ColumnFamilyStore.java |  108 ++++++++++-----
 src/java/org/apache/cassandra/db/SystemTable.java  |   93 +++++++++++--
 .../cassandra/db/compaction/CompactionTask.java    |    8 +
 .../io/sstable/AbstractSSTableSimpleWriter.java    |    2 +-
 .../apache/cassandra/service/CassandraDaemon.java  |    9 ++
 .../apache/cassandra/db/ColumnFamilyStoreTest.java |   74 ++++++++--
 .../org/apache/cassandra/db/SystemTableTest.java   |    1 +
 .../cassandra/db/compaction/CompactionsTest.java   |   36 +++++
 11 files changed, 275 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa90c88b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 49d4802..faefd55 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.2.2
+ * more robust solution to incomplete compactions + counters (CASSANDRA-5151)
  * fix symlinks under data dir not working (CASSANDRA-5185)
  * fix bug in compact storage metadata handling (CASSANDRA-5189)
  * Validate login for USE queries (CASSANDRA-5207)


http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa90c88b/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index 7fba681..21ca90a 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -227,6 +227,13 @@ public final class CFMetaData
                                                + "super boolean"
                                                + ") WITH gc_grace_seconds=864000;", Auth.AUTH_KS);
 
+    public static final CFMetaData CompactionLogCF = compile(19, "CREATE TABLE " + SystemTable.COMPACTION_LOG + " ("
+                                                                 + "id uuid PRIMARY KEY,"
+                                                                 + "keyspace_name text,"
+                                                                 + "columnfamily_name text,"
+                                                                 + "inputs set<int>"
+                                                                 + ") WITH COMMENT='unfinished compactions'");
+
     public enum Caching
     {
         ALL, KEYS_ONLY, ROWS_ONLY, NONE;


http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa90c88b/src/java/org/apache/cassandra/config/KSMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/KSMetaData.java b/src/java/org/apache/cassandra/config/KSMetaData.java
index e5b349a..cfb4aef 100644
--- a/src/java/org/apache/cassandra/config/KSMetaData.java
+++ b/src/java/org/apache/cassandra/config/KSMetaData.java
@@ -89,6 +89,7 @@ public final class KSMetaData
                                                 CFMetaData.SchemaKeyspacesCf,
                                                 CFMetaData.SchemaColumnFamiliesCf,
                                                 CFMetaData.SchemaColumnsCf,
+                                                CFMetaData.CompactionLogCF,
                                                 CFMetaData.OldStatusCf,
                                                 CFMetaData.OldHintsCf,
                                                 CFMetaData.OldMigrationsCf,
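To make the new table concrete, here is roughly what the format strings in the SystemTable changes further down expand to for a hypothetical compaction of sstable generations 5, 6 and 7 of Keyspace1/Standard1 (the id is a made-up example time-uuid):

    INSERT INTO system.compactions_in_progress (id, keyspace_name, columnfamily_name, inputs)
    VALUES (85a14dd0-6c16-11e2-9e54-6d2f58d928e3, 'Keyspace1', 'Standard1', {5, 6, 7});

    -- issued by finishCompaction() once the new sstables are live on disk:
    DELETE FROM system.compactions_in_progress WHERE id = 85a14dd0-6c16-11e2-9e54-6d2f58d928e3;

A row still present at startup marks a compaction that never completed, and its inputs are what removeUnfinishedCompactionLeftovers() uses to decide which on-disk sstables to discard.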
http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa90c88b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index cb12792..0769d5c 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -33,10 +33,6 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Sets; import com.google.common.util.concurrent.Futures; - -import org.apache.cassandra.db.filter.IDiskAtomFilter; -import org.apache.cassandra.tracing.TraceState; -import org.apache.cassandra.tracing.Tracing; import org.cliffc.high_scale_lib.NonBlockingHashMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,9 +41,10 @@ import org.apache.cassandra.cache.IRowCacheEntry; import org.apache.cassandra.cache.RowCacheKey; import org.apache.cassandra.cache.RowCacheSentinel; import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor; -import org.apache.cassandra.concurrent.NamedThreadFactory; -import org.apache.cassandra.concurrent.StageManager; -import org.apache.cassandra.config.*; +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.columniterator.OnDiskAtomIterator; import org.apache.cassandra.db.commitlog.CommitLog; import org.apache.cassandra.db.commitlog.ReplayPosition; @@ -56,6 +53,7 @@ import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.compaction.LeveledCompactionStrategy; import org.apache.cassandra.db.compaction.OperationType; import org.apache.cassandra.db.filter.ExtendedFilter; +import org.apache.cassandra.db.filter.IDiskAtomFilter; import org.apache.cassandra.db.filter.QueryFilter; import org.apache.cassandra.db.filter.QueryPath; import org.apache.cassandra.db.index.SecondaryIndex; @@ -63,6 +61,7 @@ import org.apache.cassandra.db.index.SecondaryIndexManager; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.dht.*; import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.io.FSReadError; import org.apache.cassandra.io.compress.CompressionParameters; import org.apache.cassandra.io.sstable.*; import org.apache.cassandra.io.sstable.Descriptor; @@ -243,34 +242,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean { Directories.SSTableLister sstableFiles = directories.sstableLister().skipTemporary(true); Collection<SSTableReader> sstables = SSTableReader.batchOpen(sstableFiles.list().entrySet(), metadata, this.partitioner); - - if (metadata.getDefaultValidator().isCommutative()) - { - // Filter non-compacted sstables, remove compacted ones - Set<Integer> compactedSSTables = new HashSet<Integer>(); - for (SSTableReader sstable : sstables) - compactedSSTables.addAll(sstable.getAncestors()); - - Set<SSTableReader> liveSSTables = new HashSet<SSTableReader>(); - for (SSTableReader sstable : sstables) - { - if (compactedSSTables.contains(sstable.descriptor.generation)) - { - logger.info("{} is already compacted and will be removed.", sstable); - sstable.markCompacted(); // we need to mark as compacted to be deleted - sstable.releaseReference(); // this amount to deleting 
the sstable - } - else - { - liveSSTables.add(sstable); - } - } - data.addInitialSSTables(liveSSTables); - } - else - { - data.addInitialSSTables(sstables); - } + data.addInitialSSTables(sstables); } if (caching == Caching.ALL || caching == Caching.KEYS_ONLY) @@ -447,6 +419,72 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean } } + /** + * Replacing compacted sstables is atomic as far as observers of DataTracker are concerned, but not on the + * filesystem: first the new sstables are renamed to "live" status (i.e., the tmp marker is removed), then + * their ancestors are removed. + * + * If an unclean shutdown happens at the right time, we can thus end up with both the new ones and their + * ancestors "live" in the system. This is harmless for normal data, but for counters it can cause overcounts. + * + * To prevent this, we record sstables being compacted in the system keyspace. If we find unfinished + * compactions, we remove the new ones (since those may be incomplete -- under LCS, we may create multiple + * sstables from any given ancestor). + */ + public static void removeUnfinishedCompactionLeftovers(String keyspace, String columnfamily, Set<Integer> unfinishedGenerations) + { + Directories directories = Directories.create(keyspace, columnfamily); + + // sanity-check unfinishedGenerations + Set<Integer> allGenerations = new HashSet<Integer>(); + for (Descriptor desc : directories.sstableLister().list().keySet()) + allGenerations.add(desc.generation); + if (!allGenerations.containsAll(unfinishedGenerations)) + { + throw new IllegalStateException("Unfinished compactions reference missing sstables." + + " This should never happen since compactions are marked finished before we start removing the old sstables."); + } + + // remove new sstables from compactions that didn't complete, and compute + // set of ancestors that shouldn't exist anymore + Set<Integer> completedAncestors = new HashSet<Integer>(); + for (Map.Entry<Descriptor, Set<Component>> sstableFiles : directories.sstableLister().list().entrySet()) + { + Descriptor desc = sstableFiles.getKey(); + Set<Component> components = sstableFiles.getValue(); + + SSTableMetadata meta; + try + { + meta = SSTableMetadata.serializer.deserialize(desc); + } + catch (IOException e) + { + throw new FSReadError(e, desc.filenameFor(Component.STATS)); + } + + Set<Integer> ancestors = meta.ancestors; + if (!ancestors.isEmpty() && unfinishedGenerations.containsAll(ancestors)) + { + SSTable.delete(desc, components); + } + else + { + completedAncestors.addAll(ancestors); + } + } + + // remove old sstables from compactions that did complete + for (Map.Entry<Descriptor, Set<Component>> sstableFiles : directories.sstableLister().list().entrySet()) + { + Descriptor desc = sstableFiles.getKey(); + Set<Component> components = sstableFiles.getValue(); + + if (completedAncestors.contains(desc.generation)) + SSTable.delete(desc, components); + } + } + // must be called after all sstables are loaded since row cache merges all row versions public void initRowCache() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa90c88b/src/java/org/apache/cassandra/db/SystemTable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SystemTable.java b/src/java/org/apache/cassandra/db/SystemTable.java index 9dad12e..4f6234b 100644 --- a/src/java/org/apache/cassandra/db/SystemTable.java +++ b/src/java/org/apache/cassandra/db/SystemTable.java @@ -18,7 +18,6 @@ package 
org.apache.cassandra.db; import java.io.DataInputStream; -import java.io.DataOutputStream; import java.io.IOException; import java.net.InetAddress; import java.nio.ByteBuffer; @@ -26,38 +25,33 @@ import java.nio.charset.CharacterCodingException; import java.util.*; import java.util.concurrent.ExecutionException; +import com.google.common.base.Function; import com.google.common.collect.HashMultimap; +import com.google.common.collect.Iterables; import com.google.common.collect.SetMultimap; -import com.google.common.collect.Sets; +import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.avro.ipc.ByteBufferInputStream; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.KSMetaData; import org.apache.cassandra.config.Schema; -import org.apache.cassandra.db.commitlog.ReplayPosition; -import org.apache.cassandra.db.marshal.UUIDType; -import org.apache.cassandra.exceptions.ConfigurationException; -import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.cql3.QueryProcessor; import org.apache.cassandra.cql3.UntypedResultSet; import org.apache.cassandra.db.columniterator.IdentityQueryFilter; +import org.apache.cassandra.db.commitlog.ReplayPosition; import org.apache.cassandra.db.filter.QueryFilter; import org.apache.cassandra.db.filter.QueryPath; -import org.apache.cassandra.db.marshal.AsciiType; -import org.apache.cassandra.db.marshal.BytesType; -import org.apache.cassandra.db.marshal.UTF8Type; +import org.apache.cassandra.db.marshal.*; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; +import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.io.util.DataOutputBuffer; -import org.apache.cassandra.io.util.FastByteArrayOutputStream; import org.apache.cassandra.locator.IEndpointSnitch; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.thrift.Constants; -import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.cassandra.utils.CounterId; -import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.Pair; +import org.apache.cassandra.utils.*; import static org.apache.cassandra.cql3.QueryProcessor.processInternal; @@ -77,6 +71,7 @@ public class SystemTable public static final String SCHEMA_KEYSPACES_CF = "schema_keyspaces"; public static final String SCHEMA_COLUMNFAMILIES_CF = "schema_columnfamilies"; public static final String SCHEMA_COLUMNS_CF = "schema_columns"; + public static final String COMPACTION_LOG = "compactions_in_progress"; @Deprecated public static final String OLD_STATUS_CF = "LocationInfo"; @@ -187,6 +182,74 @@ public class SystemTable } } + /** + * Write compaction log, except columfamilies under system keyspace. 
+ * + * @param cfs + * @param toCompact sstables to compact + * @return compaction task id or null if cfs is under system keyspace + */ + public static UUID startCompaction(ColumnFamilyStore cfs, Iterable<SSTableReader> toCompact) + { + if (Table.SYSTEM_KS.equals(cfs.table.name)) + return null; + + UUID compactionId = UUIDGen.getTimeUUID(); + String req = "INSERT INTO system.%s (id, keyspace_name, columnfamily_name, inputs) VALUES (%s, '%s', '%s', {%s})"; + Iterable<Integer> generations = Iterables.transform(toCompact, new Function<SSTableReader, Integer>() + { + public Integer apply(SSTableReader sstable) + { + return sstable.descriptor.generation; + } + }); + processInternal(String.format(req, COMPACTION_LOG, compactionId, cfs.table.name, cfs.columnFamily, StringUtils.join(generations.iterator(), ','))); + forceBlockingFlush(COMPACTION_LOG); + return compactionId; + } + + public static void finishCompaction(UUID taskId) + { + assert taskId != null; + + String req = "DELETE FROM system.%s WHERE id = %s"; + processInternal(String.format(req, COMPACTION_LOG, taskId)); + forceBlockingFlush(COMPACTION_LOG); + } + + /** + * @return unfinished compactions, grouped by keyspace/columnfamily pair. + */ + public static SetMultimap<Pair<String, String>, Integer> getUnfinishedCompactions() + { + String req = "SELECT * FROM system.%s"; + UntypedResultSet resultSet = processInternal(String.format(req, COMPACTION_LOG)); + + SetMultimap<Pair<String, String>, Integer> unfinishedCompactions = HashMultimap.create(); + for (UntypedResultSet.Row row : resultSet) + { + String keyspace = row.getString("keyspace_name"); + String columnfamily = row.getString("columnfamily_name"); + Set<Integer> inputs = row.getSet("inputs", Int32Type.instance); + + unfinishedCompactions.putAll(Pair.create(keyspace, columnfamily), inputs); + } + return unfinishedCompactions; + } + + public static void discardCompactionsInProgress() + { + ColumnFamilyStore compactionLog = Table.open(Table.SYSTEM_KS).getColumnFamilyStore(COMPACTION_LOG); + try + { + compactionLog.truncate().get(); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + } + public static void saveTruncationPosition(ColumnFamilyStore cfs, ReplayPosition position) { String req = "UPDATE system.%s SET truncated_at = truncated_at + %s WHERE key = '%s'"; http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa90c88b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java index 8aa5aca..ca1e2da 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java @@ -36,6 +36,7 @@ import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.RowIndexEntry; +import org.apache.cassandra.db.SystemTable; import org.apache.cassandra.db.compaction.CompactionManager.CompactionExecutorStatsCollector; import org.apache.cassandra.io.sstable.*; import org.apache.cassandra.utils.CloseableIterator; @@ -111,6 +112,8 @@ public class CompactionTask extends AbstractCompactionTask for (SSTableReader sstable : toCompact) assert sstable.descriptor.cfname.equals(cfs.columnFamily); + UUID taskId = SystemTable.startCompaction(cfs, toCompact); + CompactionController 
controller = new CompactionController(cfs, toCompact, gcBefore); // new sstables from flush can be added during a compaction, but only the compaction can remove them, // so in our single-threaded compaction world this is a valid way of determining if we're compacting @@ -236,6 +239,11 @@ public class CompactionTask extends AbstractCompactionTask throw new RuntimeException(e); } + // point of no return -- the new sstables are live on disk; next we'll start deleting the old ones + // (in replaceCompactedSSTables) + if (taskId != null) + SystemTable.finishCompaction(taskId); + if (collector != null) collector.finishCompaction(ci); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa90c88b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java index f3d097f..c91fcce 100644 --- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java @@ -59,7 +59,7 @@ public abstract class AbstractSSTableSimpleWriter } // find available generation and pick up filename from that - private static String makeFilename(File directory, final String keyspace, final String columnFamily) + protected static String makeFilename(File directory, final String keyspace, final String columnFamily) { final Set<Descriptor> existing = new HashSet<Descriptor>(); directory.list(new FilenameFilter() http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa90c88b/src/java/org/apache/cassandra/service/CassandraDaemon.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java index fd32f34..605f94d 100644 --- a/src/java/org/apache/cassandra/service/CassandraDaemon.java +++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java @@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import com.google.common.collect.Iterables; +import com.google.common.collect.SetMultimap; import org.apache.log4j.PropertyConfigurator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,6 +45,7 @@ import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.thrift.ThriftServer; import org.apache.cassandra.utils.CLibrary; import org.apache.cassandra.utils.Mx4jTool; +import org.apache.cassandra.utils.Pair; /** * The <code>CassandraDaemon</code> is an abstraction for a Cassandra daemon @@ -204,6 +206,13 @@ public class CassandraDaemon ColumnFamilyStore.scrubDataDirectories(table, cfm.cfName); } } + // clean up compaction leftovers + SetMultimap<Pair<String, String>, Integer> unfinishedCompactions = SystemTable.getUnfinishedCompactions(); + for (Pair<String, String> kscf : unfinishedCompactions.keySet()) + { + ColumnFamilyStore.removeUnfinishedCompactionLeftovers(kscf.left, kscf.right, unfinishedCompactions.get(kscf)); + } + SystemTable.discardCompactionsInProgress(); // initialize keyspaces for (String table : Schema.instance.getTables()) http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa90c88b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java ---------------------------------------------------------------------- diff --git 
a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java index cd34a91..8048fcb 100644 --- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java +++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java @@ -22,21 +22,13 @@ import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.CharacterCodingException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Random; -import java.util.Set; +import java.util.*; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import com.google.common.base.Function; import com.google.common.collect.Iterables; +import com.google.common.collect.Sets; import org.apache.commons.lang.ArrayUtils; import org.apache.commons.lang.StringUtils; import org.junit.Test; @@ -55,7 +47,9 @@ import static org.apache.commons.lang.ArrayUtils.EMPTY_BYTE_ARRAY; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.Util; +import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.config.Schema; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.db.columniterator.IdentityQueryFilter; import org.apache.cassandra.db.filter.*; @@ -68,13 +62,11 @@ import org.apache.cassandra.dht.ExcludingBounds; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.IncludingExcludingBounds; import org.apache.cassandra.dht.Range; -import org.apache.cassandra.io.sstable.Component; -import org.apache.cassandra.io.sstable.Descriptor; -import org.apache.cassandra.io.sstable.SSTable; -import org.apache.cassandra.io.sstable.SSTableReader; +import org.apache.cassandra.io.sstable.*; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.thrift.*; import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.UUIDGen; import org.apache.cassandra.utils.WrappedRunnable; public class ColumnFamilyStoreTest extends SchemaLoader @@ -1249,6 +1241,60 @@ public class ColumnFamilyStoreTest extends SchemaLoader testMultiRangeSlicesBehavior(prepareMultiRangeSlicesTest(10, false)); } + @Test + public void testRemoveUnifinishedCompactionLeftovers() throws Throwable + { + String ks = "Keyspace1"; + String cf = "Standard3"; // should be empty + + CFMetaData cfmeta = Schema.instance.getCFMetaData(ks, cf); + Directories dir = Directories.create(ks, cf); + ByteBuffer key = bytes("key"); + + // 1st sstable + SSTableSimpleWriter writer = new SSTableSimpleWriter(dir.getDirectoryForNewSSTables(100), + cfmeta, StorageService.getPartitioner()); + writer.newRow(key); + writer.addColumn(bytes("col"), bytes("val"), 1); + writer.close(); + + Map<Descriptor, Set<Component>> sstables = dir.sstableLister().list(); + assert sstables.size() == 1; + + Map.Entry<Descriptor, Set<Component>> sstableToOpen = sstables.entrySet().iterator().next(); + final SSTableReader sstable1 = SSTableReader.open(sstableToOpen.getKey()); + + // simulate incomplete compaction + writer = new SSTableSimpleWriter(dir.getDirectoryForNewSSTables(100), + cfmeta, StorageService.getPartitioner()) + { + protected SSTableWriter getWriter() + { + SSTableMetadata.Collector collector = SSTableMetadata.createCollector(); + 
collector.addAncestor(sstable1.descriptor.generation); // add ancestor from previously written sstable + return new SSTableWriter(makeFilename(directory, metadata.ksName, metadata.cfName), + 0, + metadata, + StorageService.getPartitioner(), + collector); + } + }; + writer.newRow(key); + writer.addColumn(bytes("col"), bytes("val"), 1); + writer.close(); + + // should have 2 sstables now + sstables = dir.sstableLister().list(); + assert sstables.size() == 2; + + ColumnFamilyStore.removeUnfinishedCompactionLeftovers(ks, cf, Sets.newHashSet(sstable1.descriptor.generation)); + + // 2nd sstable should be removed (only 1st sstable exists in set of size 1) + sstables = dir.sstableLister().list(); + assert sstables.size() == 1; + assert sstables.containsKey(sstable1.descriptor); + } + private ColumnFamilyStore prepareMultiRangeSlicesTest(int valueSize, boolean flush) throws Throwable { String tableName = "Keyspace1"; http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa90c88b/test/unit/org/apache/cassandra/db/SystemTableTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/SystemTableTest.java b/test/unit/org/apache/cassandra/db/SystemTableTest.java index 8854411..b202173 100644 --- a/test/unit/org/apache/cassandra/db/SystemTableTest.java +++ b/test/unit/org/apache/cassandra/db/SystemTableTest.java @@ -31,6 +31,7 @@ import java.util.UUID; import org.junit.Test; +import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.dht.BytesToken; import org.apache.cassandra.dht.Token; import org.apache.cassandra.utils.ByteBufferUtil; http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa90c88b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java index c918561..302c651 100644 --- a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java @@ -24,6 +24,10 @@ import java.util.*; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import com.google.common.base.Function; +import com.google.common.collect.Iterables; +import com.google.common.collect.SetMultimap; +import com.google.common.collect.Sets; import org.junit.Test; import org.apache.cassandra.SchemaLoader; @@ -40,6 +44,7 @@ import org.apache.cassandra.io.sstable.SSTableScanner; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.Pair; import static junit.framework.Assert.*; @@ -275,6 +280,37 @@ public class CompactionsTest extends SchemaLoader assert sstables.iterator().next().descriptor.generation == prevGeneration + 1; } + @Test + public void testCompactionLog() throws Exception + { + SystemTable.discardCompactionsInProgress(); + SetMultimap<Pair<String, String>, Integer> compactionLogs = SystemTable.getUnfinishedCompactions(); + assert compactionLogs.isEmpty(); + + String cf = "Standard1"; + ColumnFamilyStore cfs = Table.open(TABLE1).getColumnFamilyStore(cf); + insertData(TABLE1, cf, 0, 1); + cfs.forceBlockingFlush(); + + Collection<SSTableReader> sstables = cfs.getSSTables(); + assert !sstables.isEmpty(); + Set<Integer> generations = Sets.newHashSet(Iterables.transform(sstables, new 
Function<SSTableReader, Integer>() + { + public Integer apply(SSTableReader sstable) + { + return sstable.descriptor.generation; + } + })); + UUID taskId = SystemTable.startCompaction(cfs, sstables); + compactionLogs = SystemTable.getUnfinishedCompactions(); + Set<Integer> unfinishedCompactions = compactionLogs.get(Pair.create(TABLE1, cf)); + assert unfinishedCompactions.containsAll(generations); + + SystemTable.finishCompaction(taskId); + compactionLogs = SystemTable.getUnfinishedCompactions(); + assert compactionLogs.isEmpty(); + } + private void testDontPurgeAccidentaly(String k, String cfname, boolean forceDeserialize) throws IOException, ExecutionException, InterruptedException { // This test catches the regression of CASSANDRA-2786