RPM Package Manager, CVS Repository
  http://rpm5.org/cvs/
  ____________________________________________________________________________

  Server: rpm5.org                         Name:   Jeff Johnson
  Root:   /v/rpm/cvs                       Email:  [email protected]
  Module: rpm                              Date:   26-May-2017 21:53:49
  Branch: rpm-5_4                          Handle: 2017052619534900

  Modified files:           (Branch: rpm-5_4)
    rpm/rpmio               msqio.c rpmmsq.h

  Log:
    - msqio: attach an rpmzlog.
    - msqio: SPEW is now variadic.

  Summary:
    Revision    Changes     Path
    1.1.2.12    +116 -94    rpm/rpmio/msqio.c
    1.1.2.11    +9  -6      rpm/rpmio/rpmmsq.h
  ____________________________________________________________________________

  patch -p0 <<'@@ .'
  Index: rpm/rpmio/msqio.c
  ============================================================================
  $ cvs diff -u -r1.1.2.11 -r1.1.2.12 msqio.c
  --- rpm/rpmio/msqio.c 26 May 2017 13:22:17 -0000      1.1.2.11
  +++ rpm/rpmio/msqio.c 26 May 2017 19:53:49 -0000      1.1.2.12
  @@ -18,10 +18,11 @@
   #endif       /* WITH_MQ */
   
   #include "rpmio_internal.h"
  +#define      _RPMLOG_INTERNAL        /* XXX ANSI_COLOR* */
   #include <rpmlog.h>
   #include <rpmmacro.h>
   #include <rpmcb.h>           /* XXX rpmIsDebug() */
  -#include <yarn.h>
  +#include <rpmzlog.h>         /* XXX rpmzLog type */
   
   #define      _RPMMSQ_INTERNAL
   #include "rpmmsq.h"
  @@ -33,29 +34,34 @@
   #define F_ISSET(_f, _FLAG) ((_f) & RPMMSQ_FLAGS_##_FLAG)
   #define MSQF_ISSET(_FLAG) F_ISSET(msq->flags, _FLAG)
   
  -#define SPEW(_list) \
  -    if ((msq && MSQF_ISSET(DEBUG)) || _rpmmsq_debug || _rpmio_debug) fprintf 
