Updated Branches: refs/heads/trunk abbb8601e -> 4c28cfb57
IndexHelper.skipBloomFilters won't skip non-SHA filters, part 2 patch by Carl Yeksigian; reviewed by jasobrown for CASSANDRA-5385 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4c28cfb5 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4c28cfb5 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4c28cfb5 Branch: refs/heads/trunk Commit: 4c28cfb57bfcd8f52991c3ee44eee0044fb3ba65 Parents: abbb860 Author: Jason Brown <jasedbr...@gmail.com> Authored: Wed Apr 3 06:29:05 2013 -0700 Committer: Jason Brown <jasedbr...@gmail.com> Committed: Wed Apr 3 06:29:05 2013 -0700 ---------------------------------------------------------------------- .../db/columniterator/IndexedSliceReader.java | 2 +- .../db/columniterator/SSTableNamesIterator.java | 5 +- .../db/columniterator/SimpleSliceReader.java | 2 +- .../apache/cassandra/io/sstable/Descriptor.java | 2 + .../apache/cassandra/io/sstable/IndexHelper.java | 13 + .../io/sstable/SSTableIdentityIterator.java | 4 +- test/unit/org/apache/cassandra/db/ScrubTest.java | 245 +++++++++++++++ 7 files changed, 268 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c28cfb5/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java b/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java index e02316c..f63c577 100644 --- a/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java +++ b/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java @@ -93,7 +93,7 @@ class IndexedSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskA else { setToRowStart(sstable, indexEntry, input); - IndexHelper.skipBloomFilter(file, 
version.filterType); + IndexHelper.skipSSTableBloomFilter(file, version); this.indexes = IndexHelper.deserializeIndex(file); this.emptyColumnFamily = EmptyColumns.factory.create(sstable.metadata); emptyColumnFamily.delete(DeletionInfo.serializer().deserializeFromSSTable(file, version)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c28cfb5/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java b/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java index fe9d84f..415a1b8 100644 --- a/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java +++ b/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java @@ -124,7 +124,10 @@ public class SSTableNamesIterator extends SimpleAbstractColumnIterator implement else { assert file != null; - IndexHelper.skipBloomFilter(file, sstable.descriptor.version.filterType ); + if (sstable.descriptor.version.hasRowLevelBF) + { + IndexHelper.skipSSTableBloomFilter(file, sstable.descriptor.version); + } indexList = IndexHelper.deserializeIndex(file); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c28cfb5/src/java/org/apache/cassandra/db/columniterator/SimpleSliceReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/columniterator/SimpleSliceReader.java b/src/java/org/apache/cassandra/db/columniterator/SimpleSliceReader.java index ac556b3..58d8774 100644 --- a/src/java/org/apache/cassandra/db/columniterator/SimpleSliceReader.java +++ b/src/java/org/apache/cassandra/db/columniterator/SimpleSliceReader.java @@ -69,7 +69,7 @@ class SimpleSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskAt if (!version.hasPromotedIndexes) { if(sstable.descriptor.version.hasRowLevelBF) - IndexHelper.skipBloomFilter(file, 
version.filterType); + IndexHelper.skipSSTableBloomFilter(file, version); IndexHelper.skipIndex(file); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c28cfb5/src/java/org/apache/cassandra/io/sstable/Descriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/Descriptor.java b/src/java/org/apache/cassandra/io/sstable/Descriptor.java index fabfbb8..c8c87c2 100644 --- a/src/java/org/apache/cassandra/io/sstable/Descriptor.java +++ b/src/java/org/apache/cassandra/io/sstable/Descriptor.java @@ -93,6 +93,7 @@ public class Descriptor public final FilterFactory.Type filterType; public final boolean hasAncestors; public final boolean hasSuperColumns; + public final boolean hasBloomFilterSizeInHeader; public final boolean tracksMaxLocalDeletionTime; public final boolean hasBloomFilterFPChance; public final boolean hasRowLevelBF; @@ -120,6 +121,7 @@ public class Descriptor filterType = FilterFactory.Type.MURMUR2; else filterType = FilterFactory.Type.MURMUR3; + hasBloomFilterSizeInHeader = version.compareTo("ia") < 0; hasSuperColumns = version.compareTo("ja") < 0; hasBloomFilterFPChance = version.compareTo("ja") >= 0; hasRowLevelBF = version.compareTo("ja") < 0; http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c28cfb5/src/java/org/apache/cassandra/io/sstable/IndexHelper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/IndexHelper.java b/src/java/org/apache/cassandra/io/sstable/IndexHelper.java index 9b3a598..468fdc2 100644 --- a/src/java/org/apache/cassandra/io/sstable/IndexHelper.java +++ b/src/java/org/apache/cassandra/io/sstable/IndexHelper.java @@ -36,6 +36,19 @@ import org.apache.cassandra.utils.*; */ public class IndexHelper { + public static void skipSSTableBloomFilter(DataInput in, Descriptor.Version version) throws IOException + { + if (version.hasBloomFilterSizeInHeader) + { + int size = 
in.readInt(); + FileUtils.skipBytesFully(in, size); + } + else + { + skipBloomFilter(in, version.filterType); + } + } + /** * Skip the bloom filter http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c28cfb5/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java index 27144cc..147472a 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java @@ -126,7 +126,7 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat { try { - IndexHelper.skipBloomFilter(file, dataVersion.filterType); + IndexHelper.skipSSTableBloomFilter(file, dataVersion); } catch (Exception e) { @@ -151,7 +151,7 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat if (sstable != null && !dataVersion.hasPromotedIndexes) { - IndexHelper.skipBloomFilter(inputWithTracker, dataVersion.filterType); + IndexHelper.skipSSTableBloomFilter(inputWithTracker, dataVersion); IndexHelper.skipIndex(inputWithTracker); } columnFamily = EmptyColumns.factory.create(metadata); http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c28cfb5/test/unit/org/apache/cassandra/db/ScrubTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/ScrubTest.java b/test/unit/org/apache/cassandra/db/ScrubTest.java new file mode 100644 index 0000000..93d0a0d --- /dev/null +++ b/test/unit/org/apache/cassandra/db/ScrubTest.java @@ -0,0 +1,245 @@ +package org.apache.cassandra.db; +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.concurrent.ExecutionException; + +import org.junit.Test; +import org.junit.runner.RunWith; + +import org.apache.cassandra.OrderedJUnit4ClassRunner; +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.Util; +import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.db.columniterator.IdentityQueryFilter; +import org.apache.cassandra.db.compaction.CompactionManager; +import org.apache.cassandra.db.filter.NamesQueryFilter; +import org.apache.cassandra.db.marshal.CompositeType; +import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.CLibrary; + +import static org.apache.cassandra.Util.column; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +@RunWith(OrderedJUnit4ClassRunner.class) +public class ScrubTest extends SchemaLoader +{ + public String TABLE = "Keyspace1"; + public String CF = "Standard1"; + public String CF2 = "Super5"; + public String CF3 = "Standard2"; + + public String copySSTables(String cf) throws IOException + { + String root = System.getProperty("corrupt-sstable-root"); + assert root != null; 
+ File rootDir = new File(root); + assert rootDir.isDirectory(); + + File destDir = Directories.create(TABLE, cf).getDirectoryForNewSSTables(1); + + String corruptSSTableName = null; + + FileUtils.createDirectory(destDir); + for (File srcFile : rootDir.listFiles()) + { + if (srcFile.getName().equals(".svn")) + continue; + if (!srcFile.getName().contains(cf)) + continue; + File destFile = new File(destDir, srcFile.getName()); + CLibrary.createHardLink(srcFile, destFile); + + assert destFile.exists() : destFile.getAbsoluteFile(); + + if(destFile.getName().endsWith("Data.db")) + corruptSSTableName = destFile.getCanonicalPath(); + } + + assert corruptSSTableName != null; + return corruptSSTableName; + } + + @Test + public void testScrubFile() throws Exception + { + copySSTables(CF2); + + Table table = Table.open(TABLE); + ColumnFamilyStore cfs = table.getColumnFamilyStore(CF2); + cfs.loadNewSSTables(); + assert cfs.getSSTables().size() > 0; + + List<Row> rows; + boolean caught = false; + try + { + rows = cfs.getRangeSlice(Util.range("", ""), 1000, new NamesQueryFilter(CompositeType.build(ByteBufferUtil.bytes("1"))), null); + fail("This slice should fail"); + } + catch (IllegalArgumentException e) + { + // thrown by Buffer.limit as the column names are attempted to be read (after the row-level BF is skipped) + caught = true; + } + assert caught : "'corrupt' test file actually was not"; + + CompactionManager.instance.performScrub(cfs); + rows = cfs.getRangeSlice(Util.range("", ""), 1000, new IdentityQueryFilter(), null); + assertEquals(100, rows.size()); + } + + @Test + public void testScrubOneRow() throws IOException, ExecutionException, InterruptedException, ConfigurationException + { + CompactionManager.instance.disableAutoCompaction(); + Table table = Table.open(TABLE); + ColumnFamilyStore cfs = table.getColumnFamilyStore(CF); + + List<Row> rows; + + // insert data and verify we get it back w/ range query + fillCF(cfs, 1); + rows = cfs.getRangeSlice(Util.range("", 
""), 1000, new IdentityQueryFilter(), null); + assertEquals(1, rows.size()); + + CompactionManager.instance.performScrub(cfs); + + // check data is still there + rows = cfs.getRangeSlice(Util.range("", ""), 1000, new IdentityQueryFilter(), null); + assertEquals(1, rows.size()); + } + + @Test + public void testScrubDeletedRow() throws IOException, ExecutionException, InterruptedException, ConfigurationException + { + CompactionManager.instance.disableAutoCompaction(); + Table table = Table.open(TABLE); + ColumnFamilyStore cfs = table.getColumnFamilyStore(CF3); + + ColumnFamily cf = TreeMapBackedSortedColumns.factory.create(TABLE, CF3); + cf.delete(new DeletionInfo(0, 1)); // expired tombstone + RowMutation rm = new RowMutation(TABLE, ByteBufferUtil.bytes(1), cf); + rm.applyUnsafe(); + cfs.forceBlockingFlush(); + + CompactionManager.instance.performScrub(cfs); + assert cfs.getSSTables().isEmpty(); + } + + @Test + public void testScrubMultiRow() throws IOException, ExecutionException, InterruptedException, ConfigurationException + { + CompactionManager.instance.disableAutoCompaction(); + Table table = Table.open(TABLE); + ColumnFamilyStore cfs = table.getColumnFamilyStore(CF); + + List<Row> rows; + + // insert data and verify we get it back w/ range query + fillCF(cfs, 10); + rows = cfs.getRangeSlice(Util.range("", ""), 1000, new IdentityQueryFilter(), null); + assertEquals(10, rows.size()); + + CompactionManager.instance.performScrub(cfs); + + // check data is still there + rows = cfs.getRangeSlice(Util.range("", ""), 1000, new IdentityQueryFilter(), null); + assertEquals(10, rows.size()); + } + + @Test + public void testScubOutOfOrder() throws Exception + { + CompactionManager.instance.disableAutoCompaction(); + Table table = Table.open(TABLE); + String columnFamily = "Standard3"; + ColumnFamilyStore cfs = table.getColumnFamilyStore(columnFamily); + + /* + * Code used to generate an outOfOrder sstable. 
The test for out-of-order key in SSTableWriter must also be commented out. + * The test also assumes an ordered partitioner. + * + * ColumnFamily cf = ColumnFamily.create(TABLE, columnFamily); + * cf.addColumn(new Column(ByteBufferUtil.bytes("someName"), ByteBufferUtil.bytes("someValue"), 0L)); + + * SSTableWriter writer = cfs.createCompactionWriter((long)DatabaseDescriptor.getIndexInterval(), new File("."), Collections.<SSTableReader>emptyList()); + * writer.append(Util.dk("a"), cf); + * writer.append(Util.dk("b"), cf); + * writer.append(Util.dk("z"), cf); + * writer.append(Util.dk("c"), cf); + * writer.append(Util.dk("y"), cf); + * writer.append(Util.dk("d"), cf); + * writer.closeAndOpenReader(); + */ + + copySSTables(columnFamily); + cfs.loadNewSSTables(); + assert cfs.getSSTables().size() > 0; + + List<Row> rows; + rows = cfs.getRangeSlice(Util.range("", ""), 1000, new IdentityQueryFilter(), null); + assert !isRowOrdered(rows) : "'corrupt' test file actually was not"; + + CompactionManager.instance.performScrub(cfs); + rows = cfs.getRangeSlice(Util.range("", ""), 1000, new IdentityQueryFilter(), null); + assert isRowOrdered(rows) : "Scrub failed: " + rows; + assert rows.size() == 6: "Got " + rows.size(); + } + + private static boolean isRowOrdered(List<Row> rows) + { + DecoratedKey prev = null; + for (Row row : rows) + { + if (prev != null && prev.compareTo(row.key) > 0) + return false; + prev = row.key; + } + return true; + } + + protected void fillCF(ColumnFamilyStore cfs, int rowsPerSSTable) throws ExecutionException, InterruptedException, IOException + { + for (int i = 0; i < rowsPerSSTable; i++) + { + String key = String.valueOf(i); + // create a row and update the birthdate value, test that the index query fetches the new version + ColumnFamily cf = TreeMapBackedSortedColumns.factory.create(TABLE, CF); + cf.addColumn(column("c1", "1", 1L)); + cf.addColumn(column("c2", "2", 1L)); + RowMutation rm = new RowMutation(TABLE, ByteBufferUtil.bytes(key), cf); + 
rm.applyUnsafe(); + } + + cfs.forceBlockingFlush(); + } + + + + +}