Author: kib
Date: Tue Apr 26 11:39:56 2011
New Revision: 221059
URL: http://svn.freebsd.org/changeset/base/221059

Log:
  Implement the delayed task execution extension to the taskqueue
  mechanism. The caller may specify a timeout in ticks after which the
  task will be scheduled.
  
  Sponsored by: The FreeBSD Foundation
  Reviewed by:  jeff, jhb
  MFC after:    1 month

Added:
  head/sys/sys/_callout.h
     - copied, changed from r221058, head/sys/sys/callout.h
Modified:
  head/sys/kern/subr_taskqueue.c
  head/sys/sys/callout.h
  head/sys/sys/taskqueue.h

Modified: head/sys/kern/subr_taskqueue.c
==============================================================================
--- head/sys/kern/subr_taskqueue.c      Tue Apr 26 10:02:15 2011        
(r221058)
+++ head/sys/kern/subr_taskqueue.c      Tue Apr 26 11:39:56 2011        
(r221059)
@@ -61,12 +61,15 @@ struct taskqueue {
        int                     tq_tcount;
        int                     tq_spin;
        int                     tq_flags;
+       int                     tq_callouts;
 };
 
 #define        TQ_FLAGS_ACTIVE         (1 << 0)
 #define        TQ_FLAGS_BLOCKED        (1 << 1)
 #define        TQ_FLAGS_PENDING        (1 << 2)
 
+#define        DT_CALLOUT_ARMED        (1 << 0) /* timeout_task.f: callout armed */
+
 #define        TQ_LOCK(tq)                                                     
