Add nodetool options to repair specific ranges.
Patch by brandonwilliams, reviewed by yukim for CASSANDRA-5280


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/00748405
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/00748405
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/00748405

Branch: refs/heads/cassandra-1.2
Commit: 00748405bc374ee8f5ade4891a6a1445df072344
Parents: 4c98854
Author: Brandon Williams <brandonwilli...@apache.org>
Authored: Fri Feb 22 15:43:42 2013 -0600
Committer: Brandon Williams <brandonwilli...@apache.org>
Committed: Fri Feb 22 15:48:18 2013 -0600

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 .../cassandra/service/AntiEntropyService.java      |    2 +-
 .../apache/cassandra/service/StorageService.java   |   30 +++++++++++-
 .../cassandra/service/StorageServiceMBean.java     |    5 ++
 src/java/org/apache/cassandra/tools/NodeCmd.java   |    9 +++-
 src/java/org/apache/cassandra/tools/NodeProbe.java |   38 +++++++++++++++
 6 files changed, 81 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/00748405/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4e1db62..1fe1160 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 1.1.11
  * cli: Add JMX authentication support (CASSANDRA-5080)
+ * nodetool: ability to repair specific range (CASSANDRA-5280)
 
 
 1.1.10

http://git-wip-us.apache.org/repos/asf/cassandra/blob/00748405/src/java/org/apache/cassandra/service/AntiEntropyService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AntiEntropyService.java b/src/java/org/apache/cassandra/service/AntiEntropyService.java
index f3ca1c2..5c59954 100644
--- a/src/java/org/apache/cassandra/service/AntiEntropyService.java
+++ b/src/java/org/apache/cassandra/service/AntiEntropyService.java
@@ -169,7 +169,7 @@ public class AntiEntropyService
                 throw new IllegalArgumentException("Requested range intersects a local range but is not fully contained in one; this would lead to imprecise repair");
             }
         }
