LGTM, Tested-by: Antonio Fischetti <antonio.fische...@intel.com> Acked-by: Antonio Fischetti <antonio.fische...@intel.com>
> -----Original Message----- > From: ovs-dev-boun...@openvswitch.org [mailto:ovs-dev- > boun...@openvswitch.org] On Behalf Of Bhanuprakash Bodireddy > Sent: Friday, December 8, 2017 12:04 PM > To: d...@openvswitch.org > Subject: [ovs-dev] [PATCH v6 2/8] dpif-netdev: Register packet > processing cores to KA framework. > > This commit registers the packet processing PMD threads to keepalive > framework. Only PMDs that have rxqs mapped will be registered and > actively monitored by KA framework. > > This commit spawns a keepalive thread that will dispatch heartbeats to > PMD threads. The pmd threads respond to heartbeats by marking themselves > alive. As long as PMD responds to heartbeats it is considered 'healthy'. > > Signed-off-by: Bhanuprakash Bodireddy <bhanuprakash.bodire...@intel.com> > --- > lib/dpif-netdev.c | 79 ++++++++++++++++++++++ > lib/keepalive.c | 194 > ++++++++++++++++++++++++++++++++++++++++++++++++++++-- > lib/keepalive.h | 20 ++++++ > lib/ovs-thread.c | 6 ++ > lib/ovs-thread.h | 1 + > lib/util.c | 22 +++++++ > lib/util.h | 1 + > 7 files changed, 318 insertions(+), 5 deletions(-) > > diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c > index 07f6113..c978a76 100644 > --- a/lib/dpif-netdev.c > +++ b/lib/dpif-netdev.c > @@ -49,6 +49,7 @@ > #include "flow.h" > #include "hmapx.h" > #include "id-pool.h" > +#include "keepalive.h" > #include "latch.h" > #include "netdev.h" > #include "netdev-vport.h" > @@ -592,6 +593,7 @@ struct dp_netdev_pmd_thread { > atomic_bool reload; /* Do we need to reload ports? */ > pthread_t thread; > unsigned core_id; /* CPU core id of this pmd thread. > */ > + pid_t tid; /* PMD thread tid. */ > int numa_id; /* numa node id of this pmd thread. > */ > bool isolated; > > @@ -1018,6 +1020,72 @@ sorted_poll_thread_list(struct dp_netdev *dp, > *n = k; > } > > +static void * > +ovs_keepalive(void *f_ OVS_UNUSED) > +{ > + pthread_detach(pthread_self()); > + > + for (;;) { > + uint64_t interval; > + > + interval = get_ka_interval(); > + xnanosleep(interval * 1000 * 1000); > + } > + > + return NULL; > +} > + > +/* Kickstart 'ovs_keepalive' thread. */ > +static void > +ka_thread_start(struct dp_netdev *dp) > +{ > + static struct ovsthread_once once = OVSTHREAD_ONCE_INITIALIZER; > + > + if (ovsthread_once_start(&once)) { > + ovs_thread_create("ovs_keepalive", ovs_keepalive, dp); > + > + ovsthread_once_done(&once); > + } > +} > + > +/* Register the datapath threads. This gets invoked on every datapath > + * reconfiguration. The pmd thread[s] having rxq[s] mapped will be > + * registered to KA framework. > + */ > +static void > +ka_register_datapath_threads(struct dp_netdev *dp) > +{ > + if (!ka_is_enabled()) { > + return; > + } > + > + ka_thread_start(dp); > + > + ka_reload_datapath_threads_begin(); > + > + struct dp_netdev_pmd_thread *pmd; > + CMAP_FOR_EACH (pmd, node, &dp->poll_threads) { > + /* Register only PMD threads. */ > + if (pmd->core_id != NON_PMD_CORE_ID) { > + /* Skip PMD thread with no rxqs mapping. */ > + if (OVS_UNLIKELY(!hmap_count(&pmd->poll_list))) { > + /* Rxq mapping changes due to datapath reconfiguration. > + * If no rxqs mapped to PMD now due to reconfiguration, > + * unregister the pmd thread. */ > + ka_unregister_thread(pmd->tid); > + continue; > + } > + > + ka_register_thread(pmd->tid); > + VLOG_INFO("Registered PMD thread [%d] on Core[%d] to KA > framework", > + pmd->tid, pmd->core_id); > + } > + } > + ka_cache_registered_threads(); > + > + ka_reload_datapath_threads_end(); > +} > + > static void > dpif_netdev_pmd_rebalance(struct unixctl_conn *conn, int argc, > const char *argv[], void *aux OVS_UNUSED) > @@ -3819,6 +3887,9 @@ reconfigure_datapath(struct dp_netdev *dp) > > /* Reload affected pmd threads. */ > reload_affected_pmds(dp); > + > + /* Register datapath threads to KA monitoring. */ > + ka_register_datapath_threads(dp); > } > > /* Returns true if one of the netdevs in 'dp' requires a > reconfiguration */ > @@ -4023,6 +4094,8 @@ pmd_thread_main(void *f_) > > /* Stores the pmd thread's 'pmd' to 'per_pmd_key'. */ > ovsthread_setspecific(pmd->dp->per_pmd_key, pmd); > + /* Stores tid in to 'pmd->tid'. */ > + ovsthread_set_tid(&pmd->tid); > ovs_numa_thread_setaffinity_core(pmd->core_id); > dpdk_set_lcore_id(pmd->core_id); > poll_cnt = pmd_load_queues_and_ports(pmd, &poll_list); > @@ -4056,6 +4129,9 @@ reload: > : > PMD_CYCLES_IDLE); > } > > + /* Mark PMD thread alive. */ > + ka_mark_pmd_thread_alive(pmd->tid); > + > if (lc++ > 1024) { > bool reload; > > @@ -4089,6 +4165,9 @@ reload: > } > > emc_cache_uninit(&pmd->flow_cache); > + > + ka_unregister_thread(pmd->tid); > + > free(poll_list); > pmd_free_cached_ports(pmd); > return NULL; > diff --git a/lib/keepalive.c b/lib/keepalive.c > index ca8dccb..b04877f 100644 > --- a/lib/keepalive.c > +++ b/lib/keepalive.c > @@ -19,6 +19,7 @@ > #include "keepalive.h" > #include "lib/vswitch-idl.h" > #include "openvswitch/vlog.h" > +#include "process.h" > #include "seq.h" > #include "timeval.h" > > @@ -28,11 +29,19 @@ static bool keepalive_enable = false; /* > Keepalive disabled by default. */ > static uint32_t keepalive_timer_interval; /* keepalive timer interval. > */ > static struct keepalive_info ka_info; > > -/* Returns true if keepalive is enabled, false otherwise. */ > -bool > -ka_is_enabled(void) > +/* Returns true if state update is allowed, false otherwise. */ > +static bool > +ka_can_update_state(void) > { > - return keepalive_enable; > + bool reload_inprogress; > + bool ka_enable; > + > + atomic_read_relaxed(&ka_info.reload_threads, &reload_inprogress); > + ka_enable = ka_is_enabled(); > + > + /* Return true if KA is enabled and 'cached_process_list' map > reload > + * is completed. */ > + return ka_enable && !reload_inprogress; > } > > /* Finds the thread by 'tid' in 'process_list' map and update > @@ -49,7 +58,7 @@ ka_set_thread_state_ts(pid_t tid, enum keepalive_state > state, > ovs_mutex_lock(&ka_info.proclist_mutex); > HMAP_FOR_EACH_WITH_HASH (pinfo, node, hash_int(tid, 0), > &ka_info.process_list) { > - if (pinfo->tid == tid) { > + if (OVS_LIKELY(pinfo->tid == tid)) { > pinfo->state = state; > pinfo->last_seen_time = last_alive; > } > @@ -104,6 +113,177 @@ ka_register_relay_cb(ka_relay_cb cb, void *aux) > ka_info.relay_cb_data = aux; > } > > +/* Returns true if keepalive is enabled, false otherwise. */ > +bool > +ka_is_enabled(void) > +{ > + return keepalive_enable; > +} > + > +/* Return the Keepalive timer interval. */ > +uint32_t > +get_ka_interval(void) > +{ > + return keepalive_timer_interval; > +} > + > +/* 'cached_process_list' map reload in progress. > + * > + * Should be called before the 'ka_info.cached_process_list' > + * is populated from 'ka_info.process_list'. This way the pmd > + * doesn't heartbeat while the reload is in progress. */ > +void > +ka_reload_datapath_threads_begin(void) > +{ > + atomic_store_relaxed(&ka_info.reload_threads, true); > +} > + > +/* 'cached_process_list' map reload finished. > + * > + * Should be called after the 'ka_info.cached_process_list' > + * is populated from 'ka_info.process_list'. This way the pmd > + * can restart heartbeat when the reload is finished. */ > +void > +ka_reload_datapath_threads_end(void) > +{ > + atomic_store_relaxed(&ka_info.reload_threads, false); > +} > + > +/* Register thread to KA framework. */ > +void > +ka_register_thread(pid_t tid) > +{ > + if (ka_is_enabled()) { > + struct ka_process_info *ka_pinfo; > + int core_id = -1; > + char proc_name[18] = "UNDEFINED"; > + > + struct process_info pinfo; > + int success = get_process_info(tid, &pinfo); > + if (success) { > + core_id = pinfo.core_id; > + ovs_strlcpy(proc_name, pinfo.name, sizeof proc_name); > + } > + > + uint32_t hash = hash_int(tid, 0); > + ovs_mutex_lock(&ka_info.proclist_mutex); > + HMAP_FOR_EACH_WITH_HASH (ka_pinfo, node, > + hash, &ka_info.process_list) { > + /* Thread is already registered. */ > + if (ka_pinfo->tid == tid) { > + goto out; > + } > + } > + > + ka_pinfo = xmalloc(sizeof *ka_pinfo); > + ka_pinfo->tid = tid; > + ka_pinfo->core_id = core_id; > + ovs_strlcpy(ka_pinfo->name, proc_name, sizeof ka_pinfo->name); > + > + hmap_insert(&ka_info.process_list, &ka_pinfo->node, hash); > + > + ka_pinfo->state = KA_STATE_ALIVE; > + ka_pinfo->last_seen_time = time_wall_msec(); > + ka_info.thread_cnt++; /* Increment count of registered > threads. */ > +out: > + ovs_mutex_unlock(&ka_info.proclist_mutex); > + } > +} > + > +/* Unregister thread from KA framework. */ > +void > +ka_unregister_thread(pid_t tid) > +{ > + if (ka_is_enabled()) { > + struct ka_process_info *ka_pinfo; > + > + ovs_mutex_lock(&ka_info.proclist_mutex); > + HMAP_FOR_EACH_WITH_HASH (ka_pinfo, node, hash_int(tid, 0), > + &ka_info.process_list) { > + /* If thread is registered, remove it from the list */ > + if (ka_pinfo->tid == tid) { > + hmap_remove(&ka_info.process_list, &ka_pinfo->node); > + free(ka_pinfo); > + > + ka_pinfo->state = KA_STATE_UNUSED; > + ka_info.thread_cnt--; /* Decrement thread count. */ > + break; > + } > + } > + ovs_mutex_unlock(&ka_info.proclist_mutex); > + } > +} > + > +/* Free the 'ka_info.cached_process_list' list. */ > +void > +ka_free_cached_threads(void) > +{ > + struct ka_process_info *pinfo_cached; > + /* Free threads in the cached list. */ > + HMAP_FOR_EACH_POP (pinfo_cached, node, > &ka_info.cached_process_list) { > + free(pinfo_cached); > + } > + hmap_shrink(&ka_info.cached_process_list); > +} > + > +/* Cache the list of registered threads from 'ka_info.process_list' > + * map into 'ka_info.cached_process_list. > + * > + * 'cached_process_list' map is an exact copy of 'process_list' that > will > + * be updated by 'pmd' and 'ovs_keepalive' threads as part of heartbeat > + * mechanism. This cached copy is created so that the heartbeats can > be > + * performed with out acquiring locks. > + * > + * On datapath reconfiguration, both the 'process_list' and the cached > copy > + * 'cached_process_list' is updated after setting 'reload_threads' to > 'true' > + * so that pmd doesn't heartbeat while the maps are updated. > + * > + */ > +void > +ka_cache_registered_threads(void) > +{ > + struct ka_process_info *pinfo, *next, *pinfo_cached; > + > + ka_free_cached_threads(); > + > + HMAP_FOR_EACH_SAFE (pinfo, next, node, &ka_info.process_list) { > + pinfo_cached = xmemdup(pinfo, sizeof *pinfo_cached); > + hmap_insert(&ka_info.cached_process_list, &pinfo_cached->node, > + hash_int(pinfo->tid,0)); > + } > +} > + > +/* Mark packet processing thread alive. */ > +void > +ka_mark_pmd_thread_alive(int tid) > +{ > + if (ka_can_update_state()) { > + struct ka_process_info *pinfo; > + HMAP_FOR_EACH_WITH_HASH (pinfo, node, hash_int(tid, 0), > + &ka_info.cached_process_list) { > + if (OVS_LIKELY(pinfo->tid == tid)) { > + pinfo->state = KA_STATE_ALIVE; > + } > + } > + } > +} > + > +/* Mark packet processing thread as sleeping. */ > +void > +ka_mark_pmd_thread_sleep(int tid) > +{ > + if (ka_can_update_state()) { > + struct ka_process_info *pinfo; > + > + HMAP_FOR_EACH_WITH_HASH (pinfo, node, hash_int(tid, 0), > + &ka_info.cached_process_list) { > + if (pinfo->tid == tid) { > + pinfo->state = KA_STATE_SLEEP; > + } > + } > + } > +} > + > void > ka_init(const struct smap *ovs_other_config) > { > @@ -120,6 +300,7 @@ ka_init(const struct smap *ovs_other_config) > ka_register_relay_cb(ka_update_thread_state, NULL); > ovs_mutex_init(&ka_info.proclist_mutex); > hmap_init(&ka_info.process_list); > + hmap_init(&ka_info.cached_process_list); > > ka_info.init_time = time_wall_msec(); > > @@ -143,5 +324,8 @@ ka_destroy(void) > ovs_mutex_unlock(&ka_info.proclist_mutex); > > hmap_destroy(&ka_info.process_list); > + > + ka_free_cached_threads(); > + hmap_destroy(&ka_info.cached_process_list); > ovs_mutex_destroy(&ka_info.proclist_mutex); > } > diff --git a/lib/keepalive.h b/lib/keepalive.h > index a738daa..7674ea3 100644 > --- a/lib/keepalive.h > +++ b/lib/keepalive.h > @@ -48,6 +48,9 @@ enum keepalive_state { > }; > > struct ka_process_info { > + /* Process name. */ > + char name[16]; > + > /* Thread id of the process, retrieved using ovs_gettid(). */ > pid_t tid; > > @@ -71,15 +74,32 @@ struct keepalive_info { > /* List of process/threads monitored by KA framework. */ > struct hmap process_list OVS_GUARDED; > > + /* cached copy of 'process_list' list. */ > + struct hmap cached_process_list; > + > + /* count of threads registered to KA framework. */ > + uint32_t thread_cnt; > + > /* Keepalive initialization time. */ > uint64_t init_time; > > /* keepalive relay handler. */ > ka_relay_cb relay_cb; > void *relay_cb_data; > + > + atomic_bool reload_threads; /* Reload threads in to cached list. > */ > }; > > bool ka_is_enabled(void); > +uint32_t get_ka_interval(void); > +void ka_reload_datapath_threads_begin(void); > +void ka_reload_datapath_threads_end(void); > +void ka_register_thread(pid_t); > +void ka_unregister_thread(pid_t); > +void ka_free_cached_threads(void); > +void ka_cache_registered_threads(void); > +void ka_mark_pmd_thread_alive(int); > +void ka_mark_pmd_thread_sleep(int); > void ka_init(const struct smap *); > void ka_destroy(void); > > diff --git a/lib/ovs-thread.c b/lib/ovs-thread.c > index f8bc06d..ae8e450 100644 > --- a/lib/ovs-thread.c > +++ b/lib/ovs-thread.c > @@ -597,6 +597,12 @@ thread_is_pmd(void) > return !strncmp(name, "pmd", 3); > } > > +void > +ovsthread_set_tid(pid_t *tid) > +{ > + *tid = ovs_get_tid(); > +} > + > > /* ovsthread_key. */ > > diff --git a/lib/ovs-thread.h b/lib/ovs-thread.h > index 55e51a4..cfb4b04 100644 > --- a/lib/ovs-thread.h > +++ b/lib/ovs-thread.h > @@ -524,5 +524,6 @@ bool may_fork(void); > > int count_cpu_cores(void); > bool thread_is_pmd(void); > +void ovsthread_set_tid(pid_t *); > > #endif /* ovs-thread.h */ > diff --git a/lib/util.c b/lib/util.c > index 2965656..927929b 100644 > --- a/lib/util.c > +++ b/lib/util.c > @@ -26,6 +26,12 @@ > #include <stdlib.h> > #include <string.h> > #include <sys/stat.h> > +#ifdef __linux__ > +#include <sys/syscall.h> > +#endif > +#ifdef __FreeBSD__ > +#include <sys/thr.h> > +#endif > #include <unistd.h> > #include "bitmap.h" > #include "byte-order.h" > @@ -575,6 +581,22 @@ get_page_size(void) > return cached; > } > > +/* Returns the tid of the calling thread if supported, -EINVAL > otherwise. */ > +pid_t > +ovs_get_tid(void) > +{ > +#ifdef __linux__ > + return syscall(SYS_gettid); > +#elif defined(__FreeBSD__) || defined(__NetBSD__) > + long tid; > + thr_self(&tid); > + return (pid_t)tid; > +#endif > + > + VLOG_ERR("ovs_get_tid(): unsupported."); > + return -EINVAL; > +} > + > /* Returns the time at which the system booted, as the number of > milliseconds > * since the epoch, or 0 if the time of boot cannot be determined. */ > long long int > diff --git a/lib/util.h b/lib/util.h > index b01f421..259346d 100644 > --- a/lib/util.h > +++ b/lib/util.h > @@ -156,6 +156,7 @@ void free_cacheline(void *); > > void ovs_strlcpy(char *dst, const char *src, size_t size); > void ovs_strzcpy(char *dst, const char *src, size_t size); > +pid_t ovs_get_tid(void); > > /* The C standards say that neither the 'dst' nor 'src' argument to > * memcpy() may be null, even if 'n' is zero. This wrapper tolerates > -- > 2.4.11 > > _______________________________________________ > dev mailing list > d...@openvswitch.org > https://mail.openvswitch.org/mailman/listinfo/ovs-dev _______________________________________________ dev mailing list d...@openvswitch.org https://mail.openvswitch.org/mailman/listinfo/ovs-dev