Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5ee6e7bc Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5ee6e7bc Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5ee6e7bc Branch: refs/heads/cassandra-3.0 Commit: 5ee6e7bc12a22744db462106abe1372a72b07a41 Parents: 458b36b 225232a Author: Marcus Eriksson <marc...@apache.org> Authored: Thu Jun 23 11:26:43 2016 +0200 Committer: Marcus Eriksson <marc...@apache.org> Committed: Thu Jun 23 11:26:43 2016 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../db/compaction/CompactionManager.java | 32 ++-- .../repair/RepairMessageVerbHandler.java | 31 ++-- .../cassandra/service/ActiveRepairService.java | 175 +++++++++++++++++-- .../service/ActiveRepairServiceTest.java | 125 +++++++++++-- 5 files changed, 305 insertions(+), 59 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ee6e7bc/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index ee2f6d3,b366d21..f7e854d --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -6,35 -2,9 +6,36 @@@ Merged from 2.2 * Allow nodetool info to run with readonly JMX access (CASSANDRA-11755) * Validate bloom_filter_fp_chance against lowest supported value when the table is created (CASSANDRA-11920) - * RandomAccessReader: call isEOF() only when rebuffering, not for every read operation (CASSANDRA-12013) * Don't send erroneous NEW_NODE notifications on restart (CASSANDRA-11038) * StorageService shutdown hook should use a volatile variable (CASSANDRA-11984) +Merged from 2.1: ++ * Avoid marking too many sstables as repaired (CASSANDRA-11696) + * Prevent select statements with clustering key > 64k (CASSANDRA-11882) + * Fix clock skew corrupting other nodes with paxos (CASSANDRA-11991) + * Remove distinction between non-existing static columns and existing but null in LWTs (CASSANDRA-9842) + * Cache local ranges when calculating repair neighbors (CASSANDRA-11934) + * Allow LWT operation on static column with only partition keys (CASSANDRA-10532) + * Create interval tree over canonical sstables to avoid missing sstables during streaming (CASSANDRA-11886) + * cqlsh COPY FROM: shutdown parent cluster after forking, to avoid corrupting SSL connections (CASSANDRA-11749) + + +3.0.7 + * Fix legacy serialization of Thrift-generated non-compound range tombstones + when communicating with 2.x nodes (CASSANDRA-11930) + * Fix Directories instantiations where CFS.initialDirectories should be used (CASSANDRA-11849) + * Avoid referencing DatabaseDescriptor in AbstractType (CASSANDRA-11912) + * Fix sstables not being protected from removal during index build (CASSANDRA-11905) + * cqlsh: Suppress stack trace from Read/WriteFailures (CASSANDRA-11032) + * Remove unneeded code to repair index summaries that have + been improperly down-sampled (CASSANDRA-11127) + * Avoid WriteTimeoutExceptions during commit log replay due to materialized + view lock contention (CASSANDRA-11891) + * Prevent OOM failures on SSTable corruption, improve tests for corruption detection (CASSANDRA-9530) + * Use CFS.initialDirectories when clearing snapshots (CASSANDRA-11705) + * Allow compaction strategies to disable early open (CASSANDRA-11754) + * Refactor Materialized View code (CASSANDRA-11475) + * Update Java Driver (CASSANDRA-11615) +Merged from 2.2: * Persist local metadata earlier in startup sequence (CASSANDRA-11742) * Run CommitLog tests with different compression settings (CASSANDRA-9039) * cqlsh: fix tab completion for case-sensitive identifiers (CASSANDRA-11664) http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ee6e7bc/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java index bf7bd81,cf82498..99e0fd5 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@@ -1059,10 -1068,16 +1059,17 @@@ public class CompactionManager implemen try { - String snapshotName = validator.desc.sessionId.toString(); int gcBefore; + int nowInSec = FBUtilities.nowInSeconds(); + UUID parentRepairSessionId = validator.desc.parentSessionId; + String snapshotName; + boolean isGlobalSnapshotValidation = cfs.snapshotExists(parentRepairSessionId.toString()); + if (isGlobalSnapshotValidation) + snapshotName = parentRepairSessionId.toString(); + else + snapshotName = validator.desc.sessionId.toString(); boolean isSnapshotValidation = cfs.snapshotExists(snapshotName); + if (isSnapshotValidation) { // If there is a snapshot created for the session then read from there. @@@ -1083,40 -1098,53 +1090,42 @@@ StorageService.instance.forceKeyspaceFlush(cfs.keyspace.getName(), cfs.name); sstables = getSSTablesToValidate(cfs, validator); if (sstables == null) -- return; // this means the parent repair session was removed - the repair session failed on another node and we removed it ++ return; // this means the parent repair session was removed - the repair session failed on another node and we removed i if (validator.gcBefore > 0) gcBefore = validator.gcBefore; else - gcBefore = getDefaultGcBefore(cfs); + gcBefore = getDefaultGcBefore(cfs, nowInSec); } - // Create Merkle tree suitable to hold estimated partitions for given range. - // We blindly assume that partition is evenly distributed on all sstables for now. - long numPartitions = 0; - for (SSTableReader sstable : sstables) - { - numPartitions += sstable.estimatedKeysForRanges(singleton(validator.desc.range)); - } + // Create Merkle trees suitable to hold estimated partitions for the given ranges. + // We blindly assume that a partition is evenly distributed on all sstables for now. // determine tree depth from number of partitions, but cap at 20 to prevent large tree. - int depth = numPartitions > 0 ? (int) Math.min(Math.floor(Math.log(numPartitions)), 20) : 0; - MerkleTree tree = new MerkleTree(cfs.partitioner, validator.desc.range, MerkleTree.RECOMMENDED_DEPTH, (int) Math.pow(2, depth)); - + MerkleTrees tree = createMerkleTrees(sstables, validator.desc.ranges, cfs); long start = System.nanoTime(); - try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables, validator.desc.range)) + try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategyManager().getScanners(sstables, validator.desc.ranges); + ValidationCompactionController controller = new ValidationCompactionController(cfs, gcBefore); + CompactionIterator ci = new ValidationCompactionIterator(scanners.scanners, controller, nowInSec, metrics)) { - CompactionIterable ci = new ValidationCompactionIterable(cfs, scanners.scanners, gcBefore); - Iterator<AbstractCompactedRow> iter = ci.iterator(); - metrics.beginCompaction(ci); - try + // validate the CF as we iterate over it + validator.prepare(cfs, tree); + while (ci.hasNext()) { - // validate the CF as we iterate over it - validator.prepare(cfs, tree); - while (iter.hasNext()) + if (ci.isStopRequested()) + throw new CompactionInterruptedException(ci.getCompactionInfo()); + try (UnfilteredRowIterator partition = ci.next()) { - if (ci.isStopRequested()) - throw new CompactionInterruptedException(ci.getCompactionInfo()); - AbstractCompactedRow row = iter.next(); - validator.add(row); + validator.add(partition); } - validator.complete(); } - finally + validator.complete(); + } + finally + { - if (isSnapshotValidation) ++ if (isSnapshotValidation && !isGlobalSnapshotValidation) { + // we can only clear the snapshot if we are not doing a global snapshot validation (we then clear it once anticompaction + // is done). - if (isSnapshotValidation && !isGlobalSnapshotValidation) - { - cfs.clearSnapshot(snapshotName); - } - - metrics.finishCompaction(ci); + cfs.clearSnapshot(snapshotName); } } @@@ -1174,7 -1175,13 +1183,11 @@@ if (prs == null) return null; Set<SSTableReader> sstablesToValidate = new HashSet<>(); - + if (prs.isGlobal) + prs.markSSTablesRepairing(cfs.metadata.cfId, validator.desc.parentSessionId); - + // note that we always grab all existing sstables for this - if we were to just grab the ones that + // were marked as repairing, we would miss any ranges that were compacted away and this would cause us to overstream - try (ColumnFamilyStore.RefViewFragment sstableCandidates = cfs.selectAndReference(prs.isIncremental ? ColumnFamilyStore.UNREPAIRED_SSTABLES : ColumnFamilyStore.CANONICAL_SSTABLES)) + try (ColumnFamilyStore.RefViewFragment sstableCandidates = cfs.selectAndReference(View.select(SSTableSet.CANONICAL, (s) -> !prs.isIncremental || !s.isRepaired()))) { for (SSTableReader sstable : sstableCandidates.sstables) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ee6e7bc/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java index c536b13,1701e9a..edcb4f9 --- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java +++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java @@@ -90,27 -99,23 +90,22 @@@ public class RepairMessageVerbHandler i desc.keyspace, desc.columnFamily), message.from, id); return; } - final Collection<Range<Token>> repairingRange = desc.ranges; - Set<SSTableReader> snapshottedSSSTables = cfs.snapshot(desc.sessionId.toString(), new Predicate<SSTableReader>() + ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId); + if (prs.isGlobal) { - public boolean apply(SSTableReader sstable) - { - return sstable != null && - !sstable.metadata.isIndex() && // exclude SSTables from 2i - new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(repairingRange); - } - }, true); //ephemeral snapshot, if repair fails, it will be cleaned next startup - if (ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId).isGlobal) + prs.maybeSnapshot(cfs.metadata.cfId, desc.parentSessionId); + } + else { - Set<SSTableReader> currentlyRepairing = ActiveRepairService.instance.currentlyRepairing(cfs.metadata.cfId, desc.parentSessionId); - if (!Sets.intersection(currentlyRepairing, snapshottedSSSTables).isEmpty()) - final Range<Token> repairingRange = desc.range; + cfs.snapshot(desc.sessionId.toString(), new Predicate<SSTableReader>() { - // clear snapshot that we just created - cfs.clearSnapshot(desc.sessionId.toString()); - logErrorAndSendFailureResponse("Cannot start multiple repair sessions over the same sstables", message.from, id); - return; - } - ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId).addSSTables(cfs.metadata.cfId, snapshottedSSSTables); + public boolean apply(SSTableReader sstable) + { + return sstable != null && - !(sstable.partitioner instanceof LocalPartitioner) && // exclude SSTables from 2i - new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(Collections.singleton(repairingRange)); ++ !sstable.metadata.isIndex() && // exclude SSTables from 2i ++ new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(desc.ranges); + } + }, true); //ephemeral snapshot, if repair fails, it will be cleaned next startup } logger.debug("Enqueuing response to snapshot request {} to {}", desc.sessionId, message.from); MessagingService.instance().sendReply(new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE), id, message.from); http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ee6e7bc/src/java/org/apache/cassandra/service/ActiveRepairService.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/ActiveRepairService.java index 6b1fd83,e111155..27c2424 --- a/src/java/org/apache/cassandra/service/ActiveRepairService.java +++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java @@@ -36,9 -37,12 +37,11 @@@ import org.slf4j.Logger import org.slf4j.LoggerFactory; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.db.ColumnFamily; import org.apache.cassandra.db.ColumnFamilyStore; --import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.db.compaction.CompactionManager; ++import org.apache.cassandra.db.lifecycle.SSTableSet; ++import org.apache.cassandra.db.lifecycle.View; + import org.apache.cassandra.dht.Bounds; -import org.apache.cassandra.dht.LocalPartitioner; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.gms.ApplicationState; @@@ -49,7 -53,7 +52,6 @@@ import org.apache.cassandra.gms.IFailur import org.apache.cassandra.gms.IEndpointStateChangeSubscriber; import org.apache.cassandra.gms.IFailureDetectionEventListener; import org.apache.cassandra.gms.VersionedValue; --import org.apache.cassandra.io.sstable.Component; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.locator.TokenMetadata; import org.apache.cassandra.net.IAsyncCallbackWithFailure; @@@ -460,8 -488,12 +487,12 @@@ public class ActiveRepairService implem public final boolean isGlobal; public final long repairedAt; public final InetAddress coordinator; + /** + * Indicates whether we have marked sstables as repairing. Can only be done once per table per ParentRepairSession + */ + private final Set<UUID> marked = new HashSet<>(); - public ParentRepairSession(InetAddress coordinator, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, boolean isIncremental, boolean isGlobal, long repairedAt) + public ParentRepairSession(InetAddress coordinator, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, boolean isIncremental, long repairedAt, boolean isGlobal) { this.coordinator = coordinator; for (ColumnFamilyStore cfs : columnFamilyStores) @@@ -471,15 -503,51 +502,51 @@@ } this.ranges = ranges; this.repairedAt = repairedAt; - this.isGlobal = isGlobal; this.isIncremental = isIncremental; + this.isGlobal = isGlobal; } + /** + * Mark sstables repairing - either all sstables or only the unrepaired ones depending on + * + * whether this is an incremental or full repair + * + * @param cfId the column family + * @param parentSessionId the parent repair session id, used to make sure we don't start multiple repairs over the same sstables + */ + public synchronized void markSSTablesRepairing(UUID cfId, UUID parentSessionId) + { + if (!marked.contains(cfId)) + { - List<SSTableReader> sstables = columnFamilyStores.get(cfId).select(isIncremental ? ColumnFamilyStore.UNREPAIRED_SSTABLES : ColumnFamilyStore.CANONICAL_SSTABLES).sstables; ++ List<SSTableReader> sstables = columnFamilyStores.get(cfId).select(View.select(SSTableSet.CANONICAL, (s) -> !isIncremental || !s.isRepaired())).sstables; + Set<SSTableReader> currentlyRepairing = ActiveRepairService.instance.currentlyRepairing(cfId, parentSessionId); + if (!Sets.intersection(currentlyRepairing, Sets.newHashSet(sstables)).isEmpty()) + { + logger.error("Cannot start multiple repair sessions over the same sstables"); + throw new RuntimeException("Cannot start multiple repair sessions over the same sstables"); + } + addSSTables(cfId, sstables); + marked.add(cfId); + } + } + + /** + * Get the still active sstables we should run anticompaction on + * + * note that validation and streaming do not call this method - they have to work on the actual active sstables on the node, we only call this + * to know which sstables are still there that were there when we started the repair + * + * @param cfId + * @param parentSessionId for checking if there exists a snapshot for this repair + * @return + */ @SuppressWarnings("resource") - public synchronized Refs<SSTableReader> getActiveRepairedSSTableRefs(UUID cfId) + public synchronized Refs<SSTableReader> getActiveRepairedSSTableRefsForAntiCompaction(UUID cfId, UUID parentSessionId) { + assert marked.contains(cfId); + boolean isSnapshotRepair = columnFamilyStores.get(cfId).snapshotExists(parentSessionId.toString()); ImmutableMap.Builder<SSTableReader, Ref<SSTableReader>> references = ImmutableMap.builder(); - for (SSTableReader sstable : getActiveSSTables(cfId)) + for (SSTableReader sstable : isSnapshotRepair ? getSSTablesForSnapshotRepair(cfId, parentSessionId) : getActiveSSTables(cfId)) { Ref<SSTableReader> ref = sstable.tryRef(); if (ref == null) @@@ -490,12 -558,97 +557,97 @@@ return new Refs<>(references.build()); } + /** + * If we are running a snapshot repair we need to find the 'real' sstables when we start anticompaction + * + * We use the generation of the sstables as identifiers instead of the file name to avoid having to parse out the + * actual filename. + * + * @param cfId + * @param parentSessionId + * @return + */ + private Set<SSTableReader> getSSTablesForSnapshotRepair(UUID cfId, UUID parentSessionId) + { + Set<SSTableReader> activeSSTables = new HashSet<>(); + ColumnFamilyStore cfs = columnFamilyStores.get(cfId); + + Set<Integer> snapshotGenerations = new HashSet<>(); + try (Refs<SSTableReader> snapshottedSSTables = cfs.getSnapshotSSTableReader(parentSessionId.toString())) + { + for (SSTableReader sstable : snapshottedSSTables) + { + snapshotGenerations.add(sstable.descriptor.generation); + } + } + catch (IOException e) + { + throw new RuntimeException(e); + } - for (SSTableReader sstable : cfs.select(ColumnFamilyStore.CANONICAL_SSTABLES).sstables) ++ for (SSTableReader sstable : cfs.getSSTables(SSTableSet.CANONICAL)) + if (snapshotGenerations.contains(sstable.descriptor.generation)) + activeSSTables.add(sstable); + return activeSSTables; + } + + public synchronized void maybeSnapshot(UUID cfId, UUID parentSessionId) + { + String snapshotName = parentSessionId.toString(); + if (!columnFamilyStores.get(cfId).snapshotExists(snapshotName)) + { + Set<SSTableReader> snapshottedSSTables = columnFamilyStores.get(cfId).snapshot(snapshotName, new Predicate<SSTableReader>() + { + public boolean apply(SSTableReader sstable) + { + return sstable != null && + (!isIncremental || !sstable.isRepaired()) && - !(sstable.partitioner instanceof LocalPartitioner) && // exclude SSTables from 2i ++ !(sstable.metadata.isIndex()) && // exclude SSTables from 2i + new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(ranges); + } + }, true); + + if (isAlreadyRepairing(cfId, parentSessionId, snapshottedSSTables)) + { + columnFamilyStores.get(cfId).clearSnapshot(parentSessionId.toString()); + logger.error("Cannot start multiple repair sessions over the same sstables"); + throw new RuntimeException("Cannot start multiple repair sessions over the same sstables"); + } + addSSTables(cfId, snapshottedSSTables); + marked.add(cfId); + } + } + + + /** + * Compares other repairing sstables *generation* to the ones we just snapshotted + * + * we compare generations since the sstables have different paths due to snapshot names + * + * @param cfId id of the column family store + * @param parentSessionId parent repair session + * @param sstables the newly snapshotted sstables + * @return + */ + private boolean isAlreadyRepairing(UUID cfId, UUID parentSessionId, Collection<SSTableReader> sstables) + { + Set<SSTableReader> currentlyRepairing = ActiveRepairService.instance.currentlyRepairing(cfId, parentSessionId); + Set<Integer> currentlyRepairingGenerations = new HashSet<>(); + Set<Integer> newRepairingGenerations = new HashSet<>(); + for (SSTableReader sstable : currentlyRepairing) + currentlyRepairingGenerations.add(sstable.descriptor.generation); + for (SSTableReader sstable : sstables) + newRepairingGenerations.add(sstable.descriptor.generation); + + return !Sets.intersection(currentlyRepairingGenerations, newRepairingGenerations).isEmpty(); + } + private Set<SSTableReader> getActiveSSTables(UUID cfId) { Set<String> repairedSSTables = sstableMap.get(cfId); Set<SSTableReader> activeSSTables = new HashSet<>(); Set<String> activeSSTableNames = new HashSet<>(); - for (SSTableReader sstable : columnFamilyStores.get(cfId).getSSTables()) + ColumnFamilyStore cfs = columnFamilyStores.get(cfId); - for (SSTableReader sstable : cfs.select(ColumnFamilyStore.CANONICAL_SSTABLES).sstables) ++ for (SSTableReader sstable : cfs.getSSTables(SSTableSet.CANONICAL)) { if (repairedSSTables.contains(sstable.getFilename())) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ee6e7bc/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java index da067fd,03a25c6..adcd684 --- a/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java +++ b/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java @@@ -20,18 -20,23 +20,21 @@@ package org.apache.cassandra.service import java.net.InetAddress; import java.util.*; + import java.util.concurrent.ExecutionException; -import com.google.common.base.Predicate; import com.google.common.collect.Sets; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import org.apache.cassandra.SchemaLoader; -import org.apache.cassandra.Util; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.config.KSMetaData; import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.Keyspace; -import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.db.RowUpdateBuilder; import org.apache.cassandra.db.compaction.OperationType; ++import org.apache.cassandra.db.lifecycle.SSTableSet; ++import org.apache.cassandra.db.lifecycle.View; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.exceptions.ConfigurationException; @@@ -225,17 -233,15 +229,15 @@@ public class ActiveRepairServiceTes public void testGetActiveRepairedSSTableRefs() { ColumnFamilyStore store = prepareColumnFamilyStore(); - Set<SSTableReader> original = store.getUnrepairedSSTables(); + Set<SSTableReader> original = store.getLiveSSTables(); UUID prsId = UUID.randomUUID(); - ActiveRepairService.instance.registerParentRepairSession(prsId, FBUtilities.getBroadcastAddress(), Collections.singletonList(store), null, true, false); + ActiveRepairService.instance.registerParentRepairSession(prsId, FBUtilities.getBroadcastAddress(), Collections.singletonList(store), null, true, 0, false); ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(prsId); - - //add all sstables to parent repair session - prs.addSSTables(store.metadata.cfId, original); + prs.markSSTablesRepairing(store.metadata.cfId, prsId); //retrieve all sstable references from parent repair sessions - Refs<SSTableReader> refs = prs.getActiveRepairedSSTableRefs(store.metadata.cfId); + Refs<SSTableReader> refs = prs.getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.cfId, prsId); Set<SSTableReader> retrieved = Sets.newHashSet(refs.iterator()); assertEquals(original, retrieved); refs.release(); @@@ -261,20 -267,117 +263,117 @@@ refs.release(); } + @Test + public void testAddingMoreSSTables() + { + ColumnFamilyStore store = prepareColumnFamilyStore(); - Set<SSTableReader> original = store.getUnrepairedSSTables(); ++ Set<SSTableReader> original = Sets.newHashSet(store.select(View.select(SSTableSet.CANONICAL, (s) -> !s.isRepaired())).sstables); + UUID prsId = UUID.randomUUID(); - ActiveRepairService.instance.registerParentRepairSession(prsId, FBUtilities.getBroadcastAddress(), Collections.singletonList(store), null, true, true); ++ ActiveRepairService.instance.registerParentRepairSession(prsId, FBUtilities.getBroadcastAddress(), Collections.singletonList(store), null, true, System.currentTimeMillis(), true); + ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(prsId); + prs.markSSTablesRepairing(store.metadata.cfId, prsId); + try (Refs<SSTableReader> refs = prs.getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.cfId, prsId)) + { + Set<SSTableReader> retrieved = Sets.newHashSet(refs.iterator()); + assertEquals(original, retrieved); + } + createSSTables(store, 2); + boolean exception = false; + try + { + UUID newPrsId = UUID.randomUUID(); - ActiveRepairService.instance.registerParentRepairSession(newPrsId, FBUtilities.getBroadcastAddress(), Collections.singletonList(store), null, true, true); ++ ActiveRepairService.instance.registerParentRepairSession(newPrsId, FBUtilities.getBroadcastAddress(), Collections.singletonList(store), null, true, System.currentTimeMillis(), true); + ActiveRepairService.instance.getParentRepairSession(newPrsId).markSSTablesRepairing(store.metadata.cfId, newPrsId); + } + catch (Throwable t) + { + exception = true; + } + assertTrue(exception); + + try (Refs<SSTableReader> refs = prs.getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.cfId, prsId)) + { + Set<SSTableReader> retrieved = Sets.newHashSet(refs.iterator()); + assertEquals(original, retrieved); + } + } + + @Test + public void testSnapshotAddSSTables() throws ExecutionException, InterruptedException + { + ColumnFamilyStore store = prepareColumnFamilyStore(); + UUID prsId = UUID.randomUUID(); - Set<SSTableReader> original = store.getUnrepairedSSTables(); - ActiveRepairService.instance.registerParentRepairSession(prsId, FBUtilities.getBroadcastAddress(), Collections.singletonList(store), Collections.singleton(new Range<>(store.partitioner.getMinimumToken(), store.partitioner.getMinimumToken())), true, true); ++ Set<SSTableReader> original = Sets.newHashSet(store.select(View.select(SSTableSet.CANONICAL, (s) -> !s.isRepaired())).sstables); ++ ActiveRepairService.instance.registerParentRepairSession(prsId, FBUtilities.getBroadcastAddress(), Collections.singletonList(store), Collections.singleton(new Range<>(store.getPartitioner().getMinimumToken(), store.getPartitioner().getMinimumToken())), true, System.currentTimeMillis(), true); + ActiveRepairService.instance.getParentRepairSession(prsId).maybeSnapshot(store.metadata.cfId, prsId); + + UUID prsId2 = UUID.randomUUID(); - ActiveRepairService.instance.registerParentRepairSession(prsId2, FBUtilities.getBroadcastAddress(), Collections.singletonList(store), Collections.singleton(new Range<>(store.partitioner.getMinimumToken(), store.partitioner.getMinimumToken())), true, true); ++ ActiveRepairService.instance.registerParentRepairSession(prsId2, FBUtilities.getBroadcastAddress(), Collections.singletonList(store), Collections.singleton(new Range<>(store.getPartitioner().getMinimumToken(), store.getPartitioner().getMinimumToken())), true, System.currentTimeMillis(), true); + createSSTables(store, 2); + ActiveRepairService.instance.getParentRepairSession(prsId).maybeSnapshot(store.metadata.cfId, prsId); + try (Refs<SSTableReader> refs = ActiveRepairService.instance.getParentRepairSession(prsId).getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.cfId, prsId)) + { + assertEquals(original, Sets.newHashSet(refs.iterator())); + } + store.forceMajorCompaction(); + // after a major compaction the original sstables will be gone and we will have no sstables to anticompact: + try (Refs<SSTableReader> refs = ActiveRepairService.instance.getParentRepairSession(prsId).getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.cfId, prsId)) + { + assertEquals(0, refs.size()); + } + } + + @Test + public void testSnapshotMultipleRepairs() + { + ColumnFamilyStore store = prepareColumnFamilyStore(); - Set<SSTableReader> original = store.getUnrepairedSSTables(); ++ Set<SSTableReader> original = Sets.newHashSet(store.select(View.select(SSTableSet.CANONICAL, (s) -> !s.isRepaired())).sstables); + UUID prsId = UUID.randomUUID(); - ActiveRepairService.instance.registerParentRepairSession(prsId, FBUtilities.getBroadcastAddress(), Collections.singletonList(store), Collections.singleton(new Range<>(store.partitioner.getMinimumToken(), store.partitioner.getMinimumToken())), true, true); ++ ActiveRepairService.instance.registerParentRepairSession(prsId, FBUtilities.getBroadcastAddress(), Collections.singletonList(store), Collections.singleton(new Range<>(store.getPartitioner().getMinimumToken(), store.getPartitioner().getMinimumToken())), true, System.currentTimeMillis(), true); + ActiveRepairService.instance.getParentRepairSession(prsId).maybeSnapshot(store.metadata.cfId, prsId); + + UUID prsId2 = UUID.randomUUID(); - ActiveRepairService.instance.registerParentRepairSession(prsId2, FBUtilities.getBroadcastAddress(), Collections.singletonList(store), Collections.singleton(new Range<>(store.partitioner.getMinimumToken(), store.partitioner.getMinimumToken())), true, true); ++ ActiveRepairService.instance.registerParentRepairSession(prsId2, FBUtilities.getBroadcastAddress(), Collections.singletonList(store), Collections.singleton(new Range<>(store.getPartitioner().getMinimumToken(), store.getPartitioner().getMinimumToken())), true, System.currentTimeMillis(), true); + boolean exception = false; + try + { + ActiveRepairService.instance.getParentRepairSession(prsId2).maybeSnapshot(store.metadata.cfId, prsId2); + } + catch (Throwable t) + { + exception = true; + } + assertTrue(exception); + try (Refs<SSTableReader> refs = ActiveRepairService.instance.getParentRepairSession(prsId).getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.cfId, prsId)) + { + assertEquals(original, Sets.newHashSet(refs.iterator())); + } + } + private ColumnFamilyStore prepareColumnFamilyStore() { Keyspace keyspace = Keyspace.open(KEYSPACE5); ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF_STANDARD1); + store.truncateBlocking(); store.disableAutoCompaction(); - for (int i = 0; i < 10; i++) + createSSTables(store, 10); + return store; + } + + private void createSSTables(ColumnFamilyStore cfs, int count) + { + long timestamp = System.currentTimeMillis(); + for (int i = 0; i < count; i++) { - new RowUpdateBuilder(store.metadata, System.currentTimeMillis(), Integer.toString(i)) - .clustering("c") - .add("val", "val") - .build() - .applyUnsafe(); - DecoratedKey key = Util.dk(Integer.toString(i)); - Mutation rm = new Mutation(KEYSPACE5, key.getKey()); + for (int j = 0; j < 10; j++) - rm.add("Standard1", Util.cellname(Integer.toString(j)), - ByteBufferUtil.EMPTY_BYTE_BUFFER, - timestamp, - 0); - rm.apply(); ++ { ++ new RowUpdateBuilder(cfs.metadata, timestamp, Integer.toString(j)) ++ .clustering("c") ++ .add("val", "val") ++ .build() ++ .applyUnsafe(); ++ } + cfs.forceBlockingFlush(); } - store.forceBlockingFlush(); - return store; } }