On Thursday 08 October 2009 14:28:58 Jim Easterbrook wrote:
> On 08/10/2009 13:45, Matt Hammond wrote:
> > Ignoring threaded component behaviour for a moment. Axon delivers
> > messages to the destination component immediately. So if you send a
> > message(s) to the "inbox" inbox of a component, then a message to the
> > "control" inbox of the same component; then if that component notices the
> > message in its "control" inbox, then it is guaranteed that the messages
> > sent, earlier, to its "inbox" inbox will already be there. Ie. (in the
> > current
> > implementation at least) message delivery is guaranteed in-order.
>
> Good. I, for one, would like this to be made part of the "specification"
> so future implementations preserve this aspect.

It's guaranteed that inboxes and outboxes are FIFO queues, and guaranteed to 
have in order delivery. You could define a special outbox which didn't 
guarantee delivery or order - ala UDP - but that would be more sensible to 
implement as a component so someone knew explicitly what they're getting 
into.

> > It does cause queues to be flushed. And the documentation (I wrote) is
> > poor in that it does not explicitly state what actually happens (or what
> > is guaranteed from the perspective of a component writer). Yes, its
> > purpose when I coded it was to ensure queues are flushed.
>
> That's reassuring. I'd sort of worked out from the code what it was up
> to, but it's nice to have confirmation from the author.

It's the maintainer you need to worry about. Given I'm working on redoing the 
scheduler, there wasn't any guarantee this trick would work in future. I'll 
add a self.flushboxes() method to threadedcomponent that (for the moment) 
calls sync(). 

Whilst Matth says the purpose of sync was to flush the queues, the code 
doesn't really agree - it looks more like an accident of implementation. 
That's not to say that wasn't the intent, but you'd be hard pressed to take 
away that interpretation from the code.

cf:
    Regulating speed
    ----------------

    In addition to being able to pause (with an optional timeout), a threaded
    component can also regulate its speed by briefly synchronising with the
    rest of the system. Calling the sync() method simply briefly blocks until
    the rest of the system can acknowledge.

...

   def sync(self):
        """\
        Call this from main() to synchronise with the main scheduler's thread.

        You may wish to do this to throttle your component's behaviour
        This is akin to posix.sched_yield or shoving extra "yield"
        statements into a component's generator.
        """
        return self._do_threadsafe( lambda:None, [], {} )

Which means that the anonymous function "lambda: None" (called here "anon" for 
sake of discussion) is called inside the scheduler.

_do_threadsafe's purpose is essentially described as follows:

    For other methods such as link(), unlink() and (in the case of
    threadedadaptivecommscomponent) addInbox(), deleteInbox(),
    addOutbox() and deleteOutbox(), the _localmain() microprocess
    also acts on the thread's behalf.
    ...
    This is implemented by the _do_threadsafe() method. 

Crucially, in order for this to flush the queues, it has to work on the 
assumption that this can and will cause _localmain to have a full cycle 
before returning. This relies on a side effect of the way the scheduler is 
implemented today - which if left relied upon makes the system brittle.

> > However, even inserting self.sync() calls doesn't eliminate all race
> > conditions: a fresh message could arrive at the "control" inbox just
> > after sync() returns, and be delivered first (before any other messages
> > pending at the "inbox" inbox) meaning the thread might still miss them.
> > Hmm.
>
> I'd not given much thought to inboxes when I wrote my original, being
> more concerned with the producer's outboxes. However, testing my
> threaded consumer with the unthreaded producer revealed that inboxes are
> also a problem.

That's because as I noted the issue is in the fact that movement of data from 
inboxes to inqueues results in data from the control inbox being passed over 
by the localmain first.

Thinking about it a flush() method in both threaded/non-threaded components 
would solve this, as would changing the message delivery system. The latter 
would would be backwards compatible with existing components though, and make 
the system work essentially the way you want it to. (ie without making 
components more complex and without needing to alter existing logic).

I'll implement the latter.

> Here's my solution (tested) which I think achieves the same thing:
>
> class ConsumerT(Axon.ThreadedComponent.threadedcomponent):
>     def main(self):
>         count = 0
>         while 1:
>             if self.dataReady('inbox'):
>                 msg = self.recv('inbox')
>                 count += 1
>             elif self.dataReady('control'):
>                 self.sync()
>                 if not self.dataReady('inbox'):
>                     msg = self.recv('control')
>                     if isinstance(msg, producerFinished):
>                         break
>             else:
>                 self.pause(0.1)
>         print "%d messages received" % count

That would work, but from a performance perspective, it's a huge hit on the 
system.

> This just does a sync() any time there is data in 'control', and defers
> dealing with the control input until there is no inbox input.
>
> Note that in neither case am I considering inbox messages sent after the
> control message. My components do not send anything after a control
> message.

I think whilst the above works, making Axon work the way you expected it to in 
the first place is probably more beneficial.

It's a bug in Axon IMO, thanks for discussing it here :-)

We can also make a feature (sync() -> flush() ) clearer, but that's a fringe 
benefit IMO.


Michael.
-- 
http://yeoldeclue.com/blog
http://twitter.com/kamaelian
http://www.kamaelia.org/Home

--~--~---------~--~----~------------~-------~--~----~
You received this message because you are subscribed to the Google Groups 
"kamaelia" group.
To post to this group, send email to kamaelia@googlegroups.com
To unsubscribe from this group, send email to 
kamaelia+unsubscr...@googlegroups.com
For more options, visit this group at 
http://groups.google.com/group/kamaelia?hl=en
-~----------~----~----~----~------~----~------~--~---

Reply via email to