Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 3bee990ca -> 369966a2a
  refs/heads/trunk ed0840299 -> 54f4984f5


Fix error when dropping table during compaction

patch by benedict; reviewed by tjake for CASSANDRA-9251


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/369966a2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/369966a2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/369966a2

Branch: refs/heads/cassandra-2.1
Commit: 369966a2af65aa1d8e8248307ebd187fccacbd8e
Parents: 3bee990
Author: Benedict Elliott Smith <bened...@apache.org>
Authored: Mon May 4 18:31:14 2015 +0100
Committer: Benedict Elliott Smith <bened...@apache.org>
Committed: Mon May 4 18:31:14 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/db/ColumnFamilyStore.java  | 24 ++-----------
 .../org/apache/cassandra/db/DataTracker.java    | 15 ++------
 src/java/org/apache/cassandra/db/Keyspace.java  |  4 +++
 .../db/compaction/CompactionManager.java        | 22 ++++++++++++
 .../cassandra/cql3/CrcCheckChanceTest.java      | 36 ++++++++++++++++++++
 6 files changed, 69 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/369966a2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3a2daa7..64d0760 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.6
+ * Fix error when dropping table during compaction (CASSANDRA-9251)
  * cassandra-stress supports validation operations over user profiles (CASSANDRA-8773)
  * Add support for rate limiting log messages (CASSANDRA-9029)
  * Log the partition key with tombstone warnings (CASSANDRA-8561)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/369966a2/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index bb23332..4438afd 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -383,6 +383,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     /** call when dropping or renaming a CF. Performs mbean housekeeping and invalidates CFS to other operations */
     public void invalidate()
     {
+        // disable and cancel in-progress compactions before invalidating
         valid = false;
 
         try
@@ -397,7 +398,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         }
 
         latencyCalculator.cancel(false);
-        compactionStrategyWrapper.shutdown();
         SystemKeyspace.removeTruncationRecord(metadata.cfId);
         data.unreferenceSSTables();
         indexManager.invalidate();
@@ -2566,26 +2566,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             try
             {
                 // interrupt in-progress compactions
-                Function<ColumnFamilyStore, CFMetaData> f = new Function<ColumnFamilyStore, CFMetaData>()
-                {
-                    public CFMetaData apply(ColumnFamilyStore cfs)
-                    {
-                        return cfs.metadata;
-                    }
-                };
-                Iterable<CFMetaData> allMetadata = Iterables.transform(selfWithIndexes, f);
-                CompactionManager.instance.interruptCompactionFor(allMetadata, interruptValidation);
-
-                // wait for the interruption to be recognized
-                long start = System.nanoTime();
-                long delay = TimeUnit.MINUTES.toNanos(1);
-                while (System.nanoTime() - start < delay)
-                {
-                    if (CompactionManager.instance.isCompacting(selfWithIndexes))
-                        Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MILLISECONDS);
-                    else
-                        break;
-                }
+                CompactionManager.instance.interruptCompactionForCFs(selfWithIndexes, interruptValidation);
+                CompactionManager.instance.waitForCessation(selfWithIndexes);
 
                 // doublecheck that we finished, instead of timing out
                 for (ColumnFamilyStore cfs : selfWithIndexes)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/369966a2/src/java/org/apache/cassandra/db/DataTracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DataTracker.java b/src/java/org/apache/cassandra/db/DataTracker.java
