Repository: cassandra Updated Branches: refs/heads/cassandra-2.0 98502e0c3 -> 5654e7368
Preserve stream ID for more protocol errors Patch by Chris Bannister and Tyler Hobbs for CASSANDRA-8848 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5654e736 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5654e736 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5654e736 Branch: refs/heads/cassandra-2.0 Commit: 5654e7368c2d68b3701cb5cdf190975f1079b10c Parents: 98502e0 Author: Chris Bannister <c.bannis...@gmail.com> Authored: Fri Feb 27 16:09:26 2015 -0600 Committer: Tyler Hobbs <tylerho...@apache.org> Committed: Fri Feb 27 16:09:26 2015 -0600 ---------------------------------------------------------------------- CHANGES.txt | 4 +- .../org/apache/cassandra/transport/Frame.java | 18 +++- .../transport/messages/ErrorMessage.java | 9 +- .../cassandra/transport/ProtocolErrorTest.java | 97 ++++++++++++++++++++ 4 files changed, 123 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/5654e736/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index f2b4469..3a8d824 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.0.13: + * Preserve stream ID for more protocol errors (CASSANDRA-8848) * Fix combining token() function with multi-column relations on clustering columns (CASSANDRA-8797) * Make CFS.markReferenced() resistant to bad refcounting (CASSANDRA-8829) @@ -7,7 +8,8 @@ table with ASC ordering and paging (CASSANDRA-8767) * AssertionError: "Memory was freed" when running cleanup (CASSANDRA-8716) * Make it possible to set max_sstable_age to fractional days (CASSANDRA-8406) - * Fix memory leak in SSTableSimple*Writer and SSTableReader.validate() (CASSANDRA-8748) + * Fix memory leak in SSTableSimple*Writer and SSTableReader.validate() + (CASSANDRA-8748) * Fix some multi-column relations with indexes on some clustering columns (CASSANDRA-8275) * Fix IllegalArgumentException in dynamic snitch (CASSANDRA-8448) http://git-wip-us.apache.org/repos/asf/cassandra/blob/5654e736/src/java/org/apache/cassandra/transport/Frame.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/Frame.java b/src/java/org/apache/cassandra/transport/Frame.java index 89755df..7520c41 100644 --- a/src/java/org/apache/cassandra/transport/Frame.java +++ b/src/java/org/apache/cassandra/transport/Frame.java @@ -166,14 +166,22 @@ public class Frame int streamId = buffer.getByte(idx + 2); // This throws a protocol exceptions if the opcode is unknown - Message.Type type = Message.Type.fromOpcode(buffer.getByte(idx + 3), direction); + Message.Type type; + try + { + type = Message.Type.fromOpcode(buffer.getByte(idx + 3), direction); + } + catch (ProtocolException e) + { + throw ErrorMessage.wrap(e, streamId); + } long bodyLength = buffer.getUnsignedInt(idx + Header.BODY_LENGTH_OFFSET); if (bodyLength < 0) { buffer.skipBytes(Header.LENGTH); - throw new ProtocolException("Invalid frame body length: " + bodyLength); + throw ErrorMessage.wrap(new ProtocolException("Invalid frame body length: " + bodyLength), streamId); } long frameLength = bodyLength + Header.LENGTH; @@ -207,7 +215,11 @@ public class Frame } else if (connection.getVersion() != version) { - throw new ProtocolException(String.format("Invalid message version. Got %d but previous messages on this connection had version %d", version, connection.getVersion())); + throw ErrorMessage.wrap( + new ProtocolException(String.format( + "Invalid message version. Got %d but previous messages on this connection had version %d", + version, connection.getVersion())), + streamId); } return new Frame(new Header(version, flags, streamId, type), body); http://git-wip-us.apache.org/repos/asf/cassandra/blob/5654e736/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java b/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java index 4d60a1f..e27fb88 100644 --- a/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java @@ -17,6 +17,7 @@ */ package org.apache.cassandra.transport.messages; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Predicate; import org.jboss.netty.buffer.ChannelBuffer; import org.slf4j.Logger; @@ -243,7 +244,8 @@ public class ErrorMessage extends Message.Response return new WrappedException(t, streamId); } - private static class WrappedException extends RuntimeException + @VisibleForTesting + public static class WrappedException extends RuntimeException { private final int streamId; @@ -252,6 +254,11 @@ public class ErrorMessage extends Message.Response super(cause); this.streamId = streamId; } + + public int getStreamId() + { + return this.streamId; + } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/5654e736/test/unit/org/apache/cassandra/transport/ProtocolErrorTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/transport/ProtocolErrorTest.java b/test/unit/org/apache/cassandra/transport/ProtocolErrorTest.java new file mode 100644 index 0000000..387d159 --- /dev/null +++ b/test/unit/org/apache/cassandra/transport/ProtocolErrorTest.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.transport; + +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.buffer.HeapChannelBufferFactory; +import org.apache.cassandra.transport.messages.ErrorMessage; +import org.junit.Assert; +import org.junit.Test; + +public class ProtocolErrorTest { + + @Test + public void testInvalidDirection() throws Exception + { + Frame.Decoder dec = new Frame.Decoder(null); + + // should generate a protocol exception for using a response frame with + // a prepare op, ensure that it comes back with stream ID 1 + byte[] frame = new byte[] { + (byte) 0x82, // direction & version + 0x00, // flags + 0x01, // stream ID + 0x09, // opcode + 0x00, 0x00, 0x00, 0x21, // body length + 0x00, 0x00, 0x00, 0x1b, 0x00, 0x1b, 0x53, 0x45, + 0x4c, 0x45, 0x43, 0x54, 0x20, 0x2a, 0x20, 0x46, + 0x52, 0x4f, 0x4d, 0x20, 0x73, 0x79, 0x73, 0x74, + 0x65, 0x6d, 0x2e, 0x6c, 0x6f, 0x63, 0x61, 0x6c, + 0x3b + }; + ChannelBuffer buf = new HeapChannelBufferFactory().getBuffer(frame, 0, frame.length); + try { + dec.decode(null, null, buf); + } catch (ErrorMessage.WrappedException e) { + // make sure the exception has the correct stream ID + Assert.assertEquals(1, e.getStreamId()); + } + } + + @Test + public void testNegativeBodyLength() throws Exception + { + Frame.Decoder dec = new Frame.Decoder(null); + + byte[] frame = new byte[] { + (byte) 0x82, // direction & version + 0x00, // flags + 0x01, // stream ID + 0x09, // opcode + (byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff, // body length (-1) + }; + ChannelBuffer buf = new HeapChannelBufferFactory().getBuffer(frame, 0, frame.length); + try { + dec.decode(null, null, buf); + } catch (ErrorMessage.WrappedException e) { + // make sure the exception has the correct stream ID + Assert.assertEquals(1, e.getStreamId()); + } + } + + @Test + public void testBodyLengthOverLimit() throws Exception + { + Frame.Decoder dec = new Frame.Decoder(null); + + byte[] frame = new byte[] { + (byte) 0x82, // direction & version + 0x00, // flags + 0x01, // stream ID + 0x09, // opcode + 0x7f, (byte) 0xff, (byte) 0xff, (byte) 0xff, // body length + }; + ChannelBuffer buf = new HeapChannelBufferFactory().getBuffer(frame, 0, frame.length); + try { + dec.decode(null, null, buf); + } catch (ErrorMessage.WrappedException e) { + // make sure the exception has the correct stream ID + Assert.assertEquals(1, e.getStreamId()); + } + } +}