Repository: cassandra Updated Branches: refs/heads/trunk c1f623902 -> 33ab4902a
Remove obsoleted CredentialsMessage patch by Stefan Podkowinski; reviewed by Jeremiah Jordan for CASSANDRA-13662 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/33ab4902 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/33ab4902 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/33ab4902 Branch: refs/heads/trunk Commit: 33ab4902a1bef5aa662b80b89b5dd9a318f67db5 Parents: c1f6239 Author: Stefan Podkowinski <stefan.podkowin...@1und1.de> Authored: Tue Jul 4 10:05:51 2017 +0200 Committer: Stefan Podkowinski <stefan.podkowin...@1und1.de> Committed: Tue Sep 5 16:11:42 2017 +0200 ---------------------------------------------------------------------- .../org/apache/cassandra/transport/Client.java | 7 -- .../org/apache/cassandra/transport/Frame.java | 37 +++++--- .../org/apache/cassandra/transport/Message.java | 2 +- .../cassandra/transport/SimpleClient.java | 8 -- .../transport/messages/CredentialsMessage.java | 97 -------------------- .../messages/UnsupportedMessageCodec.java | 56 +++++++++++ .../cassandra/transport/ProtocolErrorTest.java | 21 +++++ 7 files changed, 102 insertions(+), 126 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/33ab4902/src/java/org/apache/cassandra/transport/Client.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/Client.java b/src/java/org/apache/cassandra/transport/Client.java index 9a76e03..7fec473 100644 --- a/src/java/org/apache/cassandra/transport/Client.java +++ b/src/java/org/apache/cassandra/transport/Client.java @@ -175,13 +175,6 @@ public class Client extends SimpleClient { return new OptionsMessage(); } - else if (msgType.equals("CREDENTIALS")) - { - System.err.println("[WARN] CREDENTIALS command is deprecated, use AUTHENTICATE instead"); - CredentialsMessage msg = new CredentialsMessage(); - msg.credentials.putAll(readCredentials(iter)); - return msg; - } else if (msgType.equals("AUTHENTICATE")) { Map<String, String> credentials = readCredentials(iter); http://git-wip-us.apache.org/repos/asf/cassandra/blob/33ab4902/src/java/org/apache/cassandra/transport/Frame.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/Frame.java b/src/java/org/apache/cassandra/transport/Frame.java index 6cd8b1e..41e64f9 100644 --- a/src/java/org/apache/cassandra/transport/Frame.java +++ b/src/java/org/apache/cassandra/transport/Frame.java @@ -22,6 +22,8 @@ import java.io.IOException; import java.util.EnumSet; import java.util.List; +import com.google.common.annotations.VisibleForTesting; + import io.netty.buffer.ByteBuf; import io.netty.channel.*; import io.netty.handler.codec.ByteToMessageDecoder; @@ -146,8 +148,8 @@ public class Frame this.factory = factory; } - @Override - protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> results) + @VisibleForTesting + Frame decodeFrame(ByteBuf buffer) throws Exception { if (discardingTooLongFrame) @@ -156,12 +158,12 @@ public class Frame // If we have discarded everything, throw the exception if (bytesToDiscard <= 0) fail(); - return; + return null; } int readableBytes = buffer.readableBytes(); if (readableBytes == 0) - return; + return null; int idx = buffer.readerIndex(); @@ -174,7 +176,7 @@ public class Frame // Wait until we have the complete header if (readableBytes < Header.LENGTH) - return; + return null; int flags = buffer.getByte(idx++); EnumSet<Header.Flag> decodedFlags = Header.Flag.deserialize(flags); @@ -210,11 +212,11 @@ public class Frame bytesToDiscard = discard(buffer, frameLength); if (bytesToDiscard <= 0) fail(); - return; + return null; } if (buffer.readableBytes() < frameLength) - return; + return null; // extract body ByteBuf body = buffer.slice(idx, (int) bodyLength); @@ -223,24 +225,33 @@ public class Frame idx += bodyLength; buffer.readerIndex(idx); + return new Frame(new Header(version, decodedFlags, streamId, type), body); + } + + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> results) + throws Exception + { + Frame frame = decodeFrame(buffer); + if (frame == null) return; + Attribute<Connection> attrConn = ctx.channel().attr(Connection.attributeKey); Connection connection = attrConn.get(); if (connection == null) { // First message seen on this channel, attach the connection object - connection = factory.newConnection(ctx.channel(), version); + connection = factory.newConnection(ctx.channel(), frame.header.version); attrConn.set(connection); } - else if (connection.getVersion() != version) + else if (connection.getVersion() != frame.header.version) { throw ErrorMessage.wrap( new ProtocolException(String.format( "Invalid message version. Got %s but previous messages on this connection had version %s", - version, connection.getVersion())), - streamId); + frame.header.version, connection.getVersion())), + frame.header.streamId); } - - results.add(new Frame(new Header(version, decodedFlags, streamId, type), body)); + results.add(frame); } private void fail() http://git-wip-us.apache.org/repos/asf/cassandra/blob/33ab4902/src/java/org/apache/cassandra/transport/Message.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/Message.java b/src/java/org/apache/cassandra/transport/Message.java index 1afe910..2da2ca7 100644 --- a/src/java/org/apache/cassandra/transport/Message.java +++ b/src/java/org/apache/cassandra/transport/Message.java @@ -88,7 +88,7 @@ public abstract class Message STARTUP (1, Direction.REQUEST, StartupMessage.codec), READY (2, Direction.RESPONSE, ReadyMessage.codec), AUTHENTICATE (3, Direction.RESPONSE, AuthenticateMessage.codec), - CREDENTIALS (4, Direction.REQUEST, CredentialsMessage.codec), + CREDENTIALS (4, Direction.REQUEST, UnsupportedMessageCodec.instance), OPTIONS (5, Direction.REQUEST, OptionsMessage.codec), SUPPORTED (6, Direction.RESPONSE, SupportedMessage.codec), QUERY (7, Direction.REQUEST, QueryMessage.codec), http://git-wip-us.apache.org/repos/asf/cassandra/blob/33ab4902/src/java/org/apache/cassandra/transport/SimpleClient.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/SimpleClient.java b/src/java/org/apache/cassandra/transport/SimpleClient.java index c72d6e9..d5148ab 100644 --- a/src/java/org/apache/cassandra/transport/SimpleClient.java +++ b/src/java/org/apache/cassandra/transport/SimpleClient.java @@ -44,7 +44,6 @@ import io.netty.util.internal.logging.Slf4JLoggerFactory; import org.apache.cassandra.cql3.QueryOptions; import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.security.SSLFactory; -import org.apache.cassandra.transport.messages.CredentialsMessage; import org.apache.cassandra.transport.messages.ErrorMessage; import org.apache.cassandra.transport.messages.EventMessage; import org.apache.cassandra.transport.messages.ExecuteMessage; @@ -166,13 +165,6 @@ public class SimpleClient implements Closeable } } - public void login(Map<String, String> credentials) - { - CredentialsMessage msg = new CredentialsMessage(); - msg.credentials.putAll(credentials); - execute(msg); - } - public ResultMessage execute(String query, ConsistencyLevel consistency) { return execute(query, Collections.<ByteBuffer>emptyList(), consistency); http://git-wip-us.apache.org/repos/asf/cassandra/blob/33ab4902/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java b/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java deleted file mode 100644 index 764d992..0000000 --- a/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.transport.messages; - -import java.util.HashMap; -import java.util.Map; - -import io.netty.buffer.ByteBuf; -import org.apache.cassandra.auth.AuthenticatedUser; -import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.exceptions.AuthenticationException; -import org.apache.cassandra.metrics.AuthMetrics; -import org.apache.cassandra.service.QueryState; -import org.apache.cassandra.transport.CBUtil; -import org.apache.cassandra.transport.Message; -import org.apache.cassandra.transport.ProtocolException; -import org.apache.cassandra.transport.ProtocolVersion; - -/** - * Message to indicate that the server is ready to receive requests. - */ -public class CredentialsMessage extends Message.Request -{ - public static final Message.Codec<CredentialsMessage> codec = new Message.Codec<CredentialsMessage>() - { - public CredentialsMessage decode(ByteBuf body, ProtocolVersion version) - { - if (version.isGreaterThan(ProtocolVersion.V1)) - throw new ProtocolException("Legacy credentials authentication is not supported in " + - "protocol versions > 1. Please use SASL authentication via a SaslResponse message"); - - Map<String, String> credentials = CBUtil.readStringMap(body); - return new CredentialsMessage(credentials); - } - - public void encode(CredentialsMessage msg, ByteBuf dest, ProtocolVersion version) - { - CBUtil.writeStringMap(msg.credentials, dest); - } - - public int encodedSize(CredentialsMessage msg, ProtocolVersion version) - { - return CBUtil.sizeOfStringMap(msg.credentials); - } - }; - - public final Map<String, String> credentials; - - public CredentialsMessage() - { - this(new HashMap<String, String>()); - } - - private CredentialsMessage(Map<String, String> credentials) - { - super(Message.Type.CREDENTIALS); - this.credentials = credentials; - } - - public Message.Response execute(QueryState state, long queryStartNanoTime) - { - try - { - AuthenticatedUser user = DatabaseDescriptor.getAuthenticator().legacyAuthenticate(credentials); - state.getClientState().login(user); - AuthMetrics.instance.markSuccess(); - } - catch (AuthenticationException e) - { - AuthMetrics.instance.markFailure(); - return ErrorMessage.fromException(e); - } - - return new ReadyMessage(); - } - - @Override - public String toString() - { - return "CREDENTIALS"; - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/33ab4902/src/java/org/apache/cassandra/transport/messages/UnsupportedMessageCodec.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/UnsupportedMessageCodec.java b/src/java/org/apache/cassandra/transport/messages/UnsupportedMessageCodec.java new file mode 100644 index 0000000..563e5d6 --- /dev/null +++ b/src/java/org/apache/cassandra/transport/messages/UnsupportedMessageCodec.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.transport.messages; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.netty.buffer.ByteBuf; +import org.apache.cassandra.transport.Message; +import org.apache.cassandra.transport.ProtocolException; +import org.apache.cassandra.transport.ProtocolVersion; + +/** + * Catch-all codec for any unsupported legacy messages. + */ +public class UnsupportedMessageCodec <T extends Message> implements Message.Codec<T> +{ + public final static UnsupportedMessageCodec instance = new UnsupportedMessageCodec(); + + private static final Logger logger = LoggerFactory.getLogger(UnsupportedMessageCodec.class); + + public T decode(ByteBuf body, ProtocolVersion version) + { + if (ProtocolVersion.SUPPORTED.contains(version)) + { + logger.error("Received invalid message for supported protocol version {}", version); + } + throw new ProtocolException("Unsupported message"); + } + + public void encode(T t, ByteBuf dest, ProtocolVersion version) + { + throw new ProtocolException("Unsupported message"); + } + + public int encodedSize(T t, ProtocolVersion version) + { + return 0; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/33ab4902/test/unit/org/apache/cassandra/transport/ProtocolErrorTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/transport/ProtocolErrorTest.java b/test/unit/org/apache/cassandra/transport/ProtocolErrorTest.java index 5e9731a..26b3d96 100644 --- a/test/unit/org/apache/cassandra/transport/ProtocolErrorTest.java +++ b/test/unit/org/apache/cassandra/transport/ProtocolErrorTest.java @@ -172,4 +172,25 @@ public class ProtocolErrorTest { Assert.assertEquals(expected, buf); } + + @Test + public void testUnsupportedMessage() throws Exception + { + byte[] incomingFrame = new byte[] { + (byte) REQUEST.addToVersion(ProtocolVersion.CURRENT.asInt()), // direction & version + 0x00, // flags + 0x00, 0x01, // stream ID + 0x04, // opcode for obsoleted CREDENTIALS message + 0x00, (byte) 0x00, (byte) 0x00, (byte) 0x10, // body length + }; + byte[] body = new byte[0x10]; + ByteBuf buf = Unpooled.wrappedBuffer(incomingFrame, body); + Frame decodedFrame = new Frame.Decoder(null).decodeFrame(buf); + try { + decodedFrame.header.type.codec.decode(decodedFrame.body, decodedFrame.header.version); + Assert.fail("Expected protocol error"); + } catch (ProtocolException e) { + Assert.assertTrue(e.getMessage().contains("Unsupported message")); + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org