This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch cassandra-4.1
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 7c79d91b6fef51f87640d99b601845c1dc2f7032
Merge: 428fa1f445 c14abb40b2
Author: ci worker <dcapw...@apache.org>
AuthorDate: Wed May 1 14:32:09 2024 -0700

    Merge branch 'cassandra-4.0' into cassandra-4.1

 CHANGES.txt                                        |  1 +
 .../db/compaction/AbstractStrategyHolder.java      |  1 +
 .../db/compaction/CompactionStrategyHolder.java    |  6 ++++
 .../db/compaction/CompactionStrategyManager.java   | 18 +++++++++++-
 .../db/compaction/PendingRepairHolder.java         | 12 ++++++++
 .../db/compaction/PendingRepairManager.java        | 11 ++++++-
 .../org/apache/cassandra/db/lifecycle/Tracker.java |  2 ++
 .../cassandra/repair/RepairMessageVerbHandler.java |  3 ++
 .../apache/cassandra/repair/RepairRunnable.java    |  4 +++
 .../distributed/test/DistributedRepairUtils.java   | 34 ++++++++++++++++++++++
 .../distributed/test/RepairCoordinatorFast.java    |  3 ++
 .../test/RepairCoordinatorNeighbourDown.java       |  3 ++
 12 files changed, 96 insertions(+), 2 deletions(-)

diff --cc CHANGES.txt
index e4de315818,5db267e099..17257d606a
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,10 -1,5 +1,11 @@@
 -4.0.13
 +4.1.5
 + * Make queries visible to the "system_views.queries" virtual table at the coordinator level (CASSANDRA-19577)
 + * Concurrent equivalent schema updates lead to unresolved disagreement (CASSANDRA-19578)
 + * Fix hints delivery for a node going down repeatedly (CASSANDRA-19495)
 + * Do not go to disk for reading hints file sizes (CASSANDRA-19477)
 + * Fix system_views.settings to handle array types (CASSANDRA-19475)
 +Merged from 4.0:
+  * IR may leak SSTables with pending repair when coming from streaming (CASSANDRA-19182)
   * Streaming exception race creates corrupt transaction log files that prevent restart (CASSANDRA-18736)
   * Fix CQL tojson timestamp output on negative timestamp values before Gregorian calendar reform in 1582 (CASSANDRA-19566)
   * Fix few types issues and implement types compatibility tests (CASSANDRA-19479)
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index 5e1d56122b,93105c8a11..e6b27594f2
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@@ -434,6 -434,20 +434,20 @@@ public class CompactionStrategyManager 
          }
      }
  
+     @VisibleForTesting
 -    public boolean hasPendingRepairSSTable(UUID sessionID, SSTableReader 
sstable)
++    public boolean hasPendingRepairSSTable(TimeUUID sessionID, SSTableReader 
sstable)
+     {
+         readLock.lock();
+         try
+         {
+             return pendingRepairs.hasPendingRepairSSTable(sessionID, sstable) 
|| transientRepairs.hasPendingRepairSSTable(sessionID, sstable);
+         }
+         finally
+         {
+             readLock.unlock();
+         }
+     }
+ 
      public void shutdown()
      {
          writeLock.lock();
@@@ -1183,8 -1159,10 +1197,10 @@@
       * Mutates sstable repairedAt times and notifies listeners of the change 
with the writeLock held. Prevents races
       * with other processes between when the metadata is changed and when 
sstables are moved between strategies.
        */
 -    public void mutateRepaired(Collection<SSTableReader> sstables, long 
repairedAt, UUID pendingRepair, boolean isTransient) throws IOException
 +    public void mutateRepaired(Collection<SSTableReader> sstables, long 
repairedAt, TimeUUID pendingRepair, boolean isTransient) throws IOException
      {
+         if (sstables.isEmpty())
+             return;
          Set<SSTableReader> changed = new HashSet<>();
  
          writeLock.lock();
diff --cc src/java/org/apache/cassandra/db/compaction/PendingRepairHolder.java
index 314df9e25e,d8a561bc40..86c40e8958
--- a/src/java/org/apache/cassandra/db/compaction/PendingRepairHolder.java
+++ b/src/java/org/apache/cassandra/db/compaction/PendingRepairHolder.java
@@@ -281,4 -288,9 +288,9 @@@ public class PendingRepairHolder extend
      {
          return Iterables.any(managers, prm -> prm.containsSSTable(sstable));
      }
+ 
 -    public boolean hasPendingRepairSSTable(UUID sessionID, SSTableReader 
sstable)
++    public boolean hasPendingRepairSSTable(TimeUUID sessionID, SSTableReader 
sstable)
+     {
+         return Iterables.any(managers, prm -> 
prm.hasPendingRepairSSTable(sessionID, sstable));
+     }
  }
diff --cc src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java
index 11d6fe82a5,bbc7198fb4..b99d01b00e
--- a/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java
@@@ -462,9 -462,9 +462,9 @@@ class PendingRepairManage
          return strategies.values().contains(strategy);
      }
  
 -    public synchronized boolean hasDataForSession(UUID sessionID)
 +    public synchronized boolean hasDataForSession(TimeUUID sessionID)
      {
-         return strategies.keySet().contains(sessionID);
+         return strategies.containsKey(sessionID);
      }
  
      boolean containsSSTable(SSTableReader sstable)
@@@ -482,6 -482,15 +482,15 @@@
          return group.entrySet().stream().map(g -> 
strategies.get(g.getKey()).getUserDefinedTask(g.getValue(), 
gcBefore)).collect(Collectors.toList());
      }
  
