identify and blacklist corrupted SSTables from future compactions
patch by Benjamin Coverston and Pavel Yaskevich; reviewed by Brandon Williams 
for CASSANDRA-2261


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9483e147
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9483e147
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9483e147

Branch: refs/heads/trunk
Commit: 9483e147ec675c6024b5f1a55c799cc15aa2239e
Parents: 2cc612c
Author: Pavel Yaskevich <xe...@apache.org>
Authored: Tue Feb 28 22:45:24 2012 +0300
Committer: Pavel Yaskevich <xe...@apache.org>
Committed: Tue Feb 28 23:37:45 2012 +0300

----------------------------------------------------------------------
 CHANGES.txt                                        |    2 +-
 .../db/columniterator/IndexedSliceReader.java      |    1 +
 .../db/columniterator/SSTableNamesIterator.java    |    2 +
 .../db/columniterator/SSTableSliceIterator.java    |    1 +
 .../db/columniterator/SimpleSliceReader.java       |    1 +
 .../db/compaction/AbstractCompactionStrategy.java  |   24 +++-
 .../cassandra/db/compaction/LeveledManifest.java   |   28 ++++-
 .../compaction/SizeTieredCompactionStrategy.java   |    5 +-
 .../io/sstable/SSTableBoundedScanner.java          |    1 +
 .../io/sstable/SSTableIdentityIterator.java        |    1 +
 .../apache/cassandra/io/sstable/SSTableReader.java |   15 ++
 .../cassandra/io/sstable/SSTableScanner.java       |    5 +
 .../cassandra/db/compaction/CompactionsTest.java   |  113 ++++++++++++++-
 13 files changed, 191 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/9483e147/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6a2a56e..591ae9f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,6 +1,6 @@
 1.1.1-dev
  * optimize commitlog checksumming (CASSANDRA-3610)
-
+ * identify and blacklist corrupted SSTables from future compactions 
(CASSANDRA-2261)
 
 1.1-dev
  * start hint replay as soon as FD notifies that the target is back up

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9483e147/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java 
b/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java
index d2d9e9b..68c9a4c 100644
--- a/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java
+++ b/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java
@@ -76,6 +76,7 @@ class IndexedSliceReader extends AbstractIterator<IColumn> 
implements IColumnIte
         }
         catch (IOException e)
         {
+            sstable.markSuspect();
             throw new IOError(e);
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9483e147/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java 
b/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java
index fdb0c5c..ac8938c 100644
--- a/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java
+++ b/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java
@@ -73,6 +73,7 @@ public class SSTableNamesIterator extends 
SimpleAbstractColumnIterator implement
         }
         catch (IOException e)
         {
+            sstable.markSuspect();
             throw new IOError(e);
         }
         finally
