Merge branch 'cassandra-2.1' into cassandra-2.2
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ff1a1562 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ff1a1562 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ff1a1562 Branch: refs/heads/trunk Commit: ff1a156290440934826cf2e6a7f080d160683a16 Parents: 678291a d693ca1 Author: Yuki Morishita <yu...@apache.org> Authored: Tue May 19 09:00:08 2015 -0500 Committer: Yuki Morishita <yu...@apache.org> Committed: Tue May 19 09:00:08 2015 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/db/ColumnFamilyStore.java | 8 ++-- .../db/compaction/CompactionManager.java | 8 ++-- .../cassandra/db/compaction/Scrubber.java | 8 ++-- .../cassandra/service/StorageService.java | 7 +++- .../cassandra/service/StorageServiceMBean.java | 2 + .../org/apache/cassandra/tools/NodeProbe.java | 8 ++-- .../cassandra/tools/StandaloneScrubber.java | 6 ++- .../apache/cassandra/tools/nodetool/Scrub.java | 9 ++++- .../unit/org/apache/cassandra/db/ScrubTest.java | 42 ++++++++++++++------ 10 files changed, 66 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/ff1a1562/CHANGES.txt ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/ff1a1562/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 062efe8,0951c01..738e9eb --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@@ -1387,64 -1399,12 +1387,64 @@@ public class ColumnFamilyStore implemen return CompactionManager.instance.performCleanup(ColumnFamilyStore.this); } - public CompactionManager.AllSSTableOpStatus scrub(boolean disableSnapshot, boolean skipCorrupted) throws ExecutionException, InterruptedException + public CompactionManager.AllSSTableOpStatus scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData) throws ExecutionException, InterruptedException { - return scrub(disableSnapshot, skipCorrupted, false); ++ return scrub(disableSnapshot, skipCorrupted, false, checkData); + } + + @VisibleForTesting - public CompactionManager.AllSSTableOpStatus scrub(boolean disableSnapshot, boolean skipCorrupted, boolean alwaysFail) throws ExecutionException, InterruptedException ++ public CompactionManager.AllSSTableOpStatus scrub(boolean disableSnapshot, boolean skipCorrupted, boolean alwaysFail, boolean checkData) throws ExecutionException, InterruptedException + { // skip snapshot creation during scrub, SEE JIRA 5891 if(!disableSnapshot) snapshotWithoutFlush("pre-scrub-" + System.currentTimeMillis()); - return CompactionManager.instance.performScrub(ColumnFamilyStore.this, skipCorrupted, checkData); + + try + { - return CompactionManager.instance.performScrub(ColumnFamilyStore.this, skipCorrupted); ++ return CompactionManager.instance.performScrub(ColumnFamilyStore.this, skipCorrupted, checkData); + } + catch(Throwable t) + { + if (!rebuildOnFailedScrub(t)) + throw t; + + return alwaysFail ? CompactionManager.AllSSTableOpStatus.ABORTED : CompactionManager.AllSSTableOpStatus.SUCCESSFUL; + } + } + + /** + * CASSANDRA-5174 : For an index cfs we may be able to discard everything and just rebuild + * the index when a scrub fails. + * + * @return true if we are an index cfs and we successfully rebuilt the index + */ + public boolean rebuildOnFailedScrub(Throwable failure) + { + if (!isIndex()) + return false; + + SecondaryIndex index = null; + if (metadata.cfName.contains(Directories.SECONDARY_INDEX_NAME_SEPARATOR)) + { + String[] parts = metadata.cfName.split("\\" + Directories.SECONDARY_INDEX_NAME_SEPARATOR, 2); + ColumnFamilyStore parentCfs = keyspace.getColumnFamilyStore(parts[0]); + index = parentCfs.indexManager.getIndexByName(metadata.cfName); + assert index != null; + } + + if (index == null) + return false; + + truncateBlocking(); + + logger.warn("Rebuilding index for {} because of <{}>", name, failure.getMessage()); + index.getBaseCfs().rebuildSecondaryIndex(index.getIndexName()); + return true; + } + + public CompactionManager.AllSSTableOpStatus verify(boolean extendedVerify) throws ExecutionException, InterruptedException + { + return CompactionManager.instance.performVerify(ColumnFamilyStore.this, extendedVerify); } public CompactionManager.AllSSTableOpStatus sstablesRewrite(boolean excludeCurrentVersion) throws ExecutionException, InterruptedException http://git-wip-us.apache.org/repos/asf/cassandra/blob/ff1a1562/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java index 5d5464c,47bd2d6..cda6915 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@@ -305,8 -308,9 +305,8 @@@ public class CompactionManager implemen } } - public AllSSTableOpStatus performScrub(final ColumnFamilyStore cfs, final boolean skipCorrupted) throws InterruptedException, ExecutionException + public AllSSTableOpStatus performScrub(final ColumnFamilyStore cfs, final boolean skipCorrupted, final boolean checkData) throws InterruptedException, ExecutionException { - assert !cfs.isIndex(); return parallelAllSSTableOperation(cfs, new OneSSTableOperation() { @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/ff1a1562/src/java/org/apache/cassandra/db/compaction/Scrubber.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/compaction/Scrubber.java index 29472b3,ec0532c..310d58a --- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java +++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java @@@ -105,8 -101,6 +105,8 @@@ public class Scrubber implements Closea ? new ScrubController(cfs) : new CompactionController(cfs, Collections.singleton(sstable), CompactionManager.getDefaultGcBefore(cfs)); this.isCommutative = cfs.metadata.isCounter(); + this.isIndex = cfs.isIndex(); - this.checkData = !this.isIndex; //LocalByPartitionerType does not support validation ++ this.checkData = checkData && !this.isIndex; //LocalByPartitionerType does not support validation this.expectedBloomFilterSize = Math.max(cfs.metadata.getMinIndexInterval(), (int)(SSTableReader.getApproximateKeyCount(toScrub))); // loop through each row, deserializing to check for damage. http://git-wip-us.apache.org/repos/asf/cassandra/blob/ff1a1562/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/StorageService.java index 4573449,7c8e424..bfbf1a8 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@@ -2474,10 -2301,15 +2474,15 @@@ public class StorageService extends Not public int scrub(boolean disableSnapshot, boolean skipCorrupted, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException { + return scrub(disableSnapshot, skipCorrupted, true, keyspaceName, columnFamilies); + } + + public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException + { CompactionManager.AllSSTableOpStatus status = CompactionManager.AllSSTableOpStatus.SUCCESSFUL; - for (ColumnFamilyStore cfStore : getValidColumnFamilies(false, false, keyspaceName, columnFamilies)) + for (ColumnFamilyStore cfStore : getValidColumnFamilies(true, false, keyspaceName, columnFamilies)) { - CompactionManager.AllSSTableOpStatus oneStatus = cfStore.scrub(disableSnapshot, skipCorrupted); + CompactionManager.AllSSTableOpStatus oneStatus = cfStore.scrub(disableSnapshot, skipCorrupted, checkData); if (oneStatus != CompactionManager.AllSSTableOpStatus.SUCCESSFUL) status = oneStatus; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/ff1a1562/src/java/org/apache/cassandra/service/StorageServiceMBean.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/StorageServiceMBean.java index 01588c6,1f86d82..2bbc999 --- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java +++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java @@@ -247,17 -256,11 +247,19 @@@ public interface StorageServiceMBean ex * * Scrubbed CFs will be snapshotted first, if disableSnapshot is false */ + @Deprecated public int scrub(boolean disableSnapshot, boolean skipCorrupted, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException; + public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException; /** + * Verify (checksums of) the given keyspace. + * If columnFamilies array is empty, all CFs are verified. + * + * The entire sstable will be read to ensure each cell validates if extendedVerify is true + */ + public int verify(boolean extendedVerify, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException; + + /** * Rewrite all sstables to the latest version. * Unlike scrub, it doesn't skip bad rows and do not snapshot sstables first. */ http://git-wip-us.apache.org/repos/asf/cassandra/blob/ff1a1562/src/java/org/apache/cassandra/tools/NodeProbe.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/tools/NodeProbe.java index 0edfded,6e7179a..1341c68 --- a/src/java/org/apache/cassandra/tools/NodeProbe.java +++ b/src/java/org/apache/cassandra/tools/NodeProbe.java @@@ -221,16 -228,11 +221,16 @@@ public class NodeProbe implements AutoC return ssProxy.forceKeyspaceCleanup(keyspaceName, columnFamilies); } - public int scrub(boolean disableSnapshot, boolean skipCorrupted, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException + public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException { - return ssProxy.scrub(disableSnapshot, skipCorrupted, keyspaceName, columnFamilies); + return ssProxy.scrub(disableSnapshot, skipCorrupted, checkData, keyspaceName, columnFamilies); } + public int verify(boolean extendedVerify, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException + { + return ssProxy.verify(extendedVerify, keyspaceName, columnFamilies); + } + public int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, String... columnFamilies) throws IOException, ExecutionException, InterruptedException { return ssProxy.upgradeSSTables(keyspaceName, excludeCurrentVersion, columnFamilies); @@@ -245,12 -247,12 +245,12 @@@ } } - public void scrub(PrintStream out, boolean disableSnapshot, boolean skipCorrupted, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException + public void scrub(PrintStream out, boolean disableSnapshot, boolean skipCorrupted, boolean checkData, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException { - if (scrub(disableSnapshot, skipCorrupted, keyspaceName, columnFamilies) != 0) + if (scrub(disableSnapshot, skipCorrupted, checkData, keyspaceName, columnFamilies) != 0) { failed = true; - out.println("Aborted scrubbing atleast one column family in keyspace "+keyspaceName+", check server logs for more information."); + out.println("Aborted scrubbing at least one table in keyspace "+keyspaceName+", check server logs for more information."); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/ff1a1562/src/java/org/apache/cassandra/tools/StandaloneScrubber.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/ff1a1562/src/java/org/apache/cassandra/tools/nodetool/Scrub.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/tools/nodetool/Scrub.java index 8064b8e,0000000..54f981e mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/tools/nodetool/Scrub.java +++ b/src/java/org/apache/cassandra/tools/nodetool/Scrub.java @@@ -1,66 -1,0 +1,71 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.tools.nodetool; + +import io.airlift.command.Arguments; +import io.airlift.command.Command; +import io.airlift.command.Option; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.cassandra.tools.NodeProbe; +import org.apache.cassandra.tools.NodeTool.NodeToolCmd; + +@Command(name = "scrub", description = "Scrub (rebuild sstables for) one or more tables") +public class Scrub extends NodeToolCmd +{ + @Arguments(usage = "[<keyspace> <tables>...]", description = "The keyspace followed by one or many tables") + private List<String> args = new ArrayList<>(); + + @Option(title = "disable_snapshot", + name = {"-ns", "--no-snapshot"}, + description = "Scrubbed CFs will be snapshotted first, if disableSnapshot is false. (default false)") + private boolean disableSnapshot = false; + + @Option(title = "skip_corrupted", + name = {"-s", "--skip-corrupted"}, + description = "Skip corrupted partitions even when scrubbing counter tables. (default false)") + private boolean skipCorrupted = false; + ++ @Option(title = "no_validate", ++ name = {"-n", "--no-validate"}, ++ description = "Do not validate columns using column validator") ++ private boolean noValidation = false; ++ + @Override + public void execute(NodeProbe probe) + { + List<String> keyspaces = parseOptionalKeyspace(args, probe); + String[] cfnames = parseOptionalColumnFamilies(args); + + for (String keyspace : keyspaces) + { + try + { - probe.scrub(System.out, disableSnapshot, skipCorrupted, keyspace, cfnames); ++ probe.scrub(System.out, disableSnapshot, skipCorrupted, !noValidation, keyspace, cfnames); + } catch (IllegalArgumentException e) + { + throw e; + } catch (Exception e) + { + throw new RuntimeException("Error occurred during scrubbing", e); + } + } + } - } ++} http://git-wip-us.apache.org/repos/asf/cassandra/blob/ff1a1562/test/unit/org/apache/cassandra/db/ScrubTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/db/ScrubTest.java index 09121f4,028cf6c..a5af823 --- a/test/unit/org/apache/cassandra/db/ScrubTest.java +++ b/test/unit/org/apache/cassandra/db/ScrubTest.java @@@ -21,13 -21,23 +21,19 @@@ package org.apache.cassandra.db */ import java.io.*; +import java.lang.reflect.Field; +import java.lang.reflect.Modifier; import java.nio.ByteBuffer; -import java.util.Collections; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Set; +import java.util.*; import java.util.concurrent.ExecutionException; + import org.apache.cassandra.cql3.QueryProcessor; + import org.apache.cassandra.db.composites.CellNameType; + import org.apache.cassandra.exceptions.ConfigurationException; -import org.apache.cassandra.db.marshal.CompositeType; + import org.apache.cassandra.db.marshal.LongType; + import org.apache.cassandra.db.marshal.UTF8Type; + import org.apache.cassandra.exceptions.RequestExecutionException; import org.apache.cassandra.io.compress.CompressionMetadata; -import org.apache.cassandra.utils.UUIDGen; import org.apache.commons.lang3.StringUtils; import org.junit.BeforeClass; import org.junit.Test; @@@ -149,19 -128,19 +155,19 @@@ public class ScrubTes overrideWithGarbage(sstable, ByteBufferUtil.bytes("0"), ByteBufferUtil.bytes("1")); // with skipCorrupted == false, the scrub is expected to fail - try(Scrubber scrubber = new Scrubber(cfs, sstable, false, false)) - Scrubber scrubber = new Scrubber(cfs, sstable, false, false, true); - try ++ try(Scrubber scrubber = new Scrubber(cfs, sstable, false, false, true)) { scrubber.scrub(); fail("Expected a CorruptSSTableException to be thrown"); } catch (IOError err) {} - // with skipCorrupted == true, the corrupt row will be skipped + // with skipCorrupted == true, the corrupt rows will be skipped Scrubber.ScrubResult scrubResult; - try(Scrubber scrubber = new Scrubber(cfs, sstable, true, false)) - scrubber = new Scrubber(cfs, sstable, true, false, true); - scrubResult = scrubber.scrubWithResult(); - scrubber.close(); ++ try(Scrubber scrubber = new Scrubber(cfs, sstable, true, false, true)) + { + scrubResult = scrubber.scrubWithResult(); + } assertNotNull(scrubResult); @@@ -363,12 -326,11 +369,12 @@@ components.add(Component.STATS); components.add(Component.SUMMARY); components.add(Component.TOC); - SSTableReader sstable = SSTableReader.openNoValidation(desc, components, metadata); - - Scrubber scrubber = new Scrubber(cfs, sstable, false, true, true); - scrubber.scrub(); + SSTableReader sstable = SSTableReader.openNoValidation(desc, components, cfs); - try(Scrubber scrubber = new Scrubber(cfs, sstable, false, true)) ++ try(Scrubber scrubber = new Scrubber(cfs, sstable, false, true, true)) + { + scrubber.scrub(); + } cfs.loadNewSSTables(); List<Row> rows = cfs.getRangeSlice(Util.range("", ""), null, new IdentityQueryFilter(), 1000); assert isRowOrdered(rows) : "Scrub failed: " + rows; @@@ -481,14 -419,24 +487,24 @@@ @Test public void testScrubColumnValidation() throws InterruptedException, RequestExecutionException, ExecutionException { - QueryProcessor.process("CREATE TABLE \"Keyspace1\".test_compact_static_columns (a bigint, b timeuuid, c boolean static, d text, PRIMARY KEY (a, b))", ConsistencyLevel.ONE); + QueryProcessor.process(String.format("CREATE TABLE \"%s\".test_compact_static_columns (a bigint, b timeuuid, c boolean static, d text, PRIMARY KEY (a, b))", KEYSPACE), ConsistencyLevel.ONE); - Keyspace keyspace = Keyspace.open("Keyspace1"); + Keyspace keyspace = Keyspace.open(KEYSPACE); ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("test_compact_static_columns"); - QueryProcessor.executeInternal("INSERT INTO \"Keyspace1\".test_compact_static_columns (a, b, c, d) VALUES (123, c3db07e8-b602-11e3-bc6b-e0b9a54a6d93, true, 'foobar')"); + QueryProcessor.executeInternal(String.format("INSERT INTO \"%s\".test_compact_static_columns (a, b, c, d) VALUES (123, c3db07e8-b602-11e3-bc6b-e0b9a54a6d93, true, 'foobar')", KEYSPACE)); cfs.forceBlockingFlush(); - CompactionManager.instance.performScrub(cfs, false); + CompactionManager.instance.performScrub(cfs, false, true); + + QueryProcessor.process("CREATE TABLE \"Keyspace1\".test_scrub_validation (a text primary key, b int)", ConsistencyLevel.ONE); + ColumnFamilyStore cfs2 = keyspace.getColumnFamilyStore("test_scrub_validation"); + Mutation mutation = new Mutation("Keyspace1", UTF8Type.instance.decompose("key")); + CellNameType ct = cfs2.getComparator(); + mutation.add("test_scrub_validation", ct.makeCellName("b"), LongType.instance.decompose(1L), System.currentTimeMillis()); + mutation.apply(); + cfs2.forceBlockingFlush(); + + CompactionManager.instance.performScrub(cfs2, false, false); } /** @@@ -497,15 -445,15 +513,15 @@@ @Test public void testColumnNameEqualToDefaultKeyAlias() throws ExecutionException, InterruptedException { - Keyspace keyspace = Keyspace.open("Keyspace1"); - ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("UUIDKeys"); + Keyspace keyspace = Keyspace.open(KEYSPACE); + ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_UUID); - ColumnFamily cf = ArrayBackedSortedColumns.factory.create("Keyspace1", "UUIDKeys"); + ColumnFamily cf = ArrayBackedSortedColumns.factory.create(KEYSPACE, CF_UUID); cf.addColumn(column(CFMetaData.DEFAULT_KEY_ALIAS, "not a uuid", 1L)); - Mutation mutation = new Mutation("Keyspace1", ByteBufferUtil.bytes(UUIDGen.getTimeUUID()), cf); + Mutation mutation = new Mutation(KEYSPACE, ByteBufferUtil.bytes(UUIDGen.getTimeUUID()), cf); mutation.applyUnsafe(); cfs.forceBlockingFlush(); - CompactionManager.instance.performScrub(cfs, false); + CompactionManager.instance.performScrub(cfs, false, true); assertEquals(1, cfs.getSSTables().size()); } @@@ -517,19 -465,19 +533,19 @@@ @Test public void testValidationCompactStorage() throws Exception { - QueryProcessor.process("CREATE TABLE \"Keyspace1\".test_compact_dynamic_columns (a int, b text, c text, PRIMARY KEY (a, b)) WITH COMPACT STORAGE", ConsistencyLevel.ONE); + QueryProcessor.process(String.format("CREATE TABLE \"%s\".test_compact_dynamic_columns (a int, b text, c text, PRIMARY KEY (a, b)) WITH COMPACT STORAGE", KEYSPACE), ConsistencyLevel.ONE); - Keyspace keyspace = Keyspace.open("Keyspace1"); + Keyspace keyspace = Keyspace.open(KEYSPACE); ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("test_compact_dynamic_columns"); - QueryProcessor.executeInternal("INSERT INTO \"Keyspace1\".test_compact_dynamic_columns (a, b, c) VALUES (0, 'a', 'foo')"); - QueryProcessor.executeInternal("INSERT INTO \"Keyspace1\".test_compact_dynamic_columns (a, b, c) VALUES (0, 'b', 'bar')"); - QueryProcessor.executeInternal("INSERT INTO \"Keyspace1\".test_compact_dynamic_columns (a, b, c) VALUES (0, 'c', 'boo')"); + QueryProcessor.executeInternal(String.format("INSERT INTO \"%s\".test_compact_dynamic_columns (a, b, c) VALUES (0, 'a', 'foo')", KEYSPACE)); + QueryProcessor.executeInternal(String.format("INSERT INTO \"%s\".test_compact_dynamic_columns (a, b, c) VALUES (0, 'b', 'bar')", KEYSPACE)); + QueryProcessor.executeInternal(String.format("INSERT INTO \"%s\".test_compact_dynamic_columns (a, b, c) VALUES (0, 'c', 'boo')", KEYSPACE)); cfs.forceBlockingFlush(); - CompactionManager.instance.performScrub(cfs, true); + CompactionManager.instance.performScrub(cfs, true, true); // Scrub is silent, but it will remove broken records. So reading everything back to make sure nothing to "scrubbed away" - UntypedResultSet rs = QueryProcessor.executeInternal("SELECT * FROM \"Keyspace1\".test_compact_dynamic_columns"); + UntypedResultSet rs = QueryProcessor.executeInternal(String.format("SELECT * FROM \"%s\".test_compact_dynamic_columns", KEYSPACE)); assertEquals(3, rs.size()); Iterator<UntypedResultSet.Row> iter = rs.iterator(); @@@ -537,129 -485,4 +553,129 @@@ assertEquals("bar", iter.next().getString("c")); assertEquals("boo", iter.next().getString("c")); } + + @Test /* CASSANDRA-5174 */ + public void testScrubKeysIndex_preserveOrder() throws IOException, ExecutionException, InterruptedException + { + //If the partitioner preserves the order then SecondaryIndex uses BytesType comparator, + // otherwise it uses LocalByPartitionerType + setKeyComparator(BytesType.instance); + testScrubIndex(CF_INDEX1, COL_KEYS_INDEX, false, true); + } + + @Test /* CASSANDRA-5174 */ + public void testScrubCompositeIndex_preserveOrder() throws IOException, ExecutionException, InterruptedException + { + setKeyComparator(BytesType.instance); + testScrubIndex(CF_INDEX2, COL_COMPOSITES_INDEX, true, true); + } + + @Test /* CASSANDRA-5174 */ + public void testScrubKeysIndex() throws IOException, ExecutionException, InterruptedException + { + setKeyComparator(new LocalByPartionerType(StorageService.getPartitioner())); + testScrubIndex(CF_INDEX1, COL_KEYS_INDEX, false, true); + } + + @Test /* CASSANDRA-5174 */ + public void testScrubCompositeIndex() throws IOException, ExecutionException, InterruptedException + { + setKeyComparator(new LocalByPartionerType(StorageService.getPartitioner())); + testScrubIndex(CF_INDEX2, COL_COMPOSITES_INDEX, true, true); + } + + @Test /* CASSANDRA-5174 */ + public void testFailScrubKeysIndex() throws IOException, ExecutionException, InterruptedException + { + testScrubIndex(CF_INDEX1, COL_KEYS_INDEX, false, false); + } + + @Test /* CASSANDRA-5174 */ + public void testFailScrubCompositeIndex() throws IOException, ExecutionException, InterruptedException + { + testScrubIndex(CF_INDEX2, COL_COMPOSITES_INDEX, true, false); + } + + @Test /* CASSANDRA-5174 */ + public void testScrubTwice() throws IOException, ExecutionException, InterruptedException + { + testScrubIndex(CF_INDEX1, COL_KEYS_INDEX, false, true, true); + } + + /** The SecondaryIndex class is used for custom indexes so to avoid + * making a public final field into a private field with getters + * and setters, we resort to this hack in order to test it properly + * since it can have two values which influence the scrubbing behavior. + * @param comparator - the key comparator we want to test + */ + private void setKeyComparator(AbstractType<?> comparator) + { + try + { + Field keyComparator = SecondaryIndex.class.getDeclaredField("keyComparator"); + keyComparator.setAccessible(true); + int modifiers = keyComparator.getModifiers(); + Field modifierField = keyComparator.getClass().getDeclaredField("modifiers"); + modifiers = modifiers & ~Modifier.FINAL; + modifierField.setAccessible(true); + modifierField.setInt(keyComparator, modifiers); + + keyComparator.set(null, comparator); + } + catch (Exception ex) + { + fail("Failed to change key comparator in secondary index : " + ex.getMessage()); + ex.printStackTrace(); + } + } + + private void testScrubIndex(String cfName, String colName, boolean composite, boolean ... scrubs) + throws IOException, ExecutionException, InterruptedException + { + CompactionManager.instance.disableAutoCompaction(); + Keyspace keyspace = Keyspace.open(KEYSPACE); + ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfName); + cfs.clearUnsafe(); + + int numRows = 1000; + long[] colValues = new long [numRows * 2]; // each row has two columns + for (int i = 0; i < colValues.length; i+=2) + { + colValues[i] = (i % 4 == 0 ? 1L : 2L); // index column + colValues[i+1] = 3L; //other column + } + fillIndexCF(cfs, composite, colValues); + + // check index + IndexExpression expr = new IndexExpression(ByteBufferUtil.bytes(colName), Operator.EQ, ByteBufferUtil.bytes(1L)); + List<Row> rows = cfs.search(Util.range("", ""), Arrays.asList(expr), new IdentityQueryFilter(), numRows); + assertNotNull(rows); + assertEquals(numRows / 2, rows.size()); + + // scrub index + Set<ColumnFamilyStore> indexCfss = cfs.indexManager.getIndexesBackedByCfs(); + assertTrue(indexCfss.size() == 1); + for(ColumnFamilyStore indexCfs : indexCfss) + { + for (int i = 0; i < scrubs.length; i++) + { + boolean failure = !scrubs[i]; + if (failure) + { //make sure the next scrub fails + overrideWithGarbage(indexCfs.getSSTables().iterator().next(), ByteBufferUtil.bytes(1L), ByteBufferUtil.bytes(2L)); + } - CompactionManager.AllSSTableOpStatus result = indexCfs.scrub(false, false, true); ++ CompactionManager.AllSSTableOpStatus result = indexCfs.scrub(false, false, true, true); + assertEquals(failure ? + CompactionManager.AllSSTableOpStatus.ABORTED : + CompactionManager.AllSSTableOpStatus.SUCCESSFUL, + result); + } + } + + + // check index is still working + rows = cfs.search(Util.range("", ""), Arrays.asList(expr), new IdentityQueryFilter(), numRows); + assertNotNull(rows); + assertEquals(numRows / 2, rows.size()); + } }