Repository: cassandra Updated Branches: refs/heads/trunk 29576a44d -> 0e3da95d6
Skip redundant tombstones on compaction. Patch by Branimir Lambov; reviewed by marcuse for CASSANDRA-7953 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a61fc01f Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a61fc01f Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a61fc01f Branch: refs/heads/trunk Commit: a61fc01f418426847e3aad133127da3615813236 Parents: 02f88e3 Author: Branimir Lambov <branimir.lam...@datastax.com> Authored: Wed Oct 7 14:46:24 2015 +0300 Committer: Marcus Eriksson <marc...@apache.org> Committed: Thu Oct 15 15:28:42 2015 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/db/ColumnIndex.java | 32 +++-- .../org/apache/cassandra/db/RangeTombstone.java | 135 ++++++++++--------- .../cassandra/cql3/RangeTombstoneMergeTest.java | 125 +++++++++++++++++ 4 files changed, 218 insertions(+), 75 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/a61fc01f/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index b16acb5..68b44ed 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.1.12 + * Merge range tombstones during compaction (CASSANDRA-7953) * (cqlsh) Distinguish negative and positive infinity in output (CASSANDRA-10523) * (cqlsh) allow custom time_format for COPY TO (CASSANDRA-8970) * Don't allow startup if the node's rack has changed (CASSANDRA-10242) http://git-wip-us.apache.org/repos/asf/cassandra/blob/a61fc01f/src/java/org/apache/cassandra/db/ColumnIndex.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnIndex.java b/src/java/org/apache/cassandra/db/ColumnIndex.java index d9d6a9c..0ea5c87 100644 --- a/src/java/org/apache/cassandra/db/ColumnIndex.java +++ b/src/java/org/apache/cassandra/db/ColumnIndex.java @@ -180,14 +180,24 @@ public class ColumnIndex firstColumn = column; startPosition = endPosition; // TODO: have that use the firstColumn as min + make sure we optimize that on read - endPosition += tombstoneTracker.writeOpenedMarker(firstColumn, output, atomSerializer); + endPosition += tombstoneTracker.writeOpenedMarkers(firstColumn.name(), output, atomSerializer); blockSize = 0; // We don't count repeated tombstone marker in the block size, to avoid a situation // where we wouldn't make any progress because a block is filled by said marker + + maybeWriteRowHeader(); } - long size = atomSerializer.serializedSizeForSSTable(column); - endPosition += size; - blockSize += size; + if (tombstoneTracker.update(column, false)) + { + long size = tombstoneTracker.writeUnwrittenTombstones(output, atomSerializer); + size += atomSerializer.serializedSizeForSSTable(column); + endPosition += size; + blockSize += size; + + atomSerializer.serializeForSSTable(column, output); + } + + lastColumn = column; // if we hit the column index size that we have to index after, go ahead and index it. if (blockSize >= DatabaseDescriptor.getColumnIndexSize()) @@ -197,14 +207,6 @@ public class ColumnIndex firstColumn = null; lastBlockClosing = column; } - - maybeWriteRowHeader(); - atomSerializer.serializeForSSTable(column, output); - - // TODO: Should deal with removing unneeded tombstones - tombstoneTracker.update(column, false); - - lastColumn = column; } private void maybeWriteRowHeader() throws IOException @@ -216,12 +218,16 @@ public class ColumnIndex } } - public ColumnIndex build() + public ColumnIndex build() throws IOException { // all columns were GC'd after all if (lastColumn == null) return ColumnIndex.EMPTY; + long size = tombstoneTracker.writeUnwrittenTombstones(output, atomSerializer); + endPosition += size; + blockSize += size; + // the last column may have fallen on an index boundary already. if not, index it explicitly. if (result.columnsIndex.isEmpty() || lastBlockClosing != lastColumn) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/a61fc01f/src/java/org/apache/cassandra/db/RangeTombstone.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/RangeTombstone.java b/src/java/org/apache/cassandra/db/RangeTombstone.java index 590b005..4d22d48 100644 --- a/src/java/org/apache/cassandra/db/RangeTombstone.java +++ b/src/java/org/apache/cassandra/db/RangeTombstone.java @@ -24,6 +24,7 @@ import java.security.MessageDigest; import java.util.*; import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.OnDiskAtom.Serializer; import org.apache.cassandra.db.composites.CType; import org.apache.cassandra.db.composites.Composite; import org.apache.cassandra.io.ISSTableSerializer; @@ -122,7 +123,12 @@ public class RangeTombstone extends Interval<Composite, DeletionTime> implements // never have to test the RTs start since it's always assumed to be less than what we have. // Also note that this will store expired RTs (#7810). Those will be of type ExpiredRangeTombstone and // will be ignored by writeOpenedMarker. - private final List<RangeTombstone> openedTombstones = new LinkedList<RangeTombstone>(); + private final List<RangeTombstone> openedTombstones = new LinkedList<>(); + + // Holds tombstones that are processed but not yet written out. Delaying the write allows us to remove + // duplicate / completely covered tombstones. + // Sorted in open order (to be written in that order). + private final Set<RangeTombstone> unwrittenTombstones = new LinkedHashSet<>(); // Total number of atoms written by writeOpenedMarker(). private int atomCount; @@ -146,54 +152,49 @@ public class RangeTombstone extends Interval<Composite, DeletionTime> implements * @return the total serialized size of said tombstones and write them to * {@code out} it if isn't null. */ - public long writeOpenedMarker(OnDiskAtom firstColumn, DataOutputPlus out, OnDiskAtom.Serializer atomSerializer) throws IOException + public long writeOpenedMarkers(Composite startPos, DataOutputPlus out, OnDiskAtom.Serializer atomSerializer) throws IOException { long size = 0; - if (openedTombstones.isEmpty()) - return size; - - /* - * Compute the markers that needs to be written at the beginning of - * this block. We need to write one if it is the more recent - * (opened) tombstone for at least some part of its range. - */ - List<RangeTombstone> toWrite = new LinkedList<RangeTombstone>(); - outer: - for (RangeTombstone tombstone : openedTombstones) - { - // If the first column is outside the range, skip it (in case update() hasn't been called yet) - if (comparator.compare(firstColumn.name(), tombstone.max) > 0) - continue; - if (tombstone instanceof ExpiredRangeTombstone) + for (RangeTombstone rt : openedTombstones) + { + if (rt instanceof ExpiredRangeTombstone || comparator.compare(rt.max, startPos) < 0) continue; - RangeTombstone updated = new RangeTombstone(firstColumn.name(), tombstone.max, tombstone.data); - - Iterator<RangeTombstone> iter = toWrite.iterator(); - while (iter.hasNext()) - { - RangeTombstone other = iter.next(); - if (other.supersedes(updated, comparator)) - break outer; - if (updated.supersedes(other, comparator)) - iter.remove(); - } - toWrite.add(tombstone); + size += writeTombstone(rt, out, atomSerializer); } + return size; + } - for (RangeTombstone tombstone : toWrite) + /** + * Writes out all tombstones that have been accepted after the previous call of this method. + * Tombstones are not written immediately to permit redundant ones to be skipped. + * + * @return the serialized size of written tombstones + */ + public long writeUnwrittenTombstones(DataOutputPlus out, OnDiskAtom.Serializer atomSerializer) throws IOException + { + long size = 0; + for (RangeTombstone rt : unwrittenTombstones) { - size += atomSerializer.serializedSizeForSSTable(tombstone); - atomCount++; - if (out != null) - atomSerializer.serializeForSSTable(tombstone, out); + size += writeTombstone(rt, out, atomSerializer); } + unwrittenTombstones.clear(); + return size; + } + + private long writeTombstone(RangeTombstone rt, DataOutputPlus out, OnDiskAtom.Serializer atomSerializer) + throws IOException + { + long size = atomSerializer.serializedSizeForSSTable(rt); + atomCount++; + if (out != null) + atomSerializer.serializeForSSTable(rt, out); return size; } /** - * The total number of atoms written by calls to the method {@link #writeOpenedMarker}. + * The total number of atoms written by calls to the above methods. */ public int writtenAtom() { @@ -210,7 +211,7 @@ public class RangeTombstone extends Interval<Composite, DeletionTime> implements * Note that this method should be called on *every* atom of a partition for * the tracker to work as efficiently as possible (#9486). */ - public void update(OnDiskAtom atom, boolean isExpired) + public boolean update(OnDiskAtom atom, boolean isExpired) { // Get rid of now useless RTs ListIterator<RangeTombstone> iterator = openedTombstones.listIterator(); @@ -223,6 +224,8 @@ public class RangeTombstone extends Interval<Composite, DeletionTime> implements if (comparator.compare(atom.name(), t.max) > 0) { iterator.remove(); + // The iterator may still be in the unwrittenTombstones list. That's ok, it still needs to be written + // but it can't influence anything else. } else { @@ -237,8 +240,6 @@ public class RangeTombstone extends Interval<Composite, DeletionTime> implements if (atom instanceof RangeTombstone) { RangeTombstone toAdd = (RangeTombstone)atom; - if (isExpired) - toAdd = new ExpiredRangeTombstone(toAdd); // We want to maintain openedTombstones in end bounds order so we find where to insert the new element // and add it. While doing so, we also check if that new tombstone fully shadow or is fully shadowed @@ -252,41 +253,51 @@ public class RangeTombstone extends Interval<Composite, DeletionTime> implements { // the new one covers more than the existing one. If the new one happens to also supersedes // the existing one, remove the existing one. In any case, we're not done yet. - if (toAdd.data.supersedes(existing.data)) + if (!existing.data.supersedes(toAdd.data)) + { iterator.remove(); + // If the existing one starts at the same position as the new, it does not need to be written + // (it won't have been yet). + if (comparator.compare(toAdd.min, existing.min) == 0) + unwrittenTombstones.remove(existing); + } } else { // the new one is included in the existing one. If the new one supersedes the existing one, // then we add the new one (and if the new one ends like the existing one, we can actually remove // the existing one), otherwise we can actually ignore it. In any case, we're done. - if (toAdd.data.supersedes(existing.data)) + if (!toAdd.data.supersedes(existing.data)) + return false; + + if (cmp == 0) { - if (cmp == 0) - iterator.set(toAdd); - else - insertBefore(toAdd, iterator); + iterator.remove(); + // If the existing one starts at the same position as the new, it does not need to be written + // (it won't have been yet). + if (comparator.compare(toAdd.min, existing.min) == 0) + unwrittenTombstones.remove(existing); } - return; + else + { + iterator.previous(); + } + // Found the insert position for the new tombstone + break; } } - // If we reach here, either we had no tombstones and the new one ends after all existing ones. - iterator.add(toAdd); - } - } - /** - * Adds the provided {@code tombstone} _before_ the last element returned by {@code iterator.next()}. - * <p> - * This method assumes that {@code iterator.next()} has been called prior to this method call, i.e. that - * {@code iterator.hasPrevious() == true}. - */ - private static void insertBefore(RangeTombstone tombstone, ListIterator<RangeTombstone> iterator) - { - assert iterator.hasPrevious(); - iterator.previous(); - iterator.add(tombstone); - iterator.next(); + if (isExpired) + iterator.add(new ExpiredRangeTombstone(toAdd)); + else + { + iterator.add(toAdd); + unwrittenTombstones.add(toAdd); + } + return false; + } + // Caller should write cell. + return true; } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/a61fc01f/test/unit/org/apache/cassandra/cql3/RangeTombstoneMergeTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/RangeTombstoneMergeTest.java b/test/unit/org/apache/cassandra/cql3/RangeTombstoneMergeTest.java new file mode 100644 index 0000000..0460a16 --- /dev/null +++ b/test/unit/org/apache/cassandra/cql3/RangeTombstoneMergeTest.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.cql3; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import com.google.common.collect.Iterables; + +import org.junit.Before; +import org.junit.Test; + +import org.apache.cassandra.Util; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.columniterator.OnDiskAtomIterator; +import org.apache.cassandra.db.composites.*; +import org.apache.cassandra.io.sstable.ISSTableScanner; +import org.apache.cassandra.io.sstable.SSTableReader; + +public class RangeTombstoneMergeTest extends CQLTester +{ + @Before + public void before() throws Throwable + { + createTable("CREATE TABLE %s(" + + " key text," + + " column text," + + " data text," + + " extra text," + + " PRIMARY KEY(key, column)" + + ");"); + + // If the sstable only contains tombstones during compaction it seems that the sstable either gets removed or isn't created (but that could probably be a separate JIRA issue). + execute("INSERT INTO %s (key, column, data) VALUES (?, ?, ?)", "1", "1", "1"); + } + + @Test + public void testEqualMerge() throws Throwable + { + addRemoveAndFlush(); + + for (int i=0; i<3; ++i) + { + addRemoveAndFlush(); + compact(); + } + + assertOneTombstone(); + } + + @Test + public void testRangeMerge() throws Throwable + { + addRemoveAndFlush(); + + execute("INSERT INTO %s (key, column, data, extra) VALUES (?, ?, ?, ?)", "1", "2", "2", "2"); + execute("DELETE extra FROM %s WHERE key=? AND column=?", "1", "2"); + + flush(); + compact(); + + execute("DELETE FROM %s WHERE key=? AND column=?", "1", "2"); + + flush(); + compact(); + + assertOneTombstone(); + } + + void assertOneTombstone() throws Throwable + { + assertRows(execute("SELECT column FROM %s"), + row("1")); + assertAllRows(row("1", "1", "1", null)); + + ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(currentTable()); + ColumnFamily cf = cfs.getColumnFamily(Util.dk("1"), Composites.EMPTY, Composites.EMPTY, false, 100, System.currentTimeMillis()); + assertTrue(cf.deletionInfo().hasRanges()); + assertEquals(1, cf.deletionInfo().rangeCount()); // Ranges merged during CF construction + + assertEquals(1, cfs.getSSTables().size()); + SSTableReader reader = Iterables.get(cfs.getSSTables(), 0); + assertEquals(1, countTombstones(reader)); // See CASSANDRA-7953. + } + + void addRemoveAndFlush() throws Throwable + { + execute("INSERT INTO %s (key, column, data) VALUES (?, ?, ?)", "1", "2", "2"); + execute("DELETE FROM %s WHERE key=? AND column=?", "1", "2"); + flush(); + } + + int countTombstones(SSTableReader reader) + { + int tombstones = 0; + ISSTableScanner partitions = reader.getScanner(); + while (partitions.hasNext()) + { + OnDiskAtomIterator iter = partitions.next(); + while (iter.hasNext()) + { + OnDiskAtom atom = iter.next(); + if (atom instanceof RangeTombstone) + ++tombstones; + } + } + return tombstones; + } +}