Bobby,
Thank you for such a detailed reply. It was really useful. I was wondering if you could confirm a few final details for me? (This is for Storm version 1.2.2) 1) My first question is about the worker process local transfer function (defined in https://github.com/apache/storm/blob/d2d6f40344e6cc92ab07f3a462d577ef6b61f8b1/storm-core/src/clj/org/apache/storm/daemon/worker.clj#L116), that is used for both moving tuples off the executor send thread to internal executor receive queues and also for moving batches of tuples arriving from other workers onto the relevant receive queues. The local transfer function groups all AddressedTuples for a given executor into a list (called pairs in the code) that is then given to the DisruptoQueue's publish function (via a Clojure wrapper). My question is, because my knowledge of Clojure is limited, does the local transfer function (at https://github.com/apache/storm/blob/d2d6f40344e6cc92ab07f3a462d577ef6b61f8b1/storm-core/src/clj/org/apache/storm/daemon/worker.clj#L125) pass individual tuples from the pairs list to the publish function (via some kind of map operation) or does it pass the list of tuples in one go? I ask because I am trying to model the tuple flow and so I need to work out if batches in the DisruptorQueue's overflow queue are batches of individual tuples or batches of batches (lists) of tuples (which complicates things). The executor's event handler (https://github.com/apache/storm/blob/d2d6f40344e6cc92ab07f3a462d577ef6b61f8b1/storm-core/src/clj/org/apache/storm/daemon/executor.clj#L457), which is called on a batch of objects returned from the ring buffer, seems to be expecting a list of individual tuples, not a list of lists of tuples. Therefore it seems like the former situation described above (where the local transfer function gives individual tuples to the publish function) is likely, but I just wanted to be sure. 2) My second question is about the DistruptorQueue consumeBatchWhenAvailable method (https://github.com/apache/storm/blob/d2d6f40344e6cc92ab07f3a462d577ef6b61f8b1/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java#L481). I just wanted to confirm that the "batch" which this method refers to is actually "every available object in the ring buffer at that time". Therefore the batched consumed via this method could be of any size from 1 up to the ring buffer's limit (1024 by default)? Thanks again for helping with my questions. Regards, Thomas Cooper PhD Student Newcastle University, School of Computer Science W: http://www.tomcooper.org.uk | Twitter: @tomncooper<https://twitter.com/tomncooper> ________________________________ From: Bobby Evans <bo...@apache.org> Sent: 27 November 2018 15:56 To: user Subject: Re: Disruptor Queue flush behaviour FYI in 2.x all of this is different, but to answer your questions for 1.x. It is a little complicated to try and keep the memory and CPU overhead low, especially when few tuples are flowing. Conceptually what happens is that tuples are placed into a separate data structure when they are inserted. https://github.com/apache/storm/blob/d2d6f40344e6cc92ab07f3a462d577ef6b61f8b1/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java#L225 If that batch fills up it will attempt to insert them into the disruptor queue. Inserting multiple messages into the queue is more efficient than inserting in a single message at a time. If there is not enough capacity to insert the message it goes into an overflow queue. Every millisecond there is a thread pool that will then work at flushing all of the tuples buffered in the entire JVM. First it will force any outstanding tuples to be placed into the overflow queue. https://github.com/apache/storm/blob/d2d6f40344e6cc92ab07f3a462d577ef6b61f8b1/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java#L268-L273 After that it goes through that overflow queue and makes sure all of the tuples from overflow are flushed into the disruptor queue. So if no tuples are flowing a single thread in the thread pool will wake up once a ms to do a few checks per queue and end up doing nothing. If messages are flowing once a ms a partial batch is inserted into the disruptor queue, and depending on how long it takes to insert those messages into the queue there may be a few threads doing this. I hope this helps, Bobby On Mon, Nov 26, 2018 at 7:03 AM Thomas Cooper (PGR) <t.coo...@newcastle.ac.uk<mailto:t.coo...@newcastle.ac.uk>> wrote: Hi, I have a question about the behaviour of the LMAX disruptor queues that the executor send/receive and the worker transfer queues use. These queues batch tuples for processing (100 by default) and will wait until a full batch has arrived before passing them to the executor. However, they will also flush any tuples in the queue periodically (1 ms by default) to prevent the queue blocking for a long time while it waits for 100 tuples to turn up. My question is about the implementation of the flush interval behaviour: 1. Does the flush interval thread run continuously, issuing a flush command every 1 ms and the queue just ignores it if it is already flushing. If 100 tuples turn up between the constant flush commands the queue issues them straight away. 2. Or does the flush interval timer only start when consumeBatchWhenAvailable is called on the disruptor queue and a full batch is not available? In which case the queue will wait for 1ms and return whatever is in the queue at the end of that interval or, if 100 tuples turn up within that 1ms, return the full batch. >From the code in storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java >it seems option 1 might be the case. However, the code in that class is quite >complex and the interplay with the underlying LMAX library makes it hard to >reason about. Any help with the above would greatly appreciated, I am attempting to model the effect of these queues on topology performance and hopefully investigate a way to optimise the choice of batch size and flush interval.<http://localhost:7080/github.com/apache/storm@v1.2.2/-/blob/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java?utm_source=share#L300:16> Thanks, Thomas Cooper PhD Student Newcastle University, School of Computing W: http://www.tomcooper.org.uk | Twitter: @tomncooper<https://twitter.com/tomncooper>