Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 3bee990ca -> 369966a2a
  refs/heads/trunk ed0840299 -> 54f4984f5
Fix error when dropping table during compaction

patch by benedict; reviewed by tjake CASSANDRA-9251

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/369966a2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/369966a2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/369966a2

Branch: refs/heads/cassandra-2.1
Commit: 369966a2af65aa1d8e8248307ebd187fccacbd8e
Parents: 3bee990
Author: Benedict Elliott Smith <bened...@apache.org>
Authored: Mon May 4 18:31:14 2015 +0100
Committer: Benedict Elliott Smith <bened...@apache.org>
Committed: Mon May 4 18:31:14 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/db/ColumnFamilyStore.java  | 24 ++-----------
 .../org/apache/cassandra/db/DataTracker.java    | 15 ++------
 src/java/org/apache/cassandra/db/Keyspace.java  |  4 +++
 .../db/compaction/CompactionManager.java        | 22 ++++++++++++
 .../cassandra/cql3/CrcCheckChanceTest.java      | 36 ++++++++++++++++++++
 6 files changed, 69 insertions(+), 33 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/369966a2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3a2daa7..64d0760 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.6
+ * Fix error when dropping table during compaction (CASSANDRA-9251)
  * cassandra-stress supports validation operations over user profiles (CASSANDRA-8773)
  * Add support for rate limiting log messages (CASSANDRA-9029)
  * Log the partition key with tombstone warnings (CASSANDRA-8561)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/369966a2/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java 
