This is an automated email from the ASF dual-hosted git repository.

smiklosovic pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c747f70c05 Snapshot only sstables containing mismatching ranges on 
preview repair mismatch
c747f70c05 is described below

commit c747f70c058aa94d6bcfe1f9132c410db6d2b65a
Author: Marcus Eriksson <marc...@apache.org>
AuthorDate: Tue Apr 19 12:15:58 2022 +0200

    Snapshot only sstables containing mismatching ranges on preview repair 
mismatch
    
    patch by Marcus Eriksson, reviewed by Sam Tunnicliffe, Stefan Miklosovic 
for CASSANDRA-17561
    
    Co-authored-by: Blake Eggleston <beggles...@apple.com>
---
 CHANGES.txt                                        |   1 +
 src/java/org/apache/cassandra/net/ParamType.java   |  35 ++--
 .../apache/cassandra/repair/PreviewRepairTask.java |  17 +-
 src/java/org/apache/cassandra/repair/SyncStat.java |  15 +-
 src/java/org/apache/cassandra/repair/SyncTask.java |   2 +-
 .../repair/consistent/SyncStatSummary.java         |  56 ++++---
 .../cassandra/service/SnapshotVerbHandler.java     |  14 +-
 .../cassandra/utils/DiagnosticSnapshotService.java |  59 +++++--
 .../apache/cassandra/utils/RangesSerializer.java   |  73 +++++++++
 .../test/PreviewRepairSnapshotTest.java            | 176 +++++++++++++++++++++
 .../apache/cassandra/repair/LocalSyncTaskTest.java |   4 +-
 .../org/apache/cassandra/repair/RepairJobTest.java |   4 +-
 12 files changed, 394 insertions(+), 62 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index b5495844f5..c1d8d00bca 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.2
+ * Snapshot only sstables containing mismatching ranges on preview repair 
mismatch (CASSANDRA-17561)
  * More accurate skipping of sstables in read path (CASSANDRA-18134)
  * Prepare for JDK17 experimental support (CASSANDRA-18179, CASSANDRA-18258)
  * Remove Scripted UDFs internals; hooks to be added later in CASSANDRA-17281 
(CASSANDRA-18252)
diff --git a/src/java/org/apache/cassandra/net/ParamType.java 
b/src/java/org/apache/cassandra/net/ParamType.java
index 37f4bf8aed..6cb8fb4467 100644
--- a/src/java/org/apache/cassandra/net/ParamType.java
+++ b/src/java/org/apache/cassandra/net/ParamType.java
@@ -26,6 +26,7 @@ import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.Int32Serializer;
 import org.apache.cassandra.utils.Int64Serializer;
+import org.apache.cassandra.utils.RangesSerializer;
 import org.apache.cassandra.utils.TimeUUID;
 
 import static java.lang.Math.max;
