Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.0 98502e0c3 -> 5654e7368


Preserve stream ID for more protocol errors

Patch by Chris Bannister and Tyler Hobbs for CASSANDRA-8848


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5654e736
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5654e736
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5654e736

Branch: refs/heads/cassandra-2.0
Commit: 5654e7368c2d68b3701cb5cdf190975f1079b10c
Parents: 98502e0
Author: Chris Bannister <c.bannis...@gmail.com>
Authored: Fri Feb 27 16:09:26 2015 -0600
Committer: Tyler Hobbs <tylerho...@apache.org>
Committed: Fri Feb 27 16:09:26 2015 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  4 +-
 .../org/apache/cassandra/transport/Frame.java   | 18 +++-
 .../transport/messages/ErrorMessage.java        |  9 +-
 .../cassandra/transport/ProtocolErrorTest.java  | 97 ++++++++++++++++++++
 4 files changed, 123 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/5654e736/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f2b4469..3a8d824 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.0.13:
+ * Preserve stream ID for more protocol errors (CASSANDRA-8848)
  * Fix combining token() function with multi-column relations on
    clustering columns (CASSANDRA-8797)
  * Make CFS.markReferenced() resistant to bad refcounting (CASSANDRA-8829)
@@ -7,7 +8,8 @@
    table with ASC ordering and paging (CASSANDRA-8767)
  * AssertionError: "Memory was freed" when running cleanup (CASSANDRA-8716)
  * Make it possible to set max_sstable_age to fractional days (CASSANDRA-8406)
- * Fix memory leak in SSTableSimple*Writer and SSTableReader.validate() (CASSANDRA-8748)
+ * Fix memory leak in SSTableSimple*Writer and SSTableReader.validate()
+   (CASSANDRA-8748)
  * Fix some multi-column relations with indexes on some clustering
    columns (CASSANDRA-8275)
  * Fix IllegalArgumentException in dynamic snitch (CASSANDRA-8448)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5654e736/src/java/org/apache/cassandra/transport/Frame.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Frame.java b/src/java/org/apache/cassandra/transport/Frame.java
index 89755df..7520c41 100644
--- a/src/java/org/apache/cassandra/transport/Frame.java
+++ b/src/java/org/apache/cassandra/transport/Frame.java
@@ -166,14 +166,22 @@ public class Frame
             int streamId = buffer.getByte(idx + 2);
 
             // This throws a protocol exceptions if the opcode is unknown
-            Message.Type type = Message.Type.fromOpcode(buffer.getByte(idx + 3), direction);
+            Message.Type type;
+            try
+            {
+                type = Message.Type.fromOpcode(buffer.getByte(idx + 3), direction);
+            }
+            catch (ProtocolException e)
+            {
+                throw ErrorMessage.wrap(e, streamId);
+            }
 
             long bodyLength = buffer.getUnsignedInt(idx + Header.BODY_LENGTH_OFFSET);
 
             if (bodyLength < 0)
             {
                 buffer.skipBytes(Header.LENGTH);
-                throw new ProtocolException("Invalid frame body length: " + bodyLength);
+                throw ErrorMessage.wrap(new ProtocolException("Invalid frame body length: " + bodyLength), streamId);
             }
 
             long frameLength = bodyLength + Header.LENGTH;
@@ -207,7 +215,11 @@ public class Frame
             }
             else if (connection.getVersion() != version)
             {
-                throw new ProtocolException(String.format("Invalid message version. Got %d but previous messages on this connection had version %d", version, connection.getVersion()));
+                throw ErrorMessage.wrap(
+                        new ProtocolException(String.format(
+                                "Invalid message version. Got %d but previous messages on this connection had version %d",
+                                version, connection.getVersion())),
+                        streamId);
             }
 
             return new Frame(new Header(version, flags, streamId, type), body);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5654e736/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java b/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