index 757b48a..a520dcd 100644
--- a/src/java/org/apache/cassandra/db/DataTracker.java
+++ b/src/java/org/apache/cassandra/db/DataTracker.java
@@ -229,17 +229,6 @@ public class DataTracker
      */
     public void unmarkCompacting(Iterable<SSTableReader> unmark)
     {
-        boolean isValid = cfstore.isValid();
-        if (!isValid)
-        {
-            // The CF has been dropped.  We don't know if the original compaction suceeded or failed,
-            // which makes it difficult to know if the sstable reference has already been released.
-            // A "good enough" approach is to mark the sstables involved obsolete, which if compaction succeeded
-            // is harmlessly redundant, and if it failed ensures that at least the sstable will get deleted on restart.
-            for (SSTableReader sstable : unmark)
-                sstable.markObsolete();
-        }
-
         View currentView, newView;
         do
         {
@@ -248,7 +237,7 @@ public class DataTracker
         }
         while (!view.compareAndSet(currentView, newView));
 
-        if (!isValid)
+        if (!cfstore.isValid())
         {
             // when the CFS is invalidated, it will call unreferenceSSTables().  However, unreferenceSSTables only deals
             // with sstables that aren't currently being compacted.  If there are ongoing compactions that finish or are
@@ -333,6 +322,8 @@ public class DataTracker
         do
         {
             currentView = view.get();
+            if (!currentView.compacting.isEmpty())
+                logger.error("Set of compacting sstables is non-empty when invalidating sstables {}", currentView.compacting);
             notCompacting = currentView.nonCompactingSStables();
             newView = currentView.replace(notCompacting, Collections.<SSTableReader>emptySet());
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/369966a2/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java
index d92eea4..cec1beb 100644
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@ -31,6 +31,7 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Future;
 
 import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,6 +39,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.config.*;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.commitlog.ReplayPosition;
+import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.db.index.SecondaryIndex;
 import org.apache.cassandra.db.index.SecondaryIndexManager;
@@ -295,6 +297,8 @@ public class Keyspace
         if (cfs == null)
             return;
 
+        cfs.getCompactionStrategy().shutdown();
+        CompactionManager.instance.interruptCompactionForCFs(cfs.concatWithIndexes(), true);
         // wait for any outstanding reads/writes that might affect the CFS
         cfs.keyspace.writeOrder.awaitNewBarrier();
         cfs.readOrdering.awaitNewBarrier();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/369966a2/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 7215945..cc0dde0 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -1467,4 +1467,26 @@ public class CompactionManager implements CompactionManagerMBean
                 compactionHolder.stop(); // signal compaction to stop
         }
     }
+
+    public void interruptCompactionForCFs(Iterable<ColumnFamilyStore> cfss, boolean interruptValidation)
+    {
+        List<CFMetaData> metadata = new ArrayList<>();
+        for (ColumnFamilyStore cfs : cfss)
+            metadata.add(cfs.metadata);
+
+        interruptCompactionFor(metadata, interruptValidation);
+    }
+
+    public void waitForCessation(Iterable<ColumnFamilyStore> cfss)
+    {
+        long start = System.nanoTime();
+        long delay = TimeUnit.MINUTES.toNanos(1);
+        while (System.nanoTime() - start < delay)
+        {
+            if (CompactionManager.instance.isCompacting(cfss))
+                Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+            else
+                break;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/369966a2/test/unit/org/apache/cassandra/cql3/CrcCheckChanceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/CrcCheckChanceTest.java b/test/unit/org/apache/cassandra/cql3/CrcCheckChanceTest.java
index b9d23cd..cc803fb 100644
--- a/test/unit/org/apache/cassandra/cql3/CrcCheckChanceTest.java
+++ b/test/unit/org/apache/cassandra/cql3/CrcCheckChanceTest.java
@@ -17,9 +17,16 @@
  */
 package org.apache.cassandra.cql3;
 
+import java.util.List;
+import java.util.concurrent.Future;
+
 import junit.framework.Assert;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.utils.FBUtilities;
+
 import org.junit.Test;
 
 
@@ -108,5 +115,34 @@ public class CrcCheckChanceTest extends CQLTester
         Assert.assertEquals( 0.03, indexCfs.getSSTables().iterator().next().getCompressionMetadata().parameters.getCrcCheckChance());
 
     }
+
+
+    @Test
+    public void testDropDuringCompaction() throws Throwable
+    {
+        CompactionManager.instance.disableAutoCompaction();
+
+        //Start with crc_check_chance of 99%
+        createTable("CREATE TABLE %s (p text, c text, v text, s text static, PRIMARY KEY (p, c)) WITH compression = {'sstable_compression': 'LZ4Compressor', 'crc_check_chance' : 0.99}");
+
+        ColumnFamilyStore cfs = Keyspace.open(CQLTester.KEYSPACE).getColumnFamilyStore(currentTable());
+
+        //Write a few SSTables then Compact, and drop
+        for (int i = 0; i < 100; i++)
+        {
+            execute("INSERT INTO %s(p, c, v, s) values (?, ?, ?, ?)", "p1", "k1", "v1", "sv1");
+            execute("INSERT INTO %s(p, c, v) values (?, ?, ?)", "p1", "k2", "v2");
+            execute("INSERT INTO %s(p, s) values (?, ?)", "p2", "sv2");
+
+            cfs.forceBlockingFlush();
+        }
+
+        DatabaseDescriptor.setCompactionThroughputMbPerSec(1);
+        List<Future<?>> futures = CompactionManager.instance.submitMaximal(cfs, CompactionManager.GC_ALL);
+        execute("DROP TABLE %s");
+
+        FBUtilities.waitOnFutures(futures);
+
+    }
 }
 

Reply via email to