RPM Package Manager, CVS Repository http://rpm5.org/cvs/ ____________________________________________________________________________
Server: rpm5.org Name: Jeff Johnson Root: /v/rpm/cvs Email: [email protected] Module: rpm Date: 22-May-2017 23:07:19 Branch: rpm-5_4 Handle: 2017052221071900 Modified files: (Branch: rpm-5_4) rpm CHANGES rpm/rpmio msqio.c rpmio.c rpmmsq.h tmq.c Log: - msqio: loopback mode refactoring. Summary: Revision Changes Path 1.3501.2.556+1 -0 rpm/CHANGES 1.1.2.6 +94 -51 rpm/rpmio/msqio.c 1.230.2.49 +4 -0 rpm/rpmio/rpmio.c 1.1.2.6 +6 -3 rpm/rpmio/rpmmsq.h 1.1.2.6 +27 -52 rpm/rpmio/tmq.c ____________________________________________________________________________ patch -p0 <<'@@ .' Index: rpm/CHANGES ============================================================================ $ cvs diff -u -r1.3501.2.555 -r1.3501.2.556 CHANGES --- rpm/CHANGES 22 May 2017 13:30:41 -0000 1.3501.2.555 +++ rpm/CHANGES 22 May 2017 21:07:19 -0000 1.3501.2.556 @@ -1,4 +1,5 @@ 5.4.17 -> 5.4.18: + - jbj: msqio: loopback mode refactoring. - jbj: msqio: make peace with mq_notify (and sigev detached threads). - jbj: msqio: add (and prefer) POSIX message queues (if available). - jbj: msqio: permit building --without-msq. @@ . patch -p0 <<'@@ .' Index: rpm/rpmio/msqio.c ============================================================================ $ cvs diff -u -r1.1.2.5 -r1.1.2.6 msqio.c --- rpm/rpmio/msqio.c 22 May 2017 13:30:42 -0000 1.1.2.5 +++ rpm/rpmio/msqio.c 22 May 2017 21:07:19 -0000 1.1.2.6 @@ -29,9 +29,10 @@ int _rpmmsq_debug; #define F_ISSET(_f, _FLAG) (((_f) & ((RPMMSQ_FLAGS_##_FLAG) & ~0x40000000)) != RPMMSQ_FLAGS_NONE) -#define MF_ISSET(_FLAG) F_ISSET(msq->flags, _FLAG) +#define QF_ISSET(_FLAG) F_ISSET(msq->flags, _FLAG) -#define SPEW(_list) if (_rpmmsq_debug || _rpmio_debug) fprintf _list +#define SPEW(_list) \ + if (QF_ISSET(DEBUG) || _rpmmsq_debug || _rpmio_debug) fprintf _list #define MSQONLY(fd) assert(fdGetIo(fd) == msqio) @@ -49,8 +50,10 @@ msq->oflags = 0; msq->omode = 0; msq->key = 0; - msq->tid = 0; msq->mtype = 0; + msq->tid = 0; + msq->ntimeout = 0; + msq->nwait = 0; msq->nsent = 0; msq->nrecv = 0; } @@ -58,34 +61,11 @@ RPMIOPOOL_MODULE(msq) -#ifdef REFERENCE -key_t -xftok (const char *path, int proj_id) -{ - key_t key = -1; /* assume failure */ - struct stat st; - - if (stat(path, &st) < 0) - goto exit; - - key = 0; - key |= ((st.st_ino & 0xffff); - key |= ((st.st_dev & 0xff) << 16); - key |= ((proj_id & 0xff) << 24)); - -exit: -fprintf(stderr, "<-- %s(%p,0x%02x) key 0x%x\n", __FUNCTION__, (proj_id & 0xff), key); - return key; -} -#endif - rpmmsq rpmmsqNew(const char * path, const char * fmode, int fdno, unsigned flags) { rpmmsq msq = rpmmsqGetPool(_rpmmsqPool); assert(fmode != NULL); /* XXX return NULL instead? */ - msq->flags = flags; - msq->qid = -1; const char * s = fmode; int c; @@ -122,9 +102,14 @@ continue; break; case 'e': /* O_CLOEXEC */ + /* XXX always set with mq_open */ oflags |= O_CLOEXEC; continue; break; + case 'l': /* XXX loopback mode */ + flags |= RPMMSQ_FLAGS_LOOP; + continue; + break; case 'n': /* XXX O_NONBLOCK */ oflags |= O_NONBLOCK; continue; @@ -137,6 +122,10 @@ omode |= IPC_EXCL; continue; break; + case '?': /* XXX loopback mode */ + flags |= RPMMSQ_FLAGS_DEBUG; + continue; + break; default: continue; break; @@ -144,32 +133,49 @@ break; } -#if defined(WITH_MQ) - + msq->flags = flags; msq->qname = rpmGetPath("/", path, NULL); - struct mq_attr _attrs = { - .mq_flags = (oflags & O_NONBLOCK), - .mq_maxmsg = 10, /* /proc/sys/fs/mqueue/msg_default */ - .mq_msgsize = 8192, /* /proc/sys/fs/mqueue/msgsize_default */ - .mq_curmsgs = 0, - }, *attrs = &_attrs; - attrs = NULL; /* XXX */ + msq->qid = -1; msq->oflags = oflags; msq->omode = omode & 0777; /* XXX mask IPC_CREAT | IPC_EXCL */ - msq->mtype = getpid(); /* XXX single queue, multiple RW processes. */ + msq->mtype = getpid(); /* XXX SysV message queues only. */ + +#if defined(WITH_MQ) + + int xx; /* Reset the queue by removing. */ - if (MF_ISSET(RESET)) { - int xx = mq_unlink(msq->qname); + if (QF_ISSET(RESET)) { + xx = mq_unlink(msq->qname); SPEW((stderr, "<-- %s(%s) rc %d\n", "mq_unlink", msq->qname, xx)); } + struct mq_attr _attrs = { + .mq_flags = (oflags & O_NONBLOCK), + .mq_maxmsg = 10, /* /proc/sys/fs/mqueue/msg_default */ + .mq_msgsize = 8192, /* /proc/sys/fs/mqueue/msgsize_default */ + .mq_curmsgs = 0, + }, *attrs = &_attrs; msq->qid = mq_open(msq->qname, msq->oflags, msq->omode, attrs); SPEW((stderr, "<-- %s(%s,0x%x,0%o,%p) qid %d\n", "mq_open", msq->qname, msq->oflags, msq->omode, attrs, msq->qid)); + + /* (loopback mode) Configure the detached reader. */ + if (msq->qid != -1 && QF_ISSET(LOOP)) { + pthread_attr_t attr; + xx = pthread_getattr_default_np(&attr); + xx = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); + + struct sigevent sigev = { + .sigev_notify = SIGEV_THREAD, + .sigev_notify_function = rpmmsqReader, + .sigev_notify_attributes = &attr, + .sigev_value.sival_ptr = msq, + }; + xx = rpmmsqNotify(msq, &sigev); + } #elif defined(WITH_MSQ) - msq->qname = rpmGetPath("/", path, NULL); if (!strcmp(msq->qname, "/private") || !strcmp(msq->qname, "/IPC_PRIVATE")) msq->key = IPC_PRIVATE; @@ -191,9 +197,6 @@ rpmlog(lvl, "Using program key 0x%x\n", msq->key); } } - msq->oflags = oflags; - msq->omode = omode & 0777; /* XXX mask IPC_CREAT | IPC_EXCL */ - msq->mtype = getpid(); /* XXX single queue, multiple RW processes. */ msq->qid = msgget(msq->key, omode); SPEW((stderr, "<-- %s(0x%x,0%o) qid %d\n", "msqget", msq->key, omode, msq->qid)); @@ -208,6 +211,8 @@ #if defined(WITH_MQ) + if (QF_ISSET(LOOP)) return 0; /* XXX EOF with loopback service. */ + unsigned int prio = 0; int nb = 0; rc = mq_receive(msq->qid, buf, count, &prio); @@ -256,6 +261,16 @@ msq->nsent++; } + /* (loopback mode) Wait for rpmmsqReader to start up. */ + sched_yield(); /* Give detached threads a chance to run. */ + if (QF_ISSET(LOOP) && prio == 0 && msq->tid == 0) { + struct timespec req = { 0, 1000*1000 }; + while (msq->tid == 0) { + nanosleep(&req, NULL); + msq->nwait++; + } + } + #elif defined(WITH_MSQ) struct msgbuf * msgp = xmalloc(sizeof(*msgp) + count); @@ -290,19 +305,43 @@ int rpmmsqClose(rpmmsq msq, int delete) { int rc = -2; /* assume failure */ + #if defined(WITH_MQ) + + if (QF_ISSET(LOOP)) { + /* (loopback mode) Terminate rpmmsqReader. */ + if (msq->tid) { + rc = rpmmsqSend(msq, __FUNCTION__, sizeof(__FUNCTION__)-1, RPMMSQ_PRIO_EXIT); + struct timespec req = { 0, 1000*1000 }; + do { + nanosleep(&req, NULL); + msq->nwait++; + } while (msq->tid != 0); + } + /* (loopback mode) Turn off the sigev detached thread. */ + rc = rpmmsqNotify(msq, NULL); + } + +SPEW((stderr, "%s: sent %d recv %d wait %d timeout %d\n", __FUNCTION__, msq->nsent, msq->nrecv, msq->nwait, msq->ntimeout)); + if (_rpmmsq_debug) rpmmsqDump(__FUNCTION__, msq, NULL); rc = mq_close(msq->qid); - if (!rc && delete && msq->qname && !strcmp(msq->qname, "/private")) - rc = mq_unlink(msq->qname); +SPEW((stderr, "<-- %s(0x%x) rc %d\n", "mq_close", msq->qid, rc)); + if (!rc && delete && msq->qname && !strcmp(msq->qname, "/private")) { + rc = mq_unlink(msq->qname); /* XXX rpmmsqReset? */ +SPEW((stderr, "<-- %s(%s) rc %d\n", "mq_unlink", msq->qname, rc)); + } rc = 0; /* XXX */ + #elif defined(WITH_MSQ) + if (_rpmmsq_debug) rpmmsqDump(__FUNCTION__, msq, NULL); if (delete || msq->key == IPC_PRIVATE) - (void) rpmmsqCtl(msq, IPC_RMID, NULL); - rc = 0; + rc = rpmmsqCtl(msq, IPC_RMID, NULL); /* XXX rpmmsqReset? */ + rc = 0; /* XXX */ + #endif /* WITH_MQ */ return rc; } @@ -392,23 +431,26 @@ rpmmsq msq = (rpmmsq) sv.sival_ptr; assert(msq); - + msq = rpmmsqLink(msq); msq->tid = pthread_self(); -SPEW((stderr, "==> %s(%p) qid %d\n", __FUNCTION__, msq, (msq ? msq->qid : -1))); +SPEW((stderr, "==> %s(%p) qid %d tid %ld\n", __FUNCTION__, msq, msq->qid, msq->tid)); char b[BUFSIZ]; size_t nb = sizeof(b); struct timespec ts = { 0, 0 }; int rc; while (1) { - ts.tv_sec = time(NULL) + 2; + int lrto_secs = 2; /* XXX loopback retry timeout seconds */ unsigned prio = 0; + ts.tv_sec = time(NULL) + lrto_secs; int rc = mq_timedreceive(msq->qid, b, nb, &prio, &ts); int nr = (rc >= 0 ? rc : 0); SPEW((stderr, "<-- %s(0x%x,%p[%lu]) rc %d prio %u\t\"%.*s\"\n", "mq_timedreceive", msq->qid, b, (unsigned long)nb, rc, prio, nr, b)); - if (rc < 0 && errno == ETIMEDOUT) + if (rc < 0 && errno == ETIMEDOUT) { + msq->ntimeout; continue; + } msq->nrecv++; /* Exit immediately on error or highest priority message. */ if (rc < 0 || !strcmp(b, "XXX") || prio == RPMMSQ_PRIO_EXIT) @@ -437,7 +479,7 @@ #if defined(__linux__) const char * lpath = rpmGetPath("/dev/mqueue/", msq->qname, NULL); struct stat st; - rc = stat(lpath, &st); + rc = Stat(lpath, &st); if (!rc) { fprintf(fp, "\t dev: 0x%lx\n", (unsigned long)st.st_dev); fprintf(fp, "\t ino: %lu\n", (unsigned long)st.st_ino); @@ -453,6 +495,7 @@ fprintf(fp, "\t mtime: %lu\n", (unsigned long)st.st_mtime); fprintf(fp, "\t ctime: %lu\n", (unsigned long)st.st_mtime); } + /* XXX .fdio avoids select/poll issues on /dev/mqueue with .ufdio. */ { FD_t fd = Fopen(lpath, "r.fdio"); if (fd) { unsigned char b[BUFSIZ]; @@ . patch -p0 <<'@@ .' Index: rpm/rpmio/rpmio.c ============================================================================ $ cvs diff -u -r1.230.2.48 -r1.230.2.49 rpmio.c --- rpm/rpmio/rpmio.c 21 May 2017 05:30:35 -0000 1.230.2.48 +++ rpm/rpmio/rpmio.c 22 May 2017 21:07:19 -0000 1.230.2.49 @@ -2640,6 +2640,7 @@ * - bzopen: 'q' sets verbosity to 0 * - bzopen: 'v' does verbosity++ (up to 4) * - HACK: '.' terminates, rest is type of I/O + * - HACK: 'l' loopback mode (msqio) * - HACK: 'n' non-blocking (O_NONBLOCK) * - HACK: 't' truncate (O_TRUNC) * - HACK: 'D' sync (O_DIRECT) @@ -2712,6 +2713,8 @@ if (--nstdio > 0) *stdio++ = c; continue; break; + case 'l': /* XXX loopback mode (rpmmsqNew) */ + goto other; case 'n': flags |= O_NONBLOCK; goto other; @@ -2921,6 +2924,7 @@ #if defined(WITH_MQ) || defined(WITH_MSQ) if (end && !strcmp(end, "msqio")) { fd = msqio->_fopen(path, fmode); + fd->flags = flags; goto exit; } else #endif @@ . patch -p0 <<'@@ .' Index: rpm/rpmio/rpmmsq.h ============================================================================ $ cvs diff -u -r1.1.2.5 -r1.1.2.6 rpmmsq.h --- rpm/rpmio/rpmmsq.h 22 May 2017 13:30:42 -0000 1.1.2.5 +++ rpm/rpmio/rpmmsq.h 22 May 2017 21:07:19 -0000 1.1.2.6 @@ -14,9 +14,10 @@ typedef enum rpmmsqFlags_e { RPMMSQ_FLAGS_NONE = 0, - RPMMSQ_FLAGS_RESET = _MFB( 0), - RPMMSQ_FLAGS_NOTIFY = _MFB( 1), - RPMMSQ_FLAGS_INFO = _MFB( 2), + RPMMSQ_FLAGS_INFO = _MFB( 0), + RPMMSQ_FLAGS_LOOP = _MFB( 1), + RPMMSQ_FLAGS_RESET = _MFB( 2), + RPMMSQ_FLAGS_DEBUG = _MFB( 3), } rpmmsqFlags; #undef _MFB @@ -39,6 +40,8 @@ long mtype; volatile pthread_t tid; /*!< sigev thread id. */ + int ntimeout; + int nwait; int nsent; int nrecv; @@ . patch -p0 <<'@@ .' Index: rpm/rpmio/tmq.c ============================================================================ $ cvs diff -u -r1.1.2.5 -r1.1.2.6 tmq.c --- rpm/rpmio/tmq.c 22 May 2017 13:30:42 -0000 1.1.2.5 +++ rpm/rpmio/tmq.c 22 May 2017 21:07:19 -0000 1.1.2.6 @@ -41,7 +41,6 @@ #include <poptIO.h> #include <argv.h> -#define _RPMMSQ_INTERNAL #include <rpmmsq.h> #include "debug.h" @@ -49,9 +48,15 @@ static int _debug = 0; static rpmmsqFlags flags = - RPMMSQ_FLAGS_RESET | RPMMSQ_FLAGS_NOTIFY | RPMMSQ_FLAGS_INFO; +#ifdef DYING + RPMMSQ_FLAGS_DEBUG | + RPMMSQ_FLAGS_RESET | + RPMMSQ_FLAGS_LOOP | + RPMMSQ_FLAGS_INFO | +#endif + 0; + static int priority; -static int notify; #define SPEW(_list) if (_rpmmsq_debug || _rpmio_debug) fprintf _list @@ -358,27 +363,11 @@ int xx; const char *qname = "/rpm";; - rpmmsq msq = rpmmsqNew(qname, "w+", -1, flags); + rpmmsq msq = rpmmsqNew(qname, "w+l?", -1, flags); + memset(b, 0, nb); if (msq) { - memset(b, 0, nb); - - if (notify) { - - pthread_attr_t attr; - xx = pthread_getattr_default_np(&attr); - xx = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); - - struct sigevent sigev = { - .sigev_notify = SIGEV_THREAD, - .sigev_notify_function = rpmmsqReader, - .sigev_notify_attributes = &attr, - .sigev_value.sival_ptr = rpmmsqLink(msq), - }; - xx = rpmmsqNotify(msq, &sigev); - } - strcpy(b, "yadda yadda"); blen = strlen(b); xx = rpmmsqSend(msq, b, blen, priority); @@ -387,38 +376,18 @@ blen = strlen(b); xx = rpmmsqSend(msq, b, blen, priority); - if (!notify) { - memset(b, 0, nb); - xx = rpmmsqRecv(msq, b, nb, NULL); - memset(b, 0, nb); - xx = rpmmsqRecv(msq, b, nb, NULL); - } - } - - int nwaits = 0; - if (notify) { - struct timespec req = { 0, 1000*1000 }; - while (msq->tid == 0) { - nanosleep(&req, NULL); - nwaits++; - } - strcpy(b, __FUNCTION__); - blen = strlen(b); - xx = rpmmsqSend(msq, b, blen, RPMMSQ_PRIO_EXIT); - while (msq->tid != 0) { - nanosleep(&req, NULL); - nwaits++; - } + memset(b, 0, nb); + xx = rpmmsqRecv(msq, b, nb, NULL); + memset(b, 0, nb); + xx = rpmmsqRecv(msq, b, nb, NULL); } -fprintf(stderr, "%s: sent %d recv %d nwaits %d\n", __FUNCTION__, msq->nsent, msq->nrecv, nwaits); - xx = rpmmsqClose(msq, 1); msq = rpmmsqFree(msq); - FD_t fd = Fopen(qname, "w+.msqio"); + /* XXX mq_notify memory leak if loopback mode. */ + FD_t fd = Fopen(qname, "w+?.msqio"); if (fd) { - memset(b, 0, blen); strcpy(b, "foo bar baz"); blen = strlen(b); xx = Fwrite(b, 1, blen, fd); @@ -434,13 +403,21 @@ } /*==============================================================*/ +#if !defined(POPT_BIT_XOR) +#define POPT_BIT_XOR (POPT_ARG_VAL|POPT_ARGFLAG_XOR) +#endif + static struct poptOption rpmmqOptionsTable[] = { { "debug", 'd', POPT_ARG_VAL|POPT_ARGFLAG_DOC_HIDDEN, &_debug, 1, NULL, NULL }, - { "priority", 'p', POPT_ARG_INT, &priority, 0, - N_("send message <priority>"), N_("<priority>") }, - { "notify", 'n', POPT_ARG_VAL|POPT_ARGFLAG_XOR, ¬ify, 1, + { "info", 'i', POPT_BIT_XOR|POPT_ARGFLAG_TOGGLE, &flags, RPMMSQ_FLAGS_INFO, + N_("display queue info on close"), NULL }, + { "loop", 'l', POPT_BIT_XOR|POPT_ARGFLAG_TOGGLE, &flags, RPMMSQ_FLAGS_LOOP, N_("toggle mq_notify running"), NULL }, + { "priority", 'p', POPT_ARG_INT, &priority, 0, + N_("send message <priority>"), N_("<priority>") }, + { "reset", 'r', POPT_BIT_XOR|POPT_ARGFLAG_TOGGLE, &flags, RPMMSQ_FLAGS_RESET, + N_("remove and recreate queue when opening"), NULL }, { NULL, '\0', POPT_ARG_INCLUDE_TABLE, rpmioAllPoptTable, 0, N_(" Common options for all rpmio executables:"), NULL }, @@ -460,8 +437,6 @@ #endif int ec = 0; -_rpmmsq_debug = -1; - switch (2) { case 1: ec = doSHM(); break; case 2: ec = doMSQ(); break; @@ . ______________________________________________________________________ RPM Package Manager http://rpm5.org CVS Sources Repository [email protected]
