LGTM,

Tested-by: Antonio Fischetti <antonio.fische...@intel.com>
Acked-by: Antonio Fischetti <antonio.fische...@intel.com>


> -----Original Message-----
> From: ovs-dev-boun...@openvswitch.org [mailto:ovs-dev-
> boun...@openvswitch.org] On Behalf Of Bhanuprakash Bodireddy
> Sent: Friday, December 8, 2017 12:04 PM
> To: d...@openvswitch.org
> Subject: [ovs-dev] [PATCH v6 2/8] dpif-netdev: Register packet
> processing cores to KA framework.
> 
> This commit registers the packet processing PMD threads to keepalive
> framework. Only PMDs that have rxqs mapped will be registered and
> actively monitored by KA framework.
> 
> This commit spawns a keepalive thread that will dispatch heartbeats to
> PMD threads. The pmd threads respond to heartbeats by marking themselves
> alive. As long as PMD responds to heartbeats it is considered 'healthy'.
> 
> Signed-off-by: Bhanuprakash Bodireddy <bhanuprakash.bodire...@intel.com>
> ---
>  lib/dpif-netdev.c |  79 ++++++++++++++++++++++
>  lib/keepalive.c   | 194
> ++++++++++++++++++++++++++++++++++++++++++++++++++++--
>  lib/keepalive.h   |  20 ++++++
>  lib/ovs-thread.c  |   6 ++
>  lib/ovs-thread.h  |   1 +
>  lib/util.c        |  22 +++++++
>  lib/util.h        |   1 +
>  7 files changed, 318 insertions(+), 5 deletions(-)
> 
> diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
> index 07f6113..c978a76 100644
> --- a/lib/dpif-netdev.c
> +++ b/lib/dpif-netdev.c
> @@ -49,6 +49,7 @@
>  #include "flow.h"
>  #include "hmapx.h"
>  #include "id-pool.h"
> +#include "keepalive.h"
>  #include "latch.h"
>  #include "netdev.h"
>  #include "netdev-vport.h"
> @@ -592,6 +593,7 @@ struct dp_netdev_pmd_thread {
>      atomic_bool reload;             /* Do we need to reload ports? */
>      pthread_t thread;
>      unsigned core_id;               /* CPU core id of this pmd thread.
> */
> +    pid_t tid;                      /* PMD thread tid. */
>      int numa_id;                    /* numa node id of this pmd thread.
> */
>      bool isolated;
> 
> @@ -1018,6 +1020,72 @@ sorted_poll_thread_list(struct dp_netdev *dp,
>      *n = k;
>  }
> 
> +static void *
> +ovs_keepalive(void *f_ OVS_UNUSED)
> +{
> +    pthread_detach(pthread_self());
> +
> +    for (;;) {
> +        uint64_t interval;
> +
> +        interval = get_ka_interval();
> +        xnanosleep(interval * 1000 * 1000);
> +    }
> +
> +    return NULL;
> +}
> +
> +/* Kickstart 'ovs_keepalive' thread. */
> +static void
> +ka_thread_start(struct dp_netdev *dp)
> +{
> +    static struct ovsthread_once once = OVSTHREAD_ONCE_INITIALIZER;
> +
> +    if (ovsthread_once_start(&once)) {
> +        ovs_thread_create("ovs_keepalive", ovs_keepalive, dp);
> +
> +        ovsthread_once_done(&once);
> +    }
> +}
> +
> +/* Register the datapath threads. This gets invoked on every datapath
> + * reconfiguration. The pmd thread[s] having rxq[s] mapped will be
> + * registered to KA framework.
> + */
> +static void
> +ka_register_datapath_threads(struct dp_netdev *dp)
> +{
> +    if (!ka_is_enabled()) {
> +        return;
> +    }
> +
> +    ka_thread_start(dp);
> +
> +    ka_reload_datapath_threads_begin();
> +
> +    struct dp_netdev_pmd_thread *pmd;
> +    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
> +        /*  Register only PMD threads. */
> +        if (pmd->core_id != NON_PMD_CORE_ID) {
> +            /* Skip PMD thread with no rxqs mapping. */
> +            if (OVS_UNLIKELY(!hmap_count(&pmd->poll_list))) {
> +                /* Rxq mapping changes due to datapath reconfiguration.
> +                 * If no rxqs mapped to PMD now due to reconfiguration,
> +                 * unregister the pmd thread. */
> +                ka_unregister_thread(pmd->tid);
> +                continue;
> +            }
> +
> +            ka_register_thread(pmd->tid);
> +            VLOG_INFO("Registered PMD thread [%d] on Core[%d] to KA
> framework",
> +                      pmd->tid, pmd->core_id);
> +        }
> +    }
> +    ka_cache_registered_threads();
> +
> +    ka_reload_datapath_threads_end();
> +}
> +
>  static void
>  dpif_netdev_pmd_rebalance(struct unixctl_conn *conn, int argc,
>                            const char *argv[], void *aux OVS_UNUSED)
> @@ -3819,6 +3887,9 @@ reconfigure_datapath(struct dp_netdev *dp)
> 
>      /* Reload affected pmd threads. */
>      reload_affected_pmds(dp);
> +
> +    /* Register datapath threads to KA monitoring. */
> +    ka_register_datapath_threads(dp);
>  }
> 
>  /* Returns true if one of the netdevs in 'dp' requires a
> reconfiguration */
> @@ -4023,6 +4094,8 @@ pmd_thread_main(void *f_)
> 
>      /* Stores the pmd thread's 'pmd' to 'per_pmd_key'. */
>      ovsthread_setspecific(pmd->dp->per_pmd_key, pmd);
> +    /* Stores tid in to 'pmd->tid'. */
> +    ovsthread_set_tid(&pmd->tid);
>      ovs_numa_thread_setaffinity_core(pmd->core_id);
>      dpdk_set_lcore_id(pmd->core_id);
>      poll_cnt = pmd_load_queues_and_ports(pmd, &poll_list);
> @@ -4056,6 +4129,9 @@ reload:
>                                                        :
> PMD_CYCLES_IDLE);
>          }
> 
> +        /* Mark PMD thread alive. */
> +        ka_mark_pmd_thread_alive(pmd->tid);
> +
>          if (lc++ > 1024) {
>              bool reload;
> 
> @@ -4089,6 +4165,9 @@ reload:
>      }
> 
>      emc_cache_uninit(&pmd->flow_cache);
> +
> +    ka_unregister_thread(pmd->tid);
> +
>      free(poll_list);
>      pmd_free_cached_ports(pmd);
>      return NULL;
> diff --git a/lib/keepalive.c b/lib/keepalive.c
> index ca8dccb..b04877f 100644
> --- a/lib/keepalive.c
> +++ b/lib/keepalive.c
> @@ -19,6 +19,7 @@
>  #include "keepalive.h"
>  #include "lib/vswitch-idl.h"
>  #include "openvswitch/vlog.h"
> +#include "process.h"
>  #include "seq.h"
>  #include "timeval.h"
> 
> @@ -28,11 +29,19 @@ static bool keepalive_enable = false;      /*
> Keepalive disabled by default. */
>  static uint32_t keepalive_timer_interval;  /* keepalive timer interval.
> */
>  static struct keepalive_info ka_info;
> 
> -/* Returns true if keepalive is enabled, false otherwise. */
> -bool
> -ka_is_enabled(void)
> +/* Returns true if state update is allowed, false otherwise. */
> +static bool
> +ka_can_update_state(void)
>  {
> -    return keepalive_enable;
> +    bool reload_inprogress;
> +    bool ka_enable;
> +
> +    atomic_read_relaxed(&ka_info.reload_threads, &reload_inprogress);
> +    ka_enable = ka_is_enabled();
> +
> +    /* Return true if KA is enabled and 'cached_process_list' map
> reload
> +     * is completed. */
> +    return ka_enable && !reload_inprogress;
>  }
> 
>  /* Finds the thread by 'tid' in 'process_list' map and update
> @@ -49,7 +58,7 @@ ka_set_thread_state_ts(pid_t tid, enum keepalive_state
> state,
>      ovs_mutex_lock(&ka_info.proclist_mutex);
>      HMAP_FOR_EACH_WITH_HASH (pinfo, node, hash_int(tid, 0),
>                               &ka_info.process_list) {
> -        if (pinfo->tid == tid) {
> +        if (OVS_LIKELY(pinfo->tid == tid)) {
>              pinfo->state = state;
>              pinfo->last_seen_time = last_alive;
>          }
> @@ -104,6 +113,177 @@ ka_register_relay_cb(ka_relay_cb cb, void *aux)
>      ka_info.relay_cb_data = aux;
>  }
> 
> +/* Returns true if keepalive is enabled, false otherwise. */
> +bool
> +ka_is_enabled(void)
> +{
> +    return keepalive_enable;
> +}
> +
> +/* Return the Keepalive timer interval. */
> +uint32_t
> +get_ka_interval(void)
> +{
> +    return keepalive_timer_interval;
> +}
> +
> +/* 'cached_process_list' map reload in progress.
> + *
> + * Should be called before the 'ka_info.cached_process_list'
> + * is populated from 'ka_info.process_list'. This way the pmd
> + * doesn't heartbeat while the reload is in progress. */
> +void
> +ka_reload_datapath_threads_begin(void)
> +{
> +    atomic_store_relaxed(&ka_info.reload_threads, true);
> +}
> +
> +/* 'cached_process_list' map reload finished.
> + *
> + * Should be called after the 'ka_info.cached_process_list'
> + * is populated from 'ka_info.process_list'. This way the pmd
> + * can restart heartbeat when the reload is finished. */
> +void
> +ka_reload_datapath_threads_end(void)
> +{
> +    atomic_store_relaxed(&ka_info.reload_threads, false);
> +}
> +
> +/* Register thread to KA framework. */
> +void
> +ka_register_thread(pid_t tid)
> +{
> +    if (ka_is_enabled()) {
> +        struct ka_process_info *ka_pinfo;
> +        int core_id = -1;
> +        char proc_name[18] = "UNDEFINED";
> +
> +        struct process_info pinfo;
> +        int success = get_process_info(tid, &pinfo);
> +        if (success) {
> +            core_id = pinfo.core_id;
> +            ovs_strlcpy(proc_name, pinfo.name, sizeof proc_name);
> +        }
> +
> +        uint32_t hash = hash_int(tid, 0);
> +        ovs_mutex_lock(&ka_info.proclist_mutex);
> +        HMAP_FOR_EACH_WITH_HASH (ka_pinfo, node,
> +                                 hash, &ka_info.process_list) {
> +            /* Thread is already registered. */
> +            if (ka_pinfo->tid == tid) {
> +                goto out;
> +            }
> +        }
> +
> +        ka_pinfo = xmalloc(sizeof *ka_pinfo);
> +        ka_pinfo->tid = tid;
> +        ka_pinfo->core_id = core_id;
> +        ovs_strlcpy(ka_pinfo->name, proc_name, sizeof ka_pinfo->name);
> +
> +        hmap_insert(&ka_info.process_list, &ka_pinfo->node, hash);
> +
> +        ka_pinfo->state = KA_STATE_ALIVE;
> +        ka_pinfo->last_seen_time = time_wall_msec();
> +        ka_info.thread_cnt++;  /* Increment count of registered
> threads. */
> +out:
> +        ovs_mutex_unlock(&ka_info.proclist_mutex);
> +    }
> +}
> +
> +/* Unregister thread from KA framework. */
> +void
> +ka_unregister_thread(pid_t tid)
> +{
> +    if (ka_is_enabled()) {
> +        struct ka_process_info *ka_pinfo;
> +
> +        ovs_mutex_lock(&ka_info.proclist_mutex);
> +        HMAP_FOR_EACH_WITH_HASH (ka_pinfo, node, hash_int(tid, 0),
> +                                 &ka_info.process_list) {
> +            /* If thread is registered, remove it from the list */
> +            if (ka_pinfo->tid == tid) {
> +                hmap_remove(&ka_info.process_list, &ka_pinfo->node);
> +                free(ka_pinfo);
> +
> +                ka_pinfo->state = KA_STATE_UNUSED;
> +                ka_info.thread_cnt--;  /* Decrement thread count. */
> +                break;
> +            }
> +        }
> +        ovs_mutex_unlock(&ka_info.proclist_mutex);
> +    }
> +}
> +
> +/* Free the 'ka_info.cached_process_list' list. */
> +void
> +ka_free_cached_threads(void)
> +{
> +    struct ka_process_info *pinfo_cached;
> +    /* Free threads in the cached list. */
> +    HMAP_FOR_EACH_POP (pinfo_cached, node,
> &ka_info.cached_process_list) {
> +        free(pinfo_cached);
> +    }
> +    hmap_shrink(&ka_info.cached_process_list);
> +}
> +
> +/* Cache the list of registered threads from 'ka_info.process_list'
> + * map into 'ka_info.cached_process_list.
> + *
> + * 'cached_process_list' map is an exact copy of 'process_list' that
> will
> + * be updated by 'pmd' and 'ovs_keepalive' threads as part of heartbeat
> + * mechanism.  This cached copy is created so that the heartbeats can
> be
> + * performed with out acquiring locks.
> + *
> + * On datapath reconfiguration, both the 'process_list' and the cached
> copy
> + * 'cached_process_list' is updated after setting 'reload_threads' to
> 'true'
> + * so that pmd doesn't heartbeat while the maps are updated.
> + *
> + */
> +void
> +ka_cache_registered_threads(void)
> +{
> +    struct ka_process_info *pinfo, *next, *pinfo_cached;
> +
> +    ka_free_cached_threads();
> +
> +    HMAP_FOR_EACH_SAFE (pinfo, next, node, &ka_info.process_list) {
> +        pinfo_cached = xmemdup(pinfo, sizeof *pinfo_cached);
> +        hmap_insert(&ka_info.cached_process_list, &pinfo_cached->node,
> +                     hash_int(pinfo->tid,0));
> +    }
> +}
> +
> +/* Mark packet processing thread alive. */
> +void
> +ka_mark_pmd_thread_alive(int tid)
> +{
> +    if (ka_can_update_state()) {
> +        struct ka_process_info *pinfo;
> +        HMAP_FOR_EACH_WITH_HASH (pinfo, node, hash_int(tid, 0),
> +                             &ka_info.cached_process_list) {
> +            if (OVS_LIKELY(pinfo->tid == tid)) {
> +                pinfo->state = KA_STATE_ALIVE;
> +            }
> +        }
> +    }
> +}
> +
> +/* Mark packet processing thread as sleeping. */
> +void
> +ka_mark_pmd_thread_sleep(int tid)
> +{
> +    if (ka_can_update_state()) {
> +        struct ka_process_info *pinfo;
> +
> +        HMAP_FOR_EACH_WITH_HASH (pinfo, node, hash_int(tid, 0),
> +                             &ka_info.cached_process_list) {
> +            if (pinfo->tid == tid) {
> +                pinfo->state = KA_STATE_SLEEP;
> +            }
> +        }
> +    }
> +}
> +
>  void
>  ka_init(const struct smap *ovs_other_config)
>  {
> @@ -120,6 +300,7 @@ ka_init(const struct smap *ovs_other_config)
>              ka_register_relay_cb(ka_update_thread_state, NULL);
>              ovs_mutex_init(&ka_info.proclist_mutex);
>              hmap_init(&ka_info.process_list);
> +            hmap_init(&ka_info.cached_process_list);
> 
>              ka_info.init_time = time_wall_msec();
> 
> @@ -143,5 +324,8 @@ ka_destroy(void)
>      ovs_mutex_unlock(&ka_info.proclist_mutex);
> 
>      hmap_destroy(&ka_info.process_list);
> +
> +    ka_free_cached_threads();
> +    hmap_destroy(&ka_info.cached_process_list);
>      ovs_mutex_destroy(&ka_info.proclist_mutex);
>  }
> diff --git a/lib/keepalive.h b/lib/keepalive.h
> index a738daa..7674ea3 100644
> --- a/lib/keepalive.h
> +++ b/lib/keepalive.h
> @@ -48,6 +48,9 @@ enum keepalive_state {
>  };
> 
>  struct ka_process_info {
> +    /* Process name. */
> +    char name[16];
> +
>      /* Thread id of the process, retrieved using ovs_gettid(). */
>      pid_t tid;
> 
> @@ -71,15 +74,32 @@ struct keepalive_info {
>      /* List of process/threads monitored by KA framework. */
>      struct hmap process_list OVS_GUARDED;
> 
> +    /* cached copy of 'process_list' list. */
> +    struct hmap cached_process_list;
> +
> +    /* count of threads registered to KA framework. */
> +    uint32_t thread_cnt;
> +
>      /* Keepalive initialization time. */
>      uint64_t init_time;
> 
>      /* keepalive relay handler. */
>      ka_relay_cb relay_cb;
>      void *relay_cb_data;
> +
> +    atomic_bool reload_threads;   /* Reload threads in to cached list.
> */
>  };
> 
>  bool ka_is_enabled(void);
> +uint32_t get_ka_interval(void);
> +void ka_reload_datapath_threads_begin(void);
> +void ka_reload_datapath_threads_end(void);
> +void ka_register_thread(pid_t);
> +void ka_unregister_thread(pid_t);
> +void ka_free_cached_threads(void);
> +void ka_cache_registered_threads(void);
> +void ka_mark_pmd_thread_alive(int);
> +void ka_mark_pmd_thread_sleep(int);
>  void ka_init(const struct smap *);
>  void ka_destroy(void);
> 
> diff --git a/lib/ovs-thread.c b/lib/ovs-thread.c
> index f8bc06d..ae8e450 100644
> --- a/lib/ovs-thread.c
> +++ b/lib/ovs-thread.c
> @@ -597,6 +597,12 @@ thread_is_pmd(void)
>      return !strncmp(name, "pmd", 3);
>  }
> 
> +void
> +ovsthread_set_tid(pid_t *tid)
> +{
> +    *tid = ovs_get_tid();
> +}
> +
>  

