Repository: cassandra Updated Branches: refs/heads/trunk 7cc51f7ae -> 2818ca4cf
Pick sstables to validate as late as possible with inc repairs Patch by marcuse; reviewed by yukim for CASSANDRA-8366 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2f7077c0 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2f7077c0 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2f7077c0 Branch: refs/heads/trunk Commit: 2f7077c06ccbd5e8e7259c6891fe98d83ec3359d Parents: 33279dd Author: Marcus Eriksson <marc...@apache.org> Authored: Tue Feb 17 16:20:35 2015 +0100 Committer: Marcus Eriksson <marc...@apache.org> Committed: Tue Mar 3 10:32:46 2015 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/db/ColumnFamilyStore.java | 14 +++++++ .../db/compaction/CompactionManager.java | 14 ++++++- .../cassandra/service/ActiveRepairService.java | 41 +++++++------------- 4 files changed, 42 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f7077c0/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 15a5a61..c3c7a19 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.1.4 + * Pick sstables for validation as late as possible inc repairs (CASSANDRA-8366) * Fix commitlog getPendingTasks to not increment (CASSANDRA-8856) * Fix parallelism adjustment in range and secondary index queries when the first fetch does not satisfy the limit (CASSANDRA-8856) http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f7077c0/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 62aadf9..e4531f2 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -2926,4 +2926,18 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean return new ArrayList<>(view.sstables); } }; + + public static final Function<DataTracker.View, List<SSTableReader>> UNREPAIRED_SSTABLES = new Function<DataTracker.View, List<SSTableReader>>() + { + public List<SSTableReader> apply(DataTracker.View view) + { + List<SSTableReader> sstables = new ArrayList<>(); + for (SSTableReader sstable : view.sstables) + { + if (!sstable.isRepaired()) + sstables.add(sstable); + } + return sstables; + } + }; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f7077c0/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index 68313a3..e54a25f 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -956,7 +956,19 @@ public class CompactionManager implements CompactionManagerMBean if (validator.desc.parentSessionId == null || ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId) == null) sstables = cfs.selectAndReference(ColumnFamilyStore.ALL_SSTABLES).refs; else - sstables = ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId).getAndReferenceSSTables(cfs.metadata.cfId); + { + ColumnFamilyStore.RefViewFragment refView = cfs.selectAndReference(ColumnFamilyStore.UNREPAIRED_SSTABLES); + sstables = refView.refs; + Set<SSTableReader> currentlyRepairing = ActiveRepairService.instance.currentlyRepairing(cfs.metadata.cfId, validator.desc.parentSessionId); + + if (!Sets.intersection(currentlyRepairing, Sets.newHashSet(refView.sstables)).isEmpty()) + { + logger.error("Cannot start multiple repair sessions over the same sstables"); + throw new RuntimeException("Cannot start multiple repair sessions over the same sstables"); + } + + ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId).addSSTables(cfs.metadata.cfId, refView.sstables); + } if (validator.gcBefore > 0) gcBefore = validator.gcBefore; http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f7077c0/src/java/org/apache/cassandra/service/ActiveRepairService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java index bf1cdd6..f71cb6b 100644 --- a/src/java/org/apache/cassandra/service/ActiveRepairService.java +++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java @@ -305,38 +305,16 @@ public class ActiveRepairService public synchronized void registerParentRepairSession(UUID parentRepairSession, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges) { - Map<UUID, Set<SSTableReader>> sstablesToRepair = new HashMap<>(); - for (ColumnFamilyStore cfs : columnFamilyStores) - { - Set<SSTableReader> sstables = new HashSet<>(); - Set<SSTableReader> currentlyRepairing = currentlyRepairing(cfs.metadata.cfId); - for (SSTableReader sstable : cfs.getSSTables()) - { - if (new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(ranges)) - { - if (!sstable.isRepaired()) - { - if (currentlyRepairing.contains(sstable)) - { - logger.error("Already repairing "+sstable+", can not continue."); - throw new RuntimeException("Already repairing "+sstable+", can not continue."); - } - sstables.add(sstable); - } - } - } - sstablesToRepair.put(cfs.metadata.cfId, sstables); - } - parentRepairSessions.put(parentRepairSession, new ParentRepairSession(columnFamilyStores, ranges, sstablesToRepair, System.currentTimeMillis())); + parentRepairSessions.put(parentRepairSession, new ParentRepairSession(columnFamilyStores, ranges, System.currentTimeMillis())); } - private Set<SSTableReader> currentlyRepairing(UUID cfId) + public Set<SSTableReader> currentlyRepairing(UUID cfId, UUID parentRepairSession) { Set<SSTableReader> repairing = new HashSet<>(); for (Map.Entry<UUID, ParentRepairSession> entry : parentRepairSessions.entrySet()) { Collection<SSTableReader> sstables = entry.getValue().sstableMap.get(cfId); - if (sstables != null) + if (sstables != null && !entry.getKey().equals(parentRepairSession)) repairing.addAll(sstables); } return repairing; @@ -419,12 +397,12 @@ public class ActiveRepairService public final Map<UUID, Set<SSTableReader>> sstableMap; public final long repairedAt; - public ParentRepairSession(List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, Map<UUID, Set<SSTableReader>> sstables, long repairedAt) + public ParentRepairSession(List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, long repairedAt) { for (ColumnFamilyStore cfs : columnFamilyStores) this.columnFamilyStores.put(cfs.metadata.cfId, cfs); this.ranges = ranges; - this.sstableMap = sstables; + this.sstableMap = new HashMap<>(); this.repairedAt = repairedAt; } @@ -452,6 +430,15 @@ public class ActiveRepairService return new Refs<>(references.build()); } + public void addSSTables(UUID cfId, Collection<SSTableReader> sstables) + { + Set<SSTableReader> existingSSTables = this.sstableMap.get(cfId); + if (existingSSTables == null) + existingSSTables = new HashSet<>(); + existingSSTables.addAll(sstables); + this.sstableMap.put(cfId, existingSSTables); + } + @Override public String toString() {