Bobby,

Thank you for such a detailed reply; it was really useful. Could you confirm a 
few final details for me? (This is for Storm version 1.2.2.)


1) My first question is about the worker process local transfer function 
(defined in 
https://github.com/apache/storm/blob/d2d6f40344e6cc92ab07f3a462d577ef6b61f8b1/storm-core/src/clj/org/apache/storm/daemon/worker.clj#L116),
 which is used both for moving tuples off the executor send thread onto the 
internal executor receive queues and for moving batches of tuples arriving 
from other workers onto the relevant receive queues. The local transfer 
function groups all AddressedTuples for a given executor into a list (called 
pairs in the code), which is then given to the DisruptorQueue's publish 
function (via a Clojure wrapper).
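
To make sure I am reading the Clojure correctly, here is a rough Java sketch 
of my mental model of that grouping step. Everything here (the AddressedTuple 
stand-in, the method names, the logic) is my own paraphrase for illustration, 
not Storm's actual code:

```java
import java.util.*;

// Hypothetical sketch of the per-executor grouping the local transfer
// function performs. AddressedTuple is a stand-in, not Storm's class.
class TransferSketch {
    static class AddressedTuple {
        final int task;       // destination task id
        final String tuple;   // payload, simplified to a String
        AddressedTuple(int task, String tuple) { this.task = task; this.tuple = tuple; }
    }

    // Group all tuples addressed to the same executor into one list,
    // mimicking the "pairs" handed to the DisruptorQueue's publish function.
    static Map<Integer, List<AddressedTuple>> group(List<AddressedTuple> tuples) {
        Map<Integer, List<AddressedTuple>> byTask = new HashMap<>();
        for (AddressedTuple t : tuples) {
            byTask.computeIfAbsent(t.task, k -> new ArrayList<>()).add(t);
        }
        return byTask;
    }
}
```
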


My question is (because my knowledge of Clojure is limited): does the local 
transfer function (at 
https://github.com/apache/storm/blob/d2d6f40344e6cc92ab07f3a462d577ef6b61f8b1/storm-core/src/clj/org/apache/storm/daemon/worker.clj#L125)
 pass individual tuples from the pairs list to the publish function (via some 
kind of map operation), or does it pass the list of tuples in one go?


I ask because I am trying to model the tuple flow and so I need to work out if 
batches in the DisruptorQueue's overflow queue are batches of individual tuples 
or batches of batches (lists) of tuples (which complicates things).


The executor's event handler 
(https://github.com/apache/storm/blob/d2d6f40344e6cc92ab07f3a462d577ef6b61f8b1/storm-core/src/clj/org/apache/storm/daemon/executor.clj#L457),
 which is called on a batch of objects returned from the ring buffer, seems to 
be expecting a list of individual tuples, not a list of lists of tuples. 
Therefore it seems like the former situation described above (where the local 
transfer function gives individual tuples to the publish function) is likely, 
but I just wanted to be sure.


2) My second question is about the DisruptorQueue consumeBatchWhenAvailable 
method 
(https://github.com/apache/storm/blob/d2d6f40344e6cc92ab07f3a462d577ef6b61f8b1/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java#L481).
 I just wanted to confirm that the "batch" this method refers to is actually 
"every available object in the ring buffer at that time". Therefore the batch 
consumed via this method could be of any size from 1 up to the ring buffer's 
limit (1024 by default)?
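
In other words, my mental model is something like the following toy Java 
sketch, which drains whatever happens to be in the buffer at the moment the 
consumer runs. The names and logic are mine, not Storm's implementation:

```java
import java.util.*;

// Toy model of my reading of consumeBatchWhenAvailable: the consumer takes
// everything currently in the buffer (anywhere from 1 object up to the ring
// buffer's capacity) and hands each object to the handler in turn.
// This is my interpretation, not Storm's actual code.
class ConsumeSketch {
    static int consumeBatch(Deque<Object> ring, java.util.function.Consumer<Object> handler) {
        int n = 0;
        for (Object o; (o = ring.poll()) != null; n++) {
            handler.accept(o);
        }
        return n;   // batch size varies with what was available
    }
}
```
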


Thanks again for helping with my questions.


Regards,


Thomas Cooper
PhD Student
Newcastle University, School of Computer Science
W: http://www.tomcooper.org.uk | Twitter: @tomncooper 
(https://twitter.com/tomncooper)


________________________________
From: Bobby Evans <bo...@apache.org>
Sent: 27 November 2018 15:56
To: user
Subject: Re: Disruptor Queue flush behaviour

FYI in 2.x all of this is different, but to answer your questions for 1.x.

It is a little complicated to try and keep the memory and CPU overhead low, 
especially when few tuples are flowing.  Conceptually what happens is that 
tuples are placed into a separate data structure when they are inserted.

https://github.com/apache/storm/blob/d2d6f40344e6cc92ab07f3a462d577ef6b61f8b1/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java#L225

If that batch fills up it will attempt to insert the tuples into the disruptor 
queue.  Inserting multiple messages into the queue is more efficient than 
inserting a single message at a time.  If there is not enough capacity to 
insert the messages, the batch goes into an overflow queue.
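
Roughly, in Java (a toy model with made-up names and sizes, nothing from the 
real code, and a plain bounded queue standing in for the disruptor):

```java
import java.util.*;
import java.util.concurrent.*;

// Toy model of the insert path: tuples accumulate in a local batch; a full
// batch is offered to a bounded "ring buffer", and anything that does not
// fit spills into an unbounded overflow queue. Illustrative only.
class InsertPathSketch {
    final int batchSize;
    final ArrayBlockingQueue<List<Object>> ring;   // stands in for the disruptor
    final ConcurrentLinkedQueue<List<Object>> overflow = new ConcurrentLinkedQueue<>();
    private List<Object> current = new ArrayList<>();

    InsertPathSketch(int batchSize, int ringCapacity) {
        this.batchSize = batchSize;
        this.ring = new ArrayBlockingQueue<>(ringCapacity);
    }

    void publish(Object tuple) {
        current.add(tuple);
        if (current.size() >= batchSize) {
            List<Object> batch = current;
            current = new ArrayList<>();
            if (!ring.offer(batch)) {   // no capacity: spill to overflow
                overflow.add(batch);
            }
        }
    }
}
```
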

Every millisecond a thread from a thread pool will then work at flushing all 
of the tuples buffered in the entire JVM.  First it will force any outstanding 
tuples to be placed into the overflow queue.

https://github.com/apache/storm/blob/d2d6f40344e6cc92ab07f3a462d577ef6b61f8b1/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java#L268-L273

After that it goes through the overflow queue and makes sure all of the tuples 
from overflow are flushed into the disruptor queue.  So if no tuples are 
flowing, a single thread in the thread pool will wake up once a ms, do a few 
checks per queue, and end up doing nothing.  If messages are flowing, then 
once a ms a partial batch is inserted into the disruptor queue, and depending 
on how long it takes to insert those messages into the queue there may be a 
few threads doing this.
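
As a toy model (flush invoked directly rather than from the 1 ms scheduler, 
all names made up, and a plain deque with a manual capacity check standing in 
for the disruptor):

```java
import java.util.*;

// Toy model of the periodic flusher: step 1 forces any partial batch into
// the overflow queue; step 2 drains overflow into the bounded ring buffer
// until the ring is full or overflow is empty. In Storm this would be driven
// by a scheduler ticking every millisecond; here flush() is called directly
// so the behaviour is deterministic. Illustrative only.
class FlusherSketch {
    final ArrayDeque<List<Object>> ring = new ArrayDeque<>();  // stands in for the disruptor
    final int ringCapacity;                                    // enforced manually below
    final ArrayDeque<List<Object>> overflow = new ArrayDeque<>();
    List<Object> partial = new ArrayList<>();

    FlusherSketch(int ringCapacity) {
        this.ringCapacity = ringCapacity;
    }

    void flush() {
        if (!partial.isEmpty()) {             // step 1: force partial batch out
            overflow.add(partial);
            partial = new ArrayList<>();
        }
        while (!overflow.isEmpty() && ring.size() < ringCapacity) {
            ring.add(overflow.poll());        // step 2: drain overflow into ring
        }
    }
}
```
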

I hope this helps,

Bobby


On Mon, Nov 26, 2018 at 7:03 AM Thomas Cooper (PGR) 
<t.coo...@newcastle.ac.uk> wrote:

Hi,


I have a question about the behaviour of the LMAX disruptor queues that the 
executor send/receive and the worker transfer queues use.


These queues batch tuples for processing (100 per batch by default) and will 
wait until a full batch has arrived before passing them to the executor. 
However, they will also periodically flush any tuples in the queue (every 1 ms 
by default) to prevent the queue from blocking for a long time while it waits 
for 100 tuples to turn up.


My question is about the implementation of the flush interval behaviour:


  1.  Does the flush interval thread run continuously, issuing a flush command 
every 1 ms, with the queue simply ignoring the command if it is already 
flushing? In that case, if 100 tuples turn up between the constant flush 
commands, the queue issues them straight away.
  2.  Or does the flush interval timer only start when 
consumeBatchWhenAvailable is called on the disruptor queue and a full batch is 
not available? In that case the queue will wait for 1 ms and return whatever 
is in the queue at the end of that interval or, if 100 tuples turn up within 
that 1 ms, return the full batch.


From the code in storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java 
it seems option 1 might be the case. However, the code in that class is quite 
complex and the interplay with the underlying LMAX library makes it hard to 
reason about.


Any help with the above would be greatly appreciated. I am attempting to model 
the effect of these queues on topology performance and hopefully investigate a 
way to optimise the choice of batch size and flush interval.


Thanks,


Thomas Cooper
PhD Student
Newcastle University, School of Computing
W: http://www.tomcooper.org.uk | Twitter: @tomncooper 
(https://twitter.com/tomncooper)
