Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.2 a24bd6c6a -> 477014b8c
  refs/heads/cassandra-3.0 632859080 -> 841a80311
  refs/heads/cassandra-3.5 70649a8d6 -> 57cbda0a4
  refs/heads/trunk 70649a8d6 -> 57cbda0a4
Avoid leaking references during parallel repairs patch by Marcus Olsson; reviewed by Marcus Eriksson for CASSANDRA-11215 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/477014b8 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/477014b8 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/477014b8 Branch: refs/heads/cassandra-2.2 Commit: 477014b8cddb8cf1a73a7e8b408f130b64a37c6b Parents: a24bd6c Author: Marcus Olsson <marcus.ols...@ericsson.com> Authored: Tue Feb 23 14:30:12 2016 +0100 Committer: Marcus Eriksson <marc...@apache.org> Committed: Tue Mar 1 13:30:35 2016 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../db/compaction/CompactionManager.java | 69 ++++++++++++-------- 2 files changed, 42 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/477014b8/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 103ac16..a50f256 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,6 @@ 2.2.6 * Preserve order for preferred SSL cipher suites (CASSANDRA-11164) + * Reference leak with parallel repairs on the same table (CASSANDRA-11215) * Range.compareTo() violates the contract of Comparable (CASSANDRA-11216) * Avoid NPE when serializing ErrorMessage with null message (CASSANDRA-11167) * Replacing an aggregate with a new version doesn't reset INITCOND (CASSANDRA-10840) http://git-wip-us.apache.org/repos/asf/cassandra/blob/477014b8/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index 9ca4406..b015bcd 100644 --- 
a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -1075,35 +1075,9 @@ public class CompactionManager implements CompactionManagerMBean { // flush first so everyone is validating data that is as similar as possible StorageService.instance.forceKeyspaceFlush(cfs.keyspace.getName(), cfs.name); - ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId); - ColumnFamilyStore.RefViewFragment sstableCandidates = cfs.selectAndReference(prs.isIncremental ? ColumnFamilyStore.UNREPAIRED_SSTABLES : ColumnFamilyStore.CANONICAL_SSTABLES); - Set<SSTableReader> sstablesToValidate = new HashSet<>(); - - for (SSTableReader sstable : sstableCandidates.sstables) - { - if (new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(Collections.singletonList(validator.desc.range))) - { - sstablesToValidate.add(sstable); - } - } - - Set<SSTableReader> currentlyRepairing = ActiveRepairService.instance.currentlyRepairing(cfs.metadata.cfId, validator.desc.parentSessionId); - - if (!Sets.intersection(currentlyRepairing, sstablesToValidate).isEmpty()) - { - logger.error("Cannot start multiple repair sessions over the same sstables"); - throw new RuntimeException("Cannot start multiple repair sessions over the same sstables"); - } - - sstables = Refs.tryRef(sstablesToValidate); + sstables = getSSTablesToValidate(cfs, validator); if (sstables == null) - { - logger.error("Could not reference sstables"); - throw new RuntimeException("Could not reference sstables"); - } - sstableCandidates.release(); - prs.addSSTables(cfs.metadata.cfId, sstablesToValidate); - + return; // this means the parent repair session was removed - the repair session failed on another node and we removed it if (validator.gcBefore > 0) gcBefore = validator.gcBefore; else @@ -1170,6 +1144,45 @@ public class CompactionManager implements 
CompactionManagerMBean } } + private synchronized Refs<SSTableReader> getSSTablesToValidate(ColumnFamilyStore cfs, Validator validator) + { + Refs<SSTableReader> sstables; + + ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId); + if (prs == null) + return null; + Set<SSTableReader> sstablesToValidate = new HashSet<>(); + try (ColumnFamilyStore.RefViewFragment sstableCandidates = cfs.selectAndReference(prs.isIncremental ? ColumnFamilyStore.UNREPAIRED_SSTABLES : ColumnFamilyStore.CANONICAL_SSTABLES)) + { + for (SSTableReader sstable : sstableCandidates.sstables) + { + if (new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(Collections.singletonList(validator.desc.range))) + { + sstablesToValidate.add(sstable); + } + } + + Set<SSTableReader> currentlyRepairing = ActiveRepairService.instance.currentlyRepairing(cfs.metadata.cfId, validator.desc.parentSessionId); + + if (!Sets.intersection(currentlyRepairing, sstablesToValidate).isEmpty()) + { + logger.error("Cannot start multiple repair sessions over the same sstables"); + throw new RuntimeException("Cannot start multiple repair sessions over the same sstables"); + } + + sstables = Refs.tryRef(sstablesToValidate); + if (sstables == null) + { + logger.error("Could not reference sstables"); + throw new RuntimeException("Could not reference sstables"); + } + } + + prs.addSSTables(cfs.metadata.cfId, sstablesToValidate); + + return sstables; + } + /** * Splits up an sstable into two new sstables. The first of the new tables will store repaired ranges, the second * will store the non-repaired ranges. Once anticompation is completed, the original sstable is marked as compacted