Since you are implementing new APIs here, have you considered adding an
aio_sendfilev so that a header can be sent along with the data?

Regards
Suparna

On Wed, Jan 17, 2007 at 09:30:35AM +0300, Evgeniy Polyakov wrote:
> 
> Kevent based AIO (aio_sendfile()/aio_sendfile_path()).
> 
> aio_sendfile()/aio_sendfile_path() consists of two major parts: the AIO
> state machine and the page processing code.
> The former is just a small subsystem which allows callbacks to be queued
> for invocation in process context by a pool of kernel threads. It allows
> caches of callbacks to be queued to the local thread or to any other
> specified thread. Each cache of callbacks is processed for as long as
> there are callbacks in it; callbacks can requeue themselves into the
> same cache.
> 
> The real work is done in the page processing code - code which populates
> pages into the VFS cache and then sends the pages to the destination socket
> via ->sendpage(). Unlike the previous aio_sendfile() implementation, the new
> one does not require low-level filesystem specific callbacks (->get_block())
> at all; instead I extended struct address_space_operations to contain a new
> member called ->aio_readpages(), which is exactly the same as ->readpages()
> (read: mpage_readpages()) except for different BIO allocation and submission
> routines. I changed mpage_readpages() to pass mpage_alloc() and
> mpage_bio_submit() to a new function called __mpage_readpages(), which is
> exactly the old mpage_readpages() but invokes the provided callbacks instead
> of calling the old functions directly. mpage_readpages_aio() provides kevent
> specific callbacks, which call the old functions but with different destructor
> callbacks, which are essentially the same except that they reschedule AIO
> processing.
> 
> aio_sendfile_path() is essentially aio_sendfile(), except that it takes
> the source filename as a parameter and returns the opened file descriptor.
> 
> A benchmark transferring 100 1MB files (the files are already in the VFS
> cache) using sync sendfile() against aio_sendfile_path() shows about a
> 10MB/sec performance win (78 MB/s vs 66-72 MB/s over 1 Gb network, sendfile
> sending server is one-way AMD Athlon 64 3500+) for aio_sendfile_path().
> 
> The AIO state machine is a base for network AIO (which becomes
> quite trivial), but I will not start that implementation until the
> roadmap of kevent as a whole and of the AIO implementation becomes clearer.
> 
> Signed-off-by: Evgeniy Polyakov <[EMAIL PROTECTED]>
> 
> diff --git a/fs/bio.c b/fs/bio.c
> index 7618bcb..291e7e8 100644
> --- a/fs/bio.c
> +++ b/fs/bio.c
> @@ -120,7 +120,7 @@ void bio_free(struct bio *bio, struct bio_set *bio_set)
>  /*
>   * default destructor for a bio allocated with bio_alloc_bioset()
>   */
> -static void bio_fs_destructor(struct bio *bio)
> +void bio_fs_destructor(struct bio *bio)
>  {
>       bio_free(bio, fs_bio_set);
>  }
> diff --git a/fs/ext3/inode.c b/fs/ext3/inode.c
> index beaf25f..f08c957 100644
> --- a/fs/ext3/inode.c
> +++ b/fs/ext3/inode.c
> @@ -1650,6 +1650,13 @@ ext3_readpages(struct file *file, struct address_space 
> *mapping,
>       return mpage_readpages(mapping, pages, nr_pages, ext3_get_block);
>  }
> 
> +static int
> +ext3_readpages_aio(struct file *file, struct address_space *mapping,
> +             struct list_head *pages, unsigned nr_pages, void *priv)
> +{
> +     return mpage_readpages_aio(mapping, pages, nr_pages, ext3_get_block, 
> priv);
> +}
> +
>  static void ext3_invalidatepage(struct page *page, unsigned long offset)
>  {
>       journal_t *journal = EXT3_JOURNAL(page->mapping->host);
> @@ -1768,6 +1775,7 @@ static int ext3_journalled_set_page_dirty(struct page 
> *page)
>  }
> 
>  static const struct address_space_operations ext3_ordered_aops = {
> +     .aio_readpages  = ext3_readpages_aio,
>       .readpage       = ext3_readpage,
>       .readpages      = ext3_readpages,
>       .writepage      = ext3_ordered_writepage,
> diff --git a/fs/mpage.c b/fs/mpage.c
> index 692a3e5..e5ba44b 100644
> --- a/fs/mpage.c
> +++ b/fs/mpage.c
> @@ -102,7 +102,7 @@ static struct bio *mpage_bio_submit(int rw, struct bio 
> *bio)
>  static struct bio *
>  mpage_alloc(struct block_device *bdev,
>               sector_t first_sector, int nr_vecs,
> -             gfp_t gfp_flags)
> +             gfp_t gfp_flags, void *priv)
>  {
>       struct bio *bio;
> 
> @@ -116,6 +116,7 @@ mpage_alloc(struct block_device *bdev,
>       if (bio) {
>               bio->bi_bdev = bdev;
>               bio->bi_sector = first_sector;
> +             bio->bi_private = priv;
>       }
>       return bio;
>  }
> @@ -175,7 +176,10 @@ map_buffer_to_page(struct page *page, struct buffer_head 
> *bh, int page_block)
>  static struct bio *
>  do_mpage_readpage(struct bio *bio, struct page *page, unsigned nr_pages,
>               sector_t *last_block_in_bio, struct buffer_head *map_bh,
> -             unsigned long *first_logical_block, get_block_t get_block)
> +             unsigned long *first_logical_block, get_block_t get_block,
> +             struct bio *(*alloc)(struct block_device *bdev, sector_t 
> first_sector,
> +                     int nr_vecs, gfp_t gfp_flags, void *priv),
> +             struct bio *(*submit)(int rw, struct bio *bio), void *priv)
>  {
>       struct inode *inode = page->mapping->host;
>       const unsigned blkbits = inode->i_blkbits;
> @@ -302,25 +306,25 @@ do_mpage_readpage(struct bio *bio, struct page *page, 
> unsigned nr_pages,
>        * This page will go to BIO.  Do we need to send this BIO off first?
>        */
>       if (bio && (*last_block_in_bio != blocks[0] - 1))
> -             bio = mpage_bio_submit(READ, bio);
> +             bio = submit(READ, bio);
> 
>  alloc_new:
>       if (bio == NULL) {
> -             bio = mpage_alloc(bdev, blocks[0] << (blkbits - 9),
> +             bio = alloc(bdev, blocks[0] << (blkbits - 9),
>                               min_t(int, nr_pages, bio_get_nr_vecs(bdev)),
> -                             GFP_KERNEL);
> +                             GFP_KERNEL, priv);
>               if (bio == NULL)
>                       goto confused;
>       }
> 
>       length = first_hole << blkbits;
>       if (bio_add_page(bio, page, length, 0) < length) {
> -             bio = mpage_bio_submit(READ, bio);
> +             bio = submit(READ, bio);
>               goto alloc_new;
>       }
> 
>       if (buffer_boundary(map_bh) || (first_hole != blocks_per_page))
> -             bio = mpage_bio_submit(READ, bio);
> +             bio = submit(READ, bio);
>       else
>               *last_block_in_bio = blocks[blocks_per_page - 1];
>  out:
> @@ -328,7 +332,7 @@ out:
> 
>  confused:
>       if (bio)
> -             bio = mpage_bio_submit(READ, bio);
> +             bio = submit(READ, bio);
>       if (!PageUptodate(page))
>               block_read_full_page(page, get_block);
>       else
> @@ -336,6 +340,48 @@ confused:
>       goto out;
>  }
> 
> +int
> +__mpage_readpages(struct address_space *mapping, struct list_head *pages,
> +                             unsigned nr_pages, get_block_t get_block,
> +             struct bio *(*alloc)(struct block_device *bdev, sector_t 
> first_sector,
> +                     int nr_vecs, gfp_t gfp_flags, void *priv),
> +             struct bio *(*submit)(int rw, struct bio *bio),
> +             void *priv)
> +{
> +     struct bio *bio = NULL;
> +     unsigned page_idx;
> +     sector_t last_block_in_bio = 0;
> +     struct pagevec lru_pvec;
> +     struct buffer_head map_bh;
> +     unsigned long first_logical_block = 0;
> +
> +     clear_buffer_mapped(&map_bh);
> +     pagevec_init(&lru_pvec, 0);
> +     for (page_idx = 0; page_idx < nr_pages; page_idx++) {
> +             struct page *page = list_entry(pages->prev, struct page, lru);
> +
> +             prefetchw(&page->flags);
> +             list_del(&page->lru);
> +             if (!add_to_page_cache(page, mapping,
> +                                     page->index, GFP_KERNEL)) {
> +                     bio = do_mpage_readpage(bio, page,
> +                                     nr_pages - page_idx,
> +                                     &last_block_in_bio, &map_bh,
> +                                     &first_logical_block,
> +                                     get_block, alloc, submit, priv);
> +                     if (!pagevec_add(&lru_pvec, page))
> +                             __pagevec_lru_add(&lru_pvec);
> +             } else {
> +                     page_cache_release(page);
> +             }
> +     }
> +     pagevec_lru_add(&lru_pvec);
> +     BUG_ON(!list_empty(pages));
> +     if (bio)
> +             submit(READ, bio);
> +     return 0;
> +}
> +
>  /**
>   * mpage_readpages - populate an address space with some pages, and
>   *                       start reads against them.
> @@ -386,40 +432,28 @@ int
>  mpage_readpages(struct address_space *mapping, struct list_head *pages,
>                               unsigned nr_pages, get_block_t get_block)
>  {
> -     struct bio *bio = NULL;
> -     unsigned page_idx;
> -     sector_t last_block_in_bio = 0;
> -     struct pagevec lru_pvec;
> -     struct buffer_head map_bh;
> -     unsigned long first_logical_block = 0;
> +     return __mpage_readpages(mapping, pages, nr_pages, get_block,
> +                     mpage_alloc, mpage_bio_submit, NULL);
> +}
> +EXPORT_SYMBOL(mpage_readpages);
> 
> -     clear_buffer_mapped(&map_bh);
> -     pagevec_init(&lru_pvec, 0);
> -     for (page_idx = 0; page_idx < nr_pages; page_idx++) {
> -             struct page *page = list_entry(pages->prev, struct page, lru);
> +#ifdef CONFIG_KEVENT_AIO
> +extern struct bio *kaio_mpage_alloc(struct block_device *bdev, sector_t 
> first_sector,
> +             int nr_vecs, gfp_t gfp_flags, void *priv);
> +extern struct bio *kaio_mpage_bio_submit(int rw, struct bio *bio);
> +#else
> +#define kaio_mpage_alloc     mpage_alloc
> +#define kaio_mpage_bio_submit        mpage_bio_submit
> +#endif
> 
> -             prefetchw(&page->flags);
> -             list_del(&page->lru);
> -             if (!add_to_page_cache(page, mapping,
> -                                     page->index, GFP_KERNEL)) {
> -                     bio = do_mpage_readpage(bio, page,
> -                                     nr_pages - page_idx,
> -                                     &last_block_in_bio, &map_bh,
> -                                     &first_logical_block,
> -                                     get_block);
> -                     if (!pagevec_add(&lru_pvec, page))
> -                             __pagevec_lru_add(&lru_pvec);
> -             } else {
> -                     page_cache_release(page);
> -             }
> -     }
> -     pagevec_lru_add(&lru_pvec);
> -     BUG_ON(!list_empty(pages));
> -     if (bio)
> -             mpage_bio_submit(READ, bio);
> -     return 0;
> +int
> +mpage_readpages_aio(struct address_space *mapping, struct list_head *pages,
> +                             unsigned nr_pages, get_block_t get_block, void 
> *priv)
> +{
> +     return __mpage_readpages(mapping, pages, nr_pages, get_block,
> +                     kaio_mpage_alloc, kaio_mpage_bio_submit, priv);
>  }
> -EXPORT_SYMBOL(mpage_readpages);
> +EXPORT_SYMBOL(mpage_readpages_aio);
> 
>  /*
>   * This isn't called much at all
> @@ -433,7 +467,8 @@ int mpage_readpage(struct page *page, get_block_t 
> get_block)
> 
>       clear_buffer_mapped(&map_bh);
>       bio = do_mpage_readpage(bio, page, 1, &last_block_in_bio,
> -                     &map_bh, &first_logical_block, get_block);
> +                     &map_bh, &first_logical_block, get_block,
> +                     mpage_alloc, mpage_bio_submit, NULL);
>       if (bio)
>               mpage_bio_submit(READ, bio);
>       return 0;
> @@ -595,7 +630,7 @@ page_is_mapped:
>  alloc_new:
>       if (bio == NULL) {
>               bio = mpage_alloc(bdev, blocks[0] << (blkbits - 9),
> -                             bio_get_nr_vecs(bdev), GFP_NOFS|__GFP_HIGH);
> +                             bio_get_nr_vecs(bdev), GFP_NOFS|__GFP_HIGH, 
> NULL);
>               if (bio == NULL)
>                       goto confused;
>       }
> diff --git a/include/linux/mpage.h b/include/linux/mpage.h
> index cc5fb75..accdbdd 100644
> --- a/include/linux/mpage.h
> +++ b/include/linux/mpage.h
> @@ -16,6 +16,8 @@ typedef int (writepage_t)(struct page *page, struct 
> writeback_control *wbc);
> 
>  int mpage_readpages(struct address_space *mapping, struct list_head *pages,
>                               unsigned nr_pages, get_block_t get_block);
> +int mpage_readpages_aio(struct address_space *mapping, struct list_head 
> *pages,
> +                             unsigned nr_pages, get_block_t get_block, void 
> *priv);
>  int mpage_readpage(struct page *page, get_block_t get_block);
>  int mpage_writepages(struct address_space *mapping,
>               struct writeback_control *wbc, get_block_t get_block);
> diff --git a/kernel/kevent/kevent_aio.c b/kernel/kevent/kevent_aio.c
> new file mode 100644
> index 0000000..d4c1c5f
> --- /dev/null
> +++ b/kernel/kevent/kevent_aio.c
> @@ -0,0 +1,881 @@
> +/*
> + * 2006 Copyright (c) Evgeniy Polyakov <[EMAIL PROTECTED]>
> + * All rights reserved.
> + *
> + * This program is free software; you can redistribute it and/or modify
> + * it under the terms of the GNU General Public License as published by
> + * the Free Software Foundation; either version 2 of the License, or
> + * (at your option) any later version.
> + *
> + * This program is distributed in the hope that it will be useful,
> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
> + * GNU General Public License for more details.
> + *
> + * You should have received a copy of the GNU General Public License
> + * along with this program; if not, write to the Free Software
> + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
> + */
> +
> +#include <linux/kernel.h>
> +#include <linux/module.h>
> +#include <linux/types.h>
> +#include <linux/spinlock.h>
> +#include <linux/list.h>
> +#include <linux/kthread.h>
> +#include <linux/slab.h>
> +#include <linux/bio.h>
> +#include <linux/pagemap.h>
> +#include <linux/file.h>
> +#include <linux/swap.h>
> +#include <linux/kevent.h>
> +
> +#define KAIO_CALL_NUM                8
> +#define KAIO_THREAD_NUM              8
> +
> +//#define KEVENT_AIO_DEBUG
> +
> +#ifdef KEVENT_AIO_DEBUG
> +#define dprintk(f, a...) printk(f, ##a)
> +#else
> +#define dprintk(f, a...) do {} while (0)
> +#endif
> +
> +struct kaio_thread
> +{
> +     struct list_head        req_list;
> +     spinlock_t              req_lock;
> +     struct task_struct      *thread;
> +     int                     refcnt;
> +     wait_queue_head_t       wait;
> +};
> +
> +extern struct file_operations kevent_user_fops;
> +static DEFINE_PER_CPU(u32, kaio_req_counter);
> +static DEFINE_PER_CPU(int, kaio_req_cpu);
> +
> +/*
> + * Array of working threads.
> + * It can only be accessed under RCU protection,
> + * so threads reference counters are not atomic.
> + */
> +static struct kaio_thread *kaio_threads[KAIO_THREAD_NUM] __read_mostly;
> +static struct kmem_cache *kaio_req_cache __read_mostly;
> +static struct kmem_cache *kaio_priv_cache __read_mostly;
> +
> +struct kaio_req;
> +typedef int (* kaio_callback)(struct kaio_req *req, int direct);
> +
> +#define KAIO_REQ_PENDING     0
> +
> +/*
> + * Cache of kaio request callbacks.
> + * It is not allowed to change the same cache entry
> + * simultaneously (for example it is forbidden to add entries
> + * in parallel).
> + *
> + * When cache entry is scheduled for execution in one of the threads,
> + * it is forbidden to access it, since it will be freed when
> + * all callbacks have been invoked.
> + *
> + * It is possible to add callbacks into this cache from callbacks itself.
> + */
> +struct kaio_req
> +{
> +     struct list_head        req_entry;
> +     kaio_callback           call[KAIO_CALL_NUM];
> +     int                     read_idx, add_idx;
> +     int                     cpu;
> +     long                    flags;
> +     atomic_t                refcnt;
> +     void                    (*destructor)(struct kaio_req *);
> +     void                    *priv;
> +};
> +
> +/*
> + * Returns pointer to thread entry for given index.
> + * Must be called under RCU protection.
> + */
> +static struct kaio_thread *kaio_get_thread(int cpu)
> +{
> +     struct kaio_thread *th;
> +
> +     if (cpu == -1) {
> +#if 1
> +             int *cnt = &__get_cpu_var(kaio_req_cpu);
> +             cpu = *cnt;
> +
> +             *cnt = *cnt + 1;
> +             if (*cnt >= KAIO_THREAD_NUM)
> +                     *cnt = 0;
> +#else
> +             cpu = 0;
> +#endif
> +     }
> +
> +     if (unlikely(cpu >= KAIO_THREAD_NUM || !kaio_threads[cpu]))
> +             return NULL;
> +
> +     th = kaio_threads[cpu];
> +     th->refcnt++;
> +
> +     return th;
> +}
> +
> +/*
> + * Drops reference counter for given thread.
> + * Must be called under RCU protection.
> + */
> +static inline void kaio_put_thread(struct kaio_thread *th)
> +{
> +     th->refcnt--;
> +}
> +
> +void kaio_schedule_req(struct kaio_req *req)
> +{
> +     struct kaio_thread *th;
> +     unsigned long flags;
> +
> +     rcu_read_lock();
> +     th = kaio_get_thread(req->cpu);
> +     if (!th) {
> +             req->cpu = -1;
> +             th = kaio_get_thread(-1);
> +             BUG_ON(!th);
> +     }
> +
> +     if (!test_and_set_bit(KAIO_REQ_PENDING, &req->flags)) {
> +             spin_lock_irqsave(&th->req_lock, flags);
> +             list_add_tail(&req->req_entry, &th->req_list);
> +             spin_unlock_irqrestore(&th->req_lock, flags);
> +     }
> +
> +     wake_up(&th->wait);
> +
> +     kaio_put_thread(th);
> +     rcu_read_unlock();
> +}
> +
> +EXPORT_SYMBOL_GPL(kaio_schedule_req);
> +
> +static inline void kaio_req_get(struct kaio_req *req)
> +{
> +     atomic_inc(&req->refcnt);
> +}
> +
> +static inline int kaio_req_put(struct kaio_req *req)
> +{
> +     if (atomic_dec_and_test(&req->refcnt)) {
> +             dprintk("%s: freeing req: %p, priv: %p.\n", __func__, req, 
> req->priv);
> +             if (req->destructor)
> +                     req->destructor(req);
> +             kmem_cache_free(kaio_req_cache, req);
> +             return 1;
> +     }
> +
> +     return 0;
> +}
> +
> +/*
> + * Append a call request into cache.
> + * Returns -EOVERFLOW in case cache is full, and 0 otherwise.
> + */
> +int kaio_append_call(struct kaio_req *req, kaio_callback call)
> +{
> +     if ((req->add_idx + 1 == req->read_idx) ||
> +                     ((req->add_idx + 1 == KAIO_CALL_NUM) && req->read_idx 
> == 0))
> +             return -EOVERFLOW;
> +
> +     req->call[req->add_idx] = call;
> +
> +     dprintk("%s: req: %p, read_idx: %d, add_idx: %d, call: %p [%p].\n",
> +                     __func__, req, req->read_idx, req->add_idx,
> +                     req->call[req->read_idx], req->call[req->add_idx]);
> +     if (++req->add_idx == KAIO_CALL_NUM)
> +             req->add_idx = 0;
> +
> +     kaio_req_get(req);
> +     
> +     return 0;
> +}
> +
> +EXPORT_SYMBOL_GPL(kaio_append_call);
> +
> +/*
> + * Adds one call request into given cache.
> + * If cache is NULL or full, allocate new one.
> + */
> +struct kaio_req *kaio_add_call(struct kaio_req *req, kaio_callback call, int 
> cpu, gfp_t gflags)
> +{
> +     if (req && !kaio_append_call(req, call)) {
> +             kaio_schedule_req(req);
> +             return req;
> +     }
> +
> +     req = kmem_cache_alloc(kaio_req_cache, gflags);
> +     if (!req)
> +             return NULL;
> +
> +     memset(req->call, 0, sizeof(req->call));
> +
> +     req->destructor = NULL;
> +     req->cpu = cpu;
> +     req->call[0] = call;
> +     req->add_idx = 1;
> +     req->read_idx = 0;
> +     req->flags = 0;
> +     atomic_set(&req->refcnt, 1);
> +
> +     dprintk("%s: req: %p, call: %p [%p].\n", __func__, req, call, 
> req->call[0]);
> +
> +     return req;
> +}
> +
> +EXPORT_SYMBOL_GPL(kaio_add_call);
> +
> +/*
> + * Call appropriate callbacks in cache.
> + * This can only be called by working threads, which means that cache
> + * is filled (probably partially) and are not even accessible from
> + * the originator of requests, which means that cache will be freed
> + * when all callbacks are invoked.
> + *
> + * Callback itself can reschedule new callback into the same cache.
> + *
> + * If callback returns negative value, the whole cache will be freed.
> + * If positive value is returned, then further processing is stopped,
> + * so cache can be queued into the end of the processing FIFO by callback.
> + * If zero is returned, next callback will be invoked if any.
> + */
> +static int kaio_call(struct kaio_req *req)
> +{
> +     int err = -EINVAL;
> +
> +     if (likely(req->add_idx != req->read_idx)) {
> +             dprintk("%s: req: %p, read_idx: %d, add_idx: %d, call: %p 
> [%p].\n",
> +                             __func__, req, req->read_idx, req->add_idx,
> +                             req->call[req->read_idx], req->call[0]);
> +             err = (*req->call[req->read_idx])(req, 0);
> +             if (++req->read_idx == KAIO_CALL_NUM)
> +                     req->read_idx = 0;
> +
> +             if (kaio_req_put(req))
> +                     err = 0;
> +     }
> +     return err;
> +}
> +
> +static int kaio_thread_process(void *data)
> +{
> +     struct kaio_thread *th = data;
> +     unsigned long flags;
> +     struct kaio_req *req, *first;
> +     DECLARE_WAITQUEUE(wait, current);
> +     int err;
> +
> +     add_wait_queue_exclusive(&th->wait, &wait);
> +
> +     while (!kthread_should_stop()) {
> +             first = req = NULL;
> +             do {
> +                     req = NULL;
> +                     spin_lock_irqsave(&th->req_lock, flags);
> +                     if (!list_empty(&th->req_list)) {
> +                             req = list_entry(th->req_list.prev, struct 
> kaio_req, req_entry);
> +                             if (first != req)
> +                                     list_del(&req->req_entry);
> +                     }
> +                     spin_unlock_irqrestore(&th->req_lock, flags);
> +                             
> +                     if (!first)
> +                             first = req;
> +                     else if (first == req)
> +                             break;
> +
> +                     if (req) {
> +                             err = 0;
> +                             while ((req->read_idx != req->add_idx) && 
> !kthread_should_stop()) {
> +                                     dprintk("%s: req: %p, read_idx: %d, 
> add_idx: %d, err: %d.\n",
> +                                                     __func__, req, 
> req->read_idx, req->add_idx, err);
> +                                     err = kaio_call(req);
> +                                     if (err != 0)
> +                                             break;
> +                             }
> +
> +                             if (err > 0) {
> +                                     spin_lock_irqsave(&th->req_lock, flags);
> +                                     list_add_tail(&req->req_entry, 
> &th->req_list);
> +                                     spin_unlock_irqrestore(&th->req_lock, 
> flags);
> +                             }
> +                     }
> +             } while (req);
> +             __set_current_state(TASK_INTERRUPTIBLE);
> +             schedule_timeout(HZ);
> +             __set_current_state(TASK_RUNNING);
> +     }
> +     
> +     remove_wait_queue(&th->wait, &wait);
> +
> +     return 0;
> +}
> +
> +struct kaio_private
> +{
> +     union {
> +             void            *sptr;
> +             __u64           sdata;
> +     };
> +     union {
> +             void            *dptr;
> +             __u64           ddata;
> +     };
> +     __u64                   offset, processed;
> +     __u64                   count, limit;
> +     struct kevent_user      *kevent_user;
> +};
> +
> +extern void bio_fs_destructor(struct bio *bio);
> +
> +static void kaio_bio_destructor(struct bio *bio)
> +{
> +     dprintk("%s: bio=%p, num=%u.\n", __func__, bio, bio->bi_vcnt);
> +     bio_fs_destructor(bio);
> +}
> +
> +static int kaio_read_send_pages(struct kaio_req *req, int direct);
> +
> +static int kaio_mpage_end_io_read(struct bio *bio, unsigned int bytes_done, 
> int err)
> +{
> +     const int uptodate = test_bit(BIO_UPTODATE, &bio->bi_flags);
> +     struct bio_vec *bvec = bio->bi_io_vec + bio->bi_vcnt - 1;
> +     struct kaio_req *req = bio->bi_private;
> +
> +     if (bio->bi_size)
> +             return 1;
> +
> +     do {
> +             struct page *page = bvec->bv_page;
> +
> +             if (--bvec >= bio->bi_io_vec)
> +                     prefetchw(&bvec->bv_page->flags);
> +
> +             if (uptodate) {
> +                     SetPageUptodate(page);
> +             } else {
> +                     ClearPageUptodate(page);
> +                     SetPageError(page);
> +             }
> +             unlock_page(page);
> +     } while (bvec >= bio->bi_io_vec);
> +
> +     dprintk("%s: bio: %p, req: %p, pending: %d.\n",
> +                     __func__, bio, req, test_bit(KAIO_REQ_PENDING, 
> &req->flags));
> +
> +     kaio_append_call(req, kaio_read_send_pages);
> +     kaio_req_put(req);
> +     kaio_schedule_req(req);
> +
> +     bio_put(bio);
> +     return 0;
> +}
> +
> +struct bio *kaio_mpage_bio_submit(int rw, struct bio *bio)
> +{
> +     if (bio) {
> +             bio->bi_end_io = kaio_mpage_end_io_read;
> +             dprintk("%s: bio=%p, num=%u.\n", __func__, bio, bio->bi_vcnt);
> +             submit_bio(READ, bio);
> +     }
> +     return NULL;
> +}
> +
> +struct bio *kaio_mpage_alloc(struct block_device *bdev,
> +             sector_t first_sector, int nr_vecs, gfp_t gfp_flags, void *priv)
> +{
> +     struct bio *bio;
> +
> +     bio = bio_alloc(gfp_flags, nr_vecs);
> +
> +     if (bio == NULL && (current->flags & PF_MEMALLOC)) {
> +             while (!bio && (nr_vecs /= 2))
> +                     bio = bio_alloc(gfp_flags, nr_vecs);
> +     }
> +
> +     if (bio) {
> +             struct kaio_req *req = priv;
> +
> +             bio->bi_bdev = bdev;
> +             bio->bi_sector = first_sector;
> +             bio->bi_private = priv;
> +             bio->bi_destructor = kaio_bio_destructor;
> +             kaio_req_get(req);
> +             dprintk("%s: bio: %p, req: %p, num: %d.\n", __func__, bio, 
> priv, nr_vecs);
> +     }
> +     return bio;
> +}
> +
> +static ssize_t kaio_vfs_read_actor(struct kaio_private *priv, struct page 
> *page, size_t len)
> +{
> +     struct socket *sock = priv->dptr;
> +     struct file *file = sock->file;
> +     
> +     return file->f_op->sendpage(file, page, 0, len, &file->f_pos, 1);
> +}
> +
> +static int kaio_vfs_read(struct kaio_private *priv,
> +             ssize_t (*actor)(struct kaio_private *, struct page *, size_t))
> +{
> +     struct address_space *mapping;
> +     struct file *file = priv->sptr;
> +     ssize_t actor_size;
> +     loff_t isize;
> +     int i = 0, pg_num;
> +
> +     mapping = file->f_mapping;
> +     isize = i_size_read(file->f_dentry->d_inode);
> +
> +     if (priv->processed >= isize) {
> +             priv->count = 0;
> +             return 0;
> +     }
> +     priv->count = isize - priv->processed;
> +     pg_num = ALIGN(min_t(u64, isize, priv->count), PAGE_SIZE) >> PAGE_SHIFT;
> +     
> +     dprintk("%s: start: priv: %p, ret: %d, num: %d, count: %Lu, offset: 
> %Lu, processed: %Lu.\n",
> +                     __func__, priv, i, pg_num, priv->count, priv->offset, 
> priv->processed);
> +
> +     for (i=0; i<pg_num && priv->count; ++i) {
> +             struct page *page;
> +             size_t nr = PAGE_CACHE_SIZE;
> +
> +             page = find_get_page(mapping, priv->processed >> 
> PAGE_CACHE_SHIFT);
> +             if (unlikely(page == NULL))
> +                     break;
> +             if (!PageUptodate(page)) {
> +                     dprintk("%s: %2d: page=%p, processed=%Lu, count=%Lu not 
> uptodate.\n",
> +                                     __func__, i, page, priv->processed, 
> priv->count);
> +                     page_cache_release(page);
> +                     break;
> +             }
> +
> +             if (mapping_writably_mapped(mapping))
> +                     flush_dcache_page(page);
> +
> +             mark_page_accessed(page);
> +
> +             if (nr + priv->processed > isize)
> +                     nr = isize - priv->processed;
> +             if (nr > priv->count)
> +                     nr = priv->count;
> +
> +             actor_size = actor(priv, page, nr);
> +             if (actor_size < 0) {
> +                     page_cache_release(page);
> +                     i = (int)actor_size;
> +                     break;
> +             }
> +
> +             page_cache_release(page);
> +
> +             priv->processed += actor_size;
> +             priv->count -= actor_size;
> +     }
> +
> +     if (!priv->count)
> +             i = pg_num;
> +
> +     dprintk("%s: end: priv: %p, ret: %d, num: %d, count: %Lu, offset: %Lu, 
> processed: %Lu.\n",
> +                     __func__, priv, i, pg_num, priv->count, priv->offset, 
> priv->processed);
> +
> +     return i;
> +}
> +
> +static int kaio_read_send_pages(struct kaio_req *req, int direct)
> +{
> +     struct kaio_private *priv = req->priv;
> +     struct file *file = priv->sptr;
> +     struct address_space *mapping = file->f_mapping;
> +     struct page *page;
> +     int err, i, num;
> +     u64 offset;
> +     LIST_HEAD(page_pool);
> +
> +     err = kaio_vfs_read(priv, &kaio_vfs_read_actor);
> +     if (err < 0)
> +             return err;
> +
> +     if (err == 0) {
> +             priv->limit >>= 1;
> +     } else {
> +             if (priv->limit)
> +                     priv->limit <<= 1;
> +             else
> +                     priv->limit = 8;
> +     }
> +
> +     if (priv->offset < priv->processed)
> +             priv->offset = priv->processed;
> +
> +     if (!priv->count) {
> +             kevent_storage_ready(&priv->kevent_user->st, NULL, 
> KEVENT_MASK_ALL);
> +             return 0;
> +     }
> +
> +     if (priv->offset >= priv->processed + priv->count) {
> +             kaio_append_call(req, kaio_read_send_pages);
> +             return 0;
> +     }
> +
> +     num = min_t(int, max_sane_readahead(priv->limit),
> +                     ALIGN(priv->count, PAGE_SIZE) >> PAGE_SHIFT);
> +
> +     offset = priv->offset;
> +     for (i=0; i<num; ++i) {
> +             page = page_cache_alloc_cold(mapping);
> +             if (!page)
> +                     break;
> +
> +             page->index = priv->offset >> PAGE_CACHE_SHIFT;
> +             list_add(&page->lru, &page_pool);
> +
> +             priv->offset += PAGE_CACHE_SIZE;
> +     }
> +
> +     dprintk("%s: submit: req: %p, priv: %p, offset: %Lu, num: %d, limit: 
> %Lu.\n",
> +                     __func__, req, priv, offset, i, priv->limit);
> +
> +     err = mapping->a_ops->aio_readpages(file, mapping, &page_pool, i, req);
> +     if (err) {
> +             dprintk("%s: kevent_mpage_readpages failed: err=%d, 
> count=%Lu.\n",
> +                             __func__, err, priv->count);
> +             kaio_schedule_req(req);
> +             return err;
> +     }
> +
> +     return 1;
> +}
> +
> +static int kaio_add_kevent(int fd, struct kaio_req *req)
> +{
> +     struct ukevent uk;
> +     struct file *file;
> +     struct kevent_user *u;
> +     int err, need_fput = 0;
> +     u32 *cnt;
> +
> +     file = fget_light(fd, &need_fput);
> +     if (!file) {
> +             err = -EBADF;
> +             goto err_out;
> +     }
> +
> +     if (file->f_op != &kevent_user_fops) {
> +             err = -EINVAL;
> +             goto err_out_fput;
> +     }
> +
> +     u = file->private_data;
> +
> +     memset(&uk, 0, sizeof(struct ukevent));
> +
> +     uk.event = KEVENT_MASK_ALL;
> +     uk.type = KEVENT_AIO;
> +
> +     preempt_disable();
> +     uk.id.raw[0] = smp_processor_id();
> +     cnt = &__get_cpu_var(kaio_req_counter);
> +     uk.id.raw[1] = *cnt;
> +     *cnt = *cnt + 1;
> +     preempt_enable();
> +
> +     uk.req_flags = KEVENT_REQ_ONESHOT | KEVENT_REQ_ALWAYS_QUEUE;
> +     uk.ptr = req;
> +
> +     err = kevent_user_add_ukevent(&uk, u);
> +     if (err)
> +             goto err_out_fput;
> +
> +     kevent_user_get(u);
> +
> +     fput_light(file, need_fput);
> +
> +     return 0;
> +
> +err_out_fput:
> +     fput_light(file, need_fput);
> +err_out:
> +     return err;
> +}
> +
> +static void kaio_destructor(struct kaio_req *req)
> +{
> +     struct kaio_private *priv = req->priv;
> +     struct socket *sock = priv->dptr;
> +     struct file *file = priv->sptr;
> +
> +     fput(file);
> +     sockfd_put(sock);
> +
> +     kevent_storage_ready(&priv->kevent_user->st, NULL, KEVENT_MASK_ALL);
> +     kevent_user_put(priv->kevent_user);
> +
> +     kmem_cache_free(kaio_priv_cache, req->priv);
> +}
> +
> +static struct kaio_req *kaio_sendfile(int kevent_fd, int sock_fd, struct file *file, off_t offset, size_t count)
> +{
> +     struct kaio_req *req;
> +     struct socket *sock;
> +     struct kaio_private *priv;
> +     int err;
> +
> +     sock = sockfd_lookup(sock_fd, &err);
> +     if (!sock)
> +             goto err_out_exit;
> +
> +     priv = kmem_cache_alloc(kaio_priv_cache, GFP_KERNEL);
> +     if (!priv)
> +             goto err_out_sput;
> +
> +     priv->sptr = file;
> +     priv->dptr = sock;
> +     priv->offset = offset;
> +     priv->count = min_t(u64, i_size_read(file->f_dentry->d_inode), count);
> +     priv->processed = offset;
> +     priv->limit = 128;
> +
> +     req = kaio_add_call(NULL, &kaio_read_send_pages, -1, GFP_KERNEL);
> +     if (!req)
> +             goto err_out_free;
> +
> +     req->destructor = kaio_destructor;
> +     req->priv = priv;
> +
> +     err = kaio_add_kevent(kevent_fd, req);
> +
> +     dprintk("%s: req: %p, priv: %p, call: %p [%p], offset: %Lu, processed: %Lu, count: %Lu, err: %d.\n",
> +                     __func__, req, priv, &kaio_read_send_pages,
> +                     kaio_read_send_pages, priv->offset, priv->processed, priv->count, err);
> +
> +     if (err)
> +             goto err_out_remove;
> +
> +     kaio_schedule_req(req);
> +
> +     return req;
> +
> +err_out_remove:
> +     /* It is safe to just free the object since it is guaranteed that it was not
> +      * queued for processing.
> +      */
> +     kmem_cache_free(kaio_req_cache, req);
> +err_out_free:
> +     kmem_cache_free(kaio_priv_cache, priv);
> +err_out_sput:
> +     sockfd_put(sock);
> +err_out_exit:
> +     return NULL;
> +
> +}
> +
> +asmlinkage long sys_aio_sendfile(int kevent_fd, int sock_fd, int in_fd, off_t offset, size_t count)
> +{
> +     struct kaio_req *req;
> +     struct file *file;
> +     int err;
> +
> +     file = fget(in_fd);
> +     if (!file) {
> +             err = -EBADF;
> +             goto err_out_exit;
> +     }
> +
> +     req = kaio_sendfile(kevent_fd, sock_fd, file, offset, count);
> +     if (!req) {
> +             err = -EINVAL;
> +             goto err_out_fput;
> +     }
> +
> +     return (long)req;
> +
> +err_out_fput:
> +     fput(file);
> +err_out_exit:
> +     return err;
> +}
> +
> +asmlinkage long sys_aio_sendfile_path(int kevent_fd, int sock_fd, char __user *filename, off_t offset, size_t count)
> +{
> +     char *tmp = getname(filename);
> +     int fd = PTR_ERR(tmp);
> +     int flags = O_RDONLY, err;
> +     struct nameidata nd;
> +     struct file *file;
> +     struct kaio_req *req;
> +
> +     if (force_o_largefile())
> +             flags = O_LARGEFILE;
> +
> +     if (IS_ERR(tmp)) {
> +             err = fd;
> +             goto err_out_exit;
> +     }
> +
> +     fd = get_unused_fd();
> +     if (fd < 0) {
> +             err = fd;
> +             goto err_out_put_name;
> +     }
> +
> +     if ((flags+1) & O_ACCMODE)
> +             flags++;
> +
> +     err = open_namei(AT_FDCWD, tmp, flags, 0400, &nd);
> +     if (err)
> +             goto err_out_fdput;
> +
> +     file = nameidata_to_filp(&nd, flags);
> +     if (!file)
> +             goto err_out_fdput;
> +
> +     /* One reference will be released in sys_close(),
> +      * second one through req->destructor()
> +      */
> +     atomic_inc(&file->f_count);
> +
> +     req = kaio_sendfile(kevent_fd, sock_fd, file, offset, count);
> +     if (!req) {
> +             err = -EINVAL;
> +             goto err_out_fput;
> +     }
> +
> +     fd_install(fd, file);
> +
> +     return fd;
> +
> +err_out_fput:
> +     fput(file);
> +     fput(file);
> +err_out_fdput:
> +     put_unused_fd(fd);
> +err_out_put_name:
> +     putname(tmp);
> +err_out_exit:
> +     return err;
> +}
> +
> +static int kevent_aio_callback(struct kevent *k)
> +{
> +     struct kaio_req *req = k->event.ptr;
> +     struct kaio_private *priv = req->priv;
> +
> +     if (!priv->count) {
> +             __u32 *processed = (__u32 *)&priv->processed;
> +             k->event.ret_data[0] = processed[0];
> +             k->event.ret_data[1] = processed[1];
> +             return 1;
> +     }
> +
> +     return 0;
> +}
> +
> +int kevent_aio_enqueue(struct kevent *k)
> +{
> +     int err;
> +     struct kaio_req *req = k->event.ptr;
> +     struct kaio_private *priv = req->priv;
> +
> +     err = kevent_storage_enqueue(&k->user->st, k);
> +     if (err)
> +             goto err_out_exit;
> +
> +     priv->kevent_user = k->user;
> +     if (k->event.req_flags & KEVENT_REQ_ALWAYS_QUEUE)
> +             kevent_requeue(k);
> +
> +     return 0;
> +
> +err_out_exit:
> +     return err;
> +}
> +
> +int kevent_aio_dequeue(struct kevent *k)
> +{
> +     kevent_storage_dequeue(k->st, k);
> +
> +     return 0;
> +}
> +
> +static void kaio_thread_stop(struct kaio_thread *th)
> +{
> +     kthread_stop(th->thread);
> +     kfree(th);
> +}
> +
> +static int kaio_thread_start(struct kaio_thread **thp, int num)
> +{
> +     struct kaio_thread *th;
> +
> +     th = kzalloc(sizeof(struct kaio_thread), GFP_KERNEL);
> +     if (!th)
> +             return -ENOMEM;
> +
> +     th->refcnt = 1;
> +     spin_lock_init(&th->req_lock);
> +     INIT_LIST_HEAD(&th->req_list);
> +     init_waitqueue_head(&th->wait);
> +
> +     th->thread = kthread_run(kaio_thread_process, th, "kaio/%d", num);
> +     if (IS_ERR(th->thread)) {
> +             int err = PTR_ERR(th->thread);
> +             kfree(th);
> +             return err;
> +     }
> +
> +     *thp = th;
> +     wmb();
> +
> +     return 0;
> +}
> +
> +static int __init kevent_init_aio(void)
> +{
> +     struct kevent_callbacks sc = {
> +             .callback = &kevent_aio_callback,
> +             .enqueue = &kevent_aio_enqueue,
> +             .dequeue = &kevent_aio_dequeue,
> +             .flags = 0,
> +     };
> +     int err, i;
> +
> +     kaio_req_cache = kmem_cache_create("kaio_req", sizeof(struct kaio_req),
> +                     0, SLAB_PANIC, NULL, NULL);
> +     kaio_priv_cache = kmem_cache_create("kaio_priv", sizeof(struct kaio_private),
> +                     0, SLAB_PANIC, NULL, NULL);
> +
> +     memset(kaio_threads, 0, sizeof(kaio_threads));
> +
> +     for (i=0; i<KAIO_THREAD_NUM; ++i) {
> +             err = kaio_thread_start(&kaio_threads[i], i);
> +             if (err)
> +                     goto err_out_stop;
> +     }
> +
> +     err = kevent_add_callbacks(&sc, KEVENT_AIO);
> +     if (err)
> +             goto err_out_stop;
> +
> +     return 0;
> +
> +err_out_stop:
> +     while (--i >= 0) {
> +             struct kaio_thread *th = kaio_threads[i];
> +
> +             kaio_threads[i] = NULL;
> +             wmb();
> +
> +             kaio_thread_stop(th);
> +     }
> +     return err;
> +}
> +module_init(kevent_init_aio);
> 
> -
> To unsubscribe from this list: send the line "unsubscribe linux-fsdevel" in
> the body of a message to [EMAIL PROTECTED]
> More majordomo info at  http://vger.kernel.org/majordomo-info.html

-- 
Suparna Bhattacharya ([EMAIL PROTECTED])
Linux Technology Center
IBM Software Lab, India

-
To unsubscribe from this list: send the line "unsubscribe linux-kernel" in
the body of a message to [EMAIL PROTECTED]
More majordomo info at  http://vger.kernel.org/majordomo-info.html
Please read the FAQ at  http://www.tux.org/lkml/

Reply via email to