> Date: Thu, 29 May 2014 18:00:28 -0300
> Subject: Re: [HACKERS] Extended Prefetching using Asynchronous IO - proposal 
> and patch
> From: klaussfre...@gmail.com
> To: hlinnakan...@vmware.com
> CC: johnlu...@hotmail.com; pgsql-hackers@postgresql.org
> 
> On Thu, May 29, 2014 at 5:39 PM, Heikki Linnakangas
> <hlinnakan...@vmware.com> wrote:
> > On 05/29/2014 11:34 PM, Claudio Freire wrote:
> >>
> >> On Thu, May 29, 2014 at 5:23 PM, Heikki Linnakangas
> >> <hlinnakan...@vmware.com> wrote:
> >>>
> >>> On 05/29/2014 04:12 PM, John Lumby wrote:
> >>>>
> >>>>
> >>>>> On 05/28/2014 11:52 PM, John Lumby wrote:
> >>>>>
> >>>>> The patch seems to assume that you can put the aiocb struct in shared
> >>>>> memory, initiate an asynchronous I/O request from one process, and wait
> >>>>> for its completion from another process. I'm pretty surprised if that
> >>>>> works on any platform.
> >>>>
> >>>>
> >>>> It works on linux.  Actually this ability allows the asyncio
> >>>> implementation to reduce complexity in one respect (yes, I know it
> >>>> looks complex enough): it makes waiting for completion of an
> >>>> in-progress IO simpler than for the existing synchronous IO case,
> >>>> since librt takes care of the waiting.  Specifically, there is no
> >>>> need for extra wait-for-io control blocks such as in bufmgr's WaitIO().
> >>>
> >>>
> >>> [checks]. No, it doesn't work. See attached test program.

Thanks for checking, and thanks for coming up with that test program.
However, yes, it really does work, always (on Linux).
Your test program is doing things in the wrong order:
it calls aio_suspend *before* aio_error.
The rule is: call aio_suspend *after* aio_error,
and *only* if aio_error returns EINPROGRESS.

See the changes to function FileCompleteaio() in fd.c
for how we have done it.  I am also attaching a corrected version
of your test program, which runs just fine.
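A minimal single-process sketch of that rule (check aio_error() first, and call aio_suspend() only while it reports EINPROGRESS) is below.  wait_for_aio() and write_and_wait() are hypothetical helpers written for illustration, not functions from fd.c, and the sketch deliberately does not show the cross-process case under discussion.

```c
#include <aio.h>
#include <errno.h>
#include <fcntl.h>
#include <signal.h>
#include <string.h>
#include <sys/types.h>
#include <time.h>
#include <unistd.h>

/*
 * Hypothetical helper: wait for an AIO request that THIS process started.
 * aio_error() is checked first; aio_suspend() is called only while the
 * request is still EINPROGRESS.  Returns the aio_return() value.
 */
ssize_t
wait_for_aio(struct aiocb *cb)
{
    const struct aiocb *const list[1] = { cb };
    int         rv;

    while ((rv = aio_error(cb)) == EINPROGRESS)
    {
        /* 10 us, the same timeout the test program uses */
        struct timespec timeout = { 0, 10000 };

        /* A timeout (EAGAIN) or EINTR is harmless: the loop re-checks aio_error(). */
        (void) aio_suspend(list, 1, &timeout);
    }
    if (rv == -1)
        return -1;              /* aio_error() itself failed, e.g. EINVAL */

    /* Finished, successfully or not: collect the status exactly once. */
    ssize_t     n = aio_return(cb);

    if (rv != 0)
        errno = rv;             /* a nonzero aio_error() value is the request's errno */
    return n;
}

/* Hypothetical demo: write `text` at offset 0 of `path`, then wait for it. */
ssize_t
write_and_wait(const char *path, const char *text)
{
    struct aiocb cb;
    ssize_t     n;
    int         fd = open(path, O_CREAT | O_WRONLY | O_TRUNC, 0600);

    if (fd == -1)
        return -1;

    memset(&cb, 0, sizeof(cb));
    cb.aio_fildes = fd;
    cb.aio_offset = 0;
    cb.aio_buf = (void *) text;
    cb.aio_nbytes = strlen(text);
    cb.aio_sigevent.sigev_notify = SIGEV_NONE;

    n = (aio_write(&cb) == 0) ? wait_for_aio(&cb) : -1;
    close(fd);
    return n;
}
```

Because aio_error() is re-checked after every aio_suspend() return, a timeout, a signal, or a spurious wakeup costs only one extra loop iteration.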


> >>>
> >>> It kinda seems to work sometimes, because of the way it's implemented in
> >>> glibc. The aiocb struct has a field for the result value and errno, and
> >>> when the I/O is finished, the worker thread fills them in. aio_error() and
> >>> aio_return() just return the values of those fields, so calling aio_error()
> >>> or aio_return() do in fact happen to work from a different process.
> >>> aio_suspend(), however, is implemented by sleeping on a process-local
> >>> mutex, which does not work from a different process.
> >>>
> >>> Even if it worked on Linux today, it would be a bad idea to rely on it from
> >>> a portability point of view. No, the only sane way to make this work is
> >>> that the process that initiates an I/O request is responsible for
> >>> completing it. If another process needs to wait for an async I/O to
> >>> complete, we must use some other means to do the waiting. Like the
> >>> io_in_progress_lock that we already have, for the same purpose.
> >>
> >>
> >> But calls to it are timeouted by 10us, effectively turning the thing
> >> into polling mode.
> >
> >
> > We don't want polling... And even if we did, calling aio_suspend() in a way
> > that's known to be broken, in a loop, is a pretty crappy way of polling.

Well, as mentioned earlier, it is not broken.  Whether it is efficient I am
not sure.  I have looked at the mutex in aio_suspend that you mentioned, and
I am not quite convinced that, if the caller is not the original aio_read
process, it turns the suspend() into an instant timeout.  I will see if I can
verify that.
Where are you (Claudio) seeing 10us?

> 
> 
> Didn't fix that, but the attached patch does fix regression tests when
> scanning over index types other than btree (was invoking elog when the
> index am didn't have ampeeknexttuple)
                                          
/*
 * Test program to test if POSIX aio functions work across processes
 */

#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <fcntl.h>
#include <aio.h>
#include <errno.h>

char *shmem;

void
processA(void)
{
        int fd;
        struct aiocb *aiocbp = (struct aiocb *) shmem;
        char *buf = shmem + sizeof(struct aiocb);

        fd = open("aio-shmem-test-file", O_CREAT | O_WRONLY | O_SYNC, S_IRWXU);
        if (fd == -1)
        {
                fprintf(stderr, "open() failed\n");
                exit(1);
        }
        printf("processA starting AIO\n");

        strcpy(buf, "foobar");

        memset(aiocbp, 0, sizeof(struct aiocb));
        aiocbp->aio_fildes = fd;
        aiocbp->aio_offset = 0;
        aiocbp->aio_buf = buf;
        aiocbp->aio_nbytes = strlen(buf);
        aiocbp->aio_reqprio = 0;
        aiocbp->aio_sigevent.sigev_notify = SIGEV_NONE;

        if (aio_write(aiocbp) != 0)
        {
                fprintf(stderr, "aio_write() failed\n");
                exit(1);
        }
}

void
processB(void)
{
        struct aiocb *aiocbp = (struct aiocb *) shmem;
        const struct aiocb * const pl[1] = { aiocbp };
        ssize_t nbytes;
        int rv;
        int returnCode;
        struct timespec my_timeout = { 0, 10000 };     /* 10 us */
        int max_polls;

        printf("waiting for the write to finish in process B\n");

        /* Check aio_error() first; only EINPROGRESS means "keep waiting". */
        rv = aio_error(aiocbp);
        while (rv == EINPROGRESS) {
                max_polls = 256;
                my_timeout.tv_sec = 0; my_timeout.tv_nsec = 10000;
                returnCode = aio_suspend(pl, 1, &my_timeout);
                printf("aio_suspend() returned %d\n", returnCode);
                /* -1 with EAGAIN means the 10 us timeout expired: poll again */
                while ((returnCode < 0) && (EAGAIN == errno) && (max_polls-- > 0)) {
                    my_timeout.tv_sec = 0; my_timeout.tv_nsec = 10000;
                    returnCode = aio_suspend(pl, 1, &my_timeout);
                }
                rv = aio_error(aiocbp);
        }

        /* The request is no longer in progress: did it fail? */
        if (rv != 0)
        {
                /* -1 means aio_error() itself failed; otherwise rv is the I/O's errno */
                int err = (rv == -1) ? errno : rv;

                fprintf(stderr, "aio_error returned %d: %s\n", rv, strerror(err));
                exit(1);
        }

        nbytes = aio_return(aiocbp);
        printf("aio_return returned %zd\n", nbytes);
}



int main(void)
{
        pid_t pidB;

        shmem = mmap(NULL, sizeof(struct aiocb) + 1000,
                     PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS,
                     -1, 0);
        if (shmem == MAP_FAILED)
        {
                fprintf(stderr, "mmap() failed\n");
                exit(1);
        }

#ifdef SINGLE_PROCESS
        /* this works */
        processA();
        processB();
#else
        /*
         * Start the I/O request in the parent process, then fork and wait
         * for it to finish from the child process.  The original version,
         * which called aio_suspend() before aio_error(), hung forever here;
         * processB() now checks aio_error() first and polls aio_suspend()
         * with a 10 us timeout only while the request is EINPROGRESS.
         */
        processA();

        pidB = fork();
        if (pidB == -1)
        {
                fprintf(stderr, "fork() failed\n");
                exit(1);
        }
        if (pidB != 0)
        {
                /* parent: wait for the child to exit */
                waitpid(pidB, NULL, 0);
        }
        else
        {
                /* child */
                processB();
        }
#endif
        return 0;
}
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
