Updated Branches: refs/heads/cassandra-1.2 2156fb22e -> 4f75875c2
IndexHelper.skipBloomFilters won't skip non-SHA filters
patch by Carl Yeksigian; reviewed by jasobrown for CASSANDRA-5385

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4f75875c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4f75875c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4f75875c

Branch: refs/heads/cassandra-1.2
Commit: 4f75875c2ec1b5324d2c4afea5251fb02fadf268
Parents: 2156fb2
Author: Jason Brown <jasedbr...@gmail.com>
Authored: Tue Apr 2 11:23:54 2013 -0700
Committer: Jason Brown <jasedbr...@gmail.com>
Committed: Tue Apr 2 11:23:54 2013 -0700

----------------------------------------------------------------------
 CHANGES.txt                                        |   1 +
 .../db/columniterator/IndexedSliceReader.java      |  10 +-
 .../db/columniterator/SimpleSliceReader.java       |   8 +-
 .../apache/cassandra/io/sstable/IndexHelper.java   |  23 +-
 .../io/sstable/SSTableIdentityIterator.java        |   9 +-
 test/unit/org/apache/cassandra/db/ScrubTest.java   | 243 ---------------
 6 files changed, 30 insertions(+), 264 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4f75875c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 273b6e6..7e906ce 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -17,6 +17,7 @@
  * cassandra-all 1.2.0 pom missing netty dependency (CASSANDRA-5392)
  * Fix writetime/ttl functions on null values (CASSANDRA-5341)
  * Fix NPE during cql3 select with token() (CASSANDRA-5404)
