Repository: cassandra
Updated Branches:
  refs/heads/trunk 73ebd200c -> a05785d82


Add tests for RepairJob

Patch by Alex Petrov; reviewed by Marcus Eriksson for CASSANDRA-14717

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a05785d8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a05785d8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a05785d8

Branch: refs/heads/trunk
Commit: a05785d82c621c9cd04d8a064c38fd2012ef981c
Parents: 73ebd20
Author: Alex Petrov <oleksandr.pet...@gmail.com>
Authored: Mon Oct 1 15:30:58 2018 +0200
Committer: Alex Petrov <alex.pet...@apple.com>
Committed: Tue Oct 9 22:33:31 2018 +0100

----------------------------------------------------------------------
 .../repair/AsymmetricRemoteSyncTask.java        |  19 +-
 .../apache/cassandra/repair/LocalSyncTask.java  |  39 +-
 .../org/apache/cassandra/repair/RepairJob.java  | 146 +++--
 .../apache/cassandra/repair/RepairSession.java  |   8 +-
 .../repair/SymmetricRemoteSyncTask.java         |  20 +-
 .../org/apache/cassandra/repair/SyncTask.java   |   4 +
 .../cassandra/repair/LocalSyncTaskTest.java     |  26 +-
 .../apache/cassandra/repair/RepairJobTest.java  | 625 +++++++++++++++++++
 8 files changed, 793 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a05785d8/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java b/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java
index 9ba33dd..d2a6aeb 100644
--- a/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java
+++ b/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java
@@ -41,15 +41,9 @@ import org.apache.cassandra.utils.MerkleTrees;
  */
 public class AsymmetricRemoteSyncTask extends SyncTask implements 
CompletableRemoteSyncTask
 {
-    public AsymmetricRemoteSyncTask(RepairJobDesc desc, InetAddressAndPort 
fetchNode, InetAddressAndPort fetchFrom,
-                                    List<Range<Token>> rangesToFetch, 
PreviewKind previewKind)
+    public AsymmetricRemoteSyncTask(RepairJobDesc desc, InetAddressAndPort to, 
InetAddressAndPort from, List<Range<Token>> differences, PreviewKind 
previewKind)
     {
-        super(desc, fetchNode, fetchFrom, rangesToFetch, previewKind);
-    }
-
-    public AsymmetricRemoteSyncTask(RepairJobDesc desc, TreeResponse to, 
TreeResponse from, PreviewKind previewKind)
-    {
-        this(desc, to.endpoint, from.endpoint, 
MerkleTrees.difference(to.trees, from.trees), previewKind);
+        super(desc, to, from, differences, previewKind);
     }
 
     public void startSync()
@@ -72,4 +66,13 @@ public class AsymmetricRemoteSyncTask extends SyncTask 
implements CompletableRem
             setException(new RepairException(desc, previewKind, 
String.format("Sync failed between %s and %s", nodePair.coordinator, 
nodePair.peer)));
         }
     }
