This is an automated email from the ASF dual-hosted git repository.

bdeggleston pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9bde713  Fix IR prepare anti-compaction race
9bde713 is described below

commit 9bde713ee8883f70d130efb6290ec0e6daea524f
Author: Stefan Podkowinski <s.podkowin...@gmail.com>
AuthorDate: Fri Feb 15 10:03:47 2019 +0100

    Fix IR prepare anti-compaction race
    
    Patch by Stefan Podkowinski; Reviewed by Blake Eggleston for CASSANDRA-15027
---
 CHANGES.txt                                        |   1 +
 .../cassandra/db/compaction/CompactionManager.java |  50 +++-
 .../db/repair/CassandraKeyspaceRepairManager.java  |   6 +-
 .../cassandra/db/repair/PendingAntiCompaction.java |  27 +-
 .../cassandra/repair/KeyspaceRepairManager.java    |   4 +-
 .../repair/consistent/CoordinatorSession.java      |  94 ++++--
 .../cassandra/repair/consistent/LocalSessions.java |  28 +-
 .../db/compaction/AntiCompactionBytemanTest.java   |   2 +-
 .../db/compaction/AntiCompactionTest.java          |  10 +-
 .../db/compaction/CancelCompactionsTest.java       |   2 +-
 .../repair/PendingAntiCompactionBytemanTest.java   |   2 +-
 .../db/repair/PendingAntiCompactionTest.java       |  45 ++-
 .../apache/cassandra/net/MockMessagingService.java |   8 +
 .../consistent/CoordinatorMessagingTest.java       | 329 +++++++++++++++++++++
 .../repair/consistent/CoordinatorSessionTest.java  |  31 +-
 .../repair/consistent/LocalSessionTest.java        |  42 ++-
 16 files changed, 586 insertions(+), 95 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 02233dd..87d815b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Fix IR prepare anti-compaction race (CASSANDRA-15027)
  * Fix SimpleStrategy option validation (CASSANDRA-15007)
  * Don't try to cancel 2i compactions when starting anticompaction (CASSANDRA-15024)
  * Avoid NPE in RepairRunnable.recordFailure (CASSANDRA-15025)
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 85aff08..b388098 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BooleanSupplier;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import javax.management.openmbean.OpenDataException;
@@ -654,7 +655,8 @@ public class CompactionManager implements CompactionManagerMBean
                                                            RangesAtEndpoint 
tokenRanges,
                                                            Refs<SSTableReader> 
sstables,
                                                            
LifecycleTransaction txn,
-                                                           UUID sessionId)
+                                                           UUID sessionId,
+                                                           BooleanSupplier 
isCancelled)
     {
         Runnable runnable = new WrappedRunnable()
         {
@@ -662,7 +664,7 @@ public class CompactionManager implements CompactionManagerMBean
             {
                 try (TableMetrics.TableTimer.Context ctx = 
cfs.metric.anticompactionTime.time())
                 {
-                    performAnticompaction(cfs, tokenRanges, sstables, txn, 
sessionId);
+                    performAnticompaction(cfs, tokenRanges, sstables, txn, 
sessionId, isCancelled);
                 }
             }
         };
@@ -719,6 +721,7 @@ public class CompactionManager implements CompactionManagerMBean
      * @param replicas token ranges to be repaired
      * @param validatedForRepair SSTables containing the repaired ranges. 
Should be referenced before passing them.
      * @param sessionID the repair session we're anti-compacting for
+     * @param isCancelled function that indicates if active anti-compaction 
should be canceled
      * @throws InterruptedException
      * @throws IOException
      */
@@ -726,7 +729,8 @@ public class CompactionManager implements CompactionManagerMBean
                                       RangesAtEndpoint replicas,
                                       Refs<SSTableReader> validatedForRepair,
                                       LifecycleTransaction txn,
-                                      UUID sessionID) throws IOException
+                                      UUID sessionID,
+                                      BooleanSupplier isCancelled) throws 
IOException
     {
         try
         {
@@ -746,7 +750,7 @@ public class CompactionManager implements CompactionManagerMBean
 
             assert txn.originals().equals(sstables);
             if (!sstables.isEmpty())
-                doAntiCompaction(cfs, replicas, txn, sessionID);
+                doAntiCompaction(cfs, replicas, txn, sessionID, isCancelled);
             txn.finish();
         }
         finally
@@ -1432,11 +1436,14 @@ public class CompactionManager implements CompactionManagerMBean
      * @param txn a transaction over the repaired sstables to anticompact
      * @param ranges full and transient ranges to be placed into one of the 
new sstables. The repaired table will be tracked via
      *   the {@link 
org.apache.cassandra.io.sstable.metadata.StatsMetadata#pendingRepair} field.
+     * @param sessionID the repair session we're anti-compacting for
+     * @param isCancelled function that indicates if active anti-compaction 
should be canceled
      */
     private void doAntiCompaction(ColumnFamilyStore cfs,
                                   RangesAtEndpoint ranges,
                                   LifecycleTransaction txn,
-                                  UUID pendingRepair)
+                                  UUID pendingRepair,
+                                  BooleanSupplier isCancelled)
     {
         int originalCount = txn.originals().size();
         logger.info("Performing anticompaction on {} sstables", originalCount);
@@ -1458,7 +1465,7 @@ public class CompactionManager implements CompactionManagerMBean
         {
             try (LifecycleTransaction groupTxn = txn.split(sstableGroup))
             {
-                int antiCompacted = antiCompactGroup(cfs, ranges, groupTxn, 
pendingRepair);
+                int antiCompacted = antiCompactGroup(cfs, ranges, groupTxn, 
pendingRepair, isCancelled);
                 antiCompactedSSTableCount += antiCompacted;
             }
         }
@@ -1469,9 +1476,10 @@ public class CompactionManager implements CompactionManagerMBean
 
     @VisibleForTesting
     int antiCompactGroup(ColumnFamilyStore cfs,
-                                 RangesAtEndpoint ranges,
-                                 LifecycleTransaction txn,
-                                 UUID pendingRepair)
+                         RangesAtEndpoint ranges,
+                         LifecycleTransaction txn,
+                         UUID pendingRepair,
+                         BooleanSupplier isCancelled)
     {
         Preconditions.checkArgument(!ranges.isEmpty(), "need at least one full 
or transient range");
         long groupMaxDataAge = -1;
@@ -1538,7 +1546,7 @@ public class CompactionManager implements CompactionManagerMBean
 
              AbstractCompactionStrategy.ScannerList scanners = 
strategy.getScanners(txn.originals());
              CompactionController controller = new CompactionController(cfs, 
sstableAsSet, getDefaultGcBefore(cfs, nowInSec));
-             CompactionIterator ci = 
getAntiCompactionIterator(scanners.scanners, controller, nowInSec, 
UUIDGen.getTimeUUID(), active))
+             CompactionIterator ci = 
getAntiCompactionIterator(scanners.scanners, controller, nowInSec, 
UUIDGen.getTimeUUID(), active, isCancelled))
         {
             int expectedBloomFilterSize = 
Math.max(cfs.metadata().params.minIndexInterval, 
(int)(SSTableReader.getApproximateKeyCount(sstableAsSet)));
 
@@ -1601,16 +1609,30 @@ public class CompactionManager implements CompactionManagerMBean
         }
         catch (Throwable e)
         {
-            JVMStabilityInspector.inspectThrowable(e);
-            logger.error("Error anticompacting " + txn, e);
+            if (e instanceof CompactionInterruptedException && 
isCancelled.getAsBoolean())
+            {
+                logger.info("Anticompaction has been canceled for session {}", 
pendingRepair);
+                logger.trace(e.getMessage(), e);
+            }
+            else
+            {
+                JVMStabilityInspector.inspectThrowable(e);
+                logger.error("Error anticompacting " + txn + " for " + 
pendingRepair, e);
+            }
             throw e;
         }
     }
 
     @VisibleForTesting
