Repository: cassandra
Updated Branches:
  refs/heads/trunk df147cc09 -> 922dbdb65


Add result set metadata to prepared statement MD5 hash calculation

Patch by Alex Petrov; reviewed by Robert Stupp for CASSANDRA-10786

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/922dbdb6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/922dbdb6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/922dbdb6

Branch: refs/heads/trunk
Commit: 922dbdb658b1693973926026b213153d05b4077c
Parents: df147cc
Author: Alex Petrov <oleksandr.pet...@gmail.com>
Authored: Fri May 13 14:34:03 2016 +0200
Committer: Alex Petrov <oleksandr.pet...@gmail.com>
Committed: Wed Oct 11 16:15:29 2017 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 build.xml                                       |  12 +-
 doc/native_protocol_v5.spec                     |  27 +-
 lib/cassandra-driver-core-3.0.1-shaded.jar      | Bin 2445093 -> 0 bytes
 ...e-3.4.0-3d1e4f3-java1196-SNAPSHOT-shaded.jar | Bin 0 -> 2613656 bytes
 lib/cassandra-driver-internal-only-3.10.zip     | Bin 256997 -> 0 bytes
 lib/cassandra-driver-internal-only-3.11.zip     | Bin 0 -> 264882 bytes
 .../apache/cassandra/cql3/QueryProcessor.java   |  13 +-
 .../org/apache/cassandra/cql3/ResultSet.java    | 122 ++++++-
 .../cql3/selection/SelectionColumnMapping.java  |   4 +-
 .../statements/ListPermissionsStatement.java    |   3 +-
 .../cql3/statements/ListRolesStatement.java     |   3 +-
 .../cql3/statements/ListUsersStatement.java     |   4 +-
 .../cql3/statements/ModificationStatement.java  |  16 +-
 .../cql3/statements/ParsedStatement.java        |   8 +-
 .../org/apache/cassandra/transport/Client.java  |   6 +-
 .../cassandra/transport/SimpleClient.java       |   4 +-
 .../transport/messages/ExecuteMessage.java      |  48 ++-
 .../transport/messages/ResultMessage.java       |  39 ++-
 .../org/apache/cassandra/cql3/CQLTester.java    |  20 +-
 .../cassandra/cql3/PreparedStatementsTest.java  | 317 ++++++++++++++++---
 .../cassandra/cql3/PstmtPersistenceTest.java    |   4 +-
 .../cql3/validation/entities/JsonTest.java      |  10 +-
 .../validation/operations/SelectLimitTest.java  |   5 +-
 .../cassandra/transport/MessagePayloadTest.java |   6 +-
 .../operations/predefined/CqlOperation.java     |   8 +-
 26 files changed, 540 insertions(+), 140 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/922dbdb6/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2454c4f..3f2fe15 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Add result set metadata to prepared statement MD5 hash calculation 
(CASSANDRA-10786)
  * Refactor GcCompactionTest to avoid boxing (CASSANDRA-13941)
  * Checksum sstable metadata (CASSANDRA-13321)
  * Expose recent histograms in JmxHistograms (CASSANDRA-13642)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/922dbdb6/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index 4e7f9b1..c657211 100644
--- a/build.xml
+++ b/build.xml
@@ -298,7 +298,7 @@
       <!-- define the remote repositories we use -->
       <artifact:remoteRepository id="central"   
url="${artifact.remoteRepository.central}"/>
       <artifact:remoteRepository id="apache"    
url="${artifact.remoteRepository.apache}"/>
-
+      
       <macrodef name="install">
         <attribute name="pomFile"/>
         <attribute name="file"/>
@@ -423,6 +423,7 @@
           <dependency groupId="io.netty" artifactId="netty-all" 
version="4.1.14.Final" />
           <dependency groupId="com.google.code.findbugs" artifactId="jsr305" 
version="2.0.2" />
           <dependency groupId="com.clearspring.analytics" artifactId="stream" 
version="2.5.2" />
+         <!-- UPDATE AND UNCOMMENT ON THE DRIVER RELEASE, BEFORE 4.0 RELEASE
           <dependency groupId="com.datastax.cassandra" 
artifactId="cassandra-driver-core" version="3.0.1" classifier="shaded">
             <exclusion groupId="io.netty" artifactId="netty-buffer"/>
             <exclusion groupId="io.netty" artifactId="netty-codec"/>
@@ -430,6 +431,7 @@
             <exclusion groupId="io.netty" artifactId="netty-transport"/>
             <exclusion groupId="org.slf4j" artifactId="slf4j-api"/>
           </dependency>
+         -->
           <dependency groupId="org.eclipse.jdt.core.compiler" artifactId="ecj" 
version="4.4.2" />
           <dependency groupId="org.caffinitas.ohc" artifactId="ohc-core" 
version="0.4.4">
             <exclusion groupId="org.slf4j" artifactId="slf4j-api"/>
@@ -506,7 +508,9 @@
        <dependency groupId="org.apache.hadoop" 
artifactId="hadoop-minicluster"/>
        <dependency groupId="com.google.code.findbugs" artifactId="jsr305"/>
         <dependency groupId="org.antlr" artifactId="antlr"/>
+       <!-- UPDATE AND UNCOMMENT ON THE DRIVER RELEASE, BEFORE 4.0 RELEASE     
         <dependency groupId="com.datastax.cassandra" 
artifactId="cassandra-driver-core" classifier="shaded"/>
+       -->
         <dependency groupId="org.eclipse.jdt.core.compiler" artifactId="ecj"/>
         <dependency groupId="org.caffinitas.ohc" artifactId="ohc-core"/>
         <dependency groupId="org.caffinitas.ohc" artifactId="ohc-core-j8"/>
@@ -523,7 +527,9 @@
                 artifactId="cassandra-parent"
                 version="${version}"/>
         <dependency groupId="junit" artifactId="junit"/>
+       <!-- UPDATE AND UNCOMMENT ON THE DRIVER RELEASE, BEFORE 4.0 RELEASE
         <dependency groupId="com.datastax.cassandra" 
artifactId="cassandra-driver-core" classifier="shaded"/>
+       -->     
         <dependency groupId="io.netty" artifactId="netty-all"/>
         <dependency groupId="org.eclipse.jdt.core.compiler" artifactId="ecj"/>
         <dependency groupId="org.caffinitas.ohc" artifactId="ohc-core"/>
@@ -596,13 +602,15 @@
         <dependency groupId="org.apache.hadoop" 
artifactId="hadoop-minicluster" optional="true"/>
 
         <!-- don't need the Java Driver to run, but if you use the hadoop 
stuff or UDFs -->
+       <!-- UPDATE AND UNCOMMENT ON THE DRIVER RELEASE, BEFORE 4.0 RELEASE
         <dependency groupId="com.datastax.cassandra" 
artifactId="cassandra-driver-core" classifier="shaded" optional="true">
           <exclusion groupId="io.netty" artifactId="netty-buffer"/>
           <exclusion groupId="io.netty" artifactId="netty-codec"/>
           <exclusion groupId="io.netty" artifactId="netty-handler"/>
           <exclusion groupId="io.netty" artifactId="netty-transport"/>
         </dependency>
-
+       -->
+       
         <!-- don't need jna to run, but nice to have -->
         <dependency groupId="net.java.dev.jna" artifactId="jna"/>
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/922dbdb6/doc/native_protocol_v5.spec
----------------------------------------------------------------------
diff --git a/doc/native_protocol_v5.spec b/doc/native_protocol_v5.spec
index 13ac208..0addbc4 100644
--- a/doc/native_protocol_v5.spec
+++ b/doc/native_protocol_v5.spec
@@ -407,12 +407,15 @@ Table of Contents
 4.1.6. EXECUTE
 
   Executes a prepared query. The body of the message must be:
-    <id><query_parameters>
-  where <id> is the prepared query ID. It's the [short bytes] returned as a
-  response to a PREPARE message. As for <query_parameters>, it has the exact
-  same definition as in QUERY (see Section 4.1.4).
-
-  The response from the server will be a RESULT message.
+  <id><result_metadata_id><query_parameters>
+  where
+  - <id> is the prepared query ID. It's the [short bytes] returned as a
+      response to a PREPARE message. As for <query_parameters>, it has the 
exact
+      same definition as in QUERY (see Section 4.1.4).
+    - <result_metadata_id> is the ID of the resultset metadata that was sent
+      along with response to PREPARE message. If a RESULT/Rows message reports
+      changed resultset metadata with the Metadata_changed flag, the reported 
new
+      resultset metadata must be used in subsequent executions.
 
 
 4.1.7. BATCH
@@ -583,7 +586,7 @@ Table of Contents
     <metadata><rows_count><rows_content>
   where:
     - <metadata> is composed of:
