This is an automated email from the ASF dual-hosted git repository.
ifesdjeen pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 0894665243 Ninja: remove .orig file from repository
0894665243 is described below
commit 08946652434edbce38a6395e71d4068898ea13fa
Author: Alex Petrov <[email protected]>
AuthorDate: Tue May 13 11:19:56 2025 +0200
Ninja: remove .orig file from repository
---
.../accord/SimulatedAccordCommandStore.java.orig | 419 ---------------------
1 file changed, 419 deletions(-)
diff --git
a/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java.orig
b/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java.orig
deleted file mode 100644
index f48156a3b3..0000000000
---
a/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java.orig
+++ /dev/null
@@ -1,419 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.service.accord;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.function.BooleanSupplier;
-import java.util.function.Function;
-import java.util.function.Predicate;
-import java.util.function.ToLongFunction;
-
-import accord.api.LocalListeners;
-import accord.api.ProgressLog;
-import accord.api.RemoteListeners;
-import accord.api.RoutingKey;
-import accord.impl.DefaultLocalListeners;
-import accord.impl.SizeOfIntersectionSorter;
-import accord.impl.TestAgent;
-import accord.local.Command;
-import accord.local.CommandStore;
-import accord.local.CommandStores;
-import accord.local.DurableBefore;
-import accord.local.Node;
-import accord.local.NodeCommandStoreService;
-import accord.local.TimeService;
-import accord.local.PreLoadContext;
-import accord.local.SafeCommand;
-import accord.local.SafeCommandStore;
-import accord.messages.BeginRecovery;
-import accord.messages.PreAccept;
-import accord.messages.TxnRequest;
-import accord.primitives.AbstractUnseekableKeys;
-import accord.primitives.Ballot;
-import accord.primitives.FullRoute;
-import accord.primitives.Ranges;
-import accord.primitives.Routable;
-import accord.primitives.RoutableKey;
-import accord.primitives.RoutingKeys;
-import accord.primitives.Timestamp;
-import accord.primitives.Txn;
-import accord.primitives.TxnId;
-import accord.primitives.Unseekables;
-import accord.topology.Topologies;
-import accord.topology.Topology;
-import accord.utils.Gens;
-import accord.utils.RandomSource;
-import accord.utils.async.AsyncChains;
-import accord.utils.async.AsyncResult;
-import org.apache.cassandra.concurrent.ExecutorFactory;
-import org.apache.cassandra.concurrent.ScheduledExecutorPlus;
-import org.apache.cassandra.concurrent.SimulatedExecutorFactory;
-import org.apache.cassandra.concurrent.Stage;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.DataRange;
-import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.db.filter.ColumnFilter;
-import org.apache.cassandra.db.memtable.Memtable;
-import org.apache.cassandra.metrics.AccordCacheMetrics;
-import org.apache.cassandra.tcm.ClusterMetadata;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.Generators;
-import org.apache.cassandra.utils.Pair;
-import org.assertj.core.api.Assertions;
-
-import static
org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
-import static org.apache.cassandra.db.ColumnFamilyStore.FlushReason.UNIT_TESTS;
-import static org.apache.cassandra.schema.SchemaConstants.ACCORD_KEYSPACE_NAME;
-import static org.apache.cassandra.utils.AccordGenerators.fromQT;
-
-public class SimulatedAccordCommandStore implements AutoCloseable
-{
- private final List<Throwable> failures = new ArrayList<>();
- private final SimulatedExecutorFactory globalExecutor;
- private final CommandStore.EpochUpdateHolder updateHolder;
- private final BooleanSupplier shouldEvict, shouldFlush, shouldCompact;
-
- public final NodeCommandStoreService storeService;
- public final AccordCommandStore store;
- public final Node.Id nodeId;
- public final Topology topology;
- public final Topologies topologies;
- public final MockJournal journal;
- public final ScheduledExecutorPlus unorderedScheduled;
- public final List<String> evictions = new ArrayList<>();
- public Predicate<Throwable> ignoreExceptions = ignore -> false;
-
- public SimulatedAccordCommandStore(RandomSource rs)
- {
- globalExecutor = new SimulatedExecutorFactory(rs.fork(),
fromQT(Generators.TIMESTAMP_GEN.map(java.sql.Timestamp::getTime)).mapToLong(TimeUnit.MILLISECONDS::toNanos).next(rs),
failures::add);
- this.unorderedScheduled = globalExecutor.scheduled("ignored");
- ExecutorFactory.Global.unsafeSet(globalExecutor);
- Stage.READ.unsafeSetExecutor(unorderedScheduled);
- Stage.MUTATION.unsafeSetExecutor(unorderedScheduled);
- for (Stage stage : Arrays.asList(Stage.MISC, Stage.ACCORD_MIGRATION,
Stage.READ, Stage.MUTATION))
-
stage.unsafeSetExecutor(globalExecutor.configureSequential("ignore").build());
-
- this.updateHolder = new CommandStore.EpochUpdateHolder();
- this.nodeId =
AccordTopology.tcmIdToAccord(ClusterMetadata.currentNullable().myNodeId());
- this.storeService = new NodeCommandStoreService()
- {
- private final ToLongFunction<TimeUnit> elapsed =
TimeService.elapsedWrapperFromNonMonotonicSource(TimeUnit.NANOSECONDS,
this::now);
-
- @Override public DurableBefore durableBefore() { return
DurableBefore.EMPTY; }
-
- @Override
- public Timestamp uniqueNow()
- {
- return uniqueNow(Timestamp.NONE);
- }
-
- @Override
- public Node.Id id()
- {
- return nodeId;
- }
-
- @Override
- public long epoch()
- {
- return ClusterMetadata.current().epoch.getEpoch();
- }
-
- @Override
- public long now()
- {
- return globalExecutor.nanoTime();
- }
-
- @Override
- public long elapsed(TimeUnit unit)
- {
- return elapsed.applyAsLong(unit);
- }
-
- @Override
- public Timestamp uniqueNow(Timestamp atLeast)
- {
- var now = Timestamp.fromValues(epoch(), now(), nodeId);
- if (now.compareTo(atLeast) < 0)
- throw new UnsupportedOperationException();
- return now;
- }
- };
-
- AccordStateCache stateCache = new
AccordStateCache(Stage.READ.executor(), Stage.MUTATION.executor(), 8 << 20, new
AccordStateCacheMetrics("test"));
- this.journal = new MockJournal();
- this.store = new AccordCommandStore(0,
- storeService,
- new TestAgent.RethrowAgent()
- {
- @Override
- public long preAcceptTimeout()
- {
- return Long.MAX_VALUE;
- }
-
- @Override
- public void
onUncaughtException(Throwable t)
- {
- if
(ignoreExceptions.test(t)) return;
-
super.onUncaughtException(t);
- }
- },
- null,
- ignore -> new
ProgressLog.NoOpProgressLog(),
- cs -> new
DefaultLocalListeners(new RemoteListeners.NoOpRemoteListeners(), new
DefaultLocalListeners.NotifySink()
- {
- @Override public void
notify(SafeCommandStore safeStore, SafeCommand safeCommand, TxnId listener) {}
- @Override public boolean
notify(SafeCommandStore safeStore, SafeCommand safeCommand,
LocalListeners.ComplexListener listener) { return false; }
- }),
- updateHolder,
- journal,
-<<<<<<< HEAD
- new
AccordCommandStore.CommandStoreExecutor(stateCache,
executorFactory().sequential(CommandStore.class.getSimpleName() + '[' + 0 +
']'), Thread.currentThread().getId()));
-=======
- new AccordCommandStoreExecutor(new
AccordStateCacheMetrics("test"),
executorFactory().sequential(CommandStore.class.getSimpleName() + '[' + 0 +
']'), agent));
-
- this.topology =
AccordTopology.createAccordTopology(ClusterMetadata.current());
- this.topologies = new
Topologies.Single(SizeOfIntersectionSorter.SUPPLIER, topology);
- var rangesForEpoch = new
CommandStores.RangesForEpoch(topology.epoch(), topology.ranges(), store);
- //store.unsafeSetRangesForEpoch(rangesForEpoch);
- updateHolder.add(topology.epoch(), rangesForEpoch, topology.ranges());
- updateHolder.updateGlobal(topology.ranges());
-
- shouldEvict = boolSource(rs.fork());
- shouldFlush = boolSource(rs.fork());
- shouldCompact = boolSource(rs.fork());
->>>>>>> 04671b52ef (Set ranges for epoch in AccordCommandStore via super call,
not by fixing up Simulated store)
-
- store.cache().instances().forEach(i -> {
- i.register(new AccordStateCache.Listener()
- {
- @Override
- public void onAdd(AccordCachingState state)
- {
- }
-
- @Override
- public void onRelease(AccordCachingState state)
- {
- }
-
- @Override
- public void onEvict(AccordCachingState state)
- {
- evictions.add(i + " evicted " + state);
- }
- });
- });
-
- this.topology =
AccordTopology.createAccordTopology(ClusterMetadata.current());
- this.topologies = new
Topologies.Single(SizeOfIntersectionSorter.SUPPLIER, topology);
- var rangesForEpoch = new
CommandStores.RangesForEpoch(topology.epoch(), topology.ranges(), store);
- updateHolder.add(topology.epoch(), rangesForEpoch, topology.ranges());
- updateHolder.updateGlobal(topology.ranges());
-
- shouldEvict = boolSource(rs.fork());
- shouldFlush = boolSource(rs.fork());
- shouldCompact = boolSource(rs.fork());
- }
-
- private static BooleanSupplier boolSource(RandomSource rs)
- {
- var gen = Gens.bools().mixedDistribution().next(rs);
- return () -> gen.next(rs);
- }
-
- public TxnId nextTxnId(Txn.Kind kind, Routable.Domain domain)
- {
- return new TxnId(storeService.epoch(), storeService.now(), kind,
domain, nodeId);
- }
-
- public void maybeCacheEvict(Unseekables<?> keysOrRanges)
- {
- switch (keysOrRanges.domain())
- {
- case Key:
- maybeCacheEvict((AbstractUnseekableKeys) keysOrRanges,
Ranges.EMPTY);
- break;
- case Range:
- maybeCacheEvict(RoutingKeys.EMPTY, (Ranges) keysOrRanges);
- break;
- default:
- throw new UnsupportedOperationException("Unknown domain: " +
keysOrRanges.domain());
- }
- }
-
- public void maybeCacheEvict(Unseekables<RoutingKey> keys, Ranges ranges)
- {
- AccordStateCache cache = store.cache();
- cache.forEach(state -> {
- Class<?> keyType = state.key().getClass();
- if (TxnId.class.equals(keyType))
- {
- Command command = (Command) state.state().get();
- if (command != null && command.known().definition.isKnown()
- && (command.partialTxn().keys().intersects(keys) ||
ranges.intersects(command.partialTxn().keys()))
- && shouldEvict.getAsBoolean())
- cache.maybeEvict(state);
- }
- else if (RoutableKey.class.isAssignableFrom(keyType))
- {
- RoutableKey key = (RoutableKey) state.key();
- if ((keys.contains(key) || ranges.intersects(key))
- && shouldEvict.getAsBoolean())
- cache.maybeEvict(state);
- }
- else
- {
- throw new AssertionError("Unexpected key type: " +
state.key().getClass());
- }
- });
-
- for (var store :
Keyspace.open(ACCORD_KEYSPACE_NAME).getColumnFamilyStores())
- {
- Memtable memtable = store.getCurrentMemtable();
- if (memtable.partitionCount() == 0 || !intersects(store, memtable,
keys, ranges))
- continue;
- if (shouldFlush.getAsBoolean())
- store.forceBlockingFlush(UNIT_TESTS);
- }
- for (var store :
Keyspace.open(ACCORD_KEYSPACE_NAME).getColumnFamilyStores())
- {
- if (store.getLiveSSTables().size() > 5 &&
shouldCompact.getAsBoolean())
- {
- // compaction no-op since auto-compaction is disabled... so
need to enable quickly
- store.enableAutoCompaction();
- try
- {
-
FBUtilities.waitOnFutures(CompactionManager.instance.submitBackground(store));
- }
- finally
- {
- store.disableAutoCompaction();
- }
- }
- }
- }
-
- private static boolean intersects(ColumnFamilyStore store, Memtable
memtable, Unseekables<RoutingKey> keys, Ranges ranges)
- {
- if (keys.isEmpty() && ranges.isEmpty()) // shouldn't happen, but just
in case...
- return false;
- switch (store.name)
- {
- case "commands_for_key":
- // pk = (store_id, routing_key)
- // since this is simulating a single store, store_id is a
constant, so check key
- try (var it = memtable.partitionIterator(ColumnFilter.NONE,
DataRange.allData(store.getPartitioner()), null))
- {
- while (it.hasNext())
- {
- var key =
AccordKeyspace.CommandsForKeysAccessor.getKey(it.next().partitionKey());
- if (keys.contains(key) || ranges.intersects(key))
- return true;
- }
- }
- break;
- }
- return false;
- }
-
- public void checkFailures()
- {
- if (Thread.interrupted())
- failures.add(new InterruptedException());
- if (failures.isEmpty()) return;
- AssertionError error = new AssertionError("Unexpected exceptions
found");
- failures.forEach(error::addSuppressed);
- failures.clear();
- throw error;
- }
-
- public <T> T process(TxnRequest<T> request) throws ExecutionException,
InterruptedException
- {
- return process(request, request::apply);
- }
-
- public <T> T process(PreLoadContext loadCtx, Function<? super
SafeCommandStore, T> function) throws ExecutionException, InterruptedException
- {
- var result = processAsync(loadCtx, function);
- processAll();
- return AsyncChains.getBlocking(result);
- }
-
- public <T> AsyncResult<T> processAsync(TxnRequest<T> request)
- {
- return processAsync(request, request::apply);
- }
-
- public <T> AsyncResult<T> processAsync(PreLoadContext loadCtx, Function<?
super SafeCommandStore, T> function)
- {
- return store.submit(loadCtx, function).beginAsResult();
- }
-
- public Pair<TxnId, AsyncResult<PreAccept.PreAcceptOk>>
enqueuePreAccept(Txn txn, FullRoute<?> route)
- {
- TxnId txnId = nextTxnId(txn.kind(), txn.keys().domain());
- PreAccept preAccept = new PreAccept(nodeId, topologies, txnId, txn,
route);
- return Pair.create(txnId, processAsync(preAccept, safe -> {
- var reply = preAccept.apply(safe);
- Assertions.assertThat(reply.isOk()).isTrue();
- return (PreAccept.PreAcceptOk) reply;
- }));
- }
-
- public Pair<TxnId, AsyncResult<BeginRecovery.RecoverOk>>
enqueueBeginRecovery(Txn txn, FullRoute<?> route)
- {
- TxnId txnId = nextTxnId(txn.kind(), txn.keys().domain());
- Ballot ballot = Ballot.fromValues(storeService.epoch(),
storeService.now(), nodeId);
- BeginRecovery br = new BeginRecovery(nodeId, topologies, txnId, null,
txn, route, ballot);
-
- return Pair.create(txnId, processAsync(br, safe -> {
- var reply = br.apply(safe);
- Assertions.assertThat(reply.isOk()).isTrue();
- return (BeginRecovery.RecoverOk) reply;
- }).beginAsResult());
- }
-
- public void processAll()
- {
- while (processOne())
- {
- }
- }
-
- private boolean processOne()
- {
- boolean result = globalExecutor.processOne();
- checkFailures();
- return result;
- }
-
- @Override
- public void close() throws Exception
- {
- store.shutdown();
- }
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]