This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5d4bcc797a Avoid exposing intermediate state while replaying log during startup
5d4bcc797a is described below

commit 5d4bcc797af882c64736b3f842cbf8bedbba184b
Author: Marcus Eriksson <marc...@apache.org>
AuthorDate: Thu Feb 1 10:16:41 2024 +0100

    Avoid exposing intermediate state while replaying log during startup
    
    Patch by marcuse; reviewed by Sam Tunnicliffe for CASSANDRA-19384
---
 .../cassandra/cql3/functions/FunctionCall.java     |  7 +++---
 .../cassandra/cql3/functions/FunctionResolver.java | 24 ------------------
 .../cql3/functions/masking/ColumnMask.java         | 11 ++++----
 .../cql3/selection/AbstractFunctionSelector.java   |  3 ++-
 .../cassandra/cql3/selection/Selectable.java       |  6 ++---
 .../statements/schema/AlterTableStatement.java     | 29 ++++++++++++++--------
 .../statements/schema/CreateTableStatement.java    | 22 ++++++++--------
 .../cassandra/io/sstable/CQLSSTableWriter.java     |  6 ++---
 .../org/apache/cassandra/schema/UserFunctions.java | 17 +++++++++++++
 .../apache/cassandra/service/StorageService.java   |  4 +--
 src/java/org/apache/cassandra/tcm/Startup.java     |  8 ++++--
 .../org/apache/cassandra/tcm/Transformation.java   |  2 +-
 .../org/apache/cassandra/tcm/log/LocalLog.java     | 14 ++++++-----
 .../org/apache/cassandra/tcm/log/LogState.java     | 13 ++++++++++
 test/unit/org/apache/cassandra/cql3/CQLTester.java |  1 +
 .../cql3/functions/NativeFunctionsTest.java        |  4 ++-
 .../cql3/functions/masking/ColumnMaskTester.java   |  3 +--
 .../db/ColumnFamilyStoreClientModeTest.java        |  3 ++-
 .../io/sstable/StressCQLSSTableWriter.java         |  2 +-
 19 files changed, 103 insertions(+), 76 deletions(-)

diff --git a/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java 
b/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java
index 30d9d50437..c45964a997 100644
--- a/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java
+++ b/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java
@@ -32,6 +32,7 @@ import org.apache.cassandra.cql3.terms.Term;
 import org.apache.cassandra.cql3.terms.Terms;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.schema.UserFunctions;
 import org.apache.cassandra.serializers.MarshalException;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
@@ -150,7 +151,7 @@ public class FunctionCall extends Term.NonTerminal
 
         public Term prepare(String keyspace, ColumnSpecification receiver) 
