[ https://issues.apache.org/jira/browse/CASSANDRA-15299?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17162062#comment-17162062 ]

Alex Petrov edited comment on CASSANDRA-15299 at 7/21/20, 2:29 PM:
-------------------------------------------------------------------

Thank you for the changes. I'm mostly done with a second pass of the review. 

First, I wanted to bring up a couple of important things:

  * Current memory management is rather hard to follow, and maybe we should 
make an effort to document and/or codify it somewhere. I've spent about a day 
following all the paths and figuring out the ins and outs of it. One thing I've 
found is that in {{FrameSet#finish}}, we're releasing not only the frame that 
was encoded into the sending buffer, but also the one we flush. This is done in 
three places with {{writeAndFlush}}. I believe this was leading to the CRC 
mismatch bug I was catching earlier (see stack trace [1] below). What might 
have been happening is that we were releasing the buffer too early, which would 
allow it to get recycled (a sketch of the ownership I have in mind follows this 
list).
  * There were several places in {{SimpleClient}} where we were not accounting 
for bytes. One is that we were never releasing the {{Frame}} of the response we 
were getting. I've implemented copying, which is not optimal, but should work 
for testing purposes. The other is that we weren't giving the bytes back to 
the limits. They are acquired via netty processing, so we need to release 
them eventually. I've added a simple hook to release those. An alternative to 
this would be to not use limits at all, with the downside of losing a bit of 
observability. 
  * [this one was discussed privately; the source of the leak was suggested by 
Sam] Flusher takes care of calling release for the flush item, which releases 
the source buffer. However, the _response_ frame is released only for small 
buffers in {{FrameSet}}.
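
Here's a minimal sketch of the ownership I have in mind (the names are illustrative, not the actual {{FrameSet}} code): netty takes ownership of the buffer handed to {{writeAndFlush}} and releases it once the write completes, so releasing it again in {{finish}} drops the refcount too early.

{code}
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;

// Illustrative only; not the real FrameSet code. Netty releases the ByteBuf
// passed to writeAndFlush once the write completes, so finish() must not
// release that buffer again: the extra release drops the refcount early and
// lets the pooled buffer be recycled while the write is still in flight,
// which matches the CRC mismatch in [1].
final class FinishSketch
{
    static void finish(Channel channel, ByteBuf encoded, ByteBuf flushed)
    {
        encoded.release();               // fully copied into the sending buffer, safe to release
        channel.writeAndFlush(flushed);  // ownership transferred to netty; no explicit release here
    }
}
{code}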

Here's a branch with the aforementioned changes. However, I haven't run CI on 
it; I'll check whether it breaks anything later today: 
https://github.com/ifesdjeen/cassandra/pull/new/15299-alex

{code}
[1] 
org.apache.cassandra.net.Crc$InvalidCrc: Read -854589741, Computed 1432984585
 at org.apache.cassandra.transport.CQLMessageHandler.processCorruptFrame(CQLMessageHandler.java:328)
 at org.apache.cassandra.net.AbstractMessageHandler.process(AbstractMessageHandler.java:217)
 at org.apache.cassandra.net.FrameDecoder.deliver(FrameDecoder.java:321)
 at org.apache.cassandra.net.FrameDecoder.channelRead(FrameDecoder.java:285)
 at org.apache.cassandra.net.FrameDecoder.channelRead(FrameDecoder.java:269)
 at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
 at
{code}

Some small nits: 

  * In {{CQLMessageHandler#processOneContainedMessage}}, when we can't acquire 
capacity, we don't pass the frame further down the line. Should we release the 
frame in this case, since usually we release the source frame only after the 
flush? (A rough sketch of what I mean follows this list.)
  * {{ReusableBuffer}} is unused.
  * {{Server}} has a few unused imports and an unused {{eventExecutorGroup}}.
  * I'm not sure if we currently handle releasing corrupted frames.
  * In {{FrameSet}}, we're currently relying on the fact that we'll be able to 
go through {{finish}} and release everything successfully. However, this might 
not always be the case. I couldn't trigger such an error, but it might still be 
possible. Should we maybe make {{FrameSet}} auto-closeable and make sure we 
always release buffers in {{finally}}? (See the second sketch after this list.) 
I've also made a similar change to {{processItem}}, which adds the item to 
{{flushed}} to make sure it's released. That makes the {{flushed}} variable 
name not quite right, though.
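
For the first nit, this is roughly what I mean; a sketch only, with stand-in names ({{Frame}}, {{acquireCapacity}}, {{dispatch}}) rather than the actual {{CQLMessageHandler}} members:

{code}
// Sketch of the suggestion only; these are stand-ins, not the real
// CQLMessageHandler members.
final class ContainedMessageSketch
{
    interface Frame { void release(); }

    private boolean acquireCapacity(Frame frame) { return false; } // stand-in
    private void dispatch(Frame frame) {}                          // stand-in: releases after flush

    boolean processOneContainedMessage(Frame frame)
    {
        if (!acquireCapacity(frame))
        {
            // Nothing downstream will see this frame, so the usual
            // release-after-flush path never runs; release it here.
            frame.release();
            return false;
        }
        dispatch(frame);
        return true;
    }
}
{code}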
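
And a sketch of the auto-closeable shape I'm thinking of for {{FrameSet}}, again with hypothetical names: buffers are dropped from the pending set once ownership has been transferred to {{writeAndFlush}}, and {{close}} releases whatever is left, so a failure halfway through {{finish}} can't leak.

{code}
import java.util.ArrayDeque;
import java.util.Deque;

import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;

// Illustrative shape only; the real FrameSet is organised differently.
final class FrameSetSketch implements AutoCloseable
{
    private final Channel channel;
    private final Deque<ByteBuf> pending = new ArrayDeque<>();

    FrameSetSketch(Channel channel)
    {
        this.channel = channel;
    }

    void add(ByteBuf buffer)
    {
        pending.add(buffer);
    }

    void finish()
    {
        ByteBuf buffer;
        while ((buffer = pending.poll()) != null)
            channel.writeAndFlush(buffer); // ownership transferred to netty
    }

    @Override
    public void close()
    {
        // Runs in the try-with-resources "finally": anything finish() did not
        // hand off (e.g. because encoding threw) gets released here.
        ByteBuf buffer;
        while ((buffer = pending.poll()) != null)
            buffer.release();
    }
}

// usage: try (FrameSetSketch frames = new FrameSetSketch(channel)) { frames.add(buf); frames.finish(); }
{code}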

And some improvements that were made to {{SimpleClient}}:

  * it now supports timeouts
  * supports sending multiple messages in a single frame when using v5
  * simpler pipelines
  * reuses code for frame processing 

There's one more issue with the driver, which is easy to reproduce (with the 
attached jar and test); here's the stack trace:

{code}
DEBUG [cluster1-nio-worker-1] 2020-07-21 11:54:20,474 Connection.java:1396 - Connection[/127.0.0.1:64050-2, inFlight=2, closed=false] connection error
java.lang.AssertionError: null
        at com.datastax.driver.core.BytesToSegmentDecoder.decode(BytesToSegmentDecoder.java:56)
        at io.netty.handler.codec.LengthFieldBasedFrameDecoder.decode(LengthFieldBasedFrameDecoder.java:332)
        at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:501)
        at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:440)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:93)
        at com.datastax.driver.core.InboundTrafficMeter.channelRead(InboundTrafficMeter.java:38)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Thread.java:834)
{code}



> CASSANDRA-13304 follow-up: improve checksumming and compression in protocol 
> v5-beta
> -----------------------------------------------------------------------------------
>
>                 Key: CASSANDRA-15299
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-15299
>             Project: Cassandra
>          Issue Type: Improvement
>          Components: Messaging/Client
>            Reporter: Aleksey Yeschenko
>            Assignee: Alex Petrov
>            Priority: Normal
>              Labels: protocolv5
>             Fix For: 4.0-alpha
>
>
> CASSANDRA-13304 made an important improvement to our native protocol: it 
> introduced checksumming/CRC32 to request and response bodies. It’s an 
> important step forward, but it doesn’t cover the entire stream. In 
> particular, the message header is not covered by a checksum or a crc, which 
> poses a correctness issue if, for example, {{streamId}} gets corrupted.
> Additionally, we aren’t quite using CRC32 correctly, in two ways:
> 1. We are calculating the CRC32 of the *decompressed* value instead of 
> computing the CRC32 on the bytes written on the wire - losing the properties 
> of the CRC32. In some cases, due to this sequencing, attempting to decompress 
> a corrupt stream can cause a segfault by LZ4.
> 2. When using CRC32, the CRC32 value is written in the incorrect byte order, 
> also losing some of the protections.
> See https://users.ece.cmu.edu/~koopman/pubs/KoopmanCRCWebinar9May2012.pdf for 
> explanation for the two points above.
> Separately, there are some long-standing issues with the protocol - since 
> *way* before CASSANDRA-13304. Importantly, both checksumming and compression 
> operate on individual message bodies rather than frames of multiple complete 
> messages. In reality, this has several important additional downsides. To 
> name a couple:
> # For compression, we are getting poor compression ratios for smaller 
> messages - when operating on tiny sequences of bytes. In reality, for most 
> small requests and responses we are discarding the compressed value as it’d 
> be smaller than the uncompressed one - incurring both redundant allocations 
> and compressions.
> # For checksumming and CRC32 we pay a high overhead price for small messages. 
> 4 bytes extra is *a lot* for an empty write response, for example.
> To address the correctness issue of {{streamId}} not being covered by the 
> checksum/CRC32 and the inefficiency in compression and checksumming/CRC32, we 
> should switch to a framing protocol with multiple messages in a single frame.
> I suggest we reuse the framing protocol recently implemented for internode 
> messaging in CASSANDRA-15066 to the extent that its logic can be borrowed, 
> and that we do it before native protocol v5 graduates from beta. See 
> https://github.com/apache/cassandra/blob/trunk/src/java/org/apache/cassandra/net/FrameDecoderCrc.java
>  and 
> https://github.com/apache/cassandra/blob/trunk/src/java/org/apache/cassandra/net/FrameDecoderLZ4.java.


