Repository: cassandra
Updated Branches:
  refs/heads/trunk 81ac654ff -> 65440409b


Add a few options to nodetool verify

Patch by marcuse; reviewed by Ariel Weisberg for CASSANDRA-14201


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/65440409
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/65440409
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/65440409

Branch: refs/heads/trunk
Commit: 65440409b4d22e7bdae99ca8232fb5eaf38f4449
Parents: 81ac654
Author: Marcus Eriksson <marc...@apache.org>
Authored: Tue Feb 6 09:58:46 2018 +0100
Committer: Marcus Eriksson <marc...@apache.org>
Committed: Thu Feb 15 09:24:20 2018 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/db/ColumnFamilyStore.java  |   4 +-
 .../db/compaction/CompactionManager.java        |  10 +-
 .../cassandra/db/compaction/Verifier.java       | 215 +++++++++++++++-
 .../cassandra/service/StorageService.java       |  26 +-
 .../cassandra/service/StorageServiceMBean.java  |   1 +
 .../org/apache/cassandra/tools/NodeProbe.java   |   8 +-
 .../cassandra/tools/StandaloneVerifier.java     |  25 +-
 .../apache/cassandra/tools/nodetool/Verify.java |  37 ++-
 .../org/apache/cassandra/db/VerifyTest.java     | 242 ++++++++++++++++---
 .../cassandra/io/sstable/LegacySSTableTest.java |  27 ++-
 11 files changed, 535 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/65440409/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index dd56770..5fcb335 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Add a few options to nodetool verify (CASSANDRA-14201)
  * CVE-2017-5929 Security vulnerability and redefine default log rotation 
policy (CASSANDRA-14183)
  * Use JVM default SSL validation algorithm instead of custom default 
(CASSANDRA-13259)
  * Better document in code InetAddressAndPort usage post 7544, incorporate 
port into UUIDGen node (CASSANDRA-14226)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/65440409/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java 
b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 9815eea..444a28c 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -1527,9 +1527,9 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
         return true;
     }
 
-    public CompactionManager.AllSSTableOpStatus verify(boolean extendedVerify) 
throws ExecutionException, InterruptedException
+    public CompactionManager.AllSSTableOpStatus verify(Verifier.Options 
options) throws ExecutionException, InterruptedException
     {
-        return 
CompactionManager.instance.performVerify(ColumnFamilyStore.this, 
extendedVerify);
+        return 
CompactionManager.instance.performVerify(ColumnFamilyStore.this, options);
     }
 
     public CompactionManager.AllSSTableOpStatus sstablesRewrite(boolean 
excludeCurrentVersion, int jobs) throws ExecutionException, InterruptedException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/65440409/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java 
b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 01fc188..36cd954 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -406,7 +406,7 @@ public class CompactionManager implements 
CompactionManagerMBean
         }, jobs, OperationType.SCRUB);
     }
 
-    public AllSSTableOpStatus performVerify(final ColumnFamilyStore cfs, final 
boolean extendedVerify) throws InterruptedException, ExecutionException
+    public AllSSTableOpStatus performVerify(ColumnFamilyStore cfs, 
Verifier.Options options) throws InterruptedException, ExecutionException
     {
         assert !cfs.isIndex();
         return parallelAllSSTableOperation(cfs, new OneSSTableOperation()
@@ -420,7 +420,7 @@ public class CompactionManager implements 
CompactionManagerMBean
             @Override
             public void execute(LifecycleTransaction input)
             {
-                verifyOne(cfs, input.onlyOne(), extendedVerify);
+                verifyOne(cfs, input.onlyOne(), options);
             }
         }, 0, OperationType.VERIFY);
     }
@@ -993,15 +993,15 @@ public class CompactionManager implements 
CompactionManagerMBean
         }
     }
 
-    private void verifyOne(ColumnFamilyStore cfs, SSTableReader sstable, 
boolean extendedVerify)
+    private void verifyOne(ColumnFamilyStore cfs, SSTableReader sstable, 
Verifier.Options options)
     {
         CompactionInfo.Holder verifyInfo = null;
 
-        try (Verifier verifier = new Verifier(cfs, sstable, false))
+        try (Verifier verifier = new Verifier(cfs, sstable, false, options))
         {
             verifyInfo = verifier.getVerifyInfo();
             metrics.beginCompaction(verifyInfo);
-            verifier.verify(extendedVerify);
+            verifier.verify();
         }
         finally
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/65440409/src/java/org/apache/cassandra/db/compaction/Verifier.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Verifier.java 
b/src/java/org/apache/cassandra/db/compaction/Verifier.java
index 01e465e..d704c1f 100644
--- a/src/java/org/apache/cassandra/db/compaction/Verifier.java
+++ b/src/java/org/apache/cassandra/db/compaction/Verifier.java
@@ -22,8 +22,11 @@ import com.google.common.base.Throwables;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.CorruptSSTableException;
+import org.apache.cassandra.io.sstable.IndexSummary;
 import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
@@ -33,17 +36,25 @@ import org.apache.cassandra.io.util.DataIntegrityMetadata;
 import org.apache.cassandra.io.util.DataIntegrityMetadata.FileDigestValidator;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.BloomFilterSerializer;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.IFilter;
 import org.apache.cassandra.utils.OutputHandler;
 import org.apache.cassandra.utils.UUIDGen;
 
+import java.io.BufferedInputStream;
 import java.io.Closeable;
+import java.io.DataInputStream;
 import java.io.File;
 import java.io.IOError;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.util.*;
 import java.util.function.LongPredicate;
 import java.util.function.Predicate;
@@ -60,18 +71,20 @@ public class Verifier implements Closeable
     private final RandomAccessReader indexFile;
     private final VerifyInfo verifyInfo;
     private final RowIndexEntry.IndexSerializer rowIndexEntrySerializer;
+    private final Options options;
+    private final boolean isOffline;
 
     private int goodRows;
 
     private final OutputHandler outputHandler;
     private FileDigestValidator validator;
 
-    public Verifier(ColumnFamilyStore cfs, SSTableReader sstable, boolean 
isOffline)
+    public Verifier(ColumnFamilyStore cfs, SSTableReader sstable, boolean 
isOffline, Options options)
     {
-        this(cfs, sstable, new OutputHandler.LogOutput(), isOffline);
+        this(cfs, sstable, new OutputHandler.LogOutput(), isOffline, options);
     }
 
-    public Verifier(ColumnFamilyStore cfs, SSTableReader sstable, 
OutputHandler outputHandler, boolean isOffline)
+    public Verifier(ColumnFamilyStore cfs, SSTableReader sstable, 
OutputHandler outputHandler, boolean isOffline, Options options)
     {
         this.cfs = cfs;
         this.sstable = sstable;
@@ -85,13 +98,24 @@ public class Verifier implements Closeable
                         : 
sstable.openDataReader(CompactionManager.instance.getRateLimiter());
         this.indexFile = RandomAccessReader.open(new 
File(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX)));
         this.verifyInfo = new VerifyInfo(dataFile, sstable);
+        this.options = options;
+        this.isOffline = isOffline;
     }
 
