This is an automated email from the ASF dual-hosted git repository.

samt pushed a commit to branch cep-21-tcm
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 85748281f4a7216581a94f1ca73183921534dca6
Author: Sam Tunnicliffe <s...@apache.org>
AuthorDate: Fri Mar 3 19:43:03 2023 +0000

    [CEP-21] Modify CassandraDaemon
    
    Alter CassandraDaemon initialization to accommodate TCM and replay of the
    cluster metadata log. This is something of a WIP and there is clearly
    scope to further clean up this part of the code.
    
    Co-authored-by: Marcus Eriksson <marc...@apache.org>
    Co-authored-by: Alex Petrov <oleksandr.pet...@gmail.com>
    Co-authored-by: Sam Tunnicliffe <s...@apache.org>
---
 .../apache/cassandra/service/CassandraDaemon.java  | 79 ++++++++++------------
 1 file changed, 36 insertions(+), 43 deletions(-)

diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java 
b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index 4aea5d47aa..38a94a7d7b 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -71,7 +71,8 @@ import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.io.sstable.SSTableHeaderFix;
 import org.apache.cassandra.io.util.File;
 import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.tcm.ClusterMetadataService;
+import org.apache.cassandra.tcm.Startup;
 import org.apache.cassandra.metrics.CassandraMetricsRegistry;
 import org.apache.cassandra.metrics.DefaultNameFactory;
 import org.apache.cassandra.net.StartupClusterConnectivityChecker;
@@ -79,11 +80,10 @@ import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.security.ThreadAwareSecurityManager;
-import org.apache.cassandra.streaming.StreamManager;
-import org.apache.cassandra.service.paxos.PaxosState;
 import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.tcm.InProgressSequence;
-import org.apache.cassandra.tcm.Startup;
+import org.apache.cassandra.streaming.StreamManager;
+import org.apache.cassandra.service.paxos.PaxosState;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.JMXServerUtils;
 import org.apache.cassandra.utils.JVMStabilityInspector;
@@ -96,7 +96,6 @@ import 
org.apache.cassandra.utils.logging.LoggingSupportFactory;
 import org.apache.cassandra.utils.logging.VirtualTableAppender;
 
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
-import static 
org.apache.cassandra.config.CassandraRelevantProperties.CASSANDRA_FOREGROUND;
 import static 
org.apache.cassandra.config.CassandraRelevantProperties.CASSANDRA_JMX_REMOTE_PORT;
 import static 
org.apache.cassandra.config.CassandraRelevantProperties.CASSANDRA_PID_FILE;
 import static 