-        
<flags><columns_count>[<paging_state>][<global_table_spec>?<col_spec_1>...<col_spec_n>]
+        
<flags><columns_count>[<new_metadata_id>][<paging_state>][<global_table_spec>?<col_spec_1>...<col_spec_n>]
       where:
         - <flags> is an [int]. The bits of <flags> provides information on the
           formatting of the remaining information. A flag is set if the bit
@@ -604,9 +607,16 @@ Table of Contents
                       no other information (so no <global_table_spec> nor 
<col_spec_i>).
                       This will only ever be the case if this was requested
                       during the query (see QUERY and RESULT messages).
+            0x0008    Metadata_changed: if set, the No_metadata flag has to be 
unset
+                      and <new_metadata_id> has to be supplied. This flag is 
to be
+                      used to avoid a roundtrip in case of metadata changes 
for queries
+                      that requested metadata to be skipped.
         - <columns_count> is an [int] representing the number of columns 
selected
           by the query that produced this result. It defines the number of 
<col_spec_i>
           elements in and the number of elements for each row in 
<rows_content>.
+        - <new_metadata_id> is [short bytes] representing the new, changed 
resultset
+           metadata. The new metadata ID must also be used in subsequent 
executions of
+           the corresponding prepared statement, if any.
         - <global_table_spec> is present if the Global_tables_spec is set in
           <flags>. It is composed of two [string] representing the
           (unique) keyspace name and table name the columns belong to.
@@ -688,9 +698,10 @@ Table of Contents
 4.2.5.4. Prepared
 
   The result to a PREPARE message. The body of a Prepared result is:
-    <id><metadata><result_metadata>
+    <id><result_metadata_id><metadata><result_metadata>
   where:
     - <id> is [short bytes] representing the prepared query ID.
+    - <result_metadata_id> is [short bytes] representing the resultset 
metadata ID.
     - <metadata> is composed of:
         
<flags><columns_count><pk_count>[<pk_index_1>...<pk_index_n>][<global_table_spec>?<col_spec_1>...<col_spec_n>]
       where:

http://git-wip-us.apache.org/repos/asf/cassandra/blob/922dbdb6/lib/cassandra-driver-core-3.0.1-shaded.jar
----------------------------------------------------------------------
diff --git a/lib/cassandra-driver-core-3.0.1-shaded.jar 
b/lib/cassandra-driver-core-3.0.1-shaded.jar
deleted file mode 100644
index bc269a0..0000000
Binary files a/lib/cassandra-driver-core-3.0.1-shaded.jar and /dev/null differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/922dbdb6/lib/cassandra-driver-core-3.4.0-3d1e4f3-java1196-SNAPSHOT-shaded.jar
----------------------------------------------------------------------
diff --git 
a/lib/cassandra-driver-core-3.4.0-3d1e4f3-java1196-SNAPSHOT-shaded.jar 
b/lib/cassandra-driver-core-3.4.0-3d1e4f3-java1196-SNAPSHOT-shaded.jar
new file mode 100644
index 0000000..d95a811
Binary files /dev/null and 
b/lib/cassandra-driver-core-3.4.0-3d1e4f3-java1196-SNAPSHOT-shaded.jar differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/922dbdb6/lib/cassandra-driver-internal-only-3.10.zip
----------------------------------------------------------------------
diff --git a/lib/cassandra-driver-internal-only-3.10.zip 
b/lib/cassandra-driver-internal-only-3.10.zip
deleted file mode 100644
index 22b877c..0000000
Binary files a/lib/cassandra-driver-internal-only-3.10.zip and /dev/null differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/922dbdb6/lib/cassandra-driver-internal-only-3.11.zip
----------------------------------------------------------------------
diff --git a/lib/cassandra-driver-internal-only-3.11.zip 
b/lib/cassandra-driver-internal-only-3.11.zip
new file mode 100644
index 0000000..f7760af
Binary files /dev/null and b/lib/cassandra-driver-internal-only-3.11.zip differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/922dbdb6/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java 
b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index ade98e7..3f0b196 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -28,8 +28,7 @@ import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Predicate;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Iterators;
+import com.google.common.collect.*;
 import com.google.common.primitives.Ints;
 import com.google.common.util.concurrent.MoreExecutors;
 import org.slf4j.Logger;
@@ -420,7 +419,10 @@ public class QueryProcessor implements QueryHandler
 
         checkTrue(queryString.equals(existing.rawCQLStatement),
                 String.format("MD5 hash collision: query with the same MD5 
hash was already prepared. \n Existing: '%s'", existing.rawCQLStatement));
-        return new ResultMessage.Prepared(statementId, existing);
+
+        ResultSet.PreparedMetadata preparedMetadata = 
ResultSet.PreparedMetadata.fromPrepared(existing);
+        ResultSet.ResultMetadata resultMetadata = 
ResultSet.ResultMetadata.fromPrepared(existing);
+        return new ResultMessage.Prepared(statementId, 
resultMetadata.getResultMetadataId(), preparedMetadata, resultMetadata);
     }
 
     private static ResultMessage.Prepared storePreparedStatement(String 
queryString, String keyspace, ParsedStatement.Prepared prepared)
@@ -438,7 +440,9 @@ public class QueryProcessor implements QueryHandler
         MD5Digest statementId = computeId(queryString, keyspace);
         preparedStatements.put(statementId, prepared);
         SystemKeyspace.writePreparedStatement(keyspace, statementId, 
queryString);
-        return new ResultMessage.Prepared(statementId, prepared);
+        ResultSet.PreparedMetadata preparedMetadata = 
ResultSet.PreparedMetadata.fromPrepared(prepared);
+        ResultSet.ResultMetadata resultMetadata = 
ResultSet.ResultMetadata.fromPrepared(prepared);
+        return new ResultMessage.Prepared(statementId, 
resultMetadata.getResultMetadataId(), preparedMetadata, resultMetadata);
     }
 
     public ResultMessage processPrepared(CQLStatement statement,
@@ -464,7 +468,6 @@ public class QueryProcessor implements QueryHandler
                                                                 
variables.size()));
 
             // at this point there is a match in count between markers and 
variables that is non-zero
-
             if (logger.isTraceEnabled())
                 for (int i = 0; i < variables.size(); i++)
                     logger.trace("[{}] '{}'", i+1, variables.get(i));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/922dbdb6/src/java/org/apache/cassandra/cql3/ResultSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/ResultSet.java 
b/src/java/org/apache/cassandra/cql3/ResultSet.java
index 2bb9997..e4b03ca 100644
--- a/src/java/org/apache/cassandra/cql3/ResultSet.java
+++ b/src/java/org/apache/cassandra/cql3/ResultSet.java
@@ -18,14 +18,29 @@
 package org.apache.cassandra.cql3;
 
 import java.nio.ByteBuffer;
-import java.util.*;
+import java.nio.charset.StandardCharsets;
+import java.security.MessageDigest;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Objects;
 
 import io.netty.buffer.ByteBuf;
-
-import org.apache.cassandra.transport.*;
+import org.apache.cassandra.cql3.statements.ModificationStatement;
+import org.apache.cassandra.cql3.statements.ParsedStatement;
+import org.apache.cassandra.cql3.statements.SelectStatement;
 import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.service.pager.PagingState;
