Merge branch 'cassandra-2.0' into cassandra-2.1
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d693ca12 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d693ca12 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d693ca12 Branch: refs/heads/cassandra-2.2 Commit: d693ca12c76c2651df1769e137a94b954174e061 Parents: adb8831 be9eff5 Author: Yuki Morishita <yu...@apache.org> Authored: Tue May 19 08:50:28 2015 -0500 Committer: Yuki Morishita <yu...@apache.org> Committed: Tue May 19 08:50:28 2015 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/db/ColumnFamilyStore.java | 4 +-- .../db/compaction/CompactionManager.java | 8 ++--- .../cassandra/db/compaction/Scrubber.java | 12 ++++--- .../cassandra/service/StorageService.java | 7 +++- .../cassandra/service/StorageServiceMBean.java | 2 ++ .../org/apache/cassandra/tools/NodeProbe.java | 8 ++--- .../org/apache/cassandra/tools/NodeTool.java | 7 +++- .../cassandra/tools/StandaloneScrubber.java | 6 +++- .../unit/org/apache/cassandra/db/ScrubTest.java | 38 +++++++++++++------- 10 files changed, 63 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/d693ca12/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index 198935b,cf124b4..6fc1c9c --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -30,6 -3,10 +30,7 @@@ Merged from 2.0 * Clone SliceQueryFilter in AbstractReadCommand implementations (CASSANDRA-8940) * Push correct protocol notification for DROP INDEX (CASSANDRA-9310) * token-generator - generated tokens too long (CASSANDRA-9300) + * Add option not to validate atoms during scrub (CASSANDRA-9406) - - -2.0.15: * Fix counting of tombstones for TombstoneOverwhelmingException (CASSANDRA-9299) * Fix ReconnectableSnitch reconnecting to peers during upgrade (CASSANDRA-6702) * Include keyspace and table name in error log for collections over the size http://git-wip-us.apache.org/repos/asf/cassandra/blob/d693ca12/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java index bdc2d8b,eec4044..0951c01 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@@ -1394,22 -1117,22 +1394,22 @@@ public class ColumnFamilyStore implemen return maxFile; } - public void forceCleanup(CounterId.OneShotRenewer renewer) throws ExecutionException, InterruptedException + public CompactionManager.AllSSTableOpStatus forceCleanup() throws ExecutionException, InterruptedException { - CompactionManager.instance.performCleanup(ColumnFamilyStore.this, renewer); + return CompactionManager.instance.performCleanup(ColumnFamilyStore.this); } - public CompactionManager.AllSSTableOpStatus scrub(boolean disableSnapshot, boolean skipCorrupted) throws ExecutionException, InterruptedException - public void scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData) throws ExecutionException, InterruptedException ++ public CompactionManager.AllSSTableOpStatus scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData) throws ExecutionException, InterruptedException { // skip snapshot creation during scrub, SEE JIRA 5891 if(!disableSnapshot) snapshotWithoutFlush("pre-scrub-" + System.currentTimeMillis()); - return CompactionManager.instance.performScrub(ColumnFamilyStore.this, skipCorrupted); - CompactionManager.instance.performScrub(ColumnFamilyStore.this, skipCorrupted, checkData); ++ return CompactionManager.instance.performScrub(ColumnFamilyStore.this, skipCorrupted, checkData); } - public void sstablesRewrite(boolean excludeCurrentVersion) throws ExecutionException, InterruptedException + public CompactionManager.AllSSTableOpStatus sstablesRewrite(boolean excludeCurrentVersion) throws ExecutionException, InterruptedException { - CompactionManager.instance.performSSTableRewrite(ColumnFamilyStore.this, excludeCurrentVersion); + return CompactionManager.instance.performSSTableRewrite(ColumnFamilyStore.this, excludeCurrentVersion); } public void markObsolete(Collection<SSTableReader> sstables, OperationType compactionType) http://git-wip-us.apache.org/repos/asf/cassandra/blob/d693ca12/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java index c7232a0,207b90d..47bd2d6 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@@ -246,83 -205,42 +246,83 @@@ public class CompactionManager implemen } } - private static interface AllSSTablesOperation - { - public void perform(ColumnFamilyStore store, Iterable<SSTableReader> sstables) throws IOException; - } - - private void performAllSSTableOperation(final ColumnFamilyStore cfs, final AllSSTablesOperation operation) throws InterruptedException, ExecutionException + private AllSSTableOpStatus parallelAllSSTableOperation(final ColumnFamilyStore cfs, final OneSSTableOperation operation) throws ExecutionException, InterruptedException { - final Iterable<SSTableReader> sstables = cfs.markAllCompacting(); - if (sstables == null) - return; - - Callable<Object> runnable = new Callable<Object>() + Iterable<SSTableReader> compactingSSTables = cfs.markAllCompacting(); + if (compactingSSTables == null) { - public Object call() throws IOException + logger.info("Aborting operation on {}.{} after failing to interrupt other compaction operations", cfs.keyspace.getName(), cfs.name); + return AllSSTableOpStatus.ABORTED; + } + if (Iterables.isEmpty(compactingSSTables)) + { + logger.info("No sstables for {}.{}", cfs.keyspace.getName(), cfs.name); + return AllSSTableOpStatus.SUCCESSFUL; + } + try + { + Iterable<SSTableReader> sstables = operation.filterSSTables(compactingSSTables); + List<Future<Object>> futures = new ArrayList<>(); + + for (final SSTableReader sstable : sstables) { - try + if (executor.isShutdown()) { - operation.perform(cfs, sstables); + logger.info("Executor has shut down, not submitting task"); + return AllSSTableOpStatus.ABORTED; } - finally + + futures.add(executor.submit(new Callable<Object>() { - cfs.getDataTracker().unmarkCompacting(sstables); - } - return this; + @Override + public Object call() throws Exception + { + operation.execute(sstable); + return this; + } + })); } - }; - executor.submit(runnable).get(); + + for (Future<Object> f : futures) + f.get(); + } + finally + { + cfs.getDataTracker().unmarkCompacting(compactingSSTables); + } + return AllSSTableOpStatus.SUCCESSFUL; } - public void performScrub(ColumnFamilyStore cfStore, final boolean skipCorrupted, final boolean checkData) throws InterruptedException, ExecutionException + private static interface OneSSTableOperation { - performAllSSTableOperation(cfStore, new AllSSTablesOperation() + Iterable<SSTableReader> filterSSTables(Iterable<SSTableReader> input); + void execute(SSTableReader input) throws IOException; + } + + public enum AllSSTableOpStatus { ABORTED(1), SUCCESSFUL(0); + public final int statusCode; + + AllSSTableOpStatus(int statusCode) { - public void perform(ColumnFamilyStore store, Iterable<SSTableReader> sstables) throws IOException + this.statusCode = statusCode; + } + } + - public AllSSTableOpStatus performScrub(final ColumnFamilyStore cfs, final boolean skipCorrupted) throws InterruptedException, ExecutionException ++ public AllSSTableOpStatus performScrub(final ColumnFamilyStore cfs, final boolean skipCorrupted, final boolean checkData) throws InterruptedException, ExecutionException + { + assert !cfs.isIndex(); + return parallelAllSSTableOperation(cfs, new OneSSTableOperation() + { + @Override + public Iterable<SSTableReader> filterSSTables(Iterable<SSTableReader> input) { - doScrub(store, sstables, skipCorrupted, checkData); + return input; + } + + @Override + public void execute(SSTableReader input) throws IOException + { - scrubOne(cfs, input, skipCorrupted); ++ scrubOne(cfs, input, skipCorrupted, checkData); } }); } @@@ -639,9 -425,23 +639,9 @@@ } } - private void scrubOne(ColumnFamilyStore cfs, SSTableReader sstable, boolean skipCorrupted) throws IOException - /** - * Deserialize everything in the CFS and re-serialize w/ the newest version. Also attempts to recover - * from bogus row keys / sizes using data from the index, and skips rows with garbage columns that resulted - * from early ByteBuffer bugs. - * - * @throws IOException - */ - private void doScrub(ColumnFamilyStore cfs, Iterable<SSTableReader> sstables, boolean skipCorrupted, boolean checkData) throws IOException - { - assert !cfs.isIndex(); - for (final SSTableReader sstable : sstables) - scrubOne(cfs, sstable, skipCorrupted, checkData); - } - + private void scrubOne(ColumnFamilyStore cfs, SSTableReader sstable, boolean skipCorrupted, boolean checkData) throws IOException { - Scrubber scrubber = new Scrubber(cfs, sstable, skipCorrupted, false); - Scrubber scrubber = new Scrubber(cfs, sstable, skipCorrupted, checkData); ++ Scrubber scrubber = new Scrubber(cfs, sstable, skipCorrupted, false, checkData); CompactionInfo.Holder scrubInfo = scrubber.getScrubInfo(); metrics.beginCompaction(scrubInfo); http://git-wip-us.apache.org/repos/asf/cassandra/blob/d693ca12/src/java/org/apache/cassandra/db/compaction/Scrubber.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/compaction/Scrubber.java index e8814e4,e5bcd25..ec0532c --- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java +++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java @@@ -36,10 -33,11 +36,11 @@@ import org.apache.cassandra.utils.Outpu public class Scrubber implements Closeable { - public final ColumnFamilyStore cfs; - public final SSTableReader sstable; - public final File destination; - public final boolean skipCorrupted; + private final ColumnFamilyStore cfs; + private final SSTableReader sstable; + private final File destination; + private final boolean skipCorrupted; + public final boolean validateColumns; private final CompactionController controller; private final boolean isCommutative; @@@ -74,18 -71,18 +75,19 @@@ }; private final SortedSet<Row> outOfOrderRows = new TreeSet<>(rowComparator); - public Scrubber(ColumnFamilyStore cfs, SSTableReader sstable, boolean skipCorrupted, boolean isOffline) throws IOException - public Scrubber(ColumnFamilyStore cfs, SSTableReader sstable, boolean skipCorrupted, boolean checkData) throws IOException ++ public Scrubber(ColumnFamilyStore cfs, SSTableReader sstable, boolean skipCorrupted, boolean isOffline, boolean checkData) throws IOException { - this(cfs, sstable, skipCorrupted, new OutputHandler.LogOutput(), isOffline); - this(cfs, sstable, skipCorrupted, checkData, new OutputHandler.LogOutput(), false); ++ this(cfs, sstable, skipCorrupted, new OutputHandler.LogOutput(), isOffline, checkData); } - public Scrubber(ColumnFamilyStore cfs, SSTableReader sstable, boolean skipCorrupted, OutputHandler outputHandler, boolean isOffline) throws IOException - public Scrubber(ColumnFamilyStore cfs, SSTableReader sstable, boolean skipCorrupted, boolean checkData, OutputHandler outputHandler, boolean isOffline) throws IOException ++ public Scrubber(ColumnFamilyStore cfs, SSTableReader sstable, boolean skipCorrupted, OutputHandler outputHandler, boolean isOffline, boolean checkData) throws IOException { this.cfs = cfs; this.sstable = sstable; this.outputHandler = outputHandler; this.skipCorrupted = skipCorrupted; + this.isOffline = isOffline; + this.validateColumns = checkData; List<SSTableReader> toScrub = Collections.singletonList(sstable); @@@ -184,15 -196,15 +186,15 @@@ } if (dataSize > dataFile.length()) - throw new IOError(new IOException("Impossible row size (greater than file length): " + dataSize)); + throw new IOError(new IOException("Impossible row size " + dataSize)); if (dataStart != dataStartFromIndex) - outputHandler.warn(String.format("Data file row position %d differs from index file row position %d", dataStart, dataSizeFromIndex)); + outputHandler.warn(String.format("Data file row position %d different from index file row position %d", dataStart, dataSizeFromIndex)); if (dataSize != dataSizeFromIndex) - outputHandler.warn(String.format("Data file row size %d differs from index file row size %d", dataSize, dataSizeFromIndex)); + outputHandler.warn(String.format("Data file row size %d different from index file row size %d", dataSize, dataSizeFromIndex)); - SSTableIdentityIterator atoms = new SSTableIdentityIterator(sstable, dataFile, key, dataSize, true); + SSTableIdentityIterator atoms = new SSTableIdentityIterator(sstable, dataFile, key, dataSize, validateColumns); if (prevKey != null && prevKey.compareTo(key) > 0) { saveOutOfOrderRow(prevKey, key, atoms); http://git-wip-us.apache.org/repos/asf/cassandra/blob/d693ca12/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/StorageService.java index 1ea915b,62b0c75..7c8e424 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@@ -2289,38 -2179,28 +2289,43 @@@ public class StorageService extends Not if (keyspaceName.equals(Keyspace.SYSTEM_KS)) throw new RuntimeException("Cleanup of the system keyspace is neither necessary nor wise"); - CounterId.OneShotRenewer counterIdRenewer = new CounterId.OneShotRenewer(); + CompactionManager.AllSSTableOpStatus status = CompactionManager.AllSSTableOpStatus.SUCCESSFUL; for (ColumnFamilyStore cfStore : getValidColumnFamilies(false, false, keyspaceName, columnFamilies)) { - cfStore.forceCleanup(counterIdRenewer); + CompactionManager.AllSSTableOpStatus oneStatus = cfStore.forceCleanup(); + if (oneStatus != CompactionManager.AllSSTableOpStatus.SUCCESSFUL) + status = oneStatus; } + return status.statusCode; } - public void scrub(boolean disableSnapshot, boolean skipCorrupted, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException + public int scrub(boolean disableSnapshot, boolean skipCorrupted, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException { - scrub(disableSnapshot, skipCorrupted, true, keyspaceName, columnFamilies); ++ return scrub(disableSnapshot, skipCorrupted, true, keyspaceName, columnFamilies); + } + - public void scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException ++ public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException + { + CompactionManager.AllSSTableOpStatus status = CompactionManager.AllSSTableOpStatus.SUCCESSFUL; for (ColumnFamilyStore cfStore : getValidColumnFamilies(false, false, keyspaceName, columnFamilies)) - cfStore.scrub(disableSnapshot, skipCorrupted, checkData); + { - CompactionManager.AllSSTableOpStatus oneStatus = cfStore.scrub(disableSnapshot, skipCorrupted); ++ CompactionManager.AllSSTableOpStatus oneStatus = cfStore.scrub(disableSnapshot, skipCorrupted, checkData); + if (oneStatus != CompactionManager.AllSSTableOpStatus.SUCCESSFUL) + status = oneStatus; + } + return status.statusCode; } - public void upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, String... columnFamilies) throws IOException, ExecutionException, InterruptedException + public int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, String... columnFamilies) throws IOException, ExecutionException, InterruptedException { + CompactionManager.AllSSTableOpStatus status = CompactionManager.AllSSTableOpStatus.SUCCESSFUL; for (ColumnFamilyStore cfStore : getValidColumnFamilies(true, true, keyspaceName, columnFamilies)) - cfStore.sstablesRewrite(excludeCurrentVersion); + { + CompactionManager.AllSSTableOpStatus oneStatus = cfStore.sstablesRewrite(excludeCurrentVersion); + if (oneStatus != CompactionManager.AllSSTableOpStatus.SUCCESSFUL) + status = oneStatus; + } + return status.statusCode; } public void forceKeyspaceCompaction(String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException http://git-wip-us.apache.org/repos/asf/cassandra/blob/d693ca12/src/java/org/apache/cassandra/service/StorageServiceMBean.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/StorageServiceMBean.java index ab34e1b,57780a3..1f86d82 --- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java +++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java @@@ -256,7 -231,9 +256,9 @@@ public interface StorageServiceMBean ex * * Scrubbed CFs will be snapshotted first, if disableSnapshot is false */ + @Deprecated - public void scrub(boolean disableSnapshot, boolean skipCorrupted, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException; - public void scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException; + public int scrub(boolean disableSnapshot, boolean skipCorrupted, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException; ++ public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException; /** * Rewrite all sstables to the latest version. http://git-wip-us.apache.org/repos/asf/cassandra/blob/d693ca12/src/java/org/apache/cassandra/tools/NodeProbe.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/tools/NodeProbe.java index 5b97e79,e8e087f..6e7179a --- a/src/java/org/apache/cassandra/tools/NodeProbe.java +++ b/src/java/org/apache/cassandra/tools/NodeProbe.java @@@ -223,49 -186,21 +223,49 @@@ public class NodeProbe implements AutoC jmxc.close(); } - public void forceKeyspaceCleanup(String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException + public int forceKeyspaceCleanup(String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException + { + return ssProxy.forceKeyspaceCleanup(keyspaceName, columnFamilies); + } + - public int scrub(boolean disableSnapshot, boolean skipCorrupted, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException ++ public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException + { - return ssProxy.scrub(disableSnapshot, skipCorrupted, keyspaceName, columnFamilies); ++ return ssProxy.scrub(disableSnapshot, skipCorrupted, checkData, keyspaceName, columnFamilies); + } + + public int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, String... columnFamilies) throws IOException, ExecutionException, InterruptedException + { + return ssProxy.upgradeSSTables(keyspaceName, excludeCurrentVersion, columnFamilies); + } + + public void forceKeyspaceCleanup(PrintStream out, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException { - ssProxy.forceKeyspaceCleanup(keyspaceName, columnFamilies); + if (forceKeyspaceCleanup(keyspaceName, columnFamilies) != 0) + { + failed = true; + out.println("Aborted cleaning up atleast one column family in keyspace "+keyspaceName+", check server logs for more information."); + } } - public void scrub(PrintStream out, boolean disableSnapshot, boolean skipCorrupted, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException - public void scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException ++ public void scrub(PrintStream out, boolean disableSnapshot, boolean skipCorrupted, boolean checkData, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException { - if (scrub(disableSnapshot, skipCorrupted, keyspaceName, columnFamilies) != 0) - ssProxy.scrub(disableSnapshot, skipCorrupted, checkData, keyspaceName, columnFamilies); ++ if (scrub(disableSnapshot, skipCorrupted, checkData, keyspaceName, columnFamilies) != 0) + { + failed = true; + out.println("Aborted scrubbing atleast one column family in keyspace "+keyspaceName+", check server logs for more information."); + } } - public void upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, String... columnFamilies) throws IOException, ExecutionException, InterruptedException + public void upgradeSSTables(PrintStream out, String keyspaceName, boolean excludeCurrentVersion, String... columnFamilies) throws IOException, ExecutionException, InterruptedException { - ssProxy.upgradeSSTables(keyspaceName, excludeCurrentVersion, columnFamilies); + if (upgradeSSTables(keyspaceName, excludeCurrentVersion, columnFamilies) != 0) + { + failed = true; + out.println("Aborted upgrading sstables for atleast one column family in keyspace "+keyspaceName+", check server logs for more information."); + } } + public void forceKeyspaceCompaction(String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException { ssProxy.forceKeyspaceCompaction(keyspaceName, columnFamilies);