I run the attached script, and then I run a bash command: "ls -al"
which gives me the following:
/home/bitcycle/testing/q
total 12K
srwxr-xr-x 1 bitcycle bitcycle 0 2011-09-22 07:44
publisher-6497A874E52911E099520022FA02089C-520014=
srwxr-xr-x 1 bitcycle bitcycle 0 2011-09-22 07:44
publisher-6497A874E52911E099520022FA02089C-520014-sync=
Though I haven't dealt with file descriptors very much in the past, it seems
that they are residual file descriptors from the processes that I spawn. I
can confirm that no zombie processes stick around via "ps aux" and I can
confirm that the processes are not alive when the script exits.
Maybe take a look at my code. It's probably something basic that I'm missing
with zeromq. I'm still pretty new to it.
-- Sean
On Thu, Sep 22, 2011 at 5:36 AM, Chuck Remes <[email protected]> wrote:
> On Sep 22, 2011, at 3:03 AM, Sean Ochoa wrote:
>
> > Hey all. I'm wondering if someone could help me out with a pub/sub
> issue. I run my script with the pub and sub in different child processes
> with the Python zmq bindings. When the processes end the sockets are
> closed, but it leaves the file descriptors around. I've searched online
> for a bit, but haven't found a way to get it to stop leaving them around. Any
> ideas? - Sean _______________________________________________
>
> This shouldn't be possible. When the process exits, the OS is responsible
> for freeing file descriptors.
>
> What do you mean by "it leaves the file descriptors around"? How are you
> detecting or measuring this?
>
> cr
>
> _______________________________________________
> zeromq-dev mailing list
> [email protected]
> http://lists.zeromq.org/mailman/listinfo/zeromq-dev
>
--
Sean | (206) 962-7954
import os
import logging
import zmq
import threading
import uuid
import re
import zlib
import traceback as tb
import sys
import pickle as pkl
from time import sleep
import multiprocessing as mp
from datetime import datetime
# Configure root logging once at import time; DEBUG level so the whole
# pub/sub handshake below is traced, with the PID in every record so the
# two child processes' output can be told apart.
logging.basicConfig(format="[%(process)s] %(name)s %(levelname)s -- %(message)s", level=logging.DEBUG)
class Publisher(object):
    """Publish messages on a zeromq PUB socket bound to an ipc:// url.

    When *sync* is true, a companion REP socket (bound to "<url>-sync")
    blocks construction until *min_num_subscribers* subscribers have
    completed an empty-message handshake, so early messages are not lost.
    """

    def __init__(self,
                 url = None,
                 sync = False,
                 min_num_subscribers = 1,
                 serialize = None,
                 compress = None):
        self.log = logging.getLogger(Publisher.__name__)
        if not url or not isinstance(url, str):
            self.log.info("Getting a default IPC url for the zeromq socket")
            url = self.generate_url()
        self.url = url
        self._context = None
        self._socket = None
        self._sync_socket = None
        self.syncd = sync
        self.min_num_subscribers = min_num_subscribers
        self.serialize = serialize  # optional callable applied to each msg
        self.compress = compress    # optional callable applied after serialize
        self._connect()

    @staticmethod
    def generate_url():
        """Return a unique ipc:// url suitable for binding a PUB socket."""
        # BUG FIX: the original used Publisher.__class__.__name__, which is
        # "type" (the metaclass), so log records were attributed to "type".
        log = logging.getLogger(Publisher.__name__)
        uid = re.sub("-", "", str(uuid.uuid1()).upper())
        url = "ipc://publisher-%s-%d" % (uid, datetime.now().microsecond)
        log.debug("url => %s" % (url))
        return url

    def __str__(self):
        return "<%s url=\"%s\", context=%s>" % (self.__class__.__name__, self.url, self._context)

    def __del__(self):
        # Close sockets explicitly. NOTE: closing does NOT unlink the
        # ipc:// endpoint files on disk -- zeromq leaves those behind,
        # which is the residue seen in "ls -al".
        log = logging.getLogger(Publisher.__name__)
        if self._socket:
            log.info("Closing pub socket")
            self._socket.close()
        if self._sync_socket:
            log.info("Closing pub sync socket")
            self._sync_socket.close()

    def _connect(self, context = None):
        """Bind the PUB socket and, when syncd, wait on the REP sync socket
        until min_num_subscribers handshakes have completed."""
        log = self.log
        try:
            if context is None:
                context = zmq.Context.instance()
            log.debug("Using context: %s" % (context))
            log.info("Creating publisher socket")
            self._context = context
            self._socket = context.socket(zmq.PUB)
            log.debug("Binding to %s" % (self.url))
            self._socket.bind(self.url)
            sleep(1)  # give slow joiners a moment before any send
            if not self.syncd:
                log.debug("Not using sync.")
                return
            log.info("Synchronizing with %d subscriber(s)", self.min_num_subscribers)
            log.info("Creating sync socket")
            self._sync_socket = context.socket(zmq.REP)
            sync = self._sync_socket
            sync_url = self.url + "-sync"
            log.debug("Binding to %s" % (sync_url))
            sync.bind(sync_url)
            NULL_STRING = str()
            count = 0
            log.info("Waiting for subscribers.")
            while count < self.min_num_subscribers:
                try:
                    sync.recv(zmq.NOBLOCK)  # handshake payload is ignored
                    sync.send(NULL_STRING)
                    count += 1
                    log.info("%d subscriber(s) available" % (count))
                except zmq.ZMQError as e:
                    if e.errno == zmq.EAGAIN:
                        log.info("The message wasn't ready.")
                        sleep(0.1)
                    else:
                        raise  # bare raise preserves the original traceback
        except Exception:
            # Narrowed from a bare except: so KeyboardInterrupt/SystemExit
            # still propagate; keeps the original log-and-swallow behaviour.
            log.critical(tb.format_exception(*sys.exc_info()))

    def send(self, item):
        """Optionally serialize and compress *item*, then publish it."""
        log = self.log
        try:
            if self.serialize:
                log.info("Serializing the msg")
                item = self.serialize(item)
            else:
                log.debug("Not using serialization")
            if self.compress:
                log.info("Compressing the msg")
                item = self.compress(item)
            else:
                log.debug("Not using compression")
            log.info("Sending item")
            self._socket.send(item)
            log.info("Item sent!")
        except Exception:
            log.critical(tb.format_exception(*sys.exc_info()))
