more robust solution to incomplete compactions + counters
patch by yukim; reviewed by jbellis for CASSANDRA-5151


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/aa90c88b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/aa90c88b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/aa90c88b

Branch: refs/heads/trunk
Commit: aa90c88be14b337714739cd857c12cad2a9fedeb
Parents: 21f63a9
Author: Jonathan Ellis <jbel...@apache.org>
Authored: Thu Jan 31 16:01:32 2013 -0600
Committer: Jonathan Ellis <jbel...@apache.org>
Committed: Thu Jan 31 22:49:08 2013 -0600

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 .../org/apache/cassandra/config/CFMetaData.java    |    7 +
 .../org/apache/cassandra/config/KSMetaData.java    |    1 +
 .../org/apache/cassandra/db/ColumnFamilyStore.java |  108 ++++++++++-----
 src/java/org/apache/cassandra/db/SystemTable.java  |   93 +++++++++++--
 .../cassandra/db/compaction/CompactionTask.java    |    8 +
 .../io/sstable/AbstractSSTableSimpleWriter.java    |    2 +-
 .../apache/cassandra/service/CassandraDaemon.java  |    9 ++
 .../apache/cassandra/db/ColumnFamilyStoreTest.java |   74 ++++++++--
 .../org/apache/cassandra/db/SystemTableTest.java   |    1 +
 .../cassandra/db/compaction/CompactionsTest.java   |   36 +++++
 11 files changed, 275 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa90c88b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 49d4802..faefd55 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.2.2
+ * more robust solution to incomplete compactions + counters (CASSANDRA-5151)
  * fix symlinks under data dir not working (CASSANDRA-5185)
  * fix bug in compact storage metadata handling (CASSANDRA-5189)
  * Validate login for USE queries (CASSANDRA-5207)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa90c88b/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index 7fba681..21ca90a 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -227,6 +227,13 @@ public final class CFMetaData
                                                              + "super boolean"
                                                              + ") WITH 
gc_grace_seconds=864000;", Auth.AUTH_KS);
 
