Repository: cassandra
Updated Branches:
  refs/heads/trunk 76ef78b7d -> 3b6c93828


Cleanups and improvements to nodetool import

Patch by marcuse; reviewed by Jordan West for CASSANDRA-14417


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3b6c9382
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3b6c9382
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3b6c9382

Branch: refs/heads/trunk
Commit: 3b6c93828c2d90b7bdadb4ff199dd70660e73188
Parents: 76ef78b
Author: Marcus Eriksson <marc...@apache.org>
Authored: Tue Apr 24 14:02:17 2018 +0200
Committer: Marcus Eriksson <marc...@apache.org>
Committed: Fri May 18 11:15:40 2018 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/db/ColumnFamilyStore.java  | 180 ++++++++++++++++---
 .../cassandra/db/ColumnFamilyStoreMBean.java    |  13 +-
 .../cassandra/db/compaction/Verifier.java       |  85 ++++++++-
 .../cassandra/service/StorageService.java       |  11 +-
 .../cassandra/service/StorageServiceMBean.java  |  15 +-
 .../org/apache/cassandra/tools/NodeProbe.java   |   4 +-
 .../apache/cassandra/tools/nodetool/Import.java |   8 +-
 .../org/apache/cassandra/db/ImportTest.java     |  76 ++++++--
 .../org/apache/cassandra/db/VerifyTest.java     |  98 ++++++++++
 10 files changed, 421 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b6c9382/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a36a990..5657567 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * nodetool import cleanup and improvements (CASSANDRA-14417)
  * Bump jackson version to >= 2.9.5 (CASSANDRA-14427)
  * Allow nodetool toppartitions without specifying table (CASSANDRA-14360)
  * Audit logging for database activity (CASSANDRA-12151)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b6c9382/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java 
b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 2bde9a8..122b783 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -30,6 +30,7 @@ import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Pattern;
+import javax.annotation.Nullable;
 import javax.management.*;
 import javax.management.openmbean.*;
 
@@ -70,6 +71,7 @@ import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.CorruptSSTableException;
 import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.KeyIterator;
 import org.apache.cassandra.io.sstable.SSTableMultiWriter;
 import org.apache.cassandra.io.sstable.format.*;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
@@ -689,18 +691,18 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
      * @param ksName The keyspace name
      * @param cfName The columnFamily name
      */
-    public static void loadNewSSTables(String ksName, String cfName, String 
srcPath, boolean resetLevel, boolean clearRepaired, boolean verifySSTables, 
boolean verifyTokens, boolean invalidateCaches, boolean jbodCheck)
+    public static void loadNewSSTables(String ksName, String cfName)
     {
         /** ks/cf existence checks will be done by open and getCFS methods for 
us */
         Keyspace keyspace = Keyspace.open(ksName);
-        keyspace.getColumnFamilyStore(cfName).loadNewSSTables(srcPath, 
resetLevel, clearRepaired, verifySSTables, verifyTokens, invalidateCaches, 
jbodCheck);
+        keyspace.getColumnFamilyStore(cfName).loadNewSSTables();
     }
 
-
     @Deprecated
-    public synchronized void loadNewSSTables()
+    public void loadNewSSTables()
     {
-        loadNewSSTables(null, true, false, false, false, false, false);
+        ImportOptions options = 
ImportOptions.options().resetLevel(true).build();
+        importNewSSTables(options);
     }
 
     /**
@@ -728,14 +730,12 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
         long count = 0;
         int maxIndex = 0;
         long maxCount = 0;
-        try (RandomAccessReader primaryIndex = RandomAccessReader.open(new 
File(desc.filenameFor(Component.PRIMARY_INDEX))))
+
+        try (KeyIterator iter = new KeyIterator(desc, cfs.metadata()))
         {
-            long indexSize = primaryIndex.length();
-            while (primaryIndex.getFilePointer() != indexSize)
+            while (iter.hasNext())
             {
-                ByteBuffer key = 
ByteBufferUtil.readWithShortLength(primaryIndex);
-                RowIndexEntry.Serializer.skip(primaryIndex, desc.version);
-                DecoratedKey decoratedKey = 
cfs.metadata().partitioner.decorateKey(key);
+                DecoratedKey decoratedKey = iter.next();
                 if (clearCaches)
                     cfs.invalidateCachedPartition(decoratedKey);
                 if (shouldCountKeys)
@@ -774,22 +774,37 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
     /**
      * #{@inheritDoc}
      */
