On 10/04/2017 02:03 PM, Jesper Dangaard Brouer wrote:
[...]
 static int cpu_map_kthread_run(void *data)
 {
         struct bpf_cpu_map_entry *rcpu = data;
         set_current_state(TASK_INTERRUPTIBLE);
         while (!kthread_should_stop()) {
+                unsigned int processed = 0, drops = 0;
                 struct xdp_pkt *xdp_pkt;
-                schedule();
-                /* Do work */
-                while ((xdp_pkt = ptr_ring_consume(rcpu->queue))) {
-                        /* For now just "refcnt-free" */
-                        page_frag_free(xdp_pkt);
+                /* Release CPU reschedule checks */
+                if (__ptr_ring_empty(rcpu->queue)) {
+                        schedule();
+                } else {
+                        cond_resched();
+                }
+
+                /* Process packets in rcpu->queue */
+                local_bh_disable();
+                /*
+                 * The bpf_cpu_map_entry is single consumer, with this
+                 * kthread CPU pinned. Lockless access to ptr_ring
+                 * consume side valid as no-resize allowed of queue.
+                 */
+                while ((xdp_pkt = __ptr_ring_consume(rcpu->queue))) {
+                        struct sk_buff *skb;
+                        int ret;
+
+                        skb = cpu_map_build_skb(rcpu, xdp_pkt);
+                        if (!skb) {
+                                page_frag_free(xdp_pkt);
+                                continue;
+                        }
+
+                        /* Inject into network stack */
+                        ret = netif_receive_skb_core(skb);
Don't we need to hold the RCU read lock around the netif_receive_skb_core() call above? (Rough sketch of what I have in mind below, after this hunk.)
+                        if (ret == NET_RX_DROP)
+                                drops++;
+
+                        /* Limit BH-disable period */
+                        if (++processed == 8)
+                                break;
                 }
+                local_bh_enable(); /* resched point, may call do_softirq() */
+
                 __set_current_state(TASK_INTERRUPTIBLE);
         }
         put_cpu_map_entry(rcpu);
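
To make the RCU question above concrete: if netif_receive_skb_core() expects its caller to hold the RCU read lock (the normal receive path does its rx_handler and ptype lookups under RCU), then I would expect the consume loop to end up roughly like the sketch below. This is just your loop from the hunk above with rcu_read_lock()/rcu_read_unlock() added, completely untested, and only meant to show where I think the read-side section would go; processed, drops and xdp_pkt are the declarations already in your patch.

                /* Process packets in rcpu->queue */
                local_bh_disable();
                rcu_read_lock();        /* cover RCU dereferences done inside
                                         * netif_receive_skb_core()
                                         */
                while ((xdp_pkt = __ptr_ring_consume(rcpu->queue))) {
                        struct sk_buff *skb;
                        int ret;

                        skb = cpu_map_build_skb(rcpu, xdp_pkt);
                        if (!skb) {
                                page_frag_free(xdp_pkt);
                                continue;
                        }

                        /* Inject into network stack */
                        ret = netif_receive_skb_core(skb);
                        if (ret == NET_RX_DROP)
                                drops++;

                        /* Limit BH-disable period */
                        if (++processed == 8)
                                break;
                }
                rcu_read_unlock();
                local_bh_enable(); /* resched point, may call do_softirq() */

Taking the lock once per batch rather than per packet keeps the overhead down, and with the batch capped at 8 packets the read-side critical section stays short anyway.
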
@@ -463,13 +582,6 @@ static int bq_flush_to_queue(struct bpf_cpu_map_entry *rcpu,