A good reason to offload an ofproto upcall function in polling mode is to allow 
a different CPU to do time-consuming inexact rule matches while the polling 
thread maintains fast packet switching. At low data and packet rates, or on 
low-rate Ethernet interfaces (1 GbE and lower), this does not matter; however, 
when higher packet rates are achieved this is going to be critical, since the 
input queue will easily get overrun at 10 GbE rates with moderate delays, 
especially with smaller packet sizes. 

At this time, for polling mode DPDK, all threads have default affinitization to 
only one core, so ofproto upcalls are being run on the same core. As a result, a 
change to call ofproto upcall functions directly will show little or no 
performance difference, and may even show a packet rate gain since no Linux 
process scheduling overhead will be present. Currently, changing the 
affinitization to allow the polling thread to execute on different CPU cores 
than the other ovs-vswitchd threads results in occasional polling halts/hangs, 
which gives very unpredictable zero-loss performance, resulting in poorer 
zero-loss operation than with all ovs-vswitchd threads affinitized to one core. 
However, this is an SMP synchronization issue (or issues) that hopefully will 
eventually get solved.

Calling some ofproto upcall functions directly has potential benefits. It is 
very desirable to set up exact match flow entries while sequentially processing 
packets, provided this can be done in just several microseconds at most, the NIC 
RX queue has room to absorb and average this time across multiple loop 
iterations, and the loop processes packets faster than the average input rate. 
This would require very optimized inexact rule lookup code. A trip to an 
OpenFlow controller would still need to be offloaded to a non-realtime thread.

Directly calling ofproto upcall functions before the inexact rule lookup code is 
highly optimized for lookup speed with a large number of rules would make it 
more difficult to get the DPDK packet processing rate up, and also to test 
and verify fast packet processing rates.

Mike Polehn  

-----Original Message-----
From: dev [mailto:dev-boun...@openvswitch.org] On Behalf Of Ryan Wilson
Sent: Monday, June 16, 2014 11:45 PM
To: dev@openvswitch.org
Subject: [ovs-dev] [PATCH v2] dpif-netdev: Polling threads directly call 
ofproto upcall functions.

Typically, kernel datapath threads send upcalls to userspace where handler 
threads process the upcalls. For TAP and DPDK devices, the datapath threads 
operate in userspace, so there is no need for separate handler threads.

This patch allows userspace datapath threads to directly call the ofproto 
upcall functions, eliminating the need for handler threads for datapaths of 
type 'netdev'.

Signed-off-by: Ryan Wilson <wr...@nicira.com>
---
v2: Fix race condition found during perf test
---
 lib/dpif-netdev.c             |  327 +++++++----------------------------------
 lib/dpif-netdev.h             |    7 +
 lib/dpif.c                    |   68 ++++++---
 lib/dpif.h                    |    1 +
 ofproto/ofproto-dpif-upcall.c |  227 +++++++++++++++++-----------
 ofproto/ofproto-dpif-upcall.h |    4 +
 6 files changed, 258 insertions(+), 376 deletions(-)

diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c index 6c281fe..626b3e6 100644
--- a/lib/dpif-netdev.c
+++ b/lib/dpif-netdev.c
@@ -15,8 +15,6 @@
  */
 
 #include <config.h>
-#include "dpif.h"
-
 #include <ctype.h>
 #include <errno.h>
 #include <fcntl.h>
@@ -35,6 +33,7 @@
 #include "cmap.h"
 #include "csum.h"
 #include "dpif.h"
+#include "dpif-netdev.h"
 #include "dpif-provider.h"
 #include "dummy.h"
 #include "dynamic-string.h"
@@ -76,10 +75,7 @@ DEFINE_STATIC_PER_THREAD_DATA(uint32_t, recirc_depth, 0)
 /* Configuration parameters. */
 enum { MAX_FLOWS = 65536 };     /* Maximum number of flows in flow table. */
 
-/* Queues. */
-enum { MAX_QUEUE_LEN = 128 };   /* Maximum number of packets per queue. */
-enum { QUEUE_MASK = MAX_QUEUE_LEN - 1 }; 
-BUILD_ASSERT_DECL(IS_POW2(MAX_QUEUE_LEN));
+static exec_upcall_func *exec_upcall_cb = NULL;
 
 /* Protects against changes to 'dp_netdevs'. */  static struct ovs_mutex 
dp_netdev_mutex = OVS_MUTEX_INITIALIZER; @@ -88,27 +84,6 @@ static struct 
ovs_mutex dp_netdev_mutex = OVS_MUTEX_INITIALIZER;  static struct shash 
dp_netdevs OVS_GUARDED_BY(dp_netdev_mutex)
     = SHASH_INITIALIZER(&dp_netdevs);
 
