This is an automated email from the ASF dual-hosted git repository. marcuse pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 6b3958f1d83777e295690a663610b6b29ed1efae Author: Alex Petrov <oleksandr.pet...@gmail.com> AuthorDate: Fri Nov 17 17:13:30 2023 +0100 Improve setup and initialisation of LocalLog/LogSpec Patch by Alex Petrov; reviewed by Sam Tunnicliffe and marcuse for CASSANDRA-19271 --- CHANGES.txt | 1 + .../apache/cassandra/schema/DistributedSchema.java | 9 +- .../cassandra/tcm/ClusterMetadataService.java | 38 ++-- src/java/org/apache/cassandra/tcm/Period.java | 5 +- src/java/org/apache/cassandra/tcm/Startup.java | 52 ++--- .../cassandra/tcm/StubClusterMetadataService.java | 8 +- .../cassandra/tcm/listeners/SchemaListener.java | 10 +- .../org/apache/cassandra/tcm/log/LocalLog.java | 231 ++++++++++++--------- .../org/apache/cassandra/tcm/log/LogStorage.java | 1 + .../cassandra/tcm/log/SystemKeyspaceStorage.java | 5 + .../distributed/test/jmx/JMXFeatureTest.java | 3 +- .../distributed/test/log/CMSTestBase.java | 9 +- .../test/log/ClusterMetadataTestHelper.java | 64 +++--- .../test/log/CoordinatorPathTestBase.java | 18 +- .../distributed/test/ring/DecommissionTest.java | 9 +- .../unit/org/apache/cassandra/ServerTestUtils.java | 41 ++-- .../apache/cassandra/hints/HintsUpgradeTest.java | 2 + .../cassandra/service/ClientWarningsTest.java | 67 +++--- .../apache/cassandra/tcm/BootWithMetadataTest.java | 3 +- .../cassandra/tcm/DiscoverySimulationTest.java | 7 +- .../org/apache/cassandra/tcm/LogStateTest.java | 16 +- .../org/apache/cassandra/tcm/log/LocalLogTest.java | 25 ++- .../tcm/log/LogListenerNotificationTest.java | 9 +- .../io/sstable/StressCQLSSTableWriter.java | 3 - 24 files changed, 370 insertions(+), 266 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 5ad47d72bd..64841aa886 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 5.1 + * Improve setup and initialisation of LocalLog/LogSpec (CASSANDRA-19271) * Refactor structure of caching metrics and expose auth cache metrics via JMX (CASSANDRA-17062) * Allow CQL client certificate authentication to work without sending an AUTHENTICATE request (CASSANDRA-18857) * Extend nodetool tpstats and system_views.thread_pools with detailed pool parameters (CASSANDRA-19289) diff --git a/src/java/org/apache/cassandra/schema/DistributedSchema.java b/src/java/org/apache/cassandra/schema/DistributedSchema.java index fbdf9c1c88..86dd1d5117 100644 --- a/src/java/org/apache/cassandra/schema/DistributedSchema.java +++ b/src/java/org/apache/cassandra/schema/DistributedSchema.java @@ -123,6 +123,10 @@ public class DistributedSchema implements MetadataValue<DistributedSchema> { keyspaceInstances.putAll(prev.keyspaceInstances); + // If there are keyspaces in schema, but none of them are initialised, we're in first boot. Initialise all. + if (!prev.isEmpty() && prev.keyspaceInstances.isEmpty()) + prev = DistributedSchema.empty(); + Keyspaces.KeyspacesDiff ksDiff = Keyspaces.diff(prev.getKeyspaces(), getKeyspaces()); SchemaChangeNotifier schemaChangeNotifier = Schema.instance.schemaChangeNotifier(); @@ -148,8 +152,8 @@ public class DistributedSchema implements MetadataValue<DistributedSchema> assert delta.before.name.equals(delta.after.name); // drop tables and views - delta.views.dropped.forEach(v -> dropView(keyspace, v, true)); - delta.tables.dropped.forEach(t -> dropTable(keyspace, t, true)); + delta.views.dropped.forEach(v -> dropView(keyspace, v, loadSSTables)); + delta.tables.dropped.forEach(t -> dropTable(keyspace, t, loadSSTables)); // add tables and views delta.tables.created.forEach(t -> createTable(keyspace, t, loadSSTables)); @@ -164,7 +168,6 @@ public class DistributedSchema implements MetadataValue<DistributedSchema> keyspace.viewManager.reload(keyspaces.get(keyspace.getName()).get()); } - //schemaChangeNotifier.notifyKeyspaceAltered(delta); SchemaDiagnostics.keyspaceAltered(Schema.instance, delta); }); diff --git a/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java b/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java index 01abe4abdf..1077e2dded 100644 --- a/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java +++ b/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java @@ -40,6 +40,7 @@ import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.config.CassandraRelevantProperties; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.exceptions.ExceptionCode; +import org.apache.cassandra.exceptions.StartupException; import org.apache.cassandra.io.util.FileInputStreamPlus; import org.apache.cassandra.io.util.FileOutputStreamPlus; import org.apache.cassandra.locator.InetAddressAndPort; @@ -47,10 +48,10 @@ import org.apache.cassandra.metrics.TCMMetrics; import org.apache.cassandra.net.IVerbHandler; import org.apache.cassandra.schema.DistributedSchema; import org.apache.cassandra.schema.ReplicationParams; +import org.apache.cassandra.tcm.listeners.SchemaListener; import org.apache.cassandra.tcm.log.Entry; import org.apache.cassandra.tcm.log.LocalLog; import org.apache.cassandra.tcm.log.LogState; -import org.apache.cassandra.tcm.log.LogStorage; import org.apache.cassandra.tcm.log.Replication; import org.apache.cassandra.tcm.membership.NodeId; import org.apache.cassandra.tcm.membership.NodeVersion; @@ -150,22 +151,21 @@ public class ClusterMetadataService ClusterMetadataService(PlacementProvider placementProvider, Function<Processor, Processor> wrapProcessor, Supplier<State> cmsStateSupplier, - LocalLog.LogSpec logSpec) + LocalLog.LogSpec logSpec) throws StartupException { this.placementProvider = placementProvider; this.snapshots = new MetadataSnapshots.SystemKeyspaceMetadataSnapshots(); Processor localProcessor; - LogStorage logStorage = LogStorage.SystemKeyspace; if (CassandraRelevantProperties.TCM_USE_ATOMIC_LONG_PROCESSOR.getBoolean()) { - log = LocalLog.sync(logSpec); + log = logSpec.sync().createLog(); localProcessor = wrapProcessor.apply(new AtomicLongBackedProcessor(log, logSpec.isReset())); - fetchLogHandler = new FetchCMSLog.Handler((e, ignored) -> logStorage.getLogState(e)); + fetchLogHandler = new FetchCMSLog.Handler((e, ignored) -> logSpec.storage().getLogState(e)); } else { - log = LocalLog.async(logSpec); + log = logSpec.async().createLog(); localProcessor = wrapProcessor.apply(new PaxosBackedProcessor(log)); fetchLogHandler = new FetchCMSLog.Handler(); } @@ -243,13 +243,23 @@ public class ClusterMetadataService { if (instance != null) return; - ClusterMetadata emptyFromSystemTables = emptyWithSchemaFromSystemTables(Collections.singleton("DC1")); - emptyFromSystemTables.schema.initializeKeyspaceInstances(DistributedSchema.empty(), loadSSTables); - emptyFromSystemTables = emptyFromSystemTables.forceEpoch(Epoch.EMPTY); - LocalLog.LogSpec logSpec = new LocalLog.LogSpec().withInitialState(emptyFromSystemTables) - .withStorage(new AtomicLongBackedProcessor.InMemoryStorage()); - LocalLog log = LocalLog.sync(logSpec); - log.ready(); + ClusterMetadata emptyFromSystemTables = emptyWithSchemaFromSystemTables(Collections.singleton("DC1")) + .forceEpoch(Epoch.EMPTY); + + LocalLog.LogSpec logSpec = LocalLog.logSpec() + .withInitialState(emptyFromSystemTables) + .loadSSTables(loadSSTables) + .withDefaultListeners(false) + .withListener(new SchemaListener(loadSSTables) { + @Override + public void notifyPostCommit(ClusterMetadata prev, ClusterMetadata next, boolean fromSnapshot) + { + // we do not need a post-commit hook for tools + } + }) + .sync() + .withStorage(new AtomicLongBackedProcessor.InMemoryStorage()); + LocalLog log = logSpec.createLog(); ClusterMetadataService cms = new ClusterMetadataService(new UniformRangePlacement(), MetadataSnapshots.NO_OP, log, @@ -260,6 +270,8 @@ public class ClusterMetadataService null, null, new PeerLogFetcher(log)); + + log.readyUnchecked(); log.bootstrap(FBUtilities.getBroadcastAddressAndPort()); ClusterMetadataService.setInstance(cms); } diff --git a/src/java/org/apache/cassandra/tcm/Period.java b/src/java/org/apache/cassandra/tcm/Period.java index 32bf046232..e743504397 100644 --- a/src/java/org/apache/cassandra/tcm/Period.java +++ b/src/java/org/apache/cassandra/tcm/Period.java @@ -60,7 +60,10 @@ public class Period */ public static long scanLogForPeriod(TableMetadata logTable, Epoch since) { - long currentPeriod = ClusterMetadata.current().period; + long currentPeriod = Period.EMPTY; + ClusterMetadata metadata = ClusterMetadata.currentNullable(); + if (metadata != null) + currentPeriod = metadata.period; PeriodFinder visitor = currentPeriod > Period.FIRST ? new ReversePeriodFinder(since, currentPeriod) : new ForwardPeriodFinder(since); diff --git a/src/java/org/apache/cassandra/tcm/Startup.java b/src/java/org/apache/cassandra/tcm/Startup.java index c536bd3361..fcc2ff5fa7 100644 --- a/src/java/org/apache/cassandra/tcm/Startup.java +++ b/src/java/org/apache/cassandra/tcm/Startup.java @@ -47,7 +47,6 @@ import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.gms.NewGossiper; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.MessagingService; -import org.apache.cassandra.schema.DistributedSchema; import org.apache.cassandra.schema.KeyspaceMetadata; import org.apache.cassandra.schema.SchemaConstants; import org.apache.cassandra.schema.TableMetadata; @@ -141,18 +140,16 @@ import static org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort; public static void initializeAsNonCmsNode(Function<Processor, Processor> wrapProcessor) throws StartupException { - LocalLog.LogSpec logSpec = new LocalLog.LogSpec().withStorage(LogStorage.SystemKeyspace) - .withDefaultListeners(); + LocalLog.LogSpec logSpec = LocalLog.logSpec() + .withStorage(LogStorage.SystemKeyspace) + .afterReplay(Startup::scrubDataDirectories) + .withDefaultListeners(); ClusterMetadataService.setInstance(new ClusterMetadataService(new UniformRangePlacement(), wrapProcessor, ClusterMetadataService::state, logSpec)); - ClusterMetadataService.instance().initRecentlySealedPeriodsIndex(); - ClusterMetadataService.instance().log().replayPersisted(); - ClusterMetadata replayed = ClusterMetadataService.instance().log().metadata(); - scrubDataDirectories(replayed); - replayed.schema.initializeKeyspaceInstances(DistributedSchema.empty()); ClusterMetadataService.instance().log().ready(); + ClusterMetadataService.instance().initRecentlySealedPeriodsIndex(); NodeId nodeId = ClusterMetadata.current().myNodeId(); UUID currentHostId = SystemKeyspace.getLocalHostId(); @@ -179,6 +176,10 @@ import static org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort; } } + public interface AfterReplay + { + void accept(ClusterMetadata t) throws StartupException; + } /** * Initialization for Discovery. * @@ -243,15 +244,17 @@ import static org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort; public static void initializeFromGossip(Function<Processor, Processor> wrapProcessor, Runnable initMessaging) throws StartupException { ClusterMetadata emptyFromSystemTables = emptyWithSchemaFromSystemTables(SystemKeyspace.allKnownDatacenters()); - LocalLog.LogSpec logSpec = new LocalLog.LogSpec().withInitialState(emptyFromSystemTables) - .withStorage(LogStorage.SystemKeyspace) - .withDefaultListeners(); + LocalLog.LogSpec logSpec = LocalLog.logSpec() + .withInitialState(emptyFromSystemTables) + .afterReplay(Startup::scrubDataDirectories) + .withStorage(LogStorage.SystemKeyspace) + .withDefaultListeners(); + ClusterMetadataService.setInstance(new ClusterMetadataService(new UniformRangePlacement(), wrapProcessor, ClusterMetadataService::state, logSpec)); - scrubDataDirectories(emptyFromSystemTables); - emptyFromSystemTables.schema.initializeKeyspaceInstances(DistributedSchema.empty()); + ClusterMetadataService.instance().log().ready(); initMessaging.run(); try @@ -280,8 +283,9 @@ import static org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort; assert cmGossip.equals(initial) : cmGossip + " != " + initial; } - public static void reinitializeWithClusterMetadata(String fileName, Function<Processor, Processor> wrapProcessor, Runnable initMessaging) throws IOException + public static void reinitializeWithClusterMetadata(String fileName, Function<Processor, Processor> wrapProcessor, Runnable initMessaging) throws IOException, StartupException { + ClusterMetadata prev = ClusterMetadata.currentNullable(); // First set a minimal ClusterMetadata as some deserialization depends // on ClusterMetadata.current() to access the partitioner StubClusterMetadataService initial = StubClusterMetadataService.forClientTools(); @@ -298,25 +302,21 @@ import static org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort; if (!metadata.isCMSMember(FBUtilities.getBroadcastAddressAndPort())) throw new IllegalStateException("When reinitializing with cluster metadata, we must be in the CMS"); - // can use local dc here since we know local host in the cms: - ClusterMetadata emptyFromSystemTables = emptyWithSchemaFromSystemTables(Collections.singleton(DatabaseDescriptor.getLocalDataCenter())); - metadata.schema.initializeKeyspaceInstances(DistributedSchema.empty()); + metadata = metadata.forceEpoch(metadata.epoch.nextEpoch()); ClusterMetadataService.unsetInstance(); - LocalLog.LogSpec logSpec = new LocalLog.LogSpec().withInitialState(metadata) - .withStorage(LogStorage.SystemKeyspace) - .withDefaultListeners() - .isReset(true) - .withReadyNotification(LocalLog.LogSpec.WhenReady.NONE); + LocalLog.LogSpec logSpec = LocalLog.logSpec() + .withPreviousState(prev) + .withInitialState(metadata) + .withStorage(LogStorage.SystemKeyspace) + .withDefaultListeners() + .isReset(true); ClusterMetadataService.setInstance(new ClusterMetadataService(new UniformRangePlacement(), wrapProcessor, ClusterMetadataService::state, logSpec)); - // When re-intializing from a loaded metadata instance we need to fire notifications using the delta between an - // empty metadata and the loaded one. So we configure the LogSpec not to do any notifications and handle it - // explicitly here. - ClusterMetadataService.instance().log().notifyListeners(emptyFromSystemTables); + ClusterMetadataService.instance().log().ready(); initMessaging.run(); ClusterMetadataService.instance().forceSnapshot(metadata.forceEpoch(metadata.nextEpoch())); diff --git a/src/java/org/apache/cassandra/tcm/StubClusterMetadataService.java b/src/java/org/apache/cassandra/tcm/StubClusterMetadataService.java index 066b33ceb0..475e8ef21b 100644 --- a/src/java/org/apache/cassandra/tcm/StubClusterMetadataService.java +++ b/src/java/org/apache/cassandra/tcm/StubClusterMetadataService.java @@ -67,12 +67,16 @@ public class StubClusterMetadataService extends ClusterMetadataService { super(new UniformRangePlacement(), MetadataSnapshots.NO_OP, - LocalLog.sync(new LocalLog.LogSpec().withInitialState(initial)), + LocalLog.logSpec() + .loadSSTables(false) + .sync() + .withInitialState(initial) + .createLog(), new StubProcessor(), Commit.Replicator.NO_OP, false); this.metadata = initial; - this.log().ready(); + this.log().readyUnchecked(); } @Override diff --git a/src/java/org/apache/cassandra/tcm/listeners/SchemaListener.java b/src/java/org/apache/cassandra/tcm/listeners/SchemaListener.java index 2787f5eb2c..e3507bf2bb 100644 --- a/src/java/org/apache/cassandra/tcm/listeners/SchemaListener.java +++ b/src/java/org/apache/cassandra/tcm/listeners/SchemaListener.java @@ -29,17 +29,23 @@ import org.apache.cassandra.tcm.ClusterMetadata; public class SchemaListener implements ChangeListener { + private final boolean loadSSTables; + + public SchemaListener(boolean loadSSTables) + { + this.loadSSTables = loadSSTables; + } + @Override public void notifyPreCommit(ClusterMetadata prev, ClusterMetadata next, boolean fromSnapshot) { - notifyInternal(prev, next, fromSnapshot, true); + notifyInternal(prev, next, fromSnapshot, loadSSTables); } protected void notifyInternal(ClusterMetadata prev, ClusterMetadata next, boolean fromSnapshot, boolean loadSSTables) { if (!fromSnapshot && next.schema.lastModified().equals(prev.schema.lastModified())) return; - next.schema.initializeKeyspaceInstances(prev.schema, loadSSTables); } diff --git a/src/java/org/apache/cassandra/tcm/log/LocalLog.java b/src/java/org/apache/cassandra/tcm/log/LocalLog.java index 9f21dc9f03..ab519b4157 100644 --- a/src/java/org/apache/cassandra/tcm/log/LocalLog.java +++ b/src/java/org/apache/cassandra/tcm/log/LocalLog.java @@ -42,10 +42,12 @@ import org.apache.cassandra.concurrent.Interruptible; import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.DurationSpec; +import org.apache.cassandra.exceptions.StartupException; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tcm.ClusterMetadataService; import org.apache.cassandra.tcm.Epoch; +import org.apache.cassandra.tcm.Startup; import org.apache.cassandra.tcm.Transformation; import org.apache.cassandra.tcm.listeners.ChangeListener; import org.apache.cassandra.tcm.listeners.ClientNotificationListener; @@ -96,25 +98,57 @@ public abstract class LocalLog implements Closeable // notification to them all. private final AtomicBoolean replayComplete = new AtomicBoolean(); - public static class LogSpec + public static LogSpec logSpec() { - public enum WhenReady { NONE, PRE_COMMIT_ONLY, POST_COMMIT_ONLY, ALL}; + return new LogSpec(); + } - private ClusterMetadata initial = new ClusterMetadata(DatabaseDescriptor.getPartitioner()); + public static class LogSpec + { + private ClusterMetadata initial; + private ClusterMetadata prev; + private Startup.AfterReplay afterReplay = (metadata) -> {}; private LogStorage storage = LogStorage.None; + private boolean async = true; private boolean defaultListeners = false; - private boolean reset = false; - private WhenReady whenReady = WhenReady.POST_COMMIT_ONLY; + private boolean isReset = false; + private boolean loadSSTables = true; private final Set<LogListener> listeners = new HashSet<>(); private final Set<ChangeListener> changeListeners = new HashSet<>(); private final Set<ChangeListener.Async> asyncChangeListeners = new HashSet<>(); + private LogSpec() + { + } + + /** + * create a sync log - only used for tests and tools + * @return + */ + public LogSpec sync() + { + this.async = false; + return this; + } + + public LogSpec async() + { + this.async = true; + return this; + } + public LogSpec withDefaultListeners() { return withDefaultListeners(true); } + public LogSpec loadSSTables(boolean loadSSTables) + { + this.loadSSTables = loadSSTables; + return this; + } + public LogSpec withDefaultListeners(boolean withDefaultListeners) { if (withDefaultListeners && @@ -148,13 +182,18 @@ public abstract class LocalLog implements Closeable public LogSpec isReset(boolean isReset) { - reset = isReset; + this.isReset = isReset; return this; } public boolean isReset() { - return reset; + return this.isReset; + } + + public LogStorage storage() + { + return storage; } public LogSpec withStorage(LogStorage storage) @@ -163,17 +202,31 @@ public abstract class LocalLog implements Closeable return this; } + public LogSpec afterReplay(Startup.AfterReplay afterReplay) + { + this.afterReplay = afterReplay; + return this; + } + public LogSpec withInitialState(ClusterMetadata initial) { this.initial = initial; return this; } - public LogSpec withReadyNotification(WhenReady whenReady) + public LogSpec withPreviousState(ClusterMetadata prev) { - this.whenReady = whenReady; + this.prev = prev; return this; } + + public final LocalLog createLog() + { + if (async) + return new Async(this); + else + return new Sync(this); + } } /** @@ -196,21 +249,27 @@ public abstract class LocalLog implements Closeable return e1.epoch.compareTo(e2.epoch); }); - protected final LogStorage persistence; + protected final LogStorage storage; protected final Set<LogListener> listeners; protected final Set<ChangeListener> changeListeners; protected final Set<ChangeListener.Async> asyncChangeListeners; - private final LogSpec spec; + protected final LogSpec spec; - private LocalLog(LogSpec spec) + private LocalLog(LogSpec logSpec) { - assert spec.initial.epoch.is(EMPTY) || spec.initial.epoch.is(Epoch.UPGRADE_STARTUP) || spec.reset; - committed = new AtomicReference<>(spec.initial); - this.persistence = spec.storage; + this.spec = logSpec; + if (spec.initial == null) + spec.initial = new ClusterMetadata(DatabaseDescriptor.getPartitioner()); + if (spec.prev == null) + spec.prev = new ClusterMetadata(DatabaseDescriptor.getPartitioner()); + assert spec.initial.epoch.is(EMPTY) || spec.initial.epoch.is(Epoch.UPGRADE_STARTUP) || spec.isReset : + String.format(String.format("Should start with empty epoch, unless we're in upgrade or reset mode: %s (isReset: %s)", spec.initial, spec.isReset)); + + this.committed = new AtomicReference<>(logSpec.initial); + this.storage = logSpec.storage; listeners = Sets.newConcurrentHashSet(); changeListeners = Sets.newConcurrentHashSet(); asyncChangeListeners = Sets.newConcurrentHashSet(); - this.spec = spec; } public void bootstrap(InetAddressAndPort addr) @@ -250,34 +309,6 @@ public abstract class LocalLog implements Closeable return pending.size(); } - public static LocalLog sync(LogSpec spec) - { - return new Sync(spec); - } - - public static LocalLog async(LogSpec spec) - { - return new Async(spec); - } - - @VisibleForTesting - public static LocalLog asyncForTests() - { - LogSpec logSpec = new LogSpec(); - LocalLog log = new Async(logSpec); - log.ready(); - return log; - } - - @VisibleForTesting - public static LocalLog asyncForTests(ClusterMetadata initial) - { - LogSpec logSpec = new LogSpec().withInitialState(initial); - LocalLog log = new Async(logSpec); - log.ready(); - return log; - } - public boolean hasGaps() { Epoch start = committed.get().epoch; @@ -305,7 +336,7 @@ public abstract class LocalLog implements Closeable public Replication getCommittedEntries(Epoch since) { - return persistence.getReplication(since); + return storage.getReplication(since); } public ClusterMetadata waitForHighestConsecutive() @@ -448,7 +479,7 @@ public abstract class LocalLog implements Closeable next.epoch, pendingEntry.transform, prev.epoch); if (replayComplete.get()) - persistence.append(transformed.success().metadata.period, pendingEntry.maybeUnwrapExecuted()); + storage.append(transformed.success().metadata.period, pendingEntry.maybeUnwrapExecuted()); notifyPreCommit(prev, next, isSnapshot); @@ -505,13 +536,13 @@ public abstract class LocalLog implements Closeable /** * Replays items that were persisted during previous starts. Replayed items _will not_ be persisted again. */ - public Epoch replayPersisted() + private ClusterMetadata replayPersisted() { if (replayComplete.get()) throw new IllegalStateException("Can only replay persisted once."); - LogState logState = persistence.getLogState(metadata().epoch); + LogState logState = storage.getLogState(metadata().epoch); append(logState); - return waitForHighestConsecutive().epoch; + return waitForHighestConsecutive(); } private void maybeNotifyListeners(Entry entry, Transformation.Result result) @@ -538,11 +569,12 @@ public abstract class LocalLog implements Closeable this.changeListeners.remove(listener); } - public void notifyListeners(ClusterMetadata emptyFromSystemTables) + public void notifyListeners(ClusterMetadata prev) { - ClusterMetadata metadata = ClusterMetadata.current(); - notifyPreCommit(emptyFromSystemTables, metadata, true); - notifyPostCommit(emptyFromSystemTables, metadata, true); + ClusterMetadata metadata = committed.get(); + logger.info("Notifying listeners, prev epoch = {}, current epoch = {}", prev.epoch, metadata.epoch); + notifyPreCommit(prev, metadata, true); + notifyPostCommit(prev, metadata, true); } private void notifyPreCommit(ClusterMetadata before, ClusterMetadata after, boolean fromSnapshot) @@ -561,15 +593,59 @@ public abstract class LocalLog implements Closeable ScheduledExecutors.optionalTasks.submit(() -> listener.notifyPostCommit(before, after, fromSnapshot)); } + /** + * Essentially same as `ready` but throws an unchecked exception + */ + @VisibleForTesting + public final ClusterMetadata readyUnchecked() + { + try + { + return ready(); + } + catch (StartupException e) + { + throw new RuntimeException(e); + } + } + + public ClusterMetadata ready() throws StartupException + { + ClusterMetadata metadata = replayPersisted(); + spec.afterReplay.accept(metadata); + logger.info("Marking LocalLog ready at epoch {}", metadata.epoch); + + if (!replayComplete.compareAndSet(false, true)) + throw new IllegalStateException("Log is already fully initialised"); + + logger.debug("Marking LocalLog ready at epoch {}", committed.get().epoch); + if (spec.defaultListeners) + { + logger.info("Adding default listeners to LocalLog"); + addListeners(); + } + else + { + logger.info("Adding specified listeners to LocalLog"); + spec.listeners.forEach(this::addListener); + spec.changeListeners.forEach(this::addListener); + spec.asyncChangeListeners.forEach(this::addListener); + } + + logger.info("Notifying all registered listeners of both pre and post commit event"); + notifyListeners(spec.prev); + + return metadata; + } private static class Async extends LocalLog { private final AsyncRunnable runnable; private final Interruptible executor; - private Async(LogSpec spec) + private Async(LogSpec logSpec) { - super(spec); + super(logSpec); this.runnable = new AsyncRunnable(); this.executor = ExecutorFactory.Global.executorFactory().infiniteLoop("GlobalLogFollower", runnable, SAFE, NON_DAEMON, UNSYNCHRONIZED); } @@ -743,9 +819,9 @@ public abstract class LocalLog implements Closeable private static class Sync extends LocalLog { - private Sync(LogSpec spec) + private Sync(LogSpec logSpec) { - super(spec); + super(logSpec); } void runOnce(DurationSpec durationSpec) @@ -780,7 +856,7 @@ public abstract class LocalLog implements Closeable addListener(snapshotListener()); addListener(new InitializationListener()); - addListener(new SchemaListener()); + addListener(new SchemaListener(spec.loadSSTables)); addListener(new LegacyStateListener()); addListener(new PlacementsChangeListener()); addListener(new MetadataSnapshotListener()); @@ -788,45 +864,6 @@ public abstract class LocalLog implements Closeable addListener(new UpgradeMigrationListener()); } - public void ready() - { - if (!replayComplete.compareAndSet(false, true)) - throw new IllegalStateException("Log is already fully initialised"); - - logger.debug("Marking LocalLog ready at epoch {}", committed.get().epoch); - if (spec.defaultListeners) - { - logger.debug("Adding default listeners to LocalLog"); - addListeners(); - } - else - { - logger.debug("Adding specified listeners to LocalLog"); - spec.listeners.forEach(this::addListener); - spec.changeListeners.forEach(this::addListener); - spec.asyncChangeListeners.forEach(this::addListener); - } - - switch (spec.whenReady) - { - case ALL: - logger.debug("Notifying all registered listeners of both pre and post commit event"); - notifyListeners(spec.initial); - break; - case PRE_COMMIT_ONLY: - logger.debug("Notifying all registered listeners of pre-commit event only"); - notifyPreCommit(spec.initial, committed.get(), true); - break; - case POST_COMMIT_ONLY: - logger.debug("Notifying all registered listeners of post-commit event only"); - notifyPostCommit(spec.initial, committed.get(), true); - break; - case NONE: - logger.debug("Not notifying registered listeners of pre or post commit events"); - break; - } - } - private LogListener snapshotListener() { return (entry, metadata) -> { diff --git a/src/java/org/apache/cassandra/tcm/log/LogStorage.java b/src/java/org/apache/cassandra/tcm/log/LogStorage.java index e35dcfb2b1..3c88cf9b86 100644 --- a/src/java/org/apache/cassandra/tcm/log/LogStorage.java +++ b/src/java/org/apache/cassandra/tcm/log/LogStorage.java @@ -43,6 +43,7 @@ public interface LogStorage extends LogReader { return LogState.EMPTY; } + public void truncate() {} public Replication getReplication(Epoch since) { return Replication.EMPTY; diff --git a/src/java/org/apache/cassandra/tcm/log/SystemKeyspaceStorage.java b/src/java/org/apache/cassandra/tcm/log/SystemKeyspaceStorage.java index 83c443731b..1f372f7318 100644 --- a/src/java/org/apache/cassandra/tcm/log/SystemKeyspaceStorage.java +++ b/src/java/org/apache/cassandra/tcm/log/SystemKeyspaceStorage.java @@ -101,6 +101,11 @@ public class SystemKeyspaceStorage implements LogStorage return LogState.getLogState(since, snapshots.get(), this); } + public void truncate() + { + Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME).getColumnFamilyStore(NAME).truncateBlockingWithoutSnapshot(); + } + /** * Uses the supplied period as a starting point to iterate through the log table * collating log entries which follow the supplied epoch. It is assumed that the diff --git a/test/distributed/org/apache/cassandra/distributed/test/jmx/JMXFeatureTest.java b/test/distributed/org/apache/cassandra/distributed/test/jmx/JMXFeatureTest.java index b6b0eba2f6..2ffddaed7e 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/jmx/JMXFeatureTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/jmx/JMXFeatureTest.java @@ -110,8 +110,7 @@ public class JMXFeatureTest extends TestBaseImpl Assert.assertThat(statusResult.getStderr(), is(blankOrNullString())); Assert.assertThat(statusResult.getStdout(), containsString("DN 127.0.0.1")); testInstance(instances, cluster.get(2)); - ClusterUtils.start(instanceToStop, props -> { - }); + ClusterUtils.start(instanceToStop, props -> {}); ClusterUtils.awaitRingState(otherInstance, instanceToStop, "Normal"); ClusterUtils.awaitRingStatus(otherInstance, instanceToStop, "Up"); statusResult = cluster.get(1).nodetoolResult("status"); diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/CMSTestBase.java b/test/distributed/org/apache/cassandra/distributed/test/log/CMSTestBase.java index eb78bc7e38..bd4ea3c7db 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/log/CMSTestBase.java +++ b/test/distributed/org/apache/cassandra/distributed/test/log/CMSTestBase.java @@ -96,9 +96,11 @@ public class CMSTestBase this.rf = rf; schemaProvider = Mockito.mock(SchemaProvider.class); ClusterMetadata initial = new ClusterMetadata(partitioner); - LocalLog.LogSpec logSpec = new LocalLog.LogSpec().withInitialState(initial).withDefaultListeners(addListeners); - log = LocalLog.sync(logSpec); - log.ready(); + log = LocalLog.logSpec() + .sync() + .withInitialState(initial) + .withDefaultListeners(addListeners) + .createLog(); service = new ClusterMetadataService(new UniformRangePlacement(), MetadataSnapshots.NO_OP, @@ -108,6 +110,7 @@ public class CMSTestBase true); ClusterMetadataService.setInstance(service); + log.readyUnchecked(); log.bootstrap(FBUtilities.getBroadcastAddressAndPort()); service.commit(new Initialize(ClusterMetadata.current()) { public Result execute(ClusterMetadata prev) diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/ClusterMetadataTestHelper.java b/test/distributed/org/apache/cassandra/distributed/test/log/ClusterMetadataTestHelper.java index 0046e170c8..093ac0bcd6 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/log/ClusterMetadataTestHelper.java +++ b/test/distributed/org/apache/cassandra/distributed/test/log/ClusterMetadataTestHelper.java @@ -22,8 +22,8 @@ import java.net.UnknownHostException; import java.util.Collection; import java.util.Collections; import java.util.HashSet; -import java.util.Random; import java.util.Map; +import java.util.Random; import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; @@ -46,9 +46,9 @@ import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.Message; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.net.Verb; +import org.apache.cassandra.schema.DistributedSchema; import org.apache.cassandra.schema.KeyspaceMetadata; import org.apache.cassandra.schema.KeyspaceParams; -import org.apache.cassandra.schema.DistributedSchema; import org.apache.cassandra.schema.Keyspaces; import org.apache.cassandra.schema.ReplicationParams; import org.apache.cassandra.schema.Schema; @@ -63,6 +63,8 @@ import org.apache.cassandra.tcm.MetadataSnapshots; import org.apache.cassandra.tcm.Period; import org.apache.cassandra.tcm.Transformation; import org.apache.cassandra.tcm.log.LocalLog; +import org.apache.cassandra.tcm.log.LogState; +import org.apache.cassandra.tcm.log.Replication; import org.apache.cassandra.tcm.membership.Location; import org.apache.cassandra.tcm.membership.NodeAddresses; import org.apache.cassandra.tcm.membership.NodeId; @@ -73,9 +75,9 @@ import org.apache.cassandra.tcm.ownership.UniformRangePlacement; import org.apache.cassandra.tcm.ownership.VersionedEndpoints; import org.apache.cassandra.tcm.sequences.BootstrapAndJoin; import org.apache.cassandra.tcm.sequences.BootstrapAndReplace; -import org.apache.cassandra.tcm.sequences.Move; import org.apache.cassandra.tcm.sequences.LeaveStreams; import org.apache.cassandra.tcm.sequences.ReconfigureCMS; +import org.apache.cassandra.tcm.sequences.Move; import org.apache.cassandra.tcm.sequences.UnbootstrapAndLeave; import org.apache.cassandra.tcm.transformations.AlterSchema; import org.apache.cassandra.tcm.transformations.PrepareJoin; @@ -123,13 +125,15 @@ public class ClusterMetadataTestHelper public static ClusterMetadataService instanceForTest() { ClusterMetadata current = new ClusterMetadata(DatabaseDescriptor.getPartitioner()); - LocalLog log = LocalLog.asyncForTests(); + LocalLog log = LocalLog.logSpec() + .createLog(); ResettableClusterMetadataService service = new ResettableClusterMetadataService(new UniformRangePlacement(), MetadataSnapshots.NO_OP, log, new AtomicLongBackedProcessor(log), Commit.Replicator.NO_OP, true); + log.readyUnchecked(); log.bootstrap(FBUtilities.getBroadcastAddressAndPort()); service.commit(new Initialize(current)); QueryProcessor.registerStatementInvalidatingListener(); @@ -137,26 +141,6 @@ public class ClusterMetadataTestHelper return service; } - /** - * Create a pre-configured CMS which supports mark & reset for use in tests. This version dose not perform initial - * CMS setup, neither bootstrapping the log nor applying an Initialize transformation. It assumes that the supplied - * ClusterMetadata instance is in the state required by the specific caller. - * @return a resettable CMS instance, to be used in a call to ClusterMetadataService::setInstance - */ - public static ClusterMetadataService instanceForTest(ClusterMetadata current) - { - LocalLog log = LocalLog.asyncForTests(); - ResettableClusterMetadataService service = new ResettableClusterMetadataService(new UniformRangePlacement(), - MetadataSnapshots.NO_OP, - log, - new AtomicLongBackedProcessor(log), - Commit.Replicator.NO_OP, - true); - QueryProcessor.registerStatementInvalidatingListener(); - service.mark(); - return service; - } - public static ClusterMetadata minimalForTesting(IPartitioner partitioner) { return new ClusterMetadata(Epoch.EMPTY, @@ -186,13 +170,13 @@ public class ClusterMetadataTestHelper null, ImmutableMap.of()); } + public static void forceCurrentPeriodTo(long period) { + ClusterMetadataService.unsetInstance(); + ClusterMetadataService.setInstance(instanceForTest()); ClusterMetadata metadata = ClusterMetadata.currentNullable(); - if (metadata == null) - metadata = new ClusterMetadata(DatabaseDescriptor.getPartitioner()); - - metadata = new ClusterMetadata(metadata.epoch, + metadata = new ClusterMetadata(metadata.epoch.nextEpoch(), period, metadata.lastInPeriod, metadata.partitioner, @@ -203,20 +187,23 @@ public class ClusterMetadataTestHelper metadata.lockedRanges, metadata.inProgressSequences, metadata.extensions); - ClusterMetadataService.unsetInstance(); - ClusterMetadataService.setInstance(instanceForTest(metadata)); + ClusterMetadataService.instance().log().append(new LogState(metadata, Replication.of(Collections.emptyList()))); } public static ClusterMetadataService syncInstanceForTest() { - LocalLog log = LocalLog.sync(new LocalLog.LogSpec()); - log.ready(); - return new ClusterMetadataService(new UniformRangePlacement(), - MetadataSnapshots.NO_OP, - log, - new AtomicLongBackedProcessor(log), - Commit.Replicator.NO_OP, - true); + LocalLog log = LocalLog.logSpec() + .sync() + .createLog(); + ClusterMetadataService cms = new ClusterMetadataService(new UniformRangePlacement(), + MetadataSnapshots.NO_OP, + log, + new AtomicLongBackedProcessor(log), + Commit.Replicator.NO_OP, + true); + + log.readyUnchecked(); + return cms; } public static void createKeyspace(String name, KeyspaceParams params) @@ -247,7 +234,6 @@ public class ClusterMetadataTestHelper } } - private static Set<InetAddressAndPort> leaving(ClusterMetadata metadata) { return metadata.directory.states.entrySet().stream() diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/CoordinatorPathTestBase.java b/test/distributed/org/apache/cassandra/distributed/test/log/CoordinatorPathTestBase.java index 8c41d17ddc..512fa19c5b 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/log/CoordinatorPathTestBase.java +++ b/test/distributed/org/apache/cassandra/distributed/test/log/CoordinatorPathTestBase.java @@ -642,9 +642,12 @@ public abstract class CoordinatorPathTestBase extends FuzzTestBase assert executor == null; LogStorage logStorage = new AtomicLongBackedProcessor.InMemoryStorage(); ClusterMetadata initial = new ClusterMetadata(partitioner); - LocalLog.LogSpec logSpec = new LocalLog.LogSpec().withInitialState(initial).withStorage(logStorage); - LocalLog log = LocalLog.sync(logSpec); - log.ready(); + LocalLog log = LocalLog.logSpec() + .withInitialState(initial) + .sync() + .withStorage(logStorage) + .createLog(); + // Replicator only replicates to the node under test, as there are no other nodes in reality Commit.Replicator replicator = (result, source) -> { realCluster.deliverMessage(realCluster.get(1).broadcastAddress(), @@ -661,7 +664,7 @@ public abstract class CoordinatorPathTestBase extends FuzzTestBase replicator, true); ClusterMetadataService.setInstance(service); - + log.readyUnchecked(); log.bootstrap(cms.addr()); service.commit(new Initialize(log.metadata())); service.commit(new Register(new NodeAddresses(cms.addr()), new Location(cms.dc(), cms.rack()), NodeVersion.CURRENT)); @@ -720,8 +723,9 @@ public abstract class CoordinatorPathTestBase extends FuzzTestBase // We need to create a second node to be able to send and receive messages. RealSimulatedNode driver = createNode(); - LocalLog log = LocalLog.sync(new LocalLog.LogSpec()); - log.ready(); + LocalLog log = LocalLog.logSpec() + .sync() + .createLog(); ClusterMetadataService metadataService = new ClusterMetadataService(new UniformRangePlacement(), @@ -754,6 +758,8 @@ public abstract class CoordinatorPathTestBase extends FuzzTestBase false); ClusterMetadataService.setInstance(metadataService); + log.readyUnchecked(); + driver.clean(TCM_REPLICATION); driver.on(Verb.TCM_REPLICATION, new SimulatedAction<Replication, NoPayload>() { diff --git a/test/distributed/org/apache/cassandra/distributed/test/ring/DecommissionTest.java b/test/distributed/org/apache/cassandra/distributed/test/ring/DecommissionTest.java index 6e819d825a..5ca7bd9f5d 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/ring/DecommissionTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/ring/DecommissionTest.java @@ -94,16 +94,16 @@ public class DecommissionTest extends TestBaseImpl } @Test - public void testDecomDirectoryMinMaxVersions() throws IOException - { - try (Cluster cluster = builder().withNodes(3) + public void testDecomDirectoryMinMaxVersions() throws IOException { + try (Cluster cluster = builder() + .withConfig(cfg -> cfg.with(GOSSIP)) + .withNodes(3) .start()) { cluster.get(3).nodetoolResult("decommission", "--force").asserts().success(); cluster.get(1).runOnInstance(() -> { ClusterMetadata metadata = ClusterMetadata.current(); - ClusterMetadataService.instance().commit(new Startup(metadata.myNodeId(), metadata.directory.getNodeAddresses(metadata.myNodeId()), new NodeVersion(new CassandraVersion("6.0.0"), @@ -133,6 +133,7 @@ public class DecommissionTest extends TestBaseImpl @Test public void testMixedVersionBlockDecom() throws IOException { try (Cluster cluster = builder().withNodes(3) + .withConfig(config -> config.with(GOSSIP)) .start()) { cluster.get(3).nodetoolResult("decommission", "--force").asserts().success(); diff --git a/test/unit/org/apache/cassandra/ServerTestUtils.java b/test/unit/org/apache/cassandra/ServerTestUtils.java index 42a18600e6..29e1a84a1e 100644 --- a/test/unit/org/apache/cassandra/ServerTestUtils.java +++ b/test/unit/org/apache/cassandra/ServerTestUtils.java @@ -24,8 +24,8 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; -import java.util.stream.Collectors; import java.util.function.Function; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,21 +33,20 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.audit.AuditLogManager; import org.apache.cassandra.config.CassandraRelevantProperties; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.cql3.QueryProcessor; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.db.commitlog.CommitLog; +import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Token; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.sstable.format.big.BigTableReader; import org.apache.cassandra.io.sstable.indexsummary.IndexSummarySupport; -import org.apache.cassandra.cql3.QueryProcessor; -import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.io.util.File; import org.apache.cassandra.locator.AbstractEndpointSnitch; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.locator.Replica; -import org.apache.cassandra.schema.DistributedSchema; import org.apache.cassandra.security.ThreadAwareSecurityManager; import org.apache.cassandra.service.EmbeddedCassandraService; import org.apache.cassandra.tcm.AtomicLongBackedProcessor; @@ -59,6 +58,7 @@ import org.apache.cassandra.tcm.MetadataSnapshots; import org.apache.cassandra.tcm.Processor; import org.apache.cassandra.tcm.log.LocalLog; import org.apache.cassandra.tcm.log.LogStorage; +import org.apache.cassandra.tcm.log.SystemKeyspaceStorage; import org.apache.cassandra.tcm.membership.NodeId; import org.apache.cassandra.tcm.ownership.PlacementProvider; import org.apache.cassandra.tcm.ownership.UniformRangePlacement; @@ -265,7 +265,7 @@ public final class ServerTestUtils public static void initCMS() { - // Effectively disable automatic snapshots using AtomicLongBackedProcessopr and LocaLLog.Sync interacts + // Effectively disable automatic snapshots using AtomicLongBackedProcessor and LocaLLog.Sync interacts // badly with submitting SealPeriod transformations from the log listener. In this configuration, SealPeriod // commits performed on NonPeriodicTasks threads end up actually performing the transformations as well as // calling the pre and post commit listeners, which is not threadsafe. In a non-test setup the processing of @@ -276,9 +276,14 @@ public final class ServerTestUtils IPartitioner partitioner = DatabaseDescriptor.getPartitioner(); boolean addListeners = true; ClusterMetadata initial = new ClusterMetadata(partitioner); - LocalLog.LogSpec logSpec = new LocalLog.LogSpec().withInitialState(initial) - .withDefaultListeners(addListeners); - LocalLog log = LocalLog.async(logSpec); + if (!Keyspace.isInitialized()) + Keyspace.setInitialized(); + + LocalLog log = LocalLog.logSpec() + .withInitialState(initial) + .withDefaultListeners(addListeners) + .createLog(); + ResettableClusterMetadataService service = new ResettableClusterMetadataService(new UniformRangePlacement(), MetadataSnapshots.NO_OP, log, @@ -287,10 +292,7 @@ public final class ServerTestUtils true); ClusterMetadataService.setInstance(service); - initial.schema.initializeKeyspaceInstances(DistributedSchema.empty()); - if (!Keyspace.isInitialized()) - Keyspace.setInitialized(); - log.ready(); + log.readyUnchecked(); log.bootstrap(FBUtilities.getBroadcastAddressAndPort()); service.commit(new Initialize(ClusterMetadata.current())); QueryProcessor.registerStatementInvalidatingListener(); @@ -309,12 +311,13 @@ public final class ServerTestUtils // |-- StorageService.instance.setPartitionerUnsafe(M3P) # test wants to use LongToken // |-- ServerTestUtils.recreateCMS # recreates the CMS using the updated partitioner ClusterMetadata initial = new ClusterMetadata(DatabaseDescriptor.getPartitioner()); - initial.schema.initializeKeyspaceInstances(DistributedSchema.empty()); - LocalLog.LogSpec logSpec = new LocalLog.LogSpec().withInitialState(initial) - .withStorage(LogStorage.SystemKeyspace) - .withDefaultListeners(); - LocalLog log = LocalLog.async(logSpec); - log.ready(); + LogStorage storage = LogStorage.SystemKeyspace; + LocalLog.LogSpec logSpec = LocalLog.logSpec() + .withInitialState(initial) + .withStorage(storage) + .withDefaultListeners(); + LocalLog log = logSpec.createLog(); + ResettableClusterMetadataService cms = new ResettableClusterMetadataService(new UniformRangePlacement(), MetadataSnapshots.NO_OP, log, @@ -323,6 +326,8 @@ public final class ServerTestUtils true); ClusterMetadataService.unsetInstance(); ClusterMetadataService.setInstance(cms); + ((SystemKeyspaceStorage)LogStorage.SystemKeyspace).truncate(); + log.readyUnchecked(); log.bootstrap(FBUtilities.getBroadcastAddressAndPort()); cms.mark(); } diff --git a/test/unit/org/apache/cassandra/hints/HintsUpgradeTest.java b/test/unit/org/apache/cassandra/hints/HintsUpgradeTest.java index e1d7532ac0..3d8c443121 100644 --- a/test/unit/org/apache/cassandra/hints/HintsUpgradeTest.java +++ b/test/unit/org/apache/cassandra/hints/HintsUpgradeTest.java @@ -26,6 +26,7 @@ import java.util.function.Consumer; import com.google.common.collect.ImmutableMap; import org.junit.After; import org.junit.BeforeClass; +import org.junit.Ignore; import org.junit.Test; import org.apache.cassandra.SchemaLoader; @@ -47,6 +48,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +@Ignore("TODO: TCM") public class HintsUpgradeTest { static diff --git a/test/unit/org/apache/cassandra/service/ClientWarningsTest.java b/test/unit/org/apache/cassandra/service/ClientWarningsTest.java index 09fda7c5f5..cf6d6ed400 100644 --- a/test/unit/org/apache/cassandra/service/ClientWarningsTest.java +++ b/test/unit/org/apache/cassandra/service/ClientWarningsTest.java @@ -21,7 +21,6 @@ import java.util.Collection; import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; - import org.junit.BeforeClass; import org.junit.Test; import org.junit.runner.RunWith; @@ -34,6 +33,7 @@ import org.apache.cassandra.cql3.CQLTester; import org.apache.cassandra.cql3.QueryOptions; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.exceptions.RequestTimeoutException; import org.apache.cassandra.transport.Message; import org.apache.cassandra.transport.ProtocolVersion; import org.apache.cassandra.transport.SimpleClient; @@ -74,12 +74,12 @@ public class ClientWarningsTest extends CQLTester { client.connect(false); - QueryMessage query = new QueryMessage(createBatchStatement2(1), QueryOptions.DEFAULT); - Message.Response resp = client.execute(query); + Message.Response resp = executeWithRetries(client, + new QueryMessage(createBatchStatement2(1), QueryOptions.DEFAULT)); assertNull(resp.getWarnings()); - query = new QueryMessage(createBatchStatement2(DatabaseDescriptor.getBatchSizeWarnThreshold()), QueryOptions.DEFAULT); - resp = client.execute(query); + resp = executeWithRetries(client, + new QueryMessage(createBatchStatement2(DatabaseDescriptor.getBatchSizeWarnThreshold()), QueryOptions.DEFAULT)); assertEquals(1, resp.getWarnings().size()); } } @@ -94,12 +94,14 @@ public class ClientWarningsTest extends CQLTester { client.connect(false); - QueryMessage query = new QueryMessage(createBatchStatement2(DatabaseDescriptor.getBatchSizeWarnThreshold() / 2 + 1), QueryOptions.DEFAULT); - Message.Response resp = client.execute(query); + Message.Response resp = executeWithRetries(client, + new QueryMessage(createBatchStatement2(DatabaseDescriptor.getBatchSizeWarnThreshold() / 2 + 1), QueryOptions.DEFAULT)); assertEquals(1, resp.getWarnings().size()); - query = new QueryMessage(createBatchStatement(DatabaseDescriptor.getBatchSizeWarnThreshold()), QueryOptions.DEFAULT); - resp = client.execute(query); + + resp = executeWithRetries(client, + new QueryMessage(createBatchStatement(DatabaseDescriptor.getBatchSizeWarnThreshold()), QueryOptions.DEFAULT)); + assertNull(resp.getWarnings()); } } @@ -116,35 +118,50 @@ public class ClientWarningsTest extends CQLTester for (int i = 0; i < iterations; i++) { - QueryMessage query = new QueryMessage(String.format("INSERT INTO %s.%s (pk, ck, v) VALUES (1, %s, 1)", - KEYSPACE, - currentTable(), - i), QueryOptions.DEFAULT); - client.execute(query); + executeWithRetries(client, + new QueryMessage(String.format("INSERT INTO %s.%s (pk, ck, v) VALUES (1, %s, 1)", + KEYSPACE, + currentTable(), + i), QueryOptions.DEFAULT)); } ColumnFamilyStore store = Keyspace.open(KEYSPACE).getColumnFamilyStore(currentTable()); Util.flush(store); for (int i = 0; i < iterations; i++) { - QueryMessage query = new QueryMessage(String.format("DELETE v FROM %s.%s WHERE pk = 1 AND ck = %s", - KEYSPACE, - currentTable(), - i), QueryOptions.DEFAULT); - client.execute(query); + executeWithRetries(client, + new QueryMessage(String.format("DELETE v FROM %s.%s WHERE pk = 1 AND ck = %s", + KEYSPACE, + currentTable(), + i), QueryOptions.DEFAULT)); } Util.flush(store); { - QueryMessage query = new QueryMessage(String.format("SELECT * FROM %s.%s WHERE pk = 1", - KEYSPACE, - currentTable()), QueryOptions.DEFAULT); - Message.Response resp = client.execute(query); + Message.Response resp = executeWithRetries(client, + new QueryMessage(String.format("SELECT * FROM %s.%s WHERE pk = 1", + KEYSPACE, + currentTable()), QueryOptions.DEFAULT)); assertEquals(1, resp.getWarnings().size()); } } } + private static Message.Response executeWithRetries(SimpleClient client, QueryMessage query) + { + for (int i = 0; i < 10; i++) + { + try + { + return client.execute(query); + } + catch (RequestTimeoutException t) + { + logger.warn("Timed out. Retrying."); + } + } + throw new RuntimeException("Could not execute query after 10 tries"); + } @Test public void testLargeBatchWithProtoV2() throws Exception { @@ -154,8 +171,8 @@ public class ClientWarningsTest extends CQLTester { client.connect(false); - QueryMessage query = new QueryMessage(createBatchStatement(DatabaseDescriptor.getBatchSizeWarnThreshold()), QueryOptions.DEFAULT); - Message.Response resp = client.execute(query); + Message.Response resp = executeWithRetries(client, + new QueryMessage(createBatchStatement(DatabaseDescriptor.getBatchSizeWarnThreshold()), QueryOptions.DEFAULT)); assertNull(resp.getWarnings()); } } diff --git a/test/unit/org/apache/cassandra/tcm/BootWithMetadataTest.java b/test/unit/org/apache/cassandra/tcm/BootWithMetadataTest.java index 831204d7ac..ffb55e3048 100644 --- a/test/unit/org/apache/cassandra/tcm/BootWithMetadataTest.java +++ b/test/unit/org/apache/cassandra/tcm/BootWithMetadataTest.java @@ -18,7 +18,6 @@ package org.apache.cassandra.tcm; -import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.util.Collections; @@ -99,7 +98,7 @@ public class BootWithMetadataTest epoch = doTest(Epoch.create(epoch.getEpoch() + 100), first); } - private Epoch doTest(Epoch epoch, ClusterMetadata first) throws IOException + private Epoch doTest(Epoch epoch, ClusterMetadata first) throws Throwable { long seed = System.nanoTime(); logger.info("STARTING TEST FROM EPOCH {}, SEED: {}", epoch, seed); diff --git a/test/unit/org/apache/cassandra/tcm/DiscoverySimulationTest.java b/test/unit/org/apache/cassandra/tcm/DiscoverySimulationTest.java index af24888858..f8380a6cb6 100644 --- a/test/unit/org/apache/cassandra/tcm/DiscoverySimulationTest.java +++ b/test/unit/org/apache/cassandra/tcm/DiscoverySimulationTest.java @@ -60,8 +60,10 @@ public class DiscoverySimulationTest public static void setup() { ClusterMetadata initial = new ClusterMetadata(Murmur3Partitioner.instance); - LocalLog log = LocalLog.sync(new LocalLog.LogSpec().withInitialState(initial)); - log.ready(); + LocalLog log = LocalLog.logSpec() + .sync() + .withInitialState(initial) + .createLog(); ClusterMetadataService cms = new ClusterMetadataService(new UniformRangePlacement(), MetadataSnapshots.NO_OP, @@ -70,6 +72,7 @@ public class DiscoverySimulationTest Commit.Replicator.NO_OP, false); ClusterMetadataService.setInstance(cms); + log.readyUnchecked(); } @Test diff --git a/test/unit/org/apache/cassandra/tcm/LogStateTest.java b/test/unit/org/apache/cassandra/tcm/LogStateTest.java index 309847d826..2258362fa7 100644 --- a/test/unit/org/apache/cassandra/tcm/LogStateTest.java +++ b/test/unit/org/apache/cassandra/tcm/LogStateTest.java @@ -29,10 +29,10 @@ import org.apache.cassandra.ServerTestUtils; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.commitlog.CommitLog; import org.apache.cassandra.dht.Murmur3Partitioner; -import org.apache.cassandra.schema.DistributedSchema; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.tcm.extensions.ExtensionValue; import org.apache.cassandra.tcm.listeners.MetadataSnapshotListener; +import org.apache.cassandra.tcm.listeners.SchemaListener; import org.apache.cassandra.tcm.log.LocalLog; import org.apache.cassandra.tcm.log.LogStorage; import org.apache.cassandra.tcm.ownership.UniformRangePlacement; @@ -54,10 +54,13 @@ public class LogStateTest LogStorage logStorage = LogStorage.SystemKeyspace; MetadataSnapshots snapshots = new MetadataSnapshots.SystemKeyspaceMetadataSnapshots(); ClusterMetadata initial = new ClusterMetadata(DatabaseDescriptor.getPartitioner()); - LocalLog.LogSpec logSpec = new LocalLog.LogSpec().withInitialState(initial) - .withStorage(logStorage) - .withLogListener(new MetadataSnapshotListener()); - LocalLog log = LocalLog.sync(logSpec); + LocalLog.LogSpec logSpec = LocalLog.logSpec() + .sync() + .withInitialState(initial) + .withStorage(logStorage) + .withLogListener(new MetadataSnapshotListener()) + .withListener(new SchemaListener(true)); + LocalLog log = logSpec.createLog(); ClusterMetadataService cms = new ClusterMetadataService(new UniformRangePlacement(), snapshots, log, @@ -66,8 +69,7 @@ public class LogStateTest false); ClusterMetadataService.unsetInstance(); ClusterMetadataService.setInstance(cms); - initial.schema.initializeKeyspaceInstances(DistributedSchema.empty()); - log.ready(); + log.readyUnchecked(); log.bootstrap(FBUtilities.getBroadcastAddressAndPort()); } diff --git a/test/unit/org/apache/cassandra/tcm/log/LocalLogTest.java b/test/unit/org/apache/cassandra/tcm/log/LocalLogTest.java index f3f03c9f04..bcbe663736 100644 --- a/test/unit/org/apache/cassandra/tcm/log/LocalLogTest.java +++ b/test/unit/org/apache/cassandra/tcm/log/LocalLogTest.java @@ -63,8 +63,11 @@ public class LocalLogTest @Test public void appendToFillGapWithConsecutiveBufferedEntries() { - LocalLog log = LocalLog.sync(new LocalLog.LogSpec().withInitialState(cm())); - log.ready(); + LocalLog log = LocalLog.logSpec() + .sync() + .withInitialState(cm()) + .createLog(); + log.readyUnchecked(); Epoch start = log.metadata().epoch; assertEquals(EMPTY, start); @@ -87,10 +90,13 @@ public class LocalLogTest @Test public void sealPeriodForceSnapshotCollisionWithGap() { - LocalLog log = LocalLog.sync(new LocalLog.LogSpec().withInitialState(cm())); - log.ready(); + LocalLog log = LocalLog.logSpec() + .sync() + .withInitialState(cm()) + .createLog(); + log.readyUnchecked(); - List<Entry> entries =new ArrayList<>(); + List<Entry> entries = new ArrayList<>(); for (int i = 1; i <= 9; i++) entries.add(entry(i)); entries.add(new Entry(Entry.Id.NONE, @@ -112,8 +118,11 @@ public class LocalLogTest @Test public void multipleSnapshotEntries() { - LocalLog log = LocalLog.sync(new LocalLog.LogSpec().withInitialState(cm())); - log.ready(); + LocalLog log = LocalLog.logSpec() + .sync() + .withInitialState(cm()) + .createLog(); + log.readyUnchecked(); List<Entry> entries =new ArrayList<>(); for (int i = 1; i <= 9; i++) @@ -165,7 +174,7 @@ public class LocalLogTest CountDownLatch finish = CountDownLatch.newCountDownLatch(threads); CountDownLatch finishReaders = CountDownLatch.newCountDownLatch(threads); ExecutorPlus executor = executorFactory().configurePooled("APPENDER", threads * 2).build(); - LocalLog log = LocalLog.asyncForTests(cm()); + LocalLog log = LocalLog.logSpec().withInitialState(cm()).createLog(); List<Entry> committed = new CopyOnWriteArrayList<>(); // doesn't need to be concurrent, since log is single-threaded log.addListener((e, m) -> committed.add(e)); diff --git a/test/unit/org/apache/cassandra/tcm/log/LogListenerNotificationTest.java b/test/unit/org/apache/cassandra/tcm/log/LogListenerNotificationTest.java index d9a14e4e3f..59aafa37e0 100644 --- a/test/unit/org/apache/cassandra/tcm/log/LogListenerNotificationTest.java +++ b/test/unit/org/apache/cassandra/tcm/log/LogListenerNotificationTest.java @@ -94,10 +94,13 @@ public class LogListenerNotificationTest counter++; } }; - LocalLog.LogSpec logSpec = new LocalLog.LogSpec().withInitialState(cm()).withLogListener(listener); - LocalLog log = LocalLog.sync(logSpec); + LocalLog log = LocalLog.logSpec() + .sync() + .withInitialState(cm()) + .withLogListener(listener) + .createLog(); + log.readyUnchecked(); log.append(new Entry(Entry.Id.NONE, Epoch.FIRST, PreInitialize.forTesting())); - log.ready(); log.append(input); } diff --git a/tools/stress/src/org/apache/cassandra/io/sstable/StressCQLSSTableWriter.java b/tools/stress/src/org/apache/cassandra/io/sstable/StressCQLSSTableWriter.java index 90d71abaa7..83aafece69 100644 --- a/tools/stress/src/org/apache/cassandra/io/sstable/StressCQLSSTableWriter.java +++ b/tools/stress/src/org/apache/cassandra/io/sstable/StressCQLSSTableWriter.java @@ -64,7 +64,6 @@ import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.schema.Tables; import org.apache.cassandra.schema.Types; import org.apache.cassandra.service.ClientState; -import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.transport.ProtocolVersion; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.JavaDriverUtils; @@ -620,7 +619,6 @@ public class StressCQLSSTableWriter implements Closeable */ public static ColumnFamilyStore createOfflineTable(CreateTableStatement.Raw schemaStatement, List<CreateTypeStatement.Raw> typeStatements, List<File> directoryList) { - ClusterMetadata prev = ClusterMetadata.current(); String keyspace = schemaStatement.keyspace(); KeyspaceMetadata ksm = KeyspaceMetadata.create(keyspace, KeyspaceParams.simple(1)); @@ -644,7 +642,6 @@ public class StressCQLSSTableWriter implements Closeable Tables tables = Tables.of(tableMetadata); KeyspaceMetadata updated = ksm.withSwapped(tables); Schema.instance.submit((metadata) -> metadata.schema.getKeyspaces().withAddedOrUpdated(updated)); - ClusterMetadata.current().schema.initializeKeyspaceInstances(prev.schema, false); Keyspace.setInitialized(); Directories directories = new Directories(tableMetadata, directoryList.stream().map(f -> new Directories.DataDirectory(new org.apache.cassandra.io.util.File(f.toPath()))).collect(Collectors.toList())); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org