Repository: cassandra Updated Branches: refs/heads/cassandra-2.2 a30d8bd21 -> 6f93bd1f6
Fix custom payload encoding decoding to match protocol spec patch by blerer; reviewed by slebresne for CASSANDRA-9515 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6f93bd1f Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6f93bd1f Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6f93bd1f Branch: refs/heads/cassandra-2.2 Commit: 6f93bd1f65888104e33da2f9f01056b6115952e5 Parents: a30d8bd Author: Benjamin Lerer <benjamin.le...@datastax.com> Authored: Tue Jun 2 15:34:31 2015 +0200 Committer: Sylvain Lebresne <sylv...@datastax.com> Committed: Tue Jun 2 15:34:31 2015 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../CustomPayloadMirroringQueryHandler.java | 18 +++-- .../org/apache/cassandra/cql3/QueryHandler.java | 20 ++++-- .../apache/cassandra/cql3/QueryProcessor.java | 22 ++++-- .../org/apache/cassandra/transport/CBUtil.java | 18 ++--- .../org/apache/cassandra/transport/Message.java | 13 ++-- .../cassandra/transport/MessagePayloadTest.java | 73 ++++++++++++-------- 7 files changed, 110 insertions(+), 55 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f93bd1f/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 7f0ef51..db94c76 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.2 + * Fix custom payload coding/decoding to match the spec (CASSANDRA-9515) * ant test-all results incomplete when parsed (CASSANDRA-9463) * Disallow frozen<> types in function arguments and return types for clarity (CASSANDRA-9411) http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f93bd1f/src/java/org/apache/cassandra/cql3/CustomPayloadMirroringQueryHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/CustomPayloadMirroringQueryHandler.java b/src/java/org/apache/cassandra/cql3/CustomPayloadMirroringQueryHandler.java index 3930e9c..02a6df9 100644 --- a/src/java/org/apache/cassandra/cql3/CustomPayloadMirroringQueryHandler.java +++ b/src/java/org/apache/cassandra/cql3/CustomPayloadMirroringQueryHandler.java @@ -17,6 +17,7 @@ */ package org.apache.cassandra.cql3; +import java.nio.ByteBuffer; import java.util.Map; import org.apache.cassandra.cql3.statements.BatchStatement; @@ -34,14 +35,17 @@ public class CustomPayloadMirroringQueryHandler implements QueryHandler { static QueryProcessor queryProcessor = QueryProcessor.instance; - public ResultMessage process(String query, QueryState state, QueryOptions options, Map<String, byte[]> customPayload) + public ResultMessage process(String query, + QueryState state, + QueryOptions options, + Map<String, ByteBuffer> customPayload) { ResultMessage result = queryProcessor.process(query, state, options, customPayload); result.setCustomPayload(customPayload); return result; } - public ResultMessage.Prepared prepare(String query, QueryState state, Map<String, byte[]> customPayload) + public ResultMessage.Prepared prepare(String query, QueryState state, Map<String, ByteBuffer> customPayload) { ResultMessage.Prepared prepared = queryProcessor.prepare(query, state, customPayload); prepared.setCustomPayload(customPayload); @@ -58,14 +62,20 @@ public class CustomPayloadMirroringQueryHandler implements QueryHandler return queryProcessor.getPreparedForThrift(id); } - public ResultMessage processPrepared(CQLStatement statement, QueryState state, QueryOptions options, Map<String, byte[]> customPayload) + public ResultMessage processPrepared(CQLStatement statement, + QueryState state, + QueryOptions options, + Map<String, ByteBuffer> customPayload) { ResultMessage result = queryProcessor.processPrepared(statement, state, options, customPayload); result.setCustomPayload(customPayload); return result; } - public ResultMessage processBatch(BatchStatement statement, QueryState state, BatchQueryOptions options, Map<String, byte[]> customPayload) + public ResultMessage processBatch(BatchStatement statement, + QueryState state, + BatchQueryOptions options, + Map<String, ByteBuffer> customPayload) { ResultMessage result = queryProcessor.processBatch(statement, state, options, customPayload); result.setCustomPayload(customPayload); http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f93bd1f/src/java/org/apache/cassandra/cql3/QueryHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/QueryHandler.java b/src/java/org/apache/cassandra/cql3/QueryHandler.java index 8b579d7..3c11c0e 100644 --- a/src/java/org/apache/cassandra/cql3/QueryHandler.java +++ b/src/java/org/apache/cassandra/cql3/QueryHandler.java @@ -17,6 +17,7 @@ */ package org.apache.cassandra.cql3; +import java.nio.ByteBuffer; import java.util.Map; import org.apache.cassandra.cql3.statements.BatchStatement; @@ -29,15 +30,26 @@ import org.apache.cassandra.utils.MD5Digest; public interface QueryHandler { - ResultMessage process(String query, QueryState state, QueryOptions options, Map<String, byte[]> customPayload) throws RequestExecutionException, RequestValidationException; + ResultMessage process(String query, + QueryState state, + QueryOptions options, + Map<String, ByteBuffer> customPayload) throws RequestExecutionException, RequestValidationException; - ResultMessage.Prepared prepare(String query, QueryState state, Map<String, byte[]> customPayload) throws RequestValidationException; + ResultMessage.Prepared prepare(String query, + QueryState state, + Map<String, ByteBuffer> customPayload) throws RequestValidationException; ParsedStatement.Prepared getPrepared(MD5Digest id); ParsedStatement.Prepared getPreparedForThrift(Integer id); - ResultMessage processPrepared(CQLStatement statement, QueryState state, QueryOptions options, Map<String, byte[]> customPayload) throws RequestExecutionException, RequestValidationException; + ResultMessage processPrepared(CQLStatement statement, + QueryState state, + QueryOptions options, + Map<String, ByteBuffer> customPayload) throws RequestExecutionException, RequestValidationException; - ResultMessage processBatch(BatchStatement statement, QueryState state, BatchQueryOptions options, Map<String, byte[]> customPayload) throws RequestExecutionException, RequestValidationException; + ResultMessage processBatch(BatchStatement statement, + QueryState state, + BatchQueryOptions options, + Map<String, ByteBuffer> customPayload) throws RequestExecutionException, RequestValidationException; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f93bd1f/src/java/org/apache/cassandra/cql3/QueryProcessor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java index 7b9261c..3170932 100644 --- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java +++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java @@ -232,7 +232,11 @@ public class QueryProcessor implements QueryHandler return instance.process(queryString, queryState, QueryOptions.forInternalCalls(cl, Collections.<ByteBuffer>emptyList())); } - public ResultMessage process(String query, QueryState state, QueryOptions options, Map<String, byte[]> customPayload) throws RequestExecutionException, RequestValidationException + public ResultMessage process(String query, + QueryState state, + QueryOptions options, + Map<String, ByteBuffer> customPayload) + throws RequestExecutionException, RequestValidationException { return process(query, state, options); } @@ -342,7 +346,9 @@ public class QueryProcessor implements QueryHandler return UntypedResultSet.create(cqlRows); } - public ResultMessage.Prepared prepare(String query, QueryState state, Map<String, byte[]> customPayload) throws RequestValidationException + public ResultMessage.Prepared prepare(String query, + QueryState state, + Map<String, ByteBuffer> customPayload) throws RequestValidationException { return prepare(query, state); } @@ -422,7 +428,11 @@ public class QueryProcessor implements QueryHandler } } - public ResultMessage processPrepared(CQLStatement statement, QueryState state, QueryOptions options, Map<String, byte[]> customPayload) throws RequestExecutionException, RequestValidationException + public ResultMessage processPrepared(CQLStatement statement, + QueryState state, + QueryOptions options, + Map<String, ByteBuffer> customPayload) + throws RequestExecutionException, RequestValidationException { return processPrepared(statement, state, options); } @@ -450,7 +460,11 @@ public class QueryProcessor implements QueryHandler return processStatement(statement, queryState, options); } - public ResultMessage processBatch(BatchStatement statement, QueryState state, BatchQueryOptions options, Map<String, byte[]> customPayload) throws RequestExecutionException, RequestValidationException + public ResultMessage processBatch(BatchStatement statement, + QueryState state, + BatchQueryOptions options, + Map<String, ByteBuffer> customPayload) + throws RequestExecutionException, RequestValidationException { return processBatch(statement, state, options); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f93bd1f/src/java/org/apache/cassandra/transport/CBUtil.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/CBUtil.java b/src/java/org/apache/cassandra/transport/CBUtil.java index 48beea0..92e2891 100644 --- a/src/java/org/apache/cassandra/transport/CBUtil.java +++ b/src/java/org/apache/cassandra/transport/CBUtil.java @@ -183,36 +183,36 @@ public abstract class CBUtil return 2 + bytes.length; } - public static Map<String, byte[]> readBytesMap(ByteBuf cb) + public static Map<String, ByteBuffer> readBytesMap(ByteBuf cb) { int length = cb.readUnsignedShort(); - Map<String, byte[]> m = new HashMap<>(length); + Map<String, ByteBuffer> m = new HashMap<>(length); for (int i = 0; i < length; i++) { String k = readString(cb); - byte[] v = readBytes(cb); + ByteBuffer v = readValue(cb); m.put(k, v); } return m; } - public static void writeBytesMap(Map<String, byte[]> m, ByteBuf cb) + public static void writeBytesMap(Map<String, ByteBuffer> m, ByteBuf cb) { cb.writeShort(m.size()); - for (Map.Entry<String, byte[]> entry : m.entrySet()) + for (Map.Entry<String, ByteBuffer> entry : m.entrySet()) { writeString(entry.getKey(), cb); - writeBytes(entry.getValue(), cb); + writeValue(entry.getValue(), cb); } } - public static int sizeOfBytesMap(Map<String, byte[]> m) + public static int sizeOfBytesMap(Map<String, ByteBuffer> m) { int size = 2; - for (Map.Entry<String, byte[]> entry : m.entrySet()) + for (Map.Entry<String, ByteBuffer> entry : m.entrySet()) { size += sizeOfString(entry.getKey()); - size += sizeOfBytes(entry.getValue()); + size += sizeOfValue(entry.getValue()); } return size; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f93bd1f/src/java/org/apache/cassandra/transport/Message.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/Message.java b/src/java/org/apache/cassandra/transport/Message.java index b6d5a95..440d481 100644 --- a/src/java/org/apache/cassandra/transport/Message.java +++ b/src/java/org/apache/cassandra/transport/Message.java @@ -19,6 +19,7 @@ package org.apache.cassandra.transport; import java.util.ArrayList; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.EnumSet; import java.util.HashSet; import java.util.List; @@ -148,7 +149,7 @@ public abstract class Message protected Connection connection; private int streamId; private Frame sourceFrame; - private Map<String, byte[]> customPayload; + private Map<String, ByteBuffer> customPayload; protected Message(Type type) { @@ -186,12 +187,12 @@ public abstract class Message return sourceFrame; } - public Map<String, byte[]> getCustomPayload() + public Map<String, ByteBuffer> getCustomPayload() { return customPayload; } - public void setCustomPayload(Map<String, byte[]> customPayload) + public void setCustomPayload(Map<String, ByteBuffer> customPayload) { this.customPayload = customPayload; } @@ -269,7 +270,7 @@ public abstract class Message UUID tracingId = isRequest || !isTracing ? null : CBUtil.readUUID(frame.body); List<String> warnings = isRequest || !hasWarning ? null : CBUtil.readStringList(frame.body); - Map<String, byte[]> customPayload = !isCustomPayload ? null : CBUtil.readBytesMap(frame.body); + Map<String, ByteBuffer> customPayload = !isCustomPayload ? null : CBUtil.readBytesMap(frame.body); try { @@ -329,7 +330,7 @@ public abstract class Message if (message instanceof Response) { UUID tracingId = ((Response)message).getTracingId(); - Map<String, byte[]> customPayload = message.getCustomPayload(); + Map<String, ByteBuffer> customPayload = message.getCustomPayload(); if (tracingId != null) messageSize += CBUtil.sizeOfUUID(tracingId); List<String> warnings = ((Response)message).getWarnings(); @@ -367,7 +368,7 @@ public abstract class Message assert message instanceof Request; if (((Request)message).isTracingRequested()) flags.add(Frame.Header.Flag.TRACING); - Map<String, byte[]> payload = message.getCustomPayload(); + Map<String, ByteBuffer> payload = message.getCustomPayload(); if (payload != null) messageSize += CBUtil.sizeOfBytesMap(payload); body = CBUtil.allocator.buffer(messageSize); http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f93bd1f/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java b/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java index 1049d63..73daa48 100644 --- a/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java +++ b/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java @@ -48,10 +48,12 @@ import org.apache.cassandra.transport.messages.QueryMessage; import org.apache.cassandra.transport.messages.ResultMessage; import org.apache.cassandra.utils.MD5Digest; +import static org.apache.cassandra.utils.ByteBufferUtil.bytes; + public class MessagePayloadTest extends CQLTester { - public static Map<String, byte[]> requestPayload; - public static Map<String, byte[]> responsePayload; + public static Map<String, ByteBuffer> requestPayload; + public static Map<String, ByteBuffer> responsePayload; private static Field cqlQueryHandlerField; private static boolean modifiersAccessible; @@ -125,8 +127,8 @@ public class MessagePayloadTest extends CQLTester { client.connect(false); - Map<String, byte[]> reqMap; - Map<String, byte[]> respMap; + Map<String, ByteBuffer> reqMap; + Map<String, ByteBuffer> respMap; QueryMessage queryMessage = new QueryMessage( "CREATE TABLE " + KEYSPACE + ".atable (pk int PRIMARY KEY, v text)", @@ -134,23 +136,23 @@ public class MessagePayloadTest extends CQLTester ); PrepareMessage prepareMessage = new PrepareMessage("SELECT * FROM " + KEYSPACE + ".atable"); - reqMap = Collections.singletonMap("foo", "42".getBytes()); - responsePayload = respMap = Collections.singletonMap("bar", "42".getBytes()); + reqMap = Collections.singletonMap("foo", bytes(42)); + responsePayload = respMap = Collections.singletonMap("bar", bytes(42)); queryMessage.setCustomPayload(reqMap); Message.Response queryResponse = client.execute(queryMessage); payloadEquals(reqMap, requestPayload); payloadEquals(respMap, queryResponse.getCustomPayload()); - reqMap = Collections.singletonMap("foo", "43".getBytes()); - responsePayload = respMap = Collections.singletonMap("bar", "43".getBytes()); + reqMap = Collections.singletonMap("foo", bytes(43)); + responsePayload = respMap = Collections.singletonMap("bar", bytes(43)); prepareMessage.setCustomPayload(reqMap); ResultMessage.Prepared prepareResponse = (ResultMessage.Prepared) client.execute(prepareMessage); payloadEquals(reqMap, requestPayload); payloadEquals(respMap, prepareResponse.getCustomPayload()); ExecuteMessage executeMessage = new ExecuteMessage(prepareResponse.statementId, QueryOptions.DEFAULT); - reqMap = Collections.singletonMap("foo", "44".getBytes()); - responsePayload = respMap = Collections.singletonMap("bar", "44".getBytes()); + reqMap = Collections.singletonMap("foo", bytes(44)); + responsePayload = respMap = Collections.singletonMap("bar", bytes(44)); executeMessage.setCustomPayload(reqMap); Message.Response executeResponse = client.execute(executeMessage); payloadEquals(reqMap, requestPayload); @@ -160,8 +162,8 @@ public class MessagePayloadTest extends CQLTester Collections.<Object>singletonList("INSERT INTO " + KEYSPACE + ".atable (pk,v) VALUES (1, 'foo')"), Collections.singletonList(Collections.<ByteBuffer>emptyList()), QueryOptions.DEFAULT); - reqMap = Collections.singletonMap("foo", "45".getBytes()); - responsePayload = respMap = Collections.singletonMap("bar", "45".getBytes()); + reqMap = Collections.singletonMap("foo", bytes(45)); + responsePayload = respMap = Collections.singletonMap("bar", bytes(45)); batchMessage.setCustomPayload(reqMap); Message.Response batchResponse = client.execute(batchMessage); payloadEquals(reqMap, requestPayload); @@ -194,7 +196,7 @@ public class MessagePayloadTest extends CQLTester { client.connect(false); - Map<String, byte[]> reqMap; + Map<String, ByteBuffer> reqMap; QueryMessage queryMessage = new QueryMessage( "CREATE TABLE " + KEYSPACE + ".atable (pk int PRIMARY KEY, v text)", @@ -202,8 +204,8 @@ public class MessagePayloadTest extends CQLTester ); PrepareMessage prepareMessage = new PrepareMessage("SELECT * FROM " + KEYSPACE + ".atable"); - reqMap = Collections.singletonMap("foo", "42".getBytes()); - responsePayload = Collections.singletonMap("bar", "42".getBytes()); + reqMap = Collections.singletonMap("foo", bytes(42)); + responsePayload = Collections.singletonMap("bar", bytes(42)); queryMessage.setCustomPayload(reqMap); try { @@ -217,8 +219,8 @@ public class MessagePayloadTest extends CQLTester queryMessage.setCustomPayload(null); client.execute(queryMessage); - reqMap = Collections.singletonMap("foo", "43".getBytes()); - responsePayload = Collections.singletonMap("bar", "43".getBytes()); + reqMap = Collections.singletonMap("foo", bytes(43)); + responsePayload = Collections.singletonMap("bar", bytes(43)); prepareMessage.setCustomPayload(reqMap); try { @@ -233,8 +235,8 @@ public class MessagePayloadTest extends CQLTester ResultMessage.Prepared prepareResponse = (ResultMessage.Prepared) client.execute(prepareMessage); ExecuteMessage executeMessage = new ExecuteMessage(prepareResponse.statementId, QueryOptions.DEFAULT); - reqMap = Collections.singletonMap("foo", "44".getBytes()); - responsePayload = Collections.singletonMap("bar", "44".getBytes()); + reqMap = Collections.singletonMap("foo", bytes(44)); + responsePayload = Collections.singletonMap("bar", bytes(44)); executeMessage.setCustomPayload(reqMap); try { @@ -250,8 +252,8 @@ public class MessagePayloadTest extends CQLTester Collections.<Object>singletonList("INSERT INTO " + KEYSPACE + ".atable (pk,v) VALUES (1, 'foo')"), Collections.singletonList(Collections.<ByteBuffer>emptyList()), QueryOptions.DEFAULT); - reqMap = Collections.singletonMap("foo", "45".getBytes()); - responsePayload = Collections.singletonMap("bar", "45".getBytes()); + reqMap = Collections.singletonMap("foo", bytes(45)); + responsePayload = Collections.singletonMap("bar", bytes(45)); batchMessage.setCustomPayload(reqMap); try { @@ -274,13 +276,13 @@ public class MessagePayloadTest extends CQLTester } } - private static void payloadEquals(Map<String, byte[]> map1, Map<String, byte[]> map2) + private static void payloadEquals(Map<String, ByteBuffer> map1, Map<String, ByteBuffer> map2) { Assert.assertNotNull(map1); Assert.assertNotNull(map2); Assert.assertEquals(map1.keySet(), map2.keySet()); - for (Map.Entry<String, byte[]> e : map1.entrySet()) - Assert.assertArrayEquals(e.getValue(), map2.get(e.getKey())); + for (Map.Entry<String, ByteBuffer> e : map1.entrySet()) + Assert.assertEquals(e.getValue(), map2.get(e.getKey())); } public static class TestQueryHandler implements QueryHandler @@ -295,7 +297,10 @@ public class MessagePayloadTest extends CQLTester return QueryProcessor.instance.getPreparedForThrift(id); } - public ResultMessage.Prepared prepare(String query, QueryState state, Map<String, byte[]> customPayload) throws RequestValidationException + public ResultMessage.Prepared prepare(String query, + QueryState state, + Map<String, ByteBuffer> customPayload) + throws RequestValidationException { if (customPayload != null) requestPayload = customPayload; @@ -308,7 +313,11 @@ public class MessagePayloadTest extends CQLTester return result; } - public ResultMessage process(String query, QueryState state, QueryOptions options, Map<String, byte[]> customPayload) throws RequestExecutionException, RequestValidationException + public ResultMessage process(String query, + QueryState state, + QueryOptions options, + Map<String, ByteBuffer> customPayload) + throws RequestExecutionException, RequestValidationException { if (customPayload != null) requestPayload = customPayload; @@ -321,7 +330,11 @@ public class MessagePayloadTest extends CQLTester return result; } - public ResultMessage processBatch(BatchStatement statement, QueryState state, BatchQueryOptions options, Map<String, byte[]> customPayload) throws RequestExecutionException, RequestValidationException + public ResultMessage processBatch(BatchStatement statement, + QueryState state, + BatchQueryOptions options, + Map<String, ByteBuffer> customPayload) + throws RequestExecutionException, RequestValidationException { if (customPayload != null) requestPayload = customPayload; @@ -334,7 +347,11 @@ public class MessagePayloadTest extends CQLTester return result; } - public ResultMessage processPrepared(CQLStatement statement, QueryState state, QueryOptions options, Map<String, byte[]> customPayload) throws RequestExecutionException, RequestValidationException + public ResultMessage processPrepared(CQLStatement statement, + QueryState state, + QueryOptions options, + Map<String, ByteBuffer> customPayload) + throws RequestExecutionException, RequestValidationException { if (customPayload != null) requestPayload = customPayload;