Support for UDTs, tuples, and collections in UDFs. Patch by Robert Stupp; reviewed by Tyler Hobbs for CASSANDRA-7563.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/794d68b5 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/794d68b5 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/794d68b5 Branch: refs/heads/trunk Commit: 794d68b51b77c2a3cb09374010b6f84231ead604 Parents: e131213 Author: Robert Stupp <sn...@snazy.de> Authored: Wed Nov 26 17:49:45 2014 -0600 Committer: Tyler Hobbs <ty...@datastax.com> Committed: Wed Nov 26 17:49:45 2014 -0600 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../org/apache/cassandra/cql3/CQL3Type.java | 12 +- .../cql3/functions/BytesConversionFcts.java | 8 +- .../cassandra/cql3/functions/FunctionCall.java | 9 +- .../cassandra/cql3/functions/FunctionName.java | 1 - .../cassandra/cql3/functions/Functions.java | 50 +- .../cql3/functions/JavaSourceUDFFactory.java | 51 +- .../cql3/functions/ScalarFunction.java | 3 +- .../cql3/functions/ScriptBasedUDF.java | 13 +- .../cassandra/cql3/functions/TimeuuidFcts.java | 10 +- .../cassandra/cql3/functions/TokenFct.java | 2 +- .../cassandra/cql3/functions/UDFunction.java | 177 ++- .../cassandra/cql3/functions/UuidFcts.java | 2 +- .../selection/AggregateFunctionSelector.java | 8 +- .../cassandra/cql3/selection/FieldSelector.java | 8 +- .../cql3/selection/ScalarFunctionSelector.java | 10 +- .../cassandra/cql3/selection/Selection.java | 30 +- .../cassandra/cql3/selection/Selector.java | 6 +- .../cql3/selection/SimpleSelector.java | 5 +- .../cql3/selection/WritetimeOrTTLSelector.java | 4 +- .../statements/CreateFunctionStatement.java | 23 +- .../cql3/statements/DropFunctionStatement.java | 10 +- .../cql3/statements/DropTypeStatement.java | 11 + .../cql3/statements/ModificationStatement.java | 2 +- .../cql3/statements/SelectStatement.java | 8 +- .../cassandra/hadoop/cql3/CqlRecordReader.java | 2 +- .../org/apache/cassandra/transport/Server.java | 1 + 
.../org/apache/cassandra/cql3/CQLTester.java | 232 ++- test/unit/org/apache/cassandra/cql3/UFTest.java | 1356 ++++++++++++++---- tools/lib/cassandra-driver-core-2.0.5.jar | Bin 544552 -> 0 bytes 30 files changed, 1643 insertions(+), 413 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 162d579..55c86dd 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,6 @@ 3.0 + * Support UDTs, tuples, and collections in user-defined + functions (CASSANDRA-7563) * Fix aggregate fn results on empty selection, result column name, and cqlsh parsing (CASSANDRA-8229) * Mark sstables as repaired after full repair (CASSANDRA-7586) http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/CQL3Type.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/CQL3Type.java b/src/java/org/apache/cassandra/cql3/CQL3Type.java index b656de8..98d1b15 100644 --- a/src/java/org/apache/cassandra/cql3/CQL3Type.java +++ b/src/java/org/apache/cassandra/cql3/CQL3Type.java @@ -315,6 +315,11 @@ public interface CQL3Type return false; } + public String keyspace() + { + return null; + } + public void freeze() throws InvalidRequestException { String message = String.format("frozen<> is only allowed on collections, tuples, and user-defined types (got %s)", this); @@ -474,6 +479,11 @@ public interface CQL3Type this.name = name; } + public String keyspace() + { + return name.getKeyspace(); + } + public void freeze() { frozen = true; @@ -485,7 +495,7 @@ public interface CQL3Type { // The provided keyspace is the one of the current statement this is part of. 
If it's different from the keyspace of // the UTName, we reject since we want to limit user types to their own keyspace (see #6643) - if (!keyspace.equals(name.getKeyspace())) + if (keyspace != null && !keyspace.equals(name.getKeyspace())) throw new InvalidRequestException(String.format("Statement on keyspace %s cannot refer to a user type in keyspace %s; " + "user types can only be used in the keyspace they are defined in", keyspace, name.getKeyspace())); http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/functions/BytesConversionFcts.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/functions/BytesConversionFcts.java b/src/java/org/apache/cassandra/cql3/functions/BytesConversionFcts.java index 1cd1d69..ddb33fc 100644 --- a/src/java/org/apache/cassandra/cql3/functions/BytesConversionFcts.java +++ b/src/java/org/apache/cassandra/cql3/functions/BytesConversionFcts.java @@ -36,7 +36,7 @@ public abstract class BytesConversionFcts String name = fromType.asCQL3Type() + "asblob"; return new NativeScalarFunction(name, BytesType.instance, fromType) { - public ByteBuffer execute(List<ByteBuffer> parameters) + public ByteBuffer execute(int protocolVersion, List<ByteBuffer> parameters) { return parameters.get(0); } @@ -48,7 +48,7 @@ public abstract class BytesConversionFcts final String name = "blobas" + toType.asCQL3Type(); return new NativeScalarFunction(name, toType, BytesType.instance) { - public ByteBuffer execute(List<ByteBuffer> parameters) throws InvalidRequestException + public ByteBuffer execute(int protocolVersion, List<ByteBuffer> parameters) throws InvalidRequestException { ByteBuffer val = parameters.get(0); try @@ -68,7 +68,7 @@ public abstract class BytesConversionFcts public static final Function VarcharAsBlobFct = new NativeScalarFunction("varcharasblob", BytesType.instance, UTF8Type.instance) { - public ByteBuffer execute(List<ByteBuffer> 
parameters) + public ByteBuffer execute(int protocolVersion, List<ByteBuffer> parameters) { return parameters.get(0); } @@ -76,7 +76,7 @@ public abstract class BytesConversionFcts public static final Function BlobAsVarcharFact = new NativeScalarFunction("blobasvarchar", UTF8Type.instance, BytesType.instance) { - public ByteBuffer execute(List<ByteBuffer> parameters) + public ByteBuffer execute(int protocolVersion, List<ByteBuffer> parameters) { return parameters.get(0); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java b/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java index efaa12a..01443d2 100644 --- a/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java +++ b/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java @@ -27,6 +27,7 @@ import org.apache.cassandra.db.marshal.ListType; import org.apache.cassandra.db.marshal.MapType; import org.apache.cassandra.db.marshal.SetType; import org.apache.cassandra.exceptions.InvalidRequestException; +import org.apache.cassandra.transport.Server; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.serializers.MarshalException; @@ -69,12 +70,12 @@ public class FunctionCall extends Term.NonTerminal throw new InvalidRequestException(String.format("Invalid null value for argument to %s", fun)); buffers.add(val); } - return executeInternal(fun, buffers); + return executeInternal(options.getProtocolVersion(), fun, buffers); } - private static ByteBuffer executeInternal(ScalarFunction fun, List<ByteBuffer> params) throws InvalidRequestException + private static ByteBuffer executeInternal(int protocolVersion, ScalarFunction fun, List<ByteBuffer> params) throws InvalidRequestException { - ByteBuffer result = fun.execute(params); + ByteBuffer result = 
fun.execute(protocolVersion, params); try { // Check the method didn't lied on it's declared return type @@ -172,7 +173,7 @@ public class FunctionCall extends Term.NonTerminal buffers.add(((Term.Terminal)t).get(QueryOptions.DEFAULT)); } - return executeInternal(fun, buffers); + return executeInternal(Server.CURRENT_VERSION, fun, buffers); } public AssignmentTestable.TestResult testAssignment(String keyspace, ColumnSpecification receiver) http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/functions/FunctionName.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/functions/FunctionName.java b/src/java/org/apache/cassandra/cql3/functions/FunctionName.java index 460e7a6..bb30040 100644 --- a/src/java/org/apache/cassandra/cql3/functions/FunctionName.java +++ b/src/java/org/apache/cassandra/cql3/functions/FunctionName.java @@ -19,7 +19,6 @@ package org.apache.cassandra.cql3.functions; import com.google.common.base.Objects; -import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.SystemKeyspace; public final class FunctionName http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/functions/Functions.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/functions/Functions.java b/src/java/org/apache/cassandra/cql3/functions/Functions.java index 7021475..a8fdf0f 100644 --- a/src/java/org/apache/cassandra/cql3/functions/Functions.java +++ b/src/java/org/apache/cassandra/cql3/functions/Functions.java @@ -18,6 +18,7 @@ package org.apache.cassandra.cql3.functions; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import com.google.common.collect.ArrayListMultimap; @@ -29,6 +30,8 @@ import org.apache.cassandra.cql3.*; import org.apache.cassandra.db.SystemKeyspace; import 
org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.exceptions.InvalidRequestException; +import org.apache.cassandra.service.IMigrationListener; +import org.apache.cassandra.service.MigrationManager; public abstract class Functions { @@ -83,6 +86,8 @@ public abstract class Functions declare(AggregateFcts.avgFunctionForDouble); declare(AggregateFcts.avgFunctionForVarint); declare(AggregateFcts.avgFunctionForDecimal); + + MigrationManager.instance.register(new FunctionsMigrationListener()); } private static void declare(Function fun) @@ -188,7 +193,7 @@ public abstract class Functions assert name.hasKeyspace() : "function name not fully qualified"; for (Function f : declared.get(name)) { - if (f.argTypes().equals(argTypes)) + if (typeEquals(f.argTypes(), argTypes)) return f; } return null; @@ -284,4 +289,47 @@ public abstract class Functions removeFunction(fun.name(), fun.argTypes()); addFunction(fun); } + + public static Collection<Function> all() + { + return declared.values(); + } + + public static boolean typeEquals(AbstractType<?> t1, AbstractType<?> t2) + { + return t1.asCQL3Type().toString().equals(t2.asCQL3Type().toString()); + } + + public static boolean typeEquals(List<AbstractType<?>> t1, List<AbstractType<?>> t2) + { + if (t1.size() != t2.size()) + return false; + for (int i = 0; i < t1.size(); i ++) + if (!typeEquals(t1.get(i), t2.get(i))) + return false; + return true; + } + + private static class FunctionsMigrationListener implements IMigrationListener + { + public void onCreateKeyspace(String ksName) { } + public void onCreateColumnFamily(String ksName, String cfName) { } + public void onCreateUserType(String ksName, String typeName) { } + public void onCreateFunction(String ksName, String functionName) { } + + public void onUpdateKeyspace(String ksName) { } + public void onUpdateColumnFamily(String ksName, String cfName) { } + public void onUpdateUserType(String ksName, String typeName) { + for (Function function : all()) + if 
(function instanceof UDFunction) + ((UDFunction)function).userTypeUpdated(ksName, typeName); + } + public void onUpdateFunction(String ksName, String functionName) { } + + public void onDropKeyspace(String ksName) { } + public void onDropColumnFamily(String ksName, String cfName) { } + public void onDropUserType(String ksName, String typeName) { } + public void onDropFunction(String ksName, String functionName) { } + + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/functions/JavaSourceUDFFactory.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/functions/JavaSourceUDFFactory.java b/src/java/org/apache/cassandra/cql3/functions/JavaSourceUDFFactory.java index 0f5fe48..560f077 100644 --- a/src/java/org/apache/cassandra/cql3/functions/JavaSourceUDFFactory.java +++ b/src/java/org/apache/cassandra/cql3/functions/JavaSourceUDFFactory.java @@ -20,13 +20,13 @@ package org.apache.cassandra.cql3.functions; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Modifier; -import java.nio.ByteBuffer; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.datastax.driver.core.DataType; import javassist.CannotCompileException; import javassist.ClassPool; import javassist.CtClass; @@ -56,8 +56,14 @@ public final class JavaSourceUDFFactory boolean deterministic) throws InvalidRequestException { - Class<?> javaReturnType = UDFunction.javaType(returnType); - Class<?>[] javaParamTypes = UDFunction.javaParamTypes(argTypes); + // argDataTypes is just the C* internal argTypes converted to the Java Driver DataType + DataType[] argDataTypes = UDFunction.driverTypes(argTypes); + // returnDataType is just the C* internal returnType converted to the Java Driver DataType + DataType returnDataType = 
UDFunction.driverType(returnType); + // javaParamTypes is just the Java representation for argTypes resp. argDataTypes + Class<?>[] javaParamTypes = UDFunction.javaTypes(argDataTypes); + // javaReturnType is just the Java representation for returnType resp. returnDataType + Class<?> javaReturnType = returnDataType.asJavaClass(); String clsName = generateClassName(name); @@ -92,9 +98,13 @@ public final class JavaSourceUDFFactory Constructor ctor = cc.toClass().getDeclaredConstructor( - FunctionName.class, List.class, List.class, - AbstractType.class, String.class, boolean.class); - return (UDFunction) ctor.newInstance(name, argNames, argTypes, returnType, body, deterministic); + FunctionName.class, List.class, List.class, DataType[].class, + AbstractType.class, DataType.class, + String.class, boolean.class); + return (UDFunction) ctor.newInstance( + name, argNames, argTypes, argDataTypes, + returnType, returnDataType, + body, deterministic); } catch (NotFoundException | CannotCompileException | NoSuchMethodException | LinkageError | InstantiationException | IllegalAccessException e) { @@ -133,10 +143,12 @@ public final class JavaSourceUDFFactory "(org.apache.cassandra.cql3.functions.FunctionName name, " + "java.util.List argNames, " + "java.util.List argTypes, " + + "com.datastax.driver.core.DataType[] argDataTypes, " + "org.apache.cassandra.db.marshal.AbstractType returnType, " + + "com.datastax.driver.core.DataType returnDataType, " + "String body," + "boolean deterministic)\n{" + - " super(name, argNames, argTypes, returnType, \"java\", body, deterministic);\n" + + " super(name, argNames, argTypes, argDataTypes, returnType, returnDataType, \"java\", body, deterministic);\n" + "}"; } @@ -177,15 +189,17 @@ public final class JavaSourceUDFFactory * Generated looks like this: * <code><pre> * - * public java.nio.ByteBuffer execute(java.util.List params) + * public java.nio.ByteBuffer execute(int protocolVersion, java.util.List params) * throws 
org.apache.cassandra.exceptions.InvalidRequestException * { * try * { * Object result = executeInternal( - * (<cast to JAVA_ARG_TYPE>)org.apache.cassandra.cql3.functions.JavaSourceUDFFactory.compose(argTypes, params, 0) + * (<cast to JAVA_ARG_TYPE>)compose(protocolVersion, 0, (java.nio.ByteBuffer)params.get(0)), + * (<cast to JAVA_ARG_TYPE>)compose(protocolVersion, 1, (java.nio.ByteBuffer)params.get(1)), + * ... * ); - * return result != null ? returnType.decompose(result) : null; + * return decompose(protocolVersion, result); * } * catch (Throwable t) * { @@ -202,7 +216,7 @@ public final class JavaSourceUDFFactory // usual methods are 700-800 chars long (prevent temp object allocations) StringBuilder code = new StringBuilder(1024); // overrides org.apache.cassandra.cql3.functions.Function.execute(java.util.List) - code.append("public java.nio.ByteBuffer execute(java.util.List params)\n" + + code.append("public java.nio.ByteBuffer execute(int protocolVersion, java.util.List params)\n" + "throws org.apache.cassandra.exceptions.InvalidRequestException\n" + "{\n" + " try\n" + @@ -219,13 +233,13 @@ public final class JavaSourceUDFFactory code. // cast to Java type append("\n (").append(paramTypes[i].getName()).append(")"). - // generate object representation of input parameter - append("org.apache.cassandra.cql3.functions.JavaSourceUDFFactory.compose(argTypes, params, ").append(i).append(')'); + // generate object representation of input parameter (call UDFunction.compose) + append("compose(protocolVersion, ").append(i).append(", (java.nio.ByteBuffer)params.get(").append(i).append("))"); } code.append("\n );\n" + - // generate serialized return value (returnType is a field in AbstractFunction class) - " return result != null ? returnType.decompose(result) : null;\n" + + // generate serialized return value (returnType is a field in AbstractFunction class), (call UDFunction.decompose) + " return decompose(protocolVersion, result);\n" + // // error handling ... 
" }\n" + @@ -242,11 +256,4 @@ public final class JavaSourceUDFFactory return code.toString(); } - // Used by execute() implementations of generated java source UDFs. - public static Object compose(List<AbstractType<?>> argTypes, List<ByteBuffer> parameters, int param) - { - ByteBuffer bb = parameters.get(param); - return bb == null ? null : argTypes.get(param).compose(bb); - } - } http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/functions/ScalarFunction.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/functions/ScalarFunction.java b/src/java/org/apache/cassandra/cql3/functions/ScalarFunction.java index ba2a374..f00faf7 100644 --- a/src/java/org/apache/cassandra/cql3/functions/ScalarFunction.java +++ b/src/java/org/apache/cassandra/cql3/functions/ScalarFunction.java @@ -30,9 +30,10 @@ public interface ScalarFunction extends Function /** * Applies this function to the specified parameter. 
* + * @param protocolVersion protocol version used for parameters and return value * @param parameters the input parameters * @return the result of applying this function to the parameter * @throws InvalidRequestException if this function cannot not be applied to the parameter */ - public ByteBuffer execute(List<ByteBuffer> parameters) throws InvalidRequestException; + public ByteBuffer execute(int protocolVersion, List<ByteBuffer> parameters) throws InvalidRequestException; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/functions/ScriptBasedUDF.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/functions/ScriptBasedUDF.java b/src/java/org/apache/cassandra/cql3/functions/ScriptBasedUDF.java index 73fc43b..059a612 100644 --- a/src/java/org/apache/cassandra/cql3/functions/ScriptBasedUDF.java +++ b/src/java/org/apache/cassandra/cql3/functions/ScriptBasedUDF.java @@ -88,15 +88,11 @@ public class ScriptBasedUDF extends UDFunction } } - public ByteBuffer execute(List<ByteBuffer> parameters) throws InvalidRequestException + public ByteBuffer execute(int protocolVersion, List<ByteBuffer> parameters) throws InvalidRequestException { Object[] params = new Object[argTypes.size()]; for (int i = 0; i < params.length; i++) - { - ByteBuffer bb = parameters.get(i); - if (bb != null) - params[i] = argTypes.get(i).compose(bb); - } + params[i] = compose(protocolVersion, i, parameters.get(i)); try { @@ -108,7 +104,7 @@ public class ScriptBasedUDF extends UDFunction if (result == null) return null; - Class<?> javaReturnType = returnType.getSerializer().getType(); + Class<?> javaReturnType = returnDataType.asJavaClass(); Class<?> resultType = result.getClass(); if (!javaReturnType.isAssignableFrom(resultType)) { @@ -138,8 +134,7 @@ public class ScriptBasedUDF extends UDFunction } } - @SuppressWarnings("unchecked") ByteBuffer r = ((AbstractType) 
returnType).decompose(result); - return r; + return decompose(protocolVersion, result); } catch (RuntimeException | ScriptException e) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/functions/TimeuuidFcts.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/functions/TimeuuidFcts.java b/src/java/org/apache/cassandra/cql3/functions/TimeuuidFcts.java index e481cf5..c1c3490 100644 --- a/src/java/org/apache/cassandra/cql3/functions/TimeuuidFcts.java +++ b/src/java/org/apache/cassandra/cql3/functions/TimeuuidFcts.java @@ -31,7 +31,7 @@ public abstract class TimeuuidFcts { public static final Function nowFct = new NativeScalarFunction("now", TimeUUIDType.instance) { - public ByteBuffer execute(List<ByteBuffer> parameters) + public ByteBuffer execute(int protocolVersion, List<ByteBuffer> parameters) { return ByteBuffer.wrap(UUIDGen.getTimeUUIDBytes()); } @@ -45,7 +45,7 @@ public abstract class TimeuuidFcts public static final Function minTimeuuidFct = new NativeScalarFunction("mintimeuuid", TimeUUIDType.instance, TimestampType.instance) { - public ByteBuffer execute(List<ByteBuffer> parameters) + public ByteBuffer execute(int protocolVersion, List<ByteBuffer> parameters) { ByteBuffer bb = parameters.get(0); if (bb == null) @@ -57,7 +57,7 @@ public abstract class TimeuuidFcts public static final Function maxTimeuuidFct = new NativeScalarFunction("maxtimeuuid", TimeUUIDType.instance, TimestampType.instance) { - public ByteBuffer execute(List<ByteBuffer> parameters) + public ByteBuffer execute(int protocolVersion, List<ByteBuffer> parameters) { ByteBuffer bb = parameters.get(0); if (bb == null) @@ -69,7 +69,7 @@ public abstract class TimeuuidFcts public static final Function dateOfFct = new NativeScalarFunction("dateof", TimestampType.instance, TimeUUIDType.instance) { - public ByteBuffer execute(List<ByteBuffer> parameters) + public ByteBuffer 
execute(int protocolVersion, List<ByteBuffer> parameters) { ByteBuffer bb = parameters.get(0); if (bb == null) @@ -81,7 +81,7 @@ public abstract class TimeuuidFcts public static final Function unixTimestampOfFct = new NativeScalarFunction("unixtimestampof", LongType.instance, TimeUUIDType.instance) { - public ByteBuffer execute(List<ByteBuffer> parameters) + public ByteBuffer execute(int protocolVersion, List<ByteBuffer> parameters) { ByteBuffer bb = parameters.get(0); if (bb == null) http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/functions/TokenFct.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/functions/TokenFct.java b/src/java/org/apache/cassandra/cql3/functions/TokenFct.java index ca4d473..9d50a97 100644 --- a/src/java/org/apache/cassandra/cql3/functions/TokenFct.java +++ b/src/java/org/apache/cassandra/cql3/functions/TokenFct.java @@ -50,7 +50,7 @@ public class TokenFct extends NativeScalarFunction return types; } - public ByteBuffer execute(List<ByteBuffer> parameters) throws InvalidRequestException + public ByteBuffer execute(int protocolVersion, List<ByteBuffer> parameters) throws InvalidRequestException { CBuilder builder = cfm.getKeyValidatorAsCType().builder(); for (int i = 0; i < parameters.size(); i++) http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/functions/UDFunction.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/functions/UDFunction.java b/src/java/org/apache/cassandra/cql3/functions/UDFunction.java index 42418c6..973c70a 100644 --- a/src/java/org/apache/cassandra/cql3/functions/UDFunction.java +++ b/src/java/org/apache/cassandra/cql3/functions/UDFunction.java @@ -17,6 +17,9 @@ */ package org.apache.cassandra.cql3.functions; +import java.lang.invoke.MethodHandle; +import 
java.lang.invoke.MethodHandles; +import java.lang.reflect.Method; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.security.MessageDigest; @@ -26,6 +29,11 @@ import com.google.common.base.Objects; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.datastax.driver.core.DataType; +import com.datastax.driver.core.ProtocolVersion; +import com.datastax.driver.core.UserType; +import org.apache.cassandra.config.KSMetaData; +import org.apache.cassandra.config.Schema; import org.apache.cassandra.cql3.*; import org.apache.cassandra.db.*; import org.apache.cassandra.db.composites.Composite; @@ -33,6 +41,8 @@ import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.marshal.UTF8Type; import org.apache.cassandra.db.marshal.TypeParser; import org.apache.cassandra.exceptions.*; +import org.apache.cassandra.service.MigrationManager; +import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; /** @@ -42,11 +52,83 @@ public abstract class UDFunction extends AbstractFunction implements ScalarFunct { protected static final Logger logger = LoggerFactory.getLogger(UDFunction.class); + // TODO make these c'tors and methods public in Java-Driver - see https://datastax-oss.atlassian.net/browse/JAVA-502 + static final MethodHandle methodParseOne; + static + { + try + { + Class<?> cls = Class.forName("com.datastax.driver.core.CassandraTypeParser"); + Method m = cls.getDeclaredMethod("parseOne", String.class); + m.setAccessible(true); + methodParseOne = MethodHandles.lookup().unreflect(m); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + } + + /** + * Construct an array containing the Java classes for the given Java Driver {@link com.datastax.driver.core.DataType}s. 
+ * + * @param dataTypes array with UDF argument types + * @return array of same size with UDF arguments + */ + public static Class<?>[] javaTypes(DataType[] dataTypes) + { + Class<?> paramTypes[] = new Class[dataTypes.length]; + for (int i = 0; i < paramTypes.length; i++) + paramTypes[i] = dataTypes[i].asJavaClass(); + return paramTypes; + } + + /** + * Construct an array containing the Java Driver {@link com.datastax.driver.core.DataType}s for the + * C* internal types. + * + * @param abstractTypes list with UDF argument types + * @return array with argument types as {@link com.datastax.driver.core.DataType} + */ + public static DataType[] driverTypes(List<AbstractType<?>> abstractTypes) + { + DataType[] argDataTypes = new DataType[abstractTypes.size()]; + for (int i = 0; i < argDataTypes.length; i++) + argDataTypes[i] = driverType(abstractTypes.get(i)); + return argDataTypes; + } + + /** + * Returns the Java Driver {@link com.datastax.driver.core.DataType} for the C* internal type. + */ + public static DataType driverType(AbstractType abstractType) + { + CQL3Type cqlType = abstractType.asCQL3Type(); + try + { + return (DataType) methodParseOne.invoke(cqlType.getType().toString()); + } + catch (RuntimeException | Error e) + { + // immediately rethrow these... 
+ throw e; + } + catch (Throwable e) + { + throw new RuntimeException("cannot parse driver type " + cqlType.getType().toString(), e); + } + } + + // instance vars + protected final List<ColumnIdentifier> argNames; protected final String language; protected final String body; - private final boolean deterministic; + protected final boolean deterministic; + + protected final DataType[] argDataTypes; + protected final DataType returnDataType; protected UDFunction(FunctionName name, List<ColumnIdentifier> argNames, @@ -56,12 +138,53 @@ public abstract class UDFunction extends AbstractFunction implements ScalarFunct String body, boolean deterministic) { + this(name, argNames, argTypes, driverTypes(argTypes), returnType, + driverType(returnType), language, body, deterministic); + } + + protected UDFunction(FunctionName name, + List<ColumnIdentifier> argNames, + List<AbstractType<?>> argTypes, + DataType[] argDataTypes, + AbstractType<?> returnType, + DataType returnDataType, + String language, + String body, + boolean deterministic) + { super(name, argTypes, returnType); assert new HashSet<>(argNames).size() == argNames.size() : "duplicate argument names"; this.argNames = argNames; this.language = language; this.body = body; this.deterministic = deterministic; + this.argDataTypes = argDataTypes; + this.returnDataType = returnDataType; + } + + /** + * Used by UDF implementations (both Java code generated by {@link org.apache.cassandra.cql3.functions.JavaSourceUDFFactory} + * and script executor {@link org.apache.cassandra.cql3.functions.ScriptBasedUDF}) to convert the C* + * serialized representation to the Java object representation. + * + * @param protocolVersion the native protocol version used for serialization + * @param argIndex index of the UDF input argument + */ + protected Object compose(int protocolVersion, int argIndex, ByteBuffer value) + { + return value == null ? 
null : argDataTypes[argIndex].deserialize(value, ProtocolVersion.fromInt(protocolVersion)); + } + + /** + * Used by UDF implementations (both Java code generated by {@link org.apache.cassandra.cql3.functions.JavaSourceUDFFactory} + * and script executor {@link org.apache.cassandra.cql3.functions.ScriptBasedUDF}) to convert the Java + * object representation for the return value to the C* serialized representation. + * + * @param protocolVersion the native protocol version used for serialization + */ + protected ByteBuffer decompose(int protocolVersion, Object value) + { + return value == null ? null : returnDataType.serialize(value, ProtocolVersion.fromInt(protocolVersion)); } public boolean isAggregate() @@ -85,19 +208,6 @@ public abstract class UDFunction extends AbstractFunction implements ScalarFunct } } - static Class<?>[] javaParamTypes(List<AbstractType<?>> argTypes) - { - Class<?> paramTypes[] = new Class[argTypes.size()]; - for (int i = 0; i < paramTypes.length; i++) - paramTypes[i] = javaType(argTypes.get(i)); - return paramTypes; - } - - static Class<?> javaType(AbstractType<?> type) - { - return type.getSerializer().getType(); - } - /** * It can happen that a function has been declared (is listed in the scheam) but cannot * be loaded (maybe only on some nodes). This is the case for instance if the class defining @@ -117,7 +227,7 @@ public abstract class UDFunction extends AbstractFunction implements ScalarFunct { return new UDFunction(name, argNames, argTypes, returnType, language, body, true) { - public ByteBuffer execute(List<ByteBuffer> parameters) throws InvalidRequestException + public ByteBuffer execute(int protocolVersion, List<ByteBuffer> parameters) throws InvalidRequestException { throw new InvalidRequestException(String.format("Function '%s' exists but hasn't been loaded successfully for the following reason: %s. 
" + "Please see the server log for more details", this, reason.getMessage())); @@ -135,7 +245,7 @@ public abstract class UDFunction extends AbstractFunction implements ScalarFunct { MessageDigest digest = FBUtilities.newMessageDigest("SHA-1"); for (AbstractType<?> type : argTypes) - digest.update(type.toString().getBytes(StandardCharsets.UTF_8)); + digest.update(type.asCQL3Type().toString().getBytes(StandardCharsets.UTF_8)); return ByteBuffer.wrap(digest.digest()); } @@ -268,8 +378,8 @@ public abstract class UDFunction extends AbstractFunction implements ScalarFunct UDFunction that = (UDFunction)o; return Objects.equal(this.name, that.name) && Objects.equal(this.argNames, that.argNames) - && Objects.equal(this.argTypes, that.argTypes) - && Objects.equal(this.returnType, that.returnType) + && Functions.typeEquals(this.argTypes, that.argTypes) + && Functions.typeEquals(this.returnType, that.returnType) && Objects.equal(this.language, that.language) && Objects.equal(this.body, that.body) && Objects.equal(this.deterministic, that.deterministic); @@ -280,4 +390,35 @@ public abstract class UDFunction extends AbstractFunction implements ScalarFunct { return Objects.hashCode(name, argNames, argTypes, returnType, language, body, deterministic); } + + public void userTypeUpdated(String ksName, String typeName) + { + boolean updated = false; + + for (int i = 0; i < argDataTypes.length; i++) + { + DataType dataType = argDataTypes[i]; + if (dataType instanceof UserType) + { + UserType userType = (UserType) dataType; + if (userType.getKeyspace().equals(ksName) && userType.getTypeName().equals(typeName)) + { + KSMetaData ksm = Schema.instance.getKSMetaData(ksName); + assert ksm != null; + + org.apache.cassandra.db.marshal.UserType ut = ksm.userTypes.getType(ByteBufferUtil.bytes(typeName)); + + DataType newUserType = driverType(ut); + argDataTypes[i] = newUserType; + + argTypes.set(i, ut); + + updated = true; + } + } + } + + if (updated) + 
MigrationManager.announceNewFunction(this, true); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/functions/UuidFcts.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/functions/UuidFcts.java b/src/java/org/apache/cassandra/cql3/functions/UuidFcts.java index b3cef85..afb5aae 100644 --- a/src/java/org/apache/cassandra/cql3/functions/UuidFcts.java +++ b/src/java/org/apache/cassandra/cql3/functions/UuidFcts.java @@ -28,7 +28,7 @@ public abstract class UuidFcts { public static final Function uuidFct = new NativeScalarFunction("uuid", UUIDType.instance) { - public ByteBuffer execute(List<ByteBuffer> parameters) + public ByteBuffer execute(int protocolVersion, List<ByteBuffer> parameters) { return UUIDSerializer.instance.serialize(UUID.randomUUID()); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/selection/AggregateFunctionSelector.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/selection/AggregateFunctionSelector.java b/src/java/org/apache/cassandra/cql3/selection/AggregateFunctionSelector.java index 6ea9716..7702796 100644 --- a/src/java/org/apache/cassandra/cql3/selection/AggregateFunctionSelector.java +++ b/src/java/org/apache/cassandra/cql3/selection/AggregateFunctionSelector.java @@ -34,20 +34,20 @@ final class AggregateFunctionSelector extends AbstractFunctionSelector<Aggregate return true; } - public void addInput(ResultSetBuilder rs) throws InvalidRequestException + public void addInput(int protocolVersion, ResultSetBuilder rs) throws InvalidRequestException { // Aggregation of aggregation is not supported for (int i = 0, m = argSelectors.size(); i < m; i++) { Selector s = argSelectors.get(i); - s.addInput(rs); - args.set(i, s.getOutput()); + s.addInput(protocolVersion, rs); + args.set(i, 
s.getOutput(protocolVersion)); s.reset(); } this.aggregate.addInput(args); } - public ByteBuffer getOutput() throws InvalidRequestException + public ByteBuffer getOutput(int protocolVersion) throws InvalidRequestException { return aggregate.compute(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/selection/FieldSelector.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/selection/FieldSelector.java b/src/java/org/apache/cassandra/cql3/selection/FieldSelector.java index 7e14486..d695598 100644 --- a/src/java/org/apache/cassandra/cql3/selection/FieldSelector.java +++ b/src/java/org/apache/cassandra/cql3/selection/FieldSelector.java @@ -64,14 +64,14 @@ final class FieldSelector extends Selector return false; } - public void addInput(ResultSetBuilder rs) throws InvalidRequestException + public void addInput(int protocolVersion, ResultSetBuilder rs) throws InvalidRequestException { - selected.addInput(rs); + selected.addInput(protocolVersion, rs); } - public ByteBuffer getOutput() throws InvalidRequestException + public ByteBuffer getOutput(int protocolVersion) throws InvalidRequestException { - ByteBuffer value = selected.getOutput(); + ByteBuffer value = selected.getOutput(protocolVersion); if (value == null) return null; ByteBuffer[] buffers = type.split(value); http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/selection/ScalarFunctionSelector.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/selection/ScalarFunctionSelector.java b/src/java/org/apache/cassandra/cql3/selection/ScalarFunctionSelector.java index 4ceadb9..bb56bb8 100644 --- a/src/java/org/apache/cassandra/cql3/selection/ScalarFunctionSelector.java +++ b/src/java/org/apache/cassandra/cql3/selection/ScalarFunctionSelector.java @@ -36,12 +36,12 @@ final class 
ScalarFunctionSelector extends AbstractFunctionSelector<ScalarFuncti return argSelectors.get(0).isAggregate(); } - public void addInput(ResultSetBuilder rs) throws InvalidRequestException + public void addInput(int protocolVersion, ResultSetBuilder rs) throws InvalidRequestException { for (int i = 0, m = argSelectors.size(); i < m; i++) { Selector s = argSelectors.get(i); - s.addInput(rs); + s.addInput(protocolVersion, rs); } } @@ -49,15 +49,15 @@ final class ScalarFunctionSelector extends AbstractFunctionSelector<ScalarFuncti { } - public ByteBuffer getOutput() throws InvalidRequestException + public ByteBuffer getOutput(int protocolVersion) throws InvalidRequestException { for (int i = 0, m = argSelectors.size(); i < m; i++) { Selector s = argSelectors.get(i); - args.set(i, s.getOutput()); + args.set(i, s.getOutput(protocolVersion)); s.reset(); } - return fun.execute(args); + return fun.execute(protocolVersion, args); } ScalarFunctionSelector(Function fun, List<Selector> argSelectors) http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/selection/Selection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/selection/Selection.java b/src/java/org/apache/cassandra/cql3/selection/Selection.java index 888d96d..6ad36e9 100644 --- a/src/java/org/apache/cassandra/cql3/selection/Selection.java +++ b/src/java/org/apache/cassandra/cql3/selection/Selection.java @@ -219,33 +219,33 @@ public abstract class Selection return c == null || !c.isLive(now); } - public void newRow() throws InvalidRequestException + public void newRow(int protocolVersion) throws InvalidRequestException { if (current != null) { - selectors.addInputRow(this); + selectors.addInputRow(protocolVersion, this); if (!selectors.isAggregate()) { - resultSet.addRow(selectors.getOutputRow()); + resultSet.addRow(selectors.getOutputRow(protocolVersion)); selectors.reset(); } } current = new 
ArrayList<ByteBuffer>(columns.size()); } - public ResultSet build() throws InvalidRequestException + public ResultSet build(int protocolVersion) throws InvalidRequestException { if (current != null) { - selectors.addInputRow(this); - resultSet.addRow(selectors.getOutputRow()); + selectors.addInputRow(protocolVersion, this); + resultSet.addRow(selectors.getOutputRow(protocolVersion)); selectors.reset(); current = null; } if (resultSet.isEmpty() && selectors.isAggregate()) { - resultSet.addRow(selectors.getOutputRow()); + resultSet.addRow(selectors.getOutputRow(protocolVersion)); } return resultSet; } @@ -268,9 +268,9 @@ public abstract class Selection * @param rs the <code>ResultSetBuilder</code> * @throws InvalidRequestException */ - public void addInputRow(ResultSetBuilder rs) throws InvalidRequestException; + public void addInputRow(int protocolVersion, ResultSetBuilder rs) throws InvalidRequestException; - public List<ByteBuffer> getOutputRow() throws InvalidRequestException; + public List<ByteBuffer> getOutputRow(int protocolVersion) throws InvalidRequestException; public void reset(); } @@ -318,12 +318,12 @@ public abstract class Selection current = null; } - public List<ByteBuffer> getOutputRow() + public List<ByteBuffer> getOutputRow(int protocolVersion) { return current; } - public void addInputRow(ResultSetBuilder rs) throws InvalidRequestException + public void addInputRow(int protocolVersion, ResultSetBuilder rs) throws InvalidRequestException { current = rs.current; } @@ -388,22 +388,22 @@ public abstract class Selection return factories.containsOnlyAggregateFunctions(); } - public List<ByteBuffer> getOutputRow() throws InvalidRequestException + public List<ByteBuffer> getOutputRow(int protocolVersion) throws InvalidRequestException { List<ByteBuffer> outputRow = new ArrayList<>(selectors.size()); for (int i = 0, m = selectors.size(); i < m; i++) { - outputRow.add(selectors.get(i).getOutput()); + 
outputRow.add(selectors.get(i).getOutput(protocolVersion)); } return outputRow; } - public void addInputRow(ResultSetBuilder rs) throws InvalidRequestException + public void addInputRow(int protocolVersion, ResultSetBuilder rs) throws InvalidRequestException { for (int i = 0, m = selectors.size(); i < m; i++) { - selectors.get(i).addInput(rs); + selectors.get(i).addInput(protocolVersion, rs); } } }; http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/selection/Selector.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/selection/Selector.java b/src/java/org/apache/cassandra/cql3/selection/Selector.java index f2c729b..0c1933f 100644 --- a/src/java/org/apache/cassandra/cql3/selection/Selector.java +++ b/src/java/org/apache/cassandra/cql3/selection/Selector.java @@ -120,18 +120,20 @@ public abstract class Selector implements AssignmentTestable /** * Add the current value from the specified <code>ResultSetBuilder</code>. * + * @param protocolVersion protocol version used for serialization * @param rs the <code>ResultSetBuilder</code> * @throws InvalidRequestException if a problem occurs while add the input value */ - public abstract void addInput(ResultSetBuilder rs) throws InvalidRequestException; + public abstract void addInput(int protocolVersion, ResultSetBuilder rs) throws InvalidRequestException; /** * Returns the selector output. * + * @param protocolVersion protocol version used for serialization * @return the selector output * @throws InvalidRequestException if a problem occurs while computing the output value */ - public abstract ByteBuffer getOutput() throws InvalidRequestException; + public abstract ByteBuffer getOutput(int protocolVersion) throws InvalidRequestException; /** * Returns the <code>Selector</code> output type. 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/selection/SimpleSelector.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/selection/SimpleSelector.java b/src/java/org/apache/cassandra/cql3/selection/SimpleSelector.java index a5ff4cd..c2edaed 100644 --- a/src/java/org/apache/cassandra/cql3/selection/SimpleSelector.java +++ b/src/java/org/apache/cassandra/cql3/selection/SimpleSelector.java @@ -21,6 +21,7 @@ import java.nio.ByteBuffer; import org.apache.cassandra.cql3.selection.Selection.ResultSetBuilder; import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.exceptions.InvalidRequestException; public final class SimpleSelector extends Selector { @@ -54,13 +55,13 @@ public final class SimpleSelector extends Selector } @Override - public void addInput(ResultSetBuilder rs) + public void addInput(int protocolVersion, ResultSetBuilder rs) throws InvalidRequestException { current = rs.current.get(idx); } @Override - public ByteBuffer getOutput() + public ByteBuffer getOutput(int protocolVersion) throws InvalidRequestException { return current; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/selection/WritetimeOrTTLSelector.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/selection/WritetimeOrTTLSelector.java b/src/java/org/apache/cassandra/cql3/selection/WritetimeOrTTLSelector.java index 2494334..a1ecd3d 100644 --- a/src/java/org/apache/cassandra/cql3/selection/WritetimeOrTTLSelector.java +++ b/src/java/org/apache/cassandra/cql3/selection/WritetimeOrTTLSelector.java @@ -63,7 +63,7 @@ final class WritetimeOrTTLSelector extends Selector }; } - public void addInput(ResultSetBuilder rs) + public void addInput(int protocolVersion, ResultSetBuilder rs) { if (isWritetime) { @@ -77,7 +77,7 @@ final class 
WritetimeOrTTLSelector extends Selector } } - public ByteBuffer getOutput() + public ByteBuffer getOutput(int protocolVersion) { return current; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/statements/CreateFunctionStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateFunctionStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateFunctionStatement.java index c41fb08..8d8c27a 100644 --- a/src/java/org/apache/cassandra/cql3/statements/CreateFunctionStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/CreateFunctionStatement.java @@ -50,6 +50,7 @@ public final class CreateFunctionStatement extends SchemaAlteringStatement private final List<ColumnIdentifier> argNames; private final List<CQL3Type.Raw> argRawTypes; private final CQL3Type.Raw rawReturnType; + private String currentKeyspace; public CreateFunctionStatement(FunctionName functionName, String language, @@ -74,8 +75,10 @@ public final class CreateFunctionStatement extends SchemaAlteringStatement public void prepareKeyspace(ClientState state) throws InvalidRequestException { - if (!functionName.hasKeyspace() && state.getRawKeyspace() != null) - functionName = new FunctionName(state.getKeyspace(), functionName.name); + currentKeyspace = state.getRawKeyspace(); + + if (!functionName.hasKeyspace() && currentKeyspace != null) + functionName = new FunctionName(currentKeyspace, functionName.name); if (!functionName.hasKeyspace()) throw new InvalidRequestException("You need to be logged in a keyspace or use a fully qualified function name"); @@ -112,11 +115,9 @@ public final class CreateFunctionStatement extends SchemaAlteringStatement List<AbstractType<?>> argTypes = new ArrayList<>(argRawTypes.size()); for (CQL3Type.Raw rawType : argRawTypes) - // We have no proper keyspace to give, which means that this will break (NPE currently) - // for 
UDT: #7791 is open to fix this - argTypes.add(rawType.prepare(functionName.keyspace).getType()); + argTypes.add(rawType.prepare(typeKeyspace(rawType)).getType()); - AbstractType<?> returnType = rawReturnType.prepare(null).getType(); + AbstractType<?> returnType = rawReturnType.prepare(typeKeyspace(rawReturnType)).getType(); Function old = Functions.find(functionName, argTypes); if (old != null) @@ -126,7 +127,7 @@ public final class CreateFunctionStatement extends SchemaAlteringStatement if (!orReplace) throw new InvalidRequestException(String.format("Function %s already exists", old)); - if (!old.returnType().isValueCompatibleWith(returnType)) + if (!Functions.typeEquals(old.returnType(), returnType)) throw new InvalidRequestException(String.format("Cannot replace function %s, the new return type %s is not compatible with the return type %s of existing function", functionName, returnType.asCQL3Type(), old.returnType().asCQL3Type())); } @@ -134,4 +135,12 @@ public final class CreateFunctionStatement extends SchemaAlteringStatement MigrationManager.announceNewFunction(UDFunction.create(functionName, argNames, argTypes, returnType, language, body, deterministic), isLocalOnly); return true; } + + private String typeKeyspace(CQL3Type.Raw rawType) + { + String ks = rawType.keyspace(); + if (ks != null) + return ks; + return functionName.keyspace; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/statements/DropFunctionStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/DropFunctionStatement.java b/src/java/org/apache/cassandra/cql3/statements/DropFunctionStatement.java index 4cd9460..0ba3721 100644 --- a/src/java/org/apache/cassandra/cql3/statements/DropFunctionStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/DropFunctionStatement.java @@ -103,7 +103,7 @@ public final class DropFunctionStatement extends 
SchemaAlteringStatement List<AbstractType<?>> argTypes = new ArrayList<>(argRawTypes.size()); for (CQL3Type.Raw rawType : argRawTypes) - argTypes.add(rawType.prepare(functionName.keyspace).getType()); + argTypes.add(rawType.prepare(typeKeyspace(rawType)).getType()); Function old; if (argsPresent) @@ -139,4 +139,12 @@ public final class DropFunctionStatement extends SchemaAlteringStatement MigrationManager.announceFunctionDrop((UDFunction)old, isLocalOnly); return true; } + + private String typeKeyspace(CQL3Type.Raw rawType) + { + String ks = rawType.keyspace(); + if (ks != null) + return ks; + return functionName.keyspace; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/statements/DropTypeStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/DropTypeStatement.java b/src/java/org/apache/cassandra/cql3/statements/DropTypeStatement.java index a3b82a4..ed21957 100644 --- a/src/java/org/apache/cassandra/cql3/statements/DropTypeStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/DropTypeStatement.java @@ -20,6 +20,8 @@ package org.apache.cassandra.cql3.statements; import org.apache.cassandra.auth.Permission; import org.apache.cassandra.config.*; import org.apache.cassandra.cql3.*; +import org.apache.cassandra.cql3.functions.Function; +import org.apache.cassandra.cql3.functions.Functions; import org.apache.cassandra.db.marshal.*; import org.apache.cassandra.exceptions.*; import org.apache.cassandra.service.ClientState; @@ -72,6 +74,15 @@ public class DropTypeStatement extends SchemaAlteringStatement // we drop and 2) existing tables referencing the type (maybe in a nested // way). 
+ for (Function function : Functions.all()) + { + if (isUsedBy(function.returnType())) + throw new InvalidRequestException(String.format("Cannot drop user type %s as it is still used by function %s", name, function)); + for (AbstractType<?> argType : function.argTypes()) + if (isUsedBy(argType)) + throw new InvalidRequestException(String.format("Cannot drop user type %s as it is still used by function %s", name, function)); + } + for (KSMetaData ksm2 : Schema.instance.getKeyspaceDefinitions()) { for (UserType ut : ksm2.userTypes.getAllTypes().values()) http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java index 2607e12..4e39614 100644 --- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java @@ -630,7 +630,7 @@ public abstract class ModificationStatement implements CQLStatement Selection.ResultSetBuilder builder = selection.resultSetBuilder(now); SelectStatement.forSelection(cfm, selection).processColumnFamily(key, cf, options, now, builder); - return builder.build(); + return builder.build(options.getProtocolVersion()); } public ResultMessage executeInternal(QueryState queryState, QueryOptions options) throws RequestValidationException, RequestExecutionException http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java index 2d28b71..3360d40 100644 --- 
a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java @@ -280,7 +280,7 @@ public class SelectStatement implements CQLStatement processColumnFamily(row.key.getKey(), row.cf, options, now, result); } } - return new ResultMessage.Rows(result.build()); + return new ResultMessage.Rows(result.build(options.getProtocolVersion())); } public ResultMessage.Rows processResults(List<Row> rows, QueryOptions options, int limit, long now) throws RequestValidationException @@ -1149,7 +1149,7 @@ public class SelectStatement implements CQLStatement processColumnFamily(row.key.getKey(), row.cf, options, now, result); } - ResultSet cqlRows = result.build(); + ResultSet cqlRows = result.build(options.getProtocolVersion()); orderResults(cqlRows); @@ -1189,7 +1189,7 @@ public class SelectStatement implements CQLStatement CQL3Row staticRow = iter.getStaticRow(); if (staticRow != null && !iter.hasNext() && !usesSecondaryIndexing && hasNoClusteringColumnsRestriction()) { - result.newRow(); + result.newRow(options.getProtocolVersion()); for (ColumnDefinition def : selection.getColumns()) { switch (def.kind) @@ -1212,7 +1212,7 @@ public class SelectStatement implements CQLStatement CQL3Row cql3Row = iter.next(); // Respect requested order - result.newRow(); + result.newRow(options.getProtocolVersion()); // Respect selection order for (ColumnDefinition def : selection.getColumns()) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java index 555c1cd..21e30e2 100644 --- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java +++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java @@ -236,7 +236,7 @@ public 
class CqlRecordReader extends RecordReader<Long, Row> public Long createKey() { - return new Long(0L); + return Long.valueOf(0L); } public Row createValue() http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/src/java/org/apache/cassandra/transport/Server.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java index 15fad88..cc071b1 100644 --- a/src/java/org/apache/cassandra/transport/Server.java +++ b/src/java/org/apache/cassandra/transport/Server.java @@ -64,6 +64,7 @@ public class Server implements CassandraDaemon.Server private static final Logger logger = LoggerFactory.getLogger(Server.class); private static final boolean enableEpoll = Boolean.valueOf(System.getProperty("cassandra.native.epoll.enabled", "true")); + public static final int VERSION_2 = 2; public static final int VERSION_3 = 3; public static final int CURRENT_VERSION = VERSION_3; http://git-wip-us.apache.org/repos/asf/cassandra/blob/794d68b5/test/unit/org/apache/cassandra/cql3/CQLTester.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java b/test/unit/org/apache/cassandra/cql3/CQLTester.java index 9105b9d..68f90bd 100644 --- a/test/unit/org/apache/cassandra/cql3/CQLTester.java +++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java @@ -18,8 +18,11 @@ package org.apache.cassandra.cql3; import java.io.File; +import java.io.IOException; import java.math.BigDecimal; import java.math.BigInteger; +import java.net.InetAddress; +import java.net.ServerSocket; import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.CountDownLatch; @@ -29,23 +32,31 @@ import java.util.concurrent.atomic.AtomicInteger; import com.google.common.base.Objects; import com.google.common.collect.ImmutableSet; +import org.apache.cassandra.utils.ByteBufferUtil; import 
org.junit.AfterClass; import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.BeforeClass; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.datastax.driver.core.*; +import com.datastax.driver.core.ResultSet; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.Directories; import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.db.marshal.*; +import org.apache.cassandra.db.marshal.TupleType; import org.apache.cassandra.exceptions.*; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.serializers.TypeSerializer; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.transport.Server; /** * Base class for CQL tests. @@ -55,17 +66,38 @@ public abstract class CQLTester protected static final Logger logger = LoggerFactory.getLogger(CQLTester.class); public static final String KEYSPACE = "cql_test_keyspace"; + public static final String KEYSPACE_PER_TEST = "cql_test_keyspace_alt"; private static final boolean USE_PREPARED_VALUES = Boolean.valueOf(System.getProperty("cassandra.test.use_prepared", "true")); private static final AtomicInteger seqNumber = new AtomicInteger(); + private static org.apache.cassandra.transport.Server server; + private static final int nativePort; + private static final InetAddress nativeAddr; + private static final Cluster cluster[] = new Cluster[Server.CURRENT_VERSION]; + private static final Session session[] = new Session[Server.CURRENT_VERSION]; + static { // Once per-JVM is enough SchemaLoader.prepareServer(); + + nativeAddr = InetAddress.getLoopbackAddress(); + + try + { + ServerSocket serverSocket = new ServerSocket(0); + nativePort = serverSocket.getLocalPort(); + } + catch (IOException e) + { + throw new 
RuntimeException(e); + } } private String currentTable; private final Set<String> currentTypes = new HashSet<>(); + private final Set<String> currentFunctions = new HashSet<>(); + private final Set<String> currentAggregates = new HashSet<>(); // We don't use USE_PREPARED_VALUES in the code below so some test can foce value preparation (if the result // is not expected to be the same without preparation) @@ -80,11 +112,28 @@ public abstract class CQLTester @AfterClass public static void tearDownClass() { + for (Session sess : session) + if (sess != null) + sess.close(); + for (Cluster cl : cluster) + if (cl != null) + cl.close(); + + if (server != null) + server.stop(); + } + + @Before + public void beforeTest() throws Throwable + { + schemaChange(String.format("CREATE KEYSPACE IF NOT EXISTS %s WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}", KEYSPACE_PER_TEST)); } @After public void afterTest() throws Throwable { + dropPerTestKeyspace(); + // Restore standard behavior in case it was changed usePrepared = USE_PREPARED_VALUES; @@ -93,8 +142,12 @@ public abstract class CQLTester final String tableToDrop = currentTable; final Set<String> typesToDrop = currentTypes.isEmpty() ? Collections.emptySet() : new HashSet(currentTypes); + final Set<String> functionsToDrop = currentFunctions.isEmpty() ? Collections.emptySet() : new HashSet(currentFunctions); + final Set<String> aggregatesToDrop = currentAggregates.isEmpty() ? 
Collections.emptySet() : new HashSet(currentAggregates); currentTable = null; currentTypes.clear(); + currentFunctions.clear(); + currentAggregates.clear(); // We want to clean up after the test, but dropping a table is rather long so just do that asynchronously ScheduledExecutors.optionalTasks.execute(new Runnable() @@ -105,6 +158,12 @@ public abstract class CQLTester { schemaChange(String.format("DROP TABLE IF EXISTS %s.%s", KEYSPACE, tableToDrop)); + for (String aggregateName : aggregatesToDrop) + schemaChange(String.format("DROP AGGREGATE IF EXISTS %s", aggregateName)); + + for (String functionName : functionsToDrop) + schemaChange(String.format("DROP FUNCTION IF EXISTS %s", functionName)); + for (String typeName : typesToDrop) schemaChange(String.format("DROP TYPE IF EXISTS %s.%s", KEYSPACE, typeName)); @@ -133,6 +192,40 @@ public abstract class CQLTester }); } + // lazy initialization for all tests that require Java Driver + private static void requireNetwork() throws ConfigurationException + { + if (server != null) + return; + + SystemKeyspace.finishStartup(); + StorageService.instance.initServer(); + SchemaLoader.startGossiper(); + + server = new org.apache.cassandra.transport.Server(nativeAddr, nativePort); + server.start(); + + for (int version = 1; version <= Server.CURRENT_VERSION; version++) + { + if (cluster[version-1] != null) + continue; + + cluster[version-1] = Cluster.builder().addContactPoints(nativeAddr) + .withClusterName("Test Cluster") + .withPort(nativePort) + .withProtocolVersion(ProtocolVersion.fromInt(version)) + .build(); + session[version-1] = cluster[version-1].connect(); + + logger.info("Started Java Driver instance for protocol version {}", version); + } + } + + protected void dropPerTestKeyspace() throws Throwable + { + execute(String.format("DROP KEYSPACE IF EXISTS %s", KEYSPACE_PER_TEST)); + } + public void flush() { try @@ -183,7 +276,7 @@ public abstract class CQLTester protected String createType(String query) { - String 
typeName = "type_" + seqNumber.getAndIncrement(); + String typeName = callerName() + "_type_" + seqNumber.getAndIncrement(); String fullQuery = String.format(query, KEYSPACE + "." + typeName); currentTypes.add(typeName); logger.info(fullQuery); @@ -191,18 +284,48 @@ public abstract class CQLTester return typeName; } + protected String createFunction(String keyspace, String argTypes, String query) throws Throwable + { + String functionName = keyspace + "." + callerName() + "_function_" + seqNumber.getAndIncrement(); + createFunctionOverload(functionName, argTypes, query); + return functionName; + } + + protected void createFunctionOverload(String functionName, String argTypes, String query) throws Throwable + { + String fullQuery = String.format(query, functionName); + currentFunctions.add(functionName + '(' + argTypes + ')'); + logger.info(fullQuery); + execute(fullQuery); + } + + protected String createAggregate(String keyspace, String argTypes, String query) throws Throwable + { + String aggregateName = keyspace + "." + callerName() + "_aggregate_" + seqNumber.getAndIncrement(); + createAggregateOverload(aggregateName, argTypes, query); + return aggregateName; + } + + protected void createAggregateOverload(String aggregateName, String argTypes, String query) throws Throwable + { + String fullQuery = String.format(query, aggregateName); + currentAggregates.add(aggregateName + '(' + argTypes + ')'); + logger.info(fullQuery); + execute(fullQuery); + } + protected void createTable(String query) { - currentTable = "table_" + seqNumber.getAndIncrement(); - String fullQuery = String.format(query, KEYSPACE + "." + currentTable); + currentTable = callerName() + "_table_" + seqNumber.getAndIncrement(); + String fullQuery = formatQuery(query); logger.info(fullQuery); schemaChange(fullQuery); } protected void createTableMayThrow(String query) throws Throwable { - currentTable = "table_" + seqNumber.getAndIncrement(); - String fullQuery = String.format(query, KEYSPACE + "." 
+ currentTable); + currentTable = callerName() + "_table_" + seqNumber.getAndIncrement(); + String fullQuery = formatQuery(query); logger.info(fullQuery); try { @@ -216,14 +339,14 @@ public abstract class CQLTester protected void alterTable(String query) { - String fullQuery = String.format(query, KEYSPACE + "." + currentTable); + String fullQuery = formatQuery(query); logger.info(fullQuery); schemaChange(fullQuery); } protected void alterTableMayThrow(String query) throws Throwable { - String fullQuery = String.format(query, KEYSPACE + "." + currentTable); + String fullQuery = formatQuery(query); logger.info(fullQuery); try { @@ -244,14 +367,14 @@ public abstract class CQLTester protected void createIndex(String query) { - String fullQuery = String.format(query, KEYSPACE + "." + currentTable); + String fullQuery = formatQuery(query); logger.info(fullQuery); schemaChange(fullQuery); } protected void createIndexMayThrow(String query) throws Throwable { - String fullQuery = String.format(query, KEYSPACE + "." + currentTable); + String fullQuery = formatQuery(query); logger.info(fullQuery); try { @@ -270,6 +393,11 @@ public abstract class CQLTester schemaChange(fullQuery); } + private static String callerName() + { + return new Exception().getStackTrace()[2].getMethodName().toLowerCase(); + } + private static void schemaChange(String query) { try @@ -288,11 +416,24 @@ public abstract class CQLTester return Schema.instance.getCFMetaData(KEYSPACE, currentTable); } + protected com.datastax.driver.core.ResultSet executeNet(int protocolVersion, String query, Object... values) throws Throwable + { + requireNetwork(); + + return session[protocolVersion-1].execute(formatQuery(query), values); + } + + private String formatQuery(String query) + { + query = currentTable == null ? query : String.format(query, KEYSPACE + "." + currentTable); + return query; + } + protected UntypedResultSet execute(String query, Object... 
values) throws Throwable { try { - query = currentTable == null ? query : String.format(query, KEYSPACE + "." + currentTable); + query = formatQuery(query); UntypedResultSet rs; if (usePrepared) @@ -318,6 +459,64 @@ public abstract class CQLTester } } + protected void assertRowsNet(int protocolVersion, ResultSet result, Object[]... rows) + { + if (result == null) + { + if (rows.length > 0) + Assert.fail(String.format("No rows returned by query but %d expected", rows.length)); + return; + } + + ColumnDefinitions meta = result.getColumnDefinitions(); + Iterator<Row> iter = result.iterator(); + int i = 0; + while (iter.hasNext() && i < rows.length) + { + Object[] expected = rows[i]; + Row actual = iter.next(); + + Assert.assertEquals(String.format("Invalid number of (expected) values provided for row %d (using protocol version %d)", + i, protocolVersion), + meta.size(), expected.length); + + for (int j = 0; j < meta.size(); j++) + { + DataType type = meta.getType(j); + ByteBuffer expectedByteValue = type.serialize(expected[j], ProtocolVersion.fromInt(protocolVersion)); + int expectedBytes = expectedByteValue.remaining(); + ByteBuffer actualValue = actual.getBytesUnsafe(meta.getName(j)); + int actualBytes = actualValue.remaining(); + + if (!Objects.equal(expectedByteValue, actualValue)) + Assert.fail(String.format("Invalid value for row %d column %d (%s of type %s), " + + "expected <%s> (%d bytes) but got <%s> (%d bytes) " + + "(using protocol version %d)", + i, j, meta.getName(j), type, + type.format(expected[j]), + expectedBytes, + type.format(type.deserialize(actualValue, ProtocolVersion.fromInt(protocolVersion))), + actualBytes, + protocolVersion)); + } + i++; + } + + if (iter.hasNext()) + { + while (iter.hasNext()) + { + iter.next(); + i++; + } + Assert.fail(String.format("Got less rows than expected. Expected %d but got %d (using protocol version %d).", + rows.length, i, protocolVersion)); + } + + Assert.assertTrue(String.format("Got %s rows than expected. 
Expected %d but got %d (using protocol version %d)", + rows.length>i ? "less" : "more", rows.length, i, protocolVersion), i == rows.length); + } + protected void assertRows(UntypedResultSet result, Object[]... rows) { if (result == null) @@ -394,7 +593,7 @@ public abstract class CQLTester protected void assertEmpty(UntypedResultSet result) throws Throwable { if (result != null && result.size() != 0) - throw new InvalidRequestException(String.format("Expected empty result but got %d rows", result.size())); + throw new AssertionError(String.format("Expected empty result but got %d rows", result.size())); } protected void assertInvalid(String query, Object... values) throws Throwable @@ -406,7 +605,16 @@ public abstract class CQLTester { try { - execute(query, values); + try + { + execute(query, values); + } + catch (RuntimeException e) + { + Throwable cause = e.getCause(); + if (cause instanceof InvalidRequestException) + throw cause; + } String q = USE_PREPARED_VALUES ? query + " (values: " + formatAllValues(values) + ")" : replaceValues(query, values);