This is an automated email from the ASF dual-hosted git repository.
ifesdjeen pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-15-accord by this push:
new 8d112e8bf5 Accord: test fixes and stability improvements: fix short
accord simulation test (seed 0x6bea128ae851724b) and
ConcurrentModificationException; increase wait time during closing to avoid
unterminated threads; increase timeouts and improve test stability; more
descriptive output from CQL test; shorten max CMS delay; improve future
handling in config service
8d112e8bf5 is described below
commit 8d112e8bf5375e57692009728ba15b63d564bd1b
Author: Alex Petrov <[email protected]>
AuthorDate: Tue Mar 11 16:22:24 2025 +0100
Accord: test fixes and stability improvements
* Fix short accord simulation test (seed 0x6bea128ae851724b),
ConcurrentModificationException
* Increase wait time during closing to avoid Unterminated threads
* Increase timeouts, improve test stability
* More descriptive output from CQL test
* Shorten max CMS delay
* Improve future handling in config service
Patch by Alex Petrov; reviewed by Benedict Elliott Smith for CASSANDRA-20440
---
src/java/org/apache/cassandra/config/Config.java | 2 +-
.../service/accord/AccordConfigurationService.java | 45 ++++++++--------
.../service/accord/AccordVerbHandler.java | 35 +++---------
.../service/accord/CommandsForRanges.java | 63 +++++++++++++++-------
.../distributed/impl/AbstractCluster.java | 2 +-
.../test/accord/AccordHostReplacementTest.java | 2 +
.../accord/AccordMigrationReadRaceTestBase.java | 5 +-
.../sstable/CQLSSTableWriterConcurrencyTest.java | 13 ++++-
8 files changed, 91 insertions(+), 76 deletions(-)
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 0e7189a385..5d0256795e 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -185,7 +185,7 @@ public class Config
public volatile DurationSpec.IntMillisecondsBound
cms_default_retry_backoff = null;
@Deprecated(since="5.1")
public volatile DurationSpec.IntMillisecondsBound
cms_default_max_retry_backoff = null;
- public String cms_retry_delay = "0 <= 50ms*1*attempts <= 10s,retries=10";
+ public String cms_retry_delay = "0 <= 50ms*1*attempts <= 1s,retries=10";
/**
* How often we should snapshot the cluster metadata.
diff --git a/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java b/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java
index 0fbeb7d190..7b1e36f8f1 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java
@@ -61,7 +61,6 @@ import org.apache.cassandra.utils.Simulate;
import org.apache.cassandra.utils.concurrent.AsyncPromise;
import org.apache.cassandra.utils.concurrent.Future;
-import static accord.topology.TopologyManager.TopologyRange;
import static org.apache.cassandra.service.accord.AccordTopology.tcmIdToAccord;
import static org.apache.cassandra.utils.Simulate.With.MONITORS;
@@ -399,32 +398,30 @@ public class AccordConfigurationService extends
AbstractConfigurationService<Acc
return;
}
- try
+ Set<InetAddressAndPort> peers = new
HashSet<>(metadata.directory.allJoinedEndpoints());
+ peers.remove(FBUtilities.getBroadcastAddressAndPort());
+ if (peers.isEmpty())
{
- Set<InetAddressAndPort> peers = new
HashSet<>(metadata.directory.allJoinedEndpoints());
- peers.remove(FBUtilities.getBroadcastAddressAndPort());
- if (peers.isEmpty())
- {
- onResult.accept(Success, null);
- return;
- }
-
- // Fetching only one epoch here since later epochs might have
already been requested concurrently
- TopologyRange result =
FetchTopologies.fetch(SharedContext.Global.instance, peers, epoch, epoch).get();
- result.forEach(this::reportTopology, epoch, 1);
onResult.accept(Success, null);
+ return;
}
- catch (Throwable e)
- {
- if (currentEpoch() >= epoch)
- {
- onResult.accept(Success, null);
- return;
- }
- if (e instanceof InterruptedException)
- Thread.currentThread().interrupt();
- onResult.accept(null, e);
- }
+
+ // Fetching only one epoch here since later epochs might have
already been requested concurrently
+ FetchTopologies.fetch(SharedContext.Global.instance, peers, epoch,
epoch)
+ .addCallback((topologyRange, t) -> {
+ if (t != null)
+ {
+ if (currentEpoch() >= epoch)
+ onResult.accept(Success, null);
+ else
+ onResult.accept(null, t);
+ }
+ else
+ {
+ topologyRange.forEach(this::reportTopology,
epoch, 1);
+ onResult.accept(Success, null);
+ }
+ });
});
}
diff --git a/src/java/org/apache/cassandra/service/accord/AccordVerbHandler.java b/src/java/org/apache/cassandra/service/accord/AccordVerbHandler.java
index 61f18867da..12fd39a135 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordVerbHandler.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordVerbHandler.java
@@ -27,9 +27,6 @@ import accord.local.Node;
import accord.messages.Request;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
-import org.apache.cassandra.tcm.ClusterMetadata;
-import org.apache.cassandra.tcm.ClusterMetadataService;
-import org.apache.cassandra.tcm.Epoch;
import org.apache.cassandra.utils.NoSpamLogger;
public class AccordVerbHandler<T extends Request> implements IVerbHandler<T>
@@ -66,35 +63,15 @@ public class AccordVerbHandler<T extends Request>
implements IVerbHandler<T>
*/
Node.Id fromNodeId = endpointMapper.mappedId(message.from());
long waitForEpoch = request.waitForEpoch();
- ClusterMetadata cm = ClusterMetadata.current();
- boolean cmUpToDate = ClusterMetadata.current().epoch.getEpoch() >=
waitForEpoch;
- if (node.topology().hasAtLeastEpoch(waitForEpoch) && cmUpToDate)
+ if (node.topology().hasAtLeastEpoch(waitForEpoch))
request.process(node, fromNodeId, message.header);
else
{
- // TODO (required): review this claim. Downstream from
`withEpoch`, we do call fetch log, albeit from _CMS_, since we
- // do not know the peer there.
- // withEpoch does not reliably ensure that TCM is up to date, if
Accord has the topology it won't
- // wait for TCM to come up to date, so do it here in the verb
handler
- if (!cmUpToDate)
- {
-
ClusterMetadataService.instance().fetchLogFromPeerOrCMSAsync(cm,
message.from(), Epoch.create(waitForEpoch))
- .addCallback((success, failure) -> {
- node.withEpoch(waitForEpoch,
(ignored, withEpochFailure) -> {
- if (withEpochFailure != null)
- throw new
RuntimeException("Timed out waiting for epoch when processing message from " +
fromNodeId + " to " + node + " message " + message, withEpochFailure);
- request.process(node,
fromNodeId, message.header);
- });
- });
- }
- else
- {
- node.withEpoch(waitForEpoch, (ignored, withEpochFailure) -> {
- if (withEpochFailure != null)
- throw new RuntimeException("Timed out waiting for
epoch when processing message from " + fromNodeId + " to " + node + " message "
+ message, withEpochFailure);
- request.process(node, fromNodeId, message.header);
- });
- }
+ node.withEpoch(waitForEpoch, (ignored, withEpochFailure) -> {
+ if (withEpochFailure != null)
+ throw new RuntimeException("Timed out waiting for epoch
when processing message from " + fromNodeId + " to " + node + " message " +
message, withEpochFailure);
+ request.process(node, fromNodeId, message.header);
+ });
}
}
}
diff --git a/src/java/org/apache/cassandra/service/accord/CommandsForRanges.java b/src/java/org/apache/cassandra/service/accord/CommandsForRanges.java
index 9e80429875..ffd1754d2c 100644
--- a/src/java/org/apache/cassandra/service/accord/CommandsForRanges.java
+++ b/src/java/org/apache/cassandra/service/accord/CommandsForRanges.java
@@ -22,8 +22,10 @@ import java.util.Iterator;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Consumer;
+import java.util.function.UnaryOperator;
import javax.annotation.Nullable;
import accord.local.Command;
@@ -62,7 +64,7 @@ public class CommandsForRanges extends TreeMap<Timestamp,
Summary> implements Co
{
private final AccordCommandStore commandStore;
private final RangeSearcher searcher;
- private final NavigableMap<TxnId, Ranges> transitive = new TreeMap<>();
+ private AtomicReference<NavigableMap<TxnId, Ranges>> transitive = new
AtomicReference<>(new TreeMap<>());
private final ObjectHashSet<TxnId> cachedRangeTxns = new
ObjectHashSet<>();
public Manager(AccordCommandStore commandStore)
@@ -102,22 +104,46 @@ public class CommandsForRanges extends TreeMap<Timestamp,
Summary> implements Co
return new Loader(this, searchKeysOrRanges, redundantBefore,
testKind, minTxnId, maxTxnId, findAsDep);
}
+ private void updateTransitive(UnaryOperator<NavigableMap<TxnId,
Ranges>> update)
+ {
+ while (true)
+ {
+ NavigableMap<TxnId, Ranges> prev = transitive.get();
+ NavigableMap<TxnId, Ranges> next = update.apply(prev);
+ if (next == null || prev == next)
+ return;
+ if (transitive.compareAndSet(prev, next))
+ return;
+ }
+ }
+
public void mergeTransitive(TxnId txnId, Ranges ranges, BiFunction<?
super Ranges, ? super Ranges, ? extends Ranges> remappingFunction)
{
- transitive.merge(txnId, ranges, remappingFunction);
+ updateTransitive(transitive -> {
+ NavigableMap<TxnId, Ranges> next = new TreeMap<>(transitive);
+ next.merge(txnId, ranges, remappingFunction);
+ return next;
+ });
}
public void gcBefore(TxnId gcBefore, Ranges ranges)
{
- Iterator<Map.Entry<TxnId, Ranges>> iterator =
transitive.headMap(gcBefore).entrySet().iterator();
- while (iterator.hasNext())
- {
- Map.Entry<TxnId, Ranges> e = iterator.next();
- Ranges newRanges = e.getValue().without(ranges);
- if (newRanges.isEmpty())
- iterator.remove();
- e.setValue(newRanges);
- }
+ updateTransitive(transitive -> {
+ NavigableMap<TxnId, Ranges> next = null;
+ Iterator<Map.Entry<TxnId, Ranges>> iterator =
transitive.headMap(gcBefore).entrySet().iterator();
+ while (iterator.hasNext())
+ {
+ Map.Entry<TxnId, Ranges> e = iterator.next();
+ Ranges newRanges = e.getValue().without(ranges);
+ if (!newRanges.isEmpty())
+ {
+ if (next == null)
+ next = new TreeMap<>();
+ next.put(e.getKey(), newRanges);
+ }
+ }
+ return next;
+ });
}
}
@@ -144,13 +170,14 @@ public class CommandsForRanges extends TreeMap<Timestamp,
Summary> implements Co
manager.searcher.search(manager.commandStore.id(),
(TokenKey) key, minTxnId, maxTxnId).consume(forEach);
}
- if (!manager.transitive.isEmpty())
+ NavigableMap<TxnId, Ranges> transitive = manager.transitive.get();
+ if (!transitive.isEmpty())
{
- for (Map.Entry<TxnId, Ranges> e :
manager.transitive.tailMap(minTxnId, true).entrySet())
- {
- if (e.getValue().intersects(searchKeysOrRanges))
- forEach.accept(e.getKey());
- }
+ for (Map.Entry<TxnId, Ranges> e :
transitive.tailMap(minTxnId, true).entrySet())
+ {
+ if (e.getValue().intersects(searchKeysOrRanges))
+ forEach.accept(e.getKey());
+ }
}
}
@@ -180,7 +207,7 @@ public class CommandsForRanges extends TreeMap<Timestamp,
Summary> implements Co
return ifRelevant(cmd);
}
- Ranges ranges = manager.transitive.get(txnId);
+ Ranges ranges = manager.transitive.get().get(txnId);
if (ranges == null)
return null;
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
index d10a75e4b6..20efdf88e7 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
@@ -1149,7 +1149,7 @@ public abstract class AbstractCluster<I extends
IInstance> implements ICluster<I
.collect(Collectors.toList());
try
{
- FBUtilities.waitOnFutures(futures, 1L, TimeUnit.MINUTES);
+ FBUtilities.waitOnFutures(futures, instances.size(),
TimeUnit.MINUTES);
}
catch (Throwable t)
{
diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordHostReplacementTest.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordHostReplacementTest.java
index 702d5f0707..86c9a72ecb 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordHostReplacementTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordHostReplacementTest.java
@@ -55,6 +55,8 @@ public class AccordHostReplacementTest extends TestBaseImpl
Cluster.Builder clusterBuilder = Cluster.build(3)
.withConfig(c ->
c.with(Feature.values())
.set("accord.command_store_shard_count", "1")
+
.set("write_request_timeout", "10s")
+
.set("read_request_timeout", "10s")
.set("accord.queue_shard_count", "1")
);
TokenSupplier tokenRing = TokenSupplier.evenlyDistributedTokens(3,
clusterBuilder.getTokenCount());
diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordMigrationReadRaceTestBase.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordMigrationReadRaceTestBase.java
index 42e5bb9b8d..a4931c6ab5 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordMigrationReadRaceTestBase.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordMigrationReadRaceTestBase.java
@@ -169,8 +169,9 @@ public abstract class AccordMigrationReadRaceTestBase
extends AccordTestBase
// Otherwise repair complains if you don't specify a keyspace
CassandraRelevantProperties.SYSTEM_TRACES_DEFAULT_RF.setInt(3);
AccordTestBase.setupCluster(builder -> builder.appendConfig(config ->
config.set("paxos_variant", PaxosVariant.v2.name())
-
.set("read_request_timeout", "2s")
-
.set("range_request_timeout", "2s")
+
.set("read_request_timeout", "10s")
+
.set("range_request_timeout", "10s")
+
.set("write_request_timeout", "10s")
.set("accord.range_migration", "explicit")), 3);
partitioner =
FBUtilities.newPartitioner(SHARED_CLUSTER.get(1).callsOnInstance(() ->
DatabaseDescriptor.getPartitioner().getClass().getSimpleName()).call());
StorageService.instance.setPartitionerUnsafe(partitioner);
diff --git a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterConcurrencyTest.java b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterConcurrencyTest.java
index 3d69cdad5a..54d8cec32c 100644
--- a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterConcurrencyTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterConcurrencyTest.java
@@ -25,6 +25,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import org.junit.Rule;
import org.junit.Test;
@@ -36,6 +37,7 @@ import org.apache.cassandra.cql3.CQLTester;
import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.io.util.File;
import org.apache.cassandra.schema.Schema;
+import org.assertj.core.description.Description;
import static org.assertj.core.api.Assertions.assertThat;
@@ -70,6 +72,7 @@ public class CQLSSTableWriterConcurrencyTest extends CQLTester
File[] dataDirs = new File[nThreads];
String baseDataDir = tempFolder.newFolder().getAbsolutePath();
+ AtomicReference<String> errors = new AtomicReference<>("");
for (int i = 0; i < nThreads; i++)
{
tableNames[i] = String.format("table_%02d", i);
@@ -113,6 +116,7 @@ public class CQLSSTableWriterConcurrencyTest extends
CQLTester
catch (Throwable throwable)
{
LOGGER.error("Error while processing element number {}",
finalI, throwable);
+ errors.updateAndGet(s -> s + "\n" +
throwable.getMessage());
errorCount.incrementAndGet();
}
});
@@ -123,6 +127,13 @@ public class CQLSSTableWriterConcurrencyTest extends
CQLTester
{
LOGGER.warn("Unable to close executor pool after 1 minute");
}
- assertThat(errorCount.get()).isEqualTo(0);
+ int count = errorCount.get();
+ assertThat(count).isEqualTo(0).describedAs(new Description()
+ {
+ public String value()
+ {
+ return String.format("Caught %d errors: %s", count,
errors.get());
+ }
+ });
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]