[ 
https://issues.apache.org/jira/browse/CASSANDRA-14574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16549610#comment-16549610
 ] 

Jason Brown commented on CASSANDRA-14574:
-----------------------------------------

In short, I wasn't handling all error cases correctly. I was correctly handling 
the case where there is a single message contained in the {{ByteBuf}} that is 
fully deserialized, and then if some exception happens in the pipeline, we 
close the channel and everything is fine. However, if there are multiple 
messages in the buffer, or the buffer is not fully consumed when deserializing, 
this is where the problems are. In the catch block of 
{{MessageInHandler.decode()}}, I am calling {{exceptionHandled()}}, which 
closes the channel. However, as we derive from {{ByteToMessageDecoder}}, as it 
is responding to the channel close event, it will see there are unconsumed 
bytes in the buffer (called {{cumulator}} in the class), and (re-)invoke 
{{decode()}}. Unfortunately, if you are in a bad state and partway through the 
stream, you will fail to correctly deserialize any messages and it's downhill 
from there (you start looping over the same failure pattern: exception, call 
close channel, {{ByteToMessageDecoder}} calls {{decode()}}, repeat ...). The 
most safe thing to do here is pass the caught exception to 
{{ByteToMessageDecoder}}, and prevent any future processing in the {{decode()}} 
method.

The patch here resolves the error handling in the inbound pipeline (see below 
for details on the failing dtest):
||14574||
|[branch|https://github.com/jasobrown/cassandra/tree/14574]|
|[utests & 
dtests|https://circleci.com/gh/jasobrown/workflows/cassandra/tree/14574]|

This patch does several things in the {{MessageInHandler.decode()}} method's 
exception block (which is where the problems lie):
 - explicitly throws the exception from the handler to the parent 
{{ByteToMessageDecoder}}, where it can properly break out of the while loop in 
{{callDecode()}}, and more properly send the exception to the 
{{exceptionHandled()}} method (which is overridden in {{BaseMessageInHandler}}) 
where we close the channel.
 - moves the {{ByteBuf}} 's readIndex to the end of the buffer, to make it 
appear as though the buffer has been fully 'consumed'. This optimizes (and 
helps with correctness of) {{ByteToMessageDecoder}}, because when the channel 
is closed, {{ByteToMessageDecoder.channelInputClosed()}} attempts, several 
times, to ensure all the bytes from the backing {{ByteBuf}} ({{cumulator}}) are 
consumed. Even though the state of the implementing handler is borked, the 
parent {{ByteToMessageDecoder}} will still keep trying to make sure all the 
bytes in {{cumulator}} are consumed before closing the channel. Thus, forcing 
the readIndex to the end of the buffer avoids that situation.
 - adds an explicit {{CLOSED}} state to the {{MessageInHandler}}, and the 
handler's state is set to {{CLOSED}} when a message fails to be deserialized or 
other error, for example: when the table doesn't exist (see below). While this 
is probably not completely necessary for correctness due to the other changes 
(primarily the one about moving the readIndex to the end of the buffer), it 
makes the state of the handler much more explicit, depends less on knowledge of 
the internal details of netty, and more resilient to implementation changes in 
the netty library itself.

 

After this fix, the 
{{materialized_views_test.py::TestMaterializedViews::test_populate_mv_after_insert_wide_rows}}
 dtest still fails, however, not with the same exception stack trace as 
reported. Instead, this one:
{noformat}
io.netty.handler.codec.DecoderException: 
org.apache.cassandra.exceptions.UnknownTableException: Couldn't find table with 
id 3694c0c0-8b6b-11e8-841f-cd3e85e9c250. If a table was just created, this is 
likely due to the schemanot being fully propagated.  Please wait for schema 
agreement on table creation.
        at 
io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:459)
        at 
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:265)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
        at 
io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1342)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at 
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:934)
        at 
io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:979)
        at 
io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:404)
        at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:307)
        at 
io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
        at 
io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.cassandra.exceptions.UnknownTableException: Couldn't find 
table with id 3694c0c0-8b6b-11e8-841f-cd3e85e9c250. If a table was just 
created, this is likely due to the schemanot being fully propagated.  Please 
wait for schema agreement on table creation.
        at 
org.apache.cassandra.schema.Schema.getExistingTableMetadata(Schema.java:438)
        at 
org.apache.cassandra.db.partitions.PartitionUpdate$PartitionUpdateSerializer.deserialize(PartitionUpdate.java:612)
        at 
org.apache.cassandra.db.Mutation$MutationSerializer.deserialize(Mutation.java:353)
        at 
org.apache.cassandra.db.Mutation$MutationSerializer.deserialize(Mutation.java:371)
        at 
org.apache.cassandra.db.Mutation$MutationSerializer.deserialize(Mutation.java:335)
        at org.apache.cassandra.net.MessageIn.read(MessageIn.java:158)
        at 
org.apache.cassandra.net.async.MessageInHandler.decode(MessageInHandler.java:130)
        at 
io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:489)
        at 
io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:428)
{noformat}
Thus the problem here appears to be request not finding the correct table in 
the schema. In my test local runs with the above patch applied, that table is 
id correct and eventually exists (for the MView), but not when the message 
comes in.

The reason why I had not seen this dtest failure in the past (and dtest runs 
were green), is because it was only exposed by the recent commit for 
CASSANDRA-13426. I bisected back to a few commits before CASSANDRA-14485 
(started on sha {{2bad5d5b6d2134ecd3db63d02aa2274299d1d748}}), and it 
identified CASSANDRA-13426 as the commit that caused 
{{materialized_views_test.py::TestMaterializedViews::test_populate_mv_after_insert_wide_rows}}
 to start failing. My fix corrects the nastier part of that failure, but 
there's another issue that is outside the scope of the internode messaging.

> Racy incorrect handling of incoming messages 
> ---------------------------------------------
>
>                 Key: CASSANDRA-14574
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-14574
>             Project: Cassandra
>          Issue Type: Bug
>          Components: Streaming and Messaging
>            Reporter: Aleksey Yeschenko
>            Assignee: Jason Brown
>            Priority: Major
>             Fix For: 4.0
>
>
> {{MessageInHandler.decode()}} occasionally reads the payload incorrectly, 
> passing the full message to {{MessageIn.read()}} instead of just the payload 
> bytes.
> You can see the stack trace in the logs from this [CI 
> run|https://circleci.com/gh/iamaleksey/cassandra/437#tests/containers/38].
> {code}
> Caused by: java.lang.AssertionError: null
>       at 
> org.apache.cassandra.db.Mutation$MutationSerializer.deserialize(Mutation.java:351)
>       at 
> org.apache.cassandra.db.Mutation$MutationSerializer.deserialize(Mutation.java:371)
>       at 
> org.apache.cassandra.db.Mutation$MutationSerializer.deserialize(Mutation.java:335)
>       at org.apache.cassandra.net.MessageIn.read(MessageIn.java:158)
>       at 
> org.apache.cassandra.net.async.MessageInHandler.decode(MessageInHandler.java:132)
> {code}
> Reconstructed, truncated stream passed to {{MessageIn.read()}}:
> {{0000000b000743414c5f42414301002a01e1a5c9b089fd11e8b517436ee1243007040000005d10fc50ec}}
> You can clearly see parameters in there encoded before the payload:
> {{[43414c5f424143 - CAL_BAC] [01 - ONE_BYTE] [002a - 42, payload size] 01 e1 
> a5 c9 b0 89 fd 11 e8 b5 17 43 6e e1 24 30 07 04 00 00 00 1d 10 fc 50 ec}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to