RPM Package Manager, CVS Repository
  http://rpm5.org/cvs/
  ____________________________________________________________________________

  Server: rpm5.org                         Name:   Jeff Johnson
  Root:   /v/rpm/cvs                       Email:  j...@rpm5.org
  Module: rpm                              Date:   06-Mar-2009 18:40:57
  Branch: HEAD                             Handle: 2009030617405601

  Modified files:
    rpm                     CHANGES
    rpm/rpmio               librpmio.vers poptIO.c rpmzq.c rpmzq.h

  Log:
    - jbj: rpmz: add attribution and debugging to rpmzq.

  Summary:
    Revision    Changes     Path
    1.2801      +1  -0      rpm/CHANGES
    2.85        +19 -0      rpm/rpmio/librpmio.vers
    1.32        +4  -0      rpm/rpmio/poptIO.c
    1.2         +98 -103    rpm/rpmio/rpmzq.c
    1.2         +23 -10     rpm/rpmio/rpmzq.h
  ____________________________________________________________________________

  patch -p0 <<'@@ .'
  Index: rpm/CHANGES
  ============================================================================
  $ cvs diff -u -r1.2800 -r1.2801 CHANGES
  --- rpm/CHANGES       6 Mar 2009 16:58:00 -0000       1.2800
  +++ rpm/CHANGES       6 Mar 2009 17:40:56 -0000       1.2801
  @@ -1,5 +1,6 @@
   
   5.2a2 -> 5.2a3:
  +    - jbj: rpmz: add attribution and debugging to rpmzq.
       - jbj: rpmz: carve out job queue and buffer pool management from pigz.
       - jbj: rpmz: carve out an opaque trace logging subsystem from pigz.
       - glen: markLinkedFailed if rpm file reopen failed during transaction to 