index 4d60a1f..e27fb88 100644
--- a/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.transport.messages;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Predicate;
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.slf4j.Logger;
@@ -243,7 +244,8 @@ public class ErrorMessage extends Message.Response
         return new WrappedException(t, streamId);
     }
 
-    private static class WrappedException extends RuntimeException
+    @VisibleForTesting
+    public static class WrappedException extends RuntimeException
     {
         private final int streamId;
 
@@ -252,6 +254,11 @@ public class ErrorMessage extends Message.Response
             super(cause);
             this.streamId = streamId;
         }
+
+        public int getStreamId()
+        {
+            return this.streamId;
+        }
     }
 
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5654e736/test/unit/org/apache/cassandra/transport/ProtocolErrorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/transport/ProtocolErrorTest.java b/test/unit/org/apache/cassandra/transport/ProtocolErrorTest.java
new file mode 100644
index 0000000..387d159
--- /dev/null
+++ b/test/unit/org/apache/cassandra/transport/ProtocolErrorTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.transport;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.HeapChannelBufferFactory;
+import org.apache.cassandra.transport.messages.ErrorMessage;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ProtocolErrorTest {
+
+    @Test
+    public void testInvalidDirection() throws Exception
+    {
+        Frame.Decoder dec = new Frame.Decoder(null);
+
+        // should generate a protocol exception for using a response frame with
+        // a prepare op, ensure that it comes back with stream ID 1
+        byte[] frame = new byte[] {
+                (byte) 0x82,  // direction & version
+                0x00,  // flags
+                0x01,  // stream ID
+                0x09,  // opcode
+                0x00, 0x00, 0x00, 0x21,  // body length
+                0x00, 0x00, 0x00, 0x1b, 0x00, 0x1b, 0x53, 0x45,
+                0x4c, 0x45, 0x43, 0x54, 0x20, 0x2a, 0x20, 0x46,
+                0x52, 0x4f, 0x4d, 0x20, 0x73, 0x79, 0x73, 0x74,
+                0x65, 0x6d, 0x2e, 0x6c, 0x6f, 0x63, 0x61, 0x6c,
+                0x3b
+        };
+        ChannelBuffer buf = new HeapChannelBufferFactory().getBuffer(frame, 0, frame.length);
+        try {
+            dec.decode(null, null, buf);
+        } catch (ErrorMessage.WrappedException e) {
+            // make sure the exception has the correct stream ID
+            Assert.assertEquals(1, e.getStreamId());
+        }
+    }
+
+    @Test
+    public void testNegativeBodyLength() throws Exception
+    {
+        Frame.Decoder dec = new Frame.Decoder(null);
+
+        byte[] frame = new byte[] {
+                (byte) 0x82,  // direction & version
+                0x00,  // flags
+                0x01,  // stream ID
+                0x09,  // opcode
+                (byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff,  // body length (-1)
+        };
+        ChannelBuffer buf = new HeapChannelBufferFactory().getBuffer(frame, 0, frame.length);
+        try {
+            dec.decode(null, null, buf);
+        } catch (ErrorMessage.WrappedException e) {
+            // make sure the exception has the correct stream ID
+            Assert.assertEquals(1, e.getStreamId());
+        }
+    }
+
+    @Test
+    public void testBodyLengthOverLimit() throws Exception
+    {
+        Frame.Decoder dec = new Frame.Decoder(null);
+
+        byte[] frame = new byte[] {
+                (byte) 0x82,  // direction & version
+                0x00,  // flags
+                0x01,  // stream ID
+                0x09,  // opcode
+                0x7f, (byte) 0xff, (byte) 0xff, (byte) 0xff,  // body length
+        };
+        ChannelBuffer buf = new HeapChannelBufferFactory().getBuffer(frame, 0, frame.length);
+        try {
+            dec.decode(null, null, buf);
+        } catch (ErrorMessage.WrappedException e) {
+            // make sure the exception has the correct stream ID
+            Assert.assertEquals(1, e.getStreamId());
+        }
+    }
+}

Reply via email to