Repository: cassandra Updated Branches: refs/heads/trunk df147cc09 -> 922dbdb65
Add result set metadata to prepared statement MD5 hash calculation Patch by Alex Petrov; reviewed by Robert Stupp for CASSANDRA-10786 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/922dbdb6 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/922dbdb6 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/922dbdb6 Branch: refs/heads/trunk Commit: 922dbdb658b1693973926026b213153d05b4077c Parents: df147cc Author: Alex Petrov <oleksandr.pet...@gmail.com> Authored: Fri May 13 14:34:03 2016 +0200 Committer: Alex Petrov <oleksandr.pet...@gmail.com> Committed: Wed Oct 11 16:15:29 2017 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + build.xml | 12 +- doc/native_protocol_v5.spec | 27 +- lib/cassandra-driver-core-3.0.1-shaded.jar | Bin 2445093 -> 0 bytes ...e-3.4.0-3d1e4f3-java1196-SNAPSHOT-shaded.jar | Bin 0 -> 2613656 bytes lib/cassandra-driver-internal-only-3.10.zip | Bin 256997 -> 0 bytes lib/cassandra-driver-internal-only-3.11.zip | Bin 0 -> 264882 bytes .../apache/cassandra/cql3/QueryProcessor.java | 13 +- .../org/apache/cassandra/cql3/ResultSet.java | 122 ++++++- .../cql3/selection/SelectionColumnMapping.java | 4 +- .../statements/ListPermissionsStatement.java | 3 +- .../cql3/statements/ListRolesStatement.java | 3 +- .../cql3/statements/ListUsersStatement.java | 4 +- .../cql3/statements/ModificationStatement.java | 16 +- .../cql3/statements/ParsedStatement.java | 8 +- .../org/apache/cassandra/transport/Client.java | 6 +- .../cassandra/transport/SimpleClient.java | 4 +- .../transport/messages/ExecuteMessage.java | 48 ++- .../transport/messages/ResultMessage.java | 39 ++- .../org/apache/cassandra/cql3/CQLTester.java | 20 +- .../cassandra/cql3/PreparedStatementsTest.java | 317 ++++++++++++++++--- .../cassandra/cql3/PstmtPersistenceTest.java | 4 +- .../cql3/validation/entities/JsonTest.java | 10 +- 
.../validation/operations/SelectLimitTest.java | 5 +- .../cassandra/transport/MessagePayloadTest.java | 6 +- .../operations/predefined/CqlOperation.java | 8 +- 26 files changed, 540 insertions(+), 140 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/922dbdb6/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 2454c4f..3f2fe15 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0 + * Add result set metadata to prepared statement MD5 hash calculation (CASSANDRA-10786) * Refactor GcCompactionTest to avoid boxing (CASSANDRA-13941) * Checksum sstable metadata (CASSANDRA-13321) * Expose recent histograms in JmxHistograms (CASSANDRA-13642) http://git-wip-us.apache.org/repos/asf/cassandra/blob/922dbdb6/build.xml ---------------------------------------------------------------------- diff --git a/build.xml b/build.xml index 4e7f9b1..c657211 100644 --- a/build.xml +++ b/build.xml @@ -298,7 +298,7 @@ <!-- define the remote repositories we use --> <artifact:remoteRepository id="central" url="${artifact.remoteRepository.central}"/> <artifact:remoteRepository id="apache" url="${artifact.remoteRepository.apache}"/> - + <macrodef name="install"> <attribute name="pomFile"/> <attribute name="file"/> @@ -423,6 +423,7 @@ <dependency groupId="io.netty" artifactId="netty-all" version="4.1.14.Final" /> <dependency groupId="com.google.code.findbugs" artifactId="jsr305" version="2.0.2" /> <dependency groupId="com.clearspring.analytics" artifactId="stream" version="2.5.2" /> + <!-- UPDATE AND UNCOMMENT ON THE DRIVER RELEASE, BEFORE 4.0 RELEASE <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core" version="3.0.1" classifier="shaded"> <exclusion groupId="io.netty" artifactId="netty-buffer"/> <exclusion groupId="io.netty" artifactId="netty-codec"/> @@ -430,6 +431,7 @@ <exclusion groupId="io.netty" 
artifactId="netty-transport"/> <exclusion groupId="org.slf4j" artifactId="slf4j-api"/> </dependency> + --> <dependency groupId="org.eclipse.jdt.core.compiler" artifactId="ecj" version="4.4.2" /> <dependency groupId="org.caffinitas.ohc" artifactId="ohc-core" version="0.4.4"> <exclusion groupId="org.slf4j" artifactId="slf4j-api"/> @@ -506,7 +508,9 @@ <dependency groupId="org.apache.hadoop" artifactId="hadoop-minicluster"/> <dependency groupId="com.google.code.findbugs" artifactId="jsr305"/> <dependency groupId="org.antlr" artifactId="antlr"/> + <!-- UPDATE AND UNCOMMENT ON THE DRIVER RELEASE, BEFORE 4.0 RELEASE <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core" classifier="shaded"/> + --> <dependency groupId="org.eclipse.jdt.core.compiler" artifactId="ecj"/> <dependency groupId="org.caffinitas.ohc" artifactId="ohc-core"/> <dependency groupId="org.caffinitas.ohc" artifactId="ohc-core-j8"/> @@ -523,7 +527,9 @@ artifactId="cassandra-parent" version="${version}"/> <dependency groupId="junit" artifactId="junit"/> + <!-- UPDATE AND UNCOMMENT ON THE DRIVER RELEASE, BEFORE 4.0 RELEASE <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core" classifier="shaded"/> + --> <dependency groupId="io.netty" artifactId="netty-all"/> <dependency groupId="org.eclipse.jdt.core.compiler" artifactId="ecj"/> <dependency groupId="org.caffinitas.ohc" artifactId="ohc-core"/> @@ -596,13 +602,15 @@ <dependency groupId="org.apache.hadoop" artifactId="hadoop-minicluster" optional="true"/> <!-- don't need the Java Driver to run, but if you use the hadoop stuff or UDFs --> + <!-- UPDATE AND UNCOMMENT ON THE DRIVER RELEASE, BEFORE 4.0 RELEASE <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core" classifier="shaded" optional="true"> <exclusion groupId="io.netty" artifactId="netty-buffer"/> <exclusion groupId="io.netty" artifactId="netty-codec"/> <exclusion groupId="io.netty" artifactId="netty-handler"/> <exclusion 
groupId="io.netty" artifactId="netty-transport"/> </dependency> - + --> + <!-- don't need jna to run, but nice to have --> <dependency groupId="net.java.dev.jna" artifactId="jna"/> http://git-wip-us.apache.org/repos/asf/cassandra/blob/922dbdb6/doc/native_protocol_v5.spec ---------------------------------------------------------------------- diff --git a/doc/native_protocol_v5.spec b/doc/native_protocol_v5.spec index 13ac208..0addbc4 100644 --- a/doc/native_protocol_v5.spec +++ b/doc/native_protocol_v5.spec @@ -407,12 +407,15 @@ Table of Contents 4.1.6. EXECUTE Executes a prepared query. The body of the message must be: - <id><query_parameters> - where <id> is the prepared query ID. It's the [short bytes] returned as a - response to a PREPARE message. As for <query_parameters>, it has the exact - same definition as in QUERY (see Section 4.1.4). - - The response from the server will be a RESULT message. + <id><result_metadata_id><query_parameters> + where + - <id> is the prepared query ID. It's the [short bytes] returned as a + response to a PREPARE message. As for <query_parameters>, it has the exact + same definition as in QUERY (see Section 4.1.4). + - <result_metadata_id> is the ID of the resultset metadata that was sent + along with response to PREPARE message. If a RESULT/Rows message reports + changed resultset metadata with the Metadata_changed flag, the reported new + resultset metadata must be used in subsequent executions. 4.1.7. BATCH @@ -583,7 +586,7 @@ Table of Contents <metadata><rows_count><rows_content> where: - <metadata> is composed of: - <flags><columns_count>[<paging_state>][<global_table_spec>?<col_spec_1>...<col_spec_n>] + <flags><columns_count>[<new_metadata_id>][<paging_state>][<global_table_spec>?<col_spec_1>...<col_spec_n>] where: - <flags> is an [int]. The bits of <flags> provides information on the formatting of the remaining information. 
A flag is set if the bit @@ -604,9 +607,16 @@ Table of Contents no other information (so no <global_table_spec> nor <col_spec_i>). This will only ever be the case if this was requested during the query (see QUERY and RESULT messages). + 0x0008 Metadata_changed: if set, the No_metadata flag has to be unset + and <new_metadata_id> has to be supplied. This flag is to be + used to avoid a roundtrip in case of metadata changes for queries + that requested metadata to be skipped. - <columns_count> is an [int] representing the number of columns selected by the query that produced this result. It defines the number of <col_spec_i> elements in and the number of elements for each row in <rows_content>. + - <new_metadata_id> is [short bytes] representing the new, changed resultset + metadata. The new metadata ID must also be used in subsequent executions of + the corresponding prepared statement, if any. - <global_table_spec> is present if the Global_tables_spec is set in <flags>. It is composed of two [string] representing the (unique) keyspace name and table name the columns belong to. @@ -688,9 +698,10 @@ Table of Contents 4.2.5.4. Prepared The result to a PREPARE message. The body of a Prepared result is: - <id><metadata><result_metadata> + <id><result_metadata_id><metadata><result_metadata> where: - <id> is [short bytes] representing the prepared query ID. + - <result_metadata_id> is [short bytes] representing the resultset metadata ID. 
- <metadata> is composed of: <flags><columns_count><pk_count>[<pk_index_1>...<pk_index_n>][<global_table_spec>?<col_spec_1>...<col_spec_n>] where: http://git-wip-us.apache.org/repos/asf/cassandra/blob/922dbdb6/lib/cassandra-driver-core-3.0.1-shaded.jar ---------------------------------------------------------------------- diff --git a/lib/cassandra-driver-core-3.0.1-shaded.jar b/lib/cassandra-driver-core-3.0.1-shaded.jar deleted file mode 100644 index bc269a0..0000000 Binary files a/lib/cassandra-driver-core-3.0.1-shaded.jar and /dev/null differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/922dbdb6/lib/cassandra-driver-core-3.4.0-3d1e4f3-java1196-SNAPSHOT-shaded.jar ---------------------------------------------------------------------- diff --git a/lib/cassandra-driver-core-3.4.0-3d1e4f3-java1196-SNAPSHOT-shaded.jar b/lib/cassandra-driver-core-3.4.0-3d1e4f3-java1196-SNAPSHOT-shaded.jar new file mode 100644 index 0000000..d95a811 Binary files /dev/null and b/lib/cassandra-driver-core-3.4.0-3d1e4f3-java1196-SNAPSHOT-shaded.jar differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/922dbdb6/lib/cassandra-driver-internal-only-3.10.zip ---------------------------------------------------------------------- diff --git a/lib/cassandra-driver-internal-only-3.10.zip b/lib/cassandra-driver-internal-only-3.10.zip deleted file mode 100644 index 22b877c..0000000 Binary files a/lib/cassandra-driver-internal-only-3.10.zip and /dev/null differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/922dbdb6/lib/cassandra-driver-internal-only-3.11.zip ---------------------------------------------------------------------- diff --git a/lib/cassandra-driver-internal-only-3.11.zip b/lib/cassandra-driver-internal-only-3.11.zip new file mode 100644 index 0000000..f7760af Binary files /dev/null and b/lib/cassandra-driver-internal-only-3.11.zip differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/922dbdb6/src/java/org/apache/cassandra/cql3/QueryProcessor.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java index ade98e7..3f0b196 100644 --- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java +++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java @@ -28,8 +28,7 @@ import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Predicate; -import com.google.common.collect.Iterables; -import com.google.common.collect.Iterators; +import com.google.common.collect.*; import com.google.common.primitives.Ints; import com.google.common.util.concurrent.MoreExecutors; import org.slf4j.Logger; @@ -420,7 +419,10 @@ public class QueryProcessor implements QueryHandler checkTrue(queryString.equals(existing.rawCQLStatement), String.format("MD5 hash collision: query with the same MD5 hash was already prepared. 
\n Existing: '%s'", existing.rawCQLStatement)); - return new ResultMessage.Prepared(statementId, existing); + + ResultSet.PreparedMetadata preparedMetadata = ResultSet.PreparedMetadata.fromPrepared(existing); + ResultSet.ResultMetadata resultMetadata = ResultSet.ResultMetadata.fromPrepared(existing); + return new ResultMessage.Prepared(statementId, resultMetadata.getResultMetadataId(), preparedMetadata, resultMetadata); } private static ResultMessage.Prepared storePreparedStatement(String queryString, String keyspace, ParsedStatement.Prepared prepared) @@ -438,7 +440,9 @@ public class QueryProcessor implements QueryHandler MD5Digest statementId = computeId(queryString, keyspace); preparedStatements.put(statementId, prepared); SystemKeyspace.writePreparedStatement(keyspace, statementId, queryString); - return new ResultMessage.Prepared(statementId, prepared); + ResultSet.PreparedMetadata preparedMetadata = ResultSet.PreparedMetadata.fromPrepared(prepared); + ResultSet.ResultMetadata resultMetadata = ResultSet.ResultMetadata.fromPrepared(prepared); + return new ResultMessage.Prepared(statementId, resultMetadata.getResultMetadataId(), preparedMetadata, resultMetadata); } public ResultMessage processPrepared(CQLStatement statement, @@ -464,7 +468,6 @@ public class QueryProcessor implements QueryHandler variables.size())); // at this point there is a match in count between markers and variables that is non-zero - if (logger.isTraceEnabled()) for (int i = 0; i < variables.size(); i++) logger.trace("[{}] '{}'", i+1, variables.get(i)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/922dbdb6/src/java/org/apache/cassandra/cql3/ResultSet.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/ResultSet.java b/src/java/org/apache/cassandra/cql3/ResultSet.java index 2bb9997..e4b03ca 100644 --- a/src/java/org/apache/cassandra/cql3/ResultSet.java +++ b/src/java/org/apache/cassandra/cql3/ResultSet.java @@ 
-18,14 +18,29 @@ package org.apache.cassandra.cql3; import java.nio.ByteBuffer; -import java.util.*; +import java.nio.charset.StandardCharsets; +import java.security.MessageDigest; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.EnumSet; +import java.util.List; +import java.util.Objects; import io.netty.buffer.ByteBuf; - -import org.apache.cassandra.transport.*; +import org.apache.cassandra.cql3.statements.ModificationStatement; +import org.apache.cassandra.cql3.statements.ParsedStatement; +import org.apache.cassandra.cql3.statements.SelectStatement; import org.apache.cassandra.db.marshal.AbstractType; -import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.service.pager.PagingState; +import org.apache.cassandra.transport.CBCodec; +import org.apache.cassandra.transport.CBUtil; +import org.apache.cassandra.transport.DataType; +import org.apache.cassandra.transport.ProtocolVersion; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.MD5Digest; public class ResultSet { @@ -34,14 +49,14 @@ public class ResultSet public final ResultMetadata metadata; public final List<List<ByteBuffer>> rows; - public ResultSet(List<ColumnSpecification> metadata) + public ResultSet(ResultMetadata resultMetadata) { - this(new ResultMetadata(metadata), new ArrayList<List<ByteBuffer>>()); + this(resultMetadata, new ArrayList<List<ByteBuffer>>()); } - public ResultSet(ResultMetadata metadata, List<List<ByteBuffer>> rows) + public ResultSet(ResultMetadata resultMetadata, List<List<ByteBuffer>> rows) { - this.metadata = metadata; + this.metadata = resultMetadata; this.rows = rows; } @@ -179,7 +194,7 @@ public class ResultSet { public static final CBCodec<ResultMetadata> codec = new Codec(); - public static final ResultMetadata EMPTY = new ResultMetadata(EnumSet.of(Flag.NO_METADATA), null, 0, null); + 
public static final ResultMetadata EMPTY = new ResultMetadata(MD5Digest.compute(new byte[0]), EnumSet.of(Flag.NO_METADATA), null, 0, null); private final EnumSet<Flag> flags; // Please note that columnCount can actually be smaller than names, even if names is not null. This is @@ -189,16 +204,27 @@ public class ResultSet public final List<ColumnSpecification> names; private final int columnCount; private PagingState pagingState; + private final MD5Digest resultMetadataId; + + public ResultMetadata(MD5Digest digest, List<ColumnSpecification> names) + { + this(digest, EnumSet.noneOf(Flag.class), names, names.size(), null); + if (!names.isEmpty() && ColumnSpecification.allInSameTable(names)) + flags.add(Flag.GLOBAL_TABLES_SPEC); + } + // Problem is that we compute the metadata from the columns on creation; + // when re-preparing we create the intermediate object public ResultMetadata(List<ColumnSpecification> names) { - this(EnumSet.noneOf(Flag.class), names, names.size(), null); + this(computeResultMetadataId(names), EnumSet.noneOf(Flag.class), names, names.size(), null); if (!names.isEmpty() && ColumnSpecification.allInSameTable(names)) flags.add(Flag.GLOBAL_TABLES_SPEC); } - private ResultMetadata(EnumSet<Flag> flags, List<ColumnSpecification> names, int columnCount, PagingState pagingState) + private ResultMetadata(MD5Digest resultMetadataId, EnumSet<Flag> flags, List<ColumnSpecification> names, int columnCount, PagingState pagingState) { + this.resultMetadataId = resultMetadataId; this.flags = flags; this.names = names; this.columnCount = columnCount; @@ -207,7 +233,7 @@ public class ResultSet public ResultMetadata copy() { - return new ResultMetadata(EnumSet.copyOf(flags), names, columnCount, pagingState); + return new ResultMetadata(resultMetadataId, EnumSet.copyOf(flags), names, columnCount, pagingState); } /** @@ -252,6 +278,26 @@ public class ResultSet flags.add(Flag.NO_METADATA); } + public void setMetadataChanged() + { + flags.add(Flag.METADATA_CHANGED); + 
} + + public MD5Digest getResultMetadataId() + { + return resultMetadataId; + } + + public static ResultMetadata fromPrepared(ParsedStatement.Prepared prepared) + { + CQLStatement statement = prepared.statement; + + if (statement instanceof SelectStatement) + return ((SelectStatement)statement).getResultMetadata(); + + return ResultSet.ResultMetadata.EMPTY; + } + @Override public boolean equals(Object other) { @@ -308,12 +354,21 @@ public class ResultSet EnumSet<Flag> flags = Flag.deserialize(iflags); + MD5Digest resultMetadataId = null; + if (flags.contains(Flag.METADATA_CHANGED)) + { + assert version.isGreaterOrEqualTo(ProtocolVersion.V5) : "MetadataChanged flag is not supported before native protocol v5"; + assert !flags.contains(Flag.NO_METADATA) : "MetadataChanged and NoMetadata are mutually exclusive flags"; + + resultMetadataId = MD5Digest.wrap(CBUtil.readBytes(body)); + } + PagingState state = null; if (flags.contains(Flag.HAS_MORE_PAGES)) state = PagingState.deserialize(CBUtil.readValueNoCopy(body), version); if (flags.contains(Flag.NO_METADATA)) - return new ResultMetadata(flags, null, columnCount, state); + return new ResultMetadata(null, flags, null, columnCount, state); boolean globalTablesSpec = flags.contains(Flag.GLOBAL_TABLES_SPEC); @@ -335,7 +390,7 @@ public class ResultSet AbstractType type = DataType.toType(DataType.codec.decodeOne(body, version)); names.add(new ColumnSpecification(ksName, cfName, colName, type)); } - return new ResultMetadata(flags, names, names.size(), state); + return new ResultMetadata(resultMetadataId, flags, names, names.size(), state); } public void encode(ResultMetadata m, ByteBuf dest, ProtocolVersion version) @@ -343,7 +398,7 @@ public class ResultSet boolean noMetadata = m.flags.contains(Flag.NO_METADATA); boolean globalTablesSpec = m.flags.contains(Flag.GLOBAL_TABLES_SPEC); boolean hasMorePages = m.flags.contains(Flag.HAS_MORE_PAGES); - + boolean metadataChanged = m.flags.contains(Flag.METADATA_CHANGED); assert 
version.isGreaterThan(ProtocolVersion.V1) || (!hasMorePages && !noMetadata) : "version = " + version + ", flags = " + m.flags; @@ -353,6 +408,12 @@ public class ResultSet if (hasMorePages) CBUtil.writeValue(m.pagingState.serialize(version), dest); + if (version.isGreaterOrEqualTo(ProtocolVersion.V5) && metadataChanged) + { + assert !noMetadata : "MetadataChanged and NoMetadata are mutually exclusive flags"; + CBUtil.writeBytes(m.getResultMetadataId().bytes, dest); + } + if (!noMetadata) { if (globalTablesSpec) @@ -380,11 +441,15 @@ public class ResultSet boolean noMetadata = m.flags.contains(Flag.NO_METADATA); boolean globalTablesSpec = m.flags.contains(Flag.GLOBAL_TABLES_SPEC); boolean hasMorePages = m.flags.contains(Flag.HAS_MORE_PAGES); + boolean metadataChanged = m.flags.contains(Flag.METADATA_CHANGED); int size = 8; if (hasMorePages) size += CBUtil.sizeOfValue(m.pagingState.serializedSize(version)); + if (version.isGreaterOrEqualTo(ProtocolVersion.V5) && metadataChanged) + size += CBUtil.sizeOfBytes(m.getResultMetadataId().bytes); + if (!noMetadata) { if (globalTablesSpec) @@ -486,6 +551,11 @@ public class ResultSet return sb.toString(); } + public static PreparedMetadata fromPrepared(ParsedStatement.Prepared prepared) + { + return new PreparedMetadata(prepared.boundNames, prepared.partitionKeyBindIndexes); + } + private static class Codec implements CBCodec<PreparedMetadata> { public PreparedMetadata decode(ByteBuf body, ProtocolVersion version) @@ -603,7 +673,8 @@ public class ResultSet // The order of that enum matters!! 
GLOBAL_TABLES_SPEC, HAS_MORE_PAGES, - NO_METADATA; + NO_METADATA, + METADATA_CHANGED; public static EnumSet<Flag> deserialize(int flags) { @@ -625,4 +696,23 @@ public class ResultSet return i; } } + + public static MD5Digest computeResultMetadataId(List<ColumnSpecification> columnSpecifications) + { + MessageDigest md = FBUtilities.threadLocalMD5Digest(); + + if (columnSpecifications != null) + { + for (ColumnSpecification cs : columnSpecifications) + { + md.update(cs.name.bytes.duplicate()); + md.update((byte) 0); + md.update(cs.type.toString().getBytes(StandardCharsets.UTF_8)); + md.update((byte) 0); + md.update((byte) 0); + } + } + + return MD5Digest.wrap(md.digest()); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/922dbdb6/src/java/org/apache/cassandra/cql3/selection/SelectionColumnMapping.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/selection/SelectionColumnMapping.java b/src/java/org/apache/cassandra/cql3/selection/SelectionColumnMapping.java index cd04d94..5d3727f 100644 --- a/src/java/org/apache/cassandra/cql3/selection/SelectionColumnMapping.java +++ b/src/java/org/apache/cassandra/cql3/selection/SelectionColumnMapping.java @@ -37,8 +37,8 @@ import org.apache.cassandra.cql3.ColumnSpecification; */ public class SelectionColumnMapping implements SelectionColumns { - private final ArrayList<ColumnSpecification> columnSpecifications; - private final HashMultimap<ColumnSpecification, ColumnMetadata> columnMappings; + private final List<ColumnSpecification> columnSpecifications; + private final Multimap<ColumnSpecification, ColumnMetadata> columnMappings; private SelectionColumnMapping() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/922dbdb6/src/java/org/apache/cassandra/cql3/statements/ListPermissionsStatement.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/cql3/statements/ListPermissionsStatement.java b/src/java/org/apache/cassandra/cql3/statements/ListPermissionsStatement.java index be7fb5d..aa2157a 100644 --- a/src/java/org/apache/cassandra/cql3/statements/ListPermissionsStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/ListPermissionsStatement.java @@ -118,7 +118,8 @@ public class ListPermissionsStatement extends AuthorizationStatement if (details.isEmpty()) return new ResultMessage.Void(); - ResultSet result = new ResultSet(metadata); + ResultSet.ResultMetadata resultMetadata = new ResultSet.ResultMetadata(metadata); + ResultSet result = new ResultSet(resultMetadata); for (PermissionDetails pd : details) { result.addColumnValue(UTF8Type.instance.decompose(pd.grantee)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/922dbdb6/src/java/org/apache/cassandra/cql3/statements/ListRolesStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/ListRolesStatement.java b/src/java/org/apache/cassandra/cql3/statements/ListRolesStatement.java index 0c0822c..7ed460c 100644 --- a/src/java/org/apache/cassandra/cql3/statements/ListRolesStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/ListRolesStatement.java @@ -112,7 +112,8 @@ public class ListRolesStatement extends AuthorizationStatement // overridden in ListUsersStatement to include legacy metadata protected ResultMessage formatResults(List<RoleResource> sortedRoles) { - ResultSet result = new ResultSet(metadata); + ResultSet.ResultMetadata resultMetadata = new ResultSet.ResultMetadata(metadata); + ResultSet result = new ResultSet(resultMetadata); IRoleManager roleManager = DatabaseDescriptor.getRoleManager(); for (RoleResource role : sortedRoles) http://git-wip-us.apache.org/repos/asf/cassandra/blob/922dbdb6/src/java/org/apache/cassandra/cql3/statements/ListUsersStatement.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/ListUsersStatement.java b/src/java/org/apache/cassandra/cql3/statements/ListUsersStatement.java index 9641333..1347fba 100644 --- a/src/java/org/apache/cassandra/cql3/statements/ListUsersStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/ListUsersStatement.java @@ -44,7 +44,8 @@ public class ListUsersStatement extends ListRolesStatement @Override protected ResultMessage formatResults(List<RoleResource> sortedRoles) { - ResultSet result = new ResultSet(metadata); + ResultSet.ResultMetadata resultMetadata = new ResultSet.ResultMetadata(metadata); + ResultSet result = new ResultSet(resultMetadata); IRoleManager roleManager = DatabaseDescriptor.getRoleManager(); for (RoleResource role : sortedRoles) @@ -54,6 +55,7 @@ public class ListUsersStatement extends ListRolesStatement result.addColumnValue(UTF8Type.instance.decompose(role.getRoleName())); result.addColumnValue(BooleanType.instance.decompose(Roles.hasSuperuserStatus(role))); } + return new ResultMessage.Rows(result); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/922dbdb6/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java index caa24b2..4191285 100644 --- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java @@ -497,6 +497,19 @@ public abstract class ModificationStatement implements CQLStatement conditions.addConditionsTo(request, clustering, options); } + private static ResultSet.ResultMetadata buildCASSuccessMetadata(String ksName, String cfName) + { + List<ColumnSpecification> specs = new ArrayList<>(); + 
specs.add(casResultColumnSpecification(ksName, cfName)); + + return new ResultSet.ResultMetadata(specs); + } + + private static ColumnSpecification casResultColumnSpecification(String ksName, String cfName) + { + return new ColumnSpecification(ksName, cfName, CAS_RESULT_COLUMN, BooleanType.instance); + } + private ResultSet buildCasResultSet(RowIterator partition, QueryOptions options) throws InvalidRequestException { return buildCasResultSet(keyspace(), columnFamily(), partition, getColumnsWithConditions(), false, options); @@ -507,8 +520,7 @@ public abstract class ModificationStatement implements CQLStatement { boolean success = partition == null; - ColumnSpecification spec = new ColumnSpecification(ksName, tableName, CAS_RESULT_COLUMN, BooleanType.instance); - ResultSet.ResultMetadata metadata = new ResultSet.ResultMetadata(Collections.singletonList(spec)); + ResultSet.ResultMetadata metadata = buildCASSuccessMetadata(ksName, tableName); List<List<ByteBuffer>> rows = Collections.singletonList(Collections.singletonList(BooleanType.instance.decompose(success))); ResultSet rs = new ResultSet(metadata, rows); http://git-wip-us.apache.org/repos/asf/cassandra/blob/922dbdb6/src/java/org/apache/cassandra/cql3/statements/ParsedStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/ParsedStatement.java b/src/java/org/apache/cassandra/cql3/statements/ParsedStatement.java index e617ba7..34bfc3d 100644 --- a/src/java/org/apache/cassandra/cql3/statements/ParsedStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/ParsedStatement.java @@ -17,12 +17,12 @@ */ package org.apache.cassandra.cql3.statements; -import java.util.Collections; -import java.util.List; +import java.util.*; import org.apache.cassandra.cql3.*; import org.apache.cassandra.cql3.functions.Function; import org.apache.cassandra.exceptions.RequestValidationException; +import org.apache.cassandra.utils.*; public 
abstract class ParsedStatement { @@ -56,8 +56,9 @@ public abstract class ParsedStatement */ public String rawCQLStatement; - public final CQLStatement statement; + public final MD5Digest resultMetadataId; public final List<ColumnSpecification> boundNames; + public final CQLStatement statement; public final short[] partitionKeyBindIndexes; protected Prepared(CQLStatement statement, List<ColumnSpecification> boundNames, short[] partitionKeyBindIndexes) @@ -65,6 +66,7 @@ public abstract class ParsedStatement this.statement = statement; this.boundNames = boundNames; this.partitionKeyBindIndexes = partitionKeyBindIndexes; + this.resultMetadataId = ResultSet.ResultMetadata.fromPrepared(this).getResultMetadataId(); this.rawCQLStatement = ""; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/922dbdb6/src/java/org/apache/cassandra/transport/Client.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/Client.java b/src/java/org/apache/cassandra/transport/Client.java index 7fec473..4793d17 100644 --- a/src/java/org/apache/cassandra/transport/Client.java +++ b/src/java/org/apache/cassandra/transport/Client.java @@ -147,7 +147,9 @@ public class Client extends SimpleClient { try { - byte[] id = Hex.hexToBytes(iter.next()); + byte[] preparedStatementId = Hex.hexToBytes(iter.next()); + byte[] resultMetadataId = Hex.hexToBytes(iter.next()); + List<ByteBuffer> values = new ArrayList<ByteBuffer>(); while(iter.hasNext()) { @@ -164,7 +166,7 @@ public class Client extends SimpleClient } values.add(bb); } - return new ExecuteMessage(MD5Digest.wrap(id), QueryOptions.forInternalCalls(ConsistencyLevel.ONE, values)); + return new ExecuteMessage(MD5Digest.wrap(preparedStatementId), MD5Digest.wrap(resultMetadataId), QueryOptions.forInternalCalls(ConsistencyLevel.ONE, values)); } catch (Exception e) { 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/922dbdb6/src/java/org/apache/cassandra/transport/SimpleClient.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/SimpleClient.java b/src/java/org/apache/cassandra/transport/SimpleClient.java index d5148ab..ddd3484 100644 --- a/src/java/org/apache/cassandra/transport/SimpleClient.java +++ b/src/java/org/apache/cassandra/transport/SimpleClient.java @@ -184,9 +184,9 @@ public class SimpleClient implements Closeable return (ResultMessage.Prepared)msg; } - public ResultMessage executePrepared(byte[] statementId, List<ByteBuffer> values, ConsistencyLevel consistency) + public ResultMessage executePrepared(ResultMessage.Prepared prepared, List<ByteBuffer> values, ConsistencyLevel consistency) { - Message.Response msg = execute(new ExecuteMessage(MD5Digest.wrap(statementId), QueryOptions.forInternalCalls(consistency, values))); + Message.Response msg = execute(new ExecuteMessage(prepared.statementId, prepared.resultMetadataId, QueryOptions.forInternalCalls(consistency, values))); assert msg instanceof ResultMessage; return (ResultMessage)msg; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/922dbdb6/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java index d881e63..a8fd2a0 100644 --- a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java @@ -26,7 +26,9 @@ import org.apache.cassandra.cql3.CQLStatement; import org.apache.cassandra.cql3.ColumnSpecification; import org.apache.cassandra.cql3.QueryHandler; import org.apache.cassandra.cql3.QueryOptions; +import org.apache.cassandra.cql3.ResultSet; import 
org.apache.cassandra.cql3.statements.ParsedStatement; +import org.apache.cassandra.cql3.statements.UpdateStatement; import org.apache.cassandra.exceptions.PreparedQueryNotFoundException; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.QueryState; @@ -42,13 +44,22 @@ public class ExecuteMessage extends Message.Request { public ExecuteMessage decode(ByteBuf body, ProtocolVersion version) { - byte[] id = CBUtil.readBytes(body); - return new ExecuteMessage(MD5Digest.wrap(id), QueryOptions.codec.decode(body, version)); + MD5Digest statementId = MD5Digest.wrap(CBUtil.readBytes(body)); + + MD5Digest resultMetadataId = null; + if (version.isGreaterOrEqualTo(ProtocolVersion.V5)) + resultMetadataId = MD5Digest.wrap(CBUtil.readBytes(body)); + + return new ExecuteMessage(statementId, resultMetadataId, QueryOptions.codec.decode(body, version)); } public void encode(ExecuteMessage msg, ByteBuf dest, ProtocolVersion version) { CBUtil.writeBytes(msg.statementId.bytes, dest); + + if (version.isGreaterOrEqualTo(ProtocolVersion.V5)) + CBUtil.writeBytes(msg.resultMetadataId.bytes, dest); + if (version == ProtocolVersion.V1) { CBUtil.writeValueList(msg.options.getValues(), dest); @@ -64,6 +75,10 @@ public class ExecuteMessage extends Message.Request { int size = 0; size += CBUtil.sizeOfBytes(msg.statementId.bytes); + + if (version.isGreaterOrEqualTo(ProtocolVersion.V5)) + size += CBUtil.sizeOfBytes(msg.resultMetadataId.bytes); + if (version == ProtocolVersion.V1) { size += CBUtil.sizeOfValueList(msg.options.getValues()); @@ -78,13 +93,15 @@ public class ExecuteMessage extends Message.Request }; public final MD5Digest statementId; + public final MD5Digest resultMetadataId; public final QueryOptions options; - public ExecuteMessage(MD5Digest statementId, QueryOptions options) + public ExecuteMessage(MD5Digest statementId, MD5Digest resultMetadataId, QueryOptions options) { super(Message.Type.EXECUTE); this.statementId = statementId; this.options = 
options; + this.resultMetadataId = resultMetadataId; } public Message.Response execute(QueryState state, long queryStartNanoTime) @@ -144,8 +161,29 @@ public class ExecuteMessage extends Message.Request // by wrapping the QueryOptions. QueryOptions queryOptions = QueryOptions.addColumnSpecifications(options, prepared.boundNames); Message.Response response = handler.processPrepared(statement, state, queryOptions, getCustomPayload(), queryStartNanoTime); - if (options.skipMetadata() && response instanceof ResultMessage.Rows) - ((ResultMessage.Rows)response).result.metadata.setSkipMetadata(); + + if (response instanceof ResultMessage.Rows) + { + ResultMessage.Rows rows = (ResultMessage.Rows) response; + + ResultSet.ResultMetadata resultMetadata = rows.result.metadata; + if (options.getProtocolVersion().isGreaterOrEqualTo(ProtocolVersion.V5)) + { + // Starting with V5 we can rely on the result metadata id coming with execute message in order to + // check if there was a change, comparing it with metadata that's about to be returned to client. + if (!resultMetadata.getResultMetadataId().equals(resultMetadataId)) + resultMetadata.setMetadataChanged(); + else if (options.skipMetadata()) + resultMetadata.setSkipMetadata(); + } + else + { + // Pre-V5 code has to rely on the difference between the metadata in the prepared message cache + // and compare it with the metadata to be returned to client. 
+ if (options.skipMetadata() && prepared.resultMetadataId.equals(resultMetadata.getResultMetadataId())) + resultMetadata.setSkipMetadata(); + } + } if (tracingId != null) response.setTracingId(tracingId); http://git-wip-us.apache.org/repos/asf/cassandra/blob/922dbdb6/src/java/org/apache/cassandra/transport/messages/ResultMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/ResultMessage.java b/src/java/org/apache/cassandra/transport/messages/ResultMessage.java index e1ea948..d8aefbe 100644 --- a/src/java/org/apache/cassandra/transport/messages/ResultMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/ResultMessage.java @@ -20,9 +20,9 @@ package org.apache.cassandra.transport.messages; import io.netty.buffer.ByteBuf; +import org.apache.cassandra.cql3.ColumnSpecification; import org.apache.cassandra.cql3.CQLStatement; import org.apache.cassandra.cql3.ResultSet; -import org.apache.cassandra.cql3.statements.SelectStatement; import org.apache.cassandra.cql3.statements.ParsedStatement; import org.apache.cassandra.transport.*; import org.apache.cassandra.utils.MD5Digest; @@ -51,12 +51,12 @@ public abstract class ResultMessage extends Message.Response public enum Kind { - VOID (1, Void.subcodec), - ROWS (2, Rows.subcodec), - SET_KEYSPACE (3, SetKeyspace.subcodec), - PREPARED (4, Prepared.subcodec), - SCHEMA_CHANGE(5, SchemaChange.subcodec); + VOID (1, Void.subcodec), + ROWS (2, Rows.subcodec), + SET_KEYSPACE (3, SetKeyspace.subcodec), + PREPARED (4, Prepared.subcodec), + SCHEMA_CHANGE (5, SchemaChange.subcodec); public final int id; public final Message.Codec<ResultMessage> subcodec; @@ -216,13 +216,16 @@ public abstract class ResultMessage extends Message.Response public ResultMessage decode(ByteBuf body, ProtocolVersion version) { MD5Digest id = MD5Digest.wrap(CBUtil.readBytes(body)); + MD5Digest resultMetadataId = null; + if 
(version.isGreaterOrEqualTo(ProtocolVersion.V5)) + resultMetadataId = MD5Digest.wrap(CBUtil.readBytes(body)); ResultSet.PreparedMetadata metadata = ResultSet.PreparedMetadata.codec.decode(body, version); ResultSet.ResultMetadata resultMetadata = ResultSet.ResultMetadata.EMPTY; if (version.isGreaterThan(ProtocolVersion.V1)) resultMetadata = ResultSet.ResultMetadata.codec.decode(body, version); - return new Prepared(id, metadata, resultMetadata); + return new Prepared(id, resultMetadataId, metadata, resultMetadata); } public void encode(ResultMessage msg, ByteBuf dest, ProtocolVersion version) @@ -232,6 +235,9 @@ public abstract class ResultMessage extends Message.Response assert prepared.statementId != null; CBUtil.writeBytes(prepared.statementId.bytes, dest); + if (version.isGreaterOrEqualTo(ProtocolVersion.V5)) + CBUtil.writeBytes(prepared.resultMetadataId.bytes, dest); + ResultSet.PreparedMetadata.codec.encode(prepared.metadata, dest, version); if (version.isGreaterThan(ProtocolVersion.V1)) ResultSet.ResultMetadata.codec.encode(prepared.resultMetadata, dest, version); @@ -245,6 +251,8 @@ public abstract class ResultMessage extends Message.Response int size = 0; size += CBUtil.sizeOfBytes(prepared.statementId.bytes); + if (version.isGreaterOrEqualTo(ProtocolVersion.V5)) + size += CBUtil.sizeOfBytes(prepared.resultMetadataId.bytes); size += ResultSet.PreparedMetadata.codec.encodedSize(prepared.metadata, version); if (version.isGreaterThan(ProtocolVersion.V1)) size += ResultSet.ResultMetadata.codec.encodedSize(prepared.resultMetadata, version); @@ -253,6 +261,7 @@ public abstract class ResultMessage extends Message.Response }; public final MD5Digest statementId; + public final MD5Digest resultMetadataId; /** Describes the variables to be bound in the prepared statement */ public final ResultSet.PreparedMetadata metadata; @@ -260,27 +269,15 @@ public abstract class ResultMessage extends Message.Response /** Describes the results of executing this prepared statement 
*/ public final ResultSet.ResultMetadata resultMetadata; - public Prepared(MD5Digest statementId, ParsedStatement.Prepared prepared) - { - this(statementId, new ResultSet.PreparedMetadata(prepared.boundNames, prepared.partitionKeyBindIndexes), extractResultMetadata(prepared.statement)); - } - - private Prepared(MD5Digest statementId, ResultSet.PreparedMetadata metadata, ResultSet.ResultMetadata resultMetadata) + public Prepared(MD5Digest statementId, MD5Digest resultMetadataId, ResultSet.PreparedMetadata metadata, ResultSet.ResultMetadata resultMetadata) { super(Kind.PREPARED); this.statementId = statementId; + this.resultMetadataId = resultMetadataId; this.metadata = metadata; this.resultMetadata = resultMetadata; } - private static ResultSet.ResultMetadata extractResultMetadata(CQLStatement statement) - { - if (!(statement instanceof SelectStatement)) - return ResultSet.ResultMetadata.EMPTY; - - return ((SelectStatement)statement).getResultMetadata(); - } - @Override public String toString() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/922dbdb6/test/unit/org/apache/cassandra/cql3/CQLTester.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java b/test/unit/org/apache/cassandra/cql3/CQLTester.java index 0a0d757..062a4bc 100644 --- a/test/unit/org/apache/cassandra/cql3/CQLTester.java +++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java @@ -94,7 +94,7 @@ public abstract class CQLTester protected static final int nativePort; protected static final InetAddress nativeAddr; private static final Map<ProtocolVersion, Cluster> clusters = new HashMap<>(); - private static final Map<ProtocolVersion, Session> sessions = new HashMap<>(); + protected static final Map<ProtocolVersion, Session> sessions = new HashMap<>(); private static boolean isServerPrepared = false; @@ -386,12 +386,18 @@ public abstract class CQLTester if (clusters.containsKey(version)) continue; - Cluster 
cluster = Cluster.builder() - .addContactPoints(nativeAddr) - .withClusterName("Test Cluster") - .withPort(nativePort) - .withProtocolVersion(com.datastax.driver.core.ProtocolVersion.fromInt(version.asInt())) - .build(); + Cluster.Builder builder = Cluster.builder() + .withoutJMXReporting() + .addContactPoints(nativeAddr) + .withClusterName("Test Cluster") + .withPort(nativePort); + + if (version.isBeta()) + builder = builder.allowBetaProtocolVersion(); + else + builder = builder.withProtocolVersion(com.datastax.driver.core.ProtocolVersion.fromInt(version.asInt())); + + Cluster cluster = builder.build(); clusters.put(version, cluster); sessions.put(version, cluster.connect()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/922dbdb6/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java b/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java index 385ebb7..f843965 100644 --- a/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java +++ b/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java @@ -17,63 +17,38 @@ */ package org.apache.cassandra.cql3; -import org.junit.AfterClass; -import org.junit.BeforeClass; +import org.junit.Before; import org.junit.Test; -import com.datastax.driver.core.Cluster; -import com.datastax.driver.core.PreparedStatement; -import com.datastax.driver.core.Session; +import com.datastax.driver.core.*; +import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.exceptions.SyntaxError; -import org.apache.cassandra.SchemaLoader; -import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.schema.Schema; import org.apache.cassandra.index.StubIndex; -import org.apache.cassandra.service.EmbeddedCassandraService; +import org.apache.cassandra.transport.ProtocolVersion; import static org.junit.Assert.assertEquals; import 
static org.junit.Assert.fail; -public class PreparedStatementsTest extends SchemaLoader +public class PreparedStatementsTest extends CQLTester { - private static Cluster cluster; - private static Session session; - private static final String KEYSPACE = "prepared_stmt_cleanup"; private static final String createKsStatement = "CREATE KEYSPACE " + KEYSPACE + " WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };"; private static final String dropKsStatement = "DROP KEYSPACE IF EXISTS " + KEYSPACE; - @BeforeClass - public static void setup() throws Exception + @Before + public void setup() { - Schema.instance.clear(); - - EmbeddedCassandraService cassandra = new EmbeddedCassandraService(); - cassandra.start(); - - // Currently the native server start method return before the server is fully binded to the socket, so we need - // to wait slightly before trying to connect to it. We should fix this but in the meantime using a sleep. - Thread.sleep(1500); - - cluster = Cluster.builder().addContactPoint("127.0.0.1") - .withPort(DatabaseDescriptor.getNativeTransportPort()) - .build(); - session = cluster.connect(); - - session.execute(dropKsStatement); - session.execute(createKsStatement); - } - - @AfterClass - public static void tearDown() throws Exception - { - cluster.close(); + requireNetwork(); } @Test public void testInvalidatePreparedStatementsOnDrop() { + Session session = sessions.get(ProtocolVersion.V5); + session.execute(dropKsStatement); + session.execute(createKsStatement); + String createTableStatement = "CREATE TABLE IF NOT EXISTS " + KEYSPACE + ".qp_cleanup (id int PRIMARY KEY, cid int, val text);"; String dropTableStatement = "DROP TABLE IF EXISTS " + KEYSPACE + ".qp_cleanup;"; @@ -101,15 +76,128 @@ public class PreparedStatementsTest extends SchemaLoader } @Test + public void testInvalidatePreparedStatementOnAlterV5() + { + testInvalidatePreparedStatementOnAlter(ProtocolVersion.V5, true); + } + + @Test + public void 
testInvalidatePreparedStatementOnAlterV4() + { + testInvalidatePreparedStatementOnAlter(ProtocolVersion.V4, false); + } + + private void testInvalidatePreparedStatementOnAlter(ProtocolVersion version, boolean supportsMetadataChange) + { + Session session = sessions.get(version); + String createTableStatement = "CREATE TABLE IF NOT EXISTS " + KEYSPACE + ".qp_cleanup (a int PRIMARY KEY, b int, c int);"; + String alterTableStatement = "ALTER TABLE " + KEYSPACE + ".qp_cleanup ADD d int;"; + + session.execute(dropKsStatement); + session.execute(createKsStatement); + session.execute(createTableStatement); + + PreparedStatement preparedSelect = session.prepare("SELECT * FROM " + KEYSPACE + ".qp_cleanup"); + session.execute("INSERT INTO " + KEYSPACE + ".qp_cleanup (a, b, c) VALUES (?, ?, ?);", + 1, 2, 3); + session.execute("INSERT INTO " + KEYSPACE + ".qp_cleanup (a, b, c) VALUES (?, ?, ?);", + 2, 3, 4); + + assertRowsNet(session.execute(preparedSelect.bind()), + row(1, 2, 3), + row(2, 3, 4)); + + session.execute(alterTableStatement); + session.execute("INSERT INTO " + KEYSPACE + ".qp_cleanup (a, b, c, d) VALUES (?, ?, ?, ?);", + 3, 4, 5, 6); + + ResultSet rs; + if (supportsMetadataChange) + { + rs = session.execute(preparedSelect.bind()); + assertRowsNet(version, + rs, + row(1, 2, 3, null), + row(2, 3, 4, null), + row(3, 4, 5, 6)); + assertEquals(rs.getColumnDefinitions().size(), 4); + } + else + { + rs = session.execute(preparedSelect.bind()); + assertRowsNet(rs, + row(1, 2, 3), + row(2, 3, 4), + row(3, 4, 5)); + assertEquals(rs.getColumnDefinitions().size(), 3); + } + + session.execute(dropKsStatement); + } + + @Test + public void testInvalidatePreparedStatementOnAlterUnchangedMetadataV4() + { + testInvalidatePreparedStatementOnAlterUnchangedMetadata(ProtocolVersion.V4); + } + + @Test + public void testInvalidatePreparedStatementOnAlterUnchangedMetadataV5() + { + testInvalidatePreparedStatementOnAlterUnchangedMetadata(ProtocolVersion.V5); + } + + private void 
testInvalidatePreparedStatementOnAlterUnchangedMetadata(ProtocolVersion version) + { + Session session = sessions.get(version); + String createTableStatement = "CREATE TABLE IF NOT EXISTS " + KEYSPACE + ".qp_cleanup (a int PRIMARY KEY, b int, c int);"; + String alterTableStatement = "ALTER TABLE " + KEYSPACE + ".qp_cleanup ADD d int;"; + + session.execute(dropKsStatement); + session.execute(createKsStatement); + session.execute(createTableStatement); + + PreparedStatement preparedSelect = session.prepare("SELECT a, b, c FROM " + KEYSPACE + ".qp_cleanup"); + session.execute("INSERT INTO " + KEYSPACE + ".qp_cleanup (a, b, c) VALUES (?, ?, ?);", + 1, 2, 3); + session.execute("INSERT INTO " + KEYSPACE + ".qp_cleanup (a, b, c) VALUES (?, ?, ?);", + 2, 3, 4); + + ResultSet rs = session.execute(preparedSelect.bind()); + + assertRowsNet(rs, + row(1, 2, 3), + row(2, 3, 4)); + assertEquals(rs.getColumnDefinitions().size(), 3); + + session.execute(alterTableStatement); + session.execute("INSERT INTO " + KEYSPACE + ".qp_cleanup (a, b, c, d) VALUES (?, ?, ?, ?);", + 3, 4, 5, 6); + + rs = session.execute(preparedSelect.bind()); + assertRowsNet(rs, + row(1, 2, 3), + row(2, 3, 4), + row(3, 4, 5)); + assertEquals(rs.getColumnDefinitions().size(), 3); + + session.execute(dropKsStatement); + } + + @Test public void testStatementRePreparationOnReconnect() { + Session session = sessions.get(ProtocolVersion.V5); + session.execute("USE " + keyspace()); + session.execute(dropKsStatement); session.execute(createKsStatement); - session.execute("CREATE TABLE IF NOT EXISTS " + KEYSPACE + ".qp_test (id int PRIMARY KEY, cid int, val text);"); + createTable("CREATE TABLE %s (id int PRIMARY KEY, cid int, val text);"); + - String insertCQL = "INSERT INTO " + KEYSPACE + ".qp_test (id, cid, val) VALUES (?, ?, ?)"; - String selectCQL = "Select * from " + KEYSPACE + ".qp_test where id = ?"; + String insertCQL = "INSERT INTO " + currentTable() + " (id, cid, val) VALUES (?, ?, ?)"; + String selectCQL = 
"Select * from " + currentTable() + " where id = ?"; PreparedStatement preparedInsert = session.prepare(insertCQL); PreparedStatement preparedSelect = session.prepare(selectCQL); @@ -117,23 +205,31 @@ public class PreparedStatementsTest extends SchemaLoader session.execute(preparedInsert.bind(1, 1, "value")); assertEquals(1, session.execute(preparedSelect.bind(1)).all().size()); - cluster.close(); - - cluster = Cluster.builder().addContactPoint("127.0.0.1") - .withPort(DatabaseDescriptor.getNativeTransportPort()) - .build(); - session = cluster.connect(); - - preparedInsert = session.prepare(insertCQL); - preparedSelect = session.prepare(selectCQL); - session.execute(preparedInsert.bind(1, 1, "value")); + try (Cluster newCluster = Cluster.builder() + .addContactPoints(nativeAddr) + .withClusterName("Test Cluster") + .withPort(nativePort) + .withoutJMXReporting() + .allowBetaProtocolVersion() + .build()) + { + try (Session newSession = newCluster.connect()) + { + newSession.execute("USE " + keyspace()); + preparedInsert = newSession.prepare(insertCQL); + preparedSelect = newSession.prepare(selectCQL); + session.execute(preparedInsert.bind(1, 1, "value")); - assertEquals(1, session.execute(preparedSelect.bind(1)).all().size()); + assertEquals(1, session.execute(preparedSelect.bind(1)).all().size()); + } + } } @Test public void prepareAndExecuteWithCustomExpressions() throws Throwable { + Session session = sessions.get(ProtocolVersion.V5); + session.execute(dropKsStatement); session.execute(createKsStatement); String table = "custom_expr_test"; @@ -163,4 +259,123 @@ public class PreparedStatementsTest extends SchemaLoader assertEquals("Bind variables cannot be used for index names", e.getMessage()); } } + + @Test + public void testPrepareWithLWT() throws Throwable + { + testPrepareWithLWT(ProtocolVersion.V4); + testPrepareWithLWT(ProtocolVersion.V5); + } + + + private void testPrepareWithLWT(ProtocolVersion version) throws Throwable + { + Session session = 
sessionNet(version); + session.execute("USE " + keyspace()); + createTable("CREATE TABLE %s (pk int, v1 int, v2 int, PRIMARY KEY (pk))"); + + PreparedStatement prepared1 = session.prepare(String.format("UPDATE %s SET v1 = ?, v2 = ? WHERE pk = 1 IF v1 = ?", currentTable())); + PreparedStatement prepared2 = session.prepare(String.format("INSERT INTO %s (pk, v1, v2) VALUES (?, 200, 300) IF NOT EXISTS", currentTable())); + execute("INSERT INTO %s (pk, v1, v2) VALUES (1,1,1)"); + execute("INSERT INTO %s (pk, v1, v2) VALUES (2,2,2)"); + + ResultSet rs; + + rs = session.execute(prepared1.bind(10, 20, 1)); + assertRowsNet(rs, + row(true)); + assertEquals(rs.getColumnDefinitions().size(), 1); + + rs = session.execute(prepared1.bind(100, 200, 1)); + assertRowsNet(rs, + row(false, 10)); + assertEquals(rs.getColumnDefinitions().size(), 2); + + rs = session.execute(prepared1.bind(30, 40, 10)); + assertRowsNet(rs, + row(true)); + assertEquals(rs.getColumnDefinitions().size(), 1); + + // Try executing the same message once again + rs = session.execute(prepared1.bind(100, 200, 1)); + assertRowsNet(rs, + row(false, 30)); + assertEquals(rs.getColumnDefinitions().size(), 2); + + rs = session.execute(prepared2.bind(1)); + assertRowsNet(rs, + row(false, 1, 30, 40)); + assertEquals(rs.getColumnDefinitions().size(), 4); + + alterTable("ALTER TABLE %s ADD v3 int;"); + + rs = session.execute(prepared2.bind(1)); + assertRowsNet(rs, + row(false, 1, 30, 40, null)); + assertEquals(rs.getColumnDefinitions().size(), 5); + + rs = session.execute(prepared2.bind(20)); + assertRowsNet(rs, + row(true)); + assertEquals(rs.getColumnDefinitions().size(), 1); + + rs = session.execute(prepared2.bind(20)); + assertRowsNet(rs, + row(false, 20, 200, 300, null)); + assertEquals(rs.getColumnDefinitions().size(), 5); + } + + @Test + public void testPrepareWithBatchLWT() throws Throwable + { + testPrepareWithBatchLWT(ProtocolVersion.V4); + testPrepareWithBatchLWT(ProtocolVersion.V5); + } + + private void 
testPrepareWithBatchLWT(ProtocolVersion version) throws Throwable + { + Session session = sessionNet(version); + session.execute("USE " + keyspace()); + createTable("CREATE TABLE %s (pk int, v1 int, v2 int, PRIMARY KEY (pk))"); + + PreparedStatement prepared1 = session.prepare("BEGIN BATCH " + + "UPDATE " + currentTable() + " SET v1 = ? WHERE pk = 1 IF v1 = ?;" + + "UPDATE " + currentTable() + " SET v2 = ? WHERE pk = 1 IF v2 = ?;" + + "APPLY BATCH;"); + PreparedStatement prepared2 = session.prepare("BEGIN BATCH " + + "INSERT INTO " + currentTable() + " (pk, v1, v2) VALUES (1, 200, 300) IF NOT EXISTS;" + + "APPLY BATCH"); + execute("INSERT INTO %s (pk, v1, v2) VALUES (1,1,1)"); + execute("INSERT INTO %s (pk, v1, v2) VALUES (2,2,2)"); + + com.datastax.driver.core.ResultSet rs; + + rs = session.execute(prepared1.bind(10, 1, 20, 1)); + assertRowsNet(rs, + row(true)); + assertEquals(rs.getColumnDefinitions().size(), 1); + + rs = session.execute(prepared1.bind(100, 1, 200, 1)); + assertRowsNet(rs, + row(false, 1, 10, 20)); + assertEquals(rs.getColumnDefinitions().size(), 4); + + // Try executing the same message once again + rs = session.execute(prepared1.bind(100, 1, 200, 1)); + assertRowsNet(rs, + row(false, 1, 10, 20)); + assertEquals(rs.getColumnDefinitions().size(), 4); + + rs = session.execute(prepared2.bind()); + assertRowsNet(rs, + row(false, 1, 10, 20)); + assertEquals(rs.getColumnDefinitions().size(), 4); + + alterTable("ALTER TABLE %s ADD v3 int;"); + + rs = session.execute(prepared2.bind()); + assertRowsNet(rs, + row(false, 1, 10, 20, null)); + assertEquals(rs.getColumnDefinitions().size(), 5); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/922dbdb6/test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java b/test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java index 228352c..7ca6ab9 100644 
--- a/test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java +++ b/test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java @@ -78,14 +78,14 @@ public class PstmtPersistenceTest extends CQLTester assertEquals(5, stmtIds.size()); assertEquals(5, QueryProcessor.preparedStatementsCount()); - Assert.assertEquals(5, numberOfStatementsOnDisk()); + assertEquals(5, numberOfStatementsOnDisk()); QueryHandler handler = ClientState.getCQLQueryHandler(); validatePstmts(stmtIds, handler); // clear prepared statements cache QueryProcessor.clearPreparedStatements(true); - Assert.assertEquals(0, QueryProcessor.preparedStatementsCount()); + assertEquals(0, QueryProcessor.preparedStatementsCount()); for (MD5Digest stmtId : stmtIds) Assert.assertNull(handler.getPrepared(stmtId)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/922dbdb6/test/unit/org/apache/cassandra/cql3/validation/entities/JsonTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/JsonTest.java b/test/unit/org/apache/cassandra/cql3/validation/entities/JsonTest.java index 2bd95be..c7a41f3 100644 --- a/test/unit/org/apache/cassandra/cql3/validation/entities/JsonTest.java +++ b/test/unit/org/apache/cassandra/cql3/validation/entities/JsonTest.java @@ -17,6 +17,7 @@ */ package org.apache.cassandra.cql3.validation.entities; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.cql3.Json; import org.apache.cassandra.cql3.CQLTester; import org.apache.cassandra.cql3.Duration; @@ -41,10 +42,17 @@ import static org.junit.Assert.fail; public class JsonTest extends CQLTester { + // This method will be run instead of the CQLTester#setUpClass @BeforeClass - public static void setUp() + public static void setUpClass() { + if (ROW_CACHE_SIZE_IN_MB > 0) + DatabaseDescriptor.setRowCacheSizeInMB(ROW_CACHE_SIZE_IN_MB); +
StorageService.instance.setPartitionerUnsafe(ByteOrderedPartitioner.instance); + + // Once per-JVM is enough + prepareServer(); } @Test http://git-wip-us.apache.org/repos/asf/cassandra/blob/922dbdb6/test/unit/org/apache/cassandra/cql3/validation/operations/SelectLimitTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/SelectLimitTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/SelectLimitTest.java index 68b2e93..5d6ffb1 100644 --- a/test/unit/org/apache/cassandra/cql3/validation/operations/SelectLimitTest.java +++ b/test/unit/org/apache/cassandra/cql3/validation/operations/SelectLimitTest.java @@ -30,11 +30,14 @@ import org.apache.cassandra.service.StorageService; public class SelectLimitTest extends CQLTester { + // This method will be run instead of the CQLTester#setUpClass @BeforeClass - public static void setUp() + public static void setUpClass() { StorageService.instance.setPartitionerUnsafe(ByteOrderedPartitioner.instance); DatabaseDescriptor.setPartitionerUnsafe(ByteOrderedPartitioner.instance); + + prepareServer(); } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/922dbdb6/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java b/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java index c27593b..817cb06 100644 --- a/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java +++ b/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java @@ -162,7 +162,7 @@ public class MessagePayloadTest extends CQLTester payloadEquals(reqMap, requestPayload); payloadEquals(respMap, prepareResponse.getCustomPayload()); - ExecuteMessage executeMessage = new ExecuteMessage(prepareResponse.statementId, QueryOptions.DEFAULT); + ExecuteMessage executeMessage = new
ExecuteMessage(prepareResponse.statementId, prepareResponse.resultMetadataId, QueryOptions.DEFAULT); reqMap = Collections.singletonMap("foo", bytes(44)); responsePayload = respMap = Collections.singletonMap("bar", bytes(44)); executeMessage.setCustomPayload(reqMap); @@ -231,7 +231,7 @@ public class MessagePayloadTest extends CQLTester payloadEquals(reqMap, requestPayload); payloadEquals(respMap, prepareResponse.getCustomPayload()); - ExecuteMessage executeMessage = new ExecuteMessage(prepareResponse.statementId, QueryOptions.DEFAULT); + ExecuteMessage executeMessage = new ExecuteMessage(prepareResponse.statementId, prepareResponse.resultMetadataId, QueryOptions.DEFAULT); reqMap = Collections.singletonMap("foo", bytes(44)); responsePayload = respMap = Collections.singletonMap("bar", bytes(44)); executeMessage.setCustomPayload(reqMap); @@ -315,7 +315,7 @@ public class MessagePayloadTest extends CQLTester prepareMessage.setCustomPayload(null); ResultMessage.Prepared prepareResponse = (ResultMessage.Prepared) client.execute(prepareMessage); - ExecuteMessage executeMessage = new ExecuteMessage(prepareResponse.statementId, QueryOptions.DEFAULT); + ExecuteMessage executeMessage = new ExecuteMessage(prepareResponse.statementId, prepareResponse.resultMetadataId, QueryOptions.DEFAULT); reqMap = Collections.singletonMap("foo", bytes(44)); responsePayload = Collections.singletonMap("bar", bytes(44)); executeMessage.setCustomPayload(reqMap); http://git-wip-us.apache.org/repos/asf/cassandra/blob/922dbdb6/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlOperation.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlOperation.java b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlOperation.java index c524107..c89a1d1 100644 --- a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlOperation.java +++ 
b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlOperation.java @@ -281,11 +281,11 @@ public abstract class CqlOperation<V> extends PredefinedOperation } @Override - public <V> V execute(Object preparedStatementId, ByteBuffer key, List<Object> queryParams, ResultHandler<V> handler) + public <V> V execute(Object preparedStatement, ByteBuffer key, List<Object> queryParams, ResultHandler<V> handler) { return handler.javaDriverHandler().apply( client.executePrepared( - (PreparedStatement) preparedStatementId, + (PreparedStatement) preparedStatement, queryParams, settings.command.consistencyLevel)); } @@ -313,11 +313,11 @@ public abstract class CqlOperation<V> extends PredefinedOperation } @Override - public <V> V execute(Object preparedStatementId, ByteBuffer key, List<Object> queryParams, ResultHandler<V> handler) + public <V> V execute(Object preparedStatement, ByteBuffer key, List<Object> queryParams, ResultHandler<V> handler) { return handler.simpleClientHandler().apply( client.executePrepared( - (byte[]) preparedStatementId, + (ResultMessage.Prepared) preparedStatement, toByteBufferParams(queryParams), settings.command.consistencyLevel)); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org