Updated Branches: refs/heads/cassandra-1.1 4c98854bb -> 00748405b
Add nodetool options to repair specific ranges. Patch by brandonwilliams, reviewed by yukim for CASSANDRA-5280 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/00748405 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/00748405 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/00748405 Branch: refs/heads/cassandra-1.1 Commit: 00748405bc374ee8f5ade4891a6a1445df072344 Parents: 4c98854 Author: Brandon Williams <brandonwilli...@apache.org> Authored: Fri Feb 22 15:43:42 2013 -0600 Committer: Brandon Williams <brandonwilli...@apache.org> Committed: Fri Feb 22 15:48:18 2013 -0600 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/service/AntiEntropyService.java | 2 +- .../apache/cassandra/service/StorageService.java | 30 +++++++++++- .../cassandra/service/StorageServiceMBean.java | 5 ++ src/java/org/apache/cassandra/tools/NodeCmd.java | 9 +++- src/java/org/apache/cassandra/tools/NodeProbe.java | 38 +++++++++++++++ 6 files changed, 81 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/00748405/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 4e1db62..1fe1160 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,6 @@ 1.1.11 * cli: Add JMX authentication support (CASSANDRA-5080) + * nodetool: ability to repair specific range (CASSANDRA-5280) 1.1.10 http://git-wip-us.apache.org/repos/asf/cassandra/blob/00748405/src/java/org/apache/cassandra/service/AntiEntropyService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/AntiEntropyService.java b/src/java/org/apache/cassandra/service/AntiEntropyService.java index f3ca1c2..5c59954 100644 --- a/src/java/org/apache/cassandra/service/AntiEntropyService.java +++ b/src/java/org/apache/cassandra/service/AntiEntropyService.java @@ -169,7 +169,7 @@ public class AntiEntropyService throw new IllegalArgumentException("Requested range intersects a local range but is not fully contained in one; this would lead to imprecise repair"); } } - if (rangeSuperSet == null || !replicaSets.containsKey(toRepair)) + if (rangeSuperSet == null || !replicaSets.containsKey(rangeSuperSet)) return Collections.emptySet(); Set<InetAddress> neighbors = new HashSet<InetAddress>(replicaSets.get(rangeSuperSet)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/00748405/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 54d1c0b..05401e0 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -1854,11 +1854,16 @@ public class StorageService extends NotificationBroadcasterSupport implements IE public int forceRepairAsync(final String tableName, final boolean isSequential, final boolean primaryRange, final String... columnFamilies) { + final Collection<Range<Token>> ranges = primaryRange ? Collections.singletonList(getLocalPrimaryRange()) : getLocalRanges(tableName); + return forceRepairAsync(tableName, isSequential, ranges, columnFamilies); + } + + public int forceRepairAsync(final String tableName, final boolean isSequential, final Collection<Range<Token>> ranges, final String... columnFamilies) + { if (Table.SYSTEM_TABLE.equals(tableName)) return 0; final int cmd = nextRepairCommand.incrementAndGet(); - final Collection<Range<Token>> ranges = primaryRange ? Collections.singletonList(getLocalPrimaryRange()) : getLocalRanges(tableName); if (ranges.size() > 0) { new Thread(new WrappedRunnable() @@ -1872,7 +1877,17 @@ public class StorageService extends NotificationBroadcasterSupport implements IE List<AntiEntropyService.RepairFuture> futures = new ArrayList<AntiEntropyService.RepairFuture>(ranges.size()); for (Range<Token> range : ranges) { - AntiEntropyService.RepairFuture future = forceTableRepair(range, tableName, isSequential, columnFamilies); + AntiEntropyService.RepairFuture future; + try + { + future = forceTableRepair(range, tableName, isSequential, columnFamilies); + } + catch (IllegalArgumentException e) + { + message = String.format("Repair session failed with error: %s", e); + sendNotification("repair", message, new int[]{cmd, AntiEntropyService.Status.SESSION_FAILED.ordinal()}); + continue; + } if (future == null) continue; futures.add(future); @@ -1914,6 +1929,17 @@ public class StorageService extends NotificationBroadcasterSupport implements IE return cmd; } + public int forceRepairRangeAsync(String beginToken, String endToken, final String tableName, boolean isSequential, final String... columnFamilies) + { + Token parsedBeginToken = getPartitioner().getTokenFactory().fromString(beginToken); + Token parsedEndToken = getPartitioner().getTokenFactory().fromString(endToken); + + logger_.info("starting user-requested repair of range ({}, {}] for keyspace {} and column families {}", + new Object[] {parsedBeginToken, parsedEndToken, tableName, columnFamilies}); + return forceRepairAsync(tableName, isSequential, Collections.singleton(new Range<Token>(parsedBeginToken, parsedEndToken)), columnFamilies); + } + + /** * Trigger proactive repair for a table and column families. * @param tableName http://git-wip-us.apache.org/repos/asf/cassandra/blob/00748405/src/java/org/apache/cassandra/service/StorageServiceMBean.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java index c34faf3..1261d2a 100644 --- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java +++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java @@ -256,6 +256,11 @@ public interface StorageServiceMBean extends NotificationEmitter public int forceRepairAsync(String tableName, boolean isSequential, boolean primaryRange, String... columnFamilies); /** + * Same as forceRepairAsync, but handles a specified range + */ + public int forceRepairRangeAsync(String beginToken, String endToken, final String tableName, boolean isSequential, final String... columnFamilies); + + /** * Triggers proactive repair for given column families, or all columnfamilies for the given table * if none are explicitly listed. * @param tableName http://git-wip-us.apache.org/repos/asf/cassandra/blob/00748405/src/java/org/apache/cassandra/tools/NodeCmd.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/NodeCmd.java b/src/java/org/apache/cassandra/tools/NodeCmd.java index 723bdf8..99cbab1 100644 --- a/src/java/org/apache/cassandra/tools/NodeCmd.java +++ b/src/java/org/apache/cassandra/tools/NodeCmd.java @@ -55,6 +55,8 @@ public class NodeCmd private static final Pair<String, String> PASSWORD_OPT = new Pair<String, String>("pw", "password"); private static final Pair<String, String> TAG_OPT = new Pair<String, String>("t", "tag"); private static final Pair<String, String> PRIMARY_RANGE_OPT = new Pair<String, String>("pr", "partitioner-range"); + private static final Pair<String, String> START_TOKEN_OPT = new Pair<String, String>("st", "start-token"); + private static final Pair<String, String> END_TOKEN_OPT = new Pair<String, String>("et", "end-token"); private static final Pair<String, String> SNAPSHOT_REPAIR_OPT = new Pair<String, String>("snapshot", "with-snapshot"); private static final String DEFAULT_HOST = "127.0.0.1"; @@ -76,6 +78,8 @@ public class NodeCmd options.addOption(TAG_OPT, true, "optional name to give a snapshot"); options.addOption(PRIMARY_RANGE_OPT, false, "only repair the first range returned by the partitioner for the node"); options.addOption(SNAPSHOT_REPAIR_OPT, false, "repair one node at a time using snapshots"); + options.addOption(START_TOKEN_OPT, true, "token at which repair range starts"); + options.addOption(END_TOKEN_OPT, true, "token at which repair range ends"); } public NodeCmd(NodeProbe probe) @@ -1041,7 +1045,10 @@ public class NodeCmd case REPAIR : boolean snapshot = cmd.hasOption(SNAPSHOT_REPAIR_OPT.left); boolean primaryRange = cmd.hasOption(PRIMARY_RANGE_OPT.left); - probe.forceRepairAsync(System.out, keyspace, snapshot, primaryRange, columnFamilies); + if (cmd.hasOption(START_TOKEN_OPT.left) || cmd.hasOption(END_TOKEN_OPT.left)) + probe.forceRepairRangeAsync(System.out, keyspace, snapshot, cmd.getOptionValue(START_TOKEN_OPT.left), cmd.getOptionValue(END_TOKEN_OPT.left), columnFamilies); + else + probe.forceRepairAsync(System.out, keyspace, snapshot, primaryRange, columnFamilies); break; case FLUSH : try { probe.forceTableFlush(keyspace, columnFamilies); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/00748405/src/java/org/apache/cassandra/tools/NodeProbe.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java index 036d653..44e64c4 100644 --- a/src/java/org/apache/cassandra/tools/NodeProbe.java +++ b/src/java/org/apache/cassandra/tools/NodeProbe.java @@ -231,6 +231,29 @@ public class NodeProbe } } + public void forceRepairRangeAsync(final PrintStream out, final String tableName, boolean isSequential, final String startToken, final String endToken, String... columnFamilies) throws IOException + { + RepairRunner runner = new RepairRunner(out, tableName, columnFamilies); + try + { + ssProxy.addNotificationListener(runner, null, null); + if (!runner.repairRangeAndWait(ssProxy, isSequential, startToken, endToken)) + failed = true; + } + catch (Exception e) + { + throw new IOException(e) ; + } + finally + { + try + { + ssProxy.removeNotificationListener(runner); + } + catch (ListenerNotFoundException ignored) {} + } + } + public void forceTableRepairPrimaryRange(String tableName, boolean isSequential, String... columnFamilies) throws IOException { ssProxy.forceTableRepairPrimaryRange(tableName, isSequential, columnFamilies); @@ -835,6 +858,21 @@ class RepairRunner implements NotificationListener return success; } + public boolean repairRangeAndWait(StorageServiceMBean ssProxy, boolean isSequential, String startToken, String endToken) throws InterruptedException + { + cmd = ssProxy.forceRepairRangeAsync(startToken, endToken, keyspace, isSequential, columnFamilies); + if (cmd > 0) + { + condition.await(); + } + else + { + String message = String.format("[%s] Nothing to repair for keyspace '%s'", format.format(System.currentTimeMillis()), keyspace); + out.println(message); + } + return success; + } + public void handleNotification(Notification notification, Object handback) { if ("repair".equals(notification.getType()))