This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch cassandra-4.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-4.0 by this push:
     new c14abb40b2 IR may leak SSTables with pending repair when coming from streaming
c14abb40b2 is described below

commit c14abb40b2d0e2e1db121eac65a1264a287bcd18
Author: David Capwell <dcapw...@apache.org>
AuthorDate: Wed May 1 09:47:06 2024 -0700

    IR may leak SSTables with pending repair when coming from streaming
    
    patch by David Capwell; reviewed by Blake Eggleston for CASSANDRA-19182
---
 CHANGES.txt                                        |  1 +
 .../db/compaction/AbstractStrategyHolder.java      |  1 +
 .../db/compaction/CompactionStrategyHolder.java    |  6 ++++
 .../db/compaction/CompactionStrategyManager.java   | 18 +++++++++++-
 .../db/compaction/PendingRepairHolder.java         | 13 ++++++++-
 .../db/compaction/PendingRepairManager.java        | 11 ++++++-
 .../org/apache/cassandra/db/lifecycle/Tracker.java |  2 ++
 .../distributed/test/DistributedRepairUtils.java   | 34 ++++++++++++++++++++++
 .../distributed/test/RepairCoordinatorFast.java    |  3 ++
 .../test/RepairCoordinatorNeighbourDown.java       |  3 ++
 10 files changed, 89 insertions(+), 3 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 64c63912ba..5db267e099 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0.13
+ * IR may leak SSTables with pending repair when coming from streaming (CASSANDRA-19182)
  * Streaming exception race creates corrupt transaction log files that prevent restart (CASSANDRA-18736)
  * Fix CQL tojson timestamp output on negative timestamp values before Gregorian calendar reform in 1582 (CASSANDRA-19566)
  * Fix few types issues and implement types compatibility tests (CASSANDRA-19479)
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractStrategyHolder.java b/src/java/org/apache/cassandra/db/compaction/AbstractStrategyHolder.java
index 95fc7b85b0..824b22f9a5 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractStrategyHolder.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractStrategyHolder.java
@@ -181,6 +181,7 @@ public abstract class AbstractStrategyHolder
         return new GroupedSSTableContainer(this);
     }
 
+    public abstract void addSSTable(SSTableReader sstable);
     public abstract void addSSTables(GroupedSSTableContainer sstables);
 
     public abstract void removeSSTables(GroupedSSTableContainer sstables);
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyHolder.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyHolder.java
index 129ee797ee..a4084a37d7 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyHolder.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyHolder.java
@@ -138,6 +138,12 @@ public class CompactionStrategyHolder extends AbstractStrategyHolder
         return tasks;
     }
 
+    @Override
+    public void addSSTable(SSTableReader sstable)
+    {
+        getStrategyFor(sstable).addSSTable(sstable);
+    }
+
     @Override
     public void addSSTables(GroupedSSTableContainer sstables)
     {
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index 99e2ce996f..93105c8a11 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@ -434,6 +434,20 @@ public class CompactionStrategyManager implements INotificationConsumer
         }
     }
 
