Revert "Always record row-level tombstones in index component; this time from the correct feature branch"
This reverts commits 811f82c5b151bb1f6178392470981883d5e1dfc5, 317ab72a8cc067bd16db63d9e6c19390e96075f9, 344f5fa5af64b601751269fc570fbf4de930e9e2, d7a09825025374b6a49a250467039dc15f36d053. Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7746225d Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7746225d Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7746225d Branch: refs/heads/trunk Commit: 7746225dc5bfc07297cc485c8035fd7441e38e88 Parents: c5f4cdd Author: Jonathan Ellis <jbel...@apache.org> Authored: Mon Apr 22 08:22:18 2013 -0500 Committer: Jonathan Ellis <jbel...@apache.org> Committed: Mon Apr 22 09:38:11 2013 -0500 ---------------------------------------------------------------------- build.xml | 2 +- src/java/org/apache/cassandra/db/ColumnIndex.java | 13 +-- src/java/org/apache/cassandra/db/DeletionInfo.java | 5 - src/java/org/apache/cassandra/db/DeletionTime.java | 2 +- .../org/apache/cassandra/db/RowIndexEntry.java | 101 ++++++----- .../db/columniterator/IndexedSliceReader.java | 91 +++++----- .../db/columniterator/SSTableNamesIterator.java | 131 ++++++++++----- .../db/columniterator/SimpleSliceReader.java | 9 +- .../apache/cassandra/io/sstable/Descriptor.java | 7 +- .../apache/cassandra/io/sstable/IndexHelper.java | 5 + .../apache/cassandra/io/sstable/SSTableReader.java | 2 +- .../apache/cassandra/io/sstable/SSTableWriter.java | 16 +- .../cassandra/utils/AlwaysPresentFilter.java | 4 - .../org/apache/cassandra/utils/FilterFactory.java | 2 +- .../org/apache/cassandra/cache/ObjectSizeTest.java | 9 +- .../apache/cassandra/db/RangeTombstoneTest.java | 4 + 16 files changed, 218 insertions(+), 185 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/7746225d/build.xml ---------------------------------------------------------------------- diff --git a/build.xml b/build.xml index 2ddd43d..3491431 100644 --- a/build.xml +++ b/build.xml @@ -519,7 +519,7 @@ </artifact:pom> </target> - <target name="maven-ant-tasks-retrieve-build" depends="maven-declare-dependencies" unless="without.maven"> + <target name="maven-ant-tasks-retrieve-build" depends="maven-declare-dependencies"> <artifact:dependencies pomRefId="build-deps-pom" filesetId="build-dependency-jars" sourcesFilesetId="build-dependency-sources" http://git-wip-us.apache.org/repos/asf/cassandra/blob/7746225d/src/java/org/apache/cassandra/db/ColumnIndex.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnIndex.java b/src/java/org/apache/cassandra/db/ColumnIndex.java index 658e94f..bcd0eef 100644 --- a/src/java/org/apache/cassandra/db/ColumnIndex.java +++ b/src/java/org/apache/cassandra/db/ColumnIndex.java @@ -22,8 +22,6 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.*; -import com.google.common.annotations.VisibleForTesting; - import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.io.sstable.IndexHelper; import org.apache.cassandra.utils.AlwaysPresentFilter; @@ -35,7 +33,7 @@ public class ColumnIndex public final List<IndexHelper.IndexInfo> columnsIndex; public final IFilter bloomFilter; - private static final ColumnIndex EMPTY = new ColumnIndex(Collections.<IndexHelper.IndexInfo>emptyList(), AlwaysPresentFilter.instance); + private static final ColumnIndex EMPTY = new ColumnIndex(Collections.<IndexHelper.IndexInfo>emptyList(), new AlwaysPresentFilter()); private ColumnIndex(int estimatedColumnCount) { @@ -44,19 +42,10 @@ public class ColumnIndex private ColumnIndex(List<IndexHelper.IndexInfo> columnsIndex, IFilter bloomFilter) { - assert columnsIndex != null; - assert bloomFilter != null; - this.columnsIndex = columnsIndex; this.bloomFilter = bloomFilter; } - @VisibleForTesting - public static ColumnIndex nothing() - { - return new ColumnIndex(0); - } - /** * Help to create an index for a column family based on size of columns, * and write said columns to disk. http://git-wip-us.apache.org/repos/asf/cassandra/blob/7746225d/src/java/org/apache/cassandra/db/DeletionInfo.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/DeletionInfo.java b/src/java/org/apache/cassandra/db/DeletionInfo.java index 095a91d..405645f 100644 --- a/src/java/org/apache/cassandra/db/DeletionInfo.java +++ b/src/java/org/apache/cassandra/db/DeletionInfo.java @@ -73,11 +73,6 @@ public class DeletionInfo this.ranges = ranges; } - public DeletionInfo(DeletionTime deletion) - { - this(deletion, IntervalTree.<ByteBuffer, DeletionTime, RangeTombstone>emptyTree()); - } - public static Serializer serializer() { return serializer; http://git-wip-us.apache.org/repos/asf/cassandra/blob/7746225d/src/java/org/apache/cassandra/db/DeletionTime.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/DeletionTime.java b/src/java/org/apache/cassandra/db/DeletionTime.java index 350d026..5f39071 100644 --- a/src/java/org/apache/cassandra/db/DeletionTime.java +++ b/src/java/org/apache/cassandra/db/DeletionTime.java @@ -35,7 +35,7 @@ public class DeletionTime implements Comparable<DeletionTime> public static final ISerializer<DeletionTime> serializer = new Serializer(); - public DeletionTime(long markedForDeleteAt, int localDeletionTime) + DeletionTime(long markedForDeleteAt, int localDeletionTime) { this.markedForDeleteAt = markedForDeleteAt; this.localDeletionTime = localDeletionTime; http://git-wip-us.apache.org/repos/asf/cassandra/blob/7746225d/src/java/org/apache/cassandra/db/RowIndexEntry.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/RowIndexEntry.java b/src/java/org/apache/cassandra/db/RowIndexEntry.java index d5046a8..a831498 100644 --- a/src/java/org/apache/cassandra/db/RowIndexEntry.java +++ b/src/java/org/apache/cassandra/db/RowIndexEntry.java @@ -28,7 +28,6 @@ import org.apache.cassandra.cache.IMeasurableMemory; import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.IndexHelper; import org.apache.cassandra.io.util.FileUtils; -import org.apache.cassandra.utils.AlwaysPresentFilter; import org.apache.cassandra.utils.IFilter; import org.apache.cassandra.utils.FilterFactory; import org.apache.cassandra.utils.ObjectSizes; @@ -46,41 +45,35 @@ public class RowIndexEntry implements IMeasurableMemory public int serializedSize() { - return TypeSizes.NATIVE.sizeof(position) + promotedSize(); + return TypeSizes.NATIVE.sizeof(position); } - public int promotedSize() + public static RowIndexEntry create(long position, DeletionInfo deletionInfo, ColumnIndex index) { - return 0; + if (index != null && index.columnsIndex != null && index.columnsIndex.size() > 1) + return new IndexedEntry(position, deletionInfo, index.columnsIndex, index.bloomFilter); + else + return new RowIndexEntry(position); } - public static RowIndexEntry create(long position, DeletionTime deletion, ColumnIndex index) + public boolean isIndexed() { - assert deletion != null; - assert index != null; - - if (index.columnsIndex.size() > 1 || deletion != DeletionTime.LIVE) - return new IndexedEntry(position, - deletion, - index.columnsIndex.isEmpty() ? Collections.<IndexHelper.IndexInfo>emptyList() : index.columnsIndex, - AlwaysPresentFilter.instance); - else - return new RowIndexEntry(position); + return !columnsIndex().isEmpty(); } - public DeletionTime deletionTime() + public DeletionInfo deletionInfo() { - return DeletionTime.LIVE; + throw new UnsupportedOperationException(); } public List<IndexHelper.IndexInfo> columnsIndex() { - return Collections.emptyList(); + return Collections.<IndexHelper.IndexInfo>emptyList(); } public IFilter bloomFilter() { - return AlwaysPresentFilter.instance; + throw new UnsupportedOperationException(); } public long memorySize() @@ -94,13 +87,14 @@ public class RowIndexEntry implements IMeasurableMemory public void serialize(RowIndexEntry rie, DataOutput dos) throws IOException { dos.writeLong(rie.position); - if (!rie.columnsIndex().isEmpty() || rie.deletionTime() != DeletionTime.LIVE) + if (rie.isIndexed()) { - dos.writeInt(rie.promotedSize()); - DeletionTime.serializer.serialize(rie.deletionTime(), dos); + dos.writeInt(((IndexedEntry)rie).serializedSize()); + DeletionInfo.serializer().serializeForSSTable(rie.deletionInfo(), dos); dos.writeInt(rie.columnsIndex().size()); for (IndexHelper.IndexInfo info : rie.columnsIndex()) info.serialize(dos); + FilterFactory.serialize(rie.bloomFilter(), dos); } else { @@ -108,23 +102,38 @@ public class RowIndexEntry implements IMeasurableMemory } } - public RowIndexEntry deserialize(DataInput dis, Descriptor.Version version) throws IOException + public RowIndexEntry deserializePositionOnly(DataInput dis, Descriptor.Version version) throws IOException { long position = dis.readLong(); - if (!version.hasPromotedIndexes) - return new RowIndexEntry(position); + if (version.hasPromotedIndexes) + { + int size = dis.readInt(); + if (size > 0) + FileUtils.skipBytesFully(dis, size); + } + return new RowIndexEntry(position); + } - int size = dis.readInt(); - if (size > 0) + public RowIndexEntry deserialize(DataInput dis, Descriptor.Version version) throws IOException + { + long position = dis.readLong(); + if (version.hasPromotedIndexes) { - DeletionTime deletion = DeletionTime.serializer.deserialize(dis); - int entries = dis.readInt(); - List<IndexHelper.IndexInfo> columnsIndex = new ArrayList<IndexHelper.IndexInfo>(entries); - for (int i = 0; i < entries; i++) - columnsIndex.add(IndexHelper.IndexInfo.deserialize(dis)); - if (!version.hasPromotedRowTombstones && entries > 0) - FilterFactory.deserialize(dis, version.filterType, false); - return new IndexedEntry(position, deletion, columnsIndex, AlwaysPresentFilter.instance); + int size = dis.readInt(); + if (size > 0) + { + DeletionInfo delInfo = DeletionInfo.serializer().deserializeFromSSTable(dis, version); + int entries = dis.readInt(); + List<IndexHelper.IndexInfo> columnsIndex = new ArrayList<IndexHelper.IndexInfo>(entries); + for (int i = 0; i < entries; i++) + columnsIndex.add(IndexHelper.IndexInfo.deserialize(dis)); + IFilter bf = FilterFactory.deserialize(dis, version.filterType, false); + return new IndexedEntry(position, delInfo, columnsIndex, bf); + } + else + { + return new RowIndexEntry(position); + } } else { @@ -154,24 +163,24 @@ public class RowIndexEntry implements IMeasurableMemory */ private static class IndexedEntry extends RowIndexEntry { - private final DeletionTime deletion; + private final DeletionInfo deletionInfo; private final List<IndexHelper.IndexInfo> columnsIndex; private final IFilter bloomFilter; - private IndexedEntry(long position, DeletionTime deletion, List<IndexHelper.IndexInfo> columnsIndex, IFilter bloomFilter) + private IndexedEntry(long position, DeletionInfo deletionInfo, List<IndexHelper.IndexInfo> columnsIndex, IFilter bloomFilter) { super(position); - assert deletion != null; - assert columnsIndex != null; - this.deletion = deletion; + assert deletionInfo != null; + assert columnsIndex != null && columnsIndex.size() > 1; + this.deletionInfo = deletionInfo; this.columnsIndex = columnsIndex; this.bloomFilter = bloomFilter; } @Override - public DeletionTime deletionTime() + public DeletionInfo deletionInfo() { - return deletion; + return deletionInfo; } @Override @@ -187,15 +196,15 @@ public class RowIndexEntry implements IMeasurableMemory } @Override - public int promotedSize() + public int serializedSize() { TypeSizes typeSizes = TypeSizes.NATIVE; - long size = DeletionTime.serializer.serializedSize(deletion, typeSizes); + long size = DeletionTime.serializer.serializedSize(deletionInfo.getTopLevelDeletion(), typeSizes); size += typeSizes.sizeof(columnsIndex.size()); // number of entries for (IndexHelper.IndexInfo info : columnsIndex) size += info.serializedSize(typeSizes); - size += bloomFilter instanceof AlwaysPresentFilter ? 0 : FilterFactory.serializedSize(bloomFilter); + size += FilterFactory.serializedSize(bloomFilter); assert size <= Integer.MAX_VALUE; return (int)size; } @@ -206,7 +215,7 @@ public class RowIndexEntry implements IMeasurableMemory for (IndexHelper.IndexInfo idx : columnsIndex) internal += idx.memorySize(); long listSize = ObjectSizes.getFieldSize(ObjectSizes.getArraySize(columnsIndex.size(), internal) + 4); - return ObjectSizes.getFieldSize(deletion.memorySize() + listSize); + return ObjectSizes.getFieldSize(deletionInfo.memorySize() + listSize); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/7746225d/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java b/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java index 01740ca..7289ab0 100644 --- a/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java +++ b/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java @@ -65,7 +65,7 @@ class IndexedSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskA * finish (reverse start) elements. i.e. forward: [a,b],[d,e],[g,h] reverse: [h,g],[e,d],[b,a]. This reader also * assumes that validation has been performed in terms of intervals (no overlapping intervals). */ - public IndexedSliceReader(SSTableReader sstable, RowIndexEntry rowEntry, FileDataInput input, ColumnSlice[] slices, boolean reversed) + public IndexedSliceReader(SSTableReader sstable, RowIndexEntry indexEntry, FileDataInput input, ColumnSlice[] slices, boolean reversed) { this.sstable = sstable; this.originalInput = input; @@ -76,53 +76,34 @@ class IndexedSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskA try { Descriptor.Version version = sstable.descriptor.version; - emptyColumnFamily = ColumnFamily.create(sstable.metadata); - - if (version.hasPromotedRowTombstones && !rowEntry.columnsIndex().isEmpty()) - { - // skip the row header entirely - indexes = rowEntry.columnsIndex(); - emptyColumnFamily.delete(new DeletionInfo(rowEntry.deletionTime())); - fetcher = new IndexedBlockFetcher(rowEntry.position); - return; - } - - // skip up to bloom filter where things get a bit more interesting - if (input == null) - { - file = sstable.getFileDataInput(rowEntry.position); - } - else - { - file = input; - file.seek(rowEntry.position); - } - this.sstable.decodeKey(ByteBufferUtil.readWithShortLength(file)); - SSTableReader.readRowSize(file, this.sstable.descriptor); - - // read the row header up to and including the row-level tombstones if (version.hasPromotedIndexes) { - indexes = rowEntry.columnsIndex(); - // we'll get row deletion time from the row header below + this.indexes = indexEntry.columnsIndex(); + if (indexes.isEmpty()) + { + setToRowStart(sstable, indexEntry, input); + this.emptyColumnFamily = ColumnFamily.create(sstable.metadata); + emptyColumnFamily.delete(DeletionInfo.serializer().deserializeFromSSTable(file, version)); + fetcher = new SimpleBlockFetcher(); + } + else + { + this.emptyColumnFamily = ColumnFamily.create(sstable.metadata); + emptyColumnFamily.delete(indexEntry.deletionInfo()); + fetcher = new IndexedBlockFetcher(indexEntry.position); + } } else { + setToRowStart(sstable, indexEntry, input); IndexHelper.skipSSTableBloomFilter(file, version); - indexes = IndexHelper.deserializeIndex(file); - } - emptyColumnFamily.delete(DeletionInfo.serializer().deserializeFromSSTable(file, version)); - - if (indexes.isEmpty()) - { - fetcher = new SimpleBlockFetcher(); - } - else - { - // index offsets changed to be based against the row key start in 1.2 - fetcher = version.hasPromotedIndexes - ? new IndexedBlockFetcher(rowEntry.position) - : new IndexedBlockFetcher(file.getFilePointer() + 4); // +4 to skip the int column count + this.indexes = IndexHelper.deserializeIndex(file); + this.emptyColumnFamily = ColumnFamily.create(sstable.metadata); + emptyColumnFamily.delete(DeletionInfo.serializer().deserializeFromSSTable(file, version)); + fetcher = indexes.isEmpty() + ? new SimpleBlockFetcher() + : new IndexedBlockFetcher(file.getFilePointer() + 4); // We still have the column count to + // skip to get the basePosition } } catch (IOException e) @@ -132,6 +113,24 @@ class IndexedSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskA } } + /** + * Sets the seek position to the start of the row for column scanning. + */ + private void setToRowStart(SSTableReader reader, RowIndexEntry indexEntry, FileDataInput input) throws IOException + { + if (input == null) + { + this.file = sstable.getFileDataInput(indexEntry.position); + } + else + { + this.file = input; + input.seek(indexEntry.position); + } + sstable.decodeKey(ByteBufferUtil.readWithShortLength(file)); + SSTableReader.readRowSize(file, sstable.descriptor); + } + public ColumnFamily getColumnFamily() { return emptyColumnFamily; @@ -198,6 +197,8 @@ class IndexedSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskA return reversed ? slices[currentSliceIdx].start : slices[currentSliceIdx].finish; } + protected abstract boolean setNextSlice(); + protected abstract boolean fetchMoreData(); protected boolean isColumnBeforeSliceStart(OnDiskAtom column) @@ -247,7 +248,7 @@ class IndexedSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskA setNextSlice(); } - private boolean setNextSlice() + protected boolean setNextSlice() { while (++currentSliceIdx < slices.length) { @@ -349,7 +350,7 @@ class IndexedSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskA /* seek to the correct offset to the data, and calculate the data size */ long positionToSeek = basePosition + currentIndex.offset; - // With 1.2 promoted indexes, our first seek in the data file will happen at this point + // With new promoted indexes, our first seek in the data file will happen at that point. if (file == null) file = originalInput == null ? sstable.getFileDataInput(positionToSeek) : originalInput; @@ -463,7 +464,7 @@ class IndexedSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskA } } - private boolean setNextSlice() + protected boolean setNextSlice() { if (reversed) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/7746225d/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java b/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java index 2561ad6..da4631d 100644 --- a/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java +++ b/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java @@ -19,22 +19,25 @@ package org.apache.cassandra.db.columniterator; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.SortedSet; +import java.util.*; import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.db.*; +import org.apache.cassandra.db.ColumnFamily; +import org.apache.cassandra.db.ColumnFamilySerializer; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.DeletionInfo; +import org.apache.cassandra.db.IColumn; +import org.apache.cassandra.db.RowIndexEntry; +import org.apache.cassandra.db.OnDiskAtom; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.io.sstable.CorruptSSTableException; -import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.IndexHelper; import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.io.util.FileDataInput; import org.apache.cassandra.io.util.FileMark; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.IFilter; public class SSTableNamesIterator extends SimpleAbstractColumnIterator implements ISSTableColumnIterator { @@ -52,13 +55,13 @@ public class SSTableNamesIterator extends SimpleAbstractColumnIterator implement this.columns = columns; this.key = key; - RowIndexEntry rowEntry = sstable.getPosition(key, SSTableReader.Operator.EQ); - if (rowEntry == null) + RowIndexEntry indexEntry = sstable.getPosition(key, SSTableReader.Operator.EQ); + if (indexEntry == null) return; try { - read(sstable, null, rowEntry); + read(sstable, null, indexEntry); } catch (IOException e) { @@ -72,7 +75,7 @@ public class SSTableNamesIterator extends SimpleAbstractColumnIterator implement } } - public SSTableNamesIterator(SSTableReader sstable, FileDataInput file, DecoratedKey key, SortedSet<ByteBuffer> columns, RowIndexEntry rowEntry) + public SSTableNamesIterator(SSTableReader sstable, FileDataInput file, DecoratedKey key, SortedSet<ByteBuffer> columns, RowIndexEntry indexEntry) { assert columns != null; this.sstable = sstable; @@ -81,7 +84,7 @@ public class SSTableNamesIterator extends SimpleAbstractColumnIterator implement try { - read(sstable, file, rowEntry); + read(sstable, file, indexEntry); } catch (IOException e) { @@ -101,67 +104,101 @@ public class SSTableNamesIterator extends SimpleAbstractColumnIterator implement return sstable; } - private void read(SSTableReader sstable, FileDataInput file, RowIndexEntry rowEntry) - throws IOException + private void read(SSTableReader sstable, FileDataInput file, RowIndexEntry indexEntry) + throws IOException { + IFilter bf; List<IndexHelper.IndexInfo> indexList; - Descriptor.Version version = sstable.descriptor.version; - cf = ColumnFamily.create(sstable.metadata); - List<OnDiskAtom> result = new ArrayList<OnDiskAtom>(columns.size()); - - if (version.hasPromotedRowTombstones && !rowEntry.columnsIndex().isEmpty()) + // If the entry is not indexed or the index is not promoted, read from the row start + if (!indexEntry.isIndexed()) { - // skip the row header entirely - cf.delete(new DeletionInfo(rowEntry.deletionTime())); + if (file == null) + file = createFileDataInput(indexEntry.position); + else + file.seek(indexEntry.position); - readIndexedColumns(sstable.metadata, file, columns, rowEntry.columnsIndex(), rowEntry.position, result); - iter = result.iterator(); - return; + DecoratedKey keyInDisk = SSTableReader.decodeKey(sstable.partitioner, + sstable.descriptor, + ByteBufferUtil.readWithShortLength(file)); + assert keyInDisk.equals(key) : String.format("%s != %s in %s", keyInDisk, key, file.getPath()); + SSTableReader.readRowSize(file, sstable.descriptor); } - if (file == null) - file = createFileDataInput(rowEntry.position); - else - file.seek(rowEntry.position); - - DecoratedKey keyInDisk = SSTableReader.decodeKey(sstable.partitioner, - sstable.descriptor, - ByteBufferUtil.readWithShortLength(file)); - assert keyInDisk.equals(key) : String.format("%s != %s in %s", keyInDisk, key, file.getPath()); - SSTableReader.readRowSize(file, sstable.descriptor); - if (sstable.descriptor.version.hasPromotedIndexes) { - indexList = rowEntry.columnsIndex(); - // we'll get row deletion time from the row header below + bf = indexEntry.isIndexed() ? indexEntry.bloomFilter() : null; + indexList = indexEntry.columnsIndex(); } else { - IndexHelper.skipSSTableBloomFilter(file, version); + assert file != null; + bf = IndexHelper.defreezeBloomFilter(file, sstable.descriptor.version.filterType); indexList = IndexHelper.deserializeIndex(file); } - cf.delete(DeletionInfo.serializer().deserializeFromSSTable(file, sstable.descriptor.version)); + if (!indexEntry.isIndexed()) + { + // we can stop early if bloom filter says none of the columns actually exist -- but, + // we can't stop before initializing the cf above, in case there's a relevant tombstone + ColumnFamilySerializer serializer = ColumnFamily.serializer; + try + { + cf = ColumnFamily.create(sstable.metadata); + cf.delete(DeletionInfo.serializer().deserializeFromSSTable(file, sstable.descriptor.version)); + } + catch (Exception e) + { + throw new IOException(serializer + " failed to deserialize " + sstable.getColumnFamilyName() + " with " + sstable.metadata + " from " + file, e); + } + } + else + { + cf = ColumnFamily.create(sstable.metadata); + cf.delete(indexEntry.deletionInfo()); + } + + List<OnDiskAtom> result = new ArrayList<OnDiskAtom>(); + List<ByteBuffer> filteredColumnNames = new ArrayList<ByteBuffer>(columns.size()); + for (ByteBuffer name : columns) + { + if (bf == null || bf.isPresent(name)) + { + filteredColumnNames.add(name); + } + } + if (filteredColumnNames.isEmpty()) + return; if (indexList.isEmpty()) { - readSimpleColumns(file, columns, result); + readSimpleColumns(file, columns, filteredColumnNames, result); } else { - long basePosition = version.hasPromotedIndexes ? rowEntry.position : file.getFilePointer() + 4; - readIndexedColumns(sstable.metadata, file, columns, indexList, basePosition, result); + long basePosition; + if (sstable.descriptor.version.hasPromotedIndexes) + { + basePosition = indexEntry.position; + } + else + { + assert file != null; + file.readInt(); // column count + basePosition = file.getFilePointer(); + } + readIndexedColumns(sstable.metadata, file, columns, filteredColumnNames, indexList, basePosition, result); } // create an iterator view of the columns we read iter = result.iterator(); } - private void readSimpleColumns(FileDataInput file, SortedSet<ByteBuffer> columnNames, List<OnDiskAtom> result) throws IOException + private void readSimpleColumns(FileDataInput file, SortedSet<ByteBuffer> columnNames, List<ByteBuffer> filteredColumnNames, List<OnDiskAtom> result) throws IOException { OnDiskAtom.Serializer atomSerializer = cf.getOnDiskSerializer(); int columns = file.readInt(); + int n = 0; for (int i = 0; i < columns; i++) { OnDiskAtom column = atomSerializer.deserializeFromSSTable(file, sstable.descriptor.version); @@ -170,7 +207,7 @@ public class SSTableNamesIterator extends SimpleAbstractColumnIterator implement if (columnNames.contains(column.name())) { result.add(column); - if (result.size() >= columnNames.size()) + if (n++ > filteredColumnNames.size()) break; } } @@ -184,16 +221,17 @@ public class SSTableNamesIterator extends SimpleAbstractColumnIterator implement private void readIndexedColumns(CFMetaData metadata, FileDataInput file, SortedSet<ByteBuffer> columnNames, + List<ByteBuffer> filteredColumnNames, List<IndexHelper.IndexInfo> indexList, long basePosition, List<OnDiskAtom> result) - throws IOException + throws IOException { /* get the various column ranges we have to read */ AbstractType<?> comparator = metadata.comparator; List<IndexHelper.IndexInfo> ranges = new ArrayList<IndexHelper.IndexInfo>(); int lastIndexIdx = -1; - for (ByteBuffer name : columnNames) + for (ByteBuffer name : filteredColumnNames) { int index = IndexHelper.indexFor(name, indexList, comparator, false, lastIndexIdx); if (index < 0 || index == indexList.size()) @@ -213,7 +251,7 @@ public class SSTableNamesIterator extends SimpleAbstractColumnIterator implement { long positionToSeek = basePosition + indexInfo.offset; - // With 1.2 promoted indexes, our first seek in the data file will happen at this point + // With new promoted indexes, our first seek in the data file will happen at that point. if (file == null) file = createFileDataInput(positionToSeek); @@ -224,6 +262,7 @@ public class SSTableNamesIterator extends SimpleAbstractColumnIterator implement while (file.bytesPastMark(mark) < indexInfo.width) { OnDiskAtom column = atomSerializer.deserializeFromSSTable(file, sstable.descriptor.version); + // we check vs the original Set, not the filtered List, for efficiency if (!(column instanceof IColumn) || columnNames.contains(column.name())) result.add(column); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/7746225d/src/java/org/apache/cassandra/db/columniterator/SimpleSliceReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/columniterator/SimpleSliceReader.java b/src/java/org/apache/cassandra/db/columniterator/SimpleSliceReader.java index deda040..b30d360 100644 --- a/src/java/org/apache/cassandra/db/columniterator/SimpleSliceReader.java +++ b/src/java/org/apache/cassandra/db/columniterator/SimpleSliceReader.java @@ -49,7 +49,7 @@ class SimpleSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskAt private FileMark mark; private final OnDiskAtom.Serializer atomSerializer; - public SimpleSliceReader(SSTableReader sstable, RowIndexEntry rowEntry, FileDataInput input, ByteBuffer finishColumn) + public SimpleSliceReader(SSTableReader sstable, RowIndexEntry indexEntry, FileDataInput input, ByteBuffer finishColumn) { this.sstable = sstable; this.finishColumn = finishColumn; @@ -58,13 +58,13 @@ class SimpleSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskAt { if (input == null) { - this.file = sstable.getFileDataInput(rowEntry.position); + this.file = sstable.getFileDataInput(indexEntry.position); this.needsClosing = true; } else { this.file = input; - input.seek(rowEntry.position); + input.seek(indexEntry.position); this.needsClosing = false; } @@ -72,8 +72,6 @@ class SimpleSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskAt ByteBufferUtil.skipShortLength(file); SSTableReader.readRowSize(file, sstable.descriptor); - emptyColumnFamily = ColumnFamily.create(sstable.metadata); - Descriptor.Version version = sstable.descriptor.version; if (!version.hasPromotedIndexes) { @@ -81,6 +79,7 @@ class SimpleSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskAt IndexHelper.skipIndex(file); } + emptyColumnFamily = ColumnFamily.create(sstable.metadata); emptyColumnFamily.delete(DeletionInfo.serializer().deserializeFromSSTable(file, version)); atomSerializer = emptyColumnFamily.getOnDiskSerializer(); columns = file.readInt(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/7746225d/src/java/org/apache/cassandra/io/sstable/Descriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/Descriptor.java b/src/java/org/apache/cassandra/io/sstable/Descriptor.java index 7b916cb..f21a0d5 100644 --- a/src/java/org/apache/cassandra/io/sstable/Descriptor.java +++ b/src/java/org/apache/cassandra/io/sstable/Descriptor.java @@ -47,7 +47,7 @@ public class Descriptor public static class Version { // This needs to be at the begining for initialization sake - public static final String current_version = "ic"; + public static final String current_version = "ib"; public static final Version LEGACY = new Version("a"); // "pre-history" // b (0.7.0): added version to sstable filenames @@ -62,11 +62,10 @@ public class Descriptor // hd (1.0.10): includes row tombstones in maxtimestamp // he (1.1.3): includes ancestors generation in metadata component // hf (1.1.6): marker that replay position corresponds to 1.1.5+ millis-based id (see CASSANDRA-4782) - // ia (1.2.0): column indexes are promoted to the index file. (this means index offsets are now against the start of the row key, rather than the start of columns data, since the former allows us to skip the row header) + // ia (1.2.0): column indexes are promoted to the index file // records estimated histogram of deletion times in tombstones // bloom filter (keys and columns) upgraded to Murmur3 // ib (1.2.1): tracks min client timestamp in metadata component - // ic (1.2.6): always promotes row-level tombstones into index file; previously this was unreliable public static final Version CURRENT = new Version(current_version); @@ -84,7 +83,6 @@ public class Descriptor public final boolean hasPartitioner; public final boolean tracksTombstones; public final boolean hasPromotedIndexes; - public final boolean hasPromotedRowTombstones; public final FilterFactory.Type filterType; public final boolean hasAncestors; public final boolean hasBloomFilterSizeInHeader; @@ -104,7 +102,6 @@ public class Descriptor metadataIncludesModernReplayPosition = version.compareTo("hf") >= 0; tracksTombstones = version.compareTo("ia") >= 0; hasPromotedIndexes = version.compareTo("ia") >= 0; - hasPromotedRowTombstones = version.compareTo("ic") >= 0; isLatestVersion = version.compareTo(current_version) == 0; if (version.compareTo("f") < 0) filterType = FilterFactory.Type.SHA; http://git-wip-us.apache.org/repos/asf/cassandra/blob/7746225d/src/java/org/apache/cassandra/io/sstable/IndexHelper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/IndexHelper.java b/src/java/org/apache/cassandra/io/sstable/IndexHelper.java index 8be2457..444ec0b 100644 --- a/src/java/org/apache/cassandra/io/sstable/IndexHelper.java +++ b/src/java/org/apache/cassandra/io/sstable/IndexHelper.java @@ -120,6 +120,11 @@ public class IndexHelper return indexList; } + public static IFilter defreezeBloomFilter(FileDataInput file, FilterFactory.Type type) throws IOException + { + return defreezeBloomFilter(file, Integer.MAX_VALUE, type); + } + /** * De-freeze the bloom filter. * http://git-wip-us.apache.org/repos/asf/cassandra/blob/7746225d/src/java/org/apache/cassandra/io/sstable/SSTableReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java index 61f505d..21a8673 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java @@ -323,7 +323,7 @@ public class SSTableReader extends SSTable { if (!components.contains(Component.FILTER)) { - bf = AlwaysPresentFilter.instance; + bf = new AlwaysPresentFilter(); return; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/7746225d/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java index c5f14ea..c64fd27 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java @@ -134,7 +134,7 @@ public class SSTableWriter extends SSTable return (lastWrittenKey == null) ? 0 : dataFile.getFilePointer(); } - private RowIndexEntry afterAppend(DecoratedKey decoratedKey, long dataPosition, DeletionTime deletion, ColumnIndex index) + private RowIndexEntry afterAppend(DecoratedKey decoratedKey, long dataPosition, DeletionInfo delInfo, ColumnIndex index) { lastWrittenKey = decoratedKey; last = lastWrittenKey; @@ -143,7 +143,7 @@ public class SSTableWriter extends SSTable if (logger.isTraceEnabled()) logger.trace("wrote " + decoratedKey + " at " + dataPosition); - RowIndexEntry entry = RowIndexEntry.create(dataPosition, deletion, index); + RowIndexEntry entry = RowIndexEntry.create(dataPosition, delInfo, index); iwriter.append(decoratedKey, entry); dbuilder.addPotentialBoundary(dataPosition); return entry; @@ -165,7 +165,7 @@ public class SSTableWriter extends SSTable throw new FSWriteError(e, dataFile.getPath()); } sstableMetadataCollector.update(dataFile.getFilePointer() - currentPosition, row.columnStats()); - return afterAppend(row.key, currentPosition, row.deletionInfo().getTopLevelDeletion(), row.index()); + return afterAppend(row.key, currentPosition, row.deletionInfo(), row.index()); } public void append(DecoratedKey decoratedKey, ColumnFamily cf) @@ -191,7 +191,7 @@ public class SSTableWriter extends SSTable DeletionInfo.serializer().serializeForSSTable(cf.deletionInfo(), dataFile.stream); dataFile.stream.writeInt(builder.writtenAtomCount()); dataFile.stream.write(buffer.getData(), 0, buffer.getLength()); - afterAppend(decoratedKey, startPosition, cf.deletionInfo().getTopLevelDeletion(), index); + afterAppend(decoratedKey, startPosition, cf.deletionInfo(), index); } catch (IOException e) { @@ -220,12 +220,12 @@ public class SSTableWriter extends SSTable throw new FSWriteError(e, dataFile.getPath()); } - DeletionTime deletion = DeletionTime.serializer.deserialize(in); + DeletionInfo deletionInfo = DeletionInfo.serializer().deserializeFromSSTable(in, descriptor.version); int columnCount = in.readInt(); try { - DeletionTime.serializer.serialize(deletion, dataFile.stream); + DeletionInfo.serializer().serializeForSSTable(deletionInfo, dataFile.stream); dataFile.stream.writeInt(columnCount); } catch (IOException e) @@ -238,7 +238,7 @@ public class SSTableWriter extends SSTable long maxTimestamp = Long.MIN_VALUE; StreamingHistogram tombstones = new StreamingHistogram(TOMBSTONE_HISTOGRAM_BIN_SIZE); ColumnFamily cf = ColumnFamily.create(metadata, ArrayBackedSortedColumns.factory()); - cf.delete(new DeletionInfo(deletion)); + cf.delete(deletionInfo); ColumnIndex.Builder columnIndexer = new ColumnIndex.Builder(cf, key.key, columnCount, dataFile.stream, true); OnDiskAtom.Serializer atomSerializer = cf.getOnDiskSerializer(); @@ -288,7 +288,7 @@ public class SSTableWriter extends SSTable sstableMetadataCollector.addRowSize(dataFile.getFilePointer() - currentPosition); sstableMetadataCollector.addColumnCount(columnCount); sstableMetadataCollector.mergeTombstoneHistogram(tombstones); - afterAppend(key, currentPosition, deletion, columnIndexer.build()); + afterAppend(key, currentPosition, deletionInfo, columnIndexer.build()); return currentPosition; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/7746225d/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java b/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java index 39b3d5d..67ac111 100644 --- a/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java +++ b/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java @@ -26,10 +26,6 @@ import java.nio.ByteBuffer; public class AlwaysPresentFilter implements IFilter { - public static final AlwaysPresentFilter instance = new AlwaysPresentFilter(); - - private AlwaysPresentFilter() { } - public boolean isPresent(ByteBuffer key) { return true; http://git-wip-us.apache.org/repos/asf/cassandra/blob/7746225d/src/java/org/apache/cassandra/utils/FilterFactory.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/FilterFactory.java b/src/java/org/apache/cassandra/utils/FilterFactory.java index 1b9027d..88c8973 100644 --- a/src/java/org/apache/cassandra/utils/FilterFactory.java +++ b/src/java/org/apache/cassandra/utils/FilterFactory.java @@ -131,7 +131,7 @@ public class FilterFactory { assert maxFalsePosProbability <= 1.0 : "Invalid probability"; if (maxFalsePosProbability == 1.0) - return AlwaysPresentFilter.instance; + return new AlwaysPresentFilter(); int bucketsPerElement = BloomCalculations.maxBucketsPerElement(numElements); BloomCalculations.BloomSpecification spec = BloomCalculations.computeBloomSpec(bucketsPerElement, maxFalsePosProbability); return createFilter(spec.K, numElements, spec.bucketsPerElement, type, offheap); http://git-wip-us.apache.org/repos/asf/cassandra/blob/7746225d/test/unit/org/apache/cassandra/cache/ObjectSizeTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cache/ObjectSizeTest.java b/test/unit/org/apache/cassandra/cache/ObjectSizeTest.java index 94e795f..398b395 100644 --- a/test/unit/org/apache/cassandra/cache/ObjectSizeTest.java +++ b/test/unit/org/apache/cassandra/cache/ObjectSizeTest.java @@ -3,14 +3,13 @@ package org.apache.cassandra.cache; import java.nio.ByteBuffer; import java.util.UUID; -import org.junit.Test; - import junit.framework.Assert; -import org.apache.cassandra.db.ColumnIndex; -import org.apache.cassandra.db.DeletionTime; + +import org.apache.cassandra.db.DeletionInfo; import org.apache.cassandra.db.RowIndexEntry; import org.apache.cassandra.utils.ObjectSizes; import org.github.jamm.MemoryMeter; +import org.junit.Test; public class ObjectSizeTest { @@ -57,7 +56,7 @@ public class ObjectSizeTest @Test public void testKeyCacheValueWithDelInfo() { - RowIndexEntry entry = RowIndexEntry.create(123, new DeletionTime(123, 123), ColumnIndex.nothing()); + RowIndexEntry entry = RowIndexEntry.create(123, new DeletionInfo(123, 123), null); long size = entry.memorySize(); long size2 = meter.measureDeep(entry); Assert.assertEquals(size, size2); http://git-wip-us.apache.org/repos/asf/cassandra/blob/7746225d/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java b/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java index c531461..1bc846b 100644 --- a/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java +++ b/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java @@ -164,6 +164,10 @@ public class RangeTombstoneTest extends SchemaLoader return ByteBufferUtil.bytes(i); } + private static void insertData(ColumnFamilyStore cfs, String key) throws Exception + { + } + private static void add(RowMutation rm, int value, long timestamp) { rm.add(new QueryPath(CFNAME, null, b(value)), b(value), timestamp);