Excerpts from Yuya Nishihara's message of 2017-02-28 00:37:04 +0900:
> On Wed, 22 Feb 2017 18:16:08 -0800, Jun Wu wrote:
> > # HG changeset patch
> > # User Jun Wu <qu...@fb.com>
> > # Date 1487802520 28800
> > #      Wed Feb 22 14:28:40 2017 -0800
> > # Node ID aef9e96fb573b85f5731367a470f574dbe730839
> > # Parent  80f04ba7f4d1f439d726068f02172f9a52b981fe
> > # Available At https://bitbucket.org/quark-zju/hg-draft
> > #              hg pull https://bitbucket.org/quark-zju/hg-draft -r aef9e96fb573
> > chgcache: implement socketpair-based IPC
> 
> > 
> > +class socketipc(object):
> > +    """A simple interprocess communication mechanism that sets up a channel
> > +    between the master server and (multiple) forked worker processes. The
> > +    forked workers do non-blocking, unreliable writes, while the master server
> > +    does blocking reads.
> > +
> > +    To use the object, create it in the master server, read from a thread, and
> > +    write from forked processes:
> > +
> > +        # pid=1000, master, main thread
> > +        ipc = socketipc() # initialize ipc before forking
> > +
> > +        # pid=1000, master, a background thread
> > +        while True:
> > +            msg = ipc.recv() # blocking
> > +            ....
> > +
> > +        # pid=1001, worker
> > +        ipc.send('foo') # non-blocking, silently ignore errors
> > +
> > +        # pid=1002, worker
> > +        ipc.send('bar') # non-blocking, silently ignore errors
> > +    """
> > +
> > +    def __init__(self):
> > +        self._in, self._out = socket.socketpair(socket.AF_UNIX,
> > +                                                socket.SOCK_DGRAM)
> > +        self._out.setblocking(False)
> > +
> > +    def send(self, msg):
> > +        """send msg without blocking. fail silently on errors, ex. msg is too
> > +        long, or the queue is full.
> > +        """
> > +        try:
> > +            return self._out.send(msg)
> > +        except socket.error:
> > +            pass
> > +
> > +    def recv(self):
> > +        """receive a complete msg. will block."""
> > +        select.select([self._in], [], [])
> > +        # get message length, see "man tty_ioctl", not POSIX compliant
> > +        intbuf = array.array('i', [0])
> > +        fcntl.ioctl(self._in, termios.FIONREAD, intbuf)
> > +        msglen = intbuf[0]
> > +        # allocate one more byte, so we can detect bad msglen (bad OS)
> > +        msg = self._in.recv(msglen + 1)
> > +        assert len(msg) == msglen
> > +        return msg
> 
> Looks okay, but can't we simply call recv() with reasonably large buffer size
> (e.g. 8k) ?

That's actually a good idea. I was not very comfortable with the non-POSIX
API either.
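
For illustration, a minimal sketch of the fixed-buffer variant suggested
above. The 8192-byte limit and the `_bufsize` name are assumptions, not part
of the patch. Since the socket is SOCK_DGRAM, each recv() returns at most one
datagram, and a message longer than the buffer would be truncated:

```python
import socket


class socketipc(object):
    """Sketch only: same interface as the patch, fixed-size recv() buffer."""

    # assumed upper bound on message size (the "8k" suggested in review)
    _bufsize = 8192

    def __init__(self):
        self._in, self._out = socket.socketpair(socket.AF_UNIX,
                                                socket.SOCK_DGRAM)
        self._out.setblocking(False)

    def send(self, msg):
        """send msg without blocking, silently ignoring errors"""
        try:
            return self._out.send(msg)
        except socket.error:
            pass

    def recv(self):
        """receive one datagram. will block until one is available."""
        # no select() or FIONREAD needed: a blocking recv() on a datagram
        # socket returns exactly one message, cut off at _bufsize bytes
        return self._in.recv(self._bufsize)
```

This drops both the ioctl call and the length assertion, at the cost of
silently truncating anything larger than the buffer.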

> Nit: if all peer ends were closed appropriately, recv() would return '' and
> the assertion would fail.

Currently, that's impossible - the master does not close the fds.

> 
> > +    def __del__(self):
> > +        self._in.close()
> > +        self._out.close()
> 
> It's generally a bad idea to free resources by GC. Can't we have .close()
> method?

Actually I had tried, but it seems overcomplicated. "close" and "recv" may
be called from different threads, so a thread lock may be necessary. It also
requires a way to interrupt a blocking "recv" from "close", which is quite
painful to do "correctly" (see the reply to that patch). It seems shm does
not introduce these kinds of problems.
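
To make the complication concrete, here is one possible shape of an explicit
close(), as a rough sketch rather than what the earlier patch did. It assumes
a self-pipe to wake the blocked reader and two locks; the names closableipc,
_waker, _wakew and _closelock are hypothetical, and recv() returning None
after close is an invented convention:

```python
import os
import select
import socket
import threading


class closableipc(object):
    """Hypothetical variant of socketipc with an explicit close()."""

    def __init__(self):
        self._in, self._out = socket.socketpair(socket.AF_UNIX,
                                                socket.SOCK_DGRAM)
        self._out.setblocking(False)
        # self-pipe: close() writes a byte here to wake a blocked recv()
        self._waker, self._wakew = os.pipe()
        self._lock = threading.Lock()       # held while recv() uses the fds
        self._closelock = threading.Lock()  # serializes close() callers
        self._closed = False

    def send(self, msg):
        """send msg without blocking, silently ignoring errors"""
        try:
            return self._out.send(msg)
        except socket.error:
            pass

    def recv(self):
        """return one message, or None once close() has been requested"""
        with self._lock:
            if self._closed:
                return None
            ready = select.select([self._in, self._waker], [], [])[0]
            if self._waker in ready:
                return None
            return self._in.recv(8192)

    def close(self):
        """wake any blocked recv(), then release all fds. idempotent."""
        with self._closelock:
            if self._closed:
                return
            os.write(self._wakew, b'x')  # interrupt select() in recv()
            with self._lock:             # wait until recv() lets go
                self._closed = True
                self._in.close()
                self._out.close()
                os.close(self._waker)
                os.close(self._wakew)
```

Even this sketch needs an extra pipe, two locks, and a caller that stops
reading once recv() returns None, which is the kind of overhead described
above.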
_______________________________________________
Mercurial-devel mailing list
Mercurial-devel@mercurial-scm.org
https://www.mercurial-scm.org/mailman/listinfo/mercurial-devel