-struct dp_netdev_upcall {
-    struct dpif_upcall upcall;  /* Queued upcall information. */
-    struct ofpbuf buf;          /* ofpbuf instance for upcall.packet. */
-};
-
-/* A queue passing packets from a struct dp_netdev to its clients (handlers).
- *
- *
- * Thread-safety
- * =============
- *
- * Any access at all requires the owning 'dp_netdev''s queue_rwlock and
- * its own mutex. */
-struct dp_netdev_queue {
-    struct ovs_mutex mutex;
-    struct seq *seq;      /* Incremented whenever a packet is queued. */
-    struct dp_netdev_upcall upcalls[MAX_QUEUE_LEN] OVS_GUARDED;
-    unsigned int head OVS_GUARDED;
-    unsigned int tail OVS_GUARDED;
-};
-
 /* Datapath based on the network device interface from netdev.h.
  *
  *
@@ -124,11 +99,11 @@ struct dp_netdev_queue {
  *    port_mutex
  *    flow_mutex
  *    cls.rwlock
- *    queue_rwlock
  */
 struct dp_netdev {
     const struct dpif_class *const class;
     const char *const name;
+    struct dpif *dpif;
     struct ovs_refcount ref_cnt;
     atomic_flag destroyed;
 
@@ -144,15 +119,6 @@ struct dp_netdev {
     struct classifier cls;      /* Classifier.  Protected by cls.rwlock. */
     struct hmap flow_table OVS_GUARDED; /* Flow table. */
 
-    /* Queues.
-     *
-     * 'queue_rwlock' protects the modification of 'handler_queues' and
-     * 'n_handlers'.  The queue elements are protected by its
-     * 'handler_queues''s mutex. */
-    struct fat_rwlock queue_rwlock;
-    struct dp_netdev_queue *handler_queues;
-    uint32_t n_handlers;
-
     /* Statistics.
      *
      * ovsthread_stats is internally synchronized. */ @@ -165,6 +131,10 @@ 
struct dp_netdev {
     struct cmap ports;
     struct seq *port_seq;       /* Incremented whenever a port changes. */
 
+    /* Protects access to ofproto-dpif-upcall interface during revalidator
+     * thread synchronization. */
+    struct fat_rwlock upcall_rwlock;
+
     /* Forwarding threads. */
     struct latch exit_latch;
     struct pmd_thread *pmd_threads;
@@ -327,12 +297,10 @@ static int do_add_port(struct dp_netdev *dp, const char 
*devname,
     OVS_REQUIRES(dp->port_mutex);
 static void do_del_port(struct dp_netdev *dp, struct dp_netdev_port *)
     OVS_REQUIRES(dp->port_mutex);
-static void dp_netdev_destroy_all_queues(struct dp_netdev *dp)
-    OVS_REQ_WRLOCK(dp->queue_rwlock);
 static int dpif_netdev_open(const struct dpif_class *, const char *name,
                             bool create, struct dpif **);  static int 
dp_netdev_output_userspace(struct dp_netdev *dp, struct ofpbuf *,
-                                      int queue_no, int type,
+                                      int type,
                                       const struct miniflow *,
                                       const struct nlattr *userdata);  static 
void dp_netdev_execute_actions(struct dp_netdev *dp, @@ -472,8 +440,6 @@ 
create_dp_netdev(const char *name, const struct dpif_class *class,
     classifier_init(&dp->cls, NULL);
     hmap_init(&dp->flow_table);
 
-    fat_rwlock_init(&dp->queue_rwlock);
-
     ovsthread_stats_init(&dp->stats);
 
     ovs_mutex_init(&dp->port_mutex);
@@ -481,6 +447,10 @@ create_dp_netdev(const char *name, const struct dpif_class 
*class,
     dp->port_seq = seq_create();
     latch_init(&dp->exit_latch);
 
+    /* Disable upcalls by default. */
+    fat_rwlock_init(&dp->upcall_rwlock);
+    fat_rwlock_wrlock(&dp->upcall_rwlock);
+
     ovs_mutex_lock(&dp->port_mutex);
     error = do_add_port(dp, name, "internal", ODPP_LOCAL);
     ovs_mutex_unlock(&dp->port_mutex);
@@ -511,31 +481,13 @@ dpif_netdev_open(const struct dpif_class *class, const 
char *name,
     }
     if (!error) {
         *dpifp = create_dpif_netdev(dp);
+        dp->dpif = *dpifp;
     }
     ovs_mutex_unlock(&dp_netdev_mutex);
 
     return error;
 }
 
-static void
-dp_netdev_purge_queues(struct dp_netdev *dp)
-    OVS_REQ_WRLOCK(dp->queue_rwlock)
-{
-    int i;
-
-    for (i = 0; i < dp->n_handlers; i++) {
-        struct dp_netdev_queue *q = &dp->handler_queues[i];
-
-        ovs_mutex_lock(&q->mutex);
-        while (q->tail != q->head) {
-            struct dp_netdev_upcall *u = &q->upcalls[q->tail++ & QUEUE_MASK];
-            ofpbuf_uninit(&u->upcall.packet);
-            ofpbuf_uninit(&u->buf);
-        }
-        ovs_mutex_unlock(&q->mutex);
-    }
-}
-
 /* Requires dp_netdev_mutex so that we can't get a new reference to 'dp'
  * through the 'dp_netdevs' shash while freeing 'dp'. */  static void @@ 
-564,17 +516,12 @@ dp_netdev_free(struct dp_netdev *dp)
     }
     ovsthread_stats_destroy(&dp->stats);
 
-    fat_rwlock_wrlock(&dp->queue_rwlock);
-    dp_netdev_destroy_all_queues(dp);
-    fat_rwlock_unlock(&dp->queue_rwlock);
-
-    fat_rwlock_destroy(&dp->queue_rwlock);
-
     classifier_destroy(&dp->cls);
     hmap_destroy(&dp->flow_table);
     ovs_mutex_destroy(&dp->flow_mutex);
     seq_destroy(dp->port_seq);
     cmap_destroy(&dp->ports);
+    fat_rwlock_destroy(&dp->upcall_rwlock);
     latch_destroy(&dp->exit_latch);
     free(CONST_CAST(char *, dp->name));
     free(dp);
@@ -1543,80 +1490,6 @@ dpif_netdev_execute(struct dpif *dpif, struct 
dpif_execute *execute)
     return 0;
 }
 
-static void
-dp_netdev_destroy_all_queues(struct dp_netdev *dp)
-    OVS_REQ_WRLOCK(dp->queue_rwlock)
-{
-    size_t i;
-
-    dp_netdev_purge_queues(dp);
-
-    for (i = 0; i < dp->n_handlers; i++) {
-        struct dp_netdev_queue *q = &dp->handler_queues[i];
-
-        ovs_mutex_destroy(&q->mutex);
-        seq_destroy(q->seq);
-    }
-    free(dp->handler_queues);
-    dp->handler_queues = NULL;
-    dp->n_handlers = 0;
-}
-
-static void
-dp_netdev_refresh_queues(struct dp_netdev *dp, uint32_t n_handlers)
-    OVS_REQ_WRLOCK(dp->queue_rwlock)
-{
-    if (dp->n_handlers != n_handlers) {
-        size_t i;
-
-        dp_netdev_destroy_all_queues(dp);
-
-        dp->n_handlers = n_handlers;
-        dp->handler_queues = xzalloc(n_handlers * sizeof *dp->handler_queues);
-
-        for (i = 0; i < n_handlers; i++) {
-            struct dp_netdev_queue *q = &dp->handler_queues[i];
-
-            ovs_mutex_init(&q->mutex);
-            q->seq = seq_create();
-        }
-    }
-}
-
-static int
-dpif_netdev_recv_set(struct dpif *dpif, bool enable) -{
-    struct dp_netdev *dp = get_dp_netdev(dpif);
-
-    if ((dp->handler_queues != NULL) == enable) {
-        return 0;
-    }
-
-    fat_rwlock_wrlock(&dp->queue_rwlock);
-    if (!enable) {
-        dp_netdev_destroy_all_queues(dp);
-    } else {
-        dp_netdev_refresh_queues(dp, 1);
-    }
-    fat_rwlock_unlock(&dp->queue_rwlock);
-
-    return 0;
-}
-
-static int
-dpif_netdev_handlers_set(struct dpif *dpif, uint32_t n_handlers) -{
-    struct dp_netdev *dp = get_dp_netdev(dpif);
-
-    fat_rwlock_wrlock(&dp->queue_rwlock);
-    if (dp->handler_queues) {
-        dp_netdev_refresh_queues(dp, n_handlers);
-    }
-    fat_rwlock_unlock(&dp->queue_rwlock);
-
-    return 0;
-}
-
 static int
 dpif_netdev_queue_to_priority(const struct dpif *dpif OVS_UNUSED,
                               uint32_t queue_id, uint32_t *priority) @@ 
-1625,97 +1498,6 @@ dpif_netdev_queue_to_priority(const struct dpif *dpif 
OVS_UNUSED,
     return 0;
 }
 
-static bool
-dp_netdev_recv_check(const struct dp_netdev *dp, const uint32_t handler_id)
-    OVS_REQ_RDLOCK(dp->queue_rwlock)
-{
-    static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
-
-    if (!dp->handler_queues) {
-        VLOG_WARN_RL(&rl, "receiving upcall disabled");
-        return false;
-    }
-
-    if (handler_id >= dp->n_handlers) {
-        VLOG_WARN_RL(&rl, "handler index out of bound");
-        return false;
-    }
-
-    return true;
-}
-
-static int
-dpif_netdev_recv(struct dpif *dpif, uint32_t handler_id,
-                 struct dpif_upcall *upcall, struct ofpbuf *buf)
-{
-    struct dp_netdev *dp = get_dp_netdev(dpif);
-    struct dp_netdev_queue *q;
-    int error = 0;
-
-    fat_rwlock_rdlock(&dp->queue_rwlock);
-
-    if (!dp_netdev_recv_check(dp, handler_id)) {
-        error = EAGAIN;
-        goto out;
-    }
-
-    q = &dp->handler_queues[handler_id];
-    ovs_mutex_lock(&q->mutex);
-    if (q->head != q->tail) {
-        struct dp_netdev_upcall *u = &q->upcalls[q->tail++ & QUEUE_MASK];
-
-        *upcall = u->upcall;
-
-        ofpbuf_uninit(buf);
-        *buf = u->buf;
-    } else {
-        error = EAGAIN;
-    }
-    ovs_mutex_unlock(&q->mutex);
-
-out:
-    fat_rwlock_unlock(&dp->queue_rwlock);
-
-    return error;
-}
-
-static void
-dpif_netdev_recv_wait(struct dpif *dpif, uint32_t handler_id) -{
-    struct dp_netdev *dp = get_dp_netdev(dpif);
-    struct dp_netdev_queue *q;
-    uint64_t seq;
-
-    fat_rwlock_rdlock(&dp->queue_rwlock);
-
-    if (!dp_netdev_recv_check(dp, handler_id)) {
-        goto out;
-    }
-
-    q = &dp->handler_queues[handler_id];
-    ovs_mutex_lock(&q->mutex);
-    seq = seq_read(q->seq);
-    if (q->head != q->tail) {
-        poll_immediate_wake();
-    } else {
-        seq_wait(q->seq, seq);
-    }
-
-    ovs_mutex_unlock(&q->mutex);
-
-out:
-    fat_rwlock_unlock(&dp->queue_rwlock);
-}
-
-static void
-dpif_netdev_recv_purge(struct dpif *dpif) -{
-    struct dpif_netdev *dpif_netdev = dpif_netdev_cast(dpif);
-
-    fat_rwlock_wrlock(&dpif_netdev->dp->queue_rwlock);
-    dp_netdev_purge_queues(dpif_netdev->dp);
-    fat_rwlock_unlock(&dpif_netdev->dp->queue_rwlock);
-}
 

 /* Creates and returns a new 'struct dp_netdev_actions', with a reference count
  * of 1, whose actions are a copy of from the 'ofpacts_len' bytes of @@ 
-1907,6 +1689,20 @@ reload:
     return NULL;
 }
 
+void
+dp_netdev_disable_upcall(struct dpif *dpif) {
+    struct dp_netdev *dp = get_dp_netdev(dpif);
+    fat_rwlock_wrlock(&dp->upcall_rwlock);
+}
+
+void
+dp_netdev_enable_upcall(struct dpif *dpif) {
+    struct dp_netdev *dp = get_dp_netdev(dpif);
+    fat_rwlock_unlock(&dp->upcall_rwlock);
+}
+
 static void
 dp_netdev_set_pmd_threads(struct dp_netdev *dp, int n)  { @@ -2019,11 +1815,9 
@@ dp_netdev_input(struct dp_netdev *dp, struct ofpbuf *packet,
         dp_netdev_execute_actions(dp, &key.flow, packet, true, md,
                                   actions->actions, actions->size);
         dp_netdev_count_packet(dp, DP_STAT_HIT);
-    } else if (dp->handler_queues) {
+    } else {
         dp_netdev_count_packet(dp, DP_STAT_MISS);
         dp_netdev_output_userspace(dp, packet,
-                                   miniflow_hash_5tuple(&key.flow, 0)
-                                   % dp->n_handlers,
                                    DPIF_UC_MISS, &key.flow, NULL);
     }
 }
@@ -2040,46 +1834,37 @@ dp_netdev_port_input(struct dp_netdev *dp, struct 
ofpbuf *packet,
 
 static int
 dp_netdev_output_userspace(struct dp_netdev *dp, struct ofpbuf *packet,
-                           int queue_no, int type, const struct miniflow *key,
+                           int type, const struct miniflow *key,
                            const struct nlattr *userdata)  {
-    struct dp_netdev_queue *q;
     int error;
 
-    fat_rwlock_rdlock(&dp->queue_rwlock);
-    q = &dp->handler_queues[queue_no];
-    ovs_mutex_lock(&q->mutex);
-    if (q->head - q->tail < MAX_QUEUE_LEN) {
-        struct dp_netdev_upcall *u = &q->upcalls[q->head++ & QUEUE_MASK];
-        struct dpif_upcall *upcall = &u->upcall;
-        struct ofpbuf *buf = &u->buf;
-        size_t buf_size;
+    if (!fat_rwlock_tryrdlock(&dp->upcall_rwlock)) {
+        struct dpif_upcall upcall;
+        struct ofpbuf buf;
+        char base[ODPUTIL_FLOW_KEY_BYTES];
         struct flow flow;
 
-        upcall->type = type;
+        upcall.type = type;
 
-        /* Allocate buffer big enough for everything. */
-        buf_size = ODPUTIL_FLOW_KEY_BYTES;
-        if (userdata) {
-            buf_size += NLA_ALIGN(userdata->nla_len);
-        }
-        ofpbuf_init(buf, buf_size);
+        ofpbuf_use_stack(&buf, &base, ODPUTIL_FLOW_KEY_BYTES);
 
         /* Put ODP flow. */
         miniflow_expand(key, &flow);
-        odp_flow_key_from_flow(buf, &flow, NULL, flow.in_port.odp_port, true);
-        upcall->key = ofpbuf_data(buf);
-        upcall->key_len = ofpbuf_size(buf);
+        odp_flow_key_from_flow(&buf, &flow, NULL, flow.in_port.odp_port, true);
+        upcall.key = ofpbuf_data(&buf);
+        upcall.key_len = ofpbuf_size(&buf);
 
         /* Put userdata. */
-        if (userdata) {
-            upcall->userdata = ofpbuf_put(buf, userdata,
-                                          NLA_ALIGN(userdata->nla_len));
-        }
+        upcall.userdata = CONST_CAST(struct nlattr *, userdata);
+
+        upcall.packet = *packet;
 
-        upcall->packet = *packet;
+        ovs_assert(exec_upcall_cb);
 
-        seq_change(q->seq);
+        dpif_print_packet(dp->dpif, &upcall);
+        exec_upcall_cb(dp->dpif, &upcall, &buf);
+        fat_rwlock_unlock(&dp->upcall_rwlock);
 
         error = 0;
     } else {
@@ -2087,8 +1872,6 @@ dp_netdev_output_userspace(struct dp_netdev *dp, struct 
ofpbuf *packet,
         ofpbuf_delete(packet);
         error = ENOBUFS;
     }
-    ovs_mutex_unlock(&q->mutex);
-    fat_rwlock_unlock(&dp->queue_rwlock);
 
     return error;
 }
@@ -2098,6 +1881,12 @@ struct dp_netdev_execute_aux {
     const struct miniflow *key;
 };
 
+void
+dp_netdev_register_cb(exec_upcall_func *cb) {
+    exec_upcall_cb = cb;
+}
+
 static void
 dp_execute_cb(void *aux_, struct ofpbuf *packet,
               struct pkt_metadata *md,
@@ -2125,8 +1914,6 @@ dp_execute_cb(void *aux_, struct ofpbuf *packet,
         userspace_packet = may_steal ? packet : ofpbuf_clone(packet);
 
         dp_netdev_output_userspace(aux->dp, userspace_packet,
-                                   miniflow_hash_5tuple(aux->key, 0)
-                                       % aux->dp->n_handlers,
                                    DPIF_UC_ACTION, aux->key,
                                    userdata);
         break;
@@ -2227,12 +2014,12 @@ const struct dpif_class dpif_netdev_class = {
     dpif_netdev_flow_dump_next,
     dpif_netdev_execute,
     NULL,                       /* operate */
-    dpif_netdev_recv_set,
-    dpif_netdev_handlers_set,
+    NULL,                       /* recv_set */
+    NULL,                       /* handlers_set */
     dpif_netdev_queue_to_priority,
-    dpif_netdev_recv,
-    dpif_netdev_recv_wait,
-    dpif_netdev_recv_purge,
+    NULL,                       /* recv */
+    NULL,                       /* recv_wait */
+    NULL,                       /* recv_purge */
 };
 
 static void
diff --git a/lib/dpif-netdev.h b/lib/dpif-netdev.h index af4a969..b2c75d4 100644
--- a/lib/dpif-netdev.h
+++ b/lib/dpif-netdev.h
@@ -20,6 +20,7 @@
 #include <stdbool.h>
 #include <stddef.h>
 #include <stdint.h>
+#include "dpif.h"
 #include "openvswitch/types.h"
 #include "ofpbuf.h"
 #include "packets.h"
@@ -32,6 +33,12 @@ extern "C" {
  * headers to be aligned on a 4-byte boundary.  */  enum { DP_NETDEV_HEADROOM 
= 2 + VLAN_HEADER_LEN };
 
+typedef void exec_upcall_func(struct dpif *, struct dpif_upcall *,
+                              struct ofpbuf *); void 
+dp_netdev_register_cb(exec_upcall_func *); void 
+dp_netdev_disable_upcall(struct dpif *); void 
+dp_netdev_enable_upcall(struct dpif *);
+
 static inline void dp_packet_pad(struct ofpbuf *b)  {
     if (ofpbuf_size(b) < ETH_TOTAL_MIN) { diff --git a/lib/dpif.c b/lib/dpif.c 
index cace47b..e4f90d8 100644
--- a/lib/dpif.c
+++ b/lib/dpif.c
@@ -1253,8 +1253,12 @@ dpif_upcall_type_to_string(enum dpif_upcall_type type)  
int  dpif_recv_set(struct dpif *dpif, bool enable)  {
-    int error = dpif->dpif_class->recv_set(dpif, enable);
-    log_operation(dpif, "recv_set", error);
+    int error = 0;
+
+    if (dpif->dpif_class->recv_set) {
+        error = dpif->dpif_class->recv_set(dpif, enable);
+        log_operation(dpif, "recv_set", error);
+    }
     return error;
 }
 
@@ -1281,11 +1285,37 @@ dpif_recv_set(struct dpif *dpif, bool enable)  int  
dpif_handlers_set(struct dpif *dpif, uint32_t n_handlers)  {
-    int error = dpif->dpif_class->handlers_set(dpif, n_handlers);
-    log_operation(dpif, "handlers_set", error);
+    int error = 0;
+
+    if (dpif->dpif_class->handlers_set) {
+        error = dpif->dpif_class->handlers_set(dpif, n_handlers);
+        log_operation(dpif, "handlers_set", error);
+    }
     return error;
 }
 
+void
+dpif_print_packet(struct dpif *dpif, struct dpif_upcall *upcall) {
+    if (!VLOG_DROP_DBG(&dpmsg_rl)) {
+        struct ds flow;
+        char *packet;
+
+        packet = ofp_packet_to_string(ofpbuf_data(&upcall->packet),
+                                      ofpbuf_size(&upcall->packet));
+
+        ds_init(&flow);
+        odp_flow_key_format(upcall->key, upcall->key_len, &flow);
+
+        VLOG_DBG("%s: %s upcall:\n%s\n%s",
+                 dpif_name(dpif), dpif_upcall_type_to_string(upcall->type),
+                 ds_cstr(&flow), packet);
+
+        ds_destroy(&flow);
+        free(packet);
+    }
+}
+
 /* Polls for an upcall from 'dpif' for an upcall handler.  Since there
  * there can be multiple poll loops, 'handler_id' is needed as index to
  * identify the corresponding poll loop.  If successful, stores the upcall @@ 
-1308,25 +1338,15 @@ int  dpif_recv(struct dpif *dpif, uint32_t handler_id, 
struct dpif_upcall *upcall,
           struct ofpbuf *buf)
 {
-    int error = dpif->dpif_class->recv(dpif, handler_id, upcall, buf);
-    if (!error && !VLOG_DROP_DBG(&dpmsg_rl)) {
-        struct ds flow;
-        char *packet;
-
-        packet = ofp_packet_to_string(ofpbuf_data(&upcall->packet),
-                                      ofpbuf_size(&upcall->packet));
-
-        ds_init(&flow);
-        odp_flow_key_format(upcall->key, upcall->key_len, &flow);
-
-        VLOG_DBG("%s: %s upcall:\n%s\n%s",
-                 dpif_name(dpif), dpif_upcall_type_to_string(upcall->type),
-                 ds_cstr(&flow), packet);
+    int error = EAGAIN;
 
-        ds_destroy(&flow);
-        free(packet);
-    } else if (error && error != EAGAIN) {
-        log_operation(dpif, "recv", error);
+    if (dpif->dpif_class->recv) {
+        error = dpif->dpif_class->recv(dpif, handler_id, upcall, buf);
+        if (!error) {
+            dpif_print_packet(dpif, upcall);
+        } else if (error && error != EAGAIN) {
+            log_operation(dpif, "recv", error);
+        }
     }
     return error;
 }
@@ -1349,7 +1369,9 @@ dpif_recv_purge(struct dpif *dpif)  void  
dpif_recv_wait(struct dpif *dpif, uint32_t handler_id)  {
-    dpif->dpif_class->recv_wait(dpif, handler_id);
+    if (dpif->dpif_class->recv_wait) {
+        dpif->dpif_class->recv_wait(dpif, handler_id);
+    }
 }
 
 /* Obtains the NetFlow engine type and engine ID for 'dpif' into '*engine_type'
diff --git a/lib/dpif.h b/lib/dpif.h
index f080cde..b4c45d1 100644
--- a/lib/dpif.h
+++ b/lib/dpif.h
@@ -676,6 +676,7 @@ int dpif_recv(struct dpif *, uint32_t handler_id, struct 
dpif_upcall *,
               struct ofpbuf *);
 void dpif_recv_purge(struct dpif *);
 void dpif_recv_wait(struct dpif *, uint32_t handler_id);
+void dpif_print_packet(struct dpif *, struct dpif_upcall *);
 

 /* Miscellaneous. */
 
diff --git a/ofproto/ofproto-dpif-upcall.c b/ofproto/ofproto-dpif-upcall.c 
index b38f226..4f63b0c 100644
--- a/ofproto/ofproto-dpif-upcall.c
+++ b/ofproto/ofproto-dpif-upcall.c
@@ -22,6 +22,7 @@
 #include "connmgr.h"
 #include "coverage.h"
 #include "dpif.h"
+#include "dpif-netdev.h"
 #include "dynamic-string.h"
 #include "fail-open.h"
 #include "guarded-list.h"
@@ -194,7 +195,10 @@ static struct list all_udpifs = 
LIST_INITIALIZER(&all_udpifs);
 
 static size_t read_upcalls(struct handler *,
                            struct upcall upcalls[UPCALL_MAX_BATCH]); -static 
void handle_upcalls(struct handler *, struct upcall *, size_t n_upcalls);
+static void free_upcall(struct upcall *); static int 
+convert_upcall(struct udpif *, struct upcall *); static void 
+handle_upcall(struct udpif *, struct upcall *); static void 
+handle_upcalls(struct udpif *, struct upcall *, size_t n_upcalls);
 static void udpif_stop_threads(struct udpif *);  static void 
udpif_start_threads(struct udpif *, size_t n_handlers,
                                 size_t n_revalidators); @@ -255,6 +259,8 @@ 
udpif_create(struct dpif_backer *backer, struct dpif *dpif)
     atomic_init(&udpif->n_flows_timestamp, LLONG_MIN);
     ovs_mutex_init(&udpif->n_flows_mutex);
 
+    dp_netdev_register_cb(exec_upcall);
+
     return udpif;
 }
 
@@ -291,6 +297,8 @@ udpif_stop_threads(struct udpif *udpif)
             xpthread_join(udpif->revalidators[i].thread, NULL);
         }
 
+        dp_netdev_disable_upcall(udpif->dpif);
+
         for (i = 0; i < udpif->n_revalidators; i++) {
             struct revalidator *revalidator = &udpif->revalidators[i];
 
@@ -341,6 +349,8 @@ udpif_start_threads(struct udpif *udpif, size_t n_handlers,
                 "handler", udpif_upcall_handler, handler);
         }
 
+        dp_netdev_enable_upcall(udpif->dpif);
+
         ovs_barrier_init(&udpif->reval_barrier, udpif->n_revalidators);
         udpif->reval_exit = false;
         udpif->revalidators = xzalloc(udpif->n_revalidators @@ -513,12 +523,10 
@@ udpif_upcall_handler(void *arg)
             latch_wait(&udpif->exit_latch);
             poll_block();
         } else {
-            handle_upcalls(handler, upcalls, n_upcalls);
+            handle_upcalls(handler->udpif, upcalls, n_upcalls);
 
             for (i = 0; i < n_upcalls; i++) {
-                xlate_out_uninit(&upcalls[i].xout);
-                ofpbuf_uninit(&upcalls[i].dpif_upcall.packet);
-                ofpbuf_uninit(&upcalls[i].upcall_buf);
+                free_upcall(&upcalls[i]);
             }
         }
         coverage_clear();
@@ -725,6 +733,45 @@ upcall_init(struct upcall *upcall, struct flow *flow, 
struct ofpbuf *packet,
     xlate_actions(&xin, &upcall->xout);  }
 
+void
+free_upcall(struct upcall *upcall)
+{
+    xlate_out_uninit(&upcall->xout);
+    ofpbuf_uninit(&upcall->dpif_upcall.packet);
+    ofpbuf_uninit(&upcall->upcall_buf);
+}
+
+static struct udpif *
+find_udpif(struct dpif *dpif)
+{
+    struct udpif *udpif;
+
+    LIST_FOR_EACH (udpif, list_node, &all_udpifs) {
+        if (udpif->dpif == dpif) {
+            return udpif;
+        }
+    }
+    return NULL;
+}
+
+void
+exec_upcall(struct dpif *dpif, struct dpif_upcall *dupcall, struct 
+ofpbuf *buf) {
+    struct udpif *udpif;
+    struct upcall upcall;
+
+    upcall.dpif_upcall = *dupcall;
+    upcall.upcall_buf = *buf;
+
+    udpif = find_udpif(dpif);
+    ovs_assert(udpif);
+
+    if (!convert_upcall(udpif, &upcall)) {
+        handle_upcall(udpif, &upcall);
+        free_upcall(&upcall);
+    }
+}
+
 /* Reads and classifies upcalls.  Returns the number of upcalls successfully
  * read. */
 static size_t
@@ -738,14 +785,6 @@ read_upcalls(struct handler *handler,
     /* Try reading UPCALL_MAX_BATCH upcalls from dpif. */
     for (i = 0; i < UPCALL_MAX_BATCH; i++) {
         struct upcall *upcall = &upcalls[n_upcalls];
-        struct dpif_upcall *dupcall;
-        struct ofpbuf *packet;
-        struct ofproto_dpif *ofproto;
-        struct dpif_sflow *sflow;
-        struct dpif_ipfix *ipfix;
-        struct flow flow;
-        enum upcall_type type;
-        odp_port_t odp_in_port;
         int error;
 
         ofpbuf_use_stub(&upcall->upcall_buf, upcall->upcall_stub, @@ -757,91 
+796,113 @@ read_upcalls(struct handler *handler,
             break;
         }
 
-        dupcall = &upcall->dpif_upcall;
-        packet = &dupcall->packet;
-        error = xlate_receive(udpif->backer, packet, dupcall->key,
-                              dupcall->key_len, &flow,
-                              &ofproto, &ipfix, &sflow, NULL, &odp_in_port);
-        if (error) {
-            if (error == ENODEV) {
-                /* Received packet on datapath port for which we couldn't
-                 * associate an ofproto.  This can happen if a port is removed
-                 * while traffic is being received.  Print a rate-limited
-                 * message in case it happens frequently.  Install a drop flow
-                 * so that future packets of the flow are inexpensively dropped
-                 * in the kernel. */
-                VLOG_INFO_RL(&rl, "received packet on unassociated datapath "
-                             "port %"PRIu32, odp_in_port);
-                dpif_flow_put(udpif->dpif, DPIF_FP_CREATE | DPIF_FP_MODIFY,
-                              dupcall->key, dupcall->key_len, NULL, 0, NULL, 0,
-                              NULL);
-            }
-            goto destroy_upcall;
+        if (!convert_upcall(udpif, upcall)) {
+            n_upcalls += 1;
         }
+    }
+    return n_upcalls;
+}
 
-        type = classify_upcall(upcall);
-        if (type == MISS_UPCALL) {
-            upcall_init(upcall, &flow, packet, ofproto, dupcall, odp_in_port);
-            n_upcalls++;
-            continue;
+int
+convert_upcall(struct udpif *udpif, struct upcall *upcall) {
+    struct dpif_upcall *dupcall = &upcall->dpif_upcall;
+    struct ofpbuf *packet = &dupcall->packet;
+    struct ofproto_dpif *ofproto;
+    struct dpif_sflow *sflow;
+    struct dpif_ipfix *ipfix;
+    struct flow flow;
+    enum upcall_type type;
+    odp_port_t odp_in_port;
+    int error;
+
+    error = xlate_receive(udpif->backer, packet, dupcall->key,
+                          dupcall->key_len, &flow,
+                          &ofproto, &ipfix, &sflow, NULL, 
+ &odp_in_port);
+
+    if (error) {
+        if (error == ENODEV) {
+            /* Received packet on datapath port for which we couldn't
+             * associate an ofproto.  This can happen if a port is removed
+             * while traffic is being received.  Print a rate-limited
+             * message in case it happens frequently.  Install a drop flow
+             * so that future packets of the flow are inexpensively dropped
+             * in the kernel. */
+            VLOG_INFO_RL(&rl, "received packet on unassociated datapath "
+                         "port %"PRIu32, odp_in_port);
+            dpif_flow_put(udpif->dpif, DPIF_FP_CREATE | DPIF_FP_MODIFY,
+                          dupcall->key, dupcall->key_len, NULL, 0, NULL, 0,
+                          NULL);
         }
+        goto destroy_upcall;
+    }
 
-        switch (type) {
-        case SFLOW_UPCALL:
-            if (sflow) {
-                union user_action_cookie cookie;
+    type = classify_upcall(upcall);
+    if (type == MISS_UPCALL) {
+        upcall_init(upcall, &flow, packet, ofproto, dupcall, odp_in_port);
+        return error;
+    }
 
-                memset(&cookie, 0, sizeof cookie);
-                memcpy(&cookie, nl_attr_get(dupcall->userdata),
-                       sizeof cookie.sflow);
-                dpif_sflow_received(sflow, packet, &flow, odp_in_port,
-                                    &cookie);
-            }
-            break;
-        case IPFIX_UPCALL:
-            if (ipfix) {
-                dpif_ipfix_bridge_sample(ipfix, packet, &flow);
-            }
-            break;
-        case FLOW_SAMPLE_UPCALL:
-            if (ipfix) {
-                union user_action_cookie cookie;
-
-                memset(&cookie, 0, sizeof cookie);
-                memcpy(&cookie, nl_attr_get(dupcall->userdata),
-                       sizeof cookie.flow_sample);
-
-                /* The flow reflects exactly the contents of the packet.
-                 * Sample the packet using it. */
-                dpif_ipfix_flow_sample(ipfix, packet, &flow,
-                                       cookie.flow_sample.collector_set_id,
-                                       cookie.flow_sample.probability,
-                                       cookie.flow_sample.obs_domain_id,
-                                       cookie.flow_sample.obs_point_id);
-            }
-            break;
-        case BAD_UPCALL:
-            break;
-        case MISS_UPCALL:
-            OVS_NOT_REACHED();
+    switch (type) {
+    case SFLOW_UPCALL:
+        if (sflow) {
+            union user_action_cookie cookie;
+
+            memset(&cookie, 0, sizeof cookie);
+            memcpy(&cookie, nl_attr_get(dupcall->userdata),
+                   sizeof cookie.sflow);
+            dpif_sflow_received(sflow, packet, &flow, odp_in_port,
+                                &cookie);
+        }
+        break;
+    case IPFIX_UPCALL:
+        if (ipfix) {
+            dpif_ipfix_bridge_sample(ipfix, packet, &flow);
         }
+        break;
+    case FLOW_SAMPLE_UPCALL:
+        if (ipfix) {
+            union user_action_cookie cookie;
+
+            memset(&cookie, 0, sizeof cookie);
+            memcpy(&cookie, nl_attr_get(dupcall->userdata),
+                   sizeof cookie.flow_sample);
+
+            /* The flow reflects exactly the contents of the packet.
+             * Sample the packet using it. */
+            dpif_ipfix_flow_sample(ipfix, packet, &flow,
+                                   cookie.flow_sample.collector_set_id,
+                                   cookie.flow_sample.probability,
+                                   cookie.flow_sample.obs_domain_id,
+                                   cookie.flow_sample.obs_point_id);
+        }
+        break;
+    case BAD_UPCALL:
+        break;
+    case MISS_UPCALL:
+        OVS_NOT_REACHED();
+    }
 
-        dpif_ipfix_unref(ipfix);
-        dpif_sflow_unref(sflow);
+    dpif_ipfix_unref(ipfix);
+    dpif_sflow_unref(sflow);
+    error = EAGAIN;
 
 destroy_upcall:
-        ofpbuf_uninit(&upcall->dpif_upcall.packet);
-        ofpbuf_uninit(&upcall->upcall_buf);
-    }
+    ofpbuf_uninit(&upcall->dpif_upcall.packet);
+    ofpbuf_uninit(&upcall->upcall_buf);
+    return error;
+}
 
-    return n_upcalls;
+static void
+handle_upcall(struct udpif *udpif, struct upcall *upcall) {
+    handle_upcalls(udpif, upcall, 1);
 }
 
 static void
-handle_upcalls(struct handler *handler, struct upcall *upcalls,
+handle_upcalls(struct udpif *udpif, struct upcall *upcalls,
                size_t n_upcalls)
 {
-    struct udpif *udpif = handler->udpif;
     struct dpif_op *opsp[UPCALL_MAX_BATCH * 2];
     struct dpif_op ops[UPCALL_MAX_BATCH * 2];
     size_t n_ops, i;
diff --git a/ofproto/ofproto-dpif-upcall.h b/ofproto/ofproto-dpif-upcall.h 
index 6846f87..7817b39 100644
--- a/ofproto/ofproto-dpif-upcall.h
+++ b/ofproto/ofproto-dpif-upcall.h
@@ -19,6 +19,8 @@
 
 struct dpif;
 struct dpif_backer;
+struct dpif_upcall;
+struct ofpbuf;
 struct seq;
 struct simap;
 
@@ -26,6 +28,8 @@ struct simap;
  * them.  Additionally, it's responsible for maintaining the datapath flow
  * table. */
 
+void exec_upcall(struct dpif *, struct dpif_upcall *, struct ofpbuf *);
+
 struct udpif *udpif_create(struct dpif_backer *, struct dpif *);  void 
udpif_set_threads(struct udpif *, size_t n_handlers,
                        size_t n_revalidators);
--
1.7.9.5

_______________________________________________
dev mailing list
dev@openvswitch.org
http://openvswitch.org/mailman/listinfo/dev
_______________________________________________
dev mailing list
dev@openvswitch.org
http://openvswitch.org/mailman/listinfo/dev

Reply via email to