Hi Mark,

This looks good to me.

Since the new scheme doesn't allow users to change the number
of handlers, we must update ovs-vswitchd.conf.db(5) as well.

Some comments below.

On Fri, Apr 30, 2021 at 11:31:29AM -0400, Mark Gray wrote:
> The Open vSwitch kernel module uses the upcall mechanism to send
> packets from kernel space to user space when it misses in the kernel
> space flow table. The upcall sends packets via a Netlink socket.
> Currently, a Netlink socket is created for every vport. In this way,
> there is a 1:1 mapping between a vport and a Netlink socket.
> When a packet is received by a vport, if it needs to be sent to
> user space, it is sent via the corresponding Netlink socket.
> 
> This mechanism, with various iterations of the corresponding user
> space code, has seen some limitations and issues:
> 
> * On systems with a large number of vports, there is correspondingly
> a large number of Netlink sockets which can limit scaling.
> (https://bugzilla.redhat.com/show_bug.cgi?id=1526306)
> * Packet reordering on upcalls.
> (https://bugzilla.redhat.com/show_bug.cgi?id=1844576)
> * A thundering herd issue.
> (https://bugzilla.redhat.com/show_bug.cgi?id=1834444)
> 
> This patch introduces an alternative, feature-negotiated, upcall
> mode using a per-cpu dispatch rather than a per-vport dispatch.
> 
> In this mode, the Netlink socket to be used for the upcall is
> selected based on the CPU of the thread that is executing the upcall.
> In this way, it resolves the issues above as:
> 
> a) The number of Netlink sockets scales with the number of CPUs
> rather than the number of vports.
> b) Ordering per-flow is maintained as packets are distributed to
> CPUs based on mechanisms such as RSS and flows are distributed
> to a single user space thread.
> c) Packets from a flow can only wake up one user space thread.
> 
> Reported-at: https://bugzilla.redhat.com/1844576
> Signed-off-by: Mark Gray <mark.d.g...@redhat.com>
> ---
>  .../linux/compat/include/linux/openvswitch.h  |   7 +
>  lib/dpif-netdev.c                             |   1 +
>  lib/dpif-netlink.c                            | 405 +++++++++++++++---
>  lib/dpif-provider.h                           |  10 +
>  lib/dpif.c                                    |  17 +
>  lib/dpif.h                                    |   1 +
>  ofproto/ofproto-dpif-upcall.c                 |  51 ++-
>  ofproto/ofproto.c                             |  12 -
>  8 files changed, 430 insertions(+), 74 deletions(-)
> 
> diff --git a/datapath/linux/compat/include/linux/openvswitch.h 
> b/datapath/linux/compat/include/linux/openvswitch.h
> index 875de20250ce..f29265df055e 100644
> --- a/datapath/linux/compat/include/linux/openvswitch.h
> +++ b/datapath/linux/compat/include/linux/openvswitch.h
> @@ -89,6 +89,8 @@ enum ovs_datapath_cmd {
>   * set on the datapath port (for OVS_ACTION_ATTR_MISS).  Only valid on
>   * %OVS_DP_CMD_NEW requests. A value of zero indicates that upcalls should
>   * not be sent.
> + * OVS_DP_ATTR_PER_CPU_PIDS: Per-cpu array of PIDs for upcalls when
> + * OVS_DP_F_DISPATCH_UPCALL_PER_CPU feature is set.
>   * @OVS_DP_ATTR_STATS: Statistics about packets that have passed through the
>   * datapath.  Always present in notifications.
>   * @OVS_DP_ATTR_MEGAFLOW_STATS: Statistics about mega flow masks usage for 
> the
> @@ -105,6 +107,8 @@ enum ovs_datapath_attr {
>       OVS_DP_ATTR_MEGAFLOW_STATS,     /* struct ovs_dp_megaflow_stats */
>       OVS_DP_ATTR_USER_FEATURES,      /* OVS_DP_F_*  */
>       OVS_DP_ATTR_PAD,
> +     OVS_DP_ATTR_PAD2,
> +     OVS_DP_ATTR_PER_CPU_PIDS,       /* Netlink PIDS to receive upcalls */
>       __OVS_DP_ATTR_MAX
>  };
>  
> @@ -146,6 +150,9 @@ struct ovs_vport_stats {
>  /* Allow tc offload recirc sharing */
>  #define OVS_DP_F_TC_RECIRC_SHARING  (1 << 2)
>  
> +/* Allow per-cpu dispatch of upcalls */
> +#define OVS_DP_F_DISPATCH_UPCALL_PER_CPU (1 << 3)
> +
>  /* Fixed logical ports. */
>  #define OVSP_LOCAL      ((__u32)0)
>  
> diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
> index 251788b04965..24e6911dd4ff 100644
> --- a/lib/dpif-netdev.c
> +++ b/lib/dpif-netdev.c
> @@ -8488,6 +8488,7 @@ const struct dpif_class dpif_netdev_class = {
>      dpif_netdev_operate,
>      NULL,                       /* recv_set */
>      NULL,                       /* handlers_set */
> +    NULL,                       /* handlers_get */

That is number_handlers_required.

>      dpif_netdev_set_config,
>      dpif_netdev_queue_to_priority,
>      NULL,                       /* recv */
> diff --git a/lib/dpif-netlink.c b/lib/dpif-netlink.c
> index 2ded5fdd01b3..349897e70632 100644
> --- a/lib/dpif-netlink.c
> +++ b/lib/dpif-netlink.c
> @@ -80,6 +80,9 @@ enum { MAX_PORTS = USHRT_MAX };
>  #define FLOW_DUMP_MAX_BATCH 50
>  #define OPERATE_MAX_OPS 50
>  
> +#define DISPATCH_MODE_PER_CPU(dpif) ((dpif)->user_features & \
> +                                     OVS_DP_F_DISPATCH_UPCALL_PER_CPU)
> +

Perhaps a function like this:
static inline bool
dpif_netlink_upcall_per_cpu(struct ... dpif) {
    return !!((dpif)->user_features & OVS_DP_F_DISPATCH_UPCALL_PER_CPU);
}

>  #ifndef EPOLLEXCLUSIVE
>  #define EPOLLEXCLUSIVE (1u << 28)
>  #endif
> @@ -98,6 +101,8 @@ struct dpif_netlink_dp {
>      const struct ovs_dp_stats *stats;  /* OVS_DP_ATTR_STATS. */
>      const struct ovs_dp_megaflow_stats *megaflow_stats;
>                                         /* OVS_DP_ATTR_MEGAFLOW_STATS.*/
> +    const uint32_t *upcall_pids;       /* OVS_DP_ATTR_PER_CPU_PIDS */
> +    uint32_t n_upcall_pids;
>  };
>  
>  static void dpif_netlink_dp_init(struct dpif_netlink_dp *);
> @@ -178,11 +183,16 @@ struct dpif_windows_vport_sock {
>  #endif
>  
>  struct dpif_handler {
> +    /* per-vport dispatch mode */
>      struct epoll_event *epoll_events;
>      int epoll_fd;                 /* epoll fd that includes channel socks. */
>      int n_events;                 /* Num events returned by epoll_wait(). */
>      int event_offset;             /* Offset into 'epoll_events'. */
>  
> +    /* per-cpu dispatch mode */
> +    struct nl_sock *sock;         /* Each handler thread holds one netlink
> +                                     socket. */
> +
>  #ifdef _WIN32
>      /* Pool of sockets. */
>      struct dpif_windows_vport_sock *vport_sock_pool;
> @@ -201,6 +211,8 @@ struct dpif_netlink {
>      struct fat_rwlock upcall_lock;
>      struct dpif_handler *handlers;
>      uint32_t n_handlers;           /* Num of upcall handlers. */
> +
> +    /* Per-vport dispatch mode */
>      struct dpif_channel *channels; /* Array of channels for each port. */
>      int uc_array_size;             /* Size of 'handler->channels' and */
>                                     /* 'handler->epoll_events'. */
> @@ -241,8 +253,12 @@ static int open_dpif(const struct dpif_netlink_dp *, 
> struct dpif **);
>  static uint32_t dpif_netlink_port_get_pid(const struct dpif *,
>                                            odp_port_t port_no);
>  static void dpif_netlink_handler_uninit(struct dpif_handler *handler);
> -static int dpif_netlink_refresh_channels(struct dpif_netlink *,
> -                                         uint32_t n_handlers);
> +static int dpif_netlink_refresh_handlers_vport_dispatch(struct dpif_netlink 
> *,
> +                                                        uint32_t n_handlers);
> +static void destroy_all_channels(struct dpif_netlink *);
> +static int dpif_netlink_refresh_handlers_cpu_dispatch(struct dpif_netlink *);
> +static void destroy_all_handlers(struct dpif_netlink *);
> +
>  static void dpif_netlink_vport_to_ofpbuf(const struct dpif_netlink_vport *,
>                                           struct ofpbuf *);
>  static int dpif_netlink_vport_from_ofpbuf(struct dpif_netlink_vport *,
> @@ -357,11 +373,34 @@ dpif_netlink_open(const struct dpif_class *class 
> OVS_UNUSED, const char *name,
>          dp_request.cmd = OVS_DP_CMD_SET;
>      }
>  
> +    /* The Open vSwitch kernel module has two modes for dispatching upcalls:
> +     * per-vport and per-cpu.
> +     *
> +     * When dispatching upcalls per-vport, the kernel will
> +     * send the upcall via a Netlink socket that has been selected based on 
> the
> +     * vport that received the packet that is causing the upcall.
> +     *
> +     * When dispatching upcall per-cpu, the kernel will send the upcall via
> +     * a Netlink socket that has been selected based on the cpu that received
> +     * the packet that is causing the upcall.
> +     *
> +     * First we test to see if the kernel module supports per-cpu dispatching
> +     * (the preferred method). If it does not support per-cpu dispatching, we
> +     * fall back to the per-vport dispatch mode.
> +     */
>      dp_request.user_features |= OVS_DP_F_UNALIGNED;
> -    dp_request.user_features |= OVS_DP_F_VPORT_PIDS;
> +    dp_request.user_features |= OVS_DP_F_DISPATCH_UPCALL_PER_CPU;
>      error = dpif_netlink_dp_transact(&dp_request, &dp, &buf);
>      if (error) {
> -        return error;
> +        dp_request.user_features &= ~OVS_DP_F_DISPATCH_UPCALL_PER_CPU;
> +        dp_request.user_features |= OVS_DP_F_VPORT_PIDS;
> +        error = dpif_netlink_dp_transact(&dp_request, &dp, &buf);
> +        if (error) {
> +            return error;
> +        }
> +        VLOG_INFO("Dispatch mode(per-vport)");
> +    } else {
> +        VLOG_INFO("Dispatch mode:(per-cpu)");


It would be nice if we could expose the mode using a command.


>      }
>  
>      error = open_dpif(&dp, dpifp);
> @@ -609,6 +648,24 @@ destroy_all_channels(struct dpif_netlink *dpif)
>      dpif->uc_array_size = 0;
>  }
>  
> +static void
> +destroy_all_handlers(struct dpif_netlink *dpif)
> +    OVS_REQ_WRLOCK(dpif->upcall_lock)
> +{
> +    int i = 0;
> +
> +    if (!dpif->handlers) {
> +        return;
> +    }
> +    for (i = 0; i < dpif->n_handlers; i++) {
> +        struct dpif_handler *handler = &dpif->handlers[i];
> +        close_nl_sock(handler->sock);
> +    }
> +    free(dpif->handlers);
> +    dpif->handlers = NULL;
> +    dpif->n_handlers = 0;
> +}
> +
>  static void
>  dpif_netlink_close(struct dpif *dpif_)
>  {
> @@ -617,7 +674,11 @@ dpif_netlink_close(struct dpif *dpif_)
>      nl_sock_destroy(dpif->port_notifier);
>  
>      fat_rwlock_wrlock(&dpif->upcall_lock);
> -    destroy_all_channels(dpif);
> +    if (DISPATCH_MODE_PER_CPU(dpif)) {
> +        destroy_all_handlers(dpif);
> +    } else {
> +        destroy_all_channels(dpif);
> +    }
>      fat_rwlock_unlock(&dpif->upcall_lock);
>  
>      fat_rwlock_destroy(&dpif->upcall_lock);
> @@ -641,11 +702,14 @@ dpif_netlink_run(struct dpif *dpif_)
>  {
>      struct dpif_netlink *dpif = dpif_netlink_cast(dpif_);
>  
> -    if (dpif->refresh_channels) {
> -        dpif->refresh_channels = false;
> -        fat_rwlock_wrlock(&dpif->upcall_lock);
> -        dpif_netlink_refresh_channels(dpif, dpif->n_handlers);
> -        fat_rwlock_unlock(&dpif->upcall_lock);
> +    if (!DISPATCH_MODE_PER_CPU(dpif)) {
> +        if (dpif->refresh_channels) {
> +            dpif->refresh_channels = false;
> +            fat_rwlock_wrlock(&dpif->upcall_lock);
> +            dpif_netlink_refresh_handlers_vport_dispatch(dpif,
> +                                                         dpif->n_handlers);
> +            fat_rwlock_unlock(&dpif->upcall_lock);
> +        }
>      }
>      return false;
>  }
> @@ -681,6 +745,41 @@ dpif_netlink_get_stats(const struct dpif *dpif_, struct 
> dpif_dp_stats *stats)
>      return error;
>  }
>  
> +static int
> +dpif_netlink_set_handler_pids(struct dpif *dpif_, const uint32_t 
> *upcall_pids,
> +                              uint32_t n_upcall_pids)
> +{
> +    struct dpif_netlink *dpif = dpif_netlink_cast(dpif_);
> +    struct dpif_netlink_dp request, reply;
> +    struct ofpbuf *bufp;
> +    int error;
> +    int n_cores;
> +
> +    n_cores = count_cpu_cores();
> +    ovs_assert(n_cores == n_upcall_pids);
> +    VLOG_DBG("Dispatch mode(per-cpu): Number of CPUs is %d", n_cores);
> +
> +    dpif_netlink_dp_init(&request);
> +    request.cmd = OVS_DP_CMD_SET;
> +    request.name = dpif_->base_name;
> +    request.dp_ifindex = dpif->dp_ifindex;
> +    request.user_features = dpif->user_features |
> +                            OVS_DP_F_DISPATCH_UPCALL_PER_CPU;
> +
> +    request.upcall_pids = upcall_pids;
> +    request.n_upcall_pids = n_cores;
> +
> +    error = dpif_netlink_dp_transact(&request, &reply, &bufp);
> +    if (!error) {
> +        dpif->user_features = reply.user_features;
> +        ofpbuf_delete(bufp);
> +         if (!(dpif->user_features & OVS_DP_F_DISPATCH_UPCALL_PER_CPU)) {
> +            return -EOPNOTSUPP;
> +        }
> +    }
> +    return error;
> +}
> +
>  static int
>  dpif_netlink_set_features(struct dpif *dpif_, uint32_t new_features)
>  {
> @@ -741,7 +840,7 @@ get_vport_type(const struct dpif_netlink_vport *vport)
>          return "erspan";
>  
>      case OVS_VPORT_TYPE_IP6ERSPAN:
> -        return "ip6erspan"; 
> +        return "ip6erspan";
>  
>      case OVS_VPORT_TYPE_IP6GRE:
>          return "ip6gre";
> @@ -807,10 +906,16 @@ dpif_netlink_port_add__(struct dpif_netlink *dpif, 
> const char *name,
>      uint32_t upcall_pids = 0;
>      int error = 0;
>  
> -    if (dpif->handlers) {
> -        error = create_nl_sock(dpif, &sock);
> -        if (error) {
> -            return error;
> +    /* per-cpu dispatch mode does not require a socket per vport */
> +    if (!DISPATCH_MODE_PER_CPU(dpif)) {
> +        if (dpif->handlers) {
> +            error = create_nl_sock(dpif, &sock);
> +            if (error) {
> +                return error;
> +            }
> +        }
> +        if (sock) {
> +            upcall_pids = nl_sock_pid(sock);
>          }
>      }
>  
> @@ -821,9 +926,6 @@ dpif_netlink_port_add__(struct dpif_netlink *dpif, const 
> char *name,
>      request.name = name;
>  
>      request.port_no = *port_nop;
> -    if (sock) {
> -        upcall_pids = nl_sock_pid(sock);
> -    }
>      request.n_upcall_pids = 1;
>      request.upcall_pids = &upcall_pids;
>  
> @@ -845,19 +947,21 @@ dpif_netlink_port_add__(struct dpif_netlink *dpif, 
> const char *name,
>          goto exit;
>      }
>  
> -    error = vport_add_channel(dpif, *port_nop, sock);
> -    if (error) {
> -        VLOG_INFO("%s: could not add channel for port %s",
> -                    dpif_name(&dpif->dpif), name);
> -
> -        /* Delete the port. */
> -        dpif_netlink_vport_init(&request);
> -        request.cmd = OVS_VPORT_CMD_DEL;
> -        request.dp_ifindex = dpif->dp_ifindex;
> -        request.port_no = *port_nop;
> -        dpif_netlink_vport_transact(&request, NULL, NULL);
> -        close_nl_sock(sock);
> -        goto exit;
> +    if (!DISPATCH_MODE_PER_CPU(dpif)) {
> +        error = vport_add_channel(dpif, *port_nop, sock);
> +        if (error) {
> +            VLOG_INFO("%s: could not add channel for port %s",
> +                        dpif_name(&dpif->dpif), name);
> +
> +            /* Delete the port. */
> +            dpif_netlink_vport_init(&request);
> +            request.cmd = OVS_VPORT_CMD_DEL;
> +            request.dp_ifindex = dpif->dp_ifindex;
> +            request.port_no = *port_nop;
> +            dpif_netlink_vport_transact(&request, NULL, NULL);
> +            close_nl_sock(sock);
> +            goto exit;
> +        }
>      }
>  
>  exit:
> @@ -1115,6 +1219,11 @@ dpif_netlink_port_get_pid(const struct dpif *dpif_, 
> odp_port_t port_no)
>      const struct dpif_netlink *dpif = dpif_netlink_cast(dpif_);
>      uint32_t ret;
>  
> +    /* In per-cpu dispatch mode, vports do not have an associated PID */
> +    if (DISPATCH_MODE_PER_CPU(dpif)) {
> +        return 0;
> +    }

I mentioned this in the kernel patch's review that this is used with
userspace() action and 0 means it is 'unset'.


> +
>      fat_rwlock_rdlock(&dpif->upcall_lock);
>      ret = dpif_netlink_port_get_pid__(dpif, port_no);
>      fat_rwlock_unlock(&dpif->upcall_lock);
> @@ -2326,12 +2435,51 @@ dpif_netlink_handler_uninit(struct dpif_handler 
> *handler)
>  }
>  #endif
>  
> +static int
> +dpif_netlink_refresh_handlers_cpu_dispatch(struct dpif_netlink *dpif)
> +    OVS_REQ_WRLOCK(dpif->upcall_lock)
> +{
> +    int handler_id;
> +    int error = 0;
> +    uint32_t n_handlers;
> +    uint32_t *upcall_pids;
> +
> +    n_handlers = count_cpu_cores();
> +    if (dpif->n_handlers != n_handlers) {
> +        VLOG_DBG("Dispatch mode(per-cpu): initializing %d handlers",
> +                   n_handlers);
> +        destroy_all_handlers(dpif);
> +        upcall_pids = xzalloc(n_handlers * sizeof *upcall_pids);
> +        dpif->handlers = xzalloc(n_handlers * sizeof *dpif->handlers);
> +        for (handler_id = 0; handler_id < n_handlers; handler_id++) {
> +            struct dpif_handler *handler = &dpif->handlers[handler_id];
> +            error = create_nl_sock(dpif, &handler->sock);
> +            if (error) {
> +                VLOG_ERR("Dispatch mode(per-cpu): Cannot create socket for"
> +                         "handler %d", handler_id);
> +                continue;
> +            }
> +            upcall_pids[handler_id] = nl_sock_pid(handler->sock);
> +            VLOG_DBG("Dispatch mode(per-cpu): "
> +                      "handler %d has Netlink PID of %u",
> +                      handler_id, upcall_pids[handler_id]);
> +        }
> +
> +        dpif->n_handlers = n_handlers;
> +        error = dpif_netlink_set_handler_pids(&dpif->dpif, upcall_pids,
> +                                              n_handlers);
> +        free(upcall_pids);
> +    }
> +    return error;
> +}
> +
>  /* Synchronizes 'channels' in 'dpif->handlers'  with the set of vports
>   * currently in 'dpif' in the kernel, by adding a new set of channels for
>   * any kernel vport that lacks one and deleting any channels that have no
>   * backing kernel vports. */
>  static int
> -dpif_netlink_refresh_channels(struct dpif_netlink *dpif, uint32_t n_handlers)
> +dpif_netlink_refresh_handlers_vport_dispatch(struct dpif_netlink *dpif,
> +                                             uint32_t n_handlers)
>      OVS_REQ_WRLOCK(dpif->upcall_lock)
>  {
>      unsigned long int *keep_channels;
> @@ -2458,7 +2606,7 @@ dpif_netlink_refresh_channels(struct dpif_netlink 
> *dpif, uint32_t n_handlers)
>  }
>  
>  static int
> -dpif_netlink_recv_set__(struct dpif_netlink *dpif, bool enable)
> +dpif_netlink_recv_set_vport_dispatch(struct dpif_netlink *dpif, bool enable)
>      OVS_REQ_WRLOCK(dpif->upcall_lock)
>  {
>      if ((dpif->handlers != NULL) == enable) {
> @@ -2467,7 +2615,21 @@ dpif_netlink_recv_set__(struct dpif_netlink *dpif, 
> bool enable)
>          destroy_all_channels(dpif);
>          return 0;
>      } else {
> -        return dpif_netlink_refresh_channels(dpif, 1);
> +        return dpif_netlink_refresh_handlers_vport_dispatch(dpif, 1);
> +    }
> +}
> +
> +static int
> +dpif_netlink_recv_set_cpu_dispatch(struct dpif_netlink *dpif, bool enable)
> +    OVS_REQ_WRLOCK(dpif->upcall_lock)
> +{
> +    if ((dpif->handlers != NULL) == enable) {
> +        return 0;
> +    } else if (!enable) {
> +        destroy_all_handlers(dpif);
> +        return 0;
> +    } else {
> +        return dpif_netlink_refresh_handlers_cpu_dispatch(dpif);
>      }
>  }
>  
> @@ -2478,7 +2640,11 @@ dpif_netlink_recv_set(struct dpif *dpif_, bool enable)
>      int error;
>  
>      fat_rwlock_wrlock(&dpif->upcall_lock);
> -    error = dpif_netlink_recv_set__(dpif, enable);
> +    if (DISPATCH_MODE_PER_CPU(dpif)) {
> +        error = dpif_netlink_recv_set_cpu_dispatch(dpif, enable);
> +    } else {
> +        error = dpif_netlink_recv_set_vport_dispatch(dpif, enable);
> +    }
>      fat_rwlock_unlock(&dpif->upcall_lock);
>  
>      return error;
> @@ -2500,13 +2666,31 @@ dpif_netlink_handlers_set(struct dpif *dpif_, 
> uint32_t n_handlers)
>  
>      fat_rwlock_wrlock(&dpif->upcall_lock);
>      if (dpif->handlers) {
> -        error = dpif_netlink_refresh_channels(dpif, n_handlers);
> +        if (DISPATCH_MODE_PER_CPU(dpif)) {
> +            error = dpif_netlink_refresh_handlers_cpu_dispatch(dpif);
> +        } else {
> +            error = dpif_netlink_refresh_handlers_vport_dispatch(dpif,
> +                                                                 n_handlers);
> +        }
>      }
>      fat_rwlock_unlock(&dpif->upcall_lock);
>  
>      return error;
>  }
>  
> +static bool
> +dpif_netlink_number_handlers_required(struct dpif *dpif_, uint32_t 
> *n_handlers)
> +{
> +    struct dpif_netlink *dpif = dpif_netlink_cast(dpif_);
> +
> +    if (DISPATCH_MODE_PER_CPU(dpif)) {
> +        *n_handlers = count_cpu_cores();
> +        return true;
> +    }
> +
> +    return false;
> +}
> +
>  static int
>  dpif_netlink_queue_to_priority(const struct dpif *dpif OVS_UNUSED,
>                               uint32_t queue_id, uint32_t *priority)
> @@ -2669,8 +2853,59 @@ dpif_netlink_recv_windows(struct dpif_netlink *dpif, 
> uint32_t handler_id,
>  }
>  #else
>  static int
> -dpif_netlink_recv__(struct dpif_netlink *dpif, uint32_t handler_id,
> -                    struct dpif_upcall *upcall, struct ofpbuf *buf)
> +dpif_netlink_recv_cpu_dispatch(struct dpif_netlink *dpif, uint32_t 
> handler_id,
> +                               struct dpif_upcall *upcall, struct ofpbuf 
> *buf)
> +    OVS_REQ_RDLOCK(dpif->upcall_lock)
> +{
> +    struct dpif_handler *handler;
> +    int read_tries = 0;
> +
> +    if (!dpif->handlers || handler_id >= dpif->n_handlers) {
> +        return EAGAIN;
> +    }
> +
> +    handler = &dpif->handlers[handler_id];
> +
> +    for (;;) {
> +        int dp_ifindex;
> +        int error;
> +
> +        if (++read_tries > 50) {
> +            return EAGAIN;
> +        }
> +        error = nl_sock_recv(handler->sock, buf, NULL, false);
> +        if (error == ENOBUFS) {
> +            /* ENOBUFS typically means that we've received so many
> +                * packets that the buffer overflowed.  Try again
> +                * immediately because there's almost certainly a packet
> +                * waiting for us. */
> +            report_loss(dpif, NULL, 0, handler_id);
> +            continue;
> +        }
> +
> +        if (error) {
> +            if (error == EAGAIN) {
> +                break;
> +            }
> +            return error;
> +        }
> +
> +        error = parse_odp_packet(buf, upcall, &dp_ifindex);
> +        if (!error && dp_ifindex == dpif->dp_ifindex) {
> +            return 0;
> +        } else if (error) {
> +            return error;
> +        }
> +    }
> +
> +    return EAGAIN;
> +}
> +
> +static int
> +dpif_netlink_recv_vport_dispatch(struct dpif_netlink *dpif,
> +                                 uint32_t handler_id,
> +                                 struct dpif_upcall *upcall,
> +                                 struct ofpbuf *buf)
>      OVS_REQ_RDLOCK(dpif->upcall_lock)
>  {
>      struct dpif_handler *handler;
> @@ -2755,18 +2990,24 @@ dpif_netlink_recv(struct dpif *dpif_, uint32_t 
> handler_id,
>  #ifdef _WIN32
>      error = dpif_netlink_recv_windows(dpif, handler_id, upcall, buf);
>  #else
> -    error = dpif_netlink_recv__(dpif, handler_id, upcall, buf);
> +    if (DISPATCH_MODE_PER_CPU(dpif)) {
> +        error = dpif_netlink_recv_cpu_dispatch(dpif, handler_id, upcall, 
> buf);
> +    } else {
> +        error = dpif_netlink_recv_vport_dispatch(dpif,
> +                                                 handler_id, upcall, buf);
> +    }
>  #endif
>      fat_rwlock_unlock(&dpif->upcall_lock);
>  
>      return error;
>  }
>  
> +#ifdef _WIN32
>  static void
> -dpif_netlink_recv_wait__(struct dpif_netlink *dpif, uint32_t handler_id)
> +dpif_netlink_recv_wait_windows(struct dpif_netlink *dpif, uint32_t 
> handler_id)
>      OVS_REQ_RDLOCK(dpif->upcall_lock)
>  {
> -#ifdef _WIN32
> +
>      uint32_t i;
>      struct dpif_windows_vport_sock *sock_pool =
>          dpif->handlers[handler_id].vport_sock_pool;
> @@ -2779,13 +3020,31 @@ dpif_netlink_recv_wait__(struct dpif_netlink *dpif, 
> uint32_t handler_id)
>      for (i = 0; i < VPORT_SOCK_POOL_SIZE; i++) {
>          nl_sock_wait(sock_pool[i].nl_sock, POLLIN);
>      }
> -#else
> +}
> +#endif
> +
> +static void
> +dpif_netlink_recv_wait_vport_dispatch(struct dpif_netlink *dpif,
> +                                      uint32_t handler_id)
> +    OVS_REQ_RDLOCK(dpif->upcall_lock)
> +{
>      if (dpif->handlers && handler_id < dpif->n_handlers) {
>          struct dpif_handler *handler = &dpif->handlers[handler_id];
>  
>          poll_fd_wait(handler->epoll_fd, POLLIN);
>      }
> -#endif
> +}
> +
> +static void
> +dpif_netlink_recv_wait_cpu_dispatch(struct dpif_netlink *dpif,
> +                                    uint32_t handler_id)
> +    OVS_REQ_RDLOCK(dpif->upcall_lock)
> +{
> +    if (dpif->handlers && handler_id < dpif->n_handlers) {
> +        struct dpif_handler *handler = &dpif->handlers[handler_id];
> +
> +        poll_fd_wait(nl_sock_fd(handler->sock), POLLIN);
> +    }
>  }
>  
>  static void
> @@ -2794,12 +3053,20 @@ dpif_netlink_recv_wait(struct dpif *dpif_, uint32_t 
> handler_id)
>      struct dpif_netlink *dpif = dpif_netlink_cast(dpif_);
>  
>      fat_rwlock_rdlock(&dpif->upcall_lock);
> -    dpif_netlink_recv_wait__(dpif, handler_id);
> +#ifdef _WIN32
> +    dpif_netlink_recv_wait_windows(dpif, handler_id);
> +#else
> +    if (DISPATCH_MODE_PER_CPU(dpif)) {
> +        dpif_netlink_recv_wait_cpu_dispatch(dpif, handler_id);
> +    } else {
> +        dpif_netlink_recv_wait_vport_dispatch(dpif, handler_id);
> +    }
> +#endif
>      fat_rwlock_unlock(&dpif->upcall_lock);
>  }
>  
>  static void
> -dpif_netlink_recv_purge__(struct dpif_netlink *dpif)
> +dpif_netlink_recv_purge_vport_dispatch(struct dpif_netlink *dpif)
>      OVS_REQ_WRLOCK(dpif->upcall_lock)
>  {
>      if (dpif->handlers) {
> @@ -2815,13 +3082,31 @@ dpif_netlink_recv_purge__(struct dpif_netlink *dpif)
>      }
>  }
>  
> +static void
> +dpif_netlink_recv_purge_cpu_dispatch(struct dpif_netlink *dpif)
> +    OVS_REQ_WRLOCK(dpif->upcall_lock)
> +{
> +    int handler_id;
> +
> +    if (dpif->handlers) {
> +        for (handler_id = 0; handler_id < dpif->n_handlers; handler_id++) {
> +            struct dpif_handler *handler = &dpif->handlers[handler_id];
> +            nl_sock_drain(handler->sock);
> +        }
> +    }
> +}
> +
>  static void
>  dpif_netlink_recv_purge(struct dpif *dpif_)
>  {
>      struct dpif_netlink *dpif = dpif_netlink_cast(dpif_);
>  
>      fat_rwlock_wrlock(&dpif->upcall_lock);
> -    dpif_netlink_recv_purge__(dpif);
> +    if (DISPATCH_MODE_PER_CPU(dpif)) {
> +        dpif_netlink_recv_purge_cpu_dispatch(dpif);
> +    } else {
> +        dpif_netlink_recv_purge_vport_dispatch(dpif);
> +    }
>      fat_rwlock_unlock(&dpif->upcall_lock);
>  }
>  
> @@ -3974,6 +4259,7 @@ const struct dpif_class dpif_netlink_class = {
>      dpif_netlink_operate,
>      dpif_netlink_recv_set,
>      dpif_netlink_handlers_set,
> +    dpif_netlink_number_handlers_required,
>      NULL,                       /* set_config */
>      dpif_netlink_queue_to_priority,
>      dpif_netlink_recv,
> @@ -4337,6 +4623,11 @@ dpif_netlink_dp_to_ofpbuf(const struct dpif_netlink_dp 
> *dp, struct ofpbuf *buf)
>          nl_msg_put_u32(buf, OVS_DP_ATTR_USER_FEATURES, dp->user_features);
>      }
>  
> +    if (dp->upcall_pids) {
> +        nl_msg_put_unspec(buf, OVS_DP_ATTR_PER_CPU_PIDS, dp->upcall_pids,
> +                          sizeof *dp->upcall_pids * dp->n_upcall_pids);
> +    }
> +
>      /* Skip OVS_DP_ATTR_STATS since we never have a reason to serialize it. 
> */
>  }
>  
> @@ -4642,7 +4933,6 @@ dpif_netlink_flow_get_stats(const struct 
> dpif_netlink_flow *flow,
>      stats->used = flow->used ? get_32aligned_u64(flow->used) : 0;
>      stats->tcp_flags = flow->tcp_flags ? *flow->tcp_flags : 0;
>  }
> -
>  /* Logs information about a packet that was recently lost in 'ch' (in
>   * 'dpif_'). */
>  static void
> @@ -4656,13 +4946,18 @@ report_loss(struct dpif_netlink *dpif, struct 
> dpif_channel *ch, uint32_t ch_idx,
>          return;
>      }
>  
> -    ds_init(&s);
> -    if (ch->last_poll != LLONG_MIN) {
> -        ds_put_format(&s, " (last polled %lld ms ago)",
> -                      time_msec() - ch->last_poll);
> -    }
> +    if (DISPATCH_MODE_PER_CPU(dpif)) {
> +        VLOG_WARN("%s: lost packet on handler %u",
> +                  dpif_name(&dpif->dpif), handler_id);
> +    } else {
> +        ds_init(&s);
> +        if (ch->last_poll != LLONG_MIN) {
> +            ds_put_format(&s, " (last polled %lld ms ago)",
> +                        time_msec() - ch->last_poll);
> +        }
>  
> -    VLOG_WARN("%s: lost packet on port channel %u of handler %u%s",
> -              dpif_name(&dpif->dpif), ch_idx, handler_id, ds_cstr(&s));
> -    ds_destroy(&s);
> +        VLOG_WARN("%s: lost packet on port channel %u of handler %u%s",
> +                dpif_name(&dpif->dpif), ch_idx, handler_id, ds_cstr(&s));
> +        ds_destroy(&s);
> +    }
>  }
> diff --git a/lib/dpif-provider.h b/lib/dpif-provider.h
> index b817fceac698..32711f19d1f6 100644
> --- a/lib/dpif-provider.h
> +++ b/lib/dpif-provider.h
> @@ -357,6 +357,16 @@ struct dpif_class {
>       * */
>      int (*handlers_set)(struct dpif *dpif, uint32_t n_handlers);
>  
> +    /* Queries 'dpif' to see if a certain number of handlers are required by
> +     * the implementation.
> +     *
> +     * If a certain number of handlers are required, returns 'true' and sets
> +     * 'n_handlers' to that number of handler threads.
> +     *
> +     * If not, returns 'false'.
> +     */
> +    bool (*number_handlers_required)(struct dpif *dpif, uint32_t 
> *n_handlers);
> +

It also needs to update struct dpif_class handlers_set() documentation.


>      /* Pass custom configuration options to the datapath.  The implementation
>       * might postpone applying the changes until run() is called. */
>      int (*set_config)(struct dpif *dpif, const struct smap *other_config);
> diff --git a/lib/dpif.c b/lib/dpif.c
> index 26e8bfb7db98..511383514d5b 100644
> --- a/lib/dpif.c
> +++ b/lib/dpif.c
> @@ -1489,6 +1489,23 @@ dpif_handlers_set(struct dpif *dpif, uint32_t 
> n_handlers)
>      return error;
>  }
>  
> +/* Checks if a certain number of handlers are required.
> + *
> + * If a certain number of handlers are required, returns 'true' and sets
> + * 'n_handlers' to that number of handler threads.
> + *
> + * If not, returns 'false'
> + */
> +bool
> +dpif_number_handlers_required(struct dpif *dpif, uint32_t *n_handlers) {
> +    bool ret = false;
> +
> +    if (dpif->dpif_class->number_handlers_required) {
> +        ret = dpif->dpif_class->number_handlers_required(dpif, n_handlers);
> +    }
> +    return ret;
> +}
> +
>  void
>  dpif_register_dp_purge_cb(struct dpif *dpif, dp_purge_callback *cb, void 
> *aux)
>  {
> diff --git a/lib/dpif.h b/lib/dpif.h
> index f9728e67393b..7c322d20e6c7 100644
> --- a/lib/dpif.h
> +++ b/lib/dpif.h
> @@ -873,6 +873,7 @@ void dpif_register_upcall_cb(struct dpif *, 
> upcall_callback *, void *aux);
>  
>  int dpif_recv_set(struct dpif *, bool enable);
>  int dpif_handlers_set(struct dpif *, uint32_t n_handlers);
> +bool dpif_number_handlers_required(struct dpif *, uint32_t *n_handlers);
>  int dpif_set_config(struct dpif *, const struct smap *cfg);
>  int dpif_port_set_config(struct dpif *, odp_port_t, const struct smap *cfg);
>  int dpif_recv(struct dpif *, uint32_t handler_id, struct dpif_upcall *,
> diff --git a/ofproto/ofproto-dpif-upcall.c b/ofproto/ofproto-dpif-upcall.c
> index 88406fea1391..45c22d20c183 100644
> --- a/ofproto/ofproto-dpif-upcall.c
> +++ b/ofproto/ofproto-dpif-upcall.c
> @@ -636,24 +636,61 @@ udpif_set_threads(struct udpif *udpif, uint32_t 
> n_handlers_,
>                    uint32_t n_revalidators_)
>  {
>      ovs_assert(udpif);
> -    ovs_assert(n_handlers_ && n_revalidators_);
> +    uint32_t n_handlers_requested;
> +    uint32_t n_revalidators_requested;
> +    bool forced = false;
> +
> +    if (dpif_number_handlers_required(udpif->dpif, &n_handlers_requested)) {
> +        forced = true;
> +        if (!n_revalidators_) {
> +            n_revalidators_requested = n_handlers_requested / 4 + 1;
> +        } else {
> +            n_revalidators_requested = n_revalidators_;
> +        }
> +    } else {
> +        int threads = MAX(count_cpu_cores(), 2);
> +
> +        n_revalidators_requested = MAX(n_revalidators_, 0);
> +        n_handlers_requested = MAX(n_handlers_, 0);
>  
> -    if (udpif->n_handlers != n_handlers_
> -        || udpif->n_revalidators != n_revalidators_) {
> +        if (!n_revalidators_requested) {
> +            n_revalidators_requested = n_handlers_requested
> +            ? MAX(threads - (int) n_handlers_requested, 1)
> +            : threads / 4 + 1;
> +        }
> +
> +        if (!n_handlers_requested) {
> +            n_handlers_requested = MAX(threads -
> +                                       (int) n_revalidators_requested, 1);
> +        }
> +    }
> +
> +    if (udpif->n_handlers != n_handlers_requested
> +        || udpif->n_revalidators != n_revalidators_requested) {
> +        if (forced) {
> +            VLOG_INFO("Overriding n-handler-threads to %u, setting "
> +                      "n-revalidator-threads to %u", n_handlers_requested,
> +                      n_revalidators_requested);
> +            } else {
> +            VLOG_INFO("Setting n-handler-threads to %u, setting "
> +                      "n-revalidator-threads to %u", n_handlers_requested,
> +                      n_revalidators_requested);
> +        }
>          udpif_stop_threads(udpif, true);
>      }
>  
>      if (!udpif->handlers && !udpif->revalidators) {
> +        VLOG_INFO("Starting %u threads", n_handlers_requested +
> +                                         n_revalidators_requested);
>          int error;
> -
> -        error = dpif_handlers_set(udpif->dpif, n_handlers_);
> +        error = dpif_handlers_set(udpif->dpif, n_handlers_requested);
>          if (error) {
>              VLOG_ERR("failed to configure handlers in dpif %s: %s",
>                       dpif_name(udpif->dpif), ovs_strerror(error));
>              return;
>          }
> -
> -        udpif_start_threads(udpif, n_handlers_, n_revalidators_);
> +        udpif_start_threads(udpif, n_handlers_requested,
> +                            n_revalidators_requested);
>      }
>  }
>  
> diff --git a/ofproto/ofproto.c b/ofproto/ofproto.c
> index 69e5834e766c..1501eca9a989 100644
> --- a/ofproto/ofproto.c
> +++ b/ofproto/ofproto.c
> @@ -792,20 +792,8 @@ ofproto_type_set_config(const char *datapath_type, const 
> struct smap *cfg)
>  void
>  ofproto_set_threads(int n_handlers_, int n_revalidators_)
>  {
> -    int threads = MAX(count_cpu_cores(), 2);
> -
>      n_revalidators = MAX(n_revalidators_, 0);
>      n_handlers = MAX(n_handlers_, 0);
> -
> -    if (!n_revalidators) {
> -        n_revalidators = n_handlers
> -            ? MAX(threads - (int) n_handlers, 1)
> -            : threads / 4 + 1;
> -    }
> -
> -    if (!n_handlers) {
> -        n_handlers = MAX(threads - (int) n_revalidators, 1);
> -    }
>  }
>  
>  void
> -- 
> 2.27.0
> 
> _______________________________________________
> dev mailing list
> d...@openvswitch.org
> https://mail.openvswitch.org/mailman/listinfo/ovs-dev

-- 
fbl
_______________________________________________
dev mailing list
d...@openvswitch.org
https://mail.openvswitch.org/mailman/listinfo/ovs-dev

Reply via email to