Module Name:    src
Committed By:   ozaki-r
Date:           Thu Dec 28 07:00:52 UTC 2017

Modified Files:
        src/share/man/man9: workqueue.9
        src/sys/kern: subr_workqueue.c
        src/sys/sys: workqueue.h

Log Message:
Add workqueue_wait that waits for a specific work to finish

The caller must ensure that no new work is enqueued before calling
workqueue_wait. Note that Note that if the workqueue is WQ_PERCPU, the caller
can enqueue a new work to another queue other than the waiting queue.

Discussed on tech-kern@


To generate a diff of this commit:
cvs rdiff -u -r1.11 -r1.12 src/share/man/man9/workqueue.9
cvs rdiff -u -r1.33 -r1.34 src/sys/kern/subr_workqueue.c
cvs rdiff -u -r1.9 -r1.10 src/sys/sys/workqueue.h

Please note that diffs are not public domain; they are subject to the
copyright notices on the relevant files.

Modified files:

Index: src/share/man/man9/workqueue.9
diff -u src/share/man/man9/workqueue.9:1.11 src/share/man/man9/workqueue.9:1.12
--- src/share/man/man9/workqueue.9:1.11	Tue Oct 13 04:22:24 2015
+++ src/share/man/man9/workqueue.9	Thu Dec 28 07:00:52 2017
@@ -1,4 +1,4 @@
-.\"	$NetBSD: workqueue.9,v 1.11 2015/10/13 04:22:24 riastradh Exp $
+.\"	$NetBSD: workqueue.9,v 1.12 2017/12/28 07:00:52 ozaki-r Exp $
 .\"
 .\" Copyright (c)2005 YAMAMOTO Takashi,
 .\" All rights reserved.
@@ -25,7 +25,7 @@
 .\" SUCH DAMAGE.
 .\"
 .\" ------------------------------------------------------------
-.Dd October 24, 2011
+.Dd December 28, 2017
 .Dt WORKQUEUE 9
 .Os
 .\" ------------------------------------------------------------
@@ -47,6 +47,10 @@
 "struct workqueue *wq" "struct work *wk" "struct cpu_info *ci"
 .\" - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
 .Ft void
+.Fn workqueue_wait \
+"struct workqueue *wq" "struct work *wk"
+.\" - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+.Ft void
 .Fn workqueue_destroy \
 "struct workqueue *wq"
 .\" ------------------------------------------------------------
@@ -118,6 +122,19 @@ the
 framework.
 .Pp
 .\" - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+.Fn workqueue_wait
+waits for a specified work
+.Fa wk
+on the workqueue
+.Fa wq
+to finish.
+The caller must ensure that no new work will be enqueued to the workqueue
+beforehand.
+Note that if the workqueue is
+.Dv WQ_PERCPU ,
+the caller can enqueue a new work to another queue other than the waiting queue.
+.Pp
+.\" - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
 .Fn workqueue_destroy
 destroys a workqueue and frees associated resources.
 The caller should ensure that the workqueue has no work enqueued beforehand.

Index: src/sys/kern/subr_workqueue.c
diff -u src/sys/kern/subr_workqueue.c:1.33 src/sys/kern/subr_workqueue.c:1.34
--- src/sys/kern/subr_workqueue.c:1.33	Sun Oct  7 22:16:21 2012
+++ src/sys/kern/subr_workqueue.c	Thu Dec 28 07:00:52 2017
@@ -1,4 +1,4 @@
-/*	$NetBSD: subr_workqueue.c,v 1.33 2012/10/07 22:16:21 matt Exp $	*/
+/*	$NetBSD: subr_workqueue.c,v 1.34 2017/12/28 07:00:52 ozaki-r Exp $	*/
 
 /*-
  * Copyright (c)2002, 2005, 2006, 2007 YAMAMOTO Takashi,
@@ -27,7 +27,7 @@
  */
 
 #include <sys/cdefs.h>