class Subscriber(object):
    """Receive messages from a zeromq SUB socket connected to *url*.

    When *sync* is true, a REQ socket connects to "<url>-sync" and performs
    an empty-message handshake with the publisher before any recv().
    """

    def __init__(self,
                 url,
                 sub_filter = str(),
                 deserialize = None,
                 decompress = None,
                 sync = False):
        self.log = logging.getLogger(self.__class__.__name__)
        if not url or not isinstance(url, str):
            raise Exception("%s must have a valid URL" % (self.__class__.__name__))
        self.url = url
        self.filter = sub_filter
        self._context = None
        self._socket = None
        # BUG FIX: _sync_socket was never initialized when sync=False,
        # which made __del__ raise AttributeError.
        self._sync_socket = None
        self.connected = False
        self.syncd = sync
        self._connect()
        self.deserialize = deserialize  # optional callable applied after decompress
        self.decompress = decompress    # optional callable applied to raw msg

    def __str__(self):
        return "<%s url=\"%s\", context=%s>" % (self.__class__.__name__, self.url, self._context)

    def __del__(self):
        # BUG FIX: the original bound the logger to self.log but then used
        # the undefined local name `log`, raising NameError during cleanup.
        log = logging.getLogger(Subscriber.__name__)
        # getattr guards cover a partially-constructed instance.
        if getattr(self, "_socket", None):
            log.info("Closing sub socket")
            self._socket.close()
        if getattr(self, "_sync_socket", None):
            log.info("Closing sub sync socket")
            self._sync_socket.close()

    def _connect(self, context = None):
        """Connect the SUB socket, apply the subscription filter, and
        (when syncd) handshake with the publisher's REP socket."""
        log = self.log
        NULL_STRING = ""
        if self.connected:
            log.warning("Already connected.")
        if context is None:
            log.info("Requesting zmq context")
            context = zmq.Context.instance()
        log.debug("Using context: %s" % (context))
        try:
            self._context = context
            self._socket = context.socket(zmq.SUB)
            log.debug("Socket connecting to %s" % (self.url))
            self._socket.connect(self.url)
            if len(self.filter) > 0:
                log.info("Using subscription filter: \"%s\"" % (self.filter))
                self._socket.setsockopt(zmq.SUBSCRIBE, self.filter)
            self.connected = True
            if not self.syncd:
                log.info("Not using synchronization.")
                return
            log.info("Creating sync socket")
            self._sync_socket = context.socket(zmq.REQ)
            sync = self._sync_socket
            sync_url = self.url + "-sync"
            log.debug("Connecting to sync socket")
            sync.connect(sync_url)
            log.info("Synchronizing...")
            sync.send(NULL_STRING)
            sync.recv()
        except Exception:
            # Narrowed from a bare except:; log-and-swallow as before.
            log.critical(tb.format_exception(*sys.exc_info()))

    def recv(self, block=True):
        """Receive one message, optionally decompress and deserialize it.

        Returns the message, or None when block=False and nothing is ready
        (the EAGAIN path logs and falls through).
        """
        log = self.log
        try:
            flags = 0
            if not block:
                log.debug("Using non-blocking option with zmq.")
                flags |= zmq.NOBLOCK
            try:
                log.info("Receiving msg")
                msg = self._socket.recv(flags)
                log.info("Msg received!")
                if self.decompress:
                    log.info("Decompressing the msg using %s" % (self.decompress))
                    msg = self.decompress(msg)
                else:
                    log.debug("Not using decompression")
                if self.deserialize:
                    log.info("Deserializing the msg using %s" % (self.deserialize))
                    msg = self.deserialize(msg)
                else:
                    log.debug("Not using deserialization")
                return msg
            except zmq.ZMQError as e:
                if e.errno == zmq.EAGAIN:
                    log.info("The message wasn't ready.")
                else:
                    raise  # bare raise preserves the original traceback
        except Exception:
            log.critical(tb.format_exception(*sys.exc_info()))