+    @VisibleForTesting
+    public boolean hasPendingRepairSSTable(UUID sessionID, SSTableReader sstable)
+    {
+        readLock.lock();
+        try
+        {
+            return pendingRepairs.hasPendingRepairSSTable(sessionID, sstable) || transientRepairs.hasPendingRepairSSTable(sessionID, sstable);
+        }
+        finally
+        {
+            readLock.unlock();
+        }
+    }
+
     public void shutdown()
     {
         writeLock.lock();
@@ -608,7 +622,7 @@ public class CompactionStrategyManager implements INotificationConsumer
     private void handleFlushNotification(Iterable<SSTableReader> added)
     {
         for (SSTableReader sstable : added)
-            compactionStrategyFor(sstable).addSSTable(sstable);
+            getHolder(sstable).addSSTable(sstable);
     }
 
     private int getHolderIndex(SSTableReader sstable)
@@ -1147,6 +1161,8 @@ public class CompactionStrategyManager implements INotificationConsumer
       */
     public void mutateRepaired(Collection<SSTableReader> sstables, long repairedAt, UUID pendingRepair, boolean isTransient) throws IOException
     {
+        if (sstables.isEmpty())
+            return;
         Set<SSTableReader> changed = new HashSet<>();
 
         writeLock.lock();
diff --git a/src/java/org/apache/cassandra/db/compaction/PendingRepairHolder.java b/src/java/org/apache/cassandra/db/compaction/PendingRepairHolder.java
index 03d4111745..d8a561bc40 100644
--- a/src/java/org/apache/cassandra/db/compaction/PendingRepairHolder.java
+++ b/src/java/org/apache/cassandra/db/compaction/PendingRepairHolder.java
@@ -30,7 +30,6 @@ import com.google.common.collect.Iterables;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.SerializationHeader;
 import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
-import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.index.Index;
@@ -149,6 +148,13 @@ public class PendingRepairHolder extends AbstractStrategyHolder
         return tasks;
     }
 
+    @Override
+    public void addSSTable(SSTableReader sstable)
+    {
+        Preconditions.checkArgument(managesSSTable(sstable), "Attempting to add sstable from wrong holder");
+        managers.get(router.getIndexForSSTable(sstable)).addSSTable(sstable);
+    }
+
     AbstractCompactionTask getNextRepairFinishedTask()
     {
         List<TaskSupplier> repairFinishedSuppliers = getRepairFinishedTaskSuppliers();
@@ -282,4 +288,9 @@ public class PendingRepairHolder extends AbstractStrategyHolder
     {
         return Iterables.any(managers, prm -> prm.containsSSTable(sstable));
     }
+
+    public boolean hasPendingRepairSSTable(UUID sessionID, SSTableReader sstable)
+    {
+        return Iterables.any(managers, prm -> prm.hasPendingRepairSSTable(sessionID, sstable));
+    }
 }
diff --git a/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java b/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java
index aefa40be80..bbc7198fb4 100644
--- a/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java
@@ -464,7 +464,7 @@ class PendingRepairManager
 
     public synchronized boolean hasDataForSession(UUID sessionID)
     {
-        return strategies.keySet().contains(sessionID);
+        return strategies.containsKey(sessionID);
     }
 
     boolean containsSSTable(SSTableReader sstable)
@@ -482,6 +482,15 @@ class PendingRepairManager
         return group.entrySet().stream().map(g -> strategies.get(g.getKey()).getUserDefinedTask(g.getValue(), gcBefore)).collect(Collectors.toList());
     }
 
+    @VisibleForTesting
+    public synchronized boolean hasPendingRepairSSTable(UUID sessionID, SSTableReader sstable)
+    {
+        AbstractCompactionStrategy strat = strategies.get(sessionID);
+        if (strat == null)
+            return false;
+        return strat.getSSTables().contains(sstable);
+    }
+
     /**
      * promotes/demotes sstables involved in a consistent repair that has been finalized, or failed
      */
diff --git a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
index 3d72a113b8..15cec5d63b 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
@@ -460,6 +460,8 @@ public class Tracker
 
     public void notifySSTableRepairedStatusChanged(Collection<SSTableReader> repairStatusesChanged)
     {
+        if (repairStatusesChanged.isEmpty())
+            return;
         INotification notification = new SSTableRepairStatusChanged(repairStatusesChanged);
         for (INotificationConsumer subscriber : subscribers)
             subscriber.handleNotification(notification, this);
diff --git a/test/distributed/org/apache/cassandra/distributed/test/DistributedRepairUtils.java b/test/distributed/org/apache/cassandra/distributed/test/DistributedRepairUtils.java
index fce67b52d1..5526d97e1d 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/DistributedRepairUtils.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/DistributedRepairUtils.java
@@ -20,12 +20,15 @@ package org.apache.cassandra.distributed.test;
 
 import java.util.Collections;
 import java.util.Set;
+import java.util.UUID;
 import java.util.function.Consumer;
 
 import com.google.common.collect.ImmutableSet;
 import org.apache.commons.lang3.ArrayUtils;
 import org.junit.Assert;
 
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.distributed.api.ConsistencyLevel;
 import org.apache.cassandra.distributed.api.ICluster;
 import org.apache.cassandra.distributed.api.IInvokableInstance;
@@ -33,7 +36,10 @@ import org.apache.cassandra.distributed.api.NodeToolResult;
 import org.apache.cassandra.distributed.api.QueryResult;
 import org.apache.cassandra.distributed.api.Row;
 import org.apache.cassandra.distributed.impl.AbstractCluster;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.metrics.StorageMetrics;
+import org.apache.cassandra.repair.consistent.LocalSession;
+import org.apache.cassandra.service.ActiveRepairService;
 
 import static org.apache.cassandra.utils.Retry.retryWithBackoffBlocking;
 
@@ -167,6 +173,34 @@ public final class DistributedRepairUtils
         Assert.assertFalse("Only one repair expected, but found more than one", rs.hasNext());
     }
 
