Repository: cassandra Updated Branches: refs/heads/trunk cfee3da90 -> dcc3bb054
Schema change events/results for UDFs and aggregates Patch by Robert Stupp; reviewed by Tyler Hobbs for CASSANDRA-7708 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/dcc3bb05 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/dcc3bb05 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/dcc3bb05 Branch: refs/heads/trunk Commit: dcc3bb054167eb5f408cea79935855780fd56285 Parents: cfee3da Author: Robert Stupp <sn...@snazy.de> Authored: Tue Dec 30 12:25:17 2014 -0600 Committer: Tyler Hobbs <ty...@datastax.com> Committed: Tue Dec 30 12:25:17 2014 -0600 ---------------------------------------------------------------------- CHANGES.txt | 7 +- doc/native_protocol_v4.spec | 33 ++++--- src/java/org/apache/cassandra/auth/Auth.java | 58 +----------- .../apache/cassandra/cql3/QueryProcessor.java | 24 ++--- .../cassandra/cql3/functions/Functions.java | 20 +--- .../cassandra/cql3/functions/UDHelper.java | 23 +---- .../statements/CreateAggregateStatement.java | 14 ++- .../statements/CreateFunctionStatement.java | 13 ++- .../cql3/statements/DropAggregateStatement.java | 8 +- .../cql3/statements/DropFunctionStatement.java | 8 +- .../cassandra/db/marshal/AbstractType.java | 9 ++ .../cassandra/schema/LegacySchemaTables.java | 43 +++++++-- .../cassandra/service/IMigrationListener.java | 40 -------- .../cassandra/service/MigrationListener.java | 85 +++++++++++++++++ .../cassandra/service/MigrationManager.java | 49 +++++----- .../org/apache/cassandra/transport/Event.java | 96 +++++++++++++++++--- .../org/apache/cassandra/transport/Server.java | 28 ++++-- .../apache/cassandra/cql3/AggregationTest.java | 62 ++++++++++++- .../org/apache/cassandra/cql3/CQLTester.java | 43 +++++++-- test/unit/org/apache/cassandra/cql3/UFTest.java | 46 +++++++++- .../cassandra/transport/SerDeserTest.java | 14 +++ 21 files changed, 483 insertions(+), 240 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/dcc3bb05/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 1468693..ac63fb3 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -10,7 +10,8 @@ * Fix aggregate fn results on empty selection, result column name, and cqlsh parsing (CASSANDRA-8229) * Mark sstables as repaired after full repair (CASSANDRA-7586) - * Extend Descriptor to include a format value and refactor reader/writer apis (CASSANDRA-7443) + * Extend Descriptor to include a format value and refactor reader/writer + APIs (CASSANDRA-7443) * Integrate JMH for microbenchmarks (CASSANDRA-8151) * Keep sstable levels when bootstrapping (CASSANDRA-7460) * Add Sigar library and perform basic OS settings check on startup (CASSANDRA-7838) @@ -22,8 +23,8 @@ * Improve compaction logging (CASSANDRA-7818) * Remove YamlFileNetworkTopologySnitch (CASSANDRA-7917) * Do anticompaction in groups (CASSANDRA-6851) - * Support pure user-defined functions (CASSANDRA-7395, 7526, 7562, 7740, 7781, 7929, - 7924, 7812, 8063, 7813) + * Support user-defined functions (CASSANDRA-7395, 7526, 7562, 7740, 7781, 7929, + 7924, 7812, 8063, 7813, 7708) * Permit configurable timestamps with cassandra-stress (CASSANDRA-7416) * Move sstable RandomAccessReader to nio2, which allows using the FILE_SHARE_DELETE flag on Windows (CASSANDRA-4050) http://git-wip-us.apache.org/repos/asf/cassandra/blob/dcc3bb05/doc/native_protocol_v4.spec ---------------------------------------------------------------------- diff --git a/doc/native_protocol_v4.spec b/doc/native_protocol_v4.spec index 02aac3b..3764e91 100644 --- a/doc/native_protocol_v4.spec +++ b/doc/native_protocol_v4.spec @@ -669,18 +669,25 @@ Table of Contents the rest of the message will be <change_type><target><options> where: - <change_type> is a [string] representing the type of changed involved. 
It will be one of "CREATED", "UPDATED" or "DROPPED". - - <target> is a [string] that can be one of "KEYSPACE", "TABLE" or "TYPE" - and describes what has been modified ("TYPE" stands for modifications - related to user types). - - <options> depends on the preceding <target>. If <target> is - "KEYSPACE", then <options> will be a single [string] representing the - keyspace changed. Otherwise, if <target> is "TABLE" or "TYPE", then - <options> will be 2 [string]: the first one will be the keyspace - containing the affected object, and the second one will be the name - of said affected object (so either the table name or the user type - name). - - All EVENT message have a streamId of -1 (Section 2.3). + - <target> is a [string] that can be one of "KEYSPACE", "TABLE", "TYPE", + "FUNCTION" or "AGGREGATE" and describes what has been modified + ("TYPE" stands for modifications related to user types, "FUNCTION" + for modifications related to user defined functions, "AGGREGATE" + for modifications related to user defined aggregates). + - <options> depends on the preceding <target>: + - If <target> is "KEYSPACE", then <options> will be a single [string] + representing the keyspace changed. + - If <target> is "TABLE" or "TYPE", then + <options> will be 2 [string]: the first one will be the keyspace + containing the affected object, and the second one will be the name + of said affected object (either the table name or the user type + name). + - If <target> is "FUNCTION" or "AGGREGATE", multiple arguments follow: + - [string] keyspace containing the user defined function / aggregate + - [string] the function/aggregate name + - [string list] one string for each argument type (as CQL type) + + All EVENT messages have a streamId of -1 (Section 2.3). Please note that "NEW_NODE" and "UP" events are sent based on internal Gossip communication and as such may be sent a short delay before the binary @@ -896,4 +903,6 @@ Table of Contents 10. 
Changes from v3 + * The format of "SCHEMA_CHANGE" events (Section 4.2.6) (and implicitly "Schema_change" results (Section 4.2.5.5)) + has been modified, and now includes changes related to user defined functions and user defined aggregates. http://git-wip-us.apache.org/repos/asf/cassandra/blob/dcc3bb05/src/java/org/apache/cassandra/auth/Auth.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/auth/Auth.java b/src/java/org/apache/cassandra/auth/Auth.java index cdcfa0e..0c3b0fe 100644 --- a/src/java/org/apache/cassandra/auth/Auth.java +++ b/src/java/org/apache/cassandra/auth/Auth.java @@ -185,7 +185,7 @@ public class Auth implements AuthMBean DatabaseDescriptor.getAuthorizer().setup(); // register a custom MigrationListener for permissions cleanup after dropped keyspaces/cfs. - MigrationManager.instance.register(new MigrationListener()); + MigrationManager.instance.register(new AuthMigrationListener()); // the delay is here to give the node some time to see its peers - to reduce // "Skipped default superuser setup: some nodes were not ready" log spam. @@ -318,9 +318,9 @@ public class Auth implements AuthMBean } /** - * IMigrationListener implementation that cleans up permissions on dropped resources. + * MigrationListener implementation that cleans up permissions on dropped resources. 
*/ - public static class MigrationListener implements IMigrationListener + public static class AuthMigrationListener extends MigrationListener { public void onDropKeyspace(String ksName) { @@ -331,57 +331,5 @@ public class Auth implements AuthMBean { DatabaseDescriptor.getAuthorizer().revokeAll(DataResource.columnFamily(ksName, cfName)); } - - public void onDropUserType(String ksName, String userType) - { - } - - public void onDropFunction(String ksName, String functionName) - { - } - - public void onDropAggregate(String ksName, String aggregateName) - { - } - - public void onCreateKeyspace(String ksName) - { - } - - public void onCreateColumnFamily(String ksName, String cfName) - { - } - - public void onCreateUserType(String ksName, String userType) - { - } - - public void onCreateFunction(String ksName, String functionName) - { - } - - public void onCreateAggregate(String ksName, String aggregateName) - { - } - - public void onUpdateKeyspace(String ksName) - { - } - - public void onUpdateColumnFamily(String ksName, String cfName) - { - } - - public void onUpdateUserType(String ksName, String userType) - { - } - - public void onUpdateFunction(String ksName, String functionName) - { - } - - public void onUpdateAggregate(String ksName, String aggregateName) - { - } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/dcc3bb05/src/java/org/apache/cassandra/cql3/QueryProcessor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java index ae09972..8531d32 100644 --- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java +++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java @@ -31,6 +31,7 @@ import com.googlecode.concurrentlinkedhashmap.EntryWeigher; import com.googlecode.concurrentlinkedhashmap.EvictionListener; import org.antlr.runtime.*; +import org.apache.cassandra.service.MigrationListener; import 
org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,7 +51,6 @@ import org.apache.cassandra.exceptions.RequestValidationException; import org.apache.cassandra.exceptions.SyntaxException; import org.apache.cassandra.metrics.CQLMetrics; import org.apache.cassandra.service.ClientState; -import org.apache.cassandra.service.IMigrationListener; import org.apache.cassandra.service.MigrationManager; import org.apache.cassandra.service.QueryState; import org.apache.cassandra.service.pager.QueryPager; @@ -560,7 +560,7 @@ public class QueryProcessor implements QueryHandler return meter.measureDeep(key); } - private static class MigrationSubscriber implements IMigrationListener + private static class MigrationSubscriber extends MigrationListener { private void removeInvalidPreparedStatements(String ksName, String cfName) { @@ -602,10 +602,7 @@ public class QueryProcessor implements QueryHandler return ksName.equals(statementKsName) && (cfName == null || cfName.equals(statementCfName)); } - public void onCreateKeyspace(String ksName) { } - public void onCreateColumnFamily(String ksName, String cfName) { } - public void onCreateUserType(String ksName, String typeName) { } - public void onCreateFunction(String ksName, String functionName) { + public void onCreateFunction(String ksName, String functionName, List<AbstractType<?>> argTypes) { if (Functions.getOverloadCount(new FunctionName(ksName, functionName)) > 1) { // in case there are other overloads, we have to remove all overloads since argument type @@ -614,7 +611,7 @@ public class QueryProcessor implements QueryHandler removeInvalidPreparedStatementsForFunction(thriftPreparedStatements.values().iterator(), ksName, functionName); } } - public void onCreateAggregate(String ksName, String aggregateName) { + public void onCreateAggregate(String ksName, String aggregateName, List<AbstractType<?>> argTypes) { if (Functions.getOverloadCount(new FunctionName(ksName, aggregateName)) > 1) { // in case there are other overloads, 
we have to remove all overloads since argument type @@ -624,12 +621,6 @@ public class QueryProcessor implements QueryHandler } } - public void onUpdateKeyspace(String ksName) { } - public void onUpdateColumnFamily(String ksName, String cfName) { } - public void onUpdateUserType(String ksName, String typeName) { } - public void onUpdateFunction(String ksName, String functionName) { } - public void onUpdateAggregate(String ksName, String aggregateName) { } - public void onDropKeyspace(String ksName) { removeInvalidPreparedStatements(ksName, null); @@ -640,18 +631,17 @@ public class QueryProcessor implements QueryHandler removeInvalidPreparedStatements(ksName, cfName); } - public void onDropUserType(String ksName, String typeName) { } - public void onDropFunction(String ksName, String functionName) { + public void onDropFunction(String ksName, String functionName, List<AbstractType<?>> argTypes) { removeInvalidPreparedStatementsForFunction(preparedStatements.values().iterator(), ksName, functionName); removeInvalidPreparedStatementsForFunction(thriftPreparedStatements.values().iterator(), ksName, functionName); } - public void onDropAggregate(String ksName, String aggregateName) + public void onDropAggregate(String ksName, String aggregateName, List<AbstractType<?>> argTypes) { removeInvalidPreparedStatementsForFunction(preparedStatements.values().iterator(), ksName, aggregateName); removeInvalidPreparedStatementsForFunction(thriftPreparedStatements.values().iterator(), ksName, aggregateName); } - private void removeInvalidPreparedStatementsForFunction(Iterator<ParsedStatement.Prepared> iterator, + private static void removeInvalidPreparedStatementsForFunction(Iterator<ParsedStatement.Prepared> iterator, String ksName, String functionName) { while (iterator.hasNext()) http://git-wip-us.apache.org/repos/asf/cassandra/blob/dcc3bb05/src/java/org/apache/cassandra/cql3/functions/Functions.java ---------------------------------------------------------------------- diff 
--git a/src/java/org/apache/cassandra/cql3/functions/Functions.java b/src/java/org/apache/cassandra/cql3/functions/Functions.java index b55ebc5..09e360b 100644 --- a/src/java/org/apache/cassandra/cql3/functions/Functions.java +++ b/src/java/org/apache/cassandra/cql3/functions/Functions.java @@ -27,7 +27,7 @@ import org.apache.cassandra.config.Schema; import org.apache.cassandra.cql3.*; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.exceptions.InvalidRequestException; -import org.apache.cassandra.service.IMigrationListener; +import org.apache.cassandra.service.MigrationListener; import org.apache.cassandra.service.MigrationManager; public abstract class Functions @@ -302,28 +302,12 @@ public abstract class Functions return true; } - private static class FunctionsMigrationListener implements IMigrationListener + private static class FunctionsMigrationListener extends MigrationListener { - public void onCreateKeyspace(String ksName) { } - public void onCreateColumnFamily(String ksName, String cfName) { } - public void onCreateUserType(String ksName, String typeName) { } - public void onCreateFunction(String ksName, String functionName) { } - public void onCreateAggregate(String ksName, String aggregateName) { } - - public void onUpdateKeyspace(String ksName) { } - public void onUpdateColumnFamily(String ksName, String cfName) { } public void onUpdateUserType(String ksName, String typeName) { for (Function function : all()) if (function instanceof UDFunction) ((UDFunction)function).userTypeUpdated(ksName, typeName); } - public void onUpdateFunction(String ksName, String functionName) { } - public void onUpdateAggregate(String ksName, String aggregateName) { } - - public void onDropKeyspace(String ksName) { } - public void onDropColumnFamily(String ksName, String cfName) { } - public void onDropUserType(String ksName, String typeName) { } - public void onDropFunction(String ksName, String functionName) { } - public void 
onDropAggregate(String ksName, String aggregateName) { } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/dcc3bb05/src/java/org/apache/cassandra/cql3/functions/UDHelper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/functions/UDHelper.java b/src/java/org/apache/cassandra/cql3/functions/UDHelper.java index 0738cbe..f4b3809 100644 --- a/src/java/org/apache/cassandra/cql3/functions/UDHelper.java +++ b/src/java/org/apache/cassandra/cql3/functions/UDHelper.java @@ -21,8 +21,6 @@ import java.lang.invoke.MethodHandle; import java.lang.invoke.MethodHandles; import java.lang.reflect.Method; import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; -import java.security.MessageDigest; import java.util.*; import org.slf4j.Logger; @@ -30,9 +28,7 @@ import org.slf4j.LoggerFactory; import com.datastax.driver.core.DataType; import org.apache.cassandra.cql3.*; -import org.apache.cassandra.db.marshal.AbstractType; -import org.apache.cassandra.db.marshal.UTF8Type; -import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.db.marshal.*; /** * Helper class for User Defined Functions + Aggregates. @@ -66,7 +62,7 @@ public final class UDHelper */ public static Class<?>[] javaTypes(DataType[] dataTypes) { - Class<?> paramTypes[] = new Class[dataTypes.length]; + Class<?>[] paramTypes = new Class[dataTypes.length]; for (int i = 0; i < paramTypes.length; i++) paramTypes[i] = dataTypes[i].asJavaClass(); return paramTypes; @@ -107,19 +103,4 @@ public final class UDHelper throw new RuntimeException("cannot parse driver type " + cqlType.getType().toString(), e); } } - - // We allow method overloads, so a function is not uniquely identified by its name only, but - // also by its argument types. 
To distinguish overloads of given function name in the schema - // we use a "signature" which is just a SHA-1 of it's argument types (we could replace that by - // using a "signature" UDT that would be comprised of the function name and argument types, - // which we could then use as clustering column. But as we haven't yet used UDT in system tables, - // We'll leave that decision to #6717). - public static ByteBuffer calculateSignature(AbstractFunction fun) - { - MessageDigest digest = FBUtilities.newMessageDigest("SHA-1"); - digest.update(UTF8Type.instance.decompose(fun.name().name)); - for (AbstractType<?> type : fun.argTypes()) - digest.update(UTF8Type.instance.decompose(type.asCQL3Type().toString())); - return ByteBuffer.wrap(digest.digest()); - } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/dcc3bb05/src/java/org/apache/cassandra/cql3/statements/CreateAggregateStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateAggregateStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateAggregateStatement.java index 9816e58..e135ffe 100644 --- a/src/java/org/apache/cassandra/cql3/statements/CreateAggregateStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/CreateAggregateStatement.java @@ -47,13 +47,16 @@ public final class CreateAggregateStatement extends SchemaAlteringStatement private final boolean orReplace; private final boolean ifNotExists; private FunctionName functionName; - private String stateFunc; - private String finalFunc; + private final String stateFunc; + private final String finalFunc; private final CQL3Type.Raw stateTypeRaw; private final List<CQL3Type.Raw> argRawTypes; private final Term.Raw ival; + private UDAggregate udAggregate; + private boolean replaced; + public CreateAggregateStatement(FunctionName functionName, List<CQL3Type.Raw> argRawTypes, String stateFunc, @@ -102,7 +105,9 @@ public final class 
CreateAggregateStatement extends SchemaAlteringStatement public Event.SchemaChange changeEvent() { - return null; + return new Event.SchemaChange(replaced ? Event.SchemaChange.Change.UPDATED : Event.SchemaChange.Change.CREATED, + Event.SchemaChange.Target.AGGREGATE, + udAggregate.name().keyspace, udAggregate.name().name, AbstractType.asCQLTypeStringList(udAggregate.argTypes())); } public boolean announceMigration(boolean isLocalOnly) throws RequestValidationException @@ -164,10 +169,11 @@ public final class CreateAggregateStatement extends SchemaAlteringStatement initcond = ival.prepare(functionName.keyspace, receiver).bindAndGet(QueryOptions.DEFAULT); } - UDAggregate udAggregate = new UDAggregate(functionName, argTypes, returnType, + udAggregate = new UDAggregate(functionName, argTypes, returnType, fState, fFinal, initcond); + replaced = old != null; MigrationManager.announceNewAggregate(udAggregate, isLocalOnly); http://git-wip-us.apache.org/repos/asf/cassandra/blob/dcc3bb05/src/java/org/apache/cassandra/cql3/statements/CreateFunctionStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateFunctionStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateFunctionStatement.java index dbdecf9..c49f80c 100644 --- a/src/java/org/apache/cassandra/cql3/statements/CreateFunctionStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/CreateFunctionStatement.java @@ -51,6 +51,9 @@ public final class CreateFunctionStatement extends SchemaAlteringStatement private final List<CQL3Type.Raw> argRawTypes; private final CQL3Type.Raw rawReturnType; + private UDFunction udFunction; + private boolean replaced; + public CreateFunctionStatement(FunctionName functionName, String language, String body, @@ -101,7 +104,9 @@ public final class CreateFunctionStatement extends SchemaAlteringStatement public Event.SchemaChange changeEvent() { - return null; + return new 
Event.SchemaChange(replaced ? Event.SchemaChange.Change.UPDATED : Event.SchemaChange.Change.CREATED, + Event.SchemaChange.Target.FUNCTION, + udFunction.name().keyspace, udFunction.name().name, AbstractType.asCQLTypeStringList(udFunction.argTypes())); } public boolean announceMigration(boolean isLocalOnly) throws RequestValidationException @@ -131,7 +136,11 @@ public final class CreateFunctionStatement extends SchemaAlteringStatement functionName, returnType.asCQL3Type(), old.returnType().asCQL3Type())); } - MigrationManager.announceNewFunction(UDFunction.create(functionName, argNames, argTypes, returnType, language, body, deterministic), isLocalOnly); + this.udFunction = UDFunction.create(functionName, argNames, argTypes, returnType, language, body, deterministic); + this.replaced = old != null; + + MigrationManager.announceNewFunction(udFunction, isLocalOnly); + return true; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/dcc3bb05/src/java/org/apache/cassandra/cql3/statements/DropAggregateStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/DropAggregateStatement.java b/src/java/org/apache/cassandra/cql3/statements/DropAggregateStatement.java index 118f89d..97ec196 100644 --- a/src/java/org/apache/cassandra/cql3/statements/DropAggregateStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/DropAggregateStatement.java @@ -42,6 +42,8 @@ public final class DropAggregateStatement extends SchemaAlteringStatement private final List<CQL3Type.Raw> argRawTypes; private final boolean argsPresent; + private Function old; + public DropAggregateStatement(FunctionName functionName, List<CQL3Type.Raw> argRawTypes, boolean argsPresent, @@ -77,7 +79,8 @@ public final class DropAggregateStatement extends SchemaAlteringStatement public Event.SchemaChange changeEvent() { - return null; + return new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, 
Event.SchemaChange.Target.AGGREGATE, + old.name().keyspace, old.name().name, AbstractType.asCQLTypeStringList(old.argTypes())); } public boolean announceMigration(boolean isLocalOnly) throws RequestValidationException @@ -130,7 +133,10 @@ public final class DropAggregateStatement extends SchemaAlteringStatement throw new InvalidRequestException(String.format("Cannot drop aggregate '%s' because it is a " + "native (built-in) function", functionName)); + this.old = old; + MigrationManager.announceAggregateDrop((UDAggregate)old, isLocalOnly); + return true; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/dcc3bb05/src/java/org/apache/cassandra/cql3/statements/DropFunctionStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/DropFunctionStatement.java b/src/java/org/apache/cassandra/cql3/statements/DropFunctionStatement.java index 394aca0..083db45 100644 --- a/src/java/org/apache/cassandra/cql3/statements/DropFunctionStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/DropFunctionStatement.java @@ -42,6 +42,8 @@ public final class DropFunctionStatement extends SchemaAlteringStatement private final List<CQL3Type.Raw> argRawTypes; private final boolean argsPresent; + private Function old; + public DropFunctionStatement(FunctionName functionName, List<CQL3Type.Raw> argRawTypes, boolean argsPresent, @@ -81,7 +83,8 @@ public final class DropFunctionStatement extends SchemaAlteringStatement @Override public Event.SchemaChange changeEvent() { - return null; + return new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, Event.SchemaChange.Target.FUNCTION, + old.name().keyspace, old.name().name, AbstractType.asCQLTypeStringList(old.argTypes())); } @Override @@ -135,7 +138,10 @@ public final class DropFunctionStatement extends SchemaAlteringStatement if (!references.isEmpty()) throw new InvalidRequestException(String.format("Function '%s' still referenced by 
%s", functionName, references)); + this.old = old; + MigrationManager.announceFunctionDrop((UDFunction) old, isLocalOnly); + return true; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/dcc3bb05/src/java/org/apache/cassandra/db/marshal/AbstractType.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/marshal/AbstractType.java b/src/java/org/apache/cassandra/db/marshal/AbstractType.java index 85b6dc7..d3711df 100644 --- a/src/java/org/apache/cassandra/db/marshal/AbstractType.java +++ b/src/java/org/apache/cassandra/db/marshal/AbstractType.java @@ -18,6 +18,7 @@ package org.apache.cassandra.db.marshal; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Comparator; @@ -63,6 +64,14 @@ public abstract class AbstractType<T> implements Comparator<ByteBuffer> }; } + public static List<String> asCQLTypeStringList(List<AbstractType<?>> abstractTypes) + { + List<String> r = new ArrayList<>(abstractTypes.size()); + for (AbstractType<?> abstractType : abstractTypes) + r.add(abstractType.asCQL3Type().toString()); + return r; + } + public T compose(ByteBuffer bytes) { return getSerializer().deserialize(bytes); http://git-wip-us.apache.org/repos/asf/cassandra/blob/dcc3bb05/src/java/org/apache/cassandra/schema/LegacySchemaTables.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/LegacySchemaTables.java b/src/java/org/apache/cassandra/schema/LegacySchemaTables.java index 047698c..4d06863 100644 --- a/src/java/org/apache/cassandra/schema/LegacySchemaTables.java +++ b/src/java/org/apache/cassandra/schema/LegacySchemaTables.java @@ -157,7 +157,7 @@ public class LegacySchemaTables "CREATE TABLE %s (" + "keyspace_name text," + "function_name text," - + "signature blob," + + "signature frozen<list<text>>," + "argument_names list<text>," + "argument_types 
list<text>," + "body text," @@ -172,7 +172,7 @@ public class LegacySchemaTables "CREATE TABLE %s (" + "keyspace_name text," + "aggregate_name text," - + "signature blob," + + "signature frozen<list<text>>," + "argument_types list<text>," + "final_func text," + "initcond blob," @@ -1293,7 +1293,7 @@ public class LegacySchemaTables private static void addFunctionToSchemaMutation(UDFunction function, long timestamp, Mutation mutation) { ColumnFamily cells = mutation.addOrGet(Functions); - Composite prefix = Functions.comparator.make(function.name().name, UDHelper.calculateSignature(function)); + Composite prefix = Functions.comparator.make(function.name().name, functionSignatureWithTypes(function)); CFRowAdder adder = new CFRowAdder(cells, prefix, timestamp); adder.resetCollection("argument_names"); @@ -1319,7 +1319,7 @@ public class LegacySchemaTables ColumnFamily cells = mutation.addOrGet(Functions); int ldt = (int) (System.currentTimeMillis() / 1000); - Composite prefix = Functions.comparator.make(function.name().name, UDHelper.calculateSignature(function)); + Composite prefix = Functions.comparator.make(function.name().name, functionSignatureWithTypes(function)); cells.addAtom(new RangeTombstone(prefix, prefix.end(), timestamp, ldt)); return mutation; @@ -1332,7 +1332,7 @@ public class LegacySchemaTables for (UntypedResultSet.Row row : QueryProcessor.resultify(query, partition)) { UDFunction function = createFunctionFromFunctionRow(row); - functions.put(UDHelper.calculateSignature(function), function); + functions.put(functionSignatureWithNameAndTypes(function), function); } return functions; } @@ -1385,7 +1385,7 @@ public class LegacySchemaTables private static void addAggregateToSchemaMutation(UDAggregate aggregate, long timestamp, Mutation mutation) { ColumnFamily cells = mutation.addOrGet(Aggregates); - Composite prefix = Aggregates.comparator.make(aggregate.name().name, UDHelper.calculateSignature(aggregate)); + Composite prefix = 
Aggregates.comparator.make(aggregate.name().name, functionSignatureWithTypes(aggregate)); CFRowAdder adder = new CFRowAdder(cells, prefix, timestamp); adder.resetCollection("argument_types"); @@ -1409,7 +1409,7 @@ public class LegacySchemaTables for (UntypedResultSet.Row row : QueryProcessor.resultify(query, partition)) { UDAggregate aggregate = createAggregateFromAggregateRow(row); - aggregates.put(UDHelper.calculateSignature(aggregate), aggregate); + aggregates.put(functionSignatureWithNameAndTypes(aggregate), aggregate); } return aggregates; } @@ -1459,7 +1459,7 @@ public class LegacySchemaTables ColumnFamily cells = mutation.addOrGet(Aggregates); int ldt = (int) (System.currentTimeMillis() / 1000); - Composite prefix = Aggregates.comparator.make(aggregate.name().name, UDHelper.calculateSignature(aggregate)); + Composite prefix = Aggregates.comparator.make(aggregate.name().name, functionSignatureWithTypes(aggregate)); cells.addAtom(new RangeTombstone(prefix, prefix.end(), timestamp, ldt)); return mutation; @@ -1477,4 +1477,31 @@ public class LegacySchemaTables throw new RuntimeException(e); } } + + // We allow method overloads, so a function is not uniquely identified by its name only, but + // also by its argument types. To distinguish overloads of given function name in the schema + // we use a "signature" which is just a list of its CQL argument types (we could replace that by + // using a "signature" UDT that would be comprised of the function name and argument types, + // which we could then use as clustering column. But as we haven't yet used UDT in system tables, + // We'll leave that decision to #6717). 
+ public static ByteBuffer functionSignatureWithTypes(AbstractFunction fun) + { + ListType<String> list = ListType.getInstance(UTF8Type.instance, false); + List<String> strList = new ArrayList<>(fun.argTypes().size()); + for (AbstractType<?> argType : fun.argTypes()) + strList.add(argType.asCQL3Type().toString()); + return list.decompose(strList); + } + + public static ByteBuffer functionSignatureWithNameAndTypes(AbstractFunction fun) + { + ListType<String> list = ListType.getInstance(UTF8Type.instance, false); + List<String> strList = new ArrayList<>(fun.argTypes().size() + 2); + strList.add(fun.name().keyspace); + strList.add(fun.name().name); + for (AbstractType<?> argType : fun.argTypes()) + strList.add(argType.asCQL3Type().toString()); + return list.decompose(strList); + } + } http://git-wip-us.apache.org/repos/asf/cassandra/blob/dcc3bb05/src/java/org/apache/cassandra/service/IMigrationListener.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/IMigrationListener.java b/src/java/org/apache/cassandra/service/IMigrationListener.java deleted file mode 100644 index faaffb9..0000000 --- a/src/java/org/apache/cassandra/service/IMigrationListener.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.service; - -public interface IMigrationListener -{ - public void onCreateKeyspace(String ksName); - public void onCreateColumnFamily(String ksName, String cfName); - public void onCreateUserType(String ksName, String typeName); - public void onCreateFunction(String ksName, String functionName); - public void onCreateAggregate(String ksName, String aggregateName); - - public void onUpdateKeyspace(String ksName); - public void onUpdateColumnFamily(String ksName, String cfName); - public void onUpdateUserType(String ksName, String typeName); - public void onUpdateFunction(String ksName, String functionName); - public void onUpdateAggregate(String ksName, String aggregateName); - - public void onDropKeyspace(String ksName); - public void onDropColumnFamily(String ksName, String cfName); - public void onDropUserType(String ksName, String typeName); - public void onDropFunction(String ksName, String functionName); - public void onDropAggregate(String ksName, String aggregateName); - -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/dcc3bb05/src/java/org/apache/cassandra/service/MigrationListener.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/MigrationListener.java b/src/java/org/apache/cassandra/service/MigrationListener.java new file mode 100644 index 0000000..2b728d9 --- /dev/null +++ b/src/java/org/apache/cassandra/service/MigrationListener.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.service; + +import java.util.List; + +import org.apache.cassandra.db.marshal.AbstractType; + +public abstract class MigrationListener +{ + public void onCreateKeyspace(String ksName) + { + } + + public void onCreateColumnFamily(String ksName, String cfName) + { + } + + public void onCreateUserType(String ksName, String typeName) + { + } + + public void onCreateFunction(String ksName, String functionName, List<AbstractType<?>> argTypes) + { + } + + public void onCreateAggregate(String ksName, String aggregateName, List<AbstractType<?>> argTypes) + { + } + + public void onUpdateKeyspace(String ksName) + { + } + + public void onUpdateColumnFamily(String ksName, String cfName) + { + } + + public void onUpdateUserType(String ksName, String typeName) + { + } + + public void onUpdateFunction(String ksName, String functionName, List<AbstractType<?>> argTypes) + { + } + + public void onUpdateAggregate(String ksName, String aggregateName, List<AbstractType<?>> argTypes) + { + } + + public void onDropKeyspace(String ksName) + { + } + + public void onDropColumnFamily(String ksName, String cfName) + { + } + + public void onDropUserType(String ksName, String typeName) + { + } + + public void onDropFunction(String ksName, String functionName, List<AbstractType<?>> argTypes) + { + } + + public void onDropAggregate(String ksName, String aggregateName, 
List<AbstractType<?>> argTypes) + { + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/dcc3bb05/src/java/org/apache/cassandra/service/MigrationManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java index fe32559..ef1adc6 100644 --- a/src/java/org/apache/cassandra/service/MigrationManager.java +++ b/src/java/org/apache/cassandra/service/MigrationManager.java @@ -63,16 +63,16 @@ public class MigrationManager public static final int MIGRATION_DELAY_IN_MS = 60000; - private final List<IMigrationListener> listeners = new CopyOnWriteArrayList<>(); + private final List<MigrationListener> listeners = new CopyOnWriteArrayList<>(); private MigrationManager() {} - public void register(IMigrationListener listener) + public void register(MigrationListener listener) { listeners.add(listener); } - public void unregister(IMigrationListener listener) + public void unregister(MigrationListener listener) { listeners.remove(listener); } @@ -160,92 +160,93 @@ public class MigrationManager public void notifyCreateKeyspace(KSMetaData ksm) { - for (IMigrationListener listener : listeners) + for (MigrationListener listener : listeners) listener.onCreateKeyspace(ksm.name); } public void notifyCreateColumnFamily(CFMetaData cfm) { - for (IMigrationListener listener : listeners) + for (MigrationListener listener : listeners) listener.onCreateColumnFamily(cfm.ksName, cfm.cfName); } public void notifyCreateUserType(UserType ut) { - for (IMigrationListener listener : listeners) + for (MigrationListener listener : listeners) listener.onCreateUserType(ut.keyspace, ut.getNameAsString()); } public void notifyCreateFunction(UDFunction udf) { - for (IMigrationListener listener : listeners) - listener.onCreateFunction(udf.name().keyspace, udf.name().name); + for (MigrationListener listener : listeners) + 
listener.onCreateFunction(udf.name().keyspace, udf.name().name, udf.argTypes()); } + public void notifyCreateAggregate(UDAggregate udf) { - for (IMigrationListener listener : listeners) - listener.onCreateAggregate(udf.name().keyspace, udf.name().name); + for (MigrationListener listener : listeners) + listener.onCreateAggregate(udf.name().keyspace, udf.name().name, udf.argTypes()); } public void notifyUpdateKeyspace(KSMetaData ksm) { - for (IMigrationListener listener : listeners) + for (MigrationListener listener : listeners) listener.onUpdateKeyspace(ksm.name); } public void notifyUpdateColumnFamily(CFMetaData cfm) { - for (IMigrationListener listener : listeners) + for (MigrationListener listener : listeners) listener.onUpdateColumnFamily(cfm.ksName, cfm.cfName); } public void notifyUpdateUserType(UserType ut) { - for (IMigrationListener listener : listeners) + for (MigrationListener listener : listeners) listener.onUpdateUserType(ut.keyspace, ut.getNameAsString()); } public void notifyUpdateFunction(UDFunction udf) { - for (IMigrationListener listener : listeners) - listener.onUpdateFunction(udf.name().keyspace, udf.name().name); + for (MigrationListener listener : listeners) + listener.onUpdateFunction(udf.name().keyspace, udf.name().name, udf.argTypes()); } public void notifyUpdateAggregate(UDAggregate udf) { - for (IMigrationListener listener : listeners) - listener.onUpdateAggregate(udf.name().keyspace, udf.name().name); + for (MigrationListener listener : listeners) + listener.onUpdateAggregate(udf.name().keyspace, udf.name().name, udf.argTypes()); } public void notifyDropKeyspace(KSMetaData ksm) { - for (IMigrationListener listener : listeners) + for (MigrationListener listener : listeners) listener.onDropKeyspace(ksm.name); } public void notifyDropColumnFamily(CFMetaData cfm) { - for (IMigrationListener listener : listeners) + for (MigrationListener listener : listeners) listener.onDropColumnFamily(cfm.ksName, cfm.cfName); } public void 
notifyDropUserType(UserType ut) { - for (IMigrationListener listener : listeners) + for (MigrationListener listener : listeners) listener.onDropUserType(ut.keyspace, ut.getNameAsString()); } public void notifyDropFunction(UDFunction udf) { - for (IMigrationListener listener : listeners) - listener.onDropFunction(udf.name().keyspace, udf.name().name); + for (MigrationListener listener : listeners) + listener.onDropFunction(udf.name().keyspace, udf.name().name, udf.argTypes()); } public void notifyDropAggregate(UDAggregate udf) { - for (IMigrationListener listener : listeners) - listener.onDropAggregate(udf.name().keyspace, udf.name().name); + for (MigrationListener listener : listeners) + listener.onDropAggregate(udf.name().keyspace, udf.name().name, udf.argTypes()); } public static void announceNewKeyspace(KSMetaData ksm) throws ConfigurationException http://git-wip-us.apache.org/repos/asf/cassandra/blob/dcc3bb05/src/java/org/apache/cassandra/transport/Event.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/Event.java b/src/java/org/apache/cassandra/transport/Event.java index 9962599..5e9c6b7 100644 --- a/src/java/org/apache/cassandra/transport/Event.java +++ b/src/java/org/apache/cassandra/transport/Event.java @@ -19,6 +19,8 @@ package org.apache.cassandra.transport; import java.net.InetAddress; import java.net.InetSocketAddress; +import java.util.Iterator; +import java.util.List; import com.google.common.base.Objects; import io.netty.buffer.ByteBuf; @@ -204,22 +206,29 @@ public abstract class Event public static class SchemaChange extends Event { public enum Change { CREATED, UPDATED, DROPPED } - public enum Target { KEYSPACE, TABLE, TYPE } + public enum Target { KEYSPACE, TABLE, TYPE, FUNCTION, AGGREGATE } public final Change change; public final Target target; public final String keyspace; - public final String tableOrTypeOrFunction; + public final String name; + public final 
List<String> argTypes; - public SchemaChange(Change change, Target target, String keyspace, String tableOrTypeOrFunction) + public SchemaChange(Change change, Target target, String keyspace, String name, List<String> argTypes) { super(Type.SCHEMA_CHANGE); this.change = change; this.target = target; this.keyspace = keyspace; - this.tableOrTypeOrFunction = tableOrTypeOrFunction; + this.name = name; if (target != Target.KEYSPACE) - assert this.tableOrTypeOrFunction != null : "Table or type should be set for non-keyspace schema change events"; + assert this.name != null : "Table, type, function or aggregate name should be set for non-keyspace schema change events"; + this.argTypes = argTypes; + } + + public SchemaChange(Change change, Target target, String keyspace, String name) + { + this(change, target, keyspace, name, null); } public SchemaChange(Change change, String keyspace) @@ -236,7 +245,11 @@ public abstract class Event Target target = CBUtil.readEnumValue(Target.class, cb); String keyspace = CBUtil.readString(cb); String tableOrType = target == Target.KEYSPACE ? null : CBUtil.readString(cb); - return new SchemaChange(change, target, keyspace, tableOrType); + List<String> argTypes = null; + if (target == Target.FUNCTION || target == Target.AGGREGATE) + argTypes = CBUtil.readStringList(cb); + + return new SchemaChange(change, target, keyspace, tableOrType, argTypes); } else { @@ -248,13 +261,36 @@ public abstract class Event public void serializeEvent(ByteBuf dest, int version) { + if (target == Target.FUNCTION || target == Target.AGGREGATE) + { + if (version >= 4) + { + // available since protocol version 4 + CBUtil.writeEnumValue(change, dest); + CBUtil.writeEnumValue(target, dest); + CBUtil.writeString(keyspace, dest); + CBUtil.writeString(name, dest); + CBUtil.writeStringList(argTypes, dest); + } + else + { + // not available in protocol versions < 4 - just say the keyspace was updated. 
+ CBUtil.writeEnumValue(Change.UPDATED, dest); + if (version >= 3) + CBUtil.writeEnumValue(Target.KEYSPACE, dest); + CBUtil.writeString(keyspace, dest); + CBUtil.writeString("", dest); + } + return; + } + if (version >= 3) { CBUtil.writeEnumValue(change, dest); CBUtil.writeEnumValue(target, dest); CBUtil.writeString(keyspace, dest); if (target != Target.KEYSPACE) - CBUtil.writeString(tableOrTypeOrFunction, dest); + CBUtil.writeString(name, dest); } else { @@ -270,13 +306,30 @@ public abstract class Event { CBUtil.writeEnumValue(change, dest); CBUtil.writeString(keyspace, dest); - CBUtil.writeString(target == Target.KEYSPACE ? "" : tableOrTypeOrFunction, dest); + CBUtil.writeString(target == Target.KEYSPACE ? "" : name, dest); } } } public int eventSerializedSize(int version) { + if (target == Target.FUNCTION || target == Target.AGGREGATE) + { + if (version >= 4) + return CBUtil.sizeOfEnumValue(change) + + CBUtil.sizeOfEnumValue(target) + + CBUtil.sizeOfString(keyspace) + + CBUtil.sizeOfString(name) + + CBUtil.sizeOfStringList(argTypes); + if (version >= 3) + return CBUtil.sizeOfEnumValue(Change.UPDATED) + + CBUtil.sizeOfEnumValue(Target.KEYSPACE) + + CBUtil.sizeOfString(keyspace); + return CBUtil.sizeOfEnumValue(Change.UPDATED) + + CBUtil.sizeOfString(keyspace) + + CBUtil.sizeOfString(""); + } + if (version >= 3) { int size = CBUtil.sizeOfEnumValue(change) @@ -284,7 +337,7 @@ public abstract class Event + CBUtil.sizeOfString(keyspace); if (target != Target.KEYSPACE) - size += CBUtil.sizeOfString(tableOrTypeOrFunction); + size += CBUtil.sizeOfString(name); return size; } @@ -298,20 +351,36 @@ public abstract class Event } return CBUtil.sizeOfEnumValue(change) + CBUtil.sizeOfString(keyspace) - + CBUtil.sizeOfString(target == Target.KEYSPACE ? "" : tableOrTypeOrFunction); + + CBUtil.sizeOfString(target == Target.KEYSPACE ? "" : name); } } @Override public String toString() { - return change + " " + target + " " + keyspace + (tableOrTypeOrFunction == null ? "" : "." 
+ tableOrTypeOrFunction); + StringBuilder sb = new StringBuilder().append(change) + .append(' ').append(target) + .append(' ').append(keyspace); + if (name != null) + sb.append('.').append(name); + if (argTypes != null) + { + sb.append(" ("); + for (Iterator<String> iter = argTypes.iterator(); iter.hasNext(); ) + { + sb.append(iter.next()); + if (iter.hasNext()) + sb.append(','); + } + sb.append(')'); + } + return sb.toString(); } @Override public int hashCode() { - return Objects.hashCode(change, target, keyspace, tableOrTypeOrFunction); + return Objects.hashCode(change, target, keyspace, name, argTypes); } @Override @@ -324,7 +393,8 @@ public abstract class Event return Objects.equal(change, scc.change) && Objects.equal(target, scc.target) && Objects.equal(keyspace, scc.keyspace) - && Objects.equal(tableOrTypeOrFunction, scc.tableOrTypeOrFunction); + && Objects.equal(name, scc.name) + && Objects.equal(argTypes, scc.argTypes); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/dcc3bb05/src/java/org/apache/cassandra/transport/Server.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java index 5202a94..147d729 100644 --- a/src/java/org/apache/cassandra/transport/Server.java +++ b/src/java/org/apache/cassandra/transport/Server.java @@ -22,6 +22,7 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.UnknownHostException; import java.util.EnumMap; +import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicBoolean; import javax.net.ssl.SSLContext; @@ -44,6 +45,7 @@ import org.apache.cassandra.auth.IAuthenticator; import org.apache.cassandra.auth.ISaslAwareAuthenticator; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.EncryptionOptions; +import org.apache.cassandra.db.marshal.AbstractType; import 
org.apache.cassandra.metrics.ClientMetrics; import org.apache.cassandra.security.SSLFactory; import org.apache.cassandra.service.*; @@ -330,7 +332,7 @@ public class Server implements CassandraDaemon.Server } } - private static class EventNotifier implements IEndpointLifecycleSubscriber, IMigrationListener + private static class EventNotifier extends MigrationListener implements IEndpointLifecycleSubscriber { private final Server server; private static final InetAddress bindAll; @@ -410,12 +412,16 @@ public class Server implements CassandraDaemon.Server server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.CREATED, Event.SchemaChange.Target.TYPE, ksName, typeName)); } - public void onCreateFunction(String ksName, String functionName) + public void onCreateFunction(String ksName, String functionName, List<AbstractType<?>> argTypes) { + server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.CREATED, Event.SchemaChange.Target.FUNCTION, + ksName, functionName, AbstractType.asCQLTypeStringList(argTypes))); } - public void onCreateAggregate(String ksName, String aggregateName) + public void onCreateAggregate(String ksName, String aggregateName, List<AbstractType<?>> argTypes) { + server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.CREATED, Event.SchemaChange.Target.AGGREGATE, + ksName, aggregateName, AbstractType.asCQLTypeStringList(argTypes))); } public void onUpdateKeyspace(String ksName) @@ -433,12 +439,16 @@ public class Server implements CassandraDaemon.Server server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, Event.SchemaChange.Target.TYPE, ksName, typeName)); } - public void onUpdateFunction(String ksName, String functionName) + public void onUpdateFunction(String ksName, String functionName, List<AbstractType<?>> argTypes) { + server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, Event.SchemaChange.Target.FUNCTION, + 
ksName, functionName, AbstractType.asCQLTypeStringList(argTypes))); } - public void onUpdateAggregate(String ksName, String aggregateName) + public void onUpdateAggregate(String ksName, String aggregateName, List<AbstractType<?>> argTypes) { + server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, Event.SchemaChange.Target.AGGREGATE, + ksName, aggregateName, AbstractType.asCQLTypeStringList(argTypes))); } public void onDropKeyspace(String ksName) @@ -456,12 +466,16 @@ public class Server implements CassandraDaemon.Server server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, Event.SchemaChange.Target.TYPE, ksName, typeName)); } - public void onDropFunction(String ksName, String functionName) + public void onDropFunction(String ksName, String functionName, List<AbstractType<?>> argTypes) { + server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, Event.SchemaChange.Target.FUNCTION, + ksName, functionName, AbstractType.asCQLTypeStringList(argTypes))); } - public void onDropAggregate(String ksName, String aggregateName) + public void onDropAggregate(String ksName, String aggregateName, List<AbstractType<?>> argTypes) { + server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, Event.SchemaChange.Target.AGGREGATE, + ksName, aggregateName, AbstractType.asCQLTypeStringList(argTypes))); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/dcc3bb05/test/unit/org/apache/cassandra/cql3/AggregationTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/AggregationTest.java b/test/unit/org/apache/cassandra/cql3/AggregationTest.java index 940e87f..1ddd1f1 100644 --- a/test/unit/org/apache/cassandra/cql3/AggregationTest.java +++ b/test/unit/org/apache/cassandra/cql3/AggregationTest.java @@ -28,6 +28,7 @@ import org.junit.Assert; import org.junit.Test; import 
org.apache.cassandra.service.ClientState; +import org.apache.cassandra.transport.Event; import org.apache.cassandra.transport.messages.ResultMessage; public class AggregationTest extends CQLTester @@ -41,7 +42,7 @@ public class AggregationTest extends CQLTester assertColumnNames(execute("SELECT COUNT(*) FROM %s"), "count"); assertRows(execute("SELECT COUNT(*) FROM %s"), row(0L)); assertColumnNames(execute("SELECT max(b), min(b), sum(b), avg(b) , max(c), sum(c), avg(c), sum(d), avg(d) FROM %s"), - "system.max(b)", "system.min(b)", "system.sum(b)", "system.avg(b)" , "system.max(c)", "system.sum(c)", "system.avg(c)", "system.sum(d)", "system.avg(d)"); + "system.max(b)", "system.min(b)", "system.sum(b)", "system.avg(b)", "system.max(c)", "system.sum(c)", "system.avg(c)", "system.sum(d)", "system.avg(d)"); assertRows(execute("SELECT max(b), min(b), sum(b), avg(b) , max(c), sum(c), avg(c), sum(d), avg(d) FROM %s"), row(null, null, 0, 0, null, 0.0, 0.0, new BigDecimal("0"), new BigDecimal("0"))); @@ -133,7 +134,7 @@ public class AggregationTest extends CQLTester } @Test - public void testDropStatements() throws Throwable + public void testSchemaChange() throws Throwable { String f = createFunction(KEYSPACE, "double, double", @@ -141,13 +142,66 @@ public class AggregationTest extends CQLTester "RETURNS double " + "LANGUAGE javascript " + "AS '\"string\";';"); + createFunctionOverload(f, + "double, double", + "CREATE OR REPLACE FUNCTION %s(state int, val int) " + + "RETURNS int " + + "LANGUAGE javascript " + + "AS '\"string\";';"); + + String a = createAggregate(KEYSPACE, + "double", + "CREATE OR REPLACE AGGREGATE %s(double) " + + "SFUNC " + shortFunctionName(f) + " " + + "STYPE double"); + + assertLastSchemaChange(Event.SchemaChange.Change.CREATED, Event.SchemaChange.Target.AGGREGATE, + KEYSPACE, parseFunctionName(a).name, + "double"); + + schemaChange("CREATE OR REPLACE AGGREGATE " + a + "(double) " + + "SFUNC " + shortFunctionName(f) + " " + + "STYPE double"); + + 
assertLastSchemaChange(Event.SchemaChange.Change.UPDATED, Event.SchemaChange.Target.AGGREGATE, + KEYSPACE, parseFunctionName(a).name, + "double"); + + createAggregateOverload(a, + "int", + "CREATE OR REPLACE AGGREGATE %s(int) " + + "SFUNC " + shortFunctionName(f) + " " + + "STYPE int"); + + assertLastSchemaChange(Event.SchemaChange.Change.CREATED, Event.SchemaChange.Target.AGGREGATE, + KEYSPACE, parseFunctionName(a).name, + "int"); + + schemaChange("DROP AGGREGATE " + a + "(double)"); + + assertLastSchemaChange(Event.SchemaChange.Change.DROPPED, Event.SchemaChange.Target.AGGREGATE, + KEYSPACE, parseFunctionName(a).name, + "double"); + } + + @Test + public void testDropStatements() throws Throwable + { + String f = createFunction(KEYSPACE, "double, double", - "CREATE OR REPLACE FUNCTION %s(state int, val int) " + - "RETURNS int " + + "CREATE OR REPLACE FUNCTION %s(state double, val double) " + + "RETURNS double " + "LANGUAGE javascript " + "AS '\"string\";';"); + createFunctionOverload(f, + "double, double", + "CREATE OR REPLACE FUNCTION %s(state int, val int) " + + "RETURNS int " + + "LANGUAGE javascript " + + "AS '\"string\";';"); + // DROP AGGREGATE must not succeed against a scalar assertInvalid("DROP AGGREGATE " + f); assertInvalid("DROP AGGREGATE " + f + "(double, double)"); http://git-wip-us.apache.org/repos/asf/cassandra/blob/dcc3bb05/test/unit/org/apache/cassandra/cql3/CQLTester.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java b/test/unit/org/apache/cassandra/cql3/CQLTester.java index 36fe957..5611ac6 100644 --- a/test/unit/org/apache/cassandra/cql3/CQLTester.java +++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java @@ -42,21 +42,25 @@ import org.slf4j.LoggerFactory; import com.datastax.driver.core.*; import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Row; import org.apache.cassandra.SchemaLoader; import 
org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.Schema; import org.apache.cassandra.cql3.functions.FunctionName; -import org.apache.cassandra.db.Directories; -import org.apache.cassandra.db.Keyspace; -import org.apache.cassandra.db.SystemKeyspace; +import org.apache.cassandra.cql3.statements.ParsedStatement; +import org.apache.cassandra.db.*; import org.apache.cassandra.db.marshal.*; import org.apache.cassandra.db.marshal.TupleType; import org.apache.cassandra.exceptions.*; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.serializers.TypeSerializer; +import org.apache.cassandra.service.ClientState; +import org.apache.cassandra.service.QueryState; import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.transport.Event; import org.apache.cassandra.transport.Server; +import org.apache.cassandra.transport.messages.ResultMessage; /** * Base class for CQL tests. 
@@ -94,6 +98,8 @@ public abstract class CQLTester } } + public static ResultMessage lastSchemaChangeResult; + private List<String> tables = new ArrayList<>(); private List<String> types = new ArrayList<>(); private List<String> functions = new ArrayList<>(); @@ -327,7 +333,7 @@ public abstract class CQLTester String fullQuery = String.format(query, functionName); functions.add(functionName + '(' + argTypes + ')'); logger.info(fullQuery); - execute(fullQuery); + schemaChange(fullQuery); } protected String createAggregate(String keyspace, String argTypes, String query) throws Throwable @@ -342,7 +348,7 @@ public abstract class CQLTester String fullQuery = String.format(query, aggregateName); aggregates.add(aggregateName + '(' + argTypes + ')'); logger.info(fullQuery); - execute(fullQuery); + schemaChange(fullQuery); } protected void createTable(String query) @@ -426,12 +432,33 @@ public abstract class CQLTester schemaChange(fullQuery); } - private static void schemaChange(String query) + protected void assertLastSchemaChange(Event.SchemaChange.Change change, Event.SchemaChange.Target target, + String keyspace, String name, + String... argTypes) + { + Assert.assertTrue(lastSchemaChangeResult instanceof ResultMessage.SchemaChange); + ResultMessage.SchemaChange schemaChange = (ResultMessage.SchemaChange) lastSchemaChangeResult; + Assert.assertSame(change, schemaChange.change.change); + Assert.assertSame(target, schemaChange.change.target); + Assert.assertEquals(keyspace, schemaChange.change.keyspace); + Assert.assertEquals(name, schemaChange.change.name); + Assert.assertEquals(argTypes != null ? 
Arrays.asList(argTypes) : null, schemaChange.change.argTypes); + } + + protected static void schemaChange(String query) { try { - // executeOnceInternal don't work for schema changes - QueryProcessor.executeOnceInternal(query); + ClientState state = ClientState.forInternalCalls(); + state.setKeyspace(SystemKeyspace.NAME); + QueryState queryState = new QueryState(state); + + ParsedStatement.Prepared prepared = QueryProcessor.parseStatement(query, queryState); + prepared.statement.validate(state); + + QueryOptions options = QueryOptions.forInternalCalls(Collections.<ByteBuffer>emptyList()); + + lastSchemaChangeResult = prepared.statement.executeInternal(queryState, options); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/dcc3bb05/test/unit/org/apache/cassandra/cql3/UFTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/UFTest.java b/test/unit/org/apache/cassandra/cql3/UFTest.java index fa28126..ea1b2da 100644 --- a/test/unit/org/apache/cassandra/cql3/UFTest.java +++ b/test/unit/org/apache/cassandra/cql3/UFTest.java @@ -29,6 +29,7 @@ import org.apache.cassandra.cql3.functions.FunctionName; import org.apache.cassandra.cql3.functions.Functions; import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.service.ClientState; +import org.apache.cassandra.transport.Event; import org.apache.cassandra.transport.Server; import org.apache.cassandra.transport.messages.ResultMessage; @@ -36,6 +37,47 @@ public class UFTest extends CQLTester { @Test + public void testSchemaChange() throws Throwable + { + String f = createFunction(KEYSPACE, + "double, double", + "CREATE OR REPLACE FUNCTION %s(state double, val double) " + + "RETURNS double " + + "LANGUAGE javascript " + + "AS '\"string\";';"); + + assertLastSchemaChange(Event.SchemaChange.Change.CREATED, Event.SchemaChange.Target.FUNCTION, + KEYSPACE, parseFunctionName(f).name, + "double", 
"double"); + + createFunctionOverload(f, + "double, double", + "CREATE OR REPLACE FUNCTION %s(state int, val int) " + + "RETURNS int " + + "LANGUAGE javascript " + + "AS '\"string\";';"); + + assertLastSchemaChange(Event.SchemaChange.Change.CREATED, Event.SchemaChange.Target.FUNCTION, + KEYSPACE, parseFunctionName(f).name, + "int", "int"); + + schemaChange("CREATE OR REPLACE FUNCTION " + f + "(state int, val int) " + + "RETURNS int " + + "LANGUAGE javascript " + + "AS '\"string\";';"); + + assertLastSchemaChange(Event.SchemaChange.Change.UPDATED, Event.SchemaChange.Target.FUNCTION, + KEYSPACE, parseFunctionName(f).name, + "int", "int"); + + schemaChange("DROP FUNCTION " + f + "(double, double)"); + + assertLastSchemaChange(Event.SchemaChange.Change.DROPPED, Event.SchemaChange.Target.FUNCTION, + KEYSPACE, parseFunctionName(f).name, + "double", "double"); + } + + @Test public void testFunctionDropOnKeyspaceDrop() throws Throwable { String fSin = createFunction(KEYSPACE_PER_TEST, "double", @@ -245,7 +287,7 @@ public class UFTest extends CQLTester // single-int-overload must still work assertRows(execute("SELECT v FROM %s WHERE k = " + fOverload + "((int)?)", 3), row(1)); // overloaded has just one overload now - so the following DROP FUNCTION is not ambigious (CASSANDRA-7812) - execute("DROP FUNCTION " + fOverload + ""); + execute("DROP FUNCTION " + fOverload); } @Test @@ -360,7 +402,7 @@ public class UFTest extends CQLTester createTable("CREATE TABLE %s (key int primary key, val bigint)"); String fName = createFunction(KEYSPACE, "double", - "CREATE OR REPLACE FUNCTION " + KEYSPACE + ".jft(val double)" + + "CREATE OR REPLACE FUNCTION %s(val double)" + "RETURNS double LANGUAGE JAVA " + "AS 'return val;';"); http://git-wip-us.apache.org/repos/asf/cassandra/blob/dcc3bb05/test/unit/org/apache/cassandra/transport/SerDeserTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/transport/SerDeserTest.java 
b/test/unit/org/apache/cassandra/transport/SerDeserTest.java index 649f7a2..39bd58b 100644 --- a/test/unit/org/apache/cassandra/transport/SerDeserTest.java +++ b/test/unit/org/apache/cassandra/transport/SerDeserTest.java @@ -94,6 +94,7 @@ public class SerDeserTest { eventSerDeserTest(2); eventSerDeserTest(3); + eventSerDeserTest(4); } public void eventSerDeserTest(int version) throws Exception @@ -122,6 +123,19 @@ public class SerDeserTest events.add(new SchemaChange(SchemaChange.Change.DROPPED, SchemaChange.Target.TYPE, "ks", "type")); } + if (version >= 4) + { + List<String> moreTypes = Arrays.asList("text", "bigint"); + + events.add(new SchemaChange(SchemaChange.Change.CREATED, SchemaChange.Target.FUNCTION, "ks", "func", Collections.<String>emptyList())); + events.add(new SchemaChange(SchemaChange.Change.UPDATED, SchemaChange.Target.FUNCTION, "ks", "func", moreTypes)); + events.add(new SchemaChange(SchemaChange.Change.DROPPED, SchemaChange.Target.FUNCTION, "ks", "func", moreTypes)); + + events.add(new SchemaChange(SchemaChange.Change.CREATED, SchemaChange.Target.AGGREGATE, "ks", "aggr", Collections.<String>emptyList())); + events.add(new SchemaChange(SchemaChange.Change.UPDATED, SchemaChange.Target.AGGREGATE, "ks", "aggr", moreTypes)); + events.add(new SchemaChange(SchemaChange.Change.DROPPED, SchemaChange.Target.AGGREGATE, "ks", "aggr", moreTypes)); + } + for (Event ev : events) { ByteBuf buf = Unpooled.buffer(ev.serializedSize(version));