def publish(url):
    """Child-process entry point: bind a synchronized Publisher to *url*
    and emit a single test message."""
    log = logging.getLogger(Publisher.__name__)
    publisher = Publisher(url, sync=True)
    log.info("Sending msg")
    publisher.send("foo")
def subscribe(url):
    """Child-process entry point: connect a synchronized Subscriber to
    *url* and log the first message received."""
    log = logging.getLogger(Subscriber.__name__)
    log.info("Creating subscriber")
    subscriber = Subscriber(url, sync=True)
    log.debug(subscriber.recv())
def main():
    """Spawn one publisher and one subscriber process on a shared IPC url
    and wait for both to exit."""
    log = logging.getLogger()
    log.info("Creating publisher")
    url = Publisher.generate_url()
    p = mp.Process(target=publish, args=(url,))
    c = mp.Process(target=subscribe, args=(url,))
    p.start()
    log.info("Started pub process %d" % (p.pid))
    c.start()
    log.info("Started sub process %d" % (c.pid))
    # join() blocks until the child exits -- replaces the original
    # busy-wait `while is_alive(): sleep(1)` polling loops.
    p.join()
    c.join()


# The __main__ guard is required for multiprocessing-safe scripts: on
# spawn-based platforms the children re-import this module, and without
# the guard they would recursively launch more processes.
if __name__ == "__main__":
    main()
_______________________________________________
zeromq-dev mailing list
[email protected]
http://lists.zeromq.org/mailman/listinfo/zeromq-dev