[jira] [Commented] (CASSANDRA-13630) support large internode messages with netty

2018-09-09 Thread Dinesh Joshi (JIRA)


[ https://issues.apache.org/jira/browse/CASSANDRA-13630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16608650#comment-16608650 ]

Dinesh Joshi commented on CASSANDRA-13630:
--

Hi [~jasobrown], the latest changes look good. I'm +1.

> support large internode messages with netty
> ---
>
> Key: CASSANDRA-13630
> URL: https://issues.apache.org/jira/browse/CASSANDRA-13630
> Project: Cassandra
>  Issue Type: Bug
>  Components: Streaming and Messaging
>Reporter: Jason Brown
>Assignee: Jason Brown
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0
>
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> As part of CASSANDRA-8457, we decided to punt on large messages to reduce the 
> scope of that ticket. However, we still need that functionality to ship a 
> correctly operating internode messaging subsystem.






[jira] [Commented] (CASSANDRA-13630) support large internode messages with netty

2018-09-07 Thread Dinesh Joshi (JIRA)


[ https://issues.apache.org/jira/browse/CASSANDRA-13630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16607764#comment-16607764 ]

Dinesh Joshi commented on CASSANDRA-13630:
--

I left a few more comments on the PR.







[jira] [Commented] (CASSANDRA-13630) support large internode messages with netty

2018-09-07 Thread Jason Brown (JIRA)


[ https://issues.apache.org/jira/browse/CASSANDRA-13630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16607603#comment-16607603 ]

Jason Brown commented on CASSANDRA-13630:
-

[~djoshi3] made a few comments on the PR, and in response I have:

- moved the autoRead check out of the {{RebufferingByteBufDataInputPlus.available}} method and into its own method; also added tests
- refactored {{MessageInProcessor.process}} to move the main loop logic into the base class, and moved the logic for each case statement into sub-methods rather than leaving it in-line with the loop







[jira] [Commented] (CASSANDRA-13630) support large internode messages with netty

2018-08-21 Thread Jason Brown (JIRA)


[ https://issues.apache.org/jira/browse/CASSANDRA-13630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16587988#comment-16587988 ]

Jason Brown commented on CASSANDRA-13630:
-

Picking this back up after a year, I realize that my previous solution only solved part of the problem. I solved the "don't allocate an enormous buffer" problem, but I was still allocating an "enormous buffer"'s worth of memory at the same time, albeit across multiple buffers. I believe this is ultimately what [~aweisberg]'s concerns with the previous solution encompassed, and I fully agree. Further, the previous patch attempted to do more than just solve the large buffer problem; it also optimized the allocation of small buffers. With this new insight, that optimization is best left to a separate ticket.

Thus, the new solution focuses only on the large buffer problem. The high-level overview of this patch is:
 - use the existing {{ByteBufDataOutputStreamPlus}} to chunk the large message into small buffers, and use {{ByteBufDataOutputStreamPlus}}'s existing rate-limiting mechanism to make sure we don't keep too much outstanding data in the channel (see the sketch after this list)
 - rework the inbound side to allow blocking-style message deserialization
 - refactor to make the serialization/deserialization code reusable, plus some cleanup
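As a rough illustration of the "don't keep too much outstanding data in the channel" idea, here is a minimal sketch; the semaphore-based limiter and all names are hypothetical stand-ins, not the actual {{ByteBufDataOutputStreamPlus}} mechanism:

{code:java}
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import java.util.concurrent.Semaphore;

// Hypothetical sketch: block the (background) serialization thread when too
// many chunk bytes are still unwritten in the channel, releasing permits as
// netty completes each write.
class RateLimitedChunkWriter
{
    private static final int MAX_OUTSTANDING_BYTES = 1 << 20; // illustrative 1 MiB cap
    private final Semaphore outstanding = new Semaphore(MAX_OUTSTANDING_BYTES);
    private final Channel channel;

    RateLimitedChunkWriter(Channel channel)
    {
        this.channel = channel;
    }

    void write(ByteBuf chunk) throws InterruptedException
    {
        int size = chunk.readableBytes();
        outstanding.acquire(size); // wait here if the channel is backed up
        channel.writeAndFlush(chunk).addListener(future -> outstanding.release(size));
    }
}
{code}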

In order to support both serialization and deserialization of arbitrarily large messages with our blocking-style {{IVersionedSerializers}}, I need to perform those activities on a separate (background) thread. On the outbound side this is achieved with a new {{ChannelWriter}} subclass. On the inbound side there is a fair bit of refactoring, but the thread for deserialization is in {{MessageInHandler.BlockingBufferHandler}}. Both of these "background threads" are implemented as {{ThreadExecutorServices}} so that if no large messages are being sent or received, the thread can be shut down (saving system resources).
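The "thread goes away when idle" behavior can be pictured with a plain JDK executor (this is only an illustration of the pattern; the patch's actual {{ThreadExecutorServices}} wiring may differ):

{code:java}
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class IdleShutdownExample
{
    // Zero core threads, at most one worker: the thread is created on demand and
    // terminates after 30 idle seconds, releasing its stack and other resources.
    static final ThreadPoolExecutor LARGE_MESSAGE_EXECUTOR =
        new ThreadPoolExecutor(0, 1, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<>());

    static void submit(Runnable blockingSerializationTask)
    {
        LARGE_MESSAGE_EXECUTOR.execute(blockingSerializationTask);
    }
}
{code}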

On the outbound side, it is easy to know if a specific {{OutboundMessagingConnection}} will be sending large messages, as we can look at its {{OutboundConnectionIdentifier}}. The inbound side does not have that luxury, and my previous patch attempted some overly clever things there. The simpler solution is to add a flag to the internode messaging protocol that advises the receiving side that the connection will be used for large messages, so the receiver can set itself up appropriately. We already have a flags section in the internode messaging protocol's header, with many unused bits. Further, peers that are unaware of the new bit flag (i.e. any Cassandra version less than 4.0) will completely ignore it, as they do not attempt to interpret those bits. Thus, this change is rather safe from a protocol/handshake perspective. In fact, I'd like to backport this protocol change to 3.0 and 3.11 so the flag is sent out on new connections. The flag would be completely ignored on those versions, except that during a cluster upgrade, when a 3.0 node connects to a 4.0 node, the 4.0 node will know that the connection will carry large messages and can set up the receive side appropriately. Operators would in no way be required to first upgrade to the 3.x minor versions that carry the new flag before upgrading to 4.0, but doing so would make the upgrade to 4.0 smoother from a performance/memory-management perspective.
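Purely for illustration, a spare header bit used this way might look like the following; the bit position and names are hypothetical, not the actual protocol layout:

{code:java}
class LargeMessageFlagExample
{
    // Hypothetical: one of the previously unused bits in the messaging header's flags.
    static final int LARGE_MESSAGE_CONNECTION = 1 << 10;

    // Sender side: advertise that this connection will carry large messages.
    static int addFlag(int headerFlags)
    {
        return headerFlags | LARGE_MESSAGE_CONNECTION;
    }

    // Receiver side: a 4.0 node inspects the bit and sets up the blocking,
    // background-deserialization path; pre-4.0 nodes never test the bit, so it
    // is silently ignored.
    static boolean isLargeMessageConnection(int headerFlags)
    {
        return (headerFlags & LARGE_MESSAGE_CONNECTION) != 0;
    }
}
{code}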

The other major aspect of this ticket was a refactoring, mostly to move the serialization/deserialization out of {{MessageOutHandler}}/{{MessageInHandler}} so that logic can be invoked outside the context of a netty handler. This also allowed me to clean up {{MessageIn}} and {{MessageOut}} as well. Note: I've eliminated {{BaseMessageInHandler}} and moved the version-specific message parsing into classes derived from the new {{MessageIn.MessageInProcessor}}. {{MessageInHandler}} now determines whether it needs to do non-blocking or blocking deserialization, and handles the buffers appropriately. {{MessageInHandler}} now derives from {{ChannelInboundHandlerAdapter}}, so the error handling changed slightly. The refactorings also affected where the unit tests are laid out (corresponding to where the logic under test now lives), so I moved things around there as well.
||13630||
|[branch|https://github.com/jasobrown/cassandra/tree/13630]|
|[utests & dtests|https://circleci.com/gh/jasobrown/workflows/cassandra/tree/13630]|

I also needed to make a trivial [change to one dtest|https://github.com/jasobrown/cassandra-dtest/tree/13630].

To make the review easier, I've [opened a PR here|https://github.com/apache/cassandra/pull/253].


[jira] [Commented] (CASSANDRA-13630) support large internode messages with netty

2017-08-23 Thread Ariel Weisberg (JIRA)

[ https://issues.apache.org/jira/browse/CASSANDRA-13630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16138399#comment-16138399 ]

Ariel Weisberg commented on CASSANDRA-13630:


bq. We do not have 1x amplification in pre-4.0 code;
You always had to have the entire message in memory as a POJO, so I count that as 1x. We don't lazily materialize result messages or message contents, with the exception of paging.

bq. The cost of the amplification is hidden by that reusable backing buffer, but it's still there.
64k is a constant, though. I'm not concerned with CPU time and performance; I am concerned about OOMing. You can get predictable behavior for a cluster once all connections are provisioned. With CASSANDRA-8457 the memory utilization is no longer constant; it's linear with message size multiplied by fan-out.
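To put rough numbers on that difference (illustrative arithmetic only, borrowing the 16MB mutation example used later in this thread):

{code:java}
class AmplificationExample
{
    public static void main(String[] args)
    {
        long messageSize = 16L << 20;         // a 16 MiB mutation
        int fanOut = 100;                     // peers the message is sent to

        long pre40 = 64L * 1024 * fanOut;     // fixed 64k buffer per connection: ~6.6 MB, constant
        long post8457 = messageSize * fanOut; // per-message buffers: ~1.7 GB, linear in message size

        System.out.printf("pre-4.0: %d bytes, post-8457: %d bytes%n", pre40, post8457);
    }
}
{code}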







[jira] [Commented] (CASSANDRA-13630) support large internode messages with netty

2017-08-23 Thread Jason Brown (JIRA)

[ https://issues.apache.org/jira/browse/CASSANDRA-13630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16138305#comment-16138305 ]

Jason Brown commented on CASSANDRA-13630:
-

bq. I thought the worst-case memory amplification from this NIO approach was 2x message size, which is worse than our current 1x message size, but it's not; it's cluster size * message size if a message is fanned out to all nodes in the cluster.

We do not have 1x amplification in pre-4.0 code; it's always been messageSize times the number of target peers. In `OutboundTcpConnection` we wrote into a [backing buffer of 64k|https://github.com/apache/cassandra/blob/cassandra-3.11/src/java/org/apache/cassandra/net/OutboundTcpConnection.java#L457] for each outbound peer and flushed when the buffer filled up (see `BufferedDataOutputStreamPlus`). The cost of the amplification is hidden by that reusable backing buffer, but it's still there.

With CASSANDRA-8457, everything gets its own distinct buffer, allocated once per message, which is serialized to and then flushed. With this ticket we'll move back to the previous model, where there's a backing buffer that's used for aggregating small messages or chunks of larger messages. That buffer, of course, is not reused, but that's because of the asynchronous nature of NIO vs blocking IO.
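A minimal sketch of that pre-4.0 reusable-buffer model, heavily simplified from what `BufferedDataOutputStreamPlus` does and assuming a blocking socket `OutputStream`:

{code:java}
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;

// One reusable 64k buffer per connection: serialization writes into it, and it
// is flushed to the blocking socket whenever it fills, then cleared and reused.
class FlushWhenFullWriter
{
    private final ByteBuffer buffer = ByteBuffer.allocate(64 * 1024);
    private final OutputStream socketOut;

    FlushWhenFullWriter(OutputStream socketOut)
    {
        this.socketOut = socketOut;
    }

    void write(byte[] bytes, int offset, int length) throws IOException
    {
        while (length > 0)
        {
            int n = Math.min(length, buffer.remaining());
            buffer.put(bytes, offset, n);
            offset += n;
            length -= n;
            if (!buffer.hasRemaining())
                flush(); // per-connection memory stays bounded at 64k
        }
    }

    void flush() throws IOException
    {
        socketOut.write(buffer.array(), 0, buffer.position());
        buffer.clear(); // reuse, rather than reallocate, the backing buffer
    }
}
{code}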

(FTR, I have thought about moving serialization outside of the "outbound connections" (either `OutboundTcpConnection` or the netty handlers), where we would serialize before sending to the outbound channels and send a slice of a buffer to each of those channels. That way you only serialize once (less repetitive CPU work) and potentially consume less memory. But I think that's a different ticket.)

bq. I really wonder if that could be a shared pool of threads that we size generously

Yeah, I thought about this. The problem is that because the deserialization is blocking, you basically need one thread in the pool for each "blocker"; otherwise you starve some deserialization activities. Hence, I just used a background thread. Not my favorite choice, but I'm not sure a "well-sized" pool would be sufficient.

Reading over your comments on the code itself this morning.








[jira] [Commented] (CASSANDRA-13630) support large internode messages with netty

2017-08-22 Thread Ariel Weisberg (JIRA)

[ https://issues.apache.org/jira/browse/CASSANDRA-13630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16137116#comment-16137116 ]

Ariel Weisberg commented on CASSANDRA-13630:


I added some comments on 
https://github.com/jasobrown/cassandra/commit/2d58ad5f0ca5a63cc0fbead0b9234876d2dbd770#diff-55d5a06a8f012c31e11a06fc3f5bb960R265

At a high level, the thing that worries me most is fan-out message patterns. I thought the worst-case memory amplification from this NIO approach was 2x message size, which is worse than our current 1x message size, but it's not; it's cluster size * message size if a message is fanned out to all nodes in the cluster. At the barest of bare minimums we need to detect this condition (large message + fan-out) and log it. But really, I would need to be convinced that we don't ever send large messages to the entire cluster. Just by the nature of the problem, serialization is faster than networking, so the large message would be serialized to all the connections faster than the bytes can be drained out.

I think you have the gist of it on the receive side, where a thread is forced to block for large messages. You are also creating a thread per large-message channel. I really wonder if that could be a shared pool of threads that we size generously. Heck, use the same pool for send and receive.

Looking over the tests now.







[jira] [Commented] (CASSANDRA-13630) support large internode messages with netty

2017-08-10 Thread Jason Brown (JIRA)

[ https://issues.apache.org/jira/browse/CASSANDRA-13630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16121770#comment-16121770 ]

Jason Brown commented on CASSANDRA-13630:
-

The core idea here is that if the outgoing message is large/huge, we don't want to naively allocate a huge buffer just for serialization. For example, if it's a large mutation (say 16MB), we don't want to allocate n 16MB buffers on the coordinator, one per replica. A safer approach is to allocate standard-sized buffers (currently 64k), serialize into them via the {{DataOutputPlus}} interface, write each buffer to the netty channel when the buffer is full, and allocate another buffer for further serialization.
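In sketch form, the allocate-fill-flush cycle looks roughly like this (illustrative names and structure, not the actual {{MessageOutHandler.ByteBufDataOutputStreamPlus}} code):

{code:java}
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;

// Serialize into standard-sized buffers; each full buffer is handed to the
// channel and a fresh one is allocated, so no message-sized allocation occurs.
class ChunkingOutput
{
    private static final int CHUNK_SIZE = 64 * 1024;
    private final ChannelHandlerContext ctx;
    private ByteBuf current;

    ChunkingOutput(ChannelHandlerContext ctx)
    {
        this.ctx = ctx;
        this.current = ctx.alloc().buffer(CHUNK_SIZE, CHUNK_SIZE);
    }

    void writeByte(int b)
    {
        if (!current.isWritable())
        {
            ctx.write(current); // ownership passes to the channel; the buffer is not reused
            current = ctx.alloc().buffer(CHUNK_SIZE, CHUNK_SIZE);
        }
        current.writeByte(b);
    }

    void finish()
    {
        ctx.writeAndFlush(current); // flush whatever remains of the last chunk
    }
}
{code}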

The outbound side, which splits serialization across multiple buffers, is implemented in {{MessageOutHandler.ByteBufDataOutputStreamPlus}}. At the same time, I've made it so that all messages are written into a shared buffer (via {{MessageOutHandler.ByteBufDataOutputStreamPlus}}), whether it's a large message being chunked across multiple buffers or multiple small messages being aggregated into one buffer (think mutation ACKs). The upside here is that we don't need to go to the netty allocator for each individual small message, and can just send the single 'aggregation' buffer downstream in the channel when we need to flush.

As I implemented this behavior, I discovered that the 'aggregating buffer' could be a problem wrt {{MessageOutHandler#channelWritabilityChanged}}: when that method gets the signal that the channel is writable, it attempts to drain any backlog from {{OutboundMessagingConnection}} (via the {{MessageOutHandler#backlogSupplier}}). If I had retained the current code, it is quite likely I would start serializing a backlogged message while in the middle of a message already being serialized (from {{MessageOutHandler#write}}) that happened to fill the buffer and get written to the channel.

Further, I noticed I needed to forward-port more of CASSANDRA-13265 in order to handle expiring messages from the backlog. (FTR, {{MessageOutHandler#userEventTriggered}} handles closing the channel when we make no progress, but there's no other purging or removal of items from the backlog queue. Closing the channel will fail any messages in the channel, but not those in the backlog.) Thus, I added the backlog-expiring behavior to {{OutboundMessagingConnection#expireMessages}}, and now drain messages from the backlog in {{MessageOutHandler#write}}. Trying to send the backlogged messages before the message newly arriving on the channel gives us a better shot at sending messages in the order in which they came into the {{OutboundMessagingConnection}}; see the sketch below.
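In sketch form, the write-path ordering becomes something like the following (hypothetical and heavily simplified; real promise and writability handling omitted):

{code:java}
import java.util.Queue;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;

// Hypothetical sketch: drain older, backlogged messages before serializing the
// newly arriving one, so send order roughly tracks the order in which messages
// reached the OutboundMessagingConnection.
class BacklogDrainingHandler extends ChannelOutboundHandlerAdapter
{
    private final Queue<Object> backlog;

    BacklogDrainingHandler(Queue<Object> backlog)
    {
        this.backlog = backlog;
    }

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
    {
        Object backlogged;
        while ((backlogged = backlog.poll()) != null)
            serializeAndWrite(ctx, backlogged); // hypothetical serialization helper

        serializeAndWrite(ctx, msg);
        promise.setSuccess(); // simplified; real code ties completion to the writes
    }

    private void serializeAndWrite(ChannelHandlerContext ctx, Object message)
    {
        // serialize `message` into the shared buffer and write it to ctx (omitted)
    }
}
{code}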

I updated jctools to 2.0.2. Instead of using a {{LinkedBlockingQueue}} in {{OutboundMessagingConnection}} for the backlog, I decided to use a lock-free queue from jctools. Even though the queue still needs to be unbounded and multi-producer/multi-consumer (at least, to replicate existing behaviors), the jctools queue should be a bit more efficient than an LBQ.

Fixing the outbound side is only half of the problem, as we don't want to naively allocate a huge buffer on the receiving node, either. This is a bit trickier due to the blocking-IO style of our deserializers. Thus, similar to what I've done in CASSANDRA-12229, I need to add incoming {{ByteBuf}}s to a {{RebufferingByteBufDataInputPlus}} and spin up a background thread to perform the deserialization. Since we only need to spin up the thread when we have large message payloads, this will only happen in a minority of use cases (a condensed sketch follows the list below):

- we are actually transmitting a message larger than {{OutboundMessagingPool#LARGE_MESSAGE_THRESHOLD}}, which defaults to 64k. At that point we're sending all of those messages over the outbound large-message queue anyway, so all messages on that channel/socket will be over the threshold and require the background deserialization. This won't apply to the small-messages channel, where we can still handle all messages in-line on the inbound netty event loop.
- you are operating a huge cluster (I'm guessing at least 500 nodes; haven't done the math, tbh), where large gossip messages might cause the receiving gossip channel to switch to the background deserialization mode, especially for ACK/ACK2 messages after a bounce, as they will contain all the {{ApplicationState}}s for all the peers in the cluster. I do not think this will be a problem in practice.
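A condensed sketch of that inbound arrangement (the queue-plus-background-consumer shape only; names here are illustrative, not the actual {{RebufferingByteBufDataInputPlus}} code):

{code:java}
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

// The netty event loop never blocks: channelRead only appends incoming buffers
// to a queue. A background thread drains the queue and may block mid-message,
// which is what the blocking-style deserializers require.
class BlockingDeserializeHandler extends ChannelInboundHandlerAdapter
{
    private final BlockingQueue<ByteBuf> buffers = new LinkedBlockingQueue<>();
    private final ExecutorService deserializer = Executors.newSingleThreadExecutor();

    BlockingDeserializeHandler()
    {
        deserializer.execute(this::drainAndDeserialize);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
    {
        buffers.add((ByteBuf) msg); // hand off; deserialization happens off the event loop
    }

    private void drainAndDeserialize()
    {
        try
        {
            while (!Thread.currentThread().isInterrupted())
            {
                ByteBuf buf = buffers.take(); // blocks until more bytes arrive
                // feed buf into a rebuffering input and run the blocking
                // IVersionedSerializer deserialization against it (omitted)
                buf.release();
            }
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt();
        }
    }
}
{code}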

I want to add more comments/documentation before committing, but that should not hold up a review. Also, this code is based on the current CASSANDRA-12229 branch. The currently failing tests for this branch appear to be race conditions in the streaming code only, so I'll fix those on the CASSANDRA-12229 branch.
