[ https://issues.apache.org/jira/browse/CASSANDRA-14574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16549610#comment-16549610 ]
Jason Brown commented on CASSANDRA-14574:
-----------------------------------------

In short, I wasn't handling all error cases correctly. I was correctly handling the case where the {{ByteBuf}} contains a single message that is fully deserialized: if some exception then happens in the pipeline, we close the channel and everything is fine. The problems arise when there are multiple messages in the buffer, or when the buffer is not fully consumed during deserialization.

In the catch block of {{MessageInHandler.decode()}}, I am calling {{exceptionCaught()}}, which closes the channel. However, because we derive from {{ByteToMessageDecoder}}, when it responds to the channel close event it sees that there are unconsumed bytes in the buffer (called {{cumulation}} in the class) and (re-)invokes {{decode()}}. Unfortunately, if you are in a bad state and partway through the stream, you will fail to correctly deserialize any messages, and it's downhill from there: you start looping over the same failure pattern (exception, close channel, {{ByteToMessageDecoder}} calls {{decode()}}, repeat ...). The safest thing to do here is to pass the caught exception to {{ByteToMessageDecoder}} and prevent any future processing in the {{decode()}} method.

The patch here resolves the error handling in the inbound pipeline (see below for details on the failing dtest):

||14574||
|[branch|https://github.com/jasobrown/cassandra/tree/14574]|
|[utests & dtests|https://circleci.com/gh/jasobrown/workflows/cassandra/tree/14574]|

This patch does several things in the exception block of the {{MessageInHandler.decode()}} method (which is where the problems lie):

- explicitly throws the exception from the handler to the parent {{ByteToMessageDecoder}}, where it properly breaks out of the while loop in {{callDecode()}} and is then delivered to the {{exceptionCaught()}} method (which is overridden in {{BaseMessageInHandler}}), where we close the channel.
- moves the {{ByteBuf}}'s {{readerIndex}} to the end of the buffer, to make it appear as though the buffer has been fully 'consumed'. This optimizes (and helps with the correctness of) {{ByteToMessageDecoder}}: when the channel is closed, {{ByteToMessageDecoder.channelInputClosed()}} attempts, several times, to ensure that all the bytes from the backing {{ByteBuf}} ({{cumulation}}) are consumed. Even though the state of the implementing handler is borked, the parent {{ByteToMessageDecoder}} will still keep trying to make sure all the bytes in {{cumulation}} are consumed before closing the channel. Forcing the {{readerIndex}} to the end of the buffer avoids that situation.
- adds an explicit {{CLOSED}} state to the {{MessageInHandler}}; the handler's state is set to {{CLOSED}} when a message fails to deserialize or another error occurs, for example when the table doesn't exist (see below). While this is probably not strictly necessary for correctness given the other changes (primarily moving the {{readerIndex}} to the end of the buffer), it makes the state of the handler much more explicit, depends less on knowledge of netty's internal details, and is more resilient to implementation changes in the netty library itself.

After this fix, the {{materialized_views_test.py::TestMaterializedViews::test_populate_mv_after_insert_wide_rows}} dtest still fails, but not with the same exception stack trace as reported. Instead, it fails with this one:

{noformat}
io.netty.handler.codec.DecoderException: org.apache.cassandra.exceptions.UnknownTableException: Couldn't find table with id 3694c0c0-8b6b-11e8-841f-cd3e85e9c250. If a table was just created, this is likely due to the schemanot being fully propagated. Please wait for schema agreement on table creation.
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:459)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:265)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1342)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:934)
    at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:979)
    at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:404)
    at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:307)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.cassandra.exceptions.UnknownTableException: Couldn't find table with id 3694c0c0-8b6b-11e8-841f-cd3e85e9c250. If a table was just created, this is likely due to the schemanot being fully propagated. Please wait for schema agreement on table creation.
    at org.apache.cassandra.schema.Schema.getExistingTableMetadata(Schema.java:438)
    at org.apache.cassandra.db.partitions.PartitionUpdate$PartitionUpdateSerializer.deserialize(PartitionUpdate.java:612)
    at org.apache.cassandra.db.Mutation$MutationSerializer.deserialize(Mutation.java:353)
    at org.apache.cassandra.db.Mutation$MutationSerializer.deserialize(Mutation.java:371)
    at org.apache.cassandra.db.Mutation$MutationSerializer.deserialize(Mutation.java:335)
    at org.apache.cassandra.net.MessageIn.read(MessageIn.java:158)
    at org.apache.cassandra.net.async.MessageInHandler.decode(MessageInHandler.java:130)
    at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:489)
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:428)
{noformat}

Thus the problem here appears to be the request not finding the correct table in the schema. In my local test runs with the above patch applied, that table id is correct and the table eventually exists (for the MView), but not at the time the message comes in.

The reason I had not seen this dtest failure in the past (and dtest runs were green) is that it was only exposed by the recent commit for CASSANDRA-13426. I bisected back to a few commits before CASSANDRA-14485 (started on sha {{2bad5d5b6d2134ecd3db63d02aa2274299d1d748}}), and the bisect identified CASSANDRA-13426 as the commit that caused {{materialized_views_test.py::TestMaterializedViews::test_populate_mv_after_insert_wide_rows}} to start failing. My fix corrects the nastier part of that failure, but there's another issue that is outside the scope of the internode messaging.
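For readers who want to see the shape of the three changes to the {{decode()}} exception block, here is a minimal sketch. It is deliberately netty-free so it runs on its own: {{MiniDecoder}}, {{SketchMessageInHandler}}, {{DecodeSketch}} and the one-length-byte wire format are all made up for illustration, and {{callDecode()}} / {{channelInputClosed()}} only mimic the behavior described in this comment. They are not netty's or Cassandra's actual code.

```java
import java.io.IOException;
import java.nio.ByteBuffer;

class DecodeSketch {
    public static void main(String[] args) {
        SketchMessageInHandler handler = new SketchMessageInHandler();
        // one good message, one undecodable message, then bytes that must never be parsed
        ByteBuffer in = ByteBuffer.wrap(new byte[] {2, 10, 11, -1, 9, 9, 9, 1, 12});
        try {
            handler.callDecode(in);
        } catch (Exception e) {
            handler.channelInputClosed(in); // stand-in for closing the channel on error
        }
        System.out.println(handler.state + ", attempts=" + handler.deserializeAttempts
                           + ", unread=" + in.remaining());
    }
}

/** Simplified stand-in for a cumulating decoder; NOT netty's implementation. */
abstract class MiniDecoder {
    abstract void decode(ByteBuffer in) throws Exception;

    /** Keeps calling decode() while bytes remain and progress is made; an exception exits the loop. */
    void callDecode(ByteBuffer in) throws Exception {
        while (in.hasRemaining()) {
            int before = in.remaining();
            decode(in);
            if (in.remaining() == before) {
                break; // decode() consumed nothing, so wait for more input
            }
        }
    }

    /** On close, any leftover bytes get another decode attempt. */
    void channelInputClosed(ByteBuffer in) {
        try {
            callDecode(in);
        } catch (Exception e) {
            // the channel is already closing; nothing more to do in this model
        }
    }
}

/** Hypothetical handler applying the three changes from the list above. */
class SketchMessageInHandler extends MiniDecoder {
    enum State { READ_MESSAGE, CLOSED }

    State state = State.READ_MESSAGE;
    int deserializeAttempts = 0;

    @Override
    void decode(ByteBuffer in) throws Exception {
        if (state == State.CLOSED) {
            in.position(in.limit()); // already failed: discard input, never deserialize again
            return;
        }
        try {
            deserialize(in);
        } catch (Exception e) {
            state = State.CLOSED;    // make the failed state explicit
            in.position(in.limit()); // make the buffer look fully consumed
            throw e;                 // rethrow so the parent's decode loop exits
        }
    }

    /** Made-up wire format: one length byte, then the payload; a negative length cannot be deserialized. */
    private void deserialize(ByteBuffer in) throws IOException {
        deserializeAttempts++;
        byte length = in.get();
        if (length < 0) {
            throw new IOException("cannot deserialize message");
        }
        in.position(in.position() + length);
    }
}
```

In this model the second message fails, the handler moves to {{CLOSED}}, and the trailing bytes are skipped rather than re-parsed, so the close path finds nothing left to decode and the failure loop described above does not start.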
> Racy incorrect handling of incoming messages
> ---------------------------------------------
>
>                 Key: CASSANDRA-14574
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-14574
>             Project: Cassandra
>          Issue Type: Bug
>          Components: Streaming and Messaging
>            Reporter: Aleksey Yeschenko
>            Assignee: Jason Brown
>            Priority: Major
>             Fix For: 4.0
>
>
> {{MessageInHandler.decode()}} occasionally reads the payload incorrectly,
> passing the full message to {{MessageIn.read()}} instead of just the payload
> bytes.
> You can see the stack trace in the logs from this [CI
> run|https://circleci.com/gh/iamaleksey/cassandra/437#tests/containers/38].
> {code}
> Caused by: java.lang.AssertionError: null
>     at org.apache.cassandra.db.Mutation$MutationSerializer.deserialize(Mutation.java:351)
>     at org.apache.cassandra.db.Mutation$MutationSerializer.deserialize(Mutation.java:371)
>     at org.apache.cassandra.db.Mutation$MutationSerializer.deserialize(Mutation.java:335)
>     at org.apache.cassandra.net.MessageIn.read(MessageIn.java:158)
>     at org.apache.cassandra.net.async.MessageInHandler.decode(MessageInHandler.java:132)
> {code}
> Reconstructed, truncated stream passed to {{MessageIn.read()}}:
> {{0000000b000743414c5f42414301002a01e1a5c9b089fd11e8b517436ee1243007040000005d10fc50ec}}
> You can clearly see parameters in there encoded before the payload:
> {{[43414c5f424143 - CAL_BAC] [01 - ONE_BYTE] [002a - 42, payload size] 01 e1 a5 c9 b0 89 fd 11 e8 b5 17 43 6e e1 24 30 07 04 00 00 00 1d 10 fc 50 ec}}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)