Author: vmaffione
Date: Fri Nov  9 08:43:40 2018
New Revision: 340279
URL: https://svnweb.freebsd.org/changeset/base/340279

Log:
  netmap: add load balancer program
  
  Add the lb program, which is able to load-balance input traffic
  received from a netmap port over M groups, with N netmap pipes in
  each group. Each received packet is forwarded to one of the pipes
  chosen from each group (using an L3/L4 connection-consistent hash function).
  This also adds a man page for lb and some cross-references in related
  man pages.
  
  Reviewed by:  bcr, 0mp
  Approved by:  gnn (mentor)
  Differential Revision:        https://reviews.freebsd.org/D17735

Added:
  head/tools/tools/netmap/lb.8   (contents, props changed)
  head/tools/tools/netmap/lb.c   (contents, props changed)
  head/tools/tools/netmap/pkt_hash.c   (contents, props changed)
  head/tools/tools/netmap/pkt_hash.h   (contents, props changed)
Modified:
  head/share/man/man4/netmap.4
  head/tools/tools/netmap/Makefile
  head/tools/tools/netmap/README
  head/tools/tools/netmap/bridge.8
  head/tools/tools/netmap/ctrs.h

Modified: head/share/man/man4/netmap.4
==============================================================================
--- head/share/man/man4/netmap.4        Fri Nov  9 08:15:58 2018        
(r340278)
+++ head/share/man/man4/netmap.4        Fri Nov  9 08:43:40 2018        
(r340279)
@@ -27,7 +27,7 @@
 .\"
 .\" $FreeBSD$
 .\"
-.Dd October 23, 2018
+.Dd October 28, 2018
 .Dt NETMAP 4
 .Os
 .Sh NAME
@@ -1073,8 +1073,11 @@ Other
 clients attached to the same switch can now communicate
 with the network card or the host.
 .Sh SEE ALSO
-.Xr pkt-gen 8 ,
-.Xr bridge 8
+.Xr vale 4 ,
+.Xr vale-ctl 4 ,
+.Xr bridge 8 ,
+.Xr lb 8 ,
+.Xr pkt-gen 8
 .Pp
 .Pa http://info.iet.unipi.it/~luigi/netmap/
 .Pp

Modified: head/tools/tools/netmap/Makefile
==============================================================================
--- head/tools/tools/netmap/Makefile    Fri Nov  9 08:15:58 2018        
(r340278)
+++ head/tools/tools/netmap/Makefile    Fri Nov  9 08:43:40 2018        
(r340279)
@@ -3,7 +3,7 @@
 #
 # For multiple programs using a single source file each,
 # we can just define 'progs' and create custom targets.
-PROGS  =       pkt-gen nmreplay bridge vale-ctl
+PROGS  =       pkt-gen nmreplay bridge vale-ctl lb
 
 CLEANFILES = $(PROGS) *.o
 MAN=
@@ -34,3 +34,6 @@ nmreplay: nmreplay.o
 
 vale-ctl: vale-ctl.o
        $(CC) $(CFLAGS) -o vale-ctl vale-ctl.o
+
+lb: lb.o pkt_hash.o
+       $(CC) $(CFLAGS) -o lb lb.o pkt_hash.o $(LDFLAGS)

Modified: head/tools/tools/netmap/README
==============================================================================
--- head/tools/tools/netmap/README      Fri Nov  9 08:15:58 2018        
(r340278)
+++ head/tools/tools/netmap/README      Fri Nov  9 08:43:40 2018        
(r340279)
@@ -1,9 +1,13 @@
 $FreeBSD$
 
-This directory contains examples that use netmap
+This directory contains applications that use the netmap API
 
-       pkt-gen         a packet sink/source using the netmap API
+       pkt-gen         a multi-function packet generator and traffic sink
 
-       bridge          a two-port jumper wire, also using the native API
+       bridge          a two-port jumper wire, also using the netmap API
 
-       vale-ctl        the program to control VALE bridges
+       vale-ctl        the program to control and inspect VALE switches
+
+       lb              an L3/L4 load balancer
+
+       nmreplay        a tool to playback a pcap file to a netmap port

Modified: head/tools/tools/netmap/bridge.8
==============================================================================
--- head/tools/tools/netmap/bridge.8    Fri Nov  9 08:15:58 2018        
(r340278)
+++ head/tools/tools/netmap/bridge.8    Fri Nov  9 08:43:40 2018        
(r340279)
@@ -23,7 +23,7 @@
 .\"
 .\" $FreeBSD$
 .\"
-.Dd October 23, 2018
+.Dd October 28, 2018
 .Dt BRIDGE 8
 .Os
 .Sh NAME
@@ -71,7 +71,8 @@ Disable zero-copy mode.
 .El
 .Sh SEE ALSO
 .Xr netmap 4 ,
