Avoid marking too many sstables as repaired. Patch by marcuse; reviewed by Joel Knighton for CASSANDRA-11696.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3c8421a3 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3c8421a3 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3c8421a3 Branch: refs/heads/trunk Commit: 3c8421a3304d44c064c230c329a999373feb0607 Parents: 452d626 Author: Marcus Eriksson <marc...@apache.org> Authored: Fri May 27 09:25:28 2016 +0200 Committer: Marcus Eriksson <marc...@apache.org> Committed: Thu Jun 23 11:08:24 2016 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../db/compaction/CompactionManager.java | 16 ++---- .../cassandra/service/ActiveRepairService.java | 59 ++++++++++++++++++-- .../service/ActiveRepairServiceTest.java | 57 +++++++++++++++---- 4 files changed, 105 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c8421a3/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 9a3779c..03246ae 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,6 @@ 2.1.15 * Prevent select statements with clustering key > 64k (CASSANDRA-11882) + * Avoid marking too many sstables as repaired (CASSANDRA-11696) * Fix clock skew corrupting other nodes with paxos (CASSANDRA-11991) * Remove distinction between non-existing static columns and existing but null in LWTs (CASSANDRA-9842) * Support mlockall on IBM POWER arch (CASSANDRA-11576) http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c8421a3/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index 5af63fe..87819ba 100644 --- 
a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -474,7 +474,7 @@ public class CompactionManager implements CompactionManagerMBean /** * Make sure the {validatedForRepair} are marked for compaction before calling this. * - * Caller must reference the validatedForRepair sstables (via ParentRepairSession.getActiveRepairedSSTableRefs(..)). + * Caller must reference the validatedForRepair sstables (via ParentRepairSession.getActiveRepairedSSTableRefsForAntiCompaction(..)). * * @param cfs * @param ranges Ranges that the repair was carried out on @@ -1030,17 +1030,9 @@ public class CompactionManager implements CompactionManagerMBean sstables = cfs.selectAndReference(ColumnFamilyStore.CANONICAL_SSTABLES).refs; else { - ColumnFamilyStore.RefViewFragment refView = cfs.selectAndReference(ColumnFamilyStore.UNREPAIRED_SSTABLES); - sstables = refView.refs; - Set<SSTableReader> currentlyRepairing = ActiveRepairService.instance.currentlyRepairing(cfs.metadata.cfId, validator.desc.parentSessionId); - - if (!Sets.intersection(currentlyRepairing, Sets.newHashSet(refView.sstables)).isEmpty()) - { - logger.error("Cannot start multiple repair sessions over the same sstables"); - throw new RuntimeException("Cannot start multiple repair sessions over the same sstables"); - } - - ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId).addSSTables(cfs.metadata.cfId, refView.sstables); + ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId); + prs.markSSTablesRepairing(cfs.metadata.cfId, validator.desc.parentSessionId); + sstables = cfs.selectAndReference(ColumnFamilyStore.UNREPAIRED_SSTABLES).refs; } if (validator.gcBefore > 0) http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c8421a3/src/java/org/apache/cassandra/service/ActiveRepairService.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java index 4c83c48..bab244d 100644 --- a/src/java/org/apache/cassandra/service/ActiveRepairService.java +++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java @@ -425,7 +425,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai List<ListenableFuture<?>> futures = new ArrayList<>(); for (Map.Entry<UUID, ColumnFamilyStore> columnFamilyStoreEntry : prs.columnFamilyStores.entrySet()) { - Refs<SSTableReader> sstables = prs.getActiveRepairedSSTableRefs(columnFamilyStoreEntry.getKey()); + Refs<SSTableReader> sstables = prs.getActiveRepairedSSTableRefsForAntiCompaction(columnFamilyStoreEntry.getKey()); ColumnFamilyStore cfs = columnFamilyStoreEntry.getValue(); futures.add(CompactionManager.instance.submitAntiCompaction(cfs, prs.ranges, sstables, prs.repairedAt)); } @@ -465,6 +465,18 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai } } + /** + * We keep a ParentRepairSession around for the duration of the entire repair, for example, on a 256 token vnode rf=3 cluster + * we would have 768 RepairSession but only one ParentRepairSession. We use the PRS to avoid anticompacting the sstables + * 768 times, instead we take all repaired ranges at the end of the repair and anticompact once. + * + * We do an optimistic marking of sstables - when we start an incremental repair we mark all unrepaired sstables as + * repairing (@see markSSTablesRepairing), then while the repair is ongoing compactions might remove those sstables, + * and when it is time for anticompaction we will only anticompact the sstables that are still on disk. 
+ * + * Note that validation and streaming do not care about which sstables we have marked as repairing - they operate on + * all unrepaired sstables (if it is incremental), otherwise we would not get a correct repair. + */ public static class ParentRepairSession { public final Map<UUID, ColumnFamilyStore> columnFamilyStores = new HashMap<>(); @@ -480,13 +492,16 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai * request, we need to fail the coordinator as well. */ public final boolean failed; + /** + * Indicates whether we have marked sstables as repairing. Can only be done once per table per ParentRepairSession + */ + private final Set<UUID> marked = new HashSet<>(); public ParentRepairSession(InetAddress coordinator, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, long repairedAt, boolean failed) { this.coordinator = coordinator; for (ColumnFamilyStore cfs : columnFamilyStores) { - this.columnFamilyStores.put(cfs.metadata.cfId, cfs); sstableMap.put(cfs.metadata.cfId, new HashSet<String>()); } @@ -500,9 +515,18 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai this(coordinator, columnFamilyStores, ranges, repairedAt, false); } + /** + * Gets the repairing sstables for anticompaction. + * + * Note that validation and streaming uses the real unrepaired sstables. 
+ * + * @param cfId + * @return + */ @SuppressWarnings("resource") - public synchronized Refs<SSTableReader> getActiveRepairedSSTableRefs(UUID cfId) + public synchronized Refs<SSTableReader> getActiveRepairedSSTableRefsForAntiCompaction(UUID cfId) { + assert marked.contains(cfId); ImmutableMap.Builder<SSTableReader, Ref<SSTableReader>> references = ImmutableMap.builder(); for (SSTableReader sstable : getActiveSSTables(cfId)) { @@ -515,6 +539,30 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai return new Refs<>(references.build()); } + /** + * Marks all the unrepaired sstables as repairing unless we have already done so. + * + * Any of these sstables that are still on disk are then anticompacted once the streaming and validation phases are done. + * + * @param cfId + * @param parentSessionId used to check that we don't start multiple inc repair sessions over the same sstables + */ + public synchronized void markSSTablesRepairing(UUID cfId, UUID parentSessionId) + { + if (!marked.contains(cfId)) + { + List<SSTableReader> sstables = columnFamilyStores.get(cfId).select(ColumnFamilyStore.UNREPAIRED_SSTABLES).sstables; + Set<SSTableReader> currentlyRepairing = ActiveRepairService.instance.currentlyRepairing(cfId, parentSessionId); + if (!Sets.intersection(currentlyRepairing, Sets.newHashSet(sstables)).isEmpty()) + { + logger.error("Cannot start multiple repair sessions over the same sstables"); + throw new RuntimeException("Cannot start multiple repair sessions over the same sstables"); + } + addSSTables(cfId, sstables); + marked.add(cfId); + } + } + private Set<SSTableReader> getActiveSSTables(UUID cfId) { if (failed) @@ -534,12 +582,10 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai return activeSSTables; } - public void addSSTables(UUID cfId, Collection<SSTableReader> sstables) + private void addSSTables(UUID cfId, Collection<SSTableReader> sstables) { for (SSTableReader sstable : sstables) - { 
sstableMap.get(cfId).add(sstable.getFilename()); - } } public ParentRepairSession asFailed() @@ -556,6 +602,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai ", repairedAt=" + repairedAt + '}'; } + } /* http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c8421a3/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java b/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java index 26e5126..cf64322 100644 --- a/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java +++ b/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java @@ -42,6 +42,7 @@ import org.apache.cassandra.utils.concurrent.Refs; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; public class ActiveRepairServiceTest extends SchemaLoader { @@ -58,12 +59,9 @@ public class ActiveRepairServiceTest extends SchemaLoader UUID prsId = UUID.randomUUID(); ActiveRepairService.instance.registerParentRepairSession(prsId, FBUtilities.getBroadcastAddress(), Collections.singletonList(store), null); ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(prsId); - - //add all sstables to parent repair session - prs.addSSTables(store.metadata.cfId, original); - + prs.markSSTablesRepairing(store.metadata.cfId, prsId); //retrieve all sstable references from parent repair sessions - Refs<SSTableReader> refs = prs.getActiveRepairedSSTableRefs(store.metadata.cfId); + Refs<SSTableReader> refs = prs.getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.cfId); Set<SSTableReader> retrieved = Sets.newHashSet(refs.iterator()); assertEquals(original, retrieved); refs.release(); @@ -76,22 +74,62 @@ public class ActiveRepairServiceTest extends SchemaLoader 
store.getDataTracker().replaceWithNewInstances(Collections.singleton(removed), Collections.EMPTY_SET); //retrieve sstable references from parent repair session again - removed sstable must not be present - refs = prs.getActiveRepairedSSTableRefs(store.metadata.cfId); + refs = prs.getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.cfId); retrieved = Sets.newHashSet(refs.iterator()); assertEquals(newLiveSet, retrieved); assertFalse(retrieved.contains(removed)); refs.release(); } + @Test + public void testAddingMoreSSTables() + { + ColumnFamilyStore store = prepareColumnFamilyStore(); + Set<SSTableReader> original = store.getUnrepairedSSTables(); + UUID prsId = UUID.randomUUID(); + ActiveRepairService.instance.registerParentRepairSession(prsId, FBUtilities.getBroadcastAddress(), Collections.singletonList(store), null); + ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(prsId); + prs.markSSTablesRepairing(store.metadata.cfId, prsId); + try (Refs<SSTableReader> refs = prs.getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.cfId)) + { + Set<SSTableReader> retrieved = Sets.newHashSet(refs.iterator()); + assertEquals(original, retrieved); + } + createSSTables(store, 2); + boolean exception = false; + try + { + UUID newPrsId = UUID.randomUUID(); + ActiveRepairService.instance.registerParentRepairSession(newPrsId, FBUtilities.getBroadcastAddress(), Collections.singletonList(store), null); + ActiveRepairService.instance.getParentRepairSession(newPrsId).markSSTablesRepairing(store.metadata.cfId, newPrsId); + } + catch (Throwable t) + { + exception = true; + } + assertTrue(exception); + + try (Refs<SSTableReader> refs = prs.getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.cfId)) + { + Set<SSTableReader> retrieved = Sets.newHashSet(refs.iterator()); + assertEquals(original, retrieved); + } + } + private ColumnFamilyStore prepareColumnFamilyStore() { Keyspace keyspace = Keyspace.open(KEYSPACE1); 
ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF); store.truncateBlocking(); store.disableAutoCompaction(); + createSSTables(store, 10); + return store; + } + + private void createSSTables(ColumnFamilyStore cfs, int count) + { long timestamp = System.currentTimeMillis(); - //create 10 sstables - for (int i = 0; i < 10; i++) + for (int i = 0; i < count; i++) { DecoratedKey key = Util.dk(Integer.toString(i)); Mutation rm = new Mutation(KEYSPACE1, key.getKey()); @@ -101,8 +139,7 @@ public class ActiveRepairServiceTest extends SchemaLoader timestamp, 0); rm.apply(); - store.forceBlockingFlush(); + cfs.forceBlockingFlush(); } - return store; } }