Merge branch 'cassandra-2.1' into cassandra-2.2
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/225232a9 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/225232a9 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/225232a9 Branch: refs/heads/cassandra-3.0 Commit: 225232a9ea8945c85ae4f9cac3b97e003c9e9035 Parents: 774e59d 3c8421a Author: Marcus Eriksson <marc...@apache.org> Authored: Thu Jun 23 11:22:57 2016 +0200 Committer: Marcus Eriksson <marc...@apache.org> Committed: Thu Jun 23 11:22:57 2016 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../db/compaction/CompactionManager.java | 32 ++-- .../repair/RepairMessageVerbHandler.java | 32 ++-- .../cassandra/service/ActiveRepairService.java | 173 +++++++++++++++++-- .../service/ActiveRepairServiceTest.java | 110 +++++++++++- 5 files changed, 296 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/225232a9/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index 0be1043,03246ae..b366d21 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,36 -1,6 +1,37 @@@ -2.1.15 +2.2.7 + * Allow nodetool info to run with readonly JMX access (CASSANDRA-11755) + * Validate bloom_filter_fp_chance against lowest supported + value when the table is created (CASSANDRA-11920) + * RandomAccessReader: call isEOF() only when rebuffering, not for every read operation (CASSANDRA-12013) + * Don't send erroneous NEW_NODE notifications on restart (CASSANDRA-11038) + * StorageService shutdown hook should use a volatile variable (CASSANDRA-11984) + * Persist local metadata earlier in startup sequence (CASSANDRA-11742) + * Run CommitLog tests with different compression settings (CASSANDRA-9039) + * cqlsh: fix tab completion for case-sensitive identifiers (CASSANDRA-11664) + * Avoid showing estimated key as -1 in tablestats (CASSANDRA-11587) + * Fix possible race condition in CommitLog.recover (CASSANDRA-11743) + * Enable client encryption in sstableloader with cli options (CASSANDRA-11708) + * Possible memory leak in NIODataInputStream (CASSANDRA-11867) + * Fix commit log replay after out-of-order flush completion (CASSANDRA-9669) + * Add seconds to cqlsh tracing session duration (CASSANDRA-11753) + * Prohibit Reverse Counter type as part of the PK (CASSANDRA-9395) + * cqlsh: correctly handle non-ascii chars in error messages (CASSANDRA-11626) + * Exit JVM if JMX server fails to startup (CASSANDRA-11540) + * Produce a heap dump when exiting on OOM (CASSANDRA-9861) + * Avoid read repairing purgeable tombstones on range slices (CASSANDRA-11427) + * Restore ability to filter on clustering columns when using a 2i (CASSANDRA-11510) + * JSON datetime formatting needs timezone (CASSANDRA-11137) + * Fix is_dense recalculation for Thrift-updated tables (CASSANDRA-11502) + * Remove unnescessary file existence check during anticompaction (CASSANDRA-11660) + * Add missing files to debian packages (CASSANDRA-11642) + * Avoid calling Iterables::concat in loops during ModificationStatement::getFunctions (CASSANDRA-11621) + * cqlsh: COPY FROM should use regular inserts for single statement batches and + report errors correctly if workers processes crash on initialization (CASSANDRA-11474) + * Always close cluster with connection in CqlRecordWriter (CASSANDRA-11553) + * Fix slice queries on ordered COMPACT tables (CASSANDRA-10988) +Merged from 2.1: * Prevent select statements with clustering key > 64k (CASSANDRA-11882) + * Avoid marking too many sstables as repaired (CASSANDRA-11696) * Fix clock skew corrupting other nodes with paxos (CASSANDRA-11991) * Remove distinction between non-existing static columns and existing but null in LWTs (CASSANDRA-9842) * Support mlockall on IBM POWER arch (CASSANDRA-11576) http://git-wip-us.apache.org/repos/asf/cassandra/blob/225232a9/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java index e76abad,87819ba..cf82498 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@@ -1068,9 -1006,9 +1068,16 @@@ public class CompactionManager implemen try { -- String snapshotName = validator.desc.sessionId.toString(); int gcBefore; ++ UUID parentRepairSessionId = validator.desc.parentSessionId; ++ String snapshotName; ++ boolean isGlobalSnapshotValidation = cfs.snapshotExists(parentRepairSessionId.toString()); ++ if (isGlobalSnapshotValidation) ++ snapshotName = parentRepairSessionId.toString(); ++ else ++ snapshotName = validator.desc.sessionId.toString(); boolean isSnapshotValidation = cfs.snapshotExists(snapshotName); ++ if (isSnapshotValidation) { // If there is a snapshot created for the session then read from there. @@@ -1130,7 -1073,7 +1137,9 @@@ } finally { -- if (isSnapshotValidation) ++ // we can only clear the snapshot if we are not doing a global snapshot validation (we then clear it once anticompaction ++ // is done). ++ if (isSnapshotValidation && !isGlobalSnapshotValidation) { cfs.clearSnapshot(snapshotName); } @@@ -1158,48 -1101,6 +1167,41 @@@ } } + private synchronized Refs<SSTableReader> getSSTablesToValidate(ColumnFamilyStore cfs, Validator validator) + { + Refs<SSTableReader> sstables; + + ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId); + if (prs == null) + return null; + Set<SSTableReader> sstablesToValidate = new HashSet<>(); ++ ++ if (prs.isGlobal) ++ prs.markSSTablesRepairing(cfs.metadata.cfId, validator.desc.parentSessionId); ++ ++ // note that we always grab all existing sstables for this - if we were to just grab the ones that ++ // were marked as repairing, we would miss any ranges that were compacted away and this would cause us to overstream + try (ColumnFamilyStore.RefViewFragment sstableCandidates = cfs.selectAndReference(prs.isIncremental ? ColumnFamilyStore.UNREPAIRED_SSTABLES : ColumnFamilyStore.CANONICAL_SSTABLES)) + { + for (SSTableReader sstable : sstableCandidates.sstables) + { + if (new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(Collections.singletonList(validator.desc.range))) + { + sstablesToValidate.add(sstable); + } + } + - if (prs.isGlobal) - { - Set<SSTableReader> currentlyRepairing = ActiveRepairService.instance.currentlyRepairing(cfs.metadata.cfId, validator.desc.parentSessionId); - - if (!Sets.intersection(currentlyRepairing, sstablesToValidate).isEmpty()) - { - logger.error("Cannot start multiple repair sessions over the same sstables"); - throw new RuntimeException("Cannot start multiple repair sessions over the same sstables"); - } - } - + sstables = Refs.tryRef(sstablesToValidate); + if (sstables == null) + { + logger.error("Could not reference sstables"); + throw new RuntimeException("Could not reference sstables"); + } + } - if (prs.isGlobal) - prs.addSSTables(cfs.metadata.cfId, sstablesToValidate); + + return sstables; + } + /** * Splits up an sstable into two new sstables. The first of the new tables will store repaired ranges, the second * will store the non-repaired ranges. Once anticompation is completed, the original sstable is marked as compacted http://git-wip-us.apache.org/repos/asf/cassandra/blob/225232a9/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java index 6e7922f,7debc93..1701e9a --- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java +++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java @@@ -91,36 -78,18 +91,32 @@@ public class RepairMessageVerbHandler i break; case SNAPSHOT: - ColumnFamilyStore cfs = Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily); - final Range<Token> repairingRange = desc.range; - cfs.snapshot(desc.sessionId.toString(), new Predicate<SSTableReader>() + logger.debug("Snapshotting {}", desc); + final ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(desc.keyspace, desc.columnFamily); + if (cfs == null) + { + logErrorAndSendFailureResponse(String.format("Table %s.%s was dropped during snapshot phase of repair", + desc.keyspace, desc.columnFamily), message.from, id); + return; + } - final Range<Token> repairingRange = desc.range; - Set<SSTableReader> snapshottedSSSTables = cfs.snapshot(desc.sessionId.toString(), new Predicate<SSTableReader>() ++ ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId); ++ if (prs.isGlobal) { -- public boolean apply(SSTableReader sstable) - { - return sstable != null && - !(sstable.partitioner instanceof LocalPartitioner) && // exclude SSTables from 2i - new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(Collections.singleton(repairingRange)); - } - }, true); //ephemeral snapshot, if repair fails, it will be cleaned next startup - if (ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId).isGlobal) ++ prs.maybeSnapshot(cfs.metadata.cfId, desc.parentSessionId); ++ } ++ else + { - Set<SSTableReader> currentlyRepairing = ActiveRepairService.instance.currentlyRepairing(cfs.metadata.cfId, desc.parentSessionId); - if (!Sets.intersection(currentlyRepairing, snapshottedSSSTables).isEmpty()) ++ final Range<Token> repairingRange = desc.range; ++ cfs.snapshot(desc.sessionId.toString(), new Predicate<SSTableReader>() { - // clear snapshot that we just created - cfs.clearSnapshot(desc.sessionId.toString()); - logErrorAndSendFailureResponse("Cannot start multiple repair sessions over the same sstables", message.from, id); - return; - return sstable != null && - !(sstable.partitioner instanceof LocalPartitioner) && // exclude SSTables from 2i - new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(Collections.singleton(repairingRange)); -- } - ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId).addSSTables(cfs.metadata.cfId, snapshottedSSSTables); - }, true); //ephemeral snapshot, if repair fails, it will be cleaned next startup - ++ public boolean apply(SSTableReader sstable) ++ { ++ return sstable != null && ++ !(sstable.partitioner instanceof LocalPartitioner) && // exclude SSTables from 2i ++ new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(Collections.singleton(repairingRange)); ++ } ++ }, true); //ephemeral snapshot, if repair fails, it will be cleaned next startup + } logger.debug("Enqueuing response to snapshot request {} to {}", desc.sessionId, message.from); MessagingService.instance().sendReply(new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE), id, message.from); break; http://git-wip-us.apache.org/repos/asf/cassandra/blob/225232a9/src/java/org/apache/cassandra/service/ActiveRepairService.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/ActiveRepairService.java index 0bb7172,bab244d..e111155 --- a/src/java/org/apache/cassandra/service/ActiveRepairService.java +++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java @@@ -25,6 -23,6 +25,7 @@@ import java.util.* import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; ++import com.google.common.base.Predicate; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Multimap; import com.google.common.collect.Sets; @@@ -35,10 -33,11 +36,13 @@@ import com.google.common.util.concurren import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.concurrent.JMXConfigurableThreadPoolExecutor; -import org.apache.cassandra.concurrent.NamedThreadFactory; import org.apache.cassandra.config.DatabaseDescriptor; ++import org.apache.cassandra.db.ColumnFamily; import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.db.compaction.CompactionManager; ++import org.apache.cassandra.dht.Bounds; ++import org.apache.cassandra.dht.LocalPartitioner; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.gms.ApplicationState; @@@ -308,13 -306,13 +312,13 @@@ public class ActiveRepairService implem } catch (InterruptedException e) { -- parentRepairSessions.remove(parentRepairSession); ++ removeParentRepairSession(parentRepairSession); throw new RuntimeException("Did not get replies from all endpoints. List of failed endpoint(s): " + failedNodes.toString(), e); } if (!status.get()) { -- parentRepairSessions.remove(parentRepairSession); ++ removeParentRepairSession(parentRepairSession); throw new RuntimeException("Did not get positive replies from all endpoints. List of failed endpoint(s): " + failedNodes.toString()); } @@@ -376,8 -405,8 +380,21 @@@ return session; } ++ /** ++ * called when the repair session is done - either failed or anticompaction has completed ++ * ++ * clears out any snapshots created by this repair ++ * ++ * @param parentSessionId ++ * @return ++ */ public synchronized ParentRepairSession removeParentRepairSession(UUID parentSessionId) { ++ for (ColumnFamilyStore cfs : getParentRepairSession(parentSessionId).columnFamilyStores.values()) ++ { ++ if (cfs.snapshotExists(parentSessionId.toString())) ++ cfs.clearSnapshot(parentSessionId.toString()); ++ } return parentRepairSessions.remove(parentSessionId); } @@@ -393,26 -421,13 +410,26 @@@ { assert parentRepairSession != null; ParentRepairSession prs = getParentRepairSession(parentRepairSession); + //A repair will be marked as not global if it is a subrange repair to avoid many small anti-compactions + //in addition to other scenarios such as repairs not involving all DCs or hosts + if (!prs.isGlobal) + { + logger.info("Not a global repair, will not do anticompaction"); + removeParentRepairSession(parentRepairSession); + return Futures.immediateFuture(Collections.emptyList()); + } + assert prs.ranges.containsAll(successfulRanges) : "Trying to perform anticompaction on unknown ranges"; List<ListenableFuture<?>> futures = new ArrayList<>(); - for (Map.Entry<UUID, ColumnFamilyStore> columnFamilyStoreEntry : prs.columnFamilyStores.entrySet()) + // if we don't have successful repair ranges, then just skip anticompaction + if (!successfulRanges.isEmpty()) { - Refs<SSTableReader> sstables = prs.getActiveRepairedSSTableRefsForAntiCompaction(columnFamilyStoreEntry.getKey()); - ColumnFamilyStore cfs = columnFamilyStoreEntry.getValue(); - futures.add(CompactionManager.instance.submitAntiCompaction(cfs, prs.ranges, sstables, prs.repairedAt)); + for (Map.Entry<UUID, ColumnFamilyStore> columnFamilyStoreEntry : prs.columnFamilyStores.entrySet()) + { - Refs<SSTableReader> sstables = prs.getActiveRepairedSSTableRefs(columnFamilyStoreEntry.getKey()); ++ Refs<SSTableReader> sstables = prs.getActiveRepairedSSTableRefsForAntiCompaction(columnFamilyStoreEntry.getKey(), parentRepairSession); + ColumnFamilyStore cfs = columnFamilyStoreEntry.getValue(); + futures.add(CompactionManager.instance.submitAntiCompaction(cfs, successfulRanges, sstables, prs.repairedAt)); + } } ListenableFuture<List<Object>> allAntiCompactionResults = Futures.successfulAsList(futures); @@@ -450,17 -465,39 +467,33 @@@ } } + /** + * We keep a ParentRepairSession around for the duration of the entire repair, for example, on a 256 token vnode rf=3 cluster + * we would have 768 RepairSession but only one ParentRepairSession. We use the PRS to avoid anticompacting the sstables + * 768 times, instead we take all repaired ranges at the end of the repair and anticompact once. + * + * We do an optimistic marking of sstables - when we start an incremental repair we mark all unrepaired sstables as + * repairing (@see markSSTablesRepairing), then while the repair is ongoing compactions might remove those sstables, + * and when it is time for anticompaction we will only anticompact the sstables that are still on disk. + * + * Note that validation and streaming do not care about which sstables we have marked as repairing - they operate on + * all unrepaired sstables (if it is incremental), otherwise we would not get a correct repair. + */ public static class ParentRepairSession { - public final Map<UUID, ColumnFamilyStore> columnFamilyStores = new HashMap<>(); - public final Collection<Range<Token>> ranges; + private final Map<UUID, ColumnFamilyStore> columnFamilyStores = new HashMap<>(); + private final Collection<Range<Token>> ranges; public final Map<UUID, Set<String>> sstableMap = new HashMap<>(); - /** - * used as fail time if failed is true - */ + public final boolean isIncremental; + public final boolean isGlobal; public final long repairedAt; public final InetAddress coordinator; + /** - * Used to mark a repair as failed - if the coordinator thinks that the repair is still ongoing and sends a - * request, we need to fail the coordinator as well. - */ - public final boolean failed; - /** + * Indicates whether we have marked sstables as repairing. Can only be done once per table per ParentRepairSession + */ + private final Set<UUID> marked = new HashSet<>(); - public ParentRepairSession(InetAddress coordinator, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, long repairedAt, boolean failed) + public ParentRepairSession(InetAddress coordinator, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, boolean isIncremental, boolean isGlobal, long repairedAt) { this.coordinator = coordinator; for (ColumnFamilyStore cfs : columnFamilyStores) @@@ -470,15 -507,28 +503,51 @@@ } this.ranges = ranges; this.repairedAt = repairedAt; - this.failed = failed; + this.isGlobal = isGlobal; + this.isIncremental = isIncremental; } - public ParentRepairSession(InetAddress coordinator, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, long repairedAt) ++ /** ++ * Mark sstables repairing - either all sstables or only the unrepaired ones depending on ++ * ++ * whether this is an incremental or full repair ++ * ++ * @param cfId the column family ++ * @param parentSessionId the parent repair session id, used to make sure we don't start multiple repairs over the same sstables ++ */ ++ public synchronized void markSSTablesRepairing(UUID cfId, UUID parentSessionId) + { - this(coordinator, columnFamilyStores, ranges, repairedAt, false); ++ if (!marked.contains(cfId)) ++ { ++ List<SSTableReader> sstables = columnFamilyStores.get(cfId).select(isIncremental ? ColumnFamilyStore.UNREPAIRED_SSTABLES : ColumnFamilyStore.CANONICAL_SSTABLES).sstables; ++ Set<SSTableReader> currentlyRepairing = ActiveRepairService.instance.currentlyRepairing(cfId, parentSessionId); ++ if (!Sets.intersection(currentlyRepairing, Sets.newHashSet(sstables)).isEmpty()) ++ { ++ logger.error("Cannot start multiple repair sessions over the same sstables"); ++ throw new RuntimeException("Cannot start multiple repair sessions over the same sstables"); ++ } ++ addSSTables(cfId, sstables); ++ marked.add(cfId); ++ } + } + + /** - * Gets the repairing sstables for anticompaction. ++ * Get the still active sstables we should run anticompaction on + * - * Note that validation and streaming uses the real unrepaired sstables. ++ * note that validation and streaming do not call this method - they have to work on the actual active sstables on the node, we only call this ++ * to know which sstables are still there that were there when we started the repair + * + * @param cfId ++ * @param parentSessionId for checking if there exists a snapshot for this repair + * @return + */ @SuppressWarnings("resource") - public synchronized Refs<SSTableReader> getActiveRepairedSSTableRefs(UUID cfId) - public synchronized Refs<SSTableReader> getActiveRepairedSSTableRefsForAntiCompaction(UUID cfId) ++ public synchronized Refs<SSTableReader> getActiveRepairedSSTableRefsForAntiCompaction(UUID cfId, UUID parentSessionId) { + assert marked.contains(cfId); ++ boolean isSnapshotRepair = columnFamilyStores.get(cfId).snapshotExists(parentSessionId.toString()); ImmutableMap.Builder<SSTableReader, Ref<SSTableReader>> references = ImmutableMap.builder(); -- for (SSTableReader sstable : getActiveSSTables(cfId)) ++ for (SSTableReader sstable : isSnapshotRepair ? getSSTablesForSnapshotRepair(cfId, parentSessionId) : getActiveSSTables(cfId)) { Ref<SSTableReader> ref = sstable.tryRef(); if (ref == null) @@@ -489,12 -539,38 +558,97 @@@ return new Refs<>(references.build()); } + /** - * Marks all the unrepaired sstables as repairing unless we have already done so. ++ * If we are running a snapshot repair we need to find the 'real' sstables when we start anticompaction + * - * Any of these sstables that are still on disk are then anticompacted once the streaming and validation phases are done. ++ * We use the generation of the sstables as identifiers instead of the file name to avoid having to parse out the ++ * actual filename. + * + * @param cfId - * @param parentSessionId used to check that we don't start multiple inc repair sessions over the same sstables ++ * @param parentSessionId ++ * @return + */ - public synchronized void markSSTablesRepairing(UUID cfId, UUID parentSessionId) ++ private Set<SSTableReader> getSSTablesForSnapshotRepair(UUID cfId, UUID parentSessionId) + { - if (!marked.contains(cfId)) ++ Set<SSTableReader> activeSSTables = new HashSet<>(); ++ ColumnFamilyStore cfs = columnFamilyStores.get(cfId); ++ ++ Set<Integer> snapshotGenerations = new HashSet<>(); ++ try (Refs<SSTableReader> snapshottedSSTables = cfs.getSnapshotSSTableReader(parentSessionId.toString())) + { - List<SSTableReader> sstables = columnFamilyStores.get(cfId).select(ColumnFamilyStore.UNREPAIRED_SSTABLES).sstables; - Set<SSTableReader> currentlyRepairing = ActiveRepairService.instance.currentlyRepairing(cfId, parentSessionId); - if (!Sets.intersection(currentlyRepairing, Sets.newHashSet(sstables)).isEmpty()) ++ for (SSTableReader sstable : snapshottedSSTables) ++ { ++ snapshotGenerations.add(sstable.descriptor.generation); ++ } ++ } ++ catch (IOException e) ++ { ++ throw new RuntimeException(e); ++ } ++ for (SSTableReader sstable : cfs.select(ColumnFamilyStore.CANONICAL_SSTABLES).sstables) ++ if (snapshotGenerations.contains(sstable.descriptor.generation)) ++ activeSSTables.add(sstable); ++ return activeSSTables; ++ } ++ ++ public synchronized void maybeSnapshot(UUID cfId, UUID parentSessionId) ++ { ++ String snapshotName = parentSessionId.toString(); ++ if (!columnFamilyStores.get(cfId).snapshotExists(snapshotName)) ++ { ++ Set<SSTableReader> snapshottedSSTables = columnFamilyStores.get(cfId).snapshot(snapshotName, new Predicate<SSTableReader>() ++ { ++ public boolean apply(SSTableReader sstable) ++ { ++ return sstable != null && ++ (!isIncremental || !sstable.isRepaired()) && ++ !(sstable.partitioner instanceof LocalPartitioner) && // exclude SSTables from 2i ++ new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(ranges); ++ } ++ }, true); ++ ++ if (isAlreadyRepairing(cfId, parentSessionId, snapshottedSSTables)) + { ++ columnFamilyStores.get(cfId).clearSnapshot(parentSessionId.toString()); + logger.error("Cannot start multiple repair sessions over the same sstables"); + throw new RuntimeException("Cannot start multiple repair sessions over the same sstables"); + } - addSSTables(cfId, sstables); ++ addSSTables(cfId, snapshottedSSTables); + marked.add(cfId); + } + } + ++ ++ /** ++ * Compares other repairing sstables *generation* to the ones we just snapshotted ++ * ++ * we compare generations since the sstables have different paths due to snapshot names ++ * ++ * @param cfId id of the column family store ++ * @param parentSessionId parent repair session ++ * @param sstables the newly snapshotted sstables ++ * @return ++ */ ++ private boolean isAlreadyRepairing(UUID cfId, UUID parentSessionId, Collection<SSTableReader> sstables) ++ { ++ Set<SSTableReader> currentlyRepairing = ActiveRepairService.instance.currentlyRepairing(cfId, parentSessionId); ++ Set<Integer> currentlyRepairingGenerations = new HashSet<>(); ++ Set<Integer> newRepairingGenerations = new HashSet<>(); ++ for (SSTableReader sstable : currentlyRepairing) ++ currentlyRepairingGenerations.add(sstable.descriptor.generation); ++ for (SSTableReader sstable : sstables) ++ newRepairingGenerations.add(sstable.descriptor.generation); ++ ++ return !Sets.intersection(currentlyRepairingGenerations, newRepairingGenerations).isEmpty(); ++ } ++ private Set<SSTableReader> getActiveSSTables(UUID cfId) { - if (failed) - return Collections.emptySet(); Set<String> repairedSSTables = sstableMap.get(cfId); Set<SSTableReader> activeSSTables = new HashSet<>(); Set<String> activeSSTableNames = new HashSet<>(); -- for (SSTableReader sstable : columnFamilyStores.get(cfId).getSSTables()) ++ ColumnFamilyStore cfs = columnFamilyStores.get(cfId); ++ for (SSTableReader sstable : cfs.select(ColumnFamilyStore.CANONICAL_SSTABLES).sstables) { if (repairedSSTables.contains(sstable.getFilename())) { @@@ -506,21 -582,16 +660,20 @@@ return activeSSTables; } - public void addSSTables(UUID cfId, Collection<SSTableReader> sstables) + private void addSSTables(UUID cfId, Collection<SSTableReader> sstables) { for (SSTableReader sstable : sstables) - { sstableMap.get(cfId).add(sstable.getFilename()); - } } - public ParentRepairSession asFailed() ++ + public long getRepairedAt() { - return new ParentRepairSession(coordinator, Collections.<ColumnFamilyStore>emptyList(), Collections.<Range<Token>>emptyList(), System.currentTimeMillis(), true); + if (isGlobal) + return repairedAt; + return ActiveRepairService.UNREPAIRED_SSTABLE; } + @Override public String toString() { @@@ -580,9 -652,12 +733,9 @@@ if (!toRemove.isEmpty()) { - logger.debug("Failing {} in parent repair sessions", toRemove); + logger.debug("Removing {} in parent repair sessions", toRemove); for (UUID id : toRemove) - parentRepairSessions.remove(id); - { - ParentRepairSession failed = parentRepairSessions.get(id); - parentRepairSessions.replace(id, failed, failed.asFailed()); - } ++ removeParentRepairSession(id); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/225232a9/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java index f34d0e2,cf64322..03a25c6 --- a/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java +++ b/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java @@@ -1,30 -1,32 +1,31 @@@ /* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ package org.apache.cassandra.service; +import java.net.InetAddress; +import java.util.*; ++import java.util.concurrent.ExecutionException; -import java.util.Collections; -import java.util.HashSet; -import java.util.Iterator; -import java.util.Set; -import java.util.UUID; - +import com.google.common.base.Predicate; import com.google.common.collect.Sets; - +import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Test; import org.apache.cassandra.SchemaLoader; @@@ -49,183 -42,13 +50,184 @@@ import org.apache.cassandra.utils.concu import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; + import static org.junit.Assert.assertTrue; -public class ActiveRepairServiceTest extends SchemaLoader +public class ActiveRepairServiceTest { + public static final String KEYSPACE5 = "Keyspace5"; + public static final String CF_STANDARD1 = "Standard1"; + public static final String CF_COUNTER = "Counter1"; + + public String cfname; + public ColumnFamilyStore store; + public InetAddress LOCAL, REMOTE; + + private boolean initialized; + + @BeforeClass + public static void defineSchema() throws ConfigurationException + { + SchemaLoader.prepareServer(); + SchemaLoader.createKeyspace(KEYSPACE5, + SimpleStrategy.class, + KSMetaData.optsWithRF(2), + SchemaLoader.standardCFMD(KEYSPACE5, CF_COUNTER), + SchemaLoader.standardCFMD(KEYSPACE5, CF_STANDARD1)); + } + + @Before + public void prepare() throws Exception + { + if (!initialized) + { + SchemaLoader.startGossiper(); + initialized = true; + + LOCAL = FBUtilities.getBroadcastAddress(); + // generate a fake endpoint for which we can spoof receiving/sending trees + REMOTE = InetAddress.getByName("127.0.0.2"); + } + + TokenMetadata tmd = StorageService.instance.getTokenMetadata(); + tmd.clearUnsafe(); + StorageService.instance.setTokens(Collections.singleton(StorageService.getPartitioner().getRandomToken())); + tmd.updateNormalToken(StorageService.getPartitioner().getMinimumToken(), REMOTE); + assert tmd.isMember(REMOTE); + } + + @Test + public void testGetNeighborsPlusOne() throws Throwable + { + // generate rf+1 nodes, and ensure that all nodes are returned + Set<InetAddress> expected = addTokens(1 + Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor()); + expected.remove(FBUtilities.getBroadcastAddress()); + Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(KEYSPACE5); + Set<InetAddress> neighbors = new HashSet<>(); + for (Range<Token> range : ranges) + { + neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, ranges, range, null, null)); + } + assertEquals(expected, neighbors); + } + + @Test + public void testGetNeighborsTimesTwo() throws Throwable + { + TokenMetadata tmd = StorageService.instance.getTokenMetadata(); + + // generate rf*2 nodes, and ensure that only neighbors specified by the ARS are returned + addTokens(2 * Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor()); + AbstractReplicationStrategy ars = Keyspace.open(KEYSPACE5).getReplicationStrategy(); + Set<InetAddress> expected = new HashSet<>(); + for (Range<Token> replicaRange : ars.getAddressRanges().get(FBUtilities.getBroadcastAddress())) + { + expected.addAll(ars.getRangeAddresses(tmd.cloneOnlyTokenMap()).get(replicaRange)); + } + expected.remove(FBUtilities.getBroadcastAddress()); + Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(KEYSPACE5); + Set<InetAddress> neighbors = new HashSet<>(); + for (Range<Token> range : ranges) + { + neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, ranges, range, null, null)); + } + assertEquals(expected, neighbors); + } + + @Test + public void testGetNeighborsPlusOneInLocalDC() throws Throwable + { + TokenMetadata tmd = StorageService.instance.getTokenMetadata(); + + // generate rf+1 nodes, and ensure that all nodes are returned + Set<InetAddress> expected = addTokens(1 + Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor()); + expected.remove(FBUtilities.getBroadcastAddress()); + // remove remote endpoints + TokenMetadata.Topology topology = tmd.cloneOnlyTokenMap().getTopology(); + HashSet<InetAddress> localEndpoints = Sets.newHashSet(topology.getDatacenterEndpoints().get(DatabaseDescriptor.getLocalDataCenter())); + expected = Sets.intersection(expected, localEndpoints); + + Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(KEYSPACE5); + Set<InetAddress> neighbors = new HashSet<>(); + for (Range<Token> range : ranges) + { + neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, ranges, range, Arrays.asList(DatabaseDescriptor.getLocalDataCenter()), null)); + } + assertEquals(expected, neighbors); + } + + @Test + public void testGetNeighborsTimesTwoInLocalDC() throws Throwable + { + TokenMetadata tmd = StorageService.instance.getTokenMetadata(); + + // generate rf*2 nodes, and ensure that only neighbors specified by the ARS are returned + addTokens(2 * Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor()); + AbstractReplicationStrategy ars = Keyspace.open(KEYSPACE5).getReplicationStrategy(); + Set<InetAddress> expected = new HashSet<>(); + for (Range<Token> replicaRange : ars.getAddressRanges().get(FBUtilities.getBroadcastAddress())) + { + expected.addAll(ars.getRangeAddresses(tmd.cloneOnlyTokenMap()).get(replicaRange)); + } + expected.remove(FBUtilities.getBroadcastAddress()); + // remove remote endpoints + TokenMetadata.Topology topology = tmd.cloneOnlyTokenMap().getTopology(); + HashSet<InetAddress> localEndpoints = Sets.newHashSet(topology.getDatacenterEndpoints().get(DatabaseDescriptor.getLocalDataCenter())); + expected = Sets.intersection(expected, localEndpoints); + + Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(KEYSPACE5); + Set<InetAddress> neighbors = new HashSet<>(); + for (Range<Token> range : ranges) + { + neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, ranges, range, Arrays.asList(DatabaseDescriptor.getLocalDataCenter()), null)); + } + assertEquals(expected, neighbors); + } + + @Test + public void testGetNeighborsTimesTwoInSpecifiedHosts() throws Throwable + { + TokenMetadata tmd = StorageService.instance.getTokenMetadata(); - private static final String KEYSPACE1 = "Keyspace1"; - private static final String CF = "Standard1"; + // generate rf*2 nodes, and ensure that only neighbors specified by the hosts are returned + addTokens(2 * Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor()); + AbstractReplicationStrategy ars = Keyspace.open(KEYSPACE5).getReplicationStrategy(); + List<InetAddress> expected = new ArrayList<>(); + for (Range<Token> replicaRange : ars.getAddressRanges().get(FBUtilities.getBroadcastAddress())) + { + expected.addAll(ars.getRangeAddresses(tmd.cloneOnlyTokenMap()).get(replicaRange)); + } + + expected.remove(FBUtilities.getBroadcastAddress()); + Collection<String> hosts = Arrays.asList(FBUtilities.getBroadcastAddress().getCanonicalHostName(),expected.get(0).getCanonicalHostName()); + Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(KEYSPACE5); + + assertEquals(expected.get(0), ActiveRepairService.getNeighbors(KEYSPACE5, ranges, + ranges.iterator().next(), + null, hosts).iterator().next()); + } + + @Test(expected = IllegalArgumentException.class) + public void testGetNeighborsSpecifiedHostsWithNoLocalHost() throws Throwable + { + addTokens(2 * Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor()); + //Dont give local endpoint + Collection<String> hosts = Arrays.asList("127.0.0.3"); + Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(KEYSPACE5); + ActiveRepairService.getNeighbors(KEYSPACE5, ranges, ranges.iterator().next(), null, hosts); + } + + Set<InetAddress> addTokens(int max) throws Throwable + { + TokenMetadata tmd = StorageService.instance.getTokenMetadata(); + Set<InetAddress> endpoints = new HashSet<>(); + for (int i = 1; i <= max; i++) + { + InetAddress endpoint = InetAddress.getByName("127.0.0." + i); + tmd.updateNormalToken(StorageService.getPartitioner().getRandomToken(), endpoint); + endpoints.add(endpoint); + } + return endpoints; + } @Test public void testGetActiveRepairedSSTableRefs() @@@ -234,14 -57,11 +236,12 @@@ Set<SSTableReader> original = store.getUnrepairedSSTables(); UUID prsId = UUID.randomUUID(); - ActiveRepairService.instance.registerParentRepairSession(prsId, FBUtilities.getBroadcastAddress(), Collections.singletonList(store), null); + ActiveRepairService.instance.registerParentRepairSession(prsId, FBUtilities.getBroadcastAddress(), Collections.singletonList(store), null, true, false); ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(prsId); - - //add all sstables to parent repair session - prs.addSSTables(store.metadata.cfId, original); + prs.markSSTablesRepairing(store.metadata.cfId, prsId); + //retrieve all sstable references from parent repair sessions - Refs<SSTableReader> refs = prs.getActiveRepairedSSTableRefs(store.metadata.cfId); - Refs<SSTableReader> refs = prs.getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.cfId); ++ Refs<SSTableReader> refs = prs.getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.cfId, prsId); Set<SSTableReader> retrieved = Sets.newHashSet(refs.iterator()); assertEquals(original, retrieved); refs.release(); @@@ -249,35 -69,70 +249,128 @@@ //remove 1 sstable from data data tracker Set<SSTableReader> newLiveSet = new HashSet<>(original); Iterator<SSTableReader> it = newLiveSet.iterator(); - SSTableReader removed = it.next(); + final SSTableReader removed = it.next(); it.remove(); - store.getDataTracker().replaceWithNewInstances(Collections.singleton(removed), Collections.EMPTY_SET); + store.getTracker().dropSSTables(new Predicate<SSTableReader>() + { + public boolean apply(SSTableReader reader) + { + return removed.equals(reader); + } + }, OperationType.COMPACTION, null); //retrieve sstable references from parent repair session again - removed sstable must not be present - refs = prs.getActiveRepairedSSTableRefs(store.metadata.cfId); - refs = prs.getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.cfId); ++ refs = prs.getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.cfId, prsId); retrieved = Sets.newHashSet(refs.iterator()); assertEquals(newLiveSet, retrieved); assertFalse(retrieved.contains(removed)); refs.release(); } + @Test + public void testAddingMoreSSTables() + { + ColumnFamilyStore store = prepareColumnFamilyStore(); + Set<SSTableReader> original = store.getUnrepairedSSTables(); + UUID prsId = UUID.randomUUID(); - ActiveRepairService.instance.registerParentRepairSession(prsId, FBUtilities.getBroadcastAddress(), Collections.singletonList(store), null); ++ ActiveRepairService.instance.registerParentRepairSession(prsId, FBUtilities.getBroadcastAddress(), Collections.singletonList(store), null, true, true); + ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(prsId); + prs.markSSTablesRepairing(store.metadata.cfId, prsId); - try (Refs<SSTableReader> refs = prs.getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.cfId)) ++ try (Refs<SSTableReader> refs = prs.getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.cfId, prsId)) + { + Set<SSTableReader> retrieved = Sets.newHashSet(refs.iterator()); + assertEquals(original, retrieved); + } + createSSTables(store, 2); + boolean exception = false; + try + { + UUID newPrsId = UUID.randomUUID(); - ActiveRepairService.instance.registerParentRepairSession(newPrsId, FBUtilities.getBroadcastAddress(), Collections.singletonList(store), null); ++ ActiveRepairService.instance.registerParentRepairSession(newPrsId, FBUtilities.getBroadcastAddress(), Collections.singletonList(store), null, true, true); + ActiveRepairService.instance.getParentRepairSession(newPrsId).markSSTablesRepairing(store.metadata.cfId, newPrsId); + } + catch (Throwable t) + { + exception = true; + } + assertTrue(exception); + - try (Refs<SSTableReader> refs = prs.getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.cfId)) ++ try (Refs<SSTableReader> refs = prs.getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.cfId, prsId)) + { + Set<SSTableReader> retrieved = Sets.newHashSet(refs.iterator()); + assertEquals(original, retrieved); + } + } + ++ @Test ++ public void testSnapshotAddSSTables() throws ExecutionException, InterruptedException ++ { ++ ColumnFamilyStore store = prepareColumnFamilyStore(); ++ UUID prsId = UUID.randomUUID(); ++ Set<SSTableReader> original = store.getUnrepairedSSTables(); ++ ActiveRepairService.instance.registerParentRepairSession(prsId, FBUtilities.getBroadcastAddress(), Collections.singletonList(store), Collections.singleton(new Range<>(store.partitioner.getMinimumToken(), store.partitioner.getMinimumToken())), true, true); ++ ActiveRepairService.instance.getParentRepairSession(prsId).maybeSnapshot(store.metadata.cfId, prsId); ++ ++ UUID prsId2 = UUID.randomUUID(); ++ ActiveRepairService.instance.registerParentRepairSession(prsId2, FBUtilities.getBroadcastAddress(), Collections.singletonList(store), Collections.singleton(new Range<>(store.partitioner.getMinimumToken(), store.partitioner.getMinimumToken())), true, true); ++ createSSTables(store, 2); ++ ActiveRepairService.instance.getParentRepairSession(prsId).maybeSnapshot(store.metadata.cfId, prsId); ++ try (Refs<SSTableReader> refs = ActiveRepairService.instance.getParentRepairSession(prsId).getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.cfId, prsId)) ++ { ++ assertEquals(original, Sets.newHashSet(refs.iterator())); ++ } ++ store.forceMajorCompaction(); ++ // after a major compaction the original sstables will be gone and we will have no sstables to anticompact: ++ try (Refs<SSTableReader> refs = ActiveRepairService.instance.getParentRepairSession(prsId).getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.cfId, prsId)) ++ { ++ assertEquals(0, refs.size()); ++ } ++ } ++ ++ @Test ++ public void testSnapshotMultipleRepairs() ++ { ++ ColumnFamilyStore store = prepareColumnFamilyStore(); ++ Set<SSTableReader> original = store.getUnrepairedSSTables(); ++ UUID prsId = UUID.randomUUID(); ++ ActiveRepairService.instance.registerParentRepairSession(prsId, FBUtilities.getBroadcastAddress(), Collections.singletonList(store), Collections.singleton(new Range<>(store.partitioner.getMinimumToken(), store.partitioner.getMinimumToken())), true, true); ++ ActiveRepairService.instance.getParentRepairSession(prsId).maybeSnapshot(store.metadata.cfId, prsId); ++ ++ UUID prsId2 = UUID.randomUUID(); ++ ActiveRepairService.instance.registerParentRepairSession(prsId2, FBUtilities.getBroadcastAddress(), Collections.singletonList(store), Collections.singleton(new Range<>(store.partitioner.getMinimumToken(), store.partitioner.getMinimumToken())), true, true); ++ boolean exception = false; ++ try ++ { ++ ActiveRepairService.instance.getParentRepairSession(prsId2).maybeSnapshot(store.metadata.cfId, prsId2); ++ } ++ catch (Throwable t) ++ { ++ exception = true; ++ } ++ assertTrue(exception); ++ try (Refs<SSTableReader> refs = ActiveRepairService.instance.getParentRepairSession(prsId).getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.cfId, prsId)) ++ { ++ assertEquals(original, Sets.newHashSet(refs.iterator())); ++ } ++ } ++ private ColumnFamilyStore prepareColumnFamilyStore() { - Keyspace keyspace = Keyspace.open(KEYSPACE1); - ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF); + Keyspace keyspace = Keyspace.open(KEYSPACE5); + ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF_STANDARD1); + store.truncateBlocking(); store.disableAutoCompaction(); + createSSTables(store, 10); + return store; + } + + private void createSSTables(ColumnFamilyStore cfs, int count) + { long timestamp = System.currentTimeMillis(); - //create 10 sstables - for (int i = 0; i < 10; i++) + for (int i = 0; i < count; i++) { DecoratedKey key = Util.dk(Integer.toString(i)); - Mutation rm = new Mutation(KEYSPACE1, key.getKey()); + Mutation rm = new Mutation(KEYSPACE5, key.getKey()); for (int j = 0; j < 10; j++) rm.add("Standard1", Util.cellname(Integer.toString(j)), ByteBufferUtil.EMPTY_BYTE_BUFFER,