Module Name:    src
Committed By:   riastradh
Date:           Fri Sep 11 14:29:00 UTC 2020

Modified Files:
        src/sys/net: pktqueue.c

Log Message:
pktqueue(9): Use percpu_create to allow early initialization.

Otherwise pktqueues can't be created before all CPUs are detected --
they will have a queue only for the primary CPU, not for others.

This will also be necessary if we want to add CPU hotplug (still need
some way to block hotplug during pktq_set_maxlen but it's a start).


To generate a diff of this commit:
cvs rdiff -u -r1.11 -r1.12 src/sys/net/pktqueue.c

Please note that diffs are not public domain; they are subject to the
copyright notices on the relevant files.

Modified files:

Index: src/sys/net/pktqueue.c
diff -u src/sys/net/pktqueue.c:1.11 src/sys/net/pktqueue.c:1.12
--- src/sys/net/pktqueue.c:1.11	Fri Feb  7 12:35:33 2020
+++ src/sys/net/pktqueue.c	Fri Sep 11 14:29:00 2020
@@ -1,4 +1,4 @@
-/*	$NetBSD: pktqueue.c,v 1.11 2020/02/07 12:35:33 thorpej Exp $	*/
+/*	$NetBSD: pktqueue.c,v 1.12 2020/09/11 14:29:00 riastradh Exp $	*/
 
 /*-
  * Copyright (c) 2014 The NetBSD Foundation, Inc.
@@ -36,7 +36,7 @@
  */
 
 #include <sys/cdefs.h>
-__KERNEL_RCSID(0, "$NetBSD: pktqueue.c,v 1.11 2020/02/07 12:35:33 thorpej Exp $");
+__KERNEL_RCSID(0, "$NetBSD: pktqueue.c,v 1.12 2020/09/11 14:29:00 riastradh Exp $");
 
 #include <sys/param.h>
 #include <sys/types.h>
@@ -74,7 +74,7 @@ struct pktqueue {
 	void *		pq_sih;
 
 	/* Finally, per-CPU queues. */
-	pcq_t *		pq_queue[];
+	struct percpu *	pq_pcq;	/* struct pcq * */
 };
 
 /* The counters of the packet queue. */
@@ -90,17 +90,46 @@ typedef struct {
 /* Special marker value used by pktq_barrier() mechanism. */
 #define	PKTQ_MARKER	((void *)(~0ULL))
 
-/*
- * The total size of pktqueue_t which depends on the number of CPUs.
- */
-#define	PKTQUEUE_STRUCT_LEN(ncpu)	\
-    roundup2(offsetof(pktqueue_t, pq_queue[ncpu]), coherency_unit)
+static void
+pktq_init_cpu(void *vqp, void *vpq, struct cpu_info *ci)
+{
+	struct pcq **qp = vqp;
+	struct pktqueue *pq = vpq;
+
+	*qp = pcq_create(pq->pq_maxlen, KM_SLEEP);
+}
+
+static void
+pktq_fini_cpu(void *vqp, void *vpq, struct cpu_info *ci)
+{
+	struct pcq **qp = vqp, *q = *qp;
+
+	KASSERT(pcq_peek(q) == NULL);
+	pcq_destroy(q);
+	*qp = NULL;		/* paranoia */
+}
+
+static struct pcq *
+pktq_pcq(struct pktqueue *pq, struct cpu_info *ci)
+{
+	struct pcq **qp, *q;
+
+	/*
+	 * As long as preemption is disabled, the xcall to swap percpu
+	 * buffers can't complete, so it is safe to read the pointer.
+	 */
+	KASSERT(kpreempt_disabled());
+
+	qp = percpu_getptr_remote(pq->pq_pcq, ci);
+	q = *qp;
+
+	return q;
+}
 
 pktqueue_t *
 pktq_create(size_t maxlen, void (*intrh)(void *), void *sc)
 {
 	const u_int sflags = SOFTINT_NET | SOFTINT_MPSAFE | SOFTINT_RCPU;
-	const size_t len = PKTQUEUE_STRUCT_LEN(ncpu);
 	pktqueue_t *pq;
 	percpu_t *pc;
 	void *sih;
@@ -111,14 +140,13 @@ pktq_create(size_t maxlen, void (*intrh)
 		return NULL;
 	}
 
-	pq = kmem_zalloc(len, KM_SLEEP);
-	for (u_int i = 0; i < ncpu; i++) {
-		pq->pq_queue[i] = pcq_create(maxlen, KM_SLEEP);
-	}
+	pq = kmem_zalloc(sizeof(*pq), KM_SLEEP);
 	mutex_init(&pq->pq_lock, MUTEX_DEFAULT, IPL_NONE);
 	pq->pq_maxlen = maxlen;
 	pq->pq_counters = pc;
 	pq->pq_sih = sih;
+	pq->pq_pcq = percpu_create(sizeof(struct pcq *),
+	    pktq_init_cpu, pktq_fini_cpu, pq);
 
 	return pq;
 }
@@ -126,17 +154,12 @@ pktq_create(size_t maxlen, void (*intrh)
 void
 pktq_destroy(pktqueue_t *pq)
 {
-	const size_t len = PKTQUEUE_STRUCT_LEN(ncpu);
 
-	for (u_int i = 0; i < ncpu; i++) {
-		pcq_t *q = pq->pq_queue[i];
-		KASSERT(pcq_peek(q) == NULL);
-		pcq_destroy(q);
-	}
+	percpu_free(pq->pq_pcq, sizeof(struct pcq *));
 	percpu_free(pq->pq_counters, sizeof(pktq_counters_t));
 	softint_disestablish(pq->pq_sih);
 	mutex_destroy(&pq->pq_lock);
-	kmem_free(pq, len);
+	kmem_free(pq, sizeof(*pq));
 }
 
 /*
@@ -213,18 +236,18 @@ bool
 pktq_enqueue(pktqueue_t *pq, struct mbuf *m, const u_int hash __unused)
 {
 #if defined(_RUMPKERNEL) || defined(_RUMP_NATIVE_ABI)
-	const unsigned cpuid = curcpu()->ci_index;
+	struct cpu_info *ci = curcpu();
 #else
-	const unsigned cpuid = hash % ncpu;
+	struct cpu_info *ci = cpu_lookup(hash % ncpu);
 #endif
 
 	KASSERT(kpreempt_disabled());
 
-	if (__predict_false(!pcq_put(pq->pq_queue[cpuid], m))) {
+	if (__predict_false(!pcq_put(pktq_pcq(pq, ci), m))) {
 		pktq_inc_count(pq, PQCNT_DROP);
 		return false;
 	}
-	softint_schedule_cpu(pq->pq_sih, cpu_lookup(cpuid));
+	softint_schedule_cpu(pq->pq_sih, ci);
 	pktq_inc_count(pq, PQCNT_ENQUEUE);
 	return true;
 }
@@ -238,11 +261,12 @@ pktq_enqueue(pktqueue_t *pq, struct mbuf
 struct mbuf *
 pktq_dequeue(pktqueue_t *pq)
 {
-	const struct cpu_info *ci = curcpu();
-	const unsigned cpuid = cpu_index(ci);
+	struct cpu_info *ci = curcpu();
 	struct mbuf *m;
 
-	m = pcq_get(pq->pq_queue[cpuid]);
+	KASSERT(kpreempt_disabled());
+
+	m = pcq_get(pktq_pcq(pq, ci));
 	if (__predict_false(m == PKTQ_MARKER)) {
 		/* Note the marker entry. */
 		atomic_inc_uint(&pq->pq_barrier);
@@ -262,13 +286,19 @@ pktq_dequeue(pktqueue_t *pq)
 void
 pktq_barrier(pktqueue_t *pq)
 {
+	CPU_INFO_ITERATOR cii;
+	struct cpu_info *ci;
 	u_int pending = 0;
 
 	mutex_enter(&pq->pq_lock);
 	KASSERT(pq->pq_barrier == 0);
 
-	for (u_int i = 0; i < ncpu; i++) {
-		pcq_t *q = pq->pq_queue[i];
+	for (CPU_INFO_FOREACH(cii, ci)) {
+		struct pcq *q;
+
+		kpreempt_disable();
+		q = pktq_pcq(pq, ci);
+		kpreempt_enable();
 
 		/* If the queue is empty - nothing to do. */
 		if (pcq_peek(q) == NULL) {
@@ -279,7 +309,7 @@ pktq_barrier(pktqueue_t *pq)
 			kpause("pktqsync", false, 1, NULL);
 		}
 		kpreempt_disable();
-		softint_schedule_cpu(pq->pq_sih, cpu_lookup(i));
+		softint_schedule_cpu(pq->pq_sih, ci);
 		kpreempt_enable();
 		pending++;
 	}
@@ -300,19 +330,50 @@ pktq_barrier(pktqueue_t *pq)
 void
 pktq_flush(pktqueue_t *pq)
 {
+	CPU_INFO_ITERATOR cii;
+	struct cpu_info *ci;
 	struct mbuf *m;
 
-	for (u_int i = 0; i < ncpu; i++) {
-		while ((m = pcq_get(pq->pq_queue[i])) != NULL) {
+	for (CPU_INFO_FOREACH(cii, ci)) {
+		struct pcq *q;
+
+		kpreempt_disable();
+		q = pktq_pcq(pq, ci);
+		kpreempt_enable();
+
+		/*
+		 * XXX This can't be right -- if the softint is running
+		 * then pcq_get isn't safe here.
+		 */
+		while ((m = pcq_get(q)) != NULL) {
 			pktq_inc_count(pq, PQCNT_DEQUEUE);
 			m_freem(m);
 		}
 	}
 }
 
+static void
+pktq_set_maxlen_cpu(void *vpq, void *vqs)
+{
+	struct pktqueue *pq = vpq;
+	struct pcq **qp, *q, **qs = vqs;
+	unsigned i = cpu_index(curcpu());
+	int s;
+
+	s = splnet();
+	qp = percpu_getref(pq->pq_pcq);
+	q = *qp;
+	*qp = qs[i];
+	qs[i] = q;
+	percpu_putref(pq->pq_pcq);
+	splx(s);
+}
+
 /*
  * pktq_set_maxlen: create per-CPU queues using a new size and replace
  * the existing queues without losing any packets.
+ *
+ * XXX ncpu must remain stable throughout.
  */
 int
 pktq_set_maxlen(pktqueue_t *pq, size_t maxlen)
@@ -325,18 +386,18 @@ pktq_set_maxlen(pktqueue_t *pq, size_t m
 	if (pq->pq_maxlen == maxlen)
 		return 0;
 
-	/* First, allocate the new queues and replace them. */
+	/* First, allocate the new queues. */
 	qs = kmem_zalloc(slotbytes, KM_SLEEP);
 	for (u_int i = 0; i < ncpu; i++) {
 		qs[i] = pcq_create(maxlen, KM_SLEEP);
 	}
+
+	/*
+	 * Issue an xcall to replace the queue pointers on each CPU.
+	 * This implies all the necessary memory barriers.
+	 */
 	mutex_enter(&pq->pq_lock);
-	for (u_int i = 0; i < ncpu; i++) {
-		/* Swap: store of a word is atomic. */
-		pcq_t *q = pq->pq_queue[i];
-		pq->pq_queue[i] = qs[i];
-		qs[i] = q;
-	}
+	xc_wait(xc_broadcast(XC_HIGHPRI, pktq_set_maxlen_cpu, pq, qs));
 	pq->pq_maxlen = maxlen;
 	mutex_exit(&pq->pq_lock);
 
@@ -355,10 +416,15 @@ pktq_set_maxlen(pktqueue_t *pq, size_t m
 	pktq_barrier(pq);
 
 	for (u_int i = 0; i < ncpu; i++) {
+		struct pcq *q;
 		struct mbuf *m;
 
+		kpreempt_disable();
+		q = pktq_pcq(pq, cpu_lookup(i));
+		kpreempt_enable();
+
 		while ((m = pcq_get(qs[i])) != NULL) {
-			while (!pcq_put(pq->pq_queue[i], m)) {
+			while (!pcq_put(q, m)) {
 				kpause("pktqrenq", false, 1, NULL);
 			}
 		}

Reply via email to