This is an automated email from the ASF dual-hosted git repository. maedhroz pushed a commit to branch CASSANDRA-18714-5.0-maedhroz-client in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit fecac728ec945e9b376a5fb2c4af54f337d744cf Author: Caleb Rackliffe <calebrackli...@gmail.com> AuthorDate: Fri Jan 12 14:52:43 2024 -0600 Alternative fix for SAI + CQLSSTableWriter in client mode --- src/java/org/apache/cassandra/config/Config.java | 18 +++++- .../statements/schema/CreateIndexStatement.java | 10 +-- .../org/apache/cassandra/db/ColumnFamilyStore.java | 44 +++++-------- .../db/memtable/AbstractAllocatorMemtable.java | 22 +------ .../apache/cassandra/db/memtable/TrieMemtable.java | 19 +----- .../cassandra/index/SecondaryIndexManager.java | 2 +- .../index/sai/StorageAttachedIndexGroup.java | 2 +- .../sai/disk/v1/segment/SegmentTrieBuffer.java | 4 +- .../cassandra/io/sstable/CQLSSTableWriter.java | 72 ++++------------------ .../locator/AbstractReplicationStrategy.java | 2 +- 10 files changed, 59 insertions(+), 136 deletions(-) diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index df1cf62d15..d5b91ef530 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -39,6 +39,7 @@ import org.apache.cassandra.audit.AuditLogOptions; import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.fql.FullQueryLoggerOptions; import org.apache.cassandra.index.internal.CassandraIndex; +import org.apache.cassandra.io.compress.BufferType; import org.apache.cassandra.io.sstable.format.big.BigFormat; import org.apache.cassandra.service.StartupChecks.StartupCheckType; import org.apache.cassandra.utils.StorageCompatibilityMode; @@ -1164,7 +1165,22 @@ public class Config unslabbed_heap_buffers_logged, heap_buffers, offheap_buffers, - offheap_objects + offheap_objects; + + public BufferType toBufferType() + { + switch (this) + { + case unslabbed_heap_buffers: + case heap_buffers: + return BufferType.ON_HEAP; + case offheap_buffers: + case offheap_objects: + return BufferType.OFF_HEAP; + default: + throw new AssertionError(); + } + } } public enum DiskFailurePolicy diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/CreateIndexStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/CreateIndexStatement.java index 2c475f45f5..b53e066f90 100644 --- a/src/java/org/apache/cassandra/cql3/statements/schema/CreateIndexStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/schema/CreateIndexStatement.java @@ -80,11 +80,11 @@ public final class CreateIndexStatement extends AlterSchemaStatement public static final String KEYSPACE_DOES_NOT_MATCH_INDEX = "Keyspace name '%s' doesn't match index name '%s'"; public static final String MUST_SPECIFY_INDEX_IMPLEMENTATION = "Must specify index implementation via USING"; - public final String indexName; - public final String tableName; - public final List<IndexTarget.Raw> rawIndexTargets; - public final IndexAttributes attrs; - public final boolean ifNotExists; + private final String indexName; + private final String tableName; + private final List<IndexTarget.Raw> rawIndexTargets; + private final IndexAttributes attrs; + private final boolean ifNotExists; private ClientState state; diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 023aac5f27..bcf4dc7073 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -200,36 +200,24 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean, Memtable.Owner are finished. By having flushExecutor size the same size as each of the perDiskflushExecutors we make sure we can have that many flushes going at the same time. */ - private static ExecutorPlus flushExecutor; + private static final ExecutorPlus flushExecutor = DatabaseDescriptor.isDaemonInitialized() + ? executorFactory().withJmxInternal().pooled("MemtableFlushWriter", getFlushWriters()) + : null; // post-flush executor is single threaded to provide guarantee that any flush Future on a CF will never return until prior flushes have completed - private static ExecutorPlus postFlushExecutor; + private static final ExecutorPlus postFlushExecutor = DatabaseDescriptor.isDaemonInitialized() + ? executorFactory().withJmxInternal().sequential("MemtablePostFlush") + : null; - private static ExecutorPlus reclaimExecutor; + private static final ExecutorPlus reclaimExecutor = DatabaseDescriptor.isDaemonInitialized() + ? executorFactory().withJmxInternal().sequential("MemtableReclaimMemory") + : null; - private static PerDiskFlushExecutors perDiskflushExecutors; - - static - { - if (!DatabaseDescriptor.isClientOrToolInitialized()) - { - flushExecutor = executorFactory() - .withJmxInternal() - .pooled("MemtableFlushWriter", getFlushWriters()); - - postFlushExecutor = executorFactory() - .withJmxInternal() - .sequential("MemtablePostFlush"); - - reclaimExecutor = executorFactory() - .withJmxInternal() - .sequential("MemtableReclaimMemory"); - - perDiskflushExecutors = new PerDiskFlushExecutors(DatabaseDescriptor.getFlushWriters(), - DatabaseDescriptor.getNonLocalSystemKeyspacesDataFileLocations(), - DatabaseDescriptor.useSpecificLocationForLocalSystemData()); - } - } + private static final PerDiskFlushExecutors perDiskflushExecutors = DatabaseDescriptor.isDaemonInitialized() + ? new PerDiskFlushExecutors(DatabaseDescriptor.getFlushWriters(), + DatabaseDescriptor.getNonLocalSystemKeyspacesDataFileLocations(), + DatabaseDescriptor.useSpecificLocationForLocalSystemData()) + : null; /** * Reason for initiating a memtable flush. @@ -408,7 +396,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean, Memtable.Owner indexManager.reload(); memtableFactory = metadata().params.memtable.factory(); - switchMemtableOrNotify(FlushReason.SCHEMA_CHANGE, Memtable::metadataUpdated); + + if (DatabaseDescriptor.isDaemonInitialized()) + switchMemtableOrNotify(FlushReason.SCHEMA_CHANGE, Memtable::metadataUpdated); } public static Runnable getBackgroundCompactionTaskSubmitter() diff --git a/src/java/org/apache/cassandra/db/memtable/AbstractAllocatorMemtable.java b/src/java/org/apache/cassandra/db/memtable/AbstractAllocatorMemtable.java index 99f321c8c5..8526dace39 100644 --- a/src/java/org/apache/cassandra/db/memtable/AbstractAllocatorMemtable.java +++ b/src/java/org/apache/cassandra/db/memtable/AbstractAllocatorMemtable.java @@ -28,7 +28,6 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.concurrent.ImmediateExecutor; import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.config.Config; -import org.apache.cassandra.config.DataStorageSpec; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ClusteringComparator; import org.apache.cassandra.db.ColumnFamilyStore; @@ -79,24 +78,9 @@ public abstract class AbstractAllocatorMemtable extends AbstractMemtableWithComm static MemtablePool createMemtableAllocatorPool() { Config.MemtableAllocationType allocationType = DatabaseDescriptor.getMemtableAllocationType(); - - long heapLimit; - long offHeapLimit; - float memtableCleanupThreshold; - - if (DatabaseDescriptor.isClientOrToolInitialized()) - { - heapLimit = new DataStorageSpec.IntMebibytesBound((int) (Runtime.getRuntime().maxMemory() / (4 * 1048576))).toMebibytes() << 20; - offHeapLimit = new DataStorageSpec.IntMebibytesBound((int) (Runtime.getRuntime().maxMemory() / (4 * 1048576))).toMebibytes() << 20; - memtableCleanupThreshold = (float) (1.0 / (1 + 1)); - } - else - { - heapLimit = DatabaseDescriptor.getMemtableHeapSpaceInMiB() << 20; - offHeapLimit = DatabaseDescriptor.getMemtableOffheapSpaceInMiB() << 20; - memtableCleanupThreshold = DatabaseDescriptor.getMemtableCleanupThreshold(); - } - + long heapLimit = DatabaseDescriptor.getMemtableHeapSpaceInMiB() << 20; + long offHeapLimit = DatabaseDescriptor.getMemtableOffheapSpaceInMiB() << 20; + float memtableCleanupThreshold = DatabaseDescriptor.getMemtableCleanupThreshold(); MemtableCleaner cleaner = AbstractAllocatorMemtable::flushLargestMemtable; return createMemtableAllocatorPoolInternal(allocationType, heapLimit, offHeapLimit, memtableCleanupThreshold, cleaner); } diff --git a/src/java/org/apache/cassandra/db/memtable/TrieMemtable.java b/src/java/org/apache/cassandra/db/memtable/TrieMemtable.java index 294f1472f0..83b02db06a 100644 --- a/src/java/org/apache/cassandra/db/memtable/TrieMemtable.java +++ b/src/java/org/apache/cassandra/db/memtable/TrieMemtable.java @@ -91,24 +91,7 @@ public class TrieMemtable extends AbstractShardedMemtable private static final Logger logger = LoggerFactory.getLogger(TrieMemtable.class); /** Buffer type to use for memtable tries (on- vs off-heap) */ - public static final BufferType BUFFER_TYPE; - - static - { - switch (DatabaseDescriptor.getMemtableAllocationType()) - { - case unslabbed_heap_buffers: - case heap_buffers: - BUFFER_TYPE = BufferType.ON_HEAP; - break; - case offheap_buffers: - case offheap_objects: - BUFFER_TYPE = BufferType.OFF_HEAP; - break; - default: - throw new AssertionError(); - } - } + public static final BufferType BUFFER_TYPE = DatabaseDescriptor.getMemtableAllocationType().toBufferType(); /** If keys is below this length, we will use a recursive procedure for inserting data in the memtable trie. */ @VisibleForTesting diff --git a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java index 0712aee6d8..da15282b39 100644 --- a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java +++ b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java @@ -238,7 +238,7 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum { try { - Callable<?> call = index.getInitializationTask(); + Callable<?> call = DatabaseDescriptor.isDaemonInitialized() ? index.getInitializationTask() : null; if (call != null) initialBuildTask = new FutureTask<>(call); } diff --git a/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexGroup.java b/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexGroup.java index 07b0243a65..30d23f5de6 100644 --- a/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexGroup.java +++ b/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexGroup.java @@ -85,7 +85,7 @@ public class StorageAttachedIndexGroup implements Index.Group, INotificationCons private final SSTableContextManager contextManager; - public StorageAttachedIndexGroup(ColumnFamilyStore baseCfs) + StorageAttachedIndexGroup(ColumnFamilyStore baseCfs) { this.baseCfs = baseCfs; this.queryMetrics = new TableQueryMetrics(baseCfs.metadata()); diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/segment/SegmentTrieBuffer.java b/src/java/org/apache/cassandra/index/sai/disk/v1/segment/SegmentTrieBuffer.java index d1e79f91f5..72c9add313 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v1/segment/SegmentTrieBuffer.java +++ b/src/java/org/apache/cassandra/index/sai/disk/v1/segment/SegmentTrieBuffer.java @@ -22,7 +22,7 @@ import java.util.Map; import java.util.concurrent.atomic.LongAdder; import javax.annotation.concurrent.NotThreadSafe; -import org.apache.cassandra.db.memtable.TrieMemtable; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.tries.InMemoryTrie; import org.apache.cassandra.index.sai.postings.PostingList; import org.apache.cassandra.index.sai.utils.IndexEntry; @@ -45,7 +45,7 @@ public class SegmentTrieBuffer public SegmentTrieBuffer() { - trie = new InMemoryTrie<>(TrieMemtable.BUFFER_TYPE); + trie = new InMemoryTrie<>(DatabaseDescriptor.getMemtableAllocationType().toBufferType()); postingsAccumulator = new PostingsAccumulator(); } diff --git a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java index e4a4dd365b..bd2935d902 100644 --- a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java @@ -44,7 +44,6 @@ import org.apache.cassandra.cql3.statements.ModificationStatement; import org.apache.cassandra.cql3.statements.schema.CreateIndexStatement; import org.apache.cassandra.cql3.statements.schema.CreateTableStatement; import org.apache.cassandra.cql3.statements.schema.CreateTypeStatement; -import org.apache.cassandra.cql3.statements.schema.IndexTarget; import org.apache.cassandra.db.Clustering; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Directories; @@ -56,12 +55,9 @@ import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Murmur3Partitioner; import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.exceptions.SyntaxException; -import org.apache.cassandra.index.sai.StorageAttachedIndex; import org.apache.cassandra.index.sai.StorageAttachedIndexGroup; import org.apache.cassandra.io.sstable.format.SSTableFormat; import org.apache.cassandra.io.util.File; -import org.apache.cassandra.schema.IndexMetadata; -import org.apache.cassandra.schema.Indexes; import org.apache.cassandra.schema.KeyspaceMetadata; import org.apache.cassandra.schema.KeyspaceParams; import org.apache.cassandra.schema.Keyspaces; @@ -652,33 +648,22 @@ public class CQLSSTableWriter implements Closeable KeyspaceMetadata ksm = Schema.instance.getKeyspaceMetadata(keyspaceName); TableMetadata tableMetadata = ksm.tables.getNullable(schemaStatement.table()); - - ColumnFamilyStore cfs = null; - if (tableMetadata == null) { Types types = createTypes(keyspaceName); Schema.instance.transform(SchemaTransformations.addTypes(types, true)); tableMetadata = createTable(types); - Schema.instance.transform(SchemaTransformations.addTable(tableMetadata, true)); - } + Keyspace.setInitialized(); - if (buildIndexes && !indexStatements.isEmpty()) - { - cfs = new ColumnFamilyStore(Keyspace.mockKS(ksm), - tableMetadata.name, - () -> new SequenceBasedSSTableId(0), - TableMetadataRef.forOfflineTools(tableMetadata), - new Directories(tableMetadata, new Directories.DataDirectory[] { - new Directories.DataDirectory(directory) - }), - false, - false, - true); - - Indexes indexes = registerIndexes(cfs, indexStatements); - tableMetadata = tableMetadata.unbuild().indexes(indexes).build(); + if (buildIndexes && !indexStatements.isEmpty()) + { + tableMetadata = applyIndexes(ksm.withSwapped(ksm.tables.with(tableMetadata))); + Keyspace ks = Keyspace.openWithoutSSTables(keyspaceName); + Directories directories = new Directories(tableMetadata, Collections.singleton(new Directories.DataDirectory(new org.apache.cassandra.io.util.File(directory.toPath())))); + ColumnFamilyStore cfs = ColumnFamilyStore.createColumnFamilyStore(ks, tableMetadata.name, TableMetadataRef.forOfflineTools(tableMetadata), directories, false, false, true); + ks.initCfCustom(cfs); + } Schema.instance.transform(SchemaTransformations.addTable(tableMetadata, true)); } @@ -693,19 +678,13 @@ public class CQLSSTableWriter implements Closeable if (format != null) writer.setSSTableFormatType(format); - if (cfs != null) + if (buildIndexes) { - StorageAttachedIndexGroup saiGroup = StorageAttachedIndexGroup.getIndexGroup(cfs); + StorageAttachedIndexGroup saiGroup = StorageAttachedIndexGroup.getIndexGroup(Schema.instance.getColumnFamilyStoreInstance(tableMetadata.id)); if (saiGroup != null) writer.addIndexGroup(saiGroup); } - // this is the empty directory / leftover from times we initialized ColumnFamilyStore - // it will automatically create directories for keyspace and table on disk after initialization - // we set that directory to the destination of generated SSTables so we just remove empty directories here - if (cfs != null) - new File(directory, keyspaceName).deleteRecursive(); - return new CQLSSTableWriter(writer, preparedModificationStatement, preparedModificationStatement.getBindVariables()); } } @@ -723,35 +702,6 @@ public class CQLSSTableWriter implements Closeable return builder.build(); } - private Indexes registerIndexes(ColumnFamilyStore cfs, List<CreateIndexStatement.Raw> indexStatements) - { - ClientState state = ClientState.forInternalCalls(); - Indexes.Builder indexesBuilder = Indexes.builder(); - - for (CreateIndexStatement.Raw raw : indexStatements) - { - CreateIndexStatement statement = raw.prepare(state); - - List<IndexTarget> indexTargets = statement.rawIndexTargets.stream() - .map(r -> r.prepare(cfs.metadata())) - .collect(Collectors.toList()); - - IndexMetadata indexMetadata = IndexMetadata.fromIndexTargets(indexTargets, - statement.indexName, - IndexMetadata.Kind.CUSTOM, - statement.attrs.getOptions()); - - indexesBuilder.add(indexMetadata); - - - cfs.indexManager.registerIndex(new StorageAttachedIndex(cfs, indexMetadata), - StorageAttachedIndexGroup.GROUP_KEY, - () -> new StorageAttachedIndexGroup(cfs)); - } - - return indexesBuilder.build(); - } - /** * Applies any provided index definitions to the target table * @param ksm the KeyspaceMetadata object that has the table defined diff --git a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java index b1f3561203..c32b7be2b0 100644 --- a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java +++ b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java @@ -324,7 +324,7 @@ public abstract class AbstractReplicationStrategy try { Constructor<? extends AbstractReplicationStrategy> constructor = strategyClass.getConstructor(parameterTypes); - IEndpointSnitch endpointSnitch = snitch == null ? new SimpleSnitch() : snitch; + IEndpointSnitch endpointSnitch = snitch == null && DatabaseDescriptor.isClientOrToolInitialized() ? new SimpleSnitch() : snitch; strategy = constructor.newInstance(keyspaceName, tokenMetadata, endpointSnitch, strategyOptions); } catch (InvocationTargetException e) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org