This is an automated email from the ASF dual-hosted git repository.

slebresne pushed a commit to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 8358e19840d352475a5831d130ff3c43a11f2f4e
Author: Sylvain Lebresne <lebre...@gmail.com>
AuthorDate: Fri May 8 18:12:55 2020 +0200

    Fix legacy handling of RangeTombstone with collection ones

    When a multi-row range tombstone interacts with a collection tombstone
    within one of the covered rows, the resulting range tombstone in the
    legacy format starts in the middle of the row and extends past said
    row, and it needs special handling.

    Before this commit, the code deserializing that RT was making it
    artificially start at the end of the row (in which the collection
    tombstone is), but that meant that when `LegacyLayout.CellGrouper`
    encountered it, it decided the row was finished even if it was not,
    leading to potential row duplication.

    The patch solves this by:
    1. making that problematic tombstone start at the beginning of the row
       instead of at its end (to avoid code deciding the row is over);
    2. modifying `UnfilteredDeserializer` to 'split' that range tombstone
       into a row tombstone for the row it covers, which is handled as a
       normal row tombstone, and pushing the rest of the range tombstone
       (the part that starts after the row and extends to the original end
       of the RT) to be handled after that row is fully "grouped".

    The patch also removes the possibility of getting an empty row from
    `LegacyLayout#getNextRow` to avoid theoretical problems with that.

    Patch by Sylvain Lebresne; reviewed by Marcus Eriksson & Aleksey
    Yeschenko for CASSANDRA-15805
---
 CHANGES.txt                                        |   1 +
 src/java/org/apache/cassandra/db/LegacyLayout.java |  99 ++++++++++++----
 .../cassandra/db/UnfilteredDeserializer.java       | 129 ++++++++++++++++-----
 .../upgrade/MixedModeRangeTombstoneTest.java       |  73 ++++++++++++
 4 files changed, 252 insertions(+), 50 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index cdb9ad0..46b3f56 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.21
+ * Fix duplicated rows on 2.x upgrades when multi-row range tombstones interact with collection ones (CASSANDRA-15805)
 * Rely on snapshotted session infos on StreamResultFuture.maybeComplete to avoid race conditions (CASSANDRA-15667)
 * EmptyType doesn't override writeValue so could attempt to write bytes when expected not to (CASSANDRA-15790)
 * Fix index queries on partition key columns when some partitions contains only static data (CASSANDRA-13666)
diff --git a/src/java/org/apache/cassandra/db/LegacyLayout.java b/src/java/org/apache/cassandra/db/LegacyLayout.java
index 37cc935..39dd54a 100644
--- a/src/java/org/apache/cassandra/db/LegacyLayout.java
+++ b/src/java/org/apache/cassandra/db/LegacyLayout.java
@@ -1115,7 +1115,7 @@ public abstract class LegacyLayout
         return true;
     }
 
