Han Zhou <hz...@ovn.org> writes:

> On Thu, Jan 25, 2024 at 12:55 PM Aaron Conole <acon...@redhat.com> wrote:
>>
>> From: Kevin Sprague <ksprague0...@gmail.com>
>>
>> During normal operations, it is useful to understand when a particular flow
>> gets removed from the system. This can be useful when debugging performance
>> issues tied to ofproto flow changes, trying to determine deployed traffic
>> patterns, or while debugging dynamic systems where ports come and go.
>>
>> Prior to this change, there was a lack of visibility around flow expiration.
>> The existing debugging infrastructure could tell us when a flow was added to
>> the datapath, but not when it was removed or why.
>>
>> This change introduces a USDT probe at the point where the revalidator
>> determines that the flow should be removed. Additionally, we track the
>> reason for the flow eviction and provide that information as well. With
>> this change, we can track the complete flow lifecycle for the netlink
>> datapath by hooking the upcall tracepoint in kernel, the flow put USDT, and
>> the revalidator USDT, letting us watch as flows are added and removed from
>> the kernel datapath.
>>
>> This change only enables this information via USDT probe, so it won't be
>> possible to access this information any other way (see:
>> Documentation/topics/usdt-probes.rst).
>>
>> Also included is a script (utilities/usdt-scripts/flow_reval_monitor.py)
>> which serves as a demonstration of how the new USDT probe might be used
>> going forward.
>>
>> Signed-off-by: Kevin Sprague <ksprague0...@gmail.com>
>> Co-authored-by: Aaron Conole <acon...@redhat.com>
>> Signed-off-by: Aaron Conole <acon...@redhat.com>
>
> Thanks Aaron for taking care of this patch.
> I saw you resolved most of my comments for the v6 of the original patch:
> https://mail.openvswitch.org/pipermail/ovs-dev/2023-January/401220.html
>
> But it seems my last comment was missed:
> ===
>
> I do notice a counter in my patch doesn't have a
> counterpart in this patch. In revalidator_sweep__(), I have:
>     if (purge) {
>         result = UKEY_DELETE;
> +       COVERAGE_INC(upcall_flow_del_purge);
>
> Would it be good to add one (e.g. FDR_PURGE) here, too?
>
> ===
> Could you check if this can be added?
> If this is merged I can rebase my patch on top of this.
Sorry I didn't reply to this.

I'm not sure it makes sense to add the probe for purge, since a purge is
only done in two cases:

1. The threads are being stopped (which should never happen after
   initialization unless ovs-vswitchd is being stopped or killed).
2. An admin runs a command to purge the revalidators (which isn't a
   recommended procedure, as it can cause lots of really weird side
   effects; we only use it as a debug tool).

Did I understand the case well enough?  I didn't reread the patch you're
proposing, so I might be misunderstanding something.

> Thanks,
> Han
>
>>
>> ---
>>  Documentation/topics/usdt-probes.rst         |   1 +
>>  ofproto/ofproto-dpif-upcall.c                |  42 +-
>>  utilities/automake.mk                        |   3 +
>>  utilities/usdt-scripts/flow_reval_monitor.py | 653 +++++++++++++++++++
>>  4 files changed, 693 insertions(+), 6 deletions(-)
>>  create mode 100755 utilities/usdt-scripts/flow_reval_monitor.py
>>
>> diff --git a/Documentation/topics/usdt-probes.rst b/Documentation/topics/usdt-probes.rst
>> index e527f43bab..a8da9bb1f7 100644
>> --- a/Documentation/topics/usdt-probes.rst
>> +++ b/Documentation/topics/usdt-probes.rst
>> @@ -214,6 +214,7 @@ Available probes in ``ovs_vswitchd``:
>>  - dpif_recv:recv_upcall
>>  - main:poll_block
>>  - main:run_start
>> +- revalidate:flow_result
>>  - revalidate_ukey\_\_:entry
>>  - revalidate_ukey\_\_:exit
>>  - udpif_revalidator:start_dump
>> diff --git a/ofproto/ofproto-dpif-upcall.c b/ofproto/ofproto-dpif-upcall.c
>> index b5cbeed878..97d75833f7 100644
>> --- a/ofproto/ofproto-dpif-upcall.c
>> +++ b/ofproto/ofproto-dpif-upcall.c
>> @@ -269,6 +269,18 @@ enum ukey_state {
>>  };
>>  #define N_UKEY_STATES (UKEY_DELETED + 1)
>>
>> +enum flow_del_reason {
>> +    FDR_REVALIDATE = 0,     /* The flow was revalidated. */
>> +    FDR_FLOW_IDLE,          /* The flow went unused and was deleted. */
>> +    FDR_TOO_EXPENSIVE,      /* The flow was too expensive to revalidate. */
>> +    FDR_FLOW_WILDCARDED,    /* The flow needed a narrower wildcard mask. */
>> +    FDR_BAD_ODP_FIT,        /* The flow had a bad ODP flow fit. */
>> +    FDR_NO_OFPROTO,         /* The flow didn't have an associated ofproto. */
>> +    FDR_XLATION_ERROR,      /* There was an error translating the flow. */
>> +    FDR_AVOID_CACHING,      /* Flow deleted to avoid caching. */
>> +    FDR_FLOW_LIMIT,         /* All flows being killed. */
>> +};
>> +
>>  /* 'udpif_key's are responsible for tracking the little bit of state udpif
>>   * needs to do flow expiration which can't be pulled directly from the
>>   * datapath.  They may be created by any handler or revalidator thread at any
>> @@ -2272,7 +2284,8 @@ populate_xcache(struct udpif *udpif, struct udpif_key *ukey,
>>  static enum reval_result
>>  revalidate_ukey__(struct udpif *udpif, const struct udpif_key *ukey,
>>                    uint16_t tcp_flags, struct ofpbuf *odp_actions,
>> -                  struct recirc_refs *recircs, struct xlate_cache *xcache)
>> +                  struct recirc_refs *recircs, struct xlate_cache *xcache,
>> +                  enum flow_del_reason *reason)
>>  {
>>      struct xlate_out *xoutp;
>>      struct netflow *netflow;
>> @@ -2293,11 +2306,13 @@ revalidate_ukey__(struct udpif *udpif, const struct udpif_key *ukey,
>>      netflow = NULL;
>>
>>      if (xlate_ukey(udpif, ukey, tcp_flags, &ctx)) {
>> +        *reason = FDR_XLATION_ERROR;
>>          goto exit;
>>      }
>>      xoutp = &ctx.xout;
>>
>>      if (xoutp->avoid_caching) {
>> +        *reason = FDR_AVOID_CACHING;
>>          goto exit;
>>      }
>>
>> @@ -2311,6 +2326,7 @@ revalidate_ukey__(struct udpif *udpif, const struct udpif_key *ukey,
>>      ofpbuf_clear(odp_actions);
>>
>>      if (!ofproto) {
>> +        *reason = FDR_NO_OFPROTO;
>>          goto exit;
>>      }
>>
>> @@ -2322,6 +2338,7 @@ revalidate_ukey__(struct udpif *udpif, const struct udpif_key *ukey,
>>      if (odp_flow_key_to_mask(ukey->mask, ukey->mask_len, &dp_mask, &ctx.flow,
>>                               NULL)
>>          == ODP_FIT_ERROR) {
>> +        *reason = FDR_BAD_ODP_FIT;
>>          goto exit;
>>      }
>>
>> @@ -2331,6 +2348,7 @@ revalidate_ukey__(struct udpif *udpif, const struct udpif_key *ukey,
>>       * down.  Note that we do not know if the datapath has ignored any of the
>>       * wildcarded bits, so we may be overly conservative here. */
>>      if (flow_wildcards_has_extra(&dp_mask, ctx.wc)) {
>> +        *reason = FDR_FLOW_WILDCARDED;
>>          goto exit;
>>      }
>>
>> @@ -2400,7 +2418,7 @@ static enum reval_result
>>  revalidate_ukey(struct udpif *udpif, struct udpif_key *ukey,
>>                  const struct dpif_flow_stats *stats,
>>                  struct ofpbuf *odp_actions, uint64_t reval_seq,
>> -                struct recirc_refs *recircs)
>> +                struct recirc_refs *recircs, enum flow_del_reason *reason)
>>      OVS_REQUIRES(ukey->mutex)
>>  {
>>      bool need_revalidate = ukey->reval_seq != reval_seq;
>> @@ -2430,8 +2448,12 @@ revalidate_ukey(struct udpif *udpif, struct udpif_key *ukey,
>>                  xlate_cache_clear(ukey->xcache);
>>              }
>>              result = revalidate_ukey__(udpif, ukey, push.tcp_flags,
>> -                                       odp_actions, recircs, ukey->xcache);
>> -        } /* else delete; too expensive to revalidate */
>> +                                       odp_actions, recircs, ukey->xcache,
>> +                                       reason);
>> +        } else {
>> +            /* delete; too expensive to revalidate */
>> +            *reason = FDR_TOO_EXPENSIVE;
>> +        }
>>      } else if (!push.n_packets || ukey->xcache
>>                 || !populate_xcache(udpif, ukey, push.tcp_flags)) {
>>          result = UKEY_KEEP;
>> @@ -2831,6 +2853,7 @@ revalidate(struct revalidator *revalidator)
>>          for (f = flows; f < &flows[n_dumped]; f++) {
>>              long long int used = f->stats.used;
>>              struct recirc_refs recircs = RECIRC_REFS_EMPTY_INITIALIZER;
>> +            enum flow_del_reason reason = FDR_REVALIDATE;
>>              struct dpif_flow_stats stats = f->stats;
>>              enum reval_result result;
>>              struct udpif_key *ukey;
>> @@ -2905,9 +2928,14 @@ revalidate(struct revalidator *revalidator)
>>              }
>>              if (kill_them_all || (used && used < now - max_idle)) {
>>                  result = UKEY_DELETE;
>> +                if (kill_them_all) {
>> +                    reason = FDR_FLOW_LIMIT;
>> +                } else {
>> +                    reason = FDR_FLOW_IDLE;
>> +                }
>>              } else {
>>                  result = revalidate_ukey(udpif, ukey, &stats, &odp_actions,
>> -                                         reval_seq, &recircs);
>> +                                         reval_seq, &recircs, &reason);
>>              }
>>              ukey->dump_seq = dump_seq;
>>
>> @@ -2916,6 +2944,7 @@ revalidate(struct revalidator *revalidator)
>>                  udpif_update_flow_pps(udpif, ukey, f);
>>              }
>>
>> +            OVS_USDT_PROBE(revalidate, flow_result, reason, udpif, ukey);
>>              if (result != UKEY_KEEP) {
>>                  /* Takes ownership of 'recircs'. */
>>                  reval_op_init(&ops[n_ops++], result, udpif, ukey, &recircs,
>> @@ -2962,6 +2991,7 @@ revalidator_sweep__(struct revalidator *revalidator, bool purge)
>>          uint64_t odp_actions_stub[1024 / 8];
>>          struct ofpbuf odp_actions = OFPBUF_STUB_INITIALIZER(odp_actions_stub);
>>
>> +        enum flow_del_reason reason = FDR_REVALIDATE;
>>          struct ukey_op ops[REVALIDATE_MAX_BATCH];
>>          struct udpif_key *ukey;
>>          struct umap *umap = &udpif->ukeys[i];
>> @@ -2993,7 +3023,7 @@ revalidator_sweep__(struct revalidator *revalidator, bool purge)
>>                      COVERAGE_INC(revalidate_missed_dp_flow);
>>                      memcpy(&stats, &ukey->stats, sizeof stats);
>>                      result = revalidate_ukey(udpif, ukey, &stats, &odp_actions,
>> -                                             reval_seq, &recircs);
>> +                                             reval_seq, &recircs, &reason);
>>                  }
>>                  if (result != UKEY_KEEP) {
>>                      /* Clears 'recircs' if filled by revalidate_ukey(). */
>> diff --git a/utilities/automake.mk b/utilities/automake.mk
>> index 9a2114df40..146b8c37fb 100644
>> --- a/utilities/automake.mk
>> +++ b/utilities/automake.mk
>> @@ -23,6 +23,7 @@ scripts_DATA += utilities/ovs-lib
>>  usdt_SCRIPTS += \
>>  	utilities/usdt-scripts/bridge_loop.bt \
>>  	utilities/usdt-scripts/dpif_nl_exec_monitor.py \
>> +	utilities/usdt-scripts/flow_reval_monitor.py \
>>  	utilities/usdt-scripts/kernel_delay.py \
>>  	utilities/usdt-scripts/kernel_delay.rst \
>>  	utilities/usdt-scripts/reval_monitor.py \
>> @@ -72,6 +73,7 @@ EXTRA_DIST += \
>>  	utilities/docker/debian/build-kernel-modules.sh \
>>  	utilities/usdt-scripts/bridge_loop.bt \
>>  	utilities/usdt-scripts/dpif_nl_exec_monitor.py \
>> +	utilities/usdt-scripts/flow_reval_monitor.py \
>>  	utilities/usdt-scripts/kernel_delay.py \
>>  	utilities/usdt-scripts/kernel_delay.rst \
>>  	utilities/usdt-scripts/reval_monitor.py \
>> @@ -146,6 +148,7 @@ FLAKE8_PYFILES += utilities/ovs-pcap.in \
>>  	utilities/ovs-tcpdump.in \
>>  	utilities/ovs-pipegen.py \
>>  	utilities/usdt-scripts/dpif_nl_exec_monitor.py \
>> +	utilities/usdt-scripts/flow_reval_monitor.py \
>>  	utilities/usdt-scripts/upcall_monitor.py \
>>  	utilities/usdt-scripts/upcall_cost.py
>>
>> diff --git a/utilities/usdt-scripts/flow_reval_monitor.py b/utilities/usdt-scripts/flow_reval_monitor.py
>> new file mode 100755
>> index 0000000000..e808020bb5
>> --- /dev/null
>> +++ b/utilities/usdt-scripts/flow_reval_monitor.py
>> @@ -0,0 +1,653 @@
>> +#!/usr/bin/env python3
>> +#
>> +# Copyright (c) 2022 Redhat, Inc.
>> +#
>> +# Licensed under the Apache License, Version 2.0 (the "License");
>> +# you may not use this file except in compliance with the License.
>> +# You may obtain a copy of the License at:
>> +#
>> +#     http://www.apache.org/licenses/LICENSE-2.0
>> +#
>> +# Unless required by applicable law or agreed to in writing, software
>> +# distributed under the License is distributed on an "AS IS" BASIS,
>> +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>> +# See the License for the specific language governing permissions and
>> +# limitations under the License.
>> +#
>> +# Script information:
>> +# -------------------
>> +# flow_reval_monitor.py uses the dpif_netlink_operate__:op_flow_put and
>> +# revalidate:flow_result USDT probes to monitor flow lifetimes and
>> +# expiration events. By default, this will show all flow_put and flow
>> +# expiration events, along with their reasons. This will look like so:
>> +#
>> +# TIME               UFID                                          EVENT/REASON
>> +# 101536.226986736   ufid:f76fc899-376d-466b-bc74-0000b933eb97     flow_put
>> +# 101536.227196214   ufid:d08472b6-110e-46cb-a9e4-00008f46838e     flow_put
>> +# 101541.516610178   ufid:fc5cc4a2-39e7-4a2d-bbce-000019665b32     flow_put
>> +# 101541.516967303   ufid:fddd6510-26dc-4c87-8f7a-0000fc0c2c3a     flow_put
>> +# 101551.688050747   ufid:fddd6510-26dc-4c87-8f7a-0000fc0c2c3a     flow timed out
>> +# 101551.688077175   ufid:fc5cc4a2-39e7-4a2d-bbce-000019665b32     flow timed out
>> +# 101557.695391371   ufid:f76fc899-376d-466b-bc74-0000b933eb97     flow timed out
>> +# 101557.695408909   ufid:d08472b6-110e-46cb-a9e4-00008f46838e     flow timed out
>> +#
>> +# Flow key data can be printed using the --flow-keys option. This will
>> +# print the equivalent datapath flow string.
>> +#
>> +# When filtering flows, the syntax is the same as used by
>> +# `ovs-appctl dpctl/add-flow`.
>> +#
>> +# The following options are available:
>> +#
>> +# usage: flow_reval_monitor.py [-h] [--buffer-page-count NUMBER]
>> +#                              [-f [64-2048]] [-k] [-l [FLOW_STRING ...]]
>> +#                              [-p VSWITCHD_PID] [-D [DEBUG]]
>> +#
>> +# optional arguments:
>> +#   -h, --help            show this help message and exit
>> +#   --buffer-page-count NUMBER
>> +#                         Number of BPF ring buffer pages, default 1024
>> +#   -f <64..2048>, --flow-key-size=<64..2048>
>> +#                         Set the size of the flow key, default 64
>> +#   -k, --flow-keys       Print flow keys as flow strings
>> +#   -l [FLOW_STRING ...], --filter-flows [FLOW_STRING ...]
>> +#                         Filter flows that match the specified ODP-like flow
>> +#   -p VSWITCHD_PID, --pid VSWITCHD_PID
>> +#                         ovs-vswitchd's PID
>> +#   -D [DEBUG], --debug [DEBUG]
>> +#                         Enable eBPF debugging
>> +#
>> +# Examples:
>> +#
>> +# To use the script on a running ovs-vswitchd to see flow keys and expiration
>> +# events for flows with an ipv4 source of 192.168.10.10:
>> +# $ ./flow_reval_monitor.py --flow-keys --filter-flows \
>> +#   "ipv4(src=192.168.10.10)"
>> +# TIME               UFID                                          EVENT/REASON
>> +# 105082.457322742   ufid:f76fc899-376d-466b-bc74-0000b933eb97     flow_put
>> +# ufid:f76fc899-376d-466b-bc74-0000b933eb97 has the following flow information:
>> +#     in_port(2),
>> +#     eth(src=0e:04:47:fc:74:51, dst=da:dc:c5:69:05:d7), \
>> +#     eth_type(0x800), \
>> +#     ipv4(src=192.168.10.10, dst=192.168.10.30, proto=1, tos=0, ttl=64,[...]),
>> +#     icmp(type=8, code=0)
>> +# 105092.635450202   ufid:f76fc899-376d-466b-bc74-0000b933eb97     Flow timed out
>> +#
>> +# Notes:
>> +#   1) No options are needed to attach when there is a single running instance
>> +#      of ovs-vswitchd.
>> +#   2) If you're using the flow filtering option, it will only track flows that
>> +#      have been upcalled since the script began running.
>> +#   3) When using the flow filtering option, the key size will likely need to
>> +#      be expanded to match on all the fields in the message.  The default is
>> +#      kept small to keep the buffer copy sizes down when displaying
>> +#      flows (-k), but is hardcoded to 2048 when an actual filter (-l) is
>> +#      applied.
>> +
>> +try:
>> +    from bcc import BPF
>> +    from bcc import USDT
>> +    from bcc import USDTException
>> +except ModuleNotFoundError:
>> +    print("ERROR: Can't find the BPF Compiler Collection Tools.")
>> +    print("Please install them before running this script.")
>> +    exit(1)
>> +
>> +import argparse
>> +from ipaddress import IPv4Address, IPv6Address
>> +import psutil
>> +import struct
>> +import sys
>> +import time
>> +
>> +#
>> +# eBPF source code
>> +#
>> +bpf_src = """
>> +#include <linux/sched.h>
>> +#include <uapi/linux/ptrace.h>
>> +
>> +#define MAX_KEY      <MAX_KEY_VAL>
>> +#define FLOW_FILTER  <FILTER_BOOL>
>> +
>> +enum probe { OP_FLOW_PUT, FLOW_RESULT };
>> +
>> +typedef union ovs_u128 {
>> +    unsigned int ufid32[4];
>> +    unsigned long long ufid64[2];
>> +} ovs_u128;
>> +
>> +struct dpif_flow_put {
>> +    int flags;
>> +    void *key_ptr;
>> +    size_t key_len;
>> +    void *mask_ptr;
>> +    size_t mask_len;
>> +    u64 action_ptr;
>> +    size_t action_len;
>> +    void *ufid_ptr;
>> +};
>> +
>> +struct udpif_key {
>> +    void *cmap_node;
>> +    void *key_ptr;
>> +    size_t key_len;
>> +    void *mask_ptr;
>> +    size_t mask_len;
>> +    ovs_u128 ufid;
>> +};
>> +
>> +struct event_t {
>> +    u64 ts;
>> +    u32 reason;
>> +    u32 ufid[4];  /* Can't seem to make the ovs_u128 pass to python side. */
>> +    u64 key_size;
>> +    u8 key[MAX_KEY];
>> +    enum probe probe;
>> +};
>> +
>> +BPF_HASH(watchlist, ovs_u128);
>> +BPF_RINGBUF_OUTPUT(events, <BUFFER_PAGE_COUNT>);
>> +
>> +int usdt__flow_result(struct pt_regs *ctx) {
>> +    u64 *ufid_present = NULL;
>> +    struct udpif_key ukey;
>> +
>> +    bpf_usdt_readarg_p(3, ctx, &ukey, sizeof ukey);
>> +    ovs_u128 ufid = ukey.ufid;
>> +    ufid_present = watchlist.lookup(&ufid);
>> +    if(FLOW_FILTER && !ufid_present) {
>> +        return 0;
>> +    }
>> +
>> +    struct event_t *event = events.ringbuf_reserve(sizeof(struct event_t));
>> +    if(!event) {
>> +        /* If we can't reserve the space in the ring buffer, return 1. */
>> +        return 1;
>> +    }
>> +
>> +    event->probe = FLOW_RESULT;
>> +    event->ts = bpf_ktime_get_ns();
>> +    bpf_probe_read(&event->ufid, sizeof ufid, &ufid);
>> +    bpf_usdt_readarg(1, ctx, &event->reason);
>> +    events.ringbuf_submit(event, 0);
>> +
>> +    return 0;
>> +};
>> +
>> +
>> +int usdt__op_flow_put(struct pt_regs *ctx) {
>> +    struct dpif_flow_put put;
>> +    ovs_u128 ufid;
>> +
>> +    struct event_t *event = events.ringbuf_reserve(sizeof(struct event_t));
>> +    if(!event) {
>> +        /* If we can't reserve the space in the ring buffer, return 1. */
>> +        return 1;
>> +    }
>> +
>> +    event->probe = OP_FLOW_PUT;
>> +    event->ts = bpf_ktime_get_ns();
>> +    bpf_usdt_readarg_p(2, ctx, &put, sizeof put);
>> +    bpf_probe_read(&event->ufid, sizeof event->ufid, put.ufid_ptr);
>> +    bpf_probe_read(&ufid, sizeof ufid, &event->ufid);
>> +    if (put.key_len > MAX_KEY) {
>> +        put.key_len = MAX_KEY;
>> +    }
>> +    event->key_size = put.key_len;
>> +    bpf_probe_read(&event->key, put.key_len, put.key_ptr);
>> +    event->reason = 0;
>> +    events.ringbuf_submit(event, 0);
>> +
>> +    watchlist.increment(ufid);
>> +    return 0;
>> +};
>> +"""
>> +
>> +
>> +#
>> +# buffer_size_type()
>> +#
>> +def buffer_size_type(astr, min=64, max=2048):
>> +    value = int(astr)
>> +    if min <= value <= max:
>> +        return value
>> +    else:
>> +        raise argparse.ArgumentTypeError(
>> +            'value not in range {}-{}'.format(min, max))
>> +
>> +
>> +#
>> +# format_ufid()
>> +#
>> +def format_ufid(ufid):
>> +    if ufid is None:
>> +        return "ufid:none"
>> +
>> +    return "ufid:{:08x}-{:04x}-{:04x}-{:04x}-{:04x}{:08x}".format(
>> +           ufid[0], ufid[1] >> 16, ufid[1] & 0xffff,
>> +           ufid[2] >> 16, ufid[2] & 0xffff, ufid[3])
>> +
>> +
>> +#
>> +# find_and_delete_from_watchlist()
>> +#
>> +def find_and_delete_from_watchlist(event):
>> +    for k, _ in b["watchlist"].items():
>> +        key_ufid = struct.unpack("=IIII", k)
>> +        if key_ufid == tuple(event.ufid):
>> +            key = (b["watchlist"].Key * 1)(k)
>> +            b["watchlist"].items_delete_batch(key)
>> +            break
>> +
>> +
>> +#
>> +# handle_flow_put()
>> +#
>> +def handle_flow_put(event):
>> +    if args.flow_keys or args.filter_flows is not None:
>> +        key = decode_key(bytes(event.key)[:event.key_size])
>> +        flow_dict, flow_str = parse_flow_dict(key)
>> +        # For each attribute that we're watching.
>> +        if args.filter_flows is not None:
>> +            if not compare_flow_to_target(args.filter_flows, flow_dict):
>> +                find_and_delete_from_watchlist(event)
>> +                return
>> +
>> +    print("{:<18.9f} {:<45} {:<13}".format(event.ts / 1000000000,
>> +          format_ufid(event.ufid), "Insert (put) flow to kernel."))
>> +
>> +    if args.flow_keys:
>> +        if len(flow_str) > 80:
>> +            flow_str = " " + "),\n ".join(flow_str.split("), "))
>> +        else:
>> +            flow_str = " " + flow_str
>> +        print(" - It holds the following flow information:")
>> +        print(flow_str)
>> +
>> +
>> +#
>> +# compare_flow_to_target()
>> +#
>> +def compare_flow_to_target(target, flow):
>> +    for key in target:
>> +        if key not in flow:
>> +            return False
>> +        elif target[key] is True:
>> +            continue
>> +        elif target[key] == flow[key]:
>> +            continue
>> +        elif isinstance(target[key], dict) and isinstance(flow[key], dict):
>> +            # Keep checking the remaining keys after a nested match.
>> +            if not compare_flow_to_target(target[key], flow[key]):
>> +                return False
>> +        else:
>> +            return False
>> +    return True
>> +
>> +
>> +#
>> +# parse_flow_str()
>> +#
>> +def parse_flow_str(flow_str):
>> +    f_list = [i.strip(", ") for i in flow_str.split(")")]
>> +    if f_list[-1] == "":
>> +        f_list = f_list[:-1]
>> +    flow_dict = {}
>> +    for e in f_list:
>> +        split_list = e.split("(")
>> +        k = split_list[0]
>> +        if len(split_list) == 1:
>> +            flow_dict[k] = True
>> +        elif split_list[1].count("=") == 0:
>> +            flow_dict[k] = split_list[1]
>> +        else:
>> +            sub_dict = {}
>> +            sublist = [i.strip() for i in split_list[1].split(",")]
>> +            for subkey in sublist:
>> +                brk = subkey.find("=")
>> +                sub_dict[subkey[:brk]] = subkey[brk + 1:]
>> +            flow_dict[k] = sub_dict
>> +    return flow_dict
>> +
>> +
>> +#
>> +# print_expiration()
>> +#
>> +def print_expiration(event):
>> +    reasons = ["Unknown flow expiration reason!", "Flow timed out",
>> +               "Flow revalidation too expensive",
>> +               "Flow needs narrower wildcard mask",
>> +               "Bad ODP flow fit", "Flow with no associated ofproto",
>> +               "Flow translation error", "Flow cache avoidance",
>> +               "Kill them all signal"]
>> +
>> +    ufid_str = format_ufid(event.ufid)
>> +    reason = event.reason
>> +
>> +    if reason not in range(0, len(reasons)):
>> +        reason = 0
>> +    print("{:<18.9f} {:<45} {:<17}".
>> +          format(event.ts / 1000000000, ufid_str, reasons[reason]))
>> +
>> +
>> +#
>> +# decode_key()
>> +#
>> +def decode_key(msg):
>> +    bytes_left = len(msg)
>> +    result = {}
>> +    while bytes_left:
>> +        if bytes_left < 4:
>> +            break
>> +        nla_len, nla_type = struct.unpack("=HH", msg[:4])
>> +        if nla_len < 4:
>> +            break
>> +        nla_data = msg[4:nla_len]
>> +        trunc = False
>> +        if nla_len > bytes_left:
>> +            trunc = True
>> +            nla_data = nla_data[:(bytes_left - 4)]
>> +        else:
>> +            result[get_ovs_key_attr_str(nla_type)] = nla_data
>> +        if trunc:
>> +            break
>> +        next_offset = (nla_len + 3) & (~3)
>> +        msg = msg[next_offset:]
>> +        bytes_left -= next_offset
>> +    return result
>> +
>> +
>> +#
>> +# get_ovs_key_attr_str()
>> +#
>> +def get_ovs_key_attr_str(attr):
>> +    ovs_key_attr = ["OVS_KEY_ATTR_UNSPEC",
>> +                    "encap",
>> +                    "skb_priority",
>> +                    "in_port",
>> +                    "eth",
>> +                    "vlan",
>> +                    "eth_type",
>> +                    "ipv4",
>> +                    "ipv6",
>> +                    "tcp",
>> +                    "udp",
>> +                    "icmp",
>> +                    "icmpv6",
>> +                    "arp",
>> +                    "nd",
>> +                    "skb_mark",
>> +                    "tunnel",
>> +                    "sctp",
>> +                    "tcp_flags",
>> +                    "dp_hash",
>> +                    "recirc_id",
>> +                    "mpls",
>> +                    "ct_state",
>> +                    "ct_zone",
>> +                    "ct_mark",
>> +                    "ct_label",
>> +                    "ct_tuple4",
>> +                    "ct_tuple6",
>> +                    "nsh"]
>> +
>> +    if attr < 0 or attr >= len(ovs_key_attr):
>> +        return "<UNKNOWN>: {}".format(attr)
>> +    return ovs_key_attr[attr]
>> +
>> +
>> +#
>> +# is_nonzero()
>> +#
>> +def is_nonzero(val):
>> +    if isinstance(val, int):
>> +        return (val != 0)
>> +
>> +    if isinstance(val, str):
>> +        val = bytes(val, "utf-8")
>> +
>> +    # If it's not a string or an int, it's bytes.
>> +    return (val.count(0) < len(val))
>> +
>> +
>> +#
>> +# parse_flow_dict()
>> +#
>> +def parse_flow_dict(key_dict, decode=True):
>> +    ret_str = ""
>> +    parseable = {}
>> +    skip = ["nsh", "tunnel", "mpls", "vlan"]
>> +    need_byte_swap = ["ct_label"]
>> +    ipv4addrs = ["ct_tuple4", "tunnel", "ipv4", "arp"]
>> +    ipv6addrs = ["ipv6", "nd", "ct_tuple6"]
>> +    macs = {"eth": [0, 1], "arp": [3, 4], "nd": [1, 2]}
>> +    fields = [("OVS_KEY_ATTR_UNSPEC"),
>> +              ("encap", ),
>> +              ("skb_priority", "<I"),
>> +              ("in_port", "<I"),
>> +              ("eth", "!6s6s", "src", "dst"),
>> +              ("vlan", ),
>> +              ("eth_type", "!H"),
>> +              ("ipv4", "!4s4s4B", "src", "dst", "proto", "tos", "ttl", "frag"),
>> +              ("ipv6", "!16s16s4s4B", "src", "dst",
>> +               "label", "proto", "tclass", "hlimit", "frag"),
>> +              ("tcp", "!2H", "src", "dst"),
>> +              ("udp", "!2H", "src", "dst"),
>> +              ("icmp", "!2B", "type", "code"),
>> +              ("icmpv6", "!2B", "type", "code"),
>> +              ("arp", "!4s4sH6s6s", "sip", "tip", "op", "sha", "tha"),
>> +              ("nd", "!16s6s6s", "target", "sll", "tll"),
>> +              ("skb_mark", "<I"),
>> +              ("tunnel", ),
>> +              ("sctp", "!2H", "src", "dst"),
>> +              ("tcp_flags", "!H"),
>> +              ("dp_hash", "<I"),
>> +              ("recirc_id", "<I"),
>> +              ("mpls", ),
>> +              ("ct_state", "<I"),
>> +              ("ct_zone", "<H"),
>> +              ("ct_mark", "<I"),
>> +              ("ct_label", "!16s"),
>> +              ("ct_tuple4",
>> +               "!4s4s2HB", "src", "dst", "tp_src", "tp_dst", "proto"),
>> +              ("ct_tuple6",
>> +               "!16s16sB2H", "src", "dst", "proto", "tp_src", "tp_dst"),
>> +              ("nsh", )]
>> +    for k, v in key_dict.items():
>> +        s = ""
>> +        if k in skip:
>> +            continue
>> +        if decode and int.from_bytes(v, "big") == 0:
>> +            parseable[k] = "0"
>> +            continue
>> +        if decode and k in need_byte_swap:
>> +            v = int.from_bytes(v, "little").to_bytes(len(v), "big")
>> +        attr = -1
>> +        found = False
>> +        for f in fields:
>> +            if k == f[0]:
>> +                attr = fields.index(f)
>> +                found = True
>> +                break
>> +        if not found:
>> +            raise KeyError("Invalid flow field '%s'" % k)
>> +        if decode and len(fields[attr]) > 1:
>> +            data = list(struct.unpack(fields[attr][1],
>> +                        v[:struct.calcsize(fields[attr][1])]))
>> +            if k in ipv4addrs:
>> +                if data[0].count(0) < 4:
>> +                    data[0] = str(IPv4Address(data[0]))
>> +                else:
>> +                    data[0] = b"\x00"
>> +                if data[1].count(0) < 4:
>> +                    data[1] = str(IPv4Address(data[1]))
>> +                else:
>> +                    data[1] = b"\x00"
>> +            if k in ipv6addrs:
>> +                if data[0].count(0) < 16:
>> +                    data[0] = str(IPv6Address(data[0]))
>> +                else:
>> +                    data[0] = b"\x00"
>> +                if data[1].count(0) < len(data[1]):
>> +                    data[1] = str(IPv6Address(data[1]))
>> +                else:
>> +                    data[1] = b"\x00"
>> +            if k in macs.keys():
>> +                for e in macs[k]:
>> +                    if data[e].count(0) == 6:
>> +                        mac_str = b"\x00"
>> +                    else:
>> +                        mac_str = ":".join(["%02x" % i for i in data[e]])
>> +                    data[e] = mac_str
>> +        if decode and len(fields[attr]) > 2:
>> +            field_dict = {field: d for field, d in zip(fields[attr][2:], data)}
>> +            s = ", ".join(k + "=" + str(v) for k, v in field_dict.items())
>> +        elif decode and k != "eth_type":
>> +            s = str(data[0])
>> +            field_dict = s
>> +        else:
>> +            if decode:
>> +                s = hex(data[0])
>> +            field_dict = s
>> +        ret_str += k + "(" + s + "), "
>> +        parseable[k] = field_dict
>> +    ret_str = ret_str[:-2]
>> +    return (parseable, ret_str)
>> +
>> +
>> +#
>> +# handle_event()
>> +#
>> +def handle_event(ctx, data, size):
>> +    # Once we grab the event, we have three cases.
>> +    # 1. It's a revalidator probe and the reason is nonzero: A flow is expiring
>> +    # 2. It's a revalidator probe and the reason is zero: flow revalidated
>> +    # 3. It's a flow_put probe.
>> +    #
>> +    # We will ignore case 2, and report all others.
>> +    #
>> +    event = b["events"].event(data)
>> +    if event.probe == 0:  # OP_FLOW_PUT
>> +        handle_flow_put(event)
>> +    elif event.probe == 1 and event.reason > 0:  # FLOW_RESULT
>> +        print_expiration(event)
>> +
>> +
>> +def main():
>> +    #
>> +    # Don't like these globals, but ctx passing does not work with the existing
>> +    # open_ring_buffer() API :(
>> +    #
>> +    global b
>> +    global args
>> +
>> +    #
>> +    # Argument parsing
>> +    #
>> +    parser = argparse.ArgumentParser()
>> +    parser.add_argument("--buffer-page-count",
>> +                        help="Number of BPF ring buffer pages, default 1024",
>> +                        type=int, default=1024, metavar="NUMBER")
>> +    parser.add_argument("-f", "--flow-key-size",
>> +                        help="Set maximum flow key size to capture, "
>> +                        "default 64 - see notes", type=buffer_size_type,
>> +                        default=64, metavar="[64-2048]")
>> +    parser.add_argument("-k", "--flow-keys",
>> +                        help="Print flow keys as flow strings",
>> +                        action="store_true")
>> +    parser.add_argument("-l", "--filter-flows", metavar="FLOW_STRING",
>> +                        help="Filter flows that match the specified "
>> +                        "ODP-like flow",
>> +                        type=str, default=None, nargs="*")
>> +    parser.add_argument("-p", "--pid", metavar="VSWITCHD_PID",
>> +                        help="ovs-vswitchd's PID", type=int, default=None)
>> +    parser.add_argument("-D", "--debug", help="Enable eBPF debugging",
>> +                        type=int, const=0x3f, default=0, nargs="?")
>> +    args = parser.parse_args()
>> +
>> +    #
>> +    # Find the PID of the ovs-vswitchd daemon if not specified.
>> +    #
>> +    if args.pid is None:
>> +        for proc in psutil.process_iter():
>> +            if "ovs-vswitchd" in proc.name():
>> +                if args.pid is not None:
>> +                    print("Error: Multiple ovs-vswitchd daemons running, "
>> +                          "use the -p option!")
>> +                    sys.exit(-1)
>> +
>> +                args.pid = proc.pid
>> +    #
>> +    # Error checking on input parameters
>> +    #
>> +    if args.pid is None:
>> +        print("ERROR: Failed to find ovs-vswitchd's PID!")
>> +        sys.exit(-1)
>> +
>> +    #
>> +    # Attach the USDT probes
>> +    #
>> +    u = USDT(pid=int(args.pid))
>> +    try:
>> +        u.enable_probe(probe="op_flow_put", fn_name="usdt__op_flow_put")
>> +    except USDTException as e:
>> +        print("Error attaching the dpif_netlink_operate__:op_flow_put probe.")
>> +        print(str(e))
>> +        sys.exit(-1)
>> +
>> +    try:
>> +        u.enable_probe(probe="flow_result", fn_name="usdt__flow_result")
>> +    except USDTException as e:
>> +        print("Error attaching the revalidate:flow_result probe.")
>> +        print(str(e))
>> +        sys.exit(-1)
>> +
>> +    #
>> +    # Attach the probes to the running process
>> +    #
>> +    source = bpf_src.replace("<BUFFER_PAGE_COUNT>",
>> +                             str(args.buffer_page_count))
>> +
>> +    if args.filter_flows is None:
>> +        filter_bool = 0
>> +
>> +        # Set the key size based on what the user wanted
>> +        source = source.replace("<MAX_KEY_VAL>", str(args.flow_key_size))
>> +    else:
>> +        filter_bool = 1
>> +        args.filter_flows = parse_flow_str(args.filter_flows[0])
>> +
>> +        # Run through the parser to make sure we only filter on fields we
>> +        # understand
>> +        parse_flow_dict(args.filter_flows, False)
>> +
>> +        # This is hardcoded here because it doesn't make sense to shrink the
>> +        # size, since the flow key might be missing fields that are matched in
>> +        # the flow filter.
>> +        source = source.replace("<MAX_KEY_VAL>", "2048")
>> +
>> +    source = source.replace("<FILTER_BOOL>", str(filter_bool))
>> +
>> +    b = BPF(text=source, usdt_contexts=[u], debug=args.debug)
>> +
>> +    #
>> +    # Print header
>> +    #
>> +    print("{:<18} {:<45} {:<17}".format("TIME", "UFID", "EVENT/REASON"))
>> +
>> +    #
>> +    # Dump out all events.
>> +    #
>> +    b["events"].open_ring_buffer(handle_event)
>> +    while 1:
>> +        try:
>> +            b.ring_buffer_poll()
>> +            time.sleep(0.5)
>> +        except KeyboardInterrupt:
>> +            break
>> +
>> +
>> +#
>> +# Start main() as the default entry point
>> +#
>> +if __name__ == "__main__":
>> +    main()
>> --
>> 2.41.0
>>
_______________________________________________
dev mailing list
d...@openvswitch.org
https://mail.openvswitch.org/mailman/listinfo/ovs-dev
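The sketch below shows, outside of BCC, the decoding that flow_reval_monitor.py performs on each ring-buffer event. It is not part of the patch. It covers three steps. First, it formats the four 32-bit UFID words in the 8-4-4-4-12 layout that format_ufid() targets. Second, it bounds-checks a flow_result reason code against the full enum flow_del_reason table. Third, it walks the netlink attributes of a flow key the way decode_key() does: a 4-byte header, then the payload, padded to a 4-byte boundary. The names FLOW_DEL_REASONS, reason_str() and iter_nlattrs() are hypothetical helpers. The sketch also assumes the reason codes keep the enum's declaration order (FDR_REVALIDATE = 0 through FDR_FLOW_LIMIT = 8).

```python
import struct

# Hypothetical table mirroring enum flow_del_reason from the quoted patch;
# assumes the numeric values stay in declaration order (0..8).
FLOW_DEL_REASONS = [
    ("FDR_REVALIDATE", "The flow was revalidated."),
    ("FDR_FLOW_IDLE", "The flow went unused and was deleted."),
    ("FDR_TOO_EXPENSIVE", "The flow was too expensive to revalidate."),
    ("FDR_FLOW_WILDCARDED", "The flow needed a narrower wildcard mask."),
    ("FDR_BAD_ODP_FIT", "The flow had a bad ODP flow fit."),
    ("FDR_NO_OFPROTO", "The flow didn't have an associated ofproto."),
    ("FDR_XLATION_ERROR", "There was an error translating the flow."),
    ("FDR_AVOID_CACHING", "Flow deleted to avoid caching."),
    ("FDR_FLOW_LIMIT", "All flows being killed."),
]


def format_ufid(words):
    """Render four 32-bit UFID words as ufid:8-4-4-4-12 hex digits."""
    w0, w1, w2, w3 = words
    return "ufid:{:08x}-{:04x}-{:04x}-{:04x}-{:04x}{:08x}".format(
        w0, w1 >> 16, w1 & 0xffff, w2 >> 16, w2 & 0xffff, w3)


def reason_str(code):
    """Map a flow_result reason code to 'NAME: description'."""
    # Check against the whole table so the last entry stays reachable.
    if 0 <= code < len(FLOW_DEL_REASONS):
        name, desc = FLOW_DEL_REASONS[code]
        return "{}: {}".format(name, desc)
    return "unknown reason ({})".format(code)


def iter_nlattrs(buf):
    """Yield (type, payload) for each netlink attribute in buf.

    Each attribute has a 4-byte header (u16 length including the header,
    u16 type, host byte order) followed by its payload, padded to a 4-byte
    boundary.  Iteration stops at the first malformed or truncated
    attribute, e.g. one cut off by a capped key size.
    """
    offset = 0
    while len(buf) - offset >= 4:
        nla_len, nla_type = struct.unpack_from("=HH", buf, offset)
        if nla_len < 4 or offset + nla_len > len(buf):
            break
        yield nla_type, bytes(buf[offset + 4:offset + nla_len])
        offset += (nla_len + 3) & ~3  # Round up to the next 4-byte boundary.


if __name__ == "__main__":
    print(format_ufid((0xf76fc899, 0x376d466b, 0xbc74abcd, 0xb933eb97)))
    print(reason_str(1))
    # in_port (type 3) = 2, then eth_type (type 6) = 0x0800 padded to 4 bytes.
    key = (struct.pack("=HHI", 8, 3, 2)
           + struct.pack("=HH", 6, 6) + b"\x08\x00\x00\x00")
    for nla_type, payload in iter_nlattrs(key):
        print(nla_type, payload.hex())
```

Running it prints the formatted UFID and the FDR_FLOW_IDLE description, followed by two decoded attributes. The attribute types are 3 and 6, which are in_port and eth_type in the script's get_ovs_key_attr_str() table. Unlike decode_key(), which returns a dict keyed by name, this sketch yields raw (type, payload) pairs, so repeated or unknown attribute types are not silently collapsed.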