>  /* ovsthread_key. */
> 
> diff --git a/lib/ovs-thread.h b/lib/ovs-thread.h
> index 55e51a4..cfb4b04 100644
> --- a/lib/ovs-thread.h
> +++ b/lib/ovs-thread.h
> @@ -524,5 +524,6 @@ bool may_fork(void);
> 
>  int count_cpu_cores(void);
>  bool thread_is_pmd(void);
> +void ovsthread_set_tid(pid_t *);
> 
>  #endif /* ovs-thread.h */
> diff --git a/lib/util.c b/lib/util.c
> index 2965656..927929b 100644
> --- a/lib/util.c
> +++ b/lib/util.c
> @@ -26,6 +26,12 @@
>  #include <stdlib.h>
>  #include <string.h>
>  #include <sys/stat.h>
> +#ifdef __linux__
> +#include <sys/syscall.h>
> +#endif
> +#ifdef __FreeBSD__
> +#include <sys/thr.h>
> +#endif
>  #include <unistd.h>
>  #include "bitmap.h"
>  #include "byte-order.h"
> @@ -575,6 +581,22 @@ get_page_size(void)
>      return cached;
>  }
> 
> +/* Returns the tid of the calling thread if supported, -EINVAL
> otherwise. */
> +pid_t
> +ovs_get_tid(void)
> +{
> +#ifdef __linux__
> +    return syscall(SYS_gettid);
> +#elif defined(__FreeBSD__) || defined(__NetBSD__)
> +    long tid;
> +    thr_self(&tid);
> +    return (pid_t)tid;
> +#endif
> +
> +    VLOG_ERR("ovs_get_tid(): unsupported.");
> +    return -EINVAL;
> +}
> +
>  /* Returns the time at which the system booted, as the number of
> milliseconds
>   * since the epoch, or 0 if the time of boot cannot be determined. */
>  long long int
> diff --git a/lib/util.h b/lib/util.h
> index b01f421..259346d 100644
> --- a/lib/util.h
> +++ b/lib/util.h
> @@ -156,6 +156,7 @@ void free_cacheline(void *);
> 
>  void ovs_strlcpy(char *dst, const char *src, size_t size);
>  void ovs_strzcpy(char *dst, const char *src, size_t size);
> +pid_t ovs_get_tid(void);
> 
>  /* The C standards say that neither the 'dst' nor 'src' argument to
>   * memcpy() may be null, even if 'n' is zero.  This wrapper tolerates
> --
> 2.4.11
> 
> _______________________________________________
> dev mailing list
> d...@openvswitch.org
> https://mail.openvswitch.org/mailman/listinfo/ovs-dev
_______________________________________________
dev mailing list
d...@openvswitch.org
https://mail.openvswitch.org/mailman/listinfo/ovs-dev

Reply via email to