Repository: cassandra Updated Branches: refs/heads/trunk 81ac654ff -> 65440409b
Add a few options to nodetool verify Patch by marcuse; reviewed by Ariel Weisberg for CASSANDRA-14201 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/65440409 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/65440409 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/65440409 Branch: refs/heads/trunk Commit: 65440409b4d22e7bdae99ca8232fb5eaf38f4449 Parents: 81ac654 Author: Marcus Eriksson <marc...@apache.org> Authored: Tue Feb 6 09:58:46 2018 +0100 Committer: Marcus Eriksson <marc...@apache.org> Committed: Thu Feb 15 09:24:20 2018 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/db/ColumnFamilyStore.java | 4 +- .../db/compaction/CompactionManager.java | 10 +- .../cassandra/db/compaction/Verifier.java | 215 +++++++++++++++- .../cassandra/service/StorageService.java | 26 +- .../cassandra/service/StorageServiceMBean.java | 1 + .../org/apache/cassandra/tools/NodeProbe.java | 8 +- .../cassandra/tools/StandaloneVerifier.java | 25 +- .../apache/cassandra/tools/nodetool/Verify.java | 37 ++- .../org/apache/cassandra/db/VerifyTest.java | 242 ++++++++++++++++--- .../cassandra/io/sstable/LegacySSTableTest.java | 27 ++- 11 files changed, 535 insertions(+), 61 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/65440409/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index dd56770..5fcb335 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0 + * Add a few options to nodetool verify (CASSANDRA-14201) * CVE-2017-5929 Security vulnerability and redefine default log rotation policy (CASSANDRA-14183) * Use JVM default SSL validation algorithm instead of custom default (CASSANDRA-13259) * Better document in code InetAddressAndPort usage post 7544, incorporate port into UUIDGen node (CASSANDRA-14226) http://git-wip-us.apache.org/repos/asf/cassandra/blob/65440409/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 9815eea..444a28c 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -1527,9 +1527,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean return true; } - public CompactionManager.AllSSTableOpStatus verify(boolean extendedVerify) throws ExecutionException, InterruptedException + public CompactionManager.AllSSTableOpStatus verify(Verifier.Options options) throws ExecutionException, InterruptedException { - return CompactionManager.instance.performVerify(ColumnFamilyStore.this, extendedVerify); + return CompactionManager.instance.performVerify(ColumnFamilyStore.this, options); } public CompactionManager.AllSSTableOpStatus sstablesRewrite(boolean excludeCurrentVersion, int jobs) throws ExecutionException, InterruptedException http://git-wip-us.apache.org/repos/asf/cassandra/blob/65440409/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index 01fc188..36cd954 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -406,7 +406,7 @@ public class CompactionManager implements CompactionManagerMBean }, jobs, OperationType.SCRUB); } - public AllSSTableOpStatus performVerify(final ColumnFamilyStore cfs, final boolean extendedVerify) throws InterruptedException, ExecutionException + public AllSSTableOpStatus performVerify(ColumnFamilyStore cfs, Verifier.Options options) throws InterruptedException, ExecutionException { assert !cfs.isIndex(); return parallelAllSSTableOperation(cfs, new OneSSTableOperation() @@ -420,7 +420,7 @@ public class CompactionManager implements CompactionManagerMBean @Override public void execute(LifecycleTransaction input) { - verifyOne(cfs, input.onlyOne(), extendedVerify); + verifyOne(cfs, input.onlyOne(), options); } }, 0, OperationType.VERIFY); } @@ -993,15 +993,15 @@ public class CompactionManager implements CompactionManagerMBean } } - private void verifyOne(ColumnFamilyStore cfs, SSTableReader sstable, boolean extendedVerify) + private void verifyOne(ColumnFamilyStore cfs, SSTableReader sstable, Verifier.Options options) { CompactionInfo.Holder verifyInfo = null; - try (Verifier verifier = new Verifier(cfs, sstable, false)) + try (Verifier verifier = new Verifier(cfs, sstable, false, options)) { verifyInfo = verifier.getVerifyInfo(); metrics.beginCompaction(verifyInfo); - verifier.verify(extendedVerify); + verifier.verify(); } finally { http://git-wip-us.apache.org/repos/asf/cassandra/blob/65440409/src/java/org/apache/cassandra/db/compaction/Verifier.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/Verifier.java b/src/java/org/apache/cassandra/db/compaction/Verifier.java index 01e465e..d704c1f 100644 --- a/src/java/org/apache/cassandra/db/compaction/Verifier.java +++ b/src/java/org/apache/cassandra/db/compaction/Verifier.java @@ -22,8 +22,11 @@ import com.google.common.base.Throwables; import org.apache.cassandra.db.*; import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; import org.apache.cassandra.io.sstable.Component; import org.apache.cassandra.io.sstable.CorruptSSTableException; +import org.apache.cassandra.io.sstable.IndexSummary; import org.apache.cassandra.io.sstable.SSTableIdentityIterator; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.sstable.metadata.MetadataComponent; @@ -33,17 +36,25 @@ import org.apache.cassandra.io.util.DataIntegrityMetadata; import org.apache.cassandra.io.util.DataIntegrityMetadata.FileDigestValidator; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.io.util.RandomAccessReader; +import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.service.ActiveRepairService; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.utils.BloomFilterSerializer; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.IFilter; import org.apache.cassandra.utils.OutputHandler; import org.apache.cassandra.utils.UUIDGen; +import java.io.BufferedInputStream; import java.io.Closeable; +import java.io.DataInputStream; import java.io.File; import java.io.IOError; import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.nio.file.Paths; import java.util.*; import java.util.function.LongPredicate; import java.util.function.Predicate; @@ -60,18 +71,20 @@ public class Verifier implements Closeable private final RandomAccessReader indexFile; private final VerifyInfo verifyInfo; private final RowIndexEntry.IndexSerializer rowIndexEntrySerializer; + private final Options options; + private final boolean isOffline; private int goodRows; private final OutputHandler outputHandler; private FileDigestValidator validator; - public Verifier(ColumnFamilyStore cfs, SSTableReader sstable, boolean isOffline) + public Verifier(ColumnFamilyStore cfs, SSTableReader sstable, boolean isOffline, Options options) { - this(cfs, sstable, new OutputHandler.LogOutput(), isOffline); + this(cfs, sstable, new OutputHandler.LogOutput(), isOffline, options); } - public Verifier(ColumnFamilyStore cfs, SSTableReader sstable, OutputHandler outputHandler, boolean isOffline) + public Verifier(ColumnFamilyStore cfs, SSTableReader sstable, OutputHandler outputHandler, boolean isOffline, Options options) { this.cfs = cfs; this.sstable = sstable; @@ -85,13 +98,24 @@ public class Verifier implements Closeable : sstable.openDataReader(CompactionManager.instance.getRateLimiter()); this.indexFile = RandomAccessReader.open(new File(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX))); this.verifyInfo = new VerifyInfo(dataFile, sstable); + this.options = options; + this.isOffline = isOffline; } - public void verify(boolean extended) + public void verify() { + boolean extended = options.extendedVerification; long rowStart = 0; outputHandler.output(String.format("Verifying %s (%s)", sstable, FBUtilities.prettyPrintMemory(dataFile.length()))); + if (options.checkVersion && !sstable.descriptor.version.isLatestVersion()) + { + String msg = String.format("%s is not the latest version, run upgradesstables", sstable); + outputHandler.output(msg); + // don't use markAndThrow here because we don't want a CorruptSSTableException for this. + throw new RuntimeException(msg); + } + outputHandler.output(String.format("Deserializing sstable metadata for %s ", sstable)); try { @@ -108,6 +132,41 @@ public class Verifier implements Closeable } outputHandler.output(String.format("Checking computed hash of %s ", sstable)); + try + { + deserializeIndex(sstable); + } + catch (Throwable t) + { + outputHandler.debug(t.getMessage()); + markAndThrow(); + } + + try + { + deserializeIndexSummary(sstable); + + } + catch (Throwable t) + { + outputHandler.output("Index summary is corrupt - if it is removed it will get rebuilt on startup "+sstable.descriptor.filenameFor(Component.SUMMARY)); + outputHandler.debug(t.getMessage()); + markAndThrow(false); + } + + try + { + deserializeBloomFilter(sstable); + + } + catch (Throwable t) + { + outputHandler.debug(t.getMessage()); + markAndThrow(); + } + + if (options.quick) + return; // Verify will use the Digest files, which works for both compressed and uncompressed sstables try @@ -150,6 +209,8 @@ public class Verifier implements Closeable markAndThrow(); } + List<Range<Token>> ownedRanges = isOffline ? Collections.emptyList() : Range.normalize(StorageService.instance.getLocalAndPendingRanges(cfs.metadata().keyspace)); + int rangeIndex = -1; DecoratedKey prevKey = null; while (!dataFile.isEOF()) @@ -172,6 +233,16 @@ public class Verifier implements Closeable // check for null key below } + if (options.checkOwnsTokens && ownedRanges.size() > 0) + { + while (rangeIndex == -1 || !ownedRanges.get(rangeIndex).contains(key.getToken())) + { + rangeIndex++; + if (rangeIndex > ownedRanges.size() - 1) + throw new RuntimeException(String.format("Key %s in sstable %s not owned by local ranges %s", key, sstable, ownedRanges)); + } + } + ByteBuffer currentIndexKey = nextIndexKey; long nextRowPositionFromIndex = 0; try @@ -236,6 +307,44 @@ public class Verifier implements Closeable outputHandler.output("Verify of " + sstable + " succeeded. All " + goodRows + " rows read successfully"); } + private void deserializeIndex(SSTableReader sstable) throws IOException + { + try (RandomAccessReader primaryIndex = RandomAccessReader.open(new File(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX)))) + { + long indexSize = primaryIndex.length(); + + while ((primaryIndex.getFilePointer()) != indexSize) + { + ByteBuffer key = ByteBufferUtil.readWithShortLength(primaryIndex); + RowIndexEntry.Serializer.skip(primaryIndex, sstable.descriptor.version); + } + } + } + + private void deserializeIndexSummary(SSTableReader sstable) throws IOException + { + File file = new File(sstable.descriptor.filenameFor(Component.SUMMARY)); + TableMetadata metadata = cfs.metadata(); + try (DataInputStream iStream = new DataInputStream(Files.newInputStream(file.toPath()))) + { + try (IndexSummary indexSummary = IndexSummary.serializer.deserialize(iStream, + cfs.getPartitioner(), + metadata.params.minIndexInterval, + metadata.params.maxIndexInterval)) + { + ByteBufferUtil.readWithLength(iStream); + ByteBufferUtil.readWithLength(iStream); + } + } + } + + private void deserializeBloomFilter(SSTableReader sstable) throws IOException + { + try (DataInputStream stream = new DataInputStream(new BufferedInputStream(Files.newInputStream(Paths.get(sstable.descriptor.filenameFor(Component.FILTER))))); + IFilter bf = BloomFilterSerializer.deserialize(stream, sstable.descriptor.version.hasOldBfFormat())) + {} + } + public void close() { FileUtils.closeQuietly(dataFile); @@ -255,7 +364,7 @@ public class Verifier implements Closeable private void markAndThrow(boolean mutateRepaired) { - if (mutateRepaired) // if we are able to mutate repaired flag, an incremental repair should be enough + if (mutateRepaired && options.mutateRepairStatus) // if we are able to mutate repaired flag, an incremental repair should be enough { try { @@ -268,7 +377,11 @@ public class Verifier implements Closeable outputHandler.output("Error mutating repairedAt for SSTable " + sstable.getFilename() + ", as part of markAndThrow"); } } - throw new CorruptSSTableException(new Exception(String.format("Invalid SSTable %s, please force %srepair", sstable.getFilename(), mutateRepaired ? "" : "a full ")), sstable.getFilename()); + Exception e = new Exception(String.format("Invalid SSTable %s, please force %srepair", sstable.getFilename(), (mutateRepaired && options.mutateRepairStatus) ? "" : "a full ")); + if (options.invokeDiskFailurePolicy) + throw new CorruptSSTableException(e, sstable.getFilename()); + else + throw new RuntimeException(e); } public CompactionInfo.Holder getVerifyInfo() @@ -319,4 +432,94 @@ public class Verifier implements Closeable return time -> false; } } + + public static Options.Builder options() + { + return new Options.Builder(); + } + + public static class Options + { + public final boolean invokeDiskFailurePolicy; + public final boolean extendedVerification; + public final boolean checkVersion; + public final boolean mutateRepairStatus; + public final boolean checkOwnsTokens; + public final boolean quick; + + private Options(boolean invokeDiskFailurePolicy, boolean extendedVerification, boolean checkVersion, boolean mutateRepairStatus, boolean checkOwnsTokens, boolean quick) + { + this.invokeDiskFailurePolicy = invokeDiskFailurePolicy; + this.extendedVerification = extendedVerification; + this.checkVersion = checkVersion; + this.mutateRepairStatus = mutateRepairStatus; + this.checkOwnsTokens = checkOwnsTokens; + this.quick = quick; + } + + @Override + public String toString() + { + return "Options{" + + "invokeDiskFailurePolicy=" + invokeDiskFailurePolicy + + ", extendedVerification=" + extendedVerification + + ", checkVersion=" + checkVersion + + ", mutateRepairStatus=" + mutateRepairStatus + + ", checkOwnsTokens=" + checkOwnsTokens + + ", quick=" + quick + + '}'; + } + + public static class Builder + { + private boolean invokeDiskFailurePolicy = false; // invoking disk failure policy can stop the node if we find a corrupt stable + private boolean extendedVerification = false; + private boolean checkVersion = false; + private boolean mutateRepairStatus = false; // mutating repair status can be dangerous + private boolean checkOwnsTokens = false; + private boolean quick = false; + + public Builder invokeDiskFailurePolicy(boolean param) + { + this.invokeDiskFailurePolicy = param; + return this; + } + + public Builder extendedVerification(boolean param) + { + this.extendedVerification = param; + return this; + } + + public Builder checkVersion(boolean param) + { + this.checkVersion = param; + return this; + } + + public Builder mutateRepairStatus(boolean param) + { + this.mutateRepairStatus = param; + return this; + } + + public Builder checkOwnsTokens(boolean param) + { + this.checkOwnsTokens = param; + return this; + } + + public Builder quick(boolean param) + { + this.quick = param; + return this; + } + + public Options build() + { + return new Options(invokeDiskFailurePolicy, extendedVerification, checkVersion, mutateRepairStatus, checkOwnsTokens, quick); + } + + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/65440409/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index e28f451..42e3ddf 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -65,6 +65,7 @@ import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.*; import org.apache.cassandra.db.commitlog.CommitLog; import org.apache.cassandra.db.compaction.CompactionManager; +import org.apache.cassandra.db.compaction.Verifier; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.dht.*; import org.apache.cassandra.dht.Range; @@ -161,6 +162,16 @@ public class StorageService extends NotificationBroadcasterSupport implements IE return getRangesForEndpoint(keyspaceName, FBUtilities.getBroadcastAddressAndPort()); } + public List<Range<Token>> getLocalAndPendingRanges(String ks) + { + InetAddressAndPort broadcastAddress = FBUtilities.getBroadcastAddressAndPort(); + Keyspace keyspace = Keyspace.open(ks); + List<Range<Token>> ranges = new ArrayList<>(); + ranges.addAll(keyspace.getReplicationStrategy().getAddressRanges().get(broadcastAddress)); + ranges.addAll(getTokenMetadata().getPendingRanges(ks, broadcastAddress)); + return Range.normalize(ranges); + } + public Collection<Range<Token>> getPrimaryRanges(String keyspace) { return getPrimaryRangesForEndpoint(keyspace, FBUtilities.getBroadcastAddressAndPort()); @@ -3135,12 +3146,25 @@ public class StorageService extends NotificationBroadcasterSupport implements IE return status.statusCode; } + @Deprecated public int verify(boolean extendedVerify, String keyspaceName, String... tableNames) throws IOException, ExecutionException, InterruptedException { + return verify(extendedVerify, false, false, false, false, false, keyspaceName, tableNames); + } + + public int verify(boolean extendedVerify, boolean checkVersion, boolean diskFailurePolicy, boolean mutateRepairStatus, boolean checkOwnsTokens, boolean quick, String keyspaceName, String... tableNames) throws IOException, ExecutionException, InterruptedException + { CompactionManager.AllSSTableOpStatus status = CompactionManager.AllSSTableOpStatus.SUCCESSFUL; + Verifier.Options options = Verifier.options().invokeDiskFailurePolicy(diskFailurePolicy) + .extendedVerification(extendedVerify) + .checkVersion(checkVersion) + .mutateRepairStatus(mutateRepairStatus) + .checkOwnsTokens(checkOwnsTokens) + .quick(quick).build(); + logger.info("Verifying {}.{} with options = {}", keyspaceName, Arrays.toString(tableNames), options); for (ColumnFamilyStore cfStore : getValidColumnFamilies(false, false, keyspaceName, tableNames)) { - CompactionManager.AllSSTableOpStatus oneStatus = cfStore.verify(extendedVerify); + CompactionManager.AllSSTableOpStatus oneStatus = cfStore.verify(options); if (oneStatus != CompactionManager.AllSSTableOpStatus.SUCCESSFUL) status = oneStatus; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/65440409/src/java/org/apache/cassandra/service/StorageServiceMBean.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java index 31dedf5..0676419 100644 --- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java +++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java @@ -302,6 +302,7 @@ public interface StorageServiceMBean extends NotificationEmitter * The entire sstable will be read to ensure each cell validates if extendedVerify is true */ public int verify(boolean extendedVerify, String keyspaceName, String... tableNames) throws IOException, ExecutionException, InterruptedException; + public int verify(boolean extendedVerify, boolean checkVersion, boolean diskFailurePolicy, boolean mutateRepairStatus, boolean checkOwnsTokens, boolean quick, String keyspaceName, String... tableNames) throws IOException, ExecutionException, InterruptedException; /** * Rewrite all sstables to the latest version. http://git-wip-us.apache.org/repos/asf/cassandra/blob/65440409/src/java/org/apache/cassandra/tools/NodeProbe.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java index c780a16..1bace2e 100644 --- a/src/java/org/apache/cassandra/tools/NodeProbe.java +++ b/src/java/org/apache/cassandra/tools/NodeProbe.java @@ -266,9 +266,9 @@ public class NodeProbe implements AutoCloseable return ssProxy.scrub(disableSnapshot, skipCorrupted, checkData, reinsertOverflowedTTL, jobs, keyspaceName, tables); } - public int verify(boolean extendedVerify, String keyspaceName, String... tableNames) throws IOException, ExecutionException, InterruptedException + public int verify(boolean extendedVerify, boolean checkVersion, boolean diskFailurePolicy, boolean mutateRepairStatus, boolean checkOwnsTokens, boolean quick, String keyspaceName, String... tableNames) throws IOException, ExecutionException, InterruptedException { - return ssProxy.verify(extendedVerify, keyspaceName, tableNames); + return ssProxy.verify(extendedVerify, checkVersion, diskFailurePolicy, mutateRepairStatus, checkOwnsTokens, quick, keyspaceName, tableNames); } public int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, int jobs, String... tableNames) throws IOException, ExecutionException, InterruptedException @@ -321,9 +321,9 @@ public class NodeProbe implements AutoCloseable } } - public void verify(PrintStream out, boolean extendedVerify, String keyspaceName, String... tableNames) throws IOException, ExecutionException, InterruptedException + public void verify(PrintStream out, boolean extendedVerify, boolean checkVersion, boolean diskFailurePolicy, boolean mutateRepairStatus, boolean checkOwnsTokens, boolean quick, String keyspaceName, String... tableNames) throws IOException, ExecutionException, InterruptedException { - switch (verify(extendedVerify, keyspaceName, tableNames)) + switch (verify(extendedVerify, checkVersion, diskFailurePolicy, mutateRepairStatus, checkOwnsTokens, quick, keyspaceName, tableNames)) { case 1: failed = true; http://git-wip-us.apache.org/repos/asf/cassandra/blob/65440409/src/java/org/apache/cassandra/tools/StandaloneVerifier.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/StandaloneVerifier.java b/src/java/org/apache/cassandra/tools/StandaloneVerifier.java index 40dfbf7..81e992e 100644 --- a/src/java/org/apache/cassandra/tools/StandaloneVerifier.java +++ b/src/java/org/apache/cassandra/tools/StandaloneVerifier.java @@ -43,6 +43,9 @@ public class StandaloneVerifier private static final String EXTENDED_OPTION = "extended"; private static final String DEBUG_OPTION = "debug"; private static final String HELP_OPTION = "help"; + private static final String CHECK_VERSION = "check_version"; + private static final String MUTATE_REPAIR_STATUS = "mutate_repair_status"; + private static final String QUICK = "quick"; public static void main(String args[]) { @@ -68,8 +71,6 @@ public class StandaloneVerifier OutputHandler handler = new OutputHandler.SystemOutput(options.verbose, options.debug); Directories.SSTableLister lister = cfs.getDirectories().sstableLister(Directories.OnTxnErr.THROW).skipTemporary(true); - boolean extended = options.extended; - List<SSTableReader> sstables = new ArrayList<>(); // Verify sstables @@ -92,15 +93,20 @@ public class StandaloneVerifier e.printStackTrace(System.err); } } - + Verifier.Options verifyOptions = Verifier.options().invokeDiskFailurePolicy(false) + .extendedVerification(options.extended) + .checkVersion(options.checkVersion) + .mutateRepairStatus(options.mutateRepairStatus) + .checkOwnsTokens(false) // don't know the ranges when running offline + .build(); for (SSTableReader sstable : sstables) { try { - try (Verifier verifier = new Verifier(cfs, sstable, handler, true)) + try (Verifier verifier = new Verifier(cfs, sstable, handler, true, verifyOptions)) { - verifier.verify(extended); + verifier.verify(); } catch (CorruptSSTableException cs) { @@ -136,6 +142,9 @@ public class StandaloneVerifier public boolean debug; public boolean verbose; public boolean extended; + public boolean checkVersion; + public boolean mutateRepairStatus; + public boolean quick; private Options(String keyspaceName, String cfName) { @@ -174,6 +183,9 @@ public class StandaloneVerifier opts.debug = cmd.hasOption(DEBUG_OPTION); opts.verbose = cmd.hasOption(VERBOSE_OPTION); opts.extended = cmd.hasOption(EXTENDED_OPTION); + opts.checkVersion = cmd.hasOption(CHECK_VERSION); + opts.mutateRepairStatus = cmd.hasOption(MUTATE_REPAIR_STATUS); + opts.quick = cmd.hasOption(QUICK); return opts; } @@ -198,6 +210,9 @@ public class StandaloneVerifier options.addOption("e", EXTENDED_OPTION, "extended verification"); options.addOption("v", VERBOSE_OPTION, "verbose output"); options.addOption("h", HELP_OPTION, "display this help message"); + options.addOption("c", CHECK_VERSION, "make sure sstables are the latest version"); + options.addOption("r", MUTATE_REPAIR_STATUS, "don't mutate repair status"); + options.addOption("q", QUICK, "do a quick check, don't read all data"); return options; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/65440409/src/java/org/apache/cassandra/tools/nodetool/Verify.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/nodetool/Verify.java b/src/java/org/apache/cassandra/tools/nodetool/Verify.java index 75cb109..28b91fd 100644 --- a/src/java/org/apache/cassandra/tools/nodetool/Verify.java +++ b/src/java/org/apache/cassandra/tools/nodetool/Verify.java @@ -34,21 +34,52 @@ public class Verify extends NodeToolCmd private List<String> args = new ArrayList<>(); @Option(title = "extended_verify", - name = {"-e", "--extended-verify"}, - description = "Verify each cell data, beyond simply checking sstable checksums") + name = {"-e", "--extended-verify"}, + description = "Verify each cell data, beyond simply checking sstable checksums") private boolean extendedVerify = false; + @Option(title = "check_version", + name = {"-c", "--check-version"}, + description = "Also check that all sstables are the latest version") + private boolean checkVersion = false; + + @Option(title = "dfp", + name = {"-d", "--dfp"}, + description = "Invoke the disk failure policy if a corrupt sstable is found") + private boolean diskFailurePolicy = false; + + @Option(title = "repair_status_change", + name = {"-r", "--rsc"}, + description = "Mutate the repair status on corrupt sstables") + private boolean mutateRepairStatus = false; + + @Option(title = "check_owns_tokens", + name = {"-t", "--check-tokens"}, + description = "Verify that all tokens in sstables are owned by this node") + private boolean checkOwnsTokens = false; + + @Option(title = "quick", + name = {"-q", "--quick"}, + description = "Do a quick check - avoid reading all data to verify checksums") + private boolean quick = false; + @Override public void execute(NodeProbe probe) { List<String> keyspaces = parseOptionalKeyspace(args, probe); String[] tableNames = parseOptionalTables(args); + if (checkOwnsTokens && !extendedVerify) + { + System.out.println("Token verification requires --extended-verify"); + System.exit(1); + } + for (String keyspace : keyspaces) { try { - probe.verify(System.out, extendedVerify, keyspace, tableNames); + probe.verify(System.out, extendedVerify, checkVersion, diskFailurePolicy, mutateRepairStatus, checkOwnsTokens, quick, keyspace, tableNames); } catch (Exception e) { throw new RuntimeException("Error occurred during verifying", e); http://git-wip-us.apache.org/repos/asf/cassandra/blob/65440409/test/unit/org/apache/cassandra/db/VerifyTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/VerifyTest.java b/test/unit/org/apache/cassandra/db/VerifyTest.java index 8b9b437..e996082 100644 --- a/test/unit/org/apache/cassandra/db/VerifyTest.java +++ b/test/unit/org/apache/cassandra/db/VerifyTest.java @@ -28,15 +28,20 @@ import org.apache.cassandra.db.compaction.AbstractCompactionStrategy; import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.compaction.Verifier; import org.apache.cassandra.db.marshal.UUIDType; +import org.apache.cassandra.dht.ByteOrderedPartitioner; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.exceptions.WriteTimeoutException; import org.apache.cassandra.io.FSWriteError; +import org.apache.cassandra.io.compress.CorruptBlockException; import org.apache.cassandra.io.sstable.Component; import org.apache.cassandra.io.sstable.CorruptSSTableException; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.TokenMetadata; import org.apache.cassandra.schema.CompressionParams; import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.commons.lang3.StringUtils; import org.junit.BeforeClass; @@ -44,6 +49,7 @@ import org.junit.Test; import org.junit.runner.RunWith; import java.io.*; +import java.net.UnknownHostException; import java.nio.file.Files; import java.util.Collections; import java.util.concurrent.ExecutionException; @@ -112,9 +118,9 @@ public class VerifyTest SSTableReader sstable = cfs.getLiveSSTables().iterator().next(); - try (Verifier verifier = new Verifier(cfs, sstable, false)) + try (Verifier verifier = new Verifier(cfs, sstable, false, Verifier.options().invokeDiskFailurePolicy(true).build())) { - verifier.verify(false); + verifier.verify(); } catch (CorruptSSTableException err) { @@ -132,10 +138,9 @@ public class VerifyTest fillCounterCF(cfs, 2); SSTableReader sstable = cfs.getLiveSSTables().iterator().next(); - - try (Verifier verifier = new Verifier(cfs, sstable, false)) + try (Verifier verifier = new Verifier(cfs, sstable, false, Verifier.options().invokeDiskFailurePolicy(true).build())) { - verifier.verify(false); + verifier.verify(); } catch (CorruptSSTableException err) { @@ -153,10 +158,9 @@ public class VerifyTest fillCF(cfs, 2); SSTableReader sstable = cfs.getLiveSSTables().iterator().next(); - - try (Verifier verifier = new Verifier(cfs, sstable, false)) + try (Verifier verifier = new Verifier(cfs, sstable, false, Verifier.options().invokeDiskFailurePolicy(true).build())) { - verifier.verify(true); + verifier.verify(); } catch (CorruptSSTableException err) { @@ -175,9 +179,9 @@ public class VerifyTest SSTableReader sstable = cfs.getLiveSSTables().iterator().next(); - try (Verifier verifier = new Verifier(cfs, sstable, false)) + try (Verifier verifier = new Verifier(cfs, sstable, false, Verifier.options().invokeDiskFailurePolicy(true).extendedVerification(true).build())) { - verifier.verify(true); + verifier.verify(); } catch (CorruptSSTableException err) { @@ -196,9 +200,9 @@ public class VerifyTest SSTableReader sstable = cfs.getLiveSSTables().iterator().next(); - try (Verifier verifier = new Verifier(cfs, sstable, false)) + try (Verifier verifier = new Verifier(cfs, sstable, false, Verifier.options().invokeDiskFailurePolicy(true).build())) { - verifier.verify(false); + verifier.verify(); } catch (CorruptSSTableException err) { @@ -217,9 +221,9 @@ public class VerifyTest SSTableReader sstable = cfs.getLiveSSTables().iterator().next(); - try (Verifier verifier = new Verifier(cfs, sstable, false)) + try (Verifier verifier = new Verifier(cfs, sstable, false, Verifier.options().invokeDiskFailurePolicy(true).build())) { - verifier.verify(false); + verifier.verify(); } catch (CorruptSSTableException err) { @@ -238,9 +242,9 @@ public class VerifyTest SSTableReader sstable = cfs.getLiveSSTables().iterator().next(); - try (Verifier verifier = new Verifier(cfs, sstable, false)) + try (Verifier verifier = new Verifier(cfs, sstable, false, Verifier.options().extendedVerification(true).invokeDiskFailurePolicy(true).build())) { - verifier.verify(true); + verifier.verify(); } catch (CorruptSSTableException err) { @@ -259,9 +263,9 @@ public class VerifyTest SSTableReader sstable = cfs.getLiveSSTables().iterator().next(); - try (Verifier verifier = new Verifier(cfs, sstable, false)) + try (Verifier verifier = new Verifier(cfs, sstable, false, Verifier.options().extendedVerification(true).invokeDiskFailurePolicy(true).build())) { - verifier.verify(true); + verifier.verify(); } catch (CorruptSSTableException err) { @@ -287,16 +291,23 @@ public class VerifyTest try (RandomAccessFile file = new RandomAccessFile(sstable.descriptor.filenameFor(Component.DIGEST), "rw")) { Long correctChecksum = Long.valueOf(file.readLine()); - + writeChecksum(++correctChecksum, sstable.descriptor.filenameFor(Component.DIGEST)); } - try (Verifier verifier = new Verifier(cfs, sstable, false)) + try (Verifier verifier = new Verifier(cfs, sstable, false, Verifier.options().invokeDiskFailurePolicy(true).build())) { - verifier.verify(false); + verifier.verify(); fail("Expected a CorruptSSTableException to be thrown"); } catch (CorruptSSTableException err) {} + + try (Verifier verifier = new Verifier(cfs, sstable, false, Verifier.options().invokeDiskFailurePolicy(false).build())) + { + verifier.verify(); + fail("Expected a RuntimeException to be thrown"); + } + catch (RuntimeException err) {} } @@ -329,22 +340,24 @@ public class VerifyTest // Update the Digest to have the right Checksum writeChecksum(simpleFullChecksum(sstable.getFilename()), sstable.descriptor.filenameFor(Component.DIGEST)); - try (Verifier verifier = new Verifier(cfs, sstable, false)) + try (Verifier verifier = new Verifier(cfs, sstable, false, Verifier.options().invokeDiskFailurePolicy(true).build())) { // First a simple verify checking digest, which should succeed try { - verifier.verify(false); + verifier.verify(); } catch (CorruptSSTableException err) { fail("Simple verify should have succeeded as digest matched"); } - + } + try (Verifier verifier = new Verifier(cfs, sstable, false, Verifier.options().invokeDiskFailurePolicy(true).extendedVerification(true).build())) + { // Now try extended verify try { - verifier.verify(true); + verifier.verify(); } catch (CorruptSSTableException err) @@ -355,13 +368,13 @@ public class VerifyTest } } - @Test(expected = CorruptSSTableException.class) + @Test public void testVerifyBrokenSSTableMetadata() throws IOException, WriteTimeoutException { CompactionManager.instance.disableAutoCompaction(); Keyspace keyspace = Keyspace.open(KEYSPACE); ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CORRUPT_CF2); - + cfs.truncateBlocking(); fillCF(cfs, 2); Util.getAll(Util.cmd(cfs).build()); @@ -373,11 +386,90 @@ public class VerifyTest file.seek(0); file.writeBytes(StringUtils.repeat('z', 2)); file.close(); + try (Verifier verifier = new Verifier(cfs, sstable, false, Verifier.options().invokeDiskFailurePolicy(true).build())) + { + verifier.verify(); + fail("Expected a CorruptSSTableException to be thrown"); + } + catch (CorruptSSTableException err) + {} + try (Verifier verifier = new Verifier(cfs, sstable, false, Verifier.options().invokeDiskFailurePolicy(false).build())) + { + verifier.verify(); + fail("Expected a RuntimeException to be thrown"); + } + catch (CorruptSSTableException err) { fail("wrong exception thrown"); } + catch (RuntimeException err) + {} + } + + @Test + public void testVerifyMutateRepairStatus() throws IOException, WriteTimeoutException + { + CompactionManager.instance.disableAutoCompaction(); + Keyspace keyspace = Keyspace.open(KEYSPACE); + ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CORRUPT_CF2); + cfs.truncateBlocking(); + fillCF(cfs, 2); + + Util.getAll(Util.cmd(cfs).build()); + + // make the sstable repaired: + SSTableReader sstable = cfs.getLiveSSTables().iterator().next(); + sstable.descriptor.getMetadataSerializer().mutateRepaired(sstable.descriptor, System.currentTimeMillis(), sstable.getSSTableMetadata().pendingRepair); + sstable.reloadSSTableMetadata(); - try (Verifier verifier = new Verifier(cfs, sstable, false)) + // break the sstable: + Long correctChecksum; + try (RandomAccessFile file = new RandomAccessFile(sstable.descriptor.filenameFor(Component.DIGEST), "rw")) + { + correctChecksum = Long.parseLong(file.readLine()); + } + writeChecksum(++correctChecksum, sstable.descriptor.filenameFor(Component.DIGEST)); + try (Verifier verifier = new Verifier(cfs, sstable, false, Verifier.options().mutateRepairStatus(false).invokeDiskFailurePolicy(true).build())) + { + verifier.verify(); + fail("Expected a CorruptSSTableException to be thrown"); + } + catch (CorruptSSTableException err) + {} + + assertTrue(sstable.isRepaired()); + + // now the repair status should be changed: + try (Verifier verifier = new Verifier(cfs, sstable, false, Verifier.options().mutateRepairStatus(true).invokeDiskFailurePolicy(true).build())) { - verifier.verify(false); + verifier.verify(); + fail("Expected a CorruptSSTableException to be thrown"); } + catch (CorruptSSTableException err) + {} + assertFalse(sstable.isRepaired()); + } + + @Test(expected = RuntimeException.class) + public void testOutOfRangeTokens() throws IOException + { + Keyspace keyspace = Keyspace.open(KEYSPACE); + ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF); + fillCF(cfs, 100); + TokenMetadata tmd = StorageService.instance.getTokenMetadata(); + byte[] tk1 = new byte[1], tk2 = new byte[1]; + tk1[0] = 2; + tk2[0] = 1; + tmd.updateNormalToken(new ByteOrderedPartitioner.BytesToken(tk1), InetAddressAndPort.getByName("127.0.0.1")); + tmd.updateNormalToken(new ByteOrderedPartitioner.BytesToken(tk2), InetAddressAndPort.getByName("127.0.0.2")); + + SSTableReader sstable = cfs.getLiveSSTables().iterator().next(); + try (Verifier verifier = new Verifier(cfs, sstable, false, Verifier.options().checkOwnsTokens(true).extendedVerification(true).build())) + { + verifier.verify(); + } + finally + { + StorageService.instance.getTokenMetadata().clearUnsafe(); + } + } @Test @@ -403,9 +495,9 @@ public class VerifyTest correctChecksum = Long.parseLong(file.readLine()); } writeChecksum(++correctChecksum, sstable.descriptor.filenameFor(Component.DIGEST)); - try (Verifier verifier = new Verifier(cfs, sstable, false)) + try (Verifier verifier = new Verifier(cfs, sstable, false, Verifier.options().invokeDiskFailurePolicy(true).mutateRepairStatus(true).build())) { - verifier.verify(false); + verifier.verify(); fail("should be corrupt"); } catch (CorruptSSTableException e) @@ -413,6 +505,94 @@ public class VerifyTest assertFalse(sstable.isRepaired()); } + @Test + public void testVerifyIndex() throws IOException + { + testBrokenComponentHelper(Component.PRIMARY_INDEX); + } + @Test + public void testVerifyBf() throws IOException + { + testBrokenComponentHelper(Component.FILTER); + } + + @Test + public void testVerifyIndexSummary() throws IOException + { + testBrokenComponentHelper(Component.SUMMARY); + } + + private void testBrokenComponentHelper(Component componentToBreak) throws IOException + { + CompactionManager.instance.disableAutoCompaction(); + Keyspace keyspace = Keyspace.open(KEYSPACE); + ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CORRUPT_CF2); + + fillCF(cfs, 2); + + SSTableReader sstable = cfs.getLiveSSTables().iterator().next(); + try (Verifier verifier = new Verifier(cfs, sstable, false, Verifier.options().build())) + { + verifier.verify(); //still not corrupt, should pass + } + String filenameToCorrupt = sstable.descriptor.filenameFor(componentToBreak); + try (RandomAccessFile file = new RandomAccessFile(filenameToCorrupt, "rw")) + { + file.setLength(3); + } + + try (Verifier verifier = new Verifier(cfs, sstable, false, Verifier.options().invokeDiskFailurePolicy(true).build())) + { + verifier.verify(); + fail("should throw exception"); + } + catch(CorruptSSTableException e) + { + //expected + } + } + + @Test + public void testQuick() throws IOException + { + CompactionManager.instance.disableAutoCompaction(); + Keyspace keyspace = Keyspace.open(KEYSPACE); + ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CORRUPT_CF); + + fillCF(cfs, 2); + + Util.getAll(Util.cmd(cfs).build()); + + SSTableReader sstable = cfs.getLiveSSTables().iterator().next(); + + + try (RandomAccessFile file = new RandomAccessFile(sstable.descriptor.filenameFor(Component.DIGEST), "rw")) + { + Long correctChecksum = Long.valueOf(file.readLine()); + + writeChecksum(++correctChecksum, sstable.descriptor.filenameFor(Component.DIGEST)); + } + + try (Verifier verifier = new Verifier(cfs, sstable, false, Verifier.options().invokeDiskFailurePolicy(true).build())) + { + verifier.verify(); + fail("Expected a CorruptSSTableException to be thrown"); + } + catch (CorruptSSTableException err) {} + + try (Verifier verifier = new Verifier(cfs, sstable, false, Verifier.options().invokeDiskFailurePolicy(true).quick(true).build())) // with quick = true we don't verify the digest + { + verifier.verify(); + } + + try (Verifier verifier = new Verifier(cfs, sstable, false, Verifier.options().invokeDiskFailurePolicy(true).build())) + { + verifier.verify(); + fail("Expected a RuntimeException to be thrown"); + } + catch (CorruptSSTableException err) {} + } + protected void fillCF(ColumnFamilyStore cfs, int partitionsPerSSTable) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/65440409/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java index 84f7158..e399c67 100644 --- a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java @@ -58,6 +58,8 @@ import org.apache.cassandra.streaming.StreamOperation; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; +import static org.junit.Assert.fail; + /** * Tests backwards compatibility for SSTables */ @@ -196,17 +198,34 @@ public class LegacySSTableTest } @Test - public void verifyOldSSTables() throws Exception + public void testVerifyOldSSTables() throws IOException { for (String legacyVersion : legacyVersions) { - loadLegacyTables(legacyVersion); ColumnFamilyStore cfs = Keyspace.open("legacy_tables").getColumnFamilyStore(String.format("legacy_%s_simple", legacyVersion)); + loadLegacyTable("legacy_%s_simple", legacyVersion); + for (SSTableReader sstable : cfs.getLiveSSTables()) { - try (Verifier verifier = new Verifier(cfs, sstable, false)) + try (Verifier verifier = new Verifier(cfs, sstable, false, Verifier.options().checkVersion(true).build())) + { + verifier.verify(); + if (!sstable.descriptor.version.isLatestVersion()) + fail("Verify should throw RuntimeException for old sstables "+sstable); + } + catch (RuntimeException e) + {} + } + // make sure we don't throw any exception if not checking version: + for (SSTableReader sstable : cfs.getLiveSSTables()) + { + try (Verifier verifier = new Verifier(cfs, sstable, false, Verifier.options().checkVersion(false).build())) + { + verifier.verify(); + } + catch (Throwable e) { - verifier.verify(true); + fail("Verify should throw RuntimeException for old sstables "+sstable); } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org