I run the attached script, and then I run a bash command:  "ls -al"

which gives me the following:

/home/bitcycle/testing/q
total 12K
srwxr-xr-x 1 bitcycle bitcycle    0 2011-09-22 07:44
publisher-6497A874E52911E099520022FA02089C-520014=
srwxr-xr-x 1 bitcycle bitcycle    0 2011-09-22 07:44
publisher-6497A874E52911E099520022FA02089C-520014-sync=


Though I haven't dealt with file descriptors very much in the past, it seems
that they are residual file descriptors from the processes that I spawn.  I
can confirm that no zombie processes stick around via "ps aux" and I can
confirm that the processes are not alive when the script exits.

Maybe take a look at my code.  It's probably something basic that I'm missing
with zeromq.  I'm still pretty new to it.

-- Sean

On Thu, Sep 22, 2011 at 5:36 AM, Chuck Remes <[email protected]> wrote:

> On Sep 22, 2011, at 3:03 AM, Sean Ochoa wrote:
>
> > Hey all.  I'm wondering if someone could help me out with a pub/sub
> issue.  I run my script with the pub and sub in different child processes
> with the Python zmq bindings.  When the processes end the sockets are
> closed,  but it leaves the file descriptors around.  I've searched online
> for a bit, but haven't found a way to get it to stop leaving them around.  Any
> ideas?  - Sean _______________________________________________
>
> This shouldn't be possible. When the process exits, the OS is responsible
> for freeing file descriptors.
>
> What do you mean by "it leaves the file descriptors around"? How are you
> detecting or measuring this?
>
> cr
>
> _______________________________________________
> zeromq-dev mailing list
> [email protected]
> http://lists.zeromq.org/mailman/listinfo/zeromq-dev
>



-- 
Sean | (206) 962-7954

import os
import logging
import zmq
import threading
import uuid
import re
import zlib
import traceback as tb
import sys
import pickle as pkl
from time import sleep
import multiprocessing as mp
from datetime import datetime

# Configure the root logger once at import time: DEBUG and above, with every
# record prefixed by the emitting process id so that output from the parent
# and the child processes can be told apart.
logging.basicConfig(
    level=logging.DEBUG,
    format="[%(process)s] %(name)s %(levelname)s -- %(message)s",
)

