Don't promote sstables for cleanup, scrub and upgradesstables

patch by slebresne; reviewed by jbellis for CASSANDRA-3989


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/048c8a98
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/048c8a98
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/048c8a98

Branch: refs/heads/trunk
Commit: 048c8a98d83b41d463b557ede8a0bf98d3def022
Parents: c7895e9
Author: Sylvain Lebresne <sylv...@datastax.com>
Authored: Wed Mar 7 18:35:15 2012 +0100
Committer: Sylvain Lebresne <sylv...@datastax.com>
Committed: Wed Mar 7 18:35:15 2012 +0100

----------------------------------------------------------------------
 CHANGES.txt                                        |    2 +
 .../org/apache/cassandra/db/ColumnFamilyStore.java |   11 +++--
 src/java/org/apache/cassandra/db/DataTracker.java  |   15 +++---
 .../cassandra/db/compaction/CompactionManager.java |    8 ++-
 .../cassandra/db/compaction/CompactionTask.java    |   16 +++++--
 .../db/compaction/LeveledCompactionStrategy.java   |   13 +++++-
 .../cassandra/db/compaction/LeveledManifest.java   |   35 +++++++++++---
 .../cassandra/db/compaction/OperationType.java     |    1 +
 .../SSTableListChangedNotification.java            |   11 +++-
 .../cassandra/db/compaction/CompactionsTest.java   |    3 +-
 10 files changed, 83 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/048c8a98/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 91c45dc..c453943 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,8 @@
  * Pig: Composite column support (CASSANDRA-384)
  * Avoid NPE during repair when a keyspace has no CFs (CASSANDRA-3988)
  * Fix division-by-zero error on get_slice (CASSANDRA-4000)
+ * don't change manifest level for cleanup, scrub, and upgradesstables
+   operations under LeveledCompactionStrategy (CASSANDRA-3989)
 
 
 1.0.8

http://git-wip-us.apache.org/repos/asf/cassandra/blob/048c8a98/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 6f8392d..b50662a 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -46,6 +46,7 @@ import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.compaction.LeveledCompactionStrategy;
+import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.db.filter.IFilter;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.db.filter.QueryPath;
@@ -995,15 +996,15 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
         
CompactionManager.instance.performSSTableRewrite(ColumnFamilyStore.this);
     }
 
-    public void markCompacted(Collection<SSTableReader> sstables)
+    public void markCompacted(Collection<SSTableReader> sstables, 
OperationType compactionType)
     {
         assert !sstables.isEmpty();
-        data.markCompacted(sstables);
+        data.markCompacted(sstables, compactionType);
     }
 
-    public void replaceCompactedSSTables(Collection<SSTableReader> sstables, 
Iterable<SSTableReader> replacements)
+    public void replaceCompactedSSTables(Collection<SSTableReader> sstables, 
Iterable<SSTableReader> replacements, OperationType compactionType)
     {
-        data.replaceCompactedSSTables(sstables, replacements);
+        data.replaceCompactedSSTables(sstables, replacements, compactionType);
     }
 
     void replaceFlushed(Memtable memtable, SSTableReader sstable)
@@ -1957,6 +1958,6 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
         }
 
         if (!truncatedSSTables.isEmpty())
-            markCompacted(truncatedSSTables);
+            markCompacted(truncatedSSTables, OperationType.UNKNOWN);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/048c8a98/src/java/org/apache/cassandra/db/DataTracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DataTracker.java 
b/src/java/org/apache/cassandra/db/DataTracker.java
index aecc083..c4b8a7b 100644
--- a/src/java/org/apache/cassandra/db/DataTracker.java
+++ b/src/java/org/apache/cassandra/db/DataTracker.java
@@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.cache.AutoSavingCache;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTable;
 import org.apache.cassandra.io.sstable.SSTableReader;
@@ -222,16 +223,16 @@ public class DataTracker
         while (!view.compareAndSet(currentView, newView));
     }
 
-    public void markCompacted(Collection<SSTableReader> sstables)
+    public void markCompacted(Collection<SSTableReader> sstables, 
OperationType compactionType)
     {
         replace(sstables, Collections.<SSTableReader>emptyList());
-        notifySSTablesChanged(sstables, 
Collections.<SSTableReader>emptyList());
+        notifySSTablesChanged(sstables, 
Collections.<SSTableReader>emptyList(), compactionType);
     }
 