-    private static Comparator<LegacyAtom> legacyAtomComparator(CFMetaData metadata)
+    static Comparator<LegacyAtom> legacyAtomComparator(CFMetaData metadata)
     {
         return (o1, o2) ->
         {
@@ -1373,8 +1373,24 @@ public abstract class LegacyLayout
             this.hasValidCells = false;
         }
 
+        /**
+         * Try adding the provided atom to the currently grouped row.
+         *
+         * @param atom the new atom to try to add. This <b>must</b> be a "row" atom, that is either a cell or a legacy
+         *             range tombstone that covers only one row (row deletion) or a subset of it (collection
+         *             deletion). Legacy range tombstones covering multiple rows (which should be handled as
+         *             legitimate range tombstones by the new storage engine) must be handled separately. Atoms should
+         *             also be provided in proper clustering order.
+         * @return {@code true} if the provided atom has been "consumed" by this grouper (this does _not_ mean the
+         *         atom has been "used" by the grouper, as the grouper will skip some shadowed atoms for instance,
+         *         just that {@link #getRow()} shouldn't be called just yet if there are more atoms in the atom
+         *         iterator we're grouping). {@code false} otherwise, that is if the row currently built by this
+         *         grouper is done _without_ the provided atom being "consumed" (and so {@link #getRow()} should be
+         *         called and the grouper reset, after which the provided atom should be provided again).
+         */
         public boolean addAtom(LegacyAtom atom)
         {
+            assert atom.isRowAtom(metadata) : "Unexpected non in-row legacy range tombstone " + atom;
             return atom.isCell()
                  ? addCell(atom.asCell())
                  : addRangeTombstone(atom.asRangeTombstone());
@@ -1472,11 +1488,16 @@ public abstract class LegacyLayout
         private boolean addRangeTombstone(LegacyRangeTombstone tombstone)
         {
             if (tombstone.isRowDeletion(metadata))
+            {
                 return addRowTombstone(tombstone);
-            else if (tombstone.isCollectionTombstone())
-                return addCollectionTombstone(tombstone);
+            }
             else
-                return addGenericRangeTombstone(tombstone);
+            {
+                // The isRowAtom() assertion back in addAtom would have already triggered otherwise, but spelling it
+                // out nonetheless.
+                assert tombstone.isCollectionTombstone();
+                return addCollectionTombstone(tombstone);
+            }
         }
 
         private boolean addRowTombstone(LegacyRangeTombstone tombstone)
@@ -1545,24 +1566,32 @@ public abstract class LegacyLayout
             return true;
         }
 
-        private boolean addGenericRangeTombstone(LegacyRangeTombstone tombstone)
+        /**
+         * Whether the provided range tombstone starts strictly after the current row of the cell grouper (if no row
+         * is currently started, this returns {@code false}).
+         */
+        public boolean startsAfterCurrentRow(LegacyRangeTombstone rangeTombstone)
         {
-            /*
-             * We can see a non-collection, non-row deletion in two scenarios:
-             *
-             * 1. Most commonly, the tombstone's start bound is bigger than current row's clustering, which means that
-             *    the current row is over, and we should move on to the next row or RT;
-             *
-             * 2. Less commonly, the tombstone's start bound is smaller than current row's clustering, which means that
-             *    we've crossed an index boundary and are seeing a non-closed RT from the previous block, repeated;
-             *    we should ignore it and stay in the current row.
-             *
-             * In either case, clustering should be non-null, or we shouldn't have gotten to this method at all.
-             * However, to be absolutely SURE we're in case two above, we check here.
-             */
-            return clustering != null && metadata.comparator.compare(clustering, tombstone.start.bound.clustering()) > 0;
+            return clustering != null && metadata.comparator.compare(rangeTombstone.start.bound, clustering) > 0;
+        }
+
+        /**
+         * The clustering of the current row of the cell grouper, or {@code null} if no row is currently started.
+         */
+        public Clustering currentRowClustering()
+        {
+            return clustering;
         }
 
+        /**
+         * Generates the row currently grouped by this grouper and resets it for the following row.
+         * <p>
+         * Note that the only correct way to call this is when either all the atoms we're trying to group have been
+         * consumed, or when {@link #addAtom(LegacyAtom)} returns {@code false}.
+         *
+         * @return the current row that has been grouped, or {@code null} in the rare case where all the atoms
+         *         "consumed" by {@link #addAtom(LegacyAtom)} for this row were skipped (we skip atoms under a few
+         *         conditions).
+         */
         public Row getRow()
         {
             if (!hasValidCells && invalidLivenessInfo != null)
@@ -1718,6 +1747,12 @@ public abstract class LegacyLayout
         public LegacyCell asCell();
 
         public LegacyRangeTombstone asRangeTombstone();
+
+        /**
+         * Whether the atom is one that becomes part of a {@link Row} in the new storage engine, meaning it is either
+         * a cell or a legacy range tombstone that covers a single row, or parts of one.
+         */
+        public boolean isRowAtom(CFMetaData metadata);
     }
 
     /**
@@ -1835,6 +1870,12 @@ public abstract class LegacyLayout
             throw new UnsupportedOperationException();
         }
 
+        @Override
+        public boolean isRowAtom(CFMetaData metaData)
+        {
+            return true;
+        }
+
        public boolean isCounter()
        {
            return kind == Kind.COUNTER;
@@ -1889,9 +1930,9 @@ public abstract class LegacyLayout
            if ((start.collectionName == null) != (stop.collectionName == null))
            {
                if (start.collectionName == null)
-                   stop = new LegacyBound(stop.bound, stop.isStatic, null);
+                   stop = new LegacyBound(Slice.Bound.inclusiveEndOf(stop.bound.values), stop.isStatic, null);
                else
-                   start = new LegacyBound(start.bound, start.isStatic, null);
+                   start = new LegacyBound(Slice.Bound.inclusiveStartOf(start.bound.values), start.isStatic, null);
            }
            else if (!Objects.equals(start.collectionName, stop.collectionName))
            {
@@ -1918,11 +1959,21 @@ public abstract class LegacyLayout
            return new LegacyRangeTombstone(newStart, stop, deletionTime);
        }
 
+       public LegacyRangeTombstone withNewStart(Slice.Bound newStart)
+       {
+           return withNewStart(new LegacyBound(newStart, start.isStatic, null));
+       }
+
        public LegacyRangeTombstone withNewEnd(LegacyBound newStop)
        {
            return new LegacyRangeTombstone(start, newStop, deletionTime);
        }
 
+       public LegacyRangeTombstone withNewEnd(Slice.Bound newEnd)
+       {
+           return withNewEnd(new LegacyBound(newEnd, stop.isStatic, null));
+       }
+
        public boolean isCell()
        {
            return false;
@@ -1943,6 +1994,12 @@ public abstract class LegacyLayout
            return this;
        }
 
+       @Override
+       public boolean isRowAtom(CFMetaData metadata)
+       {
+           return isCollectionTombstone() || isRowDeletion(metadata);
+       }
+
        public boolean isCollectionTombstone()
        {
            return start.collectionName != null;
diff --git a/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java b/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
index 62ad76a..2d270bc 100644
--- a/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
+++ b/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
@@ -480,16 +480,7 @@ public abstract class UnfilteredDeserializer
             this.helper = helper;
             this.grouper = new LegacyLayout.CellGrouper(metadata, helper);
             this.tombstoneTracker = new TombstoneTracker(partitionDeletion);
-            this.atoms = new AtomIterator(atomReader);
-        }
-
-        private boolean isRow(LegacyLayout.LegacyAtom atom)
-        {
-            if (atom.isCell())
-                return true;
-
-            LegacyLayout.LegacyRangeTombstone tombstone = atom.asRangeTombstone();
-            return tombstone.isCollectionTombstone() || tombstone.isRowDeletion(metadata);
+            this.atoms = new AtomIterator(atomReader, metadata);
         }
 
         public boolean hasNext()
@@ -515,7 +506,7 @@ public abstract class UnfilteredDeserializer
                     if (tombstoneTracker.isShadowed(atom))
                         continue;
 
-                    if (isRow(atom))
+                    if (atom.isRowAtom(metadata))
                         next = readRow(atom);
                     else
                         tombstoneTracker.openNew(atom.asRangeTombstone());
@@ -539,13 +530,57 @@ public abstract class UnfilteredDeserializer
                               ? LegacyLayout.CellGrouper.staticGrouper(metadata, helper)
                               : this.grouper;
            grouper.reset();
+           // We know the first atom is not shadowed and is a "row" atom, so it can be added blindly.
           grouper.addAtom(first);
-           // As long as atoms are part of the same row, consume them. Note that the call to addAtom() uses
-           // atoms.peek() so that the atom is only consumed (by next) if it's part of the row (addAtom returns true)
-           while (atoms.hasNext() && grouper.addAtom(atoms.peek()))
+
+           // We're less sure about the next atoms. In particular, CellGrouper wants to make sure we only pass it
+           // "row" atoms (it's the only type it knows how to handle), so we have to handle anything else here.
+           while (atoms.hasNext())
            {
-               atoms.next();
+               // Peek, but don't consume the next atom just yet
+               LegacyLayout.LegacyAtom atom = atoms.peek();
+               // First, that atom may be shadowed, in which case we can simply ignore it. Note that this handles
+               // the case of a repeated RT start marker after we've crossed an index boundary, which could well
+               // appear in the middle of a row (CASSANDRA-14008).
+               if (!tombstoneTracker.hasClosingMarkerBefore(atom) && tombstoneTracker.isShadowed(atom))
+               {
+                   atoms.next(); // consume the atom, since we had only peeked it so far
+                   continue;
+               }
+
+               // Second, we should only pass "row" atoms to the cell grouper
+               if (atom.isRowAtom(metadata))
+               {
+                   if (!grouper.addAtom(atom))
+                       break; // done with the row; don't consume the atom
+                   atoms.next(); // the grouper "accepted" the atom, so consume it since we had only peeked above
+               }
+               else
+               {
+                   LegacyLayout.LegacyRangeTombstone rt = (LegacyLayout.LegacyRangeTombstone) atom;
+                   // This means we have a non-row range tombstone. Unfortunately, that does not guarantee the
+                   // current row is finished (though it may), because due to the logic within the
+                   // LegacyRangeTombstone constructor, we can get an out-of-order RT that covers the current row
+                   // (even if it has already started) and extends past it.
+
+                   // So first, deal with the easy case of the range tombstone simply starting after the current
+                   // row, in which case we're done with the current row (but don't consume the new RT yet so it
+                   // gets handled as any other non-row RT).
+                   if (grouper.startsAfterCurrentRow(rt))
+                       break;
+
+                   // Otherwise, we "split" the RT in 2: the part covering the current row, which is now an
+                   // in-row atom and can be passed to the grouper, and the part after that, which we push back
+                   // into the iterator for later processing.
+                   Clustering currentRow = grouper.currentRowClustering();
+                   atoms.next(); // consume it, since we had only peeked it so far and we're now using it
+                   atoms.pushOutOfOrder(rt.withNewStart(Slice.Bound.exclusiveStartOf(currentRow)));
+                   // Note: in theory the withNewStart is a no-op here, but we're not taking any risks
+                   grouper.addAtom(rt.withNewStart(Slice.Bound.inclusiveStartOf(currentRow))
+                                     .withNewEnd(Slice.Bound.inclusiveEndOf(currentRow)));
+               }
            }
+
           return grouper.getRow();
        }
 
@@ -583,51 +618,87 @@ public abstract class UnfilteredDeserializer
        private static class AtomIterator implements PeekingIterator<LegacyLayout.LegacyAtom>
        {
            private final Supplier<LegacyLayout.LegacyAtom> atomReader;
-           private boolean isDone;
+           private boolean readerExhausted;
            private LegacyLayout.LegacyAtom next;
 
-           private AtomIterator(Supplier<LegacyLayout.LegacyAtom> atomReader)
+           private final Comparator<LegacyLayout.LegacyAtom> atomComparator;
+           // May temporarily store atoms that need to be handled later than when they were deserialized.
+           // Lazily initialized since it is used infrequently.
+           private Queue<LegacyLayout.LegacyAtom> outOfOrderAtoms;
+
+           private AtomIterator(Supplier<LegacyLayout.LegacyAtom> atomReader, CFMetaData metadata)
            {
                this.atomReader = atomReader;
+               this.atomComparator = LegacyLayout.legacyAtomComparator(metadata);
            }
 
            public boolean hasNext()
            {
-               if (isDone)
-                   return false;
+               if (readerExhausted)
+                   return hasOutOfOrderAtoms(); // We have to return out-of-order atoms when the reader is exhausted
+
+               // Note that next() and peek() assume that next has been set by this method, so we do it even if
+               // we have some outOfOrderAtoms stacked up.
               if (next == null)
-               {
                   next = atomReader.get();
-                   if (next == null)
-                   {
-                       isDone = true;
-                       return false;
-                   }
-               }
-               return true;
+
+               readerExhausted = next == null;
+               return !readerExhausted || hasOutOfOrderAtoms();
            }
 
            public LegacyLayout.LegacyAtom next()
            {
                if (!hasNext())
                    throw new UnsupportedOperationException();
+
+               if (hasOutOrderAtomBeforeNext())
+                   return outOfOrderAtoms.poll();
+
                LegacyLayout.LegacyAtom toReturn = next;
                next = null;
                return toReturn;
            }
 
+           private boolean hasOutOfOrderAtoms()
+           {
+               return outOfOrderAtoms != null && !outOfOrderAtoms.isEmpty();
+           }
+
+           private boolean hasOutOrderAtomBeforeNext()
+           {
+               // Note that if outOfOrderAtoms is null, the first condition is false, so we never call
+               // `outOfOrderAtoms.peek()` on a null queue in the second condition.
+               return hasOutOfOrderAtoms()
+                      && (next == null || atomComparator.compare(outOfOrderAtoms.peek(), next) <= 0);
+           }
+
            public LegacyLayout.LegacyAtom peek()
            {
                if (!hasNext())
                    throw new UnsupportedOperationException();
+               if (hasOutOrderAtomBeforeNext())
+                   return outOfOrderAtoms.peek();
                return next;
            }
 
+           /**
+            * Pushes back an atom into the iterator, assuming said atom sorts strictly _after_ the atom returned by
+            * the last next() call (meaning the pushed atom falls in the part of the iterator that has not been
+            * returned yet, not before). The atom will then be returned by the iterator in proper order.
+            */
+           public void pushOutOfOrder(LegacyLayout.LegacyAtom atom)
+           {
+               if (outOfOrderAtoms == null)
+                   outOfOrderAtoms = new PriorityQueue<>(atomComparator);
+               outOfOrderAtoms.offer(atom);
+           }
+
            public void clearState()
            {
                this.next = null;
-               this.isDone = false;
+               this.readerExhausted = false;
+               if (outOfOrderAtoms != null)
+                   outOfOrderAtoms.clear();
            }
 
            public void remove()
@@ -685,7 +756,7 @@ public abstract class UnfilteredDeserializer
                if (partitionDeletion.deletes(timestamp))
                    return true;
 
-               SortedSet<LegacyLayout.LegacyRangeTombstone> coveringTombstones = isRow(atom) ? openTombstones : openTombstones.tailSet(atom.asRangeTombstone());
+               SortedSet<LegacyLayout.LegacyRangeTombstone> coveringTombstones = atom.isRowAtom(metadata) ? openTombstones : openTombstones.tailSet(atom.asRangeTombstone());
                return Iterables.any(coveringTombstones, tombstone -> tombstone.deletionTime.deletes(timestamp));
            }
 
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeRangeTombstoneTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeRangeTombstoneTest.java
new file mode 100644
index 0000000..e4b3a17
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeRangeTombstoneTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.upgrade;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.shared.DistributedTestBase;
+import org.apache.cassandra.distributed.shared.Versions;
+
+import static java.lang.String.format;
+import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
+import static org.apache.cassandra.distributed.shared.AssertUtils.row;
+
+/**
+ * Tests related to the handling of range tombstones during 2.x to 3.x upgrades.
+ */
+public class MixedModeRangeTombstoneTest extends UpgradeTestBase
+{
+    /**
+     * Tests the interaction of range tombstones covering multiple rows and collection tombstones within the covered
+     * rows.
+     *
+     * <p>This test reproduces the issue of CASSANDRA-15805.
+     */
+    @Test
+    public void multiRowsRangeTombstoneAndCollectionTombstoneInteractionTest() throws Throwable {
+        String tableName = DistributedTestBase.KEYSPACE + ".t";
+        String schema = "CREATE TABLE " + tableName + " (" +
+                        "  k int," +
+                        "  c1 text," +
+                        "  c2 text," +
+                        "  a text," +
+                        "  b set<text>," +
+                        "  c text," +
+                        "  PRIMARY KEY((k), c1, c2)" +
+                        " )";
+
+
+        new TestCase()
+        .nodes(2)
+        .upgrade(Versions.Major.v22, Versions.Major.v30)
+        .setup(cluster -> {
+            cluster.schemaChange(schema);
+            cluster.coordinator(1).execute(format("DELETE FROM %s USING TIMESTAMP 1 WHERE k = 0 AND c1 = 'A'", tableName), ConsistencyLevel.ALL);
+            cluster.coordinator(1).execute(format("INSERT INTO %s(k, c1, c2, a, b, c) VALUES (0, 'A', 'X', 'foo', {'whatever'}, 'bar') USING TIMESTAMP 2", tableName), ConsistencyLevel.ALL);
+            cluster.coordinator(1).execute(format("DELETE b FROM %s USING TIMESTAMP 3 WHERE k = 0 AND c1 = 'A' and c2 = 'X'", tableName), ConsistencyLevel.ALL);
+            cluster.get(1).flush(DistributedTestBase.KEYSPACE);
+            cluster.get(2).flush(DistributedTestBase.KEYSPACE);
+        })
+        .runAfterNodeUpgrade((cluster, node) -> {
+            assertRows(cluster.coordinator(node).execute(format("SELECT * FROM %s", tableName), ConsistencyLevel.ALL),
+                       row(0, "A", "X", "foo", null, "bar"));
+        })
+        .run();
+    }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org