+
+    @Override
+    public String toString()
+    {
+        return "AsymmetricRemoteSyncTask{" +
+               "rangesToSync=" + rangesToSync +
+               ", nodePair=" + nodePair +
+               '}';
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a05785d8/src/java/org/apache/cassandra/repair/LocalSyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/LocalSyncTask.java b/src/java/org/apache/cassandra/repair/LocalSyncTask.java
index 1923fbe..f9b5765 100644
--- a/src/java/org/apache/cassandra/repair/LocalSyncTask.java
+++ b/src/java/org/apache/cassandra/repair/LocalSyncTask.java
@@ -52,15 +52,11 @@ public class LocalSyncTask extends SyncTask implements 
StreamEventHandler
     private static final Logger logger = 
LoggerFactory.getLogger(LocalSyncTask.class);
 
     private final UUID pendingRepair;
-    private final boolean requestRanges;
-    private final boolean transferRanges;
 
-    public LocalSyncTask(RepairJobDesc desc, TreeResponse local, TreeResponse 
remote, UUID pendingRepair,
-                         boolean requestRanges, boolean transferRanges, 
PreviewKind previewKind)
-    {
-        this(desc, local.endpoint, remote.endpoint, 
MerkleTrees.difference(local.trees, remote.trees),
-             pendingRepair, requestRanges, transferRanges, previewKind);
-    }
+    @VisibleForTesting
+    final boolean requestRanges;
+    @VisibleForTesting
+    final boolean transferRanges;
 
     public LocalSyncTask(RepairJobDesc desc, InetAddressAndPort local, 
InetAddressAndPort remote,
                          List<Range<Token>> diff, UUID pendingRepair,
@@ -76,8 +72,10 @@ public class LocalSyncTask extends SyncTask implements 
StreamEventHandler
     }
 
     @VisibleForTesting
-    StreamPlan createStreamPlan(InetAddressAndPort remote, List<Range<Token>> 
differences)
+    StreamPlan createStreamPlan()
     {
+        InetAddressAndPort remote =  nodePair.peer;
+
         StreamPlan plan = new StreamPlan(StreamOperation.REPAIR, 1, false, 
pendingRepair, previewKind)
                           .listeners(this)
                           .flushBeforeTransfer(pendingRepair == null);
@@ -85,7 +83,7 @@ public class LocalSyncTask extends SyncTask implements 
StreamEventHandler
         if (requestRanges)
         {
             // see comment on RangesAtEndpoint.toDummyList for why we 
synthesize replicas here
-            plan.requestRanges(remote, desc.keyspace, 
RangesAtEndpoint.toDummyList(differences),
+            plan.requestRanges(remote, desc.keyspace, 
RangesAtEndpoint.toDummyList(rangesToSync),
                                
RangesAtEndpoint.toDummyList(Collections.emptyList()), desc.columnFamily);
         }
 
@@ -93,7 +91,7 @@ public class LocalSyncTask extends SyncTask implements 
StreamEventHandler
         {
             // send ranges to the remote node if we are not performing a pull 
repair
             // see comment on RangesAtEndpoint.toDummyList for why we 
synthesize replicas here
-            plan.transferRanges(remote, desc.keyspace, 
RangesAtEndpoint.toDummyList(differences), desc.columnFamily);
+            plan.transferRanges(remote, desc.keyspace, 
RangesAtEndpoint.toDummyList(rangesToSync), desc.columnFamily);
         }
 
         return plan;
@@ -112,7 +110,13 @@ public class LocalSyncTask extends SyncTask implements 
StreamEventHandler
         logger.info("{} {}", previewKind.logPrefix(desc.sessionId), message);
         Tracing.traceRepair(message);
 
-        createStreamPlan(remote, rangesToSync).execute();
+        createStreamPlan().execute();
+    }
+
+    @Override
+    public boolean isLocal()
+    {
+        return true;
     }
 
     public void handleStreamEvent(StreamEvent event)
@@ -155,4 +159,15 @@ public class LocalSyncTask extends SyncTask implements 
StreamEventHandler
         setException(t);
         finished();
     }
+
+    @Override
+    public String toString()
+    {
+        return "LocalSyncTask{" +
+               "requestRanges=" + requestRanges +
+               ", transferRanges=" + transferRanges +
+               ", rangesToSync=" + rangesToSync +
+               ", nodePair=" + nodePair +
+               '}';
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a05785d8/src/java/org/apache/cassandra/repair/RepairJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java b/src/java/org/apache/cassandra/repair/RepairJob.java
index c96e7fb..ebeb1f9 100644
--- a/src/java/org/apache/cassandra/repair/RepairJob.java
+++ b/src/java/org/apache/cassandra/repair/RepairJob.java
@@ -18,6 +18,8 @@
 package org.apache.cassandra.repair;
 
 import java.util.*;
+import java.util.function.Predicate;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import com.google.common.base.Preconditions;
@@ -39,6 +41,7 @@ import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MerkleTrees;
 import org.apache.cassandra.utils.Pair;
 
 /**
@@ -52,9 +55,6 @@ public class RepairJob extends AbstractFuture<RepairResult> 
implements Runnable
     private final RepairJobDesc desc;
     private final RepairParallelism parallelismDegree;
     private final ListeningExecutorService taskExecutor;
-    private final boolean isIncremental;
-    private final PreviewKind previewKind;
-    private final boolean optimiseStreams;
 
     /**
      * Create repair job to run on specific columnfamily
@@ -62,15 +62,12 @@ public class RepairJob extends AbstractFuture<RepairResult> 
implements Runnable
      * @param session RepairSession that this RepairJob belongs
      * @param columnFamily name of the ColumnFamily to repair
      */
-    public RepairJob(RepairSession session, String columnFamily, boolean 
isIncremental, PreviewKind previewKind, boolean optimiseStreams)
+    public RepairJob(RepairSession session, String columnFamily)
     {
         this.session = session;
         this.desc = new RepairJobDesc(session.parentRepairSession, 
session.getId(), session.keyspace, columnFamily, session.commonRange.ranges);
         this.taskExecutor = session.taskExecutor;
         this.parallelismDegree = session.parallelismDegree;
-        this.isIncremental = isIncremental;
-        this.previewKind = previewKind;
-        this.optimiseStreams = optimiseStreams;
     }
 
     /**
@@ -92,7 +89,7 @@ public class RepairJob extends AbstractFuture<RepairResult> 
implements Runnable
         if (parallelismDegree != RepairParallelism.PARALLEL)
         {
             ListenableFuture<List<InetAddressAndPort>> allSnapshotTasks;
-            if (isIncremental)
+            if (session.isIncremental)
             {
                 // consistent repair does it's own "snapshotting"
                 allSnapshotTasks = Futures.immediateFuture(allEndpoints);
@@ -130,7 +127,7 @@ public class RepairJob extends AbstractFuture<RepairResult> 
implements Runnable
 
         // When all validations complete, submit sync tasks
         ListenableFuture<List<SyncStat>> syncResults = 
Futures.transformAsync(validations,
-                                                                              
optimiseStreams && !session.pullRepair ? this::optimisedSyncing : 
this::standardSyncing,
+                                                                              
session.optimiseStreams && !session.pullRepair ? this::optimisedSyncing : 
this::standardSyncing,
                                                                               
taskExecutor);
 
         // When all sync complete, set the final result
@@ -138,9 +135,9 @@ public class RepairJob extends AbstractFuture<RepairResult> 
implements Runnable
         {
             public void onSuccess(List<SyncStat> stats)
             {
-                if (!previewKind.isPreview())
+                if (!session.previewKind.isPreview())
                 {
-                    logger.info("{} {} is fully synced", 
previewKind.logPrefix(session.getId()), desc.columnFamily);
+                    logger.info("{} {} is fully synced", 
session.previewKind.logPrefix(session.getId()), desc.columnFamily);
                     
SystemDistributedKeyspace.successfulRepairJob(session.getId(), desc.keyspace, 
desc.columnFamily);
                 }
                 cfs.metric.repairsCompleted.inc();
@@ -152,9 +149,9 @@ public class RepairJob extends AbstractFuture<RepairResult> 
implements Runnable
              */
             public void onFailure(Throwable t)
             {
-                if (!previewKind.isPreview())
+                if (!session.previewKind.isPreview())
                 {
-                    logger.warn("{} {} sync failed", 
previewKind.logPrefix(session.getId()), desc.columnFamily);
+                    logger.warn("{} {} sync failed", 
session.previewKind.logPrefix(session.getId()), desc.columnFamily);
                     SystemDistributedKeyspace.failedRepairJob(session.getId(), 
desc.keyspace, desc.columnFamily, t);
                 }
                 cfs.metric.repairsCompleted.inc();
@@ -170,8 +167,24 @@ public class RepairJob extends 
AbstractFuture<RepairResult> implements Runnable
 
     private ListenableFuture<List<SyncStat>> 
standardSyncing(List<TreeResponse> trees)
     {
-        InetAddressAndPort local = FBUtilities.getLocalAddressAndPort();
+        List<SyncTask> syncTasks = createStandardSyncTasks(desc,
+                                                           trees,
+                                                           
FBUtilities.getLocalAddressAndPort(),
+                                                           this::isTransient,
+                                                           
session.isIncremental,
+                                                           session.pullRepair,
+                                                           
session.previewKind);
+        return executeTasks(syncTasks);
+    }
 
+    static List<SyncTask> createStandardSyncTasks(RepairJobDesc desc,
+                                                  List<TreeResponse> trees,
+                                                  InetAddressAndPort local,
+                                                  
Predicate<InetAddressAndPort> isTransient,
+                                                  boolean isIncremental,
+                                                  boolean pullRepair,
+                                                  PreviewKind previewKind)
+    {
         List<SyncTask> syncTasks = new ArrayList<>();
         // We need to difference all trees one against another
         for (int i = 0; i < trees.size() - 1; ++i)
@@ -182,7 +195,13 @@ public class RepairJob extends 
AbstractFuture<RepairResult> implements Runnable
                 TreeResponse r2 = trees.get(j);
 
                 // Avoid streming between two tansient replicas
-                if (isTransient(r1.endpoint) && isTransient(r2.endpoint))
+                if (isTransient.test(r1.endpoint) && 
isTransient.test(r2.endpoint))
+                    continue;
+
+                List<Range<Token>> differences = 
MerkleTrees.difference(r1.trees, r2.trees);
+
+                // Nothing to do
+                if (differences.isEmpty())
                     continue;
 
                 SyncTask task;
@@ -192,43 +211,67 @@ public class RepairJob extends 
AbstractFuture<RepairResult> implements Runnable
                     TreeResponse remote = r2.endpoint.equals(local) ? r1 : r2;
 
                     // pull only if local is full
-                    boolean requestRanges = !isTransient(self.endpoint);
+                    boolean requestRanges = !isTransient.test(self.endpoint);
                     // push only if remote is full; additionally check for 
pull repair
-                    boolean transferRanges = !isTransient(remote.endpoint) && 
!session.pullRepair;
+                    boolean transferRanges = 
!isTransient.test(remote.endpoint) && !pullRepair;
 
                     // Nothing to do
                     if (!requestRanges && !transferRanges)
                         continue;
 
-                    task = new LocalSyncTask(desc, self, remote, isIncremental 
? desc.parentSessionId : null,
-                                             requestRanges, transferRanges, 
session.previewKind);
+                    task = new LocalSyncTask(desc, self.endpoint, 
remote.endpoint, differences, isIncremental ? desc.parentSessionId : null,
+                                             requestRanges, transferRanges, 
previewKind);
                 }
-                else if (isTransient(r1.endpoint) || isTransient(r2.endpoint))
+                else if (isTransient.test(r1.endpoint) || 
isTransient.test(r2.endpoint))
                 {
                     // Stream only from transient replica
-                    TreeResponse streamFrom = isTransient(r1.endpoint) ? r1 : 
r2;
-                    TreeResponse streamTo = isTransient(r1.endpoint) ? r2 : r1;
-                    task = new AsymmetricRemoteSyncTask(desc, streamTo, 
streamFrom, previewKind);
-                    session.waitForSync(Pair.create(desc, task.nodePair()), 
(AsymmetricRemoteSyncTask) task);
+                    TreeResponse streamFrom = isTransient.test(r1.endpoint) ? 
r1 : r2;
+                    TreeResponse streamTo = isTransient.test(r1.endpoint) ? r2 
: r1;
+                    task = new AsymmetricRemoteSyncTask(desc, 
streamTo.endpoint, streamFrom.endpoint, differences, previewKind);
                 }
                 else
                 {
-                    task = new SymmetricRemoteSyncTask(desc, r1, r2, 
session.previewKind);
-                    // SymmetricRemoteSyncTask expects SyncComplete message 
sent back.
-                    // Register task to RepairSession to receive response.
-                    session.waitForSync(Pair.create(desc, task.nodePair()), 
(SymmetricRemoteSyncTask) task);
+                    task = new SymmetricRemoteSyncTask(desc, r1.endpoint, 
r2.endpoint, differences, previewKind);
                 }
                 syncTasks.add(task);
-                taskExecutor.submit(task);
             }
         }
-        return Futures.allAsList(syncTasks);
+        return syncTasks;
     }
 
     private ListenableFuture<List<SyncStat>> 
optimisedSyncing(List<TreeResponse> trees)
     {
-        InetAddressAndPort local = FBUtilities.getLocalAddressAndPort();
+        List<SyncTask> syncTasks = createOptimisedSyncingSyncTasks(desc,
+                                                                   trees,
+                                                                   
FBUtilities.getLocalAddressAndPort(),
+                                                                   
this::isTransient,
+                                                                   this::getDC,
+                                                                   
session.isIncremental,
+                                                                   
session.previewKind);
+
+        return executeTasks(syncTasks);
+    }
+
+    private ListenableFuture<List<SyncStat>> executeTasks(List<SyncTask> 
syncTasks)
+    {
+        for (SyncTask task : syncTasks)
+        {
+            if (!task.isLocal())
+                session.trackSyncCompletion(Pair.create(desc, 
task.nodePair()), (CompletableRemoteSyncTask) task);
+            taskExecutor.submit(task);
+        }
+
+        return Futures.allAsList(syncTasks);
+    }
 
