revert CASSANDRA-5151 from 1.2

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7ff2805c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7ff2805c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7ff2805c

Branch: refs/heads/trunk
Commit: 7ff2805c7e6a362eb451e6b8dfaec59642fee0f2
Parents: 694e3a7
Author: Yuki Morishita <yu...@apache.org>
Authored: Thu Feb 14 11:13:11 2013 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Thu Feb 14 11:24:09 2013 -0600

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 -
 .../org/apache/cassandra/config/CFMetaData.java    |    7 -
 .../org/apache/cassandra/config/KSMetaData.java    |    1 -
 .../org/apache/cassandra/db/ColumnFamilyStore.java |   95 ++++----------
 src/java/org/apache/cassandra/db/SystemTable.java  |   69 -----------
 .../cassandra/db/compaction/CompactionTask.java    |    7 -
 .../apache/cassandra/service/CassandraDaemon.java  |    9 --
 .../apache/cassandra/db/ColumnFamilyStoreTest.java |   55 ---------
 .../cassandra/db/compaction/CompactionsTest.java   |   34 -----
 9 files changed, 27 insertions(+), 251 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/7ff2805c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 9281b5e..aa25b63 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,6 +1,5 @@
 1.2.2
  * avoid no-op caching of byte[] on commitlog append (CASSANDRA-5199)
- * more robust solution to incomplete compactions + counters (CASSANDRA-5151)
  * fix symlinks under data dir not working (CASSANDRA-5185)
  * fix bug in compact storage metadata handling (CASSANDRA-5189)
  * Validate login for USE queries (CASSANDRA-5207)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7ff2805c/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index 9c76c9b..73b7b6b 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -226,13 +226,6 @@ public final class CFMetaData
                                                               + "requested_at 
timestamp"
                                                               + ") WITH 
COMMENT='ranges requested for transfer here'");
 
