This is an automated email from the ASF dual-hosted git repository.

maedhroz pushed a commit to branch CASSANDRA-18714-5.0-maedhroz-client
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit fecac728ec945e9b376a5fb2c4af54f337d744cf
Author: Caleb Rackliffe <calebrackli...@gmail.com>
AuthorDate: Fri Jan 12 14:52:43 2024 -0600

    Alternative fix for SAI + CQLSSTableWriter in client mode
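    
    For reference, a rough sketch of the client-mode usage this change targets. The
    inDirectory/forTable/using/addRow/close calls are the long-standing CQLSSTableWriter
    builder API; the forIndex(...) call is a hypothetical name standing in for however the
    builder is handed the CREATE INDEX statement, since that entry point is not shown in
    this diff:
    
        // Write SSTables (and SAI components) offline, without a running daemon.
        String table  = "CREATE TABLE ks.t (k int PRIMARY KEY, v text)";
        String index  = "CREATE CUSTOM INDEX t_v_idx ON ks.t (v) USING 'StorageAttachedIndex'";
        String insert = "INSERT INTO ks.t (k, v) VALUES (?, ?)";
    
        CQLSSTableWriter writer = CQLSSTableWriter.builder()
                                                  .inDirectory("/tmp/sstables/ks/t") // destination for generated SSTables
                                                  .forTable(table)
                                                  .forIndex(index)                   // hypothetical: register the SAI definition so index files are built
                                                  .using(insert)
                                                  .build();
        writer.addRow(1, "hello");
        writer.close(); // flushes buffered rows and finalizes SSTable + index components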
---
 src/java/org/apache/cassandra/config/Config.java   | 18 +++++-
 .../statements/schema/CreateIndexStatement.java    | 10 +--
 .../org/apache/cassandra/db/ColumnFamilyStore.java | 44 +++++--------
 .../db/memtable/AbstractAllocatorMemtable.java     | 22 +------
 .../apache/cassandra/db/memtable/TrieMemtable.java | 19 +-----
 .../cassandra/index/SecondaryIndexManager.java     |  2 +-
 .../index/sai/StorageAttachedIndexGroup.java       |  2 +-
 .../sai/disk/v1/segment/SegmentTrieBuffer.java     |  4 +-
 .../cassandra/io/sstable/CQLSSTableWriter.java     | 72 ++++------------------
 .../locator/AbstractReplicationStrategy.java       |  2 +-
 10 files changed, 59 insertions(+), 136 deletions(-)

diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index df1cf62d15..d5b91ef530 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -39,6 +39,7 @@ import org.apache.cassandra.audit.AuditLogOptions;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.fql.FullQueryLoggerOptions;
 import org.apache.cassandra.index.internal.CassandraIndex;
+import org.apache.cassandra.io.compress.BufferType;
 import org.apache.cassandra.io.sstable.format.big.BigFormat;
 import org.apache.cassandra.service.StartupChecks.StartupCheckType;
 import org.apache.cassandra.utils.StorageCompatibilityMode;
@@ -1164,7 +1165,22 @@ public class Config
         unslabbed_heap_buffers_logged,
         heap_buffers,
         offheap_buffers,
-        offheap_objects
+        offheap_objects;
+
+        public BufferType toBufferType()
+        {
+            switch (this)
+            {
+                case unslabbed_heap_buffers:
+                case heap_buffers:
+                    return BufferType.ON_HEAP;
+                case offheap_buffers:
+                case offheap_objects:
+                    return BufferType.OFF_HEAP;
+                default:
+                    throw new AssertionError();
+            }
+        }
     }
 
     public enum DiskFailurePolicy
diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/CreateIndexStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/CreateIndexStatement.java
index 2c475f45f5..b53e066f90 100644
--- a/src/java/org/apache/cassandra/cql3/statements/schema/CreateIndexStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/schema/CreateIndexStatement.java
@@ -80,11 +80,11 @@ public final class CreateIndexStatement extends AlterSchemaStatement
     public static final String KEYSPACE_DOES_NOT_MATCH_INDEX = "Keyspace name '%s' doesn't match index name '%s'";
     public static final String MUST_SPECIFY_INDEX_IMPLEMENTATION = "Must specify index implementation via USING";
 
-    public final String indexName;
-    public final String tableName;
-    public final List<IndexTarget.Raw> rawIndexTargets;
-    public final IndexAttributes attrs;
-    public final boolean ifNotExists;
+    private final String indexName;
+    private final String tableName;
+    private final List<IndexTarget.Raw> rawIndexTargets;
+    private final IndexAttributes attrs;
+    private final boolean ifNotExists;
 
     private ClientState state;
 
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 023aac5f27..bcf4dc7073 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -200,36 +200,24 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean, Memtable.Owner
     are finished. By having flushExecutor size the same size as each of the perDiskflushExecutors we make sure we can
     have that many flushes going at the same time.
     */
-    private static ExecutorPlus flushExecutor;
+    private static final ExecutorPlus flushExecutor = DatabaseDescriptor.isDaemonInitialized()
+                                                      ? executorFactory().withJmxInternal().pooled("MemtableFlushWriter", getFlushWriters())
+                                                      : null;
 
     // post-flush executor is single threaded to provide guarantee that any flush Future on a CF will never return until prior flushes have completed
-    private static ExecutorPlus postFlushExecutor;
+    private static final ExecutorPlus postFlushExecutor = DatabaseDescriptor.isDaemonInitialized()
+                                                          ? executorFactory().withJmxInternal().sequential("MemtablePostFlush")
+                                                          : null;
 
-    private static ExecutorPlus reclaimExecutor;
+    private static final ExecutorPlus reclaimExecutor = DatabaseDescriptor.isDaemonInitialized()
+                                                        ? executorFactory().withJmxInternal().sequential("MemtableReclaimMemory")
+                                                        : null;
 
-    private static PerDiskFlushExecutors perDiskflushExecutors;
-
-    static
-    {
-        if (!DatabaseDescriptor.isClientOrToolInitialized())
-        {
-            flushExecutor = executorFactory()
-            .withJmxInternal()
-            .pooled("MemtableFlushWriter", getFlushWriters());
-
-            postFlushExecutor = executorFactory()
-            .withJmxInternal()
-            .sequential("MemtablePostFlush");
-
-            reclaimExecutor = executorFactory()
-            .withJmxInternal()
-            .sequential("MemtableReclaimMemory");
-
-            perDiskflushExecutors = new PerDiskFlushExecutors(DatabaseDescriptor.getFlushWriters(),
-                                                              DatabaseDescriptor.getNonLocalSystemKeyspacesDataFileLocations(),
-                                                              DatabaseDescriptor.useSpecificLocationForLocalSystemData());
-        }
-    }
+    private static final PerDiskFlushExecutors perDiskflushExecutors = DatabaseDescriptor.isDaemonInitialized()
+                                                                       ? new PerDiskFlushExecutors(DatabaseDescriptor.getFlushWriters(),
+                                                                                                   DatabaseDescriptor.getNonLocalSystemKeyspacesDataFileLocations(),
+                                                                                                   DatabaseDescriptor.useSpecificLocationForLocalSystemData())
+                                                                       : null;
 
     /**
      * Reason for initiating a memtable flush.
@@ -408,7 +396,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean, Memtable.Owner
         indexManager.reload();
 
         memtableFactory = metadata().params.memtable.factory();
-        switchMemtableOrNotify(FlushReason.SCHEMA_CHANGE, Memtable::metadataUpdated);
+
+        if (DatabaseDescriptor.isDaemonInitialized())
+            switchMemtableOrNotify(FlushReason.SCHEMA_CHANGE, Memtable::metadataUpdated);
     }
 
     public static Runnable getBackgroundCompactionTaskSubmitter()
diff --git a/src/java/org/apache/cassandra/db/memtable/AbstractAllocatorMemtable.java b/src/java/org/apache/cassandra/db/memtable/AbstractAllocatorMemtable.java
index 99f321c8c5..8526dace39 100644
--- a/src/java/org/apache/cassandra/db/memtable/AbstractAllocatorMemtable.java
+++ b/src/java/org/apache/cassandra/db/memtable/AbstractAllocatorMemtable.java
@@ -28,7 +28,6 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.concurrent.ImmediateExecutor;
 import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.config.Config;
-import org.apache.cassandra.config.DataStorageSpec;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ClusteringComparator;
 import org.apache.cassandra.db.ColumnFamilyStore;
@@ -79,24 +78,9 @@ public abstract class AbstractAllocatorMemtable extends AbstractMemtableWithComm
     static MemtablePool createMemtableAllocatorPool()
     {
         Config.MemtableAllocationType allocationType = DatabaseDescriptor.getMemtableAllocationType();
-
-        long heapLimit;
-        long offHeapLimit;
-        float memtableCleanupThreshold;
-
-        if (DatabaseDescriptor.isClientOrToolInitialized())
-        {
-            heapLimit = new DataStorageSpec.IntMebibytesBound((int) (Runtime.getRuntime().maxMemory() / (4 * 1048576))).toMebibytes() << 20;
-            offHeapLimit = new DataStorageSpec.IntMebibytesBound((int) (Runtime.getRuntime().maxMemory() / (4 * 1048576))).toMebibytes() << 20;
-            memtableCleanupThreshold = (float) (1.0 / (1 + 1));
-        }
-        else
-        {
-            heapLimit = DatabaseDescriptor.getMemtableHeapSpaceInMiB() << 20;
-            offHeapLimit = DatabaseDescriptor.getMemtableOffheapSpaceInMiB() << 20;
-            memtableCleanupThreshold = DatabaseDescriptor.getMemtableCleanupThreshold();
-        }
-
+        long heapLimit = DatabaseDescriptor.getMemtableHeapSpaceInMiB() << 20;
+        long offHeapLimit = DatabaseDescriptor.getMemtableOffheapSpaceInMiB() << 20;
+        float memtableCleanupThreshold = DatabaseDescriptor.getMemtableCleanupThreshold();
         MemtableCleaner cleaner = AbstractAllocatorMemtable::flushLargestMemtable;
         return createMemtableAllocatorPoolInternal(allocationType, heapLimit, offHeapLimit, memtableCleanupThreshold, cleaner);
     }
diff --git a/src/java/org/apache/cassandra/db/memtable/TrieMemtable.java b/src/java/org/apache/cassandra/db/memtable/TrieMemtable.java
index 294f1472f0..83b02db06a 100644
--- a/src/java/org/apache/cassandra/db/memtable/TrieMemtable.java
+++ b/src/java/org/apache/cassandra/db/memtable/TrieMemtable.java
@@ -91,24 +91,7 @@ public class TrieMemtable extends AbstractShardedMemtable
     private static final Logger logger = LoggerFactory.getLogger(TrieMemtable.class);
 
     /** Buffer type to use for memtable tries (on- vs off-heap) */
-    public static final BufferType BUFFER_TYPE;
-
-    static
-    {
-        switch (DatabaseDescriptor.getMemtableAllocationType())
-        {
-        case unslabbed_heap_buffers:
-        case heap_buffers:
-            BUFFER_TYPE = BufferType.ON_HEAP;
-            break;
-        case offheap_buffers:
-        case offheap_objects:
-            BUFFER_TYPE = BufferType.OFF_HEAP;
-            break;
-        default:
-            throw new AssertionError();
-        }
-    }
+    public static final BufferType BUFFER_TYPE = DatabaseDescriptor.getMemtableAllocationType().toBufferType();
 
     /** If keys is below this length, we will use a recursive procedure for inserting data in the memtable trie. */
     @VisibleForTesting
diff --git a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
index 0712aee6d8..da15282b39 100644
--- a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
@@ -238,7 +238,7 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum
         {
             try
             {
-                Callable<?> call = index.getInitializationTask();
+                Callable<?> call = DatabaseDescriptor.isDaemonInitialized() ? index.getInitializationTask() : null;
                 if (call != null)
                     initialBuildTask = new FutureTask<>(call);
             }
diff --git a/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexGroup.java b/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexGroup.java
index 07b0243a65..30d23f5de6 100644
--- a/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexGroup.java
+++ b/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexGroup.java
@@ -85,7 +85,7 @@ public class StorageAttachedIndexGroup implements Index.Group, INotificationCons
 
     private final SSTableContextManager contextManager;
 
-    public StorageAttachedIndexGroup(ColumnFamilyStore baseCfs)
+    StorageAttachedIndexGroup(ColumnFamilyStore baseCfs)
     {
         this.baseCfs = baseCfs;
         this.queryMetrics = new TableQueryMetrics(baseCfs.metadata());
diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/segment/SegmentTrieBuffer.java b/src/java/org/apache/cassandra/index/sai/disk/v1/segment/SegmentTrieBuffer.java
index d1e79f91f5..72c9add313 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/v1/segment/SegmentTrieBuffer.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/v1/segment/SegmentTrieBuffer.java
@@ -22,7 +22,7 @@ import java.util.Map;
 import java.util.concurrent.atomic.LongAdder;
 import javax.annotation.concurrent.NotThreadSafe;
 
-import org.apache.cassandra.db.memtable.TrieMemtable;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.tries.InMemoryTrie;
 import org.apache.cassandra.index.sai.postings.PostingList;
 import org.apache.cassandra.index.sai.utils.IndexEntry;
@@ -45,7 +45,7 @@ public class SegmentTrieBuffer
 
     public SegmentTrieBuffer()
     {
-        trie = new InMemoryTrie<>(TrieMemtable.BUFFER_TYPE);
+        trie = new InMemoryTrie<>(DatabaseDescriptor.getMemtableAllocationType().toBufferType());
         postingsAccumulator = new PostingsAccumulator();
     }
 
diff --git a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
index e4a4dd365b..bd2935d902 100644
--- a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
@@ -44,7 +44,6 @@ import org.apache.cassandra.cql3.statements.ModificationStatement;
 import org.apache.cassandra.cql3.statements.schema.CreateIndexStatement;
 import org.apache.cassandra.cql3.statements.schema.CreateTableStatement;
 import org.apache.cassandra.cql3.statements.schema.CreateTypeStatement;
-import org.apache.cassandra.cql3.statements.schema.IndexTarget;
 import org.apache.cassandra.db.Clustering;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Directories;
@@ -56,12 +55,9 @@ import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Murmur3Partitioner;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.exceptions.SyntaxException;
-import org.apache.cassandra.index.sai.StorageAttachedIndex;
 import org.apache.cassandra.index.sai.StorageAttachedIndexGroup;
 import org.apache.cassandra.io.sstable.format.SSTableFormat;
 import org.apache.cassandra.io.util.File;
-import org.apache.cassandra.schema.IndexMetadata;
-import org.apache.cassandra.schema.Indexes;
 import org.apache.cassandra.schema.KeyspaceMetadata;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.schema.Keyspaces;
@@ -652,33 +648,22 @@ public class CQLSSTableWriter implements Closeable
                 KeyspaceMetadata ksm = Schema.instance.getKeyspaceMetadata(keyspaceName);
 
                 TableMetadata tableMetadata = ksm.tables.getNullable(schemaStatement.table());
-
-                ColumnFamilyStore cfs = null;
-
                 if (tableMetadata == null)
                 {
                     Types types = createTypes(keyspaceName);
                     
Schema.instance.transform(SchemaTransformations.addTypes(types, true));
                     tableMetadata = createTable(types);
 
-                    Schema.instance.transform(SchemaTransformations.addTable(tableMetadata, true));
-                }
+                    Keyspace.setInitialized();
 
-                if (buildIndexes && !indexStatements.isEmpty())
-                {
-                    cfs = new ColumnFamilyStore(Keyspace.mockKS(ksm),
-                                                tableMetadata.name,
-                                                () -> new SequenceBasedSSTableId(0),
-                                                TableMetadataRef.forOfflineTools(tableMetadata),
-                                                new Directories(tableMetadata, new Directories.DataDirectory[] {
-                                                new Directories.DataDirectory(directory)
-                                                }),
-                                                false,
-                                                false,
-                                                true);
-
-                    Indexes indexes = registerIndexes(cfs, indexStatements);
-                    tableMetadata  = tableMetadata.unbuild().indexes(indexes).build();
+                    if (buildIndexes && !indexStatements.isEmpty())
+                    {
+                        tableMetadata = applyIndexes(ksm.withSwapped(ksm.tables.with(tableMetadata)));
+                        Keyspace ks = Keyspace.openWithoutSSTables(keyspaceName);
+                        Directories directories = new Directories(tableMetadata, Collections.singleton(new Directories.DataDirectory(new org.apache.cassandra.io.util.File(directory.toPath()))));
+                        ColumnFamilyStore cfs = ColumnFamilyStore.createColumnFamilyStore(ks, tableMetadata.name, TableMetadataRef.forOfflineTools(tableMetadata), directories, false, false, true);
+                        ks.initCfCustom(cfs);
+                    }
 
                     Schema.instance.transform(SchemaTransformations.addTable(tableMetadata, true));
                 }
@@ -693,19 +678,13 @@ public class CQLSSTableWriter implements Closeable
                 if (format != null)
                     writer.setSSTableFormatType(format);
 
-                if (cfs != null)
+                if (buildIndexes)
                 {
-                    StorageAttachedIndexGroup saiGroup = StorageAttachedIndexGroup.getIndexGroup(cfs);
+                    StorageAttachedIndexGroup saiGroup = StorageAttachedIndexGroup.getIndexGroup(Schema.instance.getColumnFamilyStoreInstance(tableMetadata.id));
                     if (saiGroup != null)
                         writer.addIndexGroup(saiGroup);
                 }
 
-                // this is the empty directory / leftover from times we initialized ColumnFamilyStore
-                // it will automatically create directories for keyspace and table on disk after initialization
-                // we set that directory to the destination of generated SSTables so we just remove empty directories here
-                if (cfs != null)
-                    new File(directory, keyspaceName).deleteRecursive();
-
                 return new CQLSSTableWriter(writer, preparedModificationStatement, preparedModificationStatement.getBindVariables());
             }
         }
@@ -723,35 +702,6 @@ public class CQLSSTableWriter implements Closeable
             return builder.build();
         }
 
-        private Indexes registerIndexes(ColumnFamilyStore cfs, List<CreateIndexStatement.Raw> indexStatements)
-        {
-            ClientState state = ClientState.forInternalCalls();
-            Indexes.Builder indexesBuilder = Indexes.builder();
-
-            for (CreateIndexStatement.Raw raw : indexStatements)
-            {
-                CreateIndexStatement statement = raw.prepare(state);
-
-                List<IndexTarget> indexTargets = statement.rawIndexTargets.stream()
-                                                                          .map(r -> r.prepare(cfs.metadata()))
-                                                                          .collect(Collectors.toList());
-
-                IndexMetadata indexMetadata = IndexMetadata.fromIndexTargets(indexTargets,
-                                                                             statement.indexName,
-                                                                             IndexMetadata.Kind.CUSTOM,
-                                                                             statement.attrs.getOptions());
-
-                indexesBuilder.add(indexMetadata);
-
-
-                cfs.indexManager.registerIndex(new StorageAttachedIndex(cfs, indexMetadata),
-                                               StorageAttachedIndexGroup.GROUP_KEY,
-                                               () -> new StorageAttachedIndexGroup(cfs));
-            }
-
-            return indexesBuilder.build();
-        }
-
         /**
          * Applies any provided index definitions to the target table
          * @param ksm the KeyspaceMetadata object that has the table defined
diff --git a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
index b1f3561203..c32b7be2b0 100644
--- a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
+++ b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
@@ -324,7 +324,7 @@ public abstract class AbstractReplicationStrategy
         try
         {
             Constructor<? extends AbstractReplicationStrategy> constructor = strategyClass.getConstructor(parameterTypes);
-            IEndpointSnitch endpointSnitch = snitch == null ? new SimpleSnitch() : snitch;
+            IEndpointSnitch endpointSnitch = snitch == null && DatabaseDescriptor.isClientOrToolInitialized() ? new SimpleSnitch() : snitch;
             strategy = constructor.newInstance(keyspaceName, tokenMetadata, endpointSnitch, strategyOptions);
         }
         catch (InvocationTargetException e)