-    public synchronized void loadNewSSTables(String srcPath, boolean 
resetLevel, boolean clearRepaired, boolean verifySSTables, boolean 
verifyTokens, boolean invalidateCaches, boolean jbodCheck)
+    public void importNewSSTables(String srcPath, boolean resetLevel, boolean 
clearRepaired, boolean verifySSTables, boolean verifyTokens, boolean 
invalidateCaches, boolean jbodCheck, boolean extendedVerify)
+    {
+        ImportOptions options = ImportOptions.options(srcPath)
+                                             .resetLevel(resetLevel)
+                                             .clearRepaired(clearRepaired)
+                                             .verifySSTables(verifySSTables)
+                                             .verifyTokens(verifyTokens)
+                                             
.invalidateCaches(invalidateCaches)
+                                             .jbodCheck(jbodCheck)
+                                             
.extendedVerify(extendedVerify).build();
+
+        this.importNewSSTables(options);
+    }
+
+    @VisibleForTesting
+    synchronized void importNewSSTables(ImportOptions options)
     {
-        logger.info("Loading new SSTables for {}/{} from {}... (resetLevel = 
{}, clearRepaired = {}, verifySSTables = {}, verifyTokens = {}, 
invalidateCaches = {}, jbodCheck = {})",
-                    keyspace.getName(), name, srcPath, resetLevel, 
clearRepaired, verifySSTables, verifyTokens, invalidateCaches, jbodCheck);
+        logger.info("Loading new SSTables for {}/{}: {}",
+                    keyspace.getName(), name, options);
 
         File dir = null;