+    public static void assertNoSSTableLeak(ICluster<IInvokableInstance> cluster, String ks, String table)
+    {
+        cluster.forEach(i -> {
+            String name = "node" + i.config().num();
+            i.forceCompact(ks, table); // cleanup happens in compaction, so run before checking
+            i.runOnInstance(() -> {
+                ColumnFamilyStore cfs = Keyspace.open(ks).getColumnFamilyStore(table);
+                for (SSTableReader sstable : cfs.getTracker().getView().liveSSTables())
+                {
+                    UUID pendingRepair = sstable.getSSTableMetadata().pendingRepair;
+                    if (pendingRepair == null)
+                        continue;
+                    LocalSession session = ActiveRepairService.instance.consistent.local.getSession(pendingRepair);
+                    // repair maybe async, so some participates may still think the repair is active, which means the sstable SHOULD link to it
+                    if (session != null && !session.isCompleted())
+                        continue;
+                    // The session is complete, yet the sstable is not updated... is this still pending in compaction?
+                    if (cfs.getCompactionStrategyManager().hasPendingRepairSSTable(pendingRepair, sstable))
+                        continue;
+                    // compaction does not know about the pending repair... race condition since this check started?
+                    if (sstable.getSSTableMetadata().pendingRepair == null)
+                        continue; // yep, race condition... ignore
+                    throw new AssertionError(String.format("%s had leak detected on sstable %s", name, sstable.descriptor));
+                }
+            });
+        });
+    }
+
     public enum RepairType {
         FULL {
             public String[] append(String... args)
diff --git a/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorFast.java b/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorFast.java
index 0e156dab98..5516fc8748 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorFast.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorFast.java
@@ -40,6 +40,7 @@ import org.apache.cassandra.service.StorageService;
 
 import static java.lang.String.format;
 import static org.apache.cassandra.distributed.api.IMessageFilters.Matcher.of;
+import static org.apache.cassandra.distributed.test.DistributedRepairUtils.assertNoSSTableLeak;
 import static org.apache.cassandra.distributed.test.DistributedRepairUtils.assertParentRepairFailedWithMessageContains;
 import static org.apache.cassandra.distributed.test.DistributedRepairUtils.assertParentRepairNotExist;
 import static org.apache.cassandra.distributed.test.DistributedRepairUtils.assertParentRepairSuccess;
@@ -82,6 +83,7 @@ public abstract class RepairCoordinatorFast extends RepairCoordinatorBase
             }
 
             Assert.assertEquals(repairExceptions, getRepairExceptions(CLUSTER, 2));
+            assertNoSSTableLeak(CLUSTER, KEYSPACE, table);
         });
     }
 
@@ -398,6 +400,7 @@ public abstract class RepairCoordinatorFast extends RepairCoordinatorBase
                 {
                     assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
                 }
+                assertNoSSTableLeak(CLUSTER, KEYSPACE, table);
             }
             finally
             {
diff --git a/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorNeighbourDown.java b/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorNeighbourDown.java
index 4228806fa7..590c65aa72 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorNeighbourDown.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorNeighbourDown.java
@@ -40,6 +40,7 @@ import org.apache.cassandra.utils.FBUtilities;
 
 import static java.lang.String.format;
 import static org.apache.cassandra.distributed.api.IMessageFilters.Matcher.of;
+import static org.apache.cassandra.distributed.test.DistributedRepairUtils.assertNoSSTableLeak;
 import static org.apache.cassandra.distributed.test.DistributedRepairUtils.assertParentRepairFailedWithMessageContains;
 import static org.apache.cassandra.distributed.test.DistributedRepairUtils.assertParentRepairNotExist;
 import static org.apache.cassandra.distributed.test.DistributedRepairUtils.getRepairExceptions;
@@ -125,6 +126,7 @@ public abstract class RepairCoordinatorNeighbourDown extends RepairCoordinatorBa
             {
                 assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
             }
+            assertNoSSTableLeak(CLUSTER, KEYSPACE, table);
         });
     }
 
@@ -184,6 +186,7 @@ public abstract class RepairCoordinatorNeighbourDown extends RepairCoordinatorBa
             {
                 assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
             }
+            assertNoSSTableLeak(CLUSTER, KEYSPACE, table);
         });
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to