[ https://issues.apache.org/jira/browse/CASSANDRA-8457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15735327#comment-15735327 ]

Jason Brown commented on CASSANDRA-8457:
----------------------------------------

Ok, [~slebresne], next round of review is ready :) I'm currently ferreting out 
a regression with {{TriggersTest}}, but that shouldn't hold things up any 
further.

bq. {{MessageOutHandler.SwappingByteBufDataOutputStreamPlus}} - I'm wondering 
if it's not premature optimization

Yeah, you may be correct here. This is one of the first parts I wrote, and 
clearly I was worried about the "pack multiple messages into a single flush" 
issue. It's possible that a simpler implementation, where each message gets its 
own buffer, might just be best.

However, one counter-argument is that when we have a very large message to 
serialize (think tens of MBs), we would allocate that one large buffer, wait 
for the whole message to be serialized into it, and only then send/flush it. 
{{OutboundTcpConnection}} and my current branch will buffer a certain subset of 
that data (say 64k) and send/flush that, rather than waiting for it all to be 
serialized. I'm not sure if that's compelling enough at this time, or if we 
consider that follow-up ticket/work. Either way, I've removed 
{{MessageOutHandler.SwappingByteBufDataOutputStreamPlus}}.
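
For reference, here's a minimal sketch of what I mean by the counter-argument 
(illustrative only, not the removed class): an {{OutputStream}} that flushes 
its {{ByteBuf}} to the channel whenever the buffer fills, so a very large 
message goes out in roughly 64k increments instead of one huge allocation.

{code:java}
// Illustrative sketch only -- not the removed SwappingByteBufDataOutputStreamPlus.
// Serialization writes into a fixed-size ByteBuf; whenever the buffer fills we hand
// it to netty and start a fresh one, so a tens-of-MB message goes out in ~64k chunks
// instead of being held in one large allocation until fully serialized.
import java.io.IOException;
import java.io.OutputStream;

import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;

final class ChunkingByteBufOutputStream extends OutputStream
{
    private static final int CHUNK_SIZE = 64 * 1024;

    private final Channel channel;
    private ByteBuf current;

    ChunkingByteBufOutputStream(Channel channel)
    {
        this.channel = channel;
        this.current = channel.alloc().ioBuffer(CHUNK_SIZE);
    }

    @Override
    public void write(int b) throws IOException
    {
        if (!current.isWritable())
            swapAndFlush();
        current.writeByte(b);
    }

    private void swapAndFlush()
    {
        // netty releases the buffer once it has been written to the socket
        channel.writeAndFlush(current);
        current = channel.alloc().ioBuffer(CHUNK_SIZE);
    }

    @Override
    public void close()
    {
        if (current.isReadable())
            channel.writeAndFlush(current);
        else
            current.release();
    }
}
{code}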

bq. OMC.enqueue and background queue
Unfortunately, there are a few things that get in the way of making it that 
simple, assuming we don't want to block threads. The biggest is that we must 
not write general messages to the channel until the internode messaging 
handshake completes successfully: it's a three-way handshake, and sending 
general messages in the middle of it would break it. Further, it's not so clean 
or obvious how to get the {{ChannelFuture}} instance without *some* degree of 
blocking (some of which we might be able to avoid with some clever tricks 
@normanm showed me). Ultimately, we need a single {{Channel}} instance, and 
guaranteeing only one instance would require blocking.

Another reason I opted for the backlog queue was to keep the sending of 
messages to a given peer as close to ordered as possible: in other words, to 
write as many backlogged messages to the channel as possible before sending the 
new message. For the vast majority of Cassandra's functionality, message 
ordering is irrelevant. However, I'm wondering how sensitive repair and cluster 
membership changes might be to this potential edge case of message reordering.
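
To make the shape of this concrete, here's a minimal sketch of the non-blocking 
backlog approach. The names are illustrative, not the actual 
{{OutboundMessagingConnection}} code, and it elides races the real code has to 
handle; the point is that callers never block, nothing is written until the 
handshake completes, and the backlog is drained before new messages to preserve 
ordering as much as possible.

{code:java}
// Simplified sketch of the non-blocking backlog approach; names are illustrative,
// not the actual OutboundMessagingConnection code, and the races the real code
// must handle (e.g. a message enqueued while the handshake completes) are elided.
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;

final class BackloggedSender
{
    private final Queue<Object> backlog = new ConcurrentLinkedQueue<>();
    private volatile Channel channel; // only set once the three-way handshake succeeds

    void enqueue(Object message)
    {
        Channel ch = channel;
        if (ch == null)
        {
            // handshake not finished yet: park the message rather than block the caller
            backlog.add(message);
            return;
        }
        // write any backlogged messages first to keep sending as close to ordered as possible
        drainBacklog(ch);
        ch.writeAndFlush(message);
    }

    // invoked from the handshake handler once its future completes successfully
    void onHandshakeComplete(ChannelFuture handshakeFuture)
    {
        channel = handshakeFuture.channel();
        drainBacklog(channel);
    }

    private void drainBacklog(Channel ch)
    {
        Object backlogged;
        while ((backlogged = backlog.poll()) != null)
            ch.write(backlogged);
        ch.flush();
    }
}
{code}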

bq. I'd expect {{enqueue()}} (which might warrant a rename) to just do a 
{{ctx.write()}}

Makes sense, done.

That being said, I've left the code in the newly-renamed {{#sendMessage}} and 
{{#finishHandshake}} a bit tentative, with a note that you and I should hammer 
this part out further :)

bq. ... flushing ...

I really like what you've done wrt flushing in {{FlushingHandler}} and 
{{CoalescingStrategies}}. I think the code got twisted and complex because I 
was trying to maintain both blocking and non-blocking behaviors in 
{{CoalescingStrategies}}. Thus, I've eliminated {{CoalescingMessageOutHandler}} 
and brought in your {{FlushingHandler}} and {{CoalescingStrategies}} changes. 
One thing to point out: when scheduling a task on a netty executor via the 
current context, it executes on the same IO thread as the handler. Thus, in 
{{FlushingHandler}} you don't have to worry about any concurrency between the 
arrival of a new message and the execution of the scheduled task, as they 
execute on the same thread. I've removed your (well-documented!) javadoc 
comments and the volatile keyword on the {{scheduledFlush}} member field.
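
Roughly what I mean, as a sketch rather than the actual {{FlushingHandler}} 
(the coalesce window below is made up): {{ctx.executor()}} is the channel's 
event loop, so the scheduled task runs on the same IO thread that invokes 
{{write()}}, and the {{scheduledFlush}} field needs neither volatile nor any 
other synchronization.

{code:java}
// Sketch of the single-threadedness point, not the actual FlushingHandler; the
// coalesce window below is made up. ctx.executor() is the channel's event loop,
// so the scheduled task runs on the same IO thread that invokes write(), and the
// scheduledFlush field needs no volatile or other synchronization.
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;

final class CoalescingFlushHandler extends ChannelDuplexHandler
{
    private ScheduledFuture<?> scheduledFlush; // only ever touched from the event loop

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
    {
        ctx.write(msg, promise);
        if (scheduledFlush == null)
        {
            scheduledFlush = ctx.executor().schedule(() ->
            {
                scheduledFlush = null;
                ctx.flush();
            }, 100, TimeUnit.MICROSECONDS);
        }
    }
}
{code}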

I agree we can address "targeted benchmarking ... in a separate ticket", as I 
think getting the essential flush and coalesce behavior right is the most 
important thing here.

bq. Regarding dropped messages, the current implementation was going through 
the {{MessagingService.incrementDroppedMessages()}} ...

Hmm, {{OutboundTcpConnection}} has its own [counter for connection-specific 
dropped 
messages|https://github.com/apache/cassandra/blob/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java#L136],
 which I've maintained in {{OutboundMessagingConnection}}. It seems like 
{{OutboundTcpConnection}} should probably be reporting its dropped messages to 
{{MessagingService.incrementDroppedMessages()}} in addition to what it 
currently does. If you think that's legit, I'll fix it in this patch and open a 
separate ticket to fix it in 3.0+. wdyt?
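
The shape of "report to both" would be roughly the following (the exact 
{{MessagingService}} method signature may differ between branches, so treat 
this as a sketch rather than the patch itself):

{code:java}
// Sketch only: the exact MessagingService method signature may differ between branches.
import java.util.concurrent.atomic.AtomicLong;

import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;

final class DroppedMessageAccounting
{
    // connection-specific counter, as OutboundTcpConnection keeps today
    private final AtomicLong dropped = new AtomicLong();

    void noteDropped(MessageOut<?> message)
    {
        dropped.incrementAndGet();
        // additionally report to the global per-verb counters exposed via nodetool/JMX
        MessagingService.instance().incrementDroppedMessages(message.verb);
    }

    long getDroppedMessages()
    {
        return dropped.get();
    }
}
{code}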

bq. InboundHandshakeHandler#handshakeHandlerChannelHandlerName

Removed this. I think I needed it with the earlier version of netty? Either 
way, killed it.

bq. In {{OutboundMessagingConnection#writeBacklogToChannel}}, we seem to be 
sending timed out messages if those are retried

This behavior was changed in CASSANDRA-12192, so I've retained it here.

bq. {{MessagingService.MessageSender}}

Removed

bq. InboundHandshakeHandler#setupMessagingPipeline

Luckily the comment is not relevant anymore; too bad I didn't fix that up when 
I fixed the code :D 

bq. pull out the fix to {{AnticompactionRequest}} into a separate ticket

Done in CASSANDRA-12934.

> nio MessagingService
> --------------------
>
>                 Key: CASSANDRA-8457
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-8457
>             Project: Cassandra
>          Issue Type: New Feature
>            Reporter: Jonathan Ellis
>            Assignee: Jason Brown
>            Priority: Minor
>              Labels: netty, performance
>             Fix For: 4.x
>
>
> Thread-per-peer (actually two each incoming and outbound) is a big 
> contributor to context switching, especially for larger clusters.  Let's look 
> at switching to nio, possibly via Netty.


