On Thursday 08 October 2009 14:28:58 Jim Easterbrook wrote: > On 08/10/2009 13:45, Matt Hammond wrote: > > Ignoring threaded component behaviour for a moment. Axon delivers > > messages to the destination component immediately. So if you send a > > message(s) to the "inbox" inbox of a component, then a message to the > > "control" inbox of the same component; then if that component notices the > > message in its "control" inbox, then it is guaranteed that the messages > > sent, earlier, to its "inbox" inbox will already be there. Ie. (in the > > current > > implementation at least) message delivery is guaranteed in-order. > > Good. I, for one, would like this to be made part of the "specification" > so future implementations preserve this aspect.
It's guaranteed that inboxes and outboxes are FIFO queues, and guaranteed to have in order delivery. You could define a special outbox which didn't guarantee delivery or order - ala UDP - but that would be more sensible to implement as a component so someone knew explicitly what they're getting into. > > It does cause queues to be flushed. And the documentation (I wrote) is > > poor in that it does not explicitly state what actually happens (or what > > is guaranteed from the perspective of a component writer). Yes, its > > purpose when I coded it was to ensure queues are flushed. > > That's reassuring. I'd sort of worked out from the code what it was up > to, but it's nice to have confirmation from the author. It's the maintainer you need to worry about. Given I'm working on redoing the scheduler, there wasn't any guarantee this trick would work in future. I'll add a self.flushboxes() method to threadedcomponent that (for the moment) calls sync(). Whilst Matth says the purpose of sync was to flush the queues, the code doesn't really agree - it looks more like an accident of implementation. That's not to say that wasn't the intent, but you'd be hard pressed to take away that interpretation from the code. cf: Regulating speed ---------------- In addition to being able to pause (with an optional timeout), a threaded component can also regulate its speed by briefly synchronising with the rest of the system. Calling the sync() method simply briefly blocks until the rest of the system can acknowledge. ... def sync(self): """\ Call this from main() to synchronise with the main scheduler's thread. You may wish to do this to throttle your component's behaviour This is akin to posix.sched_yield or shoving extra "yield" statements into a component's generator. """ return self._do_threadsafe( lambda:None, [], {} ) Which means that the anonymous function "lambda: None" (called here "anon" for sake of discussion) is called inside the scheduler. _do_threadsafe's purpose is essentially described as follows: For other methods such as link(), unlink() and (in the case of threadedadaptivecommscomponent) addInbox(), deleteInbox(), addOutbox() and deleteOutbox(), the _localmain() microprocess also acts on the thread's behalf. ... This is implemented by the _do_threadsafe() method. Crucially, in order for this to flush the queues, it has to work on the assumption that this can and will cause _localmain to have a full cycle before returning. This relies on a side effect of the way the scheduler is implemented today - which if left relied upon makes the system brittle. > > However, even inserting self.sync() calls doesn't eliminate all race > > conditions: a fresh message could arrive at the "control" inbox just > > after sync() returns, and be delivered first (before any other messages > > pending at the "inbox" inbox) meaning the thread might still miss them. > > Hmm. > > I'd not given much thought to inboxes when I wrote my original, being > more concerned with the producer's outboxes. However, testing my > threaded consumer with the unthreaded producer revealed that inboxes are > also a problem. That's because as I noted the issue is in the fact that movement of data from inboxes to inqueues results in data from the control inbox being passed over by the localmain first. Thinking about it a flush() method in both threaded/non-threaded components would solve this, as would changing the message delivery system. The latter would would be backwards compatible with existing components though, and make the system work essentially the way you want it to. (ie without making components more complex and without needing to alter existing logic). I'll implement the latter. > Here's my solution (tested) which I think achieves the same thing: > > class ConsumerT(Axon.ThreadedComponent.threadedcomponent): > def main(self): > count = 0 > while 1: > if self.dataReady('inbox'): > msg = self.recv('inbox') > count += 1 > elif self.dataReady('control'): > self.sync() > if not self.dataReady('inbox'): > msg = self.recv('control') > if isinstance(msg, producerFinished): > break > else: > self.pause(0.1) > print "%d messages received" % count That would work, but from a performance perspective, it's a huge hit on the system. > This just does a sync() any time there is data in 'control', and defers > dealing with the control input until there is no inbox input. > > Note that in neither case am I considering inbox messages sent after the > control message. My components do not send anything after a control > message. I think whilst the above works, making Axon work the way you expected it to in the first place is probably more beneficial. It's a bug in Axon IMO, thanks for discussing it here :-) We can also make a feature (sync() -> flush() ) clearer, but that's a fringe benefit IMO. Michael. -- http://yeoldeclue.com/blog http://twitter.com/kamaelian http://www.kamaelia.org/Home --~--~---------~--~----~------------~-------~--~----~ You received this message because you are subscribed to the Google Groups "kamaelia" group. To post to this group, send email to kamaelia@googlegroups.com To unsubscribe from this group, send email to kamaelia+unsubscr...@googlegroups.com For more options, visit this group at http://groups.google.com/group/kamaelia?hl=en -~----------~----~----~----~------~----~------~--~---