-    public void verify(boolean extended)
+    public void verify()
     {
+        boolean extended = options.extendedVerification;
         long rowStart = 0;
 
         outputHandler.output(String.format("Verifying %s (%s)", sstable, 
FBUtilities.prettyPrintMemory(dataFile.length())));
+        if (options.checkVersion && 
!sstable.descriptor.version.isLatestVersion())
+        {
+            String msg = String.format("%s is not the latest version, run 
upgradesstables", sstable);
+            outputHandler.output(msg);
+            // don't use markAndThrow here because we don't want a 
CorruptSSTableException for this.
+            throw new RuntimeException(msg);
+        }
+
         outputHandler.output(String.format("Deserializing sstable metadata for 
%s ", sstable));
         try
         {
@@ -108,6 +132,41 @@ public class Verifier implements Closeable
         }
         outputHandler.output(String.format("Checking computed hash of %s ", 
sstable));
 
+        try
+        {
+            deserializeIndex(sstable);
+        }
+        catch (Throwable t)
+        {
+            outputHandler.debug(t.getMessage());
+            markAndThrow();
+        }
+
+        try
+        {
+            deserializeIndexSummary(sstable);
+
+        }
+        catch (Throwable t)
+        {
+            outputHandler.output("Index summary is corrupt - if it is removed 
it will get rebuilt on startup 
"+sstable.descriptor.filenameFor(Component.SUMMARY));
+            outputHandler.debug(t.getMessage());
+            markAndThrow(false);
+        }
+
+        try
+        {
+            deserializeBloomFilter(sstable);
+
+        }
+        catch (Throwable t)
+        {
+            outputHandler.debug(t.getMessage());
+            markAndThrow();
+        }
+
+        if (options.quick)
+            return;
 
         // Verify will use the Digest files, which works for both compressed 
and uncompressed sstables
         try
@@ -150,6 +209,8 @@ public class Verifier implements Closeable
                     markAndThrow();
             }
 
+            List<Range<Token>> ownedRanges = isOffline ? 
Collections.emptyList() : 
Range.normalize(StorageService.instance.getLocalAndPendingRanges(cfs.metadata().keyspace));
+            int rangeIndex = -1;
             DecoratedKey prevKey = null;
 
             while (!dataFile.isEOF())
@@ -172,6 +233,16 @@ public class Verifier implements Closeable
                     // check for null key below
                 }
 
+                if (options.checkOwnsTokens && ownedRanges.size() > 0)
+                {
+                    while (rangeIndex == -1 || 
!ownedRanges.get(rangeIndex).contains(key.getToken()))
+                    {
+                        rangeIndex++;
+                        if (rangeIndex > ownedRanges.size() - 1)
+                            throw new RuntimeException(String.format("Key %s 
in sstable %s not owned by local ranges %s", key, sstable, ownedRanges));
+                    }
+                }
+
                 ByteBuffer currentIndexKey = nextIndexKey;
                 long nextRowPositionFromIndex = 0;
                 try
@@ -236,6 +307,44 @@ public class Verifier implements Closeable
         outputHandler.output("Verify of " + sstable + " succeeded. All " + 
goodRows + " rows read successfully");
     }
 
+    private void deserializeIndex(SSTableReader sstable) throws IOException
+    {
+        try (RandomAccessReader primaryIndex = RandomAccessReader.open(new 
File(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX))))
+        {
+            long indexSize = primaryIndex.length();
+
+            while ((primaryIndex.getFilePointer()) != indexSize)
+            {
+                ByteBuffer key = 
ByteBufferUtil.readWithShortLength(primaryIndex);
+                RowIndexEntry.Serializer.skip(primaryIndex, 
sstable.descriptor.version);
+            }
+        }
+    }
+
+    private void deserializeIndexSummary(SSTableReader sstable) throws 
IOException
+    {
+        File file = new 
File(sstable.descriptor.filenameFor(Component.SUMMARY));
+        TableMetadata metadata = cfs.metadata();
+        try (DataInputStream iStream = new 
DataInputStream(Files.newInputStream(file.toPath())))
+        {
+            try (IndexSummary indexSummary = 
IndexSummary.serializer.deserialize(iStream,
+                                                               
cfs.getPartitioner(),
+                                                               
metadata.params.minIndexInterval,
+                                                               
metadata.params.maxIndexInterval))
+            {
+                ByteBufferUtil.readWithLength(iStream);
+                ByteBufferUtil.readWithLength(iStream);
+            }
+        }
+    }
+
+    private void deserializeBloomFilter(SSTableReader sstable) throws 
IOException
+    {
+        try (DataInputStream stream = new DataInputStream(new 
BufferedInputStream(Files.newInputStream(Paths.get(sstable.descriptor.filenameFor(Component.FILTER)))));
+             IFilter bf = BloomFilterSerializer.deserialize(stream, 
sstable.descriptor.version.hasOldBfFormat()))
+        {}
+    }
+
     public void close()
     {
         FileUtils.closeQuietly(dataFile);
@@ -255,7 +364,7 @@ public class Verifier implements Closeable
 
     private void markAndThrow(boolean mutateRepaired)
     {
-        if (mutateRepaired) // if we are able to mutate repaired flag, an 
incremental repair should be enough
+        if (mutateRepaired && options.mutateRepairStatus) // if we are able to 
mutate repaired flag, an incremental repair should be enough
         {
             try
             {
@@ -268,7 +377,11 @@ public class Verifier implements Closeable
                 outputHandler.output("Error mutating repairedAt for SSTable " 
+  sstable.getFilename() + ", as part of markAndThrow");
             }
         }
-        throw new CorruptSSTableException(new Exception(String.format("Invalid 
SSTable %s, please force %srepair", sstable.getFilename(), mutateRepaired ? "" 
: "a full ")), sstable.getFilename());
+        Exception e = new Exception(String.format("Invalid SSTable %s, please 
force %srepair", sstable.getFilename(), (mutateRepaired && 
options.mutateRepairStatus) ? "" : "a full "));
+        if (options.invokeDiskFailurePolicy)
+            throw new CorruptSSTableException(e, sstable.getFilename());
+        else
+            throw new RuntimeException(e);
     }
 
     public CompactionInfo.Holder getVerifyInfo()
@@ -319,4 +432,94 @@ public class Verifier implements Closeable
             return time -> false;
         }
     }
+
+    public static Options.Builder options()
+    {
+        return new Options.Builder();
+    }
+
+    public static class Options
+    {
+        public final boolean invokeDiskFailurePolicy;
+        public final boolean extendedVerification;
+        public final boolean checkVersion;
+        public final boolean mutateRepairStatus;
+        public final boolean checkOwnsTokens;
+        public final boolean quick;
+
+        private Options(boolean invokeDiskFailurePolicy, boolean 
extendedVerification, boolean checkVersion, boolean mutateRepairStatus, boolean 
checkOwnsTokens, boolean quick)
+        {
+            this.invokeDiskFailurePolicy = invokeDiskFailurePolicy;
+            this.extendedVerification = extendedVerification;
+            this.checkVersion = checkVersion;
+            this.mutateRepairStatus = mutateRepairStatus;
+            this.checkOwnsTokens = checkOwnsTokens;
+            this.quick = quick;
+        }
+
+        @Override
+        public String toString()
+        {
+            return "Options{" +
+                   "invokeDiskFailurePolicy=" + invokeDiskFailurePolicy +
+                   ", extendedVerification=" + extendedVerification +
+                   ", checkVersion=" + checkVersion +
+                   ", mutateRepairStatus=" + mutateRepairStatus +
+                   ", checkOwnsTokens=" + checkOwnsTokens +
+                   ", quick=" + quick +
+                   '}';
+        }
+
+        public static class Builder
+        {
+            private boolean invokeDiskFailurePolicy = false; // invoking disk 
failure policy can stop the node if we find a corrupt sstable
+            private boolean extendedVerification = false;
+            private boolean checkVersion = false;
+            private boolean mutateRepairStatus = false; // mutating repair 
status can be dangerous
+            private boolean checkOwnsTokens = false;
+            private boolean quick = false;
+
+            public Builder invokeDiskFailurePolicy(boolean param)
+            {
+                this.invokeDiskFailurePolicy = param;
+                return this;
+            }
+
+            public Builder extendedVerification(boolean param)
+            {
+                this.extendedVerification = param;
+                return this;
+            }
+
+            public Builder checkVersion(boolean param)
+            {
+                this.checkVersion = param;
+                return this;
+            }
+
+            public Builder mutateRepairStatus(boolean param)
+            {
+                this.mutateRepairStatus = param;
+                return this;
+            }
+
+            public Builder checkOwnsTokens(boolean param)
+            {
+                this.checkOwnsTokens = param;
+                return this;
+            }
+
+            public Builder quick(boolean param)
+            {
+                this.quick = param;
+                return this;
+            }
+
+            public Options build()
+            {
+                return new Options(invokeDiskFailurePolicy, 
extendedVerification, checkVersion, mutateRepairStatus, checkOwnsTokens, quick);
+            }
+
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/65440409/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java 
b/src/java/org/apache/cassandra/service/StorageService.java
index e28f451..42e3ddf 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -65,6 +65,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.compaction.Verifier;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.dht.*;
 import org.apache.cassandra.dht.Range;
@@ -161,6 +162,16 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         return getRangesForEndpoint(keyspaceName, 
FBUtilities.getBroadcastAddressAndPort());
     }
 
+    public List<Range<Token>> getLocalAndPendingRanges(String ks)
+    {
+        InetAddressAndPort broadcastAddress = 
FBUtilities.getBroadcastAddressAndPort();
+        Keyspace keyspace = Keyspace.open(ks);
+        List<Range<Token>> ranges = new ArrayList<>();
+        
ranges.addAll(keyspace.getReplicationStrategy().getAddressRanges().get(broadcastAddress));
+        ranges.addAll(getTokenMetadata().getPendingRanges(ks, 
broadcastAddress));
+        return Range.normalize(ranges);
+    }
+
     public Collection<Range<Token>> getPrimaryRanges(String keyspace)
     {
         return getPrimaryRangesForEndpoint(keyspace, 
FBUtilities.getBroadcastAddressAndPort());
@@ -3135,12 +3146,25 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         return status.statusCode;
     }
 
+    @Deprecated
     public int verify(boolean extendedVerify, String keyspaceName, String... 
tableNames) throws IOException, ExecutionException, InterruptedException
     {
+        return verify(extendedVerify, false, false, false, false, false, 
keyspaceName, tableNames);
+    }
+
+    public int verify(boolean extendedVerify, boolean checkVersion, boolean 
diskFailurePolicy, boolean mutateRepairStatus, boolean checkOwnsTokens, boolean 
quick, String keyspaceName, String... tableNames) throws IOException, 
ExecutionException, InterruptedException
+    {
         CompactionManager.AllSSTableOpStatus status = 
CompactionManager.AllSSTableOpStatus.SUCCESSFUL;
+        Verifier.Options options = 
Verifier.options().invokeDiskFailurePolicy(diskFailurePolicy)
+                                                     
.extendedVerification(extendedVerify)
+                                                     
.checkVersion(checkVersion)
+                                                     
.mutateRepairStatus(mutateRepairStatus)
+                                                     
.checkOwnsTokens(checkOwnsTokens)
+                                                     .quick(quick).build();
+        logger.info("Verifying {}.{} with options = {}", keyspaceName, 
Arrays.toString(tableNames), options);
         for (ColumnFamilyStore cfStore : getValidColumnFamilies(false, false, 
keyspaceName, tableNames))
         {
-            CompactionManager.AllSSTableOpStatus oneStatus = 
cfStore.verify(extendedVerify);
+            CompactionManager.AllSSTableOpStatus oneStatus = 
cfStore.verify(options);
             if (oneStatus != CompactionManager.AllSSTableOpStatus.SUCCESSFUL)
                 status = oneStatus;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/65440409/src/java/org/apache/cassandra/service/StorageServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java 
b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 31dedf5..0676419 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -302,6 +302,7 @@ public interface StorageServiceMBean extends 
NotificationEmitter
      * The entire sstable will be read to ensure each cell validates if 
extendedVerify is true
      */
     public int verify(boolean extendedVerify, String keyspaceName, String... 
tableNames) throws IOException, ExecutionException, InterruptedException;
+    public int verify(boolean extendedVerify, boolean checkVersion, boolean 
diskFailurePolicy, boolean mutateRepairStatus, boolean checkOwnsTokens, boolean 
quick, String keyspaceName, String... tableNames) throws IOException, 
ExecutionException, InterruptedException;
 
     /**
      * Rewrite all sstables to the latest version.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/65440409/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java 
b/src/java/org/apache/cassandra/tools/NodeProbe.java
index c780a16..1bace2e 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -266,9 +266,9 @@ public class NodeProbe implements AutoCloseable
         return ssProxy.scrub(disableSnapshot, skipCorrupted, checkData, 
reinsertOverflowedTTL, jobs, keyspaceName, tables);
     }
 
-    public int verify(boolean extendedVerify, String keyspaceName, String... 
tableNames) throws IOException, ExecutionException, InterruptedException
+    public int verify(boolean extendedVerify, boolean checkVersion, boolean 
diskFailurePolicy, boolean mutateRepairStatus, boolean checkOwnsTokens, boolean 
quick, String keyspaceName, String... tableNames) throws IOException, 
ExecutionException, InterruptedException
     {
-        return ssProxy.verify(extendedVerify, keyspaceName, tableNames);
+        return ssProxy.verify(extendedVerify, checkVersion, diskFailurePolicy, 
mutateRepairStatus, checkOwnsTokens, quick, keyspaceName, tableNames);
     }
 
     public int upgradeSSTables(String keyspaceName, boolean 
excludeCurrentVersion, int jobs, String... tableNames) throws IOException, 
ExecutionException, InterruptedException
@@ -321,9 +321,9 @@ public class NodeProbe implements AutoCloseable
         }
     }
 
-    public void verify(PrintStream out, boolean extendedVerify, String 
keyspaceName, String... tableNames) throws IOException, ExecutionException, 
InterruptedException
+    public void verify(PrintStream out, boolean extendedVerify, boolean 
checkVersion, boolean diskFailurePolicy, boolean mutateRepairStatus, boolean 
checkOwnsTokens, boolean quick, String keyspaceName, String... tableNames) 
throws IOException, ExecutionException, InterruptedException
     {
-        switch (verify(extendedVerify, keyspaceName, tableNames))
+        switch (verify(extendedVerify, checkVersion, diskFailurePolicy, 
mutateRepairStatus, checkOwnsTokens, quick, keyspaceName, tableNames))
         {
             case 1:
                 failed = true;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/65440409/src/java/org/apache/cassandra/tools/StandaloneVerifier.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/StandaloneVerifier.java 
b/src/java/org/apache/cassandra/tools/StandaloneVerifier.java
index 40dfbf7..81e992e 100644
--- a/src/java/org/apache/cassandra/tools/StandaloneVerifier.java
+++ b/src/java/org/apache/cassandra/tools/StandaloneVerifier.java
@@ -43,6 +43,9 @@ public class StandaloneVerifier
     private static final String EXTENDED_OPTION = "extended";
     private static final String DEBUG_OPTION  = "debug";
     private static final String HELP_OPTION  = "help";
+    private static final String CHECK_VERSION = "check_version";
+    private static final String MUTATE_REPAIR_STATUS = "mutate_repair_status";
+    private static final String QUICK = "quick";
 
     public static void main(String args[])
     {
@@ -68,8 +71,6 @@ public class StandaloneVerifier
             OutputHandler handler = new 
OutputHandler.SystemOutput(options.verbose, options.debug);
             Directories.SSTableLister lister = 
cfs.getDirectories().sstableLister(Directories.OnTxnErr.THROW).skipTemporary(true);
 
-            boolean extended = options.extended;
-
             List<SSTableReader> sstables = new ArrayList<>();
 
             // Verify sstables
@@ -92,15 +93,20 @@ public class StandaloneVerifier
                         e.printStackTrace(System.err);
                 }
             }
-
+            Verifier.Options verifyOptions = 
Verifier.options().invokeDiskFailurePolicy(false)
+                                                               
.extendedVerification(options.extended)
+                                                               
.checkVersion(options.checkVersion)
+                                                               
.mutateRepairStatus(options.mutateRepairStatus)
+                                                               
.checkOwnsTokens(false) // don't know the ranges when running offline
+                                                               .build();
             for (SSTableReader sstable : sstables)
             {
                 try
                 {
 
-                    try (Verifier verifier = new Verifier(cfs, sstable, 
handler, true))
+                    try (Verifier verifier = new Verifier(cfs, sstable, 
handler, true, verifyOptions))
                     {
-                        verifier.verify(extended);
+                        verifier.verify();
                     }
                     catch (CorruptSSTableException cs)
                     {
@@ -136,6 +142,9 @@ public class StandaloneVerifier
         public boolean debug;
         public boolean verbose;
         public boolean extended;
+        public boolean checkVersion;
+        public boolean mutateRepairStatus;
+        public boolean quick;
 
         private Options(String keyspaceName, String cfName)
         {
@@ -174,6 +183,9 @@ public class StandaloneVerifier
                 opts.debug = cmd.hasOption(DEBUG_OPTION);
                 opts.verbose = cmd.hasOption(VERBOSE_OPTION);
                 opts.extended = cmd.hasOption(EXTENDED_OPTION);
+                opts.checkVersion = cmd.hasOption(CHECK_VERSION);
+                opts.mutateRepairStatus = cmd.hasOption(MUTATE_REPAIR_STATUS);
+                opts.quick = cmd.hasOption(QUICK);
 
                 return opts;
             }
@@ -198,6 +210,9 @@ public class StandaloneVerifier
             options.addOption("e",  EXTENDED_OPTION,       "extended 
verification");
             options.addOption("v",  VERBOSE_OPTION,        "verbose output");
             options.addOption("h",  HELP_OPTION,           "display this help 
message");
+            options.addOption("c",  CHECK_VERSION,         "make sure sstables 
are the latest version");
+            options.addOption("r",  MUTATE_REPAIR_STATUS,  "don't mutate 
repair status");
+            options.addOption("q",  QUICK,                 "do a quick check, 
don't read all data");
             return options;
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/65440409/src/java/org/apache/cassandra/tools/nodetool/Verify.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/nodetool/Verify.java 
b/src/java/org/apache/cassandra/tools/nodetool/Verify.java
index 75cb109..28b91fd 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/Verify.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/Verify.java
@@ -34,21 +34,52 @@ public class Verify extends NodeToolCmd
     private List<String> args = new ArrayList<>();
 
     @Option(title = "extended_verify",
-        name = {"-e", "--extended-verify"},
-        description = "Verify each cell data, beyond simply checking sstable 
checksums")
+            name = {"-e", "--extended-verify"},
+            description = "Verify each cell data, beyond simply checking 
sstable checksums")
     private boolean extendedVerify = false;
 
+    @Option(title = "check_version",
+            name = {"-c", "--check-version"},
+            description = "Also check that all sstables are the latest 
version")
+    private boolean checkVersion = false;
+
+    @Option(title = "dfp",
+            name = {"-d", "--dfp"},
+            description = "Invoke the disk failure policy if a corrupt sstable 
is found")
+    private boolean diskFailurePolicy = false;
+
+    @Option(title = "repair_status_change",
+            name = {"-r", "--rsc"},
+            description = "Mutate the repair status on corrupt sstables")
+    private boolean mutateRepairStatus = false;
+
+    @Option(title = "check_owns_tokens",
+            name = {"-t", "--check-tokens"},
+            description = "Verify that all tokens in sstables are owned by 
this node")
+    private boolean checkOwnsTokens = false;
+
+    @Option(title = "quick",
+    name = {"-q", "--quick"},
+    description = "Do a quick check - avoid reading all data to verify 
checksums")
+    private boolean quick = false;
+
     @Override
     public void execute(NodeProbe probe)
     {
         List<String> keyspaces = parseOptionalKeyspace(args, probe);
         String[] tableNames = parseOptionalTables(args);
 
+        if (checkOwnsTokens && !extendedVerify)
+        {
+            System.out.println("Token verification requires 
--extended-verify");
+            System.exit(1);
+        }
+
         for (String keyspace : keyspaces)
         {
             try
             {
-                probe.verify(System.out, extendedVerify, keyspace, tableNames);
+                probe.verify(System.out, extendedVerify, checkVersion, 
diskFailurePolicy, mutateRepairStatus, checkOwnsTokens, quick, keyspace, 
tableNames);
             } catch (Exception e)
             {
                 throw new RuntimeException("Error occurred during verifying", 
e);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/65440409/test/unit/org/apache/cassandra/db/VerifyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/VerifyTest.java 
b/test/unit/org/apache/cassandra/db/VerifyTest.java
index 8b9b437..e996082 100644
--- a/test/unit/org/apache/cassandra/db/VerifyTest.java
+++ b/test/unit/org/apache/cassandra/db/VerifyTest.java
@@ -28,15 +28,20 @@ import 
org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.compaction.Verifier;
 import org.apache.cassandra.db.marshal.UUIDType;
+import org.apache.cassandra.dht.ByteOrderedPartitioner;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.WriteTimeoutException;
 import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.compress.CorruptBlockException;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.CorruptSSTableException;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.schema.CompressionParams;
 import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.commons.lang3.StringUtils;
 import org.junit.BeforeClass;
@@ -44,6 +49,7 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 
 import java.io.*;
+import java.net.UnknownHostException;
 import java.nio.file.Files;
 import java.util.Collections;
 import java.util.concurrent.ExecutionException;
@@ -112,9 +118,9 @@ public class VerifyTest
 
         SSTableReader sstable = cfs.getLiveSSTables().iterator().next();
 
-        try (Verifier verifier = new Verifier(cfs, sstable, false))
+        try (Verifier verifier = new Verifier(cfs, sstable, false, 
Verifier.options().invokeDiskFailurePolicy(true).build()))
         {
-            verifier.verify(false);
+            verifier.verify();
         }
         catch (CorruptSSTableException err)
         {
@@ -132,10 +138,9 @@ public class VerifyTest
         fillCounterCF(cfs, 2);
 
         SSTableReader sstable = cfs.getLiveSSTables().iterator().next();
-
-        try (Verifier verifier = new Verifier(cfs, sstable, false))
+        try (Verifier verifier = new Verifier(cfs, sstable, false, 
Verifier.options().invokeDiskFailurePolicy(true).build()))
         {
-            verifier.verify(false);
+            verifier.verify();
         }
         catch (CorruptSSTableException err)
         {
@@ -153,10 +158,9 @@ public class VerifyTest
         fillCF(cfs, 2);
 
         SSTableReader sstable = cfs.getLiveSSTables().iterator().next();
-
-        try (Verifier verifier = new Verifier(cfs, sstable, false))
+        try (Verifier verifier = new Verifier(cfs, sstable, false, 
Verifier.options().invokeDiskFailurePolicy(true).build()))
         {
-            verifier.verify(true);
+            verifier.verify();
         }
         catch (CorruptSSTableException err)
         {
@@ -175,9 +179,9 @@ public class VerifyTest
 
         SSTableReader sstable = cfs.getLiveSSTables().iterator().next();
 
-        try (Verifier verifier = new Verifier(cfs, sstable, false))
+        try (Verifier verifier = new Verifier(cfs, sstable, false, 
Verifier.options().invokeDiskFailurePolicy(true).extendedVerification(true).build()))
         {
-            verifier.verify(true);
+            verifier.verify();
         }
         catch (CorruptSSTableException err)
         {
@@ -196,9 +200,9 @@ public class VerifyTest
 
         SSTableReader sstable = cfs.getLiveSSTables().iterator().next();
 
-        try (Verifier verifier = new Verifier(cfs, sstable, false))
+        try (Verifier verifier = new Verifier(cfs, sstable, false, 
Verifier.options().invokeDiskFailurePolicy(true).build()))
         {
-            verifier.verify(false);
+            verifier.verify();
         }
         catch (CorruptSSTableException err)
         {
@@ -217,9 +221,9 @@ public class VerifyTest
 
         SSTableReader sstable = cfs.getLiveSSTables().iterator().next();
 
-        try (Verifier verifier = new Verifier(cfs, sstable, false))
+        try (Verifier verifier = new Verifier(cfs, sstable, false, 
Verifier.options().invokeDiskFailurePolicy(true).build()))
         {
-            verifier.verify(false);
+            verifier.verify();
         }
         catch (CorruptSSTableException err)
         {
@@ -238,9 +242,9 @@ public class VerifyTest
 
         SSTableReader sstable = cfs.getLiveSSTables().iterator().next();
 
-        try (Verifier verifier = new Verifier(cfs, sstable, false))
+        try (Verifier verifier = new Verifier(cfs, sstable, false, 
Verifier.options().extendedVerification(true).invokeDiskFailurePolicy(true).build()))
         {
-            verifier.verify(true);
+            verifier.verify();
         }
         catch (CorruptSSTableException err)
         {
@@ -259,9 +263,9 @@ public class VerifyTest
 
         SSTableReader sstable = cfs.getLiveSSTables().iterator().next();
 
-        try (Verifier verifier = new Verifier(cfs, sstable, false))
+        try (Verifier verifier = new Verifier(cfs, sstable, false, 
Verifier.options().extendedVerification(true).invokeDiskFailurePolicy(true).build()))
         {
-            verifier.verify(true);
+            verifier.verify();
         }
         catch (CorruptSSTableException err)
         {
@@ -287,16 +291,23 @@ public class VerifyTest
         try (RandomAccessFile file = new 
RandomAccessFile(sstable.descriptor.filenameFor(Component.DIGEST), "rw"))
         {
             Long correctChecksum = Long.valueOf(file.readLine());
-    
+
             writeChecksum(++correctChecksum, 
sstable.descriptor.filenameFor(Component.DIGEST));
         }
 
-        try (Verifier verifier = new Verifier(cfs, sstable, false))
+        try (Verifier verifier = new Verifier(cfs, sstable, false, 
Verifier.options().invokeDiskFailurePolicy(true).build()))
         {
-            verifier.verify(false);
+            verifier.verify();
             fail("Expected a CorruptSSTableException to be thrown");
         }
         catch (CorruptSSTableException err) {}
+
+        try (Verifier verifier = new Verifier(cfs, sstable, false, 
Verifier.options().invokeDiskFailurePolicy(false).build()))
+        {
+            verifier.verify();
+            fail("Expected a RuntimeException to be thrown");
+        }
+        catch (RuntimeException err) {}
     }
 
 
@@ -329,22 +340,24 @@ public class VerifyTest
         // Update the Digest to have the right Checksum
         writeChecksum(simpleFullChecksum(sstable.getFilename()), 
sstable.descriptor.filenameFor(Component.DIGEST));
 
-        try (Verifier verifier = new Verifier(cfs, sstable, false))
+        try (Verifier verifier = new Verifier(cfs, sstable, false, 
Verifier.options().invokeDiskFailurePolicy(true).build()))
         {
             // First a simple verify checking digest, which should succeed
             try
             {
-                verifier.verify(false);
+                verifier.verify();
             }
             catch (CorruptSSTableException err)
             {
                 fail("Simple verify should have succeeded as digest matched");
             }
-
+        }
+        try (Verifier verifier = new Verifier(cfs, sstable, false, 
Verifier.options().invokeDiskFailurePolicy(true).extendedVerification(true).build()))
+        {
             // Now try extended verify
             try
             {
-                verifier.verify(true);
+                verifier.verify();
 
             }
             catch (CorruptSSTableException err)
@@ -355,13 +368,13 @@ public class VerifyTest
         }
     }
 
-    @Test(expected = CorruptSSTableException.class)
+    @Test
     public void testVerifyBrokenSSTableMetadata() throws IOException, 
WriteTimeoutException
     {
         CompactionManager.instance.disableAutoCompaction();
         Keyspace keyspace = Keyspace.open(KEYSPACE);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CORRUPT_CF2);
-
+        cfs.truncateBlocking();
         fillCF(cfs, 2);
 
         Util.getAll(Util.cmd(cfs).build());
@@ -373,11 +386,90 @@ public class VerifyTest
         file.seek(0);
         file.writeBytes(StringUtils.repeat('z', 2));
         file.close();
+        try (Verifier verifier = new Verifier(cfs, sstable, false, 
Verifier.options().invokeDiskFailurePolicy(true).build()))
+        {
+            verifier.verify();
+            fail("Expected a CorruptSSTableException to be thrown");
+        }
+        catch (CorruptSSTableException err)
+        {}
+        try (Verifier verifier = new Verifier(cfs, sstable, false, 
Verifier.options().invokeDiskFailurePolicy(false).build()))
+        {
+            verifier.verify();
+            fail("Expected a RuntimeException to be thrown");
+        }
+        catch (CorruptSSTableException err) { fail("wrong exception thrown"); }
+        catch (RuntimeException err)
+        {}
+    }
+
+    @Test
+    public void testVerifyMutateRepairStatus() throws IOException, 
WriteTimeoutException
+    {
+        CompactionManager.instance.disableAutoCompaction();
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CORRUPT_CF2);
+        cfs.truncateBlocking();
+        fillCF(cfs, 2);
+
+        Util.getAll(Util.cmd(cfs).build());
+
+        // make the sstable repaired:
+        SSTableReader sstable = cfs.getLiveSSTables().iterator().next();
+        
sstable.descriptor.getMetadataSerializer().mutateRepaired(sstable.descriptor, 
System.currentTimeMillis(), sstable.getSSTableMetadata().pendingRepair);
+        sstable.reloadSSTableMetadata();
 
-        try (Verifier verifier = new Verifier(cfs, sstable, false))
+        // break the sstable:
+        Long correctChecksum;
+        try (RandomAccessFile file = new 
RandomAccessFile(sstable.descriptor.filenameFor(Component.DIGEST), "rw"))
+        {
+            correctChecksum = Long.parseLong(file.readLine());
+        }
+        writeChecksum(++correctChecksum, 
sstable.descriptor.filenameFor(Component.DIGEST));
+        try (Verifier verifier = new Verifier(cfs, sstable, false, 
Verifier.options().mutateRepairStatus(false).invokeDiskFailurePolicy(true).build()))
+        {
+            verifier.verify();
+            fail("Expected a CorruptSSTableException to be thrown");
+        }
+        catch (CorruptSSTableException err)
+        {}
+
+        assertTrue(sstable.isRepaired());
+
+        // now the repair status should be changed:
+        try (Verifier verifier = new Verifier(cfs, sstable, false, 
Verifier.options().mutateRepairStatus(true).invokeDiskFailurePolicy(true).build()))
         {
-            verifier.verify(false);
+            verifier.verify();
+            fail("Expected a CorruptSSTableException to be thrown");
         }
+        catch (CorruptSSTableException err)
+        {}
+        assertFalse(sstable.isRepaired());
+    }
+
+    @Test(expected = RuntimeException.class)
+    public void testOutOfRangeTokens() throws IOException
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+        fillCF(cfs, 100);
+        TokenMetadata tmd = StorageService.instance.getTokenMetadata();
+        byte[] tk1 = new byte[1], tk2 = new byte[1];
+        tk1[0] = 2;
+        tk2[0] = 1;
+        tmd.updateNormalToken(new ByteOrderedPartitioner.BytesToken(tk1), 
InetAddressAndPort.getByName("127.0.0.1"));
+        tmd.updateNormalToken(new ByteOrderedPartitioner.BytesToken(tk2), 
InetAddressAndPort.getByName("127.0.0.2"));
+
+        SSTableReader sstable = cfs.getLiveSSTables().iterator().next();
+        try (Verifier verifier = new Verifier(cfs, sstable, false, 
Verifier.options().checkOwnsTokens(true).extendedVerification(true).build()))
+        {
+            verifier.verify();
+        }
+        finally
+        {
+            StorageService.instance.getTokenMetadata().clearUnsafe();
+        }
+
     }
 
     @Test
@@ -403,9 +495,9 @@ public class VerifyTest
             correctChecksum = Long.parseLong(file.readLine());
         }
         writeChecksum(++correctChecksum, 
sstable.descriptor.filenameFor(Component.DIGEST));
-        try (Verifier verifier = new Verifier(cfs, sstable, false))
+        try (Verifier verifier = new Verifier(cfs, sstable, false, 
Verifier.options().invokeDiskFailurePolicy(true).mutateRepairStatus(true).build()))
         {
-            verifier.verify(false);
+            verifier.verify();
             fail("should be corrupt");
         }
         catch (CorruptSSTableException e)
@@ -413,6 +505,94 @@ public class VerifyTest
         assertFalse(sstable.isRepaired());
     }
 
+    @Test
+    public void testVerifyIndex() throws IOException
+    {
+        testBrokenComponentHelper(Component.PRIMARY_INDEX);
+    }
+    @Test
+    public void testVerifyBf() throws IOException
+    {
+        testBrokenComponentHelper(Component.FILTER);
+    }
+
+    @Test
+    public void testVerifyIndexSummary() throws IOException
+    {
+        testBrokenComponentHelper(Component.SUMMARY);
+    }
+
+    private void testBrokenComponentHelper(Component componentToBreak) throws 
IOException
+    {
+        CompactionManager.instance.disableAutoCompaction();
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CORRUPT_CF2);
+
+        fillCF(cfs, 2);
+
+        SSTableReader sstable = cfs.getLiveSSTables().iterator().next();
+        try (Verifier verifier = new Verifier(cfs, sstable, false, 
Verifier.options().build()))
+        {
+            verifier.verify(); //still not corrupt, should pass
+        }
+        String filenameToCorrupt = 
sstable.descriptor.filenameFor(componentToBreak);
+        try (RandomAccessFile file = new RandomAccessFile(filenameToCorrupt, 
"rw"))
+        {
+            file.setLength(3);
+        }
+
+        try (Verifier verifier = new Verifier(cfs, sstable, false, 
Verifier.options().invokeDiskFailurePolicy(true).build()))
+        {
+            verifier.verify();
+            fail("should throw exception");
+        }
+        catch(CorruptSSTableException e)
+        {
+            //expected
+        }
+    }
+
+    @Test
+    public void testQuick() throws IOException
+    {
+        CompactionManager.instance.disableAutoCompaction();
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CORRUPT_CF);
+
+        fillCF(cfs, 2);
+
+        Util.getAll(Util.cmd(cfs).build());
+
+        SSTableReader sstable = cfs.getLiveSSTables().iterator().next();
+
+
+        try (RandomAccessFile file = new 
RandomAccessFile(sstable.descriptor.filenameFor(Component.DIGEST), "rw"))
+        {
+            Long correctChecksum = Long.valueOf(file.readLine());
+
+            writeChecksum(++correctChecksum, 
sstable.descriptor.filenameFor(Component.DIGEST));
+        }
+
+        try (Verifier verifier = new Verifier(cfs, sstable, false, 
Verifier.options().invokeDiskFailurePolicy(true).build()))
+        {
+            verifier.verify();
+            fail("Expected a CorruptSSTableException to be thrown");
+        }
+        catch (CorruptSSTableException err) {}
+
+        try (Verifier verifier = new Verifier(cfs, sstable, false, 
Verifier.options().invokeDiskFailurePolicy(true).quick(true).build())) // with 
quick = true we don't verify the digest
+        {
+            verifier.verify();
+        }
+
+        try (Verifier verifier = new Verifier(cfs, sstable, false, 
Verifier.options().invokeDiskFailurePolicy(true).build()))
+        {
+            verifier.verify();
+            fail("Expected a CorruptSSTableException to be thrown");
+        }
+        catch (CorruptSSTableException err) {}
+    }
+
 
     protected void fillCF(ColumnFamilyStore cfs, int partitionsPerSSTable)
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/65440409/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java 
b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
index 84f7158..e399c67 100644
--- a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
@@ -58,6 +58,8 @@ import org.apache.cassandra.streaming.StreamOperation;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 
+import static org.junit.Assert.fail;
+
 /**
  * Tests backwards compatibility for SSTables
  */
@@ -196,17 +198,34 @@ public class LegacySSTableTest
     }
 
     @Test
-    public void verifyOldSSTables() throws Exception
+    public void testVerifyOldSSTables() throws IOException
     {
         for (String legacyVersion : legacyVersions)
         {
-            loadLegacyTables(legacyVersion);
             ColumnFamilyStore cfs = 
Keyspace.open("legacy_tables").getColumnFamilyStore(String.format("legacy_%s_simple",
 legacyVersion));
+            loadLegacyTable("legacy_%s_simple", legacyVersion);
+
             for (SSTableReader sstable : cfs.getLiveSSTables())
             {
-                try (Verifier verifier = new Verifier(cfs, sstable, false))
+                try (Verifier verifier = new Verifier(cfs, sstable, false, 
Verifier.options().checkVersion(true).build()))
+                {
+                    verifier.verify();
+                    if (!sstable.descriptor.version.isLatestVersion())
+                        fail("Verify should throw RuntimeException for old 
sstables "+sstable);
+                }
+                catch (RuntimeException e)
+                {}
+            }
+            // make sure we don't throw any exception if not checking version:
+            for (SSTableReader sstable : cfs.getLiveSSTables())
+            {
+                try (Verifier verifier = new Verifier(cfs, sstable, false, 
Verifier.options().checkVersion(false).build()))
+                {
+                    verifier.verify();
+                }
+                catch (Throwable e)
                 {
-                    verifier.verify(true);
+                    fail("Verify should not throw when not checking the 
version of old sstables "+sstable);
                 }
             }
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to