+    static List<SyncTask> createOptimisedSyncingSyncTasks(RepairJobDesc desc,
+                                                          List<TreeResponse> 
trees,
+                                                          InetAddressAndPort 
local,
+                                                          
Predicate<InetAddressAndPort> isTransient,
+                                                          
Function<InetAddressAndPort, String> getDC,
+                                                          boolean 
isIncremental,
+                                                          PreviewKind 
previewKind)
+    {
         List<SyncTask> syncTasks = new ArrayList<>();
         // We need to difference all trees one against another
         DifferenceHolder diffHolder = new DifferenceHolder(trees);
@@ -236,8 +279,8 @@ public class RepairJob extends AbstractFuture<RepairResult> 
implements Runnable
         logger.debug("diffs = {}", diffHolder);
         PreferedNodeFilter preferSameDCFilter = (streaming, candidates) ->
                                                 candidates.stream()
-                                                          .filter(node -> 
getDC(streaming)
-                                                                          
.equals(getDC(node)))
+                                                          .filter(node -> 
getDC.apply(streaming)
+                                                                          
.equals(getDC.apply(node)))
                                                           
.collect(Collectors.toSet());
         ImmutableMap<InetAddressAndPort, HostDifferences> reducedDifferences = 
ReduceHelper.reduce(diffHolder, preferSameDCFilter);
 
@@ -246,7 +289,7 @@ public class RepairJob extends AbstractFuture<RepairResult> 
implements Runnable
             InetAddressAndPort address = trees.get(i).endpoint;
 
             // we don't stream to transient replicas
-            if (isTransient(address))
+            if (isTransient.test(address))
                 continue;
 
             HostDifferences streamsFor = reducedDifferences.get(address);
@@ -256,20 +299,21 @@ public class RepairJob extends 
AbstractFuture<RepairResult> implements Runnable
                 for (InetAddressAndPort fetchFrom : streamsFor.hosts())
                 {
                     List<Range<Token>> toFetch = streamsFor.get(fetchFrom);
+                    assert !toFetch.isEmpty();
+
                     logger.debug("{} is about to fetch {} from {}", address, 
toFetch, fetchFrom);
                     SyncTask task;
                     if (address.equals(local))
                     {
                         task = new LocalSyncTask(desc, address, fetchFrom, 
toFetch, isIncremental ? desc.parentSessionId : null,
-                                                 true, false, 
session.previewKind);
+                                                 true, false, previewKind);
                     }
                     else
                     {
                         task = new AsymmetricRemoteSyncTask(desc, address, 
fetchFrom, toFetch, previewKind);
-                        session.waitForSync(Pair.create(desc, 
task.nodePair()), (AsymmetricRemoteSyncTask) task);
                     }
                     syncTasks.add(task);
-                    taskExecutor.submit(task);
+
                 }
             }
             else
@@ -277,7 +321,7 @@ public class RepairJob extends AbstractFuture<RepairResult> 
implements Runnable
                 logger.debug("Node {} has nothing to stream", address);
             }
         }
-        return Futures.allAsList(syncTasks);
+        return syncTasks;
     }
 
     private String getDC(InetAddressAndPort address)
@@ -294,15 +338,15 @@ public class RepairJob extends 
AbstractFuture<RepairResult> implements Runnable
     private ListenableFuture<List<TreeResponse>> 
