The refcounting before wasn't very clear: struct kioctx has two refcounts,
with no clear relationship between them (or between them and ctx->dead).

Now, reqs_active pins a reference on users whenever it is nonzero, and the
initial reference is taken on reqs_active itself - when ctx->dead goes to 1,
that initial reference is dropped.
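
To make the lifecycle concrete, here is a minimal userspace sketch of the
scheme (plain C11 with <stdatomic.h>, not kernel code; the ctx_model,
put_users and put_reqs_active names are invented for illustration, and in
the real patch the dead transition happens under ctx_lock and the final
free goes through a workqueue):

  #include <stdatomic.h>
  #include <stdbool.h>
  #include <stdio.h>
  #include <stdlib.h>

  struct ctx_model {
          atomic_int users;       /* outer refcount; 0 => free the ctx      */
          atomic_int reqs_active; /* in-flight reqs + one initial reference */
          bool dead;
  };

  static void put_users(struct ctx_model *ctx)
  {
          if (atomic_fetch_sub(&ctx->users, 1) == 1) {
                  printf("freeing ctx (free_ioctx() in the patch)\n");
                  free(ctx);
          }
  }

  /* mirrors req_put_ioctx(): reqs_active hitting 0 drops its ref on users */
  static void put_reqs_active(struct ctx_model *ctx)
  {
          if (atomic_fetch_sub(&ctx->reqs_active, 1) == 1)
                  put_users(ctx);
  }

  int main(void)
  {
          struct ctx_model *ctx = calloc(1, sizeof(*ctx));

          atomic_init(&ctx->users, 2);       /* caller + the ref reqs_active pins */
          atomic_init(&ctx->reqs_active, 1); /* the initial reference             */

          atomic_fetch_add(&ctx->reqs_active, 1); /* one request submitted        */
          put_users(ctx);                    /* io_setup() drops the caller's ref */

          if (!ctx->dead) {                  /* kill_ctx(), minus the locking     */
                  ctx->dead = true;
                  put_reqs_active(ctx);      /* drop the initial reference        */
          }

          put_reqs_active(ctx);  /* last request completes: users -> 0, ctx freed */
          return 0;
  }

The point is that nothing has to sleep waiting for reqs_active to hit zero:
whichever path drops the last reference (a completing request or the
teardown path) ends up freeing the ctx.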

Some other semi-related cleanup is included too.

Signed-off-by: Kent Overstreet <koverstr...@google.com>
---
 fs/aio.c            |  187 +++++++++++++++++----------------------------------
 include/linux/aio.h |    5 +-
 2 files changed, 63 insertions(+), 129 deletions(-)

diff --git a/fs/aio.c b/fs/aio.c
index 95419c4..3ab12f6 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -86,8 +86,9 @@ static void aio_free_ring(struct kioctx *ctx)
                put_page(info->ring_pages[i]);
 
        if (info->mmap_size) {
-               BUG_ON(ctx->mm != current->mm);
-               vm_munmap(info->mmap_base, info->mmap_size);
+               down_write(&ctx->mm->mmap_sem);
+               do_munmap(ctx->mm, info->mmap_base, info->mmap_size);
+               up_write(&ctx->mm->mmap_sem);
        }
 
        if (info->ring_pages && info->ring_pages != info->internal_pages)
@@ -188,45 +189,37 @@ static int aio_setup_ring(struct kioctx *ctx)
        kunmap_atomic((void *)((unsigned long)__event & PAGE_MASK)); \
 } while(0)
 
-static void ctx_rcu_free(struct rcu_head *head)
-{
-       struct kioctx *ctx = container_of(head, struct kioctx, rcu_head);
-       kmem_cache_free(kioctx_cachep, ctx);
-}
-
-/* __put_ioctx
- *     Called when the last user of an aio context has gone away,
- *     and the struct needs to be freed.
- */
-static void __put_ioctx(struct kioctx *ctx)
+static void free_ioctx(struct work_struct *work)
 {
-       unsigned nr_events = ctx->max_reqs;
-       BUG_ON(ctx->reqs_active);
+       struct kioctx *ctx = container_of(work, struct kioctx, free_work);
 
        cancel_delayed_work_sync(&ctx->wq);
        aio_free_ring(ctx);
        mmdrop(ctx->mm);
-       ctx->mm = NULL;
-       if (nr_events) {
-               spin_lock(&aio_nr_lock);
-               BUG_ON(aio_nr - nr_events > aio_nr);
-               aio_nr -= nr_events;
-               spin_unlock(&aio_nr_lock);
-       }
-       pr_debug("__put_ioctx: freeing %p\n", ctx);
-       call_rcu(&ctx->rcu_head, ctx_rcu_free);
-}
 
-static inline int try_get_ioctx(struct kioctx *kioctx)
-{
-       return atomic_inc_not_zero(&kioctx->users);
+       spin_lock(&aio_nr_lock);
+       BUG_ON(aio_nr - ctx->max_reqs > aio_nr);
+       aio_nr -= ctx->max_reqs;
+       spin_unlock(&aio_nr_lock);
+
+       synchronize_rcu();
+
+       pr_debug("freeing %p\n", ctx);
+       kmem_cache_free(kioctx_cachep, ctx);
 }
 
 static inline void put_ioctx(struct kioctx *kioctx)
 {
        BUG_ON(atomic_read(&kioctx->users) <= 0);
        if (unlikely(atomic_dec_and_test(&kioctx->users)))
-               __put_ioctx(kioctx);
+               schedule_work(&kioctx->free_work);
+}
+
+static inline void req_put_ioctx(struct kioctx *kioctx)
+{
+       BUG_ON(atomic_read(&kioctx->reqs_active) <= 0);
+       if (unlikely(atomic_dec_and_test(&kioctx->reqs_active)))
+               put_ioctx(kioctx);
 }
 
 static int kiocb_cancel(struct kioctx *ctx, struct kiocb *kiocb,
@@ -280,12 +273,15 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
        atomic_inc(&mm->mm_count);
 
        atomic_set(&ctx->users, 2);
+       atomic_set(&ctx->reqs_active, 1);
+
        spin_lock_init(&ctx->ctx_lock);
        spin_lock_init(&ctx->ring_info.ring_lock);
        init_waitqueue_head(&ctx->wait);
 
        INIT_LIST_HEAD(&ctx->active_reqs);
        INIT_LIST_HEAD(&ctx->run_list);
+       INIT_WORK(&ctx->free_work, free_ioctx);
        INIT_DELAYED_WORK(&ctx->wq, aio_kick_handler);
 
        if (aio_setup_ring(ctx) < 0)
@@ -327,36 +323,25 @@ out_freectx:
  */
 static void kill_ctx(struct kioctx *ctx)
 {
-       struct task_struct *tsk = current;
-       DECLARE_WAITQUEUE(wait, tsk);
+       struct kiocb *iocb, *t;
        struct io_event res;
+       int put = 0;
 
        spin_lock_irq(&ctx->ctx_lock);
-       ctx->dead = 1;
-       while (!list_empty(&ctx->active_reqs)) {
-               struct list_head *pos = ctx->active_reqs.next;
-               struct kiocb *iocb = list_kiocb(pos);
-               list_del_init(&iocb->ki_list);
 
-               kiocb_cancel(ctx, iocb, &res);
+       if (!ctx->dead) {
+               put = 1;
+               ctx->dead = 1;
+               hlist_del_rcu(&ctx->list);
        }
 
-       if (!ctx->reqs_active)
-               goto out;
-
-       add_wait_queue(&ctx->wait, &wait);
-       set_task_state(tsk, TASK_UNINTERRUPTIBLE);
-       while (ctx->reqs_active) {
-               spin_unlock_irq(&ctx->ctx_lock);
-               io_schedule();
-               set_task_state(tsk, TASK_UNINTERRUPTIBLE);
-               spin_lock_irq(&ctx->ctx_lock);
-       }
-       __set_task_state(tsk, TASK_RUNNING);
-       remove_wait_queue(&ctx->wait, &wait);
+       list_for_each_entry_safe(iocb, t, &ctx->active_reqs, ki_list)
+               kiocb_cancel(ctx, iocb, &res);
 
-out:
        spin_unlock_irq(&ctx->ctx_lock);
+
+       if (put)
+               req_put_ioctx(ctx);
 }
 
 /* wait_on_sync_kiocb:
@@ -385,18 +370,16 @@ EXPORT_SYMBOL(wait_on_sync_kiocb);
 void exit_aio(struct mm_struct *mm)
 {
        struct kioctx *ctx;
+       struct hlist_node *p;
 
-       while (!hlist_empty(&mm->ioctx_list)) {
-               ctx = hlist_entry(mm->ioctx_list.first, struct kioctx, list);
-               hlist_del_rcu(&ctx->list);
-
-               kill_ctx(ctx);
+       spin_lock(&mm->ioctx_lock);
 
+       hlist_for_each_entry(ctx, p, &mm->ioctx_list, list) {
                if (1 != atomic_read(&ctx->users))
                        printk(KERN_DEBUG
                                "exit_aio:ioctx still alive: %d %d %d\n",
                                atomic_read(&ctx->users), ctx->dead,
-                               ctx->reqs_active);
+                               atomic_read(&ctx->reqs_active));
                /*
                 * We don't need to bother with munmap() here -
                 * exit_mmap(mm) is coming and it'll unmap everything.
@@ -408,39 +391,34 @@ void exit_aio(struct mm_struct *mm)
                 * all other callers have ctx->mm == current->mm.
                 */
                ctx->ring_info.mmap_size = 0;
-               put_ioctx(ctx);
+               kill_ctx(ctx);
        }
+
+       spin_unlock(&mm->ioctx_lock);
 }
 
 /* aio_get_req
- *     Allocate a slot for an aio request.  Increments the users count
+ *     Allocate a slot for an aio request.  Increments the ki_users count
  * of the kioctx so that the kioctx stays around until all requests are
  * complete.  Returns NULL if no requests are free.
  *
- * Returns with kiocb->users set to 2.  The io submit code path holds
+ * Returns with kiocb->ki_users set to 2.  The io submit code path holds
  * an extra reference while submitting the i/o.
  * This prevents races between the aio code path referencing the
  * req (after submitting it) and aio_complete() freeing the req.
  */
 static struct kiocb *__aio_get_req(struct kioctx *ctx)
 {
-       struct kiocb *req = NULL;
+       struct kiocb *req;
 
        req = kmem_cache_alloc(kiocb_cachep, GFP_KERNEL);
        if (unlikely(!req))
                return NULL;
 
-       req->ki_flags = 0;
+       memset(req, 0, sizeof(*req));
        req->ki_users = 2;
-       req->ki_key = 0;
        req->ki_ctx = ctx;
-       req->ki_cancel = NULL;
-       req->ki_retry = NULL;
-       req->ki_dtor = NULL;
-       req->private = NULL;
-       req->ki_iovec = NULL;
        INIT_LIST_HEAD(&req->ki_run_list);
-       req->ki_eventfd = NULL;
 
        return req;
 }
@@ -473,10 +451,8 @@ static void kiocb_batch_free(struct kioctx *ctx, struct kiocb_batch *batch)
                list_del(&req->ki_batch);
                list_del(&req->ki_list);
                kmem_cache_free(kiocb_cachep, req);
-               ctx->reqs_active--;
+               req_put_ioctx(ctx);
        }
-       if (unlikely(!ctx->reqs_active && ctx->dead))
-               wake_up_all(&ctx->wait);
        spin_unlock_irq(&ctx->ctx_lock);
 }
 
@@ -506,7 +482,7 @@ static int kiocb_batch_refill(struct kioctx *ctx, struct kiocb_batch *batch)
        spin_lock_irq(&ctx->ctx_lock);
        ring = kmap_atomic(ctx->ring_info.ring_pages[0]);
 
-       avail = aio_ring_avail(&ctx->ring_info, ring) - ctx->reqs_active;
+       avail = aio_ring_avail(&ctx->ring_info, ring) - atomic_read(&ctx->reqs_active);
        BUG_ON(avail < 0);
        if (avail < allocated) {
                /* Trim back the number of requests. */
@@ -521,7 +497,7 @@ static int kiocb_batch_refill(struct kioctx *ctx, struct kiocb_batch *batch)
        batch->count -= allocated;
        list_for_each_entry(req, &batch->head, ki_batch) {
                list_add(&req->ki_list, &ctx->active_reqs);
-               ctx->reqs_active++;
+               atomic_inc(&ctx->reqs_active);
        }
 
        kunmap_atomic(ring);
@@ -546,8 +522,7 @@ static inline struct kiocb *aio_get_req(struct kioctx *ctx,
 
 static inline void really_put_req(struct kioctx *ctx, struct kiocb *req)
 {
-       assert_spin_locked(&ctx->ctx_lock);
-
+       fput(req->ki_filp);
        if (req->ki_eventfd != NULL)
                eventfd_ctx_put(req->ki_eventfd);
        if (req->ki_dtor)
@@ -555,10 +530,8 @@ static inline void really_put_req(struct kioctx *ctx, struct kiocb *req)
        if (req->ki_iovec != &req->ki_inline_vec)
                kfree(req->ki_iovec);
        kmem_cache_free(kiocb_cachep, req);
-       ctx->reqs_active--;
 
-       if (unlikely(!ctx->reqs_active && ctx->dead))
-               wake_up_all(&ctx->wait);
+       req_put_ioctx(ctx);
 }
 
 /* __aio_put_req
@@ -566,8 +539,8 @@ static inline void really_put_req(struct kioctx *ctx, struct kiocb *req)
  */
 static void __aio_put_req(struct kioctx *ctx, struct kiocb *req)
 {
-       dprintk(KERN_DEBUG "aio_put(%p): f_count=%ld\n",
-               req, atomic_long_read(&req->ki_filp->f_count));
+       pr_debug("req=%p f_count=%ld\n",
+                req, atomic_long_read(&req->ki_filp->f_count));
 
        assert_spin_locked(&ctx->ctx_lock);
 
@@ -576,11 +549,7 @@ static void __aio_put_req(struct kioctx *ctx, struct kiocb *req)
        if (likely(req->ki_users))
                return;
        list_del(&req->ki_list);                /* remove from active_reqs */
-       req->ki_cancel = NULL;
-       req->ki_retry = NULL;
 
-       fput(req->ki_filp);
-       req->ki_filp = NULL;
        really_put_req(ctx, req);
 }
 
@@ -605,18 +574,13 @@ static struct kioctx *lookup_ioctx(unsigned long ctx_id)
 
        rcu_read_lock();
 
-       hlist_for_each_entry_rcu(ctx, n, &mm->ioctx_list, list) {
-               /*
-                * RCU protects us against accessing freed memory but
-                * we have to be careful not to get a reference when the
-                * reference count already dropped to 0 (ctx->dead test
-                * is unreliable because of races).
-                */
-               if (ctx->user_id == ctx_id && !ctx->dead && try_get_ioctx(ctx)){
+       hlist_for_each_entry_rcu(ctx, n, &mm->ioctx_list, list)
+               if (ctx->user_id == ctx_id){
+                       BUG_ON(ctx->dead);
+                       atomic_inc(&ctx->users);
                        ret = ctx;
                        break;
                }
-       }
 
        rcu_read_unlock();
        return ret;
@@ -1162,7 +1126,7 @@ retry:
                                break;
                        /* Try to only show up in io wait if there are ops
                         *  in flight */
-                       if (ctx->reqs_active)
+                       if (atomic_read(&ctx->reqs_active) > 1)
                                io_schedule();
                        else
                                schedule();
@@ -1197,35 +1161,6 @@ out:
        return i ? i : ret;
 }
 
-/* Take an ioctx and remove it from the list of ioctx's.  Protects 
- * against races with itself via ->dead.
- */
-static void io_destroy(struct kioctx *ioctx)
-{
-       struct mm_struct *mm = current->mm;
-       int was_dead;
-
-       /* delete the entry from the list is someone else hasn't already */
-       spin_lock(&mm->ioctx_lock);
-       was_dead = ioctx->dead;
-       ioctx->dead = 1;
-       hlist_del_rcu(&ioctx->list);
-       spin_unlock(&mm->ioctx_lock);
-
-       dprintk("aio_release(%p)\n", ioctx);
-       if (likely(!was_dead))
-               put_ioctx(ioctx);       /* twice for the list */
-
-       kill_ctx(ioctx);
-
-       /*
-        * Wake up any waiters.  The setting of ctx->dead must be seen
-        * by other CPUs at this point.  Right now, we rely on the
-        * locking done by the above calls to ensure this consistency.
-        */
-       wake_up_all(&ioctx->wait);
-}
-
 /* sys_io_setup:
  *     Create an aio_context capable of receiving at least nr_events.
  *     ctxp must not point to an aio_context that already exists, and
@@ -1261,7 +1196,7 @@ SYSCALL_DEFINE2(io_setup, unsigned, nr_events, aio_context_t __user *, ctxp)
        if (!IS_ERR(ioctx)) {
                ret = put_user(ioctx->user_id, ctxp);
                if (ret)
-                       io_destroy(ioctx);
+                       kill_ctx(ioctx);
                put_ioctx(ioctx);
        }
 
@@ -1279,7 +1214,7 @@ SYSCALL_DEFINE1(io_destroy, aio_context_t, ctx)
 {
        struct kioctx *ioctx = lookup_ioctx(ctx);
        if (likely(NULL != ioctx)) {
-               io_destroy(ioctx);
+               kill_ctx(ioctx);
                put_ioctx(ioctx);
                return 0;
        }
diff --git a/include/linux/aio.h b/include/linux/aio.h
index 4cde86d..eb6e5e4 100644
--- a/include/linux/aio.h
+++ b/include/linux/aio.h
@@ -192,7 +192,7 @@ struct kioctx {
 
        spinlock_t              ctx_lock;
 
-       int                     reqs_active;
+       atomic_t                reqs_active;
        struct list_head        active_reqs;    /* used for cancellation */
        struct list_head        run_list;       /* used for kicked reqs */
 
@@ -202,8 +202,7 @@ struct kioctx {
        struct aio_ring_info    ring_info;
 
        struct delayed_work     wq;
-
-       struct rcu_head         rcu_head;
+       struct work_struct      free_work;
 };
 
 /* prototypes */
-- 
1.7.10.4
