This is an automated email from the ASF dual-hosted git repository. dcapwell pushed a commit to branch cassandra-4.1 in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 7c79d91b6fef51f87640d99b601845c1dc2f7032 Merge: 428fa1f445 c14abb40b2 Author: ci worker <dcapw...@apache.org> AuthorDate: Wed May 1 14:32:09 2024 -0700 Merge branch 'cassandra-4.0' into cassandra-4.1 CHANGES.txt | 1 + .../db/compaction/AbstractStrategyHolder.java | 1 + .../db/compaction/CompactionStrategyHolder.java | 6 ++++ .../db/compaction/CompactionStrategyManager.java | 18 +++++++++++- .../db/compaction/PendingRepairHolder.java | 12 ++++++++ .../db/compaction/PendingRepairManager.java | 11 ++++++- .../org/apache/cassandra/db/lifecycle/Tracker.java | 2 ++ .../cassandra/repair/RepairMessageVerbHandler.java | 3 ++ .../apache/cassandra/repair/RepairRunnable.java | 4 +++ .../distributed/test/DistributedRepairUtils.java | 34 ++++++++++++++++++++++ .../distributed/test/RepairCoordinatorFast.java | 3 ++ .../test/RepairCoordinatorNeighbourDown.java | 3 ++ 12 files changed, 96 insertions(+), 2 deletions(-) diff --cc CHANGES.txt index e4de315818,5db267e099..17257d606a --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,10 -1,5 +1,11 @@@ -4.0.13 +4.1.5 + * Make queries visible to the "system_views.queries" virtual table at the coordinator level (CASSANDRA-19577) + * Concurrent equivalent schema updates lead to unresolved disagreement (CASSANDRA-19578) + * Fix hints delivery for a node going down repeatedly (CASSANDRA-19495) + * Do not go to disk for reading hints file sizes (CASSANDRA-19477) + * Fix system_views.settings to handle array types (CASSANDRA-19475) +Merged from 4.0: + * IR may leak SSTables with pending repair when coming from streaming (CASSANDRA-19182) * Streaming exception race creates corrupt transaction log files that prevent restart (CASSANDRA-18736) * Fix CQL tojson timestamp output on negative timestamp values before Gregorian calendar reform in 1582 (CASSANDRA-19566) * Fix few types issues and implement types compatibility tests (CASSANDRA-19479) diff --cc src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java index 5e1d56122b,93105c8a11..e6b27594f2 --- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java @@@ -434,6 -434,20 +434,20 @@@ public class CompactionStrategyManager } } + @VisibleForTesting - public boolean hasPendingRepairSSTable(UUID sessionID, SSTableReader sstable) ++ public boolean hasPendingRepairSSTable(TimeUUID sessionID, SSTableReader sstable) + { + readLock.lock(); + try + { + return pendingRepairs.hasPendingRepairSSTable(sessionID, sstable) || transientRepairs.hasPendingRepairSSTable(sessionID, sstable); + } + finally + { + readLock.unlock(); + } + } + public void shutdown() { writeLock.lock(); @@@ -1183,8 -1159,10 +1197,10 @@@ * Mutates sstable repairedAt times and notifies listeners of the change with the writeLock held. Prevents races * with other processes between when the metadata is changed and when sstables are moved between strategies. */ - public void mutateRepaired(Collection<SSTableReader> sstables, long repairedAt, UUID pendingRepair, boolean isTransient) throws IOException + public void mutateRepaired(Collection<SSTableReader> sstables, long repairedAt, TimeUUID pendingRepair, boolean isTransient) throws IOException { + if (sstables.isEmpty()) + return; Set<SSTableReader> changed = new HashSet<>(); writeLock.lock(); diff --cc src/java/org/apache/cassandra/db/compaction/PendingRepairHolder.java index 314df9e25e,d8a561bc40..86c40e8958 --- a/src/java/org/apache/cassandra/db/compaction/PendingRepairHolder.java +++ b/src/java/org/apache/cassandra/db/compaction/PendingRepairHolder.java @@@ -281,4 -288,9 +288,9 @@@ public class PendingRepairHolder extend { return Iterables.any(managers, prm -> prm.containsSSTable(sstable)); } + - public boolean hasPendingRepairSSTable(UUID sessionID, SSTableReader sstable) ++ public boolean hasPendingRepairSSTable(TimeUUID sessionID, SSTableReader sstable) + { + return Iterables.any(managers, prm -> prm.hasPendingRepairSSTable(sessionID, sstable)); + } } diff --cc src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java index 11d6fe82a5,bbc7198fb4..b99d01b00e --- a/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java +++ b/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java @@@ -462,9 -462,9 +462,9 @@@ class PendingRepairManage return strategies.values().contains(strategy); } - public synchronized boolean hasDataForSession(UUID sessionID) + public synchronized boolean hasDataForSession(TimeUUID sessionID) { - return strategies.keySet().contains(sessionID); + return strategies.containsKey(sessionID); } boolean containsSSTable(SSTableReader sstable) @@@ -482,6 -482,15 +482,15 @@@ return group.entrySet().stream().map(g -> strategies.get(g.getKey()).getUserDefinedTask(g.getValue(), gcBefore)).collect(Collectors.toList()); } + @VisibleForTesting - public synchronized boolean hasPendingRepairSSTable(UUID sessionID, SSTableReader sstable) ++ public synchronized boolean hasPendingRepairSSTable(TimeUUID sessionID, SSTableReader sstable) + { + AbstractCompactionStrategy strat = strategies.get(sessionID); + if (strat == null) + return false; + return strat.getSSTables().contains(sstable); + } + /** * promotes/demotes sstables involved in a consistent repair that has been finalized, or failed */ diff --cc src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java index 58612f778a,7488f2ebc1..12fa6c341c --- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java +++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java @@@ -260,6 -192,6 +260,9 @@@ public class RepairMessageVerbHandler i FailSession failure = (FailSession) message.payload; ActiveRepairService.instance.consistent.coordinated.handleFailSessionMessage(failure); ActiveRepairService.instance.consistent.local.handleFailSessionMessage(message.from(), failure); ++ ParticipateState p = ActiveRepairService.instance.participate(failure.sessionID); ++ if (p != null) ++ p.phase.fail("Failure message from " + message.from()); break; case STATUS_REQ: diff --cc src/java/org/apache/cassandra/repair/RepairRunnable.java index a56a32a53d,14fab59741..da4e388c5d --- a/src/java/org/apache/cassandra/repair/RepairRunnable.java +++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java @@@ -37,9 -41,12 +37,10 @@@ import com.google.common.collect.Immuta import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Sets; -import com.google.common.util.concurrent.AsyncFunction; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; + +import org.apache.cassandra.utils.TimeUUID; ++import org.apache.cassandra.repair.state.ParticipateState; +import org.apache.cassandra.utils.concurrent.Future; import org.apache.commons.lang3.time.DurationFormatUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@@ -201,8 -195,7 +202,11 @@@ public class RepairRunnable implements Throwable error = firstError.get(); reason = error != null ? error.getMessage() : "Some repair failed"; } - String completionMessage = String.format("Repair command #%d finished with error", cmd); + state.phase.fail(reason); ++ ParticipateState p = ActiveRepairService.instance.participate(state.id); ++ if (p != null) ++ p.phase.fail(reason); + String completionMessage = String.format("Repair command #%d finished with error", state.cmd); // Note we rely on the first message being the reason for the failure // when inspecting this state from RepairRunner.queryForCompletedRepair diff --cc test/distributed/org/apache/cassandra/distributed/test/DistributedRepairUtils.java index fce67b52d1,5526d97e1d..441fcadf9b --- a/test/distributed/org/apache/cassandra/distributed/test/DistributedRepairUtils.java +++ b/test/distributed/org/apache/cassandra/distributed/test/DistributedRepairUtils.java @@@ -33,7 -36,10 +35,11 @@@ import org.apache.cassandra.distributed import org.apache.cassandra.distributed.api.QueryResult; import org.apache.cassandra.distributed.api.Row; import org.apache.cassandra.distributed.impl.AbstractCluster; + import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.metrics.StorageMetrics; + import org.apache.cassandra.repair.consistent.LocalSession; + import org.apache.cassandra.service.ActiveRepairService; ++import org.apache.cassandra.utils.TimeUUID; import static org.apache.cassandra.utils.Retry.retryWithBackoffBlocking; @@@ -167,6 -173,34 +173,34 @@@ public final class DistributedRepairUti Assert.assertFalse("Only one repair expected, but found more than one", rs.hasNext()); } + public static void assertNoSSTableLeak(ICluster<IInvokableInstance> cluster, String ks, String table) + { + cluster.forEach(i -> { + String name = "node" + i.config().num(); + i.forceCompact(ks, table); // cleanup happens in compaction, so run before checking + i.runOnInstance(() -> { + ColumnFamilyStore cfs = Keyspace.open(ks).getColumnFamilyStore(table); + for (SSTableReader sstable : cfs.getTracker().getView().liveSSTables()) + { - UUID pendingRepair = sstable.getSSTableMetadata().pendingRepair; ++ TimeUUID pendingRepair = sstable.getSSTableMetadata().pendingRepair; + if (pendingRepair == null) + continue; + LocalSession session = ActiveRepairService.instance.consistent.local.getSession(pendingRepair); + // repair maybe async, so some participates may still think the repair is active, which means the sstable SHOULD link to it + if (session != null && !session.isCompleted()) + continue; + // The session is complete, yet the sstable is not updated... is this still pending in compaction? + if (cfs.getCompactionStrategyManager().hasPendingRepairSSTable(pendingRepair, sstable)) + continue; + // compaction does not know about the pending repair... race condition since this check started? + if (sstable.getSSTableMetadata().pendingRepair == null) + continue; // yep, race condition... ignore + throw new AssertionError(String.format("%s had leak detected on sstable %s", name, sstable.descriptor)); + } + }); + }); + } + public enum RepairType { FULL { public String[] append(String... args) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org