throws InvalidRequestException
         {
-            Function fun = FunctionResolver.get(keyspace, name, terms, 
receiver.ksName, receiver.cfName, receiver.type);
+            Function fun = FunctionResolver.get(keyspace, name, terms, 
receiver.ksName, receiver.cfName, receiver.type, 
UserFunctions.getCurrentUserFunctions(name, keyspace));
             if (fun == null)
                 throw invalidRequest("Unknown function %s called", name);
             if (fun.isAggregate())
@@ -194,7 +195,7 @@ public class FunctionCall extends Term.NonTerminal
             // later with a more helpful error message that if we were to 
return false here.
             try
             {
-                Function fun = FunctionResolver.get(keyspace, name, terms, 
receiver.ksName, receiver.cfName, receiver.type);
+                Function fun = FunctionResolver.get(keyspace, name, terms, 
receiver.ksName, receiver.cfName, receiver.type, 
UserFunctions.getCurrentUserFunctions(name, keyspace));
 
                 // Because the return type of functions built by factories is 
not fixed but depending on the types of
                 // their arguments, we'll always get EXACT_MATCH.  To handle 
potentially ambiguous function calls with
@@ -221,7 +222,7 @@ public class FunctionCall extends Term.NonTerminal
         {
             try
             {
-                Function fun = FunctionResolver.get(keyspace, name, terms, 
null, null, null);
+                Function fun = FunctionResolver.get(keyspace, name, terms, 
null, null, null, UserFunctions.getCurrentUserFunctions(name, keyspace));
                 return fun == null ? null : fun.returnType();
             }
             catch (InvalidRequestException e)
diff --git a/src/java/org/apache/cassandra/cql3/functions/FunctionResolver.java 
b/src/java/org/apache/cassandra/cql3/functions/FunctionResolver.java
index fd1c901b59..11cf24362c 100644
--- a/src/java/org/apache/cassandra/cql3/functions/FunctionResolver.java
+++ b/src/java/org/apache/cassandra/cql3/functions/FunctionResolver.java
@@ -29,7 +29,6 @@ import org.apache.cassandra.cql3.terms.Marker;
 import org.apache.cassandra.cql3.AssignmentTestable;
 import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.cql3.ColumnSpecification;
-import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.schema.UserFunctions;
@@ -51,27 +50,6 @@ public final class FunctionResolver
                                        fun.argTypes().get(i));
     }
 
-    /**
-     * @param keyspace the current keyspace
-     * @param name the name of the function
-     * @param providedArgs the arguments provided for the function call
-     * @param receiverKeyspace the receiver's keyspace
-     * @param receiverTable the receiver's table
-     * @param receiverType if the receiver type is known (during inserts, for 
example), this should be the type of
-     *                     the receiver
-     */
-    @Nullable
-    public static Function get(String keyspace,
-                               FunctionName name,
-                               List<? extends AssignmentTestable> providedArgs,
-                               String receiverKeyspace,
-                               String receiverTable,
-                               AbstractType<?> receiverType)
-    throws InvalidRequestException
-    {
-        return get(keyspace, name, providedArgs, receiverKeyspace, 
receiverTable, receiverType, UserFunctions.none());
-    }
-
     /**
      * @param keyspace the current keyspace
      * @param name the name of the function
@@ -123,7 +101,6 @@ public final class FunctionResolver
         {
             // function name is fully qualified (keyspace + name)
             candidates.addAll(functions.get(name));
-            candidates.addAll(Schema.instance.getUserFunctions(name));
             candidates.addAll(NativeFunctions.instance.getFunctions(name));
             
candidates.addAll(NativeFunctions.instance.getFactories(name).stream()
                                             .map(f -> 
f.getOrCreateFunction(providedArgs, receiverType, receiverKeyspace, 
receiverTable))
@@ -136,7 +113,6 @@ public final class FunctionResolver
             // add 'current keyspace' candidates
             FunctionName userName = new FunctionName(keyspace, name.name);
             candidates.addAll(functions.get(userName));
-            candidates.addAll(Schema.instance.getUserFunctions(userName));
             // add 'SYSTEM' (native) candidates
             FunctionName nativeName = name.asNativeFunction();
             
candidates.addAll(NativeFunctions.instance.getFunctions(nativeName));
diff --git 
a/src/java/org/apache/cassandra/cql3/functions/masking/ColumnMask.java 
b/src/java/org/apache/cassandra/cql3/functions/masking/ColumnMask.java
index 15e76a316b..644eeec611 100644
--- a/src/java/org/apache/cassandra/cql3/functions/masking/ColumnMask.java
+++ b/src/java/org/apache/cassandra/cql3/functions/masking/ColumnMask.java
@@ -124,7 +124,8 @@ public class ColumnMask
                                                   .add(reversed)
                                                   
.addAll(partialArgumentTypes())
                                                   .build();
-        Function newFunction = FunctionResolver.get(function.name().keyspace, 
function.name(), args, null, null, null);
+
+        Function newFunction = FunctionResolver.get(function.name().keyspace, 
function.name(), args, null, null, null, 
UserFunctions.getCurrentUserFunctions(function.name()));
         assert newFunction != null;
         return new ColumnMask((ScalarFunction) newFunction, 
partialArgumentValues);
     }
@@ -215,20 +216,20 @@ public class ColumnMask
             this.rawPartialArguments = rawPartialArguments;
         }
 
-        public ColumnMask prepare(String keyspace, String table, 
ColumnIdentifier column, AbstractType<?> type)
+        public ColumnMask prepare(String keyspace, String table, 
ColumnIdentifier column, AbstractType<?> type, UserFunctions functions)
         {
-            ScalarFunction function = findMaskingFunction(keyspace, table, 
column, type);
+            ScalarFunction function = findMaskingFunction(keyspace, table, 
column, type, functions);
             ByteBuffer[] partialArguments = preparePartialArguments(keyspace, 
function);
             return new ColumnMask(function, partialArguments);
         }
 
-        private ScalarFunction findMaskingFunction(String keyspace, String 
table, ColumnIdentifier column, AbstractType<?> type)
+        private ScalarFunction findMaskingFunction(String keyspace, String 
table, ColumnIdentifier column, AbstractType<?> type, UserFunctions functions)
         {
             List<AssignmentTestable> args = new 
ArrayList<>(rawPartialArguments.size() + 1);
             args.add(type);
             args.addAll(rawPartialArguments);
 
-            Function function = FunctionResolver.get(keyspace, name, args, 
keyspace, table, type);
+            Function function = FunctionResolver.get(keyspace, name, args, 
keyspace, table, type, functions);
 
             if (function == null)
                 throw invalidRequest("Unable to find masking function for %s, 
" +
diff --git 
a/src/java/org/apache/cassandra/cql3/selection/AbstractFunctionSelector.java 
b/src/java/org/apache/cassandra/cql3/selection/AbstractFunctionSelector.java
index f7853aee1f..d5403e4041 100644
--- a/src/java/org/apache/cassandra/cql3/selection/AbstractFunctionSelector.java
+++ b/src/java/org/apache/cassandra/cql3/selection/AbstractFunctionSelector.java
@@ -31,6 +31,7 @@ import org.apache.cassandra.cql3.functions.Arguments;
 import org.apache.cassandra.cql3.functions.FunctionResolver;
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.UserFunctions;
 import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.cql3.ColumnSpecification;
@@ -68,7 +69,7 @@ abstract class AbstractFunctionSelector<T extends Function> 
extends Selector
                 argTypes.add(readType(metadata, in));
             }
 
-            Function function = FunctionResolver.get(metadata.keyspace, name, 
argTypes, metadata.keyspace, metadata.name, null);
+            Function function = FunctionResolver.get(metadata.keyspace, name, 
argTypes, metadata.keyspace, metadata.name, null, 
UserFunctions.getCurrentUserFunctions(name, metadata.keyspace));
 
             if (function == null)
                 throw new IOException(String.format("Unknown serialized 
function %s(%s)",
diff --git a/src/java/org/apache/cassandra/cql3/selection/Selectable.java 
b/src/java/org/apache/cassandra/cql3/selection/Selectable.java
index 4f08ace467..1fade9fa99 100644
--- a/src/java/org/apache/cassandra/cql3/selection/Selectable.java
+++ b/src/java/org/apache/cassandra/cql3/selection/Selectable.java
@@ -37,6 +37,7 @@ import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.UserFunctions;
 import org.apache.cassandra.utils.Pair;
 
 import static 
org.apache.cassandra.cql3.selection.SelectorFactories.createFactoriesAndCollectColumnDefinitions;
@@ -419,8 +420,7 @@ public interface Selectable extends AssignmentTestable
                     name = AggregateFcts.countRowsFunction.name();
                     preparedArgs = Collections.emptyList();
                 }
-
-                Function fun = FunctionResolver.get(table.keyspace, name, 
preparedArgs, table.keyspace, table.name, null);
+                Function fun = FunctionResolver.get(table.keyspace, name, 
preparedArgs, table.keyspace, table.name, null, 
UserFunctions.getCurrentUserFunctions(name, table.keyspace));
 
                 if (fun == null)
                     throw new InvalidRequestException(String.format("Unknown 
function '%s'", functionName));
@@ -462,7 +462,7 @@ public interface Selectable extends AssignmentTestable
                 return factory;
 
             FunctionName name = 
FunctionName.nativeFunction(CastFcts.getFunctionName(type));
-            Function fun = FunctionResolver.get(table.keyspace, name, args, 
table.keyspace, table.name, null);
+            Function fun = FunctionResolver.get(table.keyspace, name, args, 
table.keyspace, table.name, null, UserFunctions.getCurrentUserFunctions(name, 
table.keyspace));
 
             if (fun == null)
             {
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/schema/AlterTableStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/schema/AlterTableStatement.java
index 24f9535d9f..15ed785ca6 100644
--- 
a/src/java/org/apache/cassandra/cql3/statements/schema/AlterTableStatement.java
+++ 
b/src/java/org/apache/cassandra/cql3/statements/schema/AlterTableStatement.java
@@ -60,6 +60,7 @@ import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff;
 import org.apache.cassandra.schema.MemtableParams;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.schema.TableParams;
+import org.apache.cassandra.schema.UserFunctions;
 import org.apache.cassandra.schema.ViewMetadata;
 import org.apache.cassandra.schema.Views;
 import org.apache.cassandra.service.ClientState;
@@ -120,7 +121,7 @@ public abstract class AlterTableStatement extends 
AlterSchemaStatement
         if (table.isView())
             throw ire("Cannot use ALTER TABLE on a materialized view; use 
ALTER MATERIALIZED VIEW instead");
 
-        return schema.withAddedOrUpdated(apply(metadata.nextEpoch(), keyspace, 
table));
+        return schema.withAddedOrUpdated(apply(metadata.nextEpoch(), keyspace, 
table, metadata));
     }
 
     SchemaChange schemaChangeEvent(KeyspacesDiff diff)
@@ -144,7 +145,7 @@ public abstract class AlterTableStatement extends 
AlterSchemaStatement
         return format("%s (%s, %s)", getClass().getSimpleName(), keyspaceName, 
tableName);
     }
 
-    abstract KeyspaceMetadata apply(Epoch epoch, KeyspaceMetadata keyspace, 
TableMetadata table);
+    abstract KeyspaceMetadata apply(Epoch epoch, KeyspaceMetadata keyspace, 
TableMetadata table, ClusterMetadata metadata);
 
     /**
      * {@code ALTER TABLE [IF EXISTS] <table> ALTER <column> TYPE <newtype>;}
@@ -158,7 +159,7 @@ public abstract class AlterTableStatement extends 
AlterSchemaStatement
             super(keyspaceName, tableName, ifTableExists);
         }
 
-        public KeyspaceMetadata apply(Epoch epoch, KeyspaceMetadata keyspace, 
TableMetadata table)
+        public KeyspaceMetadata apply(Epoch epoch, KeyspaceMetadata keyspace, 
TableMetadata table, ClusterMetadata metadata)
         {
             throw ire("Altering column types is no longer supported");
         }
@@ -198,7 +199,7 @@ public abstract class AlterTableStatement extends 
AlterSchemaStatement
         }
 
         @Override
-        public KeyspaceMetadata apply(Epoch epoch, KeyspaceMetadata keyspace, 
TableMetadata table)
+        public KeyspaceMetadata apply(Epoch epoch, KeyspaceMetadata keyspace, 
TableMetadata table, ClusterMetadata metadata)
         {
             ColumnMetadata column = table.getColumn(columnName);
 
@@ -210,8 +211,14 @@ public abstract class AlterTableStatement extends 
AlterSchemaStatement
                 return keyspace;
             }
 
+            // add all user functions to be able to give a good error message 
to the user if the alter references
+            // a function from another keyspace
+            UserFunctions.Builder ufBuilder = UserFunctions.builder();
+            for (KeyspaceMetadata ksm : metadata.schema.getKeyspaces())
+                ufBuilder.add(ksm.userFunctions);
+
             ColumnMask oldMask = table.getColumn(columnName).getMask();
-            ColumnMask newMask = rawMask == null ? null : 
rawMask.prepare(keyspace.name, table.name, columnName, column.type);
+            ColumnMask newMask = rawMask == null ? null : 
rawMask.prepare(keyspace.name, table.name, columnName, column.type, 
ufBuilder.build());
 
             if (Objects.equals(oldMask, newMask))
                 return keyspace;
@@ -276,7 +283,7 @@ public abstract class AlterTableStatement extends 
AlterSchemaStatement
             newColumns.forEach(c -> c.type.validate(state, "Column " + 
c.name));
         }
 
-        public KeyspaceMetadata apply(Epoch epoch, KeyspaceMetadata keyspace, 
TableMetadata table)
+        public KeyspaceMetadata apply(Epoch epoch, KeyspaceMetadata keyspace, 
TableMetadata table, ClusterMetadata metadata)
         {
             Guardrails.alterTableEnabled.ensureEnabled("ALTER TABLE changing 
columns", state);
             TableMetadata.Builder tableBuilder = table.unbuild().epoch(epoch);
@@ -302,7 +309,7 @@ public abstract class AlterTableStatement extends 
AlterSchemaStatement
             ColumnIdentifier name = column.name;
             AbstractType<?> type = column.type.prepare(keyspaceName, 
keyspace.types).getType();
             boolean isStatic = column.isStatic;
-            ColumnMask mask = column.mask == null ? null : 
column.mask.prepare(keyspaceName, tableName, name, type);
+            ColumnMask mask = column.mask == null ? null : 
column.mask.prepare(keyspaceName, tableName, name, type, 
keyspace.userFunctions);
 
             if (null != tableBuilder.getColumn(name)) {
                 if (!ifColumnNotExists)
@@ -413,7 +420,7 @@ public abstract class AlterTableStatement extends 
AlterSchemaStatement
             this.timestamp = timestamp;
         }
 
-        public KeyspaceMetadata apply(Epoch epoch, KeyspaceMetadata keyspace, 
TableMetadata table)
+        public KeyspaceMetadata apply(Epoch epoch, KeyspaceMetadata keyspace, 
TableMetadata table, ClusterMetadata metadata)
         {
             Guardrails.alterTableEnabled.ensureEnabled("ALTER TABLE changing 
columns", state);
             TableMetadata.Builder builder = table.unbuild();
@@ -475,7 +482,7 @@ public abstract class AlterTableStatement extends 
AlterSchemaStatement
             this.ifColumnsExists = ifColumnsExists;
         }
 
-        public KeyspaceMetadata apply(Epoch epoch, KeyspaceMetadata keyspace, 
TableMetadata table)
+        public KeyspaceMetadata apply(Epoch epoch, KeyspaceMetadata keyspace, 
TableMetadata table, ClusterMetadata metadata)
         {
             Guardrails.alterTableEnabled.ensureEnabled("ALTER TABLE changing 
columns", state);
             TableMetadata.Builder tableBuilder = table.unbuild().epoch(epoch);
@@ -553,7 +560,7 @@ public abstract class AlterTableStatement extends 
AlterSchemaStatement
             validateDefaultTimeToLive(attrs.asNewTableParams());
         }
 
-        public KeyspaceMetadata apply(Epoch epoch, KeyspaceMetadata keyspace, 
TableMetadata table)
+        public KeyspaceMetadata apply(Epoch epoch, KeyspaceMetadata keyspace, 
TableMetadata table, ClusterMetadata metadata)
         {
             attrs.validate();
 
@@ -597,7 +604,7 @@ public abstract class AlterTableStatement extends 
AlterSchemaStatement
             super(keyspaceName, tableName, ifTableExists);
         }
 
-        public KeyspaceMetadata apply(Epoch epoch, KeyspaceMetadata keyspace, 
TableMetadata table)
+        public KeyspaceMetadata apply(Epoch epoch, KeyspaceMetadata keyspace, 
TableMetadata table, ClusterMetadata metadata)
         {
             if (!DatabaseDescriptor.enableDropCompactStorage())
                 throw new InvalidRequestException("DROP COMPACT STORAGE is 
disabled. Enable in cassandra.yaml to use.");
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/schema/CreateTableStatement.java
 
b/src/java/org/apache/cassandra/cql3/statements/schema/CreateTableStatement.java
index b606a96d99..264877db7c 100644
--- 
a/src/java/org/apache/cassandra/cql3/statements/schema/CreateTableStatement.java
+++ 
b/src/java/org/apache/cassandra/cql3/statements/schema/CreateTableStatement.java
@@ -26,9 +26,6 @@ import com.google.common.collect.ImmutableSet;
 
 import org.apache.commons.lang3.StringUtils;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.cassandra.audit.AuditLogContext;
 import org.apache.cassandra.audit.AuditLogEntryType;
 import org.apache.cassandra.auth.DataResource;
@@ -55,7 +52,6 @@ import static com.google.common.collect.Iterables.concat;
 
 public final class CreateTableStatement extends AlterSchemaStatement
 {
-    private static final Logger logger = 
LoggerFactory.getLogger(CreateTableStatement.class);
     private final String tableName;
 
     private final Map<ColumnIdentifier, ColumnProperties.Raw> rawColumns;
@@ -110,7 +106,13 @@ public final class CreateTableStatement extends 
AlterSchemaStatement
             throw new AlreadyExistsException(keyspaceName, tableName);
         }
 
-        TableMetadata.Builder builder = 
builder(keyspace.types).epoch(metadata.nextEpoch());
+        // add all user functions to be able to give a good error message to 
the user if the alter references
+        // a function from another keyspace
+        UserFunctions.Builder ufBuilder = UserFunctions.builder().add();
+        for (KeyspaceMetadata ksm : schema)
+            ufBuilder.add(ksm.userFunctions);
+
+        TableMetadata.Builder builder = builder(keyspace.types, 
ufBuilder.build()).epoch(metadata.nextEpoch());
         if (!builder.hasId() && !DatabaseDescriptor.useDeterministicTableID())
             builder.id(TableId.get(metadata));
         TableMetadata table = builder.build();
@@ -189,14 +191,14 @@ public final class CreateTableStatement extends 
AlterSchemaStatement
         return String.format("%s (%s, %s)", getClass().getSimpleName(), 
keyspaceName, tableName);
     }
 
-    public TableMetadata.Builder builder(Types types)
+    public TableMetadata.Builder builder(Types types, UserFunctions functions)
     {
         attrs.validate();
         TableParams params = attrs.asNewTableParams();
 
         // use a TreeMap to preserve ordering across JDK versions (see 
CASSANDRA-9492) - important for stable unit tests
         Map<ColumnIdentifier, ColumnProperties> columns = new 
TreeMap<>(comparing(o -> o.bytes));
-        rawColumns.forEach((column, properties) -> columns.put(column, 
properties.prepare(keyspaceName, tableName, column, types)));
+        rawColumns.forEach((column, properties) -> columns.put(column, 
properties.prepare(keyspaceName, tableName, column, types, functions)));
 
         // check for nested non-frozen UDTs or collections in a non-frozen UDT
         columns.forEach((column, properties) ->
@@ -469,7 +471,7 @@ public final class CreateTableStatement extends 
AlterSchemaStatement
         return CQLFragmentParser.parseAny(CqlParser::createTableStatement, 
cql, "CREATE TABLE")
                                 .keyspace(keyspace)
                                 .prepare(null) // works around a messy 
ClientState/QueryProcessor class init deadlock
-                                .builder(Types.none());
+                                .builder(Types.none(), UserFunctions.none());
     }
 
     public final static class Raw extends CQLStatement.Raw
@@ -614,11 +616,11 @@ public final class CreateTableStatement extends 
AlterSchemaStatement
                     ColumnMask.ensureEnabled();
             }
 
-            public ColumnProperties prepare(String keyspace, String table, 
ColumnIdentifier column, Types udts)
+            public ColumnProperties prepare(String keyspace, String table, 
ColumnIdentifier column, Types udts, UserFunctions functions)
             {
                 CQL3Type cqlType = rawType.prepare(keyspace, udts);
                 AbstractType<?> type = cqlType.getType();
-                ColumnMask mask = rawMask == null ? null : 
rawMask.prepare(keyspace, table, column, type);
+                ColumnMask mask = rawMask == null ? null : 
rawMask.prepare(keyspace, table, column, type, functions);
                 return new ColumnProperties(type, cqlType, mask);
             }
         }
diff --git a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java 
b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
index 3211b9576a..cd2f7c79e8 100644
--- a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
@@ -663,7 +663,7 @@ public class CQLSSTableWriter implements Closeable
                 {
                     Types types = createTypes(keyspaceName);
                     
Schema.instance.submit(SchemaTransformations.addTypes(types, true));
-                    tableMetadata = createTable(types);
+                    tableMetadata = createTable(types, ksm.userFunctions);
                     
Schema.instance.submit(SchemaTransformations.addTable(tableMetadata, true));
 
                     if (buildIndexes && !indexStatements.isEmpty())
@@ -772,13 +772,13 @@ public class CQLSSTableWriter implements Closeable
          *
          * @param types types this table should be created with
          */
-        private TableMetadata createTable(Types types)
+        private TableMetadata createTable(Types types, UserFunctions functions)
         {
             ClientState state = ClientState.forInternalCalls();
             CreateTableStatement statement = schemaStatement.prepare(state);
             statement.validate(ClientState.forInternalCalls());
 
-            TableMetadata.Builder builder = statement.builder(types);
+            TableMetadata.Builder builder = statement.builder(types, 
functions);
             if (partitioner != null)
                 builder.partitioner(partitioner);
 
diff --git a/src/java/org/apache/cassandra/schema/UserFunctions.java 
b/src/java/org/apache/cassandra/schema/UserFunctions.java
index 027fe71bc7..b8823d7d85 100644
--- a/src/java/org/apache/cassandra/schema/UserFunctions.java
+++ b/src/java/org/apache/cassandra/schema/UserFunctions.java
@@ -31,6 +31,7 @@ import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.UserType;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.tcm.serialization.UDTAwareMetadataSerializer;
 import org.apache.cassandra.tcm.serialization.Version;
 
@@ -252,6 +253,22 @@ public final class UserFunctions implements 
Iterable<UserFunction>
                         .build();
     }
 
+    public static UserFunctions getCurrentUserFunctions(FunctionName name, 
String keyspace)
+    {
+        KeyspaceMetadata ksm = 
ClusterMetadata.current().schema.getKeyspaces().getNullable(name.hasKeyspace() 
? name.keyspace : keyspace);
+        UserFunctions userFunctions = UserFunctions.none();
+        if (ksm != null)
+            userFunctions = ksm.userFunctions;
+        return userFunctions;
+    }
+
+    public static UserFunctions getCurrentUserFunctions(FunctionName name)
+    {
+        if (!name.hasKeyspace())
+            return UserFunctions.none();
+        return getCurrentUserFunctions(name, null);
+    }
+
     @Override
     public boolean equals(Object o)
     {
diff --git a/src/java/org/apache/cassandra/service/StorageService.java 
b/src/java/org/apache/cassandra/service/StorageService.java
index 1ea091e771..8a8e7812e3 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -484,11 +484,11 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         jmxObjectName = "org.apache.cassandra.db:type=StorageService";
 
         sstablesTracker = new 
SSTablesGlobalTracker(DatabaseDescriptor.getSelectedSSTableFormat());
-        registerMBeans();
     }
 
-    private void registerMBeans()
+    public void registerMBeans()
     {
+        logger.info("Initializing storage service mbean");
         MBeanWrapper.instance.registerMBean(this, jmxObjectName);
         MBeanWrapper.instance.registerMBean(StreamManager.instance, 
StreamManager.OBJECT_NAME);
     }
diff --git a/src/java/org/apache/cassandra/tcm/Startup.java 
b/src/java/org/apache/cassandra/tcm/Startup.java
index e9fed3eb1c..99e36fb4b2 100644
--- a/src/java/org/apache/cassandra/tcm/Startup.java
+++ b/src/java/org/apache/cassandra/tcm/Startup.java
@@ -142,7 +142,8 @@ import static 
org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort;
     {
         LocalLog.LogSpec logSpec = LocalLog.logSpec()
                                            
.withStorage(LogStorage.SystemKeyspace)
-                                           
.afterReplay(Startup::scrubDataDirectories)
+                                           
.afterReplay(Startup::scrubDataDirectories,
+                                                        (metadata) -> 
StorageService.instance.registerMBeans())
                                            .withDefaultListeners();
         ClusterMetadataService.setInstance(new ClusterMetadataService(new 
UniformRangePlacement(),
                                                                       
wrapProcessor,
@@ -245,7 +246,8 @@ import static 
org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort;
         ClusterMetadata emptyFromSystemTables = 
emptyWithSchemaFromSystemTables(SystemKeyspace.allKnownDatacenters());
         LocalLog.LogSpec logSpec = LocalLog.logSpec()
                                            
.withInitialState(emptyFromSystemTables)
-                                           
.afterReplay(Startup::scrubDataDirectories)
+                                           
.afterReplay(Startup::scrubDataDirectories,
+                                                        (metadata) -> 
StorageService.instance.registerMBeans())
                                            
.withStorage(LogStorage.SystemKeyspace)
                                            .withDefaultListeners();
 
@@ -305,6 +307,8 @@ import static 
org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort;
         metadata = metadata.forceEpoch(metadata.epoch.nextEpoch());
         ClusterMetadataService.unsetInstance();
         LocalLog.LogSpec logSpec = LocalLog.logSpec()
+                                           
.afterReplay(Startup::scrubDataDirectories,
+                                                        (_metadata) -> 
StorageService.instance.registerMBeans())
                                            .withPreviousState(prev)
                                            .withInitialState(metadata)
                                            
.withStorage(LogStorage.SystemKeyspace)
diff --git a/src/java/org/apache/cassandra/tcm/Transformation.java 
b/src/java/org/apache/cassandra/tcm/Transformation.java
index 943450aa53..df86c0676b 100644
--- a/src/java/org/apache/cassandra/tcm/Transformation.java
+++ b/src/java/org/apache/cassandra/tcm/Transformation.java
@@ -148,7 +148,7 @@ public interface Transformation
 
         public Success success()
         {
-            throw new IllegalStateException("Can't dereference Success for a rejected execution");
+            throw new IllegalStateException("Can't dereference Success for a rejected execution: " + code + ": " + reason);
         }
 
         public Rejected rejected()
diff --git a/src/java/org/apache/cassandra/tcm/log/LocalLog.java 
b/src/java/org/apache/cassandra/tcm/log/LocalLog.java
index 237a426320..bb878f5b12 100644
--- a/src/java/org/apache/cassandra/tcm/log/LocalLog.java
+++ b/src/java/org/apache/cassandra/tcm/log/LocalLog.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.tcm.log;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.NoSuchElementException;
@@ -33,6 +34,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -107,7 +109,7 @@ public abstract class LocalLog implements Closeable
     {
         private ClusterMetadata initial;
         private ClusterMetadata prev;
-        private Startup.AfterReplay afterReplay = (metadata) -> {};
+        private List<Startup.AfterReplay> afterReplay = Collections.emptyList();
         private LogStorage storage = LogStorage.None;
         private boolean async = true;
         private boolean defaultListeners = false;
@@ -202,9 +204,9 @@ public abstract class LocalLog implements Closeable
             return this;
         }
 
-        public LogSpec afterReplay(Startup.AfterReplay afterReplay)
+        public LogSpec afterReplay(Startup.AfterReplay ... afterReplay)
         {
-            this.afterReplay = afterReplay;
+            this.afterReplay = Lists.newArrayList(afterReplay);
             return this;
         }
 
@@ -541,7 +543,7 @@ public abstract class LocalLog implements Closeable
         if (replayComplete.get())
             throw new IllegalStateException("Can only replay persisted once.");
         LogState logState = storage.getPersistedLogState();
-        append(logState);
+        append(logState.flatten());
         return waitForHighestConsecutive();
     }
 
@@ -612,7 +614,8 @@ public abstract class LocalLog implements Closeable
     public ClusterMetadata ready() throws StartupException
     {
         ClusterMetadata metadata = replayPersisted();
-        spec.afterReplay.accept(metadata);
+        for (Startup.AfterReplay ar : spec.afterReplay)
+            ar.accept(metadata);
         logger.info("Marking LocalLog ready at epoch {}", metadata.epoch);
 
         if (!replayComplete.compareAndSet(false, true))
@@ -634,7 +637,6 @@ public abstract class LocalLog implements Closeable
 
         logger.info("Notifying all registered listeners of both pre and post commit event");
         notifyListeners(spec.prev);
-
         return metadata;
     }
 
diff --git a/src/java/org/apache/cassandra/tcm/log/LogState.java 
b/src/java/org/apache/cassandra/tcm/log/LogState.java
index f1f8fcd8ad..6b8fd1a1ea 100644
--- a/src/java/org/apache/cassandra/tcm/log/LogState.java
+++ b/src/java/org/apache/cassandra/tcm/log/LogState.java
@@ -95,6 +95,19 @@ public class LogState
         return new LogState(baseState, ImmutableList.of());
     }
 
+    public LogState flatten()
+    {
+        if (baseState == null && entries.isEmpty())
+            return this;
+        ClusterMetadata metadata = baseState;
+        if (metadata == null)
+            metadata = new ClusterMetadata(DatabaseDescriptor.getPartitioner());
+        for (Entry entry : entries)
+            metadata = entry.transform.execute(metadata).success().metadata;
+        return LogState.make(metadata);
+    }
+
+
     public boolean isEmpty()
     {
         return baseState == null && entries.isEmpty();
diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java 
b/test/unit/org/apache/cassandra/cql3/CQLTester.java
index 18beee3d2a..cb96bfff24 100644
--- a/test/unit/org/apache/cassandra/cql3/CQLTester.java
+++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java
@@ -418,6 +418,7 @@ public abstract class CQLTester
         ServerTestUtils.daemonInitialization();
         if (ROW_CACHE_SIZE_IN_MIB > 0)
             DatabaseDescriptor.setRowCacheSizeInMiB(ROW_CACHE_SIZE_IN_MIB);
+        StorageService.instance.registerMBeans();
         StorageService.instance.setPartitionerUnsafe(Murmur3Partitioner.instance);
         // Once per-JVM is enough
         prepareServer();
diff --git 
a/test/unit/org/apache/cassandra/cql3/functions/NativeFunctionsTest.java 
b/test/unit/org/apache/cassandra/cql3/functions/NativeFunctionsTest.java
index 3bf3543ef6..be55d17038 100644
--- a/test/unit/org/apache/cassandra/cql3/functions/NativeFunctionsTest.java
+++ b/test/unit/org/apache/cassandra/cql3/functions/NativeFunctionsTest.java
@@ -26,6 +26,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.junit.Test;
 
 import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.schema.UserFunctions;
 import org.assertj.core.api.Assertions;
 
 public class NativeFunctionsTest
@@ -144,7 +145,8 @@ public class NativeFunctionsTest
                                                         function.argTypes,
                                                         null,
                                                         null,
-                                                        function.returnType);
+                                                        function.returnType,
+                                                        UserFunctions.none());
 
             Assertions.assertThat(newFunction).isNotNull();
             Assertions.assertThat(function).isNotEqualTo(newFunction);
diff --git 
a/test/unit/org/apache/cassandra/cql3/functions/masking/ColumnMaskTester.java 
b/test/unit/org/apache/cassandra/cql3/functions/masking/ColumnMaskTester.java
index f5a0b702b9..eadc7520f5 100644
--- 
a/test/unit/org/apache/cassandra/cql3/functions/masking/ColumnMaskTester.java
+++ 
b/test/unit/org/apache/cassandra/cql3/functions/masking/ColumnMaskTester.java
@@ -41,7 +41,6 @@ import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.schema.SchemaKeyspace;
 import org.apache.cassandra.schema.SchemaKeyspaceTables;
 import org.apache.cassandra.schema.TableMetadata;
-import org.apache.cassandra.schema.UserFunctions;
 
 import static java.lang.String.format;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -133,7 +132,7 @@ public class ColumnMaskTester extends CQLTester
         UntypedResultSet columnRows = execute("SELECT * FROM system_schema.columns " +
                                               "WHERE keyspace_name = ? AND table_name = ? AND column_name = ?",
                                               KEYSPACE, table, column);
-        ColumnMetadata persistedColumn = SchemaKeyspace.createColumnFromRow(columnRows.one(), keyspaceMetadata.types, UserFunctions.none());
+        ColumnMetadata persistedColumn = SchemaKeyspace.createColumnFromRow(columnRows.one(), keyspaceMetadata.types, keyspaceMetadata.userFunctions);
 
         // Verify the column mask in the persisted schema
         ColumnMask savedMask = persistedColumn.getMask();
diff --git 
a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreClientModeTest.java 
b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreClientModeTest.java
index bf9f9dc630..0550237b74 100644
--- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreClientModeTest.java
+++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreClientModeTest.java
@@ -39,6 +39,7 @@ import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.schema.TableMetadataRef;
 import org.apache.cassandra.schema.Types;
+import org.apache.cassandra.schema.UserFunctions;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.tcm.ClusterMetadataService;
@@ -86,7 +87,7 @@ public class ColumnFamilyStoreClientModeTest
         ClientState state = ClientState.forInternalCalls(KEYSPACE);
         CreateTableStatement statement = schemaStatement.prepare(state);
         statement.validate(state);
-        TableMetadata tableMetadata = statement.builder(types)
+        TableMetadata tableMetadata = statement.builder(types, UserFunctions.none())
                                                .id(TableId.fromUUID(UUID.nameUUIDFromBytes(ArrayUtils.addAll(schemaStatement.keyspace().getBytes(), schemaStatement.table().getBytes()))))
                                                .partitioner(Murmur3Partitioner.instance)
                                                .build();
diff --git 
a/tools/stress/src/org/apache/cassandra/io/sstable/StressCQLSSTableWriter.java 
b/tools/stress/src/org/apache/cassandra/io/sstable/StressCQLSSTableWriter.java
index 83aafece69..3745f717be 100644
--- 
a/tools/stress/src/org/apache/cassandra/io/sstable/StressCQLSSTableWriter.java
+++ 
b/tools/stress/src/org/apache/cassandra/io/sstable/StressCQLSSTableWriter.java
@@ -636,7 +636,7 @@ public class StressCQLSSTableWriter implements Closeable
             statement.validate(state);
 
             //Build metadata with a portable tableId
-            tableMetadata = statement.builder(ksm.types)
+            tableMetadata = statement.builder(ksm.types, ksm.userFunctions)
                                      .id(deterministicId(schemaStatement.keyspace(), schemaStatement.table()))
                                      .build();
             Tables tables = Tables.of(tableMetadata);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org


Reply via email to