class Publisher(object):
    """Wrap a ZeroMQ PUB socket, optionally synchronised with subscribers.

    The constructor binds the PUB socket to ``url`` immediately.  When
    ``sync`` is true it also binds a REP socket at ``url + "-sync"`` and
    blocks until ``min_num_subscribers`` requests have been answered on it.

    Setup and send errors are logged at CRITICAL level and swallowed (the
    original best-effort behaviour), so a failed setup leaves an instance
    whose sockets may still be ``None``.
    """

    def __init__(self,
                 url=None,
                 sync=False,
                 min_num_subscribers=1,
                 serialize=None,
                 compress=None):
        """Create the publisher and bind its socket(s).

        :param url: endpoint to bind; when falsy or not a ``str`` a fresh
            ``ipc://`` endpoint is produced by :meth:`generate_url`.
        :param sync: when true, wait for subscribers on the sync socket.
        :param min_num_subscribers: number of sync requests to wait for.
        :param serialize: optional callable applied to each item first.
        :param compress: optional callable applied after ``serialize``.
        """
        self.log = logging.getLogger(Publisher.__name__)

        if not url or not isinstance(url, str):
            self.log.info("Getting a default IPC url for the zeromq socket")
            url = self.generate_url()
        self.url = url

        self._context = None
        self._socket = None
        self._sync_socket = None

        self.syncd = sync
        self.min_num_subscribers = min_num_subscribers

        self.serialize = serialize
        self.compress = compress

        self._connect()

    @staticmethod
    def generate_url():
        """Return a new ``ipc://publisher-<UUID1 hex, upper-case>-<usec>`` URL."""
        # Publisher.__class__ is the metaclass ``type``; use the class's own
        # name so the record is attributed to "Publisher".
        log = logging.getLogger(Publisher.__name__)
        uid = uuid.uuid1().hex.upper()
        url = "ipc://publisher-%s-%d" % (uid, datetime.now().microsecond)
        log.debug("url => %s", url)
        return url

    def __str__(self):
        return "<%s url=\"%s\", context=%s>" % (
            self.__class__.__name__, self.url, self._context)

    def close(self):
        """Close the PUB socket and the sync socket, if they were created.

        Safe to call repeatedly and on a partially initialised instance.

        NOTE(review): this only closes the sockets; the shared context
        obtained from ``zmq.Context.instance()`` is not terminated here.
        Presumably the caller should terminate it before the process exits
        so that libzmq can finish shutting the endpoints down -- confirm
        against the libzmq/pyzmq version in use.
        """
        log = getattr(self, "log", None) or logging.getLogger(Publisher.__name__)
        for attr, label in (("_socket", "pub socket"),
                            ("_sync_socket", "pub sync socket")):
            sock = getattr(self, attr, None)
            if sock is None:
                continue
            log.info("Closing %s", label)
            sock.close()
            setattr(self, attr, None)

    def __del__(self):
        # Finalizers must never raise: the instance may be half-built and,
        # at interpreter shutdown, module globals may already be gone.
        try:
            self.close()
        except Exception:
            pass

    def _connect(self, context=None):
        """Bind the PUB socket and, if requested, wait for subscribers.

        :param context: zmq context to use; defaults to the process-wide
            ``zmq.Context.instance()``.

        Any ``Exception`` is logged at CRITICAL level and not re-raised.
        """
        log = self.log
        try:
            if context is None:
                context = zmq.Context.instance()
                log.debug("Using context:  %s", context)

            log.info("Creating publisher socket")
            self._context = context
            self._socket = context.socket(zmq.PUB)

            log.debug("Binding to %s", self.url)
            self._socket.bind(self.url)
            # NOTE(review): presumably gives the bind time to settle before
            # publishing -- confirm whether this pause is still needed.
            sleep(1)
            if not self.syncd:
                log.debug("Not using sync.")
                return

            self._wait_for_subscribers(context)
        except Exception:
            # Exception (not a bare except) so Ctrl-C / SystemExit can still
            # interrupt the polling loop in _wait_for_subscribers.
            log.critical("Publisher setup failed", exc_info=True)

    def _wait_for_subscribers(self, context):
        """Bind the REP sync socket and answer ``min_num_subscribers`` requests."""
        log = self.log
        log.info("Synchronizing with %d subscriber(s)", self.min_num_subscribers)
        log.info("Creating sync socket")
        sync = self._sync_socket = context.socket(zmq.REP)
        sync_url = self.url + "-sync"
        log.debug("Binding to %s", sync_url)
        sync.bind(sync_url)

        count = 0
        log.info("Waiting for subscribers.")
        while count < self.min_num_subscribers:
            try:
                # The request payload is irrelevant; only its arrival counts.
                sync.recv(zmq.NOBLOCK)
                sync.send(b"")
            except zmq.ZMQError as exc:
                if exc.errno != zmq.EAGAIN:
                    raise  # bare raise keeps the original traceback
                log.info("The message wasn't ready.")
                sleep(0.1)
            else:
                count += 1
                log.info("%d subscriber(s) available", count)

    def send(self, item):
        """Publish ``item`` after optional serialisation and compression.

        ``serialize`` runs first, then ``compress``.  Any ``Exception`` is
        logged at CRITICAL level and swallowed; nothing is returned.
        """
        log = self.log
        try:
            if self.serialize:
                log.info("Serializing the msg")
                item = self.serialize(item)
            else:
                log.debug("Not using serialization")

            if self.compress:
                log.info("Compressing the msg")
                item = self.compress(item)
            else:
                log.debug("Not using compression")

            log.info("Sending item")
            self._socket.send(item)
            log.info("Item sent!")
        except Exception:
            log.critical("Publisher send failed", exc_info=True)