b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index bb23332..4438afd 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -383,6 +383,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean /** call when dropping or renaming a CF. Performs mbean housekeeping and invalidates CFS to other operations */ public void invalidate() { + // disable and cancel in-progress compactions before invalidating valid = false; try @@ -397,7 +398,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean } latencyCalculator.cancel(false); - compactionStrategyWrapper.shutdown(); SystemKeyspace.removeTruncationRecord(metadata.cfId); data.unreferenceSSTables(); indexManager.invalidate(); @@ -2566,26 +2566,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean try { // interrupt in-progress compactions - Function<ColumnFamilyStore, CFMetaData> f = new Function<ColumnFamilyStore, CFMetaData>() - { - public CFMetaData apply(ColumnFamilyStore cfs) - { - return cfs.metadata; - } - }; - Iterable<CFMetaData> allMetadata = Iterables.transform(selfWithIndexes, f); - CompactionManager.instance.interruptCompactionFor(allMetadata, interruptValidation); - - // wait for the interruption to be recognized - long start = System.nanoTime(); - long delay = TimeUnit.MINUTES.toNanos(1); - while (System.nanoTime() - start < delay) - { - if (CompactionManager.instance.isCompacting(selfWithIndexes)) - Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MILLISECONDS); - else - break; - } + CompactionManager.instance.interruptCompactionForCFs(selfWithIndexes, interruptValidation); + CompactionManager.instance.waitForCessation(selfWithIndexes); // doublecheck that we finished, instead of timing out for (ColumnFamilyStore cfs : selfWithIndexes) http://git-wip-us.apache.org/repos/asf/cassandra/blob/369966a2/src/java/org/apache/cassandra/db/DataTracker.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/DataTracker.java b/src/java/org/apache/cassandra/db/DataTracker.java index 757b48a..a520dcd 100644 --- a/src/java/org/apache/cassandra/db/DataTracker.java +++ b/src/java/org/apache/cassandra/db/DataTracker.java @@ -229,17 +229,6 @@ public class DataTracker */ public void unmarkCompacting(Iterable<SSTableReader> unmark) { - boolean isValid = cfstore.isValid(); - if (!isValid) - { - // The CF has been dropped. We don't know if the original compaction suceeded or failed, - // which makes it difficult to know if the sstable reference has already been released. - // A "good enough" approach is to mark the sstables involved obsolete, which if compaction succeeded - // is harmlessly redundant, and if it failed ensures that at least the sstable will get deleted on restart. - for (SSTableReader sstable : unmark) - sstable.markObsolete(); - } - View currentView, newView; do { @@ -248,7 +237,7 @@ public class DataTracker } while (!view.compareAndSet(currentView, newView)); - if (!isValid) + if (!cfstore.isValid()) { // when the CFS is invalidated, it will call unreferenceSSTables(). However, unreferenceSSTables only deals // with sstables that aren't currently being compacted. 
If there are ongoing compactions that finish or are @@ -333,6 +322,8 @@ public class DataTracker do { currentView = view.get(); + if (!currentView.compacting.isEmpty()) + logger.error("Set of compacting sstables is non-empty when invalidating sstables {}", currentView.compacting); notCompacting = currentView.nonCompactingSStables(); newView = currentView.replace(notCompacting, Collections.<SSTableReader>emptySet()); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/369966a2/src/java/org/apache/cassandra/db/Keyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java index d92eea4..cec1beb 100644 --- a/src/java/org/apache/cassandra/db/Keyspace.java +++ b/src/java/org/apache/cassandra/db/Keyspace.java @@ -31,6 +31,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Future; import com.google.common.base.Function; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,6 +39,7 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.config.*; import org.apache.cassandra.db.commitlog.CommitLog; import org.apache.cassandra.db.commitlog.ReplayPosition; +import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.filter.QueryFilter; import org.apache.cassandra.db.index.SecondaryIndex; import org.apache.cassandra.db.index.SecondaryIndexManager; @@ -295,6 +297,8 @@ public class Keyspace if (cfs == null) return; + cfs.getCompactionStrategy().shutdown(); + CompactionManager.instance.interruptCompactionForCFs(cfs.concatWithIndexes(), true); // wait for any outstanding reads/writes that might affect the CFS cfs.keyspace.writeOrder.awaitNewBarrier(); cfs.readOrdering.awaitNewBarrier(); 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/369966a2/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index 7215945..cc0dde0 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -1467,4 +1467,26 @@ public class CompactionManager implements CompactionManagerMBean compactionHolder.stop(); // signal compaction to stop } } + + public void interruptCompactionForCFs(Iterable<ColumnFamilyStore> cfss, boolean interruptValidation) + { + List<CFMetaData> metadata = new ArrayList<>(); + for (ColumnFamilyStore cfs : cfss) + metadata.add(cfs.metadata); + + interruptCompactionFor(metadata, interruptValidation); + } + + public void waitForCessation(Iterable<ColumnFamilyStore> cfss) + { + long start = System.nanoTime(); + long delay = TimeUnit.MINUTES.toNanos(1); + while (System.nanoTime() - start < delay) + { + if (CompactionManager.instance.isCompacting(cfss)) + Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); + else + break; + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/369966a2/test/unit/org/apache/cassandra/cql3/CrcCheckChanceTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/CrcCheckChanceTest.java b/test/unit/org/apache/cassandra/cql3/CrcCheckChanceTest.java index b9d23cd..cc803fb 100644 --- a/test/unit/org/apache/cassandra/cql3/CrcCheckChanceTest.java +++ b/test/unit/org/apache/cassandra/cql3/CrcCheckChanceTest.java @@ -17,9 +17,16 @@ */ package org.apache.cassandra.cql3; +import java.util.List; +import java.util.concurrent.Future; + import junit.framework.Assert; +import 
org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.compaction.CompactionManager; +import org.apache.cassandra.utils.FBUtilities; + import org.junit.Test; @@ -108,5 +115,34 @@ public class CrcCheckChanceTest extends CQLTester Assert.assertEquals( 0.03, indexCfs.getSSTables().iterator().next().getCompressionMetadata().parameters.getCrcCheckChance()); } + + + @Test + public void testDropDuringCompaction() throws Throwable + { + CompactionManager.instance.disableAutoCompaction(); + + //Start with crc_check_chance of 99% + createTable("CREATE TABLE %s (p text, c text, v text, s text static, PRIMARY KEY (p, c)) WITH compression = {'sstable_compression': 'LZ4Compressor', 'crc_check_chance' : 0.99}"); + + ColumnFamilyStore cfs = Keyspace.open(CQLTester.KEYSPACE).getColumnFamilyStore(currentTable()); + + //Write a few SSTables then Compact, and drop + for (int i = 0; i < 100; i++) + { + execute("INSERT INTO %s(p, c, v, s) values (?, ?, ?, ?)", "p1", "k1", "v1", "sv1"); + execute("INSERT INTO %s(p, c, v) values (?, ?, ?)", "p1", "k2", "v2"); + execute("INSERT INTO %s(p, s) values (?, ?)", "p2", "sv2"); + + cfs.forceBlockingFlush(); + } + + DatabaseDescriptor.setCompactionThroughputMbPerSec(1); + List<Future<?>> futures = CompactionManager.instance.submitMaximal(cfs, CompactionManager.GC_ALL); + execute("DROP TABLE %s"); + + FBUtilities.waitOnFutures(futures); + + } }