Identify and blacklist corrupted SSTables from future compactions. Patch by Benjamin Coverston and Pavel Yaskevich; reviewed by Brandon Williams for CASSANDRA-2261.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9483e147 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9483e147 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9483e147 Branch: refs/heads/trunk Commit: 9483e147ec675c6024b5f1a55c799cc15aa2239e Parents: 2cc612c Author: Pavel Yaskevich <xe...@apache.org> Authored: Tue Feb 28 22:45:24 2012 +0300 Committer: Pavel Yaskevich <xe...@apache.org> Committed: Tue Feb 28 23:37:45 2012 +0300 ---------------------------------------------------------------------- CHANGES.txt | 2 +- .../db/columniterator/IndexedSliceReader.java | 1 + .../db/columniterator/SSTableNamesIterator.java | 2 + .../db/columniterator/SSTableSliceIterator.java | 1 + .../db/columniterator/SimpleSliceReader.java | 1 + .../db/compaction/AbstractCompactionStrategy.java | 24 +++- .../cassandra/db/compaction/LeveledManifest.java | 28 ++++- .../compaction/SizeTieredCompactionStrategy.java | 5 +- .../io/sstable/SSTableBoundedScanner.java | 1 + .../io/sstable/SSTableIdentityIterator.java | 1 + .../apache/cassandra/io/sstable/SSTableReader.java | 15 ++ .../cassandra/io/sstable/SSTableScanner.java | 5 + .../cassandra/db/compaction/CompactionsTest.java | 113 ++++++++++++++- 13 files changed, 191 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/9483e147/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 6a2a56e..591ae9f 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,6 +1,6 @@ 1.1.1-dev * optimize commitlog checksumming (CASSANDRA-3610) - + * identify and blacklist corrupted SSTables from future compactions (CASSANDRA-2261) 1.1-dev * start hint replay as soon as FD notifies that the target is back up 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9483e147/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java b/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java index d2d9e9b..68c9a4c 100644 --- a/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java +++ b/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java @@ -76,6 +76,7 @@ class IndexedSliceReader extends AbstractIterator<IColumn> implements IColumnIte } catch (IOException e) { + sstable.markSuspect(); throw new IOError(e); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9483e147/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java b/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java index fdb0c5c..ac8938c 100644 --- a/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java +++ b/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java @@ -73,6 +73,7 @@ public class SSTableNamesIterator extends SimpleAbstractColumnIterator implement } catch (IOException e) { + sstable.markSuspect(); throw new IOError(e); } finally @@ -93,6 +94,7 @@ public class SSTableNamesIterator extends SimpleAbstractColumnIterator implement } catch (IOException ioe) { + sstable.markSuspect(); throw new IOError(ioe); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9483e147/src/java/org/apache/cassandra/db/columniterator/SSTableSliceIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/columniterator/SSTableSliceIterator.java 
b/src/java/org/apache/cassandra/db/columniterator/SSTableSliceIterator.java index 72fe1bf..5c97729 100644 --- a/src/java/org/apache/cassandra/db/columniterator/SSTableSliceIterator.java +++ b/src/java/org/apache/cassandra/db/columniterator/SSTableSliceIterator.java @@ -60,6 +60,7 @@ public class SSTableSliceIterator implements IColumnIterator } catch (IOException e) { + sstable.markSuspect(); throw new IOError(e); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9483e147/src/java/org/apache/cassandra/db/columniterator/SimpleSliceReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/columniterator/SimpleSliceReader.java b/src/java/org/apache/cassandra/db/columniterator/SimpleSliceReader.java index 383777f..4e49ebb 100644 --- a/src/java/org/apache/cassandra/db/columniterator/SimpleSliceReader.java +++ b/src/java/org/apache/cassandra/db/columniterator/SimpleSliceReader.java @@ -62,6 +62,7 @@ class SimpleSliceReader extends AbstractIterator<IColumn> implements IColumnIter } catch (IOException e) { + sstable.markSuspect(); throw new IOError(e); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9483e147/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java index 0455f95..2e70c46 100644 --- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java @@ -18,9 +18,7 @@ package org.apache.cassandra.db.compaction; -import java.util.Collection; -import java.util.Map; -import java.util.Set; +import java.util.*; import java.util.concurrent.TimeUnit; import org.apache.cassandra.db.ColumnFamilyStore; @@ -108,4 +106,24 @@ public abstract 
class AbstractCompactionStrategy * is going to be expensive */ public abstract boolean isKeyExistenceExpensive(Set<? extends SSTable> sstablesToIgnore); + + /** + * Filters SSTables that are to be blacklisted from the given collection + * + * @param originalCandidates The collection to check for blacklisted SSTables + * + * @return list of the SSTables with blacklisted ones filtered out + */ + public static List<SSTableReader> filterSuspectSSTables(Collection<SSTableReader> originalCandidates) + { + List<SSTableReader> filteredCandidates = new ArrayList<SSTableReader>(); + + for (SSTableReader candidate : originalCandidates) + { + if (!candidate.isMarkedSuspect()) + filteredCandidates.add(candidate); + } + + return filteredCandidates; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9483e147/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java index 4c63180..2661c6d 100644 --- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java +++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java @@ -44,6 +44,8 @@ import org.codehaus.jackson.JsonGenerator; import org.codehaus.jackson.JsonNode; import org.codehaus.jackson.map.ObjectMapper; +import static org.apache.cassandra.db.compaction.AbstractCompactionStrategy.filterSuspectSSTables; + public class LeveledManifest { private static final Logger logger = LoggerFactory.getLogger(LeveledManifest.class); @@ -249,15 +251,39 @@ public class LeveledManifest if (score > 1.001 || i == 0) { Collection<SSTableReader> candidates = getCandidatesFor(i); + if (logger.isDebugEnabled()) logger.debug("Compaction candidates for L{} are {}", i, toString(candidates)); - return candidates; + + // check if have any SSTables marked as suspected, + // saves us filter time when no 
SSTables are suspects + return hasSuspectSSTables(candidates) + ? filterSuspectSSTables(candidates) + : candidates; } } return Collections.emptyList(); } + /** + * Go through candidates collection and check if any of the SSTables are marked as suspected. + * + * @param candidates The SSTable collection to examine. + * + * @return true if collection has at least one SSTable marked as suspected, false otherwise. + */ + private boolean hasSuspectSSTables(Collection<SSTableReader> candidates) + { + for (SSTableReader candidate : candidates) + { + if (candidate.isMarkedSuspect()) + return true; + } + + return false; + } + public int getLevelSize(int i) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/9483e147/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java index caead37..0281f08 100644 --- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java @@ -55,7 +55,8 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy return null; } - List<List<SSTableReader>> buckets = getBuckets(createSSTableAndLengthPairs(cfs.getUncompactingSSTables()), minSSTableSize); + Set<SSTableReader> candidates = cfs.getUncompactingSSTables(); + List<List<SSTableReader>> buckets = getBuckets(createSSTableAndLengthPairs(filterSuspectSSTables(candidates)), minSSTableSize); updateEstimatedCompactionsByTasks(buckets); List<List<SSTableReader>> prunedBuckets = new ArrayList<List<SSTableReader>>(); @@ -102,7 +103,7 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy public AbstractCompactionTask getMaximalTask(final int gcBefore) { - return cfs.getSSTables().isEmpty() ? 
null : new CompactionTask(cfs, cfs.getSSTables(), gcBefore); + return cfs.getSSTables().isEmpty() ? null : new CompactionTask(cfs, filterSuspectSSTables(cfs.getSSTables()), gcBefore); } public AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, final int gcBefore) http://git-wip-us.apache.org/repos/asf/cassandra/blob/9483e147/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java b/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java index 262394d..280d7fc 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java @@ -49,6 +49,7 @@ public class SSTableBoundedScanner extends SSTableScanner } catch (IOException e) { + sstable.markSuspect(); throw new RuntimeException(e); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9483e147/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java index fbb0788..275f983 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java @@ -151,6 +151,7 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat } catch (IOException e) { + sstable.markSuspect(); throw new IOError(e); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9483e147/src/java/org/apache/cassandra/io/sstable/SSTableReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java 
b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java index 79b71ae..8c1d540 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java @@ -92,6 +92,7 @@ public class SSTableReader extends SSTable // technically isCompacted is not necessary since it should never be unreferenced unless it is also compacted, // but it seems like a good extra layer of protection against reference counting bugs to not delete data based on that alone private final AtomicBoolean isCompacted = new AtomicBoolean(false); + private final AtomicBoolean isSuspect = new AtomicBoolean(false); private final SSTableDeletingTask deletingTask; private final SSTableMetadata sstableMetadata; @@ -713,6 +714,7 @@ public class SSTableReader extends SSTable } catch (IOException e) { + markSuspect(); throw new IOError(e); } finally @@ -799,6 +801,19 @@ public class SSTableReader extends SSTable return true; } + public void markSuspect() + { + if (logger.isDebugEnabled()) + logger.debug("Marking " + getFilename() + " as a suspect for blacklisting."); + + isSuspect.getAndSet(true); + } + + public boolean isMarkedSuspect() + { + return isSuspect.get(); + } + /** * * @param filter filter to use when reading the columns http://git-wip-us.apache.org/repos/asf/cassandra/blob/9483e147/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java b/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java index 0a5240a..5e4f269 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java @@ -57,6 +57,7 @@ public class SSTableScanner implements CloseableIterator<IColumnIterator> } catch (IOException e) { + sstable.markSuspect(); throw new IOError(e); } this.sstable = sstable; @@ -74,6 +75,7 @@ public class 
SSTableScanner implements CloseableIterator<IColumnIterator> } catch (IOException e) { + sstable.markSuspect(); throw new IOError(e); } this.sstable = sstable; @@ -100,6 +102,7 @@ public class SSTableScanner implements CloseableIterator<IColumnIterator> } catch (IOException e) { + sstable.markSuspect(); throw new RuntimeException("corrupt sstable", e); } } @@ -154,6 +157,7 @@ public class SSTableScanner implements CloseableIterator<IColumnIterator> } catch (IOException e) { + sstable.markSuspect(); throw new RuntimeException(e); } } @@ -185,6 +189,7 @@ public class SSTableScanner implements CloseableIterator<IColumnIterator> } catch (IOException e) { + sstable.markSuspect(); throw new RuntimeException(SSTableScanner.this + " failed to provide next columns from " + this, e); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9483e147/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java index ff6636d..23a2657 100644 --- a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java @@ -18,7 +18,7 @@ */ package org.apache.cassandra.db.compaction; -import java.io.IOException; +import java.io.*; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashSet; @@ -30,6 +30,7 @@ import java.util.concurrent.Future; import org.junit.Test; import static junit.framework.Assert.assertEquals; +import static junit.framework.Assert.assertNotNull; import org.apache.cassandra.CleanupHelper; import org.apache.cassandra.Util; @@ -38,6 +39,7 @@ import org.apache.cassandra.db.*; import org.apache.cassandra.db.filter.QueryFilter; import org.apache.cassandra.db.filter.QueryPath; import org.apache.cassandra.io.sstable.*; +import 
org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; @@ -46,6 +48,18 @@ public class CompactionsTest extends CleanupHelper public static final String TABLE1 = "Keyspace1"; @Test + public void testBlacklistingWithSizeTieredCompactionStrategy() throws Exception + { + testBlacklisting(SizeTieredCompactionStrategy.class.getCanonicalName()); + } + + @Test + public void testBlacklistingWithLeveledCompactionStrategy() throws Exception + { + testBlacklisting(LeveledCompactionStrategy.class.getCanonicalName()); + } + + @Test public void testStandardColumnCompactions() throws IOException, ExecutionException, InterruptedException { // this test does enough rows to force multiple block indexes to be used @@ -83,6 +97,7 @@ public class CompactionsTest extends CleanupHelper // make sure max timestamp of compacted sstables is recorded properly after compaction. assertMaxTimestamp(store, maxTimestampExpected); + store.truncate(); } @@ -269,4 +284,100 @@ public class CompactionsTest extends CleanupHelper cf = store.getColumnFamily(filter); assert cf == null || cf.isEmpty() : "should be empty: " + cf; } + + public void testBlacklisting(String compactionStrategy) throws Exception + { + // this test does enough rows to force multiple block indexes to be used + Table table = Table.open(TABLE1); + final ColumnFamilyStore store = table.getColumnFamilyStore("Standard1"); + + final int ROWS_PER_SSTABLE = 10; + final int SSTABLES = DatabaseDescriptor.getIndexInterval() * 2 / ROWS_PER_SSTABLE; + + store.setCompactionStrategyClass(compactionStrategy); + + // disable compaction while flushing + store.disableAutoCompaction(); + //test index corruption + //now create a few new SSTables + long maxTimestampExpected = Long.MIN_VALUE; + Set<DecoratedKey> inserted = new HashSet<DecoratedKey>(); + for (int j = 0; j < SSTABLES; j++) + { + for (int i = 0; i < ROWS_PER_SSTABLE; i++) + { + DecoratedKey key = 
Util.dk(String.valueOf(i % 2)); + RowMutation rm = new RowMutation(TABLE1, key.key); + long timestamp = j * ROWS_PER_SSTABLE + i; + rm.add(new QueryPath("Standard1", null, ByteBufferUtil.bytes(String.valueOf(i / 2))), + ByteBufferUtil.EMPTY_BYTE_BUFFER, + timestamp); + maxTimestampExpected = Math.max(timestamp, maxTimestampExpected); + rm.apply(); + inserted.add(key); + } + store.forceBlockingFlush(); + assertMaxTimestamp(store, maxTimestampExpected); + assertEquals(inserted.toString(), inserted.size(), Util.getRangeSlice(store).size()); + } + + Collection<SSTableReader> sstables = store.getSSTables(); + int currentSSTable = 0; + int sstablesToCorrupt = 8; + + // corrupt first 'sstablesToCorrupt' SSTables + for (SSTableReader sstable : sstables) + { + if(currentSSTable + 1 > sstablesToCorrupt) + break; + + RandomAccessFile raf = null; + + try + { + raf = new RandomAccessFile(sstable.getFilename(), "rw"); + assertNotNull(raf); + raf.write(0xFFFFFF); + } + finally + { + FileUtils.closeQuietly(raf); + } + + currentSSTable++; + } + + int failures = 0; + + // close error output steam to avoid printing ton of useless RuntimeException + System.err.close(); + + try + { + // in case something will go wrong we don't want to loop forever using for (;;) + for (int i = 0; i < sstables.size(); i++) + { + try + { + store.forceMajorCompaction(); + } + catch (Exception e) + { + failures++; + continue; + } + + assertEquals(sstablesToCorrupt + 1, store.getSSTables().size()); + break; + } + } + finally + { + System.setErr(new PrintStream(new ByteArrayOutputStream())); + } + + + store.truncate(); + assertEquals(failures, sstablesToCorrupt); + } }