-        if (srcPath != null && !srcPath.isEmpty())
+        if (options.srcPath != null && !options.srcPath.isEmpty())
         {
-            dir = new File(srcPath);
+            dir = new File(options.srcPath);
             if (!dir.exists())
             {
-                throw new RuntimeException(String.format("Directory %s does 
not exist", srcPath));
+                throw new RuntimeException(String.format("Directory %s does 
not exist", options.srcPath));
             }
-            if (!Directories.verifyFullPermissions(dir, srcPath))
+            if (!Directories.verifyFullPermissions(dir, options.srcPath))
             {
-                throw new RuntimeException("Insufficient permissions on 
directory " + srcPath);
+                throw new RuntimeException("Insufficient permissions on 
directory " + options.srcPath);
             }
         }
 
@@ -802,7 +817,7 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
                 directories.sstableLister(dir, 
Directories.OnTxnErr.IGNORE).skipTemporary(true);
 
         // verify first to avoid starting to copy sstables to the data 
directories and then have to abort.
-        if (verifySSTables || verifyTokens)
+        if (options.verifySSTables || options.verifyTokens)
         {
             for (Map.Entry<Descriptor, Set<Component>> entry : 
lister.list().entrySet())
             {
@@ -811,8 +826,8 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
                 try
                 {
                     reader = SSTableReader.open(descriptor, entry.getValue(), 
metadata);
-                    Verifier.Options verifierOptions = 
Verifier.options().extendedVerification(verifyTokens)
-                                                                         
.checkOwnsTokens(verifyTokens)
+                    Verifier.Options verifierOptions = 
Verifier.options().extendedVerification(options.extendedVerify)
+                                                                         
.checkOwnsTokens(options.verifyTokens)
                                                                          
.invokeDiskFailurePolicy(false)
                                                                          
.mutateRepairStatus(false).build();
                     try (Verifier verifier = new Verifier(this, reader, false, 
verifierOptions))
@@ -849,18 +864,18 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
             {
                 if (new File(descriptor.filenameFor(Component.STATS)).exists())
                 {
-                    if (resetLevel)
+                    if (options.resetLevel)
                     {
                         
descriptor.getMetadataSerializer().mutateLevel(descriptor, 0);
                     }
-                    if (clearRepaired)
+                    if (options.clearRepaired)
                     {
                         
descriptor.getMetadataSerializer().mutateRepaired(descriptor,
                                                                           
ActiveRepairService.UNREPAIRED_SSTABLE,
                                                                           
null);
                     }
                 }
-                targetDirectory = findBestDiskAndInvalidateCaches(this, 
descriptor, srcPath, invalidateCaches, jbodCheck);
+                targetDirectory = findBestDiskAndInvalidateCaches(this, 
descriptor, options.srcPath, options.invalidateCaches, options.jbodCheck);
                 logger.debug("{} will get copied to {}", descriptor, 
targetDirectory);
             }
             catch (IOException e)
@@ -898,7 +913,7 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
                 for (SSTableReader sstable : newSSTables)
                     sstable.selfRef().release();
                 // log which sstables we have copied so far, so that the 
operator can remove them
-                if (srcPath != null)
+                if (options.srcPath != null)
                     logger.error("Aborting import of sstables. {} copied, {} 
was corrupt", newSSTables, newDescriptor);
                 throw new RuntimeException(newDescriptor+" is corrupt, can't 
import", t);
             }
@@ -2806,4 +2821,117 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
     {
         return neverPurgeTombstones;
     }
+
+    public static class ImportOptions
+    {
+        public final String srcPath;
+        public final boolean resetLevel;
+        public final boolean clearRepaired;
+        public final boolean verifySSTables;
+        public final boolean verifyTokens;
+        public final boolean invalidateCaches;
+        public final boolean jbodCheck;
+        public final boolean extendedVerify;
+
+        public ImportOptions(String srcPath, boolean resetLevel, boolean 
clearRepaired, boolean verifySSTables, boolean verifyTokens, boolean 
invalidateCaches, boolean jbodCheck, boolean extendedVerify)
+        {
+            this.srcPath = srcPath;
+            this.resetLevel = resetLevel;
+            this.clearRepaired = clearRepaired;
+            this.verifySSTables = verifySSTables;
+            this.verifyTokens = verifyTokens;
+            this.invalidateCaches = invalidateCaches;
+            this.jbodCheck = jbodCheck;
+            this.extendedVerify = extendedVerify;
+        }
+
+        public static Builder options(@Nullable String srcDir)
+        {
+            return new Builder(srcDir);
+        }
+
+        public static Builder options()
+        {
+            return options(null);
+        }
+
+        @Override
+        public String toString()
+        {
+            return "ImportOptions{" +
+                   "srcPath='" + srcPath + '\'' +
+                   ", resetLevel=" + resetLevel +
+                   ", clearRepaired=" + clearRepaired +
+                   ", verifySSTables=" + verifySSTables +
+                   ", verifyTokens=" + verifyTokens +
+                   ", invalidateCaches=" + invalidateCaches +
+                   ", extendedVerify=" + extendedVerify +
+                   '}';
+        }
+
+        static class Builder
+        {
+            private final String srcPath;
+            private boolean resetLevel = false;
+            private boolean clearRepaired = false;
+            private boolean verifySSTables = false;
+            private boolean verifyTokens = false;
+            private boolean invalidateCaches = false;
+            private boolean jbodCheck = false;
+            private boolean extendedVerify = false;
+
+            private Builder(String srcPath)
+            {
+                this.srcPath = srcPath;
+            }
+
+            public Builder resetLevel(boolean value)
+            {
+                resetLevel = value;
+                return this;
+            }
+
+            public Builder clearRepaired(boolean value)
+            {
+                clearRepaired = value;
+                return this;
+            }
+
+            public Builder verifySSTables(boolean value)
+            {
+                verifySSTables = value;
+                return this;
+            }
+
+            public Builder verifyTokens(boolean value)
+            {
+                verifyTokens = value;
+                return this;
+            }
+
+            public Builder invalidateCaches(boolean value)
+            {
+                invalidateCaches = value;
+                return this;
+            }
+
+            public Builder jbodCheck(boolean value)
+            {
+                jbodCheck = value;
+                return this;
+            }
+
+            public Builder extendedVerify(boolean value)
+            {
+                extendedVerify = value;
+                return this;
+            }
+
+            public ImportOptions build()
+            {
+                return new ImportOptions(srcPath, resetLevel, clearRepaired, 
verifySSTables, verifyTokens, invalidateCaches, jbodCheck, extendedVerify);
+            }
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b6c9382/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java 
b/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
index 7f416bf..35557cb 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
@@ -142,7 +142,6 @@ public interface ColumnFamilyStoreMBean
      */
     public List<String> getSSTablesForKey(String key, boolean hexFormat);
 
-
     /**
      * Load new sstables from the given directory
      *
@@ -153,8 +152,16 @@ public interface ColumnFamilyStoreMBean
      * @param verifyTokens if the tokens in the new sstables should be 
verified that they are owned by the current node
      * @param invalidateCaches if row cache should be invalidated for the keys 
in the new sstables
      * @param jbodCheck if the new sstables should be placed 'optimally' - 
count tokens and move the sstable to the directory where it has the most keys
-     */
-    public void loadNewSSTables(String srcPath, boolean resetLevel, boolean 
clearRepaired, boolean verifySSTables, boolean verifyTokens, boolean 
invalidateCaches, boolean jbodCheck);
+     * @param extendedVerify if we should run an extended verify checking all 
values in the new sstables
+     */
+    public void importNewSSTables(String srcPath,
+                                  boolean resetLevel,
+                                  boolean clearRepaired,
+                                  boolean verifySSTables,
+                                  boolean verifyTokens,
+                                  boolean invalidateCaches,
+                                  boolean jbodCheck,
+                                  boolean extendedVerify);
 
     @Deprecated
     public void loadNewSSTables();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b6c9382/src/java/org/apache/cassandra/db/compaction/Verifier.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Verifier.java 
b/src/java/org/apache/cassandra/db/compaction/Verifier.java
index d704c1f..a3321e3 100644
--- a/src/java/org/apache/cassandra/db/compaction/Verifier.java
+++ b/src/java/org/apache/cassandra/db/compaction/Verifier.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.db.compaction;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Throwables;
 
 import org.apache.cassandra.db.*;
@@ -27,6 +28,7 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.CorruptSSTableException;
 import org.apache.cassandra.io.sstable.IndexSummary;
+import org.apache.cassandra.io.sstable.KeyIterator;
 import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
@@ -194,8 +196,31 @@ public class Verifier implements Closeable
             FileUtils.closeQuietly(validator);
         }
 
-        if ( !extended )
+        if (!extended)
+        {
+            if (options.checkOwnsTokens && !isOffline)
+            {
+                try (KeyIterator iter = new KeyIterator(sstable.descriptor, 
sstable.metadata()))
+                {
+                    List<Range<Token>> ownedRanges = 
Range.normalize(StorageService.instance.getLocalAndPendingRanges(cfs.metadata.keyspace));
+                    if (ownedRanges.isEmpty())
+                        return;
+                    RangeOwnHelper rangeOwnHelper = new 
RangeOwnHelper(ownedRanges);
+                    while (iter.hasNext())
+                    {
+                        DecoratedKey key = iter.next();
+                        rangeOwnHelper.check(key);
+                    }
+                }
+                catch (Throwable t)
+                {
+                    outputHandler.warn(t.getMessage());
+                    markAndThrow();
+                }
+            }
             return;
+        }
+
 
         outputHandler.output("Extended Verify requested, proceeding to inspect 
values");
 
@@ -210,7 +235,7 @@ public class Verifier implements Closeable
             }
 
             List<Range<Token>> ownedRanges = isOffline ? 
Collections.emptyList() : 
Range.normalize(StorageService.instance.getLocalAndPendingRanges(cfs.metadata().keyspace));
-            int rangeIndex = -1;
+            RangeOwnHelper rangeOwnHelper = new RangeOwnHelper(ownedRanges);
             DecoratedKey prevKey = null;
 
             while (!dataFile.isEOF())
@@ -235,11 +260,14 @@ public class Verifier implements Closeable
 
                 if (options.checkOwnsTokens && ownedRanges.size() > 0)
                 {
-                    while (rangeIndex == -1 || 
!ownedRanges.get(rangeIndex).contains(key.getToken()))
+                    try
                     {
-                        rangeIndex++;
-                        if (rangeIndex > ownedRanges.size() - 1)
-                            throw new RuntimeException(String.format("Key %s 
in sstable %s not owned by local ranges %s", key, sstable, ownedRanges));
+                        rangeOwnHelper.check(key);
+                    }
+                    catch (Throwable t)
+                    {
+                        outputHandler.warn(String.format("Key %s in sstable %s 
not owned by local ranges %s", key, sstable, ownedRanges), t);
+                        markAndThrow();
                     }
                 }
 
@@ -307,6 +335,51 @@ public class Verifier implements Closeable
         outputHandler.output("Verify of " + sstable + " succeeded. All " + 
goodRows + " rows read successfully");
     }
 
+    /**
+     * Use the fact that check(..) is called with sorted tokens - we keep a 
pointer into the normalized ranges
+     * and only bump the pointer if the key given is out of range. This is 
done to avoid calling .contains(..) many
+     * times for each key (with vnodes for example)
+     */
+    @VisibleForTesting
+    public static class RangeOwnHelper
+    {
+        private final List<Range<Token>> normalizedRanges;
+        private int rangeIndex = 0;
+        private DecoratedKey lastKey;
+
+        public RangeOwnHelper(List<Range<Token>> normalizedRanges)
+        {
+            this.normalizedRanges = normalizedRanges;
+        }
+
+        /**
+         * check if the given key is contained in any of the given ranges
+         *
+         * Must be called in sorted order - key should be increasing
+         *
+         * @param key the key
+         * @throws RuntimeException if the key is not contained
+         */
+        public void check(DecoratedKey key)
+        {
+            assert lastKey == null || key.compareTo(lastKey) > 0;
+            lastKey = key;
+
+            if (normalizedRanges.isEmpty()) // handle tests etc where we don't 
have any ranges
+                return;
+
+            if (rangeIndex > normalizedRanges.size() - 1)
+                throw new IllegalStateException("RangeOwnHelper can only be 
used to find the first out-of-range-token");
+
+            while (!normalizedRanges.get(rangeIndex).contains(key.getToken()))
+            {
+                rangeIndex++;
+                if (rangeIndex > normalizedRanges.size() - 1)
+                    throw new RuntimeException("Key "+key+" is not contained 
in the given ranges");
+            }
+        }
+    }
+
     private void deserializeIndex(SSTableReader sstable) throws IOException
     {
         try (RandomAccessReader primaryIndex = RandomAccessReader.open(new 
File(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX))))

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b6c9382/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java 
b/src/java/org/apache/cassandra/service/StorageService.java
index a62af6f..8570f10 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -5241,20 +5241,15 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         LifecycleTransaction.rescheduleFailedDeletions();
     }
 
-    @Deprecated
-    public void loadNewSSTables(String ksName, String cfName)
-    {
-        ColumnFamilyStore.loadNewSSTables(ksName, cfName, null, true, false, 
false, false, false, false);
-    }
-
     /**
      * #{@inheritDoc}
      */
-    public void importNewSSTables(String ksName, String cfName, String 
srcPath, boolean resetLevel, boolean clearRepaired, boolean verifySSTables, 
boolean verifyTokens, boolean invalidateCaches, boolean jbodCheck)
+    @Deprecated
+    public void loadNewSSTables(String ksName, String cfName)
     {
         if (!isInitialized())
             throw new RuntimeException("Not yet initialized, can't load new 
sstables");
-        ColumnFamilyStore.loadNewSSTables(ksName, cfName, srcPath, resetLevel, 
clearRepaired, verifySSTables, verifyTokens, invalidateCaches, jbodCheck);
+        ColumnFamilyStore.loadNewSSTables(ksName, cfName);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b6c9382/src/java/org/apache/cassandra/service/StorageServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java 
b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 1282105..ab165b3 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -31,6 +31,7 @@ import javax.management.openmbean.CompositeData;
 import javax.management.openmbean.OpenDataException;
 import javax.management.openmbean.TabularData;
 
+import org.apache.cassandra.db.ColumnFamilyStoreMBean;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.metrics.TableMetrics.Sampler;
 
@@ -588,20 +589,16 @@ public interface StorageServiceMBean extends 
NotificationEmitter
 
     public void rescheduleFailedDeletions();
 
-    @Deprecated
-    public void loadNewSSTables(String ksName, String tableName);
-
     /**
-     * Import new SSTables to the given keyspace/table
+     * Load new SSTables to the given keyspace/table
      *
      * @param ksName The parent keyspace name
      * @param tableName The ColumnFamily name where SSTables belong
-     * @param srcPath The path where the SSTables will be loaded from
-     * @param resetLevel reset the level to 0 on the new sstables
-     * @param clearRepaired remove any repaired information from the new 
sstables
-     * @param verifyTokens verify that all tokens are owned by the node
+     *
+     * @see ColumnFamilyStoreMBean#loadNewSSTables()
      */
-    public void importNewSSTables(String ksName, String tableName, String 
srcPath, boolean resetLevel, boolean clearRepaired, boolean verifySSTables, 
boolean verifyTokens, boolean invalidateCaches, boolean jbodCheck);
+    @Deprecated
+    public void loadNewSSTables(String ksName, String tableName);
 
     /**
      * Return a List of Tokens representing a sample of keys across all 
ColumnFamilyStores.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b6c9382/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java 
b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 7cec99d..bd294fd 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -1182,9 +1182,9 @@ public class NodeProbe implements AutoCloseable
         ssProxy.loadNewSSTables(ksName, cfName);
     }
 
-    public void importNewSSTables(String ksName, String cfName, String 
srcPath, boolean resetLevel, boolean clearRepaired, boolean verifySSTables, 
boolean verifyTokens, boolean invalidateCaches, boolean jbodCheck)
+    public void importNewSSTables(String ksName, String cfName, String 
srcPath, boolean resetLevel, boolean clearRepaired, boolean verifySSTables, 
boolean verifyTokens, boolean invalidateCaches, boolean jbodCheck, boolean 
extendedVerify)
     {
-        ssProxy.importNewSSTables(ksName, cfName, srcPath, resetLevel, 
clearRepaired, verifySSTables, verifyTokens, invalidateCaches, jbodCheck);
+        getCfsProxy(ksName, cfName).importNewSSTables(srcPath, resetLevel, 
clearRepaired, verifySSTables, verifyTokens, invalidateCaches, jbodCheck, 
extendedVerify);
     }
 
     public void rebuildIndex(String ksName, String cfName, String... idxNames)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b6c9382/src/java/org/apache/cassandra/tools/nodetool/Import.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/nodetool/Import.java 
b/src/java/org/apache/cassandra/tools/nodetool/Import.java
index 1a6a69b..7531eb0 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/Import.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/Import.java
@@ -70,6 +70,11 @@ public class Import extends NodeToolCmd
             description = "Do a quick import without verifying sstables, 
clearing row cache or checking in which data directory to put the file")
     private boolean quick = false;
 
+    @Option(title = "extended_verify",
+            name = {"-e", "--extended-verify"},
+            description = "Run an extended verify, verifying all values in the 
new sstables")
+    private boolean extendedVerify = false;
+
     @Override
     public void execute(NodeProbe probe)
     {
@@ -88,7 +93,8 @@ public class Import extends NodeToolCmd
             noInvalidateCaches = true;
             noVerify = true;
             noJBODCheck = true;
+            extendedVerify = false;
         }
-        probe.importNewSSTables(args.get(0), args.get(1), args.get(2), 
!keepLevel, !keepRepaired, !noVerify, !noVerifyTokens, !noInvalidateCaches, 
!noJBODCheck);
+        probe.importNewSSTables(args.get(0), args.get(1), args.get(2), 
!keepLevel, !keepRepaired, !noVerify, !noVerifyTokens, !noInvalidateCaches, 
!noJBODCheck, extendedVerify);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b6c9382/test/unit/org/apache/cassandra/db/ImportTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ImportTest.java 
b/test/unit/org/apache/cassandra/db/ImportTest.java
index 197d79d..c7fc14e 100644
--- a/test/unit/org/apache/cassandra/db/ImportTest.java
+++ b/test/unit/org/apache/cassandra/db/ImportTest.java
@@ -58,7 +58,6 @@ import static org.junit.Assert.fail;
 
 public class ImportTest extends CQLTester
 {
-
     @Test
     public void basicImportTest() throws Throwable
     {
@@ -73,7 +72,8 @@ public class ImportTest extends CQLTester
 
         assertEquals(0, execute("select * from %s").size());
 
-        getCurrentColumnFamilyStore().loadNewSSTables(backupdir.toString(), 
false, false, false, false, false, false);
+        ColumnFamilyStore.ImportOptions options = 
ColumnFamilyStore.ImportOptions.options(backupdir.toString()).build();
+        getCurrentColumnFamilyStore().importNewSSTables(options);
 
         assertEquals(10, execute("select * from %s").size());
     }
@@ -102,21 +102,25 @@ public class ImportTest extends CQLTester
         Set<SSTableReader> sstables = getCurrentColumnFamilyStore().getLiveSSTables();
         getCurrentColumnFamilyStore().clearUnsafe();
         for (SSTableReader sstable : sstables)
-            sstable.descriptor.getMetadataSerializer().mutateLevel(sstable.descriptor, 123);
+            sstable.descriptor.getMetadataSerializer().mutateLevel(sstable.descriptor, 8);
         File backupdir = moveToBackupDir(sstables);
         assertEquals(0, execute("select * from %s").size());
 
-        getCurrentColumnFamilyStore().loadNewSSTables(backupdir.toString(), false, false, false, false, false, false);
+        ColumnFamilyStore.ImportOptions options = ColumnFamilyStore.ImportOptions.options(backupdir.toString()).build();
+        getCurrentColumnFamilyStore().importNewSSTables(options);
 
         assertEquals(10, execute("select * from %s").size());
         sstables = getCurrentColumnFamilyStore().getLiveSSTables();
         assertEquals(1, sstables.size());
         for (SSTableReader sstable : sstables)
-            assertEquals(123, sstable.getSSTableLevel());
+            assertEquals(8, sstable.getSSTableLevel());
 
         getCurrentColumnFamilyStore().clearUnsafe();
         backupdir = moveToBackupDir(sstables);
-        getCurrentColumnFamilyStore().loadNewSSTables(backupdir.toString(), true, false, false, false, false, false);
+
+        options = ColumnFamilyStore.ImportOptions.options(backupdir.toString()).resetLevel(true).build();
+        getCurrentColumnFamilyStore().importNewSSTables(options);
+
         sstables = getCurrentColumnFamilyStore().getLiveSSTables();
         assertEquals(1, sstables.size());
         for (SSTableReader sstable : getCurrentColumnFamilyStore().getLiveSSTables())
@@ -140,7 +144,8 @@ public class ImportTest extends CQLTester
 
         assertEquals(0, execute("select * from %s").size());
 
-        getCurrentColumnFamilyStore().loadNewSSTables(backupdir.toString(), false, false, false, false, false, false);
+        ColumnFamilyStore.ImportOptions options = ColumnFamilyStore.ImportOptions.options(backupdir.toString()).build();
+        getCurrentColumnFamilyStore().importNewSSTables(options);
 
         assertEquals(10, execute("select * from %s").size());
         sstables = getCurrentColumnFamilyStore().getLiveSSTables();
@@ -150,7 +155,9 @@ public class ImportTest extends CQLTester
 
         getCurrentColumnFamilyStore().clearUnsafe();
         backupdir = moveToBackupDir(sstables);
-        getCurrentColumnFamilyStore().loadNewSSTables(backupdir.toString(), false, true, false, false, false, false);
+
+        options = ColumnFamilyStore.ImportOptions.options(backupdir.toString()).clearRepaired(true).build();
+        getCurrentColumnFamilyStore().importNewSSTables(options);
         sstables = getCurrentColumnFamilyStore().getLiveSSTables();
         assertEquals(1, sstables.size());
         for (SSTableReader sstable : getCurrentColumnFamilyStore().getLiveSSTables())
@@ -238,7 +245,7 @@ public class ImportTest extends CQLTester
         File dir = moveToBackupDir(toMove);
 
         MockCFS mock = new MockCFS(getCurrentColumnFamilyStore(), dirs);
-        mock.loadNewSSTables(dir.toString(), false, false, false, false, false, false);
+        mock.importNewSSTables(ColumnFamilyStore.ImportOptions.options(dir.toString()).build());
         assertEquals(1, mock.getLiveSSTables().size());
         for (SSTableReader sstable : mock.getLiveSSTables())
         {
@@ -272,8 +279,9 @@ public class ImportTest extends CQLTester
         File backupdir = moveToBackupDir(sstables);
         try
         {
-            getCurrentColumnFamilyStore().loadNewSSTables(backupdir.toString(), false, false, true, false, false, false);
-            fail("loadNewSSTables should fail!");
+            ColumnFamilyStore.ImportOptions options = ColumnFamilyStore.ImportOptions.options(backupdir.toString()).verifySSTables(true).build();
+            getCurrentColumnFamilyStore().importNewSSTables(options);
+            fail("importNewSSTables should fail!");
         }
         catch (Throwable t)
         {
@@ -307,7 +315,41 @@ public class ImportTest extends CQLTester
         File backupdir = moveToBackupDir(sstables);
         try
         {
-            getCurrentColumnFamilyStore().loadNewSSTables(backupdir.toString(), false, false, true, true, false, false);
+            ColumnFamilyStore.ImportOptions options = ColumnFamilyStore.ImportOptions.options(backupdir.toString()).verifySSTables(true).verifyTokens(true).build();
+            getCurrentColumnFamilyStore().importNewSSTables(options);
+        }
+        finally
+        {
+            tmd.clearUnsafe();
+        }
+    }
+
+    @Test(expected = RuntimeException.class)
+    public void testImportOutOfRangeExtendedVerify() throws Throwable
+    {
+        createTable("create table %s (id int primary key, d int)");
+        for (int i = 0; i < 1000; i++)
+            execute("insert into %s (id, d) values (?, ?)", i, i);
+        getCurrentColumnFamilyStore().forceBlockingFlush();
+        Set<SSTableReader> sstables = getCurrentColumnFamilyStore().getLiveSSTables();
+
+        getCurrentColumnFamilyStore().clearUnsafe();
+
+        TokenMetadata tmd = StorageService.instance.getTokenMetadata();
+
+        tmd.updateNormalTokens(BootStrapper.getRandomTokens(tmd, 5), InetAddressAndPort.getByName("127.0.0.1"));
+        tmd.updateNormalTokens(BootStrapper.getRandomTokens(tmd, 5), InetAddressAndPort.getByName("127.0.0.2"));
+        tmd.updateNormalTokens(BootStrapper.getRandomTokens(tmd, 5), InetAddressAndPort.getByName("127.0.0.3"));
+
+
+        File backupdir = moveToBackupDir(sstables);
+        try
+        {
+            ColumnFamilyStore.ImportOptions options = ColumnFamilyStore.ImportOptions.options(backupdir.toString())
+                                                                                     .verifySSTables(true)
+                                                                                     .verifyTokens(true)
+                                                                                     .extendedVerify(true).build();
+            getCurrentColumnFamilyStore().importNewSSTables(options);
         }
         finally
         {
@@ -361,13 +403,16 @@ public class ImportTest extends CQLTester
         File backupdir = moveToBackupDir(Collections.singleton(sstableToImport));
         // make sure we don't wipe caches with invalidateCaches = false:
         Set<SSTableReader> beforeFirstImport = getCurrentColumnFamilyStore().getLiveSSTables();
-        getCurrentColumnFamilyStore().loadNewSSTables(backupdir.toString(), false, false, true, true, false, false);
+
+        ColumnFamilyStore.ImportOptions options = ColumnFamilyStore.ImportOptions.options(backupdir.toString()).verifySSTables(true).verifyTokens(true).build();
+        getCurrentColumnFamilyStore().importNewSSTables(options);
         assertEquals(20, CacheService.instance.rowCache.size());
         Set<SSTableReader> toMove = Sets.difference(getCurrentColumnFamilyStore().getLiveSSTables(), beforeFirstImport);
         getCurrentColumnFamilyStore().clearUnsafe();
         // move away the sstable we just imported again:
         backupdir = moveToBackupDir(toMove);
-        getCurrentColumnFamilyStore().loadNewSSTables(backupdir.toString(), false, false, true, true, true, false);
+        options = ColumnFamilyStore.ImportOptions.options(backupdir.toString()).verifySSTables(true).verifyTokens(true).invalidateCaches(true).build();
+        getCurrentColumnFamilyStore().importNewSSTables(options);
         assertEquals(10, CacheService.instance.rowCache.size());
         it = CacheService.instance.rowCache.keyIterator();
         while (it.hasNext())
@@ -388,7 +433,8 @@ public class ImportTest extends CQLTester
         getCurrentColumnFamilyStore().forceBlockingFlush();
         CacheService.instance.setRowCacheCapacityInMB(1);
         getCurrentColumnFamilyStore().clearUnsafe();
-        getCurrentColumnFamilyStore().loadNewSSTables(null, false, false, false, false, true, false);
+        ColumnFamilyStore.ImportOptions options = ColumnFamilyStore.ImportOptions.options(null).invalidateCaches(true).build();
+        getCurrentColumnFamilyStore().importNewSSTables(options);
         assertEquals(1, getCurrentColumnFamilyStore().getLiveSSTables().size());
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b6c9382/test/unit/org/apache/cassandra/db/VerifyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/VerifyTest.java b/test/unit/org/apache/cassandra/db/VerifyTest.java
index 42c4fd3..c9dbe14 100644
--- a/test/unit/org/apache/cassandra/db/VerifyTest.java
+++ b/test/unit/org/apache/cassandra/db/VerifyTest.java
@@ -29,6 +29,9 @@ import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.compaction.Verifier;
 import org.apache.cassandra.db.marshal.UUIDType;
 import org.apache.cassandra.dht.ByteOrderedPartitioner;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.WriteTimeoutException;
 import org.apache.cassandra.io.FSWriteError;
@@ -51,7 +54,9 @@ import org.junit.runner.RunWith;
 import java.io.*;
 import java.net.UnknownHostException;
 import java.nio.file.Files;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.ExecutionException;
 import java.util.zip.CRC32;
 import java.util.zip.CheckedInputStream;
@@ -593,6 +598,99 @@ public class VerifyTest
         catch (CorruptSSTableException err) {}
     }
 
+    @Test
+    public void testRangeOwnHelper()
+    {
+        List<Range<Token>> normalized = new ArrayList<>();
+        normalized.add(r(Long.MIN_VALUE, Long.MIN_VALUE + 1));
+        normalized.add(r(Long.MIN_VALUE + 5, Long.MIN_VALUE + 6));
+        normalized.add(r(Long.MIN_VALUE + 10, Long.MIN_VALUE + 11));
+        normalized.add(r(0,10));
+        normalized.add(r(10,11));
+        normalized.add(r(20,25));
+        normalized.add(r(26,200));
+
+        Verifier.RangeOwnHelper roh = new Verifier.RangeOwnHelper(normalized);
+
+        roh.check(dk(1));
+        roh.check(dk(10));
+        roh.check(dk(11));
+        roh.check(dk(21));
+        roh.check(dk(25));
+        boolean gotException = false;
+        try
+        {
+            roh.check(dk(26));
+        }
+        catch (Throwable t)
+        {
+            gotException = true;
+        }
+        assertTrue(gotException);
+    }
+
+    @Test(expected = AssertionError.class)
+    public void testRangeOwnHelperBadToken()
+    {
+        List<Range<Token>> normalized = new ArrayList<>();
+        normalized.add(r(0,10));
+        Verifier.RangeOwnHelper roh = new Verifier.RangeOwnHelper(normalized);
+        roh.check(dk(1));
+        // call with smaller token to get exception
+        roh.check(dk(0));
+    }
+
+
+    @Test
+    public void testRangeOwnHelperNormalize()
+    {
+        List<Range<Token>> normalized = Range.normalize(Collections.singletonList(r(0,0)));
+        Verifier.RangeOwnHelper roh = new Verifier.RangeOwnHelper(normalized);
+        roh.check(dk(Long.MIN_VALUE));
+        roh.check(dk(0));
+        roh.check(dk(Long.MAX_VALUE));
+    }
+
+    @Test
+    public void testRangeOwnHelperNormalizeWrap()
+    {
+        List<Range<Token>> normalized = Range.normalize(Collections.singletonList(r(Long.MAX_VALUE - 1000,Long.MIN_VALUE + 1000)));
+        Verifier.RangeOwnHelper roh = new Verifier.RangeOwnHelper(normalized);
+        roh.check(dk(Long.MIN_VALUE));
+        roh.check(dk(Long.MAX_VALUE));
+        boolean gotException = false;
+        try
+        {
+            roh.check(dk(26));
+        }
+        catch (Throwable t)
+        {
+            gotException = true;
+        }
+        assertTrue(gotException);
+    }
+
+    @Test
+    public void testEmptyRanges()
+    {
+        new Verifier.RangeOwnHelper(Collections.emptyList()).check(dk(1));
+    }
+
+    private DecoratedKey dk(long l)
+    {
+        return new BufferDecoratedKey(t(l), ByteBufferUtil.EMPTY_BYTE_BUFFER);
+    }
+
+    private Range<Token> r(long s, long e)
+    {
+        return new Range<>(t(s), t(e));
+    }
+
+    private Token t(long t)
+    {
+        return new Murmur3Partitioner.LongToken(t);
+    }
+
 
     protected void fillCF(ColumnFamilyStore cfs, int partitionsPerSSTable)
     {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to