Updated Branches: refs/heads/trunk 29564517a -> f6106a18a
Invalid streamId in cql binary protocol when using invalid CL patch by slebresne; reviewed by pchalamet for CASSANDRA-5164 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b14fc6d0 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b14fc6d0 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b14fc6d0 Branch: refs/heads/trunk Commit: b14fc6d0cda035633590dbe495e9ba08c21d26cc Parents: 01bc564 Author: Sylvain Lebresne <sylv...@datastax.com> Authored: Wed Mar 27 10:51:05 2013 +0100 Committer: Sylvain Lebresne <sylv...@datastax.com> Committed: Wed Mar 27 10:51:05 2013 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/transport/Message.java | 38 +++++++++------ .../cassandra/transport/messages/ErrorMessage.java | 34 ++++++++++++- 3 files changed, 56 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/b14fc6d0/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 184e70d..3722f2d 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -9,6 +9,7 @@ * Validate that provided CQL3 collection value are < 64K (CASSANDRA-5355) * Make upgradeSSTable skip current version sstables by default (CASSANDRA-5366) * Optimize min/max timestamp collection (CASSANDRA-5373) + * Invalid streamId in cql binary protocol when using invalid CL (CASSANDRA-5164) Merged from 1.1: * cli: Quote ks and cf names in schema output when needed (CASSANDRA-5052) * Fix bad default for min/max timestamp in SSTableMetadata (CASSANDRA-5372) http://git-wip-us.apache.org/repos/asf/cassandra/blob/b14fc6d0/src/java/org/apache/cassandra/transport/Message.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/Message.java b/src/java/org/apache/cassandra/transport/Message.java index d4d3da6..8aec501 100644 --- a/src/java/org/apache/cassandra/transport/Message.java +++ b/src/java/org/apache/cassandra/transport/Message.java @@ -205,25 +205,33 @@ public abstract class Message UUID tracingId = isRequest || !isTracing ? null : CBUtil.readUuid(frame.body); - Message message = frame.header.type.codec.decode(frame.body); - message.setStreamId(frame.header.streamId); - - if (isRequest) + try { - assert message instanceof Request; - Request req = (Request)message; - req.attach(frame.connection); - if (isTracing) - req.setTracingRequested(); + Message message = frame.header.type.codec.decode(frame.body); + message.setStreamId(frame.header.streamId); + + if (isRequest) + { + assert message instanceof Request; + Request req = (Request)message; + req.attach(frame.connection); + if (isTracing) + req.setTracingRequested(); + } + else + { + assert message instanceof Response; + if (isTracing) + ((Response)message).setTracingId(tracingId); + } + + return message; } - else + catch (Exception ex) { - assert message instanceof Response; - if (isTracing) - ((Response)message).setTracingId(tracingId); + // Remember the streamId + throw ErrorMessage.wrap(ex, frame.header.streamId); } - - return message; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/b14fc6d0/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java b/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java index 0751584..dd685fa 100644 --- a/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java @@ -188,14 +188,27 @@ public class ErrorMessage extends Message.Response this.error = error; } + private ErrorMessage(TransportException error, int streamId) + { + this(error); + setStreamId(streamId); + } + public static ErrorMessage fromException(Throwable e) { + int streamId = 0; + if (e instanceof WrappedException) + { + streamId = ((WrappedException)e).streamId; + e = e.getCause(); + } + if (e instanceof TransportException) - return new ErrorMessage((TransportException)e); + return new ErrorMessage((TransportException)e, streamId); // Unexpected exception logger.error("Unexpected exception during request", e); - return new ErrorMessage(new ServerError(e)); + return new ErrorMessage(new ServerError(e), streamId); } public ChannelBuffer encode() @@ -208,4 +221,21 @@ public class ErrorMessage extends Message.Response { return "ERROR " + error.code() + ": " + error.getMessage(); } + + public static RuntimeException wrap(Throwable t, int streamId) + { + return new WrappedException(t, streamId); + } + + private static class WrappedException extends RuntimeException + { + private final int streamId; + + public WrappedException(Throwable cause, int streamId) + { + super(cause); + this.streamId = streamId; + } + } + }