Allow specifying datacentersto participate in a repair patch by Jimmy MÃ¥rdell; reviewed by Lyuben Todorov for CASSANDRA-6218
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f065cbfe Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f065cbfe Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f065cbfe Branch: refs/heads/trunk Commit: f065cbfe058a2b0bb58ed53602afe0f12942d525 Parents: d41a746 Author: Jonathan Ellis <jbel...@apache.org> Authored: Fri Nov 29 13:26:16 2013 -0600 Committer: Jonathan Ellis <jbel...@apache.org> Committed: Fri Nov 29 13:26:16 2013 -0600 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/repair/RepairSession.java | 10 ++-- .../cassandra/service/ActiveRepairService.java | 27 +++++++--- .../cassandra/service/StorageService.java | 54 ++++++++++++++++++-- .../cassandra/service/StorageServiceMBean.java | 17 ++++++ .../org/apache/cassandra/tools/NodeCmd.java | 12 ++++- .../org/apache/cassandra/tools/NodeProbe.java | 16 +++--- .../apache/cassandra/tools/NodeToolHelp.yaml | 4 ++ .../service/AntiEntropyServiceTestAbstract.java | 8 +-- 9 files changed, 119 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/f065cbfe/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 644c6b3..12469f2 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.0.4 + * Allow specifying datacenters to participate in a repair (CASSANDRA-6218) * Fix divide-by-zero in PCI (CASSANDRA-6403) * Fix setting last compacted key in the wrong level for LCS (CASSANDRA-6284) * Add sub-ms precision formats to the timestamp parser (CASSANDRA-6395) http://git-wip-us.apache.org/repos/asf/cassandra/blob/f065cbfe/src/java/org/apache/cassandra/repair/RepairSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java b/src/java/org/apache/cassandra/repair/RepairSession.java index 18688f9..ebcd3f4 100644 --- a/src/java/org/apache/cassandra/repair/RepairSession.java +++ b/src/java/org/apache/cassandra/repair/RepairSession.java @@ -102,15 +102,15 @@ public class RepairSession extends WrappedRunnable implements IEndpointStateChan * @param range range to repair * @param keyspace name of keyspace * @param isSequential true if performing repair on snapshots sequentially - * @param isLocal true if you want to perform repair only inside the data center + * @param dataCenters the data centers that should be part of the repair; null for all DCs * @param cfnames names of columnfamilies */ - public RepairSession(Range<Token> range, String keyspace, boolean isSequential, boolean isLocal, String... cfnames) + public RepairSession(Range<Token> range, String keyspace, boolean isSequential, Collection<String> dataCenters, String... cfnames) { - this(UUIDGen.getTimeUUID(), range, keyspace, isSequential, isLocal, cfnames); + this(UUIDGen.getTimeUUID(), range, keyspace, isSequential, dataCenters, cfnames); } - public RepairSession(UUID id, Range<Token> range, String keyspace, boolean isSequential, boolean isLocal, String[] cfnames) + public RepairSession(UUID id, Range<Token> range, String keyspace, boolean isSequential, Collection<String> dataCenters, String[] cfnames) { this.id = id; this.isSequential = isSequential; @@ -118,7 +118,7 @@ public class RepairSession extends WrappedRunnable implements IEndpointStateChan this.cfnames = cfnames; assert cfnames.length > 0 : "Repairing no column families seems pointless, doesn't it"; this.range = range; - this.endpoints = ActiveRepairService.getNeighbors(keyspace, range, isLocal); + this.endpoints = ActiveRepairService.getNeighbors(keyspace, range, dataCenters); } public UUID getId() http://git-wip-us.apache.org/repos/asf/cassandra/blob/f065cbfe/src/java/org/apache/cassandra/service/ActiveRepairService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java index 2f16b31..b77f216 100644 --- a/src/java/org/apache/cassandra/service/ActiveRepairService.java +++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java @@ -21,6 +21,7 @@ import java.net.InetAddress; import java.util.*; import java.util.concurrent.*; +import com.google.common.collect.Multimap; import com.google.common.collect.Sets; import org.apache.cassandra.concurrent.JMXConfigurableThreadPoolExecutor; @@ -90,9 +91,9 @@ public class ActiveRepairService * * @return Future for asynchronous call or null if there is no need to repair */ - public RepairFuture submitRepairSession(Range<Token> range, String keyspace, boolean isSequential, boolean isLocal, String... cfnames) + public RepairFuture submitRepairSession(Range<Token> range, String keyspace, boolean isSequential, Collection<String> dataCenters, String... cfnames) { - RepairSession session = new RepairSession(range, keyspace, isSequential, isLocal, cfnames); + RepairSession session = new RepairSession(range, keyspace, isSequential, dataCenters, cfnames); if (session.endpoints.isEmpty()) return null; RepairFuture futureTask = new RepairFuture(session); @@ -126,7 +127,7 @@ public class ActiveRepairService // add it to the sessions (avoid NPE in tests) RepairFuture submitArtificialRepairSession(RepairJobDesc desc) { - RepairSession session = new RepairSession(desc.sessionId, desc.range, desc.keyspace, false, false, new String[]{desc.columnFamily}); + RepairSession session = new RepairSession(desc.sessionId, desc.range, desc.keyspace, false, null, new String[]{desc.columnFamily}); sessions.put(session.getId(), session); RepairFuture futureTask = new RepairFuture(session); executor.execute(futureTask); @@ -138,12 +139,15 @@ public class ActiveRepairService * * @param keyspaceName keyspace to repair * @param toRepair token to repair - * @param isLocal need to use only nodes from local datacenter + * @param dataCenters the data centers to involve in the repair * * @return neighbors with whom we share the provided range */ - public static Set<InetAddress> getNeighbors(String keyspaceName, Range<Token> toRepair, boolean isLocal) + public static Set<InetAddress> getNeighbors(String keyspaceName, Range<Token> toRepair, Collection<String> dataCenters) { + if (dataCenters != null && !dataCenters.contains(DatabaseDescriptor.getLocalDataCenter())) + throw new IllegalArgumentException("The local data center must be part of the repair"); + StorageService ss = StorageService.instance; Map<Range<Token>, List<InetAddress>> replicaSets = ss.getRangeToAddressMap(keyspaceName); Range<Token> rangeSuperSet = null; @@ -165,11 +169,18 @@ public class ActiveRepairService Set<InetAddress> neighbors = new HashSet<>(replicaSets.get(rangeSuperSet)); neighbors.remove(FBUtilities.getBroadcastAddress()); - if (isLocal) + if (dataCenters != null) { TokenMetadata.Topology topology = ss.getTokenMetadata().cloneOnlyTokenMap().getTopology(); - Set<InetAddress> localEndpoints = Sets.newHashSet(topology.getDatacenterEndpoints().get(DatabaseDescriptor.getLocalDataCenter())); - return Sets.intersection(neighbors, localEndpoints); + Set<InetAddress> dcEndpoints = Sets.newHashSet(); + Multimap<String,InetAddress> dcEndpointsMap = topology.getDatacenterEndpoints(); + for (String dc : dataCenters) + { + Collection<InetAddress> c = dcEndpointsMap.get(dc); + if (c != null) + dcEndpoints.addAll(c); + } + return Sets.intersection(neighbors, dcEndpoints); } return neighbors; http://git-wip-us.apache.org/repos/asf/cassandra/blob/f065cbfe/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 418a496..5b3a3e1 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -2251,6 +2251,26 @@ public class StorageService extends NotificationBroadcasterSupport implements IE jmxNotification.setUserData(userObject); sendNotification(jmxNotification); } + + public int forceRepairAsync(final String keyspace, final boolean isSequential, final Collection<String> dataCenters, final boolean primaryRange, final String... columnFamilies) + { + final Collection<Range<Token>> ranges = primaryRange ? getLocalPrimaryRanges(keyspace) : getLocalRanges(keyspace); + return forceRepairAsync(keyspace, isSequential, dataCenters, ranges, columnFamilies); + } + + public int forceRepairAsync(final String keyspace, final boolean isSequential, final Collection<String> dataCenters, final Collection<Range<Token>> ranges, final String... columnFamilies) + { + if (Keyspace.SYSTEM_KS.equals(keyspace) || ranges.isEmpty()) + return 0; + + final int cmd = nextRepairCommand.incrementAndGet(); + if (ranges.size() > 0) + { + new Thread(createRepairTask(cmd, keyspace, ranges, isSequential, dataCenters, columnFamilies)).start(); + } + return cmd; + } + public int forceRepairAsync(final String keyspace, final boolean isSequential, final boolean isLocal, final boolean primaryRange, final String... columnFamilies) { final Collection<Range<Token>> ranges = primaryRange ? getLocalPrimaryRanges(keyspace) : getLocalRanges(keyspace); @@ -2270,6 +2290,16 @@ public class StorageService extends NotificationBroadcasterSupport implements IE return cmd; } + public int forceRepairRangeAsync(String beginToken, String endToken, final String keyspaceName, boolean isSequential, Collection<String> dataCenters, final String... columnFamilies) + { + Token parsedBeginToken = getPartitioner().getTokenFactory().fromString(beginToken); + Token parsedEndToken = getPartitioner().getTokenFactory().fromString(endToken); + + logger.info("starting user-requested repair of range ({}, {}] for keyspace {} and column families {}", + parsedBeginToken, parsedEndToken, keyspaceName, columnFamilies); + return forceRepairAsync(keyspaceName, isSequential, dataCenters, Collections.singleton(new Range<Token>(parsedBeginToken, parsedEndToken)), columnFamilies); + } + public int forceRepairRangeAsync(String beginToken, String endToken, final String keyspaceName, boolean isSequential, boolean isLocal, final String... columnFamilies) { Token parsedBeginToken = getPartitioner().getTokenFactory().fromString(beginToken); @@ -2316,6 +2346,16 @@ public class StorageService extends NotificationBroadcasterSupport implements IE private FutureTask<Object> createRepairTask(final int cmd, final String keyspace, final Collection<Range<Token>> ranges, final boolean isSequential, final boolean isLocal, final String... columnFamilies) { + Set<String> dataCenters = null; + if (isLocal) + { + dataCenters = Sets.newHashSet(DatabaseDescriptor.getLocalDataCenter()); + } + return createRepairTask(cmd, keyspace, ranges, isSequential, dataCenters, columnFamilies); + } + + private FutureTask<Object> createRepairTask(final int cmd, final String keyspace, final Collection<Range<Token>> ranges, final boolean isSequential, final Collection<String> dataCenters, final String... columnFamilies) + { return new FutureTask<Object>(new WrappedRunnable() { protected void runMayThrow() throws Exception @@ -2324,13 +2364,21 @@ public class StorageService extends NotificationBroadcasterSupport implements IE logger.info(message); sendNotification("repair", message, new int[]{cmd, ActiveRepairService.Status.STARTED.ordinal()}); + if (dataCenters != null && !dataCenters.contains(DatabaseDescriptor.getLocalDataCenter())) + { + message = String.format("Cancelling repair command #%d (the local data center must be part of the repair)", cmd); + logger.error(message); + sendNotification("repair", message, new int[]{cmd, ActiveRepairService.Status.FINISHED.ordinal()}); + return; + } + List<RepairFuture> futures = new ArrayList<RepairFuture>(ranges.size()); for (Range<Token> range : ranges) { RepairFuture future; try { - future = forceKeyspaceRepair(range, keyspace, isSequential, isLocal, columnFamilies); + future = forceKeyspaceRepair(range, keyspace, isSequential, dataCenters, columnFamilies); } catch (IllegalArgumentException e) { @@ -2380,7 +2428,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE }, null); } - public RepairFuture forceKeyspaceRepair(final Range<Token> range, final String keyspaceName, boolean isSequential, boolean isLocal, final String... columnFamilies) throws IOException + public RepairFuture forceKeyspaceRepair(final Range<Token> range, final String keyspaceName, boolean isSequential, Collection<String> dataCenters, final String... columnFamilies) throws IOException { ArrayList<String> names = new ArrayList<String>(); for (ColumnFamilyStore cfStore : getValidColumnFamilies(false, false, keyspaceName, columnFamilies)) @@ -2394,7 +2442,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE return null; } - return ActiveRepairService.instance.submitRepairSession(range, keyspaceName, isSequential, isLocal, names.toArray(new String[names.size()])); + return ActiveRepairService.instance.submitRepairSession(range, keyspaceName, isSequential, dataCenters, names.toArray(new String[names.size()])); } public void forceTerminateAllRepairSessions() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/f065cbfe/src/java/org/apache/cassandra/service/StorageServiceMBean.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java index 2dd8b00..be1b0aa 100644 --- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java +++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java @@ -256,6 +256,23 @@ public interface StorageServiceMBean extends NotificationEmitter * userObject: int array of length 2, [0]=command number, [1]=ordinal of AntiEntropyService.Status * * @return Repair command number, or 0 if nothing to repair + */ + public int forceRepairAsync(String keyspace, boolean isSequential, Collection<String> dataCenters, boolean primaryRange, String... columnFamilies); + + /** + * Same as forceRepairAsync, but handles a specified range + */ + public int forceRepairRangeAsync(String beginToken, String endToken, final String keyspaceName, boolean isSequential, Collection<String> dataCenters, final String... columnFamilies); + + + /** + * Invoke repair asynchronously. + * You can track repair progress by subscribing JMX notification sent from this StorageServiceMBean. + * Notification format is: + * type: "repair" + * userObject: int array of length 2, [0]=command number, [1]=ordinal of AntiEntropyService.Status + * + * @return Repair command number, or 0 if nothing to repair * @see #forceKeyspaceRepair(String, boolean, boolean, String...) */ public int forceRepairAsync(String keyspace, boolean isSequential, boolean isLocal, boolean primaryRange, String... columnFamilies); http://git-wip-us.apache.org/repos/asf/cassandra/blob/f065cbfe/src/java/org/apache/cassandra/tools/NodeCmd.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/NodeCmd.java b/src/java/org/apache/cassandra/tools/NodeCmd.java index 022f9c0..5c071b6 100644 --- a/src/java/org/apache/cassandra/tools/NodeCmd.java +++ b/src/java/org/apache/cassandra/tools/NodeCmd.java @@ -67,6 +67,7 @@ public class NodeCmd private static final Pair<String, String> PRIMARY_RANGE_OPT = Pair.create("pr", "partitioner-range"); private static final Pair<String, String> PARALLEL_REPAIR_OPT = Pair.create("par", "parallel"); private static final Pair<String, String> LOCAL_DC_REPAIR_OPT = Pair.create("local", "in-local-dc"); + private static final Pair<String, String> DC_REPAIR_OPT = Pair.create("dc", "in-dc"); private static final Pair<String, String> START_TOKEN_OPT = Pair.create("st", "start-token"); private static final Pair<String, String> END_TOKEN_OPT = Pair.create("et", "end-token"); private static final Pair<String, String> UPGRADE_ALL_SSTABLE_OPT = Pair.create("a", "include-all-sstables"); @@ -92,6 +93,7 @@ public class NodeCmd options.addOption(PRIMARY_RANGE_OPT, false, "only repair the first range returned by the partitioner for the node"); options.addOption(PARALLEL_REPAIR_OPT, false, "repair nodes in parallel."); options.addOption(LOCAL_DC_REPAIR_OPT, false, "only repair against nodes in the same datacenter"); + options.addOption(DC_REPAIR_OPT, true, "only repair against nodes in the specified datacenters (comma separated)"); options.addOption(START_TOKEN_OPT, true, "token at which repair range starts"); options.addOption(END_TOKEN_OPT, true, "token at which repair range ends"); options.addOption(UPGRADE_ALL_SSTABLE_OPT, false, "includes sstables that are already on the most recent version during upgradesstables"); @@ -1480,11 +1482,17 @@ public class NodeCmd case REPAIR : boolean sequential = !cmd.hasOption(PARALLEL_REPAIR_OPT.left); boolean localDC = cmd.hasOption(LOCAL_DC_REPAIR_OPT.left); + boolean specificDC = cmd.hasOption(DC_REPAIR_OPT.left); boolean primaryRange = cmd.hasOption(PRIMARY_RANGE_OPT.left); + Collection<String> dataCenters = null; + if (specificDC) + dataCenters = Arrays.asList(cmd.getOptionValue(DC_REPAIR_OPT.left).split(",")); + else if (localDC) + dataCenters = Arrays.asList(probe.getDataCenter()); if (cmd.hasOption(START_TOKEN_OPT.left) || cmd.hasOption(END_TOKEN_OPT.left)) - probe.forceRepairRangeAsync(System.out, keyspace, sequential, localDC, cmd.getOptionValue(START_TOKEN_OPT.left), cmd.getOptionValue(END_TOKEN_OPT.left), columnFamilies); + probe.forceRepairRangeAsync(System.out, keyspace, sequential, dataCenters, cmd.getOptionValue(START_TOKEN_OPT.left), cmd.getOptionValue(END_TOKEN_OPT.left), columnFamilies); else - probe.forceRepairAsync(System.out, keyspace, sequential, localDC, primaryRange, columnFamilies); + probe.forceRepairAsync(System.out, keyspace, sequential, dataCenters, primaryRange, columnFamilies); break; case FLUSH : try { probe.forceKeyspaceFlush(keyspace, columnFamilies); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f065cbfe/src/java/org/apache/cassandra/tools/NodeProbe.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java index 0008325..d784f29 100644 --- a/src/java/org/apache/cassandra/tools/NodeProbe.java +++ b/src/java/org/apache/cassandra/tools/NodeProbe.java @@ -215,14 +215,14 @@ public class NodeProbe ssProxy.forceKeyspaceRepair(keyspaceName, isSequential, isLocal, columnFamilies); } - public void forceRepairAsync(final PrintStream out, final String keyspaceName, boolean isSequential, boolean isLocal, boolean primaryRange, String... columnFamilies) throws IOException + public void forceRepairAsync(final PrintStream out, final String keyspaceName, boolean isSequential, Collection<String> dataCenters, boolean primaryRange, String... columnFamilies) throws IOException { RepairRunner runner = new RepairRunner(out, keyspaceName, columnFamilies); try { jmxc.addConnectionNotificationListener(runner, null, null); ssProxy.addNotificationListener(runner, null, null); - if (!runner.repairAndWait(ssProxy, isSequential, isLocal, primaryRange)) + if (!runner.repairAndWait(ssProxy, isSequential, dataCenters, primaryRange)) failed = true; } catch (Exception e) @@ -240,14 +240,14 @@ public class NodeProbe } } - public void forceRepairRangeAsync(final PrintStream out, final String keyspaceName, boolean isSequential, boolean isLocal, final String startToken, final String endToken, String... columnFamilies) throws IOException + public void forceRepairRangeAsync(final PrintStream out, final String keyspaceName, boolean isSequential, Collection<String> dataCenters, final String startToken, final String endToken, String... columnFamilies) throws IOException { RepairRunner runner = new RepairRunner(out, keyspaceName, columnFamilies); try { jmxc.addConnectionNotificationListener(runner, null, null); ssProxy.addNotificationListener(runner, null, null); - if (!runner.repairRangeAndWait(ssProxy, isSequential, isLocal, startToken, endToken)) + if (!runner.repairRangeAndWait(ssProxy, isSequential, dataCenters, startToken, endToken)) failed = true; } catch (Exception e) @@ -1009,16 +1009,16 @@ class RepairRunner implements NotificationListener this.columnFamilies = columnFamilies; } - public boolean repairAndWait(StorageServiceMBean ssProxy, boolean isSequential, boolean isLocal, boolean primaryRangeOnly) throws Exception + public boolean repairAndWait(StorageServiceMBean ssProxy, boolean isSequential, Collection<String> dataCenters, boolean primaryRangeOnly) throws Exception { - cmd = ssProxy.forceRepairAsync(keyspace, isSequential, isLocal, primaryRangeOnly, columnFamilies); + cmd = ssProxy.forceRepairAsync(keyspace, isSequential, dataCenters, primaryRangeOnly, columnFamilies); waitForRepair(); return success; } - public boolean repairRangeAndWait(StorageServiceMBean ssProxy, boolean isSequential, boolean isLocal, String startToken, String endToken) throws Exception + public boolean repairRangeAndWait(StorageServiceMBean ssProxy, boolean isSequential, Collection<String> dataCenters, String startToken, String endToken) throws Exception { - cmd = ssProxy.forceRepairRangeAsync(startToken, endToken, keyspace, isSequential, isLocal, columnFamilies); + cmd = ssProxy.forceRepairRangeAsync(startToken, endToken, keyspace, isSequential, dataCenters, columnFamilies); waitForRepair(); return success; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f065cbfe/src/resources/org/apache/cassandra/tools/NodeToolHelp.yaml ---------------------------------------------------------------------- diff --git a/src/resources/org/apache/cassandra/tools/NodeToolHelp.yaml b/src/resources/org/apache/cassandra/tools/NodeToolHelp.yaml index 632d7e1..54d0884 100644 --- a/src/resources/org/apache/cassandra/tools/NodeToolHelp.yaml +++ b/src/resources/org/apache/cassandra/tools/NodeToolHelp.yaml @@ -148,8 +148,12 @@ commands: - name: repair [keyspace] [cfnames] help: | Repair one or more column families + Use -dc to repair specific datacenters (csv list). + Use -et to specify a token at which repair range ends. + Use -local to only repair against nodes in the same datacenter. Use -pr to repair only the first range returned by the partitioner. Use -par to carry out a parallel repair. + Use -st to specify a token at which the repair range starts. - name: cleanup [keyspace] [cfnames] help: | Run cleanup on one or more column families http://git-wip-us.apache.org/repos/asf/cassandra/blob/f065cbfe/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java b/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java index 4023910..1123fc0 100644 --- a/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java +++ b/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java @@ -124,7 +124,7 @@ public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader Set<InetAddress> neighbors = new HashSet<InetAddress>(); for (Range<Token> range : ranges) { - neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, range, false)); + neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, range, null)); } assertEquals(expected, neighbors); } @@ -147,7 +147,7 @@ public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader Set<InetAddress> neighbors = new HashSet<InetAddress>(); for (Range<Token> range : ranges) { - neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, range, false)); + neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, range, null)); } assertEquals(expected, neighbors); } @@ -169,7 +169,7 @@ public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader Set<InetAddress> neighbors = new HashSet<InetAddress>(); for (Range<Token> range : ranges) { - neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, range, true)); + neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, range, Arrays.asList(DatabaseDescriptor.getLocalDataCenter()))); } assertEquals(expected, neighbors); } @@ -197,7 +197,7 @@ public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader Set<InetAddress> neighbors = new HashSet<InetAddress>(); for (Range<Token> range : ranges) { - neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, range, true)); + neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, range, Arrays.asList(DatabaseDescriptor.getLocalDataCenter()))); } assertEquals(expected, neighbors); }