class Subscriber(object):
    """Wrap a ZeroMQ SUB socket, optionally synchronised with the publisher.

    The constructor connects to ``url`` and subscribes with ``sub_filter``.
    When ``sync`` is true it also connects a REQ socket to
    ``url + "-sync"``, sends one empty request and waits for the reply.

    Connection and receive errors are logged at CRITICAL level and
    swallowed (the original best-effort behaviour).
    """

    def __init__(self,
                 url,
                 sub_filter=str(),
                 deserialize=None,
                 decompress=None,
                 sync=False):
        """Create the subscriber and connect its socket(s).

        :param url: endpoint to connect to; must be a non-empty ``str``.
        :param sub_filter: subscription prefix; empty subscribes to all.
        :param deserialize: optional callable applied after ``decompress``.
        :param decompress: optional callable applied to the raw message.
        :param sync: when true, handshake with the publisher's sync socket.
        :raises ValueError: if ``url`` is empty or not a ``str``.
        """
        self.log = logging.getLogger(self.__class__.__name__)

        # Set every attribute before anything can raise or connect, so that
        # __del__/close() and recv() never see a half-built instance.
        self._context = None
        self._socket = None
        self._sync_socket = None
        self.connected = False
        self.deserialize = deserialize
        self.decompress = decompress

        if not url or not isinstance(url, str):
            raise ValueError("%s must have a valid URL" % (self.__class__.__name__))
        self.url = url
        self.filter = sub_filter
        self.syncd = sync

        self._connect()

    def __str__(self):
        return "<%s url=\"%s\", context=%s>" % (
            self.__class__.__name__, self.url, self._context)

    def close(self):
        """Close the SUB socket and the sync socket, if they were created.

        Safe to call repeatedly and on a partially initialised instance.
        The shared zmq context is not terminated here.
        """
        log = getattr(self, "log", None) or logging.getLogger(Subscriber.__name__)
        for attr, label in (("_socket", "sub socket"),
                            ("_sync_socket", "sub sync socket")):
            sock = getattr(self, attr, None)
            if sock is None:
                continue
            log.info("Closing %s", label)
            sock.close()
            setattr(self, attr, None)
        self.connected = False

    def __del__(self):
        # Finalizers must never raise: the instance may be half-built and,
        # at interpreter shutdown, module globals may already be gone.
        try:
            self.close()
        except Exception:
            pass

    def _connect(self, context=None):
        """Connect and subscribe the SUB socket; optionally run the handshake.

        :param context: zmq context to use; defaults to the process-wide
            ``zmq.Context.instance()``.

        Does nothing beyond a warning if already connected.  Any
        ``Exception`` is logged at CRITICAL level and not re-raised.
        """
        log = self.log

        if self.connected:
            # Returning here avoids replacing (and leaking) the live socket.
            log.warning("Already connected.")
            return

        try:
            if context is None:
                log.info("Requesting zmq context")
                context = zmq.Context.instance()
                log.debug("Using context:  %s", context)

            self._context = context
            self._socket = context.socket(zmq.SUB)
            log.debug("Socket connecting to %s", self.url)
            self._socket.connect(self.url)

            if self.filter:
                log.info("Using subscription filter:  \"%s\"", self.filter)
            # Always subscribe, even with an empty filter.
            self._socket.setsockopt(zmq.SUBSCRIBE, self.filter)
            self.connected = True

            if not self.syncd:
                log.info("Not using synchronization.")
                return

            log.info("Creating sync socket")
            sync = self._sync_socket = context.socket(zmq.REQ)
            log.debug("Connecting to sync socket")
            sync.connect(self.url + "-sync")

            log.info("Synchronizing...")
            sync.send(b"")
            sync.recv()
        except Exception:
            log.critical("Subscriber setup failed", exc_info=True)

    def recv(self, block=True):
        """Receive one message, applying ``decompress`` then ``deserialize``.

        :param block: when false, receive with ``zmq.NOBLOCK``.
        :returns: the processed message, or ``None`` when a non-blocking
            receive has nothing ready (EAGAIN) or when any error occurred
            (errors are logged at CRITICAL level and swallowed).
        """
        log = self.log

        try:
            flags = 0
            if not block:
                log.debug("Using non-blocking option wiht zmq.")
                flags = flags + zmq.NOBLOCK

            try:
                log.info("Recieving msg")
                msg = self._socket.recv(flags)
                log.info("Msg recieved!")
            except zmq.ZMQError as exc:
                if exc.errno != zmq.EAGAIN:
                    raise  # bare raise keeps the original traceback
                log.info("The message wasn't ready.")
                return None

            if self.decompress:
                log.info("Decompressing the msg using %s", self.decompress)
                msg = self.decompress(msg)
            else:
                log.debug("Not using decompression")

            if self.deserialize:
                log.info("Deserializing the msg using %s", self.deserialize)
                msg = self.deserialize(msg)
            else:
                log.debug("Not using deserialization")
            return msg
        except Exception:
            log.critical("Subscriber recv failed", exc_info=True)
            return None