org.apache.cassandra.config.CassandraRelevantProperties.COM_SUN_MANAGEMENT_JMXREMOTE_PORT;
@@ -113,6 +112,7 @@ import static 
org.apache.cassandra.config.CassandraRelevantProperties.JAVA_VM_NA
 public class CassandraDaemon
 {
     public static final String MBEAN_NAME = 
"org.apache.cassandra.db:type=NativeAccess";
+    public static boolean SKIP_GC_INSPECTOR = 
Boolean.getBoolean("cassandra.startup.skip_gc_inspector");
 
     private static final Logger logger;
 
@@ -259,9 +259,7 @@ public class CassandraDaemon
         NativeLibrary.tryMlockall();
 
         DatabaseDescriptor.createAllDirectories();
-
         Keyspace.setInitialized();
-
         CommitLog.instance.start();
 
         try
@@ -272,7 +270,6 @@ public class CassandraDaemon
         {
             throw new AssertionError("Can't initialize cluster metadata 
service");
         }
-
         QueryProcessor.registerStatementInvalidatingListener();
 
         //TODO disabled b/c this involves checking schema but log replay 
hasn't run yet so it hasn't been constructed
@@ -295,20 +292,9 @@ public class CassandraDaemon
 
         SystemKeyspaceMigrator41.migrate();
 
+        // TODO (TM/alexp)
         // Populate token metadata before flushing, for token-aware sstable 
partitioning (#6696)
-//        StorageService.instance.populateTokenMetadata();
-
-//        try
-//        {
-//            // load schema from disk
-//            Schema.instance.loadFromDisk();
-//        }
-//        catch (Exception e)
-//        {
-//            logger.error("Error while loading schema: ", e);
-//            throw e;
-//        }
-
+        // StorageService.instance.populateTokenMetadata();
         setupVirtualKeyspaces();
 
         SSTableHeaderFix.fixNonFrozenUDTIfUpgradeFrom30();
@@ -322,13 +308,12 @@ public class CassandraDaemon
             exitOrFail(e.returnCode, e.getMessage(), e.getCause());
         }
 
-        Keyspace.setInitialized();
-
         // initialize keyspaces
         for (String keyspaceName : Schema.instance.getKeyspaces())
         {
             if (logger.isDebugEnabled())
                 logger.debug("opening keyspace {}", keyspaceName);
+            // TODO (TM/alexp)
             // disable auto compaction until gossip settles since disk 
boundaries may be affected by ring layout
             for (ColumnFamilyStore cfs : 
Keyspace.open(keyspaceName).getColumnFamilyStores())
             {
@@ -349,20 +334,25 @@ public class CassandraDaemon
             logger.warn("Error loading key or row cache", t);
         }
 
-        try
-        {
-            GCInspector.register();
-        }
-        catch (Throwable t)
+        if (!SKIP_GC_INSPECTOR)
         {
-            JVMStabilityInspector.inspectThrowable(t);
-            logger.warn("Unable to start GCInspector (currently only supported 
on the Sun JVM)");
+            try
+            {
+                GCInspector.register();
+            }
+            catch (Throwable t)
+            {
+                JVMStabilityInspector.inspectThrowable(t);
+                logger.warn("Unable to start GCInspector (currently only 
supported on the Sun JVM)");
+            }
         }
 
         // Replay any CommitLogSegments found on disk
         PaxosState.initializeTrackers();
 
         // replay the log if necessary
+        // TODO samt - when restarting a previously running instance, this 
needs to happen after reconstructing schema
+        //  from the cluster metadata log or all mutations will throw 
IncompatibleSchemaException on deserialisation
         try
         {
             CommitLog.instance.recoverSegmentsOnDisk();
@@ -372,9 +362,6 @@ public class CassandraDaemon
             throw new RuntimeException(e);
         }
 
-        // Re-populate token metadata after commit log recover (new peers 
might be loaded onto system keyspace #10293)
-//        StorageService.instance.populateTokenMetadata();
-
         try
         {
             PaxosState.maybeRebuildUncommittedState();
@@ -385,7 +372,7 @@ public class CassandraDaemon
         }
 
         // Clean up system.size_estimates entries left lying around from 
missed keyspace drops (CASSANDRA-14905)
-        StorageService.instance.cleanupSizeEstimates();
+        SystemKeyspace.clearAllEstimates();
 
         // schedule periodic dumps of table size estimates into 
SystemKeyspace.SIZE_ESTIMATES_CF
         // set cassandra.size_recorder_interval to 0 to disable
@@ -441,6 +428,9 @@ public class CassandraDaemon
             exitOrFail(1, "Fatal configuration error", e);
         }
 
+        ClusterMetadataService.instance().replayAndWait();
+
+        // TODO: (TM/alexp), this can be made time-dependent
         // Because we are writing to the system_distributed keyspace, this 
should happen after that is created, which
         // happens in StorageService.instance.initServer()
         Runnable viewRebuild = () -> {
@@ -453,8 +443,9 @@ public class CassandraDaemon
 
         ScheduledExecutors.optionalTasks.schedule(viewRebuild, 
StorageService.RING_DELAY_MILLIS, TimeUnit.MILLISECONDS);
 
-        if 
(!FBUtilities.getBroadcastAddressAndPort().equals(InetAddressAndPort.getLoopbackAddress()))
-            Gossiper.waitToSettle();
+        // TODO: (TM/alexp), we do not need to wait for gossip settlement 
anymore
+//        if 
(!FBUtilities.getBroadcastAddressAndPort().equals(InetAddressAndPort.getLoopbackAddress()))
+//            Gossiper.waitToSettle();
 
         StorageService.instance.doAuthSetup(false);
 
@@ -465,7 +456,7 @@ public class CassandraDaemon
             {
                 for (final ColumnFamilyStore store : cfs.concatWithIndexes())
                 {
-                    store.reload(); //reload CFs in case there was a change of 
disk boundaries
+                    store.reload(store.metadata()); //reload CFs in case there 
was a change of disk boundaries
                     if (store.getCompactionStrategyManager().shouldBeEnabled())
                     {
                         if 
(DatabaseDescriptor.getAutocompactionOnStartupEnabled())
@@ -723,6 +714,7 @@ public class CassandraDaemon
         if ((nativeFlag != null && Boolean.parseBoolean(nativeFlag)) || 
(nativeFlag == null && DatabaseDescriptor.startNativeTransport()))
         {
             startNativeTransport();
+            // TODO: we should represent this state in transactional metadata 
to avoid relying on gossip
             StorageService.instance.setRpcReady(true);
         }
         else
@@ -740,7 +732,7 @@ public class CassandraDaemon
         // jsvc takes care of taking the rest down
         logger.info("Cassandra shutting down...");
         destroyClientTransports();
-        StorageService.instance.setRpcReady(false);
+        //StorageService.instance.setRpcReady(false);
 
         if (jmxServer != null)
         {
@@ -791,11 +783,12 @@ public class CassandraDaemon
                 new File(pidFile).deleteOnExit();
             }
 
-            if (CASSANDRA_FOREGROUND.getString() == null)
-            {
-                System.out.close();
-                System.err.close();
-            }
+            // TODO: this should definitely be done differently
+//            if (CASSANDRA_FOREGROUND.getString() == null)
+//            {
+//                System.out.close();
+//                System.err.close();
+//            }
 
             start();
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to