@@ -42,30 +43,30 @@ import static 
org.apache.cassandra.locator.InetAddressAndPort.FwdFrmSerializer.f
  */
 public enum ParamType
 {
-    FORWARD_TO          (0, "FWD_TO",        ForwardingInfo.serializer),
-    RESPOND_TO          (1, "FWD_FRM",       fwdFrmSerializer),
+    FORWARD_TO                  (0,  "FWD_TO",          
ForwardingInfo.serializer),
+    RESPOND_TO                  (1,  "FWD_FRM",         fwdFrmSerializer),
 
     @Deprecated
-    FAILURE_RESPONSE    (2, "FAIL",          LegacyFlag.serializer),
+    FAILURE_RESPONSE            (2,  "FAIL",            LegacyFlag.serializer),
     @Deprecated
-    FAILURE_REASON      (3, "FAIL_REASON",   RequestFailureReason.serializer),
+    FAILURE_REASON              (3,  "FAIL_REASON",     
RequestFailureReason.serializer),
     @Deprecated
-    FAILURE_CALLBACK    (4, "CAL_BAC",       LegacyFlag.serializer),
+    FAILURE_CALLBACK            (4,  "CAL_BAC",         LegacyFlag.serializer),
 
-    TRACE_SESSION       (5, "TraceSession",  TimeUUID.Serializer.instance),
-    TRACE_TYPE          (6, "TraceType",     Tracing.traceTypeSerializer),
+    TRACE_SESSION               (5,  "TraceSession",    
TimeUUID.Serializer.instance),
+    TRACE_TYPE                  (6,  "TraceType",       
Tracing.traceTypeSerializer),
 
     @Deprecated
-    TRACK_REPAIRED_DATA (7, "TrackRepaired", LegacyFlag.serializer),
-
-    TOMBSTONE_FAIL(8, "TSF", Int32Serializer.serializer),
-    TOMBSTONE_WARNING(9, "TSW", Int32Serializer.serializer),
-    LOCAL_READ_SIZE_FAIL(10, "LRSF", Int64Serializer.serializer),
-    LOCAL_READ_SIZE_WARN(11, "LRSW", Int64Serializer.serializer),
-    ROW_INDEX_READ_SIZE_FAIL(12, "RIRSF", Int64Serializer.serializer),
-    ROW_INDEX_READ_SIZE_WARN(13, "RIRSW", Int64Serializer.serializer),
-
-    CUSTOM_MAP          (14, "CUSTOM",       
CustomParamsSerializer.serializer);
+    TRACK_REPAIRED_DATA         (7,  "TrackRepaired",   LegacyFlag.serializer),
+
+    TOMBSTONE_FAIL              (8,  "TSF",             
Int32Serializer.serializer),
+    TOMBSTONE_WARNING           (9,  "TSW",             
Int32Serializer.serializer),
+    LOCAL_READ_SIZE_FAIL        (10, "LRSF",            
Int64Serializer.serializer),
+    LOCAL_READ_SIZE_WARN        (11, "LRSW",            
Int64Serializer.serializer),
+    ROW_INDEX_READ_SIZE_FAIL    (12, "RIRSF",           
Int64Serializer.serializer),
+    ROW_INDEX_READ_SIZE_WARN    (13, "RIRSW",           
Int64Serializer.serializer),
+    CUSTOM_MAP                  (14, "CUSTOM",          
CustomParamsSerializer.serializer),
+    SNAPSHOT_RANGES             (15, "SNAPSHOT_RANGES", 
RangesSerializer.serializer);
 
     final int id;
     @Deprecated final String legacyAlias; // pre-4.0 we used to serialize 
entire param name string
diff --git a/src/java/org/apache/cassandra/repair/PreviewRepairTask.java 
b/src/java/org/apache/cassandra/repair/PreviewRepairTask.java
index 7ce7d1fbba..728a813a2a 100644
--- a/src/java/org/apache/cassandra/repair/PreviewRepairTask.java
+++ b/src/java/org/apache/cassandra/repair/PreviewRepairTask.java
@@ -27,6 +27,8 @@ import com.google.common.base.Preconditions;
 import org.apache.cassandra.concurrent.ExecutorPlus;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.metrics.RepairMetrics;
 import org.apache.cassandra.repair.consistent.SyncStatSummary;
@@ -104,14 +106,18 @@ public class PreviewRepairTask extends AbstractRepairTask
         {
             Set<String> mismatchingTables = new HashSet<>();
             Set<InetAddressAndPort> nodes = new HashSet<>();
+            Set<Range<Token>> ranges = new HashSet<>();
             for (RepairSessionResult sessionResult : results)
             {
                 for (RepairResult repairResult : 
emptyIfNull(sessionResult.repairJobResults))
                 {
                     for (SyncStat stat : emptyIfNull(repairResult.stats))
                     {
-                        if (stat.numberOfDifferences > 0)
+                        if (!stat.differences.isEmpty())
+                        {
                             
mismatchingTables.add(repairResult.desc.columnFamily);
+                            ranges.addAll(stat.differences);
+                        }
                         // snapshot all replicas, even if they don't have any 
differences
                         nodes.add(stat.nodes.coordinator);
                         nodes.add(stat.nodes.peer);
@@ -125,10 +131,13 @@ public class PreviewRepairTask extends AbstractRepairTask
                 // we can just check snapshot existence locally since the 
repair coordinator is always a replica (unlike in the read case)
                 if 
(!Keyspace.open(keyspace).getColumnFamilyStore(table).snapshotExists(snapshotName))
                 {
-                    logger.info("{} Snapshotting {}.{} for preview repair 
mismatch with tag {} on instances {}",
+                    List<Range<Token>> normalizedRanges = 
Range.normalize(ranges);
+                    logger.info("{} Snapshotting {}.{} for preview repair 
mismatch for ranges {} with tag {} on instances {}",
                                 
options.getPreviewKind().logPrefix(parentSession),
-                                keyspace, table, snapshotName, nodes);
-                    
DiagnosticSnapshotService.repairedDataMismatch(Keyspace.open(keyspace).getColumnFamilyStore(table).metadata(),
 nodes);
+                                keyspace, table, normalizedRanges, 
snapshotName, nodes);
+                    
DiagnosticSnapshotService.repairedDataMismatch(Keyspace.open(keyspace).getColumnFamilyStore(table).metadata(),
+                                                                   nodes,
+                                                                   
normalizedRanges);
                 }
                 else
                 {
diff --git a/src/java/org/apache/cassandra/repair/SyncStat.java 
b/src/java/org/apache/cassandra/repair/SyncStat.java
index 7bb503fa15..2241915ab4 100644
--- a/src/java/org/apache/cassandra/repair/SyncStat.java
+++ b/src/java/org/apache/cassandra/repair/SyncStat.java
@@ -17,8 +17,11 @@
  */
 package org.apache.cassandra.repair;
 
+import java.util.Collection;
 import java.util.List;
 
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.streaming.SessionSummary;
 
 /**
@@ -27,23 +30,23 @@ import org.apache.cassandra.streaming.SessionSummary;
 public class SyncStat
 {
     public final SyncNodePair nodes;
-    public final long numberOfDifferences; // TODO: revert to Range<Token>
+    public final Collection<Range<Token>> differences;
     public final List<SessionSummary> summaries;
 
-    public SyncStat(SyncNodePair nodes, long numberOfDifferences)
+    public SyncStat(SyncNodePair nodes, Collection<Range<Token>> differences)
     {
-        this(nodes, numberOfDifferences, null);
+        this(nodes, differences, null);
     }
 
-    public SyncStat(SyncNodePair nodes, long numberOfDifferences, 
List<SessionSummary> summaries)
+    public SyncStat(SyncNodePair nodes,  Collection<Range<Token>> differences, 
List<SessionSummary> summaries)
     {
         this.nodes = nodes;
-        this.numberOfDifferences = numberOfDifferences;
         this.summaries = summaries;
+        this.differences = differences;
     }
 
     public SyncStat withSummaries(List<SessionSummary> summaries)
     {
-        return new SyncStat(nodes, numberOfDifferences, summaries);
+        return new SyncStat(nodes, differences, summaries);
     }
 }
diff --git a/src/java/org/apache/cassandra/repair/SyncTask.java 
b/src/java/org/apache/cassandra/repair/SyncTask.java
index b325eb42ad..7393effd2d 100644
--- a/src/java/org/apache/cassandra/repair/SyncTask.java
+++ b/src/java/org/apache/cassandra/repair/SyncTask.java
@@ -60,7 +60,7 @@ public abstract class SyncTask extends AsyncFuture<SyncStat> 
implements Runnable
         this.rangesToSync = rangesToSync;
         this.nodePair = new SyncNodePair(primaryEndpoint, peer);
         this.previewKind = previewKind;
-        this.stat = new SyncStat(nodePair, rangesToSync.size());
+        this.stat = new SyncStat(nodePair, rangesToSync);
     }
 
     protected abstract void startSync();
diff --git 
a/src/java/org/apache/cassandra/repair/consistent/SyncStatSummary.java 
b/src/java/org/apache/cassandra/repair/consistent/SyncStatSummary.java
index 3d217024c7..855ad4bad3 100644
--- a/src/java/org/apache/cassandra/repair/consistent/SyncStatSummary.java
+++ b/src/java/org/apache/cassandra/repair/consistent/SyncStatSummary.java
@@ -21,13 +21,18 @@ package org.apache.cassandra.repair.consistent;
 import java.net.InetSocketAddress;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.Set;
 
 import com.google.common.collect.Lists;
 
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.repair.RepairResult;
 import org.apache.cassandra.repair.RepairSessionResult;
 import org.apache.cassandra.repair.SyncStat;
@@ -50,7 +55,7 @@ public class SyncStatSummary
 
         int files = 0;
         long bytes = 0;
-        long ranges = 0;
+        Set<Range<Token>> ranges = new HashSet<>();
 
         Session(InetSocketAddress src, InetSocketAddress dst)
         {
@@ -64,15 +69,15 @@ public class SyncStatSummary
             bytes += summary.totalSize;
         }
 
-        void consumeSummaries(Collection<StreamSummary> summaries, long 
numRanges)
+        void consumeSummaries(Collection<StreamSummary> summaries, 
Collection<Range<Token>> ranges)
         {
             summaries.forEach(this::consumeSummary);
-            ranges += numRanges;
+            this.ranges.addAll(ranges);
         }
 
         public String toString()
         {
-            return String.format("%s -> %s: %s ranges, %s sstables, %s bytes", 
src, dst, ranges, files, FBUtilities.prettyPrintMemory(bytes));
+            return String.format("%s -> %s: %s ranges, %s sstables, %s bytes", 
src, dst, ranges.size(), files, FBUtilities.prettyPrintMemory(bytes));
         }
     }
 
@@ -84,7 +89,7 @@ public class SyncStatSummary
 
         int files = -1;
         long bytes = -1;
-        int ranges = -1;
+        Collection<Range<Token>> ranges = new HashSet<>();
         boolean totalsCalculated = false;
 
         final Map<Pair<InetSocketAddress, InetSocketAddress>, Session> 
sessions = new HashMap<>();
@@ -109,8 +114,8 @@ public class SyncStatSummary
         {
             for (SessionSummary summary: stat.summaries)
             {
-                getOrCreate(summary.coordinator, 
summary.peer).consumeSummaries(summary.sendingSummaries, 
stat.numberOfDifferences);
-                getOrCreate(summary.peer, 
summary.coordinator).consumeSummaries(summary.receivingSummaries, 
stat.numberOfDifferences);
+                getOrCreate(summary.coordinator, 
summary.peer).consumeSummaries(summary.sendingSummaries, stat.differences);
+                getOrCreate(summary.peer, 
summary.coordinator).consumeSummaries(summary.receivingSummaries, 
stat.differences);
             }
         }
 
@@ -123,12 +128,12 @@ public class SyncStatSummary
         {
             files = 0;
             bytes = 0;
-            ranges = 0;
+            ranges = new HashSet<>();
             for (Session session: sessions.values())
             {
                 files += session.files;
                 bytes += session.bytes;
-                ranges += session.ranges;
+                ranges.addAll(session.ranges);
             }
             totalsCalculated = true;
         }
@@ -147,21 +152,36 @@ public class SyncStatSummary
             }
             StringBuilder output = new StringBuilder();
 
-            output.append(String.format("%s.%s - %s ranges, %s sstables, %s 
bytes\n", keyspace, table, ranges, files, 
FBUtilities.prettyPrintMemory(bytes)));
+            output.append(String.format("%s.%s - %s ranges, %s sstables, %s 
bytes\n", keyspace, table, ranges.size(), files, 
FBUtilities.prettyPrintMemory(bytes)));
+            if (ranges.size() > 0)
+            {
+                output.append("    Mismatching ranges: ");
+                int i = 0;
+                Iterator<Range<Token>> rangeIterator = ranges.iterator();
+                while (rangeIterator.hasNext() && i < 30)
+                {
+                    Range<Token> r = rangeIterator.next();
+                    
output.append('(').append(r.left).append(',').append(r.right).append("],");
+                    i++;
+                }
+                if (i == 30)
+                    output.append("...");
+                output.append(System.lineSeparator());
+            }
             for (Session session: sessions.values())
             {
-                output.append("    ").append(session.toString()).append('\n');
+                output.append("    
").append(session.toString()).append(System.lineSeparator());
             }
             return output.toString();
         }
     }
 
-    private Map<Pair<String, String>, Table> summaries = new HashMap<>();
+    private final Map<Pair<String, String>, Table> summaries = new HashMap<>();
     private final boolean isEstimate;
 
     private int files = -1;
     private long bytes = -1;
-    private int ranges = -1;
+    private Set<Range<Token>> ranges = new HashSet<>();
     private boolean totalsCalculated = false;
 
     public SyncStatSummary(boolean isEstimate)
@@ -190,14 +210,14 @@ public class SyncStatSummary
     public boolean isEmpty()
     {
         calculateTotals();
-        return files == 0 && bytes == 0 && ranges == 0;
+        return files == 0 && bytes == 0 && ranges.isEmpty();
     }
 
     private void calculateTotals()
     {
         files = 0;
         bytes = 0;
-        ranges = 0;
+        ranges = new HashSet<>();
         summaries.values().forEach(Table::calculateTotals);
         for (Table table: summaries.values())
         {
@@ -208,7 +228,7 @@ public class SyncStatSummary
             table.calculateTotals();
             files += table.files;
             bytes += table.bytes;
-            ranges += table.ranges;
+            ranges.addAll(table.ranges);
         }
         totalsCalculated = true;
     }
@@ -228,11 +248,11 @@ public class SyncStatSummary
 
         if (isEstimate)
         {
-            output.append(String.format("Total estimated streaming: %s ranges, 
%s sstables, %s bytes\n", ranges, files, FBUtilities.prettyPrintMemory(bytes)));
+            output.append(String.format("Total estimated streaming: %s ranges, 
%s sstables, %s bytes\n", ranges.size(), files, 
FBUtilities.prettyPrintMemory(bytes)));
         }
         else
         {
-            output.append(String.format("Total streaming: %s ranges, %s 
sstables, %s bytes\n", ranges, files, FBUtilities.prettyPrintMemory(bytes)));
+            output.append(String.format("Total streaming: %s ranges, %s 
sstables, %s bytes\n", ranges.size(), files, 
FBUtilities.prettyPrintMemory(bytes)));
         }
 
         for (Pair<String, String> tableName: tables)
diff --git a/src/java/org/apache/cassandra/service/SnapshotVerbHandler.java 
b/src/java/org/apache/cassandra/service/SnapshotVerbHandler.java
index ecf16fed92..99b5105406 100644
--- a/src/java/org/apache/cassandra/service/SnapshotVerbHandler.java
+++ b/src/java/org/apache/cassandra/service/SnapshotVerbHandler.java
@@ -17,16 +17,23 @@
  */
 package org.apache.cassandra.service;
 
+import java.util.Collections;
+import java.util.List;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.db.SnapshotCommand;
 import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.SnapshotCommand;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.DiagnosticSnapshotService;
 
+import static org.apache.cassandra.net.ParamType.SNAPSHOT_RANGES;
+
 public class SnapshotVerbHandler implements IVerbHandler<SnapshotCommand>
 {
     public static final SnapshotVerbHandler instance = new 
SnapshotVerbHandler();
@@ -41,7 +48,10 @@ public class SnapshotVerbHandler implements 
IVerbHandler<SnapshotCommand>
         }
         else if 
(DiagnosticSnapshotService.isDiagnosticSnapshotRequest(command))
         {
-            DiagnosticSnapshotService.snapshot(command, message.from());
+            List<Range<Token>> ranges = Collections.emptyList();
+            if (message.header.params().containsKey(SNAPSHOT_RANGES))
+                ranges = (List<Range<Token>>) 
message.header.params().get(SNAPSHOT_RANGES);
+            DiagnosticSnapshotService.snapshot(command, ranges, 
message.from());
         }
         else
         {
diff --git a/src/java/org/apache/cassandra/utils/DiagnosticSnapshotService.java 
b/src/java/org/apache/cassandra/utils/DiagnosticSnapshotService.java
index ab2d67ec9f..29a9e5c2ba 100644
--- a/src/java/org/apache/cassandra/utils/DiagnosticSnapshotService.java
+++ b/src/java/org/apache/cassandra/utils/DiagnosticSnapshotService.java
@@ -20,6 +20,8 @@ package org.apache.cassandra.utils;
 
 import java.time.LocalDate;
 import java.time.format.DateTimeFormatter;
+import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -29,6 +31,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.dht.Bounds;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
@@ -38,6 +43,7 @@ import org.apache.cassandra.schema.TableMetadata;
 
 import static org.apache.cassandra.utils.Clock.Global.nanoTime;
 import static 
org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
+import static org.apache.cassandra.net.ParamType.SNAPSHOT_RANGES;
 
 /**
  * Provides a means to take snapshots when triggered by anomalous events or 
when the breaking of invariants is
@@ -68,6 +74,7 @@ public class DiagnosticSnapshotService
 
     public static final String REPAIRED_DATA_MISMATCH_SNAPSHOT_PREFIX = 
"RepairedDataMismatch-";
     public static final String DUPLICATE_ROWS_DETECTED_SNAPSHOT_PREFIX = 
"DuplicateRows-";
+    private static final int MAX_SNAPSHOT_RANGE_COUNT = 100; // otherwise, 
snapshot everything
 
     private final Executor executor;
 
@@ -84,14 +91,19 @@ public class DiagnosticSnapshotService
     private static final DateTimeFormatter DATE_FORMAT = 
DateTimeFormatter.BASIC_ISO_DATE;
     private final ConcurrentHashMap<TableId, AtomicLong> lastSnapshotTimes = 
new ConcurrentHashMap<>();
 
-    public static void duplicateRows(TableMetadata metadata, 
Iterable<InetAddressAndPort> replicas)
+    public static void repairedDataMismatch(TableMetadata metadata, 
Iterable<InetAddressAndPort> replicas)
     {
-        instance.maybeTriggerSnapshot(metadata, 
DUPLICATE_ROWS_DETECTED_SNAPSHOT_PREFIX, replicas);
+        repairedDataMismatch(metadata, replicas, Collections.emptyList());
     }
 
-    public static void repairedDataMismatch(TableMetadata metadata, 
Iterable<InetAddressAndPort> replicas)
+    public static void repairedDataMismatch(TableMetadata metadata, 
Iterable<InetAddressAndPort> replicas, List<Range<Token>> ranges)
+    {
+        instance.maybeTriggerSnapshot(metadata, 
REPAIRED_DATA_MISMATCH_SNAPSHOT_PREFIX, replicas, ranges);
+    }
+
+    public static void duplicateRows(TableMetadata metadata, 
Iterable<InetAddressAndPort> replicas)
     {
-        instance.maybeTriggerSnapshot(metadata, 
REPAIRED_DATA_MISMATCH_SNAPSHOT_PREFIX, replicas);
+        instance.maybeTriggerSnapshot(metadata, 
DUPLICATE_ROWS_DETECTED_SNAPSHOT_PREFIX, replicas);
     }
 
     public static boolean isDiagnosticSnapshotRequest(SnapshotCommand command)
@@ -100,10 +112,10 @@ public class DiagnosticSnapshotService
             || 
command.snapshot_name.startsWith(DUPLICATE_ROWS_DETECTED_SNAPSHOT_PREFIX);
     }
 
-    public static void snapshot(SnapshotCommand command, InetAddressAndPort 
initiator)
+    public static void snapshot(SnapshotCommand command, List<Range<Token>> 
ranges, InetAddressAndPort initiator)
     {
         Preconditions.checkArgument(isDiagnosticSnapshotRequest(command));
-        instance.maybeSnapshot(command, initiator);
+        instance.maybeSnapshot(command, ranges, initiator);
     }
 
     public static String getSnapshotName(String prefix)
@@ -118,6 +130,11 @@ public class DiagnosticSnapshotService
     }
 
     private void maybeTriggerSnapshot(TableMetadata metadata, String prefix, 
Iterable<InetAddressAndPort> endpoints)
+    {
+        maybeTriggerSnapshot(metadata, prefix, endpoints, 
Collections.emptyList());
+    }
+
+    private void maybeTriggerSnapshot(TableMetadata metadata, String prefix, 
Iterable<InetAddressAndPort> endpoints, List<Range<Token>> ranges)
     {
         long now = nanoTime();
         AtomicLong cached = lastSnapshotTimes.computeIfAbsent(metadata.id, u 
-> new AtomicLong(0));
@@ -130,6 +147,9 @@ public class DiagnosticSnapshotService
                                                                            
metadata.name,
                                                                            
getSnapshotName(prefix),
                                                                            
false));
+
+            if (!ranges.isEmpty() && ranges.size() < MAX_SNAPSHOT_RANGE_COUNT)
+                msg = msg.withParam(SNAPSHOT_RANGES, ranges);
             for (InetAddressAndPort replica : endpoints)
                 MessagingService.instance().send(msg, replica);
         }
@@ -139,19 +159,21 @@ public class DiagnosticSnapshotService
         }
     }
 
-    private void maybeSnapshot(SnapshotCommand command, InetAddressAndPort 
initiator)
+    private void maybeSnapshot(SnapshotCommand command, List<Range<Token>> 
ranges, InetAddressAndPort initiator)
     {
-        executor.execute(new DiagnosticSnapshotTask(command, initiator));
+        executor.execute(new DiagnosticSnapshotTask(command, ranges, 
initiator));
     }
 
     private static class DiagnosticSnapshotTask implements Runnable
     {
         final SnapshotCommand command;
         final InetAddressAndPort from;
+        final List<Range<Token>> ranges;
 
-        DiagnosticSnapshotTask(SnapshotCommand command, InetAddressAndPort 
from)
+        DiagnosticSnapshotTask(SnapshotCommand command, List<Range<Token>> 
ranges, InetAddressAndPort from)
         {
             this.command = command;
+            this.ranges = ranges;
             this.from = from;
         }
 
@@ -185,7 +207,17 @@ public class DiagnosticSnapshotService
                             command.keyspace,
                             command.column_family,
                             command.snapshot_name);
-                cfs.snapshot(command.snapshot_name);
+
+                if (ranges.isEmpty())
+                    cfs.snapshot(command.snapshot_name);
+                else
+                {
+                    cfs.snapshot(command.snapshot_name,
+                                 (sstable) -> checkIntersection(ranges,
+                                                                
sstable.first.getToken(),
+                                                                
sstable.last.getToken()),
+                                 false, false);
+                }
             }
             catch (IllegalArgumentException e)
             {
@@ -196,4 +228,11 @@ public class DiagnosticSnapshotService
             }
         }
     }
+
+    private static boolean checkIntersection(List<Range<Token>> 
normalizedRanges, Token first, Token last)
+    {
+        Bounds<Token> bounds = new Bounds<>(first, last);
+        return normalizedRanges.stream().anyMatch(range -> 
range.intersects(bounds));
+    }
+
 }
diff --git a/src/java/org/apache/cassandra/utils/RangesSerializer.java 
b/src/java/org/apache/cassandra/utils/RangesSerializer.java
new file mode 100644
index 0000000000..5707503f6b
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/RangesSerializer.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+
+public class RangesSerializer implements 
IVersionedSerializer<Collection<Range<Token>>>
+{
+    public static final RangesSerializer serializer = new RangesSerializer();
+
+    @Override
+    public void serialize(Collection<Range<Token>> ranges, DataOutputPlus out, 
int version) throws IOException
+    {
+        out.writeInt(ranges.size());
+        for (Range<Token> r : ranges)
+        {
+            Token.serializer.serialize(r.left, out, version);
+            Token.serializer.serialize(r.right, out, version);
+        }
+    }
+
+    @Override
+    public Collection<Range<Token>> deserialize(DataInputPlus in, int version) 
throws IOException
+    {
+        int count = in.readInt();
+        List<Range<Token>> ranges = new ArrayList<>(count);
+        IPartitioner partitioner = DatabaseDescriptor.getPartitioner();
+        for (int i = 0; i < count; i++)
+        {
+            Token start = Token.serializer.deserialize(in, partitioner, 
version);
+            Token end = Token.serializer.deserialize(in, partitioner, version);
+            ranges.add(new Range<>(start, end));
+        }
+        return ranges;
+    }
+
+    @Override
+    public long serializedSize(Collection<Range<Token>> ranges, int version)
+    {
+        int size = TypeSizes.sizeof(ranges.size());
+        if (ranges.size() > 0)
+            size += ranges.size() * 2 * 
Token.serializer.serializedSize(ranges.iterator().next().left, version);
+        return size;
+    }
+}
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairSnapshotTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairSnapshotTest.java
new file mode 100644
index 0000000000..e3679e9af7
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairSnapshotTest.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import com.google.common.collect.Sets;
+import org.junit.Test;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.dht.Bounds;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.IIsolatedExecutor;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.utils.concurrent.Refs;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.Matchers.emptyString;
+import static org.hamcrest.Matchers.not;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
public class PreviewRepairSnapshotTest extends TestBaseImpl
{
    /**
     * Makes sure we only snapshot sstables containing the mismatching token
     * <p>
     * 1. create 100 sstables per instance, compaction disabled, one token per sstable
     * 2. make 3 tokens mismatching on node2, one token per sstable
     * 3. run preview repair
     * 4. make sure that only the sstables containing the token are in the snapshot
     */
    @Test
    public void testSnapshotOfSStablesContainingMismatchingTokens() throws IOException
    {
        try (Cluster cluster = init(Cluster.build(2).withConfig(config ->
                                                                config.set("snapshot_on_repaired_data_mismatch", true)
                                                                      .with(GOSSIP)
                                                                      .with(NETWORK)).start()))
        {
            Set<Integer> tokensToMismatch = Sets.newHashSet(1, 50, 99);
            // Compaction disabled so the one-row-per-sstable layout below survives the test.
            cluster.schemaChange(withKeyspace("create table %s.tbl (id int primary key) with compaction = {'class' : 'SizeTieredCompactionStrategy', 'enabled':false }"));
            // 1 token per sstable;
            for (int i = 0; i < 100; i++)
            {
                cluster.coordinator(1).execute(withKeyspace("insert into %s.tbl (id) values (?)"), ConsistencyLevel.ALL, i);
                cluster.stream().forEach(instance -> instance.flush(KEYSPACE));
            }
            cluster.stream().forEach(instance -> instance.flush(KEYSPACE));
            // Mark everything repaired so the preview repair below compares repaired data.
            for (int i = 1; i <= 2; i++)
                markRepaired(cluster, i);

            // Baseline: repaired data should be in sync before we introduce a mismatch.
            cluster.get(1)
                   .nodetoolResult("repair", "-vd", "-pr", KEYSPACE, "tbl")
                   .asserts()
                   .success()
                   .stdoutContains("Repaired data is in sync");

            // Re-insert the chosen ids locally on node2 only, recording the token of each
            // so we can later check which sstables should land in the snapshot.
            Set<Token> mismatchingTokens = new HashSet<>();
            for (Integer token : tokensToMismatch)
            {
                cluster.get(2).executeInternal(withKeyspace("insert into %s.tbl (id) values (?)"), token);
                cluster.get(2).flush(KEYSPACE);
                Object[][] res = cluster.get(2).executeInternal(withKeyspace("select token(id) from %s.tbl where id = ?"), token);
                mismatchingTokens.add(new Murmur3Partitioner.LongToken((long) res[0][0]));
            }

            markRepaired(cluster, 2);

            // This repair should now detect the mismatch and trigger a diagnostic snapshot.
            cluster.get(1)
                   .nodetoolResult("repair", "-vd", KEYSPACE, "tbl")
                   .asserts()
                   .success()
                   .stdoutContains("Repaired data is inconsistent");

            cluster.get(1).runOnInstance(checkSnapshot(mismatchingTokens, 3));
            // node2 got the duplicate mismatch-tokens above, so it should exist in exactly 6 sstables
            cluster.get(2).runOnInstance(checkSnapshot(mismatchingTokens, 6));
        }
    }

    /**
     * Returns a runnable (executed on an instance) asserting that the diagnostic snapshot
     * contains exactly {@code expectedSnapshotSize} sstables, and that a live sstable is in
     * the snapshot if and only if its token bounds contain one of {@code mismatchingTokens}.
     */
    private IIsolatedExecutor.SerializableRunnable checkSnapshot(Set<Token> mismatchingTokens, int expectedSnapshotSize)
    {
        return () -> {
            ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl");

            // The snapshot is taken asynchronously after the repair, so poll for it.
            String snapshotTag = await().atMost(1, MINUTES)
                                        .pollInterval(100, MILLISECONDS)
                                        .until(() -> {
                                            for (String tag : cfs.listSnapshots().keySet())
                                            {
                                                // we create the snapshot schema file last, so when this exists we know the snapshot is complete;
                                                if (cfs.getDirectories().getSnapshotSchemaFile(tag).exists())
                                                    return tag;
                                            }

                                            return "";
                                        }, not(emptyString()));

            Set<SSTableReader> inSnapshot = new HashSet<>();

            try (Refs<SSTableReader> sstables = cfs.getSnapshotSSTableReaders(snapshotTag))
            {
                inSnapshot.addAll(sstables);
            }
            catch (IOException e)
            {
                throw new RuntimeException(e);
            }
            assertEquals(expectedSnapshotSize, inSnapshot.size());

            for (SSTableReader sstable : cfs.getLiveSSTables())
            {
                Bounds<Token> sstableBounds = new Bounds<>(sstable.first.getToken(), sstable.last.getToken());
                boolean shouldBeInSnapshot = false;
                for (Token mismatchingToken : mismatchingTokens)
                {
                    if (sstableBounds.contains(mismatchingToken))
                    {
                        // Each sstable holds a single token, so it can match at most one
                        // mismatching token; a second match would mean the fixture is broken.
                        assertFalse(shouldBeInSnapshot);
                        shouldBeInSnapshot = true;
                    }
                }
                assertEquals(shouldBeInSnapshot, inSnapshot.contains(sstable));
            }
        };
    }

    /**
     * Rewrites the repair metadata of every live sstable on the given instance so it is
     * considered repaired now (no pending repair session, not transient).
     */
    private void markRepaired(Cluster cluster, int instance)
    {
        cluster.get(instance).runOnInstance(() -> {
            ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl");
            for (SSTableReader sstable : cfs.getLiveSSTables())
            {
                try
                {
                    sstable.descriptor.getMetadataSerializer().mutateRepairMetadata(sstable.descriptor,
                                                                                    System.currentTimeMillis(),
                                                                                    null,
                                                                                    false);
                    sstable.reloadSSTableMetadata();
                }
                catch (IOException e)
                {
                    throw new RuntimeException(e);
                }
            }
        });
    }
}
diff --git a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java 
b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
index 47d50a244b..4409b3cdef 100644
--- a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
+++ b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
@@ -98,7 +98,7 @@ public class LocalSyncTaskTest extends AbstractRepairTest
                                                NO_PENDING_REPAIR, true, true, 
PreviewKind.NONE);
         task.run();
 
-        assertEquals(0, task.get().numberOfDifferences);
+        assertTrue(task.stat.differences.isEmpty());
     }
 
     @Test
@@ -145,7 +145,7 @@ public class LocalSyncTaskTest extends AbstractRepairTest
         }
 
         // ensure that the changed range was recorded
-        assertEquals("Wrong differing ranges", interesting.size(), 
task.stat.numberOfDifferences);
+        assertEquals("Wrong differing ranges", interesting.size(), 
task.stat.differences.size());
     }
 
     @Test
diff --git a/test/unit/org/apache/cassandra/repair/RepairJobTest.java 
b/test/unit/org/apache/cassandra/repair/RepairJobTest.java
index c7c68c4689..03ac2edc60 100644
--- a/test/unit/org/apache/cassandra/repair/RepairJobTest.java
+++ b/test/unit/org/apache/cassandra/repair/RepairJobTest.java
@@ -305,8 +305,8 @@ public class RepairJobTest
 
         assertThat(results)
             .hasSize(2)
-            .extracting(s -> s.numberOfDifferences)
-            .containsOnly(1L);
+            .extracting(s -> s.differences.size())
+            .containsOnly(1);
 
         assertThat(messages)
             .hasSize(2)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org


Reply via email to