revert CASSANDRA-5151 from 1.2
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7ff2805c Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7ff2805c Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7ff2805c Branch: refs/heads/trunk Commit: 7ff2805c7e6a362eb451e6b8dfaec59642fee0f2 Parents: 694e3a7 Author: Yuki Morishita <yu...@apache.org> Authored: Thu Feb 14 11:13:11 2013 -0600 Committer: Yuki Morishita <yu...@apache.org> Committed: Thu Feb 14 11:24:09 2013 -0600 ---------------------------------------------------------------------- CHANGES.txt | 1 - .../org/apache/cassandra/config/CFMetaData.java | 7 - .../org/apache/cassandra/config/KSMetaData.java | 1 - .../org/apache/cassandra/db/ColumnFamilyStore.java | 95 ++++---------- src/java/org/apache/cassandra/db/SystemTable.java | 69 ----------- .../cassandra/db/compaction/CompactionTask.java | 7 - .../apache/cassandra/service/CassandraDaemon.java | 9 -- .../apache/cassandra/db/ColumnFamilyStoreTest.java | 55 --------- .../cassandra/db/compaction/CompactionsTest.java | 34 ----- 9 files changed, 27 insertions(+), 251 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/7ff2805c/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 9281b5e..aa25b63 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,6 +1,5 @@ 1.2.2 * avoid no-op caching of byte[] on commitlog append (CASSANDRA-5199) - * more robust solution to incomplete compactions + counters (CASSANDRA-5151) * fix symlinks under data dir not working (CASSANDRA-5185) * fix bug in compact storage metadata handling (CASSANDRA-5189) * Validate login for USE queries (CASSANDRA-5207) http://git-wip-us.apache.org/repos/asf/cassandra/blob/7ff2805c/src/java/org/apache/cassandra/config/CFMetaData.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java index 9c76c9b..73b7b6b 100644 --- a/src/java/org/apache/cassandra/config/CFMetaData.java +++ b/src/java/org/apache/cassandra/config/CFMetaData.java @@ -226,13 +226,6 @@ public final class CFMetaData + "requested_at timestamp" + ") WITH COMMENT='ranges requested for transfer here'"); - public static final CFMetaData CompactionLogCF = compile(18, "CREATE TABLE " + SystemTable.COMPACTION_LOG + " (" - + "id uuid PRIMARY KEY," - + "keyspace_name text," - + "columnfamily_name text," - + "inputs set<int>" - + ") WITH COMMENT='unfinished compactions'"); - public enum Caching { ALL, KEYS_ONLY, ROWS_ONLY, NONE; http://git-wip-us.apache.org/repos/asf/cassandra/blob/7ff2805c/src/java/org/apache/cassandra/config/KSMetaData.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/KSMetaData.java b/src/java/org/apache/cassandra/config/KSMetaData.java index b0764cc..9522aa0 100644 --- a/src/java/org/apache/cassandra/config/KSMetaData.java +++ b/src/java/org/apache/cassandra/config/KSMetaData.java @@ -89,7 +89,6 @@ public final class KSMetaData CFMetaData.SchemaKeyspacesCf, CFMetaData.SchemaColumnFamiliesCf, CFMetaData.SchemaColumnsCf, - CFMetaData.CompactionLogCF, CFMetaData.OldStatusCf, CFMetaData.OldHintsCf, CFMetaData.OldMigrationsCf, http://git-wip-us.apache.org/repos/asf/cassandra/blob/7ff2805c/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 0769d5c..6043be5 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -61,7 +61,6 @@ import org.apache.cassandra.db.index.SecondaryIndexManager; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.dht.*; import org.apache.cassandra.exceptions.ConfigurationException; -import org.apache.cassandra.io.FSReadError; import org.apache.cassandra.io.compress.CompressionParameters; import org.apache.cassandra.io.sstable.*; import org.apache.cassandra.io.sstable.Descriptor; @@ -242,7 +241,33 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean { Directories.SSTableLister sstableFiles = directories.sstableLister().skipTemporary(true); Collection<SSTableReader> sstables = SSTableReader.batchOpen(sstableFiles.list().entrySet(), metadata, this.partitioner); - data.addInitialSSTables(sstables); + if (metadata.getDefaultValidator().isCommutative()) + { + // Filter non-compacted sstables, remove compacted ones + Set<Integer> compactedSSTables = new HashSet<Integer>(); + for (SSTableReader sstable : sstables) + compactedSSTables.addAll(sstable.getAncestors()); + + Set<SSTableReader> liveSSTables = new HashSet<SSTableReader>(); + for (SSTableReader sstable : sstables) + { + if (compactedSSTables.contains(sstable.descriptor.generation)) + { + logger.info("{} is already compacted and will be removed.", sstable); + sstable.markCompacted(); // we need to mark as compacted to be deleted + sstable.releaseReference(); // this amount to deleting the sstable + } + else + { + liveSSTables.add(sstable); + } + } + data.addInitialSSTables(liveSSTables); + } + else + { + data.addInitialSSTables(sstables); + } } if (caching == Caching.ALL || caching == Caching.KEYS_ONLY) @@ -419,72 +444,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean } } - /** - * Replacing compacted sstables is atomic as far as observers of DataTracker are concerned, but not on the - * filesystem: first the new sstables are renamed to "live" status (i.e., the tmp marker is removed), then - * their ancestors are removed. - * - * If an unclean shutdown happens at the right time, we can thus end up with both the new ones and their - * ancestors "live" in the system. This is harmless for normal data, but for counters it can cause overcounts. - * - * To prevent this, we record sstables being compacted in the system keyspace. If we find unfinished - * compactions, we remove the new ones (since those may be incomplete -- under LCS, we may create multiple - * sstables from any given ancestor). - */ - public static void removeUnfinishedCompactionLeftovers(String keyspace, String columnfamily, Set<Integer> unfinishedGenerations) - { - Directories directories = Directories.create(keyspace, columnfamily); - - // sanity-check unfinishedGenerations - Set<Integer> allGenerations = new HashSet<Integer>(); - for (Descriptor desc : directories.sstableLister().list().keySet()) - allGenerations.add(desc.generation); - if (!allGenerations.containsAll(unfinishedGenerations)) - { - throw new IllegalStateException("Unfinished compactions reference missing sstables." - + " This should never happen since compactions are marked finished before we start removing the old sstables."); - } - - // remove new sstables from compactions that didn't complete, and compute - // set of ancestors that shouldn't exist anymore - Set<Integer> completedAncestors = new HashSet<Integer>(); - for (Map.Entry<Descriptor, Set<Component>> sstableFiles : directories.sstableLister().list().entrySet()) - { - Descriptor desc = sstableFiles.getKey(); - Set<Component> components = sstableFiles.getValue(); - - SSTableMetadata meta; - try - { - meta = SSTableMetadata.serializer.deserialize(desc); - } - catch (IOException e) - { - throw new FSReadError(e, desc.filenameFor(Component.STATS)); - } - - Set<Integer> ancestors = meta.ancestors; - if (!ancestors.isEmpty() && unfinishedGenerations.containsAll(ancestors)) - { - SSTable.delete(desc, components); - } - else - { - completedAncestors.addAll(ancestors); - } - } - - // remove old sstables from compactions that did complete - for (Map.Entry<Descriptor, Set<Component>> sstableFiles : directories.sstableLister().list().entrySet()) - { - Descriptor desc = sstableFiles.getKey(); - Set<Component> components = sstableFiles.getValue(); - - if (completedAncestors.contains(desc.generation)) - SSTable.delete(desc, components); - } - } - // must be called after all sstables are loaded since row cache merges all row versions public void initRowCache() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/7ff2805c/src/java/org/apache/cassandra/db/SystemTable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SystemTable.java b/src/java/org/apache/cassandra/db/SystemTable.java index adf08cc..0422a21 100644 --- a/src/java/org/apache/cassandra/db/SystemTable.java +++ b/src/java/org/apache/cassandra/db/SystemTable.java @@ -73,7 +73,6 @@ public class SystemTable public static final String SCHEMA_KEYSPACES_CF = "schema_keyspaces"; public static final String SCHEMA_COLUMNFAMILIES_CF = "schema_columnfamilies"; public static final String SCHEMA_COLUMNS_CF = "schema_columns"; - public static final String COMPACTION_LOG = "compactions_in_progress"; @Deprecated public static final String OLD_STATUS_CF = "LocationInfo"; @@ -184,74 +183,6 @@ public class SystemTable } } - /** - * Write compaction log, except columfamilies under system keyspace. - * - * @param cfs - * @param toCompact sstables to compact - * @return compaction task id or null if cfs is under system keyspace - */ - public static UUID startCompaction(ColumnFamilyStore cfs, Iterable<SSTableReader> toCompact) - { - if (Table.SYSTEM_KS.equals(cfs.table.name)) - return null; - - UUID compactionId = UUIDGen.getTimeUUID(); - String req = "INSERT INTO system.%s (id, keyspace_name, columnfamily_name, inputs) VALUES (%s, '%s', '%s', {%s})"; - Iterable<Integer> generations = Iterables.transform(toCompact, new Function<SSTableReader, Integer>() - { - public Integer apply(SSTableReader sstable) - { - return sstable.descriptor.generation; - } - }); - processInternal(String.format(req, COMPACTION_LOG, compactionId, cfs.table.name, cfs.columnFamily, StringUtils.join(Sets.newHashSet(generations), ','))); - forceBlockingFlush(COMPACTION_LOG); - return compactionId; - } - - public static void finishCompaction(UUID taskId) - { - assert taskId != null; - - String req = "DELETE FROM system.%s WHERE id = %s"; - processInternal(String.format(req, COMPACTION_LOG, taskId)); - forceBlockingFlush(COMPACTION_LOG); - } - - /** - * @return unfinished compactions, grouped by keyspace/columnfamily pair. - */ - public static SetMultimap<Pair<String, String>, Integer> getUnfinishedCompactions() - { - String req = "SELECT * FROM system.%s"; - UntypedResultSet resultSet = processInternal(String.format(req, COMPACTION_LOG)); - - SetMultimap<Pair<String, String>, Integer> unfinishedCompactions = HashMultimap.create(); - for (UntypedResultSet.Row row : resultSet) - { - String keyspace = row.getString("keyspace_name"); - String columnfamily = row.getString("columnfamily_name"); - Set<Integer> inputs = row.getSet("inputs", Int32Type.instance); - - unfinishedCompactions.putAll(Pair.create(keyspace, columnfamily), inputs); - } - return unfinishedCompactions; - } - - public static void discardCompactionsInProgress() - { - ColumnFamilyStore compactionLog = Table.open(Table.SYSTEM_KS).getColumnFamilyStore(COMPACTION_LOG); - try - { - compactionLog.truncate().get(); - } - catch (Exception e) - { - throw new RuntimeException(e); - } - } - public static void saveTruncationPosition(ColumnFamilyStore cfs, ReplayPosition position) { String req = "UPDATE system.%s SET truncated_at = truncated_at + %s WHERE key = '%s'"; http://git-wip-us.apache.org/repos/asf/cassandra/blob/7ff2805c/src/java/org/apache/cassandra/db/compaction/CompactionTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java index 1b1599b..3893f1c 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java @@ -103,8 +103,6 @@ public class CompactionTask extends AbstractCompactionTask for (SSTableReader sstable : toCompact) assert sstable.descriptor.cfname.equals(cfs.columnFamily); - UUID taskId = SystemTable.startCompaction(cfs, toCompact); - CompactionController controller = new CompactionController(cfs, toCompact, gcBefore); // new sstables from flush can be added during a compaction, but only the compaction can remove them, // so in our single-threaded compaction world this is a valid way of determining if we're compacting @@ -230,11 +228,6 @@ public class CompactionTask extends AbstractCompactionTask throw new RuntimeException(e); } - // point of no return -- the new sstables are live on disk; next we'll start deleting the old ones - // (in replaceCompactedSSTables) - if (taskId != null) - SystemTable.finishCompaction(taskId); - if (collector != null) collector.finishCompaction(ci); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/7ff2805c/src/java/org/apache/cassandra/service/CassandraDaemon.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java index 0a7b957..7d78196 100644 --- a/src/java/org/apache/cassandra/service/CassandraDaemon.java +++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java @@ -27,7 +27,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import com.google.common.collect.Iterables; -import com.google.common.collect.SetMultimap; import org.apache.log4j.PropertyConfigurator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,7 +43,6 @@ import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.thrift.ThriftServer; import org.apache.cassandra.utils.CLibrary; import org.apache.cassandra.utils.Mx4jTool; -import org.apache.cassandra.utils.Pair; /** * The <code>CassandraDaemon</code> is an abstraction for a Cassandra daemon @@ -202,13 +200,6 @@ public class CassandraDaemon ColumnFamilyStore.scrubDataDirectories(table, cfm.cfName); } } - // clean up compaction leftovers - SetMultimap<Pair<String, String>, Integer> unfinishedCompactions = SystemTable.getUnfinishedCompactions(); - for (Pair<String, String> kscf : unfinishedCompactions.keySet()) - { - ColumnFamilyStore.removeUnfinishedCompactionLeftovers(kscf.left, kscf.right, unfinishedCompactions.get(kscf)); - } - SystemTable.discardCompactionsInProgress(); // initialize keyspaces for (String table : Schema.instance.getTables()) http://git-wip-us.apache.org/repos/asf/cassandra/blob/7ff2805c/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java index 8048fcb..8318f0d 100644 --- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java +++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java @@ -66,7 +66,6 @@ import org.apache.cassandra.io.sstable.*; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.thrift.*; import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.cassandra.utils.UUIDGen; import org.apache.cassandra.utils.WrappedRunnable; public class ColumnFamilyStoreTest extends SchemaLoader @@ -1241,60 +1240,6 @@ public class ColumnFamilyStoreTest extends SchemaLoader testMultiRangeSlicesBehavior(prepareMultiRangeSlicesTest(10, false)); } - @Test - public void testRemoveUnifinishedCompactionLeftovers() throws Throwable - { - String ks = "Keyspace1"; - String cf = "Standard3"; // should be empty - - CFMetaData cfmeta = Schema.instance.getCFMetaData(ks, cf); - Directories dir = Directories.create(ks, cf); - ByteBuffer key = bytes("key"); - - // 1st sstable - SSTableSimpleWriter writer = new SSTableSimpleWriter(dir.getDirectoryForNewSSTables(100), - cfmeta, StorageService.getPartitioner()); - writer.newRow(key); - writer.addColumn(bytes("col"), bytes("val"), 1); - writer.close(); - - Map<Descriptor, Set<Component>> sstables = dir.sstableLister().list(); - assert sstables.size() == 1; - - Map.Entry<Descriptor, Set<Component>> sstableToOpen = sstables.entrySet().iterator().next(); - final SSTableReader sstable1 = SSTableReader.open(sstableToOpen.getKey()); - - // simulate incomplete compaction - writer = new SSTableSimpleWriter(dir.getDirectoryForNewSSTables(100), - cfmeta, StorageService.getPartitioner()) - { - protected SSTableWriter getWriter() - { - SSTableMetadata.Collector collector = SSTableMetadata.createCollector(); - collector.addAncestor(sstable1.descriptor.generation); // add ancestor from previously written sstable - return new SSTableWriter(makeFilename(directory, metadata.ksName, metadata.cfName), - 0, - metadata, - StorageService.getPartitioner(), - collector); - } - }; - writer.newRow(key); - writer.addColumn(bytes("col"), bytes("val"), 1); - writer.close(); - - // should have 2 sstables now - sstables = dir.sstableLister().list(); - assert sstables.size() == 2; - - ColumnFamilyStore.removeUnfinishedCompactionLeftovers(ks, cf, Sets.newHashSet(sstable1.descriptor.generation)); - - // 2nd sstable should be removed (only 1st sstable exists in set of size 1) - sstables = dir.sstableLister().list(); - assert sstables.size() == 1; - assert sstables.containsKey(sstable1.descriptor); - } - private ColumnFamilyStore prepareMultiRangeSlicesTest(int valueSize, boolean flush) throws Throwable { String tableName = "Keyspace1"; http://git-wip-us.apache.org/repos/asf/cassandra/blob/7ff2805c/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java index b41bf19..a09451e 100644 --- a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java @@ -24,10 +24,6 @@ import java.util.*; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import com.google.common.base.Function; -import com.google.common.collect.Iterables; -import com.google.common.collect.SetMultimap; -import com.google.common.collect.Sets; import org.junit.Test; import org.apache.cassandra.SchemaLoader; @@ -44,7 +40,6 @@ import org.apache.cassandra.io.sstable.SSTableScanner; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.Pair; import static junit.framework.Assert.*; @@ -280,35 +275,6 @@ public class CompactionsTest extends SchemaLoader assert sstables.iterator().next().descriptor.generation == prevGeneration + 1; } - @Test - public void testCompactionLog() throws Exception - { - SystemTable.discardCompactionsInProgress(); - - String cf = "Standard4"; - ColumnFamilyStore cfs = Table.open(TABLE1).getColumnFamilyStore(cf); - insertData(TABLE1, cf, 0, 1); - cfs.forceBlockingFlush(); - - Collection<SSTableReader> sstables = cfs.getSSTables(); - assert !sstables.isEmpty(); - Set<Integer> generations = Sets.newHashSet(Iterables.transform(sstables, new Function<SSTableReader, Integer>() - { - public Integer apply(SSTableReader sstable) - { - return sstable.descriptor.generation; - } - })); - UUID taskId = SystemTable.startCompaction(cfs, sstables); - SetMultimap<Pair<String, String>, Integer> compactionLogs = SystemTable.getUnfinishedCompactions(); - Set<Integer> unfinishedCompactions = compactionLogs.get(Pair.create(TABLE1, cf)); - assert unfinishedCompactions.containsAll(generations); - - SystemTable.finishCompaction(taskId); - compactionLogs = SystemTable.getUnfinishedCompactions(); - assert !compactionLogs.containsKey(Pair.create(TABLE1, cf)); - } - private void testDontPurgeAccidentaly(String k, String cfname, boolean forceDeserialize) throws IOException, ExecutionException, InterruptedException { // This test catches the regression of CASSANDRA-2786