_list
  +#define SPEW(_fmt, ...) \
  +    if ((msq && MSQF_ISSET(DEBUG)) || _rpmmsq_debug || _rpmio_debug) \
  +     fprintf(stderr, _fmt, __VA_ARGS__)
   
   #define      MSQONLY(fd)     assert(fdGetIo(fd) == msqio)
   
   static int _lockdebug = 0;
  -static int _conddebug = 0;
  +static int _conddebug = -1;
   #define Z(_rc)  assert((_rc) == 0)
   #define LOCK(_m)  \
  -    {        if (_lockdebug) fprintf(stderr, "***  LOCKING(%p) %s\n", &_m, 
#_m);\
  -     Z(pthread_mutex_lock(&_m));\
  +    {        if (_lockdebug) \
  +         rpmzLogAdd(msq->zlog, "***  LOCKING(%p) %s", &_m, #_m); \
  +     Z(pthread_mutex_lock(&_m)); \
       }
   #define UNLOCK(_m) \
  -    {        Z(pthread_mutex_unlock(&_m));\
  -     if (_lockdebug) fprintf(stderr, "*** UNLOCKED(%p) %s\n", &_m, #_m);\
  +    {        Z(pthread_mutex_unlock(&_m)); \
  +     if (_lockdebug) \
  +         rpmzLogAdd(msq->zlog, "*** UNLOCKED(%p) %s", &_m, #_m); \
       }
   #define SIGNAL(_c) \
  -    {        if (_conddebug) fprintf(stderr, "***   SIGNAL(%p) %s\n", &_c, 
#_c);\
  -     Z(pthread_cond_broadcast(&_c));\
  +    {        if (_conddebug) \
  +         rpmzLogAdd(msq->zlog, "***   SIGNAL(%p) %s", &_c, #_c); \
  +     Z(pthread_cond_broadcast(&_c)); \
       }
   #define WAIT(_c,_m) \
  -    {        if (_conddebug) fprintf(stderr, "***     WAIT(%p) %s\n", &_c, 
#_c);\
  -     Z(pthread_cond_wait(&_c, &_m));\
  +    {        if (_conddebug) \
  +         rpmzLogAdd(msq->zlog, "***     WAIT(%p) %s", &_c, #_c); \
  +     Z(pthread_cond_wait(&_c, &_m)); \
       }
   
   #ifdef       NOTYET
  @@ -83,6 +89,12 @@
   GENfree(rpmmsq)
   #endif       /* __cplusplus */
   
  +/* XXX TODO: read system settings. */
  +/* /proc/sys/fs/mqueue/msg_default */
  +#define      MSQ_MSGMAX      10
  +/* /proc/sys/fs/mqueue/msgsize_default */
  +#define      MSQ_MSGSIZE     BUFSIZ
  +
   /* =============================================================== */
   #if defined(WITH_MQ)
   static
  @@ -91,7 +103,7 @@
   {
       rpmmsq msq = NULL;
       mqd_t mqdes = mq_open(name, oflag, mode, attr);
  -SPEW((stderr, "<--\t%s(%s,0x%x,0%o,%p) qid %d\n", __FUNCTION__, name, oflag, 
mode, attr, mqdes));
  +SPEW("<--\t%s(%s,0x%x,0%o,%p) qid %d\n", __FUNCTION__, name, oflag, mode, 
attr, mqdes);
       return mqdes;
   }
   
  @@ -100,7 +112,7 @@
   {
       rpmmsq msq = NULL;
       int rc = mq_getattr(mqdes, attr);
  -SPEW((stderr, "<--\t%s(0x%x,%p) rc %d\n", __FUNCTION__, mqdes, attr, rc));
  +SPEW("<--\t%s(0x%x,%p) rc %d\n", __FUNCTION__, mqdes, attr, rc);
       return rc;
   }
   
  @@ -109,7 +121,7 @@
   {
       rpmmsq msq = NULL;
       int rc = mq_setattr(mqdes, newattr, oldattr);
  -SPEW((stderr, "<--\t%s(0x%x,%p,%p) rc %d\n", __FUNCTION__, mqdes, newattr, 
oldattr, rc));
  +SPEW("<--\t%s(0x%x,%p,%p) rc %d\n", __FUNCTION__, mqdes, newattr, oldattr, 
rc);
       return rc;
   }
   
  @@ -118,7 +130,7 @@
   {
       rpmmsq msq = NULL;
       int rc = mq_notify(mqdes, sevp);
  -SPEW((stderr, "<--\t%s(0x%x,%p) rc %d\n", __FUNCTION__, mqdes, sevp, rc));
  +SPEW("<--\t%s(0x%x,%p) rc %d\n", __FUNCTION__, mqdes, sevp, rc);
       return rc;
   }
   
  @@ -143,7 +155,7 @@
            break;
        }
       }
  -SPEW((stderr, "<--\t%s(0x%x,%p[%lu],%u) rc %d\t\t\t\"%.*s\"\n", 
__FUNCTION__, mqdes, buf, (unsigned long)count, prio, rc, nc, buf));
  +SPEW("<--\t%s(0x%x,%p[%lu],%u) rc %d\t\t\t\"%.*s\"\n", __FUNCTION__, mqdes, 
buf, (unsigned long)count, prio, rc, nc, buf);
       return rc;
   }
   
  @@ -169,7 +181,7 @@
            break;
        }
       }
  -SPEW((stderr, "<--\t%s(0x%x,%p[%lu],%u,%p) rc %d\t\t\t\"%.*s\"\n", 
__FUNCTION__, mqdes, buf, (unsigned long)count, prio, abs_timeout, rc, nc, 
buf));
  +SPEW("<--\t%s(0x%x,%p[%lu],%u,%p) rc %d\t\t\t\"%.*s\"\n", __FUNCTION__, 
mqdes, buf, (unsigned long)count, prio, abs_timeout, rc, nc, buf);
       return rc;
   }
   
  @@ -196,7 +208,7 @@
            break;
        }
       }
  -SPEW((stderr, "<--\t%s(0x%x,%p[%lu],%p) rc %ld prio %u\t\"%.*s\"\n", 
__FUNCTION__, mqdes, buf, (unsigned long)count, priop, (long)rc, prio, nc, 
buf));
  +SPEW("<--\t%s(0x%x,%p[%lu],%p) rc %ld prio %u\t\"%.*s\"\n", __FUNCTION__, 
mqdes, buf, (unsigned long)count, priop, (long)rc, prio, nc, buf);
       return rc;
   }
   
  @@ -223,7 +235,7 @@
            break;
        }
       }
  -SPEW((stderr, "<--\t%s(0x%x,%p[%lu],%p,%p) rc %ld prio %u\t\"%.*s\"\n", 
__FUNCTION__, mqdes, buf, (unsigned long)count, priop, abs_timeout, (long)rc, 
prio, nc, buf));
  +SPEW("<--\t%s(0x%x,%p[%lu],%p,%p) rc %ld prio %u\t\"%.*s\"\n", __FUNCTION__, 
mqdes, buf, (unsigned long)count, priop, abs_timeout, (long)rc, prio, nc, buf);
       return rc;
   }
   
  @@ -232,7 +244,7 @@
   {
       rpmmsq msq = NULL;
       int rc = mq_close(mqdes);
  -SPEW((stderr, "<--\t%s(0x%x) rc %d\n", __FUNCTION__, mqdes, rc));
  +SPEW("<--\t%s(0x%x) rc %d\n", __FUNCTION__, mqdes, rc);
       return rc;
   }
   
  @@ -241,7 +253,7 @@
   {
       rpmmsq msq = NULL;
       int rc = mq_unlink(name);
  -SPEW((stderr, "<--\t%s(%s) rc %d\n", __FUNCTION__, name, rc));
  +SPEW("<--\t%s(%s) rc %d\n", __FUNCTION__, name, rc);
       return rc;
   }
   
  @@ -254,7 +266,7 @@
   {
       rpmmsq msq = NULL;
       int rc = msgget(key, msgflg);
  -SPEW((stderr, "<--\t%s(0x%x,0%o) rc %d\n", __FUNCTION__, key, msgflg, rc));
  +SPEW("<--\t%s(0x%x,0%o) rc %d\n", __FUNCTION__, key, msgflg, rc);
       return rc;
   }
   
  @@ -277,7 +289,7 @@
            break;
        }
       }
  -SPEW((stderr, "<--\t%s(0x%x,%p,%lu,%ld,%d) rc %ld\t\"%.*s\"\n", 
__FUNCTION__, msqid, msgp, (unsigned long)msgsz, msgtyp, msgflg, (long)rc, nc, 
buf));
  +SPEW("<--\t%s(0x%x,%p,%lu,%ld,%d) rc %ld\t\"%.*s\"\n", __FUNCTION__, msqid, 
msgp, (unsigned long)msgsz, msgtyp, msgflg, (long)rc, nc, buf);
       return rc;
   }
   
  @@ -300,7 +312,7 @@
            break;
        }
       }
  -SPEW((stderr, "<--\t%s(0x%x,%p,%lu,%d) rc %d\t\t\"%.*s\"\n", __FUNCTION__, 
msqid, msgp, (unsigned long)msgsz, msgflg, rc, nc, buf));
  +SPEW("<--\t%s(0x%x,%p,%lu,%d) rc %d\t\t\"%.*s\"\n", __FUNCTION__, msqid, 
msgp, (unsigned long)msgsz, msgflg, rc, nc, buf);
       return rc;
   }
   
  @@ -309,7 +321,7 @@
   {
       rpmmsq msq = NULL;
       int rc = msgctl(msqid, cmd, buf);
  -SPEW((stderr, "<--\t%s(0x%x,%d,%p) rc %d\n", __FUNCTION__, msqid, cmd, buf, 
rc));
  +SPEW("<--\t%s(0x%x,%d,%p) rc %d\n", __FUNCTION__, msqid, cmd, buf, rc);
       return rc;
   }
   #endif       /* WITH_MSQ */
  @@ -331,14 +343,18 @@
        PRINT(0o, omode);
        PRINT(x, key);
        PRINT(ld, mtype);
  -     if (msq->tid) {
  -         PRINT(lx, tid);
  -         PRINT(d, ntimeout);
  -         PRINT(d, nagain);
  -     }
  +#if defined(WITH_MSQ)
  +     PRINT(d, msgmax);
  +     PRINT(d, msgsize);
  +#endif       /* WITH_MSQ */
        if (stats) {
  +         LOCK(msq->m);
  +         PRINT(d, inflight);
            PRINT(d, nsent);
            PRINT(d, nrecv);
  +         PRINT(d, ntimeout);
  +         PRINT(d, nagain);
  +         UNLOCK(msq->m);
        }
   #undef       PRINT
       }
  @@ -348,15 +364,20 @@
   static void rpmmsqFini(void *_msq)
   {
       rpmmsq msq = (rpmmsq) _msq;
  +
       if (msq) {
  +SPEW("%s: inflight %d:%d sent %d recv %d timeout %d again %d\n", 
__FUNCTION__, msq->inflight, msq->msgmax, msq->nsent, msq->nrecv, 
msq->ntimeout, msq->nagain);
  +     msq->msgmax = MSQ_MSGMAX;
  +     msq->msgsize = MSQ_MSGSIZE;
   
  -     msq->tid = 0;
  -SPEW((stderr, "%s: inflight %d:%d sent %d recv %d timeout %d again %d\n", 
__FUNCTION__, msq->i, msq->imax, msq->nsent, msq->nrecv, msq->ntimeout, 
msq->nagain));
        Z(pthread_mutex_destroy(&msq->m));
        Z(pthread_cond_destroy(&msq->e));
        Z(pthread_cond_destroy(&msq->f));
  -     msq->i = 0;
  -     msq->imax = 0;
  +     msq->inflight = 0;
  +     msq->nsent = 0;
  +     msq->nrecv = 0;
  +     msq->ntimeout = 0;
  +     msq->nagain = 0;
   
        msq->flags = 0;
        msq->qname = _free(msq->qname);
  @@ -368,10 +389,8 @@
        msq->key = 0;
        msq->mtype = 0;
   
  -     msq->ntimeout = 0;
  -     msq->nagain = 0;
  -     msq->nsent = 0;
  -     msq->nrecv = 0;
  +     if (msq->zlog)
  +         msq->zlog = rpmzLogDump(msq->zlog, NULL);
       }
   }
   
  @@ -480,11 +499,31 @@
       msq->omode = omode & 0777;       /* XXX mask IPC_CREAT | IPC_EXCL */
       msq->mtype = getpid();   /* XXX SysV message queues only. */
   
  -    /* XXX TODO: merge the rpmsqFdopen path. */
  +    /* XXX TODO: configure non-default settings. */
  +    msq->msgmax = MSQ_MSGMAX;
  +    msq->msgsize = MSQ_MSGSIZE;
  +
  +    /* Initialize the monitor. */
  +    /* XXX TODO: use POSIX shared mutexes. */
  +    Z(pthread_mutex_init(&msq->m, NULL));
  +    Z(pthread_cond_init(&msq->e, NULL));
  +    Z(pthread_cond_init(&msq->f, NULL));
  +    msq->inflight = 0;
  +    msq->nsent = 0;
  +    msq->nrecv = 0;
  +    msq->ntimeout = 0;
  +    msq->nagain = 0;
  +    msq->zlog = rpmzLogNew(NULL);
  +
  +/* XXX TODO: fix rpmmsqFdopen() entry. */
  +assert(path != NULL);
  +assert(fdno == -1);
       if (path == NULL && fdno != -1) {                /* rpmmsqFdopen */
        path = "/rpm";  /* XXX */
  +     msq->qid = fdno;
       }
   
  +    /* XXX TODO: append ftok(3) keys to path. */
       msq->qname = rpmGetPath("/", path, NULL);
   
       key_t key = 0;
  @@ -536,44 +575,32 @@
       }
   
       /* Set the QID. */
  -    if (fdno != -1) {
  -     msq->qid = fdno;
  -    } else
  +    if (msq->qid == -1)
       switch (msq->flags & RPMMSQ_TYPE_MASK) {
       case RPMMSQ_TYPE_DEFAULT:
       case RPMMSQ_TYPE_POSIX:
       {
   #if defined(WITH_MQ)
  -     /* XXX TODO: configure non-default settings. */
  +     /* XXX put *open in a monitor. */
        struct mq_attr _attrs = {
            .mq_flags = (oflags & O_NONBLOCK),
  -         .mq_maxmsg = 10,    /* /proc/sys/fs/mqueue/msg_default */
  -         .mq_msgsize = 8192, /* /proc/sys/fs/mqueue/msgsize_default */
  +         .mq_maxmsg = msq->msgmax,
  +         .mq_msgsize = msq->msgsize,
            .mq_curmsgs = 0,
        }, *attrs = &_attrs;
        msq->qid = Mq_open(msq->qname, msq->oflags, msq->omode, attrs);
   
  -     /* Initialize the monitor. */
  -     msq->tid = 0;
  -     Z(pthread_mutex_init(&msq->m, NULL));
  -     Z(pthread_cond_init(&msq->e, NULL));
  -     Z(pthread_cond_init(&msq->f, NULL));
  -     msq->i = 0;
  -     msq->imax = 10;
  -
        if (msq->qid != -1 && MSQF_ISSET(LOOP)) {
            pthread_attr_t attr;
  -         int xx;
  -         xx = pthread_getattr_default_np(&attr);
  -         xx = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
  -
  +         Z(pthread_getattr_default_np(&attr));
  +         Z(pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED));
            struct sigevent sigev = {
                .sigev_notify = SIGEV_THREAD,
                .sigev_notify_function = rpmmsqReader,
                .sigev_notify_attributes = &attr,
                .sigev_value.sival_ptr = msq,
            };
  -         xx = rpmmsqNotify(msq, &sigev);
  +         Z(rpmmsqNotify(msq, &sigev));
        }
   #endif       /* WITH_MQ */
       }        break;
  @@ -587,7 +614,7 @@
        break;
       }
   
  -SPEW((stderr, "<== %s(%s,%s,%d,0x%x) qid %d\n", __FUNCTION__, path, fmode, 
fdno, flags, (msq ? msq->qid : -1)));
  +SPEW("<== %s(%s,%s,%d,0x%x) qid %d\n", __FUNCTION__, path, fmode, fdno, 
flags, (msq ? msq->qid : -1));
   
       return rpmmsqLink(msq);
   }
  @@ -609,15 +636,16 @@
   
        /* Consumer in monitor. */
        LOCK(msq->m);
  -     while (msq->i == 0)
  +     while (msq->inflight == 0)
            WAIT(msq->e, msq->m);
        rc = Mq_receive(msq->qid, buf, count, &prio);
  -     if (msq->i-- == msq->imax)
  +     if (rc >= 0)
  +         msq->nrecv++;
  +     if (msq->inflight-- == msq->msgmax)
            SIGNAL(msq->f);
        UNLOCK(msq->m);
   
        if (rc >= 0) {
  -         msq->nrecv++;
            if (priop)
                *priop = prio;
        }
  @@ -646,7 +674,7 @@
        break;
       }
   
  -SPEW((stderr, "<== %s(%p,%p[%lu],%p) qid %d rc %ld *priop %lu\n", 
__FUNCTION__, msq, buf, count, priop, (msq ? msq->qid : -1), rc, (priop ? 
*priop : 0)));
  +SPEW("<== %s(%p,%p[%lu],%p) qid %d rc %ld *priop %lu\n", __FUNCTION__, msq, 
buf, count, priop, (msq ? msq->qid : -1), rc, (priop ? *priop : 0));
       return rc;
   }
   
  @@ -663,10 +691,10 @@
   
        /* Producer in monitor. */
        LOCK(msq->m);
  -     while (msq->i == msq->imax)
  +     while (msq->inflight == msq->msgmax)
            WAIT(msq->f, msq->m);
        rc = Mq_send(msq->qid, buf, count, prio);
  -     if (msq->i++ == 0)
  +     if (msq->inflight++ == 0)
            SIGNAL(msq->e);
        UNLOCK(msq->m);
   
  @@ -708,7 +736,7 @@
        break;
       }
   
  -SPEW((stderr, "<== %s(%p,%p[%lu],%lu) qid %d rc %ld\n", __FUNCTION__, msq, 
buf, count, prio, (msq ? msq->qid : -1), rc));
  +SPEW("<== %s(%p,%p[%lu],%lu) qid %d rc %ld\n", __FUNCTION__, msq, buf, 
count, prio, (msq ? msq->qid : -1), rc);
       return rc;
   }
   
  @@ -723,7 +751,7 @@
       off_t p = pos;
   #endif
       int rc = -2;     /* assume failure */
  -SPEW((stderr, "<== %s(%p, %ld, SEEK_%s) qid %d rc %d\n", __FUNCTION__, msq, 
p, SEEK_[whence&0x3], (msq ? msq->qid : -1), rc));
  +SPEW("<== %s(%p, %ld, SEEK_%s) qid %d rc %d\n", __FUNCTION__, msq, p, 
SEEK_[whence&0x3], (msq ? msq->qid : -1), rc);
       return rc;
   }
   
  @@ -731,7 +759,7 @@
   {
       int rc = -2;     /* assume failure */
   
  -SPEW((stderr, "==> %s(%p,%d) qid %d\n", __FUNCTION__, msq, delete, (msq ? 
msq->qid : -1)));
  +SPEW("==> %s(%p,%d) qid %d\n", __FUNCTION__, msq, delete, (msq ? msq->qid : 
-1));
       if (msq)
       switch (msq->flags & RPMMSQ_TYPE_MASK) {
       case RPMMSQ_TYPE_DEFAULT:
  @@ -744,14 +772,14 @@
   
            /* Producer in monitor. */
            LOCK(msq->m);
  -         while (msq->i == msq->imax)
  +         while (msq->inflight == msq->msgmax)
                WAIT(msq->f, msq->m);
            /* Turn off the sigev detached thread. */
            if (MSQF_ISSET(LOOP))
                rc = rpmmsqNotify(msq, NULL);
            rc = Mq_close(msq->qid);
            msq->qid = -1;
  -         if (msq->i++ == 0)
  +         if (msq->inflight++ == 0)
                SIGNAL(msq->e);
            UNLOCK(msq->m);
   
  @@ -777,7 +805,7 @@
       default:
        break;
       }
  -SPEW((stderr, "<== %s(%p,%d) qid %d rc %d\n", __FUNCTION__, msq, delete, 
(msq ? msq->qid : -1), rc));
  +SPEW("<== %s(%p,%d) qid %d rc %d\n", __FUNCTION__, msq, delete, (msq ? 
msq->qid : -1), rc);
       return rc;
   }
   
  @@ -787,7 +815,7 @@
   if (strchr(fmode, '?') || _rpmmsq_debug || _rpmio_debug)
   fprintf(stderr, "==> %s(%s,%s)\n", __FUNCTION__, path, fmode);
       msq = rpmmsqNew(path, fmode, -1, 0);
  -SPEW((stderr, "<== %s(%s,%s) qid %d\n", __FUNCTION__, path, fmode, (msq ? 
msq->qid : -1)));
  +SPEW("<== %s(%s,%s) qid %d\n", __FUNCTION__, path, fmode, (msq ? msq->qid : 
-1));
       return msq;
   }
   
  @@ -798,7 +826,7 @@
   if (strchr(fmode, '?') || _rpmmsq_debug || _rpmio_debug)
   fprintf(stderr, "==> %s(%d,%s)\n", __FUNCTION__, fdno, fmode);
       msq = rpmmsqNew(NULL, fmode, fdno, 0);
  -SPEW((stderr, "<== %s(%d,%s) qid %d\n", __FUNCTION__, fdno, fmode, (msq ? 
msq->qid : -1)));
  +SPEW("<== %s(%d,%s) qid %d\n", __FUNCTION__, fdno, fmode, (msq ? msq->qid : 
-1));
       return msq;
   }
   
  @@ -807,7 +835,7 @@
       int rc = -2;     /* assume failure */
       rpmmsq msq = (rpmmsq) _msq;
       /* XXX TODO: drain the message queue? */
  -SPEW((stderr, "<== %s(%p) qid %d rc %d\n", __FUNCTION__, _msq, (msq ? 
msq->qid : -1), rc));
  +SPEW("<== %s(%p) qid %d rc %d\n", __FUNCTION__, _msq, (msq ? msq->qid : -1), 
rc);
       return rc;
   }
   
  @@ -856,7 +884,7 @@
        break;
       }
   
  -SPEW((stderr, "<== %s(%p,%d,%p) qid %d rc %d\n", __FUNCTION__, msq, cmd, 
buf, (msq ? msq->qid : -1), rc));
  +SPEW("<== %s(%p,%d,%p) qid %d rc %d\n", __FUNCTION__, msq, cmd, buf, (msq ? 
msq->qid : -1), rc);
       return rc;
   }
   
  @@ -881,7 +909,7 @@
        break;
       }
   
  -SPEW((stderr, "<== %s(%p,%p) qid %d rc %d\n", __FUNCTION__, msq, _sevp, (msq 
? msq->qid : -1), rc));
  +SPEW("<== %s(%p,%p) qid %d rc %d\n", __FUNCTION__, msq, _sevp, (msq ? 
msq->qid : -1), rc);
       return rc;
   }
   
  @@ -891,7 +919,7 @@
   
       rc = 0;
   
  -SPEW((stderr, "<== %s(%p,%p[%u],%lu) qid %d rc %d\n", __FUNCTION__, msq, b, 
(unsigned)nb, prio, (msq ? msq->qid : -1), rc));
  +SPEW("<== %s(%p,%p[%u],%lu) qid %d rc %d\n", __FUNCTION__, msq, b, 
(unsigned)nb, prio, (msq ? msq->qid : -1), rc);
   
       return rc;
   }
  @@ -902,9 +930,7 @@
       rpmmsq msq = (rpmmsq) sv.sival_ptr;
   assert(msq = rpmmsqLink(msq));
   
  -    msq->tid = pthread_self();
  -
  -SPEW((stderr, "==> %s(%p) qid %d tid %ld\n", __FUNCTION__, msq, msq->qid, 
msq->tid));
  +SPEW("==> %s(%p) qid %d\n", __FUNCTION__, msq, msq->qid);
       char b[BUFSIZ];
       size_t nb = sizeof(b);
       int rc;
  @@ -915,7 +941,7 @@
   
        /* Consumer in monitor. */
        LOCK(msq->m);
  -     while (msq->i == 0)
  +     while (msq->inflight == 0)
            WAIT(msq->e, msq->m);
        errno = 0;
        do {
  @@ -933,23 +959,23 @@
                    continue;
                }
            }
  +         if (rc >= 0)
  +             msq->nrecv++;
            break;
        } while (rc < 0);
  -     if (msq->i-- == msq->imax)
  +     if (msq->inflight-- == msq->msgmax)
            SIGNAL(msq->f);
        UNLOCK(msq->m);
   
        if (rc >= 0) {
  -         msq->nrecv++;
            /* Deliver the mssage through a callback. */
            int xx = rpmmsqDeliver(msq, b, rc, prio);
            (void)xx;
        }
       }
   
  -SPEW((stderr, "<== %s(%p) qid %d tid %ld rc %d\n", __FUNCTION__, msq, 
msq->qid, msq->tid, rc));
  +SPEW("<== %s(%p) qid %d rc %d\n", __FUNCTION__, msq, msq->qid, rc);
   
  -    msq->tid = 0;
       msq = rpmmsqFree(msq);
   
   #endif       /* WITH_MQ */
  @@ -1095,7 +1121,7 @@
        break;
       }
   
  -SPEW((stderr, "<== %s(%p,%p) qid %d rc %d\n", __FUNCTION__, msg, msq, (msq ? 
msq->qid : -1), rc));
  +SPEW("<== %s(%p,%p) qid %d rc %d\n", __FUNCTION__, msg, msq, (msq ? msq->qid 
: -1), rc);
       return rc;
   }
   
  @@ -1238,7 +1264,7 @@
       off_t p = pos;
   #endif
   
  -SPEW((stderr, "==> %s(%p, %ld, SEEK_%s)\n", __FUNCTION__, fd, p, 
SEEK_[whence&0x3]));
  +SPEW("==> %s(%p, %ld, SEEK_%s)\n", __FUNCTION__, fd, p, SEEK_[whence&0x3]);
   assert(msq != NULL);
       return rpmmsqSeek(msq, pos, whence);
   }
  @@ -1254,12 +1280,8 @@
   
       fdstat_enter(fd, FDSTAT_CLOSE);
       rc = rpmmsqClose(msq, 0);
  -
  -    /* XXX TODO: preserve fd if errors */
  -
  -    if (fd)
  -    if (rc >= 0)
  -     fdstat_exit(fd, FDSTAT_CLOSE, rc);
  +    fdstat_exit(fd, FDSTAT_CLOSE, rc);
  +    fdSetFdno(fd, -1);
   
   DBGIO(fd, (stderr, "<==\tmsqClose(%p) rc %lx %s\n", cookie, (unsigned 
long)rc, fdbg(fd)));
   
  @@ .
  patch -p0 <<'@@ .'
  Index: rpm/rpmio/rpmmsq.h
  ============================================================================
  $ cvs diff -u -r1.1.2.10 -r1.1.2.11 rpmmsq.h
  --- rpm/rpmio/rpmmsq.h        26 May 2017 13:22:17 -0000      1.1.2.10
  +++ rpm/rpmio/rpmmsq.h        26 May 2017 19:53:49 -0000      1.1.2.11
  @@ -47,20 +47,23 @@
       int qid;                 /*!< message queue id. */
       int oflags;                      /*!< message queue open flags. */
       int omode;                       /*!< message queue open mode. */
  +
       key_t key;                       /*!< SysV: ftok(3) or IPC_PRIVATE. */
       long mtype;                      /*!< SysV: message type. */
  -    pthread_t tid;           /*!< LOOP: sigev thread id. */
  +
  +    int msgmax;                      /*!< max. inflight messages. */
  +    int msgsize;                     /*!< max. message size. */
   
       pthread_mutex_t m;               /*!< monitor mutex. */
       pthread_cond_t  e;               /*!< empty condition. */
       pthread_cond_t  f;               /*!< full condition. */
  -    int i;                   /*!< no. of inflight messages. */
  -    int imax;                        /*!< max. inflight messages. */
  -
  -    int ntimeout;            /*!< LOOP: no. of receive timeouts. */
  -    int nagain;                      /*!< LOOP: no. of waits (O_NONBLOCK). */
  +    int inflight;            /*!< no. of inflight messages. */
       int nsent;                       /*!< no. messages sent. */
       int nrecv;                       /*!< no. messages received. */
  +    int ntimeout;            /*!< no. of receive timeouts. */
  +    int nagain;                      /*!< no. of waits (O_NONBLOCK). */
  +
  +    rpmzLog zlog;
   };
   #endif       /* _RPMMSQ_INTERNAL */
   
  @@ .
______________________________________________________________________
RPM Package Manager                                    http://rpm5.org
CVS Sources Repository                                [email protected]

Reply via email to