sendValidationRequest(Collection<InetAddressAndPort> endpoints)
     {
         String message = String.format("Requesting merkle trees for %s (to 
%s)", desc.columnFamily, endpoints);
-        logger.info("{} {}", previewKind.logPrefix(desc.sessionId), message);
+        logger.info("{} {}", session.previewKind.logPrefix(desc.sessionId), 
message);
         Tracing.traceRepair(message);
         int nowInSec = FBUtilities.nowInSeconds();
         List<ListenableFuture<TreeResponse>> tasks = new 
ArrayList<>(endpoints.size());
         for (InetAddressAndPort endpoint : endpoints)
         {
-            ValidationTask task = new ValidationTask(desc, endpoint, nowInSec, 
previewKind);
+            ValidationTask task = new ValidationTask(desc, endpoint, nowInSec, 
session.previewKind);
             tasks.add(task);
-            session.waitForValidation(Pair.create(desc, endpoint), task);
+            session.trackValidationCompletion(Pair.create(desc, endpoint), 
task);
             taskExecutor.execute(task);
         }
         return Futures.allAsList(tasks);
@@ -314,29 +358,29 @@ public class RepairJob extends 
AbstractFuture<RepairResult> implements Runnable
     private ListenableFuture<List<TreeResponse>> 
sendSequentialValidationRequest(Collection<InetAddressAndPort> endpoints)
     {
         String message = String.format("Requesting merkle trees for %s (to 
%s)", desc.columnFamily, endpoints);
-        logger.info("{} {}", previewKind.logPrefix(desc.sessionId), message);
+        logger.info("{} {}", session.previewKind.logPrefix(desc.sessionId), 
message);
         Tracing.traceRepair(message);
         int nowInSec = FBUtilities.nowInSeconds();
         List<ListenableFuture<TreeResponse>> tasks = new 
ArrayList<>(endpoints.size());
 
         Queue<InetAddressAndPort> requests = new LinkedList<>(endpoints);
         InetAddressAndPort address = requests.poll();
-        ValidationTask firstTask = new ValidationTask(desc, address, nowInSec, 
previewKind);
+        ValidationTask firstTask = new ValidationTask(desc, address, nowInSec, 
session.previewKind);
         logger.info("Validating {}", address);
-        session.waitForValidation(Pair.create(desc, address), firstTask);
+        session.trackValidationCompletion(Pair.create(desc, address), 
firstTask);
         tasks.add(firstTask);
         ValidationTask currentTask = firstTask;
         while (requests.size() > 0)
         {
             final InetAddressAndPort nextAddress = requests.poll();
-            final ValidationTask nextTask = new ValidationTask(desc, 
nextAddress, nowInSec, previewKind);
+            final ValidationTask nextTask = new ValidationTask(desc, 
nextAddress, nowInSec, session.previewKind);
             tasks.add(nextTask);
             Futures.addCallback(currentTask, new FutureCallback<TreeResponse>()
             {
                 public void onSuccess(TreeResponse result)
                 {
                     logger.info("Validating {}", nextAddress);
-                    session.waitForValidation(Pair.create(desc, nextAddress), 
nextTask);
+                    session.trackValidationCompletion(Pair.create(desc, 
nextAddress), nextTask);
                     taskExecutor.execute(nextTask);
                 }
 
@@ -356,7 +400,7 @@ public class RepairJob extends AbstractFuture<RepairResult> 
implements Runnable
     private ListenableFuture<List<TreeResponse>> 
sendDCAwareValidationRequest(Collection<InetAddressAndPort> endpoints)
     {
         String message = String.format("Requesting merkle trees for %s (to 
%s)", desc.columnFamily, endpoints);
-        logger.info("{} {}", previewKind.logPrefix(desc.sessionId), message);
+        logger.info("{} {}", session.previewKind.logPrefix(desc.sessionId), 
message);
         Tracing.traceRepair(message);
         int nowInSec = FBUtilities.nowInSeconds();
         List<ListenableFuture<TreeResponse>> tasks = new 
ArrayList<>(endpoints.size());
@@ -378,22 +422,22 @@ public class RepairJob extends 
AbstractFuture<RepairResult> implements Runnable
         {
             Queue<InetAddressAndPort> requests = entry.getValue();
             InetAddressAndPort address = requests.poll();
-            ValidationTask firstTask = new ValidationTask(desc, address, 
nowInSec, previewKind);
+            ValidationTask firstTask = new ValidationTask(desc, address, 
nowInSec, session.previewKind);
             logger.info("Validating {}", address);
-            session.waitForValidation(Pair.create(desc, address), firstTask);
+            session.trackValidationCompletion(Pair.create(desc, address), 
firstTask);
             tasks.add(firstTask);
             ValidationTask currentTask = firstTask;
             while (requests.size() > 0)
             {
                 final InetAddressAndPort nextAddress = requests.poll();
-                final ValidationTask nextTask = new ValidationTask(desc, 
nextAddress, nowInSec, previewKind);
+                final ValidationTask nextTask = new ValidationTask(desc, 
nextAddress, nowInSec, session.previewKind);
                 tasks.add(nextTask);
                 Futures.addCallback(currentTask, new 
FutureCallback<TreeResponse>()
                 {
                     public void onSuccess(TreeResponse result)
                     {
                         logger.info("Validating {}", nextAddress);
-                        session.waitForValidation(Pair.create(desc, 
nextAddress), nextTask);
+                        session.trackValidationCompletion(Pair.create(desc, 
nextAddress), nextTask);
                         taskExecutor.execute(nextTask);
                     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a05785d8/src/java/org/apache/cassandra/repair/RepairSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java b/src/java/org/apache/cassandra/repair/RepairSession.java
index 4dc563a..a38205e 100644
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@ -106,7 +106,7 @@ public class RepairSession extends 
AbstractFuture<RepairSessionResult> implement
 
     // Tasks(snapshot, validate request, differencing, ...) are run on 
taskExecutor
     public final ListeningExecutorService taskExecutor = 
MoreExecutors.listeningDecorator(DebuggableThreadPoolExecutor.createCachedThreadpoolWithMaxSize("RepairJobTask"));
-    private final boolean optimiseStreams;
+    public final boolean optimiseStreams;
 
     private volatile boolean terminated = false;
 
@@ -189,12 +189,12 @@ public class RepairSession extends 
AbstractFuture<RepairSessionResult> implement
         return commonRange.endpoints;
     }
 
-    public void waitForValidation(Pair<RepairJobDesc, InetAddressAndPort> key, 
ValidationTask task)
+    public void trackValidationCompletion(Pair<RepairJobDesc, 
InetAddressAndPort> key, ValidationTask task)
     {
         validating.put(key, task);
     }
 
-    public void waitForSync(Pair<RepairJobDesc, SyncNodePair> key, 
CompletableRemoteSyncTask task)
+    public void trackSyncCompletion(Pair<RepairJobDesc, SyncNodePair> key, 
CompletableRemoteSyncTask task)
     {
         syncingTasks.put(key, task);
     }
@@ -305,7 +305,7 @@ public class RepairSession extends 
AbstractFuture<RepairSessionResult> implement
         List<ListenableFuture<RepairResult>> jobs = new 
ArrayList<>(cfnames.length);
         for (String cfname : cfnames)
         {
-            RepairJob job = new RepairJob(this, cfname, isIncremental, 
previewKind, optimiseStreams);
+            RepairJob job = new RepairJob(this, cfname);
             executor.execute(job);
             jobs.add(job);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a05785d8/src/java/org/apache/cassandra/repair/SymmetricRemoteSyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/SymmetricRemoteSyncTask.java b/src/java/org/apache/cassandra/repair/SymmetricRemoteSyncTask.java
index 4e44c15..c731bc1 100644
--- a/src/java/org/apache/cassandra/repair/SymmetricRemoteSyncTask.java
+++ b/src/java/org/apache/cassandra/repair/SymmetricRemoteSyncTask.java
@@ -47,16 +47,9 @@ public class SymmetricRemoteSyncTask extends SyncTask 
implements CompletableRemo
 {
     private static final Logger logger = 
LoggerFactory.getLogger(SymmetricRemoteSyncTask.class);
 
-    public SymmetricRemoteSyncTask(RepairJobDesc desc, TreeResponse r1, 
TreeResponse r2, PreviewKind previewKind)
+    public SymmetricRemoteSyncTask(RepairJobDesc desc, InetAddressAndPort r1, 
InetAddressAndPort r2, List<Range<Token>> differences, PreviewKind previewKind)
     {
-        super(desc, r1.endpoint, r2.endpoint, MerkleTrees.difference(r1.trees, 
r2.trees), previewKind);
-    }
-
-    @VisibleForTesting
-    SymmetricRemoteSyncTask(RepairJobDesc desc, InetAddressAndPort e1, 
InetAddressAndPort e2,
-                            List<Range<Token>> differences, PreviewKind 
previewKind)
-    {
-        super(desc, e1, e2, differences, previewKind);
+        super(desc, r1, r2, differences, previewKind);
     }
 
     void sendRequest(RepairMessage request, InetAddressAndPort to)
@@ -89,4 +82,13 @@ public class SymmetricRemoteSyncTask extends SyncTask 
implements CompletableRemo
         }
         finished();
     }
+
+    /**
+     * Describes this task by the ranges it will synchronise and the pair of nodes involved.
+     */
+    @Override
+    public String toString()
+    {
+        return new StringBuilder("SymmetricRemoteSyncTask{")
+               .append("rangesToSync=").append(rangesToSync)
+               .append(", nodePair=").append(nodePair)
+               .append('}')
+               .toString();
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a05785d8/src/java/org/apache/cassandra/repair/SyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/SyncTask.java 
b/src/java/org/apache/cassandra/repair/SyncTask.java
index ccbd26c..8b622c4 100644
--- a/src/java/org/apache/cassandra/repair/SyncTask.java
+++ b/src/java/org/apache/cassandra/repair/SyncTask.java
@@ -87,6 +87,10 @@ public abstract class SyncTask extends 
AbstractFuture<SyncStat> implements Runna
         startSync();
     }
 
+    /**
+     * Whether this sync task involves the node executing it. The base class always answers
+     * {@code false}; LocalSyncTask is expected to override this and report {@code true}.
+     *
+     * @return {@code false} unless overridden
+     */
+    public boolean isLocal()
+    {
+        return false;
+    }
 
     protected void finished()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a05785d8/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java 
b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
index 6e691f5..903a273 100644
--- a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
+++ b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
@@ -96,7 +96,8 @@ public class LocalSyncTaskTest extends AbstractRepairTest
         // note: we reuse the same endpoint which is bogus in theory but fine 
here
         TreeResponse r1 = new TreeResponse(local, tree1);
         TreeResponse r2 = new TreeResponse(ep2, tree2);
-        LocalSyncTask task = new LocalSyncTask(desc, r1, r2, 
NO_PENDING_REPAIR, true, true, PreviewKind.NONE);
+        LocalSyncTask task = new LocalSyncTask(desc, r1.endpoint, r2.endpoint, 
MerkleTrees.difference(r1.trees, r2.trees),
+                                               NO_PENDING_REPAIR, true, true, 
PreviewKind.NONE);
         task.run();
 
         assertEquals(0, task.get().numberOfDifferences);
@@ -133,7 +134,8 @@ public class LocalSyncTaskTest extends AbstractRepairTest
         // note: we reuse the same endpoint which is bogus in theory but fine 
here
         TreeResponse r1 = new TreeResponse(local, tree1);
         TreeResponse r2 = new 
TreeResponse(InetAddressAndPort.getByName("127.0.0.2"), tree2);
-        LocalSyncTask task = new LocalSyncTask(desc, r1, r2, 
NO_PENDING_REPAIR, true, true, PreviewKind.NONE);
+        LocalSyncTask task = new LocalSyncTask(desc, r1.endpoint, r2.endpoint, 
MerkleTrees.difference(r1.trees, r2.trees),
+                                               NO_PENDING_REPAIR, true, true, 
PreviewKind.NONE);
         DefaultConnectionFactory.MAX_CONNECT_ATTEMPTS = 1;
         DefaultConnectionFactory.MAX_WAIT_TIME_NANOS = 
TimeUnit.SECONDS.toNanos(2);
         try
@@ -160,8 +162,9 @@ public class LocalSyncTaskTest extends AbstractRepairTest
         TreeResponse r1 = new TreeResponse(local, createInitialTree(desc, 
DatabaseDescriptor.getPartitioner()));
         TreeResponse r2 = new TreeResponse(PARTICIPANT2, 
createInitialTree(desc, DatabaseDescriptor.getPartitioner()));
 
-        LocalSyncTask task = new LocalSyncTask(desc, r1, r2, 
NO_PENDING_REPAIR, true, true, PreviewKind.NONE);
-        StreamPlan plan = task.createStreamPlan(local, 
Lists.newArrayList(RANGE1));
+        LocalSyncTask task = new LocalSyncTask(desc, r1.endpoint, r2.endpoint, 
MerkleTrees.difference(r1.trees, r2.trees),
+                                               NO_PENDING_REPAIR, true, true, 
PreviewKind.NONE);
+        StreamPlan plan = task.createStreamPlan();
 
         assertEquals(NO_PENDING_REPAIR, plan.getPendingRepair());
         assertTrue(plan.getFlushBeforeTransfer());
@@ -185,8 +188,9 @@ public class LocalSyncTaskTest extends AbstractRepairTest
         TreeResponse r1 = new TreeResponse(local, createInitialTree(desc, 
DatabaseDescriptor.getPartitioner()));
         TreeResponse r2 = new TreeResponse(PARTICIPANT2, 
createInitialTree(desc, DatabaseDescriptor.getPartitioner()));
 
-        LocalSyncTask task = new LocalSyncTask(desc, r1, r2, 
desc.parentSessionId, true, true, PreviewKind.NONE);
-        StreamPlan plan = task.createStreamPlan(local, 
Lists.newArrayList(RANGE1));
+        LocalSyncTask task = new LocalSyncTask(desc, r1.endpoint, r2.endpoint, 
MerkleTrees.difference(r1.trees, r2.trees),
+                                               desc.parentSessionId, true, 
true, PreviewKind.NONE);
+        StreamPlan plan = task.createStreamPlan();
 
         assertEquals(desc.parentSessionId, plan.getPendingRepair());
         assertFalse(plan.getFlushBeforeTransfer());
@@ -206,8 +210,9 @@ public class LocalSyncTaskTest extends AbstractRepairTest
         TreeResponse r1 = new TreeResponse(local, createInitialTree(desc, 
DatabaseDescriptor.getPartitioner()));
         TreeResponse r2 = new TreeResponse(PARTICIPANT2, 
createInitialTree(desc, DatabaseDescriptor.getPartitioner()));
 
-        LocalSyncTask task = new LocalSyncTask(desc, r1, r2, 
desc.parentSessionId, true, false, PreviewKind.NONE);
-        StreamPlan plan = task.createStreamPlan(local, 
Lists.newArrayList(RANGE1));
+        LocalSyncTask task = new LocalSyncTask(desc, r1.endpoint, r2.endpoint, 
MerkleTrees.difference(r1.trees, r2.trees),
+                                               desc.parentSessionId, true, 
false, PreviewKind.NONE);
+        StreamPlan plan = task.createStreamPlan();
         assertNumInOut(plan, 1, 0);
     }
 
@@ -224,8 +229,9 @@ public class LocalSyncTaskTest extends AbstractRepairTest
         TreeResponse r1 = new TreeResponse(local, createInitialTree(desc, 
DatabaseDescriptor.getPartitioner()));
         TreeResponse r2 = new TreeResponse(PARTICIPANT2, 
createInitialTree(desc, DatabaseDescriptor.getPartitioner()));
 
-        LocalSyncTask task = new LocalSyncTask(desc, r1, r2, 
desc.parentSessionId, false, true, PreviewKind.NONE);
-        StreamPlan plan = task.createStreamPlan(local, 
Lists.newArrayList(RANGE1));
+        LocalSyncTask task = new LocalSyncTask(desc, r1.endpoint, r2.endpoint, 
MerkleTrees.difference(r1.trees, r2.trees),
+                                               desc.parentSessionId, false, 
true, PreviewKind.NONE);
+        StreamPlan plan = task.createStreamPlan();
         assertNumInOut(plan, 0, 1);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a05785d8/test/unit/org/apache/cassandra/repair/RepairJobTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/RepairJobTest.java 
b/test/unit/org/apache/cassandra/repair/RepairJobTest.java
new file mode 100644
index 0000000..263ecbc
--- /dev/null
+++ b/test/unit/org/apache/cassandra/repair/RepairJobTest.java
@@ -0,0 +1,625 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair;
+
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Predicate;
+
+import com.google.common.collect.Sets;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.ByteOrderedPartitioner;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.streaming.PreviewKind;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MerkleTree;
+import org.apache.cassandra.utils.MerkleTrees;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class RepairJobTest
+{
+    private static final IPartitioner PARTITIONER = ByteOrderedPartitioner.instance;
+
+    // Loopback endpoints acting as repair participants; resolved in the static initializer below.
+    static InetAddressAndPort addr1;
+    static InetAddressAndPort addr2;
+    static InetAddressAndPort addr3;
+    static InetAddressAndPort addr4;
+    static InetAddressAndPort addr5;
+
+    // Token ranges used to build the per-endpoint tree responses in each test.
+    static Range<Token> range1 = range(0, 1);
+    static Range<Token> range2 = range(2, 3);
+    static Range<Token> range3 = range(4, 5);
+    static RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), "ks", "cf", Arrays.asList());
+
+    static
+    {
+        try
+        {
+            addr1 = InetAddressAndPort.getByName("127.0.0.1");
+            addr2 = InetAddressAndPort.getByName("127.0.0.2");
+            addr3 = InetAddressAndPort.getByName("127.0.0.3");
+            addr4 = InetAddressAndPort.getByName("127.0.0.4");
+            addr5 = InetAddressAndPort.getByName("127.0.0.5");
+            DatabaseDescriptor.setBroadcastAddress(addr1.address);
+        }
+        catch (UnknownHostException e)
+        {
+            // Swallowing this would leave the addresses null and make every test (and reset())
+            // fail later with an unrelated NullPointerException; fail class initialisation instead.
+            throw new ExceptionInInitializerError(e);
+        }
+    }
+
+    /**
+     * Undoes any broadcast-address override made by a test: clears FBUtilities' state and
+     * sets addr1 back as the broadcast address.
+     */
+    @After
+    public void reset()
+    {
+        FBUtilities.reset();
+        DatabaseDescriptor.setBroadcastAddress(addr1.address);
+    }
+
+    @Test
+    public void testCreateStandardSyncTasks()
+    {
+        // Default repair: the local node both requests and transfers ranges.
+        testCreateStandardSyncTasks(false);
+    }
+
+    @Test
+    public void testCreateStandardSyncTasksPullRepair()
+    {
+        // Pull repair: the local node must only request ranges.
+        testCreateStandardSyncTasks(true);
+    }
+
+    /**
+     * Three full replicas where only addr2 disagrees (on range1 and range3). Expects a local task
+     * addr1<->addr2, a symmetric remote task addr2<->addr3, and no task for the matching pair
+     * addr1<->addr3.
+     */
+    public static void testCreateStandardSyncTasks(boolean pullRepair)
+    {
+        TreeResponse addr1Tree = treeResponse(addr1, range1, "same",      range2, "same", range3, "same");
+        TreeResponse addr2Tree = treeResponse(addr2, range1, "different", range2, "same", range3, "different");
+        TreeResponse addr3Tree = treeResponse(addr3, range1, "same",      range2, "same", range3, "same");
+        List<TreeResponse> treeResponses = Arrays.asList(addr1Tree, addr2Tree, addr3Tree);
+
+        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+                                                                                    treeResponses,
+                                                                                    addr1, // local
+                                                                                    noTransient(), // transient
+                                                                                    false,
+                                                                                    pullRepair,
+                                                                                    PreviewKind.ALL));
+        List<Range<Token>> mismatched = Arrays.asList(range1, range3);
+
+        assertEquals(2, tasks.size());
+
+        SyncTask localTask = tasks.get(pair(addr1, addr2));
+        assertTrue(localTask.isLocal());
+        LocalSyncTask localSync = (LocalSyncTask) localTask;
+        assertTrue(localSync.requestRanges);
+        assertEquals(!pullRepair, localSync.transferRanges);
+        assertEquals(mismatched, localTask.rangesToSync);
+
+        SyncTask remoteTask = tasks.get(pair(addr2, addr3));
+        assertFalse(remoteTask.isLocal());
+        assertTrue(remoteTask instanceof SymmetricRemoteSyncTask);
+        assertEquals(mismatched, remoteTask.rangesToSync);
+
+        assertNull(tasks.get(pair(addr1, addr3)));
+    }
+
+    @Test
+    public void testStandardSyncTransient()
+    {
+        // Do not stream towards transient nodes
+        for (boolean pullRepair : new boolean[]{ true, false })
+            testStandardSyncTransient(pullRepair);
+    }
+
+    /**
+     * Local full replica addr1 versus transient addr2, which disagrees on range1 and range3. The only
+     * task is the local one; it requests the mismatching ranges and never transfers any, whatever
+     * the pull-repair flag.
+     */
+    public void testStandardSyncTransient(boolean pullRepair)
+    {
+        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same",      range2, "same", range3, "same"),
+                                                         treeResponse(addr2, range1, "different", range2, "same", range3, "different"));
+
+        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+                                                                                    treeResponses,
+                                                                                    addr1, // local
+                                                                                    transientPredicate(addr2),
+                                                                                    false,
+                                                                                    pullRepair,
+                                                                                    PreviewKind.ALL));
+
+        assertEquals(1, tasks.size());
+
+        SyncTask onlyTask = tasks.get(pair(addr1, addr2));
+        assertTrue(onlyTask.isLocal());
+        LocalSyncTask localSync = (LocalSyncTask) onlyTask;
+        assertTrue(localSync.requestRanges);
+        assertFalse(localSync.transferRanges);
+        assertEquals(Arrays.asList(range1, range3), onlyTask.rangesToSync);
+    }
+
+    @Test
+    public void testStandardSyncLocalTransient()
+    {
+        // Do not stream towards transient nodes
+        for (boolean pullRepair : new boolean[]{ true, false })
+            testStandardSyncLocalTransient(pullRepair);
+    }
+
+    /**
+     * The local node addr1 is itself transient. With pull repair no task may be created at all.
+     * Otherwise the single local task only transfers the mismatching ranges to addr2 and requests
+     * nothing.
+     */
+    public void testStandardSyncLocalTransient(boolean pullRepair)
+    {
+        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same",      range2, "same", range3, "same"),
+                                                         treeResponse(addr2, range1, "different", range2, "same", range3, "different"));
+
+        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+                                                                                    treeResponses,
+                                                                                    addr1, // local
+                                                                                    transientPredicate(addr1),
+                                                                                    false,
+                                                                                    pullRepair,
+                                                                                    PreviewKind.ALL));
+
+        if (!pullRepair)
+        {
+            assertEquals(1, tasks.size());
+
+            SyncTask onlyTask = tasks.get(pair(addr1, addr2));
+            assertTrue(onlyTask.isLocal());
+            LocalSyncTask localSync = (LocalSyncTask) onlyTask;
+            assertFalse(localSync.requestRanges);
+            assertTrue(localSync.transferRanges);
+            assertEquals(Arrays.asList(range1, range3), onlyTask.rangesToSync);
+        }
+        else
+        {
+            assertTrue(tasks.isEmpty());
+        }
+    }
+
+    @Test
+    public void testEmptyDifference()
+    {
+        // Identical trees must never produce sync tasks: whichever node coordinates (addr1 and addr2
+        // supply tree responses, addr3 does not), whichever replica is transient, and with or without
+        // pull repair. Each of the 3 x 3 x 2 combinations is exercised exactly once (the previous
+        // hand-written list ran every addr3 combination twice).
+        List<Predicate<InetAddressAndPort>> transientPredicates = Arrays.asList(noTransient(),
+                                                                                transientPredicate(addr1),
+                                                                                transientPredicate(addr2));
+        for (InetAddressAndPort local : Arrays.asList(addr1, addr2, addr3))
+        {
+            for (Predicate<InetAddressAndPort> isTransient : transientPredicates)
+            {
+                testEmptyDifference(local, isTransient, true);
+                testEmptyDifference(local, isTransient, false);
+            }
+        }
+    }
+
+    /**
+     * Builds standard sync tasks for two replicas whose trees match on every range and asserts that
+     * no task is created.
+     *
+     * @param local       node acting as the local coordinator
+     * @param isTransient which endpoints are transient replicas
+     * @param pullRepair  whether pull repair is requested
+     */
+    public void testEmptyDifference(InetAddressAndPort local, Predicate<InetAddressAndPort> isTransient, boolean pullRepair)
+    {
+        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
+                                                         treeResponse(addr2, range1, "same", range2, "same", range3, "same"));
+
+        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+                                                                                    treeResponses,
+                                                                                    local, // local
+                                                                                    isTransient,
+                                                                                    false,
+                                                                                    pullRepair,
+                                                                                    PreviewKind.ALL));
+
+        // Name the combination in the failure message; the caller iterates over many of them.
+        assertTrue(String.format("Expected no sync tasks for identical trees (local=%s, pullRepair=%s), got %s",
+                                 local, pullRepair, tasks),
+                   tasks.isEmpty());
+    }
+
+    /**
+     * Three replicas that disagree on every range, with addr3 transient and pull repair. Each of the
+     * three pairs gets a task covering all ranges, and a task is local exactly when addr1 (the local
+     * node) coordinates it.
+     */
+    @Test
+    public void testCreateStandardSyncTasksAllDifferent()
+    {
+        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one",   range2, "one",   range3, "one"),
+                                                         treeResponse(addr2, range1, "two",   range2, "two",   range3, "two"),
+                                                         treeResponse(addr3, range1, "three", range2, "three", range3, "three"));
+
+        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+                                                                                    treeResponses,
+                                                                                    addr1, // local
+                                                                                    ep -> ep.equals(addr3), // transient
+                                                                                    false,
+                                                                                    true,
+                                                                                    PreviewKind.ALL));
+
+        assertEquals(3, tasks.size());
+
+        List<Range<Token>> allRanges = Arrays.asList(range1, range2, range3);
+        for (SyncNodePair expected : new SyncNodePair[]{ pair(addr1, addr2), pair(addr2, addr3), pair(addr1, addr3) })
+        {
+            SyncTask task = tasks.get(expected);
+            assertEquals(expected.coordinator.equals(addr1), task.isLocal());
+            assertEquals(allRanges, task.rangesToSync);
+        }
+    }
+
+    /**
+     * Five replicas that disagree on every range, addr4 and addr5 transient, pull repair, with addr1
+     * as the local node. Every pair except the transient-only pair addr4<->addr5 gets a task covering
+     * all ranges. A task is local exactly when addr1 coordinates it, and a remote task is asymmetric
+     * exactly when it involves a transient replica.
+     */
+    @Test
+    public void testCreate5NodeStandardSyncTasksWithTransient()
+    {
+        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one",   range2, "one",   range3, "one"),
+                                                         treeResponse(addr2, range1, "two",   range2, "two",   range3, "two"),
+                                                         treeResponse(addr3, range1, "three", range2, "three", range3, "three"),
+                                                         treeResponse(addr4, range1, "four",  range2, "four",  range3, "four"),
+                                                         treeResponse(addr5, range1, "five",  range2, "five",  range3, "five"));
+
+        Predicate<InetAddressAndPort> isTransient = ep -> ep.equals(addr4) || ep.equals(addr5);
+        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+                                                                                    treeResponses,
+                                                                                    addr1, // local
+                                                                                    isTransient, // transient
+                                                                                    false,
+                                                                                    true,
+                                                                                    PreviewKind.ALL));
+
+        // 10 pairs among 5 nodes, minus the transient-only pair addr4<->addr5
+        assertEquals(9, tasks.size());
+        assertNull(tasks.get(pair(addr4, addr5)));
+
+        // Each remaining pair exactly once (addr2<->addr3 was previously missing; addr2<->addr4 was listed twice)
+        SyncNodePair[] pairs = new SyncNodePair[] {pair(addr1, addr2),
+                                                   pair(addr1, addr3),
+                                                   pair(addr1, addr4),
+                                                   pair(addr1, addr5),
+                                                   pair(addr2, addr3),
+                                                   pair(addr2, addr4),
+                                                   pair(addr2, addr5),
+                                                   pair(addr3, addr4),
+                                                   pair(addr3, addr5)};
+
+        for (SyncNodePair pair : pairs)
+        {
+            String context = String.format("Coordinator: %s, Peer: %s", pair.coordinator, pair.peer);
+            SyncTask task = tasks.get(pair);
+            Assert.assertNotNull(context, task);
+
+            // Local only if addr1 is a coordinator
+            assertEquals(context, pair.coordinator.equals(addr1), task.isLocal());
+
+            boolean isRemote = !pair.coordinator.equals(addr1) && !pair.peer.equals(addr1);
+            boolean involvesTransient = isTransient.test(pair.coordinator) || isTransient.test(pair.peer);
+            assertEquals(context, isRemote && involvesTransient, task instanceof AsymmetricRemoteSyncTask);
+
+            // All ranges to be synchronised
+            assertEquals(context, Arrays.asList(range1, range2, range3), task.rangesToSync);
+        }
+    }
+
+    @Test
+    public void testLocalSyncWithTransient()
+    {
+        runLocalSyncWithTransientOnEachFullReplica(false);
+    }
+
+    @Test
+    public void testLocalSyncWithTransientPullRepair()
+    {
+        runLocalSyncWithTransientOnEachFullReplica(true);
+    }
+
+    /**
+     * Runs {@link #testLocalSyncWithTransient(InetAddressAndPort, boolean)} once with each full
+     * replica (addr1, addr2, addr3) as the local node, pointing the broadcast address at it first.
+     * Always restores addr1 as the broadcast address afterwards.
+     */
+    private static void runLocalSyncWithTransientOnEachFullReplica(boolean pullRepair)
+    {
+        try
+        {
+            for (InetAddressAndPort local : Arrays.asList(addr1, addr2, addr3))
+            {
+                FBUtilities.reset();
+                DatabaseDescriptor.setBroadcastAddress(local.address);
+                testLocalSyncWithTransient(local, pullRepair);
+            }
+        }
+        finally
+        {
+            FBUtilities.reset();
+            DatabaseDescriptor.setBroadcastAddress(addr1.address);
+        }
+    }
+
+    /**
+     * Five replicas that disagree on every range, with addr4 and addr5 transient, as seen from
+     * {@code local}. The local node requests ranges from every peer. It transfers to the other full
+     * replicas only without pull repair, and never transfers to a transient replica.
+     */
+    public static void testLocalSyncWithTransient(InetAddressAndPort local, boolean pullRepair)
+    {
+        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one",   range2, "one",   range3, "one"),
+                                                         treeResponse(addr2, range1, "two",   range2, "two",   range3, "two"),
+                                                         treeResponse(addr3, range1, "three", range2, "three", range3, "three"),
+                                                         treeResponse(addr4, range1, "four",  range2, "four",  range3, "four"),
+                                                         treeResponse(addr5, range1, "five",  range2, "five",  range3, "five"));
+
+        Predicate<InetAddressAndPort> isTransient = ep -> ep.equals(addr4) || ep.equals(addr5);
+        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+                                                                                    treeResponses,
+                                                                                    local, // local
+                                                                                    isTransient, // transient
+                                                                                    false,
+                                                                                    pullRepair,
+                                                                                    PreviewKind.ALL));
+
+        assertEquals(9, tasks.size());
+
+        for (InetAddressAndPort fullReplica : Arrays.asList(addr1, addr2, addr3))
+        {
+            if (!local.equals(fullReplica))
+            {
+                LocalSyncTask toFull = (LocalSyncTask) tasks.get(pair(local, fullReplica));
+                assertTrue(toFull.requestRanges);
+                assertEquals(!pullRepair, toFull.transferRanges);
+            }
+        }
+
+        for (InetAddressAndPort transientReplica : Arrays.asList(addr4, addr5))
+        {
+            LocalSyncTask toTransient = (LocalSyncTask) tasks.get(pair(local, transientReplica));
+            assertTrue(toTransient.requestRanges);
+            assertFalse(toTransient.transferRanges);
+        }
+    }
+
+    @Test
+    public void testLocalAndRemoteTransient()
+    {
+        testLocalAndRemoteTransient(false);
+    }
+
+    @Test
+    public void testLocalAndRemoteTransientPullRepair()
+    {
+        testLocalAndRemoteTransient(true);
+    }
+
+    /**
+     * The local node addr4 is transient, and so is addr5: no sync task may be created between the
+     * two transient replicas. The broadcast address is put back by the @After {@code reset()}.
+     */
+    private static void testLocalAndRemoteTransient(boolean pullRepair)
+    {
+        // Clear FBUtilities' state before overriding the broadcast address, as the other tests that
+        // switch the local node do (presumably it caches the local/broadcast address).
+        FBUtilities.reset();
+        DatabaseDescriptor.setBroadcastAddress(addr4.address);
+        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one",   range2, "one",   range3, "one"),
+                                                         treeResponse(addr2, range1, "two",   range2, "two",   range3, "two"),
+                                                         treeResponse(addr3, range1, "three", range2, "three", range3, "three"),
+                                                         treeResponse(addr4, range1, "four",  range2, "four",  range3, "four"),
+                                                         treeResponse(addr5, range1, "five",  range2, "five",  range3, "five"));
+
+        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+                                                                                    treeResponses,
+                                                                                    addr4, // local
+                                                                                    ep -> ep.equals(addr4) || ep.equals(addr5), // transient
+                                                                                    false,
+                                                                                    pullRepair,
+                                                                                    PreviewKind.ALL));
+
+        assertNull(tasks.get(pair(addr4, addr5)));
+    }
+
+    /**
+     * Optimised sync with three replicas, all mapped to DC "DC1", that disagree on every range.
+     * Every ordered pair of distinct endpoints must get a task covering all three ranges.
+     */
+    @Test
+    public void testOptimizedCreateStandardSyncTasksAllDifferent()
+    {
+        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one",   range2, "one",   range3, "one"),
+                                                         treeResponse(addr2, range1, "two",   range2, "two",   range3, "two"),
+                                                         treeResponse(addr3, range1, "three", range2, "three", range3, "three"));
+
+        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createOptimisedSyncingSyncTasks(desc,
+                                                                                            treeResponses,
+                                                                                            addr1, // local
+                                                                                            noTransient(),
+                                                                                            addr -> "DC1",
+                                                                                            false,
+                                                                                            PreviewKind.ALL));
+
+        List<InetAddressAndPort> endpoints = Arrays.asList(addr1, addr2, addr3);
+        for (InetAddressAndPort from : endpoints)
+        {
+            for (InetAddressAndPort to : endpoints)
+            {
+                if (!from.equals(to))
+                    assertEquals(Arrays.asList(range1, range2, range3), tasks.get(pair(from, to)).rangesToSync);
+            }
+        }
+    }
+
+    /**
+     * Optimised sync coordinated by addr4, which supplies no tree response. Every task must be an
+     * asymmetric remote task. Where a node could fetch a range from two peers holding the same data,
+     * it fetches it from exactly one of them.
+     */
+    @Test
+    public void testOptimizedCreateStandardSyncTasks()
+    {
+        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one",   range2, "one"),
+                                                         treeResponse(addr2, range1, "one",   range2, "two"),
+                                                         treeResponse(addr3, range1, "three", range2, "two"));
+
+        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createOptimisedSyncingSyncTasks(desc,
+                                                                                            treeResponses,
+                                                                                            addr4, // local
+                                                                                            noTransient(),
+                                                                                            addr -> "DC1",
+                                                                                            false,
+                                                                                            PreviewKind.ALL));
+
+        for (SyncTask task : tasks.values())
+            assertTrue(task instanceof AsymmetricRemoteSyncTask);
+
+        List<Range<Token>> onlyRange1 = Arrays.asList(range1);
+        List<Range<Token>> onlyRange2 = Arrays.asList(range2);
+
+        // addr1 gets range1 from addr3, and range2 from either addr2 or addr3 but not from both
+        assertEquals(onlyRange1, tasks.get(pair(addr1, addr3)).rangesToSync);
+        assertStreamRangeFromEither(tasks, onlyRange2, addr1, addr2, addr3);
+
+        // addr2 gets range1 from addr3 and range2 from addr1
+        assertEquals(onlyRange1, tasks.get(pair(addr2, addr3)).rangesToSync);
+        assertEquals(onlyRange2, tasks.get(pair(addr2, addr1)).rangesToSync);
+
+        // addr3 gets range1 from either addr1 or addr2 but not from both, and range2 from addr1
+        assertStreamRangeFromEither(tasks, onlyRange1, addr3, addr2, addr1);
+        assertEquals(onlyRange2, tasks.get(pair(addr3, addr1)).rangesToSync);
+    }
+
+    /**
+     * Optimised sync task creation with addr1 as the local endpoint and addr3 marked transient. addr2 disagrees
+     * with the other two replicas on range1 and range3, and all three agree on range2.
+     */
+    @Test
+    public void testOptimizedCreateStandardSyncTasksWithTransient()
+    {
+        TreeResponse fromAddr1 = treeResponse(addr1, range1, "same", range2, "same", range3, "same");
+        TreeResponse fromAddr2 = treeResponse(addr2, range1, "different", range2, "same", range3, "different");
+        TreeResponse fromAddr3 = treeResponse(addr3, range1, "same", range2, "same", range3, "same");
+        List<TreeResponse> responses = Arrays.asList(fromAddr1, fromAddr2, fromAddr3);
+
+        RepairJobDesc localDesc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), "ks", "cf", Arrays.asList());
+        List<SyncTask> created = RepairJob.createOptimisedSyncingSyncTasks(localDesc,
+                                                                           responses,
+                                                                           addr1, // local
+                                                                           endpoint -> endpoint.equals(addr3), // addr3 is transient
+                                                                           endpoint -> "DC1",
+                                                                           false,
+                                                                           PreviewKind.ALL);
+        Map<SyncNodePair, SyncTask> syncTasks = toMap(created);
+
+        assertEquals(3, syncTasks.size());
+
+        // the local node's task against addr2 covers the mismatching ranges and requests, but does not transfer
+        SyncTask localTask = syncTasks.get(pair(addr1, addr2));
+        assertTrue(localTask.isLocal());
+        assertElementEquals(Arrays.asList(range1, range3), localTask.rangesToSync);
+        LocalSyncTask asLocal = (LocalSyncTask) localTask;
+        assertTrue(asLocal.requestRanges);
+        assertFalse(asLocal.transferRanges);
+
+        assertStreamRangeFromEither(syncTasks, Arrays.asList(range3), addr2, addr1, addr3);
+
+        SyncTask remoteTask = syncTasks.get(pair(addr2, addr3));
+        assertFalse(remoteTask.isLocal());
+        assertElementEquals(Arrays.asList(range1), remoteTask.rangesToSync);
+    }
+
+    // Asserts that ranges are streamed from one of the nodes but not from the 
both
+    public static void assertStreamRangeFromEither(Map<SyncNodePair, SyncTask> 
tasks, List<Range<Token>> ranges,
+                                                   InetAddressAndPort target, 
InetAddressAndPort either, InetAddressAndPort or)
+    {
+        InetAddressAndPort streamsFrom;
+        InetAddressAndPort doesntStreamFrom;
+        if (tasks.containsKey(pair(target, either)))
+        {
+            streamsFrom = either;
+            doesntStreamFrom = or;
+        }
+        else
+        {
+            doesntStreamFrom = either;
+            streamsFrom = or;
+        }
+
+        SyncTask task = tasks.get(pair(target, streamsFrom));
+        assertTrue(task instanceof AsymmetricRemoteSyncTask);
+        assertElementEquals(ranges, task.rangesToSync);
+        assertDoesntStreamRangeFrom(tasks, ranges, target, doesntStreamFrom);
+    }
+
+    /**
+     * Asserts that none of {@code ranges} is among the ranges {@code target} syncs from {@code source}.
+     * Passes trivially when there is no task for the {@code (target, source)} pair.
+     */
+    public static void assertDoesntStreamRangeFrom(Map<SyncNodePair, SyncTask> tasks, List<Range<Token>> ranges,
+                                                   InetAddressAndPort target, InetAddressAndPort source)
+    {
+        Set<Range<Token>> forbidden = new HashSet<>(ranges);
+        SyncTask syncTask = tasks.get(pair(target, source));
+        if (syncTask != null)
+        {
+            for (Range<Token> streamed : syncTask.rangesToSync)
+                assertFalse(String.format("%s shouldn't stream %s from %s", target, streamed, source),
+                            forbidden.contains(streamed));
+        }
+    }
+
+    /**
+     * Asserts that two collections contain the same distinct elements, ignoring order and duplicates.
+     * Callers pass the expected collection first and the actual one second. The check runs in both
+     * directions: an element present in only one of the two collections fails the assertion.
+     */
+    public static <T> void assertElementEquals(Collection<T> col1, Collection<T> col2)
+    {
+        Set<T> set1 = new HashSet<>(col1);
+        Set<T> set2 = new HashSet<>(col2);
+        // Sets.difference(set1, set2) alone only proves set1 is a subset of set2; extra elements in col2
+        // would otherwise pass unnoticed.
+        Set<T> missing = Sets.difference(set1, set2);
+        Set<T> unexpected = Sets.difference(set2, set1);
+        assertTrue("Expected the same elements but missing: " + missing + ", unexpected: " + unexpected,
+                   missing.isEmpty() && unexpected.isEmpty());
+    }
+
+    /** Returns the token {@code PARTITIONER} assigns to the key built by {@code ByteBufferUtil.bytes(i)}. */
+    public static Token tk(int i)
+    {
+        Token token = PARTITIONER.getToken(ByteBufferUtil.bytes(i));
+        return token;
+    }
+
+    /** Builds a {@link Range} whose bounds are {@code tk(from)} and {@code tk(to)}. */
+    public static Range<Token> range(int from, int to)
+    {
+        Token left = tk(from);
+        Token right = tk(to);
+        return new Range<>(left, right);
+    }
+
+    /**
+     * Builds a {@link TreeResponse} for {@code addr} from alternating (range, hash) arguments, e.g.
+     * {@code treeResponse(addr1, range1, "one", range2, "two")}. Each range gets its own MerkleTree, and the
+     * hash string's bytes are written into the node returned by {@code tree.get(range.left)}. The tests
+     * presumably rely on equal strings yielding matching trees for a range; confirm against MerkleTree
+     * difference logic.
+     *
+     * @param addr            endpoint the response is attributed to
+     * @param rangesAndHashes alternating {@code Range<Token>} and {@code String} values
+     * @return a response holding one tree per supplied range
+     * @throws IllegalArgumentException if the arguments do not come in (range, hash) pairs
+     */
+    public static TreeResponse treeResponse(InetAddressAndPort addr, Object... rangesAndHashes)
+    {
+        if (rangesAndHashes.length % 2 != 0)
+            throw new IllegalArgumentException("Expected (range, hash) pairs but got " + rangesAndHashes.length + " arguments");
+
+        MerkleTrees trees = new MerkleTrees(PARTITIONER);
+        for (int i = 0; i < rangesAndHashes.length; i += 2)
+        {
+            @SuppressWarnings("unchecked") // callers pass Range<Token> at even positions
+            Range<Token> range = (Range<Token>) rangesAndHashes[i];
+            String hash = (String) rangesAndHashes[i + 1];
+            MerkleTree tree = trees.addMerkleTree(2, MerkleTree.RECOMMENDED_DEPTH, range);
+            // Explicit charset so the hashed bytes do not depend on the JVM's default encoding
+            tree.get(range.left).hash(hash.getBytes(java.nio.charset.StandardCharsets.UTF_8));
+        }
+
+        return new TreeResponse(addr, trees);
+    }
+
+    /** Shorthand for {@code new SyncNodePair(node1, node2)}, used as the lookup key for sync tasks. */
+    public static SyncNodePair pair(InetAddressAndPort node1, InetAddressAndPort node2)
+    {
+        SyncNodePair nodePair = new SyncNodePair(node1, node2);
+        return nodePair;
+    }
+
+    /**
+     * Indexes sync tasks by their {@code nodePair}, failing if two tasks share the same pair.
+     *
+     * @param tasks tasks to index
+     * @return a mutable map from node pair to the task for that pair
+     */
+    public static Map<SyncNodePair, SyncTask> toMap(List<SyncTask> tasks)
+    {
+        Map<SyncNodePair, SyncTask> map = new HashMap<>();
+        for (SyncTask task : tasks)
+        {
+            SyncTask oldTask = map.put(task.nodePair, task);
+            Assert.assertNull(String.format("\nNode pair: %s\nOld task:  %s\nNew task:  %s\n",
+                                            task.nodePair,
+                                            oldTask,
+                                            task),
+                              oldTask);
+        }
+        return map;
+    }
+
+    /** Returns a predicate that holds for exactly the given endpoints. */
+    public static Predicate<InetAddressAndPort> transientPredicate(InetAddressAndPort... transientNodes)
+    {
+        Set<InetAddressAndPort> transientSet = new HashSet<>(Arrays.asList(transientNodes));
+        return transientSet::contains;
+    }
+
+    /** Returns a predicate that holds for no endpoint, i.e. no replica is treated as transient. */
+    public static Predicate<InetAddressAndPort> noTransient()
+    {
+        Predicate<InetAddressAndPort> none = ignored -> false;
+        return none;
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to