-__KERNEL_RCSID(0, "$NetBSD: subr_workqueue.c,v 1.33 2012/10/07 22:16:21 matt Exp $");
+__KERNEL_RCSID(0, "$NetBSD: subr_workqueue.c,v 1.34 2017/12/28 07:00:52 ozaki-r Exp $");
 
 #include <sys/param.h>
 #include <sys/cpu.h>
@@ -49,8 +49,10 @@ SIMPLEQ_HEAD(workqhead, work_impl);
 struct workqueue_queue {
 	kmutex_t q_mutex;
 	kcondvar_t q_cv;
-	struct workqhead q_queue;
+	struct workqhead q_queue_pending;
+	struct workqhead q_queue_running;
 	lwp_t *q_worker;
+	work_impl_t *q_waiter;
 };
 
 struct workqueue {
@@ -115,24 +117,29 @@ workqueue_worker(void *cookie)
 	q = workqueue_queue_lookup(wq, curlwp->l_cpu);
 
 	for (;;) {
-		struct workqhead tmp;
-
 		/*
 		 * we violate abstraction of SIMPLEQ.
 		 */
 
-#if defined(DIAGNOSTIC)
-		tmp.sqh_last = (void *)POISON;
-#endif /* defined(DIAGNOSTIC) */
-
 		mutex_enter(&q->q_mutex);
-		while (SIMPLEQ_EMPTY(&q->q_queue))
+		while (SIMPLEQ_EMPTY(&q->q_queue_pending))
 			cv_wait(&q->q_cv, &q->q_mutex);
-		tmp.sqh_first = q->q_queue.sqh_first; /* XXX */
-		SIMPLEQ_INIT(&q->q_queue);
+		KASSERT(SIMPLEQ_EMPTY(&q->q_queue_running));
+		q->q_queue_running.sqh_first =
+		    q->q_queue_pending.sqh_first; /* XXX */
+		SIMPLEQ_INIT(&q->q_queue_pending);
 		mutex_exit(&q->q_mutex);
 
-		workqueue_runlist(wq, &tmp);
+		workqueue_runlist(wq, &q->q_queue_running);
+
+		mutex_enter(&q->q_mutex);
+		KASSERT(!SIMPLEQ_EMPTY(&q->q_queue_running));
+		SIMPLEQ_INIT(&q->q_queue_running);
+		if (__predict_false(q->q_waiter != NULL)) {
+			/* Wake up workqueue_wait */
+			cv_signal(&q->q_cv);
+		}
+		mutex_exit(&q->q_mutex);
 	}
 }
 
