Repository: cassandra Updated Branches: refs/heads/trunk 76ef78b7d -> 3b6c93828
Cleanups and improvements to nodetool import Patch by marcuse; reviewed by Jordan West for CASSANDRA-14417 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3b6c9382 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3b6c9382 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3b6c9382 Branch: refs/heads/trunk Commit: 3b6c93828c2d90b7bdadb4ff199dd70660e73188 Parents: 76ef78b Author: Marcus Eriksson <marc...@apache.org> Authored: Tue Apr 24 14:02:17 2018 +0200 Committer: Marcus Eriksson <marc...@apache.org> Committed: Fri May 18 11:15:40 2018 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/db/ColumnFamilyStore.java | 180 ++++++++++++++++--- .../cassandra/db/ColumnFamilyStoreMBean.java | 13 +- .../cassandra/db/compaction/Verifier.java | 85 ++++++++- .../cassandra/service/StorageService.java | 11 +- .../cassandra/service/StorageServiceMBean.java | 15 +- .../org/apache/cassandra/tools/NodeProbe.java | 4 +- .../apache/cassandra/tools/nodetool/Import.java | 8 +- .../org/apache/cassandra/db/ImportTest.java | 76 ++++++-- .../org/apache/cassandra/db/VerifyTest.java | 98 ++++++++++ 10 files changed, 421 insertions(+), 70 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b6c9382/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index a36a990..5657567 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0 + * nodetool import cleanup and improvements (CASSANDRA-14417) * Bump jackson version to >= 2.9.5 (CASSANDRA-14427) * Allow nodetool toppartitions without specifying table (CASSANDRA-14360) * Audit logging for database activity (CASSANDRA-12151) http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b6c9382/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 2bde9a8..122b783 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -30,6 +30,7 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Pattern; +import javax.annotation.Nullable; import javax.management.*; import javax.management.openmbean.*; @@ -70,6 +71,7 @@ import org.apache.cassandra.io.FSWriteError; import org.apache.cassandra.io.sstable.Component; import org.apache.cassandra.io.sstable.CorruptSSTableException; import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.KeyIterator; import org.apache.cassandra.io.sstable.SSTableMultiWriter; import org.apache.cassandra.io.sstable.format.*; import org.apache.cassandra.io.sstable.metadata.MetadataCollector; @@ -689,18 +691,18 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean * @param ksName The keyspace name * @param cfName The columnFamily name */ - public static void loadNewSSTables(String ksName, String cfName, String srcPath, boolean resetLevel, boolean clearRepaired, boolean verifySSTables, boolean verifyTokens, boolean invalidateCaches, boolean jbodCheck) + public static void loadNewSSTables(String ksName, String cfName) { /** ks/cf existence checks will be done by open and getCFS methods for us */ Keyspace keyspace = Keyspace.open(ksName); - keyspace.getColumnFamilyStore(cfName).loadNewSSTables(srcPath, resetLevel, clearRepaired, verifySSTables, verifyTokens, invalidateCaches, jbodCheck); + keyspace.getColumnFamilyStore(cfName).loadNewSSTables(); } - @Deprecated - public synchronized void loadNewSSTables() + public void loadNewSSTables() { - loadNewSSTables(null, true, false, false, false, false, false); + ImportOptions options = ImportOptions.options().resetLevel(true).build(); + importNewSSTables(options); } /** @@ -728,14 +730,12 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean long count = 0; int maxIndex = 0; long maxCount = 0; - try (RandomAccessReader primaryIndex = RandomAccessReader.open(new File(desc.filenameFor(Component.PRIMARY_INDEX)))) + + try (KeyIterator iter = new KeyIterator(desc, cfs.metadata())) { - long indexSize = primaryIndex.length(); - while (primaryIndex.getFilePointer() != indexSize) + while (iter.hasNext()) { - ByteBuffer key = ByteBufferUtil.readWithShortLength(primaryIndex); - RowIndexEntry.Serializer.skip(primaryIndex, desc.version); - DecoratedKey decoratedKey = cfs.metadata().partitioner.decorateKey(key); + DecoratedKey decoratedKey = iter.next(); if (clearCaches) cfs.invalidateCachedPartition(decoratedKey); if (shouldCountKeys) @@ -774,22 +774,37 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean /** * #{@inheritDoc} */ - public synchronized void loadNewSSTables(String srcPath, boolean resetLevel, boolean clearRepaired, boolean verifySSTables, boolean verifyTokens, boolean invalidateCaches, boolean jbodCheck) + public void importNewSSTables(String srcPath, boolean resetLevel, boolean clearRepaired, boolean verifySSTables, boolean verifyTokens, boolean invalidateCaches, boolean jbodCheck, boolean extendedVerify) + { + ImportOptions options = ImportOptions.options(srcPath) + .resetLevel(resetLevel) + .clearRepaired(clearRepaired) + .verifySSTables(verifySSTables) + .verifyTokens(verifyTokens) + .invalidateCaches(invalidateCaches) + .jbodCheck(jbodCheck) + .extendedVerify(extendedVerify).build(); + + this.importNewSSTables(options); + } + + @VisibleForTesting + synchronized void importNewSSTables(ImportOptions options) { - logger.info("Loading new SSTables for {}/{} from {}... (resetLevel = {}, clearRepaired = {}, verifySSTables = {}, verifyTokens = {}, invalidateCaches = {}, jbodCheck = {})", - keyspace.getName(), name, srcPath, resetLevel, clearRepaired, verifySSTables, verifyTokens, invalidateCaches, jbodCheck); + logger.info("Loading new SSTables for {}/{}: {}", + keyspace.getName(), name, options); File dir = null; - if (srcPath != null && !srcPath.isEmpty()) + if (options.srcPath != null && !options.srcPath.isEmpty()) { - dir = new File(srcPath); + dir = new File(options.srcPath); if (!dir.exists()) { - throw new RuntimeException(String.format("Directory %s does not exist", srcPath)); + throw new RuntimeException(String.format("Directory %s does not exist", options.srcPath)); } - if (!Directories.verifyFullPermissions(dir, srcPath)) + if (!Directories.verifyFullPermissions(dir, options.srcPath)) { - throw new RuntimeException("Insufficient permissions on directory " + srcPath); + throw new RuntimeException("Insufficient permissions on directory " + options.srcPath); } } @@ -802,7 +817,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean directories.sstableLister(dir, Directories.OnTxnErr.IGNORE).skipTemporary(true); // verify first to avoid starting to copy sstables to the data directories and then have to abort. - if (verifySSTables || verifyTokens) + if (options.verifySSTables || options.verifyTokens) { for (Map.Entry<Descriptor, Set<Component>> entry : lister.list().entrySet()) { @@ -811,8 +826,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean try { reader = SSTableReader.open(descriptor, entry.getValue(), metadata); - Verifier.Options verifierOptions = Verifier.options().extendedVerification(verifyTokens) - .checkOwnsTokens(verifyTokens) + Verifier.Options verifierOptions = Verifier.options().extendedVerification(options.extendedVerify) + .checkOwnsTokens(options.verifyTokens) .invokeDiskFailurePolicy(false) .mutateRepairStatus(false).build(); try (Verifier verifier = new Verifier(this, reader, false, verifierOptions)) @@ -849,18 +864,18 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean { if (new File(descriptor.filenameFor(Component.STATS)).exists()) { - if (resetLevel) + if (options.resetLevel) { descriptor.getMetadataSerializer().mutateLevel(descriptor, 0); } - if (clearRepaired) + if (options.clearRepaired) { descriptor.getMetadataSerializer().mutateRepaired(descriptor, ActiveRepairService.UNREPAIRED_SSTABLE, null); } } - targetDirectory = findBestDiskAndInvalidateCaches(this, descriptor, srcPath, invalidateCaches, jbodCheck); + targetDirectory = findBestDiskAndInvalidateCaches(this, descriptor, options.srcPath, options.invalidateCaches, options.jbodCheck); logger.debug("{} will get copied to {}", descriptor, targetDirectory); } catch (IOException e) @@ -898,7 +913,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean for (SSTableReader sstable : newSSTables) sstable.selfRef().release(); // log which sstables we have copied so far, so that the operator can remove them - if (srcPath != null) + if (options.srcPath != null) logger.error("Aborting import of sstables. {} copied, {} was corrupt", newSSTables, newDescriptor); throw new RuntimeException(newDescriptor+" is corrupt, can't import", t); } @@ -2806,4 +2821,117 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean { return neverPurgeTombstones; } + + public static class ImportOptions + { + public final String srcPath; + public final boolean resetLevel; + public final boolean clearRepaired; + public final boolean verifySSTables; + public final boolean verifyTokens; + public final boolean invalidateCaches; + public final boolean jbodCheck; + public final boolean extendedVerify; + + public ImportOptions(String srcPath, boolean resetLevel, boolean clearRepaired, boolean verifySSTables, boolean verifyTokens, boolean invalidateCaches, boolean jbodCheck, boolean extendedVerify) + { + this.srcPath = srcPath; + this.resetLevel = resetLevel; + this.clearRepaired = clearRepaired; + this.verifySSTables = verifySSTables; + this.verifyTokens = verifyTokens; + this.invalidateCaches = invalidateCaches; + this.jbodCheck = jbodCheck; + this.extendedVerify = extendedVerify; + } + + public static Builder options(@Nullable String srcDir) + { + return new Builder(srcDir); + } + + public static Builder options() + { + return options(null); + } + + @Override + public String toString() + { + return "ImportOptions{" + + "srcPath='" + srcPath + '\'' + + ", resetLevel=" + resetLevel + + ", clearRepaired=" + clearRepaired + + ", verifySSTables=" + verifySSTables + + ", verifyTokens=" + verifyTokens + + ", invalidateCaches=" + invalidateCaches + + ", extendedVerify=" + extendedVerify + + '}'; + } + + static class Builder + { + private final String srcPath; + private boolean resetLevel = false; + private boolean clearRepaired = false; + private boolean verifySSTables = false; + private boolean verifyTokens = false; + private boolean invalidateCaches = false; + private boolean jbodCheck = false; + private boolean extendedVerify = false; + + private Builder(String srcPath) + { + this.srcPath = srcPath; + } + + public Builder resetLevel(boolean value) + { + resetLevel = value; + return this; + } + + public Builder clearRepaired(boolean value) + { + clearRepaired = value; + return this; + } + + public Builder verifySSTables(boolean value) + { + verifySSTables = value; + return this; + } + + public Builder verifyTokens(boolean value) + { + verifyTokens = value; + return this; + } + + public Builder invalidateCaches(boolean value) + { + invalidateCaches = value; + return this; + } + + public Builder jbodCheck(boolean value) + { + jbodCheck = value; + return this; + } + + public Builder extendedVerify(boolean value) + { + extendedVerify = value; + return this; + } + + public ImportOptions build() + { + return new ImportOptions(srcPath, resetLevel, clearRepaired, verifySSTables, verifyTokens, invalidateCaches, jbodCheck, extendedVerify); + } + } + } + } http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b6c9382/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java b/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java index 7f416bf..35557cb 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java @@ -142,7 +142,6 @@ public interface ColumnFamilyStoreMBean */ public List<String> getSSTablesForKey(String key, boolean hexFormat); - /** * Load new sstables from the given directory * @@ -153,8 +152,16 @@ public interface ColumnFamilyStoreMBean * @param verifyTokens if the tokens in the new sstables should be verified that they are owned by the current node * @param invalidateCaches if row cache should be invalidated for the keys in the new sstables * @param jbodCheck if the new sstables should be placed 'optimally' - count tokens and move the sstable to the directory where it has the most keys - */ - public void loadNewSSTables(String srcPath, boolean resetLevel, boolean clearRepaired, boolean verifySSTables, boolean verifyTokens, boolean invalidateCaches, boolean jbodCheck); + * @param extendedVerify if we should run an extended verify checking all values in the new sstables + */ + public void importNewSSTables(String srcPath, + boolean resetLevel, + boolean clearRepaired, + boolean verifySSTables, + boolean verifyTokens, + boolean invalidateCaches, + boolean jbodCheck, + boolean extendedVerify); @Deprecated public void loadNewSSTables(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b6c9382/src/java/org/apache/cassandra/db/compaction/Verifier.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/Verifier.java b/src/java/org/apache/cassandra/db/compaction/Verifier.java index d704c1f..a3321e3 100644 --- a/src/java/org/apache/cassandra/db/compaction/Verifier.java +++ b/src/java/org/apache/cassandra/db/compaction/Verifier.java @@ -17,6 +17,7 @@ */ package org.apache.cassandra.db.compaction; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Throwables; import org.apache.cassandra.db.*; @@ -27,6 +28,7 @@ import org.apache.cassandra.dht.Token; import org.apache.cassandra.io.sstable.Component; import org.apache.cassandra.io.sstable.CorruptSSTableException; import org.apache.cassandra.io.sstable.IndexSummary; +import org.apache.cassandra.io.sstable.KeyIterator; import org.apache.cassandra.io.sstable.SSTableIdentityIterator; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.sstable.metadata.MetadataComponent; @@ -194,8 +196,31 @@ public class Verifier implements Closeable FileUtils.closeQuietly(validator); } - if ( !extended ) + if (!extended) + { + if (options.checkOwnsTokens && !isOffline) + { + try (KeyIterator iter = new KeyIterator(sstable.descriptor, sstable.metadata())) + { + List<Range<Token>> ownedRanges = Range.normalize(StorageService.instance.getLocalAndPendingRanges(cfs.metadata.keyspace)); + if (ownedRanges.isEmpty()) + return; + RangeOwnHelper rangeOwnHelper = new RangeOwnHelper(ownedRanges); + while (iter.hasNext()) + { + DecoratedKey key = iter.next(); + rangeOwnHelper.check(key); + } + } + catch (Throwable t) + { + outputHandler.warn(t.getMessage()); + markAndThrow(); + } + } return; + } + outputHandler.output("Extended Verify requested, proceeding to inspect values"); @@ -210,7 +235,7 @@ public class Verifier implements Closeable } List<Range<Token>> ownedRanges = isOffline ? Collections.emptyList() : Range.normalize(StorageService.instance.getLocalAndPendingRanges(cfs.metadata().keyspace)); - int rangeIndex = -1; + RangeOwnHelper rangeOwnHelper = new RangeOwnHelper(ownedRanges); DecoratedKey prevKey = null; while (!dataFile.isEOF()) @@ -235,11 +260,14 @@ public class Verifier implements Closeable if (options.checkOwnsTokens && ownedRanges.size() > 0) { - while (rangeIndex == -1 || !ownedRanges.get(rangeIndex).contains(key.getToken())) + try { - rangeIndex++; - if (rangeIndex > ownedRanges.size() - 1) - throw new RuntimeException(String.format("Key %s in sstable %s not owned by local ranges %s", key, sstable, ownedRanges)); + rangeOwnHelper.check(key); + } + catch (Throwable t) + { + outputHandler.warn(String.format("Key %s in sstable %s not owned by local ranges %s", key, sstable, ownedRanges), t); + markAndThrow(); } } @@ -307,6 +335,51 @@ public class Verifier implements Closeable outputHandler.output("Verify of " + sstable + " succeeded. All " + goodRows + " rows read successfully"); } + /** + * Use the fact that check(..) is called with sorted tokens - we keep a pointer in to the normalized ranges + * and only bump the pointer if the key given is out of range. This is done to avoid calling .contains(..) many + * times for each key (with vnodes for example) + */ + @VisibleForTesting + public static class RangeOwnHelper + { + private final List<Range<Token>> normalizedRanges; + private int rangeIndex = 0; + private DecoratedKey lastKey; + + public RangeOwnHelper(List<Range<Token>> normalizedRanges) + { + this.normalizedRanges = normalizedRanges; + } + + /** + * check if the given key is contained in any of the given ranges + * + * Must be called in sorted order - key should be increasing + * + * @param key the key + * @throws RuntimeException if the key is not contained + */ + public void check(DecoratedKey key) + { + assert lastKey == null || key.compareTo(lastKey) > 0; + lastKey = key; + + if (normalizedRanges.isEmpty()) // handle tests etc where we don't have any ranges + return; + + if (rangeIndex > normalizedRanges.size() - 1) + throw new IllegalStateException("RangeOwnHelper can only be used to find the first out-of-range-token"); + + while (!normalizedRanges.get(rangeIndex).contains(key.getToken())) + { + rangeIndex++; + if (rangeIndex > normalizedRanges.size() - 1) + throw new RuntimeException("Key "+key+" is not contained in the given ranges"); + } + } + } + private void deserializeIndex(SSTableReader sstable) throws IOException { try (RandomAccessReader primaryIndex = RandomAccessReader.open(new File(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX)))) http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b6c9382/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index a62af6f..8570f10 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -5241,20 +5241,15 @@ public class StorageService extends NotificationBroadcasterSupport implements IE LifecycleTransaction.rescheduleFailedDeletions(); } - @Deprecated - public void loadNewSSTables(String ksName, String cfName) - { - ColumnFamilyStore.loadNewSSTables(ksName, cfName, null, true, false, false, false, false, false); - } - /** * #{@inheritDoc} */ - public void importNewSSTables(String ksName, String cfName, String srcPath, boolean resetLevel, boolean clearRepaired, boolean verifySSTables, boolean verifyTokens, boolean invalidateCaches, boolean jbodCheck) + @Deprecated + public void loadNewSSTables(String ksName, String cfName) { if (!isInitialized()) throw new RuntimeException("Not yet initialized, can't load new sstables"); - ColumnFamilyStore.loadNewSSTables(ksName, cfName, srcPath, resetLevel, clearRepaired, verifySSTables, verifyTokens, invalidateCaches, jbodCheck); + ColumnFamilyStore.loadNewSSTables(ksName, cfName); } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b6c9382/src/java/org/apache/cassandra/service/StorageServiceMBean.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java index 1282105..ab165b3 100644 --- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java +++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java @@ -31,6 +31,7 @@ import javax.management.openmbean.CompositeData; import javax.management.openmbean.OpenDataException; import javax.management.openmbean.TabularData; +import org.apache.cassandra.db.ColumnFamilyStoreMBean; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.metrics.TableMetrics.Sampler; @@ -588,20 +589,16 @@ public interface StorageServiceMBean extends NotificationEmitter public void rescheduleFailedDeletions(); - @Deprecated - public void loadNewSSTables(String ksName, String tableName); - /** - * Import new SSTables to the given keyspace/table + * Load new SSTables to the given keyspace/table * * @param ksName The parent keyspace name * @param tableName The ColumnFamily name where SSTables belong - * @param srcPath The path where the SSTables will be loaded from - * @param resetLevel reset the level to 0 on the new sstables - * @param clearRepaired remove any repaired information from the new sstables - * @param verifyTokens verify that all tokens are owned by the node + * + * @see ColumnFamilyStoreMBean#loadNewSSTables() */ - public void importNewSSTables(String ksName, String tableName, String srcPath, boolean resetLevel, boolean clearRepaired, boolean verifySSTables, boolean verifyTokens, boolean invalidateCaches, boolean jbodCheck); + @Deprecated + public void loadNewSSTables(String ksName, String tableName); /** * Return a List of Tokens representing a sample of keys across all ColumnFamilyStores. http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b6c9382/src/java/org/apache/cassandra/tools/NodeProbe.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java index 7cec99d..bd294fd 100644 --- a/src/java/org/apache/cassandra/tools/NodeProbe.java +++ b/src/java/org/apache/cassandra/tools/NodeProbe.java @@ -1182,9 +1182,9 @@ public class NodeProbe implements AutoCloseable ssProxy.loadNewSSTables(ksName, cfName); } - public void importNewSSTables(String ksName, String cfName, String srcPath, boolean resetLevel, boolean clearRepaired, boolean verifySSTables, boolean verifyTokens, boolean invalidateCaches, boolean jbodCheck) + public void importNewSSTables(String ksName, String cfName, String srcPath, boolean resetLevel, boolean clearRepaired, boolean verifySSTables, boolean verifyTokens, boolean invalidateCaches, boolean jbodCheck, boolean extendedVerify) { - ssProxy.importNewSSTables(ksName, cfName, srcPath, resetLevel, clearRepaired, verifySSTables, verifyTokens, invalidateCaches, jbodCheck); + getCfsProxy(ksName, cfName).importNewSSTables(srcPath, resetLevel, clearRepaired, verifySSTables, verifyTokens, invalidateCaches, jbodCheck, extendedVerify); } public void rebuildIndex(String ksName, String cfName, String... idxNames) http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b6c9382/src/java/org/apache/cassandra/tools/nodetool/Import.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/nodetool/Import.java b/src/java/org/apache/cassandra/tools/nodetool/Import.java index 1a6a69b..7531eb0 100644 --- a/src/java/org/apache/cassandra/tools/nodetool/Import.java +++ b/src/java/org/apache/cassandra/tools/nodetool/Import.java @@ -70,6 +70,11 @@ public class Import extends NodeToolCmd description = "Do a quick import without verifying sstables, clearing row cache or checking in which data directory to put the file") private boolean quick = false; + @Option(title = "extended_verify", + name = {"-e", "--extended-verify"}, + description = "Run an extended verify, verifying all values in the new sstables") + private boolean extendedVerify = false; + @Override public void execute(NodeProbe probe) { @@ -88,7 +93,8 @@ public class Import extends NodeToolCmd noInvalidateCaches = true; noVerify = true; noJBODCheck = true; + extendedVerify = false; } - probe.importNewSSTables(args.get(0), args.get(1), args.get(2), !keepLevel, !keepRepaired, !noVerify, !noVerifyTokens, !noInvalidateCaches, !noJBODCheck); + probe.importNewSSTables(args.get(0), args.get(1), args.get(2), !keepLevel, !keepRepaired, !noVerify, !noVerifyTokens, !noInvalidateCaches, !noJBODCheck, extendedVerify); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b6c9382/test/unit/org/apache/cassandra/db/ImportTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/ImportTest.java b/test/unit/org/apache/cassandra/db/ImportTest.java index 197d79d..c7fc14e 100644 --- a/test/unit/org/apache/cassandra/db/ImportTest.java +++ b/test/unit/org/apache/cassandra/db/ImportTest.java @@ -58,7 +58,6 @@ import static org.junit.Assert.fail; public class ImportTest extends CQLTester { - @Test public void basicImportTest() throws Throwable { @@ -73,7 +72,8 @@ public class ImportTest extends CQLTester assertEquals(0, execute("select * from %s").size()); - getCurrentColumnFamilyStore().loadNewSSTables(backupdir.toString(), false, false, false, false, false, false); + ColumnFamilyStore.ImportOptions options = ColumnFamilyStore.ImportOptions.options(backupdir.toString()).build(); + getCurrentColumnFamilyStore().importNewSSTables(options); assertEquals(10, execute("select * from %s").size()); } @@ -102,21 +102,25 @@ public class ImportTest extends CQLTester Set<SSTableReader> sstables = getCurrentColumnFamilyStore().getLiveSSTables(); getCurrentColumnFamilyStore().clearUnsafe(); for (SSTableReader sstable : sstables) - sstable.descriptor.getMetadataSerializer().mutateLevel(sstable.descriptor, 123); + sstable.descriptor.getMetadataSerializer().mutateLevel(sstable.descriptor, 8); File backupdir = moveToBackupDir(sstables); assertEquals(0, execute("select * from %s").size()); - getCurrentColumnFamilyStore().loadNewSSTables(backupdir.toString(), false, false, false, false, false, false); + ColumnFamilyStore.ImportOptions options = ColumnFamilyStore.ImportOptions.options(backupdir.toString()).build(); + getCurrentColumnFamilyStore().importNewSSTables(options); assertEquals(10, execute("select * from %s").size()); sstables = getCurrentColumnFamilyStore().getLiveSSTables(); assertEquals(1, sstables.size()); for (SSTableReader sstable : sstables) - assertEquals(123, sstable.getSSTableLevel()); + assertEquals(8, sstable.getSSTableLevel()); getCurrentColumnFamilyStore().clearUnsafe(); backupdir = moveToBackupDir(sstables); - getCurrentColumnFamilyStore().loadNewSSTables(backupdir.toString(), true, false, false, false, false, false); + + options = ColumnFamilyStore.ImportOptions.options(backupdir.toString()).resetLevel(true).build(); + getCurrentColumnFamilyStore().importNewSSTables(options); + sstables = getCurrentColumnFamilyStore().getLiveSSTables(); assertEquals(1, sstables.size()); for (SSTableReader sstable : getCurrentColumnFamilyStore().getLiveSSTables()) @@ -140,7 +144,8 @@ public class ImportTest extends CQLTester assertEquals(0, execute("select * from %s").size()); - getCurrentColumnFamilyStore().loadNewSSTables(backupdir.toString(), false, false, false, false, false, false); + ColumnFamilyStore.ImportOptions options = ColumnFamilyStore.ImportOptions.options(backupdir.toString()).build(); + getCurrentColumnFamilyStore().importNewSSTables(options); assertEquals(10, execute("select * from %s").size()); sstables = getCurrentColumnFamilyStore().getLiveSSTables(); @@ -150,7 +155,9 @@ public class ImportTest extends CQLTester getCurrentColumnFamilyStore().clearUnsafe(); backupdir = moveToBackupDir(sstables); - getCurrentColumnFamilyStore().loadNewSSTables(backupdir.toString(), false, true, false, false, false, false); + + options = ColumnFamilyStore.ImportOptions.options(backupdir.toString()).clearRepaired(true).build(); + getCurrentColumnFamilyStore().importNewSSTables(options); sstables = getCurrentColumnFamilyStore().getLiveSSTables(); assertEquals(1, sstables.size()); for (SSTableReader sstable : getCurrentColumnFamilyStore().getLiveSSTables()) @@ -238,7 +245,7 @@ public class ImportTest extends CQLTester File dir = moveToBackupDir(toMove); MockCFS mock = new MockCFS(getCurrentColumnFamilyStore(), dirs); - mock.loadNewSSTables(dir.toString(), false, false, false, false, false, false); + mock.importNewSSTables(ColumnFamilyStore.ImportOptions.options(dir.toString()).build()); assertEquals(1, mock.getLiveSSTables().size()); for (SSTableReader sstable : mock.getLiveSSTables()) { @@ -272,8 +279,9 @@ public class ImportTest extends CQLTester File backupdir = moveToBackupDir(sstables); try { - getCurrentColumnFamilyStore().loadNewSSTables(backupdir.toString(), false, false, true, false, false, false); - fail("loadNewSSTables should fail!"); + ColumnFamilyStore.ImportOptions options = ColumnFamilyStore.ImportOptions.options(backupdir.toString()).verifySSTables(true).build(); + getCurrentColumnFamilyStore().importNewSSTables(options); + fail("importNewSSTables should fail!"); } catch (Throwable t) { @@ -307,7 +315,41 @@ public class ImportTest extends CQLTester File backupdir = moveToBackupDir(sstables); try { - getCurrentColumnFamilyStore().loadNewSSTables(backupdir.toString(), false, false, true, true, false, false); + ColumnFamilyStore.ImportOptions options = ColumnFamilyStore.ImportOptions.options(backupdir.toString()).verifySSTables(true).verifyTokens(true).build(); + getCurrentColumnFamilyStore().importNewSSTables(options); + } + finally + { + tmd.clearUnsafe(); + } + } + + @Test(expected = RuntimeException.class) + public void testImportOutOfRangeExtendedVerify() throws Throwable + { + createTable("create table %s (id int primary key, d int)"); + for (int i = 0; i < 1000; i++) + execute("insert into %s (id, d) values (?, ?)", i, i); + getCurrentColumnFamilyStore().forceBlockingFlush(); + Set<SSTableReader> sstables = getCurrentColumnFamilyStore().getLiveSSTables(); + + getCurrentColumnFamilyStore().clearUnsafe(); + + TokenMetadata tmd = StorageService.instance.getTokenMetadata(); + + tmd.updateNormalTokens(BootStrapper.getRandomTokens(tmd, 5), InetAddressAndPort.getByName("127.0.0.1")); + tmd.updateNormalTokens(BootStrapper.getRandomTokens(tmd, 5), InetAddressAndPort.getByName("127.0.0.2")); + tmd.updateNormalTokens(BootStrapper.getRandomTokens(tmd, 5), InetAddressAndPort.getByName("127.0.0.3")); + + + File backupdir = moveToBackupDir(sstables); + try + { + ColumnFamilyStore.ImportOptions options = ColumnFamilyStore.ImportOptions.options(backupdir.toString()) + .verifySSTables(true) + .verifyTokens(true) + .extendedVerify(true).build(); + getCurrentColumnFamilyStore().importNewSSTables(options); } finally { @@ -361,13 +403,16 @@ public class ImportTest extends CQLTester File backupdir = moveToBackupDir(Collections.singleton(sstableToImport)); // make sure we don't wipe caches with invalidateCaches = false: Set<SSTableReader> beforeFirstImport = getCurrentColumnFamilyStore().getLiveSSTables(); - getCurrentColumnFamilyStore().loadNewSSTables(backupdir.toString(), false, false, true, true, false, false); + + ColumnFamilyStore.ImportOptions options = ColumnFamilyStore.ImportOptions.options(backupdir.toString()).verifySSTables(true).verifyTokens(true).build(); + getCurrentColumnFamilyStore().importNewSSTables(options); assertEquals(20, CacheService.instance.rowCache.size()); Set<SSTableReader> toMove = Sets.difference(getCurrentColumnFamilyStore().getLiveSSTables(), beforeFirstImport); getCurrentColumnFamilyStore().clearUnsafe(); // move away the sstable we just imported again: backupdir = moveToBackupDir(toMove); - getCurrentColumnFamilyStore().loadNewSSTables(backupdir.toString(), false, false, true, true, true, false); + options = ColumnFamilyStore.ImportOptions.options(backupdir.toString()).verifySSTables(true).verifyTokens(true).invalidateCaches(true).build(); + getCurrentColumnFamilyStore().importNewSSTables(options); assertEquals(10, CacheService.instance.rowCache.size()); it = CacheService.instance.rowCache.keyIterator(); while (it.hasNext()) @@ -388,7 +433,8 @@ public class ImportTest extends CQLTester getCurrentColumnFamilyStore().forceBlockingFlush(); CacheService.instance.setRowCacheCapacityInMB(1); getCurrentColumnFamilyStore().clearUnsafe(); - getCurrentColumnFamilyStore().loadNewSSTables(null, false, false, false, false, true, false); + ColumnFamilyStore.ImportOptions options = ColumnFamilyStore.ImportOptions.options(null).invalidateCaches(true).build(); + getCurrentColumnFamilyStore().importNewSSTables(options); assertEquals(1, getCurrentColumnFamilyStore().getLiveSSTables().size()); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b6c9382/test/unit/org/apache/cassandra/db/VerifyTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/VerifyTest.java b/test/unit/org/apache/cassandra/db/VerifyTest.java index 42c4fd3..c9dbe14 100644 --- a/test/unit/org/apache/cassandra/db/VerifyTest.java +++ b/test/unit/org/apache/cassandra/db/VerifyTest.java @@ -29,6 +29,9 @@ import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.compaction.Verifier; import org.apache.cassandra.db.marshal.UUIDType; import org.apache.cassandra.dht.ByteOrderedPartitioner; +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.exceptions.WriteTimeoutException; import org.apache.cassandra.io.FSWriteError; @@ -51,7 +54,9 @@ import org.junit.runner.RunWith; import java.io.*; import java.net.UnknownHostException; import java.nio.file.Files; +import java.util.ArrayList; import java.util.Collections; +import java.util.List; import java.util.concurrent.ExecutionException; import java.util.zip.CRC32; import java.util.zip.CheckedInputStream; @@ -593,6 +598,99 @@ public class VerifyTest catch (CorruptSSTableException err) {} } + @Test + public void testRangeOwnHelper() + { + List<Range<Token>> normalized = new ArrayList<>(); + normalized.add(r(Long.MIN_VALUE, Long.MIN_VALUE + 1)); + normalized.add(r(Long.MIN_VALUE + 5, Long.MIN_VALUE + 6)); + normalized.add(r(Long.MIN_VALUE + 10, Long.MIN_VALUE + 11)); + normalized.add(r(0,10)); + normalized.add(r(10,11)); + normalized.add(r(20,25)); + normalized.add(r(26,200)); + + Verifier.RangeOwnHelper roh = new Verifier.RangeOwnHelper(normalized); + + roh.check(dk(1)); + roh.check(dk(10)); + roh.check(dk(11)); + roh.check(dk(21)); + roh.check(dk(25)); + boolean gotException = false; + try + { + roh.check(dk(26)); + } + catch (Throwable t) + { + gotException = true; + } + assertTrue(gotException); + } + + @Test(expected = AssertionError.class) + public void testRangeOwnHelperBadToken() + { + List<Range<Token>> normalized = new ArrayList<>(); + normalized.add(r(0,10)); + Verifier.RangeOwnHelper roh = new Verifier.RangeOwnHelper(normalized); + roh.check(dk(1)); + // call with smaller token to get exception + roh.check(dk(0)); + } + + + @Test + public void testRangeOwnHelperNormalize() + { + List<Range<Token>> normalized = Range.normalize(Collections.singletonList(r(0,0))); + Verifier.RangeOwnHelper roh = new Verifier.RangeOwnHelper(normalized); + roh.check(dk(Long.MIN_VALUE)); + roh.check(dk(0)); + roh.check(dk(Long.MAX_VALUE)); + } + + @Test + public void testRangeOwnHelperNormalizeWrap() + { + List<Range<Token>> normalized = Range.normalize(Collections.singletonList(r(Long.MAX_VALUE - 1000,Long.MIN_VALUE + 1000))); + Verifier.RangeOwnHelper roh = new Verifier.RangeOwnHelper(normalized); + roh.check(dk(Long.MIN_VALUE)); + roh.check(dk(Long.MAX_VALUE)); + boolean gotException = false; + try + { + roh.check(dk(26)); + } + catch (Throwable t) + { + gotException = true; + } + assertTrue(gotException); + } + + @Test + public void testEmptyRanges() + { + new Verifier.RangeOwnHelper(Collections.emptyList()).check(dk(1)); + } + + private DecoratedKey dk(long l) + { + return new BufferDecoratedKey(t(l), ByteBufferUtil.EMPTY_BYTE_BUFFER); + } + + private Range<Token> r(long s, long e) + { + return new Range<>(t(s), t(e)); + } + + private Token t(long t) + { + return new Murmur3Partitioner.LongToken(t); + } + protected void fillCF(ColumnFamilyStore cfs, int partitionsPerSSTable) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org