RPM Package Manager, CVS Repository http://rpm5.org/cvs/ ____________________________________________________________________________
Server: rpm5.org Name: Jeff Johnson Root: /v/rpm/cvs Email: [email protected] Module: rpm Date: 21-May-2017 07:30:35 Branch: rpm-5_4 Handle: 2017052105303500 Modified files: (Branch: rpm-5_4) rpm CHANGES rpm/rpmio librpmio.vers msqio.c rpmio.c rpmmsq.h tmq.c Log: - msqio: add (and prefer) POSIX message queues (if available). Summary: Revision Changes Path 1.3501.2.554 +4 -3 rpm/CHANGES 2.199.2.78 +1 -0 rpm/rpmio/librpmio.vers 1.1.2.4 +192 -48 rpm/rpmio/msqio.c 1.230.2.48 +11 -7 rpm/rpmio/rpmio.c 1.1.2.4 +12 -3 rpm/rpmio/rpmmsq.h 1.1.2.4 +6 -0 rpm/rpmio/tmq.c ____________________________________________________________________________ patch -p0 <<'@@ .' Index: rpm/CHANGES ============================================================================ $ cvs diff -u -r1.3501.2.553 -r1.3501.2.554 CHANGES --- rpm/CHANGES 20 May 2017 19:21:07 -0000 1.3501.2.553 +++ rpm/CHANGES 21 May 2017 05:30:35 -0000 1.3501.2.554 @@ -1,7 +1,8 @@ 5.4.17 -> 5.4.18: - - jbj: rpmmsq: permit building --without-msq. - - jbj: rpmmsq: add per-executable and per-rpm message queue id's. - - jbj: rpmmsq: use pool allocation. permit queue deletion. + - jbj: msqio: add (and prefer) POSIX message queues (if available). + - jbj: msqio: permit building --without-msq. + - jbj: msqio: add per-executable and per-rpm message queue id's. + - jbj: msqio: use pool allocation. permit queue deletion. - jbj: rpmio: add ".msqio" to send/receive SysV messages. - jbj: rpmsw: use clock_gettime (if available). - jbj: poptALL: spew info for rpm itself with -vv. @@ . patch -p0 <<'@@ .' Index: rpm/rpmio/librpmio.vers ============================================================================ $ cvs diff -u -r2.199.2.77 -r2.199.2.78 librpmio.vers --- rpm/rpmio/librpmio.vers 20 May 2017 19:21:07 -0000 2.199.2.77 +++ rpm/rpmio/librpmio.vers 21 May 2017 05:30:35 -0000 2.199.2.78 @@ -737,6 +737,7 @@ rpmmsqDump; rpmmsqFree; rpmmsqNew; + rpmmsqNotify; rpmmsqRead; rpmmsqWrite; _nix; @@ . patch -p0 <<'@@ .' 
Index: rpm/rpmio/msqio.c ============================================================================ $ cvs diff -u -r1.1.2.3 -r1.1.2.4 msqio.c --- rpm/rpmio/msqio.c 20 May 2017 19:21:07 -0000 1.1.2.3 +++ rpm/rpmio/msqio.c 21 May 2017 05:30:35 -0000 1.1.2.4 @@ -5,14 +5,17 @@ #include "system.h" -#if defined(WITH_MSQ) +#if defined(WITH_MQ) || defined(WITH_MSQ) #if defined(HAVE_SYS_MSG_H) # include <sys/msg.h> #endif #if defined(HAVE_SYS_IPC_H) # include <sys/ipc.h> #endif -#endif /* WITH_MSQ */ +#if defined(HAVE_MQUEUE_H) +# include <mqueue.h> +#endif +#endif /* WITH_MQ */ #include "rpmio_internal.h" #include <rpmlog.h> @@ -38,9 +41,12 @@ { rpmmsq msq = (rpmmsq) _msq; if (msq) { - msq->key = 0; - msq->perms = 0; + msq->flags = 0; + msq->qname = _free(msq->qname); msq->qid = 0; + msq->oflags = 0; + msq->omode = 0; + msq->key = 0; msq->mtype = 0; } } @@ -75,35 +81,95 @@ assert(fmode != NULL); /* XXX return NULL instead? */ msq->qid = -1; -#if defined(WITH_MSQ) const char * s = fmode; int c; - int perms = 0664; + int oflags = 0; + int omode = 0664; switch ((c = *s++)) { - case 'a': perms |= IPC_CREAT; break; - case 'w': perms |= IPC_CREAT; break; + case 'a': + oflags = O_WRONLY | O_CREAT | O_APPEND; + omode |= IPC_CREAT; + break; + case 'w': + oflags = O_WRONLY | O_CREAT | O_TRUNC; + omode |= IPC_CREAT; + break; + case 'r': + oflags = O_RDONLY; + break; } while ((c = *s++) != 0) { switch (c) { - case '.': break; - case '+': continue; - case 'x': perms |= IPC_EXCL; continue; - default: continue; + case '.': + break; + case '+': + oflags &= ~(O_RDONLY|O_WRONLY); + oflags |= O_RDWR; + continue; + break; + case 'c': /* XXX no cancel */ + continue; + break; + case 'm': /* XXX mmap */ + continue; + break; + case 'e': /* O_CLOEXEC */ + oflags |= O_CLOEXEC; + continue; + break; + case 'n': /* XXX O_NONBLOCK */ + oflags |= O_NONBLOCK; + continue; + break; + case 't': /* XXX O_TRUNC */ + continue; + break; + case 'x': + oflags |= O_EXCL; + omode |= IPC_EXCL; + continue; + break; 
+ default: + continue; + break; } break; } - if (!strcmp(path, "private") || !strcmp(path, "IPC_PRIVATE")) +#if defined(WITH_MQ) + + msq->qname = rpmGetPath("/", path, NULL); + struct mq_attr _attrs = { + .mq_flags = (oflags & O_NONBLOCK), + .mq_maxmsg = 10, /* /proc/sys/fs/mqueue/msg_default */ + .mq_msgsize = 8192, /* /proc/sys/fs/mqueue/msgsize_default */ + .mq_curmsgs = 0, + }, *attrs = &_attrs; + attrs = NULL; /* XXX */ + msq->oflags = oflags; + msq->omode = omode & 0777; /* XXX mask IPC_CREAT | IPC_EXCL */ + msq->mtype = getpid(); /* XXX single queue, multiple RW processes. */ + msq->qid = mq_open(msq->qname, msq->oflags, msq->omode, attrs); +SPEW((stderr, "<-- %s(%s,0x%x,0%o,%p) qid %d\n", "mq_open", msq->qname, msq->oflags, msq->omode, attrs, msq->qid)); + +#elif defined(WITH_MSQ) + + msq->qname = rpmGetPath("/", path, NULL); + if (!strcmp(msq->qname, "/private") + || !strcmp(msq->qname, "/IPC_PRIVATE")) msq->key = IPC_PRIVATE; - else if (!strcmp(path, "program") || !strcmp(path, "__progname_key")) + else + if (!strcmp(msq->qname, "/program") + || !strcmp(msq->qname, "/__progname_key")) msq->key = __progname_key; else { + const char * lpath = msq->qname; int projid = __progname_projid; - if (!strcmp(path, "rpm")) - path = "/usr/lib/rpm"; - msq->key = ftok(path, projid); + if (!strcmp(lpath, "/rpm")) + lpath = "/usr/lib/rpm"; + msq->key = ftok(lpath, projid); if (msq->key == -1) { int lvl = RPMLOG_WARNING; rpmlog(lvl, "%s: ftok(%s,0x%02x) failed: %m\n", @@ -112,12 +178,13 @@ rpmlog(lvl, "Using program key 0x%x\n", msq->key); } } - msq->perms = perms; + msq->oflags = oflags; + msq->omode = omode & 0777; /* XXX mask IPC_CREAT | IPC_EXCL */ msq->mtype = getpid(); /* XXX single queue, multiple RW processes. 
*/ + msq->qid = msgget(msq->key, omode); +SPEW((stderr, "<-- %s(0x%x,0%o) qid %d\n", "msqget", msq->key, omode, msq->qid)); - msq->qid = msgget(msq->key, msq->perms); -SPEW((stderr, "<-- %s(0x%x,0%o) qid %d\n", "msqget", msq->key, msq->perms, msq->qid)); -#endif /* WITH_MSQ */ +#endif /* WITH_MQ */ return (msq->qid != -1 ? rpmmsqLink(msq) : rpmmsqFree(msq)); } @@ -126,7 +193,14 @@ { ssize_t rc = -1; /* assume failure */ -#if defined(WITH_MSQ) +#if defined(WITH_MQ) + unsigned int _msg_prio = 0; + int nb = 0; + rc = mq_receive(msq->qid, buf, count, &_msg_prio); + if (rc > 0) + nb = rc; +SPEW((stderr, "<-- %s(0x%x,%p[%lu],%p) rc %ld prio %u\t\"%.*s\"\n", "mq_receive", msq->qid, buf, (unsigned long)count, &_msg_prio, (long)rc, _msg_prio, nb, buf)); +#elif defined(WITH_MSQ) struct msgbuf * msgp = xmalloc(sizeof(*msgp) + count); size_t msgsz = count; long msgtyp = msq->mtype; @@ -140,7 +214,7 @@ } SPEW((stderr, "<-- %s(0x%x,%p,%lu,%ld,%d) rc %ld\t\"%.*s\"\n", "msqrcv", msq->qid, msgp, (unsigned long)msgsz, (long)msgtyp, msgflg, (long)rc, nb, buf)); msgp = _free(msgp); -#endif /* WITH_MSQ */ +#endif /* WITH_MQ */ return rc; } @@ -149,20 +223,33 @@ { ssize_t rc = -1; /* assume failure */ -#if defined(WITH_MSQ) +#if defined(WITH_MQ) + unsigned int _msg_prio = 0; + int nb = 0; + + rc = mq_send(msq->qid, buf, count, _msg_prio); + if (rc == 0) /* XXX success */ + nb = count; +SPEW((stderr, "<-- %s(0x%x,%p[%lu],%p) rc %ld prio %u\t\"%.*s\"\n", "mq_send", msq->qid, buf, (unsigned long)count, &_msg_prio, (long)rc, _msg_prio, nb, buf)); + if (rc == 0) /* XXX remap to write(2) return */ + rc = count; +#elif defined(WITH_MSQ) struct msgbuf * msgp = xmalloc(sizeof(*msgp) + count); size_t msgsz = count; int msgflg = 0; + int nb = 0; msgp->mtype = msq->mtype; if (count) memcpy(msgp->mtext, buf, count); rc = msgsnd(msq->qid, msgp, msgsz, msgflg); -SPEW((stderr, "<-- %s(0x%x,%p,%lu,%d) rc %ld\t\"%.*s\"\n", "msqsnd", msq->qid, msgp, (unsigned long)msgsz, msgflg, (long)rc, (int)count, 
buf)); - if (rc == 0) - rc = count; + if (rc == 0) /* XXX success */ + nb = count; +SPEW((stderr, "<-- %s(0x%x,%p,%lu,%d) rc %ld\t\"%.*s\"\n", "msqsnd", msq->qid, msgp, (unsigned long)msgsz, msgflg, (long)rc, nb, buf)); msgp = _free(msgp); -#endif /* WITH_MSQ */ + if (rc == 0) /* XXX remap to write(2) return */ + rc = count; +#endif /* WITH_MQ */ return rc; } @@ -177,13 +264,20 @@ int rpmmsqClose(rpmmsq msq, int delete) { int rc = -2; /* assume failure */ -#if defined(WITH_MSQ) +#if defined(WITH_MQ) + if (_rpmmsq_debug) + rpmmsqDump(__FUNCTION__, msq, NULL); + rc = mq_close(msq->qid); + if (!rc && delete && msq->qname && !strcmp(msq->qname, "/private")) + rc = mq_unlink(msq->qname); + rc = 0; /* XXX */ +#elif defined(WITH_MSQ) if (_rpmmsq_debug) rpmmsqDump(__FUNCTION__, msq, NULL); if (delete || msq->key == IPC_PRIVATE) (void) rpmmsqCtl(msq, IPC_RMID, NULL); rc = 0; -#endif /* WITH_MSQ */ +#endif /* WITH_MQ */ return rc; } @@ -207,10 +301,11 @@ int rpmmsqCtl(void * _msq, int cmd, void *buf) { - rpmmsq msq = (rpmmsq) _msq; int rc = -2; /* assume failure */ -#if defined(WITH_MSQ) +#if defined(WITH_MQ) +#elif defined(WITH_MSQ) + rpmmsq msq = (rpmmsq) _msq; if (msq) switch (cmd) { default: @@ -244,16 +339,31 @@ break; #endif } -#endif /* WITH_MSQ */ - SPEW((stderr, "<-- %s(0x%x,%d,%p) rc %d\n", "msqctl", (msq ? (unsigned)msq->qid : 0xdeadbeef), cmd, buf, rc)); +#endif /* WITH_MQ */ + + return rc; +} + +int rpmmsqNotify(rpmmsq msq, const void *_sevp) +{ + int rc = -2; /* assume failure */ + +#if defined(WITH_MQ) + if (msq) { /* XXX WTF? 
*/ + rc = mq_notify(msq->qid, _sevp); +SPEW((stderr, "<-- %s(0x%x,%p) rc %d\n", "mq_notify", msq->qid, _sevp, rc)); + } +#elif defined(WITH_MSQ) +#endif /* WITH_MQ */ return rc; } int rpmmsqDump(const char * msg, void *_msq, FILE *fp) { - int rc = -1; /* assume failure */ + rpmmsq msq = (rpmmsq) _msq; + int rc = -2; /* assume failure */ if (fp == NULL) fp = stderr; @@ -261,8 +371,39 @@ if (msg) fprintf(fp, "================ %s\n", msg); -#if defined(WITH_MSQ) - rpmmsq msq = (rpmmsq) _msq; +#if defined(WITH_MQ) +#if defined(__linux__) + const char * lpath = rpmGetPath("/dev/mqueue/", msq->qname, NULL); + struct stat st; + rc = stat(lpath, &st); + if (!rc) { + fprintf(fp, "\t dev: 0x%lx\n", (unsigned long)st.st_dev); + fprintf(fp, "\t ino: %lu\n", (unsigned long)st.st_ino); + fprintf(fp, "\t mode: 0%o\n", st.st_mode); + fprintf(fp, "\t nlink: %lu\n", (unsigned long)st.st_nlink); + fprintf(fp, "\t uid: %d\n", st.st_uid); + fprintf(fp, "\t gid: %d\n", st.st_gid); + fprintf(fp, "\t rdev: 0x%lx\n", (unsigned long)st.st_rdev); + fprintf(fp, "\t size: %lu\n", (unsigned long)st.st_size); + fprintf(fp, "\tblksize: %lu\n", (unsigned long)st.st_blksize); + fprintf(fp, "\t blocks: %lu\n", (unsigned long)st.st_blocks); + fprintf(fp, "\t atime: %lu\n", (unsigned long)st.st_atime); + fprintf(fp, "\t mtime: %lu\n", (unsigned long)st.st_mtime); + fprintf(fp, "\t ctime: %lu\n", (unsigned long)st.st_mtime); + } + { FD_t fd = Fopen(lpath, "r.fdio"); + if (fd) { + unsigned char b[BUFSIZ]; + size_t nb = sizeof(b); + size_t nr = Fread(b, 1, nb-1, fd); + b[nr] = '\0'; + fprintf(stderr, "\t%.*s", (int)nr, b); + (void) Fclose(fd); + } + } + lpath = _free(lpath); +#endif /* __linux__ */ +#elif defined(WITH_MSQ) struct msqid_ds ds; rc = rpmmsqCtl(msq, IPC_STAT, &ds); if (!rc) { @@ -295,17 +436,17 @@ struct msginfo mi; rc = rpmmsqCtl(msq, IPC_INFO, (struct msqid_ds *)&mi); if (!rc) { - fprintf(fp, "\tmsgpool: %d Kb\n", mi.msgpool); - fprintf(fp, "\t msgmap: %d\n", mi.msgmap); - fprintf(fp, "\t 
msgmax: %d\n", mi.msgmax); - fprintf(fp, "\t msgmnb: %d b\n", mi.msgmnb); - fprintf(fp, "\t msgmni: %d\n", mi.msgmni); - fprintf(fp, "\t msgssz: %d\n", mi.msgssz); - fprintf(fp, "\t msgtql: %d\n", mi.msgtql); - fprintf(fp, "\t msgseg: %d\n", mi.msgseg); + fprintf(fp, "\tmsgpool: %7d Kb\n", mi.msgpool); + fprintf(fp, "\t msgmap: %7d\n", mi.msgmap); + fprintf(fp, "\t msgmax: %7d\n", mi.msgmax); + fprintf(fp, "\t msgmnb: %7d b\n", mi.msgmnb); + fprintf(fp, "\t msgmni: %7d\n", mi.msgmni); + fprintf(fp, "\t msgssz: %7d\n", mi.msgssz); + fprintf(fp, "\t msgtql: %7d\n", mi.msgtql); + fprintf(fp, "\t msgseg: %7d\n", mi.msgseg); } #endif -#endif /* WITH_MSQ */ +#endif /* WITH_MQ */ return rc; } @@ -337,9 +478,12 @@ if (msq == NULL) return NULL; fd = fdNew("open (msqOpen)"); +#ifdef NOTYET + fdPop(fd); fdPush(fd, msqio, msq, fileno(msq->fp)); +#else fdPop(fd); fdPush(fd, msqio, msq, -1); - int _omode = O_RDWR; /* XXX */ - fdSetOpen(fd, path, -1, _omode); +#endif + fdSetOpen(fd, path, -1, msq->omode); return fdLink(fd, "msqOpen"); } @@ . patch -p0 <<'@@ .' Index: rpm/rpmio/rpmio.c ============================================================================ $ cvs diff -u -r1.230.2.47 -r1.230.2.48 rpmio.c --- rpm/rpmio/rpmio.c 20 May 2017 16:30:10 -0000 1.230.2.47 +++ rpm/rpmio/rpmio.c 21 May 2017 05:30:35 -0000 1.230.2.48 @@ -221,7 +221,7 @@ } else if (fps->io == xzdio) { sprintf(be, "XZD %p fdno %d", fps->fp, fps->fdno); #endif -#if defined(WITH_MSQ) +#if defined(WITH_MQ) || defined(WITH_MSQ) } else if (fps->io == msqio) { sprintf(be, "MSQ %p fdno %d", fps->fp, fps->fdno); #endif @@ -2629,7 +2629,7 @@ * @todo glibc also supports ",ccs=" * * - glibc: 'c' no cancel - * - glibc: 'e' close on exec (FD_CLOEXEC) + * - glibc: 'e' close on exec (O_CLOEXEC) * - glibc: 'm' use mmap'd input * - glibc: 'x' don't clobber (O_EXCL) * - gzopen: [0-9] is compression level @@ -2640,6 +2640,8 @@ * - bzopen: 'q' sets verbosity to 0 * - bzopen: 'v' does verbosity++ (up to 4) * - HACK: '.' 
terminates, rest is type of I/O + * - HACK: 'n' non-blocking (O_NONBLOCK) + * - HACK: 't' truncate (O_TRUNC) * - HACK: 'D' sync (O_DIRECT) * - HACK: 'S' sync (O_SYNC) * - HACK: 'I' fallocate(2) @@ -2648,7 +2650,6 @@ * - HACK: 'F' fsync(2) * - HACK: 'P' syncfs(2) * - HACK: 'M' memfd_create(2) - * - HACK: 't' truncate (O_TRUNC) * - HACK: 'T' tempfile (O_TMPFILE) * - HACK: '?' debug I/O + refcnt */ @@ -2711,6 +2712,9 @@ if (--nstdio > 0) *stdio++ = c; continue; break; + case 'n': + flags |= O_NONBLOCK; + goto other; case 'D': flags |= O_DIRECT; RPMFD_SET(flags, FTRUNCATE); @@ -2811,7 +2815,7 @@ iof = xzdio; fd = iof->_fdopen(fd, zstdio); #endif -#if defined(WITH_MSQ) +#if defined(WITH_MQ) || defined(WITH_MSQ) } else if (!strcmp(end, "msqio")) { iof = msqio; fd = iof->_fdopen(fd, zstdio); @@ -2914,7 +2918,7 @@ if (stdio[0] == '\0') goto exit; -#if defined(WITH_MSQ) +#if defined(WITH_MQ) || defined(WITH_MSQ) if (end && !strcmp(end, "msqio")) { fd = msqio->_fopen(path, fmode); goto exit; @@ -2998,7 +3002,7 @@ if (vh && fdGetIo(fd) == xzdio && xzdio->_flush != NULL) return (*xzdio->_flush) ((void *)fd); #endif -#if defined(WITH_MSQ) +#if defined(WITH_MQ) || defined(WITH_MSQ) if (vh && fdGetIo(fd) == msqio && msqio->_flush != NULL) return (*msqio->_flush) ((void *)fd); #endif @@ -3039,7 +3043,7 @@ ec = (fd->syserrno || fd->errcookie != NULL) ? -1 : 0; i--; /* XXX fdio under xzdio always has fdno == -1 */ #endif -#if defined(WITH_MSQ) +#if defined(WITH_MQ) || defined(WITH_MSQ) } else if (fps->io == msqio) { ec = (fd->syserrno || fd->errcookie != NULL) ? -1 : 0; i--; /* XXX fdio under bzdio always has fdno == -1 */ @@ . patch -p0 <<'@@ .' 
Index: rpm/rpmio/rpmmsq.h ============================================================================ $ cvs diff -u -r1.1.2.3 -r1.1.2.4 rpmmsq.h --- rpm/rpmio/rpmmsq.h 20 May 2017 19:21:07 -0000 1.1.2.3 +++ rpm/rpmio/rpmmsq.h 21 May 2017 05:30:35 -0000 1.1.2.4 @@ -14,10 +14,15 @@ struct rpmioItem_s _item; /*!< usage mutex and pool identifier. */ int flags; - key_t key; - int perms; - int qid; + const char * qname; /*!< message queue path */ + int qid; /*!< message queue id */ + + int oflags; + int omode; + + key_t key; /*!< msgget arg */ long mtype; + }; #endif /* _RPMMSQ_INTERNAL */ @@ -70,6 +75,10 @@ /** */ +int rpmmsqNotify(rpmmsq msq, const void *_sevp); + +/** + */ int rpmmsqDump(const char *msg, void * _msq, FILE *fp); #endif /* _H_RPMMSQ_ */ @@ . patch -p0 <<'@@ .' Index: rpm/rpmio/tmq.c ============================================================================ $ cvs diff -u -r1.1.2.3 -r1.1.2.4 tmq.c --- rpm/rpmio/tmq.c 20 May 2017 19:21:07 -0000 1.1.2.3 +++ rpm/rpmio/tmq.c 21 May 2017 05:30:35 -0000 1.1.2.4 @@ -277,6 +277,12 @@ if (msq) { + { struct sigevent sigev = { + .sigev_notify = SIGEV_NONE, + }; + xx = rpmmsqNotify(msq, &sigev); + } + memset(b, 0, nb); strcpy(b, "yadda yadda"); blen = strlen(b); @@ . ______________________________________________________________________ RPM Package Manager http://rpm5.org CVS Sources Repository [email protected]
