Repository: cassandra Updated Branches: refs/heads/cassandra-3.0 6a1b1f26b -> 5e57dd14e refs/heads/cassandra-3.11 59d4c2719 -> ed9b04d6a refs/heads/trunk d2dcd7f88 -> 652d9f64f
Potential AssertionError during ReadRepair of range tombstone and partition deletions patch by Sylvain Lebresne; reviewed by Branimir Lambov for CASSANDRA-13719 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5e57dd14 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5e57dd14 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5e57dd14 Branch: refs/heads/cassandra-3.0 Commit: 5e57dd14eb37adf06d2105227e0105d871ea6f76 Parents: 6a1b1f2 Author: Sylvain Lebresne <sylv...@datastax.com> Authored: Fri Jul 21 16:58:53 2017 +0200 Committer: Sylvain Lebresne <sylv...@datastax.com> Committed: Thu Aug 24 11:28:57 2017 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/db/ReadResponse.java | 22 +++++ .../db/partitions/AbstractBTreePartition.java | 14 +-- .../db/partitions/PartitionUpdate.java | 9 ++ .../apache/cassandra/service/DataResolver.java | 70 +++++++++++++-- .../cassandra/service/DataResolverTest.java | 91 +++++++++++++++++++- 6 files changed, 192 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/5e57dd14/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 97dda05..2b49bc3 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0.15 + * Potential AssertionError during ReadRepair of range tombstone and partition deletions (CASSANDRA-13719) * Don't let stress write warmup data if n=0 (CASSANDRA-13773) * Gossip thread slows down when using batch commit log (CASSANDRA-12966) * Randomize batchlog endpoint selection with only 1 or 2 racks (CASSANDRA-12884) http://git-wip-us.apache.org/repos/asf/cassandra/blob/5e57dd14/src/java/org/apache/cassandra/db/ReadResponse.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ReadResponse.java b/src/java/org/apache/cassandra/db/ReadResponse.java index 693b52b..c59d00a 100644 --- a/src/java/org/apache/cassandra/db/ReadResponse.java +++ b/src/java/org/apache/cassandra/db/ReadResponse.java @@ -92,6 +92,28 @@ public abstract class ReadResponse public abstract boolean isDigestResponse(); + /** + * Creates a string of the requested partition in this read response suitable for debugging. + */ + public String toDebugString(ReadCommand command, DecoratedKey key) + { + if (isDigestResponse()) + return "Digest:0x" + ByteBufferUtil.bytesToHex(digest(command)); + + try (UnfilteredPartitionIterator iter = makeIterator(command)) + { + while (iter.hasNext()) + { + try (UnfilteredRowIterator partition = iter.next()) + { + if (partition.partitionKey().equals(key)) + return ImmutableBTreePartition.create(partition).toString(); + } + } + } + return "<key " + key + " not found>"; + } + protected static ByteBuffer makeDigest(UnfilteredPartitionIterator iterator, ReadCommand command) { MessageDigest digest = FBUtilities.threadLocalMD5Digest(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/5e57dd14/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java b/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java index 2aa622e..1f3dbd0 100644 --- a/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java +++ b/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java @@ -99,7 +99,7 @@ public abstract class AbstractBTreePartition implements Partition, Iterable<Row> public DeletionTime partitionLevelDeletion() { - return holder().deletionInfo.getPartitionDeletion(); + return deletionInfo().getPartitionDeletion(); } public 
PartitionColumns columns() @@ -372,17 +372,21 @@ public abstract class AbstractBTreePartition implements Partition, Iterable<Row> { StringBuilder sb = new StringBuilder(); - sb.append(String.format("[%s.%s] key=%s columns=%s", + sb.append(String.format("[%s.%s] key=%s partition_deletion=%s columns=%s", metadata.ksName, metadata.cfName, metadata.getKeyValidator().getString(partitionKey().getKey()), + partitionLevelDeletion(), columns())); if (staticRow() != Rows.EMPTY_STATIC_ROW) - sb.append("\n ").append(staticRow().toString(metadata)); + sb.append("\n ").append(staticRow().toString(metadata, true)); - for (Row row : this) - sb.append("\n ").append(row.toString(metadata)); + try (UnfilteredRowIterator iter = unfilteredIterator()) + { + while (iter.hasNext()) + sb.append("\n ").append(iter.next().toString(metadata, true)); + } return sb.toString(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/5e57dd14/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java index 2a881a3..7bd5345 100644 --- a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java +++ b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java @@ -299,6 +299,15 @@ public class PartitionUpdate extends AbstractBTreePartition return fromIterator(UnfilteredRowIterators.merge(asIterators, nowInSecs)); } + // We override this, because the version in the super-class calls holder(), which build the update preventing + // further updates, but that's not necessary here and being able to check at least the partition deletion without + // "locking" the update is nice (and used in DataResolver.RepairMergeListener.MergeListener). 
+ @Override + public DeletionInfo deletionInfo() + { + return deletionInfo; + } + /** * Modify this update to set every timestamp for live data to {@code newTimestamp} and * every deletion timestamp to {@code newTimestamp - 1}. http://git-wip-us.apache.org/repos/asf/cassandra/blob/5e57dd14/src/java/org/apache/cassandra/service/DataResolver.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java index c96a893..26b1b2a 100644 --- a/src/java/org/apache/cassandra/service/DataResolver.java +++ b/src/java/org/apache/cassandra/service/DataResolver.java @@ -22,6 +22,8 @@ import java.util.*; import java.util.concurrent.TimeoutException; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Joiner; +import com.google.common.collect.Iterables; import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.concurrent.StageManager; @@ -229,6 +231,17 @@ public class DataResolver extends ResponseResolver return repairs[i]; } + /** + * The partition level deletion with which source {@code i} is currently repaired, or + * {@code DeletionTime.LIVE} if the source is not repaired on the partition level deletion (meaning it was + * up to date on it). The output of this method is only valid after the call to + * {@link #onMergedPartitionLevelDeletion}. + */ + private DeletionTime partitionLevelRepairDeletion(int i) + { + return repairs[i] == null ? 
DeletionTime.LIVE : repairs[i].partitionLevelDeletion(); + } + private Row.Builder currentRow(int i, Clustering clustering) { if (currentRows[i] == null) @@ -273,6 +286,37 @@ public class DataResolver extends ResponseResolver public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, RangeTombstoneMarker[] versions) { + try + { + // The code for merging range tombstones is a tad complex and we had the assertions there triggered + // unexpectedly in a few occasions (CASSANDRA-13237, CASSANDRA-13719). It's hard to get insights + // when that happen without more context that what the assertion errors give us however, hence the + // catch here that basically gather as much as context as reasonable. + internalOnMergedRangeTombstoneMarkers(merged, versions); + } + catch (AssertionError e) + { + // The following can be pretty verbose, but it's really only triggered if a bug happen, so we'd + // rather get more info to debug than not. + CFMetaData table = command.metadata(); + String details = String.format("Error merging RTs on %s.%s: merged=%s, versions=%s, sources={%s}, responses:%n %s", + table.ksName, table.cfName, + merged == null ? "null" : merged.toString(table), + '[' + Joiner.on(", ").join(Iterables.transform(Arrays.asList(versions), rt -> rt == null ? "null" : rt.toString(table))) + ']', + Arrays.toString(sources), + makeResponsesDebugString()); + throw new AssertionError(details, e); + } + } + + private String makeResponsesDebugString() + { + return Joiner.on(",\n") + .join(Iterables.transform(getMessages(), m -> m.from + " => " + m.payload.toDebugString(command, partitionKey))); + } + + private void internalOnMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, RangeTombstoneMarker[] versions) + { // The current deletion as of dealing with this marker. DeletionTime currentDeletion = currentDeletion(); @@ -297,21 +341,27 @@ public class DataResolver extends ResponseResolver // active after that point. 
Further whatever deletion was open or is open by this marker on the // source, that deletion cannot supersedes the current one. // - // But while the marker deletion (before and/or after this point) cannot supersed the current + // But while the marker deletion (before and/or after this point) cannot supersede the current // deletion, we want to know if it's equal to it (both before and after), because in that case // the source is up to date and we don't want to include repair. // // So in practice we have 2 possible case: - // 1) the source was up-to-date on deletion up to that point (markerToRepair[i] == null). Then - // it won't be from that point on unless it's a boundary and the new opened deletion time - // is also equal to the current deletion (note that this implies the boundary has the same - // closing and opening deletion time, which should generally not happen, but can due to legacy - // reading code not avoiding this for a while, see CASSANDRA-13237). - // 2) the source wasn't up-to-date on deletion up to that point (markerToRepair[i] != null), and - // it may now be (if it isn't we just have nothing to do for that marker). + // 1) the source was up-to-date on deletion up to that point: then it won't be from that point + // on unless it's a boundary and the new opened deletion time is also equal to the current + // deletion (note that this implies the boundary has the same closing and opening deletion + // time, which should generally not happen, but can due to legacy reading code not avoiding + // this for a while, see CASSANDRA-13237). + // 2) the source wasn't up-to-date on deletion up to that point and it may now be (if it isn't + // we just have nothing to do for that marker). assert !currentDeletion.isLive() : currentDeletion.toString(); - if (markerToRepair[i] == null) + // Is the source up to date on deletion? 
It's up to date if it doesn't have an open RT repair + // nor an "active" partition level deletion (where "active" means that it's greater or equal + // to the current deletion: if the source has a repaired partition deletion lower than the + // current deletion, this means the current deletion is due to a previously open range tombstone, + // and if the source isn't currently repaired for that RT, then it means it's up to date on it). + DeletionTime partitionRepairDeletion = partitionLevelRepairDeletion(i); + if (markerToRepair[i] == null && currentDeletion.supersedes(partitionRepairDeletion)) { // Since there is an ongoing merged deletion, the only way we don't have an open repair for // this source is that it had a range open with the same deletion as current and it's @@ -326,6 +376,8 @@ public class DataResolver extends ResponseResolver markerToRepair[i] = marker.closeBound(isReversed).invert(); } // In case 2) above, we only have something to do if the source is up-to-date after that point + // (which, since the source isn't up-to-date before that point, means we're opening a new deletion + // that is equal to the current one). else if (marker.isOpen(isReversed) && currentDeletion.equals(marker.openDeletionTime(isReversed))) { closeOpenMarker(i, marker.openBound(isReversed).invert()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/5e57dd14/test/unit/org/apache/cassandra/service/DataResolverTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/DataResolverTest.java b/test/unit/org/apache/cassandra/service/DataResolverTest.java index 2f72093..65e18ce 100644 --- a/test/unit/org/apache/cassandra/service/DataResolverTest.java +++ b/test/unit/org/apache/cassandra/service/DataResolverTest.java @@ -574,7 +574,7 @@ public class DataResolverTest * same deletion on both side (while is useless but could be created by legacy code pre-CASSANDRA-13237 and could * thus still be sent). 
*/ - public void testRepairRangeTombstoneBoundary(int timestamp1, int timestamp2, int timestamp3) throws UnknownHostException + private void testRepairRangeTombstoneBoundary(int timestamp1, int timestamp2, int timestamp3) throws UnknownHostException { DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2); InetAddress peer1 = peer(); @@ -621,6 +621,95 @@ public class DataResolverTest assertRepairContainsDeletions(msg, null, expected); } + /** + * Test for CASSANDRA-13719: tests that having a partition deletion shadow a range tombstone on another source + * doesn't trigger an assertion error. + */ + @Test + public void testRepairRangeTombstoneWithPartitionDeletion() + { + DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2); + InetAddress peer1 = peer(); + InetAddress peer2 = peer(); + + // 1st "stream": just a partition deletion + UnfilteredPartitionIterator iter1 = iter(PartitionUpdate.fullPartitionDelete(cfm, dk, 10, nowInSec)); + + // 2nd "stream": a range tombstone that is covered by the 1st stream + RangeTombstone rt = tombstone("0", true , "10", true, 5, nowInSec); + UnfilteredPartitionIterator iter2 = iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk) + .addRangeTombstone(rt) + .buildUpdate()); + + resolver.preprocess(readResponseMessage(peer1, iter1)); + resolver.preprocess(readResponseMessage(peer2, iter2)); + + // No results, we've only reconciled tombstones. + try (PartitionIterator data = resolver.resolve()) + { + assertFalse(data.hasNext()); + // 2nd stream should get repaired + assertRepairFuture(resolver, 1); + } + + assertEquals(1, messageRecorder.sent.size()); + + MessageOut msg = getSentMessage(peer2); + assertRepairMetadata(msg); + assertRepairContainsNoColumns(msg); + + assertRepairContainsDeletions(msg, new DeletionTime(10, nowInSec)); + } + + /** + * Additional test for CASSANDRA-13719: tests the case where a partition deletion doesn't shadow a range tombstone. 
+ */ + @Test + public void testRepairRangeTombstoneWithPartitionDeletion2() + { + DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2); + InetAddress peer1 = peer(); + InetAddress peer2 = peer(); + + // 1st "stream": a partition deletion and a range tombstone + RangeTombstone rt1 = tombstone("0", true , "9", true, 11, nowInSec); + PartitionUpdate upd1 = new RowUpdateBuilder(cfm, nowInSec, 1L, dk) + .addRangeTombstone(rt1) + .buildUpdate(); + ((MutableDeletionInfo)upd1.deletionInfo()).add(new DeletionTime(10, nowInSec)); + UnfilteredPartitionIterator iter1 = iter(upd1); + + // 2nd "stream": a range tombstone that is covered by the other stream rt + RangeTombstone rt2 = tombstone("2", true , "3", true, 11, nowInSec); + RangeTombstone rt3 = tombstone("4", true , "5", true, 10, nowInSec); + UnfilteredPartitionIterator iter2 = iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk) + .addRangeTombstone(rt2) + .addRangeTombstone(rt3) + .buildUpdate()); + + resolver.preprocess(readResponseMessage(peer1, iter1)); + resolver.preprocess(readResponseMessage(peer2, iter2)); + + // No results, we've only reconciled tombstones. 
+ try (PartitionIterator data = resolver.resolve()) + { + assertFalse(data.hasNext()); + // 2nd stream should get repaired + assertRepairFuture(resolver, 1); + } + + assertEquals(1, messageRecorder.sent.size()); + + MessageOut msg = getSentMessage(peer2); + assertRepairMetadata(msg); + assertRepairContainsNoColumns(msg); + + // 2nd stream should get both the partition deletion, as well as the part of the 1st stream RT that it misses + assertRepairContainsDeletions(msg, new DeletionTime(10, nowInSec), + tombstone("0", true, "2", false, 11, nowInSec), + tombstone("3", false, "9", true, 11, nowInSec)); + } + // Forces the start to be exclusive if the condition holds private static RangeTombstone withExclusiveStartIf(RangeTombstone rt, boolean condition) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org