+    public static final CFMetaData CompactionLogCF = compile(19, "CREATE TABLE 
" + SystemTable.COMPACTION_LOG + " ("
+                                                                 + "id uuid 
PRIMARY KEY,"
+                                                                 + 
"keyspace_name text,"
+                                                                 + 
"columnfamily_name text,"
+                                                                 + "inputs 
set<int>"
+                                                                 + ") WITH 
COMMENT='unfinished compactions'");
+
     public enum Caching
     {
         ALL, KEYS_ONLY, ROWS_ONLY, NONE;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa90c88b/src/java/org/apache/cassandra/config/KSMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/KSMetaData.java b/src/java/org/apache/cassandra/config/KSMetaData.java
index e5b349a..cfb4aef 100644
--- a/src/java/org/apache/cassandra/config/KSMetaData.java
+++ b/src/java/org/apache/cassandra/config/KSMetaData.java
@@ -89,6 +89,7 @@ public final class KSMetaData
                                                 CFMetaData.SchemaKeyspacesCf,
                                                 
CFMetaData.SchemaColumnFamiliesCf,
                                                 CFMetaData.SchemaColumnsCf,
+                                                CFMetaData.CompactionLogCF,
                                                 CFMetaData.OldStatusCf,
                                                 CFMetaData.OldHintsCf,
                                                 CFMetaData.OldMigrationsCf,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa90c88b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index cb12792..0769d5c 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -33,10 +33,6 @@ import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.Futures;
-
-import org.apache.cassandra.db.filter.IDiskAtomFilter;
-import org.apache.cassandra.tracing.TraceState;
-import org.apache.cassandra.tracing.Tracing;
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,9 +41,10 @@ import org.apache.cassandra.cache.IRowCacheEntry;
 import org.apache.cassandra.cache.RowCacheKey;
 import org.apache.cassandra.cache.RowCacheSentinel;
 import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
-import org.apache.cassandra.concurrent.NamedThreadFactory;
-import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.config.*;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.commitlog.ReplayPosition;
@@ -56,6 +53,7 @@ import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.compaction.LeveledCompactionStrategy;
 import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.db.filter.ExtendedFilter;
+import org.apache.cassandra.db.filter.IDiskAtomFilter;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.db.index.SecondaryIndex;
@@ -63,6 +61,7 @@ import org.apache.cassandra.db.index.SecondaryIndexManager;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.dht.*;
 import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.FSReadError;
 import org.apache.cassandra.io.compress.CompressionParameters;
 import org.apache.cassandra.io.sstable.*;
 import org.apache.cassandra.io.sstable.Descriptor;
@@ -243,34 +242,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         {
             Directories.SSTableLister sstableFiles = 
directories.sstableLister().skipTemporary(true);
             Collection<SSTableReader> sstables = 
SSTableReader.batchOpen(sstableFiles.list().entrySet(), metadata, 
this.partitioner);
-
-            if (metadata.getDefaultValidator().isCommutative())
-            {
-                // Filter non-compacted sstables, remove compacted ones
-                Set<Integer> compactedSSTables = new HashSet<Integer>();
-                for (SSTableReader sstable : sstables)
-                    compactedSSTables.addAll(sstable.getAncestors());
-
-                Set<SSTableReader> liveSSTables = new HashSet<SSTableReader>();
-                for (SSTableReader sstable : sstables)
-                {
-                    if 
(compactedSSTables.contains(sstable.descriptor.generation))
-                    {
-                        logger.info("{} is already compacted and will be 
removed.", sstable);
-                        sstable.markCompacted(); // we need to mark as 
compacted to be deleted
-                        sstable.releaseReference(); // this amount to deleting 
the sstable
-                    }
-                    else
-                    {
-                        liveSSTables.add(sstable);
-                    }
-                }
-                data.addInitialSSTables(liveSSTables);
-            }
-            else
-            {
-                data.addInitialSSTables(sstables);
-            }
+            data.addInitialSSTables(sstables);
         }
 
         if (caching == Caching.ALL || caching == Caching.KEYS_ONLY)
@@ -447,6 +419,72 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         }
     }
 
+    /**
+     * Replacing compacted sstables is atomic as far as observers of DataTracker are concerned, but not on the
+     * filesystem: first the new sstables are renamed to "live" status (i.e., the tmp marker is removed), then
+     * their ancestors are removed.
+     *
+     * If an unclean shutdown happens at the right time, we can thus end up with both the new ones and their
+     * ancestors "live" in the system.  This is harmless for normal data, but for counters it can cause overcounts.
+     *
+     * To prevent this, we record sstables being compacted in the system keyspace.  If we find unfinished
+     * compactions, we remove the new ones (since those may be incomplete -- under LCS, we may create multiple
+     * sstables from any given ancestor).
+     */
+    public static void removeUnfinishedCompactionLeftovers(String keyspace, 
String columnfamily, Set<Integer> unfinishedGenerations)
+    {
+        Directories directories = Directories.create(keyspace, columnfamily);
+
+        // sanity-check unfinishedGenerations
+        Set<Integer> allGenerations = new HashSet<Integer>();
+        for (Descriptor desc : directories.sstableLister().list().keySet())
+            allGenerations.add(desc.generation);
+        if (!allGenerations.containsAll(unfinishedGenerations))
+        {
+            throw new IllegalStateException("Unfinished compactions reference 
missing sstables."
+                                            + " This should never happen since 
compactions are marked finished before we start removing the old sstables.");
+        }
+
+        // remove new sstables from compactions that didn't complete, and 
compute
+        // set of ancestors that shouldn't exist anymore
+        Set<Integer> completedAncestors = new HashSet<Integer>();
+        for (Map.Entry<Descriptor, Set<Component>> sstableFiles : 
directories.sstableLister().list().entrySet())
+        {
+            Descriptor desc = sstableFiles.getKey();
+            Set<Component> components = sstableFiles.getValue();
+
+            SSTableMetadata meta;
+            try
+            {
+                meta = SSTableMetadata.serializer.deserialize(desc);
+            }
+            catch (IOException e)
+            {
+                throw new FSReadError(e, desc.filenameFor(Component.STATS));
+            }
+
+            Set<Integer> ancestors = meta.ancestors;
+            if (!ancestors.isEmpty() && 
unfinishedGenerations.containsAll(ancestors))
+            {
+                SSTable.delete(desc, components);
+            }
+            else
+            {
+                completedAncestors.addAll(ancestors);
+            }
+        }
+
+        // remove old sstables from compactions that did complete
+        for (Map.Entry<Descriptor, Set<Component>> sstableFiles : 
directories.sstableLister().list().entrySet())
+        {
+            Descriptor desc = sstableFiles.getKey();
+            Set<Component> components = sstableFiles.getValue();
+
+            if (completedAncestors.contains(desc.generation))
+                SSTable.delete(desc, components);
+        }
+    }
+
     // must be called after all sstables are loaded since row cache merges all row versions
     public void initRowCache()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa90c88b/src/java/org/apache/cassandra/db/SystemTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemTable.java b/src/java/org/apache/cassandra/db/SystemTable.java
index 9dad12e..4f6234b 100644
--- a/src/java/org/apache/cassandra/db/SystemTable.java
+++ b/src/java/org/apache/cassandra/db/SystemTable.java
@@ -18,7 +18,6 @@
 package org.apache.cassandra.db;
 
 import java.io.DataInputStream;
-import java.io.DataOutputStream;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
@@ -26,38 +25,33 @@ import java.nio.charset.CharacterCodingException;
 import java.util.*;
 import java.util.concurrent.ExecutionException;
 
+import com.google.common.base.Function;
 import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.SetMultimap;
-import com.google.common.collect.Sets;
+import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.avro.ipc.ByteBufferInputStream;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.KSMetaData;
 import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.commitlog.ReplayPosition;
-import org.apache.cassandra.db.marshal.UUIDType;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
+import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.db.filter.QueryPath;
-import org.apache.cassandra.db.marshal.AsciiType;
-import org.apache.cassandra.db.marshal.BytesType;
-import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.util.DataOutputBuffer;
-import org.apache.cassandra.io.util.FastByteArrayOutputStream;
 import org.apache.cassandra.locator.IEndpointSnitch;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.thrift.Constants;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.CounterId;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.*;
 
 import static org.apache.cassandra.cql3.QueryProcessor.processInternal;
 
@@ -77,6 +71,7 @@ public class SystemTable
     public static final String SCHEMA_KEYSPACES_CF = "schema_keyspaces";
     public static final String SCHEMA_COLUMNFAMILIES_CF = 
"schema_columnfamilies";
     public static final String SCHEMA_COLUMNS_CF = "schema_columns";
+    public static final String COMPACTION_LOG = "compactions_in_progress";
 
     @Deprecated
     public static final String OLD_STATUS_CF = "LocationInfo";
@@ -187,6 +182,74 @@ public class SystemTable
         }
     }
 
+    /**
+     * Write compaction log, except columnfamilies under system keyspace.
+     *
+     * @param cfs
+     * @param toCompact sstables to compact
+     * @return compaction task id or null if cfs is under system keyspace
+     */
+    public static UUID startCompaction(ColumnFamilyStore cfs, 
Iterable<SSTableReader> toCompact)
+    {
+        if (Table.SYSTEM_KS.equals(cfs.table.name))
+            return null;
+
+        UUID compactionId = UUIDGen.getTimeUUID();
+        String req = "INSERT INTO system.%s (id, keyspace_name, 
columnfamily_name, inputs) VALUES (%s, '%s', '%s', {%s})";
+        Iterable<Integer> generations = Iterables.transform(toCompact, new 
Function<SSTableReader, Integer>()
+        {
+            public Integer apply(SSTableReader sstable)
+            {
+                return sstable.descriptor.generation;
+            }
+        });
+        processInternal(String.format(req, COMPACTION_LOG, compactionId, 
cfs.table.name, cfs.columnFamily, StringUtils.join(generations.iterator(), 
',')));
+        forceBlockingFlush(COMPACTION_LOG);
+        return compactionId;
+    }
+
+    public static void finishCompaction(UUID taskId)
+    {
+        assert taskId != null;
+
+        String req = "DELETE FROM system.%s WHERE id = %s";
+        processInternal(String.format(req, COMPACTION_LOG, taskId));
+        forceBlockingFlush(COMPACTION_LOG);
+    }
+
+    /**
+     * @return unfinished compactions, grouped by keyspace/columnfamily pair.
+     */
+    public static SetMultimap<Pair<String, String>, Integer> 
getUnfinishedCompactions()
+    {
+        String req = "SELECT * FROM system.%s";
+        UntypedResultSet resultSet = processInternal(String.format(req, 
COMPACTION_LOG));
+
+        SetMultimap<Pair<String, String>, Integer> unfinishedCompactions = 
HashMultimap.create();
+        for (UntypedResultSet.Row row : resultSet)
+        {
+            String keyspace = row.getString("keyspace_name");
+            String columnfamily = row.getString("columnfamily_name");
+            Set<Integer> inputs = row.getSet("inputs", Int32Type.instance);
+
+            unfinishedCompactions.putAll(Pair.create(keyspace, columnfamily), 
inputs);
+        }
+        return unfinishedCompactions;
+    }
+
+    public static void discardCompactionsInProgress()
+    {
+        ColumnFamilyStore compactionLog = 
Table.open(Table.SYSTEM_KS).getColumnFamilyStore(COMPACTION_LOG);
+        try
+        {
+            compactionLog.truncate().get();
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
     public static void saveTruncationPosition(ColumnFamilyStore cfs, 
ReplayPosition position)
     {
         String req = "UPDATE system.%s SET truncated_at = truncated_at + %s 
WHERE key = '%s'";

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa90c88b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 8aa5aca..ca1e2da 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -36,6 +36,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.RowIndexEntry;
+import org.apache.cassandra.db.SystemTable;
 import 
org.apache.cassandra.db.compaction.CompactionManager.CompactionExecutorStatsCollector;
 import org.apache.cassandra.io.sstable.*;
 import org.apache.cassandra.utils.CloseableIterator;
@@ -111,6 +112,8 @@ public class CompactionTask extends AbstractCompactionTask
         for (SSTableReader sstable : toCompact)
             assert sstable.descriptor.cfname.equals(cfs.columnFamily);
 
+        UUID taskId = SystemTable.startCompaction(cfs, toCompact);
+
         CompactionController controller = new CompactionController(cfs, 
toCompact, gcBefore);
         // new sstables from flush can be added during a compaction, but only the compaction can remove them,
         // so in our single-threaded compaction world this is a valid way of determining if we're compacting
@@ -236,6 +239,11 @@ public class CompactionTask extends AbstractCompactionTask
                 throw new RuntimeException(e);
             }
 
+            // point of no return -- the new sstables are live on disk; next we'll start deleting the old ones
+            // (in replaceCompactedSSTables)
+            if (taskId != null)
+                SystemTable.finishCompaction(taskId);
+
             if (collector != null)
                 collector.finishCompaction(ci);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa90c88b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
index f3d097f..c91fcce 100644
--- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
@@ -59,7 +59,7 @@ public abstract class AbstractSSTableSimpleWriter
     }
 
     // find available generation and pick up filename from that
-    private static String makeFilename(File directory, final String keyspace, 
final String columnFamily)
+    protected static String makeFilename(File directory, final String 
keyspace, final String columnFamily)
     {
         final Set<Descriptor> existing = new HashSet<Descriptor>();
         directory.list(new FilenameFilter()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa90c88b/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index fd32f34..605f94d 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.collect.Iterables;
+import com.google.common.collect.SetMultimap;
 import org.apache.log4j.PropertyConfigurator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,6 +45,7 @@ import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.thrift.ThriftServer;
 import org.apache.cassandra.utils.CLibrary;
 import org.apache.cassandra.utils.Mx4jTool;
+import org.apache.cassandra.utils.Pair;
 
 /**
  * The <code>CassandraDaemon</code> is an abstraction for a Cassandra daemon
@@ -204,6 +206,13 @@ public class CassandraDaemon
                 ColumnFamilyStore.scrubDataDirectories(table, cfm.cfName);
             }
         }
+        // clean up compaction leftovers
+        SetMultimap<Pair<String, String>, Integer> unfinishedCompactions = 
SystemTable.getUnfinishedCompactions();
+        for (Pair<String, String> kscf : unfinishedCompactions.keySet())
+        {
+            ColumnFamilyStore.removeUnfinishedCompactionLeftovers(kscf.left, 
kscf.right, unfinishedCompactions.get(kscf));
+        }
+        SystemTable.discardCompactionsInProgress();
 
         // initialize keyspaces
         for (String table : Schema.instance.getTables())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa90c88b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
index cd34a91..8048fcb 100644
--- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
+++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
@@ -22,21 +22,13 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.charset.CharacterCodingException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Random;
-import java.util.Set;
+import java.util.*;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
 import com.google.common.base.Function;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.lang.StringUtils;
 import org.junit.Test;
@@ -55,7 +47,9 @@ import static org.apache.commons.lang.ArrayUtils.EMPTY_BYTE_ARRAY;
 
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
+import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
 import org.apache.cassandra.db.filter.*;
@@ -68,13 +62,11 @@ import org.apache.cassandra.dht.ExcludingBounds;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.IncludingExcludingBounds;
 import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.io.sstable.Component;
-import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.sstable.SSTable;
-import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.sstable.*;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.thrift.*;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.UUIDGen;
 import org.apache.cassandra.utils.WrappedRunnable;
 
 public class ColumnFamilyStoreTest extends SchemaLoader
@@ -1249,6 +1241,60 @@ public class ColumnFamilyStoreTest extends SchemaLoader
         testMultiRangeSlicesBehavior(prepareMultiRangeSlicesTest(10, false));
     }
 
+    @Test
+    public void testRemoveUnifinishedCompactionLeftovers() throws Throwable
+    {
+        String ks = "Keyspace1";
+        String cf = "Standard3"; // should be empty
+
+        CFMetaData cfmeta = Schema.instance.getCFMetaData(ks, cf);
+        Directories dir = Directories.create(ks, cf);
+        ByteBuffer key = bytes("key");
+
+        // 1st sstable
+        SSTableSimpleWriter writer = new 
SSTableSimpleWriter(dir.getDirectoryForNewSSTables(100),
+                                                             cfmeta, 
StorageService.getPartitioner());
+        writer.newRow(key);
+        writer.addColumn(bytes("col"), bytes("val"), 1);
+        writer.close();
+
+        Map<Descriptor, Set<Component>> sstables = dir.sstableLister().list();
+        assert sstables.size() == 1;
+
+        Map.Entry<Descriptor, Set<Component>> sstableToOpen = 
sstables.entrySet().iterator().next();
+        final SSTableReader sstable1 = 
SSTableReader.open(sstableToOpen.getKey());
+
+        // simulate incomplete compaction
+        writer = new SSTableSimpleWriter(dir.getDirectoryForNewSSTables(100),
+                                                             cfmeta, 
StorageService.getPartitioner())
+        {
+            protected SSTableWriter getWriter()
+            {
+                SSTableMetadata.Collector collector = 
SSTableMetadata.createCollector();
+                collector.addAncestor(sstable1.descriptor.generation); // add 
ancestor from previously written sstable
+                return new SSTableWriter(makeFilename(directory, 
metadata.ksName, metadata.cfName),
+                                         0,
+                                         metadata,
+                                         StorageService.getPartitioner(),
+                                         collector);
+            }
+        };
+        writer.newRow(key);
+        writer.addColumn(bytes("col"), bytes("val"), 1);
+        writer.close();
+
+        // should have 2 sstables now
+        sstables = dir.sstableLister().list();
+        assert sstables.size() == 2;
+
+        ColumnFamilyStore.removeUnfinishedCompactionLeftovers(ks, cf, 
Sets.newHashSet(sstable1.descriptor.generation));
+
+        // 2nd sstable should be removed (only 1st sstable exists in set of size 1)
+        sstables = dir.sstableLister().list();
+        assert sstables.size() == 1;
+        assert sstables.containsKey(sstable1.descriptor);
+    }
+
     private ColumnFamilyStore prepareMultiRangeSlicesTest(int valueSize, 
boolean flush) throws Throwable
     {
         String tableName = "Keyspace1";

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa90c88b/test/unit/org/apache/cassandra/db/SystemTableTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/SystemTableTest.java b/test/unit/org/apache/cassandra/db/SystemTableTest.java
index 8854411..b202173 100644
--- a/test/unit/org/apache/cassandra/db/SystemTableTest.java
+++ b/test/unit/org/apache/cassandra/db/SystemTableTest.java
@@ -31,6 +31,7 @@ import java.util.UUID;
 
 import org.junit.Test;
 
+import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.dht.BytesToken;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.utils.ByteBufferUtil;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa90c88b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
index c918561..302c651 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
@@ -24,6 +24,10 @@ import java.util.*;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.SetMultimap;
+import com.google.common.collect.Sets;
 import org.junit.Test;
 
 import org.apache.cassandra.SchemaLoader;
@@ -40,6 +44,7 @@ import org.apache.cassandra.io.sstable.SSTableScanner;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
 
 import static junit.framework.Assert.*;
 
@@ -275,6 +280,37 @@ public class CompactionsTest extends SchemaLoader
         assert sstables.iterator().next().descriptor.generation == 
prevGeneration + 1;
     }
 
+    @Test
+    public void testCompactionLog() throws Exception
+    {
+        SystemTable.discardCompactionsInProgress();
+        SetMultimap<Pair<String, String>, Integer> compactionLogs = 
SystemTable.getUnfinishedCompactions();
+        assert compactionLogs.isEmpty();
+
+        String cf = "Standard1";
+        ColumnFamilyStore cfs = Table.open(TABLE1).getColumnFamilyStore(cf);
+        insertData(TABLE1, cf, 0, 1);
+        cfs.forceBlockingFlush();
+
+        Collection<SSTableReader> sstables = cfs.getSSTables();
+        assert !sstables.isEmpty();
+        Set<Integer> generations = 
Sets.newHashSet(Iterables.transform(sstables, new Function<SSTableReader, 
Integer>()
+        {
+            public Integer apply(SSTableReader sstable)
+            {
+                return sstable.descriptor.generation;
+            }
+        }));
+        UUID taskId = SystemTable.startCompaction(cfs, sstables);
+        compactionLogs = SystemTable.getUnfinishedCompactions();
+        Set<Integer> unfinishedCompactions = 
compactionLogs.get(Pair.create(TABLE1, cf));
+        assert unfinishedCompactions.containsAll(generations);
+
+        SystemTable.finishCompaction(taskId);
+        compactionLogs = SystemTable.getUnfinishedCompactions();
+        assert compactionLogs.isEmpty();
+    }
+
     private void testDontPurgeAccidentaly(String k, String cfname, boolean 
forceDeserialize) throws IOException, ExecutionException, InterruptedException
     {
         // This test catches the regression of CASSANDRA-2786

Reply via email to