-    public static CompactionIterator 
getAntiCompactionIterator(List<ISSTableScanner> scanners, CompactionController 
controller, int nowInSec, UUID timeUUID, ActiveCompactionsTracker 
activeCompactions)
+    public static CompactionIterator 
getAntiCompactionIterator(List<ISSTableScanner> scanners, CompactionController 
controller, int nowInSec, UUID timeUUID, ActiveCompactionsTracker 
activeCompactions, BooleanSupplier isCancelled)
     {
-        return new CompactionIterator(OperationType.ANTICOMPACTION, scanners, 
controller, nowInSec, timeUUID, activeCompactions);
+        return new CompactionIterator(OperationType.ANTICOMPACTION, scanners, 
controller, nowInSec, timeUUID, activeCompactions) {
+
+            public boolean isStopRequested()
+            {
+                return super.isStopRequested() || isCancelled.getAsBoolean();
+            }
+        };
     }
 
     @VisibleForTesting
diff --git a/src/java/org/apache/cassandra/db/repair/CassandraKeyspaceRepairManager.java b/src/java/org/apache/cassandra/db/repair/CassandraKeyspaceRepairManager.java
index fa2e653..4fa8650 100644
--- a/src/java/org/apache/cassandra/db/repair/CassandraKeyspaceRepairManager.java
+++ b/src/java/org/apache/cassandra/db/repair/CassandraKeyspaceRepairManager.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.db.repair;
 import java.util.Collection;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
+import java.util.function.BooleanSupplier;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
@@ -42,9 +43,10 @@ public class CassandraKeyspaceRepairManager implements KeyspaceRepairManager
     public ListenableFuture prepareIncrementalRepair(UUID sessionID,
                                                      
Collection<ColumnFamilyStore> tables,
                                                      RangesAtEndpoint 
tokenRanges,
-                                                     ExecutorService executor)
+                                                     ExecutorService executor,
+                                                     BooleanSupplier 
isCancelled)
     {
-        PendingAntiCompaction pac = new PendingAntiCompaction(sessionID, 
tables, tokenRanges, executor);
+        PendingAntiCompaction pac = new PendingAntiCompaction(sessionID, 
tables, tokenRanges, executor, isCancelled);
         return pac.run();
     }
 }
diff --git a/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java b/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java
index d5c3ca0..fac164d 100644
--- a/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java
+++ b/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java
@@ -26,6 +26,7 @@ import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.function.BooleanSupplier;
 import java.util.stream.Collectors;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -243,16 +244,18 @@ public class PendingAntiCompaction
     {
         private final UUID parentRepairSession;
         private final RangesAtEndpoint tokenRanges;
+        private final BooleanSupplier isCancelled;
 
-        public AcquisitionCallback(UUID parentRepairSession, RangesAtEndpoint 
tokenRanges)
+        public AcquisitionCallback(UUID parentRepairSession, RangesAtEndpoint 
tokenRanges, BooleanSupplier isCancelled)
         {
             this.parentRepairSession = parentRepairSession;
             this.tokenRanges = tokenRanges;
+            this.isCancelled = isCancelled;
         }
 
         ListenableFuture<?> submitPendingAntiCompaction(AcquireResult result)
         {
-            return 
CompactionManager.instance.submitPendingAntiCompaction(result.cfs, tokenRanges, 
result.refs, result.txn, parentRepairSession);
+            return 
CompactionManager.instance.submitPendingAntiCompaction(result.cfs, tokenRanges, 
result.refs, result.txn, parentRepairSession, isCancelled);
         }
 
         private static boolean shouldAbort(AcquireResult result)
@@ -312,22 +315,25 @@ public class PendingAntiCompaction
     private final ExecutorService executor;
     private final int acquireRetrySeconds;
     private final int acquireSleepMillis;
+    private final BooleanSupplier isCancelled;
 
     public PendingAntiCompaction(UUID prsId,
                                  Collection<ColumnFamilyStore> tables,
                                  RangesAtEndpoint tokenRanges,
-                                 ExecutorService executor)
+                                 ExecutorService executor,
+                                 BooleanSupplier isCancelled)
     {
-        this(prsId, tables, tokenRanges, ACQUIRE_RETRY_SECONDS, 
ACQUIRE_SLEEP_MS, executor);
+        this(prsId, tables, tokenRanges, ACQUIRE_RETRY_SECONDS, 
ACQUIRE_SLEEP_MS, executor, isCancelled);
     }
 
     @VisibleForTesting
     PendingAntiCompaction(UUID prsId,
-                                 Collection<ColumnFamilyStore> tables,
-                                 RangesAtEndpoint tokenRanges,
-                                 int acquireRetrySeconds,
-                                 int acquireSleepMillis,
-                                 ExecutorService executor)
+                          Collection<ColumnFamilyStore> tables,
+                          RangesAtEndpoint tokenRanges,
+                          int acquireRetrySeconds,
+                          int acquireSleepMillis,
+                          ExecutorService executor,
+                          BooleanSupplier isCancelled)
     {
         this.prsId = prsId;
         this.tables = tables;
@@ -335,6 +341,7 @@ public class PendingAntiCompaction
         this.executor = executor;
         this.acquireRetrySeconds = acquireRetrySeconds;
         this.acquireSleepMillis = acquireSleepMillis;
+        this.isCancelled = isCancelled;
     }
 
     public ListenableFuture run()
@@ -361,6 +368,6 @@ public class PendingAntiCompaction
     @VisibleForTesting
     protected AcquisitionCallback getAcquisitionCallback(UUID prsId, 
RangesAtEndpoint tokenRanges)
     {
-        return new AcquisitionCallback(prsId, tokenRanges);
+        return new AcquisitionCallback(prsId, tokenRanges, isCancelled);
     }
 }
diff --git a/src/java/org/apache/cassandra/repair/KeyspaceRepairManager.java b/src/java/org/apache/cassandra/repair/KeyspaceRepairManager.java
index bc614dc..0739f10 100644
--- a/src/java/org/apache/cassandra/repair/KeyspaceRepairManager.java
+++ b/src/java/org/apache/cassandra/repair/KeyspaceRepairManager.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.repair;
 import java.util.Collection;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
+import java.util.function.BooleanSupplier;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
@@ -40,5 +41,6 @@ public interface KeyspaceRepairManager
     ListenableFuture prepareIncrementalRepair(UUID sessionID,
                                               Collection<ColumnFamilyStore> 
tables,
                                               RangesAtEndpoint tokenRanges,
-                                              ExecutorService executor);
+                                              ExecutorService executor,
+                                              BooleanSupplier isCancelled);
 }