-    public static final CFMetaData CompactionLogCF = compile(18, "CREATE TABLE 
" + SystemTable.COMPACTION_LOG + " ("
-                                                                 + "id uuid 
PRIMARY KEY,"
-                                                                 + 
"keyspace_name text,"
-                                                                 + 
"columnfamily_name text,"
-                                                                 + "inputs 
set<int>"
-                                                                 + ") WITH 
COMMENT='unfinished compactions'");
-
     public enum Caching
     {
         ALL, KEYS_ONLY, ROWS_ONLY, NONE;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7ff2805c/src/java/org/apache/cassandra/config/KSMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/KSMetaData.java b/src/java/org/apache/cassandra/config/KSMetaData.java
index b0764cc..9522aa0 100644
--- a/src/java/org/apache/cassandra/config/KSMetaData.java
+++ b/src/java/org/apache/cassandra/config/KSMetaData.java
@@ -89,7 +89,6 @@ public final class KSMetaData
                                                 CFMetaData.SchemaKeyspacesCf,
                                                 
CFMetaData.SchemaColumnFamiliesCf,
                                                 CFMetaData.SchemaColumnsCf,
-                                                CFMetaData.CompactionLogCF,
                                                 CFMetaData.OldStatusCf,
                                                 CFMetaData.OldHintsCf,
                                                 CFMetaData.OldMigrationsCf,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7ff2805c/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 0769d5c..6043be5 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -61,7 +61,6 @@ import org.apache.cassandra.db.index.SecondaryIndexManager;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.dht.*;
 import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.io.FSReadError;
 import org.apache.cassandra.io.compress.CompressionParameters;
 import org.apache.cassandra.io.sstable.*;
 import org.apache.cassandra.io.sstable.Descriptor;
@@ -242,7 +241,33 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         {
             Directories.SSTableLister sstableFiles = directories.sstableLister().skipTemporary(true);
             Collection<SSTableReader> sstables = SSTableReader.batchOpen(sstableFiles.list().entrySet(), metadata, this.partitioner);
-            data.addInitialSSTables(sstables);
+            if (metadata.getDefaultValidator().isCommutative())
+            {
+                // Filter non-compacted sstables, remove compacted ones
+                Set<Integer> compactedSSTables = new HashSet<Integer>();
+                for (SSTableReader sstable : sstables)
+                    compactedSSTables.addAll(sstable.getAncestors());
+
+                Set<SSTableReader> liveSSTables = new HashSet<SSTableReader>();
+                for (SSTableReader sstable : sstables)
+                {
+                    if (compactedSSTables.contains(sstable.descriptor.generation))
+                    {
+                        logger.info("{} is already compacted and will be removed.", sstable);
+                        sstable.markCompacted(); // we need to mark as compacted to be deleted
+                        sstable.releaseReference(); // this amount to deleting the sstable
+                    }
+                    else
+                    {
+                        liveSSTables.add(sstable);
+                    }
+                }
+                data.addInitialSSTables(liveSSTables);
+            }
+            else
+            {
+                data.addInitialSSTables(sstables);
+            }
         }
 
         if (caching == Caching.ALL || caching == Caching.KEYS_ONLY)
@@ -419,72 +444,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         }
     }
 
-    /**
-     * Replacing compacted sstables is atomic as far as observers of DataTracker are concerned, but not on the
-     * filesystem: first the new sstables are renamed to "live" status (i.e., the tmp marker is removed), then
-     * their ancestors are removed.
-     *
-     * If an unclean shutdown happens at the right time, we can thus end up with both the new ones and their
-     * ancestors "live" in the system.  This is harmless for normal data, but for counters it can cause overcounts.
-     *
-     * To prevent this, we record sstables being compacted in the system keyspace.  If we find unfinished
-     * compactions, we remove the new ones (since those may be incomplete -- under LCS, we may create multiple
-     * sstables from any given ancestor).
-     */
-    public static void removeUnfinishedCompactionLeftovers(String keyspace, 
String columnfamily, Set<Integer> unfinishedGenerations)
-    {
-        Directories directories = Directories.create(keyspace, columnfamily);
-
-        // sanity-check unfinishedGenerations
-        Set<Integer> allGenerations = new HashSet<Integer>();
-        for (Descriptor desc : directories.sstableLister().list().keySet())
-            allGenerations.add(desc.generation);
-        if (!allGenerations.containsAll(unfinishedGenerations))
-        {
-            throw new IllegalStateException("Unfinished compactions reference 
missing sstables."
-                                            + " This should never happen since 
compactions are marked finished before we start removing the old sstables.");
-        }
-
-        // remove new sstables from compactions that didn't complete, and 
compute
-        // set of ancestors that shouldn't exist anymore
-        Set<Integer> completedAncestors = new HashSet<Integer>();
-        for (Map.Entry<Descriptor, Set<Component>> sstableFiles : 
directories.sstableLister().list().entrySet())
-        {
-            Descriptor desc = sstableFiles.getKey();
-            Set<Component> components = sstableFiles.getValue();
-
-            SSTableMetadata meta;
-            try
-            {
-                meta = SSTableMetadata.serializer.deserialize(desc);
-            }
-            catch (IOException e)
-            {
-                throw new FSReadError(e, desc.filenameFor(Component.STATS));
-            }
-
-            Set<Integer> ancestors = meta.ancestors;
-            if (!ancestors.isEmpty() && 
unfinishedGenerations.containsAll(ancestors))
-            {
-                SSTable.delete(desc, components);
-            }
-            else
-            {
-                completedAncestors.addAll(ancestors);
-            }
-        }
-
-        // remove old sstables from compactions that did complete
-        for (Map.Entry<Descriptor, Set<Component>> sstableFiles : 
directories.sstableLister().list().entrySet())
-        {
-            Descriptor desc = sstableFiles.getKey();
-            Set<Component> components = sstableFiles.getValue();
-
-            if (completedAncestors.contains(desc.generation))
-                SSTable.delete(desc, components);
-        }
-    }
-
     // must be called after all sstables are loaded since row cache merges all 
row versions
     public void initRowCache()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7ff2805c/src/java/org/apache/cassandra/db/SystemTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemTable.java b/src/java/org/apache/cassandra/db/SystemTable.java
index adf08cc..0422a21 100644
--- a/src/java/org/apache/cassandra/db/SystemTable.java
+++ b/src/java/org/apache/cassandra/db/SystemTable.java
@@ -73,7 +73,6 @@ public class SystemTable
     public static final String SCHEMA_KEYSPACES_CF = "schema_keyspaces";
     public static final String SCHEMA_COLUMNFAMILIES_CF = 
"schema_columnfamilies";
     public static final String SCHEMA_COLUMNS_CF = "schema_columns";
-    public static final String COMPACTION_LOG = "compactions_in_progress";
 
     @Deprecated
     public static final String OLD_STATUS_CF = "LocationInfo";
@@ -184,74 +183,6 @@ public class SystemTable
         }
     }
 
