On Thu, Jul 27, 2023 at 9:17 PM Mark Michelson <mmich...@redhat.com> wrote:

> Hi Ales, I had a look through this patch finally :)
>
>  From a high-level, I appreciate the design. Periodically requesting
> flow stats in a background thread makes good sense.
>
> I have a couple of suggestions to make to the design.
>

Hi Mark,

thank you for the review.


>
> On 7/10/23 07:05, Ales Musil wrote:
> > To achieve that, add a thread that will handle
> > statistics requests and delegate the processing
> > to defined functions. This keeps the thread
> > flexible enough that it could be extended in the
> > future if needed.
> >
> > At the same time, connect the thread to the MAC
> > cache I-P node to handle the timestamp updates. The
> > updates should happen once per dump_period
> > (3/4 of the aging threshold) per chassis, and only if
> > the MAC binding is actively used.
> >
> > Signed-off-by: Ales Musil <amu...@redhat.com>
> > ---
> >   controller/automake.mk      |   4 +-
> >   controller/mac_cache.c      |  84 +++++++
> >   controller/mac_cache.h      |   7 +
> >   controller/ovn-controller.c |  11 +
> >   controller/statctrl.c       | 434 ++++++++++++++++++++++++++++++++++++
> >   controller/statctrl.h       |  28 +++
> >   tests/ovn.at                |  23 +-
> >   7 files changed, 585 insertions(+), 6 deletions(-)
> >   create mode 100644 controller/statctrl.c
> >   create mode 100644 controller/statctrl.h
> >
> > diff --git a/controller/automake.mk b/controller/automake.mk
> > index 562290359..0dbbd5d26 100644
> > --- a/controller/automake.mk
> > +++ b/controller/automake.mk
> > @@ -45,7 +45,9 @@ controller_ovn_controller_SOURCES = \
> >       controller/mirror.h \
> >       controller/mirror.c \
> >       controller/mac_cache.h \
> > -     controller/mac_cache.c
> > +     controller/mac_cache.c \
> > +     controller/statctrl.h \
> > +     controller/statctrl.c
> >
> >   controller_ovn_controller_LDADD = lib/libovn.la $(OVS_LIBDIR)/
> libopenvswitch.la
> >   man_MANS += controller/ovn-controller.8
> > diff --git a/controller/mac_cache.c b/controller/mac_cache.c
> > index 4663499a1..6f1d661d4 100644
> > --- a/controller/mac_cache.c
> > +++ b/controller/mac_cache.c
> > @@ -252,3 +252,87 @@ mac_cache_threshold_remove(struct hmap *thresholds,
> >       hmap_remove(thresholds, &threshold->hmap_node);
> >       free(threshold);
> >   }
> > +
> > +struct mac_cache_mb_stats {
> > +    struct ovs_list list_node;
> > +
> > +    int64_t idle_age_ms;
> > +    uint32_t cookie;
> > +    /* Common data to identify MAC binding. */
> > +    struct mac_cache_mb_data data;
> > +};
> > +
> > +void
> > +mac_cache_mb_stats_process_flow_stats(struct ovs_list *stats_list,
> > +                                      struct ofputil_flow_stats
> *ofp_stats)
> > +{
> > +    struct mac_cache_mb_stats *stats = xmalloc(sizeof *stats);
> > +
> > +    stats->idle_age_ms = ofp_stats->idle_age * 1000;
> > +    stats->cookie = ntohll(ofp_stats->cookie);
> > +    stats->data.port_key =
> > +            ofp_stats->match.flow.regs[MFF_LOG_INPORT - MFF_REG0];
> > +    stats->data.dp_key = ntohll(ofp_stats->match.flow.metadata);
> > +
> > +    if (ofp_stats->match.flow.dl_type == htons(ETH_TYPE_IP)) {
> > +        stats->data.ip =
> in6_addr_mapped_ipv4(ofp_stats->match.flow.nw_src);
> > +    } else {
> > +        stats->data.ip = ofp_stats->match.flow.ipv6_src;
> > +    }
> > +
> > +    stats->data.mac = ofp_stats->match.flow.dl_src;
> > +
> > +    ovs_list_push_back(stats_list, &stats->list_node);
> > +}
> > +
> > +void
> > +mac_cache_mb_stats_destroy(struct ovs_list *stats_list)
> > +{
> > +    struct mac_cache_mb_stats *stats;
> > +    LIST_FOR_EACH_POP (stats, list_node, stats_list) {
> > +        free(stats);
> > +    }
> > +}
> > +
> > +void
> > +mac_cache_mb_stats_run(struct ovs_list *stats_list, uint64_t *req_delay,
> > +                       void *data)
> > +{
> > +    struct mac_cache_data *cache_data = data;
> > +    long long timewall_now = time_wall_msec();
> > +
> > +    struct mac_cache_threshold *threshold;
> > +    struct mac_cache_mb_stats *stats;
> > +    struct mac_cache_mac_binding *mc_mb;
> > +    LIST_FOR_EACH_POP (stats, list_node, stats_list) {
> > +        mc_mb = mac_cache_mac_binding_find_by_mb_data(cache_data,
> > +                                                      &stats->data);
> > +
> > +        if (!mc_mb) {
> > +            free(stats);
> > +            continue;
> > +        }
> > +
> > +        struct uuid *dp_uuid = &mc_mb->sbrec_mb->datapath->header_.uuid;
> > +        threshold = mac_cache_threshold_find(&cache_data->mb_thresholds,
> > +                                             dp_uuid);
> > +
> > +        uint64_t dump_period = (3 * threshold->value) / 4;
>
> The dump period correlates directly with the configured threshold.
> Perhaps the dump period could be stored on the mac_cache_threshold and
> updated whenever the threshold->value is changed. This way, you would
> not have to calculate it twice every time this function is run.
>
> I also suggest offloading the dump_period calculation to a function so
> that it is easy to change if desired.
>

Yeah, that makes sense. This way the calculation will be in a single place,
and we probably won't need a separate function for it.

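Roughly what I have in mind for v2 (untested sketch; the setter name is a
placeholder and the field names are approximations of the existing
mac_cache code):

    /* Keep the derived dump period next to the threshold value so it is
     * computed in exactly one place. */
    struct mac_cache_threshold {
        struct hmap_node hmap_node;
        /* Datapath UUID. */
        struct uuid uuid;
        /* Aging threshold in ms. */
        uint64_t value;
        /* Derived dump period (3/4 of the threshold) in ms. */
        uint64_t dump_period;
    };

    static void
    mac_cache_threshold_set_value(struct mac_cache_threshold *threshold,
                                  uint64_t value)
    {
        threshold->value = value;
        threshold->dump_period = (3 * value) / 4;
    }

With that, mac_cache_mb_stats_run() can simply read threshold->dump_period
instead of recalculating it.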

>
> > +        /* If "idle_age" is under threshold it means that the mac
> binding is
> > +         * used on this chassis. Also make sure that we don't update the
> > +         * timestamp more than once during the dump period. */
> > +        if (stats->idle_age_ms < threshold->value &&
> > +            (timewall_now - mc_mb->sbrec_mb->timestamp) >= dump_period)
> {
> > +            sbrec_mac_binding_set_timestamp(mc_mb->sbrec_mb,
> timewall_now);
> > +        }
> > +
> > +        free(stats);
> > +    }
> > +
> > +    uint64_t dump_period = UINT64_MAX;
> > +    HMAP_FOR_EACH (threshold, hmap_node, &cache_data->mb_thresholds) {
> > +        dump_period = MIN(dump_period, (3 * threshold->value) / 4);
> > +    }
> > +
> > +    *req_delay = dump_period < UINT64_MAX ? dump_period : 0;
> > +}
> > diff --git a/controller/mac_cache.h b/controller/mac_cache.h
> > index f1f1772c8..a29713908 100644
> > --- a/controller/mac_cache.h
> > +++ b/controller/mac_cache.h
> > @@ -71,4 +71,11 @@ void mac_cache_mac_binding_remove(struct
> mac_cache_data *data,
> >   void mac_cache_mac_bindings_destroy(struct mac_cache_data *data);
> >   bool mac_cache_sb_mac_binding_updated(const struct sbrec_mac_binding
> *mb);
> >
> > +void
> > +mac_cache_mb_stats_process_flow_stats(struct ovs_list *stats_list,
> > +                                      struct ofputil_flow_stats
> *ofp_stats);
> > +void mac_cache_mb_stats_destroy(struct ovs_list *stats_list);
> > +void mac_cache_mb_stats_run(struct ovs_list *stats_list, uint64_t
> *req_delay,
> > +                            void *data);
> > +
> >   #endif /* controller/mac_cache.h */
> > diff --git a/controller/ovn-controller.c b/controller/ovn-controller.c
> > index abb18647a..bdf7368b8 100644
> > --- a/controller/ovn-controller.c
> > +++ b/controller/ovn-controller.c
> > @@ -84,6 +84,7 @@
> >   #include "hmapx.h"
> >   #include "mirror.h"
> >   #include "mac_cache.h"
> > +#include "statctrl.h"
> >
> >   VLOG_DEFINE_THIS_MODULE(main);
> >
> > @@ -4804,6 +4805,7 @@ main(int argc, char *argv[])
> >       lflow_init();
> >       mirror_init();
> >       vif_plug_provider_initialize();
> > +    statctrl_init();
> >
> >       /* Connect to OVS OVSDB instance. */
> >       struct ovsdb_idl_loop ovs_idl_loop = OVSDB_IDL_LOOP_INITIALIZER(
> > @@ -5205,6 +5207,8 @@ main(int argc, char *argv[])
> >           engine_get_internal_data(&en_template_vars);
> >       struct ed_type_lb_data *lb_data =
> >           engine_get_internal_data(&en_lb_data);
> > +    struct mac_cache_data *mac_cache_data =
> > +            engine_get_internal_data(&en_mac_cache);
> >
> >       ofctrl_init(&lflow_output_data->group_table,
> >                   &lflow_output_data->meter_table,
> > @@ -5593,6 +5597,11 @@ main(int argc, char *argv[])
> >                           }
> >                       }
> >
> > +                    if (mac_cache_data) {
> > +                        statctrl_update(br_int->name);
> > +                        statctrl_run(ovnsb_idl_txn, mac_cache_data);
> > +                    }
> > +
> >                       ofctrl_seqno_update_create(
> >                           ofctrl_seq_type_nb_cfg,
> >                           get_nb_cfg(sbrec_sb_global_table_get(
> > @@ -5702,6 +5711,7 @@ main(int argc, char *argv[])
> >               if (br_int) {
> >                   ofctrl_wait();
> >                   pinctrl_wait(ovnsb_idl_txn);
> > +                statctrl_wait(ovnsb_idl_txn);
> >               }
> >
> >               binding_wait();
> > @@ -5836,6 +5846,7 @@ loop_done:
> >       patch_destroy();
> >       mirror_destroy();
> >       encaps_destroy();
> > +    statctrl_destroy();
> >       if_status_mgr_destroy(if_mgr);
> >       shash_destroy(&vif_plug_deleted_iface_ids);
> >       shash_destroy(&vif_plug_changed_iface_ids);
> > diff --git a/controller/statctrl.c b/controller/statctrl.c
> > new file mode 100644
> > index 000000000..9bef827fc
> > --- /dev/null
> > +++ b/controller/statctrl.c
> > @@ -0,0 +1,434 @@
> > +/* Copyright (c) 2023, Red Hat, Inc.
> > + *
> > + * Licensed under the Apache License, Version 2.0 (the "License");
> > + * you may not use this file except in compliance with the License.
> > + * You may obtain a copy of the License at:
> > + *
> > + *     http://www.apache.org/licenses/LICENSE-2.0
> > + *
> > + * Unless required by applicable law or agreed to in writing, software
> > + * distributed under the License is distributed on an "AS IS" BASIS,
> > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
> implied.
> > + * See the License for the specific language governing permissions and
> > + * limitations under the License.
> > + */
> > +
> > +#include <config.h>
> > +
> > +#include "byte-order.h"
> > +#include "dirs.h"
> > +#include "latch.h"
> > +#include "lflow.h"
> > +#include "mac_cache.h"
> > +#include "openvswitch/ofp-errors.h"
> > +#include "openvswitch/ofp-flow.h"
> > +#include "openvswitch/ofp-msgs.h"
> > +#include "openvswitch/ofp-print.h"
> > +#include "openvswitch/ofp-util.h"
> > +#include "openvswitch/poll-loop.h"
> > +#include "openvswitch/rconn.h"
> > +#include "openvswitch/vlog.h"
> > +#include "ovn/logical-fields.h"
> > +#include "ovs-thread.h"
> > +#include "seq.h"
> > +#include "socket-util.h"
> > +#include "statctrl.h"
> > +
> > +VLOG_DEFINE_THIS_MODULE(statctrl);
> > +
> > +enum stat_type {
> > +    STATS_MAC_BINDING = 0,
> > +    STATS_MAX,
> > +};
> > +
> > +struct stats_node {
> > +    /* Table ID for the statistics request. */
> > +    uint8_t table_id;
>
> Instead of using a table_id here, would it make sense to store a struct
> ofputil_flow_stats_request instead? This way, if stats nodes care about
> group IDs or cookies, they can store them here.
>

That is a good idea, thanks.

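For the record, this is the direction I'll try in v2 (rough, untested sketch
based on your suggestion; the rest of the node stays the same):

    struct stats_node {
        /* Template for the statistics request (table ID, cookie/mask,
         * out_port, out_group) instead of just a table ID. */
        struct ofputil_flow_stats_request request;
        /* xid of the last statistics request. */
        ovs_be32 xid;
        /* Timestamp when the next request should happen. */
        int64_t next_request_timestamp;
        /* Request delay in ms. */
        uint64_t request_delay;
        /* List of processed statistics. */
        struct ovs_list stats_list;
        void (*destroy)(struct ovs_list *stats_list);
        void (*process_flow_stats)(struct ovs_list *stats_list,
                                   struct ofputil_flow_stats *ofp_stats);
        void (*run)(struct ovs_list *stats_list, uint64_t *req_delay,
                    void *data);
    };

statctrl_send_request() would then pass node->request straight to
ofputil_encode_flow_stats_request() instead of building the request on the
fly.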

>
> > +    /* xid of the last statistics request. */
> > +    ovs_be32 xid;
> > +    /* Timestamp when the next request should happen. */
> > +    int64_t next_request_timestamp;
> > +    /* Request delay in ms. */
> > +    uint64_t request_delay;
> > +    /* List of processed statistics. */
> > +    struct ovs_list stats_list;
> > +    /* Function to clean up the node.
> > +     * This function runs in main thread. */
> > +    void (*destroy)(struct ovs_list *stats_list);
> > +    /* Function to process the response and store it in the list.
> > +     * This function runs in statctrl thread locked behind mutex. */
> > +    void (*process_flow_stats)(struct ovs_list *stats_list,
> > +                               struct ofputil_flow_stats *ofp_stats);
> > +    /* Function to process the parsed stats.
> > +     * This function runs in main thread locked behind mutex. */
> > +    void (*run)(struct ovs_list *stats_list, uint64_t *req_delay, void
> *data);
> > +};
> > +
> > +#define STATS_NODE(NAME, TABLE_ID, DESTROY, PROCESS, RUN)                \
> > +    statctrl_ctx.nodes[STATS_##NAME] = (struct stats_node) {             \
> > +        .table_id = TABLE_ID,                                            \
> > +        .xid = 0,                                                        \
> > +        .next_request_timestamp = INT64_MAX,                             \
> > +        .request_delay = 0,                                              \
> > +        .stats_list =                                                    \
> > +            OVS_LIST_INITIALIZER(                                        \
> > +                &statctrl_ctx.nodes[STATS_##NAME].stats_list),           \
> > +        .destroy = DESTROY,                                              \
> > +        .process_flow_stats = PROCESS,                                   \
> > +        .run = RUN                                                       \
> > +    };
> > +
> > +struct statctrl_ctx {
> > +    char *br_int;
> > +
> > +    pthread_t thread;
> > +    struct latch exit_latch;
> > +
> > +    struct seq *thread_seq;
> > +    struct seq *main_seq;
> > +
> > +    struct stats_node nodes[STATS_MAX];
> > +};
> > +
> > +static struct statctrl_ctx statctrl_ctx;
> > +static struct ovs_mutex mutex;
> > +
> > +static void *statctrl_thread_handler(void *arg);
> > +static void statctrl_rconn_setup(struct rconn *swconn, char
> *conn_target)
> > +    OVS_REQUIRES(mutex);
> > +static void statctrl_handle_rconn_msg(struct rconn *swconn,
> > +                                      struct statctrl_ctx *ctx,
> > +                                      struct ofpbuf *msg);
> > +static enum stat_type statctrl_get_stat_type(struct statctrl_ctx *ctx,
> > +                                             const struct ofp_header
> *oh);
> > +static void statctrl_decode_statistics_reply(struct stats_node *node,
> > +                                             struct ofpbuf *msg)
> > +    OVS_REQUIRES(mutex);
> > +static void statctrl_send_request(struct rconn *swconn,
> > +                                  struct statctrl_ctx *ctx)
> > +    OVS_REQUIRES(mutex);
> > +static void statctrl_notify_main_thread(struct statctrl_ctx *ctx);
> > +static void statctrl_set_conn_target(const char *br_int_name)
> > +    OVS_REQUIRES(mutex);
> > +static void statctrl_wait_next_request(struct statctrl_ctx *ctx)
> > +    OVS_REQUIRES(mutex);
> > +static bool statctrl_update_next_request_timestamp(struct stats_node
> *node,
> > +                                                   long long now,
> > +                                                   uint64_t prev_delay)
> > +    OVS_REQUIRES(mutex);
> > +
> > +void
> > +statctrl_init(void)
> > +{
> > +    statctrl_ctx.br_int = NULL;
> > +    latch_init(&statctrl_ctx.exit_latch);
> > +    ovs_mutex_init(&mutex);
> > +    statctrl_ctx.thread_seq = seq_create();
> > +    statctrl_ctx.main_seq = seq_create();
> > +
> > +    /* Definition of all stat nodes. */
> > +    STATS_NODE(MAC_BINDING, OFTABLE_MAC_CACHE_USE,
> mac_cache_mb_stats_destroy,
> > +               mac_cache_mb_stats_process_flow_stats,
> mac_cache_mb_stats_run);
> > +
> > +
> > +    statctrl_ctx.thread = ovs_thread_create("ovn_statctrl",
> > +                                            statctrl_thread_handler,
> > +                                            &statctrl_ctx);
> > +}
> > +
> > +void
> > +statctrl_run(struct ovsdb_idl_txn *ovnsb_idl_txn,
> > +             struct mac_cache_data *mac_cache_data)
> > +{
> > +    if (!ovnsb_idl_txn) {
> > +        return;
> > +    }
> > +
> > +    void *node_data[STATS_MAX] = {mac_cache_data};
>
> I was thinking about what happens when more than just the MAC cache
> needs to collect flow statistics. Based on this initial design, my
> assumption is that the function signature for statctrl_run() would grow
> as each new stats node type is defined:
>
> void
> statctrl_run(struct ovsdb_idl_txn *ovnsb_idl_txn,
>               struct mac_cache_data *mac_cache_data,
>               struct foo_data *foo_data,
>               struct bar_data *bar_data)
>
> Is that the intention? I suppose this will be OK as long as the number
> of stats nodes stays low. But if we suspect that we will end up with a
> lot more, then we may want to switch to passing in, say, a shash of data
> instead of each individual stats node data type. My biggest concern here
> is merge conflicts when trying to backport patches.
>
> Or did you have a different idea in mind for how new stats data types
> would be handled?
>

Yeah, that is the original intention, as I don't anticipate it growing too
much. For example, initially there will be only one argument, since
mac_cache_data is planned to be used for both MAC binding and FDB.

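To illustrate (STATS_FDB is hypothetical here, not part of this patch): a
future FDB node would just reuse the same pointer in statctrl_run(), so the
signature stays as it is today:

    void *node_data[STATS_MAX] = {
        [STATS_MAC_BINDING] = mac_cache_data,
        [STATS_FDB]         = mac_cache_data, /* Hypothetical future node. */
    };

The signature would only need to grow for a node whose data does not live in
mac_cache_data.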

>
> > +
> > +    bool schedule_updated = false;
> > +    long long now = time_msec();
> > +
> > +    ovs_mutex_lock(&mutex);
> > +    for (size_t i = 0; i < STATS_MAX; i++) {
> > +        struct stats_node *node = &statctrl_ctx.nodes[i];
> > +        uint64_t prev_delay = node->request_delay;
> > +
> > +        node->run(&node->stats_list, &node->request_delay,
> node_data[i]);
> > +
> > +        schedule_updated |=
> > +                statctrl_update_next_request_timestamp(node, now,
> prev_delay);
> > +    }
> > +    ovs_mutex_unlock(&mutex);
> > +
> > +    if (schedule_updated) {
> > +        seq_change(statctrl_ctx.thread_seq);
> > +    }
> > +}
> > +
> > +void
> > +statctrl_update(const char *br_int_name)
> > +{
> > +    ovs_mutex_lock(&mutex);
> > +    statctrl_set_conn_target(br_int_name);
> > +    ovs_mutex_unlock(&mutex);
> > +}
> > +
> > +void
> > +statctrl_wait(struct ovsdb_idl_txn *ovnsb_idl_txn)
> > +{
> > +    if (!ovnsb_idl_txn) {
> > +        return;
> > +    }
> > +
> > +    ovs_mutex_lock(&mutex);
> > +    for (size_t i = 0; i < STATS_MAX; i++) {
> > +        struct stats_node *node = &statctrl_ctx.nodes[i];
> > +        if (!ovs_list_is_empty(&node->stats_list)) {
> > +            poll_immediate_wake();
> > +        }
> > +    }
> > +    int64_t new_seq = seq_read(statctrl_ctx.main_seq);
> > +    seq_wait(statctrl_ctx.main_seq, new_seq);
> > +    ovs_mutex_unlock(&mutex);
> > +}
> > +
> > +void
> > +statctrl_destroy(void)
> > +{
> > +    latch_set(&statctrl_ctx.exit_latch);
> > +    pthread_join(statctrl_ctx.thread, NULL);
> > +    latch_destroy(&statctrl_ctx.exit_latch);
> > +    free(statctrl_ctx.br_int);
> > +    seq_destroy(statctrl_ctx.thread_seq);
> > +    seq_destroy(statctrl_ctx.main_seq);
> > +
> > +    for (size_t i = 0; i < STATS_MAX; i++) {
> > +        struct stats_node *node = &statctrl_ctx.nodes[i];
> > +        node->destroy(&node->stats_list);
> > +    }
> > +}
> > +
> > +static void *
> > +statctrl_thread_handler(void *arg)
> > +{
> > +    struct statctrl_ctx *ctx = arg;
> > +
> > +    /* OpenFlow connection to the switch. */
> > +    struct rconn *swconn = rconn_create(5, 0, DSCP_DEFAULT,
> > +                                        1 << OFP15_VERSION);
> > +
> > +    while (!latch_is_set(&ctx->exit_latch)) {
> > +        ovs_mutex_lock(&mutex);
> > +        statctrl_rconn_setup(swconn, ctx->br_int);
> > +        ovs_mutex_unlock(&mutex);
> > +
> > +        rconn_run(swconn);
> > +        uint64_t new_seq = seq_read(ctx->thread_seq);
> > +
> > +        if (rconn_is_connected(swconn)) {
> > +            for (int i = 0; i < 100; i++) {
> > +                struct ofpbuf *msg = rconn_recv(swconn);
> > +
> > +                if (!msg) {
> > +                    break;
> > +                }
> > +
> > +                statctrl_handle_rconn_msg(swconn, ctx, msg);
> > +                ofpbuf_delete(msg);
> > +            }
> > +
> > +            ovs_mutex_lock(&mutex);
> > +            statctrl_send_request(swconn, ctx);
> > +            ovs_mutex_unlock(&mutex);
> > +        }
> > +
> > +        statctrl_notify_main_thread(ctx);
> > +        rconn_run_wait(swconn);
> > +        rconn_recv_wait(swconn);
> > +        ovs_mutex_lock(&mutex);
> > +        statctrl_wait_next_request(ctx);
> > +        ovs_mutex_unlock(&mutex);
> > +        seq_wait(ctx->thread_seq, new_seq);
> > +        latch_wait(&ctx->exit_latch);
> > +
> > +        poll_block();
> > +    }
> > +
> > +    rconn_destroy(swconn);
> > +    return NULL;
> > +}
> > +
> > +static void
> > +statctrl_rconn_setup(struct rconn *swconn, char *br_int)
> > +    OVS_REQUIRES(mutex)
> > +{
> > +    if (!br_int) {
> > +        rconn_disconnect(swconn);
> > +        return;
> > +    }
> > +
> > +    char *conn_target = xasprintf("unix:%s/%s.mgmt", ovs_rundir(),
> br_int);
> > +
> > +    if (strcmp(conn_target, rconn_get_target(swconn))) {
> > +        VLOG_INFO("%s: connecting to switch", conn_target);
> > +        rconn_connect(swconn, conn_target, conn_target);
> > +    }
> > +
> > +    free(conn_target);
> > +}
> > +
> > +static void
> > +statctrl_handle_rconn_msg(struct rconn *swconn, struct statctrl_ctx
> *ctx,
> > +                          struct ofpbuf *msg)
> > +{
> > +    enum ofptype type;
> > +    const struct ofp_header *oh = msg->data;
> > +
> > +    ofptype_decode(&type, oh);
> > +
> > +    if (type == OFPTYPE_ECHO_REQUEST) {
> > +        rconn_send(swconn, ofputil_encode_echo_reply(oh), NULL);
> > +    } else if (type == OFPTYPE_FLOW_STATS_REPLY) {
> > +        enum stat_type stype = statctrl_get_stat_type(ctx, oh);
> > +        if (stype == STATS_MAX) {
> > +            return;
> > +        }
> > +
> > +        ovs_mutex_lock(&mutex);
> > +        statctrl_decode_statistics_reply(&ctx->nodes[stype], msg);
> > +        ovs_mutex_unlock(&mutex);
> > +    } else {
> > +        if (VLOG_IS_DBG_ENABLED()) {
> > +
> > +            static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(30,
> 300);
> > +
> > +            char *s = ofp_to_string(oh, ntohs(oh->length), NULL, NULL,
> 2);
> > +
> > +            VLOG_DBG_RL(&rl, "OpenFlow packet ignored: %s", s);
> > +            free(s);
> > +        }
> > +    }
> > +}
> > +
> > +static enum stat_type
> > +statctrl_get_stat_type(struct statctrl_ctx *ctx, const struct
> ofp_header *oh)
> > +{
> > +    for (size_t i = 0; i < STATS_MAX; i++) {
> > +        if (ctx->nodes[i].xid == oh->xid) {
> > +            return i;
> > +        }
> > +    }
> > +    return STATS_MAX;
> > +}
> > +
> > +static void
> > +statctrl_decode_statistics_reply(struct stats_node *node, struct ofpbuf
> *msg)
> > +    OVS_REQUIRES(mutex)
> > +{
> > +    struct ofpbuf ofpacts;
> > +    ofpbuf_init(&ofpacts, 0);
> > +
> > +    while (true) {
> > +        struct ofputil_flow_stats fs;
> > +
> > +        int error = ofputil_decode_flow_stats_reply(&fs, msg, true,
> &ofpacts);
> > +        if (error == EOF) {
> > +            break;
> > +        } else if (error) {
> > +            VLOG_DBG("Couldn't parse stat reply: %s",
> ofperr_to_string(error));
> > +            break;
> > +        }
> > +
> > +        node->process_flow_stats(&node->stats_list, &fs);
> > +    }
> > +
> > +    ofpbuf_uninit(&ofpacts);
> > +}
> > +
> > +static void
> > +statctrl_send_request(struct rconn *swconn, struct statctrl_ctx *ctx)
> > +    OVS_REQUIRES(mutex)
> > +{
> > +    long long now = time_msec();
> > +    enum ofp_version version = rconn_get_version(swconn);
> > +    enum ofputil_protocol proto =
> ofputil_protocol_from_ofp_version(version);
> > +
> > +    for (size_t i = 0; i < STATS_MAX; i++) {
> > +        struct stats_node *node = &ctx->nodes[i];
> > +
> > +        if (now < node->next_request_timestamp) {
> > +            continue;
> > +        }
> > +
> > +        struct ofputil_flow_stats_request fsr = {
> > +                .cookie = htonll(0),
> > +                .cookie_mask = htonll(0),
> > +                .out_port = OFPP_ANY,
> > +                .out_group = OFPG_ANY,
> > +                .table_id = node->table_id,
> > +        };
> > +        struct ofpbuf *msg = ofputil_encode_flow_stats_request(&fsr, proto);
> > +        node->xid = ((struct ofp_header *) msg->data)->xid;
> > +
> > +        statctrl_update_next_request_timestamp(node, now, 0);
> > +
> > +        rconn_send(swconn, msg, NULL);
> > +    }
> > +}
> > +
> > +static void
> > +statctrl_notify_main_thread(struct statctrl_ctx *ctx)
> > +{
> > +    for (size_t i = 0; i < STATS_MAX; i++) {
> > +        if (!ovs_list_is_empty(&ctx->nodes[i].stats_list)) {
> > +            seq_change(ctx->main_seq);
> > +            return;
> > +        }
> > +    }
> > +}
> > +
> > +static void
> > +statctrl_set_conn_target(const char *br_int_name)
> > +    OVS_REQUIRES(mutex)
> > +{
> > +    if (!br_int_name) {
> > +        return;
> > +    }
> > +
> > +
> > +    if (!statctrl_ctx.br_int || strcmp(statctrl_ctx.br_int,
> br_int_name)) {
> > +        free(statctrl_ctx.br_int);
> > +        statctrl_ctx.br_int = xstrdup(br_int_name);
> > +        /* Notify statctrl thread that integration bridge is
> set/changed. */
> > +        seq_change(statctrl_ctx.thread_seq);
> > +    }
> > +}
> > +
> > +static void
> > +statctrl_wait_next_request(struct statctrl_ctx *ctx)
> > +    OVS_REQUIRES(mutex)
> > +{
> > +    for (size_t i = 0; i < STATS_MAX; i++) {
> > +        int64_t timestamp = ctx->nodes[i].next_request_timestamp;
> > +        if (timestamp < INT64_MAX) {
> > +            poll_timer_wait_until(timestamp);
> > +        }
> > +    }
> > +}
> > +
> > +static bool
> > +statctrl_update_next_request_timestamp(struct stats_node *node,
> > +                                       long long now, uint64_t
> prev_delay)
> > +{
> > +    if (!node->request_delay) {
> > +        node->next_request_timestamp = INT64_MAX;
> > +        return false;
> > +    }
> > +
> > +    int64_t timestamp = prev_delay ? node->next_request_timestamp : now;
> > +    node->next_request_timestamp =
> > +            timestamp + node->request_delay - prev_delay;
> > +
> > +    return timestamp != node->next_request_timestamp;
> > +}
> > diff --git a/controller/statctrl.h b/controller/statctrl.h
> > new file mode 100644
> > index 000000000..c5cede353
> > --- /dev/null
> > +++ b/controller/statctrl.h
> > @@ -0,0 +1,28 @@
> > +/* Copyright (c) 2023, Red Hat, Inc.
> > + *
> > + * Licensed under the Apache License, Version 2.0 (the "License");
> > + * you may not use this file except in compliance with the License.
> > + * You may obtain a copy of the License at:
> > + *
> > + *     http://www.apache.org/licenses/LICENSE-2.0
> > + *
> > + * Unless required by applicable law or agreed to in writing, software
> > + * distributed under the License is distributed on an "AS IS" BASIS,
> > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
> implied.
> > + * See the License for the specific language governing permissions and
> > + * limitations under the License.
> > + */
> > +
> > +#ifndef STATCTRL_H
> > +#define STATCTRL_H
> > +
> > +#include "mac_cache.h"
> > +
> > +void statctrl_init(void);
> > +void statctrl_run(struct ovsdb_idl_txn *ovnsb_idl_txn,
> > +                  struct mac_cache_data *mac_cache_data);
> > +void statctrl_update(const char *br_int_name);
> > +void statctrl_wait(struct ovsdb_idl_txn *ovnsb_idl_txn);
> > +void statctrl_destroy(void);
> > +
> > +#endif /* controller/statctrl.h */
> > diff --git a/tests/ovn.at b/tests/ovn.at
> > index 7cee8a175..f4c28c57b 100644
> > --- a/tests/ovn.at
> > +++ b/tests/ovn.at
> > @@ -34556,8 +34556,10 @@ AT_CHECK([fetch_column nb:logical_router
> options name="gw" | grep -q mac_binding
> >   send_garp hv1 ext1 10
> >   send_garp hv2 ext2 20
> >
> > -OVS_WAIT_UNTIL([ovn-sbctl list mac_binding | grep -q "192.168.10.10"])
> > -OVS_WAIT_UNTIL([ovn-sbctl list mac_binding | grep -q "192.168.10.20"])
> > +wait_row_count mac_binding 1 ip="192.168.10.10"
> > +wait_row_count mac_binding 1 ip="192.168.10.20"
> > +
> > +timestamp=$(fetch_column mac_binding timestamp ip="192.168.10.20")
> >
> >   send_udp hv1 ext1 10
> >   send_udp hv2 ext2 20
> > @@ -34566,16 +34568,27 @@ OVS_WAIT_UNTIL([as hv1 ovs-ofctl dump-flows
> br-int table=79 | grep "192.168.10.1
> >   OVS_WAIT_UNTIL([as hv2 ovs-ofctl dump-flows br-int table=79 | grep
> "192.168.10.20" | grep -q "n_packets=1"])
> >
> >   # Set the MAC binding aging threshold
> > -AT_CHECK([ovn-nbctl set logical_router gw
> options:mac_binding_age_threshold=1])
> > -AT_CHECK([fetch_column nb:logical_router options | grep -q
> mac_binding_age_threshold=1])
> > +AT_CHECK([ovn-nbctl set logical_router gw
> options:mac_binding_age_threshold=5])
> > +AT_CHECK([fetch_column nb:logical_router options | grep -q
> mac_binding_age_threshold=5])
> >   AT_CHECK([ovn-nbctl --wait=sb sync])
> >
> > +# Wait send few packets for "192.168.10.20" to indicate that it is
> still in use
> > +send_udp hv2 ext2 20
> > +sleep 1
> > +send_udp hv2 ext2 20
> > +
> >   # Set the timeout for OVS_WAIT* functions to 5 seconds
> >   OVS_CTL_TIMEOUT=5
> > +OVS_WAIT_UNTIL([
> > +    test "$timestamp" != "$(fetch_column mac_binding timestamp
> ip='192.168.10.20')"
> > +])
> > +check $(test "$(fetch_column mac_binding timestamp ip='192.168.10.20')"
> != "")
> > +
> >   # Check if the records are removed after some inactivity
> >   OVS_WAIT_UNTIL([
> >       test "0" = "$(ovn-sbctl list mac_binding | grep -c
> '192.168.10.10')"
> >   ])
> > +# The second one takes longer because it got refreshed
> >   OVS_WAIT_UNTIL([
> >       test "0" = "$(ovn-sbctl list mac_binding | grep -c
> '192.168.10.20')"
> >   ])
> > @@ -35257,7 +35270,7 @@ check ovs-vsctl add-br br-phys
> >   ovn_attach n1 br-phys 192.168.0.1
> >
> >   dnl Ensure that there are at least 3 openflow connections.
>
> Nit: The comment is already incorrect since it checks for an exact
> number of connections instead of "at least" some number. But it also
> needs to be updated to say 4 instead of 3.
>
>
Ah right, it will be fixed in v2.


>
> > -OVS_WAIT_UNTIL([test "$(grep -c 'negotiated OpenFlow version'
> hv1/ovs-vswitchd.log)" -eq "3"])
> > +OVS_WAIT_UNTIL([test "$(grep -c 'negotiated OpenFlow version'
> hv1/ovs-vswitchd.log)" -eq "4"])
> >
> >   dnl "Wait" 3 times 60 seconds and ensure ovn-controller writes to the
> >   dnl openflow connections in the meantime.  This should allow
> ovs-vswitchd
>
>
Thanks,
Ales

-- 

Ales Musil

Senior Software Engineer - OVN Core

Red Hat EMEA <https://www.redhat.com>

amu...@redhat.com    IM: amusil
<https://red.ht/sig>
_______________________________________________
dev mailing list
d...@openvswitch.org
https://mail.openvswitch.org/mailman/listinfo/ovs-dev
