This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch cassandra-5.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 1b1f88e38a363b7937e8472b6863f9168b687659
Merge: a9da19c311 7c79d91b6f
Author: ci worker <dcapw...@apache.org>
AuthorDate: Wed May 1 14:34:24 2024 -0700

    Merge branch 'cassandra-4.1' into cassandra-5.0

 CHANGES.txt                                        |  1 +
 .../db/compaction/AbstractStrategyHolder.java      |  1 +
 .../db/compaction/CompactionStrategyHolder.java    |  6 ++++
 .../db/compaction/CompactionStrategyManager.java   | 18 ++++++++++-
 .../db/compaction/PendingRepairHolder.java         | 12 ++++++++
 .../db/compaction/PendingRepairManager.java        | 11 ++++++-
 .../org/apache/cassandra/db/lifecycle/Tracker.java |  2 ++
 .../apache/cassandra/repair/RepairCoordinator.java | 15 +++++++++
 .../cassandra/repair/RepairMessageVerbHandler.java |  3 ++
 .../distributed/test/DistributedRepairUtils.java   | 34 ++++++++++++++++++++
 .../distributed/test/RepairCoordinatorFast.java    |  3 ++
 .../test/RepairCoordinatorNeighbourDown.java       |  3 ++
 .../repair/ConcurrentIrWithPreviewFuzzTest.java    |  2 +-
 .../cassandra/repair/FailingRepairFuzzTest.java    |  1 +
 .../org/apache/cassandra/repair/FuzzTestBase.java  | 36 ++++++++++++++++++++--
 15 files changed, 143 insertions(+), 5 deletions(-)

diff --cc CHANGES.txt
index 7e18e40c97,17257d606a..ac41c7c180
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -38,9 -4,8 +38,10 @@@ Merged from 4.1
   * Fix hints delivery for a node going down repeatedly (CASSANDRA-19495)
   * Do not go to disk for reading hints file sizes (CASSANDRA-19477)
   * Fix system_views.settings to handle array types (CASSANDRA-19475)
 + * Memoize Cassandra version and add a backoff interval for failed schema pulls (CASSANDRA-18902)
 + * Fix StackOverflowError on ALTER after many previous schema changes (CASSANDRA-19166)
  Merged from 4.0:
+  * IR may leak SSTables with pending repair when coming from streaming (CASSANDRA-19182)
   * Streaming exception race creates corrupt transaction log files that prevent restart (CASSANDRA-18736)
   * Fix CQL tojson timestamp output on negative timestamp values before Gregorian calendar reform in 1582 (CASSANDRA-19566)
   * Fix few types issues and implement types compatibility tests (CASSANDRA-19479)
diff --cc src/java/org/apache/cassandra/db/compaction/PendingRepairHolder.java
index 534bf3ae82,86c40e8958..0c5d53c1d8
--- a/src/java/org/apache/cassandra/db/compaction/PendingRepairHolder.java
+++ b/src/java/org/apache/cassandra/db/compaction/PendingRepairHolder.java
@@@ -285,12 -289,8 +292,17 @@@ public class PendingRepairHolder extend
          return Iterables.any(managers, prm -> prm.containsSSTable(sstable));
      }
  
 +    @Override
 +    public int getEstimatedRemainingTasks()
 +    {
 +        int tasks = 0;
 +        for (PendingRepairManager manager : managers)
 +            tasks += manager.getEstimatedRemainingTasks();
 +        return tasks;
 +    }
++
+     public boolean hasPendingRepairSSTable(TimeUUID sessionID, SSTableReader sstable)
+     {
+         return Iterables.any(managers, prm -> prm.hasPendingRepairSSTable(sessionID, sstable));
+     }
  }
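
For context, both methods added above follow the same delegate-and-aggregate
pattern over the holder's list of PendingRepairManager instances. A minimal
standalone sketch of that pattern (Manager and HolderSketch are stand-ins for
illustration, not the real Cassandra types):

    import java.util.List;
    import java.util.UUID;

    // Stand-in for PendingRepairManager's surface used by the new methods.
    interface Manager
    {
        int getEstimatedRemainingTasks();
        boolean hasPendingRepairSSTable(UUID sessionID, Object sstable);
    }

    final class HolderSketch
    {
        private final List<Manager> managers;

        HolderSketch(List<Manager> managers) { this.managers = managers; }

        // Sum the per-manager estimates, as the new getEstimatedRemainingTasks does.
        int getEstimatedRemainingTasks()
        {
            int tasks = 0;
            for (Manager m : managers)
                tasks += m.getEstimatedRemainingTasks();
            return tasks;
        }

        // True if any manager still ties the sstable to the given session,
        // mirroring the new hasPendingRepairSSTable delegation.
        boolean hasPendingRepairSSTable(UUID sessionID, Object sstable)
        {
            return managers.stream().anyMatch(m -> m.hasPendingRepairSSTable(sessionID, sstable));
        }
    }
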
diff --cc src/java/org/apache/cassandra/repair/RepairCoordinator.java
index 27dd3a73b5,0000000000..82664f2d3c
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/repair/RepairCoordinator.java
+++ b/src/java/org/apache/cassandra/repair/RepairCoordinator.java
@@@ -1,635 -1,0 +1,650 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.repair;
 +
 +import java.nio.ByteBuffer;
 +import java.util.ArrayList;
 +import java.util.Arrays;
 +import java.util.Collection;
 +import java.util.HashSet;
 +import java.util.List;
 +import java.util.Set;
 +import java.util.UUID;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicInteger;
 +import java.util.concurrent.atomic.AtomicReference;
 +import java.util.function.BiFunction;
 +import java.util.function.Function;
 +import java.util.function.Supplier;
 +
 +import com.google.common.base.Preconditions;
 +import com.google.common.collect.ImmutableList;
 +import com.google.common.collect.ImmutableMap;
 +import com.google.common.collect.ImmutableSet;
 +import com.google.common.collect.Iterables;
 +import com.google.common.collect.Lists;
 +import com.google.common.collect.Sets;
 +
 +import org.apache.cassandra.locator.RangesAtEndpoint;
++import org.apache.cassandra.net.Verb;
++import org.apache.cassandra.repair.messages.FailSession;
++import org.apache.cassandra.repair.messages.RepairMessage;
++import org.apache.cassandra.repair.state.ParticipateState;
 +import org.apache.cassandra.utils.*;
 +import org.apache.cassandra.utils.concurrent.Future;
 +import org.apache.commons.lang3.time.DurationFormatUtils;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import com.codahale.metrics.Timer;
 +import org.apache.cassandra.concurrent.ExecutorPlus;
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.cql3.QueryOptions;
 +import org.apache.cassandra.cql3.QueryProcessor;
 +import org.apache.cassandra.cql3.UntypedResultSet;
 +import org.apache.cassandra.cql3.statements.SelectStatement;
 +import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.ConsistencyLevel;
 +import org.apache.cassandra.db.Keyspace;
 +import org.apache.cassandra.dht.Range;
 +import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.exceptions.RepairException;
 +import org.apache.cassandra.locator.EndpointsForRange;
 +import org.apache.cassandra.locator.InetAddressAndPort;
 +import org.apache.cassandra.locator.Replica;
 +import org.apache.cassandra.metrics.StorageMetrics;
 +import org.apache.cassandra.repair.messages.RepairOption;
 +import org.apache.cassandra.repair.state.CoordinatorState;
 +import org.apache.cassandra.schema.SchemaConstants;
 +import org.apache.cassandra.schema.SystemDistributedKeyspace;
 +import org.apache.cassandra.service.ActiveRepairService;
 +import org.apache.cassandra.service.ActiveRepairService.ParentRepairStatus;
 +import org.apache.cassandra.service.ClientState;
 +import org.apache.cassandra.service.StorageService;
 +import org.apache.cassandra.tracing.TraceKeyspace;
 +import org.apache.cassandra.tracing.TraceState;
 +import org.apache.cassandra.tracing.Tracing;
 +import org.apache.cassandra.transport.messages.ResultMessage;
 +import org.apache.cassandra.utils.progress.ProgressEvent;
 +import org.apache.cassandra.utils.progress.ProgressEventNotifier;
 +import org.apache.cassandra.utils.progress.ProgressEventType;
 +import org.apache.cassandra.utils.progress.ProgressListener;
 +
 +import static org.apache.cassandra.repair.state.AbstractState.COMPLETE;
 +import static org.apache.cassandra.repair.state.AbstractState.INIT;
 +import static org.apache.cassandra.service.QueryState.forInternalCalls;
 +
 +public class RepairCoordinator implements Runnable, ProgressEventNotifier, RepairNotifier
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(RepairCoordinator.class);
 +
 +    private static final AtomicInteger THREAD_COUNTER = new AtomicInteger(1);
 +
 +    public final CoordinatorState state;
 +    private final String tag;
 +    private final BiFunction<String, String[], Iterable<ColumnFamilyStore>> validColumnFamilies;
 +    private final Function<String, RangesAtEndpoint> getLocalReplicas;
 +
 +    private final List<ProgressListener> listeners = new ArrayList<>();
 +    private final AtomicReference<Throwable> firstError = new AtomicReference<>(null);
 +    final SharedContext ctx;
 +    final Scheduler validationScheduler;
 +
 +    private TraceState traceState;
 +
 +    public RepairCoordinator(StorageService storageService, int cmd, RepairOption options, String keyspace)
 +    {
 +        this(SharedContext.Global.instance,
 +                (ks, tables) -> storageService.getValidColumnFamilies(false, false, ks, tables),
 +                storageService::getLocalReplicas,
 +                cmd, options, keyspace);
 +    }
 +
 +    RepairCoordinator(SharedContext ctx,
 +                      BiFunction<String, String[], Iterable<ColumnFamilyStore>> validColumnFamilies,
 +                      Function<String, RangesAtEndpoint> getLocalReplicas,
 +                      int cmd, RepairOption options, String keyspace)
 +    {
 +        this.ctx = ctx;
 +        this.validationScheduler = Scheduler.build(DatabaseDescriptor.getConcurrentMerkleTreeRequests());
 +        this.state = new CoordinatorState(ctx.clock(), cmd, keyspace, options);
 +        this.tag = "repair:" + cmd;
 +        this.validColumnFamilies = validColumnFamilies;
 +        this.getLocalReplicas = getLocalReplicas;
 +        ctx.repair().register(state);
 +    }
 +
 +    @Override
 +    public void addProgressListener(ProgressListener listener)
 +    {
 +        listeners.add(listener);
 +    }
 +
 +    @Override
 +    public void removeProgressListener(ProgressListener listener)
 +    {
 +        listeners.remove(listener);
 +    }
 +
 +
 +    protected void fireProgressEvent(ProgressEvent event)
 +    {
 +        for (ProgressListener listener : listeners)
 +        {
 +            listener.progress(tag, event);
 +        }
 +    }
 +
 +    @Override
 +    public void notification(String msg)
 +    {
 +        logger.info(msg);
 +        fireProgressEvent(jmxEvent(ProgressEventType.NOTIFICATION, msg));
 +    }
 +
 +    @Override
 +    public void notifyError(Throwable error)
 +    {
 +        // exception should be ignored
 +        if (error instanceof SomeRepairFailedException)
 +            return;
 +
 +        if (Throwables.anyCauseMatches(error, RepairException::shouldWarn))
 +        {
 +            logger.warn("Repair {} aborted: {}", state.id, error.getMessage());
 +            if (logger.isDebugEnabled())
 +                logger.debug("Repair {} aborted: ", state.id, error);
 +        }
 +        else
 +        {
 +            logger.error("Repair {} failed:", state.id, error);
 +        }
 +
 +        StorageMetrics.repairExceptions.inc();
 +        String errorMessage = String.format("Repair command #%d failed with error %s", state.cmd, error.getMessage());
 +        fireProgressEvent(jmxEvent(ProgressEventType.ERROR, errorMessage));
 +        firstError.compareAndSet(null, error);
 +
 +        // since this can fail, update table only after updating in-memory and notification state
 +        maybeStoreParentRepairFailure(error);
 +    }
 +
 +    @Override
 +    public void notifyProgress(String message)
 +    {
 +        logger.info(message);
 +        fireProgressEvent(jmxEvent(ProgressEventType.PROGRESS, message));
 +    }
 +
 +    private void skip(String msg)
 +    {
 +        state.phase.skip(msg);
 +        notification("Repair " + state.id + " skipped: " + msg);
 +        success(msg);
 +    }
 +
 +    private void success(String msg)
 +    {
 +        state.phase.success(msg);
 +        fireProgressEvent(jmxEvent(ProgressEventType.SUCCESS, msg));
 +        ctx.repair().recordRepairStatus(state.cmd, ActiveRepairService.ParentRepairStatus.COMPLETED,
 +                                        ImmutableList.of(msg));
 +        complete(null);
 +    }
 +
 +    private void fail(String reason)
 +    {
 +        if (reason == null)
 +        {
 +            Throwable error = firstError.get();
 +            reason = error != null ? error.toString() : "Some repair failed";
 +        }
 +        state.phase.fail(reason);
++        ParticipateState p = ctx.repair().participate(state.id);
++        if (p != null)
++            p.phase.fail(reason);
++        NeighborsAndRanges neighborsAndRanges = state.getNeighborsAndRanges();
++        // this is possible if the failure happened during input processing, in which case no participants have been notified
++        if (neighborsAndRanges != null)
++        {
++            FailSession msg = new FailSession(state.id);
++            for (InetAddressAndPort participate : neighborsAndRanges.participants)
++                RepairMessage.sendMessageWithRetries(ctx, msg, Verb.FAILED_SESSION_MSG, participate);
++        }
 +        String completionMessage = String.format("Repair command #%d finished with error", state.cmd);
 +
 +        // Note we rely on the first message being the reason for the failure
 +        // when inspecting this state from RepairRunner.queryForCompletedRepair
 +        ctx.repair().recordRepairStatus(state.cmd, ParentRepairStatus.FAILED,
 +                                        ImmutableList.of(reason, completionMessage));
 +
 +        complete(completionMessage);
 +    }
 +
 +    private void complete(String msg)
 +    {
 +        long durationMillis = state.getDurationMillis();
 +        if (msg == null)
 +        {
 +            String duration = DurationFormatUtils.formatDurationWords(durationMillis, true, true);
 +            msg = String.format("Repair command #%d finished in %s", state.cmd, duration);
 +        }
 +
 +        fireProgressEvent(jmxEvent(ProgressEventType.COMPLETE, msg));
 +        logger.info(state.options.getPreviewKind().logPrefix(state.id) + msg);
 +
 +        ctx.repair().removeParentRepairSession(state.id);
 +        TraceState localState = traceState;
 +        if (state.options.isTraced() && localState != null)
 +        {
 +            for (ProgressListener listener : listeners)
 +                localState.removeProgressListener(listener);
 +            // Because ExecutorPlus#afterExecute and this callback
 +            // run in a nondeterministic order (within the same thread), the
 +            // TraceState may have been nulled out at this point. The TraceState
 +            // should be traceState, so just set it without bothering to check if it
 +            // actually was nulled out.
 +            Tracing.instance.set(localState);
 +            Tracing.traceRepair(msg);
 +            Tracing.instance.stopSession();
 +        }
 +
 +        Keyspace.open(state.keyspace).metric.repairTime.update(durationMillis, TimeUnit.MILLISECONDS);
 +    }
 +
 +    public void run()
 +    {
 +        try
 +        {
 +            runMayThrow();
 +        }
 +        catch (SkipRepairException e)
 +        {
 +            skip(e.getMessage());
 +        }
 +        catch (Throwable e)
 +        {
 +            notifyError(e);
 +            fail(e.getMessage());
 +        }
 +    }
 +
 +    private void runMayThrow() throws Throwable
 +    {
 +        state.phase.setup();
 +        ctx.repair().recordRepairStatus(state.cmd, ParentRepairStatus.IN_PROGRESS, ImmutableList.of());
 +
 +        List<ColumnFamilyStore> columnFamilies = getColumnFamilies();
 +        String[] cfnames = columnFamilies.stream().map(cfs -> cfs.name).toArray(String[]::new);
 +
 +        this.traceState = maybeCreateTraceState(columnFamilies);
 +        notifyStarting();
 +        NeighborsAndRanges neighborsAndRanges = getNeighborsAndRanges();
 +        // We test to validate the start JMX notification is seen before we compute neighbors and ranges
 +        // but in state (vtable) tracking, we rely on getNeighborsAndRanges to know where we are running repair...
 +        // JMX start != state start, it's possible we fail in getNeighborsAndRanges and state start is never reached
 +        state.phase.start(columnFamilies, neighborsAndRanges);
 +
 +        maybeStoreParentRepairStart(cfnames);
 +
 +        prepare(columnFamilies, neighborsAndRanges.participants, neighborsAndRanges.shouldExcludeDeadParticipants)
 +        .flatMap(ignore -> repair(cfnames, neighborsAndRanges))
 +        .addCallback((pair, failure) -> {
 +            if (failure != null)
 +            {
 +                notifyError(failure);
 +                fail(failure.getMessage());
 +            }
 +            else
 +            {
 +                state.phase.repairCompleted();
 +                CoordinatedRepairResult result = pair.left;
 +                maybeStoreParentRepairSuccess(result.successfulRanges);
 +                if (result.hasFailed())
 +                {
 +                    fail(null);
 +                }
 +                else
 +                {
 +                    success(pair.right.get());
 +                    ctx.repair().cleanUp(state.id, neighborsAndRanges.participants);
 +                }
 +            }
 +        });
 +    }
 +
 +    private List<ColumnFamilyStore> getColumnFamilies()
 +    {
 +        String[] columnFamilies = state.options.getColumnFamilies().toArray(new String[state.options.getColumnFamilies().size()]);
 +        Iterable<ColumnFamilyStore> validColumnFamilies = this.validColumnFamilies.apply(state.keyspace, columnFamilies);
 +
 +        if (Iterables.isEmpty(validColumnFamilies))
 +            throw new SkipRepairException(String.format("%s Empty keyspace, skipping repair: %s", state.id, state.keyspace));
 +        return Lists.newArrayList(validColumnFamilies);
 +    }
 +
 +    private TraceState maybeCreateTraceState(Iterable<ColumnFamilyStore> columnFamilyStores)
 +    {
 +        if (!state.options.isTraced())
 +            return null;
 +
 +        StringBuilder cfsb = new StringBuilder();
 +        for (ColumnFamilyStore cfs : columnFamilyStores)
 +            cfsb.append(", ").append(cfs.getKeyspaceName()).append(".").append(cfs.name);
 +
 +        TimeUUID sessionId = Tracing.instance.newSession(Tracing.TraceType.REPAIR);
 +        TraceState traceState = Tracing.instance.begin("repair", ImmutableMap.of("keyspace", state.keyspace, "columnFamilies",
 +                                                                                 cfsb.substring(2)));
 +        traceState.enableActivityNotification(tag);
 +        for (ProgressListener listener : listeners)
 +            traceState.addProgressListener(listener);
 +        Thread queryThread = createQueryThread(sessionId);
 +        queryThread.setName("RepairTracePolling");
 +        return traceState;
 +    }
 +
 +    private void notifyStarting()
 +    {
 +        String message = String.format("Starting repair command #%d (%s), repairing keyspace %s with %s", state.cmd, state.id, state.keyspace,
 +                                       state.options);
 +        logger.info(message);
 +        Tracing.traceRepair(message);
 +        fireProgressEvent(jmxEvent(ProgressEventType.START, message));
 +    }
 +
 +    private NeighborsAndRanges getNeighborsAndRanges() throws RepairException
 +    {
 +        Set<InetAddressAndPort> allNeighbors = new HashSet<>();
 +        List<CommonRange> commonRanges = new ArrayList<>();
 +
 +        //pre-calculate output of getLocalReplicas and pass it to getNeighbors to increase performance and prevent
 +        //calculation multiple times
 +        Iterable<Range<Token>> keyspaceLocalRanges = getLocalReplicas.apply(state.keyspace).ranges();
 +
 +        for (Range<Token> range : state.options.getRanges())
 +        {
 +            EndpointsForRange neighbors = ctx.repair().getNeighbors(state.keyspace, keyspaceLocalRanges, range,
 +                                                                    state.options.getDataCenters(),
 +                                                                    state.options.getHosts());
 +            if (neighbors.isEmpty())
 +            {
 +                if (state.options.ignoreUnreplicatedKeyspaces())
 +                {
 +                    logger.info("{} Found no neighbors for range {} for {} - ignoring since repairing with --ignore-unreplicated-keyspaces", state.id, range, state.keyspace);
 +                    continue;
 +                }
 +                else
 +                {
 +                    throw RepairException.warn(String.format("Nothing to repair for %s in %s - aborting", range, state.keyspace));
 +                }
 +            }
 +            addRangeToNeighbors(commonRanges, range, neighbors);
 +            allNeighbors.addAll(neighbors.endpoints());
 +        }
 +
 +        if (state.options.ignoreUnreplicatedKeyspaces() && allNeighbors.isEmpty())
 +        {
 +            throw new SkipRepairException(String.format("Nothing to repair for %s in %s - unreplicated keyspace is ignored since repair was called with --ignore-unreplicated-keyspaces",
 +                                                        state.options.getRanges(),
 +                                                        state.keyspace));
 +        }
 +
 +        boolean shouldExcludeDeadParticipants = state.options.isForcedRepair();
 +
 +        if (shouldExcludeDeadParticipants)
 +        {
 +            Set<InetAddressAndPort> actualNeighbors = Sets.newHashSet(Iterables.filter(allNeighbors, ctx.failureDetector()::isAlive));
 +            shouldExcludeDeadParticipants = !allNeighbors.equals(actualNeighbors);
 +            allNeighbors = actualNeighbors;
 +        }
 +        return new NeighborsAndRanges(shouldExcludeDeadParticipants, allNeighbors, commonRanges);
 +    }
 +
 +    private void maybeStoreParentRepairStart(String[] cfnames)
 +    {
 +        if (!state.options.isPreview())
 +        {
 +            SystemDistributedKeyspace.startParentRepair(state.id, state.keyspace, cfnames, state.options);
 +        }
 +    }
 +
 +    private void maybeStoreParentRepairSuccess(Collection<Range<Token>> successfulRanges)
 +    {
 +        if (!state.options.isPreview())
 +        {
 +            SystemDistributedKeyspace.successfulParentRepair(state.id, successfulRanges);
 +        }
 +    }
 +
 +    private void maybeStoreParentRepairFailure(Throwable error)
 +    {
 +        if (!state.options.isPreview())
 +        {
 +            SystemDistributedKeyspace.failParentRepair(state.id, error);
 +        }
 +    }
 +
 +    private Future<?> prepare(List<ColumnFamilyStore> columnFamilies, Set<InetAddressAndPort> allNeighbors, boolean force)
 +    {
 +        state.phase.prepareStart();
 +        Timer timer = Keyspace.open(state.keyspace).metric.repairPrepareTime;
 +        long startNanos = ctx.clock().nanoTime();
 +        return ctx.repair().prepareForRepair(state.id, ctx.broadcastAddressAndPort(), allNeighbors, state.options, force, columnFamilies)
 +                  .map(ignore -> {
 +                      timer.update(ctx.clock().nanoTime() - startNanos, TimeUnit.NANOSECONDS);
 +                      state.phase.prepareComplete();
 +                      return null;
 +                  });
 +    }
 +
 +    private Future<Pair<CoordinatedRepairResult, Supplier<String>>> repair(String[] cfnames, NeighborsAndRanges neighborsAndRanges)
 +    {
 +        RepairTask task;
 +        if (state.options.isPreview())
 +        {
 +            task = new PreviewRepairTask(this, state.id, neighborsAndRanges.filterCommonRanges(state.keyspace, cfnames), cfnames);
 +        }
 +        else if (state.options.isIncremental())
 +        {
 +            task = new IncrementalRepairTask(this, state.id, neighborsAndRanges, cfnames);
 +        }
 +        else
 +        {
 +            task = new NormalRepairTask(this, state.id, neighborsAndRanges.filterCommonRanges(state.keyspace, cfnames), cfnames);
 +        }
 +
 +        ExecutorPlus executor = createExecutor();
 +        state.phase.repairSubmitted();
 +        return task.perform(executor, validationScheduler)
 +                   // after adding the callback java could no longer infer the type...
 +                   .<Pair<CoordinatedRepairResult, Supplier<String>>>map(r -> Pair.create(r, task::successMessage))
 +                   .addCallback((s, f) -> executor.shutdown());
 +    }
 +
 +    private ExecutorPlus createExecutor()
 +    {
 +        return ctx.executorFactory()
 +                .localAware()
 +                .withJmxInternal()
 +                .pooled("Repair#" + state.cmd, state.options.getJobThreads());
 +    }
 +
 +    private static void addRangeToNeighbors(List<CommonRange> neighborRangeList, Range<Token> range, EndpointsForRange neighbors)
 +    {
 +        Set<InetAddressAndPort> endpoints = neighbors.endpoints();
 +        Set<InetAddressAndPort> transEndpoints = neighbors.filter(Replica::isTransient).endpoints();
 +
 +        for (CommonRange commonRange : neighborRangeList)
 +        {
 +            if (commonRange.matchesEndpoints(endpoints, transEndpoints))
 +            {
 +                commonRange.ranges.add(range);
 +                return;
 +            }
 +        }
 +
 +        List<Range<Token>> ranges = new ArrayList<>();
 +        ranges.add(range);
 +        neighborRangeList.add(new CommonRange(endpoints, transEndpoints, ranges));
 +    }
 +
 +    private Thread createQueryThread(final TimeUUID sessionId)
 +    {
 +        return ctx.executorFactory().startThread("Repair-Runnable-" + THREAD_COUNTER.incrementAndGet(), new WrappedRunnable()
 +        {
 +            // Query events within a time interval that overlaps the last by one second. Ignore duplicates. Ignore local traces.
 +            // Wake up upon local trace activity. Query when notified of trace activity with a timeout that doubles every two timeouts.
 +            public void runMayThrow() throws Exception
 +            {
 +                TraceState state = Tracing.instance.get(sessionId);
 +                if (state == null)
 +                    throw new Exception("no tracestate");
 +
 +                String format = "select event_id, source, source_port, activity from %s.%s where session_id = ? and event_id > ? and event_id < ?;";
 +                String query = String.format(format, SchemaConstants.TRACE_KEYSPACE_NAME, TraceKeyspace.EVENTS);
 +                SelectStatement statement = (SelectStatement) QueryProcessor.parseStatement(query).prepare(ClientState.forInternalCalls());
 +
 +                ByteBuffer sessionIdBytes = sessionId.toBytes();
 +                InetAddressAndPort source = ctx.broadcastAddressAndPort();
 +
 +                HashSet<UUID>[] seen = new HashSet[]{ new HashSet<>(), new HashSet<>() };
 +                int si = 0;
 +                UUID uuid;
 +
 +                long tlast = ctx.clock().currentTimeMillis(), tcur;
 +
 +                TraceState.Status status;
 +                long minWaitMillis = 125;
 +                long maxWaitMillis = 1000 * 1024L;
 +                long timeout = minWaitMillis;
 +                boolean shouldDouble = false;
 +
 +                while ((status = state.waitActivity(timeout)) != TraceState.Status.STOPPED)
 +                {
 +                    if (status == TraceState.Status.IDLE)
 +                    {
 +                        timeout = shouldDouble ? Math.min(timeout * 2, maxWaitMillis) : timeout;
 +                        shouldDouble = !shouldDouble;
 +                    }
 +                    else
 +                    {
 +                        timeout = minWaitMillis;
 +                        shouldDouble = false;
 +                    }
 +                    ByteBuffer tminBytes = TimeUUID.minAtUnixMillis(tlast - 1000).toBytes();
 +                    ByteBuffer tmaxBytes = TimeUUID.maxAtUnixMillis(tcur = ctx.clock().currentTimeMillis()).toBytes();
 +                    QueryOptions options = QueryOptions.forInternalCalls(ConsistencyLevel.ONE, Lists.newArrayList(sessionIdBytes,
 +                                                                                                                  tminBytes,
 +                                                                                                                  tmaxBytes));
 +                    ResultMessage.Rows rows = statement.execute(forInternalCalls(), options, ctx.clock().nanoTime());
 +                    UntypedResultSet result = UntypedResultSet.create(rows.result);
 +
 +                    for (UntypedResultSet.Row r : result)
 +                    {
 +                        int port = DatabaseDescriptor.getStoragePort();
 +                        if (r.has("source_port"))
 +                            port = r.getInt("source_port");
 +                        InetAddressAndPort eventNode = InetAddressAndPort.getByAddressOverrideDefaults(r.getInetAddress("source"), port);
 +                        if (source.equals(eventNode))
 +                            continue;
 +                        if ((uuid = r.getUUID("event_id")).timestamp() > (tcur - 1000) * 10000)
 +                            seen[si].add(uuid);
 +                        if (seen[si == 0 ? 1 : 0].contains(uuid))
 +                            continue;
 +                        String message = String.format("%s: %s", r.getInetAddress("source"), r.getString("activity"));
 +                        notification(message);
 +                    }
 +                    tlast = tcur;
 +
 +                    si = si == 0 ? 1 : 0;
 +                    seen[si].clear();
 +                }
 +            }
 +        });
 +    }
 +
 +    private ProgressEvent jmxEvent(ProgressEventType type, String msg)
 +    {
 +        int length = CoordinatorState.State.values().length + 1; // +1 to include completed state
 +        int currentState = state.getCurrentState();
 +        return new ProgressEvent(type, currentState == INIT ? 0 : currentState == COMPLETE ? length : currentState, length, msg);
 +    }
 +
 +    private static final class SkipRepairException extends RuntimeException
 +    {
 +        SkipRepairException(String message)
 +        {
 +            super(message);
 +        }
 +    }
 +
 +    public static final class NeighborsAndRanges
 +    {
 +        final boolean shouldExcludeDeadParticipants;
 +        public final Set<InetAddressAndPort> participants;
 +        public final List<CommonRange> commonRanges;
 +
 +        public NeighborsAndRanges(boolean shouldExcludeDeadParticipants, Set<InetAddressAndPort> participants, List<CommonRange> commonRanges)
 +        {
 +            this.shouldExcludeDeadParticipants = shouldExcludeDeadParticipants;
 +            this.participants = participants;
 +            this.commonRanges = commonRanges;
 +        }
 +
 +        /**
 +         * When in the force mode, removes dead nodes from common ranges (not contained within `allNeighbors`),
 +         * and excludes ranges left without any participants.
 +         * When not in the force mode, this is a no-op.
 +         */
 +        public List<CommonRange> filterCommonRanges(String keyspace, String[] tableNames)
 +        {
 +            if (!shouldExcludeDeadParticipants)
 +            {
 +                return commonRanges;
 +            }
 +            else
 +            {
 +                logger.debug("force flag set, removing dead endpoints if possible");
 +
 +                List<CommonRange> filtered = new ArrayList<>(commonRanges.size());
 +
 +                for (CommonRange commonRange : commonRanges)
 +                {
 +                    Set<InetAddressAndPort> endpoints = ImmutableSet.copyOf(Iterables.filter(commonRange.endpoints, participants::contains));
 +                    Set<InetAddressAndPort> transEndpoints = ImmutableSet.copyOf(Iterables.filter(commonRange.transEndpoints, participants::contains));
 +                    Preconditions.checkState(endpoints.containsAll(transEndpoints), "transEndpoints must be a subset of endpoints");
 +
 +                    // this node is implicitly a participant in this repair, so a single endpoint is ok here
 +                    if (!endpoints.isEmpty())
 +                    {
 +                        Set<InetAddressAndPort> skippedReplicas = Sets.difference(commonRange.endpoints, endpoints);
 +                        skippedReplicas.forEach(endpoint -> logger.info("Removing a dead node {} from repair for ranges {} due to -force", endpoint, commonRange.ranges));
 +                        filtered.add(new CommonRange(endpoints, transEndpoints, commonRange.ranges, !skippedReplicas.isEmpty()));
 +                    }
 +                    else
 +                    {
 +                        logger.warn("Skipping forced repair for ranges {} of tables {} in keyspace {}, as no neighbor nodes are live.",
 +                                    commonRange.ranges, Arrays.asList(tableNames), keyspace);
 +                    }
 +                }
 +                Preconditions.checkState(!filtered.isEmpty(), "Not enough live endpoints for a repair");
 +                return filtered;
 +            }
 +        }
 +    }
 +}
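
The behavioral change in fail() above: a coordinator-side failure now fans a
FailSession message out to every known participant (with retries) instead of
leaving participants to discover the failure on their own, which is how
pending-repair sstables could leak. A simplified sketch of that fan-out, with
SendFn standing in for RepairMessage.sendMessageWithRetries:

    import java.util.Set;
    import java.util.UUID;

    final class FailFanoutSketch
    {
        // Stand-in for the retrying send used in fail() above.
        interface SendFn { void sendWithRetries(UUID sessionId, String participant); }

        // Notify every participant so they can release sstables still tagged
        // with this pending-repair session.
        static void failSession(UUID sessionId, Set<String> participants, SendFn send)
        {
            // participants is null when the failure happened during input
            // processing, before anyone was contacted; nothing to notify.
            if (participants == null)
                return;
            for (String participant : participants)
                send.sendWithRetries(sessionId, participant);
        }
    }
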
diff --cc src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
index 34f20cd708,12fa6c341c..610c03dcbf
--- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
+++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
@@@ -306,9 -258,11 +306,12 @@@ public class RepairMessageVerbHandler i
  
                  case FAILED_SESSION_MSG:
                      FailSession failure = (FailSession) message.payload;
  -                    ActiveRepairService.instance.consistent.coordinated.handleFailSessionMessage(failure);
  -                    ActiveRepairService.instance.consistent.local.handleFailSessionMessage(message.from(), failure);
  -                    ParticipateState p = ActiveRepairService.instance.participate(failure.sessionID);
  +                    sendAck(message);
 ++                    ParticipateState p = ctx.repair().participate(failure.sessionID);
 +                     if (p != null)
 +                         p.phase.fail("Failure message from " + message.from());
  +                    ctx.repair().consistent.coordinated.handleFailSessionMessage(failure);
  +                    ctx.repair().consistent.local.handleFailSessionMessage(message.from(), failure);
                      break;
  
                  case STATUS_REQ:
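
Note the ordering in the FAILED_SESSION_MSG branch above: the ack is sent
before any state is mutated, so a retried delivery (from the coordinator's new
retry loop) stays safe. A minimal idempotent-handler sketch under that
assumption (these types are hypothetical stand-ins, not the real classes):

    import java.util.Map;
    import java.util.UUID;
    import java.util.concurrent.ConcurrentHashMap;

    final class FailSessionHandlerSketch
    {
        static final class Participate
        {
            private volatile String failureReason; // null until the session fails
            // First failure wins; later duplicates are no-ops.
            void fail(String reason) { if (failureReason == null) failureReason = reason; }
        }

        private final Map<UUID, Participate> sessions = new ConcurrentHashMap<>();

        // Ack happens before this (not shown); re-delivery is harmless because
        // failing an already-failed (or unknown) session changes nothing.
        void handleFailSession(UUID sessionId, String from)
        {
            Participate p = sessions.get(sessionId);
            if (p != null)
                p.fail("Failure message from " + from);
        }
    }
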
diff --cc test/distributed/org/apache/cassandra/distributed/test/DistributedRepairUtils.java
index fce67b52d1,441fcadf9b..c71a611c01
--- a/test/distributed/org/apache/cassandra/distributed/test/DistributedRepairUtils.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/DistributedRepairUtils.java
@@@ -167,6 -173,34 +173,34 @@@ public final class DistributedRepairUti
          Assert.assertFalse("Only one repair expected, but found more than one", rs.hasNext());
      }
  
+     public static void assertNoSSTableLeak(ICluster<IInvokableInstance> cluster, String ks, String table)
+     {
+         cluster.forEach(i -> {
+             String name = "node" + i.config().num();
+             i.forceCompact(ks, table); // cleanup happens in compaction, so run before checking
+             i.runOnInstance(() -> {
+                 ColumnFamilyStore cfs = Keyspace.open(ks).getColumnFamilyStore(table);
+                 for (SSTableReader sstable : cfs.getTracker().getView().liveSSTables())
+                 {
+                     TimeUUID pendingRepair = sstable.getSSTableMetadata().pendingRepair;
+                     if (pendingRepair == null)
+                         continue;
 -                    LocalSession session = ActiveRepairService.instance.consistent.local.getSession(pendingRepair);
++                    LocalSession session = ActiveRepairService.instance().consistent.local.getSession(pendingRepair);
+                     // repair may be async, so some participants may still think the repair is active, which means the sstable SHOULD link to it
+                     if (session != null && !session.isCompleted())
+                         continue;
+                     // The session is complete, yet the sstable is not updated... is this still pending in compaction?
+                     if (cfs.getCompactionStrategyManager().hasPendingRepairSSTable(pendingRepair, sstable))
+                         continue;
+                     // compaction does not know about the pending repair... race condition since this check started?
+                     if (sstable.getSSTableMetadata().pendingRepair == null)
+                         continue; // yep, race condition... ignore
+                     throw new AssertionError(String.format("%s had leak detected on sstable %s", name, sstable.descriptor));
+                 }
+             });
+         });
+     }
+ 
      public enum RepairType {
          FULL {
              public String[] append(String... args)
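
A hypothetical way to exercise the new helper from an in-JVM dtest (the repair
invocation and table name here are assumed for illustration only):

    import org.apache.cassandra.distributed.Cluster;
    import org.apache.cassandra.distributed.test.DistributedRepairUtils;

    final class LeakCheckUsageSketch
    {
        // After a repair completes, assert that no live sstable is still
        // pinned to a finished pending-repair session on any node.
        static void verifyNoLeak(Cluster cluster, String keyspace, String table)
        {
            cluster.get(1).nodetoolResult("repair", keyspace, table).asserts().success();
            DistributedRepairUtils.assertNoSSTableLeak(cluster, keyspace, table);
        }
    }
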
diff --cc test/unit/org/apache/cassandra/repair/ConcurrentIrWithPreviewFuzzTest.java
index 54ae644e34,0000000000..4eba7f0e36
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/repair/ConcurrentIrWithPreviewFuzzTest.java
+++ b/test/unit/org/apache/cassandra/repair/ConcurrentIrWithPreviewFuzzTest.java
@@@ -1,107 -1,0 +1,107 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.repair;
 +
 +import java.util.ArrayList;
 +import java.util.List;
 +import java.util.concurrent.TimeUnit;
 +
 +import org.junit.Test;
 +
 +import accord.utils.Gen;
 +import accord.utils.Gens;
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.config.RetrySpec;
 +import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.repair.consistent.LocalSessions;
 +import org.apache.cassandra.repair.state.Completable;
 +import org.apache.cassandra.utils.Closeable;
 +import org.apache.cassandra.utils.FailingBiConsumer;
 +import org.assertj.core.api.Assertions;
 +
 +import static accord.utils.Property.qt;
 +
 +public class ConcurrentIrWithPreviewFuzzTest extends FuzzTestBase
 +{
 +    @Test
 +    public void concurrentIrWithPreview()
 +    {
 +        // to avoid unlucky timing issues, retry until success; given enough retries we should eventually succeed
 +        DatabaseDescriptor.getRepairRetrySpec().maxAttempts = new RetrySpec.MaxAttempt(Integer.MAX_VALUE);
 +        qt().withPure(false).withExamples(1).check(rs -> {
 +            Cluster cluster = new Cluster(rs);
 +            enableMessageFaults(cluster);
 +
 +            Gen<Cluster.Node> coordinatorGen = Gens.pick(cluster.nodes.keySet()).map(cluster.nodes::get);
 +
 +            List<Closeable> closeables = new ArrayList<>();
 +            for (int example = 0; example < 100; example++)
 +            {
 +                Cluster.Node irCoordinator = coordinatorGen.next(rs);
 +                Cluster.Node previewCoordinator = coordinatorGen.next(rs);
 +                RepairCoordinator ir = irCoordinator.repair(KEYSPACE, irOption(rs, irCoordinator, KEYSPACE, ignore -> TABLES));
 +                ir.run();
 +                RepairCoordinator preview = previewCoordinator.repair(KEYSPACE, previewOption(rs, previewCoordinator, KEYSPACE, ignore -> TABLES), false);
 +                preview.run();
 +
 +                closeables.add(cluster.nodes.get(pickParticipant(rs, previewCoordinator, preview)).doValidation(ignore -> (cfs, validator) -> addMismatch(rs, cfs, validator)));
 +                // cause a delay in validation to have more failing previews
 +                closeables.add(cluster.nodes.get(pickParticipant(rs, previewCoordinator, preview)).doValidation(next -> (cfs, validator) -> {
 +                    if (validator.desc.parentSessionId.equals(preview.state.id))
 +                        delayValidation(cluster, ir, next, cfs, validator);
 +                    else next.acceptOrFail(cfs, validator);
 +                }));
 +                // make sure listeners don't leak
 +                closeables.add(LocalSessions::unsafeClearListeners);
 +
 +                cluster.processAll();
 +
 +                // IR will always pass, but preview is what may fail (if the coordinator is the same)
 +                Assertions.assertThat(ir.state.getResult()).describedAs("Unexpected state: %s -> %s; example %d", ir.state, ir.state.getResult(), example).isEqualTo(Completable.Result.success(repairSuccessMessage(ir)));
 +
 +                Assertions.assertThat(preview.state.getResult()).describedAs("Unexpected state: %s; example %d", preview.state, example).isNotNull();
 +
 +                if (irCoordinator == previewCoordinator)
 +                {
 +                    Assertions.assertThat(preview.state.getResult().message).describedAs("Unexpected state: %s -> %s; example %d", preview.state, preview.state.getResult(), example).contains("failed with error An incremental repair with session id");
 +                }
 +                else
 +                {
-                     assertSuccess(example, true, preview);
++                    assertSuccess(cluster, example, true, preview);
 +                }
 +                closeables.forEach(Closeable::close);
 +                closeables.clear();
 +            }
 +        });
 +    }
 +
 +    private void delayValidation(Cluster cluster, RepairCoordinator ir, FailingBiConsumer<ColumnFamilyStore, Validator> next, ColumnFamilyStore cfs, Validator validator)
 +    {
 +        cluster.unorderedScheduled.schedule(() -> {
 +            // make sure to wait for IR to complete...
 +            Completable.Result result = ir.state.getResult();
 +            if (result == null)
 +            {
 +                delayValidation(cluster, ir, next, cfs, validator);
 +                return;
 +            }
 +            next.accept(cfs, validator);
 +        }, 1, TimeUnit.HOURS);
 +    }
 +}
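
delayValidation above "waits" by rescheduling itself until the IR result is
visible, which avoids ever blocking the simulated scheduler. A minimal sketch
of that poll-by-reschedule pattern, with a plain ScheduledExecutorService in
place of the simulated cluster scheduler:

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicReference;

    final class PollByRescheduleSketch
    {
        // Re-enqueue the check until the result appears, then run the action;
        // mirrors delayValidation rescheduling while ir.state.getResult() is null.
        static <T> void whenComplete(ScheduledExecutorService scheduler,
                                     AtomicReference<T> result, Runnable action)
        {
            scheduler.schedule(() -> {
                if (result.get() == null)
                    whenComplete(scheduler, result, action); // not done yet; poll again
                else
                    action.run();
            }, 1, TimeUnit.MILLISECONDS);
        }

        public static void main(String[] args) throws InterruptedException
        {
            ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
            AtomicReference<String> result = new AtomicReference<>();
            whenComplete(scheduler, result, () -> System.out.println("done: " + result.get()));
            result.set("ok");   // some other task finishes the work
            Thread.sleep(100);  // give the poller a chance to observe it
            scheduler.shutdown();
        }
    }
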
diff --cc test/unit/org/apache/cassandra/repair/FailingRepairFuzzTest.java
index 39576c95f0,0000000000..2d1438e030
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/repair/FailingRepairFuzzTest.java
+++ b/test/unit/org/apache/cassandra/repair/FailingRepairFuzzTest.java
@@@ -1,163 -1,0 +1,164 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.repair;
 +
 +import java.util.ArrayList;
 +import java.util.Collections;
 +import java.util.HashSet;
 +import java.util.List;
 +import java.util.Set;
 +import java.util.concurrent.TimeUnit;
 +import java.util.stream.Collectors;
 +
 +import com.google.common.collect.ImmutableList;
 +import org.junit.Test;
 +
 +import accord.utils.Gen;
 +import accord.utils.Gens;
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.config.RetrySpec;
 +import org.apache.cassandra.locator.InetAddressAndPort;
 +import org.apache.cassandra.repair.state.Completable;
 +import org.apache.cassandra.streaming.StreamEventHandler;
 +import org.apache.cassandra.streaming.StreamState;
 +import org.apache.cassandra.utils.Closeable;
 +import org.assertj.core.api.AbstractStringAssert;
 +import org.assertj.core.api.Assertions;
 +
 +import static accord.utils.Property.qt;
 +
 +public class FailingRepairFuzzTest extends FuzzTestBase
 +{
 +    private enum RepairJobStage { VALIDATION, SYNC }
 +
 +    @Test
 +    public void failingRepair()
 +    {
 +        // to avoid unlucky timing issues, retry until success; given enough retries we should eventually succeed
 +        DatabaseDescriptor.getRepairRetrySpec().maxAttempts = new RetrySpec.MaxAttempt(Integer.MAX_VALUE);
 +        Gen<RepairJobStage> stageGen = Gens.enums().all(RepairJobStage.class);
 +        qt().withPure(false).withExamples(10).check(rs -> {
 +            Cluster cluster = new Cluster(rs);
 +            enableMessageFaults(cluster);
 +
 +            Gen<Cluster.Node> coordinatorGen = Gens.pick(cluster.nodes.keySet()).map(cluster.nodes::get);
 +
 +            List<Closeable> closeables = new ArrayList<>();
 +            for (int example = 0; example < 100; example++)
 +            {
 +                Cluster.Node coordinator = coordinatorGen.next(rs);
 +
 +                RepairCoordinator repair = coordinator.repair(KEYSPACE, repairOption(rs, coordinator, KEYSPACE, TABLES), false);
 +                repair.run();
 +                InetAddressAndPort failingAddress = pickParticipant(rs, coordinator, repair);
 +                Cluster.Node failingNode = cluster.nodes.get(failingAddress);
 +                RepairJobStage stage = stageGen.next(rs);
 +                // because of local syncs reaching out to the failing address, a different address may actually be what failed
 +                Set<InetAddressAndPort> syncFailedAddresses = new HashSet<>();
 +                switch (stage)
 +                {
 +                    case VALIDATION:
 +                    {
 +                        closeables.add(failingNode.doValidation((cfs, validator) -> {
 +                            long delayNanos = rs.nextLong(TimeUnit.MILLISECONDS.toNanos(5), TimeUnit.MINUTES.toNanos(1));
 +                            cluster.unorderedScheduled.schedule(() -> validator.fail(new SimulatedFault("Validation failed")), delayNanos, TimeUnit.NANOSECONDS);
 +                        }));
 +                    }
 +                    break;
 +                    case SYNC:
 +                    {
 +                        closeables.add(failingNode.doValidation((cfs, validator) -> addMismatch(rs, cfs, validator)));
 +                        List<InetAddressAndPort> addresses = ImmutableList.<InetAddressAndPort>builder().add(coordinator.addressAndPort).addAll(repair.state.getNeighborsAndRanges().participants).build();
 +                        for (InetAddressAndPort address : addresses)
 +                        {
 +                            
closeables.add(cluster.nodes.get(address).doSync(plan -> {
 +                                long delayNanos = 
rs.nextLong(TimeUnit.SECONDS.toNanos(5), TimeUnit.MINUTES.toNanos(10));
 +                                cluster.unorderedScheduled.schedule(() -> {
 +                                    if (address == failingAddress || 
plan.getCoordinator().getPeers().contains(failingAddress))
 +                                    {
 +                                        syncFailedAddresses.add(address);
 +                                        SimulatedFault fault = new 
SimulatedFault("Sync failed");
 +                                        for (StreamEventHandler handler : 
plan.handlers())
 +                                            handler.onFailure(fault);
 +                                    }
 +                                    else
 +                                    {
 +                                        StreamState success = new 
StreamState(plan.planId(), plan.streamOperation(), Collections.emptySet());
 +                                        for (StreamEventHandler handler : 
plan.handlers())
 +                                            handler.onSuccess(success);
 +                                    }
 +                                }, delayNanos, TimeUnit.NANOSECONDS);
 +                                return null;
 +                            }));
 +                        }
 +                    }
 +                    break;
 +                    default:
 +                        throw new IllegalArgumentException("Unknown stage: " + stage);
 +                }
 +
 +                cluster.processAll();
 +                Assertions.assertThat(repair.state.isComplete()).describedAs("Repair job did not complete, and no work is pending...").isTrue();
 +                Assertions.assertThat(repair.state.getResult().kind).describedAs("Unexpected state: %s -> %s; example %d", repair.state, repair.state.getResult(), example).isEqualTo(Completable.Result.Kind.FAILURE);
 +                switch (stage)
 +                {
 +                    case VALIDATION:
 +                    {
 +                        // Got VALIDATION_REQ failure from Ero2.MJ.N8kkw2.w3iFYdDw.HJiVYC32.mWb.b.xwi3tZ.s5k1l.mb.asTy_7QmQ.Q3.u.kjgh.GKjx.g1aKfkjB.YlyKg9.DyQszn7F.Ox2DMYIph.xlgH.EV.A9yEz2J.l6UHdC.C6FYLXE.J0CNHBH./[4905:e9f:2f00:e418:baac:b8d9:9ff9:6604]:33363: UNKNOWN"
 +                        Assertions.assertThat(repair.state.getResult().message)
 +                                  .describedAs("Unexpected state: %s -> %s; example %d", repair.state, repair.state.getResult(), example)
 +                                  // ValidationResponse with null tree seen
 +                                  .containsAnyOf("Validation failed in " + failingAddress,
 +                                                 // ack was dropped and on retry the participant detected a dup, so it rejected because the task had failed
 +                                                 "Got VALIDATION_REQ failure from " + failingAddress + ": UNKNOWN");
 +                    }
 +                    break;
 +                    case SYNC:
 +                        AbstractStringAssert<?> a = Assertions.assertThat(repair.state.getResult().message).describedAs("Unexpected state: %s -> %s; example %d", repair.state, repair.state.getResult(), example);
 +                        // SymmetricRemoteSyncTask + AsymmetricRemoteSyncTask
 +                        // ... Sync failed between /[81fc:714:2c56:a2d3:faf3:eb7c:e4dd:cb9e]:54401 and /220.3.10.72:21402
 +                        // LocalSyncTask
 +                        // ... failed with error Sync failed
 +                        // Dedup nack, but may be remote or local sync!
 +                        // ... Got SYNC_REQ failure from ...: UNKNOWN
 +                        String failingMsg = repair.state.getResult().message;
 +                        if (failingMsg.contains("Sync failed between"))
 +                        {
 +                            a.contains("Sync failed between").contains(failingAddress.toString());
 +                        }
 +                        else if (failingMsg.contains("Got SYNC_REQ failure from"))
 +                        {
 +                            Assertions.assertThat(syncFailedAddresses).isNotEmpty();
 +                            a.containsAnyOf(syncFailedAddresses.stream().map(s -> "Got SYNC_REQ failure from " + s + ": UNKNOWN").collect(Collectors.toList()).toArray(String[]::new));
 +                        }
 +                        else
 +                        {
 +                            a.contains("failed with error Sync failed");
 +                        }
 +                        break;
 +                    default:
 +                        throw new IllegalArgumentException("Unknown stage: " + stage);
 +                }
++                assertParticipateResult(cluster, repair, Completable.Result.Kind.FAILURE);
 +                closeables.forEach(Closeable::close);
 +                closeables.clear();
 +            }
 +        });
 +    }
 +}
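
Both fuzz tests share one deterministic shape: seed a random source, inject a
fault through the simulated scheduler, drain all pending work, then assert on
the terminal result (including, with this change, the participate states via
assertParticipateResult). A tiny self-contained sketch of that shape, with a
plain queue standing in for the simulated cluster machinery:

    import java.util.ArrayDeque;
    import java.util.Queue;
    import java.util.Random;

    final class FuzzShapeSketch
    {
        public static void main(String[] args)
        {
            Random rs = new Random(42);                     // fixed seed => reproducible examples
            Queue<Runnable> scheduled = new ArrayDeque<>(); // stand-in for cluster.unorderedScheduled

            boolean[] sawFault = new boolean[1];
            if (rs.nextBoolean())                           // randomly decide to inject, like stageGen.next(rs)
                scheduled.add(() -> sawFault[0] = true);    // the injected SimulatedFault

            while (!scheduled.isEmpty())                    // stand-in for cluster.processAll()
                scheduled.poll().run();

            // assert on the terminal state, like the Completable.Result checks above
            System.out.println(sawFault[0] ? "fault fired" : "no fault this example");
        }
    }
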
diff --cc test/unit/org/apache/cassandra/repair/FuzzTestBase.java
index 81b341bed9,0000000000..b389231c68
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/repair/FuzzTestBase.java
+++ b/test/unit/org/apache/cassandra/repair/FuzzTestBase.java
@@@ -1,1448 -1,0 +1,1480 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.repair;
 +
 +import java.io.IOException;
 +import java.net.InetSocketAddress;
 +import java.nio.ByteBuffer;
 +import java.nio.charset.StandardCharsets;
 +import java.sql.Timestamp;
 +import java.util.ArrayList;
 +import java.util.Arrays;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.Comparator;
 +import java.util.EnumSet;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.Iterator;
 +import java.util.LinkedHashSet;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Objects;
 +import java.util.Random;
 +import java.util.Set;
 +import java.util.UUID;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicInteger;
 +import java.util.function.BiFunction;
 +import java.util.function.Consumer;
 +import java.util.function.Function;
 +import java.util.function.LongSupplier;
 +import java.util.function.Supplier;
 +import javax.annotation.Nullable;
 +
 +import com.google.common.collect.Iterables;
 +import com.google.common.collect.Maps;
 +import org.junit.Before;
++import com.google.common.collect.Sets;
++
 +import org.junit.BeforeClass;
 +
 +import accord.utils.DefaultRandom;
 +import accord.utils.Gen;
 +import accord.utils.Gens;
 +import accord.utils.RandomSource;
 +import org.agrona.collections.LongHashSet;
 +import org.apache.cassandra.concurrent.ExecutorBuilder;
 +import org.apache.cassandra.concurrent.ExecutorBuilderFactory;
 +import org.apache.cassandra.concurrent.ExecutorFactory;
 +import org.apache.cassandra.concurrent.ExecutorPlus;
 +import org.apache.cassandra.concurrent.InfiniteLoopExecutor;
 +import org.apache.cassandra.concurrent.Interruptible;
 +import org.apache.cassandra.concurrent.ScheduledExecutorPlus;
 +import org.apache.cassandra.concurrent.SequentialExecutorPlus;
 +import org.apache.cassandra.concurrent.SimulatedExecutorFactory;
 +import org.apache.cassandra.concurrent.Stage;
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.config.UnitConfigOverride;
 +import org.apache.cassandra.cql3.CQLTester;
 +import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.Digest;
 +import org.apache.cassandra.db.SystemKeyspace;
 +import org.apache.cassandra.db.compaction.ICompactionManager;
 +import org.apache.cassandra.db.marshal.EmptyType;
 +import org.apache.cassandra.db.repair.CassandraTableRepairManager;
 +import org.apache.cassandra.db.repair.PendingAntiCompaction;
 +import org.apache.cassandra.dht.Murmur3Partitioner;
 +import org.apache.cassandra.dht.Range;
 +import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.exceptions.RequestFailureReason;
 +import org.apache.cassandra.gms.ApplicationState;
 +import org.apache.cassandra.gms.EndpointState;
 +import org.apache.cassandra.gms.HeartBeatState;
 +import org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
 +import org.apache.cassandra.gms.IFailureDetector;
 +import org.apache.cassandra.gms.IGossiper;
 +import org.apache.cassandra.gms.VersionedValue;
 +import org.apache.cassandra.io.util.DataInputBuffer;
 +import org.apache.cassandra.io.util.DataOutputBuffer;
 +import org.apache.cassandra.locator.IEndpointSnitch;
 +import org.apache.cassandra.locator.InetAddressAndPort;
 +import org.apache.cassandra.locator.LocalStrategy;
 +import org.apache.cassandra.locator.RangesAtEndpoint;
 +import org.apache.cassandra.locator.TokenMetadata;
 +import org.apache.cassandra.net.ConnectionType;
 +import org.apache.cassandra.net.IVerbHandler;
 +import org.apache.cassandra.net.Message;
 +import org.apache.cassandra.net.MessageDelivery;
 +import org.apache.cassandra.net.MessagingService;
 +import org.apache.cassandra.net.RequestCallback;
 +import org.apache.cassandra.repair.messages.RepairMessage;
 +import org.apache.cassandra.repair.messages.RepairOption;
 +import org.apache.cassandra.repair.messages.ValidationResponse;
 +import org.apache.cassandra.repair.state.Completable;
 +import org.apache.cassandra.repair.state.CoordinatorState;
 +import org.apache.cassandra.repair.state.JobState;
++import org.apache.cassandra.repair.state.ParticipateState;
 +import org.apache.cassandra.repair.state.SessionState;
 +import org.apache.cassandra.repair.state.ValidationState;
 +import org.apache.cassandra.schema.KeyspaceMetadata;
 +import org.apache.cassandra.schema.KeyspaceParams;
 +import org.apache.cassandra.schema.SchemaConstants;
 +import org.apache.cassandra.schema.SystemDistributedKeyspace;
 +import org.apache.cassandra.schema.TableId;
 +import org.apache.cassandra.schema.TableMetadata;
 +import org.apache.cassandra.schema.Tables;
 +import org.apache.cassandra.service.ActiveRepairService;
 +import org.apache.cassandra.service.PendingRangeCalculatorService;
 +import org.apache.cassandra.service.StorageService;
 +import org.apache.cassandra.service.paxos.cleanup.PaxosCleanupComplete;
 +import org.apache.cassandra.service.paxos.cleanup.PaxosCleanupHistory;
 +import org.apache.cassandra.service.paxos.cleanup.PaxosCleanupRequest;
 +import org.apache.cassandra.service.paxos.cleanup.PaxosCleanupResponse;
 +import org.apache.cassandra.service.paxos.cleanup.PaxosRepairState;
 +import org.apache.cassandra.service.paxos.cleanup.PaxosFinishPrepareCleanup;
 +import org.apache.cassandra.service.paxos.cleanup.PaxosStartPrepareCleanup;
 +import org.apache.cassandra.streaming.StreamEventHandler;
 +import org.apache.cassandra.streaming.StreamReceiveException;
 +import org.apache.cassandra.streaming.StreamSession;
 +import org.apache.cassandra.streaming.StreamState;
 +import org.apache.cassandra.streaming.StreamingChannel;
 +import org.apache.cassandra.streaming.StreamingDataInputPlus;
 +import org.apache.cassandra.tools.nodetool.Repair;
 +import org.apache.cassandra.utils.AbstractTypeGenerators;
 +import org.apache.cassandra.utils.CassandraGenerators;
 +import org.apache.cassandra.utils.Clock;
 +import org.apache.cassandra.utils.Closeable;
 +import org.apache.cassandra.utils.FailingBiConsumer;
 +import org.apache.cassandra.utils.Generators;
 +import org.apache.cassandra.utils.MBeanWrapper;
 +import org.apache.cassandra.utils.MerkleTree;
 +import org.apache.cassandra.utils.MerkleTrees;
 +import org.apache.cassandra.utils.NoSpamLogger;
++import org.apache.cassandra.utils.TimeUUID;
 +import org.apache.cassandra.utils.concurrent.AsyncPromise;
 +import org.apache.cassandra.utils.concurrent.Future;
 +import org.apache.cassandra.utils.concurrent.ImmediateFuture;
 +import org.apache.cassandra.utils.progress.ProgressEventType;
 +import org.assertj.core.api.Assertions;
 +import org.mockito.Mockito;
 +import org.quicktheories.impl.JavaRandom;
 +
 +import static 
org.apache.cassandra.config.CassandraRelevantProperties.CLOCK_GLOBAL;
 +import static 
org.apache.cassandra.config.CassandraRelevantProperties.ORG_APACHE_CASSANDRA_DISABLE_MBEAN_REGISTRATION;
 +
 +public abstract class FuzzTestBase extends CQLTester.InMemory
 +{
 +    private static final int MISMATCH_NUM_PARTITIONS = 1;
 +    private static final Gen<String> IDENTIFIER_GEN = 
fromQT(Generators.IDENTIFIER_GEN);
 +    private static final Gen<String> KEYSPACE_NAME_GEN = 
fromQT(CassandraGenerators.KEYSPACE_NAME_GEN);
 +    private static final Gen<TableId> TABLE_ID_GEN = 
fromQT(CassandraGenerators.TABLE_ID_GEN);
 +    private static final Gen<InetAddressAndPort> ADDRESS_W_PORT = 
fromQT(CassandraGenerators.INET_ADDRESS_AND_PORT_GEN);
 +
 +    private static boolean SETUP_SCHEMA = false;
 +    static String KEYSPACE;
 +    static List<String> TABLES;
 +
 +    @BeforeClass
 +    public static void setUpClass()
 +    {
 +        ORG_APACHE_CASSANDRA_DISABLE_MBEAN_REGISTRATION.setBoolean(true);
 +        CLOCK_GLOBAL.setString(ClockAccess.class.getName());
 +        // when running in CI an external actor will replace the test configs based on the test type (such as trie, cdc, etc.), which can cause failing
 +        // tests that do not repro with the same seed!  To fix that, go to UnitConfigOverride and update the config type to match the one that failed in
 +        // CI; this should then use the same config, so the seed should reproduce.
 +        UnitConfigOverride.maybeOverrideConfig();
 +
 +        DatabaseDescriptor.daemonInitialization();
 +        DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance); // TODO (coverage): randomly select
 +        DatabaseDescriptor.setLocalDataCenter("test");
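 +        // install a mock streaming channel factory: channels never touch real sockets; any read throws, while send/close succeed immediately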
 +        StreamingChannel.Factory.Global.unsafeSet(new 
StreamingChannel.Factory()
 +        {
 +            private final AtomicInteger counter = new AtomicInteger();
 +
 +            @Override
 +            public StreamingChannel create(InetSocketAddress to, int 
messagingVersion, StreamingChannel.Kind kind) throws IOException
 +            {
 +                StreamingChannel mock = Mockito.mock(StreamingChannel.class);
 +                int id = counter.incrementAndGet();
 +                StreamSession session = Mockito.mock(StreamSession.class);
 +                StreamReceiveException access = new 
StreamReceiveException(session, "mock access rejected");
 +                StreamingDataInputPlus input = 
Mockito.mock(StreamingDataInputPlus.class, invocationOnMock -> {
 +                    throw access;
 +                });
 +                Mockito.doNothing().when(input).close();
 +                Mockito.when(mock.in()).thenReturn(input);
 +                Mockito.when(mock.id()).thenReturn(id);
 +                Mockito.when(mock.peer()).thenReturn(to);
 +                Mockito.when(mock.connectedTo()).thenReturn(to);
 +                
Mockito.when(mock.send(Mockito.any())).thenReturn(ImmediateFuture.success(null));
 +                
Mockito.when(mock.close()).thenReturn(ImmediateFuture.success(null));
 +                return mock;
 +            }
 +        });
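 +        // wrap the global executor factory so threads requested by the streaming subsystem are stubbed out (see startThread below)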
 +        ExecutorFactory delegate = ExecutorFactory.Global.executorFactory();
 +        ExecutorFactory.Global.unsafeSet(new ExecutorFactory()
 +        {
 +            @Override
 +            public LocalAwareSubFactory localAware()
 +            {
 +                return delegate.localAware();
 +            }
 +
 +            @Override
 +            public ScheduledExecutorPlus scheduled(boolean executeOnShutdown, 
String name, int priority, SimulatorSemantics simulatorSemantics)
 +            {
 +                return delegate.scheduled(executeOnShutdown, name, priority, 
simulatorSemantics);
 +            }
 +
 +            private boolean shouldMock()
 +            {
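 +                // look a few frames up the stack; only threads requested from the streaming subsystem are mocked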
 +                return StackWalker.getInstance().walk(frame -> {
 +                    StackWalker.StackFrame caller = 
frame.skip(3).findFirst().get();
 +                    return 
caller.getClassName().startsWith("org.apache.cassandra.streaming.");
 +                });
 +            }
 +
 +            @Override
 +            public Thread startThread(String name, Runnable runnable, 
InfiniteLoopExecutor.Daemon daemon)
 +            {
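 +                // hand back an unstarted Thread so streaming work is silently dropped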
 +                if (shouldMock()) return new Thread();
 +                return delegate.startThread(name, runnable, daemon);
 +            }
 +
 +            @Override
 +            public Interruptible infiniteLoop(String name, Interruptible.Task 
task, InfiniteLoopExecutor.SimulatorSafe simulatorSafe, 
InfiniteLoopExecutor.Daemon daemon, InfiniteLoopExecutor.Interrupts interrupts)
 +            {
 +                return delegate.infiniteLoop(name, task, simulatorSafe, 
daemon, interrupts);
 +            }
 +
 +            @Override
 +            public ThreadGroup newThreadGroup(String name)
 +            {
 +                return delegate.newThreadGroup(name);
 +            }
 +
 +            @Override
 +            public ExecutorBuilderFactory<ExecutorPlus, 
SequentialExecutorPlus> withJmx(String jmxPath)
 +            {
 +                return delegate.withJmx(jmxPath);
 +            }
 +
 +            @Override
 +            public ExecutorBuilder<? extends SequentialExecutorPlus> 
configureSequential(String name)
 +            {
 +                return delegate.configureSequential(name);
 +            }
 +
 +            @Override
 +            public ExecutorBuilder<? extends ExecutorPlus> 
configurePooled(String name, int threads)
 +            {
 +                return delegate.configurePooled(name, threads);
 +            }
 +        });
 +
 +        // makes sure the custom clock is both loaded and in use
 +        if (!(Clock.Global.clock() instanceof ClockAccess)) throw new IllegalStateException("Unable to override clock");
 +
 +        // set the repair rpc timeout high so we don't hit it... this class is mostly testing repair reaching success,
 +        // so we don't want to deal with unlucky histories...
 +        DatabaseDescriptor.setRepairRpcTimeout(TimeUnit.DAYS.toMillis(1));
 +
 +        InMemory.setUpClass();
 +    }
 +
 +    @Before
 +    public void setupSchema()
 +    {
 +        if (SETUP_SCHEMA) return;
 +        SETUP_SCHEMA = true;
 +        // StorageService cannot be mocked out, nor can ColumnFamilyStores, so make sure the keyspace is a "local" keyspace
 +        // to avoid replication, as the peers don't actually exist
 +        schemaChange(String.format("CREATE KEYSPACE %s WITH REPLICATION = 
{'class': '%s'}", SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, 
HackStrat.class.getName()));
 +        for (TableMetadata table : 
SystemDistributedKeyspace.metadata().tables)
 +            schemaChange(table.toCqlString(false, false));
 +
 +        createSchema();
 +    }
 +
 +    protected void cleanupRepairTables()
 +    {
 +        for (String table : Arrays.asList(SystemKeyspace.REPAIRS))
 +            execute(String.format("TRUNCATE %s.%s", 
SchemaConstants.SYSTEM_KEYSPACE_NAME, table));
 +    }
 +
 +    private void createSchema()
 +    {
 +        // The main reason to use random here with a fixed seed is just to 
have a set of tables that are not hard coded.
 +        // The tables will have diversity to them that most likely doesn't 
matter to repair (hence why the tables are shared), but
 +        // is useful just in case some assumptions change.
 +        RandomSource rs = new DefaultRandom(42);
 +        String ks = KEYSPACE_NAME_GEN.next(rs);
 +        List<String> tableNames = 
Gens.lists(IDENTIFIER_GEN).unique().ofSizeBetween(10, 100).next(rs);
 +        JavaRandom qt = new JavaRandom(rs.asJdkRandom());
 +        Tables.Builder tableBuilder = Tables.builder();
 +        List<TableId> ids = 
Gens.lists(TABLE_ID_GEN).unique().ofSize(tableNames.size()).next(rs);
 +        for (int i = 0; i < tableNames.size(); i++)
 +        {
 +            String name = tableNames.get(i);
 +            TableId id = ids.get(i);
 +            TableMetadata tableMetadata = new CassandraGenerators.TableMetadataBuilder()
 +                                          .withKeyspaceName(ks)
 +                                          .withTableName(name)
 +                                          .withTableId(id)
 +                                          .withTableKinds(TableMetadata.Kind.REGULAR)
 +                                          // shouldn't matter, just wanted to avoid UDT as that needs more setup
 +                                          .withDefaultTypeGen(AbstractTypeGenerators.builder()
 +                                                              .withTypeKinds(AbstractTypeGenerators.TypeKind.PRIMITIVE)
 +                                                              .withoutPrimitive(EmptyType.instance)
 +                                                              .build())
 +                                          .build()
 +                                          .generate(qt);
 +            tableBuilder.add(tableMetadata);
 +        }
 +        KeyspaceParams params = KeyspaceParams.simple(3);
 +        KeyspaceMetadata metadata = KeyspaceMetadata.create(ks, params, 
tableBuilder.build());
 +
 +        // create
 +        schemaChange(metadata.toCqlString(false, false));
 +        KEYSPACE = ks;
 +        for (TableMetadata table : metadata.tables)
 +            schemaChange(table.toCqlString(false, false));
 +        TABLES = tableNames;
 +    }
 +
 +    static void enableMessageFaults(Cluster cluster)
 +    {
 +        cluster.allowedMessageFaults(new BiFunction<>()
 +        {
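 +            // remember each request's fault policy by message id, so its responses get the same treatment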
 +            private final LongHashSet noFaults = new LongHashSet();
 +            private final LongHashSet allowDrop = new LongHashSet();
 +
 +            @Override
 +            public Set<Faults> apply(Cluster.Node node, Message<?> message)
 +            {
 +                if (RepairMessage.ALLOWS_RETRY.contains(message.verb()))
 +                {
 +                    allowDrop.add(message.id());
 +                    return Faults.DROPPED;
 +                }
 +                switch (message.verb())
 +                {
 +                    // these messages are not resilient to ephemeral issues
 +                    case STATUS_REQ:
 +                    case STATUS_RSP:
 +                    // paxos repair does not support faults and will cause a 
TIMEOUT error, failing the repair
 +                    case PAXOS2_CLEANUP_COMPLETE_REQ:
 +                    case PAXOS2_CLEANUP_REQ:
 +                    case PAXOS2_CLEANUP_RSP2:
 +                    case PAXOS2_CLEANUP_START_PREPARE_REQ:
 +                    case PAXOS2_CLEANUP_FINISH_PREPARE_REQ:
 +                        noFaults.add(message.id());
 +                        return Faults.NONE;
 +                    default:
 +                        if (noFaults.contains(message.id())) return 
Faults.NONE;
 +                        if (allowDrop.contains(message.id())) return 
Faults.DROPPED;
 +                        // was a new message added and the test not updated?
 +                        IllegalStateException e = new 
IllegalStateException("Verb: " + message.verb());
 +                        cluster.failures.add(e);
 +                        throw e;
 +                }
 +            }
 +        });
 +    }
 +
 +    static void runAndAssertSuccess(Cluster cluster, int example, boolean 
shouldSync, RepairCoordinator repair)
 +    {
 +        cluster.processAll();
-         assertSuccess(example, shouldSync, repair);
++        assertSuccess(cluster, example, shouldSync, repair);
 +    }
 +
-     static void assertSuccess(int example, boolean shouldSync, 
RepairCoordinator repair)
++    static void assertSuccess(Cluster cluster, int example, boolean 
shouldSync, RepairCoordinator repair)
 +    {
 +        Completable.Result result = repair.state.getResult();
 +        Assertions.assertThat(result)
 +                  .describedAs("Expected repair to have completed with 
success, but is still running... %s; example %d", repair.state, 
example).isNotNull()
 +                  .describedAs("Unexpected state: %s -> %s; example %d", 
repair.state, result, 
example).isEqualTo(Completable.Result.success(repairSuccessMessage(repair)));
 +        
Assertions.assertThat(repair.state.getStateTimesMillis().keySet()).isEqualTo(EnumSet.allOf(CoordinatorState.State.class));
 +        Assertions.assertThat(repair.state.getSessions()).isNotEmpty();
 +        boolean shouldSnapshot = repair.state.options.getParallelism() != 
RepairParallelism.PARALLEL
 +                                 && (!repair.state.options.isIncremental() || 
repair.state.options.isPreview());
 +        for (SessionState session : repair.state.getSessions())
 +        {
 +            
Assertions.assertThat(session.getStateTimesMillis().keySet()).isEqualTo(EnumSet.allOf(SessionState.State.class));
 +            Assertions.assertThat(session.getJobs()).isNotEmpty();
 +            for (JobState job : session.getJobs())
 +            {
 +                EnumSet<JobState.State> expected = 
EnumSet.allOf(JobState.State.class);
 +                if (!shouldSnapshot)
 +                {
 +                    expected.remove(JobState.State.SNAPSHOT_START);
 +                    expected.remove(JobState.State.SNAPSHOT_COMPLETE);
 +                }
 +                if (!shouldSync)
 +                {
 +                    expected.remove(JobState.State.STREAM_START);
 +                }
 +                Set<JobState.State> actual = 
job.getStateTimesMillis().keySet();
 +                Assertions.assertThat(actual).isEqualTo(expected);
 +            }
 +        }
++
++        assertParticipateResult(cluster, repair, 
Completable.Result.Kind.SUCCESS);
++    }
++
++    protected static void assertParticipateResult(Cluster cluster, RepairCoordinator repair, Completable.Result.Kind kind)
++    {
++        for (InetAddressAndPort participant : Sets.union(Collections.singleton(repair.ctx.broadcastAddressAndPort()), repair.state.getNeighborsAndRanges().participants))
++        {
++            assertParticipateResult(cluster, participant, repair.state.id, kind);
++        }
++    }
++
++    protected static void assertParticipateResult(Cluster cluster, InetAddressAndPort participant, TimeUUID id, Completable.Result.Kind kind)
++    {
++        Cluster.Node node = cluster.nodes.get(participant);
++        ParticipateState state = node.repair().participate(id);
++        Assertions.assertThat(state).describedAs("Node %s is missing ParticipateState", node).isNotNull();
++        Completable.Result participateResult = state.getResult();
++        Assertions.assertThat(participateResult).describedAs("Node %s still has its ParticipateState pending", node).isNotNull();
++        Assertions.assertThat(participateResult.kind).isEqualTo(kind);
 +    }
 +
 +    static String repairSuccessMessage(RepairCoordinator repair)
 +    {
 +        RepairOption options = repair.state.options;
 +        if (options.isPreview())
 +        {
 +            String suffix;
 +            switch (options.getPreviewKind())
 +            {
 +                case UNREPAIRED:
 +                case ALL:
 +                    suffix = "Previewed data was in sync";
 +                    break;
 +                case REPAIRED:
 +                    suffix = "Repaired data is in sync";
 +                    break;
 +                default:
 +                    throw new IllegalArgumentException("Unexpected preview 
repair kind: " + options.getPreviewKind());
 +            }
 +            return "Repair preview completed successfully; " + suffix;
 +        }
 +        return "Repair completed successfully";
 +    }
 +
 +    InetAddressAndPort pickParticipant(RandomSource rs, Cluster.Node coordinator, RepairCoordinator repair)
 +    {
 +        if (repair.state.isComplete())
 +            throw new IllegalStateException("Repair is completed! " + repair.state.getResult());
 +        List<InetAddressAndPort> participants = new ArrayList<>(repair.state.getNeighborsAndRanges().participants.size() + 1);
 +        if (rs.nextBoolean()) participants.add(coordinator.broadcastAddressAndPort());
 +        participants.addAll(repair.state.getNeighborsAndRanges().participants);
 +        participants.sort(Comparator.naturalOrder());
 +
 +        return rs.pick(participants);
 +    }
 +
 +    static void addMismatch(RandomSource rs, ColumnFamilyStore cfs, Validator 
validator)
 +    {
 +        ValidationState state = validator.state;
 +        int maxDepth = DatabaseDescriptor.getRepairSessionMaxTreeDepth();
 +        state.phase.start(MISMATCH_NUM_PARTITIONS, 1024);
 +
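 +        // build merkle trees just deep enough to isolate the mismatching partitions, capped at the configured max depth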
 +        MerkleTrees trees = new MerkleTrees(cfs.getPartitioner());
 +        for (Range<Token> range : validator.desc.ranges)
 +        {
 +            int depth = (int) 
Math.min(Math.ceil(Math.log(MISMATCH_NUM_PARTITIONS) / Math.log(2)), maxDepth);
 +            trees.addMerkleTree((int) Math.pow(2, depth), range);
 +        }
 +        Set<Token> allTokens = new HashSet<>();
 +        for (Range<Token> range : validator.desc.ranges)
 +        {
 +            Gen<Token> gen = fromQT(CassandraGenerators.tokensInRange(range));
 +            Set<Token> tokens = new LinkedHashSet<>();
 +            for (int i = 0, size = rs.nextInt(1, 10); i < size; i++)
 +            {
 +                for (int attempt = 0; !tokens.add(gen.next(rs)) && attempt < 
5; attempt++)
 +                {
 +                }
 +            }
 +            // tokens may or may not be of the expected size; this depends on 
how wide the range is
 +            for (Token token : tokens)
 +                trees.split(token);
 +            allTokens.addAll(tokens);
 +        }
 +        for (Token token : allTokens)
 +        {
 +            findCorrectRange(trees, token, range -> {
 +                Digest digest = Digest.forValidator();
 +                
digest.update(ByteBuffer.wrap(token.toString().getBytes(StandardCharsets.UTF_8)));
 +                range.addHash(new MerkleTree.RowHash(token, digest.digest(), 
1));
 +            });
 +        }
 +        state.partitionsProcessed++;
 +        state.bytesRead = 1024;
 +        state.phase.sendingTrees();
 +        Stage.ANTI_ENTROPY.execute(() -> {
 +            state.phase.success();
 +            validator.respond(new ValidationResponse(validator.desc, trees));
 +        });
 +    }
 +
 +    private static void findCorrectRange(MerkleTrees trees, Token token, 
Consumer<MerkleTree.TreeRange> fn)
 +    {
 +        MerkleTrees.TreeRangeIterator it = trees.rangeIterator();
 +        while (it.hasNext())
 +        {
 +            MerkleTree.TreeRange next = it.next();
 +            if (next.contains(token))
 +            {
 +                fn.accept(next);
 +                return;
 +            }
 +        }
 +    }
 +
 +    private enum RepairType
 +    {FULL, IR}
 +
 +    private enum PreviewType
 +    {NONE, REPAIRED, UNREPAIRED}
 +
 +    static RepairOption repairOption(RandomSource rs, Cluster.Node 
coordinator, String ks, List<String> tableNames)
 +    {
 +        return repairOption(rs, coordinator, ks, 
Gens.lists(Gens.pick(tableNames)).ofSizeBetween(1, tableNames.size()), 
Gens.enums().all(RepairType.class), Gens.enums().all(PreviewType.class), 
Gens.enums().all(RepairParallelism.class));
 +    }
 +
 +    static RepairOption irOption(RandomSource rs, Cluster.Node coordinator, 
String ks, Gen<List<String>> tablesGen)
 +    {
 +        return repairOption(rs, coordinator, ks, tablesGen, 
Gens.constant(RepairType.IR), Gens.constant(PreviewType.NONE), 
Gens.enums().all(RepairParallelism.class));
 +    }
 +
 +    static RepairOption previewOption(RandomSource rs, Cluster.Node 
coordinator, String ks, Gen<List<String>> tablesGen)
 +    {
 +        return repairOption(rs, coordinator, ks, tablesGen, 
Gens.constant(RepairType.FULL), Gens.constant(PreviewType.REPAIRED), 
Gens.enums().all(RepairParallelism.class));
 +    }
 +
 +    private static RepairOption repairOption(RandomSource rs, Cluster.Node 
coordinator, String ks, Gen<List<String>> tablesGen, Gen<RepairType> 
repairTypeGen, Gen<PreviewType> previewTypeGen, Gen<RepairParallelism> 
repairParallelismGen)
 +    {
 +        List<String> args = new ArrayList<>();
 +        args.add(ks);
 +        args.addAll(tablesGen.next(rs));
 +        args.add("-pr");
 +        RepairType type = repairTypeGen.next(rs);
 +        switch (type)
 +        {
 +            case IR:
 +                // default
 +                break;
 +            case FULL:
 +                args.add("--full");
 +                break;
 +            default:
 +                throw new AssertionError("Unsupported repair type: " + type);
 +        }
 +        PreviewType previewType = previewTypeGen.next(rs);
 +        switch (previewType)
 +        {
 +            case NONE:
 +                break;
 +            case REPAIRED:
 +                args.add("--validate");
 +                break;
 +            case UNREPAIRED:
 +                args.add("--preview");
 +                break;
 +            default:
 +                throw new AssertionError("Unsupported preview type: " + 
previewType);
 +        }
 +        RepairParallelism parallelism = repairParallelismGen.next(rs);
 +        switch (parallelism)
 +        {
 +            case SEQUENTIAL:
 +                args.add("--sequential");
 +                break;
 +            case PARALLEL:
 +                // default
 +                break;
 +            case DATACENTER_AWARE:
 +                args.add("--dc-parallel");
 +                break;
 +            default:
 +                throw new AssertionError("Unknown parallelism: " + 
parallelism);
 +        }
 +        if (rs.nextBoolean()) args.add("--optimise-streams");
 +        RepairOption options = RepairOption.parse(Repair.parseOptionMap(() -> 
"test", args), DatabaseDescriptor.getPartitioner());
 +        if (options.getRanges().isEmpty())
 +        {
 +            if (options.isPrimaryRange())
 +            {
 +                // when repairing only primary range, neither dataCenters nor 
hosts can be set
 +                if (options.getDataCenters().isEmpty() && 
options.getHosts().isEmpty())
 +                    
options.getRanges().addAll(coordinator.getPrimaryRanges(ks));
 +                    // except dataCenters only contain local DC (i.e. -local)
 +                else if (options.isInLocalDCOnly())
 +                    
options.getRanges().addAll(coordinator.getPrimaryRangesWithinDC(ks));
 +                else
 +                    throw new IllegalArgumentException("You need to run 
primary range repair on all nodes in the cluster.");
 +            }
 +            else
 +            {
 +                Iterables.addAll(options.getRanges(), 
coordinator.getLocalReplicas(ks).onlyFull().ranges());
 +            }
 +        }
 +        return options;
 +    }
 +
 +    enum Faults
 +    {
 +        DELAY, DROP;
 +
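 +        // NONE: the message must be delivered as-is; DROPPED: it may be delayed and/or dropped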
 +        public static final Set<Faults> NONE = Collections.emptySet();
 +        public static final Set<Faults> DROPPED = EnumSet.of(DELAY, DROP);
 +    }
 +
 +    private static class Connection
 +    {
 +        final InetAddressAndPort from, to;
 +
 +        private Connection(InetAddressAndPort from, InetAddressAndPort to)
 +        {
 +            this.from = from;
 +            this.to = to;
 +        }
 +
 +        @Override
 +        public boolean equals(Object o)
 +        {
 +            if (this == o) return true;
 +            if (o == null || getClass() != o.getClass()) return false;
 +            Connection that = (Connection) o;
 +            return from.equals(that.from) && to.equals(that.to);
 +        }
 +
 +        @Override
 +        public int hashCode()
 +        {
 +            return Objects.hash(from, to);
 +        }
 +
 +        @Override
 +        public String toString()
 +        {
 +            return "Connection{" + "from=" + from + ", to=" + to + '}';
 +        }
 +    }
 +
 +    interface MessageListener
 +    {
 +        default void preHandle(Cluster.Node node, Message<?> msg) {}
 +    }
 +
 +    static class Cluster
 +    {
 +        private static final FailingBiConsumer<ColumnFamilyStore, Validator> 
DEFAULT_VALIDATION = ValidationManager::doValidation;
 +
 +        final Map<InetAddressAndPort, Node> nodes;
 +        private final IFailureDetector failureDetector = 
Mockito.mock(IFailureDetector.class);
 +        private final IEndpointSnitch snitch = 
Mockito.mock(IEndpointSnitch.class);
 +        private final SimulatedExecutorFactory globalExecutor;
 +        final ScheduledExecutorPlus unorderedScheduled;
 +        final ExecutorPlus orderedExecutor;
 +        private final Gossip gossiper = new Gossip();
 +        private final MBeanWrapper mbean = Mockito.mock(MBeanWrapper.class);
 +        private final List<Throwable> failures = new ArrayList<>();
 +        private final List<MessageListener> listeners = new ArrayList<>();
 +        private final RandomSource rs;
 +        private BiFunction<Node, Message<?>, Set<Faults>> 
allowedMessageFaults = (a, b) -> Collections.emptySet();
 +
 +        private final Map<Connection, LongSupplier> networkLatencies = new 
HashMap<>();
 +        private final Map<Connection, Supplier<Boolean>> networkDrops = new 
HashMap<>();
 +
 +        Cluster(RandomSource rs)
 +        {
 +            ClockAccess.includeThreadAsOwner();
 +            this.rs = rs;
 +            globalExecutor = new SimulatedExecutorFactory(rs, 
fromQT(Generators.TIMESTAMP_GEN.map(Timestamp::getTime)).mapToLong(TimeUnit.MILLISECONDS::toNanos).next(rs));
 +            orderedExecutor = 
globalExecutor.configureSequential("ignore").build();
 +            unorderedScheduled = globalExecutor.scheduled("ignored");
 +
 +            // We run tests in an isolated JVM per class, so not cleaning up is safe... but if that assumption ever changes, we will need to clean up
 +            Stage.ANTI_ENTROPY.unsafeSetExecutor(orderedExecutor);
 +            Stage.MISC.unsafeSetExecutor(orderedExecutor);
 +            Stage.INTERNAL_RESPONSE.unsafeSetExecutor(unorderedScheduled);
 +            
Mockito.when(failureDetector.isAlive(Mockito.any())).thenReturn(true);
 +            Thread expectedThread = Thread.currentThread();
 +            NoSpamLogger.unsafeSetClock(() -> {
 +                if (Thread.currentThread() != expectedThread)
 +                    throw new AssertionError("NoSpamLogger.Clock accessed 
outside of fuzzing...");
 +                return globalExecutor.nanoTime();
 +            });
 +
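 +            // build a small random topology: each node gets a unique address, token, and host id in a randomly picked DC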
 +            int numNodes = rs.nextInt(3, 10);
 +            List<String> dcs = 
Gens.lists(IDENTIFIER_GEN).unique().ofSizeBetween(1, Math.min(10, 
numNodes)).next(rs);
 +            Map<InetAddressAndPort, Node> nodes = 
Maps.newHashMapWithExpectedSize(numNodes);
 +            Gen<Token> tokenGen = 
fromQT(CassandraGenerators.token(DatabaseDescriptor.getPartitioner()));
 +            Gen<UUID> hostIdGen = fromQT(Generators.UUID_RANDOM_GEN);
 +            Set<Token> tokens = new HashSet<>();
 +            Set<UUID> hostIds = new HashSet<>();
 +            for (int i = 0; i < numNodes; i++)
 +            {
 +                InetAddressAndPort addressAndPort = ADDRESS_W_PORT.next(rs);
 +                while (nodes.containsKey(addressAndPort)) addressAndPort = 
ADDRESS_W_PORT.next(rs);
 +                Token token;
 +                while (!tokens.add(token = tokenGen.next(rs)))
 +                {
 +                }
 +                UUID hostId;
 +                while (!hostIds.add(hostId = hostIdGen.next(rs)))
 +                {
 +                }
 +
 +                String dc = rs.pick(dcs);
 +                String rack = "rack";
 +                
Mockito.when(snitch.getDatacenter(Mockito.eq(addressAndPort))).thenReturn(dc);
 +                
Mockito.when(snitch.getRack(Mockito.eq(addressAndPort))).thenReturn(rack);
 +
 +                VersionedValue.VersionedValueFactory valueFactory = new 
VersionedValue.VersionedValueFactory(DatabaseDescriptor.getPartitioner());
 +                EndpointState state = new EndpointState(new 
HeartBeatState(42, 42));
 +                state.addApplicationState(ApplicationState.STATUS, 
valueFactory.normal(Collections.singleton(token)));
 +                state.addApplicationState(ApplicationState.STATUS_WITH_PORT, 
valueFactory.normal(Collections.singleton(token)));
 +                state.addApplicationState(ApplicationState.HOST_ID, 
valueFactory.hostId(hostId));
 +                state.addApplicationState(ApplicationState.TOKENS, 
valueFactory.tokens(Collections.singleton(token)));
 +                state.addApplicationState(ApplicationState.DC, 
valueFactory.datacenter(dc));
 +                state.addApplicationState(ApplicationState.RACK, 
valueFactory.rack(rack));
 +                state.addApplicationState(ApplicationState.RELEASE_VERSION, 
valueFactory.releaseVersion());
 +
 +                gossiper.endpoints.put(addressAndPort, state);
 +
 +                Node node = new Node(hostId, addressAndPort, 
Collections.singletonList(token), new Messaging(addressAndPort));
 +                nodes.put(addressAndPort, node);
 +            }
 +            this.nodes = nodes;
 +
 +            TokenMetadata tm = StorageService.instance.getTokenMetadata();
 +            tm.clearUnsafe();
 +            for (Node inst : nodes.values())
 +            {
 +                tm.updateHostId(inst.hostId(), 
inst.broadcastAddressAndPort());
 +                for (Token token : inst.tokens())
 +                    tm.updateNormalToken(token, 
inst.broadcastAddressAndPort());
 +            }
 +        }
 +
 +        public Closeable addListener(MessageListener listener)
 +        {
 +            listeners.add(listener);
 +            return () -> removeListener(listener);
 +        }
 +
 +        public void removeListener(MessageListener listener)
 +        {
 +            listeners.remove(listener);
 +        }
 +
 +        public void allowedMessageFaults(BiFunction<Node, Message<?>, 
Set<Faults>> fn)
 +        {
 +            this.allowedMessageFaults = fn;
 +        }
 +
 +        public void checkFailures()
 +        {
 +            if (Thread.interrupted())
 +                failures.add(new InterruptedException());
 +            if (failures.isEmpty()) return;
 +            AssertionError error = new AssertionError("Unexpected exceptions 
found");
 +            failures.forEach(error::addSuppressed);
 +            failures.clear();
 +            throw error;
 +        }
 +
 +        public boolean processOne()
 +        {
 +            boolean result = globalExecutor.processOne();
 +            checkFailures();
 +            return result;
 +        }
 +
 +        public void processAll()
 +        {
 +            while (processOne())
 +            {
 +            }
 +        }
 +
 +        private class CallbackContext
 +        {
 +            final RequestCallback callback;
 +
 +            private CallbackContext(RequestCallback callback)
 +            {
 +                this.callback = Objects.requireNonNull(callback);
 +            }
 +
 +            public void onResponse(Message msg)
 +            {
 +                callback.onResponse(msg);
 +            }
 +
 +            public void onFailure(InetAddressAndPort from, 
RequestFailureReason failureReason)
 +            {
 +                if (callback.invokeOnFailure()) callback.onFailure(from, 
failureReason);
 +            }
 +        }
 +
 +        private static class CallbackKey
 +        {
 +            private final long id;
 +            private final InetAddressAndPort peer;
 +
 +            private CallbackKey(long id, InetAddressAndPort peer)
 +            {
 +                this.id = id;
 +                this.peer = peer;
 +            }
 +
 +            @Override
 +            public boolean equals(Object o)
 +            {
 +                if (this == o) return true;
 +                if (o == null || getClass() != o.getClass()) return false;
 +                CallbackKey that = (CallbackKey) o;
 +                return id == that.id && peer.equals(that.peer);
 +            }
 +
 +            @Override
 +            public int hashCode()
 +            {
 +                return Objects.hash(id, peer);
 +            }
 +
 +            @Override
 +            public String toString()
 +            {
 +                return "CallbackKey{" +
 +                       "id=" + id +
 +                       ", peer=" + peer +
 +                       '}';
 +            }
 +        }
 +
 +        private class Messaging implements MessageDelivery
 +        {
 +            final InetAddressAndPort broadcastAddressAndPort;
 +            final Map<CallbackKey, CallbackContext> callbacks = new 
HashMap<>();
 +
 +            private Messaging(InetAddressAndPort broadcastAddressAndPort)
 +            {
 +                this.broadcastAddressAndPort = broadcastAddressAndPort;
 +            }
 +
 +            @Override
 +            public <REQ> void send(Message<REQ> message, InetAddressAndPort 
to)
 +            {
 +                message = message.withFrom(broadcastAddressAndPort);
 +                maybeEnqueue(message, to, null);
 +            }
 +
 +            @Override
 +            public <REQ, RSP> void sendWithCallback(Message<REQ> message, 
InetAddressAndPort to, RequestCallback<RSP> cb)
 +            {
 +                message = message.withFrom(broadcastAddressAndPort);
 +                maybeEnqueue(message, to, cb);
 +            }
 +
 +            @Override
 +            public <REQ, RSP> void sendWithCallback(Message<REQ> message, 
InetAddressAndPort to, RequestCallback<RSP> cb, ConnectionType 
specifyConnection)
 +            {
 +                message = message.withFrom(broadcastAddressAndPort);
 +                maybeEnqueue(message, to, cb);
 +            }
 +
 +            private <REQ, RSP> void maybeEnqueue(Message<REQ> message, 
InetAddressAndPort to, @Nullable RequestCallback<RSP> callback)
 +            {
 +                CallbackContext cb;
 +                if (callback != null)
 +                {
 +                    CallbackKey key = new CallbackKey(message.id(), to);
 +                    if (callbacks.containsKey(key))
 +                        throw new AssertionError("Message id " + message.id() 
+ " to " + to + " already has a callback");
 +                    cb = new CallbackContext(callback);
 +                    callbacks.put(key, cb);
 +                }
 +                else
 +                {
 +                    cb = null;
 +                }
 +                boolean toSelf = this.broadcastAddressAndPort.equals(to);
 +                Node node = nodes.get(to);
 +                Set<Faults> allowedFaults = allowedMessageFaults.apply(node, 
message);
 +                if (allowedFaults.isEmpty())
 +                {
 +                    // enqueue so stack overflow doesn't happen with the 
inlining
 +                    unorderedScheduled.submit(() -> node.handle(message));
 +                }
 +                else
 +                {
 +                    Runnable enqueue = () -> {
 +                        if (!allowedFaults.contains(Faults.DELAY))
 +                        {
 +                            unorderedScheduled.submit(() -> 
node.handle(message));
 +                        }
 +                        else
 +                        {
 +                            if (toSelf) unorderedScheduled.submit(() -> 
node.handle(message));
 +                            else
 +                                unorderedScheduled.schedule(() -> 
node.handle(message), networkJitterNanos(to), TimeUnit.NANOSECONDS);
 +                        }
 +                    };
 +
 +                    if (!allowedFaults.contains(Faults.DROP)) enqueue.run();
 +                    else
 +                    {
 +                        if (!toSelf && networkDrops(to))
 +                        {
 +//                            logger.warn("Dropped message {}", message);
 +                            // drop
 +                        }
 +                        else
 +                        {
 +                            enqueue.run();
 +                        }
 +                    }
 +
 +                    if (cb != null)
 +                    {
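 +                        // simulate expiration: if the callback is still registered when the verb expires, fail it with TIMEOUT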
 +                        unorderedScheduled.schedule(() -> {
 +                            CallbackContext ctx = callbacks.remove(new 
CallbackKey(message.id(), to));
 +                            if (ctx != null)
 +                            {
 +                                assert ctx == cb;
 +                                try
 +                                {
 +                                    ctx.onFailure(to, 
RequestFailureReason.TIMEOUT);
 +                                }
 +                                catch (Throwable t)
 +                                {
 +                                    failures.add(t);
 +                                }
 +                            }
 +                        }, message.verb().expiresAfterNanos(), 
TimeUnit.NANOSECONDS);
 +                    }
 +                }
 +            }
 +
 +            private long networkJitterNanos(InetAddressAndPort to)
 +            {
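 +                // per-connection latency: mostly small (0.5ms-5ms), with occasional runs of large (5ms-5s) delays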
 +                return networkLatencies.computeIfAbsent(new 
Connection(broadcastAddressAndPort, to), ignore -> {
 +                    long min = TimeUnit.MICROSECONDS.toNanos(500);
 +                    long maxSmall = TimeUnit.MILLISECONDS.toNanos(5);
 +                    long max = TimeUnit.SECONDS.toNanos(5);
 +                    LongSupplier small = () -> rs.nextLong(min, maxSmall);
 +                    LongSupplier large = () -> rs.nextLong(maxSmall, max);
 +                    return Gens.bools().runs(rs.nextInt(1, 11) / 100.0D, 
rs.nextInt(3, 15)).mapToLong(b -> b ? large.getAsLong() : 
small.getAsLong()).asLongSupplier(rs);
 +                }).getAsLong();
 +            }
 +
 +            private boolean networkDrops(InetAddressAndPort to)
 +            {
 +                return networkDrops.computeIfAbsent(new 
Connection(broadcastAddressAndPort, to), ignore -> 
Gens.bools().runs(rs.nextInt(1, 11) / 100.0D, rs.nextInt(3, 
15)).asSupplier(rs)).get();
 +            }
 +
 +            @Override
 +            public <REQ, RSP> Future<Message<RSP>> 
sendWithResult(Message<REQ> message, InetAddressAndPort to)
 +            {
 +                AsyncPromise<Message<RSP>> promise = new AsyncPromise<>();
 +                sendWithCallback(message, to, new RequestCallback<RSP>()
 +                {
 +                    @Override
 +                    public void onResponse(Message<RSP> msg)
 +                    {
 +                        promise.trySuccess(msg);
 +                    }
 +
 +                    @Override
 +                    public void onFailure(InetAddressAndPort from, 
RequestFailureReason failureReason)
 +                    {
 +                        promise.tryFailure(new 
MessagingService.FailureResponseException(from, failureReason));
 +                    }
 +
 +                    @Override
 +                    public boolean invokeOnFailure()
 +                    {
 +                        return true;
 +                    }
 +                });
 +                return promise;
 +            }
 +
 +            @Override
 +            public <V> void respond(V response, Message<?> message)
 +            {
 +                send(message.responseWith(response), message.respondTo());
 +            }
 +        }
 +
 +        private class Gossip implements IGossiper
 +        {
 +            private final Map<InetAddressAndPort, EndpointState> endpoints = 
new HashMap<>();
 +
 +            @Override
 +            public void register(IEndpointStateChangeSubscriber subscriber)
 +            {
 +
 +            }
 +
 +            @Override
 +            public void unregister(IEndpointStateChangeSubscriber subscriber)
 +            {
 +
 +            }
 +
 +            @Nullable
 +            @Override
 +            public EndpointState 
getEndpointStateForEndpoint(InetAddressAndPort ep)
 +            {
 +                return endpoints.get(ep);
 +            }
 +
 +            @Override
 +            public void notifyFailureDetector(Map<InetAddressAndPort, 
EndpointState> remoteEpStateMap)
 +            {
 +
 +            }
 +
 +            @Override
 +            public void applyStateLocally(Map<InetAddressAndPort, 
EndpointState> epStateMap)
 +            {
 +                // If we were testing paxos this would be wrong...
 +                // CASSANDRA-18917 added support for simulating Gossip, but gossip issues were found, so that patch couldn't be merged...
 +                // For paxos repair, since we don't care about paxos messages, a no-op is ok for now; but if paxos cleanup
 +                // were ever to be tested, this logic would need to be implemented
 +            }
 +        }
 +
 +        class Node implements SharedContext
 +        {
 +            private final ICompactionManager compactionManager = 
Mockito.mock(ICompactionManager.class);
 +            final UUID hostId;
 +            final InetAddressAndPort addressAndPort;
 +            final Collection<Token> tokens;
 +            final ActiveRepairService activeRepairService;
 +            final IVerbHandler verbHandler;
 +            final Messaging messaging;
 +            final IValidationManager validationManager;
 +            private FailingBiConsumer<ColumnFamilyStore, Validator> 
doValidation = DEFAULT_VALIDATION;
 +            final PaxosRepairState paxosRepairState;
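 +            // default stream executor: report success after a random 5s-10m delay without moving any data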
 +            private final StreamExecutor defaultStreamExecutor = plan -> {
 +                long delayNanos = rs.nextLong(TimeUnit.SECONDS.toNanos(5), 
TimeUnit.MINUTES.toNanos(10));
 +                unorderedScheduled.schedule(() -> {
 +                    StreamState success = new StreamState(plan.planId(), 
plan.streamOperation(), Collections.emptySet());
 +                    for (StreamEventHandler handler : plan.handlers())
 +                        handler.onSuccess(success);
 +                }, delayNanos, TimeUnit.NANOSECONDS);
 +                return null;
 +            };
 +            private StreamExecutor streamExecutor = defaultStreamExecutor;
 +
 +            private Node(UUID hostId, InetAddressAndPort addressAndPort, 
Collection<Token> tokens, Messaging messaging)
 +            {
 +                this.hostId = hostId;
 +                this.addressAndPort = addressAndPort;
 +                this.tokens = tokens;
 +                this.messaging = messaging;
 +                this.activeRepairService = new ActiveRepairService(this);
 +                this.paxosRepairState = new PaxosRepairState(this);
 +                this.validationManager = (cfs, validator) -> 
unorderedScheduled.submit(() -> {
 +                    try
 +                    {
 +                        doValidation.acceptOrFail(cfs, validator);
 +                    }
 +                    catch (Throwable e)
 +                    {
 +                        validator.fail(e);
 +                    }
 +                });
 +                this.verbHandler = new IVerbHandler<>()
 +                {
 +                    private final RepairMessageVerbHandler repairVerbHandler 
= new RepairMessageVerbHandler(Node.this);
 +                    private final 
IVerbHandler<PaxosStartPrepareCleanup.Request> paxosStartPrepareCleanup = 
PaxosStartPrepareCleanup.createVerbHandler(Node.this);
 +                    private final IVerbHandler<PaxosCleanupRequest> 
paxosCleanupRequestIVerbHandler = 
PaxosCleanupRequest.createVerbHandler(Node.this);
 +                    private final IVerbHandler<PaxosCleanupHistory> 
paxosFinishPrepareCleanup = 
PaxosFinishPrepareCleanup.createVerbHandler(Node.this);
 +                    private final IVerbHandler<PaxosCleanupResponse> 
paxosCleanupResponse = PaxosCleanupResponse.createVerbHandler(Node.this);
 +                    private final IVerbHandler<PaxosCleanupComplete.Request> 
paxosCleanupComplete = PaxosCleanupComplete.createVerbHandler(Node.this);
 +                    @Override
 +                    public void doVerb(Message message) throws IOException
 +                    {
 +                        switch (message.verb())
 +                        {
 +                            case PAXOS2_CLEANUP_START_PREPARE_REQ:
 +                                paxosStartPrepareCleanup.doVerb(message);
 +                                break;
 +                            case PAXOS2_CLEANUP_REQ:
 +                                
paxosCleanupRequestIVerbHandler.doVerb(message);
 +                                break;
 +                            case PAXOS2_CLEANUP_FINISH_PREPARE_REQ:
 +                                paxosFinishPrepareCleanup.doVerb(message);
 +                                break;
 +                            case PAXOS2_CLEANUP_RSP2:
 +                                paxosCleanupResponse.doVerb(message);
 +                                break;
 +                            case PAXOS2_CLEANUP_COMPLETE_REQ:
 +                                paxosCleanupComplete.doVerb(message);
 +                                break;
 +                            default:
 +                                repairVerbHandler.doVerb(message);
 +                        }
 +                    }
 +                };
 +
 +                activeRepairService.start();
 +            }
 +
 +            public Closeable 
doValidation(FailingBiConsumer<ColumnFamilyStore, Validator> fn)
 +            {
 +                FailingBiConsumer<ColumnFamilyStore, Validator> previous = 
this.doValidation;
 +                if (previous != DEFAULT_VALIDATION)
 +                    throw new IllegalStateException("Attemptted to override 
validation, but was already overridden");
 +                this.doValidation = fn;
 +                return () -> this.doValidation = previous;
 +            }
 +
 +            public Closeable 
doValidation(Function<FailingBiConsumer<ColumnFamilyStore, Validator>, 
FailingBiConsumer<ColumnFamilyStore, Validator>> fn)
 +            {
 +                FailingBiConsumer<ColumnFamilyStore, Validator> previous = 
this.doValidation;
 +                this.doValidation = fn.apply(previous);
 +                return () -> this.doValidation = previous;
 +            }
 +
 +            public Closeable doSync(StreamExecutor streamExecutor)
 +            {
 +                StreamExecutor previous = this.streamExecutor;
 +                if (previous != defaultStreamExecutor)
 +                    throw new IllegalStateException("Attemptted to override 
sync, but was already overridden");
 +                this.streamExecutor = streamExecutor;
 +                return () -> this.streamExecutor = previous;
 +            }
 +
 +            void handle(Message msg)
 +            {
 +                msg = serde(msg);
 +                if (msg == null)
 +                {
 +                    logger.warn("Got a message that failed to 
serialize/deserialize");
 +                    return;
 +                }
 +                for (MessageListener l : listeners)
 +                    l.preHandle(this, msg);
 +                if (msg.verb().isResponse())
 +                {
 +                    // handle callbacks
 +                    CallbackKey key = new CallbackKey(msg.id(), msg.from());
 +                    if (messaging.callbacks.containsKey(key))
 +                    {
 +                        CallbackContext callback = 
messaging.callbacks.remove(key);
 +                        if (callback == null)
 +                            return;
 +                        try
 +                        {
 +                            if (msg.isFailureResponse())
 +                                callback.onFailure(msg.from(), 
(RequestFailureReason) msg.payload);
 +                            else callback.onResponse(msg);
 +                        }
 +                        catch (Throwable t)
 +                        {
 +                            failures.add(t);
 +                        }
 +                    }
 +                }
 +                else
 +                {
 +                    try
 +                    {
 +                        verbHandler.doVerb(msg);
 +                    }
 +                    catch (Throwable e)
 +                    {
 +                        failures.add(e);
 +                    }
 +                }
 +            }
 +
 +            public UUID hostId()
 +            {
 +                return hostId;
 +            }
 +
 +            @Override
 +            public InetAddressAndPort broadcastAddressAndPort()
 +            {
 +                return addressAndPort;
 +            }
 +
 +            public Collection<Token> tokens()
 +            {
 +                return tokens;
 +            }
 +
 +            public IFailureDetector failureDetector()
 +            {
 +                return failureDetector;
 +            }
 +
 +            @Override
 +            public IEndpointSnitch snitch()
 +            {
 +                return snitch;
 +            }
 +
 +            @Override
 +            public IGossiper gossiper()
 +            {
 +                return gossiper;
 +            }
 +
 +            @Override
 +            public ICompactionManager compactionManager()
 +            {
 +                return compactionManager;
 +            }
 +
 +            public ExecutorFactory executorFactory()
 +            {
 +                return globalExecutor;
 +            }
 +
 +            public ScheduledExecutorPlus optionalTasks()
 +            {
 +                return unorderedScheduled;
 +            }
 +
 +            @Override
 +            public ScheduledExecutorPlus nonPeriodicTasks()
 +            {
 +                return unorderedScheduled;
 +            }
 +
 +            @Override
 +            public ScheduledExecutorPlus scheduledTasks()
 +            {
 +                return unorderedScheduled;
 +            }
 +
 +            @Override
 +            public Supplier<Random> random()
 +            {
 +                return () -> rs.fork().asJdkRandom();
 +            }
 +
 +            public Clock clock()
 +            {
 +                return globalExecutor;
 +            }
 +
 +            public MessageDelivery messaging()
 +            {
 +                return messaging;
 +            }
 +
 +            public MBeanWrapper mbean()
 +            {
 +                return mbean;
 +            }
 +
 +            public RepairCoordinator repair(String ks, RepairOption options)
 +            {
 +                return repair(ks, options, true);
 +            }
 +
 +            public RepairCoordinator repair(String ks, RepairOption options, 
boolean addFailureOnErrorNotification)
 +            {
 +                RepairCoordinator repair = new RepairCoordinator(this, (name, 
tables) -> StorageService.instance.getValidColumnFamilies(false, false, name, 
tables), name -> StorageService.instance.getReplicas(name, 
broadcastAddressAndPort()), 42, options, ks);
 +                if (addFailureOnErrorNotification)
 +                {
 +                    repair.addProgressListener((tag, event) -> {
 +                        if (event.getType() == ProgressEventType.ERROR)
 +                            failures.add(new 
AssertionError(event.getMessage()));
 +                    });
 +                }
 +                return repair;
 +            }
 +
 +            public RangesAtEndpoint getLocalReplicas(String ks)
 +            {
 +                return StorageService.instance.getReplicas(ks, 
broadcastAddressAndPort());
 +            }
 +
 +            public Collection<? extends Range<Token>> getPrimaryRanges(String 
ks)
 +            {
 +                return 
StorageService.instance.getPrimaryRangesForEndpoint(ks, 
broadcastAddressAndPort());
 +            }
 +
 +            public Collection<? extends Range<Token>> 
getPrimaryRangesWithinDC(String ks)
 +            {
 +                return 
StorageService.instance.getPrimaryRangeForEndpointWithinDC(ks, 
broadcastAddressAndPort());
 +            }
 +
 +            @Override
 +            public ActiveRepairService repair()
 +            {
 +                return activeRepairService;
 +            }
 +
 +            @Override
 +            public IValidationManager validationManager()
 +            {
 +                return validationManager;
 +            }
 +
 +            @Override
 +            public TableRepairManager repairManager(ColumnFamilyStore store)
 +            {
 +                return new CassandraTableRepairManager(store, this)
 +                {
 +                    @Override
 +                    public void snapshot(String name, 
Collection<Range<Token>> ranges, boolean force)
 +                    {
 +                        // no-op
 +                    }
 +                };
 +            }
 +
 +            @Override
 +            public StreamExecutor streamExecutor()
 +            {
 +                return streamExecutor;
 +            }
 +
 +            @Override
 +            public PendingRangeCalculatorService pendingRangeCalculator()
 +            {
 +                return PendingRangeCalculatorService.instance;
 +            }
 +
 +            @Override
 +            public PaxosRepairState paxosRepairState()
 +            {
 +                return paxosRepairState;
 +            }
++
++            public String toString()
++            {
++                return "Node{" +
++                       "hostId=" + hostId +
++                       ", addressAndPort=" + addressAndPort.getAddress().getHostAddress() + ":" + addressAndPort.getPort() +
++                       '}';
++            }
 +        }
 +
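 +        // round-trips a message through serialize/deserialize at the current messaging version; any serde error is recorded as a test failure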
 +        private Message serde(Message msg)
 +        {
 +            try (DataOutputBuffer b = DataOutputBuffer.scratchBuffer.get())
 +            {
 +                int messagingVersion = MessagingService.current_version;
 +                Message.serializer.serialize(msg, b, messagingVersion);
 +                DataInputBuffer in = new DataInputBuffer(b.unsafeGetBufferAndFlip(), false);
 +                return Message.serializer.deserialize(in, msg.from(), messagingVersion);
 +            }
 +            catch (Throwable e)
 +            {
 +                failures.add(e);
 +                return null;
 +            }
 +        }
 +    }
 +
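 +    // adapts a quicktheories generator to this test's Gen by viewing the random source as a JDK Random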
 +    private static <T> Gen<T> fromQT(org.quicktheories.core.Gen<T> qt)
 +    {
 +        return rs -> {
 +            JavaRandom r = new JavaRandom(rs.asJdkRandom());
 +            return qt.generate(r);
 +        };
 +    }
 +
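 +    // minimal LocalStrategy subclass; gives the tests a strategy class distinct from the built-in ones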
 +    public static class HackStrat extends LocalStrategy
 +    {
 +        public HackStrat(String keyspaceName, TokenMetadata tokenMetadata, IEndpointSnitch snitch, Map<String, String> configOptions)
 +        {
 +            super(keyspaceName, tokenMetadata, snitch, configOptions);
 +        }
 +    }
 +
 +    /**
 +     * Since the clock is accessible via {@link Global#currentTimeMillis()} and {@link Global#nanoTime()}, and only the repair subsystem has the requirement not to touch it, this class
 +     * acts as a safety check that validates that repair does not touch these methods while allowing the other subsystems to do so.
 +     */
 +    public static class ClockAccess implements Clock
 +    {
 +        private static final Set<Thread> OWNERS = Collections.synchronizedSet(new HashSet<>());
 +        private final Clock delegate = new Default();
 +
 +        public static void includeThreadAsOwner()
 +        {
 +            OWNERS.add(Thread.currentThread());
 +        }
 +
 +        @Override
 +        public long nanoTime()
 +        {
 +            checkAccess();
 +            return delegate.nanoTime();
 +        }
 +
 +        @Override
 +        public long currentTimeMillis()
 +        {
 +            checkAccess();
 +            return delegate.currentTimeMillis();
 +        }
 +
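 +        // outcome of the stack walk: allowed only on registered owner threads, always rejected, or ignored (allowed anywhere)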
 +        private enum Access
 +        {MAIN_THREAD_ONLY, REJECT, IGNORE}
 +
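 +        // classify the caller by walking the stack, starting from the frame that entered through Clock.Global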
 +        private void checkAccess()
 +        {
 +            Access access = StackWalker.getInstance().walk(frames -> {
 +                Iterator<StackWalker.StackFrame> it = frames.iterator();
 +                boolean topLevel = false;
 +                while (it.hasNext())
 +                {
 +                    StackWalker.StackFrame next = it.next();
 +                    if (!topLevel)
 +                    {
 +                        // need to find the top level!
 +                        while (!Clock.Global.class.getName().equals(next.getClassName()))
 +                        {
 +                            assert it.hasNext();
 +                            next = it.next();
 +                        }
 +                        topLevel = true;
 +                        assert it.hasNext();
 +                        next = it.next();
 +                    }
 +                    if (FuzzTestBase.class.getName().equals(next.getClassName())) return Access.MAIN_THREAD_ONLY;
 +                    // this is non-deterministic... but since the scope of the work is testing repair and not paxos... this is unblocked for now...
 +                    if (("org.apache.cassandra.service.paxos.Paxos".equals(next.getClassName()) && "newBallot".equals(next.getMethodName()))
 +                        || ("org.apache.cassandra.service.paxos.uncommitted.PaxosBallotTracker".equals(next.getClassName()) && "updateLowBound".equals(next.getMethodName())))
 +                        return Access.MAIN_THREAD_ONLY;
 +                    if (next.getClassName().startsWith("org.apache.cassandra.db.") || next.getClassName().startsWith("org.apache.cassandra.gms.") || next.getClassName().startsWith("org.apache.cassandra.cql3.") || next.getClassName().startsWith("org.apache.cassandra.metrics.") || next.getClassName().startsWith("org.apache.cassandra.utils.concurrent.")
 +                        || next.getClassName().startsWith("org.apache.cassandra.utils.TimeUUID") // this would be good to solve
 +                        || next.getClassName().startsWith(PendingAntiCompaction.class.getName()))
 +                        return Access.IGNORE;
 +                    if (next.getClassName().startsWith("org.apache.cassandra.repair") || ActiveRepairService.class.getName().startsWith(next.getClassName()))
 +                        return Access.REJECT;
 +                }
 +                return Access.IGNORE;
 +            });
 +            Thread current = Thread.currentThread();
 +            switch (access)
 +            {
 +                case IGNORE:
 +                    return;
 +                case REJECT:
 +                    throw new IllegalStateException("Rejecting access");
 +                case MAIN_THREAD_ONLY:
 +                    if (!OWNERS.contains(current)) throw new IllegalStateException("Accessed in wrong thread: " + current);
 +                    break;
 +            }
 +        }
 +    }
 +
 +    static class SimulatedFault extends RuntimeException
 +    {
 +        SimulatedFault(String message)
 +        {
 +            super(message);
 +        }
 +    }
 +}
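
The ClockAccess guard above reduces to one reusable pattern: classify the
caller by walking the current stack with java.lang.StackWalker. A minimal
standalone sketch of that pattern, not part of this patch (CallerGuard and
rejectCallsFrom are illustrative names):

    public final class CallerGuard
    {
        // Throws if any frame on the current stack belongs to the banned package,
        // mirroring how ClockAccess rejects callers under org.apache.cassandra.repair.
        public static void rejectCallsFrom(String bannedPackagePrefix)
        {
            boolean banned = StackWalker.getInstance().walk(frames ->
                frames.anyMatch(f -> f.getClassName().startsWith(bannedPackagePrefix)));
            if (banned)
                throw new IllegalStateException("Rejecting access from " + bannedPackagePrefix);
        }
    }

    // Usage (hypothetical): CallerGuard.rejectCallsFrom("org.apache.cassandra.repair");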