+ * IndexHelper.skipBloomFilters won't skip non-SHA filters (CASSANDRA-5385)
 Merged from 1.1:
  * cli: Quote ks and cf names in schema output when needed (CASSANDRA-5052)
  * Fix bad default for min/max timestamp in SSTableMetadata (CASSANDRA-5372)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4f75875c/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java b/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java index db0130e..9b34a6a 100644 --- a/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java +++ b/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java @@ -33,6 +33,7 @@ import org.apache.cassandra.db.RowIndexEntry; import org.apache.cassandra.db.filter.ColumnSlice; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.io.sstable.CorruptSSTableException; +import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.IndexHelper; import org.apache.cassandra.io.sstable.IndexHelper.IndexInfo; import org.apache.cassandra.io.sstable.SSTableReader; @@ -74,14 +75,15 @@ class IndexedSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskA try { - if (sstable.descriptor.version.hasPromotedIndexes) + Descriptor.Version version = sstable.descriptor.version; + if (version.hasPromotedIndexes) { this.indexes = indexEntry.columnsIndex(); if (indexes.isEmpty()) { setToRowStart(sstable, indexEntry, input); this.emptyColumnFamily = ColumnFamily.create(sstable.metadata); - emptyColumnFamily.delete(DeletionInfo.serializer().deserializeFromSSTable(file, sstable.descriptor.version)); + emptyColumnFamily.delete(DeletionInfo.serializer().deserializeFromSSTable(file, version)); fetcher = new SimpleBlockFetcher(); } else @@ -94,10 +96,10 @@ class IndexedSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskA else { setToRowStart(sstable, indexEntry, input); - IndexHelper.skipBloomFilter(file); + IndexHelper.skipBloomFilter(file, version.filterType); this.indexes = IndexHelper.deserializeIndex(file); this.emptyColumnFamily = ColumnFamily.create(sstable.metadata); - emptyColumnFamily.delete(DeletionInfo.serializer().deserializeFromSSTable(file, 
sstable.descriptor.version)); + emptyColumnFamily.delete(DeletionInfo.serializer().deserializeFromSSTable(file, version)); fetcher = indexes.isEmpty() ? new SimpleBlockFetcher() : new IndexedBlockFetcher(file.getFilePointer() + 4); // We still have the column count to http://git-wip-us.apache.org/repos/asf/cassandra/blob/4f75875c/src/java/org/apache/cassandra/db/columniterator/SimpleSliceReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/columniterator/SimpleSliceReader.java b/src/java/org/apache/cassandra/db/columniterator/SimpleSliceReader.java index d19e6a5..132f9cb 100644 --- a/src/java/org/apache/cassandra/db/columniterator/SimpleSliceReader.java +++ b/src/java/org/apache/cassandra/db/columniterator/SimpleSliceReader.java @@ -29,6 +29,7 @@ import org.apache.cassandra.db.OnDiskAtom; import org.apache.cassandra.db.RowIndexEntry; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.io.sstable.CorruptSSTableException; +import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.IndexHelper; import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.io.util.FileDataInput; @@ -71,14 +72,15 @@ class SimpleSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskAt ByteBufferUtil.skipShortLength(file); SSTableReader.readRowSize(file, sstable.descriptor); - if (!sstable.descriptor.version.hasPromotedIndexes) + Descriptor.Version version = sstable.descriptor.version; + if (!version.hasPromotedIndexes) { - IndexHelper.skipBloomFilter(file); + IndexHelper.skipBloomFilter(file, version.filterType); IndexHelper.skipIndex(file); } emptyColumnFamily = ColumnFamily.create(sstable.metadata); - emptyColumnFamily.delete(DeletionInfo.serializer().deserializeFromSSTable(file, sstable.descriptor.version)); + emptyColumnFamily.delete(DeletionInfo.serializer().deserializeFromSSTable(file, version)); atomSerializer = 
emptyColumnFamily.getOnDiskSerializer(); columns = file.readInt(); mark = file.mark(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/4f75875c/src/java/org/apache/cassandra/io/sstable/IndexHelper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/IndexHelper.java b/src/java/org/apache/cassandra/io/sstable/IndexHelper.java index 36e972e..14b2cda 100644 --- a/src/java/org/apache/cassandra/io/sstable/IndexHelper.java +++ b/src/java/org/apache/cassandra/io/sstable/IndexHelper.java @@ -42,20 +42,23 @@ public class IndexHelper * @param in the data input from which the bloom filter should be skipped * @throws IOException */ - public static void skipBloomFilter(DataInput in) throws IOException + public static void skipBloomFilter(DataInput in, FilterFactory.Type type) throws IOException { /* size of the bloom filter */ int size = in.readInt(); - /* skip the serialized bloom filter */ - if (in instanceof FileDataInput) - { - FileUtils.skipBytesFully(in, size); - } - else + switch (type) { - // skip bytes - byte[] skip = new byte[size]; - in.readFully(skip); + case SHA: + // can skip since bitset = 1 byte + FileUtils.skipBytesFully(in, size); + break; + case MURMUR2: + case MURMUR3: + long bitLength = in.readInt() * 8; + FileUtils.skipBytesFully(in, bitLength); + break; + default: + throw new IllegalStateException("Unknown filterfactory type " + type.toString()); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/4f75875c/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java index d9aabfb..157ccf8 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java +++ 
b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java @@ -19,6 +19,7 @@ package org.apache.cassandra.io.sstable; import java.io.*; +import org.apache.cassandra.utils.FilterFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -122,11 +123,11 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat if (dataStart + dataSize > file.length()) throw new IOException(String.format("dataSize of %s starting at %s would be larger than file %s length %s", dataSize, dataStart, file.getPath(), file.length())); - if (checkData && !sstable.descriptor.version.hasPromotedIndexes) + if (checkData && !dataVersion.hasPromotedIndexes) { try { - IndexHelper.defreezeBloomFilter(file, dataSize, sstable.descriptor.version.filterType); + IndexHelper.defreezeBloomFilter(file, dataSize, dataVersion.filterType); } catch (Exception e) { @@ -150,9 +151,9 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat } } - if (sstable != null && !sstable.descriptor.version.hasPromotedIndexes) + if (sstable != null && !dataVersion.hasPromotedIndexes) { - IndexHelper.skipBloomFilter(inputWithTracker); + IndexHelper.skipBloomFilter(inputWithTracker, dataVersion.filterType); IndexHelper.skipIndex(inputWithTracker); } columnFamily = ColumnFamily.create(metadata); http://git-wip-us.apache.org/repos/asf/cassandra/blob/4f75875c/test/unit/org/apache/cassandra/db/ScrubTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/ScrubTest.java b/test/unit/org/apache/cassandra/db/ScrubTest.java deleted file mode 100644 index fbde908..0000000 --- a/test/unit/org/apache/cassandra/db/ScrubTest.java +++ /dev/null @@ -1,243 +0,0 @@ -package org.apache.cassandra.db; -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - - -import java.io.File; -import java.io.IOException; -import java.util.List; -import java.util.concurrent.ExecutionException; - -import org.junit.Test; - -import org.apache.cassandra.SchemaLoader; -import org.apache.cassandra.Util; -import org.apache.cassandra.exceptions.ConfigurationException; -import org.apache.cassandra.db.columniterator.IdentityQueryFilter; -import org.apache.cassandra.db.compaction.CompactionManager; -import org.apache.cassandra.io.util.FileUtils; -import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.cassandra.utils.CLibrary; - -import static org.apache.cassandra.Util.column; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - -public class ScrubTest extends SchemaLoader -{ - public String TABLE = "Keyspace1"; - public String CF = "Standard1"; - public String CF2 = "Super5"; - public String CF3 = "Standard2"; - - public String copySSTables(String cf) throws IOException - { - String root = System.getProperty("corrupt-sstable-root"); - assert root != null; - File rootDir = new File(root); - assert rootDir.isDirectory(); - - File destDir = Directories.create(TABLE, cf).getDirectoryForNewSSTables(1); - - String corruptSSTableName = null; - - FileUtils.createDirectory(destDir); - for (File srcFile 
: rootDir.listFiles()) - { - if (srcFile.getName().equals(".svn")) - continue; - if (!srcFile.getName().contains(cf)) - continue; - File destFile = new File(destDir, srcFile.getName()); - CLibrary.createHardLink(srcFile, destFile); - - assert destFile.exists() : destFile.getAbsoluteFile(); - - if(destFile.getName().endsWith("Data.db")) - corruptSSTableName = destFile.getCanonicalPath(); - } - - assert corruptSSTableName != null; - return corruptSSTableName; - } - - @Test - public void testScrubFile() throws Exception - { - copySSTables(CF2); - - Table table = Table.open(TABLE); - ColumnFamilyStore cfs = table.getColumnFamilyStore(CF2); - cfs.loadNewSSTables(); - assert cfs.getSSTables().size() > 0; - - List<Row> rows; - boolean caught = false; - try - { - rows = cfs.getRangeSlice(ByteBufferUtil.bytes("1"), Util.range("", ""), 1000, new IdentityQueryFilter(), null); - fail("This slice should fail"); - } - catch (NegativeArraySizeException e) - { - caught = true; - } - assert caught : "'corrupt' test file actually was not"; - - CompactionManager.instance.performScrub(cfs); - rows = cfs.getRangeSlice(ByteBufferUtil.bytes("1"), Util.range("", ""), 1000, new IdentityQueryFilter(), null); - assertEquals(100, rows.size()); - } - - @Test - public void testScrubOneRow() throws IOException, ExecutionException, InterruptedException, ConfigurationException - { - CompactionManager.instance.disableAutoCompaction(); - Table table = Table.open(TABLE); - ColumnFamilyStore cfs = table.getColumnFamilyStore(CF); - - List<Row> rows; - - // insert data and verify we get it back w/ range query - fillCF(cfs, 1); - rows = cfs.getRangeSlice(null, Util.range("", ""), 1000, new IdentityQueryFilter(), null); - assertEquals(1, rows.size()); - - CompactionManager.instance.performScrub(cfs); - - // check data is still there - rows = cfs.getRangeSlice(null, Util.range("", ""), 1000, new IdentityQueryFilter(), null); - assertEquals(1, rows.size()); - } - - @Test - public void testScrubDeletedRow() 
throws IOException, ExecutionException, InterruptedException, ConfigurationException - { - CompactionManager.instance.disableAutoCompaction(); - Table table = Table.open(TABLE); - ColumnFamilyStore cfs = table.getColumnFamilyStore(CF3); - - RowMutation rm; - rm = new RowMutation(TABLE, ByteBufferUtil.bytes(1)); - ColumnFamily cf = ColumnFamily.create(TABLE, CF3); - cf.delete(new DeletionInfo(0, 1)); // expired tombstone - rm.add(cf); - rm.applyUnsafe(); - cfs.forceBlockingFlush(); - - CompactionManager.instance.performScrub(cfs); - assert cfs.getSSTables().isEmpty(); - } - - @Test - public void testScrubMultiRow() throws IOException, ExecutionException, InterruptedException, ConfigurationException - { - CompactionManager.instance.disableAutoCompaction(); - Table table = Table.open(TABLE); - ColumnFamilyStore cfs = table.getColumnFamilyStore(CF); - - List<Row> rows; - - // insert data and verify we get it back w/ range query - fillCF(cfs, 10); - rows = cfs.getRangeSlice(null, Util.range("", ""), 1000, new IdentityQueryFilter(), null); - assertEquals(10, rows.size()); - - CompactionManager.instance.performScrub(cfs); - - // check data is still there - rows = cfs.getRangeSlice(null, Util.range("", ""), 1000, new IdentityQueryFilter(), null); - assertEquals(10, rows.size()); - } - - @Test - public void testScubOutOfOrder() throws Exception - { - CompactionManager.instance.disableAutoCompaction(); - Table table = Table.open(TABLE); - String columnFamily = "Standard3"; - ColumnFamilyStore cfs = table.getColumnFamilyStore(columnFamily); - - /* - * Code used to generate an outOfOrder sstable. The test for out-of-order key in SSTableWriter must also be commented out. - * The test also assumes an ordered partitioner. 
- * - * ColumnFamily cf = ColumnFamily.create(TABLE, columnFamily); - * cf.addColumn(new Column(ByteBufferUtil.bytes("someName"), ByteBufferUtil.bytes("someValue"), 0L)); - - * SSTableWriter writer = cfs.createCompactionWriter((long)DatabaseDescriptor.getIndexInterval(), new File("."), Collections.<SSTableReader>emptyList()); - * writer.append(Util.dk("a"), cf); - * writer.append(Util.dk("b"), cf); - * writer.append(Util.dk("z"), cf); - * writer.append(Util.dk("c"), cf); - * writer.append(Util.dk("y"), cf); - * writer.append(Util.dk("d"), cf); - * writer.closeAndOpenReader(); - */ - - copySSTables(columnFamily); - cfs.loadNewSSTables(); - assert cfs.getSSTables().size() > 0; - - List<Row> rows; - rows = cfs.getRangeSlice(null, Util.range("", ""), 1000, new IdentityQueryFilter(), null); - assert !isRowOrdered(rows) : "'corrupt' test file actually was not"; - - CompactionManager.instance.performScrub(cfs); - rows = cfs.getRangeSlice(null, Util.range("", ""), 1000, new IdentityQueryFilter(), null); - assert isRowOrdered(rows) : "Scrub failed: " + rows; - assert rows.size() == 6: "Got " + rows.size(); - } - - private static boolean isRowOrdered(List<Row> rows) - { - DecoratedKey prev = null; - for (Row row : rows) - { - if (prev != null && prev.compareTo(row.key) > 0) - return false; - prev = row.key; - } - return true; - } - - protected void fillCF(ColumnFamilyStore cfs, int rowsPerSSTable) throws ExecutionException, InterruptedException, IOException - { - for (int i = 0; i < rowsPerSSTable; i++) - { - String key = String.valueOf(i); - // create a row and update the birthdate value, test that the index query fetches the new version - RowMutation rm; - rm = new RowMutation(TABLE, ByteBufferUtil.bytes(key)); - ColumnFamily cf = ColumnFamily.create(TABLE, CF); - cf.addColumn(column("c1", "1", 1L)); - cf.addColumn(column("c2", "2", 1L)); - rm.add(cf); - rm.applyUnsafe(); - } - - cfs.forceBlockingFlush(); - } - - - - -}