Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.0 303ff22dd -> 434b5d683
  refs/heads/cassandra-2.1 4f381a2a6 -> 40f8ebae6
  refs/heads/trunk 7d8ba3be5 -> 65b19cae3


Improve validation of sub range repair

Also prevent "-pr" repair from being combined with "-dc/-hosts/-local".
patch by yukim; reviewed by krummas for CASSANDRA-7317


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/434b5d68
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/434b5d68
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/434b5d68

Branch: refs/heads/cassandra-2.0
Commit: 434b5d683ec7520acf1a5a2d421ee5aba2ede0e8
Parents: 303ff22
Author: Yuki Morishita <yu...@apache.org>
Authored: Thu Jun 19 10:47:14 2014 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Thu Jun 19 10:47:14 2014 -0500

----------------------------------------------------------------------
 .../cassandra/service/ActiveRepairService.java  |  3 -
 .../cassandra/service/StorageService.java       | 98 +++++++++++++-------
 .../org/apache/cassandra/tools/NodeCmd.java     |  3 +
 3 files changed, 70 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/434b5d68/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java 
b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index 00e43ea..aac9f9a 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -146,9 +146,6 @@ public class ActiveRepairService
      */
     public static Set<InetAddress> getNeighbors(String keyspaceName, 
Range<Token> toRepair, Collection<String> dataCenters, Collection<String> hosts)
     {
-        if (dataCenters != null && 
!dataCenters.contains(DatabaseDescriptor.getLocalDataCenter()))
-            throw new IllegalArgumentException("The local data center must be 
part of the repair");
-
         StorageService ss = StorageService.instance;
         Map<Range<Token>, List<InetAddress>> replicaSets = 
ss.getRangeToAddressMap(keyspaceName);
         Range<Token> rangeSuperSet = null;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/434b5d68/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java 
b/src/java/org/apache/cassandra/service/StorageService.java
index 05cc8d7..13dd3b7 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -2488,6 +2488,11 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
 
     public int forceRepairAsync(final String keyspace, final boolean 
isSequential, final Collection<String> dataCenters, final Collection<String> 
hosts, final boolean primaryRange, final String... columnFamilies)
     {
+        // when repairing only primary range, dataCenter nor hosts can be set
+        if (primaryRange && (dataCenters != null || hosts != null))
+        {
+            throw new IllegalArgumentException("You need to run primary range 
repair on all nodes in the cluster.");
+        }
         final Collection<Range<Token>> ranges = primaryRange ? 
getLocalPrimaryRanges(keyspace) : getLocalRanges(keyspace);
         return forceRepairAsync(keyspace, isSequential, dataCenters, hosts, 
ranges, columnFamilies);
     }
@@ -2507,6 +2512,11 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
 
     public int forceRepairAsync(final String keyspace, final boolean 
isSequential, final boolean isLocal, final boolean primaryRange, final 
String... columnFamilies)
     {
+        // when repairing only primary range, you cannot repair only on local 
DC
+        if (primaryRange && isLocal)
+        {
+            throw new IllegalArgumentException("You need to run primary range 
repair on all nodes in the cluster.");
+        }
         final Collection<Range<Token>> ranges = primaryRange ? 
getLocalPrimaryRanges(keyspace) : getLocalRanges(keyspace);
         return forceRepairAsync(keyspace, isSequential, isLocal, ranges, 
columnFamilies);
     }
@@ -2528,30 +2538,25 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
 
     public int forceRepairRangeAsync(String beginToken, String endToken, final 
String keyspaceName, boolean isSequential, Collection<String> dataCenters, 
final Collection<String> hosts, final String... columnFamilies)
     {
-        Token parsedBeginToken = 
getPartitioner().getTokenFactory().fromString(beginToken);
-        Token parsedEndToken = 
getPartitioner().getTokenFactory().fromString(endToken);
+        Collection<Range<Token>> repairingRange = 
createRepairRangeFrom(beginToken, endToken);
 
-        logger.info("starting user-requested repair of range ({}, {}] for 
keyspace {} and column families {}",
-                    parsedBeginToken, parsedEndToken, keyspaceName, 
columnFamilies);
-        return forceRepairAsync(keyspaceName, isSequential, dataCenters, 
hosts, Collections.singleton(new Range<Token>(parsedBeginToken, 
parsedEndToken)), columnFamilies);
+        logger.info("starting user-requested repair of range {} for keyspace 
{} and column families {}",
+                    repairingRange, keyspaceName, columnFamilies);
+        return forceRepairAsync(keyspaceName, isSequential, dataCenters, 
hosts, repairingRange, columnFamilies);
     }
 
     public int forceRepairRangeAsync(String beginToken, String endToken, final 
String keyspaceName, boolean isSequential, boolean isLocal, final String... 
columnFamilies)
     {
-        Token parsedBeginToken = 
getPartitioner().getTokenFactory().fromString(beginToken);
-        Token parsedEndToken = 
getPartitioner().getTokenFactory().fromString(endToken);
-
-        logger.info("starting user-requested repair of range ({}, {}] for 
keyspace {} and column families {}",
-                    parsedBeginToken, parsedEndToken, keyspaceName, 
columnFamilies);
-        return forceRepairAsync(keyspaceName, isSequential, isLocal, 
Collections.singleton(new Range<Token>(parsedBeginToken, parsedEndToken)), 
columnFamilies);
+        Set<String> dataCenters = null;
+        if (isLocal)
+        {
+            dataCenters = 
Sets.newHashSet(DatabaseDescriptor.getLocalDataCenter());
+        }
+        return forceRepairRangeAsync(beginToken, endToken, keyspaceName, 
isSequential, dataCenters, null, columnFamilies);
     }
 
-
     /**
      * Trigger proactive repair for a keyspace and column families.
-     * @param keyspaceName
-     * @param columnFamilies
-     * @throws IOException
      */
     public void forceKeyspaceRepair(final String keyspaceName, boolean 
isSequential, boolean isLocal, final String... columnFamilies) throws 
IOException
     {
@@ -2560,17 +2565,23 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
 
     public void forceKeyspaceRepairPrimaryRange(final String keyspaceName, 
boolean isSequential, boolean isLocal, final String... columnFamilies) throws 
IOException
     {
-        forceKeyspaceRepairRange(keyspaceName, 
getLocalPrimaryRanges(keyspaceName), isSequential, isLocal, columnFamilies);
+        // primary range repair can only be performed for whole cluster.
+        // NOTE: we should omit the param but keep API as is for now.
+        if (isLocal)
+        {
+            throw new IllegalArgumentException("You need to run primary range 
repair on all nodes in the cluster.");
+        }
+
+        forceKeyspaceRepairRange(keyspaceName, 
getLocalPrimaryRanges(keyspaceName), isSequential, false, columnFamilies);
     }
 
     public void forceKeyspaceRepairRange(String beginToken, String endToken, 
final String keyspaceName, boolean isSequential, boolean isLocal, final 
String... columnFamilies) throws IOException
     {
-        Token parsedBeginToken = 
getPartitioner().getTokenFactory().fromString(beginToken);
-        Token parsedEndToken = 
getPartitioner().getTokenFactory().fromString(endToken);
+        Collection<Range<Token>> repairingRange = 
createRepairRangeFrom(beginToken, endToken);
 
-        logger.info("starting user-requested repair of range ({}, {}] for 
keyspace {} and column families {}",
-                    parsedBeginToken, parsedEndToken, keyspaceName, 
columnFamilies);
-        forceKeyspaceRepairRange(keyspaceName, Collections.singleton(new 
Range<Token>(parsedBeginToken, parsedEndToken)), isSequential, isLocal, 
columnFamilies);
+        logger.info("starting user-requested repair of range {} for keyspace 
{} and column families {}",
+                           repairingRange, keyspaceName, columnFamilies);
+        forceKeyspaceRepairRange(keyspaceName, repairingRange, isSequential, 
isLocal, columnFamilies);
     }
 
     public void forceKeyspaceRepairRange(final String keyspaceName, final 
Collection<Range<Token>> ranges, boolean isSequential, boolean isLocal, final 
String... columnFamilies) throws IOException
@@ -2580,6 +2591,34 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         createRepairTask(nextRepairCommand.incrementAndGet(), keyspaceName, 
ranges, isSequential, isLocal, columnFamilies).run();
     }
 
+    /**
+     * Create collection of ranges that match ring layout from given tokens.
+     *
+     * @param beginToken beginning token of the range
+     * @param endToken end token of the range
+     * @return collection of ranges that match ring layout in TokenMetadata
+     */
+    @SuppressWarnings("unchecked")
+    private Collection<Range<Token>> createRepairRangeFrom(String beginToken, 
String endToken)
+    {
+        Token parsedBeginToken = 
getPartitioner().getTokenFactory().fromString(beginToken);
+        Token parsedEndToken = 
getPartitioner().getTokenFactory().fromString(endToken);
+
+        Deque<Range<Token>> repairingRange = new ArrayDeque<>();
+        // Break up given range to match ring layout in TokenMetadata
+        Token previous = 
tokenMetadata.getPredecessor(TokenMetadata.firstToken(tokenMetadata.sortedTokens(),
 parsedEndToken));
+        while (parsedBeginToken.compareTo(previous) < 0)
+        {
+            repairingRange.addFirst(new Range<>(previous, parsedEndToken));
+
+            parsedEndToken = previous;
+            previous = tokenMetadata.getPredecessor(previous);
+        }
+        repairingRange.addFirst(new Range<>(parsedBeginToken, parsedEndToken));
+
+        return repairingRange;
+    }
+
     private FutureTask<Object> createRepairTask(final int cmd, final String 
keyspace, final Collection<Range<Token>> ranges, final boolean isSequential, 
final boolean isLocal, final String... columnFamilies)
     {
         Set<String> dataCenters = null;
@@ -2592,7 +2631,12 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
 
     private FutureTask<Object> createRepairTask(final int cmd, final String 
keyspace, final Collection<Range<Token>> ranges, final boolean isSequential, 
final Collection<String> dataCenters, final Collection<String> hosts, final 
String... columnFamilies)
     {
-        return new FutureTask<Object>(new WrappedRunnable()
+        if (dataCenters != null && 
!dataCenters.contains(DatabaseDescriptor.getLocalDataCenter()))
+        {
+            throw new IllegalArgumentException("the local data center must be 
part of the repair");
+        }
+
+        return new FutureTask<>(new WrappedRunnable()
         {
             protected void runMayThrow() throws Exception
             {
@@ -2600,15 +2644,7 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
                 logger.info(message);
                 sendNotification("repair", message, new int[]{cmd, 
ActiveRepairService.Status.STARTED.ordinal()});
 
-                if (dataCenters != null && 
!dataCenters.contains(DatabaseDescriptor.getLocalDataCenter()))
-                {
-                    message = String.format("Cancelling repair command #%d 
(the local data center must be part of the repair)", cmd);
-                    logger.error(message);
-                    sendNotification("repair", message, new int[]{cmd, 
ActiveRepairService.Status.FINISHED.ordinal()});
-                    return;
-                }
-
-                List<RepairFuture> futures = new 
ArrayList<RepairFuture>(ranges.size());
+                List<RepairFuture> futures = new ArrayList<>(ranges.size());
                 for (Range<Token> range : ranges)
                 {
                     RepairFuture future;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/434b5d68/src/java/org/apache/cassandra/tools/NodeCmd.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeCmd.java 
b/src/java/org/apache/cassandra/tools/NodeCmd.java
index 213e4b4..afa42dd 100644
--- a/src/java/org/apache/cassandra/tools/NodeCmd.java
+++ b/src/java/org/apache/cassandra/tools/NodeCmd.java
@@ -1659,6 +1659,9 @@ public class NodeCmd
                     Collection<String> dataCenters = null;
                     Collection<String> hosts = null;
 
+                    if (primaryRange && (localDC || specificDC || 
specificHosts))
+                        throw new RuntimeException("Primary range repair 
should be performed on all nodes in the cluster.");
+
                     if (specificDC)
                         dataCenters = 
Arrays.asList(cmd.getOptionValue(DC_REPAIR_OPT.left).split(","));
                     else if (localDC)

Reply via email to