RPM Package Manager, CVS Repository http://rpm5.org/cvs/ ____________________________________________________________________________
Server: rpm5.org Name: Jeff Johnson Root: /v/rpm/cvs Email: [email protected] Module: rpm Date: 22-May-2017 15:30:43 Branch: rpm-5_4 Handle: 2017052213304101 Modified files: (Branch: rpm-5_4) rpm CHANGES rpm/rpmio librpmio.vers msqio.c rpmmsq.h tmq.c Log: - msqio: make peace with mq_notify (and sigev detached threads). Summary: Revision Changes Path 1.3501.2.555+1 -0 rpm/CHANGES 2.199.2.79 +3 -2 rpm/rpmio/librpmio.vers 1.1.2.5 +83 -21 rpm/rpmio/msqio.c 1.1.2.5 +38 -6 rpm/rpmio/rpmmsq.h 1.1.2.5 +142 -75 rpm/rpmio/tmq.c ____________________________________________________________________________ patch -p0 <<'@@ .' Index: rpm/CHANGES ============================================================================ $ cvs diff -u -r1.3501.2.554 -r1.3501.2.555 CHANGES --- rpm/CHANGES 21 May 2017 05:30:35 -0000 1.3501.2.554 +++ rpm/CHANGES 22 May 2017 13:30:41 -0000 1.3501.2.555 @@ -1,4 +1,5 @@ 5.4.17 -> 5.4.18: + - jbj: msqio: make peace with mq_notify (and sigev detached threads). - jbj: msqio: add (and prefer) POSIX message queues (if available). - jbj: msqio: permit building --without-msq. - jbj: msqio: add per-executable and per-rpm message queue id's. @@ . patch -p0 <<'@@ .' Index: rpm/rpmio/librpmio.vers ============================================================================ $ cvs diff -u -r2.199.2.78 -r2.199.2.79 librpmio.vers --- rpm/rpmio/librpmio.vers 21 May 2017 05:30:35 -0000 2.199.2.78 +++ rpm/rpmio/librpmio.vers 22 May 2017 13:30:42 -0000 2.199.2.79 @@ -738,8 +738,9 @@ rpmmsqFree; rpmmsqNew; rpmmsqNotify; - rpmmsqRead; - rpmmsqWrite; + rpmmsqReader; + rpmmsqRecv; + rpmmsqSend; _nix; _rpmnix_debug; _rpmnixI; @@ . patch -p0 <<'@@ .' 
Index: rpm/rpmio/msqio.c ============================================================================ $ cvs diff -u -r1.1.2.4 -r1.1.2.5 msqio.c --- rpm/rpmio/msqio.c 21 May 2017 05:30:35 -0000 1.1.2.4 +++ rpm/rpmio/msqio.c 22 May 2017 13:30:42 -0000 1.1.2.5 @@ -28,6 +28,8 @@ #include "debug.h" int _rpmmsq_debug; +#define F_ISSET(_f, _FLAG) (((_f) & ((RPMMSQ_FLAGS_##_FLAG) & ~0x40000000)) != RPMMSQ_FLAGS_NONE) +#define MF_ISSET(_FLAG) F_ISSET(msq->flags, _FLAG) #define SPEW(_list) if (_rpmmsq_debug || _rpmio_debug) fprintf _list @@ -47,7 +49,10 @@ msq->oflags = 0; msq->omode = 0; msq->key = 0; + msq->tid = 0; msq->mtype = 0; + msq->nsent = 0; + msq->nrecv = 0; } } @@ -74,11 +79,12 @@ } #endif -rpmmsq rpmmsqNew(const char * path, const char * fmode, int fdno, int flags) +rpmmsq rpmmsqNew(const char * path, const char * fmode, int fdno, unsigned flags) { rpmmsq msq = rpmmsqGetPool(_rpmmsqPool); assert(fmode != NULL); /* XXX return NULL instead? */ + msq->flags = flags; msq->qid = -1; const char * s = fmode; @@ -151,6 +157,13 @@ msq->oflags = oflags; msq->omode = omode & 0777; /* XXX mask IPC_CREAT | IPC_EXCL */ msq->mtype = getpid(); /* XXX single queue, multiple RW processes. */ + + /* Reset the queue by removing. */ + if (MF_ISSET(RESET)) { + int xx = mq_unlink(msq->qname); +SPEW((stderr, "<-- %s(%s) rc %d\n", "mq_unlink", msq->qname, xx)); + } + msq->qid = mq_open(msq->qname, msq->oflags, msq->omode, attrs); SPEW((stderr, "<-- %s(%s,0x%x,0%o,%p) qid %d\n", "mq_open", msq->qname, msq->oflags, msq->omode, attrs, msq->qid)); @@ -189,18 +202,25 @@ return (msq->qid != -1 ? 
rpmmsqLink(msq) : rpmmsqFree(msq)); } -ssize_t rpmmsqRead(rpmmsq msq, char * buf, size_t count) +ssize_t rpmmsqRecv(rpmmsq msq, char * buf, size_t count, unsigned short *priop) { ssize_t rc = -1; /* assume failure */ #if defined(WITH_MQ) - unsigned int _msg_prio = 0; + + unsigned int prio = 0; int nb = 0; - rc = mq_receive(msq->qid, buf, count, &_msg_prio); - if (rc > 0) + rc = mq_receive(msq->qid, buf, count, &prio); + if (rc >= 0) { nb = rc; -SPEW((stderr, "<-- %s(0x%x,%p[%lu],%p) rc %ld prio %u\t\"%.*s\"\n", "mq_receive", msq->qid, buf, (unsigned long)count, &_msg_prio, (long)rc, _msg_prio, nb, buf)); + msq->nrecv++; + } +SPEW((stderr, "<-- %s(0x%x,%p[%lu],%p) rc %ld prio %u\t\"%.*s\"\n", "mq_receive", msq->qid, buf, (unsigned long)count, priop, (long)rc, prio, nb, buf)); + if (priop) + *priop = prio; + #elif defined(WITH_MSQ) + struct msgbuf * msgp = xmalloc(sizeof(*msgp) + count); size_t msgsz = count; long msgtyp = msq->mtype; @@ -208,32 +228,36 @@ int nb = 0; rc = msgrcv(msq->qid, msgp, msgsz, msgtyp, msgflg); - if (rc > 0) { - memcpy(buf, msgp->mtext, rc); + if (rc >= 0) { + if (rc > 0) + memcpy(buf, msgp->mtext, rc); nb = rc; + msq->nrecv++; } SPEW((stderr, "<-- %s(0x%x,%p,%lu,%ld,%d) rc %ld\t\"%.*s\"\n", "msqrcv", msq->qid, msgp, (unsigned long)msgsz, (long)msgtyp, msgflg, (long)rc, nb, buf)); msgp = _free(msgp); + #endif /* WITH_MQ */ return rc; } -ssize_t rpmmsqWrite(rpmmsq msq, const char * buf, size_t count) +ssize_t rpmmsqSend(rpmmsq msq, const char * buf, size_t count, unsigned short prio) { ssize_t rc = -1; /* assume failure */ #if defined(WITH_MQ) - unsigned int _msg_prio = 0; - int nb = 0; - rc = mq_send(msq->qid, buf, count, _msg_prio); - if (rc == 0) /* XXX success */ - nb = count; -SPEW((stderr, "<-- %s(0x%x,%p[%lu],%p) rc %ld prio %u\t\"%.*s\"\n", "mq_send", msq->qid, buf, (unsigned long)count, &_msg_prio, (long)rc, _msg_prio, nb, buf)); - if (rc == 0) /* XXX remap to write(2) return */ + rc = mq_send(msq->qid, buf, count, prio); + int nb = 
(rc == 0 ? count : 0); +SPEW((stderr, "<-- %s(0x%x,%p[%lu],%u) rc %ld\t\"%.*s\"\n", "mq_send", msq->qid, buf, (unsigned long)count, prio, (long)rc, nb, buf)); + if (rc == 0) { /* XXX remap to write(2) return */ rc = count; + msq->nsent++; + } + #elif defined(WITH_MSQ) + struct msgbuf * msgp = xmalloc(sizeof(*msgp) + count); size_t msgsz = count; int msgflg = 0; @@ -243,12 +267,14 @@ if (count) memcpy(msgp->mtext, buf, count); rc = msgsnd(msq->qid, msgp, msgsz, msgflg); - if (rc == 0) /* XXX success */ - nb = count; + int nb = (rc == 0 ? count : 0); SPEW((stderr, "<-- %s(0x%x,%p,%lu,%d) rc %ld\t\"%.*s\"\n", "msqsnd", msq->qid, msgp, (unsigned long)msgsz, msgflg, (long)rc, nb, buf)); msgp = _free(msgp); - if (rc == 0) /* XXX remap to write(2) return */ + if (rc == 0) { /* XXX remap to write(2) return */ rc = count; + msq->nsent++; + } + #endif /* WITH_MQ */ return rc; @@ -360,6 +386,42 @@ return rc; } +void rpmmsqReader(union sigval sv) +{ +#if defined(WITH_MQ) + rpmmsq msq = (rpmmsq) sv.sival_ptr; + +assert(msq); + + msq->tid = pthread_self(); + +SPEW((stderr, "==> %s(%p) qid %d\n", __FUNCTION__, msq, (msq ? msq->qid : -1))); + char b[BUFSIZ]; + size_t nb = sizeof(b); + struct timespec ts = { 0, 0 }; + int rc; + + while (1) { + ts.tv_sec = time(NULL) + 2; + unsigned prio = 0; + int rc = mq_timedreceive(msq->qid, b, nb, &prio, &ts); + int nr = (rc >= 0 ? rc : 0); +SPEW((stderr, "<-- %s(0x%x,%p[%lu]) rc %d prio %u\t\"%.*s\"\n", "mq_timedreceive", msq->qid, b, (unsigned long)nb, rc, prio, nr, b)); + if (rc < 0 && errno == ETIMEDOUT) + continue; + msq->nrecv++; + /* Exit immediately on error or highest priority message. 
*/ + if (rc < 0 || !strcmp(b, "XXX") || prio == RPMMSQ_PRIO_EXIT) + break; + } +SPEW((stderr, "<== %s(%p) rc %d\n", __FUNCTION__, msq, rc)); + + msq->tid = 0; + + msq = rpmmsqFree(msq); +#endif /* WITH_MQ */ +} + int rpmmsqDump(const char * msg, void *_msq, FILE *fp) { rpmmsq msq = (rpmmsq) _msq; @@ -517,7 +579,7 @@ assert(msq != NULL); if (fd->bytesRemain == 0) return 0; /* XXX simulate EOF */ fdstat_enter(fd, FDSTAT_READ); - rc = rpmmsqRead(msq, buf, count); + rc = rpmmsqRecv(msq, buf, count, NULL); if (rc >= 0) { fdstat_exit(fd, FDSTAT_READ, rc); if (fd->ndigests && rc > 0) fdUpdateDigests(fd, (const unsigned char *)buf, rc); @@ -537,7 +599,7 @@ if (fd->ndigests && count > 0) fdUpdateDigests(fd, (const unsigned char *)buf, count); fdstat_enter(fd, FDSTAT_WRITE); - rc = rpmmsqWrite(msq, buf, count); + rc = rpmmsqSend(msq, buf, count, 0); if (rc >= 0) fdstat_exit(fd, FDSTAT_WRITE, rc); return rc; @@ . patch -p0 <<'@@ .' Index: rpm/rpmio/rpmmsq.h ============================================================================ $ cvs diff -u -r1.1.2.4 -r1.1.2.5 rpmmsq.h --- rpm/rpmio/rpmmsq.h 21 May 2017 05:30:35 -0000 1.1.2.4 +++ rpm/rpmio/rpmmsq.h 22 May 2017 13:30:42 -0000 1.1.2.5 @@ -9,13 +9,28 @@ /** */ +#define _KFB(n) (1U << (n)) +#define _MFB(n) (_KFB(n) | 0x40000000) +typedef enum rpmmsqFlags_e { + RPMMSQ_FLAGS_NONE = 0, + + RPMMSQ_FLAGS_RESET = _MFB( 0), + RPMMSQ_FLAGS_NOTIFY = _MFB( 1), + RPMMSQ_FLAGS_INFO = _MFB( 2), + +} rpmmsqFlags; +#undef _MFB +#undef _KFB + +/** + */ #if defined(_RPMMSQ_INTERNAL) struct rpmmsq_s { struct rpmioItem_s _item; /*!< usage mutex and pool identifier. */ - int flags; + rpmmsqFlags flags; - const char * qname; /*!> message queue path */ - int qid; /*!< message queue id */ + const char * qname; /*!> message queue path. */ + int qid; /*!< message queue id. */ int oflags; int omode; @@ -23,9 +38,17 @@ key_t key; /*!< msgget arg */ long mtype; + volatile pthread_t tid; /*!< sigev thread id. 
*/ + + int nsent; + int nrecv; }; #endif /* _RPMMSQ_INTERNAL */ +#ifdef __cplusplus +extern "C" { +#endif + /** * Unreference a msq wrapper instance. * @param msq msq wrapper @@ -55,15 +78,15 @@ /** */ -rpmmsq rpmmsqNew(const char * path, const char * fmode, int fdno, int flags); +rpmmsq rpmmsqNew(const char * path, const char * fmode, int fdno, unsigned flags); /** */ -ssize_t rpmmsqRead(rpmmsq msq, char * buf, size_t count); +ssize_t rpmmsqRecv(rpmmsq msq, char * buf, size_t count, unsigned short *priop); /** */ -ssize_t rpmmsqWrite(rpmmsq msq, const char * buf, size_t count); +ssize_t rpmmsqSend(rpmmsq msq, const char * buf, size_t count, unsigned short prio); /** */ @@ -79,6 +102,15 @@ /** */ +void rpmmsqReader(union sigval sv); +#define RPMMSQ_PRIO_EXIT 0x7fff + +/** + */ int rpmmsqDump(const char *msg, void * _msq, FILE *fp); +#ifdef __cplusplus +} +#endif + #endif /* _H_RPMMSQ_ */ @@ . patch -p0 <<'@@ .' Index: rpm/rpmio/tmq.c ============================================================================ $ cvs diff -u -r1.1.2.4 -r1.1.2.5 tmq.c --- rpm/rpmio/tmq.c 21 May 2017 05:30:35 -0000 1.1.2.4 +++ rpm/rpmio/tmq.c 22 May 2017 13:30:42 -0000 1.1.2.5 @@ -38,13 +38,23 @@ #include <rpmio_internal.h> #include <rpmlog.h> #include <rpmmacro.h> -#include <rpmmsq.h> #include <poptIO.h> #include <argv.h> + +#define _RPMMSQ_INTERNAL +#include <rpmmsq.h> + #include "debug.h" static int _debug = 0; +static rpmmsqFlags flags = + RPMMSQ_FLAGS_RESET | RPMMSQ_FLAGS_NOTIFY | RPMMSQ_FLAGS_INFO; +static int priority; +static int notify; + +#define SPEW(_list) if (_rpmmsq_debug || _rpmio_debug) fprintf _list + /*==============================================================*/ static int ShmOpen(const char *path, size_t size, int shmflg) { @@ -264,6 +274,81 @@ } /*==============================================================*/ + +#define SECS 1 +#define TICKS 5 +typedef struct AT_s * AT_t; +struct AT_s { + int secs; + int tick; + int nticks; + clockid_t clockid; + timer_t 
timerid; + int flags; + struct sigevent sigev; + struct itimerspec nit; + struct itimerspec oit; + pthread_cond_t cond; + pthread_mutex_t mutex; +}; + +#define Z(_rc) assert((_rc) == 0) +#define LOCK(_mutex) Z(pthread_mutex_lock(&_mutex)) +#define UNLOCK(_mutex) Z(pthread_mutex_unlock(&_mutex)) +#define SIGNAL(_cond) Z(pthread_cond_signal(&_cond)) +#define WAIT(_cond, _mutex) Z(pthread_cond_wait(&_cond, &_mutex)) + +static void +timer_thread (union sigval sv) +{ + AT_t at = (AT_t) sv.sival_ptr; + LOCK(at->mutex); + if (++at->tick >= at->nticks) + SIGNAL(at->cond); + UNLOCK(at->mutex); + + fprintf(stderr,"\ttick %d\n", at->tick); +} + +static int +doTIMER(void) +{ + struct AT_s _at = { + .secs = SECS, + .tick = 0, + .nticks = TICKS, + .clockid = CLOCK_REALTIME, + .flags = 0, + .sigev = { + .sigev_notify = SIGEV_THREAD, + .sigev_notify_function = timer_thread, + .sigev_notify_attributes = NULL, + .sigev_value.sival_ptr = &_at, + }, + .nit = { + .it_value.tv_sec = _at.secs, + .it_value.tv_nsec = 0, + .it_interval.tv_sec = _at.secs, + .it_interval.tv_nsec = 0, + }, + .cond = PTHREAD_COND_INITIALIZER, + .mutex = PTHREAD_MUTEX_INITIALIZER, + }; + AT_t at = &_at; + + Z(timer_create(at->clockid, &at->sigev, &at->timerid)); + + Z(timer_settime(at->timerid, at->flags, &at->nit, &at->oit)); + + LOCK(at->mutex); + while (at->tick < at->nticks) + WAIT(at->cond, at->mutex); + UNLOCK(at->mutex); + + return 0; +} + +/*==============================================================*/ static int doMSQ(void) { int rc = -2; /* assume failure */ @@ -272,35 +357,66 @@ size_t blen; int xx; - const char *msqname = "rpm";; - rpmmsq msq = rpmmsqNew(msqname, "w+", -1, 0); + const char *qname = "/rpm";; + rpmmsq msq = rpmmsqNew(qname, "w+", -1, flags); if (msq) { - { struct sigevent sigev = { - .sigev_notify = SIGEV_NONE, + memset(b, 0, nb); + + if (notify) { + + pthread_attr_t attr; + xx = pthread_getattr_default_np(&attr); + xx = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); + 
+ struct sigevent sigev = { + .sigev_notify = SIGEV_THREAD, + .sigev_notify_function = rpmmsqReader, + .sigev_notify_attributes = &attr, + .sigev_value.sival_ptr = rpmmsqLink(msq), }; xx = rpmmsqNotify(msq, &sigev); } - memset(b, 0, nb); strcpy(b, "yadda yadda"); blen = strlen(b); - xx = rpmmsqWrite(msq, b, blen); - memset(b, 0, nb); + xx = rpmmsqSend(msq, b, blen, priority); + strcpy(b, "bing bang boom"); blen = strlen(b); - xx = rpmmsqWrite(msq, b, blen); - memset(b, 0, nb); - xx = rpmmsqRead(msq, b, nb); - memset(b, 0, nb); - xx = rpmmsqRead(msq, b, nb); + xx = rpmmsqSend(msq, b, blen, priority); + + if (!notify) { + memset(b, 0, nb); + xx = rpmmsqRecv(msq, b, nb, NULL); + memset(b, 0, nb); + xx = rpmmsqRecv(msq, b, nb, NULL); + } + } + + int nwaits = 0; + if (notify) { + struct timespec req = { 0, 1000*1000 }; + while (msq->tid == 0) { + nanosleep(&req, NULL); + nwaits++; + } + strcpy(b, __FUNCTION__); + blen = strlen(b); + xx = rpmmsqSend(msq, b, blen, RPMMSQ_PRIO_EXIT); + while (msq->tid != 0) { + nanosleep(&req, NULL); + nwaits++; + } } +fprintf(stderr, "%s: sent %d recv %d nwaits %d\n", __FUNCTION__, msq->nsent, msq->nrecv, nwaits); + xx = rpmmsqClose(msq, 1); msq = rpmmsqFree(msq); - FD_t fd = Fopen(msqname, "w+.msqio"); + FD_t fd = Fopen(qname, "w+.msqio"); if (fd) { memset(b, 0, blen); strcpy(b, "foo bar baz"); @@ -318,65 +434,13 @@ } /*==============================================================*/ -static int doMQ(void) -{ - int rc = -2; /* assume failure */ - mqd_t mq; - char b[BUFSIZ]; - size_t nb = sizeof(b); - - /* create the message queue */ - const char *mqname = "/rpmmq"; - struct mq_attr attr = { - .mq_flags = 0, - .mq_maxmsg = 10, - .mq_msgsize = nb, - .mq_curmsgs = 0, - }; - int oflag = O_CREAT|O_RDONLY; - mode_t mode = 0660; - mq = mq_open(mqname, oflag, mode, &attr); - if (mq == (mqd_t)-1) { - perror("mq_open"); - goto exit; - } - - while (1) { - static unsigned mq_prio = 0; - ssize_t nr; - - /* receive the message */ - nr = mq_receive(mq, 
b, nb, &mq_prio); - if (nr <= 0) { - perror("mq_receive"); - goto exit; - } - b[nr] = '\0'; - - fprintf(stderr, "Received: %s\n", b); - if (!strncmp(b, "exit", sizeof("exit")-1)) - break; - } - rc = 0; - -exit: - /* cleanup */ - if (mq != (mqd_t)-1) { - rc = mq_close(mq); - if (rc < 0) - perror("mq_close"); - rc = mq_unlink(mqname); - if (rc < 0) - perror("mq_unlink"); - } - - return rc; -} - -/*==============================================================*/ static struct poptOption rpmmqOptionsTable[] = { - { "debug", 'd', POPT_ARG_VAL|POPT_ARGFLAG_DOC_HIDDEN, &_debug, 1, + { "debug", 'd', POPT_ARG_VAL|POPT_ARGFLAG_DOC_HIDDEN, &_debug, 1, NULL, NULL }, + { "priority", 'p', POPT_ARG_INT, &priority, 0, + N_("send message <priority>"), N_("<priority>") }, + { "notify", 'n', POPT_ARG_VAL|POPT_ARGFLAG_XOR, &notify, 1, + N_("toggle mq_notify running"), NULL }, { NULL, '\0', POPT_ARG_INCLUDE_TABLE, rpmioAllPoptTable, 0, N_(" Common options for all rpmio executables:"), NULL }, @@ -396,10 +460,13 @@ #endif int ec = 0; - if (0) - ec = doSHM(); - else - ec = doMSQ(); +_rpmmsq_debug = -1; + + switch (2) { + case 1: ec = doSHM(); break; + case 2: ec = doMSQ(); break; + case 3: ec = doTIMER(); break; + } con = rpmioFini(con); @@ . ______________________________________________________________________ RPM Package Manager http://rpm5.org CVS Sources Repository [email protected]