diff --git a/src/java/org/apache/cassandra/repair/consistent/CoordinatorSession.java b/src/java/org/apache/cassandra/repair/consistent/CoordinatorSession.java
index f52a28d..b921342 100644
--- a/src/java/org/apache/cassandra/repair/consistent/CoordinatorSession.java
+++ b/src/java/org/apache/cassandra/repair/consistent/CoordinatorSession.java
@@ -26,6 +26,7 @@ import java.util.function.Supplier;
 
 import javax.annotation.Nullable;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterables;
 import com.google.common.util.concurrent.AsyncFunction;
@@ -95,6 +96,12 @@ public class CoordinatorSession extends ConsistentSession
         super.setState(state);
     }
 
+    @VisibleForTesting
+    synchronized State getParticipantState(InetAddressAndPort participant)
+    {
+        return participantStates.get(participant);
+    }
+
     public synchronized void setParticipantState(InetAddressAndPort 
participant, State state)
     {
         logger.trace("Setting participant {} to state {} for repair {}", 
participant, state, sessionID);
@@ -153,25 +160,31 @@ public class CoordinatorSession extends ConsistentSession
 
     public synchronized void handlePrepareResponse(InetAddressAndPort 
participant, boolean success)
     {
-        if (getState() == State.FAILED)
-        {
-            logger.trace("Incremental repair session {} has failed, ignoring 
prepare response from {}", sessionID, participant);
-        }
-        else if (!success)
+        if (!success)
         {
-            logger.debug("{} failed the prepare phase for incremental repair 
session {}. Aborting session", participant, sessionID);
-            fail();
-            prepareFuture.set(false);
+            logger.debug("{} failed the prepare phase for incremental repair 
session {}", participant, sessionID);
+            sendFailureMessageToParticipants();
+            setParticipantState(participant, State.FAILED);
         }
         else
         {
             logger.trace("Successful prepare response received from {} for 
repair session {}", participant, sessionID);
             setParticipantState(participant, State.PREPARED);
-            if (getState() == State.PREPARED)
-            {
-                logger.debug("Incremental repair session {} successfully 
prepared.", sessionID);
-                prepareFuture.set(true);
-            }
+        }
+
+        // don't progress until we've heard from all replicas
+        if(Iterables.any(participantStates.values(), v -> v == 
State.PREPARING))
+            return;
+
+        if (getState() == State.PREPARED)
+        {
+            logger.info("Incremental repair session {} successfully 
prepared.", sessionID);
+            prepareFuture.set(true);
+        }
+        else
+        {
+            fail();
+            prepareFuture.set(false);
         }
     }
 
@@ -229,9 +242,8 @@ public class CoordinatorSession extends ConsistentSession
         logger.info("Incremental repair session {} completed", sessionID);
     }
 
-    public synchronized void fail()
+    private void sendFailureMessageToParticipants()
     {
-        logger.info("Incremental repair session {} failed", sessionID);
         FailSession message = new FailSession(sessionID);
         for (final InetAddressAndPort participant : participants)
         {
@@ -240,6 +252,12 @@ public class CoordinatorSession extends ConsistentSession
                 sendMessage(participant, message);
             }
         }
+    }
+
+    public synchronized void fail()
+    {
+        logger.info("Incremental repair session {} failed", sessionID);
+        sendFailureMessageToParticipants();
         setAll(State.FAILED);
 
         String exceptionMsg = String.format("Incremental repair session %s has 
failed", sessionID);
@@ -312,41 +330,59 @@ public class CoordinatorSession extends ConsistentSession
             }
         }, MoreExecutors.directExecutor());
 
+        // return execution result as set by following callback
+        SettableFuture<Boolean> resultFuture = SettableFuture.create();
+
         // commit repaired data
         Futures.addCallback(proposeFuture, new FutureCallback<Boolean>()
         {
             public void onSuccess(@Nullable Boolean result)
             {
-                if (result != null && result)
+                try
                 {
-                    if (logger.isDebugEnabled())
+                    if (result != null && result)
                     {
-                        logger.debug("Incremental repair {} finalization phase 
completed in {}", sessionID, formatDuration(finalizeStart, 
System.currentTimeMillis()));
+                        if (logger.isDebugEnabled())
+                        {
+                            logger.debug("Incremental repair {} finalization 
phase completed in {}", sessionID, formatDuration(finalizeStart, 
System.currentTimeMillis()));
+                        }
+                        finalizeCommit();
+                        if (logger.isDebugEnabled())
+                        {
+                            logger.debug("Incremental repair {} phase 
completed in {}", sessionID, formatDuration(sessionStart, 
System.currentTimeMillis()));
+                        }
                     }
-                    finalizeCommit();
-                    if (logger.isDebugEnabled())
+                    else
                     {
-                        logger.debug("Incremental repair {} phase completed in 
{}", sessionID, formatDuration(sessionStart, System.currentTimeMillis()));
+                        hasFailure.set(true);
+                        fail();
                     }
+                    resultFuture.set(result);
                 }
-                else
+                catch (Exception e)
                 {
-                    hasFailure.set(true);
-                    fail();
+                    resultFuture.setException(e);
                 }
             }
 
             public void onFailure(Throwable t)
             {
-                if (logger.isDebugEnabled())
+                try
+                {
+                    if (logger.isDebugEnabled())
+                    {
+                        logger.debug("Incremental repair {} phase failed in 
{}", sessionID, formatDuration(sessionStart, System.currentTimeMillis()));
+                    }
+                    hasFailure.set(true);
+                    fail();
+                }
+                finally
                 {
-                    logger.debug("Incremental repair {} phase failed in {}", 
sessionID, formatDuration(sessionStart, System.currentTimeMillis()));
+                    resultFuture.setException(t);
                 }
-                hasFailure.set(true);
-                fail();
             }
         });
 
-        return proposeFuture;
+        return resultFuture;
     }
 }
diff --git a/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java b/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
index c28391a..c39c4e6 100644
--- a/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
+++ b/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
@@ -33,6 +33,7 @@ import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.function.BooleanSupplier;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
@@ -523,11 +524,17 @@ public class LocalSessions
 
     public void failSession(UUID sessionID, boolean sendMessage)
     {
-        logger.info("Failing local repair session {}", sessionID);
         LocalSession session = getSession(sessionID);
         if (session != null)
         {
-            setStateAndSave(session, FAILED);
+            synchronized (session)
+            {
+                if (session.getState() != FAILED)
+                {
+                    logger.info("Failing local repair session {}", sessionID);
+                    setStateAndSave(session, FAILED);
+                }
+            }
             if (sendMessage)
             {
                 sendMessage(session.coordinator, new FailSession(sessionID));
@@ -550,9 +557,10 @@ public class LocalSessions
                                     UUID sessionID,
                                     Collection<ColumnFamilyStore> tables,
                                     RangesAtEndpoint tokenRanges,
-                                    ExecutorService executor)
+                                    ExecutorService executor,
+                                    BooleanSupplier isCancelled)
     {
-        return repairManager.prepareIncrementalRepair(sessionID, tables, 
tokenRanges, executor);
+        return repairManager.prepareIncrementalRepair(sessionID, tables, 
tokenRanges, executor, isCancelled);
     }
 
     RangesAtEndpoint filterLocalRanges(String keyspace, Set<Range<Token>> 
ranges)
@@ -599,8 +607,8 @@ public class LocalSessions
         }
         catch (Throwable e)
         {
-            logger.trace("Error retrieving ParentRepairSession for session {}, 
responding with failure", sessionID);
-            sendMessage(coordinator, new FailSession(sessionID));
+            logger.error("Error retrieving ParentRepairSession for session {}, 
responding with failure", sessionID);
+            sendMessage(coordinator, new PrepareConsistentResponse(sessionID, 
getBroadcastAddressAndPort(), false));
             return;
         }
 
@@ -612,7 +620,8 @@ public class LocalSessions
 
         KeyspaceRepairManager repairManager = 
parentSession.getKeyspace().getRepairManager();
         RangesAtEndpoint tokenRanges = 
filterLocalRanges(parentSession.getKeyspace().getName(), 
parentSession.getRanges());
-        ListenableFuture repairPreparation = prepareSession(repairManager, 
sessionID, parentSession.getColumnFamilyStores(), tokenRanges, executor);
+        ListenableFuture repairPreparation = prepareSession(repairManager, 
sessionID, parentSession.getColumnFamilyStores(),
+                                                            tokenRanges, 
executor, () -> session.getState() != PREPARING);
 
         Futures.addCallback(repairPreparation, new FutureCallback<Object>()
         {
@@ -620,7 +629,7 @@ public class LocalSessions
             {
                 try
                 {
-                    logger.debug("Prepare phase for incremental repair session 
{} completed", sessionID);
+                    logger.info("Prepare phase for incremental repair session 
{} completed", sessionID);
                     if (session.getState() != FAILED)
                     {
                         setStateAndSave(session, PREPARED);
@@ -628,7 +637,8 @@ public class LocalSessions
                     }
                     else
                     {
-                        logger.debug("Session {} failed before anticompaction 
completed", sessionID);
+                        logger.info("Session {} failed before anticompaction 
completed", sessionID);
+                        sendMessage(coordinator, new 
PrepareConsistentResponse(sessionID, getBroadcastAddressAndPort(), false));
                     }
                 }
                 finally
diff --git a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionBytemanTest.java b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionBytemanTest.java
index 31187c7..1673d01 100644
--- a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionBytemanTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionBytemanTest.java
@@ -125,7 +125,7 @@ public class AntiCompactionBytemanTest extends CQLTester
 
         try (LifecycleTransaction txn = 
getCurrentColumnFamilyStore().getTracker().tryModify(getCurrentColumnFamilyStore().getLiveSSTables(),
 OperationType.ANTICOMPACTION))
         {
-            
CompactionManager.instance.antiCompactGroup(getCurrentColumnFamilyStore(), 
ranges, txn, UUID.randomUUID());
+            
CompactionManager.instance.antiCompactGroup(getCurrentColumnFamilyStore(), 
ranges, txn, UUID.randomUUID(), () -> false);
         }
         finished.set(true);
         t.join();
diff --git a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
index 08609f3..7b5164a 100644
--- a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
@@ -150,7 +150,7 @@ public class AntiCompactionTest
             if (txn == null)
                 throw new IllegalStateException();
             registerParentRepairSession(sessionID, ranges.ranges(), 
FBUtilities.nowInSeconds(), sessionID);
-            CompactionManager.instance.performAnticompaction(store, ranges, 
refs, txn, sessionID);
+            CompactionManager.instance.performAnticompaction(store, ranges, 
refs, txn, sessionID, () -> false);
         }
 
         SSTableStats stats = new SSTableStats();
@@ -251,7 +251,7 @@ public class AntiCompactionTest
         try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, 
OperationType.ANTICOMPACTION);
              Refs<SSTableReader> refs = Refs.ref(sstables))
         {
-            CompactionManager.instance.performAnticompaction(cfs, 
atEndpoint(ranges, NO_RANGES), refs, txn, parentRepairSession);
+            CompactionManager.instance.performAnticompaction(cfs, 
atEndpoint(ranges, NO_RANGES), refs, txn, parentRepairSession, () -> false);
         }
         long sum = 0;
         long rows = 0;
@@ -381,7 +381,7 @@ public class AntiCompactionTest
         try (LifecycleTransaction txn = store.getTracker().tryModify(sstables, 
OperationType.ANTICOMPACTION);
              Refs<SSTableReader> refs = Refs.ref(sstables))
         {
-            CompactionManager.instance.performAnticompaction(store, 
atEndpoint(ranges, NO_RANGES), refs, txn, pendingRepair);
+            CompactionManager.instance.performAnticompaction(store, 
atEndpoint(ranges, NO_RANGES), refs, txn, pendingRepair, () -> false);
         }
 
         assertThat(store.getLiveSSTables().size(), is(1));
@@ -415,7 +415,7 @@ public class AntiCompactionTest
         try (LifecycleTransaction txn = store.getTracker().tryModify(sstables, 
OperationType.ANTICOMPACTION);
              Refs<SSTableReader> refs = Refs.ref(sstables))
         {
-            CompactionManager.instance.performAnticompaction(store, 
atEndpoint(ranges, NO_RANGES), refs, txn, parentRepairSession);
+            CompactionManager.instance.performAnticompaction(store, 
atEndpoint(ranges, NO_RANGES), refs, txn, parentRepairSession, () -> false);
         }
         catch (IllegalStateException e)
         {
@@ -485,7 +485,7 @@ public class AntiCompactionTest
             Assert.assertFalse(refs.isEmpty());
             try
             {
-                CompactionManager.instance.performAnticompaction(store, 
atEndpoint(ranges, NO_RANGES), refs, txn, missingRepairSession);
+                CompactionManager.instance.performAnticompaction(store, 
atEndpoint(ranges, NO_RANGES), refs, txn, missingRepairSession, () -> false);
                 Assert.fail("expected RuntimeException");
             }
             catch (RuntimeException e)
diff --git a/test/unit/org/apache/cassandra/db/compaction/CancelCompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/CancelCompactionsTest.java
index cb4ef4a..976180c 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CancelCompactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CancelCompactionsTest.java
@@ -259,7 +259,7 @@ public class CancelCompactionsTest extends CQLTester
             InetAddressAndPort local = 
FBUtilities.getBroadcastAddressAndPort();
             RangesAtEndpoint rae = RangesAtEndpoint.builder(local).add(new 
Replica(local, range, true)).build();
 
-            PendingAntiCompaction pac = new PendingAntiCompaction(prsid, 
Collections.singleton(cfs), rae, Executors.newSingleThreadExecutor());
+            PendingAntiCompaction pac = new PendingAntiCompaction(prsid, 
Collections.singleton(cfs), rae, Executors.newSingleThreadExecutor(), () -> 
false);
             Future<?> fut = pac.run();
             Thread.sleep(600);
             List<TestCompactionTask> toAbort = new ArrayList<>();
diff --git 
a/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionBytemanTest.java
 
b/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionBytemanTest.java
index 2f2612a..9271866 100644
--- 
a/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionBytemanTest.java
+++ 
b/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionBytemanTest.java
@@ -67,7 +67,7 @@ public class PendingAntiCompactionBytemanTest extends 
AbstractPendingAntiCompact
         UUID prsid = prepareSession();
         try
         {
-            PendingAntiCompaction pac = new PendingAntiCompaction(prsid, 
Lists.newArrayList(cfs, cfs2), atEndpoint(ranges, NO_RANGES), es);
+            PendingAntiCompaction pac = new PendingAntiCompaction(prsid, 
Lists.newArrayList(cfs, cfs2), atEndpoint(ranges, NO_RANGES), es, () -> false);
             pac.run().get();
             fail("PAC should throw exception when anticompaction throws 
exception!");
         }
diff --git 
a/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java 
b/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java
index e44c697..b140813 100644
--- a/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java
+++ b/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java
@@ -36,6 +36,7 @@ import java.util.stream.Collectors;
 
 import javax.annotation.Nullable;
 
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
@@ -94,7 +95,7 @@ public class PendingAntiCompactionTest extends 
AbstractPendingAntiCompactionTest
     {
         public InstrumentedAcquisitionCallback(UUID parentRepairSession, 
RangesAtEndpoint ranges)
         {
-            super(parentRepairSession, ranges);
+            super(parentRepairSession, ranges, () -> false);
         }
 
         Set<TableId> submittedCompactions = new HashSet<>();
@@ -142,7 +143,7 @@ public class PendingAntiCompactionTest extends 
AbstractPendingAntiCompactionTest
         ExecutorService executor = Executors.newSingleThreadExecutor();
         try
         {
-            pac = new PendingAntiCompaction(sessionID, tables, 
atEndpoint(ranges, NO_RANGES), executor);
+            pac = new PendingAntiCompaction(sessionID, tables, 
atEndpoint(ranges, NO_RANGES), executor, () -> false);
             pac.run().get();
         }
         finally
@@ -371,10 +372,36 @@ public class PendingAntiCompactionTest extends 
AbstractPendingAntiCompactionTest
                                                                  true,0,
                                                                  true,
                                                                  
PreviewKind.NONE);
-        CompactionManager.instance.performAnticompaction(result.cfs, 
atEndpoint(FULL_RANGE, NO_RANGES), result.refs, result.txn, sessionID);
+        CompactionManager.instance.performAnticompaction(result.cfs, 
atEndpoint(FULL_RANGE, NO_RANGES), result.refs, result.txn, sessionID, () -> 
false);
 
     }
 
+    @Test (expected = CompactionInterruptedException.class)
+    public void cancelledAntiCompaction() throws Exception
+    {
+        cfs.disableAutoCompaction();
+        makeSSTables(1);
+
+        PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new 
PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, 
UUIDGen.getTimeUUID(), 0, 0);
+        PendingAntiCompaction.AcquireResult result = 
acquisitionCallable.call();
+        UUID sessionID = UUIDGen.getTimeUUID();
+        ActiveRepairService.instance.registerParentRepairSession(sessionID,
+                                                                 
InetAddressAndPort.getByName("127.0.0.1"),
+                                                                 
Lists.newArrayList(cfs),
+                                                                 FULL_RANGE,
+                                                                 true,0,
+                                                                 true,
+                                                                 
PreviewKind.NONE);
+
+        // attempt to anti-compact the sstable in half
+        SSTableReader sstable = 
Iterables.getOnlyElement(cfs.getLiveSSTables());
+        Token left = cfs.getPartitioner().midpoint(sstable.first.getToken(), 
sstable.last.getToken());
+        Token right = sstable.last.getToken();
+        CompactionManager.instance.performAnticompaction(result.cfs,
+                                                         
atEndpoint(Collections.singleton(new Range<>(left, right)), NO_RANGES),
+                                                         result.refs, 
result.txn, sessionID, () -> true);
+    }
+
     /**
      * Makes sure that PendingAntiCompaction fails when anticompaction throws 
exception
      */
@@ -385,11 +412,11 @@ public class PendingAntiCompactionTest extends 
AbstractPendingAntiCompactionTest
         makeSSTables(2);
         UUID prsid = UUID.randomUUID();
         ListeningExecutorService es = 
MoreExecutors.listeningDecorator(MoreExecutors.newDirectExecutorService());
-        PendingAntiCompaction pac = new PendingAntiCompaction(prsid, 
Collections.singleton(cfs), atEndpoint(FULL_RANGE, NO_RANGES), es) {
+        PendingAntiCompaction pac = new PendingAntiCompaction(prsid, 
Collections.singleton(cfs), atEndpoint(FULL_RANGE, NO_RANGES), es, () -> false) 
{
             @Override
             protected AcquisitionCallback getAcquisitionCallback(UUID prsId, 
RangesAtEndpoint tokenRanges)
             {
-                return new AcquisitionCallback(prsid, tokenRanges)
+                return new AcquisitionCallback(prsid, tokenRanges, () -> false)
                 {
                     @Override
                     ListenableFuture<?> 
submitPendingAntiCompaction(AcquireResult result)
@@ -431,13 +458,13 @@ public class PendingAntiCompactionTest extends 
AbstractPendingAntiCompactionTest
         {
             try (LifecycleTransaction txn = 
cfs.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION);
                  CompactionController controller = new 
CompactionController(cfs, sstables, 0);
-                 CompactionIterator ci = 
CompactionManager.getAntiCompactionIterator(scanners, controller, 0, 
UUID.randomUUID(), CompactionManager.instance.active))
+                 CompactionIterator ci = 
CompactionManager.getAntiCompactionIterator(scanners, controller, 0, 
UUID.randomUUID(), CompactionManager.instance.active, () -> false))
             {
                 // `ci` is our imaginary ongoing anticompaction which makes no 
progress until after 30s
                 // now we try to start a new AC, which will try to cancel all 
ongoing compactions
 
                 CompactionManager.instance.active.beginCompaction(ci);
-                PendingAntiCompaction pac = new PendingAntiCompaction(prsid, 
Collections.singleton(cfs), atEndpoint(FULL_RANGE, NO_RANGES), 0, 0, es);
+                PendingAntiCompaction pac = new PendingAntiCompaction(prsid, 
Collections.singleton(cfs), atEndpoint(FULL_RANGE, NO_RANGES), 0, 0, es, () -> 
false);
                 ListenableFuture fut = pac.run();
                 try
                 {
@@ -491,7 +518,7 @@ public class PendingAntiCompactionTest extends 
AbstractPendingAntiCompactionTest
                 // now we try to start a new AC, which will try to cancel all 
ongoing compactions
 
                 CompactionManager.instance.active.beginCompaction(ci);
-                PendingAntiCompaction pac = new PendingAntiCompaction(prsid, 
Collections.singleton(cfs), atEndpoint(FULL_RANGE, NO_RANGES), es);
+                PendingAntiCompaction pac = new PendingAntiCompaction(prsid, 
Collections.singleton(cfs), atEndpoint(FULL_RANGE, NO_RANGES), es, () -> false);
                 ListenableFuture fut = pac.run();
                 try
                 {
@@ -693,7 +720,7 @@ public class PendingAntiCompactionTest extends 
AbstractPendingAntiCompactionTest
             // mark the sstables pending, with a 2i compaction going, which 
should be untouched;
             try (LifecycleTransaction txn = 
idx.getTracker().tryModify(idx.getLiveSSTables(), OperationType.COMPACTION))
             {
-                PendingAntiCompaction pac = new PendingAntiCompaction(prsid, 
Collections.singleton(cfs2), atEndpoint(FULL_RANGE, NO_RANGES), es);
+                PendingAntiCompaction pac = new PendingAntiCompaction(prsid, 
Collections.singleton(cfs2), atEndpoint(FULL_RANGE, NO_RANGES), es, () -> 
false);
                 pac.run().get();
             }
             // and make sure it succeeded;
diff --git a/test/unit/org/apache/cassandra/net/MockMessagingService.java 
b/test/unit/org/apache/cassandra/net/MockMessagingService.java
index 4ea1bf5..79edae8 100644
--- a/test/unit/org/apache/cassandra/net/MockMessagingService.java
+++ b/test/unit/org/apache/cassandra/net/MockMessagingService.java
@@ -81,6 +81,14 @@ public class MockMessagingService
     }
 
     /**
+     * Creates a matcher that will indicate if the target address of the 
outgoing message matches the provided predicate.
+     */
+    public static Matcher<InetAddressAndPort> to(Predicate<InetAddressAndPort> 
predicate)
+    {
+        return (in, to) -> predicate.test(to);
+    }
+
+    /**
      * Creates a matcher that will indicate if the verb of the outgoing 
message equals the
      * provided value.
      */
diff --git 
a/test/unit/org/apache/cassandra/repair/consistent/CoordinatorMessagingTest.java
 
b/test/unit/org/apache/cassandra/repair/consistent/CoordinatorMessagingTest.java
new file mode 100644
index 0000000..b532abd
--- /dev/null
+++ 
b/test/unit/org/apache/cassandra/repair/consistent/CoordinatorMessagingTest.java
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair.consistent;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.cql3.statements.schema.CreateTableStatement;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.MockMessagingService;
+import org.apache.cassandra.net.MockMessagingSpy;
+import org.apache.cassandra.repair.AbstractRepairTest;
+import org.apache.cassandra.repair.RepairSessionResult;
+import org.apache.cassandra.repair.messages.FailSession;
+import org.apache.cassandra.repair.messages.FinalizeCommit;
+import org.apache.cassandra.repair.messages.FinalizePromise;
+import org.apache.cassandra.repair.messages.FinalizePropose;
+import org.apache.cassandra.repair.messages.PrepareConsistentRequest;
+import org.apache.cassandra.repair.messages.PrepareConsistentResponse;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.ActiveRepairService;
+
+import static org.apache.cassandra.net.MockMessagingService.all;
+import static org.apache.cassandra.net.MockMessagingService.payload;
+import static org.apache.cassandra.net.MockMessagingService.to;
+import static org.apache.cassandra.net.MockMessagingService.verb;
+import static org.junit.Assert.fail;
+
+public class CoordinatorMessagingTest extends AbstractRepairTest
+{
+
+    protected ColumnFamilyStore cfs;
+
+    @BeforeClass
+    public static void setupClass()
+    {
+        SchemaLoader.prepareServer();
+        LocalSessionAccessor.startup();
+    }
+
+    @Before
+    public void setup()
+    {
+        String ks = "ks_" + System.currentTimeMillis();
+        TableMetadata cfm = CreateTableStatement.parse(String.format("CREATE 
TABLE %s.%s (k INT PRIMARY KEY, v INT)", ks, "tbl"), ks).build();
+        SchemaLoader.createKeyspace(ks, KeyspaceParams.simple(1), cfm);
+        cfs = Schema.instance.getColumnFamilyStoreInstance(cfm.id);
+        cfs.disableAutoCompaction();
+    }
+
+    @After
+    public void cleanup()
+    {
+        MockMessagingService.cleanup();
+    }
+
+    @Test
+    public void testMockedMessagingHappyPath() throws InterruptedException, 
ExecutionException, TimeoutException
+    {
+
+        MockMessagingSpy spyPrepare = createPrepareSpy(Collections.emptySet(), 
Collections.emptySet());
+        MockMessagingSpy spyFinalize = 
createFinalizeSpy(Collections.emptySet(), Collections.emptySet());
+        MockMessagingSpy spyCommit = createCommitSpy();
+
+        UUID uuid = registerSession(cfs, true, true);
+        CoordinatorSession coordinator = 
ActiveRepairService.instance.consistent.coordinated.registerSession(uuid, 
PARTICIPANTS, false);
+        AtomicBoolean repairSubmitted = new AtomicBoolean(false);
+        SettableFuture<List<RepairSessionResult>> repairFuture = 
SettableFuture.create();
+        Supplier<ListenableFuture<List<RepairSessionResult>>> sessionSupplier 
= () ->
+        {
+            repairSubmitted.set(true);
+            return repairFuture;
+        };
+
+        // coordinator sends prepare requests to create local session and 
perform anticompaction
+        AtomicBoolean hasFailures = new AtomicBoolean(false);
+        Assert.assertFalse(repairSubmitted.get());
+
+        // execute repair and start prepare phase
+        ListenableFuture<Boolean> sessionResult = 
coordinator.execute(sessionSupplier, hasFailures);
+        Assert.assertFalse(sessionResult.isDone());
+        Assert.assertFalse(hasFailures.get());
+        // prepare completed
+        spyPrepare.interceptMessageOut(3).get(1, TimeUnit.SECONDS);
+        Assert.assertFalse(sessionResult.isDone());
+        Assert.assertFalse(hasFailures.get());
+
+        // set result from local repair session
+        repairFuture.set(Lists.newArrayList(createResult(coordinator), 
createResult(coordinator), createResult(coordinator)));
+
+        // finalize phase
+        spyFinalize.interceptMessageOut(3).get(1, TimeUnit.SECONDS);
+        Assert.assertFalse(sessionResult.isDone());
+        Assert.assertFalse(hasFailures.get());
+
+        // commit phase
+        spyCommit.interceptMessageOut(3).get(1, TimeUnit.SECONDS);
+        Assert.assertTrue(sessionResult.get());
+        Assert.assertFalse(hasFailures.get());
+
+        // expect no messages other than those intercepted so far
+        spyPrepare.interceptNoMsg(100, TimeUnit.MILLISECONDS);
+        spyFinalize.interceptNoMsg(100, TimeUnit.MILLISECONDS);
+        spyCommit.interceptNoMsg(100, TimeUnit.MILLISECONDS);
+
+        Assert.assertEquals(ConsistentSession.State.FINALIZED, 
coordinator.getState());
+        
Assert.assertFalse(ActiveRepairService.instance.consistent.local.isSessionInProgress(uuid));
+    }
+
+
+    @Test
+    public void testMockedMessagingPrepareFailureP1() throws 
InterruptedException, ExecutionException, TimeoutException
+    {
+        createPrepareSpy(Collections.singleton(PARTICIPANT1), 
Collections.emptySet());
+        testMockedMessagingPrepareFailure();
+    }
+
+    @Test
+    public void testMockedMessagingPrepareFailureP12() throws 
InterruptedException, ExecutionException, TimeoutException
+    {
+        createPrepareSpy(Lists.newArrayList(PARTICIPANT1, PARTICIPANT2), 
Collections.emptySet());
+        testMockedMessagingPrepareFailure();
+    }
+
+    @Test
+    public void testMockedMessagingPrepareFailureP3() throws 
InterruptedException, ExecutionException, TimeoutException
+    {
+        createPrepareSpy(Collections.singleton(PARTICIPANT3), 
Collections.emptySet());
+        testMockedMessagingPrepareFailure();
+    }
+
+    @Test
+    public void testMockedMessagingPrepareFailureP123() throws 
InterruptedException, ExecutionException, TimeoutException
+    {
+        createPrepareSpy(Lists.newArrayList(PARTICIPANT1, PARTICIPANT2, 
PARTICIPANT3), Collections.emptySet());
+        testMockedMessagingPrepareFailure();
+    }
+
+    @Test(expected = TimeoutException.class)
+    public void testMockedMessagingPrepareFailureWrongSessionId() throws 
InterruptedException, ExecutionException, TimeoutException
+    {
+        createPrepareSpy(Collections.singleton(PARTICIPANT1), 
Collections.emptySet(), (msgOut) -> UUID.randomUUID());
+        testMockedMessagingPrepareFailure();
+    }
+
+    private void testMockedMessagingPrepareFailure() throws 
InterruptedException, ExecutionException, TimeoutException
+    {
+        // we expect FailSession messages to all participants
+        MockMessagingSpy sendFailSessionExpectedSpy = 
createFailSessionSpy(Lists.newArrayList(PARTICIPANT1, PARTICIPANT2, 
PARTICIPANT3));
+
+        UUID uuid = registerSession(cfs, true, true);
+        CoordinatorSession coordinator = 
ActiveRepairService.instance.consistent.coordinated.registerSession(uuid, 
PARTICIPANTS, false);
+        AtomicBoolean repairSubmitted = new AtomicBoolean(false);
+        SettableFuture<List<RepairSessionResult>> repairFuture = 
SettableFuture.create();
+        Supplier<ListenableFuture<List<RepairSessionResult>>> sessionSupplier 
= () ->
+        {
+            repairSubmitted.set(true);
+            return repairFuture;
+        };
+
+        // coordinator sends prepare requests to create local session and 
perform anticompaction
+        AtomicBoolean proposeFailed = new AtomicBoolean(false);
+        Assert.assertFalse(repairSubmitted.get());
+
+        // execute repair and start prepare phase
+        ListenableFuture<Boolean> sessionResult = 
coordinator.execute(sessionSupplier, proposeFailed);
+        Assert.assertFalse(proposeFailed.get());
+        // prepare completed
+        try
+        {
+            sessionResult.get(1, TimeUnit.SECONDS);
+            fail("Completed session without failure after prepare failed");
+        }
+        catch (ExecutionException e)
+        {
+        }
+        sendFailSessionExpectedSpy.interceptMessageOut(3).get(1, 
TimeUnit.SECONDS);
+        Assert.assertFalse(repairSubmitted.get());
+        Assert.assertTrue(proposeFailed.get());
+        Assert.assertEquals(ConsistentSession.State.FAILED, 
coordinator.getState());
+        
Assert.assertFalse(ActiveRepairService.instance.consistent.local.isSessionInProgress(uuid));
+    }
+
+    @Test
+    public void testMockedMessagingPrepareTimeout() throws 
InterruptedException, ExecutionException, TimeoutException
+    {
+        MockMessagingSpy spyPrepare = createPrepareSpy(Collections.emptySet(), 
Collections.singleton(PARTICIPANT3));
+        MockMessagingSpy sendFailSessionUnexpectedSpy = 
createFailSessionSpy(Lists.newArrayList(PARTICIPANT1, PARTICIPANT2, 
PARTICIPANT3));
+
+        UUID uuid = registerSession(cfs, true, true);
+        CoordinatorSession coordinator = 
ActiveRepairService.instance.consistent.coordinated.registerSession(uuid, 
PARTICIPANTS, false);
+        AtomicBoolean repairSubmitted = new AtomicBoolean(false);
+        SettableFuture<List<RepairSessionResult>> repairFuture = 
SettableFuture.create();
+        Supplier<ListenableFuture<List<RepairSessionResult>>> sessionSupplier 
= () ->
+        {
+            repairSubmitted.set(true);
+            return repairFuture;
+        };
+
+        // coordinator sends prepare requests to create local session and 
perform anticompaction
+        AtomicBoolean hasFailures = new AtomicBoolean(false);
+        Assert.assertFalse(repairSubmitted.get());
+
+        // execute repair and start prepare phase
+        ListenableFuture<Boolean> sessionResult = 
coordinator.execute(sessionSupplier, hasFailures);
+        try
+        {
+            sessionResult.get(1, TimeUnit.SECONDS);
+            fail("Completed session without failure after prepare failed");
+        }
+        catch (ExecutionException e)
+        {
+            fail("Failed session in prepare failed during timeout from 
participant");
+        }
+        catch (TimeoutException e)
+        {
+            // expected
+        }
+        // we won't send out any fail session message in case of timeouts
+        spyPrepare.expectMockedMessageIn(2).get(100, TimeUnit.MILLISECONDS);
+        sendFailSessionUnexpectedSpy.interceptNoMsg(100, 
TimeUnit.MILLISECONDS);
+        Assert.assertFalse(repairSubmitted.get());
+        Assert.assertFalse(hasFailures.get());
+        Assert.assertEquals(ConsistentSession.State.PREPARING, 
coordinator.getState());
+        
Assert.assertFalse(ActiveRepairService.instance.consistent.local.isSessionInProgress(uuid));
+    }
+
+    private MockMessagingSpy createPrepareSpy(Collection<InetAddressAndPort> 
failed,
+                                              Collection<InetAddressAndPort> 
timeout)
+    {
+        return createPrepareSpy(failed, timeout, (msgOut) -> 
msgOut.parentSession);
+    }
+
+    private MockMessagingSpy createPrepareSpy(Collection<InetAddressAndPort> 
failed,
+                                              Collection<InetAddressAndPort> 
timeout,
+                                              
Function<PrepareConsistentRequest, UUID> sessionIdFunc)
+    {
+        return MockMessagingService.when(
+        all(verb(MessagingService.Verb.REPAIR_MESSAGE),
+            payload((p) -> p instanceof PrepareConsistentRequest))
+        ).respond((msgOut, to) ->
+                  {
+                      if(timeout.contains(to)) return null;
+                      else return MessageIn.create(to,
+                                                   new 
PrepareConsistentResponse(sessionIdFunc.apply((PrepareConsistentRequest) 
msgOut.payload), to, !failed.contains(to)),
+                                                   Collections.emptyMap(),
+                                                   
MessagingService.Verb.REPAIR_MESSAGE,
+                                                   
MessagingService.current_version);
+                  });
+    }
+
+    private MockMessagingSpy createFinalizeSpy(Collection<InetAddressAndPort> 
failed,
+                                               Collection<InetAddressAndPort> 
timeout)
+    {
+        return MockMessagingService.when(
+        all(verb(MessagingService.Verb.REPAIR_MESSAGE),
+            payload((p) -> p instanceof FinalizePropose))
+        ).respond((msgOut, to) ->
+                  {
+                      if(timeout.contains(to)) return null;
+                      else return MessageIn.create(to,
+                                                   new 
FinalizePromise(((FinalizePropose) msgOut.payload).sessionID, to, 
!failed.contains(to)),
+                                                   Collections.emptyMap(),
+                                                   
MessagingService.Verb.REPAIR_MESSAGE,
+                                                   
MessagingService.current_version);
+                  });
+    }
+
+    private MockMessagingSpy createCommitSpy()
+    {
+        return MockMessagingService.when(
+            all(verb(MessagingService.Verb.REPAIR_MESSAGE),
+                payload((p) -> p instanceof FinalizeCommit))
+        ).dontReply();
+    }
+
+    private MockMessagingSpy 
createFailSessionSpy(Collection<InetAddressAndPort> participants)
+    {
+        return MockMessagingService.when(
+            all(verb(MessagingService.Verb.REPAIR_MESSAGE),
+                payload((p) -> p instanceof FailSession),
+                to(participants::contains))
+        ).dontReply();
+    }
+
+    private static RepairSessionResult createResult(CoordinatorSession 
coordinator)
+    {
+        return new RepairSessionResult(coordinator.sessionID, "ks", 
coordinator.ranges, null, false);
+    }
+}
diff --git 
a/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java 
b/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java
index 5d054d3..c6980fe 100644
--- 
a/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java
+++ 
b/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java
@@ -374,31 +374,42 @@ public class CoordinatorSessionTest extends 
AbstractRepairTest
 
         coordinator.sentMessages.clear();
 
-        // participants respond to coordinator, and repair begins once all 
participants have responded with success
+        // participants respond to coordinator, and repair begins once all 
participants have responded
         Assert.assertEquals(ConsistentSession.State.PREPARING, 
coordinator.getState());
 
         coordinator.handlePrepareResponse(PARTICIPANT1, true);
         Assert.assertEquals(ConsistentSession.State.PREPARING, 
coordinator.getState());
+        Assert.assertEquals(PREPARED, 
coordinator.getParticipantState(PARTICIPANT1));
+        Assert.assertFalse(sessionResult.isDone());
 
         // participant 2 fails to prepare for consistent repair
         Assert.assertFalse(coordinator.failCalled);
         coordinator.handlePrepareResponse(PARTICIPANT2, false);
-        Assert.assertEquals(ConsistentSession.State.FAILED, 
coordinator.getState());
-        Assert.assertTrue(coordinator.failCalled);
+        Assert.assertEquals(ConsistentSession.State.PREPARING, 
coordinator.getState());
+        // we should have sent failure messages to all participants, but not yet 
marked the remaining participants failed internally
+        assertMessageSent(coordinator, PARTICIPANT1, new 
FailSession(coordinator.sessionID));
+        assertMessageSent(coordinator, PARTICIPANT2, new 
FailSession(coordinator.sessionID));
+        assertMessageSent(coordinator, PARTICIPANT3, new 
FailSession(coordinator.sessionID));
+        Assert.assertEquals(FAILED, 
coordinator.getParticipantState(PARTICIPANT2));
+        Assert.assertEquals(PREPARED, 
coordinator.getParticipantState(PARTICIPANT1));
+        Assert.assertEquals(PREPARING, 
coordinator.getParticipantState(PARTICIPANT3));
+        Assert.assertFalse(sessionResult.isDone());
+        Assert.assertFalse(coordinator.failCalled);
+        coordinator.sentMessages.clear();
 
-        // additional success messages should be ignored
+        // last outstanding response should cause repair to complete in failed 
state
         Assert.assertFalse(coordinator.setRepairingCalled);
         coordinator.onSetRepairing = Assert::fail;
         coordinator.handlePrepareResponse(PARTICIPANT3, true);
+        Assert.assertTrue(coordinator.failCalled);
         Assert.assertFalse(coordinator.setRepairingCalled);
         Assert.assertFalse(repairSubmitted.get());
 
-        // all participants should have been notified of session failure
-        for (InetAddressAndPort participant : PARTICIPANTS)
-        {
-            RepairMessage expected = new FailSession(coordinator.sessionID);
-            assertMessageSent(coordinator, participant, expected);
-        }
+        // all participants that did not fail should have been notified of 
session failure
+        RepairMessage expected = new FailSession(coordinator.sessionID);
+        assertMessageSent(coordinator, PARTICIPANT1, expected);
+        assertMessageSent(coordinator, PARTICIPANT3, expected);
+        Assert.assertFalse(coordinator.sentMessages.containsKey(PARTICIPANT2));
 
         Assert.assertTrue(sessionResult.isDone());
         Assert.assertTrue(hasFailures.get());
diff --git 
a/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java 
b/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java
index e387c41..a6b4fe2 100644
--- a/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java
+++ b/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BooleanSupplier;
 import java.util.function.Consumer;
 
 import com.google.common.collect.Lists;
@@ -139,7 +140,8 @@ public class LocalSessionTest extends AbstractRepairTest
                                         UUID sessionID,
                                         Collection<ColumnFamilyStore> tables,
                                         RangesAtEndpoint ranges,
-                                        ExecutorService executor)
+                                        ExecutorService executor,
+                                        BooleanSupplier isCancelled)
         {
             prepareSessionCalled = true;
             if (prepareSessionFuture != null)
@@ -148,7 +150,7 @@ public class LocalSessionTest extends AbstractRepairTest
             }
             else
             {
-                return super.prepareSession(repairManager, sessionID, tables, 
ranges, executor);
+                return super.prepareSession(repairManager, sessionID, tables, 
ranges, executor, isCancelled);
             }
         }
 
@@ -334,7 +336,41 @@ public class LocalSessionTest extends AbstractRepairTest
         InstrumentedLocalSessions sessions = new InstrumentedLocalSessions();
         sessions.handlePrepareMessage(PARTICIPANT1, new 
PrepareConsistentRequest(sessionID, COORDINATOR, PARTICIPANTS));
         Assert.assertNull(sessions.getSession(sessionID));
-        assertMessagesSent(sessions, COORDINATOR, new FailSession(sessionID));
+        assertMessagesSent(sessions, COORDINATOR, new 
PrepareConsistentResponse(sessionID, PARTICIPANT1, false));
+    }
+
+    /**
+     * If the session is cancelled mid-prepare, the isCancelled boolean 
supplier should start returning true
+     */
+    @Test
+    public void prepareCancellation()
+    {
+        UUID sessionID = registerSession();
+        AtomicReference<BooleanSupplier> isCancelledRef = new 
AtomicReference<>();
+        SettableFuture future = SettableFuture.create();
+
+        InstrumentedLocalSessions sessions = new InstrumentedLocalSessions() {
+            ListenableFuture prepareSession(KeyspaceRepairManager 
repairManager, UUID sessionID, Collection<ColumnFamilyStore> tables, 
RangesAtEndpoint ranges, ExecutorService executor, BooleanSupplier isCancelled)
+            {
+                isCancelledRef.set(isCancelled);
+                return future;
+            }
+        };
+        sessions.start();
+
+        sessions.handlePrepareMessage(PARTICIPANT1, new 
PrepareConsistentRequest(sessionID, COORDINATOR, PARTICIPANTS));
+
+        BooleanSupplier isCancelled = isCancelledRef.get();
+        Assert.assertNotNull(isCancelled);
+        Assert.assertFalse(isCancelled.getAsBoolean());
+        Assert.assertTrue(sessions.sentMessages.isEmpty());
+
+        sessions.failSession(sessionID, false);
+        Assert.assertTrue(isCancelled.getAsBoolean());
+
+        // now that the session has failed, it sends a negative response to the 
coordinator (even if the anti-compaction completed successfully)
+        future.set(new Object());
+        assertMessagesSent(sessions, COORDINATOR, new 
PrepareConsistentResponse(sessionID, PARTICIPANT1, false));
     }
 
     @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to