-    /**
-     * Write compaction log, except columfamilies under system keyspace.
-     *
-     * @param cfs
-     * @param toCompact sstables to compact
-     * @return compaction task id or null if cfs is under system keyspace
-     */
-    public static UUID startCompaction(ColumnFamilyStore cfs, 
Iterable<SSTableReader> toCompact)
-    {
-        if (Table.SYSTEM_KS.equals(cfs.table.name))
-            return null;
-
-        UUID compactionId = UUIDGen.getTimeUUID();
-        String req = "INSERT INTO system.%s (id, keyspace_name, 
columnfamily_name, inputs) VALUES (%s, '%s', '%s', {%s})";
-        Iterable<Integer> generations = Iterables.transform(toCompact, new 
Function<SSTableReader, Integer>()
-        {
-            public Integer apply(SSTableReader sstable)
-            {
-                return sstable.descriptor.generation;
-            }
-        });
-        processInternal(String.format(req, COMPACTION_LOG, compactionId, 
cfs.table.name, cfs.columnFamily, 
StringUtils.join(Sets.newHashSet(generations), ',')));
-        forceBlockingFlush(COMPACTION_LOG);
-        return compactionId;
-    }
-
-    public static void finishCompaction(UUID taskId)
-    {
-        assert taskId != null;
-
-        String req = "DELETE FROM system.%s WHERE id = %s";
-        processInternal(String.format(req, COMPACTION_LOG, taskId));
-        forceBlockingFlush(COMPACTION_LOG);
-    }
-
-    /**
-     * @return unfinished compactions, grouped by keyspace/columnfamily pair.
-     */
-    public static SetMultimap<Pair<String, String>, Integer> 
getUnfinishedCompactions()
-    {
-        String req = "SELECT * FROM system.%s";
-        UntypedResultSet resultSet = processInternal(String.format(req, 
COMPACTION_LOG));
-
-        SetMultimap<Pair<String, String>, Integer> unfinishedCompactions = 
HashMultimap.create();
-        for (UntypedResultSet.Row row : resultSet)
-        {
-            String keyspace = row.getString("keyspace_name");
-            String columnfamily = row.getString("columnfamily_name");
-            Set<Integer> inputs = row.getSet("inputs", Int32Type.instance);
-
-            unfinishedCompactions.putAll(Pair.create(keyspace, columnfamily), 
inputs);
-        }
-        return unfinishedCompactions;
-    }
-
-    public static void discardCompactionsInProgress()
-    {
-        ColumnFamilyStore compactionLog = 
Table.open(Table.SYSTEM_KS).getColumnFamilyStore(COMPACTION_LOG);
-        try
-        {
-            compactionLog.truncate().get();
-        }
-        catch (Exception e)
-        {
-            throw new RuntimeException(e);
-        }
-    }
-
     public static void saveTruncationPosition(ColumnFamilyStore cfs, 
ReplayPosition position)
     {
         String req = "UPDATE system.%s SET truncated_at = truncated_at + %s 
WHERE key = '%s'";

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7ff2805c/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 1b1599b..3893f1c 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -103,8 +103,6 @@ public class CompactionTask extends AbstractCompactionTask
         for (SSTableReader sstable : toCompact)
             assert sstable.descriptor.cfname.equals(cfs.columnFamily);
 
-        UUID taskId = SystemTable.startCompaction(cfs, toCompact);
-
         CompactionController controller = new CompactionController(cfs, 
toCompact, gcBefore);
         // new sstables from flush can be added during a compaction, but only 
the compaction can remove them,
         // so in our single-threaded compaction world this is a valid way of 
determining if we're compacting
@@ -230,11 +228,6 @@ public class CompactionTask extends AbstractCompactionTask
                 throw new RuntimeException(e);
             }
 