+     @VisibleForTesting
 -    public synchronized boolean hasPendingRepairSSTable(UUID sessionID, 
SSTableReader sstable)
++    public synchronized boolean hasPendingRepairSSTable(TimeUUID sessionID, 
SSTableReader sstable)
+     {
+         AbstractCompactionStrategy strat = strategies.get(sessionID);
+         if (strat == null)
+             return false;
+         return strat.getSSTables().contains(sstable);
+     }
+ 
      /**
       * promotes/demotes sstables involved in a consistent repair that has 
been finalized, or failed
       */
diff --cc src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
index 58612f778a,7488f2ebc1..12fa6c341c
--- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
+++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
@@@ -260,6 -192,6 +260,9 @@@ public class RepairMessageVerbHandler i
                      FailSession failure = (FailSession) message.payload;
                      
ActiveRepairService.instance.consistent.coordinated.handleFailSessionMessage(failure);
                      
ActiveRepairService.instance.consistent.local.handleFailSessionMessage(message.from(),
 failure);
++                    ParticipateState p = 
ActiveRepairService.instance.participate(failure.sessionID);
++                    if (p != null)
++                        p.phase.fail("Failure message from " + 
message.from());
                      break;
  
                  case STATUS_REQ:
diff --cc src/java/org/apache/cassandra/repair/RepairRunnable.java
index a56a32a53d,14fab59741..da4e388c5d
--- a/src/java/org/apache/cassandra/repair/RepairRunnable.java
+++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java
@@@ -37,9 -41,12 +37,10 @@@ import com.google.common.collect.Immuta
  import com.google.common.collect.Iterables;
  import com.google.common.collect.Lists;
  import com.google.common.collect.Sets;
 -import com.google.common.util.concurrent.AsyncFunction;
 -import com.google.common.util.concurrent.FutureCallback;
 -import com.google.common.util.concurrent.Futures;
 -import com.google.common.util.concurrent.ListenableFuture;
 -import com.google.common.util.concurrent.ListeningExecutorService;
 -import com.google.common.util.concurrent.MoreExecutors;
 +
 +import org.apache.cassandra.utils.TimeUUID;
++import org.apache.cassandra.repair.state.ParticipateState;
 +import org.apache.cassandra.utils.concurrent.Future;
  import org.apache.commons.lang3.time.DurationFormatUtils;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
@@@ -201,8 -195,7 +202,11 @@@ public class RepairRunnable implements 
              Throwable error = firstError.get();
              reason = error != null ? error.getMessage() : "Some repair 
failed";
          }
 -        String completionMessage = String.format("Repair command #%d finished 
with error", cmd);
 +        state.phase.fail(reason);
++        ParticipateState p = 
ActiveRepairService.instance.participate(state.id);
++        if (p != null)
++            p.phase.fail(reason);
 +        String completionMessage = String.format("Repair command #%d finished 
with error", state.cmd);
  
          // Note we rely on the first message being the reason for the failure
          // when inspecting this state from 
RepairRunner.queryForCompletedRepair
diff --cc test/distributed/org/apache/cassandra/distributed/test/DistributedRepairUtils.java
index fce67b52d1,5526d97e1d..441fcadf9b
--- a/test/distributed/org/apache/cassandra/distributed/test/DistributedRepairUtils.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/DistributedRepairUtils.java
@@@ -33,7 -36,10 +35,11 @@@ import org.apache.cassandra.distributed
  import org.apache.cassandra.distributed.api.QueryResult;
  import org.apache.cassandra.distributed.api.Row;
  import org.apache.cassandra.distributed.impl.AbstractCluster;
+ import org.apache.cassandra.io.sstable.format.SSTableReader;
  import org.apache.cassandra.metrics.StorageMetrics;
+ import org.apache.cassandra.repair.consistent.LocalSession;
+ import org.apache.cassandra.service.ActiveRepairService;
++import org.apache.cassandra.utils.TimeUUID;
  
  import static org.apache.cassandra.utils.Retry.retryWithBackoffBlocking;
  
@@@ -167,6 -173,34 +173,34 @@@ public final class DistributedRepairUti
          Assert.assertFalse("Only one repair expected, but found more than 
one", rs.hasNext());
      }
  
+     public static void assertNoSSTableLeak(ICluster<IInvokableInstance> 
cluster, String ks, String table)
+     {
+         cluster.forEach(i -> {
+             String name = "node" + i.config().num();
+             i.forceCompact(ks, table); // cleanup happens in compaction, so 
run before checking
+             i.runOnInstance(() -> {
+                 ColumnFamilyStore cfs = 
Keyspace.open(ks).getColumnFamilyStore(table);
+                 for (SSTableReader sstable : 
cfs.getTracker().getView().liveSSTables())
+                 {
 -                    UUID pendingRepair = 
sstable.getSSTableMetadata().pendingRepair;
++                    TimeUUID pendingRepair = 
sstable.getSSTableMetadata().pendingRepair;
+                     if (pendingRepair == null)
+                         continue;
+                     LocalSession session = 
ActiveRepairService.instance.consistent.local.getSession(pendingRepair);
+                     // repair maybe async, so some participates may still 
think the repair is active, which means the sstable SHOULD link to it
+                     if (session != null && !session.isCompleted())
+                         continue;
+                     // The session is complete, yet the sstable is not 
updated... is this still pending in compaction?
+                     if 
(cfs.getCompactionStrategyManager().hasPendingRepairSSTable(pendingRepair, 
sstable))
+                         continue;
+                     // compaction does not know about the pending repair... 
race condition since this check started?
+                     if (sstable.getSSTableMetadata().pendingRepair == null)
+                         continue; // yep, race condition... ignore
+                     throw new AssertionError(String.format("%s had leak 
detected on sstable %s", name, sstable.descriptor));
+                 }
+             });
+         });
+     }
+ 
      public enum RepairType {
          FULL {
              public String[] append(String... args)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to