RPM Package Manager, CVS Repository
  http://rpm5.org/cvs/
  ____________________________________________________________________________

  Server: rpm5.org                         Name:   Jeff Johnson
  Root:   /v/rpm/cvs                       Email:  [email protected]
  Module: rpm                              Date:   22-May-2017 23:07:19
  Branch: rpm-5_4                          Handle: 2017052221071900

  Modified files:           (Branch: rpm-5_4)
    rpm                     CHANGES
    rpm/rpmio               msqio.c rpmio.c rpmmsq.h tmq.c

  Log:
    - msqio: loopback mode refactoring.

  Summary:
    Revision    Changes     Path
    1.3501.2.556+1  -0      rpm/CHANGES
    1.1.2.6     +94 -51     rpm/rpmio/msqio.c
    1.230.2.49  +4  -0      rpm/rpmio/rpmio.c
    1.1.2.6     +6  -3      rpm/rpmio/rpmmsq.h
    1.1.2.6     +27 -52     rpm/rpmio/tmq.c
  ____________________________________________________________________________

  patch -p0 <<'@@ .'
  Index: rpm/CHANGES
  ============================================================================
  $ cvs diff -u -r1.3501.2.555 -r1.3501.2.556 CHANGES
  --- rpm/CHANGES       22 May 2017 13:30:41 -0000      1.3501.2.555
  +++ rpm/CHANGES       22 May 2017 21:07:19 -0000      1.3501.2.556
  @@ -1,4 +1,5 @@
   5.4.17 -> 5.4.18:
  +    - jbj: msqio: loopback mode refactoring.
       - jbj: msqio: make peace with mq_notify (and sigev detached threads).
       - jbj: msqio: add (and prefer) POSIX message queues (if available).
       - jbj: msqio: permit building --without-msq.
  @@ .
  patch -p0 <<'@@ .'
  Index: rpm/rpmio/msqio.c
  ============================================================================
  $ cvs diff -u -r1.1.2.5 -r1.1.2.6 msqio.c
  --- rpm/rpmio/msqio.c 22 May 2017 13:30:42 -0000      1.1.2.5
  +++ rpm/rpmio/msqio.c 22 May 2017 21:07:19 -0000      1.1.2.6
  @@ -29,9 +29,10 @@
   
   int _rpmmsq_debug;
   #define F_ISSET(_f, _FLAG) (((_f) & ((RPMMSQ_FLAGS_##_FLAG) & ~0x40000000)) 