-.Xr pkt-gen 8
+.Xr pkt-gen 8 ,
+.Xr lb 8
 .Sh AUTHORS
 .An -nosplit
 .Nm

Modified: head/tools/tools/netmap/ctrs.h
==============================================================================
--- head/tools/tools/netmap/ctrs.h      Fri Nov  9 08:15:58 2018        
(r340278)
+++ head/tools/tools/netmap/ctrs.h      Fri Nov  9 08:43:40 2018        
(r340279)
@@ -7,30 +7,37 @@
 
 /* counters to accumulate statistics */
 struct my_ctrs {
-       uint64_t pkts, bytes, events, drop;
+       uint64_t pkts, bytes, events;
+       uint64_t drop, drop_bytes;
        uint64_t min_space;
        struct timeval t;
+       uint32_t oq_n; /* number of elements in overflow queue (used in lb) */
 };
 
 /* very crude code to print a number in normalized form.
  * Caller has to make sure that the buffer is large enough.
  */
 static const char *
-norm2(char *buf, double val, char *fmt)
+norm2(char *buf, double val, char *fmt, int normalize)
 {
        char *units[] = { "", "K", "M", "G", "T" };
        u_int i;
-
-       for (i = 0; val >=1000 && i < sizeof(units)/sizeof(char *) - 1; i++)
-               val /= 1000;
+       if (normalize)
+               for (i = 0; val >=1000 && i < sizeof(units)/sizeof(char *) - 1; 
i++)
+                       val /= 1000;
+       else
+               i=0;
        sprintf(buf, fmt, val, units[i]);
        return buf;
 }
 
 static __inline const char *
-norm(char *buf, double val)
+norm(char *buf, double val, int normalize)
 {
-       return norm2(buf, val, "%.3f %s");
+       if (normalize)
+               return norm2(buf, val, "%.3f %s", normalize);
+       else
+               return norm2(buf, val, "%.0f %s", normalize);
 }
 
 static __inline int
@@ -89,7 +96,7 @@ timespec_sub(struct timespec a, struct timespec b)
        return ret;
 }
 
-static uint64_t
+static __inline uint64_t
 wait_for_next_report(struct timeval *prev, struct timeval *cur,
                int report_interval)
 {
@@ -106,3 +113,4 @@ wait_for_next_report(struct timeval *prev, struct time
        return delta.tv_sec* 1000000 + delta.tv_usec;
 }
 #endif /* CTRS_H_ */
+

Added: head/tools/tools/netmap/lb.8
==============================================================================
--- /dev/null   00:00:00 1970   (empty, because file is newly added)
+++ head/tools/tools/netmap/lb.8        Fri Nov  9 08:43:40 2018        
(r340279)
@@ -0,0 +1,130 @@
+.\" Copyright (c) 2017 Corelight, Inc. and Universita` di Pisa
+.\" All rights reserved.
+.\"
+.\" Redistribution and use in source and binary forms, with or without
+.\" modification, are permitted provided that the following conditions
+.\" are met:
+.\" 1. Redistributions of source code must retain the above copyright
+.\"    notice, this list of conditions and the following disclaimer.
+.\" 2. Redistributions in binary form must reproduce the above copyright
+.\"    notice, this list of conditions and the following disclaimer in the
+.\"    documentation and/or other materials provided with the distribution.
+.\"
+.\" THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
+.\" ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+.\" IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+.\" ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
+.\" FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+.\" DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
+.\" OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+.\" HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+.\" LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
+.\" OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+.\" SUCH DAMAGE.
+.\"
+.\" $FreeBSD$
+.\"
+.Dd October 28, 2018
+.Dt LB 8
+.Os
+.Sh NAME
+.Nm lb
+.Nd netmap-based load balancer
+.Sh SYNOPSIS
+.Bk -words
+.Bl -tag -width "lb"
+.It Nm
+.Op Fl i Ar port
+.Op Fl p Ar pipe-group
+.Op Fl B Ar extra-buffers
+.Op Fl b Ar batch-size
+.Op Fl w Ar wait-link
+.El
+.Ek
+.Sh DESCRIPTION
+.Nm
+reads packets from an input netmap port and sends them to a number of netmap pipes,
+trying to balance the packets received by each pipe.
+Packets belonging to the
+same connection will always be sent to the same pipe.
+.Pp
+Command line options are listed below.
+.Bl -tag -width Ds
+.It Fl i Ar port
+Name of a netmap port.
+It must be supplied exactly once to identify
+the input port.
+Any netmap port type (e.g., physical interface, VALE switch, pipe,
+monitor port) can be used.
+.It Fl p Ar name Ns Cm \&: Ns Ar number | number
+Add a new pipe group of the given number of pipes.
+The pipe group will receive all the packets read from the input port, balanced
+among the available pipes.
+The receiving ends of the pipes
+will be called
+.Dq Ar name Ns Em }0
+to
+.Dq Ar name No Ns Em } Ns Aq Ar number No - 1 .
+The name is optional and defaults to
+the name of the input port (stripped of any netmap operator).
+If the name is omitted, the colon can be omitted as well.
+.Pp
+This option can be supplied multiple times to define a sequence of pipe groups,
+each group receiving all the packets in turn.
+.Pp
+If no
+.Fl p
+option is given, a single group of two pipes with default name is assumed.
+.Pp
+It is allowed to use the same name for several groups.
+The pipe numbering in each
+group will start from where the previous identically-named group left off.
+.It Fl B Ar extra-buffers
+Try to reserve the given number of extra buffers.
+Extra buffers are shared among
+all pipes in all groups and work as an extension of the pipe rings.
+If a pipe ring is full for whatever reason,
+.Nm
+tries to use extra buffers before dropping any packets directed to that pipe.
+.Pp
+If all extra buffers are busy, some are stolen from the pipe with the longest
+backlog.
+This gives preference to newer packets over old ones, and prevents a
+stalled pipe from depleting the pool of extra buffers.
+.It Fl b Ar batch-size
+Maximum number of packets processed between two read operations from the input port.
+Higher values of batch-size improve performance by amortizing read operations,
+but increase the risk of filling up the port internal queues.
+.It Fl w Ar wait-link
+Indicates the number of seconds to wait before transmitting.
+It defaults to 2, and may be useful when talking to physical
+ports to let link negotiation complete before starting transmission.
+.El
+.Sh LIMITATIONS
+The group chaining assumes that the applications on the receiving end of the
+pipes are read-only: they must not modify the buffers or the pipe ring slots
+in any way.
+.Pp
+The group naming is currently implemented by creating a persistent VALE port
+with the given name.
+If
+.Nm
+does not exit cleanly the ports will not be removed.
+Please use
+.Xr vale-ctl 4
+to remove any stale persistent VALE port.
+.Sh SEE ALSO
+.Xr netmap 4 ,
+.Xr bridge 8 ,
+.Xr pkt-gen 8
+.Pp
+.Pa http://info.iet.unipi.it/~luigi/netmap/
+.Sh AUTHORS
+.An -nosplit
+.Nm
+has been written by
+.An Seth Hall
+at Corelight, USA.
+The facilities related to extra buffers and pipe groups have been added by
+.An Giuseppe Lettieri
+at University of Pisa, Italy, under contract by Corelight, USA.