@@ -159,7 +166,8 @@ workqueue_initqueue(struct workqueue *wq
 
 	mutex_init(&q->q_mutex, MUTEX_DEFAULT, ipl);
 	cv_init(&q->q_cv, wq->wq_name);
-	SIMPLEQ_INIT(&q->q_queue);
+	SIMPLEQ_INIT(&q->q_queue_pending);
+	SIMPLEQ_INIT(&q->q_queue_running);
 	ktf = ((wq->wq_flags & WQ_MPSAFE) != 0 ? KTHREAD_MPSAFE : 0);
 	if (wq->wq_prio < PRI_KERNEL)
 		ktf |= KTHREAD_TS;
@@ -194,7 +202,7 @@ workqueue_exit(struct work *wk, void *ar
 	 */
 
 	KASSERT(q->q_worker == curlwp);
-	KASSERT(SIMPLEQ_EMPTY(&q->q_queue));
+	KASSERT(SIMPLEQ_EMPTY(&q->q_queue_pending));
 	mutex_enter(&q->q_mutex);
 	q->q_worker = NULL;
 	cv_signal(&q->q_cv);
@@ -210,10 +218,10 @@ workqueue_finiqueue(struct workqueue *wq
 	KASSERT(wq->wq_func == workqueue_exit);
 
 	wqe.wqe_q = q;
-	KASSERT(SIMPLEQ_EMPTY(&q->q_queue));
+	KASSERT(SIMPLEQ_EMPTY(&q->q_queue_pending));
 	KASSERT(q->q_worker != NULL);
 	mutex_enter(&q->q_mutex);
-	SIMPLEQ_INSERT_TAIL(&q->q_queue, &wqe.wqe_wk, wk_entry);
+	SIMPLEQ_INSERT_TAIL(&q->q_queue_pending, &wqe.wqe_wk, wk_entry);
 	cv_signal(&q->q_cv);
 	while (q->q_worker != NULL) {
 		cv_wait(&q->q_cv, &q->q_mutex);
@@ -271,6 +279,64 @@ workqueue_create(struct workqueue **wqp,
 	return error;
 }
 
+static bool
+workqueue_q_wait(struct workqueue_queue *q, work_impl_t *wk_target)
+{
+	work_impl_t *wk;
+	bool found = false;
+
+	mutex_enter(&q->q_mutex);
+    again:
+	SIMPLEQ_FOREACH(wk, &q->q_queue_pending, wk_entry) {
+		if (wk == wk_target)
+			goto found;
+	}
+	SIMPLEQ_FOREACH(wk, &q->q_queue_running, wk_entry) {
+		if (wk == wk_target)
+			goto found;
+	}
+    found:
+	if (wk != NULL) {
+		found = true;
+		KASSERT(q->q_waiter == NULL);
+		q->q_waiter = wk;
+		cv_wait(&q->q_cv, &q->q_mutex);
+		goto again;
+	}
+	if (q->q_waiter != NULL)
+		q->q_waiter = NULL;
+	mutex_exit(&q->q_mutex);
+
+	return found;
+}
+
+/*
+ * Wait for a specified work to finish.  The caller must ensure that no new
+ * work will be enqueued before calling workqueue_wait.  Note that if the
+ * workqueue is WQ_PERCPU, the caller can enqueue a new work to another queue
+ * other than the waiting queue.
+ */
+void
+workqueue_wait(struct workqueue *wq, struct work *wk)
+{
+	struct workqueue_queue *q;
+	bool found;
+
+	if (ISSET(wq->wq_flags, WQ_PERCPU)) {
+		struct cpu_info *ci;
+		CPU_INFO_ITERATOR cii;
+		for (CPU_INFO_FOREACH(cii, ci)) {
+			q = workqueue_queue_lookup(wq, ci);
+			found = workqueue_q_wait(q, (work_impl_t *)wk);
+			if (found)
+				break;
+		}
+	} else {
+		q = workqueue_queue_lookup(wq, NULL);
+		(void) workqueue_q_wait(q, (work_impl_t *)wk);
+	}
+}
+
 void
 workqueue_destroy(struct workqueue *wq)
 {
@@ -298,7 +364,8 @@ workqueue_enqueue(struct workqueue *wq, 
 	q = workqueue_queue_lookup(wq, ci);
 
 	mutex_enter(&q->q_mutex);
-	SIMPLEQ_INSERT_TAIL(&q->q_queue, wk, wk_entry);
+	KASSERT(q->q_waiter == NULL);
+	SIMPLEQ_INSERT_TAIL(&q->q_queue_pending, wk, wk_entry);
 	cv_signal(&q->q_cv);
 	mutex_exit(&q->q_mutex);
 }

Index: src/sys/sys/workqueue.h
diff -u src/sys/sys/workqueue.h:1.9 src/sys/sys/workqueue.h:1.10
--- src/sys/sys/workqueue.h:1.9	Fri Oct 19 12:16:48 2007
+++ src/sys/sys/workqueue.h	Thu Dec 28 07:00:52 2017
@@ -1,4 +1,4 @@
-/*	$NetBSD: workqueue.h,v 1.9 2007/10/19 12:16:48 ad Exp $	*/
+/*	$NetBSD: workqueue.h,v 1.10 2017/12/28 07:00:52 ozaki-r Exp $	*/
 
 /*-
  * Copyright (c)2002, 2005 YAMAMOTO Takashi,
@@ -51,6 +51,7 @@ struct workqueue;
 int workqueue_create(struct workqueue **, const char *,
     void (*)(struct work *, void *), void *, pri_t, int, int);
 void workqueue_destroy(struct workqueue *);
+void workqueue_wait(struct workqueue *, struct work *);
 
 void workqueue_enqueue(struct workqueue *, struct work *, struct cpu_info *);
 

Reply via email to