-        if (rangeSuperSet == null || !replicaSets.containsKey(toRepair))
+        if (rangeSuperSet == null || !replicaSets.containsKey(rangeSuperSet))
             return Collections.emptySet();
 
         Set<InetAddress> neighbors = new HashSet<InetAddress>(replicaSets.get(rangeSuperSet));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/00748405/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 54d1c0b..05401e0 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -1854,11 +1854,16 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
     public int forceRepairAsync(final String tableName, final boolean isSequential, final boolean primaryRange, final String... columnFamilies)
     {
+        final Collection<Range<Token>> ranges = primaryRange ? Collections.singletonList(getLocalPrimaryRange()) : getLocalRanges(tableName);
+        return forceRepairAsync(tableName, isSequential, ranges, columnFamilies);
+    }
+
+    public int forceRepairAsync(final String tableName, final boolean isSequential, final Collection<Range<Token>> ranges, final String... columnFamilies)
+    {
         if (Table.SYSTEM_TABLE.equals(tableName))
             return 0;
 
         final int cmd = nextRepairCommand.incrementAndGet();
-        final Collection<Range<Token>> ranges = primaryRange ? Collections.singletonList(getLocalPrimaryRange()) : getLocalRanges(tableName);
         if (ranges.size() > 0)
         {
             new Thread(new WrappedRunnable()
@@ -1872,7 +1877,17 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                     List<AntiEntropyService.RepairFuture> futures = new ArrayList<AntiEntropyService.RepairFuture>(ranges.size());
                     for (Range<Token> range : ranges)
                     {
-                        AntiEntropyService.RepairFuture future = forceTableRepair(range, tableName, isSequential, columnFamilies);
+                        AntiEntropyService.RepairFuture future;
+                        try
+                        {
+                            future = forceTableRepair(range, tableName, isSequential, columnFamilies);
+                        }
+                        catch (IllegalArgumentException e)
+                        {
+                            message = String.format("Repair session failed with error: %s", e);
+                            sendNotification("repair", message, new int[]{cmd, AntiEntropyService.Status.SESSION_FAILED.ordinal()});
+                            continue;
+                        }
                         if (future == null)
                             continue;
                         futures.add(future);
@@ -1914,6 +1929,17 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         return cmd;
     }
 
+    public int forceRepairRangeAsync(String beginToken, String endToken, final String tableName, boolean isSequential, final String... columnFamilies)
+    {
+        Token parsedBeginToken = getPartitioner().getTokenFactory().fromString(beginToken);
+        Token parsedEndToken = getPartitioner().getTokenFactory().fromString(endToken);
+
+        logger_.info("starting user-requested repair of range ({}, {}] for keyspace {} and column families {}",
+                new Object[] {parsedBeginToken, parsedEndToken, tableName, columnFamilies});
+        return forceRepairAsync(tableName, isSequential, Collections.singleton(new Range<Token>(parsedBeginToken, parsedEndToken)), columnFamilies);
+    }
+
+
     /**
      * Trigger proactive repair for a table and column families.
      * @param tableName

http://git-wip-us.apache.org/repos/asf/cassandra/blob/00748405/src/java/org/apache/cassandra/service/StorageServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index c34faf3..1261d2a 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -256,6 +256,11 @@ public interface StorageServiceMBean extends NotificationEmitter
     public int forceRepairAsync(String tableName, boolean isSequential, boolean primaryRange, String... columnFamilies);
 
     /**
+     * Same as forceRepairAsync, but handles a specified range
+     */
+    public int forceRepairRangeAsync(String beginToken, String endToken, final String tableName, boolean isSequential, final String... columnFamilies);
+
+    /**
      * Triggers proactive repair for given column families, or all columnfamilies for the given table
      * if none are explicitly listed.
      * @param tableName

http://git-wip-us.apache.org/repos/asf/cassandra/blob/00748405/src/java/org/apache/cassandra/tools/NodeCmd.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeCmd.java b/src/java/org/apache/cassandra/tools/NodeCmd.java
index 723bdf8..99cbab1 100644
--- a/src/java/org/apache/cassandra/tools/NodeCmd.java
+++ b/src/java/org/apache/cassandra/tools/NodeCmd.java
@@ -55,6 +55,8 @@ public class NodeCmd
     private static final Pair<String, String> PASSWORD_OPT = new Pair<String, String>("pw", "password");
     private static final Pair<String, String> TAG_OPT = new Pair<String, String>("t", "tag");
     private static final Pair<String, String> PRIMARY_RANGE_OPT = new Pair<String, String>("pr", "partitioner-range");
+    private static final Pair<String, String> START_TOKEN_OPT = new Pair<String, String>("st", "start-token");
+    private static final Pair<String, String> END_TOKEN_OPT = new Pair<String, String>("et", "end-token");
     private static final Pair<String, String> SNAPSHOT_REPAIR_OPT = new Pair<String, String>("snapshot", "with-snapshot");
 
     private static final String DEFAULT_HOST = "127.0.0.1";
@@ -76,6 +78,8 @@ public class NodeCmd
         options.addOption(TAG_OPT,      true, "optional name to give a snapshot");
         options.addOption(PRIMARY_RANGE_OPT, false, "only repair the first range returned by the partitioner for the node");
         options.addOption(SNAPSHOT_REPAIR_OPT, false, "repair one node at a time using snapshots");
+        options.addOption(START_TOKEN_OPT, true, "token at which repair range starts");
+        options.addOption(END_TOKEN_OPT, true, "token at which repair range ends");
     }
 
     public NodeCmd(NodeProbe probe)
@@ -1041,7 +1045,10 @@ public class NodeCmd
                 case REPAIR  :
                     boolean snapshot = cmd.hasOption(SNAPSHOT_REPAIR_OPT.left);
                     boolean primaryRange = cmd.hasOption(PRIMARY_RANGE_OPT.left);
-                    probe.forceRepairAsync(System.out, keyspace, snapshot, primaryRange, columnFamilies);
+                    if (cmd.hasOption(START_TOKEN_OPT.left) || cmd.hasOption(END_TOKEN_OPT.left))
+                        probe.forceRepairRangeAsync(System.out, keyspace, snapshot, cmd.getOptionValue(START_TOKEN_OPT.left), cmd.getOptionValue(END_TOKEN_OPT.left), columnFamilies);
+                    else
+                        probe.forceRepairAsync(System.out, keyspace, snapshot, primaryRange, columnFamilies);
                     break;
                 case FLUSH   :
                     try { probe.forceTableFlush(keyspace, columnFamilies); }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/00748405/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 036d653..44e64c4 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -231,6 +231,29 @@ public class NodeProbe
         }
     }
 
+    public void forceRepairRangeAsync(final PrintStream out, final String tableName, boolean isSequential, final String startToken, final String endToken, String... columnFamilies) throws IOException
+    {
+        RepairRunner runner = new RepairRunner(out, tableName, columnFamilies);
+        try
+        {
+            ssProxy.addNotificationListener(runner, null, null);
+            if (!runner.repairRangeAndWait(ssProxy,  isSequential, startToken, endToken))
+                failed = true;
+        }
+        catch (Exception e)
+        {
+            throw new IOException(e) ;
+        }
+        finally
+        {
+            try
+            {
+                ssProxy.removeNotificationListener(runner);
+            }
+            catch (ListenerNotFoundException ignored) {}
+        }
+    }
+
     public void forceTableRepairPrimaryRange(String tableName, boolean isSequential, String... columnFamilies) throws IOException
     {
         ssProxy.forceTableRepairPrimaryRange(tableName, isSequential, columnFamilies);
@@ -835,6 +858,21 @@ class RepairRunner implements NotificationListener
         return success;
     }
 
+    public boolean repairRangeAndWait(StorageServiceMBean ssProxy, boolean isSequential, String startToken, String endToken) throws InterruptedException
+    {
+        cmd = ssProxy.forceRepairRangeAsync(startToken, endToken, keyspace, isSequential, columnFamilies);
+        if (cmd > 0)
+        {
+            condition.await();
+        }
+        else
+        {
+            String message = String.format("[%s] Nothing to repair for keyspace '%s'", format.format(System.currentTimeMillis()), keyspace);
+            out.println(message);
+        }
+        return success;
+    }
+
     public void handleNotification(Notification notification, Object handback)
     {
         if ("repair".equals(notification.getType()))

Reply via email to