Added: head/tools/tools/netmap/lb.c
==============================================================================
--- /dev/null   00:00:00 1970   (empty, because file is newly added)
+++ head/tools/tools/netmap/lb.c        Fri Nov  9 08:43:40 2018        
(r340279)
@@ -0,0 +1,1027 @@
+/*
+ * Copyright (C) 2017 Corelight, Inc. and Universita` di Pisa. All rights 
reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *   1. Redistributions of source code must retain the above copyright
+ *      notice, this list of conditions and the following disclaimer.
+ *   2. Redistributions in binary form must reproduce the above copyright
+ *      notice, this list of conditions and the following disclaimer in the
+ *    documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
+ * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
+ * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+ * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
+ * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+/* $FreeBSD$ */
+#include <stdio.h>
+#include <string.h>
+#include <ctype.h>
+#include <stdbool.h>
+#include <inttypes.h>
+#include <syslog.h>
+
+#define NETMAP_WITH_LIBS
+#include <net/netmap_user.h>
+#include <sys/poll.h>
+
+#include <netinet/in.h>                /* htonl */
+
+#include <pthread.h>
+
+#include "pkt_hash.h"
+#include "ctrs.h"
+
+
/*
 * Private, minimal copies of the L2/L3 header layouts so that the
 * program does not depend on platform-specific header definitions.
 */
#ifndef ETH_ALEN
#define ETH_ALEN	6	/* bytes in an Ethernet MAC address */
#endif

/* Ethernet header: destination MAC, source MAC, EtherType (14 bytes). */
struct compact_eth_hdr {
	unsigned char	h_dest[ETH_ALEN];
	unsigned char	h_source[ETH_ALEN];
	uint16_t	h_proto;
};

/*
 * IPv4 header without options (20 bytes).
 * NOTE(review): the ihl/version bit-field order only matches the wire
 * format on compilers that allocate bit-fields LSB-first (the usual
 * little-endian ABIs) -- confirm for any big-endian target.
 */
struct compact_ip_hdr {
	uint8_t		ihl:4;
	uint8_t		version:4;
	uint8_t		tos;
	uint16_t	tot_len;
	uint16_t	id;
	uint16_t	frag_off;
	uint8_t		ttl;
	uint8_t		protocol;
	uint16_t	check;
	uint32_t	saddr, daddr;
};