@@ -93,6 +94,7 @@ public class SSTableNamesIterator extends 
SimpleAbstractColumnIterator implement
         }
         catch (IOException ioe)
         {
+            sstable.markSuspect();
             throw new IOError(ioe);
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9483e147/src/java/org/apache/cassandra/db/columniterator/SSTableSliceIterator.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/columniterator/SSTableSliceIterator.java 
b/src/java/org/apache/cassandra/db/columniterator/SSTableSliceIterator.java
index 72fe1bf..5c97729 100644
--- a/src/java/org/apache/cassandra/db/columniterator/SSTableSliceIterator.java
+++ b/src/java/org/apache/cassandra/db/columniterator/SSTableSliceIterator.java
@@ -60,6 +60,7 @@ public class SSTableSliceIterator implements IColumnIterator
         }
         catch (IOException e)
         {
+            sstable.markSuspect();
             throw new IOError(e);
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9483e147/src/java/org/apache/cassandra/db/columniterator/SimpleSliceReader.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/columniterator/SimpleSliceReader.java 
b/src/java/org/apache/cassandra/db/columniterator/SimpleSliceReader.java
index 383777f..4e49ebb 100644
--- a/src/java/org/apache/cassandra/db/columniterator/SimpleSliceReader.java
+++ b/src/java/org/apache/cassandra/db/columniterator/SimpleSliceReader.java
@@ -62,6 +62,7 @@ class SimpleSliceReader extends AbstractIterator<IColumn> 
implements IColumnIter
         }
         catch (IOException e)
         {
+            sstable.markSuspect();
             throw new IOError(e);
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9483e147/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java 
b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index 0455f95..2e70c46 100644
--- 
a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ 
b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -18,9 +18,7 @@
 
 package org.apache.cassandra.db.compaction;
 
-import java.util.Collection;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
@@ -108,4 +106,24 @@ public abstract class AbstractCompactionStrategy
      * is going to be expensive
      */
     public abstract boolean isKeyExistenceExpensive(Set<? extends SSTable> 
sstablesToIgnore);
+
+    /**
+     * Filters SSTables that are to be blacklisted from the given collection
+     *
+     * @param originalCandidates The collection to check for blacklisted 
SSTables
+     *
+     * @return list of the SSTables with blacklisted ones filtered out
+     */
+    public static List<SSTableReader> 
filterSuspectSSTables(Collection<SSTableReader> originalCandidates)
+    {
+        List<SSTableReader> filteredCandidates = new 
ArrayList<SSTableReader>();
+
+        for (SSTableReader candidate : originalCandidates)
+        {
+            if (!candidate.isMarkedSuspect())
+                filteredCandidates.add(candidate);
+        }
+
+        return filteredCandidates;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9483e147/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java 
b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
index 4c63180..2661c6d 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
@@ -44,6 +44,8 @@ import org.codehaus.jackson.JsonGenerator;
 import org.codehaus.jackson.JsonNode;
 import org.codehaus.jackson.map.ObjectMapper;
 
+import static 
org.apache.cassandra.db.compaction.AbstractCompactionStrategy.filterSuspectSSTables;
+
 public class LeveledManifest
 {
     private static final Logger logger = 
LoggerFactory.getLogger(LeveledManifest.class);
@@ -249,15 +251,39 @@ public class LeveledManifest
             if (score > 1.001 || i == 0)
             {
                 Collection<SSTableReader> candidates = getCandidatesFor(i);
+
                 if (logger.isDebugEnabled())
                     logger.debug("Compaction candidates for L{} are {}", i, 
toString(candidates));
-                return candidates;
+
+                // check whether any SSTables are marked as suspect;
+                // this saves us filter time when no SSTables are suspects
+                return hasSuspectSSTables(candidates)
+                        ? filterSuspectSSTables(candidates)
+                        : candidates;
             }
         }
 
         return Collections.emptyList();
     }
 
+    /**
+     * Go through candidates collection and check if any of the SSTables are 
marked as suspected.
+     *
+     * @param candidates The SSTable collection to examine.
+     *
+     * @return true if collection has at least one SSTable marked as 
suspected, false otherwise.
+     */
+    private boolean hasSuspectSSTables(Collection<SSTableReader> candidates)
+    {
+        for (SSTableReader candidate : candidates)
+        {
+            if (candidate.isMarkedSuspect())
+                return true;
+        }
+
+        return false;
+    }
+
     public int getLevelSize(int i)
     {
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9483e147/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java 
b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
index caead37..0281f08 100644
--- 
a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
+++ 
b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
@@ -55,7 +55,8 @@ public class SizeTieredCompactionStrategy extends 
AbstractCompactionStrategy
             return null;
         }
 
-        List<List<SSTableReader>> buckets = 
getBuckets(createSSTableAndLengthPairs(cfs.getUncompactingSSTables()), 
minSSTableSize);
+        Set<SSTableReader> candidates = cfs.getUncompactingSSTables();
+        List<List<SSTableReader>> buckets = 
getBuckets(createSSTableAndLengthPairs(filterSuspectSSTables(candidates)), 
minSSTableSize);
         updateEstimatedCompactionsByTasks(buckets);
 
         List<List<SSTableReader>> prunedBuckets = new 
ArrayList<List<SSTableReader>>();
@@ -102,7 +103,7 @@ public class SizeTieredCompactionStrategy extends 
AbstractCompactionStrategy
 
     public AbstractCompactionTask getMaximalTask(final int gcBefore)
     {
-        return cfs.getSSTables().isEmpty() ? null : new CompactionTask(cfs, 
cfs.getSSTables(), gcBefore);
+        return cfs.getSSTables().isEmpty() ? null : new CompactionTask(cfs, 
filterSuspectSSTables(cfs.getSSTables()), gcBefore);
     }
 
     public AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> 
sstables, final int gcBefore)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9483e147/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java 
b/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java
index 262394d..280d7fc 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java
@@ -49,6 +49,7 @@ public class SSTableBoundedScanner extends SSTableScanner
             }
             catch (IOException e)
             {
+                sstable.markSuspect();
                 throw new RuntimeException(e);
             }
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9483e147/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java 
b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
index fbb0788..275f983 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
@@ -151,6 +151,7 @@ public class SSTableIdentityIterator implements 
Comparable<SSTableIdentityIterat
         }
         catch (IOException e)
         {
+            sstable.markSuspect();
             throw new IOError(e);
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9483e147/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java 
b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index 79b71ae..8c1d540 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -92,6 +92,7 @@ public class SSTableReader extends SSTable
     // technically isCompacted is not necessary since it should never be 
unreferenced unless it is also compacted,
     // but it seems like a good extra layer of protection against reference 
counting bugs to not delete data based on that alone
     private final AtomicBoolean isCompacted = new AtomicBoolean(false);
+    private final AtomicBoolean isSuspect = new AtomicBoolean(false);
     private final SSTableDeletingTask deletingTask;
 
     private final SSTableMetadata sstableMetadata;
@@ -713,6 +714,7 @@ public class SSTableReader extends SSTable
             }
             catch (IOException e)
             {
+                markSuspect();
                 throw new IOError(e);
             }
             finally