-            // point of no return -- the new sstables are live on disk; next 
we'll start deleting the old ones
-            // (in replaceCompactedSSTables)
-            if (taskId != null)
-                SystemTable.finishCompaction(taskId);
-
             if (collector != null)
                 collector.finishCompaction(ci);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7ff2805c/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index 0a7b957..7d78196 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -27,7 +27,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.collect.Iterables;
-import com.google.common.collect.SetMultimap;
 import org.apache.log4j.PropertyConfigurator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,7 +43,6 @@ import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.thrift.ThriftServer;
 import org.apache.cassandra.utils.CLibrary;
 import org.apache.cassandra.utils.Mx4jTool;
-import org.apache.cassandra.utils.Pair;
 
 /**
  * The <code>CassandraDaemon</code> is an abstraction for a Cassandra daemon
@@ -202,13 +200,6 @@ public class CassandraDaemon
                 ColumnFamilyStore.scrubDataDirectories(table, cfm.cfName);
             }
         }
-        // clean up compaction leftovers
-        SetMultimap<Pair<String, String>, Integer> unfinishedCompactions = 
SystemTable.getUnfinishedCompactions();
-        for (Pair<String, String> kscf : unfinishedCompactions.keySet())
-        {
-            ColumnFamilyStore.removeUnfinishedCompactionLeftovers(kscf.left, 
kscf.right, unfinishedCompactions.get(kscf));
-        }
-        SystemTable.discardCompactionsInProgress();
 
         // initialize keyspaces
         for (String table : Schema.instance.getTables())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7ff2805c/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
index 8048fcb..8318f0d 100644
--- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
+++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
@@ -66,7 +66,6 @@ import org.apache.cassandra.io.sstable.*;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.thrift.*;
 import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.UUIDGen;
 import org.apache.cassandra.utils.WrappedRunnable;
 
 public class ColumnFamilyStoreTest extends SchemaLoader
@@ -1241,60 +1240,6 @@ public class ColumnFamilyStoreTest extends SchemaLoader
         testMultiRangeSlicesBehavior(prepareMultiRangeSlicesTest(10, false));
     }
 
-    @Test
-    public void testRemoveUnifinishedCompactionLeftovers() throws Throwable
-    {
-        String ks = "Keyspace1";
-        String cf = "Standard3"; // should be empty
-
-        CFMetaData cfmeta = Schema.instance.getCFMetaData(ks, cf);
-        Directories dir = Directories.create(ks, cf);
-        ByteBuffer key = bytes("key");
-
-        // 1st sstable
-        SSTableSimpleWriter writer = new 
SSTableSimpleWriter(dir.getDirectoryForNewSSTables(100),
-                                                             cfmeta, 
StorageService.getPartitioner());
-        writer.newRow(key);
-        writer.addColumn(bytes("col"), bytes("val"), 1);
-        writer.close();
-
-        Map<Descriptor, Set<Component>> sstables = dir.sstableLister().list();
-        assert sstables.size() == 1;
-
-        Map.Entry<Descriptor, Set<Component>> sstableToOpen = 
sstables.entrySet().iterator().next();
-        final SSTableReader sstable1 = 
SSTableReader.open(sstableToOpen.getKey());
-
-        // simulate incomplete compaction
-        writer = new SSTableSimpleWriter(dir.getDirectoryForNewSSTables(100),
-                                                             cfmeta, 
StorageService.getPartitioner())
-        {
-            protected SSTableWriter getWriter()
-            {
-                SSTableMetadata.Collector collector = 
SSTableMetadata.createCollector();
-                collector.addAncestor(sstable1.descriptor.generation); // add 
ancestor from previously written sstable
-                return new SSTableWriter(makeFilename(directory, 
metadata.ksName, metadata.cfName),
-                                         0,
-                                         metadata,
-                                         StorageService.getPartitioner(),
-                                         collector);
-            }
-        };
-        writer.newRow(key);
-        writer.addColumn(bytes("col"), bytes("val"), 1);
-        writer.close();
-
-        // should have 2 sstables now
-        sstables = dir.sstableLister().list();
-        assert sstables.size() == 2;
-
-        ColumnFamilyStore.removeUnfinishedCompactionLeftovers(ks, cf, 
Sets.newHashSet(sstable1.descriptor.generation));
-
-        // 2nd sstable should be removed (only 1st sstable exists in set of 
size 1)
-        sstables = dir.sstableLister().list();
-        assert sstables.size() == 1;
-        assert sstables.containsKey(sstable1.descriptor);
-    }
-
     private ColumnFamilyStore prepareMultiRangeSlicesTest(int valueSize, 
boolean flush) throws Throwable
     {
         String tableName = "Keyspace1";

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7ff2805c/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
index b41bf19..a09451e 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
@@ -24,10 +24,6 @@ import java.util.*;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.SetMultimap;
-import com.google.common.collect.Sets;
 import org.junit.Test;
 
 import org.apache.cassandra.SchemaLoader;
@@ -44,7 +40,6 @@ import org.apache.cassandra.io.sstable.SSTableScanner;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.Pair;
 
 import static junit.framework.Assert.*;
 
@@ -280,35 +275,6 @@ public class CompactionsTest extends SchemaLoader
         assert sstables.iterator().next().descriptor.generation == 
prevGeneration + 1;
     }
 
-    @Test
-    public void testCompactionLog() throws Exception
-    {
-        SystemTable.discardCompactionsInProgress();
-
-        String cf = "Standard4";
-        ColumnFamilyStore cfs = Table.open(TABLE1).getColumnFamilyStore(cf);
-        insertData(TABLE1, cf, 0, 1);
-        cfs.forceBlockingFlush();
-
-        Collection<SSTableReader> sstables = cfs.getSSTables();
-        assert !sstables.isEmpty();
-        Set<Integer> generations = 
Sets.newHashSet(Iterables.transform(sstables, new Function<SSTableReader, 
Integer>()
-        {
-            public Integer apply(SSTableReader sstable)
-            {
-                return sstable.descriptor.generation;
-            }
-        }));
-        UUID taskId = SystemTable.startCompaction(cfs, sstables);
-        SetMultimap<Pair<String, String>, Integer> compactionLogs = 
SystemTable.getUnfinishedCompactions();
-        Set<Integer> unfinishedCompactions = 
compactionLogs.get(Pair.create(TABLE1, cf));
-        assert unfinishedCompactions.containsAll(generations);
-
-        SystemTable.finishCompaction(taskId);
-        compactionLogs = SystemTable.getUnfinishedCompactions();
-        assert !compactionLogs.containsKey(Pair.create(TABLE1, cf));
-    }
-
     private void testDontPurgeAccidentaly(String k, String cfname, boolean 
forceDeserialize) throws IOException, ExecutionException, InterruptedException
     {
         // This test catches the regression of CASSANDRA-2786

Reply via email to