Staggering repair

patch by Vijay and Sylvain Lebresne; reviewed by Sylvain Lebresne for CASSANDRA-3721
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1ae32c93 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1ae32c93 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1ae32c93 Branch: refs/heads/trunk Commit: 1ae32c93eca24a5bdab7332a56418a12b4e6586b Parents: c49a149 Author: Vijay Parthasarathy <vijay2...@gmail.com> Authored: Tue Feb 14 10:48:12 2012 -0800 Committer: Vijay Parthasarathy <vijay2...@gmail.com> Committed: Tue Feb 14 10:49:22 2012 -0800 ---------------------------------------------------------------------- .../org/apache/cassandra/db/ColumnFamilyStore.java | 8 + src/java/org/apache/cassandra/db/Directories.java | 15 + .../cassandra/db/compaction/CompactionManager.java | 37 ++- .../cassandra/service/AntiEntropyService.java | 223 ++++++++++++-- .../apache/cassandra/service/StorageService.java | 13 +- .../cassandra/service/StorageServiceMBean.java | 4 +- src/java/org/apache/cassandra/tools/NodeCmd.java | 7 +- src/java/org/apache/cassandra/tools/NodeProbe.java | 8 +- 8 files changed, 256 insertions(+), 59 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/1ae32c93/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index bf4a000..ab3b064 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -1423,6 +1423,14 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean } } + public List<SSTableReader> getSnapshotSSTableReader(String tag) throws IOException + { + List<SSTableReader> readers = new ArrayList<SSTableReader>(); + for (Map.Entry<Descriptor, Set<Component>> entries : 
directories.sstableLister().snapshots(tag).list().entrySet()) + readers.add(SSTableReader.open(entries.getKey(), entries.getValue(), metadata, partitioner)); + return readers; + } + /** * Take a snap shot of this columnfamily store. * http://git-wip-us.apache.org/repos/asf/cassandra/blob/1ae32c93/src/java/org/apache/cassandra/db/Directories.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Directories.java b/src/java/org/apache/cassandra/db/Directories.java index 2afefd2..7c51830 100644 --- a/src/java/org/apache/cassandra/db/Directories.java +++ b/src/java/org/apache/cassandra/db/Directories.java @@ -185,6 +185,7 @@ public class Directories private int nbFiles; private final Map<Descriptor, Set<Component>> components = new HashMap<Descriptor, Set<Component>>(); private boolean filtered; + private String snapshotName; public SSTableLister skipCompacted(boolean b) { @@ -219,6 +220,14 @@ public class Directories return this; } + public SSTableLister snapshots(String sn) + { + if (filtered) + throw new IllegalStateException("list() has already been called"); + snapshotName = sn; + return this; + } + public Map<Descriptor, Set<Component>> list() { filter(); @@ -246,6 +255,12 @@ public class Directories for (File location : sstableDirectories) { + if (snapshotName != null) + { + new File(location, join(SNAPSHOT_SUBDIR, snapshotName)).listFiles(getFilter()); + continue; + } + if (!onlyBackups) location.listFiles(getFilter()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/1ae32c93/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index c02aed2..448c569 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ 
b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -799,23 +799,33 @@ public class CompactionManager implements CompactionManagerMBean if (!cfs.isValid()) return; - // flush first so everyone is validating data that is as similar as possible - try - { - StorageService.instance.forceTableFlush(cfs.table.name, cfs.getColumnFamilyName()); - } - catch (ExecutionException e) + Collection<SSTableReader> sstables; + if (cfs.table.snapshotExists(validator.request.sessionid)) { - throw new IOException(e); + // If there is a snapshot created for the session then read from there. + sstables = cfs.getSnapshotSSTableReader(validator.request.sessionid); } - catch (InterruptedException e) + else { - throw new AssertionError(e); + // flush first so everyone is validating data that is as similar as possible + try + { + StorageService.instance.forceTableFlush(cfs.table.name, cfs.getColumnFamilyName()); + } + catch (ExecutionException e) + { + throw new IOException(e); + } + catch (InterruptedException e) + { + throw new AssertionError(e); + } + + // we don't mark validating sstables as compacting in DataTracker, so we have to mark them referenced + // instead so they won't be cleaned up if they do get compacted during the validation + sstables = cfs.markCurrentSSTablesReferenced(); } - // we don't mark validating sstables as compacting in DataTracker, so we have to mark them referenced - // instead so they won't be cleaned up if they do get compacted during the validation - Collection<SSTableReader> sstables = cfs.markCurrentSSTablesReferenced(); CompactionIterable ci = new ValidationCompactionIterable(cfs, sstables, validator.request.range); CloseableIterator<AbstractCompactedRow> iter = ci.iterator(); validationExecutor.beginCompaction(ci); @@ -838,6 +848,9 @@ public class CompactionManager implements CompactionManagerMBean { SSTableReader.releaseReferences(sstables); iter.close(); + if (cfs.table.snapshotExists(validator.request.sessionid)) + 
cfs.table.clearSnapshot(validator.request.sessionid); + validationExecutor.finishCompaction(ci); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/1ae32c93/src/java/org/apache/cassandra/service/AntiEntropyService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/AntiEntropyService.java b/src/java/org/apache/cassandra/service/AntiEntropyService.java index 812c23a..10ea80c 100644 --- a/src/java/org/apache/cassandra/service/AntiEntropyService.java +++ b/src/java/org/apache/cassandra/service/AntiEntropyService.java @@ -38,6 +38,7 @@ import org.apache.cassandra.db.compaction.AbstractCompactedRow; import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.Table; +import org.apache.cassandra.db.SnapshotCommand; import org.apache.cassandra.dht.AbstractBounds; import org.apache.cassandra.dht.RandomPartitioner; import org.apache.cassandra.dht.Range; @@ -47,6 +48,7 @@ import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.FastByteArrayInputStream; import org.apache.cassandra.io.util.FastByteArrayOutputStream; import org.apache.cassandra.net.CompactEndpointSerializationHelper; +import org.apache.cassandra.net.IAsyncCallback; import org.apache.cassandra.net.IVerbHandler; import org.apache.cassandra.net.Message; import org.apache.cassandra.net.MessagingService; @@ -119,9 +121,9 @@ public class AntiEntropyService /** * Requests repairs for the given table and column families, and blocks until all repairs have been completed. */ - public RepairFuture submitRepairSession(Range<Token> range, String tablename, String... cfnames) + public RepairFuture submitRepairSession(Range<Token> range, String tablename, boolean isSequential, String... 
cfnames) { - RepairFuture futureTask = new RepairSession(range, tablename, cfnames).getFuture(); + RepairFuture futureTask = new RepairSession(range, tablename, isSequential, cfnames).getFuture(); executor.execute(futureTask); return futureTask; } @@ -209,16 +211,6 @@ public class AntiEntropyService } /** - * Requests a tree from the given node, and returns the request that was sent. - */ - TreeRequest request(String sessionid, InetAddress remote, Range<Token> range, String ksname, String cfname) - { - TreeRequest request = new TreeRequest(sessionid, remote, range, new CFPair(ksname, cfname)); - MessagingService.instance().sendOneWay(TreeRequestVerbHandler.makeVerb(request, Gossiper.instance.getVersion(remote)), remote); - return request; - } - - /** * Responds to the node that requested the given valid tree. * @param validator A locally generated validator * @param local localhost (parameterized for testing) @@ -598,6 +590,7 @@ public class AntiEntropyService static class RepairSession extends WrappedRunnable implements IEndpointStateChangeSubscriber, IFailureDetectionEventListener { private final String sessionName; + private final boolean isSequential; private final String tablename; private final String[] cfnames; private final Range<Token> range; @@ -615,18 +608,19 @@ public class AntiEntropyService public RepairSession(TreeRequest req, String tablename, String... cfnames) { - this(req.sessionid, req.range, tablename, cfnames); + this(req.sessionid, req.range, tablename, false, cfnames); AntiEntropyService.instance.sessions.put(getName(), this); } - public RepairSession(Range<Token> range, String tablename, String... cfnames) + public RepairSession(Range<Token> range, String tablename, boolean isSequential, String... 
cfnames) { - this(UUIDGen.makeType1UUIDFromHost(FBUtilities.getBroadcastAddress()).toString(), range, tablename, cfnames); + this(UUIDGen.makeType1UUIDFromHost(FBUtilities.getBroadcastAddress()).toString(), range, tablename, isSequential, cfnames); } - private RepairSession(String id, Range<Token> range, String tablename, String[] cfnames) + private RepairSession(String id, Range<Token> range, String tablename, boolean isSequential, String[] cfnames) { this.sessionName = id; + this.isSequential = isSequential; this.tablename = tablename; this.cfnames = cfnames; assert cfnames.length > 0 : "Repairing no column families seems pointless, doesn't it"; @@ -674,6 +668,12 @@ public class AntiEntropyService logger.info(String.format("[repair #%s] Cannot proceed on repair because a neighbor (%s) is dead: session failed", getName(), endpoint)); return; } + + if (Gossiper.instance.getVersion(endpoint) < MessagingService.VERSION_11 && isSequential) + { + logger.info(String.format("[repair #%s] Cannot repair using snapshots as node %s is pre-1.1", getName(), endpoint)); + return; + } } AntiEntropyService.instance.sessions.put(getName(), this); @@ -729,6 +729,8 @@ public class AntiEntropyService public void terminate() { terminated = true; + for (RepairJob job : jobs) + job.terminate(); jobs.clear(); activeJobs.clear(); } @@ -810,17 +812,32 @@ public class AntiEntropyService { private final String cfname; // first we send tree requests. this tracks the endpoints remaining to hear from - private final Set<InetAddress> remainingEndpoints = new HashSet<InetAddress>(); + private final RequestCoordinator<TreeRequest> treeRequests; // tree responses are then tracked here private final List<TreeResponse> trees = new ArrayList<TreeResponse>(endpoints.size() + 1); // once all responses are received, each tree is compared with each other, and differencer tasks // are submitted. the job is done when all differencers are complete. 
- private final Set<Differencer> remainingDifferencers = new HashSet<Differencer>(); + private final RequestCoordinator<Differencer> differencers; private final Condition requestsSent = new SimpleCondition(); + private CountDownLatch snapshotLatch = null; public RepairJob(String cfname) { this.cfname = cfname; + this.treeRequests = new RequestCoordinator<TreeRequest>(isSequential) + { + public void send(TreeRequest r) + { + MessagingService.instance().sendOneWay(TreeRequestVerbHandler.makeVerb(r, Gossiper.instance.getVersion(r.endpoint)), r.endpoint); + } + }; + this.differencers = new RequestCoordinator<Differencer>(isSequential) + { + public void send(Differencer d) + { + StageManager.getStage(Stage.ANTI_ENTROPY).execute(d); + } + }; } /** @@ -828,17 +845,51 @@ public class AntiEntropyService */ public void sendTreeRequests() { - remainingEndpoints.addAll(endpoints); - remainingEndpoints.add(FBUtilities.getBroadcastAddress()); - // send requests to all nodes - for (InetAddress endpoint : remainingEndpoints) - AntiEntropyService.instance.request(getName(), endpoint, range, tablename, cfname); + List<InetAddress> allEndpoints = new ArrayList<InetAddress>(endpoints); + allEndpoints.add(FBUtilities.getBroadcastAddress()); - logger.info(String.format("[repair #%s] requests for merkle tree sent for %s (to %s)", getName(), cfname, remainingEndpoints)); + if (isSequential) + makeSnapshots(endpoints); + + for (InetAddress endpoint : allEndpoints) + treeRequests.add(new TreeRequest(getName(), endpoint, range, new CFPair(tablename, cfname))); + + logger.info(String.format("[repair #%s] requesting merkle trees for %s (to %s)", getName(), cfname, allEndpoints)); + treeRequests.start(); requestsSent.signalAll(); } + public void makeSnapshots(Collection<InetAddress> endpoints) + { + try + { + snapshotLatch = new CountDownLatch(endpoints.size()); + IAsyncCallback callback = new IAsyncCallback() + { + @Override + public boolean isLatencyForSnitch() + { + return false; + } + + 
@Override + public void response(Message msg) + { + RepairJob.this.snapshotLatch.countDown(); + } + }; + for (InetAddress endpoint : endpoints) + MessagingService.instance().sendRR(new SnapshotCommand(tablename, cfname, sessionName, false), endpoint, callback); + snapshotLatch.await(); + snapshotLatch = null; + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + } + /** * Add a new received tree and return the number of remaining tree to * be received for the job to be complete. @@ -859,8 +910,7 @@ public class AntiEntropyService assert request.cf.right.equals(cfname); trees.add(new TreeResponse(request.endpoint, tree)); - remainingEndpoints.remove(request.endpoint); - return remainingEndpoints.size(); + return treeRequests.completed(request); } /** @@ -869,8 +919,6 @@ public class AntiEntropyService */ public void submitDifferencers() { - assert remainingEndpoints.isEmpty(); - // We need to difference all trees one against another for (int i = 0; i < trees.size() - 1; ++i) { @@ -880,10 +928,10 @@ public class AntiEntropyService TreeResponse r2 = trees.get(j); Differencer differencer = new Differencer(cfname, r1, r2); logger.debug("Queueing comparison {}", differencer); - remainingDifferencers.add(differencer); - StageManager.getStage(Stage.ANTI_ENTROPY).execute(differencer); + differencers.add(differencer); } } + differencers.start(); trees.clear(); // allows gc to do its thing } @@ -892,8 +940,16 @@ public class AntiEntropyService */ synchronized boolean completedSynchronization(Differencer differencer) { - remainingDifferencers.remove(differencer); - return remainingDifferencers.isEmpty(); + return differencers.completed(differencer) == 0; + } + + public void terminate() + { + if (snapshotLatch != null) + { + while (snapshotLatch.getCount() > 0) + snapshotLatch.countDown(); + } } } @@ -992,4 +1048,107 @@ public class AntiEntropyService this.session = session; } } + + public static abstract class RequestCoordinator<R> + { + private final 
Order<R> orderer; + + protected RequestCoordinator(boolean isSequential) + { + this.orderer = isSequential ? new SequentialOrder<R>(this) : new ParallelOrder<R>(this); + } + + public abstract void send(R request); + + public void add(R request) + { + orderer.add(request); + } + + public void start() + { + orderer.start(); + } + + // Returns how many request remains + public int completed(R request) + { + return orderer.completed(request); + } + + private static abstract class Order<R> + { + protected final RequestCoordinator<R> coordinator; + + Order(RequestCoordinator<R> coordinator) + { + this.coordinator = coordinator; + } + + public abstract void add(R request); + public abstract void start(); + public abstract int completed(R request); + } + + private static class SequentialOrder<R> extends Order<R> + { + private final Queue<R> requests = new LinkedList<R>(); + + SequentialOrder(RequestCoordinator<R> coordinator) + { + super(coordinator); + } + + public void add(R request) + { + requests.add(request); + } + + public void start() + { + if (requests.isEmpty()) + return; + + coordinator.send(requests.peek()); + } + + public int completed(R request) + { + assert request.equals(requests.peek()); + requests.poll(); + int remaining = requests.size(); + if (remaining != 0) + coordinator.send(requests.peek()); + return remaining; + } + } + + private static class ParallelOrder<R> extends Order<R> + { + private final Set<R> requests = new HashSet<R>(); + + ParallelOrder(RequestCoordinator<R> coordinator) + { + super(coordinator); + } + + public void add(R request) + { + requests.add(request); + } + + public void start() + { + for (R request : requests) + coordinator.send(request); + } + + public int completed(R request) + { + requests.remove(request); + return requests.size(); + } + } + + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/1ae32c93/src/java/org/apache/cassandra/service/StorageService.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index c1681b9..58986a5 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -1827,12 +1827,11 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe * @param columnFamilies * @throws IOException */ - public void forceTableRepair(final String tableName, final String... columnFamilies) throws IOException + public void forceTableRepair(final String tableName, boolean isSequential, final String... columnFamilies) throws IOException { if (Table.SYSTEM_TABLE.equals(tableName)) return; - Collection<Range<Token>> ranges = getLocalRanges(tableName); int cmd = nextRepairCommand.incrementAndGet(); logger_.info("Starting repair command #{}, repairing {} ranges.", cmd, ranges.size()); @@ -1840,7 +1839,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe List<AntiEntropyService.RepairFuture> futures = new ArrayList<AntiEntropyService.RepairFuture>(ranges.size()); for (Range<Token> range : ranges) { - AntiEntropyService.RepairFuture future = forceTableRepair(range, tableName, columnFamilies); + AntiEntropyService.RepairFuture future = forceTableRepair(range, tableName, isSequential, columnFamilies); futures.add(future); // wait for a session to be done with its differencing before starting the next one try @@ -1875,12 +1874,12 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe logger_.info("Repair command #{} completed successfully", cmd); } - public void forceTableRepairPrimaryRange(final String tableName, final String... columnFamilies) throws IOException + public void forceTableRepairPrimaryRange(final String tableName, boolean isSequential, final String... 
columnFamilies) throws IOException { if (Table.SYSTEM_TABLE.equals(tableName)) return; - AntiEntropyService.RepairFuture future = forceTableRepair(getLocalPrimaryRange(), tableName, columnFamilies); + AntiEntropyService.RepairFuture future = forceTableRepair(getLocalPrimaryRange(), tableName, isSequential, columnFamilies); try { future.get(); @@ -1892,7 +1891,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe } } - public AntiEntropyService.RepairFuture forceTableRepair(final Range<Token> range, final String tableName, final String... columnFamilies) throws IOException + public AntiEntropyService.RepairFuture forceTableRepair(final Range<Token> range, final String tableName, boolean isSequential, final String... columnFamilies) throws IOException { ArrayList<String> names = new ArrayList<String>(); for (ColumnFamilyStore cfStore : getValidColumnFamilies(tableName, columnFamilies)) @@ -1900,7 +1899,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe names.add(cfStore.getColumnFamilyName()); } - return AntiEntropyService.instance.submitRepairSession(range, tableName, names.toArray(new String[names.size()])); + return AntiEntropyService.instance.submitRepairSession(range, tableName, isSequential, names.toArray(new String[names.size()])); } public void forceTerminateAllRepairSessions() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/1ae32c93/src/java/org/apache/cassandra/service/StorageServiceMBean.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java index 6af63b7..c5aa9fd 100644 --- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java +++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java @@ -231,12 +231,12 @@ public interface StorageServiceMBean * @param columnFamilies * @throws IOException */ - public 
void forceTableRepair(String tableName, String... columnFamilies) throws IOException; + public void forceTableRepair(String tableName, boolean isSequential, String... columnFamilies) throws IOException; /** * Triggers proactive repair but only for the node primary range. */ - public void forceTableRepairPrimaryRange(String tableName, String... columnFamilies) throws IOException; + public void forceTableRepairPrimaryRange(String tableName, boolean isSequential, String... columnFamilies) throws IOException; public void forceTerminateAllRepairSessions(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/1ae32c93/src/java/org/apache/cassandra/tools/NodeCmd.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/NodeCmd.java b/src/java/org/apache/cassandra/tools/NodeCmd.java index 2b19074..8a224b8 100644 --- a/src/java/org/apache/cassandra/tools/NodeCmd.java +++ b/src/java/org/apache/cassandra/tools/NodeCmd.java @@ -53,6 +53,7 @@ public class NodeCmd private static final Pair<String, String> PASSWORD_OPT = new Pair<String, String>("pw", "password"); private static final Pair<String, String> TAG_OPT = new Pair<String, String>("t", "tag"); private static final Pair<String, String> PRIMARY_RANGE_OPT = new Pair<String, String>("pr", "partitioner-range"); + private static final Pair<String, String> SNAPSHOT_REPAIR_OPT = new Pair<String, String>("snapshot", "with-snapshot"); private static final String DEFAULT_HOST = "127.0.0.1"; private static final int DEFAULT_PORT = 7199; @@ -71,6 +72,7 @@ public class NodeCmd options.addOption(PASSWORD_OPT, true, "remote jmx agent password"); options.addOption(TAG_OPT, true, "optional name to give a snapshot"); options.addOption(PRIMARY_RANGE_OPT, false, "only repair the first range returned by the partitioner for the node"); + options.addOption(SNAPSHOT_REPAIR_OPT, false, "repair one node at a time using snapshots"); } public NodeCmd(NodeProbe probe) @@ -921,10 
+923,11 @@ public class NodeCmd switch (nc) { case REPAIR : + boolean snapshot = cmd.hasOption(SNAPSHOT_REPAIR_OPT.left); if (cmd.hasOption(PRIMARY_RANGE_OPT.left)) - probe.forceTableRepairPrimaryRange(keyspace, columnFamilies); + probe.forceTableRepairPrimaryRange(keyspace, snapshot, columnFamilies); else - probe.forceTableRepair(keyspace, columnFamilies); + probe.forceTableRepair(keyspace, snapshot, columnFamilies); break; case FLUSH : try { probe.forceTableFlush(keyspace, columnFamilies); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/1ae32c93/src/java/org/apache/cassandra/tools/NodeProbe.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java index 46d4c63..8739745 100644 --- a/src/java/org/apache/cassandra/tools/NodeProbe.java +++ b/src/java/org/apache/cassandra/tools/NodeProbe.java @@ -202,14 +202,14 @@ public class NodeProbe ssProxy.forceTableFlush(tableName, columnFamilies); } - public void forceTableRepair(String tableName, String... columnFamilies) throws IOException + public void forceTableRepair(String tableName, boolean isSequential, String... columnFamilies) throws IOException { - ssProxy.forceTableRepair(tableName, columnFamilies); + ssProxy.forceTableRepair(tableName, isSequential, columnFamilies); } - public void forceTableRepairPrimaryRange(String tableName, String... columnFamilies) throws IOException + public void forceTableRepairPrimaryRange(String tableName, boolean isSequential, String... columnFamilies) throws IOException { - ssProxy.forceTableRepairPrimaryRange(tableName, columnFamilies); + ssProxy.forceTableRepairPrimaryRange(tableName, isSequential, columnFamilies); } public void invalidateKeyCache() throws IOException