-    public void replaceCompactedSSTables(Collection<SSTableReader> sstables, 
Iterable<SSTableReader> replacements)
+    public void replaceCompactedSSTables(Collection<SSTableReader> sstables, 
Iterable<SSTableReader> replacements, OperationType compactionType)
     {
         replace(sstables, replacements);
-        notifySSTablesChanged(sstables, replacements);
+        notifySSTablesChanged(sstables, replacements, compactionType);
     }
 
     public void addInitialSSTables(Collection<SSTableReader> sstables)
@@ -260,7 +261,7 @@ public class DataTracker
         }
 
         replace(sstables, Collections.<SSTableReader>emptyList());
-        notifySSTablesChanged(sstables, 
Collections.<SSTableReader>emptyList());
+        notifySSTablesChanged(sstables, 
Collections.<SSTableReader>emptyList(), OperationType.UNKNOWN);
     }
 
     /** (Re)initializes the tracker, purging all references. */
@@ -486,11 +487,11 @@ public class DataTracker
         return (double) falseCount / (trueCount + falseCount);
     }
 
-    public void notifySSTablesChanged(Iterable<SSTableReader> removed, 
Iterable<SSTableReader> added)
+    public void notifySSTablesChanged(Iterable<SSTableReader> removed, 
Iterable<SSTableReader> added, OperationType compactionType)
     {
         for (INotificationConsumer subscriber : subscribers)
         {
-            INotification notification = new 
SSTableListChangedNotification(added, removed);
+            INotification notification = new 
SSTableListChangedNotification(added, removed, compactionType);
             subscriber.handleNotification(notification, this);
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/048c8a98/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java 
b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 97e5067..55fab3c 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -225,8 +225,10 @@ public class CompactionManager implements 
CompactionManagerMBean
                 for (final SSTableReader sstable : sstables)
                 {
                     // SSTables are marked by the caller
+                    // NOTE: it is important that the task create one and only one sstable, even for Leveled compaction (see LeveledManifest.replace())
                     CompactionTask task = new CompactionTask(cfs, 
Collections.singletonList(sstable), Integer.MAX_VALUE);
                     task.isUserDefined(true);
+                    task.setCompactionType(OperationType.UPGRADE_SSTABLES);
                     task.execute(executor);
                 }
             }
@@ -646,7 +648,7 @@ public class CompactionManager implements 
CompactionManagerMBean
 
         if (newSstable == null)
         {
-            cfs.markCompacted(Arrays.asList(sstable));
+            cfs.markCompacted(Arrays.asList(sstable), OperationType.SCRUB);
             if (badRows > 0)
                 logger.warn("No valid rows found while scrubbing " + sstable + 
"; it is marked for deletion now. If you want to attempt manual recovery, you 
can find a copy in the pre-scrub snapshot");
             else
@@ -654,7 +656,7 @@ public class CompactionManager implements 
CompactionManagerMBean
         }
         else
         {
-            cfs.replaceCompactedSSTables(Arrays.asList(sstable), 
Arrays.asList(newSstable));
+            cfs.replaceCompactedSSTables(Arrays.asList(sstable), 
Arrays.asList(newSstable), OperationType.SCRUB);
             logger.info("Scrub of " + sstable + " complete: " + goodRows + " 
rows in new sstable and " + emptyRows + " empty (tombstoned) rows dropped");
             if (badRows > 0)
                 logger.warn("Unable to recover " + badRows + " rows that were 
skipped.  You can attempt manual recovery from the pre-scrub snapshot.  You can 
also run nodetool repair to transfer the data from a healthy replica, if any");
@@ -797,7 +799,7 @@ public class CompactionManager implements 
CompactionManagerMBean
             // flush to ensure we don't lose the tombstones on a restart, 
since they are not commitlog'd         
             cfs.indexManager.flushIndexesBlocking();
 
-            cfs.replaceCompactedSSTables(Arrays.asList(sstable), results);
+            cfs.replaceCompactedSSTables(Arrays.asList(sstable), results, 
OperationType.CLEANUP);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/048c8a98/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java 
b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index ece5f19..5847bf7 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -44,6 +44,7 @@ public class CompactionTask extends AbstractCompactionTask
     protected String compactionFileLocation;
     protected final int gcBefore;
     protected boolean isUserDefined;
+    protected OperationType compactionType;
     protected static long totalBytesCompacted = 0;
 
     public CompactionTask(ColumnFamilyStore cfs, Collection<SSTableReader> 
sstables, final int gcBefore)
@@ -52,6 +53,7 @@ public class CompactionTask extends AbstractCompactionTask
         compactionFileLocation = null;
         this.gcBefore = gcBefore;
         this.isUserDefined = false;
+        this.compactionType = OperationType.COMPACTION;
     }
 
     public static synchronized long addToTotalBytesCompacted(long 
bytesCompacted)
@@ -122,8 +124,8 @@ public class CompactionTask extends AbstractCompactionTask
             logger.debug("Expected bloom filter size : " + keysPerSSTable);
 
         AbstractCompactionIterable ci = 
DatabaseDescriptor.isMultithreadedCompaction()
-                                      ? new 
ParallelCompactionIterable(OperationType.COMPACTION, toCompact, controller)
-                                      : new 
CompactionIterable(OperationType.COMPACTION, toCompact, controller);
+                                      ? new 
ParallelCompactionIterable(compactionType, toCompact, controller)
+                                      : new CompactionIterable(compactionType, 
toCompact, controller);
         CloseableIterator<AbstractCompactedRow> iter = ci.iterator();
         Iterator<AbstractCompactedRow> nni = Iterators.filter(iter, 
Predicates.notNull());
         Map<DecoratedKey, Long> cachedKeys = new HashMap<DecoratedKey, Long>();
@@ -144,7 +146,7 @@ public class CompactionTask extends AbstractCompactionTask
                 // don't mark compacted in the finally block, since if there _is_ nondeleted data,
                 // we need to sync it (via closeAndOpen) first, so there is no period during which
                 // a crash could cause data loss.
-                cfs.markCompacted(toCompact);
+                cfs.markCompacted(toCompact, compactionType);
                 return 0;
             }
 
@@ -197,7 +199,7 @@ public class CompactionTask extends AbstractCompactionTask
                 collector.finishCompaction(ci);
         }
 
-        cfs.replaceCompactedSSTables(toCompact, sstables);
+        cfs.replaceCompactedSSTables(toCompact, sstables, compactionType);
         // TODO: this doesn't belong here, it should be part of the reader to 
load when the tracker is wired up
         for (Entry<SSTableReader, Map<DecoratedKey, Long>> 
ssTableReaderMapEntry : cachedKeyMap.entrySet())
         {
@@ -270,4 +272,10 @@ public class CompactionTask extends AbstractCompactionTask
         this.isUserDefined = isUserDefined;
         return this;
     }
+
+    public CompactionTask setCompactionType(OperationType compactionType)
+    {
+        this.compactionType = compactionType;
+        return this;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/048c8a98/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
index d4402ee..de0ff75 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@ -138,7 +138,18 @@ public class LeveledCompactionStrategy extends 
AbstractCompactionStrategy implem
         else if (notification instanceof SSTableListChangedNotification)
         {
             SSTableListChangedNotification listChangedNotification = 
(SSTableListChangedNotification) notification;
-            manifest.promote(listChangedNotification.removed, 
listChangedNotification.added);
+            switch (listChangedNotification.compactionType)
+            {
+                // Cleanup, scrub and upgradesstables shouldn't promote (see #3989)
+                case CLEANUP:
+                case SCRUB:
+                case UPGRADE_SSTABLES:
+                    manifest.replace(listChangedNotification.removed, 
listChangedNotification.added);
+                    break;
+                default:
+                    manifest.promote(listChangedNotification.removed, 
listChangedNotification.added);
+                    break;
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/048c8a98/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java 
b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
index 189de8e..6dc6ce9 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
@@ -61,7 +61,6 @@ public class LeveledManifest
     private final List<SSTableReader>[] generations;
     private final DecoratedKey[] lastCompactedKeys;
     private final int maxSSTableSizeInMB;
-    private int levelCount;
 
     private LeveledManifest(ColumnFamilyStore cfs, int maxSSTableSizeInMB)
     {
@@ -165,10 +164,9 @@ public class LeveledManifest
         int maximumLevel = 0;
         for (SSTableReader sstable : removed)
         {
-            int thisLevel = levelOf(sstable);
+            int thisLevel = remove(sstable);
             maximumLevel = Math.max(maximumLevel, thisLevel);
             minimumLevel = Math.min(minimumLevel, thisLevel);
-            remove(sstable);
         }
 
         // it's valid to do a remove w/o an add (e.g. on truncate)
@@ -188,6 +186,22 @@ public class LeveledManifest
         serialize();
     }
 
+    public synchronized void replace(Iterable<SSTableReader> removed, 
Iterable<SSTableReader> added)
+    {
+        // replace is for compaction operation that don't really change the
+        // content of a sstable (cleanup, scrub) and must replace one sstable by another
+        assert Iterables.size(removed) == 1;
+        assert Iterables.size(added) == 1;
+        SSTableReader toRemove = removed.iterator().next();
+        SSTableReader toAdd = added.iterator().next();
+        logDistribution();
+        if (logger.isDebugEnabled())
+            logger.debug("Replacing " + removed + " by " + toAdd);
+
+        add(toAdd, remove(toRemove));
+        serialize();
+    }
+
     private String toString(Iterable<SSTableReader> sstables)
     {
         StringBuilder builder = new StringBuilder();
@@ -266,12 +280,15 @@ public class LeveledManifest
 
     private void logDistribution()
     {
-        for (int i = 0; i < generations.length; i++)
+        if (logger.isDebugEnabled())
         {
-            if (!generations[i].isEmpty())
+            for (int i = 0; i < generations.length; i++)
             {
-                logger.debug("L{} contains {} SSTables ({} bytes) in {}",
-                             new Object[] {i, generations[i].size(), 
SSTableReader.getTotalBytes(generations[i]), this});
+                if (!generations[i].isEmpty())
+                {
+                    logger.debug("L{} contains {} SSTables ({} bytes) in {}",
+                            new Object[] {i, generations[i].size(), 
SSTableReader.getTotalBytes(generations[i]), this});
+                }
             }
         }
     }
@@ -286,15 +303,17 @@ public class LeveledManifest
         return -1;
     }
 
-    private void remove(SSTableReader reader)
+    private int remove(SSTableReader reader)
     {
         int level = levelOf(reader);
         assert level >= 0 : reader + " not present in manifest";
         generations[level].remove(reader);
+        return level;
     }
 
     private void add(SSTableReader sstable, int level)
     {
+        assert level < generations.length : "Invalid level " + level + " out 
of " + (generations.length - 1);
         generations[level].add(sstable);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/048c8a98/src/java/org/apache/cassandra/db/compaction/OperationType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/OperationType.java 
b/src/java/org/apache/cassandra/db/compaction/OperationType.java
index ecdc71c..b4b5498 100644
--- a/src/java/org/apache/cassandra/db/compaction/OperationType.java
+++ b/src/java/org/apache/cassandra/db/compaction/OperationType.java
@@ -29,6 +29,7 @@ public enum OperationType
     ROW_CACHE_SAVE("Row cache save"),
     CLEANUP("Cleanup"),
     SCRUB("Scrub"),
+    UPGRADE_SSTABLES("Upgrade sstables"),
     INDEX_BUILD("Secondary index build"),
     UNKNOWN("Unkown compaction type");
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/048c8a98/src/java/org/apache/cassandra/notifications/SSTableListChangedNotification.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/notifications/SSTableListChangedNotification.java b/src/java/org/apache/cassandra/notifications/SSTableListChangedNotification.java
index 480dcf2..9f31b4e 100644
--- a/src/java/org/apache/cassandra/notifications/SSTableListChangedNotification.java
+++ b/src/java/org/apache/cassandra/notifications/SSTableListChangedNotification.java
@@ -25,13 +25,18 @@ import org.apache.cassandra.io.sstable.SSTableReader;
 
 import java.util.List;
 
+import org.apache.cassandra.db.compaction.OperationType;
+
 public class SSTableListChangedNotification implements INotification
 {
-    public Iterable<SSTableReader> removed;
-    public Iterable<SSTableReader> added;
-    public SSTableListChangedNotification(Iterable<SSTableReader> added, 
Iterable<SSTableReader> removed)
+    public final Iterable<SSTableReader> removed;
+    public final Iterable<SSTableReader> added;
+    public final OperationType compactionType;
+
+    public SSTableListChangedNotification(Iterable<SSTableReader> added, 
Iterable<SSTableReader> removed, OperationType compactionType)
     {
         this.removed = removed;
         this.added = added;
+        this.compactionType = compactionType;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/048c8a98/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java 
b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
index 89fee28..1179919 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
@@ -35,6 +35,7 @@ import org.apache.cassandra.CleanupHelper;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.io.sstable.*;
@@ -209,7 +210,7 @@ public class CompactionsTest extends CleanupHelper
         assertEquals(2, store.getSSTables().size());
 
         // Now, we remove the sstable that was just created to force the use 
of EchoedRow (so that it doesn't hide the problem)
-        store.markCompacted(Collections.singleton(tmpSSTable));
+        store.markCompacted(Collections.singleton(tmpSSTable), 
OperationType.UNKNOWN);
         assertEquals(1, store.getSSTables().size());
 
         // Now assert we do have the 4 keys

Reply via email to