+import org.apache.cassandra.transport.CBCodec;
+import org.apache.cassandra.transport.CBUtil;
+import org.apache.cassandra.transport.DataType;
+import org.apache.cassandra.transport.ProtocolVersion;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MD5Digest;
 
 public class ResultSet
 {
@@ -34,14 +49,14 @@ public class ResultSet
     public final ResultMetadata metadata;
     public final List<List<ByteBuffer>> rows;
 
-    public ResultSet(List<ColumnSpecification> metadata)
+    public ResultSet(ResultMetadata resultMetadata)
     {
-        this(new ResultMetadata(metadata), new ArrayList<List<ByteBuffer>>());
+        this(resultMetadata, new ArrayList<List<ByteBuffer>>());
     }
 
-    public ResultSet(ResultMetadata metadata, List<List<ByteBuffer>> rows)
+    public ResultSet(ResultMetadata resultMetadata, List<List<ByteBuffer>> 
rows)
     {
-        this.metadata = metadata;
+        this.metadata = resultMetadata;
         this.rows = rows;
     }
 
@@ -179,7 +194,7 @@ public class ResultSet
     {
         public static final CBCodec<ResultMetadata> codec = new Codec();
 
-        public static final ResultMetadata EMPTY = new 
ResultMetadata(EnumSet.of(Flag.NO_METADATA), null, 0, null);
+        public static final ResultMetadata EMPTY = new 
ResultMetadata(MD5Digest.compute(new byte[0]), EnumSet.of(Flag.NO_METADATA), 
null, 0, null);
 
         private final EnumSet<Flag> flags;
         // Please note that columnCount can actually be smaller than names, 
even if names is not null. This is
@@ -189,16 +204,27 @@ public class ResultSet
         public final List<ColumnSpecification> names;
         private final int columnCount;
         private PagingState pagingState;
+        private final MD5Digest resultMetadataId;
+
+        public ResultMetadata(MD5Digest digest, List<ColumnSpecification> 
names)
+        {
+            this(digest, EnumSet.noneOf(Flag.class), names, names.size(), 
null);
+            if (!names.isEmpty() && ColumnSpecification.allInSameTable(names))
+                flags.add(Flag.GLOBAL_TABLES_SPEC);
+        }
 
+        // Problem is that we compute the metadata from the columns on 
creation;
+        // when re-preparing we create the intermediate object
         public ResultMetadata(List<ColumnSpecification> names)
         {
-            this(EnumSet.noneOf(Flag.class), names, names.size(), null);
+            this(computeResultMetadataId(names), EnumSet.noneOf(Flag.class), 
names, names.size(), null);
             if (!names.isEmpty() && ColumnSpecification.allInSameTable(names))
                 flags.add(Flag.GLOBAL_TABLES_SPEC);
         }
 
-        private ResultMetadata(EnumSet<Flag> flags, List<ColumnSpecification> 
names, int columnCount, PagingState pagingState)
+        private ResultMetadata(MD5Digest resultMetadataId, EnumSet<Flag> 
flags, List<ColumnSpecification> names, int columnCount, PagingState 
pagingState)
         {
+            this.resultMetadataId = resultMetadataId;
             this.flags = flags;
             this.names = names;
             this.columnCount = columnCount;
@@ -207,7 +233,7 @@ public class ResultSet
 
         public ResultMetadata copy()
         {
-            return new ResultMetadata(EnumSet.copyOf(flags), names, 
columnCount, pagingState);
+            return new ResultMetadata(resultMetadataId, EnumSet.copyOf(flags), 
names, columnCount, pagingState);
         }
 
         /**
@@ -252,6 +278,26 @@ public class ResultSet
             flags.add(Flag.NO_METADATA);
         }
 
+        public void setMetadataChanged()
+        {
+            flags.add(Flag.METADATA_CHANGED);
+        }
+
+        public MD5Digest getResultMetadataId()
+        {
+            return resultMetadataId;
+        }
+
+        public static ResultMetadata fromPrepared(ParsedStatement.Prepared 
prepared)
+        {
+            CQLStatement statement = prepared.statement;
+
+            if (statement instanceof SelectStatement)
+                return ((SelectStatement)statement).getResultMetadata();
+
+            return ResultSet.ResultMetadata.EMPTY;
+        }
+
         @Override
         public boolean equals(Object other)
         {
@@ -308,12 +354,21 @@ public class ResultSet
 
                 EnumSet<Flag> flags = Flag.deserialize(iflags);
 
+                MD5Digest resultMetadataId = null;
+                if (flags.contains(Flag.METADATA_CHANGED))
+                {
+                    assert version.isGreaterOrEqualTo(ProtocolVersion.V5) : 
"MetadataChanged flag is not supported before native protocol v5";
+                    assert !flags.contains(Flag.NO_METADATA) : 
"MetadataChanged and NoMetadata are mutually exclusive flags";
+
+                    resultMetadataId = MD5Digest.wrap(CBUtil.readBytes(body));
+                }
+
                 PagingState state = null;
                 if (flags.contains(Flag.HAS_MORE_PAGES))
                     state = 
PagingState.deserialize(CBUtil.readValueNoCopy(body), version);
 
                 if (flags.contains(Flag.NO_METADATA))
-                    return new ResultMetadata(flags, null, columnCount, state);
+                    return new ResultMetadata(null, flags, null, columnCount, 
state);
 
                 boolean globalTablesSpec = 
flags.contains(Flag.GLOBAL_TABLES_SPEC);
 
@@ -335,7 +390,7 @@ public class ResultSet
                     AbstractType type = 
DataType.toType(DataType.codec.decodeOne(body, version));
                     names.add(new ColumnSpecification(ksName, cfName, colName, 
type));
                 }
-                return new ResultMetadata(flags, names, names.size(), state);
+                return new ResultMetadata(resultMetadataId, flags, names, 
names.size(), state);
             }
 
             public void encode(ResultMetadata m, ByteBuf dest, ProtocolVersion 
version)
@@ -343,7 +398,7 @@ public class ResultSet
                 boolean noMetadata = m.flags.contains(Flag.NO_METADATA);
                 boolean globalTablesSpec = 
m.flags.contains(Flag.GLOBAL_TABLES_SPEC);
                 boolean hasMorePages = m.flags.contains(Flag.HAS_MORE_PAGES);
-
+                boolean metadataChanged = 
m.flags.contains(Flag.METADATA_CHANGED);
                 assert version.isGreaterThan(ProtocolVersion.V1) || 
(!hasMorePages && !noMetadata)
                     : "version = " + version + ", flags = " + m.flags;
 
@@ -353,6 +408,12 @@ public class ResultSet
                 if (hasMorePages)
                     CBUtil.writeValue(m.pagingState.serialize(version), dest);
 
+                if (version.isGreaterOrEqualTo(ProtocolVersion.V5)  && 
metadataChanged)
+                {
+                    assert !noMetadata : "MetadataChanged and NoMetadata are 
mutually exclusive flags";
+                    CBUtil.writeBytes(m.getResultMetadataId().bytes, dest);
+                }
+
                 if (!noMetadata)
                 {
                     if (globalTablesSpec)
@@ -380,11 +441,15 @@ public class ResultSet
                 boolean noMetadata = m.flags.contains(Flag.NO_METADATA);
                 boolean globalTablesSpec = 
m.flags.contains(Flag.GLOBAL_TABLES_SPEC);
                 boolean hasMorePages = m.flags.contains(Flag.HAS_MORE_PAGES);
+                boolean metadataChanged = 
m.flags.contains(Flag.METADATA_CHANGED);
 
                 int size = 8;
                 if (hasMorePages)
                     size += 
CBUtil.sizeOfValue(m.pagingState.serializedSize(version));
 
+                if (version.isGreaterOrEqualTo(ProtocolVersion.V5) && 
metadataChanged)
+                    size += CBUtil.sizeOfBytes(m.getResultMetadataId().bytes);
+
                 if (!noMetadata)
                 {
                     if (globalTablesSpec)
@@ -486,6 +551,11 @@ public class ResultSet
             return sb.toString();
         }
 
+        public static PreparedMetadata fromPrepared(ParsedStatement.Prepared 
prepared)
+        {
+            return new PreparedMetadata(prepared.boundNames, 
prepared.partitionKeyBindIndexes);
+        }
+
         private static class Codec implements CBCodec<PreparedMetadata>
         {
             public PreparedMetadata decode(ByteBuf body, ProtocolVersion 
version)
@@ -603,7 +673,8 @@ public class ResultSet
         // The order of that enum matters!!
         GLOBAL_TABLES_SPEC,
         HAS_MORE_PAGES,
-        NO_METADATA;
+        NO_METADATA,
+        METADATA_CHANGED;
 
         public static EnumSet<Flag> deserialize(int flags)
         {
@@ -625,4 +696,23 @@ public class ResultSet
             return i;
         }
     }
+
+    public static MD5Digest computeResultMetadataId(List<ColumnSpecification> 
columnSpecifications)
+    {
+        MessageDigest md = FBUtilities.threadLocalMD5Digest();
+
+        if (columnSpecifications != null)
+        {
+            for (ColumnSpecification cs : columnSpecifications)
+            {
+                md.update(cs.name.bytes.duplicate());
+                md.update((byte) 0);
+                md.update(cs.type.toString().getBytes(StandardCharsets.UTF_8));
+                md.update((byte) 0);
+                md.update((byte) 0);
+            }
+        }
+
+        return MD5Digest.wrap(md.digest());
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/922dbdb6/src/java/org/apache/cassandra/cql3/selection/SelectionColumnMapping.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/selection/SelectionColumnMapping.java 
b/src/java/org/apache/cassandra/cql3/selection/SelectionColumnMapping.java
index cd04d94..5d3727f 100644
--- a/src/java/org/apache/cassandra/cql3/selection/SelectionColumnMapping.java
+++ b/src/java/org/apache/cassandra/cql3/selection/SelectionColumnMapping.java
@@ -37,8 +37,8 @@ import org.apache.cassandra.cql3.ColumnSpecification;
  */
 public class SelectionColumnMapping implements SelectionColumns
 {
-    private final ArrayList<ColumnSpecification> columnSpecifications;
-    private final HashMultimap<ColumnSpecification, ColumnMetadata> 
columnMappings;
+    private final List<ColumnSpecification> columnSpecifications;
+    private final Multimap<ColumnSpecification, ColumnMetadata> columnMappings;
 
     private SelectionColumnMapping()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/922dbdb6/src/java/org/apache/cassandra/cql3/statements/ListPermissionsStatement.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/ListPermissionsStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/ListPermissionsStatement.java
index be7fb5d..aa2157a 100644
--- 
a/src/java/org/apache/cassandra/cql3/statements/ListPermissionsStatement.java
+++ 
b/src/java/org/apache/cassandra/cql3/statements/ListPermissionsStatement.java
@@ -118,7 +118,8 @@ public class ListPermissionsStatement extends 
AuthorizationStatement
         if (details.isEmpty())
             return new ResultMessage.Void();
 
-        ResultSet result = new ResultSet(metadata);
+        ResultSet.ResultMetadata resultMetadata = new 
ResultSet.ResultMetadata(metadata);
+        ResultSet result = new ResultSet(resultMetadata);
         for (PermissionDetails pd : details)
         {
             result.addColumnValue(UTF8Type.instance.decompose(pd.grantee));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/922dbdb6/src/java/org/apache/cassandra/cql3/statements/ListRolesStatement.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/ListRolesStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/ListRolesStatement.java
index 0c0822c..7ed460c 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ListRolesStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ListRolesStatement.java
@@ -112,7 +112,8 @@ public class ListRolesStatement extends 
AuthorizationStatement
     // overridden in ListUsersStatement to include legacy metadata
     protected ResultMessage formatResults(List<RoleResource> sortedRoles)
     {
-        ResultSet result = new ResultSet(metadata);
+        ResultSet.ResultMetadata resultMetadata = new 
ResultSet.ResultMetadata(metadata);
+        ResultSet result = new ResultSet(resultMetadata);
 
         IRoleManager roleManager = DatabaseDescriptor.getRoleManager();
         for (RoleResource role : sortedRoles)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/922dbdb6/src/java/org/apache/cassandra/cql3/statements/ListUsersStatement.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/ListUsersStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/ListUsersStatement.java
index 9641333..1347fba 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ListUsersStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ListUsersStatement.java
@@ -44,7 +44,8 @@ public class ListUsersStatement extends ListRolesStatement
     @Override
     protected ResultMessage formatResults(List<RoleResource> sortedRoles)
     {
-        ResultSet result = new ResultSet(metadata);
+        ResultSet.ResultMetadata resultMetadata = new 
ResultSet.ResultMetadata(metadata);
+        ResultSet result = new ResultSet(resultMetadata);
 
         IRoleManager roleManager = DatabaseDescriptor.getRoleManager();
         for (RoleResource role : sortedRoles)
@@ -54,6 +55,7 @@ public class ListUsersStatement extends ListRolesStatement
             
result.addColumnValue(UTF8Type.instance.decompose(role.getRoleName()));
             
result.addColumnValue(BooleanType.instance.decompose(Roles.hasSuperuserStatus(role)));
         }
+
         return new ResultMessage.Rows(result);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/922dbdb6/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index caa24b2..4191285 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -497,6 +497,19 @@ public abstract class ModificationStatement implements 
CQLStatement
         conditions.addConditionsTo(request, clustering, options);
     }
 
+    private static ResultSet.ResultMetadata buildCASSuccessMetadata(String 
ksName, String cfName)
+    {
+        List<ColumnSpecification> specs = new ArrayList<>();
+        specs.add(casResultColumnSpecification(ksName, cfName));
+
+        return new ResultSet.ResultMetadata(specs);
+    }
+
+    private static ColumnSpecification casResultColumnSpecification(String 
ksName, String cfName)
+    {
+        return new ColumnSpecification(ksName, cfName, CAS_RESULT_COLUMN, 
BooleanType.instance);
+    }
+
     private ResultSet buildCasResultSet(RowIterator partition, QueryOptions 
options) throws InvalidRequestException
     {
         return buildCasResultSet(keyspace(), columnFamily(), partition, 
getColumnsWithConditions(), false, options);
@@ -507,8 +520,7 @@ public abstract class ModificationStatement implements 
CQLStatement
     {
         boolean success = partition == null;
 
-        ColumnSpecification spec = new ColumnSpecification(ksName, tableName, 
CAS_RESULT_COLUMN, BooleanType.instance);
-        ResultSet.ResultMetadata metadata = new 
ResultSet.ResultMetadata(Collections.singletonList(spec));
+        ResultSet.ResultMetadata metadata = buildCASSuccessMetadata(ksName, 
tableName);
         List<List<ByteBuffer>> rows = 
Collections.singletonList(Collections.singletonList(BooleanType.instance.decompose(success)));
 
         ResultSet rs = new ResultSet(metadata, rows);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/922dbdb6/src/java/org/apache/cassandra/cql3/statements/ParsedStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ParsedStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/ParsedStatement.java
index e617ba7..34bfc3d 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ParsedStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ParsedStatement.java
@@ -17,12 +17,12 @@
  */
 package org.apache.cassandra.cql3.statements;
 
-import java.util.Collections;
-import java.util.List;
+import java.util.*;
 
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.cql3.functions.Function;
 import org.apache.cassandra.exceptions.RequestValidationException;
+import org.apache.cassandra.utils.*;
 
 public abstract class ParsedStatement
 {
@@ -56,8 +56,9 @@ public abstract class ParsedStatement
          */
         public String rawCQLStatement;
 
-        public final CQLStatement statement;
+        public final MD5Digest resultMetadataId;
         public final List<ColumnSpecification> boundNames;
+        public final CQLStatement statement;
         public final short[] partitionKeyBindIndexes;
 
         protected Prepared(CQLStatement statement, List<ColumnSpecification> 
boundNames, short[] partitionKeyBindIndexes)
@@ -65,6 +66,7 @@ public abstract class ParsedStatement
             this.statement = statement;
             this.boundNames = boundNames;
             this.partitionKeyBindIndexes = partitionKeyBindIndexes;
+            this.resultMetadataId = 
ResultSet.ResultMetadata.fromPrepared(this).getResultMetadataId();
             this.rawCQLStatement = "";
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/922dbdb6/src/java/org/apache/cassandra/transport/Client.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Client.java 
b/src/java/org/apache/cassandra/transport/Client.java
index 7fec473..4793d17 100644
--- a/src/java/org/apache/cassandra/transport/Client.java
+++ b/src/java/org/apache/cassandra/transport/Client.java
@@ -147,7 +147,9 @@ public class Client extends SimpleClient
         {
             try
             {
-                byte[] id = Hex.hexToBytes(iter.next());
+                byte[] preparedStatementId = Hex.hexToBytes(iter.next());
+                byte[] resultMetadataId = Hex.hexToBytes(iter.next());
+
                 List<ByteBuffer> values = new ArrayList<ByteBuffer>();
                 while(iter.hasNext())
                 {
@@ -164,7 +166,7 @@ public class Client extends SimpleClient
                     }
                     values.add(bb);
                 }
-                return new ExecuteMessage(MD5Digest.wrap(id), 
QueryOptions.forInternalCalls(ConsistencyLevel.ONE, values));
+                return new ExecuteMessage(MD5Digest.wrap(preparedStatementId), 
MD5Digest.wrap(resultMetadataId), 
QueryOptions.forInternalCalls(ConsistencyLevel.ONE, values));
             }
             catch (Exception e)
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/922dbdb6/src/java/org/apache/cassandra/transport/SimpleClient.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/SimpleClient.java 
b/src/java/org/apache/cassandra/transport/SimpleClient.java
index d5148ab..ddd3484 100644
--- a/src/java/org/apache/cassandra/transport/SimpleClient.java
+++ b/src/java/org/apache/cassandra/transport/SimpleClient.java
@@ -184,9 +184,9 @@ public class SimpleClient implements Closeable
         return (ResultMessage.Prepared)msg;
     }
 
-    public ResultMessage executePrepared(byte[] statementId, List<ByteBuffer> 
values, ConsistencyLevel consistency)
+    public ResultMessage executePrepared(ResultMessage.Prepared prepared, 
List<ByteBuffer> values, ConsistencyLevel consistency)
     {
-        Message.Response msg = execute(new 
ExecuteMessage(MD5Digest.wrap(statementId), 
QueryOptions.forInternalCalls(consistency, values)));
+        Message.Response msg = execute(new 
ExecuteMessage(prepared.statementId, prepared.resultMetadataId, 
QueryOptions.forInternalCalls(consistency, values)));
         assert msg instanceof ResultMessage;
         return (ResultMessage)msg;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/922dbdb6/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java 
b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
index d881e63..a8fd2a0 100644
--- a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
@@ -26,7 +26,9 @@ import org.apache.cassandra.cql3.CQLStatement;
 import org.apache.cassandra.cql3.ColumnSpecification;
 import org.apache.cassandra.cql3.QueryHandler;
 import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.ResultSet;
 import org.apache.cassandra.cql3.statements.ParsedStatement;
+import org.apache.cassandra.cql3.statements.UpdateStatement;
 import org.apache.cassandra.exceptions.PreparedQueryNotFoundException;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.QueryState;
@@ -42,13 +44,22 @@ public class ExecuteMessage extends Message.Request
     {
         public ExecuteMessage decode(ByteBuf body, ProtocolVersion version)
         {
-            byte[] id = CBUtil.readBytes(body);
-            return new ExecuteMessage(MD5Digest.wrap(id), 
QueryOptions.codec.decode(body, version));
+            MD5Digest statementId = MD5Digest.wrap(CBUtil.readBytes(body));
+
+            MD5Digest resultMetadataId = null;
+            if (version.isGreaterOrEqualTo(ProtocolVersion.V5))
+                resultMetadataId = MD5Digest.wrap(CBUtil.readBytes(body));
+
+            return new ExecuteMessage(statementId, resultMetadataId, 
QueryOptions.codec.decode(body, version));
         }
 
         public void encode(ExecuteMessage msg, ByteBuf dest, ProtocolVersion 
version)
         {
             CBUtil.writeBytes(msg.statementId.bytes, dest);
+
+            if (version.isGreaterOrEqualTo(ProtocolVersion.V5))
+                CBUtil.writeBytes(msg.resultMetadataId.bytes, dest);
+
             if (version == ProtocolVersion.V1)
             {
                 CBUtil.writeValueList(msg.options.getValues(), dest);
@@ -64,6 +75,10 @@ public class ExecuteMessage extends Message.Request
         {
             int size = 0;
             size += CBUtil.sizeOfBytes(msg.statementId.bytes);
+
+            if (version.isGreaterOrEqualTo(ProtocolVersion.V5))
+                size += CBUtil.sizeOfBytes(msg.resultMetadataId.bytes);
+
             if (version == ProtocolVersion.V1)
             {
                 size += CBUtil.sizeOfValueList(msg.options.getValues());
@@ -78,13 +93,15 @@ public class ExecuteMessage extends Message.Request
     };
 
     public final MD5Digest statementId;
+    public final MD5Digest resultMetadataId;
     public final QueryOptions options;
 
-    public ExecuteMessage(MD5Digest statementId, QueryOptions options)
+    public ExecuteMessage(MD5Digest statementId, MD5Digest resultMetadataId, 
QueryOptions options)
     {
         super(Message.Type.EXECUTE);
         this.statementId = statementId;
         this.options = options;
+        this.resultMetadataId = resultMetadataId;
     }
 
     public Message.Response execute(QueryState state, long queryStartNanoTime)
@@ -144,8 +161,29 @@ public class ExecuteMessage extends Message.Request
             // by wrapping the QueryOptions.
             QueryOptions queryOptions = 
QueryOptions.addColumnSpecifications(options, prepared.boundNames);
             Message.Response response = handler.processPrepared(statement, 
state, queryOptions, getCustomPayload(), queryStartNanoTime);
-            if (options.skipMetadata() && response instanceof 
ResultMessage.Rows)
-                
((ResultMessage.Rows)response).result.metadata.setSkipMetadata();
+
+            if (response instanceof ResultMessage.Rows)
+            {
+                ResultMessage.Rows rows = (ResultMessage.Rows) response;
+
+                ResultSet.ResultMetadata resultMetadata = rows.result.metadata;
+                if 
(options.getProtocolVersion().isGreaterOrEqualTo(ProtocolVersion.V5))
+                {
+                    // Starting with V5 we can rely on the result metadata id 
coming with execute message in order to
+                    // check if there was a change, comparing it with metadata 
that's about to be returned to client.
+                    if 
(!resultMetadata.getResultMetadataId().equals(resultMetadataId))
+                        resultMetadata.setMetadataChanged();
+                    else if (options.skipMetadata())
+                        resultMetadata.setSkipMetadata();
+                }
+                else
+                {
+                    // Pre-V5 code has to rely on the difference between the 
metadata in the prepared message cache
+                    // and compare it with the metadata to be returned to 
client.
+                    if (options.skipMetadata() && 
prepared.resultMetadataId.equals(resultMetadata.getResultMetadataId()))
+                        resultMetadata.setSkipMetadata();
+                }
+            }
 
             if (tracingId != null)
                 response.setTracingId(tracingId);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/922dbdb6/src/java/org/apache/cassandra/transport/messages/ResultMessage.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/transport/messages/ResultMessage.java 
b/src/java/org/apache/cassandra/transport/messages/ResultMessage.java
index e1ea948..d8aefbe 100644
--- a/src/java/org/apache/cassandra/transport/messages/ResultMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/ResultMessage.java
@@ -20,9 +20,9 @@ package org.apache.cassandra.transport.messages;
 
 import io.netty.buffer.ByteBuf;
 
+import org.apache.cassandra.cql3.ColumnSpecification;
 import org.apache.cassandra.cql3.CQLStatement;
 import org.apache.cassandra.cql3.ResultSet;
-import org.apache.cassandra.cql3.statements.SelectStatement;
 import org.apache.cassandra.cql3.statements.ParsedStatement;
 import org.apache.cassandra.transport.*;
 import org.apache.cassandra.utils.MD5Digest;
@@ -51,12 +51,12 @@ public abstract class ResultMessage extends Message.Response
 
     public enum Kind
     {
-        VOID         (1, Void.subcodec),
-        ROWS         (2, Rows.subcodec),
-        SET_KEYSPACE (3, SetKeyspace.subcodec),
-        PREPARED     (4, Prepared.subcodec),
-        SCHEMA_CHANGE(5, SchemaChange.subcodec);
 
+        VOID               (1, Void.subcodec),
+        ROWS               (2, Rows.subcodec),
+        SET_KEYSPACE       (3, SetKeyspace.subcodec),
+        PREPARED           (4, Prepared.subcodec),
+        SCHEMA_CHANGE      (5, SchemaChange.subcodec);
         public final int id;
         public final Message.Codec<ResultMessage> subcodec;
 
@@ -216,13 +216,16 @@ public abstract class ResultMessage extends 
Message.Response
             public ResultMessage decode(ByteBuf body, ProtocolVersion version)
             {
                 MD5Digest id = MD5Digest.wrap(CBUtil.readBytes(body));
+                MD5Digest resultMetadataId = null;
+                if (version.isGreaterOrEqualTo(ProtocolVersion.V5))
+                    resultMetadataId = MD5Digest.wrap(CBUtil.readBytes(body));
                 ResultSet.PreparedMetadata metadata = 
ResultSet.PreparedMetadata.codec.decode(body, version);
 
                 ResultSet.ResultMetadata resultMetadata = 
ResultSet.ResultMetadata.EMPTY;
                 if (version.isGreaterThan(ProtocolVersion.V1))
                     resultMetadata = 
ResultSet.ResultMetadata.codec.decode(body, version);
 
-                return new Prepared(id, metadata, resultMetadata);
+                return new Prepared(id, resultMetadataId, metadata, 
resultMetadata);
             }
 
             public void encode(ResultMessage msg, ByteBuf dest, 
ProtocolVersion version)
@@ -232,6 +235,9 @@ public abstract class ResultMessage extends Message.Response
                 assert prepared.statementId != null;
 
                 CBUtil.writeBytes(prepared.statementId.bytes, dest);
+                if (version.isGreaterOrEqualTo(ProtocolVersion.V5))
+                    CBUtil.writeBytes(prepared.resultMetadataId.bytes, dest);
+
                 ResultSet.PreparedMetadata.codec.encode(prepared.metadata, 
dest, version);
                 if (version.isGreaterThan(ProtocolVersion.V1))
                     
ResultSet.ResultMetadata.codec.encode(prepared.resultMetadata, dest, version);
@@ -245,6 +251,8 @@ public abstract class ResultMessage extends Message.Response
 
                 int size = 0;
                 size += CBUtil.sizeOfBytes(prepared.statementId.bytes);
+                if (version.isGreaterOrEqualTo(ProtocolVersion.V5))
+                    size += 
CBUtil.sizeOfBytes(prepared.resultMetadataId.bytes);
                 size += 
ResultSet.PreparedMetadata.codec.encodedSize(prepared.metadata, version);
                 if (version.isGreaterThan(ProtocolVersion.V1))
                     size += 
ResultSet.ResultMetadata.codec.encodedSize(prepared.resultMetadata, version);
@@ -253,6 +261,7 @@ public abstract class ResultMessage extends Message.Response
         };
 
         public final MD5Digest statementId;
+        public final MD5Digest resultMetadataId;
 
         /** Describes the variables to be bound in the prepared statement */
         public final ResultSet.PreparedMetadata metadata;
@@ -260,27 +269,15 @@ public abstract class ResultMessage extends 
Message.Response
         /** Describes the results of executing this prepared statement */
         public final ResultSet.ResultMetadata resultMetadata;
 
-        public Prepared(MD5Digest statementId, ParsedStatement.Prepared 
prepared)
-        {
-            this(statementId, new 
ResultSet.PreparedMetadata(prepared.boundNames, 
prepared.partitionKeyBindIndexes), extractResultMetadata(prepared.statement));
-        }
-
-        private Prepared(MD5Digest statementId, ResultSet.PreparedMetadata 
metadata, ResultSet.ResultMetadata resultMetadata)
+        public Prepared(MD5Digest statementId, MD5Digest resultMetadataId, 
ResultSet.PreparedMetadata metadata, ResultSet.ResultMetadata resultMetadata)
         {
             super(Kind.PREPARED);
             this.statementId = statementId;
+            this.resultMetadataId = resultMetadataId;
             this.metadata = metadata;
             this.resultMetadata = resultMetadata;
         }
 
-        private static ResultSet.ResultMetadata 
extractResultMetadata(CQLStatement statement)
-        {
-            if (!(statement instanceof SelectStatement))
-                return ResultSet.ResultMetadata.EMPTY;
-
-            return ((SelectStatement)statement).getResultMetadata();
-        }
-
         @Override
         public String toString()
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/922dbdb6/test/unit/org/apache/cassandra/cql3/CQLTester.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java 
b/test/unit/org/apache/cassandra/cql3/CQLTester.java
index 0a0d757..062a4bc 100644
--- a/test/unit/org/apache/cassandra/cql3/CQLTester.java
+++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java
@@ -94,7 +94,7 @@ public abstract class CQLTester
     protected static final int nativePort;
     protected static final InetAddress nativeAddr;
     private static final Map<ProtocolVersion, Cluster> clusters = new 
HashMap<>();
-    private static final Map<ProtocolVersion, Session> sessions = new 
HashMap<>();
+    protected static final Map<ProtocolVersion, Session> sessions = new 
HashMap<>();
 
     private static boolean isServerPrepared = false;
 
@@ -386,12 +386,18 @@ public abstract class CQLTester
             if (clusters.containsKey(version))
                 continue;
 
-            Cluster cluster = Cluster.builder()
-                                     .addContactPoints(nativeAddr)
-                                     .withClusterName("Test Cluster")
-                                     .withPort(nativePort)
-                                     
.withProtocolVersion(com.datastax.driver.core.ProtocolVersion.fromInt(version.asInt()))
-                                     .build();
+            Cluster.Builder builder = Cluster.builder()
+                                             .withoutJMXReporting()
+                                             .addContactPoints(nativeAddr)
+                                             .withClusterName("Test Cluster")
+                                             .withPort(nativePort);
+
+            if (version.isBeta())
+                builder = builder.allowBetaProtocolVersion();
+            else
+                builder = 
builder.withProtocolVersion(com.datastax.driver.core.ProtocolVersion.fromInt(version.asInt()));
+
+            Cluster cluster = builder.build();
             clusters.put(version, cluster);
             sessions.put(version, cluster.connect());
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/922dbdb6/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java 
b/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java
index 385ebb7..f843965 100644
--- a/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java
+++ b/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java
@@ -17,63 +17,38 @@
  */
 package org.apache.cassandra.cql3;
 
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import org.junit.Before;
 import org.junit.Test;
 
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.PreparedStatement;
-import com.datastax.driver.core.Session;
+import com.datastax.driver.core.*;
+import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.exceptions.SyntaxError;
-import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.index.StubIndex;
-import org.apache.cassandra.service.EmbeddedCassandraService;
+import org.apache.cassandra.transport.ProtocolVersion;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
-public class PreparedStatementsTest extends SchemaLoader
+public class PreparedStatementsTest extends CQLTester
 {
-    private static Cluster cluster;
-    private static Session session;
-
     private static final String KEYSPACE = "prepared_stmt_cleanup";
     private static final String createKsStatement = "CREATE KEYSPACE " + 
KEYSPACE +
                                                     " WITH REPLICATION = { 
'class' : 'SimpleStrategy', 'replication_factor' : 1 };";
     private static final String dropKsStatement = "DROP KEYSPACE IF EXISTS " + 
KEYSPACE;
 
-    @BeforeClass
-    public static void setup() throws Exception
+    @Before
+    public void setup()
     {
-        Schema.instance.clear();
-
-        EmbeddedCassandraService cassandra = new EmbeddedCassandraService();
-        cassandra.start();
-
-        // Currently the native server start method return before the server 
is fully binded to the socket, so we need
-        // to wait slightly before trying to connect to it. We should fix this 
but in the meantime using a sleep.
-        Thread.sleep(1500);
-
-        cluster = Cluster.builder().addContactPoint("127.0.0.1")
-                                   
.withPort(DatabaseDescriptor.getNativeTransportPort())
-                                   .build();
-        session = cluster.connect();
-
-        session.execute(dropKsStatement);
-        session.execute(createKsStatement);
-    }
-
-    @AfterClass
-    public static void tearDown() throws Exception
-    {
-        cluster.close();
+        requireNetwork();
     }
 
     @Test
     public void testInvalidatePreparedStatementsOnDrop()
     {
+        Session session = sessions.get(ProtocolVersion.V5);
+        session.execute(dropKsStatement);
+        session.execute(createKsStatement);
+
         String createTableStatement = "CREATE TABLE IF NOT EXISTS " + KEYSPACE 
+ ".qp_cleanup (id int PRIMARY KEY, cid int, val text);";
         String dropTableStatement = "DROP TABLE IF EXISTS " + KEYSPACE + 
".qp_cleanup;";
 
@@ -101,15 +76,128 @@ public class PreparedStatementsTest extends SchemaLoader
     }
 
     @Test
+    public void testInvalidatePreparedStatementOnAlterV5()
+    {
+        testInvalidatePreparedStatementOnAlter(ProtocolVersion.V5, true);
+    }
+
+    @Test
+    public void testInvalidatePreparedStatementOnAlterV4()
+    {
+        testInvalidatePreparedStatementOnAlter(ProtocolVersion.V4, false);
+    }
+
+    private void testInvalidatePreparedStatementOnAlter(ProtocolVersion 
version, boolean supportsMetadataChange)
+    {
+        Session session = sessions.get(version);
+        String createTableStatement = "CREATE TABLE IF NOT EXISTS " + KEYSPACE 
+ ".qp_cleanup (a int PRIMARY KEY, b int, c int);";
+        String alterTableStatement = "ALTER TABLE " + KEYSPACE + ".qp_cleanup 
ADD d int;";
+
+        session.execute(dropKsStatement);
+        session.execute(createKsStatement);
+        session.execute(createTableStatement);
+
+        PreparedStatement preparedSelect = session.prepare("SELECT * FROM " + 
KEYSPACE + ".qp_cleanup");
+        session.execute("INSERT INTO " + KEYSPACE + ".qp_cleanup (a, b, c) 
VALUES (?, ?, ?);",
+                        1, 2, 3);
+        session.execute("INSERT INTO " + KEYSPACE + ".qp_cleanup (a, b, c) 
VALUES (?, ?, ?);",
+                        2, 3, 4);
+
+        assertRowsNet(session.execute(preparedSelect.bind()),
+                      row(1, 2, 3),
+                      row(2, 3, 4));
+
+        session.execute(alterTableStatement);
+        session.execute("INSERT INTO " + KEYSPACE + ".qp_cleanup (a, b, c, d) 
VALUES (?, ?, ?, ?);",
+                        3, 4, 5, 6);
+
+        ResultSet rs;
+        if (supportsMetadataChange)
+        {
+            rs = session.execute(preparedSelect.bind());
+            assertRowsNet(version,
+                          rs,
+                          row(1, 2, 3, null),
+                          row(2, 3, 4, null),
+                          row(3, 4, 5, 6));
+            assertEquals(rs.getColumnDefinitions().size(), 4);
+        }
+        else
+        {
+            rs = session.execute(preparedSelect.bind());
+            assertRowsNet(rs,
+                          row(1, 2, 3),
+                          row(2, 3, 4),
+                          row(3, 4, 5));
+            assertEquals(rs.getColumnDefinitions().size(), 3);
+        }
+
+        session.execute(dropKsStatement);
+    }
+
+    @Test
+    public void testInvalidatePreparedStatementOnAlterUnchangedMetadataV4()
+    {
+        
testInvalidatePreparedStatementOnAlterUnchangedMetadata(ProtocolVersion.V4);
+    }
+
+    @Test
+    public void testInvalidatePreparedStatementOnAlterUnchangedMetadataV5()
+    {
+        
testInvalidatePreparedStatementOnAlterUnchangedMetadata(ProtocolVersion.V5);
+    }
+
+    private void 
testInvalidatePreparedStatementOnAlterUnchangedMetadata(ProtocolVersion version)
+    {
+        Session session = sessions.get(version);
+        String createTableStatement = "CREATE TABLE IF NOT EXISTS " + KEYSPACE 
+ ".qp_cleanup (a int PRIMARY KEY, b int, c int);";
+        String alterTableStatement = "ALTER TABLE " + KEYSPACE + ".qp_cleanup 
ADD d int;";
+
+        session.execute(dropKsStatement);
+        session.execute(createKsStatement);
+        session.execute(createTableStatement);
+
+        PreparedStatement preparedSelect = session.prepare("SELECT a, b, c 
FROM " + KEYSPACE + ".qp_cleanup");
+        session.execute("INSERT INTO " + KEYSPACE + ".qp_cleanup (a, b, c) 
VALUES (?, ?, ?);",
+                        1, 2, 3);
+        session.execute("INSERT INTO " + KEYSPACE + ".qp_cleanup (a, b, c) 
VALUES (?, ?, ?);",
+                        2, 3, 4);
+
+        ResultSet rs = session.execute(preparedSelect.bind());
+
+        assertRowsNet(rs,
+                      row(1, 2, 3),
+                      row(2, 3, 4));
+        assertEquals(rs.getColumnDefinitions().size(), 3);
+
+        session.execute(alterTableStatement);
+        session.execute("INSERT INTO " + KEYSPACE + ".qp_cleanup (a, b, c, d) 
VALUES (?, ?, ?, ?);",
+                        3, 4, 5, 6);
+
+        rs = session.execute(preparedSelect.bind());
+        assertRowsNet(rs,
+                      row(1, 2, 3),
+                      row(2, 3, 4),
+                      row(3, 4, 5));
+        assertEquals(rs.getColumnDefinitions().size(), 3);
+
+        session.execute(dropKsStatement);
+    }
+
+    @Test
     public void testStatementRePreparationOnReconnect()
     {
+        Session session = sessions.get(ProtocolVersion.V5);
+        session.execute("USE " + keyspace());
+
         session.execute(dropKsStatement);
         session.execute(createKsStatement);
 
-        session.execute("CREATE TABLE IF NOT EXISTS " + KEYSPACE + ".qp_test 
(id int PRIMARY KEY, cid int, val text);");
+        createTable("CREATE TABLE %s (id int PRIMARY KEY, cid int, val 
text);");
+
 
-        String insertCQL = "INSERT INTO " + KEYSPACE + ".qp_test (id, cid, 
val) VALUES (?, ?, ?)";
-        String selectCQL = "Select * from " + KEYSPACE + ".qp_test where id = 
?";
+        String insertCQL = "INSERT INTO " + currentTable() + " (id, cid, val) 
VALUES (?, ?, ?)";
+        String selectCQL = "Select * from " + currentTable() + " where id = ?";
 
         PreparedStatement preparedInsert = session.prepare(insertCQL);
         PreparedStatement preparedSelect = session.prepare(selectCQL);
@@ -117,23 +205,31 @@ public class PreparedStatementsTest extends SchemaLoader
         session.execute(preparedInsert.bind(1, 1, "value"));
         assertEquals(1, session.execute(preparedSelect.bind(1)).all().size());
 
-        cluster.close();
-
-        cluster = Cluster.builder().addContactPoint("127.0.0.1")
-                                   
.withPort(DatabaseDescriptor.getNativeTransportPort())
-                                   .build();
-        session = cluster.connect();
-
-        preparedInsert = session.prepare(insertCQL);
-        preparedSelect = session.prepare(selectCQL);
-        session.execute(preparedInsert.bind(1, 1, "value"));
+        try (Cluster newCluster = Cluster.builder()
+                                 .addContactPoints(nativeAddr)
+                                 .withClusterName("Test Cluster")
+                                 .withPort(nativePort)
+                                 .withoutJMXReporting()
+                                 .allowBetaProtocolVersion()
+                                 .build())
+        {
+            try (Session newSession = newCluster.connect())
+            {
+                newSession.execute("USE " + keyspace());
+                preparedInsert = newSession.prepare(insertCQL);
+                preparedSelect = newSession.prepare(selectCQL);
+                session.execute(preparedInsert.bind(1, 1, "value"));
 
-        assertEquals(1, session.execute(preparedSelect.bind(1)).all().size());
+                assertEquals(1, 
session.execute(preparedSelect.bind(1)).all().size());
+            }
+        }
     }
 
     @Test
     public void prepareAndExecuteWithCustomExpressions() throws Throwable
     {
+        Session session = sessions.get(ProtocolVersion.V5);
+
         session.execute(dropKsStatement);
         session.execute(createKsStatement);
         String table = "custom_expr_test";
@@ -163,4 +259,123 @@ public class PreparedStatementsTest extends SchemaLoader
             assertEquals("Bind variables cannot be used for index names", 
e.getMessage());
         }
     }
+
+    @Test
+    public void testPrepareWithLWT() throws Throwable
+    {
+        testPrepareWithLWT(ProtocolVersion.V4);
+        testPrepareWithLWT(ProtocolVersion.V5);
+    }
+
+
+    private void testPrepareWithLWT(ProtocolVersion version) throws Throwable
+    {
+        Session session = sessionNet(version);
+        session.execute("USE " + keyspace());
+        createTable("CREATE TABLE %s (pk int, v1 int, v2 int, PRIMARY KEY 
(pk))");
+
+        PreparedStatement prepared1 = session.prepare(String.format("UPDATE %s 
SET v1 = ?, v2 = ?  WHERE pk = 1 IF v1 = ?", currentTable()));
+        PreparedStatement prepared2 = session.prepare(String.format("INSERT 
INTO %s (pk, v1, v2) VALUES (?, 200, 300) IF NOT EXISTS", currentTable()));
+        execute("INSERT INTO %s (pk, v1, v2) VALUES (1,1,1)");
+        execute("INSERT INTO %s (pk, v1, v2) VALUES (2,2,2)");
+
+        ResultSet rs;
+
+        rs = session.execute(prepared1.bind(10, 20, 1));
+        assertRowsNet(rs,
+                      row(true));
+        assertEquals(rs.getColumnDefinitions().size(), 1);
+
+        rs = session.execute(prepared1.bind(100, 200, 1));
+        assertRowsNet(rs,
+                      row(false, 10));
+        assertEquals(rs.getColumnDefinitions().size(), 2);
+
+        rs = session.execute(prepared1.bind(30, 40, 10));
+        assertRowsNet(rs,
+                      row(true));
+        assertEquals(rs.getColumnDefinitions().size(), 1);
+
+        // Try executing the same message once again
+        rs = session.execute(prepared1.bind(100, 200, 1));
+        assertRowsNet(rs,
+                      row(false, 30));
+        assertEquals(rs.getColumnDefinitions().size(), 2);
+
+        rs = session.execute(prepared2.bind(1));
+        assertRowsNet(rs,
+                      row(false, 1, 30, 40));
+        assertEquals(rs.getColumnDefinitions().size(), 4);
+
+        alterTable("ALTER TABLE %s ADD v3 int;");
+
+        rs = session.execute(prepared2.bind(1));
+        assertRowsNet(rs,
+                      row(false, 1, 30, 40, null));
+        assertEquals(rs.getColumnDefinitions().size(), 5);
+
+        rs = session.execute(prepared2.bind(20));
+        assertRowsNet(rs,
+                      row(true));
+        assertEquals(rs.getColumnDefinitions().size(), 1);
+
+        rs = session.execute(prepared2.bind(20));
+        assertRowsNet(rs,
+                      row(false, 20, 200, 300, null));
+        assertEquals(rs.getColumnDefinitions().size(), 5);
+    }
+
+    @Test
+    public void testPrepareWithBatchLWT() throws Throwable
+    {
+        testPrepareWithBatchLWT(ProtocolVersion.V4);
+        testPrepareWithBatchLWT(ProtocolVersion.V5);
+    }
+
+    private void testPrepareWithBatchLWT(ProtocolVersion version) throws 
Throwable
+    {
+        Session session = sessionNet(version);
+        session.execute("USE " + keyspace());
+        createTable("CREATE TABLE %s (pk int, v1 int, v2 int, PRIMARY KEY 
(pk))");
+
+        PreparedStatement prepared1 = session.prepare("BEGIN BATCH " +
+                                                      "UPDATE " + 
currentTable() + " SET v1 = ? WHERE pk = 1 IF v1 = ?;" +
+                                                      "UPDATE " + 
currentTable() + " SET v2 = ? WHERE pk = 1 IF v2 = ?;" +
+                                                      "APPLY BATCH;");
+        PreparedStatement prepared2 = session.prepare("BEGIN BATCH " +
+                                                      "INSERT INTO " + 
currentTable() + " (pk, v1, v2) VALUES (1, 200, 300) IF NOT EXISTS;" +
+                                                      "APPLY BATCH");
+        execute("INSERT INTO %s (pk, v1, v2) VALUES (1,1,1)");
+        execute("INSERT INTO %s (pk, v1, v2) VALUES (2,2,2)");
+
+        com.datastax.driver.core.ResultSet rs;
+
+        rs = session.execute(prepared1.bind(10, 1, 20, 1));
+        assertRowsNet(rs,
+                      row(true));
+        assertEquals(rs.getColumnDefinitions().size(), 1);
+
+        rs = session.execute(prepared1.bind(100, 1, 200, 1));
+        assertRowsNet(rs,
+                      row(false, 1, 10, 20));
+        assertEquals(rs.getColumnDefinitions().size(), 4);
+
+        // Try executing the same message once again
+        rs = session.execute(prepared1.bind(100, 1, 200, 1));
+        assertRowsNet(rs,
+                      row(false, 1, 10, 20));
+        assertEquals(rs.getColumnDefinitions().size(), 4);
+
+        rs = session.execute(prepared2.bind());
+        assertRowsNet(rs,
+                      row(false, 1, 10, 20));
+        assertEquals(rs.getColumnDefinitions().size(), 4);
+
+        alterTable("ALTER TABLE %s ADD v3 int;");
+
+        rs = session.execute(prepared2.bind());
+        assertRowsNet(rs,
+                      row(false, 1, 10, 20, null));
+        assertEquals(rs.getColumnDefinitions().size(), 5);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/922dbdb6/test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java 
b/test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java
index 228352c..7ca6ab9 100644
--- a/test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java
+++ b/test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java
@@ -78,14 +78,14 @@ public class PstmtPersistenceTest extends CQLTester
         assertEquals(5, stmtIds.size());
         assertEquals(5, QueryProcessor.preparedStatementsCount());
 
-        Assert.assertEquals(5, numberOfStatementsOnDisk());
+        assertEquals(5, numberOfStatementsOnDisk());
 
         QueryHandler handler = ClientState.getCQLQueryHandler();
         validatePstmts(stmtIds, handler);
 
         // clear prepared statements cache
         QueryProcessor.clearPreparedStatements(true);
-        Assert.assertEquals(0, QueryProcessor.preparedStatementsCount());
+        assertEquals(0, QueryProcessor.preparedStatementsCount());
         for (MD5Digest stmtId : stmtIds)
             Assert.assertNull(handler.getPrepared(stmtId));
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/922dbdb6/test/unit/org/apache/cassandra/cql3/validation/entities/JsonTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/cql3/validation/entities/JsonTest.java 
b/test/unit/org/apache/cassandra/cql3/validation/entities/JsonTest.java
index 2bd95be..c7a41f3 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/JsonTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/JsonTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.cql3.validation.entities;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.Json;
 import org.apache.cassandra.cql3.CQLTester;
 import org.apache.cassandra.cql3.Duration;
@@ -41,10 +42,17 @@ import static org.junit.Assert.fail;
 
 public class JsonTest extends CQLTester
 {
+    // This method will be ran instead of the CQLTester#setUpClass
     @BeforeClass
-    public static void setUp()
+    public static void setUpClass()
     {
+        if (ROW_CACHE_SIZE_IN_MB > 0)
+            DatabaseDescriptor.setRowCacheSizeInMB(ROW_CACHE_SIZE_IN_MB);
+
         
StorageService.instance.setPartitionerUnsafe(ByteOrderedPartitioner.instance);
+
+        // Once per-JVM is enough
+        prepareServer();
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/cassandra/blob/922dbdb6/test/unit/org/apache/cassandra/cql3/validation/operations/SelectLimitTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/cql3/validation/operations/SelectLimitTest.java
 
b/test/unit/org/apache/cassandra/cql3/validation/operations/SelectLimitTest.java
index 68b2e93..5d6ffb1 100644
--- 
a/test/unit/org/apache/cassandra/cql3/validation/operations/SelectLimitTest.java
+++ 
b/test/unit/org/apache/cassandra/cql3/validation/operations/SelectLimitTest.java
@@ -30,11 +30,14 @@ import org.apache.cassandra.service.StorageService;
 
 public class SelectLimitTest extends CQLTester
 {
+    // This method will be ran instead of the CQLTester#setUpClass
     @BeforeClass
-    public static void setUp()
+    public static void setUpClass()
     {
         
StorageService.instance.setPartitionerUnsafe(ByteOrderedPartitioner.instance);
         
DatabaseDescriptor.setPartitionerUnsafe(ByteOrderedPartitioner.instance);
+
+        prepareServer();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/922dbdb6/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java 
b/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java
index c27593b..817cb06 100644
--- a/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java
+++ b/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java
@@ -162,7 +162,7 @@ public class MessagePayloadTest extends CQLTester
                 payloadEquals(reqMap, requestPayload);
                 payloadEquals(respMap, prepareResponse.getCustomPayload());
 
-                ExecuteMessage executeMessage = new 
ExecuteMessage(prepareResponse.statementId, QueryOptions.DEFAULT);
+                ExecuteMessage executeMessage = new 
ExecuteMessage(prepareResponse.statementId, prepareResponse.resultMetadataId, 
QueryOptions.DEFAULT);
                 reqMap = Collections.singletonMap("foo", bytes(44));
                 responsePayload = respMap = Collections.singletonMap("bar", 
bytes(44));
                 executeMessage.setCustomPayload(reqMap);
@@ -231,7 +231,7 @@ public class MessagePayloadTest extends CQLTester
                 payloadEquals(reqMap, requestPayload);
                 payloadEquals(respMap, prepareResponse.getCustomPayload());
 
-                ExecuteMessage executeMessage = new 
ExecuteMessage(prepareResponse.statementId, QueryOptions.DEFAULT);
+                ExecuteMessage executeMessage = new 
ExecuteMessage(prepareResponse.statementId, prepareResponse.resultMetadataId, 
QueryOptions.DEFAULT);
                 reqMap = Collections.singletonMap("foo", bytes(44));
                 responsePayload = respMap = Collections.singletonMap("bar", 
bytes(44));
                 executeMessage.setCustomPayload(reqMap);
@@ -315,7 +315,7 @@ public class MessagePayloadTest extends CQLTester
                 prepareMessage.setCustomPayload(null);
                 ResultMessage.Prepared prepareResponse = 
(ResultMessage.Prepared) client.execute(prepareMessage);
 
-                ExecuteMessage executeMessage = new 
ExecuteMessage(prepareResponse.statementId, QueryOptions.DEFAULT);
+                ExecuteMessage executeMessage = new 
ExecuteMessage(prepareResponse.statementId, prepareResponse.resultMetadataId, 
QueryOptions.DEFAULT);
                 reqMap = Collections.singletonMap("foo", bytes(44));
                 responsePayload = Collections.singletonMap("bar", bytes(44));
                 executeMessage.setCustomPayload(reqMap);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/922dbdb6/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlOperation.java
----------------------------------------------------------------------
diff --git 
a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlOperation.java
 
b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlOperation.java
index c524107..c89a1d1 100644
--- 
a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlOperation.java
+++ 
b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlOperation.java
@@ -281,11 +281,11 @@ public abstract class CqlOperation<V> extends 
PredefinedOperation
         }
 
         @Override
-        public <V> V execute(Object preparedStatementId, ByteBuffer key, 
List<Object> queryParams, ResultHandler<V> handler)
+        public <V> V execute(Object preparedStatement, ByteBuffer key, 
List<Object> queryParams, ResultHandler<V> handler)
         {
             return handler.javaDriverHandler().apply(
                     client.executePrepared(
-                            (PreparedStatement) preparedStatementId,
+                            (PreparedStatement) preparedStatement,
                             queryParams,
                             settings.command.consistencyLevel));
         }
@@ -313,11 +313,11 @@ public abstract class CqlOperation<V> extends 
PredefinedOperation
         }
 
         @Override
-        public <V> V execute(Object preparedStatementId, ByteBuffer key, 
List<Object> queryParams, ResultHandler<V> handler)
+        public <V> V execute(Object preparedStatement, ByteBuffer key, 
List<Object> queryParams, ResultHandler<V> handler)
         {
             return handler.simpleClientHandler().apply(
                     client.executePrepared(
-                            (byte[]) preparedStatementId,
+                            (ResultMessage.Prepared) preparedStatement,
                             toByteBufferParams(queryParams),
                             settings.command.consistencyLevel));
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to