avoid losing old pkg info from db
  @@ .
  patch -p0 <<'@@ .'
  Index: rpm/rpmio/librpmio.vers
  ============================================================================
  $ cvs diff -u -r2.84 -r2.85 librpmio.vers
  --- rpm/rpmio/librpmio.vers   4 Mar 2009 23:57:24 -0000       2.84
  +++ rpm/rpmio/librpmio.vers   6 Mar 2009 17:40:57 -0000       2.85
  @@ -424,6 +424,25 @@
       rpmzLogFree;
       rpmzLogInit;
       rpmzMsgShow;
  +    _rpmzq_debug;
  +    rpmbzCompressBlock;
  +    rpmzDropSpace;
  +    rpmzFreeJob;
  +    rpmzFreePool;
  +    rpmzNewJob;
  +    rpmzNewPool;
  +    rpmzNewSpace;
  +    rpmzUseSpace;
  +    rpmzqAddCJob;
  +    rpmzqAddWJob;
  +    rpmzqDelCJob;
  +    rpmzqDelWJob;
  +    rpmzqFini;
  +    rpmzqFree;
  +    rpmzqInit;
  +    rpmzqLaunch;
  +    rpmzqNew;
  +    rpmzqVerify;
       Stat;
       _Stat;
       Symlink;
  @@ .
  patch -p0 <<'@@ .'
  Index: rpm/rpmio/poptIO.c
  ============================================================================
  $ cvs diff -u -r1.31 -r1.32 poptIO.c
  --- rpm/rpmio/poptIO.c        23 Feb 2009 22:30:58 -0000      1.31
  +++ rpm/rpmio/poptIO.c        6 Mar 2009 17:40:57 -0000       1.32
  @@ -62,6 +62,8 @@
   /*...@unchecked@*/
   extern int _rpmsq_debug;
   /*...@unchecked@*/
  +extern int _rpmzq_debug;
  +/*...@unchecked@*/
   extern int _tar_debug;
   /*...@unchecked@*/
   extern int _xar_debug;
  @@ -395,6 +397,8 @@
        N_("Debug rpmmg magic"), NULL},
    { "rpmsqdebug", '\0', POPT_ARG_VAL|POPT_ARGFLAG_DOC_HIDDEN, &_rpmsq_debug, 
-1,
        N_("Debug rpmsq Signal Queue"), NULL},
  + { "rpmzqdebug", '\0', POPT_ARG_VAL|POPT_ARGFLAG_DOC_HIDDEN, &_rpmzq_debug, 
-1,
  +     N_("Debug rpmzq Job Queuing"), NULL},
    { "xardebug", '\0', POPT_ARG_VAL|POPT_ARGFLAG_DOC_HIDDEN, &_xar_debug, -1,
        N_("Debug xar archives"), NULL},
    { "tardebug", '\0', POPT_ARG_VAL|POPT_ARGFLAG_DOC_HIDDEN, &_tar_debug, -1,
  @@ .
  patch -p0 <<'@@ .'
  Index: rpm/rpmio/rpmzq.c
  ============================================================================
  $ cvs diff -u -r1.1 -r1.2 rpmzq.c
  --- rpm/rpmio/rpmzq.c 6 Mar 2009 16:58:00 -0000       1.1
  +++ rpm/rpmio/rpmzq.c 6 Mar 2009 17:40:57 -0000       1.2
  @@ -1,6 +1,45 @@
  +/** \ingroup rpmio
  + * \file rpmio/rpmzq.c
  + * Job queue and buffer management (originally swiped from PIGZ).
  + */
  +
  +/* pigz.c -- parallel implementation of gzip
  + * Copyright (C) 2007, 2008 Mark Adler
  + * Version 2.1.4  9 Nov 2008  Mark Adler
  + */
  +
  +/*
  +  This software is provided 'as-is', without any express or implied
  +  warranty.  In no event will the author be held liable for any damages
  +  arising from the use of this software.
  +
  +  Permission is granted to anyone to use this software for any purpose,
  +  including commercial applications, and to alter it and redistribute it
  +  freely, subject to the following restrictions:
  +
  +  1. The origin of this software must not be misrepresented; you must not
  +     claim that you wrote the original software. If you use this software
  +     in a product, an acknowledgment in the product documentation would be
  +     appreciated but is not required.
  +  2. Altered source versions must be plainly marked as such, and must not be
  +     misrepresented as being the original software.
  +  3. This notice may not be removed or altered from any source distribution.
  +
  +  Mark Adler
  +  mad...@alumni.caltech.edu
  +
  +  Mark accepts donations for providing this software.  Donations are not
  +  required or expected.  Any amount that you feel is appropriate would be
  +  appreciated.  You can use this link:
  +
  +  https://www.paypal.com/cgi-bin/webscr?cmd=_s-xclick&hosted_button_id=536055
  +
  + */
  +
   #include "system.h"
   
   #include <rpmiotypes.h>
  +#include <rpmlog.h>
   
   #define      _RPMBZ_INTERNAL
   #include "rpmbz.h"
  @@ -32,19 +71,22 @@
   
   #include "debug.h"
   
  +/*...@unchecked@*/
  +int _rpmzq_debug = 0;
  +
  +#define      zqFprint        if (_rpmzq_debug) fprintf
  +
   /*==============================================================*/
   
   /*...@-mustmod@*/
   int rpmbzCompressBlock(void * _bz, rpmzJob job)
  -     /*...@globals fileSystem @*/
  -     /*...@modifies job, fileSystem @*/
   {
       rpmbz bz = _bz;
       int rc;
       rc = BZ2_bzBuffToBuffCompress(job->out->buf, &job->out->len,
                job->in->buf, job->in->len, bz->B, bz->V, bz->W);
       if (rc != BZ_OK)
  -     fprintf(stderr, "==> %s(%p,%p) rc %d\n", __FUNCTION__, bz, job, rc);
  +     zqFprint(stderr, "==> %s(%p,%p) rc %d\n", __FUNCTION__, bz, job, rc);
       return rc;
   }
   /*...@=mustmod@*/
  @@ -58,7 +100,7 @@
       rc = BZ2_bzBuffToBuffDecompress(job->out->buf, &job->out->len,
                job->in->buf, job->in->len, bz->S, bz->V);
       if (rc != BZ_OK)
  -     fprintf(stderr, "==> %s(%p,%p) rc %d\n", __FUNCTION__, bz, job, rc);
  +     zqFprint(stderr, "==> %s(%p,%p) rc %d\n", __FUNCTION__, bz, job, rc);
       return rc;
   }
   /*...@=mustmod@*/
  @@ -76,13 +118,10 @@
      pool.  Each space knows what pool it belongs to, so that it can be 
