RPM Package Manager, CVS Repository http://rpm5.org/cvs/ ____________________________________________________________________________
Server: rpm5.org Name: Jeff Johnson Root: /v/rpm/cvs Email: [email protected] Module: rpm Date: 29-May-2017 22:44:36 Branch: rpm-5_4 Handle: 2017052920443600 Added files: (Branch: rpm-5_4) rpm/rpmio aiocat.c Log: - create. Summary: Revision Changes Path 1.1.2.1 +770 -0 rpm/rpmio/aiocat.c ____________________________________________________________________________ patch -p0 <<'@@ .' Index: rpm/rpmio/aiocat.c ============================================================================ $ cvs diff -u -r0 -r1.1.2.1 aiocat.c --- /dev/null 2017-05-29 22:44:00.000000000 +0200 +++ aiocat.c 2017-05-29 22:44:36.448539451 +0200 @@ -0,0 +1,770 @@ +/* ============================================================================ +|| aiocat.c : This highly artificial example demonstrates asynchronous I/O. +|| +|| The command syntax is: +|| aiocat [ -o outfile ] [-a {0|1|2|3} ] infilename... +|| +|| The output file is given by -o, with $TMPDIR/aiocat.out by default. +|| The aio method of waiting for completion is given by -a as follows: +|| -a 0 poll for completion with aio_error() (default) +|| -a 1 wait for completion with aio_suspend() +|| -a 2 wait on a semaphore posted from a signal handler +|| -a 3 wait on a semaphore posted from a callback routine +|| +|| Up to MAX_INFILES input files may be specified. Each input file is +|| read in BLOCKSIZE units. The output file contains the data from +|| the input files in the order they were specified. Thus the +|| output should be the same as "cat infilename... >outfile". +|| +|| When DO_SPROCS is compiled true, all I/O is done asynchronously +|| and concurrently using one sproc'd process per file. Thus in a +|| multiprocessor concurrent input can be done. 
+============================================================================ */ +#include "../config.h" +#define _SGI_MP_SOURCE /* see the "Caveats" section of sproc(2) */ +#include <sys/time.h> /* for clock() */ +#include <errno.h> /* for perror() */ +#include <stdio.h> /* for printf() */ +#include <stdlib.h> /* for getenv(), malloc(3c) */ +#include <ulocks.h> /* usinit() & friends */ +#include <bstring.h> /* for bzero() */ +#include <sys/resource.h> /* for prctl, get/setrlimit() */ +#include <sys/prctl.h> /* for prctl() */ +#include <sys/types.h> /* required by lseek(), prctl */ +#include <unistd.h> /* ditto */ +#include <sys/types.h> /* wanted by sproc() */ +#include <sys/prctl.h> /* ditto */ +#include <signal.h> /* for signals - gets sys/signal and sys/siginfo */ +#include <aio.h> /* async I/O */ +#define BLOCKSIZE 2048 /* input units -- play with this number */ +#define MAX_INFILES 10 /* max sprocs: anything from 4 to 20 or so */ +#define DO_SPROCS 1 /* set 0 to do all I/O in a single process */ +#define QUITIFNULL(PTR,MSG) if (NULL==PTR) {perror(MSG);return(errno);} +#define QUITIFMONE(INT,MSG) if (-1==INT) {perror(MSG);return(errno);} +/***************************************************************************** +|| The following structure contains the info needed by one child proc. +|| The main program builds an array of MAX_INFILES of these. +|| The reason for storing the actual filename here (not a pointer) is +|| to force the struct to >128 bytes. Then, when the procs run in +|| different CPUs on a CHALLENGE, the info structs will be in different +|| cache lines, and a store by one proc will not invalidate a cache line +|| for its neighbor proc. 
+*/ +typedef struct child +{ + /* read-only to child */ + char fname[100]; /* input filename from argv[n] */ + int fd; /* FD for this file */ + void* buffer; /* buffer for this file */ + int procid; /* process ID of child process */ + off_t fsize; /* size of this input file */ + /* read-write to child */ + usema_t* sema; /* semaphore used by methods 2 & 3 */ + off_t outbase; /* starting offset in output file */ + off_t inbase; /* current offset in input file */ + clock_t etime; /* sum of utime/stime to read file */ + aiocb_t acb; /* aiocb used for reading and writing */ +} child_t; +/****************************************************************************** +|| Globals, accessible to all processes +*/ +char* ofName = NULL; /* output file name string */ +int outFD; /* output file descriptor */ +usptr_t* arena; /* arena where everything is built */ +barrier_t* convene; /* barrier used to sync up */ +int nprocs = 1; /* 1 + number of child procs */ +child_t* array; /* array of child_t structs in arena */ +int errors = 0; /* always incremented on an error */ +/****************************************************************************** +|| forward declaration of the child process functions +*/ +void inProc0(void *arg, size_t stk); /* polls with aio_error() */ +void inProc1(void *arg, size_t stk); /* uses aio_suspend() */ +void inProc2(void *arg, size_t stk); /* uses a signal and semaphore */ +void inProc3(void *arg, size_t stk); /* uses a callback and semaphore */ +/****************************************************************************** +// The main() +*/ +int main(int argc, char **argv) +{ + char* tmpdir; /* ->name string of temp dir */ + int nfiles; /* how many input files on cmd line */ + int argno; /* loop counter */ + child_t* pc; /* ->child_t of current file */ + void (*method)(void *,size_t) = inProc0; /* ->chosen input method */ + char arenaPath[128]; /* build area for arena pathname */ + char outPath[128]; /* build area for output pathname */ + /* 
+ || Ensure the name of a temporary directory. + */ + tmpdir = getenv("TMPDIR"); + if (!tmpdir) tmpdir = "/var/tmp"; + /* + || Build a name for the arena file. + */ + strcpy(arenaPath,tmpdir); + strcat(arenaPath,"/aiocat.wrk"); + /* + || Create the arena. First, call usconfig() to establish the + || minimum size (twice the buffer size per file, to allow for misc usage) + || and the (maximum) number of processes that may later use + || this arena. For this program that is MAX_INFILES+10, allowing + || for our sprocs plus those done by aio_sgi_init(). + || These values apply to any arenas made subsequently, until changed. + */ + { + ptrdiff_t ret; + ret = usconfig(CONF_INITSIZE,2*BLOCKSIZE*MAX_INFILES); + QUITIFMONE(ret,"usconfig size") + ret = usconfig(CONF_INITUSERS,MAX_INFILES+10); + QUITIFMONE(ret,"usconfig users") + arena = usinit(arenaPath); + QUITIFNULL(arena,"usinit") + } + /* + || Allocate the barrier. + */ + convene = new_barrier(arena); + QUITIFNULL(convene,"new_barrier") + /* + || Allocate the array of child info structs and zero it. + */ + array = (child_t*)usmalloc(MAX_INFILES*sizeof(child_t),arena); + QUITIFNULL(array,"usmalloc") + bzero((void *)array,MAX_INFILES*sizeof(child_t)); + /* + || Loop over the arguments, setting up child structs and + || counting input files. Quit if a file won't open or seek, + || or if we can't get a buffer or semaphore. 
+ */ + for (nfiles=0, argno=1; argno < argc; ++argno ) + { + if (0 == strcmp(argv[argno],"-o")) + { /* is the -o argument */ + ++argno; + if (argno < argc) + ofName = argv[argno]; + else + { + fprintf(stderr,"-o must have a filename after\n"); + return -1; + } + } + else if (0 == strcmp(argv[argno],"-a")) + { /* is the -a argument */ + char c = argv[++argno][0]; + switch(c) + { + case '0' : method = inProc0; break; + case '1' : method = inProc1; break; + case '2' : method = inProc2; break; + case '3' : method = inProc3; break; + default: + { + fprintf(stderr,"unknown method -a %c\n",c); + return -1; + } + } + } + else if ('-' == argv[argno][0]) + { /* is unknown -option */ + fprintf(stderr,"aiocat [-o outfile] [-a 0|1|2|3] infiles...\n"); + return -1; + } + else + { /* neither -o nor -a, assume input file */ + if (nfiles < MAX_INFILES) + { + /* + || save the filename + */ + pc = &array[nfiles]; + strcpy(pc->fname,argv[argno]); + /* + || allocate a buffer and a semaphore. Not all + || child procs use the semaphore but so what? + */ + pc->buffer = usmalloc(BLOCKSIZE,arena); + QUITIFNULL(pc->buffer,"usmalloc(buffer)") + pc->sema = usnewsema(arena,0); + QUITIFNULL(pc->sema,"usnewsema") + /* + || open the file + */ + pc->fd = open(pc->fname,O_RDONLY); + QUITIFMONE(pc->fd,"open") + /* + || get the size of the file. This leaves the file + || positioned at-end, but there is no need to reposition + || because all aio_read calls have an implied lseek. + || NOTE: there is no check for zero-length file; that + || is a valid (and interesting) test case. + */ + pc->fsize = lseek(pc->fd,0,SEEK_END); + QUITIFMONE(pc->fsize,"lseek") + /* + || set the starting base address of this input file + || in the output file. The first file starts at 0. + || Each one after starts at prior base + prior size. 
+ */ + if (nfiles) /* not first */ + pc->outbase = + array[nfiles-1].fsize + array[nfiles-1].outbase; + ++nfiles; + } + else + { + printf("Too many files, %s ignored\n",argv[argno]); + } + } + } /* end for(argc) */ + /* + || If there was no -o argument, construct an output file name. + */ + if (!ofName) + { + strcpy(outPath,tmpdir); + strcat(outPath,"/aiocat.out"); + ofName = outPath; + } + /* + || Open, creating or truncating, the output file. + || Do not use O_APPEND, which would constrain aio to doing + || operations in sequence. + */ + outFD = open(ofName, O_WRONLY+O_CREAT+O_TRUNC,0666); + QUITIFMONE(outFD,"open(output)") + /* + || If there were no input files, just quit, leaving empty output + */ + if (!nfiles) + { + return 0; + } + /* + || Note the number of processes-to-be, for use in initializing + || aio and for use by each child in a barrier() call. + */ + nprocs = 1+nfiles; + /* + || Initialize async I/O using aio_sgi_init(), in order to specify + || a number of locks at least equal to the number of child procs + || and in order to specify extra sproc users. + */ + { + aioinit_t ainit = {0}; /* all fields initially zero */ + /* + || Go with the default 5 for the number of aio-created procs, + || as we have no way of knowing the number of unique devices. + */ +#define AIO_PROCS 5 + ainit.aio_threads = AIO_PROCS; + /* + || Set the number of locks aio needs to the number of procs + || we will start, minimum 3. + */ + ainit.aio_locks = (nprocs > 2)?nprocs:3; + /* + || Warn aio of the number of user procs that will be + || using its arena. + */ + ainit.aio_numusers = nprocs; + aio_sgi_init(&ainit); + } + /* + || Process each input file, either in a child process or in + || a subroutine call, as specified by the DO_SPROCS variable. + */ + for (argno = 0; argno < nfiles; ++argno) + { + pc = &array[argno]; +#if DO_SPROCS +#define CHILD_STACK 64*1024 + /* + || For each input file, start a child process as an instance + || of the selected method (-a argument). 
+ || If an error occurs, quit. That will send a SIGHUP to any + || already-started child, which will kill it, too. + */ + pc->procid = sprocsp(method /* function to start */ + ,PR_SALL /* share all, keep FDs sync'd */ + ,(void *)pc /* argument to child func */ + ,NULL /* absolute stack seg */ + ,CHILD_STACK); /* max stack seg growth */ + QUITIFMONE(pc->procid,"sproc") +#else + /* + || For each input file, call the selected (-a) method as a + || subroutine to copy its file. + */ + fprintf(stderr,"file %s...",pc->fname); + method((void*)pc,0); + if (errors) break; + fprintf(stderr,"done\n"); +#endif + } +#if DO_SPROCS + /* + || Wait for all the kiddies to get themselves initialized. + || When all have started and reached barrier(), all continue. + || If any errors occurred in initialization, quit. + */ + barrier(convene,nprocs); + /* + || Child processes are executing now. Reunite the family round the + || old hearth one last time, when their processing is complete. + || Each child ensures that all its output is complete before it + || invokes barrier(). + */ + barrier(convene,nprocs); +#endif + /* + || Close the output file and print some statistics. + */ + close(outFD); + { + clock_t timesum; + long bytesum; + double bperus; + printf(" procid time fsize filename\n"); + for(argno = 0, timesum = bytesum = 0 ; argno < nfiles ; ++argno) + { + pc = &array[argno]; + timesum += pc->etime; + bytesum += pc->fsize; + printf("%2d: %-8d %-8d %-8d %s\n" + ,argno,pc->procid,pc->etime,pc->fsize,pc->fname); + } + bperus = ((double)bytesum)/((double)timesum); + printf("total time %d usec, total bytes %d, %g bytes/usec\n" + ,timesum , bytesum , bperus); + } + /* + || Unlink the arena file, so it won't exist when this program runs + || again. If it did exist, it would be used as the initial state of + || the arena, which might or might not have any effect. 
+ */ + unlink(arenaPath); + return 0; +} +/****************************************************************************** +|| inProc0() alternates polling with aio_error() with sginap(). Under +|| the Frame Scheduler, it would use frs_yield() instead of sginap(). +|| The general pattern of this function is repeated in the other three; +|| only the wait method varies from function to function. +*/ +int inWait0(child_t *pch) +{ + int ret; + aiocb_t* pab = &pch->acb; + while (EINPROGRESS == (ret = aio_error(pab))) + { + sginap(0); + } + return ret; +} +void inProc0(void *arg, size_t stk) +{ + child_t *pch = arg; /* starting arg is ->child_t for my file */ + aiocb_t *pab = &pch->acb; /* base address of the aiocb_t in child_t */ + int ret; /* as long as this is 0, all is ok */ + int bytes; /* #bytes read on each input */ + /* + || Initialize -- no signals or callbacks needed. + */ + pab->aio_sigevent.sigev_notify = SIGEV_NONE; + pab->aio_buf = pch->buffer; /* always the same */ +#if DO_SPROCS + /* + || Wait for the starting gun... + */ + barrier(convene,nprocs); +#endif + pch->etime = clock(); + do /* read and write, read and write... */ + { + /* + || Set up the aiocb for a read, queue it, and wait for it. + */ + pab->aio_fildes = pch->fd; + pab->aio_offset = pch->inbase; + pab->aio_nbytes = BLOCKSIZE; + if (ret = aio_read(pab)) + break; /* unable to schedule a read */ + ret = inWait0(pch); + if (ret) + break; /* nonzero read completion status */ + /* + || get the result of the read() call, the count of bytes read. + || Since aio_error returned 0, the count is nonnegative. + || It could be 0, or less than BLOCKSIZE, indicating EOF. + */ + bytes = aio_return(pab); /* actual read result */ + if (!bytes) + break; /* no need to write a last block of 0 */ + pch->inbase += bytes; /* where to read next time */ + /* + || Set up the aiocb for a write, queue it, and wait for it. 
+ */ + pab->aio_fildes = outFD; + pab->aio_nbytes = bytes; + pab->aio_offset = pch->outbase; + if (ret = aio_write(pab)) + break; + ret = inWait0(pch); + if (ret) + break; + pch->outbase += bytes; /* where to write next time */ + } while ((!ret) && (bytes == BLOCKSIZE)); + /* + || The loop is complete. If no errors so far, use aio_fsync() + || to ensure that output is complete. This requires waiting + || yet again. + */ + if (!ret) + { + if (!(ret = aio_fsync(O_SYNC,pab))) + ret = inWait0(pch); + } + /* + || Flag any errors for the parent proc. If none, count elapsed time. + */ + if (ret) ++errors; + else pch->etime = (clock() - pch->etime); +#if DO_SPROCS + /* + || Rendezvous with the rest of the family, then quit. + */ + barrier(convene,nprocs); +#endif + return; +} /* end inProc0 */ +/****************************************************************************** +|| inProc1 uses aio_suspend() to await the completion of each operation. +|| Otherwise it is the same as inProc0, above. +*/ + +int inWait1(child_t *pch) +{ + int ret; + aiocb_t* susplist[1]; /* list of 1 aiocb for aio_suspend() */ + susplist[0] = &pch->acb; + /* + || Note: aio.h declares the 1st argument of aio_suspend() as "const." + || The C compiler requires the actual-parameter to match in type, + || so the list we pass must either be declared "const aiocb_t*" or + || must be cast to that -- else cc gives a warning. The cast + || in the following statement is only to avoid this warning. + */ + ret = aio_suspend( (const aiocb_t **) susplist,1,NULL); + return ret; +} +void inProc1(void *arg, size_t stk) +{ + child_t *pch = arg; /* starting arg is ->child_t for my file */ + aiocb_t *pab = &pch->acb; /* base address of the aiocb_t in child_t */ + int ret; /* as long as this is 0, all is ok */ + int bytes; /* #bytes read on each input */ + /* + || Initialize -- no signals or callbacks needed. 
+ */ + pab->aio_sigevent.sigev_notify = SIGEV_NONE; + pab->aio_buf = pch->buffer; /* always the same */ +#if DO_SPROCS + /* + || Wait for the starting gun... + */ + barrier(convene,nprocs); +#endif + pch->etime = clock(); + do /* read and write, read and write... */ + { + /* + || Set up the aiocb for a read, queue it, and wait for it. + */ + pab->aio_fildes = pch->fd; + pab->aio_offset = pch->inbase; + pab->aio_nbytes = BLOCKSIZE; + if (ret = aio_read(pab)) + break; + ret = inWait1(pch); + /* + || If the aio_suspend() return is nonzero, it means that the wait + || did not end for i/o completion but because of a signal. Since we + || expect no signals here, we take that as an error. + */ + if (!ret) /* op is complete */ + ret = aio_error(pab); /* read() status, should be 0 */ + if (ret) + break; /* signal, or nonzero read completion */ + /* + || get the result of the read() call, the count of bytes read. + || Since aio_error returned 0, the count is nonnegative. + || It could be 0, or less than BLOCKSIZE, indicating EOF. + */ + bytes = aio_return(pab); /* actual read result */ + if (!bytes) + break; /* no need to write a last block of 0 */ + pch->inbase += bytes; /* where to read next time */ + /* + || Set up the aiocb for a write, queue it, and wait for it. + */ + pab->aio_fildes = outFD; + pab->aio_nbytes = bytes; + pab->aio_offset = pch->outbase; + if (ret = aio_write(pab)) + break; + ret = inWait1(pch); + if (!ret) /* op is complete */ + ret = aio_error(pab); /* should be 0 */ + if (ret) + break; + pch->outbase += bytes; /* where to write next time */ + } while ((!ret) && (bytes == BLOCKSIZE)); + /* + || The loop is complete. If no errors so far, use aio_fsync() + || to ensure that output is complete. This requires waiting + || yet again. + */ + if (!ret) + { + if (!(ret = aio_fsync(O_SYNC,pab))) + ret = inWait1(pch); + } + /* + || Flag any errors for the parent proc. If none, count elapsed time. 
+ */ + if (ret) ++errors; + else pch->etime = (clock() - pch->etime); +#if DO_SPROCS + /* + || Rendezvous with the rest of the family, then quit. + */ + barrier(convene,nprocs); +#endif +} /* end inProc1 */ +/****************************************************************************** +|| inProc2 requests a signal upon completion of an I/O. After starting +|| an operation, it P's a semaphore which is V'd from the signal handler. +*/ +#define AIO_SIGNUM SIGRTMIN+1 /* arbitrary choice of signal number */ +void sigHandler2(const int signo, const struct siginfo *sif ) +{ + /* + || In this minimal signal handler we pick up the address of the + || child_t info structure -- which was put in aio_sigevent.sigev_value + || field during initialization -- and use it to find the semaphore. + */ + child_t *pch = sif->si_value.sival_ptr ; + usvsema(pch->sema); + return; /* stop here with dbx to print the above address */ +} +int inWait2(child_t *pch) +{ + /* + || Wait for any signal handler to post the semaphore. The signal + || handler could have been entered before this function is called, + || or it could be entered afterward. + */ + uspsema(pch->sema); + /* + || Since this process executes only one aio operation at a time, + || we can return the status of that operation. In a more complicated + || design, if a signal could arrive from more than one pending + || operation, this function could not return status. + */ + return aio_error(&pch->acb); +} +void inProc2(void *arg, size_t stk) +{ + child_t *pch = arg; /* starting arg is ->child_t for my file */ + aiocb_t *pab = &pch->acb; /* base address of the aiocb_t in child_t */ + int ret; /* as long as this is 0, all is ok */ + int bytes; /* #bytes read on each input */ + /* + || Initialize -- request a signal in aio_sigevent. The address of + || the child_t struct is passed as the siginfo value, for use + || in the signal handler. 
+ */ + pab->aio_sigevent.sigev_notify = SIGEV_SIGNAL; + pab->aio_sigevent.sigev_signo = AIO_SIGNUM; + pab->aio_sigevent.sigev_value.sival_ptr = (void *)pch; + pab->aio_buf = pch->buffer; /* always the same */ + /* + || Initialize -- set up a signal handler for AIO_SIGNUM. + */ + { + struct sigaction sa = {SA_SIGINFO,sigHandler2}; + ret = sigaction(AIO_SIGNUM,&sa,NULL); + if (ret) ++errors; /* parent will shut down ASAP */ + } +#if DO_SPROCS + /* + || Wait for the starting gun... + */ + barrier(convene,nprocs); +#else + if (ret) return; +#endif + pch->etime = clock(); + do /* read and write, read and write... */ + { + /* + || Set up the aiocb for a read, queue it, and wait for it. + */ + pab->aio_fildes = pch->fd; + pab->aio_offset = pch->inbase; + pab->aio_nbytes = BLOCKSIZE; + if (!(ret = aio_read(pab))) + ret = inWait2(pch); + if (ret) + break; /* could not start read, or it ended badly */ + /* + || get the result of the read() call, the count of bytes read. + || Since aio_error returned 0, the count is nonnegative. + || It could be 0, or less than BLOCKSIZE, indicating EOF. + */ + bytes = aio_return(pab); /* actual read result */ + if (!bytes) + break; /* no need to write a last block of 0 */ + pch->inbase += bytes; /* where to read next time */ + /* + || Set up the aiocb for a write, queue it, and wait for it. + */ + pab->aio_fildes = outFD; + pab->aio_nbytes = bytes; + pab->aio_offset = pch->outbase; + if (!(ret = aio_write(pab))) + ret = inWait2(pch); + if (ret) + break; + pch->outbase += bytes; /* where to write next time */ + } while ((!ret) && (bytes == BLOCKSIZE)); + /* + || The loop is complete. If no errors so far, use aio_fsync() + || to ensure that output is complete. This requires waiting + || yet again. + */ + if (!ret) + { + if (!(ret = aio_fsync(O_SYNC,pab))) + ret = inWait2(pch); + } + /* + || Flag any errors for the parent proc. If none, count elapsed time. 
+ */ + if (ret) ++errors; + else pch->etime = (clock() - pch->etime); +#if DO_SPROCS + /* + || Rendezvous with the rest of the family, then quit. + */ + barrier(convene,nprocs); +#endif +} /* end inProc2 */ + +/****************************************************************************** +|| inProc3 uses a callback and a semaphore. It waits with a P operation. +|| The callback function executes a V operation. This may come before or +|| after the P operation. +*/ +void callBack3(union sigval usv) +{ + /* + || The callback function receives the pointer to the child_t struct, + || as prepared in aio_sigevent.sigev_value.sival_ptr. Use this to + || post the semaphore in the child_t struct. + */ + child_t *pch = usv.sival_ptr; + usvsema(pch->sema); + return; +} +int inWait3(child_t *pch) +{ + /* + || Suspend, if necessary, by polling the semaphore. The callback + || function might be entered before we reach this point, or after. + */ + uspsema(pch->sema); + /* + || Return the status of the aio operation associated with the sema. + */ + return aio_error(&pch->acb); +} +void inProc3(void *arg, size_t stk) +{ + child_t *pch = arg; /* starting arg is ->child_t for my file */ + aiocb_t *pab = &pch->acb; /* base address of the aiocb_t in child_t */ + int ret; /* as long as this is 0, all is ok */ + int bytes; /* #bytes read on each input */ + /* + || Initialize -- request a callback in aio_sigevent. The address of + || the child_t struct is passed as the siginfo value to be passed + || into the callback. + */ + pab->aio_sigevent.sigev_notify = SIGEV_CALLBACK; + pab->aio_sigevent.sigev_func = callBack3; + pab->aio_sigevent.sigev_value.sival_ptr = (void *)pch; + pab->aio_buf = pch->buffer; /* always the same */ +#if DO_SPROCS + /* + || Wait for the starting gun... + */ + barrier(convene,nprocs); +#endif + pch->etime = clock(); + do /* read and write, read and write... */ + { + /* + || Set up the aiocb for a read, queue it, and wait for it. 
+ */ + pab->aio_fildes = pch->fd; + pab->aio_offset = pch->inbase; + pab->aio_nbytes = BLOCKSIZE; + if (!(ret = aio_read(pab))) + ret = inWait3(pch); + if (ret) + break; /* read error */ + /* + || get the result of the read() call, the count of bytes read. + || Since aio_error returned 0, the count is nonnegative. + || It could be 0, or less than BLOCKSIZE, indicating EOF. + */ + bytes = aio_return(pab); /* actual read result */ + if (!bytes) + break; /* no need to write a last block of 0 */ + pch->inbase += bytes; /* where to read next time */ + /* + || Set up the aiocb for a write, queue it, and wait for it. + */ + pab->aio_fildes = outFD; + pab->aio_nbytes = bytes; + pab->aio_offset = pch->outbase; + if (!(ret = aio_write(pab))) + ret = inWait3(pch); + if (ret) + break; + pch->outbase += bytes; /* where to write next time */ + } while ((!ret) && (bytes == BLOCKSIZE)); + /* + || The loop is complete. If no errors so far, use aio_fsync() + || to ensure that output is complete. This requires waiting + || yet again. + */ + if (!ret) + { + if (!(ret = aio_fsync(O_SYNC,pab))) + ret = inWait3(pch); + } + /* + || Flag any errors for the parent proc. If none, count elapsed time. + */ + if (ret) ++errors; + else pch->etime = (clock() - pch->etime); +#if DO_SPROCS + /* + || Rendezvous with the rest of the family, then quit. + */ + barrier(convene,nprocs); +#endif +} /* end inProc3 */ @@ . ______________________________________________________________________ RPM Package Manager http://rpm5.org CVS Sources Repository [email protected]