def publish(url):
    """Child-process entry point: publish one ``"foo"`` message on ``url``.

    Creates a synchronised Publisher, sends the message, then closes the
    publisher's sockets and terminates the process-wide zmq context before
    returning.

    NOTE(review): socket close in libzmq is presumably asynchronous, and a
    multiprocessing child may exit without running normal interpreter
    cleanup, so without an explicit context termination the ipc endpoint
    files and any unsent message can be left behind -- confirm against the
    libzmq/pyzmq versions in use.
    """
    log = logging.getLogger(Publisher.__name__)
    p = Publisher(url, sync = True)
    try:
        log.info("Sending msg")
        p.send("foo")
    finally:
        # Close whichever sockets were actually created (setup errors are
        # swallowed by Publisher, so either may be missing or None) ...
        for attr in ("_socket", "_sync_socket"):
            sock = getattr(p, attr, None)
            if sock is not None:
                sock.close()
        # ... then terminate the context so shutdown completes before this
        # process ends.
        zmq.Context.instance().term()

def subscribe(url):
    """Child-process entry point: receive and log one message from ``url``.

    Creates a synchronised Subscriber, logs the result of one blocking
    ``recv()`` at DEBUG level, then closes the subscriber's sockets and
    terminates the process-wide zmq context before returning.

    NOTE(review): socket close in libzmq is presumably asynchronous, so
    the context is terminated explicitly to let shutdown finish before the
    child process exits -- confirm against the libzmq/pyzmq versions in use.
    """
    log = logging.getLogger(Subscriber.__name__)
    log.info("Creating subscriber")
    s = Subscriber(url, sync = True)
    try:
        log.debug(s.recv())
    finally:
        # Close whichever sockets were actually created (setup errors are
        # swallowed by Subscriber, so either may be missing or None) ...
        for attr in ("_socket", "_sync_socket"):
            sock = getattr(s, attr, None)
            if sock is not None:
                sock.close()
        # ... then terminate the context so shutdown completes before this
        # process ends.
        zmq.Context.instance().term()

# Driver: run the publisher and the subscriber in separate child processes
# that share one freshly generated ipc:// endpoint, then wait for both.
log = logging.getLogger()
log.info("Creating publisher")
url = Publisher.generate_url()

p = mp.Process(target=publish, args=(url,))
c = mp.Process(target=subscribe, args=(url,))

p.start()
log.info("Started pub process %d" % (p.pid))
c.start()
log.info("Started sub process %d" % (c.pid))

# join() blocks until the child exits, replacing the one-second
# is_alive() polling loops.
p.join()
c.join()

_______________________________________________
zeromq-dev mailing list
[email protected]
http://lists.zeromq.org/mailman/listinfo/zeromq-dev

Reply via email to