/* IPv6 fixed header (40 bytes); same bit-field caveat as above. */
struct compact_ipv6_hdr {
	uint8_t		priority:4;
	uint8_t		version:4;
	uint8_t		flow_lbl[3];
	uint16_t	payload_len;
	uint8_t		nexthdr;
	uint8_t		hop_limit;
	struct in6_addr	saddr, daddr;
};
+
/* Size limits and command-line defaults. */
#define MAX_IFNAMELEN		64
#define MAX_PORTNAMELEN		(MAX_IFNAMELEN + 40)
#define DEF_OUT_PIPES		2	/* pipes in a group when -p gives no count */
#define DEF_EXTRA_BUFS		0	/* -B default */
#define DEF_BATCH		2048	/* -b default */
#define DEF_WAIT_LINK		2	/* -w default, in seconds */
#define DEF_STATS_INT		600
#define BUF_REVOKE		100	/* buffers stolen per free-queue refill */
#define STAT_MSG_MAXSIZE	1024	/* capacity of one JSON stats record */

/* Global configuration, populated from the command line. */
struct {
	char		ifname[MAX_IFNAMELEN];		/* input port (-i) */
	char		base_name[MAX_IFNAMELEN];	/* default pipe-group name */
	int		netmap_fd;
	uint16_t	output_rings;	/* pipes summed over all groups */
	uint16_t	num_groups;	/* number of -p groups */
	uint32_t	extra_bufs;	/* -B */
	uint16_t	batch;		/* -b */
	int		stdout_interval, syslog_interval;	/* -o, -s */
	int		wait_link;	/* -w */
	bool		busy_wait;	/* -W */
} glob_arg;
+
+/*
+ * the overflow queue is a circular queue of buffers
+ */
+struct overflow_queue {
+       char name[MAX_IFNAMELEN + 16];
+       struct netmap_slot *slots;
+       uint32_t head;
+       uint32_t tail;
+       uint32_t n;
+       uint32_t size;
+};
+
+struct overflow_queue *freeq;
+
+static inline int
+oq_full(struct overflow_queue *q)
+{
+       return q->n >= q->size;
+}
+
+static inline int
+oq_empty(struct overflow_queue *q)
+{
+       return q->n <= 0;
+}
+
+static inline void
+oq_enq(struct overflow_queue *q, const struct netmap_slot *s)
+{
+       if (unlikely(oq_full(q))) {
+               D("%s: queue full!", q->name);
+               abort();
+       }
+       q->slots[q->tail] = *s;
+       q->n++;
+       q->tail++;
+       if (q->tail >= q->size)
+               q->tail = 0;
+}
+
+static inline struct netmap_slot
+oq_deq(struct overflow_queue *q)
+{
+       struct netmap_slot s = q->slots[q->head];
+       if (unlikely(oq_empty(q))) {
+               D("%s: queue empty!", q->name);
+               abort();
+       }
+       q->n--;
+       q->head++;
+       if (q->head >= q->size)
+               q->head = 0;
+       return s;
+}
+
/*
 * Shutdown request flag: set by sigint_h() and polled by print_stats().
 * Only objects of type volatile sig_atomic_t may portably be written
 * from a signal handler, hence the type.
 */
static volatile sig_atomic_t do_abort = 0;

/* Global packet counters. */
uint64_t dropped = 0;		/* drops, including buffers revoked from queues */
uint64_t forwarded = 0;		/* packets placed directly on a pipe ring */
uint64_t received_bytes = 0;
uint64_t received_pkts = 0;
uint64_t non_ip = 0;
uint32_t freeq_n = 0;
+
+struct port_des {
+       char interface[MAX_PORTNAMELEN];
+       struct my_ctrs ctr;
+       unsigned int last_sync;
+       uint32_t last_tail;
+       struct overflow_queue *oq;
+       struct nm_desc *nmd;
+       struct netmap_ring *ring;
+       struct group_des *group;
+};
+
+struct port_des *ports;
+
+/* each group of pipes receives all the packets */
+struct group_des {
+       char pipename[MAX_IFNAMELEN];
+       struct port_des *ports;
+       int first_id;
+       int nports;
+       int last;
+       int custom_port;
+};
+
+struct group_des *groups;
+
/*
 * Statistics snapshot shared with the print_stats() thread.  That thread
 * sets status to COUNTERS_EMPTY to request data, and consumes the
 * snapshot once status reads COUNTERS_FULL (presumably set by the
 * packet loop in main() -- not visible here).
 */
struct counters {
	struct timeval	ts;
	struct my_ctrs	*ctrs;		/* one entry per output pipe */
	uint64_t	received_pkts;
	uint64_t	received_bytes;
	uint64_t	non_ip;
	uint32_t	freeq_n;
	/* 64-byte aligned, presumably to keep it off the data's cache line */
	int		status __attribute__((aligned(64)));
#define COUNTERS_EMPTY	0
#define COUNTERS_FULL	1
};

