This is an automated email from the ASF dual-hosted git repository. dcapwell pushed a commit to branch cassandra-4.0 in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-4.0 by this push: new c14abb40b2 IR may leak SSTables with pending repair when coming from streaming c14abb40b2 is described below commit c14abb40b2d0e2e1db121eac65a1264a287bcd18 Author: David Capwell <dcapw...@apache.org> AuthorDate: Wed May 1 09:47:06 2024 -0700 IR may leak SSTables with pending repair when coming from streaming patch by David Capwell; reviewed by Blake Eggleston for CASSANDRA-19182 --- CHANGES.txt | 1 + .../db/compaction/AbstractStrategyHolder.java | 1 + .../db/compaction/CompactionStrategyHolder.java | 6 ++++ .../db/compaction/CompactionStrategyManager.java | 18 +++++++++++- .../db/compaction/PendingRepairHolder.java | 13 ++++++++- .../db/compaction/PendingRepairManager.java | 11 ++++++- .../org/apache/cassandra/db/lifecycle/Tracker.java | 2 ++ .../distributed/test/DistributedRepairUtils.java | 34 ++++++++++++++++++++++ .../distributed/test/RepairCoordinatorFast.java | 3 ++ .../test/RepairCoordinatorNeighbourDown.java | 3 ++ 10 files changed, 89 insertions(+), 3 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 64c63912ba..5db267e099 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0.13 + * IR may leak SSTables with pending repair when coming from streaming (CASSANDRA-19182) * Streaming exception race creates corrupt transaction log files that prevent restart (CASSANDRA-18736) * Fix CQL tojson timestamp output on negative timestamp values before Gregorian calendar reform in 1582 (CASSANDRA-19566) * Fix few types issues and implement types compatibility tests (CASSANDRA-19479) diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractStrategyHolder.java b/src/java/org/apache/cassandra/db/compaction/AbstractStrategyHolder.java index 95fc7b85b0..824b22f9a5 100644 --- a/src/java/org/apache/cassandra/db/compaction/AbstractStrategyHolder.java +++ b/src/java/org/apache/cassandra/db/compaction/AbstractStrategyHolder.java @@ -181,6 +181,7 @@ public abstract class AbstractStrategyHolder return new GroupedSSTableContainer(this); } + public abstract void addSSTable(SSTableReader sstable); public abstract void addSSTables(GroupedSSTableContainer sstables); public abstract void removeSSTables(GroupedSSTableContainer sstables); diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyHolder.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyHolder.java index 129ee797ee..a4084a37d7 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyHolder.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyHolder.java @@ -138,6 +138,12 @@ public class CompactionStrategyHolder extends AbstractStrategyHolder return tasks; } + @Override + public void addSSTable(SSTableReader sstable) + { + getStrategyFor(sstable).addSSTable(sstable); + } + @Override public void addSSTables(GroupedSSTableContainer sstables) { diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java index 99e2ce996f..93105c8a11 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java @@ -434,6 +434,20 @@ public class CompactionStrategyManager implements INotificationConsumer } } + @VisibleForTesting + public boolean hasPendingRepairSSTable(UUID sessionID, SSTableReader sstable) + { + readLock.lock(); + try + { + return pendingRepairs.hasPendingRepairSSTable(sessionID, sstable) || transientRepairs.hasPendingRepairSSTable(sessionID, sstable); + } + finally + { + readLock.unlock(); + } + } + public void shutdown() { writeLock.lock(); @@ -608,7 +622,7 @@ public class CompactionStrategyManager implements INotificationConsumer private void handleFlushNotification(Iterable<SSTableReader> added) { for (SSTableReader sstable : added) - compactionStrategyFor(sstable).addSSTable(sstable); + getHolder(sstable).addSSTable(sstable); } private int getHolderIndex(SSTableReader sstable) @@ -1147,6 +1161,8 @@ public class CompactionStrategyManager implements INotificationConsumer */ public void mutateRepaired(Collection<SSTableReader> sstables, long repairedAt, UUID pendingRepair, boolean isTransient) throws IOException { + if (sstables.isEmpty()) + return; Set<SSTableReader> changed = new HashSet<>(); writeLock.lock(); diff --git a/src/java/org/apache/cassandra/db/compaction/PendingRepairHolder.java b/src/java/org/apache/cassandra/db/compaction/PendingRepairHolder.java index 03d4111745..d8a561bc40 100644 --- a/src/java/org/apache/cassandra/db/compaction/PendingRepairHolder.java +++ b/src/java/org/apache/cassandra/db/compaction/PendingRepairHolder.java @@ -30,7 +30,6 @@ import com.google.common.collect.Iterables; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.SerializationHeader; import org.apache.cassandra.db.lifecycle.LifecycleNewTracker; -import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.index.Index; @@ -149,6 +148,13 @@ public class PendingRepairHolder extends AbstractStrategyHolder return tasks; } + @Override + public void addSSTable(SSTableReader sstable) + { + Preconditions.checkArgument(managesSSTable(sstable), "Attempting to add sstable from wrong holder"); + managers.get(router.getIndexForSSTable(sstable)).addSSTable(sstable); + } + AbstractCompactionTask getNextRepairFinishedTask() { List<TaskSupplier> repairFinishedSuppliers = getRepairFinishedTaskSuppliers(); @@ -282,4 +288,9 @@ public class PendingRepairHolder extends AbstractStrategyHolder { return Iterables.any(managers, prm -> prm.containsSSTable(sstable)); } + + public boolean hasPendingRepairSSTable(UUID sessionID, SSTableReader sstable) + { + return Iterables.any(managers, prm -> prm.hasPendingRepairSSTable(sessionID, sstable)); + } } diff --git a/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java b/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java index aefa40be80..bbc7198fb4 100644 --- a/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java +++ b/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java @@ -464,7 +464,7 @@ class PendingRepairManager public synchronized boolean hasDataForSession(UUID sessionID) { - return strategies.keySet().contains(sessionID); + return strategies.containsKey(sessionID); } boolean containsSSTable(SSTableReader sstable) @@ -482,6 +482,15 @@ class PendingRepairManager return group.entrySet().stream().map(g -> strategies.get(g.getKey()).getUserDefinedTask(g.getValue(), gcBefore)).collect(Collectors.toList()); } + @VisibleForTesting + public synchronized boolean hasPendingRepairSSTable(UUID sessionID, SSTableReader sstable) + { + AbstractCompactionStrategy strat = strategies.get(sessionID); + if (strat == null) + return false; + return strat.getSSTables().contains(sstable); + } + /** * promotes/demotes sstables involved in a consistent repair that has been finalized, or failed */ diff --git a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java index 3d72a113b8..15cec5d63b 100644 --- a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java +++ b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java @@ -460,6 +460,8 @@ public class Tracker public void notifySSTableRepairedStatusChanged(Collection<SSTableReader> repairStatusesChanged) { + if (repairStatusesChanged.isEmpty()) + return; INotification notification = new SSTableRepairStatusChanged(repairStatusesChanged); for (INotificationConsumer subscriber : subscribers) subscriber.handleNotification(notification, this); diff --git a/test/distributed/org/apache/cassandra/distributed/test/DistributedRepairUtils.java b/test/distributed/org/apache/cassandra/distributed/test/DistributedRepairUtils.java index fce67b52d1..5526d97e1d 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/DistributedRepairUtils.java +++ b/test/distributed/org/apache/cassandra/distributed/test/DistributedRepairUtils.java @@ -20,12 +20,15 @@ package org.apache.cassandra.distributed.test; import java.util.Collections; import java.util.Set; +import java.util.UUID; import java.util.function.Consumer; import com.google.common.collect.ImmutableSet; import org.apache.commons.lang3.ArrayUtils; import org.junit.Assert; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.distributed.api.ConsistencyLevel; import org.apache.cassandra.distributed.api.ICluster; import org.apache.cassandra.distributed.api.IInvokableInstance; @@ -33,7 +36,10 @@ import org.apache.cassandra.distributed.api.NodeToolResult; import org.apache.cassandra.distributed.api.QueryResult; import org.apache.cassandra.distributed.api.Row; import org.apache.cassandra.distributed.impl.AbstractCluster; +import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.metrics.StorageMetrics; +import org.apache.cassandra.repair.consistent.LocalSession; +import org.apache.cassandra.service.ActiveRepairService; import static org.apache.cassandra.utils.Retry.retryWithBackoffBlocking; @@ -167,6 +173,34 @@ public final class DistributedRepairUtils Assert.assertFalse("Only one repair expected, but found more than one", rs.hasNext()); } + public static void assertNoSSTableLeak(ICluster<IInvokableInstance> cluster, String ks, String table) + { + cluster.forEach(i -> { + String name = "node" + i.config().num(); + i.forceCompact(ks, table); // cleanup happens in compaction, so run before checking + i.runOnInstance(() -> { + ColumnFamilyStore cfs = Keyspace.open(ks).getColumnFamilyStore(table); + for (SSTableReader sstable : cfs.getTracker().getView().liveSSTables()) + { + UUID pendingRepair = sstable.getSSTableMetadata().pendingRepair; + if (pendingRepair == null) + continue; + LocalSession session = ActiveRepairService.instance.consistent.local.getSession(pendingRepair); + // repair maybe async, so some participates may still think the repair is active, which means the sstable SHOULD link to it + if (session != null && !session.isCompleted()) + continue; + // The session is complete, yet the sstable is not updated... is this still pending in compaction? + if (cfs.getCompactionStrategyManager().hasPendingRepairSSTable(pendingRepair, sstable)) + continue; + // compaction does not know about the pending repair... race condition since this check started? + if (sstable.getSSTableMetadata().pendingRepair == null) + continue; // yep, race condition... ignore + throw new AssertionError(String.format("%s had leak detected on sstable %s", name, sstable.descriptor)); + } + }); + }); + } + public enum RepairType { FULL { public String[] append(String... args) diff --git a/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorFast.java b/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorFast.java index 0e156dab98..5516fc8748 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorFast.java +++ b/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorFast.java @@ -40,6 +40,7 @@ import org.apache.cassandra.service.StorageService; import static java.lang.String.format; import static org.apache.cassandra.distributed.api.IMessageFilters.Matcher.of; +import static org.apache.cassandra.distributed.test.DistributedRepairUtils.assertNoSSTableLeak; import static org.apache.cassandra.distributed.test.DistributedRepairUtils.assertParentRepairFailedWithMessageContains; import static org.apache.cassandra.distributed.test.DistributedRepairUtils.assertParentRepairNotExist; import static org.apache.cassandra.distributed.test.DistributedRepairUtils.assertParentRepairSuccess; @@ -82,6 +83,7 @@ public abstract class RepairCoordinatorFast extends RepairCoordinatorBase } Assert.assertEquals(repairExceptions, getRepairExceptions(CLUSTER, 2)); + assertNoSSTableLeak(CLUSTER, KEYSPACE, table); }); } @@ -398,6 +400,7 @@ public abstract class RepairCoordinatorFast extends RepairCoordinatorBase { assertParentRepairNotExist(CLUSTER, KEYSPACE, table); } + assertNoSSTableLeak(CLUSTER, KEYSPACE, table); } finally { diff --git a/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorNeighbourDown.java b/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorNeighbourDown.java index 4228806fa7..590c65aa72 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorNeighbourDown.java +++ b/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorNeighbourDown.java @@ -40,6 +40,7 @@ import org.apache.cassandra.utils.FBUtilities; import static java.lang.String.format; import static org.apache.cassandra.distributed.api.IMessageFilters.Matcher.of; +import static org.apache.cassandra.distributed.test.DistributedRepairUtils.assertNoSSTableLeak; import static org.apache.cassandra.distributed.test.DistributedRepairUtils.assertParentRepairFailedWithMessageContains; import static org.apache.cassandra.distributed.test.DistributedRepairUtils.assertParentRepairNotExist; import static org.apache.cassandra.distributed.test.DistributedRepairUtils.getRepairExceptions; @@ -125,6 +126,7 @@ public abstract class RepairCoordinatorNeighbourDown extends RepairCoordinatorBa { assertParentRepairNotExist(CLUSTER, KEYSPACE, table); } + assertNoSSTableLeak(CLUSTER, KEYSPACE, table); }); } @@ -184,6 +186,7 @@ public abstract class RepairCoordinatorNeighbourDown extends RepairCoordinatorBa { assertParentRepairNotExist(CLUSTER, KEYSPACE, table); } + assertNoSSTableLeak(CLUSTER, KEYSPACE, table); }); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org