RPM Package Manager, CVS Repository
  http://rpm5.org/cvs/
  ____________________________________________________________________________

  Server: rpm5.org                         Name:   Jeff Johnson
  Root:   /v/rpm/cvs                       Email:  j...@rpm5.org
  Module: rpm                              Date:   13-Mar-2009 17:29:29
  Branch: HEAD                             Handle: 2009031316292801

  Modified files:
    rpm/rpmio               rpmpigz.c rpmzq.c rpmzq.h

  Log:
    - jbj: rpmz: add locking sufficient to permit input job chaining.
    - jbj: rpmz: add seperate I/O queues for z_stream pipelining. well, duh.

  Summary:
    Revision    Changes     Path
    1.49        +162 -134   rpm/rpmio/rpmpigz.c
    1.17        +11 -9      rpm/rpmio/rpmzq.c
    1.25        +9  -6      rpm/rpmio/rpmzq.h
  ____________________________________________________________________________

  patch -p0 <<'@@ .'
  Index: rpm/rpmio/rpmpigz.c
  ============================================================================
  $ cvs diff -u -r1.48 -r1.49 rpmpigz.c
  --- rpm/rpmio/rpmpigz.c       12 Mar 2009 21:35:40 -0000      1.48
  +++ rpm/rpmio/rpmpigz.c       13 Mar 2009 16:29:28 -0000      1.49
  @@ -1336,7 +1336,16 @@
        job->more = (job->in->len == nread);
        job->in->len = nread;
   
  -     rpmzqAddRJob(zq, job);
  +     /* queue read job at end of list. */
  +    {        rpmzJob here, *prior;
  +     yarnPossess(zq->qi_first);
  +     prior = &zq->qi;
  +     while ((here = *prior) != NULL)
  +         prior = &here->next;
  +     job->next = here;
  +     *prior = job;
  +     yarnRelease(zq->qi_first);
  +    }
   
        Trace((zlog, "-- decompress read thread read %lu bytes", nread));
        yarnTwist(zq->load_state, TO, 0);
  @@ -1355,25 +1364,20 @@
        /*...@globals fileSystem, internalState @*/
        /*...@modifies zq, fileSystem, internalState @*/
   {
  -    rpmzJob job = zq->_job;
  -
  -assert(job != NULL);
       /* if already detected end of file, do nothing */
  -    if (job->more < 0) {
  -     job->more = 0;
  +    if (zq->qi->more < 0) {
  +     zq->qi->more = 0;
        return 0;
       }
   
   #if 0                /* XXX hmmm, the comment above is incorrect. todo++ */
  -assert(job->in->len == 0);
  +assert(zq->qi->in->len == 0);
   #endif
   
   #ifndef _PIGZNOTHREAD
       /* if first time in or zq->threads == 1, read a buffer to have something 
to
          return, otherwise wait for the previous read job to complete */
       if (zq->threads > 1) {
  -     static long seq = 0;
  -     rpmzJob njob;
        /* if first time, fire up the read thread, ask for a read */
        if (zq->_in_which == -1) {
            zq->_in_which = 1;
  @@ -1389,17 +1393,27 @@
        yarnRelease(zq->load_state);
   
        /* set up input buffer with the data just read */
  -     job->in = rpmzqDropSpace(job->in);
  -     njob = rpmzqDelRJob(zq, seq++);
  +     zq->qi->in = rpmzqDropSpace(zq->qi->in);
  +
  +     yarnPossess(zq->qi_first);
  +     yarnTwist(zq->qi_first, TO, -1);
  +
  +     /* steal the queued read job following zq->qi. */
  +     {   rpmzJob ojob;
  +         rpmzJob njob;
  +
  +         yarnPossess(zq->qi_first);
  +         ojob = zq->qi;
  +         njob = ojob->next;
   assert(njob != NULL);
  -     job->in = njob->in;
  -     job->seq = njob->seq;   /* XXX propagate job->seq for now */
  -     njob->in = NULL;
  -     njob = rpmzqDropJob(njob);
  +         zq->qi = njob;
  +         yarnRelease(zq->qi_first);
  +         rpmzqDropJob(ojob);
  +     }
   
        /* if not at end of file, alert read thread to load next buffer,
         * alternate between in_buf and in_buf2 */
  -     if (job->in->len == zq->_in_buf_allocated) {
  +     if (zq->qi->in->len == zq->_in_buf_allocated) {
            zq->_in_which = 1 - zq->_in_which;
            yarnPossess(zq->load_state);
            yarnTwist(zq->load_state, TO, 1);
  @@ -1417,28 +1431,28 @@
       {        size_t nread;
        /* don't use threads -- free old buffer, malloc and read a new buffer */
        
  -     job->in = rpmzqDropSpace(job->in);
  -     job->in = rpmzqNewSpace(NULL, zq->_in_buf_allocated);
  -     nread = rpmzRead(zq, job->in->buf, job->in->len);
  -     job->in->len = nread;
  +     zq->qi->in = rpmzqDropSpace(zq->qi->in);
  +     zq->qi->in = rpmzqNewSpace(NULL, zq->_in_buf_allocated);
  +     nread = rpmzRead(zq, zq->qi->in->buf, zq->qi->in->len);
  +     zq->qi->in->len = nread;
       }
   
       /* note end of file */
  -    if (job->in->len < zq->_in_buf_allocated) {
  -     job->more = -1;
  +    if (zq->qi->in->len < zq->_in_buf_allocated) {
  +     zq->qi->more = -1;
   
        /* if we got bupkis, now is the time to mark eof */
  -     if (job->in->len == 0)
  -         job->more = 0;
  +     if (zq->qi->in->len == 0)
  +         zq->qi->more = 0;
       } else
  -     job->more = 1;
  +     zq->qi->more = 1;
   
   if (_debug)
  -jobDebug("loaded", zq->_job);
  +jobDebug("loaded", zq->qi);
   
       /* update the total and return the available bytes */
  -    zq->in_tot += job->in->len;
  -    return job->in->len;
  +    zq->in_tot += zq->qi->in->len;
  +    return zq->qi->in->len;
   }
   /*...@=mustmod@*/
   
  @@ -1447,29 +1461,22 @@
   static void in_init(rpmzQueue zq)
        /*...@modifies zq @*/
   {
  -    rpmzJob job;
  -    if (zq->_job == NULL) {
  +    if (zq->qi == NULL) {
        /* inflateBack() window is a function of windowBits */
        size_t _out_len = (1 << infWBits);
        if (zq->threads > 1) {
  -         if (zq->read_first == NULL)
  -             zq->read_first = yarnNewLock(-1);
  +         if (zq->qi_first == NULL)
  +             zq->qi_first = yarnNewLock(-1);
            if (zq->load_ipool == NULL)
                zq->load_ipool = rpmzqNewPool(zq->_in_buf_allocated, 
(zq->threads << 1) + 2);
            if (zq->load_opool == NULL) 
                zq->load_opool = rpmzqNewPool(_out_len,  2);
        }
  -     zq->_job = rpmzqNewJob(0);
  -    }
  -    job = zq->_job;
  -    job->seq = 0;
  -    job->more = 1;
  -    if (job->in != NULL) {
  -     job->in->len = 0;
  -     job->in->buf = NULL;
  +     zq->qi = rpmzqNewJob(0);
       }
  -    job->check = 0;
  -    job->next = NULL;
  +    zq->qi->seq = 0;
  +    zq->qi->more = 1;
  +    zq->qi->next = NULL;
       zq->in_tot = 0;
   #ifndef _PIGZNOTHREAD
       zq->_in_which = -1;
  @@ -1482,46 +1489,60 @@
                /*...@null@*/ unsigned char *buf, size_t len)
        /*...@modifies *buf @*/
   {
  -    rpmzJob job;
       size_t togo = len;
       size_t got = 0;
  +    int rc;
   
       /* initialize input buffer */
  -    if (zq->_job == NULL)
  +    if (zq->qi == NULL)
        in_init(zq);
  -    job = zq->_job;
   
  -assert(job != NULL);
  -    if (!job->more || (!(job->in && job->in->len) && load(zq) == 0))
  -     return got;
  -    while (togo > job->in->len) {
  +    yarnPossess(zq->qi_first);
  +
  +    if (!zq->qi->more)
  +     goto exit;
  +    if (!(zq->qi->in && zq->qi->in->len)) {
  +     yarnRelease(zq->qi_first);
  +     rc = load(zq);
  +     yarnPossess(zq->qi_first);
  +     if (rc == 0)
  +         goto exit;
  +    }
  +
  +    while (togo > zq->qi->in->len) {
        if (buf != NULL) {
  -         memcpy(buf, job->in->buf, job->in->len);
  -         buf += job->in->len;
  +         memcpy(buf, zq->qi->in->buf, zq->qi->in->len);
  +         buf += zq->qi->in->len;
        }
  -     got += job->in->len;
  -     togo -= job->in->len;
  -     {   unsigned char * _buf = job->in->buf;
  -         _buf += job->in->len;
  -         job->in->buf = _buf;        /* XXX don't change job->in->buf?!? */
  -     }
  -     job->in->len -= job->in->len;
  -     if (load(zq) == 0)
  -         return got;
  +     got += zq->qi->in->len;
  +     togo -= zq->qi->in->len;
  +     {   unsigned char * _buf = zq->qi->in->buf;
  +         _buf += zq->qi->in->len;
  +         zq->qi->in->buf = _buf;     /* XXX don't change job->in->buf?!? */
  +     }
  +     zq->qi->in->len -= zq->qi->in->len;
  +
  +     yarnRelease(zq->qi_first);
  +     rc = load(zq);
  +     yarnPossess(zq->qi_first);
  +     if (rc == 0)
  +         goto exit;
       }
       if (togo > 0) {
        if (buf != NULL) {
  -         memcpy(buf, job->in->buf, togo);
  +         memcpy(buf, zq->qi->in->buf, togo);
            buf += togo;
        }
  -     {   unsigned char * _buf = job->in->buf;
  +     {   unsigned char * _buf = zq->qi->in->buf;
            _buf += togo;
  -         job->in->buf = _buf;        /* XXX don't change job->in->buf?!? */
  +         zq->qi->in->buf = _buf;     /* XXX don't change job->in->buf?!? */
        }
  -     job->in->len -= togo;
  +     zq->qi->in->len -= togo;
        got += togo;
        togo -= togo;
       }
  +exit:
  +    yarnRelease(zq->qi_first);
       return got;
   }
   
  @@ -1664,7 +1685,7 @@
       int rc;
   
   if (_debug)
  -jobDebug(" start", zq->_job);
  +jobDebug(" start", zq->qi);
   
       /* clear return information */
       if (save) {
  @@ -1840,7 +1861,7 @@
       char name[NAMEMAX1+1];  /* header or file name, possibly truncated */
   
   if (_debug)
  -jobDebug("  show", zq->_job);
  +jobDebug("  show", zq->qi);
   
       /* create abbreviated name from header file name or actual file name */
       max = zq->verbosity > 1 ? NAMEMAX2 : NAMEMAX1;
  @@ -1914,7 +1935,7 @@
        /*...@globals fileSystem, internalState @*/
        /*...@modifies zq, fileSystem, internalState @*/
   {
  -    rpmzJob job = zq->_job;
  +    rpmzJob job = zq->qi;
       rpmzh zh = zq->_zh;
       int method;             /* rpmzGetHeader() return value */
       size_t n;               /* available trailer bytes */
  @@ -1925,7 +1946,7 @@
       unsigned char * bufend;
   
   if (_debug)
  -jobDebug("  list", zq->_job);
  +jobDebug("  list", zq->qi);
   
       /* read header information and position input after header */
       method = rpmzGetHeader(zq, 1);
  @@ -2054,16 +2075,14 @@
        /*...@modifies _zq, *buf, fileSystem, internalState @*/
   {
       rpmzQueue zq = _zq;
  -    rpmzJob job = zq->_job;
   
  -if (_debug)
  -jobDebug("   inb", zq->_job);
  +    load(zq);
   
  -assert(job != NULL);
  +if (_debug)
  +jobDebug("  post", zq->qi);
   
  -    load(zq);
  -    *buf = job->in->buf;
  -    return job->in->len;
  +    *buf = zq->qi->in->buf;
  +    return zq->qi->in->len;
   }
   
   #ifndef      _PIGZNOTHREAD
  @@ -2074,24 +2093,20 @@
   {
       rpmzQueue zq = _zq;
       rpmzLog zlog = zq->zlog;
  -    rpmzJob job = zq->_job;
       size_t nwrote;
   
  -assert(job != NULL);
       Trace((zlog, "-- launched decompress write thread"));
       do {
        yarnPossess(zq->outb_write_more);
        yarnWaitFor(zq->outb_write_more, TO_BE, 1);
  -#ifdef       NOISY
  +
   if (_debug)
  -jobDebug(" write", zq->_job);
  -#endif
  -assert(job->out != NULL);
  -assert(job->out->buf != NULL);
  -     nwrote = job->out->len;
  +jobDebug(" write", zq->qo);
  +     nwrote = zq->qo->out->len;
        if (nwrote && zq->mode == RPMZ_MODE_DECOMPRESS)
  -         rpmzWrite(zq, job->out->buf, job->out->len);
  -     rpmzqDropSpace(job->out);
  +         rpmzWrite(zq, zq->qo->out->buf, zq->qo->out->len);
  +     rpmzqDropSpace(zq->qo->out);
  +
        Trace((zlog, "-- decompress wrote %lu bytes", nwrote));
        yarnTwist(zq->outb_write_more, TO, 0);
       } while (nwrote);
  @@ -2105,24 +2120,20 @@
   {
       rpmzQueue zq = _zq;
       rpmzLog zlog = zq->zlog;
  -    rpmzJob job = zq->_job;
       size_t nchecked;
   
  -assert(job != NULL);
       Trace((zlog, "-- launched decompress check thread"));
       do {
        yarnPossess(zq->outb_check_more);
        yarnWaitFor(zq->outb_check_more, TO_BE, 1);
  -#ifdef       NOISY
  +
   if (_debug)
  -jobDebug(" check", zq->_job);
  -#endif
  -assert(job->out != NULL);
  -assert(job->out->buf != NULL);
  -     nchecked = job->out->len;
  +jobDebug(" check", zq->qo);
  +     nchecked = zq->qo->out->len;
        if (nchecked > 0)
  -         job->check = CHECK(job->check, job->out->buf, nchecked);
  -     rpmzqDropSpace(job->out);
  +         zq->qo->check = CHECK(zq->qo->check, zq->qo->out->buf, nchecked);
  +     rpmzqDropSpace(zq->qo->out);
  +
        Trace((zlog, "-- decompress checked %lu bytes", nchecked));
        yarnTwist(zq->outb_check_more, TO, 0);
       } while (nchecked);
  @@ -2139,17 +2150,12 @@
        /*...@modifies fileSystem, internalState @*/
   {
       rpmzQueue zq = _zq;
  -    rpmzJob job = zq->_job;
   #ifndef _PIGZNOTHREAD
   /*...@only@*/ /*...@relnull@*/
       static yarnThread wr;
   /*...@only@*/ /*...@relnull@*/
       static yarnThread ch;
   
  -if (_debug)
  -jobDebug("  outb", zq->_job);
  -
  -assert(job != NULL);
       if (zq->threads > 1) {
        /* if first time, initialize state and launch threads */
        if (zq->outb_write_more == NULL) {
  @@ -2167,19 +2173,21 @@
        yarnPossess(zq->outb_write_more);
        yarnWaitFor(zq->outb_write_more, TO_BE, 0);
   
  +if (_debug)
  +jobDebug("  outb", zq->qo);
        /* queue the output and alert the worker bees */
  -     job->out = rpmzqNewSpace(zq->load_opool, zq->load_opool->size);
  -assert(job->out->len >= len);
  -     job->out->len = len;
  +     zq->qo->out = rpmzqNewSpace(zq->load_opool, zq->load_opool->size);
  +assert(zq->qo->out->len >= len);
  +     zq->qo->out->len = len;
        if (len > 0) {
            zq->out_tot += len;
   assert(buf != NULL);
            /* XXX this memcpy cannot be avoided wuth inflateBack. */
   /*...@-mayaliasunique@*/
  -         memcpy(job->out->buf, buf, len);
  +         memcpy(zq->qo->out->buf, buf, len);
   /*...@=mayaliasunique@*/
        }
  -     rpmzqUseSpace(job->out);        /* XXX nrefs++ for check. */
  +     rpmzqUseSpace(zq->qo->out);     /* XXX nrefs++ for check. */
   
        yarnTwist(zq->outb_write_more, TO, 1);
        yarnTwist(zq->outb_check_more, TO, 1);
  @@ -2198,7 +2206,7 @@
            wr = yarnJoin(wr);
            zq->outb_check_more = yarnFreeLock(zq->outb_check_more);
            zq->outb_write_more = yarnFreeLock(zq->outb_write_more);
  -         job->out = NULL;
  +
        }
   
        /* return for more decompression while last buffer is being written
  @@ -2209,7 +2217,7 @@
       if (len) {               /* if no threads, then do it without threads */
        if (zq->mode == RPMZ_MODE_DECOMPRESS)
            rpmzWrite(zq, buf, len);
  -     job->check = CHECK(job->check, buf, len);
  +     zq->qo->check = CHECK(zq->qo->check, buf, len);
        zq->out_tot += len;
       }
   
  @@ -2225,7 +2233,6 @@
        /*...@globals fileSystem, internalState @*/
        /*...@modifies zq, fileSystem, internalState @*/
   {
  -    rpmzJob job = zq->_job;
       rpmzh zh = zq->_zh;
   
       int ret;
  @@ -2240,18 +2247,21 @@
   
   if (_debug) {
   fprintf(stderr, "\n");
  -jobDebug("  init", zq->_job);
  +jobDebug("  init", zq->qi);
   }
   
  -assert(job != NULL);
  -assert(job->out == NULL);
  -
       cont = 0;
       do {
        /* header already read -- set up for decompression */
  -     zq->in_tot = job->in->len;      /* track compressed data length */
  +
  +        zq->qo = rpmzqNewJob(0);
  +
  +     yarnPossess(zq->qi_first);
  +     zq->in_tot = zq->qi->in->len;   /* track compressed data length */
  +     yarnRelease(zq->qi_first);
  +
        zq->out_tot = 0;
  -     job->check = CHECK(0L, Z_NULL, 0);
  +     zq->qo->check = CHECK(0L, Z_NULL, 0);
   
       {        
        z_stream strm;
  @@ -2260,7 +2270,7 @@
        unsigned char * _out_buf = xmalloc(_out_len);
   
   if (_debug)
  -jobDebug("before", zq->_job);
  +jobDebug("before", zq->qi);
   
        strm.zalloc = Z_NULL;
        strm.zfree = Z_NULL;
  @@ -2270,17 +2280,24 @@
            bail("not enough memory", "");
   
        /* decompress, compute lengths and check value */
  -     strm.avail_in = job->in->len;
  +     yarnPossess(zq->qi_first);
  +     strm.avail_in = zq->qi->in->len;
   /*...@-sharedtrans@*/
  -     strm.next_in = job->in->buf;
  +     strm.next_in = zq->qi->in->buf;
   /*...@=sharedtrans@*/
  +     yarnRelease(zq->qi_first);
  +
        ret = inflateBack(&strm, inb, zq, outb, zq);
        if (ret != Z_STREAM_END)
            bail("corrupted input -- invalid deflate data: ", zq->ifn);
  -     job->in->len = strm.avail_in;
  +
  +     yarnPossess(zq->qi_first);
  +     zq->qi->in->len = strm.avail_in;
   /*...@-onlytrans@*/
  -     job->in->buf = strm.next_in;
  +     zq->qi->in->buf = strm.next_in;
   /*...@=onlytrans@*/
  +     yarnRelease(zq->qi_first);
  +
   /*...@-noeffect@*/
        inflateBackEnd(&strm);
   /*...@=noeffect@*/
  @@ -2289,12 +2306,14 @@
        _out_buf = _free(_out_buf);
   
   if (_debug)
  -jobDebug(" after", zq->_job);
  +jobDebug(" after", zq->qi);
   
       }
   
  +     yarnPossess(zq->qi_first);
        /* compute compressed data length */
  -     clen = zq->in_tot - job->in->len;
  +     clen = zq->in_tot - zq->qi->in->len;
  +     yarnRelease(zq->qi_first);
   
        /* read and check trailer */
        switch (zq->format) {
  @@ -2306,8 +2325,8 @@
            zh->zip_ulen = GET4();
   
            /* if crc doesn't match, try info-zip variant with sig */
  -         if (zh->zip_crc != job->check) {
  -             if (zh->zip_crc != 0x08074b50UL || zh->zip_clen != job->check)
  +         if (zh->zip_crc != zq->qo->check) {
  +             if (zh->zip_crc != 0x08074b50UL || zh->zip_clen != 
zq->qo->check)
                    bail("corrupted zip entry -- crc32 mismatch: ", zq->ifn);
                zh->zip_crc = zh->zip_clen;
                zh->zip_clen = zh->zip_ulen;
  @@ -2333,14 +2352,14 @@
            check += GET() << 16;
            check += GET() << 8;
            check += GET();
  -         if (check != job->check)
  +         if (check != zq->qo->check)
                bail("corrupted zlib stream -- adler32 mismatch: ", zq->ifn);
            break;
        case RPMZ_FORMAT_GZIP:  /* gzip trailer */
            BPULL(_b, 8, "corrupted gzip stream -- missing trailer: ");
            check = GET4();
            len = GET4();
  -         if (check != job->check)
  +         if (check != zq->qo->check)
                bail("corrupted gzip stream -- crc32 mismatch: ", zq->ifn);
            if (len != (zq->out_tot & LOW32))
                bail("corrupted gzip stream -- length mismatch: ", zq->ifn);
  @@ -2363,10 +2382,13 @@
       if (ret != -1 && (zq->format == RPMZ_FORMAT_GZIP || zq->format == 
RPMZ_FORMAT_ZLIB))
        fprintf(stderr, "%s OK, has trailing junk which was ignored\n", 
zq->ifn);
   
  -    zq->_job->in = rpmzqDropSpace(zq->_job->in);
  -    zq->_job = rpmzqDropJob(zq->_job);
  +    zq->qi->in = rpmzqDropSpace(zq->qi->in);
  +    zq->qi = rpmzqDropJob(zq->qi);
  +    zq->qo->out = rpmzqDropSpace(zq->qo->out);
  +    zq->qo = rpmzqDropJob(zq->qo);
   if (_debug)
  -jobDebug("finish", zq->_job);
  +jobDebug("finish", zq->qi);
  +
   }
   /*...@=nullstate@*/
   
  @@ -2402,7 +2424,7 @@
        /*...@globals fileSystem, internalState @*/
        /*...@modifies zq, fileSystem, internalState @*/
   {
  -    rpmzJob job = zq->_job;
  +    rpmzJob job = zq->qi;
       /* XXX LZW _out_buf scales with windowBits like inflateBack()? */
       size_t _out_len = (1 << infWBits);
       unsigned char * _out_buf = xmalloc(_out_len);
  @@ -3191,12 +3213,18 @@
   
   exit:
       /* done -- release resources, show log */
  -    if (zq->_job != NULL) {
  -     rpmzJob job = zq->_job;
  +    if (zq->qi != NULL) {
  +     rpmzJob job = zq->qi;
        job->in = rpmzqDropSpace(job->in);
  +     job = NULL;
  +     zq->qi = rpmzqDropJob(zq->qi);
  +    }
  +     
  +    if (zq->qo != NULL) {
  +     rpmzJob job = zq->qo;
        job->out = rpmzqDropSpace(job->out);
        job = NULL;
  -     zq->_job = rpmzqDropJob(zq->_job);
  +     zq->qo = rpmzqDropJob(zq->qo);
       }
        
       if (zq->load_ipool != NULL) {
  @@ -3211,8 +3239,8 @@
        zq->load_opool = rpmzqFreePool(zq->load_opool, &caught);
        Trace((zlog, "-- freed %d output buffers", caught));
       }
  -    if (zq->read_first != NULL)
  -     zq->read_first = yarnFreeLock(zq->read_first);
  +    if (zq->qi_first != NULL)
  +     zq->qi_first = yarnFreeLock(zq->qi_first);
       zq->_zh = _free(zq->_zh);
   
       rpmzNewOpts(zq);
  @@ .
  patch -p0 <<'@@ .'
  Index: rpm/rpmio/rpmzq.c
  ============================================================================
  $ cvs diff -u -r1.16 -r1.17 rpmzq.c
  --- rpm/rpmio/rpmzq.c 12 Mar 2009 17:07:32 -0000      1.16
  +++ rpm/rpmio/rpmzq.c 13 Mar 2009 16:29:29 -0000      1.17
  @@ -502,13 +502,15 @@
       return job;
   }
   
  -void rpmzqUseJob(rpmzJob job)
  +rpmzJob rpmzqUseJob(rpmzJob job)
   {
       int use;
  +    if (job == NULL) return NULL;
       yarnPossess(job->use);
       use = yarnPeekLock(job->use);
   zqFprintf(stderr, "    ++ job %p[%ld] use %d\n", job, job->seq, use+1);
       yarnTwist(job->use, BY, 1);
  +    return job;
   }
   
   /* drop a job, returning it to the pool if the use count is zero */
  @@ -745,14 +747,14 @@
       rpmzJob job;
   
       /* get next read job in order */
  -    yarnPossess(zq->read_first);
  -    yarnWaitFor(zq->read_first, TO_BE, seq);
  -    job = zq->read_head;
  +    yarnPossess(zq->qi_first);
  +    yarnWaitFor(zq->qi_first, TO_BE, seq);
  +    job = zq->qi;
   assert(job != NULL);
   /*...@-assignexpose -dependenttr...@*/
  -    zq->read_head = job->next;
  +    zq->qi = job->next;
   /*...@=assignexpose =dependenttr...@*/
  -    yarnTwist(zq->read_first, TO, zq->read_head == NULL ? -1 : 
zq->read_head->seq);
  +    yarnTwist(zq->qi_first, TO, zq->qi == NULL ? -1 : zq->qi->seq);
       return job;
   }
   
  @@ -761,10 +763,10 @@
       rpmzJob here;            /* pointers for inserting in read list */
       rpmzJob * prior;         /* pointers for inserting in read list */
   
  -    yarnPossess(zq->read_first);
  +    yarnPossess(zq->qi_first);
   
       /* insert read job in list in sorted order, alert read thread */
  -    prior = &zq->read_head;
  +    prior = &zq->qi;
       while ((here = *prior) != NULL) {
        if (here->seq > job->seq)
            break;
  @@ -775,7 +777,7 @@
   /*...@=onlytrans@*/
       *prior = job;
   
  -    yarnTwist(zq->read_first, TO, zq->read_head->seq);
  +    yarnTwist(zq->qi_first, TO, zq->qi->seq);
   }
   
   static rpmzJob rpmzqFillOut(rpmzQueue zq, /*...@returned@*/rpmzJob job, 
rpmbz bz)
  @@ .
  patch -p0 <<'@@ .'
  Index: rpm/rpmio/rpmzq.h
  ============================================================================
  $ cvs diff -u -r1.24 -r1.25 rpmzq.h
  --- rpm/rpmio/rpmzq.h 12 Mar 2009 17:07:32 -0000      1.24
  +++ rpm/rpmio/rpmzq.h 13 Mar 2009 16:29:29 -0000      1.25
  @@ -252,16 +252,18 @@
   #endif
   /*...@only@*/ /*...@relnull@*/
       rpmzh _zh;                       /*!< compressed file header info 
(malloc'd). */
  -/*...@owned@*/ /*...@relnull@*/
  -    rpmzJob _job;            /*!< decompress job (malloc'd). */
   
   /* --- globals for decompression and listing buffered reading */
       int _in_which;           /*!< -1: start */
   
   /*...@only@*/ /*...@null@*/
  -    yarnLock read_first;     /*!< lowest sequence number in list */
  +    yarnLock qi_first;               /*!< lowest sequence number in list */
   /*...@null@*/
  -    rpmzJob read_head;               /*!< list of read jobs */
  +    rpmzJob qi;                      /*!< list of decompress input jobs */
  +/*...@only@*/ /*...@null@*/
  +    yarnLock qo_first;               /*!< lowest sequence number in list */
  +/*...@null@*/
  +    rpmzJob qo;                      /*!< list of decompress output jobs */
   
   #define IN_BUF_ALLOCATED 32768U      /* input buffer size */
       size_t _in_buf_allocated;
  @@ -349,14 +351,15 @@
   
   /**
    */
  -void rpmzqUseJob(rpmzJob job)
  +/*...@newref@*/ /*...@null@*/
  +rpmzJob rpmzqUseJob(/*...@null@*/ rpmzJob job)
        /*...@globals fileSystem, internalState @*/
        /*...@modifies job, fileSystem, internalState @*/;
   
   /**
    */
   /*...@null@*/
  -rpmzJob rpmzqDropJob(/*...@only@*/ /*...@null@*/ rpmzJob job)
  +rpmzJob rpmzqDropJob(/*...@killref@*/ /*...@null@*/ rpmzJob job)
        /*...@globals fileSystem, internalState @*/
        /*...@modifies job, fileSystem, internalState @*/;
   
  @@ .
______________________________________________________________________
RPM Package Manager                                    http://rpm5.org
CVS Sources Repository                                rpm-cvs@rpm5.org

Reply via email to