struct counters counters_buf;
+
+static void *
+print_stats(void *arg)
+{
+       int npipes = glob_arg.output_rings;
+       int sys_int = 0;
+       (void)arg;
+       struct my_ctrs cur, prev;
+       struct my_ctrs *pipe_prev;
+
+       pipe_prev = calloc(npipes, sizeof(struct my_ctrs));
+       if (pipe_prev == NULL) {
+               D("out of memory");
+               exit(1);
+       }
+
+       char stat_msg[STAT_MSG_MAXSIZE] = "";
+
+       memset(&prev, 0, sizeof(prev));
+       while (!do_abort) {
+               int j, dosyslog = 0, dostdout = 0, newdata;
+               uint64_t pps = 0, dps = 0, bps = 0, dbps = 0, usec = 0;
+               struct my_ctrs x;
+
+               counters_buf.status = COUNTERS_EMPTY;
+               newdata = 0;
+               memset(&cur, 0, sizeof(cur));
+               sleep(1);
+               if (counters_buf.status == COUNTERS_FULL) {
+                       __sync_synchronize();
+                       newdata = 1;
+                       cur.t = counters_buf.ts;
+                       if (prev.t.tv_sec || prev.t.tv_usec) {
+                               usec = (cur.t.tv_sec - prev.t.tv_sec) * 1000000 
+
+                                       cur.t.tv_usec - prev.t.tv_usec;
+                       }
+               }
+
+               ++sys_int;
+               if (glob_arg.stdout_interval && sys_int % 
glob_arg.stdout_interval == 0)
+                               dostdout = 1;
+               if (glob_arg.syslog_interval && sys_int % 
glob_arg.syslog_interval == 0)
+                               dosyslog = 1;
+
+               for (j = 0; j < npipes; ++j) {
+                       struct my_ctrs *c = &counters_buf.ctrs[j];
+                       cur.pkts += c->pkts;
+                       cur.drop += c->drop;
+                       cur.drop_bytes += c->drop_bytes;
+                       cur.bytes += c->bytes;
+
+                       if (usec) {
+                               x.pkts = c->pkts - pipe_prev[j].pkts;
+                               x.drop = c->drop - pipe_prev[j].drop;
+                               x.bytes = c->bytes - pipe_prev[j].bytes;
+                               x.drop_bytes = c->drop_bytes - 
pipe_prev[j].drop_bytes;
+                               pps = (x.pkts*1000000 + usec/2) / usec;
+                               dps = (x.drop*1000000 + usec/2) / usec;
+                               bps = ((x.bytes*1000000 + usec/2) / usec) * 8;
+                               dbps = ((x.drop_bytes*1000000 + usec/2) / usec) 
* 8;
+                       }
+                       pipe_prev[j] = *c;
+
+                       if ( (dosyslog || dostdout) && newdata )
+                               snprintf(stat_msg, STAT_MSG_MAXSIZE,
+                                      "{"
+                                      "\"ts\":%.6f,"
+                                      "\"interface\":\"%s\","
+                                      "\"output_ring\":%" PRIu16 ","
+                                      "\"packets_forwarded\":%" PRIu64 ","
+                                      "\"packets_dropped\":%" PRIu64 ","
+                                      "\"data_forward_rate_Mbps\":%.4f,"
+                                      "\"data_drop_rate_Mbps\":%.4f,"
+                                      "\"packet_forward_rate_kpps\":%.4f,"
+                                      "\"packet_drop_rate_kpps\":%.4f,"
+                                      "\"overflow_queue_size\":%" PRIu32
+                                      "}", cur.t.tv_sec + (cur.t.tv_usec / 
1000000.0),
+                                           ports[j].interface,
+                                           j,
+                                           c->pkts,
+                                           c->drop,
+                                           (double)bps / 1024 / 1024,
+                                           (double)dbps / 1024 / 1024,
+                                           (double)pps / 1000,
+                                           (double)dps / 1000,
+                                           c->oq_n);
+
+                       if (dosyslog && stat_msg[0])
+                               syslog(LOG_INFO, "%s", stat_msg);
+                       if (dostdout && stat_msg[0])
+                               printf("%s\n", stat_msg);
+               }
+               if (usec) {
+                       x.pkts = cur.pkts - prev.pkts;
+                       x.drop = cur.drop - prev.drop;
+                       x.bytes = cur.bytes - prev.bytes;
+                       x.drop_bytes = cur.drop_bytes - prev.drop_bytes;
+                       pps = (x.pkts*1000000 + usec/2) / usec;
+                       dps = (x.drop*1000000 + usec/2) / usec;
+                       bps = ((x.bytes*1000000 + usec/2) / usec) * 8;
+                       dbps = ((x.drop_bytes*1000000 + usec/2) / usec) * 8;
+               }
+
+               if ( (dosyslog || dostdout) && newdata )
+                       snprintf(stat_msg, STAT_MSG_MAXSIZE,
+                                "{"
+                                "\"ts\":%.6f,"
+                                "\"interface\":\"%s\","
+                                "\"output_ring\":null,"
+                                "\"packets_received\":%" PRIu64 ","
+                                "\"packets_forwarded\":%" PRIu64 ","
+                                "\"packets_dropped\":%" PRIu64 ","
+                                "\"non_ip_packets\":%" PRIu64 ","
+                                "\"data_forward_rate_Mbps\":%.4f,"
+                                "\"data_drop_rate_Mbps\":%.4f,"
+                                "\"packet_forward_rate_kpps\":%.4f,"
+                                "\"packet_drop_rate_kpps\":%.4f,"
+                                "\"free_buffer_slots\":%" PRIu32
+                                "}", cur.t.tv_sec + (cur.t.tv_usec / 
1000000.0),
+                                     glob_arg.ifname,
+                                     received_pkts,
+                                     cur.pkts,
+                                     cur.drop,
+                                     counters_buf.non_ip,
+                                     (double)bps / 1024 / 1024,
+                                     (double)dbps / 1024 / 1024,
+                                     (double)pps / 1000,
+                                     (double)dps / 1000,
+                                     counters_buf.freeq_n);
+
+               if (dosyslog && stat_msg[0])
+                       syslog(LOG_INFO, "%s", stat_msg);
+               if (dostdout && stat_msg[0])
+                       printf("%s\n", stat_msg);
+
+               prev = cur;
+       }
+
+       free(pipe_prev);
+
+       return NULL;
+}
+
+static void
+free_buffers(void)
+{
+       int i, tot = 0;
+       struct port_des *rxport = &ports[glob_arg.output_rings];
+
+       /* build a netmap free list with the buffers in all the overflow queues 
*/
+       for (i = 0; i < glob_arg.output_rings + 1; i++) {
+               struct port_des *cp = &ports[i];
+               struct overflow_queue *q = cp->oq;
+
+               if (!q)
+                       continue;
+
+               while (q->n) {
+                       struct netmap_slot s = oq_deq(q);
+                       uint32_t *b = (uint32_t *)NETMAP_BUF(cp->ring, 
s.buf_idx);
+
+                       *b = rxport->nmd->nifp->ni_bufs_head;
+                       rxport->nmd->nifp->ni_bufs_head = s.buf_idx;
+                       tot++;
+               }
+       }
+       D("added %d buffers to netmap free list", tot);
+
+       for (i = 0; i < glob_arg.output_rings + 1; ++i) {
+               nm_close(ports[i].nmd);
+       }
+}
+
+
+static void sigint_h(int sig)
+{
+       (void)sig;              /* UNUSED */
+       do_abort = 1;
+       signal(SIGINT, SIG_DFL);
+}
+
+void usage()
+{
+       printf("usage: lb [options]\n");
+       printf("where options are:\n");
+       printf("  -h                    view help text\n");
+       printf("  -i iface              interface name (required)\n");
+       printf("  -p [prefix:]npipes    add a new group of output pipes\n");
+       printf("  -B nbufs              number of extra buffers (default: 
%d)\n", DEF_EXTRA_BUFS);
+       printf("  -b batch              batch size (default: %d)\n", DEF_BATCH);
+       printf("  -w seconds            wait for link up (default: %d)\n", 
DEF_WAIT_LINK);
+       printf("  -W                    enable busy waiting. this will run your 
CPU at 100%%\n");
+       printf("  -s seconds            seconds between syslog stats messages 
(default: 0)\n");
+       printf("  -o seconds            seconds between stdout stats messages 
(default: 0)\n");
+       exit(0);
+}
+
+static int
+parse_pipes(char *spec)
+{
+       char *end = index(spec, ':');
+       static int max_groups = 0;
+       struct group_des *g;
+
+       ND("spec %s num_groups %d", spec, glob_arg.num_groups);
+       if (max_groups < glob_arg.num_groups + 1) {
+               size_t size = sizeof(*g) * (glob_arg.num_groups + 1);
+               groups = realloc(groups, size);
+               if (groups == NULL) {
+                       D("out of memory");
+                       return 1;
+               }
+       }
+       g = &groups[glob_arg.num_groups];
+       memset(g, 0, sizeof(*g));
+
+       if (end != NULL) {
+               if (end - spec > MAX_IFNAMELEN - 8) {
+                       D("name '%s' too long", spec);
+                       return 1;
+               }
+               if (end == spec) {
+                       D("missing prefix before ':' in '%s'", spec);
+                       return 1;
+               }
+               strncpy(g->pipename, spec, end - spec);
+               g->custom_port = 1;
+               end++;
+       } else {
+               /* no prefix, this group will use the
+                * name of the input port.
+                * This will be set in init_groups(),
+                * since here the input port may still
+                * be uninitialized
+                */
+               end = spec;
+       }
+       if (*end == '\0') {
+               g->nports = DEF_OUT_PIPES;
+       } else {
+               g->nports = atoi(end);
+               if (g->nports < 1) {
+                       D("invalid number of pipes '%s' (must be at least 1)", 
end);
+                       return 1;
+               }
+       }
+       glob_arg.output_rings += g->nports;
+       glob_arg.num_groups++;
+       return 0;
+}
+
+/* complete the initialization of the groups data structure */
+void init_groups(void)
+{
+       int i, j, t = 0;
+       struct group_des *g = NULL;
+       for (i = 0; i < glob_arg.num_groups; i++) {
+               g = &groups[i];
+               g->ports = &ports[t];
+               for (j = 0; j < g->nports; j++)
+                       g->ports[j].group = g;
+               t += g->nports;
+               if (!g->custom_port)
+                       strcpy(g->pipename, glob_arg.base_name);
+               for (j = 0; j < i; j++) {
+                       struct group_des *h = &groups[j];
+                       if (!strcmp(h->pipename, g->pipename))
+                               g->first_id += h->nports;
+               }
+       }
+       g->last = 1;
+}
+
+/* push the packet described by slot rs to the group g.
+ * This may cause other buffers to be pushed down the
+ * chain headed by g.
+ * Return a free buffer.
+ */
+uint32_t forward_packet(struct group_des *g, struct netmap_slot *rs)
+{
+       uint32_t hash = rs->ptr;
+       uint32_t output_port = hash % g->nports;
+       struct port_des *port = &g->ports[output_port];
+       struct netmap_ring *ring = port->ring;
+       struct overflow_queue *q = port->oq;
+
+       /* Move the packet to the output pipe, unless there is
+        * either no space left on the ring, or there is some
+        * packet still in the overflow queue (since those must
+        * take precedence over the new one)
+       */
+       if (ring->head != ring->tail && (q == NULL || oq_empty(q))) {
+               struct netmap_slot *ts = &ring->slot[ring->head];
+               struct netmap_slot old_slot = *ts;
+
+               ts->buf_idx = rs->buf_idx;
+               ts->len = rs->len;
+               ts->flags |= NS_BUF_CHANGED;
+               ts->ptr = rs->ptr;
+               ring->head = nm_ring_next(ring, ring->head);
+               port->ctr.bytes += rs->len;
+               port->ctr.pkts++;
+               forwarded++;
+               return old_slot.buf_idx;
+       }
+
+       /* use the overflow queue, if available */
+       if (q == NULL || oq_full(q)) {
+               /* no space left on the ring and no overflow queue
+                * available: we are forced to drop the packet
+                */
+               dropped++;
+               port->ctr.drop++;
+               port->ctr.drop_bytes += rs->len;
+               return rs->buf_idx;
+       }
+
+       oq_enq(q, rs);
+
+       /*
+        * we cannot continue down the chain and we need to
+        * return a free buffer now. We take it from the free queue.
+        */
+       if (oq_empty(freeq)) {
+               /* the free queue is empty. Revoke some buffers
+                * from the longest overflow queue
+                */
+               uint32_t j;
+               struct port_des *lp = &ports[0];
+               uint32_t max = lp->oq->n;
+
+               /* let lp point to the port with the longest queue */
+               for (j = 1; j < glob_arg.output_rings; j++) {
+                       struct port_des *cp = &ports[j];
+                       if (cp->oq->n > max) {
+                               lp = cp;
+                               max = cp->oq->n;
+                       }
+               }
+
+               /* move the oldest BUF_REVOKE buffers from the
+                * lp queue to the free queue
+                */
+               // XXX optimize this cycle
+               for (j = 0; lp->oq->n && j < BUF_REVOKE; j++) {
+                       struct netmap_slot tmp = oq_deq(lp->oq);
+
+                       dropped++;
+                       lp->ctr.drop++;
+                       lp->ctr.drop_bytes += tmp.len;
+
+                       oq_enq(freeq, &tmp);
+               }
+
+               ND(1, "revoked %d buffers from %s", j, lq->name);
+       }
+
+       return oq_deq(freeq).buf_idx;
+}
+
+int main(int argc, char **argv)
+{
+       int ch;
+       uint32_t i;
+       int rv;
+       unsigned int iter = 0;
+       int poll_timeout = 10; /* default */
+
+       glob_arg.ifname[0] = '\0';
+       glob_arg.output_rings = 0;
+       glob_arg.batch = DEF_BATCH;
+       glob_arg.wait_link = DEF_WAIT_LINK;
+       glob_arg.busy_wait = false;
+       glob_arg.syslog_interval = 0;
+       glob_arg.stdout_interval = 0;
+
+       while ( (ch = getopt(argc, argv, "hi:p:b:B:s:o:w:W")) != -1) {
+               switch (ch) {
+               case 'i':
+                       D("interface is %s", optarg);
+                       if (strlen(optarg) > MAX_IFNAMELEN - 8) {
+                               D("ifname too long %s", optarg);
+                               return 1;
+                       }
+                       if (strncmp(optarg, "netmap:", 7) && strncmp(optarg, 
"vale", 4)) {
+                               sprintf(glob_arg.ifname, "netmap:%s", optarg);
+                       } else {
+                               strcpy(glob_arg.ifname, optarg);
+                       }
+                       break;
+
+               case 'p':
+                       if (parse_pipes(optarg)) {
+                               usage();
+                               return 1;
+                       }
+                       break;
+
+               case 'B':
+                       glob_arg.extra_bufs = atoi(optarg);
+                       D("requested %d extra buffers", glob_arg.extra_bufs);
+                       break;
+
+               case 'b':
+                       glob_arg.batch = atoi(optarg);
+                       D("batch is %d", glob_arg.batch);
+                       break;
+
+               case 'w':
+                       glob_arg.wait_link = atoi(optarg);
+                       D("link wait for up time is %d", glob_arg.wait_link);
+                       break;
+
+               case 'W':
+                       glob_arg.busy_wait = true;
+                       break;
+
+               case 'o':
+                       glob_arg.stdout_interval = atoi(optarg);
+                       break;
+
+               case 's':
+                       glob_arg.syslog_interval = atoi(optarg);
+                       break;
+
+               case 'h':
+                       usage();
+                       return 0;
+                       break;
+
+               default:
+                       D("bad option %c %s", ch, optarg);
+                       usage();
+                       return 1;
+               }
+       }
+
+       if (glob_arg.ifname[0] == '\0') {
+               D("missing interface name");
+               usage();
+               return 1;
+       }
+
+       /* extract the base name */
+       char *nscan = strncmp(glob_arg.ifname, "netmap:", 7) ?
+                       glob_arg.ifname : glob_arg.ifname + 7;
+       strncpy(glob_arg.base_name, nscan, MAX_IFNAMELEN-1);
+       for (nscan = glob_arg.base_name; *nscan && !index("-*^{}/@", *nscan); 
nscan++)
+               ;
+       *nscan = '\0';
+
+       if (glob_arg.num_groups == 0)
+               parse_pipes("");
+
+       if (glob_arg.syslog_interval) {
+               setlogmask(LOG_UPTO(LOG_INFO));
+               openlog("lb", LOG_CONS | LOG_PID | LOG_NDELAY, LOG_LOCAL1);
+       }
+
+       uint32_t npipes = glob_arg.output_rings;
+
+
+       pthread_t stat_thread;
+
+       ports = calloc(npipes + 1, sizeof(struct port_des));
+       if (!ports) {
+               D("failed to allocate the stats array");
+               return 1;
+       }
+       struct port_des *rxport = &ports[npipes];
+       init_groups();
+
+       memset(&counters_buf, 0, sizeof(counters_buf));
+       counters_buf.ctrs = calloc(npipes, sizeof(struct my_ctrs));
+       if (!counters_buf.ctrs) {
+               D("failed to allocate the counters snapshot buffer");
+               return 1;
+       }
+
+       /* we need base_req to specify pipes and extra bufs */
+       struct nmreq base_req;
+       memset(&base_req, 0, sizeof(base_req));
+
+       base_req.nr_arg1 = npipes;
+       base_req.nr_arg3 = glob_arg.extra_bufs;
+
+       rxport->nmd = nm_open(glob_arg.ifname, &base_req, 0, NULL);
+
+       if (rxport->nmd == NULL) {
+               D("cannot open %s", glob_arg.ifname);
+               return (1);
+       } else {
+               D("successfully opened %s (tx rings: %u)", glob_arg.ifname,
+                 rxport->nmd->req.nr_tx_slots);
+       }
+
+       uint32_t extra_bufs = rxport->nmd->req.nr_arg3;
+       struct overflow_queue *oq = NULL;
+       /* reference ring to access the buffers */
+       rxport->ring = NETMAP_RXRING(rxport->nmd->nifp, 0);
+
+       if (!glob_arg.extra_bufs)
+               goto run;
+
+       D("obtained %d extra buffers", extra_bufs);
+       if (!extra_bufs)
+               goto run;
+
+       /* one overflow queue for each output pipe, plus one for the
+        * free extra buffers
+        */

*** DIFF OUTPUT TRUNCATED AT 1000 LINES ***
_______________________________________________
svn-src-head@freebsd.org mailing list
https://lists.freebsd.org/mailman/listinfo/svn-src-head
To unsubscribe, send any mail to "svn-src-head-unsubscr...@freebsd.org"

Reply via email to