Repository: cassandra Updated Branches: refs/heads/trunk 73ebd200c -> a05785d82
Add tests for RepairJob Patch by Alex Petrov; reviewed by Marcus Eriksson for CASSANDRA-14717 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a05785d8 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a05785d8 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a05785d8 Branch: refs/heads/trunk Commit: a05785d82c621c9cd04d8a064c38fd2012ef981c Parents: 73ebd20 Author: Alex Petrov <oleksandr.pet...@gmail.com> Authored: Mon Oct 1 15:30:58 2018 +0200 Committer: Alex Petrov <alex.pet...@apple.com> Committed: Tue Oct 9 22:33:31 2018 +0100 ---------------------------------------------------------------------- .../repair/AsymmetricRemoteSyncTask.java | 19 +- .../apache/cassandra/repair/LocalSyncTask.java | 39 +- .../org/apache/cassandra/repair/RepairJob.java | 146 +++-- .../apache/cassandra/repair/RepairSession.java | 8 +- .../repair/SymmetricRemoteSyncTask.java | 20 +- .../org/apache/cassandra/repair/SyncTask.java | 4 + .../cassandra/repair/LocalSyncTaskTest.java | 26 +- .../apache/cassandra/repair/RepairJobTest.java | 625 +++++++++++++++++++ 8 files changed, 793 insertions(+), 94 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/a05785d8/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java b/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java index 9ba33dd..d2a6aeb 100644 --- a/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java +++ b/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java @@ -41,15 +41,9 @@ import org.apache.cassandra.utils.MerkleTrees; */ public class AsymmetricRemoteSyncTask extends SyncTask implements CompletableRemoteSyncTask { - public 
AsymmetricRemoteSyncTask(RepairJobDesc desc, InetAddressAndPort fetchNode, InetAddressAndPort fetchFrom, - List<Range<Token>> rangesToFetch, PreviewKind previewKind) + public AsymmetricRemoteSyncTask(RepairJobDesc desc, InetAddressAndPort to, InetAddressAndPort from, List<Range<Token>> differences, PreviewKind previewKind) { - super(desc, fetchNode, fetchFrom, rangesToFetch, previewKind); - } - - public AsymmetricRemoteSyncTask(RepairJobDesc desc, TreeResponse to, TreeResponse from, PreviewKind previewKind) - { - this(desc, to.endpoint, from.endpoint, MerkleTrees.difference(to.trees, from.trees), previewKind); + super(desc, to, from, differences, previewKind); } public void startSync() @@ -72,4 +66,13 @@ public class AsymmetricRemoteSyncTask extends SyncTask implements CompletableRem setException(new RepairException(desc, previewKind, String.format("Sync failed between %s and %s", nodePair.coordinator, nodePair.peer))); } } + + @Override + public String toString() + { + return "AsymmetricRemoteSyncTask{" + + "rangesToSync=" + rangesToSync + + ", nodePair=" + nodePair + + '}'; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a05785d8/src/java/org/apache/cassandra/repair/LocalSyncTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/LocalSyncTask.java b/src/java/org/apache/cassandra/repair/LocalSyncTask.java index 1923fbe..f9b5765 100644 --- a/src/java/org/apache/cassandra/repair/LocalSyncTask.java +++ b/src/java/org/apache/cassandra/repair/LocalSyncTask.java @@ -52,15 +52,11 @@ public class LocalSyncTask extends SyncTask implements StreamEventHandler private static final Logger logger = LoggerFactory.getLogger(LocalSyncTask.class); private final UUID pendingRepair; - private final boolean requestRanges; - private final boolean transferRanges; - public LocalSyncTask(RepairJobDesc desc, TreeResponse local, TreeResponse remote, UUID pendingRepair, - boolean requestRanges, boolean 
transferRanges, PreviewKind previewKind) - { - this(desc, local.endpoint, remote.endpoint, MerkleTrees.difference(local.trees, remote.trees), - pendingRepair, requestRanges, transferRanges, previewKind); - } + @VisibleForTesting + final boolean requestRanges; + @VisibleForTesting + final boolean transferRanges; public LocalSyncTask(RepairJobDesc desc, InetAddressAndPort local, InetAddressAndPort remote, List<Range<Token>> diff, UUID pendingRepair, @@ -76,8 +72,10 @@ public class LocalSyncTask extends SyncTask implements StreamEventHandler } @VisibleForTesting - StreamPlan createStreamPlan(InetAddressAndPort remote, List<Range<Token>> differences) + StreamPlan createStreamPlan() { + InetAddressAndPort remote = nodePair.peer; + StreamPlan plan = new StreamPlan(StreamOperation.REPAIR, 1, false, pendingRepair, previewKind) .listeners(this) .flushBeforeTransfer(pendingRepair == null); @@ -85,7 +83,7 @@ public class LocalSyncTask extends SyncTask implements StreamEventHandler if (requestRanges) { // see comment on RangesAtEndpoint.toDummyList for why we synthesize replicas here - plan.requestRanges(remote, desc.keyspace, RangesAtEndpoint.toDummyList(differences), + plan.requestRanges(remote, desc.keyspace, RangesAtEndpoint.toDummyList(rangesToSync), RangesAtEndpoint.toDummyList(Collections.emptyList()), desc.columnFamily); } @@ -93,7 +91,7 @@ public class LocalSyncTask extends SyncTask implements StreamEventHandler { // send ranges to the remote node if we are not performing a pull repair // see comment on RangesAtEndpoint.toDummyList for why we synthesize replicas here - plan.transferRanges(remote, desc.keyspace, RangesAtEndpoint.toDummyList(differences), desc.columnFamily); + plan.transferRanges(remote, desc.keyspace, RangesAtEndpoint.toDummyList(rangesToSync), desc.columnFamily); } return plan; @@ -112,7 +110,13 @@ public class LocalSyncTask extends SyncTask implements StreamEventHandler logger.info("{} {}", previewKind.logPrefix(desc.sessionId), message); 
Tracing.traceRepair(message); - createStreamPlan(remote, rangesToSync).execute(); + createStreamPlan().execute(); + } + + @Override + public boolean isLocal() + { + return true; } public void handleStreamEvent(StreamEvent event) @@ -155,4 +159,15 @@ public class LocalSyncTask extends SyncTask implements StreamEventHandler setException(t); finished(); } + + @Override + public String toString() + { + return "LocalSyncTask{" + + "requestRanges=" + requestRanges + + ", transferRanges=" + transferRanges + + ", rangesToSync=" + rangesToSync + + ", nodePair=" + nodePair + + '}'; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a05785d8/src/java/org/apache/cassandra/repair/RepairJob.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java b/src/java/org/apache/cassandra/repair/RepairJob.java index c96e7fb..ebeb1f9 100644 --- a/src/java/org/apache/cassandra/repair/RepairJob.java +++ b/src/java/org/apache/cassandra/repair/RepairJob.java @@ -18,6 +18,8 @@ package org.apache.cassandra.repair; import java.util.*; +import java.util.function.Predicate; +import java.util.function.Function; import java.util.stream.Collectors; import com.google.common.base.Preconditions; @@ -39,6 +41,7 @@ import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.MerkleTrees; import org.apache.cassandra.utils.Pair; /** @@ -52,9 +55,6 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable private final RepairJobDesc desc; private final RepairParallelism parallelismDegree; private final ListeningExecutorService taskExecutor; - private final boolean isIncremental; - private final PreviewKind previewKind; - private final boolean optimiseStreams; /** * Create repair job to run on specific columnfamily @@ -62,15 +62,12 @@ public class 
RepairJob extends AbstractFuture<RepairResult> implements Runnable * @param session RepairSession that this RepairJob belongs * @param columnFamily name of the ColumnFamily to repair */ - public RepairJob(RepairSession session, String columnFamily, boolean isIncremental, PreviewKind previewKind, boolean optimiseStreams) + public RepairJob(RepairSession session, String columnFamily) { this.session = session; this.desc = new RepairJobDesc(session.parentRepairSession, session.getId(), session.keyspace, columnFamily, session.commonRange.ranges); this.taskExecutor = session.taskExecutor; this.parallelismDegree = session.parallelismDegree; - this.isIncremental = isIncremental; - this.previewKind = previewKind; - this.optimiseStreams = optimiseStreams; } /** @@ -92,7 +89,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable if (parallelismDegree != RepairParallelism.PARALLEL) { ListenableFuture<List<InetAddressAndPort>> allSnapshotTasks; - if (isIncremental) + if (session.isIncremental) { // consistent repair does it's own "snapshotting" allSnapshotTasks = Futures.immediateFuture(allEndpoints); @@ -130,7 +127,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable // When all validations complete, submit sync tasks ListenableFuture<List<SyncStat>> syncResults = Futures.transformAsync(validations, - optimiseStreams && !session.pullRepair ? this::optimisedSyncing : this::standardSyncing, + session.optimiseStreams && !session.pullRepair ? 
this::optimisedSyncing : this::standardSyncing, taskExecutor); // When all sync complete, set the final result @@ -138,9 +135,9 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable { public void onSuccess(List<SyncStat> stats) { - if (!previewKind.isPreview()) + if (!session.previewKind.isPreview()) { - logger.info("{} {} is fully synced", previewKind.logPrefix(session.getId()), desc.columnFamily); + logger.info("{} {} is fully synced", session.previewKind.logPrefix(session.getId()), desc.columnFamily); SystemDistributedKeyspace.successfulRepairJob(session.getId(), desc.keyspace, desc.columnFamily); } cfs.metric.repairsCompleted.inc(); @@ -152,9 +149,9 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable */ public void onFailure(Throwable t) { - if (!previewKind.isPreview()) + if (!session.previewKind.isPreview()) { - logger.warn("{} {} sync failed", previewKind.logPrefix(session.getId()), desc.columnFamily); + logger.warn("{} {} sync failed", session.previewKind.logPrefix(session.getId()), desc.columnFamily); SystemDistributedKeyspace.failedRepairJob(session.getId(), desc.keyspace, desc.columnFamily, t); } cfs.metric.repairsCompleted.inc(); @@ -170,8 +167,24 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable private ListenableFuture<List<SyncStat>> standardSyncing(List<TreeResponse> trees) { - InetAddressAndPort local = FBUtilities.getLocalAddressAndPort(); + List<SyncTask> syncTasks = createStandardSyncTasks(desc, + trees, + FBUtilities.getLocalAddressAndPort(), + this::isTransient, + session.isIncremental, + session.pullRepair, + session.previewKind); + return executeTasks(syncTasks); + } + static List<SyncTask> createStandardSyncTasks(RepairJobDesc desc, + List<TreeResponse> trees, + InetAddressAndPort local, + Predicate<InetAddressAndPort> isTransient, + boolean isIncremental, + boolean pullRepair, + PreviewKind previewKind) + { List<SyncTask> syncTasks = new 
ArrayList<>(); // We need to difference all trees one against another for (int i = 0; i < trees.size() - 1; ++i) @@ -182,7 +195,13 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable TreeResponse r2 = trees.get(j); // Avoid streming between two tansient replicas - if (isTransient(r1.endpoint) && isTransient(r2.endpoint)) + if (isTransient.test(r1.endpoint) && isTransient.test(r2.endpoint)) + continue; + + List<Range<Token>> differences = MerkleTrees.difference(r1.trees, r2.trees); + + // Nothing to do + if (differences.isEmpty()) continue; SyncTask task; @@ -192,43 +211,67 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable TreeResponse remote = r2.endpoint.equals(local) ? r1 : r2; // pull only if local is full - boolean requestRanges = !isTransient(self.endpoint); + boolean requestRanges = !isTransient.test(self.endpoint); // push only if remote is full; additionally check for pull repair - boolean transferRanges = !isTransient(remote.endpoint) && !session.pullRepair; + boolean transferRanges = !isTransient.test(remote.endpoint) && !pullRepair; // Nothing to do if (!requestRanges && !transferRanges) continue; - task = new LocalSyncTask(desc, self, remote, isIncremental ? desc.parentSessionId : null, - requestRanges, transferRanges, session.previewKind); + task = new LocalSyncTask(desc, self.endpoint, remote.endpoint, differences, isIncremental ? desc.parentSessionId : null, + requestRanges, transferRanges, previewKind); } - else if (isTransient(r1.endpoint) || isTransient(r2.endpoint)) + else if (isTransient.test(r1.endpoint) || isTransient.test(r2.endpoint)) { // Stream only from transient replica - TreeResponse streamFrom = isTransient(r1.endpoint) ? r1 : r2; - TreeResponse streamTo = isTransient(r1.endpoint) ? 
r2 : r1; - task = new AsymmetricRemoteSyncTask(desc, streamTo, streamFrom, previewKind); - session.waitForSync(Pair.create(desc, task.nodePair()), (AsymmetricRemoteSyncTask) task); + TreeResponse streamFrom = isTransient.test(r1.endpoint) ? r1 : r2; + TreeResponse streamTo = isTransient.test(r1.endpoint) ? r2 : r1; + task = new AsymmetricRemoteSyncTask(desc, streamTo.endpoint, streamFrom.endpoint, differences, previewKind); } else { - task = new SymmetricRemoteSyncTask(desc, r1, r2, session.previewKind); - // SymmetricRemoteSyncTask expects SyncComplete message sent back. - // Register task to RepairSession to receive response. - session.waitForSync(Pair.create(desc, task.nodePair()), (SymmetricRemoteSyncTask) task); + task = new SymmetricRemoteSyncTask(desc, r1.endpoint, r2.endpoint, differences, previewKind); } syncTasks.add(task); - taskExecutor.submit(task); } } - return Futures.allAsList(syncTasks); + return syncTasks; } private ListenableFuture<List<SyncStat>> optimisedSyncing(List<TreeResponse> trees) { - InetAddressAndPort local = FBUtilities.getLocalAddressAndPort(); + List<SyncTask> syncTasks = createOptimisedSyncingSyncTasks(desc, + trees, + FBUtilities.getLocalAddressAndPort(), + this::isTransient, + this::getDC, + session.isIncremental, + session.previewKind); + + return executeTasks(syncTasks); + } + + private ListenableFuture<List<SyncStat>> executeTasks(List<SyncTask> syncTasks) + { + for (SyncTask task : syncTasks) + { + if (!task.isLocal()) + session.trackSyncCompletion(Pair.create(desc, task.nodePair()), (CompletableRemoteSyncTask) task); + taskExecutor.submit(task); + } + + return Futures.allAsList(syncTasks); + } + static List<SyncTask> createOptimisedSyncingSyncTasks(RepairJobDesc desc, + List<TreeResponse> trees, + InetAddressAndPort local, + Predicate<InetAddressAndPort> isTransient, + Function<InetAddressAndPort, String> getDC, + boolean isIncremental, + PreviewKind previewKind) + { List<SyncTask> syncTasks = new ArrayList<>(); // We need 
to difference all trees one against another DifferenceHolder diffHolder = new DifferenceHolder(trees); @@ -236,8 +279,8 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable logger.debug("diffs = {}", diffHolder); PreferedNodeFilter preferSameDCFilter = (streaming, candidates) -> candidates.stream() - .filter(node -> getDC(streaming) - .equals(getDC(node))) + .filter(node -> getDC.apply(streaming) + .equals(getDC.apply(node))) .collect(Collectors.toSet()); ImmutableMap<InetAddressAndPort, HostDifferences> reducedDifferences = ReduceHelper.reduce(diffHolder, preferSameDCFilter); @@ -246,7 +289,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable InetAddressAndPort address = trees.get(i).endpoint; // we don't stream to transient replicas - if (isTransient(address)) + if (isTransient.test(address)) continue; HostDifferences streamsFor = reducedDifferences.get(address); @@ -256,20 +299,21 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable for (InetAddressAndPort fetchFrom : streamsFor.hosts()) { List<Range<Token>> toFetch = streamsFor.get(fetchFrom); + assert !toFetch.isEmpty(); + logger.debug("{} is about to fetch {} from {}", address, toFetch, fetchFrom); SyncTask task; if (address.equals(local)) { task = new LocalSyncTask(desc, address, fetchFrom, toFetch, isIncremental ? 
desc.parentSessionId : null, - true, false, session.previewKind); + true, false, previewKind); } else { task = new AsymmetricRemoteSyncTask(desc, address, fetchFrom, toFetch, previewKind); - session.waitForSync(Pair.create(desc, task.nodePair()), (AsymmetricRemoteSyncTask) task); } syncTasks.add(task); - taskExecutor.submit(task); + } } else @@ -277,7 +321,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable logger.debug("Node {} has nothing to stream", address); } } - return Futures.allAsList(syncTasks); + return syncTasks; } private String getDC(InetAddressAndPort address) @@ -294,15 +338,15 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable private ListenableFuture<List<TreeResponse>> sendValidationRequest(Collection<InetAddressAndPort> endpoints) { String message = String.format("Requesting merkle trees for %s (to %s)", desc.columnFamily, endpoints); - logger.info("{} {}", previewKind.logPrefix(desc.sessionId), message); + logger.info("{} {}", session.previewKind.logPrefix(desc.sessionId), message); Tracing.traceRepair(message); int nowInSec = FBUtilities.nowInSeconds(); List<ListenableFuture<TreeResponse>> tasks = new ArrayList<>(endpoints.size()); for (InetAddressAndPort endpoint : endpoints) { - ValidationTask task = new ValidationTask(desc, endpoint, nowInSec, previewKind); + ValidationTask task = new ValidationTask(desc, endpoint, nowInSec, session.previewKind); tasks.add(task); - session.waitForValidation(Pair.create(desc, endpoint), task); + session.trackValidationCompletion(Pair.create(desc, endpoint), task); taskExecutor.execute(task); } return Futures.allAsList(tasks); @@ -314,29 +358,29 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable private ListenableFuture<List<TreeResponse>> sendSequentialValidationRequest(Collection<InetAddressAndPort> endpoints) { String message = String.format("Requesting merkle trees for %s (to %s)", desc.columnFamily, 
endpoints); - logger.info("{} {}", previewKind.logPrefix(desc.sessionId), message); + logger.info("{} {}", session.previewKind.logPrefix(desc.sessionId), message); Tracing.traceRepair(message); int nowInSec = FBUtilities.nowInSeconds(); List<ListenableFuture<TreeResponse>> tasks = new ArrayList<>(endpoints.size()); Queue<InetAddressAndPort> requests = new LinkedList<>(endpoints); InetAddressAndPort address = requests.poll(); - ValidationTask firstTask = new ValidationTask(desc, address, nowInSec, previewKind); + ValidationTask firstTask = new ValidationTask(desc, address, nowInSec, session.previewKind); logger.info("Validating {}", address); - session.waitForValidation(Pair.create(desc, address), firstTask); + session.trackValidationCompletion(Pair.create(desc, address), firstTask); tasks.add(firstTask); ValidationTask currentTask = firstTask; while (requests.size() > 0) { final InetAddressAndPort nextAddress = requests.poll(); - final ValidationTask nextTask = new ValidationTask(desc, nextAddress, nowInSec, previewKind); + final ValidationTask nextTask = new ValidationTask(desc, nextAddress, nowInSec, session.previewKind); tasks.add(nextTask); Futures.addCallback(currentTask, new FutureCallback<TreeResponse>() { public void onSuccess(TreeResponse result) { logger.info("Validating {}", nextAddress); - session.waitForValidation(Pair.create(desc, nextAddress), nextTask); + session.trackValidationCompletion(Pair.create(desc, nextAddress), nextTask); taskExecutor.execute(nextTask); } @@ -356,7 +400,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable private ListenableFuture<List<TreeResponse>> sendDCAwareValidationRequest(Collection<InetAddressAndPort> endpoints) { String message = String.format("Requesting merkle trees for %s (to %s)", desc.columnFamily, endpoints); - logger.info("{} {}", previewKind.logPrefix(desc.sessionId), message); + logger.info("{} {}", session.previewKind.logPrefix(desc.sessionId), message); 
Tracing.traceRepair(message); int nowInSec = FBUtilities.nowInSeconds(); List<ListenableFuture<TreeResponse>> tasks = new ArrayList<>(endpoints.size()); @@ -378,22 +422,22 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable { Queue<InetAddressAndPort> requests = entry.getValue(); InetAddressAndPort address = requests.poll(); - ValidationTask firstTask = new ValidationTask(desc, address, nowInSec, previewKind); + ValidationTask firstTask = new ValidationTask(desc, address, nowInSec, session.previewKind); logger.info("Validating {}", address); - session.waitForValidation(Pair.create(desc, address), firstTask); + session.trackValidationCompletion(Pair.create(desc, address), firstTask); tasks.add(firstTask); ValidationTask currentTask = firstTask; while (requests.size() > 0) { final InetAddressAndPort nextAddress = requests.poll(); - final ValidationTask nextTask = new ValidationTask(desc, nextAddress, nowInSec, previewKind); + final ValidationTask nextTask = new ValidationTask(desc, nextAddress, nowInSec, session.previewKind); tasks.add(nextTask); Futures.addCallback(currentTask, new FutureCallback<TreeResponse>() { public void onSuccess(TreeResponse result) { logger.info("Validating {}", nextAddress); - session.waitForValidation(Pair.create(desc, nextAddress), nextTask); + session.trackValidationCompletion(Pair.create(desc, nextAddress), nextTask); taskExecutor.execute(nextTask); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a05785d8/src/java/org/apache/cassandra/repair/RepairSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java b/src/java/org/apache/cassandra/repair/RepairSession.java index 4dc563a..a38205e 100644 --- a/src/java/org/apache/cassandra/repair/RepairSession.java +++ b/src/java/org/apache/cassandra/repair/RepairSession.java @@ -106,7 +106,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> 
implement // Tasks(snapshot, validate request, differencing, ...) are run on taskExecutor public final ListeningExecutorService taskExecutor = MoreExecutors.listeningDecorator(DebuggableThreadPoolExecutor.createCachedThreadpoolWithMaxSize("RepairJobTask")); - private final boolean optimiseStreams; + public final boolean optimiseStreams; private volatile boolean terminated = false; @@ -189,12 +189,12 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement return commonRange.endpoints; } - public void waitForValidation(Pair<RepairJobDesc, InetAddressAndPort> key, ValidationTask task) + public void trackValidationCompletion(Pair<RepairJobDesc, InetAddressAndPort> key, ValidationTask task) { validating.put(key, task); } - public void waitForSync(Pair<RepairJobDesc, SyncNodePair> key, CompletableRemoteSyncTask task) + public void trackSyncCompletion(Pair<RepairJobDesc, SyncNodePair> key, CompletableRemoteSyncTask task) { syncingTasks.put(key, task); } @@ -305,7 +305,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement List<ListenableFuture<RepairResult>> jobs = new ArrayList<>(cfnames.length); for (String cfname : cfnames) { - RepairJob job = new RepairJob(this, cfname, isIncremental, previewKind, optimiseStreams); + RepairJob job = new RepairJob(this, cfname); executor.execute(job); jobs.add(job); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a05785d8/src/java/org/apache/cassandra/repair/SymmetricRemoteSyncTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/SymmetricRemoteSyncTask.java b/src/java/org/apache/cassandra/repair/SymmetricRemoteSyncTask.java index 4e44c15..c731bc1 100644 --- a/src/java/org/apache/cassandra/repair/SymmetricRemoteSyncTask.java +++ b/src/java/org/apache/cassandra/repair/SymmetricRemoteSyncTask.java @@ -47,16 +47,9 @@ public class SymmetricRemoteSyncTask extends SyncTask implements CompletableRemo 
{ private static final Logger logger = LoggerFactory.getLogger(SymmetricRemoteSyncTask.class); - public SymmetricRemoteSyncTask(RepairJobDesc desc, TreeResponse r1, TreeResponse r2, PreviewKind previewKind) + public SymmetricRemoteSyncTask(RepairJobDesc desc, InetAddressAndPort r1, InetAddressAndPort r2, List<Range<Token>> differences, PreviewKind previewKind) { - super(desc, r1.endpoint, r2.endpoint, MerkleTrees.difference(r1.trees, r2.trees), previewKind); - } - - @VisibleForTesting - SymmetricRemoteSyncTask(RepairJobDesc desc, InetAddressAndPort e1, InetAddressAndPort e2, - List<Range<Token>> differences, PreviewKind previewKind) - { - super(desc, e1, e2, differences, previewKind); + super(desc, r1, r2, differences, previewKind); } void sendRequest(RepairMessage request, InetAddressAndPort to) @@ -89,4 +82,13 @@ public class SymmetricRemoteSyncTask extends SyncTask implements CompletableRemo } finished(); } + + @Override + public String toString() + { + return "SymmetricRemoteSyncTask{" + + "rangesToSync=" + rangesToSync + + ", nodePair=" + nodePair + + '}'; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a05785d8/src/java/org/apache/cassandra/repair/SyncTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/SyncTask.java b/src/java/org/apache/cassandra/repair/SyncTask.java index ccbd26c..8b622c4 100644 --- a/src/java/org/apache/cassandra/repair/SyncTask.java +++ b/src/java/org/apache/cassandra/repair/SyncTask.java @@ -87,6 +87,10 @@ public abstract class SyncTask extends AbstractFuture<SyncStat> implements Runna startSync(); } + public boolean isLocal() + { + return false; + } protected void finished() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/a05785d8/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java 
b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java index 6e691f5..903a273 100644 --- a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java +++ b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java @@ -96,7 +96,8 @@ public class LocalSyncTaskTest extends AbstractRepairTest // note: we reuse the same endpoint which is bogus in theory but fine here TreeResponse r1 = new TreeResponse(local, tree1); TreeResponse r2 = new TreeResponse(ep2, tree2); - LocalSyncTask task = new LocalSyncTask(desc, r1, r2, NO_PENDING_REPAIR, true, true, PreviewKind.NONE); + LocalSyncTask task = new LocalSyncTask(desc, r1.endpoint, r2.endpoint, MerkleTrees.difference(r1.trees, r2.trees), + NO_PENDING_REPAIR, true, true, PreviewKind.NONE); task.run(); assertEquals(0, task.get().numberOfDifferences); @@ -133,7 +134,8 @@ public class LocalSyncTaskTest extends AbstractRepairTest // note: we reuse the same endpoint which is bogus in theory but fine here TreeResponse r1 = new TreeResponse(local, tree1); TreeResponse r2 = new TreeResponse(InetAddressAndPort.getByName("127.0.0.2"), tree2); - LocalSyncTask task = new LocalSyncTask(desc, r1, r2, NO_PENDING_REPAIR, true, true, PreviewKind.NONE); + LocalSyncTask task = new LocalSyncTask(desc, r1.endpoint, r2.endpoint, MerkleTrees.difference(r1.trees, r2.trees), + NO_PENDING_REPAIR, true, true, PreviewKind.NONE); DefaultConnectionFactory.MAX_CONNECT_ATTEMPTS = 1; DefaultConnectionFactory.MAX_WAIT_TIME_NANOS = TimeUnit.SECONDS.toNanos(2); try @@ -160,8 +162,9 @@ public class LocalSyncTaskTest extends AbstractRepairTest TreeResponse r1 = new TreeResponse(local, createInitialTree(desc, DatabaseDescriptor.getPartitioner())); TreeResponse r2 = new TreeResponse(PARTICIPANT2, createInitialTree(desc, DatabaseDescriptor.getPartitioner())); - LocalSyncTask task = new LocalSyncTask(desc, r1, r2, NO_PENDING_REPAIR, true, true, PreviewKind.NONE); - StreamPlan plan = task.createStreamPlan(local, Lists.newArrayList(RANGE1)); + LocalSyncTask 
task = new LocalSyncTask(desc, r1.endpoint, r2.endpoint, MerkleTrees.difference(r1.trees, r2.trees), + NO_PENDING_REPAIR, true, true, PreviewKind.NONE); + StreamPlan plan = task.createStreamPlan(); assertEquals(NO_PENDING_REPAIR, plan.getPendingRepair()); assertTrue(plan.getFlushBeforeTransfer()); @@ -185,8 +188,9 @@ public class LocalSyncTaskTest extends AbstractRepairTest TreeResponse r1 = new TreeResponse(local, createInitialTree(desc, DatabaseDescriptor.getPartitioner())); TreeResponse r2 = new TreeResponse(PARTICIPANT2, createInitialTree(desc, DatabaseDescriptor.getPartitioner())); - LocalSyncTask task = new LocalSyncTask(desc, r1, r2, desc.parentSessionId, true, true, PreviewKind.NONE); - StreamPlan plan = task.createStreamPlan(local, Lists.newArrayList(RANGE1)); + LocalSyncTask task = new LocalSyncTask(desc, r1.endpoint, r2.endpoint, MerkleTrees.difference(r1.trees, r2.trees), + desc.parentSessionId, true, true, PreviewKind.NONE); + StreamPlan plan = task.createStreamPlan(); assertEquals(desc.parentSessionId, plan.getPendingRepair()); assertFalse(plan.getFlushBeforeTransfer()); @@ -206,8 +210,9 @@ public class LocalSyncTaskTest extends AbstractRepairTest TreeResponse r1 = new TreeResponse(local, createInitialTree(desc, DatabaseDescriptor.getPartitioner())); TreeResponse r2 = new TreeResponse(PARTICIPANT2, createInitialTree(desc, DatabaseDescriptor.getPartitioner())); - LocalSyncTask task = new LocalSyncTask(desc, r1, r2, desc.parentSessionId, true, false, PreviewKind.NONE); - StreamPlan plan = task.createStreamPlan(local, Lists.newArrayList(RANGE1)); + LocalSyncTask task = new LocalSyncTask(desc, r1.endpoint, r2.endpoint, MerkleTrees.difference(r1.trees, r2.trees), + desc.parentSessionId, true, false, PreviewKind.NONE); + StreamPlan plan = task.createStreamPlan(); assertNumInOut(plan, 1, 0); } @@ -224,8 +229,9 @@ public class LocalSyncTaskTest extends AbstractRepairTest TreeResponse r1 = new TreeResponse(local, createInitialTree(desc, 
DatabaseDescriptor.getPartitioner())); TreeResponse r2 = new TreeResponse(PARTICIPANT2, createInitialTree(desc, DatabaseDescriptor.getPartitioner())); - LocalSyncTask task = new LocalSyncTask(desc, r1, r2, desc.parentSessionId, false, true, PreviewKind.NONE); - StreamPlan plan = task.createStreamPlan(local, Lists.newArrayList(RANGE1)); + LocalSyncTask task = new LocalSyncTask(desc, r1.endpoint, r2.endpoint, MerkleTrees.difference(r1.trees, r2.trees), + desc.parentSessionId, false, true, PreviewKind.NONE); + StreamPlan plan = task.createStreamPlan(); assertNumInOut(plan, 0, 1); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a05785d8/test/unit/org/apache/cassandra/repair/RepairJobTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/RepairJobTest.java b/test/unit/org/apache/cassandra/repair/RepairJobTest.java new file mode 100644 index 0000000..263ecbc --- /dev/null +++ b/test/unit/org/apache/cassandra/repair/RepairJobTest.java @@ -0,0 +1,625 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.cassandra.repair;
+
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Predicate;
+
+import com.google.common.collect.Sets;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.ByteOrderedPartitioner;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.streaming.PreviewKind;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MerkleTree;
+import org.apache.cassandra.utils.MerkleTrees;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for the sync task lists produced by {@code RepairJob.createStandardSyncTasks} and
+ * {@code RepairJob.createOptimisedSyncingSyncTasks}.
+ *
+ * Every test builds one {@link TreeResponse} per endpoint out of (range, hash) pairs, asks
+ * {@code RepairJob} for the sync tasks, indexes them by {@link SyncNodePair} and then asserts on
+ * which pairs received a task, the concrete task type, the request/transfer flags of local tasks
+ * and the ranges each task synchronises.
+ */
+public class RepairJobTest
+{
+    private static final IPartitioner PARTITIONER = ByteOrderedPartitioner.instance;
+
+    static InetAddressAndPort addr1;
+    static InetAddressAndPort addr2;
+    static InetAddressAndPort addr3;
+    static InetAddressAndPort addr4;
+    static InetAddressAndPort addr5;
+
+    static Range<Token> range1 = range(0, 1);
+    static Range<Token> range2 = range(2, 3);
+    static Range<Token> range3 = range(4, 5);
+    static RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), "ks", "cf", Arrays.asList());
+
+    /**
+     * Restores addr1 as the broadcast address after every test, since several tests change it.
+     */
+    @After
+    public void reset()
+    {
+        FBUtilities.reset();
+        DatabaseDescriptor.setBroadcastAddress(addr1.address);
+    }
+
+    static
+    {
+        try
+        {
+            addr1 = InetAddressAndPort.getByName("127.0.0.1");
+            addr2 = InetAddressAndPort.getByName("127.0.0.2");
+            addr3 = InetAddressAndPort.getByName("127.0.0.3");
+            addr4 = InetAddressAndPort.getByName("127.0.0.4");
+            addr5 = InetAddressAndPort.getByName("127.0.0.5");
+            DatabaseDescriptor.setBroadcastAddress(addr1.address);
+        }
+        catch (UnknownHostException e)
+        {
+            // Fail class initialisation with the real cause. Merely printing the stack trace would
+            // leave the address fields null and surface as unrelated NullPointerExceptions later.
+            throw new RuntimeException("Unable to resolve the endpoint addresses used by RepairJobTest", e);
+        }
+    }
+
+    @Test
+    public void testCreateStandardSyncTasks()
+    {
+        testCreateStandardSyncTasks(false);
+    }
+
+    @Test
+    public void testCreateStandardSyncTasksPullRepair()
+    {
+        testCreateStandardSyncTasks(true);
+    }
+
+    /**
+     * Three full replicas where only addr2 differs (on range1 and range3): expects a local task for
+     * (addr1, addr2), a symmetric remote task for (addr2, addr3) and no task for (addr1, addr3).
+     *
+     * @param pullRepair forwarded to {@code createStandardSyncTasks}; the local task is expected to
+     *                   transfer ranges only when this is false
+     */
+    public static void testCreateStandardSyncTasks(boolean pullRepair)
+    {
+        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
+                                                         treeResponse(addr2, range1, "different", range2, "same", range3, "different"),
+                                                         treeResponse(addr3, range1, "same", range2, "same", range3, "same"));
+
+        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+                                                                                    treeResponses,
+                                                                                    addr1, // local
+                                                                                    noTransient(), // transient
+                                                                                    false,
+                                                                                    pullRepair,
+                                                                                    PreviewKind.ALL));
+
+        Assert.assertEquals(2, tasks.size());
+
+        SyncTask task = tasks.get(pair(addr1, addr2));
+        Assert.assertTrue(task.isLocal());
+        Assert.assertTrue(((LocalSyncTask) task).requestRanges);
+        Assert.assertEquals(!pullRepair, ((LocalSyncTask) task).transferRanges);
+        Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
+
+        task = tasks.get(pair(addr2, addr3));
+        Assert.assertFalse(task.isLocal());
+        Assert.assertTrue(task instanceof SymmetricRemoteSyncTask);
+        Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
+
+        Assert.assertNull(tasks.get(pair(addr1, addr3)));
+    }
+
+    @Test
+    public void testStandardSyncTransient()
+    {
+        // Do not stream towards transient nodes
+        testStandardSyncTransient(true);
+        testStandardSyncTransient(false);
+    }
+
+    /**
+     * Local addr1 is a full replica and remote addr2 is transient: the single local task is
+     * expected to request ranges but never transfer them, whatever {@code pullRepair} is.
+     */
+    public void testStandardSyncTransient(boolean pullRepair)
+    {
+        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
+                                                         treeResponse(addr2, range1, "different", range2, "same", range3, "different"));
+
+        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+                                                                                    treeResponses,
+                                                                                    addr1, // local
+                                                                                    transientPredicate(addr2),
+                                                                                    false,
+                                                                                    pullRepair,
+                                                                                    PreviewKind.ALL));
+
+        Assert.assertEquals(1, tasks.size());
+
+        SyncTask task = tasks.get(pair(addr1, addr2));
+        Assert.assertTrue(task.isLocal());
+        Assert.assertTrue(((LocalSyncTask) task).requestRanges);
+        Assert.assertFalse(((LocalSyncTask) task).transferRanges);
+        Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
+    }
+
+    @Test
+    public void testStandardSyncLocalTransient()
+    {
+        // Do not stream towards transient nodes
+        testStandardSyncLocalTransient(true);
+        testStandardSyncLocalTransient(false);
+    }
+
+    /**
+     * Local addr1 is itself transient: with pull repair no task is expected at all, otherwise the
+     * single local task is expected to transfer ranges without requesting any.
+     */
+    public void testStandardSyncLocalTransient(boolean pullRepair)
+    {
+        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
+                                                         treeResponse(addr2, range1, "different", range2, "same", range3, "different"));
+
+        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+                                                                                    treeResponses,
+                                                                                    addr1, // local
+                                                                                    transientPredicate(addr1),
+                                                                                    false,
+                                                                                    pullRepair,
+                                                                                    PreviewKind.ALL));
+
+        if (pullRepair)
+        {
+            Assert.assertTrue(tasks.isEmpty());
+            return;
+        }
+
+        Assert.assertEquals(1, tasks.size());
+
+        SyncTask task = tasks.get(pair(addr1, addr2));
+        Assert.assertTrue(task.isLocal());
+        Assert.assertFalse(((LocalSyncTask) task).requestRanges);
+        Assert.assertTrue(((LocalSyncTask) task).transferRanges);
+        Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
+    }
+
+    /**
+     * Identical trees must never produce a task. Runs every combination of local node (addr1 and
+     * addr2 are participants, addr3 is a coordinator that is not one of the two replicas),
+     * transient configuration and pull-repair flag.
+     */
+    @Test
+    public void testEmptyDifference()
+    {
+        List<Predicate<InetAddressAndPort>> transience = Arrays.asList(noTransient(),
+                                                                       transientPredicate(addr1),
+                                                                       transientPredicate(addr2));
+
+        for (InetAddressAndPort local : new InetAddressAndPort[]{ addr1, addr2, addr3 })
+        {
+            for (Predicate<InetAddressAndPort> isTransient : transience)
+            {
+                for (boolean pullRepair : new boolean[]{ true, false })
+                    testEmptyDifference(local, isTransient, pullRepair);
+            }
+        }
+    }
+
+    /**
+     * Asserts that two replicas (addr1, addr2) with identical hashes on all three ranges yield no
+     * sync tasks for the given local node, transient predicate and pull-repair flag.
+     */
+    public void testEmptyDifference(InetAddressAndPort local, Predicate<InetAddressAndPort> isTransient, boolean pullRepair)
+    {
+        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
+                                                         treeResponse(addr2, range1, "same", range2, "same", range3, "same"));
+
+        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+                                                                                    treeResponses,
+                                                                                    local, // local
+                                                                                    isTransient,
+                                                                                    false,
+                                                                                    pullRepair,
+                                                                                    PreviewKind.ALL));
+
+        Assert.assertTrue(tasks.isEmpty());
+    }
+
+    /**
+     * Three replicas that all disagree on every range (addr3 transient, pull repair enabled):
+     * every pair is expected to get a task covering all three ranges.
+     */
+    @Test
+    public void testCreateStandardSyncTasksAllDifferent()
+    {
+        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one", range3, "one"),
+                                                         treeResponse(addr2, range1, "two", range2, "two", range3, "two"),
+                                                         treeResponse(addr3, range1, "three", range2, "three", range3, "three"));
+
+        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+                                                                                    treeResponses,
+                                                                                    addr1, // local
+                                                                                    ep -> ep.equals(addr3), // transient
+                                                                                    false,
+                                                                                    true,
+                                                                                    PreviewKind.ALL));
+
+        Assert.assertEquals(3, tasks.size());
+        SyncTask task = tasks.get(pair(addr1, addr2));
+        Assert.assertTrue(task.isLocal());
+        Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
+
+        task = tasks.get(pair(addr2, addr3));
+        Assert.assertFalse(task.isLocal());
+        Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
+
+        task = tasks.get(pair(addr1, addr3));
+        Assert.assertTrue(task.isLocal());
+        Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
+    }
+
+    /**
+     * Five replicas that all disagree, with addr4 and addr5 transient and addr1 local. Checks every
+     * pair except (addr4, addr5): tasks are local exactly when addr1 is the coordinator, and
+     * asymmetric-remote exactly when the pair is remote and involves a transient node.
+     */
+    @Test
+    public void testCreate5NodeStandardSyncTasksWithTransient()
+    {
+        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one", range3, "one"),
+                                                         treeResponse(addr2, range1, "two", range2, "two", range3, "two"),
+                                                         treeResponse(addr3, range1, "three", range2, "three", range3, "three"),
+                                                         treeResponse(addr4, range1, "four", range2, "four", range3, "four"),
+                                                         treeResponse(addr5, range1, "five", range2, "five", range3, "five"));
+
+        Predicate<InetAddressAndPort> isTransient = ep -> ep.equals(addr4) || ep.equals(addr5);
+        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+                                                                                    treeResponses,
+                                                                                    addr1, // local
+                                                                                    isTransient, // transient
+                                                                                    false,
+                                                                                    true,
+                                                                                    PreviewKind.ALL));
+
+        // (addr2, addr3) used to be missing: (addr2, addr4) was listed twice instead.
+        SyncNodePair[] pairs = new SyncNodePair[] {pair(addr1, addr2),
+                                                   pair(addr1, addr3),
+                                                   pair(addr1, addr4),
+                                                   pair(addr1, addr5),
+                                                   pair(addr2, addr3),
+                                                   pair(addr2, addr4),
+                                                   pair(addr2, addr5),
+                                                   pair(addr3, addr4),
+                                                   pair(addr3, addr5)};
+
+        for (SyncNodePair pair : pairs)
+        {
+            SyncTask task = tasks.get(pair);
+            Assert.assertNotNull("Missing sync task for " + pair, task);
+
+            // Local only if addr1 is a coordinator
+            assertEquals(pair.coordinator.equals(addr1), task.isLocal());
+
+            boolean isRemote = !pair.coordinator.equals(addr1) && !pair.peer.equals(addr1);
+            boolean involvesTransient = isTransient.test(pair.coordinator) || isTransient.test(pair.peer);
+            assertEquals(String.format("Coordinator: %s\n, Peer: %s\n",pair.coordinator, pair.peer),
+                         isRemote && involvesTransient,
+                         task instanceof AsymmetricRemoteSyncTask);
+
+            // All ranges to be synchronised
+            Assert.assertEquals(Arrays.asList(range1, range2, range3), task.rangesToSync);
+        }
+    }
+
+    @Test
+    public void testLocalSyncWithTransient()
+    {
+        runLocalSyncWithTransientForEachLocal(false);
+    }
+
+    @Test
+    public void testLocalSyncWithTransientPullRepair()
+    {
+        runLocalSyncWithTransientForEachLocal(true);
+    }
+
+    /**
+     * Runs {@link #testLocalSyncWithTransient(InetAddressAndPort, boolean)} with addr1, addr2 and
+     * addr3 in turn as the local node, switching the broadcast address to that node first and
+     * putting it back to addr1 afterwards.
+     */
+    private static void runLocalSyncWithTransientForEachLocal(boolean pullRepair)
+    {
+        try
+        {
+            for (InetAddressAndPort local : new InetAddressAndPort[]{ addr1, addr2, addr3 })
+            {
+                FBUtilities.reset();
+                DatabaseDescriptor.setBroadcastAddress(local.address);
+                testLocalSyncWithTransient(local, pullRepair);
+            }
+        }
+        finally
+        {
+            FBUtilities.reset();
+            DatabaseDescriptor.setBroadcastAddress(addr1.address);
+        }
+    }
+
+    /**
+     * Five replicas that all disagree, addr4 and addr5 transient: expects nine tasks in total.
+     * Local tasks towards the other full replicas must request ranges and transfer them unless
+     * pull repair is on; local tasks towards the transient nodes must request but never transfer.
+     */
+    public static void testLocalSyncWithTransient(InetAddressAndPort local, boolean pullRepair)
+    {
+        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one", range3, "one"),
+                                                         treeResponse(addr2, range1, "two", range2, "two", range3, "two"),
+                                                         treeResponse(addr3, range1, "three", range2, "three", range3, "three"),
+                                                         treeResponse(addr4, range1, "four", range2, "four", range3, "four"),
+                                                         treeResponse(addr5, range1, "five", range2, "five", range3, "five"));
+
+        Predicate<InetAddressAndPort> isTransient = ep -> ep.equals(addr4) || ep.equals(addr5);
+        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+                                                                                    treeResponses,
+                                                                                    local, // local
+                                                                                    isTransient, // transient
+                                                                                    false,
+                                                                                    pullRepair,
+                                                                                    PreviewKind.ALL));
+
+        assertEquals(9, tasks.size());
+        for (InetAddressAndPort addr : new InetAddressAndPort[]{ addr1, addr2, addr3 })
+        {
+            if (local.equals(addr))
+                continue;
+
+            LocalSyncTask task = (LocalSyncTask) tasks.get(pair(local, addr));
+            assertTrue(task.requestRanges);
+            assertEquals(!pullRepair, task.transferRanges);
+        }
+
+        LocalSyncTask task = (LocalSyncTask) tasks.get(pair(local, addr4));
+        assertTrue(task.requestRanges);
+        assertFalse(task.transferRanges);
+
+        task = (LocalSyncTask) tasks.get(pair(local, addr5));
+        assertTrue(task.requestRanges);
+        assertFalse(task.transferRanges);
+    }
+
+    @Test
+    public void testLocalAndRemoteTransient()
+    {
+        testLocalAndRemoteTransient(false);
+    }
+
+    @Test
+    public void testLocalAndRemoteTransientPullRepair()
+    {
+        testLocalAndRemoteTransient(true);
+    }
+
+    /**
+     * With transient addr4 as the local node, no task is expected between addr4 and the other
+     * transient node addr5. The broadcast address changed here is restored by {@link #reset()}.
+     */
+    private static void testLocalAndRemoteTransient(boolean pullRepair)
+    {
+        DatabaseDescriptor.setBroadcastAddress(addr4.address);
+        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one", range3, "one"),
+                                                         treeResponse(addr2, range1, "two", range2, "two", range3, "two"),
+                                                         treeResponse(addr3, range1, "three", range2, "three", range3, "three"),
+                                                         treeResponse(addr4, range1, "four", range2, "four", range3, "four"),
+                                                         treeResponse(addr5, range1, "five", range2, "five", range3, "five"));
+
+        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createStandardSyncTasks(desc,
+                                                                                    treeResponses,
+                                                                                    addr4, // local
+                                                                                    ep -> ep.equals(addr4) || ep.equals(addr5), // transient
+                                                                                    false,
+                                                                                    pullRepair,
+                                                                                    PreviewKind.ALL));
+
+        assertNull(tasks.get(pair(addr4, addr5)));
+    }
+
+    /**
+     * Optimised syncing with three replicas that all disagree: a task covering all three ranges is
+     * expected for each of the six ordered pairs.
+     */
+    @Test
+    public void testOptimizedCreateStandardSyncTasksAllDifferent()
+    {
+        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one", range3, "one"),
+                                                         treeResponse(addr2, range1, "two", range2, "two", range3, "two"),
+                                                         treeResponse(addr3, range1, "three", range2, "three", range3, "three"));
+
+        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createOptimisedSyncingSyncTasks(desc,
+                                                                                            treeResponses,
+                                                                                            addr1, // local
+                                                                                            noTransient(),
+                                                                                            addr -> "DC1",
+                                                                                            false,
+                                                                                            PreviewKind.ALL));
+
+        for (SyncNodePair pair : new SyncNodePair[]{ pair(addr1, addr2),
+                                                     pair(addr1, addr3),
+                                                     pair(addr2, addr1),
+                                                     pair(addr2, addr3),
+                                                     pair(addr3, addr1),
+                                                     pair(addr3, addr2) })
+        {
+            assertEquals(Arrays.asList(range1, range2, range3), tasks.get(pair).rangesToSync);
+        }
+    }
+
+    /**
+     * Optimised syncing driven from addr4, which is not one of the replicas: every task is
+     * expected to be asymmetric-remote, and a range held identically by two sources must be
+     * streamed from only one of them.
+     */
+    @Test
+    public void testOptimizedCreateStandardSyncTasks()
+    {
+        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "one", range2, "one"),
+                                                         treeResponse(addr2, range1, "one", range2, "two"),
+                                                         treeResponse(addr3, range1, "three", range2, "two"));
+
+        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createOptimisedSyncingSyncTasks(desc,
+                                                                                            treeResponses,
+                                                                                            addr4, // local
+                                                                                            noTransient(),
+                                                                                            addr -> "DC1",
+                                                                                            false,
+                                                                                            PreviewKind.ALL));
+
+        for (SyncTask task : tasks.values())
+            assertTrue(task instanceof AsymmetricRemoteSyncTask);
+
+        assertEquals(Arrays.asList(range1), tasks.get(pair(addr1, addr3)).rangesToSync);
+        // addr1 can get range2 from either addr2 or addr3 but not from both
+        assertStreamRangeFromEither(tasks, Arrays.asList(range2),
+                                    addr1, addr2, addr3);
+
+        assertEquals(Arrays.asList(range1), tasks.get(pair(addr2, addr3)).rangesToSync);
+        assertEquals(Arrays.asList(range2), tasks.get(pair(addr2, addr1)).rangesToSync);
+
+        // addr3 can get range1 from either addr1 or addr2 but not from both
+        assertStreamRangeFromEither(tasks, Arrays.asList(range1),
+                                    addr3, addr2, addr1);
+        assertEquals(Arrays.asList(range2), tasks.get(pair(addr3, addr1)).rangesToSync);
+    }
+
+    /**
+     * Optimised syncing with local addr1, transient addr3 and only addr2 differing (on range1 and
+     * range3): expects three tasks, the local one requesting but not transferring ranges.
+     */
+    @Test
+    public void testOptimizedCreateStandardSyncTasksWithTransient()
+    {
+        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, range1, "same", range2, "same", range3, "same"),
+                                                         treeResponse(addr2, range1, "different", range2, "same", range3, "different"),
+                                                         treeResponse(addr3, range1, "same", range2, "same", range3, "same"));
+
+        Map<SyncNodePair, SyncTask> tasks = toMap(RepairJob.createOptimisedSyncingSyncTasks(desc,
+                                                                                            treeResponses,
+                                                                                            addr1, // local
+                                                                                            ep -> ep.equals(addr3),
+                                                                                            addr -> "DC1",
+                                                                                            false,
+                                                                                            PreviewKind.ALL));
+
+        assertEquals(3, tasks.size());
+        SyncTask task = tasks.get(pair(addr1, addr2));
+        assertTrue(task.isLocal());
+        assertElementEquals(Arrays.asList(range1, range3), task.rangesToSync);
+        assertTrue(((LocalSyncTask)task).requestRanges);
+        assertFalse(((LocalSyncTask)task).transferRanges);
+
+        assertStreamRangeFromEither(tasks, Arrays.asList(range3),
+                                    addr2, addr1, addr3);
+
+        task = tasks.get(pair(addr2, addr3));
+        assertFalse(task.isLocal());
+        assertElementEquals(Arrays.asList(range1), task.rangesToSync);
+    }
+
+    /**
+     * Asserts that {@code ranges} are streamed to {@code target} from one of the two candidate
+     * nodes but not from both.
+     *
+     * The source is taken to be {@code either} when a (target, either) task exists and {@code or}
+     * otherwise. That task must be an {@link AsymmetricRemoteSyncTask} containing the ranges, and
+     * the task for the other candidate, if there is one, must contain none of them.
+     */
+    public static void assertStreamRangeFromEither(Map<SyncNodePair, SyncTask> tasks, List<Range<Token>> ranges,
+                                                   InetAddressAndPort target, InetAddressAndPort either, InetAddressAndPort or)
+    {
+        InetAddressAndPort streamsFrom;
+        InetAddressAndPort doesntStreamFrom;
+        if (tasks.containsKey(pair(target, either)))
+        {
+            streamsFrom = either;
+            doesntStreamFrom = or;
+        }
+        else
+        {
+            doesntStreamFrom = either;
+            streamsFrom = or;
+        }
+
+        SyncTask task = tasks.get(pair(target, streamsFrom));
+        assertTrue(task instanceof AsymmetricRemoteSyncTask);
+        assertElementEquals(ranges, task.rangesToSync);
+        assertDoesntStreamRangeFrom(tasks, ranges, target, doesntStreamFrom);
+    }
+
+    /**
+     * Asserts that the (target, source) task, if any, synchronises none of the given ranges.
+     */
+    public static void assertDoesntStreamRangeFrom(Map<SyncNodePair, SyncTask> tasks, List<Range<Token>> ranges,
+                                                   InetAddressAndPort target, InetAddressAndPort source)
+    {
+        Set<Range<Token>> rangeSet = new HashSet<>(ranges);
+        SyncTask task = tasks.get(pair(target, source));
+        if (task == null)
+            return; // Doesn't stream anything
+
+        for (Range<Token> range : task.rangesToSync)
+        {
+            assertFalse(String.format("%s shouldn't stream %s from %s",
+                                      target, range, source),
+                        rangeSet.contains(range));
+        }
+    }
+
+    /**
+     * Asserts that every distinct element of {@code col1} is also present in {@code col2},
+     * ignoring order and duplicates.
+     *
+     * NOTE(review): despite the name this check is one-directional. Elements that appear only in
+     * {@code col2} are not reported. Confirm whether callers rely on that before tightening it to
+     * a full set comparison.
+     */
+    public static <T> void assertElementEquals(Collection<T> col1, Collection<T> col2)
+    {
+        Set<T> set1 = new HashSet<>(col1);
+        Set<T> set2 = new HashSet<>(col2);
+        Set<T> difference = Sets.difference(set1, set2);
+        assertTrue("Expected empty difference but got: " + difference.toString(),
+                   difference.isEmpty());
+    }
+
+    /**
+     * Returns the token the test partitioner derives from the bytes of {@code i}.
+     */
+    public static Token tk(int i)
+    {
+        return PARTITIONER.getToken(ByteBufferUtil.bytes(i));
+    }
+
+    /**
+     * Builds a range between the tokens for {@code from} and {@code to}.
+     */
+    public static Range<Token> range(int from, int to)
+    {
+        return new Range<>(tk(from), tk(to));
+    }
+
+    /**
+     * Builds a {@link TreeResponse} for {@code addr} holding one merkle tree per supplied range.
+     *
+     * @param addr            endpoint the response is attributed to
+     * @param rangesAndHashes alternating {@code Range<Token>} and {@code String} arguments; each
+     *                        string is hashed into the tree added for the range preceding it
+     * @throws IllegalArgumentException if an odd number of arguments is supplied
+     */
+    public static TreeResponse treeResponse(InetAddressAndPort addr, Object... rangesAndHashes)
+    {
+        if (rangesAndHashes.length % 2 != 0)
+            throw new IllegalArgumentException("Expected (range, hash) pairs but got " + rangesAndHashes.length + " arguments");
+
+        MerkleTrees trees = new MerkleTrees(PARTITIONER);
+        for (int i = 0; i < rangesAndHashes.length; i += 2)
+        {
+            @SuppressWarnings("unchecked") // callers pass Range<Token> in every even position
+            Range<Token> range = (Range<Token>) rangesAndHashes[i];
+            String hash = (String) rangesAndHashes[i + 1];
+            MerkleTree tree = trees.addMerkleTree(2, MerkleTree.RECOMMENDED_DEPTH, range);
+            // Platform default charset; every hash string used in this class is plain ASCII.
+            tree.get(range.left).hash(hash.getBytes());
+        }
+
+        return new TreeResponse(addr, trees);
+    }
+
+    /**
+     * Shorthand for {@code new SyncNodePair(node1, node2)}.
+     */
+    public static SyncNodePair pair(InetAddressAndPort node1, InetAddressAndPort node2)
+    {
+        return new SyncNodePair(node1, node2);
+    }
+
+    /**
+     * Indexes the tasks by their node pair, failing if two tasks share the same pair.
+     */
+    public static Map<SyncNodePair, SyncTask> toMap(List<SyncTask> tasks)
+    {
+        Map<SyncNodePair, SyncTask> map = new HashMap<>();
+        for (SyncTask task : tasks)
+        {
+            SyncTask oldTask = map.put(task.nodePair, task);
+            Assert.assertNull(String.format("\nNode pair: %s\nOld task: %s\nNew task: %s\n",
+                                            task.nodePair,
+                                            oldTask,
+                                            task),
+                              oldTask);
+        }
+        return map;
+    }
+
+    /**
+     * Returns a predicate that is true exactly for the given nodes.
+     */
+    public static Predicate<InetAddressAndPort> transientPredicate(InetAddressAndPort... transientNodes)
+    {
+        Set<InetAddressAndPort> set = new HashSet<>(Arrays.asList(transientNodes));
+        return set::contains;
+    }
+
+    /**
+     * Returns a predicate that treats no node as transient.
+     */
+    public static Predicate<InetAddressAndPort> noTransient()
+    {
+        return node -> false;
+    }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org