!= RPMMSQ_FLAGS_NONE)
  -#define MF_ISSET(_FLAG) F_ISSET(msq->flags, _FLAG)
  +#define QF_ISSET(_FLAG) F_ISSET(msq->flags, _FLAG)
   
  -#define SPEW(_list)  if (_rpmmsq_debug || _rpmio_debug) fprintf _list
  +#define SPEW(_list) \
  +    if (QF_ISSET(DEBUG) || _rpmmsq_debug || _rpmio_debug) fprintf _list
   
   #define      MSQONLY(fd)     assert(fdGetIo(fd) == msqio)
   
  @@ -49,8 +50,10 @@
        msq->oflags = 0;
        msq->omode = 0;
        msq->key = 0;
  -     msq->tid = 0;
        msq->mtype = 0;
  +     msq->tid = 0;
  +     msq->ntimeout = 0;
  +     msq->nwait = 0;
        msq->nsent = 0;
        msq->nrecv = 0;
       }
  @@ -58,34 +61,11 @@
   
   RPMIOPOOL_MODULE(msq)
   
  -#ifdef       REFERENCE
  -key_t
  -xftok (const char *path, int proj_id)
  -{
  -    key_t key = -1;  /* assume failure */
  -    struct stat st;
  -
  -    if (stat(path, &st) < 0)
  -     goto exit;
  -
  -    key = 0;
  -    key |= ((st.st_ino & 0xffff);
  -    key |= ((st.st_dev & 0xff) << 16);
  -    key |= ((proj_id & 0xff) << 24));
  -
  -exit:
  -fprintf(stderr, "<-- %s(%p,0x%02x) key 0x%x\n", __FUNCTION__, (proj_id & 
0xff), key);
  -  return key;
  -}
  -#endif
  -
   rpmmsq rpmmsqNew(const char * path, const char * fmode, int fdno, unsigned 
flags)
   {
       rpmmsq msq = rpmmsqGetPool(_rpmmsqPool);
   
   assert(fmode != NULL);               /* XXX return NULL instead? */
  -    msq->flags = flags;
  -    msq->qid = -1;
   
       const char * s = fmode;
       int c;
  @@ -122,9 +102,14 @@
            continue;
            break;
        case 'e':       /* O_CLOEXEC */
  +         /* XXX always set with mq_open */
            oflags |= O_CLOEXEC;
            continue;
            break;
  +     case 'l':       /* XXX loopback mode */
  +         flags |= RPMMSQ_FLAGS_LOOP;
  +         continue;
  +         break;
        case 'n':       /* XXX O_NONBLOCK */
            oflags |= O_NONBLOCK;
            continue;
  @@ -137,6 +122,10 @@
            omode |= IPC_EXCL;
            continue;
            break;
  +     case '?':       /* XXX loopback mode */
  +         flags |= RPMMSQ_FLAGS_DEBUG;
  +         continue;
  +         break;
        default:
            continue;
            break;
  @@ -144,32 +133,49 @@
        break;
       }
   
  -#if defined(WITH_MQ)
  -
  +    msq->flags = flags;
       msq->qname = rpmGetPath("/", path, NULL);
  -    struct mq_attr _attrs = {
  -     .mq_flags = (oflags & O_NONBLOCK),
  -     .mq_maxmsg = 10,        /* /proc/sys/fs/mqueue/msg_default */
  -     .mq_msgsize = 8192,     /* /proc/sys/fs/mqueue/msgsize_default */
  -     .mq_curmsgs = 0,
  -    }, *attrs = &_attrs;
  -    attrs = NULL;            /* XXX */
  +    msq->qid = -1;
       msq->oflags = oflags;
       msq->omode = omode & 0777;       /* XXX mask IPC_CREAT | IPC_EXCL */
  -    msq->mtype = getpid();   /* XXX single queue, multiple RW processes. */
  +    msq->mtype = getpid();   /* XXX SysV message queues only. */
  +
  +#if defined(WITH_MQ)
  +
  +    int xx;
   
       /* Reset the queue by removing. */
  -    if (MF_ISSET(RESET)) {
  -     int xx = mq_unlink(msq->qname);
  +    if (QF_ISSET(RESET)) {
  +     xx = mq_unlink(msq->qname);
   SPEW((stderr, "<-- %s(%s) rc %d\n", "mq_unlink", msq->qname, xx));
       }
   
  +    struct mq_attr _attrs = {
  +     .mq_flags = (oflags & O_NONBLOCK),
  +     .mq_maxmsg = 10,        /* /proc/sys/fs/mqueue/msg_default */
  +     .mq_msgsize = 8192,     /* /proc/sys/fs/mqueue/msgsize_default */
  +     .mq_curmsgs = 0,
  +    }, *attrs = &_attrs;
       msq->qid = mq_open(msq->qname, msq->oflags, msq->omode, attrs);
   SPEW((stderr, "<-- %s(%s,0x%x,0%o,%p) qid %d\n", "mq_open", msq->qname, 
msq->oflags, msq->omode, attrs, msq->qid));
  +
  +    /* (loopback mode) Configure the detached reader. */
  +    if (msq->qid != -1 && QF_ISSET(LOOP)) {
  +     pthread_attr_t attr;
  +     xx = pthread_getattr_default_np(&attr);
  +     xx = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
  +
  +     struct sigevent sigev = {
  +             .sigev_notify = SIGEV_THREAD,
  +             .sigev_notify_function = rpmmsqReader,
  +             .sigev_notify_attributes = &attr,
  +             .sigev_value.sival_ptr = msq,
  +     };
  +     xx = rpmmsqNotify(msq, &sigev);
  +    }
    
   #elif defined(WITH_MSQ)
   
  -    msq->qname = rpmGetPath("/", path, NULL);
       if (!strcmp(msq->qname, "/private")
        || !strcmp(msq->qname, "/IPC_PRIVATE"))
        msq->key = IPC_PRIVATE;
  @@ -191,9 +197,6 @@
            rpmlog(lvl, "Using program key 0x%x\n", msq->key);
        }
       }
  -    msq->oflags = oflags;
  -    msq->omode = omode & 0777;       /* XXX mask IPC_CREAT | IPC_EXCL */
  -    msq->mtype = getpid();   /* XXX single queue, multiple RW processes. */
       msq->qid = msgget(msq->key, omode);
   SPEW((stderr, "<-- %s(0x%x,0%o) qid %d\n", "msqget", msq->key, omode, 
msq->qid));
   
  @@ -208,6 +211,8 @@
   
   #if defined(WITH_MQ)
   
  +    if (QF_ISSET(LOOP)) return 0;    /* XXX EOF with loopback service. */
  +
       unsigned int prio = 0;
       int nb = 0;
       rc = mq_receive(msq->qid, buf, count, &prio);
  @@ -256,6 +261,16 @@
        msq->nsent++;
       }
   
  +    /* (loopback mode) Wait for rpmmsqReader to start up. */
  +    sched_yield();   /* Give detached threads a chance to run. */
  +    if (QF_ISSET(LOOP) && prio == 0 && msq->tid == 0) {
  +     struct timespec req = { 0, 1000*1000 };
  +     while (msq->tid == 0) {
  +         nanosleep(&req, NULL);
  +         msq->nwait++;
  +     }
  +    }
  +
   #elif defined(WITH_MSQ)
   
       struct msgbuf * msgp = xmalloc(sizeof(*msgp) + count);
  @@ -290,19 +305,43 @@
   int rpmmsqClose(rpmmsq msq, int delete)
   {
       int rc = -2;     /* assume failure */
  +
   #if defined(WITH_MQ)
  +
  +    if (QF_ISSET(LOOP)) {
  +     /* (loopback mode) Terminate rpmmsqReader. */
  +     if (msq->tid) {
  +         rc = rpmmsqSend(msq, __FUNCTION__, sizeof(__FUNCTION__)-1, 
RPMMSQ_PRIO_EXIT);
  +         struct timespec req = { 0, 1000*1000 };
  +         do {
  +             nanosleep(&req, NULL);
  +             msq->nwait++;
  +         } while (msq->tid != 0);
  +     }
  +     /* (loopback mode) Turn off the sigev detached thread. */
  +     rc = rpmmsqNotify(msq, NULL);
  +    }
  +
  +SPEW((stderr, "%s: sent %d recv %d wait %d timeout %d\n", __FUNCTION__, 
msq->nsent, msq->nrecv, msq->nwait, msq->ntimeout));
  +
       if (_rpmmsq_debug)
        rpmmsqDump(__FUNCTION__, msq, NULL);
       rc = mq_close(msq->qid);
  -    if (!rc && delete && msq->qname && !strcmp(msq->qname, "/private"))
  -     rc = mq_unlink(msq->qname);
  +SPEW((stderr, "<-- %s(0x%x) rc %d\n", "mq_close", msq->qid, rc));
  +    if (!rc && delete && msq->qname && !strcmp(msq->qname, "/private")) {
  +     rc = mq_unlink(msq->qname);             /* XXX rpmmsqReset? */
  +SPEW((stderr, "<-- %s(%s) rc %d\n", "mq_unlink", msq->qname, rc));
  +    }
       rc = 0;  /* XXX */
  +
   #elif defined(WITH_MSQ)
  +
       if (_rpmmsq_debug)
        rpmmsqDump(__FUNCTION__, msq, NULL);
       if (delete || msq->key == IPC_PRIVATE)
  -     (void) rpmmsqCtl(msq, IPC_RMID, NULL);
  -    rc = 0;
  +     rc = rpmmsqCtl(msq, IPC_RMID, NULL);    /* XXX rpmmsqReset? */
  +    rc = 0;  /* XXX */
  +
   #endif       /* WITH_MQ */
       return rc;
   }
  @@ -392,23 +431,26 @@
       rpmmsq msq = (rpmmsq) sv.sival_ptr;
   
   assert(msq);
  -
  +    msq = rpmmsqLink(msq);
       msq->tid = pthread_self();
   
  -SPEW((stderr, "==> %s(%p) qid %d\n", __FUNCTION__, msq, (msq ? msq->qid : 
-1)));
  +SPEW((stderr, "==> %s(%p) qid %d tid %ld\n", __FUNCTION__, msq, msq->qid, 
msq->tid));
       char b[BUFSIZ];
       size_t nb = sizeof(b);
       struct timespec ts = { 0, 0 };
       int rc;
   
       while (1) {
  -     ts.tv_sec = time(NULL) + 2;
  +     int lrto_secs = 2;      /* XXX loopback retry timeout seconds */
        unsigned prio = 0;
  +     ts.tv_sec = time(NULL) + lrto_secs;
        int rc = mq_timedreceive(msq->qid, b, nb, &prio, &ts);
        int nr = (rc >= 0 ? rc : 0);
   SPEW((stderr, "<-- %s(0x%x,%p[%lu]) rc %d prio %u\t\"%.*s\"\n", 
"mq_timedreceive", msq->qid, b, (unsigned long)nb, rc, prio, nr, b));
  -     if (rc < 0 && errno == ETIMEDOUT)
  +     if (rc < 0 && errno == ETIMEDOUT) {
  +         msq->ntimeout;
            continue;
  +     }
        msq->nrecv++;
        /* Exit immediately on error or highest priority message. */
        if (rc < 0 || !strcmp(b, "XXX") || prio == RPMMSQ_PRIO_EXIT)
  @@ -437,7 +479,7 @@
   #if defined(__linux__)
       const char * lpath = rpmGetPath("/dev/mqueue/", msq->qname, NULL);
       struct stat st;
  -    rc = stat(lpath, &st);
  +    rc = Stat(lpath, &st);
       if (!rc) {
        fprintf(fp, "\t    dev: 0x%lx\n", (unsigned long)st.st_dev);
        fprintf(fp, "\t    ino: %lu\n", (unsigned long)st.st_ino);
  @@ -453,6 +495,7 @@
        fprintf(fp, "\t  mtime: %lu\n", (unsigned long)st.st_mtime);
        fprintf(fp, "\t  ctime: %lu\n", (unsigned long)st.st_mtime);
       }
  +    /* XXX .fdio avoids select/poll issues on /dev/mqueue with .ufdio. */
       {        FD_t fd = Fopen(lpath, "r.fdio");
        if (fd) {
            unsigned char b[BUFSIZ];
  @@ .
  patch -p0 <<'@@ .'
  Index: rpm/rpmio/rpmio.c
  ============================================================================
  $ cvs diff -u -r1.230.2.48 -r1.230.2.49 rpmio.c
  --- rpm/rpmio/rpmio.c 21 May 2017 05:30:35 -0000      1.230.2.48
  +++ rpm/rpmio/rpmio.c 22 May 2017 21:07:19 -0000      1.230.2.49
  @@ -2640,6 +2640,7 @@
    * - bzopen: 'q' sets verbosity to 0
    * - bzopen: 'v' does verbosity++ (up to 4)
    * - HACK:   '.' terminates, rest is type of I/O
  + * - HACK:   'l' loopback mode (msqio)
    * - HACK:   'n' non-blocking (O_NONBLOCK)
    * - HACK:   't' truncate (O_TRUNC)
    * - HACK:   'D' sync (O_DIRECT)
  @@ -2712,6 +2713,8 @@
            if (--nstdio > 0) *stdio++ = c;
            continue;
            break;
  +     case 'l':               /* XXX loopback mode (rpmmsqNew) */
  +         goto other;
        case 'n':
            flags |= O_NONBLOCK;
            goto other;
  @@ -2921,6 +2924,7 @@
   #if defined(WITH_MQ) || defined(WITH_MSQ)
       if (end && !strcmp(end, "msqio")) {
        fd = msqio->_fopen(path, fmode);
  +     fd->flags = flags;
        goto exit;
       } else
   #endif
  @@ .
  patch -p0 <<'@@ .'
  Index: rpm/rpmio/rpmmsq.h
  ============================================================================
  $ cvs diff -u -r1.1.2.5 -r1.1.2.6 rpmmsq.h
  --- rpm/rpmio/rpmmsq.h        22 May 2017 13:30:42 -0000      1.1.2.5
  +++ rpm/rpmio/rpmmsq.h        22 May 2017 21:07:19 -0000      1.1.2.6
  @@ -14,9 +14,10 @@
   typedef enum rpmmsqFlags_e {
       RPMMSQ_FLAGS_NONE                = 0,
   
  -    RPMMSQ_FLAGS_RESET               = _MFB( 0),
  -    RPMMSQ_FLAGS_NOTIFY              = _MFB( 1),
  -    RPMMSQ_FLAGS_INFO                = _MFB( 2),
  +    RPMMSQ_FLAGS_INFO                = _MFB( 0),
  +    RPMMSQ_FLAGS_LOOP                = _MFB( 1),
  +    RPMMSQ_FLAGS_RESET               = _MFB( 2),
  +    RPMMSQ_FLAGS_DEBUG               = _MFB( 3),
   
   } rpmmsqFlags;
   #undef  _MFB
  @@ -39,6 +40,8 @@
       long mtype;
   
       volatile pthread_t tid;  /*!< sigev thread id. */
  +    int ntimeout;
  +    int nwait;
   
       int nsent;
       int nrecv;
  @@ .
  patch -p0 <<'@@ .'
  Index: rpm/rpmio/tmq.c
  ============================================================================
  $ cvs diff -u -r1.1.2.5 -r1.1.2.6 tmq.c
  --- rpm/rpmio/tmq.c   22 May 2017 13:30:42 -0000      1.1.2.5
  +++ rpm/rpmio/tmq.c   22 May 2017 21:07:19 -0000      1.1.2.6
  @@ -41,7 +41,6 @@
   #include <poptIO.h>
   #include <argv.h>
   
  -#define      _RPMMSQ_INTERNAL
   #include <rpmmsq.h>
   
   #include "debug.h"
  @@ -49,9 +48,15 @@
   static int _debug = 0;
   
   static rpmmsqFlags flags =
  -     RPMMSQ_FLAGS_RESET | RPMMSQ_FLAGS_NOTIFY | RPMMSQ_FLAGS_INFO;
  +#ifdef       DYING
  +     RPMMSQ_FLAGS_DEBUG |
  +     RPMMSQ_FLAGS_RESET |
  +     RPMMSQ_FLAGS_LOOP |
  +     RPMMSQ_FLAGS_INFO |
  +#endif
  +     0;
  +
   static int priority;
  -static int notify;
   
   #define SPEW(_list)     if (_rpmmsq_debug || _rpmio_debug) fprintf _list
   
  @@ -358,27 +363,11 @@
       int xx;
   
       const char *qname = "/rpm";;
  -    rpmmsq msq = rpmmsqNew(qname, "w+", -1, flags);
  +    rpmmsq msq = rpmmsqNew(qname, "w+l?", -1, flags);
   
  +    memset(b, 0, nb);
       if (msq) {
   
  -     memset(b, 0, nb);
  -
  -     if (notify) {
  -
  -         pthread_attr_t attr;
  -         xx = pthread_getattr_default_np(&attr);
  -         xx = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
  -
  -         struct sigevent sigev = {
  -             .sigev_notify = SIGEV_THREAD,
  -             .sigev_notify_function = rpmmsqReader,
  -             .sigev_notify_attributes = &attr,
  -             .sigev_value.sival_ptr = rpmmsqLink(msq),
  -         };
  -         xx = rpmmsqNotify(msq, &sigev);
  -     }
  -
        strcpy(b, "yadda yadda");
        blen = strlen(b);
        xx = rpmmsqSend(msq, b, blen, priority);
  @@ -387,38 +376,18 @@
        blen = strlen(b);
        xx = rpmmsqSend(msq, b, blen, priority);
   
  -     if (!notify) {
  -         memset(b, 0, nb);
  -         xx = rpmmsqRecv(msq, b, nb, NULL);
  -         memset(b, 0, nb);
  -         xx = rpmmsqRecv(msq, b, nb, NULL);
  -     }
  -    }
  -
  -    int nwaits = 0;
  -    if (notify) {
  -     struct timespec req = { 0, 1000*1000 };
  -     while (msq->tid == 0) {
  -         nanosleep(&req, NULL);
  -         nwaits++;
  -     }
  -     strcpy(b, __FUNCTION__);
  -     blen = strlen(b);
  -     xx = rpmmsqSend(msq, b, blen, RPMMSQ_PRIO_EXIT);
  -     while (msq->tid != 0) {
  -         nanosleep(&req, NULL);
  -         nwaits++;
  -     }
  +     memset(b, 0, nb);
  +     xx = rpmmsqRecv(msq, b, nb, NULL);
  +     memset(b, 0, nb);
  +     xx = rpmmsqRecv(msq, b, nb, NULL);
       }
   
  -fprintf(stderr, "%s: sent %d recv %d nwaits %d\n", __FUNCTION__, msq->nsent, 
msq->nrecv, nwaits);
  -
       xx = rpmmsqClose(msq, 1);
       msq = rpmmsqFree(msq);
   
  -    FD_t fd = Fopen(qname, "w+.msqio");
  +    /* XXX mq_notify memory leak if loopback mode. */
  +    FD_t fd = Fopen(qname, "w+?.msqio");
       if (fd) {
  -     memset(b, 0, blen);
        strcpy(b, "foo bar baz");
        blen = strlen(b);
        xx = Fwrite(b, 1, blen, fd);
  @@ -434,13 +403,21 @@
   }
   
   /*==============================================================*/
  +#if !defined(POPT_BIT_XOR)
  +#define      POPT_BIT_XOR    (POPT_ARG_VAL|POPT_ARGFLAG_XOR)
  +#endif
  +
   static struct poptOption rpmmqOptionsTable[] = {
    { "debug", 'd', POPT_ARG_VAL|POPT_ARGFLAG_DOC_HIDDEN,       &_debug, 1,
        NULL, NULL },
  - { "priority", 'p', POPT_ARG_INT,                    &priority, 0,
  -     N_("send message <priority>"), N_("<priority>") },
  - { "notify", 'n', POPT_ARG_VAL|POPT_ARGFLAG_XOR,     &notify, 1,
  + { "info", 'i', POPT_BIT_XOR|POPT_ARGFLAG_TOGGLE,   &flags, 
RPMMSQ_FLAGS_INFO,
  +     N_("display queue info on close"), NULL },
  + { "loop", 'l', POPT_BIT_XOR|POPT_ARGFLAG_TOGGLE,   &flags, 
RPMMSQ_FLAGS_LOOP,
        N_("toggle mq_notify running"), NULL },
  + { "priority", 'p', POPT_ARG_INT,                &priority, 0,
  +     N_("send message <priority>"), N_("<priority>") },
  + { "reset", 'r', POPT_BIT_XOR|POPT_ARGFLAG_TOGGLE,  &flags, 
RPMMSQ_FLAGS_RESET,
  +     N_("remove and recreate queue when opening"), NULL },
   
    { NULL, '\0', POPT_ARG_INCLUDE_TABLE, rpmioAllPoptTable, 0,
        N_(" Common options for all rpmio executables:"), NULL },
  @@ -460,8 +437,6 @@
   #endif
       int ec = 0;
   
  -_rpmmsq_debug = -1;
  -
       switch (2) {
       case 1:  ec = doSHM();           break;
       case 2:  ec = doMSQ();           break;
  @@ .
______________________________________________________________________
RPM Package Manager                                    http://rpm5.org
CVS Sources Repository                                [email protected]

Reply via email to