This is an automated email from the ASF dual-hosted git repository. marcuse pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push: new 5d4bcc797a Avoid exposing intermediate state while replaying log during startup 5d4bcc797a is described below commit 5d4bcc797af882c64736b3f842cbf8bedbba184b Author: Marcus Eriksson <marc...@apache.org> AuthorDate: Thu Feb 1 10:16:41 2024 +0100 Avoid exposing intermediate state while replaying log during startup Patch by marcuse; reviewed by Sam Tunnicliffe for CASSANDRA-19384 --- .../cassandra/cql3/functions/FunctionCall.java | 7 +++--- .../cassandra/cql3/functions/FunctionResolver.java | 24 ------------------ .../cql3/functions/masking/ColumnMask.java | 11 ++++---- .../cql3/selection/AbstractFunctionSelector.java | 3 ++- .../cassandra/cql3/selection/Selectable.java | 6 ++--- .../statements/schema/AlterTableStatement.java | 29 ++++++++++++++-------- .../statements/schema/CreateTableStatement.java | 22 ++++++++-------- .../cassandra/io/sstable/CQLSSTableWriter.java | 6 ++--- .../org/apache/cassandra/schema/UserFunctions.java | 17 +++++++++++++ .../apache/cassandra/service/StorageService.java | 4 +-- src/java/org/apache/cassandra/tcm/Startup.java | 8 ++++-- .../org/apache/cassandra/tcm/Transformation.java | 2 +- .../org/apache/cassandra/tcm/log/LocalLog.java | 14 ++++++----- .../org/apache/cassandra/tcm/log/LogState.java | 13 ++++++++++ test/unit/org/apache/cassandra/cql3/CQLTester.java | 1 + .../cql3/functions/NativeFunctionsTest.java | 4 ++- .../cql3/functions/masking/ColumnMaskTester.java | 3 +-- .../db/ColumnFamilyStoreClientModeTest.java | 3 ++- .../io/sstable/StressCQLSSTableWriter.java | 2 +- 19 files changed, 103 insertions(+), 76 deletions(-) diff --git a/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java b/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java index 30d9d50437..c45964a997 100644 --- a/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java +++ b/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java @@ -32,6 +32,7 @@ import org.apache.cassandra.cql3.terms.Term; import org.apache.cassandra.cql3.terms.Terms; import org.apache.cassandra.db.marshal.*; import org.apache.cassandra.exceptions.InvalidRequestException; +import org.apache.cassandra.schema.UserFunctions; import org.apache.cassandra.serializers.MarshalException; import org.apache.cassandra.utils.ByteBufferUtil; @@ -150,7 +151,7 @@ public class FunctionCall extends Term.NonTerminal public Term prepare(String keyspace, ColumnSpecification receiver) throws InvalidRequestException { - Function fun = FunctionResolver.get(keyspace, name, terms, receiver.ksName, receiver.cfName, receiver.type); + Function fun = FunctionResolver.get(keyspace, name, terms, receiver.ksName, receiver.cfName, receiver.type, UserFunctions.getCurrentUserFunctions(name, keyspace)); if (fun == null) throw invalidRequest("Unknown function %s called", name); if (fun.isAggregate()) @@ -194,7 +195,7 @@ public class FunctionCall extends Term.NonTerminal // later with a more helpful error message that if we were to return false here. try { - Function fun = FunctionResolver.get(keyspace, name, terms, receiver.ksName, receiver.cfName, receiver.type); + Function fun = FunctionResolver.get(keyspace, name, terms, receiver.ksName, receiver.cfName, receiver.type, UserFunctions.getCurrentUserFunctions(name, keyspace)); // Because the return type of functions built by factories is not fixed but depending on the types of // their arguments, we'll always get EXACT_MATCH. To handle potentially ambiguous function calls with @@ -221,7 +222,7 @@ public class FunctionCall extends Term.NonTerminal { try { - Function fun = FunctionResolver.get(keyspace, name, terms, null, null, null); + Function fun = FunctionResolver.get(keyspace, name, terms, null, null, null, UserFunctions.getCurrentUserFunctions(name, keyspace)); return fun == null ? null : fun.returnType(); } catch (InvalidRequestException e) diff --git a/src/java/org/apache/cassandra/cql3/functions/FunctionResolver.java b/src/java/org/apache/cassandra/cql3/functions/FunctionResolver.java index fd1c901b59..11cf24362c 100644 --- a/src/java/org/apache/cassandra/cql3/functions/FunctionResolver.java +++ b/src/java/org/apache/cassandra/cql3/functions/FunctionResolver.java @@ -29,7 +29,6 @@ import org.apache.cassandra.cql3.terms.Marker; import org.apache.cassandra.cql3.AssignmentTestable; import org.apache.cassandra.cql3.ColumnIdentifier; import org.apache.cassandra.cql3.ColumnSpecification; -import org.apache.cassandra.schema.Schema; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.schema.UserFunctions; @@ -51,27 +50,6 @@ public final class FunctionResolver fun.argTypes().get(i)); } - /** - * @param keyspace the current keyspace - * @param name the name of the function - * @param providedArgs the arguments provided for the function call - * @param receiverKeyspace the receiver's keyspace - * @param receiverTable the receiver's table - * @param receiverType if the receiver type is known (during inserts, for example), this should be the type of - * the receiver - */ - @Nullable - public static Function get(String keyspace, - FunctionName name, - List<? extends AssignmentTestable> providedArgs, - String receiverKeyspace, - String receiverTable, - AbstractType<?> receiverType) - throws InvalidRequestException - { - return get(keyspace, name, providedArgs, receiverKeyspace, receiverTable, receiverType, UserFunctions.none()); - } - /** * @param keyspace the current keyspace * @param name the name of the function @@ -123,7 +101,6 @@ public final class FunctionResolver { // function name is fully qualified (keyspace + name) candidates.addAll(functions.get(name)); - candidates.addAll(Schema.instance.getUserFunctions(name)); candidates.addAll(NativeFunctions.instance.getFunctions(name)); candidates.addAll(NativeFunctions.instance.getFactories(name).stream() .map(f -> f.getOrCreateFunction(providedArgs, receiverType, receiverKeyspace, receiverTable)) @@ -136,7 +113,6 @@ public final class FunctionResolver // add 'current keyspace' candidates FunctionName userName = new FunctionName(keyspace, name.name); candidates.addAll(functions.get(userName)); - candidates.addAll(Schema.instance.getUserFunctions(userName)); // add 'SYSTEM' (native) candidates FunctionName nativeName = name.asNativeFunction(); candidates.addAll(NativeFunctions.instance.getFunctions(nativeName)); diff --git a/src/java/org/apache/cassandra/cql3/functions/masking/ColumnMask.java b/src/java/org/apache/cassandra/cql3/functions/masking/ColumnMask.java index 15e76a316b..644eeec611 100644 --- a/src/java/org/apache/cassandra/cql3/functions/masking/ColumnMask.java +++ b/src/java/org/apache/cassandra/cql3/functions/masking/ColumnMask.java @@ -124,7 +124,8 @@ public class ColumnMask .add(reversed) .addAll(partialArgumentTypes()) .build(); - Function newFunction = FunctionResolver.get(function.name().keyspace, function.name(), args, null, null, null); + + Function newFunction = FunctionResolver.get(function.name().keyspace, function.name(), args, null, null, null, UserFunctions.getCurrentUserFunctions(function.name())); assert newFunction != null; return new ColumnMask((ScalarFunction) newFunction, partialArgumentValues); } @@ -215,20 +216,20 @@ public class ColumnMask this.rawPartialArguments = rawPartialArguments; } - public ColumnMask prepare(String keyspace, String table, ColumnIdentifier column, AbstractType<?> type) + public ColumnMask prepare(String keyspace, String table, ColumnIdentifier column, AbstractType<?> type, UserFunctions functions) { - ScalarFunction function = findMaskingFunction(keyspace, table, column, type); + ScalarFunction function = findMaskingFunction(keyspace, table, column, type, functions); ByteBuffer[] partialArguments = preparePartialArguments(keyspace, function); return new ColumnMask(function, partialArguments); } - private ScalarFunction findMaskingFunction(String keyspace, String table, ColumnIdentifier column, AbstractType<?> type) + private ScalarFunction findMaskingFunction(String keyspace, String table, ColumnIdentifier column, AbstractType<?> type, UserFunctions functions) { List<AssignmentTestable> args = new ArrayList<>(rawPartialArguments.size() + 1); args.add(type); args.addAll(rawPartialArguments); - Function function = FunctionResolver.get(keyspace, name, args, keyspace, table, type); + Function function = FunctionResolver.get(keyspace, name, args, keyspace, table, type, functions); if (function == null) throw invalidRequest("Unable to find masking function for %s, " + diff --git a/src/java/org/apache/cassandra/cql3/selection/AbstractFunctionSelector.java b/src/java/org/apache/cassandra/cql3/selection/AbstractFunctionSelector.java index f7853aee1f..d5403e4041 100644 --- a/src/java/org/apache/cassandra/cql3/selection/AbstractFunctionSelector.java +++ b/src/java/org/apache/cassandra/cql3/selection/AbstractFunctionSelector.java @@ -31,6 +31,7 @@ import org.apache.cassandra.cql3.functions.Arguments; import org.apache.cassandra.cql3.functions.FunctionResolver; import org.apache.cassandra.schema.ColumnMetadata; import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.schema.UserFunctions; import org.apache.cassandra.transport.ProtocolVersion; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.cql3.ColumnSpecification; @@ -68,7 +69,7 @@ abstract class AbstractFunctionSelector<T extends Function> extends Selector argTypes.add(readType(metadata, in)); } - Function function = FunctionResolver.get(metadata.keyspace, name, argTypes, metadata.keyspace, metadata.name, null); + Function function = FunctionResolver.get(metadata.keyspace, name, argTypes, metadata.keyspace, metadata.name, null, UserFunctions.getCurrentUserFunctions(name, metadata.keyspace)); if (function == null) throw new IOException(String.format("Unknown serialized function %s(%s)", diff --git a/src/java/org/apache/cassandra/cql3/selection/Selectable.java b/src/java/org/apache/cassandra/cql3/selection/Selectable.java index 4f08ace467..1fade9fa99 100644 --- a/src/java/org/apache/cassandra/cql3/selection/Selectable.java +++ b/src/java/org/apache/cassandra/cql3/selection/Selectable.java @@ -37,6 +37,7 @@ import org.apache.cassandra.db.marshal.*; import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.schema.ColumnMetadata; import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.schema.UserFunctions; import org.apache.cassandra.utils.Pair; import static org.apache.cassandra.cql3.selection.SelectorFactories.createFactoriesAndCollectColumnDefinitions; @@ -419,8 +420,7 @@ public interface Selectable extends AssignmentTestable name = AggregateFcts.countRowsFunction.name(); preparedArgs = Collections.emptyList(); } - - Function fun = FunctionResolver.get(table.keyspace, name, preparedArgs, table.keyspace, table.name, null); + Function fun = FunctionResolver.get(table.keyspace, name, preparedArgs, table.keyspace, table.name, null, UserFunctions.getCurrentUserFunctions(name, table.keyspace)); if (fun == null) throw new InvalidRequestException(String.format("Unknown function '%s'", functionName)); @@ -462,7 +462,7 @@ public interface Selectable extends AssignmentTestable return factory; FunctionName name = FunctionName.nativeFunction(CastFcts.getFunctionName(type)); - Function fun = FunctionResolver.get(table.keyspace, name, args, table.keyspace, table.name, null); + Function fun = FunctionResolver.get(table.keyspace, name, args, table.keyspace, table.name, null, UserFunctions.getCurrentUserFunctions(name, table.keyspace)); if (fun == null) { diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/AlterTableStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/AlterTableStatement.java index 24f9535d9f..15ed785ca6 100644 --- a/src/java/org/apache/cassandra/cql3/statements/schema/AlterTableStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/schema/AlterTableStatement.java @@ -60,6 +60,7 @@ import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff; import org.apache.cassandra.schema.MemtableParams; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.schema.TableParams; +import org.apache.cassandra.schema.UserFunctions; import org.apache.cassandra.schema.ViewMetadata; import org.apache.cassandra.schema.Views; import org.apache.cassandra.service.ClientState; @@ -120,7 +121,7 @@ public abstract class AlterTableStatement extends AlterSchemaStatement if (table.isView()) throw ire("Cannot use ALTER TABLE on a materialized view; use ALTER MATERIALIZED VIEW instead"); - return schema.withAddedOrUpdated(apply(metadata.nextEpoch(), keyspace, table)); + return schema.withAddedOrUpdated(apply(metadata.nextEpoch(), keyspace, table, metadata)); } SchemaChange schemaChangeEvent(KeyspacesDiff diff) @@ -144,7 +145,7 @@ public abstract class AlterTableStatement extends AlterSchemaStatement return format("%s (%s, %s)", getClass().getSimpleName(), keyspaceName, tableName); } - abstract KeyspaceMetadata apply(Epoch epoch, KeyspaceMetadata keyspace, TableMetadata table); + abstract KeyspaceMetadata apply(Epoch epoch, KeyspaceMetadata keyspace, TableMetadata table, ClusterMetadata metadata); /** * {@code ALTER TABLE [IF EXISTS] <table> ALTER <column> TYPE <newtype>;} @@ -158,7 +159,7 @@ public abstract class AlterTableStatement extends AlterSchemaStatement super(keyspaceName, tableName, ifTableExists); } - public KeyspaceMetadata apply(Epoch epoch, KeyspaceMetadata keyspace, TableMetadata table) + public KeyspaceMetadata apply(Epoch epoch, KeyspaceMetadata keyspace, TableMetadata table, ClusterMetadata metadata) { throw ire("Altering column types is no longer supported"); } @@ -198,7 +199,7 @@ public abstract class AlterTableStatement extends AlterSchemaStatement } @Override - public KeyspaceMetadata apply(Epoch epoch, KeyspaceMetadata keyspace, TableMetadata table) + public KeyspaceMetadata apply(Epoch epoch, KeyspaceMetadata keyspace, TableMetadata table, ClusterMetadata metadata) { ColumnMetadata column = table.getColumn(columnName); @@ -210,8 +211,14 @@ public abstract class AlterTableStatement extends AlterSchemaStatement return keyspace; } + // add all user functions to be able to give a good error message to the user if the alter references + // a function from another keyspace + UserFunctions.Builder ufBuilder = UserFunctions.builder(); + for (KeyspaceMetadata ksm : metadata.schema.getKeyspaces()) + ufBuilder.add(ksm.userFunctions); + ColumnMask oldMask = table.getColumn(columnName).getMask(); - ColumnMask newMask = rawMask == null ? null : rawMask.prepare(keyspace.name, table.name, columnName, column.type); + ColumnMask newMask = rawMask == null ? null : rawMask.prepare(keyspace.name, table.name, columnName, column.type, ufBuilder.build()); if (Objects.equals(oldMask, newMask)) return keyspace; @@ -276,7 +283,7 @@ public abstract class AlterTableStatement extends AlterSchemaStatement newColumns.forEach(c -> c.type.validate(state, "Column " + c.name)); } - public KeyspaceMetadata apply(Epoch epoch, KeyspaceMetadata keyspace, TableMetadata table) + public KeyspaceMetadata apply(Epoch epoch, KeyspaceMetadata keyspace, TableMetadata table, ClusterMetadata metadata) { Guardrails.alterTableEnabled.ensureEnabled("ALTER TABLE changing columns", state); TableMetadata.Builder tableBuilder = table.unbuild().epoch(epoch); @@ -302,7 +309,7 @@ public abstract class AlterTableStatement extends AlterSchemaStatement ColumnIdentifier name = column.name; AbstractType<?> type = column.type.prepare(keyspaceName, keyspace.types).getType(); boolean isStatic = column.isStatic; - ColumnMask mask = column.mask == null ? null : column.mask.prepare(keyspaceName, tableName, name, type); + ColumnMask mask = column.mask == null ? null : column.mask.prepare(keyspaceName, tableName, name, type, keyspace.userFunctions); if (null != tableBuilder.getColumn(name)) { if (!ifColumnNotExists) @@ -413,7 +420,7 @@ public abstract class AlterTableStatement extends AlterSchemaStatement this.timestamp = timestamp; } - public KeyspaceMetadata apply(Epoch epoch, KeyspaceMetadata keyspace, TableMetadata table) + public KeyspaceMetadata apply(Epoch epoch, KeyspaceMetadata keyspace, TableMetadata table, ClusterMetadata metadata) { Guardrails.alterTableEnabled.ensureEnabled("ALTER TABLE changing columns", state); TableMetadata.Builder builder = table.unbuild(); @@ -475,7 +482,7 @@ public abstract class AlterTableStatement extends AlterSchemaStatement this.ifColumnsExists = ifColumnsExists; } - public KeyspaceMetadata apply(Epoch epoch, KeyspaceMetadata keyspace, TableMetadata table) + public KeyspaceMetadata apply(Epoch epoch, KeyspaceMetadata keyspace, TableMetadata table, ClusterMetadata metadata) { Guardrails.alterTableEnabled.ensureEnabled("ALTER TABLE changing columns", state); TableMetadata.Builder tableBuilder = table.unbuild().epoch(epoch); @@ -553,7 +560,7 @@ public abstract class AlterTableStatement extends AlterSchemaStatement validateDefaultTimeToLive(attrs.asNewTableParams()); } - public KeyspaceMetadata apply(Epoch epoch, KeyspaceMetadata keyspace, TableMetadata table) + public KeyspaceMetadata apply(Epoch epoch, KeyspaceMetadata keyspace, TableMetadata table, ClusterMetadata metadata) { attrs.validate(); @@ -597,7 +604,7 @@ public abstract class AlterTableStatement extends AlterSchemaStatement super(keyspaceName, tableName, ifTableExists); } - public KeyspaceMetadata apply(Epoch epoch, KeyspaceMetadata keyspace, TableMetadata table) + public KeyspaceMetadata apply(Epoch epoch, KeyspaceMetadata keyspace, TableMetadata table, ClusterMetadata metadata) { if (!DatabaseDescriptor.enableDropCompactStorage()) throw new InvalidRequestException("DROP COMPACT STORAGE is disabled. Enable in cassandra.yaml to use."); diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/CreateTableStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/CreateTableStatement.java index b606a96d99..264877db7c 100644 --- a/src/java/org/apache/cassandra/cql3/statements/schema/CreateTableStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/schema/CreateTableStatement.java @@ -26,9 +26,6 @@ import com.google.common.collect.ImmutableSet; import org.apache.commons.lang3.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import org.apache.cassandra.audit.AuditLogContext; import org.apache.cassandra.audit.AuditLogEntryType; import org.apache.cassandra.auth.DataResource; @@ -55,7 +52,6 @@ import static com.google.common.collect.Iterables.concat; public final class CreateTableStatement extends AlterSchemaStatement { - private static final Logger logger = LoggerFactory.getLogger(CreateTableStatement.class); private final String tableName; private final Map<ColumnIdentifier, ColumnProperties.Raw> rawColumns; @@ -110,7 +106,13 @@ public final class CreateTableStatement extends AlterSchemaStatement throw new AlreadyExistsException(keyspaceName, tableName); } - TableMetadata.Builder builder = builder(keyspace.types).epoch(metadata.nextEpoch()); + // add all user functions to be able to give a good error message to the user if the alter references + // a function from another keyspace + UserFunctions.Builder ufBuilder = UserFunctions.builder().add(); + for (KeyspaceMetadata ksm : schema) + ufBuilder.add(ksm.userFunctions); + + TableMetadata.Builder builder = builder(keyspace.types, ufBuilder.build()).epoch(metadata.nextEpoch()); if (!builder.hasId() && !DatabaseDescriptor.useDeterministicTableID()) builder.id(TableId.get(metadata)); TableMetadata table = builder.build(); @@ -189,14 +191,14 @@ public final class CreateTableStatement extends AlterSchemaStatement return String.format("%s (%s, %s)", getClass().getSimpleName(), keyspaceName, tableName); } - public TableMetadata.Builder builder(Types types) + public TableMetadata.Builder builder(Types types, UserFunctions functions) { attrs.validate(); TableParams params = attrs.asNewTableParams(); // use a TreeMap to preserve ordering across JDK versions (see CASSANDRA-9492) - important for stable unit tests Map<ColumnIdentifier, ColumnProperties> columns = new TreeMap<>(comparing(o -> o.bytes)); - rawColumns.forEach((column, properties) -> columns.put(column, properties.prepare(keyspaceName, tableName, column, types))); + rawColumns.forEach((column, properties) -> columns.put(column, properties.prepare(keyspaceName, tableName, column, types, functions))); // check for nested non-frozen UDTs or collections in a non-frozen UDT columns.forEach((column, properties) -> @@ -469,7 +471,7 @@ public final class CreateTableStatement extends AlterSchemaStatement return CQLFragmentParser.parseAny(CqlParser::createTableStatement, cql, "CREATE TABLE") .keyspace(keyspace) .prepare(null) // works around a messy ClientState/QueryProcessor class init deadlock - .builder(Types.none()); + .builder(Types.none(), UserFunctions.none()); } public final static class Raw extends CQLStatement.Raw @@ -614,11 +616,11 @@ public final class CreateTableStatement extends AlterSchemaStatement ColumnMask.ensureEnabled(); } - public ColumnProperties prepare(String keyspace, String table, ColumnIdentifier column, Types udts) + public ColumnProperties prepare(String keyspace, String table, ColumnIdentifier column, Types udts, UserFunctions functions) { CQL3Type cqlType = rawType.prepare(keyspace, udts); AbstractType<?> type = cqlType.getType(); - ColumnMask mask = rawMask == null ? null : rawMask.prepare(keyspace, table, column, type); + ColumnMask mask = rawMask == null ? null : rawMask.prepare(keyspace, table, column, type, functions); return new ColumnProperties(type, cqlType, mask); } } diff --git a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java index 3211b9576a..cd2f7c79e8 100644 --- a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java @@ -663,7 +663,7 @@ public class CQLSSTableWriter implements Closeable { Types types = createTypes(keyspaceName); Schema.instance.submit(SchemaTransformations.addTypes(types, true)); - tableMetadata = createTable(types); + tableMetadata = createTable(types, ksm.userFunctions); Schema.instance.submit(SchemaTransformations.addTable(tableMetadata, true)); if (buildIndexes && !indexStatements.isEmpty()) @@ -772,13 +772,13 @@ public class CQLSSTableWriter implements Closeable * * @param types types this table should be created with */ - private TableMetadata createTable(Types types) + private TableMetadata createTable(Types types, UserFunctions functions) { ClientState state = ClientState.forInternalCalls(); CreateTableStatement statement = schemaStatement.prepare(state); statement.validate(ClientState.forInternalCalls()); - TableMetadata.Builder builder = statement.builder(types); + TableMetadata.Builder builder = statement.builder(types, functions); if (partitioner != null) builder.partitioner(partitioner); diff --git a/src/java/org/apache/cassandra/schema/UserFunctions.java b/src/java/org/apache/cassandra/schema/UserFunctions.java index 027fe71bc7..b8823d7d85 100644 --- a/src/java/org/apache/cassandra/schema/UserFunctions.java +++ b/src/java/org/apache/cassandra/schema/UserFunctions.java @@ -31,6 +31,7 @@ import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.marshal.UserType; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tcm.serialization.UDTAwareMetadataSerializer; import org.apache.cassandra.tcm.serialization.Version; @@ -252,6 +253,22 @@ public final class UserFunctions implements Iterable<UserFunction> .build(); } + public static UserFunctions getCurrentUserFunctions(FunctionName name, String keyspace) + { + KeyspaceMetadata ksm = ClusterMetadata.current().schema.getKeyspaces().getNullable(name.hasKeyspace() ? name.keyspace : keyspace); + UserFunctions userFunctions = UserFunctions.none(); + if (ksm != null) + userFunctions = ksm.userFunctions; + return userFunctions; + } + + public static UserFunctions getCurrentUserFunctions(FunctionName name) + { + if (!name.hasKeyspace()) + return UserFunctions.none(); + return getCurrentUserFunctions(name, null); + } + @Override public boolean equals(Object o) { diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 1ea091e771..8a8e7812e3 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -484,11 +484,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE jmxObjectName = "org.apache.cassandra.db:type=StorageService"; sstablesTracker = new SSTablesGlobalTracker(DatabaseDescriptor.getSelectedSSTableFormat()); - registerMBeans(); } - private void registerMBeans() + public void registerMBeans() { + logger.info("Initializing storage service mbean"); MBeanWrapper.instance.registerMBean(this, jmxObjectName); MBeanWrapper.instance.registerMBean(StreamManager.instance, StreamManager.OBJECT_NAME); } diff --git a/src/java/org/apache/cassandra/tcm/Startup.java b/src/java/org/apache/cassandra/tcm/Startup.java index e9fed3eb1c..99e36fb4b2 100644 --- a/src/java/org/apache/cassandra/tcm/Startup.java +++ b/src/java/org/apache/cassandra/tcm/Startup.java @@ -142,7 +142,8 @@ import static org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort; { LocalLog.LogSpec logSpec = LocalLog.logSpec() .withStorage(LogStorage.SystemKeyspace) - .afterReplay(Startup::scrubDataDirectories) + .afterReplay(Startup::scrubDataDirectories, + (metadata) -> StorageService.instance.registerMBeans()) .withDefaultListeners(); ClusterMetadataService.setInstance(new ClusterMetadataService(new UniformRangePlacement(), wrapProcessor, @@ -245,7 +246,8 @@ import static org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort; ClusterMetadata emptyFromSystemTables = emptyWithSchemaFromSystemTables(SystemKeyspace.allKnownDatacenters()); LocalLog.LogSpec logSpec = LocalLog.logSpec() .withInitialState(emptyFromSystemTables) - .afterReplay(Startup::scrubDataDirectories) + .afterReplay(Startup::scrubDataDirectories, + (metadata) -> StorageService.instance.registerMBeans()) .withStorage(LogStorage.SystemKeyspace) .withDefaultListeners(); @@ -305,6 +307,8 @@ import static org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort; metadata = metadata.forceEpoch(metadata.epoch.nextEpoch()); ClusterMetadataService.unsetInstance(); LocalLog.LogSpec logSpec = LocalLog.logSpec() + .afterReplay(Startup::scrubDataDirectories, + (_metadata) -> StorageService.instance.registerMBeans()) .withPreviousState(prev) .withInitialState(metadata) .withStorage(LogStorage.SystemKeyspace) diff --git a/src/java/org/apache/cassandra/tcm/Transformation.java b/src/java/org/apache/cassandra/tcm/Transformation.java index 943450aa53..df86c0676b 100644 --- a/src/java/org/apache/cassandra/tcm/Transformation.java +++ b/src/java/org/apache/cassandra/tcm/Transformation.java @@ -148,7 +148,7 @@ public interface Transformation public Success success() { - throw new IllegalStateException("Can't dereference Success for a rejected execution"); + throw new IllegalStateException("Can't dereference Success for a rejected execution: " + code + ": " + reason); } public Rejected rejected() diff --git a/src/java/org/apache/cassandra/tcm/log/LocalLog.java b/src/java/org/apache/cassandra/tcm/log/LocalLog.java index 237a426320..bb878f5b12 100644 --- a/src/java/org/apache/cassandra/tcm/log/LocalLog.java +++ b/src/java/org/apache/cassandra/tcm/log/LocalLog.java @@ -20,6 +20,7 @@ package org.apache.cassandra.tcm.log; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.NoSuchElementException; @@ -33,6 +34,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; import com.google.common.collect.Sets; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -107,7 +109,7 @@ public abstract class LocalLog implements Closeable { private ClusterMetadata initial; private ClusterMetadata prev; - private Startup.AfterReplay afterReplay = (metadata) -> {}; + private List<Startup.AfterReplay> afterReplay = Collections.emptyList(); private LogStorage storage = LogStorage.None; private boolean async = true; private boolean defaultListeners = false; @@ -202,9 +204,9 @@ public abstract class LocalLog implements Closeable return this; } - public LogSpec afterReplay(Startup.AfterReplay afterReplay) + public LogSpec afterReplay(Startup.AfterReplay ... afterReplay) { - this.afterReplay = afterReplay; + this.afterReplay = Lists.newArrayList(afterReplay); return this; } @@ -541,7 +543,7 @@ public abstract class LocalLog implements Closeable if (replayComplete.get()) throw new IllegalStateException("Can only replay persisted once."); LogState logState = storage.getPersistedLogState(); - append(logState); + append(logState.flatten()); return waitForHighestConsecutive(); } @@ -612,7 +614,8 @@ public abstract class LocalLog implements Closeable public ClusterMetadata ready() throws StartupException { ClusterMetadata metadata = replayPersisted(); - spec.afterReplay.accept(metadata); + for (Startup.AfterReplay ar : spec.afterReplay) + ar.accept(metadata); logger.info("Marking LocalLog ready at epoch {}", metadata.epoch); if (!replayComplete.compareAndSet(false, true)) @@ -634,7 +637,6 @@ public abstract class LocalLog implements Closeable logger.info("Notifying all registered listeners of both pre and post commit event"); notifyListeners(spec.prev); - return metadata; } diff --git a/src/java/org/apache/cassandra/tcm/log/LogState.java b/src/java/org/apache/cassandra/tcm/log/LogState.java index f1f8fcd8ad..6b8fd1a1ea 100644 --- a/src/java/org/apache/cassandra/tcm/log/LogState.java +++ b/src/java/org/apache/cassandra/tcm/log/LogState.java @@ -95,6 +95,19 @@ public class LogState return new LogState(baseState, ImmutableList.of()); } + public LogState flatten() + { + if (baseState == null && entries.isEmpty()) + return this; + ClusterMetadata metadata = baseState; + if (metadata == null) + metadata = new ClusterMetadata(DatabaseDescriptor.getPartitioner()); + for (Entry entry : entries) + metadata = entry.transform.execute(metadata).success().metadata; + return LogState.make(metadata); + } + + public boolean isEmpty() { return baseState == null && entries.isEmpty(); diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java b/test/unit/org/apache/cassandra/cql3/CQLTester.java index 18beee3d2a..cb96bfff24 100644 --- a/test/unit/org/apache/cassandra/cql3/CQLTester.java +++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java @@ -418,6 +418,7 @@ public abstract class CQLTester ServerTestUtils.daemonInitialization(); if (ROW_CACHE_SIZE_IN_MIB > 0) DatabaseDescriptor.setRowCacheSizeInMiB(ROW_CACHE_SIZE_IN_MIB); + StorageService.instance.registerMBeans(); StorageService.instance.setPartitionerUnsafe(Murmur3Partitioner.instance); // Once per-JVM is enough prepareServer(); diff --git a/test/unit/org/apache/cassandra/cql3/functions/NativeFunctionsTest.java b/test/unit/org/apache/cassandra/cql3/functions/NativeFunctionsTest.java index 3bf3543ef6..be55d17038 100644 --- a/test/unit/org/apache/cassandra/cql3/functions/NativeFunctionsTest.java +++ b/test/unit/org/apache/cassandra/cql3/functions/NativeFunctionsTest.java @@ -26,6 +26,7 @@ import org.apache.commons.lang3.StringUtils; import org.junit.Test; import org.apache.cassandra.schema.SchemaConstants; +import org.apache.cassandra.schema.UserFunctions; import org.assertj.core.api.Assertions; public class NativeFunctionsTest @@ -144,7 +145,8 @@ public class NativeFunctionsTest function.argTypes, null, null, - function.returnType); + function.returnType, + UserFunctions.none()); Assertions.assertThat(newFunction).isNotNull(); Assertions.assertThat(function).isNotEqualTo(newFunction); diff --git a/test/unit/org/apache/cassandra/cql3/functions/masking/ColumnMaskTester.java b/test/unit/org/apache/cassandra/cql3/functions/masking/ColumnMaskTester.java index f5a0b702b9..eadc7520f5 100644 --- a/test/unit/org/apache/cassandra/cql3/functions/masking/ColumnMaskTester.java +++ b/test/unit/org/apache/cassandra/cql3/functions/masking/ColumnMaskTester.java @@ -41,7 +41,6 @@ import org.apache.cassandra.schema.SchemaConstants; import org.apache.cassandra.schema.SchemaKeyspace; import org.apache.cassandra.schema.SchemaKeyspaceTables; import org.apache.cassandra.schema.TableMetadata; -import org.apache.cassandra.schema.UserFunctions; import static java.lang.String.format; import static org.assertj.core.api.Assertions.assertThat; @@ -133,7 +132,7 @@ public class ColumnMaskTester extends CQLTester UntypedResultSet columnRows = execute("SELECT * FROM system_schema.columns " + "WHERE keyspace_name = ? AND table_name = ? AND column_name = ?", KEYSPACE, table, column); - ColumnMetadata persistedColumn = SchemaKeyspace.createColumnFromRow(columnRows.one(), keyspaceMetadata.types, UserFunctions.none()); + ColumnMetadata persistedColumn = SchemaKeyspace.createColumnFromRow(columnRows.one(), keyspaceMetadata.types, keyspaceMetadata.userFunctions); // Verify the column mask in the persisted schema ColumnMask savedMask = persistedColumn.getMask(); diff --git a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreClientModeTest.java b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreClientModeTest.java index bf9f9dc630..0550237b74 100644 --- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreClientModeTest.java +++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreClientModeTest.java @@ -39,6 +39,7 @@ import org.apache.cassandra.schema.TableId; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.schema.TableMetadataRef; import org.apache.cassandra.schema.Types; +import org.apache.cassandra.schema.UserFunctions; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tcm.ClusterMetadataService; @@ -86,7 +87,7 @@ public class ColumnFamilyStoreClientModeTest ClientState state = ClientState.forInternalCalls(KEYSPACE); CreateTableStatement statement = schemaStatement.prepare(state); statement.validate(state); - TableMetadata tableMetadata = statement.builder(types) + TableMetadata tableMetadata = statement.builder(types, UserFunctions.none()) .id(TableId.fromUUID(UUID.nameUUIDFromBytes(ArrayUtils.addAll(schemaStatement.keyspace().getBytes(), schemaStatement.table().getBytes())))) .partitioner(Murmur3Partitioner.instance) .build(); diff --git a/tools/stress/src/org/apache/cassandra/io/sstable/StressCQLSSTableWriter.java b/tools/stress/src/org/apache/cassandra/io/sstable/StressCQLSSTableWriter.java index 83aafece69..3745f717be 100644 --- a/tools/stress/src/org/apache/cassandra/io/sstable/StressCQLSSTableWriter.java +++ b/tools/stress/src/org/apache/cassandra/io/sstable/StressCQLSSTableWriter.java @@ -636,7 +636,7 @@ public class StressCQLSSTableWriter implements Closeable statement.validate(state); //Build metadata with a portable tableId - tableMetadata = statement.builder(ksm.types) + tableMetadata = statement.builder(ksm.types, ksm.userFunctions) .id(deterministicId(schemaStatement.keyspace(), schemaStatement.table())) .build(); Tables tables = Tables.of(tableMetadata); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org