better handling for mid-compaction failure; patch by yukim reviewed by slebresne for CASSANDRA-5137
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3cc8656f Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3cc8656f Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3cc8656f Branch: refs/heads/trunk Commit: 3cc8656f8fbb67c7e665fe27642076ae0109c2b5 Parents: 1cbbba0 Author: Yuki Morishita <yu...@apache.org> Authored: Fri Jan 11 12:32:59 2013 -0600 Committer: Yuki Morishita <yu...@apache.org> Committed: Fri Jan 11 12:32:59 2013 -0600 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/db/ColumnFamilyStore.java | 35 ++++++++++----- .../cassandra/db/compaction/CompactionTask.java | 28 +++++++----- 3 files changed, 42 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/3cc8656f/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 82f503c..6c76151 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -9,6 +9,7 @@ * fix user defined compaction to run against 1.1 data directory (CASSANDRA-5118) * Fix CQL3 BATCH authorization caching (CASSANDRA-5145) * fix get_count returns incorrect value with TTL (CASSANDRA-5099) + * better handling for amid compaction failure (CASSANDRA-5137) 1.1.8 http://git-wip-us.apache.org/repos/asf/cassandra/blob/3cc8656f/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 8284d38..2781800 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -244,20 +244,33 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean 
Directories.SSTableLister sstableFiles = directories.sstableLister().skipCompacted(true).skipTemporary(true); Collection<SSTableReader> sstables = SSTableReader.batchOpen(sstableFiles.list().entrySet(), savedKeys, data, metadata, this.partitioner); - // Filter non-compacted sstables, remove compacted ones - Set<Integer> compactedSSTables = new HashSet<Integer>(); - for (SSTableReader sstable : sstables) - compactedSSTables.addAll(sstable.getAncestors()); + if (metadata.getDefaultValidator().isCommutative()) + { + // Filter non-compacted sstables, remove compacted ones + Set<Integer> compactedSSTables = new HashSet<Integer>(); + for (SSTableReader sstable : sstables) + compactedSSTables.addAll(sstable.getAncestors()); - Set<SSTableReader> liveSSTables = new HashSet<SSTableReader>(); - for (SSTableReader sstable : sstables) + Set<SSTableReader> liveSSTables = new HashSet<SSTableReader>(); + for (SSTableReader sstable : sstables) + { + if (compactedSSTables.contains(sstable.descriptor.generation)) + { + logger.info("{} is already compacted and will be removed.", sstable); + sstable.markCompacted(); // we need to mark as compacted to be deleted + sstable.releaseReference(); // this amount to deleting the sstable + } + else + { + liveSSTables.add(sstable); + } + } + data.addInitialSSTables(liveSSTables); + } + else { - if (compactedSSTables.contains(sstable.descriptor.generation)) - sstable.releaseReference(); // this amount to deleting the sstable - else - liveSSTables.add(sstable); + data.addInitialSSTables(sstables); } - data.addInitialSSTables(liveSSTables); } // compaction strategy should be created after the CFS has been prepared http://git-wip-us.apache.org/repos/asf/cassandra/blob/3cc8656f/src/java/org/apache/cassandra/db/compaction/CompactionTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java 
index b252bc5..714e308 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java @@ -32,9 +32,7 @@ import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.compaction.CompactionManager.CompactionExecutorStatsCollector; -import org.apache.cassandra.io.sstable.SSTable; -import org.apache.cassandra.io.sstable.SSTableReader; -import org.apache.cassandra.io.sstable.SSTableWriter; +import org.apache.cassandra.io.sstable.*; import org.apache.cassandra.utils.CloseableIterator; import org.apache.cassandra.utils.FBUtilities; @@ -127,7 +125,7 @@ public class CompactionTask extends AbstractCompactionTask // we can't preheat until the tracker has been set. This doesn't happen until we tell the cfs to // replace the old entries. Track entries to preheat here until then. - Map<SSTableReader, Map<DecoratedKey, Long>> cachedKeyMap = new HashMap<SSTableReader, Map<DecoratedKey, Long>>(); + Map<Descriptor, Map<DecoratedKey, Long>> cachedKeyMap = new HashMap<Descriptor, Map<DecoratedKey, Long>>(); Collection<SSTableReader> sstables = new ArrayList<SSTableReader>(); Collection<SSTableWriter> writers = new ArrayList<SSTableWriter>(); @@ -175,9 +173,8 @@ public class CompactionTask extends AbstractCompactionTask } if (!nni.hasNext() || newSSTableSegmentThresholdReached(writer)) { - SSTableReader toIndex = writer.closeAndOpenReader(getMaxDataAge(toCompact)); - cachedKeyMap.put(toIndex, cachedKeys); - sstables.add(toIndex); + // tmp = false because later we want to query it with descriptor from SSTableReader + cachedKeyMap.put(writer.descriptor.asTemporary(false), cachedKeys); if (nni.hasNext()) { writer = cfs.createCompactionWriter(keysPerSSTable, compactionFileLocation, toCompact); @@ -186,11 +183,21 @@ public class CompactionTask extends AbstractCompactionTask } } } + + long 
maxAge = getMaxDataAge(toCompact); + for (SSTableWriter completedWriter : writers) + sstables.add(completedWriter.closeAndOpenReader(maxAge)); } catch (Exception e) { for (SSTableWriter writer : writers) writer.abort(); + // also remove already completed SSTables + for (SSTableReader sstable : sstables) + { + sstable.markCompacted(); + sstable.releaseReference(); + } throw FBUtilities.unchecked(e); } finally @@ -202,11 +209,10 @@ public class CompactionTask extends AbstractCompactionTask cfs.replaceCompactedSSTables(toCompact, sstables, compactionType); // TODO: this doesn't belong here, it should be part of the reader to load when the tracker is wired up - for (Map.Entry<SSTableReader, Map<DecoratedKey, Long>> ssTableReaderMapEntry : cachedKeyMap.entrySet()) + for (SSTableReader sstable : sstables) { - SSTableReader key = ssTableReaderMapEntry.getKey(); - for (Map.Entry<DecoratedKey, Long> entry : ssTableReaderMapEntry.getValue().entrySet()) - key.cacheKey(entry.getKey(), entry.getValue()); + for (Map.Entry<DecoratedKey, Long> entry : cachedKeyMap.get(sstable.descriptor).entrySet()) + sstable.cacheKey(entry.getKey(), entry.getValue()); } long dTime = System.currentTimeMillis() - startTime;