Merge branch 'cassandra-2.0' into cassandra-2.1 Conflicts: src/java/org/apache/cassandra/service/StorageService.java src/java/org/apache/cassandra/service/StorageServiceMBean.java src/java/org/apache/cassandra/tools/NodeProbe.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f8b43d4a Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f8b43d4a Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f8b43d4a Branch: refs/heads/cassandra-2.1 Commit: f8b43d4a811b0a7d9e88fb19d0aa4a6bf9117cc7 Parents: 136042e e20810c Author: Yuki Morishita <yu...@apache.org> Authored: Tue Jan 6 14:26:05 2015 -0600 Committer: Yuki Morishita <yu...@apache.org> Committed: Tue Jan 6 14:26:05 2015 -0600 ---------------------------------------------------------------------- .../cassandra/service/StorageService.java | 22 ++++++++++++++------ .../cassandra/service/StorageServiceMBean.java | 9 ++++---- .../org/apache/cassandra/tools/NodeProbe.java | 4 ++-- 3 files changed, 23 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8b43d4a/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/StorageService.java index cc23712,8085d7b..b961381 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@@ -2495,39 -2412,27 +2495,43 @@@ public class StorageService extends Not sendNotification(jmxNotification); } - public int forceRepairAsync(final String keyspace, final boolean isSequential, final Collection<String> dataCenters, final Collection<String> hosts, final boolean primaryRange, final String... columnFamilies) + public int forceRepairAsync(String keyspace, boolean isSequential, Collection<String> dataCenters, Collection<String> hosts, boolean primaryRange, boolean fullRepair, String... columnFamilies) throws IOException { - return forceRepairAsync(keyspace, isSequential ? RepairParallelism.SEQUENTIAL : RepairParallelism.PARALLEL, dataCenters, hosts, primaryRange, fullRepair, columnFamilies); - return forceRepairAsync(keyspace, isSequential ? RepairParallelism.SEQUENTIAL.ordinal() : RepairParallelism.PARALLEL.ordinal(), dataCenters, hosts, primaryRange, columnFamilies); ++ return forceRepairAsync(keyspace, isSequential ? RepairParallelism.SEQUENTIAL.ordinal() : RepairParallelism.PARALLEL.ordinal(), dataCenters, hosts, primaryRange, fullRepair, columnFamilies); } - public int forceRepairAsync(String keyspace, RepairParallelism parallelismDegree, Collection<String> dataCenters, Collection<String> hosts, boolean primaryRange, boolean fullRepair, String... columnFamilies) - public int forceRepairAsync(final String keyspace, final int parallelismDegree, final Collection<String> dataCenters, final Collection<String> hosts, final boolean primaryRange, final String... columnFamilies) ++ public int forceRepairAsync(String keyspace, int parallelismDegree, Collection<String> dataCenters, Collection<String> hosts, boolean primaryRange, boolean fullRepair, String... columnFamilies) { + if (parallelismDegree < 0 || parallelismDegree > RepairParallelism.values().length - 1) + { + throw new IllegalArgumentException("Invalid parallelism degree specified: " + parallelismDegree); + } - // when repairing only primary range, dataCenter nor hosts can be set - if (primaryRange && (dataCenters != null || hosts != null)) + Collection<Range<Token>> ranges; + if (primaryRange) + { + // when repairing only primary range, neither dataCenters nor hosts can be set + if (dataCenters == null && hosts == null) + ranges = getPrimaryRanges(keyspace); + // except dataCenters only contain local DC (i.e. -local) + else if (dataCenters != null && dataCenters.size() == 1 && dataCenters.contains(DatabaseDescriptor.getLocalDataCenter())) + ranges = getPrimaryRangesWithinDC(keyspace); + else + throw new IllegalArgumentException("You need to run primary range repair on all nodes in the cluster."); + } + else { - throw new IllegalArgumentException("You need to run primary range repair on all nodes in the cluster."); + ranges = getLocalRanges(keyspace); } - final Collection<Range<Token>> ranges = primaryRange ? getLocalPrimaryRanges(keyspace) : getLocalRanges(keyspace); - return forceRepairAsync(keyspace, RepairParallelism.values()[parallelismDegree], dataCenters, hosts, ranges, columnFamilies); + - return forceRepairAsync(keyspace, parallelismDegree, dataCenters, hosts, ranges, fullRepair, columnFamilies); ++ return forceRepairAsync(keyspace, RepairParallelism.values()[parallelismDegree], dataCenters, hosts, ranges, fullRepair, columnFamilies); } - public int forceRepairAsync(final String keyspace, final RepairParallelism parallelismDegree, final Collection<String> dataCenters, final Collection<String> hosts, final Collection<Range<Token>> ranges, final String... columnFamilies) + public int forceRepairAsync(String keyspace, boolean isSequential, Collection<String> dataCenters, Collection<String> hosts, Collection<Range<Token>> ranges, boolean fullRepair, String... columnFamilies) + { + return forceRepairAsync(keyspace, isSequential ? RepairParallelism.SEQUENTIAL : RepairParallelism.PARALLEL, dataCenters, hosts, ranges, fullRepair, columnFamilies); + } + + public int forceRepairAsync(String keyspace, RepairParallelism parallelismDegree, Collection<String> dataCenters, Collection<String> hosts, Collection<Range<Token>> ranges, boolean fullRepair, String... columnFamilies) { if (ranges.isEmpty() || Keyspace.open(keyspace).getReplicationStrategy().getReplicationFactor() < 2) return 0; @@@ -2580,21 -2471,62 +2584,27 @@@ return cmd; } - public int forceRepairRangeAsync(String beginToken, String endToken, final String keyspaceName, boolean isSequential, Collection<String> dataCenters, final Collection<String> hosts, final String... columnFamilies) + public int forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, boolean isSequential, Collection<String> dataCenters, Collection<String> hosts, boolean fullRepair, String... columnFamilies) throws IOException { - return forceRepairRangeAsync(beginToken, endToken, keyspaceName, isSequential ? RepairParallelism.SEQUENTIAL : RepairParallelism.PARALLEL, dataCenters, hosts, fullRepair, columnFamilies); - return forceRepairRangeAsync(beginToken, endToken, keyspaceName, isSequential ? RepairParallelism.SEQUENTIAL.ordinal() : RepairParallelism.PARALLEL.ordinal(), dataCenters, hosts, columnFamilies); ++ return forceRepairRangeAsync(beginToken, endToken, keyspaceName, isSequential ? RepairParallelism.SEQUENTIAL.ordinal() : RepairParallelism.PARALLEL.ordinal(), dataCenters, hosts, fullRepair, columnFamilies); } - public int forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, RepairParallelism parallelismDegree, Collection<String> dataCenters, Collection<String> hosts, boolean fullRepair, String... columnFamilies) - public int forceRepairRangeAsync(String beginToken, String endToken, final String keyspaceName, int parallelismDegree, Collection<String> dataCenters, final Collection<String> hosts, final String... columnFamilies) ++ public int forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, int parallelismDegree, Collection<String> dataCenters, Collection<String> hosts, boolean fullRepair, String... columnFamilies) { + if (parallelismDegree < 0 || parallelismDegree > RepairParallelism.values().length - 1) + { + throw new IllegalArgumentException("Invalid parallelism degree specified: " + parallelismDegree); + } Collection<Range<Token>> repairingRange = createRepairRangeFrom(beginToken, endToken); logger.info("starting user-requested repair of range {} for keyspace {} and column families {}", - repairingRange, keyspaceName, columnFamilies); + repairingRange, keyspaceName, columnFamilies); - return forceRepairAsync(keyspaceName, parallelismDegree, dataCenters, hosts, repairingRange, fullRepair, columnFamilies); + + RepairParallelism parallelism = RepairParallelism.values()[parallelismDegree]; - if (!FBUtilities.isUnix() && parallelism != RepairParallelism.PARALLEL) - { - logger.warn("Snapshot-based repair is not yet supported on Windows. Reverting to parallel repair."); - parallelism = RepairParallelism.PARALLEL; - } - return forceRepairAsync(keyspaceName, parallelism, dataCenters, hosts, repairingRange, columnFamilies); - } - - public int forceRepairRangeAsync(String beginToken, String endToken, final String keyspaceName, boolean isSequential, boolean isLocal, final String... columnFamilies) - { - Set<String> dataCenters = null; - if (isLocal) - { - dataCenters = Sets.newHashSet(DatabaseDescriptor.getLocalDataCenter()); - } - return forceRepairRangeAsync(beginToken, endToken, keyspaceName, isSequential, dataCenters, null, columnFamilies); ++ return forceRepairAsync(keyspaceName, parallelism, dataCenters, hosts, repairingRange, fullRepair, columnFamilies); } - /** - * Trigger proactive repair for a keyspace and column families. - */ - public void forceKeyspaceRepair(final String keyspaceName, boolean isSequential, boolean isLocal, final String... columnFamilies) throws IOException - { - forceKeyspaceRepairRange(keyspaceName, getLocalRanges(keyspaceName), isSequential ? RepairParallelism.SEQUENTIAL : RepairParallelism.PARALLEL, isLocal, columnFamilies); - } - - public void forceKeyspaceRepairPrimaryRange(final String keyspaceName, boolean isSequential, boolean isLocal, final String... columnFamilies) throws IOException - { - // primary range repair can only be performed for whole cluster. - // NOTE: we should omit the param but keep API as is for now. - if (isLocal) - { - throw new IllegalArgumentException("You need to run primary range repair on all nodes in the cluster."); - } - - forceKeyspaceRepairRange(keyspaceName, getLocalPrimaryRanges(keyspaceName), isSequential ? RepairParallelism.SEQUENTIAL : RepairParallelism.PARALLEL, false, columnFamilies); - } - - public void forceKeyspaceRepairRange(String beginToken, String endToken, final String keyspaceName, boolean isSequential, boolean isLocal, final String... columnFamilies) throws IOException + public int forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, boolean isSequential, boolean isLocal, boolean fullRepair, String... columnFamilies) { Collection<Range<Token>> repairingRange = createRepairRangeFrom(beginToken, endToken); http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8b43d4a/src/java/org/apache/cassandra/service/StorageServiceMBean.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/StorageServiceMBean.java index a661b97,10d17fd..e0441fb --- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java +++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java @@@ -283,9 -266,10 +281,10 @@@ public interface StorageServiceMBean ex * type: "repair" * userObject: int array of length 2, [0]=command number, [1]=ordinal of AntiEntropyService.Status * + * @param parallelismDegree 0: sequential, 1: parallel, 2: DC parallel * @return Repair command number, or 0 if nothing to repair */ - public int forceRepairAsync(String keyspace, RepairParallelism parallelismDegree, Collection<String> dataCenters, Collection<String> hosts, boolean primaryRange, boolean fullRepair, String... columnFamilies); - public int forceRepairAsync(String keyspace, int parallelismDegree, Collection<String> dataCenters, final Collection<String> hosts, boolean primaryRange, String... columnFamilies); ++ public int forceRepairAsync(String keyspace, int parallelismDegree, Collection<String> dataCenters, Collection<String> hosts, boolean primaryRange, boolean fullRepair, String... columnFamilies); /** * Same as forceRepairAsync, but handles a specified range @@@ -294,8 -278,10 +293,10 @@@ /** * Same as forceRepairAsync, but handles a specified range + * + * @param parallelismDegree 0: sequential, 1: parallel, 2: DC parallel */ - public int forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, RepairParallelism parallelismDegree, Collection<String> dataCenters, Collection<String> hosts, boolean fullRepair, String... columnFamilies); - public int forceRepairRangeAsync(String beginToken, String endToken, final String keyspaceName, int parallelismDegree, Collection<String> dataCenters, final Collection<String> hosts, final String... columnFamilies); ++ public int forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, int parallelismDegree, Collection<String> dataCenters, Collection<String> hosts, boolean fullRepair, String... columnFamilies); /** * Invoke repair asynchronously. http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8b43d4a/src/java/org/apache/cassandra/tools/NodeProbe.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/tools/NodeProbe.java index 9c9e93d,6b28f18..00f9686 --- a/src/java/org/apache/cassandra/tools/NodeProbe.java +++ b/src/java/org/apache/cassandra/tools/NodeProbe.java @@@ -1307,16 -1075,16 +1307,16 @@@ class RepairRunner implements Notificat this.columnFamilies = columnFamilies; } - public boolean repairAndWait(StorageServiceMBean ssProxy, RepairParallelism parallelismDegree, Collection<String> dataCenters, final Collection<String> hosts, boolean primaryRangeOnly) throws Exception + public boolean repairAndWait(StorageServiceMBean ssProxy, RepairParallelism parallelismDegree, Collection<String> dataCenters, Collection<String> hosts, boolean primaryRangeOnly, boolean fullRepair) throws Exception { - cmd = ssProxy.forceRepairAsync(keyspace, parallelismDegree, dataCenters, hosts, primaryRangeOnly, fullRepair, columnFamilies); - cmd = ssProxy.forceRepairAsync(keyspace, parallelismDegree.ordinal(), dataCenters, hosts, primaryRangeOnly, columnFamilies); ++ cmd = ssProxy.forceRepairAsync(keyspace, parallelismDegree.ordinal(), dataCenters, hosts, primaryRangeOnly, fullRepair, columnFamilies); waitForRepair(); return success; } - public boolean repairRangeAndWait(StorageServiceMBean ssProxy, RepairParallelism parallelismDegree, Collection<String> dataCenters, final Collection<String> hosts, String startToken, String endToken) throws Exception + public boolean repairRangeAndWait(StorageServiceMBean ssProxy, RepairParallelism parallelismDegree, Collection<String> dataCenters, Collection<String> hosts, String startToken, String endToken, boolean fullRepair) throws Exception { - cmd = ssProxy.forceRepairRangeAsync(startToken, endToken, keyspace, parallelismDegree, dataCenters, hosts, fullRepair, columnFamilies); - cmd = ssProxy.forceRepairRangeAsync(startToken, endToken, keyspace, parallelismDegree.ordinal(), dataCenters, hosts, columnFamilies); ++ cmd = ssProxy.forceRepairRangeAsync(startToken, endToken, keyspace, parallelismDegree.ordinal(), dataCenters, hosts, fullRepair, columnFamilies); waitForRepair(); return success; }