\
        do {                                                            \
                if ((tq)->tq_spin)                                      \
@@ -83,6 +86,17 @@ struct taskqueue {
                        mtx_unlock(&(tq)->tq_mutex);                    \
        } while (0)
 
+void
+_timeout_task_init(struct taskqueue *queue, struct timeout_task *timeout_task,
+    int priority, task_fn_t func, void *context)
+{
+
+       TASK_INIT(&timeout_task->t, priority, func, context);
+       callout_init_mtx(&timeout_task->c, &queue->tq_mutex, 0);
+       timeout_task->q = queue;        /* queue whose mutex the callout uses */
+       timeout_task->f = 0;            /* DT_CALLOUT_ARMED clear: not armed */
+}
+
 static __inline int
 TQ_SLEEP(struct taskqueue *tq, void *p, struct mtx *m, int pri, const char *wm,
     int t)
@@ -129,7 +143,7 @@ static void
 taskqueue_terminate(struct thread **pp, struct taskqueue *tq)
 {
 
-       while (tq->tq_tcount > 0) {
+       while (tq->tq_tcount > 0 || tq->tq_callouts > 0) {
                wakeup(tq);
                TQ_SLEEP(tq, pp, &tq->tq_mutex, PWAIT, "taskqueue_destroy", 0);
        }
@@ -143,26 +157,24 @@ taskqueue_free(struct taskqueue *queue)
        queue->tq_flags &= ~TQ_FLAGS_ACTIVE;
        taskqueue_terminate(queue->tq_threads, queue);
        KASSERT(TAILQ_EMPTY(&queue->tq_active), ("Tasks still running?"));
+       KASSERT(queue->tq_callouts == 0, ("Armed timeout tasks"));
        mtx_destroy(&queue->tq_mutex);
        free(queue->tq_threads, M_TASKQUEUE);
        free(queue, M_TASKQUEUE);
 }
 
-int
-taskqueue_enqueue(struct taskqueue *queue, struct task *task)
+static int
+taskqueue_enqueue_locked(struct taskqueue *queue, struct task *task)
 {
        struct task *ins;
        struct task *prev;
 
-       TQ_LOCK(queue);
-
        /*
         * Count multiple enqueues.
         */
        if (task->ta_pending) {
                task->ta_pending++;
-               TQ_UNLOCK(queue);
-               return 0;
+               return (0);
        }
 
        /*
@@ -190,9 +202,60 @@ taskqueue_enqueue(struct taskqueue *queu
        else
                queue->tq_flags |= TQ_FLAGS_PENDING;
 
+       return (0);
+}
+int
+taskqueue_enqueue(struct taskqueue *queue, struct task *task)
+{
+       int res;
+
+       TQ_LOCK(queue);         /* the _locked helper does not lock itself */
+       res = taskqueue_enqueue_locked(queue, task);
        TQ_UNLOCK(queue);
 
-       return 0;
+       return (res);
+}
+
+static void
+taskqueue_timeout_func(void *arg)
+{
+       struct taskqueue *queue;
+       struct timeout_task *timeout_task;
+
+       timeout_task = arg;     /* set by callout_reset() in enqueue_timeout */
+       queue = timeout_task->q; /* presumably tq_mutex is held via callout(9) */
+       KASSERT((timeout_task->f & DT_CALLOUT_ARMED) != 0, ("Stray timeout"));
+       timeout_task->f &= ~DT_CALLOUT_ARMED;   /* fired: no longer armed */
+       queue->tq_callouts--;                   /* so stop counting it */
+       taskqueue_enqueue_locked(timeout_task->q, &timeout_task->t);
+}
+
+int
+taskqueue_enqueue_timeout(struct taskqueue *queue,
+    struct timeout_task *timeout_task, int ticks)
+{
+       int res;
+
+       TQ_LOCK(queue);
+       KASSERT(timeout_task->q == NULL || timeout_task->q == queue,
+           ("Migrated queue"));
+       KASSERT(!queue->tq_spin, ("Timeout for spin-queue"));
+       timeout_task->q = queue;
+       res = timeout_task->t.ta_pending;       /* pending count before this call */
+       if (ticks == 0) {                       /* no delay: enqueue right away */
+               taskqueue_enqueue_locked(queue, &timeout_task->t);
+       } else {
+               if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) {
+                       res++;                  /* count the armed callout too */
+               } else {
+                       queue->tq_callouts++;   /* newly armed: account for it */
+                       timeout_task->f |= DT_CALLOUT_ARMED;
+               }
+               callout_reset(&timeout_task->c, ticks, taskqueue_timeout_func,
+                   timeout_task);              /* issued whether or not armed */
+       }
+       TQ_UNLOCK(queue);
+       return (res);
 }
 
 void
@@ -271,6 +334,19 @@ task_is_running(struct taskqueue *queue,
        return (0);
 }
 
+static int
+taskqueue_cancel_locked(struct taskqueue *queue, struct task *task,
+    u_int *pendp)
+{
+
+       if (task->ta_pending > 0)               /* queued: unlink it */
+               STAILQ_REMOVE(&queue->tq_queue, task, task, ta_link);
+       if (pendp != NULL)
+               *pendp = task->ta_pending;      /* report the old count */
+       task->ta_pending = 0;
+       return (task_is_running(queue, task) ? EBUSY : 0); /* EBUSY if running */
+}
+
 int
 taskqueue_cancel(struct taskqueue *queue, struct task *task, u_int *pendp)
 {
@@ -278,14 +354,31 @@ taskqueue_cancel(struct taskqueue *queue
        int error;
 
        TQ_LOCK(queue);
-       if ((pending = task->ta_pending) > 0)
-               STAILQ_REMOVE(&queue->tq_queue, task, task, ta_link);
-       task->ta_pending = 0;
-       error = task_is_running(queue, task) ? EBUSY : 0;
+       pending = task->ta_pending;
+       error = taskqueue_cancel_locked(queue, task, pendp);
+       TQ_UNLOCK(queue);
+
+       return (error);
+}
+
+int
+taskqueue_cancel_timeout(struct taskqueue *queue,
+    struct timeout_task *timeout_task, u_int *pendp)
+{
+       u_int pending, pending1;
+       int error;
+
+       TQ_LOCK(queue);
+       pending = !!callout_stop(&timeout_task->c); /* 0 or 1 from callout_stop() */
+       error = taskqueue_cancel_locked(queue, &timeout_task->t, &pending1);
+       if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) {
+               timeout_task->f &= ~DT_CALLOUT_ARMED;
+               queue->tq_callouts--;           /* drop the armed-callout count */
+       }
        TQ_UNLOCK(queue);
 
        if (pendp != NULL)
-               *pendp = pending;
+               *pendp = pending + pending1;    /* callout part + queued part */
        return (error);
 }
 
@@ -302,6 +395,15 @@ taskqueue_drain(struct taskqueue *queue,
        TQ_UNLOCK(queue);
 }
 
+void
+taskqueue_drain_timeout(struct taskqueue *queue,
+    struct timeout_task *timeout_task)
+{
+
+       callout_drain(&timeout_task->c);        /* drain the callout first... */
+       taskqueue_drain(queue, &timeout_task->t); /* ...then the task it enqueues */
+}
+
 static void
 taskqueue_swi_enqueue(void *context)
 {

Copied and modified: head/sys/sys/_callout.h (from r221058, 
head/sys/sys/callout.h)
==============================================================================
--- head/sys/sys/callout.h      Tue Apr 26 10:02:15 2011        (r221058, copy 
source)
+++ head/sys/sys/_callout.h     Tue Apr 26 11:39:56 2011        (r221059)
@@ -35,8 +35,8 @@
  * $FreeBSD$
  */
 
-#ifndef _SYS_CALLOUT_H_
-#define _SYS_CALLOUT_H_
+#ifndef _SYS__CALLOUT_H
+#define        _SYS__CALLOUT_H
 
 #include <sys/queue.h>
 
@@ -58,47 +58,4 @@ struct callout {
        volatile int c_cpu;                     /* CPU we're scheduled on */
 };
 
-#define        CALLOUT_LOCAL_ALLOC     0x0001 /* was allocated from callfree */
-#define        CALLOUT_ACTIVE          0x0002 /* callout is currently active */
-#define        CALLOUT_PENDING         0x0004 /* callout is waiting for 
timeout */
-#define        CALLOUT_MPSAFE          0x0008 /* callout handler is mp safe */
-#define        CALLOUT_RETURNUNLOCKED  0x0010 /* handler returns with mtx 
unlocked */
-#define        CALLOUT_SHAREDLOCK      0x0020 /* callout lock held in shared 
mode */
-
-struct callout_handle {
-       struct callout *callout;
-};
-
-#ifdef _KERNEL
-extern int ncallout;
-
-#define        callout_active(c)       ((c)->c_flags & CALLOUT_ACTIVE)
-#define        callout_deactivate(c)   ((c)->c_flags &= ~CALLOUT_ACTIVE)
-#define        callout_drain(c)        _callout_stop_safe(c, 1)
-void   callout_init(struct callout *, int);
-void   _callout_init_lock(struct callout *, struct lock_object *, int);
-#define        callout_init_mtx(c, mtx, flags)                                 
\
-       _callout_init_lock((c), ((mtx) != NULL) ? &(mtx)->lock_object : \
-           NULL, (flags))
-#define        callout_init_rw(c, rw, flags)                                   
\
-       _callout_init_lock((c), ((rw) != NULL) ? &(rw)->lock_object :   \
-          NULL, (flags))
-#define        callout_pending(c)      ((c)->c_flags & CALLOUT_PENDING)
-int    callout_reset_on(struct callout *, int, void (*)(void *), void *, int);
-#define        callout_reset(c, on_tick, fn, arg)                              
\
-    callout_reset_on((c), (on_tick), (fn), (arg), (c)->c_cpu)
-#define        callout_reset_curcpu(c, on_tick, fn, arg)                       
\
-    callout_reset_on((c), (on_tick), (fn), (arg), PCPU_GET(cpuid))
-int    callout_schedule(struct callout *, int);
-int    callout_schedule_on(struct callout *, int, int);
-#define        callout_schedule_curcpu(c, on_tick)                             
\
-    callout_schedule_on((c), (on_tick), PCPU_GET(cpuid))
-#define        callout_stop(c)         _callout_stop_safe(c, 0)
-int    _callout_stop_safe(struct callout *, int);
-void   callout_tick(void);
-int    callout_tickstofirst(int limit);
-extern void (*callout_new_inserted)(int cpu, int ticks);
-
 #endif
-
-#endif /* _SYS_CALLOUT_H_ */

Modified: head/sys/sys/callout.h
==============================================================================
--- head/sys/sys/callout.h      Tue Apr 26 10:02:15 2011        (r221058)
+++ head/sys/sys/callout.h      Tue Apr 26 11:39:56 2011        (r221059)
@@ -38,25 +38,7 @@
 #ifndef _SYS_CALLOUT_H_
 #define _SYS_CALLOUT_H_
 
-#include <sys/queue.h>
-
-struct lock_object;
-
-SLIST_HEAD(callout_list, callout);
-TAILQ_HEAD(callout_tailq, callout);
-
-struct callout {
-       union {
-               SLIST_ENTRY(callout) sle;
-               TAILQ_ENTRY(callout) tqe;
-       } c_links;
-       int     c_time;                         /* ticks to the event */
-       void    *c_arg;                         /* function argument */
-       void    (*c_func)(void *);              /* function to call */
-       struct lock_object *c_lock;             /* lock to handle */
-       int     c_flags;                        /* state of this entry */
-       volatile int c_cpu;                     /* CPU we're scheduled on */
-};
+#include <sys/_callout.h>
 
 #define        CALLOUT_LOCAL_ALLOC     0x0001 /* was allocated from callfree */
 #define        CALLOUT_ACTIVE          0x0002 /* callout is currently active */

Modified: head/sys/sys/taskqueue.h
==============================================================================
--- head/sys/sys/taskqueue.h    Tue Apr 26 10:02:15 2011        (r221058)
+++ head/sys/sys/taskqueue.h    Tue Apr 26 11:39:56 2011        (r221059)
@@ -35,10 +35,18 @@
 
 #include <sys/queue.h>
 #include <sys/_task.h>
+#include <sys/_callout.h>
 
 struct taskqueue;
 struct thread;
 
+struct timeout_task {
+       struct taskqueue *q;    /* queue this timeout task is bound to */
+       struct task t;          /* task enqueued once the timeout fires */
+       struct callout c;       /* callout that implements the delay */
+       int    f;               /* flags; DT_CALLOUT_ARMED is kept here */
+};
+
 /*
  * A notification callback function which is called from
  * taskqueue_enqueue().  The context argument is given in the call to
@@ -54,9 +62,15 @@ struct taskqueue *taskqueue_create(const
 int    taskqueue_start_threads(struct taskqueue **tqp, int count, int pri,
                                const char *name, ...) __printflike(4, 5);
 int    taskqueue_enqueue(struct taskqueue *queue, struct task *task);
+int    taskqueue_enqueue_timeout(struct taskqueue *queue,
+           struct timeout_task *timeout_task, int ticks);
 int    taskqueue_cancel(struct taskqueue *queue, struct task *task,
            u_int *pendp);
+int    taskqueue_cancel_timeout(struct taskqueue *queue,
+           struct timeout_task *timeout_task, u_int *pendp);
 void   taskqueue_drain(struct taskqueue *queue, struct task *task);
+void   taskqueue_drain_timeout(struct taskqueue *queue,
+           struct timeout_task *timeout_task);
 void   taskqueue_free(struct taskqueue *queue);
 void   taskqueue_run(struct taskqueue *queue);
 void   taskqueue_block(struct taskqueue *queue);
@@ -79,6 +93,17 @@ void taskqueue_thread_enqueue(void *cont
        (task)->ta_context = (context);                 \
 } while (0)
 
+/*
+ * Initialize a timeout task and bind it to the given queue.  The macro
+ * expands to a single call expression without a trailing semicolon, so
+ * that "if (c) TIMEOUT_TASK_INIT(...); else ..." parses as expected.
+ */
+void _timeout_task_init(struct taskqueue *queue,
+           struct timeout_task *timeout_task, int priority, task_fn_t func,
+           void *context);
+#define        TIMEOUT_TASK_INIT(queue, timeout_task, priority, func, context) \
+       _timeout_task_init(queue, timeout_task, priority, func, context)
+
 /*
  * Declare a reference to a taskqueue.
  */
_______________________________________________
svn-src-head@freebsd.org mailing list
http://lists.freebsd.org/mailman/listinfo/svn-src-head
To unsubscribe, send any mail to "svn-src-head-unsubscr...@freebsd.org"

Reply via email to