Adam,
   I stand corrected. My apologies for the confusion. I re-verified with a 
debugger. The insertions are getting batched into ArrayLists, and the entire 
list is inserted as one element. This buffer, however, is not the ‘currentBatch’ 
shown in the diagram. There is a batching step that happens even before that, 
inside the spout/bolt’s emit() code path.

The code sequence is something like this:

spout/bolt::emit() -> transfer() -> receiveQ.publish() -> currentBatch.add()


Consequently, the ‘currentBatch’ is made up of elements that are themselves 
ArrayLists. So, oddly, as you can see, there is double batching going on.

Once currentBatch is full (with 100 lists in your case), it gets drained into 
the actual Disruptor queue.

Incidentally, the same setting, ‘topology.disruptor.batch.size’, controls both 
the first level of batching and currentBatch’s size. So with that setting at 
100, and assuming there is only one downstream bolt instance, you end up with 
currentBatch holding 100 lists x 100 elements each = 10,000 tuples. If there 
are two downstream bolt instances, then (assuming even distribution) you will 
have approximately 100 lists x 50 elements each stored in currentBatch.
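
Conceptually, what currentBatch ends up holding looks something like this (a 
simplified illustration; the real field is declared as ArrayList<Object>, as 
described further down this thread, but its elements are themselves ArrayLists):

    // each entry in currentBatch is itself a level-1 batch of tuples
    ArrayList<ArrayList<Object>> currentBatchContents;

    // with topology.disruptor.batch.size = 100 and one downstream bolt instance:
    //   100 lists x 100 tuples each  = ~10,000 tuples buffered before a drain
    // with two downstream bolt instances (even distribution):
    //   100 lists x ~50 tuples each  = ~5,000 tuples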

This double batching looks like an accident of code evolution: the first level 
of batching existed earlier (as noted by M. Noll), while the second level came 
in later when Disruptor batching was introduced. The first level of batching 
will be eliminated in the new messaging system.

-roshan


From: Adam Meyerowitz <[email protected]>
Reply-To: "[email protected]" <[email protected]>
Date: Tuesday, June 6, 2017 at 2:36 PM
To: "[email protected]" <[email protected]>
Subject: Re: LMAX queue batch size

Hi Roshan, these metrics are for an intermediate bolt and they are for the 
inbound disruptor queue.

Happy to provide other information to help track this down so that we better 
understand what's going on.

Thanks!

On Tue, Jun 6, 2017 at 5:26 PM, Roshan Naik <[email protected]> wrote:
The overflow list is basically flattened when it is drained into the Disruptor 
by the flusher. So again insertion is one element at a time.
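
In other words, the flusher’s drain looks roughly like this (illustrative 
pseudo-Java; ‘overflow’ is the ConcurrentLinkedQueue<ArrayList<Object>> 
described in the earlier mail quoted below, and insertIntoRingBuffer is a 
hypothetical helper, not the exact Storm code):

    ArrayList<Object> batch;
    while ((batch = overflow.poll()) != null) {   // each overflow entry is a whole list
        for (Object tuple : batch) {
            insertIntoRingBuffer(tuple);          // flattened: one element at a time
        }
    }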

In your case overflow seems empty in both readings. The observation is indeed 
puzzling.

Are these metrics for a terminal bolt / intermediate bolt / spout?
Are those metrics for the inbound or the outbound DisruptorQ of the spout/bolt?
-roshan



From: Adam Meyerowitz <[email protected]>
Reply-To: "[email protected]" <[email protected]>
Date: Tuesday, June 6, 2017 at 7:12 AM
To: "[email protected]<mailto:[email protected]>" 
<[email protected]<mailto:[email protected]>>
Subject: Re: LMAX queue batch size

To make things a little bit more specific, this is the kind of thing we are 
seeing. I have chopped out some lines, etc., to make it more readable.


Note that during this one-minute interval the read_pos went from
177914 to 178522, which is a difference of 608, yet we processed 4140 tuples 
based on the execute-count.
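(For reference, 4140 tuples over a read_pos advance of 608 works out to roughly 
6.8 tuples per consumed queue slot, on average.)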


Maybe we are just interpreting these stats incorrectly.....
2017-06-05 13:50:17.834 EMSMetricsConsumer   10:B_CALC :8__receive 
{arrival_rate_secs=9.876286516269882, overflow=0, read_pos=177914, 
write_pos=177915, sojourn_time_ms=101.25263157894737, capacity=16384, population
2017-06-05 13:51:17.836 EMSMetricsConsumer 10:B_CALC :8 __receive 
{arrival_rate_secs=10.809687142708658, overflow=0, read_pos=178522, 
write_pos=178523, sojourn_time_ms=92.50961538461539, capacity=16384, population
=1}

2017-06-05 13:51:17.837 EMSMetricsConsumer 10:B_CALC :8 __execute-count 
{B_MARKET:TRADE_STREAM=4140, B_TIMER:DATE_TICK=0, B_MARKET:OPTX_STREAM=0}




On Mon, Jun 5, 2017 at 9:10 PM, Roshan Naik <[email protected]> wrote:


AM> So the difference in the read and write queue positions will be 100 then, 
correct?

Only if the diff was 0 prior to the publish. More accurately… the diff between 
the read and write positions will increase by 100 after the publish.
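For example: if read_pos and write_pos were both at 1000 just before a 
100-element publish, write_pos would move to 1100 and the diff would be 100; 
if the reader had instead been 40 slots behind, the diff after the publish 
would be 140.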


AM> Empirically what we are seeing would lead us to believe that each queue 
entry is actually a list of tuples, not a single tuple.

It is not the case with the current code in the master/1.x branches. Not sure 
if it was different in earlier versions.


AM> There is a pretty old description of this behavior by Michael Noll at 
http://www.michael-noll.com/blog/2013/06/21/understanding-storm-internal-message-buffers/.
Very good write-up, but wondering if it still applies.


Michael’s otherwise excellent description seems a bit dated now. Not very sure, 
but that behavior may have been true when he wrote the blog.

I give a more detailed and up-to-date diagram/description of the current 
messaging system here (starting at 36:00):
https://www.youtube.com/watch?v=kCRv6iEd7Ow

With the diagram shown there, here is more info on the write path (a rough 
sketch follows the list below):


- The incoming writes are buffered into an ArrayList<Object> called 
‘currentBatch’.

- If currentBatch is full, then we try to drain it into the Disruptor (as I 
described previously… one element at a time, followed by a single publish).

- But if the Disruptor is full, the entire currentBatch list is inserted as 
*one element* into another (unbounded) overflow list, which is of type 
ConcurrentLinkedQueue<ArrayList<Object>>. The currentBatch is then cleared to 
make room for new incoming events.

- Every once in a while, a flusher thread comes along and tries to drain any 
available items in the overflow list into the Disruptor.
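
To make that concrete, here is a rough pseudo-Java sketch of the publish path 
(the class and the tryPublishToDisruptor helper are illustrative approximations 
of the steps above, not the actual Storm DisruptorQueue code):

    import java.util.ArrayList;
    import java.util.concurrent.ConcurrentLinkedQueue;

    // Illustrative sketch only; names and structure are approximate.
    abstract class WritePathSketch {
        private final int batchSize;                  // topology.disruptor.batch.size
        private final ArrayList<Object> currentBatch;
        private final ConcurrentLinkedQueue<ArrayList<Object>> overflow =
                new ConcurrentLinkedQueue<>();

        WritePathSketch(int batchSize) {
            this.batchSize = batchSize;
            this.currentBatch = new ArrayList<>(batchSize);
        }

        void publish(Object obj) {
            currentBatch.add(obj);                    // level-2 batching
            if (currentBatch.size() >= batchSize) {
                if (tryPublishToDisruptor(currentBatch)) {
                    // drained: one ring-buffer slot per element, then a single publish() call
                } else {
                    // Disruptor is full: the whole list becomes ONE overflow entry
                    overflow.add(new ArrayList<>(currentBatch));
                }
                currentBatch.clear();                 // make room for new incoming events
            }
        }

        // Hypothetical helper: copies each element into its own ring-buffer slot
        // (followed by a single publish), or returns false without inserting
        // anything if the Disruptor does not have enough free capacity.
        abstract boolean tryPublishToDisruptor(ArrayList<Object> batch);

        // A separate flusher thread periodically drains overflow into the
        // Disruptor, flattening each stored ArrayList back into single elements.
    }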

Having said that, we are planning to significantly revise this messaging 
subsystem for 2.0 as explained later in the above video.

-roshan