returned.
    */
   
  -#if !defined(_RPMZQ_WHITEOUT)
   /* initialize a pool (pool structure itself provided, not allocated) -- the
      limit is the maximum number of spaces in the pool, or -1 to indicate no
      limit, i.e., to never wait for a buffer to return to the pool */
   rpmzPool rpmzNewPool(size_t size, int limit)
  -     /*...@globals fileSystem, internalState @*/
  -     /*...@modifies fileSystem, internalState @*/
   {
       rpmzPool pool = xcalloc(1, sizeof(*pool));
       pool->have = yarnNewLock(0);
  @@ -92,15 +131,13 @@
       pool->size = size;
       pool->limit = limit;
       pool->made = 0;
  -fprintf(stderr, "    ++ pool %p[%u,%d]\n", pool, (unsigned)size, limit);
  +zqFprint(stderr, "    ++ pool %p[%u,%d]\n", pool, (unsigned)size, limit);
       return pool;
   }
   
   /* get a space from a pool -- the use count is initially set to one, so there
      is no need to call rpmzUseSpace() for the first use */
   rpmzSpace rpmzNewSpace(rpmzPool pool)
  -     /*...@globals fileSystem, internalState @*/
  -     /*...@modifies pool, fileSystem, internalState @*/
   {
       rpmzSpace space;
   
  @@ -135,7 +172,7 @@
       space->pool = pool;                 /* remember the pool this belongs to 
*/
   /*...@=assignexpose@*/
   /*...@=mustfreeonly@*/
  -fprintf(stderr, "    ++ space %p use %d buf %p[%u]\n", space, 1, space->buf, 
space->len);
  +zqFprint(stderr, "    ++ space %p use %d buf %p[%u]\n", space, 1, 
space->buf, space->len);
   /*...@-nullret@*/
       return space;
   /*...@=nullret@*/
  @@ -144,27 +181,22 @@
   /* increment the use count to require one more drop before returning this 
space
      to the pool */
   void rpmzUseSpace(rpmzSpace space)
  -     /*...@globals fileSystem, internalState @*/
  -     /*...@modifies space, fileSystem, internalState @*/
   {
       int use;
       yarnPossess(space->use);
       use = yarnPeekLock(space->use);
  -fprintf(stderr, "    ++ space %p[%d] buf %p[%u]\n", space, use+1, 
space->buf, space->len);
  +zqFprint(stderr, "    ++ space %p[%d] buf %p[%u]\n", space, use+1, 
space->buf, space->len);
       yarnTwist(space->use, BY, 1);
   }
   
   /* drop a space, returning it to the pool if the use count is zero */
  -/*...@null@*/
   rpmzSpace rpmzDropSpace(/*...@only@*/ rpmzSpace space)
  -     /*...@globals fileSystem, internalState @*/
  -     /*...@modifies space, fileSystem, internalState @*/
   {
       int use;
   
       yarnPossess(space->use);
       use = yarnPeekLock(space->use);
  -fprintf(stderr, "    -- space %p[%u] buf %p[%u]\n", space, use, space->buf, 
space->len);
  +zqFprint(stderr, "    -- space %p[%u] buf %p[%u]\n", space, use, space->buf, 
space->len);
   assert(use != 0);
       if (use == 1) {
        rpmzPool pool = space->pool;
  @@ -182,10 +214,7 @@
   
   /* free the memory and lock resources of a pool -- return number of spaces 
for
      debugging and resource usage measurement */
  -/*...@null@*/
  -rpmzPool rpmzFreePool(/*...@only@*/ rpmzPool pool, /*...@null@*/ int *countp)
  -     /*...@globals fileSystem, internalState @*/
  -     /*...@modifies pool, *countp, fileSystem, internalState @*/
  +rpmzPool rpmzFreePool(rpmzPool pool, int *countp)
   {
       rpmzSpace space;
       int count;
  @@ -207,9 +236,9 @@
   assert(count == pool->made);
   #else
   if (count != pool->made)
  -fprintf(stderr, "==> FIXME: %s: count %d pool->made %d\n", __FUNCTION__, 
count, pool->made);
  +zqFprint(stderr, "==> FIXME: %s: count %d pool->made %d\n", __FUNCTION__, 
count, pool->made);
   #endif
  -fprintf(stderr, "    -- pool %p count %d\n", pool, count);
  +zqFprint(stderr, "    -- pool %p count %d\n", pool, count);
   /*...@-compdestroy@*/
       pool = _free(pool);
   /*...@=compdestroy@*/
  @@ -217,29 +246,22 @@
        *countp = count;
       return NULL;
   }
  -#endif       /* _RPMZQ_WHITEOUT */
   
  -/*...@null@*/
  -rpmzJob rpmzFreeJob(/*...@only@*/ rpmzJob job)
  -        /*...@globals fileSystem, internalState @*/
  -        /*...@modifies job, fileSystem, internalState @*/
  +rpmzJob rpmzFreeJob(rpmzJob job)
   {
  -fprintf(stderr, "    -- job %p[%ld] %p => %p\n", job, job->seq, job->in, 
job->out);
  +zqFprint(stderr, "    -- job %p[%ld] %p => %p\n", job, job->seq, job->in, 
job->out);
       if (job->calc != NULL)
        job->calc = yarnFreeLock(job->calc);
       job = _free(job);
       return NULL;
   }
   
  -/*...@only@*/
   rpmzJob rpmzNewJob(long seq)
  -        /*...@globals fileSystem, internalState @*/
  -        /*...@modifies fileSystem, internalState @*/
   {
       rpmzJob job = xcalloc(1, sizeof(*job));
       job->seq = seq;
       job->calc = yarnNewLock(0);
  -fprintf(stderr, "    ++ job %p[%ld]\n", job, seq);
  +zqFprint(stderr, "    ++ job %p[%ld]\n", job, seq);
       return job;
   }
   
  @@ -250,15 +272,13 @@
   /* command the compress threads to all return, then join them all (call from
      main thread), free all the thread-related resources */
   void rpmzqFini(rpmzQueue zq)
  -        /*...@globals fileSystem, internalState @*/
  -        /*...@modifies zq, fileSystem, internalState @*/
   {
       rpmzLog zlog = zq->zlog;
   
       struct rpmzJob_s job;
       int caught;
   
  -fprintf(stderr, "--> %s(%p)\n", __FUNCTION__, zq);
  +zqFprint(stderr, "--> %s(%p)\n", __FUNCTION__, zq);
       /* only do this once */
       if (zq->compress_have == NULL)
        return;
  @@ -280,7 +300,7 @@
   assert(caught == zq->cthreads);
   #else
   if (caught != zq->cthreads)
  -fprintf(stderr, "==> FIXME: %s: caught %d z->cthreads %d\n", __FUNCTION__, 
caught, zq->cthreads);
  +zqFprint(stderr, "==> FIXME: %s: caught %d z->cthreads %d\n", __FUNCTION__, 
caught, zq->cthreads);
   #endif
       zq->cthreads = 0;
   
  @@ -295,10 +315,8 @@
   
   /* setup job lists (call from main thread) */
   void rpmzqInit(rpmzQueue zq)
  -        /*...@globals fileSystem, internalState @*/
  -        /*...@modifies zq, fileSystem, internalState @*/
   {
  -fprintf(stderr, "--> %s(%p)\n", __FUNCTION__, zq);
  +zqFprint(stderr, "--> %s(%p)\n", __FUNCTION__, zq);
       /* set up only if not already set up*/
       if (zq->compress_have != NULL)
        return;
  @@ -312,24 +330,20 @@
       zq->write_head = NULL;
   
       zq->in_pool = rpmzNewPool(zq->iblocksize, zq->ilimit);
  -fprintf(stderr, "-->  in_pool: %p[%u] blocksize %u\n", zq->in_pool, 
(unsigned)zq->ilimit, (unsigned)zq->iblocksize);
  +zqFprint(stderr, "-->  in_pool: %p[%u] blocksize %u\n", zq->in_pool, 
(unsigned)zq->ilimit, (unsigned)zq->iblocksize);
       zq->out_pool = rpmzNewPool(zq->oblocksize, zq->olimit);
  -fprintf(stderr, "--> out_pool: %p[%u] blocksize %u\n", zq->out_pool, 
(unsigned)zq->olimit, (unsigned)zq->oblocksize);
  +zqFprint(stderr, "--> out_pool: %p[%u] blocksize %u\n", zq->out_pool, 
(unsigned)zq->olimit, (unsigned)zq->oblocksize);
   
   }
   
  -/*...@null@*/
  -rpmzQueue rpmzqFree(/*...@only@*/ rpmzQueue zq)
  -        /*...@modifies zq @*/
  +rpmzQueue rpmzqFree(rpmzQueue zq)
   {
       zq = _free(zq);
       return NULL;
   }
   
  -/*...@only@*/
   rpmzQueue rpmzqNew(rpmzLog zlog, int flags,
                int verbosity, unsigned int level, size_t blocksize, int limit)
  -     /*...@*/
   {
       rpmzQueue zq = xcalloc(1, sizeof(*zq));
       zq->flags = flags;
  @@ -349,48 +363,7 @@
       return zq;
   }
   
  -/* forward reference */
  -static void rpmzqCompressThread (void *_zq)
  -     /*...@globals fileSystem, internalState @*/
  -     /*...@modifies _zq, fileSystem, internalState @*/;
  -
  -/* forward reference */
  -static void rpmzqDecompressThread (void *_zq)
  -     /*...@globals fileSystem, internalState @*/
  -     /*...@modifies _zq, fileSystem, internalState @*/;
  -
  -/* start another compress/decompress thread if needed */
  -void rpmzqLaunch(rpmzQueue zq, long seq, unsigned int threads)
  -     /*...@globals fileSystem, internalState @*/
  -     /*...@modifies zq, fileSystem, internalState @*/
  -{
  -    if (zq->cthreads < seq && zq->cthreads < (int)threads) {
  -     switch (zq->omode) {
  -     case O_WRONLY: (void)yarnLaunch(rpmzqCompressThread, zq);       break;
  -     case O_RDONLY: (void)yarnLaunch(rpmzqDecompressThread, zq);     break;
  -     default:        assert(0);      break;
  -     }
  -     zq->cthreads++;
  -    }
  -}
  -
  -/* verify no more jobs, prepare for next use */
  -void rpmzqVerify(rpmzQueue zq)
  -     /*...@globals fileSystem, internalState @*/
  -     /*...@modifies zq, fileSystem, internalState @*/
  -{
  -    yarnPossess(zq->compress_have);
  -assert(zq->compress_head == NULL && yarnPeekLock(zq->compress_have) == 0);
  -    yarnRelease(zq->compress_have);
  -    yarnPossess(zq->write_first);
  -assert(zq->write_head == NULL);
  -    yarnTwist(zq->write_first, TO, -1);
  -}
  -
  -/*...@null@*/
   rpmzJob rpmzqDelCJob(rpmzQueue zq)
  -     /*...@globals fileSystem, internalState @*/
  -     /*...@modifies zq, fileSystem, internalState @*/
   {
       rpmzJob job;
   
  @@ -415,8 +388,6 @@
   }
   
   void rpmzqAddCJob(rpmzQueue zq, rpmzJob job)
  -     /*...@globals fileSystem, internalState @*/
  -     /*...@modifies zq, job, fileSystem, internalState @*/
   {
       /* put job at end of compress list, let all the compressors know */
       yarnPossess(zq->compress_have);
  @@ -429,8 +400,6 @@
   }
   
   rpmzJob rpmzqDelWJob(rpmzQueue zq, long seq)
  -     /*...@globals fileSystem, internalState @*/
  -     /*...@modifies zq, fileSystem, internalState @*/
   {
       rpmzJob job;
   
  @@ -447,8 +416,6 @@
   }
   
   void rpmzqAddWJob(rpmzQueue zq, rpmzJob job)
  -     /*...@globals fileSystem, internalState @*/
  -     /*...@modifies zq, job, fileSystem, internalState @*/
   {
       rpmzLog zlog = zq->zlog;
   
  @@ -462,7 +429,7 @@
       default: assert(0);      break;
       case O_WRONLY:
        pct = (100.0 * job->out->len) / job->in->len;
  -     fprintf(stderr, "       job %p[%ld]:\t%p[%u] => %p[%u]\t(%3.1f%%)\n",
  +     zqFprint(stderr, "       job %p[%ld]:\t%p[%u] => %p[%u]\t(%3.1f%%)\n",
                        job, job->seq, job->in->buf, job->in->len,
                        job->out->buf, job->out->len, pct);
        Trace((zlog, "-- compressed #%ld %3.1f%%%s", job->seq, pct,
  @@ -470,7 +437,7 @@
        break;
       case O_RDONLY:
        pct = (100.0 * job->in->len) / job->out->len;
  -     fprintf(stderr, "       job %p[%ld]:\t%p[%u] <= %p[%u]\t(%3.1f%%)\n",
  +     zqFprint(stderr, "       job %p[%ld]:\t%p[%u] <= %p[%u]\t(%3.1f%%)\n",
                        job, job->seq, job->in->buf, job->in->len,
                        job->out->buf, job->out->len, pct);
        Trace((zlog, "-- decompressed #%ld %3.1f%%%s", job->seq, pct,
  @@ -511,7 +478,7 @@
        job->out = rpmzNewSpace(zq->out_pool);
   /*...@=mustfreeonly@*/
        if (job->out->len < outlen) {
  -fprintf(stderr, "==> FIXME: %s: job->out %p %p[%u] malloc(%u)\n", 
__FUNCTION__, job->out, job->out->buf, (unsigned)job->out->len, 
(unsigned)outlen);
  +zqFprint(stderr, "==> FIXME: %s: job->out %p %p[%u] malloc(%u)\n", 
__FUNCTION__, job->out, job->out->buf, (unsigned)job->out->len, 
(unsigned)outlen);
            job->out = rpmzDropSpace(job->out);
            job->out = xcalloc(1, sizeof(*job->out));
            job->out->len = outlen;
  @@ -525,7 +492,7 @@
        outlen = 6 * job->in->len;
        job->out = rpmzNewSpace(zq->out_pool);
        if (job->out->len < outlen) {
  -fprintf(stderr, "==> FIXME: %s: job->out %p %p[%u] malloc(%u)\n", 
__FUNCTION__, job->out, job->out->buf, (unsigned)job->out->len, 
(unsigned)outlen);
  +zqFprint(stderr, "==> FIXME: %s: job->out %p %p[%u] malloc(%u)\n", 
__FUNCTION__, job->out, job->out->buf, (unsigned)job->out->len, 
(unsigned)outlen);
            job->out = rpmzDropSpace(job->out);
            job->out = xcalloc(1, sizeof(*job->out));
            job->out->len = outlen;
  @@ -539,11 +506,11 @@
            ret = rpmbzDecompressBlock(bz, job);
            if (ret != BZ_OUTBUFF_FULL)
                /*...@loopbreak@*/ break;
  -fprintf(stderr, "==> FIXME: %s: job->out %p %p[%u] realloc(%u)\n", 
__FUNCTION__, job->out, job->out->buf, (unsigned)job->out->len, 
(unsigned)outlen);
  +zqFprint(stderr, "==> FIXME: %s: job->out %p %p[%u] realloc(%u)\n", 
__FUNCTION__, job->out, job->out->buf, (unsigned)job->out->len, 
(unsigned)outlen);
            if (job->out->use != NULL)
                job->out = rpmzDropSpace(job->out);
            else {
  -fprintf(stderr, "==> FIXME: %s: job->out %p %p[%u] free\n", __FUNCTION__, 
job->out, job->out->buf, (unsigned)job->out->len);
  +zqFprint(stderr, "==> FIXME: %s: job->out %p %p[%u] free\n", __FUNCTION__, 
job->out, job->out->buf, (unsigned)job->out->len);
                job->out->buf = _free(job->out->buf);
                job->out = _free(job->out);
            }
  @@ -563,12 +530,14 @@
      sequence number of -1 (leave that job in the list for other incarnations 
to
      find) */
   static void rpmzqCompressThread (void *_zq)
  +     /*...@globals fileSystem, internalState @*/
  +     /*...@modifies _zq, fileSystem, internalState @*/
   {
       rpmzQueue zq = _zq;
       rpmbz bz = rpmbzInit(zq->level, zq->omode);
       rpmzJob job;
   
  -fprintf(stderr, "--> %s(%p) bz %p\n", __FUNCTION__, zq, bz);
  +zqFprint(stderr, "--> %s(%p) bz %p\n", __FUNCTION__, zq, bz);
   
       /* get job, insert write job in list in sorted order, alert write thread 
*/
   /*...@-evalorder@*/
  @@ -581,12 +550,14 @@
   }
   
   static void rpmzqDecompressThread(void *_zq)
  +     /*...@globals fileSystem, internalState @*/
  +     /*...@modifies _zq, fileSystem, internalState @*/
   {
       rpmzQueue zq = _zq;
       rpmbz bz = rpmbzInit(zq->level, zq->omode);
       rpmzJob job;
   
  -fprintf(stderr, "--> %s(%p) bz %p\n", __FUNCTION__, zq, bz);
  +zqFprint(stderr, "--> %s(%p) bz %p\n", __FUNCTION__, zq, bz);
   
       /* get job, insert write job in list in sorted order, alert write thread 
*/
   /*...@-evalorder@*/
  @@ -597,3 +568,27 @@
   
       bz = rpmbzFini(bz);
   }
  +
  +/* start another compress/decompress thread if needed */
  +void rpmzqLaunch(rpmzQueue zq, long seq, unsigned int threads)
  +{
  +    if (zq->cthreads < seq && zq->cthreads < (int)threads) {
  +     switch (zq->omode) {
  +     default:        assert(0);      break;
  +     case O_WRONLY: (void)yarnLaunch(rpmzqCompressThread, zq);       break;
  +     case O_RDONLY: (void)yarnLaunch(rpmzqDecompressThread, zq);     break;
  +     }
  +     zq->cthreads++;
  +    }
  +}
  +
  +/* verify no more jobs, prepare for next use */
  +void rpmzqVerify(rpmzQueue zq)
  +{
  +    yarnPossess(zq->compress_have);
  +assert(zq->compress_head == NULL && yarnPeekLock(zq->compress_have) == 0);
  +    yarnRelease(zq->compress_have);
  +    yarnPossess(zq->write_first);
  +assert(zq->write_head == NULL);
  +    yarnTwist(zq->write_first, TO, -1);
  +}
  @@ .
  patch -p0 <<'@@ .'
  Index: rpm/rpmio/rpmzq.h
  ============================================================================
  $ cvs diff -u -r1.1 -r1.2 rpmzq.h
  --- rpm/rpmio/rpmzq.h 6 Mar 2009 16:58:00 -0000       1.1
  +++ rpm/rpmio/rpmzq.h 6 Mar 2009 17:40:57 -0000       1.2
  @@ -6,9 +6,22 @@
    * Job queue and buffer pool management.
    */
   
  +/**
  + */
  +/*...@unchecked@*/
  +extern int _rpmzq_debug;
  +
  +/**
  + */
   typedef /*...@abstract@*/ struct rpmzSpace_s * rpmzSpace;
  +/**
  + */
   typedef /*...@abstract@*/ struct rpmzPool_s * rpmzPool;
  +/**
  + */
   typedef /*...@abstract@*/ struct rpmzQueue_s * rpmzQueue;
  +/**
  + */
   typedef /*...@abstract@*/ struct rpmzJob_s * rpmzJob;
   
   #ifdef       _RPMZQ_INTERNAL
  @@ -187,16 +200,6 @@
                int verbosity, unsigned int level, size_t blocksize, int limit)
        /*...@*/;
   
  -/** start another compress/decompress thread if needed */
  -void rpmzqLaunch(rpmzQueue zq, long seq, unsigned int threads)
  -     /*...@globals fileSystem, internalState @*/
  -     /*...@modifies zq, fileSystem, internalState @*/;
  -
  -/** verify no more jobs, prepare for next use */
  -void rpmzqVerify(rpmzQueue zq)
  -     /*...@globals fileSystem, internalState @*/
  -     /*...@modifies zq, fileSystem, internalState @*/;
  -
   /**
    */
   /*...@null@*/
  @@ -222,6 +225,16 @@
        /*...@globals fileSystem, internalState @*/
        /*...@modifies zq, job, fileSystem, internalState @*/;
   
  +/** start another compress/decompress thread if needed */
  +void rpmzqLaunch(rpmzQueue zq, long seq, unsigned int threads)
  +     /*...@globals fileSystem, internalState @*/
  +     /*...@modifies zq, fileSystem, internalState @*/;
  +
  +/** verify no more jobs, prepare for next use */
  +void rpmzqVerify(rpmzQueue zq)
  +     /*...@globals fileSystem, internalState @*/
  +     /*...@modifies zq, fileSystem, internalState @*/;
  +
   #ifdef __cplusplus
   }
   #endif
  @@ .
______________________________________________________________________
RPM Package Manager                                    http://rpm5.org
CVS Sources Repository                                rpm-cvs@rpm5.org

Reply via email to