@@ -799,6 +801,19 @@ public class SSTableReader extends SSTable
         return true;
     }
 
+    public void markSuspect()
+    {
+        if (logger.isDebugEnabled())
+            logger.debug("Marking " + getFilename() + " as a suspect for 
blacklisting.");
+
+        isSuspect.getAndSet(true);
+    }
+
+    public boolean isMarkedSuspect()
+    {
+        return isSuspect.get();
+    }
+
     /**
      *
      * @param filter filter to use when reading the columns

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9483e147/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java 
b/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
index 0a5240a..5e4f269 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
@@ -57,6 +57,7 @@ public class SSTableScanner implements 
CloseableIterator<IColumnIterator>
         }
         catch (IOException e)
         {
+            sstable.markSuspect();
             throw new IOError(e);
         }
         this.sstable = sstable;
@@ -74,6 +75,7 @@ public class SSTableScanner implements 
CloseableIterator<IColumnIterator>
         }
         catch (IOException e)
         {
+            sstable.markSuspect();
             throw new IOError(e);
         }
         this.sstable = sstable;
@@ -100,6 +102,7 @@ public class SSTableScanner implements 
CloseableIterator<IColumnIterator>
         }
         catch (IOException e)
         {
+            sstable.markSuspect();
             throw new RuntimeException("corrupt sstable", e);
         }
     }
@@ -154,6 +157,7 @@ public class SSTableScanner implements 
CloseableIterator<IColumnIterator>
             }
             catch (IOException e)
             {
+                sstable.markSuspect();
                 throw new RuntimeException(e);
             }
         }
@@ -185,6 +189,7 @@ public class SSTableScanner implements 
CloseableIterator<IColumnIterator>
             }
             catch (IOException e)
             {
+                sstable.markSuspect();
                 throw new RuntimeException(SSTableScanner.this + " failed to 
provide next columns from " + this, e);
             }
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9483e147/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java 
b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
index ff6636d..23a2657 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
@@ -18,7 +18,7 @@
 */
 package org.apache.cassandra.db.compaction;
 
-import java.io.IOException;
+import java.io.*;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashSet;
@@ -30,6 +30,7 @@ import java.util.concurrent.Future;
 
 import org.junit.Test;
 import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertNotNull;
 
 import org.apache.cassandra.CleanupHelper;
 import org.apache.cassandra.Util;
@@ -38,6 +39,7 @@ import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 
@@ -46,6 +48,18 @@ public class CompactionsTest extends CleanupHelper
     public static final String TABLE1 = "Keyspace1";
 
     @Test
+    public void testBlacklistingWithSizeTieredCompactionStrategy() throws 
Exception
+    {
+        
testBlacklisting(SizeTieredCompactionStrategy.class.getCanonicalName());
+    }
+
+    @Test
+    public void testBlacklistingWithLeveledCompactionStrategy() throws 
Exception
+    {
+        testBlacklisting(LeveledCompactionStrategy.class.getCanonicalName());
+    }
+
+    @Test
     public void testStandardColumnCompactions() throws IOException, 
ExecutionException, InterruptedException
     {
         // this test does enough rows to force multiple block indexes to be 
used
@@ -83,6 +97,7 @@ public class CompactionsTest extends CleanupHelper
 
         // make sure max timestamp of compacted sstables is recorded properly 
after compaction.
         assertMaxTimestamp(store, maxTimestampExpected);
+        store.truncate();
     }
 
 
@@ -269,4 +284,100 @@ public class CompactionsTest extends CleanupHelper
         cf = store.getColumnFamily(filter);
         assert cf == null || cf.isEmpty() : "should be empty: " + cf;
     }
