This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 219d209  Include finalized pending sstables in preview repair
219d209 is described below

commit 219d209651759cf702519a100c4f43595f7be8d7
Author: Marcus Eriksson <marc...@apache.org>
AuthorDate: Wed Feb 5 12:51:47 2020 +0100

    Include finalized pending sstables in preview repair
    
    Patch by marcuse; reviewed by Blake Eggleston and David Capwell for 
CASSANDRA-15553
---
 CHANGES.txt                                        |   1 +
 .../db/compaction/PendingRepairManager.java        |   2 +-
 .../db/repair/CassandraValidationIterator.java     |  24 +-
 .../db/streaming/CassandraStreamManager.java       |  17 +-
 .../org/apache/cassandra/repair/RepairSession.java |  43 +++-
 .../cassandra/repair/consistent/LocalSessions.java |  23 +-
 .../cassandra/service/ActiveRepairService.java     |   5 +-
 .../apache/cassandra/streaming/PreviewKind.java    |  49 +++-
 .../cassandra/distributed/impl/Instance.java       |   2 +
 .../distributed/test/PreviewRepairTest.java        | 281 +++++++++++++++++++++
 10 files changed, 396 insertions(+), 51 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 19906d3..9cd6040 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0-alpha4
+ * Include finalized pending sstables in preview repair (CASSANDRA-15553)
  * Reverted to the original behavior of CLUSTERING ORDER on CREATE TABLE 
(CASSANDRA-15271)
  * Correct inaccurate logging message (CASSANDRA-15549)
  * Add documentation of dynamo (CASSANDRA-15486)
diff --git 
a/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java 
b/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java
index b2d70f7..78d4483 100644
--- a/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java
@@ -449,7 +449,7 @@ class PendingRepairManager
                 }
                 else
                 {
-                    logger.debug("Setting repairedAt to {} on {} for {}", 
repairedAt, transaction.originals(), sessionID);
+                    logger.info("Moving {} from pending to repaired with 
repaired at = {} and session id = {}", transaction.originals(), repairedAt, 
sessionID);
                     
cfs.getCompactionStrategyManager().mutateRepaired(transaction.originals(), 
repairedAt, ActiveRepairService.NO_PENDING_REPAIR, false);
                 }
                 completed = true;
diff --git 
a/src/java/org/apache/cassandra/db/repair/CassandraValidationIterator.java 
b/src/java/org/apache/cassandra/db/repair/CassandraValidationIterator.java
index d653f6c..4eea678 100644
--- a/src/java/org/apache/cassandra/db/repair/CassandraValidationIterator.java
+++ b/src/java/org/apache/cassandra/db/repair/CassandraValidationIterator.java
@@ -30,7 +30,6 @@ import java.util.function.LongPredicate;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
 import com.google.common.collect.Maps;
 
 import org.slf4j.Logger;
@@ -39,10 +38,8 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
-import org.apache.cassandra.db.compaction.ActiveCompactions;
 import org.apache.cassandra.db.compaction.ActiveCompactionsTracker;
 import org.apache.cassandra.db.compaction.CompactionController;
-import org.apache.cassandra.db.compaction.CompactionInterruptedException;
 import org.apache.cassandra.db.compaction.CompactionIterator;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.compaction.OperationType;
@@ -54,14 +51,10 @@ import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.sstable.ISSTableScanner;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.metrics.CompactionMetrics;
 import org.apache.cassandra.repair.ValidationPartitionIterator;
-import org.apache.cassandra.repair.Validator;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.streaming.PreviewKind;
-import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.UUIDGen;
 import org.apache.cassandra.utils.concurrent.Refs;
 
@@ -115,21 +108,6 @@ public class CassandraValidationIterator extends 
ValidationPartitionIterator
         }
     }
 
