[
https://issues.apache.org/jira/browse/CASSANDRA-20052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17896682#comment-17896682
]
Sam Tunnicliffe commented on CASSANDRA-20052:
---------------------------------------------
{quote} The tricky thing with the current rate limiting logic is that we read
frames to memory first and only then we decide if we need to apply a
back-pressure
{quote}
Yes, what I was trying to say was that in the large message case we *do* know
the size of the entire message once we have read the first frame, but if we
don't have capacity for it we read and buffer the subsequent frames anyway.
Then we may or may not process the message, depending on {{throwOnOverload}}
and/or whether rate limiting was also triggered by the number of requests in
flight or the queue time.
My intention was to suggest that instead of doing that, if we don't have
capacity to process the entire message we should just read *but not buffer* the
subsequent frames and then return an error response and not do _anything_ else.
You're right that {{OverloadedException}} may not be the right error if it
signals to clients that they should retry later. If that's the case then
maybe we should use a generic {{{}InvalidRequestException{}}}.
{quote}introducing frames makes it quite complicated to implement
{quote}
I don't quite see why, though; it's exactly the same as the {{throwOnOverload}}
case. We know by reading the first frame that we are overloaded, so we decide
right away that we won't process the message and set a flag to that effect (we
should just release the subsequent frames as we read them). How is this
different? I'm just saying we should do that regardless of {{throwOnOverload}}.
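To make the idea concrete, here is a minimal sketch of the flag-and-release handling I have in mind. All names here ({{Frame}}, the handler methods) are hypothetical stand-ins, not the actual {{CQLMessageHandler}} API; it only illustrates the shape of the logic.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: decide on the first frame whether we can process the
// message; if not, keep reading subsequent frames but release them immediately
// instead of buffering, then answer with an error on completion.
public class LargeMessageSketch {
    static class Frame {
        final byte[] payload;
        boolean released;
        Frame(int size) { this.payload = new byte[size]; }
        void release() { released = true; } // return the buffer to the pool
    }

    private final List<Frame> buffered = new ArrayList<>();
    private boolean discard; // set once, on the first frame, if overloaded

    void onFirstFrame(Frame frame, boolean capacityAcquired) {
        if (!capacityAcquired) {
            discard = true;  // decide up front that we won't process this message
            frame.release(); // read but do not buffer
            return;
        }
        buffered.add(frame);
    }

    void onSubsequentFrame(Frame frame) {
        if (discard) {
            frame.release(); // keep draining the stream without retaining memory
            return;
        }
        buffered.add(frame);
    }

    boolean onComplete() {
        // If discarded: send an error response (OverloadedException or
        // InvalidRequestException, depending on connection config) and stop.
        return !discard;
    }

    int bufferedFrames() { return buffered.size(); }
}
```

The point is that the decision is made exactly once, when the first frame arrives, and applies whether or not {{throwOnOverload}} is set.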
{quote}Introducing a message limit should help to reduce the probability of
this branch invocation but I think it will not help to avoid it completely
without extra changes in rate limiting logic itself (because we may hit the
same case when we receive several large enough messages each one smaller than a
limit).
{quote}
After the {{acquireCapacity}} check, rate limiting is based on request
rate/time in queue and not on message size, so I don't see what you mean here.
If we receive a stream of large messages, we'll process them until we cannot
acquire capacity and then we'll start rejecting them (assuming we implement
what I said in the previous paragraphs). Rate limiting doesn't really come into
it at this point.
However, I *do* think we should extend the {{acquireCapacity}} check to also
ensure that {{{}messageSize <= IMutation.MAX_MUTATION_SIZE{}}}. If either
condition evaluates to false, then we don't want to process that message.
e.g.
{code:java}
if (!acquireCapacity(header, endpointReserve, globalReserve) || messageSize > IMutation.MAX_MUTATION_SIZE)
{
    // Read subsequent frames, but don't buffer them.
    // When done, return an error response (either OverloadedException
    // or InvalidRequestException, depending on connection config).
}
{code}
I am mostly trying to understand whether there's a valid reason to introduce
another configuration option specifically for max message size. If the server
has capacity to process a message and any mutations in it are within the
acceptable size boundaries, do we really need another setting in yaml?
> Size of CQL messages is not limited in V5 protocol logic
> --------------------------------------------------------
>
> Key: CASSANDRA-20052
> URL: https://issues.apache.org/jira/browse/CASSANDRA-20052
> Project: Cassandra
> Issue Type: Bug
> Components: Messaging/Client
> Reporter: Dmitry Konstantinov
> Assignee: Dmitry Konstantinov
> Priority: Normal
> Attachments: cassandra_rate_limit.svg
>
> Time Spent: 10m
> Remaining Estimate: 0h
>
> Size of CQL messages is not limited in V5 protocol logic
> - After the introduction of v5 frames we no longer have any CQL message size
> limit. native_transport_max_frame_size_in_mb, which imposed such a limit in
> the pre-V5 era, now applies only to pre-V5 protocol sessions; otherwise it is
> applied only to the initial STARTUP/OPTIONS message handling and is not
> checked anywhere in the v5 logic. So, currently a v5 CQL message of any size
> can be sent to the Cassandra server.
> - The overload logic simply lets huge messages through for free to avoid
> starvation, so it provides no protection against the most dangerous requests
> from a memory-pressure point of view.
> - The situation is even more dangerous: the v5 framing logic is enabled right
> after the AUTH response, so we do not limit message size even for AUTH_RESPONSE
> messages from a client. This can be used as a DoS attack: a non-authenticated
> client can send a huge username/password to the Cassandra server to cause
> trouble with GC or even kill it.
> An easy example:
> {code:java}
> import java.net.InetSocketAddress;
> import java.util.Arrays;
>
> import com.datastax.oss.driver.api.core.CqlSession;
> import com.datastax.oss.driver.internal.core.metadata.DefaultEndPoint;
>
> public class TestBigAuthRequest {
>     public static void main(String[] args) {
>         String password = getString(500_000_000, '-');
>         try (CqlSession session = CqlSession.builder()
>                 .addContactEndPoint(new DefaultEndPoint(new InetSocketAddress("localhost", 9042)))
>                 .withAuthCredentials("cassandra", password)
>                 .withLocalDatacenter("datacenter1")
>                 .build()) {
>             session.execute("select * from system.local");
>         }
>     }
>
>     private static String getString(int length, char charToFill) {
>         if (length > 0) {
>             char[] array = new char[length];
>             Arrays.fill(array, charToFill);
>             return new String(array);
>         }
>         return "";
>     }
> }
> {code}
> A thread stack of such invocation (captured to show the execution flow):
> {code:java}
> "nioEventLoopGroup-5-21@9164" prio=10 tid=0x86 nid=NA runnable
>   java.lang.Thread.State: RUNNABLE
>     at org.apache.cassandra.transport.messages.AuthResponse$1.decode(AuthResponse.java:45)
>     at org.apache.cassandra.transport.messages.AuthResponse$1.decode(AuthResponse.java:39)
>     at org.apache.cassandra.transport.Message$Decoder.decodeMessage(Message.java:432)
>     at org.apache.cassandra.transport.Message$Decoder$RequestDecoder.decode(Message.java:467)
>     at org.apache.cassandra.transport.Message$Decoder$RequestDecoder.decode(Message.java:459)
>     at org.apache.cassandra.transport.CQLMessageHandler.processRequest(CQLMessageHandler.java:377)
>     at org.apache.cassandra.transport.CQLMessageHandler$LargeMessage.onComplete(CQLMessageHandler.java:755)
>     at org.apache.cassandra.net.AbstractMessageHandler$LargeMessage.supply(AbstractMessageHandler.java:561)
>     at org.apache.cassandra.net.AbstractMessageHandler.processSubsequentFrameOfLargeMessage(AbstractMessageHandler.java:257)
>     at org.apache.cassandra.net.AbstractMessageHandler.processIntactFrame(AbstractMessageHandler.java:229)
>     at org.apache.cassandra.net.AbstractMessageHandler.process(AbstractMessageHandler.java:216)
>     at org.apache.cassandra.transport.CQLMessageHandler.process(CQLMessageHandler.java:147)
>     at org.apache.cassandra.net.FrameDecoder.deliver(FrameDecoder.java:330)
>     at org.apache.cassandra.net.FrameDecoder.channelRead(FrameDecoder.java:294)
>     at org.apache.cassandra.net.FrameDecoder.channelRead(FrameDecoder.java:277)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>     at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
>     at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>     at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
>     at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
>     at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
>     at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
>     at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
>     at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
>     at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
>     at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>     at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
>     at java.lang.Thread.run(Thread.java:829)
> {code}
> The provided PR (https://github.com/apache/cassandra/pull/3655) contains a
> fix for the issue, introducing 2 new parameters:
> * native_transport_max_message_size - limits the size of any CQL message
> * native_transport_max_auth_message_size (default = 128KiB) - limits the auth
> response message size more strictly, adding extra protection against a
> possible DoS attack.
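As an illustration of how these settings might look in cassandra.yaml (the non-auth value below is purely a placeholder; only the 128KiB auth default is stated above):

```yaml
# Hypothetical cassandra.yaml fragment; names follow the proposed parameters,
# the non-auth value is a placeholder, not a confirmed default.
native_transport_max_message_size: 16MiB
native_transport_max_auth_message_size: 128KiB
```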
> Design questions:
> * The current implementation closes the CQL connection if a message is bigger
> than the limits. Logic to skip the message body could be implemented so the
> connection remains usable, but that is more complicated and error-prone.
> * The tricky question is the default value for
> native_transport_max_message_size:
> on one hand, we want it to be no more than
> min(native_transport_max_request_data_in_flight_per_ip,
> native_transport_max_request_data_in_flight), to reduce the chances of
> invoking the branch of logic where error handling does not work;
> on the other hand, that minimum can be too small, and there is a
> chance of breaking backward compatibility for existing deployments where
> people use large messages and small heaps (even though that is not a good idea).
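The sizing constraint described above can be sketched as a simple startup validation. The helper below is hypothetical (not code from the PR); the parameter names mirror the settings discussed.

```java
// Hypothetical validation: the max message size should not exceed either
// in-flight data limit, otherwise a single message can never acquire capacity.
public class NativeTransportLimitsCheck {
    static boolean isValid(long maxMessageSize,
                           long maxDataInFlightPerIp,
                           long maxDataInFlight) {
        return maxMessageSize <= Math.min(maxDataInFlightPerIp, maxDataInFlight);
    }
}
```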
> Related observations:
> 1) https://issues.apache.org/jira/browse/CASSANDRA-16886 - Reduce
> native_transport_max_frame_size_in_mb (from 256M to 16M)
> 2) Corresponding logic exists for the Cassandra internode protocol: a message
> limit exists there, and the max message size is validated to be no larger than
> the rate limiting reserve capacities:
> internode_max_message_size = min(internode_application_receive_queue_reserve_endpoint_capacity, internode_application_send_queue_reserve_endpoint_capacity)
> internode_application_receive_queue_reserve_endpoint_capacity = 128MiB
> internode_application_send_queue_reserve_endpoint_capacity = 128MiB
> internode_max_message_size <= internode_application_receive_queue_reserve_endpoint_capacity
> internode_max_message_size <= internode_application_receive_queue_reserve_global_capacity
> internode_max_message_size <= internode_application_send_queue_reserve_endpoint_capacity
> internode_max_message_size <= internode_application_send_queue_reserve_global_capacity
> 3) Request types according to CQL specification:
> 4.1.1. STARTUP, in normal cases should be small
> 4.1.2. AUTH_RESPONSE, in normal cases should be small
> 4.1.3. OPTIONS, in normal cases should be small
> 4.1.4. QUERY, in normal cases should be small
> 4.1.5. PREPARE, in normal cases should be small
> 4.1.6. EXECUTE <-- potentially large in case of inserts; max_mutation_size = commitlog_segment_size / 2, where commitlog_segment_size = 32MiB by default
> 4.1.7. BATCH <-- potentially large; max_mutation_size = commitlog_segment_size / 2, where commitlog_segment_size = 32MiB by default
> 4.1.8. REGISTER, in normal cases should be small
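As a quick sanity check of the mutation bound mentioned for EXECUTE and BATCH (class and method names here are purely illustrative):

```java
// Illustrative arithmetic only: max_mutation_size defaults to half the
// commitlog segment size, so a 32 MiB segment yields a 16 MiB mutation limit.
public class MaxMutationSizeExample {
    static final long MIB = 1024L * 1024L;

    static long defaultMaxMutationSize(long commitlogSegmentSize) {
        return commitlogSegmentSize / 2;
    }

    public static void main(String[] args) {
        System.out.println(defaultMaxMutationSize(32 * MIB)); // prints 16777216
    }
}
```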
--
This message was sent by Atlassian Jira
(v8.20.10#820010)