+
+    public void testBlacklisting(String compactionStrategy) throws Exception
+    {
+        // this test does enough rows to force multiple block indexes to be 
used
+        Table table = Table.open(TABLE1);
+        final ColumnFamilyStore store = 
table.getColumnFamilyStore("Standard1");
+
+        final int ROWS_PER_SSTABLE = 10;
+        final int SSTABLES = DatabaseDescriptor.getIndexInterval() * 2 / 
ROWS_PER_SSTABLE;
+
+        store.setCompactionStrategyClass(compactionStrategy);
+
+        // disable compaction while flushing
+        store.disableAutoCompaction();
+        //test index corruption
+        //now create a few new SSTables
+        long maxTimestampExpected = Long.MIN_VALUE;
+        Set<DecoratedKey> inserted = new HashSet<DecoratedKey>();
+        for (int j = 0; j < SSTABLES; j++)
+        {
+            for (int i = 0; i < ROWS_PER_SSTABLE; i++)
+            {
+                DecoratedKey key = Util.dk(String.valueOf(i % 2));
+                RowMutation rm = new RowMutation(TABLE1, key.key);
+                long timestamp = j * ROWS_PER_SSTABLE + i;
+                rm.add(new QueryPath("Standard1", null, 
ByteBufferUtil.bytes(String.valueOf(i / 2))),
+                        ByteBufferUtil.EMPTY_BYTE_BUFFER,
+                        timestamp);
+                maxTimestampExpected = Math.max(timestamp, 
maxTimestampExpected);
+                rm.apply();
+                inserted.add(key);
+            }
+            store.forceBlockingFlush();
+            assertMaxTimestamp(store, maxTimestampExpected);
+            assertEquals(inserted.toString(), inserted.size(), 
Util.getRangeSlice(store).size());
+        }
+
+        Collection<SSTableReader> sstables = store.getSSTables();
+        int currentSSTable = 0;
+        int sstablesToCorrupt = 8;
+
+        // corrupt first 'sstablesToCorrupt' SSTables
+        for (SSTableReader sstable : sstables)
+        {
+            if(currentSSTable + 1 > sstablesToCorrupt)
+                break;
+
+            RandomAccessFile raf = null;
+
+            try
+            {
+                raf = new RandomAccessFile(sstable.getFilename(), "rw");
+                assertNotNull(raf);
+                raf.write(0xFFFFFF);
+            }
+            finally
+            {
+                FileUtils.closeQuietly(raf);
+            }
+
+            currentSSTable++;
+        }
+
+        int failures = 0;
+
+        // close error output stream to avoid printing a ton of useless 
RuntimeExceptions
+        System.err.close();
+
+        try
+        {
+            // in case something goes wrong we don't want to loop forever, 
so use a bounded loop instead of for (;;)
+            for (int i = 0; i < sstables.size(); i++)
+            {
+                try
+                {
+                    store.forceMajorCompaction();
+                }
+                catch (Exception e)
+                {
+                    failures++;
+                    continue;
+                }
+
+                assertEquals(sstablesToCorrupt + 1, 
store.getSSTables().size());
+                break;
+            }
+        }
+        finally
+        {
+            System.setErr(new PrintStream(new ByteArrayOutputStream()));
+        }
+
+
+        store.truncate();
+        assertEquals(failures, sstablesToCorrupt);
+    }
 }

Reply via email to