-    private static Predicate<SSTableReader> getPreviewPredicate(PreviewKind 
previewKind)
-    {
-        switch (previewKind)
-        {
-            case ALL:
-                return (s) -> true;
-            case REPAIRED:
-                return (s) -> s.isRepaired();
-            case UNREPAIRED:
-                return (s) -> !s.isRepaired();
-            default:
-                throw new RuntimeException("Can't get preview predicate for 
preview kind " + previewKind);
-        }
-    }
-
     @VisibleForTesting
     static synchronized Refs<SSTableReader> 
getSSTablesToValidate(ColumnFamilyStore cfs, Collection<Range<Token>> ranges, 
UUID parentId, boolean isIncremental)
     {
@@ -147,7 +125,7 @@ public class CassandraValidationIterator extends 
ValidationPartitionIterator
         com.google.common.base.Predicate<SSTableReader> predicate;
         if (prs.isPreview())
         {
-            predicate = getPreviewPredicate(prs.previewKind);
+            predicate = prs.previewKind.predicate();
 
         }
         else if (isIncremental)
diff --git 
a/src/java/org/apache/cassandra/db/streaming/CassandraStreamManager.java 
b/src/java/org/apache/cassandra/db/streaming/CassandraStreamManager.java
index b88a5d6..a84fd27 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraStreamManager.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamManager.java
@@ -81,21 +81,6 @@ public class CassandraStreamManager implements 
TableStreamManager
         return new CassandraStreamReceiver(cfs, session, totalStreams);
     }
 
-    private static Predicate<SSTableReader> getPreviewPredicate(PreviewKind 
kind)
-    {
-        switch (kind)
-        {
-            case ALL:
-                return Predicates.alwaysTrue();
-            case UNREPAIRED:
-                return Predicates.not(SSTableReader::isRepaired);
-            case REPAIRED:
-                return SSTableReader::isRepaired;
-            default:
-                throw new IllegalArgumentException("Unsupported kind: " + 
kind);
-        }
-    }
-
     @Override
     public Collection<OutgoingStream> createOutgoingStreams(StreamSession 
session, RangesAtEndpoint replicas, UUID pendingRepair, PreviewKind previewKind)
     {
@@ -111,7 +96,7 @@ public class CassandraStreamManager implements 
TableStreamManager
                 Predicate<SSTableReader> predicate;
                 if (previewKind.isPreview())
                 {
-                    predicate = getPreviewPredicate(previewKind);
+                    predicate = previewKind.predicate();
                 }
                 else if (pendingRepair == 
ActiveRepairService.NO_PENDING_REPAIR)
                 {
diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java 
b/src/java/org/apache/cassandra/repair/RepairSession.java
index 3483e59..95a6e57 100644
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@ -31,10 +31,16 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.gms.*;
 import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.repair.consistent.ConsistentSession;
+import org.apache.cassandra.repair.consistent.LocalSession;
+import org.apache.cassandra.repair.consistent.LocalSessions;
+import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.streaming.PreviewKind;
 import org.apache.cassandra.streaming.SessionSummary;
 import org.apache.cassandra.tracing.Tracing;
@@ -78,7 +84,8 @@ import org.apache.cassandra.utils.Pair;
  * all of them in parallel otherwise.
  */
 public class RepairSession extends AbstractFuture<RepairSessionResult> 
implements IEndpointStateChangeSubscriber,
-                                                                               
  IFailureDetectionEventListener
+                                                                               
   IFailureDetectionEventListener,
+                                                                               
   LocalSessions.Listener
 {
     private static Logger logger = 
LoggerFactory.getLogger(RepairSession.class);
 
@@ -401,4 +408,38 @@ public class RepairSession extends 
AbstractFuture<RepairSessionResult> implement
         // If a node failed, we stop everything (though there could still be 
some activity in the background)
         forceShutdown(exception);
     }
+
+    public void onIRStateChange(LocalSession session)
+    {
+        // we should only be registered as listeners for PreviewKind.REPAIRED, 
but double check here
+        if (previewKind == PreviewKind.REPAIRED &&
+            session.getState() == ConsistentSession.State.FINALIZED &&
+            includesTables(session.tableIds))
+        {
+            for (Range<Token> range : session.ranges)
+            {
+                if (range.intersects(ranges()))
+                {
+                    logger.error("{} An intersecting incremental repair with 
session id = {} finished, preview repair might not be accurate", 
previewKind.logPrefix(getId()), session.sessionID);
+                    forceShutdown(new Exception("An incremental repair with 
session id "+session.sessionID+" finished during this preview repair runtime"));
+                    return;
+                }
+            }
+        }
+    }
+
+    private boolean includesTables(Set<TableId> tableIds)
+    {
+        Keyspace ks = Keyspace.open(keyspace);
+        if (ks != null)
+        {
+            for (String table : cfnames)
+            {
+                ColumnFamilyStore cfs = ks.getColumnFamilyStore(table);
+                if (tableIds.contains(cfs.metadata.id))
+                    return true;
+            }
+        }
+        return false;
+    }
 }
diff --git a/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java 
b/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
index 6475794..fa224d1 100644
--- a/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
+++ b/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
@@ -30,6 +30,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -106,6 +107,7 @@ import static 
org.apache.cassandra.repair.consistent.ConsistentSession.State.*;
 public class LocalSessions
 {
     private static final Logger logger = 
LoggerFactory.getLogger(LocalSessions.class);
+    private static final Set<Listener> listeners = new CopyOnWriteArraySet<>();
 
     /**
      * Amount of time a session can go without any activity before we start 
checking the status of other
@@ -441,7 +443,7 @@ public class LocalSessions
         return new LocalSession(builder);
     }
 
-    protected LocalSession getSession(UUID sessionID)
+    public LocalSession getSession(UUID sessionID)
     {
         return sessions.get(sessionID);
     }
@@ -520,6 +522,8 @@ public class LocalSessions
             {
                 sessionCompleted(session);
             }
+            for (Listener listener : listeners)
+                listener.onIRStateChange(session);
         }
     }
 
@@ -777,7 +781,7 @@ public class LocalSessions
         LocalSession session = getSession(sessionID);
         if (session == null)
         {
-            logger.warn("Received status response message for unknown session 
{}", sessionID);
+            logger.warn("Received status request message for unknown session 
{}", sessionID);
             sendMessage(from, Message.out(STATUS_RSP, new 
StatusResponse(sessionID, FAILED)));
         }
         else
@@ -868,4 +872,19 @@ public class LocalSessions
             throw new IllegalStateException("Cannot get final repaired at 
value for in progress session: " + session);
         }
     }
+
+    public static void registerListener(Listener listener)
+    {
+        listeners.add(listener);
+    }
+
+    public static void unregisterListener(Listener listener)
+    {
+        listeners.remove(listener);
+    }
+
+    public interface Listener
+    {
+        void onIRStateChange(LocalSession session);
+    }
 }
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java 
b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index 6f4c474..7499c36 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -70,7 +70,6 @@ import 
org.apache.cassandra.repair.consistent.CoordinatorSessions;
 import org.apache.cassandra.repair.consistent.LocalSessions;
 import org.apache.cassandra.repair.messages.*;
 import org.apache.cassandra.schema.TableId;
-import org.apache.cassandra.utils.CassandraVersion;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.MBeanWrapper;
 import org.apache.cassandra.utils.Pair;
@@ -230,6 +229,9 @@ public class ActiveRepairService implements 
IEndpointStateChangeSubscriber, IFai
         // register listeners
         registerOnFdAndGossip(session);
 
+        if (session.previewKind == PreviewKind.REPAIRED)
+            LocalSessions.registerListener(session);
+
         // remove session at completion
         session.addListener(new Runnable()
         {
@@ -239,6 +241,7 @@ public class ActiveRepairService implements 
IEndpointStateChangeSubscriber, IFai
             public void run()
             {
                 sessions.remove(session.getId());
+                LocalSessions.unregisterListener(session);
             }
         }, MoreExecutors.directExecutor());
         session.start(executor);
diff --git a/src/java/org/apache/cassandra/streaming/PreviewKind.java 
b/src/java/org/apache/cassandra/streaming/PreviewKind.java
index 51c5746..b5467de 100644
--- a/src/java/org/apache/cassandra/streaming/PreviewKind.java
+++ b/src/java/org/apache/cassandra/streaming/PreviewKind.java
@@ -18,22 +18,34 @@
 
 package org.apache.cassandra.streaming;
 
-
 import java.util.UUID;
 
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
+import org.apache.cassandra.repair.consistent.ConsistentSession;
+import org.apache.cassandra.repair.consistent.LocalSession;
+import org.apache.cassandra.service.ActiveRepairService;
+
 public enum PreviewKind
 {
-    NONE(0),
-    ALL(1),
-    UNREPAIRED(2),
-    REPAIRED(3);
+    NONE(0, (sstable) -> {
+        throw new RuntimeException("Can't get preview predicate for preview 
kind NONE");
+    }),
+    ALL(1, Predicates.alwaysTrue()),
+    UNREPAIRED(2, sstable -> !sstable.isRepaired()),
+    REPAIRED(3, new PreviewRepairedSSTablePredicate());
 
     private final int serializationVal;
+    private final Predicate<SSTableReader> predicate;
 
-    PreviewKind(int serializationVal)
+    PreviewKind(int serializationVal, Predicate<SSTableReader> predicate)
     {
         assert ordinal() == serializationVal;
         this.serializationVal = serializationVal;
+        this.predicate = predicate;
     }
 
     public int getSerializationVal()
@@ -46,7 +58,6 @@ public enum PreviewKind
         return values()[serializationVal];
     }
 
-
     public boolean isPreview()
     {
         return this != NONE;
@@ -62,4 +73,28 @@ public enum PreviewKind
         return '[' + logPrefix() + " #" + sessionId.toString() + ']';
     }
 
+    public Predicate<SSTableReader> predicate()
+    {
+        return predicate;
+    }
+
+    private static class PreviewRepairedSSTablePredicate implements 
Predicate<SSTableReader>
+    {
+        public boolean apply(SSTableReader sstable)
+        {
+            // grab the metadata before checking pendingRepair since this can 
be nulled out at any time
+            StatsMetadata sstableMetadata = sstable.getSSTableMetadata();
+            if (sstableMetadata.pendingRepair != null)
+            {
+                LocalSession session = 
ActiveRepairService.instance.consistent.local.getSession(sstableMetadata.pendingRepair);
+                if (session == null)
+                    return false;
+                else if (session.getState() == 
ConsistentSession.State.FINALIZED)
+                    return true;
+                else if (session.getState() != ConsistentSession.State.FAILED)
+                    throw new IllegalStateException(String.format("SSTable %s 
is marked pending for non-finalized incremental repair session %s, failing 
preview repair", sstable, sstableMetadata.pendingRepair));
+            }
+            return sstableMetadata.repairedAt != 
ActiveRepairService.UNREPAIRED_SSTABLE;
+        }
+    }
 }
diff --git 
a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java 
b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index b8ef25d..1beb708 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -401,6 +401,8 @@ public class Instance extends IsolatedExecutor implements 
IInvokableInstance
 
                 if 
(!FBUtilities.getBroadcastAddressAndPort().equals(broadcastAddressAndPort()))
                     throw new IllegalStateException();
+
+                ActiveRepairService.instance.start();
             }
             catch (Throwable t)
             {
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairTest.java 
b/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairTest.java
new file mode 100644
index 0000000..ed29a30
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairTest.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.google.common.collect.ImmutableList;
+import org.junit.Test;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ICoordinator;
+import org.apache.cassandra.distributed.api.IIsolatedExecutor;
+import org.apache.cassandra.distributed.api.IMessage;
+import org.apache.cassandra.distributed.api.IMessageFilters;
+import org.apache.cassandra.net.Verb;
+import org.apache.cassandra.repair.RepairParallelism;
+import org.apache.cassandra.repair.messages.RepairOption;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.streaming.PreviewKind;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.concurrent.SimpleCondition;
+import org.apache.cassandra.utils.progress.ProgressEventType;
+
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class PreviewRepairTest extends DistributedTestBase
+{
+    /**
+     * makes sure that the repaired sstables are not matching on the two
+     * nodes by disabling autocompaction on node2 and then running an
+     * incremental repair
+     */
+    @Test
+    public void testWithMismatchingPending() throws Throwable
+    {
+        try(Cluster cluster = init(Cluster.build(2).withConfig(config -> 
config.with(GOSSIP).with(NETWORK)).start()))
+        {
+            cluster.schemaChange("create table " + KEYSPACE + ".tbl (id int 
primary key, t int)");
+            insert(cluster.coordinator(1), 0, 100);
+            cluster.forEach((node) -> node.flush(KEYSPACE));
+            cluster.get(1).callOnInstance(repair(options(false)));
+            insert(cluster.coordinator(1), 100, 100);
+            cluster.forEach((node) -> node.flush(KEYSPACE));
+
+            // make sure that all sstables have moved to repaired by 
triggering a compaction
+            // also disables autocompaction on the nodes
+            cluster.forEach((node) -> node.runOnInstance(() -> {
+                ColumnFamilyStore cfs = 
Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl");
+                
FBUtilities.waitOnFutures(CompactionManager.instance.submitBackground(cfs));
+                cfs.disableAutoCompaction();
+            }));
+            cluster.get(1).callOnInstance(repair(options(false)));
+            // now re-enable autocompaction on node1, this moves the sstables 
for the new repair to repaired
+            cluster.get(1).runOnInstance(() -> {
+                ColumnFamilyStore cfs = 
Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl");
+                cfs.enableAutoCompaction();
+                
FBUtilities.waitOnFutures(CompactionManager.instance.submitBackground(cfs));
+            });
+            Pair<Boolean, Boolean> rs = 
cluster.get(1).callOnInstance(repair(options(true)));
+            assertTrue(rs.left); // preview repair should succeed
+            assertFalse(rs.right); // and we should see no mismatches
+        }
+    }
+
+    /**
+     * another case where the repaired datasets could mismatch is if an 
incremental repair finishes just as the preview
+     * repair is starting up.
+     *
+     * This tests this case:
+     * 1. we start a preview repair
+     * 2. pause the validation requests from node1 -> node2
+     * 3. node1 starts its validation
+     * 4. run an incremental repair which completes fine
+     * 5. node2 resumes its validation
+     *
+     * Now we will include sstables from the second incremental repair on 
node2 but not on node1
+     * This should fail since we fail any preview repair which is ongoing when 
an incremental repair finishes (step 4 above)
+     */
+    @Test
+    public void testFinishingIncRepairDuringPreview() throws IOException, 
InterruptedException, ExecutionException
+    {
+        ExecutorService es = Executors.newSingleThreadExecutor();
+        try(Cluster cluster = init(Cluster.build(2).withConfig(config -> 
config.with(GOSSIP).with(NETWORK)).start()))
+        {
+            cluster.schemaChange("create table " + KEYSPACE + ".tbl (id int 
primary key, t int)");
+
+            insert(cluster.coordinator(1), 0, 100);
+            cluster.forEach((node) -> node.flush(KEYSPACE));
+            cluster.get(1).callOnInstance(repair(options(false)));
+
+            insert(cluster.coordinator(1), 100, 100);
+            cluster.forEach((node) -> node.flush(KEYSPACE));
+
+            SimpleCondition continuePreviewRepair = new SimpleCondition();
+            DelayMessageFilter filter = new 
DelayMessageFilter(continuePreviewRepair);
+            // this pauses the validation request sent from node1 to node2 
until we have run a full inc repair below
+            
cluster.filters().verbs(Verb.VALIDATION_REQ.id).from(1).to(2).messagesMatching(filter).drop();
+
+            Future<Pair<Boolean, Boolean>> rsFuture = es.submit(() -> 
cluster.get(1).callOnInstance(repair(options(true))));
+            Thread.sleep(1000);
+            // this needs to finish before the preview repair is unpaused on 
node2
+            cluster.get(1).callOnInstance(repair(options(false)));
+            continuePreviewRepair.signalAll();
+            Pair<Boolean, Boolean> rs = rsFuture.get();
+            assertFalse(rs.left); // preview repair should have failed
+            assertFalse(rs.right); // and no mismatches should have been 
reported
+        }
+        finally
+        {
+            es.shutdown();
+        }
+    }
+
+    /**
+     * Same as testFinishingIncRepairDuringPreview but the previewed range 
does not intersect the incremental repair
+     * so both preview and incremental repair should finish fine (without any 
mismatches)
+     */
+    @Test
+    public void testFinishingNonIntersectingIncRepairDuringPreview() throws 
IOException, InterruptedException, ExecutionException
+    {
+        ExecutorService es = Executors.newSingleThreadExecutor();
+        try(Cluster cluster = init(Cluster.build(2).withConfig(config -> 
config.with(GOSSIP).with(NETWORK)).start()))
+        {
+            cluster.schemaChange("create table " + KEYSPACE + ".tbl (id int 
primary key, t int)");
+
+            insert(cluster.coordinator(1), 0, 100);
+            cluster.forEach((node) -> node.flush(KEYSPACE));
+            
assertTrue(cluster.get(1).callOnInstance(repair(options(false))).left);
+
+            insert(cluster.coordinator(1), 100, 100);
+            cluster.forEach((node) -> node.flush(KEYSPACE));
+
+            // pause preview repair validation messages on node2 until node1 
has finished
+            SimpleCondition continuePreviewRepair = new SimpleCondition();
+            DelayMessageFilter filter = new 
DelayMessageFilter(continuePreviewRepair);
+            
cluster.filters().verbs(Verb.VALIDATION_REQ.id).from(1).to(2).messagesMatching(filter).drop();
+
+            // get local ranges to repair two separate ranges:
+            List<String> localRanges = cluster.get(1).callOnInstance(() -> {
+                List<String> res = new ArrayList<>();
+                for (Range<Token> r : 
StorageService.instance.getLocalReplicas(KEYSPACE).ranges())
+                    res.add(r.left.getTokenValue()+ ":"+ 
r.right.getTokenValue());
+                return res;
+            });
+
+            assertEquals(2, localRanges.size());
+            Future<Pair<Boolean, Boolean>> repairStatusFuture = es.submit(() 
-> cluster.get(1).callOnInstance(repair(options(true, localRanges.get(0)))));
+            Thread.sleep(1000); // wait for node1 to start validation 
compaction
+            // this needs to finish before the preview repair is unpaused on 
node2
+            assertTrue(cluster.get(1).callOnInstance(repair(options(false, 
localRanges.get(1)))).left);
+
+            continuePreviewRepair.signalAll();
+            Pair<Boolean, Boolean> rs = repairStatusFuture.get();
+            assertTrue(rs.left); // repair should succeed
+            assertFalse(rs.right); // and no mismatches
+        }
+        finally
+        {
+            es.shutdown();
+        }
+    }
+
+    private static class DelayMessageFilter implements IMessageFilters.Matcher
+    {
+        private final SimpleCondition condition;
+        private final AtomicBoolean waitForRepair = new AtomicBoolean(true);
+
+        public DelayMessageFilter(SimpleCondition condition)
+        {
+            this.condition = condition;
+        }
+        public boolean matches(int from, int to, IMessage message)
+        {
+            try
+            {
+                // only the first validation req should be delayed:
+                if (waitForRepair.compareAndSet(true, false))
+                    condition.await();
+            }
+            catch (Exception e)
+            {
+                throw new RuntimeException(e);
+            }
+            return false; // don't drop the message
+        }
+    }
+
+    private static void insert(ICoordinator coordinator, int start, int count)
+    {
+        for (int i = start; i < start + count; i++)
+            coordinator.execute("insert into " + KEYSPACE + ".tbl (id, t) 
values (?, ?)", ConsistencyLevel.ALL, i, i);
+    }
+
+    /**
+     * returns a pair with [repair success, was inconsistent]
+     */
+    private static IIsolatedExecutor.SerializableCallable<Pair<Boolean, 
Boolean>> repair(Map<String, String> options)
+    {
+        return () -> {
+            SimpleCondition await = new SimpleCondition();
+            AtomicBoolean success = new AtomicBoolean(true);
+            AtomicBoolean wasInconsistent = new AtomicBoolean(false);
+            StorageService.instance.repair(KEYSPACE, options, 
ImmutableList.of((tag, event) -> {
+                if (event.getType() == ProgressEventType.ERROR)
+                {
+                    success.set(false);
+                    await.signalAll();
+                }
+                else if (event.getType() == ProgressEventType.NOTIFICATION && 
event.getMessage().contains("Repaired data is inconsistent"))
+                {
+                    wasInconsistent.set(true);
+                }
+                else if (event.getType() == ProgressEventType.COMPLETE)
+                    await.signalAll();
+            }));
+            try
+            {
+                await.await(1, TimeUnit.MINUTES);
+            }
+            catch (InterruptedException e)
+            {
+                throw new RuntimeException(e);
+            }
+            return Pair.create(success.get(), wasInconsistent.get());
+        };
+    }
+
+    private static Map<String, String> options(boolean preview)
+    {
+        Map<String, String> config = new HashMap<>();
+        config.put(RepairOption.INCREMENTAL_KEY, "true");
+        config.put(RepairOption.PARALLELISM_KEY, 
RepairParallelism.PARALLEL.toString());
+        if (preview)
+            config.put(RepairOption.PREVIEW, PreviewKind.REPAIRED.toString());
+        return config;
+    }
+
+    private static Map<String, String> options(boolean preview, String range)
+    {
+        Map<String, String> options = options(preview);
+        options.put(RepairOption.RANGES_KEY, range);
+        return options;
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to