This is an automated email from the ASF dual-hosted git repository. smiklosovic pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push: new c747f70c05 Snapshot only sstables containing mismatching ranges on preview repair mismatch c747f70c05 is described below commit c747f70c058aa94d6bcfe1f9132c410db6d2b65a Author: Marcus Eriksson <marc...@apache.org> AuthorDate: Tue Apr 19 12:15:58 2022 +0200 Snapshot only sstables containing mismatching ranges on preview repair mismatch patch by Marcus Eriksson, reviewed by Sam Tunnicliffe, Stefan Miklosovic for CASSANDRA-17561 Co-authored-by: Blake Eggleston <beggles...@apple.com> --- CHANGES.txt | 1 + src/java/org/apache/cassandra/net/ParamType.java | 35 ++-- .../apache/cassandra/repair/PreviewRepairTask.java | 17 +- src/java/org/apache/cassandra/repair/SyncStat.java | 15 +- src/java/org/apache/cassandra/repair/SyncTask.java | 2 +- .../repair/consistent/SyncStatSummary.java | 56 ++++--- .../cassandra/service/SnapshotVerbHandler.java | 14 +- .../cassandra/utils/DiagnosticSnapshotService.java | 59 +++++-- .../apache/cassandra/utils/RangesSerializer.java | 73 +++++++++ .../test/PreviewRepairSnapshotTest.java | 176 +++++++++++++++++++++ .../apache/cassandra/repair/LocalSyncTaskTest.java | 4 +- .../org/apache/cassandra/repair/RepairJobTest.java | 4 +- 12 files changed, 394 insertions(+), 62 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index b5495844f5..c1d8d00bca 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.2 + * Snapshot only sstables containing mismatching ranges on preview repair mismatch (CASSANDRA-17561) * More accurate skipping of sstables in read path (CASSANDRA-18134) * Prepare for JDK17 experimental support (CASSANDRA-18179, CASSANDRA-18258) * Remove Scripted UDFs internals; hooks to be added later in CASSANDRA-17281 (CASSANDRA-18252) diff --git a/src/java/org/apache/cassandra/net/ParamType.java b/src/java/org/apache/cassandra/net/ParamType.java index 37f4bf8aed..6cb8fb4467 100644 --- a/src/java/org/apache/cassandra/net/ParamType.java +++ b/src/java/org/apache/cassandra/net/ParamType.java @@ -26,6 +26,7 @@ import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.Int32Serializer; import org.apache.cassandra.utils.Int64Serializer; +import org.apache.cassandra.utils.RangesSerializer; import org.apache.cassandra.utils.TimeUUID; import static java.lang.Math.max; @@ -42,30 +43,30 @@ import static org.apache.cassandra.locator.InetAddressAndPort.FwdFrmSerializer.f */ public enum ParamType { - FORWARD_TO (0, "FWD_TO", ForwardingInfo.serializer), - RESPOND_TO (1, "FWD_FRM", fwdFrmSerializer), + FORWARD_TO (0, "FWD_TO", ForwardingInfo.serializer), + RESPOND_TO (1, "FWD_FRM", fwdFrmSerializer), @Deprecated - FAILURE_RESPONSE (2, "FAIL", LegacyFlag.serializer), + FAILURE_RESPONSE (2, "FAIL", LegacyFlag.serializer), @Deprecated - FAILURE_REASON (3, "FAIL_REASON", RequestFailureReason.serializer), + FAILURE_REASON (3, "FAIL_REASON", RequestFailureReason.serializer), @Deprecated - FAILURE_CALLBACK (4, "CAL_BAC", LegacyFlag.serializer), + FAILURE_CALLBACK (4, "CAL_BAC", LegacyFlag.serializer), - TRACE_SESSION (5, "TraceSession", TimeUUID.Serializer.instance), - TRACE_TYPE (6, "TraceType", Tracing.traceTypeSerializer), + TRACE_SESSION (5, "TraceSession", TimeUUID.Serializer.instance), + TRACE_TYPE (6, "TraceType", Tracing.traceTypeSerializer), @Deprecated - TRACK_REPAIRED_DATA (7, "TrackRepaired", LegacyFlag.serializer), - - TOMBSTONE_FAIL(8, "TSF", Int32Serializer.serializer), - TOMBSTONE_WARNING(9, "TSW", Int32Serializer.serializer), - LOCAL_READ_SIZE_FAIL(10, "LRSF", Int64Serializer.serializer), - LOCAL_READ_SIZE_WARN(11, "LRSW", Int64Serializer.serializer), - ROW_INDEX_READ_SIZE_FAIL(12, "RIRSF", Int64Serializer.serializer), - ROW_INDEX_READ_SIZE_WARN(13, "RIRSW", Int64Serializer.serializer), - - CUSTOM_MAP (14, "CUSTOM", CustomParamsSerializer.serializer); + TRACK_REPAIRED_DATA (7, "TrackRepaired", LegacyFlag.serializer), + + TOMBSTONE_FAIL (8, "TSF", Int32Serializer.serializer), + TOMBSTONE_WARNING (9, "TSW", Int32Serializer.serializer), + LOCAL_READ_SIZE_FAIL (10, "LRSF", Int64Serializer.serializer), + LOCAL_READ_SIZE_WARN (11, "LRSW", Int64Serializer.serializer), + ROW_INDEX_READ_SIZE_FAIL (12, "RIRSF", Int64Serializer.serializer), + ROW_INDEX_READ_SIZE_WARN (13, "RIRSW", Int64Serializer.serializer), + CUSTOM_MAP (14, "CUSTOM", CustomParamsSerializer.serializer), + SNAPSHOT_RANGES (15, "SNAPSHOT_RANGES", RangesSerializer.serializer); final int id; @Deprecated final String legacyAlias; // pre-4.0 we used to serialize entire param name string diff --git a/src/java/org/apache/cassandra/repair/PreviewRepairTask.java b/src/java/org/apache/cassandra/repair/PreviewRepairTask.java index 7ce7d1fbba..728a813a2a 100644 --- a/src/java/org/apache/cassandra/repair/PreviewRepairTask.java +++ b/src/java/org/apache/cassandra/repair/PreviewRepairTask.java @@ -27,6 +27,8 @@ import com.google.common.base.Preconditions; import org.apache.cassandra.concurrent.ExecutorPlus; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.metrics.RepairMetrics; import org.apache.cassandra.repair.consistent.SyncStatSummary; @@ -104,14 +106,18 @@ public class PreviewRepairTask extends AbstractRepairTask { Set<String> mismatchingTables = new HashSet<>(); Set<InetAddressAndPort> nodes = new HashSet<>(); + Set<Range<Token>> ranges = new HashSet<>(); for (RepairSessionResult sessionResult : results) { for (RepairResult repairResult : emptyIfNull(sessionResult.repairJobResults)) { for (SyncStat stat : emptyIfNull(repairResult.stats)) { - if (stat.numberOfDifferences > 0) + if (!stat.differences.isEmpty()) + { mismatchingTables.add(repairResult.desc.columnFamily); + ranges.addAll(stat.differences); + } // snapshot all replicas, even if they don't have any differences nodes.add(stat.nodes.coordinator); nodes.add(stat.nodes.peer); @@ -125,10 +131,13 @@ public class PreviewRepairTask extends AbstractRepairTask // we can just check snapshot existence locally since the repair coordinator is always a replica (unlike in the read case) if (!Keyspace.open(keyspace).getColumnFamilyStore(table).snapshotExists(snapshotName)) { - logger.info("{} Snapshotting {}.{} for preview repair mismatch with tag {} on instances {}", + List<Range<Token>> normalizedRanges = Range.normalize(ranges); + logger.info("{} Snapshotting {}.{} for preview repair mismatch for ranges {} with tag {} on instances {}", options.getPreviewKind().logPrefix(parentSession), - keyspace, table, snapshotName, nodes); - DiagnosticSnapshotService.repairedDataMismatch(Keyspace.open(keyspace).getColumnFamilyStore(table).metadata(), nodes); + keyspace, table, normalizedRanges, snapshotName, nodes); + DiagnosticSnapshotService.repairedDataMismatch(Keyspace.open(keyspace).getColumnFamilyStore(table).metadata(), + nodes, + normalizedRanges); } else { diff --git a/src/java/org/apache/cassandra/repair/SyncStat.java b/src/java/org/apache/cassandra/repair/SyncStat.java index 7bb503fa15..2241915ab4 100644 --- a/src/java/org/apache/cassandra/repair/SyncStat.java +++ b/src/java/org/apache/cassandra/repair/SyncStat.java @@ -17,8 +17,11 @@ */ package org.apache.cassandra.repair; +import java.util.Collection; import java.util.List; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; import org.apache.cassandra.streaming.SessionSummary; /** @@ -27,23 +30,23 @@ import org.apache.cassandra.streaming.SessionSummary; public class SyncStat { public final SyncNodePair nodes; - public final long numberOfDifferences; // TODO: revert to Range<Token> + public final Collection<Range<Token>> differences; public final List<SessionSummary> summaries; - public SyncStat(SyncNodePair nodes, long numberOfDifferences) + public SyncStat(SyncNodePair nodes, Collection<Range<Token>> differences) { - this(nodes, numberOfDifferences, null); + this(nodes, differences, null); } - public SyncStat(SyncNodePair nodes, long numberOfDifferences, List<SessionSummary> summaries) + public SyncStat(SyncNodePair nodes, Collection<Range<Token>> differences, List<SessionSummary> summaries) { this.nodes = nodes; - this.numberOfDifferences = numberOfDifferences; this.summaries = summaries; + this.differences = differences; } public SyncStat withSummaries(List<SessionSummary> summaries) { - return new SyncStat(nodes, numberOfDifferences, summaries); + return new SyncStat(nodes, differences, summaries); } } diff --git a/src/java/org/apache/cassandra/repair/SyncTask.java b/src/java/org/apache/cassandra/repair/SyncTask.java index b325eb42ad..7393effd2d 100644 --- a/src/java/org/apache/cassandra/repair/SyncTask.java +++ b/src/java/org/apache/cassandra/repair/SyncTask.java @@ -60,7 +60,7 @@ public abstract class SyncTask extends AsyncFuture<SyncStat> implements Runnable this.rangesToSync = rangesToSync; this.nodePair = new SyncNodePair(primaryEndpoint, peer); this.previewKind = previewKind; - this.stat = new SyncStat(nodePair, rangesToSync.size()); + this.stat = new SyncStat(nodePair, rangesToSync); } protected abstract void startSync(); diff --git a/src/java/org/apache/cassandra/repair/consistent/SyncStatSummary.java b/src/java/org/apache/cassandra/repair/consistent/SyncStatSummary.java index 3d217024c7..855ad4bad3 100644 --- a/src/java/org/apache/cassandra/repair/consistent/SyncStatSummary.java +++ b/src/java/org/apache/cassandra/repair/consistent/SyncStatSummary.java @@ -21,13 +21,18 @@ package org.apache.cassandra.repair.consistent; import java.net.InetSocketAddress; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.Set; import com.google.common.collect.Lists; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; import org.apache.cassandra.repair.RepairResult; import org.apache.cassandra.repair.RepairSessionResult; import org.apache.cassandra.repair.SyncStat; @@ -50,7 +55,7 @@ public class SyncStatSummary int files = 0; long bytes = 0; - long ranges = 0; + Set<Range<Token>> ranges = new HashSet<>(); Session(InetSocketAddress src, InetSocketAddress dst) { @@ -64,15 +69,15 @@ public class SyncStatSummary bytes += summary.totalSize; } - void consumeSummaries(Collection<StreamSummary> summaries, long numRanges) + void consumeSummaries(Collection<StreamSummary> summaries, Collection<Range<Token>> ranges) { summaries.forEach(this::consumeSummary); - ranges += numRanges; + this.ranges.addAll(ranges); } public String toString() { - return String.format("%s -> %s: %s ranges, %s sstables, %s bytes", src, dst, ranges, files, FBUtilities.prettyPrintMemory(bytes)); + return String.format("%s -> %s: %s ranges, %s sstables, %s bytes", src, dst, ranges.size(), files, FBUtilities.prettyPrintMemory(bytes)); } } @@ -84,7 +89,7 @@ public class SyncStatSummary int files = -1; long bytes = -1; - int ranges = -1; + Collection<Range<Token>> ranges = new HashSet<>(); boolean totalsCalculated = false; final Map<Pair<InetSocketAddress, InetSocketAddress>, Session> sessions = new HashMap<>(); @@ -109,8 +114,8 @@ public class SyncStatSummary { for (SessionSummary summary: stat.summaries) { - getOrCreate(summary.coordinator, summary.peer).consumeSummaries(summary.sendingSummaries, stat.numberOfDifferences); - getOrCreate(summary.peer, summary.coordinator).consumeSummaries(summary.receivingSummaries, stat.numberOfDifferences); + getOrCreate(summary.coordinator, summary.peer).consumeSummaries(summary.sendingSummaries, stat.differences); + getOrCreate(summary.peer, summary.coordinator).consumeSummaries(summary.receivingSummaries, stat.differences); } } @@ -123,12 +128,12 @@ public class SyncStatSummary { files = 0; bytes = 0; - ranges = 0; + ranges = new HashSet<>(); for (Session session: sessions.values()) { files += session.files; bytes += session.bytes; - ranges += session.ranges; + ranges.addAll(session.ranges); } totalsCalculated = true; } @@ -147,21 +152,36 @@ public class SyncStatSummary } StringBuilder output = new StringBuilder(); - output.append(String.format("%s.%s - %s ranges, %s sstables, %s bytes\n", keyspace, table, ranges, files, FBUtilities.prettyPrintMemory(bytes))); + output.append(String.format("%s.%s - %s ranges, %s sstables, %s bytes\n", keyspace, table, ranges.size(), files, FBUtilities.prettyPrintMemory(bytes))); + if (ranges.size() > 0) + { + output.append(" Mismatching ranges: "); + int i = 0; + Iterator<Range<Token>> rangeIterator = ranges.iterator(); + while (rangeIterator.hasNext() && i < 30) + { + Range<Token> r = rangeIterator.next(); + output.append('(').append(r.left).append(',').append(r.right).append("],"); + i++; + } + if (i == 30) + output.append("..."); + output.append(System.lineSeparator()); + } for (Session session: sessions.values()) { - output.append(" ").append(session.toString()).append('\n'); + output.append(" ").append(session.toString()).append(System.lineSeparator()); } return output.toString(); } } - private Map<Pair<String, String>, Table> summaries = new HashMap<>(); + private final Map<Pair<String, String>, Table> summaries = new HashMap<>(); private final boolean isEstimate; private int files = -1; private long bytes = -1; - private int ranges = -1; + private Set<Range<Token>> ranges = new HashSet<>(); private boolean totalsCalculated = false; public SyncStatSummary(boolean isEstimate) @@ -190,14 +210,14 @@ public class SyncStatSummary public boolean isEmpty() { calculateTotals(); - return files == 0 && bytes == 0 && ranges == 0; + return files == 0 && bytes == 0 && ranges.isEmpty(); } private void calculateTotals() { files = 0; bytes = 0; - ranges = 0; + ranges = new HashSet<>(); summaries.values().forEach(Table::calculateTotals); for (Table table: summaries.values()) { @@ -208,7 +228,7 @@ public class SyncStatSummary table.calculateTotals(); files += table.files; bytes += table.bytes; - ranges += table.ranges; + ranges.addAll(table.ranges); } totalsCalculated = true; } @@ -228,11 +248,11 @@ public class SyncStatSummary if (isEstimate) { - output.append(String.format("Total estimated streaming: %s ranges, %s sstables, %s bytes\n", ranges, files, FBUtilities.prettyPrintMemory(bytes))); + output.append(String.format("Total estimated streaming: %s ranges, %s sstables, %s bytes\n", ranges.size(), files, FBUtilities.prettyPrintMemory(bytes))); } else { - output.append(String.format("Total streaming: %s ranges, %s sstables, %s bytes\n", ranges, files, FBUtilities.prettyPrintMemory(bytes))); + output.append(String.format("Total streaming: %s ranges, %s sstables, %s bytes\n", ranges.size(), files, FBUtilities.prettyPrintMemory(bytes))); } for (Pair<String, String> tableName: tables) diff --git a/src/java/org/apache/cassandra/service/SnapshotVerbHandler.java b/src/java/org/apache/cassandra/service/SnapshotVerbHandler.java index ecf16fed92..99b5105406 100644 --- a/src/java/org/apache/cassandra/service/SnapshotVerbHandler.java +++ b/src/java/org/apache/cassandra/service/SnapshotVerbHandler.java @@ -17,16 +17,23 @@ */ package org.apache.cassandra.service; +import java.util.Collections; +import java.util.List; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.db.SnapshotCommand; import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.SnapshotCommand; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; import org.apache.cassandra.net.IVerbHandler; import org.apache.cassandra.net.Message; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.utils.DiagnosticSnapshotService; +import static org.apache.cassandra.net.ParamType.SNAPSHOT_RANGES; + public class SnapshotVerbHandler implements IVerbHandler<SnapshotCommand> { public static final SnapshotVerbHandler instance = new SnapshotVerbHandler(); @@ -41,7 +48,10 @@ public class SnapshotVerbHandler implements IVerbHandler<SnapshotCommand> } else if (DiagnosticSnapshotService.isDiagnosticSnapshotRequest(command)) { - DiagnosticSnapshotService.snapshot(command, message.from()); + List<Range<Token>> ranges = Collections.emptyList(); + if (message.header.params().containsKey(SNAPSHOT_RANGES)) + ranges = (List<Range<Token>>) message.header.params().get(SNAPSHOT_RANGES); + DiagnosticSnapshotService.snapshot(command, ranges, message.from()); } else { diff --git a/src/java/org/apache/cassandra/utils/DiagnosticSnapshotService.java b/src/java/org/apache/cassandra/utils/DiagnosticSnapshotService.java index ab2d67ec9f..29a9e5c2ba 100644 --- a/src/java/org/apache/cassandra/utils/DiagnosticSnapshotService.java +++ b/src/java/org/apache/cassandra/utils/DiagnosticSnapshotService.java @@ -20,6 +20,8 @@ package org.apache.cassandra.utils; import java.time.LocalDate; import java.time.format.DateTimeFormatter; +import java.util.Collections; +import java.util.List; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicLong; @@ -29,6 +31,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.db.*; +import org.apache.cassandra.dht.Bounds; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.Message; import org.apache.cassandra.net.MessagingService; @@ -38,6 +43,7 @@ import org.apache.cassandra.schema.TableMetadata; import static org.apache.cassandra.utils.Clock.Global.nanoTime; import static org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory; +import static org.apache.cassandra.net.ParamType.SNAPSHOT_RANGES; /** * Provides a means to take snapshots when triggered by anomalous events or when the breaking of invariants is @@ -68,6 +74,7 @@ public class DiagnosticSnapshotService public static final String REPAIRED_DATA_MISMATCH_SNAPSHOT_PREFIX = "RepairedDataMismatch-"; public static final String DUPLICATE_ROWS_DETECTED_SNAPSHOT_PREFIX = "DuplicateRows-"; + private static final int MAX_SNAPSHOT_RANGE_COUNT = 100; // otherwise, snapshot everything private final Executor executor; @@ -84,14 +91,19 @@ public class DiagnosticSnapshotService private static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.BASIC_ISO_DATE; private final ConcurrentHashMap<TableId, AtomicLong> lastSnapshotTimes = new ConcurrentHashMap<>(); - public static void duplicateRows(TableMetadata metadata, Iterable<InetAddressAndPort> replicas) + public static void repairedDataMismatch(TableMetadata metadata, Iterable<InetAddressAndPort> replicas) { - instance.maybeTriggerSnapshot(metadata, DUPLICATE_ROWS_DETECTED_SNAPSHOT_PREFIX, replicas); + repairedDataMismatch(metadata, replicas, Collections.emptyList()); } - public static void repairedDataMismatch(TableMetadata metadata, Iterable<InetAddressAndPort> replicas) + public static void repairedDataMismatch(TableMetadata metadata, Iterable<InetAddressAndPort> replicas, List<Range<Token>> ranges) + { + instance.maybeTriggerSnapshot(metadata, REPAIRED_DATA_MISMATCH_SNAPSHOT_PREFIX, replicas, ranges); + } + + public static void duplicateRows(TableMetadata metadata, Iterable<InetAddressAndPort> replicas) { - instance.maybeTriggerSnapshot(metadata, REPAIRED_DATA_MISMATCH_SNAPSHOT_PREFIX, replicas); + instance.maybeTriggerSnapshot(metadata, DUPLICATE_ROWS_DETECTED_SNAPSHOT_PREFIX, replicas); } public static boolean isDiagnosticSnapshotRequest(SnapshotCommand command) @@ -100,10 +112,10 @@ public class DiagnosticSnapshotService || command.snapshot_name.startsWith(DUPLICATE_ROWS_DETECTED_SNAPSHOT_PREFIX); } - public static void snapshot(SnapshotCommand command, InetAddressAndPort initiator) + public static void snapshot(SnapshotCommand command, List<Range<Token>> ranges, InetAddressAndPort initiator) { Preconditions.checkArgument(isDiagnosticSnapshotRequest(command)); - instance.maybeSnapshot(command, initiator); + instance.maybeSnapshot(command, ranges, initiator); } public static String getSnapshotName(String prefix) @@ -118,6 +130,11 @@ public class DiagnosticSnapshotService } private void maybeTriggerSnapshot(TableMetadata metadata, String prefix, Iterable<InetAddressAndPort> endpoints) + { + maybeTriggerSnapshot(metadata, prefix, endpoints, Collections.emptyList()); + } + + private void maybeTriggerSnapshot(TableMetadata metadata, String prefix, Iterable<InetAddressAndPort> endpoints, List<Range<Token>> ranges) { long now = nanoTime(); AtomicLong cached = lastSnapshotTimes.computeIfAbsent(metadata.id, u -> new AtomicLong(0)); @@ -130,6 +147,9 @@ public class DiagnosticSnapshotService metadata.name, getSnapshotName(prefix), false)); + + if (!ranges.isEmpty() && ranges.size() < MAX_SNAPSHOT_RANGE_COUNT) + msg = msg.withParam(SNAPSHOT_RANGES, ranges); for (InetAddressAndPort replica : endpoints) MessagingService.instance().send(msg, replica); } @@ -139,19 +159,21 @@ public class DiagnosticSnapshotService } } - private void maybeSnapshot(SnapshotCommand command, InetAddressAndPort initiator) + private void maybeSnapshot(SnapshotCommand command, List<Range<Token>> ranges, InetAddressAndPort initiator) { - executor.execute(new DiagnosticSnapshotTask(command, initiator)); + executor.execute(new DiagnosticSnapshotTask(command, ranges, initiator)); } private static class DiagnosticSnapshotTask implements Runnable { final SnapshotCommand command; final InetAddressAndPort from; + final List<Range<Token>> ranges; - DiagnosticSnapshotTask(SnapshotCommand command, InetAddressAndPort from) + DiagnosticSnapshotTask(SnapshotCommand command, List<Range<Token>> ranges, InetAddressAndPort from) { this.command = command; + this.ranges = ranges; this.from = from; } @@ -185,7 +207,17 @@ public class DiagnosticSnapshotService command.keyspace, command.column_family, command.snapshot_name); - cfs.snapshot(command.snapshot_name); + + if (ranges.isEmpty()) + cfs.snapshot(command.snapshot_name); + else + { + cfs.snapshot(command.snapshot_name, + (sstable) -> checkIntersection(ranges, + sstable.first.getToken(), + sstable.last.getToken()), + false, false); + } } catch (IllegalArgumentException e) { @@ -196,4 +228,11 @@ public class DiagnosticSnapshotService } } } + + private static boolean checkIntersection(List<Range<Token>> normalizedRanges, Token first, Token last) + { + Bounds<Token> bounds = new Bounds<>(first, last); + return normalizedRanges.stream().anyMatch(range -> range.intersects(bounds)); + } + } diff --git a/src/java/org/apache/cassandra/utils/RangesSerializer.java b/src/java/org/apache/cassandra/utils/RangesSerializer.java new file mode 100644 index 0000000000..5707503f6b --- /dev/null +++ b/src/java/org/apache/cassandra/utils/RangesSerializer.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.utils; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; + +public class RangesSerializer implements IVersionedSerializer<Collection<Range<Token>>> +{ + public static final RangesSerializer serializer = new RangesSerializer(); + + @Override + public void serialize(Collection<Range<Token>> ranges, DataOutputPlus out, int version) throws IOException + { + out.writeInt(ranges.size()); + for (Range<Token> r : ranges) + { + Token.serializer.serialize(r.left, out, version); + Token.serializer.serialize(r.right, out, version); + } + } + + @Override + public Collection<Range<Token>> deserialize(DataInputPlus in, int version) throws IOException + { + int count = in.readInt(); + List<Range<Token>> ranges = new ArrayList<>(count); + IPartitioner partitioner = DatabaseDescriptor.getPartitioner(); + for (int i = 0; i < count; i++) + { + Token start = Token.serializer.deserialize(in, partitioner, version); + Token end = Token.serializer.deserialize(in, partitioner, version); + ranges.add(new Range<>(start, end)); + } + return ranges; + } + + @Override + public long serializedSize(Collection<Range<Token>> ranges, int version) + { + int size = TypeSizes.sizeof(ranges.size()); + if (ranges.size() > 0) + size += ranges.size() * 2 * Token.serializer.serializedSize(ranges.iterator().next().left, version); + return size; + } +} diff --git a/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairSnapshotTest.java b/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairSnapshotTest.java new file mode 100644 index 0000000000..e3679e9af7 --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairSnapshotTest.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; + +import com.google.common.collect.Sets; +import org.junit.Test; + +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.dht.Bounds; +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.ConsistencyLevel; +import org.apache.cassandra.distributed.api.IIsolatedExecutor; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.utils.concurrent.Refs; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.MINUTES; +import static org.apache.cassandra.distributed.api.Feature.GOSSIP; +import static org.apache.cassandra.distributed.api.Feature.NETWORK; +import static org.awaitility.Awaitility.await; +import static org.hamcrest.Matchers.emptyString; +import static org.hamcrest.Matchers.not; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +public class PreviewRepairSnapshotTest extends TestBaseImpl +{ + /** + * Makes sure we only snapshot sstables containing the mismatching token + * <p> + * 1. create 100 sstables per instance, compaction disabled, one token per sstable + * 2. make 3 tokens mismatching on node2, one token per sstable + * 3. run preview repair + * 4. make sure that only the sstables containing the token are in the snapshot + */ + @Test + public void testSnapshotOfSStablesContainingMismatchingTokens() throws IOException + { + try (Cluster cluster = init(Cluster.build(2).withConfig(config -> + config.set("snapshot_on_repaired_data_mismatch", true) + .with(GOSSIP) + .with(NETWORK)).start())) + { + Set<Integer> tokensToMismatch = Sets.newHashSet(1, 50, 99); + cluster.schemaChange(withKeyspace("create table %s.tbl (id int primary key) with compaction = {'class' : 'SizeTieredCompactionStrategy', 'enabled':false }")); + // 1 token per sstable; + for (int i = 0; i < 100; i++) + { + cluster.coordinator(1).execute(withKeyspace("insert into %s.tbl (id) values (?)"), ConsistencyLevel.ALL, i); + cluster.stream().forEach(instance -> instance.flush(KEYSPACE)); + } + cluster.stream().forEach(instance -> instance.flush(KEYSPACE)); + for (int i = 1; i <= 2; i++) + markRepaired(cluster, i); + + cluster.get(1) + .nodetoolResult("repair", "-vd", "-pr", KEYSPACE, "tbl") + .asserts() + .success() + .stdoutContains("Repaired data is in sync"); + + Set<Token> mismatchingTokens = new HashSet<>(); + for (Integer token : tokensToMismatch) + { + cluster.get(2).executeInternal(withKeyspace("insert into %s.tbl (id) values (?)"), token); + cluster.get(2).flush(KEYSPACE); + Object[][] res = cluster.get(2).executeInternal(withKeyspace("select token(id) from %s.tbl where id = ?"), token); + mismatchingTokens.add(new Murmur3Partitioner.LongToken((long) res[0][0])); + } + + markRepaired(cluster, 2); + + cluster.get(1) + .nodetoolResult("repair", "-vd", KEYSPACE, "tbl") + .asserts() + .success() + .stdoutContains("Repaired data is inconsistent"); + + cluster.get(1).runOnInstance(checkSnapshot(mismatchingTokens, 3)); + // node2 got the duplicate mismatch-tokens above, so it should exist in exactly 6 sstables + cluster.get(2).runOnInstance(checkSnapshot(mismatchingTokens, 6)); + } + } + + private IIsolatedExecutor.SerializableRunnable checkSnapshot(Set<Token> mismatchingTokens, int expectedSnapshotSize) + { + return () -> { + ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl"); + + String snapshotTag = await().atMost(1, MINUTES) + .pollInterval(100, MILLISECONDS) + .until(() -> { + for (String tag : cfs.listSnapshots().keySet()) + { + // we create the snapshot schema file last, so when this exists we know the snapshot is complete; + if (cfs.getDirectories().getSnapshotSchemaFile(tag).exists()) + return tag; + } + + return ""; + }, not(emptyString())); + + Set<SSTableReader> inSnapshot = new HashSet<>(); + + try (Refs<SSTableReader> sstables = cfs.getSnapshotSSTableReaders(snapshotTag)) + { + inSnapshot.addAll(sstables); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + assertEquals(expectedSnapshotSize, inSnapshot.size()); + + for (SSTableReader sstable : cfs.getLiveSSTables()) + { + Bounds<Token> sstableBounds = new Bounds<>(sstable.first.getToken(), sstable.last.getToken()); + boolean shouldBeInSnapshot = false; + for (Token mismatchingToken : mismatchingTokens) + { + if (sstableBounds.contains(mismatchingToken)) + { + assertFalse(shouldBeInSnapshot); + shouldBeInSnapshot = true; + } + } + assertEquals(shouldBeInSnapshot, inSnapshot.contains(sstable)); + } + }; + } + + private void markRepaired(Cluster cluster, int instance) + { + cluster.get(instance).runOnInstance(() -> { + ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl"); + for (SSTableReader sstable : cfs.getLiveSSTables()) + { + try + { + sstable.descriptor.getMetadataSerializer().mutateRepairMetadata(sstable.descriptor, + System.currentTimeMillis(), + null, + false); + sstable.reloadSSTableMetadata(); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + } + }); + } +} diff --git a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java index 47d50a244b..4409b3cdef 100644 --- a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java +++ b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java @@ -98,7 +98,7 @@ public class LocalSyncTaskTest extends AbstractRepairTest NO_PENDING_REPAIR, true, true, PreviewKind.NONE); task.run(); - assertEquals(0, task.get().numberOfDifferences); + assertTrue(task.stat.differences.isEmpty()); } @Test @@ -145,7 +145,7 @@ public class LocalSyncTaskTest extends AbstractRepairTest } // ensure that the changed range was recorded - assertEquals("Wrong differing ranges", interesting.size(), task.stat.numberOfDifferences); + assertEquals("Wrong differing ranges", interesting.size(), task.stat.differences.size()); } @Test diff --git a/test/unit/org/apache/cassandra/repair/RepairJobTest.java b/test/unit/org/apache/cassandra/repair/RepairJobTest.java index c7c68c4689..03ac2edc60 100644 --- a/test/unit/org/apache/cassandra/repair/RepairJobTest.java +++ b/test/unit/org/apache/cassandra/repair/RepairJobTest.java @@ -305,8 +305,8 @@ public class RepairJobTest assertThat(results) .hasSize(2) - .extracting(s -> s.numberOfDifferences) - .containsOnly(1L); + .extracting(s -> s.differences.size()) + .containsOnly(1); assertThat(messages) .hasSize(2) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org